新年第一帖!
年初做了道很有意思的代码混淆题crackme_pls
,自闭了一个多星期后终于想出了相对比较好的去混淆方法,在这里给大伙分享一下分析思路。
题目附件和源代码:https://github.com/lkmidas/MyCTFPublic/tree/main/tetctf2022/crackme_pls
以main函数为例,这题的控制流图是这样的(IDA识别函数范围有误,需要手动修复一下):
为啥说是魔改的控制流平坦化呢?因为这个混淆从分发块到真实块之间是间接跳转,因此更难处理。
所有的字符串常量都已经被加密了,运行时在init()函数中解密,但这个混淆其实对我们分析程序的干扰不大,动调一下就能拿到解密后的字符串:
所有的函数调用都变成了间接调用,IDA无法识别函数的真实地址:
这一坨看起来奇奇怪怪的式子实际上就是函数表的地址加上一个偏移,然后从这个地址取出函数的真实地址,40CAA0就是函数表的地址,之所以看起来这么“奇怪,是因为加法运算经过了MBA表达式替换混淆。
复原MBA表达式确实很困难,但实际上我们并不需要复原MBA表达式,直接动态调试就能拿到函数的真实地址,符号执行、手动计算也是OK的。还有个WP说Ghirda和Binary Ninja能识别出函数的真实地址在函数表中的位置,我用Binary Ninja试了一下确实可以,还是挺香的:
IDA在main函数下断点动调,然后就能在函数表中找到对应的真实地址,比如这里其实是调用了fork函数,在Binary Ninja中把函数名改掉,代码的可读性就很高了:
所以去混淆的关键是恢复被平坦化的控制流。
这题是控制流平坦化变体,所以函数被混淆后的结构和去混淆方法其实和控制流平坦化是有异曲同工之妙的,关于怎么去普通的控制流平坦化,可以参考一下两篇文章:
参考WP:
所谓知己知彼才能百战百胜,首先我们来研究下函数被混淆的结构。总体来说跟控制流平坦化是很像的,因此我们可以把控制流平坦化的结构图拿来作为参考,对这道题的CFG进行划分,还是以main函数为例:
这个划分规则暂时还是非常适用的(注意,只是暂时适用,一个小伏笔)。
完整的过程是每个真实块执行完毕后都会返回预分发块,预分发块返回主分发块,再由主分发块经过若干个自分发块到达下一个真实块,和控制流平坦化是一样的,只不过跳转方式由直接跳转变成了间接跳转。
这题跟控制流平坦化很像,那么能不能直接套用控制流平坦化的去混淆思路呢?答案是不能,主要有两点原因:
但是这两个问题也是我们不得不面对的问题,因为要恢复控制流就得找到真实块之间的跳转关系,要找到真实块就得拿到函数的CFG。既然常规的思路没法结果,那我们就发挥才智想出一些新的方法来,解决方案如下:
既然angr的“正规”手段分析不出函数CFG,那么我们就用一些“野蛮”手段来生成一个简易的CFG。
首先定义一个叫做SuperBlock
的类,表示一个IDA-like的基本块,跟angr-management中的SuperCFGNode
很相似,这里重新定义一个是为了方便后续处理:
接下来怎么分析出一个函数中的所有SuperNode
呢,方法很简单粗暴:
我写了一个FunctionCFG
来表示函数的CFG,这个简易CFG存储了函数的所有SuperNode
,但没有保存基本块之间的跳转关系:
来测试一下用这种方法分析CFG的效果:
输出,经过检验结果是正确的:
至此,我们已经拿到了函数的全部基本块。
在控制流平坦化的去混淆中,我们可以通过基本块的前驱块和后继块来识别哪些是真实块,但在上一步中我们仅仅是得到了函数的所有基本块,而没有识别基本块之间的跳转关系,所以传统的方法行不通。
但仍然有很简单粗暴的方法来识别哪些是真实块:
代码如下,main函数中还没有出现call exit,所以其实可以先忽略这种情况:
其中useless_blocks即无用块包括主分发块、子分发块以及预分发块,这些基本块是没有用的,之后都要被nop掉,所以顺便也收集起来。
打印一下结果:
输出,也是完全正确的:
之前提到了不能直接通过符号执行找到真实块的后继真实块,那么就从函数入口点开始符号执行,explore到当前的真实块,然后继续符号执行找到下一个真实块?这种方法或许可行,但是速度肯定很慢。我们可以用另一个方法曲线救国。
通过分析汇编我们发现程序在分发块中的执行路径其实是由eax这个寄存器决定的,也就是每一个真实块对应了一个固定的值,只要程序在主分发块时eax的值等于该分发块对应的值,那么程序接下来就会到达这个真实块:
那么怎么找到每个真实块对应的eax值呢?最简单的方法肯定是符号执行。我的思路是:
代码实现如下:
输出:
如何得到真实块的后继真实块呢?只需要知道真实块执行完毕后eax的值即可,有两个方法:一个是符号执行,另外一个是正则匹配。一开始我用的是符号执行的方法,但后来在处理其他函数的时候我发现符号执行处理不了一些特殊情况,无奈只能改成正则匹配了。
正则匹配的方法如下:
通过正则匹配可以识别出哪些真实块是绝对跳转,哪些真实块是条件跳转,同时也能得到后继块的key,进而通过k2n_map得到跳转的地址。条件跳转的类型其实就是cmov指令的类型(例如cmovnz对应jnz)。如此以来便可以得到真实块之间的跳转关系。
通过正则匹配得到了真实块之间的跳转关系之后就可以直接patch了,代码实现如下:
首先丢进虚拟机跑一下确定没有爆炸:
IDA中查看,控制流已经恢复:
Binary Ninja查看,恢复一下函数符号和字符串,代码的逻辑就很清晰了:
在复原后的main函数中我们可以看到子进程读取了60个字节的输入之后调用了child_func函数,接下来我们来看看如何复原child_func函数。
还是先来看看混淆结构:
乍一看好像变成两层的控制流平坦化了?仔细分析发现原来是有几个基本块“特立独行”,它们的末尾是直接跳转,但没有像正常真实块那样跳转到预分发块,而是直接跳到了另外一个真实块:
至于为什么会出现这种特立独行的基本块,根据题目的源代码来看,应该是因为编译器的神奇优化,将这几个异或操作之后的赋值操作合并到一个基本块中了(也即是402AA0中的mov [rax], cl
指令):
对于这几个特立独行的基本块,我们在分析时仍然将其当做真实块,但不要对它的跳转进行Patch。v1版本的去混淆脚本仍然适用,因为这几个基本块中没有mov eax, xxx
指令,也就会被认为没有后继真实块,自然不会被Patch。
还是先运行一下,确定没有爆炸:
拖进IDA查看,基本是正常的:
用Biary Ninja复原一下函数调用,ptrace
表明有父进程调试子进程的操作:
中间的一大坨循环暂且跳过,再看看最后的函数调用:
中间的循环Binary Ninja的反汇编效果不是很好,所以还是用IDA看,结合最后的函数调用可以看出是一个解密ELF文件然后执行的操作:
加密后的ELF前四个字节与输入的前四个字节异或,要得到合法的ELF头0x7F和ELF三个字符
。于是得到了输入的前四个字符:
动态调试把解密后的 ELF dump下来,因为这个帖主要是讲去混淆,所以这部分就不细说了。
这题最难搞的函数,首先还是来看看CFG,基本块展开成了4层:
决定接下来要跳转到哪个真实块的关键寄存器变成了ecx
:
第一层基本块都是真实块,直接跳转到预分发块:
第二层基本块都跳转到了第一层的两个基本块之一,并且两个基本块没有mov ecx, xxx
指令,也就是说接下来要跳转到哪个真实块实际上是由第二层的基本块自己决定的:
这样就导致了patch工作非常困难,显然不能在标红框的两个基本块中patch跳转,因为后继真实块不是由它们决定的;但如果让第二层的基本块直接跳转到后继真实块,又会漏掉第一层基本块的一些指令,导致程序出错。
我没有想到更好的解决办法,只能牺牲掉一些准确性保全大局,如果你有更好的主意欢迎交流讨论。
再看到第三层基本块,都跳转到了第二层基本块中的三个之一:
第四层的情况也差不多:
这里还有两个以call exit
结尾的基本块,很容易看出来,这两个基本块我们待会也要作为特殊情况考虑进去:
因为关键寄存器变成了ecx
,所以这里要改为:
另外虽然关键寄存器变成了ecx
,实际上是预分发块中有一条指令mov eax, ecx
,子分发块还是根据eax
来跳转的:
所以处理一下序言块到第一个真实块的特殊情况:
然后来处理一下这种情况,mov ecx, xxx
和mov eax, xxx
以及cmovxx ecx, eax
不在同一个基本块中:
那么我们需要在修复控制流时加上一种情况,如果在当前基本块发现了mov ecx, xxx
和mov eax, xxx
但是没有发现cmovxx ecx, eax
指令,就往它的直接后继基本块继续搜索:
接下来处理两个以call exit
结尾的基本块:
手动记录一下两条call exit
指令的地址,在分析SuperBlock
时把这两个地址也考虑进去:
本来我以为这样就搞定了,但接下来我又发现了新的问题,在输出的日志中我发现有些应该有后继真实块的基本块却找不到后继真实块,例如:
很明显loc_403C20
的后继真实块应该是326BF726h
和1594BB6Ah
对应的基本块,而我们在恢复跳转关系的时候却找不到后继块,这是因为我们根本没有找到0x326BF726
对应的真实块,真是奇了怪了:
如果在主分发块处将eax设为claripy.BVV(0x326BF726, 32)
开始符号执行,会发现它在返回预分发块之前到达了0x403230
基本块:
这个基本块跟其它子分发块有点相似,但是有不完全相同,体现在它多出了一些指令,这些指令实际上本来应该是0x326BF726
对应的真实块,貌似又是编译器神奇优化的锅:
除了0x403230
之外,后续还发现了一些这样的基本块,总共有三个,分别对应326BF726h
、3311C734h
和703837C4h
:
把这三个“小霸王”也考虑进去:
这时修复控制流的结果就正常了。
运行爆炸了,在意料之中:
IDA打开看看,控制流恢复得还是挺不错的:
Binary Ninja查看代码可读性还是很高的,就是漏了一些地方:
与源代码对比:
这题的加密逻辑有点复杂,是一个父进程调试子进程的模式,还有其他ELF释放,所以我也没继续分析下去了,感兴趣的可以看看上面贴的WP或者题目的源代码。
至于剩下几个被混淆的函数就比较简单了,跟main函数类似,用v1版本的去混淆脚本能够通杀,所以这里不再赘述。v2版本的代码相当于是在v1的基础上缝缝补补了一些东西,很不优雅,如果大家有更巧妙的思路欢迎交流讨论。
class
SuperBlock:
def
__init__(
self
, block_addr):
self
.addr
=
block_addr
self
.blocks
=
[]
self
.size
=
0
def
insert_block(
self
, block):
self
.blocks.append(block)
self
.size
+
=
block.size
@property
def
terminator(
self
):
return
self
.blocks[
-
1
].capstone.insns[
-
1
]
@property
def
successor(
self
):
terminator
=
self
.terminator
try
:
return
int
(terminator.op_str,
16
)
except
:
return
None
@property
def
insns(
self
):
insns
=
[]
for
block
in
self
.blocks:
insns
+
=
block.capstone.insns
return
insns
def
__repr__(
self
):
return
"<SuperBlock %#08x->%#08x>"
%
(
self
.addr,
self
.addr
+
self
.size)
def
__hash__(
self
):
return
hash
((
'superblock'
,
self
.addr))
class
SuperBlock:
def
__init__(
self
, block_addr):
self
.addr
=
block_addr
self
.blocks
=
[]
self
.size
=
0
def
insert_block(
self
, block):
self
.blocks.append(block)
self
.size
+
=
block.size
@property
def
terminator(
self
):
return
self
.blocks[
-
1
].capstone.insns[
-
1
]
@property
def
successor(
self
):
terminator
=
self
.terminator
try
:
return
int
(terminator.op_str,
16
)
except
:
return
None
@property
def
insns(
self
):
insns
=
[]
for
block
in
self
.blocks:
insns
+
=
block.capstone.insns
return
insns
def
__repr__(
self
):
return
"<SuperBlock %#08x->%#08x>"
%
(
self
.addr,
self
.addr
+
self
.size)
def
__hash__(
self
):
return
hash
((
'superblock'
,
self
.addr))
class
FunctionCFG:
def
__init__(
self
, proj, func_addr, func_end):
self
.proj
=
proj
self
.addr
=
func_addr
self
.size
=
func_end
-
func_addr
def
get_super_block(
self
, block_addr):
super_block
=
SuperBlock(block_addr)
cur_addr
=
block_addr
while
True
:
block
=
self
.proj.factory.block(cur_addr)
if
block.size
=
=
0
:
return
super_block
insns
=
block.capstone.insns
super_block.insert_block(block)
cur_addr
+
=
block.size
if
insns[
-
1
].mnemonic.startswith(
'j'
)
or
insns[
-
1
].mnemonic
=
=
'ret'
:
return
super_block
def
dump_all_blocks(
self
):
curr_addr
=
self
.addr
blocks
=
[]
while
curr_addr
-
self
.addr <
self
.size:
super_block
=
self
.get_super_block(curr_addr)
if
super_block.size
=
=
0
:
curr_addr
+
=
1
continue
blocks.append(super_block)
curr_addr
+
=
super_block.size
return
blocks
class
FunctionCFG:
def
__init__(
self
, proj, func_addr, func_end):
self
.proj
=
proj
self
.addr
=
func_addr
self
.size
=
func_end
-
func_addr
def
get_super_block(
self
, block_addr):
super_block
=
SuperBlock(block_addr)
cur_addr
=
block_addr
while
True
:
block
=
self
.proj.factory.block(cur_addr)
if
block.size
=
=
0
:
return
super_block
insns
=
block.capstone.insns
super_block.insert_block(block)
cur_addr
+
=
block.size
if
insns[
-
1
].mnemonic.startswith(
'j'
)
or
insns[
-
1
].mnemonic
=
=
'ret'
:
return
super_block
def
dump_all_blocks(
self
):
curr_addr
=
self
.addr
blocks
=
[]
while
curr_addr
-
self
.addr <
self
.size:
super_block
=
self
.get_super_block(curr_addr)
if
super_block.size
=
=
0
:
curr_addr
+
=
1
continue
blocks.append(super_block)
curr_addr
+
=
super_block.size
return
blocks
if
__name__
=
=
'__main__'
:
logging.getLogger(
'cle'
).setLevel(logging.ERROR)
logging.getLogger(
'angr'
).setLevel(logging.ERROR)
logger
=
logging.getLogger(
'deobfu'
)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(filename
=
'%s.log'
%
time.strftime(
"%Y-%m-%d-%H-%M-%S"
, time.localtime())))
filename
=
'crackme-pls.bin'
proj
=
angr.Project(filename)
base_addr
=
proj.loader.main_object.mapped_base
reverse_plt
=
proj.loader.main_object.reverse_plt
with
open
(filename,
'rb'
) as
file
:
content
=
bytearray(
file
.read())
func_addr, func_end
=
0x403D90
,
0x40419F
cfg
=
FunctionCFG(proj, func_addr, func_end)
print
(cfg.dump_all_blocks())
if
__name__
=
=
'__main__'
:
logging.getLogger(
'cle'
).setLevel(logging.ERROR)
logging.getLogger(
'angr'
).setLevel(logging.ERROR)
logger
=
logging.getLogger(
'deobfu'
)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(filename
=
'%s.log'
%
time.strftime(
"%Y-%m-%d-%H-%M-%S"
, time.localtime())))
filename
=
'crackme-pls.bin'
proj
=
angr.Project(filename)
base_addr
=
proj.loader.main_object.mapped_base
reverse_plt
=
proj.loader.main_object.reverse_plt
with
open
(filename,
'rb'
) as
file
:
content
=
bytearray(
file
.read())
func_addr, func_end
=
0x403D90
,
0x40419F
cfg
=
FunctionCFG(proj, func_addr, func_end)
print
(cfg.dump_all_blocks())
[<SuperBlock
0x403d90
-
>
0x403f4d
>, <SuperBlock
0x403f4d
-
>
0x403f5f
>, <SuperBlock
0x403f5f
-
>
0x403f8b
>, <SuperBlock
0x403f8b
-
>
0x403f9d
>, <SuperBlock
0x403f9d
-
>
0x403fb4
>, <SuperBlock
0x403fb4
-
>
0x403fc4
>, <SuperBlock
0x403fc4
-
>
0x403fe2
>, <SuperBlock
0x403fe2
-
>
0x403ff4
>, <SuperBlock
0x403ff4
-
>
0x404065
>, <SuperBlock
0x404065
-
>
0x40407e
>, <SuperBlock
0x40407e
-
>
0x404095
>, <SuperBlock
0x404095
-
>
0x4040bf
>, <SuperBlock
0x4040bf
-
>
0x4040ce
>, <SuperBlock
0x4040ce
-
>
0x4040e5
>, <SuperBlock
0x4040e5
-
>
0x4040fc
>, <SuperBlock
0x4040fc
-
>
0x40410e
>, <SuperBlock
0x40410e
-
>
0x40418b
>, <SuperBlock
0x40418b
-
>
0x40419f
>]
[<SuperBlock
0x403d90
-
>
0x403f4d
>, <SuperBlock
0x403f4d
-
>
0x403f5f
>, <SuperBlock
0x403f5f
-
>
0x403f8b
>, <SuperBlock
0x403f8b
-
>
0x403f9d
>, <SuperBlock
0x403f9d
-
>
0x403fb4
>, <SuperBlock
0x403fb4
-
>
0x403fc4
>, <SuperBlock
0x403fc4
-
>
0x403fe2
>, <SuperBlock
0x403fe2
-
>
0x403ff4
>, <SuperBlock
0x403ff4
-
>
0x404065
>, <SuperBlock
0x404065
-
>
0x40407e
>, <SuperBlock
0x40407e
-
>
0x404095
>, <SuperBlock
0x404095
-
>
0x4040bf
>, <SuperBlock
0x4040bf
-
>
0x4040ce
>, <SuperBlock
0x4040ce
-
>
0x4040e5
>, <SuperBlock
0x4040e5
-
>
0x4040fc
>, <SuperBlock
0x4040fc
-
>
0x40410e
>, <SuperBlock
0x40410e
-
>
0x40418b
>, <SuperBlock
0x40418b
-
>
0x40419f
>]
def
anaylse_blocks(
self
):
all_blocks
=
self
.dump_all_blocks()
relevant_blocks
=
[]
useless_blocks
=
[]
for
block
in
all_blocks:
if
block.addr !
=
self
.addr
and
\
(block.successor
is
not
None
or
block.terminator.mnemonic
=
=
'ret'
or
block.terminator.mnemonic
=
=
'call'
):
relevant_blocks.append(block)
elif
block.addr !
=
self
.addr:
useless_blocks.append(block)
return
relevant_blocks, useless_blocks
def
anaylse_blocks(
self
):
all_blocks
=
self
.dump_all_blocks()
relevant_blocks
=
[]
useless_blocks
=
[]
for
block
in
all_blocks:
if
block.addr !
=
self
.addr
and
\
(block.successor
is
not
None
or
block.terminator.mnemonic
=
=
'ret'
or
block.terminator.mnemonic
=
=
'call'
):
relevant_blocks.append(block)
elif
block.addr !
=
self
.addr:
useless_blocks.append(block)
return
relevant_blocks, useless_blocks
relevant_blocks, useless_blocks
=
cfg.anaylse_blocks()
print
(
'Revelvant blocks: %s\nUseless blocks: %s'
%
(relevant_blocks, useless_blocks))
relevant_blocks, useless_blocks
=
cfg.anaylse_blocks()
print
(
'Revelvant blocks: %s\nUseless blocks: %s'
%
(relevant_blocks, useless_blocks))
Revelvant blocks: [<SuperBlock
0x403fb4
-
>
0x403fc4
>, <SuperBlock
0x403ff4
-
>
0x404065
>, <SuperBlock
0x404095
-
>
0x4040bf
>, <SuperBlock
0x4040e5
-
>
0x4040fc
>, <SuperBlock
0x40410e
-
>
0x40418b
>, <SuperBlock
0x40418b
-
>
0x40419f
>]
Useless blocks: [<SuperBlock
0x403f4d
-
>
0x403f5f
>, <SuperBlock
0x403f5f
-
>
0x403f8b
>, <SuperBlock
0x403f8b
-
>
0x403f9d
>, <SuperBlock
0x403f9d
-
>
0x403fb4
>, <SuperBlock
0x403fc4
-
>
0x403fe2
>, <SuperBlock
0x403fe2
-
>
0x403ff4
>, <SuperBlock
0x404065
-
>
0x40407e
>, <SuperBlock
0x40407e
-
>
0x404095
>, <SuperBlock
0x4040bf
-
>
0x4040ce
>, <SuperBlock
0x4040ce
-
>
0x4040e5
>, <SuperBlock
0x4040fc
-
>
0x40410e
>]
Revelvant blocks: [<SuperBlock
0x403fb4
-
>
0x403fc4
>, <SuperBlock
0x403ff4
-
>
0x404065
>, <SuperBlock
0x404095
-
>
0x4040bf
>, <SuperBlock
0x4040e5
-
>
0x4040fc
>, <SuperBlock
0x40410e
-
>
0x40418b
>, <SuperBlock
0x40418b
-
>
0x40419f
>]
Useless blocks: [<SuperBlock
0x403f4d
-
>
0x403f5f
>, <SuperBlock
0x403f5f
-
>
0x403f8b
>, <SuperBlock
0x403f8b
-
>
0x403f9d
>, <SuperBlock
0x403f9d
-
>
0x403fb4
>, <SuperBlock
0x403fc4
-
>
0x403fe2
>, <SuperBlock
0x403fe2
-
>
0x403ff4
>, <SuperBlock
0x404065
-
>
0x40407e
>, <SuperBlock
0x40407e
-
>
0x404095
>, <SuperBlock
0x4040bf
-
>
0x4040ce
>, <SuperBlock
0x4040ce
-
>
0x4040e5
>, <SuperBlock
0x4040fc
-
>
0x40410e
>]
def
map_k2n():
state
=
proj.factory.call_state(addr
=
func_addr)
state.inspect.b(
'call'
, when
=
angr.BP_BEFORE, action
=
skip_call)
simgr
=
proj.factory.simgr(state)
simgr.explore(find
=
dispatcher_node)
found
=
simgr.found[
0
]
key
=
claripy.BVS(
'key'
,
32
)
found.regs.eax
=
key
simgr
=
proj.factory.simgr(found)
visited
=
[]
k2n_map
=
{}
while
len
(simgr.active):
simgr.move(from_stash
=
'active'
, to_stash
=
'deadended'
, filter_func
=
lambda
state: state.addr
in
visited)
for
active
in
simgr.active:
visited.append(active.addr)
if
active.addr
in
relevant_nodes:
key_val
=
active.solver.
eval
(key)
k2n_map[key_val]
=
active.addr
logger.debug(
'[*] Mapped node(%#x)->key(%#x) [%d/%d]'
%
(active.addr, key_val,
len
(k2n_map),
len
(relevant_nodes)))
simgr.step()
return
k2n_map
def
map_k2n():
state
=
proj.factory.call_state(addr
=
func_addr)
state.inspect.b(
'call'
, when
=
angr.BP_BEFORE, action
=
skip_call)
simgr
=
proj.factory.simgr(state)
simgr.explore(find
=
dispatcher_node)
found
=
simgr.found[
0
]
key
=
claripy.BVS(
'key'
,
32
)
found.regs.eax
=
key
simgr
=
proj.factory.simgr(found)
visited
=
[]
k2n_map
=
{}
while
len
(simgr.active):
simgr.move(from_stash
=
'active'
, to_stash
=
'deadended'
, filter_func
=
lambda
state: state.addr
in
visited)
for
active
in
simgr.active:
visited.append(active.addr)
if
active.addr
in
relevant_nodes:
key_val
=
active.solver.
eval
(key)
k2n_map[key_val]
=
active.addr
logger.debug(
'[*] Mapped node(%#x)->key(%#x) [%d/%d]'
%
(active.addr, key_val,
len
(k2n_map),
len
(relevant_nodes)))
simgr.step()
return
k2n_map
cfg
=
FunctionCFG(proj, func_addr, func_end)
prologue_block
=
cfg.get_super_block(func_addr)
dispatcher_node
=
prologue_block.successor
relevant_blocks, useless_blocks
=
cfg.anaylse_blocks()
relevant_nodes
=
[block.addr
for
block
in
relevant_blocks]
k2n_map
=
map_k2n()
cfg
=
FunctionCFG(proj, func_addr, func_end)
prologue_block
=
cfg.get_super_block(func_addr)
dispatcher_node
=
prologue_block.successor
relevant_blocks, useless_blocks
=
cfg.anaylse_blocks()
relevant_nodes
=
[block.addr
for
block
in
relevant_blocks]
k2n_map
=
map_k2n()
[
*
] Mapped node(
0x403fb4
)
-
>key(
0x1c8d08b0
) [
1
/
6
]
[
*
] Mapped node(
0x403ff4
)
-
>key(
0x477f1a08
) [
2
/
6
]
[
*
] Mapped node(
0x404095
)
-
>key(
0x1d507da0
) [
3
/
6
]
[
*
] Mapped node(
0x40410e
)
-
>key(
0x4027245b
) [
4
/
6
]
[
*
] Mapped node(
0x4040e5
)
-
>key(
0x4d20e191
) [
5
/
6
]
[
*
] Mapped node(
0x40418b
)
-
>key(
0x5282f1e2
) [
6
/
6
]
[
*
] Mapped node(
0x403fb4
)
-
>key(
0x1c8d08b0
) [
1
/
6
]
[
*
] Mapped node(
0x403ff4
)
-
>key(
0x477f1a08
) [
2
/
6
]
[
*
] Mapped node(
0x404095
)
-
>key(
0x1d507da0
) [
3
/
6
]
[
*
] Mapped node(
0x40410e
)
-
>key(
0x4027245b
) [
4
/
6
]
[
*
] Mapped node(
0x4040e5
)
-
>key(
0x4d20e191
) [
5
/
6
]
[
*
] Mapped node(
0x40418b
)
-
>key(
0x5282f1e2
) [
6
/
6
]
def
fix_branch():
for
block
in
useless_blocks:
fill_nops(block.addr, block.size)
relevant_blocks.append(prologue_block)
for
block
in
relevant_blocks:
key_reg, sub_key_reg
=
'eax'
,
'ecx'
key, sub_key
=
None
,
None
cmov_mnemonic
=
None
for
insn
in
block.insns:
mnemonic, op_str
=
insn.mnemonic, insn.op_str
if
mnemonic
=
=
'mov'
:
if
re.match(rf
'{key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_key
=
int
(op_str.replace(f
'{key_reg}, '
, ''),
16
)
if
tmp_key
in
k2n_map:
key
=
tmp_key
elif
re.match(rf
'{sub_key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_sub_key
=
int
(op_str.replace(f
'{sub_key_reg}, '
, ''),
16
)
if
tmp_sub_key
in
k2n_map:
sub_key
=
tmp_sub_key
elif
mnemonic.startswith(
'cmov'
)
and
op_str
=
=
f
'{key_reg}, {sub_key_reg}'
:
cmov_mnemonic
=
mnemonic
terminator
=
block.terminator
if
key
is
not
None
and
sub_key
is
None
:
logger.debug(
'[*] Node(%#x) has one successor(%#x)'
%
(block.addr, k2n_map[key]))
fill_jmp(terminator.address, k2n_map[key])
elif
key
is
not
None
and
sub_key
is
not
None
:
if
cmov_mnemonic
is
not
None
:
logger.debug(
'[*] Node(%#x) has two successors(%#x, %#x)'
%
(block.addr, k2n_map[key], k2n_map[sub_key]))
fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
fill_jmp(terminator.address
+
6
, k2n_map[key])
else
:
logger.error(
'[*] Error at block(%#x)'
%
block.addr)
exit(
0
)
else
:
logger.debug(
'[*] Node(%#x) has no successor'
%
block.addr)
def
fix_branch():
for
block
in
useless_blocks:
fill_nops(block.addr, block.size)
relevant_blocks.append(prologue_block)
for
block
in
relevant_blocks:
key_reg, sub_key_reg
=
'eax'
,
'ecx'
key, sub_key
=
None
,
None
cmov_mnemonic
=
None
for
insn
in
block.insns:
mnemonic, op_str
=
insn.mnemonic, insn.op_str
if
mnemonic
=
=
'mov'
:
if
re.match(rf
'{key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_key
=
int
(op_str.replace(f
'{key_reg}, '
, ''),
16
)
if
tmp_key
in
k2n_map:
key
=
tmp_key
elif
re.match(rf
'{sub_key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_sub_key
=
int
(op_str.replace(f
'{sub_key_reg}, '
, ''),
16
)
if
tmp_sub_key
in
k2n_map:
sub_key
=
tmp_sub_key
elif
mnemonic.startswith(
'cmov'
)
and
op_str
=
=
f
'{key_reg}, {sub_key_reg}'
:
cmov_mnemonic
=
mnemonic
terminator
=
block.terminator
if
key
is
not
None
and
sub_key
is
None
:
logger.debug(
'[*] Node(%#x) has one successor(%#x)'
%
(block.addr, k2n_map[key]))
fill_jmp(terminator.address, k2n_map[key])
elif
key
is
not
None
and
sub_key
is
not
None
:
if
cmov_mnemonic
is
not
None
:
logger.debug(
'[*] Node(%#x) has two successors(%#x, %#x)'
%
(block.addr, k2n_map[key], k2n_map[sub_key]))
fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
fill_jmp(terminator.address
+
6
, k2n_map[key])
else
:
logger.error(
'[*] Error at block(%#x)'
%
block.addr)
exit(
0
)
else
:
logger.debug(
'[*] Node(%#x) has no successor'
%
block.addr)
import
angr
import
logging
import
re
import
claripy
from
keystone
import
*
import
sys
import
time
def
fill_nops(addr, size):
offset
=
addr
-
base_addr
content[offset:offset
+
size]
=
b
'\x90'
*
size
def
fill_jmp(src, dest):
offset
=
src
-
base_addr
if
dest !
=
src
+
5
:
content[offset]
=
0xE9
content[offset
+
1
:offset
+
5
]
=
(dest
-
src
-
5
).to_bytes(
4
,
'little'
, signed
=
True
)
else
:
fill_nops(src,
5
)
def
get_jx_opcode(jx_type):
ks
=
Ks(KS_ARCH_X86, KS_MODE_64)
code, count
=
ks.asm(f
'{jx_type} 0xFFFFFFFF'
)
return
b''.join(
map
(
lambda
x: x.to_bytes(
1
, sys.byteorder), code[
0
:
2
]))
def
fill_jx(src, dest, cmov_type):
offset
=
src
-
base_addr
content[offset:offset
+
2
]
=
get_jx_opcode(cmov_type.replace(
'cmov'
,
'j'
))
content[offset
+
2
:offset
+
6
]
=
(dest
-
src
-
6
).to_bytes(
4
,
'little'
, signed
=
True
)
class
SuperBlock:
def
__init__(
self
, block_addr):
self
.addr
=
block_addr
self
.blocks
=
[]
self
.size
=
0
def
insert_block(
self
, block):
self
.blocks.append(block)
self
.size
+
=
block.size
@property
def
terminator(
self
):
return
self
.blocks[
-
1
].capstone.insns[
-
1
]
@property
def
successor(
self
):
terminator
=
self
.terminator
try
:
return
int
(terminator.op_str,
16
)
except
:
return
None
@property
def
insns(
self
):
insns
=
[]
for
block
in
self
.blocks:
insns
+
=
block.capstone.insns
return
insns
def
__repr__(
self
):
return
"<SuperBlock %#08x->%#08x>"
%
(
self
.addr,
self
.addr
+
self
.size)
def
__hash__(
self
):
return
hash
((
'superblock'
,
self
.addr))
class
FunctionCFG:
def
__init__(
self
, proj, func_addr, func_end):
self
.proj
=
proj
self
.addr
=
func_addr
self
.size
=
func_end
-
func_addr
def
get_super_block(
self
, block_addr):
super_block
=
SuperBlock(block_addr)
cur_addr
=
block_addr
while
True
:
block
=
self
.proj.factory.block(cur_addr)
if
block.size
=
=
0
:
return
super_block
insns
=
block.capstone.insns
super_block.insert_block(block)
cur_addr
+
=
block.size
if
insns[
-
1
].mnemonic.startswith(
'j'
)
or
insns[
-
1
].mnemonic
=
=
'ret'
:
return
super_block
def
dump_all_blocks(
self
):
curr_addr
=
self
.addr
blocks
=
[]
while
curr_addr
-
self
.addr <
self
.size:
super_block
=
self
.get_super_block(curr_addr)
if
super_block.size
=
=
0
:
curr_addr
+
=
1
continue
blocks.append(super_block)
curr_addr
+
=
super_block.size
return
blocks
def
anaylse_blocks(
self
):
all_blocks
=
self
.dump_all_blocks()
relevant_blocks
=
[]
useless_blocks
=
[]
for
block
in
all_blocks:
if
block.addr !
=
self
.addr
and
\
(block.successor
is
not
None
or
block.terminator.mnemonic
=
=
'ret'
or
block.terminator.mnemonic
=
=
'call'
):
relevant_blocks.append(block)
elif
block.addr !
=
self
.addr:
useless_blocks.append(block)
return
relevant_blocks, useless_blocks
def
skip_call(state):
proj.hook(state.addr, angr.SIM_PROCEDURES[
"stubs"
][
"ReturnUnconstrained"
](), replace
=
True
)
def
map_k2n():
state
=
proj.factory.call_state(addr
=
func_addr)
state.inspect.b(
'call'
, when
=
angr.BP_BEFORE, action
=
skip_call)
simgr
=
proj.factory.simgr(state)
simgr.explore(find
=
dispatcher_node)
found
=
simgr.found[
0
]
key
=
claripy.BVS(
'key'
,
32
)
found.regs.eax
=
key
simgr
=
proj.factory.simgr(found)
visited
=
[]
k2n_map
=
{}
while
len
(simgr.active):
simgr.move(from_stash
=
'active'
, to_stash
=
'deadended'
, filter_func
=
lambda
state: state.addr
in
visited)
for
active
in
simgr.active:
visited.append(active.addr)
if
active.addr
in
relevant_nodes:
key_val
=
active.solver.
eval
(key)
k2n_map[key_val]
=
active.addr
logger.debug(
'[*] Mapped node(%#x)->key(%#x) [%d/%d]'
%
(active.addr, key_val,
len
(k2n_map),
len
(relevant_nodes)))
simgr.step()
return
k2n_map
def
fix_branch():
for
block
in
useless_blocks:
fill_nops(block.addr, block.size)
relevant_blocks.append(prologue_block)
for
block
in
relevant_blocks:
key_reg, sub_key_reg
=
'eax'
,
'ecx'
key, sub_key
=
None
,
None
cmov_mnemonic
=
None
for
insn
in
block.insns:
mnemonic, op_str
=
insn.mnemonic, insn.op_str
if
mnemonic
=
=
'mov'
:
if
re.match(rf
'{key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_key
=
int
(op_str.replace(f
'{key_reg}, '
, ''),
16
)
if
tmp_key
in
k2n_map:
key
=
tmp_key
elif
re.match(rf
'{sub_key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_sub_key
=
int
(op_str.replace(f
'{sub_key_reg}, '
, ''),
16
)
if
tmp_sub_key
in
k2n_map:
sub_key
=
tmp_sub_key
elif
mnemonic.startswith(
'cmov'
)
and
op_str
=
=
f
'{key_reg}, {sub_key_reg}'
:
cmov_mnemonic
=
mnemonic
terminator
=
block.terminator
if
key
is
not
None
and
sub_key
is
None
:
logger.debug(
'[*] Node(%#x) has one successor(%#x)'
%
(block.addr, k2n_map[key]))
fill_jmp(terminator.address, k2n_map[key])
elif
key
is
not
None
and
sub_key
is
not
None
:
if
cmov_mnemonic
is
not
None
:
logger.debug(
'[*] Node(%#x) has two successors(%#x, %#x)'
%
(block.addr, k2n_map[key], k2n_map[sub_key]))
fill_jx(terminator.address, k2n_map[sub_key], cmov_mnemonic)
fill_jmp(terminator.address
+
6
, k2n_map[key])
else
:
logger.error(
'[*] Error at block(%#x)'
%
block.addr)
exit(
0
)
else
:
logger.debug(
'[*] Node(%#x) has no successor'
%
block.addr)
if
__name__
=
=
'__main__'
:
logging.getLogger(
'cle'
).setLevel(logging.ERROR)
logging.getLogger(
'angr'
).setLevel(logging.ERROR)
logger
=
logging.getLogger(
'deobfu'
)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(filename
=
'%s.log'
%
time.strftime(
"%Y-%m-%d-%H-%M-%S"
, time.localtime())))
filename
=
'crackme-pls.bin'
proj
=
angr.Project(filename)
base_addr
=
proj.loader.main_object.mapped_base
reverse_plt
=
proj.loader.main_object.reverse_plt
with
open
(filename,
'rb'
) as
file
:
content
=
bytearray(
file
.read())
func_addr, func_end
=
0x403D90
,
0x40419F
cfg
=
FunctionCFG(proj, func_addr, func_end)
prologue_block
=
cfg.get_super_block(func_addr)
dispatcher_node
=
prologue_block.successor
relevant_blocks, useless_blocks
=
cfg.anaylse_blocks()
relevant_nodes
=
[block.addr
for
block
in
relevant_blocks]
k2n_map
=
map_k2n()
fix_branch()
with
open
(filename
+
'.recover'
,
'wb'
) as
file
:
file
.write(content)
import
angr
import
logging
import
re
import
claripy
from
keystone
import
*
import
sys
import
time
def
fill_nops(addr, size):
offset
=
addr
-
base_addr
content[offset:offset
+
size]
=
b
'\x90'
*
size
def
fill_jmp(src, dest):
offset
=
src
-
base_addr
if
dest !
=
src
+
5
:
content[offset]
=
0xE9
content[offset
+
1
:offset
+
5
]
=
(dest
-
src
-
5
).to_bytes(
4
,
'little'
, signed
=
True
)
else
:
fill_nops(src,
5
)
def
get_jx_opcode(jx_type):
ks
=
Ks(KS_ARCH_X86, KS_MODE_64)
code, count
=
ks.asm(f
'{jx_type} 0xFFFFFFFF'
)
return
b''.join(
map
(
lambda
x: x.to_bytes(
1
, sys.byteorder), code[
0
:
2
]))
def
fill_jx(src, dest, cmov_type):
offset
=
src
-
base_addr
content[offset:offset
+
2
]
=
get_jx_opcode(cmov_type.replace(
'cmov'
,
'j'
))
content[offset
+
2
:offset
+
6
]
=
(dest
-
src
-
6
).to_bytes(
4
,
'little'
, signed
=
True
)
class
SuperBlock:
def
__init__(
self
, block_addr):
self
.addr
=
block_addr
self
.blocks
=
[]
self
.size
=
0
def
insert_block(
self
, block):
self
.blocks.append(block)
self
.size
+
=
block.size
@property
def
terminator(
self
):
return
self
.blocks[
-
1
].capstone.insns[
-
1
]
@property
def
successor(
self
):
terminator
=
self
.terminator
try
:
return
int
(terminator.op_str,
16
)
except
:
return
None
@property
def
insns(
self
):
insns
=
[]
for
block
in
self
.blocks:
insns
+
=
block.capstone.insns
return
insns
def
__repr__(
self
):
return
"<SuperBlock %#08x->%#08x>"
%
(
self
.addr,
self
.addr
+
self
.size)
def
__hash__(
self
):
return
hash
((
'superblock'
,
self
.addr))
class
FunctionCFG:
def
__init__(
self
, proj, func_addr, func_end):
self
.proj
=
proj
self
.addr
=
func_addr
self
.size
=
func_end
-
func_addr
def
get_super_block(
self
, block_addr):
super_block
=
SuperBlock(block_addr)
cur_addr
=
block_addr
while
True
:
block
=
self
.proj.factory.block(cur_addr)
if
block.size
=
=
0
:
return
super_block
insns
=
block.capstone.insns
super_block.insert_block(block)
cur_addr
+
=
block.size
if
insns[
-
1
].mnemonic.startswith(
'j'
)
or
insns[
-
1
].mnemonic
=
=
'ret'
:
return
super_block
def
dump_all_blocks(
self
):
curr_addr
=
self
.addr
blocks
=
[]
while
curr_addr
-
self
.addr <
self
.size:
super_block
=
self
.get_super_block(curr_addr)
if
super_block.size
=
=
0
:
curr_addr
+
=
1
continue
blocks.append(super_block)
curr_addr
+
=
super_block.size
return
blocks
def
anaylse_blocks(
self
):
all_blocks
=
self
.dump_all_blocks()
relevant_blocks
=
[]
useless_blocks
=
[]
for
block
in
all_blocks:
if
block.addr !
=
self
.addr
and
\
(block.successor
is
not
None
or
block.terminator.mnemonic
=
=
'ret'
or
block.terminator.mnemonic
=
=
'call'
):
relevant_blocks.append(block)
elif
block.addr !
=
self
.addr:
useless_blocks.append(block)
return
relevant_blocks, useless_blocks
def
skip_call(state):
proj.hook(state.addr, angr.SIM_PROCEDURES[
"stubs"
][
"ReturnUnconstrained"
](), replace
=
True
)
def
map_k2n():
state
=
proj.factory.call_state(addr
=
func_addr)
state.inspect.b(
'call'
, when
=
angr.BP_BEFORE, action
=
skip_call)
simgr
=
proj.factory.simgr(state)
simgr.explore(find
=
dispatcher_node)
found
=
simgr.found[
0
]
key
=
claripy.BVS(
'key'
,
32
)
found.regs.eax
=
key
simgr
=
proj.factory.simgr(found)
visited
=
[]
k2n_map
=
{}
while
len
(simgr.active):
simgr.move(from_stash
=
'active'
, to_stash
=
'deadended'
, filter_func
=
lambda
state: state.addr
in
visited)
for
active
in
simgr.active:
visited.append(active.addr)
if
active.addr
in
relevant_nodes:
key_val
=
active.solver.
eval
(key)
k2n_map[key_val]
=
active.addr
logger.debug(
'[*] Mapped node(%#x)->key(%#x) [%d/%d]'
%
(active.addr, key_val,
len
(k2n_map),
len
(relevant_nodes)))
simgr.step()
return
k2n_map
def
fix_branch():
for
block
in
useless_blocks:
fill_nops(block.addr, block.size)
relevant_blocks.append(prologue_block)
for
block
in
relevant_blocks:
key_reg, sub_key_reg
=
'eax'
,
'ecx'
key, sub_key
=
None
,
None
cmov_mnemonic
=
None
for
insn
in
block.insns:
mnemonic, op_str
=
insn.mnemonic, insn.op_str
if
mnemonic
=
=
'mov'
:
if
re.match(rf
'{key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_key
=
int
(op_str.replace(f
'{key_reg}, '
, ''),
16
)
if
tmp_key
in
k2n_map:
key
=
tmp_key
elif
re.match(rf
'{sub_key_reg}, 0x[0-9a-f]+'
, op_str):
tmp_sub_key
=
int
(op_str.replace(f
'{sub_key_reg}, '
, ''),
16
)
if
tmp_sub_key
in
k2n_map:
[招生]科锐逆向工程师培训(2024年11月15日实地,远程教学同时开班, 第51期)
最后于 2022-1-14 15:19
被34r7hm4n编辑
,原因: