首页
社区
课程
招聘
[原创]一种通用的将Linux命令进行MCP封装的方法
发表于: 2025-12-1 00:52 292

[原创]一种通用的将Linux命令进行MCP封装的方法

2025-12-1 00:52
292

generic_linux_mcp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""
通用 Linux 命令 MCP 服务 (Generic Linux Command MCP)
 
核心功能:
1. 启动任意 Linux 命令 (spawn)
2. 智能读取输出 (Smart Read):结合提示符匹配 + 连续静默检测
3. 发送输入 (Input)
4. 发送信号 (Signals: Ctrl+C, Ctrl+Z)
 
解决了"不知道程序何时输出结束"的难题。
"""
 
import sys
import os
import time
import signal
import pexpect
from fastmcp import FastMCP
from typing import Optional, List, Union
 
# 初始化 MCP 服务
mcp = FastMCP("Generic-Linux-Command-MCP")
 
class ProcessManager:
    def __init__(self):
        self.child: Optional[pexpect.spawn] = None
        self.command: str = ""
        # 智能读取配置
        self.prompts: List[str] = []
        self.read_timeout: float = 1.0        # 单次读取超时
        self.stable_timeouts: int = 2         # 需要连续多少次超时才认为输出结束
        self.max_wait: int = 30               # 最大总等待时间
 
    def start(self, command: str, prompts: List[str] = None, cwd: str = None, env: dict = None):
        """启动进程"""
        if self.child and self.child.isalive():
            return "Error: Process is already running. Please stop it first."
 
        self.command = command
        self.prompts = prompts or []
         
        try:
            # 启动进程
            # 尝试使用 spawn (需要 PTY),如果失败则尝试 PopenSpawn (管道)
            try:
                # 优先尝试 PTY,支持真正的交互性
                self.child = pexpect.spawn(
                    command,
                    encoding='utf-8',
                    cwd=cwd,
                    env=env,
                    timeout=self.read_timeout
                    # preexec_fn=os.setsid # 已移除以兼容 Docker
                )
            except Exception as spawn_err:
                # 如果 spawn 失败(例如 Windows, WSL 特殊环境, 受限 Docker),尝试 PopenSpawn
                if sys.platform == 'win32' or "Operation not permitted" in str(spawn_err) or "ImportError" in str(spawn_err):
                    from pexpect.popen_spawn import PopenSpawn
                    self.child = PopenSpawn(
                        command,
                        encoding='utf-8',
                        cwd=cwd,
                        env=env,
                        timeout=self.read_timeout
                    )
                else:
                    raise spawn_err
             
            # 初始读取(获取启动信息)
            return self._smart_read(timeout_override=2.0)
             
        except Exception as e:
            return f"Failed to start command '{command}': {str(e)}"
 
    def write(self, input_text: str, wait_output: bool = True) -> str:
        """发送输入并获取输出"""
        if not self.is_alive():
            return "Error: Process not running."
 
        try:
            # 发送输入(自动添加换行)
            self.child.sendline(input_text)
        except Exception as e:
             return f"Error sending input: {str(e)}"
         
        if wait_output:
            return self._smart_read()
        return "Input sent."
 
    def send_control(self, char: str) -> str:
        """发送控制字符 (如 'c' 对应 Ctrl+C)"""
        if not self.is_alive():
            return "Error: Process not running."
         
        try:
            # 检查是否有 sendcontrol 方法 (PopenSpawn 没有)
            if hasattr(self.child, 'sendcontrol'):
                self.child.sendcontrol(char)
                return f"Sent Ctrl+{char.upper()}.\n" + self._smart_read()
            else:
                # 自动降级映射: 如果是 PopenSpawn (PIPE模式),尝试用信号模拟
                char = char.lower()
                if char == 'c':
                    return self.send_signal("SIGINT")
                elif char == 'z':
                    # PIPE 模式下 SIGTSTP 可能无效,但可以尝试
                    return self.send_signal("SIGTSTP")
                elif char == '\\':
                    return self.send_signal("SIGQUIT")
                elif char == 'd':
                    # PIPE 模式下 Ctrl+D 意味着关闭 stdin
                    self.child.sendeof()
                    return "Sent EOF (Ctrl+D)."
                else:
                    return f"Error: Current process mode (PIPE) does not support keyboard shortcuts like Ctrl+{char}. Please use 'send_signal' instead."
        except Exception as e:
            return f"Error sending control char: {e}"
 
    def send_signal(self, sig_name: str) -> str:
        """发送系统信号 (SIGINT, SIGTERM, etc.)"""
        if not self.is_alive():
            return "Error: Process not running."
         
        # 映射常用信号
        sig_map = {
            "SIGINT": signal.SIGINT,   # Ctrl+C
            "SIGTERM": signal.SIGTERM, # 终止
            "SIGKILL": signal.SIGKILL, # 强制杀死
            "SIGSTOP": signal.SIGSTOP, # 暂停
            "SIGCONT": signal.SIGCONT, # 继续
            "SIGTSTP": signal.SIGTSTP, # Ctrl+Z (终端暂停)
        }
         
        sig = sig_map.get(sig_name.upper())
        if not sig:
            return f"Error: Unknown signal '{sig_name}'. Supported: {list(sig_map.keys())}"
             
        try:
            # 注意:这里只给子进程发送信号,如果需要给进程组发送,可能需要 os.killpg
            self.child.kill(sig)
            return f"Sent signal {sig_name}.\n" + self._smart_read()
        except Exception as e:
            return f"Error sending signal: {e}"
 
    def stop(self):
        """停止进程"""
        if self.child:
            # 尝试优雅关闭
            self.child.close(force=False)
            if self.child.isalive():
                self.child.close(force=True)
            self.child = None
        return "Process stopped."
 
    def is_alive(self) -> bool:
        return self.child is not None and self.child.isalive()
 
    def _smart_read(self, timeout_override: float = None) -> str:
        """
        智能读取输出策略:
        1. 如果定义了 prompts,优先尝试匹配提示符(响应最快)。
        2. 如果匹配失败或无 prompts,切换到【连续超时检测】(Stable Silence Detection)。
        """
        if not self.is_alive():
            return ""
 
        output_buffer = ""
        current_timeout = timeout_override or self.read_timeout
         
        # 策略 1: 提示符匹配 (Fast Path)
        if self.prompts:
            try:
                # 尝试匹配提示符
                self.child.expect(self.prompts, timeout=current_timeout * 2)
                if self.child.before:
                    output_buffer += self.child.before
                return output_buffer + "\n[Prompt Detected]"
            except pexpect.TIMEOUT:
                # 提示符匹配超时,获取已读内容,进入策略 2
                if self.child.before:
                    output_buffer += self.child.before
                pass
            except pexpect.EOF:
                if self.child.before:
                    output_buffer += self.child.before
                return output_buffer + "\n[Process Finished]"
            except Exception as e:
                return f"Error during prompt expect: {str(e)}"
 
        # 策略 2: 连续超时检测 (Fallback / Default Path)
        consecutive_timeouts = 0
        start_time = time.time()
         
        while True:
            try:
                # 等待任何输出 (.+ 表示至少一个字符)
                self.child.expect([r'.+', pexpect.EOF], timeout=self.read_timeout)
                 
                # 1. 处理数据
                # 注意:对于 expect(r'.+'),匹配到的内容在 child.after 中(因为正则匹配了它)
                # child.before 是匹配之前的内容(通常为空,除非缓冲区有不匹配的前缀)
                # 但由于我们匹配的是 "任意字符",pexpect 可能会把一整块 buffer 切分。
                # 为了稳健,我们将 before 和 after 都拼起来。
                if self.child.before:
                    output_buffer += self.child.before
                if self.child.after and self.child.after != pexpect.EOF:
                    output_buffer += self.child.after
 
                # 2. 收到数据,重置超时计数
                consecutive_timeouts = 0
                 
                # 3. 检查 EOF
                if self.child.after == pexpect.EOF:
                    output_buffer += "\n[Process Finished]"
                    break
                     
            except pexpect.TIMEOUT:
                consecutive_timeouts += 1
                 
                # 4. 超时处理:检查是否有残留的 before 数据
                # 即使超时,buffer 里也可能有一部分数据没凑够 pattern(虽然 .+ 只要一个字符)
                if self.child.before:
                    output_buffer += self.child.before
                    # 清空 before 防止下次循环重复(pexpect 内部机制通常会在下一次 expect 前处理,但这里为了逻辑清晰)
                 
                # 5. 判断退出条件
                if consecutive_timeouts >= self.stable_timeouts:
                    break
                 
                if (time.time() - start_time) > self.max_wait:
                    output_buffer += "\n[Max Wait Time Reached]"
                    break
                     
                continue
            except Exception as e:
                output_buffer += f"\n[Error reading output: {str(e)}]"
                break
                 
        return output_buffer
 
# 全局进程管理器实例
manager = ProcessManager()
 
# --- MCP Tools ---
 
@mcp.tool()
def start_process(command: str, prompts: List[str] = []) -> str:
    """
    启动一个新的 Linux 命令进程。
     
    Args:
        command: 要执行的命令 (例如 "gdb ./main", "python3", "nc -l 4444")
        prompts: (可选) 程序的提示符列表,有助于更快地检测输出结束。
                 例如 GDB 使用 ["(gdb)"], Python 使用 [">>>", "..."]。默认为空列表。
    """
    return manager.start(command, prompts)
 
@mcp.tool()
def send_command(command: str) -> str:
    """
    向当前运行的进程发送命令(输入),并等待并返回输出结果。
     
    注意:此工具不仅发送输入,还会智能等待并返回程序的响应输出。
    """
    return manager.write(command)
 
@mcp.tool()
def send_signal(signal_name: str) -> str:
    """
    向进程发送系统信号 (SIGINT, SIGTERM 等)。
    """
    return manager.send_signal(signal_name)
 
@mcp.tool()
def send_keyboard_shortcut(key: str) -> str:
    """
    发送键盘快捷键 (Control + Key) 到当前进程。
     
    这模拟了用户在终端按住 Ctrl 键的同时按下另一个键。
     
    Args:
        key: 要与 Ctrl 组合的单个字符。
             常用示例:
             - "c": 对应 Ctrl+C (中断/SIGINT)。用于停止正在运行的命令,如 ping。
             - "z": 对应 Ctrl+Z (挂起/SIGTSTP)。
             - "d": 对应 Ctrl+D (EOF)。用于告诉程序输入结束,如退出 Python shell。
             - "\\": 对应 Ctrl+\\ (退出/SIGQUIT)。
    """
    return manager.send_control(key)
 
@mcp.tool()
def read_current_output() -> str:
    """
    读取当前缓冲区中的输出。
    """
    return manager._smart_read(timeout_override=0.5)
 
@mcp.tool()
def stop_process() -> str:
    """
    停止当前运行的进程。
    """
    return manager.stop()
 
if __name__ == "__main__":
    mcp.run(transport="sse")

软件依赖

1
2
3
4
5
6
7
8
root@Jtian:/app# python -V
Python 3.12.3
root@Jtian:/app# pip list |grep fastmcp
fastmcp                   1.0
root@Jtian:/app#
root@Jtian:/app# pip list |grep pexpect
pexpect                   4.9.0
root@Jtian:/app#

参考

https://bbs.kanxue.com/thread-289218.htm


传播安全知识、拓宽行业人脉——看雪讲师团队等你加入!

最后于 2025-12-1 00:55 被Jtian编辑 ,原因:
收藏
免费 0
支持
分享
最新回复 (0)
游客
登录 | 注册 方可回帖
返回