monitube2-data_packet.c

Go to the documentation of this file.
00001 /*
00002  * $Id: monitube2-data_packet.c 347265 2009-11-19 13:55:39Z kdickman $
00003  *
00004  * This code is provided as is by Juniper Networks SDK Developer Support.
00005  * It is provided with no warranties or guarantees, and Juniper Networks
00006  * will not provide support or maintenance of this code in any fashion.
00007  * The code is provided only to help a developer better understand how
00008  * the SDK can be used.
00009  *
00010  * Copyright (c) 2009, Juniper Networks, Inc.
00011  * All rights reserved.
00012  */
00013 
00021 #include "monitube2-data_main.h"
00022 #include <jnx/msp_hw_ts.h>
00023 #include <sys/jnx/jbuf.h>
00024 #include <netinet/ip.h>
00025 #include <netinet/udp.h>
00026 #include "monitube2-data_config.h"
00027 #include "monitube2-data_conn.h"
00028 #include "monitube2-data_rtp.h"
00029 #include "monitube2-data_packet.h"
00030 
00031 /*** Constants ***/
00032 
00037 #define MPEG_TS_PACKET_BYTES (188)
00038 
00042 #define MPEG_TS_HEADER_BYTES (4)
00043 
00044 /*** Data Structures ***/
00045 
00046 /*** STATIC/INTERNAL Functions ***/
00047 
00070 static void
00071 checksum_adjust(unsigned char * chksum,
00072                 unsigned char * optr, 
00073                 int olen,
00074                 unsigned char * nptr,
00075                 int nlen)
00076 {
00077     long x, old, new_;
00078     x = chksum[0] * 256 + chksum[1];
00079     x = ~x & 0xFFFF;
00080     while (olen) {
00081         old = optr[0] * 256 + optr[1];
00082         optr += 2;
00083         x -= old & 0xffff;
00084         if (x <= 0) {
00085             x--;
00086             x &= 0xffff;
00087         }
00088         olen -= 2;
00089     }
00090     while (nlen) {
00091         new_ = nptr[0] * 256 + nptr[1];
00092         nptr += 2;
00093         x += new_ & 0xffff;
00094         if (x & 0x10000) {
00095             x++;
00096             x &= 0xffff;
00097         }
00098         nlen -= 2;
00099     }
00100     x = ~x & 0xFFFF;
00101     chksum[0] = x / 256;
00102     chksum[1] = x & 0xff;
00103 }
00104 
00123 static void
00124 update_stats_for_flow(rtp_hdr_t * rh,
00125                       flow_entry_t * flow,
00126                       uint16_t length,
00127                       uint16_t ssid,
00128                       uint64_t * fpga_ts)
00129 {
00130     int pl_len;
00131     double rel_time, tmp;
00132     uint64_t ts = *fpga_ts;
00133 
00134     // find rcv'd time relative to base_ts but in seconds w/ 0.0001 precision
00135     if (flow->base_ts < ts) { // hasn't wrapped around yet
00136 
00137         rel_time = (double) (ts - flow->base_ts) / msp_hw_ts64_frequency();
00138 
00139     } else if (flow->base_ts > ts) { // wrapped around
00140 
00141         rel_time = (double) (ts - flow->base_ts + 1 + UQUAD_MAX)
00142                 / msp_hw_ts64_frequency();
00143     } else {
00144         rel_time = 0.000001; // shouldn't happen (base is always before ts 
00145         //                   unless wrapped)
00146     }
00147 
00148     // Check if we are into the next timeframe (base + 1 sec)
00149     if (rel_time > 1.0) {
00150 
00151         // reset the timeframe start to the next second
00152         flow->base_ts += msp_hw_ts64_frequency();
00153         rel_time -= 1.0; // in next interval
00154 
00155         // Check if we are updating the MLR yet
00156 
00157         // these values are discussed in Section A.3 of RFC 1889
00158 
00159         uint32_t extended_max = flow->source.cycles + flow->source.max_seq;
00160         uint32_t expected = extended_max - flow->source.base_seq + 1;
00161         // lost (total) = expected - flow->source.received
00162 
00163         // WRT this interval, since last report
00164         uint32_t expected_interval = expected - flow->source.expected_prior;
00165         uint32_t received_interval = flow->source.received
00166                 - flow->source.received_prior;
00167 
00168         flow->source.expected_prior = expected;
00169         flow->source.received_prior = flow->source.received;
00170 
00171         // lost this interval:
00172         flow->mdi_mlr = expected_interval - received_interval;
00173 
00174         // Calculate the DF, store and save
00175         flow->mdi_df = (flow->vb_max - flow->vb_min) / (double) flow->rate;
00176 
00177         /*
00178          * Really we don't need to save the mdi_df, but we do here anyway
00179          * in case we want to reference it in the future.
00180          */
00181 
00182         // Report the (previous timeframe's) MDI stats (DF and MLR)
00183         // no report if 0, which could happen after slave takes over as master
00184         if (flow->mdi_df != 0.0) {
00185             notify_stat_update(flow->daddr, flow->dport, flow->mdi_df,
00186                     flow->mdi_mlr, ssid);
00187         }
00188 
00189         flow->pl_sum = 0;
00190         flow->vb_max = 0.0;
00191         flow->vb_min = 0.0;
00192         flow->vb_pre = 0.0;
00193         flow->vb_post = 0.0;
00194     }
00195 
00196     // Update information related to the MDI DF
00197 
00198     if (rh->version != RTP_VERSION) {
00199         DLOG(LOG_WARNING, "%s: Found a UDP datagram without a valid "
00200             "RTP header", __func__);
00201         return;
00202     }
00203 
00204     // Update information related to the MDI DF (length)
00205     pl_len = length - (sizeof(rtp_hdr_t) + (rh->cc * 4));
00206 
00207     if (pl_len < 0) {
00208         DLOG(LOG_WARNING, "%s: Found an RTP header without any payload (or "
00209             "an invalid packet for monitoring)", __func__);
00210         return;
00211     }
00212 
00213     // Update information related to the MDI MLR
00214     if (flow->ssrc == rh->ssrc) {
00215         update_seq(&flow->source, rh->seq);
00216     } else {
00217         // don't even know if it's a valid RTP stream, so don't bother
00218         // with the MLR
00219         flow->mdi_mlr = 0;
00220 
00221         // init these to compare to the next packet
00222         flow->ssrc = rh->ssrc;
00223         flow->source.probation = MIN_SEQUENTIAL;
00224         flow->source.max_seq = rh->seq;
00225     }
00226 
00227     pl_len -= (pl_len / MPEG_TS_PACKET_BYTES) * MPEG_TS_HEADER_BYTES;
00228 
00229     if (pl_len < 0) {
00230         DLOG(LOG_WARNING, "%s: Found a UDP/RTP datagram without at least one"
00231             " MPEG TS packet in it (or an invalid packet for monitoring)",
00232                 __func__);
00233         return;
00234     }
00235 
00236     // ... Continue updating information related to the MDI DF
00237 
00238     tmp = (double) flow->rate * rel_time;
00239 
00240     if ((double) flow->pl_sum > tmp) { // want a positive/abs value
00241         flow->vb_pre = (double) flow->pl_sum - tmp;
00242     } else {
00243         flow->vb_pre = tmp - (double) flow->pl_sum;
00244     }
00245 
00246     flow->vb_post = flow->vb_pre + pl_len;
00247     flow->pl_sum += (pl_len << 3); // need bits not bytes *8 = <<3
00248 
00249     if (flow->vb_max == 0 && flow->vb_min == 0) {
00250         // first observed packet in timeframe
00251         flow->vb_max = flow->vb_post;
00252         flow->vb_min = flow->vb_pre;
00253     } else {
00254         // update max and min
00255         if (flow->vb_post > flow->vb_max) {
00256             flow->vb_max = flow->vb_post;
00257         }
00258         if (flow->vb_pre < flow->vb_min) {
00259             flow->vb_min = flow->vb_pre;
00260         }
00261     }
00262 }
00263 
00264 /*** GLOBAL/EXTERNAL Functions ***/
00265 
00280 status_t
00281 pullup_bytes(struct jbuf ** pkt_buf, uint16_t num_bytes)
00282 {
00283     struct jbuf * tmp_buf;
00284 
00285     if (jbuf_particle_get_data_length(*pkt_buf) < num_bytes) {
00286         tmp_buf = jbuf_pullup((*pkt_buf), num_bytes);
00287 
00288         if (!tmp_buf) { // check in case it failed
00289             DLOG(LOG_ERR,
00290                     "%s: jbuf_pullup() of %d failed on jbuf of length %d",
00291                     __func__, num_bytes, jbuf_total_len(*pkt_buf));
00292             return EFAIL;
00293         }
00294 
00295         *pkt_buf = tmp_buf;
00296     }
00297     return SUCCESS;
00298 }
00299 
00313 void
00314 process_packet(struct jbuf * jb,
00315                flow_entry_t * flow,
00316                uint16_t ssid)
00317 {
00318     struct jbuf * jb2;
00319     struct ip * ip_pkt = jbuf_to_d(jb, struct ip *);
00320     struct udphdr * udp_hdr = (struct udphdr *) ((uint32_t *) ip_pkt
00321             + ip_pkt->ip_hl);
00322     uint64_t fpga_ts = jbuf_get_hw_timestamp(jb);
00323 
00324     flow->age_ts = get_current_time();
00325 
00326     if (flow->rate != 0) { // is it monitored
00327 
00328         if (!pullup_bytes(&jb, (ip_pkt->ip_hl * 4) + sizeof(struct udphdr)
00329                 + sizeof(rtp_hdr_t))) {
00330 
00331             // pulled up RTP header
00332 
00333             // refresh pointer into jbuf data for ip and udp header
00334             ip_pkt = jbuf_to_d(jb, struct ip *);
00335             udp_hdr = (struct udphdr *) ((uint32_t *) ip_pkt + ip_pkt->ip_hl);
00336 
00337             update_stats_for_flow((rtp_hdr_t *) ((uint8_t *) udp_hdr
00338                     + sizeof(struct udphdr)), flow, jbuf_total_len(jb)
00339                     - ((ip_pkt->ip_hl * 4) + sizeof(struct udphdr)), ssid,
00340                     &fpga_ts);
00341         } else {
00342             DLOG(LOG_NOTICE, "%s: Couldn't monitor UDP datagram because there "
00343                 "were not enough bytes to form an RTP header", __func__);
00344         }
00345     }
00346 
00347     if (flow->maddr != 0) { // is it mirrored anywhere
00348 
00349         jb2 = jbuf_dup(jb);
00350         if (!jb2) {
00351             DLOG(LOG_ERR, "%s: Failed to dup a packet for mirroring", __func__);
00352             return;
00353         }
00354 
00355         ip_pkt = jbuf_to_d(jb2, struct ip *);
00356         udp_hdr = (struct udphdr *) ((uint32_t *) ip_pkt + ip_pkt->ip_hl);
00357 
00358         // adjust checksums taking new dest IP addresses into account
00359 
00360         checksum_adjust((unsigned char *) &ip_pkt->ip_sum,
00361                 (unsigned char *) &ip_pkt->ip_dst, sizeof(in_addr_t),
00362                 (unsigned char *) &flow->maddr, sizeof(in_addr_t));
00363 
00364         checksum_adjust((unsigned char *) &udp_hdr->uh_sum,
00365                 (unsigned char *) &ip_pkt->ip_dst, sizeof(in_addr_t),
00366                 (unsigned char *) &flow->maddr, sizeof(in_addr_t));
00367 
00368         // change destination address
00369         ip_pkt->ip_dst.s_addr = flow->maddr;
00370 
00371         jbuf_setvrf(jb2, flow->m_vrf);
00372 
00373         if (msp_send_packet(jb2) != MSP_OK) { // send bypasses plug-in chain
00374             DLOG(LOG_ERR,
00375                     "%s: Failed to send a duplicated packet for mirroring",
00376                     __func__);
00377             jbuf_free(jb2);
00378         }
00379     }
00380 }

2007-2009 Juniper Networks, Inc. All rights reserved. The information contained herein is confidential information of Juniper Networks, Inc., and may not be used, disclosed, distributed, modified, or copied without the prior written consent of Juniper Networks, Inc. in an express license. This information is subject to change by Juniper Networks, Inc. Juniper Networks, the Juniper Networks logo, and JUNOS are registered trademarks of Juniper Networks, Inc. in the United States and other countries. All other trademarks, service marks, registered trademarks, or registered service marks are the property of their respective owners.
Generated on Sun May 30 20:27:09 2010 for SDK Your Net Corporation Monitube2 IPTV Monitoring Example: monitube2-plugin 1.0 by Doxygen 1.5.1