Commit dee85e94 authored by 's avatar

Fixed PoS


git-svn-id: file:///home/svn/mapi/trunk@1435 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent add33d00
......@@ -10,6 +10,7 @@ typedef struct eflow_data {
u_char ptcl;
u_char ttl_pkt1;
unsigned long long epoch;
unsigned int valid;
} eflow_data;
typedef struct flow_data {
......@@ -32,7 +33,7 @@ typedef struct flows_stat {
struct exfl_list_node {
unsigned int value;
/* info that define a flow */
struct eflow_data flow;
......
......@@ -62,7 +62,7 @@ inline unsigned int hash_function(eflow_data record) {
}
}
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats) {
void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats) {
struct exfl_hash_node *lookup;
unsigned int value = hash_function(record);
......@@ -84,7 +84,7 @@ void add_toflow(struct exfl_data *data, eflow_data record, flows_stat *stats) {
shift_node(data, lookup->node);
pthread_mutex_unlock( &(data->mutex) );
}
}
}
void shift_node(struct exfl_data *data, struct exfl_list_node *node) {
struct exfl_list_node *previous;
......@@ -99,7 +99,7 @@ void shift_node(struct exfl_data *data, struct exfl_list_node *node) {
if(node->next)
node->next->previous = previous;
else {
//Tote node->next == NULL;
//Tote node->next == NULL;
data->list_tail = previous;
}
// Add node at the start of the list
......@@ -205,7 +205,7 @@ int checkhash(struct exfl_hash_node **hashtable) {
return(count);
}
/*
/*
* Check if there are any records in expired flows list and put them
* in shared memory if there is enough space.
*/
......@@ -214,7 +214,7 @@ void check_expired_flows(struct exfl_data *data, shm shm_struct) {
while( (tail != NULL) && (*(shm_struct.size) != data->shm_flows) ) {
// Add the expired flow directly into shared memory table
pthread_mutex_lock( shm_struct.smmutex );
pthread_mutex_lock( shm_struct.smmutex );
memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tail->flow), sizeof(struct flow_data));
(*(shm_struct.size))++;
pthread_mutex_unlock( shm_struct.smmutex );
......@@ -226,7 +226,7 @@ void check_expired_flows(struct exfl_data *data, shm shm_struct) {
free(tail);
tail = tmp;
}
if( tail == NULL ) {
data->expired_flows_head = data->expired_flows_tail = NULL;
}
......@@ -262,7 +262,7 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
previous = tmp->previous;
value = tmp->value;
tmpeflow_data = tmp->flow;
//remove node from hashtable
lookup_hash = exfl_hash_lookup(value, data, tmpeflow_data);
if( lookup_hash != NULL) {
......@@ -301,11 +301,11 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
data->list_head = NULL;
data->list_tail = NULL;
}
/*
/*
* When the shared memory segment is full, expired flow records must
* be removed from the temporal sorted list, because they are expired
* be removed from the temporal sorted list, because they are expired
* and for a new packet of this flow, a new record must be created.
* In order to achieve this we have a list with all the expired flow
* In order to achieve this we have a list with all the expired flow
* records that couldn't be returned.
*/
......@@ -315,9 +315,9 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
tmp->next->previous = tmp->previous;
if(tmp->previous)
tmp->previous->next = tmp->next;
// if packets_count is worth to add
if(tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
if(data->expired_flows_list_size < data->expired_flows_list_size_max) { // if buffer not full
//add node to expired flows list
data->expired_flows_list_size++;
......@@ -342,13 +342,13 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
stats->ignored++;
free(tmp);
}
}
else { // shm not full
// if packets_count is worth to add
if(tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
if(tmp->flow.valid && tmp->flow.packets_count >= ((struct exfl_data *)instance->internal_data)->packets_count_min) {
// Add the expired flow from temporal sorted list directly into shared memory table
pthread_mutex_lock( shm_struct.smmutex );
pthread_mutex_lock( shm_struct.smmutex );
memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tmp->flow), sizeof(struct flow_data));
(*(shm_struct.size))++;
pthread_mutex_unlock( shm_struct.smmutex );
......@@ -384,6 +384,9 @@ static int exprflow_instance(mapidflib_function_instance_t *instance,
instance->def->shm_size = sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows + sizeof(pthread_mutex_t);
// 0 shm (instance->result.data) DIMAPI_DATA_SIZE
// | mapi_result_type | flows_stat | shm_flows * flow_data | pthread_mutex_t |
return 0;
}
......@@ -436,10 +439,10 @@ static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED in
//mutex initialization
*(shm_struct.smmutex) = tmpmutex;
// the thread pid is stored in internal_data in order to be available for stopping it in cleanup
// the thread pid is stored in internal_data in order to be available for stopping it in cleanup
mythread = pthread_create(&((((struct exfl_data *)(instance->internal_data))->pthread)), NULL, (void *) &poll_expired_flows, (void *)instance);
return 0;
}
......@@ -487,9 +490,9 @@ static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED
p += sizeof(struct vlan_802q_header);
headerlenoverplus = sizeof(struct vlan_802q_header);
}
if(ethertype == MPLS_MASK) {
p += 4;
p += 4;
headerlenoverplus = 4;
}
else if(ethertype != ETHERTYPE_IP) {
......@@ -503,7 +506,7 @@ static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED
p += sizeof(struct hdlc_header);
len -= sizeof(struct hdlc_header);
ethertype = ntohs(hp->ctrl);
ethertype = ntohs(hp->proto);
if (ethertype != ETHERTYPE_IP) {
return 0;
......@@ -527,6 +530,7 @@ static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED
record.dport = ntohs(tcp->dport);
record.timestamp = pkt_head->ts;
record.epoch = data->epoch;
record.valid = data->epoch > TIMEOUT;
record.ptcl = ip->ptcl;
record.bytes_count = pkt_head->wlen - headerlenoverplus;
record.ttl_pkt1 = ip->ttl;
......@@ -541,8 +545,9 @@ static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED
record.dport = ntohs(udp->dport);
record.timestamp = pkt_head->ts;
record.epoch = data->epoch;
record.valid = data->epoch > TIMEOUT;
record.ptcl = ip->ptcl;
record.bytes_count = pkt_head->wlen - headerlenoverplus;
record.bytes_count = pkt_head->wlen - headerlenoverplus;
record.ttl_pkt1 = ip->ttl;
add_toflow(data, record, stats);
}
......@@ -553,21 +558,22 @@ static int exprflow_process(mapidflib_function_instance_t *instance, MAPI_UNUSED
record.sport = record.dport = ntohs(0);
record.timestamp = pkt_head->ts;
record.epoch = data->epoch;
record.valid = data->epoch > TIMEOUT;
record.ptcl = ip->ptcl;
record.bytes_count = pkt_head->wlen - headerlenoverplus;
record.bytes_count = pkt_head->wlen - headerlenoverplus;
record.ttl_pkt1 = ip->ttl;
add_toflow(data, record, stats);
}
return 1;
}
static int exprflow_reset(MAPI_UNUSED mapidflib_function_instance_t *instance)
static int exprflow_reset(MAPI_UNUSED mapidflib_function_instance_t *instance)
{
// empty HashTable?
return 0;
}
static int exprflow_cleanup(mapidflib_function_instance_t *instance)
static int exprflow_cleanup(mapidflib_function_instance_t *instance)
{
struct exfl_list_node *tmp = ((struct exfl_data *)(instance->internal_data))->list_head;
struct exfl_list_node *next;
......@@ -577,14 +583,14 @@ static int exprflow_cleanup(mapidflib_function_instance_t *instance)
// stop polling thread
pthread_cancel((((struct exfl_data *)(instance->internal_data))->pthread));
//fprintf(stderr, "Hashtable contains %d buckets\n", checkhash(((struct exfl_data *)(instance->internal_data))->hashtable));
//fprintf(stderr, "Hashtable contains %d buckets\n", checkhash(((struct exfl_data *)(instance->internal_data))->hashtable));
// HashTable deallocation
while( i-- > 0 ) {
tmphash = ((struct exfl_data *)(instance->internal_data))->hashtable[i];
while(tmphash != NULL) {
//fprintf(stderr, "Cleaning hash node %p\n", tmphash);
//print_flow(&(tmphash->node->flow));
nexthash = tmphash->next;
free(tmphash);
tmphash = nexthash;
......@@ -610,7 +616,7 @@ static int exprflow_cleanup(mapidflib_function_instance_t *instance)
}
free(instance->internal_data);
return 0;
}
......@@ -644,14 +650,14 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m
res->res = instance->internal_data;
res->size = sizeof(flows_stat) + sizeof(flow_data) * (*(shm_struct.size));
pthread_mutex_lock( shm_struct.smmutex );
pthread_mutex_lock( shm_struct.smmutex );
memcpy(instance->internal_data, instance->result.data, res->size);
stats->received = 0;
stats->expired = 0;
*(shm_struct.size) = 0; //stats->sent = 0;
stats->ignored = 0;
stats->dropped = 0;
pthread_mutex_unlock( shm_struct.smmutex );
pthread_mutex_unlock( shm_struct.smmutex );
return(0);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment