mapi.c 71.8 KB
Newer Older
Arne Øslebø's avatar
Arne Øslebø committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
#include "mapi.h"
#include "mapiipc.h"
#include "mapilibhandler.h"
#include "flist.h"
#include "debug.h"
's avatar
committed
23
#include "parseconf.h"
's avatar
   
committed
24
#include "printfstring.h"
25
26
27
//#include "mapierror.h"

#include "mapi_errors.h"
Arne Øslebø's avatar
Arne Øslebø committed
28
29
30
31
32
33
34
35

#ifdef WITH_ADMISSION_CONTROL
#include <regex.h>
#include <keynote.h>
#include <openssl/rsa.h>
#include "bytestream.h"
#endif

's avatar
committed
36
37
38
39
#ifdef WITH_ANONYMIZATION
#include "anonymization/anonymization.h"
#endif

's avatar
committed
40
41
42
43
#ifdef DIMAPI
#include <signal.h>
#include <semaphore.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
44

45
46
47
48
49
#ifdef WITH_AUTHENTICATION
#define FROM_MAPI
#include "vod/vod.h"
#endif

Arne Øslebø's avatar
Arne Øslebø committed
50
static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT;
51
static pthread_once_t initialized = PTHREAD_ONCE_INIT;
Arne Øslebø's avatar
Arne Øslebø committed
52
53
54
static int minit=0; //Set to 1 when MAPI has been initialized

static pthread_spinlock_t mapi_lock;
's avatar
committed
55
static pthread_spinlock_t numflows_lock;  // for numflows and totalflows variables
Arne Øslebø's avatar
Arne Øslebø committed
56
57
58

static int local_err=0; /* occurence of a mapi.c error, translation of these errors */

's avatar
committed
59
60
static int numflows=0;  // number of allocated (active) flows
static int totalflows=0;  // number of flows so far (including closed flows)
Arne Øslebø's avatar
Arne Øslebø committed
61

's avatar
committed
62
static int offline_devices;
's avatar
committed
63
64
static int agent=0;

65
extern const errorstruct Errors[];
66

Arne Øslebø's avatar
Arne Øslebø committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
//Structure used as a linked list to store information about
//registered functions
 struct functiondescr {
  char* function;
  int fd;
  int fid;
  void* ptr;
  int ptrsize;
  char* pktbuf;
  struct mapid_to_buffer *to_buffer;
  struct functiondescr *next;
  };

//Linked list of registered functions
static struct functiondescr *functions=NULL;
*/

85
86
#ifdef DIMAPI
static int hostcmp(void *h1, void *h2);
's avatar
   
committed
87
static void delete_remote_flow(remote_flowdescr_t* rflow);
88
89
flist_t *hostlist=NULL;//list containing all remote hosts used so far
extern flist_t *remote_flowlist;
's avatar
committed
90
int dimapi_port;
91
92
93
94
95
96
97
98

typedef struct function_data {
	int fid;
	mapidflib_function_def_t* fdef;
} function_data;

static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT;
static pthread_spinlock_t remote_ipc_lock;
's avatar
committed
99
static pthread_spinlock_t hostlist_lock;
100
101
102
103
104

static unsigned fdseed = 0;        // 'scope' flow descriptor seed (always increases)
static unsigned fidseed = 0;       // function descriptor seed (always increases)
static unsigned negfdseed=-1;       // generates temporary negative fd, for use before create_flow
#endif
's avatar
committed
105
106
107
108
109

typedef struct libinfo {
	char* name;
} libinfo_t;

Arne Øslebø's avatar
Arne Øslebø committed
110
111
112
113
114
typedef struct flowdescr {
  int fd;
  int file; //File descriptor for offline flows
  char *devtype;
  char *shm_base;
115
  pthread_spinlock_t *shm_spinlock;
Arne Øslebø's avatar
Arne Øslebø committed
116
  flist_t *flist;
117
/*  
Arne Øslebø's avatar
Arne Øslebø committed
118
119
  int error;
  char errstr[MAPI_ERRORSTR_LENGTH];
120
*/
121
  unsigned char is_connected; // This should be 1 if the flow is connected 0 otherwise
Arne Øslebø's avatar
Arne Øslebø committed
122
123
124
125
126
127
128
129
} flowdescr_t;

flist_t *flowlist=NULL;

typedef struct functdescr {
  int fid;
  short result_init;
  mapidflib_function_def_t *def;
130
  mapidflib_function_t *funct;
Arne Øslebø's avatar
Arne Øslebø committed
131
  void *data;  
's avatar
committed
132
  mapi_results_t *result;
Arne Øslebø's avatar
Arne Øslebø committed
133
134
135
136
137
138
139
140
141
142
143
144
145
} functdescr_t;

typedef struct shm_result {
  void* ptr; //Pointer to shared data
  int size; //Size of shared data
} shm_result_t;

/*
 * Function declarations 
 */
static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data);
int 
146
get_results_info(flowdescr_t *flow, functdescr_t *f);
Arne Øslebø's avatar
Arne Øslebø committed
147

148
//static int set_error(void* flow, int err_no , int is_remote);
Arne Øslebø's avatar
Arne Øslebø committed
149
150
151

static int send_fd(int *fds,int numfd);

's avatar
committed
152
//global var access functions
's avatar
   
committed
153
154
155
156
157
static int get_numflows();
static int incr_numflows();
static int decr_numflows();
static int get_totalflows();
static int incr_totalflows();
Arne Øslebø's avatar
Arne Øslebø committed
158

159
160
161
static void init()
//common initialization function for mapi and dimapi
{
's avatar
committed
162
163
#ifdef DIMAPI
 #define CONF_FILE "./mapi.conf:%s/.mapi.conf:/etc/mapi.conf"
164
  char* path=NULL, *libs=NULL, *str=NULL, *s=NULL;
's avatar
committed
165
  char* mapi_conf;
's avatar
   
committed
166
  mapi_conf = printf_string(CONF_FILE,getenv("HOME"));
167
  //init_error("/root/trunk/mapi_errors.dat");
's avatar
committed
168
169
  if (pc_load (mapi_conf))
    {
's avatar
   
committed
170
171
172
173
174
175
176
177
178
179
180
      conf_category_entry_t* empty_cat = pc_get_category("");
      const char *portstr;

      if( empty_cat == NULL ){
          fputs( "Configuration file has no empty category. Giving up.\n", stderr );
          exit(1);
      }
      path = pc_get_param (empty_cat, "libpath");
      libs = pc_get_param (empty_cat, "libs");
      portstr = pc_get_param (empty_cat, "dimapi_port");
      if( portstr == NULL ){
's avatar
committed
181
182
183
184
185
186
187
188
189
190
          fprintf(stderr,"Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT );
	  dimapi_port = DEFAULT_DIMAPI_PORT;
      }
      else {
	      /* make sure that portstr is a valid number. */
	      dimapi_port = atoi( portstr );
	      if ( dimapi_port<=0 || dimapi_port>=65536 ) {
		      fprintf(stderr, "Invalid port given in configuration file, the default port %d is used\n",DEFAULT_DIMAPI_PORT);
		      dimapi_port = DEFAULT_DIMAPI_PORT;
	      }
's avatar
   
committed
191
      }
's avatar
committed
192
193
194
    }
  else 
    {
's avatar
   
committed
195
196
      fputs("Error: cannot load mapi.conf file. Giving up.\n", stderr);
      fprintf( stderr, "Search path is: %s\n", mapi_conf );
's avatar
committed
197
198
199
200
      exit(1);
    }
#endif

201
202
203
204
205
206
207
208
  minit = 1;
  pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE);
  
  flowlist = malloc(sizeof(flist_t));
  flist_init(flowlist);

#ifdef DIMAPI
  pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE);
's avatar
committed
209
  pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE);
210
211
212
213
  hostlist = malloc(sizeof(flist_t));
  remote_flowlist = malloc(sizeof(flist_t));
  flist_init(hostlist); 
  flist_init(remote_flowlist);
's avatar
committed
214

215
216
217
218
219
220
221
222
  str=libs;
  while((s=strchr(str,':'))!=NULL) {
    *s='\0';
    mapilh_load_library(path,str);
    str=s+1;
  }

  mapilh_load_library(path,str);
's avatar
committed
223
  free(mapi_conf);
224
  pc_close();
225
226
227
#endif
}

Arne Øslebø's avatar
Arne Øslebø committed
228
//Initializes MAPI - called only once by pthread_once()
's avatar
committed
229
static void mapi_init()
Arne Øslebø's avatar
Arne Øslebø committed
230
231
232
233
{
  struct mapiipcbuf qbuf;
  char libpath[4096],*str,*s;
  minit=1;
234
235
236
  if(mapiipc_client_init()==-1){
	 local_err = MCOM_INIT_SOCKET_ERROR;
  }
237
  pthread_once(&initialized, (void*)init);
Arne Øslebø's avatar
Arne Øslebø committed
238
239
  pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE);
  
's avatar
committed
240
241
  offline_devices = 0;
  
Arne Øslebø's avatar
Arne Øslebø committed
242
243
244
245
246
247
  //Get libpath from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBPATH;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
248
249
250
251
252
253
  
 if( mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf) )
	   	local_err = MCOM_SOCKET_ERROR;
 
Arne Øslebø's avatar
Arne Øslebø committed
254
255
256
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBPATH_ACK:
's avatar
committed
257
	  strncpy(libpath, (char *)qbuf.data, 4096);
Arne Øslebø's avatar
Arne Øslebø committed
258
259
260
261
262
263
264
265
266
267
268
269
270
271
	  break;
      default:
	   /* MAPI_ERROR_GETTING_LIBPATH */
	  return;
	  break;
  }
  DEBUG_CMD(printf("libpath=%s\n",libpath));
		  
  //get libs from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBS;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
272
273
274
275
  if(mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
276
277
278
279
280
281
282
283
284
285
286
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBS_ACK:
	  break;
      default:
 	  /* MAPI_ERROR_GETTING_LIBS */
	  return; 
	  break;
  }

  //Load function libraries
's avatar
committed
287
    str = (char *)qbuf.data;
Arne Øslebø's avatar
Arne Øslebø committed
288
289
290
291
292
293
294
295
296
297
    while((s=strchr(str,':'))!=NULL) {
      *s='\0';
      mapilh_load_library(libpath,str);
      str=s+1;
    }

    mapilh_load_library(libpath,str);
    return;
}

298
299
300
301
302
303
304
305
306
#ifdef DIMAPI
//Initializes DIMAPI - called only once by pthread_once()
static void dmapi_init()
{ 
  pthread_once(&initialized, (void*)init);
  return;
}
#endif

Arne Øslebø's avatar
Arne Øslebø committed
307
308
309
310
311
312
313
int mapi_connect(int fd)
//Connect to a mapi flow
//fd = flow descriptor
{
  struct mapiipcbuf qbuf;
  flowdescr_t* flow;

314
315
316
317
318
319
#ifdef DIMAPI
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  flist_node_t* fnode;
#endif

Arne Øslebø's avatar
Arne Øslebø committed
320
321
322
  if (!minit) {
    DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__));
    return -1;
323
  }else if (fd<=0){
324
   //could not use set_error since the flow is not connected and may
325
326
327
   //cause seg fault
   fprintf(stderr,"Invalid Flow descriptor [%s:%d]\n",__FILE__,__LINE__); 
   return -1;	  
Arne Øslebø's avatar
Arne Øslebø committed
328
329
  }

330
331
332
333
334
335
336
337
#ifdef DIMAPI
  if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->cmd=CONNECT;
      hflow->dbuf->fd=hflow->fd;
      hflow->dbuf->length=BASIC_SIZE;
    }
338
339
340
341
342
#ifdef WITH_AUTHENTICATION
    if(rflow->is_authenticated == 0)
    {
	   fprintf(stderr, "Flow with id %d is not authenticated. Aborting. [%s:%d]\n", fd, __FILE__, __LINE__);
	   rflow->is_connected = 0;
's avatar
committed
343
	   return(-2);
344
345
346
    }
    
#endif
347
348
349
350
    if (mapiipc_remote_write_to_all(rflow)<0){ 
	local_err = MCOM_SOCKET_ERROR;
	return -1;
    }
351
352
353
354
355
356
357
358

    //wait results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;

      switch(hflow->dbuf->cmd) {
        case CONNECT_ACK:
359
	rflow->is_connected=1;
360
361
          continue;
        case ERROR_ACK:
362
363
	  local_err = MAPI_CONNECT; //is remote flow
	  //printf("Error! mapi_connect did not work!\n");
364
365
          return -1;
        default:
366
367
	  local_err = MAPI_CONNECT;
          //printf("Error! mapi_connect did not work!\n");
368
369
370
371
372
373
374
375
          return -1;
     }

    }
    return 0;
  }
#endif

Arne Øslebø's avatar
Arne Øslebø committed
376
377
  if ((flow=flist_get(flowlist,fd))==NULL) {
    DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__));
378
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
379
    return -1;
380
  } /*else if (flow->error!=0) {
Arne Øslebø's avatar
Arne Øslebø committed
381
    DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__));
382
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
383
    return -1;
384
  }*/
Arne Øslebø's avatar
Arne Øslebø committed
385
386
387
388
389
390
 
  qbuf.mtype=1;
  qbuf.cmd=CONNECT;
  qbuf.fd=fd;
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
's avatar
committed
391
392
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
393
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
394
395
396
397
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
398
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
399
400
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
401
402
403
404
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CONNECT_ACK:
405
	flow->is_connected=1;
's avatar
committed
406
      return 0;
Arne Øslebø's avatar
Arne Øslebø committed
407
    case ERROR_ACK:
408
      local_err =  qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
409
410
      return -1;
    default:
411
      local_err= MCOM_UNKNOWN_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
412
413
414
415
      return -1;
    }  
}

416
#ifdef DIMAPI
's avatar
   
committed
417
static void delete_remote_flow(remote_flowdescr_t* rflow) 
418
419
420
421
422
423
424
425
426
427
{
  host_flow* hflow;
  flist_node_t* fnode, *fnode2;

  pthread_spin_lock(&remote_ipc_lock);
  flist_remove(remote_flowlist, rflow->fd,FLIST_LEAVE_DATA);
  decr_numflows();
  pthread_spin_unlock(&remote_ipc_lock);

  sem_destroy(&rflow->fd_sem);
's avatar
committed
428
  sem_destroy(&rflow->pkt_sem);
429
430
431
432
433
434
435
436
437
  pthread_mutex_destroy(&rflow->mutex);

  for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
    fnode2=flist_next(fnode);
    hflow=(host_flow*)fnode->data;
    hflow->rhost->num_flows--;
    flist_remove(hflow->rhost->flows, hflow->fd, FLIST_LEAVE_DATA);
    if (hflow->rhost->num_flows==0) {
      mapiipc_remote_close(hflow->rhost);	//close the socket
's avatar
committed
438
      //pthread_kill(*hflow->rhost->comm_thread, 9);
439
440
441
      flist_destroy(hflow->rhost->flows, FLIST_LEAVE_DATA);
      free(hflow->rhost->flows);
      free(hflow->rhost->hostname);
's avatar
committed
442
      pthread_spin_lock(&hostlist_lock);
443
      flist_remove(hostlist, hflow->rhost->sockfd, FLIST_LEAVE_DATA);
's avatar
committed
444
      pthread_spin_unlock(&hostlist_lock);
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
      free(hflow->rhost);
    }
    //we check if a host is using in other rflows and delete it -close the socket- if not
    flist_destroy(hflow->functions, FLIST_FREE_DATA);
    free(hflow->functions);
    free(hflow->dev);
    free(hflow->dbuf);
    flist_remove(rflow->host_flowlist,hflow->id,FLIST_FREE_DATA);
  }
  if (rflow->pkt_list!=NULL) flist_destroy(rflow->pkt_list, FLIST_LEAVE_DATA);
  free(rflow->pkt);
  flist_destroy(rflow->function_res, FLIST_FREE_DATA);	
  free(rflow);
}
#endif

461
int mapi_create_flow(const char *dev)
Arne Øslebø's avatar
Arne Øslebø committed
462
463
464
465
466
467
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  flowdescr_t *flow, *tmpflow;

468
469
  if(dev==NULL){
	 fprintf(stderr, "Error wrong device name given \n\n");
470
	 local_err  = MAPI_DEVICE_INFO_ERR;
471
472
	 return -1;
  }
473
474
475
476
477
478
479
480
481
#ifdef DIMAPI
  remote_flowdescr_t *rflow;
  char *hostname=NULL, *s=NULL, *k=NULL;
  struct host *h=NULL;
  host_flow* hflow;
  char *devp=strdup(dev);
  flist_node_t* fnode;
  unsigned int idgen=0;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
482
483
484
485
  //if(!minit)
    //  if((ret=mapi_init())!=0)
	//  return ret;
  
486
  //check if flow is remote or not and call the appropriate init function
's avatar
committed
487
488
489
490
#ifndef DIMAPI
  pthread_once(&mapi_is_initialized, (void*)mapi_init);
#endif

491
#ifdef DIMAPI
's avatar
committed
492
  if ( strchr(dev,':')==NULL) pthread_once(&mapi_is_initialized, (void*)mapi_init);
493
494
495
496
497
498
  else pthread_once(&dmapi_is_initialized, (void*)dmapi_init);

  if ((s = strchr(dev,':'))!=NULL) {
    rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t));
    rflow->fd=++fdseed;
    sem_init(&rflow->fd_sem, 0, 0);
's avatar
committed
499
    sem_init(&rflow->pkt_sem, 0, 0);
500
501
502
503
504
505
506
    pthread_mutex_init(&rflow->mutex, NULL);
    rflow->pending_msgs=0;
    rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
    flist_init(rflow->host_flowlist);
    rflow->pkt_list=NULL;
    rflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH);
    rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
507
    rflow->is_connected = 0;
508
509
510
511
512
513
514
515
    flist_init(rflow->function_res);
    k=strtok(devp, ", ");

    while (k!=NULL) {
      if ((s = strchr(k,':'))!=NULL) {
        *s = '\0';
        hostname = k;
        k = s + 1;
's avatar
committed
516
517
        //printf("host: %s - device: %s\n",hostname, k);
        pthread_spin_lock(&hostlist_lock);	
518
        h = (struct host *)flist_search(hostlist, hostcmp, hostname);
519
520
521
522

        if(h==NULL){// Our host is a new one --> insert it in the hostlist
          h = (struct host *)malloc(sizeof(struct host));
	  h->hostname = strdup(hostname);
's avatar
committed
523
	  //printf("New host %s\n",hostname);
's avatar
committed
524
	  h->port = dimapi_port;
525
526
527
528
	  h->flows = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->flows);
	  h->num_flows=0;
	  // Create the socket
's avatar
committed
529
	  if (mapiipc_remote_init(h)<0) {
530
	   	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
531
532
533
		  pthread_spin_unlock(&hostlist_lock);
		  return -1;
	  }
's avatar
committed
534

535
536
537
538
539
	  h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t));
	  pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h);
	  //printf("New communication thread created\n");

          flist_append(hostlist, h->sockfd, h);
's avatar
committed
540
          pthread_spin_unlock(&hostlist_lock);
541
542
        }
        else{//host exists in the list
's avatar
committed
543
544
          //printf("%s host again\n",h->hostname);
          pthread_spin_unlock(&hostlist_lock);
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
        }

        h->num_flows++;
	hflow=(host_flow*)malloc(sizeof(host_flow));
	hflow->scope_fd=rflow->fd;
	hflow->dev=strdup(k);
	flist_append(rflow->host_flowlist, ++idgen, hflow);
	flist_append(h->flows, --negfdseed, hflow);
	hflow->fd=negfdseed;
	hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
	hflow->rhost=h;
	hflow->functions=(flist_t *)malloc(sizeof(flist_t));
	flist_init(hflow->functions);

	hflow->dbuf->cmd=CREATE_FLOW;
's avatar
committed
560
	strncpy((char *)hflow->dbuf->data,k,DATA_SIZE);
561
562
563
564
565
566
567
568
569
570
571
572
573
	hflow->dbuf->length=BASIC_SIZE+strlen(k)+1;
      }
      else {
        //this is the case where the dev string contains both 'host:interface1' and 'interface2'
	//example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1");
	//user's intention is probably localhost:eth1
	//what should be done in this case?
      }
      k=strtok(NULL,", ");
    }

    free(devp);

's avatar
committed
574
    rflow->scope_size=flist_size(rflow->host_flowlist);
575
576
577
578
579
580
    pthread_spin_lock(&remote_ipc_lock);
    flist_append(remote_flowlist, rflow->fd, rflow);
    incr_numflows();
    incr_totalflows();
    pthread_spin_unlock(&remote_ipc_lock);

581
582
583
584
    if (mapiipc_remote_write_to_all(rflow)<0){
	    local_err = MCOM_SOCKET_ERROR;
	    return -1;
    }
's avatar
committed
585
    //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow -
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607

    //wait for results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
      if (hflow->dbuf->cmd == CREATE_FLOW_ACK && *((int*)hflow->dbuf->data)!=-1) {
        hflow->fd=*((int*)hflow->dbuf->data);
	flist_remove(hflow->rhost->flows, hflow->dbuf->fd, FLIST_LEAVE_DATA);
	flist_append(hflow->rhost->flows, hflow->fd, hflow);
	printf("New host flow created with fd %d for rflow %d\n",hflow->fd,rflow->fd);
      }
      else {//error (fd==-1)
        delete_remote_flow(rflow);
	return -1;
      }
    }

    return rflow->fd;

  }
#endif

's avatar
committed
608
609
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
    pthread_spin_lock(&mapi_lock);
610
611
612
    if(mapiipc_client_init()==-1){
	  local_err = MCOM_INIT_SOCKET_ERROR;
    }
's avatar
committed
613
    pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
614
615
616
617
618
619
  }

  qbuf.mtype=1;
  qbuf.cmd=CREATE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
's avatar
committed
620
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
Arne Øslebø's avatar
Arne Øslebø committed
621
  pthread_spin_lock(&mapi_lock);
's avatar
committed
622
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
623
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
624
625
626
627
	  pthread_spin_unlock(&mapi_lock);	  
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
628
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
629
630
631
	  pthread_spin_unlock(&mapi_lock);
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
632
633
634
635
636
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_FLOW_ACK:
      tmpflow=flist_get(flowlist,qbuf.fd);
's avatar
committed
637
638
639
640
      if (tmpflow!=NULL) 
        {
	        ERROR_CMD(fprintf(stderr,"mapid gave us a fd which already exist in our lists (%d), exiting [%s:%d]\n",
	            qbuf.fd,__FILE__,__LINE__));
's avatar
committed
641
642
	        //exit(EXIT_FAILURE);
		return -1;
's avatar
committed
643
     	}
Arne Øslebø's avatar
Arne Øslebø committed
644
      flow=malloc(sizeof(flowdescr_t));
's avatar
   
committed
645
646
      if( flow == NULL ){
	  fputs( "Out of memory\n", stderr );
's avatar
committed
647
648
	  //exit( EXIT_FAILURE );
	  return -1;
's avatar
   
committed
649
      }
Arne Øslebø's avatar
Arne Øslebø committed
650
      flow->fd=qbuf.fd;
's avatar
committed
651
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
652
653
      flow->flist=malloc(sizeof(flist_t));
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
654
      flow->shm_spinlock=NULL;
655
656
      //flow->error=0;
      //flow->errstr[0]='\0';
657
      flow->is_connected =0;
Arne Øslebø's avatar
Arne Øslebø committed
658
      flist_init(flow->flist);
's avatar
committed
659
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
660
661
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
662
663
      incr_numflows();
      incr_totalflows();
Arne Øslebø's avatar
Arne Øslebø committed
664
665
666
667
668
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
      
    /* should probably have a separate error message for ERROR_ACK? */
    case ERROR_ACK:
669
      local_err=qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
670
671
672
673
674
675
676
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

677
int mapi_create_offline_flow(const char *dev, int format)
Arne Øslebø's avatar
Arne Øslebø committed
678
679
680
681
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
682
  flowdescr_t *flow=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
683
684
685
686
687
  int file;

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
688
689
690
691
692
  if (dev==NULL){
	  fprintf(stderr,"Error NULL device in mapi_create_offline_flow\n\n");
	  return -1;
  }
  else if ((file=open(dev,O_LARGEFILE))==-1) {
Arne Øslebø's avatar
Arne Øslebø committed
693
694
695
696
    local_err=MAPI_ERROR_FILE;
    return -1;
  }

's avatar
committed
697
  pthread_spin_lock(&mapi_lock);
's avatar
committed
698
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
699
    if (mapiipc_client_init()<0) {
700
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
701
702
703
	    pthread_spin_unlock(&mapi_lock);
	    return -1;
    }
Arne Øslebø's avatar
Arne Øslebø committed
704
  }
's avatar
committed
705
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
706
707
708
709
710
711

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
712
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
Arne Øslebø's avatar
Arne Øslebø committed
713
  pthread_spin_lock(&mapi_lock);
's avatar
committed
714
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
715
716
	  pthread_spin_unlock(&mapi_lock);	 
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
717
718
719
720
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
721
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
722
723
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
724
725
726
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
727
      return -1;
Arne Øslebø's avatar
Arne Øslebø committed
728
729
730
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
731
    pthread_spin_unlock(&mapi_lock);	  
Arne Øslebø's avatar
Arne Øslebø committed
732
733
734
    return -1;
  }

's avatar
committed
735
736
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
737
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
738
739
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
740
741
742
743
744
745
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_FLOW_ACK:
      flow=malloc(sizeof(flowdescr_t));
      flow->fd=qbuf.fd;
's avatar
committed
746
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
747
748
      flow->flist=malloc(sizeof(flist_t));
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
749
      flow->shm_spinlock=NULL;
750
751
      //flow->error=0;
      //flow->errstr[0]='\0';
752
      flow->is_connected=0;
Arne Øslebø's avatar
Arne Øslebø committed
753
      flist_init(flow->flist);
's avatar
committed
754
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
755
756
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
757
758
      incr_numflows();
      incr_totalflows();
Arne Øslebø's avatar
Arne Øslebø committed
759
760
761
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
    case ERROR_ACK:
762
      local_err=qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
763
764
765
766
767
768
769
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

's avatar
committed
770

771
char* mapi_create_offline_device(const char *path, int format)
's avatar
committed
772
773
774
775
776
777
778
779
780
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  int file;

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
781
782
783
784
785
786
  
  if (path==NULL){
	  fprintf(stderr,"Error NULL path in mapi_create_offline_device\n\n");
	  return NULL;
  }
  else if ((file=open(path,O_LARGEFILE))==-1) {
's avatar
committed
787
788
789
790
791
    local_err=MAPI_ERROR_FILE;
    return NULL;
  }

  pthread_spin_lock(&mapi_lock);
's avatar
committed
792
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
793
//    pthread_spin_lock(&mapi_lock);
's avatar
committed
794
795
    if (mapiipc_client_init()<0) {
	    pthread_spin_unlock(&mapi_lock);
796
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
797
798
	    return NULL;
    }
's avatar
committed
799
800
801
802
803
804
805
806
807
//    pthread_spin_unlock(&mapi_lock);
  }
  pthread_spin_unlock(&mapi_lock);

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_DEVICE;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
808
  strncpy((char *)qbuf.data,path,DATA_SIZE);
's avatar
committed
809
  pthread_spin_lock(&mapi_lock);
's avatar
committed
810
811
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
812
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
813
814
815
816
	  return NULL;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
817
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
818
819
	  return NULL;
  }
's avatar
committed
820
821
822
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
823
      pthread_spin_unlock(&mapi_lock);
's avatar
committed
824
825
826
827
      return NULL;      
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
828
    pthread_spin_unlock(&mapi_lock);
's avatar
committed
829
830
831
    return NULL;
  }

's avatar
committed
832
833
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
834
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
835
836
	  return NULL;
  }
's avatar
committed
837
838
839
840
841
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_DEVICE_ACK:
      offline_devices++;
's avatar
committed
842
      return strdup((char *)qbuf.data);
's avatar
committed
843
    case ERROR_ACK:
844
      local_err=qbuf.remote_errorcode;
's avatar
committed
845
846
847
848
849
850
851
      return NULL;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return NULL;
    }
}

852
int mapi_start_offline_device(const char *dev)
's avatar
committed
853
854
855
856
857
858
859
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

860
861
862
863
  if (dev==NULL){
	  fprintf(stderr,"Error NULL device in mapi_start_offline_device\n\n");
	  return -1;
  }
's avatar
committed
864
  pthread_spin_lock(&mapi_lock);
's avatar
committed
865
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
866
    pthread_spin_lock(&mapi_lock);
's avatar
committed
867
868
    if (mapiipc_client_init()<0) {
	    pthread_spin_unlock(&mapi_lock);
869
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
870
871
	    return -1;
    }
's avatar
committed
872
873
874
875
876
877
878
879
    pthread_spin_unlock(&mapi_lock);
  }
  pthread_spin_unlock(&mapi_lock);

  qbuf.mtype=1;
  qbuf.cmd=START_OFFLINE_DEVICE;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
's avatar
committed
880
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
's avatar
committed
881
882
  qbuf.fid=0;
  pthread_spin_lock(&mapi_lock);
's avatar
committed
883
884
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
885
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
886
887
888
889
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
890
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
891
892
	  return -1;
  }
's avatar
committed
893
894
895
896
897
898
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case START_OFFLINE_DEVICE_ACK:
      return 0;
    case ERROR_ACK:
899
      local_err=qbuf.remote_errorcode;
's avatar
committed
900
901
902
903
904
905
906
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

's avatar
   
committed
907
int mapi_delete_offline_device(char *dev)
's avatar
committed
908
909
910
911
912
913
914
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  printf("closing down...\n");
  pthread_once(&mapi_is_initialized, (void*)mapi_init);

915
916
917
918
  if (dev==NULL){
	  fprintf(stderr,"Error NULL device in mapi_delete_offline_device\n\n");
	  return -1;
  }
's avatar
committed
919
920
/* socet is not closed
  pthread_spin_lock(&mapi_lock);
's avatar
committed
921
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
922
    if (mapiipc_client_init()<0) return -1;
's avatar
committed
923
924
925
926
927
928
929
930
  }
  pthread_spin_unlock(&mapi_lock);
*/

  qbuf.mtype=1;
  qbuf.cmd=DELETE_OFFLINE_DEVICE;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
's avatar
committed
931
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
's avatar
   
committed
932
  free(dev);
's avatar
committed
933
934
  qbuf.fid=0;
  pthread_spin_lock(&mapi_lock);
's avatar
committed
935
936
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
937
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
938
939
940
941
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
942
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
943
944
	  return -1;
  }
's avatar
committed
945
946
947
948
949
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case DELETE_OFFLINE_DEVICE_ACK:
      pthread_spin_lock(&mapi_lock);
's avatar
committed
950
  	  if((get_numflows() == 0) && --offline_devices == 0)
's avatar
committed
951
952
953
954
    	mapiipc_client_close();
  	  pthread_spin_unlock(&mapi_lock);
      return 0;
    case ERROR_ACK:
955
      local_err=qbuf.remote_errorcode;
's avatar
committed
956
957
958
959
960
961
962
963
964
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}



Arne Øslebø's avatar
Arne Øslebø committed
965
966
int mapi_close_flow(int fd) 
{
967
  //struct mapiipcbuf qbuf;
Arne Øslebø's avatar
Arne Øslebø committed
968
969
  functdescr_t *f=NULL;
  flowdescr_t *flow=NULL;
970

971
972
973
974
  if (fd<=0){
	  fprintf(stderr,"Error wrong fd in mapi_close_flow\n\n");
	  return -1;
  }
975
976
977
978
979
980
981
982
983
984
985
986
987
#ifdef DIMAPI
  remote_flowdescr_t *rflow=flist_get(remote_flowlist, fd);
  host_flow* hflow;  
  flist_node_t* fnode;

  if(rflow!=NULL){
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->cmd=CLOSE_FLOW;
      hflow->dbuf->fd=hflow->fd;
      hflow->dbuf->length=BASIC_SIZE;
    }

's avatar
committed
988
    //if (mapiipc_remote_write_to_all(rflow)<0) return -1;
's avatar
committed
989
990
    pthread_mutex_lock(&rflow->mutex);
    rflow->pending_msgs=0;
991
    rflow->is_connected=0;
's avatar
committed
992
993
994
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->fd=hflow->fd;
's avatar
committed
995
996
      if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) {
	  pthread_mutex_unlock(&rflow->mutex);
997
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
998
999
	  return -1;
      }
's avatar
committed
1000
1001
1002
      rflow->pending_msgs++;
    }
    pthread_mutex_unlock(&rflow->mutex);
1003

's avatar
committed
1004
1005
    //no need to wait for results
    /*for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
      hflow=(host_flow*)fnode->data;
      switch(hflow->dbuf->cmd){
        case CLOSE_FLOW_ACK:
	      break;
        case ERROR_ACK:
              local_err=MCOM_ERROR_ACK;
              break;
        default:
	      local_err=MCOM_UNKNOWN_ERROR;
              break;
      }
's avatar
committed
1017
    }*/
1018
1019
1020
1021
1022

    delete_remote_flow(rflow);
    return 0;
  }
#endif
1023
1024

  if(flowlist && (flow = flist_get(flowlist, fd))==NULL){
Arne Øslebø's avatar
Arne Øslebø committed
1025
    DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__));
1026
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
1027
1028
    return -1;
  }
1029
1030
1031
1032
1033
1034
1035
1036
  else
  {
    pthread_spin_lock(&mapi_lock);
    //XXX MEMORY LEAK why leave data since the node is removed????
    flow = flist_remove(flowlist,fd,FLIST_LEAVE_DATA);
    pthread_spin_unlock(&mapi_lock);
  }
	
Arne Øslebø's avatar
Arne Øslebø committed
1037
1038
1039
  //Delete functions applied first, before mapid is notified.
  pthread_spin_lock(&mapi_lock);
  while((f = flist_pop_first(flow->flist)) != NULL) {
1040
1041
    if(f->def->client_cleanup!=NULL && f->funct->instance->status==MAPIFUNC_INIT){
      f->def->client_cleanup(f->funct->instance);
Arne Øslebø's avatar
Arne Øslebø committed
1042
    }
1043
1044
    if(f->result!=NULL)
	    free(f->result);
Arne Øslebø's avatar
Arne Øslebø committed
1045
    free(f->funct->instance);
1046
    free(f->funct);
Arne Øslebø's avatar
Arne Øslebø committed
1047
1048
1049
1050
1051
    free(f->data);
    free(f);
  }
  pthread_spin_unlock(&mapi_lock);
  
1052
 /* if (flow->error==0) {
Arne Øslebø's avatar
Arne Øslebø committed
1053
1054
1055
1056
1057
  	qbuf.mtype=1;
   	qbuf.cmd=CLOSE_FLOW;
  	qbuf.fd=fd;
  	qbuf.fid=getpid();
  	qbuf.pid=getpid();
's avatar
   
committed
1058
	pthread_spin_lock(&mapi_lock);
's avatar
committed
1059
1060
1061
1062
1063
1064
1065
1066
	if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
	  return -1;
	}
	if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
	  return -1;
	}
's avatar
   
committed
1067
	pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
1068
1069
1070
1071
1072
  
  	switch(qbuf.cmd)
	  {
	  case CLOSE_FLOW_ACK:
		// if this is the last one, release socket resources
's avatar
   
committed
1073
	      break;
Arne Øslebø's avatar
Arne Øslebø committed
1074
	  case ERROR_ACK:
's avatar
   
committed
1075
1076
	      local_err=MCOM_ERROR_ACK;
	      break;
Arne Øslebø's avatar
Arne Øslebø committed
1077
	  default:
's avatar
   
committed
1078
1079
	      local_err=MCOM_UNKNOWN_ERROR;
	      break;
Arne Øslebø's avatar
Arne Øslebø committed
1080
	  }
1081
  }*/
Arne Øslebø's avatar
Arne Øslebø committed
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094

  //Detach shared mem
  if (flow->shm_base!=NULL) {
    if (shmdt(flow->shm_base)<0) {
      WARNING_CMD(printf("Warning: Could not detach shared mem (%s) [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
    }
  }
  
  /* subtract, the flow should be closed either due to an error or
   * explicitly in this function. so numflows is really number of allocated
   * flows...
   */
  pthread_spin_lock(&mapi_lock);
's avatar
committed
1095
  if((decr_numflows() == 0) && offline_devices == 0)
Arne Øslebø's avatar
Arne Øslebø committed
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
    mapiipc_client_close();
  pthread_spin_unlock(&mapi_lock);

  //Free flow resources
  free(flow->devtype);
  free(flow->flist);
  free(flow);  
  
  return 0;
}
1106
1107
//XXX Why send_fd returns 0 on error whereas every other function 
//return -1 on error?
's avatar
   
committed
1108
static int send_fd(int *fds, int num) {
Arne Øslebø's avatar
Arne Øslebø committed
1109
1110
1111
1112
  int c;
  struct mapiipcbuf qbuf;

  for(c=0;c<num;c++) {
1113
1114
1115
1116
    if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0){
	  local_err = MCOM_SOCKET_ERROR;
	  return 0; 
    }
Arne Øslebø's avatar
Arne Øslebø committed
1117
1118
    if(qbuf.cmd!=SEND_FD)
      return 0;
1119
1120
    if(mapiipc_send_fd(fds[c])==-1)	
	    return 0;
Arne Øslebø's avatar
Arne Øslebø committed
1121
1122
1123
1124
  }
  return 1;
}

1125
int mapi_apply_function(int fd, const char* funct, ...) 
Arne Øslebø's avatar
Arne Øslebø committed
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
//Apply function to a mapi flow
//fd: flow descriptor
//funct: function to be added
{
  struct mapiipcbuf qbuf;
  int fds[256];
  int numfd=0;
  //struct functiondescr* f=malloc(sizeof(struct functiondescr));
  mapidflib_function_def_t *fdef;
  functdescr_t *f;
  mapiFunctArg *pos;
  int tmp;
  unsigned long long ltmp;
  char ctmp;
  //  char* stemp;
  //char* sbuff;
  va_list vl;
  flowdescr_t *flow;
  char *argdescr_ptr;
  char *filename;
1146
  char *temp;
's avatar
committed
1147
1148
  unsigned int arg_size=0;	//only used in dimapi - declared here to avoid multiple ifdefs later
  unsigned char* args;		//in case read from a buffer instead of va_list
Arne Øslebø's avatar
Arne Øslebø committed
1149

1150
1151
1152
1153
1154
1155
1156
#ifdef DIMAPI
  unsigned char is_remote=0;
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  function_data *fdata;
  flist_node_t* fnode;
#endif
1157
 
1158

Arne Øslebø's avatar
Arne Øslebø committed
1159
1160
  if (!minit) {
    DEBUG_CMD(printf("Not initialized! [%s:%d]\n",__FILE__,__LINE__));
1161
    local_err = MAPI_INIT_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
1162
1163
    return -1;
  }
1164
1165
  else if(fd<=0){
    fprintf(stderr,"Error wrong fd in mapi_apply_function\n\n");
1166
    local_err = MAPI_INVALID_FID_FUNCID;
1167
1168
1169
1170
    return -1;
  }
  if(funct==NULL){
    fprintf(stderr,"Error NULL function in mapi_apply_function\n\n");
1171
    local_err = MFUNCT_COULD_NOT_APPLY_FUNCT;
1172
1173
1174
    return -1;
  }
  
1175
1176
#ifdef DIMAPI
  if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {
1177
1178
1179
1180

	  if(rflow->is_connected){
		printf("\nERROR can not apply function on an already connected flow\n");
    		//TODO when mapi_set_flow can support remote flows enable the folowing
1181
		local_err = MAPI_FLOW_NOT_CONNECTED;
1182
1183
		return -1;
	}
1184
1185
1186
1187
1188
1189
1190
    //we create a dummy flow descriptor just to get the function info we want from mapilh_get_function_def
    flow=(flowdescr_t *)malloc(sizeof(flowdescr_t));
    flow->devtype="1.3";
    is_remote = 1;//indicates that flow is remote
  }
  else 
#endif
Arne Øslebø's avatar
Arne Øslebø committed
1191
1192
  if ((flow=flist_get(flowlist,fd))==NULL) {
    DEBUG_CMD(printf("Invalid flow: %d [%s:%d]\n",fd,__FILE__,__LINE__));
1193
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
1194
    return -1;
1195
  } /*else if (flow->error!=0) {
Arne Øslebø's avatar
Arne Øslebø committed
1196
    DEBUG_CMD(printf("Invalid flow: %d due to error #%d [%s:%d]\n",fd,flow->error,__FILE__,__LINE__));
1197
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
1198
    return -1;
1199
  }*/
Arne Øslebø's avatar
Arne Øslebø committed
1200

1201
1202
  if(flow->is_connected){
	printf("\nERROR can not apply function on an already connected flow\n");
1203
        local_err = MFUNCT_COULD_NOT_APPLY_FUNCT;
1204
1205
1206
	return -1;
  }

Arne Øslebø's avatar
Arne Øslebø committed
1207
1208
1209
1210
1211
  //Get information about function
  fdef=mapilh_get_function_def(funct,flow->devtype);

  if(fdef==NULL) {
    DEBUG_CMD(printf("Could not find/match function %s [%s:%d]\n",funct,__FILE__,__LINE__));
1212
    local_err = MAPI_FUNCTION_NOT_FOUND;
Arne Øslebø's avatar
Arne Øslebø committed
1213
1214
1215
1216
1217
1218
    return -1;
  }

  va_start(vl,funct);
  pos = qbuf.data;  // point to start of arguments buffer

's avatar
committed
1219
1220
1221
1222
  if (agent==1) {
    args = va_arg(vl, unsigned char*);
  }

Arne Øslebø's avatar
Arne Øslebø committed
1223
1224
1225
1226
1227
1228
  // parse function arguments
  if(strncmp(fdef->argdescr, "", 1)) { // there are some args