-
-
[原创]Python3 常用代码整理
-
发表于: 2021-4-24 14:50 3253
-
Python3 常用代码整理
前言
该文章主要作用是整理收集常用代码,用来在以后快速编写程序提供源码。
一;Python3 网络编程
python 使用模块socket来完成TCP和UDP服务器和客户端的创建,跟进一步说明就是socket模块当中封装了代码用来调用TCP,UDP协议,生成符合TCP,UDP等协议的数据包。
1.1;TCP客户端
import socket target_host = " " target_port = 80 #建立一个socket对象,支持IPV4协议,且为TCP套接字 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #链接客户端 client.connect((target_host,target_port)) #发送数据库 client.send(('GET / HTTP/1.1\r\n Host: baidu.com\r\n\r\n').encode()) #接收数据 response = client.recv(4096) print(response)
1.2;UDP客户端
import socket target_host = " " target_port = 80 #建立一个socket对象,支持IPV4协议,且为TCP套接字 client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #发送数据库 client.sendto(('aaaaaaa').encode(),(target_host,target_port)) #接收数据 data,addr = client.recvfrom(4096) print(data)
1.3;TCP服务器
import socket import threading bind_ip = "0.0.0.0" bind_port = 9999 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind((bind_ip,bind_port)) server.listen(5) print("[*] Listening on %s:%d" %(bind_ip,bind_port)) #客户处理线程 def handle_client(client_socket): #打印出客户端发送得到的内容 request = client_socket.recv(1024) print("[*] Received: %s" % request) #返回一个数据包 client_socket.send("ACK!") #关闭套接字 client_socket.close while True: client,addr = server.accept() print("[*] Accepted connection from : %s:%d" % (addr[0],addr[1])) #挂起客户端线程,处理传入数据 client_handler = threading.Thread(target=handle_client,args=(client,)) client_handler.start()
1.4;替代netcat
import sys import socket import getopt import threading import subprocess #定义一些全局变量 listen = False command = False upload = False execute = "" target = "" upload_destination = "" port = 0 def usage(): print("BHP Net Tool") print("Usage: bhpnet.py -t target_host -p port") print("-l --listen - listen on [host]:[port] for incoming connections") print("-e --execute=file_to_run - execute the give file upon receiving a connection") print("-c --command - initialize a command shell") print("-u --upload = destination - upon receiving connection upload a file and write to [destination]") print(" ") print("Examples: ") print("bhpnet.py -t 192.168.1.1 -p 555 -l -c") print("bhpnet.py -t 192.168.1.1 -p 555 -l -u = c:\\target.exe") print("bhpnet.py -t 192.168.1.1 -p 555 -l -e = \"cat /etc/passwd\" ") print("echo '123' | ./bhpnet.py -t 192.168.1.1 -p 555") sys.exit(0) def main(): global listen global port global execute global command global upload_destination global target if not len(sys.arge[1:]): usage() #读取命令行选项 try: opts,args = getopt.getopt(sys.argv[1:],"hle:t:p:cu:",["help","listen","execute","target","port","command","upload"]) except getopt.GetoptError as err: print(str(err)) usage() for o,a in opts: if o in ("-h","--help"): usage() elif o in ("-l","--listen"): listen = True elif o in ("-e", "--execute"): execute = a elif o in ("-c","--commandshell"): command = True elif o in ("-u","--upload"): upload_destination = a elif o in ("-t","--target"): target = a elif o in ("-p","--post"): port = int(a) else: assert False,"Unhandled Option" #监听?输入发送数据? if not listen and len(target) and port > 0: #从命令行读取内存数据 #这里将阻塞,所以不在标准输入发送数据时候发送CTRL-D buffer = sys.stdin.read() #发送数据 client_sender(buffer) if listen: server_loop() if __import__== "__main__": main() #发送数据 def client_sender(buffer): client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: #链接到目标主机 client.connect((target,port)) if len(buffer): client.send(buffer) while True: #现在等待数据回传 recv_len = 1 response = "" while recv_len: data = client.recv(4096) recv_len = len(data) response += data if recv_len < 4096: break print(response) #等待更多的输入 buffer = raw_input("") buffer += "\n" #发送出去 client.send(buffer) except: print("[*] Exception! Exiting.") #关闭链接 client.close() #tcp服务器 def server_loop(): global target #如果没有定义目标,那么我们监听所有端口 if not len(target): target = "0.0.0.0" server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind((target,port)) server.listen(5) while True: client_socket,addr = server.accept() #分拆一个县城处理新的客户端 client_thread = threading.Thread(target=client_handler,args=(client_socket,)) client_thread.start() def run_command(command): #换行 command = command.rstrip() #运行命令并将输出返回 try: output = subprocess.check_output(command,stderr=subprocess.STDOUT,shell=True) except: output = "Failed to execute command.\r\n" #将输出发送 return output #文件上传,命令执行,shell def client_handler(client_socket): global upload global execute global command #检测上传文件 if len(upload_destination): #读取所有的字符并写下目标 file_buffer = "" #持续读取数据知道没有符合的数据 while True: data = client_socket.recv(1024) if not data: break else: file_buffer += data #将接收到的数据写出来 try: file_descriptor = open(upload_destination,"wb") file_descriptor.write(file_buffer) file_descriptor.close() #确定写入完成 client_socket.send("Successfully saved file to %s\r\n" % upload_destination) except: client_socket.send("Failed to save file to %s\r\n" % upload_destination) #检查命令执行 if len(execute): #运行命令 output = run_command(execute) client_socket.send(output) #如果需要一个命令行shell,那么我们进入另一个循环 if command: while True: #跳出一个窗口 client_socket.send("<BHP:#>") #接受文件直到发现换行符 cmd_buffer = "" while "\n" not in cmd_buffer: cmd_buffer += client_socket.recv(1024) #返还命令输出 response = run_command(cmd_buffer) #返回响应数据 client_socket.send(response)
二;文件操作
open()函数的操作模式,记录如下表:
操作模式 | 注释 |
‘r’ | 读取(默认) |
‘w’ | 写入(先截断之前的内容) |
‘x’ | 写入,如果文件已经存在会产生异常 |
‘a’ | 追加,将内容写入到已有文件的末尾 |
‘b’ | 二进制模式 |
‘t’ | 文本模式 |
‘+’ | 更新(即可读可写) |
2.1;读取文件
def main(): f = None try: f = open('somefile.txt','r',encoding='utf-8') print(f.read()) except FileNotFoundError: print('无法打开指定的文件!') except LookupError: print('指定了未知的编码!') except UnicodeDecodeError: print('读取文件时编码错误') finally: if f: f.close() if __name__ == '__main__': main()
2.2;逐行读取文件
import time def main(): print("整个文件全部读取") #全部读取整个文件内容 with open('somefile.txt',mode='r',encoding='utf-8') as f: print(f.read()) print("for-in 循环逐行读取") #for-in 循环逐行读取 with open('somefile.txt',mode='r') as f: for line in f: print(line,end='') time.sleep(0.5) print("\n") print("文件按行读取到列表当中") #读取文件按行读取到列表当中 with open('somefile.txt') as f: lines = f.readline() print(lines) if __name__ == '__main__': main()
2.3;文件写入
from math import sqrt def is_prime(n): """判断素数的函数""" assert n > 0 for factor in range(2,int(sqrt(n)) + 1): if n % factor == 0: return False return True if n != 1 else False def main(): filenames = ('a.txt','b.txt','c.txt') fs_list=[] try: for filename in filenames: fs_list.append(open(filename,'w',encoding='utf-8')) for number in range(1,10000): if is_prime(number): if number < 100: fs_list[0].write(str(number) + '\n') elif number < 1000: fs_list[1].write(str(number) + '\n') else: fs_list[2].write(str(number) + '\n') except IOError as ex: print(ex) print('写文件时发生错误') finally: for fs in fs_list: fs.close() print('操作完成!') if __name__ == '__main__': main()
2.4;二进制写入
def main(): try: with open('1.jpg','rb') as fs1: data = fs1.read() print(type(data)) with open('2.jpg','wb') as fs2: fs2.write(data) except FileNotFoundError as e: print('指定的文件无法打开。') except IOError as e: print('读写文件时出现错误。') print('程序执行结束') if __name__ == '__main__': main()
2.5;读写JSON文件
json的数据类型和Python的数据类型的对比
Python | JSON |
dict | object |
list,tuple | array |
str | string |
int,float,int- &float-derived Enums | number |
True/False | true/false |
None | null |
使用Json模块,将字典,列表以JSON格式保存在文件当中。
import json def main(): mydict = { 'name': '张三', 'age': 333, 'qq': 952348, 'friends': ['王', '芳'], 'cars': [ {'brand': 'BYD', 'max_speed': 180}, {'brand': 'Au2di', 'max_speed': 280}, {'brand': 'Ben3z', 'max_speed': 320} ] } try: with open('data.json', 'w', encoding='utf-8') as fs: json.dump(mydict, fs) except IOError as e: print(e) print('保存数据完成!') if __name__ == '__main__': main()
三;进程与线程
多进程
from multiprocessing import Process from os import getpid from random import randint from time import time,sleep def download_task(filename): print('启动下载进程,进程号[%d]'% getpid()) print('开始下载%s...'%filename) time_to_download = randint(5,10) sleep(time_to_download) print('%s下载完成!耗费了%d秒'%(filename,time_to_download)) def main(): start = time() p1 = Process(target=download_task,args=('test.pdf')) #创建进程p1 p1.start() #启动进程p1 p2 = Process(target=download_task,args=('test2.pdf')) p2.start() p1.join() #等待P1进程结束 p2.join() end = time() print('总共耗费了%.2f秒'%(end - start)) if __name__ == '__main__': main()
多线程
from random import randint from threading import Thread from time import time,sleep def download(filename): print('开始下载%s....'%filename) time_to_download = randint(5,10) sleep(time_to_download) print('%s下载完成!耗费了%d秒' %(filename,time_to_download)) def main(): start = time() t1 = Thread(target=download,args=('test.pdf')) #创建一个线程t1 t1.start() #启动线程t1 t2 = Thread(target=download,args=('test2.pdf')) t2.start() t1.join() #等待线程t1结束 t2.join() end = time() print('总共耗费了%.3f秒' % (end - start)) if __name__ == '__main__': main()
锁
我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”(如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”)。
Python的解释器有一个“全局解释器锁”(GIL)的东西,任何线程执行前必须先获得GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行,这是一个历史遗留问题,但是即便如此,就如我们之前举的例子,使用多线程在提升执行效率和改善用户体验方面仍然是有积极意义的。
from time import sleep from threading import Thread, Lock class Account(object): def __init__(self): self._balance = 0 self._lock = Lock() def deposit(self, money): # 先获取锁才能执行后续的代码 self._lock.acquire() try: new_balance = self._balance + money sleep(0.01) self._balance = new_balance finally: # 在finally中执行释放锁的操作保证正常异常锁都能释放 self._lock.release() @property def balance(self): return self._balance class AddMoneyThread(Thread): def __init__(self, account, money): super().__init__() self._account = account self._money = money def run(self): self._account.deposit(self._money) def main(): account = Account() threads = [] for _ in range(100): t = AddMoneyThread(account, 1) threads.append(t) t.start() for t in threads: t.join() print('账户余额为: ¥%d元' % account.balance) if __name__ == '__main__': main()
N;参考书籍
1;Python黑帽子 黑客与渗透测试编程之道
2;https://github.com/jackfrued/Python-100-Days
[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课
最后于 2021-4-30 16:05
被天象独行编辑
,原因:
赞赏
他的文章
- [分享]应急响应工具-日志分析 4836
- [原创]MEMZ彩虹病毒分析 15646
- [原创]恶意程序分析-去除恶意程序混淆-学习记录 13139
- [原创]Ret2libc 学习记录笔记 15559
- [原创]基本语句逆向分析 12227
看原图
赞赏
雪币:
留言: