Commit 8af134dc authored by 's avatar
Browse files

Added abw_mysql_write_conf() and abw_mysql_clear_conf()


git-svn-id: file:///home/svn/mapi/trunk@374 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 3658edc6
......@@ -19,6 +19,8 @@
global_spec_t global_spec;
flow_spec_t *flow_spec[MAX_FLOWS];
int debug=1;
int main(int argc, char *argv[])
{
int i, j;
......@@ -101,7 +103,7 @@ int main(int argc, char *argv[])
}
if (global_spec.conf_to_mysql) {
if (abw_mysql_write_conf(global_spec.mysql, flow_spec)<0) {
if (abw_mysql_write_conf(&global_spec, flow_spec)<0) {
fprintf(stderr, "%s: abw_mysql_write_conf() failed\n", __func__);
return -1;
}
......@@ -151,15 +153,16 @@ int main(int argc, char *argv[])
return -1;
}
j=0;
while (j<MAX_PROTOCOLS && p->protocols_array[j]) {
printf("protocol: |%s|\n", p->protocols_array[j]);
if ((p->flow_run[j]=new_flow_run())==NULL) {
fprintf(stderr, "%s: new_flow_run() failed\n", __func__);
return -1;
}
if (!(global_spec.no_measure)) {
j=0;
while (j<MAX_PROTOCOLS && p->protocols_array[j]) {
printf("protocol: |%s|\n", p->protocols_array[j]);
if ((p->flow_run[j]=new_flow_run())==NULL) {
fprintf(stderr, "%s: new_flow_run() failed\n", __func__);
return -1;
}
if (!(global_spec.no_measure)) {
q=p->flow_run[j];
/* Prepare header filter for this protocol */
......@@ -249,33 +252,28 @@ int main(int argc, char *argv[])
__func__, i);
return -1;
}
} /* if (!(global_spec.no_measure)) */
else {
value_t *values;
if (abw_mysql_select_values(global_spec.mysql, p,
p->protocols_array[j], "bytes", NULL, &values)<0) {
fprintf(stderr, "%s: abw_mysql_select_values() failed\n",
__func__);
return -1;
}
abw_mysql_print_values(values);
abw_mysql_free_values(values);
if (abw_mysql_select_values(global_spec.mysql, p,
p->protocols_array[j], "packets", NULL, &values)<0) {
fprintf(stderr, "%s: abw_mysql_select_values() failed\n",
__func__);
return -1;
}
abw_mysql_print_values(values);
abw_mysql_free_values(values);
j++;
} /* while (j<MAX_PROTOCOLS && p->protocols_array[j]) */
} /* if (!(global_spec.no_measure)) */
else {
values_int_t *values;
char *characteristics[MAX_CHARACTERISTICS];
characteristics[0]="bytes";
characteristics[1]="packets";
} /* if (!(global_spec.no_measure)) else */
if (abw_mysql_select_values(global_spec.mysql, p,
characteristics, NULL, &values)<0) {
fprintf(stderr, "%s: abw_mysql_select_values() failed\n",
__func__);
return -1;
}
abw_mysql_print_values(i, values);
abw_mysql_free_values(values);
j++;
} /* while (j<MAX_PROTOCOLS && p->protocols_array[j]) */
} /* if (!(global_spec.no_measure)) else */
pp++;
i++;
......
......@@ -15,6 +15,7 @@
#define MAX_FLOWS 16
#define MAX_PROTOCOLS 16
#define MAX_PROTOCOLS_STRING 128
#define MAX_CHARACTERISTICS 2
#define MAX_HOSTNAME 128
#define MAX_QUERY 256
......
......@@ -7,6 +7,8 @@
#include "abw.h"
#include "abw_mysql.h"
extern int debug;
int abw_mysql_start(global_spec_t *global_spec) {
if (mysql_server_init(0, NULL, NULL)) {
......@@ -281,39 +283,170 @@ int abw_mysql_insert_value(MYSQL *mysql, flow_spec_t *p, flow_run_t *q,
* - int **values return a sequence of structures with values
*/
int abw_mysql_select_values(MYSQL *mysql, flow_spec_t *p,
char *protocol, char *characteristics, char *statistics,
value_t **values) {
char query[MAX_QUERY+1];
MYSQL_RES *mysql_res=NULL;
MYSQL_ROW mysql_row;
int num;
int i;
value_t **last_value;
if (mysql==NULL || p==NULL || protocol==NULL || !strlen(protocol)) {
char **characteristics, char *statistics,
values_int_t **values) {
char query[MAX_QUERY+1], query2[MAX_QUERY+1];
MYSQL_RES *mysql_res, *mysql_res2;
MYSQL_ROW mysql_row, mysql_row2;
int num, num2;
int i, j, k;
char *ch, **chch;
values_int_t **last_value;
int characteristics_no, protocols_no;
unsigned int timestamp_sec, timestamp_usec;
int value_int;
int id;
if (mysql==NULL || p==NULL) {
fprintf(stderr, "%s: some required arguments were not supplied\n",
__func__);
return -1;
}
sprintf(query, "SELECT * FROM tValue WHERE subject_id='%d' AND parameters_id='%d' AND protocol='%s'", p->subject_id, p->parameters_id, protocol);
sprintf(query, "DELETE FROM tTempInt;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
i=0;
while (i<MAX_PROTOCOLS && p->protocols_array[i]) {
j=0;
chch=characteristics;
while (j<MAX_CHARACTERISTICS && *chch) {
ch=*chch;
sprintf(query, "SELECT * FROM tValue WHERE subject_id='%d' AND parameters_id='%d' AND protocol='%s' AND characteristics='%s'", p->subject_id, p->parameters_id, p->protocols_array[i], ch);
if (p->start_time_timestamp)
sprintf(query+strlen(query), " AND timestamp_sec >= '%u'",
(unsigned int)(p->start_time_timestamp));
if (p->end_time_timestamp)
sprintf(query+strlen(query), " AND timestamp_sec <= '%u'",
(unsigned int)(p->end_time_timestamp));
if (statistics && strlen(statistics))
sprintf(query+strlen(query), " AND statistics='%s'", statistics);
sprintf(query+strlen(query), ";");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((mysql_res=mysql_store_result(mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
}
if (p->start_time_timestamp)
sprintf(query+strlen(query), " AND timestamp_sec >= '%u'",
(unsigned int)(p->start_time_timestamp));
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
printf("Selected rows: %d\n", num);
for (k=0; k<num; k++) {
if ((mysql_row=mysql_fetch_row(mysql_res))==NULL) {
fprintf(stderr, "%s: mysql_fetch_row() failed\n", __func__);
return -1;
}
timestamp_sec=atoi(mysql_row[3]);
timestamp_usec=atoi(mysql_row[4]);
value_int=atoi(mysql_row[9]);
sprintf(query2, "SELECT * FROM tTempInt WHERE timestamp_sec='%u' AND timestamp_usec='%u';", timestamp_sec, timestamp_usec);
printf("%s: query2: %s\n", __func__, query2);
if (mysql_query(mysql, query2)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__,
query2);
return -1;
}
if ((mysql_res2=mysql_store_result(mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n",
__func__);
return -1;
}
if ((num2=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
printf("Selected rows: %d\n", num2);
if (num2>1) {
fprintf(stderr, "%s: multiple rows for the same flow and timestamp found, skipped\n", __func__);
continue;
}
if (num2==1) {
printf("%s: timestamp %u.%u found in tTempInt\n",
__func__, timestamp_sec, timestamp_usec);
if ((mysql_row2=mysql_fetch_row(mysql_res2))==NULL) {
fprintf(stderr, "%s: mysql_fetch_row() failed\n", __func__);
return -1;
}
id=atoi(mysql_row2[0]);
/* Update row with new value */
sprintf(query, "UPDATE tTempInt SET value%d_int='%d' WHERE id='%d';", (i+1)*(j+1), value_int, id);
printf("%s: query: %s\n", __func__, query);
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__,
query);
return -1;
}
if (mysql_affected_rows(mysql)!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n",
__func__);
return -1;
}
}
else {
printf("%s: timestamp %u.%u NOT found in tTempInt\n",
__func__, timestamp_sec, timestamp_usec);
sprintf(query, "INSERT INTO tTempInt (timestamp_sec, timestamp_usec, value%d_int) VALUES ('%u', '%u', '%d');", (i+1)*(j+1), atoi(mysql_row[3]), atoi(mysql_row[4]), value_int);
printf("%s: query: %s\n", __func__, query);
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__,
query);
return -1;
}
if (mysql_affected_rows(mysql)!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n",
__func__);
return -1;
}
} /* if (num2==1) else, timestamp not found */
} /* for (k=0; k<num; k++) */
j++;
chch++;
} /* while (j<MAX_CHARACTERISTICS && *chch) */
if (p->end_time_timestamp)
sprintf(query+strlen(query), " AND timestamp_sec <= '%u'",
(unsigned int)(p->end_time_timestamp));
characteristics_no=j;
if (characteristics && strlen(characteristics))
sprintf(query+strlen(query), " AND characteristics='%s'",
characteristics);
i++;
} /* while (i<MAX_PROTOCOLS && p->protocols_array[i] */
if (statistics && strlen(statistics))
sprintf(query+strlen(query), " AND statistics='%s'", statistics);
protocols_no=i;
sprintf(query+strlen(query), ";");
sprintf(query, "SELECT * FROM tTempInt;");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(mysql, query)) {
......@@ -322,8 +455,8 @@ int abw_mysql_select_values(MYSQL *mysql, flow_spec_t *p,
}
if ((mysql_res=mysql_store_result(mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
......@@ -340,33 +473,22 @@ int abw_mysql_select_values(MYSQL *mysql, flow_spec_t *p,
return -1;
}
if ((*last_value=malloc(sizeof(value_t)))==NULL) {
if ((*last_value=malloc(sizeof(values_int_t)))==NULL) {
fprintf(stderr, "%s: malloc() failed\n", __func__);
return -1;
}
memset(*last_value, 0, sizeof(value_t));
memset(*last_value, 0, sizeof(values_int_t));
(*last_value)->timestamp.tv_sec=atoi(mysql_row[3]);
(*last_value)->timestamp.tv_usec=atoi(mysql_row[4]);
(*last_value)->timestamp.tv_sec=atoi(mysql_row[1]);
(*last_value)->timestamp.tv_usec=atoi(mysql_row[2]);
/* TODO: protocol, characteristics, unit and statistics fields are
currently not retrieved from database. When we do it, we will need
to malloc() memory for each field here and free() it later. */
for (j=0; j<(characteristics_no * protocols_no); j++)
(*last_value)->value_int[j]=atoi(mysql_row[j+3]);
if (mysql_row[9] && strlen(mysql_row[9]))
(*last_value)->value_int=atoi(mysql_row[9]);
if (mysql_row[10] && strlen(mysql_row[10]))
(*last_value)->value_double=atof(mysql_row[10]);
if (mysql_row[11] && strlen(mysql_row[11])) {
if (((*last_value)->value_char=malloc(strlen(mysql_row[11])+1))==NULL) {
fprintf(stderr, "%s: malloc() failed\n", __func__);
return -1;
}
strcpy((*last_value)->value_char, mysql_row[11]);
}
/* Mark the end of valid values for abw_mysql_print_values() */
if (j<MAX_VALUES)
(*last_value)->value_int[j]=-1;
last_value=&((*last_value)->next);
} /* for (i=0; i<num; i++) */
......@@ -375,39 +497,248 @@ int abw_mysql_select_values(MYSQL *mysql, flow_spec_t *p,
} /* abw_mysql_select_values() */
int abw_mysql_clear_conf(MYSQL *mysql) {
char query[MAX_QUERY+1];
int num;
if (mysql==NULL) {
fprintf(stderr, "%s: MySQL handle not passed in input argument\n",
__func__);
return -1;
}
/*
* Delete all rows from tMeasurement
*/
sprintf(query, "DELETE FROM tMeasurement;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (debug)
printf("%s: %d rows deleted from tMeasurement\n", __func__, num);
/*
* Delete rows from tSubject with ids not in tValue.subject_id
*/
sprintf(query, "DELETE FROM tTempId;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
sprintf(query, "INSERT INTO tTempId SELECT DISTINCT tSubject.id FROM tSubject,tValue WHERE tSubject.id=tValue.subject_id;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (debug)
printf("%s: %d rows with distinct valid ids found in tSubject\n", __func__, num);
sprintf(query, "DELETE FROM tSubject WHERE id NOT IN (SELECT id FROM tTempId);");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (debug)
printf("%s: %d rows deleted from tSubject with ids not in tValue\n", __func__, num);
/*
* Delete rows from tParameters with ids not in tValue.parameters_id
*/
sprintf(query, "DELETE FROM tTempId;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
sprintf(query, "INSERT INTO tTempId SELECT DISTINCT tParameters.id FROM tParameters,tValue WHERE tParameters.id=tValue.Parameters_id;");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (debug)
printf("%s: %d rows with distinct valid ids found in tParameters\n", __func__, num);
sprintf(query, "DELETE FROM tParameters WHERE id NOT IN (SELECT id FROM tTempId);");
if (mysql_query(mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (debug)
printf("%s: %d rows deleted from tParameters with ids not in tValue\n", __func__, num);
return 0;
} /* abw_mysql_clear_conf() */
int abw_mysql_read_conf(MYSQL *mysql, flow_spec_t *flow_spec[]) {
return 0;
} /* abw_mysql_read_conf() */
int abw_mysql_write_conf(MYSQL *mysql, flow_spec_t *flow_spec[]) {
int abw_mysql_write_conf(global_spec_t *global_spec, flow_spec_t *flow_spec[]) {
flow_spec_t *p, **pp;
int i;
int subject_id, parameters_id;
char query[MAX_QUERY+1];
MYSQL_RES *mysql_res;
int num;
if (global_spec==NULL || global_spec->mysql==NULL) {
fprintf(stderr, "%s: MySQL handle not passed in input argument\n",
__func__);
return -1;
}
pp=flow_spec;
i=0;
while (i<MAX_FLOWS && *pp) {
p=*pp;
/* Find subject of this flow in tSubject or INSERT it */
if ((subject_id=abw_mysql_get_subject_id(global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_subject_id() failed\n",
__func__);
return -1;
}
/* Find parameters of this flow in tParameters or INSERT it */
if ((parameters_id=
abw_mysql_get_parameters_id(global_spec, p))<0) {
fprintf(stderr, "%s: abw_mysql_get_parameters_id() failed\n",
__func__);
return -1;
}
/* Check if measurement with this subject and parameters is already
in tMeasurement */
sprintf(query, "SELECT * FROM tMeasurement WHERE subject_id='%d' AND parameters_id='%d'", subject_id, parameters_id);
if (p->start_time_timestamp)
sprintf(query+strlen(query), " AND start_timestamp_sec = '%u'",
(unsigned int)(p->start_time_timestamp));
if (p->end_time_timestamp)
sprintf(query+strlen(query), " AND end_timestamp_sec = '%u'",
(unsigned int)(p->end_time_timestamp));
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((mysql_res=mysql_store_result(global_spec->mysql))==NULL) {
fprintf(stderr, "%s: mysql_store_result() failed\n", __func__);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))<0) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
if (num>1) {
fprintf(stderr, "%s: multiple rows in tMeasurement for the same subject, parameters and time period\n", __func__);
return -1;
}
if (num==1) {
if (debug)
printf("%s: measurement subject, parameters and time period found in tMeasurement\n", __func__);
}
else {
if (debug)
printf("%s: measurement subject, parameters and time period not found in tMeasurement\n", __func__);
sprintf(query, "INSERT INTO tMeasurement (subject_id, parameters_id%s%s) VALUES ('%d', '%d'", (p->start_time_timestamp)?", start_timestamp_sec":"", (p->end_time_timestamp)?", end_timestamp_sec":"", subject_id, parameters_id);
if (p->start_time_timestamp)
sprintf(query+strlen(query), ", '%u'", (unsigned int)(p->start_time_timestamp));
if (p->end_time_timestamp)
sprintf(query+strlen(query), ", '%u'", (unsigned int)(p->end_time_timestamp));
sprintf(query+strlen(query), ");");
printf("%s: query: %s\n", __func__, query);
if (mysql_query(global_spec->mysql, query)) {
fprintf(stderr, "%s: mysql_query(%s) failed\n", __func__, query);
return -1;
}
if ((num=mysql_affected_rows(global_spec->mysql))!=1) {
fprintf(stderr, "%s: mysql_affected_rows() failed\n", __func__);
return -1;
}
}
i++;
pp++;
} /* while (i<MAX_FLOWS && *pp) */
return 0;
} /* abw_mysql_write_conf() */
int abw_mysql_print_values(value_t *values) {
value_t *value;
int abw_mysql_print_values(int flow_id, values_int_t *values) {
values_int_t *current_values;
int i;
current_values=values;
while (current_values) {
printf("%d %d.%u", flow_id,
(unsigned int)(current_values->timestamp.tv_sec),
(unsigned int)(current_values->timestamp.tv_usec));
i=0;
while (i<MAX_VALUES && current_values->value_int[i]>=0) {
printf(" %d", current_values->value_int[i]);
i++;
}
printf("\n");
value=values;
while (value) {
printf("%d.%u %u\n", (unsigned int)(value->timestamp.tv_sec),
(unsigned int)(value->timestamp.tv_usec), value->value_int);
value=value->next;
current_values=current_values->next;
}
return 0;
} /* abw_mysql_print_values() */
int abw_mysql_free_values(value_t *values) {
value_t *value, *next;
int abw_mysql_free_values(values_int_t *values) {
values_int_t *current_values, *next;