net: mqtt-sn: Add Gateway Advertisement and Discovery process support

Fixes: #78010
This commit implements the "Gateway Advertisement and Discovery" process
defined in section 6.1 of the MQTT-SN specification.
This includes breaking changes to the transport interface and the default
included UDP interface implementation as support for UDP multicast
messages is added as implemented by the Paho MQTT-SN Gateway.

Signed-off-by: Kenneth Witham <kennywitham4@gmail.com>
This commit is contained in:
Kenneth Witham 2024-10-28 08:09:49 -07:00 committed by Anas Nashif
commit aa9c9228d4
6 changed files with 637 additions and 98 deletions

View file

@ -84,17 +84,33 @@ used. An example configuration for UDP transport is shown below:
mqtt_sn_client_init(&client, &client_id, &tp.tp, evt_cb, tx_buf, sizeof(tx_buf), rx_buf, sizeof(rx_buf));
After the configuration is set up, the MQTT-SN client can connect to the gateway.
While the MQTT-SN protocol offers functionality to discover gateways through an
advertisement mechanism, this is not implemented yet in the library.
After the configuration is set up, the network address for the gateway to
connect to must be defined. The MQTT-SN protocol offers functionality to
discover gateways through an advertisement or a search mechanism. A user
should do at least one of the following steps to define a Gateway for the library:
Call the ``mqtt_sn_connect`` function, which will send a ``CONNECT`` message.
The application should periodically call the ``mqtt_sn_input`` function to process
the response received. The application does not have to call ``mqtt_sn_input`` if it
knows that no data has been received (e.g. when using Bluetooth). Note that
``mqtt_sn_input`` is a non-blocking function, if the transport struct contains a
``poll`` compatible function pointer.
If the connection was successful, ``MQTT_SN_EVT_CONNECTED`` will be notified to the
* Call the :c:func:`mqtt_sn_add_gw` function to manually define a Gateway address.
* Wait for an :c:enumerator:`MQTT_SN_EVT_ADVERTISE`.
* Call the :c:func:`mqtt_sn_search` function and wait for an :c:enumerator:`MQTT_SN_EVT_GWINFO` callback.
Make sure to call the :c:func:`mqtt_sn_input` function periodically to process incoming messages.
Example :c:func:`mqtt_sn_search` function call:
.. code-block:: c
err = mqtt_sn_search(&mqtt_client, 1);
k_sleep(K_SECONDS(10));
err = mqtt_sn_input(&mqtt_client);
__ASSERT(err == 0, "mqtt_sn_search() failed %d", err);
After the Gateway address has been defined or found, the MQTT-SN client can
connect to the gateway. Call the :c:func:`mqtt_sn_connect` function, which will send a
``CONNECT`` MQTT-SN message. The application should periodically call the :c:func:`mqtt_sn_input`
function to process the response received. The application does not have to call
:c:func:`mqtt_sn_input` if it knows that no data has been received (e.g. when using Bluetooth).
Note that :c:func:`mqtt_sn_input` is a non-blocking function, if the transport struct contains a
:c:func:`poll` compatible function pointer.
If the connection was successful, :c:enumerator:`MQTT_SN_EVT_CONNECTED` will be notified to the
application through the callback function.
.. code-block:: c
@ -110,19 +126,19 @@ application through the callback function.
k_sleep(K_MSEC(500));
}
In the above code snippet, the event handler function should set the ``connected``
flag upon a successful connection. If the connection fails at the MQTT level
or a timeout occurs, the connection will be aborted.
In the above code snippet, the gateway is connected to before publishing messages.
If the connection fails at the MQTT level or a timeout occurs, the connection will be aborted and
an error returned.
After the connection is established, an application needs to call ``mqtt_input``
After the connection is established, an application needs to call :c:func:`mqtt_input`
function periodically to process incoming data. Connection upkeep, on the other hand,
is done automatically using a k_work item.
If a MQTT message is received, an MQTT callback function will be called and an
appropriate event notified.
The connection can be closed by calling the ``mqtt_sn_disconnect`` function. This
The connection can be closed by calling the :c:func:`mqtt_sn_disconnect` function. This
has no effect on the transport, however. If you want to close the transport (e.g.
the socket), call ``mqtt_sn_client_deinit``, which will deinit the transport as well.
the socket), call :c:func:`mqtt_sn_client_deinit`, which will deinit the transport as well.
Zephyr provides sample code utilizing the MQTT-SN client API. See
:zephyr:code-sample:`mqtt-sn-publisher` for more information.
@ -134,7 +150,6 @@ Certain parts of the protocol are not yet supported in the library.
* Pre-defined topic IDs
* QoS -1 - it's most useful with predefined topics
* Gateway discovery using ADVERTISE, SEARCHGW and GWINFO messages.
* Setting the will topic and message after the initial connect
* Forwarder Encapsulation

View file

@ -75,16 +75,16 @@ enum mqtt_sn_topic_type {
* MQTT-SN return codes.
*/
enum mqtt_sn_return_code {
MQTT_SN_CODE_ACCEPTED = 0x00, /**< Accepted */
MQTT_SN_CODE_ACCEPTED = 0x00, /**< Accepted */
MQTT_SN_CODE_REJECTED_CONGESTION = 0x01, /**< Rejected: congestion */
MQTT_SN_CODE_REJECTED_TOPIC_ID = 0x02, /**< Rejected: Invalid Topic ID */
MQTT_SN_CODE_REJECTED_NOTSUP = 0x03, /**< Rejected: Not Supported */
MQTT_SN_CODE_REJECTED_TOPIC_ID = 0x02, /**< Rejected: Invalid Topic ID */
MQTT_SN_CODE_REJECTED_NOTSUP = 0x03, /**< Rejected: Not Supported */
};
/** @brief Abstracts memory buffers. */
struct mqtt_sn_data {
const uint8_t *data; /**< Pointer to data. */
uint16_t size; /**< Size of data, in bytes. */
size_t size; /**< Size of data, in bytes. */
};
/**
@ -105,19 +105,22 @@ struct mqtt_sn_data {
*
* struct mqtt_sn_data data = MQTT_SN_DATA_BYTES(0x13, 0x37);
*/
#define MQTT_SN_DATA_BYTES(...) \
((struct mqtt_sn_data) { (uint8_t[]){ __VA_ARGS__ }, sizeof((uint8_t[]){ __VA_ARGS__ })})
#define MQTT_SN_DATA_BYTES(...) \
((struct mqtt_sn_data){(uint8_t[]){__VA_ARGS__}, sizeof((uint8_t[]){__VA_ARGS__})})
/**
* Event types that can be emitted by the library.
*/
enum mqtt_sn_evt_type {
MQTT_SN_EVT_CONNECTED, /**< Connected to a gateway */
MQTT_SN_EVT_CONNECTED, /**< Connected to a gateway */
MQTT_SN_EVT_DISCONNECTED, /**< Disconnected */
MQTT_SN_EVT_ASLEEP, /**< Entered ASLEEP state */
MQTT_SN_EVT_AWAKE, /**< Entered AWAKE state */
MQTT_SN_EVT_PUBLISH, /**< Received a PUBLISH message */
MQTT_SN_EVT_PINGRESP /**< Received a PINGRESP */
MQTT_SN_EVT_ASLEEP, /**< Entered ASLEEP state */
MQTT_SN_EVT_AWAKE, /**< Entered AWAKE state */
MQTT_SN_EVT_PUBLISH, /**< Received a PUBLISH message */
MQTT_SN_EVT_PINGRESP, /**< Received a PINGRESP */
MQTT_SN_EVT_ADVERTISE, /**< Received a ADVERTISE */
MQTT_SN_EVT_GWINFO, /**< Received a GWINFO */
MQTT_SN_EVT_SEARCHGW /**< Received a SEARCHGW */
};
/**
@ -180,16 +183,27 @@ struct mqtt_sn_transport {
void (*deinit)(struct mqtt_sn_transport *transport);
/**
* Will be called by the library when it wants to send a message.
* @brief Will be called by the library when it wants to send a message.
*
* Implementations should follow sendto conventions with exceptions.
* When dest_addr == NULL, message should be broadcast with addrlen being
* the broadcast radius. This should also handle setting up/destroying
* connections as required when the address changes.
*
* @return ENOERR on connection+transmission success, Negative values
* signal errors.
*/
int (*msg_send)(struct mqtt_sn_client *client, void *buf, size_t sz);
int (*sendto)(struct mqtt_sn_client *client, void *buf, size_t sz, const void *dest_addr,
size_t addrlen);
/**
* @brief Will be called by the library when it wants to receive a message.
*
* Implementations should follow recv conventions.
* Implementations should follow recvfrom conventions with the exception
* of a NULL src_addr being a broadcast message.
*/
ssize_t (*recv)(struct mqtt_sn_client *client, void *buffer, size_t length);
ssize_t (*recvfrom)(struct mqtt_sn_client *client, void *rx_buf, size_t rx_len,
void *src_addr, size_t *addrlen);
/**
* @brief Check if incoming data is available.
@ -215,9 +229,9 @@ struct mqtt_sn_transport_udp {
/** Socket FD */
int sock;
/** Address of the gateway */
struct sockaddr gwaddr;
socklen_t gwaddrlen;
/** Address of broadcasts */
struct sockaddr bcaddr;
socklen_t bcaddrlen;
};
#define UDP_TRANSPORT(transport) CONTAINER_OF(transport, struct mqtt_sn_transport_udp, tp)
@ -265,6 +279,9 @@ struct mqtt_sn_client {
/** Buffer for incoming data */
struct net_buf_simple rx;
/** Buffer for incoming data sender address */
struct net_buf_simple rx_addr;
/** Event callback */
mqtt_sn_evt_cb_t evt_cb;
@ -277,6 +294,9 @@ struct mqtt_sn_client {
/** List of registered topics */
sys_slist_t topic;
/** List of found gateways */
sys_slist_t gateway;
/** Current state of the MQTT-SN client */
int state;
@ -286,6 +306,15 @@ struct mqtt_sn_client {
/** Number of retries for failed ping attempts */
uint8_t ping_retries;
/** Timestamp of the next SEARCHGW transmission */
int64_t ts_searchgw;
/** Timestamp of the next GWINFO transmission */
int64_t ts_gwinfo;
/** Radius of the next GWINFO transmission */
int64_t radius_gwinfo;
/** Delayable work structure for processing MQTT-SN events */
struct k_work_delayable process_work;
};
@ -317,6 +346,29 @@ int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data
*/
void mqtt_sn_client_deinit(struct mqtt_sn_client *client);
/**
* @brief Manually add a Gateway, bypasing the normal search process.
*
* This function manually creates a gateway that is stored internal to the library.
*
* @param client The MQTT-SN client to connect.
* @param gw_id Single byte Gateway Identifier
* @param gw_addr Address data structure to be used by the transport layer.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr);
/**
* @brief Initiate the MQTT-SN GW Search process.
*
* @param client The MQTT-SN client to connect.
* @param radius Broadcast radius for the search message.
*
* @return 0 or a negative error code (errno.h) indicating reason of failure.
*/
int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius);
/**
* @brief Connect the client.
*

View file

@ -14,26 +14,51 @@ if MQTT_SN_LIB
config MQTT_SN_LIB_MAX_PAYLOAD_SIZE
int "Maximum payload size of an MQTT-SN message"
default $(UINT8_MAX)
range $(UINT8_MAX) $(UINT16_MAX)
config MQTT_SN_LIB_MAX_MSGS
int "Number of preallocated messages"
default 10
range 1 $(UINT8_MAX)
config MQTT_SN_LIB_MAX_TOPICS
int "Number of topics that can be managed"
default 20
range 1 $(UINT8_MAX)
config MQTT_SN_LIB_MAX_TOPIC_SIZE
int "Maximum topic length"
default 64
range 1 $(UINT16_MAX)
config MQTT_SN_LIB_MAX_GATEWAYS
int "Maximum number of gateways to store internally"
default 2
range 1 $(UINT8_MAX)
config MQTT_SN_LIB_MAX_ADDR_SIZE
int "Maximum address size for the transport"
default 21
range 1 $(UINT8_MAX)
help
The MQTT_SN library stores addresses internally and thus
needs to know how long your addresses are. Set this to the maximum
length in bytes of the address data structure for your implemented transport.
config MQTT_SN_LIB_BROADCAST_RADIUS
int "Radius for broadcast messages"
default 1
range 1 $(UINT8_MAX)
config MQTT_SN_LIB_MAX_PUBLISH
int "Number of publishes that can be in-flight at the same time"
default 5
range 1 $(UINT8_MAX)
config MQTT_SN_KEEPALIVE
int "Maximum number of clients Keep alive time for MQTT-SN (in seconds)"
default 60
range 1 $(UINT8_MAX)
help
Keep alive time for MQTT-SN (in seconds). Sending of Ping Requests to
keep the connection alive are governed by this value.
@ -50,6 +75,22 @@ config MQTT_SN_LIB_N_RETRY
config MQTT_SN_LIB_T_RETRY
int "Time (seconds) to wait for responses"
default 10
range 0 $(UINT8_MAX)
config MQTT_SN_LIB_T_SEARCHGW
int "Max time (seconds) to wait before sending SEARCHGW"
default 10
range 0 $(UINT8_MAX)
config MQTT_SN_LIB_T_GWINFO
int "Max time (seconds) to wait before sending GWINFO"
default 10
range 0 $(UINT8_MAX)
config MQTT_SN_LIB_N_ADV
int "Number of missed Advertise messages before considering GW lost"
default 2
range 1 $(UINT8_MAX)
module=MQTT_SN
module-dep=NET_LOG

View file

@ -12,6 +12,7 @@
#include "mqtt_sn_msg.h"
#include <zephyr/logging/log.h>
#include <zephyr/random/random.h>
#include <zephyr/net/mqtt_sn.h>
LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
@ -55,9 +56,19 @@ struct mqtt_sn_topic {
enum mqtt_sn_topic_state state;
};
K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish),
CONFIG_MQTT_SN_LIB_MAX_PUBLISH, 4);
struct mqtt_sn_gateway {
sys_snode_t next;
char gw_id;
int64_t adv_timer;
char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
size_t addr_len;
};
K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish), CONFIG_MQTT_SN_LIB_MAX_PUBLISH,
4);
K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
K_MEM_SLAB_DEFINE_STATIC(gateways, sizeof(struct mqtt_sn_gateway), CONFIG_MQTT_SN_LIB_MAX_GATEWAYS,
4);
enum mqtt_sn_client_state {
MQTT_SN_CLIENT_DISCONNECTED,
@ -74,8 +85,10 @@ static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client
LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
}
#define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
#define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
#define T_SEARCHGW_MSEC (CONFIG_MQTT_SN_LIB_T_SEARCHGW * MSEC_PER_SEC)
#define T_GWINFO_MSEC (CONFIG_MQTT_SN_LIB_T_GWINFO * MSEC_PER_SEC)
#define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
#define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
#define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
static uint16_t next_msg_id(void)
@ -85,7 +98,8 @@ static uint16_t next_msg_id(void)
return ++msg_id;
}
static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p)
static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p,
uint8_t broadcast_radius)
{
int err;
@ -96,7 +110,7 @@ static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *
LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
if (!client->transport->msg_send) {
if (!client->transport->sendto) {
LOG_ERR("Can't send: no callback");
err = -ENOTSUP;
goto end;
@ -108,13 +122,26 @@ static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *
goto end;
}
err = client->transport->msg_send(client, client->tx.data, client->tx.len);
if (err) {
LOG_ERR("Error during send: %d", err);
goto end;
if (broadcast_radius) {
err = client->transport->sendto(client, client->tx.data, client->tx.len, NULL,
broadcast_radius);
} else {
struct mqtt_sn_gateway *gw;
gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
if (gw == NULL || gw->addr_len == 0) {
LOG_WRN("No Gateway Address");
err = -ENXIO;
goto end;
}
err = client->transport->sendto(client, client->tx.data, client->tx.len, gw->addr,
gw->addr_len);
}
end:
if (err) {
LOG_ERR("Error during send: %d", err);
}
net_buf_simple_reset(&client->tx);
return err;
@ -228,13 +255,13 @@ static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
}
static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
struct mqtt_sn_data *topic_name)
struct mqtt_sn_data *topic_name)
{
struct mqtt_sn_topic *topic;
SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
if (topic->namelen == topic_name->size &&
memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
return topic;
}
}
@ -243,7 +270,7 @@ static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *clie
}
static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
uint16_t msg_id)
uint16_t msg_id)
{
struct mqtt_sn_topic *topic;
@ -287,6 +314,66 @@ static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
}
}
static void mqtt_sn_gw_destroy(struct mqtt_sn_client *client, struct mqtt_sn_gateway *gw)
{
LOG_DBG("Destroying gateway %d", gw->gw_id);
sys_slist_find_and_remove(&client->gateway, &gw->next);
k_mem_slab_free(&gateways, (void *)gw);
}
static void mqtt_sn_gw_destroy_all(struct mqtt_sn_client *client)
{
struct mqtt_sn_gateway *gw;
sys_snode_t *next;
while ((next = sys_slist_get(&client->gateway)) != NULL) {
gw = SYS_SLIST_CONTAINER(next, gw, next);
sys_slist_find_and_remove(&client->gateway, next);
k_mem_slab_free(&gateways, (void *)gw);
}
}
static struct mqtt_sn_gateway *mqtt_sn_gw_create(uint8_t gw_id, short duration,
struct mqtt_sn_data gw_addr)
{
struct mqtt_sn_gateway *gw;
LOG_DBG("Free GW slots: %d", k_mem_slab_num_free_get(&gateways));
if (k_mem_slab_alloc(&gateways, (void **)&gw, K_NO_WAIT)) {
LOG_WRN("Can't create GW: no free slot");
return NULL;
}
__ASSERT(gw_addr.size < CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE,
"Gateway address is larger than allowed by CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE");
memset(gw, 0, sizeof(*gw));
memcpy(gw->addr, gw_addr.data, gw_addr.size);
gw->addr_len = gw_addr.size;
gw->gw_id = gw_id;
if (duration == -1) {
gw->adv_timer = duration;
} else {
gw->adv_timer =
k_uptime_get() + (duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
}
return gw;
}
static struct mqtt_sn_gateway *mqtt_sn_gw_find_id(struct mqtt_sn_client *client, uint16_t gw_id)
{
struct mqtt_sn_gateway *gw;
SYS_SLIST_FOR_EACH_CONTAINER(&client->gateway, gw, next) {
if (gw->gw_id == gw_id) {
return gw;
}
}
return NULL;
}
static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
@ -348,7 +435,7 @@ static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_t
return;
}
encode_and_send(client, &p);
encode_and_send(client, &p, 0);
}
static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
@ -380,7 +467,7 @@ static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn
return;
}
encode_and_send(client, &p);
encode_and_send(client, &p, 0);
}
static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
@ -408,7 +495,7 @@ static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_to
return;
}
encode_and_send(client, &p);
encode_and_send(client, &p, 0);
}
static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
@ -435,7 +522,37 @@ static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_pub
p.params.publish.qos = pub->qos;
p.params.publish.dup = dup;
encode_and_send(client, &p);
encode_and_send(client, &p, 0);
}
static void mqtt_sn_do_searchgw(struct mqtt_sn_client *client)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SEARCHGW};
p.params.searchgw.radius = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
encode_and_send(client, &p, CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS);
}
static void mqtt_sn_do_gwinfo(struct mqtt_sn_client *client)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_GWINFO};
struct mqtt_sn_gateway *gw;
struct mqtt_sn_data addr;
gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
if (gw == NULL || gw->addr_len == 0) {
LOG_WRN("No Gateway Address");
return;
}
response.params.gwinfo.gw_id = gw->gw_id;
addr.data = gw->addr;
addr.size = gw->addr_len;
response.params.gwinfo.gw_add = addr;
encode_and_send(client, &response, client->radius_gwinfo);
}
static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
@ -454,7 +571,7 @@ static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
p.params.pingreq.client_id.data = client->client_id.data;
p.params.pingreq.client_id.size = client->client_id.size;
case MQTT_SN_CLIENT_ACTIVE:
encode_and_send(client, &p);
encode_and_send(client, &p, 0);
break;
default:
LOG_WRN("Can't ping in state %d", client->state);
@ -469,8 +586,6 @@ static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
int64_t next_attempt;
bool dup;
*next_cycle = 0;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
"Processing publish for topic");
@ -517,6 +632,8 @@ static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
}
}
LOG_DBG("next_cycle: %lld", *next_cycle);
return 0;
}
@ -583,12 +700,15 @@ static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
}
}
LOG_DBG("next_cycle: %lld", *next_cycle);
return 0;
}
static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
{
const int64_t now = k_uptime_get();
struct mqtt_sn_gateway *gw = NULL;
int64_t next_ping;
if (client->ping_retries == N_RETRY) {
@ -602,6 +722,9 @@ static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
if (!client->ping_retries--) {
LOG_WRN("Ping ran out of retries");
mqtt_sn_disconnect_internal(client);
SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
mqtt_sn_gw_destroy(client, gw);
return -ETIMEDOUT;
}
@ -615,6 +738,63 @@ static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
*next_cycle = next_ping;
}
LOG_DBG("next_cycle: %lld", *next_cycle);
return 0;
}
static int process_search(struct mqtt_sn_client *client, int64_t *next_cycle)
{
const int64_t now = k_uptime_get();
LOG_DBG("ts_searchgw: %lld", client->ts_searchgw);
LOG_DBG("ts_gwinfo: %lld", client->ts_gwinfo);
if (client->ts_searchgw != 0 && client->ts_searchgw <= now) {
LOG_DBG("Sending SEARCHGW");
mqtt_sn_do_searchgw(client);
client->ts_searchgw = 0;
}
if (client->ts_gwinfo != 0 && client->ts_gwinfo <= now) {
LOG_DBG("Sending GWINFO");
mqtt_sn_do_gwinfo(client);
client->ts_gwinfo = 0;
}
if (*next_cycle == 0 || (client->ts_searchgw != 0 && client->ts_searchgw < *next_cycle)) {
*next_cycle = client->ts_searchgw;
}
if (*next_cycle == 0 || (client->ts_gwinfo != 0 && client->ts_gwinfo < *next_cycle)) {
*next_cycle = client->ts_gwinfo;
}
LOG_DBG("next_cycle: %lld", *next_cycle);
return 0;
}
static int process_advertise(struct mqtt_sn_client *client, int64_t *next_cycle)
{
const int64_t now = k_uptime_get();
struct mqtt_sn_gateway *gw;
struct mqtt_sn_gateway *gw_next;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->gateway, gw, gw_next, next) {
LOG_DBG("Checking if GW 0x%02x is old", gw->gw_id);
if (gw->adv_timer != -1 && gw->adv_timer <= now) {
LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
if (client->gateway.head == &gw->next) {
mqtt_sn_disconnect(client);
}
mqtt_sn_gw_destroy(client, gw);
}
if (gw->adv_timer != -1 && (*next_cycle == 0 || gw->adv_timer < *next_cycle)) {
*next_cycle = gw->adv_timer;
}
}
LOG_DBG("next_cycle: %lld", *next_cycle);
return 0;
}
@ -628,10 +808,18 @@ static void process_work(struct k_work *wrk)
dwork = k_work_delayable_from_work(wrk);
client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
LOG_DBG("Executing work of client %p in state %d", client, client->state);
LOG_DBG("Executing work of client %p in state %d at time %lld", client, client->state,
k_uptime_get());
if (client->state == MQTT_SN_CLIENT_DISCONNECTED) {
LOG_WRN("%s called while disconnected: Nothing to do", __func__);
/* Clean up old advertised gateways from list */
err = process_advertise(client, &next_cycle);
if (err) {
return;
}
/* Handle GW search process timers */
err = process_search(client, &next_cycle);
if (err) {
return;
}
@ -653,6 +841,7 @@ static void process_work(struct k_work *wrk)
}
if (next_cycle > 0) {
LOG_DBG("next_cycle: %lld", next_cycle);
k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
}
}
@ -695,6 +884,7 @@ void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
mqtt_sn_publish_destroy_all(client);
mqtt_sn_topic_destroy_all(client);
mqtt_sn_gw_destroy_all(client);
if (client->transport && client->transport->deinit) {
client->transport->deinit(client->transport);
@ -703,6 +893,40 @@ void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
k_work_cancel_delayable(&client->process_work);
}
int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr)
{
struct mqtt_sn_gateway *gw;
gw = mqtt_sn_gw_find_id(client, gw_id);
if (gw != NULL) {
mqtt_sn_gw_destroy(client, gw);
}
gw = mqtt_sn_gw_create(gw_id, -1, gw_addr);
if (!gw) {
return -ENOMEM;
}
sys_slist_append(&client->gateway, &gw->next);
return 0;
}
int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius)
{
if (!client) {
return -EINVAL;
}
/* Set SEARCHGW transmission timer */
client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
k_work_schedule(&client->process_work, K_NO_WAIT);
LOG_DBG("Requested SEARCHGW for time %lld at time %lld", client->ts_searchgw,
k_uptime_get());
return 0;
}
int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
@ -728,7 +952,7 @@ int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session
client->last_ping = k_uptime_get();
return encode_and_send(client, &p);
return encode_and_send(client, &p, 0);
}
int mqtt_sn_disconnect(struct mqtt_sn_client *client)
@ -742,7 +966,7 @@ int mqtt_sn_disconnect(struct mqtt_sn_client *client)
p.params.disconnect.duration = 0;
err = encode_and_send(client, &p);
err = encode_and_send(client, &p, 0);
mqtt_sn_disconnect_internal(client);
return err;
@ -759,14 +983,14 @@ int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
p.params.disconnect.duration = duration;
err = encode_and_send(client, &p);
err = encode_and_send(client, &p, 0);
mqtt_sn_sleep_internal(client);
return err;
}
int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name)
struct mqtt_sn_data *topic_name)
{
struct mqtt_sn_topic *topic;
int err;
@ -833,7 +1057,7 @@ int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
}
int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
{
struct mqtt_sn_publish *pub;
struct mqtt_sn_topic *topic;
@ -885,6 +1109,84 @@ int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
return 0;
}
static void handle_advertise(struct mqtt_sn_client *client, struct mqtt_sn_param_advertise *p,
struct mqtt_sn_data rx_addr)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_ADVERTISE};
struct mqtt_sn_gateway *gw;
gw = mqtt_sn_gw_find_id(client, p->gw_id);
if (gw == NULL) {
LOG_DBG("Creating GW 0x%02x with duration %d", p->gw_id, p->duration);
gw = mqtt_sn_gw_create(p->gw_id, p->duration, rx_addr);
if (!gw) {
return;
}
sys_slist_append(&client->gateway, &gw->next);
} else {
LOG_DBG("Updating timer for GW 0x%02x with duration %d", p->gw_id, p->duration);
gw->adv_timer =
k_uptime_get() + (p->duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
}
k_work_schedule(&client->process_work, K_NO_WAIT);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
}
static void handle_searchgw(struct mqtt_sn_client *client, struct mqtt_sn_param_searchgw *p)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_SEARCHGW};
/* Increment SEARCHGW transmission timestamp if waiting */
if (client->ts_searchgw != 0) {
client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
}
/* Set transmission timestamp to respond to SEARCHGW if we have a GW */
if (sys_slist_len(&client->gateway) > 0) {
client->ts_gwinfo = k_uptime_get() + (T_GWINFO_MSEC * sys_rand8_get() / 255);
}
client->radius_gwinfo = p->radius;
k_work_schedule(&client->process_work, K_NO_WAIT);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
}
static void handle_gwinfo(struct mqtt_sn_client *client, struct mqtt_sn_param_gwinfo *p,
struct mqtt_sn_data rx_addr)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_GWINFO};
struct mqtt_sn_gateway *gw;
/* Clear SEARCHGW and GWINFO transmission if waiting */
client->ts_searchgw = 0;
client->ts_gwinfo = 0;
k_work_schedule(&client->process_work, K_NO_WAIT);
/* Extract GW info and store */
if (p->gw_add.size > 0) {
rx_addr.data = p->gw_add.data;
rx_addr.size = p->gw_add.size;
} else {
}
gw = mqtt_sn_gw_create(p->gw_id, -1, rx_addr);
if (!gw) {
return;
}
sys_slist_append(&client->gateway, &gw->next);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
}
static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
@ -922,7 +1224,7 @@ static void handle_willtopicreq(struct mqtt_sn_client *client)
response.params.willtopic.topic.data = client->will_topic.data;
response.params.willtopic.topic.size = client->will_topic.size;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_willmsgreq(struct mqtt_sn_client *client)
@ -932,7 +1234,7 @@ static void handle_willmsgreq(struct mqtt_sn_client *client)
response.params.willmsg.msg.data = client->will_msg.data;
response.params.willmsg.msg.size = client->will_msg.size;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
@ -955,7 +1257,7 @@ static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_
response.params.regack.topic_id = p->topic_id;
response.params.regack.msg_id = p->msg_id;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
@ -980,8 +1282,8 @@ static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_p
{
struct mqtt_sn_param response;
struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
.topic_id = p->topic_id,
.topic_type = p->topic_type},
.topic_id = p->topic_id,
.topic_type = p->topic_type},
.type = MQTT_SN_EVT_PUBLISH};
if (p->qos == MQTT_SN_QOS_1) {
@ -990,12 +1292,12 @@ static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_p
response.params.puback.msg_id = p->msg_id;
response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
} else if (p->qos == MQTT_SN_QOS_2) {
response.type = MQTT_SN_MSG_TYPE_PUBREC;
response.params.pubrec.msg_id = p->msg_id;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
if (client->evt_cb) {
@ -1030,7 +1332,7 @@ static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pu
response.params.pubrel.msg_id = p->msg_id;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
@ -1039,7 +1341,7 @@ static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pu
response.params.pubcomp.msg_id = p->msg_id;
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
@ -1088,7 +1390,7 @@ static void handle_pingreq(struct mqtt_sn_client *client)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
encode_and_send(client, &response);
encode_and_send(client, &response, 0);
}
static void handle_pingresp(struct mqtt_sn_client *client)
@ -1112,7 +1414,7 @@ static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_para
mqtt_sn_disconnect_internal(client);
}
static int handle_msg(struct mqtt_sn_client *client)
static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
{
int err;
struct mqtt_sn_param p;
@ -1125,7 +1427,14 @@ static int handle_msg(struct mqtt_sn_client *client)
LOG_INF("Got message of type %d", p.type);
switch (p.type) {
case MQTT_SN_MSG_TYPE_ADVERTISE:
handle_advertise(client, &p.params.advertise, rx_addr);
break;
case MQTT_SN_MSG_TYPE_SEARCHGW:
handle_searchgw(client, &p.params.searchgw);
break;
case MQTT_SN_MSG_TYPE_GWINFO:
handle_gwinfo(client, &p.params.gwinfo, rx_addr);
break;
case MQTT_SN_MSG_TYPE_CONNACK:
handle_connack(client, &p.params.connack);
@ -1189,9 +1498,11 @@ static int handle_msg(struct mqtt_sn_client *client)
int mqtt_sn_input(struct mqtt_sn_client *client)
{
ssize_t next_frame_size;
char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
struct mqtt_sn_data rx_addr = {.data = addr, .size = CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE};
int err;
if (!client || !client->transport || !client->transport->recv) {
if (!client || !client->transport || !client->transport->recvfrom) {
return -EINVAL;
}
@ -1204,7 +1515,8 @@ int mqtt_sn_input(struct mqtt_sn_client *client)
net_buf_simple_reset(&client->rx);
next_frame_size = client->transport->recv(client, client->rx.data, client->rx.size);
next_frame_size = client->transport->recvfrom(client, client->rx.data, client->rx.size,
(void *)rx_addr.data, &rx_addr.size);
if (next_frame_size <= 0) {
return next_frame_size;
}
@ -1217,7 +1529,7 @@ int mqtt_sn_input(struct mqtt_sn_client *client)
LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
err = handle_msg(client);
err = handle_msg(client, rx_addr);
if (err) {
return err;

View file

@ -111,6 +111,17 @@ static int decode_msg_advertise(struct net_buf_simple *buf, struct mqtt_sn_param
return 0;
}
static int decode_msg_searchgw(struct net_buf_simple *buf, struct mqtt_sn_param_searchgw *params)
{
if (buf->len != 1) {
return -EPROTO;
}
params->radius = net_buf_simple_pull_u8(buf);
return 0;
}
static int decode_msg_gwinfo(struct net_buf_simple *buf, struct mqtt_sn_param_gwinfo *params)
{
if (buf->len < 1) {
@ -121,6 +132,8 @@ static int decode_msg_gwinfo(struct net_buf_simple *buf, struct mqtt_sn_param_gw
if (buf->len) {
decode_data(buf, &params->gw_add);
} else {
params->gw_add.size = 0;
}
return 0;
@ -332,6 +345,8 @@ int mqtt_sn_decode_msg(struct net_buf_simple *buf, struct mqtt_sn_param *params)
switch (params->type) {
case MQTT_SN_MSG_TYPE_ADVERTISE:
return decode_msg_advertise(buf, &params->params.advertise);
case MQTT_SN_MSG_TYPE_SEARCHGW:
return decode_msg_searchgw(buf, &params->params.searchgw);
case MQTT_SN_MSG_TYPE_GWINFO:
return decode_msg_gwinfo(buf, &params->params.gwinfo);
case MQTT_SN_MSG_TYPE_CONNACK:

View file

@ -14,6 +14,7 @@
#include <zephyr/net/mqtt_sn.h>
#include <zephyr/net/net_ip.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/igmp.h>
#include <zephyr/posix/fcntl.h>
@ -43,25 +44,93 @@ static int tp_udp_init(struct mqtt_sn_transport *transport)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(transport);
int err;
struct sockaddr addrm;
struct ip_mreqn mreqn;
int optval;
struct net_if *iface;
udp->sock = zsock_socket(udp->gwaddr.sa_family, SOCK_DGRAM, 0);
udp->sock = zsock_socket(udp->bcaddr.sa_family, SOCK_DGRAM, 0);
if (udp->sock < 0) {
return errno;
}
LOG_DBG("Socket %d", udp->sock);
#ifdef LOG_DBG
char ip[30], *out;
out = get_ip_str((struct sockaddr *)&udp->gwaddr, ip, sizeof(ip));
if (out != NULL) {
LOG_DBG("Connecting to IP %s:%u", out,
ntohs(((struct sockaddr_in *)&udp->gwaddr)->sin_port));
optval = 1;
err = zsock_setsockopt(udp->sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
if (err < 0) {
return errno;
}
#endif
err = zsock_connect(udp->sock, (struct sockaddr *)&udp->gwaddr, udp->gwaddrlen);
if (IS_ENABLED(CONFIG_MQTT_SN_LOG_LEVEL_DBG)) {
char ip[30], *out;
uint16_t port;
out = get_ip_str((struct sockaddr *)&udp->bcaddr, ip, sizeof(ip));
switch (udp->bcaddr.sa_family) {
case AF_INET:
port = ntohs(((struct sockaddr_in *)&udp->bcaddr)->sin_port);
break;
case AF_INET6:
port = ntohs(((struct sockaddr_in6 *)&udp->bcaddr)->sin6_port);
break;
default:
break;
}
if (out != NULL) {
LOG_DBG("Binding to Brodcast IP %s:%u", out, port);
}
}
switch (udp->bcaddr.sa_family) {
case AF_INET:
if (IS_ENABLED(CONFIG_NET_IPV4)) {
addrm.sa_family = AF_INET;
((struct sockaddr_in *)&addrm)->sin_port =
((struct sockaddr_in *)&udp->bcaddr)->sin_port;
((struct sockaddr_in *)&addrm)->sin_addr.s_addr = INADDR_ANY;
}
break;
case AF_INET6:
if (IS_ENABLED(CONFIG_NET_IPV6)) {
addrm.sa_family = AF_INET6;
((struct sockaddr_in6 *)&addrm)->sin6_port =
((struct sockaddr_in6 *)&udp->bcaddr)->sin6_port;
memcpy(&((struct sockaddr_in6 *)&addrm)->sin6_addr, &in6addr_any,
sizeof(struct in6_addr));
break;
}
default:
LOG_ERR("Unknown AF");
return -EINVAL;
}
err = zsock_bind(udp->sock, &addrm, sizeof(addrm));
if (err) {
LOG_ERR("Error during bind: %d", errno);
return errno;
}
memcpy(&mreqn.imr_multiaddr, &udp->bcaddr.data[2], sizeof(udp->bcaddr.data) - 2);
if (udp->bcaddr.sa_family == AF_INET && IS_ENABLED(CONFIG_NET_IPV4)) {
iface = net_if_ipv4_select_src_iface(
&((struct sockaddr_in *)&udp->bcaddr)->sin_addr);
} else if (udp->bcaddr.sa_family == AF_INET6 && IS_ENABLED(CONFIG_NET_IPV6)) {
iface = net_if_ipv6_select_src_iface(&((struct sockaddr_in6 *)&addrm)->sin6_addr);
} else {
LOG_ERR("Unknown AF");
return -EINVAL;
}
mreqn.imr_ifindex = net_if_get_by_iface(iface);
err = zsock_setsockopt(udp->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreqn, sizeof(mreqn));
if (err < 0) {
return errno;
}
optval = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
err = zsock_setsockopt(udp->sock, IPPROTO_IP, IP_MULTICAST_TTL, &optval, sizeof(optval));
if (err < 0) {
return errno;
}
@ -76,14 +145,37 @@ static void tp_udp_deinit(struct mqtt_sn_transport *transport)
zsock_close(udp->sock);
}
static int tp_udp_msg_send(struct mqtt_sn_client *client, void *buf, size_t sz)
static int tp_udp_sendto(struct mqtt_sn_client *client, void *buf, size_t sz, const void *dest_addr,
size_t addrlen)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(client->transport);
int rc;
int ttl;
socklen_t ttl_len;
LOG_HEXDUMP_DBG(buf, sz, "Sending UDP packet");
if (dest_addr == NULL) {
LOG_HEXDUMP_DBG(buf, sz, "Sending Broadcast UDP packet");
/* Set ttl if requested value does not match existing*/
rc = zsock_getsockopt(udp->sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, &ttl_len);
if (rc < 0) {
return -errno;
}
if (ttl != addrlen) {
ttl = addrlen;
rc = zsock_setsockopt(udp->sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl,
sizeof(ttl));
if (rc < 0) {
return -errno;
}
}
rc = zsock_sendto(udp->sock, buf, sz, 0, &udp->bcaddr, udp->bcaddrlen);
} else {
LOG_HEXDUMP_DBG(buf, sz, "Sending Addressed UDP packet");
rc = zsock_sendto(udp->sock, buf, sz, 0, dest_addr, addrlen);
}
rc = zsock_send(udp->sock, buf, sz, 0);
if (rc < 0) {
return -errno;
}
@ -95,12 +187,14 @@ static int tp_udp_msg_send(struct mqtt_sn_client *client, void *buf, size_t sz)
return 0;
}
static ssize_t tp_udp_recv(struct mqtt_sn_client *client, void *buffer, size_t length)
static ssize_t tp_udp_recvfrom(struct mqtt_sn_client *client, void *buffer, size_t length,
void *src_addr, size_t *addrlen)
{
struct mqtt_sn_transport_udp *udp = UDP_TRANSPORT(client->transport);
int rc;
struct sockaddr *srcaddr = src_addr;
rc = zsock_recv(udp->sock, buffer, length, 0);
rc = zsock_recvfrom(udp->sock, buffer, length, 0, src_addr, addrlen);
LOG_DBG("recv %d", rc);
if (rc < 0) {
return -errno;
@ -108,6 +202,16 @@ static ssize_t tp_udp_recv(struct mqtt_sn_client *client, void *buffer, size_t l
LOG_HEXDUMP_DBG(buffer, rc, "recv");
if (*addrlen != udp->bcaddrlen) {
return rc;
}
if (memcmp(srcaddr->data, udp->bcaddr.data, *addrlen) != 0) {
return rc;
}
src_addr = NULL;
*addrlen = 1;
return rc;
}
@ -131,10 +235,10 @@ static int tp_udp_poll(struct mqtt_sn_client *client)
return pollfd.revents & ZSOCK_POLLIN;
}
int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockaddr *gwaddr,
int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockaddr *bcaddr,
socklen_t addrlen)
{
if (!udp || !gwaddr || !addrlen) {
if (!udp || !bcaddr || !addrlen) {
return -EINVAL;
}
@ -142,14 +246,14 @@ int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockadd
udp->tp = (struct mqtt_sn_transport){.init = tp_udp_init,
.deinit = tp_udp_deinit,
.msg_send = tp_udp_msg_send,
.sendto = tp_udp_sendto,
.poll = tp_udp_poll,
.recv = tp_udp_recv};
.recvfrom = tp_udp_recvfrom};
udp->sock = 0;
memcpy(&udp->gwaddr, gwaddr, addrlen);
udp->gwaddrlen = addrlen;
memcpy(&udp->bcaddr, bcaddr, addrlen);
udp->bcaddrlen = addrlen;
return 0;
}