Commit 50b20a45 authored by 's avatar
Browse files

MySQL support working


git-svn-id: file:///home/svn/mapi/trunk@327 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 81645754
......@@ -10,7 +10,7 @@ RLIB_DIRS=
INCLUDE_DIRS=
TARGETS=abw abw_dummy
OBJS=abw.o abw_dummy.o
OBJS=abw.o abw_mysql.o abw_dummy.o
all: $(TARGETS)
......@@ -21,8 +21,8 @@ all: $(TARGETS)
# $(AR) rv $@ $(LIBSCAMPI_OBJS)
# $(RANLIB) $@
abw: abw.o
$(LD) -o $@ abw.o $(LIB_DIRS) $(LIBS) -Wl,-rpath,$(RLIB_DIRS)
abw: abw.o abw_mysql.o
$(LD) -o $@ abw.o abw_mysql.o $(LIB_DIRS) $(LIBS) -Wl,-rpath,$(RLIB_DIRS)
abw_dummy: abw_dummy.o
$(LD) -o $@ abw_dummy.o $(LIB_DIRS) $(LIBS) -Wl,-rpath,$(RLIB_DIRS)
......
......@@ -11,6 +11,7 @@
#include "../../mapi.h"
#include "../../stdlib/sample.h"
#include "abw.h"
#include "abw_mysql.h"
flow_spec_t *flow_spec[MAX_FLOWS];
global_spec_t global_spec;
......@@ -130,7 +131,7 @@ int check_conf(flow_spec_t **flow_spec) {
}
if (strptime(p->start_time_string, "%d.%m.%Y-%H:%M:%S",
&p->start_time_tm)==NULL) {
&(p->start_time_tm))==NULL) {
fprintf(stderr, "%s: flow[%d]: cannot parse start time %s\n",
__func__, i, p->start_time_string);
return -1;
......@@ -143,7 +144,7 @@ int check_conf(flow_spec_t **flow_spec) {
}
if (strptime(p->end_time_string, "%d.%m.%Y-%H:%M:%S",
&p->end_time_tm)==NULL) {
&(p->end_time_tm))==NULL) {
fprintf(stderr, "%s: flow[%d]: cannot parse end time %s\n",
__func__, i, p->end_time_string);
return -1;
......@@ -227,7 +228,6 @@ int main(int argc, char *argv[])
flow_spec_t *p;
flow_run_t **qq;
flow_run_t *q;
char query[MAX_QUERY+1];
/* Set initial values */
global_spec.conf_filename=NULL;
......@@ -377,7 +377,7 @@ int main(int argc, char *argv[])
}
else {
fprintf(stderr, "%s: header filter string must not be longer than %d characters\n", __func__, MAX_HEADER_FILTER);
return -1;
exit(-1);
}
}
......@@ -415,18 +415,31 @@ int main(int argc, char *argv[])
i=0;
while (p && i<MAX_FLOWS) {
if (!(global_spec.no_mysql)) {
/* Find this node in tSubject or INSERT it */
strcpy(query, "SELECT * FROM tSubject WHERE ");
}
if ((*qq=new_flow_run())==NULL) {
fprintf(stderr, "%s: new_flow_run() failed\n", __func__);
exit(-1);
}
q=*qq;
if (!(global_spec.no_mysql)) {
/* Find this node in tSubject or INSERT it */
if ((q->subject_id=
abw_mysql_get_subject_id(&global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_subject_id() failed\n",
__func__);
exit(-1);
}
/* Find these parameters in tParameters or INSERT it */
if ((q->parameters_id=
abw_mysql_get_parameters_id(&global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_parameters_id() failed\n",
__func__);
exit(-1);
}
}
if ((q->fd=mapi_create_flow(p->device))<0) {
fprintf(stderr, "%s: mapi_create_flow(%s) failed\n", __func__,
p->device);
......@@ -527,6 +540,25 @@ int main(int argc, char *argv[])
*(q->byte_counter)-q->old_byte_counter);
}
if (!global_spec.no_mysql) {
if (abw_mysql_insert_value(&global_spec, q,
tm.tv_sec, tm.tv_usec,
"packets all",
*(q->pkt_counter)-q->old_pkt_counter, 0, NULL)<0) {
fprintf(stderr, "%s: abw_mysql_insert() failed\n", __func__);
exit(-1);
}
if (abw_mysql_insert_value(&global_spec, q,
tm.tv_sec, tm.tv_usec,
"bytes all",
*(q->byte_counter)-q->old_byte_counter, 0, NULL)<0) {
fprintf(stderr, "%s: abw_mysql_insert() failed\n", __func__);
exit(-1);
}
}
q->old_pkt_counter=*(q->pkt_counter);
q->old_byte_counter=*(q->byte_counter);
......
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mysql/mysql.h>
#include "abw.h"
#include "abw_mysql.h"
int abw_mysql_get_subject_id(global_spec_t *global_spec,
flow_spec_t *p) {
char query[MAX_QUERY+1];
MYSQL_RES *mysql_res;
MYSQL_ROW mysql_row;
int subject_id;
int num;
if (global_spec==NULL || p==NULL) {
fprintf(stderr, "%s: some required arguments were not supplied\n",
__func__);
return -1;
}
/* We should search hostname if specified otherwise IP address,
then interface and direction. We currently run this application
on the host with a monitoring card (no DiMAPI), so we always
search local hostname and device of monitoring card, direction
is included in chosen monitoring card (but in future more ports
on the card can be used) */
strcpy(query, "SELECT * FROM tSubject WHERE hostname='");
strcat(query, global_spec->hostname);
strcat(query, "' AND interface='");
strcat(query, p->device);
strcat(query, "';");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((mysql_res=mysql_store_result(global_spec->mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (num>1) {
fprintf(stderr, "%s: multiple rows in tSubject matching subject\n",
__func__);
return -1;
}
if (num==1) { /* subject is in tSubject */
printf("%s: subject is in tSubject\n", __func__);
/* if ((mysql_res=mysql_store_result(global_spec->mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
} */
if ((mysql_row=mysql_fetch_row(mysql_res))==NULL) {
fprintf(stderr, "%s: mysql_fetch_row() failed\n", __func__);
return -1;
}
subject_id=atoi(mysql_row[0]);
printf("%s: subject_id: %d\n", __func__, subject_id);
return subject_id;
}
else { /* subject is not in tSubject */
printf("%s: subject is not in tSubject\n", __func__);
strcpy(query, "INSERT INTO tSubject (hostname, interface) VALUES ('");
strcat(query, global_spec->hostname);
strcat(query, "', '");
strcat(query, p->device);
strcat(query, "');");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
/* Call this function again (recursively) to get ID field of just
INSERTed row */
printf("%s: calling abw_mysql_get_subject_id() again\n", __func__);
if ((subject_id=abw_mysql_get_subject_id(global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_subject_id() failed\n", __func__);
return -1;
}
return subject_id;
}
} /* abw_mysql_get_subject_id() */
int abw_mysql_get_parameters_id(global_spec_t *global_spec,
flow_spec_t *p) {
char query[MAX_QUERY+1];
MYSQL_RES *mysql_res;
MYSQL_ROW mysql_row;
int parameters_id;
int num;
if (global_spec==NULL || p==NULL) {
fprintf(stderr, "%s: some required arguments were not supplied\n",
__func__);
return -1;
}
sprintf(query, "SELECT * FROM tParameters WHERE sau_mode='%c' AND sau_threshold='%d' AND interval_sec='%f'",
p->sau_mode, p->sau_threshold, p->interval);
if (strlen(p->header_filter))
sprintf(query+strlen(query), " AND header_filter='%s'", p->header_filter);
if (p->payload_strings_no)
sprintf(query+strlen(query), " AND payload_string='%s'", p->payload_strings[0]);
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((mysql_res=mysql_store_result(global_spec->mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (num>1) {
fprintf(stderr, "%s: multiple rows in tParameters matching parameters\n",
__func__);
return -1;
}
if (num==1) { /* parameters are in tParameters */
printf("%s: parameters are in in tParameters\n", __func__);
/* if ((mysql_res=mysql_store_result(global_spec->mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
} */
if ((mysql_row=mysql_fetch_row(mysql_res))==NULL) {
fprintf(stderr, "%s: mysql_fetch_row() failed\n", __func__);
return -1;
}
parameters_id=atoi(mysql_row[0]);
printf("%s: parameters_id: %d\n", __func__, parameters_id);
return parameters_id;
}
else { /* parameters are not in tParameters */
printf("%s: parameters are not in tParameters\n", __func__);
strcpy(query, "INSERT INTO tParameters (sau_mode, sau_threshold, interval_sec, link_mbps");
if (p->header_filter)
strcat(query, ", header_filter");
if (p->payload_strings_no)
strcat(query, ", payload_string");
sprintf(query+strlen(query), ") VALUES ('%c', '%d', '%f', '%d'",
p->sau_mode, p->sau_threshold, p->interval, p->link_mbps);
if (p->header_filter)
sprintf(query+strlen(query), ", '%s'", p->header_filter);
if (p->payload_strings_no)
sprintf(query+strlen(query), ", '%s'", p->payload_strings[0]);
strcat(query, ");");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
/* Call this function again (recursively) to get ID field of just
INSERTed row */
printf("%s: calling abw_mysql_get_parameters_id() again\n", __func__);
if ((parameters_id=abw_mysql_get_parameters_id(global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_parameters_id() failed\n", __func__);
return -1;
}
return parameters_id;
}
} /* abw_mysql_get_parameters_id() */
int abw_mysql_insert_value(global_spec_t *global_spec, flow_run_t *q,
int timestamp_sec, int timestamp_usec,
char *characteristics,
int value_int, double value_float, char *value_varchar) {
char query[MAX_QUERY+1];
int num;
if (global_spec==NULL || q==NULL || characteristics==NULL) {
fprintf(stderr, "%s: some required arguments were not supplied\n",
__func__);
return -1;
}
sprintf(query, "INSERT INTO tValue (subject_id, parameters_id, timestamp_sec, timestamp_usec, characteristics, value_int, value_float");
if (value_varchar)
sprintf(query+strlen(query), ", value_varchar");
sprintf(query+strlen(query), ") VALUES ('%d', '%d', '%d', '%d', '%s', '%d', '%f'", q->subject_id, q->parameters_id, timestamp_sec, timestamp_usec, characteristics, value_int, value_float);
if (value_varchar)
sprintf(query+strlen(query), ", '%s'", value_varchar);
strcat(query, ");");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
return 0;
} /* abw_mysql_insert_value() */
#define MAX_QUERY 256
int abw_mysql_get_subject_id(global_spec_t *global_spec,
flow_spec_t *p);
int abw_mysql_get_parameters_id(global_spec_t *global_spec,
flow_spec_t *p);
int abw_mysql_insert_value(global_spec_t *global_spec, flow_run_t *q,
int timestamp_sec, int timestamp_usec,
char *characteristics,
int value_int, double value_float, char *value_varchar);
......@@ -16,10 +16,11 @@ DROP TABLE IF EXISTS tParameters;
CREATE TABLE tParameters (
id int NOT NULL auto_increment,
header_filter varchar(128),
sampling_type varchar(1),
sampling_threshold int,
payload_filter varchar(128),
sau_mode varchar(1),
sau_threshold int,
payload_string varchar(128),
interval_sec float,
link_mbps int,
PRIMARY KEY (id)
);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment