monitube2-data_main.c

Go to the documentation of this file.
00001 /*
00002  * $Id: monitube2-data_main.c 347265 2009-11-19 13:55:39Z kdickman $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2009, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00020 /* The Application and This Plug-in's Documentation: */
00021 
00022 
00529 #include "monitube2-data_main.h"
00530 #include <netinet/ip.h>
00531 #include <netinet/udp.h>
00532 
00533 #include <sys/jnx/jbuf.h>
00534 
00535 #include <jnx/multi-svcs/msvcs_plugin.h>
00536 #include <jnx/multi-svcs/msvcs_state.h>
00537 #include <jnx/multi-svcs/msvcs_events.h>
00538 #include <jnx/multi-svcs/msvcs_flow.h>
00539 #include <jnx/multi-svcs/msvcs_session.h>
00540 
00541 #include <jnx/junos_kcom.h>
00542 #include <jnx/junos_kcom_mpsdk_cfg.h>
00543 
00544 #include <jnx/msp_pkt_utils.h>
00545 #include <jnx/msp_fdb_api.h> 
00546 #include <jnx/msp_hw_ts.h>
00547 #include <jnx/ipc_msp_pub.h>
00548 
00549 #include "monitube2-data_config.h"
00550 #include "monitube2-data_conn.h"
00551 #include "monitube2-data_rtp.h"
00552 #include "monitube2-data_packet.h"
00553 
00554 /*** Constants ***/
00555 
00556 // PIC redundancy-state strings for KCOM_IFDEV_REDUNDANCY_STATE_* values
00557 const char * waiting_state = "waiting for primary"; 
00558 const char * primary_state = "primary active"; 
00559 const char * secondary_state = "secondary active"; 
00560 const char * none_state = "none active"; 
00561 
00562 // PIC redundancy operations for KCOM_IFDEV_REDUNDANCY_CMD_* values
00563 const char * no_op = "none"; 
00564 const char * switch_op = "switch"; 
00565 const char * revert_op = "revert"; 
00566 
00567 
00568 /*** Data Structures ***/
00569 
00573 int plugin_id;
00574 
00578 evContext * ctx;
00579 
00583 msp_policy_db_handle_t pdb_handle;
00584 
00588 void * pdb_shm_handle;
00589 
00590 static msp_oc_handle_t  entry_handle; 
00591 
00592 static msp_fdb_handle_t fdb_handle;    
00593 
00594 static evTimerID        retry_timer;   
00595 
00596 /*** STATIC/INTERNAL Functions ***/
00597 
00598 
00611 static msp_fdb_iter_res_t
00612 set_vrf(msp_fdb_rt_info_t * route_info, void * ctxt)
00613 {
00614     if(route_info != NULL) {
00615         *((uint32_t *)ctxt) = route_info->rt_idx;
00616     }
00617 
00618     // once we found one, we'll assume it is good enough
00619     return msp_fdb_iter_stop;
00620 }
00621 
00622 
00639 static void
00640 retry_attach_fdb(evContext ectx UNUSED,
00641                  void * uap UNUSED,
00642                  struct timespec due UNUSED,
00643                  struct timespec inter UNUSED)
00644 {
00645     int rc = msp_fdb_attach(NULL, &fdb_handle);
00646 
00647     if(rc == MSP_EAGAIN) {
00648         return; // will retry again later
00649     } else if(rc != MSP_OK) {
00650         CLOG(LOG_ALERT, "%s: Failed to attach to the forwarding database. Check"
00651                 " that it is configured (Error code: %d)", __func__, rc);
00652         // we will keep trying, but something is probably wrong
00653     } else { // it succeeded
00654         evClearTimer(*ctx, retry_timer);
00655         evInitID(&retry_timer);
00656         CLOG(LOG_INFO, "%s: Attached to FDB", __func__);
00657 
00658         // Once FDB is attached, init the rest:
00659 
00660         init_config();
00661         init_connections();
00662     }
00663 }
00664 
00665 
00677 static int monitube_data_hdlr(msvcs_data_context_t * md_ctx,
00678                               msvcs_data_event_t ev)
00679 {
00680     msvcs_pub_data_req_t * req;
00681     msp_objcache_params_t ocp;
00682     jbuf_svc_set_info_t ss_info;
00683     struct jbuf * jb;
00684     struct ip * ip_hdr;
00685     int rc, cpu;
00686     uint16_t ip_options_bytes;
00687     uint32_t rate;
00688     in_addr_t mirror_addr;
00689     flow_entry_t * flow;
00690 
00691     switch (ev) {
00692     case MSVCS_DATA_EV_SM_INIT:
00693 
00694         // Should only get this once, and first
00695 
00696         DLOG(LOG_INFO, "%s: Initialization of shared memory event", __func__);
00697 
00698         bzero(&ocp, sizeof(ocp));
00699         ocp.oc_shm = md_ctx->sc_shm; // use the global SHM arena handle
00700         strlcpy(ocp.oc_name, PLUGIN_NAME "-flow entry OC", sizeof(ocp.oc_name));
00701         ocp.oc_size = sizeof(flow_entry_t);
00702 
00703         rc = msp_objcache_create(&ocp);
00704 
00705         if (rc) {
00706             DLOG(LOG_ERR, "%s: Failed to initialize an object cache", __func__);
00707             return rc;
00708         }
00709 
00710         // Save the OC handle
00711         entry_handle = ocp.oc;
00712 
00713         break;
00714 
00715     case MSVCS_DATA_EV_INIT:
00716 
00717         break;
00718 
00719     case MSVCS_DATA_EV_FIRST_PKT_PROC:
00720 
00721         // First event received only once for every session
00722 
00723         jb = (struct jbuf *) md_ctx->sc_pkt;
00724         jbuf_get_svc_set_info(jb, &ss_info);
00725 
00726         if (ss_info.mon_svc == 0) { // we'll ignore flows of non-sampled packets
00727             DLOG(LOG_NOTICE, "%s: Encountered a non-sampled packet", __func__);
00728 
00729             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00730                     (MSVCS_SESSION_OP_FLAG_FORWARD_DIR
00731                             | MSVCS_SESSION_OP_FLAG_REVERSE_DIR));
00732 
00733             return MSVCS_ST_PKT_FORWARD;
00734         }
00735 
00736         if (pullup_bytes(&jb, sizeof(struct ip))) {
00737             DLOG(LOG_ERR, "%s: Failed to pullup IP header bytes", __func__);
00738 
00739             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00740                     md_ctx->sc_flags);
00741 
00742             return MSVCS_ST_PKT_FORWARD;
00743         }
00744 
00745         // Get IP header
00746         ip_hdr = jbuf_to_d(jb, struct ip *);
00747 
00748         if (!ip_hdr || ip_hdr->ip_p != IPPROTO_UDP) { // only care about UDP/RTP
00749 
00750             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00751                     md_ctx->sc_flags);
00752 
00753             return MSVCS_ST_PKT_FORWARD;
00754         }
00755 
00756         ip_options_bytes = (ip_hdr->ip_hl * 4) - sizeof(struct ip);
00757 
00758         if (pullup_bytes(&jb, sizeof(struct ip) + sizeof(struct udphdr)
00759                 + ip_options_bytes)) {
00760             DLOG(LOG_ERR, "%s: Failed to pullup UDP header bytes", __func__);
00761 
00762             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00763                     md_ctx->sc_flags);
00764 
00765             return MSVCS_ST_PKT_FORWARD;
00766         }
00767 
00768         mirror_addr = rate = 0;
00769 
00770         if (get_monitored_rule_info(md_ctx->sc_sset_id, ss_info.svc_id,
00771                 ip_hdr->ip_dst.s_addr, &rate, &mirror_addr) || (rate == 0
00772                 && mirror_addr == 0)) {
00773 
00774             // no policy match for this destination
00775 
00776             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00777                     md_ctx->sc_flags);
00778 
00779             return MSVCS_ST_PKT_FORWARD;
00780         }
00781 
00782         cpu = msvcs_state_get_cpuid();
00783 
00784         flow = msp_objcache_alloc(entry_handle, cpu, md_ctx->sc_sset_id);
00785 
00786         if (!flow) {
00787             DLOG(LOG_ERR, "%s: Failed to allocate flow state", __func__);
00788 
00789             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00790                     md_ctx->sc_flags);
00791 
00792             return MSVCS_ST_PKT_FORWARD;
00793         }
00794 
00795         flow->daddr = ip_hdr->ip_dst.s_addr;
00796         flow->dport = ((struct udphdr *)
00797                         ((uint32_t *) ip_hdr + ip_hdr->ip_hl))->uh_dport;
00798 
00799         flow->rate = rate;
00800 
00801         if (flow->rate != 0) {
00802             // init monitoring params for this flow
00803             DLOG(LOG_INFO, "%s: Setup monitoring for flow to %s", __func__,
00804                     inet_ntoa(ip_hdr->ip_dst));
00805 
00806             flow->ssrc = 0;
00807             bzero(&flow->source, sizeof(source_t));
00808             flow->pl_sum = 0;
00809             flow->vb_max = 0.0;
00810             flow->vb_min = 0.0;
00811             flow->vb_pre = 0.0;
00812             flow->vb_post = 0.0;
00813             flow->mdi_df = 0.0;
00814             flow->mdi_mlr = 0;
00815 
00816             // start window right before we received the first one
00817             flow->base_ts = jbuf_get_hw_timestamp(jb) - 1;
00818         }
00819 
00820         flow->maddr = mirror_addr;
00821 
00822         if (flow->maddr != 0) {
00823             
00824             flow->m_vrf = 0;
00825 
00826             // look up VRF in FDB
00827             if (msp_fdb_get_all_route_records(fdb_handle, PROTO_IPV4,
00828                     set_vrf, &flow->m_vrf) != MSP_OK) {
00829             
00830                 struct in_addr tmp;
00831                 tmp.s_addr = flow->maddr;
00832                 DLOG(LOG_ERR, "%s: Did not successfully lookup a VRF "
00833                     "for mirrored site %s", __func__, inet_ntoa(tmp));
00834             }
00835             
00836             DLOG(LOG_INFO, "%s: Setup mirroring for flow to %s to VRF %d",
00837                  __func__, inet_ntoa(ip_hdr->ip_dst), flow->m_vrf);
00838         }
00839 
00840         // save the flow entry in the session context
00841         // we should always only see a flow one direction
00842 
00843         if (md_ctx->sc_flags & MSVCS_CTX_FLAGS_DIR_REVERSE) {
00844             msvcs_session_set_ext_handle(
00845                     (msvcs_session_t *) md_ctx->sc_session,
00846                     (uint8_t) plugin_id, NULL, flow);
00847         } else {
00848             msvcs_session_set_ext_handle(
00849                     (msvcs_session_t *) md_ctx->sc_session,
00850                     (uint8_t) plugin_id, flow, NULL);
00851         }
00852 
00853         process_packet(jb, flow, md_ctx->sc_sset_id);
00854 
00855         if (jb != md_ctx->sc_pkt) {
00856             // if a new jbuf was returned during any jbuf manipulations, then
00857             // we can't return forward. We hold the old jbuf (which is eaten by 
00858             // jbuf APIs anyway) and we inject the new jbuf
00859             // Note this cannot be done on "first" packets
00860 
00861             // Having only done a pullup this is usually not supposed to happen,
00862             // but just in case...
00863 
00864             msp_reinject_packet((msvcs_session_t *) md_ctx->sc_session, jb);
00865 
00866             return MSVCS_ST_PKT_HOLD;
00867         }
00868 
00869         // Drop by default since we are a monitoring application and
00870         // packets should be copies
00871         return MSVCS_ST_PKT_FORWARD;
00872 
00873     case MSVCS_DATA_EV_PKT_PROC:
00874 
00875         // received for all packets in a session following the first one
00876 
00877         jb = (struct jbuf *) md_ctx->sc_pkt;
00878 
00879         if (pullup_bytes(&jb, sizeof(struct ip))) {
00880             DLOG(LOG_ERR, "%s: Failed to pullup IP header bytes", __func__);
00881             return MSVCS_ST_PKT_FORWARD;
00882         }
00883 
00884         // safe typecast to an IP header
00885         ip_hdr = jbuf_to_d(jb, struct ip *);
00886 
00887         // retrieve the flow entry in the session context
00888 
00889         if (md_ctx->sc_flags & MSVCS_CTX_FLAGS_DIR_REVERSE) {
00890             msvcs_session_get_ext_handle(
00891                     (msvcs_session_t *) md_ctx->sc_session,
00892                     (uint8_t) plugin_id, NULL, (void **) &flow);
00893         } else {
00894             msvcs_session_get_ext_handle(
00895                     (msvcs_session_t *) md_ctx->sc_session,
00896                     (uint8_t) plugin_id, (void **) &flow, NULL);
00897         }
00898 
00899         if (!flow) {
00900             DLOG(LOG_ERR, "%s: Could not retrieve session context", __func__);
00901             
00902             msvcs_session_ignore(md_ctx->sc_session, md_ctx->sc_pid,
00903                     md_ctx->sc_flags);
00904             
00905             return MSVCS_ST_PKT_FORWARD;
00906         }
00907 
00908         process_packet(jb, flow, md_ctx->sc_sset_id);
00909 
00910         if (jb != md_ctx->sc_pkt) {
00911             // if a new jbuf was returned during any jbuf manipulations, then
00912             // we can't return forward. We hold the old jbuf (which is eaten by 
00913             // jbuf APIs anyway) and we inject the new jbuf
00914             // Note this cannot be done on "first" packets
00915 
00916             // Having only done a pullup this is usually not supposed to happen,
00917             // but just in case...
00918 
00919             msp_reinject_packet((msvcs_session_t *) md_ctx->sc_session, jb);
00920 
00921             return MSVCS_ST_PKT_HOLD;
00922         }
00923 
00924         return MSVCS_ST_PKT_FORWARD;
00925 
00926     case MSVCS_DATA_EV_SESSION_OPEN:
00927 
00928         // received after all plug-ins have decided to forward the first packet
00929         // of a session
00930 
00931         break;
00932 
00933     case MSVCS_DATA_EV_SESSION_CLOSE:
00934 
00935         // received when an existing session (OPEN rcv'd previously) times out
00936 
00937         break;
00938 
00939     case MSVCS_DATA_EV_SESSION_DESTROY:
00940 
00941         // All plug-ins have been notified about the CLOSE or some plug-in 
00942         // dropped the first packet of a session, so no open/close was received
00943 
00944         DLOG(LOG_INFO, "%s: Session destroyed", __func__);
00945 
00946         cpu = msvcs_state_get_cpuid();
00947 
00948         // Get and free attached session context containing the flow entry
00949         msvcs_session_get_ext_handle((msvcs_session_t *) md_ctx->sc_session,
00950                 (uint8_t) plugin_id, (void **) &flow, NULL);
00951 
00952         if (flow) {
00953             msp_objcache_free(entry_handle, flow, cpu, md_ctx->sc_sset_id);
00954         }
00955 
00956         // Get and free attached session context containing the flow entry
00957         msvcs_session_get_ext_handle((msvcs_session_t *) md_ctx->sc_session,
00958                 (uint8_t) plugin_id, NULL, (void **) &flow);
00959 
00960         if (flow) {
00961             msp_objcache_free(entry_handle, flow, cpu, md_ctx->sc_sset_id);
00962         }
00963 
00964         break;
00965 
00966     case MSVCS_DATA_EV_REQ_PUB_DATA:
00967 
00968         req = md_ctx->plugin_data;
00969 
00970         DLOG(LOG_INFO, "%s: Requesting public data %d", __func__, req->data_id);
00971 
00972         // we do not expect this event; no one would be requesting data from us
00973         req->err = -1; // we do not have any public data to give
00974 
00975         break;
00976 
00977     default:
00978 
00979         DLOG(LOG_ERR, "%s: Received an unhandled data event %X", __func__, ev);
00980     }
00981 
00982     return MSVCS_ST_OK;
00983 }
00984 
00996 static int monitube_ctrl_hdlr(msvcs_control_context_t * mc_ctx,
00997                               msvcs_control_event_t ev)
00998 {
00999     junos_kcom_gencfg_t * jkg;
01000     msvcs_ha_info_hdr_t * ha_info;
01001     kcom_ifdev_redundancy_info_t * ri;
01002     const char * op, *state;
01003     char tmp1[32], tmp2[32];
01004 
01005     CLOG(LOG_INFO, "%s: Handling control event for plug-in %d from CPU %d.",
01006             __func__, plugin_id, msvcs_state_get_cpuid());
01007 
01008     switch (ev) {
01009     case MSVCS_CONTROL_EV_INIT:
01010 
01011         CLOG(LOG_INFO, "%s: Initialization Event...", __func__);
01012 
01013         // do initialization here
01014 
01015         ctx = mc_ctx->scc_ev_ctxt;
01016         pdb_handle = mc_ctx->policy_db_handle;
01017         pdb_shm_handle = mc_ctx->policy_shm_handle;
01018 
01019         if (!msp_fdb_is_configured()) {
01020             CLOG(LOG_EMERG, "%s: FDB is not configured, but required.",
01021                     __func__);
01022         }
01023 
01024         // init the hardware timestamp infrastructure
01025         if (msp_hw_ts32_init() != MSP_OK) {
01026             CLOG(LOG_EMERG, "%s: Failed to initialize HW timestamp "
01027                 "infrastructure.", __func__);
01028         }
01029         
01030         // Attach to FDB
01031         evInitID(&retry_timer);
01032         if(evSetTimer(*ctx, retry_attach_fdb, NULL, evConsTime(0, 0),
01033                 evConsTime(5, 0), &retry_timer)) {
01034 
01035             CLOG(LOG_EMERG, "%s: Failed to initialize a timer to retry "
01036                 "attaching to FDB", __func__);
01037         }
01038         
01039         // Init the rest when attached to FDB...
01040 
01041         break;
01042 
01043     case MSVCS_CONTROL_EV_CFG_BLOB:
01044 
01045         CLOG(LOG_INFO, "%s: Configuration Event", __func__);
01046 
01047         // we do not expect this event as their is no one sending us blobs
01048         jkg = (junos_kcom_gencfg_t *) mc_ctx->plugin_data;
01049 
01050         if (!jkg) {
01051             CLOG(LOG_ERR, "%s: Malformed control event", __func__);
01052             break;
01053         }
01054 
01055         CLOG(LOG_INFO, "%s: received unexpected GENCFG blob %d with key of "
01056             "size %d and blob of size %d. It was sent to %d MS PIC peers.",
01057                 __func__, jkg->opcode, jkg->key.size, jkg->blob.size,
01058                 jkg->peer_count);
01059 
01060         JUNOS_KCOM_MPSDK_CFG_FREE(jkg);
01061 
01062         break;
01063 
01064     case MSVCS_CONTROL_EV_HA_INFO_BLOB:
01065 
01066         CLOG(LOG_INFO, "%s: HA Info Event", __func__);
01067 
01068         ha_info = (msvcs_ha_info_hdr_t *) mc_ctx->plugin_data;
01069 
01070         if (!ha_info) {
01071             CLOG(LOG_ERR, "%s: Malformed control event", __func__);
01072             break;
01073         }
01074 
01075         if (ha_info->subtype == MSVCS_HA_INFO_REDUNDANCY_INFO) {
01076             CLOG(LOG_INFO, "%s: Received redundancy information of length %d",
01077                     __func__, ha_info->length);
01078 
01079             ri = (kcom_ifdev_redundancy_info_t *) MSVCS_HA_INFO_HDR_DATA(
01080                     ha_info);
01081 
01082             switch (ri->state) {
01083             case KCOM_IFDEV_REDUNDANCY_STATE_WAITING_FOR_PRIMARY:
01084                 state = waiting_state;
01085                 break;
01086             case KCOM_IFDEV_REDUNDANCY_STATE_PRIMARY_ACTIVE:
01087                 state = primary_state;
01088                 break;
01089             case KCOM_IFDEV_REDUNDANCY_STATE_SECONDARY_ACTIVE:
01090                 state = secondary_state;
01091                 break;
01092             case KCOM_IFDEV_REDUNDANCY_STATE_NONE_ACTIVE:
01093                 state = none_state;
01094                 break;
01095             default:
01096                 state = tmp1;
01097                 snprintf(tmp1, sizeof(tmp1), "%d", (int) ri->state);
01098                 break;
01099             }
01100 
01101             switch (ri->cmd) {
01102             case KCOM_IFDEV_REDUNDANCY_CMD_NONE:
01103                 op = no_op;
01104                 break;
01105             case KCOM_IFDEV_REDUNDANCY_CMD_SWITCH:
01106                 op = switch_op;
01107                 break;
01108             case KCOM_IFDEV_REDUNDANCY_CMD_REVERT:
01109                 op = revert_op;
01110                 break;
01111             default:
01112                 op = tmp2;
01113                 snprintf(tmp2, sizeof(tmp2), "%d", (int) ri->cmd);
01114                 break;
01115             }
01116 
01117             CLOG(LOG_INFO, "State: %s, Cmd: %s, "
01118                 "Time since last change Sec: %ld, uSec: %ld, "
01119                 "Primary: %s, Secondary: %s, Active: %s", state, op,
01120                     ri->lastupdate.tv_sec, ri->lastupdate.tv_usec,
01121                     ri->primary_ifdev_name, ri->secondary_ifdev_name,
01122                     ri->active_ifdev_name);
01123 
01124         } else {
01125             CLOG(LOG_ERR, "%s: Received unrecognizable HA info", __func__);
01126         }
01127 
01128         free(ha_info);
01129 
01130         break;
01131 
01132     default:
01133         CLOG(LOG_ERR, "%s: Received an unhandled ctrl event %X", __func__, ev);
01134     }
01135 
01136     return MSP_OK;
01137 }
01138 
01139 
01140 /*** GLOBAL/EXTERNAL Functions ***/
01141 
01142 int monitube2_entry(void);
01143 
01144 
01152 int monitube2_entry(void)
01153 {
01154     msvcs_plugin_params_t params;
01155 
01156     CLOG(LOG_INFO, "%s: Registering plug-in in entry function.", __func__);
01157 
01158     // Register plug-in itself
01159 
01160     bzero(&params, sizeof(msvcs_plugin_params_t));
01161 
01162     strlcpy(params.spp_name, PLUGIN_NAME, sizeof(params.spp_name));
01163     params.spp_plugin_app_id = PLUGIN_ID;
01164     params.spp_class = MSVCS_PLUGIN_CLASS_EXTERNAL;
01165     params.spp_data_evh = monitube_data_hdlr;
01166     params.spp_control_evh = monitube_ctrl_hdlr;
01167 
01168     plugin_id = msvcs_plugin_register(&params);
01169 
01170     if (plugin_id < 0) {
01171         CLOG(LOG_ALERT, "%s: %s cannot be registered as a valid plug-in",
01172                 __func__, PLUGIN_NAME);
01173     } else {
01174         CLOG(LOG_INFO, "%s: %s was successfully registered and assigned id"
01175             "%d.", __func__, PLUGIN_NAME, plugin_id);
01176     }
01177 
01178     return plugin_id;
01179 }
01180 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:09 2010 for SDK Your Net Corporation Monitube2 IPTV Monitoring Example: monitube2-plugin 1.0 by Doxygen 1.5.1