首页
社区
课程
招聘
[原创]2026 年腾讯游戏安全初赛 PC方向
发表于: 1小时前 79

[原创]2026 年腾讯游戏安全初赛 PC方向

1小时前
79

2026 年腾讯游戏安全初赛 PC方向

仅为本人题解,并非参考答案不能保证正确,请参照官方公布题解

〇、得分点

如何加载驱动

关掉 ACE 预启动,随便签一个泄露签名,即可加载驱动,稳定性极高,就蓝屏了两次。

Flag

flag{SHAD0WNT_HYPERVMX}

最短路径

DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
//格式化为
RRRRRRDDRRRRUURRDDDDDDDDLLDDDDRR

地图

###########################
#S . . . . . .#.#. . . . .#
############# ####### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
############# ####### ### #
#. . . . .#.#. . . . .#.#.#
# ####### ############### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ####### ############### #
#.#.#. . . . . . . . .#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#. . .#.#.#.#.#
# ### ####### ### ####### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ####### ### ####### #
#.#.#. . . . .#.#.#.#. . .#
# ############### ### #####
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ############### ### #####
#. . .#.#. . .#.#.#.#.#.#.#
##### ### ### ### ### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
##### ### ### ### ### ### #
#. . . . .#.#.#.#. . . . E#
###########################

五个泄漏点

1. 两个对象名

①MazeMoveOK 事件

  • 类型:命名事件
  • 对象名:Global\MazeMoveOK
  • 驱动侧实现:ZwOpenEvent + ZwSetEvent
  • 含义:成功相关反馈槽位

②MazeMoveWall 事件

  • 类型:命名事件
  • 对象名:Global\MazeMoveWall
  • 驱动侧实现:ZwOpenEvent + ZwSetEvent
  • 含义:失败/撞墙相关反馈槽位

2. 两个 GUID

①GUID1 命名信号量

  • 类型:命名信号量
  • 对象名:
    • Global{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}
  • 驱动侧实现:
    • 解码对象名
    • ObReferenceObjectByName
    • KeReleaseSemaphore
  • 含义:成功相关反馈槽位

②GUID2 命名信号量

  • 类型:命名信号量
  • 对象名:
    • Global{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}
  • 驱动侧实现同上
  • 含义:失败/撞墙相关反馈槽位

3. LastError

  • 类型:线程用户态上下文回写
  • 驱动侧关键函数:sub_140316ADF
  • 关键行为:
    • +0x68 (TEB -> LastErrorValue)
  • 写入值包括:
    • 0xC0DE0001 -> ok
    • 0xC0DE0000 -> wall

4. ZwSetInformationObject

每次 move 前先调用 SetHandleInformation(h, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)进行清零,在move 后调用GetHandleInformation(h, &flags)

if( flags & HANDLE_FLAG_PROTECT_FROM_CLOSE != 0)
   handle_ok = true
else
   handle_ok = False
  • handle_ok = True
    • 表示:成功
  • handle_ok = False
    • 表示:失败/撞墙

5. KeDelayExecutionThread

函数会通过 KUSER_SHARED_DATA 访问 TickCountLowDeprecated,并且在异或后被编码进入返回缓冲,可以在用户层恢复被驱动使用的这个 Tick

 tick_xor = struct.unpack_from("<I", raw, 0)[0]
 tick = tick_xor ^ 0xBAADF00D

然后计算

predicted_ms = (tick % 50) + 10

就是理论上这个线路被延时的毫秒数,然后我们可以计算一个 IO 请求的耗时

 t0 = time.perf_counter()
 DeviceIoControl(...)
 time_ms = (time.perf_counter() - t0) * 1000.0

如果 predicted_mstime_ms 很接近,就能说明本次触发到了该泄漏点,但是无法判断具体的结果。

  • 很接近
    • 表示:命中该泄漏点,应重试,直到命中前几个可明确结果泄漏点
  • 不接近
    • 表示:应观测其他泄漏点获取本次 move 的结果

总结

在二进制文件中放了大量的小混淆,导致不管是正向分析还是逆向分析,都不舒服,比如发现了某一个函数,发现 xref 不可用,因为上游的调用点有花指令,混淆等,导致了 IDA 不能正确分析,在驱动分析中尤其明显,但是由于驱动导入表暴露了大量的信息,所以都可以通过导入表入手,不完整的分析全流程,也能得到相关结论,如果赛题在导入函数上做手脚,比如动态解析,难度会更大。

很好本来都要提交了,感觉这个寻路脚本咋会这么慢,时间还不稳定,又重新用IDA看了一眼驱动,发现有随机的 Sleep 感觉大概率是没找齐全,然后就找到了之前没找到的两个泄漏点(但是他们没有影响我得到flag....)

一、赛题文件

文档:2026游戏安全技术竞赛-PC客户端安全-初赛.docx

#「宫殿」的验证机制并不寻常:系统由用户态控制台和内核驱动组成,驱动内部隐藏着一个加密迷阵,所有操作指令必须由控制台通过驱动接口下
# 发。表面上,系统不会对你的任何操作给出反馈——你无法直接判断每一步操作是成功推进,还是被防线拦截。

从上述文本能够判断是迷宫题目,但是应该不是简单的迷宫,应该无法通过传统的迷宫分析方法分析,需要完整的逆向,至少要找到文档中提到的:

# 某些异常现象暗示着隐藏的信息泄露。找到这些线索,你就能感知每一步的结果;发现得越多,破解效率越高。

二、应用层:ShadowGateApp.exe

并未发现有保护壳,直接运行得到

C:\Users\Euarno\Desktop\2026游戏安全技术竞赛-PC客户端安全-初赛>ShadowGateApp.exe

==============================================
    Shadow Palace Gate  -  ACCESS DENIED
==============================================

  ACE has intercepted Shadow's palace gate
  system. A kernel driver hides an encrypted
  maze inside. Navigate through it to extract
  the credential for Shadow's internal network.

  The palace gives NO feedback on whether
  your moves succeed or hit a wall.

  Or does it? The system is not as silent
  as it seems. Five hidden flaws betray
  the result of every move 鈥?but each move
  exposes only one of them.

  Hint: after each reset, the first five
  successful moves reveal each flaw exactly
  once, in a fixed order.

[*] Connecting to Shadow gate driver...
[+] Gate module online.

[*] Maze grid: 13x13, Entry=(0,0), Exit=(12,12)
Commands:
  W/A/S/D    - Navigate Up/Left/Down/Right
  I/J/K/L    - Navigate Up/Left/Down/Right (alt)
  R          - Reset to entry point
  T          - Show operation log (position hidden)
  H          - Show this help
  Q / ESC    - Abort mission

可以有一个基本的了解 13*13 的迷宫,“The palace gives NO feedback on whether your moves succeed or hit a wall.” 表面不会有反馈,“Five hidden flaws betray the result of every move ” 要通过五个侧信道得到迷宫的反馈,**“after each reset, the first five successful moves reveal each flaw exactly once, in a fixed order. ”**每次你按 R 重置后,前 5 次成功移动会依次触发五种不同的泄露机制,顺序是固定的。(这里其实还是有一点疑惑的,最终只找到了三类泄露方式,但也可以算作五个泄漏点)

字符串没有做混淆,在字符串表暴露了很多信息。结合字符串的交叉引用,有如下分析:

__int64 OpenShadowGateDevice()
{
  __int64 result; // rax
  DWORD LastError; // edi

   //设备路径
  result = (__int64)CreateFileW(L"\\\\.\\ShadowGate", 0xC0000000, 0, 0, 3u, 0x80u, 0);
  if ( result == -1 )
  {
    LastError = GetLastError();
    sub_140001010("[ERROR] Failed to open device '%ws'\n", L"\\\\.\\ShadowGate");
    sub_140001010("[ERROR] Error code: %lu (0x%08lX)\n", LastError, LastError);
    if ( LastError == 2 )
    {
      //要求我们应该提前加载好驱动 驱动没有签名 我的解决方案是找一个泄露签名 退出ACE预启动 即可加载
      sub_140001010("[HINT] Driver not loaded. Use: sc create ShadowGate type=kernel binPath=<path>\\ShadowGateSys.sys\n");
      sub_140001010("[HINT] Then: sc start ShadowGate\n");
      return -1;
    }
    else
    {
      if ( LastError == 5 )
        sub_140001010("[HINT] Run as Administrator.\n");
      return -1;
    }
  }
  return result;
}

虽然还没有分析驱动,但是应用创建了两个全局命名事件,名称强烈暗示它们分别对应移动成功和撞墙,即失败。

HANDLE CreateLeakEvents()
{
  HANDLE result; // rax

  hObject = CreateEventW(0, 1, 0, L"Global\\MazeMoveOK");
  result = CreateEventW(0, 1, 0, L"Global\\MazeMoveWall");
  qword_140005688 = result;
  return result;
}

通信方面使用了最基本的 IO 通讯,分析 DeviceIoControl 的交叉引用可以得到所有的 IO

 if ( QueryMaza(v6, &v9) )
      printf(
        "[*] Maze grid: %ux%u, Entry=(%u,%u), Exit=(%u,%u)\n",
        (_DWORD)v9,
        DWORD1(v9),
        DWORD2(v9),
        HIDWORD(v9),
        v10,
        HIDWORD(v10));

BOOL __fastcall QueryMaza(__int64 a1, void *lpOutBuffer)
{
  DWORD BytesReturned; // [rsp+40h] [rbp-18h] BYREF

  BytesReturned = 0;
  return DeviceIoControl(hDevice, 0x8001200C, 0, 0, lpOutBuffer, 0x18u, &BytesReturned, 0);
}

显然 0x8001200C 用于查询迷宫信息,接下来就能发现 main 函数有大量的花指令了,因为正常的 R 等逻辑的处理代码都不存在

只需要把

push rcx
ret

改为

jmp rcx

伪代码就重建好了,然后依旧是出题人的小礼物啊

v8 -= 27;
switch ( v8 )

这里会有垂落,处理的是大小写 R 的情况

 case '7':
 case 'W':
     ResetMaze(_RCX);
     v6 = 0;
     dword_140005668 = 0;
     v22[0] = 0;
     v7 = 0;
     printf("[*] Reset to entry point.\n");
     continue;

BOOL ResetMaze()
{
  DWORD BytesReturned; // [rsp+40h] [rbp-18h] BYREF

  BytesReturned = 0;
  return DeviceIoControl(hDevice, 0x80012008, 0, 0, 0, 0, &BytesReturned, 0);
}

显然 0x80012008 用于重新开始,那剩下的操作码肯定是操作迷宫了的

IOCTL 作用
0x80012004 移动/核心交互
0x80012008 重置到起点
0x8001200C 查询迷宫信息

紧接着还要详细分析,相关的按键被映射为了十六进制整数,如下

      case '&':
      case '/':
      case 'F':
      case 'O':
        LOBYTE(direct) = 0x30; //A  J
        v12 = 76;
        goto LABEL_11;
      case ')':
      case '1':
      case 'I':
      case 'Q':
        LOBYTE(direct) = 0x40;// D  L
        __asm { rcl     al, cl }
        v12 = 82;
        goto LABEL_11;
      case '.':
      case '<':
      case 'N':
      case '\\':
        LOBYTE(direct) = 0x10; //W  I 
        v12 = 85;
        goto LABEL_11;
      case '0':
      case '8':
      case 'P':
      case 'X':
        LOBYTE(direct) = 0x20; // S   K 
        v12 = 68;
LABEL_11:
        if ( v6 < 255 )
        {
          v22[v7] = v12;
          ++v6;
          ++v7;
          if ( (unsigned int)v6 >= 0x100 )
            sub_140001E88(_RCX, direct);
          v22[v7] = 0;
        }
        v14 = (unsigned int)dword_140005668;
        memset(v21, 0, sizeof(v21));
        v18 = 0;
        ++dword_140005668;
        v15 = TryMove(hDevice, direct, v14, v21, &v18);

TryMove 对于路径的打包如下

__int64 v16;
unsigned int v17;

v16 = (unsigned __int8)v9; //只用了第一字节?
v17 = a3 ^ v9 ^ 0xDEAD1337;

DeviceIoControl(a1, 0x80012004, &v16, 0xCu, v14, 0x84u, &v15, 0) //12字节大小数据包 从v16开始写入

其中,具体的加密逻辑是这样

__int64 __fastcall TryMove(void *a1, __int64 _RDX, int op_count, _BYTE *a4, _DWORD *a5)
{
    //rdx是代表方向的十六进制整数 op_count是一个计数器,代表次数
    v6 = _RDX ^ 0xFA;
    v9 = (unsigned __int8)(_RDX | (8 * v6));
    v16 = (unsigned __int8)v9; //结构体开始
    v17 = op_count ^ v9 ^ 0xDEAD1337
    if ( DeviceIoControl(a1, 0x80012004, &v16, 0xCu, v14, 0x84u, &v15, 0) )
        ...
}

足够我们重建结构体了,但是伪代码有一个问题,驱动如何校验 v17 的正确性呢?切换到汇编,根据 DeviceIoControl 的参数可以知道

lea     r8, [r11-30h]   ; lpInBuffer

然后看汇编层面是如何写入 [r11-30h]

mov     [r11-30h], al
mov     [r11-2Ch], r8d
mov     [r11-28h], eax

统一成 buf 的偏移就是

[buf+0] 写入 1 字节
[buf+4] 写入 4 字节
[buf+8] 写入 4 字节

struct {
    uint8_t  field0;
    uint8_t  pad[3];
    uint32_t field4;
    uint32_t field8;
};

也就能重建结构体为

typedef struct _MOVE_REQ {
     uint8_t  encoded_dir; //映射为了十六进制整数
     uint8_t  pad[3];      //数据对齐
     uint32_t op_count;    //操作次数
     uint32_t checksum;    //校验点
 } MOVE_REQ; //一共12字节

接下来是输入 T 的问题

 case '9':
 case 'Y':
      sub_140001010(asc_1400038E0, (unsigned int)v6);
      v17 = v22;
      if ( v6 <= 0 )
        v17 = "(none)";
      sub_140001010(asc_140003908, v17);
      continue;

命令 T的作用并不是显示当前位置,而是输出应用层维护的成功移动日志。主循环中,每当一次移动被认为成功时,程序都会将对应方向字符 L R U D 追加到缓冲区 v22 中,而 T/t分支则负责打印当前操作计数及该字符串,也就是这里只能打印成功的路径。

先玩一下,也就是可以借助前五个正确路径找到五个泄漏点,因为提到了前五次成功顺次泄露,会在后边起关键作用

- D D D D D T

程序输出:

- Sequence: RRRRR

三、驱动层:ShadowGateSys.sys

也是见到没有壳子轻松达到 4 MB 的驱动文件了,混淆或者花指令估计是跑不了了,先迎接出题人的小礼物,重建 sub_140003208

0x140003244  call sub_1400018A0

patch

mov eax, 0

伪代码重建成功

__int64 __fastcall sub_140003208(struct _DRIVER_OBJECT *a1)
{
  NTSTATUS v3; // eax
  NTSTATUS v4; // edi
  struct _UNICODE_STRING DestinationString; // [rsp+40h] [rbp-18h] BYREF

  P = (PVOID)ExAllocatePool2(64, 472, 1702519117);
  if ( !P )
    return 3221225626LL;
  KeInitializeSpinLock(&SpinLock);
  KeInitializeSpinLock(&qword_1400050D0);
  ((void (*)(void))loc_140001E60)();
  DestinationString = 0;
  RtlInitUnicodeString(&DestinationString, L"\\Device\\ShadowGate");
  v3 = IoCreateDevice(a1, 0, &DestinationString, 0x22u, 0x100u, 0, &DeviceObject);
  v4 = v3;
  if ( v3 < 0 )
  {
    _mm_lfence();
    ExFreePoolWithTag(P, 0x657A614Du);
LABEL_5:
    P = 0;
    return (unsigned int)v4;
  }
  RtlInitUnicodeString(&SymbolicLinkName, L"\\??\\ShadowGate");
  v4 = IoCreateSymbolicLink(&SymbolicLinkName, &DestinationString);
  if ( v4 < 0 )
  {
    _mm_lfence();
    IoDeleteDevice(DeviceObject);
    ExFreePoolWithTag(P, 0x657A614Du);
    DeviceObject = 0;
    goto LABEL_5;
  }
  a1->DriverUnload = (PDRIVER_UNLOAD)sub_140001840;
  a1->MajorFunction[0] = (PDRIVER_DISPATCH)sub_1400014B0;
  a1->MajorFunction[2] = (PDRIVER_DISPATCH)sub_140001410;
  a1->MajorFunction[14] = (PDRIVER_DISPATCH)sub_140001540;
  DeviceObject->Flags |= 4u;
  DeviceObject->Flags &= ~0x80u;
  return 0;
}

接下来寻5个泄漏点,刚才在应用层找到的事件,先看字符串定位过去,找到第一个泄漏点

① 事件泄露

int __fastcall sub_1400022B0(__int64 a1, int a2)
{
  int result; // eax
  const WCHAR *v4; // rdx
  struct _UNICODE_STRING DestinationString; // [rsp+20h] [rbp-40h] BYREF
  struct _OBJECT_ATTRIBUTES ObjectAttributes; // [rsp+30h] [rbp-30h] BYREF
  void *EventHandle; // [rsp+80h] [rbp+20h] BYREF

  result = (unsigned __int8)dword_140005000;
  if ( (((_BYTE)dword_140005000 * ((_BYTE)dword_140005000 - 1)) & 1) == 0 )
  {
    if ( !a2 || (v4 = L"\\BaseNamedObjects\\MazeMoveWall", a2 == 2) )
      v4 = L"\\BaseNamedObjects\\MazeMoveOK";
    RtlInitUnicodeString(&DestinationString, v4);
    ObjectAttributes.Length = 48;
    ObjectAttributes.ObjectName = &DestinationString;
    ObjectAttributes.RootDirectory = 0;
    ObjectAttributes.Attributes = 576;
    EventHandle = 0;
    *(_OWORD *)&ObjectAttributes.SecurityDescriptor = 0;
    result = ZwOpenEvent(&EventHandle, 2u, &ObjectAttributes);
    if ( result >= 0 )
    {
      ZwSetEvent(EventHandle, 0);
      return ZwClose(EventHandle);
    }
  }
  return result;
}

该函数根据传入状态参数,选择 **\BaseNamedObjects\MazeMoveOK **或 \BaseNamedObjects\MazeMoveWall

随后通过 ZwOpenEvent 打开对应命名事件,并调用 ZwSetEvent 将其置位,最后关闭句柄。

由于应用层事先创建了同名全局事件 Global\MazeMoveOKGlobal\MazeMoveWall

因此用户态可在每次发送移动请求后轮询这两个事件,从而判断本次移动是成功推进还是撞墙失败。

② GUID泄露

然后在导入表发现了函数 KeReleaseSemaphoreObReferenceObjectByName 跟到,sub_140319A37,有明显的选择和解密流程

  if ( !v5 || (v9 = &unk_1400041E0, v5 == 2) )
        v9 = &unk_140004160;
      v10 = 57;
      v11 = v9 - (_BYTE *)v18;
      v12 = v18;
      do
      {
        *v12 = *(WCHAR *)((char *)v12 + v11) ^ 0x4B; //解密
        ++v12;
        --v10;
      }
      while ( v10 );
    }
   RtlInitUnicodeString((PUNICODE_STRING)v17, v18); //构建字符串
    ObReferenceObjectByName(v17, 64, 0);

就要看看 unk_1400041E0unk_140004160 对应的到底是什么字符串了

def decode_guid_name(words):
    return ''.join(chr(w ^ 0x4B) for w in words)

unk_140004160 = [0x0017, 0x0009, 0x002A, 0x0038, 0x002E, 0x0005, 0x002A, 0x0026, 0x002E, 0x002F, 0x0004, 0x0029, 0x0021, 0x002E, 0x0028, 0x003F, 0x0038, 0x0017, 0x0030, 0x000A, 0x007C, 0x000D, 0x0078, 0x0009, 0x0079, 0x0008, 0x007A, 0x0066, 0x0072, 0x000E, 0x007F, 0x000F, 0x0066, 0x007F, 0x0008, 0x0073, 0x000A, 0x0066, 0x0009, 0x007E, 0x000F, 0x007D, 0x0066, 0x007A, 0x000D, 0x0079, 0x000E, 0x0078, 0x000A, 0x007F, 0x0009, 0x007E, 0x0008, 0x007D, 0x000F, 0x0036, 0x004B, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000]

unk_1400041E0 = [0x0017, 0x0009, 0x002A, 0x0038, 0x002E, 0x0005, 0x002A, 0x0026, 0x002E, 0x002F, 0x0004, 0x0029, 0x0021, 0x002E, 0x0028, 0x003F, 0x0038, 0x0017, 0x0030, 0x0009, 0x0073, 0x000E, 0x0079, 0x0008, 0x0078, 0x000F, 0x007B, 0x0066, 0x007B, 0x000D, 0x007E, 0x000A, 0x0066, 0x007E, 0x000F, 0x0072, 0x0009, 0x0066, 0x0008, 0x007D, 0x000E, 0x007C, 0x0066, 0x0079, 0x000A, 0x0078, 0x000D, 0x007F, 0x0009, 0x007E, 0x0008, 0x007D, 0x000F, 0x007C, 0x000E, 0x0036, 0x004B, 0x0000, 0x0000, 0x0000, 0x000E, 0x0033, 0x0018, 0x002E]

print(decode_guid_name(unk_140004160))
print(decode_guid_name(unk_1400041E0))

得到输出,也就找到了第二个泄漏点,

\BaseNamedObjects\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}KKKKKKK
\BaseNamedObjects\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}KKKExSe
//手动去尾巴ing
\BaseNamedObjects\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}
\BaseNamedObjects\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}

但是暂时不能确定哪个是撞墙,哪个是成功,要具体测试一下

#include &lt;windows.h&gt;
#include &lt;iostream&gt;
#include &lt;string&gt;
#include &lt;vector&gt;
#include &lt;cstdint&gt;

static const wchar_t* DEVICE_NAME = L"\\\\.\\ShadowGate";
static const wchar_t* GUID1_NAME = L"Global\\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}";
static const wchar_t* GUID2_NAME = L"Global\\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}";

static const DWORD IOCTL_MOVE = 0x80012004;
static const DWORD IOCTL_RESET = 0x80012008;
static const DWORD IOCTL_QUERY = 0x8001200C;

#pragma pack(push, 1)
struct MOVE_REQ {
    uint8_t  encoded_dir;
    uint8_t  pad[3];
    uint32_t op_count;
    uint32_t checksum;
};
#pragma pack(pop)

static uint8_t ror8(uint8_t x, int n) {
    return static_cast<uint8_t>((x >> n) | (x << (8 - n)));
}

static uint8_t encode_dir(uint8_t move_code) {
    return ror8(static_cast<uint8_t>(move_code ^ 0x5A), 5);
}

static uint8_t move_code_from_char(char ch) {
    switch (ch) {
    case 'W': case 'w': return 0x10;
    case 'S': case 's': return 0x20;
    case 'A': case 'a': return 0x30;
    case 'D': case 'd': return 0x40;
    default: return 0;
    }
}

static void drain_semaphore(HANDLE hSem) {
    while (WaitForSingleObject(hSem, 0) == WAIT_OBJECT_0) {
    }
}

static bool poll_semaphore(HANDLE hSem) {
    DWORD rc = WaitForSingleObject(hSem, 0);
    return rc == WAIT_OBJECT_0;
}

static bool reset_maze(HANDLE hDev) {
    DWORD bytesReturned = 0;
    return DeviceIoControl(
        hDev,
        IOCTL_RESET,
        nullptr, 0,
        nullptr, 0,
        &bytesReturned,
        nullptr
    ) != FALSE;
}

static bool move_once(HANDLE hDev, char ch, uint32_t opCount) {
    uint8_t mc = move_code_from_char(ch);
    if (!mc) {
        std::cerr << "invalid move char: " << ch << "\n";
        return false;
    }

    MOVE_REQ req{};
    req.encoded_dir = encode_dir(mc);
    req.op_count = opCount;
    req.checksum = req.encoded_dir ^ opCount ^ 0xDEAD1337u;

    uint8_t outbuf[0x84] = {};
    DWORD bytesReturned = 0;

    return DeviceIoControl(
        hDev,
        IOCTL_MOVE,
        &req,
        sizeof(req),
        outbuf,
        sizeof(outbuf),
        &bytesReturned,
        nullptr
    ) != FALSE;
}

int wmain(int argc, wchar_t* argv[]) {
    std::string path = "DDDDD";
    if (argc >= 2) {
        std::wstring ws = argv[1];
        path.assign(ws.begin(), ws.end());
    }

    HANDLE hDev = CreateFileW(
        DEVICE_NAME,
        GENERIC_READ | GENERIC_WRITE,
        0,
        nullptr,
        OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL,
        nullptr
    );
    if (hDev == INVALID_HANDLE_VALUE) {
        std::cerr << "CreateFileW failed, gle=0x" << std::hex << GetLastError() << "\n";
        return 1;
    }

    HANDLE hGuid1 = CreateSemaphoreW(nullptr, 0, 0x7fffffff, GUID1_NAME);
    HANDLE hGuid2 = CreateSemaphoreW(nullptr, 0, 0x7fffffff, GUID2_NAME);
    if (!hGuid1 || !hGuid2) {
        std::cerr << "CreateSemaphoreW failed, gle=0x" << std::hex << GetLastError() <<
            "\n";
        CloseHandle(hDev);
        return 1;
    }

    if (!reset_maze(hDev)) {
        std::cerr << "reset failed, gle=0x" << std::hex << GetLastError() << "\n";
        CloseHandle(hGuid1);
        CloseHandle(hGuid2);
        CloseHandle(hDev);
        return 1;
    }

    uint32_t opCount = 0;
    std::cout << "Testing path: " << path << "\n";

    for (size_t i = 0; i < path.size(); ++i) {
        drain_semaphore(hGuid1);
        drain_semaphore(hGuid2);

        char ch = path[i];
        if (!move_once(hDev, ch, opCount)) {
            std::cerr << "move " << ch << " failed at step " << i
                << ", gle=0x" << std::hex << GetLastError() << "\n";
            break;
        }

        bool g1 = poll_semaphore(hGuid1);
        bool g2 = poll_semaphore(hGuid2);

        std::cout << "step " << (i + 1)
            << " move=" << ch
            << " opCount=" << std::dec << opCount
            << " guid1=" << (g1 ? "triggered" : "no")
            << " guid2=" << (g2 ? "triggered" : "no")
            << "\n";

        ++opCount;
    }

    CloseHandle(hGuid1);
    CloseHandle(hGuid2);
    CloseHandle(hDev);
    return 0;
}

测试 DDDDD 得到

Testing path: DDDDD
step 1 move=D opCount=0 guid1=no guid2=no
step 2 move=D opCount=1 guid1=triggered guid2=no
step 3 move=D opCount=2 guid1=no guid2=no
step 4 move=D opCount=3 guid1=no guid2=no
step 5 move=D opCount=4 guid1=no guid2=no

那么路径正确时候会触发 Global\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}

#Global\{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D} -> 成功信号量
#Global\{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E} -> 撞墙信号量

③ TEB LastError泄露

还是要看导入表

  - PsGetCurrentProcessId
  - PsGetCurrentThreadId
  - PsLookupProcessByProcessId
  - PsLookupThreadByThreadId
  - KeStackAttachProcess
  - KeUnstackDetachProcess
  - PsGetProcessPeb
  - ZwQueryVirtualMemory

首先想到的可能是调试状态,或者 LastError,对于 LastError 的话,尝试找找有没有类似的写入逻辑

#结构:TEB -> LastErrorValue (DWORD)
#偏移:
#x86 (32 位):TEB + 0x34
#x64 (64 位):TEB + 0x68
int __fastcall sub_140316ADF(__int64 _RCX, int a2)
{
  __int16 _AX; // ax
  unsigned __int64 v4; // rax
  __int64 v6; // rbx
  void *v7; // rcx
  __int64 v8; // rax
  int *v9; // rsi
  int v10; // ebx
  __int64 v12; // [rsp-20h] [rbp-78h] BYREF
  PEPROCESS Process; // [rsp+0h] [rbp-58h] BYREF
  PVOID Object; // [rsp+8h] [rbp-50h] BYREF
  struct _KAPC_STATE ApcState; // [rsp+10h] [rbp-48h] BYREF

  _AX = 0;
  __asm { rcl     ax, cl }
  v4 = (unsigned __int64)&v12 ^ _security_cookie;
  v6 = _RCX;
  v7 = *(void **)(_RCX + 464);
  if ( v7 )
  {
    if ( *(_QWORD *)(v6 + 456) )
    {
      if ( qword_140005080 )
      {
        Object = 0;
        LODWORD(v4) = PsLookupThreadByThreadId(v7, (PETHREAD *)&Object);
        if ( (v4 & 0x80000000) == 0LL )
        {
          Process = 0;
          if ( PsLookupProcessByProcessId(*(HANDLE *)(v6 + 456), &Process) >= 0 )
          {
            KeStackAttachProcess(Process, &ApcState);
            v8 = qword_140005080(Object); //qword_140005080 是 PsGetThreadTeb v8是Teb了
            if ( v8 )
            {
              v9 = (int *)(v8 + 0x68); //这里取出LastError的地址
              if ( (((_BYTE)dword_140005000 * ((_BYTE)dword_140005000 - 1)) & 1) != 0 )
              {
                v10 = 0xDEADDEAD;   //1
              }
              else if ( a2 )
              {
                v10 = 0xC0DE0002;   //2
                if ( a2 != 2 )
                  v10 = 0xC0DE0000; //3
              }
              else
              {
                v10 = 0xC0DE0001;   //4
              }
              ProbeForWrite(v9, 4u, 4u);
              *v9 = v10; //这里对LaseError做了覆写
            }
            KeUnstackDetachProcess(&ApcState);
            ObfDereferenceObject(Process);
          }
          LODWORD(v4) = ObfDereferenceObject(Object);
        }
      }
    }
  }
  return v4;
}

显然可以看到

v9 = (int *)(v8 + 0x68); //这里取出LastError的地址
*v9 = v10; //这里对LaseError做了覆写

那么 LastError 应该是下一个泄漏点了

Code 含义
0xC0DE0001 ok
0xC0DE0002 到达终点?
0xC0DE0000 wall
0xDEADDEAD 干扰值

④ ZwSetInformationObject

刚才我们提到 qword_140005080PsGetThreadTeb ,相关的交叉引用还有一个函数

void sub_14031857E()
{
  __int64 v0; // rax
  _QWORD *v1; // rbx

  if ( PsGetThreadTeb )
  {
    if ( qword_140005090 )
    {
      v0 = _guard_dispatch_icall_fptr(); // Call PsGetThreadTeb
      if ( v0 )
      {
        v1 = (_QWORD *)(v0 + 0x1748); //TEB + 0x1748
        ProbeForRead((volatile void *)(v0 + 0x1748), 8u, 8u); 
        if ( *v1 ) //如果TEB + 0x1748不为0
          _guard_dispatch_icall_fptr(); // Call qword_140005090
      }
    }
  }
}
1403185EF                 lea     rbx, [rax+1748h]    //rbx = TEB + 0x1748


14031861E                 mov     rcx, rbx            //ProbeForRead(TEB+0x1748, 8, 8)
140318621                 call    cs:ProbeForRead

140318627                 mov     r10, [rbx]          //读出这个位置的 8 字节值 r10 = *(QWORD*)(TEB+0x1748)
140318630                 test    r10, r10            //如果这个值是 0
140318638                 jz      loc_140318758

1403186F6                 mov     [rsp+38h+arg_10], 0  
140318701                 mov     [rsp+38h+arg_11], r8b
14031871D                 lea     r8, [rsp+38h+arg_10]  //r8  = &2_byte_buffer
140318710                 mov     r9d, 2                //r9d = 2
14031872B                 mov     rcx, r10              // rcx = *(QWORD*)(TEB+0x1748)

通过汇编可以分析出,

 qword_140005090(
      *(QWORD*)(TEB + 0x1748),
      4,
      &two_byte_buf,
      2
  );

这还说啥了,如果不动态调试的话估计是没戏了,然后问 GPT 能不能猜测一下这是哪个函数?

为什么我把它解释成 ZwSetInformationObject
不是只凭感觉,而是因为参数形状正好匹配:
- HANDLE
- OBJECT_INFORMATION_CLASS = 4
- PVOID buffer
- ULONG length = 2

再结合动态实验:

- 把 TEB+0x1748 预置成事件句柄
- move 后这个句柄的 HANDLE_FLAG_PROTECT_FROM_CLOSE 会变化

所以这才进一步把它收敛成:

ZwSetInformationObject(handle, ObjectHandleFlagInformation, &info, 2)

查阅相关资料

NTSYSCALLAPI
NTSTATUS
NTAPI
NtSetInformationObject(
    _In_ HANDLE Handle,
    _In_ OBJECT_INFORMATION_CLASS ObjectInformationClass,
    _In_reads_bytes_(ObjectInformationLength) PVOID ObjectInformation,
    _In_ ULONG ObjectInformationLength  // 为2
    );

#endif
#endif

typedef struct _OBJECT_HANDLE_FLAG_INFORMATION {
      BOOLEAN Inherit;
      BOOLEAN ProtectFromClose;
  } OBJECT_HANDLE_FLAG_INFORMATION;

然后需要确定 ZwSetInformationObject 提供的什么信号墙,什么信号是通路?可以先创建一个事件句柄 slot4_probe,把这个句柄值写到当前线程 TEB + 0x1748,每次 move 前先调用 SetHandleInformation(h, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)进行清零,在move 后调用GetHandleInformation(h, &flags),因为题目提到,reset 后前五次成功步按顺序泄露

if( flags & HANDLE_FLAG_PROTECT_FROM_CLOSE != 0)
   handle_ok = true
else
   handle_ok = False

并且稳定观测到(我们测试到了前几步都是D,并且可以通过回环 DAD 走法来增加正确步骤的次数)

- DAD + A
  - handle_ok = True
- DAD + W
  - handle_ok = False

⑤ KeDelayExecutionThread

这个的发现要归功于提交之前的几次测试,我感觉这个寻路没有道理这么慢,内核很可能有随机的,甚至是故意的延时,发现导入了函数 KeDelayExecutionThread

void __fastcall sub_1400026B4(unsigned int *a1)
{
  sub_140002038(a1);
  sub_140002388();
  JUMPOUT(0x140001FF0LL);
}
void __fastcall sub_140002038(unsigned int *a1)
{
  _BYTE *v1; // rdx
  __int64 v2; // r8
  unsigned int v3; // eax

  if ( a1 )
  {
    v1 = a1 + 1;
    v2 = 56;
    //MEMORY[0xFFFFF78000000320] == KUSER_SHARED_DATA + 0x320 == TickCountLowDeprecated
    v3 = TickCountLowDeprecated ^ 0xBAADF00D; 
    *a1 = TickCountLowDeprecated ^ 0xBAADF00D; // *(DWORD*)a1 = TickCount ^ 0xBAADF00D;
                                                 // drive 用来算 delay 的那个 TickCount 同时也被编码写进了返回缓冲
    do
    {
      v3 = 1103515245 * v3 + 12345;
      *v1++ = BYTE2(v3);
      --v2;
    }
    while ( v2 );
  }
}
NTSTATUS sub_140002388()
{
  union _LARGE_INTEGER Interval; // [rsp+30h] [rbp+8h] BYREF

  Interval.QuadPart = -10000LL * (TickCountLowDeprecated % 50u + 10); //转换到毫秒 10~59ms
  return KeDelayExecutionThread(0, 0, &Interval);
}

函数会通过 KUSER_SHARED_DATA 访问 TickCountLowDeprecated,并且在异或后被编码进入返回缓冲,可以在用户层恢复被驱动使用的这个 Tick

 tick_xor = struct.unpack_from("&lt;I", raw, 0)[0]
 tick = tick_xor ^ 0xBAADF00D

然后计算

predicted_ms = (tick % 50) + 10

就是理论上这个线路被延时的毫秒数,然后我们可以计算一个 IO 请求的耗时

 t0 = time.perf_counter()
 DeviceIoControl(...)
 time_ms = (time.perf_counter() - t0) * 1000.0

如果 predicted_mstime_ms 很接近,就能说明本次触发到了该泄漏点,但是更进一步,好像没有合适的办法确定到底是撞墙了还是成功了(从现有代码逻辑看是这样,向上的交叉引用又追不过去),其实也好办,我们能够观测到是不是触发了这个泄漏点,当观测到的时候,直接重试就好了,依赖前四个泄漏点做路线判断。

四、寻路算法并获取 Flag

算法如下

import argparse
import collections
import ctypes
from ctypes import wintypes
import struct
import sys
import time


kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ntdll = ctypes.WinDLL("ntdll")

GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 3
FILE_ATTRIBUTE_NORMAL = 0x80
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value

WAIT_OBJECT_0 = 0x00000000
WAIT_TIMEOUT = 0x00000102

HANDLE_FLAG_PROTECT_FROM_CLOSE = 0x00000002

IOCTL_MOVE = 0x80012004
IOCTL_RESET = 0x80012008
IOCTL_QUERY = 0x8001200C

WIN_MAGIC = 0x57494E21
LEAK_WALL = 0xC0DE0000
LEAK_OK = 0xC0DE0001
LEAK_EXIT = 0xC0DE0002
LEAK_POISON = 0xDEADDEAD

DEVICE_NAME = r"\\.\ShadowGate"
EVENT_OK_NAME = r"Global\MazeMoveOK"
EVENT_WALL_NAME = r"Global\MazeMoveWall"
SEMAPHORE_OK_GUID = "{A7F3B2C1-9E4D-4C8A-B5D6-1F2E3A4B5C6D}"
SEMAPHORE_WALL_GUID = "{B8E2C3D0-0F5A-5D9B-C6E7-2A3F4B5C6D7E}"

DIRS = {
    "W": (0, -1),
    "A": (-1, 0),
    "S": (0, 1),
    "D": (1, 0),
}
INVERSE = {"W": "S", "S": "W", "A": "D", "D": "A"}
MOVE_CODE = {"W": 0x10, "A": 0x30, "S": 0x20, "D": 0x40}


kernel32.CreateFileW.argtypes = [
    wintypes.LPCWSTR,
    wintypes.DWORD,
    wintypes.DWORD,
    wintypes.LPVOID,
    wintypes.DWORD,
    wintypes.DWORD,
    wintypes.HANDLE,
]
kernel32.CreateFileW.restype = wintypes.HANDLE

kernel32.DeviceIoControl.argtypes = [
    wintypes.HANDLE,
    wintypes.DWORD,
    wintypes.LPVOID,
    wintypes.DWORD,
    wintypes.LPVOID,
    wintypes.DWORD,
    ctypes.POINTER(wintypes.DWORD),
    wintypes.LPVOID,
]
kernel32.DeviceIoControl.restype = wintypes.BOOL

kernel32.CreateEventW.argtypes = [wintypes.LPVOID, wintypes.BOOL, wintypes.BOOL, wintypes.LPCWSTR]
kernel32.CreateEventW.restype = wintypes.HANDLE

kernel32.CreateSemaphoreW.argtypes = [
    wintypes.LPVOID,
    wintypes.LONG,
    wintypes.LONG,
    wintypes.LPCWSTR,
]
kernel32.CreateSemaphoreW.restype = wintypes.HANDLE

kernel32.WaitForSingleObject.argtypes = [wintypes.HANDLE, wintypes.DWORD]
kernel32.WaitForSingleObject.restype = wintypes.DWORD

kernel32.ResetEvent.argtypes = [wintypes.HANDLE]
kernel32.ResetEvent.restype = wintypes.BOOL

kernel32.GetCurrentThread.argtypes = []
kernel32.GetCurrentThread.restype = wintypes.HANDLE

kernel32.GetHandleInformation.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.DWORD)]
kernel32.GetHandleInformation.restype = wintypes.BOOL

kernel32.SetHandleInformation.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.DWORD]
kernel32.SetHandleInformation.restype = wintypes.BOOL

kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL

kernel32.SetLastError.argtypes = [wintypes.DWORD]
kernel32.SetLastError.restype = None


class CLIENT_ID(ctypes.Structure):
    _fields_ = [("UniqueProcess", wintypes.HANDLE), ("UniqueThread", wintypes.HANDLE)]


class THREAD_BASIC_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("ExitStatus", wintypes.LONG),
        ("TebBaseAddress", wintypes.LPVOID),
        ("ClientId", CLIENT_ID),
        ("AffinityMask", ctypes.c_size_t),
        ("Priority", wintypes.LONG),
        ("BasePriority", wintypes.LONG),
    ]


ntdll.NtQueryInformationThread.argtypes = [
    wintypes.HANDLE,
    wintypes.ULONG,
    wintypes.LPVOID,
    wintypes.ULONG,
    ctypes.POINTER(wintypes.ULONG),
]
ntdll.NtQueryInformationThread.restype = wintypes.LONG


class WinError(RuntimeError):
    pass


def check_handle(handle, what):
    if handle in (None, 0, INVALID_HANDLE_VALUE):
        raise WinError(f"{what} failed, last_error=0x{ctypes.get_last_error():08X}")
    return handle


def ror8(value, shift):
    value &= 0xFF
    return ((value &gt;> shift) | (value << (8 - shift))) & 0xFF


def encode_move_code(move_code):
    return ror8(move_code ^ 0x5A, 5)


def current_teb_base():
    tbi = THREAD_BASIC_INFORMATION()
    returned = wintypes.ULONG()
    status = ntdll.NtQueryInformationThread(
        kernel32.GetCurrentThread(),
        0,
        ctypes.byref(tbi),
        ctypes.sizeof(tbi),
        ctypes.byref(returned),
    )
    if status != 0:
        raise WinError(f"NtQueryInformationThread failed, status=0x{status & 0xFFFFFFFF:08X}")
    return int(ctypes.cast(tbi.TebBaseAddress, ctypes.c_void_p).value)


def slot5_predicted_delay_ms(raw):
    if len(raw) < 4:
        return None
    tick_xor = struct.unpack_from("&lt;I", raw, 0)[0]
    tick = tick_xor ^ 0xBAADF00D
    return float((tick % 0x32) + 10)


def slot5_timing_hit(result, tolerance_ms=12.0):
    sig = result["signals"]
    if sig["event_ok"] or sig["event_wall"] or sig["sem_ok"] or sig["sem_wall"] or sig["handle_ok"]:
        return False
    if result["last_error"] in (LEAK_OK, LEAK_WALL, LEAK_EXIT, LEAK_POISON):
        return False
    predicted = result["slot5_predicted_ms"]
    if predicted is None:
        return False
    return abs(result["time_ms"] - predicted) &lt;= tolerance_ms


class ShadowGate:
    def __init__(self):
        self.handle = None
        self.event_ok = None
        self.event_wall = None
        self.sem_ok = None
        self.sem_wall = None
        self.slot4_probe = None
        self._teb_1748_slot = None
        self._teb_1748_old = None
        self.op_count = 0

    def open(self):
        self.handle = check_handle(
            kernel32.CreateFileW(
                DEVICE_NAME,
                GENERIC_READ | GENERIC_WRITE,
                0,
                None,
                OPEN_EXISTING,
                FILE_ATTRIBUTE_NORMAL,
                None,
            ),
            f"CreateFileW({DEVICE_NAME})",
        )
        self.event_ok = self._ensure_event(EVENT_OK_NAME)
        self.event_wall = self._ensure_event(EVENT_WALL_NAME)
        self.sem_ok = self._ensure_semaphore(SEMAPHORE_OK_GUID)
        self.sem_wall = self._ensure_semaphore(SEMAPHORE_WALL_GUID)
        self._install_slot4_probe()
        return self

    def close(self):
        if self._teb_1748_slot is not None and self._teb_1748_old is not None:
            self._teb_1748_slot.value = self._teb_1748_old
        self._teb_1748_slot = None
        self._teb_1748_old = None
        for attr in ("slot4_probe", "sem_wall", "sem_ok", "event_wall", "event_ok", "handle"):
            handle = getattr(self, attr)
            if handle:
                kernel32.CloseHandle(handle)
                setattr(self, attr, None)

    def _ensure_event(self, name):
        candidates = [name]
        if name.startswith("Global\\"):
            candidates.append(name.split("\\", 1)[1])
        last_exc = None
        for candidate in candidates:
            try:
                return check_handle(kernel32.CreateEventW(None, True, False, candidate), f"CreateEventW({candidate})")
            except WinError as exc:
                last_exc = exc
        raise last_exc or WinError(f"CreateEventW({name}) failed")

    def _ensure_semaphore(self, guid):
        candidates = (rf"Global\{guid}", guid)
        last_exc = None
        for candidate in candidates:
            try:
                return check_handle(
                    kernel32.CreateSemaphoreW(None, 0, 0x7FFFFFFF, candidate),
                    f"CreateSemaphoreW({candidate})",
                )
            except WinError as exc:
                last_exc = exc
        raise last_exc or WinError(f"CreateSemaphoreW({guid}) failed")

    def _install_slot4_probe(self):
        teb = current_teb_base()
        self.slot4_probe = check_handle(kernel32.CreateEventW(None, True, False, None), "CreateEventW(slot4_probe)")
        self._teb_1748_slot = ctypes.c_uint64.from_address(teb + 0x1748)
        self._teb_1748_old = self._teb_1748_slot.value
        self._teb_1748_slot.value = int(self.slot4_probe)

    def _ioctl(self, code, in_bytes=b"", out_size=0):
        in_buf = ctypes.create_string_buffer(in_bytes, len(in_bytes)) if in_bytes else None
        out_buf = ctypes.create_string_buffer(out_size) if out_size else None
        returned = wintypes.DWORD()
        kernel32.SetLastError(0)
        ok = kernel32.DeviceIoControl(
            self.handle,
            code,
            in_buf,
            len(in_bytes),
            out_buf,
            out_size,
            ctypes.byref(returned),
            None,
        )
        last_error = ctypes.get_last_error() & 0xFFFFFFFF
        if not ok:
            raise WinError(f"DeviceIoControl(0x{code:08X}) failed, last_error=0x{last_error:08X}")
        data = out_buf.raw[: returned.value] if out_buf else b""
        return data, last_error

    def query_maze(self):
        data, _ = self._ioctl(IOCTL_QUERY, b"", 0x18)
        return struct.unpack("&lt;6I", data[:0x18])

    def reset(self):
        self._clear_sync_objects()
        self._ioctl(IOCTL_RESET)
        self.op_count = 0

    def move(self, ch):
        self._clear_sync_objects()
        if self.slot4_probe:
            kernel32.SetHandleInformation(self.slot4_probe, HANDLE_FLAG_PROTECT_FROM_CLOSE, 0)
        move_code = MOVE_CODE[ch]
        encoded = encode_move_code(move_code)
        packet = struct.pack("&lt;B3xII", encoded, self.op_count, encoded ^ self.op_count ^ 0xDEAD1337)
        t0 = time.perf_counter()
        data, last_error = self._ioctl(IOCTL_MOVE, packet, 0x84)
        time_ms = (time.perf_counter() - t0) * 1000.0
        signals = self._collect_signals()
        outcome = self._classify_outcome(last_error, signals)
        predicted = slot5_predicted_delay_ms(data)
        self.op_count += 1
        credential = None
        if len(data) &gt;= 0x84 and struct.unpack_from("<I", data, 0x3C)[0] == WIN_MAGIC:
            length = struct.unpack_from("<I", data, 0x80)[0]
            credential = data[0x40 : 0x40 + min(length, 0x3F)].split(b"\x00", 1)[0].decode("ascii", errors="replace")
        result = {
            "char": ch,
            "move_code": move_code,
            "encoded": encoded,
            "outcome": outcome,
            "last_error": last_error,
            "signals": signals,
            "time_ms": time_ms,
            "slot5_predicted_ms": predicted,
            "slot5_hit": False,
            "raw": data,
            "credential": credential,
        }
        result["slot5_hit"] = slot5_timing_hit(result)
        return result

    def _clear_sync_objects(self):
        if self.event_ok:
            kernel32.ResetEvent(self.event_ok)
        if self.event_wall:
            kernel32.ResetEvent(self.event_wall)
        for sem in (self.sem_ok, self.sem_wall):
            if sem:
                while kernel32.WaitForSingleObject(sem, 0) == WAIT_OBJECT_0:
                    pass

    def _collect_signals(self):
        handle_ok = False
        if self.slot4_probe:
            flags = wintypes.DWORD()
            if kernel32.GetHandleInformation(self.slot4_probe, ctypes.byref(flags)):
                handle_ok = bool(flags.value & HANDLE_FLAG_PROTECT_FROM_CLOSE)
        return {
            "event_ok": self._poll_event(self.event_ok),
            "event_wall": self._poll_event(self.event_wall),
            "sem_ok": self._poll_semaphore(self.sem_ok),
            "sem_wall": self._poll_semaphore(self.sem_wall),
            "handle_ok": handle_ok,
        }

    def _classify_outcome(self, last_error, signals):
        if last_error == LEAK_EXIT:
            return "exit"
        if last_error == LEAK_OK or signals["event_ok"] or signals["sem_ok"] or signals["handle_ok"]:
            return "ok"
        if last_error == LEAK_WALL or signals["event_wall"] or signals["sem_wall"]:
            return "wall"
        if last_error == LEAK_POISON:
            return "poison"
        return "unknown"

    @staticmethod
    def _poll_event(handle):
        if not handle:
            return False
        return kernel32.WaitForSingleObject(handle, 0) == WAIT_OBJECT_0

    @staticmethod
    def _poll_semaphore(handle):
        if not handle:
            return False
        rc = kernel32.WaitForSingleObject(handle, 0)
        return rc == WAIT_OBJECT_0


def shortest_path(open_edges, start, goal):
    queue = collections.deque([start])
    prev = {start: (None, None)}
    while queue:
        cur = queue.popleft()
        if cur == goal:
            break
        for move, nxt in open_edges.get(cur, {}).items():
            if nxt not in prev:
                prev[nxt] = (cur, move)
                queue.append(nxt)
    if goal not in prev:
        return None
    path = []
    cur = goal
    while prev[cur][0] is not None:
        cur, move = prev[cur]
        path.append(move)
    path.reverse()
    return "".join(path)


def is_explicit_success(result):
    return result["outcome"] in ("ok", "exit")


def is_explicit_wall(result):
    return result["outcome"] in ("wall", "poison")


def replay_path(gate, path):
    for move in path:
        result = gate.move(move)
        if result["outcome"] in ("wall", "poison"):
            raise WinError(f"replay failed at move {move}: {result}")


def probe_move(gate, route, move, cycle, attempts=8):
    result = None
    for shift in range(attempts):
        gate.reset()
        try:
            replay_path(gate, route)
        except WinError:
            continue
        if cycle:
            out_move, back_move = cycle
            blocked = False
            for _ in range(shift):
                out_res = gate.move(out_move)
                if is_explicit_wall(out_res):
                    blocked = True
                    break
                back_res = gate.move(back_move)
                if is_explicit_wall(back_res):
                    blocked = True
                    break
            if blocked:
                continue
        result = gate.move(move)
        if result["slot5_hit"]:
            continue
        if is_explicit_success(result):
            return True, result
        if is_explicit_wall(result):
            return False, result
    return None, result


def solve_via_iterative_exploration(gate, width, height, start, goal, max_rounds=180):
    discovered = {start}
    open_edges = collections.defaultdict(dict)
    walls = set()
    pending = collections.deque([start])
    credential = None

    rounds = 0
    while rounds < max_rounds and pending:
        rounds += 1
        progress = False
        queue = collections.deque(list(dict.fromkeys(pending)))
        pending.clear()

        while queue:
            cell = queue.popleft()
            route = shortest_path(open_edges, start, cell) or ""
            cycle = None
            if route:
                cycle = (INVERSE[route[-1]], route[-1])
            elif "D" in open_edges[start]:
                cycle = ("D", "A")

            for move, delta in DIRS.items():
                nx = cell[0] + delta[0]
                ny = cell[1] + delta[1]
                if not (0 <= nx < width and 0 <= ny < height):
                    continue
                nxt = (nx, ny)
                if move in open_edges[cell] or (cell, nxt) in walls:
                    continue

                decision, result = probe_move(gate, route, move, cycle, attempts=8)
                if result and result["credential"]:
                    credential = result["credential"]

                if decision is True:
                    progress = True
                    open_edges[cell][move] = nxt
                    open_edges[nxt][INVERSE[move]] = cell
                    if nxt not in discovered:
                        discovered.add(nxt)
                        pending.append(nxt)
                    pending.append(cell)
                elif decision is False:
                    progress = True
                    walls.add((cell, nxt))
                    walls.add((nxt, cell))
                else:
                    pending.append(cell)

        path = shortest_path(open_edges, start, goal)
        if path:
            if credential is None:
                gate.reset()
                replay_result = None
                for move in path:
                    replay_result = gate.move(move)
                if replay_result and replay_result["credential"]:
                    credential = replay_result["credential"]
            return path, credential, open_edges

        if not progress and not pending:
            break

    raise WinError("solve did not converge")


def render_ascii(width, height, open_edges):
    rows = [["#"] * (width * 2 + 1) for _ in range(height * 2 + 1)]
    for y in range(height):
        for x in range(width):
            rows[y * 2 + 1][x * 2 + 1] = "."
    for (x, y), edges in open_edges.items():
        for move, _ in edges.items():
            if move == "D":
                rows[y * 2 + 1][x * 2 + 2] = " "
            elif move == "S":
                rows[y * 2 + 2][x * 2 + 1] = " "
    rows[1][1] = "S"
    rows[height * 2 - 1][width * 2 - 1] = "E"
    return "\n".join("".join(r) for r in rows)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--probe", metavar="PATH")
    parser.add_argument("--solve", action="store_true")
    parser.add_argument("--show-map", action="store_true")
    args = parser.parse_args()

    gate = ShadowGate().open()
    try:
        width, height, sx, sy, ex, ey = gate.query_maze()
        print(f"maze: {width}x{height} start=({sx},{sy}) exit=({ex},{ey})")
        print(f"events: ok={bool(gate.event_ok)} wall={bool(gate.event_wall)}")
        print(f"semaphores: ok={bool(gate.sem_ok)} wall={bool(gate.sem_wall)}")

        if args.probe:
            gate.reset()
            for ch in args.probe.upper():
                result = gate.move(ch)
                print(
                    f"{ch}: outcome={result['outcome']} "
                    f"last_error=0x{result['last_error']:08X} "
                    f"time_ms={result['time_ms']:.2f} "
                    f"slot5_predicted_ms={result['slot5_predicted_ms']} "
                    f"slot5_hit={result['slot5_hit']} "
                    f"signals={result['signals']}"
                )
            return 0

        if args.solve:
            path, credential, open_edges = solve_via_iterative_exploration(
                gate, width, height, (sx, sy), (ex, ey)
            )
            print(f"path={path}")
            if credential:
                print(f"credential={credential}")
            if args.show_map:
                print(render_ascii(width, height, open_edges))
            return 0

        print("use --solve or --probe")
        return 0
    finally:
        gate.close()


if __name__ == "__main__":
    sys.exit(main())

① 单步观测 ShadowGate.move()

按协议构造 MOVE_REQ

  • encoded_dir = ror8(move_code ^ 0x5A, 5)

  • checksum = encoded_dir ^ op_count ^ 0xDEAD1337

    • 调 DeviceIoControl(IOCTL_MOVE)
    • 同时采集 5 个槽位的观测:
      • 事件:MazeMoveOK / MazeMoveWall
      • 信号量:A7F3... / B8E2...
      • LastError
      • TEB+0x1748 句柄标志
      • slot5 timing:raw[0:4] ^ 0xBAADF00D 反推出 predicted_ms

② 单边判定 probe_move()

判断从当前格子朝某方向走一步,这条边是通还是墙?

做法是:

  • reset
  • replay 到当前待测格子
  • 用一个已知成功的小回环 out/back 调整成功步相位
  • 真正发一次待测 move
  • 如果命中前四个结果槽位:
    • 直接判 ok 或 wall
  • 如果命中第五槽位:
    • 只说明当前落在 phase5
    • 不做结果判定,继续下一次尝试

③ 图搜索 solve_via_iterative_exploration()

  • 从起点开始
  • 对每个已发现格子的四个方向做 probe_move()
  • decision == True:
    • 记成通路边,加入图
  • decision == False:
    • 记成墙
  • decision == None:
    • 暂时放回队列,后续再试

④ 最短路径

每次图有新边之后,用 shortest_path() 在当前已恢复图上跑 BFS:

  • 找到起点到终点的最短路
  • 一旦终点连通,再 replay 一次路径
  • 从返回缓冲取 credential

通过如下命令执行算法(需要加载驱动)

python shadowgate_solve.py --solve

得到输出

maze: 13x13 start=(0,0) exit=(12,12)
events: ok=True wall=True
semaphores: ok=True wall=True
path=DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
credential=flag{SHAD0WNT_HYPERVMX}

得到了最短路径和 flag,具体执行时间有差异,在我的设备上应该在 70s 左右

五、构建地图

在前者的基础上设计建图脚本

⑤ 完整建图 explore_full_map()

核心数据结构:

  • open_edges
    • 已确认通路边
  • walls
    • 已确认墙边
  • discovered
    • 已发现可达格子
  • tasks
    • 尚未确认的候选边 (cell, move)
  • attempts
    • 每条候选边已经回炉过多少次

流程如下:

  • 从起点开始,把起点四个方向加入任务队列
  • 每次从队列中取出一条候选边 (cell, move)
  • 用 shortest_path(open_edges, start, cell) 求从起点到该格子的当前已知路径
  • 调用 probe_move() 判断这条边
  • 若 decision == True
    • 将这条边加入 open_edges
    • 把新到达的格子加入 discovered
    • 再把这个新格子的候选边加入任务队列
  • 若 decision == False
    • 将该边加入 walls
  • 若 decision == None
    • 表示这次只命中 slot5 或暂时未定
    • 该边最多回炉有限次数,再次入队

⑥ 地图收敛与最短路径

完整建图跑完后,脚本会得到完整的 ASCII 迷宫地图,open_edges,随后脚本再用 shortest_path() 在完整恢复的图上跑 BFS,得到起点到终点的最短路,并replay 一次,从返回缓冲中提取 credential。

通过如下命令执行算法(需要加载驱动)

python shadowgate_map.py --map

得到输出

maze: 13x13 start=(0,0) exit=(12,12)
events: ok=True wall=True
semaphores: ok=True wall=True
###########################
#S . . . . . .#.#. . . . .#
############# ####### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
############# ####### ### #
#. . . . .#.#. . . . .#.#.#
# ####### ############### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ####### ############### #
#.#.#. . . . . . . . .#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ### ########### ### #
#.#.#.#.#.#.#. . .#.#.#.#.#
# ### ####### ### ####### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ### ####### ### ####### #
#.#.#. . . . .#.#.#.#. . .#
# ############### ### #####
#.#.#.#.#.#.#.#.#.#.#.#.#.#
# ############### ### #####
#. . .#.#. . .#.#.#.#.#.#.#
##### ### ### ### ### ### #
#.#.#.#.#.#.#.#.#.#.#.#.#.#
##### ### ### ### ### ### #
#. . . . .#.#.#.#. . . . E#
###########################
path=DDDDDDSSDDDDWWDDSSSSSSSSAASSSSDD
credential=flag{SHAD0WNT_HYPERVMX}
discovered=97
unresolved=0

得到了地图、最短路径和 flag,具体执行时间有差异,在我的设备上应该在 600s 左右(上个厕所回来就跑出来了)


[培训]Windows内核深度攻防:从Hook技术到Rootkit实战!

最后于 1小时前 被江树编辑 ,原因:
上传的附件:
收藏
免费 0
支持
分享
最新回复 (1)
雪    币: 3624
活跃值: (6417)
能力值: ( LV5,RANK:65 )
在线值:
发帖
回帖
粉丝
2
厉害厉害
58分钟前
0
游客
登录 | 注册 方可回帖
返回