Commit c6ebdd28 authored by 's avatar

added a new function - mapi_is_sensor_down() - if mapid and/or mapicommd are...

added a new function - mapi_is_sensor_down() - if mapid and/or mapicommd are out of execution returns 1, otherwise returns 0. Use this function for reconnection purposes

git-svn-id: file:///home/svn/mapi/trunk@1361 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 522bd37f
......@@ -74,7 +74,6 @@ typedef struct function_data{
#endif
int check_network_mapid(void); // checks if network (mapid) is up. Returns 1 if network is up, 0 otherwise
void restore_network_mapid(void); // restores the connection to mapid in case of a breakdown, using back-off mechanism
void mapi_create_offline_device_mapid(void); // this function recreates all offline devices, that have been created and not deleted
void mapi_recreate_flow_mapid(void); // recreates all flows, on-line and off-line, that have been created and not closed
......@@ -92,6 +91,7 @@ typedef struct flowdescr{
char *devtype;
char *device;
int read_results_flag;
int mapid_down;
char *shm_base;
pthread_mutex_t *shm_spinlock;
flist_t *flist;
......@@ -565,9 +565,27 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
flist_append(rflow->pkt_list, 0, hflow);
sem_post(&rflow->pkt_sem);
}
#ifdef RECONNECT
else if(dbuf->cmd == READ_RESULT_ACK_RECONNECT)
rflow->daemons_down = 0; // mapid is up and runnung ...
#endif
else {
memcpy( hflow->dbuf, dbuf, dbuf->length ); //place data
sem_post( &rflow->fd_sem );
#ifdef RECONNECT
if(dbuf->cmd == ERROR_ACK){
void *error_num = (void *)malloc(sizeof(int));
memcpy(error_num, dbuf->data, dbuf->length - BASIC_SIZE);
if(*((int *) error_num) == MAPI_READ_RESULT_RECONNECTION){
free(error_num);
rflow->daemons_down = 1; // mapid is down ...
}
}
#endif
}
}
else {
......@@ -682,6 +700,8 @@ void check_for_read_results(struct host *h){
if( (rflow = flist_get(remote_flowlist, hflow->scope_fd)) != NULL){
#ifndef RECONNECT
sleep(1); // XXX
#else
rflow->daemons_down = 1;
#endif
if(sem_getvalue(&rflow->fd_sem, &fd_sem_value) == -1){
printf("ERROR: sem_getvalue() failed (%s) [%s:%d]\n", strerror(errno), __FILE__, __LINE__);
......@@ -691,6 +711,7 @@ void check_for_read_results(struct host *h){
if(fd_sem_value == 0 && hflow->dbuf->cmd == READ_RESULT){
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
errno = MAPI_READ_RESULT_RECONNECTION;
dbuf->cmd = ERROR_ACK;
memcpy(dbuf->data, &errno, sizeof(int));
......@@ -1277,6 +1298,7 @@ void mapi_reconnect(struct host *h){
//printf("\n Flow %d is now connected !!\n", hflow->fd);
rflow->is_connected = 1; // FIXME
rflow->daemons_down = 0;
}
else{
printf("ERROR: Could not re-connect to flow %d [%s:%d]\n", hflow->scope_fd, __FILE__, __LINE__);
......@@ -1862,6 +1884,7 @@ void mapi_reconnect_mapid(void){
if(qbuf.cmd == CONNECT_ACK){
//printf("--> Flow %d is now connected !!\n", flow->fd);
flow->is_connected = 1; // FIXME
flow->mapid_down = 0;
}
else{
printf("ERROR: Could not re-connect to flow %d [%s:%d]\n", flow->fd, __FILE__, __LINE__);
......
......@@ -96,8 +96,9 @@ typedef enum {
SEND_FD,
GET_NEXT_PKT,
GET_NEXT_PKT_ACK,
IGNORE_SLEEP, // reconnection ...
IGNORE_SLEEP, // start reconnection ...
IGNORE_NOTIFY,
READ_RESULT_ACK_RECONNECT, // end reconnection ...
MAPI_STATS,
MAPI_STATS_ACK,
MAPI_STATS_ERR
......@@ -212,6 +213,7 @@ typedef struct remote_flowdescr {
#endif
#ifdef RECONNECT
int to_buffer_fid; // fid returned to user, in response to to_buffer() function apply
int daemons_down;
#endif
struct mapipkt* pkt;
} remote_flowdescr_t;
......@@ -233,6 +235,10 @@ ssize_t SSL_readn(SSL *con, void *vptr, size_t n);
void check_for_read_results(struct host *h);
#endif//DIMAPI
#ifdef RECONNECT
int check_network_mapid(void); // checks if network (mapid) is up. Returns 1 if network is up, 0 otherwise
#endif
#define INT 1
#define STRING 2
......
......@@ -107,6 +107,7 @@ typedef struct flowdescr {
#ifdef RECONNECT
char *device;
int read_results_flag;
int mapid_down;
#endif
char *shm_base;
pthread_spinlock_t *shm_spinlock;
......@@ -600,6 +601,7 @@ int mapi_create_flow(const char *dev)
rflow->vo=NULL;
#ifdef RECONNECT
rflow->password = NULL;
rflow->daemons_down = 0;
#endif
#endif
rflow->pkt=NULL;
......@@ -802,7 +804,8 @@ int mapi_create_flow(const char *dev)
#ifdef RECONNECT
flow->device = strdup(dev);
flow->format = -1; // in case of online flow, assigned to -1
flow->read_results_flag = 0;
flow->read_results_flag = 0;
flow->mapid_down = 0;
#endif
flow->shm_base=NULL;
flow->shm_spinlock=NULL;
......@@ -2075,6 +2078,7 @@ mapi_results_t* mapi_read_results(int fd, int fid)
local_err = MAPI_INVALID_FID_FUNCID;
return NULL;
}
hflow->dbuf->fid = fdata->fid;
hflow->dbuf->length = BASIC_SIZE;
}
......@@ -2154,6 +2158,7 @@ mapi_results_t* mapi_read_results(int fd, int fid)
if(mapiipc_write((struct mapiipcbuf *) &qbuf) < 0){ // send a dummy IPC message to mapid
pthread_spin_unlock(&mapi_lock);
flow->read_results_flag = 1;
flow->mapid_down = 1;
local_err = MAPI_READ_RESULT_RECONNECTION;
return NULL;
}
......@@ -3899,6 +3904,35 @@ int mapi_is_remote(int fd) {
}
}
// if mapid and/or mapicommd are out of execution returns 1, otherwise returns 0
int mapi_is_sensor_down(int fd){
#ifdef DIMAPI
#ifdef RECONNECT
remote_flowdescr_t *rflow = (remote_flowdescr_t *)flist_get(remote_flowlist, fd);
if(rflow != NULL && rflow->daemons_down == 1)
return 1;
#else
return 0; // without reconnect always up ??
#endif
#endif
#ifdef RECONNECT
flowdescr_t *flow = flist_get(flowlist, fd);
int check_mapid;
check_mapid = check_network_mapid();
if(check_mapid == 1 && flow != NULL)
flow->mapid_down = 0;
if(flow != NULL && flow->mapid_down == 1)
return 1;
else
return 0;
#else
return 0; // without reconnect always up ??
#endif
}
int mapi_apply_function_array(int fd, const char* funct, char** args, unsigned int argn)
//Apply function to a mapi flow
......
......@@ -242,6 +242,7 @@ extern int mapi_get_next_flow_info(int fd, mapi_flow_info_t *info);
extern int mapi_get_scope_size(int fd);
extern int mapi_is_remote(int fd);
extern int mapi_is_sensor_down(int fd);
//Get information about a function applied to a flow
extern int mapi_get_function_info(int fd, int fid, mapi_function_info_t *info);
......
......@@ -499,6 +499,29 @@ void *handle_request(void *arg) {
mapi_read_error(&errno, errorstr);
memcpy(dbuf->data, &errno, sizeof(int));
dbuf->length = BASIC_SIZE+sizeof(int);
#ifdef RECONNECT
if(errno == MAPI_READ_RESULT_RECONNECTION){ // mapid is down ...
#ifdef DIMAPISSL
SSL_write(con, dbuf, dbuf->length);
#else
send(sock, dbuf, dbuf->length, 0);
#endif // dummy read result operation to enable reconnection mechanism
result = mapi_read_results(dbuf->fd, dbuf->fid);
if(result != NULL && result->size < DIMAPI_DATA_SIZE){
dbuf->cmd = READ_RESULT_ACK_RECONNECT;
dbuf->timestamp = result->ts;
memcpy(dbuf->data, result->res, result->size);
dbuf->length = BASIC_SIZE + result->size;
}
else{
dbuf->cmd = ERROR_ACK;
mapi_read_error(&errno, errorstr);
memcpy(dbuf->data, &errno, sizeof(int));
dbuf->length = BASIC_SIZE+sizeof(int);
}
}
#endif
}
break;
case GET_NEXT_PKT:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment