操作系统Posix API所提供的网络接口,数据收发是基于用户态与内核态的频繁切换实现。而dpdk实现了绕过内核监管,直接在用户态访问网络硬件,避免频繁状态切换。
DPDK安装与配置
虚拟机环境配置
检查是否支持多队列网卡 cat /proc/interrupts | grep ens33(获取整个机器的终端), 结果 19: 42 0 212 0 IO-APIC 19-fasteoi ens33,不支持多队列网卡。
虚拟机关机,修改文件 .vmx文件,找到 ethernet0.virtualDev = “e1000″,把 e1000 改成 vmxnet3,并添加 ethernet0.wakeOnPcktRcv = “TRUE”
Hugepage(大页/巨页)
进入 sudo vim /etc/default/grub 在 GRUB_CMDLINE_LINUX= 后 增加三个配置 default_hugepagesz=1G hugepagesz=2M hugepages=1024
然后虚拟机重启 sudo update-grub
用dpdk实现收取数据
环境搭建
DPDK源码下载:
core.dpdk.org/download/选择 19.08版本下载
进入dpdk/usertools/dpdk-setup.sh编译DPDK环境变量(只需要编译一次) :
如果不需要修改源码,选择 [36] x86_64-native-linuxapp-gcc
如果需要修改源码,选择 [39] x86_64-native-linux-gcc
如果出现报错:ERROR: Target does not have the DPDk uIo Kernel Module。To fix, please try to rebuild target.的解决方法:
设置环境变量1:export RTE_SDK=/home/king/share/dpdk/dpdk-stable-19.08.2/
设置环境变量2:export RTE_TARGET=x86_64-native-linux-gcc
设置Linux环境变量:
[43] Insert IGB UIO module
[44] Insert VFIO module
[45] Insert KNI module
[46] Setup hugepage mappings for non-NUMA systems
[47] Setup hugepage mappings for NUMA systems
[48] Display current Ethernet/Baseband/Crypto device settings
[49] Bind Ethernet/Baseband/Crypto device to IGB UIO module
[43] Insert IGB UIO module
功能:IGB UIO(Intel Gigabit Ethernet Userspace I/O)模块是一种用户空间输入 / 输出驱动模块。它允许绕过内核网络栈,直接在用户空间对网卡进行操作。在 DPDK(Data Plane Development Kit )中使用该模块,能使应用程序更高效地处理网络数据包,减少内核态和用户态之间的切换开销,提升数据包处理性能和吞吐量 。比如在高并发的网络服务器场景下,可让 DPDK 应用直接快速地与网卡交互。
[44] Insert VFIO module
功能:VFIO(Virtual Function I/O)是一种用于设备虚拟化的框架。插入 VFIO 模块后,可支持将物理设备(如网卡)以更灵活的方式分配给虚拟机或用户空间应用程序。在 DPDK 环境中,它能实现对设备的高效隔离和共享,方便在多租户或虚拟化场景下,让不同的应用或虚拟机安全、独立地使用设备资源,增强系统的灵活性和资源利用率 。
[45] Insert KNI module
功能:KNI(Kernel NIC Interface)模块提供了一种在 DPDK 应用和内核网络栈之间建立接口的方式。它允许 DPDK 处理后的数据包能够与内核网络栈进行交互,比如将经过 DPDK 处理的数据包发送到内核网络栈进行进一步的处理(如防火墙规则检查、路由等),或者从内核网络栈接收数据包进行 DPDK 层面的处理。这在一些需要结合用户态高性能处理和内核态网络功能的场景中很有用 。
[46] Setup hugepage mappings for non-NUMA systems 这里大页输入512
功能:大页(hugepage)是一种内存分页机制,相比普通的 4KB 页,大页通常为 2MB 或 1GB。对于非 NUMA(Non-Uniform Memory Access,非统一内存访问架构 )系统,设置大页映射可减少内存页表项数量,降低 CPU 在内存地址转换时的开销,提高内存访问效率。在 DPDK 这种对内存读写频繁的场景下,使用大页能显著提升应用性能,减少内存管理的开销 。
[47] Setup hugepage mappings for NUMA systems这里大页输入512
功能:NUMA 系统中,不同的 CPU 节点访问内存的速度不同。为 NUMA 系统设置大页映射,除了能享受类似非 NUMA 系统中减少页表项、提升内存访问效率的好处外,还能更好地结合 NUMA 架构特性,让每个 CPU 节点更高效地访问本地内存,进一步优化内存访问性能,使 DPDK 应用在 NUMA 架构的服务器上能充分发挥硬件优势,提升整体性能 。
[48] Display current Ethernet/Baseband/Crypto device settings这一步不用执行
功能:该选项用于显示当前以太网设备、基带设备以及加密设备的相关设置信息。比如网卡的工作模式(全双工、半双工等)、速率、MAC 地址,基带设备的参数配置,加密设备的算法、密钥等设置情况。通过查看这些信息,用户可以了解设备当前的运行状态,判断是否符合 DPDK 应用的需求,以便进行相应的调整和优化 。
[49] Bind Ethernet/Baseband/Crypto device to IGB UIO module这里选择 eth0对应的pci码 0000:03:00.0,注意需要先 sudo ifconfig eth0 down让网卡下线再插入
功能:将以太网设备、基带设备或加密设备绑定到 IGB UIO 模块,是为了让这些设备能够利用 IGB UIO 模块提供的用户空间直接访问功能。绑定后,DPDK 应用就可以绕过内核网络栈,直接对设备进行操作和数据处理,实现高效的数据包收发和设备控制,从而提升网络数据处理的性能和效率 。
DPDK原理与架构

UIO(Userspace I/O)
概念:是一种用户态驱动框架。Linux 内核提供uio.ko内核模块作为框架支持。原理是为注册的设备生成/dev/uioX字符设备,用户态程序借助它实现设备内存空间映射、中断开关与获取等操作。
作用:在 DPDK(Data Plane Development Kit )中,可让网卡驱动(如 igb_uio ,针对 Intel 网卡)运行在用户态,采用轮询和零拷贝方式从网卡收报文,提高收发性能。它截获中断并重置回调行为,绕过内核协议栈后续处理,还将网卡硬件寄存器映射到用户态。
局限:不支持 IOMMU,若设备需 DMA,用户态要获取并注册物理内存地址空间;每个设备仅支持一个中断,对需多中断响应的应用不太友好。
VFIO(Virtual Function I/O )
概念:是 UIO 的增强版,是全能用户态 IO 框架。借助 IOMMU(Input/Output Memory Management Unit ,输入 / 输出内存管理单元 )实现设备的 DMA 访问和中断重定向。包含接口层框架模块vfio和设备驱动模块(如vfio-pci ,针对 PCI 设备) 。
作用:通过 IOMMU 避免用户态注册物理 DMA 地址的高危操作,将用户态进程虚拟内存地址注册给网卡设备并维护页表;为设备每个 irq 中断提供用户态接收方式(注册 eventfd ),解决了 UIO 的安全和易用性问题,以及中断支持不足问题。在 DPDK 中,是主要的用户态驱动框架。
KNI(Kernel NIC Interface)
概念:是 DPDK 提供的一种模块,用于在 DPDK 应用和内核网络栈间建立接口。
作用:允许 DPDK 处理后的数据包与内核网络栈交互。比如将 DPDK 处理后的包送内核网络栈做防火墙规则检查、路由等;或从内核网络栈接收数据包进行 DPDK 层面处理,结合用户态高性能处理和内核态网络功能 。
DPDK(Data Plane Development Kit)是一个用于,快速数据包处理的开源库,基于 DPDK 的用户态协议栈实现,是在用户空间而非内核空间来处理网络协议,这样可以避开内核态和用户态切换开销,提升网络处理性能。
编译ustack.c代码
完整代码见下文,使用make编译,如果有 Makefile:73: *** “Please define RTE_SDK environment variable”. Stop.报错和 Cause: No Supported eth found报错:
设置编译时环境变量:
export RTE SDK=/home/king/share/dpdk/dpdk-stable-19.08.2/
export RTE TARGET=x86_64-native-linux-gcc
设置运行时环境:
sudo ./share/dpdk/dpdk-stable-19.08.2/usertools/dpdk-setup.sh
运行 39 43 44 45 46(512) 47(512) 49(down eth1)
设置 arp
在Linux虚拟机上输入 ifconfig -a,查看eth 0 的mac地址 HWaddr 00:0c:29:c6:0c:b1,linux上对应为冒号,window上对应为横杠 00-0c-29-c6-0c-b1
在window主机上输入arp -a 插看对应mac地址映射 接口: 10.134.96.172 — 0x13,0x13就是十进制的 19
设置静态 arp,netsh -c i i add neighbors 19 10.134.96.77 00-0c-29-c6-0c-b1, 记得用管理员运行
使用 sudo ./build/ustack运行代码
UDP收发数据
#include <stdio.h>
#include <unistd.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <arpa/inet.h>
#define NUM_MBUFS 2048 //缓冲区数量
#define BURST_SIZE 128 //每次接收的数据包数量
#define ENABLE_SEND 1 //用于控制是否启用数据包发送功能
int gDpdkPortId = 0; //指定使用的DPDK端口ID
// src_mac, dst_mac
// src_ip, dst_ip
// src_port, dst_port
#if ENABLE_SEND
//用于存储源和目的的MAC地址、IP地址和端口信息
uint8_t gSrcMac[RTE_ETHER_ADDR_LEN];
uint8_t gDstMac[RTE_ETHER_ADDR_LEN];
uint32_t gSrcIp;
uint32_t gDstIp;
uint16_t gSrcPort;
uint16_t gDstPort;
#endif
//构造一个UDP数据包,包括以太网帧头、IP头和UDP头
static int ustack_encode_udp_pkt(uint8_t *msg, char *data, uint16_t total_length) {
//1 ether
struct rte_ether_hdr *eth = (struct rte_ether_hdr*)msg;
rte_memcpy(eth->s_addr.addr_bytes, gSrcMac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth->d_addr.addr_bytes, gDstMac, RTE_ETHER_ADDR_LEN);
eth->ether_type = htons(RTE_ETHER_TYPE_IPV4);
//1 iphdr
struct rte_ipv4_hdr *iphdr = (struct rte_ipv4_hdr *)(eth+1);
iphdr->version_ihl = 0x54;
iphdr->type_of_service = 0x0;
iphdr->total_length = htons(total_length - sizeof(struct rte_ether_hdr));
iphdr->packet_id = 0;
iphdr->fragment_offset = 0;
iphdr->time_to_live = 0;
iphdr->next_proto_id = IPPROTO_UDP;
iphdr->src_addr = gSrcIp;
iphdr->dst_addr = gDstIp;
iphdr->hdr_checksum = 0;
iphdr->hdr_checksum = rte_ipv4_cksum(iphdr);//计算校验和
//1 udphdr
struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr+1);
udphdr->src_port = gSrcPort;
udphdr->dst_port = gDstPort;
uint16_t udplen = total_length - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr);
udphdr->dgram_len = htons(udplen);
rte_memcpy((uint8_t*)(udphdr+1), data, udplen-sizeof(struct rte_udp_hdr));
udphdr->dgram_cksum = 0;
udphdr->dgram_cksum = rte_ipv4_udptcp_cksum(iphdr, udphdr);//计算校验和
return total_length;
}
// 发送数据包函数
static struct rte_mbuf *ustack_send(struct rte_mempool *mbuf_pool, char *data, uint16_t length) {
const unsigned total_length = length + sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_udp_hdr);
struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool);//分配一个数据包缓存区
if (!mbuf) {
rte_exit(EXIT_FAILURE, "Error with EAL init
");
}
mbuf->pkt_len = total_length;
mbuf->data_len = total_length;
uint8_t *pktdata = rte_pktmbuf_mtod(mbuf, uint8_t*);
ustack_encode_udp_pkt(pktdata, data, total_length);//填充数据包内容
return mbuf;
}
static const struct rte_eth_conf port_conf_default = {
.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN }
};
int main(int argc, char *argv[]) {
if (rte_eal_init(argc, argv) < 0) {//初始化DPDK环境
rte_exit(EXIT_FAILURE, "Error with EAL init
");
}
uint16_t nb_sys_ports = rte_eth_dev_count_avail();
if (nb_sys_ports == 0) {
rte_exit(EXIT_FAILURE, "No Support eth found
");
}
printf("nb_sys_ports: %d
", nb_sys_ports);
//创建一个内存池用于存储数据包
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbufpool", NUM_MBUFS, 0, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (!mbuf_pool) {
rte_exit(EXIT_FAILURE, "Could not create mbuf pool
");
}
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(gDpdkPortId, &dev_info);
const int num_rx_queues = 1;
const int num_tx_queues = 1;
struct rte_eth_conf port_conf = port_conf_default;
if (rte_eth_dev_configure(gDpdkPortId, num_rx_queues, num_tx_queues, &port_conf) < 0) {
rte_exit(EXIT_FAILURE, "Could not configure
");
}
if (rte_eth_rx_queue_setup(gDpdkPortId, 0, 128, rte_eth_dev_socket_id(gDpdkPortId),
NULL, mbuf_pool) < 0) {
rte_exit(EXIT_FAILURE, "Could not setup RX queue
");
}
#if ENABLE_SEND
struct rte_eth_txconf txq_conf = dev_info.default_txconf;
txq_conf.offloads = port_conf.rxmode.offloads;
if (rte_eth_tx_queue_setup(gDpdkPortId, 0, 512, rte_eth_dev_socket_id(gDpdkPortId), &txq_conf) < 0) {
rte_exit(EXIT_FAILURE, "Could not setup TX queue
");
}
#endif
if (rte_eth_dev_start(gDpdkPortId) < 0) {
rte_exit(EXIT_FAILURE, "Could not start
");
}
printf("dev start success
");
while (1) {
struct rte_mbuf *mbufs[BURST_SIZE];
unsigned nb_recvd = rte_eth_rx_burst(gDpdkPortId, 0, mbufs, BURST_SIZE);//接收数据包,并解析UDP数据包内容
//收数据还要配置一个静态的arp
if (nb_recvd > BURST_SIZE) {
rte_exit(EXIT_FAILURE, "Error with rte_eth_rx_burst
");
}
/*
+------------+---------------+-------------------+--------------+
| ethhdr | iphdr | udphdr/tcphdr | payload |
+------------+---------------+-------------------+--------------+
*/
unsigned i = 0;
//根据接收到的数据包构造一个新的UDP数据包并发送
for (i = 0;i < nb_recvd;i ++) {
struct rte_ether_hdr *ehdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr *);
if (ehdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
continue;
}
struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
if (iphdr->next_proto_id == IPPROTO_UDP) {
struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr+1);
#if ENABLE_SEND
rte_memcpy(gSrcMac, ehdr->d_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(gDstMac, ehdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&gSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
rte_memcpy(&gDstIp, &iphdr->src_addr, sizeof(uint32_t));
rte_memcpy(&gSrcPort, &udphdr->dst_port, sizeof(uint16_t));
rte_memcpy(&gDstPort, &udphdr->src_port, sizeof(uint16_t));
#endif
uint16_t length = ntohs(udphdr->dgram_len) - sizeof(struct rte_udp_hdr);
printf("length: %d, content: %s
", length, (char *)(udphdr+1));
#if ENABLE_SEND
struct rte_mbuf *txbuf = ustack_send(mbuf_pool, (char *)(udphdr+1), length);
rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1);
//printf("ustack_send
");
rte_pktmbuf_free(txbuf);
#endif
}
//rte_eth_tx_burst(gDpdkPortId, 0, &mbufs[i], 1);
}
}
return 0;
}
自定义epoll
通过将文件描述符交给epoll统一管理实现IO多路复用。问题在于系统提供的struct epoll_event所接管的fd是操作系统自己维护的一套文件描述符,那么如何让epoll可以管理自定义的fd呢?
所以我们需要自定义epoll改写接口将其适配到我们的项目中。epoll的核心在于四个API和一个数据结构。四个API用于初始化和管理事件。
分别定义epoll、epoll数据、epoll事件结构体:
struct eventpoll {
int fd;
ep_rb_tree rbr;
int rbcnt;
LIST_HEAD( ,epitem) rdlist;
int rdnum;
int waiting;
pthread_mutex_t mtx; //rbtree update
pthread_spinlock_t lock; //rdlist update
pthread_cond_t cond; //block for event
pthread_mutex_t cdmtx; //mutex for cond
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events;
epoll_data_t data;
};
tcp_process中如果有可读的数据就绪,通过epoll_event_callback通知epoll该连接有数据可读:
//函数定义
int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) {
struct epitem tmp;
tmp.sockfd = sockid;
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
if (!epi) {
printf("rbtree not exist
");
return -1;
}
if (epi->rdy) {
epi->event.events |= event;
return 1;
}
printf("epoll_event_callback --> %d
", epi->sockfd);
pthread_spin_lock(&ep->lock);
epi->rdy = 1;
LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink);
ep->rdnum ++;
pthread_spin_unlock(&ep->lock);
pthread_mutex_lock(&ep->cdmtx);
pthread_cond_signal(&ep->cond);
pthread_mutex_unlock(&ep->cdmtx);
}
在tcp成功建立连接时,该fd就变为可读事件需要通过回调函数通知epoll:
static int ng_tcp_handle_syn_rcvd(struct ng_tcp_stream *stream, struct rte_tcp_hdr *tcphdr) {
...
struct ng_tcp_table *table = tcpInstance();
epoll_event_callback(table->ep, listener->fd, EPOLLIN);
...
}
epoll的使用场景往往需要频繁的查找,并且无法事先明确该函数需要占用的内存,综合考虑采用红黑树作为epoll的数据结构。epoll中所有的事件都由红黑树统一组织,就绪队列只属于红黑树的一部分。如图,epoll目前管理五个事件,只有红色为就绪事件被就绪链表统一连接。

定义每个结点结构体,将数据交给红黑树管理:
struct epitem {
RB_ENTRY(epitem) rbn;
LIST_ENTRY(epitem) rdlink;
int rdy; //exist in list
int sockfd;
struct epoll_event event;
};
epoll_create初始化epoll实例,其主要功能就是给必要的参数初始化和分配空间:
int nepoll_create(int size) {
int epfd = get_fd_frombitmap(); //tcp, udp
struct eventpoll *ep = (struct eventpoll*)rte_malloc("eventpoll", sizeof(struct eventpoll), 0);
...
ep->fd = epfd;
ep->rbcnt = 0;
RB_INIT(&ep->rbr);//插入红黑树
LIST_INIT(&ep->rdlist);//初始化就绪链表
...
return epfd;
}
nepoll_ctl根据提供的操作符设置事件。对于epoll事件来说常用三类操作:添加、修改、删除。这三个操作本质来说就是对底层红黑树的CURD。
根据红黑树查找的情况,添加一个新分配的epitem。同时还需要注意epoll上下文作为一个互斥资源需要通过加锁来进行互斥访问。
if (op == EPOLL_CTL_ADD) {
pthread_mutex_lock(&ep->mtx);
...
struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
...
epi = (struct epitem*)rte_malloc("epitem", sizeof(struct epitem), 0);
...
epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi);
ep->rbcnt ++;
...
pthread_mutex_unlock(&ep->mtx);
}
同理增加、修改事件就是在互斥条件下对红黑树的结点进行调整此处不再赘述。
epoll_wait核心功能是等待 epoll 实例中就绪的事件,并将这些事件返回给用户。这个函数的实现基础就是对互斥锁和自旋锁的使用(多线程互斥资源管理方案 – +_+0526 – 博客园)。使用条件变量+互斥锁处理事件的等待,通过条件变量将未就绪的线程CPU让出。而使用自旋锁保证对就绪链表的高效操作,减少了互斥锁频繁切换上下文所带来的开销。
当rdnum为0,即无就绪事件时,计算判断超时时间将线程使用条件等待挂起。
while (ep->rdnum == 0 && timeout != 0) {
ep->waiting = 1;
if (timeout > 0) {
// 计算绝对超时时间
struct timespec deadline = ...
int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline);
timeout = 0;
} else if (timeout < 0) {
// 无限等待,直到有事件就绪
int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx);
}
ep->waiting = 0;
}
使用自旋锁保护就绪链表rdlist,当就绪链表不为空,从就绪链表头部取出epitem结点作为接续为事件返回给用户。
pthread_spin_lock(&ep->lock);
int cnt = 0;
int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum);
while (num != 0 && !LIST_EMPTY(&ep->rdlist)) {
struct epitem *epi = LIST_FIRST(&ep->rdlist);
LIST_REMOVE(epi, rdlink); // 从就绪链表移除
memcpy(&events[cnt++], &epi->event, sizeof(struct epoll_event));
ep->rdnum--;
}
pthread_spin_unlock(&ep->lock);
至此自定义epoll的实现告一段落,总结一下。之所以要自己实现epoll,是因为自定义的网络协议栈并未采用系统提供的文件描述符系统,从而需要定制化一个可适配的epoll组件。epoll是由一个数据结构和四个API(epoll_create,epoll_ctl,epoll_wait,epoll_event_callback)组织而成,其高并发性能就来自于底层的红黑树与就绪链表;四个API分别往下处理数据结构的CURD,往上用于通知协议处理模块信息。
最后在tcp_server_entry中按照epoll使用方法开启监听处理事件:
while (1) {
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
if (connfd == listenfd) {
int clientfd = naccept(listenfd, (struct sockaddr*)&clientaddr, &len);
printf("accept finshed: %d
", clientfd);
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
} else if (events[i].events & EPOLLIN) {
char buffer[1024] = {0};
int count = nrecv(connfd, buffer, 1024, 0);
if (count == 0) { // disconnect
printf("client disconnect: %d
", connfd);
nclose(connfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
continue;
}
printf("RECV: %s
", buffer);
count = nsend(connfd, buffer, count, 0);
printf("SEND: %d
", count);
}
}
以下是总结的几个问题:



















暂无评论内容