一、Linux 网络收包总览

在 TCP / IP 网络分层模型里,整个协议栈被分成了物理层、链路层、网络层,传输层和应用层。物理层对应的是网卡和网线,应用层对应的是我们常见的 Nginx,FTP 等等各种应用。Linux 实现的是链路层、网络层和传输层这三层。

在 Linux 内核实现中,链路层协议靠网卡驱动来实现,内核协议栈来实现网络层和传输层。内核对更上层的应用层提供 socket 接口来供用户进程访问。

图解 Linux 网络包接收过程

在 Linux 的源代码中,网络设备驱动对应的逻辑位于driver/net/ethernet。协议栈模块代码位于 kernel 和 net 目录。

内核网络收包过程

1、网卡收到数据包后,linux中第一个工作的模块就是网络驱动,网络驱动以DMA方式把网卡上收到的帧写到内存中。再向CPU发出硬中断以通知CPU有数据到了。
2、中断上半部来说(基本等同硬中断),当 CPU 收到中断请求后,会去调用网络驱动注册的中断处理函数,最后发送一个软中断信号通知下半部做进一步处理。
3、下半部(基本等于软中断)被软中断信号唤醒后,ksoftirqd 检测到有软中断请求到达,调用 poll 开始轮询收包,收到后交由各级协议栈处理,直到把数据送到应用程序。

图解 Linux 网络包接收过程

二、Linux 启动

Linux 驱动,内核协议栈等等模块在具备接收网卡数据包之前,要做很多的准备工作才行。比如要注册硬中断、提前创建好 ksoftirqd 内核线程、要注册好各个协议对应的处理函数,网络设备子系统要提前初始化好,网卡要启动好。只有这些都 Ready 之后,我们才能真正开始接收数据包。

2.1 注册硬中断

一般是在probe中使用devm_request_irq注册硬中断及handler。

err = devm_request_irq(eth->dev, eth->irq[1],
				       mtk_handle_irq_tx, 0,
				       dev_name(eth->dev), eth);
err = devm_request_irq(eth->dev, eth->irq[2],
				       mtk_handle_irq_rx, 0,
				       dev_name(eth->dev), &eth->rx_napi[0]);

2.2 创建 ksoftirqd 内核线程

Linux 的软中断都是在专门的内核线程(ksoftirqd)中进行的。该进程数量和核数相同。

root@/tmp/root/root# ps | grep ksoftirqd
    9 root         0 SW   [ksoftirqd/0]
   16 root         0 SW   [ksoftirqd/1]

系统初始化的时候在 kernel/smpboot.c 中调用了 smpboot_register_percpu_thread,该函数进一步会执行到 spawn_ksoftirqd(位于 kernel/softirq.c)来创建出 softirqd 进程。

图解 Linux 网络包接收过程

static struct smp_hotplug_thread softirq_threads = {
	.store			= &ksoftirqd,
	.thread_should_run	= ksoftirqd_should_run,
	.thread_fn		= run_ksoftirqd,
	.thread_comm		= "ksoftirqd/%u",
};

static __init int spawn_ksoftirqd(void)
{
	cpuhp_setup_state_nocalls(CPUHP_SOFTIRQ_DEAD, "softirq:dead", NULL,
				  takeover_tasklets);
	BUG_ON(smpboot_register_percpu_thread(&softirq_threads));

	return 0;
}
early_initcall(spawn_ksoftirqd);

当 ksoftirqd 被创建出来以后,它就会进入自己的线程循环函数 ksoftirqd_should_run 和 run_ksoftirqd 了。不停地判断有没有软中断需要被处理。软中断类型:

//file: include/linux/interrupt.h
enum{
    HI_SOFTIRQ=0, //start_kernel->softirq_init
    TIMER_SOFTIRQ,  //start_kernel->init_timers
    NET_TX_SOFTIRQ, //net_dev_init
    NET_RX_SOFTIRQ, //net_dev_init
    BLOCK_SOFTIRQ, //
    BLOCK_IOPOLL_SOFTIRQ,
    TASKLET_SOFTIRQ, //start_kernel->softirq_init
    SCHED_SOFTIRQ, //start_kernel->sched_init->init_sched_fair_class
    HRTIMER_SOFTIRQ, //start_kernel->hrtimers_init
    RCU_SOFTIRQ,  //start_kernel->rcu_init
};

2.3 网络子系统初始化

图解 Linux 网络包接收过程

linux 内核通过调用 subsys_initcall 来初始化各个子系统,会执行到 net_dev_init 函数。

//file: net/core/dev.c
static int __init net_dev_init(void){

	for_each_possible_cpu(i) {
		struct work_struct *flush = per_cpu_ptr(&flush_works, i);
		struct softnet_data *sd = &per_cpu(softnet_data, i);

		INIT_WORK(flush, flush_backlog);

		skb_queue_head_init(&sd->input_pkt_queue);
		skb_queue_head_init(&sd->process_queue);
#ifdef CONFIG_XFRM_OFFLOAD
		skb_queue_head_init(&sd->xfrm_backlog);
#endif
		INIT_LIST_HEAD(&sd->poll_list);
		sd->output_queue_tailp = &sd->output_queue;
#ifdef CONFIG_RPS
		sd->csd.func = rps_trigger_softirq;
		sd->csd.info = sd;
		sd->cpu = i;
#endif

		init_gro_hash(&sd->backlog);
		sd->backlog.poll = process_backlog;
		sd->backlog.weight = weight_p;
	}

    open_softirq(NET_TX_SOFTIRQ, net_tx_action);
    open_softirq(NET_RX_SOFTIRQ, net_rx_action);
}
subsys_initcall(net_dev_init);

在这个函数里,会为每个 CPU 都申请一个 softnet_data 数据结构,在这个数据结构里的 poll_list 是等待驱动程序将其 poll 函数注册进来,稍后网卡驱动初始化的时候我们可以看到这一过程。

另外 open_softirq 注册了每一种软中断都注册一个处理函数。NET_TX_SOFTIRQ 的处理函数为 net_tx_action,NET_RX_SOFTIRQ 的为 net_rx_action。继续跟踪 open_softirq 后发现这个注册的方式是记录在 softirq_vec 变量里的。后面 ksoftirqd 线程收到软中断的时候,也会使用这个变量来找到每一种软中断对应的处理函数。

//file: kernel/softirq.c
void open_softirq(int nr, void (*action)(struct softirq_action *)){
	softirq_vec[nr].action = action;
}

2.3 协议栈注册

内核实现了网络层的 ip 协议,也实现了传输层的 tcp 协议和 udp 协议。这些协议对应的实现函数分别是 ip_rcv (),tcp_v4_rcv () 和 udp_rcv ()。和我们平时写代码的方式不一样的是,内核是通过注册的方式来实现的。Linux 内核中的 fs_initcall 和 subsys_initcall 类似,也是初始化模块的入口。fs_initcall 调用 inet_init 后开始网络协议栈注册。通过 inet_init,将这些函数注册到了 inet_protos 和 ptype_base 数据结构中。如下图:

图解 Linux 网络包接收过程

相关代码如下

//file: net/ipv4/af_inet.c
static struct packet_type ip_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),
	.func = ip_rcv,
	.list_func = ip_list_rcv,
};

static struct net_protocol tcp_protocol = {
	.early_demux	=	tcp_v4_early_demux,
	.early_demux_handler =  tcp_v4_early_demux,
	.handler	=	tcp_v4_rcv,
	.err_handler	=	tcp_v4_err,
	.no_policy	=	1,
	.netns_ok	=	1,
	.icmp_strict_tag_validation = 1,
};

/* thinking of making this const? Don't.
 * early_demux can change based on sysctl.
 */
static struct net_protocol udp_protocol = {
	.early_demux =	udp_v4_early_demux,
	.early_demux_handler =	udp_v4_early_demux,
	.handler =	udp_rcv,
	.err_handler =	udp_err,
	.no_policy =	1,
	.netns_ok =	1,
};

static const struct net_protocol icmp_protocol = {
	.handler =	icmp_rcv,
	.err_handler =	icmp_err,
	.no_policy =	1,
	.netns_ok =	1,
};

static int __init inet_init(void){
......
	if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
		pr_crit("%s: Cannot add ICMP protocol\n", __func__);
	if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
		pr_crit("%s: Cannot add UDP protocol\n", __func__);
	if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
		pr_crit("%s: Cannot add TCP protocol\n", __func__);
#ifdef CONFIG_IP_MULTICAST
	if (inet_add_protocol(&igmp_protocol, IPPROTO_IGMP) < 0)
		pr_crit("%s: Cannot add IGMP protocol\n", __func__);
#endif
......
dev_add_pack(&ip_packet_type);
}

上面的代码中我们可以看到,udp_protocol 结构体中的 handler 是 udp_rcv,tcp_protocol 结构体中的 handler 是 tcp_v4_rcv,通过 inet_add_protocol 被初始化了进来。

int int inet_add_protocol(const struct net_protocol *prot, unsigned char protocol)
{
	if (!prot->netns_ok) {
		pr_err("Protocol %u is not namespace aware, cannot register.\n",
			protocol);
		return -EINVAL;
	}

	return !cmpxchg((const struct net_protocol **)&inet_protos[protocol],
			NULL, prot) ? 0 : -1;
}

inet_add_protocol 函数将 tcp 和 udp 对应的处理函数都注册到了 inet_protos 数组中了。再看 dev_add_pack (&ip_packet_type); 这一行,ip_packet_type 结构体中的 type 是协议名,func 是 ip_rcv 函数,在 dev_add_pack 中会被注册到 ptype_base 哈希表中。

//file: net/core/dev.c
void dev_add_pack(struct packet_type *pt)
{
	struct list_head *head = ptype_head(pt);

	spin_lock(&ptype_lock);
	list_add_rcu(&pt->list, head);
	spin_unlock(&ptype_lock);
}
static inline struct list_head *ptype_head(const struct packet_type *pt)
{
	if (pt->type == htons(ETH_P_ALL))
		return pt->dev ? &pt->dev->ptype_all : &ptype_all;
	else
		return pt->dev ? &pt->dev->ptype_specific :
				 &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

这里我们需要记住 inet_protos 记录着 udp,tcp 的处理函数地址,ptype_base 存储着 ip_rcv () 函数的处理地址。后面我们会看到软中断中会通过 ptype_base 找到 ip_rcv 函数地址,进而将 ip 包正确地送到 ip_rcv () 中执行。在 ip_rcv 中将会通过 inet_protos 找到 tcp 或者 udp 的处理函数,再而把包转发给 udp_rcv () 或 tcp_v4_rcv () 函数。

2.5 网卡驱动初始化

每一个驱动程序(不仅仅只是网卡驱动)会向内核注册driver结构体,如platform_driver。

static struct platform_driver mtk_driver = {
	.probe = mtk_probe,
	.remove = mtk_remove,
	.driver = {
		.name = "mtk_soc_eth",
		.of_match_table = of_mtk_match,
	},
};

module_platform_driver(mtk_driver);

实际上使用 module_init 向内核注册一个初始化函数,当驱动被加载时,内核会调用这个函数。

static int __init xxx_init(void)
{
	return platform_driver_register(&xxx_driver);
}
module_init(xxx_init);

platform_driver_register调用完成后,Linux 内核就知道了该驱动的相关信息,比如driver_name 和 probe 函数地址等等。当网卡设备被识别以后,内核会调用其驱动的 probe 方法,probe 方法执行的目的就是让设备 ready。主要执行的操作如下:

图解 Linux 网络包接收过程

第 5 步中我们看到,网卡驱动实现了 ethtool 所需要的接口,也在这里注册完成函数地址的注册。当 ethtool 发起一个系统调用之后,内核会找到对应操作的回调函数。这个命令之所以能查看网卡收发包统计、能修改网卡自适应模式、能调整 RX 队列的数量和大小,是因为 ethtool 命令最终调用到了网卡驱动的相应方法。

static const struct ethtool_ops mtk_ethtool_ops = {
	.get_link_ksettings	= mtk_get_link_ksettings,
	.set_link_ksettings	= mtk_set_link_ksettings,
	.get_drvinfo		= mtk_get_drvinfo,
	.get_msglevel		= mtk_get_msglevel,
	.set_msglevel		= mtk_set_msglevel,
	.nway_reset		= mtk_nway_reset,
#if IS_ENABLED(CONFIG_BONDING)
	.get_link		= mtk_get_link,
#else
	.get_link		= ethtool_op_get_link,
#endif
	.get_strings		= mtk_get_strings,
	.get_sset_count		= mtk_get_sset_count,
	.get_ethtool_stats	= mtk_get_ethtool_stats,
	.get_rxnfc		= mtk_get_rxnfc,
	.set_rxnfc              = mtk_set_rxnfc,
};

第 6 步注册的netdev_ops 中包含的是 xxx_open 等函数,该函数在网卡被启动的时候会被调用。

static const struct net_device_ops mtk_netdev_ops = {
	.ndo_init		= mtk_init,
	.ndo_uninit		= mtk_uninit,
	.ndo_open		= mtk_open,
	.ndo_stop		= mtk_stop,
	.ndo_start_xmit		= mtk_start_xmit,
	.ndo_set_mac_address	= mtk_set_mac_address,
	.ndo_validate_addr	= eth_validate_addr,
	.ndo_do_ioctl		= mtk_do_ioctl,
	.ndo_tx_timeout		= mtk_tx_timeout,
	.ndo_get_stats64        = mtk_get_stats64,
	.ndo_fix_features	= mtk_fix_features,
	.ndo_set_features	= mtk_set_features,
#ifdef CONFIG_NET_POLL_CONTROLLER
	.ndo_poll_controller	= mtk_poll_controller,
#endif
};

第 7 步中,xxx_probe 初始化过程中,还调用到了 netif_napi_add注册 NAPI 机制所必须的 poll 函数,比如mtk_napi_tx、mtk_napi_rx。

netif_napi_add(&eth->dummy_dev, &eth->tx_napi, mtk_napi_tx,
		       MTK_NAPI_WEIGHT);
netif_napi_add(&eth->dummy_dev, &eth->rx_napi[0].napi, mtk_napi_rx,
		       MTK_NAPI_WEIGHT);

2.6 启动网卡

当上面的初始化都完成以后,就可以启动网卡了。回忆前面网卡驱动初始化时,我们提到了驱动向内核注册了 structure net_device_ops 变量,它包含着网卡启用、发包、设置 mac 地址等回调函数(函数指针)。当启用一个网卡时(例如,通过 ifconfig eth0 up),net_device_ops 中的 xxx_open 方法会被调用。它通常会做以下事情:

图解 Linux 网络包接收过程

图 7 启动网卡

// 注册中断也可能在probe中进行。
static int mtk_open(struct net_device *dev)
{
	struct mtk_mac *mac = netdev_priv(dev);
	struct mtk_eth *eth = mac->hw;
	int err, i;

	/* we run 2 netdevs on the same dma ring so we only bring it up once */
	if (!refcount_read(&eth->dma_refcnt)) {
		int err = mtk_start_dma(eth);

		if (err)
			return err;

		mtk_gdm_config(eth, MTK_GDMA_TO_PDMA);

		/* Indicates CDM to parse the MTK special tag from CPU */
		if (netdev_uses_dsa(dev)) {
			u32 val;
			val = mtk_r32(eth, MTK_CDMQ_IG_CTRL);
			mtk_w32(eth, val | MTK_CDMQ_STAG_EN, MTK_CDMQ_IG_CTRL);
			val = mtk_r32(eth, MTK_CDMP_IG_CTRL);
			mtk_w32(eth, val | MTK_CDMP_STAG_EN, MTK_CDMP_IG_CTRL);
		}
		// 开启NAPI
		napi_enable(&eth->tx_napi);
		napi_enable(&eth->rx_napi[0].napi);
		mtk_tx_irq_enable(eth, MTK_TX_DONE_INT);
		mtk_rx_irq_enable(eth, MTK_RX_DONE_INT(0));

		refcount_set(&eth->dma_refcnt, 1);
	}
	else
		refcount_inc(&eth->dma_refcnt);

	phylink_start(mac->phylink);
	netif_start_queue(dev);
	return 0;
}

当做好以上准备工作以后,就可以开门迎客(数据包)了!

三、迎接数据的到来

3.1 硬中断处理

首先当数据帧从网线到达网卡上的时候,第一站是网卡的接收队列。网卡在分配给自己的 RingBuffer 中寻找可用的内存位置,找到后 DMA 引擎会把数据 DMA 到网卡之前关联的内存里,这个时候 CPU 都是无感的。当 DMA 操作完成以后,网卡向 CPU 发起一个硬中断,通知 CPU 有数据到达。

图解 Linux 网络包接收过程

注意:当 RingBuffer 满的时候,新来的数据包将给丢弃。ifconfig 查看网卡的时候,可以里面有个 overruns,表示因为环形队列满被丢弃的包。如果发现有丢包,可能需要通过 ethtool 命令来加大环形队列的长度。

在启动网卡一节,我们说到了网卡的硬中断注册的处理函数是 igb_msix_ring。

static irqreturn_t mtk_handle_irq_rx(int irq, void *priv)
{
	struct mtk_napi *rx_napi = priv;
	struct mtk_eth *eth = rx_napi->eth;
	struct mtk_rx_ring *ring = rx_napi->rx_ring;

	if (likely(napi_schedule_prep(&rx_napi->napi))) {
        // 关闭中断,开始poll
		mtk_rx_irq_disable(eth, MTK_RX_DONE_INT(ring->ring_no));
		__napi_schedule(&rx_napi->napi);
	}

	return IRQ_HANDLED;
}
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
				     struct napi_struct *napi)
{
	struct task_struct *thread;

	if (test_bit(NAPI_STATE_THREADED, &napi->state)) {
		/* Paired with smp_mb__before_atomic() in
		 * napi_enable()/dev_set_threaded().
		 * Use READ_ONCE() to guarantee a complete
		 * read on napi->thread. Only call
		 * wake_up_process() when it's not NULL.
		 */
		thread = READ_ONCE(napi->thread);
		if (thread) {
			if (thread->state != TASK_INTERRUPTIBLE)
				set_bit(NAPI_STATE_SCHED_THREADED, &napi->state);
			wake_up_process(thread);
			return;
		}
	}

	list_add_tail(&napi->poll_list, &sd->poll_list);
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里我们看到,list_add_tail 将驱动传过来的 poll_list 添加到CPU 变量 softnet_data 里的 poll_list。其中 softnet_data 中的 poll_list 是一个双向链表,其中的设备都带有输入帧等着被处理。紧接着__raise_softirq_irqoff 触发了一个软中断 NET_RX_SOFTIRQ,这个所谓的触发过程只是对一个变量进行了一次或运算而已。

void __raise_softirq_irqoff(unsigned int nr)
{
	trace_softirq_raise(nr);
	or_softirq_pending(1UL << nr);
}
#define or_softirq_pending(x)	(__this_cpu_or(local_softirq_pending_ref, (x)))

我们说过,Linux 在硬中断里只完成简单必要的工作,剩下的大部分的处理都是转交给软中断的。通过上面代码可以看到,硬中断处理过程真的是非常短。只是记录了一个寄存器,修改了一下下 CPU 的 poll_list,然后发出个软中断。就这么简单,硬中断工作就算是完成了。

3.2 ksoftirqd 内核线程处理软中断

图解 Linux 网络包接收过程

内核线程初始化的时候,我们介绍了 ksoftirqd 中两个线程函数 ksoftirqd_should_run 和 run_ksoftirqd。其中 ksoftirqd_should_run 代码如下:

static int ksoftirqd_should_run(unsigned int cpu)
{
	return local_softirq_pending();
}
#define local_softirq_pending()	(__this_cpu_read(local_softirq_pending_ref))

这里看到和硬中断中调用了同一个函数 local_softirq_pending。使用方式不同的是硬中断位置是为了写入标记,这里仅仅只是读取。如果硬中断中设置了 NET_RX_SOFTIRQ, 这里自然能读取的到。接下来会真正进入线程函数中 run_ksoftirqd 处理:

static void run_ksoftirqd(unsigned int cpu)
{
	local_irq_disable();
	if (local_softirq_pending()) {
		/*
		 * We can safely run softirq on inline stack, as we are not deep
		 * in the task stack here.
		 */
		__do_softirq();
		local_irq_enable();
		cond_resched();
		return;
	}
	local_irq_enable();
}

在__do_softirq 中,判断根据当前 CPU 的软中断类型,调用其注册的 action 方法。

asmlinkage void __do_softirq(void){
	h = softirq_vec;

	while ((softirq_bit = ffs(pending))) {
		unsigned int vec_nr;
		int prev_count;

		h += softirq_bit - 1;

		vec_nr = h - softirq_vec;
		prev_count = preempt_count();

		kstat_incr_softirqs_this_cpu(vec_nr);

		trace_softirq_entry(vec_nr);
		h->action(h);
		trace_softirq_exit(vec_nr);
		if (unlikely(prev_count != preempt_count())) {
			pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
			       vec_nr, softirq_to_name[vec_nr], h->action,
			       prev_count, preempt_count());
			preempt_count_set(prev_count);
		}
		h++;
		pending >>= softirq_bit;
	}
}

之前为 NET_RX_SOFTIRQ 注册了处理函数 net_rx_action。所以 net_rx_action 函数就会被执行到了。这里需要注意一个细节,硬中断中设置软中断标记,和 ksoftirq 的判断是否有软中断到达,都是基于 smp_processor_id () 的。这意味着只要硬中断在哪个 CPU 上被响应,那么软中断也是在这个 CPU 上处理的。所以说,如果你发现你的 Linux 软中断 CPU 消耗都集中在一个核上的话,做法是要把调整硬中断的 CPU 亲和性,来将硬中断打散到不同的 CPU 核上去。

static __latent_entropy void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	unsigned long time_limit = jiffies +
		usecs_to_jiffies(netdev_budget_usecs);
	int budget = netdev_budget;
	LIST_HEAD(list);
	LIST_HEAD(repoll);

	local_irq_disable();
	list_splice_init(&sd->poll_list, &list);
	local_irq_enable();

	for (;;) {
		struct napi_struct *n;

		if (list_empty(&list)) {
			if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
				goto out;
			break;
		}

		n = list_first_entry(&list, struct napi_struct, poll_list);
		budget -= napi_poll(n, &repoll);

		/* If softirq window is exhausted then punt.
		 * Allow this to run for 2 jiffies since which will allow
		 * an average latency of 1.5/HZ.
		 */
		if (unlikely(budget <= 0 ||
			     time_after_eq(jiffies, time_limit))) {
			sd->time_squeeze++;
			break;
		}
	}

	local_irq_disable();

	list_splice_tail_init(&sd->poll_list, &list);
	list_splice_tail(&repoll, &list);
	list_splice(&list, &sd->poll_list);
	if (!list_empty(&sd->poll_list))
		__raise_softirq_irqoff(NET_RX_SOFTIRQ);

	net_rps_action_and_irq_enable(sd);
out:
	__kfree_skb_flush();
}

函数开头的 time_limit 和 budget 是用来控制 net_rx_action 函数主动退出的,目的是保证网络包的接收不霸占 CPU 不放。等下次网卡再有硬中断过来的时候再处理剩下的接收数据包。其中 budget 可以通过内核参数调整。这个函数中剩下的核心逻辑是获取到当前 CPU 变量 softnet_data,对其 poll_list 进行遍历,然后执行到网卡驱动注册到的 poll 函数如mtk_napi_rx。

static int mtk_napi_rx(struct napi_struct *napi, int budget)
{
	struct mtk_napi *rx_napi = container_of(napi, struct mtk_napi, napi);
	struct mtk_eth *eth = rx_napi->eth;
	struct mtk_rx_ring *ring = rx_napi->rx_ring;
	u32 status, mask;
	int rx_done = 0;
	int remain_budget = budget;

	mtk_handle_status_irq(eth);

poll_again:
	mtk_w32(eth, MTK_RX_DONE_INT(ring->ring_no), MTK_PDMA_INT_STATUS);
	rx_done = mtk_poll_rx(napi, remain_budget, eth);

	if (unlikely(netif_msg_intr(eth))) {
		status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
		mask = mtk_r32(eth, MTK_PDMA_INT_MASK);
		dev_info(eth->dev,
			 "done rx %d, intr 0x%08x/0x%x\n",
			 rx_done, status, mask);
	}
	if (rx_done == remain_budget)
		return budget;

	status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
	if (status & MTK_RX_DONE_INT(ring->ring_no)) {
		remain_budget -= rx_done;
		goto poll_again;
	}

	if (napi_complete(napi))
		mtk_rx_irq_enable(eth, MTK_RX_DONE_INT(ring->ring_no));

	return rx_done + budget - remain_budget;
}

static int mtk_poll_rx(struct napi_struct *napi, int budget,
		       struct mtk_eth *eth)
{
	struct mtk_napi *rx_napi = container_of(napi, struct mtk_napi, napi);
	struct mtk_rx_ring *ring = rx_napi->rx_ring;
	int idx;
	struct sk_buff *skb;
	u8 *data, *new_data;
	struct mtk_rx_dma *rxd, trxd;
	int done = 0;

	if (unlikely(!ring))
		goto rx_done;

	while (done < budget) {
		
		idx = NEXT_DESP_IDX(ring->calc_idx, ring->dma_size);
		rxd = &ring->dma[idx];
		data = ring->data[idx];
............
		/* receive data */
		skb = build_skb(data, 0);
		skb_reserve(skb, NET_SKB_PAD + NET_IP_ALIGN);

		pktlen = RX_DMA_GET_PLEN0(trxd.rxd2);
		skb->dev = netdev;
		skb_put(skb, pktlen);

		if ((!MTK_HAS_CAPS(eth->soc->caps, MTK_NETSYS_V2) &&
				  (trxd.rxd4 & eth->rx_dma_l4_valid)) ||
		    (MTK_HAS_CAPS(eth->soc->caps, MTK_NETSYS_V2) &&
				  (trxd.rxd3 & eth->rx_dma_l4_valid)))
			skb->ip_summed = CHECKSUM_UNNECESSARY;
		else
			skb_checksum_none_assert(skb);
		skb->protocol = eth_type_trans(skb, netdev);

		if (netdev->features & NETIF_F_HW_VLAN_CTAG_RX) {
			if (MTK_HAS_CAPS(eth->soc->caps, MTK_NETSYS_V2)) {
				if (trxd.rxd3 & RX_DMA_VTAG_V2)
					__vlan_hwaccel_put_tag(skb,
					htons(RX_DMA_VPID_V2(trxd.rxd4)),
					RX_DMA_VID_V2(trxd.rxd4));
			} else {
				if (trxd.rxd2 & RX_DMA_VTAG)
					__vlan_hwaccel_put_tag(skb,
					htons(RX_DMA_VPID(trxd.rxd3)),
					RX_DMA_VID(trxd.rxd3));
			}

			/* If netdev is attached to dsa switch, the special
			 * tag inserted in VLAN field by switch hardware can
			 * be offload by RX HW VLAN offload. Clears the VLAN
			 * information from @skb to avoid unexpected 8021d
			 * handler before packet enter dsa framework.
			 */
			if (netdev_uses_dsa(netdev))
				__vlan_hwaccel_clear_tag(skb);
		}

#if defined(CONFIG_NET_MEDIATEK_HNAT) || defined(CONFIG_NET_MEDIATEK_HNAT_MODULE)
#if defined(CONFIG_MEDIATEK_NETSYS_V2)
		if (MTK_HAS_CAPS(eth->soc->caps, MTK_NETSYS_V2))
			*(u32 *)(skb->head) = trxd.rxd5;
		else
#endif
			*(u32 *)(skb->head) = trxd.rxd4;

		skb_hnat_alg(skb) = 0;
		skb_hnat_filled(skb) = 0;
		skb_hnat_magic_tag(skb) = HNAT_MAGIC_TAG;

		if (skb_hnat_reason(skb) == HIT_BIND_FORCE_TO_CPU) {
			trace_printk("[%s] reason=0x%x(force to CPU) from WAN to Ext\n",
				     __func__, skb_hnat_reason(skb));
			skb->pkt_type = PACKET_HOST;
		}
#endif
		skb_record_rx_queue(skb, 0);
		napi_gro_receive(napi, skb);

skip_rx:
		ring->data[idx] = new_data;
		rxd->rxd1 = (unsigned int)dma_addr;

release_desc:
		if (MTK_HAS_CAPS(eth->soc->caps, MTK_SOC_MT7628))
			rxd->rxd2 = RX_DMA_LSO;
		else
			rxd->rxd2 = RX_DMA_PLEN0(ring->buf_size);

		ring->calc_idx = idx;

		done++;
	}
	return done;
}

获取下来的一个数据帧用一个 skb 来表示。收取完数据以后,对其进行一些校验,然后开始设置 sbk 变量的 timestamp, VLAN id, protocol 等字段。接下来进入到 napi_gro_receive 中:

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	gro_result_t ret;

	skb_mark_napi_id(skb, napi);
	trace_napi_gro_receive_entry(skb);

	skb_gro_reset_offset(skb, 0);

	ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
	trace_napi_gro_receive_exit(ret);

	return ret;
}

dev_gro_receive 这个函数代表的是网卡 GRO 特性,可以简单理解成把相关的小包合并成一个大包就行,目的是减少传送给网络栈的包数,这有助于减少 CPU 的使用量。我们暂且忽略,直接看 napi_skb_finish, 这个函数主要就是调用了 netif_receive_skb。

static gro_result_t napi_skb_finish(struct napi_struct *napi,
				    struct sk_buff *skb,
				    gro_result_t ret)
{
	switch (ret) {
	case GRO_NORMAL:
		gro_normal_one(napi, skb, 1);//最终调用deliver_skb
		break;

	case GRO_DROP:
		kfree_skb(skb);
		break;

	case GRO_MERGED_FREE:
		if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
			napi_skb_free_stolen_head(skb);
		else
			__kfree_skb(skb);
		break;

	case GRO_HELD:
	case GRO_MERGED:
	case GRO_CONSUMED:
		break;
	}

	return ret;
}

在 netif_receive_skb 中,数据包将被送到协议栈中。

3.3 网络协议栈处理

netif_receive_skb 函数会根据包的协议,假如是 udp 包,会将包依次送到 ip_rcv (),udp_rcv () 协议处理函数中进行处理。

图解 Linux 网络包接收过程

图 10 网络协议栈处理

napi_skb_finish
    gro_normal_one
    	gro_normal_list
    		netif_receive_skb_list_internal
    			enqueue_to_backlog
    			__netif_receive_skb_list
    				__netif_receive_skb_list_core
    					deliver_skb

static inline int deliver_skb(struct sk_buff *skb,
			      struct packet_type *pt_prev,
			      struct net_device *orig_dev)
{
	if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
		return -ENOMEM;
	refcount_inc(&skb->users);
	return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

pt_prev->func 这一行就调用到了协议层注册的处理函数了。对于 ip 包来讲,就会进入到 ip_rcv(如果是 arp 包的话,会进入到 arp_rcv)。

3.4 IP 协议层处理

int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt,
	   struct net_device *orig_dev)
{
	struct net *net = dev_net(dev);

	skb = ip_rcv_core(skb, net);
	if (skb == NULL)
		return NET_RX_DROP;

	return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
		       net, NULL, skb, dev, NULL,
		       ip_rcv_finish);
}


这里 NF_HOOK 是一个钩子函数,当执行完注册的钩子后就会执行到最后一个参数指向的函数 ip_rcv_finish。

static int ip_rcv_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
	struct net_device *dev = skb->dev;
	int ret;

	/* if ingress device is enslaved to an L3 master device pass the
	 * skb to its handler for processing
	 */
	skb = l3mdev_ip_rcv(skb);
	if (!skb)
		return NET_RX_SUCCESS;

	ret = ip_rcv_finish_core(net, sk, skb, dev);
	if (ret != NET_RX_DROP)
		ret = dst_input(skb);
	return ret;
}

int ip_local_deliver(struct sk_buff *skb)
{
	/*
	 *	Reassemble IP fragments.
	 */
	struct net *net = dev_net(skb->dev);

	if (ip_is_fragment(ip_hdr(skb))) {
		if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
			return 0;
	}

	return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN,
		       net, NULL, skb, skb->dev, NULL,
		       ip_local_deliver_finish);
}

static int ip_local_deliver_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
	__skb_pull(skb, skb_network_header_len(skb));

	rcu_read_lock();
	ip_protocol_deliver_rcu(net, skb, ip_hdr(skb)->protocol);
	rcu_read_unlock();

	return 0;
}
void ip_protocol_deliver_rcu(struct net *net, struct sk_buff *skb, int protocol)
{
	const struct net_protocol *ipprot;
	int raw, ret;

resubmit:
	raw = raw_local_deliver(skb, protocol);

	ipprot = rcu_dereference(inet_protos[protocol]);
	if (ipprot) {
		if (!ipprot->no_policy) {
			if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
				kfree_skb(skb);
				return;
			}
			nf_reset_ct(skb);
		}
		ret = INDIRECT_CALL_2(ipprot->handler, tcp_v4_rcv, udp_rcv,
				      skb);
		if (ret < 0) {
			protocol = -ret;
			goto resubmit;
		}
		__IP_INC_STATS(net, IPSTATS_MIB_INDELIVERS);
	} 
    ...
}

inet_protos 中保存着 tcp_rcv () 和 udp_rcv () 的函数地址。这里将会根据包中的协议类型选择进行分发,在这里 skb 包将会进一步被派送到更上层的协议中,udp 和 tcp。

四、recvfrom 系统调用

Linux 内核对数据包的接收和处理后最后把数据包放到 socket 的接收队列中了。那么我们再回头看用户进程调用 recvfrom 后是发生了什么。我们在代码里调用的 recvfrom 是一个 glibc 的库函数,该函数在执行后会将用户进行陷入到内核态,进入到 Linux 实现的系统调用 sys_recvfrom。在理解 Linux 对 sys_revvfrom 之前,我们先来简单看一下 socket 这个核心数据结构。这个数据结构太大了,我们只把对和我们今天主题相关的内容画出来,如下:

图解 Linux 网络包接收过程

socket 数据结构中的 const struct proto_ops 对应的是协议的方法集合。每个协议都会实现不同的方法集,对于 IPv4 Internet 协议族来说,每种协议都有对应的处理方法,如下。对于 udp 来说,是通过 inet_dgram_ops 来定义的,其中注册了 inet_recvmsg 方法。

const struct proto_ops inet_stream_ops = {
	.sendmsg	   = inet_sendmsg,
	.recvmsg	   = inet_recvmsg,
#ifdef CONFIG_MMU
	.mmap		   = tcp_mmap,
#endif
}
const struct proto_ops inet_dgram_ops = {
	......
	.recvmsg	   = inet_recvmsg,
	.mmap		   = sock_no_mmap,
};

socket 数据结构中的另一个数据结构 struct sock *sk 是一个非常大,非常重要的子结构体。其中的 sk_prot 又定义了二级处理函数。对于 UDP 协议来说,会被设置成 UDP 协议实现的方法集 udp_prot。

struct proto udp_prot = {
	.name			= "UDP",
	.owner			= THIS_MODULE,
	...
	.sendmsg		= udp_sendmsg,
	.recvmsg		= udp_recvmsg,
	.sendpage		= udp_sendpage,
};

看完了 socket 变量之后,我们再来看 sys_revvfrom 的实现过程。

图解 Linux 网络包接收过程

图 12 recvfrom 函数内部实现过程

在 inet_recvmsg 调用了 sk->sk_prot->recvmsg。

//file: net/ipv4/af_inet.c
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,size_t size, int flags){  	struct sock *sk = sock->sk;
	int addr_len = 0;
	int err;

	if (likely(!(flags & MSG_ERRQUEUE)))
		sock_rps_record_flow(sk);

	err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
			      sk, msg, size, flags & MSG_DONTWAIT,
			      flags & ~MSG_DONTWAIT, &addr_len);
	if (err >= 0)
		msg->msg_namelen = addr_len;
	return err;
}

struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
			       int noblock, int *off, int *err)
{
	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
	struct sk_buff_head *queue;
	struct sk_buff *last;
	long timeo;
	int error;

	queue = &udp_sk(sk)->reader_queue;
	flags |= noblock ? MSG_DONTWAIT : 0;
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	do {
		struct sk_buff *skb;

		error = sock_error(sk);
		if (error)
			break;

		error = -EAGAIN;
		do {
			spin_lock_bh(&queue->lock);
			skb = __skb_try_recv_from_queue(sk, queue, flags,
							udp_skb_destructor,
							off, err, &last);
			if (skb) {
				spin_unlock_bh(&queue->lock);
				return skb;
			}

			if (skb_queue_empty_lockless(sk_queue)) {
				spin_unlock_bh(&queue->lock);
				goto busy_check;
			}

			/* refill the reader queue and walk it again
			 * keep both queues locked to avoid re-acquiring
			 * the sk_receive_queue lock if fwd memory scheduling
			 * is needed.
			 */
			spin_lock(&sk_queue->lock);
			skb_queue_splice_tail_init(sk_queue, queue);

			skb = __skb_try_recv_from_queue(sk, queue, flags,
							udp_skb_dtor_locked,
							off, err, &last);
			spin_unlock(&sk_queue->lock);
			spin_unlock_bh(&queue->lock);
			if (skb)
				return skb;

busy_check:
			if (!sk_can_busy_loop(sk))
				break;

			sk_busy_loop(sk, flags & MSG_DONTWAIT);
		} while (!skb_queue_empty_lockless(sk_queue));

		/* sk_queue is empty, reader_queue may contain peeked packets */
	} while (timeo &&
		 !__skb_wait_for_more_packets(sk, &error, &timeo,
					      (struct sk_buff *)sk_queue));

	*err = error;
	return NULL;
}

访问 sk->sk_receive_queue读取数据。如果没有数据,且用户也允许等待,则将调用 wait_for_more_packets () 执行等待操作,它加入会让用户进程进入睡眠状态。

五、总结

当用户执行完 recvfrom 调用后,用户进程就通过系统调用进行到内核态工作了。如果接收队列没有数据,进程就进入睡眠状态被操作系统挂起。剩下大部分都是由 Linux 内核其它模块来完成。

首先在开始收包之前,Linux 要做许多的准备工作:

  1. 创建 ksoftirqd 线程,为它设置好它自己的线程函数,以便后面处理软中断。

  2. 协议栈注册,linux 要实现许多协议,比如 arp,icmp,ip,udp,tcp,每一个协议都会将自己的处理函数注册一下,方便包来了迅速找到对应的处理函数

  3. 网卡驱动初始化,每个驱动都有一个初始化函数,内核会让驱动也初始化一下。在这个初始化过程中,把自己的 DMA 准备好,把 NAPI 的 poll 函数地址告诉内核

  4. 启动网卡,分配 RX,TX 队列,注册中断对应的处理函数

以上是内核准备收包之前的重要工作,当上面都 ready 之后,就可以打开硬中断,等待数据包的到来了。

当数据到来了以后,第一个迎接它的是网卡(我去,这不是废话么):

\1. 网卡将数据帧 DMA 到内存的 RingBuffer 中,然后向 CPU 发起中断通知

\2. CPU 响应中断请求,调用网卡启动时注册的中断处理函数

\3. 中断处理函数几乎没干啥,就发起了软中断请求

\4. 内核线程 ksoftirqd 线程发现有软中断请求到来,先关闭硬中断

\5. ksoftirqd 线程开始调用驱动的 poll 函数收包

\6. poll 函数将收到的包送到协议栈注册的 ip_rcv 函数中

\7. ip_rcv 函数再讲包送到 udp_rcv 函数中(对于 tcp 包就送到 tcp_rcv)

理解了整个收包过程以后,我们就能明确知道 Linux 收一个包的 CPU 开销了。首先第一块是用户进程调用系统调用陷入内核态的开销。第二块是 CPU 响应包的硬中断的 CPU 开销。第三块是 ksoftirqd 内核线程的软中断上下文花费的。

Logo

更多推荐