Commit ee104eec authored by 's avatar

Add 'agent' component for DiMAPI

git-svn-id: file:///home/svn/mapi/trunk@126 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 4c93678e
C_WARNINGS = -Wall -Wsign-compare -Wpointer-arith -Wnested-externs \
-Wmissing-declarations -Wcast-align -Wchar-subscripts
#C_FEATURES = -D_GNU_SOURCE -D_THREAD_SAFE
CFLAGS = -O2 $(C_WARNINGS) $(C_FEATURES) -g -DDIMAPI
INCLUDE = -I..
LDLIBS = -lpthread
LDFLAGS =
CC = gcc
AR = ar
RM = rm -f
BINS = agent
.PHONY: all clean
all: $(BINS)
%.o : %.c
$(CC) $(CFLAGS) $(INCLUDE) -c $<
agent: agent.o
$(CC) $(CFLAGS) ../mapi.so $< -o $@ $(LDLIBS)
clean:
$(RM) *.o $(BINS)
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* memset */
#include <unistd.h> /* close, read, write */
#include <ctype.h> /* toupper */
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h> /* for sockaddr_in and inet_ntoa() */
#include <sys/time.h>
#include "mapi.h"
#include "mapiipc.h"
#define MAXPENDING 5 /* Maximum outstanding connection requests */
#define PORT 2233
//TEMPORARY
typedef struct fhlp_sem {
int id;
key_t key;
char fname[MAPI_STR_LENGTH];
} fhlp_sem_t;
#include "../stdlib/hashsamp.h"
#include "../stdlib/bucket.h"
void print_table(int a[], int size);
void print_table(int a[], int size){
printf("Active flows: ");
while(size){
printf("%d ", a[--size]);
}
printf("\n");
return;
}
//END OF TEMPORARY
pthread_mutex_t lock;
int service_count;
void *handle_request(void *);
int die(char *msg);
int getfid(struct dmapiipcbuf *dbuf);
int main(int argc, char **argv) {
int serv_sock = 0;
int new_sock = 0; /* client's socket descriptor (from connect()) */
socklen_t clnt_len; /* length of client address data structure */
int yes=1;
struct sockaddr_in serv_addr;
struct sockaddr_in clnt_addr;
pthread_t chld_thr;
if ((serv_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
die("Unexpected error on socket()");
memset(&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(PORT);
/* DANGEROUS, but useful for debugging, so leave it for now */
if (setsockopt(serv_sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
close(serv_sock);
die("Unexpected error on setsockopt()");
}
if (bind(serv_sock, (struct sockaddr *)&serv_addr, sizeof serv_addr) == -1) {
close(serv_sock);
die("Unexpected error on bind()");
}
/* queue max 5 connections */
if (listen(serv_sock, MAXPENDING) == -1) {
shutdown(serv_sock, SHUT_RDWR);
close(serv_sock);
die("Unexpected error on listen()");
}
while(1) {
clnt_len = sizeof clnt_addr;
if ((new_sock = accept(serv_sock, (struct sockaddr *)&clnt_addr, &clnt_len)) == -1)
die("Unexpected error on accept()");
//continue;
printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
if (pthread_create(&chld_thr, NULL, handle_request, (void *)new_sock) != 0)
die("pthread_create() failed");
}
return 0; /* never reached */
}
void *handle_request(void *arg) {
int sock = (int) arg;
int recv_bytes;
char buffer[MAX_SEND_SIZE];
struct dmapiipcbuf *dbuf=NULL;
int mapid_result;
void *result;
unsigned int dbuf_bytes=0;
int i;
int *active_flows = NULL;
int ac_fl_size=0;
mapi_function_info_t funct_info;
mapi_flow_info_t flow_info;
struct timeval tv; /*used for timestamping results when produced */
struct timezone tz;
struct mapipkt *pkt;
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
printf("<+> new thread %d, socket number = %d\n", (int)pthread_self(), sock);
while(1) {
if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, MAX_SEND_SIZE, 0);
if (recv_bytes == 0) { /* the peer has gone */
printf("Peer has gone\n");
break;
}
else if (recv_bytes == -1) {
die("recv()");
break;
}
dbuf_bytes += recv_bytes;
}
if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;
memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length);
switch(dbuf->cmd) {
case CREATE_FLOW:
mapid_result = mapi_create_flow(dbuf->data);
fprintf(stdout,"CREATE_FLOW (%s, %d)\n",dbuf->data, mapid_result);
if(mapid_result <0)
dbuf->cmd = ERROR_ACK;
else
dbuf->cmd = CREATE_FLOW_ACK;
memcpy(dbuf->data, &mapid_result, sizeof(int));
active_flows = realloc(active_flows,ac_fl_size+1);
active_flows[ac_fl_size++] = mapid_result;
print_table(active_flows,ac_fl_size);
dbuf->length = BASIC_SIZE+sizeof(int);
break;
case CLOSE_FLOW:
fprintf(stdout,"CLOSE_FLOW (%d)\n",dbuf->fd);
mapid_result = mapi_close_flow(dbuf->fd);
if(!mapid_result){
for(i=0;i<ac_fl_size;++i){
if(active_flows[i] == dbuf->fd){
active_flows[i] = active_flows[--ac_fl_size];
active_flows = realloc(active_flows,ac_fl_size);
}
}
dbuf->cmd = CLOSE_FLOW_ACK;
}
else{
dbuf->cmd = ERROR_ACK;
}
dbuf->length = BASIC_SIZE;
break;
case CONNECT:
fprintf(stdout,"CONNECT (%d)",dbuf->fd);
mapid_result = mapi_connect(dbuf->fd);
if(mapid_result >= 0){
dbuf->cmd = CONNECT_ACK;
fprintf(stdout," OK\n");
}
else{
dbuf->cmd = ERROR_ACK;
fprintf(stdout," FAILED\n");
}
dbuf->length = BASIC_SIZE;
break;
case APPLY_FUNCTION:
fprintf(stdout,"APPLY_FUNCTION: ");
if((( dbuf->fid = getfid(dbuf))!=-1)){
dbuf->cmd = APPLY_FUNCTION_ACK;
}
else{
dbuf->cmd = ERROR_ACK;
}
dbuf->length = BASIC_SIZE;
break;
case READ_RESULT:
//fprintf(stdout,"READ_RESULT\n");
dbuf->cmd = READ_RESULT_ACK;
result = mapi_read_results(dbuf->fd,dbuf->fid,MAPI_REF);
gettimeofday(&tv, &tz);
dbuf->timestamp = tv.tv_sec*1000000 + tv.tv_usec;
if(result!=NULL){
mapi_get_function_info(dbuf->fd,dbuf->fid,&funct_info);
if(!strcmp(funct_info.name,"PKT_COUNTER") || !strcmp(funct_info.name,"BYTE_COUNTER")){
memcpy(dbuf->data, result, sizeof(unsigned long long));
dbuf->length = BASIC_SIZE + sizeof(unsigned long long);
}
else if(!strcmp(funct_info.name,"HASH")){
memcpy(dbuf->data, result, sizeof(unsigned long long));
dbuf->length = BASIC_SIZE + sizeof(unsigned long long);
}
else if(!strcmp(funct_info.name,"HASHSAMP")){
memcpy(dbuf->data, result, sizeof(struct sample));
dbuf->length = BASIC_SIZE + sizeof(struct sample);
}
else if(!strcmp(funct_info.name,"BUCKET")){
memcpy(dbuf->data, result, sizeof(struct bucket_data *));
dbuf->length = BASIC_SIZE + sizeof(struct bucket_data *);
}
//to be continued...
}
else{
fprintf(stdout,"mapi_read_results failed...\n");
dbuf->cmd = ERROR_ACK;
dbuf->length = BASIC_SIZE;
}
break;
case GET_NEXT_PKT:
fprintf(stdout,"GET_NEXT_PKT\n");
pkt = (struct mapipkt *)mapi_get_next_pkt(dbuf->fd,dbuf->fid);
gettimeofday(&tv, &tz);
dbuf->timestamp = tv.tv_usec;
if(pkt!=NULL){
dbuf->cmd = GET_NEXT_PKT_ACK;
memcpy(dbuf->data, pkt, sizeof(struct mapipkt)-4+pkt->caplen);
dbuf->length = BASIC_SIZE + sizeof(struct mapipkt) - 4 + pkt->caplen;
}
else{
dbuf->cmd = ERROR_ACK;
dbuf->length = BASIC_SIZE;
fprintf(stdout,"mapi_get_next_pkt failed...\n");
}
break;
case GET_FLOW_INFO:
fprintf(stdout,"GET_FLOW_INFO\n");
if(mapi_get_flow_info(dbuf->fd, &flow_info)){
dbuf->cmd = MAPI_FLOW_INFO_ERR;
dbuf->length = BASIC_SIZE;
}
else{
dbuf->cmd = GET_FLOW_INFO_ACK;
memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
}
break;
case GET_FUNCTION_INFO:
fprintf(stdout,"GET_FUNCTION_INFO\n");
if(mapi_get_function_info(dbuf->fd, dbuf->fid, &funct_info)){
dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
dbuf->length = BASIC_SIZE;
}
else{
dbuf->cmd = GET_FUNCTION_INFO_ACK;
memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
}
break;
case GET_NEXT_FUNCTION_INFO:
fprintf(stdout,"GET_NEXT_FUNCTION_INFO\n");
if(mapi_get_next_function_info(dbuf->fd, dbuf->fid, &funct_info)){
dbuf->cmd = MAPI_FUNCTION_INFO_ERR;
dbuf->length = BASIC_SIZE;
}
else{
dbuf->cmd = GET_FUNCTION_INFO_ACK;
memcpy(dbuf->data,&funct_info,sizeof(mapi_function_info_t));
dbuf->length = BASIC_SIZE+sizeof(mapi_function_info_t);
}
break; break;
case GET_NEXT_FLOW_INFO:
fprintf(stdout,"GET_NEXT_FLOW_INFO\n");
if(mapi_get_next_flow_info(dbuf->fd, &flow_info)){
dbuf->cmd = MAPI_FLOW_INFO_ERR;
dbuf->length = BASIC_SIZE;
}
else{
dbuf->cmd = GET_FLOW_INFO_ACK;
memcpy(dbuf->data,&flow_info,sizeof(mapi_flow_info_t));
dbuf->length = BASIC_SIZE+sizeof(mapi_flow_info_t);
}
break;
#ifdef WITH_ADMISSION_CONTROL
case SET_AUTHDATA:
fprintf(stdout,"SET_AUTHDATA\n");
if(!agent_send_authdata(dbuf)){
dbuf->cmd = SET_AUTHDATA_ACK;
}
else{
dbuf->cmd = ERROR_ACK;
}
dbuf->length = BASIC_SIZE;
break;
#endif
default:
die("Default case found in handle_request loop!\n");
break;
}
if(dbuf->cmd==CLOSE_FLOW){
fprintf(stdout,"AGENT BREAKING\n");//XXX if only one flow exists
break;//XXX it should only break if it is the last flow (we don't have that info)
}
send(sock, dbuf, dbuf->length, 0);
dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length;
memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes);
}
for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
if(active_flows[dbuf_bytes]>0){//this should always be positive or realloc does not work
mapi_close_flow(active_flows[i]);
}
}
free(active_flows);
shutdown(sock, SHUT_RDWR);
close(sock);
/* update the global service counter */
pthread_mutex_lock(&lock);
service_count++;
pthread_mutex_unlock(&lock);
printf("<+> thread %d exiting\n<+> total sockets served: %d\n", (int)pthread_self(), service_count);
pthread_exit((void *)0);
}
int die(char *msg) {
perror(msg);
return EXIT_FAILURE;
}
//calls the appropriate mapi_apply_function and returns the fid from mapid
//
//fully unoptimized:
//1) find a way not to use strcmp
//2) use generic int variables - less readable but uses less memory
int getfid(struct dmapiipcbuf *dbuf){
int result;
//redundant declarations - but more readable code
char *p;
//for STR_SEARCH:
int offset, depth;
//for SAMPLE
int value, mode;
//for COOKING
int threshold, timeout;
//for HASHAMP
int range, keep;
//for TO_FILE
int format, count;
char *path;
//for BUCKET
unsigned long long ltimeout;
int funct;
//for THRESHOLD
int type, boundary, divider;
unsigned long long lthreshold;
char* function=dbuf->data;
char* data=dbuf->data+strlen(dbuf->data)+1;
if(!strcmp(function,"BPF_FILTER")){//Checked
fprintf(stdout,"%s, %s\n",function, data);
result = mapi_apply_function(dbuf->fd, function, data);
return(result);
}
else if(!strcmp(function,"PKT_COUNTER")){//Checked
fprintf(stdout,"PKT_COUNTER\n");
result = mapi_apply_function(dbuf->fd, function);
return(result);
}
else if(!strcmp(function,"BYTE_COUNTER")){//Checked
fprintf(stdout,"BYTE_COUNTER\n");
return(mapi_apply_function(dbuf->fd, function));
}
else if(!strcmp(function,"STR_SEARCH")){//Checked
fprintf(stdout,"STR_SEARCH\n");
p = data + strlen(data) + 1;
memcpy(&offset,p,sizeof(int));
p += sizeof(int);
memcpy(&depth,p,sizeof(int));
result = mapi_apply_function(dbuf->fd, function, data, offset, depth);
return(result);
}
else if(!strcmp(function,"TO_BUFFER")){//Checked
fprintf(stdout,"TO_BUFFER\n");
return(mapi_apply_function(dbuf->fd, function));
}
else if(!strcmp(function,"SAMPLE")){//Checked
fprintf(stdout,"SAMPLE\n");
p = data;
memcpy(&value,p,sizeof(int));
p += sizeof(int);
memcpy(&mode,p,sizeof(int));
printf("value == %d, mode == %d\n",value,mode);
result = mapi_apply_function(dbuf->fd, function, value, mode);
return(result);
}
else if(!strcmp(function,"HASHSAMP")){
//XXX not tested
fprintf(stdout,"HASHSAMP\n");
p = data;
memcpy(&range,p,sizeof(int));
p += sizeof(int);
memcpy(&keep,p,sizeof(int));
printf("range == %d, keep == %d\n",range,keep);
result = mapi_apply_function(dbuf->fd, function, range, keep);
return(result);
}
else if(!strcmp(function,"TO_FILE")){
//XXX does not work the way we want
//if called here, the file will be created at the agent's machine
fprintf(stdout,"TO_FILE\n");
p = data;
memcpy(&format,p,sizeof(int));
p += 2*sizeof(int);
path = (char *)malloc(strlen(p)+1);
memcpy(path,p,strlen(p)+1);
p += strlen(path)+1;
memcpy(&count,p,sizeof(int));
printf("format == %d, path == %s, count == %d\n",format,path,count);
result = mapi_apply_function(dbuf->fd, function, format, path, count);
free(path);
return(result);
}
else if(!strcmp(function,"ETHEREAL")){
//XXX this function is in the man page but isn't recognized by the mapi stub in this version
fprintf(stdout,"ETHEREAL\n");
return(mapi_apply_function(dbuf->fd, function, data));
}
else if(!strcmp(function,"HASH")){//Checked
fprintf(stdout,"HASH\n");
return(mapi_apply_function(dbuf->fd, function));
}
else if(!strcmp(function,"COOKING")){
//XXX not tested
fprintf(stdout,"COOKING\n");
p = data;
memcpy(&threshold,p,sizeof(int));
p += sizeof(int);
memcpy(&timeout,p,sizeof(int));
printf("threshold == %d, timeout == %d\n", threshold, timeout);
result = mapi_apply_function(dbuf->fd, function, threshold, timeout);
return(result);
}
else if(!strcmp(function,"BUCKET")){
//XXX not tested
fprintf(stdout,"BUCKET\n");
p = data;
memcpy(&ltimeout,p,sizeof(unsigned long long));
p += sizeof(unsigned long long);
memcpy(&funct,p,sizeof(int));
printf("timeout == %lld, function == %d\n", ltimeout, funct);
result = mapi_apply_function(dbuf->fd, function, ltimeout, function);
return(result);
}
else if(!strcmp(function,"THRESHOLD")){
//XXX not tested
fprintf(stdout,"THRESHOLD\n");
p = data;
memcpy(&type,p,sizeof(int));
p += sizeof(int);
memcpy(&funct,p,sizeof(int));
p += sizeof(int);
memcpy(&lthreshold,p,sizeof(unsigned long long));//need to check type argument first
p += sizeof(unsigned long long);
memcpy(&boundary,p,sizeof(int));
p += sizeof(int);
memcpy(&timeout,p,sizeof(int));
p += sizeof(int);
memcpy(&divider,p,sizeof(int));
p += sizeof(int);
memcpy(&count,p,sizeof(int));
printf("type == %d, function == %d, ", type, funct);
printf("boundary == %d, timeout == %d, divider == %d, count == %d", boundary, timeout, divider, count);
printf("threshold == %llu\n",lthreshold);
result = mapi_apply_function(dbuf->fd, function, threshold, timeout);
return(result);
}
else{
printf("Unknown function: %s\n", dbuf->data);
}
return(-1);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment