首页
社区
课程
招聘
[原创]像pwntools一样操作python socket,而不要安装依赖
发表于: 2025-7-10 21:03 279

[原创]像pwntools一样操作python socket,而不要安装依赖

2025-7-10 21:03
279

工具描述

一个简化的类pwntools工具,只有一个简单的py脚本,用来操作网络请求,适用于一些不太方便安装pwntools的离线场景。因为刻意地保留了pwntools风格,所以也比较方便后续代码的移植和分享。

minipwn.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
import socket
import struct
import sys
import select
import os
import threading
 
# ================================================================= #
#                Packing and Unpacking functions                    #
# ================================================================= #
 
def _get_endian_prefix(endian):
    """Translate an endianness name into the matching ``struct`` prefix.

    Accepts 'little'/'le' or 'big'/'be' in any letter case and returns
    '<' or '>' respectively. Any other value raises ValueError.
    """
    prefixes = {'little': '<', 'le': '<', 'big': '>', 'be': '>'}
    symbol = prefixes.get(endian.lower())
    if symbol is None:
        raise ValueError("Endian must be 'little' or 'big'")
    return symbol


def p8(x, *args, **kwargs):
    """Pack an integer into one unsigned byte; extra arguments are ignored."""
    return struct.pack('<B', x)


def p32(x, endian='little'):
    """Pack an integer into 4 unsigned bytes using the given byte order."""
    layout = _get_endian_prefix(endian) + 'I'
    return struct.pack(layout, x)


def p64(x, endian='little'):
    """Pack an integer into 8 unsigned bytes using the given byte order."""
    layout = _get_endian_prefix(endian) + 'Q'
    return struct.pack(layout, x)


def u8(x, *args, **kwargs):
    """Unpack one byte into an unsigned integer; extra arguments are ignored."""
    (value,) = struct.unpack('<B', x)
    return value


def u32(x, endian='little'):
    """Unpack 4 bytes into an unsigned integer using the given byte order."""
    layout = _get_endian_prefix(endian) + 'I'
    (value,) = struct.unpack(layout, x)
    return value


def u64(x, endian='little'):
    """Unpack 8 bytes into an unsigned integer using the given byte order."""
    layout = _get_endian_prefix(endian) + 'Q'
    (value,) = struct.unpack(layout, x)
    return value
 
# ================================================================= #
#                      The Main Pwn Class                           #
# ================================================================= #
 
def remote(host, port):
    """Open a TCP (IPv4) connection and return it wrapped in a Pwn object.

    Args:
        host: Host name or IPv4 address to connect to.
        port: TCP port number.

    Returns:
        A Pwn instance whose ``sock`` is the connected socket, or None if
        the socket could not be created or connected (the error is printed).
    """
    p = Pwn()
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
    except Exception as e:
        print(e)
        # Release the descriptor instead of just dropping the reference.
        if sock is not None:
            sock.close()
        return None
    p.sock = sock
    return p
 
class Pwn:
    """Small pwntools-style wrapper around a single connected socket.

    The socket is stored in ``self.sock`` (None while disconnected). Most
    methods report problems through ``_log`` and return an empty value
    instead of raising.
    """

    def __init__(self):
        self.sock = None
        self.log_prefix = {
            'info':    '[*] ',
            'success': '[+] ',
            'error':   '[-] '
        }
        # Used to synchronise state between threads in interactive mode.
        self.interactive_stop_event = threading.Event()

    def _log(self, message, level='info'):
        """Print ``message`` with the prefix for ``level`` (none if unknown)."""
        print(f"{self.log_prefix.get(level, '')}{message}", flush=True)

    def send(self, data):
        """Send all of ``data``; str input is encoded as UTF-8 first.

        On a send failure the error is logged and the connection is closed.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return
        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            self.sock.sendall(data)
        except Exception as e:
            self._log(f"Failed to send data: {e}", 'error')
            self.close()

    def sendline(self, data):
        """Send ``data`` followed by a newline byte."""
        if isinstance(data, str):
            data = data.encode('utf-8')
        self.send(data + b'\n')

    def recv(self, bufsize=4096, timeout=2):
        """Receive up to ``bufsize`` bytes, waiting at most ``timeout`` seconds.

        Returns b'' when disconnected, on timeout, or on any other error
        (in the last case the connection is also closed).
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return b''
        try:
            self.sock.settimeout(timeout)
            return self.sock.recv(bufsize)
        except socket.timeout:
            self._log("Receive timed out.", 'info')
            return b''
        except Exception as e:
            self._log(f"Failed to receive data: {e}", 'error')
            self.close()
            return b''

    def recvuntil(self, delims, timeout=2):
        """Read byte by byte until ``delims`` appears in the received data.

        ``delims`` may be bytes or str (encoded as UTF-8). The timeout is set
        on the socket once, so it applies to each single-byte read rather
        than to the call as a whole.

        Returns the data read so far when the delimiter is found, the peer
        closes the connection, or a read times out. Any other error closes
        the connection and returns b''.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return b''
        if isinstance(delims, str):
            delims = delims.encode('utf-8')
        data = b''
        try:
            self.sock.settimeout(timeout)
            while delims not in data:
                chunk = self.sock.recv(1)
                if not chunk:
                    self._log("Connection closed by remote host.", 'info')
                    break
                data += chunk
            return data
        except socket.timeout:
            self._log("recvuntil timed out.", 'info')
            return data
        except Exception as e:
            self._log(f"Failed in recvuntil: {e}", 'error')
            self.close()
            return b''

    def interactive(self):
        """Relay between the terminal and the socket until either side ends."""
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return
        self._log("Switching to interactive mode.")
        if sys.platform == 'win32':
            self._interactive_windows()
        else:
            self._interactive_linux()

    def _interactive_linux(self):
        """Interactive mode for non-Windows platforms, multiplexed with select.

        The connection is always closed when this method returns.
        """
        # Objects to wait on; stdin is dropped from the list once it hits EOF.
        watched = [self.sock, sys.stdin]
        try:
            while True:
                readable, _, _ = select.select(watched, [], [])
                for s in readable:
                    if s is self.sock:
                        data = self.sock.recv(4096)
                        if not data:
                            self._log("Connection closed by remote host.")
                            return
                        os.write(sys.stdout.fileno(), data)
                    if s is sys.stdin:
                        user_input = os.read(sys.stdin.fileno(), 4096)
                        if not user_input:
                            # EOF on stdin: it would stay "readable" forever,
                            # so stop polling it to avoid a busy loop and keep
                            # relaying remote output until the peer closes.
                            watched.remove(sys.stdin)
                            continue
                        self.sock.sendall(user_input)
        except KeyboardInterrupt:
            self._log("\nInteractive mode ended by user (Ctrl+C).")
        except Exception as e:
            self._log(f"\nAn error occurred in interactive mode: {e}", 'error')
        finally:
            self.close()

    def _interactive_windows(self):
        """Interactive mode for Windows using threading with consistent I/O."""
        self.interactive_stop_event.clear()
        # Keep a local reference: close() sets self.sock to None from the
        # main thread, which must not turn into an AttributeError here.
        sock = self.sock

        def receiver():
            while not self.interactive_stop_event.is_set():
                try:
                    # Short timeout so the thread can periodically re-check
                    # the stop event.
                    sock.settimeout(0.1)
                    data = sock.recv(4096)
                    if not data:
                        self._log("Connection closed by remote host.")
                        break
                    # Use the high-level sys.stdout.write.
                    sys.stdout.write(data.decode(errors='ignore'))
                    sys.stdout.flush()
                except socket.timeout:
                    continue  # A timeout is expected; keep looping.
                except OSError:
                    # Raised once the socket has been closed.
                    break
            self.interactive_stop_event.set()

        recv_thread = threading.Thread(target=receiver)
        recv_thread.daemon = True
        recv_thread.start()

        try:
            while not self.interactive_stop_event.is_set():
                # Use the high-level sys.stdin.readline.
                user_input = sys.stdin.readline()
                if not user_input or self.interactive_stop_event.is_set():
                    break
                try:
                    self.send(user_input)
                except OSError:
                    break
        except KeyboardInterrupt:
            self._log("\nInteractive mode ended by user (Ctrl+C).")
        except Exception as e:
            if not self.interactive_stop_event.is_set():
                self._log(f"\nAn error occurred in interactive mode: {e}", 'error')
        finally:
            self.interactive_stop_event.set()
            self.close()
            recv_thread.join(timeout=1.0)  # Wait for the receiver thread to finish.

    def close(self):
        """Shut down and close the socket, if any; does nothing when disconnected."""
        if self.sock:
            # Make sure every interactive loop stops.
            self.interactive_stop_event.set()
            try:
                # Shutting down both directions helps blocked recv/send
                # calls return promptly.
                self.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # Raised if the socket is already closed.
            self.sock.close()
            self.sock = None
            self._log("Connection closed.")
 
# ================================================================= #
#                         Example Usage                             #
# ================================================================= #
 
if __name__ == '__main__':
    # Demo: show little/big-endian packing, then connect to a local test
    # service, exchange one line and hand over to interactive mode.
    print("--- Testing Endianness Packing ---")
    val = 0x11223344
    le_packed = p32(val, endian='little')
    print(f"Value 0x{val:x} packed as little-endian: {le_packed} (Hex: {le_packed.hex()})")
    be_packed = p32(val, endian='big')
    print(f"Value 0x{val:x} packed as big-endian:    {be_packed} (Hex: {be_packed.hex()})")
    print("-" * 34 + "\n")

    p = remote("localhost", 4444)

    # remote() returns None when the connection fails, so check the object
    # itself before touching p.sock.
    if p is not None and p.sock:
        # P.S. a `recvuntil` timeout is expected if the nc service does not
        # send any newline-terminated data right after connecting.
        welcome_message = p.recvuntil(b"\n", timeout=0.5)
        if welcome_message:
            print(f"Received from server: {welcome_message.decode(errors='ignore').strip()}")

        p.sendline(b"Hello from minipwn!")
        print("Initial message sent!")

        p.interactive()
        

测试效果

图片描述

其他说明

  1. 相对于minipwn.py的简单,pyoneGUI里打包了完整的pwntools工具,而且是绿色版,支持离线环境使用;
  2. minipwn.py是使用大语言模型辅助生成的,AI太强大了,我相信AGI不会太远。

[培训]Windows内核深度攻防:从Hook技术到Rootkit实战!

最后于 2025-7-10 21:44 被Jtian编辑 ,原因:
收藏
免费 1
支持
分享
最新回复 (0)
游客
登录 | 注册 方可回帖
返回