samples: net: add secure MQTT sensor/actuator device sample

This sample demonstrates the implementation of an (industrial) IoT
sensor/actuator device. The application uses the MQTT protocol to
securely send sensor data to a remote MQTT broker, while responding
to commands received over the MQTT connection.

Signed-off-by: Jason Murphy <jason.murphy@analog.com>
This commit is contained in:
Jason Murphy 2024-03-30 17:13:12 +00:00 committed by Anas Nashif
commit 55d6e4cb10
14 changed files with 1353 additions and 0 deletions

View file

@ -0,0 +1,11 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(secure_mqtt_sensor_actuator)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})
zephyr_include_directories(${APPLICATION_SOURCE_DIR}/src/tls_config)

View file

@ -0,0 +1,58 @@
# Copyright (c) 2024 Analog Devices, Inc.
# SPDX-License-Identifier: Apache-2.0
mainmenu "Secure MQTT Sensor Actuator Sample Application"
config NET_SAMPLE_MQTT_BROKER_HOSTNAME
string "Hostname of MQTT broker"
default "test.mosquitto.org"
help
MQTT broker's hostname or IP address.
config NET_SAMPLE_MQTT_BROKER_PORT
string "MQTT Broker Connection Port"
default "8883"
help
Port through which the application should connect to the MQTT broker.
Secure MQTT uses port 8883.
config NET_SAMPLE_MQTT_PUB_TOPIC
string "The MQTT topic the application should publish data to"
default "zephyr_sample/sensor"
config NET_SAMPLE_MQTT_SUB_TOPIC_CMD
string "The MQTT topic the application will receive commands on"
default "zephyr_sample/command"
config NET_SAMPLE_MQTT_PUBLISH_INTERVAL
int "Interval between MQTT publishes (in seconds)"
default 3
help
This config determines the frequency at which MQTT publishes occur.
choice NET_SAMPLE_MQTT_QOS
prompt "Quality of Service level used for MQTT publish and subscribe"
default NET_SAMPLE_MQTT_QOS_1_AT_LEAST_ONCE
config NET_SAMPLE_MQTT_QOS_0_AT_MOST_ONCE
bool "QoS 0 / At most once delivery"
help
No acknowledgment needed for published message.
config NET_SAMPLE_MQTT_QOS_1_AT_LEAST_ONCE
bool "QoS 1 / At least once delivery"
help
If acknowledgment expected for published message, duplicate messages permitted.
config NET_SAMPLE_MQTT_QOS_2_EXACTLY_ONCE
bool "QoS 2 / Exactly once delivery"
help
Acknowledgment expected and message shall be published only once.
endchoice
config NET_SAMPLE_MQTT_PAYLOAD_SIZE
int "Size of MQTT payload in bytes"
default 128
source "Kconfig.zephyr"

View file

@ -0,0 +1,219 @@
.. zephyr:code-sample:: secure-mqtt-sensor-actuator
:name: Secure MQTT Sensor/Actuator
:relevant-api: mqtt_socket sensor_interface
Implement an MQTT-based IoT sensor/actuator device
Overview
********
This sample demonstrates the implementation of an IoT sensor/actuator device.
The application uses the MQTT protocol to securely send sensor data
to a remote MQTT broker, while responding to commands received over MQTT.
Features:
- Establishing network connectivity using a DHCPv4 lease
- Establishing a secure MQTT connection (using TLS 1.2) to MQTT broker
- Publishing temperature sensor data in JSON format to the MQTT broker at a user-defined interval
- Subscribing to user-defined topic(s) on MQTT broker
- Responding to commands received over the network (LED control)
- Handling of MQTT connection, re-connecting and keep-alive
- Network status LED
Requirements
************
- Board with network capability (tested with adi_eval_adin1110ebz)
- `Eclipse Mosquitto`_ MQTT broker
- DHCP server
- Network connection between the board and Mosquitto broker
Build and Running
*****************
This application relies on an network connection between the board and an MQTT broker.
This broker can exist locally (e.g. on a host PC) or a publicly available MQTT broker
<https://test.mosquitto.org/> can be used.
For quick sampling/testing, a configuration is provided to connect to a local MQTT broker
without security, using a static IP address.
Hardware Setup
==============
If using Ethernet, connect the board to the MQTT broker. This may be your host PC
(for locally hosted Mosquitto broker) or your internet router
(to connect to the public Mosquitto broker).
If required, connect a temperature sensor to the board.
Software Setup
==============
The temperature sensor should be aliased in devicetree as ``ambient-temp0``.
If a board does not include an on-board temperature sensor, one can be connected externally
and a board overlay file used to add the sensor and alias:
.. code-block:: devicetree
/ {
aliases {
ambient-temp0 = &adt7420;
};
};
};
It is possible to use other types of sensors, by adding them in devicetree and by changing
``SENSOR_CHAN`` :file:`in device.c` to match the desired sensor type.
There are a few ways to configure the application:
.. list-table::
* - :file:`prj.conf`
- Default config: Secure MQTT, dynamic IP address (DHCP)
* - :file:`overlay-static.conf`
- Secure MQTT, static IP address
* - :file:`overlay-static-insecure.conf`
- Unsecure MQTT, static IP address
**Default Config:**
Using the default config, the application will use DHCP to acquire an IP address and attempt
to securely connect to an MQTT broker using TLS 1.2.
- The MQTT broker to which the board will connect is specified by
``CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME``.
By default, this is set to test.mosquitto.org.
- Connecting securely using TLS, requires the inclusion of the broker's CA certificate
in the application.
- Download the CA certificate in DER or PEM format from https://test.mosquitto.org
- In :file:`tls_config/cert.h`, set ``ca_certificate[]`` to the contents of the cert.
- By connecting the board to your internet router, it should automatically be assigned
an IPv4 address using DHCP.
- The application will then attempt to connect to the public Mosquitto broker
and begin publishing data.
- It is also possible to connect securely to a locally hosted MQTT broker.
This will require provisioning of certificates.
The CA cert should be included in the build as described above.
``CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME`` should be configured to match the
local broker hostname/IP address.
Depending on the CA cert being used, additional MbedTLS config options may need to be enabled.
This can be done using Kconfig or using a custom MbedTLS config file
(see modules/mbedtls/Kconfig).
See https://mosquitto.org/man/mosquitto-tls-7.html for more info on setting up
TLS support for Mosquitto locally.
- A DHCP server can be installed on the host PC to handle assigning an IP to the board
e.g. dnsmasq (Linux) or DHCP Server for Windows (Windows).
- Build the sample with default config:
.. zephyr-app-commands::
:zephyr-app: samples/net/secure_mqtt_sensor_actuator
:board: adi_eval_adin1110ebz
:goals: build
:compact:
**Static IP Config:**
Use the :file:`overlay-static.conf` Kconfig overlay to disable DHCP and use
a static IP address config.
The device, gateway, and DNS server IP addresses should be set according to
your local network configuration.
.. zephyr-app-commands::
:zephyr-app: samples/net/secure_mqtt_sensor_actuator
:board: adi_eval_adin1110ebz
:conf: "prj.conf overlay-static.conf"
:goals: build
:compact:
**Static IP/Unsecure MQTT Config:**
Use the :file:`overlay-static-insecure.conf` Kconfig overlay to disable TLS and DHCP.
This config requires connecting to a locally hosted Mosquitto MQTT broker.
- In :file:`overlay-static-insecure.conf`, set the IP address of the board and the Mosquitto
broker (i.e. IP address of Ethernet port on host PC). These addresses should be in the
same subnet e.g. 192.0.2.1 and 192.0.2.2.
- On your host PC, install Mosquitto.
- Create a file called ``unsecure.conf`` with the following content:
.. code-block:: console
listener 1883 0.0.0.0
allow_anonymous true
- Start a Mosquitto broker using the configuration file:
.. code-block:: console
$ sudo mosquitto -v -c unsecure.conf
- Build the sample with quick test config:
.. zephyr-app-commands::
:zephyr-app: samples/net/secure_mqtt_sensor_actuator
:board: adi_eval_adin1110ebz
:conf: "prj.conf overlay-static-insecure.conf"
:goals: build
:compact:
Using the Sample
================
- Once the board establishes an MQTT connection with the Mosquitto broker, the network
LED will turn on and the board will begin publishing sensor readings in JSON format
at a regular interval determined by ``CONFIG_NET_SAMPLE_MQTT_PUBLISH_INTERVAL``.
- Use Mosquitto to subscribe to the sensor data being sent from the board:
.. code-block:: console
$ mosquitto_sub -d -h <test.mosquitto.org/local broker IP> -t zephyr_sample/sensor
- The application will subscribe to a topic determined by ``CONFIG_NET_SAMPLE_MQTT_SUB_TOPIC_CMD``.
If a supported command string is received by the board on this topic, the board will execute
an associated command handler function.
Supported commands (defined in :file:`device.c`):
- ``led_on``, turn on board LED
- ``led_off``, turn off board LED
- Use Mosquitto to publish these commands to the MQTT broker:
.. code-block:: console
$ mosquitto_pub -d -h <test.mosquitto.org/local broker IP> --cafile <path/to/ca.crt> -t zephyr_sample/command -m "led_on"
- The Quality of Service (QoS) level that is used for MQTT publishing and
subscriptions can be configured using Kconfig.
Sample output
=============
.. code-block:: console
*** Booting Zephyr OS build v3.6.0-2212-g2c9c4f3733e9 ***
[00:00:00.181,000] <inf> app_device: Device adt7420@48 is ready
[00:00:00.181,000] <inf> app_device: Device leds is ready
[00:00:00.181,000] <inf> app_main: MAC Address: 00:E0:FE:FE:DA:C8
[00:00:00.181,000] <inf> app_main: Bringing up network..
[00:00:00.801,000] <inf> net_dhcpv4: Received: 192.168.1.17
[00:00:00.801,000] <inf> app_main: Network connectivity up!
[00:00:00.818,000] <inf> app_mqtt: Connecting to MQTT broker @ 91.121.93.94
[00:00:01.154,000] <inf> net_mqtt: Connect completed
[00:00:01.197,000] <inf> app_mqtt: Connected to MQTT broker!
[00:00:01.197,000] <inf> app_mqtt: Hostname: test.mosquitto.org
[00:00:01.198,000] <inf> app_mqtt: Client ID: adi_eval_adin1110ebz_9a
[00:00:01.198,000] <inf> app_mqtt: Port: 8883
[00:00:01.198,000] <inf> app_mqtt: TLS: Enabled
[00:00:01.198,000] <inf> app_mqtt: Subscribing to 1 topic(s)
[00:00:01.238,000] <inf> app_mqtt: SUBACK packet ID: 5841
[00:00:04.200,000] <inf> app_mqtt: Published to topic 'zephyr_sample/sensor', QoS 1
[00:00:04.319,000] <inf> app_mqtt: PUBACK packet ID: 1
[00:00:07.202,000] <inf> app_mqtt: Published to topic 'zephyr_sample/sensor', QoS 1
[00:00:07.323,000] <inf> app_mqtt: PUBACK packet ID: 2
[00:00:10.204,000] <inf> app_mqtt: Published to topic 'zephyr_sample/sensor', QoS 1
[00:00:10.322,000] <inf> app_mqtt: PUBACK packet ID: 3
[00:00:12.769,000] <inf> app_mqtt: MQTT payload received!
[00:00:12.769,000] <inf> app_mqtt: topic: 'zephyr_sample/command', payload: led_on
[00:00:12.770,000] <inf> app_device: Executing device command: led_on
.. _Eclipse Mosquitto: https://mosquitto.org/download/

View file

@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
&adin1110 {
mdio {
/* Enable link status LED */
ethernet-phy@1 {
led0-en;
led1-en;
};
};
};

View file

@ -0,0 +1,18 @@
# Config to disable TLS and DHCPv4, allowing insecure MQTT and a static IP address.
# This allows quick testing of the application without need for a DHCP server or secure MQTT broker set up.
# Only recommended for quick sampling/testing.
# Usage: west build -b <board> -- -DOVERLAY_CONFIG=overlay-static-insecure.conf
# Disable MQTT with TLS
CONFIG_MQTT_LIB_TLS=n
# Disable DHCP
CONFIG_NET_DHCPV4=n
# Insecure MQTT uses port 1883
CONFIG_NET_SAMPLE_MQTT_BROKER_PORT="1883"
# Static IP address config
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME="192.0.2.2"

View file

@ -0,0 +1,8 @@
CONFIG_NET_DHCPV4=n
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_NEED_IPV4=y
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2"
CONFIG_DNS_RESOLVER=y
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="192.0.2.2"

View file

@ -0,0 +1,59 @@
# Enable network stack
CONFIG_NETWORKING=y
CONFIG_NET_LOG=y
# Enable IPv4
CONFIG_NET_IPV4=y
# Enable IPv6
CONFIG_NET_IPV6=y
# Enable TCP
CONFIG_NET_TCP=y
# Enable DHCP
CONFIG_NET_DHCPV4=y
# Enable Sockets (used by MQTT lib)
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
# Enable MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=y
# Enable Mbed TLS
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_BUILTIN=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=60000
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=16384
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
# Enable JSON
CONFIG_JSON_LIBRARY=y
# Enable net conn manager
CONFIG_NET_CONNECTION_MANAGER=y
# Enable device hostname
CONFIG_NET_HOSTNAME_ENABLE=y
# Enable Posix API functionality
CONFIG_POSIX_API=y
# Enable sensor API
CONFIG_SENSOR=y
# Enable LED API
CONFIG_LED=y
# Main stack size
CONFIG_MAIN_STACK_SIZE=2048
# System work queue stack size
CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=2048
# Increase Rx net buffers
CONFIG_NET_BUF_RX_COUNT=100

View file

@ -0,0 +1,33 @@
sample:
name: Secure MQTT Sensor/Actuator Sample
tests:
sample.net.secure_mqtt_sensor_actuator:
harness: net
tags:
- net
- mqtt
- sensors
filter: dt_alias_exists("ambient-temp0")
integration_platforms:
- adi_eval_adin1110ebz
sample.net.secure_mqtt_sensor_actuator.staticip:
harness: net
extra_args: OVERLAY_CONFIG="overlay-static.conf"
tags:
- net
- mqtt
- sensors
filter: dt_alias_exists("ambient-temp0")
integration_platforms:
- native_sim
sample.net.secure_mqtt_sensor_actuator.staticip_insecure:
harness: net
extra_args: OVERLAY_CONFIG="overlay-static-insecure.conf"
tags:
- net
- mqtt
- sensors
filter: dt_alias_exists("ambient-temp0")
integration_platforms:
- adi_eval_adin1110ebz
- native_sim

View file

@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(app_device, LOG_LEVEL_DBG);
#include <zephyr/kernel.h>
#include <zephyr/drivers/sensor.h>
#include <zephyr/drivers/led.h>
#include <zephyr/random/random.h>
#include "device.h"
#define SENSOR_CHAN SENSOR_CHAN_AMBIENT_TEMP
#define SENSOR_UNIT "Celsius"
/* Devices */
static const struct device *sensor = DEVICE_DT_GET_OR_NULL(DT_ALIAS(ambient_temp0));
static const struct device *leds = DEVICE_DT_GET_OR_NULL(DT_INST(0, gpio_leds));
/* Command handlers */
static void led_on_handler(void)
{
device_write_led(LED_USER, LED_ON);
}
static void led_off_handler(void)
{
device_write_led(LED_USER, LED_OFF);
}
/* Supported device commands */
struct device_cmd device_commands[] = {
{"led_on", led_on_handler},
{"led_off", led_off_handler}
};
const size_t num_device_commands = ARRAY_SIZE(device_commands);
/* Command dispatcher */
void device_command_handler(uint8_t *command)
{
for (int i = 0; i < num_device_commands; i++) {
if (strcmp(command, device_commands[i].command) == 0) {
LOG_INF("Executing device command: %s", device_commands[i].command);
return device_commands[i].handler();
}
}
LOG_ERR("Unknown command: %s", command);
}
int device_read_sensor(struct sensor_sample *sample)
{
int rc;
struct sensor_value sensor_val;
/* Read sample only if a real sensor device is present
* otherwise return a dummy value
*/
if (sensor == NULL) {
sample->unit = SENSOR_UNIT;
sample->value = 20.0 + (double)sys_rand32_get() / UINT32_MAX * 5.0;
return 0;
}
rc = sensor_sample_fetch(sensor);
if (rc) {
LOG_ERR("Failed to fetch sensor sample [%d]", rc);
return rc;
}
rc = sensor_channel_get(sensor, SENSOR_CHAN, &sensor_val);
if (rc) {
LOG_ERR("Failed to get sensor channel [%d]", rc);
return rc;
}
sample->unit = SENSOR_UNIT;
sample->value = sensor_value_to_double(&sensor_val);
return rc;
}
int device_write_led(enum led_id led_idx, enum led_state state)
{
int rc;
switch (state) {
case LED_OFF:
if (leds == NULL) {
LOG_INF("LED %d OFF", led_idx);
break;
}
led_off(leds, led_idx);
break;
case LED_ON:
if (leds == NULL) {
LOG_INF("LED %d ON", led_idx);
break;
}
led_on(leds, led_idx);
break;
default:
LOG_ERR("Invalid LED state setting");
rc = -EINVAL;
break;
}
return rc;
}
bool devices_ready(void)
{
bool rc = true;
/* Check readiness only if a real sensor device is present */
if (sensor != NULL) {
if (!device_is_ready(sensor)) {
LOG_ERR("Device %s is not ready", sensor->name);
rc = false;
} else {
LOG_INF("Device %s is ready", sensor->name);
}
}
if (leds != NULL) {
if (!device_is_ready(leds)) {
LOG_ERR("Device %s is not ready", leds->name);
rc = false;
} else {
LOG_INF("Device %s is ready", leds->name);
}
}
return rc;
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __DEVICE_H__
#define __DEVICE_H__
/** @brief Sensor sample structure */
struct sensor_sample {
const char *unit;
int value;
};
/** @brief Available board LEDs */
enum led_id {
LED_NET, /* Network status LED*/
LED_USER /* User LED */
};
/** @brief LED states */
enum led_state {
LED_OFF,
LED_ON
};
/** @brief Device command consisting of a command string
* and associated handler function pointer
*/
struct device_cmd {
const char *command;
void (*handler)();
};
/**
* @brief Check if all devices are ready
*/
bool devices_ready(void);
/**
* @brief Read sample from the sensor
*/
int device_read_sensor(struct sensor_sample *sample);
/**
* @brief Write to a board LED
*/
int device_write_led(enum led_id led_idx, enum led_state state);
/**
* @brief Handler function for commands received over MQTT
*/
void device_command_handler(uint8_t *command);
#endif /* __DEVICE_H__ */

View file

@ -0,0 +1,140 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(app_main, LOG_LEVEL_DBG);
#include <zephyr/kernel.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/net/net_if.h>
#include <zephyr/net/net_mgmt.h>
#include <zephyr/net/conn_mgr_monitor.h>
#include "mqtt_client.h"
#include "device.h"
#define NET_L4_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
/* MQTT client struct */
static struct mqtt_client client_ctx;
/* MQTT publish work item */
struct k_work_delayable mqtt_publish_work;
static struct net_mgmt_event_callback net_l4_mgmt_cb;
/* Network connection semaphore */
K_SEM_DEFINE(net_conn_sem, 0, 1);
static void net_l4_evt_handler(struct net_mgmt_event_callback *cb,
uint32_t mgmt_event, struct net_if *iface)
{
switch (mgmt_event) {
case NET_EVENT_L4_CONNECTED:
k_sem_give(&net_conn_sem);
LOG_INF("Network connectivity up!");
break;
case NET_EVENT_L4_DISCONNECTED:
LOG_INF("Network connectivity down!");
break;
default:
break;
}
}
/** Print the device's MAC address to console */
void log_mac_addr(struct net_if *iface)
{
struct net_linkaddr *mac;
mac = net_if_get_link_addr(iface);
LOG_INF("MAC Address: %02X:%02X:%02X:%02X:%02X:%02X",
mac->addr[0], mac->addr[1], mac->addr[3],
mac->addr[3], mac->addr[4], mac->addr[5]);
}
/** The system work queue is used to handle periodic MQTT publishing.
* Work queuing begins when the MQTT connection is established.
* Use CONFIG_NET_SAMPLE_MQTT_PUBLISH_INTERVAL to set the publish frequency.
*/
static void publish_work_handler(struct k_work *work)
{
int rc;
if (mqtt_connected) {
rc = app_mqtt_publish(&client_ctx);
if (rc != 0) {
LOG_INF("MQTT Publish failed [%d]", rc);
}
k_work_reschedule(&mqtt_publish_work,
K_SECONDS(CONFIG_NET_SAMPLE_MQTT_PUBLISH_INTERVAL));
} else {
k_work_cancel_delayable(&mqtt_publish_work);
}
}
int main(void)
{
int rc;
struct net_if *iface;
devices_ready();
iface = net_if_get_default();
if (iface == NULL) {
LOG_ERR("No network interface configured");
return -ENETDOWN;
}
log_mac_addr(iface);
/* Register callbacks for L4 events */
net_mgmt_init_event_callback(&net_l4_mgmt_cb, &net_l4_evt_handler, NET_L4_EVENT_MASK);
net_mgmt_add_event_callback(&net_l4_mgmt_cb);
LOG_INF("Bringing up network..");
#if defined(CONFIG_NET_DHCPV4)
net_dhcpv4_start(iface);
#else
/* If using static IP, L4 Connect callback will happen,
* before conn mgr is initialised, so resend events here
* to check for connectivity
*/
conn_mgr_mon_resend_status();
#endif
/* Wait for network to come up */
while (k_sem_take(&net_conn_sem, K_MSEC(MSECS_NET_POLL_TIMEOUT)) != 0) {
LOG_INF("Waiting for network connection..");
}
rc = app_mqtt_init(&client_ctx);
if (rc != 0) {
LOG_ERR("MQTT Init failed [%d]", rc);
return rc;
}
/* Initialise MQTT publish work item */
k_work_init_delayable(&mqtt_publish_work, publish_work_handler);
/* Thread main loop */
while (1) {
/* Block until MQTT connection is up */
app_mqtt_connect(&client_ctx);
/* We are now connected, begin queueing periodic MQTT publishes */
k_work_reschedule(&mqtt_publish_work,
K_SECONDS(CONFIG_NET_SAMPLE_MQTT_PUBLISH_INTERVAL));
/* Handle MQTT inputs and connection */
app_mqtt_run(&client_ctx);
}
return rc;
}

View file

@ -0,0 +1,510 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(app_mqtt, LOG_LEVEL_DBG);
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/data/json.h>
#include <zephyr/random/random.h>
#include "mqtt_client.h"
#include "device.h"
/* Buffers for MQTT client */
static uint8_t rx_buffer[CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE];
static uint8_t tx_buffer[CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE];
/* MQTT payload buffer */
static uint8_t payload_buf[CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE];
/* MQTT broker details */
static struct sockaddr_storage broker;
/* Socket descriptor */
static struct zsock_pollfd fds[1];
static int nfds;
/* JSON payload format */
static const struct json_obj_descr sensor_sample_descr[] = {
JSON_OBJ_DESCR_PRIM(struct sensor_sample, unit, JSON_TOK_STRING),
JSON_OBJ_DESCR_PRIM(struct sensor_sample, value, JSON_TOK_NUMBER),
};
/* MQTT connectivity status flag */
bool mqtt_connected;
/* MQTT client ID buffer */
static uint8_t client_id[50];
#if defined(CONFIG_MQTT_LIB_TLS)
#include "tls_config/cert.h"
/* This should match the CN field in the server's CA cert */
#define TLS_SNI_HOSTNAME CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME
#define APP_CA_CERT_TAG 1
static const sec_tag_t m_sec_tags[] = {
APP_CA_CERT_TAG,
};
/** Register CA certificate for TLS */
static int tls_init(void)
{
int rc;
rc = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
ca_certificate, sizeof(ca_certificate));
if (rc < 0) {
LOG_ERR("Failed to register public certificate: %d", rc);
return rc;
}
return rc;
}
#endif
static void prepare_fds(struct mqtt_client *client)
{
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
fds[0].fd = client->transport.tcp.sock;
}
#if defined(CONFIG_MQTT_LIB_TLS)
else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
fds[0].fd = client->transport.tls.sock;
}
#endif
fds[0].events = ZSOCK_POLLIN;
nfds = 1;
}
static void clear_fds(void)
{
nfds = 0;
}
/** Initialise the MQTT client ID as the board name with random hex postfix */
static void init_mqtt_client_id(void)
{
snprintk(client_id, sizeof(client_id), CONFIG_BOARD"_%x", (uint8_t)sys_rand32_get());
}
static inline void on_mqtt_connect(void)
{
mqtt_connected = true;
device_write_led(LED_NET, LED_ON);
LOG_INF("Connected to MQTT broker!");
LOG_INF("Hostname: %s", CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME);
LOG_INF("Client ID: %s", client_id);
LOG_INF("Port: %s", CONFIG_NET_SAMPLE_MQTT_BROKER_PORT);
LOG_INF("TLS: %s",
IS_ENABLED(CONFIG_MQTT_LIB_TLS) ? "Enabled" : "Disabled");
}
static inline void on_mqtt_disconnect(void)
{
mqtt_connected = false;
clear_fds();
device_write_led(LED_NET, LED_OFF);
LOG_INF("Disconnected from MQTT broker");
}
/** Called when an MQTT payload is received.
* Reads the payload and calls the commands
* handler if a payloads is received on the
* command topic
*/
static void on_mqtt_publish(struct mqtt_client *const client, const struct mqtt_evt *evt)
{
int rc;
uint8_t payload[CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE];
rc = mqtt_read_publish_payload(client, payload,
CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE);
if (rc < 0) {
LOG_ERR("Failed to read received MQTT payload [%d]", rc);
return;
}
/* Place null terminator at end of payload buffer */
payload[rc] = '\0';
LOG_INF("MQTT payload received!");
LOG_INF("topic: '%s', payload: %s",
evt->param.publish.message.topic.topic.utf8, payload);
/* If the topic is a command, call the command handler */
if (strcmp(evt->param.publish.message.topic.topic.utf8,
CONFIG_NET_SAMPLE_MQTT_SUB_TOPIC_CMD) == 0) {
device_command_handler(payload);
}
}
/** Handler for asynchronous MQTT events */
static void mqtt_event_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
{
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT Event Connect failed [%d]", evt->result);
break;
}
on_mqtt_connect();
break;
case MQTT_EVT_DISCONNECT:
on_mqtt_disconnect();
break;
case MQTT_EVT_PINGRESP:
LOG_INF("PINGRESP packet");
break;
case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_ERR("MQTT PUBACK error [%d]", evt->result);
break;
}
LOG_INF("PUBACK packet ID: %u", evt->param.puback.message_id);
break;
case MQTT_EVT_PUBREC:
if (evt->result != 0) {
LOG_ERR("MQTT PUBREC error [%d]", evt->result);
break;
}
LOG_INF("PUBREC packet ID: %u", evt->param.pubrec.message_id);
const struct mqtt_pubrel_param rel_param = {
.message_id = evt->param.pubrec.message_id
};
mqtt_publish_qos2_release(client, &rel_param);
break;
case MQTT_EVT_PUBREL:
if (evt->result != 0) {
LOG_ERR("MQTT PUBREL error [%d]", evt->result);
break;
}
LOG_INF("PUBREL packet ID: %u", evt->param.pubrel.message_id);
const struct mqtt_pubcomp_param rec_param = {
.message_id = evt->param.pubrel.message_id
};
mqtt_publish_qos2_complete(client, &rec_param);
break;
case MQTT_EVT_PUBCOMP:
if (evt->result != 0) {
LOG_ERR("MQTT PUBCOMP error %d", evt->result);
break;
}
LOG_INF("PUBCOMP packet ID: %u", evt->param.pubcomp.message_id);
break;
case MQTT_EVT_SUBACK:
if (evt->result == 0x80) {
LOG_ERR("MQTT SUBACK error [%d]", evt->result);
break;
}
LOG_INF("SUBACK packet ID: %d", evt->param.suback.message_id);
break;
case MQTT_EVT_PUBLISH:
const struct mqtt_publish_param *p = &evt->param.publish;
if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
const struct mqtt_puback_param ack_param = {
.message_id = p->message_id
};
mqtt_publish_qos1_ack(client, &ack_param);
} else if (p->message.topic.qos == MQTT_QOS_2_EXACTLY_ONCE) {
const struct mqtt_pubrec_param rec_param = {
.message_id = p->message_id
};
mqtt_publish_qos2_receive(client, &rec_param);
}
on_mqtt_publish(client, evt);
default:
break;
}
}
/** Poll the MQTT socket for received data */
static int poll_mqtt_socket(struct mqtt_client *client, int timeout)
{
int rc;
prepare_fds(client);
if (nfds <= 0) {
return -EINVAL;
}
rc = zsock_poll(fds, nfds, timeout);
if (rc < 0) {
LOG_ERR("Socket poll error [%d]", rc);
}
return rc;
}
/** Retrieves a sensor sample and encodes it in JSON format */
static int get_mqtt_payload(struct mqtt_binstr *payload)
{
int rc;
struct sensor_sample sample;
rc = device_read_sensor(&sample);
if (rc != 0) {
LOG_ERR("Failed to get sensor sample [%d]", rc);
return rc;
}
rc = json_obj_encode_buf(sensor_sample_descr, ARRAY_SIZE(sensor_sample_descr),
&sample, payload_buf, CONFIG_NET_SAMPLE_MQTT_PAYLOAD_SIZE);
if (rc != 0) {
LOG_ERR("Failed to encode JSON object [%d]", rc);
return rc;
}
payload->data = payload_buf;
payload->len = strlen(payload->data);
return rc;
}
int app_mqtt_publish(struct mqtt_client *client)
{
int rc;
struct mqtt_publish_param param;
struct mqtt_binstr payload;
static uint16_t msg_id = 1;
struct mqtt_topic topic = {
.topic = {
.utf8 = CONFIG_NET_SAMPLE_MQTT_PUB_TOPIC,
.size = strlen(topic.topic.utf8)
},
.qos = IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_QOS_0_AT_MOST_ONCE) ? 0 :
(IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_QOS_1_AT_LEAST_ONCE) ? 1 : 2)
};
rc = get_mqtt_payload(&payload);
if (rc != 0) {
LOG_ERR("Failed to get MQTT payload [%d]", rc);
}
param.message.topic = topic;
param.message.payload = payload;
param.message_id = msg_id++;
param.dup_flag = 0;
param.retain_flag = 0;
rc = mqtt_publish(client, &param);
if (rc != 0) {
LOG_ERR("MQTT Publish failed [%d]", rc);
}
LOG_INF("Published to topic '%s', QoS %d",
param.message.topic.topic.utf8,
param.message.topic.qos);
return rc;
}
int app_mqtt_subscribe(struct mqtt_client *client)
{
int rc;
struct mqtt_topic sub_topics[] = {
{
.topic = {
.utf8 = CONFIG_NET_SAMPLE_MQTT_SUB_TOPIC_CMD,
.size = strlen(sub_topics->topic.utf8)
},
.qos = IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_QOS_0_AT_MOST_ONCE) ? 0 :
(IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_QOS_1_AT_LEAST_ONCE) ? 1 : 2)
}
};
const struct mqtt_subscription_list sub_list = {
.list = sub_topics,
.list_count = ARRAY_SIZE(sub_topics),
.message_id = 5841u
};
LOG_INF("Subscribing to %d topic(s)", sub_list.list_count);
rc = mqtt_subscribe(client, &sub_list);
if (rc != 0) {
LOG_ERR("MQTT Subscribe failed [%d]", rc);
}
return rc;
}
/** Process incoming MQTT data and keep the connection alive*/
int app_mqtt_process(struct mqtt_client *client)
{
int rc;
rc = poll_mqtt_socket(client, mqtt_keepalive_time_left(client));
if (rc != 0) {
if (fds[0].revents & ZSOCK_POLLIN) {
/* MQTT data received */
rc = mqtt_input(client);
if (rc != 0) {
LOG_ERR("MQTT Input failed [%d]", rc);
return rc;
}
/* Socket error */
if (fds[0].revents & (ZSOCK_POLLHUP | ZSOCK_POLLERR)) {
LOG_ERR("MQTT socket closed / error");
return -ENOTCONN;
}
}
} else {
/* Socket poll timed out, time to call mqtt_live() */
rc = mqtt_live(client);
if (rc != 0) {
LOG_ERR("MQTT Live failed [%d]", rc);
return rc;
}
}
return 0;
}
void app_mqtt_run(struct mqtt_client *client)
{
int rc;
/* Subscribe to MQTT topics */
app_mqtt_subscribe(client);
/* Thread will primarily remain in this loop */
while (mqtt_connected) {
rc = app_mqtt_process(client);
if (rc != 0) {
break;
}
}
/* Gracefully close connection */
mqtt_disconnect(client);
}
void app_mqtt_connect(struct mqtt_client *client)
{
int rc = 0;
mqtt_connected = false;
/* Block until MQTT CONNACK event callback occurs */
while (!mqtt_connected) {
rc = mqtt_connect(client);
if (rc != 0) {
LOG_ERR("MQTT Connect failed [%d]", rc);
k_msleep(MSECS_WAIT_RECONNECT);
continue;
}
/* Poll MQTT socket for response */
rc = poll_mqtt_socket(client, MSECS_NET_POLL_TIMEOUT);
if (rc > 0) {
mqtt_input(client);
}
if (!mqtt_connected) {
mqtt_abort(client);
}
}
}
int app_mqtt_init(struct mqtt_client *client)
{
int rc;
uint8_t broker_ip[NET_IPV4_ADDR_LEN];
struct sockaddr_in *broker4;
struct addrinfo *result;
const struct addrinfo hints = {
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM
};
/* Resolve IP address of MQTT broker */
rc = getaddrinfo(CONFIG_NET_SAMPLE_MQTT_BROKER_HOSTNAME,
CONFIG_NET_SAMPLE_MQTT_BROKER_PORT, &hints, &result);
if (rc != 0) {
LOG_ERR("Failed to resolve broker hostname [%s]", gai_strerror(rc));
return -EIO;
}
if (result == NULL) {
LOG_ERR("Broker address not found");
return -ENOENT;
}
broker4 = (struct sockaddr_in *)&broker;
broker4->sin_addr.s_addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr.s_addr;
broker4->sin_family = AF_INET;
broker4->sin_port = ((struct sockaddr_in *)result->ai_addr)->sin_port;
freeaddrinfo(result);
/* Log resolved IP address */
inet_ntop(AF_INET, &broker4->sin_addr.s_addr, broker_ip, sizeof(broker_ip));
LOG_INF("Connecting to MQTT broker @ %s", broker_ip);
/* MQTT client configuration */
init_mqtt_client_id();
mqtt_client_init(client);
client->broker = &broker;
client->evt_cb = mqtt_event_handler;
client->client_id.utf8 = client_id;
client->client_id.size = strlen(client->client_id.utf8);
client->password = NULL;
client->user_name = NULL;
client->protocol_version = MQTT_VERSION_3_1_1;
/* MQTT buffers configuration */
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);
/* MQTT transport configuration */
#if defined(CONFIG_MQTT_LIB_TLS)
struct mqtt_sec_config *tls_config;
client->transport.type = MQTT_TRANSPORT_SECURE;
rc = tls_init();
if (rc != 0) {
LOG_ERR("TLS init error");
return rc;
}
tls_config = &client->transport.tls.config;
tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
#if defined(CONFIG_MBEDTLS_SERVER_NAME_INDICATION)
tls_config->hostname = TLS_SNI_HOSTNAME;
#else
tls_config->hostname = NULL;
#endif /* CONFIG_MBEDTLS_SERVER_NAME_INDICATION */
#endif /* CONFIG_MQTT_LIB_TLS */
return rc;
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __MQTT_CLIENT_H__
#define __MQTT_CLIENT_H__
/** MQTT connection timeouts */
#define MSECS_NET_POLL_TIMEOUT 5000
#define MSECS_WAIT_RECONNECT 1000
/** MQTT connection status flag */
extern bool mqtt_connected;
/**
* @brief Initialise the MQTT client & broker configuration
*/
int app_mqtt_init(struct mqtt_client *client);
/**
* @brief Blocking function that establishes connectivity to the MQTT broker
*/
void app_mqtt_connect(struct mqtt_client *client);
/**
* @brief Subscribes to user-defined MQTT topics and continuously
* processes incoming data while the MQTT connection is active
*/
void app_mqtt_run(struct mqtt_client *client);
/**
* @brief Subscribe to user-defined MQTT topics
*/
int app_mqtt_subscribe(struct mqtt_client *client);
/**
* @brief Publish MQTT payload
*/
int app_mqtt_publish(struct mqtt_client *client);
#endif /* __MQTT_CLIENT_H__ */

View file

@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Analog Devices, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __CERT_H__
#define __CERT_H__
/* The CA certficate of the MQTT broker should be included here
* The certificate can either be in DER or PEM format.
* A DER certificate can be converted to a byte array using
* "cat ca.crt | sed -e '1d;$d' | base64 -d |xxd -i"
* If using a PEM certifificate, each line should be wrapped in "\r\n"
*/
/* CA certificate for Mosquitto public broker
* (available from test.mosquitto.org, valid at time of development)
*/
static const unsigned char ca_certificate[] = "-----BEGIN CERTIFICATE-----\r\n"
"MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL\r\n"
"BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG\r\n"
"A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU\r\n"
"BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv\r\n"
"by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE\r\n"
"BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES\r\n"
"MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp\r\n"
"dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ\r\n"
"KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg\r\n"
"UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW\r\n"
"Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA\r\n"
"s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH\r\n"
"3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo\r\n"
"E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT\r\n"
"MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV\r\n"
"6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL\r\n"
"BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC\r\n"
"6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf\r\n"
"+pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK\r\n"
"sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839\r\n"
"LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE\r\n"
"m/XriWr/Cq4h/JfB7NTsezVslgkBaoU=\r\n"
"-----END CERTIFICATE-----\r\n";
#endif /* __CERT_H__ */