from
burp
import
IBurpExtender
from
burp
import
IHttpListener
from
burp
import
IRequestInfo
from
burp
import
IParameter
from
burp
import
IBurpExtenderCallbacks
from
java.io
import
PrintWriter
import
base64
from
hashlib
import
md5
import
time
HOST_FROM
=
"yy.xx.com"
class
BurpExtender(IBurpExtender, IHttpListener):
def
registerExtenderCallbacks(
self
, callbacks):
self
._helpers
=
callbacks.getHelpers()
self
._stdout
=
PrintWriter(callbacks.getStdout(),
True
)
callbacks.setExtensionName(
"zz resign plugin"
)
callbacks.registerHttpListener(
self
)
def
processHttpMessage(
self
, toolFlag, messageIsRequest, messageInfo):
if
not
messageIsRequest:
return
httpService
=
messageInfo.getHttpService()
if
(HOST_FROM !
=
httpService.getHost()):
return
if
(IBurpExtenderCallbacks.TOOL_REPEATER !
=
toolFlag)
or
(IBurpExtenderCallbacks.TOOL_INTRUDER !
=
toolFlag):
return
requestInfo
=
self
._helpers.analyzeRequest(messageInfo)
path
=
requestInfo.getUrl().getPath()
self
._stdout.println(
"path=%s"
%
path)
if
not
path.startswith(
"/xx/"
):
return
self
.resign(messageInfo, check_sign
=
False
)
def
resign(
self
, messageInfo, check_sign
=
False
):
requestInfo
=
self
._helpers.analyzeRequest(messageInfo)
method
=
requestInfo.getMethod()
path
=
requestInfo.getUrl().getPath()
parameters
=
requestInfo.getParameters()
body_offset
=
requestInfo.getBodyOffset()
url_param_dist
=
{}
for
param
in
parameters:
if
IParameter.PARAM_URL
=
=
param.getType():
k
=
param.getName()
v
=
param.getValue()
v
=
self
._helpers.urlDecode(v)
self
._stdout.println(
"%s=%s"
%
(k, v))
request
=
messageInfo.getRequest()
body
=
request[body_offset:]
body
=
self
._helpers.bytesToString(body)
self
._stdout.println(
"body=%s"
%
body)
s
=
'{}-{}'
.
format
(method, path)
old_sign
=
""
new_time_sign
=
None
new_ts
=
str
(
int
(time.time()))
if
not
check_sign:
url_param_dist[
"_ts"
]
=
new_ts
if
"XX"
in
url_param_dist:
s
=
str
(url_param_dist[
"XX"
])
+
time.strftime(
'%Y%m%d'
)
new_time_sign
=
md5(s.encode()).hexdigest()
url_param_dist[
"sign"
]
=
new_time_sign
arguments
=
url_param_dist
keys
=
list
(arguments.keys())
keys.sort()
for
k
in
keys:
if
k
=
=
'_sign'
:
old_sign
=
arguments[k]
continue
val
=
arguments[k]
if
type
(val)
=
=
str
or
type
(val)
=
=
unicode
or
type
(val)
=
=
int
:
s
+
=
'&{}={}'
.
format
(k, val)
elif
type
(val)
=
=
list
:
for
v
in
val:
s
+
=
'&{}={}'
.
format
(k, v)
else
:
self
._stdout.println(
'params unknown type:{}, key:{}'
.
format
(
type
(val), k))
return
s
+
=
'&json={}'
.
format
(body
or
'')
digest
=
md5(s.encode()).hexdigest()
new_sign
=
base64.urlsafe_b64encode(digest)[:
-
2
]
request
=
self
._helpers.urlDecode(request)
if
not
check_sign:
new_request
=
self
._helpers.updateParameter(request,
self
._helpers.buildParameter(
"ts"
, new_ts, IParameter.PARAM_URL))
new_request
=
self
._helpers.updateParameter(new_request,
self
._helpers.buildParameter(
"_sign"
, new_sign, IParameter.PARAM_URL))
if
new_time_sign:
new_request
=
self
._helpers.updateParameter(new_request,
self
._helpers.buildParameter(
"sign"
, new_time_sign, IParameter.PARAM_URL))
messageInfo.setRequest(new_request)
else
:
if
new_sign !
=
old_sign:
self
._stdout.println(
"error! new_sign != old_sign"
)
else
:
self
._stdout.println(
"success! new_sign == old_sign"
)