libamxp  1.4.0
Patterns C Implementation
amxp_signal.c
Go to the documentation of this file.
1 /****************************************************************************
2 **
3 ** SPDX-License-Identifier: BSD-2-Clause-Patent
4 **
5 ** SPDX-FileCopyrightText: Copyright (c) 2023 SoftAtHome
6 **
7 ** Redistribution and use in source and binary forms, with or without modification,
8 ** are permitted provided that the following conditions are met:
9 **
10 ** 1. Redistributions of source code must retain the above copyright notice,
11 ** this list of conditions and the following disclaimer.
12 **
13 ** 2. Redistributions in binary form must reproduce the above copyright notice,
14 ** this list of conditions and the following disclaimer in the documentation
15 ** and/or other materials provided with the distribution.
16 **
17 ** Subject to the terms and conditions of this license, each copyright holder
18 ** and contributor hereby grants to those receiving rights under this license
19 ** a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable
20 ** (except for failure to satisfy the conditions of this license) patent license
21 ** to make, have made, use, offer to sell, sell, import, and otherwise transfer
22 ** this software, where such license applies only to those patent claims, already
23 ** acquired or hereafter acquired, licensable by such copyright holder or contributor
24 ** that are necessarily infringed by:
25 **
26 ** (a) their Contribution(s) (the licensed copyrights of copyright holders and
27 ** non-copyrightable additions of contributors, in source or binary form) alone;
28 ** or
29 **
30 ** (b) combination of their Contribution(s) with the work of authorship to which
31 ** such Contribution(s) was added by such copyright holder or contributor, if,
32 ** at the time the Contribution is added, such addition causes such combination
33 ** to be necessarily infringed. The patent license shall not apply to any other
34 ** combinations which include the Contribution.
35 **
36 ** Except as expressly stated above, no rights or licenses from any copyright
37 ** holder or contributor is granted under this license, whether expressly, by
38 ** implication, estoppel or otherwise.
39 **
40 ** DISCLAIMER
41 **
42 ** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
43 ** AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
44 ** IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
45 ** ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
46 ** LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
47 ** DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
48 ** SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
49 ** CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
50 ** OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
51 ** USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
52 **
53 ****************************************************************************/
54 
55 #ifndef _GNU_SOURCE
56 #define _GNU_SOURCE
57 #endif
58 
59 #include <stdlib.h>
60 #include <stdio.h>
61 #include <string.h>
62 #include <fcntl.h>
63 #include <unistd.h>
64 #include <pthread.h>
65 
66 #include <amxc/amxc.h>
67 #include <amxp/amxp.h>
68 
69 #include "amxp_signal_priv.h"
70 
71 typedef struct _signal_ctrl {
72  int sigpipe[2];
74  amxc_llist_t sigmngrs;
75  amxc_llist_t pending_sigmngrs;
76  pthread_mutex_t mutex;
78 
79 typedef struct _signal_queue_item {
80  char* sig_name;
81  amxc_var_t sig_data;
82  amxc_lqueue_it_t it;
85  void* priv;
87 
90 
91 static int amxp_signal_queue_lock(void) {
92  return pthread_mutex_lock(&amxp_sigctrl.mutex);
93 }
94 
95 static int amxp_signal_queue_unlock(void) {
96  return pthread_mutex_unlock(&amxp_sigctrl.mutex);
97 }
98 
99 void amxp_free_slots(amxc_llist_it_t* it) {
100  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
101  amxc_llist_it_take(it);
102  amxp_expr_delete(&slot->expr);
103  free(slot->regexp);
104  free(slot);
105 }
106 
107 static void amxp_free_signals(UNUSED const char* key,
108  amxc_htable_it_t* it) {
109  amxp_signal_t* signal = amxc_htable_it_get_data(it, amxp_signal_t, hit);
111  free(signal);
112 }
113 
114 static void amxp_free_queued_signals(amxc_lqueue_it_t* it) {
115  signal_queue_item_t* item = amxc_llist_it_get_data(it, signal_queue_item_t, it);
116  amxc_var_clean(&item->sig_data);
117  free(item->sig_name);
118  free(item);
119 }
120 
121 static int amxp_signal_queue(const amxp_signal_t* const signal,
122  const amxc_var_t* const data) {
123  int retval = -1;
124  signal_queue_item_t* item = (signal_queue_item_t*) calloc(1, sizeof(signal_queue_item_t));
125  when_null(item, exit);
126  item->sig_name = (char*) calloc(1, strlen(signal->name) + 1);
127  when_null(item->sig_name, exit);
128 
129  amxc_var_init(&item->sig_data);
130  if(data != NULL) {
131  when_failed(amxc_var_copy(&item->sig_data, data), exit);
132  }
133  strcpy(item->sig_name, signal->name);
134  amxc_lqueue_add(&signal->mngr->signal_queue, &item->it);
135 
136  if(!signal->mngr->suspended) {
137  amxc_llist_append(&amxp_sigctrl.pending_sigmngrs, &signal->mngr->it);
138  }
139 
140  retval = 0;
141 
142 exit:
143  if((retval != 0) && (item != NULL)) {
144  free(item->sig_name);
145  amxc_var_clean(&item->sig_data);
146  free(item);
147  }
148  return retval;
149 }
150 
153  const amxc_var_t* const data,
154  void* priv) {
155  int retval = -1;
156  signal_queue_item_t* item = (signal_queue_item_t*) calloc(1, sizeof(signal_queue_item_t));
157  when_null(item, exit);
158 
159  amxc_var_init(&item->sig_data);
160  if(data != NULL) {
161  when_failed(amxc_var_copy(&item->sig_data, data), exit);
162  }
163  item->fn = fn;
164  item->priv = priv;
165  amxc_lqueue_add(&sigmngr->signal_queue, &item->it);
166 
167  if(!sigmngr->suspended) {
168  amxc_llist_append(&amxp_sigctrl.pending_sigmngrs, &sigmngr->it);
169  }
170 
171  retval = 0;
172 
173 exit:
174  if((retval != 0) && (item != NULL)) {
175  amxc_var_clean(&item->sig_data);
176  free(item);
177  }
178  return retval;
179 }
180 
182  signal_queue_item_t* item = NULL;
183  amxp_signal_mngr_t* sig_mngr = NULL;
184  amxc_lqueue_it_t* sig_it = NULL;
185 
186  amxc_llist_it_t* it = amxc_llist_get_first(&amxp_sigctrl.pending_sigmngrs);
187  when_null(it, exit);
188 
189  sig_mngr = amxc_llist_it_get_data(it, amxp_signal_mngr_t, it);
190  amxc_llist_append(&amxp_sigctrl.pending_sigmngrs, &sig_mngr->it);
191 
192  sig_it = amxc_lqueue_remove(&sig_mngr->signal_queue);
193  item = amxc_llist_it_get_data(sig_it, signal_queue_item_t, it);
194 
195  item->sig_mngr = sig_mngr;
196 
197  if(amxc_lqueue_is_empty(&sig_mngr->signal_queue)) {
198  amxc_llist_append(&amxp_sigctrl.sigmngrs, it);
199  }
200 
201 exit:
202  return item;
203 }
204 
205 static int amxp_signal_create_pipe(void) {
206  int flags = 0;
207  int retval = pipe(amxp_sigctrl.sigpipe);
208  when_true(retval != 0, exit);
209 
210  flags = fcntl(amxp_sigctrl.sigpipe[0], F_GETFL, 0);
211  when_true(flags < 0, exit);
212  when_true(fcntl(amxp_sigctrl.sigpipe[0], F_SETFL, flags | O_NONBLOCK) < 0, exit);
213  when_true(fcntl(amxp_sigctrl.sigpipe[0], F_SETFD, amxp_sigctrl.sigpipe[0] | FD_CLOEXEC) < 0, exit);
214  when_true(fcntl(amxp_sigctrl.sigpipe[1], F_SETFD, amxp_sigctrl.sigpipe[1] | FD_CLOEXEC) < 0, exit);
215 
216  retval = 0;
217 
218 exit:
219  if(retval != 0) {
220  if(amxp_sigctrl.sigpipe[0] != -1) {
221  close(amxp_sigctrl.sigpipe[0]);
222  close(amxp_sigctrl.sigpipe[1]);
223  }
224  amxp_sigctrl.sigpipe[0] = -1;
225  amxp_sigctrl.sigpipe[1] = -1;
226  }
227  return retval;
228 }
229 
230 static void amxp_slot_trigger(const amxp_slot_t* const slot,
231  const char* name,
232  const amxc_var_t* const data) {
233  if(slot->expr != NULL) {
235  if((data == NULL) || amxp_expr_eval_var(slot->expr, data, &status)) {
236  slot->fn(name, data, slot->priv);
237  }
238  } else {
239  slot->fn(name, data, slot->priv);
240  }
241 }
242 
244  const char* name,
245  const amxc_var_t* const data) {
246  amxp_signal_t* signal = amxp_sigmngr_find_signal(sig_mngr, name);
247  if(signal != NULL) {
248  signal->triggered = true;
249  }
250 
251  sig_mngr->triggered = true;
252  amxc_llist_for_each(it, (&sig_mngr->regexp_slots)) {
253  regex_t regexp;
254  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
255  if(slot->deleted) {
256  continue;
257  }
258  regcomp(&regexp, slot->regexp, REG_NOSUB | REG_EXTENDED);
259  if(regexec(&regexp, name, 0, NULL, 0) == 0) {
260  amxp_slot_trigger(slot, name, data);
261  }
262  regfree(&regexp);
263  if(sig_mngr->deleted || !sig_mngr->enabled) {
264  break;
265  }
266  }
267 
268  if(signal != NULL) {
269  signal->triggered = false;
270  }
271  sig_mngr->triggered = false;
272 }
273 
275  when_null(signal, exit);
276 
277  if(signal->mngr == NULL) {
278  amxc_htable_it_clean(&signal->hit, amxp_free_signals);
279  } else {
280  amxc_llist_for_each(it, (&signal->slots)) {
281  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
282  if(slot->deleted) {
283  amxc_llist_it_clean(it, amxp_free_slots);
284  }
285  }
286  }
287 
288 exit:
289  return;
290 }
291 
293  when_null(sig_mngr, exit);
294 
295  if(sig_mngr->deleted && (sig_mngr != &amxp_sigmngr)) {
296  amxp_sigmngr_delete(&sig_mngr);
297  } else {
298  sig_mngr->deleted = false;
299  amxc_htable_for_each(hit, &sig_mngr->signals) {
300  amxp_signal_t* sig = amxc_container_of(hit, amxp_signal_t, hit);
302  }
303  amxc_llist_for_each(it, &sig_mngr->regexp_slots) {
304  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
305  if(slot->deleted) {
306  amxc_llist_it_clean(it, amxp_free_slots);
307  }
308  }
309  }
310 
311 exit:
312  return;
313 }
314 
316  return (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
317 }
318 
319 void amxp_get_sigmngrs(amxc_llist_t** sigmngrs,
320  amxc_llist_t** pending_sigmngrs) {
321  *sigmngrs = &amxp_sigctrl.sigmngrs;
322  *pending_sigmngrs = &amxp_sigctrl.pending_sigmngrs;
323 }
324 
325 const amxc_htable_t* amxp_get_signals(const amxp_signal_mngr_t* sig_mngr) {
326  sig_mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
327  return &sig_mngr->signals;
328 }
329 
331  int retval = -1;
332  when_null(sig_mngr, exit);
333 
334  *sig_mngr = (amxp_signal_mngr_t*) calloc(1, sizeof(amxp_signal_mngr_t));
335  when_null(sig_mngr, exit);
336 
337  retval = amxp_sigmngr_init((*sig_mngr));
338 
339 exit:
340  if((retval != 0) &&
341  (sig_mngr != NULL) &&
342  (*sig_mngr != NULL)) {
343  free(*sig_mngr);
344  *sig_mngr = NULL;
345  }
346  return retval;
347 }
348 
350  int retval = -1;
351 
352  when_null(sig_mngr, exit);
353  when_null(*sig_mngr, exit);
354 
355  if((*sig_mngr)->triggered) {
356  (*sig_mngr)->deleted = true;
357  amxc_htable_for_each(hit, &(*sig_mngr)->signals) {
358  amxp_signal_t* sig = amxc_container_of(hit, amxp_signal_t, hit);
359  if(sig != amxp_sigctrl.sigall) {
360  sig->mngr = NULL;
361  }
362  }
363  } else {
364  amxp_sigmngr_clean((*sig_mngr));
365  if(*sig_mngr != &amxp_sigmngr) {
366  free(*sig_mngr);
367  }
368  }
369 
370  *sig_mngr = NULL;
371  retval = 0;
372 
373 exit:
374  return retval;
375 }
376 
378  int retval = -1;
379  when_null(sig_mngr, exit);
380 
382 
383  amxc_llist_it_init(&sig_mngr->it);
384  amxc_lqueue_init(&sig_mngr->signal_queue);
385  when_false_status(amxc_htable_init(&sig_mngr->signals, 20) == 0, exit, amxp_signal_queue_unlock());
386  amxc_llist_init(&sig_mngr->regexp_slots);
387  amxc_llist_append(&amxp_sigctrl.sigmngrs, &sig_mngr->it);
388  sig_mngr->enabled = true;
389  sig_mngr->deleted = false;
390  sig_mngr->triggered = false;
391  sig_mngr->suspended = false;
392 
394 
395  retval = 0;
396 
397 exit:
398  if((retval != 0) &&
399  (sig_mngr != NULL)) {
400  amxc_htable_clean(&sig_mngr->signals, NULL);
401  }
402  return retval;
403 }
404 
406  int retval = -1;
407  amxc_llist_it_t* it = NULL;
408  char buffer[1];
409 
410  when_null(sig_mngr, exit);
411 
413 
414  amxc_llist_it_take(&sig_mngr->it);
415  it = amxc_lqueue_remove(&sig_mngr->signal_queue);
416  while(it) {
418  when_true_status(read(amxp_sigctrl.sigpipe[0], buffer, 1) == -1, exit, amxp_signal_queue_unlock());
419  it = amxc_lqueue_remove(&sig_mngr->signal_queue);
420  }
421  amxc_llist_clean(&sig_mngr->regexp_slots, amxp_free_slots);
422  amxc_htable_clean(&sig_mngr->signals, amxp_free_signals);
423 
425 
426  retval = 0;
427 
428 exit:
429 
430  return retval;
431 }
432 
434  const char* name) {
435  int retval = -1;
436  amxp_signal_t* signal = NULL;
437  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
438 
439  when_null(name, exit);
440  when_true(*name == 0, exit)
441 
442  retval = amxp_signal_new(mngr, &signal, name);
443 
444 exit:
445  return retval;
446 }
447 
449  const char* name) {
450  int retval = -1;
451  amxp_signal_t* signal = NULL;
452  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
453  amxc_htable_it_t* hit = NULL;
454 
456 
457  when_null(name, exit);
458  when_true(*name == 0, exit)
459 
460  hit = amxc_htable_get(&mngr->signals, name);
461  when_null(hit, exit);
462 
463  signal = amxc_htable_it_get_data(hit, amxp_signal_t, hit);
464  amxc_htable_it_clean(hit, NULL);
466  signal->mngr = NULL;
467 
468  retval = 0;
469 
470 exit:
472  return retval;
473 }
474 
476  const char* name) {
477  amxp_signal_t* signal = NULL;
478  amxc_htable_it_t* hit = NULL;
479  const amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
480 
481  when_null(name, exit);
482  when_true(*name == 0, exit)
483 
484  hit = amxc_htable_get(&mngr->signals, name);
485  when_null(hit, exit);
486  signal = amxc_htable_it_get_data(hit, amxp_signal_t, hit);
487 
488 exit:
489  return signal;
490 }
491 
493  const char* name,
494  const amxc_var_t* const data) {
495  amxp_signal_t* signal = NULL;
496  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
497 
498  when_null(name, exit);
499  when_true(*name == 0, exit);
500  when_true(mngr->suspended, exit);
501 
502  signal = amxp_sigmngr_find_signal(mngr, name);
503  if(signal != NULL) {
504  amxp_signal_trigger(signal, data);
505  } else {
506  amxp_sigmngr_trigger_regexp(mngr, name, data);
508  }
509 
510 exit:
511  return;
512 }
513 
515  const char* name,
516  const amxc_var_t* const data) {
517 
518  int retval = -1;
519  amxp_signal_t* signal = NULL;
520  const amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
521  amxc_htable_it_t* hit = NULL;
522 
523  when_null(name, exit);
524  when_true(*name == 0, exit)
525 
526  hit = amxc_htable_get(&mngr->signals, name);
527  when_null(hit, exit);
528 
529  signal = amxc_htable_it_get_data(hit, amxp_signal_t, hit);
530  retval = amxp_signal_emit(signal, data);
531 
532 exit:
533  return retval;
534 }
535 
538  const amxc_var_t* const data,
539  void* priv) {
540  int retval = -1;
541  int write_length = 0;
542  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
543 
544  when_null(fn, exit);
545 
547 
548  if(!mngr->suspended) {
549  write_length = write(amxp_sigctrl.sigpipe[1], "S", 1);
550  when_true_status(write_length != 1, exit, amxp_signal_queue_unlock());
551  }
552 
553  retval = amxp_deferred_queue(mngr, fn, data, priv);
554 
556 
557 exit:
558  return retval;
559 }
560 
562  bool enable) {
563  int retval = 0;
564  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
565 
566  mngr->enabled = enable;
567 
568  return retval;
569 }
570 
572  int retval = -1;
573  char buffer[1];
574  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
575 
576  when_true(mngr->suspended, exit);
577 
579 
580  amxc_llist_iterate(it, &mngr->signal_queue) {
581  when_true_status(read(amxp_sigctrl.sigpipe[0], buffer, 1) == -1, exit, amxp_signal_queue_unlock());
582  }
583 
584  amxc_llist_append(&amxp_sigctrl.sigmngrs, &mngr->it);
585  mngr->suspended = true;
586 
588 
589  retval = 0;
590 
591 exit:
592  return retval;
593 }
594 
596  int retval = -1;
597  int write_length = 0;
598  amxp_signal_mngr_t* mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
599 
600  when_false(mngr->suspended, exit);
601  mngr->suspended = false;
602 
604 
605  amxc_llist_iterate(it, &mngr->signal_queue) {
606  write_length = write(amxp_sigctrl.sigpipe[1], "S", 1);
607  when_true_status(write_length != 1, exit, amxp_signal_queue_unlock());
608  }
609 
610  amxc_llist_append(&amxp_sigctrl.pending_sigmngrs, &mngr->it);
611 
613 
614  retval = 0;
615 
616 exit:
617  return retval;
618 }
619 
621  amxp_signal_t** signal,
622  const char* name) {
623  int retval = -1;
624  amxc_htable_it_t* hit = NULL;
625  when_null(signal, exit);
626  when_not_null(*signal, exit);
627  when_null(name, exit);
628  when_true(*name == 0, exit)
629 
630  sig_mngr = (sig_mngr == NULL) ? &amxp_sigmngr : sig_mngr;
631  when_null(sig_mngr->it.llist, exit);
632 
633  hit = amxc_htable_get(&sig_mngr->signals, name);
634  when_not_null(hit, exit);
635 
636  *signal = (amxp_signal_t*) calloc(1, sizeof(amxp_signal_t));
637  when_null(*signal, exit);
638  (*signal)->mngr = sig_mngr;
639 
641 
642  amxc_htable_it_init(&(*signal)->hit);
643  amxc_llist_init(&(*signal)->slots);
644  if(amxc_htable_insert(&sig_mngr->signals, name, &(*signal)->hit) == 0) {
645  const char* n = amxc_htable_it_get_key(&(*signal)->hit);
646  when_null_status(n, exit, amxp_signal_queue_unlock());
647  (*signal)->name = n;
648  amxc_llist_for_each(it, (&amxp_sigctrl.sigall->slots)) {
649  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
650  if(slot->deleted) {
651  continue;
652  }
653  amxp_slot_connect(sig_mngr,
654  name,
655  slot->expr == NULL ? NULL : slot->expr->expression,
656  slot->fn,
657  slot->priv);
658  }
659 
660  retval = 0;
661  }
662 
664 
665 exit:
666  return retval;
667 }
668 
669 int amxp_signal_delete(amxp_signal_t** const signal) {
670  int retval = -1;
671  when_null(signal, exit);
672  when_null(*signal, exit);
673 
675 
676  if(!(*signal)->triggered) {
677  amxc_htable_it_clean(&(*signal)->hit, amxp_free_signals);
678  } else {
679  (*signal)->mngr = NULL;
680  }
681  *signal = NULL;
682 
684 
685  retval = 0;
686 
687 exit:
688  return retval;
689 }
690 
692  const amxc_var_t* const data) {
693  amxp_signal_mngr_t* sig_mngr = NULL;
694  char* sig_name = NULL;
695  static uint32_t recurse = 0;
696 
697  when_null(signal, exit);
698  when_null(signal->mngr, exit);
699  sig_mngr = signal->mngr;
700  when_false(sig_mngr->enabled, exit);
701  when_true(sig_mngr->deleted, exit);
702  when_true(sig_mngr->suspended, exit);
703 
704  recurse++;
705  sig_name = strdup(signal->name);
706  sig_mngr->triggered = true;
707  signal->triggered = true;
708  amxc_llist_for_each(it, (&signal->slots)) {
709  amxp_slot_t* slot = amxc_llist_it_get_data(it, amxp_slot_t, it);
710  // slots that are marked for deletion should not be called anymore
711  if(slot->deleted) {
712  continue;
713  }
714  // always call the remaining slots (if not marked as deleted), even
715  // if the signal is removed (signal->mngr == NULL)
716  amxp_slot_trigger(slot, sig_name, data);
717 
718  }
719  signal->triggered = false;
720  sig_mngr->triggered = false;
721 
722  amxp_sigmngr_trigger_regexp(sig_mngr, sig_name, data);
723  recurse--;
724 
725  if(recurse == 0) {
726  // it is safe now to clean-up memory
727  // garbage collect all deleted slots or the signal
729  }
730 
731  free(sig_name);
732 
733 exit:
734  if(recurse == 0) {
735  // it is safe now to clean-up memory
736  // garbage collect all deleted slots, signals or the signal manager
738  }
739  return;
740 }
741 
742 int amxp_signal_emit(const amxp_signal_t* const signal,
743  const amxc_var_t* const data) {
744  int retval = -1;
745  int write_length = 0;
746  amxp_signal_mngr_t* sig_mngr = NULL;
747  when_null(signal, exit);
748  when_null(signal->mngr, exit);
749  sig_mngr = signal->mngr;
750 
751  when_false(sig_mngr->enabled, exit);
752  when_true(sig_mngr->deleted, exit);
753 
755 
756  if(!sig_mngr->suspended) {
757  write_length = write(amxp_sigctrl.sigpipe[1], "S", 1);
758  when_true_status(write_length != 1, exit, amxp_signal_queue_unlock());
759  }
760 
761  retval = amxp_signal_queue(signal, data);
762 
764 
765 exit:
766  return retval;
767 }
768 
769 int amxp_signal_read(void) {
770  int retval = -1;
771  int read_length = 0;
772  char buffer[1];
773  signal_queue_item_t* item = NULL;
774  amxc_htable_it_t* hit = NULL;
775  amxp_signal_t* signal = NULL;
776 
778 
779  read_length = read(amxp_sigctrl.sigpipe[0], buffer, 1);
780  when_true_status(read_length < 1, exit, amxp_signal_queue_unlock());
781  item = amxp_signal_dequeue();
782  when_null_status(item, exit, amxp_signal_queue_unlock());
783 
785 
786  if(item->sig_name != NULL) {
787  hit = amxc_htable_get(&item->sig_mngr->signals, item->sig_name);
788  signal = amxc_container_of(hit, amxp_signal_t, hit);
789  when_null(signal, exit);
790  amxp_signal_trigger(signal, &item->sig_data);
791  } else if(item->fn != NULL) {
792  item->fn(&item->sig_data, item->priv);
793  }
794 
795  retval = 0;
796 
797 exit:
798  if(item != NULL) {
799  amxc_var_clean(&item->sig_data);
800  free(item->sig_name);
801  free(item);
802  }
803  return retval;
804 }
805 
807  int retval = -1;
808  when_null(signal, exit);
809 
810  amxc_llist_clean(&signal->slots, amxp_free_slots);
811  retval = 0;
812 
813 exit:
814  return retval;
815 }
816 
817 const char* amxp_signal_name(const amxp_signal_t* const signal) {
818  return signal ? signal->name : NULL;
819 }
820 
821 bool amxp_signal_has_slots(const amxp_signal_t* const signal) {
822  bool retval = false;
823 
824  when_null(signal, exit);
825  retval = !amxc_llist_is_empty(&signal->slots);
826  when_false(retval, exit);
827 
828  retval = false;
829  amxc_llist_iterate(it, &signal->slots) {
830  amxp_slot_t* slot = amxc_container_of(it, amxp_slot_t, it);
831  if(!slot->deleted) {
832  retval = true;
833  break;
834  }
835  }
836 
837 exit:
838  return retval;
839 }
840 
841 int amxp_signal_fd(void) {
842  return amxp_sigctrl.sigpipe[0];
843 }
844 
845 CONSTRUCTOR_LVL(101) static void amxp_signals_init(void) {
847  amxc_llist_init(&amxp_sigctrl.sigmngrs);
848  amxc_lqueue_init(&amxp_sigctrl.pending_sigmngrs);
851  pthread_mutex_init(&amxp_sigctrl.mutex, NULL);
852 
853  amxp_sigmngr_add_signal(NULL, "connection-added");
854  amxp_sigmngr_add_signal(NULL, "connection-wait-write");
855  amxp_sigmngr_add_signal(NULL, "listen-added");
856  amxp_sigmngr_add_signal(NULL, "listen-deleted");
857  amxp_sigmngr_add_signal(NULL, "connection-deleted");
858 }
859 
860 DESTRUCTOR_LVL(101) static void amxp_signals_cleanup(void) {
862  if(amxp_sigctrl.sigpipe[0] != -1) {
863  close(amxp_sigctrl.sigpipe[0]);
864  }
865  if(amxp_sigctrl.sigpipe[1] != -1) {
866  close(amxp_sigctrl.sigpipe[1]);
867  }
868  pthread_mutex_destroy(&amxp_sigctrl.mutex);
869 }
static int amxp_signal_queue_unlock(void)
Definition: amxp_signal.c:95
amxp_signal_mngr_t * amxp_get_sigmngr(amxp_signal_mngr_t *sig_mngr)
Definition: amxp_signal.c:315
static void amxp_sigmngr_trigger_regexp(amxp_signal_mngr_t *sig_mngr, const char *name, const amxc_var_t *const data)
Definition: amxp_signal.c:243
struct _signal_queue_item signal_queue_item_t
DESTRUCTOR_LVL(101)
Definition: amxp_signal.c:860
static int amxp_signal_create_pipe(void)
Definition: amxp_signal.c:205
static void amxp_slot_trigger(const amxp_slot_t *const slot, const char *name, const amxc_var_t *const data)
Definition: amxp_signal.c:230
static void amxp_signal_garbage_collect(amxp_signal_t *signal)
Definition: amxp_signal.c:274
void amxp_get_sigmngrs(amxc_llist_t **sigmngrs, amxc_llist_t **pending_sigmngrs)
Definition: amxp_signal.c:319
static int amxp_signal_queue_lock(void)
Definition: amxp_signal.c:91
static signal_ctrl_t amxp_sigctrl
Definition: amxp_signal.c:88
static signal_queue_item_t * amxp_signal_dequeue(void)
Definition: amxp_signal.c:181
static void amxp_free_queued_signals(amxc_lqueue_it_t *it)
Definition: amxp_signal.c:114
struct _signal_ctrl signal_ctrl_t
void amxp_free_slots(amxc_llist_it_t *it)
Definition: amxp_signal.c:99
static void amxp_sigmngr_garbage_collect(amxp_signal_mngr_t *sig_mngr)
Definition: amxp_signal.c:292
static int amxp_signal_queue(const amxp_signal_t *const signal, const amxc_var_t *const data)
Definition: amxp_signal.c:121
CONSTRUCTOR_LVL(101)
Definition: amxp_signal.c:845
static void amxp_free_signals(UNUSED const char *key, amxc_htable_it_t *it)
Definition: amxp_signal.c:107
static int amxp_deferred_queue(amxp_signal_mngr_t *sigmngr, amxp_deferred_fn_t fn, const amxc_var_t *const data, void *priv)
Definition: amxp_signal.c:151
const amxc_htable_t * amxp_get_signals(const amxp_signal_mngr_t *sig_mngr)
Definition: amxp_signal.c:325
static amxp_signal_mngr_t amxp_sigmngr
Definition: amxp_signal.c:89
#define UNUSED
Definition: main.c:68
bool amxp_expr_eval_var(amxp_expr_t *expr, const amxc_var_t *const data, amxp_expr_status_t *status)
Evaluates an expression against a composite variant.
enum _expr_status amxp_expr_status_t
Expression status/error codes.
void amxp_expr_delete(amxp_expr_t **expr)
Deletes a previously allocated expression structure.
@ amxp_expr_status_ok
int amxp_sigmngr_add_signal(amxp_signal_mngr_t *const sig_mngr, const char *name)
Adds a signal to a signal manager.
Definition: amxp_signal.c:433
int amxp_sigmngr_init(amxp_signal_mngr_t *sig_mngr)
Initializing function, initializes members of the amxp_signal_mngr_t structure.
Definition: amxp_signal.c:377
int amxp_sigmngr_new(amxp_signal_mngr_t **sig_mngr)
Constructor function, creates a new signal manager instance.
Definition: amxp_signal.c:330
int amxp_sigmngr_clean(amxp_signal_mngr_t *sig_mngr)
Clean-up functions, cleans-up all members of a amxp_signal_mngr_t structure.
Definition: amxp_signal.c:405
void amxp_sigmngr_trigger_signal(amxp_signal_mngr_t *const sig_mngr, const char *name, const amxc_var_t *const data)
Triggers a signal.
Definition: amxp_signal.c:492
int amxp_sigmngr_remove_signal(amxp_signal_mngr_t *const sig_mngr, const char *name)
Removes a signal from a signal manager.
Definition: amxp_signal.c:448
amxp_signal_t * amxp_sigmngr_find_signal(const amxp_signal_mngr_t *const sig_mngr, const char *name)
Get the pointer to the signal.
Definition: amxp_signal.c:475
int amxp_sigmngr_delete(amxp_signal_mngr_t **sig_mngr)
Destructor function, deletes a signal manager instance.
Definition: amxp_signal.c:349
int amxp_sigmngr_suspend(amxp_signal_mngr_t *const sig_mngr)
Suspends the handling of signals for the signal manager.
Definition: amxp_signal.c:571
int amxp_sigmngr_deferred_call(amxp_signal_mngr_t *const sig_mngr, amxp_deferred_fn_t fn, const amxc_var_t *const data, void *priv)
Defers a function call.
Definition: amxp_signal.c:536
bool amxp_signal_has_slots(const amxp_signal_t *const signal)
Checks if the signal has slots conencted.
Definition: amxp_signal.c:821
int amxp_signal_read(void)
Reads from the amxp signal file descriptor.
Definition: amxp_signal.c:769
const char * amxp_signal_name(const amxp_signal_t *const signal)
Gets the name of the signal.
Definition: amxp_signal.c:817
int amxp_sigmngr_enable(amxp_signal_mngr_t *const sig_mngr, bool enable)
Enables or disables the signal manager.
Definition: amxp_signal.c:561
int amxp_signal_new(amxp_signal_mngr_t *sig_mngr, amxp_signal_t **signal, const char *name)
Constructor function, creates a new signal.
Definition: amxp_signal.c:620
void amxp_signal_trigger(amxp_signal_t *const signal, const amxc_var_t *const data)
Triggers a signal.
Definition: amxp_signal.c:691
int amxp_signal_emit(const amxp_signal_t *const signal, const amxc_var_t *const data)
Emits a signal.
Definition: amxp_signal.c:742
int amxp_sigmngr_resume(amxp_signal_mngr_t *const sig_mngr)
Resumes the handling of signals for the signal manager.
Definition: amxp_signal.c:595
int amxp_signal_fd(void)
Gets the amxp signal file descriptor.
Definition: amxp_signal.c:841
int amxp_signal_disconnect_all(amxp_signal_t *const signal)
Disconnects all slots from the signal.
Definition: amxp_signal.c:806
int amxp_signal_delete(amxp_signal_t **const signal)
Destructor function, deletes a signal.
Definition: amxp_signal.c:669
int amxp_sigmngr_emit_signal(const amxp_signal_mngr_t *const sig_mngr, const char *name, const amxc_var_t *const data)
Emits a signal.
Definition: amxp_signal.c:514
void(* amxp_deferred_fn_t)(const amxc_var_t *const data, void *const priv)
Deferred callback function signature.
Definition: amxp_signal.h:137
int amxp_slot_connect(amxp_signal_mngr_t *const sig_mngr, const char *const sig_name, const char *const expression, amxp_slot_fn_t fn, void *const priv)
Connects a slot (function) to a named signal of a signal manager.
Definition: amxp_slot.c:300
char * expression
Structure containing the signal manager information.
Definition: amxp_signal.h:103
amxc_llist_it_t it
Definition: amxp_signal.h:107
amxc_htable_t signals
Definition: amxp_signal.h:104
amxc_lqueue_t signal_queue
Definition: amxp_signal.h:105
amxc_llist_t regexp_slots
Definition: amxp_signal.h:106
Structure containing the signal information.
Definition: amxp_signal.h:119
amxc_llist_t slots
Definition: amxp_signal.h:121
amxp_signal_mngr_t * mngr
Definition: amxp_signal.h:123
amxc_htable_it_t hit
Definition: amxp_signal.h:120
const char * name
Definition: amxp_signal.h:122
amxp_slot_fn_t fn
amxp_expr_t * expr
int sigpipe[2]
Definition: amxp_signal.c:72
amxp_signal_t * sigall
Definition: amxp_signal.c:73
amxc_llist_t sigmngrs
Definition: amxp_signal.c:74
pthread_mutex_t mutex
Definition: amxp_signal.c:76
amxc_llist_t pending_sigmngrs
Definition: amxp_signal.c:75
amxc_lqueue_it_t it
Definition: amxp_signal.c:82
amxc_var_t sig_data
Definition: amxp_signal.c:81
amxp_deferred_fn_t fn
Definition: amxp_signal.c:84
amxp_signal_mngr_t * sig_mngr
Definition: amxp_signal.c:83
static amxp_signal_mngr_t * sigmngr