21 #include <sys/prctl.h>
26 #include <sys/types.h>
50 #define __DEFAULT_GENERIC_RPC_TMPDIR "/var/tmp"
54 static pthread_mutex_t generic_rpc_mutex = PTHREAD_MUTEX_INITIALIZER;
55 static int init_done = 0;
57 static int generic_rpc_listener_count = 1;
60 static GHashTable *svctype_info_tab;
73 pthread_mutex_lock(&generic_rpc_mutex);
76 pthread_mutex_unlock(&generic_rpc_mutex);
80 memset(&tv,0,
sizeof(tv));
81 if (gettimeofday(&tv,NULL))
82 vwarn(
"gettimeofday: %s (not doing srand!)\n",strerror(errno));
84 srand((
int)tv.tv_usec);
86 if ((tmpdir = getenv(
"TMP")))
97 g_hash_table_new_full(g_direct_hash,g_direct_equal,NULL,NULL);
101 pthread_mutex_unlock(&generic_rpc_mutex);
108 pthread_mutex_lock(&generic_rpc_mutex);
111 pthread_mutex_unlock(&generic_rpc_mutex);
115 g_hash_table_iter_init(&iter,svctype_info_tab);
116 while (g_hash_table_iter_next(&iter,&key,NULL)) {
120 g_hash_table_destroy(svctype_info_tab);
121 svctype_info_tab = NULL;
128 pthread_mutex_unlock(&generic_rpc_mutex);
133 g_hash_table_lookup(svctype_info_tab,(gpointer)(uintptr_t)svctype);
142 pthread_mutex_lock(&generic_rpc_mutex);
145 pthread_mutex_unlock(&generic_rpc_mutex);
149 si =
calloc(1,
sizeof(*si));
154 g_hash_table_new_full(g_direct_hash,g_direct_equal,NULL,NULL);
156 g_hash_table_new_full(g_direct_hash,g_direct_equal,NULL,NULL);
158 g_hash_table_insert(svctype_info_tab,(gpointer)(uintptr_t)svctype,si);
160 pthread_mutex_unlock(&generic_rpc_mutex);
176 while (g_hash_table_iter_next(&iter,NULL,(gpointer *)&l)) {
177 _generic_rpc_listener_free(l);
185 while (g_hash_table_iter_next(&iter,NULL,(gpointer *)&ll)) {
193 g_hash_table_remove(svctype_info_tab,(gpointer)(uintptr_t)svctype);
196 pthread_mutex_lock(&generic_rpc_mutex);
198 pthread_mutex_unlock(&generic_rpc_mutex);
211 {
"port",
'p',
"PORT",0,
212 "Set the RPC server's port; if unspecified, uses stdin.",0 },
213 {
"tmpdir",
'T',
"DIR",0,
214 "Set the RPC server's tmpdir; if unspecified, $TMP, then /var/tmp.",0 },
224 return ARGP_ERR_UNKNOWN;
226 case ARGP_KEY_NO_ARGS:
227 case ARGP_KEY_SUCCESS:
238 cfg->
port = atoi(arg);
240 return ARGP_ERR_UNKNOWN;
246 return ARGP_ERR_UNKNOWN;
250 return ARGP_ERR_UNKNOWN;
267 pthread_detach(pthread_self());
273 soap->fdebug[SOAP_INDEX_RECV] = stderr;
274 soap->fdebug[SOAP_INDEX_SENT] = stderr;
275 soap->fdebug[SOAP_INDEX_TEST] = stderr;
280 (soap->ip >> 24) & 0xff,(soap->ip >> 16) & 0xff,
281 (soap->ip >> 8) & 0xff,soap->ip & 0xff);
292 static void *_generic_rpc_handle_request(
void *arg) {
298 pthread_detach(pthread_self());
300 snprintf(namebuf,
sizeof(namebuf),
"%s_reqhand",state->
cfg->
name);
301 prctl(PR_SET_NAME,namebuf,NULL,NULL,NULL);
311 static void *generic_rpc_sigwaiter(
void *arg) {
316 prctl(PR_SET_NAME,
"sigwaiter",NULL,NULL,NULL);
319 memset(&siginfo,0,
sizeof(siginfo));
320 rc = sigwaitinfo(&cfg->
sigset,&siginfo);
322 if (errno == EINTR || errno == EAGAIN)
325 vwarn(
"sigwait: %s!\n",strerror(errno));
329 else if (rc == SIGINT) {
330 vwarn(
"interrupted, exiting!\n");
333 else if (rc == SIGCHLD) {
335 sigaddset(&cfg->
sigset,SIGCHLD);
337 else if (rc == SIGPIPE) {
339 sigaddset(&cfg->
sigset,SIGPIPE);
342 vwarn(
"unexpected signal %d; ignoring!\n",rc);
355 pthread_t tid, sigtid;
364 soap.fdebug[SOAP_INDEX_RECV] = stderr;
365 soap.fdebug[SOAP_INDEX_SENT] = stderr;
366 soap.fdebug[SOAP_INDEX_TEST] = stderr;
371 if (cfg->
port <= 0) {
382 soap.send_timeout = 60;
383 soap.recv_timeout = 60;
384 soap.accept_timeout = 0;
385 soap.max_keep_alive = 100;
388 soap.bind_flags = SO_REUSEADDR;
390 m = soap_bind(&soap,NULL,cfg->
port,64);
391 if (!soap_valid_socket(m)) {
392 verror(
"Could not bind to port %d: ",cfg->
port);
393 soap_print_fault(&soap,stderr);
402 if ((rc = pthread_sigmask(SIG_BLOCK,&cfg->
sigset,NULL))) {
403 verror(
"pthread_sigmask: %s\n",strerror(rc));
407 if ((rc = pthread_create(&sigtid,NULL,&generic_rpc_sigwaiter,cfg))) {
408 verror(
"pthread: %s\n",strerror(rc));
415 s = soap_accept(&soap);
416 if (!soap_valid_socket(s)) {
419 soap_print_fault(&soap,stderr);
425 verror(
"SOAP: server timed out\n");
429 (soap.ip >> 24) & 0xff,(soap.ip >> 16) & 0xff,
430 (soap.ip >> 8) & 0xff,soap.ip & 0xff);
432 tsoap = soap_copy(&soap);
434 verror(
"could not copy SOAP data to handle connection; exiting!\n");
438 state =
calloc(1,
sizeof(*state));
442 pthread_create(&tid,NULL,_generic_rpc_handle_request,(
void *)state);
470 pthread_mutex_lock(&generic_rpc_mutex);
472 pthread_mutex_unlock(&generic_rpc_mutex);
485 g_hash_table_lookup(si->
id_listener_tab,(gpointer)(uintptr_t)listener_id);
491 pthread_mutex_lock(&generic_rpc_mutex);
493 pthread_mutex_unlock(&generic_rpc_mutex);
510 l->
id = generic_rpc_listener_count++;
512 l->
url = strdup(url);
514 l->
objid_tab = g_hash_table_new(g_direct_hash,g_direct_equal);
516 soap_init2(&l->
soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE);
517 l->
soap.socket_flags = MSG_NOSIGNAL;
518 l->
soap.tcp_keep_alive = 1;
526 l->
soap.connect_timeout = 4;
527 l->
soap.send_timeout = 4;
528 l->
soap.recv_timeout = 4;
538 pthread_mutex_lock(&generic_rpc_mutex);
540 pthread_mutex_unlock(&generic_rpc_mutex);
547 if (soap_valid_socket(l->
soap.socket))
548 soap_closesock(&(l->
soap));
550 soap_destroy(&l->
soap);
560 int no_objid_deletes) {
572 g_hash_table_lookup(si->
id_listener_tab,(gpointer)(uintptr_t)listener_id);
576 g_hash_table_remove(si->
id_listener_tab,(gpointer)(uintptr_t)listener_id);
583 if (!no_objid_deletes) {
586 while (g_hash_table_iter_next(&iter,NULL,(gpointer *)&tmpl)) {
588 g_hash_table_iter_remove(&iter);
593 while (g_hash_table_iter_next(&iter,NULL,(gpointer *)&ll)) {
603 _generic_rpc_listener_free(l);
611 pthread_mutex_lock(&generic_rpc_mutex);
613 pthread_mutex_unlock(&generic_rpc_mutex);
654 if (!ll || array_list_len(ll) < 1)
667 g_hash_table_remove(l->
objid_tab,(gpointer)(uintptr_t)objid);
684 pthread_mutex_lock(&generic_rpc_mutex);
686 pthread_mutex_unlock(&generic_rpc_mutex);
701 (gpointer)(uintptr_t)objid);
710 ll = array_list_clone(ll,0);
724 (gpointer)(uintptr_t)objid);
736 pthread_mutex_lock(&generic_rpc_mutex);
738 pthread_mutex_unlock(&generic_rpc_mutex);
744 int objid,
int owns) {
749 pthread_mutex_lock(&generic_rpc_mutex);
752 pthread_mutex_unlock(&generic_rpc_mutex);
757 pthread_mutex_unlock(&generic_rpc_mutex);
758 verror(
"listener %d does not exist!\n",listener_id);
764 (gpointer)(uintptr_t)objid)) {
765 pthread_mutex_unlock(&generic_rpc_mutex);
774 (gpointer)(uintptr_t)objid))) {
775 ll = array_list_create(1);
777 (gpointer)(uintptr_t)objid,ll);
779 else if (array_list_find(ll,l) != -1) {
780 pthread_mutex_unlock(&generic_rpc_mutex);
785 verror(
"listener %d already on objid %d's list!\n",
795 g_hash_table_insert(l->
objid_tab,(gpointer)(uintptr_t)objid,l);
797 array_list_append(ll,l);
799 pthread_mutex_unlock(&generic_rpc_mutex);
805 int objid,
int owns) {
810 pthread_mutex_lock(&generic_rpc_mutex);
813 pthread_mutex_unlock(&generic_rpc_mutex);
825 (gpointer)(uintptr_t)objid)) {
826 pthread_mutex_unlock(&generic_rpc_mutex);
835 (gpointer)(uintptr_t)objid))) {
836 ll = array_list_create(1);
838 (gpointer)(uintptr_t)objid,ll);
840 else if (array_list_find(ll,l) != -1) {
841 pthread_mutex_unlock(&generic_rpc_mutex);
846 verror(
"listener %d already on objid %d's list!\n",
855 g_hash_table_insert(l->
objid_tab,(gpointer)(uintptr_t)objid,l);
857 array_list_append(ll,l);
859 pthread_mutex_unlock(&generic_rpc_mutex);
869 pthread_mutex_lock(&generic_rpc_mutex);
871 pthread_mutex_unlock(&generic_rpc_mutex);
875 pthread_mutex_unlock(&generic_rpc_mutex);
885 pthread_mutex_lock(&generic_rpc_mutex);
888 pthread_mutex_unlock(&generic_rpc_mutex);
895 rc = array_list_len(ll);
897 pthread_mutex_unlock(&generic_rpc_mutex);
912 pthread_mutex_lock(&generic_rpc_mutex);
915 pthread_mutex_unlock(&generic_rpc_mutex);
923 (gpointer)(uintptr_t)objid);
925 if (!ll || array_list_len(ll) < 1) {
926 pthread_mutex_unlock(&generic_rpc_mutex);
935 owner->
soap.connect_timeout = 24 * 60 * 60;
936 owner->
soap.send_timeout = 24 * 60 * 60;
937 owner->
soap.recv_timeout = 24 * 60 * 60;
939 rc = notifier(owner,1,data);
941 owner->
soap.connect_timeout = 4;
942 owner->
soap.send_timeout = 4;
943 owner->
soap.recv_timeout = 4;
953 rc = notifier(l,0,data);
956 "notifier returned %d on %s for (%d,%d); removing!\n",
957 rc,l->
url,svctype,objid);
971 l->
url,svctype,objid);
975 pthread_mutex_unlock(&generic_rpc_mutex);
982 char *filename,
int max_size) {
989 memset(&statbuf,0,
sizeof(statbuf));
990 if (stat(filename,&statbuf)) {
991 verror(
"could not stat logfile %s: %s\n",filename,strerror(errno));
994 else if ((fd = open(filename,O_RDONLY)) < 0) {
995 verror(
"could not open logfile %s: %s\n",
996 filename,strerror(errno));
1001 if (statbuf.st_size > 0) {
1002 sz = statbuf.st_size;
1003 if (max_size > 0 && max_size < statbuf.st_size)
1009 lseek(fd,statbuf.st_size - sz,SEEK_SET);
1013 if (rc != sz && errno) {
1014 vwarn(
"only read %d of %d bytes for logfile %s: %s\n",
1015 rc,sz,filename,strerror(errno));
1018 vwarn(
"only read %d of %d bytes for logfile %s (no error)\n",
const struct argp_child generic_rpc_argp_children[2]
#define vwarnopt(level, area, flags, format,...)
int( generic_rpc_listener_notifier_t)(struct generic_rpc_listener *l, int is_owner, void *data)
int generic_rpc_serve(struct generic_rpc_config *cfg)
int _generic_rpc_remove_listener(rpc_svctype_t svctype, int listener_id, int no_objid_deletes)
#define SOAP_CALLOC(soap, nmemb, size)
struct argp generic_rpc_argp
int generic_rpc_unbind_dynlistener_objid(rpc_svctype_t svctype, char *listener_url, int objid)
static uint64_t unsigned int i
void generic_rpc_init(void)
char * GENERIC_RPC_TMPDIR
int generic_rpc_unbind_all_listeners_objid(rpc_svctype_t svctype, int objid)
int _generic_rpc_unbind_listener_objid(rpc_svctype_t svctype, int listener_id, int objid)
struct generic_rpc_config * cfg
GHashTable * objid_listener_tab
#define verror(format,...)
#define verrorc(format,...)
#define vwarn(format,...)
int generic_rpc_insert_listener(rpc_svctype_t svctype, char *url)
int _generic_rpc_insert_listener(rpc_svctype_t svctype, char *url)
int generic_rpc_remove_listener(rpc_svctype_t svctype, int listener_id)
#define array_list_foreach(alist, lpc, placeholder)
void waitpipe_notify(int signo, siginfo_t *siginfo)
GHashTable * url_listener_tab
#define generic_rpc_argp_header
GHashTable * id_listener_tab
struct argp_option generic_rpc_argp_opts[]
#define __DEFAULT_GENERIC_RPC_TMPDIR
int generic_rpc_unbind_listener_objid(rpc_svctype_t svctype, int listener_id, int objid)
struct xsd__hexBinary * generic_rpc_read_file_into_hexBinary(struct soap *soap, char *filename, int max_size)
void generic_rpc_register_svctype(rpc_svctype_t svctype)
int generic_rpc_bind_dynlistener_objid(rpc_svctype_t svctype, char *listener_url, int objid, int owns)
int _generic_rpc_unbind_all_listeners_objid(rpc_svctype_t svctype, int objid)
#define vdebug(devel, areas, flags, format,...)
void * calloc(size_t nmemb, size_t size)
void generic_rpc_fini(void)
struct generic_rpc_listener * generic_rpc_lookup_listener_url(rpc_svctype_t svctype, char *url)
struct generic_rpc_listener * _generic_rpc_lookup_listener_id(rpc_svctype_t svctype, int listener_id)
int generic_rpc_listener_notify_all(rpc_svctype_t svctype, int objid, generic_rpc_listener_notifier_t *notifier, void *data)
int generic_rpc_bind_listener_objid(rpc_svctype_t svctype, int listener_id, int objid, int owns)
int(* handle_request)(struct soap *soap)
#define __SAFE_IO(fn, fns, fd, buf, buflen, rc)
struct generic_rpc_listener * _generic_rpc_listener_lookup_owner(rpc_svctype_t svctype, int objid)
int generic_rpc_handle_request(struct soap *soap)
#define array_list_foreach_delete(alist, lpc)
struct generic_rpc_listener * generic_rpc_lookup_listener_id(rpc_svctype_t svctype, int listener_id)
void _generic_rpc_unregister_svctype(rpc_svctype_t svctype, int no_hash_delete)
void generic_rpc_unregister_svctype(rpc_svctype_t svctype)
struct svctype_info * __get_si(rpc_svctype_t svctype)
GHashTable * objid_listenerlist_tab
int generic_rpc_count_listeners(rpc_svctype_t svctype, int objid)
int waitpipe_init_ext(void(*alt_handler)(int, siginfo_t *, void *))
error_t generic_rpc_argp_parse_opt(int key, char *arg, struct argp_state *state)
struct generic_rpc_listener * _generic_rpc_lookup_listener_url(rpc_svctype_t svctype, char *url)