00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00021 #include "equilibrium-data_main.h"
00022 #include <sys/socket.h>
00023 #include <unistd.h>
00024 #include <netinet/in_systm.h>
00025 #include <netinet/ip.h>
00026 #include <netinet/tcp.h>
00027 #include <sys/jnx/jbuf.h>
00028 #include "equilibrium-data_config.h"
00029 #include "equilibrium-data_monitor.h"
00030 #include "equilibrium-data_packet.h"
00031
00032
00033
00034
/* Name of the shared-memory arena backing the object caches. */
#define EQ_SHARED_MEM_NAME "equilibrium-data arena"

/* Object-cache name for the (single) session hash table. */
#define EQ_SESSION_TABLE_NAME "equilibrium session table"

/* Object-cache name for individual session entries. */
#define EQ_SESSION_ENTRY_NAME "equilibrium session entry"

/* Period, in seconds, of the aging_cleanup() timer. */
#define SESSION_AGE_CHECK_INTERVAL 15

/* Upper bound on msp_data_send() attempts while it reports RETRY. */
#define MAX_MSP_SEND_RETRIES 100

/* Contiguous bytes needed to read a fixed-size IPv4 header. */
#define IP_NEEDED_BYTES (sizeof(struct ip))

/* Contiguous bytes needed to read fixed IPv4 + TCP headers (no options). */
#define TCP_NEEDED_BYTES (sizeof(struct ip) + sizeof(struct tcphdr))

/*
 * Number of hash buckets in the session table. HASH_MASK below assumes
 * this is a power of two (512K = 2^19).
 */
#define SESSIONS_BUCKET_COUNT (512 * 1024)

/* Mask that reduces a 32-bit hash to a bucket index. */
const uint32_t HASH_MASK = SESSIONS_BUCKET_COUNT - 1;
00064
00065
00066
00067
00071 typedef struct session_entry_s {
00072 struct session_entry_s * reverse;
00073 in_addr_t saddr;
00074 in_addr_t daddr;
00075 uint16_t sport;
00076 uint16_t dport;
00077 uint8_t cpu;
00078 uint8_t dir;
00079 in_addr_t faddr;
00080 time_t age_ts;
00081 uint16_t ss_id;
00082 uint16_t frag_group;
00083 msp_spinlock_t lock;
00084 TAILQ_ENTRY(session_entry_s) entries;
00085 } session_entry_t;
00086
00087
00091 typedef TAILQ_HEAD(ht_bucket_list_s, session_entry_s) ht_bucket_list_t;
00092
00093
00097 typedef struct hash_bucket_s {
00098 msp_spinlock_t bucket_lock;
00099 ht_bucket_list_t bucket_entries;
00100 } hash_bucket_t;
00101
00102
00106 typedef struct hashtable_s {
00107 hash_bucket_t hash_bucket[SESSIONS_BUCKET_COUNT];
00108 } hashtable_t;
00109
00110
00111 static evTimerID aging_timer;
00112 static msp_shm_handle_t shm_handle;
00113 static msp_oc_handle_t table_handle;
00114 static msp_oc_handle_t entry_handle;
00115 static hashtable_t * sessions_table;
00116 static atomic_uint_t loops_running;
00117 static volatile uint8_t do_shutdown;
00118 static uint32_t obj_cache_id;
00119
00120
00121
00122
00139 static void
00140 aging_cleanup(evContext ctx __unused,
00141 void * uap __unused,
00142 struct timespec due __unused,
00143 struct timespec inter __unused)
00144 {
00145 const time_t down_server_flow_duration = 60;
00146 const time_t non_app_flow_duration = 300;
00147
00148 uint32_t i, cpu;
00149 hash_bucket_t * bucket;
00150 session_entry_t * session, * next;
00151 time_t current_time, down_server_flow_timeout, non_app_flow_timeout;
00152
00153 cpu = msp_get_current_cpu();
00154
00155 current_time = get_current_time();
00156 down_server_flow_timeout = current_time - down_server_flow_duration;
00157 non_app_flow_timeout = current_time - non_app_flow_duration;
00158
00159 for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
00160
00161 bucket = &sessions_table->hash_bucket[i];
00162
00163
00164 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00165
00166 session = TAILQ_FIRST(&bucket->bucket_entries);
00167
00168 while(session != NULL) {
00169
00170
00171 next = TAILQ_NEXT(session, entries);
00172
00173
00174 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00175
00176
00177
00178 if(session->faddr == 0) {
00179
00180 if(session->age_ts < non_app_flow_timeout) {
00181
00182 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00183 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00184 session = next;
00185 continue;
00186 }
00187 } else if(session->faddr == (in_addr_t)-1) {
00188
00189 if(session->age_ts < down_server_flow_timeout) {
00190
00191 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00192 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00193 session = next;
00194 continue;
00195 }
00196 } else {
00197
00198
00199 if(session->dir == JBUF_PACKET_DIR_INGRESS) {
00200 if(session->age_ts +
00201 get_app_session_timeout(session->ss_id,
00202 session->faddr, session->sport) < current_time) {
00203
00204 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00205 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00206 session = next;
00207 continue;
00208 }
00209 } else {
00210 if(session->age_ts +
00211 get_app_session_timeout(session->ss_id,
00212 session->daddr, session->dport) < current_time) {
00213
00214
00215 monitor_remove_session_for_server(session->ss_id,
00216 session->daddr, session->dport, session->faddr);
00217
00218 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00219 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00220 session = next;
00221 continue;
00222 }
00223 }
00224 }
00225
00226
00227 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00228 session = next;
00229 }
00230
00231
00232 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00233 }
00234
00235 monitor_send_stats();
00236
00237 msp_objcache_reclaim(shm_handle);
00238 }
00239
00240
/*
 * Incrementally update a 16-bit ones'-complement checksum (IP/TCP style)
 * after replacing the bytes at optr (olen bytes) with the bytes at nptr
 * (nlen bytes). This is the adjustment algorithm from RFC 3022 section 4.2.
 *
 * @param chksum  pointer to the 2-byte checksum (network byte order), updated
 *                in place
 * @param optr    old data
 * @param olen    number of old bytes; an odd trailing byte is treated as the
 *                high byte of a zero-padded word
 * @param nptr    new data
 * @param nlen    number of new bytes; same odd-byte handling as olen
 *
 * Non-positive lengths contribute nothing. In the original RFC loop an odd
 * or negative length never reached zero, so the loop did not terminate.
 */
static void
checksum_adjust(
    unsigned char * chksum,
    unsigned char * optr,
    int olen,
    unsigned char * nptr,
    int nlen)
{
    long x, old, new_;

    x = chksum[0] * 256 + chksum[1];
    x = ~x & 0xFFFF;

    /* Subtract the old 16-bit words, folding borrows (ones' complement). */
    while (olen > 0) {
        old = optr[0] * 256 + (olen > 1 ? optr[1] : 0);
        optr += 2;
        x -= old & 0xffff;
        if (x <= 0) { x--; x &= 0xffff; }
        olen -= 2;
    }

    /* Add the new 16-bit words, folding carries (ones' complement). */
    while (nlen > 0) {
        new_ = nptr[0] * 256 + (nlen > 1 ? nptr[1] : 0);
        nptr += 2;
        x += new_ & 0xffff;
        if (x & 0x10000) { x++; x &= 0xffff; }
        nlen -= 2;
    }

    x = ~x & 0xFFFF;
    chksum[0] = x / 256;
    chksum[1] = x & 0xff;
}
00291
00292
00307 static status_t
00308 pullup_bytes(struct jbuf ** pkt_buf, uint16_t num_bytes)
00309 {
00310 struct jbuf * tmp_buf;
00311
00312 if(jbuf_particle_get_data_length(*pkt_buf) < num_bytes) {
00313 tmp_buf = jbuf_pullup((*pkt_buf), num_bytes);
00314
00315 if(!tmp_buf) {
00316 return EFAIL;
00317 }
00318
00319 *pkt_buf = tmp_buf;
00320 }
00321 return SUCCESS;
00322 }
00323
00324
00344 static status_t
00345 process_packet(struct ip * ip_pkt,
00346 jbuf_svc_set_info_t * ss_info,
00347 int cpu)
00348 {
00349 uint32_t hash;
00350 uint16_t * key;
00351 int len;
00352 in_addr_t facade, real;
00353 hash_bucket_t * bucket;
00354 session_entry_t * session;
00355 struct tcphdr * tcp_hdr =
00356 (struct tcphdr *)((uint32_t *)ip_pkt + ip_pkt->ip_hl);
00357
00358 len = ip_pkt->ip_len - (ip_pkt->ip_hl * 4);
00359
00360
00361 len -= tcp_hdr->th_off * 4;
00362
00363 if(tcp_hdr->th_off < 5 || len < 0) {
00364 LOG(LOG_ERR, "%s: Found a malformed TCP header in a packet.",
00365 __func__);
00366 return EFAIL;
00367 }
00368
00369
00370
00371 key = (uint16_t *)&ip_pkt->ip_src;
00372 hash = 0x5F5F;
00373 hash = ((hash << 5) + hash) ^ key[0];
00374 hash = ((hash << 5) + hash) ^ key[1];
00375 hash = ((hash << 5) + hash) ^ key[2];
00376 hash = ((hash << 5) + hash) ^ key[3];
00377
00378 hash &= HASH_MASK;
00379
00380
00381
00382 bucket = &sessions_table->hash_bucket[hash];
00383
00384
00385 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00386
00387 session = TAILQ_FIRST(&bucket->bucket_entries);
00388
00389 while(session != NULL) {
00390 if(session->saddr == ip_pkt->ip_src.s_addr
00391 && session->daddr == ip_pkt->ip_dst.s_addr
00392 && session->ss_id == ss_info->info.intf_type.svc_set_id
00393 && session->dport == tcp_hdr->th_dport
00394 && session->sport == tcp_hdr->th_sport) {
00395
00396 break;
00397 }
00398 session = TAILQ_NEXT(session, entries);
00399 }
00400
00401
00402 if(session == NULL) {
00403 if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00404
00405
00406 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00407 return SUCCESS;
00408 }
00409
00410
00411 session = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
00412 if(session == NULL) {
00413
00414 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00415
00416 LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
00417 "session entry", __func__);
00418 return EFAIL;
00419 }
00420 session->reverse = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
00421 if(session->reverse == NULL) {
00422
00423 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00424
00425
00426 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00427
00428 LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
00429 "session reverse entry", __func__);
00430 return EFAIL;
00431 }
00432
00433
00434 msp_spinlock_init(&session->lock);
00435 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00436
00437 TAILQ_INSERT_HEAD(&bucket->bucket_entries, session, entries);
00438
00439 session->age_ts = get_current_time();
00440 session->saddr = ip_pkt->ip_src.s_addr;
00441 session->daddr = ip_pkt->ip_dst.s_addr;
00442 session->sport = tcp_hdr->th_sport;
00443 session->dport = tcp_hdr->th_dport;
00444 session->ss_id = ss_info->info.intf_type.svc_set_id;
00445 session->dir = JBUF_PACKET_DIR_EGRESS;
00446 session->cpu = cpu;
00447
00448
00449
00450 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00451
00452
00453
00454
00455 msp_spinlock_init(&session->reverse->lock);
00456 INSIST_ERR(msp_spinlock_lock(&session->reverse->lock) == MSP_OK);
00457
00458 session->reverse->age_ts = session->age_ts;
00459 session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00460 session->reverse->daddr = ip_pkt->ip_src.s_addr;
00461 session->reverse->sport = tcp_hdr->th_dport;
00462 session->reverse->dport = tcp_hdr->th_sport;
00463 session->reverse->ss_id = ss_info->info.intf_type.svc_set_id;
00464 session->reverse->reverse = session;
00465 session->reverse->dir = JBUF_PACKET_DIR_INGRESS;
00466 session->reverse->cpu = cpu;
00467
00468
00469
00470
00471 session->faddr = monitor_get_server_for(
00472 session->ss_id, session->daddr, session->dport);
00473
00474 if(session->faddr == (in_addr_t)-1) {
00475
00476 session->reverse->faddr = (in_addr_t)-1;
00477 session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00478 } else if(session->faddr == 0) {
00479
00480 session->reverse->faddr = 0;
00481 session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00482 } else {
00483
00484
00485
00486 session->reverse->faddr = ip_pkt->ip_dst.s_addr;
00487
00488
00489 session->reverse->saddr = session->faddr;
00490 }
00491
00492 session->reverse->frag_group = 0;
00493
00494
00495 if((ntohs(ip_pkt->ip_off) & IP_MF)) {
00496 session->frag_group = ip_pkt->ip_id;
00497 } else {
00498 session->frag_group = 0;
00499 }
00500
00501
00502
00503 hash = 0x5F5F;
00504 key = (uint16_t *)&session->reverse->saddr;
00505 hash = ((hash << 5) + hash) ^ key[0];
00506 hash = ((hash << 5) + hash) ^ key[1];
00507 hash = ((hash << 5) + hash) ^ key[2];
00508 hash = ((hash << 5) + hash) ^ key[3];
00509 hash &= HASH_MASK;
00510
00511
00512 bucket = &sessions_table->hash_bucket[hash];
00513
00514
00515 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00516
00517 TAILQ_INSERT_HEAD(&bucket->bucket_entries, session->reverse, entries);
00518
00519
00520 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00521
00522 if(session->faddr == 0) {
00523
00524 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00525 INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00526 return SUCCESS;
00527 }
00528 if(session->faddr == (in_addr_t)-1) {
00529
00530 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00531 INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00532
00533 LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00534 "servers up.", __func__);
00535 return EFAIL;
00536 }
00537
00538
00539
00540
00541
00542 checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00543 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00544 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00545
00546
00547 checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00548 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00549 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00550
00551
00552 facade = ip_pkt->ip_dst.s_addr;
00553 real = ip_pkt->ip_dst.s_addr = session->faddr;
00554
00555
00556 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00557 INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00558
00559 return SUCCESS;
00560 }
00561
00562
00563
00564
00565
00566 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00567
00568
00569 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00570
00571
00572 if((ntohs(ip_pkt->ip_off) & IP_MF)) {
00573 session->frag_group = ip_pkt->ip_id;
00574 } else {
00575 session->frag_group = 0;
00576 }
00577
00578 if(session->faddr == (in_addr_t)-1) {
00579
00580
00581
00582 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00583
00584 LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00585 "servers up.", __func__);
00586
00587
00588 return EFAIL;
00589 }
00590
00591 session->age_ts = get_current_time();
00592
00593 if(session->faddr != 0) {
00594 if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00595
00596
00597
00598
00599 checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00600 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00601 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00602
00603
00604 checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00605 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00606 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00607
00608
00609 ip_pkt->ip_src.s_addr = session->faddr;
00610 } else {
00611
00612
00613
00614
00615 checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00616 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00617 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00618
00619
00620 checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00621 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00622 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00623
00624
00625 facade = ip_pkt->ip_dst.s_addr;
00626 real = ip_pkt->ip_dst.s_addr = session->faddr;
00627
00628
00629 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00630
00631 return SUCCESS;
00632 }
00633 }
00634
00635
00636 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00637
00638 return SUCCESS;
00639 }
00640
00641
00655 static status_t
00656 process_fragment(struct ip * ip_pkt, jbuf_svc_set_info_t * ss_info)
00657 {
00658 uint32_t hash;
00659 uint16_t * key;
00660 hash_bucket_t * bucket;
00661 session_entry_t * session;
00662
00663
00664
00665 key = (uint16_t *)&ip_pkt->ip_src;
00666 hash = 0x5F5F;
00667 hash = ((hash << 5) + hash) ^ key[0];
00668 hash = ((hash << 5) + hash) ^ key[1];
00669 hash = ((hash << 5) + hash) ^ key[2];
00670 hash = ((hash << 5) + hash) ^ key[3];
00671
00672 hash &= HASH_MASK;
00673
00674
00675
00676 bucket = &sessions_table->hash_bucket[hash];
00677
00678
00679 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00680
00681 session = TAILQ_FIRST(&bucket->bucket_entries);
00682
00683 while(session != NULL) {
00684 if(session->saddr == ip_pkt->ip_src.s_addr
00685 && session->daddr == ip_pkt->ip_dst.s_addr
00686 && session->ss_id == ss_info->info.intf_type.svc_set_id
00687 && session->frag_group == ip_pkt->ip_id) {
00688
00689 break;
00690 }
00691 session = TAILQ_NEXT(session, entries);
00692 }
00693
00694
00695 if(session == NULL) {
00696
00697 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00698
00699 LOG(LOG_WARNING, "%s: Received a packet from %s. It is an IP "
00700 "fragment, but we have not yet received the first "
00701 "fragment, so we cannot tell if it belongs to an "
00702 "equilibrium application",
00703 __func__, inet_ntoa(ip_pkt->ip_src));
00704
00705 return SUCCESS;
00706 }
00707
00708
00709
00710
00711
00712 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00713
00714
00715 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00716
00717 if(session->faddr == (in_addr_t)-1) {
00718
00719
00720
00721 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00722
00723 LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00724 "servers up.", __func__);
00725
00726
00727 return EFAIL;
00728 }
00729
00730 session->age_ts = get_current_time();
00731
00732 if(session->faddr != 0) {
00733 if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00734
00735
00736
00737
00738 checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00739 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00740 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00741
00742
00743 ip_pkt->ip_src.s_addr = session->faddr;
00744 } else {
00745
00746
00747
00748
00749 checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00750 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00751 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00752
00753
00754 ip_pkt->ip_dst.s_addr = session->faddr;
00755 }
00756 }
00757
00758
00759 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00760
00761 return SUCCESS;
00762 }
00763
00764
00778 static int
00779 send_packet(struct jbuf * pkt_buf,
00780 const msp_data_handle_t const * handle)
00781 {
00782
00783
00784 int rc = MSP_DATA_SEND_RETRY;
00785 int retries = 0;
00786
00787 while(rc == MSP_DATA_SEND_RETRY && ++retries <= MAX_MSP_SEND_RETRIES) {
00788 rc = msp_data_send(*handle, pkt_buf, MSP_MSG_TYPE_PACKET);
00789 }
00790
00791 if(rc == MSP_DATA_SEND_FAIL) {
00792
00793 LOG(LOG_ERR, "%s: Failed to forward packet using msp_data_send().",
00794 __func__);
00795
00796 } else if(rc == MSP_DATA_SEND_RETRY) {
00797
00798 LOG(LOG_EMERG, "%s: PANIC: Failed to send a jbuf after %d retries "
00799 "with msp_data_send().", __func__, MAX_MSP_SEND_RETRIES);
00800
00801 } else if(rc != MSP_OK) {
00802
00803 LOG(LOG_ERR, "%s: Failed to forward packet and got unknown return "
00804 "code from msp_data_send().", __func__);
00805 jbuf_free(pkt_buf);
00806 }
00807 return rc;
00808 }
00809
00810
00818 static void *
00819 equilibrium_process_packet(msp_dataloop_args_t * params)
00820 {
00821 struct jbuf * pkt_buf;
00822 jbuf_svc_set_info_t ss_info;
00823 struct ip * ip_pkt;
00824 int type;
00825 uint16_t ip_frag_offset;
00826 uint8_t ip_options_bytes = 0;
00827 int cpu;
00828
00829 atomic_add_uint(1, &loops_running);
00830
00831 cpu = msp_data_get_cpu_num(params->dhandle);
00832 INSIST_ERR(cpu != MSP_NEXT_NONE);
00833
00834
00835 while(!do_shutdown) {
00836
00837
00838 pkt_buf = msp_data_recv(params->dhandle, &type);
00839
00840 if(pkt_buf == NULL) {
00841 continue;
00842 }
00843
00844 if(type != MSP_MSG_TYPE_PACKET) {
00845 LOG(LOG_WARNING, "%s: Message wasn't a packet...dropping",
00846 __func__);
00847 jbuf_free(pkt_buf);
00848 continue;
00849 }
00850
00851 if(pullup_bytes(&pkt_buf, IP_NEEDED_BYTES)) {
00852
00853 LOG(LOG_ERR, "%s: Dropped a packet because there's not enough "
00854 "bytes to form an IP header.", __func__);
00855
00856 jbuf_free(pkt_buf);
00857 continue;
00858 }
00859
00860
00861 ip_pkt = jbuf_to_d(pkt_buf, struct ip *);
00862
00863 if(!ip_pkt || ip_pkt->ip_p != IPPROTO_TCP) {
00864 send_packet(pkt_buf, ¶ms->dhandle);
00865 continue;
00866 }
00867
00868 jbuf_get_svc_set_info(pkt_buf, &ss_info);
00869 INSIST_ERR(ss_info.svc_type == JBUF_SVC_TYPE_INTERFACE);
00870
00871 ip_frag_offset = ntohs(ip_pkt->ip_off);
00872 ip_options_bytes = (ip_pkt->ip_hl * 4) - sizeof(struct ip);
00873
00874 if((ip_frag_offset & IP_OFFMASK)) {
00875
00876
00877 if(process_fragment(ip_pkt, &ss_info)) {
00878
00879 LOG(LOG_NOTICE, "%s: Dropping a packet who's processing failed",
00880 __func__);
00881
00882 jbuf_free(pkt_buf);
00883 continue;
00884 }
00885 } else if(!pullup_bytes(&pkt_buf, TCP_NEEDED_BYTES+ ip_options_bytes)) {
00886
00887
00888 if(process_packet(ip_pkt, &ss_info, cpu)) {
00889
00890 LOG(LOG_NOTICE, "%s: Dropping a packet who's processing failed",
00891 __func__);
00892
00893 jbuf_free(pkt_buf);
00894
00895 continue;
00896 }
00897 } else {
00898 LOG(LOG_NOTICE, "%s: Did not process a packet from %s. There's not "
00899 "enough bytes to form the TCP header.",
00900 __func__, inet_ntoa(ip_pkt->ip_src));
00901 }
00902
00903 send_packet(pkt_buf, ¶ms->dhandle);
00904 }
00905
00906 atomic_sub_uint(1, &loops_running);
00907
00908 return NULL;
00909 }
00910
00911
00912
00913
00914
00924 status_t
00925 init_packet_loops(evContext ctx)
00926 {
00927 int i, rc, core;
00928 msp_dataloop_params_t params;
00929 msp_dataloop_result_t result;
00930 msp_shm_params_t shmp;
00931 msp_objcache_params_t ocp;
00932
00933 shm_handle = table_handle = entry_handle = sessions_table = NULL;
00934 evInitID(&aging_timer);
00935
00936 LOG(LOG_INFO, "%s: Initializing object cache for data loops", __func__);
00937
00938 bzero(&shmp, sizeof(shmp));
00939 bzero(&ocp, sizeof(ocp));
00940
00941
00942
00943 strncpy(shmp.shm_name, EQ_SHARED_MEM_NAME, SHM_NAME_LEN);
00944
00945 if(msp_shm_allocator_init(&shmp) != MSP_OK) {
00946 LOG(LOG_ERR, "%s: Shared memory allocator initialization failed",
00947 __func__);
00948 return EFAIL;
00949 }
00950
00951 shm_handle = shmp.shm;
00952
00953
00954 ocp.oc_shm = shm_handle;
00955 ocp.oc_size = sizeof(hashtable_t);
00956 strncpy(ocp.oc_name, EQ_SESSION_TABLE_NAME, OC_NAME_LEN);
00957
00958 if(msp_objcache_create(&ocp) != MSP_OK) {
00959 LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (table)",
00960 __func__);
00961 return EFAIL;
00962 }
00963
00964 table_handle = ocp.oc;
00965
00966
00967 ocp.oc_shm = shmp.shm;
00968 ocp.oc_size = sizeof(session_entry_t);
00969 strncpy(ocp.oc_name, EQ_SESSION_TABLE_NAME, OC_NAME_LEN);
00970
00971 if (msp_objcache_create(&ocp) != MSP_OK) {
00972 LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (entry)",
00973 __func__);
00974 return EFAIL;
00975 }
00976
00977 entry_handle = ocp.oc;
00978
00979
00980
00981 sessions_table = msp_objcache_alloc(table_handle,
00982 msp_get_current_cpu(), obj_cache_id);
00983 if(sessions_table == NULL) {
00984 LOG(LOG_ERR, "%s: Failed to allocate object cache for sessions table ",
00985 __func__);
00986 return EFAIL;
00987 }
00988
00989 for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
00990 msp_spinlock_init(&sessions_table->hash_bucket[i].bucket_lock);
00991 TAILQ_INIT(&sessions_table->hash_bucket[i].bucket_entries);
00992 }
00993
00994
00995
00996 if(evSetTimer(ctx, aging_cleanup, NULL,
00997 evAddTime(evNowTime(), evConsTime(SESSION_AGE_CHECK_INTERVAL, 0)),
00998 evConsTime(SESSION_AGE_CHECK_INTERVAL, 0),
00999 &aging_timer)) {
01000
01001 LOG(LOG_EMERG, "%s: Failed to initialize a timer to periodically "
01002 "check age of session entries (Error: %m)", __func__);
01003 return EFAIL;
01004 }
01005
01006 LOG(LOG_INFO, "%s: Starting packet loops", __func__);
01007
01008 bzero(¶ms, sizeof(msp_dataloop_params_t));
01009 bzero(&result, sizeof(msp_dataloop_result_t));
01010
01011 loops_running = 0;
01012 do_shutdown = 0;
01013
01014
01015 core = msp_env_get_next_data_core(MSP_NEXT_NONE);
01016 while(core != MSP_NEXT_END) {
01017
01018
01019 rc = msp_env_get_next_data_cpu_in_core(core, MSP_NEXT_NONE);
01020
01021 if(rc != MSP_NEXT_END) {
01022 LOG(LOG_INFO, "%s: Creating a data loop on CPU %d (in core %d)",
01023 __func__, rc, core);
01024
01025 rc = msp_data_create_loop_on_cpu(rc, equilibrium_process_packet,
01026 ¶ms, &result);
01027
01028 if(rc != MSP_OK) {
01029 LOG(LOG_ERR, "%s: Failed to create a data loop (Error: %d)",
01030 __func__, rc);
01031 return EFAIL;
01032 }
01033
01034 } else {
01035 LOG(LOG_ERR, "%s: Found no available data CPUs in data core %d",
01036 __func__, rc);
01037 return EFAIL;
01038 }
01039
01040 core = msp_env_get_next_data_core(core);
01041 }
01042 return SUCCESS;
01043 }
01044
01045
01052 void
01053 destroy_packet_loops(evContext ctx)
01054 {
01055 if(evTestID(aging_timer)) {
01056 evClearTimer(ctx, aging_timer);
01057 evInitID(&aging_timer);
01058 }
01059
01060 do_shutdown = 1;
01061
01062 while(loops_running > 0) ;
01063
01064
01065
01066 if(sessions_table) {
01067 msp_objcache_free(
01068 table_handle, sessions_table, msp_get_current_cpu(), obj_cache_id);
01069 sessions_table = NULL;
01070 }
01071
01072 if(table_handle) {
01073 msp_objcache_destroy(table_handle);
01074 table_handle = NULL;
01075 }
01076
01077 if(entry_handle) {
01078 msp_objcache_destroy(entry_handle);
01079 entry_handle = NULL;
01080 }
01081 }
01082
01083
01099 void
01100 clean_sessions_using_server(uint16_t ss_id,
01101 in_addr_t app_addr,
01102 uint16_t app_port,
01103 in_addr_t server_addr)
01104 {
01105 uint32_t i, cpu;
01106 hash_bucket_t * bucket;
01107 session_entry_t * session, * next;
01108
01109 cpu = msp_get_current_cpu();
01110
01111 for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01112
01113 bucket = &sessions_table->hash_bucket[i];
01114
01115
01116 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01117
01118 session = TAILQ_FIRST(&bucket->bucket_entries);
01119
01120 while(session != NULL) {
01121
01122
01123 next = TAILQ_NEXT(session, entries);
01124
01125
01126 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01127
01128 if(session->ss_id == ss_id &&
01129 ((session->dir == JBUF_PACKET_DIR_INGRESS &&
01130 session->faddr == app_addr && session->sport == app_port &&
01131 session->saddr == server_addr) ||
01132 (session->dir == JBUF_PACKET_DIR_EGRESS &&
01133 session->faddr == server_addr && session->dport == app_port &&
01134 session->daddr == app_addr))) {
01135
01136
01137
01138 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01139 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
01140 } else {
01141
01142 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01143 }
01144 session = next;
01145 }
01146
01147
01148 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01149 }
01150
01151 msp_objcache_reclaim(shm_handle);
01152 }
01153
01154
01167 void
01168 clean_sessions_with_app(uint16_t ss_id,
01169 in_addr_t app_addr,
01170 uint16_t app_port)
01171 {
01172 uint32_t i, cpu;
01173 hash_bucket_t * bucket;
01174 session_entry_t * session, * next;
01175
01176 cpu = msp_get_current_cpu();
01177
01178 for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01179
01180 bucket = &sessions_table->hash_bucket[i];
01181
01182
01183 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01184
01185 session = TAILQ_FIRST(&bucket->bucket_entries);
01186
01187 while(session != NULL) {
01188
01189
01190 next = TAILQ_NEXT(session, entries);
01191
01192
01193 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01194
01195 if(session->ss_id == ss_id &&
01196 ((session->dir == JBUF_PACKET_DIR_INGRESS &&
01197 session->faddr == app_addr && session->sport == app_port) ||
01198 (session->dir == JBUF_PACKET_DIR_EGRESS &&
01199 session->daddr == app_addr && session->dport == app_port))) {
01200
01201
01202
01203 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01204 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
01205 } else {
01206
01207 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01208 }
01209 session = next;
01210 }
01211
01212
01213 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01214 }
01215
01216 msp_objcache_reclaim(shm_handle);
01217 }
01218
01219
01226 void
01227 clean_sessions_with_service_set(uint16_t ss_id)
01228 {
01229 uint32_t i, cpu;
01230 hash_bucket_t * bucket;
01231 session_entry_t * session, * next;
01232
01233 cpu = msp_get_current_cpu();
01234
01235 for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01236
01237 bucket = &sessions_table->hash_bucket[i];
01238
01239
01240 INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01241
01242 session = TAILQ_FIRST(&bucket->bucket_entries);
01243
01244 while(session != NULL) {
01245
01246
01247 next = TAILQ_NEXT(session, entries);
01248
01249
01250 INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01251
01252 if(session->ss_id == ss_id) {
01253
01254
01255
01256 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01257 msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
01258 } else {
01259
01260 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01261 }
01262 session = next;
01263 }
01264
01265
01266 INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01267 }
01268
01269 msp_objcache_reclaim(shm_handle);
01270 }