monitube2-mgmt_conn.c

Go to the documentation of this file.
00001 /*
00002  * $Id: monitube2-mgmt_conn.c 347265 2009-11-19 13:55:39Z kdickman $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2009, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
#include <sync/common.h>
#include <limits.h>
#include <string.h>
#include <jnx/pconn.h>
#include <sync/monitube2_ipc.h>
#include "monitube2-mgmt_config.h"
#include "monitube2-mgmt_conn.h"
#include "monitube2-mgmt_logging.h"

#include MONITUBE2_OUT_H
00030 
00031 /*** Constants ***/
00032 
00033 #define MONITUBE_MGMT_SERVER_MAX_CONN 30  
00034 
00035 #define RETRY_INTERVAL 5 
00036 
00037 /*** Data Structures ***/
00038 
00039 static evContext       m_ctx;         
00040 static pconn_server_t  * mgmt_server; 
00041 
00045 typedef struct session_s {
00046     pconn_session_t           * session;  
00047     TAILQ_ENTRY(session_s)    entries;    
00048 } session_t;
00049 
00053 typedef TAILQ_HEAD(session_list_s, session_s) session_list_t;
00054 
00058 static session_list_t session_list = TAILQ_HEAD_INITIALIZER(session_list);
00059 
00064 typedef struct notification_msg_s {
00065     msg_type_e   action;        
00066     uint16_t     fpc_slot;      
00067     uint16_t     pic_slot;      
00068     uint16_t     message_len;   
00069     void *       message;       
00070     TAILQ_ENTRY(notification_msg_s) entries;    
00071 } notification_msg_t;
00072 
00073 
00077 static TAILQ_HEAD(notification_buffer_s, notification_msg_s) notification_msgs =
00078             TAILQ_HEAD_INITIALIZER(notification_msgs);
00079 
00080 
00081 /*** STATIC/INTERNAL Functions ***/
00082 
00086 static void
00087 empty_notifications_queue(void)
00088 {
00089     notification_msg_t * msg;
00090 
00091     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00092         TAILQ_REMOVE(&notification_msgs, msg, entries);
00093         if(msg->message_len != 0) {
00094             free(msg->message);
00095         }
00096         free(msg);
00097     }
00098 }
00099 
00100 
00113 static void
00114 receive_connection(pconn_session_t * session,
00115                    pconn_event_t event,
00116                    void * cookie UNUSED)
00117 {
00118     int rc;
00119     struct in_addr peer_addr;
00120     pconn_peer_info_t info;
00121     session_t * tmp_session;
00122     ss_info_t * ssi;
00123     list_item_t * item;
00124     rule_t * rule;
00125     address_t * a;
00126 
00127     rc = pconn_session_get_peer_info(session, &info);
00128 
00129     if(rc != PCONN_OK) {
00130         LOG(LOG_ERR, "%s: Cannot retrieve peer info "
00131             "for session. Error: %d", __func__, rc);
00132 
00133         pconn_session_close(session);
00134         return;
00135     }
00136     
00137     INSIST_ERR(info.ppi_peer_type == PCONN_PEER_TYPE_PIC);
00138     
00139     rc = pconn_get_peer_address(&info, &peer_addr.s_addr);
00140 
00141     if(rc != PCONN_OK) {
00142         LOG(LOG_ERR, "%s: Cannot retrieve peer address "
00143             "for session. Error: %d", __func__, rc);
00144 
00145         pconn_session_close(session);
00146         return;
00147     }
00148     
00149     switch (event) {
00150 
00151     case PCONN_EVENT_ESTABLISHED:
00152 
00153         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00154             "%s: Accepted connection from ms-%d/%d/0 (%s)",
00155             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00156             inet_ntoa(peer_addr));
00157 
00158         tmp_session = calloc(1, sizeof(session_t));
00159         INSIST_ERR(tmp_session != NULL);
00160         
00161         tmp_session->session = session;
00162         TAILQ_INSERT_TAIL(&session_list, tmp_session, entries);
00163 
00164         // Reset and send all configuration to the service (PIC) component
00165         
00166         empty_notifications_queue(); // start empty
00167 
00168         // Go over conf and add it to queue of messages
00169         // only service sets using this ms pic: FPC#+PIC#
00170         
00171         ssi = next_serviceset(NULL);
00172         while(ssi != NULL) {
00173             if (ssi->fpc_slot != info.ppi_fpc_slot || 
00174                     ssi->pic_slot != info.ppi_pic_slot) {
00175                 ssi = next_serviceset(ssi);
00176                 continue;
00177             }
00178             // else service set is applied on this same PIC
00179             
00180             item = TAILQ_FIRST(&ssi->rules);
00181             while(item != NULL) {
00182                 rule = (rule_t *)item->item;
00183                 
00184                 notify_config_rule(rule->name, rule->rate, rule->redirect,
00185                                         ssi->fpc_slot, ssi->pic_slot);
00186                 
00187                 a = next_address(rule, NULL);
00188                 while(a != NULL) {
00189                     notify_config_rule_prefix(rule->name, a->address, a->mask,
00190                                            false, ssi->fpc_slot, ssi->pic_slot);
00191                     a = next_address(rule, a);
00192                 }
00193                 
00194                 if (ssi->id) { // if still 0 we haven't got ssrb info
00195                     notify_apply_rule(rule->name, ssi->id, ssi->gen_num, 
00196                             ssi->svc_id, ssi->fpc_slot, ssi->pic_slot);
00197                 }
00198                 item = TAILQ_NEXT(item, entries);
00199             }
00200             
00201             ssi = next_serviceset(ssi);
00202         }
00203 
00204         process_notifications(); // send it
00205 
00206         return;
00207 
00208     case PCONN_EVENT_SHUTDOWN:
00209 
00210         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00211             "%s: Disconnected from ms-%d/%d/0 (%s)",
00212             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00213             inet_ntoa(peer_addr));
00214 
00215         break;
00216 
00217     case PCONN_EVENT_FAILED:
00218 
00219         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00220             "%s: Connection failure from ms-%d/%d/0 (%s)",
00221             __func__, info.ppi_fpc_slot, info.ppi_pic_slot,
00222             inet_ntoa(peer_addr));
00223 
00224 
00225         break;
00226 
00227     default:
00228 
00229         LOG(LOG_ERR, "%s: Received an unknown event", __func__);
00230 
00231         return;
00232     }
00233     
00234     tmp_session = TAILQ_FIRST(&session_list);
00235     
00236     while(tmp_session != NULL) {
00237         
00238         if(tmp_session->session == session) {
00239             TAILQ_REMOVE(&session_list, tmp_session, entries);
00240             free(tmp_session);
00241             return;
00242         }
00243         
00244         tmp_session = TAILQ_NEXT(tmp_session, entries);
00245     }
00246     
00247     LOG(LOG_ERR, "%s: Cannot find an existing peer session to shutdown",
00248             __func__);
00249 }
00250 
00251 
00268 static status_t
00269 receive_message(pconn_session_t * session,
00270                 ipc_msg_t * msg,
00271                 void * cookie UNUSED)
00272 {
00273     flow_stat_t * data = NULL;
00274     double df;
00275     void * tmp;
00276     pconn_peer_info_t  info;
00277 
00278     pconn_session_get_peer_info(session, &info);
00279     
00280     switch(msg->subtype) {
00281 
00282     case MSG_FLOW_STAT_UPDATE:
00283 
00284         data = (flow_stat_t *)msg->data;
00285 
00286         // check message length
00287         INSIST_ERR(msg->length == sizeof(flow_stat_t));
00288 
00289         // carefully convert to the double it really is
00290         data->mdi_df = ntohq(data->mdi_df);
00291         tmp = &data->mdi_df;
00292         df = *(double *)tmp;
00293 
00294         set_flow_stat(info.ppi_fpc_slot, info.ppi_pic_slot, ntohs(data->ss_id),
00295             data->flow_addr, ntohs(data->flow_port), df, ntohl(data->mdi_mlr));
00296 
00297         break;
00298 
00299     default:
00300 
00301         LOG(LOG_ERR,
00302             "%s: Received an unknown message type (%d) from the "
00303             "data component", __func__, msg->subtype);
00304         return EFAIL;
00305     }
00306 
00307     return SUCCESS;
00308 }
00309 
00310 
00311 /*** GLOBAL/EXTERNAL Functions ***/
00312 
00313 
00323 status_t
00324 init_server(evContext ctx)
00325 {
00326     pconn_server_params_t params;
00327 
00328     m_ctx = ctx;
00329 
00330     bzero(&params, sizeof(pconn_server_params_t));
00331 
00332     // setup the server args
00333     params.pconn_port            = MONITUBE_PORT_NUM;
00334     params.pconn_max_connections = MONITUBE_MGMT_SERVER_MAX_CONN;
00335     params.pconn_event_handler   = receive_connection;
00336 
00337     // bind
00338     mgmt_server = pconn_server_create(&params, m_ctx, receive_message, NULL);
00339 
00340     if(mgmt_server == NULL) {
00341         LOG(LOG_ERR, "%s: Failed to initialize the pconn server on port %d.",
00342             __func__, MONITUBE_PORT_NUM);
00343         return EFAIL;
00344     }
00345 
00346     LOG(LOG_INFO, "%s: Successfully initialized the pconn server on port %d.",
00347             __func__, MONITUBE_PORT_NUM);
00348 
00349     return SUCCESS;
00350 }
00351 
00352 
00356 void
00357 close_connections(void)
00358 {
00359     session_t * session;
00360 
00361     while((session = TAILQ_FIRST(&session_list)) != NULL) {
00362         TAILQ_REMOVE(&session_list, session, entries);
00363         pconn_session_close(session->session);
00364         free(session);
00365     }
00366 
00367     if(mgmt_server != NULL) {
00368         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00369             "%s: Shutting down the server on port %d.",
00370             __func__, MONITUBE_PORT_NUM);
00371         
00372         pconn_server_shutdown(mgmt_server);
00373         mgmt_server = NULL;
00374     }
00375 
00376     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00377         "%s: Deleting the queue of notification messages.", __func__);
00378 
00379     empty_notifications_queue();
00380 }
00381 
00382 
00389 void
00390 mspic_offline(const char * name)
00391 {
00392     char current_int_prefix[64];
00393     session_t * session;
00394     pconn_peer_info_t info;
00395 
00396     // see if its one we are connected to
00397 
00398     session = TAILQ_FIRST(&session_list);
00399     
00400     while(session != NULL) {
00401 
00402         pconn_session_get_peer_info(session->session, &info);
00403         
00404         snprintf(current_int_prefix, sizeof(current_int_prefix),
00405             "ms-%d/%d/0", info.ppi_fpc_slot, info.ppi_pic_slot);
00406 
00407         if(strstr(name, current_int_prefix) != NULL) {
00408 
00409             junos_trace(MONITUBE_TRACEFLAG_CONNECTION, "%s: Connection from %s "
00410                 "was shutdown (forced).", __func__, current_int_prefix);
00411 
00412             pconn_session_close(session->session);
00413             TAILQ_REMOVE(&session_list, session, entries);
00414             free(session);
00415             return;
00416         }
00417         session = TAILQ_NEXT(session, entries);
00418     }
00419 }
00420 
00421 
00427 void
00428 notify_delete_all_policy(void)
00429 {
00430     notification_msg_t * msg;
00431 
00432     if(TAILQ_FIRST(&session_list) == NULL)
00433         return;
00434 
00435     msg = calloc(1, sizeof(notification_msg_t));
00436     INSIST_ERR(msg != NULL);
00437 
00438     msg->action = MSG_DELETE_ALL;
00439     msg->fpc_slot = USHRT_MAX; // broadcast to all
00440 
00441     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00442 
00443     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00444         "%s: Enqueueing message for the data component", __func__);
00445 }
00446 
00447 
00467 void
00468 notify_delete_serviceset(uint16_t ss_id,
00469                          uint32_t gen_num,
00470                          uint32_t svc_id,
00471                          uint16_t fpc_slot,
00472                          uint16_t pic_slot)
00473 {
00474     notification_msg_t * msg;
00475     del_ss_t * data;
00476 
00477     if(TAILQ_FIRST(&session_list) == NULL)
00478         return;
00479 
00480     msg = calloc(1, sizeof(notification_msg_t));
00481     INSIST_ERR(msg != NULL);
00482 
00483     msg->action = MSG_DELETE_SS;
00484     msg->fpc_slot = fpc_slot;
00485     msg->pic_slot = pic_slot;
00486 
00487     msg->message_len = sizeof(del_ss_t);
00488 
00489     msg->message = data = calloc(1, msg->message_len);
00490     INSIST_ERR(data != NULL);
00491     data->ss_id = htons(ss_id);
00492     data->gen_num = htonl(gen_num);
00493     data->svc_id = htonl(svc_id);
00494 
00495     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00496 
00497     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00498         "%s: Enqueueing message for the data component", __func__);
00499 }
00500 
00501 
00523 void
00524 notify_apply_rule(char * rule_name,
00525                   uint16_t ss_id,
00526                   uint32_t gen_num,
00527                   uint32_t svc_id,
00528                   uint16_t fpc_slot,
00529                   uint16_t pic_slot)
00530 {
00531     notification_msg_t * msg;
00532     apply_rule_t * data;
00533 
00534     if(TAILQ_FIRST(&session_list) == NULL)
00535         return;
00536 
00537     msg = calloc(1, sizeof(notification_msg_t));
00538     INSIST_ERR(msg != NULL);
00539 
00540     msg->action = MSG_APPLY_RULE;
00541     msg->fpc_slot = fpc_slot;
00542     msg->pic_slot = pic_slot;
00543 
00544     msg->message_len = sizeof(apply_rule_t) + strlen(rule_name) + 1;
00545 
00546     msg->message = data = calloc(1, msg->message_len);
00547     INSIST_ERR(data != NULL);
00548     data->ss_id = htons(ss_id);
00549     data->gen_num = htonl(gen_num);
00550     data->svc_id = htonl(svc_id);    
00551     data->rule_name_len = htons(strlen(rule_name) + 1);
00552     strcpy(data->rule_name, rule_name);
00553 
00554     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00555 
00556     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00557         "%s: Enqueueing message for the data component", __func__);
00558 }
00559 
00560 
00582 void
00583 notify_remove_rule(char * rule_name,
00584                    uint16_t ss_id,
00585                    uint32_t gen_num,
00586                    uint32_t svc_id,
00587                    uint16_t fpc_slot,
00588                    uint16_t pic_slot)
00589 {
00590     notification_msg_t * msg;
00591     apply_rule_t * data;
00592 
00593     if(TAILQ_FIRST(&session_list) == NULL)
00594         return;
00595 
00596     msg = calloc(1, sizeof(notification_msg_t));
00597     INSIST_ERR(msg != NULL);
00598 
00599     msg->action = MSG_REMOVE_RULE;
00600     msg->fpc_slot = fpc_slot;
00601     msg->pic_slot = pic_slot;
00602 
00603     msg->message_len = sizeof(apply_rule_t) + strlen(rule_name) + 1;
00604 
00605     msg->message = data = calloc(1, msg->message_len);
00606     INSIST_ERR(data != NULL);
00607     data->ss_id = htons(ss_id);
00608     data->gen_num = htonl(gen_num);
00609     data->svc_id = htonl(svc_id);
00610     data->rule_name_len = htons(strlen(rule_name) + 1);
00611     strcpy(data->rule_name, rule_name);
00612 
00613     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00614 
00615     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00616         "%s: Enqueueing message for the data component", __func__);
00617 }
00618 
00619 
00638 void
00639 notify_config_rule(char * rule_name,
00640                    uint32_t rate,
00641                    in_addr_t redirect,
00642                    uint16_t fpc_slot,
00643                    uint16_t pic_slot)
00644 {
00645     notification_msg_t * msg;
00646     update_rule_action_t * data;
00647 
00648     if(TAILQ_FIRST(&session_list) == NULL)
00649         return;
00650 
00651     msg = calloc(1, sizeof(notification_msg_t));
00652     INSIST_ERR(msg != NULL);
00653 
00654     msg->action = MSG_CONF_RULE_ACTION;
00655     msg->fpc_slot = fpc_slot;
00656     msg->pic_slot = pic_slot;
00657 
00658     msg->message_len = sizeof(update_rule_action_t) + strlen(rule_name) + 1;
00659 
00660     msg->message = data = calloc(1, msg->message_len);
00661     INSIST_ERR(data != NULL);
00662     data->rate = htonl(rate);
00663     data->redirect = redirect; // already network byte order   
00664     data->rule_name_len = htons(strlen(rule_name) + 1);
00665     strcpy(data->rule_name, rule_name);
00666 
00667     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00668 
00669     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00670         "%s: Enqueueing message for the data component", __func__);
00671 }
00672 
00673 
00686 void
00687 notify_delete_rule(char * rule_name,
00688                    uint16_t fpc_slot,
00689                    uint16_t pic_slot)
00690 {
00691     notification_msg_t * msg;
00692     delete_rule_t * data;
00693 
00694     if(TAILQ_FIRST(&session_list) == NULL)
00695         return;
00696 
00697     msg = calloc(1, sizeof(notification_msg_t));
00698     INSIST_ERR(msg != NULL);
00699 
00700     msg->action = MSG_DELETE_RULE;
00701     msg->fpc_slot = fpc_slot;
00702     msg->pic_slot = pic_slot;
00703 
00704     msg->message_len = sizeof(delete_rule_t) + strlen(rule_name) + 1;
00705 
00706     msg->message = data = calloc(1, msg->message_len);
00707     INSIST_ERR(data != NULL);   
00708     data->rule_name_len = htons(strlen(rule_name) + 1);
00709     strcpy(data->rule_name, rule_name);
00710 
00711     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00712 
00713     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00714         "%s: Enqueueing message for the data component", __func__);
00715 }
00716 
00717 
00740 void
00741 notify_config_rule_prefix(char * rule_name,
00742                           in_addr_t addr,
00743                           in_addr_t mask,
00744                           bool delete,
00745                           uint16_t fpc_slot,
00746                           uint16_t pic_slot)
00747 {
00748     notification_msg_t * msg;
00749     rule_addr_t * data;
00750 
00751     if(TAILQ_FIRST(&session_list) == NULL)
00752         return;
00753 
00754     msg = calloc(1, sizeof(notification_msg_t));
00755     INSIST_ERR(msg != NULL);
00756 
00757     if (delete) {
00758         msg->action = MSG_DELETE_RULE_MATCH_ADDR;
00759     } else {
00760         msg->action = MSG_CONF_RULE_MATCH_ADDR;
00761     }
00762     msg->fpc_slot = fpc_slot;
00763     msg->pic_slot = pic_slot;
00764 
00765     msg->message_len = sizeof(rule_addr_t) + strlen(rule_name) + 1;
00766 
00767     msg->message = data = calloc(1, msg->message_len);
00768     INSIST_ERR(data != NULL);
00769     data->addr = addr; // already network byte order
00770     data->mask = mask; // already network byte order   
00771     data->rule_name_len = htons(strlen(rule_name) + 1);
00772     strcpy(data->rule_name, rule_name);
00773 
00774     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00775 
00776     junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00777         "%s: Enqueueing message for the data component", __func__);
00778 }
00779 
00780 
00785 void
00786 process_notifications(void)
00787 {
00788     notification_msg_t * msg;
00789     session_t * session;
00790     pconn_peer_info_t info;
00791 
00792     if(TAILQ_FIRST(&session_list) == NULL) { // no connections
00793         // don't bother doing anything yet
00794         empty_notifications_queue();
00795         return;
00796     }
00797 
00798     junos_trace(MONITUBE_TRACEFLAG_CONNECTION, "%s", __func__);
00799 
00800     // for each msg
00801     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00802 
00803         // send to sessions with matching fpc & pic info
00804         
00805         session = TAILQ_FIRST(&session_list);
00806         while(session != NULL) {
00807             pconn_session_get_peer_info(session->session, &info);
00808             
00809             if(msg->fpc_slot == USHRT_MAX ||
00810                 (msg->fpc_slot == info.ppi_fpc_slot &&
00811                     msg->pic_slot == info.ppi_pic_slot)) {
00812             
00813                 if(pconn_server_send(session->session, msg->action,
00814                         msg->message, msg->message_len) != PCONN_OK) {
00815     
00816                     LOG(LOG_ERR, "%s: Failed to send configuration message (%d)"
00817                             "to ms-%d/%d/0.", __func__, msg->action,
00818                             info.ppi_fpc_slot, info.ppi_pic_slot);
00819                 }
00820             }
00821             session = TAILQ_NEXT(session, entries);          
00822         }
00823 
00824         TAILQ_REMOVE(&notification_msgs, msg, entries);
00825         if(msg->message_len != 0) {
00826             free(msg->message);
00827         }
00828         free(msg);
00829 
00830         junos_trace(MONITUBE_TRACEFLAG_CONNECTION,
00831             "%s: Dequeued message sent to the data component(s)", __func__);
00832     }
00833 }
00834 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:04 2010 for SDK Your Net Corporation Monitube2 IPTV Monitoring Example: monitube2-mgmt 1.0 by Doxygen 1.5.1