Commit 86125893 authored by Arne Øslebø's avatar Arne Øslebø
Browse files

rewritten to use same mechanisms as in res2file

git-svn-id: file:///home/svn/mapi/trunk@804 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 3cdaba60
......@@ -6,6 +6,8 @@
#include <sys/shm.h>
#include <string.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/sem.h>
#include "mapidflib.h"
#include "mapidlib.h"
......@@ -15,7 +17,11 @@
#include "fhelp.h"
#include "bucket.h"
#define BUCKET_SIZE 50
#define BUCKET_SIZE 1000
#define ONCE 0
#define ALWAYS 1
#define PERIODIC 2
struct bucket_ringbuffer
{
......@@ -28,15 +34,27 @@ struct bucket_ringbuffer
struct bucket_function_data
{
unsigned long long timeout;
unsigned long long starttime;
unsigned long long ticks;
unsigned long long last;
mapidflib_function_instance_t* funct;
int fd;
int fid;
int reset;
int save;
};
mapidflib_function_def_t* bucket_get_funct_info();
static int parse_save(char* save) {
if(strcmp(save,"-1")==0)
return ONCE;
else if(strcmp(save,"0")==0)
return ALWAYS;
else
return PERIODIC;
}
static int bucket_instance( mapidflib_function_instance_t *instance,
MAPI_UNUSED int flow_descr,
MAPI_UNUSED mapidflib_flow_mod_t *flow_mod)
......@@ -61,6 +79,12 @@ static int bucket_instance( mapidflib_function_instance_t *instance,
#endif
return MFUNCT_INVALID_ARGUMENT;
}
if(parse_save(str_time)==PERIODIC) {
//Move res2file in front of the other function results are read from
flow_mod->reorder=fid;
}
return 0;
}
......@@ -71,20 +95,15 @@ static int bucket_init(mapidflib_function_instance_t *instance,
struct bucket_function_data* fdata;
struct bucket_ringbuffer* rbuf;
unsigned long long timeout;
char *s;
mapiFunctArg* fargs;
int fid,fd;
fargs=instance->args;
fd=getargint(&fargs);
fid=getargint(&fargs);
timeout=fhlp_str2ull(getargstr(&fargs));
s=getargstr(&fargs);
timeout=fhlp_str2ull(s);
/* if(fhlp_get_function_instance(instance->hwinfo->gflist,fd,fid)==NULL)
{
#ifdef DEBUG
printf("Function not found: %u\n",fid);
#endif
return MFUNCT_INVALID_ARGUMENT;
}*/
instance->internal_data=(struct bucket_function_data*)malloc(sizeof(struct bucket_function_data));
fdata=(struct bucket_function_data*)instance->internal_data;
rbuf=(struct bucket_ringbuffer*)instance->result.data;
......@@ -95,8 +114,18 @@ static int bucket_init(mapidflib_function_instance_t *instance,
if((fdata->funct=fhlp_get_function_instance(instance->hwinfo->gflist,fd,fid))==NULL) {
return MFUNCT_INVALID_ARGUMENT;
}
fdata->starttime=0;
fdata->timeout=timeout;
fdata->last=0;
fdata->ticks=timeout;
fdata->save=parse_save(s);
if(fdata->save==PERIODIC && fdata->ticks==0)
return MFUNCT_INVALID_ARGUMENT_3;
fdata->reset=getargint(&fargs);
if(!(fdata->reset==0 || fdata->reset==1))
return MFUNCT_INVALID_ARGUMENT;
if(fhlp_create_semaphore(&rbuf->semaphore,1)!=0)
{
#ifdef DEBUG
......@@ -114,22 +143,49 @@ static int bucket_process(mapidflib_function_instance_t *instance,
struct bucket_function_data* fdata;
struct bucket_ringbuffer* rbuf;
unsigned long long *res;
int s=0;
struct sembuf sem_add={0,1,IPC_NOWAIT};
fdata=(struct bucket_function_data*)instance->internal_data;
rbuf=(struct bucket_ringbuffer*)instance->result.data;
res=(unsigned long long*)(fhlp_get_res(fdata->funct)->data);
if (pkt_head->ts>(fdata->starttime+fdata->timeout))
if(fdata->save==PERIODIC) {
if(fdata->last==0)
fdata->last=pkt_head->ts;
else if(pkt_head->ts-fdata->last>fdata->ticks) {
s=1;
fdata->last+=fdata->ticks;
while(fdata->last+fdata->ticks<pkt_head->ts) {
if(rbuf->writepos<rbuf->ringsize) {
rbuf->writepos++;
if(rbuf->writepos>=rbuf->ringsize)
rbuf->writepos=0;
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].timestamp=fdata->last;
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].data=(unsigned long long)0;
if(semop(rbuf->semaphore.id,&sem_add,1)==-1) {
//error....
#ifdef DEBUG
printf("Error: %u,%u\n",errno,EINVAL);
#endif
/* should be handled in some way */
}
fdata->last+=fdata->ticks;
}
}
}
}
if (fdata->save==ALWAYS || s==1)
{
//out of the bucket
if(rbuf->writepos<rbuf->ringsize)
{
if(rbuf->writepos<rbuf->ringsize) {
//allow overwrite of old data...
rbuf->writepos++;
if(rbuf->writepos>=rbuf->ringsize) rbuf->writepos=0;
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].timestamp=fdata->starttime+((pkt_head->ts-fdata->starttime)/fdata->timeout)*fdata->timeout;
fdata->starttime=fdata->starttime+((pkt_head->ts-fdata->starttime)/fdata->timeout)*fdata->timeout;
if(rbuf->writepos>=rbuf->ringsize)
rbuf->writepos=0;
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].timestamp=fdata->last;
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].data=*res;
if(semop(rbuf->semaphore.id,&sem_add,1)==-1)
{
//error....
......@@ -138,18 +194,14 @@ static int bucket_process(mapidflib_function_instance_t *instance,
#endif
/* should be handled in some way */
}
if(fdata->funct->def->reset!=NULL)
if(fdata->funct->def->reset!=NULL && fdata->reset==1)
{
//maybe move this to fhlp.c?
fdata->funct->def->reset(fdata->funct);
}
}
}
else
{
//only allow totals, maybe we could add cumulative results...
((struct bucket_data*)(((char*)rbuf)+rbuf->bucket_offset))[rbuf->writepos].data=*res;
}
}
return 1;
}
......@@ -161,11 +213,12 @@ static int bucket_client_read_result(mapidflib_function_instance_t *instance,map
data=(struct bucket_ringbuffer*)instance->result.data;
res->size=sizeof(struct bucket_data);
condid=semget(data->semaphore.key,1,IPC_CREAT|0660);
if(condid==-1)
{
#ifdef DEBUG
#ifdef DEBUG
printf("error in semget\n");
#endif
#endif
}
if(semop(condid,&sem_sub,1)==-1)
{
......@@ -185,13 +238,14 @@ static int bucket_client_init(mapidflib_function_instance_t *instance, void* dat
{
//done by mapi
return 0;
}
}
static mapidflib_function_def_t finfo={
"", //libname
"BUCKET", //name
"Sampling of packets\n\tReturn value: int\nParameters:\n\tint fd of resultfunction\n\tint fid of resultfunction", //descr
"rfs", //argdescr
"rfsi", //argdescr
MAPI_DEVICE_ALL, //devoid
MAPIRES_SHM,
sizeof(struct bucket_ringbuffer)+(BUCKET_SIZE)*sizeof(struct bucket_data),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment