net: lwm2m: move to flat buffers

As part of the migration from net_app APIs to socket APIs, let's
stop referencing the net_pkt fragments throughout the LwM2M library.

Establish a msg_data flat buffer inside lwm2m_message and use that
instead.

NOTE: As a part of this change we remove the COAP_NET_PKT setting.
The COAP library reverts to COAP_SOCK behavior.

This doesn't mean we use sockets in LwM2M (yet), it only means we
use the socket-compatible COAP library which parses flat buffers
instead of net_pkt fragments.

Signed-off-by: Michael Scott <mike@foundries.io>
This commit is contained in:
Michael Scott 2019-01-25 14:13:07 -08:00 committed by Anas Nashif
commit 3bfb7debb3
14 changed files with 388 additions and 296 deletions

View file

@ -37,12 +37,11 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME);
#include <errno.h>
#include <init.h>
#include <misc/printk.h>
#include <net/net_app.h>
#include <net/net_ip.h>
#include <net/net_pkt.h>
#include <net/udp.h>
#include <net/coap.h>
#include <net/lwm2m.h>
#include <net/net_pkt.h>
#include <net/net_app.h>
#include "lwm2m_object.h"
#include "lwm2m_engine.h"
@ -949,10 +948,6 @@ void lwm2m_reset_message(struct lwm2m_message *msg, bool release)
if (release) {
(void)memset(msg, 0, sizeof(*msg));
} else {
if (msg->cpkt.pkt) {
net_pkt_unref(msg->cpkt.pkt);
}
msg->message_timeout_cb = NULL;
(void)memset(&msg->cpkt, 0, sizeof(msg->cpkt));
}
@ -960,9 +955,6 @@ void lwm2m_reset_message(struct lwm2m_message *msg, bool release)
int lwm2m_init_message(struct lwm2m_message *msg)
{
struct net_pkt *pkt;
struct net_app_ctx *app_ctx;
struct net_buf *frag;
u8_t tokenlen = 0U;
u8_t *token = NULL;
int r = 0;
@ -972,20 +964,6 @@ int lwm2m_init_message(struct lwm2m_message *msg)
return -EINVAL;
}
app_ctx = &msg->ctx->net_app_ctx;
pkt = net_app_get_net_pkt(app_ctx, AF_UNSPEC, BUF_ALLOC_TIMEOUT);
if (!pkt) {
LOG_ERR("Unable to get TX packet, not enough memory.");
return -ENOMEM;
}
frag = net_app_get_net_buf(app_ctx, pkt, BUF_ALLOC_TIMEOUT);
if (!frag) {
LOG_ERR("Unable to get DATA buffer, not enough memory.");
r = -ENOMEM;
goto cleanup;
}
/*
* msg->tkl == 0 is for a new TOKEN
* msg->tkl == LWM2M_MSG_TOKEN_LEN_SKIP means dont set
@ -998,8 +976,8 @@ int lwm2m_init_message(struct lwm2m_message *msg)
token = msg->token;
}
r = coap_packet_init(&msg->cpkt, pkt, 1, msg->type,
tokenlen, token, msg->code,
r = coap_packet_init(&msg->cpkt, msg->msg_data, sizeof(msg->msg_data),
1, msg->type, tokenlen, token, msg->code,
(msg->mid > 0 ? msg->mid : coap_next_id()));
if (r < 0) {
LOG_ERR("coap packet init error (err:%d)", r);
@ -1022,7 +1000,7 @@ int lwm2m_init_message(struct lwm2m_message *msg)
}
r = coap_pending_init(msg->pending, &msg->cpkt,
&app_ctx->default_ctx->remote);
&msg->ctx->net_app_ctx.default_ctx->remote);
if (r < 0) {
LOG_ERR("Unable to initialize a pending "
"retransmission (err:%d).", r);
@ -1048,15 +1026,13 @@ int lwm2m_init_message(struct lwm2m_message *msg)
cleanup:
lwm2m_reset_message(msg, true);
if (pkt) {
net_pkt_unref(pkt);
}
return r;
}
int lwm2m_send_message(struct lwm2m_message *msg)
{
struct net_pkt *pkt;
int ret;
if (!msg || !msg->ctx) {
@ -1065,15 +1041,27 @@ int lwm2m_send_message(struct lwm2m_message *msg)
}
if (msg->type == COAP_TYPE_CON) {
/*
* Increase packet ref count to avoid being unref after
* net_app_send_pkt()
*/
coap_pending_cycle(msg->pending);
}
msg->send_attempts++;
ret = net_app_send_pkt(&msg->ctx->net_app_ctx, msg->cpkt.pkt,
pkt = net_app_get_net_pkt(&msg->ctx->net_app_ctx, AF_UNSPEC,
BUF_ALLOC_TIMEOUT);
if (!pkt) {
LOG_ERR("Unable to get TX packet, not enough memory.");
ret = -ENOMEM;
goto cleanup;
}
if (!net_pkt_append(pkt, msg->cpkt.offset, msg->cpkt.data,
BUF_ALLOC_TIMEOUT)) {
LOG_ERR("Unable to add packet data, not enough memory.");
ret = -ENOMEM;
goto cleanup;
}
ret = net_app_send_pkt(&msg->ctx->net_app_ctx, pkt,
&msg->ctx->net_app_ctx.default_ctx->remote,
NET_SOCKADDR_MAX_SIZE, K_NO_WAIT, NULL);
if (ret < 0) {
@ -1081,6 +1069,7 @@ int lwm2m_send_message(struct lwm2m_message *msg)
coap_pending_clear(msg->pending);
}
net_pkt_unref(pkt);
return ret;
}
@ -1096,6 +1085,13 @@ int lwm2m_send_message(struct lwm2m_message *msg)
lwm2m_reset_message(msg, true);
}
return ret;
cleanup:
if (pkt) {
net_pkt_unref(pkt);
}
return ret;
}
@ -2090,9 +2086,8 @@ size_t lwm2m_engine_get_opaque_more(struct lwm2m_input_context *in,
*last_block = true;
}
in->frag = net_frag_read(in->frag, in->offset, &in->offset, in_len,
buf);
if (!in->frag && in->offset == 0xffff) {
if (buf_read(buf, in_len, CPKT_BUF_READ(in->in_cpkt),
&in->offset) < 0) {
*last_block = true;
return 0;
}
@ -2145,17 +2140,17 @@ int lwm2m_write_handler(struct lwm2m_engine_obj_inst *obj_inst,
struct lwm2m_engine_obj_field *obj_field,
struct lwm2m_message *msg)
{
s64_t temp64 = 0;
s32_t temp32 = 0;
struct block_context *block_ctx = NULL;
void *data_ptr = NULL;
size_t data_len = 0;
size_t len = 0;
size_t total_size = 0;
s64_t temp64 = 0;
s32_t temp32 = 0;
int ret = 0;
u8_t tkl = 0U;
u8_t token[8];
u8_t tkl = 0U;
bool last_block = true;
struct block_context *block_ctx = NULL;
if (!obj_inst || !res || !obj_field || !msg) {
return -EINVAL;
@ -2701,7 +2696,6 @@ int lwm2m_perform_read_op(struct lwm2m_engine_obj *obj,
struct lwm2m_engine_obj_field *obj_field;
struct lwm2m_obj_path temp_path;
int ret = 0, index;
u16_t temp_len;
u8_t num_read = 0U;
if (msg->path.level >= 2) {
@ -2733,10 +2727,6 @@ int lwm2m_perform_read_op(struct lwm2m_engine_obj *obj,
/* store original path values so we can change them during processing */
memcpy(&temp_path, &msg->path, sizeof(temp_path));
msg->out.frag = coap_packet_get_payload(msg->out.out_cpkt,
&msg->out.offset,
&temp_len);
msg->out.offset++;
engine_put_begin(&msg->out, &msg->path);
while (obj_inst) {
@ -2832,10 +2822,11 @@ move_forward:
return ret;
}
static int print_attr(struct net_pkt *pkt, char *buf, u16_t buflen, void *ref)
static int print_attr(struct lwm2m_output_context *out,
u8_t *buf, u16_t buflen, void *ref)
{
struct lwm2m_attr *attr;
int i, used, base;
int i, used, base, ret;
u8_t digit;
s32_t fraction;
@ -2865,8 +2856,9 @@ static int print_attr(struct net_pkt *pkt, char *buf, u16_t buflen, void *ref)
base /= 10;
}
if (!net_pkt_append_all(pkt, used, buf, BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(out->out_cpkt), buf, used);
if (ret < 0) {
return ret;
}
}
@ -2879,7 +2871,6 @@ static int do_discover_op(struct lwm2m_message *msg, bool well_known)
struct lwm2m_engine_obj *obj;
struct lwm2m_engine_obj_inst *obj_inst;
int ret;
u16_t temp_len;
bool reported = false;
/* object ID is required unless it's bootstrap discover (TODO) or it's
@ -2907,27 +2898,24 @@ static int do_discover_op(struct lwm2m_message *msg, bool well_known)
return ret;
}
msg->out.frag = coap_packet_get_payload(msg->out.out_cpkt, &msg->out.offset,
&temp_len);
msg->out.offset++;
/* Handle CoAP .well-known/core discover */
if (well_known) {
/* </.well-known/core> */
if (!net_pkt_append_all(msg->out.out_cpkt->pkt,
strlen(WELL_KNOWN_CORE_PATH),
WELL_KNOWN_CORE_PATH,
BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(msg->out.out_cpkt),
WELL_KNOWN_CORE_PATH,
strlen(WELL_KNOWN_CORE_PATH));
if (ret < 0) {
return ret;
}
SYS_SLIST_FOR_EACH_CONTAINER(&engine_obj_list, obj, node) {
snprintk(disc_buf, sizeof(disc_buf), ",</%u>",
obj->obj_id);
if (!net_pkt_append_all(msg->out.out_cpkt->pkt,
strlen(disc_buf), disc_buf,
BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(msg->out.out_cpkt),
disc_buf, strlen(disc_buf));
if (ret < 0) {
return ret;
}
}
@ -2954,15 +2942,15 @@ static int do_discover_op(struct lwm2m_message *msg, bool well_known)
snprintk(disc_buf, sizeof(disc_buf), "%s</%u>",
reported ? "," : "",
obj_inst->obj->obj_id);
if (!net_pkt_append_all(msg->out.out_cpkt->pkt,
strlen(disc_buf), disc_buf,
BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(msg->out.out_cpkt),
disc_buf, strlen(disc_buf));
if (ret < 0) {
return ret;
}
/* report object attrs (5.4.2) */
ret = print_attr(msg->out.out_cpkt->pkt,
disc_buf, sizeof(disc_buf),
ret = print_attr(&msg->out, disc_buf, sizeof(disc_buf),
obj_inst->obj);
if (ret < 0) {
return ret;
@ -2981,15 +2969,15 @@ static int do_discover_op(struct lwm2m_message *msg, bool well_known)
snprintk(disc_buf, sizeof(disc_buf), "%s</%u/%u>",
reported ? "," : "",
obj_inst->obj->obj_id, obj_inst->obj_inst_id);
if (!net_pkt_append_all(msg->out.out_cpkt->pkt,
strlen(disc_buf), disc_buf,
BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(msg->out.out_cpkt),
disc_buf, strlen(disc_buf));
if (ret < 0) {
return ret;
}
/* report object instance attrs (5.4.2) */
ret = print_attr(msg->out.out_cpkt->pkt,
disc_buf, sizeof(disc_buf),
ret = print_attr(&msg->out, disc_buf, sizeof(disc_buf),
obj_inst);
if (ret < 0) {
return ret;
@ -3011,15 +2999,16 @@ static int do_discover_op(struct lwm2m_message *msg, bool well_known)
obj_inst->obj->obj_id,
obj_inst->obj_inst_id,
obj_inst->resources[i].res_id);
if (!net_pkt_append_all(msg->out.out_cpkt->pkt,
strlen(disc_buf), disc_buf,
BUF_ALLOC_TIMEOUT)) {
return -ENOMEM;
ret = buf_append(CPKT_BUF_WRITE(msg->out.out_cpkt),
disc_buf, strlen(disc_buf));
if (ret < 0) {
return ret;
}
/* report resource attrs when path > 1 (5.4.2) */
if (msg->path.level > 1) {
ret = print_attr(msg->out.out_cpkt->pkt,
ret = print_attr(&msg->out,
disc_buf, sizeof(disc_buf),
&obj_inst->resources[i]);
if (ret < 0) {
@ -3105,6 +3094,7 @@ static int handle_request(struct coap_packet *request,
bool well_known = false;
struct block_context *block_ctx = NULL;
enum coap_block_size block_size;
u16_t payload_len = 0U;
bool last_block = false;
/* set CoAP request / message */
@ -3250,9 +3240,8 @@ static int handle_request(struct coap_packet *request,
}
/* setup incoming data */
msg->in.frag = coap_packet_get_payload(msg->in.in_cpkt,
&msg->in.offset,
&msg->in.payload_len);
msg->in.offset = msg->in.in_cpkt->hdr_len + msg->in.in_cpkt->opt_len;
coap_packet_get_payload(msg->in.in_cpkt, &payload_len);
/* Check for block transfer */
r = get_option_int(msg->in.in_cpkt, COAP_OPTION_BLOCK1);
@ -3262,8 +3251,7 @@ static int handle_request(struct coap_packet *request,
/* RFC7252: 4.6. Message Size */
block_size = GET_BLOCK_SIZE(r);
if (!last_block &&
coap_block_size_to_bytes(block_size) >
msg->in.payload_len) {
coap_block_size_to_bytes(block_size) > payload_len) {
LOG_DBG("Trailing payload is discarded!");
r = -EFBIG;
goto error;
@ -3425,6 +3413,8 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
struct coap_reply *reply;
struct coap_packet response;
struct sockaddr from_addr;
static u8_t in_buf[NET_IPV6_MTU];
size_t recv_len;
int r;
u8_t token[8];
u8_t tkl;
@ -3454,10 +3444,23 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
}
#endif
r = coap_packet_parse(&response, pkt, NULL, 0);
/* copy data from pkt */
recv_len = net_pkt_appdatalen(pkt);
if (recv_len > sizeof(in_buf)) {
recv_len = sizeof(in_buf);
}
(void)net_frag_linearize(in_buf, recv_len, pkt,
net_pkt_appdata(pkt) - pkt->frags->data,
recv_len);
/* now that we have data, free pkt */
net_pkt_unref(pkt);
r = coap_packet_parse(&response, in_buf, recv_len, NULL, 0);
if (r < 0) {
LOG_ERR("Invalid data received (err:%d)", r);
goto cleanup;
return;
}
tkl = coap_header_get_token(&response, token);
@ -3471,9 +3474,6 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
*/
if (pending) {
msg = find_msg(pending, NULL);
if (msg) {
msg->pending = NULL;
}
}
LOG_DBG("checking for reply from [%s]",
@ -3495,7 +3495,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
if (handle_separate_response && !tkl &&
coap_header_get_type(&response) == COAP_TYPE_ACK) {
LOG_DBG("separated response, not removing reply");
goto cleanup;
return;
}
if (!msg) {
@ -3509,7 +3509,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
/* reset reply->user_data for next time */
reply->user_data = (void *)COAP_REPLY_STATUS_NONE;
LOG_DBG("reply %p NOT removed", reply);
goto cleanup;
return;
}
/* free up msg resources */
@ -3518,7 +3518,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
}
LOG_DBG("reply %p handled and removed", reply);
goto cleanup;
return;
}
/*
@ -3531,7 +3531,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
msg = lwm2m_get_message(client_ctx);
if (!msg) {
LOG_ERR("Unable to get a lwm2m message!");
goto cleanup;
return;
}
/* Create a response message if we reach this point */
@ -3544,7 +3544,7 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
/* process the response to this request */
r = udp_request_handler(&response, msg);
if (r < 0) {
goto cleanup;
return;
}
r = lwm2m_send_message(msg);
@ -3555,11 +3555,6 @@ void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, struct net_pkt *pkt,
} else {
LOG_ERR("No handler for response");
}
cleanup:
if (pkt) {
net_pkt_unref(pkt);
}
}
static void udp_receive(struct net_app_ctx *app_ctx, struct net_pkt *pkt,
@ -3574,6 +3569,7 @@ static void udp_receive(struct net_app_ctx *app_ctx, struct net_pkt *pkt,
static void retransmit_request(struct k_work *work)
{
struct net_pkt *pkt;
struct lwm2m_ctx *client_ctx;
struct lwm2m_message *msg;
struct coap_pending *pending;
@ -3608,6 +3604,20 @@ static void retransmit_request(struct k_work *work)
LOG_DBG("Resending message: %p", msg);
msg->send_attempts++;
pkt = net_app_get_net_pkt(&msg->ctx->net_app_ctx, AF_UNSPEC,
BUF_ALLOC_TIMEOUT);
if (!pkt) {
LOG_ERR("Unable to get TX packet, not enough memory.");
goto cleanup;
}
if (!net_pkt_append(pkt, msg->cpkt.offset, msg->cpkt.data,
BUF_ALLOC_TIMEOUT)) {
LOG_ERR("Unable to add packet data, not enough memory.");
goto cleanup;
}
/*
* Don't use lwm2m_send_message() because it calls
* coap_pending_cycle() / coap_pending_cycle() in a different order
@ -3615,16 +3625,22 @@ static void retransmit_request(struct k_work *work)
* unref of the net_pkt. Keep it simple and call net_app_send_pkt()
* directly here.
*/
r = net_app_send_pkt(&msg->ctx->net_app_ctx, msg->cpkt.pkt,
r = net_app_send_pkt(&msg->ctx->net_app_ctx, pkt,
&msg->ctx->net_app_ctx.default_ctx->remote,
NET_SOCKADDR_MAX_SIZE, K_NO_WAIT, NULL);
if (r < 0) {
LOG_ERR("Error sending lwm2m message: %d", r);
/* don't error here, retry until timeout */
net_pkt_unref(msg->cpkt.pkt);
net_pkt_unref(pkt);
}
k_delayed_work_submit(&client_ctx->retransmit_work, pending->timeout);
return;
cleanup:
if (pkt) {
net_pkt_unref(pkt);
}
}
static int notify_message_reply_cb(const struct coap_packet *response,