exprflow.c 21.7 KB
Newer Older
1 2 3 4 5 6 7 8
#include <stdlib.h>
#include <stdio.h>
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
's avatar
committed
9
#include <pcap.h>
10 11 12 13 14 15 16
#include <netinet/in.h>

#include "mapidflib.h"
#include "mapidlib.h"
#include "mapidevices.h"
#include "mapid.h"
#include "fhelp.h"
17
#include "debug.h"
18 19
#include "expiredflowshash.h"

's avatar
committed
20 21 22
#define EXPIRED_FLOWS "EXPIRED_FLOWS"
#define TIMEOUT 30			//in epochs
#define EPOCHDURATION 1
23

24 25
#define ETHERTYPE_8021Q 0x8100
#define MPLS_MASK 0x8847
's avatar
committed
26
#define ETHERTYPE_IP 0x0800  /* IP protocol */
27

28

29 30 31 32 33
struct vlan_802q_header {
	u_int16_t priority_cfi_vid;
	u_int16_t ether_type;
};

34 35 36 37 38 39 40 41
pthread_t pthread;

typedef struct shm {
	unsigned int *size;
	flow_data *Table;
	pthread_mutex_t *smmutex;
}shm;

42

's avatar
committed
43 44 45
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats);
static struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *data, eflow_data record);
void exfl_add_to_hashtable_and_list(struct exfl_data *data, unsigned int value, eflow_data record);
46 47
void shift_node(struct exfl_data *data, struct exfl_list_node *node);
void poll_expired_flows(mapidflib_function_instance_t *instance);
's avatar
committed
48
void print_flow(eflow_data *data);
49

's avatar
committed
50
inline unsigned int hash_function(eflow_data record) {
51 52
	ip_addr saddr = record.saddr;
	ip_addr daddr = record.daddr;
53

54 55
	return ((unsigned int)((saddr.byte1 + saddr.byte2 + saddr.byte3 + saddr.byte4) * (record.sport + 1) +
		                     (daddr.byte1 + daddr.byte2 + daddr.byte3 + daddr.byte4) * (record.dport + 2))) / 137;
56 57
}

's avatar
committed
58
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats) {
59 60 61

	struct exfl_hash_node *lookup;
	unsigned int value = hash_function(record);
's avatar
committed
62 63

	pthread_mutex_lock( &(data->mutex) );
64

's avatar
committed
65
	lookup = exfl_hash_lookup(value, data, record);
's avatar
committed
66

67 68
	if(lookup == NULL) {
		pthread_mutex_unlock( &(data->mutex) );
69
		exfl_add_to_hashtable_and_list(data, value, record);
70
		stats->received++;
71 72 73
	}
	else {
		lookup->node->flow.packets_count++;
74
		lookup->node->flow.bytes_count += record.bytes_count;
's avatar
committed
75
		lookup->node->flow.epoch = data->epoch;
76
		lookup->node->flow.timestamp_last = record.timestamp_last;
77 78 79 80

		shift_node(data, lookup->node);
		pthread_mutex_unlock( &(data->mutex) );
	}
81
	stats->packets.received++;
's avatar
committed
82
}
83 84 85 86 87 88 89

void shift_node(struct exfl_data *data, struct exfl_list_node *node) {
	struct exfl_list_node *previous;

	if( data->list_head == node)
		return;
	else {
's avatar
committed
90

91 92 93 94 95 96
		previous = node->previous;
		// remove node from list
		previous->next = node->next;
		if(node->next)
			node->next->previous = previous;
		else {
's avatar
committed
97
		//Tote node->next == NULL;
98 99 100 101 102 103 104
			data->list_tail = previous;
		}
		// Add node at the start of the list
		data->list_head->previous = node;
		node->next = data->list_head;
		node->previous = NULL;
		data->list_head = node;
's avatar
committed
105

106 107 108
	}
}

's avatar
committed
109
void exfl_add_to_hashtable_and_list(struct exfl_data *data, unsigned int value, eflow_data record) {
's avatar
committed
110

111 112 113
	struct exfl_list_node *newlistnode;
	struct exfl_hash_node *newhashnode;
	unsigned int pos;
's avatar
committed
114 115

	// Create a new list node
's avatar
committed
116
	if( (newlistnode = (struct exfl_list_node *)malloc(sizeof(struct exfl_list_node))) == NULL) {
117
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_list_node)));
's avatar
committed
118
		return;
's avatar
committed
119
	}
120
	newlistnode->value = value;
121
	newlistnode->flow = record;
122 123 124 125 126
	newlistnode->flow.packets_count = 1;
	newlistnode->next = newlistnode->previous = NULL;

	// Create a new hash node
	pos = value%EXFL_HASH_SIZE;
's avatar
committed
127
	if( (newhashnode = (struct exfl_hash_node *)malloc(sizeof(struct exfl_hash_node))) == NULL) {
's avatar
committed
128
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_hash_node)));
's avatar
committed
129 130 131
		return;
	}

132 133 134
	newhashnode->value = value;
	newhashnode->node = newlistnode;
	newhashnode->next = data->hashtable[pos];
's avatar
committed
135
	newhashnode->prev = NULL;
136 137 138

	//add to list
	pthread_mutex_lock( &(data->mutex) );
's avatar
committed
139

140 141 142 143 144 145 146 147 148
	data->list_size++;
	if(data->list_tail == NULL) {
		data->list_head = data->list_tail = newlistnode;
	}
	else {
		data->list_head->previous = newlistnode;
		newlistnode->next = data->list_head;
		data->list_head = newlistnode;
	}
's avatar
committed
149

150
	//add to hashtable
's avatar
committed
151 152
	if(data->hashtable[pos] != NULL)
		data->hashtable[pos]->prev = newhashnode;
153
	data->hashtable[pos] = newhashnode;
's avatar
committed
154

155 156 157
	pthread_mutex_unlock( &(data->mutex) );
}

's avatar
committed
158 159
int compare_ip(ip_addr ip1, ip_addr ip2)
{
's avatar
committed
160 161 162
	if ((ip1.byte1 == ip2.byte1) && (ip1.byte2 == ip2.byte2) && (ip1.byte3 == ip2.byte3) && (ip1.byte4 == ip2.byte4))
		return (1);
	return (0);
's avatar
committed
163 164 165
}


's avatar
committed
166
struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *data, eflow_data record) {
167 168 169 170
	unsigned int pos;
	struct exfl_hash_node *tmp;
	pos = value%EXFL_HASH_SIZE;
	tmp = data->hashtable[pos];
's avatar
committed
171
	eflow_data *hashflow;
's avatar
committed
172

173
	while(tmp) {
's avatar
committed
174
		if(tmp->value == value){
's avatar
committed
175
			hashflow = &(tmp->node->flow);
's avatar
committed
176
			if(record.ptcl == hashflow->ptcl ){
's avatar
committed
177
				if(compare_ip(record.saddr, hashflow->saddr) && compare_ip(record.daddr, hashflow->daddr)) {
's avatar
committed
178
					if((record.sport == hashflow->sport) && (record.dport == hashflow->dport)) return tmp;
's avatar
committed
179
				}
's avatar
committed
180 181
			}
		}
182 183 184 185 186 187 188 189
		tmp = tmp->next;
	}
	return(NULL);
}

int checkhash(struct exfl_hash_node **hashtable) {
	int i, count = 0;
	struct exfl_hash_node *tmp;
's avatar
committed
190

191 192 193 194 195 196 197 198 199 200 201 202
	for(i = 0; i < EXFL_HASH_SIZE; i++) {
		if(hashtable[i] != NULL) {
			tmp = hashtable[i];
			while(tmp != NULL) {
				count++;
				tmp = tmp->next;
			}
		}
	}
	return(count);
}

's avatar
committed
203
/*
204 205 206
 * Check if there are any records in expired flows list and put them
 * in shared memory if there is enough space.
 */
207
void check_expired_flows(struct exfl_data *data, shm shm_struct) {
208
	struct exfl_list_node *tmp, *tail = data->expired_flows_tail;
's avatar
committed
209

210
	while( (tail != NULL) && (*(shm_struct.size) != data->shm_flows) ) {
211
		// Add the expired flow directly into shared memory table
's avatar
committed
212
		pthread_mutex_lock( shm_struct.smmutex );
213 214 215 216
		memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tail->flow), sizeof(struct flow_data));
		(*(shm_struct.size))++;
		pthread_mutex_unlock( shm_struct.smmutex );
		// Remove node from the expired flows list
217
		data->expired_flows_list_size--;
218
		tmp = tail->previous;
's avatar
committed
219 220
		if(tmp != NULL)
			tmp->next = NULL;
221 222 223
		free(tail);
		tail = tmp;
	}
's avatar
committed
224

225 226 227 228 229 230 231 232 233 234
	if( tail == NULL ) {
		data->expired_flows_head = data->expired_flows_tail = NULL;
	}
	else
		data->expired_flows_tail = tail;
}


void poll_expired_flows(mapidflib_function_instance_t *instance) {
	struct exfl_data *data = instance->internal_data;
235
	struct exfl_list_node *tmp, *previous;
236
	struct exfl_hash_node *lookup_hash;
's avatar
committed
237
	eflow_data tmpeflow_data;
238
	flows_stat *stats;
239 240 241 242 243
	unsigned int value;
	pthread_mutex_t *mutex = &(data->mutex);
	int check;
	shm shm_struct;

244
	stats = (flows_stat *) instance->result.data;
245
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
246 247
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows);
's avatar
committed
248

249 250
	while(data->run) {
		pthread_testcancel();
's avatar
committed
251
		pthread_mutex_lock( mutex );
252
		tmp = data->list_tail;
253
		check_expired_flows(data, shm_struct);
254
		if(tmp) {	//if list is not empty
's avatar
committed
255
				while( data->epoch - tmp->flow.epoch > TIMEOUT) {
256
					stats->expired++;
257
					stats->packets.expired += tmp->flow.packets_count;
258
					previous = tmp->previous;
's avatar
committed
259
					value = tmp->value;
's avatar
committed
260
					tmpeflow_data = tmp->flow;
's avatar
committed
261

's avatar
committed
262
					//remove node from hashtable
's avatar
committed
263
					lookup_hash = exfl_hash_lookup(value, data, tmpeflow_data);
's avatar
committed
264 265 266 267
					if( lookup_hash != NULL) {
							if(data->hashtable[value%EXFL_HASH_SIZE] == lookup_hash) {
								data->hashtable[value%EXFL_HASH_SIZE] = lookup_hash->next;

268 269 270
								if( lookup_hash->prev != NULL ){
									DEBUG_CMD(Debug_Message("hash node is at the head, but previous isn't NULL"));
								}
's avatar
committed
271 272 273 274 275 276 277 278 279 280 281 282 283

								// lookup_hash->next is now at the head
								if( lookup_hash->next != NULL) {
									lookup_hash->next->prev = NULL;
								}
							}
							else {
								lookup_hash->prev->next = lookup_hash->next;
								if( lookup_hash->next != NULL)
									lookup_hash->next->prev = lookup_hash->prev;
							}
							free(lookup_hash);
					}
's avatar
committed
284
					else {
285
						DEBUG_CMD(Debug_Message("hash node not found but list node exist!"));
's avatar
committed
286 287
					}

288 289 290 291 292 293 294 295
					//remove node from list
					if(tmp->previous != NULL) {
						//the node isn't at the head.
						tmp->previous->next = NULL;
						data->list_tail = tmp->previous;
					}
					else {
						check = checkhash(data->hashtable);
296
						if( check != 1)
297 298 299
						data->list_head = NULL;
						data->list_tail = NULL;
					}
's avatar
committed
300
					/*
301
					* When the shared memory segment is full, expired flow records must
's avatar
committed
302
					* be removed from the temporal sorted list, because they are expired
303
					* and for a new packet of this flow, a new record must be created.
's avatar
committed
304
					* In order to achieve this we have a list with all the expired flow
305 306 307
					* records that couldn't be returned.
					*/

308
					if(*(shm_struct.size) == data->shm_flows) { // shm full
309 310 311 312 313
						//remove node from temporal sorted list
						if(tmp->next)
							tmp->next->previous = tmp->previous;
						if(tmp->previous)
							tmp->previous->next = tmp->next;
's avatar
committed
314

315
						// if packets_count is worth to add
's avatar
committed
316
						if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
317 318 319 320 321 322 323 324 325 326 327 328 329
							if(data->expired_flows_list_size < data->expired_flows_list_size_max) { // if buffer not full
								//add node to expired flows list
								data->expired_flows_list_size++;
								if(data->expired_flows_head != NULL) {
									tmp->next = data->expired_flows_head;
									data->expired_flows_head->previous = tmp;
									tmp->previous = NULL;
									data->expired_flows_head = tmp;
								}
								else {
									data->expired_flows_head = data->expired_flows_tail = tmp;
									tmp->previous = tmp->next = NULL;
								}
330 331
							}
							else {
332 333
								// else drop
								stats->dropped++;
334
								stats->packets.dropped += tmp->flow.packets_count;
335
								free(tmp);
336
							}
337 338
						}
						else {
339
							stats->ignored++;
340
							stats->packets.ignored += tmp->flow.packets_count;
341
							free(tmp);
342
						}
's avatar
committed
343

344
					}
345 346
					else { // shm not full
						// if packets_count is worth to add
's avatar
committed
347
						if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
348
							// Add the expired flow from temporal sorted list directly into shared memory table
's avatar
committed
349
							pthread_mutex_lock( shm_struct.smmutex );
350 351
							memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tmp->flow), sizeof(struct flow_data));
							(*(shm_struct.size))++;
352
							stats->packets.sent += tmp->flow.packets_count;
353 354 355
							pthread_mutex_unlock( shm_struct.smmutex );
						}
						else {
356
							stats->ignored++;
357
							stats->packets.ignored += tmp->flow.packets_count;
358
						}
's avatar
committed
359
						free(tmp);
360 361 362
					}

					data->list_size--;
's avatar
committed
363

364 365 366 367 368 369
					tmp = previous;
					if(tmp == NULL)
						break;
				}
		}
		pthread_mutex_unlock( mutex );
370
		sleep(EPOCHDURATION);
's avatar
committed
371
		data->epoch++;
372 373 374 375
	}
	pthread_exit(NULL);
}

376 377 378 379
static int exprflow_instance(mapidflib_function_instance_t *instance,
                             MAPI_UNUSED int fd,
                             MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) {
	mapiFunctArg* fargs;
's avatar
committed
380
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
381 382 383 384

	fargs = instance->args;
	shm_flows = getargint(&fargs);

's avatar
committed
385
	instance->def->shm_size = sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows + sizeof(pthread_mutex_t);
386

's avatar
committed
387 388 389
	// 0                 shm (instance->result.data)                             DIMAPI_DATA_SIZE
	// | mapi_result_type | flows_stat | shm_flows * flow_data | pthread_mutex_t |

390 391 392
	return 0;
}

393 394
static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED int fd)
{
's avatar
committed
395 396

	int mythread;
397 398 399
	pthread_mutex_t tmpmutex = PTHREAD_MUTEX_INITIALIZER;
	shm shm_struct;
	struct exfl_data *data;
400
	flows_stat *stats;
401

402 403
	mapiFunctArg* fargs;
	fargs = instance->args;
404

405
	// HashTable initialization
's avatar
committed
406
	if( (instance->internal_data = malloc(sizeof(struct exfl_data))) == NULL) {
407
		DEBUG_CMD(Debug_Message("Malloc failed for size %d", sizeof(struct exfl_data)));
's avatar
committed
408 409
		return(-1);
	}
410 411 412 413 414 415 416 417
	data = instance->internal_data;
	data->list_head = NULL;
	data->list_tail = NULL;
	data->expired_flows_head = NULL;
	data->expired_flows_tail = NULL;
	memset(data->hashtable, 0, EXFL_HASH_SIZE*sizeof(struct exfl_hash_node *));
	data->mutex = tmpmutex;
	data->run = 1;
418
	data->list_size = 0;
419
	data->expired_flows_list_size = 0;
's avatar
committed
420 421
	// Epoch initialization
	data->epoch=0;
422 423

	// function arguments
's avatar
committed
424
	data->shm_flows = getargint(&fargs); // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
425 426 427
	data->expired_flows_list_size_max = getargint(&fargs);
	data->packets_count_min = getargint(&fargs);

428
	// Shared Memory Initialization
429
	stats = (flows_stat *)instance->result.data;
430
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
431 432
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data) * data->shm_flows);
433

434 435 436 437 438
	stats->received = 0;
	stats->expired = 0;
	*(shm_struct.size) = 0; //stats->sent = 0;
	stats->ignored = 0;
	stats->dropped = 0;
439

440 441 442 443 444 445
	stats->packets.received = 0;
	stats->packets.expired = 0;
	stats->packets.sent = 0;
	stats->packets.ignored = 0;
	stats->packets.dropped = 0;

446 447
	//mutex initialization
	*(shm_struct.smmutex) = tmpmutex;
's avatar
committed
448 449

	// the thread pid is stored in internal_data in order to be available for stopping it in cleanup
450
	mythread = pthread_create(&((((struct exfl_data *)(instance->internal_data))->pthread)), NULL, (void *) &poll_expired_flows, (void *)instance);
's avatar
committed
451

452 453 454
	return 0;
}

's avatar
committed
455
static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED unsigned char* dev_pkt, unsigned char* link_pkt, mapid_pkthdr_t* pkt_head) {
456 457

	struct exfl_data *data = (struct exfl_data *)(instance->internal_data);
458
	struct flows_stat *stats;
's avatar
committed
459
	eflow_data record;
460 461 462
	ip_header* ip = NULL;
	tcp_header* tcp = NULL;
 	udp_header* udp = NULL;
463 464 465
	unsigned char *p = NULL;
	uint16_t ethertype;
	struct ether_header *ep = NULL;
's avatar
committed
466 467 468 469 470 471 472
	struct hdlc_header {
		uint8_t addr;     // 0x0F for unicast, 0x8F for broadcast
		uint8_t ctrl;     // 0x00
		uint16_t proto; // http://www.nethelp.no/net/cisco-hdlc.txt
	}	*hp = NULL;
	//int ether_len = 0;
	int ip_len = 0;
473
	unsigned int len = pkt_head->caplen;
474
	int headerlenoverplus = 0;
475

476
	stats = (flows_stat *) instance->result.data;
's avatar
committed
477

478
	struct vlan_802q_header *vlan_header;
's avatar
committed
479
	p = link_pkt;
480

's avatar
committed
481 482 483 484 485
	switch(instance->hwinfo->link_type) {
		case DLT_EN10MB:
				// lay the Ethernet header struct over the packet data
				ep = (struct ether_header *)p;
				//ether_len = sizeof(struct ether_header);
486

's avatar
committed
487 488 489
				// skip ethernet header
				p += sizeof(struct ether_header);
				len -= sizeof(struct ether_header);
490

's avatar
committed
491
				ethertype = ntohs(ep->ether_type);
492

's avatar
committed
493 494 495 496 497 498
				if(ethertype  == ETHERTYPE_8021Q) {
					vlan_header = (struct vlan_802q_header*)p;
					ethertype = ntohs(vlan_header->ether_type);
					p += sizeof(struct vlan_802q_header);
					headerlenoverplus = sizeof(struct vlan_802q_header);
				}
's avatar
committed
499

's avatar
committed
500
				if(ethertype == MPLS_MASK) {
's avatar
committed
501
					p += 4;
's avatar
committed
502 503 504 505 506 507 508 509 510
					headerlenoverplus = 4;
				}
				else if(ethertype != ETHERTYPE_IP) {
					DEBUG_CMD(Debug_Message("not an ip packet?"));
					return 0;
				}
			break;
		case DLT_CHDLC:
				hp = (struct hdlc_header *)p;
511

's avatar
committed
512 513 514
				p += sizeof(struct hdlc_header);
				len -= sizeof(struct hdlc_header);

's avatar
committed
515
				ethertype = ntohs(hp->proto);
's avatar
committed
516 517 518 519 520 521 522 523

				if (ethertype != ETHERTYPE_IP) {
					return 0;
				}
			break;
		default:
			//DEBUG_CMD(Debug_Message("Link layer not supported"));
			return 0;
524
	}
525

's avatar
committed
526
	// IP header struct over the packet data;
527
	ip =(ip_header*)p;
528 529 530 531
	ip_len = (ip->ver_ihl & 0xf) * 4;

	//IPPROTO_TCP
	if(ip->ptcl == IPPROTO_TCP){
532 533 534 535 536
		tcp = (tcp_header*)(p + ip_len);
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
		record.sport = ntohs(tcp->sport);
		record.dport = ntohs(tcp->dport);
537 538
		record.timestamp_first = pkt_head->ts;
		record.timestamp_last = pkt_head->ts;
's avatar
committed
539
		record.epoch = data->epoch;
's avatar
committed
540
		record.valid = data->epoch > TIMEOUT;
541
		record.ptcl = ip->ptcl;
542
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
543
		record.ttl_pkt1 = ip->ttl;
544
		add_toflow(data, record, stats);
545 546 547
	}
	//IPPROTO_UDP
	else if( ip->ptcl == IPPROTO_UDP) {
548
		udp = (udp_header *)(p + ip_len);
549 550
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
's avatar
committed
551 552
		record.sport = ntohs(udp->sport);
		record.dport = ntohs(udp->dport);
553 554
		record.timestamp_first = pkt_head->ts;
		record.timestamp_last = pkt_head->ts;
's avatar
committed
555
		record.epoch = data->epoch;
's avatar
committed
556
		record.valid = data->epoch > TIMEOUT;
557
		record.ptcl = ip->ptcl;
's avatar
committed
558
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
559
		record.ttl_pkt1 = ip->ttl;
560
		add_toflow(data, record, stats);
561 562 563
	}
	//IPPROTO_IP
	else {
564 565
		record.saddr = ip->saddr;
		record.daddr = ip->daddr;
's avatar
committed
566
		record.sport = record.dport = ntohs(0);
567 568
		record.timestamp_first = pkt_head->ts;
		record.timestamp_last = pkt_head->ts;
's avatar
committed
569
		record.epoch = data->epoch;
's avatar
committed
570
		record.valid = data->epoch > TIMEOUT;
571
		record.ptcl = ip->ptcl;
's avatar
committed
572
		record.bytes_count = pkt_head->wlen - headerlenoverplus;
573
		record.ttl_pkt1 = ip->ttl;
574
		add_toflow(data, record, stats);
575 576 577 578
	}
	return 1;
}

's avatar
committed
579
static int exprflow_reset(MAPI_UNUSED mapidflib_function_instance_t *instance)
580 581 582 583 584
{
	// empty HashTable?
  return 0;
}

's avatar
committed
585
static int exprflow_cleanup(mapidflib_function_instance_t *instance)
586 587 588 589
{
	struct exfl_list_node *tmp = ((struct exfl_data *)(instance->internal_data))->list_head;
	struct exfl_list_node *next;
	struct exfl_hash_node *tmphash, *nexthash;
590
	int i = EXFL_HASH_SIZE, count=0;
591

's avatar
committed
592

593 594
	// stop polling thread
	pthread_cancel((((struct exfl_data *)(instance->internal_data))->pthread));
's avatar
committed
595
	//fprintf(stderr, "Hashtable contains %d buckets\n", checkhash(((struct exfl_data *)(instance->internal_data))->hashtable));
596
	// HashTable deallocation
's avatar
committed
597
	while( i-- > 0 ) {
598 599
		tmphash = ((struct exfl_data *)(instance->internal_data))->hashtable[i];
		while(tmphash != NULL) {
600 601
			//fprintf(stderr, "Cleaning hash node %p\n", tmphash);
			//print_flow(&(tmphash->node->flow));
's avatar
committed
602

603 604 605 606 607 608 609
			nexthash = tmphash->next;
			free(tmphash);
			tmphash = nexthash;
		}
	}
	// List deallocation
	while(tmp != NULL) {
610 611
		//fprintf(stderr, "Cleaning list node %p\n", tmp);
		//print_flow(&(tmp->flow));
612 613 614
		next = tmp->next;
		free(tmp);
		tmp = next;
615
		count++;
616
	}
's avatar
committed
617 618 619 620 621 622 623 624 625 626
	count = 0;
	// Exprired Flows list deallocation
	tmp = ((struct exfl_data *)(instance->internal_data))->expired_flows_head;
	while(tmp != NULL) {
		next = tmp->next;
		free(tmp);
		tmp = next;
		count++;
	}

627
	free(instance->internal_data);
's avatar
committed
628

629
	return 0;
630 631
}

's avatar
committed
632
void print_flow(eflow_data *data) {
633 634 635 636 637
	switch (data->ptcl) {
		case IPPROTO_TCP: fprintf(stdout, "TCP "); break;
		case IPPROTO_UDP: fprintf(stdout, "UDP "); break;
		default: fprintf(stdout, "IP "); break;
	}
638

639 640 641 642
	fprintf(stdout, "src %d.%d.%d.%d:%d\t", data->saddr.byte1, data->saddr.byte2, data->saddr.byte3, data->saddr.byte4, data->sport);
	fprintf(stdout, "dst %d.%d.%d.%d:%d\t", data->daddr.byte1, data->daddr.byte2, data->daddr.byte3, data->daddr.byte4, data->dport);
	fprintf(stdout, "packets: %lld, bytes: %lld\n", data->packets_count, data->bytes_count);
}
's avatar
committed
643

644 645
static int exprflow_client_read_result(mapidflib_function_instance_t *instance,mapi_result_t *res) {
	shm shm_struct;
646
	flows_stat *stats;
647

648
	mapiFunctArg* fargs;
's avatar
committed
649
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
650 651 652 653

	fargs = instance->args;
	shm_flows = getargint(&fargs);

654
	stats = (flows_stat *)instance->result.data;
655
	shm_struct.size = (unsigned int *)instance->result.data;
's avatar
committed
656 657
	shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
	shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows);
658

659
	res->res = instance->internal_data;
's avatar
committed
660
	res->size = sizeof(flows_stat) + sizeof(flow_data) * (*(shm_struct.size));
661

's avatar
committed
662
	pthread_mutex_lock( shm_struct.smmutex );
663
	memcpy(instance->internal_data, instance->result.data, res->size);
664 665
	stats->received = 0;
	stats->expired = 0;
's avatar
committed
666
	*(shm_struct.size) = 0; //stats->sent = 0;
667 668
	stats->ignored = 0;
	stats->dropped = 0;
669 670 671 672 673
	stats->packets.received = 0;
	stats->packets.expired = 0;
	stats->packets.sent = 0;
	stats->packets.ignored = 0;
	stats->packets.dropped = 0;
's avatar
committed
674
	pthread_mutex_unlock( shm_struct.smmutex );
675 676 677 678 679

	return(0);
}

static int exprflow_client_init( mapidflib_function_instance_t *instance, void *data) {
680

681
	mapiFunctArg* fargs;
's avatar
committed
682
	int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data))
683 684 685 686

	fargs = instance->args;
	shm_flows = getargint(&fargs);

's avatar
committed
687 688
	if(shm_flows > (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data)) {
		printf("Cannot process %d flows. Maximum is %d.\n", shm_flows, (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(mapi_results_t)) / sizeof(struct flow_data));
689 690 691
		return(-1);
	}

's avatar
committed
692 693
	if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows)) == NULL) {
		printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows, __FILE__, __LINE__);
's avatar
committed
694 695
		return(-1);
	}
696 697 698 699
	data = instance->internal_data;
	return(0);
}

700 701 702 703 704 705 706 707
/* This function is called when the flow closes and should release all resources
 * allocated by the EXPIRED_FLOWS function on the client side */

static int exprflow_client_cleanup( mapidflib_function_instance_t *instance){

	free(instance->internal_data);
	return(0);
}
708 709 710 711 712

static mapidflib_function_def_t finfo={
  "", //libname
  EXPIRED_FLOWS, //name
  "Expired Flows function", //descr
713
  "iii", //argdescr
714 715
  MAPI_DEVICE_ALL, //devtype
  MAPIRES_SHM, //Method for returning results
716
  0, //shm size. Set by instance.
717 718 719
  0, //modifies_pkts
  0, //filters packets ?
  MAPIOPT_NONE,
720
  exprflow_instance, //instance
721 722 723 724 725 726 727
  exprflow_init, //init
  exprflow_process, //process
  NULL, //get_result,
  exprflow_reset, //reset
  exprflow_cleanup, //cleanup
  exprflow_client_init, //client_init
  exprflow_client_read_result, //client_read_result
728
  exprflow_client_cleanup //client_cleanup
729 730 731 732 733 734 735
};

mapidflib_function_def_t* exprflow_get_funct_info();

mapidflib_function_def_t* exprflow_get_funct_info() {
  return &finfo;
};