-
-
[原创]基于深度学习的恶意软件分类器(三)
-
2022-4-7 22:40 11056
-
一.前言
1.实验内容
以Windows PE结构中的一些信息作为输入数据来构建恶意软件分类器,所用数据集只对相关软件是否为恶意软件进行标注,并没有标注恶意软件的具体类别。因此,本次实验是一个二分类任务,因为没找到合适的论文,所以就自己构建了一个简单的神经网络用来构建分类器。
2.实验环境
Python版本:3.6.13
Pytorch版本:1.8.1
CUDA版本:11.4
二.数据集
公开数据集EMBER为网络安全公司Endgame于2018年发布的,训练集中共有8W条数据,其中3W为恶意软件样本,3W为良性文件样本,2W未标记样本,测试集中共有2W条数据,其中1W恶意软件样本,1W良性文件样本,所有数据均通过提取相应软件的PE信息来获得的。数据集的下载地址为:https://ember.elastic.co/ember_dataset_2018_2.tar.bz2,作者也在github上对该数据集进行了相关说明:https://github.com/endgameinc/ember。
每一条数据都是JSON数据格式,简略看如下:
{"sha256": "0abb4fda7d5b13801d63bee53e5e256be43e141faa077a6d149874242c3f02c2", "md5": "63956d6417f8f43357d9a8e79e52257e", "appeared": "2006-12", "label": 0, "avclass": "", "general": {"size": 3101705, "vsize": 380928, "has_debug": 0, "exports": 0, "imports": 156, "has_relocations": 0, "has_resources": 1, "has_signature": 0, "has_tls": 0, "symbols": 0}, "header": {"coff": {"timestamp": 1124149349, "machine": "I386", "characteristics": ["CHARA_32BIT_MACHINE", "RELOCS_STRIPPED", "EXECUTABLE_IMAGE", "LINE_NUMS_STRIPPED", "LOCAL_SYMS_STRIPPED"]}, "optional": {"subsystem": "WINDOWS_GUI", "dll_characteristics": [], "magic": "PE32", "major_image_version": 0, "minor_image_version": 0, "major_linker_version": 7, "minor_linker_version": 10, "major_operating_system_version": 4, "minor_operating_system_version": 0, "major_subsystem_version": 4, "minor_subsystem_version": 0, "sizeof_code": 26624, "sizeof_headers": 1024, "sizeof_heap_commit": 4096}}, "section": {"entry": ".text", "sections": [{"name": ".text", "size": 26624, "entropy": 6.532239617101003, "vsize": 26134, "props": ["CNT_CODE", "MEM_EXECUTE", "MEM_READ"]}, {"name": ".rdata", "size": 6656, "entropy": 5.433081641309689, "vsize": 6216, "props": ["CNT_INITIALIZED_DATA", "MEM_READ"]}, {"name": ".data", "size": 512, "entropy": 1.7424160994148217, "vsize": 172468, "props": ["CNT_INITIALIZED_DATA", "MEM_READ", "MEM_WRITE"]}, {"name": ".rsro", "size": 0, "entropy": -0.0, "vsize": 135168, "props": ["CNT_UNINITIALIZED_DATA", "MEM_READ", "MEM_WRITE"]}, {"name": ".rsrc", "size": 27648, "entropy": 5.020929764194735, "vsize": 28672, "props": ["CNT_INITIALIZED_DATA", "MEM_READ"]}]}, ....}
JSON格式中的label代表了样本的类别,1为恶意样本,0为正常样本,-1为未标记样本。每一条数据都包含了大量信息,包括导入表,导出表,节区中的二进制序列等,不过本实验只用到以下12个PE头中的信息用来作为输入数据:
optional->sizeof_code
optional->sizeof_headers
optional->sizeof_heap_commit
general->size
general->vsize
general->has_debug
general->has_relocations
general->has_resources
general->has_signature
general->has_tls
general->symbols
节区的数目
最新的数据里面训练集被拆成了6个文件来分别存放,按顺序读取文件以后,就需要按照JSON文件格式来解析获取输入数据,以下是参考代码,这里舍弃了未标记数据,且每条数据都对255进行求余来进行归一化。
import torch from torch.utils.data import Dataset import pandas as pd import numpy as np class PEDataSet(Dataset): def __init__(self, data_path, is_train): if is_train: # 判断是否是训练数据 for i in range(len(data_path)): if i == 0: self.x, self.y = read_json(data_path[i]) else: x, y = read_json(data_path[i]) self.x = np.vstack((self.x, x)) self.y = np.vstack((self.y, y)) else: self.x, self.y = read_json(data_path) self.x = torch.tensor(self.x) self.y = torch.tensor(self.y) self.len = self.x.size()[0] def __getitem__(self, index): return self.x[index], self.y[index] def __len__(self): return self.len def read_json(json_path): df = pd.read_json(json_path, lines=True) label = df["label"] header = df["header"] general = df["general"] section = df["section"] x = [] y = [] for i in range(label.size): # 未标记样本,舍弃掉 if label[i] == -1: continue y.append(label[i]) tmp_x = [] tmp_x.append(header[i]["optional"]["sizeof_code"] % 255) tmp_x.append(header[i]["optional"]["sizeof_headers"] % 255) tmp_x.append(header[i]["optional"]["sizeof_heap_commit"] % 255) tmp_x.append(general[i]["size"] % 255) tmp_x.append(general[i]["vsize"] % 255) tmp_x.append(general[i]["has_debug"] % 255) tmp_x.append(general[i]["has_relocations"] % 255) tmp_x.append(general[i]["has_resources"] % 255) tmp_x.append(general[i]["has_signature"] % 255) tmp_x.append(general[i]["has_tls"] % 255) tmp_x.append(general[i]["symbols"] % 255) tmp_x.append(len(section[i]["sections"]) % 255) x.append(tmp_x) y = np.array(y) y = y.reshape(y.shape[0], 1) return np.array(x), y
三.模型
以下是本文实验用的模型,中间的隐藏层神经元数量分别是512,1024,512。
import torch.nn as nn class PEModel(nn.Module): def __init__(self): super(PEModel, self).__init__() self.classifier = nn.Sequential( nn.Linear(12, 512), nn.ReLU(True), nn.Dropout(), nn.Linear(512, 1024), nn.ReLU(True), nn.Dropout(), nn.Linear(1024, 512), nn.ReLU(True), nn.Dropout(), nn.Linear(512, 2) ) def forward(self, inputs): output = self.classifier(inputs) return output
四.参数设置
损失函数为cross_entropy,优化器为SGD,相关参数设定如下:
batch_size: 8
epoch: 25
lr: 0.001
decay: 0.0005
momentum: 0.9
参数的代码如下:
import os class Configure: base_path = "E:\\科研\\论文\\数据集\\ember\\ember_dataset_2018_2\\ember2018" train_path = [] for i in range(6): train_path.append(os.path.join(base_path, "train_features_" + str(i) + ".jsonl")) test_path = os.path.join(base_path, "test_features.jsonl") batch_size = 8 epochs = 25 lr = 0.001 decay = 0.0005 momentum = 0.9
分类器训练与测试的代码如下:
from Configure import Configure from PEModel import PEModel from PEDataSet import PEDataSet import os import torch from torch.utils.data import DataLoader def train(epoch): for batch_idx, data in enumerate(train_loader, 0): optimizer.zero_grad() # 梯度清0 inputs, labels = data inputs, labels = inputs.to(device), labels.to(device) y_pred = modeler(inputs) # 前向传播 loss = torch.nn.functional.cross_entropy(y_pred, labels) # 计算损失 if batch_idx % 100 == 99: print("epoch=%d, loss=%f" % (epoch, loss.item())) loss.backward() # 反向传播 optimizer.step() # 梯度更新 def test(): correct = 0 total = 0 with torch.no_grad(): for data in test_loader: inputs, target = data inputs, target = inputs.to(device), target.to(device) outputs = modeler(inputs) _, predicted = torch.max(outputs.data, dim=1) total += target.size(0) correct += (predicted == target).sum() acc = 1.0 * 100 * correct / total print('测试集准确率: %f%% [%d/%d]' % (acc, correct, total)) if __name__ == '__main__': os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" os.environ["CUDA_VISIBLE_DEVICES"] = "0" conf = Configure() train_dataset = PEDataSet(conf.train_path, True) train_loader = DataLoader(train_dataset, batch_size=conf.batch_size, shuffle=True, num_workers=2) test_dataset = PEDataSet(conf.test_path, False) test_loader = DataLoader(test_dataset, batch_size=conf.batch_size, shuffle=False, num_workers=2) modeler = PEModel() device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") modeler.to(device) optimizer = torch.optim.SGD(modeler.parameters(), lr=conf.lr, weight_decay=conf.decay, momentum=conf.momentum) print("========开始训练模型========") for i in range(conf.epochs): train(i) print("========模型训练完成========") print("========开始测试模型========") test() print("========模型测试完成========")
准备跑模型的时候发现,服务器的磁盘被同学们用完了,本机跑的又太慢,感兴趣的就自己跑一下看看效果吧。
[CTF入门培训]顶尖高校博士及硕士团队亲授《30小时教你玩转CTF》,视频+靶场+题目!助力进入CTF世界