mapi.c 129 KB
Newer Older
1
2
3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <unistd.h>
21
#include <ctype.h>
Arne Øslebø's avatar
Arne Øslebø committed
22
23
24
25
26
#include "mapi.h"
#include "mapiipc.h"
#include "mapilibhandler.h"
#include "flist.h"
#include "debug.h"
's avatar
committed
27
#include "parseconf.h"
's avatar
   
committed
28
#include "printfstring.h"
29
#include "mapi_errors.h"
30
#include "devgroupdb.h"
Arne Øslebø's avatar
Arne Øslebø committed
31
32
33
34
35
36
37
38

#ifdef WITH_ADMISSION_CONTROL
#include <regex.h>
#include <keynote.h>
#include <openssl/rsa.h>
#include "bytestream.h"
#endif

's avatar
committed
39
40
41
42
#ifdef WITH_ANONYMIZATION
#include "anonymization/anonymization.h"
#endif

's avatar
committed
43
44
45
46
#ifdef DIMAPI
#include <signal.h>
#include <semaphore.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
47

48
49
50
51
52
#ifdef WITH_AUTHENTICATION
#define FROM_MAPI
#include "vod/vod.h"
#endif

Arne Øslebø's avatar
Arne Øslebø committed
53
static pthread_once_t mapi_is_initialized = PTHREAD_ONCE_INIT;
54
static pthread_once_t initialized = PTHREAD_ONCE_INIT;
Arne Øslebø's avatar
Arne Øslebø committed
55
56
57
static int minit=0; //Set to 1 when MAPI has been initialized

static pthread_spinlock_t mapi_lock;
's avatar
committed
58
static pthread_spinlock_t numflows_lock;  // for numflows and totalflows variables
Arne Øslebø's avatar
Arne Øslebø committed
59
60
61

static int local_err=0; /* occurence of a mapi.c error, translation of these errors */

's avatar
committed
62
63
static int numflows=0;  // number of allocated (active) flows
static int totalflows=0;  // number of flows so far (including closed flows)
Arne Øslebø's avatar
Arne Øslebø committed
64

's avatar
committed
65
static int offline_devices;
's avatar
committed
66
67
static int agent=0;

68
extern const errorstruct Errors[];
69

70
71
#ifdef DIMAPI
static int hostcmp(void *h1, void *h2);
's avatar
   
committed
72
static void delete_remote_flow(remote_flowdescr_t* rflow);
73
74
flist_t *hostlist=NULL;//list containing all remote hosts used so far
extern flist_t *remote_flowlist;
's avatar
committed
75
int dimapi_port;
76
extern sem_t stats_sem;
77

78
79
80
typedef struct function_data{

	int fid;			// real fid returned from mapicommd
81
	int fidseed;			// fid returned to mapi user
82
	mapidflib_function_def_t* fdef;	// function definition
83
	struct dmapiipcbuf *dbuf;	// need for asynchronous mapi_read_results
84
85
86
87
88
#ifdef RECONNECT
	host_flow *hflow;		// flow of a specified host
	char args[DIMAPI_DATA_SIZE];	// function's arguments
	int index;			// need for functions that have arguments that reference to a flow
#endif
89
90
91
92
} function_data;

static pthread_once_t dmapi_is_initialized = PTHREAD_ONCE_INIT;	// one-time guard for dmapi_init()
static pthread_spinlock_t remote_ipc_lock;	// taken around updates of remote_flowlist
static pthread_spinlock_t hostlist_lock;	// taken around lookups/insertions in hostlist

static unsigned fdseed = 0;        // 'scope' flow descriptor seed (always increases)
static unsigned fidseed = 0;       // function descriptor seed (always increases)
// Generates temporary negative fd, for use before create_flow.
// The variable is unsigned, so -1 is stored as UINT_MAX; the pre-decremented
// value is later assigned to an int field, which is where it becomes negative.
static unsigned negfdseed=-1;
#endif
's avatar
committed
99
100
101
102
103

/* Record describing one library; it carries only a name string. */
typedef struct libinfo {
	char* name;	// presumably the library's name -- not used in the visible part of the file
} libinfo_t;

Arne Øslebø's avatar
Arne Øslebø committed
104
105
typedef struct flowdescr {
  int fd;
106
  int file; 		// file descriptor for offline flows
's avatar
committed
107
108
  int fds[256];		// file descriptors
  int numfd;		// number of file descriptors
Arne Øslebø's avatar
Arne Øslebø committed
109
  char *devtype;
110
111
#ifdef RECONNECT
  char *device;
112
  int read_results_flag;
113
  int mapid_down;
114
#endif
Arne Øslebø's avatar
Arne Øslebø committed
115
  char *shm_base;
's avatar
committed
116
  pthread_spinlock_t *shm_spinlock;
Arne Øslebø's avatar
Arne Øslebø committed
117
  flist_t *flist;
118
119
  unsigned char is_connected;	// this should be 1 if the flow is connected 0 otherwise
  int format;			// this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise  
Arne Øslebø's avatar
Arne Øslebø committed
120
121
} flowdescr_t;

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#ifdef RECONNECT

/*
 * Entry of offline_device_list. mapi_create_flow() compares the requested
 * device name against previous_device and, on a match, sends new_device
 * to mapid instead.
 */
typedef struct offline_device{
	char *path;			// specifies the name of the trace file to open
	char *previous_device;		// return value of mapi_create_offline_device (device name that should be used in mapi_start_offline_device)
	char *new_device;		// new device name (returned from mapid)
	int format;			// the format of the captured packets (MFF_PCAP or MFF_DAG_ERF)
	unsigned char is_started;	// this should be 1 if the device is started ( via mapi_start_offline_device() ), 0 otherwise
} offline_device;

static unsigned count_offline_devices = 0;	// used in offline_device_list (always increases)

extern flist_t *function_list;			// list which contains all applied functions ( defined in mapiipc.c )
extern flist_t *offline_device_list;		// list which contains all offline devices ( defined in mapiipc.c )

#endif

extern flist_t *flowlist;	// defined in mapiipc.c
Arne Øslebø's avatar
Arne Øslebø committed
140
141
142
143
144

typedef struct functdescr {
  int fid;
  short result_init;
  mapidflib_function_def_t *def;
145
  mapidflib_function_t *funct;
Arne Øslebø's avatar
Arne Øslebø committed
146
  void *data;  
's avatar
committed
147
  mapi_results_t *result;
's avatar
committed
148
  struct mapipkt *pkt;
149
150
151
152
#ifdef RECONNECT
  flowdescr_t *flow;
  int numfd;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
153
154
155
156
157
158
159
160
161
162
163
164
165
} functdescr_t;

/* Location and size of a result kept in shared data. */
typedef struct shm_result {
  void* ptr; //Pointer to shared data
  int size; //Size of shared data
} shm_result_t;

/*
 * Function declarations 
 */
static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data);
int 
get_results_info(flowdescr_t *flow, functdescr_t *f, int user_fd, int user_fid);

//static int set_error(void* flow, int err_no , int is_remote);

static int send_fd(int *fds,int numfd);

//global var access functions
//Declared with (void): in C an empty parameter list is not a prototype and
//disables argument checking at the call sites.
static int get_numflows(void);
static int incr_numflows(void);
static int decr_numflows(void);
static int get_totalflows(void);
static int incr_totalflows(void);
Arne Øslebø's avatar
Arne Øslebø committed
178

179
180
181
static void init()
//common initialization function for mapi and dimapi
{
's avatar
committed
182
#ifdef DIMAPI
183
  char* path=NULL, *libs=NULL, *str=NULL, *s=NULL;
's avatar
committed
184
  char* mapi_conf;
185
  
186
187
  mapi_conf = printf_string( CONFDIR"/"CONF_FILE );

's avatar
committed
188
189
  if (pc_load (mapi_conf))
    {
's avatar
   
committed
190
191
192
193
      conf_category_entry_t* empty_cat = pc_get_category("");
      const char *portstr;

      if( empty_cat == NULL ){
194
          printf("Configuration file has no empty category. Giving up\n");
's avatar
   
committed
195
196
197
198
199
200
          exit(1);
      }
      path = pc_get_param (empty_cat, "libpath");
      libs = pc_get_param (empty_cat, "libs");
      portstr = pc_get_param (empty_cat, "dimapi_port");
      if( portstr == NULL ){
201
          printf("ERROR: Configuration file has no entry for `dimapi_port'. Using default port %d\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
202
203
204
205
206
207
	  dimapi_port = DEFAULT_DIMAPI_PORT;
      }
      else {
	      /* make sure that portstr is a valid number. */
	      dimapi_port = atoi( portstr );
	      if ( dimapi_port<=0 || dimapi_port>=65536 ) {
208
		      printf("ERROR: Invalid port given in configuration file. The default port %d is used\n", DEFAULT_DIMAPI_PORT);
's avatar
committed
209
210
		      dimapi_port = DEFAULT_DIMAPI_PORT;
	      }
's avatar
   
committed
211
      }
's avatar
committed
212
213
214
    }
  else 
    {
215
216
      printf("ERROR: Cannot load mapi.conf file. Giving up.\n");
      printf("Search path is: %s\n", mapi_conf);
's avatar
committed
217
218
219
220
      exit(1);
    }
#endif

221
222
223
224
225
226
  minit = 1;
  pthread_spin_init(&numflows_lock, PTHREAD_PROCESS_PRIVATE);
  
  flowlist = malloc(sizeof(flist_t));
  flist_init(flowlist);

227
228
229
230
231
232
233
234
#ifdef RECONNECT
  function_list = malloc(sizeof(flist_t));
  flist_init(function_list);
  
  offline_device_list = malloc(sizeof(flist_t));
  flist_init(offline_device_list);
#endif

235
236
#ifdef DIMAPI
  pthread_spin_init(&remote_ipc_lock, PTHREAD_PROCESS_PRIVATE);
's avatar
committed
237
  pthread_spin_init(&hostlist_lock, PTHREAD_PROCESS_PRIVATE);
238
239
240
241
  hostlist = malloc(sizeof(flist_t));
  remote_flowlist = malloc(sizeof(flist_t));
  flist_init(hostlist); 
  flist_init(remote_flowlist);
's avatar
committed
242

243
244
245
246
247
248
249
250
  str=libs;
  while((s=strchr(str,':'))!=NULL) {
    *s='\0';
    mapilh_load_library(path,str);
    str=s+1;
  }

  mapilh_load_library(path,str);
's avatar
committed
251
  free(mapi_conf);
252
  pc_close();
253
254
255
#endif
}

Arne Øslebø's avatar
Arne Øslebø committed
256
//Initializes MAPI - called only once by pthread_once()
's avatar
committed
257
static void mapi_init()
Arne Øslebø's avatar
Arne Øslebø committed
258
259
260
261
{
  struct mapiipcbuf qbuf;
  char libpath[4096],*str,*s;
  minit=1;
262
263
  if(mapiipc_client_init()==-1){
	 local_err = MCOM_INIT_SOCKET_ERROR;
264
	 minit=0;
's avatar
committed
265
266
267
	 fprintf(stderr, "\n--------------------------------------------------------\n");
	 fprintf(stderr, "WARNING: mapid may not be running at the given interface\n");
	 fprintf(stderr,"--------------------------------------------------------\n");
268
  }
269
  pthread_once(&initialized, (void*)init);
Arne Øslebø's avatar
Arne Øslebø committed
270
271
  pthread_spin_init(&mapi_lock, PTHREAD_PROCESS_PRIVATE);
  
's avatar
committed
272
273
  offline_devices = 0;
  
Arne Øslebø's avatar
Arne Øslebø committed
274
275
276
277
278
279
  //Get libpath from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBPATH;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
280
281
282
283
284
285
  
 if( mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf) )
	   	local_err = MCOM_SOCKET_ERROR;
 
Arne Øslebø's avatar
Arne Øslebø committed
286
287
288
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBPATH_ACK:
's avatar
committed
289
	  strncpy(libpath, (char *)qbuf.data, 4096);
Arne Øslebø's avatar
Arne Øslebø committed
290
291
292
293
294
295
	  break;
      default:
	   /* MAPI_ERROR_GETTING_LIBPATH */
	  return;
	  break;
  }
296
  printf("libpath=%s\n", libpath);
Arne Øslebø's avatar
Arne Øslebø committed
297
298
299
300
301
302
303
		  
  //get libs from mapid
  qbuf.mtype=1;
  qbuf.cmd=GET_LIBS;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
304
305
306
307
  if(mapiipc_write((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
 if( mapiipc_read((struct mapiipcbuf*)&qbuf))
	   	local_err = MCOM_SOCKET_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
308
309
310
311
312
313
314
315
316
317
318
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd) {
      case GET_LIBS_ACK:
	  break;
      default:
 	  /* MAPI_ERROR_GETTING_LIBS */
	  return; 
	  break;
  }

  //Load function libraries
's avatar
committed
319
    str = (char *)qbuf.data;
Arne Øslebø's avatar
Arne Øslebø committed
320
321
322
323
324
325
326
327
328
329
    while((s=strchr(str,':'))!=NULL) {
      *s='\0';
      mapilh_load_library(libpath,str);
      str=s+1;
    }

    mapilh_load_library(libpath,str);
    return;
}

330
331
332
333
334
335
336
337
338
#ifdef DIMAPI
/* One-time DIMAPI set-up (invoked through pthread_once): it only makes sure
 * that the initialization shared with MAPI, init(), has been executed. */
static void dmapi_init()
{
  pthread_once(&initialized, (void*)init);
}
#endif

Arne Øslebø's avatar
Arne Øslebø committed
339
340
341
342
343
344
345
int mapi_connect(int fd)
//Connect to a mapi flow
//fd = flow descriptor
{
  struct mapiipcbuf qbuf;
  flowdescr_t* flow;

346
347
348
349
350
351
#ifdef DIMAPI
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  flist_node_t* fnode;
#endif

Arne Øslebø's avatar
Arne Øslebø committed
352
  if (!minit) {
353
    printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__);
354
    local_err = MAPI_INIT_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
355
    return -1;
356
  }else if (fd<=0){
357
   printf("ERROR: Invalid flow descriptor (fd: %d) in mapi_connect\n", fd);
358
   return -1;	  
Arne Øslebø's avatar
Arne Øslebø committed
359
360
  }

361
362
363
#ifdef DIMAPI
  if ((rflow=flist_get(remote_flowlist,fd))!=NULL) {//flow is remote
    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
364

365
366
367
368
369
      hflow=(host_flow*)fnode->data;
      hflow->dbuf->cmd=CONNECT;
      hflow->dbuf->fd=hflow->fd;
      hflow->dbuf->length=BASIC_SIZE;
    }
370
371
372
#ifdef WITH_AUTHENTICATION
    if(rflow->is_authenticated == 0)
    {
373
	   printf("ERROR: Flow with id %d is not authenticated\n", fd);
374
	   rflow->is_connected = 0;
's avatar
committed
375
	   return(-2);
376
377
378
    }
    
#endif
379
380
381
382
    if (mapiipc_remote_write_to_all(rflow)<0){ 
	local_err = MCOM_SOCKET_ERROR;
	return -1;
    }
383
384
385
386
387
388
389
390

    //wait results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;

      switch(hflow->dbuf->cmd) {
        case CONNECT_ACK:
391
		rflow->is_connected=1;
392
393
          continue;
        case ERROR_ACK:
's avatar
committed
394
	  memcpy(&local_err, hflow->dbuf->data, sizeof(int));
395
396
          return -1;
        default:
's avatar
committed
397
	  local_err = MCOM_UNKNOWN_ERROR;
398
399
400
401
402
403
404
405
          return -1;
     }

    }
    return 0;
  }
#endif

Arne Øslebø's avatar
Arne Øslebø committed
406
  if ((flow=flist_get(flowlist,fd))==NULL) {
407
    printf("ERROR: Invalid flow %d [%s:%d]\n", fd, __FILE__, __LINE__);
408
    local_err = MAPI_INVALID_FLOW;
Arne Øslebø's avatar
Arne Øslebø committed
409
    return -1;
410
411
  }

Arne Øslebø's avatar
Arne Øslebø committed
412
413
  qbuf.mtype=1;
  qbuf.cmd=CONNECT;
414
415
416
417
#ifdef RECONNECT
  qbuf.fd = flow->fd;
  qbuf.user_fd = fd;
#else
Arne Øslebø's avatar
Arne Øslebø committed
418
  qbuf.fd=fd;
419
#endif
Arne Øslebø's avatar
Arne Øslebø committed
420
421
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
's avatar
committed
422
423
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);	  
424
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
425
426
427
428
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
429
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
430
431
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
432
433
434
435
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CONNECT_ACK:
436
	flow->is_connected=1;
's avatar
committed
437
      return 0;
Arne Øslebø's avatar
Arne Øslebø committed
438
    case ERROR_ACK:
439
      local_err =  qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
440
441
      return -1;
    default:
442
      local_err= MCOM_UNKNOWN_ERROR;
Arne Øslebø's avatar
Arne Øslebø committed
443
444
445
446
      return -1;
    }  
}

447
#ifdef DIMAPI
's avatar
   
committed
448
/*
 * Releases a remote flow and everything hanging off it.
 *
 * The flow is removed from remote_flowlist and the active-flow counter is
 * decremented. For every per-host flow the function unregisters the flow and
 * its functions from the host, tears the host connection down when this was
 * the host's last flow, and frees the per-host buffers. Finally the flow's
 * own lists, buffers and credentials are freed together with rflow itself,
 * so rflow must not be used after the call.
 */
static void delete_remote_flow(remote_flowdescr_t* rflow) 
{
  host_flow* hflow;
  flist_node_t* fnode, *fnode2, *fnode3;
  mapi_results_t* res;
  function_data *fdata;
  int count;

  pthread_spin_lock(&remote_ipc_lock);
  flist_remove(remote_flowlist, rflow->fd);
  decr_numflows();
  pthread_spin_unlock(&remote_ipc_lock);

  sem_destroy(&rflow->fd_sem);
  sem_destroy(&rflow->pkt_sem);

  // the successor is fetched before the body runs because the current node
  // is removed from the list at the end of each iteration
  for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
    fnode2=flist_next(fnode);
    hflow=(host_flow*)fnode->data;
    hflow->rhost->num_flows--;
    flist_remove(hflow->rhost->flows, hflow->fd);

    for (fnode3=flist_head(hflow->functions); fnode3!=NULL; fnode3=flist_next(fnode3)) {

      fdata = (function_data *)flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fidseed);

      // the result of flist_remove is checked before use; an entry that is
      // not registered with the host is skipped instead of dereferenced
      if (fdata == NULL)
	      continue;

      free(fdata->dbuf);
      free(fdata);
    }

    //we check if a host is using in other rflows and delete it -close the socket- if not
    if (hflow->rhost->num_flows==0) {
      pthread_cancel(*hflow->rhost->comm_thread);
      mapiipc_remote_close(hflow->rhost);	//close the socket
      flist_destroy(hflow->rhost->flows);
      free(hflow->rhost->flows);
      flist_destroy(hflow->rhost->functions);
      free(hflow->rhost->functions);
      free(hflow->rhost->hostname);
      flist_remove(hostlist, hflow->rhost->sockfd);
      free(hflow->rhost->comm_thread);
#ifdef RECONNECT
      sem_destroy(&hflow->rhost->connection);	// destroy semaphore
      pthread_mutex_destroy(&hflow->rhost->rec_lock);
      pthread_cond_destroy(&hflow->rhost->rec_condition);
#endif
      free(hflow->rhost);
    }

    flist_destroy(hflow->functions);
    free(hflow->functions);
    free(hflow->dev);
    free(hflow->dbuf);
    free(hflow->pkt);

    if(hflow->asyn_pkts != NULL){	// release resources of mapi_asynchronous_get_next_pkt()

	    // stop the asynchronous thread and close its connection before the
	    // packet buffers are released
	    // NOTE(review): pthread_cancel() does not wait for the thread to
	    // finish; whether it may be joined here is not visible in this part
	    // of the file
	    pthread_cancel(*hflow->asyn_comm_thread);
	    mapiipc_remote_close_asyn(hflow);
	    free(hflow->asyn_comm_thread);
	    // TODO close the new socket

	    for(count = 0; count < ASYN_GNP_BUFFER_SIZE; count++)
		    free(hflow->asyn_pkts->pkts[count]);

	    free(hflow->asyn_pkts->pkts);
	    free(hflow->asyn_pkts);
    }

    flist_remove(rflow->host_flowlist,hflow->id);
    free(hflow);
  }
  flist_destroy(rflow->host_flowlist);
  free(rflow->host_flowlist);
  if (rflow->pkt_list!=NULL) {
	flist_destroy(rflow->pkt_list);
	free(rflow->pkt_list);
  }
  // free the stored results; the list nodes themselves go with flist_destroy
  for (fnode=flist_head(rflow->function_res); fnode!=NULL; fnode=fnode2 ) { 
	fnode2=flist_next(fnode);
	res=(mapi_results_t*)fnode->data;
	free(res->res);
	free(res);
  }
  flist_destroy(rflow->function_res);
  free(rflow->function_res);
  free(rflow->pkt);
#ifdef WITH_AUTHENTICATION
  free(rflow->username);
  free(rflow->vo);
#ifdef RECONNECT
  free(rflow->password);
#endif
#endif  
  free(rflow);
}
#endif

548
int mapi_create_flow(const char *dev)
Arne Øslebø's avatar
Arne Øslebø committed
549
550
551
552
553
//Create new flow
//dev=device that should be used
{
  struct mapiipcbuf qbuf;
  flowdescr_t *flow, *tmpflow;
554
555
556
557
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
  char *mapidsocket, *mapidsocketglobal;
/*  int local;*/
Arne Øslebø's avatar
Arne Øslebø committed
558

559
560
561
562
563
#ifdef DIMAPI
  remote_flowdescr_t *rflow;
  char *hostname=NULL, *s=NULL, *k=NULL;
  struct host *h=NULL;
  host_flow* hflow;
's avatar
committed
564
  char *devp;
565
566
567
  flist_node_t* fnode;
  unsigned int idgen=0;
#endif
568
569
570
571
572
573
574
#ifdef RECONNECT
  offline_device *device = NULL;
  flist_node_t *fnode_;
  int flag = 0;
#endif

  if(dev==NULL){
575
	 printf("ERROR: Wrong device name given (NULL) in mapi_create_flow\n");
576
577
578
	 local_err  = MAPI_DEVICE_INFO_ERR;
	 return -1;
  }
579
580

#ifndef DIMAPI
581
582
583
584
585
586
587
588
589
590
591
592
593
	devgroupdb = devgroupdb_open(3); // try local, global
	devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
  	mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
  }
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
594

's avatar
committed
595
596
597
  pthread_once(&mapi_is_initialized, (void*)mapi_init);
#endif

598
  //check if flow is remote or not and call the appropriate init function
599
#ifdef DIMAPI
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
  if ( strchr(dev,':')==NULL) {
		devgroupdb = devgroupdb_open(3); // try local, global
		devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);

		if(!devgroupid) {
			mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
			mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
		}
		else {
			mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
			mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
		}

		mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);

  	pthread_once(&mapi_is_initialized, (void*)mapi_init);
	}
617
618
619
  else pthread_once(&dmapi_is_initialized, (void*)dmapi_init);

  if ((s = strchr(dev,':'))!=NULL) {
's avatar
committed
620
    devp=strdup(dev);
621
622
623
    rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t));
    rflow->fd=++fdseed;
    sem_init(&rflow->fd_sem, 0, 0);
's avatar
committed
624
    sem_init(&rflow->pkt_sem, 0, 0);
625
626
627
628
    rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
    flist_init(rflow->host_flowlist);
    rflow->pkt_list=NULL;
    rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
629
    rflow->is_connected = 0;
630
    rflow->is_asyn_gnp_called = 0;
's avatar
   
committed
631
632
633
#ifdef WITH_AUTHENTICATION
    rflow->username=NULL;
    rflow->vo=NULL;
634
635
#ifdef RECONNECT
    rflow->password = NULL;
636
    rflow->daemons_down = 0;    
637
#endif
's avatar
   
committed
638
#endif
's avatar
committed
639
    rflow->pkt=NULL;
640
641
642
643
644
645
646
647
    flist_init(rflow->function_res);
    k=strtok(devp, ", ");

    while (k!=NULL) {
      if ((s = strchr(k,':'))!=NULL) {
        *s = '\0';
        hostname = k;
        k = s + 1;
's avatar
committed
648
        pthread_spin_lock(&hostlist_lock);	
649
        h = (struct host *)flist_search(hostlist, hostcmp, hostname);
650
651
652
653

        if(h==NULL){// Our host is a new one --> insert it in the hostlist
          h = (struct host *)malloc(sizeof(struct host));
	  h->hostname = strdup(hostname);
's avatar
committed
654
	  h->port = dimapi_port;
655
656
	  h->flows = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->flows);
's avatar
committed
657
658
	  h->functions = (flist_t *)malloc(sizeof(flist_t));
	  flist_init(h->functions);
659
	  h->num_flows=0;
660
	  h->stats=NULL;
661
662
#ifdef RECONNECT
      	  sem_init(&h->connection, 0, 0);	// initialize semaphore
663
664
665
	  pthread_mutex_init(&(h->rec_lock), NULL);
	  pthread_cond_init(&(h->rec_condition), NULL);
	  h->host_down = 0;
666
#endif
667
	  // Create the socket
's avatar
committed
668
	  if (mapiipc_remote_init(h)<0) {
669
	   	  local_err = MCOM_SOCKET_ERROR;
670
		  printf("ERROR: Could not connect with host %s [%s:%d]\n", h->hostname, __FILE__, __LINE__);
671
672
673
674
675
676
		  flist_destroy(h->flows);
		  free(h->flows);
		  flist_destroy(h->functions);
		  free(h->functions);
		  free(h->hostname);
		  free(h);
's avatar
committed
677
678
679
		  pthread_spin_unlock(&hostlist_lock);
		  return -1;
	  }
's avatar
committed
680

681
682
683
684
	  h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t));
	  pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h);

          flist_append(hostlist, h->sockfd, h);
's avatar
committed
685
          pthread_spin_unlock(&hostlist_lock);
686
687
        }
        else{//host exists in the list
's avatar
committed
688
          pthread_spin_unlock(&hostlist_lock);
689
690
691
692
693
694
695
        }

        h->num_flows++;
	hflow=(host_flow*)malloc(sizeof(host_flow));
	hflow->scope_fd=rflow->fd;
	hflow->dev=strdup(k);
	flist_append(rflow->host_flowlist, ++idgen, hflow);
696
	hflow->id=idgen;
697
698
699
	flist_append(h->flows, --negfdseed, hflow);
	hflow->fd=negfdseed;
	hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
700
	hflow->pkt=NULL;
701
	hflow->asyn_pkts = NULL;
702
703
	hflow->rhost=h;
	hflow->functions=(flist_t *)malloc(sizeof(flist_t));
704
	pthread_spin_init(&(hflow->asyn_get_next_pkt_lock), PTHREAD_PROCESS_PRIVATE);
705
706
707
	flist_init(hflow->functions);

	hflow->dbuf->cmd=CREATE_FLOW;
's avatar
committed
708
	strncpy((char *)hflow->dbuf->data,k,DATA_SIZE);
709
710
711
712
713
714
715
716
717
718
719
720
721
	hflow->dbuf->length=BASIC_SIZE+strlen(k)+1;
      }
      else {
        //this is the case where the dev string contains both 'host:interface1' and 'interface2'
	//example: mapi_create_flow("139.91.70.98:eth0, 147.52.16.102:eth0, eth1");
	//user's intention is probably localhost:eth1
	//what should be done in this case?
      }
      k=strtok(NULL,", ");
    }

    free(devp);

's avatar
committed
722
    rflow->scope_size=flist_size(rflow->host_flowlist);
723
724
725
726
727
728
    pthread_spin_lock(&remote_ipc_lock);
    flist_append(remote_flowlist, rflow->fd, rflow);
    incr_numflows();
    incr_totalflows();
    pthread_spin_unlock(&remote_ipc_lock);

729
730
731
732
    if (mapiipc_remote_write_to_all(rflow)<0){
	    local_err = MCOM_SOCKET_ERROR;
	    return -1;
    }
's avatar
committed
733
    //sends to all hosts of rflow the proper dbuf, increment the pending_msgs makes sem_wait(rflow->fd_sem) and the comm_thread will get the results - the hflow->fd for every flow -
734
735
736
737
738

    //wait for results

    for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
      hflow=(host_flow*)fnode->data;
's avatar
committed
739
      if (hflow->dbuf->cmd == CREATE_FLOW_ACK) {
740
        hflow->fd=*((int*)hflow->dbuf->data);
741
	flist_remove(hflow->rhost->flows, hflow->dbuf->fd);
742
743
	flist_append(hflow->rhost->flows, hflow->fd, hflow);
      }
's avatar
committed
744
745
      else if (hflow->dbuf->cmd == ERROR_ACK) {
	memcpy(&local_err, hflow->dbuf->data, sizeof(int));
746
	printf("ERROR: Could not create flow in host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__);
's avatar
committed
747
748
749
750
751
        delete_remote_flow(rflow);
	return -1;
      }
      else {
	local_err = MCOM_UNKNOWN_ERROR;
752
753
754
755
756
757
758
759
760
761
        delete_remote_flow(rflow);
	return -1;
      }
    }

    return rflow->fd;

  }
#endif

's avatar
committed
762
763
  pthread_spin_lock(&mapi_lock);
  
's avatar
committed
764
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
765
766
767
    if(mapiipc_client_init()==-1){
	  local_err = MCOM_INIT_SOCKET_ERROR;
    }
's avatar
committed
768
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
769
  }
's avatar
committed
770
771
772
773
  else 
    incr_numflows();
  
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
774

775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
#ifdef RECONNECT
  /* consider the following case :
     	- devicename = mapi_create_offline_device("./tracefile", MFF_PCAP);
	- {
		code ...
		--> reconnection
		code ...
	  }
	  fd = mapi_create_flow(devicename);

	  Now mapi_create_flow must send to mapid the new device name ...
  */

  pthread_spin_lock(&mapi_lock);

  for(fnode_ = flist_head(offline_device_list); fnode_ != NULL; fnode_ = flist_next(fnode_)){	// find specified device in offline_device_list

	  device = (offline_device *)fnode_->data;

	  if(!strcmp(device->previous_device, dev)){
		  flag = 1;					// FIXME
		  break;
	  }
  }
  if(flag)	strncpy((char *) qbuf.data, device->new_device, DATA_SIZE);	// the new device name
  else		strncpy((char *) qbuf.data, dev, DATA_SIZE);

  flag = 0;

  pthread_spin_unlock(&mapi_lock);
#else
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
#endif

Arne Øslebø's avatar
Arne Øslebø committed
809
810
811
812
813
  qbuf.mtype=1;
  qbuf.cmd=CREATE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  pthread_spin_lock(&mapi_lock);
814

's avatar
committed
815
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
816
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
817
	  pthread_spin_unlock(&mapi_lock);	  
's avatar
committed
818
   	  decr_numflows();
's avatar
committed
819
820
821
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
822
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
823
	  pthread_spin_unlock(&mapi_lock);
's avatar
committed
824
	  decr_numflows();
's avatar
committed
825
826
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
827
828
829
830
831
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_FLOW_ACK:
      tmpflow=flist_get(flowlist,qbuf.fd);
's avatar
committed
832
833
      if (tmpflow!=NULL) 
        {
834
	        printf("ERROR: Mapid gave us a fd (%d) which already exist in our lists, exiting [%s:%d]\n", qbuf.fd, __FILE__, __LINE__);
's avatar
committed
835
	        decr_numflows();
's avatar
committed
836
		return -1;
's avatar
committed
837
     	}
Arne Øslebø's avatar
Arne Øslebø committed
838
      flow=malloc(sizeof(flowdescr_t));
's avatar
   
committed
839
      if( flow == NULL ){
840
	  printf("ERROR: Out of memory [%s:%d]\n", __FILE__, __LINE__);
's avatar
committed
841
	  decr_numflows();
's avatar
committed
842
	  return -1;
's avatar
   
committed
843
      }
Arne Øslebø's avatar
Arne Øslebø committed
844
      flow->fd=qbuf.fd;
's avatar
committed
845
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
846
      flow->flist=malloc(sizeof(flist_t));
847
848
849
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = -1;	// in case of online flow, assigned to -1
850
851
      flow->read_results_flag = 0;
      flow->mapid_down = 0;
852
#endif
Arne Øslebø's avatar
Arne Øslebø committed
853
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
854
      flow->shm_spinlock=NULL;
's avatar
committed
855
      flow->file = -1;		// in case of online flow, assigned to -1
856
      flow->is_connected =0;
's avatar
committed
857
      flow->numfd = 0;		// initialize number of open file descriptors to zero
Arne Øslebø's avatar
Arne Øslebø committed
858
      flist_init(flow->flist);
's avatar
committed
859
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
860
861
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
862
      incr_totalflows();
's avatar
committed
863
864
865
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
866
867
868
869
870
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
      
    /* should probably have a separate error message for ERROR_ACK? */
    case ERROR_ACK:
's avatar
committed
871
      decr_numflows();
872
      local_err=qbuf.remote_errorcode;
Arne Øslebø's avatar
Arne Øslebø committed
873
874
      return -1;
    default:
's avatar
committed
875
      decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
876
877
878
879
880
      local_err=MCOM_UNKNOWN_ERROR;
      return -1;
    }
}

881
int mapi_create_offline_flow(const char *dev, int format)
Arne Øslebø's avatar
Arne Øslebø committed
882
883
884
885
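//Create new offline flow backed by a file
//dev=path of the file that is opened and handed to mapid
//format=format of the file (e.g. MFF_PCAP or MFF_DAG_ERF)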
{
  struct mapiipcbuf qbuf;
886
  flowdescr_t *flow=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
887
  int file;
888
889
  struct devgroupdb *devgroupdb;
  int devgroupid = 0;
890
891
  char *mapidsocket, *mapidsocketglobal;

892
893
894
895
896
897
898
899
900
901
902
903
	devgroupdb = devgroupdb_open(3); // try local, global

	devgroupid = devgroupdb_getgroupid(devgroupdb); // get groupid of the latest instance of mapid

	if(!devgroupid) {
		mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
		mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
	}
	else {
		mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
		mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
	}
904
905

	mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
Arne Øslebø's avatar
Arne Øslebø committed
906
907
908
909

  pthread_once(&mapi_is_initialized, (void*)mapi_init);

  //Check to see if file can be opened
910
  if (dev==NULL){
911
	  printf("ERROR: NULL device in mapi_create_offline_flow\n");
912
913
914
	  return -1;
  }
  else if ((file=open(dev,O_LARGEFILE))==-1) {
Arne Øslebø's avatar
Arne Øslebø committed
915
916
917
    local_err=MAPI_ERROR_FILE;
    return -1;
  }
's avatar
committed
918
  
's avatar
committed
919
  pthread_spin_lock(&mapi_lock);
's avatar
committed
920
  if ((get_numflows() == 0) && (get_totalflows() > 0) && minit) { // socket has been closed, re-create it
's avatar
committed
921
    if (mapiipc_client_init()<0) {
922
	    local_err = MCOM_INIT_SOCKET_ERROR;
's avatar
committed
923
924
925
	    pthread_spin_unlock(&mapi_lock);
	    return -1;
    }
's avatar
committed
926
    incr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
927
  }
's avatar
committed
928
929
930
  else
  	incr_numflows();

's avatar
committed
931
  pthread_spin_unlock(&mapi_lock);
Arne Øslebø's avatar
Arne Øslebø committed
932
933
934
935
936
937

  qbuf.mtype=1;
  qbuf.cmd=CREATE_OFFLINE_FLOW;
  qbuf.fd=getpid();
  qbuf.pid=getpid();
  qbuf.fid=format;
's avatar
committed
938
  strncpy((char *)qbuf.data,dev,DATA_SIZE);
Arne Øslebø's avatar
Arne Øslebø committed
939
  pthread_spin_lock(&mapi_lock);
's avatar
committed
940
  if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
941
942
	  pthread_spin_unlock(&mapi_lock);	 
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
943
	  decr_numflows();
's avatar
committed
944
945
946
947
	  return -1;
  }
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
948
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
949
	  decr_numflows();
's avatar
committed
950
951
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
952
953
954
  if(qbuf.cmd==SEND_FD) {
    if(mapiipc_send_fd(file)==-1) {
      local_err=MAPI_ERROR_SEND_FD;
955
      pthread_spin_unlock(&mapi_lock);
's avatar
committed
956
      decr_numflows();
's avatar
committed
957
      return -1;
Arne Øslebø's avatar
Arne Øslebø committed
958
959
960
    }
  } else {
    local_err=MAPI_ERROR_SEND_FD;
's avatar
committed
961
962
    pthread_spin_unlock(&mapi_lock);
    decr_numflows();
Arne Øslebø's avatar
Arne Øslebø committed
963
964
965
    return -1;
  }

's avatar
committed
966
967
  if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
	  pthread_spin_unlock(&mapi_lock);
968
	  local_err = MCOM_SOCKET_ERROR;
's avatar
committed
969
	  decr_numflows();
's avatar
committed
970
971
	  return -1;
  }
Arne Øslebø's avatar
Arne Øslebø committed
972
973
974
975
976
977
  pthread_spin_unlock(&mapi_lock);
  switch(qbuf.cmd)
    {
    case CREATE_OFFLINE_FLOW_ACK:
      flow=malloc(sizeof(flowdescr_t));
      flow->fd=qbuf.fd;
's avatar
committed
978
      flow->devtype=(char *)malloc(strlen((char *)qbuf.data)+1);
Arne Øslebø's avatar
Arne Øslebø committed
979
      flow->flist=malloc(sizeof(flist_t));
980
981
982
983
#ifdef RECONNECT
      flow->device = strdup(dev);
      flow->format = format;	// MFF_PCAP or MFF_DAG_ERF
#endif
Arne Øslebø's avatar
Arne Øslebø committed
984
      flow->shm_base=NULL;
Arne Øslebø's avatar
Arne Øslebø committed
985
      flow->shm_spinlock=NULL;
's avatar
committed
986
      flow->file = file;	// file descriptor of offline flow
987
      flow->is_connected=0;
's avatar
committed
988
      flow->numfd = 0;		// initialize number of open file descriptors to zero      
Arne Øslebø's avatar
Arne Øslebø committed
989
      flist_init(flow->flist);
's avatar
committed
990
      strcpy(flow->devtype,(char *)qbuf.data);
Arne Øslebø's avatar
Arne Øslebø committed
991
992
      pthread_spin_lock(&mapi_lock);
      flist_append(flowlist,qbuf.fd,flow);
's avatar
committed
993
      incr_totalflows();
's avatar
committed
994
995
996
#ifdef DIMAPI
      fdseed++;
#endif
Arne Øslebø's avatar
Arne Øslebø committed
997
998
999
      pthread_spin_unlock(&mapi_lock);
      return qbuf.fd;
    case ERROR_ACK:
1000
      local_err=qbuf.remote_errorcode;
For faster browsing, not all history is shown. View entire blame