Commit 6203701d authored by Arne Øslebø's avatar Arne Øslebø

fixed race condition problem with mapi_read_result

git-svn-id: file:///home/svn/mapi/trunk@333 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 54981664
......@@ -23,7 +23,7 @@ WITH_DIMAPI=0
WITH_FUNCT_STATS=1
#Support for flow priorities
WITH_PRIORITIES=0
WITH_PRIORITIES=1
PRIORITIES=3
#Support for functions that modifies packets
......
......@@ -200,7 +200,7 @@ void *handle_request(void *arg) {
//fprintf(stdout,"READ_RESULT\n");
dbuf->cmd = READ_RESULT_ACK;
result = mapi_read_results(dbuf->fd,dbuf->fid,MAPI_REF);
result = mapi_read_results(dbuf->fd,dbuf->fid);
gettimeofday(&tv, &tz);
dbuf->timestamp = tv.tv_sec*1000000 + tv.tv_usec;
if(result!=NULL){
......
......@@ -40,7 +40,7 @@ int main(int argc, char* argv[]) {
while(1) { /* forever, report the load */
sleep(1);
dres = mapi_read_results(fd, fid, MAPI_COPY);
dres = mapi_read_results(fd, fid);
sum=0;
for (i=0; i<dres->cnt; i++)
......
......@@ -48,14 +48,14 @@ main(int argc,char **argv)
{
sleep(1);
if(isremote){
dres = (struct dmapi_results*) mapi_read_results(fd,counter,MAPI_COPY);
dres = (struct dmapi_results*) mapi_read_results(fd,counter);
if(dres)
printf("PKTS=%llu\n",*((unsigned long long*)dres->res[0].result));
else
printf("mapi_read_results failed!\n");
}
else{
res = mapi_read_results(fd,counter,MAPI_COPY);
res = mapi_read_results(fd,counter);
if(res)
printf("PKTS=%llu\n",*res);
else
......
......@@ -41,7 +41,7 @@ int main(int argc, char *argv[]){
while(loop--){
sleep(1);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid,MAPI_COPY);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid);
for (i=0; i<dres1->cnt; i++)
printf("pkts read in host %s: %llu (time produced: %llu time received: %llu)\n",dres1->res[i].host_dev, *((unsigned long long*)dres1->res[i].result), dres1->res[i].t_produced, dres1->res[i].t_received);
......
......@@ -46,8 +46,8 @@ int main(int argc, char *argv[]){
while(loop--){
sleep(1);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid1,MAPI_COPY);
dres2 = (struct dmapi_results*) mapi_read_results(fd,fid2,MAPI_COPY);
dres1 = (struct dmapi_results*) mapi_read_results(fd,fid1);
dres2 = (struct dmapi_results*) mapi_read_results(fd,fid2);
for (i=0; i<dres2->cnt; i++)
printf("pkts read in host %s: %llu (time produced: %llu time received: %llu)\n",dres2->res[i].host_dev, *((unsigned long long*)dres2->res[i].result), dres2->res[i].t_produced, dres2->res[i].t_received);
......
......@@ -38,7 +38,7 @@ int main(int argc, char *argv[])
while(1)
{
dres = (struct dmapi_results*) mapi_read_results(fd,fid,MAPI_COPY);
dres = (struct dmapi_results*) mapi_read_results(fd,fid);
for (i=0; i<dres->cnt; i++){
if(*((int*)dres->res[i].result)==0){
printf("host %s: ",dres->res[i].host_dev);
......
......@@ -31,7 +31,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
while (1)
{
cnt=((struct dmapi_results*)mapi_read_results(fd,fid,MAPI_REF))->res[0].result;
cnt=((struct dmapi_results*)mapi_read_results(fd,fid))->res[0].result;
sleep(1);
printf("results: %d\n",*cnt);
......
......@@ -107,6 +107,7 @@ typedef struct flowdescr {
int file; //File descriptor for offline flows
char *devtype;
char *shm_base;
pthread_spinlock_t *shm_spinlock;
flist_t *flist;
int error;
char errstr[MAPI_ERRORSTR_LENGTH];
......@@ -120,6 +121,7 @@ typedef struct functdescr {
mapidflib_function_def_t *def;
mapidflib_function_t *funct;
void *data;
void *result;
} functdescr_t;
typedef struct shm_result {
......@@ -132,8 +134,6 @@ typedef struct shm_result {
*/
static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data);
static void
default_read_result(functdescr_t* f, void* result);
int
get_results_info(flowdescr_t *flow, functdescr_t *f);
......@@ -845,6 +845,8 @@ int mapi_close_flow(int fd)
if(f->def->client_cleanup!=NULL && f->funct->instance->status==MAPIFUNC_INIT){
f->def->client_cleanup(f->funct->instance);
}
if(f->result!=NULL)
free(f->result);
free(f->funct);
free(f->data);
free(f);
......@@ -1231,6 +1233,7 @@ int mapi_apply_function(int fd, char* funct, ...)
f->def=fdef;
f->result_init=0;
f->data=NULL;
f->result=NULL;
f->funct=malloc(sizeof(mapidflib_function_t));
f->funct->fd=fd;
......@@ -1304,12 +1307,9 @@ int get_results_info(flowdescr_t *flow,functdescr_t *f)
// old signature: int mapi_read_results(int fd, int fid, void *result)
//Read result from a function
//This should be changed to:
// void* mapi_read_results(int fd, int fid,int copy);
//fd: flow descriptor
//fid: ID of function
//result: pointer to structure which results are copied to
void* mapi_read_results(int fd, int fid,enum mapi_read_result_method copy)
void* mapi_read_results(int fd, int fid)
{
flowdescr_t *flow;
functdescr_t *f;
......@@ -1392,39 +1392,28 @@ void* mapi_read_results(int fd, int fid,enum mapi_read_result_method copy)
if(f->def->client_init==NULL)
{
if(f->data == NULL)
return(0);
else
{
if(copy==MAPI_REF)
{
return ((shm_result_t*)f->data)->ptr;
}
else
{
result=(void *)malloc(((shm_result_t*)f->data)->size);
default_read_result(f,result);
return result;
}
}
if(f->data == NULL)
return(0);
else
{
pthread_spin_lock(flow->shm_spinlock);
memcpy(f->result,((shm_result_t*)f->data)->ptr,((shm_result_t*)f->data)->size);
pthread_spin_unlock(flow->shm_spinlock);
return f->result;
}
}
else
{
int func_err=f->def->client_read_result(f->funct->instance,&res);
if(func_err!=0)
{
mapi_set_error(flow, func_err);
return NULL;
}
if(copy==MAPI_COPY)
{
result=(void *)malloc(res.size);
memcpy(result,res.res,res.size);
return result;
}
else
return res.res;
int func_err=f->def->client_read_result(f->funct->instance,&res);
if(func_err!=0)
{
mapi_set_error(flow, func_err);
return NULL;
}
result=(void *)malloc(res.size);
memcpy(result,res.res,res.size);
return result;
}
}
else
......@@ -1432,7 +1421,7 @@ void* mapi_read_results(int fd, int fid,enum mapi_read_result_method copy)
mapi_set_error(flow, MAPI_INVALID_FID_FUNCID);
return NULL;
}
return 0;
}
......@@ -1705,6 +1694,8 @@ static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data)
{
mapid_shm_t *shm=data;
mapid_shm_t *shm_spinlock=(mapid_shm_t*)((char*)data+sizeof(mapid_shm_t));
int id;
if (flow==NULL) {
......@@ -1727,6 +1718,24 @@ default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data)
return -1;
}
}
if(!flow->shm_spinlock) {
//Get pointer to shared spinlock memory
id=shmget(shm_spinlock->key, shm_spinlock->buf_size, 660);
if(id<0)
{
DEBUG_CMD(printf("Shared memory error [%s:%d]\n",__FILE__,__LINE__));
mapi_set_error(flow, MAPI_SHM_ERR);
return -1;
}
if ((flow->shm_spinlock=shmat(id, 0, FUNCTION_SHM_PERMS))==NULL) {
mapi_set_error(flow, MAPI_SHM_ERR);
return -1;
}
}
f->data=malloc(sizeof(shm_result_t));
((shm_result_t*)f->data)->ptr=flow->shm_base+shm->offset;
((shm_result_t*)f->data)->size=shm->res_size;
......@@ -1735,14 +1744,12 @@ default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data)
f->funct->instance->result.data=flow->shm_base+shm->offset;
f->funct->instance->result.data_size=shm->res_size;
f->result=(void *)malloc(((shm_result_t*)f->data)->size);
return 0;
}
static void
default_read_result(functdescr_t* f, void* result)
{
memcpy(result,((shm_result_t*)f->data)->ptr,((shm_result_t*)f->data)->size);
}
int mapi_get_function_info(int fd,int fid, mapi_function_info_t *info)
{
......
......@@ -182,8 +182,7 @@ int mapi_loop(int fd, int fid, int cnt, mapi_handler);
//Read result from a function
//This should be changed to:
void* mapi_read_results(int fd, int fid,enum mapi_read_result_method copy);
//int mapi_read_results(int fd, int fid,void *res);
void* mapi_read_results(int fd, int fid);
//Close a mapi flow
int mapi_close_flow(int fd);
......
......@@ -1644,9 +1644,11 @@ cmd_read_results (int fd, int fid, int pid, int sock)
buf.fd = fd;
//Copy result data
if (result->funct_res_size + sizeof (mapid_shm_t) < DATA_SIZE)
if (result->funct_res_size + 2*sizeof (mapid_shm_t) < DATA_SIZE)
{
memcpy (p, &result->shm, sizeof (mapid_shm_t));
p += sizeof (result->shm);
memcpy (p, &result->shm_spinlock, sizeof (mapid_shm_t));
if (result->funct_res_size > 0)
{
p += sizeof (result->shm);
......
......@@ -37,6 +37,7 @@ typedef struct mapid_result {
void *funct_res; //Pointer to function specific result data
unsigned funct_res_size; //size of result
mapid_shm_t shm;
mapid_shm_t shm_spinlock;
} mapid_result_t;
typedef struct global_function_list {
......
......@@ -143,6 +143,7 @@ process_pkts(void *buf,unsigned len, dag_instance_t *i)
rec = (dag_record_t *) buf;
rlen = ntohs (rec->rlen);
mapid_lock(&i->mapidlib);
while (c + rlen < len)
{
char *p = buf;
......@@ -164,6 +165,7 @@ process_pkts(void *buf,unsigned len, dag_instance_t *i)
rlen = ntohs (rec->rlen);
i->hwinfo.pkts++;
}
mapid_unlock(&i->mapidlib);
return len - c;
......@@ -181,6 +183,7 @@ process_pkts_offline(void *buf,unsigned len, dag_instance_t *i)
rec = (dag_record_t *) buf;
rlen = ntohs (rec->rlen);
mapid_lock(&i->mapidlib);
while (c + rlen <= len && rlen!=0)
{
char *p = buf;
......@@ -202,6 +205,7 @@ process_pkts_offline(void *buf,unsigned len, dag_instance_t *i)
rlen = ntohs (rec->rlen);
i->hwinfo.pkts++;
}
mapid_unlock(&i->mapidlib);
return len - c;
......
......@@ -31,6 +31,9 @@ mapid_init(mapidlib_instance_t *i)
//infor = pointer to info
{
char buf[1024],*str,*s;
char pathname[MAPI_STR_LENGTH];
int fd;
//Copy devtype
/*
i->flowlist=malloc(sizeof(flist_t));
......@@ -73,10 +76,43 @@ mapid_init(mapidlib_instance_t *i)
free(mapid_conf);
}
//Initialized spinlock in shared memory
//Allocate shared memory
strncpy(pathname,FUNCTION_SHM_TEMPLATE,MAPI_STR_LENGTH);
if(mktemp(pathname)==NULL)
return MDLIB_SHM_ERR;
umask(017);
if((fd=open(pathname,O_CREAT|O_EXCL,FUNCTION_SHM_PERMS))<0)
return MDLIB_SHM_ERR;
else
close(fd);
strncpy(i->shm_spinlock_fname,pathname,MAPI_STR_LENGTH);
if((i->shm_spinlock_key=ftok(pathname,FUNCTION_SHM_PROJECT_ID))<0)
return MDLIB_SHM_ERR;
if((i->shm_spinlock_id=shmget(i->shm_spinlock_key,sizeof(pthread_spinlock_t),FUNCTION_SHM_PERMS | IPC_CREAT)) < 0)
return MDLIB_SHM_ERR;
if((i->shm_spinlock=shmat(i->shm_spinlock_id,0,0))==NULL)
return MDLIB_SHM_ERR;
//Initialize spinlock
pthread_spin_init (i->shm_spinlock, PTHREAD_PROCESS_SHARED);
return 0;
};
void mapid_lock(mapidlib_instance_t *i) {
pthread_spin_lock(i->shm_spinlock);
}
void mapid_unlock(mapidlib_instance_t *i) {
pthread_spin_unlock(i->shm_spinlock);
}
//Function that can be used by MAPI functions for adding new functions to the flow
static int mapid_add_funct(mapidlib_instance_t *i, int fd, char *funct, ...)
{
......@@ -188,7 +224,7 @@ static void delete_flow(mapidlib_instance_t *i,struct mapidlibflow *flow) {
pthread_spin_lock(lock);
#ifdef WITH_PRIORITIES
f=flist_priorities_remove(i->flowlist,flow->fd,FLIST_LEAVE_DATA);
f=flist_remove(i->flowlist[flow->priority],flow->fd,FLIST_LEAVE_DATA);
#else
f=flist_remove(i->flowlist,flow->fd,FLIST_LEAVE_DATA);
#endif
......@@ -316,6 +352,12 @@ mapid_connect(mapidlib_instance_t *i,int fd)
fi->result.info.shm.offset=offset;
fi->result.data=flow->shm+offset;
offset+=fi->result.data_size;
//Set information about spinlock
fi->result.info.shm_spinlock.key=i->shm_spinlock_key;
fi->result.info.shm_spinlock.buf_size=i->shm_spinlock_size;
fi->result.info.shm.offset=0;
}
if(fi->def->init!=NULL) {
DEBUG_CMD(printf("Initializing function %s\tfid=%d [%s:%d]\n",fi->def->name,f->fid,__FILE__,__LINE__));
......
......@@ -19,6 +19,12 @@ typedef struct mapidlib_instance {
#endif
// flist_t *flowlist;//=NULL; //List of flows
unsigned fcount; //Number of functions
pthread_spinlock_t *shm_spinlock; //Pointer to start of shared memory that contains a spinlock
unsigned long long shm_spinlock_size; //Size of shared memory
key_t shm_spinlock_key;
int shm_spinlock_id;
char shm_spinlock_fname[MAPI_STR_LENGTH];
}mapidlib_instance_t;
typedef int (*mapid_add_function)(mapidlib_instance_t *i, int fd, char *funct, ...);
......@@ -85,6 +91,8 @@ int mapid_get_errno(mapidlib_instance_t *i,int fid);
int mapid_get_devid(mapidlib_instance_t *i,int fd);
int mapid_load_library(char *lib);
int mapid_get_flow_info(mapidlib_instance_t *i,int fd,mapi_flow_info_t *info);
void mapid_lock(mapidlib_instance_t *i);
void mapid_unlock(mapidlib_instance_t *i);
mapid_funct_info_t* mapid_get_flow_functions(mapidlib_instance_t *i,int fd);
......
......@@ -147,6 +147,8 @@ process_pkts(void *buf,unsigned len, nic_instance_t *i,MAPI_UNUSED int devid, in
rec = (struct pcap_pkthdr *) buf;
rlen = rec->caplen+sizeof(struct pcap_pkthdr);
mapid_lock(&i->mapidlib);
while (c + rlen <= len)
{
char *p = buf;
......@@ -172,6 +174,7 @@ process_pkts(void *buf,unsigned len, nic_instance_t *i,MAPI_UNUSED int devid, in
rec = (struct pcap_pkthdr *) buf;
rlen = rec->caplen+sizeof(struct pcap_pkthdr);
}
mapid_unlock(&i->mapidlib);
return len - c;
......@@ -248,7 +251,9 @@ mapidrv_proc_loop (int devid)
i->hwinfo.pkts++;
// Process packet
mapid_lock(&i->mapidlib);
mapid_process_pkt(&i->mapidlib,packet,packet,&mhdr);
mapid_unlock(&i->mapidlib);
}
}
......
......@@ -35,9 +35,9 @@ int main(MAPI_UNUSED int argc, char *argv[])
sleep(2);
cntc = mapi_read_results(fd,fidc,MAPI_REF);
cntc = mapi_read_results(fd,fidc);
cntf = mapi_read_results(fd,fidf,MAPI_REF);
cntf = mapi_read_results(fd,fidf);
mapi_close_flow(fd);
......
......@@ -5,7 +5,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
{
int fd;
int fid;
unsigned int *byte;
unsigned long long *byte;
if(!argv[1])
return -1;
......@@ -26,7 +26,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
// while(1)
// {
byte = mapi_read_results(fd, fid, MAPI_REF);
byte = mapi_read_results(fd, fid);
// printf("\nBytes till now : %d\n", *byte);
// }
......
......@@ -54,8 +54,8 @@ int main(MAPI_UNUSED int argc, char *argv[])
while(1)
{
sleep(1);
cnt = mapi_read_results(fd,fid,MAPI_REF);
cnt2 = mapi_read_results(fd,fid2,MAPI_REF);
cnt = mapi_read_results(fd,fid);
cnt2 = mapi_read_results(fd,fid2);
printf("\nPackets:: Before COOKING: %d After COOKING: %d",*cnt,*cnt2);
}
......
......@@ -56,7 +56,7 @@ int main(void)
}
while(1) {
cr = mapi_read_results(fd,counter_all,MAPI_REF);
cr = mapi_read_results(fd,counter_all);
pkt=mapi_get_next_pkt(fd,bufid);
printf("pkts:%lld\n",*cr);
}
......
......@@ -50,7 +50,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
printf("4. Error: %d - %s\n", err_no, err_buffer);
}
cn=mapi_read_results(fd, 43, 1);
cn=mapi_read_results(fd, 43);
if((cn) == NULL)
{
char err_buffer[1024];
......
......@@ -68,7 +68,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
else
printf("Error in mapi_get_next_packet\n");
cnt = mapi_read_results(fd2, fid_cnt2, MAPI_COPY);
cnt = mapi_read_results(fd2, fid_cnt2);
// cnt2 = mapi_read_results(fd2, fid_cnt2, MAPI_COPY);
printf("cnt: %d\n", *cnt);
}
......
......@@ -28,7 +28,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
// {
sleep(2);
cnt = mapi_read_results(fd, fid, MAPI_REF);
cnt = mapi_read_results(fd, fid);
printf("\nRead_results : %d\n", *cnt);
// }
......
......@@ -45,9 +45,9 @@ int main(MAPI_UNUSED int argc, char *argv[])
mapi_get_flow_info(fd,&info);
} while(info.status!=FLOW_FINISHED);
cntc = mapi_read_results(fd,fidc,MAPI_REF);
cntc = mapi_read_results(fd,fidc);
cntf = mapi_read_results(fd,fidf,MAPI_REF);
cntf = mapi_read_results(fd,fidf);
//printf("\nPackets read :%d\t Packets filtered : %d", *cntc, *cntf);
......
......@@ -33,7 +33,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
mapi_connect(fd);
res=&((struct ethereal_reti_data*)mapi_read_results(fd,fid,MAPI_REF))->result;
res=&((struct ethereal_reti_data*)mapi_read_results(fd,fid))->result;
do
{
......
......@@ -37,7 +37,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
}
mapi_connect(fd);
res=((struct ethereal_rets_data*)mapi_read_results(fd,fid,MAPI_REF))->result;
res=((struct ethereal_rets_data*)mapi_read_results(fd,fid))->result;
do
{
......
......@@ -39,20 +39,20 @@ int main(MAPI_UNUSED int argc, char *argv[])
}
mapi_start_offline_device(device);
cnt = mapi_read_results(flows[0],fid[0],MAPI_REF);
prev=now=0;
do
{
prev=now;
cnt = mapi_read_results(flows[0],fid[0]);
now = *cnt;
if(now<prev)
printf("WE HAVE A FUCKUP\n");
printf("Should not happen!\n");
mapi_get_flow_info(flows[0],&info);
}while(info.status!=FLOW_FINISHED);
printf("\n");
for(i = 0;i<NUMBER;i++)
{
cnt = mapi_read_results(flows[i],fid[i],MAPI_REF);
cnt = mapi_read_results(flows[i],fid[i]);
printf("\nPacket read from flow %d : %d\n",i, *cnt);
mapi_close_flow(flows[i]);
......
......@@ -38,7 +38,7 @@ int main(MAPI_UNUSED int argc, char *argv[])
mapi_get_flow_info(fd,&info);
} while(info.status!=FLOW_FINISHED);
stats = mapi_read_results(fd,fid,MAPI_REF);
stats = mapi_read_results(fd,fid);
if(stats->count!=893 || stats->sum!=283464.0 ||
......
......@@ -39,8 +39,8 @@ int main(MAPI_UNUSED int argc, char *argv[])
mapi_get_flow_info(fd,&info);
} while(info.status!=FLOW_FINISHED);
cntc = mapi_read_results(fd,fidc,MAPI_REF);
cnts = mapi_read_results(fd,fids,MAPI_REF);
cntc = mapi_read_results(fd,fidc);
cnts = mapi_read_results(fd,fids);
if(*cntc == 10 && *cnts == 1)
printf("\nOffline STR_SEARCH OK\n");
......
......@@ -28,7 +28,7 @@ int main(MAPI_UNUSED int argc, char *argv[])