Ubus
OpenWrt system message/RPC bus.
libubus-req.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2011-2014 Felix Fietkau <nbd@openwrt.org>
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License version 2.1
6  * as published by the Free Software Foundation
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  */
13 
14 #include <unistd.h>
15 #include "libubus.h"
16 #include "libubus-internal.h"
17 
19  struct list_head list;
20  int type;
21  struct blob_attr data[];
22 };
23 
24 static void req_data_cb(struct ubus_request *req, int type, struct blob_attr *data)
25 {
26  struct blob_attr **attr;
27 
28  if (req->raw_data_cb)
29  req->raw_data_cb(req, type, data);
30 
31  if (!req->data_cb)
32  return;
33 
34  attr = ubus_parse_msg(data, blob_raw_len(data));
35  if (!attr[UBUS_ATTR_DATA])
36  return;
37 
38  req->data_cb(req, type, attr[UBUS_ATTR_DATA]);
39 }
40 
41 static void __ubus_process_req_data(struct ubus_request *req)
42 {
43  struct ubus_pending_data *data, *tmp;
44 
45  list_for_each_entry_safe(data, tmp, &req->pending, list) {
46  list_del(&data->list);
47  if (!req->cancelled)
48  req_data_cb(req, data->type, data->data);
49  free(data);
50  }
51 }
52 
54  struct blob_attr *msg, int cmd, uint32_t peer)
55 {
56 
57  if (msg && blob_pad_len(msg) > UBUS_MAX_MSGLEN)
58  return -1;
59 
60  INIT_LIST_HEAD(&req->list);
61  INIT_LIST_HEAD(&req->pending);
62  req->ctx = ctx;
63  req->peer = peer;
64  req->seq = ++ctx->request_seq;
65 
66  return ubus_send_msg(ctx, req->seq, msg, cmd, peer, req->fd);
67 }
68 
70  struct blob_attr *msg, int cmd, uint32_t peer)
71 {
72  memset(req, 0, sizeof(*req));
73 
74  req->fd = -1;
75 
76  return __ubus_start_request(ctx, req, msg, cmd, peer);
77 }
78 
79 
80 void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req)
81 {
82  if (list_empty(&req->list))
83  return;
84 
85  req->cancelled = true;
87  list_del_init(&req->list);
88 }
89 
91 {
92  if (!list_empty(&req->list))
93  return;
94 
95  list_add(&req->list, &ctx->requests);
96 }
97 
98 static void
100 {
102 
103  if (!cb)
104  return;
105 
106  req->complete_cb = NULL;
107  cb(req, req->status_code);
108 }
109 
110 static void
111 ubus_set_req_status(struct ubus_request *req, int ret)
112 {
113  if (!list_empty(&req->list))
114  list_del_init(&req->list);
115 
116  req->status_msg = true;
117  req->status_code = ret;
118  if (!req->blocked)
120 }
121 
122 static void ubus_sync_req_cb(struct ubus_request *req, int ret)
123 {
124  req->status_msg = true;
125  req->status_code = ret;
126  req->ctx->cancel_poll = true;
127 }
128 
/*
 * Current CLOCK_MONOTONIC time in milliseconds.
 *
 * The seconds field is widened to 64 bits before it is scaled.
 */
static int64_t get_time_msec(void)
{
	struct timespec now;

	clock_gettime(CLOCK_MONOTONIC, &now);

	return (int64_t) now.tv_sec * 1000LL + now.tv_nsec / 1000000LL;
}
139 
141  int req_timeout)
142 {
143  ubus_complete_handler_t complete_cb = req->complete_cb;
144  int status = UBUS_STATUS_NO_DATA;
145  int64_t timeout = 0, time_end = 0;
146 
147  if (req_timeout)
148  time_end = get_time_msec() + req_timeout;
149 
152 
153  ctx->stack_depth++;
154  while (!req->status_msg) {
155  if (req_timeout) {
156  timeout = time_end - get_time_msec();
157  if (timeout <= 0) {
159  break;
160  }
161  }
162 
163  ubus_poll_data(ctx, (unsigned int) timeout);
164 
165  if (ctx->sock.eof) {
167  ctx->cancel_poll = true;
168  break;
169  }
170  }
171 
172  ctx->stack_depth--;
173  if (ctx->stack_depth)
174  ctx->cancel_poll = true;
175 
176  if (req->status_msg)
177  status = req->status_code;
178 
179  req->complete_cb = complete_cb;
180  if (req->complete_cb)
181  req->complete_cb(req, status);
182 
183  if (!ctx->stack_depth && !ctx->sock.registered)
185 
186  return status;
187 }
188 
190 {
191  blob_buf_init(&b, 0);
192  blob_put_int32(&b, UBUS_ATTR_STATUS, ret);
193  blob_put_int32(&b, UBUS_ATTR_OBJID, req->object);
194  ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_STATUS, req->peer, req->fd);
195 }
196 
197 static void ubus_put_data(struct blob_buf *buf, struct blob_attr *msg)
198 {
199  if (msg)
200  blob_put(buf, UBUS_ATTR_DATA, blob_data(msg), blob_len(msg));
201  else
202  blob_put(buf, UBUS_ATTR_DATA, NULL, 0);
203 }
204 
206  struct blob_attr *msg)
207 {
208  int ret;
209 
210  blob_buf_init(&b, 0);
211  blob_put_int32(&b, UBUS_ATTR_OBJID, req->object);
212  ubus_put_data(&b, msg);
213  ret = ubus_send_msg(ctx, req->seq, b.head, UBUS_MSG_DATA, req->peer, -1);
214  if (ret < 0)
215  return UBUS_STATUS_NO_DATA;
216 
217  return 0;
218 }
219 
220 int ubus_invoke_async_fd(struct ubus_context *ctx, uint32_t obj,
221  const char *method, struct blob_attr *msg,
222  struct ubus_request *req, int fd)
223 {
224  blob_buf_init(&b, 0);
225  blob_put_int32(&b, UBUS_ATTR_OBJID, obj);
226  blob_put_string(&b, UBUS_ATTR_METHOD, method);
227  ubus_put_data(&b, msg);
228 
229  memset(req, 0, sizeof(*req));
230  req->fd = fd;
231  if (__ubus_start_request(ctx, req, b.head, UBUS_MSG_INVOKE, obj) < 0)
233  return 0;
234 }
235 
236 int ubus_invoke_fd(struct ubus_context *ctx, uint32_t obj, const char *method,
237  struct blob_attr *msg, ubus_data_handler_t cb, void *priv,
238  int timeout, int fd)
239 {
240  struct ubus_request req;
241  int rc;
242 
243  rc = ubus_invoke_async_fd(ctx, obj, method, msg, &req, fd);
244  if (rc)
245  return rc;
246 
247  req.data_cb = cb;
248  req.priv = priv;
249  return ubus_complete_request(ctx, &req, timeout);
250 }
251 
252 static void
254 {
255  struct ubus_notify_request *nreq;
256 
257  nreq = container_of(req, struct ubus_notify_request, req);
258  if (!nreq->complete_cb)
259  return;
260 
261  nreq->complete_cb(nreq, 0, 0);
262 }
263 
264 static void
265 ubus_notify_data_cb(struct ubus_request *req, int type, struct blob_attr *msg)
266 {
267  struct ubus_notify_request *nreq;
268 
269  nreq = container_of(req, struct ubus_notify_request, req);
270  if (!nreq->data_cb)
271  return;
272 
273  nreq->data_cb(nreq, type, msg);
274 }
275 
276 static int
278  const char *type, struct blob_attr *msg,
279  struct ubus_notify_request *req, bool reply)
280 {
281  memset(req, 0, sizeof(*req));
282 
283  blob_buf_init(&b, 0);
284  blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
285  blob_put_string(&b, UBUS_ATTR_METHOD, type);
286  ubus_put_data(&b, msg);
287 
288  if (!reply)
289  blob_put_int8(&b, UBUS_ATTR_NO_REPLY, true);
290 
291  if (ubus_start_request(ctx, &req->req, b.head, UBUS_MSG_NOTIFY, obj->id) < 0)
293 
294  /* wait for status message from ubusd first */
295  req->req.notify = true;
296  req->pending = 1;
297  req->id[0] = obj->id;
300 
301  return 0;
302 }
303 
/*
 * Start an asynchronous notification for obj with replies requested.
 *
 * Returns the result of __ubus_notify_async().
 */
int ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj,
		      const char *type, struct blob_attr *msg,
		      struct ubus_notify_request *req)
{
	return __ubus_notify_async(ctx, obj, type, msg, req, true);
}
310 
311 int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj,
312  const char *type, struct blob_attr *msg, int timeout)
313 {
314  struct ubus_notify_request req;
315  int ret;
316 
317  ret = __ubus_notify_async(ctx, obj, type, msg, &req, timeout >= 0);
318  if (ret < 0)
319  return ret;
320 
321  if (timeout < 0) {
322  ubus_abort_request(ctx, &req.req);
323  return 0;
324  }
325 
326  return ubus_complete_request(ctx, &req.req, timeout);
327 }
328 
329 static bool ubus_get_status(struct ubus_msghdr_buf *buf, int *ret)
330 {
331  struct blob_attr **attrbuf = ubus_parse_msg(buf->data, blob_raw_len(buf->data));
332 
334  return false;
335 
336  *ret = blob_get_u32(attrbuf[UBUS_ATTR_STATUS]);
337  return true;
338 }
339 
340 static int
342 {
344 
345  ubus_get_status(buf, &ret);
346  req->peer = buf->hdr.peer;
347  ubus_set_req_status(req, ret);
348 
349  return ret;
350 }
351 
352 static void
354 {
355  struct ubus_pending_data *data;
356  int len;
357 
358  if (!req->blocked) {
359  req->blocked = true;
360  req_data_cb(req, buf->hdr.type, buf->data);
362  req->blocked = false;
363 
364  if (req->status_msg)
366 
367  return;
368  }
369 
370  len = blob_raw_len(buf->data);
371  data = calloc(1, sizeof(*data) + len);
372  if (!data)
373  return;
374 
375  data->type = buf->hdr.type;
376  memcpy(data->data, buf->data, len);
377  list_add(&data->list, &req->pending);
378 }
379 
380 static int
381 ubus_find_notify_id(struct ubus_notify_request *n, uint32_t objid)
382 {
383  uint32_t pending = n->pending;
384  int i;
385 
386  for (i = 0; pending; i++, pending >>= 1) {
387  if (!(pending & 1))
388  continue;
389 
390  if (n->id[i] == objid)
391  return i;
392  }
393 
394  return -1;
395 }
396 
397 static struct ubus_request *
398 ubus_find_request(struct ubus_context *ctx, uint32_t seq, uint32_t peer, int *id)
399 {
400  struct ubus_request *req;
401 
402  list_for_each_entry(req, &ctx->requests, list) {
403  struct ubus_notify_request *nreq;
404  nreq = container_of(req, struct ubus_notify_request, req);
405 
406  if (seq != req->seq)
407  continue;
408 
409  if (req->notify) {
410  if (!nreq->pending)
411  continue;
412 
413  *id = ubus_find_notify_id(nreq, peer);
414  if (*id < 0)
415  continue;
416  } else if (peer != req->peer)
417  continue;
418 
419  return req;
420  }
421  return NULL;
422 }
423 
424 static void ubus_process_notify_status(struct ubus_request *req, int id, struct ubus_msghdr_buf *buf)
425 {
426  struct ubus_notify_request *nreq;
427  struct blob_attr **tb;
428  struct blob_attr *cur;
429  size_t rem;
430  int idx = 1;
431  int ret = 0;
432 
433  nreq = container_of(req, struct ubus_notify_request, req);
434  nreq->pending &= ~(1 << id);
435 
436  if (!id) {
437  /* first id: ubusd's status message with a list of ids */
438  tb = ubus_parse_msg(buf->data, blob_raw_len(buf->data));
439  if (tb[UBUS_ATTR_SUBSCRIBERS]) {
440  blob_for_each_attr(cur, tb[UBUS_ATTR_SUBSCRIBERS], rem) {
441  if (!blob_check_type(blob_data(cur), blob_len(cur), BLOB_ATTR_INT32))
442  continue;
443 
444  nreq->pending |= (1 << idx);
445  nreq->id[idx] = blob_get_int32(cur);
446  idx++;
447 
448  if (idx == UBUS_MAX_NOTIFY_PEERS + 1)
449  break;
450  }
451  }
452  } else {
453  ubus_get_status(buf, &ret);
454  if (nreq->status_cb)
455  nreq->status_cb(nreq, id, ret);
456  }
457 
458  if (!nreq->pending)
459  ubus_set_req_status(req, 0);
460 }
461 
463 {
464  struct ubus_msghdr *hdr = &buf->hdr;
465  struct ubus_request *req;
466  int id = -1;
467 
468  switch(hdr->type) {
469  case UBUS_MSG_STATUS:
470  req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id);
471  if (!req)
472  break;
473 
474  if (fd >= 0) {
475  if (req->fd_cb)
476  req->fd_cb(req, fd);
477  else
478  close(fd);
479  }
480 
481  if (id >= 0)
482  ubus_process_notify_status(req, id, buf);
483  else
484  ubus_process_req_status(req, buf);
485  break;
486 
487  case UBUS_MSG_DATA:
488  req = ubus_find_request(ctx, hdr->seq, hdr->peer, &id);
489  if (req && (req->data_cb || req->raw_data_cb))
490  ubus_process_req_data(req, buf);
491  break;
492  }
493 }
494 
495 int __ubus_monitor(struct ubus_context *ctx, const char *type)
496 {
497  blob_buf_init(&b, 0);
498  return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_MONITOR, type, b.head, NULL, NULL, 1000);
499 }
static int timeout
Definition: cli.c:21
static struct blob_buf b
Definition: cli.c:19
int(* cb)(struct ubus_context *ctx, int argc, char **argv)
Definition: cli.c:581
static struct ubus_context * ctx
Definition: client.c:22
int ubus_send_msg(struct ubus_context *ctx, uint32_t seq, struct blob_attr *msg, int cmd, uint32_t peer, int fd)
Definition: libubus-io.c:126
void __hidden ubus_poll_data(struct ubus_context *ctx, int timeout)
Definition: libubus-io.c:335
struct blob_attr ** ubus_parse_msg(struct blob_attr *msg, size_t len)
Definition: libubus-io.c:46
static struct blob_attr * attrbuf[UBUS_ATTR_MAX]
Definition: libubus-io.c:44
void ubus_complete_deferred_request(struct ubus_context *ctx, struct ubus_request_data *req, int ret)
Definition: libubus-req.c:189
static void ubus_process_req_data(struct ubus_request *req, struct ubus_msghdr_buf *buf)
Definition: libubus-req.c:353
static void ubus_put_data(struct blob_buf *buf, struct blob_attr *msg)
Definition: libubus-req.c:197
int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, int timeout)
Definition: libubus-req.c:311
static int ubus_process_req_status(struct ubus_request *req, struct ubus_msghdr_buf *buf)
Definition: libubus-req.c:341
static void req_data_cb(struct ubus_request *req, int type, struct blob_attr *data)
Definition: libubus-req.c:24
int ubus_complete_request(struct ubus_context *ctx, struct ubus_request *req, int req_timeout)
Definition: libubus-req.c:140
int __hidden ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, struct blob_attr *msg, int cmd, uint32_t peer)
Definition: libubus-req.c:69
int ubus_invoke_async_fd(struct ubus_context *ctx, uint32_t obj, const char *method, struct blob_attr *msg, struct ubus_request *req, int fd)
Definition: libubus-req.c:220
int ubus_invoke_fd(struct ubus_context *ctx, uint32_t obj, const char *method, struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout, int fd)
Definition: libubus-req.c:236
static bool ubus_get_status(struct ubus_msghdr_buf *buf, int *ret)
Definition: libubus-req.c:329
void __hidden ubus_process_req_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
Definition: libubus-req.c:462
static int64_t get_time_msec(void)
Definition: libubus-req.c:129
int __hidden __ubus_start_request(struct ubus_context *ctx, struct ubus_request *req, struct blob_attr *msg, int cmd, uint32_t peer)
Definition: libubus-req.c:53
void ubus_abort_request(struct ubus_context *ctx, struct ubus_request *req)
Definition: libubus-req.c:80
static void ubus_set_req_status(struct ubus_request *req, int ret)
Definition: libubus-req.c:111
static void __ubus_process_req_data(struct ubus_request *req)
Definition: libubus-req.c:41
static void ubus_req_complete_cb(struct ubus_request *req)
Definition: libubus-req.c:99
int ubus_send_reply(struct ubus_context *ctx, struct ubus_request_data *req, struct blob_attr *msg)
Definition: libubus-req.c:205
static void ubus_notify_complete_cb(struct ubus_request *req, int ret)
Definition: libubus-req.c:253
static void ubus_sync_req_cb(struct ubus_request *req, int ret)
Definition: libubus-req.c:122
int ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, struct ubus_notify_request *req)
Definition: libubus-req.c:304
static int __ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, struct ubus_notify_request *req, bool reply)
Definition: libubus-req.c:277
void ubus_complete_request_async(struct ubus_context *ctx, struct ubus_request *req)
Definition: libubus-req.c:90
static void ubus_process_notify_status(struct ubus_request *req, int id, struct ubus_msghdr_buf *buf)
Definition: libubus-req.c:424
int __ubus_monitor(struct ubus_context *ctx, const char *type)
Definition: libubus-req.c:495
static struct ubus_request * ubus_find_request(struct ubus_context *ctx, uint32_t seq, uint32_t peer, int *id)
Definition: libubus-req.c:398
static int ubus_find_notify_id(struct ubus_notify_request *n, uint32_t objid)
Definition: libubus-req.c:381
static void ubus_notify_data_cb(struct ubus_request *req, int type, struct blob_attr *msg)
Definition: libubus-req.c:265
const struct ubus_method watch_method __hidden
Definition: libubus-sub.c:29
void(* ubus_complete_handler_t)(struct ubus_request *req, int ret)
Definition: libubus.h:60
#define UBUS_MAX_NOTIFY_PEERS
Definition: libubus.h:29
void(* ubus_data_handler_t)(struct ubus_request *req, int type, struct blob_attr *msg)
Definition: libubus.h:57
static int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout)
Definition: libubus.h:354
int stack_depth
Definition: libubus.h:168
struct uloop_timeout pending_timer
Definition: libubus.h:163
struct list_head requests
Definition: libubus.h:158
bool cancel_poll
Definition: libubus.h:167
struct uloop_fd sock
Definition: libubus.h:162
uint16_t request_seq
Definition: libubus.h:166
struct ubus_msghdr hdr
Definition: libubus.h:42
struct blob_attr * data
Definition: libubus.h:43
uint32_t peer
Definition: ubusmsg.h:33
uint8_t type
Definition: ubusmsg.h:31
uint16_t seq
Definition: ubusmsg.h:32
uint32_t pending
Definition: libubus.h:238
struct ubus_request req
Definition: libubus.h:232
ubus_notify_complete_handler_t complete_cb
Definition: libubus.h:235
ubus_notify_complete_handler_t status_cb
Definition: libubus.h:234
ubus_notify_data_handler_t data_cb
Definition: libubus.h:236
uint32_t id[16+1]
Definition: libubus.h:239
uint32_t id
Definition: libubus.h:130
struct blob_attr data[]
Definition: libubus-req.c:21
struct list_head list
Definition: libubus-req.c:19
uint16_t seq
Definition: libubus.h:197
uint32_t object
Definition: libubus.h:195
uint32_t peer
Definition: libubus.h:196
ubus_data_handler_t raw_data_cb
Definition: libubus.h:220
ubus_complete_handler_t complete_cb
Definition: libubus.h:223
bool blocked
Definition: libubus.h:213
int status_code
Definition: libubus.h:211
bool status_msg
Definition: libubus.h:212
void * priv
Definition: libubus.h:228
uint32_t peer
Definition: libubus.h:217
uint16_t seq
Definition: libubus.h:218
struct list_head pending
Definition: libubus.h:210
bool notify
Definition: libubus.h:215
ubus_data_handler_t data_cb
Definition: libubus.h:221
struct ubus_context * ctx
Definition: libubus.h:227
bool cancelled
Definition: libubus.h:214
struct list_head list
Definition: libubus.h:208
ubus_fd_handler_t fd_cb
Definition: libubus.h:222
uint16_t seq
Definition: ubusmsg.h:2
uint8_t type
Definition: ubusmsg.h:1
uint32_t peer
Definition: ubusmsg.h:3
@ UBUS_MSG_INVOKE
Definition: ubusmsg.h:53
@ UBUS_MSG_DATA
Definition: ubusmsg.h:44
@ UBUS_MSG_STATUS
Definition: ubusmsg.h:41
@ UBUS_MSG_NOTIFY
Definition: ubusmsg.h:71
#define UBUS_SYSTEM_OBJECT_MONITOR
Definition: ubusmsg.h:26
@ UBUS_STATUS_INVALID_ARGUMENT
Definition: ubusmsg.h:121
@ UBUS_STATUS_NO_DATA
Definition: ubusmsg.h:124
@ UBUS_STATUS_TIMEOUT
Definition: ubusmsg.h:126
@ UBUS_STATUS_CONNECTION_FAILED
Definition: ubusmsg.h:129
@ UBUS_ATTR_METHOD
Definition: ubusmsg.h:86
@ UBUS_ATTR_SUBSCRIBERS
Definition: ubusmsg.h:97
@ UBUS_ATTR_STATUS
Definition: ubusmsg.h:82
@ UBUS_ATTR_NO_REPLY
Definition: ubusmsg.h:95
@ UBUS_ATTR_OBJID
Definition: ubusmsg.h:85
@ UBUS_ATTR_DATA
Definition: ubusmsg.h:91