equilibrium-data_packet.c

Go to the documentation of this file.
00001 /*
00002  * $Id: equilibrium-data_packet.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  * 
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
#include "equilibrium-data_main.h"
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/jnx/jbuf.h>
#include "equilibrium-data_config.h"
#include "equilibrium-data_monitor.h"
#include "equilibrium-data_packet.h"
00031 
00032 
/*** Constants ***/

/** Name of the shared-memory arena holding the session table and entries */
#define EQ_SHARED_MEM_NAME    "equilibrium-data arena"

/** Object-cache name for the session table */
#define EQ_SESSION_TABLE_NAME "equilibrium session table"

/** Object-cache name for session entries */
#define EQ_SESSION_ENTRY_NAME "equilibrium session entry"

/**
 * Interval of the session aging timer (used by the init code; presumably
 * seconds -- TODO confirm against the timer setup)
 */
#define SESSION_AGE_CHECK_INTERVAL 15

/** Maximum number of msp_data_send() attempts per packet */
#define MAX_MSP_SEND_RETRIES 100


/** Bytes that must be contiguous to read the fixed IP header */
#define IP_NEEDED_BYTES (sizeof(struct ip))

/**
 * Bytes that must be contiguous to read the fixed IP and TCP headers
 * (callers add the IP options length separately)
 */
#define TCP_NEEDED_BYTES (sizeof(struct ip) + sizeof(struct tcphdr))

/** Number of buckets in the session hash table; must be a power of two */
#define SESSIONS_BUCKET_COUNT (512 * 1024)

#if (SESSIONS_BUCKET_COUNT & (SESSIONS_BUCKET_COUNT - 1)) != 0
#error "SESSIONS_BUCKET_COUNT must be a power of two (HASH_MASK relies on it)"
#endif

/**
 * Mask reducing a hash value to a bucket index. Internal linkage: this is
 * private to this file and must not clash with other components.
 */
static const uint32_t HASH_MASK = SESSIONS_BUCKET_COUNT - 1;
00064 
00065 
00066 /*** Data Structures ***/
00067 
00071 typedef struct session_entry_s {
00072     struct session_entry_s     * reverse; 
00073     in_addr_t                    saddr;   
00074     in_addr_t                    daddr;   
00075     uint16_t                     sport;   
00076     uint16_t                     dport;   
00077     uint8_t                      cpu;     
00078     uint8_t                      dir;     
00079     in_addr_t                    faddr;   
00080     time_t                       age_ts;  
00081     uint16_t                     ss_id;   
00082     uint16_t                     frag_group; 
00083     msp_spinlock_t               lock;    
00084     TAILQ_ENTRY(session_entry_s) entries; 
00085 } session_entry_t;
00086 
00087 
00091 typedef TAILQ_HEAD(ht_bucket_list_s, session_entry_s) ht_bucket_list_t;
00092 
00093 
00097 typedef struct hash_bucket_s {
00098     msp_spinlock_t        bucket_lock;    
00099     ht_bucket_list_t      bucket_entries; 
00100 } hash_bucket_t;
00101 
00102 
00106 typedef struct hashtable_s {
00107     hash_bucket_t hash_bucket[SESSIONS_BUCKET_COUNT]; 
00108 } hashtable_t;
00109 
00110 
00111 static evTimerID aging_timer;        
00112 static msp_shm_handle_t shm_handle;  
00113 static msp_oc_handle_t table_handle; 
00114 static msp_oc_handle_t entry_handle; 
00115 static hashtable_t * sessions_table; 
00116 static atomic_uint_t loops_running;  
00117 static volatile uint8_t do_shutdown; 
00118 static uint32_t obj_cache_id;        
00119 
00120 /*** STATIC/INTERNAL Functions ***/
00121 
00122 
00139 static void
00140 aging_cleanup(evContext ctx __unused,
00141               void * uap __unused,
00142               struct timespec due __unused,
00143               struct timespec inter __unused)
00144 {
00145     const time_t down_server_flow_duration = 60;
00146     const time_t non_app_flow_duration = 300;
00147     
00148     uint32_t i, cpu;
00149     hash_bucket_t * bucket;
00150     session_entry_t * session, * next;
00151     time_t current_time, down_server_flow_timeout, non_app_flow_timeout;
00152     
00153     cpu = msp_get_current_cpu();
00154     
00155     current_time = get_current_time();
00156     down_server_flow_timeout = current_time - down_server_flow_duration;
00157     non_app_flow_timeout = current_time - non_app_flow_duration;
00158     
00159     for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
00160         
00161         bucket = &sessions_table->hash_bucket[i];
00162         
00163         // Get the bucket lock
00164         INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00165       
00166         session = TAILQ_FIRST(&bucket->bucket_entries);
00167         
00168         while(session != NULL) {
00169 
00170             // keep next to safely remove from list
00171             next = TAILQ_NEXT(session, entries);
00172             
00173             // Get the session lock
00174             INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00175             
00176             // check for timeout/expiry
00177             
00178             if(session->faddr == 0) {
00179                 // an entry for a non-application
00180                 if(session->age_ts < non_app_flow_timeout) {
00181                     
00182                     TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00183                     msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00184                     session = next;
00185                     continue;
00186                 }
00187             } else if(session->faddr == (in_addr_t)-1) {
00188                 // entry for a flow matching an application w/ no servers up
00189                 if(session->age_ts < down_server_flow_timeout) {
00190                     
00191                     TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00192                     msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00193                     session = next;
00194                     continue;
00195                 }
00196             } else {
00197                 // it is a session entry for an application
00198                 
00199                 if(session->dir == JBUF_PACKET_DIR_INGRESS) {
00200                     if(session->age_ts +
00201                         get_app_session_timeout(session->ss_id,
00202                             session->faddr, session->sport) < current_time) {
00203 
00204                         TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00205                         msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00206                         session = next;
00207                         continue;
00208                     }
00209                 } else {
00210                     if(session->age_ts +
00211                        get_app_session_timeout(session->ss_id,
00212                             session->daddr, session->dport) < current_time) {
00213                         
00214                         // since it is egress
00215                         monitor_remove_session_for_server(session->ss_id,
00216                                 session->daddr, session->dport, session->faddr);
00217                         
00218                         TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
00219                         msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00220                         session = next;
00221                         continue;
00222                     }
00223                 }
00224             }
00225             
00226             // Release the session lock
00227             INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00228             session = next;
00229         }
00230         
00231         // Release the bucket lock
00232         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00233     }
00234     
00235     monitor_send_stats(); // (update to mgmt component)
00236     
00237     msp_objcache_reclaim(shm_handle);
00238 }
00239 
00240 
/**
 * Incrementally update a 16-bit one's complement checksum (IP/TCP style)
 * after some of the data it covers was replaced, without re-summing the
 * whole packet (the technique of RFC 3022 section 4.2 / RFC 1624).
 *
 * Both @a optr and @a nptr must start at an even byte offset within the
 * checksummed data. An odd trailing byte is the high byte of a 16-bit word
 * whose low byte is unchanged, so it is counted as that byte padded with a
 * zero low byte. Non-positive lengths contribute nothing.
 *
 * @param[in,out] chksum
 *      The 2-byte checksum field in network (big-endian) order; updated in place
 * @param[in] optr
 *      The old data that the checksum covered
 * @param[in] olen
 *      Length of the old data in bytes
 * @param[in] nptr
 *      The new data replacing it
 * @param[in] nlen
 *      Length of the new data in bytes
 */
static void
checksum_adjust(
    unsigned char * chksum,
    const unsigned char * optr,
    int olen,
    const unsigned char * nptr,
    int nlen)
{
    long x, word;

    // undo the final inversion to recover the running one's complement sum
    x = (chksum[0] << 8) | chksum[1];
    x = ~x & 0xFFFF;

    // remove each old word; on borrow wrap around (end-around borrow).
    // Adding 0xFFFF is equivalent to the RFC's "x--; x &= 0xffff".
    while (olen > 0) {
        word = optr[0] << 8;
        if (olen >= 2) {
            word |= optr[1];
            optr += 2;          // only advance over bytes that exist
        }
        x -= word;
        if (x <= 0) {
            x += 0xFFFF;
        }
        olen -= 2;
    }

    // add each new word; fold any carry out of bit 15 back in
    while (nlen > 0) {
        word = nptr[0] << 8;
        if (nlen >= 2) {
            word |= nptr[1];
            nptr += 2;
        }
        x += word;
        if (x > 0xFFFF) {
            x -= 0xFFFF;
        }
        nlen -= 2;
    }

    // re-invert and store in network byte order
    x = ~x & 0xFFFF;
    chksum[0] = (unsigned char)(x >> 8);
    chksum[1] = (unsigned char)(x & 0xFF);
}
00291 
00292 
00307 static status_t
00308 pullup_bytes(struct jbuf ** pkt_buf, uint16_t num_bytes)
00309 {
00310     struct jbuf * tmp_buf;
00311     
00312     if(jbuf_particle_get_data_length(*pkt_buf) < num_bytes) {
00313         tmp_buf = jbuf_pullup((*pkt_buf), num_bytes);
00314         
00315         if(!tmp_buf) { // check it didn't fail 
00316             return EFAIL;
00317         }
00318         
00319         *pkt_buf = tmp_buf;
00320     }
00321     return SUCCESS;
00322 }
00323 
00324 
00344 static status_t
00345 process_packet(struct ip * ip_pkt,
00346                jbuf_svc_set_info_t * ss_info,
00347                int cpu)
00348 {
00349     uint32_t hash;
00350     uint16_t * key;
00351     int len;
00352     in_addr_t facade, real;
00353     hash_bucket_t * bucket;
00354     session_entry_t * session;
00355     struct tcphdr * tcp_hdr = 
00356         (struct tcphdr *)((uint32_t *)ip_pkt + ip_pkt->ip_hl);
00357     
00358     len = ip_pkt->ip_len - (ip_pkt->ip_hl * 4); // TCP hdr + TCP segment length
00359     
00360     // subtract length of TCP header to get payload length
00361     len -= tcp_hdr->th_off * 4;
00362     
00363     if(tcp_hdr->th_off < 5 || len < 0) { // check for malformed TCP header
00364         LOG(LOG_ERR, "%s: Found a malformed TCP header in a packet.",
00365                 __func__);
00366         return EFAIL;
00367     }
00368     
00369     // get hash of the two IP addresses
00370     
00371     key = (uint16_t *)&ip_pkt->ip_src;
00372     hash = 0x5F5F;
00373     hash = ((hash << 5) + hash) ^ key[0];
00374     hash = ((hash << 5) + hash) ^ key[1];
00375     hash = ((hash << 5) + hash) ^ key[2];
00376     hash = ((hash << 5) + hash) ^ key[3];
00377     
00378     hash &= HASH_MASK;
00379     
00380     // use hash to lookup a hash bucket and find the matching entry
00381     
00382     bucket = &sessions_table->hash_bucket[hash]; // get bucket
00383     
00384     // Get the bucket lock
00385     INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00386   
00387     session = TAILQ_FIRST(&bucket->bucket_entries);
00388     
00389     while(session != NULL) {
00390         if(session->saddr == ip_pkt->ip_src.s_addr
00391                 && session->daddr == ip_pkt->ip_dst.s_addr
00392                 && session->ss_id == ss_info->info.intf_type.svc_set_id
00393                 && session->dport == tcp_hdr->th_dport
00394                 && session->sport == tcp_hdr->th_sport) {
00395 
00396             break; // match
00397         }
00398         session = TAILQ_NEXT(session, entries);
00399     }
00400     
00401     // if there's no matching session, then create one... the slow path
00402     if(session == NULL) {
00403         if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00404             // not initiated by a client,
00405             // so definietly can't belong to an application
00406             INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00407             return SUCCESS;
00408         }
00409         // SLOW PATH FOR EGRESS TRAFFIC:
00410         
00411         session = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
00412         if(session == NULL) {
00413             // Release the bucket lock
00414             INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00415             
00416             LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
00417                     "session entry", __func__);
00418             return EFAIL;
00419         }
00420         session->reverse = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
00421         if(session->reverse == NULL) {
00422             
00423             msp_objcache_free(entry_handle, session, cpu, obj_cache_id);
00424             
00425             // Release the bucket lock
00426             INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00427             
00428             LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
00429                     "session reverse entry", __func__);
00430             return EFAIL;
00431         }
00432         
00433         // init and grab lock
00434         msp_spinlock_init(&session->lock);
00435         INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00436         
00437         TAILQ_INSERT_HEAD(&bucket->bucket_entries, session, entries);
00438         
00439         session->age_ts = get_current_time();
00440         session->saddr = ip_pkt->ip_src.s_addr;
00441         session->daddr = ip_pkt->ip_dst.s_addr;
00442         session->sport = tcp_hdr->th_sport;
00443         session->dport = tcp_hdr->th_dport;
00444         session->ss_id = ss_info->info.intf_type.svc_set_id;
00445         session->dir = JBUF_PACKET_DIR_EGRESS;
00446         session->cpu = cpu;
00447         
00448 
00449         // Release the bucket lock
00450         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00451         
00452         // init reverse
00453         
00454         // init and grab lock
00455         msp_spinlock_init(&session->reverse->lock);
00456         INSIST_ERR(msp_spinlock_lock(&session->reverse->lock) == MSP_OK);
00457         
00458         session->reverse->age_ts = session->age_ts;
00459         session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00460         session->reverse->daddr = ip_pkt->ip_src.s_addr;
00461         session->reverse->sport = tcp_hdr->th_dport;
00462         session->reverse->dport = tcp_hdr->th_sport;
00463         session->reverse->ss_id = ss_info->info.intf_type.svc_set_id;
00464         session->reverse->reverse = session;
00465         session->reverse->dir = JBUF_PACKET_DIR_INGRESS;
00466         session->reverse->cpu = cpu;
00467         
00468         // find out if this flow will match an application
00469         // ip_dst must match that of an application in this service set
00470         
00471         session->faddr = monitor_get_server_for(
00472                 session->ss_id, session->daddr, session->dport);
00473         
00474         if(session->faddr == (in_addr_t)-1) {
00475             // indicate it is for an app, but no servers are up
00476             session->reverse->faddr = (in_addr_t)-1;
00477             session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00478         } else if(session->faddr == 0) {
00479             // indicate not for an app
00480             session->reverse->faddr = 0;
00481             session->reverse->saddr = ip_pkt->ip_dst.s_addr;
00482         } else {
00483             // session->faddr is actually the real server address now
00484             
00485             // for reverse set faddr to the facade
00486             session->reverse->faddr = ip_pkt->ip_dst.s_addr;
00487             
00488             // ingress src will be real server
00489             session->reverse->saddr = session->faddr;
00490         }
00491         
00492         session->reverse->frag_group = 0;
00493         
00494         // If it is the first fragment note the frag ID
00495         if((ntohs(ip_pkt->ip_off) & IP_MF)) {
00496             session->frag_group = ip_pkt->ip_id;
00497         } else {
00498             session->frag_group = 0;
00499         }
00500         
00501         // get hash of the reverse flow to find bucket for reverse entry
00502 
00503         hash = 0x5F5F;
00504         key = (uint16_t *)&session->reverse->saddr;
00505         hash = ((hash << 5) + hash) ^ key[0];
00506         hash = ((hash << 5) + hash) ^ key[1];
00507         hash = ((hash << 5) + hash) ^ key[2];
00508         hash = ((hash << 5) + hash) ^ key[3];        
00509         hash &= HASH_MASK;
00510         
00511         // use hash to lookup a hash bucket and find the matching entry
00512         bucket = &sessions_table->hash_bucket[hash]; // get bucket
00513         
00514         // Get the bucket lock
00515         INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00516         
00517         TAILQ_INSERT_HEAD(&bucket->bucket_entries, session->reverse, entries);
00518         
00519         // Release the bucket lock
00520         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);        
00521         
00522         if(session->faddr == 0) {
00523             // Release the session lock
00524             INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00525             INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00526             return SUCCESS; // not an application, so nothing to change
00527         }
00528         if(session->faddr == (in_addr_t)-1) {
00529             // Release the session lock
00530             INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00531             INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00532             
00533             LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00534                     "servers up.", __func__);
00535             return EFAIL; // indicate to drop, when no servers are up
00536         }
00537         
00538         // going to a real server for an application, so
00539         // replace the facade's address with the real server's address
00540         
00541         // adjust IP checksum taking IP addresses into account
00542         checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00543             (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00544             (unsigned char *)&session->faddr, sizeof(in_addr_t));
00545         
00546         // adjust TCP checksum taking IP addresses into account
00547         checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00548             (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00549             (unsigned char *)&session->faddr, sizeof(in_addr_t));
00550         
00551         // change address
00552         facade = ip_pkt->ip_dst.s_addr;
00553         real = ip_pkt->ip_dst.s_addr = session->faddr;
00554         
00555         // Release the session lock (don't need for filter)
00556         INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00557         INSIST_ERR(msp_spinlock_unlock(&session->reverse->lock) == MSP_OK);
00558         
00559         return SUCCESS;
00560     }
00561     
00562     // else there's a matching session, so use it to forward the traffic
00563     // FAST PATH:
00564 
00565     // Get the session lock
00566     INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00567     
00568     // Release the bucket lock
00569     INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00570     
00571     // If it is the first fragment note the frag ID
00572     if((ntohs(ip_pkt->ip_off) & IP_MF)) {
00573         session->frag_group = ip_pkt->ip_id;
00574     } else {
00575         session->frag_group = 0;
00576     }
00577     
00578     if(session->faddr == (in_addr_t)-1) {
00579         // all servers down for the application for this flow
00580         
00581         // Release the session lock
00582         INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00583         
00584         LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00585                 "servers up.", __func__);
00586         
00587         // don't refresh timestamp, and don't send out traffic (drop)
00588         return EFAIL;
00589     }
00590     
00591     session->age_ts = get_current_time();
00592     
00593     if(session->faddr != 0) { // it is for an application
00594         if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00595             // coming from a server, so replace the real server's address (src) 
00596             // with the facade's address  (stored in faddr for INGRESS)
00597             
00598             // adjust IP checksum taking IP addresses into account
00599             checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00600                 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00601                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00602             
00603             // adjust TCP checksum taking IP addresses into account
00604             checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00605                 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00606                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00607             
00608             // change address
00609             ip_pkt->ip_src.s_addr = session->faddr;
00610         } else {
00611             // going to a server, so replace the facade's address (dst)
00612             // with the real server's address (stored in faddr for EGRESS)
00613             
00614             // adjust IP checksum taking IP addresses into account
00615             checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00616                 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00617                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00618             
00619             // adjust TCP checksum taking IP addresses into account
00620             checksum_adjust((unsigned char *)&tcp_hdr->th_sum,
00621                 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00622                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00623             
00624             // change address
00625             facade = ip_pkt->ip_dst.s_addr;
00626             real = ip_pkt->ip_dst.s_addr = session->faddr;
00627             
00628             // Release the session lock (faster...don't need for filter)
00629             INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00630             
00631             return SUCCESS;
00632         }
00633     }
00634     
00635     // Release the session lock
00636     INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00637     
00638     return SUCCESS;
00639 }
00640 
00641 
00655 static status_t
00656 process_fragment(struct ip * ip_pkt, jbuf_svc_set_info_t * ss_info)
00657 {
00658     uint32_t hash;
00659     uint16_t * key;
00660     hash_bucket_t * bucket;
00661     session_entry_t * session;
00662     
00663     // get hash of the two IP addresses
00664     
00665     key = (uint16_t *)&ip_pkt->ip_src;
00666     hash = 0x5F5F;
00667     hash = ((hash << 5) + hash) ^ key[0];
00668     hash = ((hash << 5) + hash) ^ key[1];
00669     hash = ((hash << 5) + hash) ^ key[2];
00670     hash = ((hash << 5) + hash) ^ key[3];
00671     
00672     hash &= HASH_MASK;    
00673     
00674     // use hash to lookup a hash bucket and find the matching entry
00675     
00676     bucket = &sessions_table->hash_bucket[hash]; // get bucket
00677     
00678     // Get the bucket lock
00679     INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
00680   
00681     session = TAILQ_FIRST(&bucket->bucket_entries);
00682     
00683     while(session != NULL) {
00684         if(session->saddr == ip_pkt->ip_src.s_addr 
00685                 && session->daddr == ip_pkt->ip_dst.s_addr
00686                 && session->ss_id == ss_info->info.intf_type.svc_set_id
00687                 && session->frag_group == ip_pkt->ip_id) {
00688 
00689             break; // match
00690         }
00691         session = TAILQ_NEXT(session, entries);
00692     }
00693     
00694     // if there's no matching session, so we haven't seen the first fragment yet
00695     if(session == NULL) {
00696         // Release the bucket lock
00697         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00698         
00699         LOG(LOG_WARNING, "%s: Received a packet from %s. It is an IP "
00700                 "fragment, but we have not yet received the first "
00701                 "fragment, so we cannot tell if it belongs to an "
00702                 "equilibrium application",
00703             __func__, inet_ntoa(ip_pkt->ip_src));
00704         
00705         return SUCCESS; // don't know anything about this flow
00706     }
00707     
00708     // else there's a matching session, so use it to forward the traffic
00709     // FAST PATH:
00710 
00711     // Get the session lock
00712     INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
00713     
00714     // Release the bucket lock
00715     INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
00716     
00717     if(session->faddr == (in_addr_t)-1) {
00718         // all servers down for the application for this flow
00719         
00720         // Release the session lock
00721         INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00722         
00723         LOG(LOG_ERR, "%s: Dropping packet found to an application with no "
00724                 "servers up.", __func__);
00725         
00726         // don't refresh timestamp, and don't send out traffic (drop)
00727         return EFAIL;
00728     }
00729     
00730     session->age_ts = get_current_time();
00731     
00732     if(session->faddr != 0) { // it is for an application
00733         if(ss_info->pkt_dir == JBUF_PACKET_DIR_INGRESS) {
00734             // coming from a server, so replace the real server's address (src) 
00735             // with the facade's address  (stored in faddr for INGRESS)
00736             
00737             // adjust IP checksum taking IP addresses into account
00738             checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00739                 (unsigned char *)&ip_pkt->ip_src, sizeof(in_addr_t),
00740                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00741             
00742             // change address
00743             ip_pkt->ip_src.s_addr = session->faddr;
00744         } else {
00745             // going to a server, so replace the facade's address (dst)
00746             // with the real server's address (stored in faddr for EGRESS)
00747             
00748             // adjust IP checksum taking IP addresses into account
00749             checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
00750                 (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
00751                 (unsigned char *)&session->faddr, sizeof(in_addr_t));
00752             
00753             // change address
00754             ip_pkt->ip_dst.s_addr = session->faddr;
00755         }
00756     }
00757     
00758     // Release the session lock
00759     INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
00760     
00761     return SUCCESS;
00762 }
00763 
00764 
00778 static int
00779 send_packet(struct jbuf * pkt_buf,
00780             const msp_data_handle_t const * handle)
00781 {
00782     // enqueue it back into the FIFO to go out
00783     
00784     int rc = MSP_DATA_SEND_RETRY;
00785     int retries = 0;
00786     
00787     while(rc == MSP_DATA_SEND_RETRY && ++retries <= MAX_MSP_SEND_RETRIES) {
00788         rc = msp_data_send(*handle, pkt_buf, MSP_MSG_TYPE_PACKET);
00789     }
00790     
00791     if(rc == MSP_DATA_SEND_FAIL) {
00792         
00793         LOG(LOG_ERR, "%s: Failed to forward packet using msp_data_send().",
00794             __func__);
00795         
00796     } else if(rc == MSP_DATA_SEND_RETRY) { // Panic / calls exit(1)
00797         
00798         LOG(LOG_EMERG, "%s: PANIC: Failed to send a jbuf after %d retries "
00799             "with msp_data_send().", __func__, MAX_MSP_SEND_RETRIES);
00800 
00801     } else if(rc != MSP_OK) {
00802         
00803         LOG(LOG_ERR, "%s: Failed to forward packet and got unknown return "
00804             "code from msp_data_send().", __func__);
00805         jbuf_free(pkt_buf);
00806     }
00807     return rc;
00808 }
00809 
00810 
00818 static void *
00819 equilibrium_process_packet(msp_dataloop_args_t * params)
00820 {
00821     struct jbuf * pkt_buf;
00822     jbuf_svc_set_info_t ss_info;
00823     struct ip * ip_pkt;
00824     int type;
00825     uint16_t ip_frag_offset;
00826     uint8_t ip_options_bytes = 0;
00827     int cpu;
00828     
00829     atomic_add_uint(1, &loops_running);
00830     
00831     cpu = msp_data_get_cpu_num(params->dhandle);
00832     INSIST_ERR(cpu != MSP_NEXT_NONE);
00833     
00834     // Start the packet loop...
00835     while(!do_shutdown) {
00836         
00837         // Dequeue a packet from the rx-fifo
00838         pkt_buf = msp_data_recv(params->dhandle, &type);
00839         
00840         if(pkt_buf == NULL) { // Didn't get anything
00841             continue;
00842         }
00843 
00844         if(type != MSP_MSG_TYPE_PACKET) { // Didn't get network traffic
00845             LOG(LOG_WARNING, "%s: Message wasn't a packet...dropping",
00846                 __func__);
00847             jbuf_free(pkt_buf);
00848             continue;
00849         }
00850 
00851         if(pullup_bytes(&pkt_buf, IP_NEEDED_BYTES)) {
00852         
00853             LOG(LOG_ERR, "%s: Dropped a packet because there's not enough "
00854                 "bytes to form an IP header.", __func__);
00855             
00856             jbuf_free(pkt_buf);
00857             continue;
00858         }
00859         
00860         // Get IP header
00861         ip_pkt = jbuf_to_d(pkt_buf, struct ip *);
00862         
00863         if(!ip_pkt || ip_pkt->ip_p != IPPROTO_TCP) {
00864             send_packet(pkt_buf, &params->dhandle);
00865             continue;
00866         }
00867         
00868         jbuf_get_svc_set_info(pkt_buf, &ss_info);
00869         INSIST_ERR(ss_info.svc_type == JBUF_SVC_TYPE_INTERFACE);
00870         
00871         ip_frag_offset = ntohs(ip_pkt->ip_off);
00872         ip_options_bytes = (ip_pkt->ip_hl * 4) - sizeof(struct ip);
00873         
00874         if((ip_frag_offset & IP_OFFMASK)) {
00875             
00876             // It's a fragment, but not the first fragment
00877             if(process_fragment(ip_pkt, &ss_info)) {
00878 
00879                 LOG(LOG_NOTICE, "%s: Dropping a packet who's processing failed",
00880                     __func__);
00881                 
00882                 jbuf_free(pkt_buf);
00883                 continue;
00884             }
00885         } else if(!pullup_bytes(&pkt_buf, TCP_NEEDED_BYTES+ ip_options_bytes)) {
00886             
00887             // It is TCP, and could be the first fragment or normal
00888             if(process_packet(ip_pkt, &ss_info, cpu)) {
00889                 
00890                 LOG(LOG_NOTICE, "%s: Dropping a packet who's processing failed",
00891                     __func__);
00892                 
00893                 jbuf_free(pkt_buf);
00894                 
00895                 continue;
00896             }
00897         } else {
00898             LOG(LOG_NOTICE, "%s: Did not process a packet from %s. There's not "
00899                 "enough bytes to form the TCP header.",
00900                 __func__, inet_ntoa(ip_pkt->ip_src));
00901         }
00902         
00903         send_packet(pkt_buf, &params->dhandle);
00904     }
00905     
00906     atomic_sub_uint(1, &loops_running);
00907     
00908     return NULL; // thread is done if it reaches this point
00909 }
00910 
00911 
00912 /*** GLOBAL/EXTERNAL Functions ***/
00913 
00914 
00924 status_t
00925 init_packet_loops(evContext ctx)
00926 {
00927     int i, rc, core;
00928     msp_dataloop_params_t params;
00929     msp_dataloop_result_t result;
00930     msp_shm_params_t shmp;
00931     msp_objcache_params_t ocp;
00932     
00933     shm_handle = table_handle = entry_handle = sessions_table = NULL;
00934     evInitID(&aging_timer);
00935     
00936     LOG(LOG_INFO, "%s: Initializing object cache for data loops", __func__);
00937     
00938     bzero(&shmp, sizeof(shmp));
00939     bzero(&ocp, sizeof(ocp));
00940 
00941     // allocate & initialize the shared memory
00942     
00943     strncpy(shmp.shm_name, EQ_SHARED_MEM_NAME, SHM_NAME_LEN);
00944     
00945     if(msp_shm_allocator_init(&shmp) != MSP_OK) {
00946         LOG(LOG_ERR, "%s: Shared memory allocator initialization failed",
00947                 __func__);
00948         return EFAIL;
00949     }
00950 
00951     shm_handle = shmp.shm; // get handle
00952     
00953     // create object cache allocator for the session/flow look up table
00954     ocp.oc_shm = shm_handle;
00955     ocp.oc_size  = sizeof(hashtable_t);
00956     strncpy(ocp.oc_name, EQ_SESSION_TABLE_NAME, OC_NAME_LEN);
00957 
00958     if(msp_objcache_create(&ocp) != MSP_OK) {
00959         LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (table)",
00960                 __func__);
00961         return EFAIL;
00962     }
00963 
00964     table_handle = ocp.oc; // get handle
00965 
00966     // create object cache allocator for the session/flow look up table entries
00967     ocp.oc_shm = shmp.shm;
00968     ocp.oc_size  = sizeof(session_entry_t);
00969     strncpy(ocp.oc_name, EQ_SESSION_TABLE_NAME, OC_NAME_LEN);
00970 
00971     if (msp_objcache_create(&ocp) != MSP_OK) {
00972         LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (entry)",
00973                 __func__);
00974         return EFAIL;
00975     }
00976 
00977     entry_handle = ocp.oc; // get handle
00978     
00979     // allocate sessions_table in OC:
00980     
00981     sessions_table = msp_objcache_alloc(table_handle,
00982             msp_get_current_cpu(), obj_cache_id);
00983     if(sessions_table == NULL) {
00984         LOG(LOG_ERR, "%s: Failed to allocate object cache for sessions table ",
00985                 __func__);
00986         return EFAIL;
00987     }
00988     
00989     for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
00990         msp_spinlock_init(&sessions_table->hash_bucket[i].bucket_lock);
00991         TAILQ_INIT(&sessions_table->hash_bucket[i].bucket_entries);
00992     }
00993     
00994     // start ager
00995     
00996     if(evSetTimer(ctx, aging_cleanup, NULL,
00997             evAddTime(evNowTime(), evConsTime(SESSION_AGE_CHECK_INTERVAL, 0)),
00998             evConsTime(SESSION_AGE_CHECK_INTERVAL, 0),
00999             &aging_timer)) {
01000 
01001         LOG(LOG_EMERG, "%s: Failed to initialize a timer to periodically "
01002             "check age of session entries (Error: %m)", __func__);
01003         return EFAIL;
01004     }
01005     
01006     LOG(LOG_INFO, "%s: Starting packet loops", __func__);
01007     
01008     bzero(&params, sizeof(msp_dataloop_params_t));
01009     bzero(&result, sizeof(msp_dataloop_result_t));
01010     
01011     loops_running = 0;
01012     do_shutdown = 0;
01013     
01014     // go through the available data cores
01015     core = msp_env_get_next_data_core(MSP_NEXT_NONE);
01016     while(core != MSP_NEXT_END) {
01017         
01018         // for each data core, create only one data cpu
01019         rc = msp_env_get_next_data_cpu_in_core(core, MSP_NEXT_NONE);
01020         
01021         if(rc != MSP_NEXT_END) {
01022             LOG(LOG_INFO, "%s: Creating a data loop on CPU %d (in core %d)",
01023                     __func__, rc, core);
01024             
01025             rc = msp_data_create_loop_on_cpu(rc, equilibrium_process_packet,
01026                     &params, &result);
01027             
01028             if(rc != MSP_OK) {
01029                 LOG(LOG_ERR, "%s: Failed to create a data loop (Error: %d)",
01030                         __func__, rc);
01031                 return EFAIL;
01032             }
01033             
01034         } else {
01035             LOG(LOG_ERR, "%s: Found no available data CPUs in data core %d",
01036                 __func__, rc);
01037             return EFAIL;
01038         }
01039                         
01040         core = msp_env_get_next_data_core(core);        
01041     }
01042     return SUCCESS;
01043 }
01044 
01045 
01052 void
01053 destroy_packet_loops(evContext ctx)
01054 {
01055     if(evTestID(aging_timer)) {
01056         evClearTimer(ctx, aging_timer);
01057         evInitID(&aging_timer);
01058     }
01059     
01060     do_shutdown = 1;
01061     
01062     while(loops_running > 0) ; // note the spinning while waiting
01063     
01064     // now they are all shutdown
01065     
01066     if(sessions_table) {
01067         msp_objcache_free(
01068                 table_handle, sessions_table, msp_get_current_cpu(), obj_cache_id);
01069         sessions_table = NULL;
01070     }
01071     
01072     if(table_handle) {
01073         msp_objcache_destroy(table_handle);
01074         table_handle = NULL;
01075     }
01076     
01077     if(entry_handle) {
01078         msp_objcache_destroy(entry_handle);
01079         entry_handle = NULL;
01080     }
01081 }
01082 
01083 
01099 void
01100 clean_sessions_using_server(uint16_t ss_id,
01101                             in_addr_t app_addr,
01102                             uint16_t app_port,
01103                             in_addr_t server_addr)
01104 {
01105     uint32_t i, cpu;
01106     hash_bucket_t * bucket;
01107     session_entry_t * session, * next;
01108     
01109     cpu = msp_get_current_cpu();
01110     
01111     for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01112         
01113         bucket = &sessions_table->hash_bucket[i];
01114         
01115         // Get the bucket lock
01116         INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01117       
01118         session = TAILQ_FIRST(&bucket->bucket_entries);
01119         
01120         while(session != NULL) {
01121 
01122             // keep next to safely remove from list
01123             next = TAILQ_NEXT(session, entries);
01124             
01125             // Get the session lock
01126             INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01127             
01128             if(session->ss_id == ss_id &&
01129                 ((session->dir == JBUF_PACKET_DIR_INGRESS &&
01130                    session->faddr == app_addr && session->sport == app_port &&
01131                         session->saddr == server_addr) ||
01132                   (session->dir == JBUF_PACKET_DIR_EGRESS &&
01133                    session->faddr == server_addr && session->dport == app_port &&
01134                         session->daddr == app_addr))) {
01135                 
01136                 // we need to delete this session
01137                 
01138                 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01139                 msp_objcache_free(entry_handle, session, cpu, obj_cache_id); 
01140             } else {
01141                 // Release the session lock
01142                 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01143             }
01144             session = next;
01145         }
01146         
01147         // Release the bucket lock
01148         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01149     }
01150     
01151     msp_objcache_reclaim(shm_handle);
01152 }
01153 
01154 
01167 void
01168 clean_sessions_with_app(uint16_t ss_id,
01169                         in_addr_t app_addr,
01170                         uint16_t app_port)
01171 {
01172     uint32_t i, cpu;
01173     hash_bucket_t * bucket;
01174     session_entry_t * session, * next;
01175     
01176     cpu = msp_get_current_cpu();
01177     
01178     for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01179         
01180         bucket = &sessions_table->hash_bucket[i];
01181         
01182         // Get the bucket lock
01183         INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01184       
01185         session = TAILQ_FIRST(&bucket->bucket_entries);
01186         
01187         while(session != NULL) {
01188 
01189             // keep next to safely remove from list
01190             next = TAILQ_NEXT(session, entries);
01191             
01192             // Get the session lock
01193             INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01194             
01195             if(session->ss_id == ss_id &&
01196                 ((session->dir == JBUF_PACKET_DIR_INGRESS &&
01197                   session->faddr == app_addr && session->sport == app_port) ||
01198                  (session->dir == JBUF_PACKET_DIR_EGRESS &&
01199                   session->daddr == app_addr && session->dport == app_port))) {
01200                 
01201                 // we need to delete this session
01202                 
01203                 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01204                 msp_objcache_free(entry_handle, session, cpu, obj_cache_id); 
01205             } else {
01206                 // Release the session lock
01207                 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01208             }
01209             session = next;
01210         }
01211         
01212         // Release the bucket lock
01213         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01214     }
01215     
01216     msp_objcache_reclaim(shm_handle);
01217 }
01218 
01219 
01226 void
01227 clean_sessions_with_service_set(uint16_t ss_id)
01228 {
01229     uint32_t i, cpu;
01230     hash_bucket_t * bucket;
01231     session_entry_t * session, * next;
01232 
01233     cpu = msp_get_current_cpu();
01234     
01235     for(i = 0; i < SESSIONS_BUCKET_COUNT; ++i) {
01236         
01237         bucket = &sessions_table->hash_bucket[i];
01238         
01239         // Get the bucket lock
01240         INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);
01241       
01242         session = TAILQ_FIRST(&bucket->bucket_entries);
01243 
01244         while(session != NULL) {
01245 
01246             // keep next to safely remove from list
01247             next = TAILQ_NEXT(session, entries);
01248             
01249             // Get the session lock
01250             INSIST_ERR(msp_spinlock_lock(&session->lock) == MSP_OK);
01251             
01252             if(session->ss_id == ss_id) {
01253                 
01254                 // we need to delete this session
01255                 
01256                 TAILQ_REMOVE(&bucket->bucket_entries, session, entries);
01257                 msp_objcache_free(entry_handle, session, cpu, obj_cache_id); 
01258             } else {
01259                 // Release the session lock
01260                 INSIST_ERR(msp_spinlock_unlock(&session->lock) == MSP_OK);
01261             }
01262             session = next;
01263         }
01264         
01265         // Release the bucket lock
01266         INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
01267     }
01268     
01269     msp_objcache_reclaim(shm_handle);
01270 }

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:26:56 2010 for SDK Your Net Corporation Equilibrium Load Balancer Example: equilibrium-data 1.0 by Doxygen 1.5.1