mapicommd.c 10.7 KB
Newer Older
1 2 3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
's avatar
committed
4 5
#include <stdio.h>
#include <stdlib.h>
's avatar
committed
6 7 8
#include <string.h>
#include <unistd.h>
#include <ctype.h>
's avatar
committed
9 10 11 12
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
's avatar
committed
13
#include <arpa/inet.h>
's avatar
committed
14
#include <sys/time.h>
's avatar
committed
15
#include <errno.h>
's avatar
committed
16 17
#include "mapi.h"
#include "mapiipc.h"
's avatar
committed
18
#include "parseconf.h"
19
#include "mapi_errors.h"
's avatar
committed
20
#include "mapilibhandler.h"
's avatar
committed
21
#include "printfstring.h"
's avatar
committed
22

's avatar
committed
23
#define MAXPENDING 500    /* Maximum outstanding connection requests */
's avatar
committed
24

25 26
extern void set_agent();

's avatar
committed
27
//pthread_mutex_t lock;	DELETE
's avatar
committed
28
int service_count;
's avatar
committed
29
int dimapi_port;
's avatar
committed
30 31 32 33 34

void *handle_request(void *);
int die(char *msg);
int getfid(struct dmapiipcbuf *dbuf);

's avatar
committed
35
int main() {
's avatar
committed
36 37 38 39 40
	
	int serv_sock = 0;
	int new_sock = 0;      /* client's socket descriptor (from connect()) */
	socklen_t clnt_len;    /* length of client address data structure */
	int yes=1;
's avatar
committed
41
	unsigned char* arg;
's avatar
committed
42 43 44 45 46
	
	struct sockaddr_in serv_addr;
	struct sockaddr_in clnt_addr;
	
	pthread_t chld_thr;
's avatar
committed
47 48
	
	char* mapi_conf;
49 50 51 52 53
	//mapi_conf = malloc(sizeof(CONF_FILE)-2+strlen(getenv("HOME")));
	//sprintf(mapi_conf,CONF_FILE,getenv("HOME"));
	mapi_conf = printf_string( CONFDIR"/"CONF_FILE );
	printf("using %s\n", mapi_conf);
	
's avatar
committed
54 55 56 57 58 59 60 61 62 63 64
	if (pc_load (mapi_conf))
	  {
		dimapi_port = atoi( pc_get_param (pc_get_category(""), "dimapi_port") );
	  }
	else 
	  {
		printf("Error: cannot load mapi.conf file.\n");
		exit(1);
	  }
	free(mapi_conf);
	pc_close();
's avatar
committed
65

's avatar
committed
66
	if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
's avatar
committed
67
		die("Unexpected error on socket()");
's avatar
committed
68 69
		exit(-1);
	}
's avatar
committed
70 71 72 73 74
	
	memset(&serv_addr, 0, sizeof serv_addr);  

	serv_addr.sin_family = AF_INET;   
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
's avatar
committed
75
	serv_addr.sin_port = htons(dimapi_port);
's avatar
committed
76 77 78 79 80 81 82 83 84 85

	/* DANGEROUS, but useful for debugging, so leave it for now */
	if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
		close(serv_sock);
		die("Unexpected error on setsockopt()");
	}

	if (bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof serv_addr) == -1) {
		close(serv_sock);
		die("Unexpected error on bind()");
's avatar
committed
86
		exit(1);
's avatar
committed
87 88 89 90 91 92 93
	}

	/* queue max 5 connections */
	if (listen(serv_sock, MAXPENDING) == -1) {
		shutdown(serv_sock, SHUT_RDWR);
		close(serv_sock);
		die("Unexpected error on listen()");
's avatar
committed
94
		exit(-1);
's avatar
committed
95 96
	}

97
	set_agent();
's avatar
committed
98

's avatar
committed
99 100 101
	while(1) {

		clnt_len = sizeof clnt_addr;
's avatar
committed
102
		if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1) {
's avatar
committed
103
			die("Unexpected error on accept()");
's avatar
committed
104 105
			continue;
		}
's avatar
committed
106 107

		printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
's avatar
committed
108 109 110
		arg=(unsigned char*)malloc((sizeof(int)+sizeof(clnt_addr.sin_addr)));
		memcpy(arg, &new_sock, sizeof(int));
		memcpy(arg+sizeof(int), &clnt_addr.sin_addr, sizeof(clnt_addr.sin_addr));
's avatar
committed
111
		if (pthread_create(&chld_thr, NULL, handle_request, (void *)arg) != 0) {
's avatar
committed
112
            		die("pthread_create() failed");
's avatar
committed
113 114
			continue;
		}
's avatar
committed
115 116 117 118 119 120 121
	}

	return 0; /* never reached */
}

void *handle_request(void *arg) {
	
's avatar
committed
122
	int sock;
's avatar
committed
123
	int recv_bytes;
's avatar
committed
124
	//char buffer[DIMAPI_DATA_SIZE];	DELETE
's avatar
committed
125 126
	struct dmapiipcbuf *dbuf=NULL;
	int mapid_result;
's avatar
committed
127
	mapi_results_t *result;
's avatar
committed
128
	//unsigned int dbuf_bytes=0;		DELETE
's avatar
committed
129 130 131 132 133 134 135
	int i;
	int *active_flows = NULL;
	int ac_fl_size=0;
	mapi_function_info_t funct_info;
	mapi_flow_info_t flow_info;
	struct timeval tv; /*used for timestamping results when produced */
	struct mapipkt *pkt;
's avatar
committed
136 137 138 139 140
	struct in_addr client;
	char* dev_addr;

	memcpy(&sock, (unsigned char*)arg, sizeof(int));
	memcpy(&client, ((unsigned char*)arg)+sizeof(int), sizeof(struct in_addr));
's avatar
committed
141
	free(arg);
's avatar
committed
142 143 144 145 146 147 148 149

	/* Guarantees that thread resources are deallocated upon return */
	pthread_detach(pthread_self()); 
	dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
	printf("<+> new thread %d, socket number = %d\n", (int)pthread_self(), sock);
	
	while(1) {

's avatar
committed
150
		/*if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
's avatar
committed
151

's avatar
committed
152
			recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
's avatar
committed
153

's avatar
committed
154
			if (recv_bytes == 0) { // the peer has gone
's avatar
committed
155 156 157 158 159 160 161 162 163 164 165 166
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}

			dbuf_bytes += recv_bytes;
		}


's avatar
committed
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
		if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;*/

		recv_bytes=readn(sock, dbuf, BASIC_SIZE);

		if (recv_bytes == 0) { // the peer has gone
			printf("Peer has gone\n");
			break;
		}
		else if (recv_bytes == -1) {
			die("recv()");
			break;
		}

		if (dbuf->length > DIMAPI_DATA_SIZE) {
			fprintf(stderr,"Warning: Ignoring invalid message\n");
			continue;
		}
's avatar
committed
184

's avatar
committed
185 186 187 188 189 190 191 192 193 194 195 196 197 198
		if (dbuf->length-BASIC_SIZE>0) {
			recv_bytes=readn(sock, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
		
			if (recv_bytes == 0) { // the peer has gone
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}
		}
		
		//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length);	DELETE
's avatar
committed
199 200
		switch(dbuf->cmd) {
			case CREATE_FLOW:
's avatar
committed
201
				dev_addr=(char*)malloc( (strlen((char *)dbuf->data)+strlen(inet_ntoa(client)))*sizeof(char) );
's avatar
committed
202
				strcpy(dev_addr, (char *)dbuf->data);	//secure
's avatar
committed
203 204 205
				strcat(dev_addr, "#");
				strcat(dev_addr, inet_ntoa(client));
				mapid_result = mapi_create_flow(dev_addr);
's avatar
committed
206 207 208 209 210 211
				fprintf(stdout,"CREATE_FLOW (%s, %d)\n",dbuf->data, mapid_result);
				if(mapid_result <0)
					dbuf->cmd = ERROR_ACK;
				else					
					dbuf->cmd = CREATE_FLOW_ACK;
				memcpy(dbuf->data, &mapid_result, sizeof(int));
's avatar
committed
212
				active_flows = realloc(active_flows,(ac_fl_size+1)*sizeof(int));
's avatar
committed
213 214
				active_flows[ac_fl_size++] = mapid_result;
				dbuf->length = BASIC_SIZE+sizeof(int);
's avatar
committed
215
				free(dev_addr);
's avatar
committed
216 217 218 219 220 221 222 223
				break;
			case CLOSE_FLOW:
				fprintf(stdout,"CLOSE_FLOW (%d)\n",dbuf->fd);
				mapid_result = mapi_close_flow(dbuf->fd);
				if(!mapid_result){
					for(i=0;i<ac_fl_size;++i){
						if(active_flows[i] == dbuf->fd){
							active_flows[i] = active_flows[--ac_fl_size];
's avatar
committed
224
							active_flows = realloc(active_flows,ac_fl_size*sizeof(int));
's avatar
committed
225 226 227
						}
					}
				}
's avatar
committed
228
				//no need to send responce
's avatar
committed
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
				break;
			case CONNECT:
				fprintf(stdout,"CONNECT (%d)",dbuf->fd);
				mapid_result = mapi_connect(dbuf->fd);
				if(mapid_result >= 0){
					dbuf->cmd = CONNECT_ACK;
					fprintf(stdout," OK\n");
				}
				else{
					dbuf->cmd = ERROR_ACK;
					fprintf(stdout," FAILED\n");
				}
				dbuf->length = BASIC_SIZE;
				break;
			case APPLY_FUNCTION:
's avatar
committed
244
				fprintf(stdout,"APPLY_FUNCTION\n");
's avatar
committed
245 246 247 248 249 250 251 252 253 254 255
				if((( dbuf->fid = getfid(dbuf))!=-1)){
					dbuf->cmd = APPLY_FUNCTION_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}
				dbuf->length = BASIC_SIZE;
				break;
			case READ_RESULT:
				//fprintf(stdout,"READ_RESULT\n");
				dbuf->cmd = READ_RESULT_ACK;
256
				result = mapi_read_results(dbuf->fd,dbuf->fid);
's avatar
committed
257
				if(result!=NULL){
's avatar
committed
258 259 260
					dbuf->timestamp = result->ts;
					memcpy(dbuf->data, result->res, result->size);
					dbuf->length = BASIC_SIZE + result->size;
's avatar
committed
261 262 263 264 265 266 267 268
				}
				else{
					fprintf(stdout,"mapi_read_results failed...\n");
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
				}
				break;
			case GET_NEXT_PKT:
's avatar
committed
269
				//fprintf(stdout,"GET_NEXT_PKT\n");
's avatar
committed
270
				pkt = (struct mapipkt *)mapi_get_next_pkt(dbuf->fd,dbuf->fid);
's avatar
committed
271
				gettimeofday(&tv, NULL);
's avatar
committed
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
				dbuf->timestamp = tv.tv_usec;
				if(pkt!=NULL){
					dbuf->cmd = GET_NEXT_PKT_ACK;
					memcpy(dbuf->data, pkt, sizeof(struct mapipkt)-4+pkt->caplen);
					dbuf->length = BASIC_SIZE + sizeof(struct mapipkt) - 4 + pkt->caplen;
				}
				else{
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
					fprintf(stdout,"mapi_get_next_pkt failed...\n");					
				}
				break;
			case GET_FLOW_INFO:
				fprintf(stdout,"GET_FLOW_INFO\n");
				if(mapi_get_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
			case GET_FUNCTION_INFO:
				fprintf(stdout,"GET_FUNCTION_INFO\n");
				if(mapi_get_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;
			case GET_NEXT_FUNCTION_INFO:
				fprintf(stdout,"GET_NEXT_FUNCTION_INFO\n");
				if(mapi_get_next_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;				break;
			case GET_NEXT_FLOW_INFO:
				fprintf(stdout,"GET_NEXT_FLOW_INFO\n");
				if(mapi_get_next_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
#ifdef WITH_ADMISSION_CONTROL
			case SET_AUTHDATA:
				fprintf(stdout,"SET_AUTHDATA\n");
				if(!agent_send_authdata(dbuf)){
					dbuf->cmd = SET_AUTHDATA_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}

				dbuf->length = BASIC_SIZE;
				break;
#endif
's avatar
committed
345 346
#ifdef WITH_AUTHENTICATION
			case AUTHENTICATE:
347
				fprintf(stdout, "AUTHENTICATE\n");
's avatar
 
committed
348
				if(!agent_authenticate(dbuf))
349 350 351
					dbuf->cmd = AUTHENTICATE_ACK;
				else
					dbuf->cmd = ERROR_ACK;
's avatar
 
committed
352
				dbuf->length = BASIC_SIZE;
's avatar
committed
353 354
				break;
#endif
's avatar
committed
355 356 357 358 359
	              default:
				die("Default case found in handle_request loop!\n");
        	        break;
	        }

's avatar
committed
360 361 362 363
		//no need to send responce on mapi_close_flow
		if (dbuf->cmd!=CLOSE_FLOW) {
			send(sock, dbuf, dbuf->length, 0);
		}
's avatar
committed
364

's avatar
committed
365 366
		//dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length;			DELETE
		//memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes);	DELETE
's avatar
committed
367 368 369
	}
	
	for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
's avatar
committed
370
		if(active_flows[i]>0){//this should always be positive or realloc does not work
's avatar
committed
371 372 373 374
			mapi_close_flow(active_flows[i]);
		}
	}
	free(active_flows);
375
	free(dbuf);
's avatar
committed
376 377 378 379
	shutdown(sock, SHUT_RDWR);
	close(sock);

	/* update the global service counter */
's avatar
committed
380
	//pthread_mutex_lock(&lock);	DELETE
's avatar
committed
381
	service_count++;
's avatar
committed
382
	//pthread_mutex_unlock(&lock);	DELETE
's avatar
committed
383 384 385 386 387 388 389 390 391 392 393 394 395 396
	printf("<+> thread %d exiting\n<+> total sockets served: %d\n", (int)pthread_self(), service_count);

	pthread_exit((void *)0);
}


int die(char *msg) {
	perror(msg);
	return EXIT_FAILURE;
}

//calls the appropriate mapi_apply_function and returns the fid from mapid
int getfid(struct dmapiipcbuf *dbuf){
	int result;
's avatar
committed
397
	
398
	char *function = (char *)dbuf->data;
's avatar
committed
399
	char *data = (char *)(dbuf->data+strlen(function)+1);
's avatar
committed
400

's avatar
committed
401 402
	result = mapi_apply_function(dbuf->fd, function, data);
	return (result);
's avatar
committed
403 404
}

's avatar
committed
405 406 407 408