Commit 8e60d754 authored by 's avatar

Addition of a new mapi function, mapi_stats(), which returns statistics

for a device similar to pcap_stats().


git-svn-id: file:///home/svn/mapi/trunk@1247 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 94344348
......@@ -49,6 +49,8 @@ MAPI \- Monitoring Application Programming Interface
.BI "int mapi_get_scope_size(int " fd ");"
.br
.BI "int mapi_is_remote(int " fd ");"
.br
.BI "int mapi_stats(char *" dev ", struct mapi_stat *" stats ");"
.SH DESCRIPTION
MAPI builds on the simple and powerful abstraction of the network flow. In
MAPI, a network flow is generally defined as a sequence of packets that
......@@ -414,6 +416,49 @@ Returns 1 if the flow
.IR fd
is associated with a remote sensor, otherwise returns 0.
If the flow does not exist, it returns -1.
.sp
.BI "int mapi_stats(char *" dev ", struct mapi_stat *" stats ");"
.br
.B mapi_stats()
fills a
.B mapi_stat
struct with statistics for the device
.IR dev
from when starting reading from the device to the time of the call.
The
.B struct mapi_stat
is defined in mapi.h:
.sp
.nf
struct mapi_stat {
unsigned int ps_recv; /* number of packets received */
unsigned int ps_drop; /* number of packets dropped */
unsigned int ps_ifdrop; /* drops by interface */
char *hostname;
char *dev;
};
.fi
.sp
Enough memory for
.IR stats
must have been previously allocated.
.sp
In case of DiMAPI, the
.IR dev
can be a network scope, that is a list with pairs of hostname - device
(e.g. "host1:dev1, host2:dev2").
The
.IR stats
should be a table of
.B struct mapi_stat
records, one for each host - device pair, that will be filled with
the statistics of the corresponding devices in the remote hosts.
.sp
In case of error,
.B mapi_stats()
returns -1.
On success, it returns the number of devices for which statistics are
given.
.\".B \-<a command line switch>
.\"<description of what that switch does>
.\".TP
......
......@@ -254,6 +254,7 @@ void mapiipc_client_close()
#ifdef DIMAPI
struct sockaddr_in remoteaddr;
flist_t *remote_flowlist=NULL;
sem_t stats_sem;
int mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h){ // sends an IPC message to mapid
......@@ -320,12 +321,14 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
remote_flowdescr_t* rflow;
host_flow* hflow;
int recv_bytes;
struct mapi_stat *stat;
flist_node_t* fnode;
#ifdef RECONNECT
int check_net;
struct dmapiipcbuf dbuf_;
#endif
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // enable cancellation
......@@ -509,7 +512,7 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
check_mapi_functions((struct host *) host);
mapi_get_next_packet((struct host *) host);
sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked
......@@ -521,6 +524,28 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
#endif
}
}
if (dbuf->cmd==MAPI_STATS_ACK) {
for (fnode=flist_head(((struct host*)host)->stats); fnode!=NULL; fnode=flist_next(fnode)) {
stat=(struct mapi_stat*)fnode->data;
if ( strcmp((char*)dbuf->data+sizeof(struct mapi_stat), stat->dev)==0 ) {
strcpy(((struct mapi_stat*)dbuf->data)->hostname, stat->hostname);
strcpy(((struct mapi_stat*)dbuf->data)->dev, stat->dev);
memcpy(stat, dbuf->data, sizeof(struct mapi_stat));
flist_remove(((struct host*)host)->stats, fnode->id);
break;
}
}
sem_post( &stats_sem );
continue;
}
else if (dbuf->cmd==MAPI_STATS_ERR) {
sem_post( &stats_sem );
continue;
}
hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
......
......@@ -97,7 +97,10 @@ typedef enum {
GET_NEXT_PKT,
GET_NEXT_PKT_ACK,
IGNORE_SLEEP, // reconnection ...
IGNORE_NOTIFY
IGNORE_NOTIFY,
MAPI_STATS,
MAPI_STATS_ACK,
MAPI_STATS_ERR
} mapiipcMsg;
......@@ -163,6 +166,7 @@ struct host {
#ifdef RECONNECT
sem_t connection; // use it in mapiipc.c source code file
#endif
flist_t *stats; //for mapi_stats
};
//Buffer that is sent to/from mapi and agent
......
......@@ -27,6 +27,8 @@ int mapidrv_load_library(int devid,char* lib);
int mapidrv_add_device(const char *devname,int file, int devid, global_function_list_t *gflist, void *param);
int mapidrv_delete_device(int devid);
int mapidrv_start_offline_device( int devid);
int mapidrv_stats(int devid, char **devtype, struct mapi_stat *stats);
#ifdef WITH_AUTHENTICATION
int mapidrv_authenticate(int, int, char *);
#endif
......
......@@ -442,6 +442,34 @@ mapidrv_create_flow (int devid, int fd, char **devtype)
return mapid_add_flow(&i->mapidlib,fd,&i->hwinfo,NULL);
}
int
mapidrv_stats (int devid, char **devtype, struct mapi_stat *stats)
{
nic_instance_t *i=flist_get(devlist,devid);
char errbuf[PCAP_ERRBUF_SIZE];
struct pcap_stat p_stats;
*devtype=MAPI_DEVICE_NIC;
if (i->pcap!=NULL)
{
if( pcap_stats(i->pcap, &p_stats) != 0 )
{
ERROR_CMD(fprintf(stderr,"pcap_stats: %s [%s:%d]\n",errbuf,__FILE__,__LINE__));
return MAPI_STATS_ERROR;
}
else
{
stats->ps_recv=p_stats.ps_recv;
stats->ps_drop=p_stats.ps_drop;
stats->ps_ifdrop=p_stats.ps_ifdrop;
return 0;
}
}
return MAPI_STATS_ERROR;
}
int
mapidrv_connect (int devid,int fd)
{
......
......@@ -146,5 +146,7 @@
8002 = "Communication error with admission control"
#define ADMCTRL_AUTH_FAILED 8003
8003 = "Authorization failed"
#define MAPI_STATS_ERROR 8004
8004 = "Error in mapi_stats"
#define MAPID_MEM_ALLOCATION_ERROR 3084
3084 = "Error Allocating Memory"
......@@ -72,6 +72,7 @@ static void delete_remote_flow(remote_flowdescr_t* rflow);
flist_t *hostlist=NULL;//list containing all remote hosts used so far
extern flist_t *remote_flowlist;
int dimapi_port;
extern sem_t stats_sem;
typedef struct function_data{
......@@ -626,6 +627,7 @@ int mapi_create_flow(const char *dev)
h->functions = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->functions);
h->num_flows=0;
h->stats=NULL;
#ifdef RECONNECT
sem_init(&h->connection, 0, 0); // initialize semaphore
#endif
......@@ -4331,3 +4333,194 @@ int mapi_apply_function_array(int fd, const char* funct, char** args, unsigned i
return qbuf.fid;
}
int mapi_stats(const char *dev, struct mapi_stat *stats)
{
struct mapiipcbuf qbuf;
struct devgroupdb *devgroupdb;
int devgroupid = 0;
char *mapidsocket, *mapidsocketglobal;
#ifdef DIMAPI
char *hostname=NULL, *s=NULL, *k=NULL;
struct host *h=NULL;
char *devp;
struct dmapiipcbuf dbuf;
int seed=0;
int i;
#endif
if(dev==NULL){
DEBUG_CMD(printf( "Error wrong device name given \n\n"));
local_err = MAPI_DEVICE_INFO_ERR;
return -1;
}
#ifndef DIMAPI
devgroupdb = devgroupdb_open(3); // try local, global
devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);
if(!devgroupid) {
mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
}
else {
mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
}
mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
pthread_once(&mapi_is_initialized, (void*)mapi_init);
#endif
//check if flow is remote or not and call the appropriate init function
#ifdef DIMAPI
if ( strchr(dev,':')==NULL) {
devgroupdb = devgroupdb_open(3); // try local, global
devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);
if(!devgroupid) {
mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
}
else {
mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
}
mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
pthread_once(&mapi_is_initialized, (void*)mapi_init);
}
else pthread_once(&dmapi_is_initialized, (void*)dmapi_init);
if ((s = strchr(dev,':'))!=NULL) {
devp=strdup(dev);
k=strtok(devp, ", ");
sem_init(&stats_sem, 0, 0);
while (k!=NULL) {
if ((s = strchr(k,':'))!=NULL) {
*s = '\0';
hostname = k;
k = s + 1;
//printf("host: %s - device: %s\n",hostname, k);
pthread_spin_lock(&hostlist_lock);
h = (struct host *)flist_search(hostlist, hostcmp, hostname);
if(h==NULL){// Our host is a new one --> insert it in the hostlist
h = (struct host *)malloc(sizeof(struct host));
h->hostname = strdup(hostname);
//printf("New host %s\n",hostname);
h->port = dimapi_port;
h->flows = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->flows);
h->functions = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->functions);
h->num_flows=0;
h->stats=NULL;
#ifdef RECONNECT
sem_init(&h->connection, 0, 0); // initialize semaphore
#endif
// Create the socket
if (mapiipc_remote_init(h)<0) {
local_err = MCOM_SOCKET_ERROR;
printf("Could not connect with host %s\n",h->hostname);
pthread_spin_unlock(&hostlist_lock);
return -1;
}
h->comm_thread=(pthread_t *)malloc(sizeof(pthread_t));
pthread_create(h->comm_thread, NULL, *mapiipc_comm_thread, h);
flist_append(hostlist, h->sockfd, h);
pthread_spin_unlock(&hostlist_lock);
}
else{//host exists in the list
//printf("%s host again\n",h->hostname);
pthread_spin_unlock(&hostlist_lock);
}
if (h->stats==NULL) {
h->stats = (flist_t *)malloc(sizeof(flist_t));
flist_init(h->stats);
}
stats[seed].hostname=strdup(hostname);
stats[seed].dev=strdup(k);
flist_append(h->stats,seed,&stats[seed]);
dbuf.cmd=MAPI_STATS;
strncpy((char *)dbuf.data,k,DATA_SIZE);
dbuf.length=BASIC_SIZE+strlen(k)+1;
if (mapiipc_remote_write(&dbuf, h)<0) return -1;
seed++;
}
k=strtok(NULL,", ");
}
free(devp);
//wait for results
for (i=0; i<seed; i++)
sem_wait(&stats_sem);
sem_destroy(&stats_sem);
return seed;
}
#endif
pthread_spin_lock(&mapi_lock);
if ((get_numflows() == 0) && !minit) { //create the socket
if(mapiipc_client_init()==-1){
local_err = MCOM_INIT_SOCKET_ERROR;
}
}
pthread_spin_unlock(&mapi_lock);
strncpy((char *)qbuf.data,dev,DATA_SIZE);
qbuf.mtype=1;
qbuf.cmd=MAPI_STATS;
qbuf.fd=getpid();
qbuf.pid=getpid();
pthread_spin_lock(&mapi_lock);
if (mapiipc_write((struct mapiipcbuf*)&qbuf)<0) {
local_err = MCOM_SOCKET_ERROR;
pthread_spin_unlock(&mapi_lock);
return -1;
}
if (mapiipc_read((struct mapiipcbuf*)&qbuf)<0) {
local_err = MCOM_SOCKET_ERROR;
pthread_spin_unlock(&mapi_lock);
return -1;
}
pthread_spin_unlock(&mapi_lock);
switch(qbuf.cmd)
{
case MAPI_STATS_ACK:
memcpy(stats, qbuf.data, sizeof(struct mapi_stat));
stats->hostname=strdup("localhost");
stats->dev=strdup(dev);
return 1;
case MAPI_STATS_ERR:
local_err=qbuf.remote_errorcode;
return -1;
default:
local_err=MAPI_STATS_ERROR;
return -1;
}
}
......@@ -159,6 +159,16 @@ typedef struct mapi_lib_info {
unsigned int functs; //Number of functions in the library
} mapi_lib_info_t;
/*
* As returned by mapi_stats()
*/
struct mapi_stat {
unsigned int ps_recv; /* number of packets received */
unsigned int ps_drop; /* number of packets dropped */
unsigned int ps_ifdrop; /* drops by interface */
char *hostname;
char *dev;
};
//Prototype of the mapi_loop callback function
typedef void (*mapi_handler)(const struct mapipkt*);
......@@ -237,5 +247,7 @@ extern int mapi_is_remote(int fd);
extern int mapi_get_function_info(int fd, int fid, mapi_function_info_t *info);
//Get information about a function applied to a flow
extern int mapi_get_next_function_info(int fd, int fid, mapi_function_info_t *info);
//mapi stats
extern int mapi_stats(const char *dev, struct mapi_stat *stats);
#endif
......@@ -324,6 +324,8 @@ void *handle_request(void *arg) {
int errno;
char errorstr[MAPI_ERRORSTR_LENGTH], str[30], str_[30];
long file_size;
struct mapi_stat stats;
char* dev;
#ifdef RECONNECT
int flag = 0;
#endif
......@@ -588,6 +590,24 @@ void *handle_request(void *arg) {
dbuf->length = BASIC_SIZE;
break;
#endif
case MAPI_STATS:
dev=strdup(dbuf->data);
mapid_result = mapi_stats(dev, &stats);
//fprintf(stdout,"MAPI_STATS (%s, %d)\n",dbuf->data, mapid_result);
if(mapid_result <0) {
dbuf->cmd = MAPI_STATS_ERR;
mapi_read_error(&errno, errorstr);
memcpy(dbuf->data, &errno, sizeof(int));
dbuf->length = BASIC_SIZE+sizeof(int);
}
else {
dbuf->cmd = MAPI_STATS_ACK;
memcpy(dbuf->data, &stats, sizeof(struct mapi_stat));
memcpy(dbuf->data+sizeof(struct mapi_stat), dev, strlen(dev));
dbuf->length = BASIC_SIZE+sizeof(struct mapi_stat)+strlen(dev);
}
free(dev);
break;
default:
die("Default case found in handle_request loop!\n");
break;
......
......@@ -137,6 +137,7 @@ struct _mapidrv
};
int (*mapidrv_create_flow) (int devid, int fd, char **devtype);
int (*mapidrv_stats) (int devid, char **devtype, struct mapi_stat *stats);
int (*mapidrv_create_offline_flow) (int devid, int format, int fd,
char **devtype);
......@@ -2082,6 +2083,102 @@ ret:
mapiipc_daemon_write (&qbuf, sock);
}
static void
cmd_stats (char *device, int pid, uid_t uid, int sock) /*removed id, id==pid here */
//dev = device
//if = IPC id used to send ack message back to client
{
struct client *cl;
struct mapiipcbuf buf;
char *devtype;
mapidrv *drv;
int err = 0;
char* dev=device;
struct mapi_stat stats;
if (running_shutdown)
err = MAPI_SHUTTING_DOWN;
//Decide which driver to use
for (drv = drvlist; drv != NULL; drv = drv->next)
{
if (drv->device != NULL)
if (strcmp (dev, drv->device) == 0)
{
DEBUG_CMD (printf
("Using driver %s for %s [%s:%d]\n", drv->name, dev,__FILE__, __LINE__));
break;
}
}
if (drv == NULL)
{
DEBUG_CMD (fprintf
(stderr, "No driver found for %s [%s:%d]\n", dev, __FILE__,
__LINE__));
report_error (MAPID_NO_DRIVER, pid, sock);
return;
}
//Calls driver
if (err == 0)
{
mapidrv_stats =
get_drv_funct (drv->handle, "mapidrv_stats");
err = mapidrv_stats (drv->devid, &devtype, &stats);
}
if (err != 0)
{
buf.mtype = pid;
buf.cmd = MAPI_STATS_ERR;
buf.remote_errorcode=err;
buf.fd = 0;
mapiipc_daemon_write ((struct mapiipcbuf *) &buf, sock);
return;
}
else
{
//check if this is the first time we hear from this client
cl = flist_get (clientlist, pid);
if (cl == NULL)
{
cl = (struct client *) malloc (sizeof (struct client));
cl->pid = pid;
cl->sock = sock;
// init the list that holds references to the flows of this client
if ((cl->flowlist = malloc (sizeof (flist_t))) == NULL)
{
ERROR_CMD (printf
("cmd_create_flow:malloc new client struct: %s [%s:%d]\n",
strerror (errno), __FILE__, __LINE__));
exit (EXIT_FAILURE);
}
flist_init (cl->flowlist);
if ((cl->devicelist = malloc (sizeof (flist_t))) == NULL)
{
ERROR_CMD (printf
("cmd_create_flow:malloc new client struct: %s [%s:%d]\n",
strerror (errno), __FILE__, __LINE__));
exit (EXIT_FAILURE);
}
flist_init (cl->devicelist);
cl->numflows = 0;
cl->numdevs = 0;
flist_append (clientlist, pid, cl);
}
//Send ack back to user
buf.mtype = pid;
memcpy ((char *)buf.data, &stats, sizeof(struct mapi_stat));
buf.cmd = MAPI_STATS_ACK;
buf.fd = 0;
}
mapiipc_daemon_write (&buf, sock);
}
/*
* mfukar
......@@ -2725,9 +2822,10 @@ mapidcom ()
break;
case GET_DEVICE_INFO:
cmd_get_device_info (qbuf.fd, qbuf.pid, s);
break;
break;
case MAPI_STATS:
cmd_stats ((char *)qbuf.data, qbuf.pid, qbuf.uid, s);
break;
case APPLY_FUNCTION:
cmd_apply_function (&qbuf, qbuf.pid, s);
break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment