Commit 27145c28 authored by 's avatar
Browse files

Added devgroup parameter, which enables running of concurrent instances of...

Added devgroup parameter, which enables running of concurrent instances of mapid sharing the same configuration, but handling (locally) different devices (performance reasons).
Changes in socket names to enable concurrent run.
Added /tmp/mapid.socks to enable clients to choose the appropriate service (by device name).


git-svn-id: file:///home/svn/mapi/trunk@1208 8d5bb341-7cf1-0310-8cf6-ba355fef3186
parent e8b1fd5a
......@@ -35,7 +35,8 @@ mapid_LDADD = \
common/libflist.la \
common/libmapiipc.la \
common/libparseconf.la \
common/libprintfstring.la -lpthread -ldl
common/libprintfstring.la -lpthread -ldl \
common/libdevgroupdb.la
mapid_SOURCES = mapid.c mapid.h mapidevices.h debug.h
if BINARY_DIMAPI
......@@ -46,6 +47,7 @@ common/libfhelp.la \
common/libflist.la \
common/libparseconf.la \
common/libprintfstring.la \
common/libdevgroupdb.la \
lib/libmapi.la -lpthread
mapicommd_SOURCES = mapicommd.c
endif
......
## Process this file with automake to produce Makefile.in
AM_CPPFLAGS = -I$(srcdir)/.. -I$(srcdir)/../lib -I$(srcdir)/../drivers @SSLINC@ -D_GNU_SOURCE -D_THREAD_SAFE
noinst_LTLIBRARIES = libflist.la libfhelp.la libmapiipc.la libparseconf.la libprintfstring.la libmapilibhandler.la libmsearch.la libacsmx2.la libcbuf.la
noinst_LTLIBRARIES = libflist.la libfhelp.la libmapiipc.la libparseconf.la libprintfstring.la libmapilibhandler.la libmsearch.la libacsmx2.la libcbuf.la libdevgroupdb.la
libflist_la_SOURCES = flist.c flist.h
libflist_la_DEPENDENCIES = mapi_errors.h
libfhelp_la_SOURCES = fhelp.c fhelp.h
......@@ -13,6 +13,7 @@ libmapilibhandler_la_SOURCES = mapilibhandler.c mapilibhandler.h
libmsearch_la_SOURCES = mstring.c mstring.h
libacsmx2_la_SOURCES = acsmx2.c acsmx2.h
libcbuf_la_SOURCES = cbuf.c cbuf.h kernel_memory.h
libdevgroupdb_la_SOURCES = devgroupdb.c devgroupdb.h
noinst_HEADERS = mapidflib.h mapi_errors.h
......
/*
* File name: devgroupdb.c
* Date: 2007/04/16 21:10
* Author:
*/
/*
Enables concurrent run of more than one instance of mapid.
Each instance can be assigned to different CPU to increase
throughput.
It does not affect DiMAPI at all (it does not use local socket).
It does affect offline flows. Clients running offline flows are
always connected to mapid binding devgroup 0 (or none).
Each device configured in mapid.conf is supposed to be assigned
to some group, config. option is "devgroup=N". N is an integer.
[driver]
...
devgroup=1
Each instance of mapid can be started with devgroup set to some
of these numbers, which means that only appropriate devices are
binded with this instance.
Socket names are generated as follows:
/usr/local/bin/mapid
/tmp/mapid.sock or $HOME/.mapid.sock if configured as "local"
/usr/local/bin/mapid --devgroup N
/tmp/mapidN.sock or $HOME/.mapidN.sock if configured as "local"
N is an integer.
Each instance of mapid saves devices of its devgroup to file
/tmp/mapid.socks or $HOME/.mapid.socks if configured as "local"
as records of pairs "device groupid".
e.g.
eth0 1
/dev/dag0 2
Client application when creating flow (mapi_create_flow) chooses
the appropriate instance of mapid automatically, as it is looked
up in this file.
Devices in mapid.socks can be hijacked by another process (e.g.
if configuration changes), so no cleanup is applied after exit.
*/
/* Implementation dependencies --------------------------------------------- */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include "mapi.h"
#include "mapi_errors.h"
#include "devgroupdb.h"
/* End implementation dependencies ----------------------------------------- */
/* Constructor for database instance. */
struct devgroupdb *new_devgroupdb() {
struct devgroupdb *db = NULL;
db = (struct devgroupdb *) malloc(sizeof(struct devgroupdb));
if (db == NULL) {
fputs("new_devgroup(): could not allocate internal data.\n", stderr);
exit(MAPID_MEM_ALLOCATION_ERROR);
}
db->first = NULL;
return db;
}
/* Constructor for database record instance */
struct devgroupdbitem *new_devgroupdbitem(char *device, int groupid) {
struct devgroupdbitem *item = NULL;
item = (struct devgroupdbitem *) malloc(sizeof(struct devgroupdbitem));
if (item == NULL) {
fputs("new_devgroup(): could not allocate internal data.\n", stderr);
exit(MAPID_MEM_ALLOCATION_ERROR);
}
item->device = strdup(device);
item->groupid = groupid;
item->next = item;
return item;
}
/* Returns groupid of device from db */
int devgroupdb_getgroupidbydevice(struct devgroupdb *db, char *device) {
struct devgroupdbitem *item;
item = db->first;
if(item != NULL) {
do {
if(strcmp(item->device, device) == 0) {
return item->groupid;
}
item = item->next;
} while(item != db->first);
}
return 0; /* default */
}
/* Sets groupid of device in db */
int devgroupdb_setgroupidbydevice(struct devgroupdb *db, char *device, int groupid) {
struct devgroupdbitem *item;
int oldgroupid;
item = db->first;
if(item != NULL) {
do {
if(strcmp(item->device, device) == 0) {
oldgroupid = item->groupid;
item->groupid = groupid;
return oldgroupid;
}
item = item->next;
} while(item != db->first);
}
return -1; /* not found */
}
/* Adds record into db */
int devgroupdb_add(struct devgroupdb *db, char *device, int groupid) {
struct devgroupdbitem *item;
int oldgroupid;
/* try */
if((oldgroupid = devgroupdb_setgroupidbydevice(db, device, groupid)) != -1) {
/* printf("devgroupdb_add(): device \"%s\" already exists (devgroup %d). So overriden (%d).\n", device, oldgroupid, groupid); */
return 1;
}
item = new_devgroupdbitem(device, groupid);
if(db->first != NULL) {
db->first->next = item;
item->next = db->first;
} else db->first = item;
return 0;
}
int devgroupdb_acquire_rw_lock(char *filename) {
struct flock fl;
int fd;
fl.l_type = F_WRLCK; /* write */
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
fl.l_pid = getpid();
fd = open(filename, O_RDWR | O_CREAT); /* write */
if(fd == -1) {
fprintf(stderr, "devgroupdb_acquire_rw_lock(): could not open devgroupdb %s.\n", filename);
exit(EXIT_FAILURE);
}
if(fcntl(fd, F_SETLKW, &fl) == -1) { /* Note: BLOCKING until acquired. */
fputs("devgroupdb_acquire_rw_lock(): could not acquire.\n", stderr);
exit(EXIT_FAILURE);
}
return fd;
}
void devgroupdb_release_rw_lock(int fd) {
struct flock fl;
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
fl.l_pid = getpid();
if(fcntl(fd, F_SETLK, &fl) == -1) {
fputs("devgroupdb_acquire_rw_lock(): could not release.\n", stderr);
exit(EXIT_FAILURE);
}
close(fd);
}
int devgroupdb_getdevgroupdbfd(char *filename) {
int fd;
fd = open(filename, O_RDONLY);
return fd;
}
void devgroupdb_freedevgroupdbfd(fd) {
close(fd);
}
/* Loads data into database instance from fd */
int devgroupdb_load(struct devgroupdb *db, int fd) {
char *line;
char *device;
int groupid;
char *chptr, *lptr;
line = (char *)malloc(MAPI_STR_LENGTH * sizeof(char));
chptr = lptr = line;
device = (char *)malloc(MAPI_STR_LENGTH * sizeof(char));
while(read(fd, chptr, 1)) {
/* *(chptr+1)='\0'; printf("%s", chptr); */
if(strncmp(chptr, "\n", strlen("\n")) == 0) {
*(++chptr) = '\0';
sscanf(lptr, "%s %d", device, &groupid);
devgroupdb_add(db, device, groupid);
chptr = lptr = line;
} else chptr++;
}
return 0;
}
/* Saves data from database instance into fd */
int devgroupdb_save(struct devgroupdb *db, int fd) {
struct devgroupdbitem *item;
char *buffer;
ftruncate(fd, 0);
lseek(fd, 0, SEEK_SET);
buffer = (char *)malloc(MAPI_STR_LENGTH * sizeof(char));
item = db->first;
if(item != NULL) {
do {
sprintf(buffer, "%s %d\n\0", item->device, item->groupid);
write(fd, buffer, strlen(buffer));
item = item->next;
} while(item != db->first);
}
write(fd, ".", 1);
return 0;
}
/* Merge data in database instance with shared database */
int devgroupdb_merge(struct devgroupdb *db, int local) {
struct devgroupdb *existingdb;
struct devgroupdbitem *item;
int fd;
existingdb = new_devgroupdb();
if(local)
fd = devgroupdb_acquire_rw_lock(MAPIDDEVGROUPDB);
else
fd = devgroupdb_acquire_rw_lock(MAPIDDEVGROUPDBGLOBAL);
devgroupdb_load(existingdb, fd);
item = db->first;
if(item != NULL) {
do {
devgroupdb_add(existingdb, item->device, item->groupid);
item = item->next;
} while(item != db->first);
}
devgroupdb_save(existingdb, fd);
devgroupdb_release_rw_lock(fd);
return 0;
}
/*
int devgroupdb_islocal(int local) {
int fd;
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDB);
if(fd != -1) {
close(fd);
return 1;
}
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDBGLOBAL);
if(fd != -1) {
close(fd);
return 0;
}
return -1;
}
*/
/* Returns database instance created from shared database */
struct devgroupdb *devgroupdb_open(int local) {
struct devgroupdb *db;
int fd;
db = new_devgroupdb();
switch(local) {
case 1:
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDB);
break;
case 0:
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDBGLOBAL);
break;
case 3: /* try */
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDB);
if(fd == -1)
fd = devgroupdb_getdevgroupdbfd(MAPIDDEVGROUPDBGLOBAL);
break;
}
if(fd == -1) {
fprintf(stderr, "devgroupdb_open(): could not open devgroupdb.\n");
exit(EXIT_FAILURE);
}
devgroupdb_load(db, fd);
devgroupdb_freedevgroupdbfd(fd);
return db;
}
/* vim: set foldmethod=marker foldmarker=\ {{{,\ }}} foldclose= foldcolumn=0 */
/*
* File name: devgroupdb.h
* Date: 2007/04/16 21:10
* Author:
*/
#ifndef __DEVGROUPDB_H__
#define __DEVGROUPDB_H__
/* Interface dependencies -------------------------------------------------- */
/* End interface dependencies ---------------------------------------------- */
#define MAPIDDEVGROUPDB "%s/.mapid.socks"
#define MAPIDDEVGROUPDBGLOBAL "/tmp/mapid.socks"
struct devgroupdbitem {
char *device;
int groupid;
struct devgroupdbitem *next;
};
struct devgroupdb {
struct devgroupdbitem *first;
};
/* Interface for server side (mapid). */
struct devgroupdb *new_devgroupdb();
int devgroupdb_add(struct devgroupdb *db, char *device, int groupid);
int devgroupdb_merge(struct devgroupdb *db, int local);
/* Interface for client side (mapi) */
struct devgroupdb *devgroupdb_open(int local);
int devgroupdb_getgroupidbydevice(struct devgroupdb *db, char *device);
#endif
/* vim: set foldmethod=marker foldmarker=\ {{{,\ }}} foldclose= foldcolumn=0 */
......@@ -17,6 +17,7 @@
#include "mapiipc.h"
#include "debug.h"
#include "mapidflib.h"
#include "devgroupdb.h"
#define HAVE_MSGHDR_MSG_CONTROL 1
......@@ -36,7 +37,8 @@
static int sock;
static int mapidaddr_len;
static struct sockaddr_un mapidaddr;
static char* mapidsocket;
static char* mapidsocket = NULL;
static char* mapidsocketglobal = NULL;
flist_t *flowlist = NULL;
......@@ -199,6 +201,12 @@ int mapiipc_read(struct mapiipcbuf *qbuf)
return 0;
}
// Sets globals
void mapiipc_set_socket_names(char *socket, char *socketglobal) {
mapidsocket = strdup(socket);
mapidsocketglobal = strdup(socketglobal);
}
int mapiipc_client_init()
//Initializes IPC for mapi functions
{
......@@ -207,29 +215,29 @@ int mapiipc_client_init()
return -1;
}
mapidsocket=malloc(sizeof(MAPIDSOCKHOME)-2+strlen(getenv("HOME")));
sprintf(mapidsocket,MAPIDSOCKHOME,getenv("HOME"));
// Check that names of mapid's sockets were set
if(mapidsocket == NULL || mapidsocketglobal == NULL) {
ERROR_CMD(printf("mapiipc_client_init(): socket names not set [%s:%d]\n",__FILE__,__LINE__));
return -1;
}
// Construct name of mapid's socket
// construct socket (try local)
mapidaddr.sun_family = AF_LOCAL;
strcpy(mapidaddr.sun_path, mapidsocket);
mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);
if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0)
{
free(mapidsocket);
mapidsocket=strdup(MAPIDSOCKGLOBAL);
strcpy(mapidaddr.sun_path, mapidsocket);
mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);
if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) {
ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
}
else {
free(mapidsocket);
return 0;
}
// construct socket (try global)
strcpy(mapidaddr.sun_path, mapidsocketglobal);
mapidaddr_len = sizeof mapidaddr.sun_family + strlen(mapidaddr.sun_path);
if (connect(sock, (struct sockaddr *)&mapidaddr, mapidaddr_len) < 0) {
ERROR_CMD(printf("connect: %s [%s:%d]\n",strerror(errno),__FILE__,__LINE__));
}
else {
return 0;
}
}
free(mapidsocket);
return -1;
}
......@@ -297,13 +305,12 @@ int mapiipc_remote_write_to_all(remote_flowdescr_t* rflow)
return 0;
}
void cleanup_handler(void *arg){ //the cleanup handler
free(arg);
return;
void cleanup_handler(void *arg){ //the cleanup handler
free(arg);
return;
}
void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
struct dmapiipcbuf* dbuf;
remote_flowdescr_t* rflow;
......@@ -317,9 +324,9 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
/* Guarantees that thread resources are deallocated upon return */
pthread_detach(pthread_self());
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // enable cancellation
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); // changes the type of responses to cancellation requests for the calling thread
// asynchronous (cancel the calling thread as soon as the cancellation request is received)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // enable cancellation
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); // changes the type of responses to cancellation requests for the calling thread
// asynchronous (cancel the calling thread as soon as the cancellation request is received)
dbuf = (struct dmapiipcbuf *)malloc(sizeof(struct dmapiipcbuf));
// pthread_cleanup_push() function pushes the specified cancellation cleanup handler onto
......@@ -330,7 +337,7 @@ void *mapiipc_comm_thread(void *host){ // reads an IPC message - blocking call
while(1){
if(host == NULL) break;
if(host == NULL) break;
#ifdef DIMAPISSL
recv_bytes = SSL_readn( ((struct host *) host)->con, dbuf, BASIC_SIZE);
......
......@@ -27,6 +27,8 @@
#define ARG_LENGTH 32
#define MAPIDSOCKHOME "%s/.mapid.sock"
#define MAPIDSOCKGLOBAL "/tmp/mapid.sock"
#define MAPIDGSOCKHOME "%s/.mapid%d.sock"
#define MAPIDGSOCKGLOBAL "/tmp/mapid%d.sock"
//All IPC code needs to be rewritten and cleand up.
//To support dynamic loading of new functions we should have an IPC
......@@ -123,6 +125,9 @@ struct mapiipcbuf {
//IPC calls
//Initialize IPC variables
void mapiipc_set_socket_names(char *socket, char *socketglobal);
//Initialize IPC functions
int mapiipc_client_init(void);
void mapiipc_daemon_init(void);
......
......@@ -9,7 +9,8 @@ libmapi_la_LIBADD = \
../common/libmapiipc.la \
../common/libparseconf.la \
../common/libprintfstring.la \
../common/libmapilibhandler.la -lpthread -ldl
../common/libmapilibhandler.la -lpthread -ldl \
../common/libdevgroupdb.la
libmapi_la_SOURCES = mapi.c mapi_errors.c
include_HEADERS = mapi.h
......
......@@ -26,6 +26,7 @@
#include "parseconf.h"
#include "printfstring.h"
#include "mapi_errors.h"
#include "devgroupdb.h"
#ifdef WITH_ADMISSION_CONTROL
#include <regex.h>
......@@ -519,6 +520,10 @@ int mapi_create_flow(const char *dev)
{
struct mapiipcbuf qbuf;
flowdescr_t *flow, *tmpflow;
struct devgroupdb *devgroupdb;
int devgroupid = 0;
char *mapidsocket, *mapidsocketglobal;
/* int local;*/
#ifdef DIMAPI
remote_flowdescr_t *rflow;
......@@ -540,6 +545,26 @@ int mapi_create_flow(const char *dev)
local_err = MAPI_DEVICE_INFO_ERR;
return -1;
}
/*
local = devgroupid_islocal();
if(local == -1) {
DEBUG_CMD(printf( "Error wrong global / local \n\n"));
return -1;
}
*/
devgroupdb = devgroupdb_open(3); // try local, global
devgroupid = devgroupdb_getgroupidbydevice(devgroupdb, (char *) dev);
if(!devgroupid) {
mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
}
else {
mapidsocket = printf_string(MAPIDGSOCKHOME, getenv("HOME"), devgroupid);
mapidsocketglobal = printf_string(MAPIDGSOCKGLOBAL, devgroupid);
}
mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
//check if flow is remote or not and call the appropriate init function
#ifndef DIMAPI
......@@ -807,6 +832,12 @@ int mapi_create_offline_flow(const char *dev, int format)
struct mapiipcbuf qbuf;
flowdescr_t *flow=NULL;
int file;
char *mapidsocket, *mapidsocketglobal;
mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
pthread_once(&mapi_is_initialized, (void*)mapi_init);
......@@ -921,6 +952,13 @@ char* mapi_create_offline_device(const char *path, int format)
offline_device *device = NULL;
#endif
char *mapidsocket, *mapidsocketglobal;
mapidsocket = printf_string(MAPIDSOCKHOME, getenv("HOME"));
mapidsocketglobal = strdup(MAPIDSOCKGLOBAL);
mapiipc_set_socket_names(mapidsocket, mapidsocketglobal);
pthread_once(&mapi_is_initialized, (void*)mapi_init);