mapiipc.c 12.9 KB
Newer Older
1 2 3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
Arne Øslebø's avatar
Arne Øslebø committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <string.h>
#include <errno.h>

#include "mapiipc.h"
#include "debug.h"

#define HAVE_MSGHDR_MSG_CONTROL 1

22 23 24 25 26 27 28
#ifdef DIMAPI
#include <netdb.h>
#include <netinet/in.h>
#include <semaphore.h>
#include "flist.h"
#endif

Arne Øslebø's avatar
Arne Øslebø committed
29 30
// this file contains all the client-side IPC related functions

's avatar
 
committed
31 32 33
// Descriptor of the local socket to mapid; created in mapiipc_client_init()
// and used by the send/recv helpers in this file.
static int sock;
// Length passed to connect(): sizeof(sun_family) + strlen(sun_path).
static int mapidaddr_len;
// Address of mapid's local (AF_LOCAL) socket, filled in by mapiipc_client_init().
static struct sockaddr_un mapidaddr;
's avatar
committed
34
//static struct sockaddr_un fromaddr;
's avatar
committed
35
// Heap-allocated path of mapid's socket; only used as scratch storage while
// mapiipc_client_init() builds the address, and freed there.
static char* mapidsocket;
Arne Øslebø's avatar
Arne Øslebø committed
36

's avatar
committed
37 38 39 40
#ifdef DIMAPISSL
// SSL context used for remote connections; (re)assigned on every call to
// mapiipc_remote_init().
static SSL_CTX *ctx;
#endif

's avatar
committed
41
int mapiipc_write(struct mapiipcbuf *qbuf)
Arne Øslebø's avatar
Arne Øslebø committed
42 43 44 45 46 47
//Sends an IPC message to mapid
{
  qbuf->uid=getuid();
  if(send(sock, qbuf, sizeof(struct mapiipcbuf), 0) == -1) {
    WARNING_CMD(printf("send: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
    //    exit(1);
's avatar
committed
48
    return -1;
Arne Øslebø's avatar
Arne Øslebø committed
49
  }
's avatar
committed
50
  return 0;
Arne Øslebø's avatar
Arne Øslebø committed
51 52
}

's avatar
committed
53
int mapiipc_read(struct mapiipcbuf *qbuf)
Arne Øslebø's avatar
Arne Øslebø committed
54 55 56 57
//Reads an IPC message. Blocking call
{
  if(recv(sock, qbuf, MAX_SEND_SIZE, 0) == -1){
    ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
58 59
    //exit(1);
    return -1;
Arne Øslebø's avatar
Arne Øslebø committed
60
  }
's avatar
committed
61
  return 0;
Arne Øslebø's avatar
Arne Øslebø committed
62 63
}

's avatar
committed
64
int mapiipc_client_init()
Arne Øslebø's avatar
Arne Øslebø committed
65 66 67 68
//Initializes IPC for mapi functions
{
  if ((sock = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {
    ERROR_CMD(printf("socket: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
69 70
    //exit(1);
    return -1;
Arne Øslebø's avatar
Arne Øslebø committed
71
  }
's avatar
committed
72 73 74

  mapidsocket=malloc(sizeof(MAPIDSOCKHOME)-2+strlen(getenv("HOME")));
  sprintf(mapidsocket,MAPIDSOCKHOME,getenv("HOME")); 
Arne Øslebø's avatar
Arne Øslebø committed
75 76 77

  // Construct name of mapid's socket
  mapidaddr.sun_family = AF_LOCAL;
's avatar
committed
78
  strcpy(mapidaddr.sun_path, mapidsocket);
Arne Øslebø's avatar
Arne Øslebø committed
79 80
  mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);

's avatar
committed
81
  if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) 
's avatar
 
committed
82
  {
's avatar
committed
83
	//	ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
84 85 86 87
      free(mapidsocket);
      mapidsocket=strdup(MAPIDSOCKGLOBAL);
      strcpy(mapidaddr.sun_path, mapidsocket);
      mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);
88 89 90
      if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) {
    		ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
	}
's avatar
committed
91
      else {
92
		free(mapidsocket);
's avatar
committed
93
      	//exit(EXIT_FAILURE);
94
      	return 0;
's avatar
committed
95
      }
's avatar
 
committed
96
  }
's avatar
committed
97
  free(mapidsocket);
98
  return -1;
Arne Øslebø's avatar
Arne Øslebø committed
99 100 101 102 103 104 105 106
}

/*
 * Releases socket resources: closes the local socket to mapid.
 * The result of close() is deliberately ignored.
 */
void mapiipc_client_close()
{
  (void)close(sock);
}

107 108 109 110
#ifdef DIMAPI
// Address of the remote peer; filled in by mapiipc_remote_init() right before
// connect(). NOTE(review): a single shared instance with external linkage --
// confirm whether other translation units rely on it.
struct sockaddr_in remoteaddr;
// List looked up by mapiipc_comm_thread() with a host flow's scope_fd to find
// the owning remote flow descriptor. It is not populated in this file.
flist_t *remote_flowlist=NULL;

's avatar
committed
111
int mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h)
112 113
//Sends an IPC message to mapid
{
114
//  qbuf->uid=getuid();
's avatar
committed
115 116 117 118 119 120 121

#ifdef DIMAPISSL
	if(SSL_write(h->con,dbuf,dbuf->length) == -1){
		WARNING_CMD(printf("SSL_write: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
		//    exit(1);
		return -1;
	}
122 123 124 125 126 127
#else
	if(send(h->sockfd, dbuf, dbuf->length, 0) == -1) {
		WARNING_CMD(printf("send: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
		//    exit(1);
		return -1;
	}
's avatar
committed
128
#endif
129
	return 0;
130 131
}

's avatar
committed
132 133


's avatar
committed
134
/*
 * Sends each host flow's pending message to its remote host and then waits
 * for one post on rflow->fd_sem per entry of rflow->host_flowlist.
 *
 * Before sending, the message's fd field is set to the host flow's fd.
 *
 * Returns 0 after all waits completed, or -1 as soon as one send fails
 * (in that case no waiting is done).
 */
int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
{
  flist_node_t *node;

  // Phase 1: hand every message to its host
  for (node = flist_head(rflow->host_flowlist); node != NULL; node = flist_next(node)) {
    host_flow *hf = (host_flow *)node->data;

    hf->dbuf->fd = hf->fd;
    if (mapiipc_remote_write(hf->dbuf, hf->rhost) < 0)
      return -1;
  }

  // Phase 2: one sem_wait per host flow in the list
  node = flist_head(rflow->host_flowlist);
  while (node != NULL) {
    sem_wait(&rflow->fd_sem);
    node = flist_next(node);
  }

  return 0;
}

void *mapiipc_comm_thread(void *host) {
//Reads an IPC message. Blocking call
  struct dmapiipcbuf* dbuf;
  remote_flowdescr_t* rflow;
  host_flow* hflow;
  int recv_bytes;
's avatar
committed
160
  int sockfd=((struct host *)host)->sockfd;
's avatar
committed
161

's avatar
committed
162 163 164 165
#ifdef DIMAPISSL
  SSL *hostconn = ((struct host*)host)->con;
#endif

's avatar
committed
166 167 168
  /* Guarantees that thread resources are deallocated upon return */
  pthread_detach(pthread_self());

169
  dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
170 171 172

  while (1) {

173
    if (host==NULL) break;
's avatar
committed
174

's avatar
committed
175
#ifdef DIMAPISSL
176 177 178
    recv_bytes = SSL_readn(hostconn,dbuf,BASIC_SIZE);
#else
    recv_bytes = readn(sockfd, dbuf, BASIC_SIZE);
's avatar
committed
179 180
#endif

's avatar
committed
181 182 183 184 185 186 187 188
    if (recv_bytes == 0) { // the peer has gone
	//printf("Socket closed\n");
	break;
    }
    else if (recv_bytes == -1) {
	//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
	continue;
    }
189

's avatar
committed
190 191 192 193
    if (dbuf->length > DIMAPI_DATA_SIZE) {
	fprintf(stderr,"Bad IPC message from agent\n");
	continue;
    }
194

's avatar
committed
195
    if (dbuf->length-BASIC_SIZE>0) {
's avatar
committed
196 197

#ifdef DIMAPISSL
198 199
    recv_bytes = SSL_readn(hostconn,(char *)dbuf + BASIC_SIZE, dbuf->length - BASIC_SIZE );
#else
's avatar
committed
200
	recv_bytes=readn(sockfd, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
's avatar
committed
201
#endif
's avatar
committed
202 203 204 205 206 207 208 209 210
	if (recv_bytes == 0) { // the peer has gone
		//printf("Socket closed\n");
		break;
	}
	else if (recv_bytes == -1) {
		//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
		continue;
	}
    }
211

212 213 214
    hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
    if (hflow!=NULL) {
      rflow=flist_get(remote_flowlist, hflow->scope_fd);
's avatar
 
committed
215 216
      if (dbuf->cmd==GET_NEXT_PKT_ACK) {
		memcpy(hflow->pkt, dbuf->data, dbuf->length-BASIC_SIZE);
's avatar
committed
217 218
		flist_append(rflow->pkt_list, 0, hflow);
		sem_post(&rflow->pkt_sem);
's avatar
 
committed
219 220 221
      }
      else {
	memcpy( hflow->dbuf, dbuf, dbuf->length );	//place data
's avatar
committed
222 223

	sem_post( &rflow->fd_sem );
224 225 226
      }
    }
    else {
's avatar
committed
227
      fprintf(stderr,"Invalid IPC message, unknown fd %d\n",dbuf->fd);
's avatar
committed
228
      //exit(-1);
229
      //failure
's avatar
committed
230
      continue;
231 232 233
    }

  }
's avatar
committed
234 235 236

  free(dbuf);
  return NULL;
237 238 239
}


's avatar
committed
240
int mapiipc_remote_init(struct host *h)
241 242
//Initializes IPC for dmapi functions
{
's avatar
committed
243 244
  struct hostent* host=gethostbyname(h->hostname);
  struct timeval tv;
245

246
#ifdef DIMAPISSL
's avatar
committed
247 248 249 250 251 252 253 254 255 256 257 258 259
	SSL_library_init();
	SSL_load_error_strings();

	if ((ctx=SSL_CTX_new(SSLv3_client_method())) == NULL) {
		ERR_print_errors_fp(stderr);
		return 0;
	}
	if ((h->con = SSL_new(ctx)) == NULL) {
		ERR_print_errors_fp(stderr);
		return 0;;
	}
#endif

260 261
  if ((h->sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
    ERROR_CMD(printf("socket: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
262 263
    //exit(-1);
    return -1;
's avatar
committed
264 265
  }

's avatar
fix  
committed
266
  /*
267
  tv.tv_sec=20;		//timeout 20 sec for recv
's avatar
committed
268 269 270 271 272
  tv.tv_usec=0;

  if (setsockopt(h->sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) == -1) {
    close(h->sockfd);
    printf("Unexpected error on setsockopt()");
's avatar
committed
273 274
    //exit(-1);
    return -1;
's avatar
committed
275
  }
's avatar
fix  
committed
276
  */
277

's avatar
committed
278 279 280 281 282 283
  tv.tv_sec=10;		//timeout 10 sec for send
  tv.tv_usec=0;

  if (setsockopt(h->sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval)) == -1) {
    close(h->sockfd);
    printf("Unexpected error on setsockopt()");
's avatar
committed
284 285
    //exit(-1);
    return -1;
's avatar
committed
286 287 288 289
  }

  if (host==NULL) {
    printf("Could not determine address for %s\n",h->hostname);
's avatar
committed
290 291
    //exit(1);
    return -1;
292
  }
's avatar
committed
293

294 295
  // Construct name of dmapid's socket
  remoteaddr.sin_family = AF_INET;
's avatar
committed
296
  remoteaddr.sin_addr = *((struct in_addr *)host->h_addr);
297 298
  remoteaddr.sin_port = htons(h->port);

299
  if (connect(h->sockfd, (struct sockaddr *)&remoteaddr, sizeof(remoteaddr)) < 0) {
's avatar
committed
300
    ERROR_CMD(printf("connect failed: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
's avatar
committed
301 302
    //exit(EXIT_FAILURE);
    return -1;
303
  }
's avatar
committed
304 305

#ifdef DIMAPISSL
306
	if (SSL_set_fd(h->con, h->sockfd) == 0) {
's avatar
committed
307 308 309 310 311 312 313 314
		ERR_print_errors_fp(stderr);
		return 0;
	}
	if (SSL_connect(h->con) <= 0) {
		ERR_print_errors_fp(stderr);
		return 0;
	}
#endif
's avatar
committed
315
  return 0;
's avatar
committed
316

317 318 319 320 321
}

/*
 * Releases socket resources of the remote host h.
 *
 * With DIMAPISSL the SSL connection is shut down first (errors are printed
 * to stderr); then the socket is shut down in both directions and closed.
 */
void mapiipc_remote_close(struct host *h)
{
#ifdef DIMAPISSL
	int ssl_rc = SSL_shutdown(h->con);

	if (ssl_rc == -1)
		ERR_print_errors_fp(stderr);
#endif
	shutdown(h->sockfd, SHUT_RDWR);
	close(h->sockfd);
}
330 331

#endif /* DIMAPI */
332

Arne Øslebø's avatar
Arne Øslebø committed
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360

// Helper functions for function arguments retrieval
/*
 * Reads an int from the argument buffer at *pos and advances *pos past it.
 *
 * The value is copied out with memcpy, mirroring how addarg() stores it:
 * arguments are packed back to back, so *pos is not guaranteed to be
 * suitably aligned for a direct int access.
 * NOTE(review): the advance is sizeof(int) elements of mapiFunctArg, which
 * presumably is a byte-sized type -- confirm against its declaration.
 */
int getargint(mapiFunctArg **pos){
	int value;

	memcpy(&value, *pos, sizeof value);
	(*pos) += sizeof(int);
	return value;
}

/*
 * Reads a single char from the argument buffer at *pos and advances *pos
 * by sizeof(char).
 */
char getargchar(mapiFunctArg **pos){
	const char *src = (const char *)(*pos);

	(*pos) += sizeof(char);
	return *src;
}

/*
 * Reads an unsigned long long from the argument buffer at *pos and advances
 * *pos past it.
 *
 * The value is copied out with memcpy, mirroring how addarg() stores it:
 * arguments are packed back to back, so *pos is not guaranteed to be
 * suitably aligned for a direct 8-byte access.
 */
unsigned long long getargulonglong(mapiFunctArg **pos){
	unsigned long long value;

	memcpy(&value, *pos, sizeof value);
	(*pos) += sizeof(unsigned long long);
	return value;
}

/*
 * Returns a pointer to the NUL-terminated string stored at *pos and advances
 * *pos past the string including its terminator. The string is not copied;
 * the returned pointer refers into the argument buffer.
 */
char * getargstr(mapiFunctArg **pos){
	char *str = (char *)*pos;
	size_t consumed = strlen(str) + 1;

	(*pos) += consumed;
	return str;
}

/*
 * Helper function for mapi_apply_function().
 *
 * Copies one argument into the message's argument buffer and advances the
 * write position by the number of bytes copied.
 *
 * pos:  current position in message's argument buffer (updated).
 * arg:  pointer to the argument to copy into the buffer.
 * type: argument type (INT, CHAR, UNSIGNED_LONG_LONG or STRING); strings
 *       are copied including their terminating NUL. Any other type is
 *       ignored and leaves the buffer and position untouched.
 */
void addarg(mapiFunctArg **pos, void *arg, int type)
{
  size_t nbytes;

  // Work out how many bytes this argument occupies
  switch(type){
    case INT:
      nbytes = sizeof(int);
      break;
    case CHAR:
      nbytes = sizeof(char);
      break;
    case UNSIGNED_LONG_LONG:
      nbytes = sizeof(unsigned long long);
      break;
    case STRING:
      nbytes = strlen((char *)arg)+1;
      break;
    default:
      return;
  }

  memcpy(*pos, arg, nbytes);
  (*pos) += nbytes;
}

/*
 * Passes the open descriptor sendfd to the peer of the local socket `sock`
 * as ancillary data (SCM_RIGHTS), together with a 2-byte dummy payload.
 *
 * The message header, the control buffer and the payload are zeroed first
 * so that no uninitialized stack bytes are handed to the kernel or the peer.
 *
 * Returns the result of sendmsg(): the number of payload bytes sent, or -1
 * on error.
 */
int mapiipc_send_fd(int sendfd)
{
  struct msghdr	msg;
  struct iovec	iov[1];
  char ptr[2] = { 0, 0 };
  int ret;

#ifdef	HAVE_MSGHDR_MSG_CONTROL
  union {
    struct cmsghdr	cm;
    char control[CMSG_SPACE(sizeof(int))];
  } control_un;
  struct cmsghdr	*cmptr;
#endif

  memset(&msg, 0, sizeof msg);

#ifdef	HAVE_MSGHDR_MSG_CONTROL
  memset(&control_un, 0, sizeof control_un);
  msg.msg_control = control_un.control;
  msg.msg_controllen = sizeof(control_un.control);

  cmptr = CMSG_FIRSTHDR(&msg);
  cmptr->cmsg_len = CMSG_LEN(sizeof(int));
  cmptr->cmsg_level = SOL_SOCKET;
  cmptr->cmsg_type = SCM_RIGHTS;
  /* CMSG_DATA() is not guaranteed to be aligned for int: copy bytewise */
  memcpy(CMSG_DATA(cmptr), &sendfd, sizeof sendfd);
#else
  msg.msg_accrights = (caddr_t) &sendfd;
  msg.msg_accrightslen = sizeof(int);
#endif

  iov[0].iov_base = ptr;
  iov[0].iov_len = 2;
  msg.msg_iov = iov;
  msg.msg_iovlen = 1;
  msg.msg_name = NULL;
  msg.msg_namelen = 0;

  ret=sendmsg(sock,&msg,0);
  return(ret);
}


/*
 * Receives a file descriptor passed as ancillary data (SCM_RIGHTS) over the
 * socket given as argument, along with a 2-byte dummy payload.
 *
 * Returns
 *   - the received descriptor on success,
 *   - the result of recvmsg() when it is <= 0 (0: peer closed, -1: error),
 *   - -1 when no descriptor was passed or the control message has an
 *     unexpected level/type (a diagnostic is printed in the latter case).
 */
int mapiipc_read_fd(int sock)
{
  struct msghdr	msg;
  struct iovec	iov[1];
  ssize_t n;
  int recvfd;
  char c[2];

#ifdef	HAVE_MSGHDR_MSG_CONTROL
  union {
    struct cmsghdr	cm;
    char				control[CMSG_SPACE(sizeof(int))];
  } control_un;
  struct cmsghdr	*cmptr;
#else
  int newfd = -1;	/* receives the descriptor via msg_accrights */
#endif

  memset(&msg, 0, sizeof msg);

#ifdef	HAVE_MSGHDR_MSG_CONTROL
  msg.msg_control = control_un.control;
  msg.msg_controllen = sizeof(control_un.control);
#else
  msg.msg_accrights = (caddr_t) &newfd;
  msg.msg_accrightslen = sizeof(int);
#endif

  iov[0].iov_base = &c;
  iov[0].iov_len = 2;
  msg.msg_iov = iov;
  msg.msg_iovlen = 1;
  msg.msg_name = NULL;
  msg.msg_namelen = 0;

  if ( (n = recvmsg(sock, &msg, 0)) <= 0)
    return((int)n);

#ifdef	HAVE_MSGHDR_MSG_CONTROL
  if ( (cmptr = CMSG_FIRSTHDR(&msg)) != NULL &&
       cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
    if (cmptr->cmsg_level != SOL_SOCKET) {
      fprintf(stderr, "control level != SOL_SOCKET");
      return -1;
    }
    if (cmptr->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "control type != SCM_RIGHTS");
      return -1;
    }
    /* CMSG_DATA() is not guaranteed to be aligned for int: copy bytewise */
    memcpy(&recvfd, CMSG_DATA(cmptr), sizeof recvfd);
  } else
    recvfd = -1;		/* descriptor was not passed */
#else
  if (msg.msg_accrightslen == sizeof(int))
    recvfd = newfd;
  else
    recvfd = -1;		/* descriptor was not passed */
#endif

  return recvfd;
}
's avatar
committed
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522

/*
 * Read exactly "n" bytes from a descriptor.
 *
 * Keeps calling read() until n bytes have arrived; reads interrupted by a
 * signal (EINTR) are retried.
 *
 * Returns n when all bytes were read, 0 as soon as end-of-file is seen
 * (even if some bytes had already been read), and -1 on any other error.
 */
ssize_t readn(int fd, void *vptr, size_t n) {
        char *cursor = vptr;
        size_t remaining = n;

        while (remaining != 0) {
                ssize_t got;

                errno = 0;
                got = read(fd, cursor, remaining);

                if (got == 0)
                        return 0;               /* EOF */
                if (got < 0) {
                        if (errno != EINTR)
                                return -1;
                        continue;               /* interrupted: call read() again */
                }

                remaining -= (size_t)got;
                cursor += got;
        }
        return (ssize_t)(n - remaining);        /* return >= 0 */
}

's avatar
committed
523 524
#ifdef DIMAPISSL
/*
 * Read exactly "n" bytes from an SSL connection.
 *
 * Keeps calling SSL_read() until n bytes have arrived. A negative result
 * with errno == EINTR is retried.
 *
 * Returns n when all bytes were read, 0 as soon as SSL_read() returns 0
 * (even if some bytes had already been read), and -1 on any other error.
 */
ssize_t SSL_readn(SSL *con, void *vptr, size_t n) {
        char *cursor = vptr;
        size_t remaining = n;

        while (remaining != 0) {
                ssize_t got;

                errno = 0;
                got = SSL_read(con, cursor, remaining);

                if (got == 0)
                        return 0;               /* EOF */
                if (got < 0) {
                        if (errno != EINTR)
                                return -1;
                        continue;               /* interrupted: call SSL_read() again */
                }

                remaining -= (size_t)got;
                cursor += got;
        }
        return (ssize_t)(n - remaining);        /* return >= 0 */
}
#endif