Commit 1171cd31

Added a new function - mapi_asynchronous_get_next_pkt() - for packet pre-fetching, using the PUSH model

git-svn-id: file:///home/svn/mapi/trunk@1392 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent 42a139cf
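
For orientation, here is a minimal client-side sketch of how the new call is meant to be used, pieced together from the API visible in this diff (mapi_create_flow, mapi_apply_function with TO_BUFFER, mapi_connect — mentioned in the new function's error message — and repeated calls to mapi_asynchronous_get_next_pkt). The scope string, packet count and error handling are illustrative assumptions, not part of this commit:

#include <stdio.h>
#include "mapi.h"

int main(void)
{
	/* Hypothetical DiMAPI scope string; replace with real sensors. */
	int fd  = mapi_create_flow("host1.example.org:eth0, host2.example.org:eth0");
	int fid = mapi_apply_function(fd, "TO_BUFFER");  /* packets are buffered remotely */
	int i;

	mapi_connect(fd);  /* the new call requires a connected flow */

	for (i = 0; i < 1000; i++) {
		/* Packets are pre-fetched into per-host buffers (PUSH model);
		 * NULL means an error or that no packet could be returned. */
		struct mapipkt *pkt = mapi_asynchronous_get_next_pkt(fd, fid);
		if (pkt == NULL)
			break;
		printf("got packet, caplen = %u\n", (unsigned)pkt->caplen);
	}

	mapi_close_flow(fd);
	return 0;
}
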
@@ -417,6 +417,17 @@ void mapicommd_logging(int log_to_file, int log_to_syslog, int log_fd, struct dm
 			log_message("read results (fd: %d, fid: %d) FAILED", dbuf->fd, dbuf->fid);
 		break;
+	case GET_NEXT_PKT_ASYN:
+		if(log_to_file){
+			file_size = acquire_write_lock(log_fd);
+			write_to_file(log_fd, "MAPICOMMD: asynchronous get next packet (fd: %d, fid: %d) at ", dbuf->fd, dbuf->fid);
+			write_date(log_fd); write_newline(log_fd, "\n");
+			release_write_lock(log_fd, file_size);
+		}
+		if(log_to_syslog)
+			log_message("asynchronous get next packet (fd: %d, fid: %d)", dbuf->fd, dbuf->fid);
+		break;
 	case GET_FLOW_INFO:
 	case GET_NEXT_FLOW_INFO:
 		if(log_to_file){
...
This diff is collapsed.
@@ -30,6 +30,10 @@
 #define MAPIDGSOCKHOME "%s/.mapid%d.sock"
 #define MAPIDGSOCKGLOBAL "/tmp/mapid%d.sock"
+#define ASYN_GNP_BUFFER_SIZE 500 // buffer size for mapi_asynchronous_get_next_pkt() function
+#define MAX_CAPLEN 1540U
+#define ASYN_GNP_THRESHOLD 0.1
 //All IPC code needs to be rewritten and cleand up.
 //To support dynamic loading of new functions we should have an IPC
 //system that do not need to be changed for each new function type
@@ -96,6 +100,8 @@ typedef enum {
 	SEND_FD,
 	GET_NEXT_PKT,
 	GET_NEXT_PKT_ACK,
+	GET_NEXT_PKT_ASYN,
+	GET_NEXT_PKT_ASYN_ACK,
 	IGNORE_SLEEP, // start reconnection ...
 	IGNORE_NOTIFY,
 	READ_RESULT_ACK_RECONNECT, // end reconnection ...
@@ -141,7 +147,6 @@ int mapiipc_write(struct mapiipcbuf *qbuf);
 //Reads an IPC message. Blocking call.
 int mapiipc_read(struct mapiipcbuf *qbuf);
 //Send a file handle
 int mapiipc_send_fd(int sendfd);
 //receive a file handle
@@ -159,6 +164,7 @@ struct host {
 	int sockfd;
 #ifdef DIMAPISSL
 	SSL *con;
+	SSL_CTX *ctx;
 #endif
 	int num_flows; // to know when to close the socket
 	flist_t *flows;
@@ -166,6 +172,9 @@
 	pthread_t* comm_thread; // communication thread
 #ifdef RECONNECT
 	sem_t connection; // use it in mapiipc.c source code file
+	pthread_mutex_t rec_lock;
+	pthread_cond_t rec_condition;
+	int host_down;
 #endif
 	flist_t *stats; //for mapi_stats
 };
@@ -181,7 +190,7 @@ struct dmapiipcbuf {
 };
 #define BASIC_SIZE (sizeof(struct dmapiipcbuf) - DIMAPI_DATA_SIZE)
-#define PKT_LENGTH 131072 //pkt info and actual pkt
+#define PKT_LENGTH 131072 // pkt info and actual pkt
 typedef struct host_flow {
 	struct host *rhost;
@@ -189,8 +198,16 @@ typedef struct host_flow {
 	int scope_fd;
 	int fd; //fd of flow in the mapid of host
 	int id;
+	int sockfd_asyn;
+#ifdef DIMAPISSL
+	SSL *con_asyn;
+	SSL_CTX *ctx_asyn;
+#endif
+	pthread_t* asyn_comm_thread;
 	struct dmapiipcbuf *dbuf; //buffer for writting results from this host -for this flow-
 	struct mapipkt* pkt;
+	struct asyn_mgnp_buffer *asyn_pkts; // used by mapi_asynchronous_get_next_pkt()
+	pthread_spinlock_t asyn_get_next_pkt_lock; // used by mapi_asynchronous_get_next_pkt()
 	flist_t *functions; //holds all fids for this host_flow
 } host_flow;
@@ -212,19 +229,43 @@ typedef struct remote_flowdescr {
 #endif
 #endif
 #ifdef RECONNECT
-	int to_buffer_fid; // fid returned to user, in response to to_buffer() function apply
+	int to_buffer_fid; // fid returned to user, in response to to_buffer() function apply [used by mapi_get_next_pkt()]
+	int to_buffer_fid_asyn; // fid returned to user, in response to to_buffer() function apply [used by mapi_asynchronous_get_next_pkt()]
 	int daemons_down;
 #endif
 	struct mapipkt* pkt;
+	unsigned char is_asyn_gnp_called; // this should be 1 if the remote flow has called mapi_asynchronous_get_next_pkt(), 0 otherwise
+	flist_node_t *asyn_mgnp_fnode; // the last host that returned a packet to a client
 } remote_flowdescr_t;
+struct asyn_mgnp{ // used by mapi_asynchronous_get_next_pkt()
+	struct dmapiipcbuf *dbuf;
+	int sock;
+	int sent_packets;
+#ifdef DIMAPISSL
+	SSL *con;
+#endif
+};
+struct asyn_mgnp_buffer{ // used by mapi_asynchronous_get_next_pkt() function
+	struct mapipkt **pkts; // buffer for saving packets
+	int read; // index for read packet
+	int write; // index for write packet
+	int size; // the amount of packets that are stored in the buffer
+};
 //dmapi functions and ipcbuffer
 int mapiipc_remote_init(struct host *h);
+int mapiipc_remote_init_asyn(struct host *h, struct host_flow *hflow);
 int mapiipc_remote_write(struct dmapiipcbuf *dbuf, struct host *h);
+int mapiipc_remote_write_asyn(struct dmapiipcbuf *dbuf, struct host_flow *hflow);
 int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow);
 void mapiipc_remote_close(struct host *h);
+void mapiipc_remote_close_asyn(struct host_flow *hflow);
 void *mapiipc_comm_thread(void *host);
+void *mapiipc_asyn_comm_thread(void *host);
 /* Read "n" bytes from a socket. */
 ssize_t readn(int fd, void *vptr, size_t n);
@@ -239,7 +280,6 @@ void check_for_read_results(struct host *h);
 int check_network_mapid(void); // checks if network (mapid) is up. Returns 1 if network is up, 0 otherwise
 #endif
 #define INT 1
 #define STRING 2
 #define UNSIGNED_LONG_LONG 3
...
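
The asyn_mgnp_buffer added above is a per-host ring buffer (read/write cursors plus a size counter) that the new receive thread fills and mapi_asynchronous_get_next_pkt() drains, re-requesting packets in blocks of ASYN_GNP_BUFFER_SIZE. The stand-alone sketch below only illustrates that producer/consumer pattern: it uses a void * payload and a mutex instead of struct mapipkt and the pthread spinlock in the patch, it refills when the buffer empties rather than on the block-wrap condition the patch uses, and all names are illustrative, not part of this commit:

#include <pthread.h>
#include <string.h>

#define BUF_SIZE 500                 /* mirrors ASYN_GNP_BUFFER_SIZE */

struct ring {                        /* simplified stand-in for struct asyn_mgnp_buffer */
	void *slots[BUF_SIZE];
	int read;                    /* next slot handed to the caller */
	int write;                   /* next slot the receive thread fills */
	int size;                    /* packets currently buffered */
	pthread_mutex_t lock;        /* the patch guards the counter with a spinlock instead */
};

static void ring_init(struct ring *r)
{
	memset(r, 0, sizeof(*r));
	pthread_mutex_init(&r->lock, NULL);
}

/* Producer (receive thread): store one packet; returns 0 on success, -1 if full. */
static int ring_put(struct ring *r, void *pkt)
{
	int ok = -1;
	pthread_mutex_lock(&r->lock);
	if (r->size < BUF_SIZE) {
		r->slots[r->write] = pkt;
		if (++r->write == BUF_SIZE)
			r->write = 0;            /* wrap to the start of the buffer */
		r->size++;
		ok = 0;
	}
	pthread_mutex_unlock(&r->lock);
	return ok;
}

/* Consumer (get_next_pkt side): take one packet, ask for more once drained. */
static void *ring_get(struct ring *r, void (*request_refill)(void *ctx), void *ctx)
{
	void *pkt = NULL;
	pthread_mutex_lock(&r->lock);
	if (r->size > 0) {
		pkt = r->slots[r->read];
		if (++r->read == BUF_SIZE)
			r->read = 0;             /* wrap to the start of the buffer */
		if (--r->size == 0 && request_refill)
			request_refill(ctx);     /* analogous to re-sending GET_NEXT_PKT_ASYN */
	}
	pthread_mutex_unlock(&r->lock);
	return pkt;
}
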
@@ -18,6 +18,7 @@
 #include <sys/shm.h>
 #include <sys/sem.h>
 #include <unistd.h>
+#include <ctype.h>
 #include "mapi.h"
 #include "mapiipc.h"
 #include "mapilibhandler.h"
@@ -77,7 +78,9 @@ extern sem_t stats_sem;
 typedef struct function_data{
 	int fid; // real fid returned from mapicommd
+	int fidseed; // fid returned to mapi user
 	mapidflib_function_def_t* fdef; // function definition
+	struct dmapiipcbuf *dbuf; // need for asynchronous mapi_read_results
 #ifdef RECONNECT
 	host_flow *hflow; // flow of a specified host
 	char args[DIMAPI_DATA_SIZE]; // function's arguments
@@ -446,6 +449,8 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
 	host_flow* hflow;
 	flist_node_t* fnode, *fnode2, *fnode3;
 	mapi_results_t* res;
+	function_data *fdata;
+	int count;
 	pthread_spin_lock(&remote_ipc_lock);
 	flist_remove(remote_flowlist, rflow->fd);
@@ -454,16 +459,24 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
 	sem_destroy(&rflow->fd_sem);
 	sem_destroy(&rflow->pkt_sem);
+	//pthread_mutex_destroy(&rflow->mutex);
 	for (fnode=flist_head(rflow->host_flowlist); fnode!=NULL; fnode=fnode2 ) {
 		fnode2=flist_next(fnode);
 		hflow=(host_flow*)fnode->data;
 		hflow->rhost->num_flows--;
 		flist_remove(hflow->rhost->flows, hflow->fd);
 		for (fnode3=flist_head(hflow->functions); fnode3!=NULL; fnode3=flist_next(fnode3)) {
-			free( flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fid) );
+			fdata = (function_data *)flist_remove(hflow->rhost->functions, ((function_data*)fnode3->data)->fidseed);
+			if(fdata->dbuf != NULL){
+				free(fdata->dbuf);
+				fdata->dbuf = NULL;
+			}
+			free(fdata);
 		}
 		if (hflow->rhost->num_flows==0) {
 			pthread_cancel(*hflow->rhost->comm_thread);
 			mapiipc_remote_close(hflow->rhost); //close the socket
@@ -476,6 +489,8 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
 			free(hflow->rhost->comm_thread);
 #ifdef RECONNECT
 			sem_destroy(&hflow->rhost->connection); // destroy semaphore
+			pthread_mutex_destroy(&hflow->rhost->rec_lock);
+			pthread_cond_destroy(&hflow->rhost->rec_condition);
 #endif
 			free(hflow->rhost);
 		}
@@ -485,6 +500,21 @@ static void delete_remote_flow(remote_flowdescr_t* rflow)
 		free(hflow->dev);
 		free(hflow->dbuf);
 		if (hflow->pkt!=NULL) free(hflow->pkt);
+		if(hflow->asyn_pkts != NULL){ // release resources of mapi_asynchronous_get_next_pkt()
+			for(count = 0; count < ASYN_GNP_BUFFER_SIZE; count++)
+				free(hflow->asyn_pkts->pkts[count]);
+			free(hflow->asyn_pkts->pkts);
+			free(hflow->asyn_pkts);
+			pthread_cancel(*hflow->asyn_comm_thread);
+			mapiipc_remote_close_asyn(hflow);
+			free(hflow->asyn_comm_thread);
+			// TODO close the new socket
+		}
 		flist_remove(rflow->host_flowlist,hflow->id);
 		free(hflow);
 	}
@@ -596,6 +626,7 @@ int mapi_create_flow(const char *dev)
 	rflow->pkt_list=NULL;
 	rflow->function_res=(flist_t*)malloc(sizeof(flist_t));
 	rflow->is_connected = 0;
+	rflow->is_asyn_gnp_called = 0;
 #ifdef WITH_AUTHENTICATION
 	rflow->username=NULL;
 	rflow->vo=NULL;
@@ -628,6 +659,9 @@ int mapi_create_flow(const char *dev)
 			h->stats=NULL;
 #ifdef RECONNECT
 			sem_init(&h->connection, 0, 0); // initialize semaphore
+			pthread_mutex_init(&(h->rec_lock), NULL);
+			pthread_cond_init(&(h->rec_condition), NULL);
+			h->host_down = 0;
 #endif
 			// Create the socket
 			if (mapiipc_remote_init(h)<0) {
@@ -663,8 +697,10 @@ int mapi_create_flow(const char *dev)
 		hflow->fd=negfdseed;
 		hflow->dbuf=(struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
 		hflow->pkt=NULL;
+		hflow->asyn_pkts = NULL;
 		hflow->rhost=h;
 		hflow->functions=(flist_t *)malloc(sizeof(flist_t));
+		pthread_spin_init(&(hflow->asyn_get_next_pkt_lock), PTHREAD_PROCESS_PRIVATE);
 		flist_init(hflow->functions);
 		hflow->dbuf->cmd=CREATE_FLOW;
@@ -1312,6 +1348,7 @@ int mapi_close_flow(int fd)
 			free(f->funct);
 			free(f->data);
 			free(f);
+			f = NULL;
 		}
 		pthread_spin_unlock(&mapi_lock);
@@ -1736,7 +1773,10 @@ int mapi_apply_function(int fd, const char* funct, ...)
 			case APPLY_FUNCTION_ACK:
 				fdata=(function_data*)malloc(sizeof(function_data));
 				fdata->fid=hflow->dbuf->fid;
+				fdata->fidseed=fidseed;
 				fdata->fdef=fdef;
+				fdata->dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf)); // for asynchronous mapi_read_results ...
+				fdata->dbuf->length = 0;
 #ifdef RECONNECT
 				memcpy(fdata->args, qbuf_.data, arg_size);
 				fdata->hflow = hflow;
@@ -2332,7 +2372,6 @@ mapi_get_next_pkt(int fd,int fid)
 	else {
 		//if no packet arrived yet wait
 		sem_wait(&rflow->pkt_sem);
 		hflow=(host_flow*)flist_pop_first(rflow->pkt_list);
 		if (hflow==NULL) {
@@ -2430,9 +2469,293 @@ mapi_get_next_pkt(int fd,int fid)
 			local_err = func_err;
 			return NULL;
 		}
 	return res.res;
 }
+// Get the next packet from a to_buffer function, using asynchronous mechanism - PUSH MODEL (only for DiMAPI)
+// Arguments ---> fd: flow descriptor, fid: function id of TO_BUFFER function
+// Returns a reference to next packet, or NULL on error
+struct mapipkt* mapi_asynchronous_get_next_pkt(int fd, int fid){
+	flowdescr_t *flow;
+	functdescr_t *f;
+	mapi_result_t res;
+	int func_err;
+#ifdef DIMAPI
+	remote_flowdescr_t *rflow;
+	host_flow *hflow;
+	flist_node_t *fnode, *temp;
+	function_data *fdata;
+	int count, gave_packet = 0, restart = 0;
+	int sem_flag = 0;
+#endif
+#ifdef RECONNECT
+	struct mapiipcbuf qbuf;
+#endif
+	if(!minit){
+		printf("MAPI not initialized! [%s:%d]\n", __FILE__, __LINE__);
+		local_err = MAPI_INIT_ERROR;
+		return NULL;
+	}
+	else if(fd <= 0 || fid <= 0){
+		printf("ERROR: Wrong fd (fd: %d) or fid (fid: %d) in mapi_asynchronous_get_next_pkt\n", fd, fid);
+		local_err = MAPI_INVALID_FID_FUNCID;
+		return NULL;
+	}
+#ifdef DIMAPI
+	if( (rflow = flist_get(remote_flowlist, fd)) != NULL){
+		if(!rflow->is_connected){
+			printf("ERROR: In mapi_asynchronous_get_next_pkt always use mapi_connect first\n");
+			local_err = MAPI_FLOW_NOT_CONNECTED;
+			return NULL;
+		}
+		if(rflow->is_asyn_gnp_called == 0){ // mapi_asynchronous_get_next_pkt() is called for first time for this remote flow
+#ifdef RECONNECT
+			rflow->to_buffer_fid_asyn = fid; // fid of to_buffer() function
+#endif
+			rflow->is_asyn_gnp_called = 1;
+			rflow->pkt = (struct mapipkt *)malloc(sizeof(struct mapipkt) + PKT_LENGTH);
+			for(fnode = flist_head(rflow->host_flowlist); fnode != NULL; fnode = flist_next(fnode)){
+				hflow = (host_flow *)fnode->data;
+				// allocate memory for each buffer - buffer for saving packets for each host. Every host has its own buffer ...
+				hflow->asyn_pkts = (struct asyn_mgnp_buffer *)malloc(sizeof(struct asyn_mgnp_buffer));
+				hflow->asyn_pkts->pkts = (struct mapipkt **)malloc(sizeof(struct mapipkt *) * ASYN_GNP_BUFFER_SIZE);
+				for(count = 0; count < ASYN_GNP_BUFFER_SIZE; count++)
+					hflow->asyn_pkts->pkts[count] = (struct mapipkt *)malloc(sizeof(struct mapipkt) + PKT_LENGTH);
+				hflow->asyn_pkts->read = 0; // index for read packet
+				hflow->asyn_pkts->write = 0; // index for write packet
+				hflow->asyn_pkts->size = 0; // the amount of packets that are stored in the buffer
+				hflow->dbuf->cmd = GET_NEXT_PKT_ASYN;
+				hflow->dbuf->fd = hflow->fd;
+				if( (fdata = (function_data *)flist_get(hflow->functions, fid)) == NULL){ // get data of TO_BUFFER function
+					local_err = MAPI_INVALID_FID_FUNCID;
+					return NULL;
+				}
+				hflow->dbuf->fid = fdata->fid;
+				// not a valid timestamp. Specifies the maximum amount of packets, that communication agents can send
+				hflow->dbuf->timestamp = ASYN_GNP_BUFFER_SIZE;
+				hflow->dbuf->length = BASIC_SIZE;
+				if(mapiipc_remote_init_asyn(hflow->rhost, hflow) < 0){ // create the new socket (XXX why hflow ?)
+					local_err = MCOM_SOCKET_ERROR;
+					printf("ERROR: Could not connect with host %s [%s:%d]\n", hflow->rhost->hostname, __FILE__, __LINE__);
+					return NULL;
+				}
+				hflow->asyn_comm_thread = (pthread_t *)malloc(sizeof(pthread_t)); // specialized thread - only for this function
+				pthread_create(hflow->asyn_comm_thread, NULL, *mapiipc_asyn_comm_thread, hflow);
+				if(mapiipc_remote_write_asyn(hflow->dbuf, hflow) < 0){
+					local_err = MCOM_SOCKET_ERROR;
+					return NULL;
+				}
+			}
+			rflow->asyn_mgnp_fnode = flist_head(rflow->host_flowlist); // the first host that can return a packet
+		}
+		temp = rflow->asyn_mgnp_fnode;
+		sem_wait(&rflow->pkt_sem); // wait at least one host packet
+		for(fnode = flist_head(rflow->host_flowlist); fnode != NULL; ){
+			if(fnode != rflow->asyn_mgnp_fnode && restart == 0){ // find the appropriate host, that should service client with a packet
+				fnode = flist_next(fnode); // e.g if host "1" gave the last packet, then host "2" must give the next packet
+				continue;
+			}
+			if(restart == 1 && fnode == temp){ // used for "Extreme case 1"
+				if(sem_flag == 1) // semaphore was decremented, but we will not read another packet.
+					sem_post(&rflow->pkt_sem); // Thus, we must undo the change in semaphore's value
+				break;
+			}
+			hflow = (host_flow *)fnode->data;
+			sem_flag = 0;
+			// read packet from client's buffer
+			memcpy(rflow->pkt, hflow->asyn_pkts->pkts[hflow->asyn_pkts->read],
+				sizeof(struct mapipkt) - 4 + hflow->asyn_pkts->pkts[hflow->asyn_pkts->read]->caplen);
+			if(rflow->pkt->caplen == 0 || hflow->asyn_pkts->size == 0){ // empty host, check the remaining hosts
+				// 'NULL' packet ... probably from a non-blocking way of reading packets XXX (else)
+				if(rflow->pkt->caplen == 0 && hflow->asyn_pkts->size != 0){
+					hflow->asyn_pkts->read++;
+					pthread_spin_lock(&(hflow->asyn_get_next_pkt_lock));
+					hflow->asyn_pkts->size--;
+					pthread_spin_unlock(&(hflow->asyn_get_next_pkt_lock));
+					// when the buffer becomes empty enough, specified by a threshold (e.g. 10%), we must send a new request to mapicommd
+					// the buffer should have become previously full
+					if(hflow->asyn_pkts->read == ASYN_GNP_BUFFER_SIZE && hflow->asyn_pkts->write == ASYN_GNP_BUFFER_SIZE){
+						pthread_spin_lock(&(hflow->asyn_get_next_pkt_lock));
+						hflow->asyn_pkts->write = 0; // write again to the start of the buffer
+						pthread_spin_unlock(&(hflow->asyn_get_next_pkt_lock));
+						hflow->dbuf->cmd = GET_NEXT_PKT_ASYN; // re-construct message
+						hflow->dbuf->fd = hflow->fd;
+						hflow->dbuf->fid = ((function_data *)flist_get(hflow->functions, fid))->fid;
+						hflow->dbuf->timestamp = ASYN_GNP_BUFFER_SIZE; // number of required packets
+						hflow->dbuf->length = BASIC_SIZE;
+						// send request for next packet to specified remote host
+						if(mapiipc_remote_write_asyn(hflow->dbuf, hflow) < 0){
+							local_err = MCOM_SOCKET_ERROR;
+							return NULL;
+						}
+					}
+					if(hflow->asyn_pkts->read == ASYN_GNP_BUFFER_SIZE) // start reading packets from the start of the buffer
+						hflow->asyn_pkts->read = 0;
+					if(flist_size(rflow->host_flowlist) == 1){ // this remote flow has only one host ...
+						return NULL;
+					}
+					sem_flag = 1;
+					sem_wait(&rflow->pkt_sem); // read another packet ...
+				}
+				if(flist_next(fnode) != NULL){ // check the next host
+					rflow->asyn_mgnp_fnode = flist_next(fnode);
+					fnode = flist_next(fnode);
+				}
+				// e.g if host "3" gave us the last packet, and none of the remaining hosts (e.g hosts 4-10) have a available packet,
+				// then we must examine if hosts 1-3 have a available packet. If not, null is returned indicating an error
+				else{ // end of list is reached, but no available packet was found - Extreme case 1
+					restart = 1;
+					rflow->asyn_mgnp_fnode = flist_head(rflow->host_flowlist); // check the first host again
+					fnode = flist_head(rflow->host_flowlist);
+				}
+				continue;
+			}
+			else{
+				hflow->asyn_pkts->read++;
+				pthread_spin_lock(&(hflow->asyn_get_next_pkt_lock));
+				hflow->asyn_pkts->size--;
+				pthread_spin_unlock(&(hflow->asyn_get_next_pkt_lock));
+				// when the buffer becomes empty enough, specified by a threshold 10%, we must send a new request to mapicommd
+				// the buffer should have become previously full
+				if(hflow->asyn_pkts->read == ASYN_GNP_BUFFER_SIZE && hflow->asyn_pkts->write == ASYN_GNP_BUFFER_SIZE){
+					pthread_spin_lock(&(hflow->asyn_get_next_pkt_lock));
+					hflow->asyn_pkts->write = 0; // write again to the start of the buffer
+					pthread_spin_unlock(&(hflow->asyn_get_next_pkt_lock));
+					hflow->dbuf->cmd = GET_NEXT_PKT_ASYN; // re-construct message
+					hflow->dbuf->fd = hflow->fd;
+					hflow->dbuf->fid = ((function_data *)flist_get(hflow->functions, fid))->fid;
+					hflow->dbuf->timestamp = ASYN_GNP_BUFFER_SIZE; // number of required packets
+					hflow->dbuf->length = BASIC_SIZE;
+					if(mapiipc_remote_write_asyn(hflow->dbuf, hflow) < 0){// send request for next packet to specified remote host
+						local_err = MCOM_SOCKET_ERROR;
+						return NULL;
+					}
+				}
+				if(hflow->asyn_pkts->read == ASYN_GNP_BUFFER_SIZE) // start reading packets from the start of the buffer
+					hflow->asyn_pkts->read = 0;
+				if(flist_next(fnode) != NULL) rflow->asyn_mgnp_fnode = flist_next(fnode); // check the next host
+				else rflow->asyn_mgnp_fnode = flist_head(rflow->host_flowlist); // check the first host
+				gave_packet = 1;
+				return rflow->pkt;
+			}