Commit 5783ad06 authored by 's avatar
Browse files

fixed bugs in reconnection

git-svn-id: file:///home/svn/mapi/trunk@1153 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 5dffe5d9
......@@ -129,58 +129,60 @@ int mapiipc_write(struct mapiipcbuf *qbuf){ // sends an IPC message to mapid
WARNING_CMD(printf("send: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
#ifdef RECONNECT
int check_net;
offline_device *device = NULL;
flist_node_t *fnode = NULL;
flowdescr_t* flow = NULL;
functdescr_t *fun = NULL;
printf("\n ---> Mapid is down\n");
check_net = check_network_mapid();
if(check_net == 1) // network is up FIXME XXX
printf("Network up ...\n");
else{ // network is down
//printf("\nNetwork down ...\n");
restore_network_mapid();
mapi_create_offline_device_mapid();
mapi_recreate_flow_mapid();
mapi_reapply_function_mapid();
mapi_reconnect_mapid();
mapi_start_offline_device_mapid();
mapi_read_results_mapid();
// special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device )
if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){
// find specified device in offline_device_list
for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){
device = (offline_device *)fnode->data;
if(!strcmp(device->previous_device, (char *) qbuf->data)){
strncpy((char *) qbuf->data, device->new_device, DATA_SIZE); // the new device name
break;
}
}
}
else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || // FIXME (CLOSE_FLOW ...)
qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){
restore_network_mapid();
mapi_create_offline_device_mapid();
mapi_recreate_flow_mapid();
mapi_reapply_function_mapid();
mapi_reconnect_mapid();
mapi_start_offline_device_mapid();
mapi_read_results_mapid();
// special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device )
if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
qbuf->fd = flow->fd;
}
else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO){
// find specified device in offline_device_list
for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
device = (offline_device *)fnode->data;
if(!strcmp(device->previous_device, (char *) qbuf->data)){
strncpy((char *) qbuf->data, device->new_device, DATA_SIZE); // the new device name
break;
}
}
}
// FIXME (CLOSE_FLOW ...)
else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
qbuf->fd = flow->fd;
}
else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO){
send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL);
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
}
else if(qbuf->cmd == READ_RESULT && qbuf->fd != -1 && qbuf->fid != -1){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
}
send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL);
#else
return -1;
#endif
......@@ -311,6 +313,7 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
#ifdef RECONNECT
int check_net;
struct dmapiipcbuf dbuf_;
#endif
/* Guarantees that thread resources are deallocated upon return */
......@@ -346,12 +349,25 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
else{ // network is down
printf("\nNetwork down ...\n");
restore_network_mapicommd((struct host *) host);
dbuf_.cmd = IGNORE_SLEEP;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
mapi_recreate_flow((struct host *) host);
mapi_reapply_function((struct host *) host);
mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
mapi_reauthenticate((struct host *) host);
#endif
dbuf_.cmd = IGNORE_NOTIFY;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
check_mapi_functions((struct host *) host);
mapi_get_next_packet((struct host *) host);
sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked
......@@ -373,12 +389,25 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
else{ // network is down
printf("\nNetwork down ...\n");
restore_network_mapicommd((struct host *) host);
dbuf_.cmd = IGNORE_SLEEP;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
mapi_recreate_flow((struct host *) host);
mapi_reapply_function((struct host *) host);
mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
mapi_reauthenticate((struct host *) host);
#endif
dbuf_.cmd = IGNORE_NOTIFY;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
check_mapi_functions((struct host *) host);
mapi_get_next_packet((struct host *) host);
sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked
......@@ -412,12 +441,25 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
else{ // network is down
printf("\nNetwork down ...\n");
restore_network_mapicommd((struct host *) host);
dbuf_.cmd = IGNORE_SLEEP;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
mapi_recreate_flow((struct host *) host);
mapi_reapply_function((struct host *) host);
mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
mapi_reauthenticate((struct host *) host);
#endif
dbuf_.cmd = IGNORE_NOTIFY;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
check_mapi_functions((struct host *) host);
mapi_get_next_packet((struct host *) host);
sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked
......@@ -439,12 +481,25 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
else{ // network is down
printf("\nNetwork down ...\n");
restore_network_mapicommd((struct host *) host);
dbuf_.cmd = IGNORE_SLEEP;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
mapi_recreate_flow((struct host *) host);
mapi_reapply_function((struct host *) host);
mapi_reconnect((struct host *) host);
#ifdef WITH_AUTHENTICATION
mapi_reauthenticate((struct host *) host);
#endif
dbuf_.cmd = IGNORE_NOTIFY;
dbuf_.length = BASIC_SIZE;
if(mapiipc_remote_write(&dbuf_, (struct host *) host) < 0) // send an IPC message to mapicommd
break;
check_mapi_functions((struct host *) host);
mapi_get_next_packet((struct host *) host);
sem_trywait(& ((struct host *) host)->connection); // lock the semaphore only if the semaphore is currently not locked
......@@ -834,6 +889,7 @@ int check_network_mapicommd(struct host *h){
if(connect(sockfd, (struct sockaddr *)&remote_address, sizeof(remote_address)) < 0) // remote mapid server is down ...
return 0;
shutdown(sockfd, SHUT_RDWR);
close(sockfd);
return 1;
}
......@@ -1094,7 +1150,7 @@ void mapi_reapply_function(struct host *h){
}
else{
fprintf(stderr, "\nError: Could not re-apply function %s in host %s [%s : %d]\n", fdata->fdef->name, hflow->rhost->hostname,
__FILE__, __LINE__);
__FILE__, __LINE__);
free(dbuf); dbuf = NULL;
exit(EXIT_FAILURE);
}
......
......@@ -93,7 +93,9 @@ typedef enum {
GET_LIBS_NACK,
SEND_FD,
GET_NEXT_PKT,
GET_NEXT_PKT_ACK
GET_NEXT_PKT_ACK,
IGNORE_SLEEP, // reconnection ...
IGNORE_NOTIFY
} mapiipcMsg;
......
......@@ -155,7 +155,7 @@ typedef struct shm_result {
static int
default_read_result_init(flowdescr_t *flow,functdescr_t* f,void* data);
int
get_results_info(flowdescr_t *flow, functdescr_t *f);
get_results_info(flowdescr_t *flow, functdescr_t *f, int user_fd, int user_fid);
//static int set_error(void* flow, int err_no , int is_remote);
......@@ -1878,12 +1878,16 @@ int mapi_apply_function(int fd, const char* funct, ...)
return qbuf.fid;
}
int _request_result(flowdescr_t *flow,functdescr_t *f, struct mapiipcbuf *qbuf)
int _request_result(flowdescr_t *flow,functdescr_t *f, struct mapiipcbuf *qbuf, int user_fd, int user_fid)
{
qbuf->mtype=1;
qbuf->cmd=READ_RESULT;
qbuf->fd=flow->fd;
qbuf->fid=f->fid;
#ifdef RECONNECT
qbuf->user_fd = user_fd;
qbuf->user_fid = user_fid;
#endif
qbuf->pid=getpid();
pthread_spin_lock(&mapi_lock);
......@@ -1912,7 +1916,7 @@ int _request_result(flowdescr_t *flow,functdescr_t *f, struct mapiipcbuf *qbuf)
return 0;
}
int get_results_info(flowdescr_t *flow,functdescr_t *f)
int get_results_info(flowdescr_t *flow,functdescr_t *f, int user_fd, int user_fid)
{
struct mapiipcbuf qbuf;
......@@ -1922,7 +1926,7 @@ int get_results_info(flowdescr_t *flow,functdescr_t *f)
return -1;
}
if (_request_result(flow,f,&qbuf)!=0)
if (_request_result(flow,f,&qbuf, user_fd, user_fid)!=0)
return -1;
default_read_result_init(flow,f,&qbuf.data);
......@@ -2055,7 +2059,7 @@ mapi_results_t* mapi_read_results(int fd, int fid)
if(f!=NULL)
{
if(!f->result_init)
if (get_results_info(flow,f) != 0)
if (get_results_info(flow,f, fd, fid) != 0)
return NULL;
if(f->def->client_init==NULL){
......@@ -2086,15 +2090,15 @@ mapi_results_t* mapi_read_results(int fd, int fid)
}
pthread_spin_unlock(&mapi_lock);
#endif
if(f->data == NULL)
if(f->data == NULL)
return(0);
else // FIXME in case of reconnection
else // FIXME in case of reconnection
{
if (f->def->restype==MAPIRES_IPC) {
struct mapiipcbuf qbuf;
void *data = (char *)&qbuf.data + 2*sizeof(mapid_shm_t); // get to the actual data
if (_request_result(flow,f,&qbuf) != 0) {
if (_request_result(flow,f,&qbuf, fd, fid) != 0) {
local_err = MCOM_UNKNOWN_ERROR;
return NULL;
}
......@@ -2270,7 +2274,7 @@ mapi_get_next_pkt(int fd,int fid)
// This should be attaching shared memory segment with results
if( !f->result_init )
if (get_results_info(flow,f) != 0) {
if (get_results_info(flow,f, fd, fid) != 0) {
DEBUG_CMD(printf("Missing error message [%s:%d]\n",__FILE__,__LINE__));
return NULL;
}
......
......@@ -24,7 +24,6 @@
extern void set_agent();
//pthread_mutex_t lock; DELETE
int service_count;
int dimapi_port;
......@@ -66,8 +65,6 @@ int main() {
signal (SIGQUIT, mapicommd_shutdown);
signal (SIGINT, mapicommd_shutdown);
#endif
//mapi_conf = malloc(sizeof(CONF_FILE)-2+strlen(getenv("HOME")));
//sprintf(mapi_conf,CONF_FILE,getenv("HOME"));
mapi_conf = printf_string( CONFDIR"/"CONF_FILE );
printf("using %s\n", mapi_conf);
......@@ -187,11 +184,9 @@ void *handle_request(void *arg) {
int sock=(int)arg;
#endif
int recv_bytes;
//char buffer[DIMAPI_DATA_SIZE]; DELETE
struct dmapiipcbuf *dbuf=NULL;
int mapid_result;
mapi_results_t *result;
//unsigned int dbuf_bytes=0; DELETE
int i;
int *active_flows = NULL;
int ac_fl_size=0;
......@@ -201,11 +196,14 @@ void *handle_request(void *arg) {
struct mapipkt *pkt;
int errno;
char errorstr[MAPI_ERRORSTR_LENGTH];
#ifdef RECONNECT
int flag = 0;
#endif
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
#ifdef DIMAPISSL
printf("<+> new thread %d, socket number = %d\n", (int)pthread_self(),(int) con);
......@@ -219,25 +217,6 @@ void *handle_request(void *arg) {
while(1) {
/*if (dbuf_bytes==0 || dbuf_bytes<((struct dmapiipcbuf *)buffer)->length){
recv_bytes = recv(sock, (char*)buffer+dbuf_bytes, DIMAPI_DATA_SIZE, 0);
if (recv_bytes == 0) { // the peer has gone
printf("Peer has gone\n");
break;
}
else if (recv_bytes == -1) {
die("recv()");
break;
}
dbuf_bytes += recv_bytes;
}
if (dbuf_bytes<((struct dmapiipcbuf *)buffer)->length) continue;*/
#ifdef DIMAPISSL
recv_bytes = SSL_readn(con,dbuf,BASIC_SIZE);
#else
......@@ -272,8 +251,21 @@ void *handle_request(void *arg) {
break;
}
}
#ifdef RECONNECT
if(dbuf->cmd == IGNORE_SLEEP){ // ignore some messages
flag = 1;
continue;
}
if(dbuf->cmd == IGNORE_NOTIFY){ // accept all kind of messages
flag = 0;
continue;
}
//memcpy(dbuf,buffer,((struct dmapiipcbuf *)buffer)->length); DELETE
if(dbuf->cmd != CREATE_FLOW && dbuf->cmd != APPLY_FUNCTION && dbuf->cmd != CONNECT && dbuf->cmd != AUTHENTICATE && flag == 1)
continue;
#endif
switch(dbuf->cmd) {
case CREATE_FLOW:
mapid_result = mapi_create_flow(dbuf->data);
......@@ -334,10 +326,9 @@ void *handle_request(void *arg) {
}
break;
case READ_RESULT:
//fprintf(stdout,"READ_RESULT\n");
dbuf->cmd = READ_RESULT_ACK;
result = mapi_read_results(dbuf->fd,dbuf->fid);
if(result!=NULL){
dbuf->cmd = READ_RESULT_ACK;
dbuf->timestamp = result->ts;
memcpy(dbuf->data, result->res, result->size);
dbuf->length = BASIC_SIZE + result->size;
......@@ -351,7 +342,6 @@ void *handle_request(void *arg) {
}
break;
case GET_NEXT_PKT:
//fprintf(stdout,"GET_NEXT_PKT\n");
pkt = (struct mapipkt *)mapi_get_next_pkt(dbuf->fd,dbuf->fid);
gettimeofday(&tv, NULL);
dbuf->timestamp = tv.tv_usec;
......@@ -450,8 +440,6 @@ void *handle_request(void *arg) {
send(sock,dbuf, dbuf->length,0);
#endif
}
//dbuf_bytes=dbuf_bytes-((struct dmapiipcbuf *)buffer)->length; DELETE
//memcpy(buffer,buffer+((struct dmapiipcbuf *)buffer)->length,dbuf_bytes); DELETE
}
for(i=0;i<ac_fl_size;++i){//close all remaining flows before this thread exits
......@@ -461,6 +449,7 @@ void *handle_request(void *arg) {
}
free(active_flows);
free(dbuf);
dbuf = NULL;
shutdown(sock, SHUT_RDWR);
close(sock);
......@@ -474,9 +463,7 @@ void *handle_request(void *arg) {
#endif
/* update the global service counter */
//pthread_mutex_lock(&lock); DELETE
service_count++;
//pthread_mutex_unlock(&lock); DELETE
printf("<+> thread %d exiting\n<+> total sockets served: %d\n", (int)pthread_self(), service_count);
pthread_exit((void *)0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment