Commit 829a46bd authored by 's avatar

Parametrized shm size for better scalability.

Increased DIMAPI_DATA_SIZE to 1M.


git-svn-id: file:///home/svn/mapi/trunk@1359 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 0d2a4d9a
......@@ -21,6 +21,10 @@
#define WITH_NCURSES 0 // set to 1 if you want ncurses, 0 otherwise
#define REFRESH_TIME 10 // 10 seconds base interval with which data will be fed into the RRD
#define SHM_FLOWS_DEVICE_MAX 24900 // (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
#define SHM_FLOWS_TOTAL_MAX 225000
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define LOGFILE 0 //Log enabled
#define LOGFILENAME_FULL "packetloss.full.log" //Log filename
#define LOGFILENAME_LOST "packetloss.lost.log"
......@@ -182,6 +186,8 @@ int hosts = 0; // number of hosts (total)
struct tstats stats; // all hosts stats
unsigned int totalflows; // count total # of flows (records) received from hosts/devices
// Functions declaration
static void terminate();
void print_protocols(FILE * fp, char *protocol, unsigned long long total_pkts, unsigned long long total_lost);
......@@ -361,7 +367,7 @@ int main(int argc, char *argv[])
int maxpad = 13;
float speed = 0.0;
//int devices = 0; // number of devices (total)
int devices = 0; // number of devices (total)
//int flows = 0; // number of flows (total)
char error[512];
......@@ -376,6 +382,7 @@ int main(int argc, char *argv[])
unsigned int uptime = 0;
bool overload = false;
unsigned int shm_flows;
const unsigned int expired_flows_list_size_max = 1249;
const unsigned int packets_count_min = 2;
......@@ -409,7 +416,7 @@ int main(int argc, char *argv[])
}
// }}}
// Get host names, ents/IPs, # of devices {{{
// Get host names, ents/IPs, # of devices, shm_flows {{{
for(h = 0; h < hosts; h++) {
// get host name
tmp1 = strchr(argv[h + 1], ':');
......@@ -424,7 +431,7 @@ int main(int argc, char *argv[])
host[h].devices++;
k = strtok(NULL,", ");
}
//devices += host[h].devices;
devices += host[h].devices;
// get host IPs by names
// if((host[h].ent = gethostbyname(host[h].name)) == NULL) {
......@@ -444,6 +451,9 @@ int main(int argc, char *argv[])
// flows += 2;
//}
}
shm_flows = MIN(SHM_FLOWS_TOTAL_MAX / devices, SHM_FLOWS_DEVICE_MAX);
// }}}
// Initialize host arrays {{{
......@@ -558,7 +568,7 @@ int main(int argc, char *argv[])
fprintf(stdout, "packetloss: apply EXPIRED_FLOWS at %s\n", host[h].name); fflush(stdout);
if ((host[h].fid_loss = mapi_apply_function(host[h].fd, "EXPIRED_FLOWS", expired_flows_list_size_max, packets_count_min)) < 0) {
if ((host[h].fid_loss = mapi_apply_function(host[h].fd, "EXPIRED_FLOWS", shm_flows, expired_flows_list_size_max, packets_count_min)) < 0) {
fprintf(stderr, "Count not apply function EXPIRED_FLOWS to flow %d\n", host[h].fd);
mapi_read_error(&err_no, error);
fprintf(stderr, "Errorcode: %d description: %s\n", err_no, error);
......@@ -688,6 +698,7 @@ int main(int argc, char *argv[])
// }}}
// Store host results in the hashtable {{{
totalflows = 0;
for (h = 0; h < hosts; h++) {
// initialize flowsstats
host[h].flowsstats.received = 0;
......@@ -721,6 +732,7 @@ int main(int argc, char *argv[])
host[h].flowsstats.dropped += host[h].flowsstats_dev[d].dropped;
if(*results) {
totalflows += *results;
store_results(h, d, *results, flowdata);
}
}
......@@ -977,9 +989,11 @@ int main(int argc, char *argv[])
fprintf(fp, "*;basic;*;*;*;*;*;uptime;Up time;%d;secs;\n", uptime);
fprintf(fp, "*;basic;*;*;*;*;*;pcap_filter;The filter expression used (pcap filter);\"%s\";text;\n", filter);
fprintf(fp, "*;basic;*;*;*;*;*;packets_count_min;Ignore flows with less packets than...;%u;text;\n", packets_count_min);
fprintf(fp, "host;int_stats;*;*;*;*;*;shm_flows;Internal buffer size for expired flows at hosts/devices;%u;flows;\n", shm_flows);
fprintf(fp, "host;int_stats;*;*;*;*;*;expired_flows_list_size_max;Internal buffer size for expired flows at hosts/devices;%u;flows;\n", expired_flows_list_size_max);
fprintf(fp, "*;hidden;*;*;*;*;*;loop_busytime;Packetloss application busy time [s];%u;text;\n", loop_busytime);
fprintf(fp, "*;hidden;*;*;*;*;*;overload;Packetloss application overloaded [bool];%u;text;\n", overload);
fprintf(fp, "*;hidden;*;*;*;*;*;totalflows;Flows (records) received -- packetloss app utilization;%u of %u max (%6.2f %%);text;\n", totalflows, SHM_FLOWS_TOTAL_MAX, PCTOF(totalflows, SHM_FLOWS_TOTAL_MAX));
fprintf(fp, "\n");
// }}}
// basic information - host/device 1 {{{
......
......@@ -22,7 +22,7 @@
#define MAX_SEND_SIZE 8192//1024
#define FUNCTARGS_BUF_SIZE 7168
#define DATA_SIZE 7168
#define DIMAPI_DATA_SIZE 50000
#define DIMAPI_DATA_SIZE 1000000
#define FUNCT_NAME_LENGTH 256
#define ARG_LENGTH 32
#define MAPIDSOCKHOME "%s/.mapid.sock"
......
......@@ -53,4 +53,5 @@ struct exfl_data {
unsigned int run;
unsigned long long epoch;
unsigned int packets_count_min;
unsigned int shm_flows;
};
......@@ -20,7 +20,6 @@
#define SEND_SIZE 64 //Number of expired flows send per mapi_read_results.
#define TIMEOUT 30 //in epochs
#define EPOCHDURATION 1
#define SHM_FLOWS 1000 //1249 // (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
#define ETHERTYPE_8021Q 0x8100
#define MPLS_MASK 0x8847
......@@ -218,7 +217,7 @@ int checkhash(struct exfl_hash_node **hashtable) {
void check_expired_flows(struct exfl_data *data, shm shm_struct) {
struct exfl_list_node *tmp, *tail = data->expired_flows_tail;
while( (tail != NULL) && (*(shm_struct.size) != SHM_FLOWS) ) {
while( (tail != NULL) && (*(shm_struct.size) != data->shm_flows) ) {
// Add the expired flow directly into shared memory table
pthread_mutex_lock( shm_struct.smmutex );
memcpy(&(shm_struct.Table[*(shm_struct.size)]), &(tail->flow), sizeof(struct flow_data));
......@@ -255,7 +254,7 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
stats = (flows_stat *) instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*SHM_FLOWS);
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows);
*(shm_struct.size) = 0;
while(data->run) {
......@@ -317,7 +316,7 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
* records that couldn't be returned.
*/
if(*(shm_struct.size) == SHM_FLOWS) { // shm full
if(*(shm_struct.size) == data->shm_flows) { // shm full
//remove node from temporal sorted list
if(tmp->next)
tmp->next->previous = tmp->previous;
......@@ -381,6 +380,20 @@ void poll_expired_flows(mapidflib_function_instance_t *instance) {
pthread_exit(NULL);
}
static int exprflow_instance(mapidflib_function_instance_t *instance,
MAPI_UNUSED int fd,
MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) {
mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
fargs = instance->args;
shm_flows = getargint(&fargs);
instance->def->shm_size = sizeof(flows_stat) + sizeof(struct flow_data) * shm_flows + sizeof(pthread_mutex_t);
return 0;
}
static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED int fd)
{
......@@ -412,15 +425,15 @@ static int exprflow_init(mapidflib_function_instance_t *instance, MAPI_UNUSED in
data->epoch=0;
// function arguments
data->shm_flows = getargint(&fargs); // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
data->expired_flows_list_size_max = getargint(&fargs);
data->packets_count_min = getargint(&fargs);
// Shared Memory Initialization
stats = (flows_stat *)instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*SHM_FLOWS);
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*data->shm_flows);
stats->received = 0;
stats->expired = 0;
......@@ -597,10 +610,16 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m
flow_data *data;
flows_stat *stats;
mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
fargs = instance->args;
shm_flows = getargint(&fargs);
stats = (flows_stat *)instance->result.data;
shm_struct.size = (unsigned int *)instance->result.data;
shm_struct.Table = (flow_data *)((char *)instance->result.data+sizeof(flows_stat));
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*SHM_FLOWS);
shm_struct.smmutex = (pthread_mutex_t *)(((char *)instance->result.data) + sizeof(flows_stat) + sizeof(struct flow_data)*shm_flows);
res->res = instance->internal_data;
res->size = sizeof(flows_stat) + sizeof(flow_data)*(*(shm_struct.size));
......@@ -621,8 +640,19 @@ static int exprflow_client_read_result(mapidflib_function_instance_t *instance,m
static int exprflow_client_init( mapidflib_function_instance_t *instance, void *data) {
if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(struct flow_data)*SHM_FLOWS)) == NULL) {
printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(struct flow_data)*SHM_FLOWS, __FILE__, __LINE__);
mapiFunctArg* fargs;
int shm_flows; // max: (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data))
fargs = instance->args;
shm_flows = getargint(&fargs);
if(shm_flows > (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data)) {
printf("Cannot process %d flows. Maximum is %d.\n", shm_flows, (DIMAPI_DATA_SIZE - sizeof(flows_stat)) / sizeof(struct flow_data));
return(-1);
}
if((instance->internal_data = malloc(sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows)) == NULL) {
printf("Malloc failed for size %d [%s:%d]\n", sizeof(flows_stat)+sizeof(struct flow_data)*shm_flows, __FILE__, __LINE__);
return(-1);
}
data = instance->internal_data;
......@@ -642,14 +672,14 @@ static mapidflib_function_def_t finfo={
"", //libname
EXPIRED_FLOWS, //name
"Expired Flows function", //descr
"ii", //argdescr
"iii", //argdescr
MAPI_DEVICE_ALL, //devtype
MAPIRES_SHM, //Method for returning results
sizeof(flows_stat)+sizeof(struct flow_data)*SHM_FLOWS + sizeof(pthread_mutex_t), //shm size
0, //shm size. Set by instance.
0, //modifies_pkts
0, //filters packets ?
MAPIOPT_NONE,
NULL, //instance
exprflow_instance, //instance
exprflow_init, //init
exprflow_process, //process
NULL, //get_result,
......
......@@ -23,8 +23,8 @@ int main(int argc, char *argv[]) {
mapi_results_t *result;
flow_data *flowdata;
flows_stat *stat;
//unsigned int *stat;
flows_stat *stats;
//unsigned int *stats;
unsigned int i;
......@@ -36,6 +36,7 @@ int main(int argc, char *argv[]) {
int loops = LOOPS;
const unsigned int shm_flows = 24900;
const unsigned int expired_flows_list_size_max = 1249;
const unsigned int packets_count_min = 2;
......@@ -51,7 +52,7 @@ int main(int argc, char *argv[]) {
exit(EXIT_FAILURE);
}
if ((fid = mapi_apply_function(fd, "EXPIRED_FLOWS", expired_flows_list_size_max, packets_count_min)) < 0) {
if ((fid = mapi_apply_function(fd, "EXPIRED_FLOWS", shm_flows, expired_flows_list_size_max, packets_count_min)) < 0) {
fprintf(stderr, "Count not apply function EXPIRED_FLOWS to flow %d\n", fd);
mapi_read_error(&err_no, error);
fprintf(stderr, "Errorcode: %d description: %s\n", err_no, error);
......@@ -75,14 +76,14 @@ int main(int argc, char *argv[]) {
result = (mapi_results_t *) mapi_read_results(fd, fid);
stat = (flows_stat *) result->res;
stats = (flows_stat *) result->res;
flowdata = (struct flow_data *)((char *) result->res + sizeof(flows_stat)); // results/flows array
for (i = stat->sent; i > 0; i--) {
for (i = stats->sent; i > 0; i--) {
print_flow(flowdata);
flowdata++;
}
printf("results: %u, buffered: %u, ignored_total: %llu, dropped_total: %llu\n", stat->sent, stat->buffered, stat->ignored_total, stat->dropped_total);
printf("flows received: %u, expired: %u, sent: %u, ignored: %u, dropped: %u; buffer increase: %d\n", stats->received, stats->expired, stats->sent, stats->ignored, stats->dropped, stats->expired - stats->sent - stats->ignored - stats->dropped);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment