#ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mapi.h" #include "mapiipc.h" #include "mapilibhandler.h" #include "flist.h" #include "debug.h" #include "parseconf.h" #include "printfstring.h" #include "mapi_errors.h" #include "devgroupdb.h" #ifdef WITH_ADMISSION_CONTROL #include #include #include #include "bytestream.h" #endif #ifdef WITH_ANONYMIZATION #include "anonymization/anonymization.h" #endif #ifdef DIMAPI #include #include #endif #ifdef WITH_AUTHENTICATION #define FROM_MAPI #include "vod/vod.h" #endif static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT; static pthread_once_t initialized = PTHREAD_ONCE_INIT; static int minit=0; //Set to 1 when MAPI has been initialized static pthread_spinlock_t mapi_lock; static pthread_spinlock_t numflows_lock; // for numflows and totalflows variables static int local_err=0; /* occurence of a mapi.c error, translation of these errors */ static int numflows=0; // number of allocated (active) flows static int totalflows=0; // number of flows so far (including closed flows) static int offline_devices; static int agent=0; extern const errorstruct Errors[]; #ifdef DIMAPI static int hostcmp(void *h1, void *h2); static void delete_remote_flow(remote_flowdescr_t* rflow); flist_t *hostlist=NULL;//list containing all remote hosts used so far extern flist_t *remote_flowlist; int dimapi_port; extern sem_t stats_sem; typedef struct function_data{ int fid; // real fid returned from mapicommd mapidflib_function_def_t* fdef; // function definition #ifdef RECONNECT host_flow *hflow; // flow of a specified host char args[DIMAPI_DATA_SIZE]; // function's arguments int index; // need for functions that have arguments that reference to a flow #endif } function_data; static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT; static pthread_spinlock_t remote_ipc_lock; static pthread_spinlock_t hostlist_lock; static unsigned fdseed = 0; // 'scope' flow descriptor seed (always increases) static unsigned fidseed = 0; // function descriptor seed (always increases) static unsigned negfdseed=-1; // generates temporary negative fd, for use before create_flow #endif typedef struct libinfo { char* name; } libinfo_t; typedef struct flowdescr { int fd; int file; // file descriptor for offline flows int fds[256]; // file descriptors int numfd; // number of file descriptors char *devtype; #ifdef RECONNECT char *device; int read_results_flag; int mapid_down; #endif char *shm_base; pthread_spinlock_t *shm_spinlock; flist_t *flist; unsigned char is_connected; // this should be 1 if the flow is connected 0 otherwise int format; // this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise } flowdescr_t; #ifdef RECONNECT typedef struct offline_device{ char *path; // specifies the name of the trace file to open char *previous_device; // return value of mapi_create_offline_device (device name that should be used in mapi_start_offline_device) char *new_device; // new device name (returned from mapid) int format; // the format of the captured packets (MFF_PCAP or MFF_DAG_ERF) unsigned char is_started; // this should be 1 if the device is started ( via mapi_start_offline_device() ), 0 otherwise } offline_device; static unsigned count_offline_devices = 0; // used in offline_device_list (always increases) extern flist_t *function_list; // list which contains all applyed functions ( defined in mapiipc.c ) extern flist_t *offline_device_list; // list which contains all offline devices ( defined in mapiipc.c ) #endif extern flist_t *flowlist; // defined in mapiipc.c typedef struct functdescr { int fid; short result_init; mapidflib_function_def_t *def; mapidflib_function_t *funct; void *data; mapi_results_t *result; #ifdef RECONNECT flowdescr_t *flow; int numfd; #endif } functdescr_t; typedef struct shm_result { void* ptr; //Pointer to shared data int size; //Size of shared data } shm_result_t; /* * Function declarations */ static int default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data); int get_results_info(flowdescr_t *flow, functdescr_t *f, int user_fd, int user_fid); //static int set_error(void* flow, int err_no , int is_remote); static int send_fd(int *fds,int numfd); //global var access functions static int get_numflows(); static int incr_numflows(); static int decr_numflows(); static int get_totalflows(); static int incr_totalflows(); static void init() //common initialization function for mapi and dimapi { #ifdef DIMAPI char* path=NULL, *libs=NULL, *str=NULL, *s=NULL; char* mapi_conf; mapi_conf = printf_string( CONFDIR"/"CONF_FILE ); if (pc_load (mapi_conf)) { conf_category_entry_t* empty_cat = pc_get_category(""); const char *portstr; if( empty_cat == NULL ){ printf("Configuration file has no empty category. Giving up\n"); exit(1); } path = pc_get_param (empty_cat, "libpath"); libs = pc_get_param (empty_cat, "libs"); portstr = pc_get_param (empty_cat, "dimapi_port"); if( portstr == NULL ){ printf("ERROR: Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT); dimapi_port = DEFAULT_DIMAPI_PORT; } else { /* make sure that portstr is a valid number. */ dimapi_port = atoi( portstr ); if ( dimapi_port<=0 || dimapi_port>=65536 ) { printf("ERROR: Invalid port given in configuration file. The default port %d is used\n", DEFAULT_DIMAPI_PORT); dimapi_port = DEFAULT_DIMAPI_PORT; } } } else { printf("ERROR: Cannot load mapi.conf file. Giving up.\n"); printf("Search path is: %s\n", mapi_conf); exit(1); } #endif minit = 1; pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE); flowlist = malloc(sizeof(flist_t)); flist_init(flowlist); #ifdef RECONNECT function_list = malloc(sizeof(flist_t)); flist_init(function_list); offline_device_list = malloc(sizeof(flist_t)); flist_init(offline_device_list); #endif #ifdef DIMAPI pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE); pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE); hostlist = malloc(sizeof(flist_t)); remote_flowlist = malloc(sizeof(flist_t)); flist_init(hostlist); flist_init(remote_flowlist); str=libs; while((s=strchr(str,':'))!=NULL) { *s='\0'; mapilh_load_library(path,str); str=s+1; } mapilh_load_library(path,str); free(mapi_conf); pc_close(); #endif } //Initializes MAPI - called only once by pthread_once() static void mapi_init() { struct mapiipcbuf qbuf; char libpath[4096],*str,*s; minit=1; if(mapiipc_client_init()==-1){ local_err = MCOM_INIT_SOCKET_ERROR; minit=0; fprintf(stderr, "\n--------------------------------------------------------\n"); fprintf(stderr, "WARNING: mapid may not be running at the given interface\n"); fprintf(stderr,"--------------------------------------------------------\n"); } pthread_once(&initialized, (void*)init); pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE); offline_devices = 0; //Get libpath from mapid qbuf.mtype=1; qbuf.cmd=GET_LIBPATH; qbuf.fd=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if( mapiipc_write((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; if( mapiipc_read((struct mapiipcbuf*)&qbuf) ) local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case GET_LIBPATH_ACK: strncpy(libpath, (char *)qbuf.data, 4096); break; default: /* MAPI_ERROR_GETTING_LIBPATH */ return; break; } printf("libpath=%s\n", libpath); //get libs from mapid qbuf.mtype=1; qbuf.cmd=GET_LIBS; qbuf.fd=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if(mapiipc_write((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; if( mapiipc_read((struct mapiipcbuf*)&qbuf)) local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case GET_LIBS_ACK: break; default: /* MAPI_ERROR_GETTING_LIBS */ return; break; } //Load function libraries str = (char *)qbuf.data; while((s=strchr(str,':'))!=NULL) { *s='\0'; mapilh_load_library(libpath,str); str=s+1; } mapilh_load_library(libpath,str); return; } #ifdef DIMAPI //Initializes DIMAPI - called only once by pthread_once() static void dmapi_init() { pthread_once(&initialized, (void*)init); return; } #endif int mapi_connect(int fd) //Connect to a mapi flow //fd = flow descriptor { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; }else if (fd<=0){ printf("ERROR: Invalid flow descriptor (fd: %d) in mapi_connect\n", fd); return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=CONNECT; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } #ifdef WITH_AUTHENTICATION if(rflow->is_authenticated == 0) { printf("ERROR: Flow with id %d is not authenticated\n", fd); rflow->is_connected = 0; return(-2); } #endif if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case CONNECT_ACK: rflow->is_connected=1; continue; case ERROR_ACK: memcpy(&local_err, hflow->dbuf->data, sizeof(int)); return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } } return 0; } #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype=1; qbuf.cmd=CONNECT; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd=fd; #endif qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CONNECT_ACK: flow->is_connected=1; return 0; case ERROR_ACK: local_err = qbuf.remote_errorcode; return -1; default: local_err= MCOM_UNKNOWN_ERROR; return -1; } } #ifdef DIMAPI static void delete_remote_flow(remote_flowdescr_t* rflow) { host_flow* hflow; flist_node_t* fnode, *fnode2, *fnode3; mapi_results_t* res; pthread_spin_lock(&remote_ipc_lock); flist_remove(remote_flowlist, rflow->fd); decr_numflows(); pthread_spin_unlock(&remote_ipc_lock); sem_destroy(&rflow->fd_sem); sem_destroy(&rflow->pkt_sem); //pthread_mutex_destroy(&rflow->mutex); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) { fnode2=flist_next(fnode); hflow=(host_flow*)fnode->data; hflow->rhost->num_flows--; flist_remove(hflow->rhost->flows, hflow->fd); for (fnode3=flist_head(hflow->functions); fnode3!=NULL; fnode3=flist_next(fnode3)) { free( flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fid) ); } if (hflow->rhost->num_flows==0) { pthread_cancel(*hflow->rhost->comm_thread); mapiipc_remote_close(hflow->rhost); //close the socket flist_destroy(hflow->rhost->flows); free(hflow->rhost->flows); flist_destroy(hflow->rhost->functions); free(hflow->rhost->functions); free(hflow->rhost->hostname); flist_remove(hostlist, hflow->rhost->sockfd); free(hflow->rhost->comm_thread); #ifdef RECONNECT sem_destroy(&hflow->rhost->connection); // destroy semaphore #endif free(hflow->rhost); } //we check if a host is using in other rflows and delete it -close the socket- if not flist_destroy(hflow->functions); free(hflow->functions); free(hflow->dev); free(hflow->dbuf); if (hflow->pkt!=NULL) free(hflow->pkt); flist_remove(rflow->host_flowlist,hflow->id); free(hflow); } flist_destroy(rflow->host_flowlist); free(rflow->host_flowlist); if (rflow->pkt_list!=NULL) { flist_destroy(rflow->pkt_list); free(rflow->pkt_list); } for (fnode=flist_head(rflow->function_res); fnode!=NULL; fnode=fnode2 ) { fnode2=flist_next(fnode); res=(mapi_results_t*)fnode->data; free(res->res); free(res); } flist_destroy(rflow->function_res); free(rflow->function_res); if (rflow->pkt!=NULL) free(rflow->pkt); #ifdef WITH_AUTHENTICATION if (rflow->username!=NULL) free(rflow->username); if (rflow->vo!=NULL) free(rflow->vo); #ifdef RECONNECT if (rflow->password!=NULL) free(rflow->password); #endif #endif free(rflow); } #endif int mapi_create_flow(const char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; flowdescr_t *flow, *tmpflow; struct devgroupdb *devgroupdb; int devgroupid = 0; char *mapidsocket, *mapidsocketglobal; /* int local;*/ #ifdef DIMAPI remote_flowdescr_t *rflow; char *hostname=NULL, *s=NULL, *k=NULL; struct host *h=NULL; host_flow* hflow; char *devp; flist_node_t* fnode; unsigned int idgen=0; #endif #ifdef RECONNECT offline_device *device = NULL; flist_node_t *fnode_; int flag = 0; #endif if(dev==NULL){ printf("ERROR: Wrong device name given (NULL) in mapi_create_flow\n"); local_err = MAPI_DEVICE_INFO_ERR; return -1; } #ifndef DIMAPI devgroupdb = devgroupdb_open(3); // try local, global devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev); if(!devgroupid) { mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME")); mapidsocketglobal = strdup(MAPIDSOCKGLOBAL); } else { mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid); mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid); } mapiipc_set_socket_names(mapidsocket, mapidsocketglobal); pthread_once(&mapi_is_initialized, (void*)mapi_init); #endif //check if flow is remote or not and call the appropriate init function #ifdef DIMAPI if ( strchr(dev,':')==NULL) { devgroupdb = devgroupdb_open(3); // try local, global devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev); if(!devgroupid) { mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME")); mapidsocketglobal = strdup(MAPIDSOCKGLOBAL); } else { mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid); mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid); } mapiipc_set_socket_names(mapidsocket, mapidsocketglobal); pthread_once(&mapi_is_initialized, (void*)mapi_init); } else pthread_once(&dmapi_is_initialized, (void*)dmapi_init); if ((s = strchr(dev,':'))!=NULL) { devp=strdup(dev); rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t)); rflow->fd=++fdseed; sem_init(&rflow->fd_sem, 0, 0); sem_init(&rflow->pkt_sem, 0, 0); rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t)); flist_init(rflow->host_flowlist); rflow->pkt_list=NULL; rflow->function_res=(flist_t*)malloc(sizeof(flist_t)); rflow->is_connected = 0; #ifdef WITH_AUTHENTICATION rflow->username=NULL; rflow->vo=NULL; #ifdef RECONNECT rflow->password = NULL; rflow->daemons_down = 0; #endif #endif rflow->pkt=NULL; flist_init(rflow->function_res); k=strtok(devp, ", "); while (k!=NULL) { if ((s = strchr(k,':'))!=NULL) { *s = '\0'; hostname = k; k = s + 1; pthread_spin_lock(&hostlist_lock); h = (struct host *)flist_search(hostlist, hostcmp, hostname); if(h==NULL){// Our host is a new one --> insert it in the hostlist h = (struct host *)malloc(sizeof(struct host)); h->hostname = strdup(hostname); h->port = dimapi_port; h->flows = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->flows); h->functions = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->functions); h->num_flows=0; h->stats=NULL; #ifdef RECONNECT sem_init(&h->connection, 0, 0); // initialize semaphore #endif // Create the socket if (mapiipc_remote_init(h)<0) { local_err = MCOM_SOCKET_ERROR; printf("ERROR: Could not connect with host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__); pthread_spin_unlock(&hostlist_lock); return -1; } h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t)); pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h); flist_append(hostlist, h->sockfd, h); pthread_spin_unlock(&hostlist_lock); } else{//host exists in the list pthread_spin_unlock(&hostlist_lock); } h->num_flows++; hflow=(host_flow*)malloc(sizeof(host_flow)); hflow->scope_fd=rflow->fd; hflow->dev=strdup(k); flist_append(rflow->host_flowlist, ++idgen, hflow); hflow->id=idgen; flist_append(h->flows, --negfdseed, hflow); hflow->fd=negfdseed; hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); hflow->pkt=NULL; hflow->rhost=h; hflow->functions=(flist_t *)malloc(sizeof(flist_t)); flist_init(hflow->functions); hflow->dbuf->cmd=CREATE_FLOW; strncpy((char *)hflow->dbuf->data,k,DATA_SIZE); hflow->dbuf->length=BASIC_SIZE+strlen(k)+1; } else { //this is the case where the dev string contains both 'host:interface1' and 'interface2' //example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1"); //user's intention is probably localhost:eth1 //what should be done in this case? } k=strtok(NULL,", "); } free(devp); rflow->scope_size=flist_size(rflow->host_flowlist); pthread_spin_lock(&remote_ipc_lock); flist_append(remote_flowlist, rflow->fd, rflow); incr_numflows(); incr_totalflows(); pthread_spin_unlock(&remote_ipc_lock); if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow - //wait for results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; if (hflow->dbuf->cmd == CREATE_FLOW_ACK) { hflow->fd=*((int*)hflow->dbuf->data); flist_remove(hflow->rhost->flows, hflow->dbuf->fd); flist_append(hflow->rhost->flows, hflow->fd, hflow); } else if (hflow->dbuf->cmd == ERROR_ACK) { memcpy(&local_err, hflow->dbuf->data, sizeof(int)); printf("ERROR: Could not create flow in host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__); delete_remote_flow(rflow); return -1; } else { local_err = MCOM_UNKNOWN_ERROR; delete_remote_flow(rflow); return -1; } } return rflow->fd; } #endif pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if(mapiipc_client_init()==-1){ local_err = MCOM_INIT_SOCKET_ERROR; } incr_numflows(); } else incr_numflows(); pthread_spin_unlock(&mapi_lock); #ifdef RECONNECT /* consider the following case : - devicename = mapi_create_offline_device("./tracefile", MFF_PCAP); - { code ... --> reconnection code ... } fd = mapi_create_flow(devicename); Now mapi_create_flow must send to mapid the new device name ... */ pthread_spin_lock(&mapi_lock); for(fnode_ = flist_head(offline_device_list); fnode_ != NULL; fnode_ = flist_next(fnode_)){ // find specified device in offline_device_list device = (offline_device *)fnode_->data; if(!strcmp(device->previous_device, dev)){ flag = 1; // FIXME break; } } if(flag) strncpy((char *) qbuf.data, device->new_device, DATA_SIZE); // the new device name else strncpy((char *) qbuf.data, dev, DATA_SIZE); flag = 0; pthread_spin_unlock(&mapi_lock); #else strncpy((char *)qbuf.data,dev,DATA_SIZE); #endif qbuf.mtype=1; qbuf.cmd=CREATE_FLOW; qbuf.fd=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); decr_numflows(); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); decr_numflows(); return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_FLOW_ACK: tmpflow=flist_get(flowlist,qbuf.fd); if (tmpflow!=NULL) { printf("ERROR: Mapid gave us a fd (%d) which already exist in our lists, exiting [%s:%d]\n", qbuf.fd, __FILE__, __LINE__); decr_numflows(); return -1; } flow=malloc(sizeof(flowdescr_t)); if( flow == NULL ){ printf("ERROR: Out of memory [%s:%d]\n", __FILE__, __LINE__); decr_numflows(); return -1; } flow->fd=qbuf.fd; flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1); flow->flist=malloc(sizeof(flist_t)); #ifdef RECONNECT flow->device = strdup(dev); flow->format = -1; // in case of online flow, assigned to -1 flow->read_results_flag = 0; flow->mapid_down = 0; #endif flow->shm_base=NULL; flow->shm_spinlock=NULL; flow->file = -1; // in case of online flow, assigned to -1 flow->is_connected =0; flow->numfd = 0; // initialize number of open file descriptors to zero flist_init(flow->flist); strcpy(flow->devtype,(char *)qbuf.data); pthread_spin_lock(&mapi_lock); flist_append(flowlist,qbuf.fd,flow); incr_totalflows(); #ifdef DIMAPI fdseed++; #endif pthread_spin_unlock(&mapi_lock); return qbuf.fd; /* should probably have a separate error message for ERROR_ACK? */ case ERROR_ACK: decr_numflows(); local_err=qbuf.remote_errorcode; return -1; default: decr_numflows(); local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_create_offline_flow(const char *dev, int format) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; flowdescr_t *flow=NULL; int file; struct devgroupdb *devgroupdb; int devgroupid = 0; char *mapidsocket, *mapidsocketglobal; devgroupdb = devgroupdb_open(3); // try local, global devgroupid = devgroupdb_getgroupid(devgroupdb); // get groupid of the latest instance of mapid if(!devgroupid) { mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME")); mapidsocketglobal = strdup(MAPIDSOCKGLOBAL); } else { mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid); mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid); } mapiipc_set_socket_names(mapidsocket, mapidsocketglobal); pthread_once(&mapi_is_initialized, (void*)mapi_init); //Check to see if file can be opened if (dev==NULL){ printf("ERROR: NULL device in mapi_create_offline_flow\n"); return -1; } else if ((file=open(dev,O_LARGEFILE))==-1) { local_err=MAPI_ERROR_FILE; return -1; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if (mapiipc_client_init()<0) { local_err = MCOM_INIT_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } incr_numflows(); } else incr_numflows(); pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=CREATE_OFFLINE_FLOW; qbuf.fd=getpid(); qbuf.pid=getpid(); qbuf.fid=format; strncpy((char *)qbuf.data,dev,DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; decr_numflows(); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; decr_numflows(); return -1; } if(qbuf.cmd==SEND_FD) { if(mapiipc_send_fd(file)==-1) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); decr_numflows(); return -1; } } else { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); decr_numflows(); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; decr_numflows(); return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_OFFLINE_FLOW_ACK: flow=malloc(sizeof(flowdescr_t)); flow->fd=qbuf.fd; flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1); flow->flist=malloc(sizeof(flist_t)); #ifdef RECONNECT flow->device = strdup(dev); flow->format = format; // MFF_PCAP or MFF_DAG_ERF #endif flow->shm_base=NULL; flow->shm_spinlock=NULL; flow->file = file; // file descriptor of offline flow flow->is_connected=0; flow->numfd = 0; // initialize number of open file descriptors to zero flist_init(flow->flist); strcpy(flow->devtype,(char *)qbuf.data); pthread_spin_lock(&mapi_lock); flist_append(flowlist,qbuf.fd,flow); incr_totalflows(); #ifdef DIMAPI fdseed++; #endif pthread_spin_unlock(&mapi_lock); return qbuf.fd; case ERROR_ACK: local_err=qbuf.remote_errorcode; decr_numflows(); return -1; default: local_err=MCOM_UNKNOWN_ERROR; decr_numflows(); return -1; } } char* mapi_create_offline_device(const char *path, int format) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; int file; #ifdef RECONNECT offline_device *device = NULL; #endif char *mapidsocket, *mapidsocketglobal; mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME")); mapidsocketglobal = strdup(MAPIDSOCKGLOBAL); mapiipc_set_socket_names(mapidsocket, mapidsocketglobal); pthread_once(&mapi_is_initialized, (void*)mapi_init); //Check to see if file can be opened if (path==NULL){ printf("ERROR: NULL path in mapi_create_offline_device\n"); return NULL; } else if ((file=open(path,O_LARGEFILE))==-1) { local_err=MAPI_ERROR_FILE; return NULL; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if (mapiipc_client_init()<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_INIT_SOCKET_ERROR; return NULL; } } pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=CREATE_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); qbuf.fid=format; strncpy((char *)qbuf.data,path,DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if(qbuf.cmd==SEND_FD) { if(mapiipc_send_fd(file)==-1) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return NULL; } } else { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return NULL; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CREATE_OFFLINE_DEVICE_ACK: #ifdef RECONNECT device = (offline_device *)malloc(sizeof(offline_device)); device->path = strdup(path); device->previous_device = strdup((char *) qbuf.data); // previous device name device->new_device = strdup((char *) qbuf.data); // new device name, after reconnection of client device->format = format; device->is_started = 0; pthread_spin_lock(&mapi_lock); flist_append(offline_device_list, count_offline_devices++, device); pthread_spin_unlock(&mapi_lock); #endif offline_devices++; return strdup((char *)qbuf.data); case ERROR_ACK: local_err=qbuf.remote_errorcode; return NULL; default: local_err=MCOM_UNKNOWN_ERROR; return NULL; } } int mapi_start_offline_device(const char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; #ifdef RECONNECT offline_device *device = NULL; flist_node_t *fnode; #endif pthread_once(&mapi_is_initialized, (void*)mapi_init); if (dev==NULL){ printf("ERROR: NULL device in mapi_start_offline_device\n"); return -1; } pthread_spin_lock(&mapi_lock); if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it if (mapiipc_client_init()<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_INIT_SOCKET_ERROR; return -1; } } pthread_spin_unlock(&mapi_lock); #ifdef RECONNECT for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){ // find specified device in offline_device_list device = (offline_device *)fnode->data; if(!strcmp(device->previous_device, dev)) break; } #endif qbuf.mtype=1; qbuf.cmd=START_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); #ifdef RECONNECT strncpy((char *) qbuf.data, device->new_device, DATA_SIZE); #else strncpy((char *) qbuf.data, dev, DATA_SIZE); #endif qbuf.fid=0; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case START_OFFLINE_DEVICE_ACK: #ifdef RECONNECT device->is_started = 1; #endif return 0; case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_delete_offline_device(char *dev) //Create new flow //dev=device that should be used { struct mapiipcbuf qbuf; #ifdef RECONNECT offline_device *device = NULL; flist_node_t *fnode; int node_id; #endif pthread_once(&mapi_is_initialized, (void*)mapi_init); if (dev==NULL){ printf("ERROR: NULL device in mapi_delete_offline_device\n"); return -1; } #ifdef RECONNECT for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){ // find specified device in offline_device_list device = (offline_device *)fnode->data; if(!strcmp(device->previous_device, dev)){ node_id = flist_id(fnode); // id of the node break; } } #endif qbuf.mtype=1; qbuf.cmd=DELETE_OFFLINE_DEVICE; qbuf.fd=getpid(); qbuf.pid=getpid(); #ifdef RECONNECT strncpy((char *) qbuf.data, device->new_device, DATA_SIZE); #else strncpy((char *) qbuf.data, dev, DATA_SIZE); #endif free(dev); qbuf.fid=0; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } #ifdef RECONNECT free(device->previous_device); free(device->new_device); free(device->path); #endif pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case DELETE_OFFLINE_DEVICE_ACK: pthread_spin_lock(&mapi_lock); if((get_numflows() == 0) && --offline_devices == 0) mapiipc_client_close(); #ifdef RECONNECT free(flist_remove(offline_device_list, node_id)); #endif pthread_spin_unlock(&mapi_lock); return 0; case ERROR_ACK: local_err=qbuf.remote_errorcode; return -1; default: local_err=MCOM_UNKNOWN_ERROR; return -1; } } int mapi_close_flow(int fd) { functdescr_t *f=NULL; flowdescr_t *flow=NULL; struct mapiipcbuf qbuf; #ifdef RECONNECT flist_node_t *fnode_ = NULL; functdescr_t *fun = NULL; #endif if (fd<=0){ printf("ERROR: Wrong fd (fd: %d) in mapi_close_flow\n", fd); return -1; } #ifdef DIMAPI remote_flowdescr_t *rflow=flist_get(remote_flowlist, fd); host_flow* hflow; flist_node_t* fnode; if(rflow!=NULL){ for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=CLOSE_FLOW; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } rflow->is_connected=0; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->fd=hflow->fd; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) { local_err = MCOM_SOCKET_ERROR; return -1; } } delete_remote_flow(rflow); return 0; } #endif if(flowlist && (flow = flist_get(flowlist, fd))==NULL){ printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } else { //XXX MEMORY LEAK why leave data since the node is removed???? flow = flist_remove(flowlist,fd); } //Delete functions applied first, before mapid is notified. pthread_spin_lock(&mapi_lock); while((f = flist_pop_first(flow->flist)) != NULL) { if(f->def->client_cleanup!=NULL && f->funct->instance->status==MAPIFUNC_INIT){ f->def->client_cleanup(f->funct->instance); } #ifdef RECONNECT // remove nodes with functions applied to specified flow for(fnode_ = flist_head(function_list); fnode_ != NULL; fnode_ = flist_next(fnode_)){ // iterate function list fun = (functdescr_t *)fnode_->data; if(fun == f){ // XXX flist_remove(function_list, flist_id(fnode_)); break; } } #endif if(f->result!=NULL) { free(f->result->res); free(f->result); } free(f->funct->instance); free(f->funct); free(f->data); free(f); } pthread_spin_unlock(&mapi_lock); qbuf.mtype=1; qbuf.cmd=CLOSE_FLOW; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd=fd; #endif qbuf.fid=getpid(); qbuf.pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case CLOSE_FLOW_ACK: // if this is the last one, release socket resources break; case ERROR_ACK: local_err=MCOM_ERROR_ACK; break; default: local_err=MCOM_UNKNOWN_ERROR; break; } //Detach shared mem if (flow->shm_base!=NULL) { if (shmdt(flow->shm_base)<0) { printf("WARNING: Could not detach shared mem (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); } } /* subtract, the flow should be closed either due to an error or * explicitly in this function. so numflows is really number of allocated * flows... */ pthread_spin_lock(&mapi_lock); if((decr_numflows() == 0) && offline_devices == 0) mapiipc_client_close(); pthread_spin_unlock(&mapi_lock); //Free flow resources int i; // close opened file descriptors for(i = 0; i < flow->numfd; i++) close(flow->fds[i]); if(flow->file != -1) // close file descriptor in case of offline flow close(flow->file); free(flow->devtype); free(flow->flist); #ifdef RECONNECT free(flow->device); #endif free(flow); return 0; } //XXX Why send_fd returns 0 on error whereas every other function //return -1 on error? static int send_fd(int *fds, int num) { int c; struct mapiipcbuf qbuf; for(c=0;cis_connected){ printf("ERROR: Can not apply function %s on an already connected flow\n", funct); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } is_remote = 1;//indicates that flow is remote } else { #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if(flow->is_connected){ printf("ERROR: Can not apply function %s on an already connected flow\n", funct); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } #ifdef DIMAPI } if (is_remote) fdef=mapilh_get_function_def(funct,"1.3"); else #endif //Get information about function fdef=mapilh_get_function_def(funct,flow->devtype); if(fdef==NULL) { printf("ERROR: Could not find/match function %s [%s:%d]\n", funct, __FILE__, __LINE__); local_err = MAPI_FUNCTION_NOT_FOUND; return -1; } #ifdef DIMAPI if(is_remote){//flow is remote for (fnode=flist_head(rflow->host_flowlist), i=1; fnode!=NULL; fnode=flist_next(fnode), i++) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=APPLY_FUNCTION; hflow->dbuf->fd=hflow->fd; memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer va_start(vl,funct); pos = qbuf.data; // point to start of arguments buffer arg_size = 0; #ifdef RECONNECT pos_ = qbuf_.data; // point to start of arguments buffer #endif if (agent==1) { args = va_arg(vl, unsigned char*); } // parse function arguments if(strncmp(fdef->argdescr, "", 1)){ // there are some args argdescr_ptr = fdef->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr){ case 's': if (agent==0) temp=va_arg(vl, char*); else{ temp=(char*)args; args+=strlen(temp)+1; } addarg(&pos, temp, STRING); #ifdef RECONNECT addarg(&pos_, temp, STRING); #endif arg_size+=strlen(temp)+1; break; case 'S': // reference to flows and functions (e.g RES2FILE) if(agent == 0) fids = va_arg(vl, char *); else{ fids = (char *)args; args += strlen(fids) + 1; } #ifdef RECONNECT addarg(&pos_, fids, STRING); #endif // allocate more bytes, because we can have the following case: 7@2,8@2,9@2 translated to 12@3,13@3,14@3 new_fids = (char *)malloc(strlen(fids) * 10 * sizeof(char)); strncpy(buf, fids, DATA_SIZE); cfids = buf; while( (s = strchr(cfids, ',')) != NULL){ *s = '\0'; sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); if(ref_flow == NULL || i > ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, i))->fd; fdata = flist_get(hflow->rhost->functions, fid_); if(fdata == NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fid = fdata->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); cfids = s + 1; } sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); if(ref_flow == NULL || i > ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, i))->fd; fdata = flist_get(hflow->rhost->functions, fid_); if(fdata == NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fid = fdata->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); addarg(&pos, new_fids, STRING); arg_size += strlen(new_fids) + 1; len = 0; free(new_fids); new_fids = NULL; break; case 'i': if (agent==0) tmp = va_arg(vl, int); else{ memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } addarg(&pos, &tmp, INT); #ifdef RECONNECT addarg(&pos_, &tmp, INT); #endif arg_size+=sizeof(int); break; case 'r': //reference to a flow if (agent==0) tmp = va_arg(vl, int); else{ memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } #ifdef RECONNECT addarg(&pos_, &tmp, INT); // fd given to user ... #endif ref_flow=flist_get(remote_flowlist, tmp); if (ref_flow==NULL || i>ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp=((host_flow*)flist_get(ref_flow->host_flowlist, i))->fd; addarg(&pos, &tmp ,INT); arg_size+=sizeof(int); break; case 'f': //reference to a fuction if (agent==0) tmp = va_arg(vl, int); else{ memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } #ifdef RECONNECT addarg(&pos_, &tmp, INT); // fid given to user ... #endif fdata=flist_get(hflow->rhost->functions, tmp); if (fdata==NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp=fdata->fid; addarg(&pos, &tmp,INT); arg_size+=sizeof(int); break; case 'c': if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...' else{ memcpy(&ctmp, args, sizeof(char)); args+=sizeof(char); } addarg(&pos, &ctmp, CHAR); #ifdef RECONNECT addarg(&pos_, &ctmp, CHAR); #endif arg_size+=sizeof(char); break; case 'l': if (agent==0) ltmp = va_arg(vl, unsigned long long); else{ memcpy(<mp, args, sizeof(unsigned long long)); args+=sizeof(unsigned long long); } addarg(&pos, <mp, UNSIGNED_LONG_LONG); #ifdef RECONNECT addarg(&pos_, <mp, UNSIGNED_LONG_LONG); #endif arg_size+=sizeof(unsigned long long); break; case 'w': // open file for writing if (agent==0) filename=va_arg(vl, char*); else{ filename=(char*)args; if (filename==NULL){ local_err = MAPI_ERROR_FILE; return -1; } args+=strlen(filename)+1; } if (filename==NULL) { local_err = MAPI_ERROR_FILE; return -1; } addarg(&pos, filename, STRING); #ifdef RECONNECT addarg(&pos_, filename, STRING); #endif arg_size+=strlen(filename)+1; break; default: local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR; printf("ERROR: Illegal argument descriptor %c [%s:%d]\n", *argdescr_ptr, __FILE__, __LINE__); return -1; } argdescr_ptr++; // move to the next arg } } va_end(vl); memcpy(hflow->dbuf->data+strlen(funct)+1, qbuf.data, arg_size); //argument size hflow->dbuf->length=BASIC_SIZE + strlen(funct) + 1 + arg_size; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } fidseed++; for (fnode=flist_head(rflow->host_flowlist), i = 1; fnode!=NULL; fnode=flist_next(fnode), i++) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case APPLY_FUNCTION_ACK: fdata=(function_data*)malloc(sizeof(function_data)); fdata->fid=hflow->dbuf->fid; fdata->fdef=fdef; #ifdef RECONNECT memcpy(fdata->args, qbuf_.data, arg_size); fdata->hflow = hflow; fdata->index = i; #endif flist_append(hflow->functions, fidseed, fdata); flist_append(hflow->rhost->functions, fidseed, fdata); break; case ERROR_ACK: memcpy(&local_err, hflow->dbuf->data, sizeof(int)); return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } } return fidseed; } #endif va_start(vl,funct); pos = qbuf.data; // point to start of arguments buffer if (agent==1) { args = va_arg(vl, unsigned char*); } // parse function arguments if(strncmp(fdef->argdescr, "", 1)) { // there are some args argdescr_ptr = fdef->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr) { case 's': if (agent==0) temp=va_arg(vl, char*); else { temp=(char*)args; args+=strlen(temp)+1; } addarg(&pos, temp, STRING); arg_size+=strlen(temp)+1; break; case 'S': if(agent == 0) fids = va_arg(vl, char *); else{ fids = (char *)args; args += strlen(fids) + 1; } addarg(&pos, fids, STRING); arg_size += strlen(fids) + 1; break; case 'i': if (agent==0) tmp = va_arg(vl, int); else { memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'r': if (agent==0) tmp = va_arg(vl, int); else { memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'f': if (agent==0) tmp = va_arg(vl, int); else { memcpy(&tmp, args, sizeof(int)); args+=sizeof(int); } addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'c': if (agent==0) ctmp = va_arg(vl, int); //`char' is promoted to `int' when passed through `...' else { memcpy(&ctmp, args, sizeof(char)); args+=sizeof(char); } addarg(&pos, &ctmp, CHAR); arg_size+=sizeof(char); break; case 'l': if (agent==0) ltmp = va_arg(vl, unsigned long long); else { memcpy(<mp, args, sizeof(unsigned long long)); args+=sizeof(unsigned long long); } addarg(&pos, <mp, UNSIGNED_LONG_LONG); arg_size+=sizeof(unsigned long long); break; case 'w': //Open file for writing if (agent==0) filename=va_arg(vl, char*); else { filename=(char*)args; args+=strlen(filename)+1; } if (agent == 1) { tmp=open(filename, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR); while(tmp==-1 && errno==EEXIST) { asprintf(&tmp_fname, "%s-%d", filename, un_id++); tmp=open(tmp_fname, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR); free(tmp_fname); } } else { tmp=open(filename,O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,S_IRUSR|S_IWUSR); } if(tmp==-1) { printf("ERROR: Can not create file: %s [%s:%d]\n", filename, __FILE__, __LINE__); local_err=MAPI_ERROR_FILE; return -1; } printf("Created file %s for writing\n", filename); flow->fds[flow->numfd++] = tmp; numfd++; addarg(&pos, &tmp, INT); break; default: local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR; printf("ERROR: Illegal argument descriptor %c [%s:%d]\n", *argdescr_ptr, __FILE__, __LINE__); return -1; } argdescr_ptr++; // move to the next arg } } va_end(vl); qbuf.mtype=1; qbuf.cmd=APPLY_FUNCTION; #ifdef RECONNECT // FIXME qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd = fd; #endif qbuf.pid=getpid(); strncpy(qbuf.function,funct,FUNCT_NAME_LENGTH); strncpy((char *)qbuf.argdescr,fdef->argdescr,ARG_LENGTH); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if(!send_fd(flow->fds,numfd)) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case APPLY_FUNCTION_ACK: break; case ERROR_ACK: local_err =qbuf.remote_errorcode; return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } fdef=mapilh_get_function_def(funct,qbuf.function); f=malloc(sizeof(functdescr_t)); f->fid=qbuf.fid; f->def=fdef; f->result_init=0; f->data=NULL; f->result=NULL; f->funct=malloc(sizeof(mapidflib_function_t)); #ifdef RECONNECT // FIXME f->funct->fd = flow->fd; #else f->funct->fd = fd; #endif f->funct->fid=qbuf.fid; f->funct->instance=malloc(sizeof(mapidflib_function_instance_t)); f->funct->instance->status=MAPIFUNC_UNINIT; f->funct->instance->hwinfo=NULL; f->funct->instance->result.data = NULL; f->funct->instance->result.data_size = 0; f->funct->instance->result.info.funct_res_size = 0; f->funct->instance->result.info.shm.res_size = 0; f->funct->instance->result.info.shm.buf_size = 0; f->funct->instance->internal_data=NULL; memcpy(f->funct->instance->args,qbuf.data,FUNCTARGS_BUF_SIZE); #ifdef PRECLASSIFICATION f->funct->instance->preclassification_mask = 0; /* not a showstopper func */ #endif #ifdef RECONNECT f->flow = flow; f->numfd = numfd; flist_append(function_list, qbuf.fid, f); #endif flist_append(flow->flist,qbuf.fid,f); return qbuf.fid; } int _request_result(flowdescr_t *flow,functdescr_t *f, struct mapiipcbuf *qbuf, int user_fd, int user_fid) { qbuf->mtype=1; qbuf->cmd=READ_RESULT; qbuf->fd=flow->fd; qbuf->fid=f->fid; #ifdef RECONNECT qbuf->user_fd = user_fd; qbuf->user_fid = user_fid; #endif qbuf->pid=getpid(); pthread_spin_lock(&mapi_lock); if (mapiipc_write(qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } if (mapiipc_read(qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf->cmd) { case READ_RESULT_ACK: break; case ERROR_ACK: local_err = qbuf->remote_errorcode; return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } return 0; } int get_results_info(flowdescr_t *flow,functdescr_t *f, int user_fd, int user_fid) { struct mapiipcbuf qbuf; if (flow==NULL) { printf("ERROR: Invalid flow (NULL) [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if (_request_result(flow,f,&qbuf, user_fd, user_fid)!=0) return -1; default_read_result_init(flow,f,&qbuf.data); if(f->def->client_init!=NULL) { int func_err=f->def->client_init(f->funct->instance,&qbuf.data); if(func_err!=0) { local_err = func_err; return -1; } f->funct->instance->status=MAPIFUNC_INIT; } f->result_init=1; return 0; } // old signature: int mapi_read_results(int fd, int fid, void *result) //Read result from a function //fd: flow descriptor //fid: ID of function mapi_results_t* mapi_read_results(int fd, int fid) { flowdescr_t *flow; functdescr_t *f; mapi_result_t res; struct timeval tv; /*used for timestamping results when produced */ #ifdef DIMAPI remote_flowdescr_t *rflow; host_flow* hflow; unsigned int currhost = 0; flist_node_t* fnode; mapi_results_t* results; int i; function_data* fdata; #endif #ifdef RECONNECT struct mapiipcbuf qbuf; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return NULL; } else if(fd<=0 || fid <=0 ){ printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_read_results\n", fd, fid); local_err = MAPI_INVALID_FLOW; return NULL; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { if(!rflow->is_connected){ printf("ERROR: In mapi_read_results always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if ((results=flist_get(rflow->function_res, fid))==NULL) { //init once results=(mapi_results_t*)malloc(sizeof(mapi_results_t)*rflow->scope_size); for (i=0; iscope_size; i++) { fdata=flist_get(((host_flow*)flist_head(rflow->host_flowlist)->data)->functions, fid); results[i].size=fdata->fdef->shm_size; results[i].res=(void*)malloc(results->size); } flist_append(rflow->function_res, fid, results); } for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd = READ_RESULT; hflow->dbuf->fd = hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) { local_err = MAPI_INVALID_FID_FUNCID; return NULL; } hflow->dbuf->fid = fdata->fid; hflow->dbuf->length = BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return NULL; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case READ_RESULT_ACK: if ( hflow->dbuf->length - BASIC_SIZE > (unsigned int)results[currhost].size ) { // check memory allocation results[currhost].res = realloc(results[currhost].res, hflow->dbuf->length-BASIC_SIZE); results[currhost].size = hflow->dbuf->length-BASIC_SIZE; } memcpy(results[currhost].res, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE); results[currhost].ts = hflow->dbuf->timestamp; results[currhost].size = hflow->dbuf->length-BASIC_SIZE; break; case ERROR_ACK: memcpy(&local_err, hflow->dbuf->data, sizeof(int)); return NULL; default: printf("ERROR: In read results! [%s:%d]\n", __FILE__, __LINE__); local_err = MCOM_UNKNOWN_ERROR; return NULL; } ++currhost; } return(results); } #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return NULL; } if(!flow->is_connected){ printf("ERROR: In mapi_read_results always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } f=flist_get(flow->flist,fid); if(f!=NULL) { if(!f->result_init) if (get_results_info(flow,f, fd, fid) != 0) return NULL; if(f->def->client_init==NULL){ #ifdef RECONNECT qbuf.mtype = 1; qbuf.cmd = READ_RESULT; qbuf.fd = -1; qbuf.fid = -1; qbuf.pid = getpid(); if(flow->read_results_flag == 0) qbuf.size = 1; // use it in mapiipc_remote_write() else{ qbuf.size = 0; flow->read_results_flag = 0; // initialize for subsequent calls } pthread_spin_lock(&mapi_lock); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0){ // send a dummy IPC message to mapid pthread_spin_unlock(&mapi_lock); flow->read_results_flag = 1; flow->mapid_down = 1; local_err = MAPI_READ_RESULT_RECONNECTION; return NULL; } if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0){ // read a dummy IPC message from mapid (blocking call) pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if(qbuf.cmd != READ_RESULT_ACK){ pthread_spin_unlock(&mapi_lock); local_err = MCOM_UNKNOWN_ERROR; return NULL; } pthread_spin_unlock(&mapi_lock); #endif if(f->data == NULL) return(0); else // FIXME in case of reconnection { if (f->def->restype==MAPIRES_IPC) { struct mapiipcbuf qbuf; void *data = (char *)&qbuf.data + 2*sizeof(mapid_shm_t); // get to the actual data if (_request_result(flow,f,&qbuf, fd, fid) != 0) { local_err = MCOM_UNKNOWN_ERROR; return NULL; } memcpy(f->result->res,data,f->funct->instance->result.data_size); } else { // MAPIRES_SHM pthread_spin_lock(flow->shm_spinlock); memcpy(f->result->res,((shm_result_t*)f->data)->ptr,((shm_result_t*)f->data)->size); pthread_spin_unlock(flow->shm_spinlock); } gettimeofday(&tv, NULL); f->result->ts=(unsigned long long)tv.tv_sec*1000000 + tv.tv_usec; f->result->size=((shm_result_t*)f->data)->size; return f->result; } } else { int func_err=f->def->client_read_result(f->funct->instance,&res); if(func_err!=0) { local_err = func_err; return NULL; } if (res.res == NULL) { f->result->res = NULL; } else { memcpy(f->result->res,res.res,res.size); } gettimeofday(&tv, NULL); f->result->ts=(unsigned long long)tv.tv_sec*1000000 + tv.tv_usec;; f->result->size=res.size; return f->result; } } else { local_err = MAPI_INVALID_FID_FUNCID; return NULL; } return NULL; } /** \brief Get the next packet from a to_buffer function \param fd flow descriptor \param fid id of TO_BUFFER function \return Reference to next packet, or NULL on error */ struct mapipkt * mapi_get_next_pkt(int fd,int fid) { flowdescr_t *flow; functdescr_t *f; mapi_result_t res; int func_err; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; function_data* fdata; #endif #ifdef RECONNECT struct mapiipcbuf qbuf; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return NULL; } else if(fd<=0 || fid <=0 ){ printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_get_next_pkt\n", fd, fid); local_err = MAPI_INVALID_FID_FUNCID; return NULL; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { //FIFO if(!rflow->is_connected){ printf("ERROR: In mapi_get_next_pkt always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if (rflow->pkt_list==NULL) { #ifdef RECONNECT // XXX ??? rflow->to_buffer_fid = fid; // fid of to_buffer() function #endif rflow->pkt_list=(flist_t*)malloc(sizeof(flist_t)); flist_init(rflow->pkt_list); rflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH); hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) { local_err = MAPI_INVALID_FID_FUNCID; return NULL; } hflow->dbuf->fid = fdata->fid; hflow->dbuf->length=BASIC_SIZE; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) { local_err = MCOM_SOCKET_ERROR; return NULL; } } sem_wait(&rflow->pkt_sem); //wait at least one host packet hflow=(host_flow*)flist_pop_first(rflow->pkt_list); if (hflow==NULL) { local_err= MCOM_UNKNOWN_ERROR; return NULL; } memcpy(rflow->pkt, hflow->pkt, sizeof(struct mapipkt)-4+hflow->pkt->caplen); //send request for next packet from this host hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) return NULL; hflow->dbuf->fid = fdata->fid; hflow->dbuf->length=BASIC_SIZE; if(mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0){ local_err = MCOM_SOCKET_ERROR; return NULL; } if (rflow->pkt->caplen == 0) { return NULL; } return rflow->pkt; } else { //if no packet arrived yet wait sem_wait(&rflow->pkt_sem); hflow=(host_flow*)flist_pop_first(rflow->pkt_list); if (hflow==NULL) { return NULL; } memcpy(rflow->pkt, hflow->pkt, sizeof(struct mapipkt)-4+hflow->pkt->caplen); //send request for next packet from this host #ifdef RECONNECT rflow->to_buffer_fid = fid; // fid of to_buffer() function #endif hflow->dbuf->cmd=GET_NEXT_PKT; hflow->dbuf->fd=hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) return NULL; hflow->dbuf->fid = fdata->fid; hflow->dbuf->length=BASIC_SIZE; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0){ local_err = MCOM_SOCKET_ERROR; return NULL; } /* No packet, return null */ if (rflow->pkt->caplen == 0) { return NULL; } return rflow->pkt; } } #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return NULL; } if(!flow->is_connected){ printf("ERROR: In mapi_get_next_pkt always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return NULL; } if ( (f = flist_get(flow->flist,fid)) == NULL ){ local_err = MAPI_INVALID_FID_FUNCID; return NULL; } // This should be attaching shared memory segment with results if( !f->result_init ) if (get_results_info(flow,f, fd, fid) != 0) { printf("ERROR: Missing error message [%s:%d]\n", __FILE__, __LINE__); return NULL; } if ( f->def->client_read_result == NULL ){ printf("ERROR: Missing error message [%s:%d]\n", __FILE__, __LINE__); return NULL; } #ifdef RECONNECT qbuf.mtype = 1; qbuf.cmd = READ_RESULT; qbuf.fd = -1; qbuf.fid = -1; qbuf.pid = getpid(); pthread_spin_lock(&mapi_lock); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0){ // send a dummy IPC message to mapid pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0){ // read a dummy IPC message from mapid (blocking call) pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return NULL; } if(qbuf.cmd != READ_RESULT_ACK){ pthread_spin_unlock(&mapi_lock); local_err = MCOM_UNKNOWN_ERROR; return NULL; } pthread_spin_unlock(&mapi_lock); #endif func_err = f->def->client_read_result(f->funct->instance,&res); if (func_err != 0) { local_err = func_err; return NULL; } return res.res; } int mapi_loop(int fd, int fid, int cnt, mapi_handler callback){ struct mapipkt* pkt; int i = 0; #ifdef DIMAPI remote_flowdescr_t* rflow; #endif flowdescr_t* flow; if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <=0 ){ printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_loop\n", fd, fid); local_err = MAPI_INVALID_FID_FUNCID; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { if(!rflow->is_connected){ printf("ERROR: In mapi_loop always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return -1; } } else { #endif if((flow=flist_get(flowlist,fd))==NULL){ printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if(!flow->is_connected){ printf("ERROR: In mapi_loop always use mapi_connect first\n"); local_err = MAPI_FLOW_NOT_CONNECTED; return -1; } #ifdef DIMAPI } #endif if( callback == NULL ){ local_err = MFUNCT_INVALID_ARGUMENT_4; return -1; } if (cnt > 0){ for (i = 0; i < cnt; i++){ pkt = mapi_get_next_pkt(fd, fid); if (pkt == NULL) return local_err; //flow->error; (*callback)(pkt); } } else{ while(1){ pkt = mapi_get_next_pkt(fd, fid); if (pkt == NULL) return local_err; //flow->error; (*callback)(pkt); } } return 0; } /* * Very simple function. Just reads the last error that was set. */ int mapi_read_error(int* err_no, char* errorstr) { int i =0; if(err_no == NULL && errorstr == NULL){ return -1; } if (err_no!=NULL) *err_no = local_err; if (errorstr!=NULL) { for(; Errors[i].err_no!=0; i++){ if(Errors[i].err_no == local_err){ if(strlen(Errors[i].desc) < MAPI_ERRORSTR_LENGTH){ strncpy(errorstr, Errors[i].desc, MAPI_ERRORSTR_LENGTH); } else strcpy(errorstr,"Error in mapi_read_error: Error string too long\n"); break; } } } local_err =0; //Extra work which can be used in the case we use //per flow error /* if (fd == -1) { *err_no = local_err; translate_local_errorcode(err_no,errorstr); local_err=0; } else { flowdescr_t* flow = flist_get(flowlist,fd); if (flow == NULL) { DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__)); return -1; } pthread_spin_lock(&mapi_lock); strncpy(errorstr, flow->errstr, (strlen(flow->errstr)errstr):MAPI_ERRORSTR_LENGTH); pthread_spin_unlock(&mapi_lock); *err_no = flow->error; }*/ return 0; } /* // PER FLOW ERROR CHECKING REMOVED ATM. static int set_error(void* flow, int err_no , int is_remote) { //An general mapi stub error occured if(local_err!=0) return; if (flow==NULL) { DEBUG_CMD(printf("Invalid flow: NULL [%s:%d]\n",__FILE__,__LINE__)); return -1; } if(is_remote){ pthread_spin_lock(&mapi_lock); ((remote_flowdescr_t*)flow)->error = err_no; translate_local_errorcode(err_no,((remote_flowdescr_t*)flow)->errstr); pthread_spin_unlock(&mapi_lock); } else{ pthread_spin_lock(&mapi_lock); ((flowdescr_t*)flow)->error = err_no; translate_local_errorcode(err_no,((flowdescr_t*)flow)->errstr); pthread_spin_unlock(&mapi_lock); } return 0; } */ static int default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data) { mapid_shm_t *shm=data; mapid_shm_t *shm_spinlock=(mapid_shm_t*)((char*)data+sizeof(mapid_shm_t)); int id; if (flow==NULL) { printf("ERROR: Invalid flow (NULL) [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if(!flow->shm_base) { //Get pointer to shared memory id=shmget(shm->key, shm->buf_size, 660); if(id<0) { printf("ERROR: Shared memory error [%s:%d]\n", __FILE__, __LINE__); local_err=MAPI_SHM_ERR; return -1; } if ((flow->shm_base=shmat(id, 0, FUNCTION_SHM_PERMS))==NULL) { local_err = MAPI_SHM_ERR; return -1; } } if(!flow->shm_spinlock) { //Get pointer to shared spinlock memory id=shmget(shm_spinlock->key, shm_spinlock->buf_size, 660); if(id<0) { printf("ERROR: Shared memory error [%s:%d]\n", __FILE__, __LINE__); perror("Error: "); local_err=MAPI_SHM_ERR; return -1; } if ((flow->shm_spinlock=shmat(id, 0, FUNCTION_SHM_PERMS))==NULL) { local_err=MAPI_SHM_ERR; return -1; } } f->data=malloc(sizeof(shm_result_t)); ((shm_result_t*)f->data)->ptr=flow->shm_base+shm->offset; ((shm_result_t*)f->data)->size=shm->res_size; //Attach result to instance f->funct->instance->result.data=flow->shm_base+shm->offset; f->funct->instance->result.data_size=shm->res_size; f->result=(mapi_results_t*)malloc(sizeof(mapi_results_t)); f->result->res=(void *)malloc(((shm_result_t*)f->data)->size); return 0; } int mapi_get_function_info(int fd,int fid, mapi_function_info_t *info) { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; function_data* fdata; #endif #ifdef RECONNECT functdescr_t *f; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <=0 ){ printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_get_function_info\n", fd, fid); local_err =MAPI_INVALID_FID_FUNCID ; return -1; } else if (info == NULL){ local_err = MFUNCT_INVALID_ARGUMENT_3; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_FUNCTION_INFO; hflow->dbuf->fd=hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) { local_err = MAPI_FUNCTION_INFO_ERR; return -1; } hflow->dbuf->fid = fdata->fid; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_FUNCTION_INFO_ACK: memcpy(info,hflow->dbuf->data,sizeof(mapi_function_info_t)); continue; case ERROR_ACK: local_err = MAPI_FUNCTION_INFO_ERR; return -1; default: local_err = MAPI_FUNCTION_INFO_ERR; return -1; } } return 0; } #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } #ifdef RECONNECT if((f = flist_get(flow->flist, fid)) == NULL){ local_err = MAPI_INVALID_FID_FUNCID; return -1; } #endif qbuf.mtype=1; qbuf.cmd=GET_FUNCTION_INFO; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.fid = f->fid; qbuf.user_fd = fd; qbuf.user_fid = fid; #else qbuf.fd=fd; qbuf.fid=fid; #endif pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FUNCTION_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_function_info_t)); info->result_size = mapilh_get_function_def(info->name, info->devtype)->shm_size ; return 0; } else{ local_err= MAPI_FUNCTION_INFO_ERR; return -1; } } int mapi_get_next_function_info(int fd,int fid, mapi_function_info_t *info) { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; function_data* fdata; #endif #ifdef RECONNECT functdescr_t *f; #endif // pthread_once(&mapi_is_initialized, (void*)mapi_init); if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err=MAPI_INIT_ERROR; return -1; } else if(fd<=0 || fid <0 ){ printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_get_next_function_info\n", fd, fid); local_err = MAPI_INVALID_FID_FUNCID; return -1; }else if( info==NULL){ printf("ERROR: NULL argument in mapi_get_next_function_info\n"); local_err = MFUNCT_INVALID_ARGUMENT_3; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_NEXT_FUNCTION_INFO; hflow->dbuf->fd=hflow->fd; if ( (fdata=(function_data*)flist_get(hflow->functions, fid))==NULL ) { local_err = MAPI_FUNCTION_INFO_ERR; return -1; } hflow->dbuf->fid = fdata->fid; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_FUNCTION_INFO_ACK: memcpy(info,hflow->dbuf->data,sizeof(mapi_function_info_t)); continue; case ERROR_ACK: local_err = MAPI_FUNCTION_INFO_ERR; return -1; default: local_err = MAPI_FUNCTION_INFO_ERR; return -1; } } return 0; } #endif if((flow = flist_get(flowlist, fd)) == NULL){ printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } #ifdef RECONNECT if((f = flist_get(flow->flist, fid)) == NULL){ local_err = MAPI_INVALID_FID_FUNCID; return -1; } #endif qbuf.mtype=1; qbuf.cmd=GET_NEXT_FUNCTION_INFO; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.fid = f->fid; qbuf.user_fd = fd; qbuf.user_fid = fid; #else qbuf.fd=fd; qbuf.fid=fid; #endif pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FUNCTION_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_function_info_t)); return 0; } else{ local_err= MAPI_FUNCTION_INFO_ERR; return -1; } } int mapi_get_flow_info(int fd, mapi_flow_info_t *info) { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0){ printf("ERROR: Invalid flow descriptor %d in mapi_get_flow_info\n", fd); local_err = MAPI_INVALID_FLOW; return -1; } if(info == NULL){ local_err=MFUNCT_INVALID_ARGUMENT_2; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_FLOW_INFO; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_FLOW_INFO_ACK: memcpy(info,hflow->dbuf->data,sizeof(mapi_flow_info_t)); info->devid=fd; continue; case ERROR_ACK: local_err = MAPI_FLOW_INFO_ERR; return -1; default: local_err = MAPI_FLOW_INFO_ERR; return -1; } } return 0; } #endif if((flow = flist_get(flowlist, fd)) == NULL){ printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype=1; qbuf.cmd=GET_FLOW_INFO; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd = fd; #endif pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FLOW_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_flow_info_t)); return 0; } else { local_err = MAPI_FLOW_INFO_ERR; return -1; } } int mapi_get_next_flow_info(int fd, mapi_flow_info_t *info) { struct mapiipcbuf qbuf; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; #endif if(fd<=0){ printf("ERROR: Invalid flow descriptor %d in mapi_get_next_flow_info\n", fd); local_err = MAPI_INVALID_FLOW; return -1; } if(info == NULL){ local_err=MFUNCT_INVALID_ARGUMENT_2; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_NEXT_FLOW_INFO; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_FLOW_INFO_ACK: memcpy(info,hflow->dbuf->data,sizeof(mapi_flow_info_t)); info->devid=fd; continue; case ERROR_ACK: local_err = MAPI_FLOW_INFO_ERR; return -1; default: local_err = MAPI_FLOW_INFO_ERR; return -1; } } return 0; } #endif if((flow = flist_get(flowlist, fd)) == NULL){ printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype=1; qbuf.cmd=GET_NEXT_FLOW_INFO; #ifdef RECONNECT qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd = fd; #endif pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_FLOW_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_flow_info_t)); return 0; } else { local_err = MAPI_FLOW_INFO_ERR; return -1; } } /*int mapi_load_library(const char* library) { struct mapiipcbuf qbuf; if(!minit) mapi_init(); if((local_err=mapilh_load_library(libpath,library))==0) { qbuf.mtype=1; qbuf.cmd=LOAD_LIBRARY; qbuf.fd=getpid(); qbuf.pid=getpid(); strcpy((char *)qbuf.data,library); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); return 0; } else return -1; }*/ extern int mapi_get_libfunct_info(int libnum, int fnum, mapi_libfunct_info_t *info) { int c; mapidflib_functionlist_t *functs; functs=mapidflib_get_lib_functions(libnum); for(c=0;cnext; if(functs==NULL) return -1; strncpy(info->name,functs->def->name,MAPI_STR_LENGTH); strncpy(info->descr,functs->def->descr,MAPI_STR_LENGTH); strncpy(info->argdescr,functs->def->argdescr,MAPI_STR_LENGTH); return 0; } extern int mapi_get_next_libfunct_info(int libnum, int fnum, mapi_libfunct_info_t *info) { int ret; if(fnum<0) ret=mapi_get_libfunct_info(libnum,0,info); else ret=mapi_get_libfunct_info(libnum,fnum+1,info); return ret; } int mapi_get_library_info(int libid, mapi_lib_info_t *info) { char *name; pthread_once(&mapi_is_initialized, (void*)mapi_init); name=mapidflib_get_lib_name(libid); if(name==NULL) return -1; strncpy(info->libname,name,MAPI_STR_LENGTH); info->id=libid; info->functs=mapidflib_get_lib_numfuncts(libid); return 0; } int mapi_get_next_library_info(int libid, mapi_lib_info_t *info) { int ret; if(libid<0) ret=mapi_get_library_info(0,info); else ret=mapi_get_library_info(libid+1,info); return ret; } int mapi_get_next_device_info(int devid, mapi_device_info_t *info) { struct mapiipcbuf qbuf; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; int i=0; #endif // pthread_once(&mapi_is_initialized, (void*)mapi_init); #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,devid))!=NULL) { //device is for a remote flow for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_NEXT_DEVICE_INFO; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_DEVICE_INFO_ACK: memcpy(&info[i++],hflow->dbuf->data,sizeof(mapi_device_info_t)); continue; case GET_DEVICE_INFO_NACK: local_err = MAPI_DEVICE_INFO_ERR; return -1; default: local_err = MAPI_DEVICE_INFO_ERR; return -1; } } return 0; } #endif qbuf.mtype=1; qbuf.cmd=GET_NEXT_DEVICE_INFO; qbuf.fd=devid; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_DEVICE_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_device_info_t)); return 0; } else { local_err = MAPI_DEVICE_INFO_ERR; return -1; } } int mapi_get_device_info(int devid, mapi_device_info_t *info) { struct mapiipcbuf qbuf; #ifdef DIMAPI remote_flowdescr_t* rflow; host_flow* hflow; flist_node_t* fnode; int i=0; #endif // pthread_once(&mapi_is_initialized, (void*)mapi_init); #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,devid))!=NULL) { //device is for a remote flow for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=GET_DEVICE_INFO; hflow->dbuf->fd=hflow->fd; hflow->dbuf->length=BASIC_SIZE; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } //wait results for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case GET_DEVICE_INFO_ACK: memcpy(&info[i++],hflow->dbuf->data,sizeof(mapi_device_info_t)); continue; case GET_DEVICE_INFO_NACK: local_err = MAPI_DEVICE_INFO_ERR; return -1; default: local_err = MAPI_DEVICE_INFO_ERR; return -1; } } return 0; } #endif qbuf.mtype=1; qbuf.cmd=GET_DEVICE_INFO; qbuf.fd=devid; pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } pthread_spin_unlock(&mapi_lock); if(qbuf.cmd==GET_DEVICE_INFO_ACK) { memcpy(info,qbuf.data,sizeof(mapi_device_info_t)); return 0; } else { local_err = MAPI_DEVICE_INFO_ERR; return -1; } } #ifdef WITH_ADMISSION_CONTROL /* Auth data format pubkey + credentials + (unsigned int)encrypted nonce len + encrypted nonce */ static int ipc_send_authdata(int fd,const unsigned char *pubkey,const unsigned char *credentials,const unsigned char *enc_nonce,size_t len) { struct mapiipcbuf qbuf; size_t pk_sz,creds_sz; flowdescr_t* flow; if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } qbuf.mtype = 1; qbuf.cmd = SET_AUTHDATA; #ifdef RECONNECT qbuf.fd = flow->fd; #else qbuf.fd = fd; #endif qbuf.pid = getpid(); pk_sz = strlen(pubkey) + 1; creds_sz = strlen(credentials) + 1; if ( (pk_sz + creds_sz + len + sizeof(unsigned int)) > DATA_SIZE ) { printf("ERROR: ipc_send_authdata: the size of your authentication & authorisation data exceeds the maximum allows %d bytes [%s:%d]\n", DATA_SIZE, __FILE__, __LINE__); return -1; } strcpy(qbuf.data,pubkey); strcpy((qbuf.data+pk_sz),credentials); *(size_t *)(qbuf.data+pk_sz+creds_sz) = len; memcpy((qbuf.data+pk_sz+creds_sz+sizeof(unsigned int)),enc_nonce,len); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; return -1; } /* no ack? - espenb */ return 0; } static int read_file(const char *fn,bytestream *bs) { int fp,r; if ( (fp = open(fn,O_RDONLY)) < 0 ) return -1; r = read(fp,bs->data,bs->length); close(fp); return r; } #define MAX_PRIVKEY_SIZE 6000 static size_t encrypt_nonce(unsigned char **enc_nonce,unsigned int nonce,const char *priv) { bytestream privkey; char *pkstring; struct keynote_deckey priv_dk; RSA *rsa; int enc_len = 0; BS_NEW(privkey,MAX_PRIVKEY_SIZE); if ( BS_ISNULL(privkey) ) { errno = ENOMEM; return 0; } if ( read_file(priv,&privkey) < 0 ) goto ret; if ( (pkstring = kn_get_string(privkey.data)) == NULL ) goto ret; // Decode private key if ( kn_decode_key(&priv_dk,pkstring,KEYNOTE_PRIVATE_KEY) != 0 ) goto ret; if ( priv_dk.dec_algorithm != KEYNOTE_ALGORITHM_RSA ) goto dec_error; rsa = (RSA *)priv_dk.dec_key; // Allocate memory if ( (*enc_nonce = malloc(RSA_size(rsa))) == NULL ) { errno = ENOMEM; goto dec_error; } enc_len = RSA_private_encrypt(sizeof(unsigned int),(unsigned char *)&nonce, *enc_nonce,rsa,RSA_PKCS1_PADDING); if ( enc_len <= 0 ) { free(*enc_nonce); enc_len = 0; } dec_error: kn_free_key(&priv_dk); ret: BS_FREE(privkey); return (size_t)enc_len; } int mapi_set_authdata(int fd,const char *pub,const char *priv,const char *creds) { struct mapiipcbuf qbuf; bytestream pubkey,credentials; int ret = -1; size_t enc_len; unsigned char *encrypted_nonce = NULL; flowdescr_t* flow; #ifdef DIMAPI remote_flowdescr_t *rflow; host_flow* hflow; flist_node_t* fnode; size_t pk_sz,creds_sz, tot_size; unsigned char is_remote=0; #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } if(fd<=0){ printf("ERROR: Wrong fd (fd: %d) in mapi_set_authdata\n", fd); local_err = MAPI_INVALID_FLOW; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { is_remote=1; } else #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if ( fd < 0 ) return -1; // Allocate memory to hold data in memory BS_NEW(pubkey,4096); BS_NEW(credentials,4096); if ( BS_ISNULL(pubkey) || BS_ISNULL(credentials) ) goto fail; // Read data in memory if ( read_file(pub,&pubkey) < 0 ) goto fail; if ( read_file(creds,&credentials) < 0 ) goto fail; #ifdef DIMAPI if(is_remote){ pk_sz = strlen(pubkey.data) + 1; creds_sz = strlen(credentials.data) + 1; tot_size = pk_sz + creds_sz + sizeof(unsigned int); for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; // Encrypt nonce if ( (enc_len = encrypt_nonce(&encrypted_nonce,(unsigned int)hflow->fd,priv)) == 0 ) goto fail; if ( (tot_size + enc_len ) > DATA_SIZE ) { printf("ERROR: mapi_set_authdata: the size of your authentication & authorisation data exceeds the maximum allows %d bytes [%s:%d]\n", DATA_SIZE, __FILE__, __LINE__); goto fail; } hflow->dbuf->cmd = SET_AUTHDATA; hflow->dbuf->fd = hflow->fd; strcpy(hflow->dbuf->data,pubkey.data); strcpy((hflow->dbuf->data+pk_sz),credentials.data); *(size_t *)(hflow->dbuf->data+pk_sz+creds_sz) = enc_len; memcpy((hflow->dbuf->data+tot_size),encrypted_nonce,enc_len); hflow->dbuf->length = BASIC_SIZE + tot_size + enc_len; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } ret = 0; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case SET_AUTHDATA_ACK: //OK from this host - ret remains as is break; case ERROR_ACK: ret = -1; break; default: ret = -1; break; } } goto fail; } #endif // Encrypt nonce if ( (enc_len = encrypt_nonce(&encrypted_nonce,(unsigned int)fd,priv)) == 0 ) goto fail; // Send the data pthread_spin_lock(&mapi_lock); if ( ipc_send_authdata(fd,pubkey.data,credentials.data,encrypted_nonce,enc_len) != 0 ) goto fail; // Wait for ACK if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch( qbuf.cmd ) { case SET_AUTHDATA_ACK: ret = 0; break; case ERROR_ACK: locar_err = qbuf.remote_errorcode; break; default: local_err = MCOM_UNKNOWN_ERROR; break; } fail: BS_FREE(pubkey); BS_FREE(credentials); if ( encrypted_nonce != NULL ) free(encrypted_nonce); #ifdef DIMAPI if(is_remote) { pthread_spin_unlock(&mapi_lock); return ret;//just to avoid the spin_unlock } #endif pthread_spin_unlock(&mapi_lock); return ret; } #ifdef DIMAPI int agent_send_authdata(void *authdata) //sends authdata from agent to mapid { struct mapiipcbuf qbuf; size_t pk_p, creds_p; struct dmapiipcbuf *dbuf = (struct dmapiipcbuf *)authdata; pk_p = strlen(dbuf->data) + 1; creds_p = strlen(dbuf->data + pk_p) + 1; pthread_spin_lock(&mapi_lock); if ( ipc_send_authdata( dbuf->fd, dbuf->data, dbuf->data+pk_p, dbuf->data+pk_p+creds_p+sizeof(size_t), *(size_t *)(dbuf->data + pk_p + creds_p) ) != 0 ) { pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch(qbuf.cmd){ case SET_AUTHDATA_ACK: pthread_spin_unlock(&mapi_lock); return 0; case ERROR_ACK: pthread_spin_unlock(&mapi_lock); local_err = qbuf.remote_errorcode; return -1; default: pthread_spin_unlock(&mapi_lock); local_err = MCOM_UNKNOWN_ERROR; return -1; } return -1; } #endif //DIMAPI #else int mapi_set_authdata(MAPI_UNUSED int fd,MAPI_UNUSED const char *pub,MAPI_UNUSED const char *priv,MAPI_UNUSED const char *creds) { // We don't need to do anything return 0; } #endif //WITH_ADMISSION_CONTROL #ifdef WITH_AUTHENTICATION #ifdef DIMAPI /* * Stub function to authenticate a user. * Arguments: * fd - a flow descriptor, * username - a NULL-terminated string containing the username * password - a NULL-terminated string containing the password (yes, in clear text) * vo - a string containing the VO to authenticate the user against * * Returns 0 on success, or not on failure. */ int mapi_authenticate(int fd, const char *username, const char *password, const char *vo) { //flowdescr_t *flow; remote_flowdescr_t *rflow; host_flow *hflow; flist_node_t *lnode; if(!minit) { printf("ERROR: DIMAPI is not initialised [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return(-1); } /* * Before we start, some trivial checks. */ if(fd<=0){ printf("ERROR: Wrong fd (fd: %d) in mapi_authenticate\n", fd); local_err = MAPI_INVALID_FLOW; return -1; } if(!username || !password || !vo) return(-1); // XXX: Omg ugly ugly ugly if(strlen(username) > MAX_DATA_SIZE || strlen(password) > MAX_DATA_SIZE || strlen(vo) > MAX_DATA_SIZE) { return(-1); } /* * Methinks that if the flow is not remote, * no authentication should take place. * I mean, who on this bloody earth is going to * stop an admin from monitoring whatever he wants * on his own box? * * I for one, will not. */ if((rflow = flist_get(remote_flowlist, fd)) != NULL) { int len = 0; for((lnode = flist_head(rflow->host_flowlist)) ; lnode != NULL; lnode = flist_next(lnode)) { hflow = (host_flow *)lnode->data; hflow->dbuf->cmd = AUTHENTICATE; hflow->dbuf->fd = hflow->fd; len += sprintf(hflow->dbuf->data + len, "%s", username) + 1; len += sprintf(hflow->dbuf->data + len, "%s", password) + 1; len += sprintf(hflow->dbuf->data + len, "%s", vo) + 1; hflow->dbuf->length = BASIC_SIZE + len; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } // And wait for result.. for(lnode = flist_head(rflow->host_flowlist); lnode != NULL; lnode = flist_next(lnode)) { hflow = (host_flow *)lnode->data; switch(hflow->dbuf->cmd) { case AUTHENTICATE_ACK: rflow->is_authenticated = 1; rflow->username = malloc(sizeof(char) * (strlen(username) + 1)); rflow->vo = malloc(sizeof(char) * (strlen(vo) + 1)); snprintf(rflow->username, MAX_DATA_SIZE, "%s", username); snprintf(rflow->vo, MAX_DATA_SIZE, "%s", vo); #ifdef RECONNECT rflow->password = (char *)malloc(sizeof(char) * (strlen(password) + 1)); snprintf(rflow->password, MAX_DATA_SIZE, "%s", password); #endif return(0); break; case ERROR_ACK: rflow->is_authenticated = 0; rflow->username = NULL; rflow->vo = NULL; #ifdef RECONNECT rflow->password = NULL; #endif return(-1); break; default: rflow->is_authenticated = 0; rflow->username = NULL; rflow->vo = NULL; #ifdef RECONNECT rflow->password = NULL; #endif return(-1); } } } else { /* * Now what? */ return(0); } return(-1); } /* * mfukar * * Forward data from agent to mapid. */ int agent_authenticate(void *data) { struct dmapiipcbuf *dbuf = (struct dmapiipcbuf *)data; char *offset = dbuf->data; offset += strlen(offset); offset += strlen(offset); struct mapiipcbuf qbuf; qbuf.mtype = 1; // XXX whatever? qbuf.cmd = AUTHENTICATE; qbuf.fd = dbuf->fd; // qbuf.pid = getpid(); memcpy(qbuf.data, dbuf->data, DATA_SIZE); pthread_spin_lock(&mapi_lock); if (mapiipc_write(&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read(&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } switch(qbuf.cmd) { case AUTHENTICATE_ACK: pthread_spin_unlock(&mapi_lock); return(0); case ERROR_ACK: pthread_spin_unlock(&mapi_lock); return(-1); default: pthread_spin_unlock(&mapi_lock); return(-1); } return(-1); } #endif #endif #ifdef DIMAPI static int hostcmp(void *h1, void *h2) { struct host *h = (struct host *)h1; char *s = (char *)h2; return strcmp(h->hostname, s); } #endif //global var access functions int get_numflows() { int n; pthread_spin_lock(&numflows_lock); n = numflows; pthread_spin_unlock(&numflows_lock); return n; } // increases numflows and returns its new value int incr_numflows() { int n; pthread_spin_lock(&numflows_lock); n = ++numflows; pthread_spin_unlock(&numflows_lock); return n; } // decreases numflows and returns its new value int decr_numflows() { int n; pthread_spin_lock(&numflows_lock); n = --numflows; pthread_spin_unlock(&numflows_lock); return n; } int get_totalflows() { int n; pthread_spin_lock(&numflows_lock); n = totalflows; pthread_spin_unlock(&numflows_lock); return n; } // increases totalflows and returns its new value int incr_totalflows() { int n; pthread_spin_lock(&numflows_lock); n = ++totalflows; pthread_spin_unlock(&numflows_lock); return n; } #ifdef DIMAPI void set_agent(); void set_agent() { agent=1; } #endif int mapi_get_scope_size(int fd) { #ifdef DIMAPI remote_flowdescr_t* rflow=(remote_flowdescr_t*)flist_get(remote_flowlist, fd); if (rflow!=NULL) return rflow->scope_size; #endif if ( flist_get(flowlist, fd)!=NULL ) return 1; else { local_err = MAPI_INVALID_FLOW; return -1; } } int mapi_is_remote(int fd) { #ifdef DIMAPI remote_flowdescr_t* rflow=(remote_flowdescr_t*)flist_get(remote_flowlist, fd); if (rflow!=NULL) { return 1; } #endif if (flist_get(flowlist, fd)!=NULL ) { return 0; } else { local_err = MAPI_INVALID_FLOW; return -1; } } // if mapid and/or mapicommd are out of execution returns 1, otherwise returns 0 int mapi_is_sensor_down(int fd){ #ifdef DIMAPI #ifdef RECONNECT remote_flowdescr_t *rflow = (remote_flowdescr_t *)flist_get(remote_flowlist, fd); if(rflow != NULL && rflow->daemons_down == 1) return 1; else if (rflow!=NULL) return 0; #else return 0; // without reconnect always up ?? #endif #endif #ifdef RECONNECT flowdescr_t *flow = flist_get(flowlist, fd); int check_mapid; if (flow==NULL) return 0; check_mapid = check_network_mapid(); if(check_mapid == 1) { //up flow->mapid_down = 0; return 0; } else return 1; #else return 0; // without reconnect always up ?? #endif } int mapi_apply_function_array(int fd, const char* funct, char** args, unsigned int argn) //Apply function to a mapi flow //fd: flow descriptor //funct: function to be added { #ifdef RECONNECT struct mapiipcbuf qbuf = {-1, -1, -1, -1, -1, "", -1, -1, -1, 0, "", "", -1}; // need initialization #else struct mapiipcbuf qbuf = {-1, -1, -1, "", -1, -1, -1, 0, "", "", -1}; // need initialization #endif int numfd = 0, tmp, un_id = 0, curr_arg = 0; unsigned long long ltmp; unsigned int arg_size=0; //only used in dimapi - declared here to avoid multiple ifdefs later char *argdescr_ptr, *filename, *temp, ctmp, *tmp_fname; char *fids; mapidflib_function_def_t *fdef; functdescr_t *f; mapiFunctArg *pos; flowdescr_t *flow; #ifdef DIMAPI unsigned char is_remote=0; remote_flowdescr_t *rflow, *ref_flow; host_flow* hflow; function_data *fdata; flist_node_t* fnode; int i; char buf[DATA_SIZE], *cfids, *s, *new_fids; int fid_, fd_, tmp_fd, tmp_fid, len = 0; #ifdef RECONNECT struct mapiipcbuf qbuf_ = {-1, -1, -1, -1, -1, "", -1, -1, -1, 0, "", "", -1}; // need initialization mapiFunctArg *pos_; #endif #endif if (!minit) { printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INIT_ERROR; return -1; } else if(fd<=0){ printf("ERROR: Wrong fd (fd: %d) in mapi_apply_function_array\n", fd); local_err = MAPI_INVALID_FID_FUNCID; return -1; } if(funct==NULL){ printf("ERROR: NULL function in mapi_apply_function_array\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } #ifdef DIMAPI if ((rflow=flist_get(remote_flowlist,fd))!=NULL) { if(rflow->is_connected){ printf("ERROR: Can not apply function on an already connected flow\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } is_remote = 1;//indicates that flow is remote } else { #endif if ((flow=flist_get(flowlist,fd))==NULL) { printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__); local_err = MAPI_INVALID_FLOW; return -1; } if(flow->is_connected){ printf("ERROR: Can not apply function on an already connected flow\n"); local_err = MFUNCT_COULD_NOT_APPLY_FUNCT; return -1; } #ifdef DIMAPI } if (is_remote) fdef=mapilh_get_function_def(funct,"1.3"); else #endif //Get information about function fdef=mapilh_get_function_def(funct,flow->devtype); if(fdef==NULL) { printf("ERROR: Could not find/match function %s [%s:%d]\n", funct, __FILE__, __LINE__); local_err = MAPI_FUNCTION_NOT_FOUND; return -1; } #ifdef DIMAPI if(is_remote){//flow is remote for (fnode=flist_head(rflow->host_flowlist), i=1; fnode!=NULL; fnode=flist_next(fnode), i++) { hflow=(host_flow*)fnode->data; hflow->dbuf->cmd=APPLY_FUNCTION; hflow->dbuf->fd=hflow->fd; memcpy(hflow->dbuf->data, funct, strlen(funct)+1);//put function name in the buffer pos = qbuf.data; // point to start of arguments buffer arg_size = 0; #ifdef RECONNECT pos_ = qbuf_.data; // point to start of arguments buffer #endif if ( strlen(fdef->argdescr) != argn ) { local_err = MFUNCT_INVALID_ARGUMENT; return -1; } curr_arg=0; // parse function arguments if(strncmp(fdef->argdescr, "", 1)) { // there are some args argdescr_ptr = fdef->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr){ case 's': temp=args[curr_arg]; curr_arg++; addarg(&pos, temp, STRING); #ifdef RECONNECT addarg(&pos_, temp, STRING); #endif arg_size+=strlen(temp)+1; break; case 'S': // reference to flows and functions (e.g RES2FILE) fids = args[curr_arg]; curr_arg++; #ifdef RECONNECT addarg(&pos_, fids, STRING); #endif // allocate more bytes, because we can have the following case: 7@2,8@2,9@2 translated to 12@3,13@3,14@3 new_fids = (char *)malloc(strlen(fids) * 10 * sizeof(char)); strncpy(buf, fids, DATA_SIZE); cfids = buf; while( (s = strchr(cfids, ',')) != NULL){ *s = '\0'; sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); if(ref_flow == NULL || i > ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, i))->fd; fdata = flist_get(hflow->rhost->functions, fid_); if(fdata == NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fid = fdata->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); cfids = s + 1; } sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); if(ref_flow == NULL || i > ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, i))->fd; fdata = flist_get(hflow->rhost->functions, fid_); if(fdata == NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp_fid = fdata->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); addarg(&pos, new_fids, STRING); arg_size += strlen(new_fids) + 1; len = 0; free(new_fids); new_fids = NULL; break; case 'i': tmp=atoi(args[curr_arg]); curr_arg++; addarg(&pos, &tmp, INT); #ifdef RECONNECT addarg(&pos_, &tmp, INT); #endif arg_size+=sizeof(int); break; case 'r': // reference to a flow tmp=atoi(args[curr_arg]); curr_arg++; #ifdef RECONNECT addarg(&pos_, &tmp, INT); // fd given to user ... #endif ref_flow=flist_get(remote_flowlist, tmp); if (ref_flow==NULL || i>ref_flow->scope_size){ printf("ERROR: Invalid flow in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp=((host_flow*)flist_get(ref_flow->host_flowlist, i))->fd; addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'f': // reference to a fuction tmp=atoi(args[curr_arg]); curr_arg++; #ifdef RECONNECT addarg(&pos_, &tmp, INT); // fid given to user ... #endif fdata=flist_get(hflow->rhost->functions, tmp); if (fdata==NULL){ printf("ERROR: Invalid fid in function arguments [%s:%d]\n", __FILE__, __LINE__); local_err = MAPI_INVALID_FID_FUNCID; return -1; } tmp=fdata->fid; addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'c': ctmp=args[curr_arg][0]; curr_arg++; addarg(&pos, &ctmp, CHAR); #ifdef RECONNECT addarg(&pos_, &ctmp, CHAR); #endif arg_size+=sizeof(char); break; case 'l': ltmp = (unsigned long long)atoll(args[curr_arg]); curr_arg++; addarg(&pos, <mp, UNSIGNED_LONG_LONG); #ifdef RECONNECT addarg(&pos_, <mp, UNSIGNED_LONG_LONG); #endif arg_size+=sizeof(unsigned long long); break; case 'w': // Open file for writing filename=args[curr_arg]; if (filename==NULL){ local_err = MAPI_ERROR_FILE; return -1; } curr_arg++; if (filename==NULL){ local_err = MAPI_ERROR_FILE; return -1; } addarg(&pos, filename, STRING); #ifdef RECONNECT addarg(&pos_, filename, STRING); #endif arg_size+=strlen(filename)+1; break; default: local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR; printf("ERROR: Illegal argument descriptor %c [%s:%d]\n", *argdescr_ptr, __FILE__, __LINE__); return -1; } argdescr_ptr++; // move to the next arg } } memcpy(hflow->dbuf->data+strlen(funct)+1, qbuf.data, arg_size); //argument size hflow->dbuf->length=BASIC_SIZE + strlen(funct) + 1 + arg_size; } if (mapiipc_remote_write_to_all(rflow)<0){ local_err = MCOM_SOCKET_ERROR; return -1; } fidseed++; for (fnode=flist_head(rflow->host_flowlist), i = 1; fnode!=NULL; fnode=flist_next(fnode), i++) { hflow=(host_flow*)fnode->data; switch(hflow->dbuf->cmd) { case APPLY_FUNCTION_ACK: fdata=(function_data*)malloc(sizeof(function_data)); fdata->fid=hflow->dbuf->fid; fdata->fdef=fdef; #ifdef RECONNECT memcpy(fdata->args, qbuf_.data, arg_size); fdata->hflow = hflow; fdata->index = i; #endif flist_append(hflow->functions, fidseed, fdata); flist_append(hflow->rhost->functions, fidseed, fdata); break; case ERROR_ACK: memcpy(&local_err, hflow->dbuf->data, sizeof(int)); return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } } return fidseed; } #endif pos = qbuf.data; // point to start of arguments buffer if ( strlen(fdef->argdescr) != argn ) { local_err = MFUNCT_INVALID_ARGUMENT; return -1; } curr_arg=0; // parse function arguments if(strncmp(fdef->argdescr, "", 1)) { // there are some args argdescr_ptr = fdef->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr) { case 's': temp=args[curr_arg]; curr_arg++; addarg(&pos, temp, STRING); arg_size+=strlen(temp)+1; break; case 'S': fids = args[curr_arg]; curr_arg++; addarg(&pos, fids, STRING); arg_size += strlen(fids) + 1; break; case 'i': tmp=atoi(args[curr_arg]); curr_arg++; addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'r': tmp=atoi(args[curr_arg]); curr_arg++; addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'f': tmp=atoi(args[curr_arg]); curr_arg++; addarg(&pos, &tmp, INT); arg_size+=sizeof(int); break; case 'c': ctmp=args[curr_arg][0]; curr_arg++; addarg(&pos, &ctmp, CHAR); arg_size+=sizeof(char); break; case 'l': ltmp=(unsigned long long)atoll(args[curr_arg]); curr_arg++; addarg(&pos, <mp, UNSIGNED_LONG_LONG); arg_size+=sizeof(unsigned long long); break; case 'w': //Open file for writing filename=args[curr_arg]; curr_arg++; if (agent == 1) { tmp=open(filename, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR); while(tmp==-1 && errno==EEXIST) { asprintf(&tmp_fname, "%s-%d", filename, un_id++); tmp=open(tmp_fname, O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE,S_IRUSR|S_IWUSR); free(tmp_fname); } } else { tmp=open(filename,O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,S_IRUSR|S_IWUSR); } if(tmp==-1) { printf("ERROR: Can not create file: %s [%s:%d]\n", filename, __FILE__, __LINE__); local_err=MAPI_ERROR_FILE; return -1; } printf("Created file %s for writing\n", filename); flow->fds[flow->numfd++] = tmp; numfd++; addarg(&pos, &tmp, INT); break; default: local_err=MFUNCT_INVALID_ARGUMENT_DESCRIPTOR; printf("ERROR: Illegal argument descriptor %c [%s:%d]\n",*argdescr_ptr, __FILE__, __LINE__); return -1; } argdescr_ptr++; // move to the next arg } } qbuf.mtype=1; qbuf.cmd=APPLY_FUNCTION; #ifdef RECONEECT qbuf.fd = flow->fd; qbuf.user_fd = fd; #else qbuf.fd = fd; #endif qbuf.pid=getpid(); strncpy(qbuf.function,funct,FUNCT_NAME_LENGTH); strncpy((char *)qbuf.argdescr,fdef->argdescr,ARG_LENGTH); pthread_spin_lock(&mapi_lock); if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) { local_err = MCOM_SOCKET_ERROR; pthread_spin_unlock(&mapi_lock); return -1; } if(!send_fd(flow->fds,numfd)) { local_err=MAPI_ERROR_SEND_FD; pthread_spin_unlock(&mapi_lock); return -1; } if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) { pthread_spin_unlock(&mapi_lock); local_err = MCOM_SOCKET_ERROR; return -1; } pthread_spin_unlock(&mapi_lock); switch(qbuf.cmd) { case APPLY_FUNCTION_ACK: break; case ERROR_ACK: local_err =qbuf.remote_errorcode; return -1; default: local_err = MCOM_UNKNOWN_ERROR; return -1; } fdef=mapilh_get_function_def(funct,qbuf.function); f=malloc(sizeof(functdescr_t)); f->fid=qbuf.fid; f->def=fdef; f->result_init=0; f->data=NULL; f->result=NULL; f->funct=malloc(sizeof(mapidflib_function_t)); #ifdef RECONNECT f->funct->fd = flow->fd; #else f->funct->fd = fd; #endif f->funct->fid=qbuf.fid; f->funct->instance=malloc(sizeof(mapidflib_function_instance_t)); f->funct->instance->status=MAPIFUNC_UNINIT; f->funct->instance->hwinfo=NULL; f->funct->instance->result.data = NULL; f->funct->instance->result.data_size = 0; f->funct->instance->result.info.funct_res_size = 0; f->funct->instance->result.info.shm.res_size = 0; f->funct->instance->result.info.shm.buf_size = 0; f->funct->instance->internal_data=NULL; memcpy(f->funct->instance->args,qbuf.data,FUNCTARGS_BUF_SIZE); #ifdef PRECLASSIFICATION f->funct->instance->preclassification_mask = 0; /* not a showstopper func */ #endif #ifdef RECONNECT f->flow = flow; f->numfd = numfd; flist_append(function_list, qbuf.fid, f); #endif flist_append(flow->flist,qbuf.fid,f); return qbuf.fid; } int mapi_stats(const char *dev, struct mapi_stat *stats) { struct mapiipcbuf qbuf; #ifdef DIMAPI char *hostname=NULL, *s=NULL, *k=NULL; struct host *h=NULL; char *devp; struct dmapiipcbuf dbuf; int seed=0; int i; #endif if(dev==NULL){ printf("ERROR: Wrong device name given (NULL) in mapi_stats\n"); local_err = MAPI_DEVICE_INFO_ERR; return -1; } //check if flow is remote or not and call the appropriate init function #ifdef DIMAPI if ((s = strchr(dev,':'))!=NULL) { devp=strdup(dev); k=strtok(devp, ", "); sem_init(&stats_sem, 0, 0); while (k!=NULL) { if ((s = strchr(k,':'))!=NULL) { *s = '\0'; hostname = k; k = s + 1; pthread_spin_lock(&hostlist_lock); h = (struct host *)flist_search(hostlist, hostcmp, hostname); if(h==NULL){// Our host is a new one --> insert it in the hostlist h = (struct host *)malloc(sizeof(struct host)); h->hostname = strdup(hostname); h->port = dimapi_port; h->flows = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->flows); h->functions = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->functions); h->num_flows=0; h->stats=NULL; #ifdef RECONNECT sem_init(&h->connection, 0, 0); // initialize semaphore #endif // Create the socket if (mapiipc_remote_init(h)<0) { local_err = MCOM_SOCKET_ERROR; printf("ERROR: Could not connect with host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__); pthread_spin_unlock(&hostlist_lock); return -1; } h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t)); pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h); flist_append(hostlist, h->sockfd, h); pthread_spin_unlock(&hostlist_lock); } else{//host exists in the list pthread_spin_unlock(&hostlist_lock); } if (h->stats==NULL) { h->stats = (flist_t *)malloc(sizeof(flist_t)); flist_init(h->stats); } strncpy(stats[seed].hostname,hostname,MAPI_STR_LENGTH); strncpy(stats[seed].dev,k,MAPI_STR_LENGTH); flist_append(h->stats,seed,&stats[seed]); dbuf.cmd=MAPI_STATS; strncpy((char *)dbuf.data,k,DATA_SIZE); dbuf.length=BASIC_SIZE+strlen(k)+1; if (mapiipc_remote_write(&dbuf, h)<0) return -1; seed++; } k=strtok(NULL,", "); } free(devp); //wait for results for (i=0; ihostname,"localhost",MAPI_STR_LENGTH); strncpy(stats->dev,dev,MAPI_STR_LENGTH); return 1; case MAPI_STATS_ERR: local_err=qbuf.remote_errorcode; return -1; default: local_err=MAPI_STATS_ERROR; return -1; } }