Commit bdf12c63 authored by 's avatar
Browse files

increased buffers, fixed EXPIRED_FLOWS shm_size parameter bug


git-svn-id: file:///home/svn/mapi/trunk@1370 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 2b13ce53
...@@ -17,12 +17,12 @@ ...@@ -17,12 +17,12 @@
#include "cgi_headers_packetloss.h" #include "cgi_headers_packetloss.h"
#define TIMEOUT 500 // time to drop an unmatched flow #define TIMEOUT 500 // time to drop an unmatched flow
#define HASHTABLE_SIZE 4096 #define HASHTABLE_SIZE 65536
#define WITH_NCURSES 0 // set to 1 if you want ncurses, 0 otherwise #define WITH_NCURSES 0 // set to 1 if you want ncurses, 0 otherwise
#define REFRESH_TIME 10 // 10 seconds base interval with which data will be fed into the RRD #define REFRESH_TIME 10 // 10 seconds base interval with which data will be fed into the RRD
#define SHM_FLOWS_DEVICE_MAX 49000 // (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) #define SHM_FLOWS_DEVICE_MAX 49999 // (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data))
#define SHM_FLOWS_TOTAL_MAX 225000 #define SHM_FLOWS_TOTAL_MAX 250000
#define TIMESTAMP_TOLERANCE 430000000 // NTP 0.1 #define TIMESTAMP_TOLERANCE 430000000 // NTP 0.1
#define MIN(x, y) ((x) < (y) ? (x) : (y)) #define MIN(x, y) ((x) < (y) ? (x) : (y))
#define DIFF(x, y) ((x) > (y) ? ((x) - (y)) : ((y) - (x))) #define DIFF(x, y) ((x) > (y) ? ((x) - (y)) : ((y) - (x)))
...@@ -337,7 +337,7 @@ int main(int argc, char *argv[]) ...@@ -337,7 +337,7 @@ int main(int argc, char *argv[])
unsigned int *results; // number of results/flowdata = (unsigned int *) ... unsigned int *results; // number of results/flowdata = (unsigned int *) ...
struct flows_stat *flowsstats; struct flows_stat *flowsstats;
struct flow_data *flowdata; // result/flowdata = (struct flow_data *)((char *) ... + sizeof(flows_stat)); struct flow_data *flowdata; // result/flowdata = (struct flow_data *)((char *) ... + sizeof(flows_stat) + sizeof(unsigned int));
unsigned long long pkloss = 0; unsigned long long pkloss = 0;
...@@ -710,7 +710,7 @@ int main(int argc, char *argv[]) ...@@ -710,7 +710,7 @@ int main(int argc, char *argv[])
} }
results = (unsigned int *) host[h].results_dev[d].res; // number of results/flows at host h, dev d results = (unsigned int *) host[h].results_dev[d].res; // number of results/flows at host h, dev d
flowsstats = (struct flows_stat *) host[h].results_dev[d].res; flowsstats = (struct flows_stat *) host[h].results_dev[d].res;
flowdata = (struct flow_data *)((char *) host[h].results_dev[d].res + sizeof(flows_stat)); // results/flows array flowdata = (struct flow_data *)((char *) host[h].results_dev[d].res + sizeof(flows_stat) + sizeof(unsigned int)); // results/flows array
fprintf(stdout, "packetloss: storing and matching results from %s:%d: %d\n", host[h].name, d, *results); fflush(stdout); fprintf(stdout, "packetloss: storing and matching results from %s:%d: %d\n", host[h].name, d, *results); fflush(stdout);
// store flowstats_dev // store flowstats_dev
...@@ -763,7 +763,7 @@ int main(int argc, char *argv[]) ...@@ -763,7 +763,7 @@ int main(int argc, char *argv[])
// }}} // }}}
results = (unsigned int *) host[h].results_dev[d].res; // number of results/flows results = (unsigned int *) host[h].results_dev[d].res; // number of results/flows
flowdata = (struct flow_data *) ((char *) host[h].results_dev[d].res + sizeof(flows_stat)); // results/flows array flowdata = (struct flow_data *) ((char *) host[h].results_dev[d].res + sizeof(flows_stat) + sizeof(unsigned int)); // results/flows array
for (count = *results; count > 0; count--) { for (count = *results; count > 0; count--) {
if (LOGFILE) { if (LOGFILE) {
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#define MAX_SEND_SIZE 8192//1024 #define MAX_SEND_SIZE 8192//1024
#define FUNCTARGS_BUF_SIZE 7168 #define FUNCTARGS_BUF_SIZE 7168
#define DATA_SIZE 7168 #define DATA_SIZE 7168
#define DIMAPI_DATA_SIZE 1000000 #define DIMAPI_DATA_SIZE 2000000
#define FUNCT_NAME_LENGTH 256 #define FUNCT_NAME_LENGTH 256
#define ARG_LENGTH 32 #define ARG_LENGTH 32
#define MAPIDSOCKHOME "%s/.mapid.sock" #define MAPIDSOCKHOME "%s/.mapid.sock"
......
#define EXFL_HASH_SIZE 4096 #define EXFL_HASH_SIZE 262144 //4096
#include "protocols.h" #include "protocols.h"
......
...@@ -35,6 +35,7 @@ pthread_t pthread; ...@@ -35,6 +35,7 @@ pthread_t pthread;
typedef struct shm { typedef struct shm {
unsigned int *size; unsigned int *size;
unsigned int *shm_flows;
flow_data *Table; flow_data *Table;
pthread_mutex_t *smmutex; pthread_mutex_t *smmutex;
}shm; }shm;
...@@ -182,7 +183,7 @@ struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *da ...@@ -182,7 +183,7 @@ struct exfl_hash_node *exfl_hash_lookup(unsigned int value, struct exfl_data *da
if(compare_ip(record.saddr, hashflow->saddr) && compare_ip(record.daddr, hashflow->daddr)) { if(compare_ip(record.saddr, hashflow->saddr) && compare_ip(record.daddr, hashflow->daddr)) {
if((record.sport == hashflow->sport) && (record.dport == hashflow->dport)) return tmp; if((record.sport == hashflow->sport) && (record.dport == hashflow->dport)) return tmp;
} }
if(compare_ip(record.saddr, hashflow->daddr) && compare_ip(record.daddr, hashflow->saddr)){ if(compare_ip(record.saddr, hashflow->daddr) && compare_ip(record.daddr, hashflow->saddr)) {
if((record.sport == hashflow->dport) && (record.dport == hashflow->sport)) return tmp; if((record.sport == hashflow->dport) && (record.dport == hashflow->sport)) return tmp;
} }
} }
...@@ -249,12 +250,10 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) { ...@@ -249,12 +250,10 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
int check; int check;
shm shm_struct; shm shm_struct;
stats = (flows_stat *) instance->result.data;
stats = (flows_stat *) instance->result.data; stats = (flows_stat *) instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data; shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)); shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)+sizeof(unsigned int));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows); shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(unsigned int) + sizeof(struct flow_data)*data->shm_flows);
*(shm_struct.size) = 0; *(shm_struct.size) = 0;
while(data->run) { while(data->run) {
...@@ -384,12 +383,12 @@ static int exprflow_instance(mapidflib_function_instance_t *instance, ...@@ -384,12 +383,12 @@ static int exprflow_instance(mapidflib_function_instance_t *instance,
MAPI_UNUSED int fd, MAPI_UNUSED int fd,
MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) { MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) {
mapiFunctArg* fargs; mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data))
fargs = instance->args; fargs = instance->args;
shm_flows = getargint(&fargs); shm_flows = getargint(&fargs);
instance->def->shm_size = sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows + sizeof(pthread_mutex_t); instance->def->shm_size = sizeof(flows_stat) + sizeof(unsigned int) + sizeof(struct flow_data) * shm_flows + sizeof(unsigned int) + sizeof(pthread_mutex_t);
return 0; return 0;
} }
...@@ -425,21 +424,23 @@ static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED in ...@@ -425,21 +424,23 @@ static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED in
data->epoch=0; data->epoch=0;
// function arguments // function arguments
data->shm_flows = getargint(&fargs); // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) data->shm_flows = getargint(&fargs); // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data))
data->expired_flows_list_size_max = getargint(&fargs); data->expired_flows_list_size_max = getargint(&fargs);
data->packets_count_min = getargint(&fargs); data->packets_count_min = getargint(&fargs);
// Shared Memory Initialization // Shared Memory Initialization
stats = (flows_stat *)instance->result.data; stats = (flows_stat *)instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data; shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)); shm_struct.shm_flows = (unsigned int*)((char *)instance->result.data+sizeof(flows_stat));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows); shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)+sizeof(unsigned int));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(unsigned int) + sizeof(struct flow_data)*data->shm_flows);
stats->received = 0; stats->received = 0;
stats->expired = 0; stats->expired = 0;
*(shm_struct.size) = 0; //stats->sent = 0; *(shm_struct.size) = 0; //stats->sent = 0;
stats->ignored = 0; stats->ignored = 0;
stats->dropped = 0; stats->dropped = 0;
*(shm_struct.shm_flows) = data->shm_flows;
//mutex initialization //mutex initialization
*(shm_struct.smmutex) = tmpmutex; *(shm_struct.smmutex) = tmpmutex;
...@@ -614,18 +615,20 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m ...@@ -614,18 +615,20 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m
flows_stat *stats; flows_stat *stats;
mapiFunctArg* fargs; mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data))
fargs = instance->args; fargs = instance->args;
shm_flows = getargint(&fargs); shm_flows = getargint(&fargs);
stats = (flows_stat *)instance->result.data; stats = (flows_stat *)instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data; shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)); shm_struct.shm_flows = (unsigned int *)((char *)instance->result.data+sizeof(flows_stat));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*shm_flows); shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat)+sizeof(unsigned int));
fprintf(stdout, "shm_flows: %u", *(shm_struct.shm_flows)); fflush(stdout);
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(unsigned int) + sizeof(struct flow_data)*(*shm_struct.shm_flows));
res->res = instance->internal_data; res->res = instance->internal_data;
res->size = sizeof(flows_stat) + sizeof(flow_data)*(*(shm_struct.size)); res->size = sizeof(flows_stat) + sizeof(unsigned int) + sizeof(flow_data)*(*(shm_struct.size));
size = *shm_struct.size; size = *shm_struct.size;
pthread_mutex_lock( shm_struct.smmutex ); pthread_mutex_lock( shm_struct.smmutex );
...@@ -637,25 +640,25 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m ...@@ -637,25 +640,25 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m
stats->dropped = 0; stats->dropped = 0;
pthread_mutex_unlock( shm_struct.smmutex ); pthread_mutex_unlock( shm_struct.smmutex );
data = (flow_data *)((char *)instance->internal_data+sizeof(flows_stat)); data = (flow_data *)((char *)instance->internal_data+sizeof(flows_stat)+sizeof(unsigned int));
return(0); return(0);
} }
static int exprflow_client_init( mapidflib_function_instance_t *instance, void *data) { static int exprflow_client_init( mapidflib_function_instance_t *instance, void *data) {
mapiFunctArg* fargs; mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data))
fargs = instance->args; fargs = instance->args;
shm_flows = getargint(&fargs); shm_flows = getargint(&fargs);
if(shm_flows > (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) { if(shm_flows > (DIMAPI_DATA_SIZE - sizeof(flows_stat) - sizeof(unsigned int)) / sizeof(struct flow_data)) {
printf("Cannot process %d flows. Maximum is %d.\n", shm_flows, (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)); printf("Cannot process %d flows. Maximum is %d.\n", shm_flows, (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data));
return(-1); return(-1);
} }
if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows)) == NULL) { if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(unsigned int)+sizeof(struct flow_data)*shm_flows)) == NULL) {
printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows, __FILE__, __LINE__); printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(unsigned int)+sizeof(struct flow_data)*shm_flows, __FILE__, __LINE__);
return(-1); return(-1);
} }
data = instance->internal_data; data = instance->internal_data;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment