mapi.c 117 KB
Newer Older
1
2
3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
#include "mapi.h"
#include "mapiipc.h"
#include "mapilibhandler.h"
#include "flist.h"
#include "debug.h"
's avatar
committed
26
#include "parseconf.h"
's avatar
   
committed
27
#include "printfstring.h"
28
#include "mapi_errors.h"
29
#include "devgroupdb.h"
Arne Øslebø's avatar
Arne Øslebø committed
30
31
32
33
34
35
36
37

#ifdef WITH_ADMISSION_CONTROL
#include <regex.h>
#include <keynote.h>
#include <openssl/rsa.h>
#include "bytestream.h"
#endif

's avatar
committed
38
39
40
41
#ifdef WITH_ANONYMIZATION
#include "anonymization/anonymization.h"
#endif

's avatar
committed
42
43
44
45
#ifdef DIMAPI
#include <signal.h>
#include <semaphore.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
46

47
48
49
50
51
#ifdef WITH_AUTHENTICATION
#define FROM_MAPI
#include "vod/vod.h"
#endif

Arne Øslebø's avatar
Arne Øslebø committed
52
/* pthread_once guards: mapi_is_initialized runs mapi_init(), initialized runs init(). */
static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT;
static pthread_once_t initialized = PTHREAD_ONCE_INIT;

/* Becomes 1 once MAPI has been initialized. */
static int minit = 0;

/* Held around each mapiipc_write()/mapiipc_read() exchange with mapid. */
static pthread_spinlock_t mapi_lock;
/* Protects the numflows and totalflows counters below. */
static pthread_spinlock_t numflows_lock;

/* Occurrence of a mapi.c error; holds the most recent error code for later translation. */
static int local_err = 0;

/* Number of allocated (active) flows. */
static int numflows = 0;
/* Number of flows created so far, closed flows included. */
static int totalflows = 0;

/* Reset to 0 by mapi_init(); other uses are outside this part of the file. */
static int offline_devices;
static int agent = 0;

67
extern const errorstruct Errors[];
68

69
70
#ifdef DIMAPI
static int hostcmp(void *h1, void *h2);
's avatar
   
committed
71
static void delete_remote_flow(remote_flowdescr_t* rflow);
72
73
flist_t *hostlist=NULL;//list containing all remote hosts used so far
extern flist_t *remote_flowlist;
's avatar
committed
74
int dimapi_port;
75
extern sem_t stats_sem;
76

77
78
79
80
81
82
83
84
85
/*
 * Book-keeping record for one function applied on a remote host.
 */
typedef struct function_data {
	/* real fid returned from mapicommd */
	int fid;
	/* function definition */
	mapidflib_function_def_t *fdef;
#ifdef RECONNECT
	/* flow of a specified host */
	host_flow *hflow;
	/* function's arguments */
	char args[DIMAPI_DATA_SIZE];
	/* needed for functions whose arguments reference a flow */
	int index;
#endif
} function_data;

/* pthread_once guard that runs dmapi_init(). */
static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT;
/* Held while remote_flowlist is modified. */
static pthread_spinlock_t remote_ipc_lock;
/* Held while hostlist is searched or extended. */
static pthread_spinlock_t hostlist_lock;

/* 'scope' flow descriptor seed (always increases) */
static unsigned fdseed = 0;
/* function descriptor seed (always increases) */
static unsigned fidseed = 0;
/*
 * Generates temporary negative fds for use before create_flow is acknowledged.
 * NOTE(review): the variable is unsigned, so -1 is stored as UINT_MAX and the
 * "negative" value only appears once it is converted back to int -- confirm
 * this is intended.
 */
static unsigned negfdseed = -1;
#endif
's avatar
committed
96
97
98
99
100

/* Descriptor of a function library; carries only the library name. */
typedef struct libinfo {
	char *name;	/* library name */
} libinfo_t;

Arne Øslebø's avatar
Arne Øslebø committed
101
102
typedef struct flowdescr {
  int fd;
103
  int file; 		// file descriptor for offline flows
's avatar
committed
104
105
  int fds[256];		// file descriptors
  int numfd;		// number of file descriptors
Arne Øslebø's avatar
Arne Øslebø committed
106
  char *devtype;
107
108
#ifdef RECONNECT
  char *device;
109
  int read_results_flag;
110
  int mapid_down;
111
#endif
Arne Øslebø's avatar
Arne Øslebø committed
112
  char *shm_base;
's avatar
committed
113
  pthread_spinlock_t *shm_spinlock;
Arne Øslebø's avatar
Arne Øslebø committed
114
  flist_t *flist;
115
116
  unsigned char is_connected;	// this should be 1 if the flow is connected 0 otherwise
  int format;			// this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise  
Arne Øslebø's avatar
Arne Øslebø committed
117
118
} flowdescr_t;

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifdef RECONNECT

/*
 * One offline (trace file) device known to this client.
 */
typedef struct offline_device {
	/* name of the trace file to open */
	char *path;
	/* value returned by mapi_create_offline_device; the name to use in mapi_start_offline_device */
	char *previous_device;
	/* new device name (returned from mapid) */
	char *new_device;
	/* format of the captured packets (MFF_PCAP or MFF_DAG_ERF) */
	int format;
	/* 1 once the device was started via mapi_start_offline_device(), 0 otherwise */
	unsigned char is_started;
} offline_device;

static unsigned count_offline_devices = 0;	// used in offline_device_list (always increases)

extern flist_t *function_list;			// list which contains all applied functions ( defined in mapiipc.c )
extern flist_t *offline_device_list;		// list which contains all offline devices ( defined in mapiipc.c )

#endif

extern flist_t *flowlist;	// local flows, keyed by flow descriptor; defined in mapiipc.c
Arne Øslebø's avatar
Arne Øslebø committed
137
138
139
140
141

typedef struct functdescr {
  int fid;
  short result_init;
  mapidflib_function_def_t *def;
142
  mapidflib_function_t *funct;
Arne Øslebø's avatar
Arne Øslebø committed
143
  void *data;  
's avatar
committed
144
  mapi_results_t *result;
145
146
147
148
#ifdef RECONNECT
  flowdescr_t *flow;
  int numfd;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
149
150
151
152
153
154
155
156
157
158
159
160
161
} functdescr_t;

/* Location and extent of a result kept in shared memory. */
typedef struct shm_result {
  void *ptr;	/* pointer to the shared data */
  int size;	/* size of the shared data */
} shm_result_t;

/*
 * Forward declarations
 */
static int default_read_result_init(flowdescr_t *flow, functdescr_t *f, void *data);
int get_results_info(flowdescr_t *flow, functdescr_t *f, int user_fd, int user_fid);

//static int set_error(void* flow, int err_no , int is_remote);

static int send_fd(int *fds, int numfd);

/* Accessors for file-scope variables (the numflows / totalflows counters) */
static int get_numflows();
static int incr_numflows();
static int decr_numflows();
static int get_totalflows();
static int incr_totalflows();
Arne Øslebø's avatar
Arne Øslebø committed
174

175
176
177
static void init()
//common initialization function for mapi and dimapi
{
's avatar
committed
178
#ifdef DIMAPI
179
  char* path=NULL, *libs=NULL, *str=NULL, *s=NULL;
's avatar
committed
180
  char* mapi_conf;
181
  
182
183
  mapi_conf = printf_string( CONFDIR"/"CONF_FILE );

's avatar
committed
184
185
  if (pc_load (mapi_conf))
    {
's avatar
   
committed
186
187
188
189
      conf_category_entry_t* empty_cat = pc_get_category("");
      const char *portstr;

      if( empty_cat == NULL ){
190
          printf("Configuration file has no empty category. Giving up\n");
's avatar
   
committed
191
192
193
194
195
196
          exit(1);
      }
      path = pc_get_param (empty_cat, "libpath");
      libs = pc_get_param (empty_cat, "libs");
      portstr = pc_get_param (empty_cat, "dimapi_port");
      if( portstr == NULL ){
197
          printf("ERROR: Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
198
199
200
201
202
203
	  dimapi_port = DEFAULT_DIMAPI_PORT;
      }
      else {
	      /* make sure that portstr is a valid number. */
	      dimapi_port = atoi( portstr );
	      if ( dimapi_port<=0 || dimapi_port>=65536 ) {
204
		      printf("ERROR: Invalid port given in configuration file. The default port %d is used\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
205
206
		      dimapi_port = DEFAULT_DIMAPI_PORT;
	      }
's avatar
   
committed
207
      }
's avatar
committed
208
209
210
    }
  else 
    {
211
212
      printf("ERROR: Cannot load mapi.conf file. Giving up.\n");
      printf("Search path is: %s\n", mapi_conf);
's avatar
committed
213
214
215
216
      exit(1);
    }
#endif

217
218
219
220
221
222
  minit = 1;
  pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE);
  
  flowlist = malloc(sizeof(flist_t));
  flist_init(flowlist);

223
224
225
226
227
228
229
230
#ifdef RECONNECT
  function_list = malloc(sizeof(flist_t));
  flist_init(function_list);
  
  offline_device_list = malloc(sizeof(flist_t));
  flist_init(offline_device_list);
#endif

231
232
#ifdef DIMAPI
  pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE);
's avatar
committed
233
  pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE);
234
235
236
237
  hostlist = malloc(sizeof(flist_t));
  remote_flowlist = malloc(sizeof(flist_t));
  flist_init(hostlist); 
  flist_init(remote_flowlist);
's avatar
committed
238

239
240
241
242
243
244
245
246
  str=libs;
  while((s=strchr(str,':'))!=NULL) {
    *s='\0';
    mapilh_load_library(path,str);
    str=s+1;
  }

  mapilh_load_library(path,str);
's avatar
committed
247
  free(mapi_conf);
248
  pc_close();
249
250
251
#endif
}

Arne Øslebø's avatar
Arne Øslebø committed
252
//Initializes MAPI - called only once by pthread_once()
's avatar
committed
253
static void mapi_init()
Arne Øslebø's avatar
Arne Øslebø committed
254
255
256
257
{
  struct mapiipcbuf qbuf;
  char libpath[4096],*str,*s;
  minit=1;
258
259
  if(mapiipc_client_init()==-1){
	 local_err = MCOM_INIT_SOCKET_ERROR;
260
	 minit=0;
's avatar
committed
261
262
263
	 fprintf(stderr, "\n--------------------------------------------------------\n");
	 fprintf(stderr, "WARNING: mapid may not be running at the given interface\n");
	 fprintf(stderr,"--------------------------------------------------------\n");
264
  }
265
  pthread_once(&initialized, (void*)init);
Arne Øslebø's avatar
Arne Øslebø committed
266
267
  pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE);
  
's avatar
committed
268
269
  offline_devices = 0;
  
Arne Øslebø's avatar
Arne Øslebø committed
270
271
272
273
274
275
  //Get libpath from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBPATH;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
276
277
278
279
280
281
  
 if( mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf) )
	   	local_err = MCOM_SOCKET_ERROR;
 
Arne Øslebø's avatar
Arne Øslebø committed
282
283
284
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBPATH_ACK:
's avatar
committed
285
	  strncpy(libpath, (char *)qbuf.data, 4096);
Arne Øslebø's avatar
Arne Øslebø committed
286
287
288
289
290
291
	  break;
      default:
	   /* MAPI_ERROR_GETTING_LIBPATH */
	  return;
	  break;
  }
292
  printf("libpath=%s\n", libpath);
Arne Øslebø's avatar
Arne Øslebø committed
293
294
295
296
297
298
299
		  
  //get libs from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBS;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
300
301
302
303
  if(mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
304
305
306
307
308
309
310
311
312
313
314
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBS_ACK:
	  break;
      default:
 	  /* MAPI_ERROR_GETTING_LIBS */
	  return; 
	  break;
  }

  //Load function libraries
's avatar
committed
315
    str = (char *)qbuf.data;
Arne Øslebø's avatar
Arne Øslebø committed
316
317
318
319
320
321
322
323
324
325
    while((s=strchr(str,':'))!=NULL) {
      *s='\0';
      mapilh_load_library(libpath,str);
      str=s+1;
    }

    mapilh_load_library(libpath,str);
    return;
}

326
327
328
329
330
331
332
333
334
#ifdef DIMAPI
/* One-time DiMAPI set-up (invoked through pthread_once()): only makes sure the common init() has run. */
static void dmapi_init()
{
  pthread_once(&initialized, (void*)init);
}
#endif

Arne Øslebø's avatar
Arne Øslebø committed
335
336
337
338
339
340
341
int mapi_connect(int fd)
//Connect to a mapi flow
//fd = flow descriptor
{
  struct mapiipcbuf qbuf;
  flowdescr_t* flow;

342
343
344
345
346
347
#ifdef DIMAPI
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  flist_node_t* fnode;
#endif

Arne Øslebø's avatar
Arne Øslebø committed
348
  if (!minit) {
349
    printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__);
350
    local_err = MAPI_INIT_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
351
    return -1;
352
  }else if (fd<=0){
353
   printf("ERROR: Invalid flow descriptor (fd: %d) in mapi_connect\n", fd);
354
   return -1;	  
Arne Øslebø's avatar
Arne Øslebø committed
355
356
  }

357
358
359
#ifdef DIMAPI
  if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
360

361
362
363
364
365
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->cmd=CONNECT;
      hflow->dbuf->fd=hflow->fd;
      hflow->dbuf->length=BASIC_SIZE;
    }
366
367
368
#ifdef WITH_AUTHENTICATION
    if(rflow->is_authenticated == 0)
    {
369
	   printf("ERROR: Flow with id %d is not authenticated\n", fd);
370
	   rflow->is_connected = 0;
's avatar
committed
371
	   return(-2);
372
373
374
    }
    
#endif
375
376
377
378
    if (mapiipc_remote_write_to_all(rflow)<0){ 
	local_err = MCOM_SOCKET_ERROR;
	return -1;
    }
379
380
381
382
383
384
385
386

    //wait results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;

      switch(hflow->dbuf->cmd) {
        case CONNECT_ACK:
387
		rflow->is_connected=1;
388
389
          continue;
        case ERROR_ACK:
's avatar
committed
390
	  memcpy(&local_err, hflow->dbuf->data, sizeof(int));
391
392
          return -1;
        default:
's avatar
committed
393
	  local_err = MCOM_UNKNOWN_ERROR;
394
395
396
397
398
399
400
401
          return -1;
     }

    }
    return 0;
  }
#endif

Arne Øslebø's avatar
Arne Øslebø committed
402
  if ((flow=flist_get(flowlist,fd))==NULL) {
403
    printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__);
404
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
405
    return -1;
406
407
  }

Arne Øslebø's avatar
Arne Øslebø committed
408
409
  qbuf.mtype=1;
  qbuf.cmd=CONNECT;
410
411
412
413
#ifdef RECONNECT
  qbuf.fd = flow->fd;
  qbuf.user_fd = fd;
#else
Arne Øslebø's avatar
Arne Øslebø committed
414
  qbuf.fd=fd;
415
#endif
Arne Øslebø's avatar
Arne Øslebø committed
416
417
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
's avatar
committed
418
419
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
420
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
421
422
423
424
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
425
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
426
427
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
428
429
430
431
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CONNECT_ACK:
432
	flow->is_connected=1;
's avatar
committed
433
      return 0;
Arne Øslebø's avatar
Arne Øslebø committed
434
    case ERROR_ACK:
435
      local_err =  qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
436
437
      return -1;
    default:
438
      local_err= MCOM_UNKNOWN_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
439
440
441
442
      return -1;
    }  
}

443
#ifdef DIMAPI
's avatar
   
committed
444
/*
 * Unregisters a remote flow and releases everything attached to it.
 *
 * The flow is removed from remote_flowlist and the active-flow counter is
 * decremented. Each per-host flow is then detached from its host; a host
 * left without flows has its communication thread cancelled, its socket
 * closed and its record freed. Finally the flow's own lists, packet
 * buffers, result records and (with authentication) credentials are freed.
 * rflow must not be used after this call.
 */
static void delete_remote_flow(remote_flowdescr_t* rflow) 
{
  flist_node_t *cur, *next, *fn;
  host_flow *hf;
  struct host *owner;
  mapi_results_t *result;

  pthread_spin_lock(&remote_ipc_lock);
  flist_remove(remote_flowlist, rflow->fd);
  decr_numflows();
  pthread_spin_unlock(&remote_ipc_lock);

  sem_destroy(&rflow->fd_sem);
  sem_destroy(&rflow->pkt_sem);
  //pthread_mutex_destroy(&rflow->mutex);

  cur = flist_head(rflow->host_flowlist);
  while (cur != NULL) {
    /* fetch the successor first: the current node is removed further down */
    next = flist_next(cur);
    hf = (host_flow *)cur->data;
    owner = hf->rhost;

    owner->num_flows--;
    flist_remove(owner->flows, hf->fd);

    /* drop this flow's functions from the host-wide function list */
    for (fn = flist_head(hf->functions); fn != NULL; fn = flist_next(fn))
      free(flist_remove(owner->functions, ((function_data *)fn->data)->fid));

    /* a host that no remote flow uses any more is shut down and freed */
    if (owner->num_flows == 0) {
      pthread_cancel(*owner->comm_thread);
      mapiipc_remote_close(owner);	//close the socket
      flist_destroy(owner->flows);
      free(owner->flows);
      flist_destroy(owner->functions);
      free(owner->functions);
      free(owner->hostname);
      flist_remove(hostlist, owner->sockfd);
      free(owner->comm_thread);
#ifdef RECONNECT
      sem_destroy(&owner->connection);	// destroy semaphore
#endif
      free(owner);
    }

    flist_destroy(hf->functions);
    free(hf->functions);
    free(hf->dev);
    free(hf->dbuf);
    free(hf->pkt);
    flist_remove(rflow->host_flowlist, hf->id);
    free(hf);

    cur = next;
  }
  flist_destroy(rflow->host_flowlist);
  free(rflow->host_flowlist);

  if (rflow->pkt_list != NULL) {
    flist_destroy(rflow->pkt_list);
    free(rflow->pkt_list);
  }

  /* result records own a separately allocated payload */
  cur = flist_head(rflow->function_res);
  while (cur != NULL) {
    next = flist_next(cur);
    result = (mapi_results_t *)cur->data;
    free(result->res);
    free(result);
    cur = next;
  }
  flist_destroy(rflow->function_res);
  free(rflow->function_res);

  free(rflow->pkt);
#ifdef WITH_AUTHENTICATION
  free(rflow->username);
  free(rflow->vo);
#ifdef RECONNECT
  free(rflow->password);
#endif
#endif
  free(rflow);
}
#endif

517
int mapi_create_flow(const char *dev)
Arne Øslebø's avatar
Arne Øslebø committed
518
519
520
521
522
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  flowdescr_t *flow, *tmpflow;
523
524
525
526
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
  char *mapidsocket, *mapidsocketglobal;
/*  int local;*/
Arne Øslebø's avatar
Arne Øslebø committed
527

528
529
530
531
532
#ifdef DIMAPI
  remote_flowdescr_t *rflow;
  char *hostname=NULL, *s=NULL, *k=NULL;
  struct host *h=NULL;
  host_flow* hflow;
's avatar
committed
533
  char *devp;
534
535
536
  flist_node_t* fnode;
  unsigned int idgen=0;
#endif
537
538
539
540
541
542
543
#ifdef RECONNECT
  offline_device *device = NULL;
  flist_node_t *fnode_;
  int flag = 0;
#endif

  if(dev==NULL){
544
	 printf("ERROR: Wrong device name given (NULL) in mapi_create_flow\n");
545
546
547
	 local_err  = MAPI_DEVICE_INFO_ERR;
	 return -1;
  }
548
549

#ifndef DIMAPI
550
551
552
553
554
555
556
557
558
559
560
561
562
	devgroupdb = devgroupdb_open(3); // try local, global
	devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
  	mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
  }
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
563

's avatar
committed
564
565
566
  pthread_once(&mapi_is_initialized, (void*)mapi_init);
#endif

567
  //check if flow is remote or not and call the appropriate init function
568
#ifdef DIMAPI
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
  if ( strchr(dev,':')==NULL) {
		devgroupdb = devgroupdb_open(3); // try local, global
		devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

		if(!devgroupid) {
			mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
			mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
		}
		else {
			mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
			mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
		}

		mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);

  	pthread_once(&mapi_is_initialized, (void*)mapi_init);
	}
586
587
588
  else pthread_once(&dmapi_is_initialized, (void*)dmapi_init);

  if ((s = strchr(dev,':'))!=NULL) {
's avatar
committed
589
    devp=strdup(dev);
590
591
592
    rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t));
    rflow->fd=++fdseed;
    sem_init(&rflow->fd_sem, 0, 0);
's avatar
committed
593
    sem_init(&rflow->pkt_sem, 0, 0);
594
595
596
597
    rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
    flist_init(rflow->host_flowlist);
    rflow->pkt_list=NULL;
    rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
598
    rflow->is_connected = 0;
's avatar
   
committed
599
600
601
#ifdef WITH_AUTHENTICATION
    rflow->username=NULL;
    rflow->vo=NULL;
602
603
#ifdef RECONNECT
    rflow->password = NULL;
604
    rflow->daemons_down = 0;    
605
#endif
's avatar
   
committed
606
#endif
's avatar
committed
607
    rflow->pkt=NULL;
608
609
610
611
612
613
614
615
    flist_init(rflow->function_res);
    k=strtok(devp, ", ");

    while (k!=NULL) {
      if ((s = strchr(k,':'))!=NULL) {
        *s = '\0';
        hostname = k;
        k = s + 1;
's avatar
committed
616
        pthread_spin_lock(&hostlist_lock);	
617
        h = (struct host *)flist_search(hostlist, hostcmp, hostname);
618
619
620
621

        if(h==NULL){// Our host is a new one --> insert it in the hostlist
          h = (struct host *)malloc(sizeof(struct host));
	  h->hostname = strdup(hostname);
's avatar
committed
622
	  h->port = dimapi_port;
623
624
	  h->flows = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->flows);
's avatar
committed
625
626
	  h->functions = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->functions);
627
	  h->num_flows=0;
628
	  h->stats=NULL;
629
630
631
#ifdef RECONNECT
      	  sem_init(&h->connection, 0, 0);	// initialize semaphore
#endif
632
	  // Create the socket
's avatar
committed
633
	  if (mapiipc_remote_init(h)<0) {
634
	   	  local_err = MCOM_SOCKET_ERROR;
635
		  printf("ERROR: Could not connect with host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__);
's avatar
committed
636
637
638
		  pthread_spin_unlock(&hostlist_lock);
		  return -1;
	  }
's avatar
committed
639

640
641
642
643
	  h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t));
	  pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h);

          flist_append(hostlist, h->sockfd, h);
's avatar
committed
644
          pthread_spin_unlock(&hostlist_lock);
645
646
        }
        else{//host exists in the list
's avatar
committed
647
          pthread_spin_unlock(&hostlist_lock);
648
649
650
651
652
653
654
        }

        h->num_flows++;
	hflow=(host_flow*)malloc(sizeof(host_flow));
	hflow->scope_fd=rflow->fd;
	hflow->dev=strdup(k);
	flist_append(rflow->host_flowlist, ++idgen, hflow);
655
	hflow->id=idgen;
656
657
658
	flist_append(h->flows, --negfdseed, hflow);
	hflow->fd=negfdseed;
	hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
659
	hflow->pkt=NULL;
660
661
662
663
664
	hflow->rhost=h;
	hflow->functions=(flist_t *)malloc(sizeof(flist_t));
	flist_init(hflow->functions);

	hflow->dbuf->cmd=CREATE_FLOW;
's avatar
committed
665
	strncpy((char *)hflow->dbuf->data,k,DATA_SIZE);
666
667
668
669
670
671
672
673
674
675
676
677
678
	hflow->dbuf->length=BASIC_SIZE+strlen(k)+1;
      }
      else {
        //this is the case where the dev string contains both 'host:interface1' and 'interface2'
	//example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1");
	//user's intention is probably localhost:eth1
	//what should be done in this case?
      }
      k=strtok(NULL,", ");
    }

    free(devp);

's avatar
committed
679
    rflow->scope_size=flist_size(rflow->host_flowlist);
680
681
682
683
684
685
    pthread_spin_lock(&remote_ipc_lock);
    flist_append(remote_flowlist, rflow->fd, rflow);
    incr_numflows();
    incr_totalflows();
    pthread_spin_unlock(&remote_ipc_lock);

686
687
688
689
    if (mapiipc_remote_write_to_all(rflow)<0){
	    local_err = MCOM_SOCKET_ERROR;
	    return -1;
    }
's avatar
committed
690
    //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow -
691
692
693
694
695

    //wait for results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
's avatar
committed
696
      if (hflow->dbuf->cmd == CREATE_FLOW_ACK) {
697
        hflow->fd=*((int*)hflow->dbuf->data);
698
	flist_remove(hflow->rhost->flows, hflow->dbuf->fd);
699
700
	flist_append(hflow->rhost->flows, hflow->fd, hflow);
      }
's avatar
committed
701
702
      else if (hflow->dbuf->cmd == ERROR_ACK) {
	memcpy(&local_err, hflow->dbuf->data, sizeof(int));
703
	printf("ERROR: Could not create flow in host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__);
's avatar
committed
704
705
706
707
708
        delete_remote_flow(rflow);
	return -1;
      }
      else {
	local_err = MCOM_UNKNOWN_ERROR;
709
710
711
712
713
714
715
716
717
718
        delete_remote_flow(rflow);
	return -1;
      }
    }

    return rflow->fd;

  }
#endif

's avatar
committed
719
720
  pthread_spin_lock(&mapi_lock);
  
's avatar
committed
721
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
722
723
724
    if(mapiipc_client_init()==-1){
	  local_err = MCOM_INIT_SOCKET_ERROR;
    }
's avatar
committed
725
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
726
  }
's avatar
committed
727
728
729
730
  else 
    incr_numflows();
  
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
731

732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
#ifdef RECONNECT
  /* consider the following case :
     	- devicename = mapi_create_offline_device("./tracefile", MFF_PCAP);
	- {
		code ...
		--> reconnection
		code ...
	  }
	  fd = mapi_create_flow(devicename);

	  Now mapi_create_flow must send to mapid the new device name ...
  */

  pthread_spin_lock(&mapi_lock);

  for(fnode_ = flist_head(offline_device_list); fnode_ != NULL; fnode_ = flist_next(fnode_)){	// find specified device in offline_device_list

	  device = (offline_device *)fnode_->data;

	  if(!strcmp(device->previous_device, dev)){
		  flag = 1;					// FIXME
		  break;
	  }
  }
  if(flag)	strncpy((char *) qbuf.data, device->new_device, DATA_SIZE);	// the new device name
  else		strncpy((char *) qbuf.data, dev, DATA_SIZE);

  flag = 0;

  pthread_spin_unlock(&mapi_lock);
#else
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
#endif

Arne Øslebø's avatar
Arne Øslebø committed
766
767
768
769
770
  qbuf.mtype=1;
  qbuf.cmd=CREATE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
771

's avatar
committed
772
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
773
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
774
	  pthread_spin_unlock(&mapi_lock);	  
's avatar
committed
775
   	  decr_numflows();
's avatar
committed
776
777
778
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
779
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
780
	  pthread_spin_unlock(&mapi_lock);
's avatar
committed
781
	  decr_numflows();
's avatar
committed
782
783
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
784
785
786
787
788
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_FLOW_ACK:
      tmpflow=flist_get(flowlist,qbuf.fd);
's avatar
committed
789
790
      if (tmpflow!=NULL) 
        {
791
	        printf("ERROR: Mapid gave us a fd (%d) which already exist in our lists, exiting [%s:%d]\n", qbuf.fd, __FILE__, __LINE__);
's avatar
committed
792
	        decr_numflows();
's avatar
committed
793
		return -1;
's avatar
committed
794
     	}
Arne Øslebø's avatar
Arne Øslebø committed
795
      flow=malloc(sizeof(flowdescr_t));
's avatar
   
committed
796
      if( flow == NULL ){
797
	  printf("ERROR: Out of memory [%s:%d]\n", __FILE__, __LINE__);
's avatar
committed
798
	  decr_numflows();
's avatar
committed
799
	  return -1;
's avatar
   
committed
800
      }
Arne Øslebø's avatar
Arne Øslebø committed
801
      flow->fd=qbuf.fd;
's avatar
committed
802
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
803
      flow->flist=malloc(sizeof(flist_t));
804
805
806
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = -1;	// in case of online flow, assigned to -1
807
808
      flow->read_results_flag = 0;
      flow->mapid_down = 0;
809
#endif
Arne Øslebø's avatar
Arne Øslebø committed
810
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
811
      flow->shm_spinlock=NULL;
's avatar
committed
812
      flow->file = -1;		// in case of online flow, assigned to -1
813
      flow->is_connected =0;
's avatar
committed
814
      flow->numfd = 0;		// initialize number of open file descriptors to zero
Arne Øslebø's avatar
Arne Øslebø committed
815
      flist_init(flow->flist);
's avatar
committed
816
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
817
818
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
819
      incr_totalflows();
's avatar
committed
820
821
822
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
823
824
825
826
827
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
      
    /* should probably have a separate error message for ERROR_ACK? */
    case ERROR_ACK:
's avatar
committed
828
      decr_numflows();
829
      local_err=qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
830
831
      return -1;
    default:
's avatar
committed
832
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
833
834
835
836
837
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

838
int mapi_create_offline_flow(const char *dev, int format)
Arne Øslebø's avatar
Arne Øslebø committed
839
840
841
842
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
843
  flowdescr_t *flow=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
844
  int file;
845
846
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
847
848
  char *mapidsocket, *mapidsocketglobal;

849
850
851
852
853
854
855
856
857
858
859
860
	devgroupdb = devgroupdb_open(3); // try local, global

	devgroupid = devgroupdb_getgroupid(devgroupdb); // get groupid of the latest instance of mapid

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
		mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
	}
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}
861
862

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
Arne Øslebø's avatar
Arne Øslebø committed
863
864
865
866

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
867
  if (dev==NULL){
868
	  printf("ERROR: NULL device in mapi_create_offline_flow\n");
869
870
871
	  return -1;
  }
  else if ((file=open(dev,O_LARGEFILE))==-1) {
Arne Øslebø's avatar
Arne Øslebø committed
872
873
874
    local_err=MAPI_ERROR_FILE;
    return -1;
  }
's avatar
committed
875
  
's avatar
committed
876
  pthread_spin_lock(&mapi_lock);
's avatar
committed
877
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
878
    if (mapiipc_client_init()<0) {
879
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
880
881
882
	    pthread_spin_unlock(&mapi_lock);
	    return -1;
    }
's avatar
committed
883
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
884
  }
's avatar
committed
885
886
887
  else
  	incr_numflows();

's avatar
committed
888
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
889
890
891
892
893
894

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
895
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
Arne Øslebø's avatar
Arne Øslebø committed
896
  pthread_spin_lock(&mapi_lock);
's avatar
committed
897
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
898
899
	  pthread_spin_unlock(&mapi_lock);	 
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
900
	  decr_numflows();
's avatar
committed
901
902
903
904
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
905
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
906
	  decr_numflows();
's avatar
committed
907
908
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
909
910
911
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
912
      pthread_spin_unlock(&mapi_lock);
's avatar
committed
913
      decr_numflows();
's avatar
committed
914
      return -1;
Arne Øslebø's avatar
Arne Øslebø committed
915
916
917
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
918
919
    pthread_spin_unlock(&mapi_lock);
    decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
920
921
922
    return -1;
  }

's avatar
committed
923
924
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
925
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
926
	  decr_numflows();
's avatar
committed
927
928
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
929
930
931
932
933
934
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_FLOW_ACK:
      flow=malloc(sizeof(flowdescr_t));
      flow->fd=qbuf.fd;
's avatar
committed
935
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
936
      flow->flist=malloc(sizeof(flist_t));
937
938
939
940
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = format;	// MFF_PCAP or MFF_DAG_ERF
#endif
Arne Øslebø's avatar
Arne Øslebø committed
941
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
942
      flow->shm_spinlock=NULL;
's avatar
committed
943
      flow->file = file;	// file descriptor of offline flow
944
      flow->is_connected=0;
's avatar
committed
945
      flow->numfd = 0;		// initialize number of open file descriptors to zero      
Arne Øslebø's avatar
Arne Øslebø committed
946
      flist_init(flow->flist);
's avatar
committed
947
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
948
949
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
950
      incr_totalflows();
's avatar
committed
951
952
953
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
954
955
956
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
    case ERROR_ACK:
957
      local_err=qbuf.remote_errorcode;
's avatar
committed
958
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
959
960
961
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
's avatar
committed
962
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
963
964
965
966
      return -1;
    }
}

967
char* mapi_create_offline_device(const char *path, int format)
's avatar
committed
968
969
970
971
972
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  int file;
973
974
975
#ifdef RECONNECT
  offline_device *device = NULL;
#endif
's avatar
committed
976

977
978
979
980
981
982
983
  char *mapidsocket, *mapidsocketglobal;

	mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
 	mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);

's avatar
committed
984
985
986
  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
987
988
  
  if (path==NULL){
989
	  printf("ERROR: NULL path in mapi_create_offline_device\n");
990
991
992
	  return NULL;
  }
  else if ((file=open(path,O_LARGEFILE))==-1) {
's avatar
committed
993
994
995
996
997
    local_err=MAPI_ERROR_FILE;
    return NULL;
  }

  pthread_spin_lock(&mapi_lock);
's avatar
committed
998
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
999
1000
    if (mapiipc_client_init()<0) {
	    pthread_spin_unlock(&mapi_lock);
1001
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
1002
1003
	    return NULL;
    }
's avatar
committed
1004
1005
1006
1007
1008
1009
1010
1011
  }
  pthread_spin_unlock(&mapi_lock);

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_DEVICE;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
1012
  strncpy((char *)qbuf.data,path,DATA_SIZE);
's avatar
committed
1013
  pthread_spin_lock(&mapi_lock);
's avatar
committed
1014
1015
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
1016
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
1017
1018
1019
1020
	  return NULL;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
1021
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
1022
1023
	  return NULL;
  }
's avatar
committed
1024
1025
1026
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
1027
      pthread_spin_unlock(&mapi_lock);
's avatar
committed
1028
1029
1030
1031
      return NULL;      
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
1032
    pthread_spin_unlock(&mapi_lock);
's avatar
committed
1033
1034
1035
    return NULL;
  }

's avatar
committed
1036
1037
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
1038
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
1039
1040
	  return NULL;
  }
's avatar
committed
1041
1042
1043
1044
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_DEVICE_ACK:
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056

#ifdef RECONNECT
      device = (offline_device *)malloc(sizeof(offline_device));
      device->path = strdup(path);
      device->previous_device = strdup((char *) qbuf.data);	// previous device name
      device->new_device = strdup((char *) qbuf.data);		// new device name, after reconnection of client
      device->format = format;
      device->is_started = 0;
      pthread_spin_lock(&mapi_lock);
      flist_append(offline_device_list, count_offline_devices++, device);
      pthread_spin_unlock(&mapi_lock);
#endif
's avatar
committed
1057
      offline_devices++;
1058

's avatar
committed
1059
      return strdup((char *)qbuf.data);
's avatar
committed
1060
    case ERROR_ACK:
1061
      local_err=qbuf.remote_errorcode;
's avatar
committed
1062
1063
1064
1065
1066
1067
1068
      return NULL;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
      return NULL;
    }
}

1069
int mapi_start_offline_device(const char *dev)
's avatar
committed
1070
1071
1072
1073
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
1074
1075
1076
1077
#ifdef RECONNECT
  offline_device *device = NULL;
  flist_node_t *fnode;
#endif
's avatar
committed
1078
1079
1080

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

1081
  if (dev==NULL){
1082
	  printf("ERROR: NULL device in mapi_start_offline_device\n");
1083
1084
	  return -1;
  }
's avatar
committed
1085
  pthread_spin_lock(&mapi_lock);
's avatar
committed
1086
  if ((get_numflows() == 0)