Commit b153a842 authored by 's avatar

some changes in dimapi stuff


git-svn-id: file:///home/svn/mapi/trunk@743 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 8de7ab0c
......@@ -121,16 +121,18 @@ int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
host_flow* hflow;
flist_node_t* fnode;
pthread_mutex_lock(&rflow->mutex);
rflow->pending_msgs=0;
//pthread_mutex_lock(&rflow->mutex);
//rflow->pending_msgs=0;
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->fd=hflow->fd;
if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) return -1;
rflow->pending_msgs++;
//rflow->pending_msgs++;
}
//pthread_mutex_unlock(&rflow->mutex);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
sem_wait(&rflow->fd_sem);
}
pthread_mutex_unlock(&rflow->mutex);
sem_wait(&rflow->fd_sem);
return 0;
}
......@@ -142,8 +144,11 @@ void *mapiipc_comm_thread(void *host) {
host_flow* hflow;
int recv_bytes;
//unsigned int dbuf_bytes=0; DELETE
int pending;
//int pending;
int sockfd=((struct host *)host)->sockfd;
struct timeval tv1, tv2;
int cnt=-3;
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
......@@ -154,26 +159,8 @@ void *mapiipc_comm_thread(void *host) {
while (1) {
if (host==NULL) break;
/*if (dbuf_bytes==0 || (dbuf->length<DIMAPI_DATA_SIZE && dbuf_bytes<dbuf->length)) {
if (host==NULL) return NULL;
recv_bytes=recv(sockfd, (char*)&buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
if (recv_bytes == -1) {
//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
//exit(1);
//printf("error in recv\n");
continue ;
}
else if (recv_bytes == 0) {
//printf("Socket closed\n");
//exit(1);
return NULL;
}
dbuf_bytes += recv_bytes;
}
if (dbuf_bytes<dbuf->length) continue;*/
// gettimeofday(&tv1,NULL);
recv_bytes=readn(sockfd, dbuf, BASIC_SIZE);
if (recv_bytes == 0) { // the peer has gone
......@@ -202,7 +189,12 @@ void *mapiipc_comm_thread(void *host) {
continue;
}
}
// gettimeofday(&tv2,NULL);
// fprintf(stderr,"%d internal read delay %lu sec and %lu usec from host %s\n",cnt++,tv2.tv_sec-tv1.tv_sec, tv2.tv_usec-tv1.tv_usec,((struct host *)host)->hostname);
fflush(stderr);
// gettimeofday(&tv1,NULL);
hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
if (hflow!=NULL) {
rflow=flist_get(remote_flowlist, hflow->scope_fd);
......@@ -213,15 +205,15 @@ void *mapiipc_comm_thread(void *host) {
}
else {
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
pthread_mutex_lock(&rflow->mutex);
//pthread_mutex_lock(&rflow->mutex);
//if (rflow->pending_msgs>0) {
--rflow->pending_msgs;
pending=rflow->pending_msgs;
pthread_mutex_unlock(&rflow->mutex);
//--rflow->pending_msgs;
//pending=rflow->pending_msgs;
//pthread_mutex_unlock(&rflow->mutex);
if ( pending==0 ) {
//if ( pending==0 ) {
sem_post( &rflow->fd_sem );
}
//}
//}
//else pthread_mutex_unlock(&rflow->mutex);
}
......@@ -232,6 +224,9 @@ void *mapiipc_comm_thread(void *host) {
//failure
continue;
}
// gettimeofday(&tv2,NULL);
// fprintf(stderr,"%d internal read delay %lu sec and %lu usec from host %s\n",cnt++,tv2.tv_sec-tv1.tv_sec, tv2.tv_usec-tv1.tv_usec,((struct host *)host)->hostname);
//dbuf_bytes=dbuf_bytes-dbuf->length; DELETE
//memcpy(buffer,buffer+dbuf->length,dbuf_bytes); DELETE
......
......@@ -166,8 +166,8 @@ typedef struct remote_flowdescr {
flist_t* pkt_list; //FIFO list for get_next_pkt
sem_t fd_sem;
sem_t pkt_sem;
unsigned int pending_msgs;
pthread_mutex_t mutex;
//unsigned int pending_msgs;
//pthread_mutex_t mutex;
//struct mapipkt* pkt; DELETE
flist_t* function_res;
unsigned char is_connected; // This should be 1 if the flow is connected 0 otherwise
......
......@@ -434,7 +434,7 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
sem_destroy(&rflow->fd_sem);
sem_destroy(&rflow->pkt_sem);
pthread_mutex_destroy(&rflow->mutex);
//pthread_mutex_destroy(&rflow->mutex);
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
fnode2=flist_next(fnode);
......@@ -528,8 +528,8 @@ int mapi_create_flow(const char *dev)
rflow->fd=++fdseed;
sem_init(&rflow->fd_sem, 0, 0);
sem_init(&rflow->pkt_sem, 0, 0);
pthread_mutex_init(&rflow->mutex, NULL);
rflow->pending_msgs=0;
//pthread_mutex_init(&rflow->mutex, NULL);
//rflow->pending_msgs=0;
rflow->host_flowlist=(flist_t*)malloc(sizeof(flist_t));
flist_init(rflow->host_flowlist);
rflow->pkt_list=NULL;
......@@ -1025,20 +1025,20 @@ int mapi_close_flow(int fd)
}
//if (mapiipc_remote_write_to_all(rflow)<0) return -1;
pthread_mutex_lock(&rflow->mutex);
rflow->pending_msgs=0;
//pthread_mutex_lock(&rflow->mutex);
//rflow->pending_msgs=0;
rflow->is_connected=0;
for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
hflow=(host_flow*)fnode->data;
hflow->dbuf->fd=hflow->fd;
if (mapiipc_remote_write(hflow->dbuf, hflow->rhost)<0) {
pthread_mutex_unlock(&rflow->mutex);
//pthread_mutex_unlock(&rflow->mutex);
local_err = MCOM_SOCKET_ERROR;
return -1;
}
rflow->pending_msgs++;
//rflow->pending_msgs++;
}
pthread_mutex_unlock(&rflow->mutex);
//pthread_mutex_unlock(&rflow->mutex);
//no need to wait for results
/*for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=flist_next(fnode)) {
......
......@@ -38,7 +38,6 @@ int main() {
int new_sock = 0; /* client's socket descriptor (from connect()) */
socklen_t clnt_len; /* length of client address data structure */
int yes=1;
unsigned char* arg;
struct sockaddr_in serv_addr;
struct sockaddr_in clnt_addr;
......@@ -105,10 +104,7 @@ int main() {
}
printf("<*> got connection from %s\n", inet_ntoa(clnt_addr.sin_addr));
arg=(unsigned char*)malloc((sizeof(int)+sizeof(clnt_addr.sin_addr)));
memcpy(arg, &new_sock, sizeof(int));
memcpy(arg+sizeof(int), &clnt_addr.sin_addr, sizeof(clnt_addr.sin_addr));
if (pthread_create(&chld_thr, NULL, handle_request, (void *)arg) != 0) {
if (pthread_create(&chld_thr, NULL, handle_request, (void *)new_sock) != 0) {
die("pthread_create() failed");
continue;
}
......@@ -119,7 +115,7 @@ int main() {
void *handle_request(void *arg) {
int sock;
int sock=(int)arg;
int recv_bytes;
//char buffer[DIMAPI_DATA_SIZE]; DELETE
struct dmapiipcbuf *dbuf=NULL;
......@@ -133,12 +129,6 @@ void *handle_request(void *arg) {
mapi_flow_info_t flow_info;
struct timeval tv; /*used for timestamping results when produced */
struct mapipkt *pkt;
struct in_addr client;
char* dev_addr;
memcpy(&sock, (unsigned char*)arg, sizeof(int));
memcpy(&client, ((unsigned char*)arg)+sizeof(int), sizeof(struct in_addr));
free(arg);
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
......@@ -198,11 +188,7 @@ void *handle_request(void *arg) {
//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length); DELETE
switch(dbuf->cmd) {
case CREATE_FLOW:
dev_addr=(char*)malloc( (strlen((char *)dbuf->data)+strlen(inet_ntoa(client)))*sizeof(char) );
strcpy(dev_addr, (char *)dbuf->data); //secure
strcat(dev_addr, "#");
strcat(dev_addr, inet_ntoa(client));
mapid_result = mapi_create_flow(dev_addr);
mapid_result = mapi_create_flow(dbuf->data);
fprintf(stdout,"CREATE_FLOW (%s, %d)\n",dbuf->data, mapid_result);
if(mapid_result <0)
dbuf->cmd = ERROR_ACK;
......@@ -212,7 +198,6 @@ void *handle_request(void *arg) {
active_flows = realloc(active_flows,(ac_fl_size+1)*sizeof(int));
active_flows[ac_fl_size++] = mapid_result;
dbuf->length = BASIC_SIZE+sizeof(int);
free(dev_addr);
break;
case CLOSE_FLOW:
fprintf(stdout,"CLOSE_FLOW (%d)\n",dbuf->fd);
......
......@@ -607,14 +607,6 @@ cmd_create_flow (char *device, int pid, uid_t uid, int sock) /*removed id, id==p
mapidrv *drv;
int err = 0;
char* dev=device;
char* tmp=NULL;
char* client_addr;
tmp=strchr(device, '#');
if (tmp!=NULL) {
*tmp='\0';
client_addr=tmp+1;
}
//flist_node_t *tmpnode = NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment