反简单的OLLVM CFF,一般有几大功能模块:
许多定制修改、增强型OLLVM CFF,无法用上述方案简单搞掂,本小节只考虑基础情形。参看
GhHei提到,IDA微码有个值域分析功能,对付基础情形时可用于e步骤。此功能有重大短板,许多非标准CFF非常复杂,值域分析不足以发现状态变量到有效块的映射,e步骤无法完全实施。
D810插件比IDA内置的值域分析强一些,它用MicroCodeInterpreter模拟执行分发器逻辑,将指定状态变量值作为输入,看分发器会把控制流导向哪个有效块。bluerust曾想把MicroCodeInterpreter抠出来单独用,后来发现大量微指令未实现,要补的话,工作量不小,他没有动手。但D810这个框架很强大,有志于反CFF时,可认真学习借鉴。
单靠IDA微码反CFF,基本是静态分析,有许多复杂情形处理不了。
Angr符号执行比MicroCodeInterpreter强大,许多时候无需显式找出状态变量,直接模拟执行,建立Block到Block的映射关系,大致相当于c至f步骤一把梭。Angr不利之处,较重型。
以前我只接触过D810、Angr,未接触过IDA内置的值域分析。看雪两篇给了使用此技术时的大多数工程细节,惟缺一个完整PoC。本小节提供完整PoC及小型测试样本,便于技术聚焦。测试样本是从原始样本抠取的,无法在实际环境中加载调试,但在IDA中测试反CFF足矣。
完整测试用例打包
在IDA 8.4.1/9.2中测试通过。样本so中的foo、bar、baz函数是目标函数。
get_state_2_block_id_dict()中用到值域分析。
上述代码只能对付简单的OLLVM CFF。作为反例,libgeiri.so中有许多函数非常复杂,上述代码对付不了。关于libgeiri.so,参看
创建: 2025-11-11 16:07
更新: 2025-11-11 20:40
创建: 2025-11-11 16:07
更新: 2025-11-11 20:40
a. 找出dispatcher
b. 找出有效块(真实块)
c. 找出状态变量state_var
d. 找出哪个Block对state_var设了什么值
e. 找出哪个Block入口处state_var等于什么值
f. 建立Block到Block的映射关系
g. 修改控制流
a. 找出dispatcher
b. 找出有效块(真实块)
c. 找出状态变量state_var
d. 找出哪个Block对state_var设了什么值
e. 找出哪个Block入口处state_var等于什么值
f. 建立Block到Block的映射关系
g. 修改控制流
OLLVM扁平化还原更优雅的解法 IDA Hex-Rays Microcode - GhHei [2025-10-06]
https://bbs.kanxue.com/thread-288691.htm
教你玩转ida反编译(修改ida microcode) - 孤恒 [2025-10-21]
https://bbs.kanxue.com/thread-288865.htm
OLLVM扁平化还原更优雅的解法 IDA Hex-Rays Microcode - GhHei [2025-10-06]
https://bbs.kanxue.com/thread-288691.htm
教你玩转ida反编译(修改ida microcode) - 孤恒 [2025-10-21]
https://bbs.kanxue.com/thread-288865.htm
https://scz.617.cn/python/202511111607.txt
https://scz.617.cn/python/202511111607.7z
https://scz.617.cn/python/202511111607.txt
https://scz.617.cn/python/202511111607.7z
import sys
import ida_hexrays
def set_vivl ( vivl, state_var ) :
t = state_var[0]
if t == ida_hexrays.mop_r :
vivl.set_reg( state_var[1], state_var[2] )
elif t == ida_hexrays.mop_S :
vivl.set_stkoff( state_var[1], state_var[2] )
else :
assert False
def get_dispatcher_id ( mba, threshold=2 ) :
dispatcher_id = None
max_in_num = -1
for i in range( mba.qty ) :
mblock = mba.get_mblock( i )
if mblock.type != ida_hexrays.BLT_2WAY :
continue
in_num = mblock.npred()
if in_num > max_in_num :
max_in_num = in_num
dispatcher_id = i
if max_in_num < threshold :
return None
return dispatcher_id
def get_var_from_mop ( mop ) :
if mop.t == ida_hexrays.mop_r :
return ( ida_hexrays.mop_r, mop.r, mop.size )
if mop.t == ida_hexrays.mop_S :
return ( ida_hexrays.mop_S, mop.s.off, mop.size )
if mop.t == ida_hexrays.mop_d :
if mop.d.r.t == ida_hexrays.mop_n :
return get_var_from_mop( mop.d.l )
return None
def get_state_var ( mba, dispatcher_id ) :
mblock = mba.get_mblock( dispatcher_id )
minsn = mblock.tail
if not minsn :
return None
if not (
ida_hexrays.is_mcode_jcond( minsn.opcode ) and
minsn.r.t == ida_hexrays.mop_n and
minsn.d.t == ida_hexrays.mop_b
) :
return None
print( f"Block {mblock.serial} : {minsn.ea:#x} {minsn.dstr()}" )
return get_var_from_mop( minsn.l )
def check_var ( mop, var ) :
ret = False
t = var[0]
size = var[2]
while True :
if mop.t != t or mop.size != size :
break
if mop.t == ida_hexrays.mop_r :
if mop.r == var[1] :
ret = True
elif t == ida_hexrays.mop_S :
if mop.s.off == var[1] :
ret = True
else :
assert False
break
return ret
def get_block_id_2_next_state_dict ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
for i in range( dispatcher_mblock.npred() ) :
block_id = dispatcher_mblock.pred( i )
mblock = mba.get_mblock( block_id )
if mblock.type != ida_hexrays.BLT_1WAY :
continue
minsn = mblock.tail
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t == ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
block_id_2_next_state_dict[block_id] = minsn.l.nnn.value
break
minsn = minsn.prev
def get_block_id_2_next_state_dict_1 ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
for i in range( dispatcher_mblock.npred() ) :
block_id = dispatcher_mblock.pred( i )
mblock = mba.get_mblock( block_id )
if mblock.type != ida_hexrays.BLT_1WAY :
continue
minsn = mblock.tail
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t != ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
prev_state_var = get_var_from_mop( minsn.l )
assert prev_state_var is not None
get_block_id_2_next_state_dict( mba, block_id, prev_state_var, block_id_2_next_state_dict )
break
minsn = minsn.prev
def get_block_id_2_next_state_dict_2 ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
minsn = dispatcher_mblock.tail.prev
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t != ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
prev_state_var = get_var_from_mop( minsn.l )
assert prev_state_var is not None
get_block_id_2_next_state_dict( mba, dispatcher_id, prev_state_var, block_id_2_next_state_dict )
break
minsn = minsn.prev
def get_state_2_block_id_dict ( mba, dispatcher_id, state_var, state_2_block_id_dict ) :
dispatcher_mblock \
= mba.get_mblock( dispatcher_id )
pred_block_ids = [dispatcher_mblock.pred(i) for i in range(dispatcher_mblock.npred())]
block_id_begin = min( dispatcher_id+1, *pred_block_ids )
block_id_end = max( pred_block_ids ) + 1 + 1
for i in range( block_id_begin, block_id_end ) :
mblock = mba.get_mblock( i )
res = ida_hexrays.valrng_t()
vivl = ida_hexrays.vivl_t()
set_vivl( vivl, state_var )
ok = mblock.get_valranges( res, vivl, ida_hexrays.VR_AT_START )
if ok :
ok, v = res.cvt_to_single_value()
if ok and v not in state_2_block_id_dict :
state_2_block_id_dict[v] = mblock.serial
def add_new_goto ( mba, block_id, next_block_id ) :
mblock = mba.get_mblock( block_id )
new_mop = ida_hexrays.mop_t()
new_mop.t = ida_hexrays.mop_b
new_mop.b = next_block_id
new_mop.size = ida_hexrays.NOSIZE
new_goto = ida_hexrays.minsn_t( mblock.tail.ea )
new_goto.opcode = ida_hexrays.m_goto
new_goto.l = new_mop
mblock.insert_into_block( new_goto, mblock.tail )
def change_goto ( mba, block_id, next_block_id ) :
mblock = mba.get_mblock( block_id )
assert mblock.type == ida_hexrays.BLT_1WAY
if mblock.tail.opcode == ida_hexrays.m_goto :
old_next_block_id = mblock.tail.l.b
mblock.tail.l.b = next_block_id
else :
old_next_block_id = mblock.succset[0]
add_new_goto( mba, block_id, next_block_id )
return old_next_block_id
def modify_edge ( mba, block_id, old_next_block_id, next_block_id ) :
mblock = mba.get_mblock( block_id )
old_next_mblock = mba.get_mblock( old_next_block_id )
next_mblock = mba.get_mblock( next_block_id )
mblock.succset._del( old_next_block_id )
old_next_mblock.predset._del( block_id )
mblock.succset.push_back( next_block_id )
next_mblock.predset.push_back( block_id )
def unflatten ( mba, state_var, block_id_2_next_state_dict, state_2_block_id_dict ) :
ret = False
for block_id, next_state in sorted( block_id_2_next_state_dict.items() ) :
if next_state in state_2_block_id_dict :
next_block_id = state_2_block_id_dict[next_state]
print( f"Block {block_id:<2} -> Block {next_block_id}" )
old_next_block_id = change_goto( mba, block_id, next_block_id )
modify_edge( mba, block_id, old_next_block_id, next_block_id )
mba.mark_chains_dirty()
mba.verify( True )
ret = True
return ret
def process_one_cff_structure ( mba, dispatcher_id=None, state_var=None ) :
if dispatcher_id is None :
print( "Attempting to automatically find dispatcher..." )
dispatcher_id = get_dispatcher_id( mba )
if dispatcher_id is None :
print( "Could not find a candidate for the dispatcher block." )
return False
print( f"Dispatcher candidate found: Block {dispatcher_id}" )
else :
print( f"Using manually specified dispatcher: Block {dispatcher_id}" )
if state_var is None :
print( "Attempting to automatically find state variable..." )
state_var = get_state_var( mba, dispatcher_id )
if state_var is None :
print( "Could not identify the state variable." )
return False
print( f"State variable candidate found: {state_var}" )
else :
print( f"Using manually specified state variable: {state_var}" )
block_id_2_next_state_dict = {}
get_block_id_2_next_state_dict( mba, dispatcher_id, state_var, block_id_2_next_state_dict )
if not block_id_2_next_state_dict :
get_block_id_2_next_state_dict_1( mba, dispatcher_id, state_var, block_id_2_next_state_dict )
if not block_id_2_next_state_dict :
get_block_id_2_next_state_dict_2( mba, dispatcher_id, state_var, block_id_2_next_state_dict )
if not block_id_2_next_state_dict :
print( "Could not find block-to-next_state mappings. Aborting this pass." )
return False
for block_id, next_state in sorted( block_id_2_next_state_dict.items() ) :
print( f"Block {block_id:<2} -> State {next_state:#x}" )
state_2_block_id_dict = {}
get_state_2_block_id_dict( mba, dispatcher_id, state_var, state_2_block_id_dict )
if not state_2_block_id_dict :
print( "Could not find state-to-block_id mappings. Aborting this pass." )
return False
for state, block_id in sorted( state_2_block_id_dict.items() ) :
print( f"State 0x{state:<8x} -> Block {block_id}" )
return unflatten( mba, state_var, block_id_2_next_state_dict, state_2_block_id_dict )
def analyzer_main ( mba ) :
func_ea = mba.entry_ea
changes = 0
print( f"Automatic analysis for function {func_ea:#x} mba.qty {mba.qty} maturity {mba.maturity} ..." )
ok = process_one_cff_structure( mba )
if not ok :
print( f"Automatic analysis for function {func_ea:#x} mba.qty {mba.qty} maturity {mba.maturity} did not find a CFF structure or failed." )
else :
print( f"Automatic analysis for function {func_ea:#x} mba.qty {mba.qty} maturity {mba.maturity} completed." )
changes += 1
return changes
class CFF_Analyzer ( ida_hexrays.optblock_t ) :
def __init__ ( self ) :
super().__init__()
self.analyzed_functions = set()
def func ( self, *args ) :
mblock = args[0]
mba = mblock.mba
if mba.maturity not in ( ida_hexrays.MMAT_GLBOPT1, ida_hexrays.MMAT_GLBOPT2 ) :
if (
mblock.serial == 1 and
mba.maturity == ida_hexrays.MMAT_LOCOPT
) :
self.analyzed_functions = {
key for key in self.analyzed_functions
if not ( key[0] == mba.entry_ea and key[1] in ( ida_hexrays.MMAT_GLBOPT1, ida_hexrays.MMAT_GLBOPT2 ) )
}
return 0
func_ea = mba.entry_ea
analysis_key = ( func_ea, mba.maturity, mba.qty )
if analysis_key in self.analyzed_functions :
return 0
self.analyzed_functions.add( analysis_key )
changes = analyzer_main( mba )
return changes
ANALYZER_INSTANCE_NAME = "__cff_analyzer_instance__"
def install_handler () :
if hasattr( sys, ANALYZER_INSTANCE_NAME ) :
print( "Removing existing CFF Analyzer instance..." )
remove_handler()
print( "Installing new CFF Analyzer instance..." )
instance = CFF_Analyzer()
instance.install()
setattr( sys, ANALYZER_INSTANCE_NAME, instance )
print( "CFF Analyzer installed successfully." )
ida_hexrays.clear_cached_cfuncs()
def remove_handler () :
if hasattr( sys, ANALYZER_INSTANCE_NAME ) :
print( "Removing existing CFF Analyzer instance..." )
instance = getattr( sys, ANALYZER_INSTANCE_NAME )
instance.remove()
delattr( sys, ANALYZER_INSTANCE_NAME )
print( "CFF Analyzer removed." )
ida_hexrays.clear_cached_cfuncs()
else :
print( "No existing CFF Analyzer instance to remove." )
def main () :
if ida_hexrays.init_hexrays_plugin() :
install_handler()
else :
print( "Error: Hex-Rays is not available." )
if "__main__" == __name__ :
main()
import sys
import ida_hexrays
def set_vivl ( vivl, state_var ) :
t = state_var[0]
if t == ida_hexrays.mop_r :
vivl.set_reg( state_var[1], state_var[2] )
elif t == ida_hexrays.mop_S :
vivl.set_stkoff( state_var[1], state_var[2] )
else :
assert False
def get_dispatcher_id ( mba, threshold=2 ) :
dispatcher_id = None
max_in_num = -1
for i in range( mba.qty ) :
mblock = mba.get_mblock( i )
if mblock.type != ida_hexrays.BLT_2WAY :
continue
in_num = mblock.npred()
if in_num > max_in_num :
max_in_num = in_num
dispatcher_id = i
if max_in_num < threshold :
return None
return dispatcher_id
def get_var_from_mop ( mop ) :
if mop.t == ida_hexrays.mop_r :
return ( ida_hexrays.mop_r, mop.r, mop.size )
if mop.t == ida_hexrays.mop_S :
return ( ida_hexrays.mop_S, mop.s.off, mop.size )
if mop.t == ida_hexrays.mop_d :
if mop.d.r.t == ida_hexrays.mop_n :
return get_var_from_mop( mop.d.l )
return None
def get_state_var ( mba, dispatcher_id ) :
mblock = mba.get_mblock( dispatcher_id )
minsn = mblock.tail
if not minsn :
return None
if not (
ida_hexrays.is_mcode_jcond( minsn.opcode ) and
minsn.r.t == ida_hexrays.mop_n and
minsn.d.t == ida_hexrays.mop_b
) :
return None
print( f"Block {mblock.serial} : {minsn.ea:#x} {minsn.dstr()}" )
return get_var_from_mop( minsn.l )
def check_var ( mop, var ) :
ret = False
t = var[0]
size = var[2]
while True :
if mop.t != t or mop.size != size :
break
if mop.t == ida_hexrays.mop_r :
if mop.r == var[1] :
ret = True
elif t == ida_hexrays.mop_S :
if mop.s.off == var[1] :
ret = True
else :
assert False
break
return ret
def get_block_id_2_next_state_dict ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
for i in range( dispatcher_mblock.npred() ) :
block_id = dispatcher_mblock.pred( i )
mblock = mba.get_mblock( block_id )
if mblock.type != ida_hexrays.BLT_1WAY :
continue
minsn = mblock.tail
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t == ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
block_id_2_next_state_dict[block_id] = minsn.l.nnn.value
break
minsn = minsn.prev
def get_block_id_2_next_state_dict_1 ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
for i in range( dispatcher_mblock.npred() ) :
block_id = dispatcher_mblock.pred( i )
mblock = mba.get_mblock( block_id )
if mblock.type != ida_hexrays.BLT_1WAY :
continue
minsn = mblock.tail
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t != ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
prev_state_var = get_var_from_mop( minsn.l )
assert prev_state_var is not None
get_block_id_2_next_state_dict( mba, block_id, prev_state_var, block_id_2_next_state_dict )
break
minsn = minsn.prev
def get_block_id_2_next_state_dict_2 ( mba, dispatcher_id, state_var, block_id_2_next_state_dict ) :
dispatcher_mblock = mba.get_mblock( dispatcher_id )
minsn = dispatcher_mblock.tail.prev
while minsn :
if (
minsn.opcode == ida_hexrays.m_mov and
minsn.l.t != ida_hexrays.mop_n and
check_var( minsn.d, state_var )
) :
prev_state_var = get_var_from_mop( minsn.l )
assert prev_state_var is not None
get_block_id_2_next_state_dict( mba, dispatcher_id, prev_state_var, block_id_2_next_state_dict )
break
minsn = minsn.prev
def get_state_2_block_id_dict ( mba, dispatcher_id, state_var, state_2_block_id_dict ) :
dispatcher_mblock \
= mba.get_mblock( dispatcher_id )
pred_block_ids = [dispatcher_mblock.pred(i) for i in range(dispatcher_mblock.npred())]
block_id_begin = min( dispatcher_id+1, *pred_block_ids )
block_id_end = max( pred_block_ids ) + 1 + 1
for i in range( block_id_begin, block_id_end ) :
mblock = mba.get_mblock( i )
res = ida_hexrays.valrng_t()
vivl = ida_hexrays.vivl_t()
set_vivl( vivl, state_var )
ok = mblock.get_valranges( res, vivl, ida_hexrays.VR_AT_START )
if ok :
ok, v = res.cvt_to_single_value()
if ok and v not in state_2_block_id_dict :
state_2_block_id_dict[v] = mblock.serial
def add_new_goto ( mba, block_id, next_block_id ) :
[培训]Windows内核深度攻防:从Hook技术到Rootkit实战!