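References

Notes

  • The code in this article changes the MCP transport of mcp_server.py from stdio to streamable HTTP. The server is deployed directly on the Kali system, and the local machine connects to it over HTTP.
  • A honeypot system serves as the test environment and records the access information generated during testing.
  • Test experience: because the code is still immature and the tool-calling ability of each large model has its own weaknesses, some tests succeeded and others failed. On more complex tasks the model sometimes gets lazy and does not call the tools at all. The experience is rough and the level of automation is limited, but the approach is still worthwhile for directions such as LLM-driven penetration testing.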
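
To make the stdio-to-HTTP switch more concrete: with the streamable HTTP transport, an MCP client talks to the server by POSTing JSON-RPC messages to a single HTTP endpoint. The sketch below only builds (it does not send) the first `initialize` request, using the standard library. The address `http://192.168.1.10:8000/mcp`, the client name, and the protocol version string are assumptions for illustration; substitute whatever address and path your mcp_server.py actually listens on.

```python
import json
import urllib.request


def build_initialize_request(endpoint: str) -> urllib.request.Request:
    """Build (but do not send) an MCP `initialize` request for streamable HTTP."""
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",  # assumed; match what your server supports
            "capabilities": {},
            "clientInfo": {"name": "demo-client", "version": "0.1"},  # hypothetical
        },
    }
    return urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # The client accepts either a plain JSON reply or an SSE stream
            "Accept": "application/json, text/event-stream",
        },
        method="POST",
    )


if __name__ == "__main__":
    req = build_initialize_request("http://192.168.1.10:8000/mcp")  # assumed address
    print(req.get_method(), req.full_url)
    print(json.loads(req.data)["method"])
```

In practice an MCP-capable client builds these messages for you; the point is only that the connection is now an ordinary HTTP URL rather than a locally spawned process.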

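Tools

Setting up the honeypot

  • Visit HFish, choose the installer that matches your operating system, and install it. See the HFish documentation for usage details.
  • Reminder: do not install the honeypot on the Kali system itself; install it on your local machine instead.

Configuring and running the MCP Kali tools

Setting up and running the code on Kali

  1. Install Miniconda on Kali, create a virtual environment, and install the required dependencies inside it. For details, see the tutorial "Linux 系统中安装和部署 Miniconda 的详细教程" (a detailed guide to installing and deploying Miniconda on Linux).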
conda create -n mcp python=3.13
conda activate mcp
pip install flask
pip install fastmcp
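
Before moving on, it can help to confirm that the environment really contains everything the two scripts import. The following is a minimal sketch, not part of the original project: `missing_modules` is a hypothetical helper name. It checks `flask` (imported by kali_server.py) and `fastmcp` and `requests` (imported by mcp_server.py). If `requests` is reported missing, installing it with pip inside the same environment should resolve that.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # flask is imported by kali_server.py; fastmcp and requests by mcp_server.py
    missing = missing_modules(["flask", "fastmcp", "requests"])
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All required packages are importable.")
```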
  1. Create kali_server.py (it can be placed under /usr/local/src)
#!/usr/bin/env python3

# This script connects the MCP AI agent to the Kali Linux terminal and API server.

import argparse
import json
import logging
import os
import subprocess
import sys
import traceback
import threading
import tempfile
import signal
import time
from typing import Dict, Any, Optional, Tuple
from flask import Flask, request, jsonify

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Configuration
API_PORT = int(os.environ.get("API_PORT", 5000))
DEBUG_MODE = os.environ.get("DEBUG_MODE", "0").lower() in ("1", "true", "yes", "y")
COMMAND_TIMEOUT = 180  # 3 minutes default timeout

app = Flask(__name__)

class CommandExecutor:
    """Class to handle command execution with better timeout management"""
    
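    def __init__(self, command: str, timeout: Optional[int] = None, interactive: bool = False):
        self.command = command
        # Resolve the default at call time so a --timeout override of COMMAND_TIMEOUT takes effect
        self.timeout = COMMAND_TIMEOUT if timeout is None else timeout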
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False
        self.interactive = interactive
        
        # Check if expect is installed - needed for complex interactive commands
        if self.interactive and "expect" in command:
            try:
                check_expect = subprocess.run(["which", "expect"], 
                                             stdout=subprocess.PIPE, 
                                             stderr=subprocess.PIPE)
                if check_expect.returncode != 0:
                    logger.warning("'expect' command not found, installing it...")
                    # Try to install expect
                    install_cmd = subprocess.run(["apt-get", "install", "-y", "expect"],
                                                stdout=subprocess.PIPE,
                                                stderr=subprocess.PIPE)
                    if install_cmd.returncode != 0:
                        logger.error("Failed to install 'expect'. Some interactive commands may not work properly.")
            except Exception as e:
                logger.error(f"Error checking/installing 'expect': {str(e)}")
    
    def _read_stdout(self):
        """Thread function to continuously read stdout"""
        for line in iter(self.process.stdout.readline, ''):
            self.stdout_data += line
            if DEBUG_MODE:
                logger.debug(f"STDOUT: {line.strip()}")
    
    def _read_stderr(self):
        """Thread function to continuously read stderr"""
        for line in iter(self.process.stderr.readline, ''):
            self.stderr_data += line
            if DEBUG_MODE:
                logger.debug(f"STDERR: {line.strip()}")
    
    def execute(self) -> Dict[str, Any]:
        """Execute the command and handle timeout gracefully"""
        logger.info(f"Executing command: {self.command}")

        # For interactive tools, use a different approach with pseudo-TTY
        if self.interactive:
            return self._execute_interactive()
        else:
            return self._execute_non_interactive()
    
    def _execute_non_interactive(self) -> Dict[str, Any]:
        """Execute non-interactive command"""
        try:
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1  # Line buffered
            )
            
            # Start threads to read output continuously
            self.stdout_thread = threading.Thread(target=self._read_stdout)
            self.stderr_thread = threading.Thread(target=self._read_stderr)
            self.stdout_thread.daemon = True
            self.stderr_thread.daemon = True
            self.stdout_thread.start()
            self.stderr_thread.start()
            
            # Wait for the process to complete or timeout
            try:
                self.return_code = self.process.wait(timeout=self.timeout)
                # Process completed, join the threads
                self.stdout_thread.join()
                self.stderr_thread.join()
            except subprocess.TimeoutExpired:
                # Process timed out but we might have partial results
                self.timed_out = True
                logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")
                
                # Try to terminate gracefully first
                self.process.terminate()
                try:
                    self.process.wait(timeout=5)  # Give it 5 seconds to terminate
                except subprocess.TimeoutExpired:
                    # Force kill if it doesn't terminate
                    logger.warning("Process not responding to termination. Killing.")
                    self.process.kill()
                
                # Update final output
                self.return_code = -1
            
            # Always consider it a success if we have output, even with timeout
            success = True if self.timed_out and (self.stdout_data or self.stderr_data) else (self.return_code == 0)
            
            return {
                "stdout": self.stdout_data,
                "stderr": self.stderr_data,
                "return_code": self.return_code,
                "success": success,
                "timed_out": self.timed_out,
                # bool() keeps this a JSON boolean instead of leaking the output string
                "partial_results": self.timed_out and bool(self.stdout_data or self.stderr_data)
            }
        
        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}\n{self.stderr_data}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data)
            }
    
    def _execute_interactive(self) -> Dict[str, Any]:
        """Execute command that requires a TTY/PTY"""
        temp_output = tempfile.NamedTemporaryFile(delete=False, mode='w+')
        temp_name = temp_output.name
        temp_output.close()
        
        try:
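            # Create command that runs in a PTY and redirects output to our temp file.
            # shlex.quote keeps commands that contain single quotes (such as the
            # expect script used for Metasploit) from breaking the outer quoting.
            import shlex  # local import keeps this fix self-contained
            pty_command = f"script -q -c {shlex.quote(self.command)} {shlex.quote(temp_name)}"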
            
            # Execute the command
            process = subprocess.Popen(
                pty_command,
                shell=True,
                preexec_fn=os.setsid  # Create new process group for proper kill
            )
            
            # Wait for timeout or completion
            try:
                return_code = process.wait(timeout=self.timeout)
                timed_out = False
            except subprocess.TimeoutExpired:
                logger.warning(f"Interactive command timed out after {self.timeout} seconds")
                # Kill the entire process group
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                time.sleep(1)
                try:
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                except OSError:  # process group already gone
                    pass
                return_code = -1
                timed_out = True
            
            # Read the captured output
            with open(temp_name, 'r') as f:
                output = f.read()
            
            # Determine success based on output presence even if timed out
            success = True if timed_out and output else (return_code == 0)
            
            return {
                "stdout": output,
                "stderr": "",  # All output is captured to stdout with the script command
                "return_code": return_code,
                "success": success,
                "timed_out": timed_out,
                "partial_results": timed_out and bool(output)
            }
        
        except Exception as e:
            logger.error(f"Error executing interactive command: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "stdout": "",
                "stderr": f"Error executing interactive command: {str(e)}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": False
            }
        finally:
            # Clean up temp file
            try:
                os.unlink(temp_name)
            except OSError:  # temp file already removed
                pass


def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]:
    """
    Execute a shell command and return the result
    
    Args:
        command: The command to execute
        interactive: Whether the command requires a TTY
        
    Returns:
        A dictionary containing the stdout, stderr, and return code
    """
    executor = CommandExecutor(command, interactive=interactive)
    return executor.execute()

# Helper function to determine if a command should be run in interactive mode
def needs_interactive_mode(command: str) -> bool:
    """Determine if a command likely needs an interactive TTY session"""
    interactive_tools = [
        "msfconsole", "sqlmap", "hydra", "wpscan", "metasploit",
        "meterpreter", "postgresql", "mysql", "ssh", "ftp", "telnet"
    ]
    
    return any(tool in command for tool in interactive_tools)


@app.route("/api/command", methods=["POST"])
def generic_command():
    """Execute any command provided in the request."""
    try:
        params = request.json
        command = params.get("command", "")
        force_interactive = params.get("interactive", False)
        
        if not command:
            logger.warning("Command endpoint called without command parameter")
            return jsonify({
                "error": "Command parameter is required"
            }), 400
        
        # Determine if command needs interactive mode
        interactive = force_interactive or needs_interactive_mode(command)
        logger.info(f"Running command in {'interactive' if interactive else 'non-interactive'} mode")
        
        result = execute_command(command, interactive=interactive)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in command endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500


@app.route("/api/tools/nmap", methods=["POST"])
def nmap():
    """Execute nmap scan with the provided parameters."""
    try:
        params = request.json
        target = params.get("target", "")
        scan_type = params.get("scan_type", "-sCV")
        ports = params.get("ports", "")
        additional_args = params.get("additional_args", "-T4 -Pn")
        
        if not target:
            logger.warning("Nmap called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400        
        
        command = f"nmap {scan_type}"
        
        if ports:
            command += f" -p {ports}"
        
        if additional_args:
            # Basic validation for additional args - more sophisticated validation would be better
            command += f" {additional_args}"
        
        command += f" {target}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in nmap endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/gobuster", methods=["POST"])
def gobuster():
    """Execute gobuster with the provided parameters."""
    try:
        params = request.json
        url = params.get("url", "")
        mode = params.get("mode", "dir")
        wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
        additional_args = params.get("additional_args", "")
        
        if not url:
            logger.warning("Gobuster called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400
        
        # Validate mode
        if mode not in ["dir", "dns", "fuzz", "vhost"]:
            logger.warning(f"Invalid gobuster mode: {mode}")
            return jsonify({
                "error": f"Invalid mode: {mode}. Must be one of: dir, dns, fuzz, vhost"
            }), 400
        
        command = f"gobuster {mode} -u {url} -w {wordlist}"
        
        if additional_args:
            command += f" {additional_args}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in gobuster endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/dirb", methods=["POST"])
def dirb():
    """Execute dirb with the provided parameters."""
    try:
        params = request.json
        url = params.get("url", "")
        wordlist = params.get("wordlist", "/usr/share/wordlists/dirb/common.txt")
        additional_args = params.get("additional_args", "")
        
        if not url:
            logger.warning("Dirb called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400
        
        command = f"dirb {url} {wordlist}"
        
        if additional_args:
            command += f" {additional_args}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in dirb endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/nikto", methods=["POST"])
def nikto():
    """Execute nikto with the provided parameters."""
    try:
        params = request.json
        target = params.get("target", "")
        additional_args = params.get("additional_args", "")
        
        if not target:
            logger.warning("Nikto called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400
        
        command = f"nikto -h {target}"
        
        if additional_args:
            command += f" {additional_args}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in nikto endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/sqlmap", methods=["POST"])
def sqlmap():
    """Execute sqlmap with the provided parameters."""
    try:
        params = request.json
        url = params.get("url", "")
        data = params.get("data", "")
        additional_args = params.get("additional_args", "")
        
        if not url:
            logger.warning("SQLMap called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400
        
        command = f"sqlmap -u {url} --batch"
        
        if data:
            command += f" --data=\"{data}\""
        
        if additional_args:
            command += f" {additional_args}"
        
        # SQLMap definitely needs interactive mode
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in sqlmap endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/metasploit", methods=["POST"])
def metasploit():
    """Execute metasploit module with the provided parameters."""
    try:
        params = request.json
        module = params.get("module", "")
        options = params.get("options", {})
        commands = params.get("commands", [])
        timeout = params.get("timeout", COMMAND_TIMEOUT)
        
        if not module:
            logger.warning("Metasploit called without module parameter")
            return jsonify({
                "error": "Module parameter is required"
            }), 400
        
        # Create a more sophisticated MSF resource script
        resource_content = f"use {module}\n"
        
        # Set module options
        for key, value in options.items():
            resource_content += f"set {key} {value}\n"
        
        # Run the exploit
        resource_content += "exploit -z\n"  # -z means don't interact with the session
        
        # Additional post-exploit commands if provided
        for cmd in commands:
            resource_content += f"{cmd}\n"
        
        # Handle sessions more gracefully
        resource_content += "sessions -l\n"  # List all sessions
        resource_content += "sleep 3\n"      # Give it time to complete
        resource_content += "sessions -K\n"  # Kill all sessions before exit
        resource_content += "exit\n"         # Exit msfconsole
        
        # Save resource script to a temporary file with unique name
        resource_file = f"/tmp/mcp_msf_resource_{os.getpid()}_{int(time.time())}.rc"
        with open(resource_file, "w") as f:
            f.write(resource_content)
        
        logger.info(f"Created Metasploit resource script: {resource_file}")
        logger.info(f"Resource script content:\n{resource_content}")
        
        # Use expect script to handle the metasploit console automatically
        # This creates a command that will automatically handle interactive prompts
        expect_script = f"""
timeout {timeout} expect -c '
    set timeout {timeout}
    spawn msfconsole -q -r {resource_file}
    expect {{
        "exploit completed" {{
            sleep 5
            send "exit\\r"
            exp_continue
        }}
        "session 1 opened" {{
            sleep 5
            send "\\r"
            exp_continue
        }}
        "command shell session" {{
            sleep 3
            send "exit\\r"
            exp_continue
        }}
        "meterpreter >" {{
            send "exit\\r"
            exp_continue
        }}
        timeout {{
            puts "TIMEOUT OCCURRED"
            exit 1
        }}
        eof {{
            exit 0
        }}
    }}
'
"""
        
        # Execute the metasploit command with the expect script
        command = expect_script
        result = execute_command(command, interactive=True)
        
        # Clean up the temporary file
        try:
            os.remove(resource_file)
        except Exception as e:
            logger.warning(f"Error removing temporary resource file: {str(e)}")
        
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in metasploit endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/hydra", methods=["POST"])
def hydra():
    """Execute hydra with the provided parameters."""
    try:
        params = request.json
        target = params.get("target", "")
        service = params.get("service", "")
        username = params.get("username", "")
        username_file = params.get("username_file", "")
        password = params.get("password", "")
        password_file = params.get("password_file", "")
        additional_args = params.get("additional_args", "")
        
        if not target or not service:
            logger.warning("Hydra called without target or service parameter")
            return jsonify({
                "error": "Target and service parameters are required"
            }), 400
        
        if not (username or username_file) or not (password or password_file):
            logger.warning("Hydra called without username/password parameters")
            return jsonify({
                "error": "Username/username_file and password/password_file are required"
            }), 400
        
        command = "hydra -t 4"
        
        if username:
            command += f" -l {username}"
        elif username_file:
            command += f" -L {username_file}"
        
        if password:
            command += f" -p {password}"
        elif password_file:
            command += f" -P {password_file}"
        
        if additional_args:
            command += f" {additional_args}"
        
        command += f" {target} {service}"
        
        # Hydra might benefit from interactive mode for certain services
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in hydra endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/john", methods=["POST"])
def john():
    """Execute john with the provided parameters."""
    try:
        params = request.json
        hash_file = params.get("hash_file", "")
        wordlist = params.get("wordlist", "/usr/share/wordlists/rockyou.txt")
        format_type = params.get("format", "")
        additional_args = params.get("additional_args", "")
        
        if not hash_file:
            logger.warning("John called without hash_file parameter")
            return jsonify({
                "error": "Hash file parameter is required"
            }), 400
        
        command = "john"
        
        if format_type:
            command += f" --format={format_type}"
        
        if wordlist:
            command += f" --wordlist={wordlist}"
        
        if additional_args:
            command += f" {additional_args}"
        
        command += f" {hash_file}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in john endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/wpscan", methods=["POST"])
def wpscan():
    """Execute wpscan with the provided parameters."""
    try:
        params = request.json
        url = params.get("url", "")
        additional_args = params.get("additional_args", "")
        
        if not url:
            logger.warning("WPScan called without URL parameter")
            return jsonify({
                "error": "URL parameter is required"
            }), 400
        
        command = f"wpscan --url {url} --no-banner"
        
        if additional_args:
            command += f" {additional_args}"
        
        # WPScan needs interactive mode as it displays progress bars
        result = execute_command(command, interactive=True)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in wpscan endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500

@app.route("/api/tools/enum4linux", methods=["POST"])
def enum4linux():
    """Execute enum4linux with the provided parameters."""
    try:
        params = request.json
        target = params.get("target", "")
        additional_args = params.get("additional_args", "-a")
        
        if not target:
            logger.warning("Enum4linux called without target parameter")
            return jsonify({
                "error": "Target parameter is required"
            }), 400
        
        command = f"enum4linux {additional_args} {target}"
        
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in enum4linux endpoint: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({
            "error": f"Server error: {str(e)}"
        }), 500


# Health check endpoint
@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint."""
    # Check if essential tools are installed
    essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
    tools_status = {}
    
    for tool in essential_tools:
        try:
            result = execute_command(f"which {tool}")
            tools_status[tool] = result["success"]
        except Exception:
            tools_status[tool] = False
    
    all_essential_tools_available = all(tools_status.values())
    
    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })

def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Run the Kali Linux API Server")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    parser.add_argument("--port", type=int, default=API_PORT, help=f"Port for the API server (default: {API_PORT})")
    parser.add_argument("--timeout", type=int, default=COMMAND_TIMEOUT, 
                       help=f"Default command timeout in seconds (default: {COMMAND_TIMEOUT})")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    
    # Set configuration from command line arguments
    if args.debug:
        DEBUG_MODE = True
        os.environ["DEBUG_MODE"] = "1"
        logger.setLevel(logging.DEBUG)
    
    if args.port != API_PORT:
        API_PORT = args.port
    
    # Update default timeout if specified
    if args.timeout != COMMAND_TIMEOUT:
        COMMAND_TIMEOUT = args.timeout
        logger.info(f"Command timeout set to {COMMAND_TIMEOUT} seconds")
    
    # Check for essential dependencies
    try:
        dependencies = ["script", "expect"]
        for dep in dependencies:
            check = subprocess.run(["which", dep], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if check.returncode != 0:
                logger.warning(f"'{dep}' command not found, installing required dependencies...")
                install = subprocess.run(["apt-get", "update"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                install = subprocess.run(["apt-get", "install", "-y", "expect", "bsdutils"], 
                                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                if install.returncode != 0:
                    logger.error(f"Failed to install required dependencies. Some features may not work properly.")
                break
    except Exception as e:
        logger.error(f"Error checking dependencies: {str(e)}")
    
    logger.info(f"Starting Kali Linux Tools API Server on port {API_PORT}")
    app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE)
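
The handlers in kali_server.py build each command by string concatenation and run it through a shell, and the nmap handler itself notes that more sophisticated validation of the arguments would be better. As one possible direction, the sketch below assembles the nmap arguments as a list in the same order as the /api/tools/nmap handler, so the result could be passed to `subprocess.Popen` without `shell=True`. `build_nmap_argv` is a hypothetical helper, not part of the original script, and the defaults are copied from the handler.

```python
import shlex
from typing import List


def build_nmap_argv(target: str, scan_type: str = "-sCV",
                    ports: str = "", additional_args: str = "-T4 -Pn") -> List[str]:
    """Assemble an nmap argument list in the same order as the /api/tools/nmap handler."""
    if not target or target.startswith("-"):
        # Reject empty targets and targets that nmap would parse as options
        raise ValueError("target must be a non-empty host, IP, or range")
    argv = ["nmap"] + shlex.split(scan_type)
    if ports:
        argv += ["-p", ports]
    if additional_args:
        argv += shlex.split(additional_args)
    argv.append(target)
    return argv


if __name__ == "__main__":
    # A shell metacharacter stays inside one argument instead of starting a new command
    print(build_nmap_argv("10.0.0.5; id", ports="22,80"))
```

This only removes shell interpretation of the input. It does not restrict which nmap options the model may request, so an allow-list of options would still be a sensible addition.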

  1. Create mcp_server.py (it can be placed under /usr/local/src)
#!/usr/bin/env python3

# This script connects the MCP AI agent to the Kali Linux terminal and API server.

# some of the code here was inspired from https://github.com/whit3rabbit0/project_astro , be sure to check them out

import sys
import os
import argparse
import logging
from typing import Dict, Any, Optional, List, Union
import requests

from fastmcp import FastMCP

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Default configuration
DEFAULT_KALI_SERVER = "http://localhost:5000" # change to your linux IP
DEFAULT_REQUEST_TIMEOUT = 300  # 5 minutes default timeout for API requests

class KaliToolsClient:
    """Client for communicating with the Kali Linux Tools API Server"""
    
    def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT):
        """
        Initialize the Kali Tools Client
        
        Args:
            server_url: URL of the Kali Tools API Server
            timeout: Request timeout in seconds
        """
        self.server_url = server_url.rstrip("/")
        self.timeout = timeout
        logger.info(f"Initialized Kali Tools Client connecting to {server_url}")
        
    def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Perform a GET request with optional query parameters.
        
        Args:
            endpoint: API endpoint path (without leading slash)
            params: Optional query parameters
            
        Returns:
            Response data as dictionary
        """
        if params is None:
            params = {}

        url = f"{self.server_url}/{endpoint}"

        try:
            logger.debug(f"GET {url} with params: {params}")
            response = requests.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {str(e)}")
            return {"error": f"Request failed: {str(e)}", "success": False}
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return {"error": f"Unexpected error: {str(e)}", "success": False}

    def safe_post(self, endpoint: str, json_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform a POST request with JSON data.
        
        Args:
            endpoint: API endpoint path (without leading slash)
            json_data: JSON data to send
            
        Returns:
            Response data as dictionary
        """
        url = f"{self.server_url}/{endpoint}"
        
        try:
            logger.debug(f"POST {url} with data: {json_data}")
            response = requests.post(url, json=json_data, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {str(e)}")
            return {"error": f"Request failed: {str(e)}", "success": False}
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return {"error": f"Unexpected error: {str(e)}", "success": False}

    def execute_command(self, command: str, interactive: bool = False) -> Dict[str, Any]:
        """
        Execute a generic command on the Kali server
        
        Args:
            command: Command to execute
            interactive: Whether the command requires interactive mode
            
        Returns:
            Command execution results
        """
        return self.safe_post("api/command", {"command": command, "interactive": interactive})
    
    def execute_interactive_tool(self, tool: str, commands: Optional[List[str]] = None, timeout: Optional[int] = None) -> Dict[str, Any]:
        """
        Execute an interactive tool with a sequence of commands
        
        Args:
            tool: The tool to execute (e.g., msfconsole, sqlmap)
            commands: List of commands to execute in the interactive session
            timeout: Custom timeout for this interactive session
            
        Returns:
            Interactive session results
        """
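        # NOTE: `commands` is accepted but not forwarded; only the tool
        # command line (and the optional timeout) is sent to the server.
        data = {
            "command": tool,
            "interactive": True
        }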
        
        if timeout is not None:
            data["timeout"] = timeout
            
        return self.safe_post("api/command", data)
    
    def check_health(self) -> Dict[str, Any]:
        """
        Check the health of the Kali Tools API Server
        
        Returns:
            Health status information
        """
        return self.safe_get("health")

def setup_mcp_server(kali_client: KaliToolsClient) -> FastMCP:
    """
    Set up the MCP server with all tool functions
    
    Args:
        kali_client: Initialized KaliToolsClient
        
    Returns:
        Configured FastMCP instance
    """
    mcp = FastMCP("kali-mcp")
    
    @mcp.tool()
    def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]:
        """
        Execute an Nmap scan against a target.
        
        Args:
            target: The IP address or hostname to scan
            scan_type: Scan type (e.g., -sV for version detection)
            ports: Comma-separated list of ports or port ranges
            additional_args: Additional Nmap arguments
            
        Returns:
            Scan results
        """
        data = {
            "target": target,
            "scan_type": scan_type,
            "ports": ports,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/nmap", data)

    @mcp.tool()
    def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
        """
        Execute Gobuster to find directories, DNS subdomains, or virtual hosts.
        
        Args:
            url: The target URL
            mode: Scan mode (dir, dns, fuzz, vhost)
            wordlist: Path to wordlist file
            additional_args: Additional Gobuster arguments
            
        Returns:
            Scan results
        """
        data = {
            "url": url,
            "mode": mode,
            "wordlist": wordlist,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/gobuster", data)

    @mcp.tool()
    def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]:
        """
        Execute Dirb web content scanner.
        
        Args:
            url: The target URL
            wordlist: Path to wordlist file
            additional_args: Additional Dirb arguments
            
        Returns:
            Scan results
        """
        data = {
            "url": url,
            "wordlist": wordlist,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/dirb", data)

    @mcp.tool()
    def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]:
        """
        Execute Nikto web server scanner.
        
        Args:
            target: The target URL or IP
            additional_args: Additional Nikto arguments
            
        Returns:
            Scan results
        """
        data = {
            "target": target,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/nikto", data)

    @mcp.tool()
    def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]:
        """
        Execute SQLmap SQL injection scanner.
        
        Args:
            url: The target URL
            data: POST data string
            additional_args: Additional SQLmap arguments
            
        Returns:
            Scan results
        """
        post_data = {
            "url": url,
            "data": data,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/sqlmap", post_data)

   
    @mcp.tool()
    def msfconsole_interactive(resource_script: str = None, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
        """
        Start an interactive Metasploit console session.
        
        Args:
            resource_script: Optional path to MSF resource script
            commands: List of commands to execute in MSF console
            timeout: Custom timeout for this MSF console session
            
        Returns:
            MSF console session results
        """
        base_command = "msfconsole -q"
        
        if resource_script:
            if not os.path.exists(resource_script):
                return {
                    "error": f"Resource script file not found: {resource_script}",
                    "success": False
                }
            base_command += f" -r {resource_script}"
        
        # Create a temporary resource script if commands are provided
        temp_resource_path = None
        if commands and not resource_script:
            try:
                import tempfile
                # delete=False keeps the file on disk after the handle is closed
                with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.rc') as temp_resource_file:
                    for cmd in commands:
                        temp_resource_file.write(f"{cmd}\n")
                    temp_resource_path = temp_resource_file.name
                base_command += f" -r {temp_resource_path}"
            except Exception as e:
                logger.error(f"Error creating temporary resource file: {str(e)}")
                return {
                    "error": f"Failed to create resource script: {str(e)}",
                    "success": False
                }
        
        try:
            # NOTE: the `timeout` argument is currently unused here
            return kali_client.execute_command(base_command, interactive=True)
        except Exception as e:
            logger.error(f"Error in msfconsole interactive: {str(e)}")
            return {
                "error": f"MSF console execution failed: {str(e)}",
                "success": False
            }
        finally:
            # Clean up the temporary file whether the call succeeded or failed
            if temp_resource_path and os.path.exists(temp_resource_path):
                try:
                    os.unlink(temp_resource_path)
                except OSError:
                    pass

    @mcp.tool()
    def hydra_attack(
        target: str, 
        service: str, 
        username: str = "", 
        username_file: str = "", 
        password: str = "", 
        password_file: str = "", 
        additional_args: str = ""
    ) -> Dict[str, Any]:
        """
        Execute Hydra password cracking tool.
        
        Args:
            target: Target IP or hostname
            service: Service to attack (ssh, ftp, http-post-form, etc.)
            username: Single username to try
            username_file: Path to username file
            password: Single password to try
            password_file: Path to password file
            additional_args: Additional Hydra arguments
            
        Returns:
            Attack results
        """
        data = {
            "target": target,
            "service": service,
            "username": username,
            "username_file": username_file,
            "password": password,
            "password_file": password_file,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/hydra", data)

    @mcp.tool()
    def john_crack(
        hash_file: str, 
        wordlist: str = "/usr/share/wordlists/rockyou.txt", 
        format_type: str = "", 
        additional_args: str = ""
    ) -> Dict[str, Any]:
        """
        Execute John the Ripper password cracker.
        
        Args:
            hash_file: Path to file containing hashes
            wordlist: Path to wordlist file
            format_type: Hash format type
            additional_args: Additional John arguments
            
        Returns:
            Cracking results
        """
        data = {
            "hash_file": hash_file,
            "wordlist": wordlist,
            "format": format_type,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/john", data)

    @mcp.tool()
    def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]:
        """
        Execute WPScan WordPress vulnerability scanner.
        
        Args:
            url: The target WordPress URL
            additional_args: Additional WPScan arguments
            
        Returns:
            Scan results
        """
        data = {
            "url": url,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/wpscan", data)

    @mcp.tool()
    def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]:
        """
        Execute Enum4linux Windows/Samba enumeration tool.
        
        Args:
            target: The target IP or hostname
            additional_args: Additional enum4linux arguments
            
        Returns:
            Enumeration results
        """
        data = {
            "target": target,
            "additional_args": additional_args
        }
        return kali_client.safe_post("api/tools/enum4linux", data)

    @mcp.tool()
    def server_health() -> Dict[str, Any]:
        """
        Check the health status of the Kali API server.
        
        Returns:
            Server health information
        """
        return kali_client.check_health()
    
    @mcp.tool()
    def execute_command(command: str, interactive: bool = False) -> Dict[str, Any]:
        """
        Execute an arbitrary command on the Kali server.
        
        Args:
            command: The command to execute
            interactive: Whether the command requires interactive mode
            
        Returns:
            Command execution results
        """
        return kali_client.execute_command(command, interactive)
    
    @mcp.tool()
    def run_interactive(tool: str, commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
        """
        Run a tool in interactive mode.
        
        Args:
            tool: The tool to run (e.g., msfconsole, sqlmap, hydra)
            commands: List of commands to send to the interactive session
            timeout: Custom timeout for this interactive session
            
        Returns:
            Interactive session results
        """
        # Special handling for known interactive tools
        if tool.startswith("msfconsole"):
            return msfconsole_interactive(commands=commands, timeout=timeout)
        elif tool.startswith("sqlmap"):
            # Add --batch to make it non-interactive but still use the interactive executor
            if "--batch" not in tool:
                tool += " --batch"
        
        return kali_client.execute_interactive_tool(tool, commands, timeout)
    
    @mcp.tool()
    def sql_interactive(url: str, data: str = "", commands: List[str] = None, timeout: int = None) -> Dict[str, Any]:
        """
        Run SQLMap in interactive mode with more options.
        
        Args:
            url: Target URL
            data: Optional POST data
            commands: Additional SQLMap commands/options
            timeout: Custom timeout for this session
            
        Returns:
            SQLMap session results
        """
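        import shlex

        # Quote model-supplied values so they cannot break out of the command line
        command = f"sqlmap -u {shlex.quote(url)} --batch"
        
        if data:
            command += f" --data={shlex.quote(data)}"
            
        # NOTE: `commands` and `timeout` are currently unused by this tool
        return kali_client.execute_command(command, interactive=True)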
    
    @mcp.tool()
    def create_metasploit_resource(commands: List[str], output_file: str) -> Dict[str, Any]:
        """
        Create a Metasploit resource script.
        
        Args:
            commands: List of MSF commands to include in the script
            output_file: Path to save the resource script
            
        Returns:
            Result of script creation
        """
        try:
            with open(output_file, 'w') as f:
                for cmd in commands:
                    f.write(f"{cmd}\n")
            
            return {
                "success": True,
                "message": f"Created Metasploit resource script at {output_file}",
                "file": output_file
            }
        except Exception as e:
            logger.error(f"Error creating resource script: {str(e)}")
            return {
                "success": False,
                "error": f"Failed to create resource script: {str(e)}"
            }

    return mcp

def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Run the Kali MCP Client")
    parser.add_argument("--server", type=str, default=DEFAULT_KALI_SERVER, 
                      help=f"Kali API server URL (default: {DEFAULT_KALI_SERVER})")
    parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
                      help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    return parser.parse_args()

def main():
    """Main entry point for the MCP server."""
    args = parse_args()
    
    # Configure logging based on debug flag
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug logging enabled")
    
    # Initialize the Kali Tools client
    kali_client = KaliToolsClient(args.server, args.timeout)
    
    # Check server health and log the result
    health = kali_client.check_health()
    if "error" in health:
        logger.warning(f"Unable to connect to Kali API server at {args.server}: {health['error']}")
        logger.warning("MCP server will start, but tool execution may fail")
    else:
        logger.info(f"Successfully connected to Kali API server at {args.server}")
        logger.info(f"Server health status: {health.get('status', 'unknown')}")
        if not health.get("all_essential_tools_available", False):
            logger.warning("Not all essential tools are available on the Kali server")
            missing_tools = [tool for tool, available in health.get("tools_status", {}).items() if not available]
            if missing_tools:
                logger.warning(f"Missing tools: {', '.join(missing_tools)}")
    
    # Set up and run the MCP server
    mcp = setup_mcp_server(kali_client)
    logger.info("Starting Kali MCP server")
    # Serve MCP over streamable HTTP on all interfaces, port 3000, path /mcp
    mcp.run(transport="streamable-http", host="0.0.0.0", port=3000, path="/mcp", log_level="debug")

if __name__ == "__main__":
    main()
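
Several tools in the listing assemble a shell command string from arguments supplied by the model (sql_interactive, for example, builds a sqlmap command line from url and data). The following is a minimal sketch of building such a string with every argument quoted, assuming the Kali API server hands the string to a POSIX shell. build_sqlmap_command is a hypothetical helper used only for illustration and is not part of the original code.

```python
import shlex
from typing import List, Optional


def build_sqlmap_command(url: str, data: str = "",
                         extra_args: Optional[List[str]] = None) -> str:
    """Build a sqlmap command line with every supplied value shell-quoted.

    Hypothetical helper: the name and the extra_args parameter are
    assumptions made for this sketch.
    """
    parts = ["sqlmap", "-u", url, "--batch"]
    if data:
        parts.append(f"--data={data}")
    if extra_args:
        parts.extend(extra_args)
    # shlex.join quotes each element, so spaces, ';' or '&' inside a value
    # stay part of that value instead of splitting the command
    return shlex.join(parts)


# A URL carrying a shell metacharacter is kept as one argument
print(build_sqlmap_command("http://10.0.0.5/item.php?id=1; id", data="a=1&b=2"))
```

Splitting the result back with shlex.split returns the original argument list, which is a quick way to confirm that nothing leaked out of its argument.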
  1. Start both services. Each command blocks, so run them in separate terminals:
    python kali_server.py
    python mcp_server.py --server http://localhost:5000 --debug
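
Because mcp_server.py checks the API server's health endpoint at startup and only logs a warning when it is unreachable, it can help to confirm that kali_server.py is answering before launching the MCP server. The sketch below polls the health endpoint with the standard library. The function names, retry count, and delay are assumptions for illustration; only the /health path and the default http://localhost:5000 address come from the code above.

```python
import json
import time
import urllib.request
from typing import Callable


def fetch_health(base_url: str, timeout: float = 3.0) -> dict:
    """GET <base_url>/health and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_until_healthy(base_url: str, attempts: int = 10, delay: float = 1.0,
                       fetch: Callable[[str], dict] = fetch_health) -> bool:
    """Return True as soon as the health endpoint answers, False after all attempts fail."""
    for _ in range(attempts):
        try:
            fetch(base_url)
            return True
        except (OSError, ValueError):
            # OSError covers refused connections and timeouts (URLError is a subclass);
            # ValueError covers a body that is not valid JSON
            time.sleep(delay)
    return False
```

Calling wait_until_healthy("http://localhost:5000") on the Kali machine before the second start command gives a simple yes/no answer about whether the Flask API is up.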
    

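Local environment setup and usage

  1. Install Cherry Studio on the local machine and configure the model API key. Also enable the model's tool-calling capability and add the MCP server configuration (streamable HTTP, pointing at port 3000 and path /mcp on the Kali machine, as set in mcp.run).
    [Screenshots: Cherry Studio model and MCP server settings]
  2. Create a new conversation, choose the "Cybersecurity Expert" assistant, and enable the MCP server for it.
    [Screenshot: conversation with the MCP server enabled]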
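
If the client cannot see the MCP server, one way to separate network problems from client configuration problems is to send a bare MCP initialize request to the endpoint from the local machine. The sketch below builds such a request with the standard library. KALI_HOST is a hypothetical address, the clientInfo values and the protocolVersion string are assumptions, and the port and path are the ones passed to mcp.run in mcp_server.py.

```python
import json
import urllib.request

KALI_HOST = "192.168.1.100"  # hypothetical address of the Kali machine
MCP_URL = f"http://{KALI_HOST}:3000/mcp"  # port and path from mcp.run in mcp_server.py


def build_initialize_request(url: str = MCP_URL) -> urllib.request.Request:
    """Build (but do not send) an MCP `initialize` request as an HTTP POST."""
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",  # assumed protocol revision
            "capabilities": {},
            "clientInfo": {"name": "reachability-check", "version": "0.1"},
        },
    }
    headers = {
        "Content-Type": "application/json",
        # Streamable HTTP servers may answer with plain JSON or an SSE stream
        "Accept": "application/json, text/event-stream",
    }
    return urllib.request.Request(
        url, data=json.dumps(payload).encode("utf-8"), headers=headers, method="POST"
    )


def check_endpoint(url: str = MCP_URL, timeout: float = 5.0) -> int:
    """Send the request and return the HTTP status code.

    Raises urllib.error.URLError on connection failure or an HTTP error status.
    """
    with urllib.request.urlopen(build_initialize_request(url), timeout=timeout) as resp:
        return resp.status
```

A successful status from check_endpoint() suggests the endpoint is reachable, in which case any remaining problem is more likely in the client's MCP server settings than in the network.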

Test results

Kali tool-call results

[Screenshots: tool calls against the Kali server]

HFish honeypot statistics

[Screenshots: HFish access statistics]
