首页
社区
课程
招聘
[原创]OLLVM Deflattener
2021-7-26 22:23 12933

[原创]OLLVM Deflattener

erfze 活跃值
12
2021-7-26 22:23
12933

文章首发于安全客——链接

0x00 前言

笔者于五月份时遇到几个经控制流平坦化的样本,由于之前没有接触过这方面知识,未深入分析。

 

 

七月初看到一篇博客——MODeflattener - Miasm's OLLVM Deflattener,作者同时在Github上公开其源码,故笔者决定对其一探究竟。该工具使用miasm框架,Quarklab于2014年的博客——Deobfuscation: recovering an OLLVM-protected program中已提到此框架,但其采用符号执行以遍历代码,计算每个Relevant Block目标终点,恢复执行流程。腾讯SRC于2017年的博客——利用符号执行去除控制流平坦化中亦是采用符号执行(基于angr框架),而MODeflattener采用静态分析方式来恢复执行流程。笔者于下文首先结合具体样本分析其源码及各函数功能,之后指出该工具不足之处并提出改进方案。关于涉及到的miasm框架中类,函数,属性及方法可参阅miasm Documentationmiasm源码。LLVM,OLLVM,LLVM Pass及控制流平坦化不再赘述,具体可参阅基于LLVM Pass实现控制流平坦化OLLVM Flattening源码

0x01 源码分析

0x01.1 检测平坦化

1
2
3
4
5
6
7
8
9
10
11
12
def calc_flattening_score(asm_graph):
    score = 0.0
    for head in asm_graph.heads_iter():
        dominator_tree = asm_graph.compute_dominator_tree(head)
        for block in asm_graph.blocks:
            block_key = asm_graph.loc_db.get_offset_location(block.lines[0].offset)
            dominated = set(
                [block_key] + [b for b in dominator_tree.walk_depth_first_forward(block_key)])
            if not any([b in dominated for b in asm_graph.predecessors(block_key)]):
                continue
            score = max(score, len(dominated)/len(asm_graph.nodes()))
    return score

计算每个块支配块数量,除以图中所有块,取最高分——若不低于0.9,证明该函数经过控制流平坦化。如下图:

 

 

块b支配除块a外其余所有块(包含自身),那么其得分为9/10=0.9即最高分。读者可自行查阅Dominator Tree及Predecessors相关数据结构知识,此外不作展开。

0x01.2 获取平坦化相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_cff_info(asmcfg):
    preds = {}
    for blk in asmcfg.blocks:
        offset = asmcfg.loc_db.get_location_offset(blk.loc_key)
        preds[offset] = asmcfg.predecessors(blk.loc_key)
    # pre-dispatcher is the one with max predecessors
    pre_dispatcher = sorted(preds, key=lambda key: len(preds[key]), reverse=True)[0]
    # dispatcher is the one which suceeds pre-dispatcher
    dispatcher = asmcfg.successors(asmcfg.loc_db.get_offset_location(pre_dispatcher))[0]
    dispatcher = asmcfg.loc_db.get_location_offset(dispatcher)
 
    # relevant blocks are those which preceed the pre-dispatcher
    relevant_blocks = []
    for loc in preds[pre_dispatcher]:
        offset = asmcfg.loc_db.get_location_offset(loc)
        relevant_blocks.append(get_block_father(asmcfg, offset))
 
    return relevant_blocks, dispatcher, pre_dispatcher

首先拥有最大数量Predecessor者即为Pre-Dispatcher,而Pre-Dispatcher后继为DispatcherPre-Dispatcher所有前趋均为Relevant Block

1
2
3
dispatcher_blk = main_asmcfg.getby_offset(dispatcher)
dispatcher_first_instr = dispatcher_blk.lines[0]
state_var = dispatcher_first_instr.get_args_expr()[1]

Dispatcher块第一条指令为状态变量:

 

0x01.3 去平坦化

1
2
3
4
5
6
7
8
9
10
11
for addr in relevant_blocks:
        _log.debug("Getting info for relevant block @ %#x"%addr)
        loc_db = LocationDB()
        mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
        mdis.dis_block_callback = stop_on_jmp
        asmcfg = mdis.dis_multiblock(addr)
 
        lifter = machine.lifter_model_call(loc_db)
        ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
        ircfg_simplifier = IRCFGSimplifierCommon(lifter)
        ircfg_simplifier.simplify(ircfg, addr)

遍历每个Relevant Block,为其创建ASM CFG及IR CFG,作者实现了一save_cfg函数可将CFG输出(需要安装Graphviz):

1
2
3
4
5
def save_cfg(cfg, name):
    import subprocess
    open(name, 'w').write(cfg.dot())
    subprocess.call(["dot", "-Tpng", name, "-o", name.split('.')[0]+'.png'])
    subprocess.call(["rm", name])

之后查找所有使用状态变量及定义状态变量语句:

1
2
3
4
5
6
7
8
9
10
11
12
nop_addrs = find_state_var_usedefs(ircfg, state_var)
...
def find_state_var_usedefs(ircfg, search_var):
    var_addrs = set()
    reachings = ReachingDefinitions(ircfg)
    digraph = DiGraphDefUse(reachings)
    # the state var always a leaf
    for leaf in digraph.leaves():
        if leaf.var == search_var:
            for x in (digraph.reachable_parents(leaf)):
                var_addrs.add(ircfg.get_block(x.label)[x.index].instr.offset)
    return var_addrs

 

可以看到DefUse图与语句对应关系:

 

 

查找状态变量所有可能值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def find_var_asg(ircfg, var):
    val_list = []
    res = {}
    for lbl, irblock in viewitems(ircfg.blocks):
        for assignblk in irblock:
            result = set(assignblk).intersection(var)
            if not result:
                continue
            else:
                dst, src = assignblk.items()[0]
                if isinstance(src, ExprInt):
                    res['next'] = int(src)
                    val_list += [int(src)]
                elif isinstance(src, ExprSlice):
                    phi_vals = get_phi_vars(ircfg)
                    res['true_next'] = phi_vals[0]
                    res['false_next'] = phi_vals[1]
                    val_list += phi_vals
    return res, val_list

为状态变量赋值有两种情况——一种是直接赋值,不涉及条件判断及分支跳转,其对应ExprInt

 

 

 

另一种是CMOV条件赋值,其对应ExprSlice

 

 

 

之后计算每一状态变量可能值对应Relevant Block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# get the value for reaching a particular relevant block
for lbl, irblock in viewitems(main_ircfg.blocks):
    for assignblk in irblock:
        asg_items = assignblk.items()
        if asg_items:    # do not enter if nop
            dst, src = asg_items[0]
            if isinstance(src, ExprOp):
                if src.op == 'FLAG_EQ_CMP':
                    arg = src.args[1]
                    if isinstance(arg, ExprInt):
                        if int(arg) in val_list:
                            cmp_val = int(arg)
                            var, locs = irblock[-1].items()[0]
                            true_dst = main_ircfg.loc_db.get_location_offset(locs.src1.loc_key)
                            backbone[hex(cmp_val)] = hex(true_dst)

如下图:

 

 

fixed_cfg中存储的状态变量值替换为目标Relevant Block地址:

1
2
3
4
5
6
7
8
9
10
11
for offset, link in fixed_cfg.items():
        if 'cond' in link:
            tval = fixed_cfg[offset]['true_next']
            fval = fixed_cfg[offset]['false_next']
            fixed_cfg[offset]['true_next'] = backbone[tval]
            fixed_cfg[offset]['false_next'] = backbone[fval]
        elif 'next' in link:
            fixed_cfg[offset]['next'] = backbone[link['next']]
        else:
            # the tail doesn't has any condition
            tail = int(offset, 16)

将所有无关指令替换为NOP:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
for addr in rel_blk_info.keys():
        _log.info('=> cleaning relevant block @ %#x' % addr)
        asmcfg, nop_addrs = rel_blk_info[addr]
        link = fixed_cfg[hex(addr)]
        instrs = [instr for blk in asmcfg.blocks for instr in blk.lines]
        last_instr = instrs[-1]
        end_addr = last_instr.offset + last_instr.l
        # calculate original length of block before patching
        orig_len = end_addr - addr
        # nop the jmp to pre-dispatcher
        nop_addrs.add(last_instr.offset)
        _log.debug('nop_addrs: ' + ', '.join([hex(addr) for addr in nop_addrs]))
        patch = patch_gen(instrs, asmcfg.loc_db, nop_addrs, link)
        patch = patch.ljust(orig_len, b"\x90")
        patches[addr] = patch
        _log.debug('patch generated %s\n' % encode_hex(patch))
 
    _log.info(">>> NOPing Backbone (%#x - %#x) <<<" % (backbone_start, backbone_end))
    nop_len = backbone_end - backbone_start
    patches[backbone_start] = b"\x90" * nop_len

其中patch_gen函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def patch_gen(instrs, loc_db, nop_addrs, link):
    final_patch = b""
    start_addr = instrs[0].offset
 
    for instr in instrs:
        #omitting useless instructions
        if instr.offset not in nop_addrs:
            if instr.is_subcall():
                #generate asm for fixed calls with relative addrs
                patch_addr = start_addr + len(final_patch)
                tgt = loc_db.get_location_offset(instr.args[0].loc_key)
                _log.info("CALL %#x" % tgt)
                call_patch_str = "CALL %s" % rel(tgt, patch_addr)
                _log.debug("call patch : %s" % call_patch_str)
                call_patch = asmb(call_patch_str, loc_db)
                final_patch += call_patch
                _log.debug("call patch asmb : %s" % encode_hex(call_patch))
            else:
                #add the original bytes
                final_patch += instr.b
 
    patch_addr = start_addr + len(final_patch)
    _log.debug("jmps patch_addr : %#x", patch_addr)
    jmp_patches = b""
    # cleaning the control flow by patching with real jmps locs
    if 'cond' in link:
        t_addr = int(link['true_next'], 16)
        f_addr = int(link['false_next'], 16)
        jcc = link['cond'].replace('CMOV', 'J')
        _log.info("%s %#x" % (jcc, t_addr))
        _log.info("JMP %#x" % f_addr)
 
        patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))
        jmp_patches += asmb(patch1_str, loc_db)
        patch_addr += len(jmp_patches)
 
        patch2_str = "JMP %s" % (rel(f_addr, patch_addr))
        jmp_patches += asmb(patch2_str, loc_db)
        _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))
 
    else:
        n_addr = int(link['next'], 16)
        _log.info("JMP %#x" % n_addr)
 
        patch_str = "JMP %s" % rel(n_addr, patch_addr)
        jmp_patches = asmb(patch_str, loc_db)
 
        _log.debug("jmp patches : %s" % patch_str)
 
    _log.debug("jmp patches asmb : %s" % encode_hex(jmp_patches))
    final_patch += jmp_patches
 
    return final_patch

首先检查每个块中是否存在函数调用,如果存在,计算其修补后偏移;否则直接使用原指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
   if instr.is_subcall():
    #generate asm for fixed calls with relative addrs
    patch_addr = start_addr + len(final_patch)
    tgt = loc_db.get_location_offset(instr.args[0].loc_key)
    _log.info("CALL %#x" % tgt)
    call_patch_str = "CALL %s" % rel(tgt, patch_addr)
    _log.debug("call patch : %s" % call_patch_str)
    call_patch = asmb(call_patch_str, loc_db)
    final_patch += call_patch
    _log.debug("call patch asmb : %s" % encode_hex(call_patch))
else:
    #add the original bytes
    final_patch += instr.b

之后修复执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# cleaning the control flow by patching with real jmps locs
if 'cond' in link:
    t_addr = int(link['true_next'], 16)
    f_addr = int(link['false_next'], 16)
    jcc = link['cond'].replace('CMOV', 'J')
    _log.info("%s %#x" % (jcc, t_addr))
    _log.info("JMP %#x" % f_addr)
 
    patch1_str = "%s %s" % (jcc, rel(t_addr, patch_addr))
    jmp_patches += asmb(patch1_str, loc_db)
    patch_addr += len(jmp_patches)
 
    patch2_str = "JMP %s" % (rel(f_addr, patch_addr))
    jmp_patches += asmb(patch2_str, loc_db)
    _log.debug("jmp patches : %s; %s" % (patch1_str, patch2_str))
 
else:
    n_addr = int(link['next'], 16)
    _log.info("JMP %#x" % n_addr)
 
    patch_str = "JMP %s" % rel(n_addr, patch_addr)
    jmp_patches = asmb(patch_str, loc_db)

如果是条件赋值,替换为条件跳转;否则直接替换为JMP指令。

 

至此,源码分析结束。如下图所示,可以看到其效果很不错:

 

0x02 不足及改进

0x02.1 不支持PE文件

其计算基址是通过访问sh属性:

1
2
3
section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh
bin_base_addr = section_ep.addr - section_ep.offset
_log.info('bin_base_addr: %#x' % bin_base_addr)

 

故笔者首先增加一参数baseaddr

1
parser.add_argument('-b',"--baseaddr", help="file base address")

之后判断文件类型:

1
2
3
4
5
6
7
8
9
10
11
try:
    if args.baseaddr:
        _log.info('Base Address:'+args.baseaddr)
        baseaddr=int(args.baseaddr,16)
    elif cont.executable.isPE():
        baseaddr=0x400C00
        _log.info('Base Address:%x'%baseaddr)
except AttributeError:
    section_ep = cont.bin_stream.bin.virt.parent.getsectionbyvad(cont.entry_point).sh
    baseaddr = section_ep.addr - section_ep.offset
    _log.info('Base Address:%x'%baseaddr)

如果是PE文件且未提供baseaddr参数,则直接给其赋值为0x400C00;若是ELF文件,则访问sh属性。

0x02.2 获取CMOV指令

1
2
3
4
5
6
7
8
9
elif len(var_asg) > 1:
            #extracting the condition from the last 3rd line
            cond_mnem = last_blk.lines[-3].name
            _log.debug('cond used: %s' % cond_mnem)
            var_asg['cond'] = cond_mnem
            var_asg['true_next'] = hex(var_asg['true_next'])
            var_asg['false_next'] = hex(var_asg['false_next'])
            # map the conditions and possible values dictionary to the cfg info
            fixed_cfg[hex(addr)] = var_asg

通常CMOV指令位于倒数第三条语句,但笔者在测试时发现部分块CMOV指令位于倒数第四条语句:

 

 

故改进如下:

1
2
3
4
5
6
7
8
9
10
11
elif len(var_asg) > 1:
            #extracting the condition from the last 3rd line
            cond_mnem = last_blk.lines[-3].name
            _log.debug('cond used: %s' % cond_mnem)
            if cond_mnem=='MOV':
                cond_mnem = last_blk.lines[-4].name
            var_asg['cond'] = cond_mnem
            var_asg['true_next'] = hex(var_asg['true_next'])
            var_asg['false_next'] = hex(var_asg['false_next'])
            # map the conditions and possible values dictionary to the cfg info
            fixed_cfg[hex(addr)] = var_asg

0x02.3 调用导入函数

1
2
3
4
5
6
7
8
9
10
if instr.is_subcall():
                #generate asm for fixed calls with relative addrs
                patch_addr = start_addr + len(final_patch)
                tgt = loc_db.get_location_offset(instr.args[0].loc_key)
                _log.info("CALL %#x" % tgt)
                call_patch_str = "CALL %s" % rel(tgt, patch_addr)
                _log.debug("call patch : %s" % call_patch_str)
                call_patch = asmb(call_patch_str, loc_db)
                final_patch += call_patch
                _log.debug("call patch asmb : %s" % encode_hex(call_patch))

仅判断是否为函数调用,但并未判断调用函数地址是否为导入函数地址,如下图两种情形所示:

 

 

 

改进如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if instr.is_subcall():
                if isinstance(instr.args[0],ExprMem) or isinstance(instr.args[0],ExprId):
                    final_patch += instr.b
                else:
                    #generate asm for fixed calls with relative addrs
                    patch_addr = start_addr + len(final_patch)
                    tgt = loc_db.get_location_offset(instr.args[0].loc_key)
                    _log.info("CALL %#x" % tgt)
                    call_patch_str = "CALL %s" % rel(tgt, patch_addr)
                    _log.debug("call patch : %s" % call_patch_str)
                    call_patch = asmb(call_patch_str, loc_db)
                    final_patch += call_patch
                    _log.debug("call patch asmb : %s" % encode_hex(call_patch))

0x02.4 get_phi_vars

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def get_phi_vars(ircfg):
    res = []
    blks = list(ircfg.blocks)
    irblock = (ircfg.blocks[blks[-1]])
 
    if irblock_has_phi(irblock):
        for dst, sources in viewitems(irblock[0]):
            phi_vars = sources.args
            parent_blks = get_phi_sources_parent_block(
                ircfg,
                irblock.loc_key,
                phi_vars
            )
 
    for var, loc in parent_blks.items():
        irblock = ircfg.get_block(list(loc)[0])
        for asg in irblock:
            dst, src = asg.items()[0]
            if dst == var:
                res += [int(src)]
 
    return res

未考虑到由局部变量存储状态变量值情形:

1
2
3
4
5
6
7
8
9
mov     eax, 0B3584423h
mov     ecx, 0CAE581F5h
...
test    cl, 1
mov     r9d, [rbp+18Ch]
mov     r14d, [rbp+17Ch]
cmovnz  r9d, r14d
mov     [rbp+230h], r9d
jmp     loc_4044AE

如果是上述情形,状态变量值往往位于头节点中,故改进如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def get_phi_vars(ircfg,loc_db,mdis):
    res = []
    blks = list(ircfg.blocks)
    irblock = (ircfg.blocks[blks[-1]])
 
    if irblock_has_phi(irblock):
        for dst, sources in viewitems(irblock[0]):
            phi_vars = sources.args
            parent_blks = get_phi_sources_parent_block(
                ircfg,
                irblock.loc_key,
                phi_vars
            )
 
    for var, loc in parent_blks.items():
        irblock = ircfg.get_block(list(loc)[0])
        for asg in irblock:
            dst, src = asg.items()[0]
            if dst == var and isinstance(src,ExprInt):
                res += [int(src)]
            elif dst==var and isinstance(src,ExprOp):
                reachings = ReachingDefinitions(ircfg)
                digraph = DiGraphDefUse(reachings)
                # the state var always a leaf
                for head in digraph.heads():
                    if head.var == src.args[0]:
                        head_offset=loc_db.get_location_offset(head.label)
                        if isinstance(mdis.dis_instr(head_offset).args[1],ExprInt):
                            res+=[int(mdis.dis_instr(head_offset).args[1])]
 
    return res

0x03 参阅链接


[培训]二进制漏洞攻防(第3期);满10人开班;模糊测试与工具使用二次开发;网络协议漏洞挖掘;Linux内核漏洞挖掘与利用;AOSP漏洞挖掘与利用;代码审计。

最后于 2021-7-27 22:43 被erfze编辑 ,原因:
收藏
点赞8
打赏
分享
最新回复 (13)
雪    币: 6
活跃值: (1010)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
菜鸟也想飞 2021-7-27 09:41
2
0
感谢分享
雪    币: 576
活跃值: (2035)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
kakasasa 2021-7-28 18:42
3
0
感谢分享,有需要再研究
雪    币: 52
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
橄榄绿o 2021-7-28 19:22
4
0
感谢分享
雪    币: 3516
活跃值: (17989)
能力值: ( LV12,RANK:277 )
在线值:
发帖
回帖
粉丝
0x指纹 5 2021-7-28 21:39
5
0
感谢分享
雪    币: 2663
活跃值: (5215)
能力值: ( LV10,RANK:177 )
在线值:
发帖
回帖
粉丝
YenKoc 2 2021-7-28 21:52
6
0
感谢分享
雪    币: 8731
活跃值: (5493)
能力值: ( LV13,RANK:296 )
在线值:
发帖
回帖
粉丝
sunfishi 4 2021-7-28 23:31
7
0
感谢分享
雪    币: 3708
活跃值: (3884)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
caolinkai 2021-7-29 09:05
8
0
感谢分享
雪    币: 12700
活跃值: (16317)
能力值: (RANK:730 )
在线值:
发帖
回帖
粉丝
有毒 10 2021-7-29 09:08
9
0
这可以啊,感谢分享
雪    币: 6790
活跃值: (8540)
能力值: ( LV17,RANK:797 )
在线值:
发帖
回帖
粉丝
无名侠 12 2021-7-29 14:09
10
1
说实话,这些文章都解决的是最理想情况的混淆,然而现实很残酷
雪    币: 777
活跃值: (1066)
能力值: ( LV5,RANK:78 )
在线值:
发帖
回帖
粉丝
bambooqj 2021-7-29 18:37
11
0
无名侠 说实话,这些文章都解决的是最理想情况的混淆,然而现实很残酷
确实
雪    币: 6
活跃值: (980)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
lookzo 2021-7-29 20:40
12
0
ls两位,分享一下现实中的混淆如何解决吧
雪    币: 1331
活跃值: (9456)
能力值: ( LV12,RANK:650 )
在线值:
发帖
回帖
粉丝
erfze 12 2021-7-31 22:47
14
0
无名侠 说实话,这些文章都解决的是最理想情况的混淆,然而现实很残酷
确实是这样,遇到过的样本少。只不过这个脚本恰好可以解决我当时的需求
游客
登录 | 注册 方可回帖
返回