import time

from boofuzz import *
class EmptyConnection(TCPSocketConnection):
    """Connection stub that publishes fuzz payloads instead of using a socket.

    ``open`` and ``close`` are no-ops and ``recv`` always yields an empty
    byte string, so no TCP traffic is produced by this class itself. Every
    payload handed to ``send`` is printed and forwarded to
    ``client.publish`` on the fixed topic ``TOPIC``.

    NOTE(review): ``client`` is not defined in the visible source. It must
    exist at module level (presumably a connected MQTT client exposing
    ``publish(topic, payload)``) before ``send`` is called -- confirm.
    """

    # Topic every fuzzed payload is published to.
    TOPIC = "v1/event/computer/resource/000c29963b29"

    def __init__(self, host, port):
        """Pass ``host`` and ``port`` through to the base connection."""
        super().__init__(host, port)

    def open(self):
        """Do nothing; this stub never opens a socket."""

    def close(self):
        """Do nothing; there is no socket to close."""

    def recv(self, max_bytes=1024):
        """Return an empty byte string; no response is ever read.

        The parameter is named ``max_bytes`` so that callers passing it by
        keyword are accepted; its value is ignored.
        """
        return b""

    def send(self, data):
        """Print ``data``, publish it on ``TOPIC`` and report its length.

        Args:
            data: Rendered fuzz payload to publish.

        Returns:
            The number of bytes in ``data``, reported as the amount sent.
        """
        print(f"Sending fuzz data: {data}")
        client.publish(self.TOPIC, data)
        return len(data)
# Wire the stub connection into a boofuzz session. The host/port values are
# placeholders: EmptyConnection never opens a socket.
empty_connection = EmptyConnection("localhost", 0)
target = Target(connection=empty_connection)
session = Session(target=target)

# The request is defined, connected and fuzzed exactly once. A request name
# can only be registered once with s_initialize, so wrapping the definition
# in a retry loop could only end in an "already exists" error on the second
# pass.
try:
    s_initialize("MQTT Fuzz")
    with s_block('{"body":'):
        # The pieces below concatenate into a JSON document of the form
        # {"body":{"cpu":..,"mem":..,"network":{..},"state":"..",
        #  "volumes":{..}},"seq":<ms timestamp>}
        s_static('{"body":')
        s_static('{"cpu":')
        s_static('0.004999999888241291')
        s_static(',"mem":')
        s_string('0.244140625', fuzzable=True)
        s_static(',"network":{"receive":')
        s_string('14.853154182434082', fuzzable=True)
        s_static(',"send":')
        s_string('9.8922290802001953', fuzzable=True)
        s_static('},"state":"')
        s_string('normal', fuzzable=True)
        # NOTE(review): the next and the third primitive are structural
        # delimiters declared with s_string, so they are mutated as well.
        # Kept as in the original -- confirm whether s_static was intended.
        s_string('","volumes":{"/dev/sda2":')
        s_string('7')
        s_string('}},"seq":')
        # Millisecond timestamp, evaluated once when the request is defined;
        # every generated message carries this same value.
        s_static(str(int(round(time.time() * 1000))))
        s_static('}')

    session.connect(s_get("MQTT Fuzz"))
    session.fuzz()
except Exception as e:
    # Top-level boundary of the script: report the failure and stop.
    print(f"Error during fuzzing: {e}")