首页
社区
课程
招聘
[原创]Python3 常用代码整理
发表于: 2021-4-24 14:50 3253

[原创]Python3 常用代码整理

2021-4-24 14:50
3253

Python3 常用代码整理

前言

    该文章主要作用是整理收集常用代码,用来在以后快速编写程序提供源码。

一;Python3 网络编程

    python 使用模块socket来完成TCP和UDP服务器和客户端的创建,跟进一步说明就是socket模块当中封装了代码用来调用TCP,UDP协议,生成符合TCP,UDP等协议的数据包。

    1.1;TCP客户端

import socket

target_host = " "
target_port = 80

#建立一个socket对象,支持IPV4协议,且为TCP套接字
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

#链接客户端
client.connect((target_host,target_port))

#发送数据库
client.send(('GET / HTTP/1.1\r\n Host: baidu.com\r\n\r\n').encode())

#接收数据
response = client.recv(4096)

print(response)

    1.2;UDP客户端

import socket

target_host = " "
target_port = 80

#建立一个socket对象,支持IPV4协议,且为TCP套接字
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

#发送数据库
client.sendto(('aaaaaaa').encode(),(target_host,target_port))

#接收数据
data,addr = client.recvfrom(4096)

print(data)

    1.3;TCP服务器

import socket
import threading

bind_ip = "0.0.0.0"
bind_port = 9999

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

server.bind((bind_ip,bind_port))
server.listen(5)

print("[*] Listening on %s:%d" %(bind_ip,bind_port))

#客户处理线程
def handle_client(client_socket):
    #打印出客户端发送得到的内容
    request = client_socket.recv(1024)
    print("[*] Received: %s" % request)

    #返回一个数据包
    client_socket.send("ACK!")

    #关闭套接字
    client_socket.close

while True:
    client,addr = server.accept()

    print("[*] Accepted connection from : %s:%d" % (addr[0],addr[1]))

    #挂起客户端线程,处理传入数据
    client_handler = threading.Thread(target=handle_client,args=(client,))
    client_handler.start()

    1.4;替代netcat

import sys
import socket
import getopt
import threading
import subprocess

#定义一些全局变量
listen = False
command = False
upload = False
execute = ""
target = ""
upload_destination = ""
port = 0

def usage():
    print("BHP Net Tool")
    print("Usage: bhpnet.py -t target_host -p port")
    print("-l --listen  - listen on [host]:[port] for incoming connections")
    print("-e --execute=file_to_run  - execute the give file upon receiving a connection")
    print("-c --command   - initialize a command shell")
    print("-u --upload = destination - upon receiving connection upload a file and write to [destination]")

    print(" ")
    print("Examples: ")
    print("bhpnet.py -t 192.168.1.1 -p 555 -l -c")
    print("bhpnet.py -t 192.168.1.1 -p 555 -l -u = c:\\target.exe")
    print("bhpnet.py -t 192.168.1.1 -p 555 -l -e = \"cat /etc/passwd\" ")
    print("echo '123' | ./bhpnet.py  -t 192.168.1.1 -p 555")
    sys.exit(0)

def main():
    global listen
    global port
    global execute
    global command
    global upload_destination
    global target

    if not len(sys.arge[1:]):
        usage()

    #读取命令行选项
    try:
        opts,args = getopt.getopt(sys.argv[1:],"hle:t:p:cu:",["help","listen","execute","target","port","command","upload"])
    except getopt.GetoptError as err:
        print(str(err))
        usage()

    for o,a in opts:
        if o in ("-h","--help"):
            usage()
        elif o in ("-l","--listen"):
            listen = True
        elif o in ("-e", "--execute"):
            execute = a
        elif o in ("-c","--commandshell"):
            command = True
        elif o in ("-u","--upload"):
            upload_destination = a
        elif o in ("-t","--target"):
            target = a
        elif o in ("-p","--post"):
            port = int(a)
        else:
            assert False,"Unhandled Option"

    #监听?输入发送数据?
    if not listen and len(target) and port > 0:
        #从命令行读取内存数据
        #这里将阻塞,所以不在标准输入发送数据时候发送CTRL-D
        buffer = sys.stdin.read()

        #发送数据
        client_sender(buffer)

        if listen:
            server_loop()

if __import__== "__main__":
    main()
    #发送数据
    def client_sender(buffer):
        client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        try:
            #链接到目标主机
            client.connect((target,port))
            if len(buffer):
                client.send(buffer)
            while True:
                #现在等待数据回传
                recv_len = 1
                response = ""
                while recv_len:
                    data = client.recv(4096)
                    recv_len = len(data)
                    response += data
                    if recv_len < 4096:
                        break
                print(response)

                #等待更多的输入
                buffer = raw_input("")
                buffer += "\n"

                #发送出去
                client.send(buffer)

        except:
            print("[*] Exception! Exiting.")
            #关闭链接
            client.close()

    #tcp服务器
    def server_loop():
        global target

        #如果没有定义目标,那么我们监听所有端口
        if not len(target):
            target = "0.0.0.0"

        server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        server.bind((target,port))

        server.listen(5)

        while True:
            client_socket,addr = server.accept()

            #分拆一个县城处理新的客户端
            client_thread = threading.Thread(target=client_handler,args=(client_socket,))
            client_thread.start()

    def run_command(command):
        #换行
        command = command.rstrip()
        #运行命令并将输出返回
        try:
            output = subprocess.check_output(command,stderr=subprocess.STDOUT,shell=True)
        except:
            output = "Failed to execute command.\r\n"

        #将输出发送
        return output

    #文件上传,命令执行,shell
    def client_handler(client_socket):
        global upload
        global execute
        global command

        #检测上传文件
        if len(upload_destination):
            #读取所有的字符并写下目标
            file_buffer = ""

            #持续读取数据知道没有符合的数据
            while True:
                data = client_socket.recv(1024)

                if not data:
                    break
                else:
                    file_buffer += data

        #将接收到的数据写出来
        try:
            file_descriptor = open(upload_destination,"wb")
            file_descriptor.write(file_buffer)
            file_descriptor.close()

            #确定写入完成
            client_socket.send("Successfully saved file to %s\r\n" % upload_destination)
        except:
            client_socket.send("Failed to save file to %s\r\n" % upload_destination)



        #检查命令执行
        if len(execute):
            #运行命令
            output = run_command(execute)
            client_socket.send(output)

            #如果需要一个命令行shell,那么我们进入另一个循环
            if command:
                while True:
                    #跳出一个窗口
                    client_socket.send("<BHP:#>")

                    #接受文件直到发现换行符
                    cmd_buffer = ""
                    while "\n" not in cmd_buffer:
                        cmd_buffer += client_socket.recv(1024)

                        #返还命令输出
                        response = run_command(cmd_buffer)

                        #返回响应数据
                        client_socket.send(response)

二;文件操作

    open()函数的操作模式,记录如下表:

操作模式注释
‘r’读取(默认)
‘w’写入(先截断之前的内容)
‘x’写入,如果文件已经存在会产生异常
‘a’追加,将内容写入到已有文件的末尾
‘b’二进制模式
‘t’文本模式
‘+’
更新(即可读可写)

    2.1;读取文件

def main():
    f = None
    try:
        f = open('somefile.txt','r',encoding='utf-8')
        print(f.read())
    except FileNotFoundError:
        print('无法打开指定的文件!')
    except LookupError:
        print('指定了未知的编码!')
    except UnicodeDecodeError:
        print('读取文件时编码错误')
    finally:
        if f:
            f.close()

if __name__ == '__main__':
    main()

    2.2;逐行读取文件

import time

def main():
    print("整个文件全部读取")
    #全部读取整个文件内容
    with open('somefile.txt',mode='r',encoding='utf-8') as f:
       print(f.read())

    print("for-in 循环逐行读取")
    #for-in 循环逐行读取
    with open('somefile.txt',mode='r') as f:
        for line in f:
            print(line,end='')
            time.sleep(0.5)
    print("\n")
    print("文件按行读取到列表当中")

    #读取文件按行读取到列表当中
    with open('somefile.txt') as f:
        lines = f.readline()
    print(lines)

if __name__ == '__main__':
    main()

    2.3;文件写入    

from math import sqrt

def is_prime(n):
    """判断素数的函数"""
    assert n > 0
    for factor in range(2,int(sqrt(n)) + 1):
        if n % factor == 0:
            return False
    return True if n != 1 else False

def main():
    filenames = ('a.txt','b.txt','c.txt')
    fs_list=[]
    try:
        for filename in filenames:
            fs_list.append(open(filename,'w',encoding='utf-8'))
        for number in range(1,10000):
            if is_prime(number):
                if number < 100:
                    fs_list[0].write(str(number) + '\n')
                elif number < 1000:
                    fs_list[1].write(str(number) + '\n')
                else:
                    fs_list[2].write(str(number) + '\n')
    except IOError as ex:
        print(ex)
        print('写文件时发生错误')
    finally:
        for fs in fs_list:
            fs.close()
    print('操作完成!')

if __name__ == '__main__':
    main()

    2.4;二进制写入

def main():
    try:
        with open('1.jpg','rb') as fs1:
            data = fs1.read()
            print(type(data))
        with open('2.jpg','wb') as fs2:
            fs2.write(data)
    except FileNotFoundError as e:
        print('指定的文件无法打开。')
    except IOError as e:
        print('读写文件时出现错误。')
    print('程序执行结束')


if __name__ == '__main__':
    main()

    2.5;读写JSON文件

    json的数据类型和Python的数据类型的对比

PythonJSON
dictobject
list,tuplearray
strstring
int,float,int- &float-derived Enums

number

True/Falsetrue/false
Nonenull

    使用Json模块,将字典,列表以JSON格式保存在文件当中。

import json


def main():
    mydict = {
        'name': '张三',
        'age': 333,
        'qq': 952348,
        'friends': ['王', '芳'],
        'cars': [
            {'brand': 'BYD', 'max_speed': 180},
            {'brand': 'Au2di', 'max_speed': 280},
            {'brand': 'Ben3z', 'max_speed': 320}
        ]
    }
    try:
        with open('data.json', 'w', encoding='utf-8') as fs:
            json.dump(mydict, fs)
    except IOError as e:
        print(e)
    print('保存数据完成!')


if __name__ == '__main__':
    main()

三;进程与线程

    多进程

from multiprocessing import Process
from os import getpid
from random import randint
from time import time,sleep

def download_task(filename):
    print('启动下载进程,进程号[%d]'% getpid())
    print('开始下载%s...'%filename)
    time_to_download = randint(5,10)
    sleep(time_to_download)
    print('%s下载完成!耗费了%d秒'%(filename,time_to_download))

def main():
    start = time()
    p1 = Process(target=download_task,args=('test.pdf'))  #创建进程p1
    p1.start()                                            #启动进程p1
    p2 = Process(target=download_task,args=('test2.pdf'))
    p2.start()
    p1.join()                                             #等待P1进程结束
    p2.join()
    end = time()
    print('总共耗费了%.2f秒'%(end - start))

if __name__ == '__main__':
    main()

    多线程

from random import randint
from threading import Thread
from time import time,sleep

def download(filename):
    print('开始下载%s....'%filename)
    time_to_download = randint(5,10)
    sleep(time_to_download)
    print('%s下载完成!耗费了%d秒' %(filename,time_to_download))

def main():
    start = time()
    t1 = Thread(target=download,args=('test.pdf')) #创建一个线程t1
    t1.start()                                     #启动线程t1
    t2 = Thread(target=download,args=('test2.pdf'))
    t2.start()
    t1.join()                                      #等待线程t1结束
    t2.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))

if __name__ == '__main__':
    main()

    锁   

    我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”(如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”)。

    Python的解释器有一个“全局解释器锁”(GIL)的东西,任何线程执行前必须先获得GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行,这是一个历史遗留问题,但是即便如此,就如我们之前举的例子,使用多线程在提升执行效率和改善用户体验方面仍然是有积极意义的。

from time import sleep
from threading import Thread, Lock


class Account(object):

    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先获取锁才能执行后续的代码
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中执行释放锁的操作保证正常异常锁都能释放
            self._lock.release()

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()




















N;参考书籍

1;Python黑帽子 黑客与渗透测试编程之道

2;https://github.com/jackfrued/Python-100-Days


[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

最后于 2021-4-30 16:05 被天象独行编辑 ,原因:
收藏
免费 0
支持
分享
最新回复 (0)
游客
登录 | 注册 方可回帖
返回
//