本文记录用python机器学习实现对基于TCP协议的DDOS攻击的流量监测。基本原理是假设将流量包分组,每100个流量包为一组,用一些反应每组流量包整体特征的数据作为依据,判断抓取这组流量包时主机是否遭遇了DDOS攻击。以下是学习的详细记录。


一、Wireshark抓包工具使用以及数据包分析

wireshark可以分析导入的日志记录,也可以实时监控本地接口。其基本使用方法有数据包筛选、数据包搜索、数据包分析等。由于此时使用的是WIFI,故选用WLAN接口。
在这里插入图片描述
在这里插入图片描述

1.数据包筛选

在显示过滤器中输入以下规则的字符串可以进行数据筛选。
①目的ip筛选:ip.dst = = ip地址
②mac地址筛选:
目标mac地址筛选: eth.dst = =A0:00:00:04:C5:84 ;
mac地址筛选: eth.addr= =A0:00:00:04:C5:84
③端口筛选:
筛选tcp协议的目标端口为80 的流量包:tcp.dstport = = 80
筛选tcp协议的源端口为80 的流量包:tcp.srcport = = 80
筛选udp协议的源端口为80 的流量包:udp.srcport = = 80
④协议筛选:
筛选协议为tcp的流量包:tcp
筛选协议为udp的流量包:udp
筛选协议为arp/icmp/http/ftp/dns/ip的流量包:arp/icmp/http/ftp/dns/ip
⑤包长度筛选:
筛选长度为20的udp流量包:udp.length = =20
筛选长度大于20的tcp流量包:tcp.len >=20
筛选长度为20的IP流量包:ip.len = =20
筛选长度为20的整个流量包:frame.len = =20
⑥http请求筛选:
筛选HTTP请求方法为GET的流量包:http.request.method==“GET”
筛选HTTP请求方法为POST的流量包:http.request.method==“POST”
筛选URL为/img/logo-edu.gif的流量包:http.request.uri==“/img/logo-edu.gif”
筛选HTTP内容为FLAG的流量包:http contains “FLAG”

2.数据包搜索

在wireshark界面按“Ctrl+F”,可以进行关键字搜索。搜索栏的左边下拉,有分组列表、分组详情、分组字节流三个选项,分别对应wireshark界面的三个部分,搜索时选择不同的选项以指定搜索区域。
在这里插入图片描述

3.数据包分析

在分组详情一栏中我们可以查看数据包的每一个字段,一般从上至下每行分别为物理层、数据链路层、IP层、运输层、应用层协议的字段。
在这里插入图片描述
点开每层协议可以看到数据包中的具体字段,以下面的tcp包为例:
在这里插入图片描述

二、使用python库进行流量特征提取

1.下载scapy库

打开cmd输入命令pip install scapy即可。
在这里插入图片描述

由于代码中需要用到datetime库读取抓取时间,故用同样方法下载datetime库
在这里插入图片描述

2.scapy库的使用

【注意:scapy模块必须使用 from scapy.all import * 才能正确调用。】
①读出文件流量包中的全部数据:

#引入库
from scapy.all import rdpcap
#调用库函数读取数据
packets = rdpcap('Normal flow package.pcapng')
#数据输出(.mysummary让数据易读)
for data in packets:
    print(data.mysummary)

因为数据过多运行结果不再全部展示,此处截取前三条,可以发现:每条记录都是由数据链路层协议(Ether)、网络层协议(IP)、运输层协议(如UDP)、网络层数据组成。
在这里插入图片描述
②读pcap中的某个包。当scapy读入pcap文件时,实则是读入一个列表。因为pcap文件中包含了很多个数据包,所以读进来的packets代表所有pcap包中包含的数据,而packets[i]表示在pcap中的第i条数据。
③读每条数据包的具体格式,可通过show()函数进行结构的展示。
④提取每条数据包中具体网络属性的值,利用如下代码即可访问。

pkts[ i ] [ 对应的协议].属性名称

例如以下代码:

#引入库
from scapy.all import rdpcap
#调用库函数读取数据
a = rdpcap('Normal flow package.pcapng')
#显示第一条记录的数据包结构 
a[0].show()
#输出第一条记录的ip层arp协议的hwtype字段
print("a[0]['ARP'].hwtype=",a[0]['ARP'].hwtype)

运行结果如下:
在这里插入图片描述

3.csv库的使用(数据写入.csv文件)

①CSV即逗号分隔值(Comma-Separated Values),有时也称为字符分隔值,因为分隔字符也可以不是逗号,其文件以纯文本形式存储表格数据(数字和文本)。纯文本意味着该文件是一个字符序列,不含必须像二进制数字那样被解读的数据。
②CSV文件由任意数目的记录组成,记录间以某种换行符分隔;每条记录由字段组成,字段间的分隔符是其它字符或字符串,最常见的是逗号或制表符。通常,所有记录都有完全相同的字段序列。通常都是纯文本文件。建议使用WORDPAD或是记事本来开启,再则先另存新档后用EXCEL开启,也是方法之一。
③CSV文件的写入和读取均可用字典或列表的形式,具体要看数据在python文件中的保存形式,以下例子中假设data1为列表的列表,data2为字典的列表。
(1)列表写入:

#表头(列表)
head1 = ["姓名","年龄","分数"]
#数据(列表的列表,列表的一个元素是一行)
data1 =[["李四",23,90],
       ["刘二麻子",13,20]]
#创建写入器对象
with open("text.csv","w",encoding="utf-8") as f:
	#调用csv库
	writer = csv.writer(f)
	#写入标题,写入单行用writerow()
	writer.writerow(head1)
	#写入数据,写入多行用writerows()
	writer.writerows(data1)

(2)字典写入:

#表头和数据
head2 = ["姓名", "年龄", "分数"]
data2 =[{"姓名":"李四","年龄":23,"分数":90},
        {"姓名":"王五","年龄":19,"分数":100},
        {"姓名":"刘二麻子","年龄":13,"分数":20}]
#创建写入器对象
with open("text2.csv","w",encoding="utf-8") as f:
    writer2 = csv.DictWriter(f,head2)
	#上一步并不会写入表头,这里才写入
	writer2.writeheader()
    #写入多行数据
    writer2.writerows(data2)
    #数据存储在单个字典中,可以写入一行数据
    writer2.writerow({"姓名":"周九龄","年龄":9,"分数":10})

(3)读出列表:

with open("text.csv","r",encoding="utf-8") as f:
    reader = csv.reader(f)
	print(reader)#<_csv.reader object at>
	#可依次next(reader)来获取
	for i in reader:#I为list,
		print(i)#每一行的list

(4)读出字典

with open("text2.csv","r",encoding="utf-8") as f:
	reader = csv.DictReader(f)
	print(reader)#
	#可next(reader)来获取或者遍历
	for i in reader:
		print(type(i))# 字典类型
		print(i["姓名"])

4.流量特征提取

(一)分析
假设将流量包分组,每100个流量包为一组,用一些反应每组流量包整体特征的数据作为依据,判断抓取这组流量包时主机是否遭遇了DDOS攻击。为了实现对DDOS攻击的检测,选取以下四个特征对每组流量包进行特征提取。

①每组数据包长度的均值;
②每组数据包长度的标准差;
⑤每组中SYN数据包标志数目;
原理:当使用伪造IP地址的DDoS攻击发生时,抓包文件中每个源地址对应的数据包数目较小,数据包字节数几乎一致。因此,当每组数据包长度的均值越小且标准差越小时,越容易被判定为主机遭受了DDOS攻击。

③每组数据包TTL的标准差;
原理:当使用随机源进行DDoS攻击时,虽然使用了伪造源地址进行攻击,但攻击者无法伪造攻击主机与目标主机之间的位置关系,有时候所有的攻击数据包使用相同的TTL值。因此,每组数据包TTL的标准差越小时,越容易被判定为主机遭受了DDOS攻击。

④每组中ACK数据包标志数目;
⑤每组中SYN数据包标志数目;
原理:根据建立TCP连接的三次握手,一种常见的DDOS攻击模式是向目标主机发送大量的SYN包,而不响应第三次的ACK报文,因此目标主机会接收到大量的SYN包而很少的ACK包。因此,每组中ACK数据包标志数目/SYN数据包标志数目越小时,越容易被判定为主机遭受了DDOS攻击。

(二)提取
综上所述,先对一组中的每个流量记录进行特征提取,然后再进行特征处理。
以下代码用于提取同目录下的名为"Normal flow package.pcap"的文件,形成每个具有100条记录的csv文件。其中的流量特征有报文长度、ttl、时间戳、ack、syn,同时还有IP使用标识和TCP使用标识用来提示数据包结构。


#!/usr/bin/env python
# coding: utf-8

'''
Created on 2021年4月2日

@author: OrangeNut's MilkCover
'''

#导入库
import csv
from scapy.all import *

#读数据包
packets = rdpcap("be attacked.pcapng")

#表格头(IP使用标识、TCP使用标识、报文长度、ttl、时间戳、ack、syn)
headers=['IP','TCP','length','ttl','timestamp','ACK','SYN']


#列表的列表记录每行数据
rows=[]

#每条记录的编号
j=1

#遍历每条记录
for data in packets:
    
    #如果一个包用到IP协议,IP使用标识置1,输出报文长度和生存时间,否则输出 '***'
    if 'IP' in data:
        a1=1
        a3=data['IP'].len
        a4=data['IP'].ttl
    else:
        a1=0
        a3='***'
        a4='***'
        
    #如果一个包用到TCP协议,TCP使用标识置1,查看flags字段
    if 'TCP' in data:
        a2=1
        #若flags字段中有ACK,ACK置1,否则置0
        if 'A' in data['TCP'].flags:
            a6=1
        else:
            a6=0
        #若flags字段中有SYN,SYN置1,否则置0
        if 'S' in data['TCP'].flags:
            a7=1
        else:
            a7=0
    #否则输出 '***'
    else:
        a2=0
        a6='***'
        a7='***'
        
    #时间戳还待定hhh
    a5='***'
    
    #数据写入列表,列表中加一条记录
    rows.append([a1,a2,a3,a4,a5,a6,a7])
    j+=1

    
    #每100条写入一个文件
    if j%100==1:
        #形成文件名编号
        j_str=str(j//100)
        
        #数据写入文件
        with open(j_str+'.csv','w',newline ='') as f:
            fcsv= csv.writer(f)
            fcsv.writerow(headers)
            fcsv.writerows(rows)
            
        #清空rows列表
        rows.clear

5.数据处理

数据处理代码是对特征提取代码的一个改进,可以直接读取一个数据包数目较大的pcap文件,并按顺序将其中的每100条记录的数据包长度的均值、数据包长度的标准差、TTL的标准差、ACK标志数目、SYN标志数目形成一条数据(及特征向量),最后存入csv文件中。也就是说,具有10000个数据包的pcap文件可以形成一个具有100条数据的csv文件。为这些数据加上标签后,即可用来训练分类器模型。

#!/usr/bin/env python
# coding: utf-8

'''
Created on 2021年4月2日

@author: OrangeNut's MilkCover
'''

#导入库
import csv
from scapy.all import *
import numpy as np

#读数据包
packets = rdpcap("Normal flow package.pcap")

#表格头(IP使用标识、TCP使用标识、报文长度、ttl、时间戳、ack、syn)
headers=['average_length','standard_deviation_length','standard_deviation_ttl','average_time_difference','ACK','SYN']


#列表的列表记录每行数据(长度、ttl、时间、ack、syn)
rows=[]

#每条记录的编号
j=1

#初始化每一百个包ack和syn数目
a4=0
a5=0

#记录100条记录均值或平均差的数组
every_100_raws=[]

#遍历每条记录
for data in packets:
    
    #如果一个包用到IP协议和TCP协议,将其记录
    if 'IP' in data and 'TCP' in data :
        a1=data['IP'].len
        a2=data['IP'].ttl
        a3=data.time
        #若flags字段中有ACK,ack加1
        if 'A' in data['TCP'].flags:
            a4+=1
        #若flags字段中有SYN,syn加1
        if 'S' in data['TCP'].flags:
            a5+=1
 
        
        #数据写入列表,列表中加一条记录,便于以后计算均值或标准差
        rows.append([a1,a2,a3])
        j+=1

        #每100条计算一下
        if j%100==1:
            #形成数组用np库
            array= np.array(rows,dtype='float32')
            #每列平均值
            average=np.mean(array,0)
            #每列标准差
            standard_deviation=np.std(array, axis=0)
            

            #b0~b5是每100条记录的五个综合计算值
            b0=average[0]
            b1=standard_deviation[0]
            b2=standard_deviation[1]
            for i in range(99):
                temp=0
                temp+=rows[i+1][2]-rows[i][2]
                b3=temp/99
            b4=a4
            b5=a5   
            
            #将其形成列表写成every_100_raws列表的一个元素
            every_100_raws.append([b0,b1,b2,b3,b4,b5])

            #清空rows列表和a4,a5,以便读取下100条数据
            rows.clear
            a4=0
            a5=0


#数据写入文件
with open('every_100_record.csv','w',newline ='') as f:
    fcsv= csv.writer(f)
    fcsv.writerow(headers)
    fcsv.writerows(every_100_raws)
   

三、机器学习建立流量监测器模型

1. sklearn决策树参数详解

我们都知道,一个模型中很重要的一步是调参。在sklearn中,模型的参数是通过方法参数来决定的,以下给出sklearn中,决策树的参数:


DecisionTreeClassifier(
                 criterion="gini",//分裂节点时评价准则
                 splitter="best",//分裂节点时的策略
                 max_depth=None,//树的最大深度
                 min_samples_split=2,//分裂一个内部节点需要的最少样本数。
                 min_samples_leaf=1,//指定每个叶子节点需要的最少样本数。
                 min_weight_fraction_leaf=0.,//指定叶子节点中样本的最小权重。
                 max_features=None,//搜寻最佳划分的时候考虑的特征数量。
                 random_state=None,//叶子节点的最大数量。
                 max_leaf_nodes=None,
                 min_impurity_decrease=0.,
                 min_impurity_split=None,
                 class_weight=None,
                 presort=False
                 )


参数含义:

1.criterion:string, optional (default="gini")
            (1).criterion='gini',分裂节点时评价准则是Gini指数。
            (2).criterion='entropy',分裂节点时的评价指标是信息增益。
2.max_depth:int or None, optional (default=None)。指定树的最大深度。
            如果为None,表示树的深度不限。直到所有的叶子节点都是纯净的,即叶   子节点中所有的样本点都属于同一个类别。或者每个叶子节点包含的样本数小于min_samples_split。
3.splitter:string, optional (default="best")。指定分裂节点时的策略。
           (1).splitter='best',表示选择最优的分裂策略。
           (2).splitter='random',表示选择最好的随机切分策略。
4.min_samples_split:int, float, optional (default=2)。表示分裂一个内部节点需要的最少样本数。
           (1).如果为整数,则min_samples_split就是最少样本数。
           (2).如果为浮点数(01之间),则每次分裂最少样本数为ceil(min_samples_split * n_samples)
5.min_samples_leaf: int, float, optional (default=1)。指定每个叶子节点需要的最少样本数。
           (1).如果为整数,则min_samples_split就是最少样本数。
           (2).如果为浮点数(01之间),则每个叶子节点最少样本数为ceil(min_samples_leaf * n_samples)
6.min_weight_fraction_leaf:float, optional (default=0.)
           指定叶子节点中样本的最小权重。
7.max_features:int, float, string or None, optional (default=None).
           搜寻最佳划分的时候考虑的特征数量。
           (1).如果为整数,每次分裂只考虑max_features个特征。
           (2).如果为浮点数(01之间),每次切分只考虑int(max_features * n_features)个特征。
           (3).如果为'auto'或者'sqrt',则每次切分只考虑sqrt(n_features)个特征
           (4).如果为'log2',则每次切分只考虑log2(n_features)个特征。
           (5).如果为None,则每次切分考虑n_features个特征。
           (6).如果已经考虑了max_features个特征,但还是没有找到一个有效的切分,那么还会继续寻找
           下一个特征,直到找到一个有效的切分为止。
8.random_state:int, RandomState instance or None, optional (default=None)
           (1).如果为整数,则它指定了随机数生成器的种子。
           (2).如果为RandomState实例,则指定了随机数生成器。
           (3).如果为None,则使用默认的随机数生成器。
9.max_leaf_nodes: int or None, optional (default=None)。指定了叶子节点的最大数量。
           (1).如果为None,叶子节点数量不限。
           (2).如果为整数,则max_depth被忽略。
10.min_impurity_decrease:float, optional (default=0.)
         如果节点的分裂导致不纯度的减少(分裂后样本比分裂前更加纯净)大于或等于min_impurity_decrease,则分裂该节点。
         加权不纯度的减少量计算公式为:
         min_impurity_decrease=N_t / N * (impurity - N_t_R / N_t * right_impurity
                            - N_t_L / N_t * left_impurity)
         其中N是样本的总数,N_t是当前节点的样本数,N_t_L是分裂后左子节点的样本数,
         N_t_R是分裂后右子节点的样本数。impurity指当前节点的基尼指数,right_impurity指
         分裂后右子节点的基尼指数。left_impurity指分裂后左子节点的基尼指数。
11.min_impurity_split:float
         树生长过程中早停止的阈值。如果当前节点的不纯度高于阈值,节点将分裂,否则它是叶子节点。
         这个参数已经被弃用。用min_impurity_decrease代替了min_impurity_split。
12.class_weight:dict, list of dicts, "balanced" or None, default=None
         类别权重的形式为{class_label: weight}
         (1).如果没有给出每个类别的权重,则每个类别的权重都为1(2).如果class_weight='balanced',则分类的权重与样本中每个类别出现的频率成反比。
         计算公式为:n_samples / (n_classes * np.bincount(y))
         (3).如果sample_weight提供了样本权重(由fit方法提供),则这些权重都会乘以sample_weight。
13.presort:bool, optional (default=False)
        指定是否需要提前排序数据从而加速训练中寻找最优切分的过程。设置为True时,对于大数据集
        会减慢总体的训练过程;但是对于一个小数据集或者设定了最大深度的情况下,会加速训练过程。


2.读入数据并训练模型

#!/usr/bin/env python
# coding: utf-8

'''
Created on 2021年4月4日

@author: OrangeNut's MilkCover

功能:训练决策树模型,并输出测试的拟合度和分类精度
      
'''


#导入库
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn import tree
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
 
#pandas 读取 csv 文件,header = None 表示不将首行作为列

x = pd.read_csv('every_100_record.csv', usecols=[0,1,2,3,4,5],skiprows=[0],header=None)  
y= pd.read_csv('every_100_record.csv', usecols=[6], skiprows=[0],header=None)     
#print(x,y)


#划分
x_train,x_test,y_train,y_test = train_test_split(x,y)
#print(x_train,x_test,y_train,y_test)

#训练
clf = tree.DecisionTreeClassifier(criterion='gini')
clf.fit(x_train,y_train)

#测试
#①sk-learn模型的score方法
train_score=clf.score(x_train,y_train)#train分数
test_score=clf.score(x_test,y_test)#test分数
print('train score:{0};test score:{1}'.format(train_score,test_score))#输出

#②预测精度值
y_pred=clf.predict(x_test)
score = accuracy_score(y_test,y_pred)
#print(y_test,y_pred)
print("acuraccy:",score)


'''
#保存成 dot 文件,后面可以用 dot out.dot -T pdf -o out.pdf 转换成图片
with open("out.dot", 'w') as f :
    f = tree.export_graphviz(clf, out_file = f)
'''
    

利用以上模型可以实现对流量包的二分类,即对基于TCP协议的DDOS攻击的流量监测。

更多推荐