Commit 856cbf2a authored by 's avatar
Browse files

fix bug for mapi_get_next_pkt in DiMAPI and some memory leaks



git-svn-id: file:///home/svn/mapi/trunk@574 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 4af71b7f
......@@ -418,6 +418,7 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
{
host_flow* hflow;
flist_node_t* fnode, *fnode2;
mapi_results_t* res;
pthread_spin_lock(&remote_ipc_lock);
flist_remove(remote_flowlist, rflow->fd,FLIST_LEAVE_DATA);
......@@ -449,11 +450,27 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
free(hflow->functions);
free(hflow->dev);
free(hflow->dbuf);
free(hflow->pkt);
flist_remove(rflow->host_flowlist,hflow->id,FLIST_FREE_DATA);
}
if (rflow->pkt_list!=NULL) flist_destroy(rflow->pkt_list, FLIST_LEAVE_DATA);
free(rflow->pkt);
flist_destroy(rflow->function_res, FLIST_FREE_DATA);
flist_destroy(rflow->host_flowlist, FLIST_LEAVE_DATA);
free(rflow->host_flowlist);
if (rflow->pkt_list!=NULL) {
flist_destroy(rflow->pkt_list, FLIST_LEAVE_DATA);
free(rflow->pkt_list);
}
//free(rflow->pkt); DELETE
for (fnode=flist_head(rflow->function_res); fnode!=NULL; fnode=fnode2 ) {
fnode2=flist_next(fnode);
res=(mapi_results_t*)fnode->data;
free(res->res);
free(res);
}
flist_destroy(rflow->function_res, FLIST_LEAVE_DATA);
#ifdef WITH_AUTHENTICATION
free(rflow->username);
free(rflow->vo);
#endif
free(rflow);
}
#endif
......@@ -502,9 +519,13 @@ int mapi_create_flow(const char *dev)
rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
flist_init(rflow->host_flowlist);
rflow->pkt_list=NULL;
rflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH);
//rflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH); DELETE
rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
rflow->is_connected = 0;
#ifdef WITH_AUTHENTICATION
rflow->username=NULL;
rflow->vo=NULL;
#endif
flist_init(rflow->function_res);
k=strtok(devp, ", ");
......@@ -552,6 +573,7 @@ int mapi_create_flow(const char *dev)
flist_append(h->flows, --negfdseed, hflow);
hflow->fd=negfdseed;
hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
hflow->pkt=(struct mapipkt*)malloc(sizeof(struct mapipkt)+PKT_LENGTH);
hflow->rhost=h;
hflow->functions=(flist_t *)malloc(sizeof(flist_t));
flist_init(hflow->functions);
......@@ -1040,8 +1062,10 @@ int mapi_close_flow(int fd)
if(f->def->client_cleanup!=NULL && f->funct->instance->status==MAPIFUNC_INIT){
f->def->client_cleanup(f->funct->instance);
}
if(f->result!=NULL)
if(f->result!=NULL) {
free(f->result->res);
free(f->result);
}
free(f->funct->instance);
free(f->funct);
free(f->data);
......@@ -1708,18 +1732,18 @@ mapi_get_next_pkt(int fd,int fid)
hflow->dbuf->fid=((function_data*)flist_get(hflow->functions, fid))->fid;
hflow->dbuf->length=BASIC_SIZE;
if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) {
local_err = MCOM_SOCKET_ERROR;
local_err = MCOM_SOCKET_ERROR;
return NULL;}
}
pthread_mutex_lock(&rflow->mutex);
rflow->pending_msgs=0;
pthread_mutex_unlock(&rflow->mutex);
//pthread_mutex_lock(&rflow->mutex); DELETE
//rflow->pending_msgs=0; DELETE
//pthread_mutex_unlock(&rflow->mutex); DELETE
sem_wait(&rflow->pkt_sem); //wait at least one host packet
hflow=(host_flow*)flist_pop_first(rflow->pkt_list);
switch(hflow->dbuf->cmd) {
case GET_NEXT_PKT_ACK:
memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE);
if (hflow==NULL) return NULL;
//switch(hflow->dbuf->cmd) { DELETE
// case GET_NEXT_PKT_ACK: DELETE
// memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE); DELETE
//send request for next packet from this host
hflow->dbuf->cmd=GET_NEXT_PKT;
......@@ -1730,18 +1754,19 @@ mapi_get_next_pkt(int fd,int fid)
local_err = MCOM_SOCKET_ERROR;
return NULL;
}
return rflow->pkt;
case ERROR_ACK:
printf("Error! mapi_get_next_pkt did not work!\n");
local_err = MCOM_ERROR_ACK;
return NULL;
default:
printf("Error! mapi_get_next_pkt did not work!\n");
local_err = MCOM_UNKNOWN_ERROR;
return NULL;
}
return NULL;
return hflow->pkt;
//case ERROR_ACK: DELETE
// printf("Error! mapi_get_next_pkt did not work!\n");
// local_err = MCOM_ERROR_ACK;
// return NULL;
//default:
// printf("Error! mapi_get_next_pkt did not work!\n");
// local_err = MCOM_UNKNOWN_ERROR;
// return NULL;
//}
//return NULL; DELETE
}
else {
......@@ -1749,10 +1774,11 @@ mapi_get_next_pkt(int fd,int fid)
sem_wait(&rflow->pkt_sem);
hflow=(host_flow*)flist_pop_first(rflow->pkt_list);
if (hflow==NULL) return NULL;
switch(hflow->dbuf->cmd) {
case GET_NEXT_PKT_ACK:
memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE);
//switch(hflow->dbuf->cmd) { DELETE
// case GET_NEXT_PKT_ACK:
// memcpy(rflow->pkt, hflow->dbuf->data, hflow->dbuf->length-BASIC_SIZE);
//send request for next packet from this host
hflow->dbuf->cmd=GET_NEXT_PKT;
......@@ -1763,15 +1789,15 @@ mapi_get_next_pkt(int fd,int fid)
local_err = MCOM_SOCKET_ERROR;
return NULL;
}
return rflow->pkt;
case ERROR_ACK:
printf("Error! mapi_get_next_pkt did not work!\n");
return NULL;
default:
printf("Error! mapi_get_next_pkt did not work!\n");
return NULL;
}
return NULL;
return hflow->pkt;
//case ERROR_ACK: DELETE
// printf("Error! mapi_get_next_pkt did not work!\n");
// return NULL;
//default:
// printf("Error! mapi_get_next_pkt did not work!\n");
// return NULL;
//}
//return NULL;
}
}
#endif
......
......@@ -167,16 +167,15 @@ void *mapiipc_comm_thread(void *host) {
hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
if (hflow!=NULL) {
rflow=flist_get(remote_flowlist, hflow->scope_fd);
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
if (dbuf->cmd==GET_NEXT_PKT_ACK)
{
if (dbuf->cmd==GET_NEXT_PKT_ACK) {
memcpy(hflow->pkt, dbuf->data, dbuf->length-BASIC_SIZE);
flist_append(rflow->pkt_list, 0, hflow);
sem_post(&rflow->pkt_sem);
}
pthread_mutex_lock(&rflow->mutex);
if (rflow->pending_msgs>0) {
}
else {
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
pthread_mutex_lock(&rflow->mutex);
//if (rflow->pending_msgs>0) {
--rflow->pending_msgs;
pending=rflow->pending_msgs;
pthread_mutex_unlock(&rflow->mutex);
......@@ -184,8 +183,9 @@ void *mapiipc_comm_thread(void *host) {
if ( pending==0 ) {
sem_post( &rflow->fd_sem );
}
//}
//else pthread_mutex_unlock(&rflow->mutex);
}
else pthread_mutex_unlock(&rflow->mutex);
}
else {
printf("Invalid IPC message, unknown fd %d\n",dbuf->fd);
......
......@@ -154,6 +154,7 @@ typedef struct host_flow {
int fd; //fd of flow in the mapid of host
int id;
struct dmapiipcbuf *dbuf; //buffer for writting results from this host -for this flow-
struct mapipkt* pkt;
flist_t *functions; //holds all fids for this host_flow
} host_flow;
......@@ -166,7 +167,7 @@ typedef struct remote_flowdescr {
sem_t pkt_sem;
unsigned int pending_msgs;
pthread_mutex_t mutex;
struct mapipkt* pkt;
//struct mapipkt* pkt; DELETE
flist_t* function_res;
unsigned char is_connected; // This should be 1 if the flow is connected 0 otherwise
/* int error;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment