samples/net/mqtt: MQTT publisher and subscriber

This sample shows how to integrate Paho's MQTT Packet Library to create
a MQTT publisher and subscriber application with Zephyr.

Change-Id: I1cb4124386752eef6187a56ccc9f6fd7fe7175a3
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-05-24 13:43:04 -05:00 committed by Anas Nashif
commit f194272c41
11 changed files with 664 additions and 0 deletions

View file

@ -0,0 +1,10 @@
subdir-ccflags-y += -I$(SOURCE_DIR)/paho/MQTTPacket/src
obj-y += src/
obj-y += paho/MQTTPacket/src/MQTTSubscribeClient.o
obj-y += paho/MQTTPacket/src/MQTTFormat.o
obj-y += paho/MQTTPacket/src/MQTTUnsubscribeClient.o
obj-y += paho/MQTTPacket/src/MQTTDeserializePublish.o
obj-y += paho/MQTTPacket/src/MQTTSerializePublish.o
obj-y += paho/MQTTPacket/src/MQTTConnectClient.o
obj-y += paho/MQTTPacket/src/MQTTPacket.o

View file

@ -0,0 +1,23 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
KERNEL_TYPE = nano
BOARD = galileo
CONF_FILE = prj_ethernet.conf
SOURCE_DIR = $(ZEPHYR_BASE)/samples/net/paho_mqtt_client
include $(ZEPHYR_BASE)/Makefile.inc

View file

@ -0,0 +1,71 @@
MQTT sample using the Paho's MQTT Packet.
Requirements
------------
* An Ethernet LAN for testing purposes.
* A MQTT 3.1.1 Gateway running in the LAN.
* A Galileo Dev Board connected to the LAN.
* A FTDI 6-pin cable to see debugging information from Galileo.
The Mosquitto MQTT implementation is used in this document.
However, any MQTT 3.1.1 compliant gateway must work. See
http://mosquitto.org/ for more details.
Building instructions
---------------------
* Change src/config.h according to your network configuration.
* Download the Paho's MQTT Embedded C Library.
See http://www.eclipse.org/paho/clients/c/embedded/ for more
information about Paho's MQTT Packet Library.
* Modify the src/Makefile to reflect the location of your Paho
repository. In this example, it is assumed that src/paho_mqtt_pkt
is the directory that contains the MQTTPacket source code.
Usage
-----
* From a terminal, run the gateway:
mosquitto -v
* Connect the FTDI cable to the Galileo. Open a terminal and run:
screen /dev/ttyUSB0 115200
* Turn on the Galileo.
* Galileo's MQTT Client will publish to the topic "zephyr" the
following message: "Hello World from Zephyr!"
* A subscriber is needed in order to receive the messages sent by the
Galileo. For example, the following command must be executed from a
terminal to create a MQTT subscriber:
mosquitto_sub -t zephyr
If this command is executed from a computer other than the gateway,
the host must be specified.
* To publish a message, the following command can be used:
mosquitto_pub -t zephyr -m "Hello World"
The "Hello World" message must be displayed in the Galileo's debugging
console.
Final Remakrs
-------------
The provided source code perhaps does not exemplify the best way to use
Paho's MQTT in Zephyr. For example, PINGREQ must be sent periodically,
but not too often as in this use-case. Furthermore, DISCONNECT is never
used here.

View file

@ -0,0 +1,13 @@
CONFIG_STDOUT_CONSOLE=y
CONFIG_NETWORKING=y
CONFIG_ETHERNET=y
CONFIG_ETH_DW=y
CONFIG_NANO_TIMEOUTS=y
CONFIG_NETWORKING_WITH_TCP=y
CONFIG_NETWORKING_WITH_IPV4=y
CONFIG_NETWORKING_IPV6_NO_ND=y
CONFIG_IP_BUF_RX_SIZE=12
CONFIG_IP_BUF_TX_SIZE=13
#CONFIG_NETWORKING_WITH_LOGGING=y

View file

@ -0,0 +1,24 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
ccflags-y +=-I${srctree}/net/ip/contiki
ccflags-y +=-I${srctree}/net/ip/contiki/os/lib
ccflags-y +=-I${srctree}/net/ip/contiki/os
ccflags-y +=-I${srctree}/net/ip
obj-y += tcp.o
obj-y += mqtt.o
obj-y += main.o

View file

@ -0,0 +1,45 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _CONFIG_H_
#define _CONFIG_H_
#define NETMASK0 255
#define NETMASK1 255
#define NETMASK2 255
#define NETMASK3 0
#define CLIENT_IPADDR0 192
#define CLIENT_IPADDR1 168
#define CLIENT_IPADDR2 1
#define CLIENT_IPADDR3 100
#define SERVER_IPADDR0 192
#define SERVER_IPADDR1 168
#define SERVER_IPADDR2 1
#define SERVER_IPADDR3 2
#define SERVER_MQTT_PORT 1883
#define CLIENT_MQTT_PORT 8484
#define TCP_RX_TIMEOUT 50
#define TCP_RETRY_TIMEOUT 50
#define APP_SLEEP_TICKS 10
#endif

View file

@ -0,0 +1,77 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <zephyr.h>
#include <stdio.h>
#include <sections.h>
#include <errno.h>
#include "config.h"
#include "tcp.h"
#include "mqtt.h"
#define RC_MSG(rc) (rc) == 0 ? "success" : "failure"
#define STACKSIZE 1024
uint8_t stack[STACKSIZE];
struct net_context *ctx;
void fiber(void)
{
char *client_name = "zephyr_client";
char *topic = "zephyr";
char *msg = "Hello World from Zephyr!";
int rc;
do {
rc = mqtt_connect(ctx, client_name);
printf("Connect: %s\n", RC_MSG(rc));
fiber_sleep(APP_SLEEP_TICKS);
} while (rc != 0);
do {
rc = mqtt_subscribe(ctx, topic);
printf("Subscribe: %s\n", RC_MSG(rc));
fiber_sleep(APP_SLEEP_TICKS);
} while (rc != 0);
do {
rc = mqtt_pingreq(ctx);
printf("Pingreq: %s\n", RC_MSG(rc));
rc = mqtt_publish(ctx, topic, msg);
printf("Publish: %s\n", RC_MSG(rc));
rc = mqtt_publish_read(ctx);
printf("Publish read: %s\n", RC_MSG(rc));
fiber_sleep(APP_SLEEP_TICKS);
} while (1);
}
void main(void)
{
net_init();
tcp_init(&ctx);
task_fiber_start(stack, STACKSIZE, (nano_fiber_entry_t)fiber,
0, 0, 7, 0);
}

View file

@ -0,0 +1,219 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string.h>
#include <errno.h>
#include "mqtt.h"
#include "MQTTPacket.h"
#include "MQTTConnect.h"
#include "MQTTPublish.h"
#include "MQTTSubscribe.h"
#include "MQTTUnsubscribe.h"
/* non thread safe */
#define BUF_SIZE 128
static uint8_t mqtt_buffer[BUF_SIZE];
#include "tcp.h"
int mqtt_connect(struct net_context *ctx, char *client_name)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
unsigned char session_present;
unsigned char conn_ack;
size_t rx_len;
size_t tx_len;
int rc;
memset(&data, 0x00, sizeof(MQTTPacket_connectData));
data.MQTTVersion = 4;
data.clientID.cstring = client_name;
data.keepAliveInterval = 500;
data.cleansession = 1;
data.username.cstring = "zephyr";
data.password.cstring = "1234";
data.willFlag = 1;
data.will.message.cstring = "zephyr_will_msg";
data.will.retained = 0;
data.will.topicName.cstring = "zephyr_will_topic";
data.will.qos = 1;
rc = MQTTSerialize_connect(mqtt_buffer, BUF_SIZE, &data);
tx_len = rc;
rc = rc <= 0 ? -1 : 0;
if (rc != 0) {
return rc;
}
tcp_tx(ctx, mqtt_buffer, tx_len);
rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE);
if (rc != 0) {
return -EIO;
}
rc = MQTTDeserialize_connack(&session_present, &conn_ack,
mqtt_buffer, rx_len);
rc = rc != 1 ? -EINVAL : 0;
if (rc != 0) {
return -EINVAL;
}
return conn_ack;
}
int mqtt_disconnect(struct net_context *ctx)
{
int tx_len;
int rc;
rc = MQTTSerialize_disconnect(mqtt_buffer, BUF_SIZE);
tx_len = rc;
rc = rc <= 0 ? -EINVAL : 0;
if (rc != 0) {
return -EINVAL;
}
tcp_tx(ctx, mqtt_buffer, tx_len);
return 0;
}
int mqtt_publish(struct net_context *ctx, char *topic, char *msg)
{
MQTTString topic_str = MQTTString_initializer;
unsigned char pkt_type;
unsigned char dup;
unsigned short pkt_id;
size_t tx_len;
size_t rx_len;
int rc = 0;
topic_str.cstring = topic;
rc = MQTTSerialize_publish(mqtt_buffer, BUF_SIZE, 0, 0, 0, 0,
topic_str, (unsigned char *)msg,
strlen(msg));
tx_len = rc;
rc = rc <= 0 ? -EINVAL : 0;
if (rc != 0) {
return rc;
}
tcp_tx(ctx, mqtt_buffer, tx_len);
rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE);
if (rc != 0) {
return -EIO;
}
rc = MQTTDeserialize_ack(&pkt_type, &dup, &pkt_id, mqtt_buffer,
rx_len);
return rc == 1 ? 0 : -EINVAL;
}
int mqtt_pingreq(struct net_context *ctx)
{
unsigned char pkt_type;
unsigned char dup;
unsigned short pkt_id;
size_t tx_len;
size_t rx_len;
int rc;
rc = MQTTSerialize_pingreq(mqtt_buffer, BUF_SIZE);
tx_len = rc;
rc = rc <= 0 ? -EINVAL : 0;
if (rc != 0) {
return -EINVAL;
}
tcp_tx(ctx, mqtt_buffer, tx_len);
rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE);
if (rc != 0) {
return -EIO;
}
rc = MQTTDeserialize_ack(&pkt_type, &dup, &pkt_id, mqtt_buffer,
rx_len);
return rc == 1 ? 0 : -EINVAL;
}
int mqtt_publish_read(struct net_context *ctx)
{
MQTTString received_topic;
unsigned char dup;
unsigned char retained;
unsigned short msg_id;
unsigned char *msg;
size_t rx_len;
int qos;
int msg_len;
int rc;
rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE);
if (rc != 0) {
return -EIO;
}
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msg_id,
&received_topic, &msg, &msg_len,
mqtt_buffer, rx_len);
rc = rc == 1 ? 0 : -EIO;
if (rc == 0) {
printf("\n\tReceived message: %.*s\n\n", msg_len, msg);
}
return rc;
}
int mqtt_subscribe(struct net_context *ctx, char *topic)
{
MQTTString topic_str = MQTTString_initializer;
unsigned short submsg_id;
size_t rx_len;
size_t tx_len;
int msg_id = 1;
int req_qos = 0;
int sub_count;
int granted_qos;
int rc = 0;
topic_str.cstring = topic;
rc = MQTTSerialize_subscribe(mqtt_buffer, BUF_SIZE, 0, msg_id, 1,
&topic_str, &req_qos);
tx_len = rc;
rc = rc <= 0 ? -EINVAL : 0;
if (rc != 0) {
return -EINVAL;
}
tcp_tx(ctx, mqtt_buffer, tx_len);
rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE);
if (rc != 0) {
return -EIO;
}
rc = MQTTDeserialize_suback(&submsg_id, 1, &sub_count, &granted_qos,
mqtt_buffer, rx_len);
return rc != 1 ? -EINVAL : granted_qos;
}

View file

@ -0,0 +1,29 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _MQTT_H_
#define _MQTT_H_
#include <net/net_core.h>
int mqtt_connect(struct net_context *ctx, char *client_name);
int mqtt_disconnect(struct net_context *ctx);
int mqtt_publish(struct net_context *ctx, char *topic, char *msg);
int mqtt_publish_read(struct net_context *ctx);
int mqtt_subscribe(struct net_context *ctx, char *topic);
int mqtt_pingreq(struct net_context *ctx);
#endif

View file

@ -0,0 +1,126 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include "tcp.h"
#include <net/ip_buf.h>
#include <net/net_core.h>
#include <net/net_socket.h>
#include <errno.h>
#include <stdio.h>
uip_ipaddr_t uip_hostaddr = { { CLIENT_IPADDR0, CLIENT_IPADDR1,
CLIENT_IPADDR2, CLIENT_IPADDR3 } };
uip_ipaddr_t uip_netmask = { { NETMASK0, NETMASK1, NETMASK2, NETMASK3 } };
#define CLIENT_IP_ADDR { { { CLIENT_IPADDR0, CLIENT_IPADDR1, \
CLIENT_IPADDR2, CLIENT_IPADDR3 } } }
#define SERVER_IP_ADDR { { { SERVER_IPADDR0, SERVER_IPADDR1, \
SERVER_IPADDR2, SERVER_IPADDR3 } } }
#define INET_FAMILY AF_INET
int tcp_tx(struct net_context *ctx, uint8_t *buf, size_t size)
{
int rc = 0;
uint8_t *ptr;
struct net_buf *nbuf = NULL;
nbuf = ip_buf_get_tx(ctx);
if (nbuf == NULL) {
printf("[%s:%d] Unable to get buffer\n", __func__, __LINE__);
return -1;
}
ptr = net_buf_add(nbuf, size);
memcpy(ptr, buf, size);
ip_buf_appdatalen(nbuf) = size;
do {
rc = net_send(nbuf);
if (rc >= 0) {
ip_buf_unref(nbuf);
return 0;
}
switch (rc) {
case -EINPROGRESS:
printf("%s: no connection yet, try again\n", __func__);
fiber_sleep(TCP_RETRY_TIMEOUT);
break;
case -EAGAIN:
case -ECONNRESET:
printf("%s: no connection, try again later\n", __func__);
fiber_sleep(TCP_RETRY_TIMEOUT);
break;
default:
printf("%s: sending %d bytes failed\n",
__func__, size);
ip_buf_unref(nbuf);
return -EIO;
}
} while (1);
return 0;
}
int tcp_rx(struct net_context *ctx, uint8_t *buf, size_t *read_bytes, size_t size)
{
int rc;
struct net_buf *nbuf;
nbuf = net_receive(ctx, TCP_RX_TIMEOUT);
rc = -EIO;
if (nbuf != NULL) {
*read_bytes = ip_buf_appdatalen(nbuf);
if (*read_bytes > size) {
rc = -ENOMEM;
} else {
memcpy(buf, ip_buf_appdata(nbuf), *read_bytes);
rc = 0;
}
ip_buf_unref(nbuf);
}
return rc;
}
int tcp_init(struct net_context **ctx)
{
static struct in_addr server_addr = SERVER_IP_ADDR;
static struct in_addr client_addr = CLIENT_IP_ADDR;
static struct net_addr server;
static struct net_addr client;
server.in_addr = server_addr;
server.family = AF_INET;
client.in_addr = client_addr;
client.family = AF_INET;
*ctx = net_context_get(IPPROTO_TCP,
&server, SERVER_MQTT_PORT,
&client, CLIENT_MQTT_PORT);
if (*ctx == NULL) {
printf("%s: Unable to get network context\n", __func__);
return -EINVAL;
}
return 0;
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _TCP_H_
#define _TCP_H_
#include <net/net_core.h>
int tcp_init(struct net_context **ctx);
int tcp_tx(struct net_context *ctx, uint8_t *buf, size_t size);
int tcp_rx(struct net_context *ctx, uint8_t *buf, size_t *read_bytes, size_t size);
#endif