net: lwm2m: refactor engine to use lwm2m_message structure

Sending an lwm2m message is too difficult.  It requires pending / reply
and other structures to be configured and set by various portions of
the library.  There is also no way to know if a pending message ever
encounters a timeout.

Let's fix this by simplifying the internal LwM2M engine APIs for
handling lwm2m messages:

1. A user calls lwm2m_get_message(lwm2m_ctx) which returns the first
   available lwm2m message from an array of messages
   (total # of messages is set via CONFIG_LWM2M_ENGINE_MAX_MESSAGES).
2. Next the user sets all of the fields in the message that are
   required (type, code message id, token, etc)
3. Then the user calls lwm2m_init_message(msg).  This initializes the
   underlying zoap_packet, pending and reply structures.
4. Once initialized, the user creates their payload in msg->zpkt.
5. When the user is ready to send, the call lwm2m_send_message(msg).
6. And if for some reason an error occurs at any point, they can free
   up the entire set of structures with: lwm2m_release_message(msg).

Included in the refactoring is a timeout_cb field which can be set in
the LwM2M messages.  If a pending structure ever expires the engine
will call the timeout_cb passing in the msg structure before it's
automatically released.

Signed-off-by: Michael Scott <michael.scott@linaro.org>
This commit is contained in:
Michael Scott 2017-09-01 01:11:43 -07:00 committed by Jukka Rissanen
commit 897dbffe7c
5 changed files with 412 additions and 351 deletions

View file

@ -111,6 +111,9 @@ static struct lwm2m_engine_obj *get_engine_obj(int obj_id);
static struct lwm2m_engine_obj_inst *get_engine_obj_inst(int obj_id,
int obj_inst_id);
/* Shared set of in-flight LwM2M messages */
static struct lwm2m_message messages[CONFIG_LWM2M_ENGINE_MAX_MESSAGES];
/* for debugging: to print IP addresses */
char *lwm2m_sprint_ip_addr(const struct sockaddr *addr)
{
@ -189,7 +192,7 @@ int lwm2m_notify_observer_path(struct lwm2m_obj_path *path)
path->res_id);
}
static int engine_add_observer(struct net_app_ctx *app_ctx,
static int engine_add_observer(struct lwm2m_message *msg,
const u8_t *token, u8_t tkl,
struct lwm2m_obj_path *path,
u16_t format)
@ -199,6 +202,11 @@ static int engine_add_observer(struct net_app_ctx *app_ctx,
struct sockaddr *addr;
int i;
if (!msg || !msg->ctx) {
SYS_LOG_ERR("valid lwm2m message is required");
return -EINVAL;
}
if (!token || (tkl == 0 || tkl > MAX_TOKEN_LEN)) {
SYS_LOG_ERR("token(%p) and token length(%u) must be valid.",
token, tkl);
@ -206,7 +214,7 @@ static int engine_add_observer(struct net_app_ctx *app_ctx,
}
/* remote addr */
addr = &app_ctx->default_ctx->remote;
addr = &msg->ctx->net_app_ctx.default_ctx->remote;
/* check if object exists */
if (!get_engine_obj(path->obj_id)) {
@ -243,7 +251,7 @@ static int engine_add_observer(struct net_app_ctx *app_ctx,
/* make sure this observer doesn't exist already */
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
if (&obs->ctx->net_app_ctx == app_ctx &&
if (obs->ctx == msg->ctx &&
memcmp(&obs->path, path, sizeof(*path)) == 0) {
/* quietly update the token information */
memcpy(obs->token, token, tkl);
@ -272,8 +280,7 @@ static int engine_add_observer(struct net_app_ctx *app_ctx,
/* copy the values and add it to the list */
observe_node_data[i].used = true;
observe_node_data[i].ctx = CONTAINER_OF(app_ctx,
struct lwm2m_ctx, net_app_ctx);
observe_node_data[i].ctx = msg->ctx;
memcpy(&observe_node_data[i].path, path, sizeof(*path));
memcpy(observe_node_data[i].token, token, tkl);
observe_node_data[i].tkl = tkl;
@ -588,104 +595,192 @@ static void zoap_options_to_path(struct zoap_option *opt, int options_count,
}
}
int lwm2m_init_message(struct net_app_ctx *app_ctx, struct zoap_packet *zpkt,
struct net_pkt **pkt, u8_t type, u8_t code, u16_t mid,
const u8_t *token, u8_t tkl)
struct lwm2m_message *find_msg_from_pending(struct zoap_pending *pending)
{
size_t i;
if (!pending) {
return NULL;
}
for (i = 0; i < CONFIG_LWM2M_ENGINE_MAX_MESSAGES; i++) {
if (messages[i].ctx && messages[i].pending == pending) {
return &messages[i];
}
}
return NULL;
}
struct lwm2m_message *lwm2m_get_message(struct lwm2m_ctx *client_ctx)
{
size_t i;
for (i = 0; i < CONFIG_LWM2M_ENGINE_MAX_MESSAGES; i++) {
if (!messages[i].ctx) {
messages[i].ctx = client_ctx;
return &messages[i];
}
}
return NULL;
}
void lwm2m_release_message(struct lwm2m_message *msg)
{
if (!msg) {
return;
}
if (msg->pending) {
zoap_pending_clear(msg->pending);
}
if (msg->reply) {
/* make sure we want to clear the reply */
zoap_reply_clear(msg->reply);
}
memset(msg, 0, sizeof(*msg));
}
int lwm2m_init_message(struct lwm2m_message *msg)
{
struct net_pkt *pkt;
struct net_app_ctx *app_ctx;
struct net_buf *frag;
int r;
*pkt = net_app_get_net_pkt(app_ctx, AF_UNSPEC, BUF_ALLOC_TIMEOUT);
if (!*pkt) {
if (!msg || !msg->ctx) {
SYS_LOG_ERR("LwM2M message is invalid.");
return -EINVAL;
}
app_ctx = &msg->ctx->net_app_ctx;
pkt = net_app_get_net_pkt(app_ctx, AF_UNSPEC, BUF_ALLOC_TIMEOUT);
if (!pkt) {
SYS_LOG_ERR("Unable to get TX packet, not enough memory.");
return -ENOMEM;
}
frag = net_app_get_net_buf(app_ctx, *pkt,
BUF_ALLOC_TIMEOUT);
frag = net_app_get_net_buf(app_ctx, pkt, BUF_ALLOC_TIMEOUT);
if (!frag) {
SYS_LOG_ERR("Unable to get DATA buffer, not enough memory.");
net_pkt_unref(*pkt);
*pkt = NULL;
return -ENOMEM;
r = -ENOMEM;
goto cleanup;
}
r = zoap_packet_init(zpkt, *pkt);
r = zoap_packet_init(&msg->zpkt, pkt);
if (r < 0) {
SYS_LOG_ERR("zoap packet init error (err:%d)", r);
return r;
goto cleanup;
}
/* FIXME: Could be that zoap_packet_init() sets some defaults */
zoap_header_set_version(zpkt, 1);
zoap_header_set_type(zpkt, type);
zoap_header_set_code(zpkt, code);
zoap_header_set_version(&msg->zpkt, 1);
zoap_header_set_type(&msg->zpkt, msg->type);
zoap_header_set_code(&msg->zpkt, msg->code);
if (mid > 0) {
zoap_header_set_id(zpkt, mid);
if (msg->mid > 0) {
zoap_header_set_id(&msg->zpkt, msg->mid);
} else {
zoap_header_set_id(zpkt, zoap_next_id());
zoap_header_set_id(&msg->zpkt, zoap_next_id());
}
/*
* tkl == 0 is for a new TOKEN
* tkl == LWM2M_MSG_TOKEN_LEN_SKIP means dont set
*/
if (tkl == 0) {
zoap_header_set_token(zpkt, zoap_next_token(),
if (msg->tkl == 0) {
zoap_header_set_token(&msg->zpkt, zoap_next_token(),
MAX_TOKEN_LEN);
} else if (token && tkl != LWM2M_MSG_TOKEN_LEN_SKIP) {
zoap_header_set_token(zpkt, token, tkl);
} else if (msg->token && msg->tkl != LWM2M_MSG_TOKEN_LEN_SKIP) {
zoap_header_set_token(&msg->zpkt, msg->token, msg->tkl);
}
/* only TYPE_CON messages need pending tracking / reply handling */
if (msg->type != ZOAP_TYPE_CON) {
return 0;
}
msg->pending = zoap_pending_next_unused(
msg->ctx->pendings,
CONFIG_LWM2M_ENGINE_MAX_PENDING);
if (!msg->pending) {
SYS_LOG_ERR("Unable to find a free pending to track "
"retransmissions.");
r = -ENOMEM;
goto cleanup;
}
r = zoap_pending_init(msg->pending, &msg->zpkt,
&app_ctx->default_ctx->remote);
if (r < 0) {
SYS_LOG_ERR("Unable to initialize a pending "
"retransmission (err:%d).", r);
goto cleanup;
}
/* clear out pkt to avoid double unref */
pkt = NULL;
if (msg->reply_cb) {
msg->reply = zoap_reply_next_unused(
msg->ctx->replies,
CONFIG_LWM2M_ENGINE_MAX_REPLIES);
if (!msg->reply) {
SYS_LOG_ERR("No resources for "
"waiting for replies.");
r = -ENOMEM;
goto cleanup;
}
zoap_reply_init(msg->reply, &msg->zpkt);
msg->reply->reply = msg->reply_cb;
}
return 0;
}
struct zoap_pending *lwm2m_init_message_pending(struct lwm2m_ctx *client_ctx,
struct zoap_packet *zpkt)
{
struct zoap_pending *pending = NULL;
int ret;
pending = zoap_pending_next_unused(client_ctx->pendings,
CONFIG_LWM2M_ENGINE_MAX_PENDING);
if (!pending) {
SYS_LOG_ERR("Unable to find a free pending to track "
"retransmissions.");
return NULL;
}
ret = zoap_pending_init(pending, zpkt,
&client_ctx->net_app_ctx.default_ctx->remote);
if (ret < 0) {
SYS_LOG_ERR("Unable to initialize a pending "
"retransmission (err:%d).", ret);
pending->pkt = NULL;
return NULL;
}
return pending;
}
void lwm2m_init_message_cleanup(struct net_pkt *pkt,
struct zoap_pending *pending,
struct zoap_reply *reply)
{
if (pending) {
zoap_pending_clear(pending);
/* don't unref attached pkt twice */
if (!pending->pkt) {
pkt = NULL;
}
}
if (reply) {
zoap_reply_clear(reply);
}
cleanup:
lwm2m_release_message(msg);
if (pkt) {
net_pkt_unref(pkt);
}
return r;
}
int lwm2m_send_message(struct lwm2m_message *msg)
{
int ret;
if (!msg || !msg->ctx) {
SYS_LOG_ERR("LwM2M message is invalid.");
return -EINVAL;
}
msg->send_attempts++;
ret = net_app_send_pkt(&msg->ctx->net_app_ctx, msg->zpkt.pkt,
&msg->ctx->net_app_ctx.default_ctx->remote,
NET_SOCKADDR_MAX_SIZE, K_NO_WAIT, NULL);
if (ret < 0) {
return ret;
}
if (msg->type == ZOAP_TYPE_CON) {
if (msg->send_attempts > 1) {
return 0;
}
zoap_pending_cycle(msg->pending);
k_delayed_work_submit(&msg->ctx->retransmit_work,
msg->pending->timeout);
} else {
/* if we're not expecting an ACK, free up the msg data */
lwm2m_release_message(msg);
}
return 0;
}
u16_t lwm2m_get_rd_data(u8_t *client_data, u16_t size)
@ -2015,9 +2110,8 @@ static int get_observe_option(const struct zoap_packet *zpkt)
return zoap_option_value_to_int(&option);
}
static int handle_request(struct net_app_ctx *app_ctx,
struct zoap_packet *request,
struct zoap_packet *response)
static int handle_request(struct zoap_packet *request,
struct lwm2m_message *msg)
{
int r;
u8_t code;
@ -2040,9 +2134,9 @@ static int handle_request(struct net_app_ctx *app_ctx,
context.path = &path;
engine_clear_context(&context);
/* set ZoAP request / response */
/* set ZoAP request / message */
in.in_zpkt = request;
out.out_zpkt = response;
out.out_zpkt = &msg->zpkt;
/* set default reader/writer */
in.reader = &plain_text_reader;
@ -2159,8 +2253,7 @@ static int handle_request(struct net_app_ctx *app_ctx,
r);
}
r = engine_add_observer(app_ctx,
token, tkl, &path,
r = engine_add_observer(msg, token, tkl, &path,
accept);
if (r < 0) {
SYS_LOG_ERR("add OBSERVE error: %d", r);
@ -2248,25 +2341,16 @@ static int handle_request(struct net_app_ctx *app_ctx,
return r;
}
int lwm2m_udp_sendto(struct net_app_ctx *app_ctx, struct net_pkt *pkt)
{
return net_app_send_pkt(app_ctx, pkt, &app_ctx->default_ctx->remote,
NET_SOCKADDR_MAX_SIZE, K_NO_WAIT, NULL);
}
void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
bool handle_separate_response,
int (*udp_request_handler)(struct net_app_ctx *app_ctx,
struct zoap_packet *,
struct zoap_packet *))
udp_request_handler_cb_t udp_request_handler)
{
struct lwm2m_message *msg = NULL;
struct net_udp_hdr hdr, *udp_hdr;
struct zoap_pending *pending;
struct zoap_reply *reply;
struct zoap_packet response;
struct sockaddr from_addr;
struct zoap_packet response2;
struct net_pkt *pkt2;
int header_len, r;
const u8_t *token;
u8_t tkl;
@ -2312,8 +2396,17 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
token = zoap_header_get_token(&response, &tkl);
pending = zoap_pending_received(&response, client_ctx->pendings,
CONFIG_LWM2M_ENGINE_MAX_PENDING);
/*
* Clear pending pointer because zoap_pending_received() calls
* zoap_pending_clear, and later when we call lwm2m_release_message()
* it will try and call zoap_pending_clear() again if msg->pending
* is != NULL.
*/
if (pending) {
/* TODO: If necessary cancel retransmissions */
msg = find_msg_from_pending(pending);
if (msg) {
msg->pending = NULL;
}
}
SYS_LOG_DBG("checking for reply from [%s]",
@ -2321,47 +2414,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
reply = zoap_response_received(&response, &from_addr,
client_ctx->replies,
CONFIG_LWM2M_ENGINE_MAX_REPLIES);
if (!reply) {
/*
* If no normal response handler is found, then this is
* a new request coming from the server. Let's look
* at registered objects to find a handler.
*/
if (udp_request_handler &&
zoap_header_get_type(&response) == ZOAP_TYPE_CON) {
/* Create a response packet if we reach this point */
r = lwm2m_init_message(&client_ctx->net_app_ctx,
&response2, &pkt2,
ZOAP_TYPE_ACK,
zoap_header_get_code(&response),
zoap_header_get_id(&response),
NULL, LWM2M_MSG_TOKEN_LEN_SKIP);
if (r < 0) {
if (pkt2) {
net_pkt_unref(pkt2);
}
goto cleanup;
}
/*
* The "response" here is actually a new request
*/
r = udp_request_handler(&client_ctx->net_app_ctx,
&response, &response2);
if (r < 0) {
SYS_LOG_ERR("Request handler error: %d", r);
} else {
r = lwm2m_udp_sendto(&client_ctx->net_app_ctx,
pkt2);
if (r < 0) {
SYS_LOG_ERR("Err sending response: %d",
r);
}
}
} else {
SYS_LOG_ERR("No handler for response");
}
} else {
if (reply) {
/*
* Separate response is composed of 2 messages, empty ACK with
* no token and an additional message with a matching token id
@ -2375,12 +2428,61 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
if (handle_separate_response && !tkl &&
zoap_header_get_type(&response) == ZOAP_TYPE_ACK) {
SYS_LOG_DBG("separated response, not removing reply");
} else {
SYS_LOG_DBG("reply %p handled and removed", reply);
zoap_reply_clear(reply);
goto cleanup;
}
}
if (reply || pending) {
/* free up msg resources */
if (msg) {
lwm2m_release_message(msg);
}
SYS_LOG_DBG("reply %p handled and removed", reply);
goto cleanup;
}
/*
* If no normal response handler is found, then this is
* a new request coming from the server. Let's look
* at registered objects to find a handler.
*/
if (udp_request_handler &&
zoap_header_get_type(&response) == ZOAP_TYPE_CON) {
msg = lwm2m_get_message(client_ctx);
if (!msg) {
SYS_LOG_ERR("Unable to get a lwm2m message!");
goto cleanup;
}
/* Create a response message if we reach this point */
msg->type = ZOAP_TYPE_ACK;
msg->code = zoap_header_get_code(&response);
msg->mid = zoap_header_get_id(&response);
/* skip token generation by default */
msg->tkl = LWM2M_MSG_TOKEN_LEN_SKIP;
r = lwm2m_init_message(msg);
if (r < 0) {
goto cleanup;
}
/* process the response to this request */
r = udp_request_handler(&response, msg);
if (r < 0) {
SYS_LOG_ERR("Request handler error: %d", r);
} else {
r = lwm2m_send_message(msg);
if (r < 0) {
SYS_LOG_ERR("Err sending response: %d",
r);
lwm2m_release_message(msg);
}
}
} else {
SYS_LOG_ERR("No handler for response");
}
cleanup:
if (pkt) {
net_pkt_unref(pkt);
@ -2400,6 +2502,7 @@ static void udp_receive(struct net_app_ctx *app_ctx, struct net_pkt *pkt,
static void retransmit_request(struct k_work *work)
{
struct lwm2m_ctx *client_ctx;
struct lwm2m_message *msg;
struct zoap_pending *pending;
int r;
@ -2410,17 +2513,28 @@ static void retransmit_request(struct k_work *work)
return;
}
r = lwm2m_udp_sendto(&client_ctx->net_app_ctx,
pending->pkt);
if (r < 0) {
msg = find_msg_from_pending(pending);
if (!msg) {
SYS_LOG_ERR("pending has no valid LwM2M message!");
return;
}
if (!zoap_pending_cycle(pending)) {
zoap_pending_clear(pending);
/* pending request has expired */
if (msg->message_timeout_cb) {
msg->message_timeout_cb(msg);
}
lwm2m_release_message(msg);
return;
}
r = lwm2m_send_message(msg);
if (r < 0) {
SYS_LOG_ERR("Error sending lwm2m message: %d", r);
/* don't error here, retry until timeout */
}
k_delayed_work_submit(&client_ctx->retransmit_work, pending->timeout);
}
@ -2458,10 +2572,7 @@ static int notify_message_reply_cb(const struct zoap_packet *response,
static int generate_notify_message(struct observe_node *obs,
bool manual_trigger)
{
struct net_pkt *pkt = NULL;
struct zoap_pending *pending = NULL;
struct zoap_reply *reply = NULL;
struct zoap_packet request;
struct lwm2m_message *msg;
struct lwm2m_engine_obj_inst *obj_inst;
struct lwm2m_output_context out;
struct lwm2m_engine_context context;
@ -2481,7 +2592,6 @@ static int generate_notify_message(struct observe_node *obs,
memcpy(&path, &obs->path, sizeof(struct lwm2m_obj_path));
context.path = &path;
context.operation = LWM2M_OP_READ;
out.out_zpkt = &request;
SYS_LOG_DBG("[%s] NOTIFY MSG START: %u/%u/%u(%u) token:'%s' [%s] %lld",
manual_trigger ? "MANUAL" : "AUTO",
@ -2503,11 +2613,23 @@ static int generate_notify_message(struct observe_node *obs,
return -EINVAL;
}
ret = lwm2m_init_message(&obs->ctx->net_app_ctx, out.out_zpkt, &pkt,
ZOAP_TYPE_CON, ZOAP_RESPONSE_CODE_CONTENT,
0, obs->token, obs->tkl);
msg = lwm2m_get_message(obs->ctx);
if (!msg) {
SYS_LOG_ERR("Unable to get a lwm2m message!");
return -ENOMEM;
}
out.out_zpkt = &msg->zpkt;
msg->type = ZOAP_TYPE_CON;
msg->code = ZOAP_RESPONSE_CODE_CONTENT;
msg->mid = 0;
msg->token = obs->token;
msg->tkl = obs->tkl;
msg->reply_cb = notify_message_reply_cb;
ret = lwm2m_init_message(msg);
if (ret) {
goto cleanup;
return ret;
}
/* each notification should increment the obs counter */
@ -2542,36 +2664,17 @@ static int generate_notify_message(struct observe_node *obs,
goto cleanup;
}
pending = lwm2m_init_message_pending(obs->ctx, out.out_zpkt);
if (!pending) {
ret = -ENOMEM;
goto cleanup;
}
reply = zoap_reply_next_unused(obs->ctx->replies,
CONFIG_LWM2M_ENGINE_MAX_REPLIES);
if (!reply) {
SYS_LOG_ERR("No resources for waiting for replies.");
ret = -ENOMEM;
goto cleanup;
}
zoap_reply_init(reply, &request);
reply->reply = notify_message_reply_cb;
ret = lwm2m_udp_sendto(&obs->ctx->net_app_ctx, pkt);
ret = lwm2m_send_message(msg);
if (ret < 0) {
SYS_LOG_ERR("Error sending LWM2M packet (err:%d).", ret);
goto cleanup;
}
SYS_LOG_DBG("NOTIFY MSG: SENT");
zoap_pending_cycle(pending);
k_delayed_work_submit(&obs->ctx->retransmit_work, pending->timeout);
return ret;
SYS_LOG_DBG("NOTIFY MSG: SENT");
return 0;
cleanup:
lwm2m_init_message_cleanup(pkt, pending, reply);
lwm2m_release_message(msg);
return ret;
}