ipprobe-mt_thrd.c

00001 /*
00002  * $Id: ipprobe-mt_thrd.c 347265 2009-11-19 13:55:39Z kdickman $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  * 
00010  * Copyright (c) 2009, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00014 #include <unistd.h>
00015 #include <string.h>
00016 #include <signal.h>
00017 #include <pthread.h>
00018 #include <errno.h>
00019 #include <sys/types.h>
00020 #include <sys/queue.h>
00021 #include <sys/socket.h>
00022 #include <isc/eventlib.h>
00023 #include <netinet/in.h>
00024 #include <netinet/in_systm.h>
00025 #include <netinet/ip.h>
00026 #include <netinet/udp.h>
00027 #include <netinet/ip_icmp.h>
00028 #include <arpa/inet.h>
00029 #include <jnx/aux_types.h>
00030 #include <jnx/bits.h>
00031 #include <jnx/patricia.h>
00032 #include <jnx/pconn.h>
00033 #include <jnx/junos_trace.h>
00034 #include "ipprobe-mt.h"
00035 #include IPPROBE_MT_OUT_H
00036 
00037 extern probe_mngr_t probe_mngr;
00038 extern rspd_mngr_t rspd_mngr;
00039 
00051 static u_short
00052 icmp_cksum (void *buf, int len)
00053 {
00054     u_char *p = (u_char *)buf;
00055     uint sum = 0;
00056 
00057     while (len > 1) {
00058         sum += (*p << 8) + *(p + 1);
00059         len -= 2;
00060         p += 2;
00061     }
00062     if (len == 1) {
00063         sum += (*p << 8);
00064     }
00065 
00066     while (sum >> 16)
00067         sum = (sum & 0xFFFF) + (sum >> 16);
00068 
00069     sum = ~sum;
00070 
00071     return(htons((u_short)sum));
00072 }
00073 
00089 static u_short
00090 udp_cksum (u_short len, void *src_addr, void *dst_addr, void *buf)
00091 {
00092     uint sum = 0;
00093     u_char *p = NULL;
00094 
00095     p = (u_char *)src_addr;
00096     sum += (*p << 8) + *(p + 1);
00097     sum += (*(p + 2) << 8) + *(p + 3);
00098 
00099     p = (u_char *)dst_addr;
00100     sum += (*p << 8) + *(p + 1);
00101     sum += (*(p + 2) << 8) + *(p + 3);
00102 
00103     sum += IPPROTO_UDP + len;
00104 
00105     p = (u_char *)buf;
00106     while (len > 1) {
00107         sum += (*p << 8) + *(p + 1);
00108         len -= 2;
00109         p += 2;
00110     }
00111     if (len == 1) {
00112         sum += (*p << 8);
00113     }
00114 
00115     while (sum >> 16)
00116         sum = (sum & 0xFFFF) + (sum >> 16);
00117 
00118     sum = ~sum;
00119 
00120     return(htons((u_short)sum));
00121 }
00122 
00134 static float
00135 time_diff (struct timeval *time1, struct timeval *time2)
00136 {
00137     float t1, t2;
00138 
00139     t1 = time1->tv_sec * 1000 + (float)time1->tv_usec / 1000;
00140     t2 = time2->tv_sec * 1000 + (float)time2->tv_usec / 1000;
00141     return (t2 - t1);
00142 }
00143 
00153 static void
00154 process_probe_pkts (probe_t *probe, probe_dst_t *dst)
00155 {
00156     pkt_stats_t *pkt_stats;
00157     probe_rx_buf_t *rx_buf;
00158     probe_pkt_t *pkt;
00159     probe_pkt_data_t *data;
00160     probe_pkt_data_t *data_prev = NULL;
00161     int i;
00162     float max_delay_sd = 0;
00163     float max_delay_ds = 0;
00164     float max_delay_rr = 0;
00165     float sum_delay_sd = 0;
00166     float sum_delay_ds = 0;
00167     float sum_delay_rr = 0;
00168     float max_jitter_sd = 0;
00169     float max_jitter_ds = 0;
00170     float max_jitter_rr = 0;
00171     float sum_jitter_sd = 0;
00172     float sum_jitter_ds = 0;
00173     float sum_jitter_rr = 0;
00174 
00175     PROBE_TRACE(PROBE_TF_THRD, "%s: Process %d replied packets from 0x%08x",
00176             __func__, dst->rx_count, dst->dst_addr);
00177     pkt_stats = calloc(dst->rx_count, sizeof(*pkt_stats));
00178     INSIST_ERR(pkt_stats != NULL);
00179 
00180     i = 0;
00181     LIST_FOREACH(rx_buf, &dst->rx_buf_list, entry) {
00182         if (probe->params.proto == IPPROTO_UDP) {
00183             data = (probe_pkt_data_t *)rx_buf->pkt;
00184         } else {
00185             pkt = (probe_pkt_t *)rx_buf->pkt;
00186             data = &pkt->data;
00187         }
00188         if (probe->params.proto != IPPROTO_ICMP) {
00189             pkt_stats[i].delay_sd = time_diff(&data->tx_time,
00190                     &data->target_rx_time);
00191 
00192             pkt_stats[i].delay_ds = time_diff(&data->target_tx_time,
00193                     &data->rx_time);
00194         }
00195         pkt_stats[i].rrt = time_diff(&data->tx_time, &data->rx_time);
00196 
00197         if (rx_buf == LIST_FIRST(&dst->rx_buf_list)) {
00198             goto continue_loop;
00199         }
00200         if (probe->params.proto != IPPROTO_ICMP) {
00201             pkt_stats[i].jitter_sd = time_diff(&data_prev->target_rx_time,
00202                     &data->target_rx_time) -
00203                     time_diff(&data_prev->tx_time, &data->tx_time);
00204 
00205             pkt_stats[i].jitter_ds = time_diff(&data_prev->rx_time,
00206                     &data->rx_time) -
00207                     time_diff(&data_prev->target_tx_time,
00208                     &data->target_tx_time);
00209         }
00210         pkt_stats[i].jitter_rr = time_diff(&data_prev->rx_time,
00211                 &data->rx_time) -
00212                 time_diff(&data_prev->tx_time, &data->tx_time);
00213 continue_loop:
00214         data_prev = data;
00215         i++;
00216     }
00217 
00218     for (i = 0; i < dst->rx_count; i++) {
00219         if (pkt_stats[i].delay_sd > max_delay_sd) {
00220             max_delay_sd = pkt_stats[i].delay_sd;
00221         }
00222         if (pkt_stats[i].delay_ds > max_delay_ds) {
00223             max_delay_ds = pkt_stats[i].delay_ds;
00224         }
00225         if (pkt_stats[i].rrt > max_delay_rr) {
00226             max_delay_rr = pkt_stats[i].rrt;
00227         }
00228         if (abs(pkt_stats[i].jitter_sd) > abs(max_jitter_sd)) {
00229             max_jitter_sd = pkt_stats[i].jitter_sd;
00230         }
00231         if (abs(pkt_stats[i].jitter_ds) > abs(max_jitter_ds)) {
00232             max_jitter_ds = pkt_stats[i].jitter_ds;
00233         }
00234         if (abs(pkt_stats[i].jitter_rr) > abs(max_jitter_rr)) {
00235             max_jitter_rr = pkt_stats[i].jitter_rr;
00236         }
00237         sum_delay_sd += pkt_stats[i].delay_sd;
00238         sum_delay_ds += pkt_stats[i].delay_ds;
00239         sum_delay_rr += pkt_stats[i].rrt;
00240         sum_jitter_sd += pkt_stats[i].jitter_sd;
00241         sum_jitter_ds += pkt_stats[i].jitter_ds;
00242         sum_jitter_rr += pkt_stats[i].jitter_rr;
00243     }
00244 
00245     dst->result.delay_sd_average = sum_delay_sd / dst->rx_count;
00246     dst->result.delay_ds_average = sum_delay_ds / dst->rx_count;
00247     dst->result.delay_rr_average = sum_delay_rr / dst->rx_count;
00248     dst->result.delay_sd_max = max_delay_sd;
00249     dst->result.delay_ds_max = max_delay_ds;
00250     dst->result.delay_rr_max = max_delay_rr;
00251     dst->result.jitter_sd_average = sum_jitter_sd / (dst->rx_count - 1);
00252     dst->result.jitter_ds_average = sum_jitter_ds / (dst->rx_count - 1);
00253     dst->result.jitter_rr_average = sum_jitter_rr / (dst->rx_count - 1);
00254     dst->result.jitter_sd_max = max_jitter_sd;
00255     dst->result.jitter_ds_max = max_jitter_ds;
00256     dst->result.jitter_rr_max = max_jitter_rr;
00257 
00258     free(pkt_stats);
00259 }
00260 
00268 static void
00269 probe_dst_rx_buf_clear (probe_dst_t *dst)
00270 {
00271     probe_rx_buf_t *rx_buf;
00272 
00273     LIST_FOREACH_SAFE(rx_buf, &dst->rx_buf_list, entry, dst->rx_buf_last) {
00274         LIST_REMOVE(rx_buf, entry);
00275         free(rx_buf);
00276     }
00277 }
00278 
00283 static void
00284 probe_pkt_hdlr (evContext ctx UNUSED, void *uap, int fd UNUSED,
00285         int evmask UNUSED)
00286 {
00287     probe_t *probe = uap;
00288     probe_rx_buf_t *rx_buf;
00289     int recv_len;
00290     struct sockaddr_in addr;
00291     int addr_len;
00292     probe_pkt_t *pkt;
00293     probe_pkt_data_t *data;
00294     probe_dst_t *dst;
00295     struct timeval rx_time;
00296 
00297     gettimeofday(&rx_time, NULL);
00298     rx_buf = calloc(1, sizeof(*rx_buf) + probe->params.pkt_size);
00299     INSIST_ERR(rx_buf);
00300 
00301     recv_len = recvfrom(probe->rx_socket, &rx_buf->pkt, probe->params.pkt_size,
00302             0, (struct sockaddr *)&addr, &addr_len);
00303 
00304     if (recv_len == 0) {
00305         goto ret_err;
00306     } else if(recv_len < 0) {
00307         if (errno != EAGAIN) {
00308             PROBE_LOG(LOG_ERR, "%s: Receive probe packets ERROR!", __func__);
00309         }
00310         goto ret_err;
00311     }
00312 
00313     if (probe->params.proto == IPPROTO_UDP) {
00314         data = (probe_pkt_data_t *)rx_buf->pkt;
00315     } else {
00316         pkt = (probe_pkt_t *)rx_buf->pkt;
00317         data = &pkt->data;
00318         if ((probe->params.proto == IPPROTO_ICMP)
00319                 && (pkt->icmp.icmp_type != ICMP_ECHOREPLY)) {
00320             goto ret_err;
00321         }
00322     }
00323     bcopy(&rx_time, &data->rx_time, sizeof(struct timeval));
00324 
00325     PROBE_TRACE(PROBE_TF_THRD, "%s: protocol %d, type %d, seq %d, src 0x%08x.",
00326             __func__, probe->params.proto, data->type, data->seq,
00327             addr.sin_addr.s_addr);
00328 
00329     dst = (probe_dst_t *)patricia_get(&probe->dst_pat, sizeof(in_addr_t),
00330             &addr.sin_addr.s_addr);
00331     if (!dst) {
00332         PROBE_TRACE(PROBE_TF_THRD, "%s: Dst 0x%08x does not exits!",
00333                 __func__, addr.sin_addr.s_addr);
00334         goto ret_err;
00335     }
00336     if (dst->rx_buf_last) {
00337         LIST_INSERT_AFTER(dst->rx_buf_last, rx_buf, entry);
00338     } else {
00339         LIST_INSERT_HEAD(&dst->rx_buf_list, rx_buf, entry);
00340     }
00341     dst->rx_buf_last = rx_buf;
00342 
00343     if (++dst->rx_count == probe->params.pkt_count) {
00344         process_probe_pkts(probe, dst);
00345         dst->state = PROBE_DST_STATE_DONE;
00346         probe_dst_rx_buf_clear(dst);
00347         PROBE_TRACE(PROBE_TF_THRD, "%s: Destination count %d.",
00348                 __func__, probe->dst_count - 1);
00349 
00350         /* Decrement the counter for running destinations and
00351          * exit if there is no destination running.
00352          */
00353         if (--probe->dst_count == 0) {
00354             pthread_exit(NULL);
00355         }
00356     }
00357     return;
00358 
00359 ret_err:
00360     free(rx_buf);
00361     return;
00362 }
00363 
00368 static void
00369 send_packet (evContext ctx UNUSED, void *uap, struct timespec due UNUSED,
00370         struct timespec inter UNUSED)
00371 {
00372     probe_t *probe = uap;
00373     probe_pkt_t *tx_pkt = (probe_pkt_t *)probe->tx_buf;
00374     struct sockaddr_in dst_addr;
00375     probe_dst_t *dst;
00376     int send_len;
00377     patnode *node;
00378 
00379     do {
00380         bzero(&dst_addr, sizeof(dst_addr));
00381         dst_addr.sin_family = AF_INET;
00382         tx_pkt->data.seq = probe->tx_count;
00383         if (probe->tx_count == probe->params.pkt_count - 1) {
00384             tx_pkt->data.type = PROBE_PKT_REQ_LAST;
00385         }
00386 
00387         node = NULL;
00388         while ((node = patricia_find_next(&probe->dst_pat, node))) {
00389             gettimeofday(&tx_pkt->data.tx_time, NULL);
00390             dst = (probe_dst_t *)node;
00391             if (dst->state == PROBE_DST_STATE_INIT) {
00392                 PROBE_TRACE(PROBE_TF_THRD, "%s: dst 0x%08x is in init mode.",
00393                         __func__, dst);
00394                 continue;
00395             }
00396             tx_pkt->header.ip_dst.s_addr = dst->dst_addr;
00397             tx_pkt->header.ip_src.s_addr = dst->local_addr;
00398             if (probe->params.proto == IPPROTO_ICMP) {
00399                 tx_pkt->icmp.icmp_cksum = 0;
00400                 tx_pkt->icmp.icmp_cksum = icmp_cksum(&tx_pkt->icmp,
00401                         probe->params.pkt_size - sizeof(struct ip));
00402             } else if (probe->params.proto == IPPROTO_UDP) {
00403                 tx_pkt->udp.uh_sum = 0;
00404                 tx_pkt->udp.uh_sum = udp_cksum(ntohs(tx_pkt->udp.uh_ulen),
00405                         &tx_pkt->header.ip_src.s_addr,
00406                         &tx_pkt->header.ip_dst.s_addr,
00407                         &tx_pkt->udp);
00408             }
00409             dst_addr.sin_addr.s_addr = dst->dst_addr;
00410             send_len = sendto(probe->tx_socket, tx_pkt, probe->params.pkt_size,
00411                     0, (struct sockaddr *)&dst_addr, sizeof(dst_addr));
00412             if (send_len < 0) {
00413                 PROBE_LOG(LOG_ERR, "Send probe packet ERROR!");
00414             }
00415             PROBE_TRACE(PROBE_TF_THRD, "%s: Sent packet to %s, dst(0x%08x)",
00416                     __func__, inet_ntoa(dst_addr.sin_addr), dst);
00417         }
00418 
00419         if (++probe->tx_count == probe->params.pkt_count) {
00420 
00421             /* Disable the timer. */
00422             evClearTimer(probe->ev_ctx, probe->tx_tid);
00423             evInitID(&probe->tx_tid);
00424             PROBE_TRACE(PROBE_TF_THRD, "%s: Sent %d probe packets to %s.",
00425                     __func__, probe->tx_count, inet_ntoa(dst_addr.sin_addr));
00426             break;
00427         }
00428     } while (probe->params.pkt_interval == 0);
00429 }
00430 
00438 static void
00439 probe_tx_socket_close (probe_t *probe)
00440 {
00441     if (evTestID(probe->tx_tid)) {
00442         evClearTimer(probe->ev_ctx, probe->tx_tid);
00443         evInitID(&probe->tx_tid);
00444     }
00445     if (probe->tx_socket >= 0) {
00446         close(probe->tx_socket);
00447         probe->tx_socket = -1;
00448     }
00449     if (probe->tx_buf) {
00450         free(probe->tx_buf);
00451         probe->tx_buf = NULL;
00452     }
00453 }
00454 
00464 static int
00465 probe_tx_socket_open (probe_t *probe)
00466 {
00467     const int on = 1;
00468 
00469     /* Create TX socket for sending probe packets. */
00470     probe->tx_socket = socket(AF_INET, SOCK_RAW, probe->params.proto);
00471     if (probe->tx_socket < 0) {
00472         PROBE_LOG(LOG_ERR, "Create probe TX socket ERROR!");
00473         goto ret_err;
00474     }
00475 
00476     if (setsockopt(probe->tx_socket, IPPROTO_IP, IP_HDRINCL, &on,
00477             sizeof(on)) < 0) {
00478         PROBE_LOG(LOG_ERR, "Set probe transmit socket IP_HDRINCL ERROR!");
00479         goto ret_err;
00480     }
00481     probe->tx_buf = calloc(1, probe->params.pkt_size);
00482     INSIST_ERR(probe->tx_buf);
00483 
00484     PROBE_TRACE(PROBE_TF_THRD, "%s: Tx socket is created on protocol %d.",
00485             __func__, probe->params.proto);
00486     return 0;
00487 
00488 ret_err:
00489     probe_tx_socket_close(probe);
00490     return -1;
00491 }
00492 
00500 static void
00501 probe_rx_socket_close (probe_t *probe)
00502 {
00503     patnode *node;
00504     probe_dst_t *dst;
00505 
00506     if (evTestID(probe->rx_fid)) {
00507         evDeselectFD(probe->ev_ctx, probe->rx_fid);
00508         evInitID(&probe->rx_fid);
00509     }
00510     if (probe->rx_socket >= 0) {
00511         close(probe->rx_socket);
00512         probe->rx_socket = -1;
00513     }
00514     node = NULL;
00515     while ((node = patricia_find_next(&probe->dst_pat, node))) {
00516         dst = (probe_dst_t *)node;
00517         probe_dst_rx_buf_clear(dst);
00518     }
00519 }
00520 
00532 static int
00533 probe_rx_socket_open (probe_t *probe, probe_dst_t *dst)
00534 {
00535     const int on = 1;
00536     struct sockaddr_in addr;
00537 
00538     /* Create RX socket. */
00539     if (probe->params.proto == IPPROTO_UDP) {
00540         probe->rx_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00541         if (probe->rx_socket < 0) {
00542             PROBE_LOG(LOG_ERR, "Create probe receive socket ERROR!");
00543             goto ret_err;
00544         }
00545         if (setsockopt(probe->rx_socket, SOL_SOCKET, SO_REUSEADDR, &on,
00546                 sizeof(on)) < 0) {
00547             PROBE_LOG(LOG_ERR, "Set probe receive socket option ERROR!");
00548             goto ret_err;
00549         }
00550         bzero(&addr, sizeof(addr));
00551         addr.sin_family = AF_INET;
00552         addr.sin_port = htons(probe->params.src_port);
00553         addr.sin_addr.s_addr = htonl(INADDR_ANY);
00554         if (bind(probe->rx_socket, (struct sockaddr *)&addr, sizeof(addr))
00555                 < 0) {
00556             PROBE_LOG(LOG_ERR, "Bind probe receive socket ERROR!");
00557             goto ret_err;
00558         }
00559         PROBE_TRACE(PROBE_TF_THRD, "%s: Rx socket is created on UDP port %d.",
00560                 __func__, probe->params.src_port);
00561     } else {
00562         probe->rx_socket = dup(probe->tx_socket);
00563         PROBE_TRACE(PROBE_TF_THRD, "%s: Rx socket is created on protocol %d.",
00564                 __func__, probe->params.proto);
00565     }
00566     LIST_INIT(&dst->rx_buf_list);
00567     dst->rx_buf_last = NULL;
00568     dst->rx_count = 0;
00569 
00570     evInitID(&probe->rx_fid);
00571     if (evSelectFD(probe->ev_ctx, probe->rx_socket, EV_READ, probe_pkt_hdlr,
00572             probe, &probe->rx_fid) < 0) {
00573         PROBE_LOG(LOG_ERR, "Add probe receive socket!\n");
00574         goto ret_err;
00575     }
00576     return 0;
00577 
00578 ret_err:
00579     probe_rx_socket_close(probe);
00580     return -1;
00581 }
00582 
00592 static int
00593 probe_start (probe_t *probe)
00594 {
00595     probe_pkt_t *tx_pkt = (probe_pkt_t *)probe->tx_buf;
00596 
00597     tx_pkt->header.ip_v = IPVERSION;
00598     tx_pkt->header.ip_hl = IP_HEADER_LEN >> 2;
00599     tx_pkt->header.ip_tos = probe->params.tos;
00600     tx_pkt->header.ip_len = htons(probe->params.pkt_size);
00601     tx_pkt->header.ip_ttl = PROBE_PKT_TTL_DEFAULT;
00602     tx_pkt->header.ip_p = probe->params.proto;
00603 
00604     if (probe->params.proto == IPPROTO_ICMP) {
00605         tx_pkt->icmp.icmp_type = ICMP_ECHO;
00606         tx_pkt->icmp.icmp_code = 0;
00607         tx_pkt->data.type = PROBE_PKT_REPLY;
00608     } else if (probe->params.proto == IPPROTO_UDP) {
00609         tx_pkt->udp.uh_sport = htons(probe->params.src_port);
00610         tx_pkt->udp.uh_dport = htons(probe->params.dst_port);
00611         tx_pkt->udp.uh_ulen = htons(probe->params.pkt_size - sizeof(struct ip));
00612         tx_pkt->data.type = PROBE_PKT_REQ;
00613     } else {
00614         tx_pkt->data.type = PROBE_PKT_REQ;
00615     }
00616     probe->tx_count = 0;
00617 
00618     if (evSetTimer(probe->ev_ctx, send_packet, probe, evConsTime(0, 0),
00619             evConsTime(0, probe->params.pkt_interval * 1000000),
00620             &probe->tx_tid) < 0) {
00621         PROBE_LOG(LOG_ERR, "Set timer to schedule sending packets ERROR!");
00622         return -1;
00623     }
00624     return 0;
00625 }
00626 
00634 static void
00635 rspd_mgmt_close (probe_t *probe)
00636 {
00637     if (evTestID(probe->rspd_read_fid)) {
00638         evDeselectFD(probe->ev_ctx, probe->rspd_read_fid);
00639         evInitID(&probe->rspd_read_fid);
00640     }
00641     if (probe->rspd_socket >= 0) {
00642         close(probe->rspd_socket);
00643         probe->rspd_socket = -1;
00644     }
00645 }
00646 
00651 static void
00652 rspd_mgmt_pkt_hdlr (evContext lev_ctx UNUSED, void *uap, int fd,
00653         int eventmask UNUSED)
00654 {
00655     probe_t *probe = uap;
00656     struct sockaddr_in dst_addr;
00657     int recv_len = 0;
00658     socklen_t addr_len = sizeof(dst_addr);
00659     rspd_mgmt_pkt_t rx_pkt;
00660     probe_dst_t *dst;
00661 
00662     recv_len = read(fd, &rx_pkt, sizeof(rx_pkt));
00663 
00664     if (recv_len > 0) {
00665         if (recv_len == sizeof(rspd_mgmt_pkt_t)) {
00666             if (rx_pkt.type == RSPD_MGMT_MSG_ACK) {
00667                 getpeername(fd, (struct sockaddr *)&dst_addr, &addr_len);
00668 
00669                 PROBE_TRACE(PROBE_TF_THRD, "%s: Received ACK from %s, "
00670                         "start probe.",
00671                         __func__, inet_ntoa(dst_addr.sin_addr));
00672                 rspd_mgmt_close(probe);
00673                 dst = (probe_dst_t *)patricia_get(&probe->dst_pat,
00674                         sizeof(in_addr_t), &dst_addr.sin_addr.s_addr);
00675                 if (dst) {
00676                     dst->state = PROBE_DST_STATE_RUN;
00677                     probe_tx_socket_open(probe);
00678                     probe_rx_socket_open(probe, dst);
00679                     probe_start(probe);
00680                 } else {
00681                     PROBE_LOG(LOG_ERR, "%s: Destination does not exist!",
00682                             __func__);
00683                 }
00684             }
00685         } else {
00686             PROBE_LOG(LOG_ERR, "%s: Received fragment!", __func__);
00687         }
00688     } else if (recv_len < 0) {
00689         if (errno != EAGAIN) {
00690             PROBE_LOG(LOG_ERR, "%s, Read socket ERROR(%d)!",
00691                     __func__, errno);
00692         }
00693     }
00694 }
00695 
00707 static int
00708 rspd_mgmt_open (probe_t *probe, probe_dst_t *dst)
00709 {
00710     rspd_mgmt_pkt_t pkt;
00711     struct sockaddr_in rspd_addr;
00712     struct sockaddr_in local_addr;
00713     socklen_t len;
00714 
00715     PROBE_TRACE(PROBE_TF_THRD, "%s: Open socket to responder manager.",
00716             __func__);
00717     PROBE_TRACE(PROBE_TF_THRD, "%s: protocol %d, dst_port %d, src_port %d, "
00718             "size %d, count %d, interval %d.", __func__,
00719             probe->params.proto, probe->params.dst_port,
00720             probe->params.src_port, probe->params.pkt_size,
00721             probe->params.pkt_count, probe->params.pkt_interval);
00722 
00723     evInitID(&probe->rspd_read_fid);
00724     probe->rspd_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00725     if (probe->rspd_socket < 0) {
00726         PROBE_LOG(LOG_ERR, "%s: Open socket to responder manager ERROR(%d)!",
00727                 __func__, errno);
00728         goto ret_err;
00729     }
00730 
00731     bzero(&rspd_addr, sizeof(rspd_addr));
00732     rspd_addr.sin_family = AF_INET;
00733     rspd_addr.sin_port = htons(rspd_mngr.port);
00734     rspd_addr.sin_addr.s_addr = dst->dst_addr;
00735 
00736     /* Find out the local address to this destination. */
00737     connect(probe->rspd_socket, (struct sockaddr *)&rspd_addr,
00738             sizeof(rspd_addr));
00739     len = sizeof(local_addr);
00740     getsockname(probe->rspd_socket, (struct sockaddr *)&local_addr, &len);
00741     dst->local_addr = local_addr.sin_addr.s_addr;
00742     PROBE_TRACE(PROBE_TF_THRD, "%s: Local address %s.",
00743             __func__, inet_ntoa(local_addr.sin_addr));
00744 
00745     if (evSelectFD(probe->ev_ctx, probe->rspd_socket, EV_READ,
00746             rspd_mgmt_pkt_hdlr, probe, &probe->rspd_read_fid)) {
00747         PROBE_LOG(LOG_ERR,
00748                 "%s: evSelectFD socket to responder manager ERROR(%d)!",
00749                 __func__, errno);
00750         goto ret_err;
00751     }
00752 
00753     pkt.type = RSPD_MGMT_MSG_REQ;
00754     pkt.port = probe->params.dst_port;
00755     pkt.proto = probe->params.proto;
00756     write(probe->rspd_socket, &pkt, sizeof(pkt));
00757 
00758     return 0;
00759 
00760 ret_err:
00761     rspd_mgmt_close(probe);
00762     return -1;
00763 }
00764 
00778 static status_t
00779 client_msg_hdlr (pconn_client_t *client UNUSED, ipc_msg_t *msg, void *cookie)
00780 {
00781     probe_t *probe = cookie;
00782     in_addr_t addr;
00783     probe_dst_t *dst;
00784 
00785     if (msg->subtype == PROBE_MGMT_MSG_ADD_DST) {
00786         addr = *((in_addr_t *)msg->data);
00787         PROBE_TRACE(PROBE_TF_THRD, "%s: Got request to add dst 0x%08x.",
00788                __func__, addr);
00789         PROBE_TRACE(PROBE_TF_THRD, "%s: probe 0x%08x, protocol %d, port %d, "
00790                 "size %d, count %d, interval %d.", __func__,
00791                 probe, probe->params.proto, probe->params.dst_port,
00792                 probe->params.pkt_size, probe->params.pkt_count,
00793                 probe->params.pkt_interval);
00794         dst = (probe_dst_t *)patricia_get(&probe->dst_pat, sizeof(in_addr_t),
00795                 &addr);
00796         if (dst) {
00797             if (dst->state == PROBE_DST_STATE_RUN ||
00798                     dst->state == PROBE_DST_STATE_INIT) {
00799                 PROBE_TRACE(PROBE_TF_THRD,
00800                         "%s: The probe to 0x%08x is running or initilaizing.",
00801                         __func__, addr);
00802                 return 0;
00803             }
00804         } else {
00805             dst = calloc(1, sizeof(*dst));
00806             INSIST_ERR(dst);
00807             dst->dst_addr = addr;
00808             dst->state = PROBE_DST_STATE_INIT;
00809             patricia_node_init_length(&dst->node, sizeof(in_addr_t));
00810             patricia_add(&probe->dst_pat, &dst->node);
00811         }
00812         probe->dst_count++;
00813 
00814         if (probe->params.proto == IPPROTO_ICMP) {
00815 
00816             /* ICMP probing doesn't need responder. */
00817             dst->state = PROBE_DST_STATE_RUN;
00818             probe_tx_socket_open(probe);
00819             probe_rx_socket_open(probe, dst);
00820             probe_start(probe);
00821         } else {
00822             rspd_mgmt_open(probe, dst);
00823         }
00824     }
00825     return 0;
00826 }
00827 
00839 static void
00840 client_event_hdlr (pconn_client_t *client, pconn_event_t event, void *cookie)
00841 {
00842     probe_t *probe = cookie;
00843 
00844     switch (event) {
00845     case PCONN_EVENT_ESTABLISHED:
00846         PROBE_TRACE(PROBE_TF_THRD, "%s: Connected to the probe manager.",
00847                 __func__);
00848         pconn_client_send(client, PROBE_MGMT_MSG_REG, probe->params.name,
00849                 sizeof(probe->params.name));
00850         break;
00851     case PCONN_EVENT_SHUTDOWN:
00852         PROBE_TRACE(PROBE_TF_THRD, "%s: Connection is down.", __func__);
00853         break;
00854     case PCONN_EVENT_FAILED:
00855         PROBE_TRACE(PROBE_TF_THRD, "%s: Connect ERROR.", __func__);
00856         pthread_exit(NULL);
00857         break;
00858     default:
00859         PROBE_LOG(LOG_ERR, "%s: Unknown event %d.", __func__, event);
00860     }
00861 }
00862 
00870 static void
00871 probe_thrd_cleanup (void *arg)
00872 {
00873     probe_t *probe = arg;
00874 
00875     PROBE_TRACE(PROBE_TF_THRD, "%s: Thread cleanup and exit.", __func__);
00876     rspd_mgmt_close(probe);
00877     if (probe->client_hdl) {
00878         pconn_client_close(probe->client_hdl);
00879         probe->client_hdl = NULL;
00880     }
00881     probe_tx_socket_close(probe);
00882     probe_rx_socket_close(probe);
00883     evDestroy(probe->ev_ctx);
00884     probe->tid = NULL;
00885 }
00886 
00894 void *
00895 probe_thrd_entry (probe_t *probe)
00896 {
00897     pconn_client_params_t params;
00898     int old_val;
00899 
00900     PROBE_TRACE(PROBE_TF_THRD, "%s: Probe thread %s started.",
00901             __func__, probe->params.name);
00902     PROBE_TRACE(PROBE_TF_THRD, "%s: probe 0x%08x, protocol %d, port %d, "
00903             "size %d, count %d, interval %d.", __func__,
00904             probe, probe->params.proto, probe->params.dst_port,
00905             probe->params.pkt_size, probe->params.pkt_count,
00906             probe->params.pkt_interval);
00907     if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_val) < 0) {
00908         PROBE_LOG(LOG_ERR, "%s: Set cancel state ERROR(%d)!", __func__, errno);
00909     }
00910     if (pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_val) < 0) {
00911         PROBE_LOG(LOG_ERR, "%s: Set cancel type ERROR(%d)!", __func__, errno);
00912     }
00913     pthread_cleanup_push(probe_thrd_cleanup, probe);
00914 
00915     patricia_root_init(&probe->dst_pat, FALSE, sizeof(in_addr_t), 0);
00916     if (evCreate(&probe->ev_ctx) < 0) {
00917         PROBE_LOG(LOG_ERR, "%s: evCreate ERROR(%d)!", __func__, errno);
00918         goto ret_err;
00919     }
00920 
00921     bzero(&params, sizeof(params));
00922     params.pconn_peer_info.ppi_peer_type = PCONN_PEER_TYPE_RE;
00923     params.pconn_port = probe_mngr.port;
00924     params.pconn_num_retries = CONN_RETRY_DEFAULT;
00925     params.pconn_event_handler = client_event_hdlr;
00926     probe->client_hdl = pconn_client_connect_async(&params, probe->ev_ctx,
00927             client_msg_hdlr, probe);
00928     if (!probe->client_hdl) {
00929         PROBE_LOG(LOG_ERR, "%s: Connect to probe manager ERROR!", __func__);
00930         goto ret_err;
00931     }
00932 
00933     evMainLoopSyncSighdl(probe->ev_ctx);
00934 
00935 ret_err:
00936     PROBE_LOG(LOG_ERR, "%s: Probe thread exits with ERROR!", __func__);
00937     return NULL;
00938 }
00939 

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:00 2010 for SDK Your Net Corporation IP Probe MT: ipprobe-mt 1.0 by Doxygen 1.5.1