Commit 14e6f8d9 authored by 's avatar
Browse files

DiMAPI implementation added

git-svn-id: file:///home/svn/mapi/trunk@162 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 91ca4393
......@@ -357,6 +357,7 @@ void delete_remote_flow(remote_flowdescr_t* rflow)
pthread_spin_unlock(&remote_ipc_lock);
sem_destroy(&rflow->fd_sem);
sem_destroy(&rflow->pkt_sem);
pthread_mutex_destroy(&rflow->mutex);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
......@@ -366,7 +367,7 @@ void delete_remote_flow(remote_flowdescr_t* rflow)
flist_remove(hflow->rhost->flows, hflow->fd, FLIST_LEAVE_DATA);
if (hflow->rhost->num_flows==0) {
mapiipc_remote_close(hflow->rhost); //close the socket
pthread_kill(*hflow->rhost->comm_thread, 9);
//pthread_kill(*hflow->rhost->comm_thread, 9);
flist_destroy(hflow->rhost->flows, FLIST_LEAVE_DATA);
free(hflow->rhost->flows);
free(hflow->rhost->hostname);
......@@ -416,6 +417,7 @@ int mapi_create_flow(char *dev)
rflow=(remote_flowdescr_t *)malloc(sizeof(remote_flowdescr_t));
rflow->fd=++fdseed;
sem_init(&rflow->fd_sem, 0, 0);
sem_init(&rflow->pkt_sem, 0, 0);
pthread_mutex_init(&rflow->mutex, NULL);
rflow->pending_msgs=0;
rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
......@@ -1394,8 +1396,8 @@ mapi_get_next_pkt(int fd,int fid)
hflow->dbuf->length=BASIC_SIZE;
mapiipc_remote_write(hflow->dbuf, hflow->rhost);
}
rflow->pending_msgs=1;
sem_wait(&rflow->fd_sem); //wait at least one host packet
rflow->pending_msgs=-1;
sem_wait(&rflow->pkt_sem); //wait at least one host packet
hflow=(host_flow*)flist_pop_first(rflow->pkt_list);
switch(hflow->dbuf->cmd) {
case GET_NEXT_PKT_ACK:
......@@ -1421,8 +1423,8 @@ mapi_get_next_pkt(int fd,int fid)
}
else {
//What if no packet arrived yet?
while ( flist_size(rflow->pkt_list)==0 );
//if no packet arrived yet wait
sem_wait(&rflow->pkt_sem);
hflow=(host_flow*)flist_pop_first(rflow->pkt_list);
......
......@@ -152,9 +152,15 @@ void *mapiipc_comm_thread(void *host) {
if (hflow!=NULL) {
rflow=flist_get(remote_flowlist, hflow->scope_fd);
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
if (dbuf->cmd==GET_NEXT_PKT_ACK) flist_append(rflow->pkt_list, 0, hflow);
if (dbuf->cmd==GET_NEXT_PKT_ACK)
{
flist_append(rflow->pkt_list, 0, hflow);
sem_post(&rflow->pkt_sem);
}
if (rflow->pending_msgs!=0) {
if (rflow->pending_msgs>0) {
pthread_mutex_lock(&rflow->mutex);
--rflow->pending_msgs;
pthread_mutex_unlock(&rflow->mutex);
......
......@@ -158,6 +158,7 @@ typedef struct remote_flowdescr {
flist_t* host_flowlist;
flist_t* pkt_list; //FIFO list for get_next_pkt
sem_t fd_sem;
sem_t pkt_sem;
unsigned int pending_msgs;
pthread_mutex_t mutex;
struct mapipkt* pkt;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment