Commit 025c6746 authored by 's avatar
Browse files

Added IPFIX/NetFlow capture & report function "IPFIXCAP". Usable for COMBO netflow design.

Captures flow reports from network or COMBO with netflow design (not via API) and reports
them via SHM to MAPI application. MAPI function is part of ipfixflib and is called "IPFIXCAP".



git-svn-id: file:///home/svn/mapi/trunk@1554 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent c3035d8d
......@@ -22,6 +22,7 @@ ipfixflib_la_SOURCES = \
engine.c engine.h\
ifp-priv.h \
ipfixlib.c \
ipfixcap.c \
ipfixprobe.c \
md5.c md5.h \
npctrl.c npctrl.h \
......@@ -34,6 +35,6 @@ services.c services.h \
util.c util.h
# these headers will be installd in $prefix/include/mapi
pkginclude_HEADERS = ipfixlib.h
pkginclude_HEADERS = ipfixlib.h ipfixcap.h
EXTRA_DIST = COPYING etter.passive.os.fp README-nprobe
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <netinet/in.h>
#include <syslog.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include "mapidevices.h"
#include "mapid.h"
#include "fhelp.h"
#include "debug.h"
#include "mapi_errors.h"
#include "ipfixcap.h"
#define IPFIXCAP "IPFIXCAP"
#define IPFIXCAP_FIFO_SIZE 512
#define LISTEN_QUEUE_MAX 128
#define MESSAGE_SIZE_MAX 65535
typedef enum {
rec_type_undef = 0,
rec_type_nf_v5 = 5,
rec_type_nf_v7 = 7,
rec_type_nf_v9 = 9,
rec_type_ipfix = 10
} ipfixcap_rec_type_t;
static struct {
char *name;
ipfixcap_rec_type_t rtype;
} rec_types[] = {
{ "NETFLOW_V5", rec_type_nf_v5 },
{ "NETFLOW_V7", rec_type_nf_v7 },
{ "NETFLOW_V9", rec_type_nf_v9 },
{ "IPFIX", rec_type_ipfix }
};
typedef struct {
char *rec_type_name;
char *transport_name;
ipfixcap_rec_type_t rec_type;
pthread_t pthread;
pthread_mutex_t mutex;
unsigned int run;
} ipfixcap_idata_t;
typedef struct {
unsigned long read_ptr; // pointer to the next record that can be read
unsigned long write_ptr; // pointer to where the next packet can be written
unsigned long next_recno; // record numner of next record
pthread_spinlock_t fifo_lock;
ipfixcap_dgram_t rbuf[IPFIXCAP_FIFO_SIZE]; // flow record buffer
unsigned bufsize; // buffer size
} ipfixcap_rdata_t;
void *getcmem(size_t nmemb, size_t size) {
void *mem;
if((mem = (void *) calloc (nmemb, size)) == NULL) perror("calloc");
return mem;
}
int socket_bind(const char *node, const char *service, int family) {
int sockfd;
int err;
struct addrinfo addrinfo_req;
struct addrinfo *addrinfo_res;
struct addrinfo *addrinfo_current_res;
memset(&addrinfo_req, 0, sizeof(struct addrinfo));
if (family == AF_UNSPEC) family = AF_INET;
addrinfo_req.ai_flags = AI_PASSIVE;
addrinfo_req.ai_family = family;
addrinfo_req.ai_socktype = SOCK_DGRAM;
err = getaddrinfo(node, service, &addrinfo_req, &addrinfo_res);
if (err) {
DEBUG_CMD(Debug_Message("ipfixcap: getaddrinfo error: [%s]\n", gai_strerror(err)));
return -1;
}
sockfd = -1;
for (addrinfo_current_res = addrinfo_res; addrinfo_current_res != NULL; addrinfo_current_res = addrinfo_current_res->ai_next) {
if (addrinfo_current_res->ai_family == AF_INET || addrinfo_current_res->ai_family == AF_INET6) {
sockfd = socket(addrinfo_current_res->ai_family, addrinfo_current_res->ai_socktype, addrinfo_current_res->ai_protocol);
if (sockfd != -1) {
int opt = 1;
err = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (err == -1) {
DEBUG_CMD(Debug_Message("ipfixcap: bind() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)));
}
err = bind(sockfd, addrinfo_current_res->ai_addr, addrinfo_current_res->ai_addrlen);
if (err == -1) {
DEBUG_CMD(Debug_Message("ipfixcap: bind() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)));
close(sockfd); // cleanup
sockfd = -1;
freeaddrinfo(addrinfo_res);
return -1;
}
DEBUG_CMD(Debug_Message("bind(): Bound to %s:%s\n", node, service));
break;
}
}
}
if (sockfd == -1) {
DEBUG_CMD(Debug_Message("ipfixcap: socket() error in %s line %d: could not open socket\n", __FILE__, __LINE__));
freeaddrinfo(addrinfo_res); // cleanup
return -1;
}
freeaddrinfo(addrinfo_res); // cleanup
return sockfd;
}
void ipfixcap_write_shm(const void *mapinf, const void *buffer, uint32_t buf_len) {
DEBUG_CMD(Debug_Message("ipfixcap_write_shm: %d\n", buf_len));
ipfixcap_dgram_t *rec;
unsigned new_write;
int done = FALSE;
ipfixcap_rdata_t *ipfixcap_rdata = (ipfixcap_rdata_t *) mapinf;
while (!done) {
pthread_spin_lock(&ipfixcap_rdata->fifo_lock);
new_write = ipfixcap_rdata->write_ptr + 1;
if (new_write >= IPFIXCAP_FIFO_SIZE)
new_write = 0;
if (new_write == ipfixcap_rdata->read_ptr) {
DEBUG_CMD(Debug_Message("ipfixcap: Flow record no. %ld - %d bytes long dropped", ipfixcap_rdata->next_recno, buf_len));
done = TRUE;
}
else {
rec = (ipfixcap_dgram_t*) (ipfixcap_rdata->rbuf + ipfixcap_rdata->write_ptr);
rec->recno = ipfixcap_rdata->next_recno;
rec->size = buf_len;
memcpy(rec->bytes, buffer, buf_len);
ipfixcap_rdata->write_ptr = new_write;
done = TRUE;
}
pthread_spin_unlock(&ipfixcap_rdata->fifo_lock);
}
ipfixcap_rdata->next_recno++;
}
void ipfixcap_run(mapidflib_function_instance_t *instance) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_run()\n"));
ipfixcap_idata_t *ipfixcap_idata = (ipfixcap_idata_t *) instance->internal_data;
ipfixcap_rdata_t *ipfixcap_rdata = (ipfixcap_rdata_t *) instance->result.data;
int i;
int sock;
char *node = NULL;
char *service = NULL;
node = strdup(ipfixcap_idata->transport_name);
for (i = strlen (node) - 1; i >= 0; i--) {
if (node[i] == ':') {
if(i < (int) strlen(node) - 1) {
node[i] = '\0';
service = &node[i + 1];
}
else {
node[i] = '\0';
service = NULL;
}
break;
}
}
sock = 0;
sock = socket_bind(node, service, AF_UNSPEC);
if (sock == -1) {
DEBUG_CMD(Debug_Message("ipfixcap: socket_bind() error in %s line %d\n", __FILE__, __LINE__));
return;
}
void *message;
message = malloc(MESSAGE_SIZE_MAX);
if (!message) {
DEBUG_CMD(Debug_Message("ipfixcap: malloc() allocation error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)));
return;
}
netflow_common_header_t *netflow_common_header;
//netflow_common_header = (netflow_common_header_t *) message;
struct sockaddr_storage ipfixcap_sender;
socklen_t ipfixcap_sender_size = sizeof(ipfixcap_sender);
ssize_t message_len;
ssize_t message_todo;
int expected_flowset_size;
uint16_t version;
uint16_t count;
netflow_v5_header_t *netflow_v5_header;
netflow_v7_header_t *netflow_v7_header;
netflow_v9_header_t *netflow_v9_header;
ipfix_header_t *ipfix_header;
netflow_v5_record_t *netflow_v5_record;
netflow_v7_record_t *netflow_v7_record;
while(ipfixcap_idata->run) {
//message_len = recvfrom(sock, message, MESSAGE_SIZE_MAX, MSG_DONTWAIT, (struct sockaddr *) &ipfixcap_sender, &ipfixcap_sender_size);
message_len = recvfrom(sock, message, MESSAGE_SIZE_MAX, 0, (struct sockaddr *) &ipfixcap_sender, &ipfixcap_sender_size);
//if(errno == EAGAIN || errno == EWOULDBLOCK) {
// fprintf(stderr, ".");
// struct timespec timespec;
// timespec.tv_sec = 0;
// timespec.tv_nsec = 10000;
// nanosleep(&timespec, NULL);
// continue;
//}
if (message_len == -1 && errno != EINTR) {
DEBUG_CMD(Debug_Message("ipfixcap: ERROR: recvfrom: %s\n", strerror(errno)));
continue;
}
if (message_len < (ssize_t) sizeof(netflow_common_header_t)) {
DEBUG_CMD(Debug_Message("ipfixcap: message too short for common netflow header. message_len: %zd\n", message_len));
continue; // XXX ignore bad packets
}
netflow_common_header = (netflow_common_header_t *) message;
version = ntohs(netflow_common_header->version);
// TODO format conversion
switch (version) {
case NETFLOW_V5_VERSION:
DEBUG_CMD(Debug_Message("seems to be netflow_v5_message, message_len: %zd\n", message_len));
message_todo = message_len;
while(message_todo > 0) {
if(message_len < (int) sizeof(netflow_v5_header_t)) {
DEBUG_CMD(Debug_Message("ipfixcap: message too short for netflow_v5_message, message_len: %zd\n", message_len));
return;
}
netflow_v5_header = (netflow_v5_header_t *) message;
count = ntohs(netflow_v5_header->count);
if (count > NETFLOW_V5_RECORDS_IN_PACKET_MAX) {
DEBUG_CMD(Debug_Message("ipfixcap: too many records in netflow_v5 packet: %u, maximum is: %u\n", count, NETFLOW_V5_RECORDS_IN_PACKET_MAX));
return;
}
expected_flowset_size = sizeof(netflow_v5_header_t) + count * sizeof(netflow_v5_record_t);
if (message_todo < expected_flowset_size) {
DEBUG_CMD(Debug_Message("ipfixcap: rest of message too short for expected flowset size (netflow_v5_header + number of netflow_v5_records)\n"));
return;
}
ipfixcap_write_shm(ipfixcap_rdata, netflow_v5_header, expected_flowset_size);
// process header and records in flowset (now just dummy cycle)
for (netflow_v5_record = (netflow_v5_record_t *) (netflow_v5_header + 1); netflow_v5_record < (netflow_v5_record_t *) (netflow_v5_header + 1) + count; netflow_v5_record++);
netflow_v5_header = (netflow_v5_header_t *) netflow_v5_record;
message_todo -= expected_flowset_size;
}
break;
case NETFLOW_V7_VERSION:
DEBUG_CMD(Debug_Message("seems to be netflow_v7_message, message_len: %zd\n", message_len));
message_todo = message_len;
while(message_todo > 0) {
if(message_len < (int) sizeof(netflow_v7_header_t)) {
DEBUG_CMD(Debug_Message("ipfixcap: message too short for netflow_v7_message, message_len: %zd\n", message_len));
return;
}
netflow_v7_header = (netflow_v7_header_t *) message;
count = ntohs(netflow_v7_header->count);
if (count > NETFLOW_V7_RECORDS_IN_PACKET_MAX) {
DEBUG_CMD(Debug_Message("ipfixcap: too many records in netflow_v7 packet: %u, maximum is: %u\n", count, NETFLOW_V7_RECORDS_IN_PACKET_MAX));
return;
}
expected_flowset_size = sizeof(netflow_v7_header_t) + count * sizeof(netflow_v7_record_t);
if (message_todo < expected_flowset_size) {
DEBUG_CMD(Debug_Message("ipfixcap: rest of message too short for expected flowset size (netflow_v7_header + number of netflow_v7_records)\n"));
return;
}
ipfixcap_write_shm(ipfixcap_rdata, netflow_v7_header, expected_flowset_size);
// process header and records in flowset (now just dummy cycle)
for (netflow_v7_record = (netflow_v7_record_t *) (netflow_v7_header + 1); netflow_v7_record < (netflow_v7_record_t *) (netflow_v7_header + 1) + count; netflow_v7_record++);
netflow_v7_header = (netflow_v7_header_t *) netflow_v7_record;
message_todo -= expected_flowset_size;
}
break;
case NETFLOW_V9_VERSION:
DEBUG_CMD(Debug_Message("seems to be netflow_v9_message, message_len: %zd\n", message_len));
if(message_len < (int) sizeof(netflow_v9_header_t)) {
DEBUG_CMD(Debug_Message("ipfixcap: message too short for netflow_v9_message, message_len: %zd\n", message_len));
return;
}
netflow_v9_header = (netflow_v9_header_t *) message;
ipfixcap_write_shm(ipfixcap_rdata, netflow_v9_header, message_len);
break;
case IPFIX_VERSION:
DEBUG_CMD(Debug_Message("seems to be ipfix_message, message_len: %zd\n", message_len));
if(message_len < (int) sizeof(ipfix_header_t)) {
DEBUG_CMD(Debug_Message("ipfixcap: message too short for ipfix_message, message_len: %zd\n", message_len));
return;
}
ipfix_header = (ipfix_header_t *) message;
ipfixcap_write_shm(ipfixcap_rdata, ipfix_header, message_len);
break;
default:
DEBUG_CMD(Debug_Message("ipfixcap: unexpected netflow version %i\n", netflow_common_header->version));
continue; // ignore unexpected packets
}
// each Process_xx function has to process the entire input buffer, therefore it's empty now.
//export_packets++;
}
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_run(): bye...\n"));
pthread_exit(NULL);
}
static int ipfixcap_instance(mapidflib_function_instance_t *instance, MAPI_UNUSED int fd, MAPI_UNUSED mapidflib_flow_mod_t *flow_mod) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_instance()\n"));
int i;
mapiFunctArg* fargs;
char *rec_type_name;
char *transport_name;
//char *key_template; // NetFlow v9
//char *record_template;
ipfixcap_rec_type_t rec_type = rec_type_undef;
fargs = instance->args;
rec_type_name = getargstr(&fargs);
transport_name = getargstr(&fargs);
//key_template = getargstr(&fargs);
//record_template = getargstr(&fargs);
if (rec_type_name == NULL) {
rec_type = rec_type_ipfix;
} else {
for(i = 0; i < (int) (sizeof rec_types / sizeof rec_types[0]); i++) {
if (strcmp(rec_types[i].name, rec_type_name) == 0)
rec_type = rec_types[i].rtype;
}
}
if (rec_type == rec_type_undef) {
DEBUG_CMD(Debug_Message("ipfixcap: init: Illegal record type %s", rec_type_name));
return MFUNCT_INVALID_ARGUMENT;
}
DEBUG_CMD(Debug_Message("ipfixcap: init: rec_type_name=%s (%d)", rec_type_name, rec_type));
DEBUG_CMD(Debug_Message("ipfixcap: init: transport_name=%s", transport_name));
//DEBUG_CMD(Debug_Message("ipfixcap: init: key_template=%sd", key_template)));
//DEBUG_CMD(Debug_Message("ipfixcap: init: record_template=%s", record_template)));
instance->def->shm_size = sizeof(ipfixcap_rdata_t);
DEBUG_CMD(Debug_Message("ipfixcap: shm_size: %d\n", instance->def->shm_size));
// 0 shm (instance->result.data) DIMAPI_DATA_SIZE
// | mapi_result_type | ipfixcap |
return 0;
}
static int ipfixcap_init(mapidflib_function_instance_t *instance, MAPI_UNUSED int fd) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_init()\n"));
int i;
ipfixcap_idata_t *ipfixcap_idata;
ipfixcap_rdata_t *ipfixcap_rdata;
mapiFunctArg* fargs;
char *rec_type_name;
char *transport_name;
//char *key_template; // NetFlow v9
//char *record_template;
ipfixcap_rec_type_t rec_type = rec_type_undef;
fargs = instance->args;
rec_type_name = getargstr(&fargs);
transport_name = getargstr(&fargs);
//key_template = getargstr(&fargs);
//record_template = getargstr(&fargs);
if (rec_type_name == NULL) {
rec_type = rec_type_ipfix;
} else {
for(i = 0; i < (int) (sizeof rec_types / sizeof rec_types[0]); i++) {
if (strcmp(rec_types[i].name, rec_type_name) == 0)
rec_type = rec_types[i].rtype;
}
}
if (rec_type == rec_type_undef) {
DEBUG_CMD(Debug_Message("ipfixcap: init: Illegal record type %s", rec_type_name));
return MFUNCT_INVALID_ARGUMENT;
}
instance->internal_data = (ipfixcap_idata_t *) getcmem(1, sizeof(ipfixcap_idata_t));
ipfixcap_idata = instance->internal_data;
ipfixcap_idata->rec_type = rec_type;
ipfixcap_idata->rec_type_name = rec_type_name;
ipfixcap_idata->transport_name = transport_name;
DEBUG_CMD(Debug_Message("ipfixcap: init: rec_type_name=%s (%d)", ipfixcap_idata->rec_type_name, ipfixcap_idata->rec_type));
DEBUG_CMD(Debug_Message("ipfixcap: init: transport_name=%s", ipfixcap_idata->transport_name));
//DEBUG_CMD(Debug_Message("ipfixcap: init: key_template=%sd", key_template));
//DEBUG_CMD(Debug_Message("ipfixcap: init: record_template=%s", record_template));
ipfixcap_rdata = instance->result.data;
pthread_spin_init(&ipfixcap_rdata->fifo_lock, 1);
pthread_spin_lock(&ipfixcap_rdata->fifo_lock);
ipfixcap_rdata->read_ptr = 0;
ipfixcap_rdata->write_ptr = 0;
ipfixcap_rdata->next_recno = 0;
memset(ipfixcap_rdata->rbuf, 0, IPFIXCAP_FIFO_SIZE * sizeof(ipfixcap_dgram_t));
ipfixcap_rdata->bufsize = IPFIXCAP_FIFO_SIZE;
pthread_spin_unlock(&ipfixcap_rdata->fifo_lock);
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_init: ipfixcap_rdata=%p\n", ipfixcap_rdata));
pthread_mutex_t tmpmutex = PTHREAD_MUTEX_INITIALIZER;
ipfixcap_idata = instance->internal_data;
ipfixcap_idata->mutex = tmpmutex;
ipfixcap_idata->run = 1;
i = pthread_create(&(ipfixcap_idata->pthread), NULL, (void *) &ipfixcap_run, (void *) instance);
return 0;
}
static int ipfixcap_process(MAPI_UNUSED mapidflib_function_instance_t *instance, MAPI_UNUSED unsigned char* dev_pkt, MAPI_UNUSED unsigned char* link_pkt, MAPI_UNUSED mapid_pkthdr_t* pkt_head) {
return 1;
}
static int ipfixcap_cleanup(mapidflib_function_instance_t *instance) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_cleanup()\n"));
ipfixcap_idata_t *ipfixcap_idata;
ipfixcap_idata = (ipfixcap_idata_t *) instance->internal_data;
DEBUG_CMD(Debug_Message("ipfixcap: cleanup(%p)\n", instance));
if (ipfixcap_idata) {
ipfixcap_idata->run = 0;
sleep(3);
pthread_cancel(ipfixcap_idata->pthread);
free (ipfixcap_idata);
instance->internal_data = NULL;
}
DEBUG_CMD(Debug_Message("ipfixcap: cleanup(%p) finished\n", instance));
return 1;
}
static int ipfixcap_client_init( mapidflib_function_instance_t *instance, void *data) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_client_init()\n"));
instance->internal_data = (ipfixcap_idata_t *) getcmem(1, sizeof(ipfixcap_idata_t));
data = instance->internal_data;
return(0);
}
static int ipfixcap_client_read_result(mapidflib_function_instance_t *instance, mapi_result_t *res) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_client_read_result()\n"));
ipfixcap_rdata_t *ipfixcap_rdata = (ipfixcap_rdata_t *) instance->result.data;
ipfixcap_dgram_t *rec = NULL;
int done = FALSE;
while (!done) {
//wait for record(spinlock blocks when no records are ready in the buffer)
pthread_spin_lock(&ipfixcap_rdata->fifo_lock);
if (ipfixcap_rdata->read_ptr != ipfixcap_rdata->write_ptr) {
// Copy flow record from kmem
rec = (ipfixcap_dgram_t*) (ipfixcap_rdata->rbuf + ipfixcap_rdata->read_ptr);
ipfixcap_rdata->read_ptr++;
if(ipfixcap_rdata->read_ptr >= IPFIXCAP_FIFO_SIZE)
ipfixcap_rdata->read_ptr=0;
done = TRUE;
}
pthread_spin_unlock(&ipfixcap_rdata->fifo_lock);
//if (!done) {
// struct timespec timespec;
// timespec.tv_sec = 0;
// timespec.tv_nsec = 10000;
// nanosleep(&timespec, NULL);
//}
}
res->res = rec;
res->size = sizeof *rec;
return 0;
}
/* This function is called when the flow closes and should release all resources
* allocated by the IPFIXCAP function on the client side */
static int ipfixcap_client_cleanup( mapidflib_function_instance_t *instance) {
DEBUG_CMD(Debug_Message("ipfixcap: ipfixcap_client_cleanup()\n"));
free(instance->internal_data);
return(0);
}
static mapidflib_function_def_t finfo = {
"", // libname
IPFIXCAP, // name
"IPFIX and NetFlow capture and report", // descr
"ss", // argdescr
MAPI_DEVICE_ALL, // devtype
MAPIRES_SHM, // method for returning results
0, // set by instance // shm size
0, // modifies packets?
0, // filters packets?
MAPIOPT_NONE, // optimization
ipfixcap_instance, // instance,
ipfixcap_init, // init
ipfixcap_process, // process
NULL, // get_result