-
-
[原创]2026 年腾讯游戏安全初赛 PC方向
-
发表于: 1小时前 79
-
2026 年腾讯游戏安全初赛 PC方向
仅为本人题解,并非参考答案不能保证正确,请参照官方公布题解
〇、得分点
如何加载驱动
关掉 ACE 预启动,随便签一个泄露签名,即可加载驱动,稳定性极高,就蓝屏了两次。
Flag
flag{SHAD0WNT_HYPERVMX}
最短路径
DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
//格式化为
RRRRRRDDRRRRUURRDDDDDDDDLLDDDDRR
地图
###########################
#S . . . . . .#.#. . . . .#
############# ####### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
############# ####### ### #
#. . . . .#.#. . . . .#.#.#
# ####### ############### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ####### ############### #
#.#.#. . . . . . . . .#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#. . .#.#.#.#.#
# ### ####### ### ####### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ####### ### ####### #
#.#.#. . . . .#.#.#.#. . .#
# ############### ### #####
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ############### ### #####
#. . .#.#. . .#.#.#.#.#.#.#
##### ### ### ### ### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
##### ### ### ### ### ### #
#. . . . .#.#.#.#. . . . E#
###########################
五个泄漏点
1. 两个对象名
①MazeMoveOK 事件
- 类型:命名事件
- 对象名:Global\MazeMoveOK
- 驱动侧实现:ZwOpenEvent + ZwSetEvent
- 含义:成功相关反馈槽位
②MazeMoveWall 事件
- 类型:命名事件
- 对象名:Global\MazeMoveWall
- 驱动侧实现:ZwOpenEvent + ZwSetEvent
- 含义:失败/撞墙相关反馈槽位
2. 两个 GUID
①GUID1 命名信号量
- 类型:命名信号量
- 对象名:
- Global{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}
- 驱动侧实现:
- 解码对象名
- ObReferenceObjectByName
- KeReleaseSemaphore
- 含义:成功相关反馈槽位
②GUID2 命名信号量
- 类型:命名信号量
- 对象名:
- Global{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}
- 驱动侧实现同上
- 含义:失败/撞墙相关反馈槽位
3. LastError
- 类型:线程用户态上下文回写
- 驱动侧关键函数:sub_140316ADF
- 关键行为:
- 写 +0x68 (TEB -> LastErrorValue)
- 写入值包括:
- 0xC0DE0001 -> ok
- 0xC0DE0000 -> wall
4. ZwSetInformationObject
每次 move 前先调用 SetHandleInformation(h, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)进行清零,在move 后调用GetHandleInformation(h, &flags)
if( flags & HANDLE_FLAG_PROTECT_FROM_CLOSE != 0)
handle_ok = true
else
handle_ok = False
- handle_ok = True
- 表示:成功
- handle_ok = False
- 表示:失败/撞墙
5. KeDelayExecutionThread
函数会通过 KUSER_SHARED_DATA 访问 TickCountLowDeprecated,并且在异或后被编码进入返回缓冲,可以在用户层恢复被驱动使用的这个 Tick 值
tick_xor = struct.unpack_from("<I", raw, 0)[0]
tick = tick_xor ^ 0xBAADF00D
然后计算
predicted_ms = (tick % 50) + 10
就是理论上这个线路被延时的毫秒数,然后我们可以计算一个 IO 请求的耗时
t0 = time.perf_counter()
DeviceIoControl(...)
time_ms = (time.perf_counter() - t0) * 1000.0
如果 predicted_ms 和 time_ms 很接近,就能说明本次触发到了该泄漏点,但是无法判断具体的结果。
- 很接近
- 表示:命中该泄漏点,应重试,直到命中前几个可明确结果泄漏点
- 不接近
- 表示:应观测其他泄漏点获取本次 move 的结果
总结
在二进制文件中放了大量的小混淆,导致不管是正向分析还是逆向分析,都不舒服,比如发现了某一个函数,发现 xref 不可用,因为上游的调用点有花指令,混淆等,导致了 IDA 不能正确分析,在驱动分析中尤其明显,但是由于驱动导入表暴露了大量的信息,所以都可以通过导入表入手,不完整的分析全流程,也能得到相关结论,如果赛题在导入函数上做手脚,比如动态解析,难度会更大。
很好本来都要提交了,感觉这个寻路脚本咋会这么慢,时间还不稳定,又重新用IDA看了一眼驱动,发现有随机的 Sleep 感觉大概率是没找齐全,然后就找到了之前没找到的两个泄漏点(但是他们没有影响我得到flag....)
一、赛题文件
文档:2026游戏安全技术竞赛-PC客户端安全-初赛.docx
#「宫殿」的验证机制并不寻常:系统由用户态控制台和内核驱动组成,驱动内部隐藏着一个加密迷阵,所有操作指令必须由控制台通过驱动接口下
# 发。表面上,系统不会对你的任何操作给出反馈——你无法直接判断每一步操作是成功推进,还是被防线拦截。
从上述文本能够判断是迷宫题目,但是应该不是简单的迷宫,应该无法通过传统的迷宫分析方法分析,需要完整的逆向,至少要找到文档中提到的:
# 某些异常现象暗示着隐藏的信息泄露。找到这些线索,你就能感知每一步的结果;发现得越多,破解效率越高。
二、应用层:ShadowGateApp.exe
并未发现有保护壳,直接运行得到
C:\Users\Euarno\Desktop\2026游戏安全技术竞赛-PC客户端安全-初赛>ShadowGateApp.exe
==============================================
Shadow Palace Gate - ACCESS DENIED
==============================================
ACE has intercepted Shadow's palace gate
system. A kernel driver hides an encrypted
maze inside. Navigate through it to extract
the credential for Shadow's internal network.
The palace gives NO feedback on whether
your moves succeed or hit a wall.
Or does it? The system is not as silent
as it seems. Five hidden flaws betray
the result of every move 鈥?but each move
exposes only one of them.
Hint: after each reset, the first five
successful moves reveal each flaw exactly
once, in a fixed order.
[*] Connecting to Shadow gate driver...
[+] Gate module online.
[*] Maze grid: 13x13, Entry=(0,0), Exit=(12,12)
Commands:
W/A/S/D - Navigate Up/Left/Down/Right
I/J/K/L - Navigate Up/Left/Down/Right (alt)
R - Reset to entry point
T - Show operation log (position hidden)
H - Show this help
Q / ESC - Abort mission
可以有一个基本的了解 13*13 的迷宫,“The palace gives NO feedback on whether your moves succeed or hit a wall.” 表面不会有反馈,“Five hidden flaws betray the result of every move ” 要通过五个侧信道得到迷宫的反馈,**“after each reset, the first five successful moves reveal each flaw exactly once, in a fixed order. ”**每次你按 R 重置后,前 5 次成功移动会依次触发五种不同的泄露机制,顺序是固定的。(这里其实还是有一点疑惑的,最终只找到了三类泄露方式,但也可以算作五个泄漏点)
字符串没有做混淆,在字符串表暴露了很多信息。结合字符串的交叉引用,有如下分析:
__int64 OpenShadowGateDevice()
{
__int64 result; // rax
DWORD LastError; // edi
//设备路径
result = (__int64)CreateFileW(L"\\\\.\\ShadowGate", 0xC0000000, 0, 0, 3u, 0x80u, 0);
if ( result == -1 )
{
LastError = GetLastError();
sub_140001010("[ERROR] Failed to open device '%ws'\n", L"\\\\.\\ShadowGate");
sub_140001010("[ERROR] Error code: %lu (0x%08lX)\n", LastError, LastError);
if ( LastError == 2 )
{
//要求我们应该提前加载好驱动 驱动没有签名 我的解决方案是找一个泄露签名 退出ACE预启动 即可加载
sub_140001010("[HINT] Driver not loaded. Use: sc create ShadowGate type=kernel binPath=<path>\\ShadowGateSys.sys\n");
sub_140001010("[HINT] Then: sc start ShadowGate\n");
return -1;
}
else
{
if ( LastError == 5 )
sub_140001010("[HINT] Run as Administrator.\n");
return -1;
}
}
return result;
}
虽然还没有分析驱动,但是应用创建了两个全局命名事件,名称强烈暗示它们分别对应移动成功和撞墙,即失败。
HANDLE CreateLeakEvents()
{
HANDLE result; // rax
hObject = CreateEventW(0, 1, 0, L"Global\\MazeMoveOK");
result = CreateEventW(0, 1, 0, L"Global\\MazeMoveWall");
qword_140005688 = result;
return result;
}
通信方面使用了最基本的 IO 通讯,分析 DeviceIoControl 的交叉引用可以得到所有的 IO 码
if ( QueryMaza(v6, &v9) )
printf(
"[*] Maze grid: %ux%u, Entry=(%u,%u), Exit=(%u,%u)\n",
(_DWORD)v9,
DWORD1(v9),
DWORD2(v9),
HIDWORD(v9),
v10,
HIDWORD(v10));
BOOL __fastcall QueryMaza(__int64 a1, void *lpOutBuffer)
{
DWORD BytesReturned; // [rsp+40h] [rbp-18h] BYREF
BytesReturned = 0;
return DeviceIoControl(hDevice, 0x8001200C, 0, 0, lpOutBuffer, 0x18u, &BytesReturned, 0);
}
显然 0x8001200C 用于查询迷宫信息,接下来就能发现 main 函数有大量的花指令了,因为正常的 R 等逻辑的处理代码都不存在
只需要把
push rcx
ret
改为
jmp rcx
伪代码就重建好了,然后依旧是出题人的小礼物啊
v8 -= 27;
switch ( v8 )
这里会有垂落,处理的是大小写 R 的情况
case '7':
case 'W':
ResetMaze(_RCX);
v6 = 0;
dword_140005668 = 0;
v22[0] = 0;
v7 = 0;
printf("[*] Reset to entry point.\n");
continue;
BOOL ResetMaze()
{
DWORD BytesReturned; // [rsp+40h] [rbp-18h] BYREF
BytesReturned = 0;
return DeviceIoControl(hDevice, 0x80012008, 0, 0, 0, 0, &BytesReturned, 0);
}
显然 0x80012008 用于重新开始,那剩下的操作码肯定是操作迷宫了的
| IOCTL | 作用 |
|---|---|
| 0x80012004 | 移动/核心交互 |
| 0x80012008 | 重置到起点 |
| 0x8001200C | 查询迷宫信息 |
紧接着还要详细分析,相关的按键被映射为了十六进制整数,如下
case '&':
case '/':
case 'F':
case 'O':
LOBYTE(direct) = 0x30; //A J
v12 = 76;
goto LABEL_11;
case ')':
case '1':
case 'I':
case 'Q':
LOBYTE(direct) = 0x40;// D L
__asm { rcl al, cl }
v12 = 82;
goto LABEL_11;
case '.':
case '<':
case 'N':
case '\\':
LOBYTE(direct) = 0x10; //W I
v12 = 85;
goto LABEL_11;
case '0':
case '8':
case 'P':
case 'X':
LOBYTE(direct) = 0x20; // S K
v12 = 68;
LABEL_11:
if ( v6 < 255 )
{
v22[v7] = v12;
++v6;
++v7;
if ( (unsigned int)v6 >= 0x100 )
sub_140001E88(_RCX, direct);
v22[v7] = 0;
}
v14 = (unsigned int)dword_140005668;
memset(v21, 0, sizeof(v21));
v18 = 0;
++dword_140005668;
v15 = TryMove(hDevice, direct, v14, v21, &v18);
TryMove 对于路径的打包如下
__int64 v16;
unsigned int v17;
v16 = (unsigned __int8)v9; //只用了第一字节?
v17 = a3 ^ v9 ^ 0xDEAD1337;
DeviceIoControl(a1, 0x80012004, &v16, 0xCu, v14, 0x84u, &v15, 0) //12字节大小数据包 从v16开始写入
其中,具体的加密逻辑是这样
__int64 __fastcall TryMove(void *a1, __int64 _RDX, int op_count, _BYTE *a4, _DWORD *a5)
{
//rdx是代表方向的十六进制整数 op_count是一个计数器,代表次数
v6 = _RDX ^ 0xFA;
v9 = (unsigned __int8)(_RDX | (8 * v6));
v16 = (unsigned __int8)v9; //结构体开始
v17 = op_count ^ v9 ^ 0xDEAD1337
if ( DeviceIoControl(a1, 0x80012004, &v16, 0xCu, v14, 0x84u, &v15, 0) )
...
}
足够我们重建结构体了,但是伪代码有一个问题,驱动如何校验 v17 的正确性呢?切换到汇编,根据 DeviceIoControl 的参数可以知道
lea r8, [r11-30h] ; lpInBuffer
然后看汇编层面是如何写入 [r11-30h] 的
mov [r11-30h], al
mov [r11-2Ch], r8d
mov [r11-28h], eax
统一成 buf 的偏移就是
[buf+0] 写入 1 字节
[buf+4] 写入 4 字节
[buf+8] 写入 4 字节
struct {
uint8_t field0;
uint8_t pad[3];
uint32_t field4;
uint32_t field8;
};
也就能重建结构体为
typedef struct _MOVE_REQ {
uint8_t encoded_dir; //映射为了十六进制整数
uint8_t pad[3]; //数据对齐
uint32_t op_count; //操作次数
uint32_t checksum; //校验点
} MOVE_REQ; //一共12字节
接下来是输入 T 的问题
case '9':
case 'Y':
sub_140001010(asc_1400038E0, (unsigned int)v6);
v17 = v22;
if ( v6 <= 0 )
v17 = "(none)";
sub_140001010(asc_140003908, v17);
continue;
命令 T的作用并不是显示当前位置,而是输出应用层维护的成功移动日志。主循环中,每当一次移动被认为成功时,程序都会将对应方向字符 L R U D 追加到缓冲区 v22 中,而 T/t分支则负责打印当前操作计数及该字符串,也就是这里只能打印成功的路径。
先玩一下,也就是可以借助前五个正确路径找到五个泄漏点,因为提到了前五次成功顺次泄露,会在后边起关键作用
- D D D D D T
程序输出:
- Sequence: RRRRR
三、驱动层:ShadowGateSys.sys
也是见到没有壳子轻松达到 4 MB 的驱动文件了,混淆或者花指令估计是跑不了了,先迎接出题人的小礼物,重建 sub_140003208
0x140003244 call sub_1400018A0
patch 为
mov eax, 0
伪代码重建成功
__int64 __fastcall sub_140003208(struct _DRIVER_OBJECT *a1)
{
NTSTATUS v3; // eax
NTSTATUS v4; // edi
struct _UNICODE_STRING DestinationString; // [rsp+40h] [rbp-18h] BYREF
P = (PVOID)ExAllocatePool2(64, 472, 1702519117);
if ( !P )
return 3221225626LL;
KeInitializeSpinLock(&SpinLock);
KeInitializeSpinLock(&qword_1400050D0);
((void (*)(void))loc_140001E60)();
DestinationString = 0;
RtlInitUnicodeString(&DestinationString, L"\\Device\\ShadowGate");
v3 = IoCreateDevice(a1, 0, &DestinationString, 0x22u, 0x100u, 0, &DeviceObject);
v4 = v3;
if ( v3 < 0 )
{
_mm_lfence();
ExFreePoolWithTag(P, 0x657A614Du);
LABEL_5:
P = 0;
return (unsigned int)v4;
}
RtlInitUnicodeString(&SymbolicLinkName, L"\\??\\ShadowGate");
v4 = IoCreateSymbolicLink(&SymbolicLinkName, &DestinationString);
if ( v4 < 0 )
{
_mm_lfence();
IoDeleteDevice(DeviceObject);
ExFreePoolWithTag(P, 0x657A614Du);
DeviceObject = 0;
goto LABEL_5;
}
a1->DriverUnload = (PDRIVER_UNLOAD)sub_140001840;
a1->MajorFunction[0] = (PDRIVER_DISPATCH)sub_1400014B0;
a1->MajorFunction[2] = (PDRIVER_DISPATCH)sub_140001410;
a1->MajorFunction[14] = (PDRIVER_DISPATCH)sub_140001540;
DeviceObject->Flags |= 4u;
DeviceObject->Flags &= ~0x80u;
return 0;
}
接下来寻5个泄漏点,刚才在应用层找到的事件,先看字符串定位过去,找到第一个泄漏点
① 事件泄露
int __fastcall sub_1400022B0(__int64 a1, int a2)
{
int result; // eax
const WCHAR *v4; // rdx
struct _UNICODE_STRING DestinationString; // [rsp+20h] [rbp-40h] BYREF
struct _OBJECT_ATTRIBUTES ObjectAttributes; // [rsp+30h] [rbp-30h] BYREF
void *EventHandle; // [rsp+80h] [rbp+20h] BYREF
result = (unsigned __int8)dword_140005000;
if ( (((_BYTE)dword_140005000 * ((_BYTE)dword_140005000 - 1)) & 1) == 0 )
{
if ( !a2 || (v4 = L"\\BaseNamedObjects\\MazeMoveWall", a2 == 2) )
v4 = L"\\BaseNamedObjects\\MazeMoveOK";
RtlInitUnicodeString(&DestinationString, v4);
ObjectAttributes.Length = 48;
ObjectAttributes.ObjectName = &DestinationString;
ObjectAttributes.RootDirectory = 0;
ObjectAttributes.Attributes = 576;
EventHandle = 0;
*(_OWORD *)&ObjectAttributes.SecurityDescriptor = 0;
result = ZwOpenEvent(&EventHandle, 2u, &ObjectAttributes);
if ( result >= 0 )
{
ZwSetEvent(EventHandle, 0);
return ZwClose(EventHandle);
}
}
return result;
}
该函数根据传入状态参数,选择 **\BaseNamedObjects\MazeMoveOK **或 \BaseNamedObjects\MazeMoveWall
随后通过 ZwOpenEvent 打开对应命名事件,并调用 ZwSetEvent 将其置位,最后关闭句柄。
由于应用层事先创建了同名全局事件 Global\MazeMoveOK 与 Global\MazeMoveWall
因此用户态可在每次发送移动请求后轮询这两个事件,从而判断本次移动是成功推进还是撞墙失败。
② GUID泄露
然后在导入表发现了函数 KeReleaseSemaphore 和 ObReferenceObjectByName 跟到,sub_140319A37,有明显的选择和解密流程
if ( !v5 || (v9 = &unk_1400041E0, v5 == 2) )
v9 = &unk_140004160;
v10 = 57;
v11 = v9 - (_BYTE *)v18;
v12 = v18;
do
{
*v12 = *(WCHAR *)((char *)v12 + v11) ^ 0x4B; //解密
++v12;
--v10;
}
while ( v10 );
}
RtlInitUnicodeString((PUNICODE_STRING)v17, v18); //构建字符串
ObReferenceObjectByName(v17, 64, 0);
就要看看 unk_1400041E0 和 unk_140004160 对应的到底是什么字符串了
def decode_guid_name(words):
return ''.join(chr(w ^ 0x4B) for w in words)
unk_140004160 = [0x0017, 0x0009, 0x002A, 0x0038, 0x002E, 0x0005, 0x002A, 0x0026, 0x002E, 0x002F, 0x0004, 0x0029, 0x0021, 0x002E, 0x0028, 0x003F, 0x0038, 0x0017, 0x0030, 0x000A, 0x007C, 0x000D, 0x0078, 0x0009, 0x0079, 0x0008, 0x007A, 0x0066, 0x0072, 0x000E, 0x007F, 0x000F, 0x0066, 0x007F, 0x0008, 0x0073, 0x000A, 0x0066, 0x0009, 0x007E, 0x000F, 0x007D, 0x0066, 0x007A, 0x000D, 0x0079, 0x000E, 0x0078, 0x000A, 0x007F, 0x0009, 0x007E, 0x0008, 0x007D, 0x000F, 0x0036, 0x004B, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000]
unk_1400041E0 = [0x0017, 0x0009, 0x002A, 0x0038, 0x002E, 0x0005, 0x002A, 0x0026, 0x002E, 0x002F, 0x0004, 0x0029, 0x0021, 0x002E, 0x0028, 0x003F, 0x0038, 0x0017, 0x0030, 0x0009, 0x0073, 0x000E, 0x0079, 0x0008, 0x0078, 0x000F, 0x007B, 0x0066, 0x007B, 0x000D, 0x007E, 0x000A, 0x0066, 0x007E, 0x000F, 0x0072, 0x0009, 0x0066, 0x0008, 0x007D, 0x000E, 0x007C, 0x0066, 0x0079, 0x000A, 0x0078, 0x000D, 0x007F, 0x0009, 0x007E, 0x0008, 0x007D, 0x000F, 0x007C, 0x000E, 0x0036, 0x004B, 0x0000, 0x0000, 0x0000, 0x000E, 0x0033, 0x0018, 0x002E]
print(decode_guid_name(unk_140004160))
print(decode_guid_name(unk_1400041E0))
得到输出,也就找到了第二个泄漏点,
\BaseNamedObjects\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}KKKKKKK
\BaseNamedObjects\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}KKKExSe
//手动去尾巴ing
\BaseNamedObjects\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}
\BaseNamedObjects\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}
但是暂时不能确定哪个是撞墙,哪个是成功,要具体测试一下
#include <windows.h>
#include <iostream>
#include <string>
#include <vector>
#include <cstdint>
static const wchar_t* DEVICE_NAME = L"\\\\.\\ShadowGate";
static const wchar_t* GUID1_NAME = L"Global\\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}";
static const wchar_t* GUID2_NAME = L"Global\\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}";
static const DWORD IOCTL_MOVE = 0x80012004;
static const DWORD IOCTL_RESET = 0x80012008;
static const DWORD IOCTL_QUERY = 0x8001200C;
#pragma pack(push, 1)
struct MOVE_REQ {
uint8_t encoded_dir;
uint8_t pad[3];
uint32_t op_count;
uint32_t checksum;
};
#pragma pack(pop)
static uint8_t ror8(uint8_t x, int n) {
return static_cast<uint8_t>((x >> n) | (x << (8 - n)));
}
static uint8_t encode_dir(uint8_t move_code) {
return ror8(static_cast<uint8_t>(move_code ^ 0x5A), 5);
}
static uint8_t move_code_from_char(char ch) {
switch (ch) {
case 'W': case 'w': return 0x10;
case 'S': case 's': return 0x20;
case 'A': case 'a': return 0x30;
case 'D': case 'd': return 0x40;
default: return 0;
}
}
static void drain_semaphore(HANDLE hSem) {
while (WaitForSingleObject(hSem, 0) == WAIT_OBJECT_0) {
}
}
static bool poll_semaphore(HANDLE hSem) {
DWORD rc = WaitForSingleObject(hSem, 0);
return rc == WAIT_OBJECT_0;
}
static bool reset_maze(HANDLE hDev) {
DWORD bytesReturned = 0;
return DeviceIoControl(
hDev,
IOCTL_RESET,
nullptr, 0,
nullptr, 0,
&bytesReturned,
nullptr
) != FALSE;
}
static bool move_once(HANDLE hDev, char ch, uint32_t opCount) {
uint8_t mc = move_code_from_char(ch);
if (!mc) {
std::cerr << "invalid move char: " << ch << "\n";
return false;
}
MOVE_REQ req{};
req.encoded_dir = encode_dir(mc);
req.op_count = opCount;
req.checksum = req.encoded_dir ^ opCount ^ 0xDEAD1337u;
uint8_t outbuf[0x84] = {};
DWORD bytesReturned = 0;
return DeviceIoControl(
hDev,
IOCTL_MOVE,
&req,
sizeof(req),
outbuf,
sizeof(outbuf),
&bytesReturned,
nullptr
) != FALSE;
}
int wmain(int argc, wchar_t* argv[]) {
std::string path = "DDDDD";
if (argc >= 2) {
std::wstring ws = argv[1];
path.assign(ws.begin(), ws.end());
}
HANDLE hDev = CreateFileW(
DEVICE_NAME,
GENERIC_READ | GENERIC_WRITE,
0,
nullptr,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
nullptr
);
if (hDev == INVALID_HANDLE_VALUE) {
std::cerr << "CreateFileW failed, gle=0x" << std::hex << GetLastError() << "\n";
return 1;
}
HANDLE hGuid1 = CreateSemaphoreW(nullptr, 0, 0x7fffffff, GUID1_NAME);
HANDLE hGuid2 = CreateSemaphoreW(nullptr, 0, 0x7fffffff, GUID2_NAME);
if (!hGuid1 || !hGuid2) {
std::cerr << "CreateSemaphoreW failed, gle=0x" << std::hex << GetLastError() <<
"\n";
CloseHandle(hDev);
return 1;
}
if (!reset_maze(hDev)) {
std::cerr << "reset failed, gle=0x" << std::hex << GetLastError() << "\n";
CloseHandle(hGuid1);
CloseHandle(hGuid2);
CloseHandle(hDev);
return 1;
}
uint32_t opCount = 0;
std::cout << "Testing path: " << path << "\n";
for (size_t i = 0; i < path.size(); ++i) {
drain_semaphore(hGuid1);
drain_semaphore(hGuid2);
char ch = path[i];
if (!move_once(hDev, ch, opCount)) {
std::cerr << "move " << ch << " failed at step " << i
<< ", gle=0x" << std::hex << GetLastError() << "\n";
break;
}
bool g1 = poll_semaphore(hGuid1);
bool g2 = poll_semaphore(hGuid2);
std::cout << "step " << (i + 1)
<< " move=" << ch
<< " opCount=" << std::dec << opCount
<< " guid1=" << (g1 ? "triggered" : "no")
<< " guid2=" << (g2 ? "triggered" : "no")
<< "\n";
++opCount;
}
CloseHandle(hGuid1);
CloseHandle(hGuid2);
CloseHandle(hDev);
return 0;
}
测试 DDDDD 得到
Testing path: DDDDD
step 1 move=D opCount=0 guid1=no guid2=no
step 2 move=D opCount=1 guid1=triggered guid2=no
step 3 move=D opCount=2 guid1=no guid2=no
step 4 move=D opCount=3 guid1=no guid2=no
step 5 move=D opCount=4 guid1=no guid2=no
那么路径正确时候会触发 Global\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}
#Global\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D} -> 成功信号量
#Global\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E} -> 撞墙信号量
③ TEB LastError泄露
还是要看导入表
- PsGetCurrentProcessId
- PsGetCurrentThreadId
- PsLookupProcessByProcessId
- PsLookupThreadByThreadId
- KeStackAttachProcess
- KeUnstackDetachProcess
- PsGetProcessPeb
- ZwQueryVirtualMemory
首先想到的可能是调试状态,或者 LastError,对于 LastError 的话,尝试找找有没有类似的写入逻辑
#结构:TEB -> LastErrorValue (DWORD)
#偏移:
#x86 (32 位):TEB + 0x34
#x64 (64 位):TEB + 0x68
int __fastcall sub_140316ADF(__int64 _RCX, int a2)
{
__int16 _AX; // ax
unsigned __int64 v4; // rax
__int64 v6; // rbx
void *v7; // rcx
__int64 v8; // rax
int *v9; // rsi
int v10; // ebx
__int64 v12; // [rsp-20h] [rbp-78h] BYREF
PEPROCESS Process; // [rsp+0h] [rbp-58h] BYREF
PVOID Object; // [rsp+8h] [rbp-50h] BYREF
struct _KAPC_STATE ApcState; // [rsp+10h] [rbp-48h] BYREF
_AX = 0;
__asm { rcl ax, cl }
v4 = (unsigned __int64)&v12 ^ _security_cookie;
v6 = _RCX;
v7 = *(void **)(_RCX + 464);
if ( v7 )
{
if ( *(_QWORD *)(v6 + 456) )
{
if ( qword_140005080 )
{
Object = 0;
LODWORD(v4) = PsLookupThreadByThreadId(v7, (PETHREAD *)&Object);
if ( (v4 & 0x80000000) == 0LL )
{
Process = 0;
if ( PsLookupProcessByProcessId(*(HANDLE *)(v6 + 456), &Process) >= 0 )
{
KeStackAttachProcess(Process, &ApcState);
v8 = qword_140005080(Object); //qword_140005080 是 PsGetThreadTeb v8是Teb了
if ( v8 )
{
v9 = (int *)(v8 + 0x68); //这里取出LastError的地址
if ( (((_BYTE)dword_140005000 * ((_BYTE)dword_140005000 - 1)) & 1) != 0 )
{
v10 = 0xDEADDEAD; //1
}
else if ( a2 )
{
v10 = 0xC0DE0002; //2
if ( a2 != 2 )
v10 = 0xC0DE0000; //3
}
else
{
v10 = 0xC0DE0001; //4
}
ProbeForWrite(v9, 4u, 4u);
*v9 = v10; //这里对LaseError做了覆写
}
KeUnstackDetachProcess(&ApcState);
ObfDereferenceObject(Process);
}
LODWORD(v4) = ObfDereferenceObject(Object);
}
}
}
}
return v4;
}
显然可以看到
v9 = (int *)(v8 + 0x68); //这里取出LastError的地址
*v9 = v10; //这里对LaseError做了覆写
那么 LastError 应该是下一个泄漏点了
| Code | 含义 |
|---|---|
| 0xC0DE0001 | ok |
| 0xC0DE0002 | 到达终点? |
| 0xC0DE0000 | wall |
| 0xDEADDEAD | 干扰值 |
④ ZwSetInformationObject
刚才我们提到 qword_140005080 是 PsGetThreadTeb ,相关的交叉引用还有一个函数
void sub_14031857E()
{
__int64 v0; // rax
_QWORD *v1; // rbx
if ( PsGetThreadTeb )
{
if ( qword_140005090 )
{
v0 = _guard_dispatch_icall_fptr(); // Call PsGetThreadTeb
if ( v0 )
{
v1 = (_QWORD *)(v0 + 0x1748); //TEB + 0x1748
ProbeForRead((volatile void *)(v0 + 0x1748), 8u, 8u);
if ( *v1 ) //如果TEB + 0x1748不为0
_guard_dispatch_icall_fptr(); // Call qword_140005090
}
}
}
}
1403185EF lea rbx, [rax+1748h] //rbx = TEB + 0x1748
14031861E mov rcx, rbx //ProbeForRead(TEB+0x1748, 8, 8)
140318621 call cs:ProbeForRead
140318627 mov r10, [rbx] //读出这个位置的 8 字节值 r10 = *(QWORD*)(TEB+0x1748)
140318630 test r10, r10 //如果这个值是 0
140318638 jz loc_140318758
1403186F6 mov [rsp+38h+arg_10], 0
140318701 mov [rsp+38h+arg_11], r8b
14031871D lea r8, [rsp+38h+arg_10] //r8 = &2_byte_buffer
140318710 mov r9d, 2 //r9d = 2
14031872B mov rcx, r10 // rcx = *(QWORD*)(TEB+0x1748)
通过汇编可以分析出,
qword_140005090(
*(QWORD*)(TEB + 0x1748),
4,
&two_byte_buf,
2
);
这还说啥了,如果不动态调试的话估计是没戏了,然后问 GPT 能不能猜测一下这是哪个函数?
为什么我把它解释成 ZwSetInformationObject
不是只凭感觉,而是因为参数形状正好匹配:
- HANDLE
- OBJECT_INFORMATION_CLASS = 4
- PVOID buffer
- ULONG length = 2
再结合动态实验:
- 把 TEB+0x1748 预置成事件句柄
- move 后这个句柄的 HANDLE_FLAG_PROTECT_FROM_CLOSE 会变化
所以这才进一步把它收敛成:
ZwSetInformationObject(handle, ObjectHandleFlagInformation, &info, 2)
查阅相关资料
NTSYSCALLAPI
NTSTATUS
NTAPI
NtSetInformationObject(
_In_ HANDLE Handle,
_In_ OBJECT_INFORMATION_CLASS ObjectInformationClass,
_In_reads_bytes_(ObjectInformationLength) PVOID ObjectInformation,
_In_ ULONG ObjectInformationLength // 为2
);
#endif
#endif
typedef struct _OBJECT_HANDLE_FLAG_INFORMATION {
BOOLEAN Inherit;
BOOLEAN ProtectFromClose;
} OBJECT_HANDLE_FLAG_INFORMATION;
然后需要确定 ZwSetInformationObject 提供的什么信号墙,什么信号是通路?可以先创建一个事件句柄 slot4_probe,把这个句柄值写到当前线程 TEB + 0x1748,每次 move 前先调用 SetHandleInformation(h, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)进行清零,在move 后调用GetHandleInformation(h, &flags),因为题目提到,reset 后前五次成功步按顺序泄露
if( flags & HANDLE_FLAG_PROTECT_FROM_CLOSE != 0)
handle_ok = true
else
handle_ok = False
并且稳定观测到(我们测试到了前几步都是D,并且可以通过回环 DAD 走法来增加正确步骤的次数)
- DAD + A
- handle_ok = True
- DAD + W
- handle_ok = False
⑤ KeDelayExecutionThread
这个的发现要归功于提交之前的几次测试,我感觉这个寻路没有道理这么慢,内核很可能有随机的,甚至是故意的延时,发现导入了函数 KeDelayExecutionThread
void __fastcall sub_1400026B4(unsigned int *a1)
{
sub_140002038(a1);
sub_140002388();
JUMPOUT(0x140001FF0LL);
}
void __fastcall sub_140002038(unsigned int *a1)
{
_BYTE *v1; // rdx
__int64 v2; // r8
unsigned int v3; // eax
if ( a1 )
{
v1 = a1 + 1;
v2 = 56;
//MEMORY[0xFFFFF78000000320] == KUSER_SHARED_DATA + 0x320 == TickCountLowDeprecated
v3 = TickCountLowDeprecated ^ 0xBAADF00D;
*a1 = TickCountLowDeprecated ^ 0xBAADF00D; // *(DWORD*)a1 = TickCount ^ 0xBAADF00D;
// drive 用来算 delay 的那个 TickCount 同时也被编码写进了返回缓冲
do
{
v3 = 1103515245 * v3 + 12345;
*v1++ = BYTE2(v3);
--v2;
}
while ( v2 );
}
}
NTSTATUS sub_140002388()
{
union _LARGE_INTEGER Interval; // [rsp+30h] [rbp+8h] BYREF
Interval.QuadPart = -10000LL * (TickCountLowDeprecated % 50u + 10); //转换到毫秒 10~59ms
return KeDelayExecutionThread(0, 0, &Interval);
}
函数会通过 KUSER_SHARED_DATA 访问 TickCountLowDeprecated,并且在异或后被编码进入返回缓冲,可以在用户层恢复被驱动使用的这个 Tick 值
tick_xor = struct.unpack_from("<I", raw, 0)[0]
tick = tick_xor ^ 0xBAADF00D
然后计算
predicted_ms = (tick % 50) + 10
就是理论上这个线路被延时的毫秒数,然后我们可以计算一个 IO 请求的耗时
t0 = time.perf_counter()
DeviceIoControl(...)
time_ms = (time.perf_counter() - t0) * 1000.0
如果 predicted_ms 和 time_ms 很接近,就能说明本次触发到了该泄漏点,但是更进一步,好像没有合适的办法确定到底是撞墙了还是成功了(从现有代码逻辑看是这样,向上的交叉引用又追不过去),其实也好办,我们能够观测到是不是触发了这个泄漏点,当观测到的时候,直接重试就好了,依赖前四个泄漏点做路线判断。
四、寻路算法并获取 Flag
算法如下
import argparse
import collections
import ctypes
from ctypes import wintypes
import struct
import sys
import time
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ntdll = ctypes.WinDLL("ntdll")
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 3
FILE_ATTRIBUTE_NORMAL = 0x80
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
WAIT_OBJECT_0 = 0x00000000
WAIT_TIMEOUT = 0x00000102
HANDLE_FLAG_PROTECT_FROM_CLOSE = 0x00000002
IOCTL_MOVE = 0x80012004
IOCTL_RESET = 0x80012008
IOCTL_QUERY = 0x8001200C
WIN_MAGIC = 0x57494E21
LEAK_WALL = 0xC0DE0000
LEAK_OK = 0xC0DE0001
LEAK_EXIT = 0xC0DE0002
LEAK_POISON = 0xDEADDEAD
DEVICE_NAME = r"\\.\ShadowGate"
EVENT_OK_NAME = r"Global\MazeMoveOK"
EVENT_WALL_NAME = r"Global\MazeMoveWall"
SEMAPHORE_OK_GUID = "{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}"
SEMAPHORE_WALL_GUID = "{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}"
DIRS = {
"W": (0, -1),
"A": (-1, 0),
"S": (0, 1),
"D": (1, 0),
}
INVERSE = {"W": "S", "S": "W", "A": "D", "D": "A"}
MOVE_CODE = {"W": 0x10, "A": 0x30, "S": 0x20, "D": 0x40}
kernel32.CreateFileW.argtypes = [
wintypes.LPCWSTR,
wintypes.DWORD,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.DWORD,
wintypes.DWORD,
wintypes.HANDLE,
]
kernel32.CreateFileW.restype = wintypes.HANDLE
kernel32.DeviceIoControl.argtypes = [
wintypes.HANDLE,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.DWORD,
ctypes.POINTER(wintypes.DWORD),
wintypes.LPVOID,
]
kernel32.DeviceIoControl.restype = wintypes.BOOL
kernel32.CreateEventW.argtypes = [wintypes.LPVOID, wintypes.BOOL, wintypes.BOOL, wintypes.LPCWSTR]
kernel32.CreateEventW.restype = wintypes.HANDLE
kernel32.CreateSemaphoreW.argtypes = [
wintypes.LPVOID,
wintypes.LONG,
wintypes.LONG,
wintypes.LPCWSTR,
]
kernel32.CreateSemaphoreW.restype = wintypes.HANDLE
kernel32.WaitForSingleObject.argtypes = [wintypes.HANDLE, wintypes.DWORD]
kernel32.WaitForSingleObject.restype = wintypes.DWORD
kernel32.ResetEvent.argtypes = [wintypes.HANDLE]
kernel32.ResetEvent.restype = wintypes.BOOL
kernel32.GetCurrentThread.argtypes = []
kernel32.GetCurrentThread.restype = wintypes.HANDLE
kernel32.GetHandleInformation.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.DWORD)]
kernel32.GetHandleInformation.restype = wintypes.BOOL
kernel32.SetHandleInformation.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.DWORD]
kernel32.SetHandleInformation.restype = wintypes.BOOL
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.SetLastError.argtypes = [wintypes.DWORD]
kernel32.SetLastError.restype = None
class CLIENT_ID(ctypes.Structure):
_fields_ = [("UniqueProcess", wintypes.HANDLE), ("UniqueThread", wintypes.HANDLE)]
class THREAD_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("ExitStatus", wintypes.LONG),
("TebBaseAddress", wintypes.LPVOID),
("ClientId", CLIENT_ID),
("AffinityMask", ctypes.c_size_t),
("Priority", wintypes.LONG),
("BasePriority", wintypes.LONG),
]
ntdll.NtQueryInformationThread.argtypes = [
wintypes.HANDLE,
wintypes.ULONG,
wintypes.LPVOID,
wintypes.ULONG,
ctypes.POINTER(wintypes.ULONG),
]
ntdll.NtQueryInformationThread.restype = wintypes.LONG
class WinError(RuntimeError):
pass
def check_handle(handle, what):
if handle in (None, 0, INVALID_HANDLE_VALUE):
raise WinError(f"{what} failed, last_error=0x{ctypes.get_last_error():08X}")
return handle
def ror8(value, shift):
value &= 0xFF
return ((value >> shift) | (value << (8 - shift))) & 0xFF
def encode_move_code(move_code):
return ror8(move_code ^ 0x5A, 5)
def current_teb_base():
tbi = THREAD_BASIC_INFORMATION()
returned = wintypes.ULONG()
status = ntdll.NtQueryInformationThread(
kernel32.GetCurrentThread(),
0,
ctypes.byref(tbi),
ctypes.sizeof(tbi),
ctypes.byref(returned),
)
if status != 0:
raise WinError(f"NtQueryInformationThread failed, status=0x{status & 0xFFFFFFFF:08X}")
return int(ctypes.cast(tbi.TebBaseAddress, ctypes.c_void_p).value)
def slot5_predicted_delay_ms(raw):
if len(raw) < 4:
return None
tick_xor = struct.unpack_from("<I", raw, 0)[0]
tick = tick_xor ^ 0xBAADF00D
return float((tick % 0x32) + 10)
def slot5_timing_hit(result, tolerance_ms=12.0):
sig = result["signals"]
if sig["event_ok"] or sig["event_wall"] or sig["sem_ok"] or sig["sem_wall"] or sig["handle_ok"]:
return False
if result["last_error"] in (LEAK_OK, LEAK_WALL, LEAK_EXIT, LEAK_POISON):
return False
predicted = result["slot5_predicted_ms"]
if predicted is None:
return False
return abs(result["time_ms"] - predicted) <= tolerance_ms
class ShadowGate:
def __init__(self):
self.handle = None
self.event_ok = None
self.event_wall = None
self.sem_ok = None
self.sem_wall = None
self.slot4_probe = None
self._teb_1748_slot = None
self._teb_1748_old = None
self.op_count = 0
def open(self):
self.handle = check_handle(
kernel32.CreateFileW(
DEVICE_NAME,
GENERIC_READ | GENERIC_WRITE,
0,
None,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
None,
),
f"CreateFileW({DEVICE_NAME})",
)
self.event_ok = self._ensure_event(EVENT_OK_NAME)
self.event_wall = self._ensure_event(EVENT_WALL_NAME)
self.sem_ok = self._ensure_semaphore(SEMAPHORE_OK_GUID)
self.sem_wall = self._ensure_semaphore(SEMAPHORE_WALL_GUID)
self._install_slot4_probe()
return self
def close(self):
if self._teb_1748_slot is not None and self._teb_1748_old is not None:
self._teb_1748_slot.value = self._teb_1748_old
self._teb_1748_slot = None
self._teb_1748_old = None
for attr in ("slot4_probe", "sem_wall", "sem_ok", "event_wall", "event_ok", "handle"):
handle = getattr(self, attr)
if handle:
kernel32.CloseHandle(handle)
setattr(self, attr, None)
def _ensure_event(self, name):
candidates = [name]
if name.startswith("Global\\"):
candidates.append(name.split("\\", 1)[1])
last_exc = None
for candidate in candidates:
try:
return check_handle(kernel32.CreateEventW(None, True, False, candidate), f"CreateEventW({candidate})")
except WinError as exc:
last_exc = exc
raise last_exc or WinError(f"CreateEventW({name}) failed")
def _ensure_semaphore(self, guid):
candidates = (rf"Global\{guid}", guid)
last_exc = None
for candidate in candidates:
try:
return check_handle(
kernel32.CreateSemaphoreW(None, 0, 0x7FFFFFFF, candidate),
f"CreateSemaphoreW({candidate})",
)
except WinError as exc:
last_exc = exc
raise last_exc or WinError(f"CreateSemaphoreW({guid}) failed")
def _install_slot4_probe(self):
teb = current_teb_base()
self.slot4_probe = check_handle(kernel32.CreateEventW(None, True, False, None), "CreateEventW(slot4_probe)")
self._teb_1748_slot = ctypes.c_uint64.from_address(teb + 0x1748)
self._teb_1748_old = self._teb_1748_slot.value
self._teb_1748_slot.value = int(self.slot4_probe)
def _ioctl(self, code, in_bytes=b"", out_size=0):
in_buf = ctypes.create_string_buffer(in_bytes, len(in_bytes)) if in_bytes else None
out_buf = ctypes.create_string_buffer(out_size) if out_size else None
returned = wintypes.DWORD()
kernel32.SetLastError(0)
ok = kernel32.DeviceIoControl(
self.handle,
code,
in_buf,
len(in_bytes),
out_buf,
out_size,
ctypes.byref(returned),
None,
)
last_error = ctypes.get_last_error() & 0xFFFFFFFF
if not ok:
raise WinError(f"DeviceIoControl(0x{code:08X}) failed, last_error=0x{last_error:08X}")
data = out_buf.raw[: returned.value] if out_buf else b""
return data, last_error
def query_maze(self):
data, _ = self._ioctl(IOCTL_QUERY, b"", 0x18)
return struct.unpack("<6I", data[:0x18])
def reset(self):
self._clear_sync_objects()
self._ioctl(IOCTL_RESET)
self.op_count = 0
def move(self, ch):
self._clear_sync_objects()
if self.slot4_probe:
kernel32.SetHandleInformation(self.slot4_probe, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)
move_code = MOVE_CODE[ch]
encoded = encode_move_code(move_code)
packet = struct.pack("<B3xII", encoded, self.op_count, encoded ^ self.op_count ^ 0xDEAD1337)
t0 = time.perf_counter()
data, last_error = self._ioctl(IOCTL_MOVE, packet, 0x84)
time_ms = (time.perf_counter() - t0) * 1000.0
signals = self._collect_signals()
outcome = self._classify_outcome(last_error, signals)
predicted = slot5_predicted_delay_ms(data)
self.op_count += 1
credential = None
if len(data) >= 0x84 and struct.unpack_from("<I", data, 0x3C)[0] == WIN_MAGIC:
length = struct.unpack_from("<I", data, 0x80)[0]
credential = data[0x40 : 0x40 + min(length, 0x3F)].split(b"\x00", 1)[0].decode("ascii", errors="replace")
result = {
"char": ch,
"move_code": move_code,
"encoded": encoded,
"outcome": outcome,
"last_error": last_error,
"signals": signals,
"time_ms": time_ms,
"slot5_predicted_ms": predicted,
"slot5_hit": False,
"raw": data,
"credential": credential,
}
result["slot5_hit"] = slot5_timing_hit(result)
return result
def _clear_sync_objects(self):
if self.event_ok:
kernel32.ResetEvent(self.event_ok)
if self.event_wall:
kernel32.ResetEvent(self.event_wall)
for sem in (self.sem_ok, self.sem_wall):
if sem:
while kernel32.WaitForSingleObject(sem, 0) == WAIT_OBJECT_0:
pass
def _collect_signals(self):
handle_ok = False
if self.slot4_probe:
flags = wintypes.DWORD()
if kernel32.GetHandleInformation(self.slot4_probe, ctypes.byref(flags)):
handle_ok = bool(flags.value & HANDLE_FLAG_PROTECT_FROM_CLOSE)
return {
"event_ok": self._poll_event(self.event_ok),
"event_wall": self._poll_event(self.event_wall),
"sem_ok": self._poll_semaphore(self.sem_ok),
"sem_wall": self._poll_semaphore(self.sem_wall),
"handle_ok": handle_ok,
}
def _classify_outcome(self, last_error, signals):
if last_error == LEAK_EXIT:
return "exit"
if last_error == LEAK_OK or signals["event_ok"] or signals["sem_ok"] or signals["handle_ok"]:
return "ok"
if last_error == LEAK_WALL or signals["event_wall"] or signals["sem_wall"]:
return "wall"
if last_error == LEAK_POISON:
return "poison"
return "unknown"
@staticmethod
def _poll_event(handle):
if not handle:
return False
return kernel32.WaitForSingleObject(handle, 0) == WAIT_OBJECT_0
@staticmethod
def _poll_semaphore(handle):
if not handle:
return False
rc = kernel32.WaitForSingleObject(handle, 0)
return rc == WAIT_OBJECT_0
def shortest_path(open_edges, start, goal):
queue = collections.deque([start])
prev = {start: (None, None)}
while queue:
cur = queue.popleft()
if cur == goal:
break
for move, nxt in open_edges.get(cur, {}).items():
if nxt not in prev:
prev[nxt] = (cur, move)
queue.append(nxt)
if goal not in prev:
return None
path = []
cur = goal
while prev[cur][0] is not None:
cur, move = prev[cur]
path.append(move)
path.reverse()
return "".join(path)
def is_explicit_success(result):
return result["outcome"] in ("ok", "exit")
def is_explicit_wall(result):
return result["outcome"] in ("wall", "poison")
def replay_path(gate, path):
for move in path:
result = gate.move(move)
if result["outcome"] in ("wall", "poison"):
raise WinError(f"replay failed at move {move}: {result}")
def probe_move(gate, route, move, cycle, attempts=8):
result = None
for shift in range(attempts):
gate.reset()
try:
replay_path(gate, route)
except WinError:
continue
if cycle:
out_move, back_move = cycle
blocked = False
for _ in range(shift):
out_res = gate.move(out_move)
if is_explicit_wall(out_res):
blocked = True
break
back_res = gate.move(back_move)
if is_explicit_wall(back_res):
blocked = True
break
if blocked:
continue
result = gate.move(move)
if result["slot5_hit"]:
continue
if is_explicit_success(result):
return True, result
if is_explicit_wall(result):
return False, result
return None, result
def solve_via_iterative_exploration(gate, width, height, start, goal, max_rounds=180):
discovered = {start}
open_edges = collections.defaultdict(dict)
walls = set()
pending = collections.deque([start])
credential = None
rounds = 0
while rounds < max_rounds and pending:
rounds += 1
progress = False
queue = collections.deque(list(dict.fromkeys(pending)))
pending.clear()
while queue:
cell = queue.popleft()
route = shortest_path(open_edges, start, cell) or ""
cycle = None
if route:
cycle = (INVERSE[route[-1]], route[-1])
elif "D" in open_edges[start]:
cycle = ("D", "A")
for move, delta in DIRS.items():
nx = cell[0] + delta[0]
ny = cell[1] + delta[1]
if not (0 <= nx < width and 0 <= ny < height):
continue
nxt = (nx, ny)
if move in open_edges[cell] or (cell, nxt) in walls:
continue
decision, result = probe_move(gate, route, move, cycle, attempts=8)
if result and result["credential"]:
credential = result["credential"]
if decision is True:
progress = True
open_edges[cell][move] = nxt
open_edges[nxt][INVERSE[move]] = cell
if nxt not in discovered:
discovered.add(nxt)
pending.append(nxt)
pending.append(cell)
elif decision is False:
progress = True
walls.add((cell, nxt))
walls.add((nxt, cell))
else:
pending.append(cell)
path = shortest_path(open_edges, start, goal)
if path:
if credential is None:
gate.reset()
replay_result = None
for move in path:
replay_result = gate.move(move)
if replay_result and replay_result["credential"]:
credential = replay_result["credential"]
return path, credential, open_edges
if not progress and not pending:
break
raise WinError("solve did not converge")
def render_ascii(width, height, open_edges):
rows = [["#"] * (width * 2 + 1) for _ in range(height * 2 + 1)]
for y in range(height):
for x in range(width):
rows[y * 2 + 1][x * 2 + 1] = "."
for (x, y), edges in open_edges.items():
for move, _ in edges.items():
if move == "D":
rows[y * 2 + 1][x * 2 + 2] = " "
elif move == "S":
rows[y * 2 + 2][x * 2 + 1] = " "
rows[1][1] = "S"
rows[height * 2 - 1][width * 2 - 1] = "E"
return "\n".join("".join(r) for r in rows)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--probe", metavar="PATH")
parser.add_argument("--solve", action="store_true")
parser.add_argument("--show-map", action="store_true")
args = parser.parse_args()
gate = ShadowGate().open()
try:
width, height, sx, sy, ex, ey = gate.query_maze()
print(f"maze: {width}x{height} start=({sx},{sy}) exit=({ex},{ey})")
print(f"events: ok={bool(gate.event_ok)} wall={bool(gate.event_wall)}")
print(f"semaphores: ok={bool(gate.sem_ok)} wall={bool(gate.sem_wall)}")
if args.probe:
gate.reset()
for ch in args.probe.upper():
result = gate.move(ch)
print(
f"{ch}: outcome={result['outcome']} "
f"last_error=0x{result['last_error']:08X} "
f"time_ms={result['time_ms']:.2f} "
f"slot5_predicted_ms={result['slot5_predicted_ms']} "
f"slot5_hit={result['slot5_hit']} "
f"signals={result['signals']}"
)
return 0
if args.solve:
path, credential, open_edges = solve_via_iterative_exploration(
gate, width, height, (sx, sy), (ex, ey)
)
print(f"path={path}")
if credential:
print(f"credential={credential}")
if args.show_map:
print(render_ascii(width, height, open_edges))
return 0
print("use --solve or --probe")
return 0
finally:
gate.close()
if __name__ == "__main__":
sys.exit(main())
① 单步观测 ShadowGate.move()
按协议构造 MOVE_REQ
encoded_dir = ror8(move_code ^ 0x5A, 5)
checksum = encoded_dir ^ op_count ^ 0xDEAD1337
- 调 DeviceIoControl(IOCTL_MOVE)
- 同时采集 5 个槽位的观测:
- 事件:MazeMoveOK / MazeMoveWall
- 信号量:A7F3... / B8E2...
- LastError
- TEB+0x1748 句柄标志
- slot5 timing:raw[0:4] ^ 0xBAADF00D 反推出 predicted_ms
② 单边判定 probe_move()
判断从当前格子朝某方向走一步,这条边是通还是墙?
做法是:
- reset
- replay 到当前待测格子
- 用一个已知成功的小回环 out/back 调整成功步相位
- 真正发一次待测 move
- 如果命中前四个结果槽位:
- 直接判 ok 或 wall
- 如果命中第五槽位:
- 只说明当前落在 phase5
- 不做结果判定,继续下一次尝试
③ 图搜索 solve_via_iterative_exploration()
- 从起点开始
- 对每个已发现格子的四个方向做 probe_move()
- decision == True:
- 记成通路边,加入图
- decision == False:
- 记成墙
- decision == None:
- 暂时放回队列,后续再试
④ 最短路径
每次图有新边之后,用 shortest_path() 在当前已恢复图上跑 BFS:
- 找到起点到终点的最短路
- 一旦终点连通,再 replay 一次路径
- 从返回缓冲取 credential
通过如下命令执行算法(需要加载驱动)
python shadowgate_solve.py --solve
得到输出
maze: 13x13 start=(0,0) exit=(12,12)
events: ok=True wall=True
semaphores: ok=True wall=True
path=DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
credential=flag{SHAD0WNT_HYPERVMX}
得到了最短路径和 flag,具体执行时间有差异,在我的设备上应该在 70s 左右
五、构建地图
在前者的基础上设计建图脚本
⑤ 完整建图 explore_full_map()
核心数据结构:
- open_edges
- 已确认通路边
- walls
- 已确认墙边
- discovered
- 已发现可达格子
- tasks
- 尚未确认的候选边 (cell, move)
- attempts
- 每条候选边已经回炉过多少次
流程如下:
- 从起点开始,把起点四个方向加入任务队列
- 每次从队列中取出一条候选边 (cell, move)
- 用 shortest_path(open_edges, start, cell) 求从起点到该格子的当前已知路径
- 调用 probe_move() 判断这条边
- 若 decision == True
- 将这条边加入 open_edges
- 把新到达的格子加入 discovered
- 再把这个新格子的候选边加入任务队列
- 若 decision == False
- 将该边加入 walls
- 若 decision == None
- 表示这次只命中 slot5 或暂时未定
- 该边最多回炉有限次数,再次入队
⑥ 地图收敛与最短路径
完整建图跑完后,脚本会得到完整的 ASCII 迷宫地图,open_edges,随后脚本再用 shortest_path() 在完整恢复的图上跑 BFS,得到起点到终点的最短路,并replay 一次,从返回缓冲中提取 credential。
通过如下命令执行算法(需要加载驱动)
python shadowgate_map.py --map
得到输出
maze: 13x13 start=(0,0) exit=(12,12)
events: ok=True wall=True
semaphores: ok=True wall=True
###########################
#S . . . . . .#.#. . . . .#
############# ####### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
############# ####### ### #
#. . . . .#.#. . . . .#.#.#
# ####### ############### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ####### ############### #
#.#.#. . . . . . . . .#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#. . .#.#.#.#.#
# ### ####### ### ####### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ####### ### ####### #
#.#.#. . . . .#.#.#.#. . .#
# ############### ### #####
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ############### ### #####
#. . .#.#. . .#.#.#.#.#.#.#
##### ### ### ### ### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
##### ### ### ### ### ### #
#. . . . .#.#.#.#. . . . E#
###########################
path=DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
credential=flag{SHAD0WNT_HYPERVMX}
discovered=97
unresolved=0
得到了地图、最短路径和 flag,具体执行时间有差异,在我的设备上应该在 600s 左右(上个厕所回来就跑出来了)
[培训]Windows内核深度攻防:从Hook技术到Rootkit实战!