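I have recently been integrating a cloud traffic-capture plugin, so I took the opportunity to study the source code of OpenStack's Tap-as-a-Service (TaaS). The project is hosted on GitHub: https://github.com/openstack/tap-as-a-service

There are currently three main approaches to capturing traffic in the cloud: policy-based capture, network-element capture, and agent-based capture. See https://www.sdnlab.com/21092.html

TaaS in OpenStack falls into the first category, policy-based capture.

As a Neutron plugin, its core purpose is to provide a unified traffic monitoring and capture solution for VMs and ports that are spread across nodes.

The GitHub repository documents the API and its usage. Below we look at how it is actually implemented in code.

1. API: TaaS exposes a RESTful API and offers its resources to users as Neutron sub-resources. The usage is already described clearly in the API reference: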

https://github.com/openstack/tap-as-a-service/blob/master/API_REFERENCE.rst

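2. Overall architecture

TaaS follows the same architecture as Neutron's ML2 plugin. A capture session is built from two resources: tap_service (the monitoring side, which receives the traffic) and tap_flow (the monitored side, whose traffic is captured).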
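
To make the relationship between the two resources concrete, here is a minimal sketch in plain Python. The classes TapService and TapFlow and the helper flows_for are illustrative names of my own, not TaaS code; the field names (port_id, source_port, tap_service_id, direction) follow the TaaS API reference.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TapService:
    """Monitoring side: the port that receives the mirrored traffic."""
    id: str
    port_id: str


@dataclass
class TapFlow:
    """Monitored side: a source port whose traffic is mirrored."""
    id: str
    tap_service_id: str      # the tap service that receives the copy
    source_port: str
    direction: str = "BOTH"  # IN, OUT or BOTH


def flows_for(service: TapService, flows: List[TapFlow]) -> List[TapFlow]:
    """Return the tap flows attached to the given tap service."""
    return [f for f in flows if f.tap_service_id == service.id]


# Illustrative data only; the ids are made up.
service = TapService(id="ts-1", port_id="monitor-port")
flows = [
    TapFlow(id="tf-1", tap_service_id="ts-1", source_port="vm1-port"),
    TapFlow(id="tf-2", tap_service_id="ts-1", source_port="vm2-port",
            direction="IN"),
    TapFlow(id="tf-3", tap_service_id="ts-other", source_port="vm3-port"),
]
attached = flows_for(service, flows)
print([f.source_port for f in attached])
```

One tap service can therefore collect traffic from several source ports, each described by its own tap flow.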

Let's first look at how the TaaS plugin is implemented.

2.1 Database operations in the TaaS plugin:

The main functions are:

create_tap_service(self, context, tap_service)

delete_tap_service(self, context, id)

create_tap_flow(self, context, tap_flow)

delete_tap_flow(self, context, id)

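In an OpenStack plugin, these functions work directly against the database. I will use create_tap_service() as an example to walk through the flow; it lives in neutron_taas/services/taas/taas_plugin.py. By the time the code below runs, the tap device has already been created.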

    def create_tap_service(self, context, tap_service):
        LOG.debug("create_tap_service() called")

        t_s = tap_service['tap_service']
        tenant_id = t_s['tenant_id']
        port_id = t_s['port_id']

        # Get port details
        port = self._get_port_details(context, port_id)

        # Check if the port is owned by the tenant.
        if port['tenant_id'] != tenant_id:
            raise taas_ex.PortDoesNotBelongToTenant()

        # Extract the host where the port is located
        host = port['binding:host_id']

        if host is not None:
            LOG.debug("Host on which the port is created = %s" % host)
        else:
            LOG.debug("Host could not be found, Port Binding disbaled!")

        # Create tap service in the db model
        with context.session.begin(subtransactions=True):
            ts = super(TaasPlugin, self).create_tap_service(context,
                                                            tap_service)
            driver_context = sd_context.TapServiceContext(self, context, ts)
            self.driver.create_tap_service_precommit(driver_context)

        try:
            self.driver.create_tap_service_postcommit(driver_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.error("Failed to create tap service on driver,"
                          "deleting tap_service %s", ts['id'])
                super(TaasPlugin, self).delete_tap_service(context, ts['id'])

        return ts

Anyone who has done OpenStack development will recognize that tap_service is simply a dictionary carrying the body of the API POST request, for example:

{
    "tap_service": {
        "description": "Test_Tap",
        "name": "Test",
        "port_id": "c9beb5a1-21f5-4225-9eaf-02ddccdd50a9",
        "tenant_id": "97e1586d580745d7b311406697aaf097"
    }
}

To make the analysis easier, let's go through the code piece by piece:

        LOG.debug("create_tap_service() called")

        t_s = tap_service['tap_service']
        tenant_id = t_s['tenant_id']
        port_id = t_s['port_id']

        # Get port details
        port = self._get_port_details(context, port_id)

        # Check if the port is owned by the tenant.
        if port['tenant_id'] != tenant_id:
            raise taas_ex.PortDoesNotBelongToTenant()

        # Extract the host where the port is located
        host = port['binding:host_id']

        if host is not None:
            LOG.debug("Host on which the port is created = %s" % host)
        else:
            LOG.debug("Host could not be found, Port Binding disbaled!")

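What this code does: it collects the basic information, checks that the port belongs to the requesting tenant, and uses the port binding to obtain from the Neutron plugin the host on which the port is located.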
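
The same check can be tried out in isolation with plain dictionaries. This is a minimal sketch: validate_port and the local exception class are hypothetical stand-ins for the plugin method and taas_ex.PortDoesNotBelongToTenant, and the host name compute1 is made up.

```python
class PortDoesNotBelongToTenant(Exception):
    """Stand-in for taas_ex.PortDoesNotBelongToTenant."""


def validate_port(port, tenant_id):
    """Check ownership and return the host the port is bound to, or None."""
    if port["tenant_id"] != tenant_id:
        raise PortDoesNotBelongToTenant()
    # .get() tolerates a port dict without binding information, whereas
    # port["binding:host_id"] would raise KeyError in that case.
    return port.get("binding:host_id")


port = {
    "id": "c9beb5a1-21f5-4225-9eaf-02ddccdd50a9",
    "tenant_id": "97e1586d580745d7b311406697aaf097",
    "binding:host_id": "compute1",  # hypothetical host name
}
host = validate_port(port, "97e1586d580745d7b311406697aaf097")
print(host)
```

Note that in the original code a missing host is only logged at debug level; the request is not rejected at this point.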

        with context.session.begin(subtransactions=True):
            ts = super(TaasPlugin, self).create_tap_service(context,
                                                            tap_service)#Position 1
            driver_context = sd_context.TapServiceContext(self, context, ts) #Position 2
            self.driver.create_tap_service_precommit(driver_context)#Position 3

        try:
            self.driver.create_tap_service_postcommit(driver_context)# Position 4
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.error("Failed to create tap service on driver,"
                          "deleting tap_service %s", ts['id'])
                super(TaasPlugin, self).delete_tap_service(context, ts['id'])

        return ts

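What this code does can be explained using the four positions I marked:

Position 1: a database operation that adds the new tap_service record to the database.

Position 2: obtain driver_context. What is driver_context? Tracing it down shows the structure below. It simply wraps the information about the newly created tap service in a context class.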

class ServiceDriverContext(object):
    """ServiceDriverContext context base class"""
    def __init__(self, service_plugin, plugin_context):
        self._plugin = service_plugin
        self._plugin_context = plugin_context


class TapServiceContext(ServiceDriverContext):

    def __init__(self, service_plugin, plugin_context, tap_service):
        super(TapServiceContext, self).__init__(service_plugin, plugin_context)
        self._tap_service = tap_service
        self._tap_id_association = None
        self._setup_tap_id_association(tap_service['id'])

    def _setup_tap_id_association(self, tap_service_id):
        try:
            self._tap_id_association = self._plugin.get_tap_id_association(
                self._plugin_context, tap_service_id)
        except taas.TapServiceNotFound:
            LOG.debug("Not found tap_ip_association for tap_service: %s",
                      tap_service_id)

    @property
    def tap_service(self):
        return self._tap_service

    @property
    def tap_id_association(self):
        return self._tap_id_association

    @tap_id_association.setter
    def tap_id_association(self, tap_id_association):
        """Set tap_id_association in context"""
        self._tap_id_association = tap_id_association

Position 3: the concrete self.driver is the following class:

class TaasRpcDriver(service_drivers.TaasBaseDriver):
    """Taas Rpc Service Driver class"""

    def __init__(self, service_plugin):
        LOG.debug("Loading TaasRpcDriver.")
        super(TaasRpcDriver, self).__init__(service_plugin)
        self.endpoints = [taas_agent_api.TaasCallbacks(service_plugin)]
        self.conn = n_rpc.Connection()
        self.conn.create_consumer(topics.TAAS_PLUGIN,
                                  self.endpoints, fanout=False)

        self.conn.consume_in_threads()

        self.agent_rpc = taas_agent_api.TaasAgentApi(
            topics.TAAS_AGENT,
            cfg.CONF.host
        )

        return

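This is an RPC driver. It creates a consumer for the topics.TAAS_PLUGIN topic on the message-queue broker, starts the local threads that consume from it, and creates agent_rpc, the RPC client used to reach the agents.

create_tap_service_precommit is implemented as follows: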

    def create_tap_service_precommit(self, context):
        ts = context.tap_service
        tap_id_association = context._plugin.create_tap_id_association(
            context._plugin_context, ts['id'])
        context.tap_id_association = tap_id_association

Specifically, this binds the id of the tap_service to a newly allocated taas_id.
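
The idea of an id association can be illustrated with a small in-memory allocator. This is only a sketch: TapIdAllocator and its id range are hypothetical, and the real plugin keeps the association in the database rather than in a dictionary.

```python
class TapIdAllocator:
    """Hypothetical in-memory allocator: one taas_id per tap service."""

    def __init__(self, first_id, last_id):
        self._free = list(range(first_id, last_id + 1))
        self._by_service = {}

    def create_tap_id_association(self, tap_service_id):
        """Bind the tap service to the lowest free taas_id."""
        if tap_service_id in self._by_service:
            return self._by_service[tap_service_id]
        if not self._free:
            raise RuntimeError("no free taas_id left")
        assoc = {"tap_service_id": tap_service_id,
                 "taas_id": self._free.pop(0)}
        self._by_service[tap_service_id] = assoc
        return assoc

    def delete_tap_id_association(self, tap_service_id):
        """Release the taas_id so that it can be reused."""
        assoc = self._by_service.pop(tap_service_id)
        self._free.append(assoc["taas_id"])
        self._free.sort()


alloc = TapIdAllocator(1, 3)
a = alloc.create_tap_id_association("ts-a")
b = alloc.create_tap_id_association("ts-b")
alloc.delete_tap_id_association("ts-a")
c = alloc.create_tap_id_association("ts-c")
print(a["taas_id"], b["taas_id"], c["taas_id"])
```

Keeping the binding one-to-one is what later lets the agent use the taas_id to tell the traffic of different tap services apart.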

Position 4:

    def create_tap_service_postcommit(self, context):
        """Send tap service creation RPC message to agent.

        This RPC message includes taas_id that is added vlan_range_start to
        so that taas-ovs-agent can use taas_id as VLANID.
        """
        # Get taas id associated with the Tap Service
        ts = context.tap_service
        tap_id_association = context.tap_id_association
        taas_vlan_id = tap_id_association['taas_id']
        port = self.service_plugin._get_port_details(context._plugin_context,
                                                     ts['port_id'])
        host = port['binding:host_id']

        rpc_msg = {'tap_service': ts,
                   'taas_id': taas_vlan_id,
                   'port': port}

        self.agent_rpc.create_tap_service(context._plugin_context,
                                          rpc_msg, host)
        return

This sends the tap_service record, the taas_id, and the port details to the agent node, where further configuration is then carried out.
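
The four positions above follow a precommit/postcommit pattern: the record is stored first, the driver is notified afterwards, and the record is removed again if the notification fails. Here is a minimal sketch of that pattern. FakePlugin and FakeDriver are hypothetical, a dictionary stands in for the database, and a plain raise replaces excutils.save_and_reraise_exception().

```python
class FakeDriver:
    """Hypothetical driver; fail_postcommit simulates an RPC failure."""

    def __init__(self, fail_postcommit=False):
        self.fail_postcommit = fail_postcommit
        self.sent = []

    def create_tap_service_precommit(self, ctx):
        # Runs while the database transaction is still open.
        ctx["taas_id"] = 1

    def create_tap_service_postcommit(self, ctx):
        # Runs after the transaction has been committed.
        if self.fail_postcommit:
            raise RuntimeError("agent unreachable")
        self.sent.append(ctx)


class FakePlugin:
    def __init__(self, driver):
        self.driver = driver
        self.db = {}  # stands in for the tap_service table

    def create_tap_service(self, ts):
        # Positions 1 to 3: store the record, build the context, precommit.
        self.db[ts["id"]] = ts
        ctx = {"tap_service": ts}
        self.driver.create_tap_service_precommit(ctx)
        # Position 4: notify the driver and undo the record if that fails.
        try:
            self.driver.create_tap_service_postcommit(ctx)
        except Exception:
            del self.db[ts["id"]]
            raise
        return ts


ok = FakePlugin(FakeDriver())
ok.create_tap_service({"id": "ts-1"})

bad = FakePlugin(FakeDriver(fail_postcommit=True))
try:
    bad.create_tap_service({"id": "ts-2"})
except RuntimeError:
    pass
print(sorted(ok.db), sorted(bad.db))
```

The caller still sees the original exception, but no orphaned tap_service record is left behind.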

 

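2.2 The RPC call flow on the plugin side:

We start at the point where the TaaS plugin sends the RPC message. As shown in the previous section, once the tap service information has been committed on the controller node, the TaaS plugin sends the RPC message with the following code: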

    def create_tap_service_postcommit(self, context):
        """Send tap service creation RPC message to agent.

        This RPC message includes taas_id that is added vlan_range_start to
        so that taas-ovs-agent can use taas_id as VLANID.
        """
        # Get taas id associated with the Tap Service
        ts = context.tap_service
        tap_id_association = context.tap_id_association
        taas_vlan_id = tap_id_association['taas_id']
        port = self.service_plugin._get_port_details(context._plugin_context,
                                                     ts['port_id'])
        host = port['binding:host_id']

        rpc_msg = {'tap_service': ts,
                   'taas_id': taas_vlan_id,
                   'port': port}

        self.agent_rpc.create_tap_service(context._plugin_context,
                                          rpc_msg, host)
        return

The final statement, self.agent_rpc.create_tap_service, is implemented as follows:

    def create_tap_service(self, context, tap_service, host):
        LOG.debug("In RPC Call for Create Tap Service: Host=%s, MSG=%s" %
                  (host, tap_service))

        cctxt = self.client.prepare(fanout=True)
        cctxt.cast(context, 'create_tap_service', tap_service=tap_service,
                   host=host)

        return
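
Because the message is sent with fanout=True, it is not addressed to a single agent. The toy model below shows one way such a message can still end up configuring only one node. It assumes that every agent receives the message and compares the host argument with its own host name; that filtering step is my reading of why host is passed along, not something shown in the code above. ToyBus and ToyAgent are hypothetical and run in a single process.

```python
class ToyBus:
    """Hypothetical in-process stand-in for the message broker."""

    def __init__(self):
        self.agents = []

    def fanout_cast(self, method, **kwargs):
        # cast: fire and forget, no return value is collected.
        for agent in self.agents:
            getattr(agent, method)(**kwargs)


class ToyAgent:
    def __init__(self, host):
        self.host = host
        self.configured = []

    def create_tap_service(self, tap_service, host):
        if host != self.host:
            return  # assumed behaviour: message is for another node
        self.configured.append(tap_service["taas_id"])


bus = ToyBus()
agent1, agent2 = ToyAgent("compute1"), ToyAgent("compute2")
bus.agents.extend([agent1, agent2])
bus.fanout_cast("create_tap_service",
                tap_service={"taas_id": 5}, host="compute2")
print(agent1.configured, agent2.configured)
```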

If you are not familiar with the RPC flow, it is worth looking closely at how cctxt and cctxt.cast work:

First, cctxt = self.client.prepare(fanout=True)

This call is implemented as follows; the class is neutron_lib.rpc.BackingOffClient

class BackingOffClient(oslo_messaging.RPCClient):
    """An oslo messaging RPC Client that implements a timeout backoff.

    This has all of the same interfaces as oslo_messaging.RPCClient but
    if the timeout parameter is not specified, the _BackingOffContextWrapper
    returned will track when call timeout exceptions occur and exponentially
    increase the timeout for the given call method.
    """
    def prepare(self, *args, **kwargs):
        ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
        # don't back off contexts that explicitly set a timeout
        if 'timeout' in kwargs:
            return _ContextWrapper(ctx)
        return _BackingOffContextWrapper(ctx)

ctx is an oslo_messaging.rpc._CallContext object produced by the factory method _CallContext._prepare(). That object is then wrapped in a wrapper object.

class _CallContext(_BaseCallContext):

    _marker = _BaseCallContext._marker
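
The backoff behaviour described in the BackingOffClient docstring can be pictured with a small wrapper. This is a sketch of the general idea only, not neutron_lib's code: BackingOffWrapper, CallTimeout, AlwaysTimesOut and the timeout values are all hypothetical.

```python
class CallTimeout(Exception):
    """Stand-in for a messaging timeout exception."""


class BackingOffWrapper:
    """Hypothetical wrapper: doubles a method's timeout after a timeout."""

    def __init__(self, ctx, base_timeout=60, max_timeout=600):
        self._ctx = ctx
        self._base = base_timeout
        self._max = max_timeout
        self._timeouts = {}  # per-method timeout, grown on failure

    def call(self, method, **kwargs):
        timeout = self._timeouts.get(method, self._base)
        try:
            return self._ctx.call(method, timeout=timeout, **kwargs)
        except CallTimeout:
            self._timeouts[method] = min(timeout * 2, self._max)
            raise

    def cast(self, method, **kwargs):
        # cast never waits for a reply, so there is nothing to back off.
        return self._ctx.cast(method, **kwargs)


class AlwaysTimesOut:
    """Fake inner context that records the timeout it was given."""

    def __init__(self):
        self.seen = []

    def call(self, method, timeout, **kwargs):
        self.seen.append(timeout)
        raise CallTimeout()

    def cast(self, method, **kwargs):
        return None


inner = AlwaysTimesOut()
client = BackingOffWrapper(inner, base_timeout=60, max_timeout=200)
for _ in range(3):
    try:
        client.call("get_status")
    except CallTimeout:
        pass
print(inner.seen)
```

Since create_tap_service uses cast rather than call, the backoff logic does not come into play on this particular code path.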

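cctxt is a neutron_lib.rpc._BackingOffContextWrapper object. Its cast is implemented by delegating to the cast method of the ctx object above, which packs the tap_service and host arguments and sends them out. The remote method being requested is create_tap_service.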

    def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)

        self._check_version_cap(msg.get('version'))

        try:
            self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
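
To get a feel for what is handed to the transport, the sketch below builds a message of roughly the shape I would expect _make_message to produce: the method name plus its keyword arguments, with optional namespace and version fields. The exact layout is an assumption on my part, and make_message is a hypothetical helper, not the oslo.messaging function.

```python
def make_message(method, kwargs, namespace=None, version=None):
    """Build a dict that names the remote method and carries its args."""
    msg = {"method": method, "args": dict(kwargs)}
    if namespace is not None:
        msg["namespace"] = namespace
    if version is not None:
        msg["version"] = version
    return msg


# Illustrative payload; host name and version are made up.
rpc_msg = {"tap_service": {"id": "ts-1"}, "taas_id": 5, "port": {}}
msg = make_message("create_tap_service",
                   {"tap_service": rpc_msg, "host": "compute1"},
                   version="1.0")
print(msg["method"], sorted(msg["args"]))
```

On the receiving side, the method name is what allows the agent's endpoint object to dispatch the message to its own create_tap_service.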

This largely completes the walkthrough of the plugin-side code path for create_tap_service.
