-
-
[原创]像pwntools一样操作python socket,而不要安装依赖
-
发表于: 2025-7-10 21:03 279
-
工具描述
一个简化的类pwntools工具,只有一个简单的py脚本,用来操作网络请求,适用于一些不太方便安装pwntools的离线场景。因为刻意地保留了pwntools风格,所以也比较方便后续代码的移植和分享。
minipwn.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""minipwn: a single-file, dependency-free subset of the pwntools API.

Provides integer packing helpers (``p8``/``p16``/``p32``/``p64`` and the
matching ``u*`` functions), a ``remote()`` TCP connector, and a ``Pwn`` tube
object with ``send``/``sendline``/``recv``/``recvuntil``/``interactive``.
Only the Python standard library is used.
"""
import os
import select
import socket
import struct
import sys
import threading


# ================================================================= #
#                 Packing and Unpacking functions                   #
# ================================================================= #
def _get_endian_prefix(endian):
    """Map an endianness name to the matching ``struct`` prefix.

    Accepts 'little'/'le' or 'big'/'be' (case-insensitive) and returns
    '<' or '>'. Raises ValueError for any other name.
    """
    endian = endian.lower()
    if endian in ('little', 'le'):
        return '<'
    if endian in ('big', 'be'):
        return '>'
    raise ValueError("Endian must be 'little' or 'big'")


def p8(x, *args, **kwargs):
    """Pack *x* as one unsigned byte; extra arguments are accepted and ignored."""
    return struct.pack('<B', x)


def p16(x, endian='little'):
    """Pack *x* as an unsigned 16-bit integer in the given endianness."""
    return struct.pack(f'{_get_endian_prefix(endian)}H', x)


def p32(x, endian='little'):
    """Pack *x* as an unsigned 32-bit integer in the given endianness."""
    return struct.pack(f'{_get_endian_prefix(endian)}I', x)


def p64(x, endian='little'):
    """Pack *x* as an unsigned 64-bit integer in the given endianness."""
    return struct.pack(f'{_get_endian_prefix(endian)}Q', x)


def u8(x, *args, **kwargs):
    """Unpack one byte into an unsigned int; extra arguments are ignored."""
    return struct.unpack('<B', x)[0]


def u16(x, endian='little'):
    """Unpack exactly 2 bytes into an unsigned 16-bit integer."""
    return struct.unpack(f'{_get_endian_prefix(endian)}H', x)[0]


def u32(x, endian='little'):
    """Unpack exactly 4 bytes into an unsigned 32-bit integer."""
    return struct.unpack(f'{_get_endian_prefix(endian)}I', x)[0]


def u64(x, endian='little'):
    """Unpack exactly 8 bytes into an unsigned 64-bit integer."""
    return struct.unpack(f'{_get_endian_prefix(endian)}Q', x)[0]


# ================================================================= #
#                        The Main Pwn Class                         #
# ================================================================= #
def remote(host, port):
    """Open a TCP (IPv4) connection and wrap it in a ``Pwn`` object.

    Returns the connected ``Pwn`` instance, or ``None`` when the connection
    cannot be established (the error is printed, not raised).
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
    except Exception as e:
        print(e)
        # Release the half-open socket instead of leaking it; `sock` is
        # still None if socket creation itself failed.
        if sock is not None:
            sock.close()
        return None
    p = Pwn()
    p.sock = sock
    return p


class Pwn:
    """Minimal pwntools-like tube around a connected stream socket.

    Attributes:
        sock: the underlying socket, or None when not connected / closed.
        log_prefix: mapping of log level name to the printed line prefix.
        interactive_stop_event: threading.Event used to stop the
            interactive loops (set by close()).
    """

    def __init__(self):
        self.sock = None
        self.log_prefix = {
            'info': '[*] ',
            'success': '[+] ',
            'error': '[-] '
        }
        # Used to synchronize state between threads in interactive mode.
        self.interactive_stop_event = threading.Event()

    def _log(self, message, level='info'):
        """Print *message* with the prefix for *level* (none if unknown)."""
        print(f"{self.log_prefix.get(level, '')}{message}", flush=True)

    def send(self, data):
        """Send all of *data*; str input is UTF-8 encoded first.

        Errors are logged rather than raised; a failed send closes the
        connection.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return
        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            self.sock.sendall(data)
        except Exception as e:
            self._log(f"Failed to send data: {e}", 'error')
            self.close()

    def sendline(self, data):
        """Send *data* followed by a single newline byte."""
        if isinstance(data, str):
            data = data.encode('utf-8')
        self.send(data + b'\n')

    def recv(self, bufsize=4096, timeout=2):
        """Receive up to *bufsize* bytes, waiting at most *timeout* seconds.

        Returns b'' on timeout, when not connected, or on error (in which
        case the connection is closed). The timeout stays set on the socket.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return b''
        try:
            self.sock.settimeout(timeout)
            return self.sock.recv(bufsize)
        except socket.timeout:
            self._log("Receive timed out.", 'info')
            return b''
        except Exception as e:
            self._log(f"Failed to receive data: {e}", 'error')
            self.close()
            return b''

    def recvuntil(self, delims, timeout=2):
        """Receive byte-by-byte until the data ends with *delims*.

        *delims* may be bytes or str (UTF-8 encoded). *timeout* applies to
        each individual read. Returns everything received so far (including
        the delimiter when found) on success, timeout, or remote close;
        returns b'' and closes the connection on any other error.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return b''
        if isinstance(delims, str):
            delims = delims.encode('utf-8')
        # Data arrives one byte at a time, so "delims in data" can only
        # become true at the tail; checking the suffix on a bytearray avoids
        # rescanning and recopying the whole buffer for every byte.
        data = bytearray()
        try:
            self.sock.settimeout(timeout)
            while not data.endswith(delims):
                chunk = self.sock.recv(1)
                if not chunk:
                    self._log("Connection closed by remote host.", 'info')
                    break
                data += chunk
            return bytes(data)
        except socket.timeout:
            self._log("recvuntil timed out.", 'info')
            return bytes(data)
        except Exception as e:
            self._log(f"Failed in recvuntil: {e}", 'error')
            self.close()
            return b''

    def interactive(self):
        """Bridge the terminal and the socket until either side ends.

        Dispatches to the select()-based loop on non-Windows platforms and
        to the thread-based loop on Windows. The connection is closed when
        interactive mode exits.
        """
        if self.sock is None:
            self._log("Socket is not connected.", 'error')
            return
        self._log("Switching to interactive mode.")
        if sys.platform == 'win32':
            self._interactive_windows()
        else:
            self._interactive_linux()

    def _interactive_linux(self):
        """Interactive mode using select() on the socket and stdin."""
        watched = [self.sock, sys.stdin]
        try:
            while True:
                readable, _, _ = select.select(watched, [], [])
                for s in readable:
                    if s is self.sock:
                        data = self.sock.recv(4096)
                        if not data:
                            self._log("Connection closed by remote host.")
                            return
                        os.write(sys.stdout.fileno(), data)
                    elif s is sys.stdin:
                        user_input = os.read(sys.stdin.fileno(), 4096)
                        if not user_input:
                            # stdin hit EOF: an EOF'd descriptor is always
                            # "readable", so keep it out of select() or the
                            # loop would spin. Keep relaying remote output.
                            watched.remove(sys.stdin)
                            continue
                        self.sock.sendall(user_input)
        except KeyboardInterrupt:
            self._log("\nInteractive mode ended by user (Ctrl+C).")
        except Exception as e:
            self._log(f"\nAn error occurred in interactive mode: {e}", 'error')
        finally:
            self.close()

    def _interactive_windows(self):
        """Interactive mode for Windows using threading with consistent I/O."""
        self.interactive_stop_event.clear()
        # Bind the socket locally: close() sets self.sock to None, and the
        # receiver thread must then see an OSError from the closed socket
        # rather than an AttributeError on None.
        sock = self.sock

        def receiver():
            while not self.interactive_stop_event.is_set():
                try:
                    # Short timeout so the thread can periodically re-check
                    # the stop event.
                    sock.settimeout(0.1)
                    data = sock.recv(4096)
                    if not data:
                        self._log("Connection closed by remote host.")
                        break
                    # Use the high-level sys.stdout.write.
                    sys.stdout.write(data.decode(errors='ignore'))
                    sys.stdout.flush()
                except socket.timeout:
                    continue  # A timeout is expected; keep polling.
                except OSError:
                    # Raised once the socket has been closed.
                    break
            self.interactive_stop_event.set()

        recv_thread = threading.Thread(target=receiver)
        recv_thread.daemon = True
        recv_thread.start()

        try:
            while not self.interactive_stop_event.is_set():
                # Use the high-level sys.stdin.readline.
                user_input = sys.stdin.readline()
                if not user_input or self.interactive_stop_event.is_set():
                    break
                try:
                    self.send(user_input)
                except OSError:
                    break
        except KeyboardInterrupt:
            self._log("\nInteractive mode ended by user (Ctrl+C).")
        except Exception as e:
            if not self.interactive_stop_event.is_set():
                self._log(f"\nAn error occurred in interactive mode: {e}", 'error')
        finally:
            self.interactive_stop_event.set()
            self.close()
            recv_thread.join(timeout=1.0)  # Wait for the receiver thread to finish.

    def close(self):
        """Shut down and close the socket; a no-op when already closed."""
        if self.sock:
            self.interactive_stop_event.set()  # Make every interactive loop stop.
            try:
                # Shutting down both directions helps blocked recv/send
                # calls return immediately.
                self.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # Raised when the socket is already disconnected/closed.
            self.sock.close()
            self.sock = None
            self._log("Connection closed.")


# ================================================================= #
#                          Example Usage                            #
# ================================================================= #
if __name__ == '__main__':
    print("--- Testing Endianness Packing ---")
    val = 0x11223344
    le_packed = p32(val, endian='little')
    print(f"Value 0x{val:x} packed as little-endian: {le_packed} (Hex: {le_packed.hex()})")
    be_packed = p32(val, endian='big')
    print(f"Value 0x{val:x} packed as big-endian: {be_packed} (Hex: {be_packed.hex()})")
    print("-" * 34 + "\n")

    p = remote("localhost", 4444)
    # remote() returns None when the connection fails, so check the object
    # itself before touching its socket.
    if p is not None and p.sock:
        # P.S. A `recvuntil` timeout here is normal if the nc service does
        # not send any newline-terminated data right after connecting.
        welcome_message = p.recvuntil(b"\n", timeout=0.5)
        if welcome_message:
            print(f"Received from server: {welcome_message.decode(errors='ignore').strip()}")
        p.sendline(b"Hello from minipwn!")
        print("Initial message sent!")
        p.interactive()
测试效果

其他说明
- 相对于minipwn.py的简单,pyoneGUI里打包了完整的pwntools工具,而且是绿色版,支持离线环境使用;
- minipwn.py是使用大语言模型辅助生成的,AI太强大了,我相信AGI不会太远。
[培训]Windows内核深度攻防:从Hook技术到Rootkit实战!
最后于 2025-7-10 21:44
被Jtian编辑
,原因:
赞赏
他的文章
赞赏
雪币:
留言: