当前位置: 首页 > news >正文

linux之网络子系统-GRO机制分析

一、起因

linux之网络子系统-NAPI机制的分析-CSDN博客

在写上面这篇文章时,igb  网卡驱动,根据软中断时,会调用napi_gro_receive 。这个函数接口是 GRO机制的开始。下面来分析 一下 GRO机制的处理流程。

二、GRO (Generic receive offload) 的作用

GRO是针对报文接收方向的,是指设备链路层在接收报文处理的时候,将多个小包合并成一个大包一起上送协议栈,减少数据包在协议栈间交互的机制。

GRO 是LGO(Large Receive Offload, 多数是在NIC 上实现的一种硬件优化机制)的一种软件实现,从而让所有NIC 都支持这个功能。网络上大部分MTU 都是1500字节,开启Jumbo Frame 后能到9000字节,如果 发送的数据超过MTU 就需要切割成多个数据包。通过合并[足够类似] 的包来减少传送给网络协议栈的包数,有助于减少CPU的使用量。GRO使协议层只需处理一个header,而将包含大量数据的整个大包送到用户程序。如果tcpdump 抓包看到机器收到了 不现实的、非常大的包,这很可能是系统开启了GRO。GRO 和 硬中断合并的思想类似,不过阶段不同。硬中断合并 是在中断发起之前,而GRO已经在软中断处理中了。

GRO 的主要功能是 将多个TCP/UDP 数据 聚合在一个skb 结构,然后作为一个大数据包 交付给上层的网络协议栈,以减少上层协议处理skb 的开销,提高系统接收数据包的性能。scatter-gather IO 的思想应用。合并了多个skb 的超级skb 能够一次性通过网络协议栈,从而减轻CPU负责。

GRO是在协议栈接收报文时进行减负的一种处理方式,该方式在设计上考虑了多种协议报文。主要原理是在接收端通过把多个相关的报文(比如TCP分段报文)组装成一个大的报文后再传送给协议栈进行处理,因为内核协议栈对报文的处理都是对报文头部进行处理,如果相关的多个报文合并后只有一个报文头,这样就减少了协议栈处理报文个数,加快协议栈对报文的处理速度。

GRO是针对网络收报流程进行改进的,并且只有NAPI类型的驱动才能支持此功能。因此如果要支持GRO,不仅要内核支持,驱动也必须调用相应的接口来开启此功能。 用 ethtool -K ethX gro on/off 来开启/关闭 GRO ,如果报错就说明网卡驱动本身就不支持GRO。GRO虽然可以提升吞吐,但同时也会带来一定是时延增加。GRO功能对到本机的报文能起到一定的加速作用,但如果linux 运行在转发设备上,一般不需要使用GRO功能,这时使用GRO功能反而会降低处理速度

为什么只有NAPI 类型的驱动才能支持?

先看一下NAPI和非NAPI 处理流程的对比:

 process_backlog 函数实现中,是从 process_queue 队列取出skb后,直接就调用 __netif_receive_skb(skb);所以是一个一个的数据包发送。process_backlog 函数是内核实现的函数,在网络设备子系统初始化时就注册进内核,也不会更改,所以不支持GRO。

而napi_poll 函数 会调用napi_gro_receive 函数接口,就会有合并功能,后面会具体分析这个函数的实现。

目前 网卡驱动,都是走NAPI机制。

通过命令打开GRO 功能:

在 调用dev_gro_receive 函数收包时会判断是否支持GRO,代码如下:

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{u32 hash = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1);struct list_head *head = &offload_base;struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *gro_head;struct sk_buff *pp = NULL;enum gro_result ret;int same_flow;int grow;if (netif_elide_gro(skb->dev))goto normal;gro_head = gro_list_prepare(napi, skb);rcu_read_lock();list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_receive)continue;skb_set_network_header(skb, skb_gro_offset(skb));skb_reset_mac_len(skb);NAPI_GRO_CB(skb)->same_flow = 0;NAPI_GRO_CB(skb)->flush = skb_is_gso(skb) || skb_has_frag_list(skb);NAPI_GRO_CB(skb)->free = 0;NAPI_GRO_CB(skb)->encap_mark = 0;NAPI_GRO_CB(skb)->recursion_counter = 0;NAPI_GRO_CB(skb)->is_fou = 0;NAPI_GRO_CB(skb)->is_atomic = 1;NAPI_GRO_CB(skb)->gro_remcsum_start = 0;/* Setup for GRO checksum validation */switch (skb->ip_summed) {case CHECKSUM_COMPLETE:NAPI_GRO_CB(skb)->csum = skb->csum;NAPI_GRO_CB(skb)->csum_valid = 1;NAPI_GRO_CB(skb)->csum_cnt = 0;break;case CHECKSUM_UNNECESSARY:NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;NAPI_GRO_CB(skb)->csum_valid = 0;break;default:NAPI_GRO_CB(skb)->csum_cnt = 0;NAPI_GRO_CB(skb)->csum_valid = 0;}pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive,ipv6_gro_receive, inet_gro_receive,gro_head, skb);break;}rcu_read_unlock();....
pull:grow = skb_gro_offset(skb) - skb_headlen(skb);if (grow > 0)gro_pull_from_frag0(skb, grow);normal:ret = GRO_NORMAL;goto pull;}

if netif_elide_gro(skb->dev) 为真,就是支持GRO。

static inline bool netif_elide_gro(const struct net_device *dev)
{if (!(dev->features & NETIF_F_GRO) || dev->xdp_prog)return true;return false;
}

 GRO 与TSO类似,但TSO 只支持发送数据包。支持GRO的驱动会在NAPI的回调 poll 方法中读取数据包,然后调用GRO的接口napi_gro_receive 或者napi_gro_frags 来将数据包送进协议栈。

下面分析GRO处理数据包的流程(linux 内核版本:5.10.*)。

三、GRO 源码详解

网卡驱动在收包时会调用napi_gro_receive 函数接收数据:

先介绍一下数据结构:

1、struct napi_struct
struct napi_struct {/* The poll_list must only be managed by the entity which* changes the state of the NAPI_STATE_SCHED bit.  This means* whoever atomically sets that bit can add this napi_struct* to the per-CPU poll_list, and whoever clears that bit* can remove from the list right before clearing the bit.*/struct list_head        poll_list;unsigned long           state;int                     weight;int                     defer_hard_irqs_count;
//哪些gro 链表上有数据包unsigned long           gro_bitmask;int                     (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLLint                     poll_owner;
#endifstruct net_device       *dev;
//gro 的链表数组,数据包积攒struct gro_list         gro_hash[GRO_HASH_BUCKETS];struct sk_buff          *skb;struct list_head        rx_list; /* Pending GRO_NORMAL skbs */int                     rx_count; /* length of rx_list */ 最大值是8struct hrtimer          timer;struct list_head        dev_list;struct hlist_node       napi_hash_node;unsigned int            napi_id;
};

上面的数据成员 rx_count 最大值为8 ,定义如下:


/* Maximum number of GRO_NORMAL skbs to batch up for list-RX */
int gro_normal_batch __read_mostly = 8;
 

2、struct napi_gro_cb

GRO功能使用skb结构体内私有空间cb[48]来存放gro所用到的一些信息.

struct   napi_gro_cb
{/*指向存在skb_shinfo(skb)->frag[0].page页的数据的头部,GRO使用过程中,如果skb是线性的,就置为空。如果是非线性的并且报文头部全部存在非线性区中,就指向页中的数据起始部分*/void *frag0;/*第一页中数据的长度,如果frag0 字段不为空,就设置该字段,否则为0。( Length of frag0.)*/unsigned int frag0_len;/*This indicates where we are processingrelative to skb->data.表明skb->data到GRO需要处理的数据区的偏移量。因为在GRO合并处理过程中skb->data是不能被改变的,所以需要使用该字段来记录一下偏移量。GRO处理过程中根据该记录值快速找到要处理的数据部分。比如进入ip层进行GRO处理,这时skb->data指向ip 头,而ip层的gro 正好要处理ip头,这时偏移量就为0.进入传输层后进行GRO处理,这时skb->data还指向ip头,而tcp层gro要处理tcp头,这时偏移量就是ip头部长度。*/int data_offset;/*This is non-zero if the packet may be of the same flow.标记挂在napi->gro_list上的报文是否跟现在的报文进行匹配。每层的gro_receive都设置该标记位。接收到一个报文后,使用该报文和挂在napi->gro_list 上的报文进行匹配。在链路层,使用dev 和 mac头进行匹配,如果一样表示两个报文是通一个设备发过来的,就标记napi->gro_list上对应的skb的same为1.到网络层,再进一步进行匹配时,只需跟napi->list上刚被链路层标记same为1的报文进行网络层的匹配即可,不需再跟每个报文进行匹配。如果网络层不匹配,就清除该标记。到传输层,也是只配置被网络层标记same为1 的报文即可。这样设计为的是减少没必要的匹配操作*/int same_flow;/*This is non-zero if the packet cannot be mergedwith the new skb.如果该字段不为0,表示该数据报文没必要再等待合并,可以直接送进协议栈进行处理了*/int flush;/*该报文被合并过的次数 ,Number of segments aggregated. */int count;/* Free the skb? ,是否该被丢弃*/int free;
};
#define NAPI_GRO_CB(skb) ((struct napi_gro_cb *)(skb)->cb)
3、每个协议中定义自己的GRO接收合并函数和合并后处理函数

接收合并函数定义:

struct sk_buff**(*gro_receive)(struct sk_buff **head,struct sk_buff *skb);

参数:

head:等待合并的skb链表头

skb:接收到的skb。

返回值:

如果为空,表示报文被合并后不需要现在送入协议栈。

如果不为空,表示返回的报文需要立即送入协议栈。

合并后处理函数定义:

int(*gro_complete)(struct sk_buff *skb);

该函数对合并好的报文进行进一步加工,比如更新校验和。

4、napi_gro_receive
static inline void skb_gro_reset_offset(struct sk_buff *skb, u32 nhoff)
{const struct skb_shared_info *pinfo = skb_shinfo(skb);const skb_frag_t *frag0 = &pinfo->frags[0];NAPI_GRO_CB(skb)->data_offset = 0;NAPI_GRO_CB(skb)->frag0 = NULL;NAPI_GRO_CB(skb)->frag0_len = 0;//skb_headlen为0,则说明包头保存在skb_shinfo中if (!skb_headlen(skb) && pinfo->nr_frags &&!PageHighMem(skb_frag_page(frag0)) &&(!NET_IP_ALIGN || !((skb_frag_off(frag0) + nhoff) & 3))) {NAPI_GRO_CB(skb)->frag0 = skb_frag_address(frag0);NAPI_GRO_CB(skb)->frag0_len = min_t(unsigned int,skb_frag_size(frag0),skb->end - skb->tail);}
}gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{gro_result_t ret;skb_mark_napi_id(skb, napi);trace_napi_gro_receive_entry(skb);skb_gro_reset_offset(skb, 0);ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));trace_napi_gro_receive_exit(ret);return ret;
}
EXPORT_SYMBOL(napi_gro_receive);

GRO将数据送进协议栈的点有两处:

一、在napi_skb_finish里,它会通过判断dev_gro_receive 的返回值,来决定是否需要将数据包送入进协议栈;

二、当napi的循环执行完毕时,执行 napi_complete 的时候。 

5、skb_gro_reset_offset

根据napi_gro_receive 函数,在调用napi_skb_finish 之前,会调用skb_gro_reset_offset 进行skb 处理;

首先要知道一种情况,那就是skb本身不包含数据(包括头也没有),而所有的数据都保存在skb_shared_info中(支持S/G的网卡有可能会这么做).此时我们如果想要合并的话,就需要将包头这些信息取出来,也就是从skb_shared_info的frags[0]中去的,在 skb_gro_reset_offset中就有做这个事情,而这里就会把头的信息保存到napi_gro_cb 的frags0中。并且此时frags必然不会在high mem,要么是线性区,要么是dma(S/G io)。来看skb_gro_reset_offset

static inline void skb_gro_reset_offset(struct sk_buff *skb, u32 nhoff)
{const struct skb_shared_info *pinfo = skb_shinfo(skb);const skb_frag_t *frag0 = &pinfo->frags[0];NAPI_GRO_CB(skb)->data_offset = 0;NAPI_GRO_CB(skb)->frag0 = NULL;NAPI_GRO_CB(skb)->frag0_len = 0;
如果mac_header和skb->tail相等并且地址不在高端内存,则说明包头保存在skb_shinfo中,所以我们需要从frags中取得对应的数据包if (!skb_headlen(skb) && pinfo->nr_frags &&!PageHighMem(skb_frag_page(frag0)) &&(!NET_IP_ALIGN || !((skb_frag_off(frag0) + nhoff) & 3))) {// 可以看到frag0保存的就是对应的skb的frags的第一个元素的地址NAPI_GRO_CB(skb)->frag0 = skb_frag_address(frag0);
//然后保存对应的大小NAPI_GRO_CB(skb)->frag0_len = min_t(unsigned int,skb_frag_size(frag0),skb->end - skb->tail);}
}

 

 从上面数据结构的包含关系可知, end 指针指向的位置 是 skb_shared_info 结构的开始位置。

6、napi_skb _finish
static gro_result_t napi_skb_finish(struct napi_struct *napi,struct sk_buff *skb,gro_result_t ret)
{switch (ret) {case GRO_NORMAL://将数据包送进协议栈gro_normal_one(napi, skb, 1);break;case GRO_DROP:kfree_skb(skb);break;case GRO_MERGED_FREE://表示skb可以被free,因为GRO已经将skb合并并保存起来if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)napi_skb_free_stolen_head(skb);else__kfree_skb(skb);break;case GRO_HELD://这个表示当前数据已经被GRO保存起来,但是并没有进行合并,因此skb还需要保存。case GRO_MERGED:case GRO_CONSUMED:break;}return ret;
}

 这个函数,是根据dev_gro_receive 函数执行之后的结果,来执行不行的逻辑。如果是 GRO_NORMAL ,调用gro_normal_one 将数据包送进协议栈。

7、dev_gro_receive

dev_gro_receive 函数用于合并skb,并决定是否将合并后的大skb 送入网络协议栈:

INDIRECT_CALLABLE_DECLARE(struct sk_buff *inet_gro_receive(struct list_head *,struct sk_buff *));
INDIRECT_CALLABLE_DECLARE(struct sk_buff *ipv6_gro_receive(struct list_head *,struct sk_buff *));
static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{u32 hash = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1);struct list_head *head = &offload_base;struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *gro_head;struct sk_buff *pp = NULL;enum gro_result ret;int same_flow;int grow;if (netif_elide_gro(skb->dev))//判断是否支持GROgoto normal;gro_head = gro_list_prepare(napi, skb);//比较GRO队列中的skb与当前skb在链路层是否属于同一个流rcu_read_lock();list_for_each_entry_rcu(ptype, head, list) {//遍历GRO处理函数注册队列if (ptype->type != type || !ptype->callbacks.gro_receive)//查找网络层GRO处理函数continue;skb_set_network_header(skb, skb_gro_offset(skb));skb_reset_mac_len(skb);NAPI_GRO_CB(skb)->same_flow = 0;//skb是GSO或skb有非线性空间的数据;这个skb已经是聚合状态了,不需要再次聚合NAPI_GRO_CB(skb)->flush = skb_is_gso(skb) || skb_has_frag_list(skb);NAPI_GRO_CB(skb)->free = 0;NAPI_GRO_CB(skb)->encap_mark = 0;NAPI_GRO_CB(skb)->recursion_counter = 0;NAPI_GRO_CB(skb)->is_fou = 0;NAPI_GRO_CB(skb)->is_atomic = 1;NAPI_GRO_CB(skb)->gro_remcsum_start = 0;/* Setup for GRO checksum validation */switch (skb->ip_summed) {case CHECKSUM_COMPLETE:NAPI_GRO_CB(skb)->csum = skb->csum;NAPI_GRO_CB(skb)->csum_valid = 1;NAPI_GRO_CB(skb)->csum_cnt = 0;break;case CHECKSUM_UNNECESSARY:NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;NAPI_GRO_CB(skb)->csum_valid = 0;break;default:NAPI_GRO_CB(skb)->csum_cnt = 0;NAPI_GRO_CB(skb)->csum_valid = 0;}pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive,ipv6_gro_receive, inet_gro_receive,gro_head, skb);//调用inet_gro_receive或ipv6_gro_receive合并skbbreak;}rcu_read_unlock();if (&ptype->list == head)goto normal;if (PTR_ERR(pp) == -EINPROGRESS) {ret = GRO_CONSUMED;goto ok;}same_flow = NAPI_GRO_CB(skb)->same_flow;ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;if (pp) {skb_list_del_init(pp);napi_gro_complete(napi, pp);//更新聚合后的skb信息,将包送入协议栈napi->gro_hash[hash].count--;}if (same_flow)//找到与当前skb属与同一个流的skb,此时当前skb已经聚合到所属的流中goto ok;if (NAPI_GRO_CB(skb)->flush)//skb不能聚合或GRO队列已满goto normal;if (unlikely(napi->gro_hash[hash].count >= MAX_GRO_SKBS)) {gro_flush_oldest(napi, gro_head);} else {napi->gro_hash[hash].count++;}NAPI_GRO_CB(skb)->count = 1;NAPI_GRO_CB(skb)->age = jiffies;NAPI_GRO_CB(skb)->last = skb;skb_shinfo(skb)->gso_size = skb_gro_len(skb);list_add(&skb->list, gro_head);//将skb是一个新包,在gro_list中没有能与之合并的,需要加入到GRO队列中ret = GRO_HELD;//存储当前包,不能释放pull:grow = skb_gro_offset(skb) - skb_headlen(skb);if (grow > 0)//如果头不全部在线性区,则需要将其copy到线性区gro_pull_from_frag0(skb, grow);
ok:if (napi->gro_hash[hash].count) {if (!test_bit(hash, &napi->gro_bitmask))__set_bit(hash, &napi->gro_bitmask);} else if (test_bit(hash, &napi->gro_bitmask)) {__clear_bit(hash, &napi->gro_bitmask);}return ret;normal:ret = GRO_NORMAL;goto pull;
}static void gro_pull_from_frag0(struct sk_buff *skb, int grow)
{struct skb_shared_info *pinfo = skb_shinfo(skb);BUG_ON(skb->end - skb->tail < grow);memcpy(skb_tail_pointer(skb), NAPI_GRO_CB(skb)->frag0, grow);//将第一个页的内容转移到线性空间skb->data_len -= grow;skb->tail += grow;skb_frag_off_add(&pinfo->frags[0], grow);skb_frag_size_sub(&pinfo->frags[0], grow);if (unlikely(!skb_frag_size(&pinfo->frags[0]))) {//第一个页的内容已经全部转移到线性空间skb_frag_unref(skb, 0);memmove(pinfo->frags, pinfo->frags + 1,--pinfo->nr_frags * sizeof(pinfo->frags[0]));//数组内容向前移动}
}
8、每一层协议都实现了自己的gro回调函数,gro_receive和gro_complete

 gro系统会根据协议来调用对应回调函数,其中gro_receive是将输入skb尽量合并到我们gro_list中。而gro_complete则是当我们需要提交gro合并的数据包到协议栈时被调用的。

下面就是ip层和tcp层对应的回调方法:

/**      IP protocol layer initialiser*/static struct packet_offload ip_packet_offload __read_mostly = {.type = cpu_to_be16(ETH_P_IP),.callbacks = {.gso_segment = inet_gso_segment,.gro_receive = inet_gro_receive,.gro_complete = inet_gro_complete,},
};static const struct net_offload ipip_offload = {.callbacks = {.gso_segment    = ipip_gso_segment,.gro_receive    = ipip_gro_receive,.gro_complete   = ipip_gro_complete,},
};static int __init ipip_offload_init(void)
{return inet_add_offload(&ipip_offload, IPPROTO_IPIP);
}static int __init ipv4_offload_init(void)
{/** Add offloads*/if (udpv4_offload_init() < 0)pr_crit("%s: Cannot add UDP protocol offload\n", __func__);if (tcpv4_offload_init() < 0)pr_crit("%s: Cannot add TCP protocol offload\n", __func__);if (ipip_offload_init() < 0)pr_crit("%s: Cannot add IPIP protocol offload\n", __func__);dev_add_offload(&ip_packet_offload);return 0;
}fs_initcall(ipv4_offload_init);

通过函数dev_add_offload  和inet_add_offload 注册进系统。

什么时候调用gro_receive 函数?

在dev_gro_receive 函数中,会调用 gro_receive 函数进行 合并skb.

INDIRECT_CALLABLE_DECLARE(struct sk_buff *inet_gro_receive(struct list_head *,struct sk_buff *));
INDIRECT_CALLABLE_DECLARE(struct sk_buff *ipv6_gro_receive(struct list_head *,struct sk_buff *));
static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{...pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive,ipv6_gro_receive, inet_gro_receive,gro_head, skb);
....}

什么时候调用gro_complete 函数?

在dev_gro_receive 函数中,当调用gro_receive函数进行合并skb 之后,会调用 napi_gro_complete函数。在napi_gro_complete中,会调用gro_complete函数。

如下:

INDIRECT_CALLABLE_DECLARE(int inet_gro_complete(struct sk_buff *, int));
INDIRECT_CALLABLE_DECLARE(int ipv6_gro_complete(struct sk_buff *, int));
int napi_gro_complete(struct napi_struct *napi, struct sk_buff *skb)
{struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *head = &offload_base;int err = -ENOENT;BUILD_BUG_ON(sizeof(struct napi_gro_cb) > sizeof(skb->cb));if (NAPI_GRO_CB(skb)->count == 1) {skb_shinfo(skb)->gso_size = 0;goto out;}rcu_read_lock();list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_complete)continue;err = INDIRECT_CALL_INET(ptype->callbacks.gro_complete,ipv6_gro_complete, inet_gro_complete,skb, 0);break;}rcu_read_unlock();if (err) {WARN_ON(&ptype->list == head);kfree_skb(skb);return NET_RX_SUCCESS;}out:gro_normal_one(napi, skb, NAPI_GRO_CB(skb)->count);return NET_RX_SUCCESS;
}
EXPORT_SYMBOL(napi_gro_complete);
9、inet_gro_receive

inet_gro_receive函数是网络层skb 聚合处理函数:

struct sk_buff *inet_gro_receive(struct list_head *head, struct sk_buff *skb)
{const struct net_offload *ops;struct sk_buff *pp = NULL;const struct iphdr *iph;struct sk_buff *p;unsigned int hlen;unsigned int off;unsigned int id;int flush = 1;int proto;off = skb_gro_offset(skb);hlen = off + sizeof(*iph);//MAC + IP头长度iph = skb_gro_header_fast(skb, off);if (skb_gro_header_hard(skb, hlen)) {iph = skb_gro_header_slow(skb, hlen, off);if (unlikely(!iph))goto out;}proto = iph->protocol;rcu_read_lock();ops = rcu_dereference(inet_offloads[proto]);//找到传输层注册的处理函数if (!ops || !ops->callbacks.gro_receive)goto out_unlock;if (*(u8 *)iph != 0x45)//不是IPv4且头长度不是20字节goto out_unlock;if (ip_is_fragment(iph))goto out_unlock;if (unlikely(ip_fast_csum((u8 *)iph, 5)))//检验和非法goto out_unlock;id = ntohl(*(__be32 *)&iph->id);flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));id >>= 16;list_for_each_entry(p, head, list) {struct iphdr *iph2;u16 flush_id;if (!NAPI_GRO_CB(p)->same_flow)//与当前包不是一个流continue;iph2 = (struct iphdr *)(p->data + off);/* The above works because, with the exception of the top* (inner most) layer, we only aggregate pkts with the same* hdr length so all the hdrs we'll need to verify will start* at the same offset.*/if ((iph->protocol ^ iph2->protocol) |((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {//三元组不匹配,与当前包不是一个流NAPI_GRO_CB(p)->same_flow = 0;continue;}/* All fields must match except length and checksum. */NAPI_GRO_CB(p)->flush |=(iph->ttl ^ iph2->ttl) |(iph->tos ^ iph2->tos) |((iph->frag_off ^ iph2->frag_off) & htons(IP_DF));//检查ttl、tos、id顺序,如果不符合则不是一个流NAPI_GRO_CB(p)->flush |= flush;/* We need to store of the IP ID check to be included later* when we can verify that this packet does in fact belong* to a given flow.*/flush_id = (u16)(id - ntohs(iph2->id));/* This bit of code makes it much easier for us to identify* the cases where we are doing atomic vs non-atomic IP ID* checks.  Specifically an atomic check can return IP ID* values 0 - 0xFFFF, while a non-atomic check can only* return 0 or 0xFFFF.*/if (!NAPI_GRO_CB(p)->is_atomic ||!(iph->frag_off & htons(IP_DF))) {flush_id ^= NAPI_GRO_CB(p)->count;flush_id = flush_id ? 0xFFFF : 0;}/* If the previous IP ID value was based on an atomic* datagram we can overwrite the value and ignore it.*/if (NAPI_GRO_CB(skb)->is_atomic)NAPI_GRO_CB(p)->flush_id = flush_id;elseNAPI_GRO_CB(p)->flush_id |= flush_id;}NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF));NAPI_GRO_CB(skb)->flush |= flush;skb_set_network_header(skb, off);/* The above will be needed by the transport layer if there is one* immediately following this IP hdr.*//* Note : No need to call skb_gro_postpull_rcsum() here,* as we already checked checksum over ipv4 header was 0*/skb_gro_pull(skb, sizeof(*iph));skb_set_transport_header(skb, skb_gro_offset(skb));pp = indirect_call_gro_receive(tcp4_gro_receive, udp4_gro_receive,ops->callbacks.gro_receive, head, skb);//指向tcp4_gro_receive或tcp6_gro_receiveout_unlock:rcu_read_unlock();out:skb_gro_flush_final(skb, pp, flush);return pp;
}
EXPORT_SYMBOL(inet_gro_receive);
10、tcp4_gro_receive 

tcp4_grp_receive 函数是传输层skb 聚合处理函数:

INDIRECT_CALLABLE_SCOPE
struct sk_buff *tcp4_gro_receive(struct list_head *head, struct sk_buff *skb)
{/* Don't bother verifying checksum if we're going to flush anyway. */if (!NAPI_GRO_CB(skb)->flush &&skb_gro_checksum_validate(skb, IPPROTO_TCP,inet_gro_compute_pseudo)) {NAPI_GRO_CB(skb)->flush = 1;return NULL;}return tcp_gro_receive(head, skb);
}struct sk_buff *tcp_gro_receive(struct list_head *head, struct sk_buff *skb)
{struct sk_buff *pp = NULL;struct sk_buff *p;struct tcphdr *th;struct tcphdr *th2;unsigned int len;unsigned int thlen;__be32 flags;unsigned int mss = 1;unsigned int hlen;unsigned int off;int flush = 1;int i;off = skb_gro_offset(skb);hlen = off + sizeof(*th);th = skb_gro_header_fast(skb, off);if (skb_gro_header_hard(skb, hlen)) {th = skb_gro_header_slow(skb, hlen, off);if (unlikely(!th))goto out;}thlen = th->doff * 4;if (thlen < sizeof(*th))goto out;hlen = off + thlen;if (skb_gro_header_hard(skb, hlen)) {th = skb_gro_header_slow(skb, hlen, off);if (unlikely(!th))goto out;}skb_gro_pull(skb, thlen);len = skb_gro_len(skb);flags = tcp_flag_word(th);list_for_each_entry(p, head, list) {if (!NAPI_GRO_CB(p)->same_flow)//IP层与当前包不是一个流continue;th2 = tcp_hdr(p);if (*(u32 *)&th->source ^ *(u32 *)&th2->source) {//源端口不匹配,与当前包不是一个流NAPI_GRO_CB(p)->same_flow = 0;continue;}goto found;}p = NULL;goto out_check_final;found:/* Include the IP ID check below from the inner most IP hdr */flush = NAPI_GRO_CB(p)->flush;flush |= (__force int)(flags & TCP_FLAG_CWR);//发生了拥塞,那么前面被缓存的数据包需要马上被送入协议栈,以便进行TCP的拥塞控制flush |= (__force int)((flags ^ tcp_flag_word(th2)) &~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH));//如果控制标志位除了CWR、FIN、PSH外有不相同的则需要马上被送入协议栈flush |= (__force int)(th->ack_seq ^ th2->ack_seq);//确认号不相同则需要马上被送入协议栈,以便尽快释放TCP发送缓存中的空间for (i = sizeof(*th); i < thlen; i += 4)flush |= *(u32 *)((u8 *)th + i) ^*(u32 *)((u8 *)th2 + i);/* When we receive our second frame we can made a decision on if we* continue this flow as an atomic flow with a fixed ID or if we use* an incrementing ID.*/if (NAPI_GRO_CB(p)->flush_id != 1 ||NAPI_GRO_CB(p)->count != 1 ||!NAPI_GRO_CB(p)->is_atomic)flush |= NAPI_GRO_CB(p)->flush_id;elseNAPI_GRO_CB(p)->is_atomic = false;mss = skb_shinfo(p)->gso_size;flush |= (len - 1) >= mss;flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq);
#ifdef CONFIG_TLS_DEVICEflush |= p->decrypted ^ skb->decrypted;
#endifif (flush || skb_gro_receive(p, skb)) {//flush非0意味着需要将skb立即送入协议栈;这样的包不能调用skb_gro_receive进行合并mss = 1;goto out_check_final;}tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH);out_check_final:flush = len < mss;flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH |TCP_FLAG_RST | TCP_FLAG_SYN |TCP_FLAG_FIN));//如果包中有URG、PSH、RST、SYN、FIN标记中的任意一个,则将合并后的包立即交付协议栈if (p && (!NAPI_GRO_CB(skb)->same_flow || flush))pp = p;out:NAPI_GRO_CB(skb)->flush |= (flush != 0);return pp;
}

这里有一个疑问:为什么匹配TCP流时只检查源端口而不检查目的端口?

猜测:因为检查了 seq 和ack_seq .源IP 和源端口相同意味着包来自同一台主机,这种情况下,seq 和ack_seq 都匹配且目的端口不同的概率极低,所以不用检查目的端口。

11、skb_gro_receive 

skb_gro_receive 函数用于合并同流的skb:

int skb_gro_receive(struct sk_buff *p, struct sk_buff *skb)
{struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb);unsigned int offset = skb_gro_offset(skb);unsigned int headlen = skb_headlen(skb);unsigned int len = skb_gro_len(skb);unsigned int delta_truesize;struct sk_buff *lp;if (unlikely(p->len + len >= 65536 || NAPI_GRO_CB(skb)->flush))return -E2BIG;lp = NAPI_GRO_CB(p)->last;pinfo = skb_shinfo(lp);if (headlen <= offset) {//有一部分头在page中skb_frag_t *frag;skb_frag_t *frag2;int i = skbinfo->nr_frags;int nr_frags = pinfo->nr_frags + i;if (nr_frags > MAX_SKB_FRAGS)goto merge;offset -= headlen;pinfo->nr_frags = nr_frags;skbinfo->nr_frags = 0;frag = pinfo->frags + nr_frags;frag2 = skbinfo->frags + i;do {//遍历赋值,将skb的frag加到pinfo的frgas后面*--frag = *--frag2;} while (--i);skb_frag_off_add(frag, offset);//去除剩余的头,只保留数据部分skb_frag_size_sub(frag, offset);/* all fragments truesize : remove (head size + sk_buff) */delta_truesize = skb->truesize -SKB_TRUESIZE(skb_end_offset(skb));skb->truesize -= skb->data_len;skb->len -= skb->data_len;skb->data_len = 0;NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE;goto done;} else if (skb->head_frag) {//支持分散-聚集IOint nr_frags = pinfo->nr_frags;skb_frag_t *frag = pinfo->frags + nr_frags;struct page *page = virt_to_head_page(skb->head);unsigned int first_size = headlen - offset;unsigned int first_offset;if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS)goto merge;first_offset = skb->data -(unsigned char *)page_address(page) +offset;pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags;__skb_frag_set_page(frag, page);skb_frag_off_set(frag, first_offset);skb_frag_size_set(frag, first_size);memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags);/* We dont need to clear skbinfo->nr_frags here */delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff));NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD;goto done;}merge:delta_truesize = skb->truesize;if (offset > headlen) {unsigned int eat = offset - headlen;skb_frag_off_add(&skbinfo->frags[0], eat);skb_frag_size_sub(&skbinfo->frags[0], eat);skb->data_len -= eat;skb->len -= eat;offset = headlen;}__skb_pull(skb, offset);if (NAPI_GRO_CB(p)->last == p)skb_shinfo(p)->frag_list = skb;//将旧GRO队列头放入frag_list队列中elseNAPI_GRO_CB(p)->last->next = skb;//将包放入GRO队列中NAPI_GRO_CB(p)->last = skb;__skb_header_release(skb);lp = p;done:NAPI_GRO_CB(p)->count++;p->data_len += len;p->truesize += delta_truesize;p->len += len;if (lp != p) {lp->data_len += len;lp->truesize += delta_truesize;lp->len += len;}NAPI_GRO_CB(skb)->same_flow = 1;//标识当前skb已经找到同流的skb并进行了合并return 0;
}

可见当网卡支持分散-聚合IO时,GRO会将多个skb 合并到一个skb 的frag page 数组中, 否则会合并到skb 的frag_list 中。

即使在上述流程中skb被放入GRO队列中保存而没有被立刻送入协议栈,他们也不会在队列中滞留太久时间,因为在收包软中断会调用napi_gro_flush 函数将GRO队列中的包送入协议栈。

12、napi_gro_flush
static void __napi_gro_flush_chain(struct napi_struct *napi, u32 index,bool flush_old)
{struct list_head *head = &napi->gro_hash[index].list;struct sk_buff *skb, *p;list_for_each_entry_safe_reverse(skb, p, head, list) {if (flush_old && NAPI_GRO_CB(skb)->age == jiffies)return;skb_list_del_init(skb);napi_gro_complete(napi, skb);//将包送入协议栈napi->gro_hash[index].count--;}if (!napi->gro_hash[index].count)__clear_bit(index, &napi->gro_bitmask);
}/* napi->gro_hash[].list contains packets ordered by age.* youngest packets at the head of it.* Complete skbs in reverse order to reduce latencies.*/
void napi_gro_flush(struct napi_struct *napi, bool flush_old)
{unsigned long bitmask = napi->gro_bitmask;unsigned int i, base = ~0U;while ((i = ffs(bitmask)) != 0) {bitmask >>= i;base += i;__napi_gro_flush_chain(napi, base, flush_old);}
}
EXPORT_SYMBOL(napi_gro_flush);

上面的函数也是 调用 napi_gro_complete 将包送入协议栈。

包加入GRO队列的时间比当前仅晚1个jiffies 也会被视作旧包并交付协议栈处理,可见如果软中断每个jiffies 都调用一次napi_gro_flush 函数的话,开启GRO功能最多增加1个jiffies(1ms 或10ms) 的延迟。

13、napi_gro_complete 

napi_gro_complete 函数会先处理一下聚合包各层协议的首部信息,再将包交付网络协议栈:

INDIRECT_CALLABLE_DECLARE(int inet_gro_complete(struct sk_buff *, int));
INDIRECT_CALLABLE_DECLARE(int ipv6_gro_complete(struct sk_buff *, int));
static int napi_gro_complete(struct napi_struct *napi, struct sk_buff *skb)
{struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *head = &offload_base;int err = -ENOENT;BUILD_BUG_ON(sizeof(struct napi_gro_cb) > sizeof(skb->cb));if (NAPI_GRO_CB(skb)->count == 1) {skb_shinfo(skb)->gso_size = 0;goto out;}rcu_read_lock();list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_complete)continue;err = INDIRECT_CALL_INET(ptype->callbacks.gro_complete,ipv6_gro_complete, inet_gro_complete,//指向inet_gro_complete或ipv6_gro_completeskb, 0);break;}rcu_read_unlock();if (err) {WARN_ON(&ptype->list == head);kfree_skb(skb);return NET_RX_SUCCESS;}out:gro_normal_one(napi, skb, NAPI_GRO_CB(skb)->count);return NET_RX_SUCCESS;
}/* Queue one GRO_NORMAL SKB up for list processing. If batch size exceeded,* pass the whole batch up to the stack.*/
更新当前 napi->rx_count 计数, 当数量达到gro_normal_batch时,将调用 gro_normal_list 函数,将多个包一次性送到协议栈。
static void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb, int segs)
{list_add_tail(&skb->list, &napi->rx_list);napi->rx_count += segs;if (napi->rx_count >= gro_normal_batch)gro_normal_list(napi);
}
14、inet_gro_complete

inet_gro_complete 函数处理网络层首部信息:

int inet_gro_complete(struct sk_buff *skb, int nhoff)
{__be16 newlen = htons(skb->len - nhoff);struct iphdr *iph = (struct iphdr *)(skb->data + nhoff);const struct net_offload *ops;int proto = iph->protocol;int err = -ENOSYS;if (skb->encapsulation) {skb_set_inner_protocol(skb, cpu_to_be16(ETH_P_IP));skb_set_inner_network_header(skb, nhoff);}csum_replace2(&iph->check, iph->tot_len, newlen);//重新计算IP检验和,因为聚合在一起的包共用一个IP头iph->tot_len = newlen;//重新计算包长rcu_read_lock();ops = rcu_dereference(inet_offloads[proto]);if (WARN_ON(!ops || !ops->callbacks.gro_complete))goto out_unlock;/* Only need to add sizeof(*iph) to get to the next hdr below* because any hdr with option will have been flushed in* inet_gro_receive().*/err = INDIRECT_CALL_2(ops->callbacks.gro_complete,tcp4_gro_complete, udp4_gro_complete,//指向tcp4_gro_complete或tcp6_gro_completeskb, nhoff + sizeof(*iph));out_unlock:rcu_read_unlock();return err;
}
EXPORT_SYMBOL(inet_gro_complete);
15、tcp4_gro_complete

tcp4_gro_complete 函数处理TCP首部信息

INDIRECT_CALLABLE_SCOPE int tcp4_gro_complete(struct sk_buff *skb, int thoff)
{const struct iphdr *iph = ip_hdr(skb);struct tcphdr *th = tcp_hdr(skb);th->check = ~tcp_v4_check(skb->len - thoff, iph->saddr,iph->daddr, 0);//重算聚合后的包的TCP检验和skb_shinfo(skb)->gso_type |= SKB_GSO_TCPV4;if (NAPI_GRO_CB(skb)->is_atomic)skb_shinfo(skb)->gso_type |= SKB_GSO_TCP_FIXEDID;return tcp_gro_complete(skb);
}int tcp_gro_complete(struct sk_buff *skb)
{struct tcphdr *th = tcp_hdr(skb);skb->csum_start = (unsigned char *)th - skb->head;skb->csum_offset = offsetof(struct tcphdr, check);skb->ip_summed = CHECKSUM_PARTIAL;skb_shinfo(skb)->gso_segs = NAPI_GRO_CB(skb)->count;if (th->cwr)skb_shinfo(skb)->gso_type |= SKB_GSO_TCP_ECN;if (skb->encapsulation)skb->inner_transport_header = skb->transport_header;return 0;
}
EXPORT_SYMBOL(tcp_gro_complete);

 可见,GRO 的基本原理是将MAC层、IP层、TCP层都能合并的包的头只留一个,数据部分在frag 数组或 frag_list 中存储,这样大大提高了包携带数据的效率。

在完成GRO处理后,skb 会被交付到linux 网络协议栈入口进行协议处理。聚合后的skb 在被送入到网络协议栈后,在网络层协议、TCP协议处理函数中会调用pskb_may_pull 函数将GRO skb的数据整合到线性空间。

16、tcp_v4_rcv

/**      From tcp_input.c*/int tcp_v4_rcv(struct sk_buff *skb)
{struct net *net = dev_net(skb->dev);struct sk_buff *skb_to_free;int sdif = inet_sdif(skb);int dif = inet_iif(skb);const struct iphdr *iph;const struct tcphdr *th;bool refcounted;struct sock *sk;int ret;if (skb->pkt_type != PACKET_HOST)goto discard_it;/* Count it even if it's bad */__TCP_INC_STATS(net, TCP_MIB_INSEGS);if (!pskb_may_pull(skb, sizeof(struct tcphdr)))//将TCP基本首部整合到线性空间goto discard_it;th = (const struct tcphdr *)skb->data;if (unlikely(th->doff < sizeof(struct tcphdr) / 4))goto bad_packet;if (!pskb_may_pull(skb, th->doff * 4))//将TCP基本首部整合到线性空间goto discard_it;/* An explanation is required here, I think.* Packet length and doff are validated by header prediction,* provided case of th->doff==0 is eliminated.* So, we defer the checks. */if (skb_checksum_init(skb, IPPROTO_TCP, inet_compute_pseudo))goto csum_error;th = (const struct tcphdr *)skb->data;iph = ip_hdr(skb);
lookup:sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,th->dest, sdif, &refcounted);if (!sk)goto no_tcp_socket;process:if (sk->sk_state == TCP_TIME_WAIT)goto do_time_wait;if (sk->sk_state == TCP_NEW_SYN_RECV) {struct request_sock *req = inet_reqsk(sk);bool req_stolen = false;struct sock *nsk;sk = req->rsk_listener;if (unlikely(tcp_v4_inbound_md5_hash(sk, skb, dif, sdif))) {sk_drops_add(sk, skb);reqsk_put(req);goto discard_it;}if (tcp_checksum_complete(skb)) {reqsk_put(req);goto csum_error;}if (unlikely(sk->sk_state != TCP_LISTEN)) {inet_csk_reqsk_queue_drop_and_put(sk, req);goto lookup;}/* We own a reference on the listener, increase it again* as we might lose it too soon.*/sock_hold(sk);refcounted = true;nsk = NULL;if (!tcp_filter(sk, skb)) {th = (const struct tcphdr *)skb->data;iph = ip_hdr(skb);tcp_v4_fill_cb(skb, iph, th);nsk = tcp_check_req(sk, skb, req, false, &req_stolen);}if (!nsk) {reqsk_put(req);if (req_stolen) {/* Another cpu got exclusive access to req* and created a full blown socket.* Try to feed this packet to this socket* instead of discarding it.*/tcp_v4_restore_cb(skb);sock_put(sk);goto lookup;}goto discard_and_relse;}if (nsk == sk) {reqsk_put(req);tcp_v4_restore_cb(skb);} else if (tcp_child_process(sk, nsk, skb)) {tcp_v4_send_reset(nsk, skb);goto discard_and_relse;} else {sock_put(sk);return 0;}}if (unlikely(iph->ttl < inet_sk(sk)->min_ttl)) {__NET_INC_STATS(net, LINUX_MIB_TCPMINTTLDROP);goto discard_and_relse;}if (!xfrm4_policy_check(sk, XFRM_POLICY_IN, skb))goto discard_and_relse;if (tcp_v4_inbound_md5_hash(sk, skb, dif, sdif))goto discard_and_relse;nf_reset_ct(skb);if (tcp_filter(sk, skb))goto discard_and_relse;th = (const struct tcphdr *)skb->data;iph = ip_hdr(skb);tcp_v4_fill_cb(skb, iph, th);skb->dev = NULL;if (sk->sk_state == TCP_LISTEN) {ret = tcp_v4_do_rcv(sk, skb);goto put_and_return;}sk_incoming_cpu_update(sk);bh_lock_sock_nested(sk);tcp_segs_in(tcp_sk(sk), skb);ret = 0;if (!sock_owned_by_user(sk)) {skb_to_free = sk->sk_rx_skb_cache;sk->sk_rx_skb_cache = NULL;ret = tcp_v4_do_rcv(sk, skb);} else {if (tcp_add_backlog(sk, skb))goto discard_and_relse;skb_to_free = NULL;}bh_unlock_sock(sk);if (skb_to_free)__kfree_skb(skb_to_free);put_and_return:if (refcounted)sock_put(sk);return ret;no_tcp_socket:if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb))goto discard_it;tcp_v4_fill_cb(skb, iph, th);if (tcp_checksum_complete(skb)) {
csum_error:__TCP_INC_STATS(net, TCP_MIB_CSUMERRORS);
bad_packet:__TCP_INC_STATS(net, TCP_MIB_INERRS);} else {tcp_v4_send_reset(NULL, skb);}discard_it:/* Discard frame. */kfree_skb(skb);return 0;discard_and_relse:sk_drops_add(sk, skb);if (refcounted)sock_put(sk);goto discard_it;do_time_wait:if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {inet_twsk_put(inet_twsk(sk));goto discard_it;}tcp_v4_fill_cb(skb, iph, th);if (tcp_checksum_complete(skb)) {inet_twsk_put(inet_twsk(sk));goto csum_error;}switch (tcp_timewait_state_process(inet_twsk(sk), skb, th)) {case TCP_TW_SYN: {struct sock *sk2 = inet_lookup_listener(dev_net(skb->dev),&tcp_hashinfo, skb,__tcp_hdrlen(th),iph->saddr, th->source,iph->daddr, th->dest,inet_iif(skb),sdif);if (sk2) {inet_twsk_deschedule_put(inet_twsk(sk));sk = sk2;tcp_v4_restore_cb(skb);refcounted = false;goto process;}}/* to ACK */fallthrough;case TCP_TW_ACK:tcp_v4_timewait_ack(sk, skb);break;case TCP_TW_RST:tcp_v4_send_reset(sk, skb);inet_twsk_deschedule_put(inet_twsk(sk));goto discard_it;case TCP_TW_SUCCESS:;}goto discard_it;
}

什么时候调用tcp_v4_rcv?

解释:当调用napi_gro_complete 时:

napi_gro_complete->

        gro_normal_one->

                gro_normal_list->  //当 napi->rx_count >= gro_normal_batch时,函数被调用

                        netif_receive_skb_list_internal->

                                __netif_receive_skb_list->

                                        __netif_receive_skb_list_core->

                                                __netif_receive_skb_core->

                                                        deliver_skb->  // pt_prev->func();                                                                                                       ip_rcv->  //static struct packet_type ip_packet_type;

                                                                        tcp_v4_rcv

 上面的调用流程,可以参考我的另外一篇文章:linux之网络子系统-网络协议栈 发包收包详解-CSDN博客

到这里,就算是GRO 处理之后,将数据包送入网络协议栈的全过程。

17、pskb_may_pull
static inline bool pskb_may_pull(struct sk_buff *skb, unsigned int len)
{if (likely(len <= skb_headlen(skb)))//线性区中的数据长度够pull len个字节return true;if (unlikely(len > skb->len))//skb中数据总长度小于len,包异常return false;return __pskb_pull_tail(skb, len - skb_headlen(skb)) != NULL;//需要增加线性区中的数据
}/* Moves tail of skb head forward, copying data from fragmented part,* when it is necessary.* 1. It may fail due to malloc failure.* 2. It may change skb pointers.** It is pretty complicated. Luckily, it is called only in exceptional cases.*/
void *__pskb_pull_tail(struct sk_buff *skb, int delta)
{/* If skb has not enough free space at tail, get new one* plus 128 bytes for future expansions. If we have enough* room at tail, reallocate without expansion only if skb is cloned.*/int i, k, eat = (skb->tail + delta) - skb->end;if (eat > 0 || skb_cloned(skb)) {//如果空间不足或是有共享空间if (pskb_expand_head(skb, 0, eat > 0 ? eat + 128 : 0,GFP_ATOMIC))return NULL;}BUG_ON(skb_copy_bits(skb, skb_headlen(skb),//将非线性空间中的数据填充到线性空间skb_tail_pointer(skb), delta));/* Optimization: no fragments, no reasons to preestimate* size of pulled pages. Superb.*/if (!skb_has_frag_list(skb))//frag_list中没有skbgoto pull_pages;/* Estimate size of pulled pages. */eat = delta;for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {int size = skb_frag_size(&skb_shinfo(skb)->frags[i]);if (size >= eat)//仅使用frags中的数据就填满了pull的长度goto pull_pages;eat -= size;}/* If we need update frag list, we are in troubles.* Certainly, it is possible to add an offset to skb data,* but taking into account that pulling is expected to* be very rare operation, it is worth to fight against* further bloating skb head and crucify ourselves here instead.* Pure masohism, indeed. 8)8)*/if (eat) {//整理frag_list队列struct sk_buff *list = skb_shinfo(skb)->frag_list;struct sk_buff *clone = NULL;struct sk_buff *insp = NULL;do {if (list->len <= eat) {/* Eaten as whole. */eat -= list->len;list = list->next;insp = list;} else {/* Eaten partially. */if (skb_shared(list)) {/* Sucks! We need to fork list. :-( */clone = skb_clone(list, GFP_ATOMIC);if (!clone)return NULL;insp = list->next;list = clone;} else {/* This may be pulled without* problems. */insp = list;}if (!pskb_pull(list, eat)) {kfree_skb(clone);return NULL;}break;}} while (eat);/* Free pulled out fragments. */while ((list = skb_shinfo(skb)->frag_list) != insp) {//释放已经合并的skbskb_shinfo(skb)->frag_list = list->next;kfree_skb(list);}/* And insert new clone at head. */if (clone) {clone->next = list;skb_shinfo(skb)->frag_list = clone;}}/* Success! Now we may commit changes to skb data. */pull_pages:eat = delta;k = 0;for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {int size = skb_frag_size(&skb_shinfo(skb)->frags[i]);if (size <= eat) {//释放已经合并的pageskb_frag_unref(skb, i);eat -= size;} else {skb_frag_t *frag = &skb_shinfo(skb)->frags[k];*frag = skb_shinfo(skb)->frags[i];if (eat) {skb_frag_off_add(frag, eat);skb_frag_size_sub(frag, eat);if (!i)goto end;eat = 0;}k++;}}skb_shinfo(skb)->nr_frags = k;end:skb->tail     += delta;skb->data_len -= delta;if (!skb->data_len)skb_zcopy_clear(skb, false);return skb_tail_pointer(skb);
}
EXPORT_SYMBOL(__pskb_pull_tail);/***	skb_copy_bits - copy bits from skb to kernel buffer*	@skb: source skb*	@offset: offset in source*	@to: destination buffer*	@len: number of bytes to copy**	Copy the specified number of bytes from the source skb to the*	destination buffer.**	CAUTION ! :*		If its prototype is ever changed,*		check arch/{*}/net/{*}.S files,*		since it is called from BPF assembly code.*/
int skb_copy_bits(const struct sk_buff *skb, int offset, void *to, int len)
{int start = skb_headlen(skb);struct sk_buff *frag_iter;int i, copy;if (offset > (int)skb->len - len)goto fault;/* Copy header. */if ((copy = start - offset) > 0) {if (copy > len)copy = len;skb_copy_from_linear_data_offset(skb, offset, to, copy);if ((len -= copy) == 0)return 0;offset += copy;to     += copy;}for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {//将frags数组中数据整理到线性区int end;skb_frag_t *f = &skb_shinfo(skb)->frags[i];WARN_ON(start > offset + len);end = start + skb_frag_size(f);if ((copy = end - offset) > 0) {u32 p_off, p_len, copied;struct page *p;u8 *vaddr;if (copy > len)copy = len;skb_frag_foreach_page(f,skb_frag_off(f) + offset - start,copy, p, p_off, p_len, copied) {vaddr = kmap_atomic(p);memcpy(to + copied, vaddr + p_off, p_len);//复制页中数据到线性区kunmap_atomic(vaddr);}if ((len -= copy) == 0)//copy的长度如果达到了要pull的长度,则可以结束了return 0;offset += copy;to     += copy;}start = end;}skb_walk_frags(skb, frag_iter) {//frags队列中的page都已经合入线性区,但还是没有凑够要pull的长度,需要整理frag_list队列int end;WARN_ON(start > offset + len);end = start + frag_iter->len;if ((copy = end - offset) > 0) {if (copy > len)copy = len;if (skb_copy_bits(frag_iter, offset - start, to, copy))goto fault;if ((len -= copy) == 0)//copy的长度如果达到了要pull的长度,则可以结束了return 0;offset += copy;to     += copy;}start = end;}if (!len)return 0;fault:return -EFAULT;
}
EXPORT_SYMBOL(skb_copy_bits);

pskb_may_pull 的整合保证了TCP首部数据全部被放入线性空间,从而使GRO不影响TCP协议的处理。在应用进程使用系统调用收数据时,会将仍然分散在不连续空间中的数据copy到应用进程的缓存空间中。应用进程会使用tcp_recvmsg 函数接收数据:

18、tcp_recvmsg
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,size_t len, int nonblock, int flags, int *addr_len)
{   
...if (!(flags & MSG_TRUNC)) {err = skb_copy_datagram_msg(skb, offset, msg, used);//copy数据到用户缓存if (err) {/* Exception. Bailout! */if (!copied)copied = -EFAULT;break;}}
...

skb_copy_datagram_msg 函数可以将skb的线性区和非线性区中的数据全部copy进用户缓存,linux以GRO方式接收的数据会在这个函数中全部交付应用进程。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 09正弦稳态电路的分析
  • C语言学习笔记 Day10(指针--中)
  • 13.StringRedisTemplete使用
  • 巧用Array.forEach:简化循环与增强代码可读性;Array.forEach怎么用;面对大量数据时怎么提高Array.forEach的性能
  • C语言:字符函数,字符串函数
  • 《计算机网络 - 自顶向下方法》阅读笔记
  • 28. Hibernate 中的常见坑
  • webassembly初探
  • llama3.1本地部署方式
  • Java 中的泛型 集合(List,Set) Map
  • opencascade AIS_Line源码学习直线节点
  • 前端响应式布局解决方案分享
  • One-hot编码
  • 2024视频编辑网站微服务
  • android13去掉安全模式 删除安全模式
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • Angular js 常用指令ng-if、ng-class、ng-option、ng-value、ng-click是如何使用的?
  • AngularJS指令开发(1)——参数详解
  • eclipse(luna)创建web工程
  • IDEA常用插件整理
  • Linux快速复制或删除大量小文件
  • Odoo domain写法及运用
  • select2 取值 遍历 设置默认值
  • Web标准制定过程
  • 爱情 北京女病人
  • 大数据与云计算学习:数据分析(二)
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 第十八天-企业应用架构模式-基本模式
  • 服务器从安装到部署全过程(二)
  • 关于Android中设置闹钟的相对比较完善的解决方案
  • 检测对象或数组
  • 盘点那些不知名却常用的 Git 操作
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 软件开发学习的5大技巧,你知道吗?
  • 数据仓库的几种建模方法
  • 学习ES6 变量的解构赋值
  • 硬币翻转问题,区间操作
  • Linux权限管理(week1_day5)--技术流ken
  • 整理一些计算机基础知识!
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • ‌U盘闪一下就没了?‌如何有效恢复数据
  • # Redis 入门到精通(八)-- 服务器配置-redis.conf配置与高级数据类型
  • # 职场生活之道:善于团结
  • #WEB前端(HTML属性)
  • (4) PIVOT 和 UPIVOT 的使用
  • (c语言)strcpy函数用法
  • (Java数据结构)ArrayList
  • (初研) Sentence-embedding fine-tune notebook
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (力扣题库)跳跃游戏II(c++)
  • (十)c52学习之旅-定时器实验
  • (实战)静默dbca安装创建数据库 --参数说明+举例
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)