00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00021 #include "packetproc-data.h"
00022
00029 #define OPT_CREATE_LOOPS 1
00030 #define OPT_CREATE_LOOP_ON_CPU 2
00031 #define OPT_CREATE_THREAD 3
00032
00033 #define CREATE_LOOP_OPT OPT_CREATE_LOOPS
00034
00036 packet_thread_t packet_thrd[MSP_MAX_CPUS];
00037
00044 static void
00045 data_thread_quit (int signo UNUSED)
00046 {
00047 logging(LOG_INFO, "%s: Thread exiting...", __func__);
00048
00049
00050 pthread_exit(NULL);
00051 }
00052
00064 static void
00065 packet_process (struct jbuf *jb, packet_thread_t *thrd)
00066 {
00067 struct ip *iph = NULL;
00068 struct jbuf *jb_tmp = NULL;
00069 char src_str[INET_ADDRSTRLEN];
00070 char dst_str[INET_ADDRSTRLEN];
00071 struct in_addr tmp_addr;
00072 int ret = MSP_OK;
00073 char in_vrf_name[64];
00074 int count = 0;
00075
00076
00077 jb_tmp = jbuf_pullup(jb, sizeof(struct ip));
00078 if (jb_tmp != NULL) {
00079 jb = jb_tmp;
00080 }
00081
00082 iph = jbuf_to_d(jb, struct ip *);
00083
00084 if (vrf_getvrfnamebyindex(jbuf_getvrf(jb), AF_INET, in_vrf_name, 64) < 0) {
00085 logging(LOG_ERR, "%s: Get input VRF name ERROR! idx: %d",
00086 __func__, jbuf_getvrf(jb));
00087 }
00088 inet_ntop(AF_INET, &iph->ip_src, src_str, INET_ADDRSTRLEN);
00089 inet_ntop(AF_INET, &iph->ip_dst, dst_str, INET_ADDRSTRLEN);
00090
00091 logging(LOG_INFO, "%s: %s.%d %s -> %s",
00092 __func__, in_vrf_name, jbuf_get_rcv_subunit(jb), src_str, dst_str);
00093
00094 switch (jbuf_get_rcv_subunit(jb)) {
00095 case 0:
00096
00097 jbuf_set_xmit_subunit(jb, 0);
00098 tmp_addr = iph->ip_src;
00099 iph->ip_src = iph->ip_dst;
00100 iph->ip_dst = tmp_addr;
00101 break;
00102 case 1:
00103
00104 jbuf_set_xmit_subunit(jb, 2);
00105 break;
00106 case 2:
00107
00108 jbuf_set_xmit_subunit(jb, 1);
00109 break;
00110 default:
00111 logging(LOG_ERR, "%s: Don't support traffic from unit %d.",
00112 __func__, jbuf_get_rcv_subunit(jb));
00113
00114 jbuf_free(jb);
00115 return;
00116 }
00117
00118 count = 100;
00119 if (thrd->thrd_data_hdl) {
00120 do {
00121 ret = msp_data_send(thrd->thrd_data_hdl, jb,
00122 MSP_MSG_TYPE_PACKET);
00123 } while((ret == MSP_DATA_SEND_RETRY) && (count-- > 0));
00124 } else if (thrd->thrd_fifo_hdl) {
00125 do {
00126 ret = msp_fifo_send(thrd->thrd_fifo_hdl, jb,
00127 MSP_MSG_TYPE_PACKET);
00128 } while((ret == MSP_DATA_SEND_RETRY) && (count-- > 0));
00129 } else {
00130 logging(LOG_ERR, "%s: Handler ERROR!", __func__);
00131 }
00132
00133 if(ret != MSP_OK) {
00134 logging(LOG_ERR, "%s: Send data ERROR! err: %d",
00135 __func__, ret);
00136 jbuf_free(jb);
00137 }
00138 return;
00139 }
00140
00141 #if ((CREATE_LOOP_OPT == OPT_CREATE_LOOPS) || \
00142 (CREATE_LOOP_OPT == OPT_CREATE_LOOP_ON_CPU))
00143
00150 static void *
00151 packet_loop (msp_dataloop_args_t *arg)
00152 {
00153 msp_data_handle_t data_hdl = arg->dhandle;
00154 int cpu_num = arg->dloop_number;
00155 int type;
00156 void *ptr;
00157 sigset_t sig_mask;
00158
00159
00160
00161 sigemptyset(&sig_mask);
00162 sigaddset(&sig_mask, SIGTERM);
00163 pthread_sigmask(SIG_BLOCK, &sig_mask, NULL);
00164
00165 logging(LOG_INFO, "%s: data handle: 0x%x, CPU num: %d",
00166 __func__, data_hdl, cpu_num);
00167
00168 packet_thrd[cpu_num].thrd_cpu = cpu_num;
00169 packet_thrd[cpu_num].thrd_data_hdl = data_hdl;
00170 packet_thrd[cpu_num].thrd_fifo_hdl = 0;
00171 packet_thrd[cpu_num].thrd_tid = pthread_self();
00172 while (1) {
00173 if ((ptr = msp_data_recv(data_hdl, &type)) != NULL) {
00174 if (type == MSP_MSG_TYPE_PACKET) {
00175 packet_process((struct jbuf *)ptr, &packet_thrd[cpu_num]);
00176 }
00177 }
00178 }
00179
00180 return NULL;
00181 }
00182 #endif
00183
00184
00185 #if (CREATE_LOOP_OPT == OPT_CREATE_LOOP_ON_CPU)
00186
00194 static int
00195 create_loops (void)
00196 {
00197 int cpu_num;
00198 msp_dataloop_params_t loop_option;
00199 msp_dataloop_result_t loop_result;
00200 int ret;
00201
00202 bzero(&loop_option, sizeof(loop_option));
00203 bzero(&loop_result, sizeof(loop_result));
00204
00205 cpu_num = MSP_NEXT_NONE;
00206 while (1) {
00207 cpu_num = msp_env_get_next_data_cpu(cpu_num);
00208 if (cpu_num == MSP_NEXT_END) {
00209 break;
00210 }
00211
00212
00213 packet_thrd[cpu_num].thrd_user_data = cpu_num;
00214 loop_option.app_data = &packet_thrd[cpu_num].thrd_user_data;
00215
00216
00217 ret = msp_data_create_loop_on_cpu(cpu_num, packet_loop,
00218 &loop_option, &loop_result);
00219 if (ret != MSP_OK) {
00220 logging(LOG_ERR,
00221 "%s: Create loop on CPU %d ERROR! err: %d",
00222 __func__, cpu_num, ret);
00223 }
00224 }
00225 return MSP_OK;
00226 }
00227 #endif
00228
00229 #if (CREATE_LOOP_OPT == OPT_CREATE_THREAD)
00230
00236 static void *
00237 packet_loop_thread (packet_thread_t *thrd)
00238 {
00239 int type;
00240 void *ptr;
00241 sigset_t sig_mask;
00242
00243
00244
00245 sigemptyset(&sig_mask);
00246 sigaddset(&sig_mask, SIGTERM);
00247 pthread_sigmask(SIG_BLOCK, &sig_mask, NULL);
00248
00249 logging(LOG_INFO, "%s: FIFO handle: 0x%x, cpu: %d",
00250 __func__, thrd->thrd_fifo_hdl, thrd->thrd_cpu);
00251
00252 while (1) {
00253 if((ptr = msp_fifo_recv(thrd->thrd_fifo_hdl, &type)) != NULL) {
00254 if (type == MSP_MSG_TYPE_PACKET) {
00255 packet_process((struct jbuf *)ptr, thrd);
00256 }
00257 }
00258 }
00259
00260 return NULL;
00261 }
00262
00263
00270 static int
00271 create_threads (void)
00272 {
00273 msp_fifo_create_t fifo_create;
00274 pthread_attr_t attr;
00275 int ret = MSP_OK;
00276 int cpu_num;
00277
00278 cpu_num = MSP_NEXT_NONE;
00279 while (1) {
00280
00281 cpu_num = msp_env_get_next_data_cpu(cpu_num);
00282 if (cpu_num == MSP_NEXT_END) {
00283 break;
00284 }
00285
00286
00287 bzero(&fifo_create, sizeof(fifo_create));
00288 fifo_create.fifo_depth = MSP_FIFO_DEFAULT_DEPTH;
00289 ret = msp_fifo_create_fifo(cpu_num, &fifo_create);
00290 if(ret != MSP_OK) {
00291 logging(LOG_ERR,
00292 "%s: Create FIFO for CPU %d ERROR! err: %d",
00293 __func__, cpu_num, ret);
00294 continue;
00295 }
00296
00297 packet_thrd[cpu_num].thrd_fifo_hdl = fifo_create.fhandle;
00298 packet_thrd[cpu_num].thrd_data_hdl = 0;
00299 packet_thrd[cpu_num].thrd_cpu = cpu_num;
00300
00301 pthread_attr_init(&attr);
00302 if (pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)) {
00303 logging(LOG_ERR,
00304 "%s: pthread_attr_setscope() on CPU %d ERROR!",
00305 __func__, cpu_num);
00306 } else if (pthread_attr_setcpuaffinity_np(&attr, cpu_num)) {
00307 logging(LOG_ERR,
00308 "%s: pthread_attr_setcpuaffinity_np() on CPU %d ERROR!",
00309 __func__, cpu_num);
00310 } else if (pthread_create(&packet_thrd[cpu_num].thrd_tid, &attr,
00311 (void *)&packet_loop_thread, &packet_thrd[cpu_num])) {
00312 logging(LOG_ERR,
00313 "%s: pthread_create() on CPU %d ERROR!",
00314 __func__, cpu_num);
00315 } else if (msp_fifo_register_pthread(packet_thrd[cpu_num].thrd_fifo_hdl,
00316 packet_thrd[cpu_num].thrd_tid)) {
00317 logging(LOG_ERR,
00318 "%s: msp_fifo_register_pthread() on CPU %d ERROR!",
00319 __func__, cpu_num);
00320 }
00321 }
00322 return MSP_OK;
00323 }
00324 #endif
00325
00326
00333 int
00334 init_packet_loop(void)
00335 {
00336 int ret;
00337 sigset_t sig_mask;
00338
00339 logging(LOG_INFO, "%s: Start creating data loops.", __func__);
00340
00341 bzero(packet_thrd, sizeof(packet_thrd));
00342
00343
00344
00345 signal(SIGUSR1, data_thread_quit);
00346
00347 #if (CREATE_LOOP_OPT == OPT_CREATE_LOOPS)
00348
00349
00350
00351
00352 ret = msp_data_create_loops(packet_loop, NULL);
00353 #elif (CREATE_LOOP_OPT == OPT_CREATE_LOOP_ON_CPU)
00354 ret = create_loops();
00355 #elif (CREATE_LOOP_OPT == OPT_CREATE_THREAD)
00356 ret = create_threads();
00357 #endif
00358
00359 if(ret != MSP_OK) {
00360 logging(LOG_ERR, "%s: Create loops on all data CPUs ERROR! err: %d",
00361 __func__, ret);
00362 } else {
00363
00364 sigemptyset(&sig_mask);
00365 sigaddset(&sig_mask, SIGUSR1);
00366 pthread_sigmask(SIG_BLOCK, &sig_mask, NULL);
00367 }
00368
00369 return ret;
00370 }
00371