Commit 06602071 authored by 's avatar
Browse files

Merged most of DiMapi with previous code. Most of the new code has been written using ifdefs.

The only significant change to the old code is the way mapi is initialized.


git-svn-id: file:///home/svn/mapi/trunk@123 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent d37ce50c
......@@ -19,7 +19,7 @@ ifeq ($(WITH_COMBO6),1)
COMBOSIX=mapicombo6drv.so combo6flib.so
endif
ifeq ($(WITH_DMAPI),1)
ifeq ($(WITH_DIMAPI),1)
CFLAGS := $(CFLAGS) -DDIMAPI
endif
......
......@@ -16,7 +16,7 @@ WITH_DAG=0
WITH_COMBO6=1
#Distributed MAPI functionality
WITH_DMAPI=0
WITH_DIMAPI=0
#MAPI function statistics
#With this pkt counters for each function is enabled
......
......@@ -14,7 +14,7 @@ SOLIBOBJS=nprobe.o npktproc.o md5.o engine.o util.o ipfixlib.o \
MAPI_HOME=..
MAPI_INCLUDE=$(MAPI_HOME)
MAPI_OBJ_PATH=$(MAPI_HOME)
MAPI_OBJS=$(MAPI_HOME)/fhelp.o $(MAPI_HOME)/mapiipc.o
MAPI_OBJS=$(MAPI_HOME)/fhelp.o $(MAPI_HOME)/mapiipc.o $(MAPI_HOME)/flist.o
MAPI_DEBUG=2
MAPI_CFLAGS=-g -O2 -Wall $(C_WARNINGS) -Wcast-align $(C_FEATURES)
......
This diff is collapsed.
......@@ -16,6 +16,13 @@
#define HAVE_MSGHDR_MSG_CONTROL 1
#ifdef DIMAPI
#include <netdb.h>
#include <netinet/in.h>
#include <semaphore.h>
#include "flist.h"
#endif
// this file contains all the client-side IPC related functions
int sock;
......@@ -85,6 +92,116 @@ void mapiipc_client_close()
close(sock);
}
#ifdef DIMAPI
struct sockaddr_in remoteaddr;
flist_t *remote_flowlist=NULL;
void mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h)
//Sends an IPC message to mapid
{
// qbuf->uid=getuid();
if(send(h->sockfd, dbuf, dbuf->length, 0) == -1) {
WARNING_CMD(printf("send: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
// exit(1);
}
}
void mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
{
host_flow* hflow;
flist_node_t* fnode;
pthread_mutex_lock(&rflow->mutex);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->fd=hflow->fd;
mapiipc_remote_write(hflow->dbuf, hflow->rhost);
rflow->pending_msgs++;
}
pthread_mutex_unlock(&rflow->mutex);
sem_wait(&rflow->fd_sem);
}
void *mapiipc_comm_thread(void *host) {
//Reads an IPC message. Blocking call
char buffer[MAX_SEND_SIZE];
struct dmapiipcbuf* dbuf;
remote_flowdescr_t* rflow;
host_flow* hflow;
int recv_bytes;
unsigned int dbuf_bytes=0;
(char*)dbuf=&buffer[0];//XXX warning: use of cast expressions as lvalues is deprecated
while (1) {
if (dbuf_bytes==0 || dbuf_bytes<dbuf->length) {
if( (recv_bytes=recv(((struct host *)host)->sockfd, (char*)&buffer+dbuf_bytes, MAX_SEND_SIZE, 0)) == -1) {
//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
exit(1);
//printf("error 1\n");
continue ;
}
dbuf_bytes += recv_bytes;
}
if (dbuf_bytes<dbuf->length) continue;
hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
if (hflow!=NULL) {
rflow=flist_get(remote_flowlist, hflow->scope_fd);
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
if (dbuf->cmd==GET_NEXT_PKT_ACK) flist_append(rflow->pkt_list, 0, hflow);
if (rflow->pending_msgs!=0) {
pthread_mutex_lock(&rflow->mutex);
--rflow->pending_msgs;
pthread_mutex_unlock(&rflow->mutex);
if ( rflow->pending_msgs==0 ) {
sem_post( &rflow->fd_sem );
}
}
}
else {
printf("INTERNAL SERIOUS FAILURE %d\n",dbuf->fd);
exit(-1);
//failure
}
dbuf_bytes=dbuf_bytes-dbuf->length;
memcpy(buffer,buffer+dbuf->length,dbuf_bytes);
}
}
void mapiipc_remote_init(struct host *h)
//Initializes IPC for dmapi functions
{
if ((h->sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ERROR_CMD(printf("socket: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
exit(1);
}
// Construct name of dmapid's socket
remoteaddr.sin_family = AF_INET;
remoteaddr.sin_addr = *((struct in_addr *)gethostbyname(h->hostname)->h_addr);
remoteaddr.sin_port = htons(h->port);
if (connect(h->sockfd, (struct sockaddr *)&remoteaddr, sizeof(remoteaddr)) < 0) {
ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
exit(EXIT_FAILURE);
}
}
void mapiipc_remote_close(struct host *h)
//Releases socket resources
{
close(h->sockfd);
}
#endif
// Helper functions for function arguments retrieval
int getargint(mapiFunctArg **pos){
......
......@@ -141,6 +141,7 @@ struct dmapiipcbuf {
};
#define BASIC_SIZE (sizeof(struct dmapiipcbuf) - DATA_SIZE)
#define PKT_LENGTH 1600 //pkt info and actual pkt
typedef struct host_flow {
struct host *rhost;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment