Commit 771e98ad authored by 's avatar
Browse files

Add mapi_apply_function_array that reads arguments from an array


git-svn-id: file:///home/svn/mapi/trunk@902 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent f4706782
......@@ -3402,3 +3402,441 @@ int mapi_is_remote(int fd) {
return -1;
}
}
int mapi_apply_function_array(int fd, const char* funct, char** args, unsigned int argn)
//Apply function to a mapi flow
//fd: flow descriptor
//funct: function to be added
{
struct mapiipcbuf qbuf;
//int fds[256];
int numfd=0;
//struct functiondescr* f=malloc(sizeof(struct functiondescr));
mapidflib_function_def_t *fdef;
functdescr_t *f;
mapiFunctArg *pos;
int tmp;
unsigned long long ltmp;
char ctmp;
// char* stemp;
//char* sbuff;
flowdescr_t *flow;
char *argdescr_ptr;
char *filename;
char *temp;
unsigned int arg_size=0; //only used in dimapi - declared here to avoid multiple ifdefs later
char *tmp_fname;
int un_id=0;
int curr_arg=0;
#ifdef DIMAPI
unsigned char is_remote=0;
remote_flowdescr_t* rflow;
remote_flowdescr_t* ref_flow;
host_flow* hflow;
function_data *fdata;
flist_node_t* fnode;
int i;
#endif
if (!minit) {
DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__));
local_err = MAPI_INIT_ERROR;
return -1;
}
else if(fd<=0){
DEBUG_CMD(printf("Error wrong fd in mapi_apply_function\n\n"));
local_err = MAPI_INVALID_FID_FUNCID;
return -1;
}
if(funct==NULL){
DEBUG_CMD(printf("Error NULL function in mapi_apply_function\n\n"));
local_err = MFUNCT_COULD_NOT_APPLY_FUNCT;
return -1;
}
#ifdef DIMAPI
if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {
if(rflow->is_connected){
printf("\nERROR can not apply function on an already connected flow\n");
//TODO when mapi_set_flow can support remote flows enable the folowing
local_err = MFUNCT_COULD_NOT_APPLY_FUNCT;
return -1;
}
//we create a dummy flow descriptor just to get the function info we want from mapilh_get_function_def
//flow=(flowdescr_t *)malloc(sizeof(flowdescr_t));
//flow->devtype="1.3";
is_remote = 1;//indicates that flow is remote
}
else {
#endif
if ((flow=flist_get(flowlist,fd))==NULL) {
DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__));
local_err = MAPI_INVALID_FLOW;
return -1;
} /*else if (flow->error!=0) {
DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__));
local_err = MAPI_INVALID_FLOW;
return -1;
}*/
if(flow->is_connected){
printf("\nERROR can not apply function on an already connected flow\n");
local_err = MFUNCT_COULD_NOT_APPLY_FUNCT;
return -1;
}
#ifdef DIMAPI
}
if (is_remote) fdef=mapilh_get_function_def(funct,"1.3");
else
#endif
//Get information about function
fdef=mapilh_get_function_def(funct,flow->devtype);
if(fdef==NULL) {
DEBUG_CMD(printf("Could not find/match function %s [%s:%d]\n",funct,__FILE__,__LINE__));
local_err = MAPI_FUNCTION_NOT_FOUND;
return -1;
}
#ifdef DIMAPI
if(is_remote){//flow is remote
for (fnode=flist_head(rflow->host_flowlist), i=1; fnode!=NULL; fnode=flist_next(fnode), i++) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->cmd=APPLY_FUNCTION;
hflow->dbuf->fd=hflow->fd;
memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer
pos = qbuf.data; // point to start of arguments buffer
if ( strlen(fdef->argdescr) != argn ) {
local_err = MFUNCT_INVALID_ARGUMENT;
return -1;
}
curr_arg=0;
// parse function arguments
if(strncmp(fdef->argdescr, "", 1)) { // there are some args
argdescr_ptr = fdef->argdescr;
while(strlen(argdescr_ptr) > 0){
switch(*argdescr_ptr) {
case 's':
temp=args[curr_arg];
curr_arg++;
addarg(&pos, temp, STRING);
arg_size+=strlen(temp)+1;
break;
case 'i':
tmp=atoi(args[curr_arg]);
curr_arg++;
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'r': //reference to a flow
tmp=atoi(args[curr_arg]);
curr_arg++;
//if (is_remote) {
ref_flow=flist_get(remote_flowlist, tmp);
if (ref_flow==NULL || i>ref_flow->scope_size) {
DEBUG_CMD(printf("Invalid flow in function arguments\n"));
return -1;
}
tmp=((host_flow*)flist_get(ref_flow->host_flowlist, i))->fd;
addarg(&pos, &tmp ,INT);
//}
//else
//addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'f': //reference to a fuction
tmp=atoi(args[curr_arg]);
curr_arg++;
//if (is_remote) {
fdata=flist_get(hflow->rhost->functions, tmp);
if (fdata==NULL) {
DEBUG_CMD(printf("Invalid fid in function arguments\n"));
return -1;
}
tmp=fdata->fid;
addarg(&pos, &tmp,INT);
//}
//else
//addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'c':
ctmp=args[curr_arg][0];
curr_arg++;
addarg(&pos, &ctmp, CHAR);
arg_size+=sizeof(char);
break;
case 'l':
ltmp = (unsigned long long)atoll(args[curr_arg]);
curr_arg++;
addarg(&pos, &ltmp, UNSIGNED_LONG_LONG);
arg_size+=sizeof(unsigned long long);
break;
/*
// Adding UID as a "hidden" argument
case 'u':
tmp = getuid();
addarg(&pos, &tmp, INT);
break;
// Adding PWD as a "hidden" argument
case 'p':
stemp = getcwd(NULL, 64);
sbuff = malloc(strlen(stemp) + 2);
strcpy(sbuff, stemp);
sbuff[strlen(stemp)] = '/';
sbuff[strlen(stemp)+1] = '\0';
addarg(&pos, sbuff, STRING);
free(sbuff);
break;
*/
case 'w':
//Open file for writing
// printf("--------------agent = %d\n", agent);
filename=args[curr_arg];
if (filename==NULL) {
local_err = MAPI_ERROR_FILE;
return -1;
}
curr_arg++;
//if(is_remote){//flow is remote
if (filename==NULL) {
local_err = MAPI_ERROR_FILE;
return -1;
}
addarg(&pos, filename, STRING);
arg_size+=strlen(filename)+1;
break;
//}
default:
local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR;
printf("Illegal argument descriptor %c\n",*argdescr_ptr);
//exit(EXIT_FAILURE);
return -1;
}
argdescr_ptr++; // move to the next arg
}
}
memcpy(hflow->dbuf->data+strlen(funct)+1, qbuf.data, arg_size); //argument size
hflow->dbuf->length=BASIC_SIZE + strlen(funct) + 1 + arg_size;
}
if (mapiipc_remote_write_to_all(rflow)<0){
local_err = MCOM_SOCKET_ERROR;
return -1;
}
fidseed++;
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
switch(hflow->dbuf->cmd) {
case APPLY_FUNCTION_ACK:
//printf("Function applied with rfid %d and with real fid %d\n",fidseed,hflow->dbuf->fid);
//generate new fid (rfid) and return this. hold every fid in the hflow in a flist
fdata=(function_data*)malloc(sizeof(function_data));
fdata->fid=hflow->dbuf->fid;
fdata->fdef=fdef;
flist_append(hflow->functions, fidseed, fdata);
flist_append(hflow->rhost->functions, fidseed, fdata);
break;
case ERROR_ACK:
//printf("Error! mapi_apply_function did not work!\n");
memcpy(&local_err, hflow->dbuf->data, sizeof(int));
return -1;
default:
//printf("Error! mapi_apply_function did not work!\n");
local_err = MCOM_UNKNOWN_ERROR;
return -1;
}
}
return fidseed;
}
#endif
pos = qbuf.data; // point to start of arguments buffer
if ( strlen(fdef->argdescr) != argn ) {
local_err = MFUNCT_INVALID_ARGUMENT;
return -1;
}
curr_arg=0;
// parse function arguments
if(strncmp(fdef->argdescr, "", 1)) { // there are some args
argdescr_ptr = fdef->argdescr;
while(strlen(argdescr_ptr) > 0){
switch(*argdescr_ptr) {
case 's':
temp=args[curr_arg];
curr_arg++;
addarg(&pos, temp, STRING);
arg_size+=strlen(temp)+1;
break;
case 'i':
tmp=atoi(args[curr_arg]);
curr_arg++;
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'r':
tmp=atoi(args[curr_arg]);
curr_arg++;
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'f':
tmp=atoi(args[curr_arg]);
curr_arg++;
addarg(&pos, &tmp, INT);
arg_size+=sizeof(int);
break;
case 'c':
ctmp=args[curr_arg][0];
curr_arg++;
addarg(&pos, &ctmp, CHAR);
arg_size+=sizeof(char);
break;
case 'l':
ltmp=(unsigned long long)atoll(args[curr_arg]);
curr_arg++;
addarg(&pos, &ltmp, UNSIGNED_LONG_LONG);
arg_size+=sizeof(unsigned long long);
break;
/*
// Adding UID as a "hidden" argument
case 'u':
tmp = getuid();
addarg(&pos, &tmp, INT);
break;
// Adding PWD as a "hidden" argument
case 'p':
stemp = getcwd(NULL, 64);
sbuff = malloc(strlen(stemp) + 2);
strcpy(sbuff, stemp);
sbuff[strlen(stemp)] = '/';
sbuff[strlen(stemp)+1] = '\0';
addarg(&pos, sbuff, STRING);
free(sbuff);
break;
*/
case 'w':
//Open file for writing
filename=args[curr_arg];
curr_arg++;
if (agent == 1) {
//filename = (tmp_fname=strrchr(filename, '/'))?(tmp_fname+1):filename;
tmp=open(filename, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR);
while(tmp==-1 && errno==EEXIST) {
asprintf(&tmp_fname, "%s-%d", filename, un_id++);
tmp=open(tmp_fname, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR);
free(tmp_fname);
}
}
else {
tmp=open(filename,O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,S_IRUSR|S_IWUSR);
}
if(tmp==-1) { //|| numfd==256)
DEBUG_CMD(printf("Error creating file: %s [%s:%d]\n",filename,__FILE__,__LINE__));
local_err=MAPI_ERROR_FILE;
return -1;
}
DEBUG_CMD(printf("Created file for writing: %s [%s:%d]\n",filename,__FILE__,__LINE__));
flow->fds[flow->numfd++] = tmp;
numfd++;
addarg(&pos, &tmp, INT);
break;
default:
local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR;
printf("Illegal argument descriptor %c\n",*argdescr_ptr);
//exit(EXIT_FAILURE);
return -1;
}
argdescr_ptr++; // move to the next arg
}
}
qbuf.mtype=1;
qbuf.cmd=APPLY_FUNCTION;
qbuf.fd=fd;
qbuf.pid=getpid();
strncpy(qbuf.function,funct,FUNCT_NAME_LENGTH);
strncpy((char *)qbuf.argdescr,fdef->argdescr,ARG_LENGTH);
pthread_spin_lock(&mapi_lock);
if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
local_err = MCOM_SOCKET_ERROR;
pthread_spin_unlock(&mapi_lock);
return -1;
}
if(!send_fd(flow->fds,numfd)) {
local_err=MAPI_ERROR_SEND_FD;
pthread_spin_unlock(&mapi_lock);
return -1;
}
if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
pthread_spin_unlock(&mapi_lock);
local_err = MCOM_SOCKET_ERROR;
return -1;
}
pthread_spin_unlock(&mapi_lock);
switch(qbuf.cmd)
{
case APPLY_FUNCTION_ACK:
break;
case ERROR_ACK:
local_err =qbuf.remote_errorcode;
return -1;
default:
local_err = MCOM_UNKNOWN_ERROR;
return -1;
}
fdef=mapilh_get_function_def(funct,qbuf.function);
f=malloc(sizeof(functdescr_t));
f->fid=qbuf.fid;
f->def=fdef;
f->result_init=0;
f->data=NULL;
f->result=NULL;
f->funct=malloc(sizeof(mapidflib_function_t));
f->funct->fd=fd;
f->funct->fid=qbuf.fid;
f->funct->instance=malloc(sizeof(mapidflib_function_instance_t));
f->funct->instance->status=MAPIFUNC_UNINIT;
f->funct->instance->hwinfo=NULL;
f->funct->instance->result.data = NULL;
f->funct->instance->result.data_size = 0;
f->funct->instance->result.info.funct_res_size = 0;
f->funct->instance->result.info.shm.res_size = 0;
f->funct->instance->result.info.shm.buf_size = 0;
f->funct->instance->internal_data=NULL;
memcpy(f->funct->instance->args,qbuf.data,FUNCTARGS_BUF_SIZE);
flist_append(flow->flist,qbuf.fid,f);
return qbuf.fid;
}
......@@ -169,6 +169,7 @@ extern int mapi_delete_offline_device(char *dev);
//Apply function to a flow
extern int mapi_apply_function(int fd, const char* funct, ...);
extern int mapi_apply_function_array(int fd, const char* funct, char** args, unsigned int argn);
//Connect to a mapi flow
extern int mapi_connect(int fd);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment