exprflow.c 21.5 KB
Newer Older
1 2 3 4 5 6 7 8
#include <stdlib.h>
#include <stdio.h>
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
's avatar
committed
9
#include <pcap.h>
10 11 12 13 14 15 16
#include <netinet/in.h>

#include "mapidflib.h"
#include "mapidlib.h"
#include "mapidevices.h"
#include "mapid.h"
#include "fhelp.h"
17
#include "debug.h"
18 19
#include "expiredflowshash.h"

's avatar
committed
20 21 22
#define EXPIRED_FLOWS "EXPIRED_FLOWS"
#define TIMEOUT 30			//in epochs
#define EPOCHDURATION 1
23

24 25
#define ETHERTYPE_8021Q 0x8100
#define MPLS_MASK 0x8847
's avatar
committed
26
#define ETHERTYPE_IP 0x0800  /* IP protocol */
27

28

29 30 31 32 33
struct vlan_802q_header {
	u_int16_t priority_cfi_vid;
	u_int16_t ether_type;
};

34 35 36 37 38 39 40 41
pthread_t pthread;

typedef struct shm {
	unsigned int *size;
	flow_data *Table;
	pthread_mutex_t *smmutex;
}shm;

42

's avatar
committed
43 44 45
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats);
static struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *data, eflow_data record);
void exfl_add_to_hashtable_and_list(struct exfl_data *data, unsigned int value, eflow_data record);
46 47
void shift_node(struct exfl_data *data, struct exfl_list_node *node);
void poll_expired_flows(mapidflib_function_instance_t *instance);
's avatar
committed
48
void print_flow(eflow_data *data);
49

's avatar
committed
50
inline unsigned int hash_function(eflow_data record) {
51 52
	ip_addr saddr = record.saddr;
	ip_addr daddr = record.daddr;
53

54 55
	return ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4) * (record.sport + 1) +
		                     (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4) * (record.dport + 2))) / 137;
56 57
}

's avatar
committed
58
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats) {
59 60 61

	struct exfl_hash_node *lookup;
	unsigned int value = hash_function(record);
's avatar
committed
62 63

	pthread_mutex_lock( &(data->mutex) );
64

's avatar
committed
65
	lookup = exfl_hash_lookup(value, data, record);
's avatar
committed
66

67 68
	if(lookup == NULL) {
		pthread_mutex_unlock( &(data->mutex) );
69
		exfl_add_to_hashtable_and_list(data, value, record);
70
		stats->received++;
71 72 73
	}
	else {
		lookup->node->flow.packets_count++;
74
		lookup->node->flow.bytes_count += record.bytes_count;
's avatar
committed
75
		lookup->node->flow.epoch = data->epoch;
76 77 78 79

		shift_node(data, lookup->node);
		pthread_mutex_unlock( &(data->mutex) );
	}
80
	stats->packets.received++;
's avatar
committed
81
}
82 83 84 85 86 87 88

void shift_node(struct exfl_data *data, struct exfl_list_node *node) {
	struct exfl_list_node *previous;

	if( data->list_head == node)
		return;
	else {
's avatar
committed
89

90 91 92 93 94 95
		previous = node->previous;
		// remove node from list
		previous->next = node->next;
		if(node->next)
			node->next->previous = previous;
		else {
's avatar
committed
96
		//Tote node->next == NULL;
97 98 99 100 101 102 103
			data->list_tail = previous;
		}
		// Add node at the start of the list
		data->list_head->previous = node;
		node->next = data->list_head;
		node->previous = NULL;
		data->list_head = node;
's avatar
committed
104

105 106 107
	}
}

's avatar
committed
108
void exfl_add_to_hashtable_and_list(struct exfl_data *data, unsigned int value, eflow_data record) {
's avatar
committed
109

110 111 112
	struct exfl_list_node *newlistnode;
	struct exfl_hash_node *newhashnode;
	unsigned int pos;
's avatar
committed
113 114

	// Create a new list node
's avatar
committed
115
	if( (newlistnode = (struct exfl_list_node *)malloc(sizeof(struct exfl_list_node))) == NULL) {
116
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_list_node)));
's avatar
committed
117
		return;
's avatar
committed
118
	}
119
	newlistnode->value = value;
120
	newlistnode->flow = record;
121 122 123 124 125
	newlistnode->flow.packets_count = 1;
	newlistnode->next = newlistnode->previous = NULL;

	// Create a new hash node
	pos = value%EXFL_HASH_SIZE;
's avatar
committed
126
	if( (newhashnode = (struct exfl_hash_node *)malloc(sizeof(struct exfl_hash_node))) == NULL) {
's avatar
committed
127
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_hash_node)));
's avatar
committed
128 129 130
		return;
	}

131 132 133
	newhashnode->value = value;
	newhashnode->node = newlistnode;
	newhashnode->next = data->hashtable[pos];
's avatar
committed
134
	newhashnode->prev = NULL;
135 136 137

	//add to list
	pthread_mutex_lock( &(data->mutex) );
's avatar
committed
138

139 140 141 142 143 144 145 146 147
	data->list_size++;
	if(data->list_tail == NULL) {
		data->list_head = data->list_tail = newlistnode;
	}
	else {
		data->list_head->previous = newlistnode;
		newlistnode->next = data->list_head;
		data->list_head = newlistnode;
	}
's avatar
committed
148

149
	//add to hashtable
's avatar
committed
150 151
	if(data->hashtable[pos] != NULL)
		data->hashtable[pos]->prev = newhashnode;
152
	data->hashtable[pos] = newhashnode;
's avatar
committed
153

154 155 156
	pthread_mutex_unlock( &(data->mutex) );
}

's avatar
committed
157 158
int compare_ip(ip_addr ip1, ip_addr ip2)
{
's avatar
committed
159 160 161
	if ((ip1.byte1 == ip2.byte1) && (ip1.byte2 == ip2.byte2) && (ip1.byte3 == ip2.byte3) && (ip1.byte4 == ip2.byte4))
		return (1);
	return (0);
's avatar
committed
162 163 164
}


's avatar
committed
165
struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *data, eflow_data record) {
166 167 168 169
	unsigned int pos;
	struct exfl_hash_node *tmp;
	pos = value%EXFL_HASH_SIZE;
	tmp = data->hashtable[pos];
's avatar
committed
170
	eflow_data *hashflow;
's avatar
committed
171

172
	while(tmp) {
's avatar
committed
173
		if(tmp->value == value){
's avatar
committed
174
			hashflow = &(tmp->node->flow);
's avatar
committed
175
			if(record.ptcl == hashflow->ptcl ){
's avatar
committed
176
				if(compare_ip(record.saddr, hashflow->saddr) && compare_ip(record.daddr, hashflow->daddr)) {
's avatar
committed
177
					if((record.sport == hashflow->sport) && (record.dport == hashflow->dport)) return tmp;
's avatar
committed
178
				}
's avatar
committed
179 180
			}
		}
181 182 183 184 185 186 187 188
		tmp = tmp->next;
	}
	return(NULL);
}

int checkhash(struct exfl_hash_node **hashtable) {
	int i, count = 0;
	struct exfl_hash_node *tmp;
's avatar
committed
189

190 191 192 193 194 195 196 197 198 199 200 201
	for(i = 0; i < EXFL_HASH_SIZE; i++) {
		if(hashtable[i] != NULL) {
			tmp = hashtable[i];
			while(tmp != NULL) {
				count++;
				tmp = tmp->next;
			}
		}
	}
	return(count);
}

's avatar
committed
202
/*
203 204 205
 * Check if there are any records in expired flows list and put them
 * in shared memory if there is enough space.
 */
206
void check_expired_flows(struct exfl_data *data, shm shm_struct) {
207
	struct exfl_list_node *tmp, *tail = data->expired_flows_tail;
's avatar
committed
208

209
	while( (tail != NULL) && (*(shm_struct.size) != data->shm_flows) ) {
210
		// Add the expired flow directly into shared memory table
's avatar
committed
211
		pthread_mutex_lock( shm_struct.smmutex );
212 213 214 215
		memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tail->flow), sizeof(struct flow_data));
		(*(shm_struct.size))++;
		pthread_mutex_unlock( shm_struct.smmutex );
		// Remove node from the expired flows list
216
		data->expired_flows_list_size--;
217
		tmp = tail->previous;
's avatar
committed
218 219
		if(tmp != NULL)
			tmp->next = NULL;
220 221 222
		free(tail);
		tail = tmp;
	}
's avatar
committed
223

224 225 226 227 228 229 230 231 232 233
	if( tail == NULL ) {
		data->expired_flows_head = data->expired_flows_tail = NULL;
	}
	else
		data->expired_flows_tail = tail;
}


void poll_expired_flows(mapidflib_function_instance_t *instance) {
	struct exfl_data *data = instance->internal_data;
234
	struct exfl_list_node *tmp, *previous;
235
	struct exfl_hash_node *lookup_hash;
's avatar
committed
236
	eflow_data tmpeflow_data;
237
	flows_stat *stats;
238 239 240 241 242
	unsigned int value;
	pthread_mutex_t *mutex = &(data->mutex);
	int check;
	shm shm_struct;

243
	stats = (flows_stat *) instance->result.data;
244
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
245 246
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows);
's avatar
committed
247

248 249
	while(data->run) {
		pthread_testcancel();
's avatar
committed
250
		pthread_mutex_lock( mutex );
251
		tmp = data->list_tail;
252
		check_expired_flows(data, shm_struct);
253
		if(tmp) {	//if list is not empty
's avatar
committed
254
				while( data->epoch - tmp->flow.epoch > TIMEOUT) {
255
					stats->expired++;
256
					stats->packets.expired += tmp->flow.packets_count;
257
					previous = tmp->previous;
's avatar
committed
258
					value = tmp->value;
's avatar
committed
259
					tmpeflow_data = tmp->flow;
's avatar
committed
260

's avatar
committed
261
					//remove node from hashtable
's avatar
committed
262
					lookup_hash = exfl_hash_lookup(value, data, tmpeflow_data);
's avatar
committed
263 264 265 266
					if( lookup_hash != NULL) {
							if(data->hashtable[value%EXFL_HASH_SIZE] == lookup_hash) {
								data->hashtable[value%EXFL_HASH_SIZE] = lookup_hash->next;

267 268 269
								if( lookup_hash->prev != NULL ){
									DEBUG_CMD(Debug_Message("hash node is at the head, but previous isn't NULL"));
								}
's avatar
committed
270 271 272 273 274 275 276 277 278 279 280 281 282

								// lookup_hash->next is now at the head
								if( lookup_hash->next != NULL) {
									lookup_hash->next->prev = NULL;
								}
							}
							else {
								lookup_hash->prev->next = lookup_hash->next;
								if( lookup_hash->next != NULL)
									lookup_hash->next->prev = lookup_hash->prev;
							}
							free(lookup_hash);
					}
's avatar
committed
283
					else {
284
						DEBUG_CMD(Debug_Message("hash node not found but list node exist!"));
's avatar
committed
285 286
					}

287 288 289 290 291 292 293 294
					//remove node from list
					if(tmp->previous != NULL) {
						//the node isn't at the head.
						tmp->previous->next = NULL;
						data->list_tail = tmp->previous;
					}
					else {
						check = checkhash(data->hashtable);
295
						if( check != 1)
296 297 298
						data->list_head = NULL;
						data->list_tail = NULL;
					}
's avatar
committed
299
					/*
300
					* When the shared memory segment is full, expired flow records must
's avatar
committed
301
					* be removed from the temporal sorted list, because they are expired
302
					* and for a new packet of this flow, a new record must be created.
's avatar
committed
303
					* In order to achieve this we have a list with all the expired flow
304 305 306
					* records that couldn't be returned.
					*/

307
					if(*(shm_struct.size) == data->shm_flows) { // shm full
308 309 310 311 312
						//remove node from temporal sorted list
						if(tmp->next)
							tmp->next->previous = tmp->previous;
						if(tmp->previous)
							tmp->previous->next = tmp->next;
's avatar
committed
313

314
						// if packets_count is worth to add
's avatar
committed
315
						if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
316 317 318 319 320 321 322 323 324 325 326 327 328
							if(data->expired_flows_list_size < data->expired_flows_list_size_max) { // if buffer not full
								//add node to expired flows list
								data->expired_flows_list_size++;
								if(data->expired_flows_head != NULL) {
									tmp->next = data->expired_flows_head;
									data->expired_flows_head->previous = tmp;
									tmp->previous = NULL;
									data->expired_flows_head = tmp;
								}
								else {
									data->expired_flows_head = data->expired_flows_tail = tmp;
									tmp->previous = tmp->next = NULL;
								}
329 330
							}
							else {
331 332
								// else drop
								stats->dropped++;
333
								stats->packets.dropped += tmp->flow.packets_count;
334
								free(tmp);
335
							}
336 337
						}
						else {
338
							stats->ignored++;
339
							stats->packets.ignored += tmp->flow.packets_count;
340
							free(tmp);
341
						}
's avatar
committed
342

343
					}
344 345
					else { // shm not full
						// if packets_count is worth to add
's avatar
committed
346
						if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
347
							// Add the expired flow from temporal sorted list directly into shared memory table
's avatar
committed
348
							pthread_mutex_lock( shm_struct.smmutex );
349 350
							memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tmp->flow), sizeof(struct flow_data));
							(*(shm_struct.size))++;
351
							stats->packets.sent += tmp->flow.packets_count;
352 353 354
							pthread_mutex_unlock( shm_struct.smmutex );
						}
						else {
355
							stats->ignored++;
356
							stats->packets.ignored += tmp->flow.packets_count;
357
						}
's avatar
committed
358
						free(tmp);
359 360 361
					}

					data->list_size--;
's avatar
committed
362

363 364 365 366 367 368
					tmp = previous;
					if(tmp == NULL)
						break;
				}
		}
		pthread_mutex_unlock( mutex );
369
		sleep(EPOCHDURATION);
's avatar
committed
370
		data->epoch++;
371 372 373 374
	}
	pthread_exit(NULL);
}

375 376 377 378
static int exprflow_instance(mapidflib_function_instance_t *instance,
                             MAPI_UNUSED int fd,
                             MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) {
	mapiFunctArg* fargs;
's avatar
committed
379
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
380 381 382 383

	fargs = instance->args;
	shm_flows = getargint(&fargs);

's avatar
committed
384
	instance->def->shm_size = sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows + sizeof(pthread_mutex_t);
385

's avatar
committed
386 387 388
	// 0                 shm (instance->result.data)                             DIMAPI_DATA_SIZE
	// | mapi_result_type | flows_stat | shm_flows * flow_data | pthread_mutex_t |

389 390 391
	return 0;
}

392 393
static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED int fd)
{
's avatar
committed
394 395

	int mythread;
396 397 398
	pthread_mutex_t tmpmutex = PTHREAD_MUTEX_INITIALIZER;
	shm shm_struct;
	struct exfl_data *data;
399
	flows_stat *stats;
400

401 402
	mapiFunctArg* fargs;
	fargs = instance->args;
403

404
	// HashTable initialization
's avatar
committed
405
	if( (instance->internal_data = malloc(sizeof(struct exfl_data))) == NULL) {
406
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_data)));
's avatar
committed
407 408
		return(-1);
	}
409 410 411 412 413 414 415 416
	data = instance->internal_data;
	data->list_head = NULL;
	data->list_tail = NULL;
	data->expired_flows_head = NULL;
	data->expired_flows_tail = NULL;
	memset(data->hashtable, 0, EXFL_HASH_SIZE*sizeof(struct exfl_hash_node *));
	data->mutex = tmpmutex;
	data->run = 1;
417
	data->list_size = 0;
418
	data->expired_flows_list_size = 0;
's avatar
committed
419 420
	// Epoch initialization
	data->epoch=0;
421 422

	// function arguments
's avatar
committed
423
	data->shm_flows = getargint(&fargs); // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
424 425 426
	data->expired_flows_list_size_max = getargint(&fargs);
	data->packets_count_min = getargint(&fargs);

427
	// Shared Memory Initialization
428
	stats = (flows_stat *)instance->result.data;
429
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
430 431
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data) * data->shm_flows);
432

433 434 435 436 437
	stats->received = 0;
	stats->expired = 0;
	*(shm_struct.size) = 0; //stats->sent = 0;
	stats->ignored = 0;
	stats->dropped = 0;
438

439 440 441 442 443 444
	stats->packets.received = 0;
	stats->packets.expired = 0;
	stats->packets.sent = 0;
	stats->packets.ignored = 0;
	stats->packets.dropped = 0;

445 446
	//mutex initialization
	*(shm_struct.smmutex) = tmpmutex;
's avatar
committed
447 448

	// the thread pid is stored in internal_data in order to be available for stopping it in cleanup
449
	mythread = pthread_create(&((((struct exfl_data *)(instance->internal_data))->pthread)), NULL, (void *) &poll_expired_flows, (void *)instance);
's avatar
committed
450

451 452 453
	return 0;
}

's avatar
committed
454
static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED unsigned char* dev_pkt, unsigned char* link_pkt, mapid_pkthdr_t* pkt_head) {
455 456

	struct exfl_data *data = (struct exfl_data *)(instance->internal_data);
457
	struct flows_stat *stats;
's avatar
committed
458
	eflow_data record;
459 460 461
	ip_header* ip = NULL;
	tcp_header* tcp = NULL;
 	udp_header* udp = NULL;
462 463 464
	unsigned char *p = NULL;
	uint16_t ethertype;
	struct ether_header *ep = NULL;
's avatar
committed
465 466 467 468 469 470 471
	struct hdlc_header {
		uint8_t addr;     // 0x0F for unicast, 0x8F for broadcast
		uint8_t ctrl;     // 0x00
		uint16_t proto; // http://www.nethelp.no/net/cisco-hdlc.txt
	}	*hp = NULL;
	//int ether_len = 0;
	int ip_len = 0;
472
	unsigned int len = pkt_head->caplen;
473
	int headerlenoverplus = 0;
474

475
	stats = (flows_stat *) instance->result.data;
's avatar
committed
476

477
	struct vlan_802q_header *vlan_header;
's avatar
committed
478
	p = link_pkt;
479

's avatar
committed
480 481 482 483 484
	switch(instance->hwinfo->link_type) {
		case DLT_EN10MB:
				// lay the Ethernet header struct over the packet data
				ep = (struct ether_header *)p;
				//ether_len = sizeof(struct ether_header);
485

's avatar
committed
486 487 488
				// skip ethernet header
				p += sizeof(struct ether_header);
				len -= sizeof(struct ether_header);
489

's avatar
committed
490
				ethertype = ntohs(ep->ether_type);
491

's avatar
committed
492 493 494 495 496 497
				if(ethertype  == ETHERTYPE_8021Q) {
					vlan_header = (struct vlan_802q_header*)p;
					ethertype = ntohs(vlan_header->ether_type);
					p += sizeof(struct vlan_802q_header);
					headerlenoverplus = sizeof(struct vlan_802q_header);
				}
's avatar
committed
498

's avatar
committed
499
				if(ethertype == MPLS_MASK) {
's avatar
committed
500
					p += 4;
's avatar
committed
501 502 503 504 505 506 507 508 509
					headerlenoverplus = 4;
				}
				else if(ethertype != ETHERTYPE_IP) {
					DEBUG_CMD(Debug_Message("not an ip packet?"));
					return 0;
				}
			break;
		case DLT_CHDLC:
				hp = (struct hdlc_header *)p;
510

's avatar
committed
511 512 513
				p += sizeof(struct hdlc_header);
				len -= sizeof(struct hdlc_header);

's avatar
committed
514
				ethertype = ntohs(hp->proto);
's avatar
committed
515 516 517 518 519 520 521 522

				if (ethertype != ETHERTYPE_IP) {
					return 0;
				}
			break;
		default:
			//DEBUG_CMD(Debug_Message("Link layer not supported"));
			return 0;
523
	}
524

's avatar
committed
525
	// IP header struct over the packet data;
526
	ip =(ip_header*)p;
527 528 529 530
	ip_len = (ip->ver_ihl & 0xf) * 4;

	//IPPROTO_TCP
	if(ip->ptcl == IPPROTO_TCP){
531 532 533 534 535
		tcp = (tcp_header*)(p + ip_len);
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
		record.sport = ntohs(tcp->sport);
		record.dport = ntohs(tcp->dport);
's avatar
committed
536 537
		record.timestamp = pkt_head->ts;
		record.epoch = data->epoch;
's avatar
committed
538
		record.valid = data->epoch > TIMEOUT;
539
		record.ptcl = ip->ptcl;
540
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
541
		record.ttl_pkt1 = ip->ttl;
542
		add_toflow(data, record, stats);
543 544 545
	}
	//IPPROTO_UDP
	else if( ip->ptcl == IPPROTO_UDP) {
546
		udp = (udp_header *)(p + ip_len);
547 548
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
's avatar
committed
549 550
		record.sport = ntohs(udp->sport);
		record.dport = ntohs(udp->dport);
's avatar
committed
551 552
		record.timestamp = pkt_head->ts;
		record.epoch = data->epoch;
's avatar
committed
553
		record.valid = data->epoch > TIMEOUT;
554
		record.ptcl = ip->ptcl;
's avatar
committed
555
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
556
		record.ttl_pkt1 = ip->ttl;
557
		add_toflow(data, record, stats);
558 559 560
	}
	//IPPROTO_IP
	else {
561 562
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
's avatar
committed
563
		record.sport = record.dport = ntohs(0);
's avatar
committed
564 565
		record.timestamp = pkt_head->ts;
		record.epoch = data->epoch;
's avatar
committed
566
		record.valid = data->epoch > TIMEOUT;
567
		record.ptcl = ip->ptcl;
's avatar
committed
568
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
569
		record.ttl_pkt1 = ip->ttl;
570
		add_toflow(data, record, stats);
571 572 573 574
	}
	return 1;
}

's avatar
committed
575
static int exprflow_reset(MAPI_UNUSED mapidflib_function_instance_t *instance)
576 577 578 579 580
{
	// empty HashTable?
  return 0;
}

's avatar
committed
581
static int exprflow_cleanup(mapidflib_function_instance_t *instance)
582 583 584 585
{
	struct exfl_list_node *tmp = ((struct exfl_data *)(instance->internal_data))->list_head;
	struct exfl_list_node *next;
	struct exfl_hash_node *tmphash, *nexthash;
586
	int i = EXFL_HASH_SIZE, count=0;
587

's avatar
committed
588

589 590
	// stop polling thread
	pthread_cancel((((struct exfl_data *)(instance->internal_data))->pthread));
's avatar
committed
591
	//fprintf(stderr, "Hashtable contains %d buckets\n", checkhash(((struct exfl_data *)(instance->internal_data))->hashtable));
592
	// HashTable deallocation
's avatar
committed
593
	while( i-- > 0 ) {
594 595
		tmphash = ((struct exfl_data *)(instance->internal_data))->hashtable[i];
		while(tmphash != NULL) {
596 597
			//fprintf(stderr, "Cleaning hash node %p\n", tmphash);
			//print_flow(&(tmphash->node->flow));
's avatar
committed
598

599 600 601 602 603 604 605
			nexthash = tmphash->next;
			free(tmphash);
			tmphash = nexthash;
		}
	}
	// List deallocation
	while(tmp != NULL) {
606 607
		//fprintf(stderr, "Cleaning list node %p\n", tmp);
		//print_flow(&(tmp->flow));
608 609 610
		next = tmp->next;
		free(tmp);
		tmp = next;
611
		count++;
612
	}
's avatar
committed
613 614 615 616 617 618 619 620 621 622
	count = 0;
	// Exprired Flows list deallocation
	tmp = ((struct exfl_data *)(instance->internal_data))->expired_flows_head;
	while(tmp != NULL) {
		next = tmp->next;
		free(tmp);
		tmp = next;
		count++;
	}

623
	free(instance->internal_data);
's avatar
committed
624

625
	return 0;
626 627
}

's avatar
committed
628
void print_flow(eflow_data *data) {
629 630 631 632 633
	switch (data->ptcl) {
		case IPPROTO_TCP: fprintf(stdout, "TCP "); break;
		case IPPROTO_UDP: fprintf(stdout, "UDP "); break;
		default: fprintf(stdout, "IP "); break;
	}
634

635 636 637 638
	fprintf(stdout, "src %d.%d.%d.%d:%d\t", data->saddr.byte1, data->saddr.byte2, data->saddr.byte3, data->saddr.byte4, data->sport);
	fprintf(stdout, "dst %d.%d.%d.%d:%d\t", data->daddr.byte1, data->daddr.byte2, data->daddr.byte3, data->daddr.byte4, data->dport);
	fprintf(stdout, "packets: %lld, bytes: %lld\n", data->packets_count, data->bytes_count);
}
's avatar
committed
639

640 641
static int exprflow_client_read_result(mapidflib_function_instance_t *instance,mapi_result_t *res) {
	shm shm_struct;
642
	flows_stat *stats;
643

644
	mapiFunctArg* fargs;
's avatar
committed
645
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
646 647 648 649

	fargs = instance->args;
	shm_flows = getargint(&fargs);

650
	stats = (flows_stat *)instance->result.data;
651
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
652 653
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows);
654

655
	res->res = instance->internal_data;
's avatar
committed
656
	res->size = sizeof(flows_stat) + sizeof(flow_data) * (*(shm_struct.size));
657

's avatar
committed
658
	pthread_mutex_lock( shm_struct.smmutex );
659
	memcpy(instance->internal_data, instance->result.data, res->size);
660 661
	stats->received = 0;
	stats->expired = 0;
's avatar
committed
662
	*(shm_struct.size) = 0; //stats->sent = 0;
663 664
	stats->ignored = 0;
	stats->dropped = 0;
665 666 667 668 669
	stats->packets.received = 0;
	stats->packets.expired = 0;
	stats->packets.sent = 0;
	stats->packets.ignored = 0;
	stats->packets.dropped = 0;
's avatar
committed
670
	pthread_mutex_unlock( shm_struct.smmutex );
671 672 673 674 675

	return(0);
}

static int exprflow_client_init( mapidflib_function_instance_t *instance, void *data) {
676

677
	mapiFunctArg* fargs;
's avatar
committed
678
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
679 680 681 682

	fargs = instance->args;
	shm_flows = getargint(&fargs);

's avatar
committed
683 684
	if(shm_flows > (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data)) {
		printf("Cannot process %d flows. Maximum is %d.\n", shm_flows, (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data));
685 686 687
		return(-1);
	}

's avatar
committed
688 689
	if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows)) == NULL) {
		printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows, __FILE__, __LINE__);
's avatar
committed
690 691
		return(-1);
	}
692 693 694 695
	data = instance->internal_data;
	return(0);
}

696 697 698 699 700 701 702 703
/* This function is called when the flow closes and should release all resources
 * allocated by the EXPIRED_FLOWS function on the client side */

static int exprflow_client_cleanup( mapidflib_function_instance_t *instance){

	free(instance->internal_data);
	return(0);
}
704 705 706 707 708

static mapidflib_function_def_t finfo={
  "", //libname
  EXPIRED_FLOWS, //name
  "Expired Flows function", //descr
709
  "iii", //argdescr
710 711
  MAPI_DEVICE_ALL, //devtype
  MAPIRES_SHM, //Method for returning results
712
  0, //shm size. Set by instance.
713 714 715
  0, //modifies_pkts
  0, //filters packets ?
  MAPIOPT_NONE,
716
  exprflow_instance, //instance
717 718 719 720 721 722 723
  exprflow_init, //init
  exprflow_process, //process
  NULL, //get_result,
  exprflow_reset, //reset
  exprflow_cleanup, //cleanup
  exprflow_client_init, //client_init
  exprflow_client_read_result, //client_read_result
724
  exprflow_client_cleanup //client_cleanup
725 726 727 728 729 730 731
};

mapidflib_function_def_t* exprflow_get_funct_info();

mapidflib_function_def_t* exprflow_get_funct_info() {
  return &finfo;
};