#ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include "mapiipc.h" #include "debug.h" #include "mapidflib.h" #include "devgroupdb.h" #include "mapi_errors.h" #define HAVE_MSGHDR_MSG_CONTROL 1 #ifdef DIMAPI #include #include #include #include "flist.h" #endif #ifdef RECONNECT #include #endif // this file contains all the client-side IPC related functions [ and functions for reconnection purposes ] static int sock; static int mapidaddr_len; static struct sockaddr_un mapidaddr; static char* mapidsocket = NULL; static char* mapidsocketglobal = NULL; flist_t *flowlist = NULL; #ifdef DIMAPISSL static SSL_CTX *ctx; void sigpipe_handle(); #endif #ifdef RECONNECT #ifdef DIMAPI int check_network_mapicommd(struct host *h); // checks if network (mapicommd) is up. Returns 1 if network is up, 0 otherwise void restore_network_mapicommd(struct host *h); // restores the connection to mapicommd in case of a breakdown, using back-off mechanism void mapi_recreate_flow(struct host *h); // recreates all flows, that host which broke down the connection had created void mapi_reapply_function(struct host *h); // reapplies all functions, that host which broke down the connection had applyed void mapi_reconnect(struct host *h); // checks which flows of the specified host, had called mapi_connect() function #ifdef WITH_AUTHENTICATION void mapi_reauthenticate(struct host *h); // checks which flows of the specified host, had called mapi_authenticate() function #endif void check_mapi_functions(struct host *h); // checks all functions that can be called, when a break down happens void mapi_get_next_packet(struct host *h); // checks which flows of the specified host, had called mapi_get_next_pkt() function typedef struct function_data{ int fid; // real fid returned from mapicommd mapidflib_function_def_t *fdef; // function definition host_flow *hflow; // flow of a specified host char 
args[DIMAPI_DATA_SIZE]; // function's arguments int index; // need for functions that have arguments that reference to a flow } function_data; #endif void restore_network_mapid(void); // restores the connection to mapid in case of a breakdown, using back-off mechanism void mapi_create_offline_device_mapid(void); // this function recreates all offline devices, that have been created and not deleted void mapi_recreate_flow_mapid(void); // recreates all flows, on-line and off-line, that have been created and not closed void mapi_reapply_function_mapid(void); // reapplies all functions, that have been applyed to flows that have not been closed void mapi_reconnect_mapid(void); // checks which flows, had called mapi_connect() function and not mapi_close_flow() void mapi_start_offline_device_mapid(void); // checks which devices, had called mapi_start_offline_device() function and not mapi_delete_offline_device() void mapi_read_results_mapid(void); // initializes shared memory segments for new results of a function, after reconnection typedef struct flowdescr{ int fd; int file; // file descriptor for offline flows int fds[256]; // file descriptors int numfd; // number of file descriptors char *devtype; char *device; int read_results_flag; int mapid_down; char *shm_base; pthread_mutex_t *shm_spinlock; flist_t *flist; unsigned char is_connected; // this should be 1 if the flow is connected, 0 otherwise int format; // this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise } flowdescr_t; typedef struct functdescr{ int fid; short result_init; mapidflib_function_def_t *def; mapidflib_function_t *funct; void *data; mapi_results_t *result; flowdescr_t *flow; int numfd; } functdescr_t; typedef struct offline_device{ char *path; // specifies the name of the trace file to open char *previous_device; // return value of mapi_create_offline_device (device name that should be used in mapi_start_offline_device) char *new_device; // new device name (returned from 
mapid) int format; // the format of the captured packets (MFF_PCAP or MFF_DAG_ERF) unsigned char is_started; // this should be 1 if the device is started ( via mapi_start_offline_device() ), 0 otherwise } offline_device; flist_t *function_list = NULL; // list which contains all applyed functions flist_t *offline_device_list = NULL; // list which contains all offline devices #endif int mapiipc_write(struct mapiipcbuf *qbuf){ // sends an IPC message to mapid qbuf->uid = getuid(); // returns the real user ID of the current process if(send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL) == -1){ //WARNING_CMD(printf("\nsend: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__)); #ifdef RECONNECT offline_device *device = NULL; flist_node_t *fnode = NULL; flowdescr_t* flow = NULL; functdescr_t *fun = NULL; //printf("\n ---> Mapid is down\n"); if(qbuf->cmd == READ_RESULT && qbuf->size == 1) // 1st attempt after break down ... return -1; else{ restore_network_mapid(); mapi_create_offline_device_mapid(); mapi_recreate_flow_mapid(); mapi_reapply_function_mapid(); mapi_reconnect_mapid(); mapi_start_offline_device_mapid(); mapi_read_results_mapid(); // special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device ) if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){ // find specified device in offline_device_list for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){ device = (offline_device *)fnode->data; if(!strcmp(device->previous_device, (char *) qbuf->data)){ strncpy((char *) qbuf->data, device->new_device, DATA_SIZE); // the new device name break; } } } // FIXME (CLOSE_FLOW ...) 
else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){ flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation qbuf->fd = flow->fd; } else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO){ flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function qbuf->fd = flow->fd; qbuf->fid = fun->fid; } else if(qbuf->cmd == READ_RESULT && qbuf->fd != -1 && qbuf->fid != -1){ flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function qbuf->fd = flow->fd; qbuf->fid = fun->fid; } send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL); } #else return -1; #endif } return 0; } int mapiipc_read(struct mapiipcbuf *qbuf) //Reads an IPC message. 
Blocking call { if(recv(sock, qbuf, MAX_SEND_SIZE, 0) == -1){ printf("ERROR: recv (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } return 0; } // Sets globals (each thread) int mapiipc_set_socket_names(char *socket, char *socketglobal) { if(mapidsocket != NULL) free(mapidsocket); if(mapidsocketglobal != NULL) free(mapidsocketglobal); mapidsocket = strdup(socket); mapidsocketglobal = strdup(socketglobal); return 0; } int mapiipc_client_init() //Initializes IPC for mapi functions { if ((sock = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) { printf("ERROR: socket (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } // Check that names of mapid's sockets were set if(mapidsocket == NULL || mapidsocketglobal == NULL) { printf("ERROR: mapiipc_client_init() - socket names not set [%s:%d]\n", __FILE__, __LINE__); return -1; } // construct socket (try local) mapidaddr.sun_family = AF_LOCAL; strcpy(mapidaddr.sun_path, mapidsocket); mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path); if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) { // construct socket (try global) strcpy(mapidaddr.sun_path, mapidsocketglobal); mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path); if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) { printf("ERROR: connect (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); } else { return 0; } } return -1; } void mapiipc_client_close() //Releases socket resources { close(sock); } #ifdef DIMAPI struct sockaddr_in remoteaddr; flist_t *remote_flowlist=NULL; sem_t stats_sem; int mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h){ // sends an IPC message to mapid #ifdef DIMAPISSL #ifdef RECONNECT if(SSL_write(h->con, dbuf, dbuf->length) <= 0){ printf("WARNING: SSL_write (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); sem_wait(&h->connection); // lock the semaphore } #else if(SSL_write(h->con, dbuf, dbuf->length) <= 0){ printf("WARNING: 
SSL_write (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } #endif #else #ifdef RECONNECT // MSG_NOSIGNAL : requests not to send SIGPIPE on errors on stream oriented sockets when the other end breaks the connection // need in mapi_get_next_packet() if(send(h->sockfd, dbuf, dbuf->length, MSG_NOSIGNAL) <= 0){ //WARNING_CMD( printf("send: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__) ); sem_wait(&h->connection); // lock the semaphore } #else if(send(h->sockfd, dbuf, dbuf->length, 0) == -1) { printf("WARNING: send (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } #endif #endif return 0; } int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow) { host_flow* hflow; flist_node_t* fnode; for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { hflow=(host_flow*)fnode->data; hflow->dbuf->fd=hflow->fd; if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) return -1; } for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) { sem_wait(&rflow->fd_sem); } return 0; } void cleanup_handler(void *arg){ //the cleanup handler free(arg); return; } void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call struct dmapiipcbuf* dbuf; remote_flowdescr_t* rflow; host_flow* hflow; int recv_bytes; struct mapi_stat *stat; flist_node_t* fnode; #ifdef RECONNECT int check_net; struct dmapiipcbuf dbuf_; #endif /* Guarantees that thread resources are deallocated upon return */ pthread_detach(pthread_self()); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // enable cancellation pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); // changes the type of responses to cancellation requests for the calling thread // asynchronous (cancel the calling thread as soon as the cancellation request is received) dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // pthread_cleanup_push() function pushes the specified cancellation cleanup handler onto // the 
cancellation cleanup stack of the calling thread. When a thread exits or is cancelled, // and its cancellation cleanup stack is not empty, the cleanup handlers are invoked with // the specified argument in last-in-first-out order from the cancellation cleanup stack pthread_cleanup_push(cleanup_handler, dbuf); while(1){ if(host == NULL) break; #ifdef DIMAPISSL recv_bytes = SSL_readn( ((struct host *) host)->con, dbuf, BASIC_SIZE); #else recv_bytes = readn( ((struct host *) host)->sockfd, dbuf, BASIC_SIZE); #endif if(recv_bytes == 0){ // the peer has gone #ifdef RECONNECT //printf("\n\t---> Mapicommd is down. Socket closed from host: %s\n", ((struct host *)host)->hostname); check_net = check_network_mapicommd((struct host *) host); if(check_net == 1) // network is up continue; else{ // network is down //printf("\nNetwork down ...\n"); restore_network_mapicommd((struct host *) host); dbuf_.cmd = IGNORE_SLEEP; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; mapi_recreate_flow((struct host *) host); mapi_reapply_function((struct host *) host); mapi_reconnect((struct host *) host); #ifdef WITH_AUTHENTICATION mapi_reauthenticate((struct host *) host); #endif dbuf_.cmd = IGNORE_NOTIFY; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; check_mapi_functions((struct host *) host); mapi_get_next_packet((struct host *) host); sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked sem_post(& ((struct host *) host)->connection); // unlock the semaphore continue; } #else check_for_read_results((struct host *) host); break; #endif } else if(recv_bytes == -1){ #ifdef RECONNECT //printf("\n\t---> Mapicommd is down. 
Socket closed from host: %s\n", ((struct host *)host)->hostname); check_net = check_network_mapicommd((struct host *) host); if(check_net == 1) // network is up continue; else{ // network is down //printf("\nNetwork down ...\n"); restore_network_mapicommd((struct host *) host); dbuf_.cmd = IGNORE_SLEEP; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; mapi_recreate_flow((struct host *) host); mapi_reapply_function((struct host *) host); mapi_reconnect((struct host *) host); #ifdef WITH_AUTHENTICATION mapi_reauthenticate((struct host *) host); #endif dbuf_.cmd = IGNORE_NOTIFY; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; check_mapi_functions((struct host *) host); mapi_get_next_packet((struct host *) host); sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked sem_post(& ((struct host *) host)->connection); // unlock the semaphore continue; } #else continue; #endif } if (dbuf->length > DIMAPI_DATA_SIZE) { printf("Bad IPC message from agent [%s:%d]\n", __FILE__, __LINE__); continue; } if (dbuf->length - BASIC_SIZE>0) { #ifdef DIMAPISSL recv_bytes = SSL_readn( ((struct host *) host)->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE ); #else recv_bytes = readn( ((struct host *) host)->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #endif if(recv_bytes == 0){ // the peer has gone #ifdef RECONNECT //printf("\n\t---> Mapicommd is down. 
Socket closed from host: %s\n", ((struct host *)host)->hostname); check_net = check_network_mapicommd((struct host *) host); if(check_net == 1) // network is up continue; else{ // network is down //printf("\nNetwork down ...\n"); restore_network_mapicommd((struct host *) host); dbuf_.cmd = IGNORE_SLEEP; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; mapi_recreate_flow((struct host *) host); mapi_reapply_function((struct host *) host); mapi_reconnect((struct host *) host); #ifdef WITH_AUTHENTICATION mapi_reauthenticate((struct host *) host); #endif dbuf_.cmd = IGNORE_NOTIFY; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; check_mapi_functions((struct host *) host); mapi_get_next_packet((struct host *) host); sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked sem_post(& ((struct host *) host)->connection); // unlock the semaphore continue; } #else check_for_read_results((struct host *) host); break; #endif } else if(recv_bytes == -1){ #ifdef RECONNECT //printf("\n\t---> Mapicommd is down. 
Socket closed from host: %s\n", ((struct host *)host)->hostname); check_net = check_network_mapicommd((struct host *) host); if(check_net == 1) // network is up continue; else{ // network is down //printf("\nNetwork down ...\n"); restore_network_mapicommd((struct host *) host); dbuf_.cmd = IGNORE_SLEEP; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; mapi_recreate_flow((struct host *) host); mapi_reapply_function((struct host *) host); mapi_reconnect((struct host *) host); #ifdef WITH_AUTHENTICATION mapi_reauthenticate((struct host *) host); #endif dbuf_.cmd = IGNORE_NOTIFY; dbuf_.length = BASIC_SIZE; if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd break; check_mapi_functions((struct host *) host); mapi_get_next_packet((struct host *) host); sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked sem_post(& ((struct host *) host)->connection); // unlock the semaphore continue; } #else continue; #endif } } if (dbuf->cmd==MAPI_STATS_ACK) { for (fnode=flist_head(((struct host*)host)->stats); fnode!=NULL; fnode=flist_next(fnode)) { stat=(struct mapi_stat*)fnode->data; if ( strcmp( ((struct mapi_stat*)dbuf->data)->dev, stat->dev)==0 ) { strncpy( ((struct mapi_stat*)dbuf->data)->hostname, stat->hostname, MAPI_STR_LENGTH ); memcpy(stat, dbuf->data, sizeof(struct mapi_stat)); flist_remove(((struct host*)host)->stats, fnode->id); break; } } sem_post( &stats_sem ); continue; } else if (dbuf->cmd==MAPI_STATS_ERR) { sem_post( &stats_sem ); continue; } hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd ); if (hflow!=NULL) { rflow=flist_get(remote_flowlist, hflow->scope_fd); if (dbuf->cmd==GET_NEXT_PKT_ACK) { if (dbuf->length == BASIC_SIZE) { hflow->pkt->caplen = 0; } memcpy(hflow->pkt, dbuf->data, dbuf->length-BASIC_SIZE); flist_append(rflow->pkt_list, 0, hflow); 
sem_post(&rflow->pkt_sem); } #ifdef RECONNECT else if(dbuf->cmd == READ_RESULT_ACK_RECONNECT) rflow->daemons_down = 0; // mapid is up and runnung ... #endif else { memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data sem_post( &rflow->fd_sem ); #ifdef RECONNECT if(dbuf->cmd == ERROR_ACK){ void *error_num = (void *)malloc(sizeof(int)); memcpy(error_num, dbuf->data, dbuf->length - BASIC_SIZE); if(*((int *) error_num) == MAPI_READ_RESULT_RECONNECTION){ free(error_num); rflow->daemons_down = 1; // mapid is down ... } } #endif } } else { printf("Invalid IPC message, unknown fd %d [%s:%d]\n", dbuf->fd, __FILE__, __LINE__); continue; } } pthread_cleanup_pop(1); // pthread_cleanup_pop() function shall remove the routine at the top of the // calling thread's cancellation cleanup stack and invoke it return NULL; } int mapiipc_remote_init(struct host *h) //Initializes IPC for dmapi functions { struct hostent* host=gethostbyname(h->hostname); struct timeval tv; #ifdef DIMAPISSL SSL_library_init(); // registers the available ciphers and digests SSL_load_error_strings(); // registers the error strings for all libcrypto functions and libssl if ((ctx=SSL_CTX_new(SSLv3_client_method())) == NULL) { ERR_print_errors_fp(stderr); return -1; } if ((h->con = SSL_new(ctx)) == NULL) { ERR_print_errors_fp(stderr); return -1; } signal(SIGPIPE, sigpipe_handle); // catch SIGPIPE signal (SSL_write), in case of reconnection ... 
#endif if ((h->sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { printf("ERROR: socket (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } tv.tv_sec=10; //timeout 10 sec for send tv.tv_usec=0; if (setsockopt(h->sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval)) == -1) { close(h->sockfd); printf("ERROR: Unexpected error on setsockopt() [%s:%d]\n", __FILE__, __LINE__); return -1; } if (host==NULL) { printf("ERROR: Could not determine address for host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__); return -1; } // Construct name of dmapid's socket remoteaddr.sin_family = AF_INET; remoteaddr.sin_addr = *((struct in_addr *)host->h_addr); remoteaddr.sin_port = htons(h->port); if (connect(h->sockfd, (struct sockaddr *)&remoteaddr, sizeof(remoteaddr)) < 0) { printf("ERROR: connect failed (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } #ifdef DIMAPISSL if (SSL_set_fd(h->con, h->sockfd) == 0) { ERR_print_errors_fp(stderr); return -1; } if (SSL_connect(h->con) <= 0) { ERR_print_errors_fp(stderr); return -1; } #endif return 0; } void mapiipc_remote_close(struct host *h) //Releases socket resources { shutdown(h->sockfd, SHUT_RDWR); close(h->sockfd); #ifdef DIMAPISSL if (SSL_shutdown(h->con) == -1) // shut down a TLS/SSL connection ERR_print_errors_fp(stderr); SSL_free(h->con); // decrements the reference count of ssl, and removes the SSL structure pointed to by ssl // frees up the allocated memory if the the reference count has reached 0 if(ctx != NULL) SSL_CTX_free(ctx); // decrements the reference count of ctx, and removes the SSL_CTX object pointed to by ctx // frees up the allocated memory if the the reference count has reached 0 ERR_remove_state(0); // the current thread will have its error queue removed ERR_free_strings(); // frees all previously loaded error strings EVP_cleanup(); // removes all ciphers and digests from the table CRYPTO_cleanup_all_ex_data(); // clean up all allocated state #endif } void 
check_for_read_results(struct host *h){ int fd_sem_value, errno; struct dmapiipcbuf *dbuf = NULL; flist_node_t *fnode = NULL; host_flow *hflow = NULL; remote_flowdescr_t *rflow = NULL; for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){ #ifndef RECONNECT sleep(1); // XXX #else rflow->daemons_down = 1; #endif if(sem_getvalue(&rflow->fd_sem, &fd_sem_value) == -1){ printf("ERROR: sem_getvalue() failed (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); break; } if(fd_sem_value == 0 && hflow->dbuf->cmd == READ_RESULT){ dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); errno = MAPI_READ_RESULT_RECONNECTION; dbuf->cmd = ERROR_ACK; memcpy(dbuf->data, &errno, sizeof(int)); dbuf->length = BASIC_SIZE + sizeof(int); memcpy(hflow->dbuf, dbuf, dbuf->length); sem_post(&rflow->fd_sem); // critical point ... free(dbuf); break; } } } return; } #endif /* DIMAPI */ // Helper functions for function arguments retrieval int getargint(mapiFunctArg **pos){ int i; i = *((int *)(*pos)); // printf("getint: %d\n", i); (*pos) += sizeof(int); return i; } char getargchar(mapiFunctArg **pos){ char c; c = *((char *)(*pos)); //printf("getchar: %c\n", c); (*pos) += sizeof(char); return c; } unsigned long long getargulonglong(mapiFunctArg **pos){ unsigned long long l; l = *((unsigned long long *)(*pos)); //printf("getulonglong: %lld\n", l); (*pos) += sizeof(unsigned long long); return l; } char * getargstr(mapiFunctArg **pos){ char *s; s = (char*)*pos; //printf("getstr: %s\n", s); (*pos) += strlen(s)+1; return s; } void addarg(mapiFunctArg **pos, void *arg, int type) // Helper function for mapi_apply_function() // pos: current position in message's argument buffer. 
// arg: argument to copy into buffer // type: argument type { switch(type){ case INT: memcpy(*pos, arg, sizeof(int)); //printf("add_arg: %d\n", *((int *)(*pos))); (*pos) += sizeof(int); break; case CHAR: memcpy(*pos, arg, sizeof(char)); //printf("add_arg: %d\n", *((char *)(*pos))); (*pos) += sizeof(char); break; case UNSIGNED_LONG_LONG: memcpy(*pos, arg, sizeof(unsigned long long)); //printf("add_arg: %llu\n", *((unsigned long long *)(*pos))); (*pos) += sizeof(unsigned long long); break; case STRING: memcpy(*pos, arg, strlen((char *)arg)+1); //printf("add_arg: %s\n", (char *)(*pos)); (*pos) += strlen((char *)arg)+1; break; default: break; } } int mapiipc_send_fd(int sendfd) { struct msghdr msg; struct iovec iov[1]; char ptr[2]; int ret; #ifdef HAVE_MSGHDR_MSG_CONTROL union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); cmptr = CMSG_FIRSTHDR(&msg); cmptr->cmsg_len = CMSG_LEN(sizeof(int)); cmptr->cmsg_level = SOL_SOCKET; cmptr->cmsg_type = SCM_RIGHTS; *((int *) CMSG_DATA(cmptr)) = sendfd; #else msg.msg_accrights = (caddr_t) &sendfd; msg.msg_accrightslen = sizeof(int); #endif iov[0].iov_base = ptr; iov[0].iov_len = 2; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; ret=sendmsg(sock,&msg,0); return(ret); } int mapiipc_read_fd(int sock) { struct msghdr msg; struct iovec iov[1]; ssize_t n; int recvfd; char c[2]; #ifdef HAVE_MSGHDR_MSG_CONTROL union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr *cmptr; msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); #else msg.msg_accrights = (caddr_t) &newfd; msg.msg_accrightslen = sizeof(int); #endif iov[0].iov_base = &c; iov[0].iov_len = 2; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; if ( (n = recvmsg(sock, &msg, 0)) <= 0) return(n); #ifdef 
/*
 * Receives a descriptor passed over the socket sock, together with its two
 * byte payload.
 * Returns the received descriptor, or -1 when recvmsg() fails, the peer has
 * closed the connection, or no valid descriptor was attached.
 */
int mapiipc_read_fd(int sock)
{
	struct msghdr msg;
	struct iovec iov[1];
	ssize_t n;
	int recvfd;
	char c[2];
#ifdef HAVE_MSGHDR_MSG_CONTROL
	union {
		struct cmsghdr cm;
		char control[CMSG_SPACE(sizeof(int))];
	} control_un;
	struct cmsghdr *cmptr;
#else
	int newfd = -1;
#endif

	memset(&msg, 0, sizeof msg);

#ifdef HAVE_MSGHDR_MSG_CONTROL
	msg.msg_control = control_un.control;
	msg.msg_controllen = sizeof(control_un.control);
#else
	msg.msg_accrights = (caddr_t) &newfd;
	msg.msg_accrightslen = sizeof(int);
#endif

	iov[0].iov_base = c;
	iov[0].iov_len = sizeof c;
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;

	n = recvmsg(sock, &msg, 0);
	if(n <= 0)
		return -1;	// 0 (EOF) must not be handed back: it would read as descriptor 0

#ifdef HAVE_MSGHDR_MSG_CONTROL
	cmptr = CMSG_FIRSTHDR(&msg);
	if(cmptr != NULL && cmptr->cmsg_len == CMSG_LEN(sizeof(int))){
		if(cmptr->cmsg_level != SOL_SOCKET){
			printf("ERROR: control level != SOL_SOCKET [%s:%d]\n", __FILE__, __LINE__);
			return -1;
		}
		if(cmptr->cmsg_type != SCM_RIGHTS){
			printf("ERROR: control type != SCM_RIGHTS [%s:%d]\n", __FILE__, __LINE__);
			return -1;
		}
		memcpy(&recvfd, CMSG_DATA(cmptr), sizeof(int));
	}
	else
		recvfd = -1;	/* descriptor was not passed */
#else
	if(msg.msg_accrightslen == sizeof(int))
		recvfd = newfd;
	else
		recvfd = -1;	/* descriptor was not passed */
#endif

	return recvfd;
}

/*
 * Reads exactly n bytes from descriptor fd into vptr, restarting reads that
 * were interrupted by a signal.
 * Returns n when everything was read, 0 as soon as end-of-file is seen (even
 * after a partial read) and -1 on a read error.
 */
ssize_t readn(int fd, void *vptr, size_t n)
{
	char *cursor = vptr;
	size_t remaining = n;

	while(remaining > 0){
		ssize_t got;

		errno = 0;
		got = read(fd, cursor, remaining);

		if(got == 0)
			return 0;		/* EOF */

		if(got < 0){
			if(errno != EINTR)
				return -1;
			continue;		/* interrupted: call read() again */
		}

		remaining -= (size_t)got;
		cursor += got;
	}

	return (ssize_t)(n - remaining);	/* return >= 0 */
}

#ifdef DIMAPISSL
/*
 * Same contract as readn(), for a TLS/SSL connection.
 * NOTE(review): errno is consulted after SSL_read() fails, as in readn();
 * SSL_get_error() is the documented way to classify the failure -- confirm
 * whether the EINTR retry is ever taken.
 */
ssize_t SSL_readn(SSL *con, void *vptr, size_t n)
{
	char *cursor = vptr;
	size_t remaining = n;

	while(remaining > 0){
		ssize_t got;

		errno = 0;
		got = SSL_read(con, cursor, (int)remaining);

		if(got == 0)
			return 0;		/* EOF */

		if(got < 0){
			if(errno != EINTR)
				return -1;
			continue;		/* interrupted: call SSL_read() again */
		}

		remaining -= (size_t)got;
		cursor += got;
	}

	return (ssize_t)(n - remaining);	/* return >= 0 */
}

/*
 * SIGPIPE handler: reports the signal and returns, so that the failing write
 * sees an error instead of the process being killed.
 */
void sigpipe_handle()
{
	// printf() is not async-signal-safe; write() is
	static const char notice[] = "Broken pipe (signal handler) ...\n";

	if(write(STDOUT_FILENO, notice, sizeof notice - 1) < 0){
		/* nothing sensible can be done about it in a signal handler */
	}
}
#endif
__LINE__); return -1; } // construct name of dmapid's socket remote_address.sin_family = AF_INET; // address family remote_address.sin_addr = *((struct in_addr *)host->h_addr); remote_address.sin_port = htons(h->port); // port number (2233) if(connect(sockfd, (struct sockaddr *)&remote_address, sizeof(remote_address)) < 0) // remote mapid server is down ... return 0; shutdown(sockfd, SHUT_RDWR); close(sockfd); return 1; } // this function restores the connection to mapicommd in case of a breakdown, using back-off mechanism void restore_network_mapicommd(struct host *h){ int tries; // how many times will we try to reconnect int time = 1; // initial waiting time int check_net; for(tries = 0; ; tries++){ if(tries == 1) // in case of read_results return NULL to application and then continue calling reconnection functions ... check_for_read_results(h); //printf("\n---> Reconnection try #%d", tries); check_net = check_network_mapicommd(h); fflush(stdout); if(check_net == 1){ // network is now up ... 
//printf("\nMapid server is back online ...\n"); mapiipc_remote_close(h); // release previous socket resources mapiipc_remote_init(h); // initializes IPC for DiMAPI functions return; } sleep(time); // pause execution for the specified time if (tries < 8) time += time; // increase waiting time } //fprintf(stderr,"\nNetwork is down (reconnection tries expired)\n"); exit(EXIT_FAILURE); } // this function recreates all flows, that host which broke down the connection had created void mapi_recreate_flow(struct host *h){ flist_node_t *fnode; flist_t *tmp_list = NULL; // all created flows from this host host_flow *hflow; struct dmapiipcbuf *dbuf = NULL; int recv_bytes; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // buffer to store messages that are sent/received to/from mapicommd tmp_list = (flist_t *)malloc(sizeof(flist_t)); flist_init(tmp_list); for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; // data stored in the node dbuf->cmd = CREATE_FLOW; strncpy((char *) dbuf->data, hflow->dev, DATA_SIZE); dbuf->length = BASIC_SIZE + strlen(hflow->dev) + 1; if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, dbuf, BASIC_SIZE); #else recv_bytes = readn(h->sockfd, dbuf, BASIC_SIZE); // receive an IPC message from mapicommd #endif if(dbuf->length - BASIC_SIZE > 0){ // TODO check recv_bytes #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #else recv_bytes = readn(h->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #endif } if(dbuf->cmd == CREATE_FLOW_ACK){ //printf("\n\t New fd = %d, Previous fd = %d\n", *(int *)(dbuf->data), hflow->fd); hflow->dbuf->fd = *(int *)(dbuf->data); // FIXME hflow->fd = *((int *)dbuf->data); // change previous fd with the new one 
flist_append(tmp_list, hflow->fd, hflow); // append new node to the tmp list } else{ printf("\nERROR: Could not re-create flow in host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__); free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } free(dbuf); dbuf = NULL; flist_destroy(h->flows); // destroy old flow list free(h->flows); h->flows = tmp_list; // now flow list is the tmp list return; } // this function reapplies all functions, that host which broke down the connection had applyed void mapi_reapply_function(struct host *h){ flist_node_t *fnode; host_flow *hflow; function_data *fdata, *fdata_; remote_flowdescr_t* ref_flow; mapiFunctArg *pos; struct dmapiipcbuf *dbuf = NULL; struct mapiipcbuf qbuf = {-1, -1, -1, -1, -1, "", -1, -1, -1, 0, "", "", -1}; int recv_bytes, tmp, fid_, fd_, tmp_fd, tmp_fid, len = 0; char *argdescr_ptr, *temp, *filename, ctmp; char buf[DATA_SIZE], *fids, *cfids, *s, *new_fids; unsigned char *args; unsigned long long ltmp; unsigned int args_size; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // buffer to store messages that are sent/received to/from mapicommd for(fnode = flist_head(h->functions); fnode != NULL; fnode = flist_next(fnode)){ // iterate function list of the specified host fdata = (function_data *)fnode->data; // data stored in the node hflow = fdata->hflow; //printf("\nFunction name = %s\n", fdata->fdef->name); dbuf->cmd = APPLY_FUNCTION; dbuf->fd = hflow->fd; // the new fd of the flow memcpy(dbuf->data, fdata->fdef->name, strlen(fdata->fdef->name) + 1); // put function name in the buffer pos = qbuf.data; // point to start of arguments buffer args_size = 0; // arguments size (number of bytes) // parse function arguments (need again, because some functions have arguments that reference to other flows and functions) if(strncmp(fdata->fdef->argdescr, "", 1)){ // there are some arguments args = (unsigned char *)fdata->args; argdescr_ptr = fdata->fdef->argdescr; while(strlen(argdescr_ptr) > 0){ 
switch(*argdescr_ptr){ case 's': temp = (char *)args; args += strlen(temp) + 1; addarg(&pos, temp, STRING); args_size += strlen(temp) + 1; break; case 'S': // reference to flows and functions (e.g RES2FILE) fids = (char *)args; args += strlen(fids) + 1; // allocate more bytes, because we can have the following case: 7@2,8@2,9@2 translated to 12@3,13@3,14@3 new_fids = (char *)malloc(strlen(fids) * 10 * sizeof(char)); strncpy(buf, fids, DATA_SIZE); cfids = buf; while( (s = strchr(cfids, ',')) != NULL){ *s = '\0'; sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, fdata->index))->fd; fdata_ = flist_get(hflow->rhost->functions, fid_); tmp_fid = fdata_->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); cfids = s + 1; } sscanf(cfids, "%d@%d", &fid_, &fd_); ref_flow = flist_get(remote_flowlist, fd_); tmp_fd = ((host_flow *)flist_get(ref_flow->host_flowlist, fdata->index))->fd; fdata_ = flist_get(hflow->rhost->functions, fid_); tmp_fid = fdata_->fid; if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", tmp_fid); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", tmp_fd); addarg(&pos, new_fids, STRING); args_size += strlen(new_fids) + 1; len = 0; free(new_fids); new_fids = NULL; break; case 'i': memcpy(&tmp, args, sizeof(int)); args += sizeof(int); addarg(&pos, &tmp, INT); args_size += sizeof(int); break; case 'c': memcpy(&ctmp, args, sizeof(char)); args += sizeof(char); addarg(&pos, &ctmp, CHAR); args_size += sizeof(char); break; case 'l': memcpy(<mp, args, sizeof(unsigned long long)); args += sizeof(unsigned long long); addarg(&pos, <mp, UNSIGNED_LONG_LONG); args_size += sizeof(unsigned long long); break; case 'w': filename = (char *)args; args += strlen(filename) + 1; addarg(&pos, filename, 
STRING); args_size += strlen(filename) + 1; break; case 'r': // reference to a flow memcpy(&tmp, args, sizeof(int)); args += sizeof(int); ref_flow = flist_get(remote_flowlist, tmp); tmp = ((host_flow *)flist_get(ref_flow->host_flowlist, fdata->index))->fd; addarg(&pos, &tmp, INT); args_size += sizeof(int); break; case 'f': // reference to a fuction memcpy(&tmp, args, sizeof(int)); args += sizeof(int); fdata_ = flist_get(hflow->rhost->functions, tmp); tmp = fdata_->fid; addarg(&pos, &tmp, INT); args_size += sizeof(int); break; } argdescr_ptr++; // move to the next argument } } memcpy(dbuf->data + strlen(fdata->fdef->name) + 1, qbuf.data, args_size); // put function arguments in the buffer dbuf->length = BASIC_SIZE + strlen(fdata->fdef->name) + 1 + args_size; if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, dbuf, BASIC_SIZE); #else recv_bytes = readn(h->sockfd, dbuf, BASIC_SIZE); // receive an IPC message from mapicommd #endif if(dbuf->length - BASIC_SIZE > 0){ // TODO check recv_bytes #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #else recv_bytes = readn(h->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #endif } if(dbuf->cmd == APPLY_FUNCTION_ACK){ //printf("\n\t APPLY OK !! 
New fid = %d, Previous fid = %d", dbuf->fid, fdata->fid); if(hflow->dbuf->fid == fdata->fid) hflow->dbuf->fid = dbuf->fid; // FIXME fdata->fid = dbuf->fid; // change previous fid with the new one (IMPORTANT) } else{ printf("\nERROR: Could not re-apply function %s in host %s [%s:%d]\n", fdata->fdef->name, hflow->rhost->hostname, __FILE__, __LINE__); free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } free(dbuf); dbuf = NULL; return; } // this function checks which flows of the specified host, had called mapi_connect() function void mapi_reconnect(struct host *h){ flist_node_t *fnode; remote_flowdescr_t* rflow; host_flow *hflow; struct dmapiipcbuf *dbuf = NULL; int recv_bytes; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // buffer to store messages that are sent/received to/from mapicommd for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; // data stored in the node if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){ if(rflow->is_connected){ // flow is already connected, via mapi_connect() function //printf("\n flow %d is connected\n", hflow->scope_fd); dbuf->cmd = CONNECT; dbuf->fd = hflow->fd; dbuf->length = BASIC_SIZE; if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, dbuf, BASIC_SIZE); #else recv_bytes = readn(h->sockfd, dbuf, BASIC_SIZE); // receive an IPC message from mapicommd #endif if(dbuf->length - BASIC_SIZE > 0){ // TODO check recv_bytes #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #else recv_bytes = readn(h->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #endif } if(dbuf->cmd == CONNECT_ACK){ //printf("\n Flow %d is now connected !!\n", hflow->fd); rflow->is_connected = 1; // FIXME rflow->daemons_down = 0; } else{ 
printf("ERROR: Could not re-connect to flow %d [%s:%d]\n", hflow->scope_fd, __FILE__, __LINE__); rflow->is_connected = 0; // FIXME free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } /*else{ // flow is not connected printf("\n flow %d is not connected\n", hflow->scope_fd); // delete }*/ } else // FIXME return; } free(dbuf); dbuf = NULL; return; } #ifdef WITH_AUTHENTICATION // this function checks which flows of the specified host, had called mapi_authenticate() function void mapi_reauthenticate(struct host *h){ flist_node_t *fnode; remote_flowdescr_t* rflow; host_flow *hflow; struct dmapiipcbuf *dbuf = NULL; int recv_bytes; int length; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // buffer to store messages that are sent/received to/from mapicommd for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; // data stored in the node if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){ if(rflow->is_authenticated == 1){ //printf("\n flow %d is authenticated\n", hflow->scope_fd); length = 0; dbuf->cmd = AUTHENTICATE; dbuf->fd = hflow->fd; length += sprintf(dbuf->data + length, "%s", rflow->username) + 1; length += sprintf(dbuf->data + length, "%s", rflow->password) + 1; length += sprintf(dbuf->data + length, "%s", rflow->vo) + 1; dbuf->length = BASIC_SIZE + length; if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, dbuf, BASIC_SIZE); #else recv_bytes = readn(h->sockfd, dbuf, BASIC_SIZE); // receive an IPC message from mapicommd #endif if(dbuf->length - BASIC_SIZE > 0){ // TODO check recv_bytes #ifdef DIMAPISSL recv_bytes = SSL_readn(h->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #else recv_bytes = readn(h->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE); #endif } if(dbuf->cmd == 
AUTHENTICATE_ACK){ //printf("\n Flow %d is now authenticated !!\n", hflow->fd); rflow->is_authenticated = 1; // FIXME } else{ printf("ERROR: Could not re-authenticate flow %d [%s:%d]\n", hflow->scope_fd, __FILE__, __LINE__); rflow->is_authenticated = 0; // FIXME free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } /*else{ // flow is not authenticated printf("\n flow %d is not authenticated\n", hflow->scope_fd); // delete }*/ } else // FIXME return; } free(dbuf); dbuf = NULL; return; } #endif // this function checks all functions that can be called, when a break down happens void check_mapi_functions(struct host *h){ flist_node_t *fnode; host_flow *hflow; struct dmapiipcbuf *dbuf = NULL; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // buffer to store messages that are sent to mapicommd for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; // data stored in the node if(hflow->dbuf->cmd == READ_RESULT || hflow->dbuf->cmd == GET_FLOW_INFO || hflow->dbuf->cmd == GET_NEXT_FLOW_INFO || hflow->dbuf->cmd == GET_FUNCTION_INFO || hflow->dbuf->cmd == GET_NEXT_FUNCTION_INFO || hflow->dbuf->cmd == GET_DEVICE_INFO || hflow->dbuf->cmd == GET_NEXT_DEVICE_INFO || hflow->dbuf->cmd == CREATE_FLOW || hflow->dbuf->cmd == CLOSE_FLOW || hflow->dbuf->cmd == CONNECT || hflow->dbuf->cmd == APPLY_FUNCTION || hflow->dbuf->cmd == AUTHENTICATE){ memcpy(dbuf, hflow->dbuf, hflow->dbuf->length); if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } } free(dbuf); dbuf = NULL; return; } // this function checks which flows of the specified host, had called mapi_get_next_pkt() function void mapi_get_next_packet(struct host *h){ flist_node_t *fnode; remote_flowdescr_t* rflow; host_flow *hflow; function_data* fdata; struct dmapiipcbuf *dbuf = NULL; dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct 
dmapiipcbuf)); // buffer to store messages that are sent to mapicommd for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host hflow = (host_flow *)fnode->data; // data stored in the node if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){ if(rflow->pkt_list != NULL){ // mapi_get_next_pkt() was called for this flow dbuf->cmd = GET_NEXT_PKT; dbuf->fd = hflow->fd; if( (fdata = (function_data *)flist_get(hflow->functions, rflow->to_buffer_fid)) == NULL){ // to_buffer() function ... printf("ERROR: Probably invalid fid in mapi_get_next_packet() [%s:%d]\n", __FILE__, __LINE__); free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } dbuf->fid = fdata->fid; dbuf->length = BASIC_SIZE; if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){ // send an IPC message to mapicommd free(dbuf); dbuf = NULL; exit(EXIT_FAILURE); } } } else // FIXME return; } free(dbuf); dbuf = NULL; return; } #endif /* *** MAPID *** */ // this function checks if network (mapid) is up // returns 1 if network is up or 0 if network is down int check_network_mapid(void){ int sockfd, mapidaddr_len_; char *mapidsocket_; struct sockaddr_un mapidaddr_; if((sockfd = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1){ // local communication printf("ERROR: socket (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__); return -1; } mapidsocket_ = strdup(MAPIDSOCKGLOBAL); // Construct name of mapid's socket mapidaddr_.sun_family = AF_LOCAL; // address family: Unix socket strcpy(mapidaddr_.sun_path, mapidsocket_); // file name to use mapidaddr_len_ = sizeof mapidaddr_.sun_family + strlen(mapidaddr_.sun_path); if(connect(sockfd, (struct sockaddr *)&mapidaddr_, mapidaddr_len_) < 0){ // mapid is down free(mapidsocket_); return 0; } free(mapidsocket_); close(sockfd); return 1; } // this function restores the connection to mapid in case of a breakdown, using back-off mechanism void restore_network_mapid(void){ int tries; // how many times will we try to reconnect 
int time = 1; // initial waiting time int check_net; for(tries = 0; ; tries++){ //printf("\n---> Reconnection try #%d", tries); check_net = check_network_mapid(); fflush(stdout); if(check_net == 1){ // network is now up ... //printf("\nMapid is back online ...\n\n"); mapiipc_client_close(); // release previous socket resources mapiipc_client_init(); // initializes IPC for mapi functions return; } sleep(time); // pause execution for the specified time if (tries < 8) time += time; // increase waiting time } //fprintf(stderr,"\nNetwork is down (reconnection tries expired)\n"); exit(EXIT_FAILURE); } // this function recreates all offline devices, that have been created and not deleted void mapi_create_offline_device_mapid(void){ flist_node_t *fnode, *fnode_; offline_device *device = NULL; flowdescr_t *flow; struct mapiipcbuf qbuf; int file; for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){ device = (offline_device *)fnode->data; // data stored in the node if( (file = open(device->path, O_LARGEFILE)) == -1) exit(EXIT_FAILURE); qbuf.mtype = 1; qbuf.cmd = CREATE_OFFLINE_DEVICE; qbuf.fd = getpid(); qbuf.pid = getpid(); qbuf.fid = device->format; strncpy((char *) qbuf.data, device->path, DATA_SIZE); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == SEND_FD){ if(mapiipc_send_fd(file) == -1) exit(EXIT_FAILURE); } else exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == CREATE_OFFLINE_DEVICE_ACK){ //printf("\n\t New device = %s, Previous device = %s\n", (char *)qbuf.data, device->previous_device); for(fnode_ = flist_head(flowlist); fnode_ != NULL; fnode_ = flist_next(fnode_)){ // iterate list with all active flows flow = (flowdescr_t *)fnode_->data; // data stored in the 
node if(!strcmp(flow->device, device->previous_device)) // change previous device name with the new one (mapi_create_flow()) strcpy(flow->device, (char *) qbuf.data); // XXX } strcpy(device->new_device, (char *) qbuf.data); // XXX } else{ printf("\nERROR: Could not re-create offline device %s [%s:%d]\n", device->path, __FILE__, __LINE__); exit(EXIT_FAILURE); } } return; } // this function recreates all flows, on-line and off-line, that have been created and not closed void mapi_recreate_flow_mapid(void){ flist_node_t *fnode; flowdescr_t *flow; struct mapiipcbuf qbuf; int file; // need for offline flows for(fnode = flist_head(flowlist); fnode != NULL; fnode = flist_next(fnode)){ // iterate list with all active flows flow = (flowdescr_t *)fnode->data; // data stored in the node if(flow->file == -1){ // ON-LINE flow ... qbuf.mtype = 1; qbuf.cmd = CREATE_FLOW; qbuf.fd = getpid(); // get process identification qbuf.pid = getpid(); strncpy((char *) qbuf.data, flow->device, DATA_SIZE); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == CREATE_FLOW_ACK){ //printf("\n\t New fd = %d, Previous fd = %d\n", qbuf.fd, flow->fd); flow->fd = qbuf.fd; // the new fd of the flow strcpy(flow->devtype, (char *) qbuf.data); // XXX } else{ printf("\nERROR: Could not re-create flow with device %s [%s:%d]\n", flow->device, __FILE__, __LINE__); exit(EXIT_FAILURE); } } else{ // OFF-LINE flow ... 
if( (file = open(flow->device, O_LARGEFILE)) == -1) exit(EXIT_FAILURE); qbuf.mtype = 1; qbuf.cmd = CREATE_OFFLINE_FLOW; qbuf.fd = getpid(); qbuf.pid = getpid(); qbuf.fid = flow->format; strncpy((char *) qbuf.data, flow->device, DATA_SIZE); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == SEND_FD){ if(mapiipc_send_fd(file) == -1) exit(EXIT_FAILURE); } else exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == CREATE_OFFLINE_FLOW_ACK){ //printf("\n\t New fd = %d, Previous fd = %d \t off-line\n", qbuf.fd, flow->fd); flow->fd = qbuf.fd; // the new fd of the flow strcpy(flow->devtype, (char *) qbuf.data); // XXX flow->file = file; // new file descriptor of offline flow } else{ printf("\nERROR: Could not re-create offline flow with device %s [%s:%d]\n", flow->device, __FILE__, __LINE__); exit(EXIT_FAILURE); } } } return; } // this function reapplies all functions, that have been applyed to flows that have not been closed void mapi_reapply_function_mapid(void){ flist_node_t *fnode; functdescr_t *fun, *fun_; mapiFunctArg *pos; flowdescr_t *flow; struct mapiipcbuf qbuf = {-1, -1, -1, -1, -1, "", -1, -1, -1, 0, "", "", -1}; unsigned char *args; char *argdescr_ptr, *temp, *fids, *new_fids, *cfids, *s, ctmp, buf[DATA_SIZE]; unsigned long long ltmp; int c, tmp, fid_, fd_, len = 0; for(fnode = flist_head(function_list); fnode != NULL; fnode = flist_next(fnode)){ // iterate function list fun = (functdescr_t *)fnode->data; // data stored in the node pos = qbuf.data; // point to start of arguments buffer if(strncmp(fun->def->argdescr, "", 1)){ // there are some arguments args = (unsigned char *)fun->funct->instance->args; argdescr_ptr = fun->def->argdescr; while(strlen(argdescr_ptr) > 0){ switch(*argdescr_ptr){ case 
's': temp = (char *)args; args += strlen(temp) + 1; addarg(&pos, temp, STRING); break; case 'S': // reference to flows and functions (e.g RES2FILE) fids = (char *)args; args += strlen(fids) + 1; new_fids = (char *)malloc(strlen(fids) * 10 * sizeof(char)); strncpy(buf, fids, DATA_SIZE); cfids = buf; while( (s = strchr(cfids, ',')) != NULL){ *s = '\0'; sscanf(cfids, "%d@%d", &fid_, &fd_); if((flow = flist_get(flowlist, fd_)) == NULL){ // search list using previous fd printf("ERROR: Probably invalid flow in re-apply function [%s:%d]\n", __FILE__, __LINE__); free(new_fids); new_fids = NULL; exit(EXIT_FAILURE); } fd_ = flow->fd; // new fd if((fun_ = flist_get(function_list, fid_)) == NULL){ // search list using previous fid printf("ERROR: Probably invalid fid in re-apply function [%s:%d]\n", __FILE__, __LINE__); free(new_fids); new_fids = NULL; exit(EXIT_FAILURE); } fid_ = fun_->fid; // new fid if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", fid_); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", fd_); cfids = s + 1; } sscanf(cfids, "%d@%d", &fid_, &fd_); if((flow = flist_get(flowlist, fd_)) == NULL){ // search list using previous fd printf("ERROR: Probably invalid flow in re-apply function [%s:%d]\n", __FILE__, __LINE__); free(new_fids); new_fids = NULL; exit(EXIT_FAILURE); } fd_ = flow->fd; // new fd if((fun_ = flist_get(function_list, fid_)) == NULL){ // search list using previous fid printf("ERROR: Probably invalid fid in re-apply function [%s:%d]\n", __FILE__, __LINE__); free(new_fids); new_fids = NULL; exit(EXIT_FAILURE); } fid_ = fun_->fid; // new fid if(len != 0) len += sprintf(new_fids + len, ","); len += sprintf(new_fids + len, "%d", fid_); len += sprintf(new_fids + len, "@"); len += sprintf(new_fids + len, "%d", fd_); addarg(&pos, new_fids, STRING); len = 0; free(new_fids); new_fids = NULL; break; case 'i': memcpy(&tmp, args, sizeof(int)); args += sizeof(int); addarg(&pos, &tmp, INT); break; case 
'c': memcpy(&ctmp, args, sizeof(char)); args += sizeof(char); addarg(&pos, &ctmp, CHAR); break; case 'l': memcpy(<mp, args, sizeof(unsigned long long)); args += sizeof(unsigned long long); addarg(&pos, <mp, UNSIGNED_LONG_LONG); break; case 'r': // reference to a flow memcpy(&tmp, args, sizeof(int)); args += sizeof(int); if((flow = flist_get(flowlist, tmp)) == NULL){ // search list using previous fd printf("ERROR: Probably invalid flow in re-apply function [%s:%d]\n", __FILE__, __LINE__); exit(EXIT_FAILURE); } tmp = flow->fd; // new fd addarg(&pos, &tmp, INT); break; case 'f': // reference to a fuction memcpy(&tmp, args, sizeof(int)); args += sizeof(int); if((fun_ = flist_get(function_list, tmp)) == NULL){ // search list using previous fid printf("ERROR: Probably invalid fid in re-apply function [%s:%d]\n", __FILE__, __LINE__); exit(EXIT_FAILURE); } tmp = fun_->fid; // new fid addarg(&pos, &tmp, INT); break; case 'w': // XXX memcpy(&tmp, args, sizeof(int)); args += sizeof(int); addarg(&pos, &tmp, INT); break; } argdescr_ptr++; // move to the next argument } } qbuf.mtype = 1; qbuf.cmd = APPLY_FUNCTION; qbuf.fd = fun->flow->fd; qbuf.pid = getpid(); strncpy(qbuf.function, fun->def->name, FUNCT_NAME_LENGTH); // function's name strncpy((char *) qbuf.argdescr, fun->def->argdescr, ARG_LENGTH); // arguments descriptor if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); for(c = 0; c < fun->numfd; c++){ if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd != SEND_FD) exit(EXIT_FAILURE); if(mapiipc_send_fd(fun->flow->fds[c]) == -1) exit(EXIT_FAILURE); } if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == APPLY_FUNCTION_ACK){ //printf("\n+++\t APPLY OK (%s)!! 
New fid = %d, Previous fid = %d\n", fun->def->name, qbuf.fid, fun->fid); fun->fid = qbuf.fid; // change previous fid with the new one fun->funct->fid = qbuf.fid; //memcpy(fun->funct->instance->args, qbuf.data, FUNCTARGS_BUF_SIZE); // copy arguments } else{ printf("\nERROR: Could not re-apply function %s [%s:%d]\n", fun->def->name, __FILE__, __LINE__); exit(EXIT_FAILURE); } } //flist_destroy(function_list); // destroy list (remove all nodes) XXX return; } // this function checks which flows, had called mapi_connect() function and not mapi_close_flow() void mapi_reconnect_mapid(void){ flist_node_t *fnode; flowdescr_t *flow; struct mapiipcbuf qbuf; for(fnode = flist_head(flowlist); fnode != NULL; fnode = flist_next(fnode)){ // iterate list with all active flows flow = (flowdescr_t *)fnode->data; // data stored in the node if(flow->is_connected){ // flow is already connected, via mapi_connect() function //printf("\n flow %d is connected\n", flow->fd); qbuf.mtype = 1; qbuf.cmd = CONNECT; qbuf.fd = flow->fd; // new fd of the flow qbuf.pid = getpid(); if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == CONNECT_ACK){ //printf("--> Flow %d is now connected !!\n", flow->fd); flow->is_connected = 1; // FIXME flow->mapid_down = 0; } else{ printf("ERROR: Could not re-connect to flow %d [%s:%d]\n", flow->fd, __FILE__, __LINE__); flow->is_connected = 0; // FIXME exit(EXIT_FAILURE); } } /*else // flow is not connected printf("\n flow %d is not connected\n", flow->fd); // delete */ } return; } // this function checks which flows, had called mapi_start_offline_device() function and not mapi_delete_offline_device() void mapi_start_offline_device_mapid(void){ flist_node_t *fnode; offline_device *device = NULL; struct mapiipcbuf qbuf; for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = 
flist_next(fnode)){ device = (offline_device *)fnode->data; // data stored in the node if(device->is_started){ // offline device is already started, via mapi_start_offline_device() function //printf("device %s is started\n", device->previous_device); qbuf.mtype = 1; qbuf.cmd = START_OFFLINE_DEVICE; qbuf.fd = getpid(); qbuf.pid = getpid(); strncpy((char *) qbuf.data, device->new_device, DATA_SIZE); qbuf.fid = 0; if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0) // sends an IPC message to mapid exit(EXIT_FAILURE); if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0) // reads an IPC message (blocking call) exit(EXIT_FAILURE); if(qbuf.cmd == START_OFFLINE_DEVICE_ACK) device->is_started = 1; else{ printf("ERROR: Could not re-start offline device %s [%s:%d]\n", device->path, __FILE__, __LINE__); device->is_started = 0; // FIXME exit(EXIT_FAILURE); } } /*else //printf("device %s is NOT started\n", device->previous_device); */ } return; } // this function initializes shared memory segments for new results of a function, after reconnection void mapi_read_results_mapid(void){ flist_node_t *fnode, *fnode_; flowdescr_t *flow; functdescr_t *fun; for(fnode = flist_head(flowlist); fnode != NULL; fnode = flist_next(fnode)){ // iterate list with all active flows flow = (flowdescr_t *)fnode->data; // data stored in the node flow->shm_base = NULL; // used in default_read_result_init() function [ get pointer to shared memory ] for(fnode_ = flist_head(flow->flist); fnode_ != NULL; fnode_ = flist_next(fnode_)){ // iterate list with all functions of the flow fun = (functdescr_t *)fnode_->data; fun->result_init = 0; // in order to call function get_results_info(flow, fun) again } } return; } #endif