Commit 183c177b authored by 's avatar
Browse files

Correct argument mapping in DiMAPI

git-svn-id: file:///home/svn/mapi/trunk@650 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 2613b847
......@@ -131,6 +131,7 @@ struct host {
int sockfd;
int num_flows; //to know when to close the socket
flist_t *flows;
flist_t* functions;
pthread_t* comm_thread; //communication thread
};
......
......@@ -420,7 +420,7 @@ int mapi_connect(int fd)
static void delete_remote_flow(remote_flowdescr_t* rflow)
{
host_flow* hflow;
flist_node_t* fnode, *fnode2;
flist_node_t* fnode, *fnode2, *fnode3;
mapi_results_t* res;
pthread_spin_lock(&remote_ipc_lock);
......@@ -437,11 +437,16 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
hflow=(host_flow*)fnode->data;
hflow->rhost->num_flows--;
flist_remove(hflow->rhost->flows, hflow->fd, FLIST_LEAVE_DATA);
for (fnode3=flist_head(hflow->functions); fnode3!=NULL; fnode3=flist_next(fnode3)) {
flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fid, FLIST_LEAVE_DATA);
}
if (hflow->rhost->num_flows==0) {
mapiipc_remote_close(hflow->rhost); //close the socket
//pthread_kill(*hflow->rhost->comm_thread, 9);
flist_destroy(hflow->rhost->flows, FLIST_LEAVE_DATA);
free(hflow->rhost->flows);
flist_destroy(hflow->rhost->functions, FLIST_LEAVE_DATA);
free(hflow->rhost->functions);
free(hflow->rhost->hostname);
pthread_spin_lock(&hostlist_lock);
flist_remove(hostlist, hflow->rhost->sockfd, FLIST_LEAVE_DATA);
......@@ -552,6 +557,8 @@ int mapi_create_flow(const char *dev)
h->port = dimapi_port;
h->flows = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->flows);
h->functions = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->functions);
h->num_flows=0;
// Create the socket
if (mapiipc_remote_init(h)<0) {
......@@ -1182,9 +1189,11 @@ int mapi_apply_function(int fd, const char* funct, ...)
#ifdef DIMAPI
unsigned char is_remote=0;
remote_flowdescr_t* rflow;
remote_flowdescr_t* ref_flow;
host_flow* hflow;
function_data *fdata;
flist_node_t* fnode;
int i;
#endif
......@@ -1251,42 +1260,93 @@ int mapi_apply_function(int fd, const char* funct, ...)
return -1;
}
va_start(vl,funct);
pos = qbuf.data; // point to start of arguments buffer
#ifdef DIMAPI
if(is_remote){//flow is remote
if (agent==1) {
args = va_arg(vl, unsigned char*);
}
for (fnode=flist_head(rflow->host_flowlist), i=1; fnode!=NULL; fnode=flist_next(fnode), i++) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->cmd=APPLY_FUNCTION;
hflow->dbuf->fd=hflow->fd;
memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer
// parse function arguments
if(strncmp(fdef->argdescr, "", 1)) { // there are some args
argdescr_ptr = fdef->argdescr;
while(strlen(argdescr_ptr) > 0){
switch(*argdescr_ptr) {
case 's':
if (agent==0) temp=va_arg(vl, char*);
else {
temp=(char*)args;
args+=strlen(temp)+1;
}
addarg(&pos, temp, STRING);
arg_size+=strlen(temp)+1;
break;
case 'i':
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'c':
if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...'
else {
memcpy(&ctmp, args, sizeof(char));
args+=sizeof(char);
}
va_start(vl,funct);
pos = qbuf.data; // point to start of arguments buffer
if (agent==1) {
args = va_arg(vl, unsigned char*);
}
// parse function arguments
if(strncmp(fdef->argdescr, "", 1)) { // there are some args
argdescr_ptr = fdef->argdescr;
while(strlen(argdescr_ptr) > 0){
switch(*argdescr_ptr) {
case 's':
if (agent==0) temp=va_arg(vl, char*);
else {
temp=(char*)args;
args+=strlen(temp)+1;
}
addarg(&pos, temp, STRING);
arg_size+=strlen(temp)+1;
break;
case 'i':
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'r': //reference to a flow
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
#ifdef DIMAPI
if (is_remote) {
ref_flow=flist_get(remote_flowlist, tmp);
if (ref_flow==NULL || i>ref_flow->scope_size) {
fprintf(stderr,"Invalid flow in function arguments\n");
return -1;
}
tmp=((host_flow*)flist_get(ref_flow->host_flowlist, i))->fd;
addarg(&pos, &tmp ,INT);
}
else
#endif
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'f': //reference to a fuction
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
#ifdef DIMAPI
if (is_remote) {
fdata=flist_get(hflow->rhost->functions, tmp);
if (fdata==NULL) {
fprintf(stderr,"Invalid fid in function arguments\n");
return -1;
}
tmp=fdata->fid;
addarg(&pos, &tmp,INT);
}
else
#endif
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'c':
if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...'
else {
memcpy(&ctmp, args, sizeof(char));
args+=sizeof(char);
}
addarg(&pos, &ctmp, CHAR);
arg_size+=sizeof(char);
break;
......@@ -1366,15 +1426,9 @@ int mapi_apply_function(int fd, const char* funct, ...)
argdescr_ptr++; // move to the next arg
}
}
#ifdef DIMAPI
if(is_remote){//flow is remote
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->cmd=APPLY_FUNCTION;
hflow->dbuf->fd=hflow->fd;
memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer
va_end(vl);
memcpy(hflow->dbuf->data+strlen(funct)+1, qbuf.data, arg_size); //argument size
hflow->dbuf->length=BASIC_SIZE + strlen(funct) + 1 + arg_size;
}
......@@ -1396,6 +1450,7 @@ int mapi_apply_function(int fd, const char* funct, ...)
fdata->fid=hflow->dbuf->fid;
fdata->fdef=fdef;
flist_append(hflow->functions, fidseed, fdata);
flist_append(hflow->rhost->functions, fidseed, fdata);
break;
case ERROR_ACK:
printf("Error! mapi_apply_function did not work!\n");
......@@ -1412,6 +1467,135 @@ int mapi_apply_function(int fd, const char* funct, ...)
}
#endif
va_start(vl,funct);
pos = qbuf.data; // point to start of arguments buffer
if (agent==1) {
args = va_arg(vl, unsigned char*);
}
// parse function arguments
if(strncmp(fdef->argdescr, "", 1)) { // there are some args
argdescr_ptr = fdef->argdescr;
while(strlen(argdescr_ptr) > 0){
switch(*argdescr_ptr) {
case 's':
if (agent==0) temp=va_arg(vl, char*);
else {
temp=(char*)args;
args+=strlen(temp)+1;
}
addarg(&pos, temp, STRING);
arg_size+=strlen(temp)+1;
break;
case 'i':
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'r':
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'f':
if (agent==0) tmp = va_arg(vl, int);
else {
memcpy(&tmp, args, sizeof(int));
args+=sizeof(int);
}
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'c':
if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...'
else {
memcpy(&ctmp, args, sizeof(char));
args+=sizeof(char);
}
addarg(&pos, &ctmp, CHAR);
arg_size+=sizeof(char);
break;
case 'l':
if (agent==0) ltmp = va_arg(vl, unsigned long long);
else {
memcpy(&ltmp, args, sizeof(unsigned long long));
args+=sizeof(unsigned long long);
}
addarg(&pos, &ltmp, UNSIGNED_LONG_LONG);
arg_size+=sizeof(unsigned long long);
break;
/*
// Adding UID as a "hidden" argument
case 'u':
tmp = getuid();
addarg(&pos, &tmp, INT);
break;
// Adding PWD as a "hidden" argument
case 'p':
stemp = getcwd(NULL, 64);
sbuff = malloc(strlen(stemp) + 2);
strcpy(sbuff, stemp);
sbuff[strlen(stemp)] = '/';
sbuff[strlen(stemp)+1] = '\0';
addarg(&pos, sbuff, STRING);
free(sbuff);
break;
*/
case 'w':
//Open file for writing
// printf("--------------agent = %d\n", agent);
if (agent==0) filename=va_arg(vl, char*);
else {
filename=(char*)args;
args+=strlen(filename)+1;
}
if (agent == 1) {
char *tmp_fname;
int un_id=0;
filename = (tmp_fname=strrchr(filename, '/'))?(tmp_fname+1):filename;
do {
asprintf(&tmp_fname, "%s-%d", filename, un_id++);
tmp=open(tmp_fname, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR);
free(tmp_fname);
} while(tmp==-1 && errno==EEXIST);
}
else {
tmp=open(filename,O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,S_IRUSR|S_IWUSR);
}
if(tmp==-1) { //|| numfd==256)
DEBUG_CMD(printf("Error creating file: %s [%s:%d]\n",filename,__FILE__,__LINE__));
local_err=MAPI_ERROR_FILE;
return -1;
}
DEBUG_CMD(printf("Created file for writing: %s [%s:%d]\n",filename,__FILE__,__LINE__));
fds[numfd++]=tmp;
addarg(&pos, &tmp, INT);
break;
default:
local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR;
printf("Illegal argument descriptor %c\n",*argdescr_ptr);
//exit(EXIT_FAILURE);
return -1;
}
argdescr_ptr++; // move to the next arg
}
}
va_end(vl);
qbuf.mtype=1;
qbuf.cmd=APPLY_FUNCTION;
qbuf.fd=fd;
......
......@@ -100,7 +100,7 @@ static mapidflib_function_def_t finfo={
"", //libname
"BINOP", //name
"Simulates binary operators like sum, subtract, multiply etc", //descr
"iiiii", //argdescr
"irfrf", //argdescr
MAPI_DEVICE_ALL, //devoid
MAPIRES_SHM, //Method for returning results
sizeof(unsigned long long), //shm size
......
......@@ -188,7 +188,7 @@ static mapidflib_function_def_t finfo={
"", //libname
"BUCKET", //name
"Sampling of packets\n\tReturn value: int\nParameters:\n\tint fd of resultfunction\n\tint fid of resultfunction", //descr
"iis", //argdescr
"rfs", //argdescr
MAPI_DEVICE_ALL, //devoid
MAPIRES_SHM,
sizeof(struct bucket_ringbuffer)+(BUCKET_SIZE)*sizeof(struct bucket_data),
......
......@@ -133,7 +133,7 @@ static mapidflib_function_def_t finfo={
"", //libname
"DIST", //name
"Shows the distribution of results from other functions", //descr
"iisss", //argdescr
"rfsss", //argdescr
MAPI_DEVICE_ALL, //devoid
MAPIRES_SHM, //Method for returning results
0, //shm size. Set by instance
......
......@@ -116,7 +116,7 @@ static mapidflib_function_def_t finfo={
"", //libname
"STATS", //name
"Returns statistical information about unsigned long long values from other functions\nParameters:\n int fd - flowdescriptor of resultfunction\n int fid - function ID for reading results\n char skip - if set to 1 then skip results from first packet\nReturn type: mapi_stats_t", //descr
"iic", //argdescr
"rfc", //argdescr
MAPI_DEVICE_ALL, //devoid
MAPIRES_SHM, //Method for returning results
sizeof(stats_t), //shm size
......
......@@ -856,7 +856,7 @@ static mapidflib_function_def_t finfo={
"",
"THRESHOLD",
"Thresholding function.\nParameters:\n\tfd: int\n\tfid: int\n\ttimeout: int\n\tthreshold_type: int\n\tthreshold: int, int or unsigned long long\n\tupper_bound: int\n",
"iiiiliiii",
"irfiliiii",
MAPI_DEVICE_ALL,
MAPIRES_SHM,
0,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment