equilibrium-data_conn.c

Go to the documentation of this file.
00001 /*
00002  * $Id: equilibrium-data_conn.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  * 
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00022 #include "equilibrium-data_main.h"
00023 #include <jnx/pconn.h>
00024 #include "equilibrium-data_config.h"
00025 #include "equilibrium-data_conn.h"
00026 
00027 
00028 /*** Constants ***/
00029 
00033 #define RETRY_CONNECT 60
00034 #define CONNECT_RETRIES    1  
00035 
00036 
00037 /*** Data Structures ***/
00038 
00043 typedef struct notification_msg_s {
00044     msg_type_e   action;        
00045     in_addr_t    app_addr;      
00046     in_addr_t    app_port;      
00047     void *       message;       
00048     TAILQ_ENTRY(notification_msg_s) entries;    
00049 } notification_msg_t;
00050 
00051 
00055 static TAILQ_HEAD(notification_buffer_s, notification_msg_s) notification_msgs = 
00056             TAILQ_HEAD_INITIALIZER(notification_msgs);
00057 
00058 static msp_spinlock_t msgs_lock;     
00059 
00060 static pconn_client_t * mgmt_client; 
00061 
00062 static evTimerID  mgmt_timer_id;     
00063 
00064 static evContext  main_ctx;          
00065 
00066 
00067 /*** STATIC/INTERNAL Functions ***/
00068 
00069 
00073 static void connect_mgmt(evContext ctx, void * uap,
00074     struct timespec due, struct timespec inter);
00075 
00076 
00093 static void
00094 process_notifications_messages(evContext ctx UNUSED, void * uap  UNUSED,
00095             struct timespec due UNUSED, struct timespec inter UNUSED)
00096 {
00097     notification_msg_t * msg;
00098     sessions_status_t * session_status, * session_data;
00099     server_status_t * server_status, * server_data;
00100     void * data;
00101     char * app_name;
00102     int rc = 0, len;
00103     
00104     if(mgmt_client == NULL) { // don't bother doing anything yet
00105         return;
00106     }
00107     
00108     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00109     
00110     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00111         
00112         TAILQ_REMOVE(&notification_msgs, msg, entries);
00113         
00114         INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00115         
00116         if(msg->action == MSG_STATUS_UPDATE) {
00117             session_status = (sessions_status_t *)(msg->message);
00118 
00119             app_name = get_app_name(
00120                     session_status->svc_set_id, msg->app_addr, msg->app_port);
00121             
00122             len = sizeof(sessions_status_t) + strlen(app_name) + 1;
00123             session_data = calloc(1, len);
00124             INSIST_ERR(session_data != NULL);
00125             
00126             session_data->active_sessions = session_status->active_sessions;
00127             session_data->svc_set_id = session_status->svc_set_id;
00128             session_data->app_name_len = strlen(app_name) + 1;
00129             strcpy(session_data->app_name, app_name);
00130             
00131             data = session_data;
00132             
00133             LOG(LOG_INFO, "%s: Sending application status update to "
00134                     "mgmt component.", __func__);
00135 
00136         } else {
00137             server_status = (server_status_t *)(msg->message);
00138 
00139             app_name = get_app_name(
00140                     server_status->svc_set_id, msg->app_addr, msg->app_port);
00141             
00142             len = sizeof(server_status_t) + strlen(app_name) + 1;
00143             server_data = calloc(1, len);
00144             INSIST_ERR(server_data != NULL);
00145             
00146             server_data->server_status = server_status->server_status;
00147             server_data->svc_set_id = server_status->svc_set_id;
00148             server_data->server_addr = server_status->server_addr;
00149             server_data->app_name_len = strlen(app_name) + 1;
00150             strcpy(server_data->app_name, app_name);
00151             
00152             data = server_data;
00153             
00154             LOG(LOG_INFO, "%s: Sending server status update to "
00155                     "mgmt component.", __func__);
00156         }
00157         
00158         rc = pconn_client_send(mgmt_client, msg->action, data, len);
00159       
00160         if(rc != PCONN_OK) {
00161             // put message back in and process soon
00162             
00163             LOG(LOG_ERR, "%s: Failed to send (%d) to mgmt component."
00164                     " Error: %d", __func__, msg->action, rc);
00165 
00166             INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00167             
00168             TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00169             
00170             INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00171             
00172             if(evSetTimer(main_ctx, process_notifications_messages, NULL,
00173                     evAddTime(evNowTime(), evConsTime(5, 0)),
00174                     evConsTime(0, 0), NULL)) {
00175 
00176                 LOG(LOG_EMERG, "%s: evSetTimer() failed! Will not be "
00177                         "able to process buffered notifications", __func__);
00178             }
00179         } else {
00180             free(msg->message);
00181             free(msg);
00182         }
00183         free(data);
00184         
00185         INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00186     }
00187     
00188     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00189     
00190     return;
00191 }
00192 
00193 
00198 static void
00199 process_notifications(void)
00200 {
00201     if(evSetTimer(main_ctx, process_notifications_messages, NULL,
00202             evConsTime(0,0), evConsTime(0, 0), NULL)) {
00203 
00204         LOG(LOG_EMERG, "%s: evSetTimer() failed! Will not be "
00205                 "able to process buffered notifications", __func__);
00206     }
00207 }
00208 
00221 static void
00222 mgmt_client_connection(pconn_client_t * client,
00223                       pconn_event_t event,
00224                       void * cookie UNUSED)
00225 {
00226     INSIST_ERR(client == mgmt_client);
00227 
00228     switch (event) {
00229 
00230     case PCONN_EVENT_ESTABLISHED:
00231         
00232         // clear the retry timer
00233         if(evTestID(mgmt_timer_id)) {
00234             evClearTimer(main_ctx, mgmt_timer_id);
00235             evInitID(&mgmt_timer_id);
00236         }
00237         
00238         LOG(LOG_INFO, "%s: Connected to the equilibrium-mgmt component "
00239                 "on the RE", __func__);
00240         
00241         break;
00242 
00243     case PCONN_EVENT_SHUTDOWN:
00244         
00245         if(mgmt_client != NULL) {
00246             clear_config();
00247             LOG(LOG_INFO, "%s: Disconnected from the equilibrium-mgmt component"
00248                     " on the RE", __func__);
00249         }
00250         
00251         mgmt_client = NULL; // connection will be closed
00252         
00253         // Reconnect to it if timer not going
00254         if(!evTestID(mgmt_timer_id) &&
00255            evSetTimer(main_ctx, connect_mgmt, NULL, evConsTime(0,0),
00256                evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00257 
00258             LOG(LOG_EMERG, "%s: Failed to initialize a re-connect timer to "
00259                 "reconnect to the mgmt component", __func__);
00260         }
00261         
00262         break;
00263 
00264     case PCONN_EVENT_FAILED:
00265 
00266         LOG(LOG_ERR, "%s: Received a PCONN_EVENT_FAILED event", __func__);
00267         
00268         if(mgmt_client != NULL) {
00269             clear_config();
00270             LOG(LOG_INFO, "%s: Disconnected from the equilibrium-mgmt component"
00271                     " on the RE", __func__);
00272         }
00273         
00274         mgmt_client = NULL; // connection will be closed
00275         
00276         // Reconnect to it if timer not going
00277         if(!evTestID(mgmt_timer_id) &&
00278            evSetTimer(main_ctx, connect_mgmt, NULL, evConsTime(0,0),
00279                evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00280 
00281             LOG(LOG_EMERG, "%s: Failed to initialize a re-connect timer to "
00282                 "reconnect to the mgmt component", __func__);
00283         }
00284         
00285         break;
00286         
00287     default:
00288         LOG(LOG_ERR, "%s: Received an unknown pconn event", __func__);
00289     }
00290 }
00291 
00292 
00308 static status_t
00309 mgmt_client_message(pconn_client_t * session,
00310                    ipc_msg_t * msg,
00311                    void * cookie UNUSED)
00312 {
00313     INSIST_ERR(session == mgmt_client);
00314     INSIST_ERR(msg != NULL);
00315 
00316     switch(msg->subtype) { // a msg_type_e
00317     
00318     case MSG_DELETE_ALL:
00319         
00320         INSIST_ERR(msg->length == 0);
00321         
00322         LOG(LOG_INFO, "%s: Received a configuration update: "
00323                 "reset configuration", __func__);
00324         
00325         reset_configuration();
00326         
00327         break;
00328         
00329     case MSG_DELETE_SS:
00330     {
00331         del_svcset_info_t * data = (del_svcset_info_t *)msg->data;
00332         INSIST_ERR(msg->length == sizeof(del_svcset_info_t));
00333         
00334         LOG(LOG_INFO, "%s: Received a configuration update: delete service set",
00335                 __func__);
00336         
00337         delete_service_set(ntohs(data->svc_set_id));
00338         
00339         break;
00340     }
00341     case MSG_DELETE_APP:
00342     {
00343         del_app_info_t * data = (del_app_info_t *)msg->data;
00344         INSIST_ERR(msg->length == 
00345             sizeof(del_app_info_t) + ntohs(data->app_name_len));
00346         
00347         LOG(LOG_INFO, "%s: Received a configuration update: delete application",
00348                 __func__);
00349         
00350         delete_application(ntohs(data->svc_set_id), data->app_name);
00351         
00352         break;
00353     }
00354     case MSG_DELETE_SERVER:
00355     {
00356         server_info_t * data = (server_info_t *)msg->data;
00357         INSIST_ERR(msg->length == 
00358             sizeof(server_info_t) + ntohs(data->app_name_len));
00359         
00360         LOG(LOG_INFO, "%s: Received a configuration update: delete server",
00361                 __func__);
00362         
00363         delete_server(ntohs(data->svc_set_id), data->app_name,
00364                 data->server_addr);
00365         
00366         break;
00367     }   
00368     case MSG_DELETE_ALL_SERVERS:
00369     {
00370         del_app_info_t * data = (del_app_info_t *)msg->data;
00371         INSIST_ERR(msg->length == 
00372             sizeof(del_app_info_t) + ntohs(data->app_name_len));
00373         
00374         LOG(LOG_INFO, "%s: Received a configuration update: delete all servers",
00375                 __func__);
00376         
00377         delete_all_servers(ntohs(data->svc_set_id), data->app_name);
00378         
00379         break;
00380     }   
00381     case MSG_CONF_APPLICATION:
00382     {
00383         update_app_info_t * data = (update_app_info_t *)msg->data;
00384         INSIST_ERR(msg->length == 
00385             sizeof(update_app_info_t) + ntohs(data->app_name_len));
00386         
00387         LOG(LOG_INFO, "%s: Received a configuration update: update application (%s)",
00388             __func__, data->app_name);
00389         
00390         update_application(ntohs(data->svc_set_id),
00391                data->app_name, data->app_addr, data->app_port,
00392                ntohs(data->session_timeout), ntohs(data->connection_interval),
00393                ntohs(data->connection_timeout), ntohs(data->timeouts_allowed),
00394                ntohs(data->down_retry_interval));
00395         
00396         break;
00397     }
00398     case MSG_CONF_SERVER:
00399     {
00400         server_info_t * data = (server_info_t *)msg->data;
00401         INSIST_ERR(msg->length == 
00402             sizeof(server_info_t) + ntohs(data->app_name_len));
00403         
00404         LOG(LOG_INFO, "%s: Received a configuration update: update server (%s)",
00405             __func__, data->app_name);
00406         
00407         add_server(ntohs(data->svc_set_id), data->app_name,
00408                 data->server_addr);
00409         
00410         break;
00411     }   
00412     default:
00413         LOG(LOG_ERR, "%s: Received an unknown message type (%d) from the "
00414             "mgmt component", __func__, msg->subtype);
00415         return EFAIL;
00416     }
00417     
00418     return SUCCESS;
00419 }
00420 
00421 
00437 static void
00438 connect_mgmt(evContext ctx,
00439             void * uap  UNUSED,
00440             struct timespec due UNUSED,
00441             struct timespec inter UNUSED)
00442 {
00443     pconn_client_params_t params;
00444     
00445     bzero(&params, sizeof(pconn_client_params_t));
00446     
00447     // setup the client args
00448     params.pconn_peer_info.ppi_peer_type = PCONN_PEER_TYPE_RE;
00449     params.pconn_port                    = EQUILIBRIUM_PORT_NUM;
00450     params.pconn_num_retries             = CONNECT_RETRIES;
00451     params.pconn_event_handler           = mgmt_client_connection;
00452     
00453     if(mgmt_client) {
00454         // Then this is the second time in a row this is called even though it
00455         // didn't fail. We haven't received an event like ESTABLISHED yet.
00456         pconn_client_close(mgmt_client);
00457     }
00458     
00459     // connect
00460     mgmt_client = pconn_client_connect_async(
00461                     &params, ctx, mgmt_client_message, NULL);
00462     
00463     if(mgmt_client == NULL) {
00464         LOG(LOG_ERR, "%s: Failed to initialize the pconn client connection "
00465             "to the mgmt component", __func__);
00466     }
00467     
00468     LOG(LOG_INFO, "%s: Trying to connect to the mgmt component", __func__);
00469 }
00470 
00471 
00472 /*** GLOBAL/EXTERNAL Functions ***/
00473 
00474 
00484 status_t
00485 init_connections(evContext ctx)
00486 {
00487     mgmt_client = NULL;
00488     
00489     main_ctx = ctx;
00490     
00491     evInitID(&mgmt_timer_id);
00492     msp_spinlock_init(&msgs_lock);
00493     
00494     // Connect to the MGMT component
00495     if(evSetTimer(ctx, connect_mgmt, NULL, evNowTime(),
00496         evConsTime(RETRY_CONNECT, 0), &mgmt_timer_id)) {
00497 
00498         LOG(LOG_EMERG, "%s: Failed to initialize a connect timer to connect "
00499             "to the MGMT component", __func__);
00500         return EFAIL;
00501     }
00502 
00503     return SUCCESS;
00504 }
00505 
00506 
00510 void
00511 close_connections(void)
00512 {
00513     notification_msg_t * msg;
00514     
00515     if(mgmt_client) {
00516         pconn_client_close(mgmt_client);
00517         mgmt_client = NULL;
00518     }
00519 
00520     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00521 
00522     while((msg = TAILQ_FIRST(&notification_msgs)) != NULL) {
00523         TAILQ_REMOVE(&notification_msgs, msg, entries);
00524         free(msg->message);
00525         free(msg);
00526     }
00527     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00528 }
00529 
00530 
00549 void
00550 notify_server_status(uint16_t svc_set_id,
00551                      in_addr_t app_addr,
00552                      uint16_t app_port,
00553                      in_addr_t server_addr,
00554                      uint8_t status)
00555 {
00556     notification_msg_t * msg;
00557     server_status_t * server_status;
00558     
00559     if(mgmt_client == NULL) { // don't bother doing anything yet
00560         return;
00561     }
00562     
00563     msg = calloc(1, sizeof(notification_msg_t));
00564     INSIST_ERR(msg != NULL);
00565     
00566     msg->action = MSG_SERVER_UPDATE;
00567     msg->app_addr = app_addr;
00568     msg->app_port = app_port;
00569     msg->message = server_status = calloc(1, sizeof(server_status_t));
00570     INSIST_ERR(server_status != NULL);
00571     
00572     server_status->server_addr = server_addr;
00573     server_status->server_status = status;
00574     server_status->svc_set_id = svc_set_id;
00575     
00576     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00577     
00578     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00579     
00580     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00581     
00582     process_notifications();
00583 }
00584 
00585 
00601 void
00602 notify_application_sessions(uint16_t svc_set_id,
00603                             in_addr_t app_addr,
00604                             uint16_t app_port,
00605                             uint32_t session_count)
00606 {
00607     notification_msg_t * msg;
00608     sessions_status_t * status;
00609     
00610     if(mgmt_client == NULL) { // don't bother doing anything yet
00611         return;
00612     }
00613         
00614     msg = calloc(1, sizeof(notification_msg_t));
00615     INSIST_ERR(msg != NULL);
00616     
00617     msg->action = MSG_STATUS_UPDATE;
00618     msg->app_addr = app_addr;
00619     msg->app_port = app_port;
00620     msg->message = status = calloc(1, sizeof(sessions_status_t));
00621     INSIST_ERR(status != NULL);
00622     
00623     status->active_sessions = session_count;
00624     status->svc_set_id = svc_set_id;
00625     
00626     INSIST_ERR(msp_spinlock_lock(&msgs_lock) == MSP_OK);
00627     
00628     TAILQ_INSERT_TAIL(&notification_msgs, msg, entries);
00629     
00630     INSIST_ERR(msp_spinlock_unlock(&msgs_lock) == MSP_OK);
00631 
00632     process_notifications();
00633 }
00634 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:26:56 2010 for SDK Your Net Corporation Equilibrium Load Balancer Example: equilibrium-data 1.0 by Doxygen 1.5.1