SDN课设,使用Ryu来控制,啃源码可真的累呀

Github源码在此

项目背景

​ 目前智慧城市越来越发展,但是随着各种网络设备以及传感器的应用,城市的网络负担也在加大。如果数据分析等操作都由城市的数据中心来做,那么一定网络流量不堪重负。雾计算,是一个好思想,然而,某些时候又需要整体进行调控,我们使用SDN的思想来实现。

​ 这是一个交通路口的信号灯及背后处理器的拓扑,我们都知道,可以通过当前路况来智能调整交通信号灯的时长,使得交通更加通畅。我们设想,在车流量不是很大的时候,摄像头10.1.1.100收集车流信息,可以由局域网中的MCU(10.3.3.150)做决策,调节其时长;然而在上下班高峰期,就需要将车流信息上传到数据处理中心(10.2.2.130),数据中心根据全城的交通情况做决策,这样就避免各个路口自己做决策结果冲突了。

​ 但是,一个摄像头显然不会有这么智能的功能,知道什么时候将流发给局域网的MCU还是远程的数据中心。我们就像,设置一个虚拟IP(10.1.1.77)与对应的虚拟MAC(66:66:66:66:66:66),路由器在收到这个目的地址为虚拟IP的包之后,根据Ryu的控制,选择将其发送给近处还是远处进行处理,而MCU和数据中心可以正常向摄像头发信息。这样以来,通过虚拟IP,对于摄像头来说就是透明的了,它只管发,不用管别的。

肝课设过程中的笔记

方法一

虚拟机搭建拓扑
配置成为OVS交换机
某个虚拟机安装ryu作为控制器
使用命令将tyu指定为OVS所用的控制器

相当于将Ubuntu作为ovs的交换机了,好处:更加模拟现实环境,直接用linux系统。可能存在的问题:这个拓扑在清零的时候可能会有问题,比如arp表清除等(不能做到像mininet仿真的时候每次运行拓扑就从零开始。)

Ubuntu安装ovs配置

Ubuntu安装Open vSwitch

ryu RESTAPI

RYU核心源码解读:

RYU入门

RYU官方文档(其中有对模拟arp的介绍,并且可以查看guide1.dox)

RYU:OpenFlow协议源码

方法二

使用mininet搭建整个拓扑,ryu也是直接作为这个拓扑网络的控制器

优点:方便调试、更新,使用wireshark抓包,ping和tcpdump进行流量创建(这一点方法一也可以做到)

缺点:都是在一个虚拟机上进行的,互联互通有可能成为问题

Ryu+mininet+Wireshark

一个mininet+ryu逐步控制的例子

Mininet与真实网络连接

Mininet中host与外网通信

一个抓包例子

正式开始

ryu源码分析之packet类

ryupacket源码

ryu packet文档

结合这个例子学习构造包下发

逐级封装:

e = ethernet.ethernet(dst='ff:ff:ff:ff:ff:ff',
                      src='08:60:6e:7f:74:e7',
                      ethertype=ether.ETH_TYPE_ARP)
a = arp.arp(hwtype=1, proto=0x0800, hlen=6, plen=4, opcode=2,
            src_mac='08:60:6e:7f:74:e7', src_ip='192.0.2.1',
            dst_mac='00:00:00:00:00:00', dst_ip='192.0.2.2')
p = packet.Packet()
p.add_protocol(e)
p.add_protocol(a)

添加action事件下发包

actions = [parser.OFPActionOutput(port=port)]
out =parser.OFPPacketOut(datapath=datapath,buffer_id=ofproto.OFP_NO_BUFFER,in_port=ofproto.OFPP_CONTROLLER,actions=actions,data=data)
datapath.send_msg(out)

Ryu RESTFUL协议远程获取控制面信息

跨网段通信所需

OFPMatch 607行

OFPActinOutput 4682行

OFPInstructionActions 4550行

pkt = packet.Packet(msg.data)

eth = pkt.get_protocols(ethernet.ethernet)[0]
arp_pkt = pkt.get_protocol(arp.arp)

if eth:
    eth_dst = eth.dst
    eth_src = eth.src
if udp_pkt:
    print('***********收到虚拟udp包*********')
    match = parser.OFPMatch(eth_dst='66:66:66:66:66:66')
    print("match:", match)

    #kwargs = dict(eth_dst='66:66:66:66:66:66')
    #match1 = parser.OFPMatch(** kwargs)
    #print("match1:", match1)

    actions = [parser.OFPActionSetField(ipv4_dst='10.1.1.110'),parser.OFPActionSetField(eth_dst=src_mac),parser.OFPActionOutput(in_port)]
    

    # datapath = self._find_dp(int(switch))
    # assert datapath is not None

    inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]

    # idle and hardtimeout set to 0,making the entry permanent
    # reference openflow spec
    mod = datapath.ofproto_parser.OFPFlowMod(
        datapath=datapath,
        match=match,
        idle_timeout=0,
        hard_timeout=0,
        priority=32768,
        instructions=inst
    )
    datapath.send_msg(mod)

RYU官方文档中,给了基于RESTFUL协议的对外通信API,Ryu RESTFUL协议远程获取控制面信息,封装在了ryu.app.ofctl_rest类中,主要提供了以下几种操作:

1.获取ovs交换机信息

2.获取流信息

3.添加流、添加新的组

由于是基于RESTFUL API,因此我们可以利用python的requests模块很轻松的使用,其方法和使用一般的http方法并无不同。

为了获取RYU所管理的网桥信息,首先根据官方文档得url:“http://localhost:8080:stats/switchs”,使用GET方法,得到所有的网桥dpid,存入列表中。

def __init__(self):
    self.URL='http://localhost:8080'
    self.dpid=[]
def getall_sw(self):
    url=self.URL+'/stats/switches'
    data = requests.get(url)
    if data.status_code == 200:
        data_all = data.json()
        print(data_all)           
        for i in data_all:
            self.dpid.append(i)

在遍历这个列表,使用url:“http://localhost:8080:stats/flows”,再加上列表中的dpid,组合成得到这个dpid网桥的具体信息,通过GET方法,将得到的数据转化成json格式,再提取其中的action模式、流过包数量、流过字节数等字段,得到想要的输出信息。

def getall_sw_data(self):
    for i in self.dpid:
        url=self.URL+'/stats/flow/'+str(i)
        data = requests.get(url)
        if data.status_code == 200:
            data_all = data.json()[str(i)]
            action=data_all[0]['actions']
            packet_count=data_all[3]['packet_count']
            priority=data_all[6]['priority']
            byte_count=data_all[8]['byte_count']
            print("交换机{}的action模式为{}\n"\
                  "流过的包为{}\n"\
                  "流过的字节为{}\n"\
                  "优先级为{}\n".format(i,action,packet_count,byte_count,priority))

整体的代码分析

按照如图的拓扑:

55行的代码

self.host_mac_to={}

是为了将每个设备的id和其MAC地址相对应,方便后面我们知道是哪个设备。

89行:

self.arp_table.update({VIP:VMAC})
self.arp_table.update({VIP2:VMAC})

将虚拟地址的IP和MAC提前准备好,这样才能够发arp回复。

当摄像头第一次向虚拟地址发包的时候,因为没有mac地址,会发arp请求(578行),我们会给个虚拟arp回复。

随后,摄像头拿到mac成帧,向外发包,通过439行:

# 发往虚拟ip
if pkt_eth.dst == VMAC:
    flag = 1
    print('!!!!!发往虚拟ip!!!!!')

    tsecond = time.asctime(time.localtime(time.time()))
    second = tsecond.split(' ')[3].split(':')[2]
    print(second, '!!GET TIME!!')

    if pkt_ipv4:
        if second <= '20':  # 前20s发往本地计算单元
            if pkt_ipv4.src == pc1_ip:
                print('前20s,pc1发往pc2')
                dstmac = pc2_mac
                dstip = pc2_ip
                if pkt_ipv4.src == pc4_ip:
                    print('前20s,pc4发往pc5')
                    dstmac = pc5_mac
                    dstip = pc5_ip
                    data = msg.data
                    actions = [parser.OFPActionSetField(ipv4_dst=dstip),
                               parser.OFPActionSetField(eth_dst=dstmac), parser.OFPActionOutput(port=2)]
                    self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

就像前面写的,对包进行拆分、组装,更换目的IP,然后再从对应端口发出去。

为了模拟,我们假装每分钟的20s~40s,为上下班高峰期,此时会将报文转发到远程的服务器。

if second > '20' and second <= '40':  # 中间20s发往远程服务器
    if pkt_ipv4.src == pc1_ip:
        print('中间20s,pc1发往pc3')
        smac = gate1_mac

        if pkt_ipv4.src == pc4_ip:
            print('中间20s,pc4发往pc3')
            smac = gate3_mac

            actions = [parser.OFPActionOutput(port=gate_port)]
            data = msg.data
            self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

            actions = [parser.OFPActionSetField(eth_src=smac),
                       parser.OFPActionSetField(ipv4_dst=pc3_ip),
                       parser.OFPActionSetField(eth_dst=gate2_mac),
                       parser.OFPActionOutput(port=gate_port)]
            data = msg.data
            datapath = self.datapaths[0]
            self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

            actions = [parser.OFPActionSetField(eth_src=gate2_mac),
                       parser.OFPActionSetField(ipv4_dst=pc3_ip),
                       parser.OFPActionSetField(eth_dst=pc3_mac), parser.OFPActionOutput(port=1)]
            data = msg.data
            datapath = self.datapaths[0]
            self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

额外功能:

204行:

# ipv6 广播报文目的Mac开头都是33:33“
if '33:33' in dst_mac[:5]:
    # the controller has not flooded this packet before
    if (src_mac,dst_mac) not in self.flood_history[dpid]:
        # we remember this packet
        self.flood_history[dpid].append((src_mac,dst_mac))
        else:
            # the controller have flooded this packet before,we do nothing and return
            return

防止ipv6的广播风暴。

为了能够检测整个网络的效果,还写了一个flask作为后端的web服务器,可以随时在线上查看网络的负载以及路由器的情况:

import requests
import json
class ryu_restful:

    def __init__(self):
        self.URL='http://localhost:8080'
        self.dpid=[]
    def getall_sw(self):
        url=self.URL+'/stats/switches'
        data = requests.get(url)
        if data.status_code == 200:
            data_all = data.json()
            print(data_all)           
            for i in data_all:
                self.dpid.append(i)


    def getall_sw_data(self):
        for i in self.dpid:
            url=self.URL+'/stats/flow/'+str(i)
            data = requests.get(url)
            if data.status_code == 200:
                data_all = data.json()[str(i)]
                action=data_all[0]['actions']
                packet_count=data_all[3]['packet_count']
                priority=data_all[6]['priority']
                byte_count=data_all[8]['byte_count']


                print("交换机{}的action模式为{}\n"\
                            "流过的包为{}\n"\
                            "流过的字节为{}\n"\
                            "优先级为{}\n".format(i,action,packet_count,byte_count,priority))


    def add_flow(self):
        url=sel.URL+'/stats/flowentry/add'
        data= '{"dpid": 1,"cookie": 1,"cookie_mask": 1,"table_id": 0,"idle_timeout": 30,'\
        ' "hard_timeout": 30, "priority": 11111,"flags": 1,"match":{"in_port":1}, "actions":[{"type":"OUTPUT","port": 2}]}'

        res=requests.post(url=url,data=data)
        print(res.text)


    def get_flow(self):
        
        for i in self.dpid:
            url=self.URL+'/stats/flow'+str(i)
            data=requests.get(url)
            if data.status_code==200:
                data_all=data.json()[str(i)][0]
                match=data_all['match']
                actions=data_all['actions']
                print('流{}的匹配表为{},动作为{}'.format(i,match,actions))
                


if __name__ == "__main__":
    ryu=ryu_restful()
    ryu.getall_sw()
    ryu.getall_sw_data()
    ryu.get_flow()

最后的效果

向虚拟地址发UDP报文:

远程监测网络运行情况:

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