mmpi,是一款使用python实现的开源邮件快速检测工具库,基于community框架设计开发。mmpi支持对邮件头、邮件正文、邮件附件的解析检测,并输出json检测报告。
mmpi,代码项目地址:https://github.com/a232319779/mmpi ,pypi项目地址:https://pypi.org/project/mmpi/
mmpi,邮件快速检测工具库检测逻辑:
mmpi的分析判定检测前提:邮件系统环境。脱离邮件环境上下文,检测规则的依据就不可靠了。
备注:windows下安装yara-python,可以从这里下载。
mmpi完全基于python开发,使用python原生email、html、zip库进行解析,基于oletools做定制化修改,支持对office文档和rtf文档的解析,再结合yara实现对其他文件的检测。
检测规则:压缩包中文件名以.exe结尾,并且中间插入20个以上空格的
检测规则:压缩包中同时存在exe和dll文件
检测规则:RTF文档中存在OLE对象,并且class_name是OLE2Link或者以equation开头
结果说明:邮件包含漏洞利用的RTF文档,属于恶意邮件。
$ pip install mmpi
$ mmpi-run $email_path
"""Minimal usage example: parse one e-mail file and print its report."""
from mmpi import mmpi


def main():
    """Parse ``test.eml`` with mmpi and print the resulting report."""
    emp = mmpi()
    emp.parse('test.eml')
    report = emp.get_report()
    print(report)


if __name__ == "__main__":
    main()
"""Minimal usage example: parse one e-mail file and print its report."""
from mmpi import mmpi


def main():
    """Parse ``test.eml`` with mmpi and print the resulting report."""
    emp = mmpi()
    emp.parse('test.eml')
    report = emp.get_report()
    print(report)


if __name__ == "__main__":
    main()
{
    // 固定字段
    "headers": [],
    "body": [],
    "attachments": [],
    "signatures": [],
    // 动态字段
    "vba": [],
    "rtf": []
}
{
    // 固定字段
    "headers": [],
    "body": [],
    "attachments": [],
    "signatures": [],
    // 动态字段
    "vba": [],
    "rtf": []
}
.
├── mmpi
│   ├── common
│   ├── core
│   ├── data
│   │   ├── signatures
│   │   │   ├── eml
│   │   │   ├── html
│   │   │   ├── ole
│   │   │   ├── other
│   │   │   ├── rtf
│   │   │   └── zip
│   │   ├── white
│   │   └── yara
│   │       ├── exe
│   │       ├── pdf
│   │       └── vba
│   └── processing
└── tests
    └── samples
.
├── mmpi
│   ├── common
│   ├── core
│   ├── data
│   │   ├── signatures
│   │   │   ├── eml
│   │   │   ├── html
│   │   │   ├── ole
│   │   │   ├── other
│   │   │   ├── rtf
│   │   │   └── zip
│   │   ├── white
│   │   └── yara
│   │       ├── exe
│   │       ├── pdf
│   │       └── vba
│   └── processing
└── tests
    └── samples
class PEFakeDocument(Signature):
    """Signature for an executable inside a zip whose name is padded with spaces.

    A zip entry is flagged when its reported type is ``'exe'`` and its file
    name contains more than 20 space characters.
    """

    authors = ["ddvv"]
    sig_type = 'zip'
    name = "pe_fake_document"
    severity = 9
    description = "PE File Fake Document"

    def on_complete(self):
        """Scan the collected results for a space-padded executable name.

        Returns:
            The value of ``self.has_marks()`` once a matching entry has been
            marked, or ``None`` when no zip entry matches.
        """
        for result in self.get_results():
            # Only results produced for this signature's type are relevant.
            if result.get('type', '') != self.sig_type:
                continue
            infos = result.get('value', {}).get('infos', [])
            for info in infos:
                # An entry without a name is treated as an empty name so the
                # space count cannot fail on None.
                file_name = info.get('name') or ''
                if info.get('type') == 'exe' and file_name.count(' ') > 20:
                    self.mark(type="zip", tag=self.name, data=info.get('name'))
                    # NOTE(review): indentation was lost in the source; the
                    # return is assumed to follow the first mark - confirm.
                    return self.has_marks()
        return None
class PEFakeDocument(Signature):
    """Signature for an executable inside a zip whose name is padded with spaces.

    A zip entry is flagged when its reported type is ``'exe'`` and its file
    name contains more than 20 space characters.
    """

    authors = ["ddvv"]
    sig_type = 'zip'
    name = "pe_fake_document"
    severity = 9
    description = "PE File Fake Document"

    def on_complete(self):
        """Scan the collected results for a space-padded executable name.

        Returns:
            The value of ``self.has_marks()`` once a matching entry has been
            marked, or ``None`` when no zip entry matches.
        """
        for result in self.get_results():
            # Only results produced for this signature's type are relevant.
            if result.get('type', '') != self.sig_type:
                continue
            infos = result.get('value', {}).get('infos', [])
            for info in infos:
                # An entry without a name is treated as an empty name so the
                # space count cannot fail on None.
                file_name = info.get('name') or ''
                if info.get('type') == 'exe' and file_name.count(' ') > 20:
                    self.mark(type="zip", tag=self.name, data=info.get('name'))
                    # NOTE(review): indentation was lost in the source; the
                    # return is assumed to follow the first mark - confirm.
                    return self.has_marks()
        return None
class DLLHijacking(Signature):
    """Signature for a zip archive that holds both an exe and a dll entry."""

    authors = ["ddvv"]
    sig_type = 'zip'
    name = "dll_hijacking"
    severity = 9
    description = "DLL Hijacking"

    def on_complete(self):
        """Mark the first zip result whose entries include both exe and dll.

        Returns:
            The value of ``self.has_marks()`` once a matching archive has
            been marked, or ``None`` when no archive matches.
        """
        for result in self.get_results():
            # Only results produced for this signature's type are relevant.
            if result.get('type', '') != self.sig_type:
                continue
            infos = result.get('value', {}).get('infos', [])
            file_types = [info.get('type') for info in infos]
            # Both types must be present somewhere in the same archive.
            if {'exe', 'dll'}.issubset(file_types):
                self.mark(type="zip", tag=self.name)
                # NOTE(review): indentation was lost in the source; the
                # return is assumed to follow the first mark - confirm.
                return self.has_marks()
        return None
class DLLHijacking(Signature):
    """Signature for a zip archive that holds both an exe and a dll entry."""

    authors = ["ddvv"]
    sig_type = 'zip'
    name = "dll_hijacking"
    severity = 9
    description = "DLL Hijacking"

    def on_complete(self):
        """Mark the first zip result whose entries include both exe and dll.

        Returns:
            The value of ``self.has_marks()`` once a matching archive has
            been marked, or ``None`` when no archive matches.
        """
        for result in self.get_results():
            # Only results produced for this signature's type are relevant.
            if result.get('type', '') != self.sig_type:
                continue
            infos = result.get('value', {}).get('infos', [])
            file_types = [info.get('type') for info in infos]
            # Both types must be present somewhere in the same archive.
            if {'exe', 'dll'}.issubset(file_types):
                self.mark(type="zip", tag=self.name)
                # NOTE(review): indentation was lost in the source; the
                # return is assumed to follow the first mark - confirm.
                return self.has_marks()
        return None
class RTFExploitDetected(Signature):
    """Signature for an RTF document embedding a suspicious OLE object.

    An entry is flagged when it is an OLE object and its class name is
    exactly ``OLE2Link`` or starts with ``equation`` (case-insensitive).
    """

    authors = ["ddvv"]
    sig_type = 'rtf'
    name = "rtf_exploit_detected"
    severity = 9
    description = "RTF Exploit Detected"

    def on_complete(self):
        """Mark the first RTF result that contains a matching OLE object.

        Returns:
            The value of ``self.has_marks()`` once a matching object has
            been marked, or ``None`` when no object matches.
        """
        for result in self.get_results():
            # Only results produced for this signature's type are relevant.
            if result.get('type', '') != self.sig_type:
                continue
            infos = result.get('value', {}).get('infos', [])
            for info in infos:
                if not info.get('is_ole', False):
                    continue
                # A class_name stored as None is treated as empty so that
                # .lower() cannot fail.
                class_name = info.get('class_name', '') or ''
                if class_name == 'OLE2Link' or class_name.lower().startswith('equation'):
                    self.mark(type="rtf", tag=self.name)
                    # NOTE(review): indentation was lost in the source; the
                    # return is assumed to follow the first mark - confirm.
                    return self.has_marks()
        return None
class
RTFExploitDetected(Signature):
authors
=
[
"ddvv"
]
sig_type
=
'rtf'
name
=
"rtf_exploit_detected"
severity
=
9
description
=
"RTF Exploit Detected"
def
on_complete(
self
):
results
=
self
.get_results()
for
result
in
results:
if
result.get(
'type'
, '')
=
=
self
.sig_type:
infos
=
result.get(
'value'
, {}).get(
'infos'
, [])
for
info
in
infos:
if
info.get(
'is_ole'
,
False
):
class_name
=
info.get(
'class_name'
, '')
if
class_name
=
=
'OLE2Link'
or
class_name.lower().startswith(
'equation'
):
[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课