mapi.c 116 KB
Newer Older
1
2
3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
#include "mapi.h"
#include "mapiipc.h"
#include "mapilibhandler.h"
#include "flist.h"
#include "debug.h"
's avatar
committed
26
#include "parseconf.h"
's avatar
   
committed
27
#include "printfstring.h"
28
#include "mapi_errors.h"
29
#include "devgroupdb.h"
Arne Øslebø's avatar
Arne Øslebø committed
30
31
32
33
34
35
36
37

#ifdef WITH_ADMISSION_CONTROL
#include <regex.h>
#include <keynote.h>
#include <openssl/rsa.h>
#include "bytestream.h"
#endif

's avatar
committed
38
39
40
41
#ifdef WITH_ANONYMIZATION
#include "anonymization/anonymization.h"
#endif

's avatar
committed
42
43
44
45
#ifdef DIMAPI
#include <signal.h>
#include <semaphore.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
46

47
48
49
50
51
#ifdef WITH_AUTHENTICATION
#define FROM_MAPI
#include "vod/vod.h"
#endif

Arne Øslebø's avatar
Arne Øslebø committed
52
static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT;
53
static pthread_once_t initialized = PTHREAD_ONCE_INIT;
Arne Øslebø's avatar
Arne Øslebø committed
54
55
56
static int minit=0; //Set to 1 when MAPI has been initialized

static pthread_spinlock_t mapi_lock;
's avatar
committed
57
static pthread_spinlock_t numflows_lock;  // for numflows and totalflows variables
Arne Øslebø's avatar
Arne Øslebø committed
58
59
60

static int local_err=0; /* occurence of a mapi.c error, translation of these errors */

's avatar
committed
61
62
static int numflows=0;  // number of allocated (active) flows
static int totalflows=0;  // number of flows so far (including closed flows)
Arne Øslebø's avatar
Arne Øslebø committed
63

's avatar
committed
64
static int offline_devices;
's avatar
committed
65
66
static int agent=0;

67
int log_level = LOGGING_DISABLED, log_to_syslog = 0, log_to_file = 0, log_fd_debug = -1;	// support for logging to file & syslog
68

69
extern const errorstruct Errors[];
70

71
72
#ifdef DIMAPI
static int hostcmp(void *h1, void *h2);
's avatar
   
committed
73
static void delete_remote_flow(remote_flowdescr_t* rflow);
74
75
flist_t *hostlist=NULL;//list containing all remote hosts used so far
extern flist_t *remote_flowlist;
's avatar
committed
76
int dimapi_port;
77
extern sem_t stats_sem;
78

79
80
81
82
83
84
85
86
87
typedef struct function_data{

	int fid;			// real fid returned from mapicommd
	mapidflib_function_def_t* fdef;	// function definition
#ifdef RECONNECT
	host_flow *hflow;		// flow of a specified host
	char args[DIMAPI_DATA_SIZE];	// function's arguments
	int index;			// need for functions that have arguments that reference to a flow
#endif
88
89
90
91
} function_data;

static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT;
static pthread_spinlock_t remote_ipc_lock;
's avatar
committed
92
static pthread_spinlock_t hostlist_lock;
93
94
95
96
97

static unsigned fdseed = 0;        // 'scope' flow descriptor seed (always increases)
static unsigned fidseed = 0;       // function descriptor seed (always increases)
static unsigned negfdseed=-1;       // generates temporary negative fd, for use before create_flow
#endif
's avatar
committed
98
99
100
101
102

typedef struct libinfo {
	char* name;
} libinfo_t;

Arne Øslebø's avatar
Arne Øslebø committed
103
104
typedef struct flowdescr {
  int fd;
105
  int file; 		// file descriptor for offline flows
's avatar
committed
106
107
  int fds[256];		// file descriptors
  int numfd;		// number of file descriptors
Arne Øslebø's avatar
Arne Øslebø committed
108
  char *devtype;
109
110
111
#ifdef RECONNECT
  char *device;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
112
  char *shm_base;
's avatar
committed
113
  pthread_spinlock_t *shm_spinlock;
Arne Øslebø's avatar
Arne Øslebø committed
114
  flist_t *flist;
115
116
  unsigned char is_connected;	// this should be 1 if the flow is connected 0 otherwise
  int format;			// this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise  
Arne Øslebø's avatar
Arne Øslebø committed
117
118
} flowdescr_t;

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifdef RECONNECT

typedef struct offline_device{
	char *path;			// specifies the name of the trace file to open
	char *previous_device;		// return value of mapi_create_offline_device (device name that should be used in mapi_start_offline_device)
	char *new_device;		// new device name (returned from mapid)
	int format;			// the format of the captured packets (MFF_PCAP or MFF_DAG_ERF)
	unsigned char is_started;	// this should be 1 if the device is started ( via mapi_start_offline_device() ), 0 otherwise
} offline_device;

static unsigned count_offline_devices = 0;	// used in offline_device_list (always increases)

extern flist_t *function_list;			// list which contains all applyed functions ( defined in mapiipc.c )
extern flist_t *offline_device_list;		// list which contains all offline devices ( defined in mapiipc.c )

#endif

extern flist_t *flowlist;	// defined in mapiipc.c
Arne Øslebø's avatar
Arne Øslebø committed
137
138
139
140
141

typedef struct functdescr {
  int fid;
  short result_init;
  mapidflib_function_def_t *def;
142
  mapidflib_function_t *funct;
Arne Øslebø's avatar
Arne Øslebø committed
143
  void *data;  
's avatar
committed
144
  mapi_results_t *result;
145
146
147
148
#ifdef RECONNECT
  flowdescr_t *flow;
  int numfd;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
149
150
151
152
153
154
155
156
157
158
159
160
161
} functdescr_t;

typedef struct shm_result {
  void* ptr; //Pointer to shared data
  int size; //Size of shared data
} shm_result_t;

/*
 * Function declarations 
 */
static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data);
int 
's avatar
committed
162
get_results_info(flowdescr_t *flow, functdescr_t *f, int user_fd, int user_fid);
Arne Øslebø's avatar
Arne Øslebø committed
163

164
//static int set_error(void* flow, int err_no , int is_remote);
Arne Øslebø's avatar
Arne Øslebø committed
165
166
167

static int send_fd(int *fds,int numfd);

's avatar
committed
168
//global var access functions
's avatar
   
committed
169
170
171
172
173
static int get_numflows();
static int incr_numflows();
static int decr_numflows();
static int get_totalflows();
static int incr_totalflows();
Arne Øslebø's avatar
Arne Øslebø committed
174

175
176
177
static void init()
//common initialization function for mapi and dimapi
{
's avatar
committed
178
#ifdef DIMAPI
179
  char* path=NULL, *libs=NULL, *str=NULL, *s=NULL;
's avatar
committed
180
  char* mapi_conf;
181
  
182
183
  mapi_conf = printf_string( CONFDIR"/"CONF_FILE );

's avatar
committed
184
185
  if (pc_load (mapi_conf))
    {
's avatar
   
committed
186
187
188
189
      conf_category_entry_t* empty_cat = pc_get_category("");
      const char *portstr;

      if( empty_cat == NULL ){
190
          printf("Configuration file has no empty category. Giving up\n");
's avatar
   
committed
191
192
193
194
195
196
          exit(1);
      }
      path = pc_get_param (empty_cat, "libpath");
      libs = pc_get_param (empty_cat, "libs");
      portstr = pc_get_param (empty_cat, "dimapi_port");
      if( portstr == NULL ){
197
          printf("ERROR: Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
198
199
200
201
202
203
	  dimapi_port = DEFAULT_DIMAPI_PORT;
      }
      else {
	      /* make sure that portstr is a valid number. */
	      dimapi_port = atoi( portstr );
	      if ( dimapi_port<=0 || dimapi_port>=65536 ) {
204
		      printf("ERROR: Invalid port given in configuration file. The default port %d is used\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
205
206
		      dimapi_port = DEFAULT_DIMAPI_PORT;
	      }
's avatar
   
committed
207
      }
's avatar
committed
208
209
210
    }
  else 
    {
211
212
      printf("ERROR: Cannot load mapi.conf file. Giving up.\n");
      printf("Search path is: %s\n", mapi_conf);
's avatar
committed
213
214
215
216
      exit(1);
    }
#endif

217
218
219
220
221
222
  minit = 1;
  pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE);
  
  flowlist = malloc(sizeof(flist_t));
  flist_init(flowlist);

223
224
225
226
227
228
229
230
#ifdef RECONNECT
  function_list = malloc(sizeof(flist_t));
  flist_init(function_list);
  
  offline_device_list = malloc(sizeof(flist_t));
  flist_init(offline_device_list);
#endif

231
232
#ifdef DIMAPI
  pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE);
's avatar
committed
233
  pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE);
234
235
236
237
  hostlist = malloc(sizeof(flist_t));
  remote_flowlist = malloc(sizeof(flist_t));
  flist_init(hostlist); 
  flist_init(remote_flowlist);
's avatar
committed
238

239
240
241
242
243
244
245
246
  str=libs;
  while((s=strchr(str,':'))!=NULL) {
    *s='\0';
    mapilh_load_library(path,str);
    str=s+1;
  }

  mapilh_load_library(path,str);
's avatar
committed
247
  free(mapi_conf);
248
  pc_close();
249
250
251
#endif
}

Arne Øslebø's avatar
Arne Øslebø committed
252
//Initializes MAPI - called only once by pthread_once()
's avatar
committed
253
static void mapi_init()
Arne Øslebø's avatar
Arne Øslebø committed
254
255
256
257
{
  struct mapiipcbuf qbuf;
  char libpath[4096],*str,*s;
  minit=1;
258
259
  if(mapiipc_client_init()==-1){
	 local_err = MCOM_INIT_SOCKET_ERROR;
260
	 minit=0;
's avatar
committed
261
262
263
	 fprintf(stderr, "\n--------------------------------------------------------\n");
	 fprintf(stderr, "WARNING: mapid may not be running at the given interface\n");
	 fprintf(stderr,"--------------------------------------------------------\n");
264
  }
265
  pthread_once(&initialized, (void*)init);
Arne Øslebø's avatar
Arne Øslebø committed
266
267
  pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE);
  
's avatar
committed
268
269
  offline_devices = 0;
  
Arne Øslebø's avatar
Arne Øslebø committed
270
271
272
273
274
275
  //Get libpath from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBPATH;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
276
277
278
279
280
281
  
 if( mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf) )
	   	local_err = MCOM_SOCKET_ERROR;
 
Arne Øslebø's avatar
Arne Øslebø committed
282
283
284
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBPATH_ACK:
's avatar
committed
285
	  strncpy(libpath, (char *)qbuf.data, 4096);
Arne Øslebø's avatar
Arne Øslebø committed
286
287
288
289
290
291
	  break;
      default:
	   /* MAPI_ERROR_GETTING_LIBPATH */
	  return;
	  break;
  }
292
  printf("libpath=%s\n", libpath);
Arne Øslebø's avatar
Arne Øslebø committed
293
294
295
296
297
298
299
		  
  //get libs from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBS;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
300
301
302
303
  if(mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
304
305
306
307
308
309
310
311
312
313
314
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBS_ACK:
	  break;
      default:
 	  /* MAPI_ERROR_GETTING_LIBS */
	  return; 
	  break;
  }

  //Load function libraries
's avatar
committed
315
    str = (char *)qbuf.data;
Arne Øslebø's avatar
Arne Øslebø committed
316
317
318
319
320
321
322
323
324
325
    while((s=strchr(str,':'))!=NULL) {
      *s='\0';
      mapilh_load_library(libpath,str);
      str=s+1;
    }

    mapilh_load_library(libpath,str);
    return;
}

326
327
328
329
330
331
332
333
334
#ifdef DIMAPI
//Initializes DIMAPI - called only once by pthread_once()
static void dmapi_init()
{ 
  pthread_once(&initialized, (void*)init);
  return;
}
#endif

Arne Øslebø's avatar
Arne Øslebø committed
335
336
337
338
339
340
341
int mapi_connect(int fd)
//Connect to a mapi flow
//fd = flow descriptor
{
  struct mapiipcbuf qbuf;
  flowdescr_t* flow;

342
343
344
345
346
347
#ifdef DIMAPI
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  flist_node_t* fnode;
#endif

Arne Øslebø's avatar
Arne Øslebø committed
348
  if (!minit) {
349
    printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__);
350
    local_err = MAPI_INIT_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
351
    return -1;
352
  }else if (fd<=0){
353
   printf("ERROR: Invalid flow descriptor (fd: %d) in mapi_connect\n", fd);
354
   return -1;	  
Arne Øslebø's avatar
Arne Øslebø committed
355
356
  }

357
358
359
#ifdef DIMAPI
  if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
360

361
362
363
364
365
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->cmd=CONNECT;
      hflow->dbuf->fd=hflow->fd;
      hflow->dbuf->length=BASIC_SIZE;
    }
366
367
368
#ifdef WITH_AUTHENTICATION
    if(rflow->is_authenticated == 0)
    {
369
	   printf("ERROR: Flow with id %d is not authenticated\n", fd);
370
	   rflow->is_connected = 0;
's avatar
committed
371
	   return(-2);
372
373
374
    }
    
#endif
375
376
377
378
    if (mapiipc_remote_write_to_all(rflow)<0){ 
	local_err = MCOM_SOCKET_ERROR;
	return -1;
    }
379
380
381
382
383
384
385
386

    //wait results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;

      switch(hflow->dbuf->cmd) {
        case CONNECT_ACK:
387
		rflow->is_connected=1;
388
389
          continue;
        case ERROR_ACK:
's avatar
committed
390
	  memcpy(&local_err, hflow->dbuf->data, sizeof(int));
391
392
          return -1;
        default:
's avatar
committed
393
	  local_err = MCOM_UNKNOWN_ERROR;
394
395
396
397
398
399
400
401
          return -1;
     }

    }
    return 0;
  }
#endif

Arne Øslebø's avatar
Arne Øslebø committed
402
  if ((flow=flist_get(flowlist,fd))==NULL) {
403
    printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__);
404
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
405
    return -1;
406
407
  }

Arne Øslebø's avatar
Arne Øslebø committed
408
409
  qbuf.mtype=1;
  qbuf.cmd=CONNECT;
410
411
412
413
#ifdef RECONNECT
  qbuf.fd = flow->fd;
  qbuf.user_fd = fd;
#else
Arne Øslebø's avatar
Arne Øslebø committed
414
  qbuf.fd=fd;
415
#endif
Arne Øslebø's avatar
Arne Øslebø committed
416
417
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
's avatar
committed
418
419
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
420
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
421
422
423
424
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
425
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
426
427
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
428
429
430
431
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CONNECT_ACK:
432
	flow->is_connected=1;
's avatar
committed
433
      return 0;
Arne Øslebø's avatar
Arne Øslebø committed
434
    case ERROR_ACK:
435
      local_err =  qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
436
437
      return -1;
    default:
438
      local_err= MCOM_UNKNOWN_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
439
440
441
442
      return -1;
    }  
}

443
#ifdef DIMAPI
's avatar
   
committed
444
static void delete_remote_flow(remote_flowdescr_t* rflow) 
445
446
{
  host_flow* hflow;
's avatar
committed
447
  flist_node_t* fnode, *fnode2, *fnode3;
's avatar
   
committed
448
  mapi_results_t* res;
449
450

  pthread_spin_lock(&remote_ipc_lock);
451
  flist_remove(remote_flowlist, rflow->fd);
452
453
454
455
  decr_numflows();
  pthread_spin_unlock(&remote_ipc_lock);

  sem_destroy(&rflow->fd_sem);
's avatar
committed
456
  sem_destroy(&rflow->pkt_sem);
's avatar
committed
457
  //pthread_mutex_destroy(&rflow->mutex);
458
459
460
461
462

  for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
    fnode2=flist_next(fnode);
    hflow=(host_flow*)fnode->data;
    hflow->rhost->num_flows--;
463
    flist_remove(hflow->rhost->flows, hflow->fd);
's avatar
committed
464
    for (fnode3=flist_head(hflow->functions); fnode3!=NULL; fnode3=flist_next(fnode3)) {
465
      free( flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fid) );
's avatar
committed
466
    }
467
    if (hflow->rhost->num_flows==0) {
468
      pthread_cancel(*hflow->rhost->comm_thread);
469
      mapiipc_remote_close(hflow->rhost);	//close the socket
470
      flist_destroy(hflow->rhost->flows);
471
      free(hflow->rhost->flows);
472
      flist_destroy(hflow->rhost->functions);
's avatar
committed
473
      free(hflow->rhost->functions);
474
      free(hflow->rhost->hostname);
475
      flist_remove(hostlist, hflow->rhost->sockfd);
's avatar
committed
476
      free(hflow->rhost->comm_thread);
477
478
479
#ifdef RECONNECT
      sem_destroy(&hflow->rhost->connection);	// destroy semaphore
#endif
480
481
482
      free(hflow->rhost);
    }
    //we check if a host is using in other rflows and delete it -close the socket- if not
