-
-
[翻译]漩涡#3:构建团队服务器
-
发表于: 2023-8-23 20:54 10151
-
原文标题:Maelstrom #3: Building the Team Server
原文地址:
https://pre.empt.blog/2023/maelstrom-3-building-the-team-server
由于作者博客迁移的原因,原文中的图片已经失效,我在archive中找到了别人保存下来的网页:https://web.archive.org/web/20221126194518/https://pre.empt.dev/posts/maelstrom-arch-episode-2/
本文由本人用chatgbt翻译而成,Maelstrom是一个概念验证的命令与控制框架,具有Python后端和C++植入程序。
在之前的文章《漩涡#2: C2架构》中,我们讨论了一个命令与控制框架的一般架构,包括从植入程序到服务器的连接以及植入程序本身的执行流程。本文将继续讨论这个架构,并关注用户界面和用户体验方面的C2开发考虑。我们还将(简要地!)讨论如何设计和保护C2的通信渠道。
本文将涵盖以下内容:
正如我们之前提到的,本系列中大部分讨论将集中在C2植入程序上采取的攻击和防御行动上。然而,我们仍然认为值得关注上述列出的要点,因为它们与植入程序的行为和安全性相关。在本文中,我们不会过多地讨论重定向器、通道和红队基础设施的其他部分。
我们处理C2通信的首选方式是使用简单的API。几乎在每个客户端环境中都存在HTTP,而在没有HTTP的情况下,流量可以由中间设备处理(例如前面提到的重定向器和通道,在这里我们不会讨论它们,因为它们非常有趣)。
我们先前选择使用面向对象的方法部分是基于这个决策,因为我们对植入程序建模的方式有助于确定我们的API应该使用的结构。通过在服务器端使用面向对象的方法,我们发现API架构基本上可以自己编写。确保API的编写方式合乎逻辑且可预测,使得将来的更改和添加比起解开"意大利面"式代码要简单得多。
与计算机程序的交互方式有无限种可能,这取决于你希望多么严格。在C2方面,有三个广泛的方式:
这些是广义的分类,值得指出的是,C2绝对可以同时拥有命令行界面和图形界面。有几个C2也支持"虚拟"命令行界面,其中基于浏览器的GUI模仿或传递CLI界面。
这些用户界面不一定与服务器本身紧密绑定。C2,特别是企业C2或旨在成为更广泛红队基础设施的一部分的C2,通常会提供一种与中央"团队服务器"进行交互的独立方式。一个很好的例子是Cobalt Strike,其中主要的团队服务器通过操作员本地机器上的客户端进行远程访问。
尽管这些讨论可能显得有些随意,但界面的设计是重要的。这是运营人员与C2进行交互的唯一方式,所显示和不显示的内容将影响其效果。如果确保流量和植入器安全的唯一方法被埋藏在子菜单中,而存在一个快速而懒惰的选项,运营人员会选择快速和简单的选项。即使在默认选择是一个好选择的情况下,过度依赖单一技术会成为妥协的指标或特征(例如APT28使用的PowerShell持久化技术)。
通过清晰的设计和可用的用户界面,操作安全性不再仅仅是肌肉记忆和符文的问题。
C2矩阵提供了一个名为"Ask The C2 Matrix"的功能,使用户可以根据各种C2组件进行筛选,其中包括用户界面(UI)。正如我们之前提到的,C2可以分为三个大类:命令行界面(CLI)、基于Web的图形用户界面(Web GUI)和客户端图形用户界面(Client GUI)。
让我们来看一些实现不同选项的当代C2框架的例子,以及一些可以帮助您实现类似体验的库:
C2框架:
值得一看的库:
C2框架:
值得一看的库:
C2框架:
值得一看的库:
在现实世界的场景中,服务器软件应该经过测试,以评估其处理请求的及时性。例如,ThePrimeagen在一段名为“Go is faster than Rust??! Go vs Rust vs TypeScript Servers (as a scientist)”的视频中,对Go、Rust和TypeScript进行了负载测试。我们建议为teamserver进行类似的测试。
例如,在视频的5:34左右,Primeagen讨论了一个实验,其中4台Linode机器分别向Golang和Rust服务器建立了800个连接。每个连接都会进行一场游戏,并测量活跃的游戏数量:
Golang 在处理负载时表现出色,同时开发也比较容易。而 Rust 则表现出高低不定,在编写过程中更加困难。尽管这个服务器并非针对恶意软件的团队服务器,但数据还是需要考虑的因素。
鉴于此,我们将选择 Flask/Python 进行工作,纯粹是出于演示的简便性考虑。
在我们开始编码之前,还有一个最后的说明 - 除了我们正在讨论的内容之外,这段代码故意依赖于不安全的行为。Maelstrom 只是为了演示目的而存在。在接下来的几篇博客文章中,一些这些误导将变得明显起来。
在通信方面,应考虑多种协议。
首先是超文本传输协议(HTTP)。HTTP在C2通信中非常适用,因为数据可以嵌入到请求中。这是大多数(如果不是全部)C2都支持的功能。以下是请求中可以嵌入数据的位置:
让我们来看一个例子:
在这个来自tutorialspoint的例子中,一个表单已经被填写并提交到服务器。如果这被用来掩盖C2通信,可以在请求体中添加一个额外的参数:
除了有效传输C2数据外,HTTP还非常适用于使用域前置等技术来混淆服务器位置。然而,这变得越来越困难。这催生了像C3这样的项目。我们将在以后的博客中进一步探讨如何掩盖流量。
说实话,专门为C2通信解释HTTP可能需要写一整篇博客。所以我们就在这里结束HTTP的讨论。但是,这就是我们将在概念验证中使用的通信方式。
域名系统(DNS)是人们常用于C2通信的另一种方法。
这是Cobalt Strike原生支持的功能,可以如下解释:
今天,DNS Beacon可以通过DNS TXT记录、DNS AAAA记录或DNS A记录下载任务。该负载在目标上具有在这些数据通道之间切换的灵活性。使用Beacon的mode命令来更改当前Beacon的数据通道。mode dns表示DNS A记录数据通道,mode dns6表示DNS AAAA记录通道,而mode dns-txt表示DNS TXT记录数据通道。默认情况下,使用的是DNS TXT记录数据通道。
使用DNS作为C2通信的一个问题是速度比HTTP慢得多,但提供了更好的保护,因为这些通道受到的检查比HTTP更少。
Server Message Block (SMB)是另一种用于C2通信的协议。然而,它并不用于传统的数据传输。通常情况下,它用于点对点或植入物之间的通信。为了使其工作,其中一个Beacon需要作为"服务器"运行,通过HTTP/DNS与团队服务器进行通信。这样可以使后续的植入物能够与处于服务器模式的植入物进行通信。这对于绕过入口/出口受限的安全网络以及将多个植入物串联在一起以减少出站流量非常有用。
与DNS类似,我们并未实现SMB协议,但我们建议实现该协议,而DNS则在很大程度上取决于最终用户的用例是否需要。SMB协议的实现可以为C2通信提供额外的选择和功能,特别是在需要在受限网络环境中进行通信或需要将多个植入物连接在一起以减少出站流量时。
Maelstrom是一种快速简便的方法,而且我们不需要考虑用户体验,因为最终用户就是我们自己,而且时间也很短暂。为了让我们的工作更轻松,我们将使用Flask、python-prompt-toolkit和Python 3.9来展示我们的工作过程。Python的优势在于,只要您能够找到让Windows停止从商店安装Python的方法,它就是跨平台的,并且开发速度快。由于Python基本上是可执行的伪代码,这也有助于阐明我们的观点。
既然我们给您提供了多种选择,让我们来看看Maelstrom将如何工作。
首先是服务器端。这是植入物将响应的组件。因此,当在终端中运行maelstrom.py
时,将打印出以下内容:
有两个选项:
然而,我们只实现服务器部分:
让我们逐步了解run_maelstrom()
函数...
它的第一步是获取可用命令:
get_commands()
函数的位置在:
该函数解析available_commands
的返回值,并构建一个包含所需信息的字典。
在Maelstrom中,这些命令在某种程度上是硬编码的,限制了其可扩展性:
请注意Command
对象,它是一个数据类(dataclass):
这种方法并没有提供任何灵活性。在Vulpes的情况下,使用了工厂设计模式。这使得服务器能够以编程方式识别命令、所需的信息以及前后操作等。我们之所以指出这一点,是因为为了使一个C2工具有用,它需要具备可扩展性。Maelstrom并不支持这一点。
那么,再次来看get_commands()
函数:
该函数这样重新构建字典的原因是因为Maelstrom使用了Prompt Toolkit,并且命令会自动填充嵌套字典的信息。
运行服务器并输入help
会显示如下内容:
这些与之前看到的数据类相匹配:
由于这是服务器的懒加载实现,接下来会进入一个大的while循环,它提供一个提示符并解析命令:
maelstrom.py仅提供创建监听器和显示连接的植入器的功能。然而,让我们简要讨论一下后渗透(我们稍后在系列中会更深入地讨论这个话题)。
为了讨论方便,我们以whoami
命令作为示例。如果你在C2矩阵上选择一个不是.NET的C2,你有90%的机会得到以下结果:
这将随后生成以下进程树:
这意味着每次运行时,它会生成两个进程,并产生T1059/003:命令和脚本解释器:Windows命令行。这将要求代码使用system()或CreateProcessA(),以cmd.exe
作为目标,并使用/c whoami
作为参数。总体而言,这为一个如此简单的命令产生了大量的IOC(指示对象)。而实际上,whoami
只是调用了GetUserNameA函数:
那么,为什么不直接使用GetUserNameA函数呢?
此外,当大多数人收到一个植入物时,他们的第一反应通常是执行ls
或whoami
等命令。通过重新实现这些命令,潜在的IOC(指示对象)大大减少了。这种说法促使了项目的诞生,如CS-Situational-Awareness-BOF、CS-Remote-OPs-BOF和C2-Tool-Collection。
想要知道主机名?那就使用GetComputerNameA()函数。想要获取当前进程的目录?可以从PEB(进程环境块)中获取:
这些显然是简单的命令,但是假设Kerberoast是一个内部命令。那么,这些操作是可以用C语言编写的,毕竟Windows本身就是用C语言编写的:c2-tool-collection/BOF/Kerberoast。
关于这个问题,我们稍后会在后续的部分进行更多的讨论。现在,让我们继续谈谈监听器!
让我们来看看我们是如何实现监听器创建的。如果用户在命令行界面上输入的启动命令以listener
开头,那么将调用handle_listener()
函数:
首先它会尝试找到一个空格来假设输入是正确的:
如果这一步没有问题,它会在空格处进行分割,并计算长度。如果长度不等于5,则说明命令错误:
请注意这里的usage
:
一个有效的输入示例可能是:
由于第一个索引(0)应该是命令名称,它尝试对其进行验证:
像这样进行验证:
返回的数据类并没有被使用,只是用来检查命令是否有效。现在命令是有效的,可以使用Listener数据类将信息传递给类,就像这样:
数据类的位置在:
然后将其附加到全局列表中:
显然,这种方法不是持久的。实际上,这应该是一个数据库,在服务器重新启动时进行检查和恢复:
接下来,将数据类用作线程的参数:
这在start_server()
函数中创建了一个线程,使用add_url_rule将指定的端点与Flask应用程序注册起来:
这样完成了监听器的创建过程:
可以通过运行以下命令进行验证:
这将产生类似以下的输出:
植入物如何与服务器通信呢?
正如我们所说的,我们并没有实现任何使这个C2变得有用的东西;因此,我们将忽略这个组件。但是,我们仍然希望讨论它。Maelstrom的帮助菜单有一个位置参数,这意味着实际的server.py文件需要再次运行,并带上有效载荷
开关。这并不完全符合用户体验的最佳实践,更好的工作流程是从C2内部进行操作。例如,作为Web应用程序,这很容易实现。可以使用像我们在Vulpes中所做的那样的模态框,或者像Cobalt Strike中的弹出窗口。如果是一个命令行界面(CLI),那么可能可以在C2内部使用一种交互式提示形式,它可以像之前展示的那样作为命令工作。
当涉及到自动化有效载荷生成时,我们发现最简单的方法是编程方式创建一个Makefile,并将所有源代码文件复制/粘贴到/tmp
目录中,然后使用类似以下的方式执行Makefile
命令:
这将使用MinGW-w64进行编译,然后移动编译后的植入物,并删除临时目录。
在接下来的博文中,我们将详细介绍有关有效载荷生成的注意事项。
在文章的这一部分,我们有一个正在运行的团队服务器,可以创建监听器。现在我们需要看看端点实际上如何处理传入的流量,以便:
在文章前面的部分,我们展示了如何将监听器作为一个新的应用程序启动,就像这样:
典型的 Flask 方式是将 app
定义为全局变量:
并且创建一个处理所有请求的/
路由函数。我们可以为每种类型创建一个端点,但我们发现在根路径监听,并使用逻辑进行过滤更容易。这样可以记录所有请求,同时通过期望的值仔细过滤请求。
路由函数:
让我们来看一下这个路由内部发生了什么。在前面的部分,我们在/a
上创建了一个端点,所以我们将使用那个例子。
这里首先从请求对象中提取了一些信息,这些信息可能会被使用,也可能不会被使用:
请注意request.headers.get
的调用。这是在客户端和服务器之间发送的硬编码值之一。
然后将这些信息传递给is_valid_listener
函数:
这仅仅是通过确保URI、服务器地址、服务器端口和标头的正确性,以确保数据类信息与请求相匹配:
这是一个非常基础的示例,用于确保请求的完整性...
如果请求有效,那么接下来的代码将确定这是哪种类型的请求:
在这种情况下,只有两个选项:
在实际的例子中,init
和stage
并不是好的端点。但这可以用来说明我们的观点。然后紧接着是:
如果请求是init类型,那么initialize_implant()
函数只是解析信息并打印一个新的连接,仅此而已:
在这种情况下,错误处理很差,没有数据库跟踪,没有任何东西。实际上,这里需要进行日志记录。无论请求的类型是什么,都应该将其记录到统一的格式中。根据我们项目的经验,通常最好使用一个文件,其中每一行都是一个JSON对象。
这样的日志文件可以通过以下两种方式轻松处理:
或者使用POST
请求:
根据项目的不同,我们将使用其中一种方法。
接下来是暂存(staging)端点,然后返回一个硬编码的路径。
再次强调,这是有意为之的。我们并不关心让它变得复杂,这只是一个概念验证。在接下来的部分,我们将介绍一些掩盖动态链接库(DLL)的方法,这就是那些逻辑应该放置的地方。但是,对于Maelstrom而言,我们只是返回了一个硬编码的DLL路径的字节。
从这里开始,可以实现多个开关来执行特定的任务,比如获取新任务或返回已执行任务的信息。每个端点可以以不同的配置进行设置,从而在通信中提供灵活性。
总结一下,在本节中,我们展示了两个硬编码的端点——"init"和"stage",并根据预期的信息对请求进行了筛选。为了阐明我们的观点,我们仅使用了X-Maelstrom
作为标头。但实际上,这可以是诸如ASPSESSIONID
之类的真实标头。我们所做的只是确保只有非常特定的请求才能与团队服务器进行通信。
使用Wireshark,让我们检查我们的HTTP请求。以下过滤器将找到C2和开发机器之间的所有HTTP请求:
请记住,这个过滤器只是用于暂存(stage)操作,没有其他额外的请求:
上述显示了对/a?stage
URI的请求,以及我们配置不良的标头信息:
由于这是一个暂存(stage)请求,如果我们在TCP流中跟踪它...
以Snort为例,我们可以使用以下规则匹配该标头并立即识别Maelstrom:
这个规则看起来类似于这样:
确实,这里可以进行大量的改进,并且随着各种伪装和加密技术的引入,情况可能会变得非常复杂。然而,并不一定需要那么复杂(尽管有些人可能需要)。只要HTTP请求可以完全由最终用户自定义,那么请求可以按需求进行构建。对于那些需要处理所有日志的复杂任务,可以调整请求以满足需求。不需要任何特殊处理?那也没问题,只需请求/stage
即可。
这是Cobalt Strike在其可塑的指挥与控制(Command and Control,C2)中做得很好的一点,特别是在HTTP暂存(staging)方面,请求可以按需进行配置:
特别出色的是它们的链式数据操作能力:
在上述示例中,使用了GIF89a
类型,它使用了GIF的魔术字节,并将其添加到末尾。或者,也可以通过附加PNG的魔术字节来实现。
以下是一个使用PDF的魔术字节的示例实现:
在CyberChef上将其转换为字节:
使用以下命令将这些字节添加到一个EXE文件的开头:
最后,对两个文件运行file命令进行检查:
现在需要做的是从缓冲区中删除前8个字节......
从用户体验的角度来看,服务器可以轻松地扩展这些行为。操作员对这些步骤应该是透明的,但应用它们应该是直观的。旧版的C2更依赖于用户直接自定义这种行为,通过修改植入物(implants)和投放器(droppers),如果有可修改性的话。大多数C2可以改变终端节点,但即使这样做也较为罕见。
我们在前面的帖子中提到的支持可扩展的植入物开发方法,使用简单的软件开发实践,极大地提高了植入物的操作安全性。像多态代码和军用级加密这样的流行词在讨论这些步骤时并不有助于理解,因为很多当代的C2仍然依赖于这些操作安全行为的硬编码。这导致红队只能使用一种操作安全植入物和一个C2,而不能使用一个具有生成无限操作安全植入物能力的模块化步骤库的C2。
例如,可以将以下行为添加到C2的载荷生成中,以进一步混淆植入物数据:
显然,这些只是非常基础的示例,但它们证明了观点。蓝队知道,在从以前从未见过的Azure和AWS实例之间传输的高熵数据块中,存在一些可疑的因素。随着现场流量检查的普及,我们预计C2将转向更模块化的方法,植入物可以实时切换数据混淆和加密方式,从而降低蓝队识别植入物的能力。
这篇文章在谈到蓝队方面的内容上比较简略,原因很简单——在硬编码植入物操作安全领域,即使不考虑目的地、EDR标志或其他启发式因素,已经很难在网络层面上进行识别。在像Microsoft Teams这样的已建立通道上,每20分钟到2小时发送一次恶意请求,要想找到其中的一个就像是在由针组成的一堆针中寻找一根针。只有通过密切关注终端遥测数据,对所有网络流量进行深度包检查来寻找特定内容,当进程被标记为可疑时才能进行识别。确保EDR和网络设备具备最新的域名声誉和威胁情报源,并适当地进行摄取,可能在一定程度上有助于弥合这个差距。
总结一下:
这篇文章更像是一个参考文档,具体的实施方法取决于用户的需求和经验。对于Maelstrom而言,它只是一个基本的Python命令行界面(CLI),使用了Prompt Toolkit库。
现在,一切都准备就绪,我们可以开始编写植入物的代码了!
我们在开头提到,不会涉及渠道、重定向器或其他红队基础设施的部分。作为一些值得一提的补充,以下三个博客是一个很好的起点:
GET
/
endpoint
/
form HTTP
/
1.1
User
-
Agent: Mozilla
/
4.0
(compatible; MSIE5.
01
; Windows NT)
Host: www.example.com
Content
-
Type
: application
/
x
-
www
-
form
-
urlencoded
Content
-
Length: length
Accept
-
Language: en
-
us
Accept
-
Encoding: gzip, deflate
Connection: Keep
-
Alive
licenseID
=
string&content;
=
string&
/
paramsXML
=
string
GET
/
endpoint
/
form HTTP
/
1.1
User
-
Agent: Mozilla
/
4.0
(compatible; MSIE5.
01
; Windows NT)
Host: www.example.com
Content
-
Type
: application
/
x
-
www
-
form
-
urlencoded
Content
-
Length: length
Accept
-
Language: en
-
us
Accept
-
Encoding: gzip, deflate
Connection: Keep
-
Alive
licenseID
=
string&content;
=
string&
/
paramsXML
=
string
licenseID
=
string&content;
=
string&
/
paramsXML
=
string&C2Info;
=
AAAAAA
licenseID
=
string&content;
=
string&
/
paramsXML
=
string&C2Info;
=
AAAAAA
usage: Maelstrom [
-
h] {server,payload} ...
positional arguments:
{server,payload} Start Server | Generate Payloads
server Start the Maelstrom Server
payload Generate a Maelstrom Payload
optional arguments:
-
h,
-
-
help
show this
help
message
and
exit
Example:
-
Run Server:
python3 maelstrom.py server
-
Generate Payload:
python3 maelstrom.py payload
usage: Maelstrom [
-
h] {server,payload} ...
positional arguments:
{server,payload} Start Server | Generate Payloads
server Start the Maelstrom Server
payload Generate a Maelstrom Payload
optional arguments:
-
h,
-
-
help
show this
help
message
and
exit
Example:
-
Run Server:
python3 maelstrom.py server
-
Generate Payload:
python3 maelstrom.py payload
def
main()
-
>
None
:
args
=
Args.get_args()
if
args.which
=
=
'server'
:
run_maelstrom()
return
if
__name__
=
=
"__main__"
:
main()
def
main()
-
>
None
:
args
=
Args.get_args()
if
args.which
=
=
'server'
:
run_maelstrom()
return
if
__name__
=
=
"__main__"
:
main()
commands:
dict
=
get_commands()
commands:
dict
=
get_commands()
def
get_commands()
-
>
dict
:
"""convert the list of commands into the complete data structure"""
cs:
dict
=
{}
commands:
list
=
available_commands()
for
command
in
commands:
cs[command.name]
=
command.sub_args
return
cs
def
get_commands()
-
>
dict
:
"""convert the list of commands into the complete data structure"""
cs:
dict
=
{}
commands:
list
=
available_commands()
for
command
in
commands:
cs[command.name]
=
command.sub_args
return
cs
def
available_commands()
-
>
list
:
"""Store all the commands and return as a list of dataclasses"""
exit_cmd: Command
=
Command(
'exit'
,
'Exit Maelstrom'
,
'Exit Maelstrom'
,
None
)
help_cmd: Command
=
Command(
'help'
,
'Show all the available commands'
,
'Show all the available commands'
,
None
)
set_cmd: Command
=
Command(
'listener'
,
'Start a new listener'
,
'Start a new listener by specifying a host, port, uri, and password'
,
None
)
show_cmd: Command
=
Command(
'show'
,
'Show listeners|implants'
,
'Show listeners|implants'
,
{
"listeners"
:
None
,
"implants"
:
None
}
)
# return all command dataclasses as a list
return
[exit_cmd, help_cmd, set_cmd, show_cmd]
def
available_commands()
-
>
list
:
"""Store all the commands and return as a list of dataclasses"""
exit_cmd: Command
=
Command(
'exit'
,
'Exit Maelstrom'
,
'Exit Maelstrom'
,
None
)
help_cmd: Command
=
Command(
'help'
,
'Show all the available commands'
,
'Show all the available commands'
,
None
)
set_cmd: Command
=
Command(
'listener'
,
'Start a new listener'
,
'Start a new listener by specifying a host, port, uri, and password'
,
None
)
show_cmd: Command
=
Command(
'show'
,
'Show listeners|implants'
,
'Show listeners|implants'
,
{
"listeners"
:
None
,
"implants"
:
None
}
)
# return all command dataclasses as a list
return
[exit_cmd, help_cmd, set_cmd, show_cmd]
from
dataclasses
import
dataclass, field
@dataclass
(order
=
True
)
class
Command:
"""Dataclass to hold the command info"""
# command name
name:
str
# command description (long)
long_desc:
str
# command description (short)
short_desc:
str
# sub arguments
sub_args:
dict
# allow it to be sorted on size
def
__post__init(
self
):
self
.sort_index
=
self
.size
from
dataclasses
import
dataclass, field
@dataclass
(order
=
True
)
class
Command:
"""Dataclass to hold the command info"""
# command name
name:
str
# command description (long)
long_desc:
str
# command description (short)
short_desc:
str
# sub arguments
sub_args:
dict
# allow it to be sorted on size
def
__post__init(
self
):
self
.sort_index
=
self
.size
def
get_commands()
-
>
dict
:
"""convert the list of commands into the complete data structure"""
cs:
dict
=
{}
commands:
list
=
available_commands()
for
command
in
commands:
cs[command.name]
=
command.sub_args
return
cs
def
get_commands()
-
>
dict
:
"""convert the list of commands into the complete data structure"""
cs:
dict
=
{}
commands:
list
=
available_commands()
for
command
in
commands:
cs[command.name]
=
command.sub_args
return
cs
exit_cmd: Command
=
Command(
'exit'
,
'Exit Maelstrom'
,
'Exit Maelstrom'
,
None
)
help_cmd: Command
=
Command(
'help'
,
'Show all the available commands'
,
'Show all the available commands'
,
None
)
set_cmd: Command
=
Command(
'listener'
,
'Start a new listener'
,
'Start a new listener by specifying a host, port, uri, and password'
,
None
)
show_cmd: Command
=
Command(
'show'
,
'Show listeners|implants'
,
'Show listeners|implants'
,
{
"listeners"
:
None
,
"implants"
:
None
}
)
exit_cmd: Command
=
Command(
'exit'
,
'Exit Maelstrom'
,
'Exit Maelstrom'
,
None
)
help_cmd: Command
=
Command(
'help'
,
'Show all the available commands'
,
'Show all the available commands'
,
None
)
set_cmd: Command
=
Command(
'listener'
,
'Start a new listener'
,
'Start a new listener by specifying a host, port, uri, and password'
,
None
)
show_cmd: Command
=
Command(
'show'
,
'Show listeners|implants'
,
'Show listeners|implants'
,
{
"listeners"
:
None
,
"implants"
:
None
}
)
# bool used to control the while loop
execute:
bool
=
True
# username to print
username:
str
=
getpass.getuser()
# run until told otherwise
while
execute:
# get the user input
user_input:
str
=
prompt(f
'[{username}]> '
, completer
=
completer)
# if nothing is passed, just continue
if
not
user_input:
continue
# Exit
if
user_input.startswith(
'exit'
):
execute
=
False
# Help
elif
user_input.startswith(
'help'
):
print_help(user_input)
# Listener
elif
user_input.startswith(
'listener'
):
handle_listener(user_input)
elif
user_input.startswith(
'show'
):
handle_show(user_input)
return
# bool used to control the while loop
execute:
bool
=
True
# username to print
username:
str
=
getpass.getuser()
# run until told otherwise
while
execute:
# get the user input
user_input:
str
=
prompt(f
'[{username}]> '
, completer
=
completer)
# if nothing is passed, just continue
if
not
user_input:
continue
# Exit
if
user_input.startswith(
'exit'
):
execute
=
False
# Help
elif
user_input.startswith(
'help'
):
print_help(user_input)
# Listener
elif
user_input.startswith(
'listener'
):
handle_listener(user_input)
elif
user_input.startswith(
'show'
):
handle_show(user_input)
return
cmd.exe
/
c whoami
cmd.exe
/
c whoami
-
> implant.exe
-
> cmd.exe
-
> whoami.exe
-
> implant.exe
-
> cmd.exe
-
> whoami.exe
BOOL
GetUserNameA(
[out] LPSTR lpBuffer,
[
in
, out] LPDWORD pcbBuffer
);
BOOL
GetUserNameA(
[out] LPSTR lpBuffer,
[
in
, out] LPDWORD pcbBuffer
);
CHAR lpUserName[MAX_PATH];
DWORD nSize
=
MAX_PATH;
if
(!GetUserNameA(lpUserName, &nSize))
{
return
NULL;
}
CHAR lpUserName[MAX_PATH];
DWORD nSize
=
MAX_PATH;
if
(!GetUserNameA(lpUserName, &nSize))
{
return
NULL;
}
std::string GetProcessCurrentDirectory() {
PRTL_USER_PROCESS_PARAMETERS processParams
=
(PRTL_USER_PROCESS_PARAMETERS)_pPEB
-
>ProcessParameters;
UNICODE_STRING us
=
processParams
-
>CurrentDirectoryPath;
std::wstring ws(us.
Buffer
, us.Length
/
2
);
std::string ss(ws.begin(), ws.end());
return
ss;
}
std::string GetProcessCurrentDirectory() {
PRTL_USER_PROCESS_PARAMETERS processParams
=
(PRTL_USER_PROCESS_PARAMETERS)_pPEB
-
>ProcessParameters;
UNICODE_STRING us
=
processParams
-
>CurrentDirectoryPath;
std::wstring ws(us.
Buffer
, us.Length
/
2
);
std::string ss(ws.begin(), ws.end());
return
ss;
}
elif
user_input.startswith(
'listener'
):
handle_listener(user_input)
elif
user_input.startswith(
'listener'
):
handle_listener(user_input)
def
handle_listener(user_input:
str
)
-
>
None
:
usage
=
'Usage: listener <host> <port> <base uri> <password>'
if
' '
not
in
user_input:
print
(usage)
return
def
handle_listener(user_input:
str
)
-
>
None
:
usage
=
'Usage: listener <host> <port> <base uri> <password>'
if
' '
not
in
user_input:
print
(usage)
return
# split out the command
split:
list
=
user_input.split(
' '
)
# make sure all the components are passed
if
len
(split) !
=
5
:
print
(usage)
return
# split out the command
split:
list
=
user_input.split(
' '
)
# make sure all the components are passed
if
len
(split) !
=
5
:
print
(usage)
return
usage
=
'Usage: listener <host> <port> <base uri> <password>'
usage
=
'Usage: listener <host> <port> <base uri> <password>'
listener
127.0
.
0.1
5555
/
some
-
uri PassWoRd1
listener
127.0
.
0.1
5555
/
some
-
uri PassWoRd1
# use the first element to get the comand object
command: Command
=
get_a_command(split[
0
])
# if it doesnt exist, return
if
not
command:
print
(
'Failed to find command!'
)
return
# use the first element to get the comand object
command: Command
=
get_a_command(split[
0
])
# if it doesnt exist, return
if
not
command:
print
(
'Failed to find command!'
)
return
def
get_a_command(select:
str
)
-
> Command:
"""Get a specific command"""
return
[c
for
c
in
available_commands()
if
select
=
=
c.name]
def
get_a_command(select:
str
)
-
> Command:
"""Get a specific command"""
return
[c
for
c
in
available_commands()
if
select
=
=
c.name]
listener: Listener
=
Listener(random_string(), split[
0
], split[
1
], split[
2
], split[
3
])
listener: Listener
=
Listener(random_string(), split[
0
], split[
1
], split[
2
], split[
3
])
@dataclass
(order
=
True
)
class
Listener:
"""Dataclass to hold the listener info"""
# name
name:
str
# listener address
address:
str
# port
port:
int
# uri
uri:
str
# password
password:
str
@dataclass
(order
=
True
)
class
Listener:
"""Dataclass to hold the listener info"""
# name
name:
str
# listener address
address:
str
# port
port:
int
# uri
uri:
str
# password
password:
str
# add the listener to the list
listeners.append(listener)
# add the listener to the list
listeners.append(listener)
# start a thread pointing at the start_server function
try
:
thread
=
threading.Thread(target
=
start_server, args
=
(listener,))
thread.daemon
=
True
thread.start()
logger.good(f
'Started: [{listener.name}] http://{listener.address}:{listener.port}{listener.uri} ({listener.password})'
)
except
Exception as e:
logger.bad(f
'Failed to start listener: {str(e)}'
)
return
# start a thread pointing at the start_server function
try
:
thread
=
threading.Thread(target
=
start_server, args
=
(listener,))
thread.daemon
=
True
thread.start()
logger.good(f
'Started: [{listener.name}] http://{listener.address}:{listener.port}{listener.uri} ({listener.password})'
)
except
Exception as e:
[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课
赞赏
- Android主流的污点分析工具 1570
- [翻译]“过滤-静音”操作:调查EDR内部通信 12175
- [翻译]Windows授权指南 6006
- [翻译]利用Android WebView漏洞 8020
- [翻译]伪造调用堆栈以混淆EDR 11720