monitube-data_conn.c

Go to the documentation of this file.
00001 /*
00002  * $Id: monitube-data_conn.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00022 #include "monitube-data_main.h"
00023 #include <jnx/pconn.h>
00024 #include "monitube-data_config.h"
00025 #include "monitube-data_conn.h"
00026 
00027 
00028 /*** Constants ***/
00029 
00033 #define RETRY_CONNECT 60
00034 #define CONNECT_RETRIES    1  
00035 
00036 
00037 /*** Data Structures ***/
00038 
00039 extern volatile boolean is_master; 
00040 
00045 typedef struct notification_msg_s {
00046     msg_type_e     action;   
00047     flow_stat_t *  message;  
00048     TAILQ_ENTRY(notification_msg_s) entries;    
00049 } notification_msg_t;
00050 
00051 
00055 static TAILQ_HEAD(notification_buffer_s, notification_msg_s) notification_msgs =
00056             TAILQ_HEAD_INITIALIZER(notification_msgs);
00057 
00058 static msp_spinlock_t msgs_lock;     
00059 
00060 static pconn_client_t * mgmt_client; 
00061 
00062 static evTimerID  mgmt_timer_id;     
00063 
00064 static evContext  main_ctx;          
00065 
00066 /*** STATIC/INTERNAL Functions ***/
00067 
00068 
00072 static void connect_mgmt(evContext ctx, void * uap,
00073     struct timespec due, struct timespec inter);
00074 
00075 
00092 static void
00093 process_notifications_messages(evContext ctx UNUSED, void * uap  UNUSED,
00094             struct timespec due UNUSED, struct timespec inter UNUSED)
00095 {
00096     notification_msg_t * msg;
00097     int rc = 0;
00098 
00099     if(mgmt_client == NULL) { // don't bother doing anything yet
00100         return;
00101     }
00102 
00103     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00104 
00105     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00106 
00107         TAILQ_REMOVE(&notification_msgs, msg, entries);
00108 
00109         INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00110 
00111         rc = pconn_client_send(mgmt_client, msg->action, msg->message,
00112                 sizeof(flow_stat_t) + msg->message->mon_name_len);
00113 
00114         if(rc != PCONN_OK) {
00115             // put message back in and process soon
00116 
00117             LOG(LOG_ERR, "%s: Failed to send message to mgmt component."
00118                     " Error: %d", __func__, rc);
00119 
00120             INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00121 
00122             TAILQ_INSERT_HEAD(&notification_msgs, msg, entries);
00123 
00124             INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00125 
00126             if(evSetTimer(main_ctx, process_notifications_messages, NULL,
00127                     evAddTime(evNowTime(), evConsTime(5, 0)),
00128                     evConsTime(0, 0), NULL)) {
00129 
00130                 LOG(LOG_EMERG, "%s: evSetTimer() failed! Will not be "
00131                         "able to process buffered notifications", __func__);
00132             }
00133         } else {
00134             free(msg->message);
00135             free(msg);
00136         }
00137 
00138         INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00139     }
00140 
00141     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00142 }
00143 
00144 
00149 static void
00150 process_notifications(void)
00151 {
00152     if(evSetTimer(main_ctx, process_notifications_messages, NULL,
00153             evConsTime(0,0), evConsTime(0, 0), NULL)) {
00154 
00155         LOG(LOG_EMERG, "%s: evSetTimer() failed! Will not be "
00156                 "able to process buffered notifications", __func__);
00157     }
00158 }
00159 
00172 static void
00173 mgmt_client_connection(pconn_client_t * client,
00174                       pconn_event_t event,
00175                       void * cookie UNUSED)
00176 {
00177     INSIST_ERR(client == mgmt_client);
00178 
00179     switch (event) {
00180 
00181     case PCONN_EVENT_ESTABLISHED:
00182 
00183         // clear the retry timer
00184         if(evTestID(mgmt_timer_id)) {
00185             evClearTimer(main_ctx, mgmt_timer_id);
00186             evInitID(&mgmt_timer_id);
00187         }
00188 
00189         clear_config(); // we expect to get a fresh copy of everything
00190         
00191         // For now, toggle this here until MASTERSHIP messages from the 
00192         // mgmt component are supported
00193         is_master = TRUE;
00194         
00195         LOG(LOG_INFO, "%s: Connected to the monitube-mgmt component "
00196                 "on the RE", __func__);
00197 
00198         break;
00199 
00200     case PCONN_EVENT_SHUTDOWN:
00201 
00202         if(mgmt_client != NULL) {
00203             clear_config();
00204             LOG(LOG_INFO, "%s: Disconnected from the monitube-mgmt component"
00205                     " on the RE", __func__);
00206         }
00207 
00208         mgmt_client = NULL; // connection will be closed
00209         // For now, toggle this here until MASTERSHIP messages from the 
00210         // mgmt component are supported
00211         is_master = FALSE;
00212 
00213         // Reconnect to it if timer not going
00214         if(!evTestID(mgmt_timer_id) &&
00215            evSetTimer(main_ctx, connect_mgmt, NULL, evConsTime(0,0),
00216                evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00217 
00218             LOG(LOG_EMERG, "%s: Failed to initialize a re-connect timer to "
00219                 "reconnect to the mgmt component", __func__);
00220         }
00221 
00222         break;
00223 
00224     case PCONN_EVENT_FAILED:
00225 
00226         LOG(LOG_ERR, "%s: Received a PCONN_EVENT_FAILED event", __func__);
00227 
00228         if(mgmt_client != NULL) {
00229             clear_config();
00230             LOG(LOG_INFO, "%s: Disconnected from the monitube-mgmt component"
00231                     " on the RE", __func__);
00232         }
00233 
00234         mgmt_client = NULL; // connection will be closed
00235         // For now, toggle this here until MASTERSHIP messages from the 
00236         // mgmt component are supported
00237         is_master = FALSE;
00238 
00239         // Reconnect to it if timer not going
00240         if(!evTestID(mgmt_timer_id) &&
00241            evSetTimer(main_ctx, connect_mgmt, NULL, evConsTime(0,0),
00242                evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00243 
00244             LOG(LOG_EMERG, "%s: Failed to initialize a re-connect timer to "
00245                 "reconnect to the mgmt component", __func__);
00246         }
00247 
00248         break;
00249 
00250     default:
00251         LOG(LOG_ERR, "%s: Received an unknown pconn event", __func__);
00252     }
00253 }
00254 
00255 
00271 static status_t
00272 mgmt_client_message(pconn_client_t * session,
00273                     ipc_msg_t * msg,
00274                     void * cookie UNUSED)
00275 {
00276     struct in_addr ia;
00277 
00278     INSIST_ERR(session == mgmt_client);
00279     INSIST_ERR(msg != NULL);
00280 
00281     switch(msg->subtype) { // a msg_type_e
00282 
00283     case MSG_CONF_MASTER:
00284 
00285         INSIST_ERR(msg->length == 0);
00286 
00287         LOG(LOG_INFO, "%s: Received a configuration update: Becoming master",
00288                 __func__);
00289 
00290         set_mastership(TRUE, 0);
00291 
00292         break;
00293 
00294     case MSG_CONF_SLAVE:
00295     {
00296         slave_info_t * data = (slave_info_t *)msg->data;
00297 
00298         INSIST_ERR(msg->length == sizeof(slave_info_t));
00299 
00300         ia.s_addr = data->master_address;
00301         LOG(LOG_INFO, "%s: Received a configuration update: Becoming slave "
00302                 "to %s", __func__, inet_ntoa(ia));
00303 
00304         set_mastership(FALSE, data->master_address);
00305 
00306         break;
00307     }
00308     case MSG_REP_INFO:
00309     {
00310         replication_info_t * data = (replication_info_t *)msg->data;
00311 
00312         INSIST_ERR(msg->length == sizeof(replication_info_t));
00313 
00314         LOG(LOG_INFO, "%s: Received a configuration update: Set replication "
00315                 "interval: %d", __func__, data->interval);
00316 
00317         set_replication_interval(data->interval);
00318 
00319         break;
00320     }
00321     case MSG_DELETE_ALL_MON:
00322 
00323         INSIST_ERR(msg->length == 0);
00324 
00325         LOG(LOG_INFO, "%s: Received a configuration update: "
00326                 "clear monitors configuration", __func__);
00327 
00328         clear_monitors_configuration();
00329 
00330         break;
00331 
00332     case MSG_DELETE_ALL_MIR:
00333 
00334         INSIST_ERR(msg->length == 0);
00335 
00336         LOG(LOG_INFO, "%s: Received a configuration update: "
00337                 "clear mirrors configuration", __func__);
00338 
00339         clear_mirrors_configuration();
00340 
00341         break;
00342 
00343     case MSG_DELETE_MON:
00344     {
00345         del_mon_info_t * data = (del_mon_info_t *)msg->data;
00346 
00347         INSIST_ERR(msg->length == sizeof(del_mon_info_t) +
00348                 ntohs(data->mon_name_len));
00349 
00350         LOG(LOG_INFO, "%s: Received a configuration update: delete monitor",
00351                 __func__);
00352 
00353         delete_monitor(data->mon_name);
00354 
00355         break;
00356     }
00357     case MSG_DELETE_MIR:
00358     {
00359         del_mir_info_t * data = (del_mir_info_t *)msg->data;
00360         INSIST_ERR(msg->length == sizeof(del_mir_info_t));
00361 
00362         LOG(LOG_INFO, "%s: Received a configuration update: delete mirror",
00363                 __func__);
00364 
00365         delete_mirror(data->mirror_from);
00366 
00367         break;
00368     }
00369     case MSG_DELETE_MON_ADDR:
00370     {
00371         maddr_info_t * data = (maddr_info_t *)msg->data;
00372         INSIST_ERR(msg->length ==
00373             sizeof(maddr_info_t) + ntohs(data->mon_name_len));
00374 
00375         LOG(LOG_INFO, "%s: Received a configuration update: "
00376                 "delete monitored address (prefix)", __func__);
00377 
00378         delete_address(data->mon_name, data->addr, data->mask);
00379 
00380         break;
00381     }
00382     case MSG_CONF_MON:
00383     {
00384         update_mon_info_t * data = (update_mon_info_t *)msg->data;
00385         INSIST_ERR(msg->length ==
00386             sizeof(update_mon_info_t) + ntohs(data->mon_name_len));
00387 
00388         LOG(LOG_INFO, "%s: Received a configuration update: update "
00389             "monitor: %s/%d", __func__, data->mon_name, ntohl(data->rate));
00390 
00391         update_monitor(data->mon_name, ntohl(data->rate));
00392 
00393         break;
00394     }
00395     case MSG_CONF_MIR:
00396     {
00397         update_mir_info_t * data = (update_mir_info_t *)msg->data;
00398         INSIST_ERR(msg->length == sizeof(update_mir_info_t));
00399 
00400         LOG(LOG_INFO, "%s: Received a configuration update: update mirror",
00401             __func__);
00402 
00403         update_mirror(data->mirror_from, data->mirror_to);
00404 
00405         break;
00406     }
00407     case MSG_CONF_MON_ADDR:
00408     {
00409         maddr_info_t * data = (maddr_info_t *)msg->data;
00410         INSIST_ERR(msg->length ==
00411             sizeof(maddr_info_t) + ntohs(data->mon_name_len));
00412 
00413         LOG(LOG_INFO, "%s: Received a configuration update: "
00414                 "add monitored address/prefix", __func__);
00415         ia.s_addr = data->addr;
00416         LOG(LOG_INFO, "%s: (%s with address: %s)", __func__,
00417                 data->mon_name, inet_ntoa(ia));
00418         ia.s_addr = data->mask;
00419         LOG(LOG_INFO, "%s: (%s with mask: %s)", __func__,
00420                 data->mon_name, inet_ntoa(ia));
00421 
00422         add_address(data->mon_name, data->addr, data->mask);
00423 
00424         break;
00425     }
00426     default:
00427         LOG(LOG_ERR, "%s: Received an unknown message type (%d) from the "
00428             "mgmt component", __func__, msg->subtype);
00429         return EFAIL;
00430     }
00431 
00432     return SUCCESS;
00433 }
00434 
00435 
00451 static void
00452 connect_mgmt(evContext ctx UNUSED,
00453             void * uap  UNUSED,
00454             struct timespec due UNUSED,
00455             struct timespec inter UNUSED)
00456 {
00457     pconn_client_params_t params;
00458 
00459     bzero(&params, sizeof(pconn_client_params_t));
00460 
00461     // setup the client args
00462     params.pconn_peer_info.ppi_peer_type = PCONN_PEER_TYPE_RE;
00463     params.pconn_port                    = MONITUBE_PORT_NUM;
00464     params.pconn_num_retries             = CONNECT_RETRIES;
00465     params.pconn_event_handler           = mgmt_client_connection;
00466 
00467     if(mgmt_client) {
00468         // Then this is the second time in a row this is called even though it
00469         // didn't fail. We haven't received an event like ESTABLISHED yet.
00470         pconn_client_close(mgmt_client);
00471     }
00472 
00473     // connect
00474     mgmt_client = pconn_client_connect_async(
00475                     &params, main_ctx, mgmt_client_message, NULL);
00476 
00477     if(mgmt_client == NULL) {
00478         LOG(LOG_ERR, "%s: Failed to initialize the pconn client connection "
00479             "to the mgmt component", __func__);
00480     }
00481 
00482     LOG(LOG_INFO, "%s: Trying to connect to the mgmt component", __func__);
00483 }
00484 
00485 
00486 /*** GLOBAL/EXTERNAL Functions ***/
00487 
00488 
00498 status_t
00499 init_connections(evContext ctx)
00500 {
00501     mgmt_client = NULL;
00502 
00503     main_ctx = ctx;
00504 
00505     evInitID(&mgmt_timer_id);
00506     msp_spinlock_init(&msgs_lock);
00507 
00508     // Connect to the MGMT component
00509     if(evSetTimer(ctx, connect_mgmt, NULL, evNowTime(),
00510         evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00511 
00512         LOG(LOG_EMERG, "%s: Failed to initialize a connect timer to connect "
00513             "to the MGMT component", __func__);
00514         return EFAIL;
00515     }
00516 
00517     return SUCCESS;
00518 }
00519 
00520 
00524 void
00525 close_connections(void)
00526 {
00527     notification_msg_t * msg;
00528 
00529     if(mgmt_client) {
00530         pconn_client_close(mgmt_client);
00531         mgmt_client = NULL;
00532     }
00533 
00534     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00535 
00536     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00537         TAILQ_REMOVE(&notification_msgs, msg, entries);
00538         free(msg->message);
00539         free(msg);
00540     }
00541 
00542     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00543 }
00544 
00545 
00564 void
00565 notify_stat_update(in_addr_t flow_addr,
00566                    uint16_t flow_dport,
00567                    double mdi_df,
00568                    uint32_t mdi_mlr,
00569                    char * monitor_name)
00570 {
00571     notification_msg_t * msg;
00572     uint16_t len;
00573     void * doub;
00574     uint64_t out;
00575 
00576     if(mgmt_client == NULL) { // don't bother doing anything yet
00577         return;
00578     }
00579 
00580     msg = calloc(1, sizeof(notification_msg_t));
00581     INSIST_ERR(msg != NULL);
00582 
00583     msg->action = MSG_FLOW_STAT_UPDATE;
00584 
00585     len = strlen(monitor_name);
00586     msg->message = calloc(1, sizeof(flow_stat_t) + len + 1);
00587     msg->message->flow_addr = flow_addr;
00588     msg->message->flow_port = flow_dport;
00589     doub = &mdi_df; // play around double type in case of a truncating typecast
00590     out = *(uint64_t *)doub;
00591     msg->message->mdi_df = htonq(out);
00592     msg->message->mdi_mlr = mdi_mlr;
00593     msg->message->mon_name_len = len + 1;
00594     strcpy(msg->message->mon_name, monitor_name);
00595 
00596     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00597 
00598     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00599 
00600     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00601 
00602     process_notifications();
00603 }

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:02 2010 for SDK Your Net Corporation Monitube IPTV Monitoring Example: monitube-data 1.0 by Doxygen 1.5.1