483
    flist_destroy(hflow->functions);
484
485
486
    free(hflow->functions);
    free(hflow->dev);
    free(hflow->dbuf);
487
    if (hflow->pkt!=NULL) free(hflow->pkt);
488
    flist_remove(rflow->host_flowlist,hflow->id);
's avatar
committed
489
    free(hflow);
490
  }
491
  flist_destroy(rflow->host_flowlist);
's avatar
   
committed
492
493
  free(rflow->host_flowlist);
  if (rflow->pkt_list!=NULL) {
494
	flist_destroy(rflow->pkt_list);
's avatar
   
committed
495
496
497
498
499
500
501
502
	free(rflow->pkt_list);
  }
  for (fnode=flist_head(rflow->function_res); fnode!=NULL; fnode=fnode2 ) { 
	fnode2=flist_next(fnode);
	res=(mapi_results_t*)fnode->data;
	free(res->res);
	free(res);
  }
503
  flist_destroy(rflow->function_res);
's avatar
committed
504
  free(rflow->function_res);
's avatar
committed
505
  if (rflow->pkt!=NULL) free(rflow->pkt);
's avatar
   
committed
506
#ifdef WITH_AUTHENTICATION
's avatar
committed
507
508
  if (rflow->username!=NULL) free(rflow->username);
  if (rflow->vo!=NULL) free(rflow->vo);
509
510
511
#ifdef RECONNECT
  if (rflow->password!=NULL) free(rflow->password);
#endif
's avatar
   
committed
512
#endif  
513
514
515
516
  free(rflow);
}
#endif

517
int mapi_create_flow(const char *dev)
Arne Øslebø's avatar
Arne Øslebø committed
518
519
520
521
522
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  flowdescr_t *flow, *tmpflow;
523
524
525
526
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
  char *mapidsocket, *mapidsocketglobal;
/*  int local;*/
Arne Øslebø's avatar
Arne Øslebø committed
527

528
529
530
531
532
#ifdef DIMAPI
  remote_flowdescr_t *rflow;
  char *hostname=NULL, *s=NULL, *k=NULL;
  struct host *h=NULL;
  host_flow* hflow;
's avatar
committed
533
  char *devp;
534
535
536
  flist_node_t* fnode;
  unsigned int idgen=0;
#endif
537
538
539
540
541
542
543
#ifdef RECONNECT
  offline_device *device = NULL;
  flist_node_t *fnode_;
  int flag = 0;
#endif

  if(dev==NULL){
544
	 printf("ERROR: Wrong device name given (NULL) in mapi_create_flow\n");
545
546
547
	 local_err  = MAPI_DEVICE_INFO_ERR;
	 return -1;
  }
548
549

#ifndef DIMAPI
550
551
552
553
554
555
556
557
558
559
560
561
562
	devgroupdb = devgroupdb_open(3); // try local, global
	devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
  	mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
  }
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
563

's avatar
committed
564
565
566
  pthread_once(&mapi_is_initialized, (void*)mapi_init);
#endif

567
  //check if flow is remote or not and call the appropriate init function
568
#ifdef DIMAPI
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
  if ( strchr(dev,':')==NULL) {
		devgroupdb = devgroupdb_open(3); // try local, global
		devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

		if(!devgroupid) {
			mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
			mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
		}
		else {
			mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
			mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
		}

		mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);

  	pthread_once(&mapi_is_initialized, (void*)mapi_init);
	}
586
587
588
  else pthread_once(&dmapi_is_initialized, (void*)dmapi_init);

  if ((s = strchr(dev,':'))!=NULL) {
's avatar
committed
589
    devp=strdup(dev);
590
591
592
    rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t));
    rflow->fd=++fdseed;
    sem_init(&rflow->fd_sem, 0, 0);
's avatar
committed
593
    sem_init(&rflow->pkt_sem, 0, 0);
594
595
596
597
    rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
    flist_init(rflow->host_flowlist);
    rflow->pkt_list=NULL;
    rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
598
    rflow->is_connected = 0;
's avatar
   
committed
599
600
601
#ifdef WITH_AUTHENTICATION
    rflow->username=NULL;
    rflow->vo=NULL;
602
603
604
#ifdef RECONNECT
    rflow->password = NULL;
#endif
's avatar
   
committed
605
#endif
's avatar
committed
606
    rflow->pkt=NULL;
607
608
609
610
611
612
613
614
    flist_init(rflow->function_res);
    k=strtok(devp, ", ");

    while (k!=NULL) {
      if ((s = strchr(k,':'))!=NULL) {
        *s = '\0';
        hostname = k;
        k = s + 1;
's avatar
committed
615
        pthread_spin_lock(&hostlist_lock);	
616
        h = (struct host *)flist_search(hostlist, hostcmp, hostname);
617
618
619
620

        if(h==NULL){// Our host is a new one --> insert it in the hostlist
          h = (struct host *)malloc(sizeof(struct host));
	  h->hostname = strdup(hostname);
's avatar
committed
621
	  h->port = dimapi_port;
622
623
	  h->flows = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->flows);
's avatar
committed
624
625
	  h->functions = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->functions);
626
	  h->num_flows=0;
627
	  h->stats=NULL;
628
629
630
#ifdef RECONNECT
      	  sem_init(&h->connection, 0, 0);	// initialize semaphore
#endif
631
	  // Create the socket
's avatar
committed
632
	  if (mapiipc_remote_init(h)<0) {
633
	   	  local_err = MCOM_SOCKET_ERROR;
634
		  printf("ERROR: Could not connect with host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__);
's avatar
committed
635
636
637
		  pthread_spin_unlock(&hostlist_lock);
		  return -1;
	  }
's avatar
committed
638

639
640
641
642
	  h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t));
	  pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h);

          flist_append(hostlist, h->sockfd, h);
's avatar
committed
643
          pthread_spin_unlock(&hostlist_lock);
644
645
        }
        else{//host exists in the list
's avatar
committed
646
          pthread_spin_unlock(&hostlist_lock);
647
648
649
650
651
652
653
        }

        h->num_flows++;
	hflow=(host_flow*)malloc(sizeof(host_flow));
	hflow->scope_fd=rflow->fd;
	hflow->dev=strdup(k);
	flist_append(rflow->host_flowlist, ++idgen, hflow);
654
	hflow->id=idgen;
655
656
657
	flist_append(h->flows, --negfdseed, hflow);
	hflow->fd=negfdseed;
	hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
658
	hflow->pkt=NULL;
659
660
661
662
663
	hflow->rhost=h;
	hflow->functions=(flist_t *)malloc(sizeof(flist_t));
	flist_init(hflow->functions);

	hflow->dbuf->cmd=CREATE_FLOW;
's avatar
committed
664
	strncpy((char *)hflow->dbuf->data,k,DATA_SIZE);
665
666
667
668
669
670
671
672
673
674
675
676
677
	hflow->dbuf->length=BASIC_SIZE+strlen(k)+1;
      }
      else {
        //this is the case where the dev string contains both 'host:interface1' and 'interface2'
	//example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1");
	//user's intention is probably localhost:eth1
	//what should be done in this case?
      }
      k=strtok(NULL,", ");
    }

    free(devp);

's avatar
committed
678
    rflow->scope_size=flist_size(rflow->host_flowlist);
679
680
681
682
683
684
    pthread_spin_lock(&remote_ipc_lock);
    flist_append(remote_flowlist, rflow->fd, rflow);
    incr_numflows();
    incr_totalflows();
    pthread_spin_unlock(&remote_ipc_lock);

685
686
687
688
    if (mapiipc_remote_write_to_all(rflow)<0){
	    local_err = MCOM_SOCKET_ERROR;
	    return -1;
    }
's avatar
committed
689
    //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow -
690
691
692
693
694

    //wait for results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
's avatar
committed
695
      if (hflow->dbuf->cmd == CREATE_FLOW_ACK) {
696
        hflow->fd=*((int*)hflow->dbuf->data);
697
	flist_remove(hflow->rhost->flows, hflow->dbuf->fd);
698
699
	flist_append(hflow->rhost->flows, hflow->fd, hflow);
      }
's avatar
committed
700
701
      else if (hflow->dbuf->cmd == ERROR_ACK) {
	memcpy(&local_err, hflow->dbuf->data, sizeof(int));
702
	printf("ERROR: Could not create flow in host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__);
's avatar
committed
703
704
705
706
707
        delete_remote_flow(rflow);
	return -1;
      }
      else {
	local_err = MCOM_UNKNOWN_ERROR;
708
709
710
711
712
713
714
715
716
717
        delete_remote_flow(rflow);
	return -1;
      }
    }

    return rflow->fd;

  }
#endif

's avatar
committed
718
719
  pthread_spin_lock(&mapi_lock);
  
's avatar
committed
720
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
721
722
723
    if(mapiipc_client_init()==-1){
	  local_err = MCOM_INIT_SOCKET_ERROR;
    }
's avatar
committed
724
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
725
  }
's avatar
committed
726
727
728
729
  else 
    incr_numflows();
  
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
730

731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
#ifdef RECONNECT
  /* consider the following case :
     	- devicename = mapi_create_offline_device("./tracefile", MFF_PCAP);
	- {
		code ...
		--> reconnection
		code ...
	  }
	  fd = mapi_create_flow(devicename);

	  Now mapi_create_flow must send to mapid the new device name ...
  */

  pthread_spin_lock(&mapi_lock);

  for(fnode_ = flist_head(offline_device_list); fnode_ != NULL; fnode_ = flist_next(fnode_)){	// find specified device in offline_device_list

	  device = (offline_device *)fnode_->data;

	  if(!strcmp(device->previous_device, dev)){
		  flag = 1;					// FIXME
		  break;
	  }
  }
  if(flag)	strncpy((char *) qbuf.data, device->new_device, DATA_SIZE);	// the new device name
  else		strncpy((char *) qbuf.data, dev, DATA_SIZE);

  flag = 0;

  pthread_spin_unlock(&mapi_lock);
#else
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
#endif

Arne Øslebø's avatar
Arne Øslebø committed
765
766
767
768
769
  qbuf.mtype=1;
  qbuf.cmd=CREATE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
770

's avatar
committed
771
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
772
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
773
	  pthread_spin_unlock(&mapi_lock);	  
's avatar
committed
774
   	  decr_numflows();
's avatar
committed
775
776
777
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
778
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
779
	  pthread_spin_unlock(&mapi_lock);
's avatar
committed
780
	  decr_numflows();
's avatar
committed
781
782
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
783
784
785
786
787
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_FLOW_ACK:
      tmpflow=flist_get(flowlist,qbuf.fd);
's avatar
committed
788
789
      if (tmpflow!=NULL) 
        {
790
	        printf("ERROR: Mapid gave us a fd (%d) which already exist in our lists, exiting [%s:%d]\n", qbuf.fd, __FILE__, __LINE__);
's avatar
committed
791
	        decr_numflows();
's avatar
committed
792
		return -1;
's avatar
committed
793
     	}
Arne Øslebø's avatar
Arne Øslebø committed
794
      flow=malloc(sizeof(flowdescr_t));
's avatar
   
committed
795
      if( flow == NULL ){
796
	  printf("ERROR: Out of memory [%s:%d]\n", __FILE__, __LINE__);
's avatar
committed
797
	  decr_numflows();
's avatar
committed
798
	  return -1;
's avatar
   
committed
799
      }
Arne Øslebø's avatar
Arne Øslebø committed
800
      flow->fd=qbuf.fd;
's avatar
committed
801
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
802
      flow->flist=malloc(sizeof(flist_t));
803
804
805
806
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = -1;	// in case of online flow, assigned to -1
#endif
Arne Øslebø's avatar
Arne Øslebø committed
807
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
808
      flow->shm_spinlock=NULL;
's avatar
committed
809
      flow->file = -1;		// in case of online flow, assigned to -1
810
      flow->is_connected =0;
's avatar
committed
811
      flow->numfd = 0;		// initialize number of open file descriptors to zero
Arne Øslebø's avatar
Arne Øslebø committed
812
      flist_init(flow->flist);
's avatar
committed
813
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
814
815
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
816
      incr_totalflows();
's avatar
committed
817
818
819
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
820
821
822
823
824
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
      
    /* should probably have a separate error message for ERROR_ACK? */
    case ERROR_ACK:
's avatar
committed
825
      decr_numflows();
826
      local_err=qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
827
828
      return -1;
    default:
's avatar
committed
829
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
830
831
832
833
834
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

835
int mapi_create_offline_flow(const char *dev, int format)
Arne Øslebø's avatar
Arne Øslebø committed
836
837
838
839
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
840
  flowdescr_t *flow=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
841
  int file;
842
843
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
844
845
  char *mapidsocket, *mapidsocketglobal;

846
847
848
849
850
851
852
853
854
855
856
857
	devgroupdb = devgroupdb_open(3); // try local, global

	devgroupid = devgroupdb_getgroupid(devgroupdb); // get groupid of the latest instance of mapid

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
		mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
	}
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}
858
859

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
Arne Øslebø's avatar
Arne Øslebø committed
860
861
862
863

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
864
  if (dev==NULL){
865
	  printf("ERROR: NULL device in mapi_create_offline_flow\n");
866
867
868
	  return -1;
  }
  else if ((file=open(dev,O_LARGEFILE))==-1) {
Arne Øslebø's avatar
Arne Øslebø committed
869
870
871
    local_err=MAPI_ERROR_FILE;
    return -1;
  }
's avatar
committed
872
  
's avatar
committed
873
  pthread_spin_lock(&mapi_lock);
's avatar
committed
874
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
875
    if (mapiipc_client_init()<0) {
876
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
877
878
879
	    pthread_spin_unlock(&mapi_lock);
	    return -1;
    }
's avatar
committed
880
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
881
  }
's avatar
committed
882
883
884
  else
  	incr_numflows();

's avatar
committed
885
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
886
887
888
889
890
891

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
892
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
Arne Øslebø's avatar
Arne Øslebø committed
893
  pthread_spin_lock(&mapi_lock);
's avatar
committed
894
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
895
896
	  pthread_spin_unlock(&mapi_lock);	 
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
897
	  decr_numflows();
's avatar
committed
898
899
900
901
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
902
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
903
	  decr_numflows();
's avatar
committed
904
905
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
906
907
908
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
909
      pthread_spin_unlock(&mapi_lock);
's avatar
committed
910
      decr_numflows();
's avatar
committed
911
      return -1;
Arne Øslebø's avatar
Arne Øslebø committed
912
913
914
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
915
916
    pthread_spin_unlock(&mapi_lock);
    decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
917
918
919
    return -1;
  }

's avatar
committed
920
921
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
922
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
923
	  decr_numflows();
's avatar
committed
924
925
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
926
927
928
929
930
931
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_FLOW_ACK:
      flow=malloc(sizeof(flowdescr_t));
      flow->fd=qbuf.fd;
's avatar
committed
932
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
933
      flow->flist=malloc(sizeof(flist_t));
934
935
936
937
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = format;	// MFF_PCAP or MFF_DAG_ERF
#endif
Arne Øslebø's avatar
Arne Øslebø committed
938
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
939
      flow->shm_spinlock=NULL;
's avatar
committed
940
      flow->file = file;	// file descriptor of offline flow
941
      flow->is_connected=0;
's avatar
committed
942
      flow->numfd = 0;		// initialize number of open file descriptors to zero      
Arne Øslebø's avatar
Arne Øslebø committed
943
      flist_init(flow->flist);
's avatar
committed
944
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
945
946
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
947
      incr_totalflows();
's avatar
committed
948
949
950
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
951
952
953
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
    case ERROR_ACK:
954
      local_err=qbuf.remote_errorcode;
's avatar
committed
955
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
956
957
958
      return -1;
    default:
      local_err=MCOM_UNKNOWN_ERROR;
's avatar
committed
959
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
960
961
962
963
      return -1;
    }
}

964
char* mapi_create_offline_device(const char *path, int format)
's avatar
committed
965
966
967
968
969
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  int file;
970
971
972
#ifdef RECONNECT
  offline_device *device = NULL;
#endif
's avatar
committed
973

974
975
976
977
978
979
980
  char *mapidsocket, *mapidsocketglobal;

	mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
 	mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);

's avatar
committed
981
982
983
  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
984
985
  
  if (path==NULL){
986
	  printf("ERROR: NULL path in mapi_create_offline_device\n");
987
988
989
	  return NULL;
  }
  else if ((file=open(path,O_LARGEFILE))==-1) {
's avatar
committed
990
991
992
993
994
    local_err=MAPI_ERROR_FILE;
    return NULL;
  }

  pthread_spin_lock(&mapi_lock);
's avatar
committed
995
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
996
997
    if (mapiipc_client_init()<0) {
	    pthread_spin_unlock(&mapi_lock);
998
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
999
1000
	    return NULL;
    }
For faster browsing, not all history is shown. View entire blame