monitube-data_packet.c

/*
 * $Id: monitube-data_packet.c 346460 2009-11-14 05:06:47Z ssiano $
 *
 * This code is provided as is by Juniper Networks SDK Developer Support.
 * It is provided with no warranties or guarantees, and Juniper Networks
 * will not provide support or maintenance of this code in any fashion.
 * The code is provided only to help a developer better understand how
 * the SDK can be used.
 *
 * Copyright (c) 2008, Juniper Networks, Inc.
 * All rights reserved.
 */

#include "monitube-data_main.h"
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <sys/jnx/jbuf.h>
#include <jnx/ipc_types.h> // need this one indirectly for next one
#include <jnx/ipc_msp_pub.h>
#include <jnx/msp_fdb_api.h>
#include <jnx/msp_hw_ts.h>
#include "monitube-data_config.h"
#include "monitube-data_conn.h"
#include "monitube-data_ha.h"
#include "monitube-data_packet.h"
#include "monitube-data_rtp.h"


/*** Constants ***/

#define SHARED_MEM_NAME "monitube-data arena"

#define FLOW_TABLE_NAME "monitube flow table"

#define FLOW_ENTRY_NAME "monitube flow entry"

#define FLOW_AGE_CHECK_INTERVAL 15

#define RETRY_FDB_ATTACH_INTERVAL 3

#define MAX_MSP_SEND_RETRIES 100


#define IP_NEEDED_BYTES (sizeof(struct ip))

#define UDP_NEEDED_BYTES (sizeof(struct ip) + sizeof(struct udphdr))

#define MPEG_TS_PACKET_BYTES (188)

#define MPEG_TS_HEADER_BYTES (4)

#define FLOW_BUCKET_COUNT (1024 * 1024)

const uint32_t HASH_MASK = FLOW_BUCKET_COUNT - 1;
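
/*
 * Added note: FLOW_BUCKET_COUNT is a power of two, so masking a hash with
 * HASH_MASK (FLOW_BUCKET_COUNT - 1) is equivalent to, and cheaper than,
 * taking the hash modulo the bucket count. A minimal sketch of the lookup
 * pattern used later in this file:
 *
 *     uint32_t bucket_index = hash & HASH_MASK; // == hash % FLOW_BUCKET_COUNT
 *     hash_bucket_t * b = &flows_table->hash_bucket[bucket_index];
 */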


/*** Data Structures ***/

typedef struct flow_entry_s {
    // flow information
    msp_spinlock_t               lock;
    time_t                       age_ts;
    in_addr_t                    daddr;
    uint16_t                     dport;
    uint16_t                     frag_group;
    char *                       mon;

    // replication related:
    time_t                       r_trigger;

    // mirroring related:
    in_addr_t                    maddr;
    uint32_t                     m_vrf;

    // monitoring related:
       // MDI media loss rate related:
    uint32_t                     ssrc;
    source_t                     source;
    int32_t                      mdi_mlr;
       // MDI delay factor related:
    uint32_t                     rate;
    msp_hw_ts32_t                base_ts;
    uint32_t                     pl_sum;
    double                       vb_pre;
    double                       vb_post;
    double                       vb_min;
    double                       vb_max;
    double                       mdi_df;

    // for list at this hash bucket:
    TAILQ_ENTRY(flow_entry_s)    entries;
} flow_entry_t;


typedef TAILQ_HEAD(ht_bucket_list_s, flow_entry_s) ht_bucket_list_t;


typedef struct hash_bucket_s {
    msp_spinlock_t        bucket_lock;
    ht_bucket_list_t      bucket_entries;
} hash_bucket_t;


typedef struct hashtable_s {
    hash_bucket_t hash_bucket[FLOW_BUCKET_COUNT];
} hashtable_t;


static evTimerID        aging_timer;
static evTimerID        retry_timer;
static msp_shm_handle_t shm_handle;
static msp_oc_handle_t  table_handle;
static msp_oc_handle_t  entry_handle;
static hashtable_t *    flows_table;
static atomic_uint_t    loops_running;
static volatile uint8_t fdb_connected;
static volatile uint8_t do_shutdown;
static uint32_t         obj_cache_id;
static msp_fdb_handle_t fdb_handle;

extern volatile boolean is_master;


/*** STATIC/INTERNAL Functions ***/

static void
retry_attach_fdb(evContext ctx,
                 void * uap __unused,
                 struct timespec due __unused,
                 struct timespec inter __unused)
{
    int rc = msp_fdb_attach(NULL, &fdb_handle);

    if(rc == MSP_EAGAIN) {
        return; // will retry again later
    } else if(rc != MSP_OK) {
        LOG(LOG_ALERT, "%s: Failed to attach to the forwarding database. Check "
                "that it is configured (Error code: %d)", __func__, rc);
        // we will keep trying, but something is probably wrong
    } else { // it succeeded
        evClearTimer(ctx, retry_timer);
        evInitID(&retry_timer);
        LOG(LOG_INFO, "%s: Attached to FDB", __func__);
        fdb_connected = 1;

        // Once FDB is initialized it is safe to init SHM & OC
        init_application();
    }
}


static void
aging_cleanup(evContext ctx __unused,
              void * uap __unused,
              struct timespec due __unused,
              struct timespec inter __unused)
{
    const time_t flow_duration = 180;

    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    time_t current_time, flow_timeout;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();
    current_time = get_current_time();
    flow_timeout = current_time - flow_duration;

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->age_ts < flow_timeout) { // check for timeout/expiry
                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);
            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
            }
            flow = next;
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}
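
/*
 * Added note: the expiry branch above frees a flow entry while its spinlock
 * is still held. This appears safe in this design because the entry has
 * already been unlinked from the bucket list under the bucket lock, so no
 * other thread can reach it, and entries re-initialize their lock with
 * msp_spinlock_init() when (re)allocated. The same remove-then-free pattern
 * is used by the clean_flows_* and remove_flow_state functions below.
 */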


static void
checksum_adjust(
    unsigned char * chksum,
    unsigned char * optr,
    int olen,
    unsigned char * nptr,
    int nlen)
{
    long x, old, new_;
    x=chksum[0]*256+chksum[1];
    x=~x & 0xFFFF;
    while (olen)
    {
        old=optr[0]*256+optr[1]; optr+=2;
        x-=old & 0xffff;
        if (x<=0) { x--; x&=0xffff; }
        olen-=2;
    }
    while (nlen)
    {
        new_=nptr[0]*256+nptr[1]; nptr+=2;
        x+=new_ & 0xffff;
        if (x & 0x10000) { x++; x&=0xffff; }
        nlen-=2;
    }
    x=~x & 0xFFFF;
    chksum[0]=x/256; chksum[1]=x & 0xff;
}
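
/*
 * Added note: checksum_adjust() is essentially the sample checksum
 * adjustment code from RFC 3022 (NAT), Section 4.2: it folds the old bytes
 * out of, and the new bytes into, the one's-complement sum so the checksum
 * need not be recomputed over the whole packet. Both olen and nlen must be
 * even, since bytes are consumed in 16-bit pairs. A usage sketch matching
 * the mirroring code later in this file (new_daddr is a placeholder for the
 * replacement destination address):
 *
 *     checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
 *                     (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
 *                     (unsigned char *)&new_daddr,      sizeof(in_addr_t));
 */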


static status_t
pullup_bytes(struct jbuf ** pkt_buf, uint16_t num_bytes)
{
    struct jbuf * tmp_buf;

    if(jbuf_particle_get_data_length(*pkt_buf) < num_bytes) {
        tmp_buf = jbuf_pullup((*pkt_buf), num_bytes);

        if(!tmp_buf) { // check it didn't fail
            return EFAIL;
        }

        *pkt_buf = tmp_buf;
    }
    return SUCCESS;
}
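
/*
 * Added note: pullup_bytes() returns SUCCESS (0) when the first num_bytes
 * of the packet are contiguous in memory, so callers test with
 * "!pullup_bytes(...)" before casting jbuf data to header structs. Because
 * jbuf_pullup() may return a different jbuf, pointers into the old buffer
 * must be refreshed afterwards, as the fast path below does:
 *
 *     if(!pullup_bytes(&pkt_buf, UDP_NEEDED_BYTES)) {
 *         ip_pkt = jbuf_to_d(pkt_buf, struct ip *); // re-read after pullup
 *     }
 */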


static void
update_stats_for_flow(void * data,
                      uint8_t fragment,
                      flow_entry_t * flow,
                      uint16_t length)
{
    rtp_hdr_t * rh = NULL;
    int pl_len;
    double rel_time, tmp;
    msp_hw_ts32_t now = msp_hw_ts32_read();

    // find rcv'd time relative to base_ts but in seconds w/ 0.0001 precision
    if(flow->base_ts < now) { // hasn't wrapped around yet

        rel_time = (double)msp_hw_ts32_diff(now, flow->base_ts)
                        / msp_hw_ts32_frequency();

    } else if(flow->base_ts > now) { // wrapped around

        rel_time = (double)(msp_hw_ts32_diff(now, flow->base_ts) + 1 + UINT_MAX)
                        / msp_hw_ts32_frequency();
    } else {
        rel_time = 0.000001; // shouldn't happen
    }

    // Check if we are into the next timeframe (base + 1 sec)
    if(rel_time > 1.0) {

        // reset the timeframe start to the next second
        flow->base_ts += msp_hw_ts32_frequency();
        rel_time -= 1.0; // in next interval

        // Update the MLR for the timeframe that just ended

        // these values are discussed in Section A.3 of RFC 1889

        uint32_t extended_max = flow->source.cycles + flow->source.max_seq;
        uint32_t expected = extended_max - flow->source.base_seq + 1;
        // lost (total) = expected - flow->source.received

        // WRT this interval, since last report
        uint32_t expected_interval = expected - flow->source.expected_prior;
        uint32_t received_interval = flow->source.received -
                                                flow->source.received_prior;

        flow->source.expected_prior = expected;
        flow->source.received_prior = flow->source.received;

        // lost this interval:
        flow->mdi_mlr = expected_interval - received_interval;

        // Calculate the DF, store and save
        flow->mdi_df = (flow->vb_max - flow->vb_min) / (double)flow->rate;

        /*
         * Really we don't need to save the mdi_df, but we do here anyway
         * in case we want to reference it in the future.
         */

        // Report the (previous timeframe's) MDI stats (DF and MLR)
        // no report if 0, which could happen after slave takes over as master
        if(flow->mdi_df != 0.0) {
            notify_stat_update(flow->daddr, flow->dport,
                    flow->mdi_df, flow->mdi_mlr, flow->mon);
        }

        flow->pl_sum = 0;
        flow->vb_max = 0.0;
        flow->vb_min = 0.0;
        flow->vb_pre = 0.0;
        flow->vb_post = 0.0;
    }

    // Update information related to the MDI DF

    if(!fragment) {
        // it's not part of a secondary fragment, so it should have an RTP header
        rh = (rtp_hdr_t *)data;

        if(rh->version != RTP_VERSION) {
            LOG(LOG_WARNING, "%s: Found a UDP datagram without a valid "
                    "RTP header", __func__);
            return;
        }

        // Update information related to the MDI DF (length)
        pl_len = length - (sizeof(rtp_hdr_t) + (rh->cc * 4));

        if(pl_len < 0) {
            LOG(LOG_WARNING, "%s: Found an RTP header without any payload (or "
                    "an invalid packet for monitoring)", __func__);
            return;
        }

        // Update information related to the MDI MLR
        if(flow->ssrc == rh->ssrc) {
            update_seq(&flow->source, rh->seq);
        } else {
            // don't even know if it's a valid RTP stream, so don't bother
            // with the MLR
            flow->mdi_mlr = 0;

            // init these to compare to the next packet
            flow->ssrc = rh->ssrc;
            flow->source.probation = MIN_SEQUENTIAL;
            flow->source.max_seq = rh->seq;
        }

    } else {
        pl_len = length;

        if(pl_len < 0) {
            LOG(LOG_ERR, "%s: Found an IP fragment without any payload "
                    "report (or an invalid packet for monitoring)", __func__);
            return;
        }
    }

    pl_len -= (pl_len / MPEG_TS_PACKET_BYTES) * MPEG_TS_HEADER_BYTES;

    if(pl_len < 0) {
        LOG(LOG_WARNING, "%s: Found a UDP/RTP datagram without at least one"
            " MPEG TS packet in it (or an invalid packet for monitoring)",
            __func__);
        return;
    }

    // ... Continue updating information related to the MDI DF

    tmp = (double)flow->rate * rel_time;

    if((double)flow->pl_sum > tmp) { // want a positive/abs value
        flow->vb_pre = (double)flow->pl_sum - tmp;
    } else {
        flow->vb_pre = tmp - (double)flow->pl_sum;
    }

    flow->vb_post = flow->vb_pre + pl_len;
    flow->pl_sum += (pl_len << 3); // need bits, not bytes: *8 == <<3

    if(flow->vb_max == 0 && flow->vb_min == 0) {
        // first observed packet in timeframe
        flow->vb_max = flow->vb_post;
        flow->vb_min = flow->vb_pre;
    } else {
        // update max and min
        if(flow->vb_post > flow->vb_max) {
            flow->vb_max = flow->vb_post;
        }
        if(flow->vb_pre < flow->vb_min) {
            flow->vb_min = flow->vb_pre;
        }
    }
}
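
/*
 * Added note: the statistics kept above follow the Media Delivery Index
 * (MDI, RFC 4445). The virtual buffer (VB) is modeled as draining at the
 * flow's nominal rate (in bits/s) while filling with arriving payload bits;
 * per one-second timeframe, as coded above:
 *
 *     DF  = (vb_max - vb_min) / rate;              // delay factor, seconds
 *     MLR = expected_interval - received_interval; // RTP packets lost
 *
 * with the RTP sequence-number accounting taken from RFC 1889, Appendix A.3.
 * For example, a 3.75 Mbit/s stream with vb_max - vb_min = 7500 bits yields
 * DF = 0.002, i.e. 2 ms of buffering needed to absorb the observed jitter.
 */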


static msp_fdb_iter_res_t
set_vrf(msp_fdb_rt_info_t * route_info, void * ctxt)
{
    if(route_info != NULL) {
        *((uint32_t *)ctxt) = route_info->rt_idx;
    }

    // once we find one, we'll assume it is good enough
    return msp_fdb_iter_stop;
}


static status_t
process_packet(struct jbuf * pkt_buf,
               int cpu,
               uint8_t * mirror,
               uint32_t * mirror_vrf)
{
    register uint32_t hash;
    uint16_t * key;
    hash_bucket_t * bucket;
    flow_entry_t * flow;
    struct in_addr tmp;
    uint16_t jb_length = jbuf_total_len(pkt_buf);
    struct ip * ip_pkt = jbuf_to_d(pkt_buf, struct ip *);
    struct udphdr * udp_hdr =
        (struct udphdr *)((uint32_t *)ip_pkt + ip_pkt->ip_hl);
    replication_data_t data;

    if(jb_length != ip_pkt->ip_len) {
        LOG(LOG_EMERG, "%s: Jbuf does not contain entire packet",
                __func__);
        return EFAIL;
    }

    // get a hash of just the destination address
    // hash input key is 32 bits; hash output indexes FLOW_BUCKET_COUNT buckets
    // Could also hash the dest port but it makes dealing with fragments harder

    key = (uint16_t *)&ip_pkt->ip_dst;
    hash = 0x5F5F;
    hash = ((hash << 5) + hash) ^ key[0];
    hash = ((hash << 5) + hash) ^ key[1];

    hash &= HASH_MASK; // trim to output width

    // use hash to look up a hash bucket and find the matching entry

    bucket = &flows_table->hash_bucket[hash]; // get bucket

    // Get the bucket lock
    INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

    flow = TAILQ_FIRST(&bucket->bucket_entries);

    while(flow != NULL) {
        if(flow->daddr == ip_pkt->ip_dst.s_addr
           && flow->dport == udp_hdr->uh_dport) {

            break; // match
        }
        flow = TAILQ_NEXT(flow, entries);
    }

    // if there's no matching flow, then create one... the slow path
    if(flow == NULL) {

        // SLOW PATH:

        flow = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
        if(flow == NULL) {
            // Release the bucket lock
            INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);

            LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
                    "flow entry", __func__);
            return EFAIL;
        }

        // init and grab lock
        msp_spinlock_init(&flow->lock);
        INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

        TAILQ_INSERT_HEAD(&bucket->bucket_entries, flow, entries);

        flow->daddr = ip_pkt->ip_dst.s_addr;
        flow->dport = udp_hdr->uh_dport;

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);

        // find out if this flow will match a monitor or mirror

        flow->rate = get_monitored_rate(flow->daddr, &flow->mon);

        if(flow->rate != 0) {
            // init monitoring params for this flow
            LOG(LOG_INFO, "%s: Monitoring new flow to %s",
                    __func__, inet_ntoa(ip_pkt->ip_dst));

            flow->ssrc = 0;
            flow->base_ts = msp_hw_ts32_read();
            bzero(&flow->source, sizeof(source_t));
            flow->pl_sum = 0;
            flow->vb_max = 0.0;
            flow->vb_min = 0.0;
            flow->vb_pre = 0.0;
            flow->vb_post = 0.0;
        } else {
            LOG(LOG_INFO, "%s: NOT Monitoring new flow to %s",
                    __func__, inet_ntoa(ip_pkt->ip_dst));
        }

        flow->maddr = get_mirror(flow->daddr);

        if(flow->maddr != 0) {
            flow->m_vrf = 0;
            // look up VRF in FDB
            if(msp_fdb_get_all_route_records(fdb_handle, PROTO_IPV4,
                    set_vrf, &flow->m_vrf) != MSP_OK) {

                tmp.s_addr = flow->maddr;
                LOG(LOG_ERR, "%s: Did not successfully look up a VRF "
                        "for mirrored site %s", __func__, inet_ntoa(tmp));
            }
        }

        flow->r_trigger = get_current_time(); // flag to replicate asap

    } else {
        // Get the flow lock
        INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    // there's a matching flow, so use it to forward the traffic
    // FAST PATH:

    flow->age_ts = get_current_time();

    // If it is the first fragment, note the frag ID
    if((ntohs(ip_pkt->ip_off) & IP_MF)) {
        flow->frag_group = ip_pkt->ip_id;
    } else {
        flow->frag_group = 0;
    }

    if(flow->rate != 0) { // is it monitored

        if(!pullup_bytes(&pkt_buf, (ip_pkt->ip_hl * 4)
                + sizeof(struct udphdr) + sizeof(rtp_hdr_t))) {

            // refresh pointers into jbuf data
            ip_pkt = jbuf_to_d(pkt_buf, struct ip *);
            udp_hdr = (struct udphdr *)((uint32_t *)ip_pkt + ip_pkt->ip_hl);

            update_stats_for_flow((uint8_t *)udp_hdr + sizeof(struct udphdr),
                    FALSE, flow, jb_length -
                    ((ip_pkt->ip_hl * 4) + sizeof(struct udphdr)));
        } else {
            LOG(LOG_NOTICE, "%s: Couldn't monitor UDP datagram because there "
                    "were not enough bytes to form an RTP header", __func__);
        }
    }

    if(flow->maddr != 0) { // is it mirrored anywhere

        // adjust IP checksum taking the new dest IP address into account
        checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
            (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
            (unsigned char *)&flow->maddr, sizeof(in_addr_t));

        // adjust UDP checksum taking the new dest IP address into account
        checksum_adjust((unsigned char *)&udp_hdr->uh_sum,
                (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
                (unsigned char *)&flow->maddr, sizeof(in_addr_t));

        // change destination address
        ip_pkt->ip_dst.s_addr = flow->maddr;

        *mirror = 1;
        *mirror_vrf = flow->m_vrf;
    }

    if(flow->r_trigger <= get_current_time()) { // is it time to replicate?

        // notify slave
        data.bucket = hash;
        data.daddr = flow->daddr;
        data.dport = flow->dport;
        data.age_ts = flow->age_ts;
        data.rate = flow->rate;
        data.maddr = flow->maddr;
        data.m_vrf = flow->m_vrf;
        if(flow->rate) {
            data.ssrc = flow->ssrc;
            memcpy(&data.source, &flow->source, sizeof(source_t));
            strncpy(data.mon, flow->mon, MAX_MON_NAME_LEN);
        } else {
            data.mon[0] = '\0';
        }

        update_replication_entry(&data);
        flow->r_trigger += get_replication_interval();
    }

    // Release the flow lock
    INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);

    return SUCCESS;
}


static status_t
process_fragment(struct ip * ip_pkt,
                 uint16_t jb_length,
                 uint8_t * mirror,
                 uint32_t * mirror_vrf)
{
    register uint32_t hash;
    uint16_t * key;
    hash_bucket_t * bucket;
    flow_entry_t * flow;

    if(jb_length != ip_pkt->ip_len) {
        LOG(LOG_WARNING, "%s: Jbuf does not contain entire packet",
                __func__);
    }

    // get a hash of just the destination address
    // hash input key is 32 bits; hash output indexes FLOW_BUCKET_COUNT buckets
    // Could also hash the dest port but it makes dealing with fragments harder

    key = (uint16_t *)&ip_pkt->ip_dst;
    hash = 0x5F5F;
    hash = ((hash << 5) + hash) ^ key[0];
    hash = ((hash << 5) + hash) ^ key[1];

    hash &= HASH_MASK; // trim to output width

    // use hash to look up a hash bucket and find the matching entry

    bucket = &flows_table->hash_bucket[hash]; // get bucket

    // Get the bucket lock
    INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

    flow = TAILQ_FIRST(&bucket->bucket_entries);

    while(flow != NULL) {
        if(flow->daddr == ip_pkt->ip_dst.s_addr
           && flow->frag_group == ip_pkt->ip_id) {

            break; // match
        }
        flow = TAILQ_NEXT(flow, entries);
    }

    // if there's no matching flow, we haven't seen the first fragment yet
    if(flow == NULL) {
        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);

        LOG(LOG_WARNING, "%s: Received a packet from %s. It is an IP "
                "fragment, but we have not yet received the first "
                "fragment, so we cannot tell if it belongs to a "
                "monitube application",
            __func__, inet_ntoa(ip_pkt->ip_src));

        return SUCCESS; // don't know anything about this flow
    } else {
        // Get the flow lock
        INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    // else there's a matching flow, so use it to forward the traffic
    // FAST PATH:
    flow->age_ts = get_current_time();

    if(flow->rate != 0) { // is it monitored
        update_stats_for_flow((uint32_t *)ip_pkt + ip_pkt->ip_hl, TRUE,
                flow, jb_length - ((ip_pkt->ip_hl * 4)));
    }

    if(flow->maddr != 0) { // is it mirrored anywhere
        // adjust IP checksum taking the new dest IP address into account
        checksum_adjust((unsigned char *)&ip_pkt->ip_sum,
            (unsigned char *)&ip_pkt->ip_dst, sizeof(in_addr_t),
            (unsigned char *)&flow->maddr, sizeof(in_addr_t));

        // change destination address
        ip_pkt->ip_dst.s_addr = flow->maddr;

        *mirror = 1;
        *mirror_vrf = flow->m_vrf;
    }

    // Release the flow lock
    INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);

    return SUCCESS;
}


static int
send_packet(struct jbuf * pkt_buf,
            const msp_data_handle_t const * handle)
{
    // enqueue it back into the FIFO to go out

    int rc = MSP_DATA_SEND_RETRY;
    int retries = 0;

    while(rc == MSP_DATA_SEND_RETRY && ++retries <= MAX_MSP_SEND_RETRIES) {
        rc = msp_data_send(*handle, pkt_buf, MSP_MSG_TYPE_PACKET);
    }

    if(rc == MSP_DATA_SEND_FAIL) {

        LOG(LOG_ERR, "%s: Failed to forward packet using msp_data_send().",
            __func__);

    } else if(rc == MSP_DATA_SEND_RETRY) { // Panic / calls exit(1)

        LOG(LOG_EMERG, "%s: PANIC: Failed to send a jbuf after %d retries "
            "with msp_data_send().", __func__, MAX_MSP_SEND_RETRIES);

    } else if(rc != MSP_OK) {

        LOG(LOG_ERR, "%s: Failed to forward packet and got unknown return "
            "code from msp_data_send().", __func__);
    }
    return rc;
}
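
/*
 * Added note: when send_packet() returns anything other than MSP_OK the
 * caller is expected to free the jbuf to avoid a leak, as the data loop
 * below does:
 *
 *     if(send_packet(pkt_buf, &params->dhandle) != MSP_OK) {
 *         jbuf_free(pkt_buf);
 *     }
 */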


static void *
monitube_process_packet(msp_dataloop_args_t * params)
{
    struct jbuf * pkt_buf;
    jbuf_svc_set_info_t ss_info;
    struct ip * ip_pkt;
    int type, cpu;
    uint16_t ip_frag_offset;
    uint8_t ip_options_bytes = 0, mirror = 0;
    uint32_t mirror_vrf = 0;
    sigset_t sig_mask;

    // Block SIGTERM in this thread so the main thread handles it; otherwise
    // our threads inherit the main thread's signal mask and the signal could
    // be delivered here
    sigemptyset(&sig_mask);
    sigaddset(&sig_mask, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &sig_mask, NULL);

    atomic_add_uint(1, &loops_running);

    cpu = msp_data_get_cpu_num(params->dhandle);
    INSIST_ERR(cpu != MSP_NEXT_NONE);

    LOG(LOG_INFO, "%s: STARTED dLoop on CPU %d", __func__, cpu);

    // Start the packet loop...
    while(!do_shutdown) {

        // Dequeue a packet from the rx-fifo
        pkt_buf = msp_data_recv(params->dhandle, &type);

        if(pkt_buf == NULL) { // Didn't get anything
            continue;
        }

        if(!is_master) {
            jbuf_free(pkt_buf);
            continue;
        }

        if(type != MSP_MSG_TYPE_PACKET) { // Didn't get network traffic
            LOG(LOG_WARNING, "%s: Message wasn't a packet...dropping",
                __func__);
            jbuf_free(pkt_buf);
            continue;
        }

        jbuf_get_svc_set_info(pkt_buf, &ss_info);
        if(ss_info.mon_svc == 0) { // we'll drop non-sampled packets
            LOG(LOG_NOTICE, "%s: Monitube-data encountered a non-sampled packet",
                __func__);
            jbuf_free(pkt_buf);
            continue;
        }

        if(pullup_bytes(&pkt_buf, IP_NEEDED_BYTES)) {

            LOG(LOG_ERR, "%s: Dropped a packet because there's not enough "
                "bytes to form an IP header.", __func__);

            jbuf_free(pkt_buf);
            continue;
        }

        // Get IP header
        ip_pkt = jbuf_to_d(pkt_buf, struct ip *);

        if(!ip_pkt || ip_pkt->ip_p != IPPROTO_UDP) { // only care about UDP/RTP
            jbuf_free(pkt_buf);
            continue;
        }

        ip_frag_offset = ntohs(ip_pkt->ip_off);
        ip_options_bytes = (ip_pkt->ip_hl * 4) - sizeof(struct ip);
        mirror = 0;

        if((ip_frag_offset & IP_OFFMASK)) {

            // It's a fragment, but not the first fragment
            process_fragment(ip_pkt, pkt_buf->jb_total_len,
                    &mirror, &mirror_vrf);

        } else if(!pullup_bytes(&pkt_buf, UDP_NEEDED_BYTES + ip_options_bytes)) {

            // It is UDP, and could be the first fragment or a whole datagram
            process_packet(pkt_buf, cpu, &mirror, &mirror_vrf);

        } else {
            LOG(LOG_NOTICE, "%s: Did not process a packet to %s. There's not "
                "enough bytes to form the UDP header (it's not an IP fragment).",
                __func__, inet_ntoa(ip_pkt->ip_dst));
        }

        if(mirror) {
            jbuf_setvrf(pkt_buf, mirror_vrf);
            if(send_packet(pkt_buf, &params->dhandle) != MSP_OK) {
                jbuf_free(pkt_buf);
            }
        } else {
            // Drop by default since we are a monitoring application and
            // packets should be copies
            jbuf_free(pkt_buf);
        }
    }

    atomic_sub_uint(1, &loops_running);

    // thread is done if it reaches this point
    pthread_exit(NULL);
    return NULL;
}


/*** GLOBAL/EXTERNAL Functions ***/


status_t
init_packet_loops(evContext ctx)
{
    int i, rc, core;
    msp_dataloop_params_t params;
    msp_dataloop_result_t result;
    msp_shm_params_t shmp;
    msp_objcache_params_t ocp;

    shm_handle = table_handle = entry_handle = flows_table = NULL;
    evInitID(&aging_timer);
    obj_cache_id = 0; // for now this can always be zero

    LOG(LOG_INFO, "%s: Initializing object cache for data loops", __func__);

    bzero(&shmp, sizeof(shmp));
    bzero(&ocp, sizeof(ocp));

    // allocate & initialize the shared memory

    strncpy(shmp.shm_name, SHARED_MEM_NAME, SHM_NAME_LEN);

    if(msp_shm_allocator_init(&shmp) != MSP_OK) {
        LOG(LOG_ERR, "%s: Shared memory allocator initialization failed",
                __func__);
        return EFAIL;
    }

    shm_handle = shmp.shm; // get handle

    // create object cache allocator for the flow lookup table
    ocp.oc_shm = shm_handle;
    ocp.oc_size  = sizeof(hashtable_t);
    strncpy(ocp.oc_name, FLOW_TABLE_NAME, OC_NAME_LEN);

    if(msp_objcache_create(&ocp) != MSP_OK) {
        LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (table)",
                __func__);
        return EFAIL;
    }

    table_handle = ocp.oc; // get handle

    // create object cache allocator for the flow lookup table entries
    ocp.oc_shm = shmp.shm;
    ocp.oc_size  = sizeof(flow_entry_t);
    strncpy(ocp.oc_name, FLOW_ENTRY_NAME, OC_NAME_LEN);

    if (msp_objcache_create(&ocp) != MSP_OK) {
        LOG(LOG_ERR, "%s: Object-cache allocator initialization failed (entry)",
                __func__);
        return EFAIL;
    }

    entry_handle = ocp.oc; // get handle

    // allocate flows_table in OC:

    flows_table = msp_objcache_alloc(table_handle,
            msp_get_current_cpu(), obj_cache_id);
    if(flows_table == NULL) {
        LOG(LOG_ERR, "%s: Failed to allocate object cache for flows table",
                __func__);
        return EFAIL;
    }

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {
        msp_spinlock_init(&flows_table->hash_bucket[i].bucket_lock);
        TAILQ_INIT(&flows_table->hash_bucket[i].bucket_entries);
    }

    // init the hardware timestamp infrastructure
    rc = msp_hw_ts32_init();
    if(rc != MSP_OK) {
        LOG(LOG_EMERG, "%s: Failed to initialize HW timestamp infrastructure "
                "(Error code: %d)", __func__, rc);
        return EFAIL;
    }

    // start ager

    if(evSetTimer(ctx, aging_cleanup, NULL,
            evAddTime(evNowTime(), evConsTime(FLOW_AGE_CHECK_INTERVAL, 0)),
            evConsTime(FLOW_AGE_CHECK_INTERVAL, 0),
            &aging_timer)) {

        LOG(LOG_EMERG, "%s: Failed to initialize a timer to periodically "
            "check age of flow entries (Error: %m)", __func__);
        return EFAIL;
    }

    LOG(LOG_INFO, "%s: Starting packet loops", __func__);

    bzero(&params, sizeof(msp_dataloop_params_t));
    bzero(&result, sizeof(msp_dataloop_result_t));

    loops_running = 0;
    do_shutdown = 0;

    // go through the available data cores
    core = msp_env_get_next_data_core(MSP_NEXT_NONE);
    while(core != MSP_NEXT_END) {

        // for each data core, create only one data CPU
        rc = msp_env_get_next_data_cpu_in_core(core, MSP_NEXT_NONE);

        if(rc != MSP_NEXT_END) {
            LOG(LOG_INFO, "%s: Creating a data loop on CPU %d (in core %d)",
                    __func__, rc, core);

            rc = msp_data_create_loop_on_cpu(rc, monitube_process_packet,
                    &params, &result);

            if(rc != MSP_OK) {
                LOG(LOG_ERR, "%s: Failed to create a data loop (Error: %d)",
                        __func__, rc);
                return EFAIL;
            }

        } else {
            LOG(LOG_ERR, "%s: Found no available data CPUs in data core %d",
                __func__, core);
            return EFAIL;
        }

        core = msp_env_get_next_data_core(core);
    }
    return SUCCESS;
}


void
init_forwarding_database(evContext ctx)
{
    evInitID(&retry_timer);

    LOG(LOG_INFO, "%s: Attaching to the forwarding database", __func__);

    // need to try again later because FDB isn't initialized yet
    if(evSetTimer(ctx, retry_attach_fdb, NULL, evConsTime(0, 0),
            evConsTime(RETRY_FDB_ATTACH_INTERVAL, 0), &retry_timer)) {

        LOG(LOG_EMERG, "%s: Failed to initialize a timer to retry "
            "attaching to FDB (Error: %m)", __func__);
    }
}


void
stop_packet_loops(evContext ctx)
{
    do_shutdown = 1;

    if(evTestID(aging_timer)) {
        evClearTimer(ctx, aging_timer);
        evInitID(&aging_timer);
    }

    if(evTestID(retry_timer)) {
        evClearTimer(ctx, retry_timer);
        evInitID(&retry_timer);
    }

    while(loops_running > 0) ; // note the spinning while waiting

    if(fdb_connected) {
        msp_fdb_detach(fdb_handle);
    }

    fdb_connected = 0;
}


void
destroy_packet_loops_oc(void)
{
    // loops must all be shut down by this point

    if(flows_table) {
        msp_objcache_free(
                table_handle, flows_table, msp_get_current_cpu(), obj_cache_id);
        flows_table = NULL;
    }

    if(table_handle) {
        msp_objcache_destroy(table_handle);
        table_handle = NULL;
    }

    if(entry_handle) {
        msp_objcache_destroy(entry_handle);
        entry_handle = NULL;
    }
}


void
clean_flows_with_any_monitor(void)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->mon != NULL) {
                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);
                flow = next;
            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
                flow = next;
            }
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


void
clean_flows_with_any_mirror(void)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->maddr != 0) {
                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);
                flow = next;
            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
                flow = next;
            }
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


void
clean_flows_with_mirror(in_addr_t addr)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->daddr == addr && flow->maddr != 0) {
                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);
                flow = next;
            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
                flow = next;
            }
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


void
redirect_flows_with_mirror(in_addr_t addr, in_addr_t to)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    struct in_addr tmp;
    replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->daddr == addr && flow->maddr != 0) {

                flow->maddr = to; // update
                flow->m_vrf = 0;
                if(flow->maddr != 0) {
                    // look up VRF in FDB
                    if(msp_fdb_get_all_route_records(fdb_handle, PROTO_IPV4,
                            set_vrf, &flow->m_vrf) != MSP_OK) {

                        tmp.s_addr = flow->maddr;
                        LOG(LOG_ERR, "%s: Did not successfully look up a VRF "
                                "for mirrored site %s", __func__,
                                inet_ntoa(tmp));
                    }
                }

                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                data.maddr = flow->maddr; // new
                data.m_vrf = flow->m_vrf; // new
                data.age_ts = flow->age_ts;
                data.rate = flow->rate;
                if(flow->rate) {
                    data.ssrc = flow->ssrc;
                    memcpy(&data.source, &flow->source, sizeof(source_t));
                    strncpy(data.mon, flow->mon, MAX_MON_NAME_LEN);
                } else {
                    data.mon[0] = '\0';
                }

                update_replication_entry(&data);
            }
            // Release the flow lock before advancing
            INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
            flow = next;
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


void
clean_flows_with_monitor(char * name)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(flow->mon && strcmp(flow->mon, name) == 0) {
                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);

            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
            }

            flow = next;
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


void
clean_flows_in_monitored_prefix(char * name, in_addr_t prefix, in_addr_t mask)
{
    uint32_t i, cpu;
    hash_bucket_t * bucket;
    flow_entry_t * flow, * next;
    delete_replication_data_t data;

    cpu = msp_get_current_cpu();

    for(i = 0; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // keep next to safely remove from list
            next = TAILQ_NEXT(flow, entries);

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(((name == NULL) || (flow->mon && strcmp(flow->mon, name) == 0))
               && (prefix & mask) == (flow->daddr & mask)) {

                // notify slave
                data.bucket = i;
                data.daddr = flow->daddr;
                data.dport = flow->dport;
                delete_replication_entry(&data);

                TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
                msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);
            } else {
                // Release the flow lock
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
            }
            flow = next;
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    msp_objcache_reclaim(shm_handle);
}


status_t
get_next_flow_state(replication_data_t * last, replication_data_t * next)
{
    uint32_t i;
    hash_bucket_t * bucket;
    flow_entry_t * flow;
    boolean return_next = (last == NULL);

    // start from the last returned bucket (or the first when last is NULL)
    for(i = (last == NULL) ? 0 : last->bucket; i < FLOW_BUCKET_COUNT; ++i) {

        bucket = &flows_table->hash_bucket[i];

        // Get the bucket lock
        INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

        flow = TAILQ_FIRST(&bucket->bucket_entries);

        while(flow != NULL) {

            // Get the flow lock
            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            if(return_next) {
                // return this flow's information
                next->bucket = i;
                next->daddr = flow->daddr;
                next->dport = flow->dport;
                next->age_ts = flow->age_ts;
                next->ssrc = flow->ssrc;
                memcpy(&next->source, &flow->source, sizeof(source_t));
                next->rate = flow->rate;
                next->maddr = flow->maddr;
                next->m_vrf = flow->m_vrf;
                if(flow->mon != NULL) {
                    strncpy(next->mon, flow->mon, MAX_MON_NAME_LEN);
                } else {
                    next->mon[0] = '\0';
                }

                // Release the flow and bucket locks before returning
                INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
                INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
                return SUCCESS;
            }

            // match the last returned flow, then set return_next
            if(last->daddr == flow->daddr && last->dport == flow->dport) {
                // the next flow found will be returned
                return_next = TRUE;
            }
            // Release the flow lock
            INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);
            flow = TAILQ_NEXT(flow, entries);
        }

        // Release the bucket lock
        INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
    }

    return EFAIL;
}


status_t
add_flow_state(replication_data_t * new_data)
{
    hash_bucket_t * bucket;
    flow_entry_t * flow;
    uint32_t cpu, rc;

    cpu = msp_get_current_cpu();

    bucket = &flows_table->hash_bucket[new_data->bucket];

    // Get the bucket lock
    INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

    flow = TAILQ_FIRST(&bucket->bucket_entries);

    while(flow != NULL) {
        if(flow->daddr == new_data->daddr && flow->dport == new_data->dport) {
            break; // match
        }
        flow = TAILQ_NEXT(flow, entries);
    }

    if(flow == NULL) { // adding a new one
        flow = msp_objcache_alloc(entry_handle, cpu, obj_cache_id);
        if(flow == NULL) {
            // Release the bucket lock
            INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
            LOG(LOG_ERR, "%s: Failed to allocate object cache for a "
                    "flow entry", __func__);
            return EFAIL;
        }

        // init flow lock
        msp_spinlock_init(&flow->lock);
        flow->daddr = new_data->daddr;
        flow->dport = new_data->dport;
        // insert into bucket list of flows
        TAILQ_INSERT_HEAD(&bucket->bucket_entries, flow, entries);

    }
    // Get the flow lock
    INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

    // Release the bucket lock
    INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);

    // copy things into flow from backup data received

    flow->age_ts = new_data->age_ts;
    flow->ssrc = new_data->ssrc;
    memcpy(&flow->source, &new_data->source, sizeof(source_t));
    flow->rate = new_data->rate;
    flow->maddr = new_data->maddr;
    flow->m_vrf = new_data->m_vrf;

    // init flow info not present in new_data
    flow->frag_group = 0;

    if(flow->rate != 0) {
        // init monitoring params for this flow

        rc = get_monitored_rate(flow->daddr, &flow->mon); // populate flow->mon
        if(rc && strcmp(new_data->mon, flow->mon) != 0) {
            LOG(LOG_ERR, "%s: Could not find the same monitor group in the "
                    "slave's configuration", __func__);
        }

        flow->base_ts = msp_hw_ts32_read();
        flow->mdi_mlr = 0;
        flow->pl_sum = 0;
        flow->vb_max = 0.0;
        flow->vb_min = 0.0;
        flow->vb_pre = 0.0;
        flow->vb_post = 0.0;
        flow->mdi_df = 0.0;
    } else {
        flow->mon = NULL;
    }

    INSIST_ERR(msp_spinlock_unlock(&flow->lock) == MSP_OK);

    return SUCCESS;
}


void
remove_flow_state(delete_replication_data_t * data)
{
    hash_bucket_t * bucket;
    flow_entry_t * flow;
    uint32_t cpu;

    cpu = msp_get_current_cpu();

    bucket = &flows_table->hash_bucket[data->bucket];

    // Get the bucket lock
    INSIST_ERR(msp_spinlock_lock(&bucket->bucket_lock) == MSP_OK);

    flow = TAILQ_FIRST(&bucket->bucket_entries);

    while(flow != NULL) {
        if(flow->daddr == data->daddr && flow->dport == data->dport) {

            INSIST_ERR(msp_spinlock_lock(&flow->lock) == MSP_OK);

            TAILQ_REMOVE(&bucket->bucket_entries, flow, entries);
            msp_objcache_free(entry_handle, flow, cpu, obj_cache_id);

            INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);
            return;
        }
        flow = TAILQ_NEXT(flow, entries);
    }

    INSIST_ERR(msp_spinlock_unlock(&bucket->bucket_lock) == MSP_OK);

    LOG(LOG_ERR, "%s: Did not find a flow entry to remove", __func__);
}
