-
-
[原创]一种通用的将Linux命令进行MCP封装的方法
-
发表于: 2025-12-1 00:52 291
-
generic_linux_mcp.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 | """通用 Linux 命令 MCP 服务 (Generic Linux Command MCP)核心功能:1. 启动任意 Linux 命令 (spawn)2. 智能读取输出 (Smart Read):结合提示符匹配 + 连续静默检测3. 发送输入 (Input)4. 发送信号 (Signals: Ctrl+C, Ctrl+Z)解决了"不知道程序何时输出结束"的难题。"""import sysimport osimport timeimport signalimport pexpectfrom fastmcp import FastMCPfrom typing import Optional, List, Union# 初始化 MCP 服务mcp = FastMCP("Generic-Linux-Command-MCP")class ProcessManager: def __init__(self): self.child: Optional[pexpect.spawn] = None self.command: str = "" # 智能读取配置 self.prompts: List[str] = [] self.read_timeout: float = 1.0 # 单次读取超时 self.stable_timeouts: int = 2 # 需要连续多少次超时才认为输出结束 self.max_wait: int = 30 # 最大总等待时间 def start(self, command: str, prompts: List[str] = None, cwd: str = None, env: dict = None): """启动进程""" if self.child and self.child.isalive(): return "Error: Process is already running. Please stop it first." self.command = command self.prompts = prompts or [] try: # 启动进程 # 尝试使用 spawn (需要 PTY),如果失败则尝试 PopenSpawn (管道) try: # 优先尝试 PTY,支持真正的交互性 self.child = pexpect.spawn( command, encoding='utf-8', cwd=cwd, env=env, timeout=self.read_timeout # preexec_fn=os.setsid # 已移除以兼容 Docker ) except Exception as spawn_err: # 如果 spawn 失败(例如 Windows, WSL 特殊环境, 受限 Docker),尝试 PopenSpawn if sys.platform == 'win32' or "Operation not permitted" in str(spawn_err) or "ImportError" in str(spawn_err): from pexpect.popen_spawn import PopenSpawn self.child = PopenSpawn( command, encoding='utf-8', cwd=cwd, env=env, timeout=self.read_timeout ) else: raise spawn_err # 初始读取(获取启动信息) return self._smart_read(timeout_override=2.0) except Exception as e: return f"Failed to start command '{command}': {str(e)}" def write(self, input_text: str, wait_output: bool = True) -> str: """发送输入并获取输出""" if not self.is_alive(): return "Error: Process not running." try: # 发送输入(自动添加换行) self.child.sendline(input_text) except Exception as e: return f"Error sending input: {str(e)}" if wait_output: return self._smart_read() return "Input sent." def send_control(self, char: str) -> str: """发送控制字符 (如 'c' 对应 Ctrl+C)""" if not self.is_alive(): return "Error: Process not running." try: # 检查是否有 sendcontrol 方法 (PopenSpawn 没有) if hasattr(self.child, 'sendcontrol'): self.child.sendcontrol(char) return f"Sent Ctrl+{char.upper()}.\n" + self._smart_read() else: # 自动降级映射: 如果是 PopenSpawn (PIPE模式),尝试用信号模拟 char = char.lower() if char == 'c': return self.send_signal("SIGINT") elif char == 'z': # PIPE 模式下 SIGTSTP 可能无效,但可以尝试 return self.send_signal("SIGTSTP") elif char == '\\': return self.send_signal("SIGQUIT") elif char == 'd': # PIPE 模式下 Ctrl+D 意味着关闭 stdin self.child.sendeof() return "Sent EOF (Ctrl+D)." else: return f"Error: Current process mode (PIPE) does not support keyboard shortcuts like Ctrl+{char}. Please use 'send_signal' instead." except Exception as e: return f"Error sending control char: {e}" def send_signal(self, sig_name: str) -> str: """发送系统信号 (SIGINT, SIGTERM, etc.)""" if not self.is_alive(): return "Error: Process not running." # 映射常用信号 sig_map = { "SIGINT": signal.SIGINT, # Ctrl+C "SIGTERM": signal.SIGTERM, # 终止 "SIGKILL": signal.SIGKILL, # 强制杀死 "SIGSTOP": signal.SIGSTOP, # 暂停 "SIGCONT": signal.SIGCONT, # 继续 "SIGTSTP": signal.SIGTSTP, # Ctrl+Z (终端暂停) } sig = sig_map.get(sig_name.upper()) if not sig: return f"Error: Unknown signal '{sig_name}'. Supported: {list(sig_map.keys())}" try: # 注意:这里只给子进程发送信号,如果需要给进程组发送,可能需要 os.killpg self.child.kill(sig) return f"Sent signal {sig_name}.\n" + self._smart_read() except Exception as e: return f"Error sending signal: {e}" def stop(self): """停止进程""" if self.child: # 尝试优雅关闭 self.child.close(force=False) if self.child.isalive(): self.child.close(force=True) self.child = None return "Process stopped." def is_alive(self) -> bool: return self.child is not None and self.child.isalive() def _smart_read(self, timeout_override: float = None) -> str: """ 智能读取输出策略: 1. 如果定义了 prompts,优先尝试匹配提示符(响应最快)。 2. 如果匹配失败或无 prompts,切换到【连续超时检测】(Stable Silence Detection)。 """ if not self.is_alive(): return "" output_buffer = "" current_timeout = timeout_override or self.read_timeout # 策略 1: 提示符匹配 (Fast Path) if self.prompts: try: # 尝试匹配提示符 self.child.expect(self.prompts, timeout=current_timeout * 2) if self.child.before: output_buffer += self.child.before return output_buffer + "\n[Prompt Detected]" except pexpect.TIMEOUT: # 提示符匹配超时,获取已读内容,进入策略 2 if self.child.before: output_buffer += self.child.before pass except pexpect.EOF: if self.child.before: output_buffer += self.child.before return output_buffer + "\n[Process Finished]" except Exception as e: return f"Error during prompt expect: {str(e)}" # 策略 2: 连续超时检测 (Fallback / Default Path) consecutive_timeouts = 0 start_time = time.time() while True: try: # 等待任何输出 (.+ 表示至少一个字符) self.child.expect([r'.+', pexpect.EOF], timeout=self.read_timeout) # 1. 处理数据 # 注意:对于 expect(r'.+'),匹配到的内容在 child.after 中(因为正则匹配了它) # child.before 是匹配之前的内容(通常为空,除非缓冲区有不匹配的前缀) # 但由于我们匹配的是 "任意字符",pexpect 可能会把一整块 buffer 切分。 # 为了稳健,我们将 before 和 after 都拼起来。 if self.child.before: output_buffer += self.child.before if self.child.after and self.child.after != pexpect.EOF: output_buffer += self.child.after # 2. 收到数据,重置超时计数 consecutive_timeouts = 0 # 3. 检查 EOF if self.child.after == pexpect.EOF: output_buffer += "\n[Process Finished]" break except pexpect.TIMEOUT: consecutive_timeouts += 1 # 4. 超时处理:检查是否有残留的 before 数据 # 即使超时,buffer 里也可能有一部分数据没凑够 pattern(虽然 .+ 只要一个字符) if self.child.before: output_buffer += self.child.before # 清空 before 防止下次循环重复(pexpect 内部机制通常会在下一次 expect 前处理,但这里为了逻辑清晰) # 5. 判断退出条件 if consecutive_timeouts >= self.stable_timeouts: break if (time.time() - start_time) > self.max_wait: output_buffer += "\n[Max Wait Time Reached]" break continue except Exception as e: output_buffer += f"\n[Error reading output: {str(e)}]" break return output_buffer# 全局进程管理器实例manager = ProcessManager()# --- MCP Tools ---@mcp.tool()def start_process(command: str, prompts: List[str] = []) -> str: """ 启动一个新的 Linux 命令进程。 Args: command: 要执行的命令 (例如 "gdb ./main", "python3", "nc -l 4444") prompts: (可选) 程序的提示符列表,有助于更快地检测输出结束。 例如 GDB 使用 ["(gdb)"], Python 使用 [">>>", "..."]。默认为空列表。 """ return manager.start(command, prompts)@mcp.tool()def send_command(command: str) -> str: """ 向当前运行的进程发送命令(输入),并等待并返回输出结果。 注意:此工具不仅发送输入,还会智能等待并返回程序的响应输出。 """ return manager.write(command)@mcp.tool()def send_signal(signal_name: str) -> str: """ 向进程发送系统信号 (SIGINT, SIGTERM 等)。 """ return manager.send_signal(signal_name)@mcp.tool()def send_keyboard_shortcut(key: str) -> str: """ 发送键盘快捷键 (Control + Key) 到当前进程。 这模拟了用户在终端按住 Ctrl 键的同时按下另一个键。 Args: key: 要与 Ctrl 组合的单个字符。 常用示例: - "c": 对应 Ctrl+C (中断/SIGINT)。用于停止正在运行的命令,如 ping。 - "z": 对应 Ctrl+Z (挂起/SIGTSTP)。 - "d": 对应 Ctrl+D (EOF)。用于告诉程序输入结束,如退出 Python shell。 - "\\": 对应 Ctrl+\\ (退出/SIGQUIT)。 """ return manager.send_control(key)@mcp.tool()def read_current_output() -> str: """ 读取当前缓冲区中的输出。 """ return manager._smart_read(timeout_override=0.5)@mcp.tool()def stop_process() -> str: """ 停止当前运行的进程。 """ return manager.stop()if __name__ == "__main__": mcp.run(transport="sse") |
软件依赖
1 2 3 4 5 6 7 8 | root@Jtian:/app# python -VPython 3.12.3root@Jtian:/app# pip list |grep fastmcpfastmcp 1.0root@Jtian:/app#root@Jtian:/app# pip list |grep pexpectpexpect 4.9.0root@Jtian:/app# |
参考
https://bbs.kanxue.com/thread-289218.htm
最后于 2025-12-1 00:55
被Jtian编辑
,原因:
赞赏
他的文章
赞赏
雪币:
留言: