monitube-data_ha.c

00001 /*
00002  * $Id: monitube-data_ha.c 346460 2009-11-14 05:06:47Z ssiano $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2008, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00023 #include "monitube-data_main.h"
00024 #include "monitube-data_ha.h"
00025 #include "monitube-data_packet.h"
00026 #include <jnx/junos_sync.h>
00027 
00028 /*** Constants ***/
00029 
00030 #define MONITUBE_DC_REPLICATION_PORT 39079  
00031 
00032 #define MONITUBE_DC_REPLICATION_KA 30       
00033 
00034 /*** Data structures ***/
00035 
00036 static junos_sync_state_t     rep_state;     
00037 static junos_sync_callbacks_t rep_callbacks; 
00038 static junos_sync_context_t * rep_ctx;       
00039 
00040 
00041 /*** STATIC/INTERNAL Functions ***/
00042 
00043 
00050 static void
00051 replication_data_hton(replication_data_t * data)
00052 {
00053     data->bucket = htonl(data->bucket);
00054     data->dport = htons(data->dport);
00055     data->age_ts = htonl(data->age_ts);
00056     data->ssrc = htonl(data->ssrc);
00057     data->rate = htonl(data->rate);
00058     data->m_vrf = htonl(data->m_vrf);
00059     data->source.max_seq = htons(data->source.max_seq);
00060     data->source.cycles = htonl(data->source.cycles);
00061     data->source.base_seq = htonl(data->source.base_seq);
00062     data->source.bad_seq = htonl(data->source.bad_seq);
00063     data->source.probation = htonl(data->source.probation);
00064     data->source.received = htonl(data->source.received);
00065     data->source.expected_prior = htonl(data->source.expected_prior);
00066     data->source.received_prior = htonl(data->source.received_prior);
00067 }
00068 
00069 
00076 static void
00077 replication_data_ntoh(replication_data_t * data)
00078 {
00079     data->bucket = ntohl(data->bucket);
00080     data->dport = ntohs(data->dport);
00081     data->age_ts = ntohl(data->age_ts);
00082     data->ssrc = ntohl(data->ssrc);
00083     data->rate = ntohl(data->rate);
00084     data->m_vrf = ntohl(data->m_vrf);
00085     data->source.max_seq = ntohs(data->source.max_seq);
00086     data->source.cycles = ntohl(data->source.cycles);
00087     data->source.base_seq = ntohl(data->source.base_seq);
00088     data->source.bad_seq = ntohl(data->source.bad_seq);
00089     data->source.probation = ntohl(data->source.probation);
00090     data->source.received = ntohl(data->source.received);
00091     data->source.expected_prior = ntohl(data->source.expected_prior);
00092     data->source.received_prior = ntohl(data->source.received_prior);
00093 }
00094 
00095 
00102 static void
00103 delete_replication_data_hton(delete_replication_data_t * data)
00104 {
00105     data->bucket = htonl(data->bucket);
00106     data->dport = htons(data->dport);
00107 }
00108 
00109 
00116 static void
00117 delete_replication_data_ntoh(delete_replication_data_t * data)
00118 {
00119     data->bucket = ntohl(data->bucket);
00120     data->dport = ntohs(data->dport);
00121 }
00122 
00123 
00133 static void
00134 ack_notify(junos_sync_tlv_t * data UNUSED, size_t len UNUSED)
00135 {
00136     LOG(LOG_INFO, "%s: len %d", __func__, len);
00137 }
00138 
00139 
00150 static void
00151 decode(void * data, size_t len)
00152 {
00153     INSIST_ERR(!rep_state.is_master);
00154 
00155     if(len == sizeof(replication_data_t)) {
00156 
00157         LOG(LOG_INFO, "%s: update message of length %d", __func__, len);
00158 
00159         replication_data_ntoh((replication_data_t *)data);
00160         add_flow_state((replication_data_t *)data);
00161 
00162     } else if(len == sizeof(delete_replication_data_t)) {
00163 
00164         LOG(LOG_INFO, "%s: delete message of length %d", __func__, len);
00165 
00166         delete_replication_data_ntoh((delete_replication_data_t *)data);
00167         remove_flow_state((delete_replication_data_t *)data);
00168 
00169     } else {
00170 
00171         LOG(LOG_ERR, "%s: unknown message of length %d", __func__, len);
00172     }
00173 }
00174 
00175 
00188 static void *
00189 get_next(void * current, junos_sync_getnext_data_t * gndata)
00190 {
00191     replication_data_t * next, * last = (replication_data_t *)current;
00192     junos_sync_tlv_t * tlv_data = calloc(1,
00193             sizeof(junos_sync_tlv_t) + sizeof(replication_data_t));
00194 
00195     INSIST_ERR(tlv_data != NULL);
00196 
00197     LOG(LOG_INFO, "%s", __func__);
00198 
00199     tlv_data->tlv_len = sizeof(replication_data_t);
00200     next = (replication_data_t *)&tlv_data->tlv_value;
00201     gndata->data = tlv_data;
00202 
00203     if(get_next_flow_state(last, next) == EFAIL) {
00204         free(tlv_data);
00205         gndata->data = NULL;
00206         return NULL;
00207     }
00208 
00209     replication_data_hton(next);
00210 
00211     return next;
00212 }
00213 
00214 
00221 static void
00222 conn_status_change (junos_sync_conn_status_t conn_status)
00223 {
00224     LOG(LOG_INFO, "%s: replication connection status is %s", __func__,
00225         (conn_status == JUNOS_SYNC_CONN_DOWN) ? "Down" : "Up");
00226 }
00227 
00228 
00235 static void
00236 init_sync_done (size_t num_of_entries_synced)
00237 {
00238     LOG(LOG_INFO, "%s: initial replication is done, %d entries are synced",
00239             __func__, num_of_entries_synced);
00240 }
00241 
00242 
00249 static void
00250 bind_done(in_port_t port)
00251 {
00252     if(port != MONITUBE_DC_REPLICATION_PORT) {
00253         LOG(LOG_EMERG, "%s: master's replication server could not bind "
00254                 "to port %d", __func__, MONITUBE_DC_REPLICATION_PORT);
00255         return;
00256     }
00257 
00258     LOG(LOG_INFO, "%s: master's replication server is bound to port %d",
00259             __func__, port);
00260 }
00261 
00262 
00269 static void
00270 log_msg(const char * format, ...)
00271 {
00272     va_list ap;
00273     va_start(ap, format);
00274     vlogging(LOG_INFO, format, ap);
00275     va_end(ap);
00276 }
00277 
00278 
00282 static void
00283 db_clear(void)
00284 {
00285     LOG(LOG_INFO, "%s", __func__);
00286 }
00287 
00288 
00289 /*** GLOBAL/EXTERNAL Functions ***/
00290 
00291 
00304 status_t
00305 init_replication(in_addr_t master_address, evContext ev_ctx)
00306 {
00307     static bool first_time = true;
00308     int rc;
00309 
00310     if(!first_time) {
00311 
00312         if(rep_state.is_master && master_address != 0) {
00313             // shutdown master server and become slave, but start from scratch
00314             junos_sync_exit(rep_ctx);
00315         } else if(!rep_state.is_master && master_address == 0) {
00316 
00317             rep_state.is_master = true;
00318             rep_state.server_addr = 0;
00319 
00320             // slave can take over as master
00321             junos_sync_switchover(rep_ctx, &rep_state, &rep_callbacks);
00322             return SUCCESS;
00323         }
00324     }
00325 
00326     first_time = false;
00327 
00328     rep_ctx = NULL;
00329     bzero(&rep_state, sizeof(junos_sync_state_t));
00330     bzero(&rep_callbacks, sizeof(junos_sync_callbacks_t));
00331 
00332     rep_state.is_master = (master_address == 0) ? true : false;
00333     rep_state.no_resync = false;
00334     rep_state.server_port = MONITUBE_DC_REPLICATION_PORT;
00335     rep_state.keepalive_time = MONITUBE_DC_REPLICATION_KA;
00336 
00337     if(!rep_state.is_master) {
00338         rep_state.server_addr = master_address;
00339     }
00340 
00341     rep_callbacks.sync_ack_notify_func = ack_notify;
00342     rep_callbacks.sync_data_decode_func = decode;
00343     rep_callbacks.sync_data_get_next_func = get_next;
00344     rep_callbacks.sync_conn_status_change_func = conn_status_change;
00345     rep_callbacks.sync_init_sync_done_func = init_sync_done;
00346     rep_callbacks.sync_bind_done_func = bind_done;
00347     rep_callbacks.sync_log_msg_func = log_msg;
00348     rep_callbacks.sync_malloc_func = malloc;
00349     rep_callbacks.sync_free_func = free;
00350     rep_callbacks.sync_db_clear_func = db_clear;
00351 
00352     rep_ctx = junos_sync_init(&rep_state, &rep_callbacks);
00353 
00354     if (!rep_ctx) {
00355         LOG(LOG_ERR, "%s: Failed to initialize a junos_sync context.",
00356             __func__);
00357         first_time = true;
00358         return EFAIL;
00359     }
00360     
00361     rc = junos_sync_set_event_context(rep_ctx, ev_ctx);
00362     
00363     if (rc != JUNOS_SYNC_OK) {
00364         LOG(LOG_ERR, "%s: Failed to use main event context (err: %d).",
00365             __func__, rc);
00366         junos_sync_exit(rep_ctx);
00367         rep_ctx = NULL;
00368         first_time = true;
00369         return EFAIL;
00370     }
00371     
00372     rc = junos_sync_start(rep_ctx);
00373 
00374     if (rc != JUNOS_SYNC_OK) {
00375         LOG(LOG_ERR, "%s: Failed to start a junos_sync subsystem (err: %d).",
00376             __func__, rc);
00377         junos_sync_exit(rep_ctx);
00378         rep_ctx = NULL;
00379         first_time = true;
00380         return EFAIL;
00381     }
00382     return SUCCESS;
00383 }
00384 
00385 
00392 void
00393 update_replication_entry(replication_data_t * data)
00394 {
00395     replication_data_t * current;
00396     junos_sync_tlv_t * tlv_data;
00397 
00398     if(rep_ctx == NULL)
00399         return;
00400 
00401     LOG(LOG_INFO, "%s", __func__);
00402 
00403     // create a new tlv with the trailing data
00404     tlv_data = calloc(1, sizeof(junos_sync_tlv_t) + sizeof(replication_data_t));
00405     INSIST_ERR(tlv_data != NULL);
00406 
00407     tlv_data->tlv_len = sizeof(replication_data_t);
00408     current = (replication_data_t *)&tlv_data->tlv_value;
00409 
00410     // populate data to enqueue
00411     memcpy(current, data, sizeof(replication_data_t));
00412 
00413     replication_data_hton(current);
00414 
00415     junos_sync_queue(rep_ctx, tlv_data, false);
00416 }
00417 
00418 
00425 void
00426 delete_replication_entry(delete_replication_data_t * data)
00427 {
00428     delete_replication_data_t * current;
00429     junos_sync_tlv_t * tlv_data;
00430 
00431     if(rep_ctx == NULL)
00432         return;
00433 
00434     LOG(LOG_INFO, "%s", __func__);
00435 
00436     // create a new tlv with the trailing data
00437     tlv_data = calloc(1, sizeof(junos_sync_tlv_t) +
00438                             sizeof(delete_replication_data_t));
00439     INSIST_ERR(tlv_data != NULL);
00440 
00441     tlv_data->tlv_len = sizeof(delete_replication_data_t);
00442     current = (delete_replication_data_t *)&tlv_data->tlv_value;
00443 
00444     // populate data to enqueue
00445     memcpy(current, data, sizeof(delete_replication_data_t));
00446 
00447     delete_replication_data_hton(current);
00448 
00449     junos_sync_queue(rep_ctx, tlv_data, false);
00450 }
00451 
00452 
00456 void
00457 stop_replication(void)
00458 {
00459     if(rep_ctx) {
00460         junos_sync_exit(rep_ctx);
00461     }
00462     rep_ctx = NULL;
00463 }

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:02 2010 for SDK Your Net Corporation Monitube IPTV Monitoring Example: monitube-data 1.0 by Doxygen 1.5.1