00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00020 #include <sync/equilibrium2.h>
00021 #include <sync/equilibrium2_svc.h>
00022 #include "equilibrium2-balance.h"
00023
00024 #include <jnx/ipc_types.h>
00025 #include <jnx/pconn.h>
00026 #include <jnx/junos_kcom_mpsdk_cfg.h>
00027 #include <jnx/mpsdk.h>
00028 #include <jnx/msp_objcache.h>
00029
00030 static connect_state_t connect_state;
00031 static evTimerID ev_timer_id;
00032 static pconn_client_t *client_hdl;
00044 static void
00045 update_svr_group_addr (svr_group_t *group, in_addr_t addr)
00046 {
00047 svr_addr_t *svr_addr;
00048
00049 LIST_FOREACH(svr_addr, &group->group_addr_head, entry) {
00050 if (svr_addr->addr == addr) {
00051
00052 svr_addr->addr_new = true;
00053 return;
00054 }
00055 }
00056
00057
00058 svr_addr = msp_shm_alloc(ctrl_ctx->scc_shm, sizeof(*svr_addr));
00059 if (svr_addr == NULL) {
00060 msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00061 return;
00062 }
00063 svr_addr->addr = addr;
00064 svr_addr->addr_ssn_count = 0;
00065 svr_addr->addr_new = true;
00066
00067 LIST_INSERT_HEAD(&group->group_addr_head, svr_addr, entry);
00068 }
00069
00080 static svr_group_t *
00081 get_svr_group (char *name)
00082 {
00083 svr_group_t *group;
00084
00085 LIST_FOREACH(group, svr_group_head, entry) {
00086 if (strcmp(name, group->group_name) == 0) {
00087 break;
00088 }
00089 }
00090 return group;
00091 }
00092
00103 static void
00104 proc_svr_group_blob (void *blob, junos_kcom_gencfg_opcode_t op)
00105 {
00106 blob_svr_group_set_t *blob_group_set = blob;
00107 blob_svr_group_t *blob_group;
00108 in_addr_t *blob_addr;
00109 svr_group_t *group, *group_tmp;
00110 svr_addr_t *addr, *addr_tmp;
00111 int i, j;
00112
00113 NTOHS(blob_group_set->gs_count);
00114 svr_group_count = blob_group_set->gs_count;
00115
00116 switch (op) {
00117 case JUNOS_KCOM_GENCFG_OPCODE_BLOB_ADD:
00118 msp_log(LOG_INFO, "%s: Add %d server groups.", __func__,
00119 svr_group_count);
00120 break;
00121 case JUNOS_KCOM_GENCFG_OPCODE_BLOB_DEL:
00122 msp_log(LOG_INFO, "%s: Delete %d server groups.", __func__,
00123 svr_group_count);
00124
00125
00126
00127
00128 return;
00129 default:
00130 msp_log(LOG_INFO, "%s: Ignore operation %d.", __func__, op);
00131 return;
00132 }
00133
00134
00135 msp_spinlock_lock(&svr_group_lock);
00136
00137 blob_group = blob_group_set->gs_group;
00138 for (i = 0; i < blob_group_set->gs_count; i++) {
00139 NTOHS(blob_group->group_addr_count);
00140
00141 group = get_svr_group(blob_group->group_name);
00142 if (group == NULL) {
00143
00144
00145 group = msp_shm_alloc(ctrl_ctx->scc_shm, sizeof(*group));
00146 if (group == NULL) {
00147 msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00148 goto done;
00149 }
00150 strlcpy(group->group_name, blob_group->group_name,
00151 sizeof(group->group_name));
00152 group->group_addr_count = 0;
00153 LIST_INIT(&group->group_addr_head);
00154
00155
00156 LIST_INSERT_HEAD(svr_group_head, group, entry);
00157 }
00158 group->group_addr_count = blob_group->group_addr_count;
00159
00160 blob_addr = blob_group->group_addr;
00161 for (j = 0; j < blob_group->group_addr_count; j++) {
00162 msp_log(LOG_INFO, "%s: Update 0x%08x in group %s.",
00163 __func__, *blob_addr, group->group_name);
00164 update_svr_group_addr(group, *blob_addr);
00165 blob_addr++;
00166 }
00167 blob_group = (blob_svr_group_t *)blob_addr;
00168 }
00169
00170
00171 LIST_FOREACH_SAFE(group, svr_group_head, entry, group_tmp) {
00172
00173 LIST_FOREACH_SAFE(addr, &group->group_addr_head, entry, addr_tmp) {
00174 if (addr->addr_new) {
00175 msp_log(LOG_INFO, "%s: Address 0x%08x is new/unchanged.",
00176 __func__, addr->addr);
00177 addr->addr_new = false;
00178 } else {
00179 msp_log(LOG_INFO, "%s: Address 0x%08x is deleted.",
00180 __func__, addr->addr);
00181 LIST_REMOVE(addr, entry);
00182 msp_shm_free(ctrl_ctx->scc_shm, addr);
00183 }
00184 }
00185 if (LIST_FIRST(&group->group_addr_head) == NULL) {
00186 msp_log(LOG_INFO, "%s: Group %s is deleted.",
00187 __func__, group->group_name);
00188
00189
00190 LIST_REMOVE(group, entry);
00191 msp_shm_free(ctrl_ctx->scc_shm, group);
00192 }
00193 }
00194
00195 done:
00196 msp_spinlock_unlock(&svr_group_lock);
00197 }
00198
00209 static void
00210 proc_svc_set_blob (void *blob, junos_kcom_gencfg_opcode_t op)
00211 {
00212 blob_svc_set_t *blob_ss = blob;
00213 blob_svc_set_t *policy = NULL;
00214 blob_rule_t *blob_rule;
00215 blob_term_t *blob_term;
00216 int rule, term;
00217 msp_policy_db_params_t policy_db_params;
00218 sp_svc_set_t *ss;
00219
00220 NTOHS(blob_ss->ss_id);
00221 NTOHL(blob_ss->ss_svc_id);
00222 NTOHL(blob_ss->ss_gen_num);
00223 NTOHS(blob_ss->ss_size);
00224 NTOHS(blob_ss->ss_rule_count);
00225
00226 msp_log(LOG_INFO, "%s: %s (%d) with %d rules.", __func__,
00227 blob_ss->ss_name, blob_ss->ss_id, blob_ss->ss_rule_count);
00228
00229 blob_rule = blob_ss->ss_rule;
00230 for (rule = 0; rule < blob_ss->ss_rule_count; rule++) {
00231 NTOHS(blob_rule->rule_term_count);
00232
00233 blob_term = blob_rule->rule_term;
00234 for (term = 0; term < blob_rule->rule_term_count; term++) {
00235 NTOHS(blob_term->term_match_port);
00236 blob_term++;
00237 }
00238 blob_rule = (blob_rule_t *)blob_term;
00239 }
00240
00241
00242 bzero(&policy_db_params, sizeof(msp_policy_db_params_t));
00243 policy_db_params.handle = ctrl_ctx->policy_db_handle;
00244 policy_db_params.svc_set_id = blob_ss->ss_id;
00245 policy_db_params.svc_id = blob_ss->ss_svc_id;
00246 policy_db_params.plugin_id = balance_pid;
00247 strlcpy(policy_db_params.plugin_name, EQ2_BALANCE_SVC_NAME,
00248 sizeof(policy_db_params.plugin_name));
00249
00250 switch (op) {
00251 case JUNOS_KCOM_GENCFG_OPCODE_BLOB_ADD:
00252 msp_log(LOG_INFO, "%s: Add policy.", __func__);
00253 msp_spinlock_lock(&svc_set_lock);
00254
00255
00256 ss = get_svc_set(blob_ss->ss_id, true);
00257 if (ss) {
00258 msp_log(LOG_ERR, "%s: Check active service-set ERROR!",
00259 __func__);
00260 goto done;
00261 }
00262
00263
00264 policy = msp_shm_alloc(ctrl_ctx->policy_shm_handle, blob_ss->ss_size);
00265 if (policy == NULL) {
00266 msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00267 goto done;
00268 }
00269
00270
00271 bcopy(blob_ss, policy, blob_ss->ss_size);
00272
00273
00274 ss = calloc(1, sizeof(*ss));
00275 INSIST_ERR(ss != NULL);
00276 ss->ss_policy = policy;
00277 ss->ss_active = true;
00278 LIST_INSERT_HEAD(&svc_set_head, ss, entry);
00279
00280
00281 if (get_svc_set(blob_ss->ss_id, FALSE)) {
00282
00283
00284
00285
00286
00287 } else {
00288
00289
00290
00291
00292 policy_db_params.policy_op = MSP_POLICY_DB_POLICY_ADD;
00293 policy_db_params.op.add_params.gen_num = blob_ss->ss_gen_num;
00294 policy_db_params.op.add_params.policy = policy;
00295 if (msp_policy_db_op(&policy_db_params) != MSP_OK) {
00296 msp_log(LOG_ERR, "%s: Policy operation %d ERROR!",
00297 __func__, policy_db_params.policy_op);
00298 del_svc_set(ss);
00299 }
00300 }
00301 break;
00302 case JUNOS_KCOM_GENCFG_OPCODE_BLOB_DEL:
00303 msp_log(LOG_INFO, "%s: Delete policy.", __func__);
00304 msp_spinlock_lock(&svc_set_lock);
00305
00306
00307 if (get_svc_set(blob_ss->ss_id, FALSE)) {
00308 msp_log(LOG_ERR, "%s: Inactive sevice-set exists!", __func__);
00309 goto done;
00310 }
00311
00312
00313 ss = get_svc_set(blob_ss->ss_id, true);
00314 if (ss == NULL) {
00315 msp_log(LOG_ERR, "%s: No active sevice-set to delete!",
00316 __func__);
00317 goto done;
00318 }
00319
00320
00321 if (ss->ss_ssn_count) {
00322
00323
00324
00325
00326
00327
00328
00329
00330 ss->ss_active = FALSE;
00331 } else {
00332
00333
00334
00335
00336
00337 policy_db_params.policy_op = MSP_POLICY_DB_POLICY_DEL;
00338 policy_db_params.op.del_params.gen_num = blob_ss->ss_gen_num;
00339 if (msp_policy_db_op(&policy_db_params) != MSP_OK) {
00340 msp_log(LOG_ERR, "%s: Policy operation %d ERROR!",
00341 __func__, policy_db_params.policy_op);
00342 }
00343 del_svc_set(ss);
00344 }
00345 break;
00346 default:
00347 msp_log(LOG_INFO, "%s: Ignore operation %d.", __func__, op);
00348 }
00349
00350 done:
00351 msp_spinlock_unlock(&svc_set_lock);
00352 }
00353
00361 static void
00362 read_gencfg_blob (junos_kcom_gencfg_t *kcom_gencfg)
00363 {
00364 config_blob_key_t *key;
00365
00366 key = kcom_gencfg->key.data_p;
00367 msp_log(LOG_INFO, "%s: %s, tag: %d, key len %d, blob len %d",
00368 __func__, key->key_name, key->key_tag, kcom_gencfg->key.size,
00369 kcom_gencfg->blob.size);
00370
00371 switch (key->key_tag) {
00372 case CONFIG_BLOB_SVC_SET:
00373 proc_svc_set_blob(kcom_gencfg->blob.data_p, kcom_gencfg->opcode);
00374 break;
00375 case CONFIG_BLOB_SVR_GROUP:
00376 proc_svr_group_blob(kcom_gencfg->blob.data_p, kcom_gencfg->opcode);
00377 break;
00378 default:
00379 msp_log(LOG_INFO, "%s: Ignore blob.", __func__);
00380 }
00381 JUNOS_KCOM_MPSDK_CFG_FREE(kcom_gencfg);
00382 }
00383
00388 static void
00389 send_svr_group (void)
00390 {
00391 svr_group_t *group;
00392 svr_addr_t *addr;
00393 char *msg;
00394 int len;
00395 msg_svr_addr_t *msg_addr;
00396 msg_svr_group_t *msg_group;
00397 uint32_t *msg_group_count;
00398
00399 if (svr_group_count == 0) {
00400 msp_log(LOG_INFO, "%s: No server group.", __func__);
00401 return;
00402 }
00403
00404 msp_spinlock_lock(&svr_group_lock);
00405
00406
00407
00408 len = sizeof(uint32_t) + svr_group_count * sizeof(msg_svr_group_t);
00409 LIST_FOREACH(group, svr_group_head, entry) {
00410 len += group->group_addr_count * sizeof(msg_svr_addr_t);
00411 }
00412
00413
00414 msg = calloc(1, len);
00415 INSIST_ERR(msg != NULL);
00416
00417
00418 msg_group_count = (uint32_t *)msg;
00419 *msg_group_count = htonl(svr_group_count);
00420
00421
00422
00423
00424 msg_group = (msg_svr_group_t *)(msg + sizeof(uint32_t));
00425 LIST_FOREACH(group, svr_group_head, entry) {
00426 strlcpy(msg_group->group_name, group->group_name,
00427 sizeof(msg_group->group_name));
00428 msg_group->group_addr_count = htons(group->group_addr_count);
00429
00430 msg_addr = (msg_svr_addr_t *)msg_group->group_addr;
00431 LIST_FOREACH(addr, &group->group_addr_head, entry) {
00432 msg_addr->addr = addr->addr;
00433 msg_addr->addr_ssn_count = htons(addr->addr_ssn_count);
00434 msg_addr++;
00435 }
00436 msg_group = (msg_svr_group_t *)msg_addr;
00437 }
00438 msp_spinlock_unlock(&svr_group_lock);
00439
00440 pconn_client_send(client_hdl, EQ2_BALANCE_MSG_SVR_GROUP, msg, len);
00441 free(msg);
00442 }
00443
00457 static void
00458 client_event_hdlr (pconn_client_t *client UNUSED, pconn_event_t event,
00459 void *cookie UNUSED)
00460 {
00461 switch (event) {
00462 case PCONN_EVENT_ESTABLISHED:
00463 msp_log(LOG_INFO, "%s: Connect to server OK.", __func__);
00464 connect_state = CONNECT_OK;
00465 break;
00466 case PCONN_EVENT_SHUTDOWN:
00467 msp_log(LOG_INFO, "%s: Connection to server is down.", __func__);
00468 connect_state = CONNECT_NA;
00469 break;
00470 case PCONN_EVENT_FAILED:
00471 msp_log(LOG_INFO, "%s: Connection to server FAILED.", __func__);
00472 connect_state = CONNECT_NA;
00473 break;
00474 default:
00475 msp_log(LOG_ERR, "%s: Unknown event %d.", __func__, event);
00476 }
00477 }
00478
00495 static status_t
00496 client_msg_hdlr (pconn_client_t *client UNUSED, ipc_msg_t *msg UNUSED,
00497 void *cookie UNUSED)
00498 {
00499
00500 return 0;
00501 }
00502
00510 static void
00511 connect_manager (evContext ctx)
00512 {
00513 pconn_client_params_t param;
00514
00515 bzero(¶m, sizeof(param));
00516 param.pconn_peer_info.ppi_peer_type = PCONN_PEER_TYPE_RE;
00517 param.pconn_port = EQ2_MGMT_SERVER_PORT;
00518 param.pconn_num_retries = EQ2_CLIENT_RETRY;
00519 param.pconn_event_handler = client_event_hdlr;
00520 client_hdl = pconn_client_connect_async(¶m, ctx, client_msg_hdlr, NULL);
00521 if (client_hdl) {
00522 connect_state = CONNECT_INPROGRESS;
00523 msp_log(LOG_INFO, "%s: Connecting to server...", __func__);
00524 } else {
00525 msp_log(LOG_ERR, "%s: Connect to server ERROR!", __func__);
00526 }
00527 }
00528
00545 static void
00546 upload_status (evContext ctx, void *uap UNUSED, struct timespec due UNUSED,
00547 struct timespec inter UNUSED)
00548 {
00549 if (connect_state == CONNECT_OK) {
00550 msp_log(LOG_INFO, "%s: Send server group info.", __func__);
00551 send_svr_group();
00552 } else if (connect_state == CONNECT_NA) {
00553 connect_manager(ctx);
00554 }
00555 }
00556
00573 sp_svc_set_t *
00574 get_svc_set (uint16_t id, bool active)
00575 {
00576 sp_svc_set_t *ss;
00577
00578 LIST_FOREACH(ss, &svc_set_head, entry) {
00579 if ((ss->ss_policy->ss_id == id) && (ss->ss_active = active)){
00580 break;
00581 }
00582 }
00583 return ss;
00584 }
00585
00593 void
00594 del_svc_set (sp_svc_set_t *ss)
00595 {
00596 if (ss == NULL) {
00597 return;
00598 }
00599 LIST_REMOVE(ss, entry);
00600 msp_shm_free(ctrl_ctx->policy_shm_handle, ss->ss_policy);
00601 free(ss);
00602 }
00603
00617 int
00618 equilibrium2_balance_ctrl_hdlr (msvcs_control_context_t *ctx,
00619 msvcs_control_event_t ev)
00620 {
00621
00622
00623 ctrl_ctx = ctx;
00624
00625 switch (ev) {
00626 case MSVCS_CONTROL_EV_INIT:
00627 msp_log(LOG_INFO, "%s: MSVCS_CONTROL_EV_INIT", __func__);
00628
00629 if (msvcs_plugin_resolve_event_class(EQ2_CLASSIFY_SVC_NAME,
00630 EV_CLASS_CLASSIFY, &classify_ev_class) < 0) {
00631 msp_log(LOG_ERR, "%s: Resovle event class EV_CLASS_CLASSIFY ERROR!",
00632 __func__);
00633 }
00634
00635 msvcs_plugin_subscribe_data_events(balance_pid, classify_ev_class,
00636 MSVCS_GET_EVENT_MASK(EV_CLASSIFY_FIRST_PACKET));
00637
00638
00639 svr_group_head = msp_shm_alloc(ctrl_ctx->policy_shm_handle,
00640 sizeof(*svr_group_head));
00641 if (svr_group_head == NULL) {
00642 msp_log(LOG_ERR, "%s: Allocate memory ERROR!", __func__);
00643 break;
00644 }
00645 LIST_INIT(svr_group_head);
00646 svr_group_count = 0;
00647 msp_spinlock_init(&svc_set_lock);
00648 msp_spinlock_init(&svr_group_lock);
00649 connect_state = CONNECT_NA;
00650
00651
00652 evInitID(&ev_timer_id);
00653 if (evSetTimer(*ctx->scc_ev_ctxt, upload_status, NULL,
00654 evAddTime(evNowTime(), evConsTime(STATUS_UPDATE_INTERVAL, 0)),
00655 evConsTime(STATUS_UPDATE_INTERVAL, 0), &ev_timer_id) < 0) {
00656 msp_log(LOG_ERR, "%s: Schedule sending status ERROR!", __func__);
00657 }
00658
00659 break;
00660
00661 case MSVCS_CONTROL_EV_CFG_BLOB:
00662 msp_log(LOG_INFO, "%s: MSVCS_CONTROL_EV_CFG_BLOB", __func__);
00663 read_gencfg_blob(ctx->plugin_data);
00664 break;
00665
00666 default:
00667 msp_log(LOG_ERR, "%s: Unknown event!", __func__);
00668 }
00669 return 0;
00670 }
00671