#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mapi.h" #include "mapiipc.h" #include "mapilibhandler.h" #include "flist.h" #include "debug.h" #include "parseconf.h" #include "printfstring.h" //#include "mapierror.h" #include "mapi_errors.h" #ifdef WITH_ADMISSION_CONTROL #include #include #include #include "bytestream.h" #endif #ifdef WITH_ANONYMIZATION #include "anonymization/anonymization.h" #endif #ifdef DIMAPI #include #include #endif #ifdef WITH_AUTHENTICATION #define FROM_MAPI #include "vod/vod.h" #endif static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT; static pthread_once_t initialized = PTHREAD_ONCE_INIT; static int minit=0; //Set to 1 when MAPI has been initialized static pthread_spinlock_t mapi_lock; static pthread_spinlock_t numflows_lock; // for numflows and totalflows variables static int local_err=0; /* occurence of a mapi.c error, translation of these errors */ static int numflows=0; // number of allocated (active) flows static int totalflows=0; // number of flows so far (including closed flows) static int offline_devices; static int agent=0; extern const errorstruct Errors[]; /* //Structure used as a linked list to store information about //registered functions struct functiondescr { char* function; int fd; int fid; void* ptr; int ptrsize; char* pktbuf; struct mapid_to_buffer *to_buffer; struct functiondescr *next; }; //Linked list of registered functions static struct functiondescr *functions=NULL; */ #ifdef DIMAPI static int hostcmp(void *h1, void *h2); static void delete_remote_flow(remote_flowdescr_t* rflow); flist_t *hostlist=NULL;//list containing all remote hosts used so far extern flist_t *remote_flowlist; int dimapi_port; typedef struct function_data { int fid; mapidflib_function_def_t* fdef; } function_data; static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT; static pthread_spinlock_t remote_ipc_lock; static pthread_spinlock_t hostlist_lock; static unsigned fdseed = 0; // 'scope' flow descriptor seed (always increases) static unsigned fidseed = 0; // function descriptor seed (always increases) static unsigned negfdseed=-1; // generates temporary negative fd, for use before create_flow #endif typedef struct libinfo { char* name; } libinfo_t; typedef struct flowdescr { int fd; int file; //File descriptor for offline flows char *devtype; char *shm_base; pthread_spinlock_t *shm_spinlock; flist_t *flist; /* int error; char errstr[MAPI_ERRORSTR_LENGTH]; */ unsigned char is_connected; // This should be 1 if the flow is connected 0 otherwise } flowdescr_t; flist_t *flowlist=NULL; typedef struct functdescr { int fid; short result_init; mapidflib_function_def_t *def; mapidflib_function_t *funct; void *data; mapi_results_t *result; } functdescr_t; typedef struct shm_result { void* ptr; //Pointer to shared data int size; //Size of shared data } shm_result_t; /* * Function declarations */ static int default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data); int get_results_info(flowdescr_t *flow, functdescr_t *f); //static int set_error(void* flow, int err_no , int is_remote); static int send_fd(int *fds,int numfd); //global var access functions static int get_numflows(); static int incr_numflows(); static int decr_numflows(); static int get_totalflows(); static int incr_totalflows(); static void init() //common initialization function for mapi and dimapi { #ifdef DIMAPI #define CONF_FILE "./mapi.conf:%s/.mapi.conf:/etc/mapi.conf" char* path=NULL, *libs=NULL, *str=NULL, *s=NULL; char* mapi_conf; mapi_conf = printf_string(CONF_FILE,getenv("HOME")); //init_error("/root/trunk/mapi_errors.dat"); if (pc_load (mapi_conf)) { conf_category_entry_t* empty_cat = pc_get_category(""); const char *portstr; if( empty_cat == NULL ){ fputs( "Configuration file has no empty category. Giving up.\n", stderr ); exit(1); } path = pc_get_param (empty_cat, "libpath"); libs = pc_get_param (empty_cat, "libs"); portstr = pc_get_param (empty_cat, "dimapi_port"); if( portstr == NULL ){ fprintf(stderr,"Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT ); dimapi_port = DEFAULT_DIMAPI_PORT; } else { /* make sure that portstr is a valid number. */ dimapi_port = atoi( portstr ); if ( dimapi_port<=0 || dimapi_port>=65536 ) { fprintf(stderr, "Invalid port given in configuration file, the default port %d is used\n",DEFAULT_DIMAPI_PORT); dimapi_port = DEFAULT_DIMAPI_PORT; } } } else { fputs("Error: cannot load mapi.conf file. Giving up.\n", stderr); fprintf( stderr, "Search path is: %s\n", mapi_conf ); exit(1); } #endif minit = 1; pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE); flowlist = malloc(sizeof(flist_t)); flist_init(flowlist); #ifdef DIMAPI pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE); pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE); hostlist = malloc(sizeof(flist_t)); remote_flowlist = malloc(sizeof(flist_t)); flist_init(hostlist); flist_init(remote_flowlist); str=libs; while((s=strchr(str,':'))!=NULL) { *s='\0'; mapilh_load_library(path,str); str=s+1; } mapilh_load_library(path,str); free(mapi_conf); pc_close(); #endif } //Initializes MAPI - called only once by pthread_once() static void mapi_init() { struct mapiipcbuf qbuf; char libpath[4096],*str,*s; minit=1; if(mapiipc_client_init()==-1){ local_err = MCOM_INIT_SOCKET_ERROR; } pthread_once(&initialized, (void*)init); pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE); offline_devices = 0; //Get libpath from mapid qbuf.mtype=1; qbuf.cmd=GET_LIBPATH; qbuf.fd=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if( mapiipc_write((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; if( mapiipc_read((struct mapiipcbuf*)&qbuf) ) local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case GET_LIBPATH_ACK: strncpy(libpath, (char *)qbuf.data, 4096); break; default: /* MAPI_ERROR_GETTING_LIBPATH */ return; break; } DEBUG_CMD(printf("libpath=%s\n",libpath)); //get libs from mapid qbuf.mtype=1; qbuf.cmd=GET_LIBS; qbuf.fd=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if(mapiipc_write((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; if( mapiipc_read((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case GET_LIBS_ACK: break; default: /* MAPI_ERROR_GETTING_LIBS */ return; break; } //Load function libraries str = (char *)qbuf.data; while((s=strchr(str,':'))!=NULL) { *s='\0'; mapilh_load_library(libpath,str); str=s+1; } mapilh_load_library(libpath,str); return; } #ifdef DIMAPI //Initializes DIMAPI - called only once by pthread_once() static void dmapi_init() { pthread_once(&initialized, (void*)init); return; } #endif int mapi_connect(int fd) //Connect to a mapi flow //fd = flow descriptor { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; #endif if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); return -1; }else if (fd<=0){ //could not use set_error since the flow is not connected and may //cause seg fault fprintf(stderr,"Invalid Flow descriptor [%s:%d]\n",__FILE__,__LINE__); return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=CONNECT; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } #ifdef WITH_AUTHENTICATION if(rflow->is_authenticated == 0) { fprintf(stderr, "Flow with id %d is not authenticated. Aborting. [%s:%d]\n", fd, __FILE__, __LINE__); rflow->is_connected = 0; return(-2); } #endif if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case CONNECT_ACK: rflow->is_connected=1; continue; case ERROR_ACK: local_err = MAPI_CONNECT; //is remote flow //printf("Error! mapi_connect did not work!\n"); return -1; default: local_err = MAPI_CONNECT; //printf("Error! mapi_connect did not work!\n"); return -1; } } return 0; } #endif if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; } /*else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ qbuf.mtype=1; qbuf.cmd=CONNECT; qbuf.fd=fd; qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CONNECT_ACK: flow->is_connected=1; return 0; case ERROR_ACK: local_err = qbuf.remote_errorcode; return -1; default: local_err= MCOM_UNKNOWN_ERROR; return -1; } } #ifdef DIMAPI static void delete_remote_flow(remote_flowdescr_t* rflow) { host_flow* hflow; flist_node_t* fnode, *fnode2; pthread_spin_lock(&remote_ipc_lock); flist_remove(remote_flowlist, rflow->fd,FLIST_LEAVE_DATA); decr_numflows(); pthread_spin_unlock(&remote_ipc_lock); sem_destroy(&rflow->fd_sem); sem_destroy(&rflow->pkt_sem); pthread_mutex_destroy(&rflow->mutex); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) { fnode2=flist_next(fnode); hflow=(host_flow*)fnode->data; hflow->rhost->num_flows--; flist_remove(hflow->rhost->flows, hflow->fd, FLIST_LEAVE_DATA); if (hflow->rhost->num_flows==0) { mapiipc_remote_close(hflow->rhost); //close the socket //pthread_kill(*hflow->rhost->comm_thread, 9); flist_destroy(hflow->rhost->flows, FLIST_LEAVE_DATA); free(hflow->rhost->flows); free(hflow->rhost->hostname); pthread_spin_lock(&hostlist_lock); flist_remove(hostlist, hflow->rhost->sockfd, FLIST_LEAVE_DATA); pthread_spin_unlock(&hostlist_lock); free(hflow->rhost); } //we check if a host is using in other rflows and delete it -close the socket- if not flist_destroy(hflow->functions, FLIST_FREE_DATA); free(hflow->functions); free(hflow->dev); free(hflow->dbuf); flist_remove(rflow->host_flowlist,hflow->id,FLIST_FREE_DATA); } if (rflow->pkt_list!=NULL) flist_destroy(rflow->pkt_list, FLIST_LEAVE_DATA); free(rflow->pkt); flist_destroy(rflow->function_res, FLIST_FREE_DATA); free(rflow); } #endif int mapi_create_flow(const char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; flowdescr_t *flow, *tmpflow; if(dev==NULL){ fprintf(stderr, "Error wrong device name given \n\n"); local_err = MAPI_DEVICE_INFO_ERR; return -1; } #ifdef DIMAPI remote_flowdescr_t *rflow; char *hostname=NULL, *s=NULL, *k=NULL; struct host *h=NULL; host_flow* hflow; char *devp=strdup(dev); flist_node_t* fnode; unsigned int idgen=0; #endif //if(!minit) // if((ret=mapi_init())!=0) // return ret; //check if flow is remote or not and call the appropriate init function #ifndef DIMAPI pthread_once(&mapi_is_initialized, (void*)mapi_init); #endif #ifdef DIMAPI if ( strchr(dev,':')==NULL) pthread_once(&mapi_is_initialized, (void*)mapi_init); else pthread_once(&dmapi_is_initialized, (void*)dmapi_init); if ((s = strchr(dev,':'))!=NULL) { rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t)); rflow->fd=++fdseed; sem_init(&rflow->fd_sem, 0, 0); sem_init(&rflow->pkt_sem, 0, 0); pthread_mutex_init(&rflow->mutex, NULL); rflow->pending_msgs=0; rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t)); flist_init(rflow->host_flowlist); rflow->pkt_list=NULL; rflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH); rflow->function_res=(flist_t*)malloc(sizeof(flist_t)); rflow->is_connected = 0; flist_init(rflow->function_res); k=strtok(devp, ", "); while (k!=NULL) { if ((s = strchr(k,':'))!=NULL) { *s = '\0'; hostname = k; k = s + 1; //printf("host: %s - device: %s\n",hostname, k); pthread_spin_lock(&hostlist_lock); h = (struct host *)flist_search(hostlist, hostcmp, hostname); if(h==NULL){// Our host is a new one --> insert it in the hostlist h = (struct host *)malloc(sizeof(struct host)); h->hostname = strdup(hostname); //printf("New host %s\n",hostname); h->port = dimapi_port; h->flows = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->flows); h->num_flows=0; // Create the socket if (mapiipc_remote_init(h)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&hostlist_lock); return -1; } h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t)); pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h); //printf("New communication thread created\n"); flist_append(hostlist, h->sockfd, h); pthread_spin_unlock(&hostlist_lock); } else{//host exists in the list //printf("%s host again\n",h->hostname); pthread_spin_unlock(&hostlist_lock); } h->num_flows++; hflow=(host_flow*)malloc(sizeof(host_flow)); hflow->scope_fd=rflow->fd; hflow->dev=strdup(k); flist_append(rflow->host_flowlist, ++idgen, hflow); flist_append(h->flows, --negfdseed, hflow); hflow->fd=negfdseed; hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); hflow->rhost=h; hflow->functions=(flist_t *)malloc(sizeof(flist_t)); flist_init(hflow->functions); hflow->dbuf->cmd=CREATE_FLOW; strncpy((char *)hflow->dbuf->data,k,DATA_SIZE); hflow->dbuf->length=BASIC_SIZE+strlen(k)+1; } else { //this is the case where the dev string contains both 'host:interface1' and 'interface2' //example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1"); //user's intention is probably localhost:eth1 //what should be done in this case? } k=strtok(NULL,", "); } free(devp); rflow->scope_size=flist_size(rflow->host_flowlist); pthread_spin_lock(&remote_ipc_lock); flist_append(remote_flowlist, rflow->fd, rflow); incr_numflows(); incr_totalflows(); pthread_spin_unlock(&remote_ipc_lock); if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow - //wait for results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; if (hflow->dbuf->cmd == CREATE_FLOW_ACK && *((int*)hflow->dbuf->data)!=-1) { hflow->fd=*((int*)hflow->dbuf->data); flist_remove(hflow->rhost->flows, hflow->dbuf->fd, FLIST_LEAVE_DATA); flist_append(hflow->rhost->flows, hflow->fd, hflow); printf("New host flow created with fd %d for rflow %d\n",hflow->fd,rflow->fd); } else {//error (fd==-1) delete_remote_flow(rflow); return -1; } } return rflow->fd; } #endif if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it pthread_spin_lock(&mapi_lock); if(mapiipc_client_init()==-1){ local_err = MCOM_INIT_SOCKET_ERROR; } pthread_spin_unlock(&mapi_lock); } qbuf.mtype=1; qbuf.cmd=CREATE_FLOW; qbuf.fd=getpid(); qbuf.pid=getpid(); strncpy((char *)qbuf.data,dev,DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_FLOW_ACK: tmpflow=flist_get(flowlist,qbuf.fd); if (tmpflow!=NULL) { ERROR_CMD(fprintf(stderr,"mapid gave us a fd which already exist in our lists (%d), exiting [%s:%d]\n", qbuf.fd,__FILE__,__LINE__)); //exit(EXIT_FAILURE); return -1; } flow=malloc(sizeof(flowdescr_t)); if( flow == NULL ){ fputs( "Out of memory\n", stderr ); //exit( EXIT_FAILURE ); return -1; } flow->fd=qbuf.fd; flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1); flow->flist=malloc(sizeof(flist_t)); flow->shm_base=NULL; flow->shm_spinlock=NULL; //flow->error=0; //flow->errstr[0]='\0'; flow->is_connected =0; flist_init(flow->flist); strcpy(flow->devtype,(char *)qbuf.data); pthread_spin_lock(&mapi_lock); flist_append(flowlist,qbuf.fd,flow); incr_numflows(); incr_totalflows(); pthread_spin_unlock(&mapi_lock); return qbuf.fd; /* should probably have a separate error message for ERROR_ACK? */ case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_create_offline_flow(const char *dev, int format) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; flowdescr_t *flow=NULL; int file; pthread_once(&mapi_is_initialized, (void*)mapi_init); //Check to see if file can be opened if (dev==NULL){ fprintf(stderr,"Error NULL device in mapi_create_offline_flow\n\n"); return -1; } else if ((file=open(dev,O_LARGEFILE))==-1) { local_err=MAPI_ERROR_FILE; return -1; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if (mapiipc_client_init()<0) { local_err = MCOM_INIT_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } } pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=CREATE_OFFLINE_FLOW; qbuf.fd=getpid(); qbuf.pid=getpid(); qbuf.fid=format; strncpy((char *)qbuf.data,dev,DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if(qbuf.cmd==SEND_FD) { if(mapiipc_send_fd(file)==-1) { local_err=MAPI_ERROR_SEND_FD; return -1; } } else { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_OFFLINE_FLOW_ACK: flow=malloc(sizeof(flowdescr_t)); flow->fd=qbuf.fd; flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1); flow->flist=malloc(sizeof(flist_t)); flow->shm_base=NULL; flow->shm_spinlock=NULL; //flow->error=0; //flow->errstr[0]='\0'; flow->is_connected=0; flist_init(flow->flist); strcpy(flow->devtype,(char *)qbuf.data); pthread_spin_lock(&mapi_lock); flist_append(flowlist,qbuf.fd,flow); incr_numflows(); incr_totalflows(); pthread_spin_unlock(&mapi_lock); return qbuf.fd; case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } char* mapi_create_offline_device(const char *path, int format) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; int file; pthread_once(&mapi_is_initialized, (void*)mapi_init); //Check to see if file can be opened if (path==NULL){ fprintf(stderr,"Error NULL path in mapi_create_offline_device\n\n"); return NULL; } else if ((file=open(path,O_LARGEFILE))==-1) { local_err=MAPI_ERROR_FILE; return NULL; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it // pthread_spin_lock(&mapi_lock); if (mapiipc_client_init()<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_INIT_SOCKET_ERROR; return NULL; } // pthread_spin_unlock(&mapi_lock); } pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=CREATE_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); qbuf.fid=format; strncpy((char *)qbuf.data,path,DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if(qbuf.cmd==SEND_FD) { if(mapiipc_send_fd(file)==-1) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return NULL; } } else { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return NULL; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_OFFLINE_DEVICE_ACK: offline_devices++; return strdup((char *)qbuf.data); case ERROR_ACK: local_err=qbuf.remote_errorcode; return NULL; default: local_err=MCOM_UNKNOWN_ERROR; return NULL; } } int mapi_start_offline_device(const char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; pthread_once(&mapi_is_initialized, (void*)mapi_init); if (dev==NULL){ fprintf(stderr,"Error NULL device in mapi_start_offline_device\n\n"); return -1; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it pthread_spin_lock(&mapi_lock); if (mapiipc_client_init()<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_INIT_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); } pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=START_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); strncpy((char *)qbuf.data,dev,DATA_SIZE); qbuf.fid=0; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case START_OFFLINE_DEVICE_ACK: return 0; case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_delete_offline_device(char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; printf("closing down...\n"); pthread_once(&mapi_is_initialized, (void*)mapi_init); if (dev==NULL){ fprintf(stderr,"Error NULL device in mapi_delete_offline_device\n\n"); return -1; } /* socet is not closed pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if (mapiipc_client_init()<0) return -1; } pthread_spin_unlock(&mapi_lock); */ qbuf.mtype=1; qbuf.cmd=DELETE_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); strncpy((char *)qbuf.data,dev,DATA_SIZE); free(dev); qbuf.fid=0; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case DELETE_OFFLINE_DEVICE_ACK: pthread_spin_lock(&mapi_lock); if((get_numflows() == 0) && --offline_devices == 0) mapiipc_client_close(); pthread_spin_unlock(&mapi_lock); return 0; case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_close_flow(int fd) { //struct mapiipcbuf qbuf; functdescr_t *f=NULL; flowdescr_t *flow=NULL; if (fd<=0){ fprintf(stderr,"Error wrong fd in mapi_close_flow\n\n"); return -1; } #ifdef DIMAPI remote_flowdescr_t *rflow=flist_get(remote_flowlist, fd); host_flow* hflow; flist_node_t* fnode; if(rflow!=NULL){ for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=CLOSE_FLOW; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } //if (mapiipc_remote_write_to_all(rflow)<0) return -1; pthread_mutex_lock(&rflow->mutex); rflow->pending_msgs=0; rflow->is_connected=0; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->fd=hflow->fd; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) { pthread_mutex_unlock(&rflow->mutex); local_err = MCOM_SOCKET_ERROR; return -1; } rflow->pending_msgs++; } pthread_mutex_unlock(&rflow->mutex); //no need to wait for results /*for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd){ case CLOSE_FLOW_ACK: break; case ERROR_ACK: local_err=MCOM_ERROR_ACK; break; default: local_err=MCOM_UNKNOWN_ERROR; break; } }*/ delete_remote_flow(rflow); return 0; } #endif if(flowlist && (flow = flist_get(flowlist, fd))==NULL){ DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; } else { pthread_spin_lock(&mapi_lock); //XXX MEMORY LEAK why leave data since the node is removed???? flow = flist_remove(flowlist,fd,FLIST_LEAVE_DATA); pthread_spin_unlock(&mapi_lock); } //Delete functions applied first, before mapid is notified. pthread_spin_lock(&mapi_lock); while((f = flist_pop_first(flow->flist)) != NULL) { if(f->def->client_cleanup!=NULL && f->funct->instance->status==MAPIFUNC_INIT){ f->def->client_cleanup(f->funct->instance); } if(f->result!=NULL) free(f->result); free(f->funct->instance); free(f->funct); free(f->data); free(f); } pthread_spin_unlock(&mapi_lock); /* if (flow->error==0) { qbuf.mtype=1; qbuf.cmd=CLOSE_FLOW; qbuf.fd=fd; qbuf.fid=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CLOSE_FLOW_ACK: // if this is the last one, release socket resources break; case ERROR_ACK: local_err=MCOM_ERROR_ACK; break; default: local_err=MCOM_UNKNOWN_ERROR; break; } }*/ //Detach shared mem if (flow->shm_base!=NULL) { if (shmdt(flow->shm_base)<0) { WARNING_CMD(printf("Warning: Could not detach shared mem (%s) [%s:%d]\n",strerror(errno),__FILE__,__LINE__)); } } /* subtract, the flow should be closed either due to an error or * explicitly in this function. so numflows is really number of allocated * flows... */ pthread_spin_lock(&mapi_lock); if((decr_numflows() == 0) && offline_devices == 0) mapiipc_client_close(); pthread_spin_unlock(&mapi_lock); //Free flow resources free(flow->devtype); free(flow->flist); free(flow); return 0; } //XXX Why send_fd returns 0 on error whereas every other function //return -1 on error? static int send_fd(int *fds, int num) { int c; struct mapiipcbuf qbuf; for(c=0;cis_connected){ printf("\nERROR can not apply function on an already connected flow\n"); //TODO when mapi_set_flow can support remote flows enable the folowing local_err = MAPI_FLOW_NOT_CONNECTED; return -1; } //we create a dummy flow descriptor just to get the function info we want from mapilh_get_function_def flow=(flowdescr_t *)malloc(sizeof(flowdescr_t)); flow->devtype="1.3"; is_remote = 1;//indicates that flow is remote } else #endif if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; } /*else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ if(flow->is_connected){ printf("\nERROR can not apply function on an already connected flow\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } //Get information about function fdef=mapilh_get_function_def(funct,flow->devtype); if(fdef==NULL) { DEBUG_CMD(printf("Could not find/match function %s [%s:%d]\n",funct,__FILE__,__LINE__)); local_err = MAPI_FUNCTION_NOT_FOUND; return -1; } va_start(vl,funct); pos = qbuf.data; // point to start of arguments buffer if (agent==1) { args = va_arg(vl, unsigned char*); } // parse function arguments if(strncmp(fdef->argdescr, "", 1)) { // there are some args argdescr_ptr = fdef->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr) { case 's': if (agent==0) temp=va_arg(vl, char*); else { temp=(char*)args; args+=strlen(temp)+1; } addarg(&pos, temp, STRING); arg_size+=strlen(temp)+1; break; case 'i': if (agent==0) tmp = va_arg(vl, int); else { memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'c': if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...' else { memcpy(&ctmp, args, sizeof(char)); args+=sizeof(char); } addarg(&pos, &ctmp, CHAR); arg_size+=sizeof(char); break; case 'l': if (agent==0) ltmp = va_arg(vl, unsigned long long); else { memcpy(<mp, args, sizeof(unsigned long long)); args+=sizeof(unsigned long long); } addarg(&pos, <mp, UNSIGNED_LONG_LONG); arg_size+=sizeof(unsigned long long); break; /* // Adding UID as a "hidden" argument case 'u': tmp = getuid(); addarg(&pos, &tmp, INT); break; // Adding PWD as a "hidden" argument case 'p': stemp = getcwd(NULL, 64); sbuff = malloc(strlen(stemp) + 2); strcpy(sbuff, stemp); sbuff[strlen(stemp)] = '/'; sbuff[strlen(stemp)+1] = '\0'; addarg(&pos, sbuff, STRING); free(sbuff); break; */ case 'w': //Open file for writing // printf("--------------agent = %d\n", agent); if (agent==0) filename=va_arg(vl, char*); else { filename=(char*)args; args+=strlen(filename)+1; } #ifdef DIMAPI if(is_remote){//flow is remote addarg(&pos, filename, STRING); arg_size+=strlen(filename)+1; break; } #endif if (agent == 1) { char *tmp_fname; int un_id=0; filename = (tmp_fname=strrchr(filename, '/'))?(tmp_fname+1):filename; do { asprintf(&tmp_fname, "%s-%d", filename, un_id++); tmp=open(tmp_fname, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR); free(tmp_fname); } while(tmp==-1 && errno==EEXIST); } else { tmp=open(filename,O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,S_IRUSR|S_IWUSR); } if(tmp==-1) { //|| numfd==256) DEBUG_CMD(printf("Error creating file: %s [%s:%d]\n",filename,__FILE__,__LINE__)); local_err=MAPI_ERROR_FILE; return -1; } DEBUG_CMD(printf("Created file for writing: %s [%s:%d]\n",filename,__FILE__,__LINE__)); fds[numfd++]=tmp; addarg(&pos, &tmp, INT); break; default: local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR; printf("Illegal argument descriptor %c\n",*argdescr_ptr); //exit(EXIT_FAILURE); return -1; } argdescr_ptr++; // move to the next arg } } #ifdef DIMAPI if(is_remote){//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=APPLY_FUNCTION; hflow->dbuf->fd=hflow->fd; memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer memcpy(hflow->dbuf->data+strlen(funct)+1, qbuf.data, arg_size); //argument size hflow->dbuf->length=BASIC_SIZE + strlen(funct) + 1 + arg_size; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } fidseed++; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case APPLY_FUNCTION_ACK: printf("Function applied with rfid %d and with real fid %d\n",fidseed,hflow->dbuf->fid); //generate new fid (rfid) and return this. hold every fid in the hflow in a flist fdata=(function_data*)malloc(sizeof(function_data)); fdata->fid=hflow->dbuf->fid; fdata->fdef=fdef; flist_append(hflow->functions, fidseed, fdata); break; case ERROR_ACK: printf("Error! mapi_apply_function did not work!\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; default: printf("Error! mapi_apply_function did not work!\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } } return fidseed; } #endif qbuf.mtype=1; qbuf.cmd=APPLY_FUNCTION; qbuf.fd=fd; qbuf.pid=getpid(); strncpy(qbuf.function,funct,FUNCT_NAME_LENGTH); strncpy((char *)qbuf.argdescr,fdef->argdescr,ARG_LENGTH); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if(!send_fd(fds,numfd)) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case APPLY_FUNCTION_ACK: break; case ERROR_ACK: local_err =qbuf.remote_errorcode; return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } fdef=mapilh_get_function_def(funct,qbuf.function); f=malloc(sizeof(functdescr_t)); f->fid=qbuf.fid; f->def=fdef; f->result_init=0; f->data=NULL; f->result=NULL; f->funct=malloc(sizeof(mapidflib_function_t)); f->funct->fd=fd; f->funct->fid=qbuf.fid; f->funct->instance=malloc(sizeof(mapidflib_function_instance_t)); f->funct->instance->status=MAPIFUNC_UNINIT; f->funct->instance->hwinfo=NULL; f->funct->instance->result.data = NULL; f->funct->instance->result.data_size = 0; f->funct->instance->result.info.funct_res_size = 0; f->funct->instance->result.info.shm.res_size = 0; f->funct->instance->result.info.shm.buf_size = 0; f->funct->instance->internal_data=NULL; memcpy(f->funct->instance->args,qbuf.data,FUNCTARGS_BUF_SIZE); pthread_spin_lock(&mapi_lock); flist_append(flow->flist,qbuf.fid,f); pthread_spin_unlock(&mapi_lock); return qbuf.fid; } int get_results_info(flowdescr_t *flow,functdescr_t *f) { struct mapiipcbuf qbuf; if (flow==NULL) { DEBUG_CMD(printf("Invalid flow (NULL) [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }/* else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",flow->fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ qbuf.mtype=1; qbuf.cmd=READ_RESULT; qbuf.fd=flow->fd; qbuf.fid=f->fid; qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case READ_RESULT_ACK: break; case ERROR_ACK: local_err = qbuf.remote_errorcode; return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } default_read_result_init(flow,f,&qbuf.data); if(f->def->client_init!=NULL) { int func_err=f->def->client_init(f->funct->instance,&qbuf.data); if(func_err!=0) { local_err = func_err; return -1; } f->funct->instance->status=MAPIFUNC_INIT; } f->result_init=1; return 0; } // old signature: int mapi_read_results(int fd, int fid, void *result) //Read result from a function //fd: flow descriptor //fid: ID of function mapi_results_t* mapi_read_results(int fd, int fid) { flowdescr_t *flow; functdescr_t *f; mapi_result_t res; struct timeval tv; /*used for timestamping results when produced */ #ifdef DIMAPI remote_flowdescr_t *rflow; host_flow* hflow; unsigned int currhost = 0; flist_node_t* fnode; mapi_results_t* results; int i; function_data* fdata; #endif if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return NULL; } else if(fd<=0 || fid <=0 ){ fprintf(stderr,"Error wrong fd or fid in mapi_read_results\n\n"); local_err = MAPI_INVALID_FLOW; return NULL; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { if(!rflow->is_connected){ printf("\nERROR in mapi_read_results always use mapi_connect first\n"); //TODO when mapi_set_flow can support remote flows enable the folowing local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if ((results=flist_get(rflow->function_res, fid))==NULL) { //init once results=(mapi_results_t*)malloc(sizeof(mapi_results_t)*rflow->scope_size); for (i=0; iscope_size; i++) { fdata=flist_get(((host_flow*)flist_head(rflow->host_flowlist)->data)->functions, fid); results[i].size=fdata->fdef->shm_size; results[i].res=(void*)malloc(results->size); } flist_append(rflow->function_res, fid, results); } for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd = READ_RESULT; hflow->dbuf->fd = hflow->fd; hflow->dbuf->fid = ((function_data*)flist_get(hflow->functions, fid))->fid; hflow->dbuf->length = BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return NULL;} //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case READ_RESULT_ACK: memcpy(results[currhost].res, hflow->dbuf->data, results[currhost].size); results[currhost].ts = hflow->dbuf->timestamp; break; case ERROR_ACK: default: fprintf(stderr,"Error in read results! [%s,%d]\n",__FILE__,__LINE__); return NULL; } ++currhost; } return(results); } #endif if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return NULL; }/* else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return NULL; }*/ if(!flow->is_connected){ printf("\nERROR in mapi_read_results always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; //exit(-1); return NULL; } f=flist_get(flow->flist,fid); if(f!=NULL) { if(!f->result_init) if (get_results_info(flow,f) != 0) return NULL; if(f->def->client_init==NULL) { if(f->data == NULL) return(0); else { // printf("Lock...\n"); pthread_spin_lock(flow->shm_spinlock); //printf("Locked\n"); memcpy(f->result->res,((shm_result_t*)f->data)->ptr,((shm_result_t*)f->data)->size); //printf("Unlock...\n"); pthread_spin_unlock(flow->shm_spinlock); //printf("Unlocked\n"); gettimeofday(&tv, NULL); f->result->ts=tv.tv_sec*1000000 + tv.tv_usec; f->result->size=((shm_result_t*)f->data)->size; return f->result; } } else { int func_err=f->def->client_read_result(f->funct->instance,&res); if(func_err!=0) { local_err = func_err; return NULL; } memcpy(f->result->res,res.res,res.size); gettimeofday(&tv, NULL); f->result->ts=tv.tv_sec*1000000 + tv.tv_usec;; f->result->size=res.size; return f->result; } } else { local_err = MAPI_INVALID_FID_FUNCID; return NULL; } return NULL; } /** \brief Get the next packet from a to_buffer function \param fd flow descriptor \param fid id of TO_BUFFER function \return Reference to next packet, or NULL on error */ struct mapipkt * mapi_get_next_pkt(int fd,int fid) { flowdescr_t *flow; functdescr_t *f; mapi_result_t res; int func_err; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; #endif if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return NULL; } else if(fd<=0 || fid <=0 ){ fprintf(stderr,"Error wrong fd or fid in mapi_get_next_pkt\n\n"); local_err = MAPI_INVALID_FID_FUNCID; return NULL; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { //FIFO if(!rflow->is_connected){ printf("\nERROR in mapi_get_next_pkt always use mapi_connect first\n"); //TODO when mapi_set_flow can support remote flows enable the folowing local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if (rflow->pkt_list==NULL) { rflow->pkt_list=(flist_t*)malloc(sizeof(flist_t)); flist_init(rflow->pkt_list); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; hflow->dbuf->fid=((function_data*)flist_get(hflow->functions, fid))->fid; hflow->dbuf->length=BASIC_SIZE; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) { local_err = MCOM_SOCKET_ERROR; return NULL;} } pthread_mutex_lock(&rflow->mutex); rflow->pending_msgs=0; pthread_mutex_unlock(&rflow->mutex); sem_wait(&rflow->pkt_sem); //wait at least one host packet hflow=(host_flow*)flist_pop_first(rflow->pkt_list); switch(hflow->dbuf->cmd) { case GET_NEXT_PKT_ACK: memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE); //send request for next packet from this host hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; hflow->dbuf->fid=((function_data*)flist_get(hflow->functions, fid))->fid; hflow->dbuf->length=BASIC_SIZE; if(mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0){ local_err = MCOM_SOCKET_ERROR; return NULL; } return rflow->pkt; case ERROR_ACK: printf("Error! mapi_get_next_pkt did not work!\n"); local_err = MCOM_ERROR_ACK; return NULL; default: printf("Error! mapi_get_next_pkt did not work!\n"); local_err = MCOM_UNKNOWN_ERROR; return NULL; } return NULL; } else { //if no packet arrived yet wait sem_wait(&rflow->pkt_sem); hflow=(host_flow*)flist_pop_first(rflow->pkt_list); switch(hflow->dbuf->cmd) { case GET_NEXT_PKT_ACK: memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE); //send request for next packet from this host hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; hflow->dbuf->fid=((function_data*)flist_get(hflow->functions, fid))->fid; hflow->dbuf->length=BASIC_SIZE; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0){ local_err = MCOM_SOCKET_ERROR; return NULL; } return rflow->pkt; case ERROR_ACK: printf("Error! mapi_get_next_pkt did not work!\n"); return NULL; default: printf("Error! mapi_get_next_pkt did not work!\n"); return NULL; } return NULL; } } #endif if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); return NULL; } /*else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); return NULL; }*/ if(!flow->is_connected){ printf("\nERROR in mapi_get_next_pkt always use mapi_connect first\n"); //exit(-1); local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if ( (f = flist_get(flow->flist,fid)) == NULL ) { local_err = MAPI_INVALID_FID_FUNCID; return NULL; } // This should be attaching shared memory segment with results if( !f->result_init ) if (get_results_info(flow,f) != 0) { DEBUG_CMD(printf("Missing error message [%s:%d]\n",__FILE__,__LINE__)); return NULL; } if ( f->def->client_read_result == NULL ) { //TODO return error message DEBUG_CMD(printf("Missing error message [%s:%d]\n",__FILE__,__LINE__)); return NULL; } func_err = f->def->client_read_result(f->funct->instance,&res); if (func_err != 0) { local_err = func_err; return NULL; } if (res.res == NULL) { DEBUG_CMD(printf("result is NULL [%s:%d]\n",__FILE__,__LINE__)); } return res.res; } int mapi_loop(int fd, int fid, int cnt, mapi_handler callback){ struct mapipkt* pkt; int i = 0; flowdescr_t* flow; if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <=0 ){ fprintf(stderr,"Error wrong fd or fid in mapi_loop\n\n"); local_err = MAPI_INVALID_FID_FUNCID; return -1; } if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; } /*else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ //TOBEDONE when support for remote flows is added check if the //rflow is connected too. if(!flow->is_connected){ printf("\nERROR in mapi_loop always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return -1; } if (cnt > 0){ for (i = 0; i < cnt; i++){ pkt = mapi_get_next_pkt(fd, fid); if (pkt == NULL) return local_err; //flow->error; (*callback)(pkt); } } else{ while(1){ pkt = mapi_get_next_pkt(fd, fid); if (pkt == NULL) return local_err; //flow->error; (*callback)(pkt); } } return 0; } /* * Very simple function. Just reads the last error that was set. */ int mapi_read_error(MAPI_UNUSED int fd, int* err_no, char* errorstr) { int i =0; *err_no = local_err; for(; Errors[i].err_no!=0; i++){ if(Errors[i].err_no == local_err){ if(strlen(Errors[i].desc) < MAPI_ERRORSTR_LENGTH){ strncpy(errorstr, Errors[i].desc, MAPI_ERRORSTR_LENGTH); } else strncpy(errorstr,"Error in mapi_read_error: Error string too long\n",strlen("Error in mapi_read_error: Error string too long\n")); break; } } local_err =0; //Extra work which can be used in the case we use //per flow error /* if (fd == -1) { *err_no = local_err; translate_local_errorcode(err_no,errorstr); local_err=0; } else { flowdescr_t* flow = flist_get(flowlist,fd); if (flow == NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); return -1; } pthread_spin_lock(&mapi_lock); strncpy(errorstr, flow->errstr, (strlen(flow->errstr)errstr):MAPI_ERRORSTR_LENGTH); pthread_spin_unlock(&mapi_lock); *err_no = flow->error; }*/ return 0; } /* // PER FLOW ERROR CHECKING REMOVED ATM. static int set_error(void* flow, int err_no , int is_remote) { //An general mapi stub error occured if(local_err!=0) return; if (flow==NULL) { DEBUG_CMD(printf("Invalid flow: NULL [%s:%d]\n",__FILE__,__LINE__)); return -1; } if(is_remote){ pthread_spin_lock(&mapi_lock); ((remote_flowdescr_t*)flow)->error = err_no; translate_local_errorcode(err_no,((remote_flowdescr_t*)flow)->errstr); pthread_spin_unlock(&mapi_lock); } else{ pthread_spin_lock(&mapi_lock); ((flowdescr_t*)flow)->error = err_no; translate_local_errorcode(err_no,((flowdescr_t*)flow)->errstr); pthread_spin_unlock(&mapi_lock); } return 0; } */ static int default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data) { mapid_shm_t *shm=data; mapid_shm_t *shm_spinlock=(mapid_shm_t*)((char*)data+sizeof(mapid_shm_t)); int id; if (flow==NULL) { DEBUG_CMD(printf("Invalid flow: NULL [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; } if(!flow->shm_base) { //Get pointer to shared memory id=shmget(shm->key, shm->buf_size, 660); if(id<0) { DEBUG_CMD(printf("Shared memory error [%s:%d]\n",__FILE__,__LINE__)); local_err=MAPI_SHM_ERR; return -1; } if ((flow->shm_base=shmat(id, 0, FUNCTION_SHM_PERMS))==NULL) { local_err = MAPI_SHM_ERR; return -1; } } if(!flow->shm_spinlock) { //Get pointer to shared spinlock memory id=shmget(shm_spinlock->key, shm_spinlock->buf_size, 660); if(id<0) { DEBUG_CMD(printf("Shared memory error:%m [%s:%d]\n",__FILE__,__LINE__)); perror("Error: "); local_err=MAPI_SHM_ERR; return -1; } if ((flow->shm_spinlock=shmat(id, 0, FUNCTION_SHM_PERMS))==NULL) { local_err=MAPI_SHM_ERR; return -1; } } f->data=malloc(sizeof(shm_result_t)); ((shm_result_t*)f->data)->ptr=flow->shm_base+shm->offset; ((shm_result_t*)f->data)->size=shm->res_size; //Attach result to instance f->funct->instance->result.data=flow->shm_base+shm->offset; f->funct->instance->result.data_size=shm->res_size; f->result=(mapi_results_t*)malloc(sizeof(mapi_results_t)); f->result->res=(void *)malloc(((shm_result_t*)f->data)->size); return 0; } int mapi_get_function_info(int fd,int fid, mapi_function_info_t *info) { struct mapiipcbuf qbuf; flowdescr_t* flow; if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <=0 ){ //fprintf(stderr,"Error wrong fd or fid in mapi_get_function_info\n\n"); local_err =MAPI_INVALID_FID_FUNCID ; return -1; } if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; }/* else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; }*/ qbuf.mtype=1; qbuf.cmd=GET_FUNCTION_INFO; qbuf.fd=fd; qbuf.fid=fid; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FUNCTION_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_function_info_t)); info->result_size = mapilh_get_function_def(info->name, info->devtype)->shm_size ; return 0; } else return MAPI_FUNCTION_INFO_ERR; } int mapi_get_next_function_info(int fd,int fid, mapi_function_info_t *info) { struct mapiipcbuf qbuf; pthread_once(&mapi_is_initialized, (void*)mapi_init); qbuf.mtype=1; qbuf.cmd=GET_NEXT_FUNCTION_INFO; qbuf.fd=fd; qbuf.fid=fid; if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err=MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <=0 ){ fprintf(stderr,"Error wrong fd or fid in mapi_get_next_function_info\n\n"); local_err = MAPI_INVALID_FID_FUNCID; return -1; } pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FUNCTION_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_function_info_t)); return 0; } else return MAPI_FUNCTION_INFO_ERR; } int mapi_get_flow_info(int fd, mapi_flow_info_t *info) { struct mapiipcbuf qbuf; if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0){ fprintf(stderr,"Error wrong fd in mapi_get_flow_info\n\n"); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype=1; qbuf.cmd=GET_FLOW_INFO; qbuf.fd=fd; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FLOW_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_flow_info_t)); return 0; } else return MAPI_FLOW_INFO_ERR; } int mapi_get_next_flow_info(int fd, mapi_flow_info_t *info) { struct mapiipcbuf qbuf; pthread_once(&mapi_is_initialized, (void*)mapi_init); if(fd<=0){ fprintf(stderr,"Error wrong fd in mapi_get_next_flow_info\n\n"); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype=1; qbuf.cmd=GET_NEXT_FLOW_INFO; qbuf.fd=fd; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FLOW_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_flow_info_t)); return 0; } else return (local_err = MAPI_FLOW_INFO_ERR); } /*int mapi_load_library(const char* library) { struct mapiipcbuf qbuf; if(!minit) mapi_init(); if((local_err=mapilh_load_library(libpath,library))==0) { qbuf.mtype=1; qbuf.cmd=LOAD_LIBRARY; qbuf.fd=getpid(); qbuf.pid=getpid(); strcpy((char *)qbuf.data,library); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); return 0; } else return -1; }*/ int mapi_get_next_device_info(int devid, mapi_device_info_t *info) { struct mapiipcbuf qbuf; pthread_once(&mapi_is_initialized, (void*)mapi_init); qbuf.mtype=1; qbuf.cmd=GET_NEXT_DEVICE_INFO; qbuf.fd=devid; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_DEVICE_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_device_info_t)); return 0; } else return (local_err = MAPI_DEVICE_INFO_ERR); } #ifdef WITH_ADMISSION_CONTROL /* Auth data format pubkey + credentials + (unsigned int)encrypted nonce len + encrypted nonce */ static int ipc_send_authdata(int fd,const unsigned char *pubkey,const unsigned char *credentials,const unsigned char *enc_nonce,size_t len) { struct mapiipcbuf qbuf; size_t pk_sz,creds_sz; flowdescr_t* flow; if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; } if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }/* else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ qbuf.mtype = 1; qbuf.cmd = SET_AUTHDATA; qbuf.fd = fd; qbuf.pid = getpid(); pk_sz = strlen(pubkey) + 1; creds_sz = strlen(credentials) + 1; if ( (pk_sz + creds_sz + len + sizeof(unsigned int)) > DATA_SIZE ) { ERROR_CMD(fprintf(stderr,"ipc_send_authdata: the size of your authentication & authorisation data exceeds the maximum allows %d bytes [%s:%d]\n",DATA_SIZE,__FILE__,__LINE__)); return -1; } strcpy(qbuf.data,pubkey); strcpy((qbuf.data+pk_sz),credentials); *(size_t *)(qbuf.data+pk_sz+creds_sz) = len; memcpy((qbuf.data+pk_sz+creds_sz+sizeof(unsigned int)),enc_nonce,len); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; return -1; } /* no ack? - espenb */ return 0; } static int read_file(const char *fn,bytestream *bs) { int fp,r; if ( (fp = open(fn,O_RDONLY)) < 0 ) return -1; r = read(fp,bs->data,bs->length); close(fp); return r; } #define MAX_PRIVKEY_SIZE 6000 static size_t encrypt_nonce(unsigned char **enc_nonce,unsigned int nonce,const char *priv) { bytestream privkey; char *pkstring; struct keynote_deckey priv_dk; RSA *rsa; int enc_len = 0; BS_NEW(privkey,MAX_PRIVKEY_SIZE); if ( BS_ISNULL(privkey) ) { errno = ENOMEM; return 0; } if ( read_file(priv,&privkey) < 0 ) goto ret; if ( (pkstring = kn_get_string(privkey.data)) == NULL ) goto ret; // Decode private key if ( kn_decode_key(&priv_dk,pkstring,KEYNOTE_PRIVATE_KEY) != 0 ) goto ret; if ( priv_dk.dec_algorithm != KEYNOTE_ALGORITHM_RSA ) goto dec_error; rsa = (RSA *)priv_dk.dec_key; // Allocate memory if ( (*enc_nonce = malloc(RSA_size(rsa))) == NULL ) { errno = ENOMEM; goto dec_error; } enc_len = RSA_private_encrypt(sizeof(unsigned int),(unsigned char *)&nonce, *enc_nonce,rsa,RSA_PKCS1_PADDING); if ( enc_len <= 0 ) { free(*enc_nonce); enc_len = 0; } dec_error: kn_free_key(&priv_dk); ret: BS_FREE(privkey); return (size_t)enc_len; } int mapi_set_authdata(int fd,const char *pub,const char *priv,const char *creds) { struct mapiipcbuf qbuf; bytestream pubkey,credentials; int ret = -1; size_t enc_len; unsigned char *encrypted_nonce = NULL; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t *rflow; host_flow* hflow; flist_node_t* fnode; size_t pk_sz,creds_sz, tot_size; unsigned char is_remote=0; #endif if (!minit) { DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__)); local_err = MAPI_INIT_ERROR; return -1; } if(fd<=0){ fprintf(stderr,"Error wrong fd in mapi_set_authdata\n\n"); local_err = MAPI_INVALID_FLOW; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { is_remote=1; } else #endif if ((flow=flist_get(flowlist,fd))==NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }/* else if (flow->error!=0) { DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__)); local_err = MAPI_INVALID_FLOW; return -1; }*/ if ( fd < 0 ) return -1; // Allocate memory to hold data in memory BS_NEW(pubkey,4096); BS_NEW(credentials,4096); if ( BS_ISNULL(pubkey) || BS_ISNULL(credentials) ) goto fail; // Read data in memory if ( read_file(pub,&pubkey) < 0 ) goto fail; if ( read_file(creds,&credentials) < 0 ) goto fail; #ifdef DIMAPI if(is_remote){ pk_sz = strlen(pubkey.data) + 1; creds_sz = strlen(credentials.data) + 1; tot_size = pk_sz + creds_sz + sizeof(unsigned int); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; // Encrypt nonce if ( (enc_len = encrypt_nonce(&encrypted_nonce,(unsigned int)hflow->fd,priv)) == 0 ) goto fail; if ( (tot_size + enc_len ) > DATA_SIZE ) { ERROR_CMD(fprintf(stderr,"mapi_set_authdata: the size of your authentication & authorisation data exceeds the maximum allows %d bytes [%s:%d]\n",DATA_SIZE,__FILE__,__LINE__)); goto fail; } hflow->dbuf->cmd = SET_AUTHDATA; hflow->dbuf->fd = hflow->fd; strcpy(hflow->dbuf->data,pubkey.data); strcpy((hflow->dbuf->data+pk_sz),credentials.data); *(size_t *)(hflow->dbuf->data+pk_sz+creds_sz) = enc_len; memcpy((hflow->dbuf->data+tot_size),encrypted_nonce,enc_len); hflow->dbuf->length = BASIC_SIZE + tot_size + enc_len; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } ret = 0; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case SET_AUTHDATA_ACK: //OK from this host - ret remains as is break; case ERROR_ACK: ret = -1; break; default: ret = -1; break; } } goto fail; } #endif // Encrypt nonce if ( (enc_len = encrypt_nonce(&encrypted_nonce,(unsigned int)fd,priv)) == 0 ) goto fail; // Send the data pthread_spin_lock(&mapi_lock); if ( ipc_send_authdata(fd,pubkey.data,credentials.data,encrypted_nonce,enc_len) != 0 ) goto fail; // Wait for ACK if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch( qbuf.cmd ) { case SET_AUTHDATA_ACK: ret = 0; break; case ERROR_ACK: locar_err = qbuf.remote_errorcode; break; default: local_err = MCOM_UNKNOWN_ERROR; break; } fail: BS_FREE(pubkey); BS_FREE(credentials); if ( encrypted_nonce != NULL ) free(encrypted_nonce); #ifdef DIMAPI if(is_remote) { pthread_spin_unlock(&mapi_lock); return ret;//just to avoid the spin_unlock } #endif pthread_spin_unlock(&mapi_lock); return ret; } #ifdef DIMAPI int agent_send_authdata(void *authdata) //sends authdata from agent to mapid { struct mapiipcbuf qbuf; size_t pk_p, creds_p; struct dmapiipcbuf *dbuf = (struct dmapiipcbuf *)authdata; pk_p = strlen(dbuf->data) + 1; creds_p = strlen(dbuf->data + pk_p) + 1; pthread_spin_lock(&mapi_lock); if ( ipc_send_authdata( dbuf->fd, dbuf->data, dbuf->data+pk_p, dbuf->data+pk_p+creds_p+sizeof(size_t), *(size_t *)(dbuf->data + pk_p + creds_p) ) != 0 ) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch(qbuf.cmd){ case SET_AUTHDATA_ACK: pthread_spin_unlock(&mapi_lock); return 0; case ERROR_ACK: pthread_spin_unlock(&mapi_lock); local_err = qbuf.remote_errorcode; return -1; default: pthread_spin_unlock(&mapi_lock); local_err = MCOM_UNKNOWN_ERROR; return -1; } return -1; } #endif //DIMAPI #else int mapi_set_authdata(MAPI_UNUSED int fd,MAPI_UNUSED const char *pub,MAPI_UNUSED const char *priv,MAPI_UNUSED const char *creds) { // We don't need to do anything return 0; } #endif //WITH_ADMISSION_CONTROL #ifdef WITH_AUTHENTICATION #ifdef DIMAPI /* * Stub function to authenticate a user. * Arguments: * fd - a flow descriptor, * username - a NULL-terminated string containing the username * password - a NULL-terminated string containing the password (yes, in clear text) * vo - a string containing the VO to authenticate the user against * * Returns 0 on success, or not on failure. */ int mapi_authenticate(int fd, const char *username, const char *password, const char *vo) { //flowdescr_t *flow; remote_flowdescr_t *rflow; host_flow *hflow; flist_node_t *lnode; if(!minit) { fprintf(stderr, "DIMAPI is not initialised. [%s:%d]\n",__FILE__,__LINE__); local_err = MAPI_INIT_ERROR; return(-1); } /* * Before we start, some trivial checks. */ if(fd<=0){ fprintf(stderr,"Error wrong fd in mapi_authenticate\n\n"); local_err = MAPI_INVALID_FLOW; return -1; } if(!username || !password || !vo) return(-1); // XXX: Omg ugly ugly ugly if(strlen(username) > MAX_DATA_SIZE || strlen(password) > MAX_DATA_SIZE || strlen(vo) > MAX_DATA_SIZE) { return(-1); } /* * Methinks that if the flow is not remote, * no authentication should take place. * I mean, who on this bloody earth is going to * stop an admin from monitoring whatever he wants * on his own box? * * I for one, will not. */ if((rflow = flist_get(remote_flowlist, fd)) != NULL) { int len = 0; for((lnode = flist_head(rflow->host_flowlist)) ; lnode != NULL; lnode = flist_next(lnode)) { hflow = (host_flow *)lnode->data; hflow->dbuf->cmd = AUTHENTICATE; hflow->dbuf->fd = hflow->fd; len += sprintf(hflow->dbuf->data + len, "%s", username) + 1; len += sprintf(hflow->dbuf->data + len, "%s", password) + 1; len += sprintf(hflow->dbuf->data + len, "%s", vo) + 1; hflow->dbuf->length = BASIC_SIZE + len; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } // And wait for result.. for(lnode = flist_head(rflow->host_flowlist); lnode != NULL; lnode = flist_next(lnode)) { hflow = (host_flow *)lnode->data; switch(hflow->dbuf->cmd) { case AUTHENTICATE_ACK: rflow->is_authenticated = 1; rflow->username = malloc(sizeof(char) * (strlen(username) + 1)); rflow->vo = malloc(sizeof(char) * (strlen(vo) + 1)); snprintf(rflow->username, MAX_DATA_SIZE, "%s", username); snprintf(rflow->vo, MAX_DATA_SIZE, "%s", vo); return(0); break; case ERROR_ACK: rflow->is_authenticated = 0; rflow->username = NULL; rflow->vo = NULL; return(-1); break; default: rflow->is_authenticated = 0; rflow->username = NULL; rflow->vo = NULL; return(-1); } } } else { /* * Now what? */ return(0); } return(-1); } /* * mfukar * * Forward data from agent to mapid. */ int agent_authenticate(void *data) { struct dmapiipcbuf *dbuf = (struct dmapiipcbuf *)data; char *offset = dbuf->data; offset += strlen(offset); offset += strlen(offset); struct mapiipcbuf qbuf; qbuf.mtype = 1; // XXX whatever? qbuf.cmd = AUTHENTICATE; qbuf.fd = dbuf->fd; // qbuf.pid = getpid(); memcpy(qbuf.data, dbuf->data, DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write(&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read(&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch(qbuf.cmd) { case AUTHENTICATE_ACK: pthread_spin_unlock(&mapi_lock); return(0); case ERROR_ACK: pthread_spin_unlock(&mapi_lock); return(-1); default: pthread_spin_unlock(&mapi_lock); return(-1); } return(-1); } #endif #endif #ifdef DIMAPI static int hostcmp(void *h1, void *h2) { struct host *h = (struct host *)h1; char *s = (char *)h2; return strcmp(h->hostname, s); } #endif //global var access functions int get_numflows() { int n; pthread_spin_lock(&numflows_lock); n = numflows; pthread_spin_unlock(&numflows_lock); return n; } // increases numflows and returns its new value int incr_numflows() { int n; pthread_spin_lock(&numflows_lock); n = ++numflows; pthread_spin_unlock(&numflows_lock); return n; } // decreases numflows and returns its new value int decr_numflows() { int n; pthread_spin_lock(&numflows_lock); n = --numflows; pthread_spin_unlock(&numflows_lock); return n; } int get_totalflows() { int n; pthread_spin_lock(&numflows_lock); n = totalflows; pthread_spin_unlock(&numflows_lock); return n; } // increases totalflows and returns its new value int incr_totalflows() { int n; pthread_spin_lock(&numflows_lock); n = ++totalflows; pthread_spin_unlock(&numflows_lock); return n; } #ifdef DIMAPI void set_agent(); void set_agent() { agent=1; } #endif int mapi_get_scope_size(int fd) { #ifdef DIMAPI remote_flowdescr_t* rflow=(remote_flowdescr_t*)flist_get(remote_flowlist, fd); if (rflow!=NULL) return rflow->scope_size; #endif if ( flist_get(flowlist, fd)!=NULL ) return 1; else return -1; }