Commit ff272742 authored by 's avatar

some IPC changes in agent

git-svn-id: file:///home/svn/mapi/trunk@580 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 00d3bb3e
......@@ -9,6 +9,7 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <errno.h>
#include "mapi.h"
#include "mapiipc.h"
#include "parseconf.h"
......@@ -20,7 +21,6 @@
extern void set_agent();
pthread_mutex_t lock;
int service_count;
int dimapi_port;
......@@ -116,11 +116,11 @@ void *handle_request(void *arg) {
int sock;
int recv_bytes;
char buffer[DIMAPI_DATA_SIZE];
//char buffer[DIMAPI_DATA_SIZE]; DELETE
struct dmapiipcbuf *dbuf=NULL;
int mapid_result;
mapi_results_t *result;
unsigned int dbuf_bytes=0;
//unsigned int dbuf_bytes=0; DELETE
int i;
int *active_flows = NULL;
int ac_fl_size=0;
......@@ -142,11 +142,11 @@ void *handle_request(void *arg) {
while(1) {
if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
/*if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
if (recv_bytes == 0) { /* the peer has gone */
if (recv_bytes == 0) { // the peer has gone
printf("Peer has gone\n");
break;
}
......@@ -159,9 +159,38 @@ void *handle_request(void *arg) {
}
if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;
if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;*/
recv_bytes=readn(sock, dbuf, BASIC_SIZE);
if (recv_bytes == 0) { // the peer has gone
printf("Peer has gone\n");
break;
}
else if (recv_bytes == -1) {
die("recv()");
break;
}
if (dbuf->length > DIMAPI_DATA_SIZE) {
fprintf(stderr,"Warning: Ignoring invalid message\n");
continue;
}
memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length);
if (dbuf->length-BASIC_SIZE>0) {
recv_bytes=readn(sock, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
if (recv_bytes == 0) { // the peer has gone
printf("Peer has gone\n");
break;
}
else if (recv_bytes == -1) {
die("recv()");
break;
}
}
//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length); DELETE
switch(dbuf->cmd) {
case CREATE_FLOW:
dev_addr=(char*)malloc( (strlen((char *)dbuf->data)+strlen(inet_ntoa(client)))*sizeof(char) );
......@@ -322,22 +351,18 @@ void *handle_request(void *arg) {
die("Default case found in handle_request loop!\n");
break;
}
/*if(dbuf->cmd==CLOSE_FLOW){
fprintf(stdout,"AGENT BREAKING\n");//XXX if only one flow exists
//break;//XXX it should only break if it is the last flow (we don't have that info)
}*/
//no need to send responce on mapi_close_flow
if (dbuf->cmd!=CLOSE_FLOW) {
send(sock, dbuf, dbuf->length, 0);
}
dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length;
memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes);
//dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length; DELETE
//memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes); DELETE
}
for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
if(active_flows[dbuf_bytes]>0){//this should always be positive or realloc does not work
if(active_flows[i]>0){//this should always be positive or realloc does not work
mapi_close_flow(active_flows[i]);
}
}
......@@ -372,3 +397,7 @@ int getfid(struct dmapiipcbuf *dbuf){
return (result);
}
......@@ -132,23 +132,25 @@ int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
void *mapiipc_comm_thread(void *host) {
//Reads an IPC message. Blocking call
char buffer[DIMAPI_DATA_SIZE];
//char buffer[DIMAPI_DATA_SIZE]; DELETE
struct dmapiipcbuf* dbuf;
remote_flowdescr_t* rflow;
host_flow* hflow;
int recv_bytes;
unsigned int dbuf_bytes=0;
//unsigned int dbuf_bytes=0; DELETE
int pending;
int sockfd=((struct host *)host)->sockfd;
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
dbuf=(struct dmapiipcbuf*)&buffer[0];
//dbuf=(struct dmapiipcbuf*)&buffer[0]; DELETE
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
while (1) {
if (dbuf_bytes==0 || (dbuf->length<DIMAPI_DATA_SIZE && dbuf_bytes<dbuf->length)) {
if (host==NULL) break;
/*if (dbuf_bytes==0 || (dbuf->length<DIMAPI_DATA_SIZE && dbuf_bytes<dbuf->length)) {
if (host==NULL) return NULL;
recv_bytes=recv(sockfd, (char*)&buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
if (recv_bytes == -1) {
......@@ -166,8 +168,36 @@ void *mapiipc_comm_thread(void *host) {
dbuf_bytes += recv_bytes;
}
if (dbuf_bytes<dbuf->length) continue;
if (dbuf_bytes<dbuf->length) continue;*/
recv_bytes=readn(sockfd, dbuf, BASIC_SIZE);
if (recv_bytes == 0) { // the peer has gone
//printf("Socket closed\n");
break;
}
else if (recv_bytes == -1) {
//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
continue;
}
if (dbuf->length > DIMAPI_DATA_SIZE) {
fprintf(stderr,"Bad IPC message from agent\n");
continue;
}
if (dbuf->length-BASIC_SIZE>0) {
recv_bytes=readn(sockfd, (char*)dbuf+BASIC_SIZE, dbuf->length-BASIC_SIZE);
if (recv_bytes == 0) { // the peer has gone
//printf("Socket closed\n");
break;
}
else if (recv_bytes == -1) {
//ERROR_CMD(printf("recv: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
continue;
}
}
hflow=(host_flow*)flist_get( ((struct host*)host)->flows, dbuf->fd );
if (hflow!=NULL) {
......@@ -193,15 +223,18 @@ void *mapiipc_comm_thread(void *host) {
}
}
else {
printf("Invalid IPC message, unknown fd %d\n",dbuf->fd);
fprintf(stderr,"Invalid IPC message, unknown fd %d\n",dbuf->fd);
//exit(-1);
//failure
continue;
}
dbuf_bytes=dbuf_bytes-dbuf->length;
memcpy(buffer,buffer+dbuf->length,dbuf_bytes);
//dbuf_bytes=dbuf_bytes-dbuf->length; DELETE
//memcpy(buffer,buffer+dbuf->length,dbuf_bytes); DELETE
}
free(dbuf);
return NULL;
}
......@@ -432,3 +465,29 @@ int mapiipc_read_fd(int sock)
return recvfd;
}
/* Read "n" bytes from a socket. */
ssize_t readn(int fd, void *vptr, size_t n) {
size_t nleft;
ssize_t nread;
char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0) {
errno=0;
if ( (nread = read(fd, ptr, nleft)) < 0) {
if (errno == EINTR)
nread = 0; /* and call read() again */
else
return(-1);
} else if (nread == 0)
return 0; /* EOF */
nleft -= nread;
ptr += nread;
}
return(n - nleft); /* return >= 0 */
}
......@@ -188,6 +188,8 @@ int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow);
void mapiipc_remote_close(struct host *h);
void *mapiipc_comm_thread(void *host);
/* Read "n" bytes from a socket. */
ssize_t readn(int fd, void *vptr, size_t n);
#endif//DIMAPI
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment