monitube-mgmt_conn.c

Go to the documentation of this file.
00001 /*
00002  * $Id: monitube-mgmt_conn.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00021 #include <sync/common.h>
00022 #include <jnx/pconn.h>
00023 #include <sync/monitube_ipc.h>
00024 #include "monitube-mgmt_config.h"
00025 #include "monitube-mgmt_conn.h"
00026 #include "monitube-mgmt_logging.h"
00027 
00028 #include MONITUBE_OUT_H
00029 
00030 /*** Constants ***/
00031 
00032 #define MONITUBE_MGMT_SERVER_MAX_CONN 20  
00033 
00034 #define RETRY_INTERVAL 5 
00035 
00036 /*** Data Structures ***/
00037 
00038 static evContext       m_ctx;         
00039 static pconn_server_t  * mgmt_server; 
00040 
00044 typedef struct session_s {
00045     pconn_session_t           * session;  
00046     TAILQ_ENTRY(session_s)    entries;    
00047 } session_t;
00048 
00052 typedef TAILQ_HEAD(session_list_s, session_s) session_list_t;
00053 
00057 static session_list_t session_list = TAILQ_HEAD_INITIALIZER(session_list);
00058 
00063 typedef struct notification_msg_s {
00064     msg_type_e   action;        
00065     uint16_t     message_len;   
00066     void *       message;       
00067     TAILQ_ENTRY(notification_msg_s) entries;    
00068 } notification_msg_t;
00069 
00070 
00074 static TAILQ_HEAD(notification_buffer_s, notification_msg_s) notification_msgs =
00075             TAILQ_HEAD_INITIALIZER(notification_msgs);
00076 
00077 
00078 /*** STATIC/INTERNAL Functions ***/
00079 
00083 static void
00084 empty_notifications_queue(void)
00085 {
00086     notification_msg_t * msg;
00087 
00088     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00089         TAILQ_REMOVE(&notification_msgs, msg, entries);
00090         if(msg->message_len != 0) {
00091             free(msg->message);
00092         }
00093         free(msg);
00094     }
00095 }
00096 
00109 static void
00110 receive_connection(pconn_session_t * session,
00111                    pconn_event_t event,
00112                    void * cookie __unused)
00113 {
00114     int rc;
00115     monitor_t * mon;
00116     mirror_t * mir;
00117     address_t * address;
00118     struct in_addr peer_addr;
00119     pconn_peer_info_t info;
00120     notification_msg_t * msg;
00121     session_t * tmp_session;
00122 
00123     rc = pconn_session_get_peer_info(session, &info);
00124 
00125     if(rc != PCONN_OK) {
00126         LOG(TRACE_LOG_ERR, "%s: Cannot retrieve peer info "
00127             "for session. Error: %d", __func__, rc);
00128 
00129         pconn_session_close(session);
00130         return;
00131     }
00132     
00133     INSIST_ERR(info.ppi_peer_type == PCONN_PEER_TYPE_PIC);
00134     
00135     rc = pconn_get_peer_address(&info, &peer_addr.s_addr);
00136 
00137     if(rc != PCONN_OK) {
00138         LOG(TRACE_LOG_ERR, "%s: Cannot retrieve peer address "
00139             "for session. Error: %d", __func__, rc);
00140 
00141         pconn_session_close(session);
00142         return;
00143     }
00144     
00145     switch (event) {
00146 
00147     case PCONN_EVENT_ESTABLISHED:
00148 
00149         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00150             "%s: Accepted connection from ms-%d/%d/0 (%s)",
00151             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00152             inet_ntoa(peer_addr));
00153 
00154         tmp_session = malloc(sizeof(session_t));
00155         INSIST(tmp_session != NULL);
00156         
00157         tmp_session->session = session;
00158         TAILQ_INSERT_TAIL(&session_list, tmp_session, entries);
00159 
00160         // Send all configuration to the service (PIC) component
00161         // go through all monitors and mirrors
00162         // Assume it has nothing
00163         
00164         empty_notifications_queue();
00165 
00166         mon = next_monitor(NULL);
00167         while(mon != NULL) {
00168 
00169             notify_monitor_update(mon->name, mon->rate);
00170 
00171             // addresses of this monitor
00172             address = next_address(mon, NULL);
00173             while(address != NULL) {
00174 
00175                 notify_address_update(
00176                         mon->name, address->address, address->mask);
00177 
00178                 address = next_address(mon, address);
00179             }
00180 
00181             mon = next_monitor(mon);
00182         }
00183 
00184         mir = next_mirror(NULL);
00185         while(mir != NULL) {
00186 
00187             notify_mirror_update(mir->original, mir->redirect);
00188 
00189             mir = next_mirror(mir);
00190         }
00191         
00192         notify_replication_interval(get_replication_interval());
00193         
00194 
00195         // send it all out to the component that just connected
00196         while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00197 
00198             if(pconn_server_send(session, msg->action, msg->message,
00199                     msg->message_len) != PCONN_OK) {
00200 
00201                 LOG(TRACE_LOG_ERR, "%s: Failed to send configuration message"
00202                     " (%d) to the data component.", __func__, msg->action);
00203 
00204                 empty_notifications_queue();
00205                 return;
00206             }
00207 
00208             TAILQ_REMOVE(&notification_msgs, msg, entries);
00209             if(msg->message_len != 0) {
00210                 free(msg->message);
00211             }
00212             free(msg);
00213 
00214             junos_trace(MONITUBE_TRACEFLAG_CONNECTION, "%s: Dequeued message "
00215                     "sent to the data component", __func__);
00216         }
00217 
00218         return;
00219 
00220     case PCONN_EVENT_SHUTDOWN:
00221 
00222         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00223             "%s: Disconnected from ms-%d/%d/0 (%s)",
00224             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00225             inet_ntoa(peer_addr));
00226 
00227         break;
00228 
00229     case PCONN_EVENT_FAILED:
00230 
00231         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00232             "%s: Connection failure from ms-%d/%d/0 (%s)",
00233             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00234             inet_ntoa(peer_addr));
00235 
00236 
00237         break;
00238 
00239     default:
00240 
00241         LOG(TRACE_LOG_ERR, "%s: Received an unknown event", __func__);
00242 
00243         return;
00244     }
00245     
00246     tmp_session = TAILQ_FIRST(&session_list);
00247     
00248     while(tmp_session != NULL) {
00249         
00250         if(tmp_session->session == session) {
00251             TAILQ_REMOVE(&session_list, tmp_session, entries);
00252             free(tmp_session);
00253             return;
00254         }
00255         
00256         tmp_session = TAILQ_NEXT(tmp_session, entries);
00257     }
00258     
00259     LOG(TRACE_LOG_ERR, "%s: Cannot find an existing "
00260             "peer session to shutdown", __func__);
00261 }
00262 
00263 
00280 static status_t
00281 receive_message(pconn_session_t * session,
00282                 ipc_msg_t * msg,
00283                 void * cookie __unused)
00284 {
00285     flow_stat_t * data = NULL;
00286     double df;
00287     void * tmp;
00288     pconn_peer_info_t  info;
00289 
00290     pconn_session_get_peer_info(session, &info);
00291     
00292     switch(msg->subtype) {
00293 
00294     case MSG_FLOW_STAT_UPDATE:
00295 
00296         data = (flow_stat_t *)msg->data;
00297 
00298         // check message length
00299         INSIST_ERR(msg->length ==
00300             sizeof(flow_stat_t) + ntohs(data->mon_name_len));
00301 
00302         // check name length
00303         INSIST_ERR(strlen(data->mon_name) + 1 == ntohs(data->mon_name_len));
00304 
00305         // carefully convert to the double it really is
00306         data->mdi_df = ntohq(data->mdi_df);
00307         tmp = &data->mdi_df;
00308         df = *(double *)tmp;
00309 
00310         set_flow_stat(info.ppi_fpc_slot, info.ppi_pic_slot, data->mon_name,
00311             data->flow_addr, ntohs(data->flow_port), df, ntohl(data->mdi_mlr));
00312 
00313         break;
00314 
00315     default:
00316 
00317         LOG(TRACE_LOG_ERR,
00318             "%s: Received an unknown message type (%d) from the "
00319             "data component", __func__, msg->subtype);
00320         return EFAIL;
00321     }
00322 
00323     return SUCCESS;
00324 }
00325 
00326 
00327 /*** GLOBAL/EXTERNAL Functions ***/
00328 
00329 
00339 status_t
00340 init_server(evContext ctx)
00341 {
00342     pconn_server_params_t params;
00343 
00344     m_ctx = ctx;
00345 
00346     bzero(&params, sizeof(pconn_server_params_t));
00347 
00348     // setup the server args
00349     params.pconn_port            = MONITUBE_PORT_NUM;
00350     params.pconn_max_connections = MONITUBE_MGMT_SERVER_MAX_CONN;
00351     params.pconn_event_handler   = receive_connection;
00352 
00353     // bind
00354     mgmt_server = pconn_server_create(&params, m_ctx, receive_message, NULL);
00355 
00356     if(mgmt_server == NULL) {
00357         LOG(TRACE_LOG_ERR, "%s: Failed to initialize the pconn server"
00358             " on port %d.", __func__, MONITUBE_PORT_NUM);
00359         return EFAIL;
00360     }
00361 
00362     LOG(TRACE_LOG_INFO, "%s: Successfully initialized "
00363             "the pconn server on port %d.", __func__, MONITUBE_PORT_NUM);
00364 
00365     return SUCCESS;
00366 }
00367 
00368 
00372 void
00373 close_connections(void)
00374 {
00375     session_t * session;
00376 
00377     while((session = TAILQ_FIRST(&session_list)) != NULL) {
00378         TAILQ_REMOVE(&session_list, session, entries);
00379         pconn_session_close(session->session);
00380         free(session);
00381     }
00382 
00383     if(mgmt_server != NULL) {
00384         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00385             "%s: Shutting down the server on port %d.",
00386             __func__, MONITUBE_PORT_NUM);
00387         
00388         pconn_server_shutdown(mgmt_server);
00389         mgmt_server = NULL;
00390     }
00391 
00392     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00393         "%s: Deleting the queue of notification messages.", __func__);
00394 
00395     empty_notifications_queue();
00396 }
00397 
00398 
00405 void
00406 mspic_offline(const char * name)
00407 {
00408     char current_int_prefix[64];
00409     session_t * session;
00410     pconn_peer_info_t info;
00411 
00412     // see if its one we are connected to
00413 
00414     session = TAILQ_FIRST(&session_list);
00415     
00416     while(session != NULL) {
00417 
00418         pconn_session_get_peer_info(session->session, &info);
00419         
00420         snprintf(current_int_prefix, sizeof(current_int_prefix),
00421             "ms-%d/%d/0", info.ppi_fpc_slot, info.ppi_pic_slot);
00422 
00423         if(strstr(name, current_int_prefix) != NULL) {
00424 
00425             junos_trace(MONITUBE_TRACEFLAG_CONNECTION, "%s: Connection from %s "
00426                 "was shutdown (forced).", __func__, current_int_prefix);
00427 
00428             pconn_session_close(session->session);
00429             TAILQ_REMOVE(&session_list, session, entries);
00430             free(session);
00431             return;
00432         }
00433         session = TAILQ_NEXT(session, entries);
00434     }
00435 }
00436 
00437 
00447 void
00448 notify_monitor_update(const char * mon_name, uint32_t rate)
00449 {
00450     notification_msg_t * msg;
00451     update_mon_info_t * data;
00452 
00453     if(TAILQ_FIRST(&session_list) == NULL)
00454         return;
00455 
00456     msg = calloc(1, sizeof(notification_msg_t));
00457     INSIST(msg != NULL);
00458 
00459     msg->action = MSG_CONF_MON;
00460 
00461     msg->message_len = sizeof(update_mon_info_t) + strlen(mon_name) + 1;
00462 
00463     msg->message = data = calloc(1, msg->message_len);
00464     INSIST(data != NULL);
00465     data->rate = htonl(rate);
00466     data->mon_name_len = htons(strlen(mon_name) + 1);
00467     strcpy(data->mon_name, mon_name);
00468 
00469     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00470 
00471     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00472         "%s: Enqueueing message for the data component", __func__);
00473 }
00474 
00475 
00488 void
00489 notify_address_update(const char * mon_name, in_addr_t address, in_addr_t mask)
00490 {
00491     notification_msg_t * msg;
00492     maddr_info_t * data;
00493 
00494     if(TAILQ_FIRST(&session_list) == NULL)
00495         return;
00496 
00497     msg = calloc(1, sizeof(notification_msg_t));
00498     INSIST(msg != NULL);
00499 
00500     msg->action = MSG_CONF_MON_ADDR;
00501 
00502     msg->message_len = sizeof(maddr_info_t) + strlen(mon_name) + 1;
00503 
00504     msg->message = data = calloc(1, msg->message_len);
00505     INSIST(data != NULL);
00506     data->addr = address;
00507     data->mask = mask;
00508     data->mon_name_len = htons(strlen(mon_name) + 1);
00509     strcpy(data->mon_name, mon_name);
00510 
00511     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00512 
00513     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00514         "%s: Enqueueing message for the data component", __func__);
00515 }
00516 
00517 
00527 void
00528 notify_mirror_update(in_addr_t mirror_from, in_addr_t mirror_to)
00529 {
00530     notification_msg_t * msg;
00531     update_mir_info_t * data;
00532 
00533     if(TAILQ_FIRST(&session_list) == NULL)
00534         return;
00535 
00536     msg = calloc(1, sizeof(notification_msg_t));
00537     INSIST(msg != NULL);
00538 
00539     msg->action = MSG_CONF_MIR;
00540 
00541     msg->message_len = sizeof(update_mir_info_t);
00542 
00543     msg->message = data = calloc(1, msg->message_len);
00544     INSIST(data != NULL);
00545     data->mirror_from = mirror_from;
00546     data->mirror_to = mirror_to;
00547 
00548     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00549 
00550     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00551         "%s: Enqueueing message for the data component", __func__);
00552 }
00553 
00554 
00561 void
00562 notify_monitor_delete(const char * mon_name)
00563 {
00564     notification_msg_t * msg;
00565     del_mon_info_t * data;
00566 
00567     if(TAILQ_FIRST(&session_list) == NULL)
00568         return;
00569 
00570     msg = calloc(1, sizeof(notification_msg_t));
00571     INSIST(msg != NULL);
00572 
00573     msg->action = MSG_DELETE_MON;
00574 
00575     msg->message_len = sizeof(del_mon_info_t) + strlen(mon_name) + 1;
00576 
00577     msg->message = data = calloc(1, msg->message_len);
00578     INSIST(data != NULL);
00579     data->mon_name_len = htons(strlen(mon_name) + 1);
00580     strcpy(data->mon_name, mon_name);
00581 
00582     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00583 
00584     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00585         "%s: Enqueueing message for the data component", __func__);
00586 }
00587 
00588 
00601 void
00602 notify_address_delete(const char * mon_name, in_addr_t address, in_addr_t mask)
00603 {
00604     notification_msg_t * msg;
00605     maddr_info_t * data;
00606 
00607     if(TAILQ_FIRST(&session_list) == NULL)
00608         return;
00609 
00610     msg = calloc(1, sizeof(notification_msg_t));
00611     INSIST(msg != NULL);
00612 
00613     msg->action = MSG_DELETE_MON_ADDR;
00614 
00615     msg->message_len = sizeof(maddr_info_t) + strlen(mon_name) + 1;
00616 
00617     msg->message = data = calloc(1, msg->message_len);
00618     INSIST(data != NULL);
00619     data->addr = address;
00620     data->mask = mask;
00621     data->mon_name_len = htons(strlen(mon_name) + 1);
00622     strcpy(data->mon_name, mon_name);
00623 
00624     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00625 
00626     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00627         "%s: Enqueueing message for the data component", __func__);
00628 }
00629 
00630 
00637 void
00638 notify_mirror_delete(in_addr_t mirror_from)
00639 {
00640     notification_msg_t * msg;
00641     del_mir_info_t * data;
00642 
00643     if(TAILQ_FIRST(&session_list) == NULL)
00644         return;
00645 
00646     msg = calloc(1, sizeof(notification_msg_t));
00647     INSIST(msg != NULL);
00648 
00649     msg->action = MSG_DELETE_MIR;
00650 
00651     msg->message_len = sizeof(del_mir_info_t);
00652 
00653     msg->message = data = calloc(1, msg->message_len);
00654     INSIST(data != NULL);
00655     data->mirror_from = mirror_from;
00656 
00657     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00658 
00659     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00660         "%s: Enqueueing message for the data component", __func__);
00661 }
00662 
00663 
00667 void
00668 notify_delete_all_monitors(void)
00669 {
00670     notification_msg_t * msg;
00671 
00672     if(TAILQ_FIRST(&session_list) == NULL)
00673         return;
00674 
00675     msg = calloc(1, sizeof(notification_msg_t));
00676     INSIST(msg != NULL);
00677 
00678     msg->action = MSG_DELETE_ALL_MON;
00679 
00680     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00681 
00682     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00683         "%s: Enqueueing message for the data component", __func__);
00684 }
00685 
00686 
00690 void
00691 notify_delete_all_mirrors(void)
00692 {
00693     notification_msg_t * msg;
00694 
00695     if(TAILQ_FIRST(&session_list) == NULL)
00696         return;
00697 
00698     msg = calloc(1, sizeof(notification_msg_t));
00699     INSIST(msg != NULL);
00700 
00701     msg->action = MSG_DELETE_ALL_MIR;
00702 
00703     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00704 
00705     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00706         "%s: Enqueueing message for the data component", __func__);
00707 }
00708 
00709 
00716 void
00717 notify_replication_interval(uint8_t r_int)
00718 {
00719     notification_msg_t * msg;
00720     replication_info_t * data;
00721 
00722     if(TAILQ_FIRST(&session_list) == NULL)
00723         return;
00724 
00725     msg = calloc(1, sizeof(notification_msg_t));
00726     INSIST(msg != NULL);
00727 
00728     msg->action = MSG_REP_INFO;
00729 
00730     msg->message_len = sizeof(replication_info_t);
00731 
00732     msg->message = data = calloc(1, msg->message_len);
00733     INSIST(data != NULL);
00734     data->interval = r_int;
00735 
00736     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00737 
00738     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00739         "%s: Enqueueing message for the data component", __func__);
00740 }
00741 
00742 
00747 void
00748 process_notifications(void)
00749 {
00750     notification_msg_t * msg;
00751     session_t * session;
00752     pconn_peer_info_t info;
00753 
00754     if(TAILQ_FIRST(&session_list) == NULL) { // no connections
00755         // don't bother doing anything yet
00756         empty_notifications_queue();
00757         return;
00758     }
00759 
00760     junos_trace(MONITUBE_TRACEFLAG_CONNECTION, "%s", __func__);
00761 
00762     // for each msg
00763     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00764 
00765         // send to each session (PIC)
00766         session = TAILQ_FIRST(&session_list);
00767         while(session != NULL) {
00768             
00769             if(pconn_server_send(session->session,
00770                     msg->action, msg->message, msg->message_len) != PCONN_OK) {
00771 
00772                 pconn_session_get_peer_info(session->session, &info);
00773                 
00774                 LOG(LOG_ERR, "%s: Failed to send configuration message (%d) to "
00775                     "ms-%d/%d/0.", __func__, msg->action, info.ppi_fpc_slot,
00776                     info.ppi_pic_slot);
00777             }
00778             
00779             session = TAILQ_NEXT(session, entries);          
00780         }
00781 
00782         TAILQ_REMOVE(&notification_msgs, msg, entries);
00783         if(msg->message_len != 0) {
00784             free(msg->message);
00785         }
00786         free(msg);
00787 
00788         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00789             "%s: Dequeued message sent to the data component(s)", __func__);
00790     }
00791 }
00792 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:03 2010 for SDK Your Net Corporation Monitube IPTV Monitoring Example: monitube-mgmt 1.0 by Doxygen 1.5.1