/* mapicommd.c */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <ctype.h>
#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <unistd.h>

#include "mapi.h"
#include "mapiipc.h"
#include "parseconf.h"
#include "mapi_errors.h"
#include "mapilibhandler.h"
#include "printfstring.h"

#define MAXPENDING 500    /* Maximum outstanding connection requests */

/* Defined in another translation unit; main() calls it once before accepting clients. */
extern void set_agent();

/* Number of client connections that have been fully served; bumped by each
 * handler thread just before it exits. */
int service_count;
/* TCP port the daemon listens on, taken from the "dimapi_port" config entry. */
int dimapi_port;

/* Thread start routine serving one client connection; the argument carries the
 * connected socket descriptor. */
void *handle_request(void *);
/* Reports msg together with the current errno text and returns EXIT_FAILURE;
 * despite its name it does NOT terminate the process. */
int die(char *msg);
/* Applies the function named in an APPLY_FUNCTION request and returns the
 * value of mapi_apply_function(). */
int getfid(struct dmapiipcbuf *dbuf);

int main() {
's avatar
committed
36 37 38 39 40 41 42 43 44 45
	
	int serv_sock = 0;
	int new_sock = 0;      /* client's socket descriptor (from connect()) */
	socklen_t clnt_len;    /* length of client address data structure */
	int yes=1;
	
	struct sockaddr_in serv_addr;
	struct sockaddr_in clnt_addr;
	
	pthread_t chld_thr;
's avatar
committed
46 47
	
	char* mapi_conf;
48 49 50 51 52
	//mapi_conf = malloc(sizeof(CONF_FILE)-2+strlen(getenv("HOME")));
	//sprintf(mapi_conf,CONF_FILE,getenv("HOME"));
	mapi_conf = printf_string( CONFDIR"/"CONF_FILE );
	printf("using %s\n", mapi_conf);
	
's avatar
committed
53 54 55 56 57 58 59 60 61 62 63
	if (pc_load (mapi_conf))
	  {
		dimapi_port = atoi( pc_get_param (pc_get_category(""), "dimapi_port") );
	  }
	else 
	  {
		printf("Error: cannot load mapi.conf file.\n");
		exit(1);
	  }
	free(mapi_conf);
	pc_close();
's avatar
committed
64

's avatar
committed
65
	if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
's avatar
committed
66
		die("Unexpected error on socket()");
's avatar
committed
67 68
		exit(-1);
	}
's avatar
committed
69 70 71 72 73
	
	memset(&serv_addr, 0, sizeof serv_addr);  

	serv_addr.sin_family = AF_INET;   
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
's avatar
committed
74
	serv_addr.sin_port = htons(dimapi_port);
's avatar
committed
75 76 77 78 79 80 81 82 83 84

	/* DANGEROUS, but useful for debugging, so leave it for now */
	if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
		close(serv_sock);
		die("Unexpected error on setsockopt()");
	}

	if (bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof serv_addr) == -1) {
		close(serv_sock);
		die("Unexpected error on bind()");
's avatar
committed
85
		exit(1);
's avatar
committed
86 87 88 89 90 91 92
	}

	/* queue max 5 connections */
	if (listen(serv_sock, MAXPENDING) == -1) {
		shutdown(serv_sock, SHUT_RDWR);
		close(serv_sock);
		die("Unexpected error on listen()");
's avatar
committed
93
		exit(-1);
's avatar
committed
94 95
	}

96
	set_agent();
's avatar
committed
97

's avatar
committed
98 99 100
	while(1) {

		clnt_len = sizeof clnt_addr;
's avatar
committed
101
		if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1) {
's avatar
committed
102
			die("Unexpected error on accept()");
's avatar
committed
103 104
			continue;
		}
's avatar
committed
105 106

		printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
's avatar
committed
107
		if (pthread_create(&chld_thr, NULL, handle_request, (void *)new_sock) != 0) {
's avatar
committed
108
            		die("pthread_create() failed");
's avatar
committed
109 110
			continue;
		}
's avatar
committed
111 112 113 114 115 116 117
	}

	return 0; /* never reached */
}

void *handle_request(void *arg) {
	
's avatar
committed
118
	int sock=(int)arg;
's avatar
committed
119
	int recv_bytes;
's avatar
committed
120
	//char buffer[DIMAPI_DATA_SIZE];	DELETE
's avatar
committed
121 122
	struct dmapiipcbuf *dbuf=NULL;
	int mapid_result;
's avatar
committed
123
	mapi_results_t *result;
's avatar
committed
124
	//unsigned int dbuf_bytes=0;		DELETE
's avatar
committed
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
	int i;
	int *active_flows = NULL;
	int ac_fl_size=0;
	mapi_function_info_t funct_info;
	mapi_flow_info_t flow_info;
	struct timeval tv; /*used for timestamping results when produced */
	struct mapipkt *pkt;

	/* Guarantees that thread resources are deallocated upon return */
	pthread_detach(pthread_self()); 
	dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
	printf("<+> new thread %d, socket number = %d\n", (int)pthread_self(), sock);
	
	while(1) {

's avatar
committed
140
		/*if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
's avatar
committed
141

's avatar
committed
142
			recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
's avatar
committed
143

's avatar
committed
144
			if (recv_bytes == 0) { // the peer has gone
's avatar
committed
145 146 147 148 149 150 151 152 153 154 155 156
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}

			dbuf_bytes += recv_bytes;
		}


's avatar
committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
		if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;*/

		recv_bytes=readn(sock, dbuf, BASIC_SIZE);

		if (recv_bytes == 0) { // the peer has gone
			printf("Peer has gone\n");
			break;
		}
		else if (recv_bytes == -1) {
			die("recv()");
			break;
		}

		if (dbuf->length > DIMAPI_DATA_SIZE) {
			fprintf(stderr,"Warning: Ignoring invalid message\n");
			continue;
		}
's avatar
committed
174

's avatar
committed
175 176 177 178 179 180 181 182 183 184 185 186 187 188
		if (dbuf->length-BASIC_SIZE>0) {
			recv_bytes=readn(sock, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
		
			if (recv_bytes == 0) { // the peer has gone
				printf("Peer has gone\n");
				break;
			}
			else if (recv_bytes == -1) {
				die("recv()");
				break;
			}
		}
		
		//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length);	DELETE
's avatar
committed
189 190
		switch(dbuf->cmd) {
			case CREATE_FLOW:
's avatar
committed
191
				mapid_result = mapi_create_flow(dbuf->data);
's avatar
committed
192 193 194 195 196 197
				fprintf(stdout,"CREATE_FLOW (%s, %d)\n",dbuf->data, mapid_result);
				if(mapid_result <0)
					dbuf->cmd = ERROR_ACK;
				else					
					dbuf->cmd = CREATE_FLOW_ACK;
				memcpy(dbuf->data, &mapid_result, sizeof(int));
's avatar
committed
198
				active_flows = realloc(active_flows,(ac_fl_size+1)*sizeof(int));
's avatar
committed
199 200 201 202 203 204 205 206 207 208
				active_flows[ac_fl_size++] = mapid_result;
				dbuf->length = BASIC_SIZE+sizeof(int);
				break;
			case CLOSE_FLOW:
				fprintf(stdout,"CLOSE_FLOW (%d)\n",dbuf->fd);
				mapid_result = mapi_close_flow(dbuf->fd);
				if(!mapid_result){
					for(i=0;i<ac_fl_size;++i){
						if(active_flows[i] == dbuf->fd){
							active_flows[i] = active_flows[--ac_fl_size];
's avatar
committed
209
							active_flows = realloc(active_flows,ac_fl_size*sizeof(int));
's avatar
committed
210 211 212
						}
					}
				}
's avatar
committed
213
				//no need to send responce
's avatar
committed
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
				break;
			case CONNECT:
				fprintf(stdout,"CONNECT (%d)",dbuf->fd);
				mapid_result = mapi_connect(dbuf->fd);
				if(mapid_result >= 0){
					dbuf->cmd = CONNECT_ACK;
					fprintf(stdout," OK\n");
				}
				else{
					dbuf->cmd = ERROR_ACK;
					fprintf(stdout," FAILED\n");
				}
				dbuf->length = BASIC_SIZE;
				break;
			case APPLY_FUNCTION:
's avatar
committed
229
				fprintf(stdout,"APPLY_FUNCTION\n");
's avatar
committed
230 231 232 233 234 235 236 237 238 239 240
				if((( dbuf->fid = getfid(dbuf))!=-1)){
					dbuf->cmd = APPLY_FUNCTION_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}
				dbuf->length = BASIC_SIZE;
				break;
			case READ_RESULT:
				//fprintf(stdout,"READ_RESULT\n");
				dbuf->cmd = READ_RESULT_ACK;
241
				result = mapi_read_results(dbuf->fd,dbuf->fid);
's avatar
committed
242
				if(result!=NULL){
's avatar
committed
243 244 245
					dbuf->timestamp = result->ts;
					memcpy(dbuf->data, result->res, result->size);
					dbuf->length = BASIC_SIZE + result->size;
's avatar
committed
246 247 248 249 250 251 252 253
				}
				else{
					fprintf(stdout,"mapi_read_results failed...\n");
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
				}
				break;
			case GET_NEXT_PKT:
's avatar
committed
254
				//fprintf(stdout,"GET_NEXT_PKT\n");
's avatar
committed
255
				pkt = (struct mapipkt *)mapi_get_next_pkt(dbuf->fd,dbuf->fid);
's avatar
committed
256
				gettimeofday(&tv, NULL);
's avatar
committed
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
				dbuf->timestamp = tv.tv_usec;
				if(pkt!=NULL){
					dbuf->cmd = GET_NEXT_PKT_ACK;
					memcpy(dbuf->data, pkt, sizeof(struct mapipkt)-4+pkt->caplen);
					dbuf->length = BASIC_SIZE + sizeof(struct mapipkt) - 4 + pkt->caplen;
				}
				else{
					dbuf->cmd = ERROR_ACK;
					dbuf->length = BASIC_SIZE;
					fprintf(stdout,"mapi_get_next_pkt failed...\n");					
				}
				break;
			case GET_FLOW_INFO:
				fprintf(stdout,"GET_FLOW_INFO\n");
				if(mapi_get_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
			case GET_FUNCTION_INFO:
				fprintf(stdout,"GET_FUNCTION_INFO\n");
				if(mapi_get_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;
			case GET_NEXT_FUNCTION_INFO:
				fprintf(stdout,"GET_NEXT_FUNCTION_INFO\n");
				if(mapi_get_next_function_info(dbuf->fd, dbuf->fid, &funct_info)){
					dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FUNCTION_INFO_ACK;
					memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
				}
				break;				break;
			case GET_NEXT_FLOW_INFO:
				fprintf(stdout,"GET_NEXT_FLOW_INFO\n");
				if(mapi_get_next_flow_info(dbuf->fd, &flow_info)){
					dbuf->cmd = MAPI_FLOW_INFO_ERR;
					dbuf->length = BASIC_SIZE;
				}
				else{
					dbuf->cmd = GET_FLOW_INFO_ACK;
					memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
					dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
				}
				break;
#ifdef WITH_ADMISSION_CONTROL
			case SET_AUTHDATA:
				fprintf(stdout,"SET_AUTHDATA\n");
				if(!agent_send_authdata(dbuf)){
					dbuf->cmd = SET_AUTHDATA_ACK;
				}
				else{
					dbuf->cmd = ERROR_ACK;
				}

				dbuf->length = BASIC_SIZE;
				break;
#endif
's avatar
committed
330 331
#ifdef WITH_AUTHENTICATION
			case AUTHENTICATE:
332
				fprintf(stdout, "AUTHENTICATE\n");
's avatar
 
committed
333
				if(!agent_authenticate(dbuf))
334 335 336
					dbuf->cmd = AUTHENTICATE_ACK;
				else
					dbuf->cmd = ERROR_ACK;
's avatar
 
committed
337
				dbuf->length = BASIC_SIZE;
's avatar
committed
338 339
				break;
#endif
's avatar
committed
340 341 342 343 344
	              default:
				die("Default case found in handle_request loop!\n");
        	        break;
	        }

's avatar
committed
345 346 347 348
		//no need to send responce on mapi_close_flow
		if (dbuf->cmd!=CLOSE_FLOW) {
			send(sock, dbuf, dbuf->length, 0);
		}
's avatar
committed
349

's avatar
committed
350 351
		//dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length;			DELETE
		//memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes);	DELETE
's avatar
committed
352 353 354
	}
	
	for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
's avatar
committed
355
		if(active_flows[i]>0){//this should always be positive or realloc does not work
's avatar
committed
356 357 358 359
			mapi_close_flow(active_flows[i]);
		}
	}
	free(active_flows);
360
	free(dbuf);
's avatar
committed
361 362 363 364
	shutdown(sock, SHUT_RDWR);
	close(sock);

	/* update the global service counter */
's avatar
committed
365
	//pthread_mutex_lock(&lock);	DELETE
's avatar
committed
366
	service_count++;
's avatar
committed
367
	//pthread_mutex_unlock(&lock);	DELETE
's avatar
committed
368 369 370 371 372 373 374 375 376 377 378 379 380 381
	printf("<+> thread %d exiting\n<+> total sockets served: %d\n", (int)pthread_self(), service_count);

	pthread_exit((void *)0);
}


/*
 * Prints msg followed by the textual description of the current errno on
 * stderr (via perror) and hands back EXIT_FAILURE.
 * Note: despite the name, the process is NOT terminated; callers decide
 * whether to exit or carry on.
 */
int die(char *msg) {
	const int failure_status = EXIT_FAILURE;

	perror(msg);

	return failure_status;
}

//calls the appropriate mapi_apply_function and returns the fid from mapid
int getfid(struct dmapiipcbuf *dbuf){
	int result;
's avatar
committed
382
	
383
	char *function = (char *)dbuf->data;
's avatar
committed
384
	char *data = (char *)(dbuf->data+strlen(function)+1);
's avatar
committed
385

's avatar
committed
386 387
	result = mapi_apply_function(dbuf->fd, function, data);
	return (result);
's avatar
committed
388 389
}

's avatar
committed
390 391 392 393