Commit 0d2a4d9a authored by 's avatar

return an appropriate error code in case of a break down during a read_result operation

git-svn-id: file:///home/svn/mapi/trunk@1358 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 10d555ae
......@@ -18,6 +18,7 @@
#include "debug.h"
#include "mapidflib.h"
#include "devgroupdb.h"
#include "mapi_errors.h"
#define HAVE_MSGHDR_MSG_CONTROL 1
......@@ -90,6 +91,7 @@ typedef struct flowdescr{
int numfd; // number of file descriptors
char *devtype;
char *device;
int read_results_flag;
char *shm_base;
pthread_mutex_t *shm_spinlock;
flist_t *flist;
......@@ -125,7 +127,7 @@ flist_t *offline_device_list = NULL; // list which contains all offline devices
int mapiipc_write(struct mapiipcbuf *qbuf){ // sends an IPC message to mapid
qbuf->uid = getuid(); // returns the real user ID of the current process
if(send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL) == -1){
//WARNING_CMD(printf("\nsend: %s [%s:%d]\n", strerror(errno), __FILE__, __LINE__));
......@@ -137,53 +139,54 @@ int mapiipc_write(struct mapiipcbuf *qbuf){ // sends an IPC message to mapid
functdescr_t *fun = NULL;
//printf("\n ---> Mapid is down\n");
restore_network_mapid();
mapi_create_offline_device_mapid();
mapi_recreate_flow_mapid();
mapi_reapply_function_mapid();
mapi_reconnect_mapid();
mapi_start_offline_device_mapid();
mapi_read_results_mapid();
// special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device )
if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){
// find specified device in offline_device_list
for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){
device = (offline_device *)fnode->data;
if(qbuf->cmd == READ_RESULT && qbuf->size == 1) // 1st attempt after break down ...
return -1;
else{
restore_network_mapid();
mapi_create_offline_device_mapid();
mapi_recreate_flow_mapid();
mapi_reapply_function_mapid();
mapi_reconnect_mapid();
mapi_start_offline_device_mapid();
mapi_read_results_mapid();
// special case ( create flow for a device name returned from mapi_create_offline_device() and start/delete this device )
if(qbuf->cmd == CREATE_FLOW || qbuf->cmd == START_OFFLINE_DEVICE || qbuf->cmd == DELETE_OFFLINE_DEVICE){
if(!strcmp(device->previous_device, (char *) qbuf->data)){
strncpy((char *) qbuf->data, device->new_device, DATA_SIZE); // the new device name
break;
// find specified device in offline_device_list
for(fnode = flist_head(offline_device_list); fnode != NULL; fnode = flist_next(fnode)){
device = (offline_device *)fnode->data;
if(!strcmp(device->previous_device, (char *) qbuf->data)){
strncpy((char *) qbuf->data, device->new_device, DATA_SIZE); // the new device name
break;
}
}
}
}
// FIXME (CLOSE_FLOW ...)
else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
qbuf->fd = flow->fd;
}
else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
}
else if(qbuf->cmd == READ_RESULT && qbuf->fd != -1 && qbuf->fid != -1){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
// FIXME (CLOSE_FLOW ...)
else if(qbuf->cmd == GET_FLOW_INFO || qbuf->cmd == GET_NEXT_FLOW_INFO || qbuf->cmd == CONNECT || qbuf->cmd == APPLY_FUNCTION){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
qbuf->fd = flow->fd;
}
else if(qbuf->cmd == GET_FUNCTION_INFO || qbuf->cmd == GET_NEXT_FUNCTION_INFO){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
}
else if(qbuf->cmd == READ_RESULT && qbuf->fd != -1 && qbuf->fid != -1){
flow = flist_get(flowlist, qbuf->user_fd); // we must send the new fd of the flow, in the below send operation
fun = flist_get(flow->flist, qbuf->user_fid); // and the new fid of the function
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
}
qbuf->fd = flow->fd;
qbuf->fid = fun->fid;
send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL);
}
send(sock, qbuf, sizeof(struct mapiipcbuf), MSG_NOSIGNAL);
#else
return -1;
#endif
......@@ -388,6 +391,7 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
continue;
}
#else
check_for_read_results((struct host *) host);
break;
#endif
}
......@@ -480,6 +484,7 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
continue;
}
#else
check_for_read_results((struct host *) host);
break;
#endif
}
......@@ -602,20 +607,6 @@ int mapiipc_remote_init(struct host *h)
return -1;
}
/*#ifdef RECONNECT
tv.tv_sec = 40; // 40 seconds timeout
tv.tv_usec = 0;
// SO_RCVTIMEO is an option to set a timeout value for input operations
if(setsockopt(h->sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) == -1){
close(h->sockfd);
printf("Unexpected error on setsockopt() : %s", strerror(errno));
return -1;
}
#endif
*/
tv.tv_sec=10; //timeout 10 sec for send
tv.tv_usec=0;
......@@ -676,6 +667,45 @@ void mapiipc_remote_close(struct host *h)
#endif
}
void check_for_read_results(struct host *h){
int fd_sem_value, errno;
struct dmapiipcbuf *dbuf = NULL;
flist_node_t *fnode = NULL;
host_flow *hflow = NULL;
remote_flowdescr_t *rflow = NULL;
for(fnode = flist_head(h->flows); fnode != NULL; fnode = flist_next(fnode)){ // iterate flow list of the specified host
hflow = (host_flow *)fnode->data;
if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){
#ifndef RECONNECT
sleep(1); // XXX
#endif
if(sem_getvalue(&rflow->fd_sem, &fd_sem_value) == -1){
printf("ERROR: sem_getvalue() failed (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__);
break;
}
if(fd_sem_value == 0 && hflow->dbuf->cmd == READ_RESULT){
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
errno = MAPI_READ_RESULT_RECONNECTION;
dbuf->cmd = ERROR_ACK;
memcpy(dbuf->data, &errno, sizeof(int));
dbuf->length = BASIC_SIZE + sizeof(int);
memcpy(hflow->dbuf, dbuf, dbuf->length);
sem_post(&rflow->fd_sem); // critical point ...
free(dbuf);
break;
}
}
}
return;
}
#endif /* DIMAPI */
......@@ -940,6 +970,9 @@ void restore_network_mapicommd(struct host *h){
for(tries = 0; ; tries++){
if(tries == 1) // in case of read_results return NULL to application and then continue calling reconnection functions ...
check_for_read_results(h);
//printf("\n---> Reconnection try #%d", tries);
check_net = check_network_mapicommd(h);
fflush(stdout);
......
......@@ -230,6 +230,7 @@ ssize_t readn(int fd, void *vptr, size_t n);
ssize_t SSL_readn(SSL *con, void *vptr, size_t n);
#endif
void check_for_read_results(struct host *h);
#endif//DIMAPI
......
......@@ -114,6 +114,8 @@
6156 = "Error sending file descriptor"
#define MAPI_DEVICE_INFO_ERR 6157
6149 = "Error accessing device information"
#define MAPI_READ_RESULT_RECONNECTION 6158
6158 = "mapi_read_results returns NULL in case of reconnection"
#define MFUNCT_INVALID_ARGUMENT 7000
7000 = "Invalid argument passed to function"
#define MFUNCT_INVALID_ARGUMENT_1 7001
......
......@@ -106,6 +106,7 @@ typedef struct flowdescr {
char *devtype;
#ifdef RECONNECT
char *device;
int read_results_flag;
#endif
char *shm_base;
pthread_spinlock_t *shm_spinlock;
......@@ -801,6 +802,7 @@ int mapi_create_flow(const char *dev)
#ifdef RECONNECT
flow->device = strdup(dev);
flow->format = -1; // in case of online flow, assigned to -1
flow->read_results_flag = 0;
#endif
flow->shm_base=NULL;
flow->shm_spinlock=NULL;
......@@ -2140,11 +2142,19 @@ mapi_results_t* mapi_read_results(int fd, int fid)
qbuf.fid = -1;
qbuf.pid = getpid();
if(flow->read_results_flag == 0)
qbuf.size = 1; // use it in mapiipc_remote_write()
else{
qbuf.size = 0;
flow->read_results_flag = 0; // initialize for subsequent calls
}
pthread_spin_lock(&mapi_lock);
if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0){ // send a dummy IPC message to mapid
pthread_spin_unlock(&mapi_lock);
local_err = MCOM_SOCKET_ERROR;
flow->read_results_flag = 1;
local_err = MAPI_READ_RESULT_RECONNECTION;
return NULL;
}
if(mapiipc_read((struct mapiipcbuf *) &qbuf) < 0){ // read a dummy IPC message from mapid (blocking call)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment