首页
社区
课程
招聘
[原创]TetCTF2022一道代码混淆题分析——crackme_pls
2022-1-14 15:14 16363

[原创]TetCTF2022一道代码混淆题分析——crackme_pls

2022-1-14 15:14
16363

新年第一帖!
年初做了道很有意思的代码混淆题crackme_pls,自闭了一个多星期后终于想出了相对比较好的去混淆方法,在这里给大伙分享一下分析思路。
题目附件和源代码:https://github.com/lkmidas/MyCTFPublic/tree/main/tetctf2022/crackme_pls

目录

0x00. 混淆概览

1. 魔改控制流平坦化

以main函数为例,这题的控制流图是这样的(IDA识别函数范围有误,需要手动修复一下):
图片描述
为啥说是魔改的控制流平坦化呢?因为这个混淆从分发块到真实块之间是间接跳转,因此更难处理。
图片描述
图片描述

2. 字符串加密

所有的字符串常量都已经被加密了,运行时在init()函数中解密,但这个混淆其实对我们分析程序的干扰不大,动调一下就能拿到解密后的字符串:
图片描述
图片描述

3. 间接函数调用

所有的函数调用都变成了间接调用,IDA无法识别函数的真实地址:
图片描述
图片描述
这一坨看起来奇奇怪怪的式子实际上就是函数表的地址加上一个偏移,然后从这个地址取出函数的真实地址,40CAA0就是函数表的地址,之所以看起来这么“奇怪,是因为加法运算经过了MBA表达式替换混淆。
复原MBA表达式确实很困难,但实际上我们并不需要复原MBA表达式,直接动态调试就能拿到函数的真实地址,符号执行、手动计算也是OK的。还有个WP说Ghirda和Binary Ninja能识别出函数的真实地址在函数表中的位置,我用Binary Ninja试了一下确实可以,还是挺香的:
图片描述
IDA在main函数下断点动调,然后就能在函数表中找到对应的真实地址,比如这里其实是调用了fork函数,在Binary Ninja中把函数名改掉,代码的可读性就很高了:
图片描述
图片描述
所以去混淆的关键是恢复被平坦化的控制流

0x01. 反混淆流程(main函数)

这题是控制流平坦化变体,所以函数被混淆后的结构和去混淆方法其实和控制流平坦化是有异曲同工之妙的,关于怎么去普通的控制流平坦化,可以参考一下两篇文章:

参考WP:

1. 混淆结构

所谓知己知彼才能百战百胜,首先我们来研究下函数被混淆的结构。总体来说跟控制流平坦化是很像的,因此我们可以把控制流平坦化的结构图拿来作为参考,对这道题的CFG进行划分,还是以main函数为例:
图片描述
图片描述
这个划分规则暂时还是非常适用的(注意,只是暂时适用,一个小伏笔)
完整的过程是每个真实块执行完毕后都会返回预分发块,预分发块返回主分发块,再由主分发块经过若干个自分发块到达下一个真实块,和控制流平坦化是一样的,只不过跳转方式由直接跳转变成了间接跳转。

2. 总体思路

这题跟控制流平坦化很像,那么能不能直接套用控制流平坦化的去混淆思路呢?答案是不能,主要有两点原因:

  1. 间接跳转导致angr的analyse模块无法分析出函数的CFG
  2. 不能通过直接从一个真实块符号执行来找到下一个真实块的真实地址,因为间接跳转需要从栈中获取跳转表

但是这两个问题也是我们不得不面对的问题,因为要恢复控制流就得找到真实块之间的跳转关系,要找到真实块就得拿到函数的CFG。既然常规的思路没法结果,那我们就发挥才智想出一些新的方法来,解决方案如下:

获取函数CFG

既然angr的“正规”手段分析不出函数CFG,那么我们就用一些“野蛮”手段来生成一个简易的CFG。
首先定义一个叫做SuperBlock的类,表示一个IDA-like的基本块,跟angr-management中的SuperCFGNode很相似,这里重新定义一个是为了方便后续处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class SuperBlock:
 
    def __init__(self, block_addr):
        self.addr = block_addr
        self.blocks = []
        self.size = 0
 
    def insert_block(self, block):
        self.blocks.append(block)
        self.size += block.size
 
    @property
    def terminator(self):
        return self.blocks[-1].capstone.insns[-1]
 
    @property
    def successor(self):
        terminator = self.terminator
        try:
            return int(terminator.op_str, 16)
        except:
            return None
 
    @property
    def insns(self):
        insns = []
        for block in self.blocks:
            insns += block.capstone.insns
        return insns
 
    def __repr__(self):
        return "<SuperBlock %#08x->%#08x>" % (self.addr, self.addr + self.size)
 
    def __hash__(self):
        return hash(('superblock', self.addr))

接下来怎么分析出一个函数中的所有SuperNode呢,方法很简单粗暴:

  1. 从当前地址扫描(初始值为函数起始地址),直到碰到跳转指令(如jmp, jz等)或ret指令,此为一个SuperNode,并添加到列表中
  2. 将当前地址改为上一个SuperNode的一下条指令的地址,继续扫描

我写了一个FunctionCFG来表示函数的CFG,这个简易CFG存储了函数的所有SuperNode,但没有保存基本块之间的跳转关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class FunctionCFG:
 
    def __init__(self, proj, func_addr, func_end):
        self.proj = proj
        self.addr = func_addr
        self.size = func_end - func_addr
 
    def get_super_block(self, block_addr):
        super_block = SuperBlock(block_addr)
        cur_addr = block_addr
        while True:
            block = self.proj.factory.block(cur_addr)
            if block.size == 0:
                return super_block
            insns = block.capstone.insns
            super_block.insert_block(block)
            cur_addr += block.size
            if insns[-1].mnemonic.startswith('j') or insns[-1].mnemonic == 'ret':
                return super_block
 
    def dump_all_blocks(self):
        curr_addr = self.addr
        blocks = []
        while curr_addr - self.addr < self.size:
            super_block = self.get_super_block(curr_addr)
            if super_block.size == 0:
                curr_addr += 1
                continue
            blocks.append(super_block)
            curr_addr += super_block.size
        return blocks

来测试一下用这种方法分析CFG的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if __name__ == '__main__':
    # Initilization
    logging.getLogger('cle').setLevel(logging.ERROR)
    logging.getLogger('angr').setLevel(logging.ERROR)
    logger = logging.getLogger('deobfu')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.FileHandler(filename='%s.log' % time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())))
    filename = 'crackme-pls.bin'
    proj = angr.Project(filename)
    base_addr = proj.loader.main_object.mapped_base
    reverse_plt = proj.loader.main_object.reverse_plt
    with open(filename, 'rb') as file:
        content = bytearray(file.read())
    # Prepare sigificant arguments
    func_addr, func_end = 0x403D90, 0x40419F
    # Analyse function CFG in a very crude way
    cfg = FunctionCFG(proj, func_addr, func_end)
    print(cfg.dump_all_blocks())

输出,经过检验结果是正确的:

1
[<SuperBlock 0x403d90->0x403f4d>, <SuperBlock 0x403f4d->0x403f5f>, <SuperBlock 0x403f5f->0x403f8b>, <SuperBlock 0x403f8b->0x403f9d>, <SuperBlock 0x403f9d->0x403fb4>, <SuperBlock 0x403fb4->0x403fc4>, <SuperBlock 0x403fc4->0x403fe2>, <SuperBlock 0x403fe2->0x403ff4>, <SuperBlock 0x403ff4->0x404065>, <SuperBlock 0x404065->0x40407e>, <SuperBlock 0x40407e->0x404095>, <SuperBlock 0x404095->0x4040bf>, <SuperBlock 0x4040bf->0x4040ce>, <SuperBlock 0x4040ce->0x4040e5>, <SuperBlock 0x4040e5->0x4040fc>, <SuperBlock 0x4040fc->0x40410e>, <SuperBlock 0x40410e->0x40418b>, <SuperBlock 0x40418b->0x40419f>]

至此,我们已经拿到了函数的全部基本块。

识别真实块

在控制流平坦化的去混淆中,我们可以通过基本块的前驱块和后继块来识别哪些是真实块,但在上一步中我们仅仅是得到了函数的所有基本块,而没有识别基本块之间的跳转关系,所以传统的方法行不通。
但仍然有很简单粗暴的方法来识别哪些是真实块:

  1. 首先这个基本块不能是序言(即函数的入口块)
  2. 这个基本块必须有一个直接的后继(通常是跳转到预分发块,一个小伏笔,之后会出现特例)
  3. 或者这个基本块是返回块(以retn指令或call exit结束)

代码如下,main函数中还没有出现call exit,所以其实可以先忽略这种情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
def anaylse_blocks(self):
    all_blocks = self.dump_all_blocks()
    relevant_blocks = []
    useless_blocks = []
    for block in all_blocks:
        if block.addr != self.addr and \
                (block.successor is not None
                or block.terminator.mnemonic == 'ret'
                or block.terminator.mnemonic == 'call'):
            relevant_blocks.append(block)
        elif block.addr != self.addr:
            useless_blocks.append(block)
    return relevant_blocks, useless_blocks

其中useless_blocks即无用块包括主分发块、子分发块以及预分发块,这些基本块是没有用的,之后都要被nop掉,所以顺便也收集起来。
打印一下结果:

1
2
relevant_blocks, useless_blocks = cfg.anaylse_blocks()
print('Revelvant blocks: %s\nUseless blocks: %s' % (relevant_blocks, useless_blocks))

输出,也是完全正确的:

1
2
Revelvant blocks: [<SuperBlock 0x403fb4->0x403fc4>, <SuperBlock 0x403ff4->0x404065>, <SuperBlock 0x404095->0x4040bf>, <SuperBlock 0x4040e5->0x4040fc>, <SuperBlock 0x40410e->0x40418b>, <SuperBlock 0x40418b->0x40419f>]
Useless blocks: [<SuperBlock 0x403f4d->0x403f5f>, <SuperBlock 0x403f5f->0x403f8b>, <SuperBlock 0x403f8b->0x403f9d>, <SuperBlock 0x403f9d->0x403fb4>, <SuperBlock 0x403fc4->0x403fe2>, <SuperBlock 0x403fe2->0x403ff4>, <SuperBlock 0x404065->0x40407e>, <SuperBlock 0x40407e->0x404095>, <SuperBlock 0x4040bf->0x4040ce>, <SuperBlock 0x4040ce->0x4040e5>, <SuperBlock 0x4040fc->0x40410e>]

寻找后继真实块

之前提到了不能直接通过符号执行找到真实块的后继真实块,那么就从函数入口点开始符号执行,explore到当前的真实块,然后继续符号执行找到下一个真实块?这种方法或许可行,但是速度肯定很慢。我们可以用另一个方法曲线救国。
通过分析汇编我们发现程序在分发块中的执行路径其实是由eax这个寄存器决定的,也就是每一个真实块对应了一个固定的值,只要程序在主分发块时eax的值等于该分发块对应的值,那么程序接下来就会到达这个真实块
图片描述
那么怎么找到每个真实块对应的eax值呢?最简单的方法肯定是符号执行。我的思路是:

  1. 从函数入口点开始符号执行,到主分发块(因为函数的入口块中需要完成跳转表的初始化)
  2. 将当前状态的eax符号化,继续符号执行,过滤掉已经遍历过的地址(防止循环)
  3. 符号执行到达真实块时,对eax约束求解,求解的结果就是真实块对应的eax值

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Map node onto key and return a dict
def map_k2n():
    state = proj.factory.call_state(addr=func_addr)
    # Skip call
    state.inspect.b('call', when=angr.BP_BEFORE, action=skip_call)
    simgr = proj.factory.simgr(state)
    simgr.explore(find=dispatcher_node)
    found = simgr.found[0]
    # Symbolize eax
    key = claripy.BVS('key', 32)
    found.regs.eax = key
    simgr = proj.factory.simgr(found)
    visited = []
    k2n_map = {}
    while len(simgr.active):
        # Filter all visited states to prevent from sticking in a loop
        simgr.move(from_stash='active', to_stash='deadended', filter_func=lambda state: state.addr in visited)
        for active in simgr.active:
            visited.append(active.addr)
            if active.addr in relevant_nodes:
                key_val = active.solver.eval(key)
                k2n_map[key_val] = active.addr
                logger.debug('[*] Mapped node(%#x)->key(%#x) [%d/%d]'
                             % (active.addr, key_val, len(k2n_map), len(relevant_nodes)))
        simgr.step()
    return k2n_map
1
2
3
4
5
6
7
8
9
10
11
# Analyse function CFG in a very crude way
cfg = FunctionCFG(proj, func_addr, func_end)
# Get prologue block at func addr
prologue_block = cfg.get_super_block(func_addr)
# The successor of prologue block is dispatcher block
dispatcher_node = prologue_block.successor
relevant_blocks, useless_blocks = cfg.anaylse_blocks()
# Convert block list to addr list
relevant_nodes = [block.addr for block in relevant_blocks]
# Map node onto key
k2n_map = map_k2n()

输出:

1
2
3
4
5
6
[*] Mapped node(0x403fb4)->key(0x1c8d08b0) [1/6]
[*] Mapped node(0x403ff4)->key(0x477f1a08) [2/6]
[*] Mapped node(0x404095)->key(0x1d507da0) [3/6]
[*] Mapped node(0x40410e)->key(0x4027245b) [4/6]
[*] Mapped node(0x4040e5)->key(0x4d20e191) [5/6]
[*] Mapped node(0x40418b)->key(0x5282f1e2) [6/6]

如何得到真实块的后继真实块呢?只需要知道真实块执行完毕后eax的值即可,有两个方法:一个是符号执行,另外一个是正则匹配。一开始我用的是符号执行的方法,但后来在处理其他函数的时候我发现符号执行处理不了一些特殊情况,无奈只能改成正则匹配了。
正则匹配的方法如下:

  1. 如果真实块只有一个后继块,那么真实块中一定有一条mov eax, 某个真实块的key指令
    图片描述
  2. 如果真实块有两个后继块,那么真实块中一定有一条mov eax, xxx指令和一条mov ecx, xxx指令,以及一条cmovxx eax, ecx指令(这三条指令相当于一个条件跳转)
    图片描述

通过正则匹配可以识别出哪些真实块是绝对跳转,哪些真实块是条件跳转,同时也能得到后继块的key,进而通过k2n_map得到跳转的地址。条件跳转的类型其实就是cmov指令的类型(例如cmovnz对应jnz)。如此以来便可以得到真实块之间的跳转关系。

修复控制流

通过正则匹配得到了真实块之间的跳转关系之后就可以直接patch了,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def fix_branch():
    # Nop all useless blocks
    for block in useless_blocks:
        fill_nops(block.addr, block.size)
    relevant_blocks.append(prologue_block)
    for block in relevant_blocks:
        key_reg, sub_key_reg = 'eax', 'ecx'
        key, sub_key = None, None
        cmov_mnemonic = None
        for insn in block.insns:
            mnemonic, op_str = insn.mnemonic, insn.op_str
            if mnemonic == 'mov':
                if re.match(rf'{key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_key = int(op_str.replace(f'{key_reg}, ', ''), 16)
                    if tmp_key in k2n_map:
                        key = tmp_key
                elif re.match(rf'{sub_key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_sub_key = int(op_str.replace(f'{sub_key_reg}, ', ''), 16)
                    if tmp_sub_key in k2n_map:
                        sub_key = tmp_sub_key
            elif mnemonic.startswith('cmov') and op_str == f'{key_reg}, {sub_key_reg}':
                cmov_mnemonic = mnemonic
        terminator = block.terminator
        if key is not None and sub_key is None:
            logger.debug('[*] Node(%#x) has one successor(%#x)' % (block.addr, k2n_map[key]))
            fill_jmp(terminator.address, k2n_map[key])
        elif key is not None and sub_key is not None:
            if cmov_mnemonic is not None:
                logger.debug('[*] Node(%#x) has two successors(%#x, %#x)' % (block.addr, k2n_map[key], k2n_map[sub_key]))
                fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
                fill_jmp(terminator.address + 6, k2n_map[key])
            else:
                logger.error('[*] Error at block(%#x)' % block.addr)
                exit(0)
        else:
            logger.debug('[*] Node(%#x) has no successor' % block.addr)

3. 反混淆效果检验

首先丢进虚拟机跑一下确定没有爆炸:
图片描述
IDA中查看,控制流已经恢复:
图片描述
Binary Ninja查看,恢复一下函数符号和字符串,代码的逻辑就很清晰了:
图片描述

4. 完整代码-v1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import angr
import logging
import re
import claripy
from keystone import *
import sys
import time
 
def fill_nops(addr, size):
    offset = addr - base_addr
    content[offset:offset + size] = b'\x90' * size
 
def fill_jmp(src, dest):
    offset = src - base_addr
    if dest != src + 5:
        content[offset] = 0xE9
        content[offset + 1:offset + 5] = (dest - src - 5).to_bytes(4, 'little', signed=True)
    else:
        fill_nops(src, 5)
 
def get_jx_opcode(jx_type):
    ks = Ks(KS_ARCH_X86, KS_MODE_64)
    code, count = ks.asm(f'{jx_type} 0xFFFFFFFF')
    return b''.join(map(lambda x: x.to_bytes(1, sys.byteorder), code[0:2]))
 
def fill_jx(src, dest, cmov_type):
    offset = src - base_addr
    content[offset:offset + 2] = get_jx_opcode(cmov_type.replace('cmov', 'j'))
    content[offset + 2:offset + 6] = (dest - src - 6).to_bytes(4, 'little', signed=True)
 
class SuperBlock:
    def __init__(self, block_addr):
        self.addr = block_addr
        self.blocks = []
        self.size = 0
 
    def insert_block(self, block):
        self.blocks.append(block)
        self.size += block.size
 
    @property
    def terminator(self):
        return self.blocks[-1].capstone.insns[-1]
 
    @property
    def successor(self):
        terminator = self.terminator
        try:
            return int(terminator.op_str, 16)
        except:
            return None
 
    @property
    def insns(self):
        insns = []
        for block in self.blocks:
            insns += block.capstone.insns
        return insns
 
    def __repr__(self):
        return "<SuperBlock %#08x->%#08x>" % (self.addr, self.addr + self.size)
 
    def __hash__(self):
        return hash(('superblock', self.addr))
 
class FunctionCFG:
 
    def __init__(self, proj, func_addr, func_end):
        self.proj = proj
        self.addr = func_addr
        self.size = func_end - func_addr
 
    def get_super_block(self, block_addr):
        super_block = SuperBlock(block_addr)
        cur_addr = block_addr
        while True:
            block = self.proj.factory.block(cur_addr)
            if block.size == 0:
                return super_block
            insns = block.capstone.insns
            super_block.insert_block(block)
            cur_addr += block.size
            if insns[-1].mnemonic.startswith('j') or insns[-1].mnemonic == 'ret':
                return super_block
 
    def dump_all_blocks(self):
        curr_addr = self.addr
        blocks = []
        while curr_addr - self.addr < self.size:
            super_block = self.get_super_block(curr_addr)
            if super_block.size == 0:
                curr_addr += 1
                continue
            blocks.append(super_block)
            curr_addr += super_block.size
        return blocks
 
    def anaylse_blocks(self):
        all_blocks = self.dump_all_blocks()
        relevant_blocks = []
        useless_blocks = []
        for block in all_blocks:
            if block.addr != self.addr and \
                    (block.successor is not None
                    or block.terminator.mnemonic == 'ret'
                    or block.terminator.mnemonic == 'call'):
                relevant_blocks.append(block)
            elif block.addr != self.addr:
                useless_blocks.append(block)
        return relevant_blocks, useless_blocks
 
def skip_call(state):
    proj.hook(state.addr, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
 
# Map node onto key and return a dict
def map_k2n():
    state = proj.factory.call_state(addr=func_addr)
    # Skip call
    state.inspect.b('call', when=angr.BP_BEFORE, action=skip_call)
    simgr = proj.factory.simgr(state)
    simgr.explore(find=dispatcher_node)
    found = simgr.found[0]
    # Symbolize eax
    key = claripy.BVS('key', 32)
    found.regs.eax = key
    simgr = proj.factory.simgr(found)
    visited = []
    k2n_map = {}
    while len(simgr.active):
        # Filter all visited states to prevent from sticking in a loop
        simgr.move(from_stash='active', to_stash='deadended', filter_func=lambda state: state.addr in visited)
        for active in simgr.active:
            visited.append(active.addr)
            if active.addr in relevant_nodes:
                key_val = active.solver.eval(key)
                k2n_map[key_val] = active.addr
                logger.debug('[*] Mapped node(%#x)->key(%#x) [%d/%d]'
                             % (active.addr, key_val, len(k2n_map), len(relevant_nodes)))
        simgr.step()
    return k2n_map
 
def fix_branch():
    # Nop all useless blocks
    for block in useless_blocks:
        fill_nops(block.addr, block.size)
    relevant_blocks.append(prologue_block)
    for block in relevant_blocks:
        key_reg, sub_key_reg = 'eax', 'ecx'
        key, sub_key = None, None
        cmov_mnemonic = None
        for insn in block.insns:
            mnemonic, op_str = insn.mnemonic, insn.op_str
            if mnemonic == 'mov':
                if re.match(rf'{key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_key = int(op_str.replace(f'{key_reg}, ', ''), 16)
                    if tmp_key in k2n_map:
                        key = tmp_key
                elif re.match(rf'{sub_key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_sub_key = int(op_str.replace(f'{sub_key_reg}, ', ''), 16)
                    if tmp_sub_key in k2n_map:
                        sub_key = tmp_sub_key
            elif mnemonic.startswith('cmov') and op_str == f'{key_reg}, {sub_key_reg}':
                cmov_mnemonic = mnemonic
        terminator = block.terminator
        if key is not None and sub_key is None:
            logger.debug('[*] Node(%#x) has one successor(%#x)' % (block.addr, k2n_map[key]))
            fill_jmp(terminator.address, k2n_map[key])
        elif key is not None and sub_key is not None:
            if cmov_mnemonic is not None:
                logger.debug('[*] Node(%#x) has two successors(%#x, %#x)' % (block.addr, k2n_map[key], k2n_map[sub_key]))
                fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
                fill_jmp(terminator.address + 6, k2n_map[key])
            else:
                logger.error('[*] Error at block(%#x)' % block.addr)
                exit(0)
        else:
            logger.debug('[*] Node(%#x) has no successor' % block.addr)
 
if __name__ == '__main__':
    # Initilization
    logging.getLogger('cle').setLevel(logging.ERROR)
    logging.getLogger('angr').setLevel(logging.ERROR)
    logger = logging.getLogger('deobfu')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.FileHandler(filename='%s.log' % time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())))
    filename = 'crackme-pls.bin'
    proj = angr.Project(filename)
    base_addr = proj.loader.main_object.mapped_base
    reverse_plt = proj.loader.main_object.reverse_plt
    with open(filename, 'rb') as file:
        content = bytearray(file.read())
    # Prepare sigificant arguments
    func_addr, func_end = 0x403D90, 0x40419F
    # Analyse function CFG in a very crude way
    cfg = FunctionCFG(proj, func_addr, func_end)
    # Get prologue block at func addr
    prologue_block = cfg.get_super_block(func_addr)
    # The successor of prologue block is dispatcher block
    dispatcher_node = prologue_block.successor
    relevant_blocks, useless_blocks = cfg.anaylse_blocks()
    # Convert block list to addr list
    relevant_nodes = [block.addr for block in relevant_blocks]
    # Map node onto key
    k2n_map = map_k2n()
    # Fix control flow
    fix_branch()
    with open(filename + '.recover', 'wb') as file:
        file.write(content)

0x02. 反混淆流程(child_func函数)

在复原后的main函数中我们可以看到子进程读取了60个字节的输入之后调用了child_func函数,接下来我们来看看如何复原child_func函数。

1. 混淆结构

还是先来看看混淆结构:
图片描述
乍一看好像变成两层的控制流平坦化了?仔细分析发现原来是有几个基本块“特立独行”,它们的末尾是直接跳转,但没有像正常真实块那样跳转到预分发块,而是直接跳到了另外一个真实块:
图片描述
至于为什么会出现这种特立独行的基本块,根据题目的源代码来看,应该是因为编译器的神奇优化,将这几个异或操作之后的赋值操作合并到一个基本块中了(也即是402AA0中的mov [rax], cl指令):
图片描述

2. 处理方法

对于这几个特立独行的基本块,我们在分析时仍然将其当做真实块,但不要对它的跳转进行Patch。v1版本的去混淆脚本仍然适用,因为这几个基本块中没有mov eax, xxx指令,也就会被认为没有后继真实块,自然不会被Patch。

3. 反混淆效果检验

还是先运行一下,确定没有爆炸:
图片描述
拖进IDA查看,基本是正常的:
图片描述
用Biary Ninja复原一下函数调用,ptrace表明有父进程调试子进程的操作:
图片描述
中间的一大坨循环暂且跳过,再看看最后的函数调用:
图片描述
中间的循环Binary Ninja的反汇编效果不是很好,所以还是用IDA看,结合最后的函数调用可以看出是一个解密ELF文件然后执行的操作:
图片描述
加密后的ELF前四个字节与输入的前四个字节异或,要得到合法的ELF头0x7F和ELF三个字符。于是得到了输入的前四个字符:
图片描述
动态调试把解密后的 ELF dump下来,因为这个帖主要是讲去混淆,所以这部分就不细说了。

0x03. 反混淆流程(parent_func函数)

1. 混淆结构

这题最难搞的函数,首先还是来看看CFG,基本块展开成了4层:
图片描述
决定接下来要跳转到哪个真实块的关键寄存器变成了ecx
图片描述
第一层基本块都是真实块,直接跳转到预分发块:
图片描述
第二层基本块都跳转到了第一层的两个基本块之一,并且两个基本块没有mov ecx, xxx指令,也就是说接下来要跳转到哪个真实块实际上是由第二层的基本块自己决定的:
图片描述
这样就导致了patch工作非常困难,显然不能在标红框的两个基本块中patch跳转,因为后继真实块不是由它们决定的;但如果让第二层的基本块直接跳转到后继真实块,又会漏掉第一层基本块的一些指令,导致程序出错。
我没有想到更好的解决办法,只能牺牲掉一些准确性保全大局,如果你有更好的主意欢迎交流讨论。
再看到第三层基本块,都跳转到了第二层基本块中的三个之一:
图片描述
第四层的情况也差不多:
图片描述
这里还有两个以call exit结尾的基本块,很容易看出来,这两个基本块我们待会也要作为特殊情况考虑进去:
图片描述

2. 处理方法

因为关键寄存器变成了ecx,所以这里要改为:

1
key_reg, sub_key_reg = 'ecx', 'eax'

另外虽然关键寄存器变成了ecx,实际上是预分发块中有一条指令mov eax, ecx,子分发块还是根据eax来跳转的:
图片描述
所以处理一下序言块到第一个真实块的特殊情况:

1
2
3
if block.addr == prologue_block.addr:
    logger.debug('[*] Progolue node(%#x) has one successor(%#x)' % (block.addr, k2n_map[sub_key]))
    fill_jmp(terminator.address, k2n_map[sub_key])

然后来处理一下这种情况,mov ecx, xxxmov eax, xxx以及cmovxx ecx, eax不在同一个基本块中:
图片描述
那么我们需要在修复控制流时加上一种情况,如果在当前基本块发现了mov ecx, xxxmov eax, xxx但是没有发现cmovxx ecx, eax指令,就往它的直接后继基本块继续搜索:

1
2
3
4
5
6
7
8
elif key is not None and sub_key is not None:
    cur_block = block
    while cur_block.successor is not None and cmov_mnemonic is None:
        cur_block = cfg.get_super_block(cur_block.successor)
         for insn in cur_block.insns:
            mnemonic, op_str = insn.mnemonic, insn.op_str
            if mnemonic.startswith('cmov') and op_str == f'{key_reg}, {sub_key_reg}':
                cmov_mnemonic = mnemonic

接下来处理两个以call exit结尾的基本块:

1
2
3
# Prepare sigificant arguments
func_addr, func_end = 0x402B70, 0x403D81
call_exit = [0x403D3D, 0x403D7E]

手动记录一下两条call exit指令的地址,在分析SuperBlock时把这两个地址也考虑进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
def get_super_block(self, block_addr):
    super_block = SuperBlock(block_addr)
    cur_addr = block_addr
    while True:
        block = self.proj.factory.block(cur_addr)
        if block.size == 0:
            return super_block
        insns = block.capstone.insns
        super_block.insert_block(block)
        cur_addr += block.size
        if insns[-1].mnemonic.startswith('j') or insns[-1].mnemonic == 'ret'\
                or insns[-1].address in call_exit: # 考虑以 call exit 结尾的情况
            return super_block

本来我以为这样就搞定了,但接下来我又发现了新的问题,在输出的日志中我发现有些应该有后继真实块的基本块却找不到后继真实块,例如:

1
[*] Node(0x403c20) has no successor

图片描述
很明显loc_403C20的后继真实块应该是326BF726h1594BB6Ah对应的基本块,而我们在恢复跳转关系的时候却找不到后继块,这是因为我们根本没有找到0x326BF726对应的真实块,真是奇了怪了:
图片描述
如果在主分发块处将eax设为claripy.BVV(0x326BF726, 32)开始符号执行,会发现它在返回预分发块之前到达了0x403230基本块:
图片描述
图片描述
这个基本块跟其它子分发块有点相似,但是有不完全相同,体现在它多出了一些指令,这些指令实际上本来应该是0x326BF726对应的真实块,貌似又是编译器神奇优化的锅:
图片描述
除了0x403230之外,后续还发现了一些这样的基本块,总共有三个,分别对应326BF726h3311C734h703837C4h
图片描述
图片描述
把这三个“小霸王”也考虑进去:

1
2
additional_map = {0x326BF726: 0x403243, 0x3311C734: 0x4035A1, 0x703837C4: 0x4037A0}
resize_map = {0x403230: 19, 0x40358E: 19, 0x403792: 14}
1
2
3
# Map node onto key
k2n_map = map_k2n()
k2n_map.update(additional_map)
1
2
3
relevant_blocks, useless_blocks = cfg.anaylse_blocks()
for addr in additional_map.values():
    relevant_blocks.append(cfg.get_super_block(addr))
1
2
3
# Nop all useless blocks
for block in useless_blocks:
    fill_nops(block.addr, resize_map[block.addr] if block.addr in resize_map else block.size)

这时修复控制流的结果就正常了。

3.反混淆效果检验

运行爆炸了,在意料之中:
图片描述
IDA打开看看,控制流恢复得还是挺不错的:
图片描述
Binary Ninja查看代码可读性还是很高的,就是漏了一些地方:
图片描述
与源代码对比:
图片描述 图片描述

4. 完整代码-v2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import angr
import logging
import re
import claripy
from keystone import *
import sys
import time
 
def fill_nops(addr, size):
    offset = addr - base_addr
    content[offset:offset + size] = b'\x90' * size
 
def fill_jmp(src, dest):
    offset = src - base_addr
    if dest != src + 5:
        content[offset] = 0xE9
        content[offset + 1:offset + 5] = (dest - src - 5).to_bytes(4, 'little', signed=True)
    else:
        fill_nops(src, 5)
 
def get_jx_opcode(jx_type):
    ks = Ks(KS_ARCH_X86, KS_MODE_64)
    code, count = ks.asm(f'{jx_type} 0xFFFFFFFF')
    return b''.join(map(lambda x: x.to_bytes(1, sys.byteorder), code[0:2]))
 
def fill_jx(src, dest, cmov_type):
    offset = src - base_addr
    content[offset:offset + 2] = get_jx_opcode(cmov_type.replace('cmov', 'j'))
    content[offset + 2:offset + 6] = (dest - src - 6).to_bytes(4, 'little', signed=True)
 
class SuperBlock:
    def __init__(self, block_addr):
        self.addr = block_addr
        self.blocks = []
        self.size = 0
 
    def insert_block(self, block):
        self.blocks.append(block)
        self.size += block.size
 
    @property
    def terminator(self):
        return self.blocks[-1].capstone.insns[-1]
 
    @property
    def successor(self):
        terminator = self.terminator
        try:
            return int(terminator.op_str, 16)
        except:
            return None
 
    @property
    def insns(self):
        insns = []
        for block in self.blocks:
            insns += block.capstone.insns
        return insns
 
    def __repr__(self):
        return "<SuperBlock %#08x->%#08x>" % (self.addr, self.addr + self.size)
 
    def __hash__(self):
        return hash(('superblock', self.addr))
 
class FunctionCFG:
 
    def __init__(self, proj, func_addr, func_end):
        self.proj = proj
        self.addr = func_addr
        self.size = func_end - func_addr
 
    def get_super_block(self, block_addr):
        super_block = SuperBlock(block_addr)
        cur_addr = block_addr
        while True:
            block = self.proj.factory.block(cur_addr)
            if block.size == 0:
                return super_block
            insns = block.capstone.insns
            super_block.insert_block(block)
            cur_addr += block.size
            if insns[-1].mnemonic.startswith('j') or insns[-1].mnemonic == 'ret'\
                    or insns[-1].address in call_exit:
                return super_block
 
    def dump_all_blocks(self):
        curr_addr = self.addr
        blocks = []
        while curr_addr - self.addr < self.size:
            super_block = self.get_super_block(curr_addr)
            if super_block.size == 0:
                curr_addr += 1
                continue
            blocks.append(super_block)
            curr_addr += super_block.size
        return blocks
 
    def anaylse_blocks(self):
        all_blocks = self.dump_all_blocks()
        relevant_blocks = []
        useless_blocks = []
        for block in all_blocks:
            if block.addr != self.addr and \
                    (block.successor is not None
                    or block.terminator.mnemonic == 'ret'
                    or block.terminator.mnemonic == 'call'
                    or block.addr in additional_map.values()):
                relevant_blocks.append(block)
            elif block.addr != self.addr:
                useless_blocks.append(block)
        return relevant_blocks, useless_blocks
 
def skip_call(state):
    proj.hook(state.addr, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
 
# Map node onto key and return a dict
def map_k2n():
    state = proj.factory.call_state(addr=func_addr)
    # Skip call
    state.inspect.b('call', when=angr.BP_BEFORE, action=skip_call)
    simgr = proj.factory.simgr(state)
    simgr.explore(find=dispatcher_node)
    found = simgr.found[0]
    # Symbolize eax
    key = claripy.BVS('key', 32)
    found.regs.eax = key
    simgr = proj.factory.simgr(found)
    visited = []
    k2n_map = {}
    while len(simgr.active):
        # Filter all visited states to prevent from sticking in a loop
        simgr.move(from_stash='active', to_stash='deadended', filter_func=lambda state: state.addr in visited)
        for active in simgr.active:
            visited.append(active.addr)
            if active.addr in relevant_nodes:
                key_val = active.solver.eval(key)
                k2n_map[key_val] = active.addr
                logger.debug('[*] Mapped node(%#x)->key(%#x) [%d/%d]'
                             % (active.addr, key_val, len(k2n_map), len(relevant_nodes)))
        simgr.step()
    return k2n_map
 
def fix_branch():
    # Nop all useless blocks
    for block in useless_blocks:
        fill_nops(block.addr, resize_map[block.addr] if block.addr in resize_map else block.size)
    relevant_blocks.append(prologue_block)
    for block in relevant_blocks:
        key_reg, sub_key_reg = 'ecx', 'eax'           
        key, sub_key = None, None
        cmov_mnemonic = None
        for insn in block.insns:
            mnemonic, op_str = insn.mnemonic, insn.op_str
            if mnemonic == 'mov':
                if re.match(rf'{key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_key = int(op_str.replace(f'{key_reg}, ', ''), 16)
                    if tmp_key in k2n_map:
                        key = tmp_key
                elif re.match(rf'{sub_key_reg}, 0x[0-9a-f]+', op_str):
                    tmp_sub_key = int(op_str.replace(f'{sub_key_reg}, ', ''), 16)
                    if tmp_sub_key in k2n_map:
                        sub_key = tmp_sub_key
            elif mnemonic.startswith('cmov') and op_str == f'{key_reg}, {sub_key_reg}':
                cmov_mnemonic = mnemonic
        terminator = block.terminator
        if block.addr == prologue_block.addr:
            logger.debug('[*] Progolue node(%#x) has one successor(%#x)' % (block.addr, k2n_map[sub_key]))
            fill_jmp(terminator.address, k2n_map[sub_key])
        elif key is not None and sub_key is None:
            logger.debug('[*] Node(%#x) has one successor(%#x)' % (block.addr, k2n_map[key]))
            fill_jmp(terminator.address, k2n_map[key])
        elif key is not None and sub_key is not None:
            cur_block = block
            while cur_block.successor is not None and cmov_mnemonic is None:
                cur_block = cfg.get_super_block(cur_block.successor)
                for insn in cur_block.insns:
                    mnemonic, op_str = insn.mnemonic, insn.op_str
                    if mnemonic.startswith('cmov') and op_str == f'{key_reg}, {sub_key_reg}':
                        cmov_mnemonic = mnemonic
            if cmov_mnemonic is not None:
                logger.debug('[*] Node(%#x) has two successors(%#x, %#x)' % (block.addr, k2n_map[key], k2n_map[sub_key]))
                fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
                fill_jmp(terminator.address + 6, k2n_map[key])
            else:
                logger.error('[*] Error at block(%#x)' % block.addr)
                exit(0)
        else:
            logger.debug('[*] Node(%#x) has no successor' % block.addr)
 
if __name__ == '__main__':
    # Initilization
    logging.getLogger('cle').setLevel(logging.ERROR)
    logging.getLogger('angr').setLevel(logging.ERROR)
    logger = logging.getLogger('deobfu')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.FileHandler(filename='%s.log' % time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())))
    filename = 'crackme-pls.bin.recover'
    proj = angr.Project(filename)
    base_addr = proj.loader.main_object.mapped_base
    reverse_plt = proj.loader.main_object.reverse_plt
    with open(filename, 'rb') as file:
        content = bytearray(file.read())
    # Prepare sigificant arguments
    func_addr, func_end = 0x402B70, 0x403D81
    call_exit = [0x403D3D, 0x403D7E]
    additional_map = {0x326BF726: 0x403243, 0x3311C734: 0x4035A1, 0x703837C4: 0x4037A0}
    resize_map = {0x403230: 19, 0x40358E: 19, 0x403792: 14}
    # Analyse function CFG in a very crude way
    cfg = FunctionCFG(proj, func_addr, func_end)
    # Get prologue block at func addr
    prologue_block = cfg.get_super_block(func_addr)
    # The successor of prologue block is dispatcher block
    dispatcher_node = prologue_block.successor
    relevant_blocks, useless_blocks = cfg.anaylse_blocks()
    for addr in additional_map.values():
        relevant_blocks.append(cfg.get_super_block(addr))
    # Convert block list to addr list
    relevant_nodes = [block.addr for block in relevant_blocks]
    # Map node onto key
    k2n_map = map_k2n()
    k2n_map.update(additional_map)
    # Fix control flow
    fix_branch()
    with open(filename + '.recover_', 'wb') as file:
        file.write(content)

0x04. 总结

这题的加密逻辑有点复杂,是一个父进程调试子进程的模式,还有其他ELF释放,所以我也没继续分析下去了,感兴趣的可以看看上面贴的WP或者题目的源代码。
至于剩下几个被混淆的函数就比较简单了,跟main函数类似,用v1版本的去混淆脚本能够通杀,所以这里不再赘述。v2版本的代码相当于是在v1的基础上缝缝补补了一些东西,很不优雅,如果大家有更巧妙的思路欢迎交流讨论


[培训]二进制漏洞攻防(第3期);满10人开班;模糊测试与工具使用二次开发;网络协议漏洞挖掘;Linux内核漏洞挖掘与利用;AOSP漏洞挖掘与利用;代码审计。

最后于 2022-1-14 15:19 被34r7hm4n编辑 ,原因:
收藏
点赞10
打赏
分享
最新回复 (6)
雪    币: 8270
活跃值: (4786)
能力值: ( LV4,RANK:45 )
在线值:
发帖
回帖
粉丝
v0id_ 2022-1-14 17:42
2
0
地球人 我的超人
雪    币: 676
活跃值: (857)
能力值: ( LV4,RANK:42 )
在线值:
发帖
回帖
粉丝
trackL 2022-1-18 23:37
3
0
wow感谢分享
雪    币: 172
活跃值: (741)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
4
0
楼主不考虑用ir中间语言的方法去混淆后回编译吗?貌似这种方法更通用且效果更好。
雪    币: 14301
活跃值: (10654)
能力值: ( LV12,RANK:360 )
在线值:
发帖
回帖
粉丝
34r7hm4n 7 2023-7-16 18:34
5
0
wx_ ╰☆L玄V0心E☆╮ 楼主不考虑用ir中间语言的方法去混淆后回编译吗?貌似这种方法更通用且效果更好。
是的。但是从可执行文件->ir中间语言这一步就很麻烦
雪    币: 19244
活跃值: (28872)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
秋狝 2023-7-17 10:02
6
1
感谢分享
雪    币: 172
活跃值: (741)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
7
0
34r7hm4n 是的。但是从可执行文件->ir中间语言这一步就很麻烦
这一步可以用retdec的模块实现,就比较方便呀
游客
登录 | 注册 方可回帖
返回