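Deobfuscating non-standard OLLVM-fla with angr and IDAPython
This article combines the angr framework with IDAPython scripts to analyze and remove non-standard OLLVM-fla control-flow flattening, and explains the relevant angr and IDAPython APIs along the way.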
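Defeating fla obfuscation takes three steps:
1.Find all real blocks of the function
2.Find the links between the real blocks
3.Rebuild the control flow between the real blocks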
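Steps 1 and 3 are relatively simple, and the tool is a matter of taste; angr would work for them too. I like to watch the blocks change in real time while analyzing fla, so I handle these two steps with IDAPython scripts.
The focus here is step 2. Once we have all the real blocks, how do we find the links between them? With light obfuscation you can patch the branches by hand. But how would you handle obfuscation like the one below?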

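This is where a powerful angr feature comes in: symbolic execution (for angr's internals, please consult other references).
That raises a new question: what is symbolic execution?
It works much like unidbg in that both emulate execution. The difference is that unidbg needs concrete input values, while angr does not.
Take a simple addition function that returns a+b. To emulate it, unidbg must be given concrete values for a and b; with a=1 and b=2, unidbg returns 3.
angr does not need them. It is given the symbols a and b (symbols, not concrete values), and after emulation it outputs the expression a+b.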
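To make the contrast concrete, here is a minimal toy sketch in plain Python. It does not use angr or unidbg; the `Sym` class is a hypothetical stand-in for a symbolic value that records the expression instead of computing a number.

```python
class Sym:
    """Toy symbolic value: records the expression instead of evaluating it."""

    def __init__(self, expr):
        self.expr = expr

    def __add__(self, other):
        rhs = other.expr if isinstance(other, Sym) else str(other)
        return Sym(f"({self.expr} + {rhs})")

    def __radd__(self, other):
        # Called for `concrete + Sym`, e.g. 1 + Sym("b").
        return Sym(f"({other} + {self.expr})")

    def __repr__(self):
        return self.expr


def add(a, b):
    return a + b


# Concrete emulation (the unidbg style): real inputs, real result.
concrete_result = add(1, 2)
# Symbolic emulation (the angr style): symbols in, formula out.
symbolic_result = add(Sym("a"), Sym("b"))
print(concrete_result, symbolic_result)
```

The same `add` function runs in both modes; only the kind of input changes. A real symbolic engine also tracks the constraints collected at each branch, which this toy leaves out.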
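So we exploit angr's symbolic execution, mainly its path exploration. Execution starts at a real block A, which is marked as the main block, and is allowed to continue. The first real block B it reaches is a successor of A, so the link between A and B has been found. Keep angr's path explosion problem in mind; how to avoid it is covered later.
The function analyzed in this article is a non-standard OLLVM sample with two loop heads.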

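Step 1 of deobfuscation: find all real blocks of the function, using an IDAPython script.
From the loop heads we can directly obtain all the corresponding real blocks.
The loop heads are found with a breadth-first search.
Both standard and non-standard fla have to be handled here.
In non-standard fla, the loop head address and the converge block address are equal.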

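In standard fla, the loop head address and the converge block address differ, and the loop head has exactly two predecessors: the prologue block and the converge block.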

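Given the converge block, the block's preds() method returns all of its predecessors, which are the real blocks. For standard fla, also remember to keep the prologue block, which is one of the loop head's predecessors.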
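The standard and non-standard rules can be tried outside IDA on a toy predecessor map. The sketch below is an assumption-laden model, not the IDAPython script: block names are made-up strings, `preds` is a plain dict, and the "exactly two predecessors" test is the same heuristic described above.

```python
def find_converge(preds, loop_head):
    """Return the converge block of a loop head, given a predecessor map."""
    head_preds = preds.get(loop_head, [])
    if len(head_preds) == 2:
        # Standard layout: prologue + converge block. The converge block is
        # the predecessor that itself has more than one predecessor.
        for pred in head_preds:
            if len(preds.get(pred, [])) > 1:
                return pred
        return None
    # Non-standard layout: real blocks jump straight back to the loop head.
    return loop_head


def real_block_candidates(preds, loop_head):
    """Return (converge block, candidate real blocks) for one loop head."""
    converge = find_converge(preds, loop_head)
    candidates = list(preds[converge])
    if converge != loop_head:
        # Standard layout: keep the prologue (loop head preds minus converge).
        prologue = [p for p in preds[loop_head] if p != converge]
        candidates = prologue + candidates
    return converge, candidates


standard = {"head": ["prologue", "converge"], "converge": ["r1", "r2", "r3"]}
non_standard = {"head": ["prologue", "r1", "r2", "r3"]}
print(real_block_candidates(standard, "head"))
print(real_block_candidates(non_standard, "head"))
```

Both layouts end up with the same candidate list; only the block whose predecessors are inspected differs. A non-standard function whose loop head happens to have exactly two predecessors would be misclassified by this heuristic and needs a manual check.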
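Collecting the real blocks has one pitfall: when several basic blocks end with the same instructions, IDA splits that tail out into a single shared block. If we used the basic block at 0x42288 directly as a real block, some real blocks would be missed and the deobfuscated code would be incomplete. So all predecessors of 0x42288 must be taken as real blocks instead.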
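A small sketch of that expansion on toy data. Only 0x42288 comes from the sample; the other addresses and the `shared_tails` set are hypothetical (the script in this article recognizes a shared tail by its first instruction being CSEL).

```python
def expand_shared_tails(preds, candidates, shared_tails):
    """Replace each shared tail block with its predecessors."""
    real = []
    for block in candidates:
        if block in shared_tails:
            # The tail is shared by several real blocks: take all of them.
            real.extend(preds[block])
        else:
            real.append(block)
    return sorted(set(real))


# Hypothetical layout: 0x42200 and 0x42240 both fall into the tail 0x42288.
preds = {0x42288: [0x42200, 0x42240]}
candidates = [0x42100, 0x42288]
real_blocks = expand_shared_tails(preds, candidates, {0x42288})
print([hex(ea) for ea in real_blocks])
```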

Finding the ret block
For standard fla, we need 0xA66C as the ret block.

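For non-standard fla, we need 0x42AB0 as the ret block. Why not choose 0x42AC4? There are two reasons: 1. Block 0x42AB0 contains variable-initialization instructions, so choosing 0x42AC4 directly would leave real code out of the recovered result. 2. The 0x42AE4 branch has no successor either.
This also explains why standard fla uses 0xA66C as the ret block. If we chose 0x96EC, the 0x9700 branch still has successors, so obfuscated code would remain and the deobfuscation would fail: part of the obfuscation is removed, but the code still cannot be read directly.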



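After the call, all real blocks are marked with a color.
Step 2 of deobfuscation: find the links between all real blocks of the function, using angr.
This is where we deal with the path explosion problem mentioned earlier. It typically appears when a loop contains an if condition. angr splits one path into two, one where the condition is true and one where it is false, and then continues with the loop. On the next iteration the 2 paths become 4, then 8, and so on. The number of paths grows exponentially until it becomes so large that the program hangs.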
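The growth is easy to quantify. The sketch below only does the arithmetic for the idealized case where every iteration forks each path in two and nothing is pruned or merged; `count_paths` is a hypothetical helper, not an angr API.

```python
def count_paths(loop_iterations, branches_per_iteration=1):
    """Number of paths if every branch in every iteration forks each path."""
    paths = 1
    for _ in range(loop_iterations):
        paths *= 2 ** branches_per_iteration
    return paths


for n in (1, 2, 3, 10, 30):
    print(n, count_paths(n))
```

With a single if inside the loop, 30 iterations already give 2**30 paths, which is why running the whole flattened function in one exploration is impractical.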

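real_blocks is the list of all real blocks obtained above.
So my approach is not to run the whole function in one go. Each time I take one real block address, starting with real_blocks[0], as the main block A and let it run. As soon as execution reaches an address B that is in my saved list of real block addresses, I stop and record the link A->B. Then I take real_blocks[1] as the main block, start over, and again stop at the first address found in the real block list. Repeating this avoids the path explosion problem and still yields the links between all real blocks.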
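The per-block restart can be modeled without angr. In the toy below every name and value is made up: each real block lists the state values it may write before jumping back to the dispatcher (two values model a CSEL, one models an unconditional transfer, none models the ret block), and the dispatcher is reduced to a lookup.

```python
# State value -> real block, i.e. what the dispatcher's compare chain decides.
DISPATCH = {0x10: "A", 0x20: "B", 0x30: "C", 0x40: "RET"}

# Real block -> state values it may store before returning to the dispatcher.
NEXT_STATES = {"A": [0x20, 0x30], "B": [0x30], "C": [0x40], "RET": []}


def run_until_next_real_block(state_value):
    """Walk the dispatcher and stop at the first real block that is reached."""
    for case_value, target in DISPATCH.items():
        if state_value == case_value:
            return target
    raise ValueError(f"no dispatcher case for {state_value:#x}")


def recover_links():
    """Start from every real block separately and record its successors."""
    links = {}
    for block, states in NEXT_STATES.items():
        links[block] = [run_until_next_real_block(s) for s in states]
    return links


links = recover_links()
print(links)
```

Because each run ends at the first real block, no run ever goes around the dispatcher loop more than once, so the number of paths per run stays bounded by the branches inside a single real block.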
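In general, programs are loaded with auto_load_libs set to False. If the external libraries were loaded as well, angr would analyze them too, which costs a lot of performance.
The prologue block contains many register assignments. They feed the conditional checks in the basic blocks, where the register values decide which path is taken.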

Typically the check is the last instruction of a basic block, such as B.EQ or B.GT. These blocks are the sub-dispatchers.

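With a hook, when execution reaches the last instruction of the main prologue block, the pc register is set to the address of the real block. This skips a large number of useless instructions, reduces the performance cost, and saves time.
This part handles all real blocks that belong to the main prologue block. The flow is: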

This is the prologue block of the second loop head; from here on it is called the child prologue block.


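The same hook technique is used here to change the pc register, with one extra hook: proj.hook(first_block_last_ins.address, jump_to_child_prologue_address, first_block_last_ins.size). This hook is there to initialize the register values set in the child prologue block, because the child prologue block at 0x42258 also contains register assignments used by the conditional checks.
So the flow here is: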

That completes the analysis of the flow. The script follows, with comments explaining what the relevant code does.

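Step 3 of deobfuscation: rebuild the control flow between the real blocks, using IDAPython.
Rebuilding the control flow handles two cases:
1.Branch jumps with a csel instruction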

2.Unconditional jumps
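Both cases come down to writing new AArch64 branch instructions over the old ones. The sketch below is not the author's script; it only shows how the B and B.cond encodings can be computed in plain Python. The helper names are hypothetical, and `patch_for_csel` assumes the CSEL is immediately followed by the branch back to the dispatcher, which has to be verified per block.

```python
import struct

# AArch64 condition codes used by B.cond.
COND_CODES = {"EQ": 0, "NE": 1, "CS": 2, "HS": 2, "CC": 3, "LO": 3, "MI": 4,
              "PL": 5, "VS": 6, "VC": 7, "HI": 8, "LS": 9, "GE": 10, "LT": 11,
              "GT": 12, "LE": 13}


def encode_b(src, dst):
    """Encode `B dst` placed at address src (imm26, range +/-128 MB)."""
    offset = dst - src
    if offset % 4 or not -(1 << 27) <= offset < (1 << 27):
        raise ValueError("target misaligned or out of range for B")
    return struct.pack("<I", 0x14000000 | ((offset >> 2) & 0x03FFFFFF))


def encode_b_cond(src, dst, cond):
    """Encode `B.<cond> dst` placed at address src (imm19, range +/-1 MB)."""
    offset = dst - src
    if offset % 4 or not -(1 << 20) <= offset < (1 << 20):
        raise ValueError("target misaligned or out of range for B.cond")
    imm19 = (offset >> 2) & 0x7FFFF
    return struct.pack("<I", 0x54000000 | (imm19 << 5) | COND_CODES[cond.upper()])


def patch_for_csel(csel_ea, cond, true_target, false_target):
    """Case 1: B.cond over the CSEL, then B over the instruction after it."""
    return {
        csel_ea: encode_b_cond(csel_ea, true_target, cond),
        csel_ea + 4: encode_b(csel_ea + 4, false_target),
    }


print(encode_b(0x1000, 0x1008).hex())             # case 2: unconditional jump
print(encode_b_cond(0x1000, 0x1008, "EQ").hex())  # case 1: conditional half
```

In IDA the resulting bytes could then be written with something like `ida_bytes.patch_bytes(ea, data)`. For case 2, a single `encode_b` over the block's final branch is enough.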

The script is commented, so the code is pasted here directly.

Looking at the rebuilt result, the deobfuscation has succeeded.

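The scripts above have also been merged into a single one. It is fairly long, so the code is not pasted here; the script file can be downloaded from the GitHub address.
To use it, you only need to provide the function address.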

1.CFG graph


2.CFG graph


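Deobfuscating ollvm-fla comes down to these three steps. The scripts are not universal; for heavily obfuscated targets the relevant functions still need to be extended case by case.
The sample from the previous article can be used for analysis.
Download address for the related files: 15eK9s2c8@1M7s2y4Q4x3@1q4Q4x3V1k6Q4x3V1k6Y4K9i4c8Z5N6h3u0Q4x3X3g2U0L8$3#2Q4x3V1k6B7K9i4g2@1K9h3q4F1y4U0j5$3i4K6u0r3P5r3c8W2k6X3I4S2i4K6u0W2k6$3W2@1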
int add(int a, int b) {
    return a + b;
}
blocks = idaapi.FlowChart(idaapi.get_func(func_ea))
def find_loop_heads(func):
    loop_heads = set()
    queue = deque()
    block = get_block_by_address(func)
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)
            continue
        path = path + [cur_block.start_ea]
        queue.extend((succ, path) for succ in cur_block.succs())
    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()
    return all_loop_heads
def find_converge_addr(loop_head_addr):
    converge_addr = None
    block = get_block_by_address(loop_head_addr)
    preds = block.preds()
    pred_list = list(preds)
    if len(pred_list) == 2:
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:
                converge_addr = pred.start_ea
    else:
        converge_addr = loop_head_addr
    return converge_addr
real_blocks = []
if loop_head_addr != converge_addr:
    loop_head_preds_addr.remove(converge_addr)
    real_blocks.extend(loop_head_preds_addr)
converge_block = get_block_by_address(converge_addr)
list_preds = list(converge_block.preds())
for pred_block in list_preds:
    if pred_block.start_ea == loop_head_addr:
        continue
    end_ea = pred_block.end_ea
    last_ins_ea = idc.prev_head(end_ea)
    mnem = idc.print_insn_mnem(last_ins_ea)
    size = get_basic_block_size(pred_block)
    if size > 4 and "B." not in mnem:
        start_ea = pred_block.start_ea
        mnem = idc.print_insn_mnem(start_ea)
        if mnem == "CSEL":
            csel_preds = pred_block.preds()
            for csel_pred in csel_preds:
                real_blocks.append(csel_pred.start_ea)
        else:
            real_blocks.append(pred_block.start_ea)
start_ea = pred_block.start_ea
mnem = idc.print_insn_mnem(start_ea)
if mnem == "CSEL":
    csel_preds = pred_block.preds()
    for csel_pred in csel_preds:
        real_blocks.append(csel_pred.start_ea)
from collections import deque
import idaapi
import idc


def get_block_by_address(ea):
    # Return the basic block of the enclosing function that contains ea.
    func = idaapi.get_func(ea)
    blocks = idaapi.FlowChart(func)
    for block in blocks:
        if block.start_ea <= ea < block.end_ea:
            return block
    return None


def find_loop_heads(func):
    # Breadth-first walk; a block seen again on its own path is a loop head.
    loop_heads = set()
    queue = deque()
    block = get_block_by_address(func)
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)
            continue
        path = path + [cur_block.start_ea]
        queue.extend((succ, path) for succ in cur_block.succs())
    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()
    return all_loop_heads


def find_converge_addr(loop_head_addr):
    # Standard fla: the loop head has two preds (prologue + converge block).
    # Non-standard fla: the loop head itself is the converge block.
    converge_addr = None
    block = get_block_by_address(loop_head_addr)
    preds = block.preds()
    pred_list = list(preds)
    if len(pred_list) == 2:
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:
                converge_addr = pred.start_ea
    else:
        converge_addr = loop_head_addr
    return converge_addr


def get_basic_block_size(bb):
    return bb.end_ea - bb.start_ea


def add_block_color(ea):
    # Color every instruction of the block that contains ea.
    block = get_block_by_address(ea)
    curr_addr = block.start_ea
    while curr_addr < block.end_ea:
        idc.set_color(curr_addr, idc.CIC_ITEM, 0xffcc33)
        curr_addr = idc.next_head(curr_addr)


def del_func_color(curr_addr):
    # Reset the color of every instruction from curr_addr to the function end.
    end_ea = idc.find_func_end(curr_addr)
    while curr_addr < end_ea:
        idc.set_color(curr_addr, idc.CIC_ITEM, 0xffffffff)
        curr_addr = idc.next_head(curr_addr)


def find_ret_block_addr(blocks):
    for block in blocks:
        succs = block.succs()
        succs_list = list(succs)
        end_ea = block.end_ea
        last_ins_ea = idc.prev_head(end_ea)
        mnem = idc.print_insn_mnem(last_ins_ea)
        if len(succs_list) == 0:
            if mnem == "RET":
                ori_ret_block = block
                # Walk up through single predecessors, skipping blocks that
                # hold only one instruction (size 4).
                while True:
                    tmp_block = block.preds()
                    pred_list = list(tmp_block)
                    if len(pred_list) == 1:
                        block = pred_list[0]
                        if get_basic_block_size(block) == 4:
                            continue
                        else:
                            break
                    else:
                        break
                # If too many branches below the candidate still have
                # successors, fall back to the original RET block.
                block2 = block
                num = 0
                i = 0
                while True:
                    i += 1
                    succs_block = block2.succs()
                    for succ in succs_block:
                        child_succs = succ.succs()
                        succ_list = list(child_succs)
                        if len(succ_list) != 0:
                            block2 = succ
                            num += 1
                    if num > 2:
                        block = ori_ret_block
                        break
                    if i > 2:
                        break
                return block.start_ea


def find_all_real_block(func_ea):
    blocks = idaapi.FlowChart(idaapi.get_func(func_ea))
    loop_heads = find_loop_heads(func_ea)
    print(f"Loop head count: {len(loop_heads)}----{[hex(loop_head) for loop_head in loop_heads]}")
    all_real_block = []
    for loop_head_addr in loop_heads:
        loop_head_block = get_block_by_address(loop_head_addr)
        loop_head_preds = list(loop_head_block.preds())
        loop_head_preds_addr = [loop_head_pred.start_ea for loop_head_pred in loop_head_preds]
        converge_addr = find_converge_addr(loop_head_addr)
        real_blocks = []
        if loop_head_addr != converge_addr:
            # Standard fla: keep the prologue block, drop the converge block.
            loop_head_preds_addr.remove(converge_addr)
            real_blocks.extend(loop_head_preds_addr)
        converge_block = get_block_by_address(converge_addr)
        list_preds = list(converge_block.preds())
        for pred_block in list_preds:
            end_ea = pred_block.end_ea
            last_ins_ea = idc.prev_head(end_ea)
            mnem = idc.print_insn_mnem(last_ins_ea)
            size = get_basic_block_size(pred_block)
            if size > 4 and "B." not in mnem:
                start_ea = pred_block.start_ea
                mnem = idc.print_insn_mnem(start_ea)
                if mnem == "CSEL":
                    # Shared tail block: its predecessors are the real blocks.
                    csel_preds = pred_block.preds()
                    for csel_pred in csel_preds:
                        real_blocks.append(csel_pred.start_ea)
                else:
                    real_blocks.append(pred_block.start_ea)
        real_blocks.sort()
        all_real_block.append(real_blocks)
        print("Blocks under this loop head:", [hex(child_block_ea) for child_block_ea in real_blocks])
    ret_addr = find_ret_block_addr(blocks)
    all_real_block.append(ret_addr)
    print("all_real_block:", all_real_block)
    all_real_block_list = []
    for real_blocks in all_real_block:
        if isinstance(real_blocks, list):
            all_real_block_list.extend(real_blocks)
        else:
            all_real_block_list.append(real_blocks)
    for real_block_ea in all_real_block_list:
        add_block_color(real_block_ea)
    print("\nAll real blocks collected")
    print("===========INT===============")
    print(all_real_block_list)
    print("===========HEX===============")
    print(f"Count: {len(all_real_block_list)}")
    print([hex(real_block_ea) for real_block_ea in all_real_block_list], "\n")
    # Everything except the first group and the ret block belongs to a
    # child prologue block.
    all_child_prologue_addr = all_real_block.copy()
    all_child_prologue_addr.remove(ret_addr)
    all_child_prologue_addr.remove(all_child_prologue_addr[0])
    print("Real block addresses for all child prologue blocks:", all_child_prologue_addr)
    all_child_prologue_last_ins_ea = []
    for child_prologue_array in all_child_prologue_addr:
        child_prologue_addr = child_prologue_array[0]
        child_prologue_block = get_block_by_address(child_prologue_addr)
        child_prologue_end_ea = child_prologue_block.end_ea
        child_prologue_last_ins_ea = idc.prev_head(child_prologue_end_ea)
        all_child_prologue_last_ins_ea.append(child_prologue_last_ins_ea)
    print("Address of the last instruction of each child prologue block:", all_child_prologue_last_ins_ea)
    return all_real_block, all_child_prologue_addr, all_child_prologue_last_ins_ea


func_ea = 0x41D08
reals = find_all_real_block(func_ea)
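import angr

# Do not load shared libraries, so angr only analyzes the target binary.
proj = angr.Project(file_path, auto_load_libs=False)
base = proj.loader.min_addr
func_addr = base + func_offset
init_state = proj.factory.blank_state(addr=func_addr)
# CALLLESS makes angr skip over calls instead of stepping into the callees.
init_state.options.add(angr.options.CALLLESS)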
Last edited on 2025-4-21 01:24 by 九天666