Commit d82f5c95 authored by 's avatar
Browse files

Initial support for MySQL


git-svn-id: file:///home/svn/mapi/trunk@323 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 0cb4b433
CC=gcc
LD=gcc
CFLAGS=-g -O2 -Wall
LIBS=
LIBS=/usr/local/mapi/lib/mapi/mapi.so -lmysqlclient
AR=/usr/bin/ar
RANLIB=ranlib
INSTALL=/usr/bin/install -c
......
......@@ -6,11 +6,17 @@
#include <time.h>
#include <sys/time.h>
#include <mysql/mysql.h>
#include "../../mapi.h"
#include "../../stdlib/sample.h"
#include "abw.h"
flow_spec_t *flow_spec[MAX_FLOWS];
global_spec_t global_spec;
flow_run_t *flow_run[MAX_FLOWS];
void usage() {
char *progname="abw";
......@@ -31,6 +37,8 @@ void usage() {
fprintf(stderr, " -s DD.MM.YYYY-HH:MM:SS start time (default %s)\n", DEFAULT_START_TIME_STRING);
fprintf(stderr, " -e DD.MM.YYYY-HH:MM:SS end time (default %s)\n", DEFAULT_END_TIME_STRING);
fprintf(stderr, " -l mbps link installed bandwidth (default %d)\n", DEFAULT_LINK_MBPS);
fprintf(stderr, " -q quiet - do not print results on stdout\n");
fprintf(stderr, " -b do not insert results into MySQL dataBase\n");
fprintf(stderr, " -h this help message\n");
exit(-1);
}
......@@ -60,6 +68,19 @@ flow_spec_t *new_flow_spec(void) {
return p;
} /* new_flow_spec() */
flow_run_t *new_flow_run(void) {
flow_run_t *p;
if ((p=malloc(sizeof(flow_run_t)))==NULL) {
fprintf(stderr, "%s: malloc() failed\n", __func__);
return NULL;
}
memset(p, 0, sizeof(flow_run_t));
return p;
} /* new_flow_run() */
int read_conf_file(char *filename, flow_spec_t **flow_spec)
{
return 0;
......@@ -97,7 +118,7 @@ int check_conf(flow_spec_t **flow_spec) {
return -1;
}
p->sau_threshold=p->sau_byte_threshold;
p->sau_mode_encoded=COMBO6_DETERMINISTIC;
p->sau_mode_encoded=COMBO6_LENGTH_DETERMINISTIC;
}
else {
if (p->sau_packet_threshold<0) {
......@@ -121,6 +142,31 @@ int check_conf(flow_spec_t **flow_spec) {
return -1;
}
if (strptime(p->end_time_string, "%d.%m.%Y-%H:%M:%S",
&p->end_time_tm)==NULL) {
fprintf(stderr, "%s: flow[%d]: cannot parse end time %s\n",
__func__, i, p->end_time_string);
return -1;
}
if ((p->end_time_timestamp=mktime(&p->end_time_tm))<0) {
fprintf(stderr, "%s: flow[%d]: cannot convert end time %s to timestamp\n",
__func__, i, p->end_time_string);
return -1;
}
/* TODO: remove these limits */
if (p->sau_mode=='b') {
fprintf(stderr, "%s: byte probabilistic sampling is not supported\n",
__func__);
return -1;
}
if (p->payload_strings[1]) {
fprintf(stderr, "%s: payload searching for multiple strings is not supported\n", __func__);
return -1;
}
pp++; p=*pp; i++;
} /* while (p) */
......@@ -130,7 +176,8 @@ int check_conf(flow_spec_t **flow_spec) {
void print_conf(flow_spec_t **flow_spec) {
flow_spec_t **pp;
flow_spec_t *p;
char **r;
char **rr;
char *r;
int i, j;
if (flow_spec==NULL)
......@@ -141,19 +188,21 @@ void print_conf(flow_spec_t **flow_spec) {
while (p) {
printf("flow[%d]:\n", i);
printf("header_filter: |%s|\n", p->header_filter);
printf("device: |%s|\n", p->device);
printf("header_filter: %s\n", p->header_filter);
printf("device: %s\n", p->device);
printf("sau_mode: %c, sau_threshold: %u\n", p->sau_mode, p->sau_threshold);
r=p->payload_strings; j=0;
if (r)
rr=p->payload_strings;
r=*rr; j=0;
if (r) {
printf("payload_strings:");
while (r) {
if (j>0)
printf(",");
printf(" |%s|", *r);
r++; j++;
while (r) {
if (j>0)
printf(",");
printf(" |%s|", r);
rr++; r=*rr; j++;
}
printf("\n");
}
printf("\n");
printf("interval: %f\n", p->interval);
printf("start_time_string: %s, start_time_timestamp: %u\n",
......@@ -168,50 +217,68 @@ void print_conf(flow_spec_t **flow_spec) {
int main(int argc, char *argv[])
{
int i;
int opt;
struct timeval tm;
struct timezone tz;
int payload_strings_no;
/* Set values */
global_spec.conf_filename=NULL;
int i;
int opt;
struct timeval tm;
struct timezone tz;
int payload_strings_no;
int direct_flow;
flow_spec_t **pp;
flow_spec_t *p;
flow_run_t **qq;
flow_run_t *q;
char query[MAX_QUERY+1];
/* Set initial values */
global_spec.conf_filename=NULL;
/* TODO get the following MySQL settings from configuration */
global_spec.user="abw";
global_spec.passwd="lab10";
global_spec.db="abw";
if (gethostname(global_spec.hostname, MAX_HOSTNAME+1)<0) {
fprintf(stderr, "%s: gethostname() failed\n", __func__);
exit(-1);
}
{ char domainname[MAX_HOSTNAME+1];
if (getdomainname(domainname, MAX_HOSTNAME+1)<0) {
fprintf(stderr, "%s: getdomainname() failed\n", __func__);
exit(-1);
}
strcat(global_spec.hostname, domainname);
}
/* TODO: getdomainname() returns empty string, temporary hack here */
strcat(global_spec.hostname, ".cesnet.cz");
printf("hostname: %s\n", global_spec.hostname);
for (i=0; i<MAX_FLOWS; i++)
for (i=0; i<MAX_FLOWS; i++) {
flow_spec[i]=NULL;
flow_run[i]=NULL;
}
/* if (argc<2) {
usage();
return -1;
} */
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
payload_strings_no=0;
payload_strings_no=0;
direct_flow=0;
while ((opt=getopt(argc, argv, ARGS)) != -1) {
while ((opt=getopt(argc, argv, ARGS)) != -1) {
switch(opt) {
case 'f':
if (flow_spec[0]) {
fprintf(stderr, "%s: cannot specify -f and other arguments at the same time\n", __func__);
if (direct_flow) {
fprintf(stderr, "%s: cannot specify -f and direct flow at the same time\n", __func__);
exit(-1);
}
global_spec.conf_filename=optarg;
break;
case 'd':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
flow_spec[0]->device=optarg;
break;
case 'm':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
if (strlen(optarg)>1) {
fprintf(stderr, "%s: sampling mode must be d, b or p\n", __func__);
exit(-1);
......@@ -219,12 +286,7 @@ int main(int argc, char *argv[])
flow_spec[0]->sau_mode=optarg[0];
break;
case 'r':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
if (flow_spec[0]->sau_mode=='p')
sscanf(optarg, "%f", &(flow_spec[0]->sau_pass_probability));
else if (flow_spec[0]->sau_mode=='b')
......@@ -233,12 +295,7 @@ int main(int argc, char *argv[])
flow_spec[0]->sau_packet_threshold=atoi(optarg);
break;
case 'p':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
if (payload_strings_no >= MAX_PAYLOAD_STRINGS) {
fprintf(stderr, "%s: max. %d strings can be searched in payload\n", __func__, MAX_PAYLOAD_STRINGS);
exit(-1);
......@@ -255,12 +312,7 @@ int main(int argc, char *argv[])
payload_strings_no++;
break;
case 'i':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
flow_spec[0]->interval=atof(optarg);
if (flow_spec[0]->interval<=0) {
fprintf(stderr, "%s: interval must be positive\n", __func__);
......@@ -268,36 +320,27 @@ int main(int argc, char *argv[])
}
break;
case 's':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
flow_spec[0]->start_time_string=optarg;
break;
case 'e':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
flow_spec[0]->end_time_string=optarg;
break;
case 'l':
if (!flow_spec[0])
if ((flow_spec[0]=new_flow_spec())==NULL) {
fprintf(stderr, "%s: new_flow_spec() failed\n", __func__);
exit(-1);
}
direct_flow=1;
flow_spec[0]->link_mbps=atoi(optarg);
if (flow_spec[0]->link_mbps<=0) {
fprintf(stderr, "%s: link installed bandwidth must be positive\n", __func__);
exit(-1);
}
break;
case 'q':
global_spec.no_stdout=1;
break;
case 'b':
global_spec.no_mysql=1;
break;
case 'h':
usage();
exit(0);
......@@ -307,61 +350,204 @@ int main(int argc, char *argv[])
}
}
/* Read configuration file */
/* Read configuration file */
if (global_spec.conf_filename)
if (global_spec.conf_filename) {
if (read_conf_file(global_spec.conf_filename, flow_spec)<0) {
fprintf(stderr, "%s: read_conf_file() failed\n", __func__);
exit(-1);
}
}
/* Check if specified values are within acceptable limits */
/* Check if specified values are within acceptable limits */
if (check_conf(flow_spec)<0) {
if (check_conf(flow_spec)<0) {
fprintf(stderr, "%s: check_conf() failed\n", __func__);
exit(-1);
}
}
/* Remaining command-line arguments form a header filter string */
/* Remaining command-line arguments form a header filter string */
for (i=optind; i<argc; i++) {
if ((strlen(flow_spec[0]->header_filter) + strlen(argv[i])) <
MAX_HEADER_FILTER) {
if (i>optind)
strcat(flow_spec[0]->header_filter, " ");
strcat(flow_spec[0]->header_filter, argv[i]);
}
else {
fprintf(stderr, "%s: header filter string must not be longer than %d characters\n", __func__, MAX_HEADER_FILTER);
return -1;
}
}
/* Print configuration */
print_conf(flow_spec);
if (optind < argc) {
if (!global_spec.no_mysql) {
if (global_spec.conf_filename) {
fprintf(stderr, "%s: cannot specify -f and other arguments at the same time\n", __func__);
if (mysql_server_init(0, NULL, NULL)) {
fprintf(stderr, "%s: mysql_server_init() failed\n", __func__);
exit(-1);
}
printf("mysql_server_init() ok\n");
if ((global_spec.mysql=mysql_init(NULL))==NULL) {
fprintf(stderr, "%s: mysql_init() failed\n", __func__);
exit(-1);
}
printf("mysql_init() ok\n");
if (mysql_real_connect(global_spec.mysql, NULL,
global_spec.user,
global_spec.passwd,
global_spec.db, 0, NULL, 0)==NULL) {
fprintf(stderr, "%s: mysql_real_connect() failed\n", __func__);
exit(-1);
}
printf("mysql_real_connect() ok\n");
for (i=optind; i<argc; i++) {
if ((strlen(flow_spec[0]->header_filter) + strlen(argv[i])) <
MAX_HEADER_FILTER) {
if (i>optind)
strcat(flow_spec[0]->header_filter, " ");
strcat(flow_spec[0]->header_filter, argv[i]);
}
else {
fprintf(stderr, "%s: header filter string must not be longer than %d characters\n", __func__, MAX_HEADER_FILTER);
return -1;
}
}
}
/* Print configuration */
print_conf(flow_spec);
}
while (1) {
if (gettimeofday(&tm, &tz)<0) {
fprintf(stderr, "%s: gettimeofday() failed\n", __func__);
exit(-1);
pp=flow_spec;
p=*pp;
qq=flow_run;
i=0;
while (p && i<MAX_FLOWS) {
if (!(global_spec.no_mysql)) {
/* Find this node in tSubject or INSERT it */
strcpy(query, "SELECT * FROM tSubject WHERE ");
}
if ((*qq=new_flow_run())==NULL) {
fprintf(stderr, "%s: new_flow_run() failed\n", __func__);
exit(-1);
}
q=*qq;
if ((q->fd=mapi_create_flow(p->device))<0) {
fprintf(stderr, "%s: mapi_create_flow(%s) failed\n", __func__,
p->device);
exit(-1);
}
if (strlen(p->header_filter)) {
if ((q->bpf_filter_fid=
mapi_apply_function(q->fd, "BPF_FILTER", p->header_filter))<0) {
fprintf(stderr, "%s: BPF_FILTER for flow %d failed\n", __func__,
i);
exit(-1);
}
}
if (p->sau_mode == 'd' && p->sau_packet_threshold != 1) {
if ((q->sample_fid=
mapi_apply_function(q->fd, "SAMPLE", PERIODIC,
p->sau_packet_threshold))<0) {
fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i);
exit(-1);
}
}
else if (p->sau_mode == 'p' && p->sau_pass_probability != 1) {
if ((q->sample_fid=
mapi_apply_function(q->fd, "SAMPLE", PROBABILISTIC,
(p->sau_pass_probability)*100))<0) {
fprintf(stderr, "%s: SAMPLE for flow %d failed\n", __func__, i);
exit(-1);
}
}
if (p->payload_strings[0]) {
if ((q->str_search_fid=
mapi_apply_function(q->fd, "STR_SEARCH", p->payload_strings[0],
0, 0))<0) {
fprintf(stderr, "%s: STR_SEARCH for flow %d failed\n", __func__,
i);
exit(-1);
}
}
if ((q->pkt_counter_fid=
mapi_apply_function(q->fd, "PKT_COUNTER"))<0) {
fprintf(stderr, "%s: PKT_COUNTER for flow %d failed\n", __func__,
i);
exit(-1);
}
if ((q->byte_counter_fid=
mapi_apply_function(q->fd, "BYTE_COUNTER"))<0) {
fprintf(stderr, "%s: BYTE_COUNTER for flow %d failed\n",
__func__, i);
exit(-1);
}
if (mapi_connect(q->fd)<0) {
fprintf(stderr, "%s: mapi_connect() for flow %d failed\n", __func__,
i);
exit(-1);
}
pp++; p=*pp;
qq++; q=*qq;
i++;
} /* while (p) */
while (1) {
if (gettimeofday(&tm, &tz)<0) {
fprintf(stderr, "%s: gettimeofday() failed\n", __func__);
exit(-1);
}
printf("%u.%u", (unsigned int)(tm.tv_sec), (unsigned int)(tm.tv_usec));
pp=flow_spec;
p=*pp;
qq=flow_run;
q=*qq;
i=0;
while (p) {
if ((q->pkt_counter=
mapi_read_results(q->fd, q->pkt_counter_fid, TRUE))==NULL) {
fprintf(stderr, "%s: mapi_read_results() for flow %d failed\n",
__func__, i);
exit(-1);
}
if ((q->byte_counter=
mapi_read_results(q->fd, q->byte_counter_fid, TRUE))==NULL) {
fprintf(stderr, "%s: mapi_read_results() for flow %d failed\n",
__func__, i);
exit(-1);
}
if (!global_spec.no_stdout) {
printf(" %u/%u", *(q->pkt_counter)-q->old_pkt_counter,
*(q->byte_counter)-q->old_byte_counter);
}
q->old_pkt_counter=*(q->pkt_counter);
q->old_byte_counter=*(q->byte_counter);
pp++; p=*pp;
qq++; q=*qq;
i++;
} /* while (p) */
printf("\n");
/* TODO: wait until next checkpoint (not a fixed delay) of
the flow whose checkpoint is earliest */
usleep((flow_spec[0]->interval)*1000000);
} /* while (1) */
/* TODO: this is never reached, move to interrupt handler */
if (!global_spec.no_mysql) {
mysql_close(global_spec.mysql);
mysql_library_end();
}
printf("1 %u.%u %d %d %d %d %d\n", (unsigned int)tm.tv_sec,
(unsigned int)tm.tv_usec, rand()%1000, rand()%1000,
rand()%1000, rand()%1000, rand()%1000);
printf("2 %u.%u %d %d %d %d %d\n", (unsigned int)tm.tv_sec,
(unsigned int)tm.tv_usec, rand()%1000, rand()%1000,
rand()%1000, rand()%1000, rand()%1000);
usleep(100000);
}
return 0;
return 0;
} /* main() */
# Run this script with mysql root privileges, like this:
#
# mysql -u root -p mysql < create_database.sql
drop database if exists abw;
create database abw;
# Run this script with mysql root privileges, like this:
#
# mysql -u root -p abw < create_tables.sql
DROP TABLE IF EXISTS tSubject;
CREATE TABLE tSubject (
id int NOT NULL auto_increment,
hostname varchar(64),
ip varchar(15),
interface varchar(32),
direction varchar(3),
PRIMARY KEY (id)
);
DROP TABLE IF EXISTS tParameters;
CREATE TABLE tParameters (
id int NOT NULL auto_increment,
header_filter varchar(128),
sampling_type varchar(1),
sampling_threshold int,
payload_filter varchar(128),
interval_sec float,
PRIMARY KEY (id)
);
DROP TABLE IF EXISTS tValue;
CREATE TABLE tValue (
id int NOT NULL auto_increment,
subject_id int NOT NULL,
parameters_id int NOT NULL,
timestamp_sec int,
timestamp_usec int,
characteristics varchar(32),
value_int int,
value_float float,
value_varchar varchar(32),
PRIMARY KEY (id)
);
# Run this script with mysql root privileges, like this:
#