Ubus
OpenWrt system message/RPC bus.
libubus-io.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2011-2014 Felix Fietkau <nbd@openwrt.org>
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License version 2.1
6  * as published by the Free Software Foundation
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  */
13 
14 #define _GNU_SOURCE
15 #include <sys/types.h>
16 #include <sys/uio.h>
17 #include <sys/socket.h>
18 
19 #include <unistd.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 
23 #include <libubox/usock.h>
24 #include <libubox/blob.h>
25 #include <libubox/blobmsg.h>
26 
27 #include "libubus.h"
28 #include "libubus-internal.h"
29 
30 #define STATIC_IOV(_var) { .iov_base = (char *) &(_var), .iov_len = sizeof(_var) }
31 
32 #define UBUS_MSGBUF_REDUCTION_INTERVAL 16
33 
34 static const struct blob_attr_info ubus_policy[UBUS_ATTR_MAX] = {
35  [UBUS_ATTR_STATUS] = { .type = BLOB_ATTR_INT32 },
36  [UBUS_ATTR_OBJID] = { .type = BLOB_ATTR_INT32 },
37  [UBUS_ATTR_OBJPATH] = { .type = BLOB_ATTR_STRING },
38  [UBUS_ATTR_METHOD] = { .type = BLOB_ATTR_STRING },
39  [UBUS_ATTR_ACTIVE] = { .type = BLOB_ATTR_INT8 },
40  [UBUS_ATTR_NO_REPLY] = { .type = BLOB_ATTR_INT8 },
41  [UBUS_ATTR_SUBSCRIBERS] = { .type = BLOB_ATTR_NESTED },
42 };
43 
44 static struct blob_attr *attrbuf[UBUS_ATTR_MAX];
45 
46 __hidden struct blob_attr **ubus_parse_msg(struct blob_attr *msg, size_t len)
47 {
48  blob_parse_untrusted(msg, len, attrbuf, ubus_policy, UBUS_ATTR_MAX);
49  return attrbuf;
50 }
51 
52 static void wait_data(int fd, bool write)
53 {
54  struct pollfd pfd = { .fd = fd };
55 
56  pfd.events = write ? POLLOUT : POLLIN;
57  poll(&pfd, 1, -1);
58 }
59 
60 static int writev_retry(int fd, struct iovec *iov, int iov_len, int sock_fd)
61 {
62  uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
63  struct msghdr msghdr = { 0 };
64  struct cmsghdr *cmsg;
65  int len = 0;
66  int *pfd;
67 
68  msghdr.msg_iov = iov,
69  msghdr.msg_iovlen = iov_len,
70  msghdr.msg_control = fd_buf;
71  msghdr.msg_controllen = sizeof(fd_buf);
72 
73  cmsg = CMSG_FIRSTHDR(&msghdr);
74  cmsg->cmsg_type = SCM_RIGHTS;
75  cmsg->cmsg_level = SOL_SOCKET;
76  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
77 
78  pfd = (int *) CMSG_DATA(cmsg);
79  msghdr.msg_controllen = cmsg->cmsg_len;
80 
81  do {
82  ssize_t cur_len;
83 
84  if (sock_fd < 0) {
85  msghdr.msg_control = NULL;
86  msghdr.msg_controllen = 0;
87  } else {
88  *pfd = sock_fd;
89  }
90 
91  cur_len = sendmsg(fd, &msghdr, 0);
92  if (cur_len < 0) {
93  switch(errno) {
94  case EAGAIN:
95  wait_data(fd, true);
96  break;
97  case EINTR:
98  break;
99  default:
100  return -1;
101  }
102  continue;
103  }
104 
105  if (len > 0)
106  sock_fd = -1;
107 
108  len += cur_len;
109  while (cur_len >= (ssize_t) iov->iov_len) {
110  cur_len -= iov->iov_len;
111  iov_len--;
112  iov++;
113  if (!iov_len)
114  return len;
115  }
116  iov->iov_base += cur_len;
117  iov->iov_len -= cur_len;
118  msghdr.msg_iov = iov;
119  msghdr.msg_iovlen = iov_len;
120  } while (1);
121 
122  /* Should never reach here */
123  return -1;
124 }
125 
126 int __hidden ubus_send_msg(struct ubus_context *ctx, uint32_t seq,
127  struct blob_attr *msg, int cmd, uint32_t peer, int fd)
128 {
129  struct ubus_msghdr hdr;
130  struct iovec iov[2] = {
131  STATIC_IOV(hdr)
132  };
133  int ret;
134 
135  hdr.version = 0;
136  hdr.type = cmd;
137  hdr.seq = cpu_to_be16(seq);
138  hdr.peer = cpu_to_be32(peer);
139 
140  if (!msg) {
141  blob_buf_init(&b, 0);
142  msg = b.head;
143  }
144 
145  iov[1].iov_base = (char *) msg;
146  iov[1].iov_len = blob_raw_len(msg);
147 
148  ret = writev_retry(ctx->sock.fd, iov, ARRAY_SIZE(iov), fd);
149  if (ret < 0)
150  ctx->sock.eof = true;
151 
152  if (fd >= 0)
153  close(fd);
154 
155  return ret;
156 }
157 
158 static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd)
159 {
160  uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 };
161  struct msghdr msghdr = { 0 };
162  struct cmsghdr *cmsg;
163  int total = 0;
164  int bytes;
165  int *pfd;
166  int fd;
167 
168  fd = ctx->sock.fd;
169 
170  msghdr.msg_iov = iov,
171  msghdr.msg_iovlen = 1,
172  msghdr.msg_control = fd_buf;
173  msghdr.msg_controllen = sizeof(fd_buf);
174 
175  cmsg = CMSG_FIRSTHDR(&msghdr);
176  cmsg->cmsg_type = SCM_RIGHTS;
177  cmsg->cmsg_level = SOL_SOCKET;
178  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
179 
180  pfd = (int *) CMSG_DATA(cmsg);
181 
182  while (iov->iov_len > 0) {
183  if (recv_fd) {
184  msghdr.msg_control = fd_buf;
185  msghdr.msg_controllen = cmsg->cmsg_len;
186  } else {
187  msghdr.msg_control = NULL;
188  msghdr.msg_controllen = 0;
189  }
190 
191  *pfd = -1;
192  bytes = recvmsg(fd, &msghdr, 0);
193  if (!bytes)
194  return -1;
195 
196  if (bytes < 0) {
197  bytes = 0;
198  if (errno == EINTR)
199  continue;
200 
201  if (errno != EAGAIN)
202  return -1;
203  }
204  if (!wait && !bytes)
205  return 0;
206 
207  if (recv_fd)
208  *recv_fd = *pfd;
209 
210  recv_fd = NULL;
211 
212  wait = true;
213  iov->iov_len -= bytes;
214  iov->iov_base += bytes;
215  total += bytes;
216 
217  if (iov->iov_len > 0)
218  wait_data(fd, false);
219  }
220 
221  return total;
222 }
223 
225 {
226  struct blob_attr *data = (struct blob_attr *) (hdr + 1);
227 
228  if (hdr->version != 0)
229  return false;
230 
231  if (blob_raw_len(data) < sizeof(*data))
232  return false;
233 
234  if (blob_pad_len(data) > UBUS_MAX_MSGLEN)
235  return false;
236 
237  return true;
238 }
239 
240 static bool alloc_msg_buf(struct ubus_context *ctx, int len)
241 {
242  void *ptr;
243  int buf_len = ctx->msgbuf_data_len;
244  int rem;
245 
246  if (!ctx->msgbuf.data)
247  buf_len = 0;
248 
249  rem = (len % UBUS_MSG_CHUNK_SIZE);
250  if (rem > 0)
251  len += UBUS_MSG_CHUNK_SIZE - rem;
252 
253  if (len < buf_len &&
256  buf_len = 0;
257  }
258 
259  if (len <= buf_len)
260  return true;
261 
262  ptr = realloc(ctx->msgbuf.data, len);
263  if (!ptr)
264  return false;
265 
266  ctx->msgbuf.data = ptr;
267  ctx->msgbuf_data_len = len;
268  return true;
269 }
270 
271 static bool get_next_msg(struct ubus_context *ctx, int *recv_fd)
272 {
273  struct {
274  struct ubus_msghdr hdr;
275  struct blob_attr data;
276  } hdrbuf;
277  struct iovec iov = STATIC_IOV(hdrbuf);
278  int len;
279  int r;
280 
281  /* receive header + start attribute */
282  r = recv_retry(ctx, &iov, false, recv_fd);
283  if (r <= 0) {
284  if (r < 0)
285  ctx->sock.eof = true;
286 
287  return false;
288  }
289 
290  hdrbuf.hdr.seq = be16_to_cpu(hdrbuf.hdr.seq);
291  hdrbuf.hdr.peer = be32_to_cpu(hdrbuf.hdr.peer);
292 
293  if (!ubus_validate_hdr(&hdrbuf.hdr))
294  return false;
295 
296  len = blob_raw_len(&hdrbuf.data);
297  if (!alloc_msg_buf(ctx, len))
298  return false;
299 
300  memcpy(&ctx->msgbuf.hdr, &hdrbuf.hdr, sizeof(hdrbuf.hdr));
301  memcpy(ctx->msgbuf.data, &hdrbuf.data, sizeof(hdrbuf.data));
302 
303  iov.iov_base = (char *)ctx->msgbuf.data + sizeof(hdrbuf.data);
304  iov.iov_len = blob_len(ctx->msgbuf.data);
305  if (iov.iov_len > 0 &&
306  recv_retry(ctx, &iov, true, NULL) <= 0)
307  return false;
308 
309  return true;
310 }
311 
312 void __hidden ubus_handle_data(struct uloop_fd *u, unsigned int events)
313 {
314  struct ubus_context *ctx = container_of(u, struct ubus_context, sock);
315  int recv_fd = -1;
316 
317  while (1) {
318  if (!ctx->stack_depth)
320 
321  if (!get_next_msg(ctx, &recv_fd))
322  break;
323  ubus_process_msg(ctx, &ctx->msgbuf, recv_fd);
324  if (uloop_cancelling() || ctx->cancel_poll)
325  break;
326  }
327 
328  if (!ctx->stack_depth)
330 
331  if (u->eof)
333 }
334 
336 {
337  struct pollfd pfd = {
338  .fd = ctx->sock.fd,
339  .events = POLLIN | POLLERR,
340  };
341 
342  ctx->cancel_poll = false;
343  poll(&pfd, 1, timeout ? timeout : -1);
344  ubus_handle_data(&ctx->sock, ULOOP_READ);
345 }
346 
347 static void
349  void *priv)
350 {
351  struct ubus_subscriber *s;
352 
353  list_for_each_entry(s, &ctx->auto_subscribers, list)
354  if (s->new_obj_cb(ctx, s, obj->path))
355  ubus_subscribe(ctx, s, obj->id);
356 }
357 
358 static void
360 {
362 
363  if (list_empty(&ctx->auto_subscribers))
364  return;
365 
366  ubus_register_event_handler(ctx, ev, "ubus.object.add");
367  ubus_lookup(ctx, NULL, ubus_auto_sub_lookup, NULL);
368 }
369 
370 static void
372 {
373  struct ubus_object *obj, *tmp;
374  struct ubus_object **objs;
375  int n, i = 0;
376 
377  /* clear all type IDs, they need to be registered again */
378  avl_for_each_element(&ctx->objects, obj, avl)
379  if (obj->type)
380  obj->type->id = 0;
381 
382  /* push out all objects again */
383  objs = alloca(ctx->objects.count * sizeof(*objs));
384  avl_remove_all_elements(&ctx->objects, obj, avl, tmp) {
385  objs[i++] = obj;
386  obj->id = 0;
387  }
388 
389  for (n = i, i = 0; i < n; i++)
390  ubus_add_object(ctx, objs[i]);
391 
393 }
394 
395 int ubus_reconnect(struct ubus_context *ctx, const char *path)
396 {
397  struct {
398  struct ubus_msghdr hdr;
399  struct blob_attr data;
400  } hdr;
401  struct blob_attr *buf;
402  int ret = UBUS_STATUS_UNKNOWN_ERROR;
403 
404  if (!path)
405  path = UBUS_UNIX_SOCKET;
406 
407  if (ctx->sock.fd >= 0) {
408  if (ctx->sock.registered)
409  uloop_fd_delete(&ctx->sock);
410 
411  close(ctx->sock.fd);
412  }
413 
414  ctx->sock.eof = false;
415  ctx->sock.error = false;
416  ctx->sock.fd = usock(USOCK_UNIX, path, NULL);
417  if (ctx->sock.fd < 0)
419 
420  if (read(ctx->sock.fd, &hdr, sizeof(hdr)) != sizeof(hdr))
421  goto out_close;
422 
423  if (!ubus_validate_hdr(&hdr.hdr))
424  goto out_close;
425 
426  if (hdr.hdr.type != UBUS_MSG_HELLO)
427  goto out_close;
428 
429  buf = calloc(1, blob_raw_len(&hdr.data));
430  if (!buf)
431  goto out_close;
432 
433  memcpy(buf, &hdr.data, sizeof(hdr.data));
434  if (read(ctx->sock.fd, blob_data(buf), blob_len(buf)) != (ssize_t) blob_len(buf))
435  goto out_free;
436 
437  ctx->local_id = hdr.hdr.peer;
438  if (!ctx->local_id)
439  goto out_free;
440 
441  ret = UBUS_STATUS_OK;
442  fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC);
443 
445 
446 out_free:
447  free(buf);
448 out_close:
449  if (ret)
450  close(ctx->sock.fd);
451 
452  return ret;
453 }
static int timeout
Definition: cli.c:21
static struct blob_buf b
Definition: cli.c:19
static struct ubus_context * ctx
Definition: client.c:22
void ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
Definition: libubus.c:93
static void ubus_auto_sub_lookup(struct ubus_context *ctx, struct ubus_object_data *obj, void *priv)
Definition: libubus-io.c:348
static bool get_next_msg(struct ubus_context *ctx, int *recv_fd)
Definition: libubus-io.c:271
static int recv_retry(struct ubus_context *ctx, struct iovec *iov, bool wait, int *recv_fd)
Definition: libubus-io.c:158
static const struct blob_attr_info ubus_policy[UBUS_ATTR_MAX]
Definition: libubus-io.c:34
static bool alloc_msg_buf(struct ubus_context *ctx, int len)
Definition: libubus-io.c:240
static struct blob_attr * attrbuf[UBUS_ATTR_MAX]
Definition: libubus-io.c:44
int __hidden ubus_send_msg(struct ubus_context *ctx, uint32_t seq, struct blob_attr *msg, int cmd, uint32_t peer, int fd)
Definition: libubus-io.c:126
#define UBUS_MSGBUF_REDUCTION_INTERVAL
Definition: libubus-io.c:32
static void ubus_refresh_auto_subscribe(struct ubus_context *ctx)
Definition: libubus-io.c:359
void __hidden ubus_poll_data(struct ubus_context *ctx, int timeout)
Definition: libubus-io.c:335
static void ubus_refresh_state(struct ubus_context *ctx)
Definition: libubus-io.c:371
#define STATIC_IOV(_var)
Definition: libubus-io.c:30
static int writev_retry(int fd, struct iovec *iov, int iov_len, int sock_fd)
Definition: libubus-io.c:60
void __hidden ubus_handle_data(struct uloop_fd *u, unsigned int events)
Definition: libubus-io.c:312
__hidden struct blob_attr ** ubus_parse_msg(struct blob_attr *msg, size_t len)
Definition: libubus-io.c:46
int ubus_reconnect(struct ubus_context *ctx, const char *path)
Definition: libubus-io.c:395
static void wait_data(int fd, bool write)
Definition: libubus-io.c:52
bool ubus_validate_hdr(struct ubus_msghdr *hdr)
Definition: libubus-io.c:224
int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj)
Definition: libubus-obj.c:209
const struct ubus_method watch_method __hidden
Definition: libubus-sub.c:29
int ubus_subscribe(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id)
Definition: libubus-sub.c:118
int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv)
Definition: libubus.c:161
int ubus_register_event_handler(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *pattern)
Definition: libubus.c:225
int stack_depth
Definition: libubus.h:168
uint32_t msgbuf_data_len
Definition: libubus.h:174
struct uloop_timeout pending_timer
Definition: libubus.h:163
int msgbuf_reduction_counter
Definition: libubus.h:175
struct ubus_msghdr_buf msgbuf
Definition: libubus.h:173
void(* connection_lost)(struct ubus_context *ctx)
Definition: libubus.h:170
struct avl_tree objects
Definition: libubus.h:159
bool cancel_poll
Definition: libubus.h:167
struct list_head auto_subscribers
Definition: libubus.h:177
struct uloop_fd sock
Definition: libubus.h:162
uint32_t local_id
Definition: libubus.h:165
struct ubus_event_handler auto_subscribe_event_handler
Definition: libubus.h:178
struct ubus_msghdr hdr
Definition: libubus.h:42
struct blob_attr * data
Definition: libubus.h:43
uint32_t peer
Definition: ubusmsg.h:33
uint8_t version
Definition: ubusmsg.h:30
uint8_t type
Definition: ubusmsg.h:31
uint16_t seq
Definition: ubusmsg.h:32
uint32_t id
Definition: libubus.h:120
uint32_t id
Definition: libubus.h:130
struct ubus_object_type * type
Definition: libubus.h:133
struct avl_node avl
Definition: libubus.h:127
const char * path
Definition: libubus.h:132
struct ubus_object obj
Definition: libubus.h:144
ubus_new_object_handler_t new_obj_cb
Definition: libubus.h:148
struct list_head list
Definition: libubus.h:143
struct avl_tree path
Definition: ubusd_obj.c:19
uint16_t seq
Definition: ubusmsg.h:2
#define UBUS_MSG_CHUNK_SIZE
Definition: ubusmsg.h:22
uint32_t peer
Definition: ubusmsg.h:3
@ UBUS_MSG_HELLO
Definition: ubusmsg.h:38
@ UBUS_STATUS_CONNECTION_FAILED
Definition: ubusmsg.h:129
@ UBUS_STATUS_OK
Definition: ubusmsg.h:119
@ UBUS_STATUS_UNKNOWN_ERROR
Definition: ubusmsg.h:128
@ UBUS_ATTR_ACTIVE
Definition: ubusmsg.h:94
@ UBUS_ATTR_OBJPATH
Definition: ubusmsg.h:84
@ UBUS_ATTR_METHOD
Definition: ubusmsg.h:86
@ UBUS_ATTR_SUBSCRIBERS
Definition: ubusmsg.h:97
@ UBUS_ATTR_STATUS
Definition: ubusmsg.h:82
@ UBUS_ATTR_NO_REPLY
Definition: ubusmsg.h:95
@ UBUS_ATTR_OBJID
Definition: ubusmsg.h:85
@ UBUS_ATTR_MAX
Definition: ubusmsg.h:103