lwm2m: use asynchronous socket io

This restructures the lwm2m_engine to use a non-blocking socket access
instead of the previously used blocking style, and eliminates any
socket-access from outside of the main work loop.

The main motivation behind this is an issue within Nordic's
nrf_modem_lib/modem-fw on nrf9160, that causes socket send() calls to
block indefinitely when the shared memory used for
rpc-communication with the modem is already exhausted because of
incoming data.

This led to the lwm2m_engine locking up on send calls when there is
also a large amount of incoming data.

This works around this issue, by only issuing send calls when poll
reports the socket to be ready for sending, and (more importantly) by
always receiving all buffered incoming data before sending anything.

There is still a (perhaps academic) possibility that this
situation is triggered when the scheduler interrupts the lwm2m
thread in-between receiving and sending, but so far we have not
observed this.

Besides working around the aforementioned issue, this also simplifies
the way resends are handled as they are no longer sent from the main
system-workqueue, and limits all interaction with the sockets to a
single thread.

Signed-off-by: Henning Fleddermann <henning.fleddermann@grandcentrix.net>
This commit is contained in:
Henning Fleddermann 2021-05-17 16:23:51 +02:00 committed by Jukka Rissanen
commit 32989a38f0
6 changed files with 233 additions and 222 deletions

View file

@ -77,8 +77,7 @@ struct lwm2m_ctx {
/** Private CoAP and networking structures */
struct coap_pending pendings[CONFIG_LWM2M_ENGINE_MAX_PENDING];
struct coap_reply replies[CONFIG_LWM2M_ENGINE_MAX_REPLIES];
struct k_work_delayable retransmit_work;
struct sys_mutex send_lock;
sys_slist_t pending_sends;
/** A pointer to currently processed request, for internal LwM2M engine
* use. The underlying type is ``struct lwm2m_message``, but since it's

View file

@ -18,6 +18,7 @@
#include <logging/log.h>
LOG_MODULE_REGISTER(LOG_MODULE_NAME);
#include <fcntl.h>
#include <zephyr/types.h>
#include <stddef.h>
#include <stdlib.h>
@ -26,7 +27,6 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME);
#include <ctype.h>
#include <errno.h>
#include <init.h>
#include <sys/mutex.h>
#include <sys/printk.h>
#include <net/net_ip.h>
#include <net/http_parser_url.h>
@ -1024,9 +1024,15 @@ cleanup:
return r;
}
int lwm2m_send_message(struct lwm2m_message *msg)
int lwm2m_send_message_async(struct lwm2m_message *msg)
{
int rc, ret;
sys_slist_append(&msg->ctx->pending_sends, &msg->node);
return 0;
}
static int lwm2m_send_message(struct lwm2m_message *msg)
{
int rc;
if (!msg || !msg->ctx) {
LOG_ERR("LwM2M message is invalid.");
@ -1037,37 +1043,23 @@ int lwm2m_send_message(struct lwm2m_message *msg)
coap_pending_cycle(msg->pending);
}
msg->send_attempts++;
ret = sys_mutex_lock(&msg->ctx->send_lock, K_FOREVER);
__ASSERT(ret == 0, "sys_mutex_lock failed with %d", ret);
rc = send(msg->ctx->sock_fd, msg->cpkt.data, msg->cpkt.offset, 0);
ret = sys_mutex_unlock(&msg->ctx->send_lock);
__ASSERT(ret == 0, "sys_mutex_unlock failed with %d", ret);
ARG_UNUSED(ret);
if (rc < 0) {
if (msg->type == COAP_TYPE_CON) {
coap_pending_clear(msg->pending);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
LOG_WRN("Failed to send packet, would block");
return -errno;
}
LOG_ERR("Failed to send packet, err %d", errno);
if (msg->type != COAP_TYPE_CON) {
lwm2m_reset_message(msg, true);
}
return -errno;
}
if (msg->type == COAP_TYPE_CON) {
int32_t remaining =
k_ticks_to_ms_ceil32(k_work_delayable_remaining_get(
&msg->ctx->retransmit_work));
/* If the item is already pending and its timeout is smaller
* than the new one, skip the submission.
*/
if (remaining == 0 || remaining > msg->pending->timeout) {
k_work_reschedule(&msg->ctx->retransmit_work,
K_MSEC(msg->pending->timeout));
}
} else {
if (msg->type != COAP_TYPE_CON) {
lwm2m_reset_message(msg, true);
}
@ -1099,11 +1091,7 @@ int lwm2m_send_empty_ack(struct lwm2m_ctx *client_ctx, uint16_t mid)
goto cleanup;
}
ret = lwm2m_send_message(msg);
if (ret < 0) {
LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;
}
lwm2m_send_message_async(msg);
return 0;
@ -4204,102 +4192,77 @@ static void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx,
}
client_ctx->processed_req = NULL;
r = lwm2m_send_message(msg);
if (r < 0) {
LOG_ERR("Err sending response: %d", r);
lwm2m_reset_message(msg, true);
}
lwm2m_send_message_async(msg);
} else {
LOG_DBG("No handler for response");
}
}
static void retransmit_request(struct k_work *work)
{
struct lwm2m_ctx *client_ctx;
struct lwm2m_message *msg;
struct coap_pending *pending;
int32_t remaining;
int ret;
client_ctx = CONTAINER_OF(work, struct lwm2m_ctx, retransmit_work);
pending = coap_pending_next_to_expire(client_ctx->pendings,
CONFIG_LWM2M_ENGINE_MAX_PENDING);
if (!pending) {
return;
}
remaining = pending->t0 + pending->timeout - k_uptime_get_32();
if (remaining > 0) {
/* First message to expire was removed from the list,
* schedule next.
/* returns ms until the next retransmission is due, or INT32_MAX
* if no retransmissions are necessary
*/
goto next;
static int32_t retransmit_request(struct lwm2m_ctx *client_ctx,
const uint32_t timestamp)
{
struct lwm2m_message *msg;
struct coap_pending *p;
int32_t remaining, next_retransmission = INT32_MAX;
int i;
for (i = 0, p = client_ctx->pendings;
i < CONFIG_LWM2M_ENGINE_MAX_PENDING; i++, p++) {
if (!p->timeout) {
continue;
}
msg = find_msg(pending, NULL);
remaining = p->t0 + p->timeout - timestamp;
if (remaining < 0) {
msg = find_msg(p, NULL);
if (!msg) {
LOG_ERR("pending has no valid LwM2M message!");
coap_pending_clear(pending);
goto next;
coap_pending_clear(p);
continue;
}
if (!coap_pending_cycle(pending)) {
if (!p->retries) {
/* pending request has expired */
if (msg->message_timeout_cb) {
msg->message_timeout_cb(msg);
}
/*
* coap_pending_clear() is called in lwm2m_reset_message()
* which balances the ref we made in coap_pending_cycle()
*/
lwm2m_reset_message(msg, true);
goto next;
continue;
}
if (msg->acknowledged) {
/* No need to retransmit, just keep the timer running to
* timeout in case no response arrives.
*/
goto next;
coap_pending_cycle(p);
continue;
}
LOG_INF("Resending message: %p", msg);
msg->send_attempts++;
ret = sys_mutex_lock(&client_ctx->send_lock, K_FOREVER);
__ASSERT(ret == 0, "sys_mutex_lock failed with %d", ret);
if (msg->ctx == NULL) {
LOG_INF("Response for %p already handled", msg);
goto next_locked;
lwm2m_send_message_async(msg);
break;
}
if (remaining < next_retransmission) {
next_retransmission = remaining;
}
}
if (send(msg->ctx->sock_fd, msg->cpkt.data, msg->cpkt.offset, 0) < 0) {
LOG_ERR("Error sending lwm2m message: %d", -errno);
/* don't error here, retry until timeout */
return next_retransmission;
}
static int32_t check_retransmissions(const uint32_t timestamp)
{
int32_t next_retransmit;
int32_t next_retransmit_min = INT32_MAX;
for (int i = 0; i < sock_nfds; ++i) {
next_retransmit = retransmit_request(sock_ctx[i], timestamp);
if (next_retransmit < next_retransmit_min) {
next_retransmit_min = next_retransmit;
}
}
next_locked:
ret = sys_mutex_unlock(&client_ctx->send_lock);
__ASSERT(ret == 0, "sys_mutex_unlock failed with %d", ret);
ARG_UNUSED(ret);
next:
pending = coap_pending_next_to_expire(client_ctx->pendings,
CONFIG_LWM2M_ENGINE_MAX_PENDING);
if (!pending) {
return;
}
remaining = pending->t0 + pending->timeout - k_uptime_get_32();
if (remaining < 0) {
remaining = 0;
}
k_work_reschedule(&client_ctx->retransmit_work, K_MSEC(remaining));
return next_retransmit_min;
}
static int notify_message_reply_cb(const struct coap_packet *response,
@ -4407,11 +4370,7 @@ static int generate_notify_message(struct observe_node *obs,
goto cleanup;
}
ret = lwm2m_send_message(msg);
if (ret < 0) {
LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;
}
lwm2m_send_message_async(msg);
LOG_DBG("NOTIFY MSG: SENT");
return 0;
@ -4421,10 +4380,11 @@ cleanup:
return ret;
}
int32_t engine_next_service_timeout_ms(uint32_t max_timeout)
static int32_t engine_next_service_timeout_ms(uint32_t max_timeout,
const int64_t timestamp)
{
struct service_node *srv;
uint64_t time_left_ms, timestamp = k_uptime_get();
uint64_t time_left_ms;
uint32_t timeout = max_timeout;
SYS_SLIST_FOR_EACH_CONTAINER(&engine_service_list, srv, node) {
@ -4484,48 +4444,11 @@ int lwm2m_engine_update_service_period(k_work_handler_t service, uint32_t period
return -ENOENT;
}
static int lwm2m_engine_service(void)
static int32_t lwm2m_engine_service(const int64_t timestamp)
{
struct observe_node *obs;
struct service_node *srv;
int64_t timestamp, service_due_timestamp;
int64_t service_due_timestamp;
/*
* 1. scan the observer list
* 2. For each notify event found, scan the observer list
* 3. For each observer match, generate a NOTIFY message,
* attaching the notify response handler
*/
timestamp = k_uptime_get();
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
/*
* manual notify requirements:
* - event_timestamp > last_timestamp
* - if min_period_sec is set:
* current timestamp > last_timestamp + min_period_sec
*/
if (obs->event_timestamp > obs->last_timestamp &&
(obs->min_period_sec == 0 ||
timestamp > obs->last_timestamp +
MSEC_PER_SEC * obs->min_period_sec)) {
obs->last_timestamp = k_uptime_get();
generate_notify_message(obs, true);
/*
* automatic time-based notify requirements:
* - if max_period_sec is set:
* current timestamp > last_timestamp + max_period_sec
*/
} else if (obs->max_period_sec > 0 &&
timestamp > obs->last_timestamp +
MSEC_PER_SEC * obs->max_period_sec) {
obs->last_timestamp = k_uptime_get();
generate_notify_message(obs, false);
}
}
timestamp = k_uptime_get();
SYS_SLIST_FOR_EACH_CONTAINER(&engine_service_list, srv, node) {
service_due_timestamp = srv->last_timestamp +
srv->min_call_period;
@ -4537,7 +4460,8 @@ static int lwm2m_engine_service(void)
}
/* calculate how long to sleep till the next service */
return engine_next_service_timeout_ms(ENGINE_UPDATE_INTERVAL_MS);
return engine_next_service_timeout_ms(ENGINE_UPDATE_INTERVAL_MS,
timestamp);
}
int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
@ -4548,12 +4472,9 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
struct lwm2m_message *msg;
size_t i;
/* Cancel pending retransmit work */
k_work_cancel_delayable(&client_ctx->retransmit_work);
/* Remove observes for this context */
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&engine_observer_list,
obs, tmp, node) {
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&engine_observer_list, obs, tmp,
node) {
if (obs->ctx == client_ctx) {
sys_slist_remove(&engine_observer_list, prev_node,
&obs->node);
@ -4585,8 +4506,7 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
void lwm2m_engine_context_init(struct lwm2m_ctx *client_ctx)
{
k_work_init_delayable(&client_ctx->retransmit_work, retransmit_request);
sys_mutex_init(&client_ctx->send_lock);
sys_slist_init(&client_ctx->pending_sends);
}
/* LwM2M Socket Integration */
@ -4628,28 +4548,136 @@ void lwm2m_socket_del(struct lwm2m_ctx *ctx)
}
}
/* LwM2M main work loop */
static void socket_receive_loop(void)
static bool manual_notify_is_due(const struct observe_node *obs,
const int64_t timestamp)
{
static uint8_t in_buf[NET_IPV6_MTU];
static struct sockaddr from_addr;
socklen_t from_addr_len;
ssize_t len;
int i;
const bool has_min_period = obs->min_period_sec != 0;
while (1) {
/* wait for sockets */
if (sock_nfds < 1) {
k_msleep(lwm2m_engine_service());
return obs->event_timestamp > obs->last_timestamp &&
(!has_min_period || timestamp > obs->last_timestamp +
MSEC_PER_SEC * obs->min_period_sec);
}
/*
 * Check whether the periodic (max-period) notification of an observer is due.
 *
 * An observer without a max period (max_period_sec == 0) never triggers an
 * automatic notification. Otherwise the notification is due once timestamp
 * has passed last_timestamp by more than max_period_sec seconds.
 */
static bool automatic_notify_is_due(const struct observe_node *obs,
				    const int64_t timestamp)
{
	if (obs->max_period_sec == 0) {
		return false;
	}

	return timestamp > obs->last_timestamp +
			   MSEC_PER_SEC * obs->max_period_sec;
}
/*
 * Walk the observer list and generate a NOTIFY message for the first
 * observer whose manual or automatic notification is due.
 *
 * - Stops without touching the observer when generate_notify_message()
 *   reports -ENOMEM, so the same observer is looked at again on a later call.
 * - On any other result the observer's last_timestamp is set to timestamp.
 * - Stops after the first successfully generated notification; on other
 *   errors the walk continues with the next observer.
 */
static void check_notifications(sys_slist_t *observer_list,
				const int64_t timestamp)
{
	struct observe_node *observer;

	SYS_SLIST_FOR_EACH_CONTAINER(observer_list, observer, node) {
		const bool manual = manual_notify_is_due(observer, timestamp);
		int err;

		if (!manual && !automatic_notify_is_due(observer, timestamp)) {
			continue;
		}

		err = generate_notify_message(observer, manual);
		if (err == -ENOMEM) {
			/* no memory/messages available, retry later */
			return;
		}

		observer->last_timestamp = timestamp;

		if (err == 0) {
			/* create at most one notification */
			return;
		}
	}
}
static int socket_recv_message(struct lwm2m_ctx *client_ctx)
{
static uint8_t in_buf[NET_IPV6_MTU];
socklen_t from_addr_len;
ssize_t len;
static struct sockaddr from_addr;
from_addr_len = sizeof(from_addr);
len = recvfrom(client_ctx->sock_fd, in_buf, sizeof(in_buf) - 1,
0, &from_addr, &from_addr_len);
if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return -errno;
}
LOG_ERR("Error reading response: %d", errno);
if (client_ctx->fault_cb != NULL) {
client_ctx->fault_cb(errno);
}
return -errno;
}
if (len == 0) {
LOG_ERR("Zero length recv");
return 0;
}
in_buf[len] = 0U;
lwm2m_udp_receive(client_ctx, in_buf, len, &from_addr, handle_request);
return 0;
}
/*
 * Take the next node from the context's pending_sends list and pass the
 * message that contains it to lwm2m_send_message().
 *
 * Returns 0 when nothing is queued, otherwise the result of
 * lwm2m_send_message().
 */
static int socket_send_message(struct lwm2m_ctx *client_ctx)
{
	struct lwm2m_message *msg;
	sys_snode_t *head;

	head = sys_slist_get(&client_ctx->pending_sends);
	if (head == NULL) {
		return 0;
	}

	/* recover the enclosing message from its embedded list node */
	msg = SYS_SLIST_CONTAINER(head, msg, node);

	return lwm2m_send_message(msg);
}
/*
 * Prepare every registered pollfd for the next poll() call: always wait for
 * readability, additionally wait for writability only while the matching
 * context has messages queued in pending_sends, and clear stale revents.
 */
static void socket_reset_pollfd_events(void)
{
	int idx;

	for (idx = 0; idx < sock_nfds; idx++) {
		int wanted = POLLIN;

		if (!sys_slist_is_empty(&sock_ctx[idx]->pending_sends)) {
			wanted |= POLLOUT;
		}

		sock_fds[idx].events = wanted;
		sock_fds[idx].revents = 0;
	}
}
/* LwM2M main work loop */
static void socket_loop(void)
{
int i, rc;
int64_t timestamp;
int32_t timeout, next_retransmit;
while (1) {
timestamp = k_uptime_get();
timeout = lwm2m_engine_service(timestamp);
/* wait for sockets */
if (sock_nfds < 1) {
k_msleep(timeout);
continue;
}
next_retransmit = check_retransmissions(timestamp);
if (next_retransmit < timeout) {
timeout = next_retransmit;
}
check_notifications(&engine_observer_list, timestamp);
socket_reset_pollfd_events();
/*
* FIXME: Currently we timeout and restart poll in case fds
* were modified.
*/
if (poll(sock_fds, sock_nfds, lwm2m_engine_service()) < 0) {
rc = poll(sock_fds, sock_nfds, timeout);
if (rc < 0) {
LOG_ERR("Error in poll:%d", errno);
errno = 0;
k_msleep(ENGINE_UPDATE_INTERVAL_MS);
@ -4669,35 +4697,18 @@ static void socket_receive_loop(void)
continue;
}
if (!(sock_fds[i].revents & POLLIN) ||
sock_ctx[i] == NULL) {
sock_fds[i].revents = 0;
continue;
if (sock_fds[i].revents & POLLIN) {
while (sock_ctx[i]) {
rc = socket_recv_message(sock_ctx[i]);
if (rc) {
break;
}
}
}
from_addr_len = sizeof(from_addr);
sock_fds[i].revents = 0;
len = recvfrom(sock_ctx[i]->sock_fd, in_buf,
sizeof(in_buf) - 1, 0,
&from_addr, &from_addr_len);
if (len < 0) {
LOG_ERR("Error reading response: %d", errno);
if (sock_ctx[i]->fault_cb != NULL) {
sock_ctx[i]->fault_cb(errno);
if (sock_fds[i].revents & POLLOUT) {
socket_send_message(sock_ctx[i]);
}
continue;
}
if (len == 0) {
LOG_ERR("Zero length recv");
continue;
}
in_buf[len] = 0U;
lwm2m_udp_receive(sock_ctx[i], in_buf, len, &from_addr,
handle_request);
}
}
}
@ -4737,6 +4748,7 @@ static int load_tls_credential(struct lwm2m_ctx *client_ctx, uint16_t res_id,
int lwm2m_socket_start(struct lwm2m_ctx *client_ctx)
{
int flags;
#if defined(CONFIG_LWM2M_DTLS_SUPPORT)
int ret;
@ -4796,6 +4808,12 @@ int lwm2m_socket_start(struct lwm2m_ctx *client_ctx)
return -errno;
}
flags = fcntl(client_ctx->sock_fd, F_GETFL, 0);
if (flags == -1) {
return -errno;
}
fcntl(client_ctx->sock_fd, F_SETFL, flags | O_NONBLOCK);
return lwm2m_socket_add(client_ctx);
}
@ -4934,22 +4952,17 @@ int lwm2m_engine_start(struct lwm2m_ctx *client_ctx)
static int lwm2m_engine_init(const struct device *dev)
{
int ret = 0;
(void)memset(block1_contexts, 0, sizeof(block1_contexts));
/* start sock receive thread */
k_thread_create(&engine_thread_data,
&engine_thread_stack[0],
k_thread_create(&engine_thread_data, &engine_thread_stack[0],
K_KERNEL_STACK_SIZEOF(engine_thread_stack),
(k_thread_entry_t) socket_receive_loop,
NULL, NULL, NULL,
THREAD_PRIORITY,
0, K_NO_WAIT);
(k_thread_entry_t)socket_loop, NULL, NULL, NULL,
THREAD_PRIORITY, 0, K_NO_WAIT);
k_thread_name_set(&engine_thread_data, "lwm2m-sock-recv");
LOG_DBG("LWM2M engine socket receive thread started");
return ret;
return 0;
}
SYS_INIT(lwm2m_engine_init, APPLICATION, CONFIG_KERNEL_INIT_PRIORITY_DEFAULT);

View file

@ -91,7 +91,7 @@ int lwm2m_put_message_buf(uint8_t *buf);
struct lwm2m_message *lwm2m_get_message(struct lwm2m_ctx *client_ctx);
void lwm2m_reset_message(struct lwm2m_message *msg, bool release);
int lwm2m_init_message(struct lwm2m_message *msg);
int lwm2m_send_message(struct lwm2m_message *msg);
int lwm2m_send_message_async(struct lwm2m_message *msg);
int lwm2m_send_empty_ack(struct lwm2m_ctx *client_ctx, uint16_t mid);
int lwm2m_register_payload_handler(struct lwm2m_message *msg);

View file

@ -182,7 +182,7 @@ static int transfer_request(struct coap_block_context *ctx,
#endif
/* send request */
ret = lwm2m_send_message(msg);
ret = lwm2m_send_message_async(msg);
if (ret < 0) {
LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;

View file

@ -444,6 +444,8 @@ typedef void (*lwm2m_message_timeout_cb_t)(struct lwm2m_message *msg);
/* Internal LwM2M message structure to track in-flight messages. */
struct lwm2m_message {
sys_snode_t node;
/** LwM2M context related to this message */
struct lwm2m_ctx *ctx;
@ -476,9 +478,6 @@ struct lwm2m_message {
/** Incoming message action */
uint8_t operation;
/** Counter for message re-send / abort handling */
uint8_t send_attempts;
/* Information whether the message was acknowledged. */
bool acknowledged : 1;
};

View file

@ -779,7 +779,7 @@ static int sm_send_registration(bool send_obj_support_data,
}
}
ret = lwm2m_send_message(msg);
ret = lwm2m_send_message_async(msg);
if (ret < 0) {
LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;
@ -929,7 +929,7 @@ static int sm_do_deregister(void)
LOG_INF("Deregister from '%s'", log_strdup(client.server_ep));
ret = lwm2m_send_message(msg);
ret = lwm2m_send_message_async(msg);
if (ret < 0) {
LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;