Commit f5481a42 authored by 's avatar
Browse files

Packetloss application and exprired flows lib added


git-svn-id: file:///home/svn/mapi/trunk@906 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 864ce0d5
CC = gcc
CFLAGS = -g -O2 -W -Wall
LDFLAGS = -lmapi -lcurses -lrrd
SOURCES=$(wildcard *.c)
TARGETS=$(SOURCES:.c=)
all: $(TARGETS)
% : %.c
$(CC) $(CFLAGS) -o $@ $< $(LDFLAGS)
clean:
rm -rf $(TARGETS) $(TARGETS).cgi $(TARGETS).rrd
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <mapi/expiredflowshash.h>
#include <netinet/in.h>
#include <ncurses.h>
#include <rrd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include "mapi.h"
#define SLEEP_TIME 1 //sleeping time
#define TIMEOUT 500 //time to drop an unmatched flow
#define HASHTABLE_SIZE 4096
#define MAX_INTERFACES 16
#define RRD_FILENAME "packetloss.rrd"
static void terminate();
int fd, fid1;
mapi_results_t *results;
unsigned long long totalpkloss = 0,pkcount0 = 0, pkcount1 = 0;
int with_ncurses = 1;
struct hash_bucket {
struct hash_bucket *next;
struct flow_data data;
unsigned int value, flow;
};
struct hash_bucket *hashtable[HASHTABLE_SIZE];
struct hash_bucket *compare_bucket(struct hash_bucket *bucket, struct flow_data *data);
unsigned long long add_bucket(struct flow_data *data, unsigned int flow);
int compare_ip(ip_addr ip1, ip_addr ip2);
void my_rrd_create(char *filename, int num);
void my_rrd_update(char *, char *);
void my_rrd_graph(char *filename, char *data); /* not used */
int create_cgi(int argc, char **argv);
int main(int argc, char *argv[]){
unsigned int *i, count, j;
unsigned long long pkloss = 0;
char argvs[500];
int row, col;
char sample[512], *p, *tmp1, *tmp2;
signal(SIGINT, terminate);
signal(SIGQUIT, terminate);
signal(SIGTERM, terminate);
if( argc != 3 ) {
fprintf(stderr, "Wrong arguments\n Usage: ./application hostA:interface hostB:interface \n");
exit(1);
}
sprintf(argvs, "%s, %s", argv[1], argv[2]);
tmp1 = strchr(argv[1], ':');
tmp2 = strchr(argv[2], ':');
if( (tmp1 == NULL) || (tmp2 == NULL) ) {
fprintf(stderr, "Error!\n You must provide an interface for each IP!\n");
exit(1);
}
sprintf(argvs, "%s, %s", argv[1], argv[2]);
fd=mapi_create_flow(argvs);
if(fd<=0){
printf("CREATE_FLOW ERROR\n");
exit(0);
}
*tmp1 = *tmp2 = '\0';
sprintf(argvs, "host %s and host %s", argv[1], argv[2]);
*tmp1 = *tmp2 = ':';
fid1=mapi_apply_function(fd,"BPF_FILTER", argvs);
if(fid1<0){
printf("APPLY BPF FILTER ERROR\n");
exit(0);
}
fid1=mapi_apply_function(fd,"EXPIRED_FLOWS");
if(fid1<0){
printf("APPLY EXPIRED_FLOWS FILTER ERROR\n");
exit(0);
}
if(mapi_connect(fd) <0){
printf("MAPI_CONNECT ERROR\n");
exit(0);
}
if(with_ncurses) {
initscr(); // start curses mode
getmaxyx(stdscr,row,col); // get the number of rows and columns
}
my_rrd_create(RRD_FILENAME, 2);
create_cgi(argc, argv);
/* Infinite Loop */
while(1)
{
sleep(SLEEP_TIME);
p = sample;
results = (mapi_results_t*)mapi_read_results(fd,fid1);
for(j = 0; j < 2; j++) {
i = (unsigned int *)(results[j].res);
for(count = *i, i=(unsigned int *)((char *)i+sizeof(unsigned int)); count > 0 ; count--) {
if( (pkloss = add_bucket((struct flow_data *)i, j) )!= 0 ) {
if( j == 0 )
pkcount0 += ((struct flow_data *)i)->packets_count;
else if( j == 1 )
pkcount1 += ((struct flow_data *)i)->packets_count;
i = (unsigned int *)((struct flow_data *)i + 1);
}
}
}
if(with_ncurses) {
mvprintw(row/2-1, 8, "Total packets loss = %lld\n Total packets received = %lld/%lld", totalpkloss, pkcount0, pkcount1);
refresh();
}
p += snprintf(sample, 24, "%u:%lld", (unsigned int)time(0), totalpkloss);
p += snprintf(p, 16, ":0");
p += snprintf(p, 16, ":0");
my_rrd_update(RRD_FILENAME, sample);
}
return 0;
}
void drop_old_flows(void) {
struct hash_bucket *tmp, *prev;
int i;
struct timeval *tp;
unsigned long long current_ts = 0;
tp = (struct timeval *)malloc(sizeof(struct timeval));
if(gettimeofday(tp, NULL) ==0) {
// calculate the current timestamp
current_ts = (((unsigned long long)tp->tv_sec)<<32)+((tp->tv_usec*4295) & 0xffffffff);
}
else {
fprintf(stderr, "Error! gettimeofday failed!\nExiting...\n");
exit(-1);
}
for(i = 0; i < HASHTABLE_SIZE; ++i) {
tmp = hashtable[i];
prev = NULL;
while(tmp != NULL) {
if(current_ts - tmp->data.timestamp > ((unsigned long long)(TIMEOUT))<<32) {
if( prev != NULL )
prev = tmp->next;
free(tmp);
tmp = NULL;
}
tmp = tmp->next;
}
}
free(tp);
return;
}
void terminate() {
int i;
struct hash_bucket *tmp;
if(with_ncurses)
endwin(); /* end curses */
fprintf(stdout, "Terminated\n");
for(i = 0; i < HASHTABLE_SIZE; ++i) {
tmp = hashtable[i];
while(tmp != NULL) {
fprintf(stdout, "flow[%d] : saddr = %d.%d.%d.%d, daddr = %d.%d.%d.%d, sport = %d, dport = %d, packets = %lld\n", tmp->flow, (u_int32_t)((struct flow_data *)&(tmp->data))->saddr.byte1, (u_int32_t)((struct flow_data *)&(tmp->data))->saddr.byte2, (u_int32_t)((struct flow_data *)&(tmp->data))->saddr.byte3, (u_int32_t)((struct flow_data *)&(tmp->data))->saddr.byte4, (u_int32_t)((struct flow_data *)&(tmp->data))->daddr.byte1, (u_int32_t)((struct flow_data *)&(tmp->data))->daddr.byte2, (u_int32_t)((struct flow_data *)&(tmp->data))->daddr.byte3, (u_int32_t)((struct flow_data *)&(tmp->data))->daddr.byte4, ((struct flow_data *)&(tmp->data))->sport, ((struct flow_data *)&(tmp->data))->dport, ((struct flow_data *)&(tmp->data))->packets_count);
tmp = tmp->next;
}
}
if( pkcount0 != 0)
fprintf(stdout, "Total Packets lost = %lld (%.2lf%%)\n", totalpkloss, (float)(((float)(totalpkloss*100))/((float)pkcount1)));
else
fprintf(stdout, "Total Packets lost = %lld\n", totalpkloss);
fprintf(stdout, "Flow[0] send %lld packets.\n", pkcount0);
fprintf(stdout, "Flow[1] send %lld packets.\n", pkcount1);
mapi_close_flow(fd);
exit(EXIT_SUCCESS);
}
int compare_ip(ip_addr ip1, ip_addr ip2) {
if( (ip1.byte1 == ip2.byte1) && (ip1.byte2 == ip2.byte2) && (ip1.byte3 == ip2.byte3) && (ip1.byte4 == ip2.byte4) )
return(1);
return(0);
}
struct hash_bucket *compare_bucket(struct hash_bucket *bucket, struct flow_data *data) {
while(bucket) {
// (ip1.port1, ip2.port2) is the same flow as (ip2.port2, ip1.port1)
if( compare_ip(bucket->data.saddr, data->saddr) && compare_ip(bucket->data.daddr, data->daddr) ) {
if( (bucket->data.ptcl == IPPROTO_UDP) || (bucket->data.ptcl == IPPROTO_TCP) ) {
if((bucket->data.sport == data->sport) && (bucket->data.dport == data->dport) )
return bucket;
else
return(NULL);
}
else
return bucket;
}
if( compare_ip(bucket->data.saddr, data->daddr) && compare_ip(bucket->data.daddr, data->saddr) ){
if( (bucket->data.ptcl == IPPROTO_UDP) || (bucket->data.ptcl == IPPROTO_TCP) ) {
if((bucket->data.sport == data->dport) && (bucket->data.dport == data->sport) )
return bucket;
else
return(NULL);
}
else
return bucket;
}
bucket = bucket->next;
}
return(bucket);
}
unsigned long long add_bucket(struct flow_data *data, unsigned int flow) {
unsigned int value = 0, pos, ret;
ip_addr saddr = data->saddr, daddr = data->daddr;
u_short sport = data->sport, dport = data->dport;
struct hash_bucket *tmp, *prev, *tmp2;
struct timeval *tp;
unsigned long long current_ts = 0;
tp = (struct timeval *)malloc(sizeof(struct timeval));
if( (data->ptcl == IPPROTO_UDP) || (data->ptcl == IPPROTO_TCP) ) {
value = ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4)*sport + (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4)*dport))/137;
}
else
value = ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4) + (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4)))/137;
pos = value%HASHTABLE_SIZE;
// Flow already exist?
if((tmp = compare_bucket(hashtable[pos], data)) != NULL) {
if(tmp->flow == flow) {
tmp->data.packets_count += data->packets_count;
if(gettimeofday(tp, NULL) ==0) {
// calculate the current timestamp
current_ts = (((unsigned long long)tp->tv_sec)<<32)+((tp->tv_usec*4295) & 0xffffffff);
tmp->data.timestamp = current_ts;
}
return(0);
}
else {
if( tmp->data.packets_count > data->packets_count ) {
ret = tmp->data.packets_count - data->packets_count;
}
else
ret = data->packets_count -tmp->data.packets_count;
// remove node first
prev = hashtable[pos];
// if tmp == head
if(prev == tmp) {
hashtable[pos] = NULL;
free(tmp);
return(ret);
}
// find tmp
while(prev->next != tmp) {
prev = prev->next;
}
prev->next = tmp->next;
free(tmp);
return(ret);
}
}
// New flow
tmp = (struct hash_bucket *)malloc(sizeof(struct hash_bucket));
memcpy(&(tmp->data), data, sizeof(struct flow_data));
tmp->value = value;
tmp->flow = flow;
tmp->next = NULL;
if(gettimeofday(tp, NULL) ==0) {
// calculate the current timestamp
current_ts = (((unsigned long long)tp->tv_sec)<<32)+((tp->tv_usec*4295) & 0xffffffff);
tmp->data.timestamp = current_ts;
}
prev = hashtable[pos];
if( prev != NULL) {
tmp2 = prev->next;
while(tmp2 != NULL) {
prev = tmp2;
tmp2 = tmp2->next;
}
prev->next = tmp;
}
else {
hashtable[pos] = tmp;
}
//fprintf(stdout, "flow = %d, packets = %llu\n", flow, data->packets_count);
return(0);
}
void my_rrd_create(char *filename, int num) {
int ret, i;
char start[20];
char tmp[64];
char **argv;
argv = malloc((9+num)*sizeof(char *));
argv[0] = strdup("create exprflows.rrd");
argv[1] = strdup("--start");
snprintf(start, 20, "%u", (unsigned int)time(0));
argv[2] = start;
argv[3] = strdup("--step");
argv[4] = strdup("1");
argv[5] = filename;
argv[6] = strdup("DS:total:GAUGE:600:-100000000:10000000000000");
for (i=0; i<num; ++i) {
snprintf(tmp, 64, "DS:flow%d:GAUGE:600:-100000000:10000000000000", i+1);
argv[7+i] = strdup(tmp);
}
argv[7+num] = strdup("RRA:AVERAGE:0.5:1:300"); /* no average, keep 300 samples (5 min) */
argv[7+num+1] = NULL;
optind = opterr = 0; /* reset optind/opterr */
ret = rrd_create(9+num-1,argv); /* try to create rrd */
if(rrd_test_error()) { /* look for errors */
printf("rrd_create: %s\n",rrd_get_error());
rrd_clear_error();
}
}
void my_rrd_update(char *filename, char *data) {
int ret, argc = 3;
char *argv[] = { /* update template */
"update",
NULL, /* 3 filename */
NULL, /* 4 data */
NULL
};
argv[1] = filename; /* set filename */
argv[2] = data; /* set data */
optind = opterr = 0; /* reset optind/opterr */
ret = rrd_update(argc,argv); /* try to update rrd */
//printf("rrd_update: %d\n", ret); /* print result */
if(rrd_test_error()) { /* look for errors */
printf("rrd_update: %s\n",rrd_get_error());
rrd_clear_error();
}
}
void my_rrd_graph(char *filename, char *data) {
int ret, argc = 3;
char *argv[] = { /* update template */
"update",
NULL, /* 3 filename */
NULL, /* 4 data */
NULL
};
argv[1] = filename; /* set filename */
argv[2] = data; /* set data */
optind = opterr = 0; /* reset optind/opterr */
ret = rrd_update(argc,argv); /* try to update rrd */
//printf("rrd_update: %d\n", ret); /* print result */
if(rrd_test_error()) { /* look for errors */
printf("rrd_graph: %s\n",rrd_get_error());
rrd_clear_error();
}
}
/*
<!-- <META HTTP-EQUIV=\"Refresh\" CONTENT=1> -->\n\
*/
static char cgi1[] = "\
#!/usr/bin/rrdcgi\n\
<HTML>\n<HEAD>\n\
<TITLE>LOBSTER - Distributed PacketLoss</TITLE>\n\
<script language=\"JavaScript\">\n\
<!--var time = null\n\
function move() {\n\
window.location = 'http://139.91.70.50/cgi-bin/packetloss.cgi';\n\
} //-->\n\
</script>\n\
<style type=\"text/css\">\n\
<!-- body { background-color: #ffffff; } -->\n\
</style>\n\n\
</HEAD>\n<BODY onload=\"timer=setTimeout('move()',1000)\">\n<center>\n\
<H1><font color=#0000FF>LOBSTER</font> - Distributed PacketLoss</H1>\n\
<P>\n<RRD::GRAPH\n\
/var/www/img/exprflows.png\n\
--width 600 --height 300 --lower-limit 0\n\
--font UNIT:10:/usr/share/fonts/truetype/freefont/FreeMono.ttf\n\
--font TITLE:11:/usr/share/fonts/truetype/freefont/FreeMono.ttf\n\
--imginfo '<IMG SRC=/img/%s WIDTH=%lu HEIGHT=%lu >'\n\
--end now --start end-300s --lazy\n\
--title \"Packetloss\" --vertical-label \"Packet Loss Ratio\"\n\
--slope-mode --interlaced\n";
int create_cgi(int argc, char **argv) {
FILE *fp;
char cwd[512], *p;
static char *colors[] = {"FFFF00", "0066FF", "00EE00", "FF0000", "FF9900", "9900FF", "00EEFF"};
int i;
if((fp = fopen("packetloss.cgi","w")) == NULL){
printf("\n---> ERROR: Can't create output file\n\n");
return -1;
}
chmod("exprflows.cgi", S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
getcwd(cwd, 512); //FIXME
fprintf(fp, "%s", cgi1);
fprintf(fp, " DEF:total=%s/%s:total:AVERAGE\n", cwd, RRD_FILENAME);
for(i=1; i<argc; ++i) {
fprintf(fp, " DEF:flow%d=%s/%s:flow%d:AVERAGE\n", i, cwd, RRD_FILENAME, i);
}
for(i=1; i<argc; ++i) {
p = strchr(argv[i], ':'); // !@#!#$#@$%!^$!
*p = ' ';
fprintf(fp, " AREA:flow%d#%s:\"%s\":STACK\n", i, colors[i-1], argv[i]);
}
fprintf(fp, " AREA:total#FFFF00>\n");
fprintf(fp, "</P>\n</cetner>\n</BODY>\n</HTML>\n");
fclose(fp);
return 0;
}
......@@ -17,6 +17,7 @@ extraflib.c \
pcapio.h \
protocols.h \
regexp.c \
topx.c
topx.c \
exprflow.c
pkginclude_HEADERS = topx.h
pkginclude_HEADERS = topx.h expiredflowshash.h
#define EXFL_HASH_SIZE 4096
#include "protocols.h"
typedef struct flow_data {
u_char ptcl;
unsigned long long packets_count, bytes_count;
ip_addr saddr, daddr;
u_short sport, dport;
unsigned long long timestamp;
}flow_data;
struct exfl_list_node {
unsigned int value;
/* info that define a flow */
struct flow_data flow;
struct exfl_list_node *next;
struct exfl_list_node *previous;
};
struct exfl_hash_node {
unsigned int value;
struct exfl_list_node *node;
struct exfl_hash_node *next;
};
struct exfl_data {
int list_size;
struct exfl_list_node *list_head;
struct exfl_list_node *list_tail;
struct exfl_hash_node *hashtable[EXFL_HASH_SIZE];
struct exfl_list_node *expired_flows_head;
struct exfl_list_node *expired_flows_tail;
pthread_mutex_t mutex;
pthread_t pthread;
unsigned int run;
};
#include <stdlib.h>
#include <stdio.h>
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <netinet/in.h>
#include "mapidflib.h"
#include "mapidlib.h"
#include "mapidevices.h"
#include "mapid.h"
#include "fhelp.h"
#include "expiredflowshash.h"
#define EXPIRED_FLOWS "EXPIRED_FLOWS"
#define SEND_SIZE 64 //Number of expired flows send per mapi_read_results.
#define TIMEOUT 30
#define SHM_FLOWS 25 //shm flows count
pthread_t pthread;
typedef struct shm {
unsigned int *size;
flow_data *Table;
pthread_mutex_t *smmutex;
}shm;
void add_toflow(struct exfl_data *data, ip_addr saddr, ip_addr daddr, u_short sport, u_short dport, unsigned long long timestamp, u_char ptcl);
static struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *data);
void exfl_add_to_hashtable_and_list(struct exfl_data *data, unsigned int value, ip_addr saddr, ip_addr daddr, u_short sport, u_short dport, unsigned long long timestamp, u_char ptcl);
void shift_node(struct exfl_data *data, struct exfl_list_node *node);
void poll_expired_flows(mapidflib_function_instance_t *instance);
void add_toflow(struct exfl_data *data, ip_addr saddr, ip_addr daddr, u_short sport, u_short dport, unsigned long long timestamp, u_char ptcl) {
struct exfl_hash_node *lookup;
unsigned int value = 0;
if( (ptcl == IPPROTO_TCP) || (ptcl == IPPROTO_UDP) ) {
value = ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4)*ntohs(sport) + (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4)*ntohs(dport)))/137;
}
else {
value = ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4) + (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4)*ntohs(dport)))/137;
}
pthread_mutex_lock( &(data->mutex) );
lookup = exfl_hash_lookup(value, data);
if(lookup == NULL) {
pthread_mutex_unlock( &(data->mutex) );
exfl_add_to_hashtable_and_list(data, value, saddr, daddr, sport, dport, timestamp, ptcl);
}
else {
lookup->node->flow.packets_count++;
lookup->node->flow.timestamp = timestamp;
shift_node(data, lookup->node);
pthread_mutex_unlock( &(data->mutex) );
}
}
void shift_node(struct exfl_data *data, struct exfl_list_node *node) {
struct exfl_list_node *previous;
if( data->list_head == node)
return;
else {
previous = node->previous;
// remove node from list
previous->next = node->next;
if(node->next)
node->next->previous = previous;
else {
//Tote node->next == NULL;
data->list_tail = previous;
}
// Add node at the start of the list
data->list_head->previous = node;
node->next = data->list_head;
node->previous = NULL;
data->list_head = node;
}