Commit c86bd1cf authored by 's avatar

New prototype for mapi_read_results()


git-svn-id: file:///home/svn/mapi/trunk@542 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 75c3f459
PREFIX=/usr/local/mapi
PREFIX=/usr/local
INSTALL_LIBDIR=$(PREFIX)/lib/mapi
INSTALL_INCDIR=$(PREFIX)/include/mapi
INSTALL_BINDIR=$(PREFIX)/bin
......@@ -16,7 +16,7 @@ WITH_DAG=0
WITH_COMBO6=0
#Distributed MAPI functionality
WITH_DIMAPI=0
WITH_DIMAPI=1
#MAPI function statistics
#With this pkt counters for each function is enabled
......
......@@ -136,36 +136,29 @@ the mapi_apply_function(). Before getting the results the mapi_connect()
function must be called. To get results from the remote sensors,
the mapi_read_results() function is used for the corresponding
function that have been applied. While in MAPI the mapi_read_results()
function returns directly the single result as a void* type, in
DiMAPI it returns a vector of the results from all monitoring sensors
by returning an instance of 'struct dmapi_results' (also as void*
type for compatibility). The relevant data structures that
mapi_read_results() returns in DiMAPI are shown below:
//struct returned by read_results in dimapi
struct dmapi_results {
struct host_results* res;
unsigned int cnt;
};
//struct containing the results in dimapi
struct host_results {
void *result;
char host_dev[HOST_DEV_SIZE];
unsigned long long t_produced;
unsigned long long t_received;
};
mapi_read_results() returns an instance of 'struct dmapi_results'
that contains the results from all monitoring sensors as a vector.
The 'cnt' field indicates the number of hosts that sent results and
the 'res' field is a table from 'struct host_results' instances
that contain the results from every monitoring sensor. The 'result'
field gives the real result, the 'host_dev' field gives the name
of host with its monitoring interface, and finally the fields
't_produced' and 't_received' are 64 bits timestamps for the
production (setted by mapid) and reception of the results (in mapi
stub) respectively. The other MAPI function that gives results
function returns a single mapi_results_t struct that contains the actual result,
the time whick this result has beed produced and the result size,
in DiMAPI it returns a table of mapi_results_t from all the monitoring sensors.
The mapi_results_t strcut that is returned by mapi_read_results() is shown below:
/*structure returned by read_results*/
typedef struct mapi_results {
void* res; //actual result
unsigned long long ts; //timestamp
int size; //result size
} mapi_results_t;
mapi_read_results() returns a table of mapi_results_t that each one
contains the results from one monitoring sensor.
The mapi_get_scope_size() function returns the number of monitoring sensors
for a specific network scope, denoted by the function's argument fd.
The monitoring sensors have been declared in mapi_create_flow().
mapi_read_results() function returns the results of every monitoring sensor
in a table with the same order that each monitoring sensor have been given in
mapi_create_flow().
The other MAPI function that gives results
from the monitoring sensors is the mapi_get_next_pkt() function.
In DiMAPI, the mapi_get_next_pkt() returns packets from the monitoring
sensors in a round-robin way if possible, else from a monitoring
......
......@@ -57,8 +57,10 @@ int main(int argc, char **argv) {
free(mapi_conf);
pc_close();
if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
die("Unexpected error on socket()");
exit(-1);
}
memset(&serv_addr, 0, sizeof serv_addr);
......@@ -75,6 +77,7 @@ int main(int argc, char **argv) {
if (bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof serv_addr) == -1) {
close(serv_sock);
die("Unexpected error on bind()");
exit(1);
}
/* queue max 5 connections */
......@@ -82,6 +85,7 @@ int main(int argc, char **argv) {
shutdown(serv_sock, SHUT_RDWR);
close(serv_sock);
die("Unexpected error on listen()");
exit(-1);
}
set_agent();
......@@ -89,16 +93,19 @@ int main(int argc, char **argv) {
while(1) {
clnt_len = sizeof clnt_addr;
if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1)
if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1) {
die("Unexpected error on accept()");
//continue;
continue;
}
printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
arg=(unsigned char*)malloc((sizeof(int)+sizeof(clnt_addr.sin_addr)));
memcpy(arg, &new_sock, sizeof(int));
memcpy(arg+sizeof(int), &clnt_addr.sin_addr, sizeof(clnt_addr.sin_addr));
if (pthread_create(&chld_thr, NULL, handle_request, (void *)arg) != 0)
if (pthread_create(&chld_thr, NULL, handle_request, (void *)arg) != 0) {
die("pthread_create() failed");
continue;
}
}
return 0; /* never reached */
......@@ -111,14 +118,12 @@ void *handle_request(void *arg) {
char buffer[DIMAPI_DATA_SIZE];
struct dmapiipcbuf *dbuf=NULL;
int mapid_result;
void *result;
mapi_results_t *result;
unsigned int dbuf_bytes=0;
int i;
int *active_flows = NULL;
int ac_fl_size=0;
mapi_function_info_t funct_info;
mapidflib_function_def_t *fdef;
int result_size;
mapi_flow_info_t flow_info;
struct timeval tv; /*used for timestamping results when produced */
struct mapipkt *pkt;
......@@ -214,14 +219,10 @@ void *handle_request(void *arg) {
//fprintf(stdout,"READ_RESULT\n");
dbuf->cmd = READ_RESULT_ACK;
result = mapi_read_results(dbuf->fd,dbuf->fid);
gettimeofday(&tv, NULL);
dbuf->timestamp = tv.tv_sec*1000000 + tv.tv_usec;
if(result!=NULL){
mapi_get_function_info(dbuf->fd,dbuf->fid,&funct_info);
fdef=mapilh_get_function_def(funct_info.name,funct_info.devtype);
result_size=fdef->shm_size;
memcpy(dbuf->data, result, result_size);
dbuf->length = BASIC_SIZE + result_size;
dbuf->timestamp = result->ts;
memcpy(dbuf->data, result->res, result->size);
dbuf->length = BASIC_SIZE + result->size;
}
else{
fprintf(stdout,"mapi_read_results failed...\n");
......
......@@ -10,8 +10,8 @@ int main(int argc, char* argv[]) {
int fid;
unsigned long long prev=0;
unsigned long long sum;
struct dmapi_results *dres;
unsigned int i;
mapi_results_t *dres;
int i;
if(!argv[1]){
......@@ -37,18 +37,19 @@ int main(int argc, char* argv[]) {
exit(0);
}
printf("Hosts: %d\n",mapi_get_scope_size(fd));
while(1) { /* forever, report the load */
sleep(1);
dres = mapi_read_results(fd, fid);
sum=0;
for (i=0; i<dres->cnt; i++)
dres = mapi_read_results(fd, fid);
for (i=0; i<mapi_get_scope_size(fd); i++)
{
sum+=*((unsigned long long*)dres->res[i].result);
sum+=*((unsigned long long*)dres[i].res);
}
printf("traffic: %.2f Mbit/s (%llu bytes)\n",
printf("total traffic: %.2f Mbit/s (%llu bytes)\n",
(sum-prev)*8/1000000.0, sum-prev );
prev = sum;
......
......@@ -16,9 +16,8 @@ die(int d)
int
main(int argc,char **argv)
{
int counter, loop = 10, isremote=0;
unsigned long long *res;
struct dmapi_results *dres;
int counter, loop = 10;
mapi_results_t *res;
signal(SIGINT,die);
signal(SIGTERM,die);
......@@ -26,7 +25,6 @@ main(int argc,char **argv)
if(argc>1){
fd = mapi_create_flow(argv[1]);
isremote=1;
}
else{
fd = mapi_create_flow("eth0");
......@@ -47,20 +45,11 @@ main(int argc,char **argv)
while( loop-- )
{
sleep(1);
if(isremote){
dres = (struct dmapi_results*) mapi_read_results(fd,counter);
if(dres)
printf("PKTS=%llu\n",*((unsigned long long*)dres->res[0].result));
else
printf("mapi_read_results failed!\n");
}
else{
res = mapi_read_results(fd,counter);
if(res)
printf("PKTS=%llu\n",*res);
else
printf("mapi_read_results failed!\n");
}
res = mapi_read_results(fd,counter);
if(res)
printf("PKTS=%llu\n",*((unsigned long long*)res->res));
else
printf("mapi_read_results failed!\n");
}
die(0);
......
......@@ -11,7 +11,7 @@ int main(int argc, char *argv[])
int fid;
int limit = 0;
unsigned int i;
struct dmapi_results *dres;
mapi_results_t *dres;
if(!argv[1] || !argv[2])
{
......@@ -48,10 +48,11 @@ int main(int argc, char *argv[])
while(1)
{
/* dres = (struct dmapi_results*) mapi_read_results(fd,fid);
for (i=0; i<dres->cnt; i++){
if(*((int*)dres->res[i].result)==0){
printf("host %s: ",dres->res[i].host_dev);
/* dres = mapi_read_results(fd,fid);
for (i=0; i<mapi_get_scope_size(fd); i++){
if(*((int*)dres[i].res)==0){
//printf("host %s: ",dres[i].res);
printf("host %d: ",i);
printf("File written.\n");
mapi_close_flow(fd);
exit(0);
......
......@@ -6,9 +6,9 @@
int main(int argc, char *argv[]){
int fd;
int fid;
struct dmapi_results *dres1;
mapi_results_t *dres;
int loop = 1000;
unsigned int i=0;
int i=0;
if(!argv[1] || !argv[2]){
printf("\nWrong arguments\nUsage <scope> <bpf filter>\n");
......@@ -41,11 +41,10 @@ int main(int argc, char *argv[]){
while(loop--){
sleep(1);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid);
for (i=0; i<dres1->cnt; i++)
printf("pkts read in host %s: %llu (time produced: %llu time received: %llu)\n",dres1->res[i].host_dev, *((unsigned long long*)dres1->res[i].result), dres1->res[i].t_produced, dres1->res[i].t_received);
dres = (mapi_results_t*) mapi_read_results(fd,fid);
for (i=0; i<mapi_get_scope_size(fd); i++)
printf("pkts read in host %d: %llu (time produced: %llu)\n",i, *((unsigned long long*)dres[i].res), dres[i].ts);
}
mapi_close_flow(fd);
......
......@@ -6,9 +6,9 @@
int main(int argc, char *argv[]){
int fd;
int fid1,fid2;
struct dmapi_results *dres1, *dres2;
mapi_results_t *dres1, *dres2;
int loop = 1000;
unsigned int i=0;
int i=0;
if(!argv[1]){
printf("\nWrong arguments\n");
......@@ -46,14 +46,14 @@ int main(int argc, char *argv[]){
while(loop--){
sleep(1);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid1);
dres2 = (struct dmapi_results*) mapi_read_results(fd,fid2);
dres1 = mapi_read_results(fd,fid1);
dres2 = mapi_read_results(fd,fid2);
for (i=0; i<dres2->cnt; i++)
printf("pkts read in host %s: %llu (time produced: %llu time received: %llu)\n",dres2->res[i].host_dev, *((unsigned long long*)dres2->res[i].result), dres2->res[i].t_produced, dres2->res[i].t_received);
for (i=0; i<mapi_get_scope_size(fd); i++)
printf("pkts read in host %d: %llu (time produced: %llu)\n",i, *((unsigned long long*)dres2[i].res), dres2[i].ts);
for (i=0; i<dres1->cnt; i++)
printf("bytes read in host %s: %llu (time produced: %llu time received: %llu)\n",dres1->res[i].host_dev, *((unsigned long long*)dres1->res[i].result), dres1->res[i].t_produced, dres1->res[i].t_received);
for (i=0; i<mapi_get_scope_size(fd); i++)
printf("bytes read in host %d: %llu (time produced: %llu)\n",i, *((unsigned long long*)dres1[i].res), dres1[i].ts);
}
......
......@@ -11,7 +11,7 @@ int main(int argc, char *argv[])
int fid;
int limit = 0;
unsigned int i;
struct dmapi_results *dres;
mapi_results_t *dres;
if(!argv[1] || !argv[2])
{
......@@ -38,10 +38,11 @@ int main(int argc, char *argv[])
while(1)
{
dres = (struct dmapi_results*) mapi_read_results(fd,fid);
for (i=0; i<dres->cnt; i++){
if(*((int*)dres->res[i].result)==0){
printf("host %s: ",dres->res[i].host_dev);
dres = mapi_read_results(fd,fid);
for (i=0; i<mapi_get_scope_size(fd); i++){
if(*((int*)dres[i].res)==0){
//printf("host %s: ",dres->res[i].host_dev);
printf("host %d: ",i);
printf("File written.\n");
mapi_close_flow(fd);
exit(0);
......
......@@ -13,7 +13,9 @@
int main(MAPI_UNUSED int argc, char *argv[])
{
int fd;
int fid,i;
int fid;
unsigned int i;
mapi_results_t *res;
unsigned int *cnt,*tmp;
if(!argv[1])
......@@ -31,7 +33,8 @@ int main(MAPI_UNUSED int argc, char *argv[])
while (1)
{
cnt=((struct dmapi_results*)mapi_read_results(fd,fid))->res[0].result;
res=mapi_read_results(fd,fid);
cnt=(unsigned int*)res->res;
sleep(1);
printf("results: %d\n",*cnt);
......
......@@ -24,7 +24,7 @@ MAPI \- Monitoring Application Programming Interface
.br
.BI "int mapi_loop(int " fd ", int " fid ", int " cnt ", mapi_handler);"
.br
.BI "void * mapi_read_results(int " fd ", int " fid ");"
.BI "mapi_results_t * mapi_read_results(int " fd ", int " fid ");"
.br
.BI "int mapi_read_error(int *" err_no ", char *" err_str ");"
.br
......@@ -41,6 +41,8 @@ MAPI \- Monitoring Application Programming Interface
.BI "int mapi_get_function_info(int " fd ", int " fid ", mapi_function_info_t *" info ");"
.br
.BI "int mapi_get_next_function_info(int " fd ", int " fid ", mapi_function_info_t *" info ");"
.br
.BI "int mapi_get_scope_size(int " fd ");"
.SH DESCRIPTION
MAPI builds on the simple and powerful abstraction of the network flow. In
MAPI, a network flow is generally defined as a sequence of packets that
......@@ -165,7 +167,7 @@ must have been previously applied to the flow, and the corresponding identifier
must be passed to
.BR mapi_loop() .
.sp
.BI "void * mapi_read_results(int " fd ", int " fid ");"
.BI "mapi_results_t * mapi_read_results(int " fd ", int " fid ");"
.br
.B mapi_read_results()
receives the results of the function denoted by
......@@ -173,36 +175,25 @@ receives the results of the function denoted by
applied to the flow with flow descriptor
.IR fd .
.B mapi_read_results()
returns a pointer (void *) to the memory where the results are stored.
.sp
In case that we use more than one monitoring sensors,
.B mapi_read_results()
returns a vector that contains the results from all the monitoring sensors.
The return value is a
.B dmapi_results struct
as specified in mapi.h:
returns a
.B mapi_results_t
struct, as specified in mapi.h:
.sp
.nf
struct dmapi_results {
struct host_results* res;
unsigned int cnt;
}
.sp
struct host_results {
void *result;
char *host_dev;
unsigned long long t_produced;
unsigned long long t_received;
}
typedef struct mapi_results {
void* res; //actual result
unsigned long long ts; //timestamp
int size; //result size
} mapi_results_t;
.fi
.sp
The
.I res
field is a vector of
.I cnt
items
.B (host_results struct)
that contain the actual result and information about the monitoring sensor and the time produced and received. The necessary memory for these structs has been already allocated, once per every function applied.
In case that we have more than one monitoring sensors,
.B mapi_read_results()
returns a vector of
.B mapi_results_t
structs that contains the results from all the monitoring sensors.
.sp
The necessary memory for these structs has been already allocated, once per every function applied.
.sp
.BI "int mapi_read_error(int *" err_no ", char *" err_str ");"
.br
......@@ -325,6 +316,15 @@ greater than
which has been applied to the network flow denoted by
.IR fd .
Enough memory for the data must have been previously allocated.
.sp
.BI "int mapi_get_scope_size(int " fd ");"
.br
.B mapi_get_scope_size()
is used in DiMAPI and returns the number of the monitoring hosts
that have been declared in
.B mapi_create_flow()
for the network scope denoted by
.IR fd .
.\".B \-<a command line switch>
.\"<description of what that switch does>
.\".TP
......
......@@ -123,7 +123,7 @@ typedef struct functdescr {
mapidflib_function_def_t *def;
mapidflib_function_t *funct;
void *data;
void *result;
mapi_results_t *result;
} functdescr_t;
typedef struct shm_result {
......@@ -379,7 +379,7 @@ int mapi_connect(int fd)
{
case CONNECT_ACK:
flow->is_connected=1;
return 1;
return 0;
case ERROR_ACK:
mapi_set_error(flow, MCOM_ERROR_ACK);
return -1;
......@@ -411,7 +411,7 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
flist_remove(hflow->rhost->flows, hflow->fd, FLIST_LEAVE_DATA);
if (hflow->rhost->num_flows==0) {
mapiipc_remote_close(hflow->rhost); //close the socket
pthread_kill(*hflow->rhost->comm_thread, 9);
//pthread_kill(*hflow->rhost->comm_thread, 9);
flist_destroy(hflow->rhost->flows, FLIST_LEAVE_DATA);
free(hflow->rhost->flows);
free(hflow->rhost->hostname);
......@@ -545,6 +545,7 @@ int mapi_create_flow(const char *dev)
free(devp);
rflow->scope_size=flist_size(rflow->host_flowlist);
pthread_spin_lock(&remote_ipc_lock);
flist_append(remote_flowlist, rflow->fd, rflow);
incr_numflows();
......@@ -1100,7 +1101,6 @@ int mapi_apply_function(int fd, const char* funct, ...)
host_flow* hflow;
function_data *fdata;
flist_node_t* fnode;
struct dmapi_results* dres;
#endif
......@@ -1288,14 +1288,7 @@ int mapi_apply_function(int fd, const char* funct, ...)
fidseed++;
dres=(struct dmapi_results*)malloc( sizeof(struct dmapi_results) );
dres->res=(struct host_results*)malloc( sizeof(struct host_results) * flist_size(rflow->host_flowlist) );
dres->cnt=flist_size(rflow->host_flowlist);
flist_append(rflow->function_res, fidseed, dres);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
int i=0;
hflow=(host_flow*)fnode->data;
switch(hflow->dbuf->cmd) {
case APPLY_FUNCTION_ACK:
......@@ -1305,14 +1298,11 @@ int mapi_apply_function(int fd, const char* funct, ...)
fdata->fid=hflow->dbuf->fid;
fdata->fdef=fdef;
flist_append(hflow->functions, fidseed, fdata);
dres->res[i++].result=NULL;
break;
case ERROR_ACK:
flist_remove(rflow->function_res, fidseed, FLIST_FREE_DATA);
printf("Error! mapi_apply_function did not work!\n");
return -1;
default:
flist_remove(rflow->function_res, fidseed, FLIST_FREE_DATA);
printf("Error! mapi_apply_function did not work!\n");
return -1;
}
......@@ -1446,20 +1436,21 @@ int get_results_info(flowdescr_t *flow,functdescr_t *f)
//Read result from a function
//fd: flow descriptor
//fid: ID of function
void* mapi_read_results(int fd, int fid)
mapi_results_t* mapi_read_results(int fd, int fid)
{
flowdescr_t *flow;
functdescr_t *f;
mapi_result_t res;
void *result;
struct timeval tv; /*used for timestamping results when produced */
#ifdef DIMAPI
remote_flowdescr_t *rflow;
host_flow* hflow;
struct dmapi_results* dres;
unsigned int currhost = 0;
struct timeval tv;
flist_node_t* fnode;
mapi_results_t* results;
int i;
function_data* fdata;
#endif
if (!minit) {
......@@ -1479,7 +1470,17 @@ void* mapi_read_results(int fd, int fid)
//mapi_set_error(rflow, MAPI_FLOW_NOT_CONNECTED);
return NULL;
}
if ((results=flist_get(rflow->function_res, fid))==NULL) { //init once
results=(mapi_results_t*)malloc(sizeof(mapi_results_t)*rflow->scope_size);
for (i=0; i<rflow->scope_size; i++) {
fdata=flist_get(((host_flow*)flist_head(rflow->host_flowlist)->data)->functions, fid);
results[i].size=fdata->fdef->shm_size;
results[i].res=(void*)malloc(results->size);
}
flist_append(rflow->function_res, fid, results);
}
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->cmd = READ_RESULT;
......@@ -1492,22 +1493,12 @@ void* mapi_read_results(int fd, int fid)
//wait results
dres=(struct dmapi_results*)flist_get(rflow->function_res, fid);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
switch(hflow->dbuf->cmd) {
case READ_RESULT_ACK:
gettimeofday(&tv, NULL);
dres->res[currhost].t_received = tv.tv_sec*1000000 + tv.tv_usec;
dres->res[currhost].t_produced = hflow->dbuf->timestamp;
strncpy(dres->res[currhost].host_dev, hflow->rhost->hostname, HOST_DEV_SIZE);
strcat(dres->res[currhost].host_dev, " : ");
strncat(dres->res[currhost].host_dev, hflow->dev, HOST_DEV_SIZE-strlen(hflow->rhost->hostname)-3);
if (dres->res[currhost].result!=NULL) free(dres->res[currhost].result);
dres->res[currhost].result = (void *)malloc(hflow->dbuf->length-BASIC_SIZE);
memcpy(dres->res[currhost].result, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE);
if (currhost==0) memcpy(dres->first_res, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE);
memcpy(results[currhost].res, hflow->dbuf->data, results[currhost].size);
results[currhost].ts = hflow->dbuf->timestamp;
break;
case ERROR_ACK:
default:
......@@ -1517,7 +1508,7 @@ void* mapi_read_results(int fd, int fid)
++currhost;
}
return(dres);
return(results);
}
#endif
......@@ -1538,7 +1529,7 @@ void* mapi_read_results(int fd, int fid)
f=flist_get(flow->flist,fid);