首页
社区
课程
招聘
[原创]某BR间接跳转反混淆
发表于: 2024-9-29 01:42 1753

[原创]某BR间接跳转反混淆

2024-9-29 01:42
1753

分析

与常见的br跳转不同的是这里使用了2个condition select

jcc跳转

图片描述
先是,根据cmp的结果通过condition select instruction (0x27B78这里是CSEL)得到一个下标,然后是一个存放偏移地址的table(off_1F4340),根据上面得到的下标从table取出地址,再一次根据前面cmp的结果选择一个常量与table里取出的地址相加减运算得到最终的跳转地址。
图片描述

从上面分析可以看到,两个condition select都是使用了同一个cmp结果,也就是说下标和常量是配对的,最终会根据不同结果得到两个不同的地址,这里就是模拟了jcc。

还有一种情况是将上面condition select得到的运算的常量进行了一次或多次加密,在参与运算前会解密这些常量,使用的算法是异或。比如下图是某函数的开始位置,通过memcpy从qword_1980B0复制了一段数据到栈上
图片描述
调用函数完成第一次解密
图片描述
图片描述
间接跳转的时候就会取出第二次解密计算
图片描述
也有一种解密方式是直接把解密函数内联优化了,直接在函数内通过condition branch完成。
图片描述

jmp跳转

另一种br间接跳转是模拟了无条件jmp,但是也是通过table[index] +/- value得到目标地址
图片描述

最后观察其他的br,可以看到每个function都有自己的跳转地址的table,并且在entry block赋值,这一点对后面一些特殊情况需要手动计算时比较有帮助。

反混淆思路

  1. 找出所有的br指令位置,判断br前的几个指令是否符合间接跳转的几个特征。识别br跳转的类型是jcc还是jmp并记录跳转的目标地址,如果是jcc还需要记录br之前第一个set condition的位置、 condition select使用的condition,如EQ、NE。
  2. 从函数entry block开始模拟执行,在set condition也就是cmp后面的位置修改条件命中的状态,修改命中状态之前先保存当前模拟器的完整context(包括寄存器和map的mem),之后分别在两种不同状态继续向下执行,对于条件跳转指令也是保存context后对两个分支分别执行。
  3. 得到所有分支的目标地址后进行patch。大部分跳转部分的代码都是计算好跳转目标后就直接br跳转了,中间的代码不会破坏标志位寄存器,所以可以直接用记录的condition重新patch一个条件跳转。patch实现主要难点在jcc部分,需要处理不同condition跳到不同地址。可以新建一个段专门存放各个分发部分的代码,br那里patch直接跳转到对应分发部分就好,也可以直接操作反编译器的ir完成。剩下的一小部分是上面带有常量解密部分的,先计算好跳转目标,之后去解密了一部分常量,解密过程会破坏标记位寄存器,不过好在只有几个位置是这种情况,直接nop掉解密常量的部分继续使用上面方法就好。

代码实现

提取br信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def _getBlockByAddr(addr, flowchart):
    bb: BasicBlock
    for bb in flowchart:
        if (addr >= bb.start_ea and addr < bb.end_ea) or addr == bb.start_ea:
            return bb
    return None
 
def process_br(addr):
    function = idaapi.get_func(addr)
    flowchart = list(idaapi.FlowChart(function))
    basic_block = _getBlockByAddr(addr, flowchart)
 
    instruction = idaapi.insn_t()
    idaapi.decode_insn(instruction, addr)
    branch_register = instruction.Op1.reg
 
    instruction_addr = addr
    instruction_count = (basic_block.end_ea - basic_block.start_ea) // 4
    if instruction_count <= 2# 一些特殊情况导致basicblock被分开
        while instruction_addr >= basic_block.start_ea and instruction_addr <= basic_block.end_ea:
            instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
        basic_block = _getBlockByAddr(instruction_addr, flowchart)
 
    if basic_block is None:
        return None
 
    is_entry_block = False
    for (start_addr, end_addr) in func_range:
        if basic_block.start_ea == start_addr:
            is_entry_block = True
            func_start_br.append((start_addr, end_addr))
            break
 
    instruction_addr = addr
    count = 0
    jcc_phase = 0
    jmp_phase = 0
    is_jcc = False
    is_jmp = False
    condition_instructions = []
    jcc_conditions = []
    jcc_base_register = ""
    jmp_base_register = ""
    calc_used_registers = set()
    cmp_address = 0
 
    '''
    jcc:
        CMP             X8, X9
        MOV             W8, #8
        MOV             W9, #0x90
        CSEL            X8, X9, X8, EQ
        LDR             X8, [X27,X8]
        MOV             W10, #0xB5AF 
        MOV             W11, #0xA4ED
        CSEL            X10, X11, X10, EQ
        SUB             X8, X8, X10 
        BR              X8           
 
    jmp:
        LDR             X8, [X24,#0xe8]
        MOV             X9, #0xFFFFFFFFFFFF7B09
        ADD             X8, X8, X9
        BR              X8
    '''
    while count < 50:
        instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
        count += 1
 
        instruction = idaapi.insn_t()
        length = idaapi.decode_insn(instruction, instruction_addr)
 
        mnemonic = idc.print_insn_mnem(instruction_addr).lower()
 
        if mnemonic in ["csel", "csinc", "csinv", "csneg", "cset", "csetm", "cinc", "cinv", "cneg"]:
            condition_instructions.append(GetDisasm(instruction_addr))
            jcc_conditions.append(GetDisasm(instruction_addr).split(", ")[-1])
            if jcc_phase == 1:
                jcc_phase += 1
            elif jcc_phase == 3:
                is_jcc = True
 
        elif mnemonic == "ldr":
            if instruction.Op1.type == o_reg and instruction.Op1.reg in calc_used_registers:
                if jcc_phase == 2:
                    jcc_phase += 1
                    jcc_base_register = reg_name[instruction.Op2.phrase]
                if jmp_phase == 2:
                    is_jmp = True
                    jmp_base_register = reg_name[instruction.Op2.phrase]
 
        elif mnemonic in ["br", "blr", "bl", "b", "ret"]:
            break
 
        elif mnemonic in ["add", "sub"]:
            if instruction.Op1.type == instruction.Op2.type == o_reg and instruction.Op1.reg == branch_register and instruction.Op3.type in [o_reg, o_idpspec0]:
                if jmp_phase == 0:
                    jmp_phase += 1
                    jmp_calc_used_register = instruction.Op3.reg
                    calc_used_registers.add(instruction.Op1.reg)
                    calc_used_registers.add(instruction.Op2.reg)
                if jcc_phase == 0:
                    jcc_phase += 1
                    calc_used_registers.add(instruction.Op1.reg)
                    calc_used_registers.add(instruction.Op2.reg)
 
        elif mnemonic == "mov":
            if jmp_phase == 1 and instruction.Op2.type == o_imm and instruction.Op1.type == o_reg and instruction.Op1.reg == jmp_calc_used_register:
                jmp_phase += 1
 
        elif spoils_flags(instruction):
            if is_jcc and cmp_address == 0:
                cmp_address = instruction_addr
 
    if is_jcc:
        if jcc_conditions[0] == jcc_conditions[1]:
            print(f"jcc br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jcc_base_register}  cmp addr: {hex(cmp_address)}")
        else: # 特殊情况,两个condtion select使用的条件不同
            print(f"jcc br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jcc_base_register}  condition not valid!  condition: {jcc_conditions}  condition_instructions: {condition_instructions}")
    elif is_jmp:
        print(f"jmp br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jmp_base_register}")
    else:
        print(f"Invalid br addr: {hex(addr)}")

模拟执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class BranchInstruction:
    def __init__(self, address, branch_type, compare_address=None, condition=None, function_index=None):
        self.address = address
        self.branch_type = branch_type
        self.compare_address = compare_address
        self.condition = condition
        self.taken_branch_address = None
        self.not_taken_branch_address = None
        self.function_index = function_index
 
branch_info = dict()
 
branch_data = [
    {"branch_address": 0x27b9c, "compare_address": 0x27b70, "condition": "EQ", "branch_type": "jcc"},
    {"branch_address": 0x27be4, "branch_type": "jmp"},
    # ... 其他数据保持不变
]
 
breakpoint_addresses_branch = []
breakpoint_addresses_compare = []
all_conditions = set()
 
compare_address_lookup = dict()
function_ranges = [(162512, 164428), (182320, 185084), (220224, 220252), (294424, 294440), (300112, 300140),
                   (308092, 309132), (368928, 368976), ..., (0, 0)]
 
                       
def set_condition_flags(uc, condition, condition_hit=True):
    nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
 
    # Helper functions to set or clear specific bits in NZCV
    def set_bit(value, bit):
        return value | (1 << bit)
 
    def clear_bit(value, bit):
        return value & ~(1 << bit)
 
    # N: Negative, Z: Zero, C: Carry, V: Overflow
    N_BIT = 31
    Z_BIT = 30
    C_BIT = 29
    V_BIT = 28
 
    if condition == "EQ"# Equal (Z set)
        if condition_hit:
            nzcv = set_bit(nzcv, Z_BIT)
        else:
            nzcv = clear_bit(nzcv, Z_BIT)
    elif condition == "NE"# Not Equal (Z clear)
        if condition_hit:
            nzcv = clear_bit(nzcv, Z_BIT)
        else:
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "GT"# Greater Than (Z clear and N == V)
        if condition_hit:
            nzcv = clear_bit(nzcv, Z_BIT)
            nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
        else:
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "LT"# Less Than (N != V)
        if condition_hit:
            nzcv = set_bit(nzcv, N_BIT) if not (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
        else:
            nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
    elif condition == "HI"# Higher (C set and Z clear)
        if condition_hit:
            nzcv = set_bit(nzcv, C_BIT)
            nzcv = clear_bit(nzcv, Z_BIT)
        else:
            nzcv = clear_bit(nzcv, C_BIT)
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "CC"# Carry Clear (C clear)
        if condition_hit:
            nzcv = clear_bit(nzcv, C_BIT)
        else:
            nzcv = set_bit(nzcv, C_BIT)
    else:
        raise ValueError(f"Unsupported condition: {condition}")
 
    uc.reg_write(arm64_const.UC_ARM64_REG_NZCV, nzcv)
 
 
def is_condition_met(uc, condition):
    nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
 
    # Helper functions to check specific bits in NZCV
    def is_bit_set(value, bit):
        return (value >> bit) & 1
 
    # N: Negative, Z: Zero, C: Carry, V: Overflow
    N_BIT = 31
    Z_BIT = 30
    C_BIT = 29
    V_BIT = 28
 
    if condition == "EQ"# Equal (Z set)
        return is_bit_set(nzcv, Z_BIT)
    elif condition == "NE"# Not Equal (Z clear)
        return not is_bit_set(nzcv, Z_BIT)
    elif condition == "GT"# Greater Than (Z clear and N == V)
        return not is_bit_set(nzcv, Z_BIT) and (is_bit_set(nzcv, N_BIT) == is_bit_set(nzcv, V_BIT))
    elif condition == "LT"# Less Than (N != V)
        return is_bit_set(nzcv, N_BIT) != is_bit_set(nzcv, V_BIT)
    elif condition == "HI"# Higher (C set and Z clear)
        return is_bit_set(nzcv, C_BIT) and not is_bit_set(nzcv, Z_BIT)
    elif condition == "CC"# Carry Clear (C clear)
        return not is_bit_set(nzcv, C_BIT)
    else:
        raise ValueError(f"Unsupported condition: {condition}")
 
 
def is_valid_address(addr, segment_name=".text"):
    seg = get_segm_by_name(segment_name)
    return addr >= seg.start_ea and addr <= seg.end_ea
 
 
def instruction_hook(uc, address, size, userData):
    print(">>> Tracing instruction at 0x%x" % (address))
 
    eh: flare_emu.EmuHelper = userData["EmuHelper"]
    uc: unicorn.Uc
 
    # 特殊处理
    if address == 0xB9624:
        print("memcpy api hook")
        eh._handleApiHooks(address, eh.getArgv(), "memcpy", userData)
        uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
        return
 
    if address == 0xB96A4 or address == 0xBB18C or address in range(0xBA2E8, 0x0BA338):
        print("Consts Decrypt func execute")
        return
 
    if not is_valid_address(address):
        print("Error: Invalid address detected. Stopping emulation.")
        print(traceback.print_stack())
        uc.emu_stop()
        return
 
    if idc.print_insn_mnem(address) in ["BL", "BLR", "BLX"]:
        print("Info: Skipping function call.")
        uc.reg_write(arm64_const.UC_ARM64_REG_X0, 1)
        uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
 
    if idc.print_insn_mnem(address) == "RET":
        print("Info: Function returned. Stopping emulation.")
        uc.emu_stop()
        return
 
    if idc.print_insn_mnem(address)[:2] == "B." and address not in compare_visited:
        print(f"Info: Conditional branch detected at 0x{address:x}")
        compare_visited.append(address)
        current_state: unicorn.UcContext = uc.context_save()
        instruction = idaapi.insn_t()
        idaapi.decode_insn(instruction, address)
        current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
        print(f"    Emulating branch condition: taken (0x{instruction.Op1.addr:x})")
        current_emu_helper.emulateRange(instruction.Op1.addr, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                        uc_context=current_state)
 
        current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
        print(f"    Emulating branch condition: not taken (0x{address + 4:x})")
        current_emu_helper.emulateRange(address + 4, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                        uc_context=current_state)
 
        emu_helper.stopEmulation(user_data)
        return
 
    if address in breakpoint_addresses_compare:
        branch_address = compare_address_lookup[address]
        condition = branch_info[branch_address].condition
        print(f"Info: Compare instruction at 0x{address:x}, Condition: {condition}")
        breakpoint_addresses_compare.remove(address)
        start_pc = address + 4
        current_state: unicorn.UcContext = uc.context_save()
 
        if branch_info[branch_address].taken_branch_address is None:
            print(f"Info: Starting emulation with condition taken at 0x{start_pc:x}")
            current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
            set_condition_flags(current_state, condition, True)
            current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                            uc_context=current_state)
 
        if branch_info[branch_address].not_taken_branch_address is None:
            print(f"Info: Starting emulation with condition not taken at 0x{start_pc:x}")
            current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
            set_condition_flags(current_state, condition, False)
            current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                            uc_context=current_state)
 
        emu_helper.stopEmulation(user_data)
        return
    elif address in breakpoint_addresses_branch:
        instruction = idaapi.insn_t()
        idaapi.decode_insn(instruction, address)
        register_name = regid_2_name[instruction.Op1.reg]
        register_value = uc.reg_read(unicorn_reg_map[register_name])
        print(f"Info: Executing branch instruction at 0x{address:x}, {register_name} = 0x{register_value:x}")
        if branch_info[address].branch_type == "jcc":
            if is_condition_met(uc, branch_info[address].condition) and branch_info[address].taken_branch_address is None:
                if register_value in range(function_ranges[branch_info[address].function_index][0],
                                           function_ranges[branch_info[address].function_index][1]):
                    print(f"Info: Conditional jump (JCC) condition taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
                    branch_info[address].taken_branch_address = register_value
            elif not is_condition_met(uc, branch_info[address].condition) and branch_info[address].not_taken_branch_address is None:
                if register_value in range(function_ranges[branch_info[address].function_index][0],
                                           function_ranges[branch_info[address].function_index][1]):
                    print(f"Info: Conditional jump (JCC) condition not taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
                    branch_info[address].not_taken_branch_address = register_value
            if branch_info[address].taken_branch_address is not None and branch_info[address].not_taken_branch_address is not None:
                print(
                    f"Info: All conditional jump (JCC) paths processed at 0x{address:x}. "
                    f"Taken path: 0x{branch_info[address].taken_branch_address:x}, "
                    f"Not taken path: 0x{branch_info[address].not_taken_branch_address:x}")
                breakpoint_addresses_branch.remove(address)
 
        elif branch_info[address].branch_type == "jmp" and branch_info[address].taken_branch_address is None:
            if register_value in range(function_ranges[branch_info[address].function_index][0],
                                       function_ranges[branch_info[address].function_index][1]):
                branch_info[address].taken_branch_address = register_value
                print(f"Info: Unconditional jump (JMP) executed at 0x{address:x}, {register_name} = 0x{register_value:x}")
                breakpoint_addresses_branch.remove(address)
 
 
for (function_start, function_end) in function_ranges[:-1]:
    print(f"Info: Emulating function from 0x{function_start:x} to 0x{function_end:x}")
    emu_helper = flare_emu.EmuHelper(verbose=1)
    try:
        emu_helper.emulateRange(function_start, function_end - 4, instructionHook=instruction_hook, count=100)
        print("Info: Function emulation completed successfully.")
    except Exception as e:
        print(f"Error: Exception occurred during emulation: {e}")

需要注意的几个地方:

  1. unicorn执行arm64 atomic instructions会出错,在unicorn中直接hook跳过这条指令
    图片描述
    图片描述
  2. 两个condition select使用不同condition的情况,两个condition刚好是相反的,识别时对不同的swap一下
    图片描述
  3. unicorn的save_context默认只能保存寄存器状态,mem map的保存可以借助flare_emu的emuHelper完成
  4. 部分函数的常量解密部分的代码要正常执行。
  5. 如果br间接跳转会跳回当前basic block,那么模拟执行部分会出现死循环,不过好在没有跳回的情况出现。

反混淆效果

图片描述


[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

收藏
免费 5
支持
分享
最新回复 (5)
雪    币: 406
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
2
大佬,能给个案例so或者链接吗
2024-10-2 19:13
0
雪    币: 171
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
3
sinker_ 大佬,能给个案例so或者链接吗

附件

上传的附件:
2024-10-3 14:19
0
雪    币: 1925
活跃值: (2332)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
4
感谢分享
2024-10-3 17:42
0
雪    币: 406
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
5
k423 附件
感谢大佬分享
2024-10-5 14:04
0
雪    币: 6
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
6
大佬,很强,刚好学习了
2天前
0
游客
登录 | 注册 方可回帖
返回
//