mapiipc.c 58.2 KB
Newer Older
1
2
3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
4
5
6
7
8
9
10
11
12
13
14
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <string.h>
15
#include <signal.h>
Arne Øslebø's avatar
Arne Øslebø committed
16
17
18

#include "mapiipc.h"
#include "debug.h"
19
#include "mapidflib.h"
Arne Øslebø's avatar
Arne Øslebø committed
20
21
22

#define HAVE_MSGHDR_MSG_CONTROL 1

23
24
25
26
27
28
29
#ifdef DIMAPI
#include <netdb.h>
#include <netinet/in.h>
#include <semaphore.h>
#include "flist.h"
#endif

30
31
32
33
34
#ifdef RECONNECT
#include <fcntl.h>
#endif

// this file contains all the client-side IPC related functions [ and functions for reconnection purposes ]
Arne Øslebø's avatar
Arne Øslebø committed
35

's avatar
   
committed
36
37
38
static int sock;			// local (AF_LOCAL) socket to mapid; created in mapiipc_client_init(), closed in mapiipc_client_close()
static int mapidaddr_len;		// address length handed to connect(): sizeof(sun_family) + strlen(sun_path)
static struct sockaddr_un mapidaddr;	// address of mapid's socket, filled in by mapiipc_client_init()
's avatar
committed
39
static char* mapidsocket;	// scratch path of mapid's socket; allocated and freed again inside mapiipc_client_init()
Arne Øslebø's avatar
Arne Øslebø committed
40

41
42
flist_t *flowlist = NULL;	// list of flow descriptors; mapiipc_write() looks flows up here by qbuf->user_fd when RECONNECT is enabled

's avatar
committed
43
44
#ifdef DIMAPISSL
static SSL_CTX *ctx;	// SSL context created in mapiipc_remote_init() and released in mapiipc_remote_close()
45
void sigpipe_handle();	// SIGPIPE handler installed by mapiipc_remote_init()
's avatar
committed
46
47
#endif

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#ifdef RECONNECT

#ifdef DIMAPI
// Reconnection helpers for a remote mapicommd (defined further down in this file).
int check_network_mapicommd(struct host *h);	// probes whether the network (mapicommd) is up: returns 1 if up, 0 if down, -1 if the probe socket cannot be created
void restore_network_mapicommd(struct host *h);	// restores the connection to mapicommd after a breakdown, using a back-off mechanism
void mapi_recreate_flow(struct host *h);	// recreates all flows that had been created on the host whose connection broke down
void mapi_reapply_function(struct host *h);	// reapplies all functions that had been applied on the host whose connection broke down
void mapi_reconnect(struct host *h);		// checks which flows of the specified host had called mapi_connect()
#ifdef WITH_AUTHENTICATION
void mapi_reauthenticate(struct host *h);	// checks which flows of the specified host had called mapi_authenticate()
#endif
void check_mapi_functions(struct host *h);	// checks all functions that can be called when a breakdown happens
void mapi_get_next_packet(struct host *h);	// checks which flows of the specified host had called mapi_get_next_pkt()

// Bookkeeping record for one function applied on a remote host's flow
// (presumably replayed after a reconnection - TODO confirm against the users of this type).
typedef struct function_data{

	int fid;			// real fid returned from mapicommd
	mapidflib_function_def_t *fdef;	// function definition
	host_flow *hflow;		// flow of a specified host
	char args[DIMAPI_DATA_SIZE];	// the function's arguments
	int index;			// needed for functions whose arguments reference a flow

} function_data;

#endif

// Reconnection helpers for the local mapid; mapiipc_write() calls the last seven, in this order, after a failed send().
int check_network_mapid(void);			// checks if the network (mapid) is up. Returns 1 if it is up, 0 otherwise
void restore_network_mapid(void);		// restores the connection to mapid after a breakdown, using a back-off mechanism
void mapi_create_offline_device_mapid(void);	// recreates all offline devices that have been created and not deleted
void mapi_recreate_flow_mapid(void);		// recreates all flows, on-line and off-line, that have been created and not closed
void mapi_reapply_function_mapid(void);		// reapplies all functions that have been applied to flows that have not been closed
void mapi_reconnect_mapid(void);		// checks which flows had called mapi_connect() and not mapi_close_flow()
void mapi_start_offline_device_mapid(void);	// checks which devices had called mapi_start_offline_device() and not mapi_delete_offline_device()
void mapi_read_results_mapid(void);		// initializes shared memory segments for new results of a function, after reconnection

// Client-side descriptor of one flow, as stored in flowlist.
typedef struct flowdescr{

	int fd;				// flow descriptor to present to mapid; mapiipc_write() copies it into qbuf->fd after a reconnection
	int file; 			// file descriptor for offline flows
	int fds[256];			// file descriptors
	int numfd;			// number of file descriptors
	char *devtype;
	char *device;
	char *shm_base;
	pthread_mutex_t *shm_spinlock;
	flist_t *flist;			// functions of this flow; mapiipc_write() looks them up here by qbuf->user_fid
	unsigned char is_connected; 	// this should be 1 if the flow is connected, 0 otherwise
	int format;			// this should be MFF_PCAP or MFF_DAG_ERF if the flow is offline, -1 otherwise
} flowdescr_t;

// Client-side descriptor of one function applied to a flow, as stored in flowdescr_t.flist.
typedef struct functdescr{

	int fid;			// function id to present to mapid; mapiipc_write() copies it into qbuf->fid after a reconnection
	short result_init;		// NOTE(review): looks like a "result already initialised" flag - confirm against its users
	mapidflib_function_def_t *def;
	mapidflib_function_t *funct;
	void *data;
	mapi_results_t *result;
	flowdescr_t *flow;		// presumably the flow this function was applied to - TODO confirm
	int numfd;
} functdescr_t;

// Record of one offline (trace file) device, as stored in offline_device_list.
// mapiipc_write() uses previous_device/new_device to translate a device name
// handed to the application before a reconnection into the name mapid uses now.
typedef struct offline_device{
	char *path;			// specifies the name of the trace file to open
	char *previous_device;		// return value of mapi_create_offline_device (device name that should be used in mapi_start_offline_device)
	char *new_device;		// new device name (returned from mapid)
	int format;			// the format of the captured packets (MFF_PCAP or MFF_DAG_ERF)
	unsigned char is_started;	// this should be 1 if the device is started ( via mapi_start_offline_device() ), 0 otherwise
} offline_device;

flist_t *function_list = NULL;		// list which contains all applied functions
flist_t *offline_device_list = NULL;	// list which contains all offline devices (searched by mapiipc_write() after a reconnection)

#endif

/*
 * mapiipc_write - sends one IPC request to mapid over the local socket.
 *
 * qbuf: request to send. Its uid field is overwritten with the real user ID
 *       of the calling process before sending.
 *
 * Returns 0 when the request was sent, -1 otherwise.
 *
 * Without RECONNECT a failed send() is reported and -1 is returned.
 *
 * With RECONNECT a failed send() is taken to mean that mapid went down: the
 * connection and all client-side state (offline devices, flows, functions,
 * connects, results) are restored, the identifiers inside qbuf are rewritten
 * to the values valid after the restore, and the request is sent once more.
 * A request that refers to a flow/function that can no longer be found, or a
 * retry that fails too, yields -1 instead of crashing or reporting success.
 */
int mapiipc_write(struct mapiipcbuf *qbuf){
#ifdef RECONNECT
  offline_device *device;
  flist_node_t *fnode;
  flowdescr_t *flow;
  functdescr_t *fun;
#endif

  qbuf->uid = getuid();		// real user ID of the current process

  if(send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL) != -1)
    return 0;

  WARNING_CMD(printf("send: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));

#ifndef RECONNECT
  return -1;
#else
  printf("\n ---> Mapid is down\n");

  // bring the connection and every piece of client-side state back, in dependency order
  restore_network_mapid();
  mapi_create_offline_device_mapid();
  mapi_recreate_flow_mapid();
  mapi_reapply_function_mapid();
  mapi_reconnect_mapid();
  mapi_start_offline_device_mapid();
  mapi_read_results_mapid();

  // special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device )
  if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){

    // find the specified device in offline_device_list and substitute its new name
    for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){

      device = (offline_device *)fnode->data;

      // skip incomplete entries instead of handing NULL to strcmp()/strncpy()
      if(device == NULL || device->previous_device == NULL || device->new_device == NULL)
        continue;

      if(strcmp(device->previous_device, (char *) qbuf->data) == 0){
        // bounded copy that always leaves qbuf->data NUL-terminated
        // (DATA_SIZE is taken to be the size of qbuf->data, as the original bound implied)
        strncpy((char *) qbuf->data, device->new_device, DATA_SIZE - 1);
        ((char *) qbuf->data)[DATA_SIZE - 1] = '\0';
        break;
      }
    }
  }
  // FIXME (CLOSE_FLOW ...)
  else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){

    // the retry must carry the new fd of the flow
    flow = flist_get(flowlist, qbuf->user_fd);
    if(flow == NULL)
      return -1;

    qbuf->fd = flow->fd;
  }
  else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO ||
          (qbuf->cmd == READ_RESULT && qbuf->fd != -1 && qbuf->fid != -1)){

    // the retry must carry the new fd of the flow and the new fid of the function
    flow = flist_get(flowlist, qbuf->user_fd);
    if(flow == NULL)
      return -1;

    fun = flist_get(flow->flist, qbuf->user_fid);
    if(fun == NULL)
      return -1;

    qbuf->fd = flow->fd;
    qbuf->fid = fun->fid;
  }

  // second and last attempt; report failure instead of pretending success
  if(send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL) == -1){
    WARNING_CMD(printf("send: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
    return -1;
  }

  return 0;
#endif
}

's avatar
committed
193
int mapiipc_read(struct mapiipcbuf *qbuf)
Arne Øslebø's avatar
Arne Øslebø committed
194
195
196
197
//Reads an IPC message. Blocking call
{
  if(recv(sock, qbuf, MAX_SEND_SIZE, 0) == -1){
    ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
198
    return -1;
Arne Øslebø's avatar
Arne Øslebø committed
199
  }
's avatar
committed
200
  return 0;
Arne Øslebø's avatar
Arne Øslebø committed
201
202
}

's avatar
committed
203
int mapiipc_client_init()
Arne Øslebø's avatar
Arne Øslebø committed
204
205
206
207
//Initializes IPC for mapi functions
{
  if ((sock = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {
    ERROR_CMD(printf("socket: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
208
    return -1;
Arne Øslebø's avatar
Arne Øslebø committed
209
  }
's avatar
committed
210
211
212

  mapidsocket=malloc(sizeof(MAPIDSOCKHOME)-2+strlen(getenv("HOME")));
  sprintf(mapidsocket,MAPIDSOCKHOME,getenv("HOME")); 
Arne Øslebø's avatar
Arne Øslebø committed
213
214
215

  // Construct name of mapid's socket
  mapidaddr.sun_family = AF_LOCAL;
's avatar
committed
216
  strcpy(mapidaddr.sun_path, mapidsocket);
Arne Øslebø's avatar
Arne Øslebø committed
217
218
  mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);

's avatar
committed
219
  if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) 
's avatar
   
committed
220
  {
's avatar
committed
221
222
223
224
      free(mapidsocket);
      mapidsocket=strdup(MAPIDSOCKGLOBAL);
      strcpy(mapidaddr.sun_path, mapidsocket);
      mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);
225
226
227
      if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) {
    		ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
	}
's avatar
committed
228
      else {
229
230
		free(mapidsocket);
      	return 0;
's avatar
committed
231
      }
's avatar
   
committed
232
  }
's avatar
committed
233
  free(mapidsocket);
234
  return -1;
Arne Øslebø's avatar
Arne Øslebø committed
235
236
237
238
239
240
241
242
}

/* Releases the local mapid socket held in the file-scope `sock`; the result of close() is not inspected. */
void mapiipc_client_close()
{
  (void) close(sock);
}

243
244
245
246
#ifdef DIMAPI
struct sockaddr_in remoteaddr;		// address of the remote agent, filled in by mapiipc_remote_init() before connect()
flist_t *remote_flowlist=NULL;		// list of remote flows; mapiipc_comm_thread() looks flows up here by hflow->scope_fd

247
int mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h){	// sends an IPC message to mapid
's avatar
committed
248
249

#ifdef DIMAPISSL
250
251
252
253
254
255
256
257
258

#ifdef RECONNECT
	if(SSL_write(h->con, dbuf, dbuf->length) <= 0){
		WARNING_CMD(printf("SSL_write: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
		sem_wait(&h->connection);	// lock the semaphore
	}
#else
	if(SSL_write(h->con, dbuf, dbuf->length) <= 0){
		WARNING_CMD(printf("SSL_write: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
's avatar
committed
259
260
		return -1;
	}
261
262
263
264
265
266
267
268
269
270
271
#endif

#else

#ifdef RECONNECT
	// MSG_NOSIGNAL : requests not to send SIGPIPE on errors on stream oriented sockets when the other end breaks the connection
	// need in mapi_get_next_packet()
	if(send(h->sockfd, dbuf, dbuf->length, MSG_NOSIGNAL) <= 0){
		WARNING_CMD( printf("send: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__) );
		sem_wait(&h->connection);	// lock the semaphore
	}
272
273
274
275
276
#else
	if(send(h->sockfd, dbuf, dbuf->length, 0) == -1) {
		WARNING_CMD(printf("send: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
		return -1;
	}
277
278
#endif

's avatar
committed
279
#endif
280
	return 0;
281
282
}

's avatar
committed
283
/*
 * mapiipc_remote_write_to_all - sends the pending request of every per-host
 * flow of rflow, then waits on rflow->fd_sem once per per-host flow.
 *
 * Each host flow's buffer gets that flow's fd before it is written.
 * Returns -1 as soon as one write fails (nothing is waited for in that case),
 * 0 after all waits have completed.
 */
int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
{
  flist_node_t *node;

  // phase 1: hand every host its request
  node = flist_head(rflow->host_flowlist);
  while (node != NULL) {
    host_flow *target = (host_flow *) node->data;

    target->dbuf->fd = target->fd;
    if (mapiipc_remote_write(target->dbuf, target->rhost) < 0)
      return -1;

    node = flist_next(node);
  }

  // phase 2: one wait per entry of the list
  node = flist_head(rflow->host_flowlist);
  while (node != NULL) {
    sem_wait(&rflow->fd_sem);
    node = flist_next(node);
  }

  return 0;
}
300
301
302
303
304
305
 
/* Cancellation cleanup handler: releases the heap block it is given (mapiipc_comm_thread() registers its receive buffer). */
void cleanup_handler(void *arg){
	free(arg);
}
306

307
308
void *mapiipc_comm_thread(void *host){		// reads an IPC message - blocking call

309
310
311
312
  struct dmapiipcbuf* dbuf;
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  int recv_bytes;
's avatar
committed
313

314
315
#ifdef RECONNECT
  int check_net;
's avatar
committed
316
  struct dmapiipcbuf dbuf_;
's avatar
committed
317
#endif
318
  
's avatar
committed
319
  /* Guarantees that thread resources are deallocated upon return */
320
321
322
323
  pthread_detach(pthread_self());
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);		// enable cancellation
  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);	// changes the type of responses to cancellation requests for the calling thread
 								// asynchronous (cancel the calling thread as soon as the cancellation request is received)
324
  dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
325

326
327
328
329
330
331
  // pthread_cleanup_push() function pushes the specified cancellation cleanup handler onto
  // the cancellation cleanup stack of the calling thread. When a thread exits or is cancelled,
  // and its cancellation cleanup stack is not empty, the cleanup handlers are invoked with
  // the specified argument in last-in-first-out order from the cancellation cleanup stack
  pthread_cleanup_push(cleanup_handler, dbuf);

332
  while(1){
333

334
	  if(host == NULL) break;
's avatar
committed
335

's avatar
committed
336
#ifdef DIMAPISSL
337
      	  recv_bytes = SSL_readn( ((struct host *) host)->con, dbuf, BASIC_SIZE);
338
#else
339
      	  recv_bytes = readn( ((struct host *) host)->sockfd, dbuf, BASIC_SIZE);
's avatar
committed
340
341
#endif

342
343
344
345
346
347
348
349
350
351
	  if(recv_bytes == 0){				// the peer has gone
#ifdef RECONNECT
	  	  printf("\n\t--->  Mapicommd is down. Socket closed from host: %s\n", ((struct host *)host)->hostname);
		  check_net = check_network_mapicommd((struct host *) host);

		  if(check_net == 1)			// network is up
			  continue;
		  else{					// network is down
			  printf("\nNetwork down ...\n");
			  restore_network_mapicommd((struct host *) host);
's avatar
committed
352
353
354
355
356
357
358

			  dbuf_.cmd = IGNORE_SLEEP;
			  dbuf_.length = BASIC_SIZE;
			  
			  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  break;

359
360
361
362
363
364
			  mapi_recreate_flow((struct host *) host);
			  mapi_reapply_function((struct host *) host);
			  mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
			  mapi_reauthenticate((struct host *) host);
#endif
's avatar
committed
365
366
367
368
369
370
			  dbuf_.cmd = IGNORE_NOTIFY;
			  dbuf_.length = BASIC_SIZE;
			  
			  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  break;
			  
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
			  check_mapi_functions((struct host *) host);
			  mapi_get_next_packet((struct host *) host);
			  sem_trywait(& ((struct host *) host)->connection);	// lock the semaphore only if the semaphore is currently not locked
			  sem_post(& ((struct host *) host)->connection);	// unlock the semaphore
			  continue;
		  }
#else
		  break;
#endif
      	  }

      	  else if(recv_bytes == -1){
#ifdef RECONNECT
	  	  printf("\n\t--->  Mapicommd is down. Socket closed from host: %s\n", ((struct host *)host)->hostname);
		  check_net = check_network_mapicommd((struct host *) host);

  		  if(check_net == 1)			// network is up
			  continue;
		  else{					// network is down
			  printf("\nNetwork down ...\n");
			  restore_network_mapicommd((struct host *) host);
's avatar
committed
392
393
394
395
396
397
398

			  dbuf_.cmd = IGNORE_SLEEP;
			  dbuf_.length = BASIC_SIZE;
			  
			  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  break;

399
400
401
402
403
404
			  mapi_recreate_flow((struct host *) host);
			  mapi_reapply_function((struct host *) host);
			  mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
			  mapi_reauthenticate((struct host *) host);
#endif
's avatar
committed
405
406
407
408
409
410
			  dbuf_.cmd = IGNORE_NOTIFY;
			  dbuf_.length = BASIC_SIZE;
			  
			  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  break;

411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
			  check_mapi_functions((struct host *) host);
			  mapi_get_next_packet((struct host *) host);
			  sem_trywait(& ((struct host *) host)->connection);	// lock the semaphore only if the semaphore is currently not locked
			  sem_post(& ((struct host *) host)->connection);	// unlock the semaphore
			  continue;
		  }
#else
		  continue;
#endif
	  }
      
	  if (dbuf->length > DIMAPI_DATA_SIZE) {
	  	  fprintf(stderr,"Bad IPC message from agent\n");
	  	  continue;
      	  }
      
	  if (dbuf->length - BASIC_SIZE>0) {
's avatar
committed
428
429

#ifdef DIMAPISSL
430
		  recv_bytes = SSL_readn( ((struct host *) host)->con, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE );
431
#else
432
	      	  recv_bytes = readn( ((struct host *) host)->sockfd, (char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE);
's avatar
committed
433
#endif
434
435
436
437
438
439
440
441
442
443
	  	  if(recv_bytes == 0){			// the peer has gone
#ifdef RECONNECT
		  	  printf("\n\t--->  Mapicommd is down. Socket closed from host: %s\n", ((struct host *)host)->hostname);
			  check_net = check_network_mapicommd((struct host *) host);
	
			  if(check_net == 1)			// network is up
				  continue;
			  else{					// network is down
				  printf("\nNetwork down ...\n");
				  restore_network_mapicommd((struct host *) host);
's avatar
committed
444
445
446
447
448
449
450

				  dbuf_.cmd = IGNORE_SLEEP;
				  dbuf_.length = BASIC_SIZE;
				  
				  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
					  break;

451
452
453
454
455
456
				  mapi_recreate_flow((struct host *) host);
				  mapi_reapply_function((struct host *) host);
				  mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
				  mapi_reauthenticate((struct host *) host);
#endif
's avatar
committed
457
458
459
460
461
462
				  dbuf_.cmd = IGNORE_NOTIFY;
				  dbuf_.length = BASIC_SIZE;
				  
				  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  	break;

463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
				  check_mapi_functions((struct host *) host);
				  mapi_get_next_packet((struct host *) host);
				  sem_trywait(& ((struct host *) host)->connection);	// lock the semaphore only if the semaphore is currently not locked
				  sem_post(& ((struct host *) host)->connection);	// unlock the semaphore
				  continue;
			  }
#else
			  break;
#endif
		  }

	  	  else if(recv_bytes == -1){
#ifdef RECONNECT			  
		  	  printf("\n\t--->  Mapicommd is down. Socket closed from host: %s\n", ((struct host *)host)->hostname);
			  check_net = check_network_mapicommd((struct host *) host);
	
			  if(check_net == 1)			// network is up
				  continue;
			  else{					// network is down
				  printf("\nNetwork down ...\n");
				  restore_network_mapicommd((struct host *) host);
's avatar
committed
484
485
486
487
488
489
490

				  dbuf_.cmd = IGNORE_SLEEP;
				  dbuf_.length = BASIC_SIZE;
				  
				  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
					  break;

491
492
493
494
495
496
				  mapi_recreate_flow((struct host *) host);
				  mapi_reapply_function((struct host *) host);
				  mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
				  mapi_reauthenticate((struct host *) host);
#endif
's avatar
committed
497
498
499
500
501
502
				  dbuf_.cmd = IGNORE_NOTIFY;
				  dbuf_.length = BASIC_SIZE;
				  
				  if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0)	// send an IPC message to mapicommd
				  	break;
				  
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
				  check_mapi_functions((struct host *) host);
				  mapi_get_next_packet((struct host *) host);
				  sem_trywait(& ((struct host *) host)->connection);	// lock the semaphore only if the semaphore is currently not locked
				  sem_post(& ((struct host *) host)->connection);	// unlock the semaphore
				  continue;
			  }
#else
			  continue;
#endif
		  }
      	  }
      
	  hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );

      	  if (hflow!=NULL) {

	    	  rflow=flist_get(remote_flowlist, hflow->scope_fd);

	    	  if (dbuf->cmd==GET_NEXT_PKT_ACK) {
522
523
524
			  if (dbuf->length == BASIC_SIZE) {
				hflow->pkt->caplen = 0;
			  }
525
526
527
528
529
530
531
532
533
534
535
536
537
	  		  memcpy(hflow->pkt, dbuf->data, dbuf->length-BASIC_SIZE);
	  		  flist_append(rflow->pkt_list, 0, hflow);
	  		  sem_post(&rflow->pkt_sem);
	    	  }
	    	  else {
		  	  memcpy( hflow->dbuf, dbuf, dbuf->length );	//place data
		  	  sem_post( &rflow->fd_sem );
	    	  }
      	  }
      	  else {
	    	  fprintf(stderr,"Invalid IPC message, unknown fd %d\n",dbuf->fd);
	    	  continue;
      	  }
538
  }
539
540
  pthread_cleanup_pop(1);	// pthread_cleanup_pop() function shall remove the routine at the top of the
 				// calling thread's cancellation cleanup stack and invoke it
's avatar
committed
541
  return NULL;
542
543
}

's avatar
committed
544
int mapiipc_remote_init(struct host *h)
545
546
//Initializes IPC for dmapi functions
{
's avatar
committed
547
548
  struct hostent* host=gethostbyname(h->hostname);
  struct timeval tv;
549

550
#ifdef DIMAPISSL
551
552
	SSL_library_init();		// registers the available ciphers and digests
	SSL_load_error_strings();	// registers the error strings for all libcrypto functions and libssl
's avatar
committed
553
554
555

	if ((ctx=SSL_CTX_new(SSLv3_client_method())) == NULL) {
		ERR_print_errors_fp(stderr);
's avatar
committed
556
		return -1;
's avatar
committed
557
558
559
	}
	if ((h->con = SSL_new(ctx)) == NULL) {
		ERR_print_errors_fp(stderr);
's avatar
committed
560
		return -1;
's avatar
committed
561
	}
562
563

	signal(SIGPIPE, sigpipe_handle);	// catch SIGPIPE signal (SSL_write), in case of reconnection ...
's avatar
committed
564
565
#endif

566
567
  if ((h->sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
    ERROR_CMD(printf("socket: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
568
    return -1;
's avatar
committed
569
570
  }

's avatar
fix    
committed
571
  /*
572
  tv.tv_sec=20;		//timeout 20 sec for recv
's avatar
committed
573
574
575
576
577
  tv.tv_usec=0;

  if (setsockopt(h->sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) == -1) {
    close(h->sockfd);
    printf("Unexpected error on setsockopt()");
's avatar
committed
578
579
    //exit(-1);
    return -1;
's avatar
committed
580
  }
's avatar
fix    
committed
581
  */
582

's avatar
committed
583
584
585
586
587
588
  tv.tv_sec=10;		//timeout 10 sec for send
  tv.tv_usec=0;

  if (setsockopt(h->sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval)) == -1) {
    close(h->sockfd);
    printf("Unexpected error on setsockopt()");
's avatar
committed
589
    return -1;
's avatar
committed
590
591
592
593
  }

  if (host==NULL) {
    printf("Could not determine address for %s\n",h->hostname);
's avatar
committed
594
    return -1;
595
  }
's avatar
committed
596

597
598
  // Construct name of dmapid's socket
  remoteaddr.sin_family = AF_INET;
's avatar
committed
599
  remoteaddr.sin_addr = *((struct in_addr *)host->h_addr);
600
601
  remoteaddr.sin_port = htons(h->port);

602
  if (connect(h->sockfd, (struct sockaddr *)&remoteaddr, sizeof(remoteaddr)) < 0) {
's avatar
committed
603
    ERROR_CMD(printf("connect failed: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
604
    return -1;
605
  }
's avatar
committed
606
607

#ifdef DIMAPISSL
608
	if (SSL_set_fd(h->con, h->sockfd) == 0) {
's avatar
committed
609
		ERR_print_errors_fp(stderr);
's avatar
committed
610
		return -1;
's avatar
committed
611
612
613
	}
	if (SSL_connect(h->con) <= 0) {
		ERR_print_errors_fp(stderr);
's avatar
committed
614
		return -1;
's avatar
committed
615
616
	}
#endif
's avatar
committed
617
  return 0;
's avatar
committed
618

619
620
621
622
623
}

/*
 * mapiipc_remote_close - releases the connection resources of host h.
 *
 * With DIMAPISSL the TLS/SSL connection is shut down first, while the socket
 * is still open, because SSL_shutdown() has to write its close notification
 * to the peer. (Previously it ran after close() and could only fail.)
 * Afterwards the socket is shut down and closed, the SSL object is freed and
 * the shared SSL context is released. Both pointers are reset to NULL, so
 * closing a second host does not free the same context twice.
 *
 * NOTE(review): ERR_free_strings()/EVP_cleanup()/CRYPTO_cleanup_all_ex_data()
 * tear down library-wide state on every call, as the original did - confirm
 * this is intended when more than one host is in use.
 */
void mapiipc_remote_close(struct host *h)
{
#ifdef DIMAPISSL
	if (h->con != NULL) {
		if (SSL_shutdown(h->con) == -1)	// shut down a TLS/SSL connection
			ERR_print_errors_fp(stderr);
	}
#endif

	shutdown(h->sockfd, SHUT_RDWR);
	close(h->sockfd);

#ifdef DIMAPISSL
	SSL_free(h->con);		// drops one reference to the SSL object and frees it when the count reaches 0 (NULL is accepted)
	h->con = NULL;

	if(ctx != NULL){
		SSL_CTX_free(ctx);	// drops one reference to the SSL_CTX object and frees it when the count reaches 0
		ctx = NULL;
	}

	ERR_remove_state(0);		// the current thread will have its error queue removed
	ERR_free_strings();		// frees all previously loaded error strings
	EVP_cleanup();			// removes all ciphers and digests from the table
	CRYPTO_cleanup_all_ex_data();	// clean up all allocated state
#endif
}
642
643

#endif /* DIMAPI */
644

Arne Øslebø's avatar
Arne Øslebø committed
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672

// Helper functions for function arguments retrieval
/*
 * getargint - fetches the next int from a packed argument buffer.
 *
 * pos: in/out cursor into the buffer; advanced past the int.
 *
 * Arguments are packed back to back by addarg() (an int may follow a string
 * of any length), so the value can sit at an unaligned address. It is copied
 * out with memcpy() instead of being read through a cast pointer.
 */
int getargint(mapiFunctArg **pos){
	int value;

	memcpy(&value, *pos, sizeof(value));
	(*pos) += sizeof(value);
	return value;
}

/*
 * getargchar - fetches the next char from a packed argument buffer.
 *
 * pos: in/out cursor into the buffer; advanced by sizeof(char) elements.
 */
char getargchar(mapiFunctArg **pos){
	mapiFunctArg *cursor = *pos;
	char value = *((char *)cursor);

	*pos = cursor + sizeof(char);
	return value;
}

/*
 * getargulonglong - fetches the next unsigned long long from a packed
 * argument buffer.
 *
 * pos: in/out cursor into the buffer; advanced past the value.
 *
 * Arguments are packed back to back by addarg(), so the value can sit at an
 * unaligned address. It is copied out with memcpy() instead of being read
 * through a cast pointer.
 */
unsigned long long getargulonglong(mapiFunctArg **pos){
	unsigned long long value;

	memcpy(&value, *pos, sizeof(value));
	(*pos) += sizeof(value);
	return value;
}

/*
 * getargstr - fetches the next NUL-terminated string from a packed argument
 * buffer.
 *
 * pos: in/out cursor into the buffer; advanced past the string and its
 *      terminating NUL.
 *
 * Returns a pointer into the buffer itself (nothing is copied).
 */
char * getargstr(mapiFunctArg **pos){
	char *str = (char *)*pos;
	size_t span = strlen(str) + 1;

	*pos = *pos + span;
	return str;
}

/*
 * addarg - helper for mapi_apply_function(): appends one argument to a
 * message's argument buffer.
 *
 * pos:  in/out cursor into the argument buffer; advanced past the bytes written.
 * arg:  address of the value to copy (for STRING, the string itself).
 * type: INT, CHAR, UNSIGNED_LONG_LONG or STRING; any other value leaves the
 *       buffer and the cursor untouched.
 *
 * A STRING is copied including its terminating NUL.
 */
void addarg(mapiFunctArg **pos, void *arg, int type)
{
  size_t nbytes;

  // step 1: how many bytes does this argument occupy?
  switch(type){
    case INT:
      nbytes = sizeof(int);
      break;
    case CHAR:
      nbytes = sizeof(char);
      break;
    case UNSIGNED_LONG_LONG:
      nbytes = sizeof(unsigned long long);
      break;
    case STRING:
      nbytes = strlen((char *)arg)+1;
      break;
    default:
      return;
  }

  // step 2: copy them and move the cursor
  memcpy(*pos, arg, nbytes);
  (*pos) += nbytes;
}

/*
 * mapiipc_send_fd - passes the open file descriptor sendfd to the peer of the
 * file-scope socket, as SCM_RIGHTS ancillary data accompanying a 2-byte dummy
 * payload.
 *
 * Returns the result of sendmsg(): the number of payload bytes sent, or -1.
 *
 * The message header, the control buffer and the payload are zeroed before
 * use, so no uninitialised stack bytes reach the peer, and the descriptor is
 * stored with memcpy() rather than through a cast pointer.
 *
 * The ancillary-data path is used when HAVE_MSGHDR_MSG_CONTROL is defined or
 * the platform provides SCM_RIGHTS; the old msg_accrights interface is kept
 * for systems that have neither.
 */
int mapiipc_send_fd(int sendfd)
{
  struct msghdr	msg;
  struct iovec	iov[1];
  char payload[2] = { 0, 0 };

#if defined(HAVE_MSGHDR_MSG_CONTROL) || defined(SCM_RIGHTS)
  union {
    struct cmsghdr	cm;	// forces suitable alignment of the control buffer
    char control[CMSG_SPACE(sizeof(int))];
  } control_un;
  struct cmsghdr	*cmptr;
#endif

  memset(&msg, 0, sizeof(msg));

#if defined(HAVE_MSGHDR_MSG_CONTROL) || defined(SCM_RIGHTS)
  memset(&control_un, 0, sizeof(control_un));
  msg.msg_control = control_un.control;
  msg.msg_controllen = sizeof(control_un.control);

  cmptr = CMSG_FIRSTHDR(&msg);
  cmptr->cmsg_len = CMSG_LEN(sizeof(int));
  cmptr->cmsg_level = SOL_SOCKET;
  cmptr->cmsg_type = SCM_RIGHTS;
  memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
#else
  msg.msg_accrights = (caddr_t) &sendfd;
  msg.msg_accrightslen = sizeof(int);
#endif

  iov[0].iov_base = payload;
  iov[0].iov_len = sizeof(payload);
  msg.msg_iov = iov;
  msg.msg_iovlen = 1;
  msg.msg_name = NULL;
  msg.msg_namelen = 0;

  return sendmsg(sock, &msg, 0);
}


/*
 * mapiipc_read_fd - receives a file descriptor passed over a local socket
 * (the counterpart of mapiipc_send_fd()).
 *
 * sock: socket to receive from. (The parameter hides the file-scope `sock`.)
 *
 * Returns the received descriptor (>= 0), or -1 when
 *   - recvmsg() fails or reports end-of-file,
 *   - the message carries no descriptor, or
 *   - the control message is not SOL_SOCKET / SCM_RIGHTS.
 *
 * End-of-file used to be returned as 0, which a caller could not tell apart
 * from descriptor 0; it is now reported as -1 like every other failure.
 *
 * The ancillary-data path is used when HAVE_MSGHDR_MSG_CONTROL is defined or
 * the platform provides SCM_RIGHTS; the old msg_accrights interface is kept
 * for systems that have neither.
 */
int mapiipc_read_fd(int sock)
{
  struct msghdr	msg;
  struct iovec	iov[1];
  ssize_t n;
  int recvfd = -1;
  char payload[2];

#if defined(HAVE_MSGHDR_MSG_CONTROL) || defined(SCM_RIGHTS)
  union {
    struct cmsghdr	cm;	// forces suitable alignment of the control buffer
    char control[CMSG_SPACE(sizeof(int))];
  } control_un;
  struct cmsghdr	*cmptr;
#else
  int newfd = -1;		// filled in by recvmsg() through msg_accrights
#endif

  memset(&msg, 0, sizeof(msg));

#if defined(HAVE_MSGHDR_MSG_CONTROL) || defined(SCM_RIGHTS)
  memset(&control_un, 0, sizeof(control_un));
  msg.msg_control = control_un.control;
  msg.msg_controllen = sizeof(control_un.control);
#else
  msg.msg_accrights = (caddr_t) &newfd;
  msg.msg_accrightslen = sizeof(int);
#endif

  iov[0].iov_base = payload;
  iov[0].iov_len = sizeof(payload);
  msg.msg_iov = iov;
  msg.msg_iovlen = 1;
  msg.msg_name = NULL;
  msg.msg_namelen = 0;

  n = recvmsg(sock, &msg, 0);
  if (n <= 0)
    return -1;			/* error or end-of-file: no descriptor */

#if defined(HAVE_MSGHDR_MSG_CONTROL) || defined(SCM_RIGHTS)
  cmptr = CMSG_FIRSTHDR(&msg);
  if (cmptr != NULL && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
    if (cmptr->cmsg_level != SOL_SOCKET) {
      fprintf(stderr, "control level != SOL_SOCKET");
      return -1;
    }
    if (cmptr->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "control type != SCM_RIGHTS");
      return -1;
    }
    memcpy(&recvfd, CMSG_DATA(cmptr), sizeof(recvfd));
  }
  /* else: descriptor was not passed, recvfd stays -1 */
#else
  if (msg.msg_accrightslen == sizeof(int))
    recvfd = newfd;
  /* else: descriptor was not passed, recvfd stays -1 */
#endif

  return recvfd;
}
's avatar
committed
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834

/*
 * readn - reads exactly n bytes from descriptor fd into vptr.
 *
 * read() is repeated until n bytes have arrived; a read interrupted by a
 * signal (EINTR) is simply tried again.
 *
 * Returns n when all bytes were read, 0 as soon as end-of-file is seen (even
 * if part of the data had already arrived), and -1 on any other read error.
 */
ssize_t readn(int fd, void *vptr, size_t n) {
        char *cursor = vptr;
        size_t done = 0;

        while (done < n) {
                ssize_t got;

                errno = 0;
                got = read(fd, cursor + done, n - done);

                if (got == 0)
                        return 0;               /* EOF */

                if (got < 0) {
                        if (errno != EINTR)
                                return -1;
                        continue;               /* interrupted: call read() again */
                }

                done += (size_t)got;
        }

        return (ssize_t)done;           /* equals n, or 0 when n is 0 */
}

's avatar
committed
835
836
#ifdef DIMAPISSL
/*
 * Read "n" bytes from the SSL connection "con" into the buffer at "vptr".
 *
 * Counterpart of readn() that pulls data through SSL_read(). The call is
 * repeated until the whole request is satisfied; a negative result with
 * errno == EINTR is retried.
 *
 * Returns n once all bytes have been read, 0 as soon as SSL_read() returns 0,
 * or -1 on any other negative result.
 *
 * NOTE(review): the EINTR test relies on errno, as the code always did;
 * OpenSSL normally reports failure details through SSL_get_error() -- confirm
 * whether that should be consulted here.
 */
ssize_t SSL_readn(SSL *con, void *vptr, size_t n) {
	char *cursor = vptr;
	size_t remaining = n;

	while (remaining != 0) {
		ssize_t got;

		errno = 0;
		got = SSL_read(con, cursor, remaining);

		if (got == 0)
			return 0;		/* EOF */

		if (got < 0) {
			if (errno != EINTR)
				return(-1);
			continue;		/* interrupted: call SSL_read() again */
		}

		cursor += got;
		remaining -= got;
	}

	return(n - remaining);			/* return >= 0 */
}
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894

/*
 * Signal handler that reports a broken pipe on standard output and returns.
 *
 * The message is emitted with write() instead of printf(): stdio functions
 * are not async-signal-safe, so calling them from a handler can deadlock or
 * corrupt the stream if the signal interrupts another stdio call. errno is
 * saved and restored so that the interrupted code still sees its own value.
 *
 * The empty parameter list is kept on purpose so the definition stays
 * compatible with the existing forward declaration `void sigpipe_handle();`.
 */
void sigpipe_handle(){	// catch the signal

	static const char msg[] = "Broken pipe (signal handler) ...\n";
	int saved_errno = errno;
	ssize_t written;

	written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
	(void)written;		// nothing useful can be done about a failure here

	errno = saved_errno;
}
#endif

#ifdef RECONNECT

#ifdef DIMAPI

// this function checks if network (mapicommd) is up
// It resolves h->hostname and attempts a TCP connection to h->port; the probe
// connection is closed again before returning.
// returns  1 if the connection attempt succeeded
//          0 if the host name could not be resolved to an IPv4 address or
//            the connection attempt failed
//         -1 if the probe socket could not be created
int check_network_mapicommd(struct host *h){

	struct hostent *host;
	struct sockaddr_in remote_address;
	int sockfd;

	host = gethostbyname(h->hostname);	// information about the specified host

	// gethostbyname() returns NULL when the name cannot be resolved (which is
	// expected while the network is down); report "down" instead of
	// dereferencing a NULL pointer
	if(host == NULL || host->h_addrtype != AF_INET || host->h_addr_list[0] == NULL)
		return 0;

	if( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
		ERROR_CMD(printf("socket: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
		return -1;
	}

	// construct name of dmapid's socket
	memset(&remote_address, 0, sizeof(remote_address));		// no stale stack bytes in padding/sin_zero
	remote_address.sin_family = AF_INET;				// address family
	memcpy(&remote_address.sin_addr, host->h_addr_list[0], sizeof(remote_address.sin_addr));
	remote_address.sin_port = htons(h->port);			// port number (2233)

	if(connect(sockfd, (struct sockaddr *)&remote_address, sizeof(remote_address)) < 0){	// remote mapid server is down ...
		close(sockfd);		// do not leak one descriptor per failed probe
		return 0;
	}

	shutdown(sockfd, SHUT_RDWR);
	close(sockfd);
	return 1;
}

// this function restores the connection to mapicommd in case of a breakdown, using back-off mechanism
//
// check_network_mapicommd() is polled until it returns 1; then the old
// connection is released with mapiipc_remote_close() and a new one is set up
// with mapiipc_remote_init(). There is no upper bound on the number of tries:
// the loop has no exit condition, so the function returns only after a
// successful check.
//
// Waiting time between tries: 1 second at first, doubled after each of the
// first 8 failed tries (1, 2, 4, ... 128), then constant at 256 seconds.
void restore_network_mapicommd(struct host *h){

	int tries;			// how many times we have tried to reconnect so far
	unsigned int delay = 1;		// waiting time in seconds (initially 1)

	for(tries = 0; ; tries++){

		printf("\n---> Reconnection try #%d", tries);
		fflush(stdout);		// message has no trailing newline: flush it before the check, which may block

		if(check_network_mapicommd(h) == 1){	// network is now up ...

			printf("\nMapid server is back online ...\n");

			mapiipc_remote_close(h);	// release previous socket resources
			mapiipc_remote_init(h);		// initializes IPC for DiMAPI functions
			return;
		}

		sleep(delay);		// pause execution for the specified time

		if (tries < 8)
			delay += delay;	// increase waiting time
	}
}

// helper of mapi_recreate_flow(): reports why re-creating flows failed,
// releases the message buffer and terminates the process
static void recreate_flow_abort(struct dmapiipcbuf *dbuf, const char *reason, const char *hostname, int line){

	fprintf(stderr, "\nError: %s in host %s [%s : %d]\n", reason, hostname, __FILE__, line);
	free(dbuf);
	exit(EXIT_FAILURE);
}

// this function recreates all flows, that host which broke down the connection had created
//
// For every flow in h->flows a CREATE_FLOW request carrying the flow's device
// name is sent with mapiipc_remote_write() and the reply is read back from the
// host's connection. On CREATE_FLOW_ACK the descriptor found in the reply
// replaces hflow->fd and hflow->dbuf->fd, and the flow is appended to a new
// list under that descriptor; the new list finally replaces h->flows.
//
// The function cannot report errors to its caller: any failure (allocation,
// send, receive, oversized reply, reply other than CREATE_FLOW_ACK) terminates
// the process with EXIT_FAILURE.
void mapi_recreate_flow(struct host *h){

	flist_node_t *fnode;
	flist_t *tmp_list = NULL;	// all created flows from this host
	host_flow *hflow;
	struct dmapiipcbuf *dbuf = NULL;
	int recv_bytes;
	int new_fd;
	size_t reply_len;

	dbuf = malloc(sizeof(*dbuf));		// buffer to store messages that are sent/received to/from mapicommd
	tmp_list = malloc(sizeof(*tmp_list));
	if(dbuf == NULL || tmp_list == NULL){
		free(tmp_list);
		recreate_flow_abort(dbuf, "Out of memory while re-creating flows", h->hostname, __LINE__);
	}
	flist_init(tmp_list);

	for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){	// iterate flow list of the specified host

		hflow = (host_flow *)fnode->data;	// data stored in the node

		// the name and its terminator must fit in the payload, otherwise
		// strncpy() would leave it unterminated and the advertised length
		// would run past the data area
		if(strlen(hflow->dev) >= DATA_SIZE)
			recreate_flow_abort(dbuf, "Device name too long to re-create flow", hflow->rhost->hostname, __LINE__);

		dbuf->cmd = CREATE_FLOW;
		strncpy((char *) dbuf->data, hflow->dev, DATA_SIZE);
		dbuf->length = BASIC_SIZE + strlen(hflow->dev) + 1;

		if(mapiipc_remote_write(dbuf, hflow->rhost) < 0){		// send an IPC message to mapicommd
			free(dbuf); dbuf = NULL;
			exit(EXIT_FAILURE);
		}
#ifdef DIMAPISSL
		recv_bytes = SSL_readn(h->con, dbuf, BASIC_SIZE);
#else
		recv_bytes = readn(h->sockfd, dbuf, BASIC_SIZE);		// receive an IPC message from mapicommd
#endif
		// without a complete header dbuf still holds our own request, so
		// its length/cmd fields must not be interpreted as a reply
		if(recv_bytes <= 0)
			recreate_flow_abort(dbuf, "Could not read reply while re-creating flow", hflow->rhost->hostname, __LINE__);

		// the length comes from the peer: never read more than dbuf can hold
		// (a negative value becomes huge after the conversion and is rejected too)
		reply_len = (size_t)dbuf->length;
		if(reply_len > sizeof(*dbuf))
			recreate_flow_abort(dbuf, "Oversized reply while re-creating flow", hflow->rhost->hostname, __LINE__);

		if(reply_len > BASIC_SIZE){
#ifdef DIMAPISSL
			recv_bytes = SSL_readn(h->con, (char *)dbuf + BASIC_SIZE, reply_len - BASIC_SIZE);
#else
			recv_bytes = readn(h->sockfd, (char *)dbuf + BASIC_SIZE, reply_len - BASIC_SIZE);
#endif
			if(recv_bytes <= 0)
				recreate_flow_abort(dbuf, "Could not read reply while re-creating flow", hflow->rhost->hostname, __LINE__);
		}

		if(dbuf->cmd != CREATE_FLOW_ACK)
			recreate_flow_abort(dbuf, "Could not re-create flow", hflow->rhost->hostname, __LINE__);

		//printf("\n\t New fd = %d, Previous fd = %d\n", *(int *)(dbuf->data), hflow->fd);

		memcpy(&new_fd, dbuf->data, sizeof(new_fd));	// payload starts with the new descriptor
		hflow->dbuf->fd = new_fd;			// FIXME (carried over from the original code)
		hflow->fd = new_fd;				// change previous fd with the new one
		flist_append(tmp_list, hflow->fd, hflow);	// append new node to the tmp list
	}
	free(dbuf); dbuf = NULL;
	flist_destroy(h->flows);	// destroy old flow list
	free(h->flows);
	h->flows = tmp_list;		// now flow list is the tmp list

	return;
}

// this function reapplies all functions that the host whose connection broke down had applied
void mapi_reapply_function(struct host *h){

	flist_node_t *fnode;
	host_flow *hflow;
	function_data *fdata, *fdata_;
	remote_flowdescr_t* ref_flow;
	mapiFunctArg *pos;
	struct dmapiipcbuf *dbuf = NULL;
	struct mapiipcbuf qbuf = {-1, -1, -1, -1, -1, "", -1, -1, -1, 0, "", "", -1};