Commit d420ebcb authored by 's avatar
Browse files

More precise scheduling with mysql_time and some bugs corrected



git-svn-id: file:///home/svn/mapi/trunk@335 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent cb35e033
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#define __USE_XOPEN #define __USE_XOPEN
#include <time.h> #include <time.h>
#include <sys/time.h> #include <sys/time.h>
#include <math.h>
#include <mysql/mysql.h> #include <mysql/mysql.h>
...@@ -12,6 +13,7 @@ ...@@ -12,6 +13,7 @@
#include "../../stdlib/sample.h" #include "../../stdlib/sample.h"
#include "abw.h" #include "abw.h"
#include "abw_mysql.h" #include "abw_mysql.h"
#include "abw_time.h"
flow_spec_t *flow_spec[MAX_FLOWS]; flow_spec_t *flow_spec[MAX_FLOWS];
global_spec_t global_spec; global_spec_t global_spec;
...@@ -22,25 +24,25 @@ void usage() { ...@@ -22,25 +24,25 @@ void usage() {
char *progname="abw"; char *progname="abw";
fprintf(stderr, "%s: Usage: %s [options] [header_filter]\n", progname, progname); fprintf(stderr, "%s: Usage: %s [options] [header_filter]\n", progname, progname);
fprintf(stderr, " -d device device (e.g., /dev/combosix/0, /dev/dag0, default %s)\n", DEFAULT_DEVICE); fprintf(stderr, " -d device device (e.g., /dev/combosix/0, /dev/dag0, default %s)\n", DEFAULT_DEVICE);
fprintf(stderr, " -m {d | b | p} sampling mode (default deterministic)\n"); fprintf(stderr, " -m {d | b | p} sampling mode (default deterministic)\n");
fprintf(stderr, " d - deterministic\n"); fprintf(stderr, " d - deterministic\n");
fprintf(stderr, " b - byte deterministic\n"); fprintf(stderr, " b - byte deterministic\n");
fprintf(stderr, " p - probabilistic\n"); fprintf(stderr, " p - probabilistic\n");
fprintf(stderr, " -r threshold pass packet when threshold is reached\n"); fprintf(stderr, " -r threshold pass packet when threshold is reached\n");
fprintf(stderr, " number of packets for d mode (default 1)\n"); fprintf(stderr, " number of packets for d mode (default 1)\n");
fprintf(stderr, " number of bytes for b mode (default 3000)\n"); fprintf(stderr, " number of bytes for b mode (default 3000)\n");
fprintf(stderr, " pass probability for p mode (default 0.5)\n"); fprintf(stderr, " pass probability for p mode (default 0.5)\n");
fprintf(stderr, " -p string string to be searched in payload, multiple -p argument\n"); fprintf(stderr, " -p string string to be searched in payload, multiple -p argument\n");
fprintf(stderr, " can be specified, packet passes when at least one match\n"); fprintf(stderr, " can be specified, packet passes when at least one match\n");
fprintf(stderr, " -f filename file to read multi-flow specification (e.g., abw.cfg)\n"); fprintf(stderr, " -f filename file to read multi-flow specification (e.g., abw.cfg)\n");
fprintf(stderr, " -i interval seconds between measurements (default 1)\n"); fprintf(stderr, " -i interval seconds between measurements (default 1)\n");
fprintf(stderr, " -s DD.MM.YYYY-HH:MM:SS start time (default %s)\n", DEFAULT_START_TIME_STRING); fprintf(stderr, " -s DD.MM.YYYY-HH:MM:SS start time (default %s)\n", DEFAULT_START_TIME_STRING);
fprintf(stderr, " -e DD.MM.YYYY-HH:MM:SS end time (default %s)\n", DEFAULT_END_TIME_STRING); fprintf(stderr, " -e DD.MM.YYYY-HH:MM:SS end time (default %s)\n", DEFAULT_END_TIME_STRING);
fprintf(stderr, " -l mbps link installed bandwidth (default %d)\n", DEFAULT_LINK_MBPS); fprintf(stderr, " -l mbps link installed bandwidth (default %d)\n", DEFAULT_LINK_MBPS);
fprintf(stderr, " -q quiet - do not print results on stdout\n"); fprintf(stderr, " -q quiet - do not print results on stdout\n");
fprintf(stderr, " -b do not insert results into MySQL dataBase\n"); fprintf(stderr, " -b do not insert results into MySQL dataBase\n");
fprintf(stderr, " -h this help message\n"); fprintf(stderr, " -h this help message\n");
exit(-1); exit(-1);
} }
...@@ -61,7 +63,8 @@ flow_spec_t *new_flow_spec(void) { ...@@ -61,7 +63,8 @@ flow_spec_t *new_flow_spec(void) {
p->sau_byte_threshold=3000; p->sau_byte_threshold=3000;
p->sau_pass_probability=0.5; p->sau_pass_probability=0.5;
p->payload_strings_no=0; p->payload_strings_no=0;
p->interval=1; p->interval.tv_sec=1;
p->interval.tv_usec=0;
p->start_time_string=DEFAULT_START_TIME_STRING; p->start_time_string=DEFAULT_START_TIME_STRING;
p->end_time_string=DEFAULT_END_TIME_STRING; p->end_time_string=DEFAULT_END_TIME_STRING;
p->link_mbps=DEFAULT_LINK_MBPS; p->link_mbps=DEFAULT_LINK_MBPS;
...@@ -205,8 +208,9 @@ void print_conf(flow_spec_t **flow_spec) { ...@@ -205,8 +208,9 @@ void print_conf(flow_spec_t **flow_spec) {
printf("\n"); printf("\n");
} }
printf("interval: %f\n", p->interval); printf("interval.tv_sec: %d, interval.tv_usec: %d\n",
printf("start_time_string: %s, start_time_timestamp: %u\n", (int)(p->interval.tv_sec), (int)(p->interval.tv_usec));
printf("start_time_string: %s, start_time_timestamp: %u\n",
p->start_time_string, (unsigned int)(p->start_time_timestamp)); p->start_time_string, (unsigned int)(p->start_time_timestamp));
printf("end_time_string: %s, end_time_timestamp: %u\n", printf("end_time_string: %s, end_time_timestamp: %u\n",
p->end_time_string, (unsigned int)(p->end_time_timestamp)); p->end_time_string, (unsigned int)(p->end_time_timestamp));
...@@ -222,12 +226,13 @@ int main(int argc, char *argv[]) ...@@ -222,12 +226,13 @@ int main(int argc, char *argv[])
int opt; int opt;
struct timeval tm; struct timeval tm;
struct timezone tz; struct timezone tz;
int payload_strings_no; /* int payload_strings_no; */
int direct_flow; int direct_flow;
flow_spec_t **pp; flow_spec_t **pp;
flow_spec_t *p; flow_spec_t *p;
flow_run_t **qq; flow_run_t **qq;
flow_run_t *q; flow_run_t *q;
struct timeval next, wait;
/* Set initial values */ /* Set initial values */
global_spec.conf_filename=NULL; global_spec.conf_filename=NULL;
...@@ -261,7 +266,7 @@ int main(int argc, char *argv[]) ...@@ -261,7 +266,7 @@ int main(int argc, char *argv[])
exit(-1); exit(-1);
} }
payload_strings_no=0; /* payload_strings_no=0; */
direct_flow=0; direct_flow=0;
while ((opt=getopt(argc, argv, ARGS)) != -1) { while ((opt=getopt(argc, argv, ARGS)) != -1) {
...@@ -297,7 +302,7 @@ int main(int argc, char *argv[]) ...@@ -297,7 +302,7 @@ int main(int argc, char *argv[])
break; break;
case 'p': case 'p':
direct_flow=1; direct_flow=1;
if (payload_strings_no >= MAX_PAYLOAD_STRINGS) { if (flow_spec[0]->payload_strings_no >= MAX_PAYLOAD_STRINGS) {
fprintf(stderr, "%s: max. %d strings can be searched in payload\n", __func__, MAX_PAYLOAD_STRINGS); fprintf(stderr, "%s: max. %d strings can be searched in payload\n", __func__, MAX_PAYLOAD_STRINGS);
exit(-1); exit(-1);
} }
...@@ -305,19 +310,26 @@ int main(int argc, char *argv[]) ...@@ -305,19 +310,26 @@ int main(int argc, char *argv[])
fprintf(stderr, "%s: max. %d characters long strings can be searched in payload\n", __func__, MAX_PAYLOAD_STRING); fprintf(stderr, "%s: max. %d characters long strings can be searched in payload\n", __func__, MAX_PAYLOAD_STRING);
exit(-1); exit(-1);
} }
if ((flow_spec[0]->payload_strings[payload_strings_no]=malloc(strlen(optarg)+1))==NULL) { if ((flow_spec[0]->payload_strings[flow_spec[0]->payload_strings_no]=
malloc(strlen(optarg)+1))==NULL) {
fprintf(stderr, "%s: malloc() failed\n", __func__); fprintf(stderr, "%s: malloc() failed\n", __func__);
exit(-1); exit(-1);
} }
strcpy(flow_spec[0]->payload_strings[payload_strings_no], optarg); strcpy(flow_spec[0]->payload_strings[flow_spec[0]->payload_strings_no],
payload_strings_no++; optarg);
flow_spec[0]->payload_strings_no++;
break; break;
case 'i': case 'i':
direct_flow=1; { double interval;
flow_spec[0]->interval=atof(optarg); direct_flow=1;
if (flow_spec[0]->interval<=0) { interval=atof(optarg);
fprintf(stderr, "%s: interval must be positive\n", __func__); if (interval<=0) {
exit(-1); fprintf(stderr, "%s: interval must be positive\n", __func__);
exit(-1);
}
flow_spec[0]->interval.tv_sec=floor(interval);
flow_spec[0]->interval.tv_usec=
(suseconds_t)floor((interval - flow_spec[0]->interval.tv_sec)*1000000);
} }
break; break;
case 's': case 's':
...@@ -374,9 +386,6 @@ int main(int argc, char *argv[]) ...@@ -374,9 +386,6 @@ int main(int argc, char *argv[])
exit(-1); exit(-1);
} }
printf("%s: no_stdout: %d, no_mysql: %d\n", __func__,
global_spec.no_stdout, global_spec.no_mysql);
/* Remaining command-line arguments form a header filter string */ /* Remaining command-line arguments form a header filter string */
for (i=optind; i<argc; i++) { for (i=optind; i<argc; i++) {
...@@ -468,16 +477,16 @@ int main(int argc, char *argv[]) ...@@ -468,16 +477,16 @@ int main(int argc, char *argv[])
if (p->sau_mode == 'd' && p->sau_packet_threshold != 1) { if (p->sau_mode == 'd' && p->sau_packet_threshold != 1) {
if ((q->sample_fid= if ((q->sample_fid=
mapi_apply_function(q->fd, "SAMPLE", PERIODIC, mapi_apply_function(q->fd, "SAMPLE", p->sau_packet_threshold),
p->sau_packet_threshold))<0) { PERIODIC)<0) {
fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i); fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i);
exit(-1); exit(-1);
} }
} }
else if (p->sau_mode == 'p' && p->sau_pass_probability != 1) { else if (p->sau_mode == 'p' && p->sau_pass_probability != 1) {
if ((q->sample_fid= if ((q->sample_fid=
mapi_apply_function(q->fd, "SAMPLE", PROBABILISTIC, mapi_apply_function(q->fd, "SAMPLE", (p->sau_pass_probability)*100),
(p->sau_pass_probability)*100))<0) { PROBABILISTIC)<0) {
fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i); fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i);
exit(-1); exit(-1);
} }
...@@ -524,7 +533,8 @@ int main(int argc, char *argv[]) ...@@ -524,7 +533,8 @@ int main(int argc, char *argv[])
exit(-1); exit(-1);
} }
printf("%u.%u", (unsigned int)(tm.tv_sec), (unsigned int)(tm.tv_usec)); if (!global_spec.no_stdout)
printf("%u.%u", (unsigned int)(tm.tv_sec), (unsigned int)(tm.tv_usec));
pp=flow_spec; pp=flow_spec;
p=*pp; p=*pp;
...@@ -545,7 +555,7 @@ int main(int argc, char *argv[]) ...@@ -545,7 +555,7 @@ int main(int argc, char *argv[])
__func__, i); __func__, i);
exit(-1); exit(-1);
} }
if (!global_spec.no_stdout) { if (!global_spec.no_stdout) {
printf(" %u/%u", *(q->pkt_counter)-q->old_pkt_counter, printf(" %u/%u", *(q->pkt_counter)-q->old_pkt_counter,
*(q->byte_counter)-q->old_byte_counter); *(q->byte_counter)-q->old_byte_counter);
...@@ -578,12 +588,19 @@ int main(int argc, char *argv[]) ...@@ -578,12 +588,19 @@ int main(int argc, char *argv[])
i++; i++;
} /* while (p) */ } /* while (p) */
printf("\n");
if (!global_spec.no_stdout)
printf("\n");
/* TODO: wait until next checkpoint (not a fixed delay) of /* TODO: wait until next checkpoint (not a fixed delay) of
the flow whose checkpoint is earliest */ the flow whose checkpoint is earliest */
abw_next_timestamp(&(flow_spec[0]->interval), &next, &wait);
printf("next.tv_sec: %d, next.tv_usec: %d, wait.tv_sec: %d, wait.tv_usec: %d\n", (int)(next.tv_sec), (int)(next.tv_usec), (int)(wait.tv_sec), (int)(wait.tv_usec));
usleep(wait.tv_sec * 1000000 + wait.tv_usec);
usleep((flow_spec[0]->interval)*1000000); /* usleep((flow_spec[0]->interval)*1000000); */
} /* while (1) */ } /* while (1) */
/* TODO: this is never reached, move to interrupt handler */ /* TODO: this is never reached, move to interrupt handler */
......
...@@ -115,12 +115,17 @@ int abw_mysql_get_parameters_id(global_spec_t *global_spec, ...@@ -115,12 +115,17 @@ int abw_mysql_get_parameters_id(global_spec_t *global_spec,
return -1; return -1;
} }
sprintf(query, "SELECT * FROM tParameters WHERE sau_mode='%c' AND sau_threshold='%d' AND interval_sec='%f'", sprintf(query, "SELECT * FROM tParameters WHERE sau_mode='%c' AND sau_threshold='%d' AND interval_sec='%d' AND interval_usec='%d'",
p->sau_mode, p->sau_threshold, p->interval); p->sau_mode, p->sau_threshold, (int)(p->interval.tv_sec),
(int)(p->interval.tv_usec));
if (strlen(p->header_filter)) if (strlen(p->header_filter))
sprintf(query+strlen(query), " AND header_filter='%s'", p->header_filter); sprintf(query+strlen(query), " AND header_filter='%s'", p->header_filter);
else
sprintf(query+strlen(query), " AND header_filter=''");
if (p->payload_strings_no) if (p->payload_strings_no)
sprintf(query+strlen(query), " AND payload_string='%s'", p->payload_strings[0]); sprintf(query+strlen(query), " AND payload_string='%s'", p->payload_strings[0]);
else
sprintf(query+strlen(query), " AND payload_string=''");
printf("%s: query: %s\n", __func__, query); printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) { if (mysql_query(global_spec->mysql, query)) {
...@@ -162,17 +167,17 @@ int abw_mysql_get_parameters_id(global_spec_t *global_spec, ...@@ -162,17 +167,17 @@ int abw_mysql_get_parameters_id(global_spec_t *global_spec,
} }
else { /* parameters are not in tParameters */ else { /* parameters are not in tParameters */
printf("%s: parameters are not in tParameters\n", __func__); printf("%s: parameters are not in tParameters\n", __func__);
strcpy(query, "INSERT INTO tParameters (sau_mode, sau_threshold, interval_sec, link_mbps"); sprintf(query, "INSERT INTO tParameters (sau_mode, sau_threshold, interval_sec, interval_usec, link_mbps, header_filter, payload_string) VALUES ('%c', '%d', '%d', '%d', '%d'",
if (p->header_filter) p->sau_mode, p->sau_threshold, (int)(p->interval.tv_sec),
strcat(query, ", header_filter"); (int)(p->interval.tv_usec), p->link_mbps);
if (p->payload_strings_no) if (strlen(p->header_filter))
strcat(query, ", payload_string");
sprintf(query+strlen(query), ") VALUES ('%c', '%d', '%f', '%d'",
p->sau_mode, p->sau_threshold, p->interval, p->link_mbps);
if (p->header_filter)
sprintf(query+strlen(query), ", '%s'", p->header_filter); sprintf(query+strlen(query), ", '%s'", p->header_filter);
else
sprintf(query+strlen(query), ", ''");
if (p->payload_strings_no) if (p->payload_strings_no)
sprintf(query+strlen(query), ", '%s'", p->payload_strings[0]); sprintf(query+strlen(query), ", '%s'", p->payload_strings[0]);
else
sprintf(query+strlen(query), ", ''");
strcat(query, ");"); strcat(query, ");");
printf("%s: query: %s\n", __func__, query); printf("%s: query: %s\n", __func__, query);
...@@ -219,7 +224,7 @@ int abw_mysql_insert_value(global_spec_t *global_spec, flow_run_t *q, ...@@ -219,7 +224,7 @@ int abw_mysql_insert_value(global_spec_t *global_spec, flow_run_t *q,
sprintf(query+strlen(query), ", '%s'", value_varchar); sprintf(query+strlen(query), ", '%s'", value_varchar);
strcat(query, ");"); strcat(query, ");");
printf("%s: query: %s\n", __func__, query); /* printf("%s: query: %s\n", __func__, query); */
if (mysql_query(global_spec->mysql, query)) { if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query); fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1; return -1;
......
...@@ -19,7 +19,8 @@ CREATE TABLE tParameters ( ...@@ -19,7 +19,8 @@ CREATE TABLE tParameters (
sau_mode varchar(1), sau_mode varchar(1),
sau_threshold int, sau_threshold int,
payload_string varchar(128), payload_string varchar(128),
interval_sec float, interval_sec int,
interval_usec int,
link_mbps int, link_mbps int,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment