mapicommd.c 10.7 KB
Newer Older
1 2 3
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
's avatar
committed
4 5
#include <stdio.h>
#include <stdlib.h>
's avatar
committed
6 7 8
#include <string.h>
#include <unistd.h>
#include <ctype.h>
's avatar
committed
9 10 11 12
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
's avatar
committed
13
#include <arpa/inet.h>
's avatar
committed
14
#include <sys/time.h>
's avatar
committed
15
#include <errno.h>
's avatar
committed
16 17
#include "mapi.h"
#include "mapiipc.h"
's avatar
committed
18
#include "parseconf.h"
19
#include "mapi_errors.h"
's avatar
committed
20
#include "mapilibhandler.h"
's avatar
committed
21

's avatar
committed
22
#define MAXPENDING 500    /* Maximum outstanding connection requests */
's avatar
committed
23

24 25
extern void set_agent();

's avatar
committed
26
//pthread_mutex_t lock;	DELETE
's avatar
committed
27
int service_count;
's avatar
committed
28
int dimapi_port;
's avatar
committed
29 30 31 32 33 34 35 36 37 38 39

void *handle_request(void *);
int die(char *msg);
int getfid(struct dmapiipcbuf *dbuf);

int main(int argc, char **argv) {
	
	int serv_sock = 0;
	int new_sock = 0;      /* client's socket descriptor (from connect()) */
	socklen_t clnt_len;    /* length of client address data structure */
	int yes=1;
's avatar
committed
40
	unsigned char* arg;
's avatar
committed
41 42 43 44 45
	
	struct sockaddr_in serv_addr;
	struct sockaddr_in clnt_addr;
	
	pthread_t chld_thr;
's avatar
committed
46 47
	
	char* mapi_conf;
48 49 50 51 52
	//mapi_conf = malloc(sizeof(CONF_FILE)-2+strlen(getenv("HOME")));
	//sprintf(mapi_conf,CONF_FILE,getenv("HOME"));
	mapi_conf = printf_string( CONFDIR"/"CONF_FILE );
	printf("using %s\n", mapi_conf);
	
's avatar
committed
53 54 55 56 57 58 59 60 61 62 63
	if (pc_load (mapi_conf))
	  {
		dimapi_port = atoi( pc_get_param (pc_get_category(""), "dimapi_port") );
	  }
	else 
	  {
		printf("Error: cannot load mapi.conf file.\n");
		exit(1);
	  }
	free(mapi_conf);
	pc_close();
's avatar
committed
64

's avatar
committed
65
	if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
's avatar
committed
66
		die("Unexpected error on socket()");
's avatar
committed
67 68
		exit(-1);
	}
's avatar
committed
69 70 71 72 73
	
	memset(&serv_addr, 0, sizeof serv_addr);  

	serv_addr.sin_family = AF_INET;   
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
's avatar
committed
74
	serv_addr.sin_port = htons(dimapi_port);
's avatar
committed
75 76 77 78 79 80 81 82 83 84

	/* DANGEROUS, but useful for debugging, so leave it for now */
	if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
		close(serv_sock);
		die("Unexpected error on setsockopt()");
	}

	if (bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof serv_addr) == -1) {
		close(serv_sock);
		die("Unexpected error on bind()");
's avatar
committed
85
		exit(1);
's avatar
committed
86 87 88 89 90 91 92
	}

	/* queue max 5 connections */
	if (listen(serv_sock, MAXPENDING) == -1) {
		shutdown(serv_sock, SHUT_RDWR);
		close(serv_sock);
		die("Unexpected error on listen()");
's avatar
committed
93
		exit(-1);
's avatar
committed
94 95
	}

96
	set_agent();
's avatar
committed
97

's avatar
committed
98 99 100
	while(1) {

		clnt_len = sizeof clnt_addr;
's avatar
committed
101
		if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1) {
's avatar
committed
102
			die("Unexpected error on accept()");
's avatar
committed
103 104
			continue;
		}
's avatar
committed
105 106

		printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
's avatar
committed
107 108 109
		arg=(unsigned char*)malloc((sizeof(int)+sizeof(clnt_addr.sin_addr)));
		memcpy(arg, &new_sock, sizeof(int));
		memcpy(arg+sizeof(int), &clnt_addr.sin_addr, sizeof(clnt_addr.sin_addr));
's avatar
committed
110
		if (pthread_create(&chld_thr, NULL, handle_request, (void *)arg) != 0) {
's avatar
committed
111
            		die("pthread_create() failed");
's avatar
committed
112 113
			continue;
		}
's avatar
committed
114 115 116 117 118 119 120
	}

	return 0; /* never reached */
}

void *handle_request(void *arg) {
	
's avatar
committed
121
	int sock;
's avatar
committed
122
	int recv_bytes;
's avatar
committed
123
	//char buffer[DIMAPI_DATA_SIZE];	DELETE
's avatar
committed
124 125
	struct dmapiipcbuf *dbuf=NULL;
	int mapid_result;
's avatar
committed
126
	mapi_results_t *result;
's avatar
committed
127
	//unsigned int dbuf_bytes=0;		DELETE
's avatar
committed
128 129 130 131 132 133 134
	int i;
	int *active_flows = NULL;
	int ac_fl_size=0;
	mapi_function_info_t funct_info;
	mapi_flow_info_t flow_info;
	struct timeval tv; /*used for timestamping results when produced */
	struct mapipkt *pkt;
's avatar
committed
135 136 137 138 139
	struct in_addr client;
	char* dev_addr;

	memcpy(&sock, (unsigned char*)arg, sizeof(int));
	memcpy(&client, ((unsigned char*)arg)+sizeof(int), sizeof(struct in_addr));
's avatar
committed
140
	free(arg);
's avatar
committed
141 142 143 144 145 146 147 148

	/* Guarantees that thread resources are deallocated upon return */
	pthread_detach(pthread_self()); 
	dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
	printf("<+> new thread %d, socket number = %d\n", (int)pthread_self(), sock);
	
	while(1) {

's avatar
committed
149
		/*if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
's avatar
committed
150

's avatar
committed
151
			recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
's avatar
committed
152

's avatar
committed
153
			if (recv_bytes == 0) { // the peer has gone
's avatar
committed
154 155 156 157 158 159 160 161 162 163 164 165
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}

			dbuf_bytes += recv_bytes;
		}


's avatar
committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
		if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;*/

		recv_bytes=readn(sock, dbuf, BASIC_SIZE);

		if (recv_bytes == 0) { // the peer has gone
			printf("Peer has gone\n");
			break;
		}
		else if (recv_bytes == -1) {
			die("recv()");
			break;
		}

		if (dbuf->length > DIMAPI_DATA_SIZE) {
			fprintf(stderr,"Warning: Ignoring invalid message\n");
			continue;
		}
's avatar
committed
183

's avatar
committed
184 185 186 187 188 189 190 191 192 193 194 195 196 197
		if (dbuf->length-BASIC_SIZE>0) {
			recv_bytes=readn(sock, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
		
			if (recv_bytes == 0) { // the peer has gone
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}
		}
		
		//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length);	DELETE
's avatar
committed
198 199
		switch(dbuf->cmd) {
			case CREATE_FLOW:
's avatar
committed
200
				dev_addr=(char*)malloc( (strlen((char *)dbuf->data)+strlen(inet_ntoa(client)))*sizeof(char) );
's avatar
committed
201
				strcpy(dev_addr, (char *)dbuf->data);	//secure
's avatar
committed
202 203 204
				strcat(dev_addr, "#");
				strcat(dev_addr, inet_ntoa(client));
				mapid_result = mapi_create_flow(dev_addr);
's avatar
committed
205 206 207 208 209 210
				fprintf(stdout,"CREATE_FLOW (%s, %d)\n",dbuf->data, mapid_result);
				if(mapid_result <0)
					dbuf->cmd = ERROR_ACK;
				else					
					dbuf->cmd = CREATE_FLOW_ACK;
				memcpy(dbuf->data, &mapid_result, sizeof(int));
's avatar
committed
211
				active_flows = realloc(active_flows,(ac_fl_size+1)*sizeof(int));
's avatar
committed
212 213
				active_flows[ac_fl_size++] = mapid_result;
				dbuf->length = BASIC_SIZE+sizeof(int);
's avatar
committed
214
				free(dev_addr);
's avatar
committed
215 216 217 218 219 220 221 222
				break;
			case CLOSE_FLOW:
				fprintf(stdout,"CLOSE_FLOW (%d)\n",dbuf->fd);
				mapid_result = mapi_close_flow(dbuf->fd);
				if(!mapid_result){
					for(i=0;i<ac_fl_size;++i){
						if(active_flows[i] == dbuf->fd){
							active_flows[i] = active_flows[--ac_fl_size];
's avatar
committed
223
							active_flows = realloc(active_flows,ac_fl_size*sizeof(int));
's avatar
committed
224 225 226
						}
					}
				}
's avatar
committed
227
				//no need to send responce
's avatar
committed
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
				break;
			case CONNECT:
				fprintf(stdout,"CONNECT (%d)",dbuf->fd);
				mapid_result = mapi_connect(dbuf->fd);
				if(mapid_result >= 0){
					dbuf->cmd = CONNECT_ACK;
					fprintf(stdout," OK\n");
				}
				else{
					dbuf->cmd = ERROR_ACK;
					fprintf(stdout," FAILED\n");
				}
				dbuf->length = BASIC_SIZE;
				break;
			case APPLY_FUNCTION:
's avatar
committed
243
				fprintf(stdout,"APPLY_FUNCTION\n");
's avatar
committed
244 245 246 247 248 249 250 251 252 253 254
				if((( dbuf->fid = getfid(dbuf))!=-1)){
					dbuf->cmd = APPLY_FUNCTION_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}
				dbuf->length = BASIC_SIZE;
				break;
			case READ_RESULT:
				//fprintf(stdout,"READ_RESULT\n");
				dbuf->cmd = READ_RESULT_ACK;
255
				result = mapi_read_results(dbuf->fd,dbuf->fid);
's avatar
committed
256
				if(result!=NULL){
's avatar
committed
257 258 259
					dbuf->timestamp = result->ts;
					memcpy(dbuf->data, result->res, result->size);
					dbuf->length = BASIC_SIZE + result->size;
's avatar
committed
260 261 262 263 264 265 266 267
				}
				else{
					fprintf(stdout,"mapi_read_results failed...\n");
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
				}
				break;
			case GET_NEXT_PKT:
's avatar
committed
268
				//fprintf(stdout,"GET_NEXT_PKT\n");
's avatar
committed
269
				pkt = (struct mapipkt *)mapi_get_next_pkt(dbuf->fd,dbuf->fid);
's avatar
committed
270
				gettimeofday(&tv, NULL);
's avatar
committed
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
				dbuf->timestamp = tv.tv_usec;
				if(pkt!=NULL){
					dbuf->cmd = GET_NEXT_PKT_ACK;
					memcpy(dbuf->data, pkt, sizeof(struct mapipkt)-4+pkt->caplen);
					dbuf->length = BASIC_SIZE + sizeof(struct mapipkt) - 4 + pkt->caplen;
				}
				else{
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
					fprintf(stdout,"mapi_get_next_pkt failed...\n");					
				}
				break;
			case GET_FLOW_INFO:
				fprintf(stdout,"GET_FLOW_INFO\n");
				if(mapi_get_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
			case GET_FUNCTION_INFO:
				fprintf(stdout,"GET_FUNCTION_INFO\n");
				if(mapi_get_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;
			case GET_NEXT_FUNCTION_INFO:
				fprintf(stdout,"GET_NEXT_FUNCTION_INFO\n");
				if(mapi_get_next_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;				break;
			case GET_NEXT_FLOW_INFO:
				fprintf(stdout,"GET_NEXT_FLOW_INFO\n");
				if(mapi_get_next_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
#ifdef WITH_ADMISSION_CONTROL
			case SET_AUTHDATA:
				fprintf(stdout,"SET_AUTHDATA\n");
				if(!agent_send_authdata(dbuf)){
					dbuf->cmd = SET_AUTHDATA_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}

				dbuf->length = BASIC_SIZE;
				break;
#endif
's avatar
committed
344 345
#ifdef WITH_AUTHENTICATION
			case AUTHENTICATE:
346
				fprintf(stdout, "AUTHENTICATE\n");
's avatar
 
committed
347
				if(!agent_authenticate(dbuf))
348 349 350
					dbuf->cmd = AUTHENTICATE_ACK;
				else
					dbuf->cmd = ERROR_ACK;
's avatar
 
committed
351
				dbuf->length = BASIC_SIZE;
's avatar
committed
352 353
				break;
#endif
's avatar
committed
354 355 356 357 358
	              default:
				die("Default case found in handle_request loop!\n");
        	        break;
	        }

's avatar
committed
359 360 361 362
		//no need to send responce on mapi_close_flow
		if (dbuf->cmd!=CLOSE_FLOW) {
			send(sock, dbuf, dbuf->length, 0);
		}
's avatar
committed
363

's avatar
committed
364 365
		//dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length;			DELETE
		//memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes);	DELETE
's avatar
committed
366 367 368
	}
	
	for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
's avatar
committed
369
		if(active_flows[i]>0){//this should always be positive or realloc does not work
's avatar
committed
370 371 372 373
			mapi_close_flow(active_flows[i]);
		}
	}
	free(active_flows);
374
	free(dbuf);
's avatar
committed
375 376 377 378
	shutdown(sock, SHUT_RDWR);
	close(sock);

	/* update the global service counter */
's avatar
committed
379
	//pthread_mutex_lock(&lock);	DELETE
's avatar
committed
380
	service_count++;
's avatar
committed
381
	//pthread_mutex_unlock(&lock);	DELETE
's avatar
committed
382 383 384 385 386 387 388 389 390 391 392 393 394 395
	printf("<+> thread %d exiting\n<+> total sockets served: %d\n", (int)pthread_self(), service_count);

	pthread_exit((void *)0);
}


int die(char *msg) {
	perror(msg);
	return EXIT_FAILURE;
}

//calls the appropriate mapi_apply_function and returns the fid from mapid
int getfid(struct dmapiipcbuf *dbuf){
	int result;
's avatar
committed
396
	
397
	char *function = (char *)dbuf->data;
's avatar
committed
398
	char *data = (char *)(dbuf->data+strlen(function)+1);
's avatar
committed
399

's avatar
committed
400 401
	result = mapi_apply_function(dbuf->fd, function, data);
	return (result);
's avatar
committed
402 403
}

's avatar
committed
404 405 406 407