equilibrium2-balance_ctrl.c

Go to the documentation of this file.
00001 /*
00002  * $Id: equilibrium2-balance_ctrl.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00020 #include <sync/equilibrium2.h>
00021 #include <sync/equilibrium2_svc.h>
00022 #include "equilibrium2-balance.h"
00023 
00024 #include <jnx/ipc_types.h>
00025 #include <jnx/pconn.h>
00026 #include <jnx/junos_kcom_mpsdk_cfg.h>
00027 #include <jnx/mpsdk.h>
00028 #include <jnx/msp_objcache.h>
00029 
00030 static connect_state_t  connect_state;    
00031 static evTimerID        ev_timer_id;      
00032 static pconn_client_t   *client_hdl;      
00044 static void
00045 update_svr_group_addr (svr_group_t *group, in_addr_t addr)
00046 {
00047     svr_addr_t *svr_addr;
00048 
00049     LIST_FOREACH(svr_addr, &group->group_addr_head, entry) {
00050         if (svr_addr->addr == addr) {
00051             /* Address exists, mark it new. */
00052             svr_addr->addr_new = true;
00053             return;
00054         }
00055     }
00056 
00057     /* New address, create address node and add it to the list. */
00058     svr_addr = msp_shm_alloc(ctrl_ctx->scc_shm, sizeof(*svr_addr));
00059     if (svr_addr == NULL) {
00060         msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00061         return;
00062     }
00063     svr_addr->addr = addr;
00064     svr_addr->addr_ssn_count = 0;
00065     svr_addr->addr_new = true;
00066 
00067     LIST_INSERT_HEAD(&group->group_addr_head, svr_addr, entry);
00068 }
00069 
00080 static svr_group_t *
00081 get_svr_group (char *name)
00082 {
00083     svr_group_t *group;
00084 
00085     LIST_FOREACH(group, svr_group_head, entry) {
00086         if (strcmp(name, group->group_name) == 0) {
00087             break;
00088         }
00089     }
00090     return group;
00091 }
00092 
00103 static void
00104 proc_svr_group_blob (void *blob, junos_kcom_gencfg_opcode_t op)
00105 {
00106     blob_svr_group_set_t *blob_group_set = blob;
00107     blob_svr_group_t *blob_group;
00108     in_addr_t *blob_addr;
00109     svr_group_t *group, *group_tmp;
00110     svr_addr_t *addr, *addr_tmp;
00111     int i, j;
00112 
00113     NTOHS(blob_group_set->gs_count);
00114     svr_group_count = blob_group_set->gs_count;
00115 
00116     switch (op) {
00117     case JUNOS_KCOM_GENCFG_OPCODE_BLOB_ADD:
00118         msp_log(LOG_INFO, "%s: Add %d server groups.", __func__,
00119                 svr_group_count);
00120         break;
00121     case JUNOS_KCOM_GENCFG_OPCODE_BLOB_DEL:
00122         msp_log(LOG_INFO, "%s: Delete %d server groups.", __func__,
00123                 svr_group_count);
00124 
00125         /* Don't delete server group, always update server group list
00126          * when adding a new list.
00127          */
00128         return;
00129     default:
00130         msp_log(LOG_INFO, "%s: Ignore operation %d.", __func__, op);
00131         return;
00132     }
00133 
00134     /* Lock the list when updating it. */
00135     msp_spinlock_lock(&svr_group_lock);
00136 
00137     blob_group = blob_group_set->gs_group;
00138     for (i = 0; i < blob_group_set->gs_count; i++) {
00139         NTOHS(blob_group->group_addr_count);
00140 
00141         group = get_svr_group(blob_group->group_name);
00142         if (group == NULL) {
00143 
00144             /* A new group, create a group in shared memory. */
00145             group = msp_shm_alloc(ctrl_ctx->scc_shm, sizeof(*group));
00146             if (group == NULL) {
00147                 msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00148                 goto done;
00149             }
00150             strlcpy(group->group_name, blob_group->group_name,
00151                     sizeof(group->group_name));
00152             group->group_addr_count = 0;
00153             LIST_INIT(&group->group_addr_head);
00154 
00155             /* Add group to the list. */
00156             LIST_INSERT_HEAD(svr_group_head, group, entry);
00157         }
00158         group->group_addr_count = blob_group->group_addr_count;
00159 
00160         blob_addr = blob_group->group_addr;
00161         for (j = 0; j < blob_group->group_addr_count; j++) {
00162             msp_log(LOG_INFO, "%s: Update 0x%08x in group %s.",
00163                     __func__, *blob_addr, group->group_name);
00164             update_svr_group_addr(group, *blob_addr);
00165             blob_addr++;
00166         }
00167         blob_group = (blob_svr_group_t *)blob_addr;
00168     }
00169 
00170     /* Clear old addresses in server group list. */
00171     LIST_FOREACH_SAFE(group, svr_group_head, entry, group_tmp) {
00172 
00173         LIST_FOREACH_SAFE(addr, &group->group_addr_head, entry, addr_tmp) {
00174             if (addr->addr_new) {
00175                 msp_log(LOG_INFO, "%s: Address 0x%08x is new/unchanged.",
00176                         __func__, addr->addr);
00177                 addr->addr_new = false;
00178             } else {
00179                 msp_log(LOG_INFO, "%s: Address 0x%08x is deleted.",
00180                         __func__, addr->addr);
00181                 LIST_REMOVE(addr, entry);
00182                 msp_shm_free(ctrl_ctx->scc_shm, addr);
00183             }
00184         }
00185         if (LIST_FIRST(&group->group_addr_head) == NULL) {
00186             msp_log(LOG_INFO, "%s: Group %s is deleted.",
00187                     __func__, group->group_name);
00188 
00189             /* All addresses are deleted in this group, delete it. */
00190             LIST_REMOVE(group, entry);
00191             msp_shm_free(ctrl_ctx->scc_shm, group);
00192         }
00193     }
00194 
00195 done:
00196     msp_spinlock_unlock(&svr_group_lock);
00197 }
00198 
00209 static void
00210 proc_svc_set_blob (void *blob, junos_kcom_gencfg_opcode_t op)
00211 {
00212     blob_svc_set_t *blob_ss = blob;
00213     blob_svc_set_t *policy = NULL;
00214     blob_rule_t *blob_rule;
00215     blob_term_t *blob_term;
00216     int rule, term;
00217     msp_policy_db_params_t policy_db_params;
00218     sp_svc_set_t *ss;
00219 
00220     NTOHS(blob_ss->ss_id);
00221     NTOHL(blob_ss->ss_svc_id);
00222     NTOHL(blob_ss->ss_gen_num);
00223     NTOHS(blob_ss->ss_size);
00224     NTOHS(blob_ss->ss_rule_count);
00225 
00226     msp_log(LOG_INFO, "%s: %s (%d) with %d rules.", __func__,
00227             blob_ss->ss_name, blob_ss->ss_id, blob_ss->ss_rule_count);
00228 
00229     blob_rule = blob_ss->ss_rule;
00230     for (rule = 0; rule < blob_ss->ss_rule_count; rule++) {
00231         NTOHS(blob_rule->rule_term_count);
00232         
00233         blob_term = blob_rule->rule_term;
00234         for (term = 0; term < blob_rule->rule_term_count; term++) {
00235             NTOHS(blob_term->term_match_port);
00236             blob_term++;
00237         }
00238         blob_rule = (blob_rule_t *)blob_term;
00239     }
00240     
00241     /* Setup policy-db parameters. */
00242     bzero(&policy_db_params, sizeof(msp_policy_db_params_t));
00243     policy_db_params.handle = ctrl_ctx->policy_db_handle;
00244     policy_db_params.svc_set_id = blob_ss->ss_id;
00245     policy_db_params.svc_id = blob_ss->ss_svc_id;
00246     policy_db_params.plugin_id = balance_pid;
00247     strlcpy(policy_db_params.plugin_name, EQ2_BALANCE_SVC_NAME,
00248             sizeof(policy_db_params.plugin_name));
00249 
00250     switch (op) {
00251     case JUNOS_KCOM_GENCFG_OPCODE_BLOB_ADD:
00252         msp_log(LOG_INFO, "%s: Add policy.", __func__);
00253         msp_spinlock_lock(&svc_set_lock);
00254 
00255         /* Check active service-set. */
00256         ss = get_svc_set(blob_ss->ss_id, true);
00257         if (ss) {
00258             msp_log(LOG_ERR, "%s: Check active service-set ERROR!",
00259                     __func__);
00260             goto done;
00261         }
00262 
00263         /* Allocate memory from policy-db shared memory. */
00264         policy = msp_shm_alloc(ctrl_ctx->policy_shm_handle, blob_ss->ss_size);
00265         if (policy == NULL) {
00266             msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00267             goto done;
00268         }
00269 
00270         /* Copy blob to policy. */
00271         bcopy(blob_ss, policy, blob_ss->ss_size);
00272 
00273         /* Add service-set to the list and mark it active. */
00274         ss = calloc(1, sizeof(*ss));
00275         INSIST_ERR(ss != NULL);
00276         ss->ss_policy = policy;
00277         ss->ss_active = true;
00278         LIST_INSERT_HEAD(&svc_set_head, ss, entry);
00279 
00280         /* Check inactive service-set. */
00281         if (get_svc_set(blob_ss->ss_id, FALSE)) {
00282             
00283             /* Inactive service-set exists, there are sessions using this
00284              * old policy in the middle, the old policy was not detached from
00285              * service-set yet, so don't add new policy to policy-db.
00286              */
00287         } else {
00288 
00289             /* No inactive service-set, the old policy must have been detached
00290              * and deleted, add new policy to policy-db.
00291              */
00292             policy_db_params.policy_op = MSP_POLICY_DB_POLICY_ADD;
00293             policy_db_params.op.add_params.gen_num = blob_ss->ss_gen_num;
00294             policy_db_params.op.add_params.policy = policy;
00295             if (msp_policy_db_op(&policy_db_params) != MSP_OK) {
00296                 msp_log(LOG_ERR, "%s: Policy operation %d ERROR!",
00297                         __func__, policy_db_params.policy_op);
00298                 del_svc_set(ss);
00299             }
00300         }
00301         break;
00302     case JUNOS_KCOM_GENCFG_OPCODE_BLOB_DEL:
00303         msp_log(LOG_INFO, "%s: Delete policy.", __func__);
00304         msp_spinlock_lock(&svc_set_lock);
00305 
00306         /* Check inactive service-set. */
00307         if (get_svc_set(blob_ss->ss_id, FALSE)) {
00308             msp_log(LOG_ERR, "%s: Inactive sevice-set exists!", __func__);
00309             goto done;
00310         }
00311 
00312         /* Check active service-set. */
00313         ss = get_svc_set(blob_ss->ss_id, true);
00314         if (ss == NULL) {
00315             msp_log(LOG_ERR, "%s: No active sevice-set to delete!",
00316                     __func__);
00317             goto done;
00318         }
00319 
00320         /* Check session counter. */
00321         if (ss->ss_ssn_count) {
00322 
00323             /* There are sesions using this service-set policy, to not break
00324              * thoes sessions, don't delete this service-set, just mark it
00325              * inactive. Don't detach it from policy-db either, so data handler
00326              * can keep receiving the rest packet of thoes sessions.
00327              * Any new session won't be accepted till all existing sessions
00328              * are closed and new policy is attached.
00329              */
00330             ss->ss_active = FALSE;
00331         } else {
00332 
00333             /* There is no session using this service-set policy,
00334              * detach policy from policy-db, free policy memory and
00335              * delete service-set.
00336              */
00337             policy_db_params.policy_op = MSP_POLICY_DB_POLICY_DEL;
00338             policy_db_params.op.del_params.gen_num = blob_ss->ss_gen_num;
00339             if (msp_policy_db_op(&policy_db_params) != MSP_OK) {
00340                 msp_log(LOG_ERR, "%s: Policy operation %d ERROR!",
00341                         __func__, policy_db_params.policy_op);
00342             }
00343             del_svc_set(ss);
00344         }
00345         break;
00346     default:
00347         msp_log(LOG_INFO, "%s: Ignore operation %d.", __func__, op);
00348     }
00349 
00350 done:
00351     msp_spinlock_unlock(&svc_set_lock);
00352 }
00353 
00361 static void
00362 read_gencfg_blob (junos_kcom_gencfg_t *kcom_gencfg)
00363 {
00364     config_blob_key_t *key;
00365 
00366     key = kcom_gencfg->key.data_p;
00367     msp_log(LOG_INFO, "%s: %s, tag: %d, key len %d, blob len %d",
00368             __func__, key->key_name, key->key_tag, kcom_gencfg->key.size,
00369             kcom_gencfg->blob.size);
00370 
00371     switch (key->key_tag) {
00372     case CONFIG_BLOB_SVC_SET:
00373         proc_svc_set_blob(kcom_gencfg->blob.data_p, kcom_gencfg->opcode);
00374         break;
00375     case CONFIG_BLOB_SVR_GROUP:
00376         proc_svr_group_blob(kcom_gencfg->blob.data_p, kcom_gencfg->opcode);
00377         break;
00378     default:
00379         msp_log(LOG_INFO, "%s: Ignore blob.", __func__);
00380     }
00381     JUNOS_KCOM_MPSDK_CFG_FREE(kcom_gencfg);
00382 }
00383 
00388 static void
00389 send_svr_group (void)
00390 {
00391     svr_group_t *group;
00392     svr_addr_t *addr;
00393     char *msg;
00394     int len;
00395     msg_svr_addr_t *msg_addr;
00396     msg_svr_group_t *msg_group;
00397     uint32_t *msg_group_count;
00398 
00399     if (svr_group_count == 0) {
00400         msp_log(LOG_INFO, "%s: No server group.", __func__);
00401         return;
00402     }
00403 
00404     msp_spinlock_lock(&svr_group_lock);
00405 
00406     /* Get the total length of message. */
00407     /* The first 4 bytes is the number of server groups. */
00408     len = sizeof(uint32_t) + svr_group_count * sizeof(msg_svr_group_t);
00409     LIST_FOREACH(group, svr_group_head, entry) {
00410         len += group->group_addr_count * sizeof(msg_svr_addr_t);
00411     }
00412 
00413     /* Create message. */
00414     msg = calloc(1, len);
00415     INSIST_ERR(msg != NULL);
00416 
00417     /* The first two bytes is the number of server groups. */
00418     msg_group_count = (uint32_t *)msg;
00419     *msg_group_count = htonl(svr_group_count);
00420 
00421     /* Skip the first two bytes that is the number of server groups and
00422      * add all server groups and addresses to the message.
00423      */
00424     msg_group = (msg_svr_group_t *)(msg + sizeof(uint32_t));
00425     LIST_FOREACH(group, svr_group_head, entry) {
00426         strlcpy(msg_group->group_name, group->group_name,
00427                 sizeof(msg_group->group_name));
00428         msg_group->group_addr_count = htons(group->group_addr_count);
00429 
00430         msg_addr = (msg_svr_addr_t *)msg_group->group_addr;
00431         LIST_FOREACH(addr, &group->group_addr_head, entry) {
00432             msg_addr->addr = addr->addr;
00433             msg_addr->addr_ssn_count = htons(addr->addr_ssn_count);
00434             msg_addr++;
00435         }
00436         msg_group = (msg_svr_group_t *)msg_addr;
00437     }
00438     msp_spinlock_unlock(&svr_group_lock);
00439 
00440     pconn_client_send(client_hdl, EQ2_BALANCE_MSG_SVR_GROUP, msg, len);
00441     free(msg);
00442 }
00443 
00457 static void
00458 client_event_hdlr (pconn_client_t *client UNUSED, pconn_event_t event,
00459         void *cookie UNUSED)
00460 {
00461     switch (event) {
00462     case PCONN_EVENT_ESTABLISHED:
00463         msp_log(LOG_INFO, "%s: Connect to server OK.", __func__);
00464         connect_state = CONNECT_OK;
00465         break;
00466     case PCONN_EVENT_SHUTDOWN:
00467         msp_log(LOG_INFO, "%s: Connection to server is down.", __func__);
00468         connect_state = CONNECT_NA;
00469         break;
00470     case PCONN_EVENT_FAILED:
00471         msp_log(LOG_INFO, "%s: Connection to server FAILED.", __func__);
00472         connect_state = CONNECT_NA;
00473         break;
00474     default:
00475         msp_log(LOG_ERR, "%s: Unknown event %d.", __func__, event);
00476     }
00477 }
00478 
00495 static status_t
00496 client_msg_hdlr (pconn_client_t *client UNUSED, ipc_msg_t *msg UNUSED,
00497         void *cookie UNUSED)
00498 {
00499     /* Don't care any message from the manager. */
00500     return 0;
00501 }
00502 
00510 static void
00511 connect_manager (evContext ctx)
00512 {
00513     pconn_client_params_t param;
00514 
00515     bzero(&param, sizeof(param));
00516     param.pconn_peer_info.ppi_peer_type = PCONN_PEER_TYPE_RE;
00517     param.pconn_port = EQ2_MGMT_SERVER_PORT;
00518     param.pconn_num_retries = EQ2_CLIENT_RETRY;
00519     param.pconn_event_handler = client_event_hdlr;
00520     client_hdl = pconn_client_connect_async(&param, ctx, client_msg_hdlr, NULL);
00521     if (client_hdl) {
00522         connect_state = CONNECT_INPROGRESS;
00523         msp_log(LOG_INFO, "%s: Connecting to server...", __func__);
00524     } else {
00525         msp_log(LOG_ERR, "%s: Connect to server ERROR!", __func__);
00526     }
00527 }
00528 
00545 static void
00546 upload_status (evContext ctx, void *uap UNUSED, struct timespec due UNUSED,
00547         struct timespec inter UNUSED)
00548 {
00549     if (connect_state == CONNECT_OK) {
00550         msp_log(LOG_INFO, "%s: Send server group info.", __func__);
00551         send_svr_group();
00552     } else if (connect_state == CONNECT_NA) {
00553         connect_manager(ctx);
00554     }
00555 }
00556 
00573 sp_svc_set_t *
00574 get_svc_set (uint16_t id, bool active)
00575 {
00576     sp_svc_set_t *ss;
00577 
00578     LIST_FOREACH(ss, &svc_set_head, entry) {
00579         if ((ss->ss_policy->ss_id == id) && (ss->ss_active = active)){
00580             break;
00581         }
00582     }
00583     return ss;
00584 }
00585 
00593 void
00594 del_svc_set (sp_svc_set_t *ss)
00595 {
00596     if (ss == NULL) {
00597         return;
00598     }
00599     LIST_REMOVE(ss, entry);
00600     msp_shm_free(ctrl_ctx->policy_shm_handle, ss->ss_policy);
00601     free(ss);
00602 }
00603 
00617 int
00618 equilibrium2_balance_ctrl_hdlr (msvcs_control_context_t *ctx,
00619         msvcs_control_event_t ev)
00620 {
00621 
00622     /* Save control context. */
00623     ctrl_ctx = ctx;
00624 
00625     switch (ev) {
00626     case MSVCS_CONTROL_EV_INIT:
00627         msp_log(LOG_INFO, "%s: MSVCS_CONTROL_EV_INIT", __func__);
00628 
00629         if (msvcs_plugin_resolve_event_class(EQ2_CLASSIFY_SVC_NAME,
00630                     EV_CLASS_CLASSIFY, &classify_ev_class) < 0) {
00631             msp_log(LOG_ERR, "%s: Resovle event class EV_CLASS_CLASSIFY ERROR!",
00632                     __func__);
00633         }
00634 
00635         msvcs_plugin_subscribe_data_events(balance_pid, classify_ev_class,
00636                 MSVCS_GET_EVENT_MASK(EV_CLASSIFY_FIRST_PACKET));
00637 
00638         /* Create the head of server group list. */
00639         svr_group_head = msp_shm_alloc(ctrl_ctx->policy_shm_handle,
00640                 sizeof(*svr_group_head));
00641         if (svr_group_head == NULL) {
00642             msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00643             break;
00644         }
00645         LIST_INIT(svr_group_head);
00646         svr_group_count = 0;
00647         msp_spinlock_init(&svc_set_lock);
00648         msp_spinlock_init(&svr_group_lock);
00649         connect_state = CONNECT_NA;
00650 
00651         /* Schedule sending status to manager. */
00652         evInitID(&ev_timer_id);
00653         if (evSetTimer(*ctx->scc_ev_ctxt, upload_status, NULL,
00654                 evAddTime(evNowTime(), evConsTime(STATUS_UPDATE_INTERVAL, 0)),
00655                 evConsTime(STATUS_UPDATE_INTERVAL, 0), &ev_timer_id) < 0) {
00656             msp_log(LOG_ERR, "%s: Schedule sending status ERROR!", __func__);
00657         }
00658 
00659         break;
00660 
00661     case MSVCS_CONTROL_EV_CFG_BLOB:
00662         msp_log(LOG_INFO, "%s: MSVCS_CONTROL_EV_CFG_BLOB", __func__);
00663         read_gencfg_blob(ctx->plugin_data);
00664         break;
00665 
00666     default:
00667         msp_log(LOG_ERR, "%s: Unknown event!", __func__);
00668     }
00669     return 0;
00670 }
00671 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:08 2010 for SDK Your Net Corporation Equilibrium II: equilibrium2-balance 1.0 by Doxygen 1.5.1