VPP输入节点与线程之间的关系


输入节点作为报文处理的入口,VPP支持多种类型的报文输入节点。每一种使用相同驱动类型的网卡有一个报文输入节点,比如dpdk类型的网卡,tap,af_packet类型的网卡等等。一种驱动类型对应一个输入节点,这主要是因为驱动类型决定了具体的收包函数。输入节点使用宏进行注册,在main函数启动前注册到链表中。每一个工作线程都有相同的输入节点。线程的输入节点具体处理的队列动态分配,采用轮叫(round-robin)原则。

节点类型

typedef enum
{
  /* An internal node on the call graph (could be output). */
  VLIB_NODE_TYPE_INTERNAL,

  /* Nodes which input data into the processing graph.
     Input nodes are called for each iteration of main loop. 
     输入节点,报文流转入口 */
  VLIB_NODE_TYPE_INPUT,

  /* Nodes to be called before all input nodes.
     Used, for example, to clean out driver TX rings before
     processing input. */
  VLIB_NODE_TYPE_PRE_INPUT,

  /* "Process" nodes which can be suspended and later resumed. */
  VLIB_NODE_TYPE_PROCESS,

  VLIB_N_NODE_TYPE,
} vlib_node_type_t;

节点处理函数注册描述结构,用于为一个节点注册一个候选处理函数(同一节点可按优先级注册多个处理函数)

/* Registration of one candidate dispatch function for a node.  A node
   may carry several of these (e.g. one per CPU march variant), linked
   through next_registration; the highest-priority candidate is picked
   when the node is registered. */
typedef struct _vlib_node_fn_registration
{
  vlib_node_function_t *function;   /* the candidate dispatch function */
  int priority;                     /* selection priority among candidates */
  struct _vlib_node_fn_registration *next_registration; /* singly-linked list */
  char *name;                       /* variant name (e.g. march variant) */
} vlib_node_fn_registration_t;

注册节点描述结构,用于表示一个注册节点

/* Static description of a graph node, filled in by the
   VLIB_REGISTER_NODE macro and linked onto
   vm->node_main.node_registrations before main() runs. */
typedef struct _vlib_node_registration
{
  /* Vector processing function for this node. */
  vlib_node_function_t *function;

  /* Node function candidate registration with priority */
  vlib_node_fn_registration_t *node_fn_registrations;

  /* Node name. */
  char *name;

  /* Name of sibling (if applicable). */
  char *sibling_of;

  /* Node index filled in by registration. */
  u32 index;

  /* Type of this node. */
  vlib_node_type_t type;

  /* Error strings indexed by error code for this node. */
  char **error_strings;

  /* Buffer format/unformat for this node. */
  format_function_t *format_buffer;
  unformat_function_t *unformat_buffer;

  /* Trace format/unformat for this node. */
  format_function_t *format_trace;
  unformat_function_t *unformat_trace;

  /* Function to validate incoming frames. */
  u8 *(*validate_frame) (struct vlib_main_t * vm,
			 struct vlib_node_runtime_t *,
			 struct vlib_frame_t * f);

  /* Per-node runtime data. */
  void *runtime_data;

  /* Process stack size (log2 bytes), used by PROCESS nodes. */
  u16 process_log2_n_stack_bytes;

  /* Number of bytes of per-node run time data. */
  u8 runtime_data_bytes;

  /* State for input nodes. */
  u8 state;

  /* Node flags. */
  u16 flags;

  /* protocol at b->data[b->current_data] upon entry to the dispatch fn */
  u8 protocol_hint;

  /* Size of scalar and vector arguments in bytes. */
  u16 scalar_size, vector_size;

  /* Number of error codes used by this node. */
  u16 n_errors;

  /* Number of next node names that follow. */
  u16 n_next_nodes;

  /* Constructor link-list, don't ask... */
  struct _vlib_node_registration *next_registration;

  /* Names of next nodes which this node feeds into. */
  char *next_nodes[];

} vlib_node_registration_t;

节点注册相关的宏

/*
 * VLIB_REGISTER_NODE (x) declares a vlib_node_registration_t named x,
 * emits a constructor (runs before main()) that links x onto
 * vm->node_main.node_registrations, and a matching destructor that
 * unlinks it.  The final re-declaration of x carries no semicolon so
 * the macro invocation can be followed by a designated initializer
 * and the caller's ';'.
 *
 * Fix vs. the annotated original: the '//' comments had been placed
 * AFTER the line-continuation backslashes ("\  //..."), which breaks
 * the macro -- a backslash only splices lines when it is the very last
 * character on the line.  The explanations are hoisted into this
 * header comment instead.
 */
#ifndef CLIB_MARCH_VARIANT
#define VLIB_REGISTER_NODE(x,...)                                       \
    __VA_ARGS__ vlib_node_registration_t x;                             \
static void __vlib_add_node_registration_##x (void)                     \
    __attribute__((__constructor__)) ;                                  \
static void __vlib_add_node_registration_##x (void)                     \
{                                                                       \
    vlib_main_t * vm = vlib_get_main();                                 \
    x.next_registration = vm->node_main.node_registrations;             \
    vm->node_main.node_registrations = &x;                              \
}                                                                       \
static void __vlib_rm_node_registration_##x (void)                      \
    __attribute__((__destructor__)) ;                                   \
static void __vlib_rm_node_registration_##x (void)                      \
{                                                                       \
    vlib_main_t * vm = vlib_get_main();                                 \
    VLIB_REMOVE_FROM_LINKED_LIST (vm->node_main.node_registrations,     \
                                  &x, next_registration);               \
}                                                                       \
__VA_ARGS__ vlib_node_registration_t x
#else
#define VLIB_REGISTER_NODE(x,...)                                       \
static __clib_unused vlib_node_registration_t __clib_unused_##x
#endif

VPP定义的输入节点样例

我们以DPDK类型的输入节点来进行分析。

/* *INDENT-OFF* */
/* DPDK rx input node.  Note that no .function is set here: dispatch
   functions are registered separately through VLIB_NODE_FN (one per
   CPU march variant, as a linked list) and the highest-priority
   candidate is selected at node-registration time. */
VLIB_REGISTER_NODE (dpdk_input_node) = {
  .type = VLIB_NODE_TYPE_INPUT,
  .name = "dpdk-input",
  .sibling_of = "device-input",

  /* Will be enabled if/when hardware is detected. */
  .state = VLIB_NODE_STATE_DISABLED,

  .format_buffer = format_ethernet_header_with_length,
  .format_trace = format_dpdk_rx_trace,

  .n_errors = DPDK_N_ERROR,
  .error_strings = dpdk_error_strings,
};

输入节点没有指明其报文接收函数,因为接收函数也是采用宏来进行注册的,是一个链表,也就是说接收报文函数可以有多个,使用优先级最高的那个。

输入节点处理函数

输入节点处理函数宏

/*
 * VLIB_NODE_FN (node) declares a dispatch-function variant for `node`
 * and, in a constructor run before main(), pushes its registration
 * onto node.node_fn_registrations.  The `extern vlib_node_registration_t
 * node` reference means the node and its functions must share the same
 * identifier.  Since a node can accumulate several candidates this way,
 * register_node() later selects the one with the highest priority
 * (CLIB_MARCH_FN_PRIORITY per march variant).
 *
 * Fix vs. the annotated original: the '//' comments had been placed
 * after the line-continuation backslashes, which breaks the macro (the
 * backslash must be the last character of the line); they are hoisted
 * into this header comment.
 */
#define VLIB_NODE_FN(node)						\
uword CLIB_MARCH_SFX (node##_fn)();					\
static vlib_node_fn_registration_t					\
  CLIB_MARCH_SFX(node##_fn_registration) =				\
  { .function = &CLIB_MARCH_SFX (node##_fn), };				\
									\
static void __clib_constructor						\
CLIB_MARCH_SFX (node##_multiarch_register) (void)			\
{									\
  extern vlib_node_registration_t node;					\
  vlib_node_fn_registration_t *r;					\
  r = & CLIB_MARCH_SFX (node##_fn_registration);			\
  r->priority = CLIB_MARCH_FN_PRIORITY();				\
  r->name = CLIB_MARCH_VARIANT_STR;					\
  r->next_registration = node.node_fn_registrations;			\
  node.node_fn_registrations = r;					\
}									\
uword CLIB_CPU_OPTIMIZED CLIB_MARCH_SFX (node##_fn)

VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
				vlib_frame_t * f)
{
  /* Poll every DPDK (device, queue) pair owned by this thread and
     return the total number of packets received. */
  dpdk_main_t *dmp = &dpdk_main;
  dpdk_device_t *dev;
  uword rx_total = 0;
  /* Input-node runtime: devices_and_queues lists the queues this
     thread must service; it is updated dynamically when devices of
     this driver type are added or re-balanced. */
  vnet_device_input_runtime_t *rtd = (void *) node->runtime_data;
  vnet_device_and_queue_t *dq;
  u32 tidx = node->thread_index;

  /*
   * Poll all devices on this cpu for input/interrupts.
   */
  /* *INDENT-OFF* */
  foreach_device_and_queue (dq, rtd->devices_and_queues)
    {
      dev = vec_elt_at_index (dmp->devices, dq->dev_instance);
      /* A slave of a bonded interface is serviced via the bond master. */
      if (PREDICT_FALSE (dev->flags & DPDK_DEVICE_FLAG_BOND_SLAVE))
	continue;
      rx_total += dpdk_device_input (vm, dmp, dev, node, tidx,
				     dq->queue_id);
    }
  /* *INDENT-ON* */
  return rx_total;
}

构建多线程时,为每一个线程构建输入节点

该工作主要是由start_workers函数负责。

/* Clone the main thread's data structures for every worker thread so
   that each thread owns an identical copy of the node graph --
   including every input node -- plus private runtime data and error
   counters.
   NOTE(review): this is an excerpt -- the "......" lines mark code
   elided by the article (in particular where vm_clone, nm, nm_clone
   and oldheap are set up). */
static clib_error_t *
start_workers (vlib_main_t * vm)
{
    int i, j;
    vlib_worker_thread_t *w;
    vlib_main_t *vm_clone;
    void *oldheap;
    vlib_thread_main_t *tm = &vlib_thread_main;
    vlib_thread_registration_t *tr;
    vlib_node_runtime_t *rt;
    u32 n_vlib_mains = tm->n_vlib_mains;
    u32 worker_thread_index;
    u8 *main_heap = clib_mem_get_per_cpu_heap ();

    vec_reset_length (vlib_worker_threads);

    /* Set up the main thread */
    vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
    w->elog_track.name = "main thread";
    elog_track_register (&vm->elog_main, &w->elog_track);
    /* Name the main thread "<prefix>_main" (prefix is typically "vpp") */
    if (vec_len (tm->thread_prefix))
    {
        w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
        vlib_set_thread_name ((char *) w->name);
    }

    vm->elog_main.lock =
        clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
    vm->elog_main.lock[0] = 0;
    /* Additional threads exist: workers and/or hqos threads */
    if (n_vlib_mains > 1)
    {
        /* Replace hand-crafted length-1 vector with a real vector */
        vlib_mains = 0;

        vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
                              CLIB_CACHE_LINE_BYTES);
        _vec_len (vlib_mains) = 0;
        vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);

        vlib_worker_threads->wait_at_barrier =
            clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
        vlib_worker_threads->workers_at_barrier =
            clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);

        vlib_worker_threads->node_reforks_required =
            clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);

        /* We'll need the rpc vector lock... */
        clib_spinlock_init (&vm->pending_rpc_lock);

        /* Ask for an initial barrier sync */
        *vlib_worker_threads->workers_at_barrier = 0;
        *vlib_worker_threads->wait_at_barrier = 1;

        /* Without update or refork */
        *vlib_worker_threads->node_reforks_required = 0;
        vm->need_vlib_worker_thread_node_runtime_update = 0;

        /* init timing */
        vm->barrier_epoch = 0;
        vm->barrier_no_close_before = 0;

        worker_thread_index = 1;
        /* Walk every thread registration (usually just the workers,
           i.e. the packet-processing threads) */
        for (i = 0; i < vec_len (tm->registrations); i++)
        {
            vlib_node_main_t *nm, *nm_clone;
            int k;

            tr = tm->registrations[i];

            if (tr->count == 0)
                continue;

            for (k = 0; k < tr->count; k++)/* one pass per core this registration wants */
            {
                vlib_node_t *n;
                /* allocate a worker-thread control block */
                vec_add2 (vlib_worker_threads, w, 1);
                ......

                /* Allocate all nodes in single block for speed */
                /* allocate memory for the cloned node array */
                n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
                /* deep-copy every node; nm_clone->nodes is a vector of pointers */
                for (j = 0; j < vec_len (nm->nodes); j++)
                {
                    clib_memcpy (n, nm->nodes[j], sizeof (*n));
                    /* none of the copied nodes have enqueue rights given out */
                    n->owner_node_index = VLIB_INVALID_NODE_INDEX;
                    clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
                    clib_memset (&n->stats_last_clear, 0,
                                 sizeof (n->stats_last_clear));
                    vec_add1 (nm_clone->nodes, n);
                    n++;
                }
                
                /* clone the runtimes of all INTERNAL nodes */
                nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
                    vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
                                     CLIB_CACHE_LINE_BYTES);
                /* fix up each cloned internal-node runtime */
                vec_foreach (rt,
                             nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
                {
                    vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                    /* bind the runtime to the new worker thread */
                    rt->thread_index = vm_clone->thread_index;
                    /* copy initial runtime_data from node */
                    /* seed per-thread runtime data from the node template */
                    if (n->runtime_data && n->runtime_data_bytes > 0)
                        clib_memcpy (rt->runtime_data, n->runtime_data,
                                     clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                               n->runtime_data_bytes));
                }
                /* clone the runtimes of all INPUT nodes: every worker gets
                   its own copy of every input node */
                nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
                    vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
                                     CLIB_CACHE_LINE_BYTES);
                /* fix up each cloned input-node runtime */
                vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
                {
                    vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                    /* bind the runtime to the new worker thread */
                    rt->thread_index = vm_clone->thread_index;
                    /* copy initial runtime_data from node */
                    /* seed per-thread runtime data from the node template */
                    if (n->runtime_data && n->runtime_data_bytes > 0)
                        clib_memcpy (rt->runtime_data, n->runtime_data,
                                     clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                               n->runtime_data_bytes));
                }
                /* clone the runtimes of all PRE_INPUT nodes */
                nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
                    vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
                                     CLIB_CACHE_LINE_BYTES);
                vec_foreach (rt,
                             nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
                {
                    vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                    /* bind the runtime to the new worker thread */
                    rt->thread_index = vm_clone->thread_index;
                    /* copy initial runtime_data from node */
                    /* copy initial data so the clone is independent of the
                       source node */
                    if (n->runtime_data && n->runtime_data_bytes > 0)
                        clib_memcpy (rt->runtime_data, n->runtime_data,
                                     clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                               n->runtime_data_bytes));
                }
                /* clone all process (cooperative-thread) nodes */
                nm_clone->processes = vec_dup_aligned (nm->processes,
                                                       CLIB_CACHE_LINE_BYTES);

                /* Create per-thread frame freelist */
                /* per-thread frame-size bookkeeping */
                nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
#ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
                nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
#endif

                /* Packet trace buffers are guaranteed to be empty, nothing to do here */

                clib_mem_set_heap (oldheap);
                /* publish the clone in the global vlib_mains vector */
                vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
                /* per-thread copies of the error counters */
                vm_clone->error_main.counters = vec_dup_aligned
                                                (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
                vm_clone->error_main.counters_last_clear = vec_dup_aligned
                        (vlib_mains[0]->error_main.counters_last_clear,
                         CLIB_CACHE_LINE_BYTES);

                worker_thread_index++;
            }
        }
    }
    else
    {
        /* only have non-data-structure copy threads to create... */
        for (i = 0; i < vec_len (tm->registrations); i++)
        {
            tr = tm->registrations[i];

            for (j = 0; j < tr->count; j++)
            {
                vec_add2 (vlib_worker_threads, w, 1);
                if (tr->mheap_size)
                {
#if USE_DLMALLOC == 0
                    w->thread_mheap =
                        mheap_alloc (0 /* use VM */ , tr->mheap_size);
#else
                    w->thread_mheap =
                        create_mspace (tr->mheap_size, 0 /* locked */ );
#endif
                }
                else
                    w->thread_mheap = main_heap;
                w->thread_stack =
                    vlib_thread_stack_init (w - vlib_worker_threads);
                w->thread_function = tr->function;
                w->thread_function_arg = w;
                w->instance_id = j;
                w->elog_track.name =
                    (char *) format (0, "%s %d", tr->name, j + 1);
                w->registration = tr;
                vec_add1 (w->elog_track.name, 0);
                elog_track_register (&vm->elog_main, &w->elog_track);
            }
        }
    }
    ......
    return 0;
}

将设备的各个收包队列分配到处理线程,采用的是轮叫(round-robin)策略

/* DPDK component initialization: discover the available ports and (in
   the part shown here) distribute each device's rx queues over the
   polling threads, round-robin.
   NOTE(review): excerpt -- the "......" lines mark code elided by the
   article (device setup; declarations of devconf, q, etc.). */
static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
    u32 nports;
    u32 mtu, max_rx_frame;
    int i;
    clib_error_t *error;
    vlib_main_t *vm = vlib_get_main ();
    vlib_thread_main_t *tm = vlib_get_thread_main ();
    vnet_device_main_t *vdm = &vnet_device_main;
    vnet_sw_interface_t *sw;
    vnet_hw_interface_t *hi;
    dpdk_device_t *xd;
    vlib_pci_addr_t last_pci_addr;
    u32 last_pci_addr_port = 0;
    vlib_thread_registration_t *tr_hqos;
    uword *p_hqos;

    u32 next_hqos_cpu = 0;
    u8 af_packet_instance_num = 0;
    u8 bond_ether_instance_num = 0;
    last_pci_addr.as_u32 = ~0;

    dm->hqos_cpu_first_index = 0;
    dm->hqos_cpu_count = 0;

    /* find out which cpus will be used for I/O TX */
    p_hqos = hash_get_mem (tm->thread_registrations_by_name, "hqos-threads");
    tr_hqos = p_hqos ? (vlib_thread_registration_t *) p_hqos[0] : 0;

    if (tr_hqos && tr_hqos->count > 0)
    {
        dm->hqos_cpu_first_index = tr_hqos->first_index;
        dm->hqos_cpu_count = tr_hqos->count;
    }

    vec_validate_aligned (dm->devices_by_hqos_cpu, tm->n_vlib_mains - 1,
                          CLIB_CACHE_LINE_BYTES);

    nports = rte_eth_dev_count_avail ();

    if (nports < 1)
    {
        dpdk_log_notice ("DPDK drivers found no ports...");
    }

    if (CLIB_DEBUG > 0)
        dpdk_log_notice ("DPDK drivers found %d ports...", nports);

    /* When software checksum verification is enabled, drop the
       hardware-verified flags from the buffer template. */
    if (dm->conf->enable_tcp_udp_checksum)
        dm->buffer_flags_template &= ~(VNET_BUFFER_F_L4_CHECKSUM_CORRECT
                                       | VNET_BUFFER_F_L4_CHECKSUM_COMPUTED);

    /* vlib_buffer_t template, one per thread */
    vec_validate_aligned (dm->per_thread_data, tm->n_vlib_mains - 1,
                          CLIB_CACHE_LINE_BYTES);
    for (i = 0; i < tm->n_vlib_mains; i++)
    {
        dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data, i);
        clib_memset (&ptd->buffer_template, 0, sizeof (vlib_buffer_t));
        ptd->buffer_template.flags = dm->buffer_flags_template;
        vnet_buffer (&ptd->buffer_template)->sw_if_index[VLIB_TX] = (u32) ~ 0;
    }

    /* *INDENT-OFF* iterate over every DPDK device */
    RTE_ETH_FOREACH_DEV(i)
    {
        ......

        if (devconf->workers)
        {
            int i;
            q = 0;
            clib_bitmap_foreach (i, devconf->workers, (// for each worker thread in this device's bitmap
            {
                // assign queue q of this device to worker thread i (round-robin)
                vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q++,
                vdm->first_worker_thread_index + i);
            }));
        }
        else
            // no explicit worker set for this device: let assign_rx_thread pick
            // NOTE(review): ~1 is an out-of-range sentinel, so assign_rx_thread
            // falls back to round-robin (or the main thread when no workers
            // exist); other callers use ~0 for the same purpose -- confirm.
            for (q = 0; q < xd->rx_q_used; q++)
            {
                vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q,	/* any */
                                                    ~1);
            }
        ......
    }
    /* *INDENT-ON* */

    return 0;
}

vnet_hw_interface_assign_rx_thread

该函数在进行队列分配时,使用如下结构来控制分配过程,保证公平性:


/* Round-robin allocator state used by vnet_hw_interface_assign_rx_thread
   to spread rx queues fairly over the worker threads. */
typedef struct
{
    vnet_device_per_worker_data_t *workers;   /* per-worker data */
    uword first_worker_thread_index;          /* first worker thread index */
    uword last_worker_thread_index;           /* last worker thread index */
    uword next_worker_thread_index;           /* RR cursor: next worker to get a queue */
} vnet_device_main_t;

函数实现:

/* Bind one rx queue of a hardware interface to a polling thread.
   Passing an out-of-range thread_index (e.g. ~0) selects the next
   worker in round-robin order, which keeps the load balanced; when no
   worker threads exist at all, the main thread (index 0) is used. */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
{
    vnet_device_main_t *vdm = &vnet_device_main;
    vlib_main_t *rx_vm, *this_vm;
    vnet_device_input_runtime_t *rt;
    vnet_device_and_queue_t *dq;
    vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

    ASSERT (hw->input_node_index > 0);

    /* No worker threads: everything, including rx, runs on main. */
    if (vdm->first_worker_thread_index == 0)
        thread_index = 0;

    /* Requested thread is out of the worker range: pick the next
       worker round-robin for fairness. */
    if (thread_index != 0 &&
            (thread_index < vdm->first_worker_thread_index ||
             thread_index > vdm->last_worker_thread_index))
    {
        thread_index = vdm->next_worker_thread_index++;
        if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
            vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

    rx_vm = vlib_mains[thread_index];   /* thread that will poll the queue */
    this_vm = vlib_get_main ();         /* calling thread (normally main) */

    vlib_worker_thread_barrier_sync (this_vm);

    /* Append the (device, queue) pair to the input node's per-thread
       runtime list. */
    rt = vlib_node_get_runtime_data (rx_vm, hw->input_node_index);
    vec_add2 (rt->devices_and_queues, dq, 1);
    dq->hw_if_index = hw_if_index;
    dq->dev_instance = hw->dev_instance;
    dq->queue_id = queue_id;
    dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;   /* default mode */
    rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

    vnet_device_queue_update (vnm, rt);

    /* Record the queue -> thread mapping on the interface itself. */
    vec_validate (hw->input_node_thread_index_by_queue, queue_id);
    vec_validate (hw->rx_mode_by_queue, queue_id);
    hw->input_node_thread_index_by_queue[queue_id] = thread_index;
    hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

    vlib_worker_thread_barrier_release (this_vm);

    /* Enable the input node on the chosen thread. */
    vlib_node_set_state (rx_vm, hw->input_node_index,
                         rt->enabled_node_state);
}

动态增加一个设备,如何分配到哪一个线程

动态增加一个新的网络设备,将会在设备初始化的时候调用函数vnet_hw_interface_assign_rx_thread分配到指定的工作者线程。我们以af_packet类型的网卡为例进行分析。

/*
 * Create an af_packet interface bound to host interface host_if_name,
 * register it as an ethernet device, and hand its single rx queue (0)
 * to a polling thread chosen round-robin via
 * vnet_hw_interface_assign_rx_thread.
 *
 * @param vm           vlib main of the calling thread
 * @param host_if_name name of the host-side interface (clib vector)
 * @param hw_addr_set  optional MAC; a random one is generated if NULL
 * @param sw_if_index  out: sw if index of the new interface (may be NULL)
 * @return 0 on success, a VNET_API_ERROR_* code otherwise
 *
 * Fixes vs. the original excerpt:
 *  - fd2 is reset to -1 after the early close, so the error path can
 *    no longer close the same descriptor twice;
 *  - host_if_name_dup was leaked on the ALREADY_EXISTS early return;
 *  - the SIOCGIFINDEX failure path returned directly, leaking
 *    host_if_name_dup/rx_req/tx_req; it now routes through error:
 *    while still returning VNET_API_ERROR_INVALID_INTERFACE.
 */
int
af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
                     u32 * sw_if_index)
{
    af_packet_main_t *apm = &af_packet_main;
    int ret, fd = -1, fd2 = -1;
    struct tpacket_req *rx_req = 0;
    struct tpacket_req *tx_req = 0;
    struct ifreq ifr;
    u8 *ring = 0;
    af_packet_if_t *apif = 0;
    u8 hw_addr[6];
    clib_error_t *error;
    vnet_sw_interface_t *sw;
    vnet_hw_interface_t *hw;
    vlib_thread_main_t *tm = vlib_get_thread_main ();
    vnet_main_t *vnm = vnet_get_main ();
    uword *p;
    uword if_index;
    u8 *host_if_name_dup = vec_dup (host_if_name);
    int host_if_index = -1;

    /* Refuse to create the same interface twice. */
    p = mhash_get (&apm->if_index_by_host_if_name, host_if_name);
    if (p)
    {
        apif = vec_elt_at_index (apm->interfaces, p[0]);
        *sw_if_index = apif->sw_if_index;
        vec_free (host_if_name_dup);    /* fix: was leaked on this path */
        return VNET_API_ERROR_IF_ALREADY_EXISTS;
    }

    vec_validate (rx_req, 0);
    rx_req->tp_block_size = AF_PACKET_RX_BLOCK_SIZE;
    rx_req->tp_frame_size = AF_PACKET_RX_FRAME_SIZE;
    rx_req->tp_block_nr = AF_PACKET_RX_BLOCK_NR;
    rx_req->tp_frame_nr = AF_PACKET_RX_FRAME_NR;

    vec_validate (tx_req, 0);
    tx_req->tp_block_size = AF_PACKET_TX_BLOCK_SIZE;
    tx_req->tp_frame_size = AF_PACKET_TX_FRAME_SIZE;
    tx_req->tp_block_nr = AF_PACKET_TX_BLOCK_NR;
    tx_req->tp_frame_nr = AF_PACKET_TX_FRAME_NR;

    /*
     * make sure host side of interface is 'UP' before binding AF_PACKET
     * socket on it.
     */
    if ((fd2 = socket (AF_UNIX, SOCK_DGRAM, 0)) < 0)
    {
        vlib_log_debug (apm->log_class, "Failed to create socket");
        ret = VNET_API_ERROR_SYSCALL_ERROR_1;
        goto error;
    }

    /* NOTE(review): copies exactly vec_len bytes; assumes host_if_name
       fits in ifr_name and is NUL-terminated within that length --
       confirm against callers. */
    clib_memcpy (ifr.ifr_name, (const char *) host_if_name,
                 vec_len (host_if_name));
    /* Look up the host interface index by name. */
    if ((ret = ioctl (fd2, SIOCGIFINDEX, &ifr)) < 0)
    {
        vlib_log_debug (apm->log_class, "af_packet_create error: %d", ret);
        /* fix: route through error: so rx_req/tx_req/host_if_name_dup
           are freed; preserve the original return code. */
        ret = VNET_API_ERROR_INVALID_INTERFACE;
        goto error;
    }

    host_if_index = ifr.ifr_ifindex;
    /* Fetch the host interface flags. */
    if ((ret = ioctl (fd2, SIOCGIFFLAGS, &ifr)) < 0)
    {
        vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
        goto error;
    }
    /* Bring the host interface UP if it is not already. */
    if (!(ifr.ifr_flags & IFF_UP))
    {
        ifr.ifr_flags |= IFF_UP;
        if ((ret = ioctl (fd2, SIOCSIFFLAGS, &ifr)) < 0)
        {
            vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
            goto error;
        }
    }

    if (fd2 > -1)
    {
        close (fd2);
        fd2 = -1;   /* fix: prevent double close in the error path */
    }
    /* Create the AF_PACKET v2 socket with mmap'd rx/tx rings. */
    ret = create_packet_v2_sock (host_if_index, rx_req, tx_req, &fd, &ring);

    if (ret != 0)
        goto error;

    ret = is_bridge (host_if_name);

    if (ret == 0)			/* is a bridge, ignore state */
        host_if_index = -1;

    /* So far everything looks good, let's create interface */
    pool_get (apm->interfaces, apif);
    if_index = apif - apm->interfaces;

    apif->host_if_index = host_if_index;
    apif->fd = fd;
    apif->rx_ring = ring;
    apif->tx_ring = ring + rx_req->tp_block_size * rx_req->tp_block_nr;
    apif->rx_req = rx_req;
    apif->tx_req = tx_req;
    apif->host_if_name = host_if_name_dup;
    apif->per_interface_next_index = ~0;
    apif->next_tx_frame = 0;
    apif->next_rx_frame = 0;

    /* Multiple threads may touch the device: protect it with a lock. */
    if (tm->n_vlib_mains > 1)
        clib_spinlock_init (&apif->lockp);

    {
        /* Register fd with the file poller; read-ready events on the
           socket emulate rx interrupts for interrupt mode. */
        clib_file_t template = { 0 };
        template.read_function = af_packet_fd_read_ready;
        template.file_descriptor = fd;
        template.private_data = if_index;
        template.flags = UNIX_FILE_EVENT_EDGE_TRIGGERED;
        template.description = format (0, "%U", format_af_packet_device_name,
                                       if_index);
        apif->clib_file_index = clib_file_add (&file_main, &template);
    }

    /*use configured or generate random MAC address */
    if (hw_addr_set)
        clib_memcpy (hw_addr, hw_addr_set, 6);
    else
    {
        f64 now = vlib_time_now (vm);
        u32 rnd;
        rnd = (u32) (now * 1e6);
        rnd = random_u32 (&rnd);

        clib_memcpy (hw_addr + 2, &rnd, sizeof (rnd));
        hw_addr[0] = 2;        /* locally administered unicast */
        hw_addr[1] = 0xfe;
    }
    /* Register the ethernet interface. */
    error = ethernet_register_interface (vnm, af_packet_device_class.index,
                                         if_index, hw_addr, &apif->hw_if_index,
                                         af_packet_eth_flag_change);

    if (error)
    {
        clib_memset (apif, 0, sizeof (*apif));
        pool_put (apm->interfaces, apif);
        vlib_log_err (apm->log_class, "Unable to register interface: %U",
                      format_clib_error, error);
        clib_error_free (error);
        ret = VNET_API_ERROR_SYSCALL_ERROR_1;
        goto error;
    }

    sw = vnet_get_hw_sw_interface (vnm, apif->hw_if_index);
    hw = vnet_get_hw_interface (vnm, apif->hw_if_index);
    apif->sw_if_index = sw->sw_if_index;
    /* Set the input node this interface feeds (af-packet-input). */
    vnet_hw_interface_set_input_node (vnm, apif->hw_if_index,
                                      af_packet_input_node.index);
    /* Assign rx queue 0 to a polling thread; ~0 = "any", i.e. the
       round-robin pick inside vnet_hw_interface_assign_rx_thread. */
    vnet_hw_interface_assign_rx_thread (vnm, apif->hw_if_index, 0,	/* queue */
                                        ~0 /* any cpu */ );
    /* The interface supports interrupt mode. */
    hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
    vnet_hw_interface_set_flags (vnm, apif->hw_if_index,
                                 VNET_HW_INTERFACE_FLAG_LINK_UP);
    /* Default rx mode: interrupt. */
    vnet_hw_interface_set_rx_mode (vnm, apif->hw_if_index, 0,
                                   VNET_HW_INTERFACE_RX_MODE_INTERRUPT);

    mhash_set_mem (&apm->if_index_by_host_if_name, host_if_name_dup, &if_index,
                   0);
    if (sw_if_index)
        *sw_if_index = apif->sw_if_index;

    return 0;

error:
    /* NOTE(review): fd and the mmap'd ring are not released here when a
       failure happens after create_packet_v2_sock -- confirm against
       upstream; fix kept out of scope to avoid touching ring teardown. */
    if (fd2 > -1)
        close (fd2);
    vec_free (host_if_name_dup);
    vec_free (rx_req);
    vec_free (tx_req);
    return ret;
}

通过命令行强制指定网卡队列处理线程

可以使用如下命名强制指定网卡的队列所在的工作线程:

set interface rx-placement <interface> [queue <n>]

可以使用如下命令查看当前网卡与队列之间的关系:

show interface rx-placement

具体命令实现

/*?
 * This command is used to assign a given interface, and optionally a
 * given queue, to a different thread. If the '<em>queue</em>' is not provided,
 * it defaults to 0. The '<em>worker</em>' parameter is zero based and the index
 * in the thread name, for example, 0 in the thread name '<em>vpp_wk_0</em>'.
 *
 * @cliexpar
 * Example of how to display the interface placement:
 * @cliexstart{show interface rx-placement}
 * Thread 1 (vpp_wk_0):
 *   node dpdk-input:
 *     GigabitEthernet7/0/0 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 0 (polling)
 *     VirtualEthernet0/0/12 queue 2 (polling)
 *     VirtualEthernet0/0/13 queue 0 (polling)
 *     VirtualEthernet0/0/13 queue 2 (polling)
 * Thread 2 (vpp_wk_1):
 *   node dpdk-input:
 *     GigabitEthernet7/0/1 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 1 (polling)
 *     VirtualEthernet0/0/12 queue 3 (polling)
 *     VirtualEthernet0/0/13 queue 1 (polling)
 *     VirtualEthernet0/0/13 queue 3 (polling)
 * @cliexend
 * Example of how to assign a interface and queue to a worker thread:
 * @cliexcmd{set interface rx-placement VirtualEthernet0/0/12 queue 1 worker 0}
 * Example of how to display the interface placement:
 * @cliexstart{show interface rx-placement}
 * Thread 1 (vpp_wk_0):
 *   node dpdk-input:
 *     GigabitEthernet7/0/0 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 0 (polling)
 *     VirtualEthernet0/0/12 queue 1 (polling)
 *     VirtualEthernet0/0/12 queue 2 (polling)
 *     VirtualEthernet0/0/13 queue 0 (polling)
 *     VirtualEthernet0/0/13 queue 2 (polling)
 * Thread 2 (vpp_wk_1):
 *   node dpdk-input:
 *     GigabitEthernet7/0/1 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 3 (polling)
 *     VirtualEthernet0/0/13 queue 1 (polling)
 *     VirtualEthernet0/0/13 queue 3 (polling)
 * @cliexend
?*/
/* *INDENT-OFF* */
/* CLI registration: "set interface rx-placement <interface> [queue <n>]
   [worker <n> | main]" -- move an interface rx queue onto a specific
   worker thread (or the main thread). */
VLIB_CLI_COMMAND (cmd_set_if_rx_placement,static) =
{
    .path = "set interface rx-placement",
    .short_help = "set interface rx-placement <interface> [queue <n>] "
    "[worker <n> | main]",
    .function = set_interface_rx_placement,
    .is_mp_safe = 1,
};
/* *INDENT-ON* */


/*?
 * This command is used to display the interface and queue worker
 * thread placement.
 *
 * @cliexpar
 * Example of how to display the interface placement:
 * @cliexstart{show interface rx-placement}
 * Thread 1 (vpp_wk_0):
 *   node dpdk-input:
 *     GigabitEthernet7/0/0 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 0 (polling)
 *     VirtualEthernet0/0/12 queue 2 (polling)
 *     VirtualEthernet0/0/13 queue 0 (polling)
 *     VirtualEthernet0/0/13 queue 2 (polling)
 * Thread 2 (vpp_wk_1):
 *   node dpdk-input:
 *     GigabitEthernet7/0/1 queue 0 (polling)
 *   node vhost-user-input:
 *     VirtualEthernet0/0/12 queue 1 (polling)
 *     VirtualEthernet0/0/12 queue 3 (polling)
 *     VirtualEthernet0/0/13 queue 1 (polling)
 *     VirtualEthernet0/0/13 queue 3 (polling)
 * @cliexend
?*/
/* *INDENT-OFF* */
/* CLI registration: "show interface rx-placement" -- display the
   current interface/queue to worker-thread mapping. */
VLIB_CLI_COMMAND (show_interface_rx_placement, static) =
{
    .path = "show interface rx-placement",
    .short_help = "show interface rx-placement",
    .function = show_interface_rx_placement_fn,
};
/* *INDENT-ON* */

/* Re-home one rx queue of a hardware interface onto another thread
   (backend for "set interface rx-placement").  thread_index is
   zero-based among workers unless is_main is set, in which case the
   main thread (index 0) is used.  The queue's rx mode is preserved
   across the move. */
clib_error_t *
set_hw_interface_rx_placement (u32 hw_if_index, u32 queue_id,
                               u32 thread_index, u8 is_main)
{
    vnet_main_t *vnm = vnet_get_main ();
    vnet_device_main_t *vdm = &vnet_device_main;
    vnet_hw_interface_rx_mode mode = VNET_HW_INTERFACE_RX_MODE_UNKNOWN;
    int rv;

    /* Translate the user-visible index into a vlib thread index. */
    thread_index = is_main ? 0 : thread_index + vdm->first_worker_thread_index;

    if (thread_index > vdm->last_worker_thread_index)
        return clib_error_return (0,
                                  "please specify valid worker thread or main");

    /* Remember the current rx mode so it can be restored after the move. */
    rv = vnet_hw_interface_get_rx_mode (vnm, hw_if_index, queue_id, &mode);
    if (rv)
        return clib_error_return (0, "not found");

    /* Detach the queue from its current thread... */
    rv = vnet_hw_interface_unassign_rx_thread (vnm, hw_if_index, queue_id);
    if (rv)
        return clib_error_return (0, "not found");

    /* ...and re-attach it to the requested one, restoring the mode. */
    vnet_hw_interface_assign_rx_thread (vnm, hw_if_index, queue_id,
                                        thread_index);
    vnet_hw_interface_set_rx_mode (vnm, hw_if_index, queue_id, mode);

    return 0;
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM