diff --git a/samples/net/paho_mqtt_client/Kbuild b/samples/net/paho_mqtt_client/Kbuild new file mode 100644 index 00000000000..991ee5c4811 --- /dev/null +++ b/samples/net/paho_mqtt_client/Kbuild @@ -0,0 +1,10 @@ +subdir-ccflags-y += -I$(SOURCE_DIR)/paho/MQTTPacket/src + +obj-y += src/ +obj-y += paho/MQTTPacket/src/MQTTSubscribeClient.o +obj-y += paho/MQTTPacket/src/MQTTFormat.o +obj-y += paho/MQTTPacket/src/MQTTUnsubscribeClient.o +obj-y += paho/MQTTPacket/src/MQTTDeserializePublish.o +obj-y += paho/MQTTPacket/src/MQTTSerializePublish.o +obj-y += paho/MQTTPacket/src/MQTTConnectClient.o +obj-y += paho/MQTTPacket/src/MQTTPacket.o diff --git a/samples/net/paho_mqtt_client/Makefile b/samples/net/paho_mqtt_client/Makefile new file mode 100644 index 00000000000..f93cce2add4 --- /dev/null +++ b/samples/net/paho_mqtt_client/Makefile @@ -0,0 +1,23 @@ +# +# Copyright (c) 2016 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +KERNEL_TYPE = nano +BOARD = galileo +CONF_FILE = prj_ethernet.conf + +SOURCE_DIR = $(ZEPHYR_BASE)/samples/net/paho_mqtt_client + +include $(ZEPHYR_BASE)/Makefile.inc diff --git a/samples/net/paho_mqtt_client/README b/samples/net/paho_mqtt_client/README new file mode 100644 index 00000000000..ad969f7f27f --- /dev/null +++ b/samples/net/paho_mqtt_client/README @@ -0,0 +1,71 @@ +MQTT sample using the Paho's MQTT Packet. + +Requirements +------------ + +* An Ethernet LAN for testing purposes. + +* A MQTT 3.1.1 Gateway running in the LAN. + +* A Galileo Dev Board connected to the LAN. + +* A FTDI 6-pin cable to see debugging information from Galileo. + +The Mosquitto MQTT implementation is used in this document. +However, any MQTT 3.1.1 compliant gateway must work. See +http://mosquitto.org/ for more details. + + +Building instructions +--------------------- + +* Change src/config.h according to your network configuration. + +* Download the Paho's MQTT Embedded C Library. + See http://www.eclipse.org/paho/clients/c/embedded/ for more + information about Paho's MQTT Packet Library. + +* Modify the src/Makefile to reflect the location of your Paho + repository. In this example, it is assumed that src/paho_mqtt_pkt + is the directory that contains the MQTTPacket source code. + +Usage +----- + +* From a terminal, run the gateway: + + mosquitto -v + +* Connect the FTDI cable to the Galileo. Open a terminal and run: + + screen /dev/ttyUSB0 115200 + +* Turn on the Galileo. + +* Galileo's MQTT Client will publish to the topic "zephyr" the + following message: "Hello World from Zephyr!" + +* A subscriber is needed in order to receive the messages sent by the + Galileo. For example, the following command must be executed from a + terminal to create a MQTT subscriber: + + mosquitto_sub -t zephyr + + If this command is executed from a computer other than the gateway, + the host must be specified. + +* To publish a message, the following command can be used: + + mosquitto_pub -t zephyr -m "Hello World" + + The "Hello World" message must be displayed in the Galileo's debugging + console. + + +Final Remakrs +------------- + +The provided source code perhaps does not exemplify the best way to use +Paho's MQTT in Zephyr. For example, PINGREQ must be sent periodically, +but not too often as in this use-case. Furthermore, DISCONNECT is never +used here. diff --git a/samples/net/paho_mqtt_client/prj_ethernet.conf b/samples/net/paho_mqtt_client/prj_ethernet.conf new file mode 100644 index 00000000000..9a65653941c --- /dev/null +++ b/samples/net/paho_mqtt_client/prj_ethernet.conf @@ -0,0 +1,13 @@ +CONFIG_STDOUT_CONSOLE=y +CONFIG_NETWORKING=y +CONFIG_ETHERNET=y +CONFIG_ETH_DW=y + +CONFIG_NANO_TIMEOUTS=y +CONFIG_NETWORKING_WITH_TCP=y +CONFIG_NETWORKING_WITH_IPV4=y +CONFIG_NETWORKING_IPV6_NO_ND=y +CONFIG_IP_BUF_RX_SIZE=12 +CONFIG_IP_BUF_TX_SIZE=13 + +#CONFIG_NETWORKING_WITH_LOGGING=y diff --git a/samples/net/paho_mqtt_client/src/Makefile b/samples/net/paho_mqtt_client/src/Makefile new file mode 100644 index 00000000000..4afa1a8185a --- /dev/null +++ b/samples/net/paho_mqtt_client/src/Makefile @@ -0,0 +1,24 @@ +# +# Copyright (c) 2016 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +ccflags-y +=-I${srctree}/net/ip/contiki +ccflags-y +=-I${srctree}/net/ip/contiki/os/lib +ccflags-y +=-I${srctree}/net/ip/contiki/os +ccflags-y +=-I${srctree}/net/ip + +obj-y += tcp.o +obj-y += mqtt.o +obj-y += main.o diff --git a/samples/net/paho_mqtt_client/src/config.h b/samples/net/paho_mqtt_client/src/config.h new file mode 100644 index 00000000000..cb3ee74f8ab --- /dev/null +++ b/samples/net/paho_mqtt_client/src/config.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _CONFIG_H_ +#define _CONFIG_H_ + +#define NETMASK0 255 +#define NETMASK1 255 +#define NETMASK2 255 +#define NETMASK3 0 + +#define CLIENT_IPADDR0 192 +#define CLIENT_IPADDR1 168 +#define CLIENT_IPADDR2 1 +#define CLIENT_IPADDR3 100 + +#define SERVER_IPADDR0 192 +#define SERVER_IPADDR1 168 +#define SERVER_IPADDR2 1 +#define SERVER_IPADDR3 2 + +#define SERVER_MQTT_PORT 1883 + +#define CLIENT_MQTT_PORT 8484 + +#define TCP_RX_TIMEOUT 50 + +#define TCP_RETRY_TIMEOUT 50 + +#define APP_SLEEP_TICKS 10 + +#endif diff --git a/samples/net/paho_mqtt_client/src/main.c b/samples/net/paho_mqtt_client/src/main.c new file mode 100644 index 00000000000..ed34a6f8575 --- /dev/null +++ b/samples/net/paho_mqtt_client/src/main.c @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "config.h" +#include "tcp.h" +#include "mqtt.h" + +#define RC_MSG(rc) (rc) == 0 ? "success" : "failure" +#define STACKSIZE 1024 + +uint8_t stack[STACKSIZE]; + +struct net_context *ctx; + + +void fiber(void) +{ + char *client_name = "zephyr_client"; + char *topic = "zephyr"; + char *msg = "Hello World from Zephyr!"; + int rc; + + do { + rc = mqtt_connect(ctx, client_name); + printf("Connect: %s\n", RC_MSG(rc)); + + fiber_sleep(APP_SLEEP_TICKS); + } while (rc != 0); + + do { + + rc = mqtt_subscribe(ctx, topic); + printf("Subscribe: %s\n", RC_MSG(rc)); + + fiber_sleep(APP_SLEEP_TICKS); + } while (rc != 0); + + do { + rc = mqtt_pingreq(ctx); + printf("Pingreq: %s\n", RC_MSG(rc)); + + rc = mqtt_publish(ctx, topic, msg); + printf("Publish: %s\n", RC_MSG(rc)); + + rc = mqtt_publish_read(ctx); + printf("Publish read: %s\n", RC_MSG(rc)); + + fiber_sleep(APP_SLEEP_TICKS); + } while (1); +} + +void main(void) +{ + net_init(); + tcp_init(&ctx); + + task_fiber_start(stack, STACKSIZE, (nano_fiber_entry_t)fiber, + 0, 0, 7, 0); +} diff --git a/samples/net/paho_mqtt_client/src/mqtt.c b/samples/net/paho_mqtt_client/src/mqtt.c new file mode 100644 index 00000000000..36a4bcd36a3 --- /dev/null +++ b/samples/net/paho_mqtt_client/src/mqtt.c @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "mqtt.h" + +#include "MQTTPacket.h" +#include "MQTTConnect.h" +#include "MQTTPublish.h" +#include "MQTTSubscribe.h" +#include "MQTTUnsubscribe.h" + +/* non thread safe */ +#define BUF_SIZE 128 +static uint8_t mqtt_buffer[BUF_SIZE]; + + +#include "tcp.h" + +int mqtt_connect(struct net_context *ctx, char *client_name) +{ + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + unsigned char session_present; + unsigned char conn_ack; + size_t rx_len; + size_t tx_len; + int rc; + + memset(&data, 0x00, sizeof(MQTTPacket_connectData)); + + data.MQTTVersion = 4; + data.clientID.cstring = client_name; + data.keepAliveInterval = 500; + data.cleansession = 1; + data.username.cstring = "zephyr"; + data.password.cstring = "1234"; + data.willFlag = 1; + data.will.message.cstring = "zephyr_will_msg"; + data.will.retained = 0; + data.will.topicName.cstring = "zephyr_will_topic"; + data.will.qos = 1; + + rc = MQTTSerialize_connect(mqtt_buffer, BUF_SIZE, &data); + tx_len = rc; + rc = rc <= 0 ? -1 : 0; + if (rc != 0) { + return rc; + } + + tcp_tx(ctx, mqtt_buffer, tx_len); + + rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE); + if (rc != 0) { + return -EIO; + } + rc = MQTTDeserialize_connack(&session_present, &conn_ack, + mqtt_buffer, rx_len); + rc = rc != 1 ? -EINVAL : 0; + if (rc != 0) { + return -EINVAL; + } + + return conn_ack; +} + +int mqtt_disconnect(struct net_context *ctx) +{ + int tx_len; + int rc; + + rc = MQTTSerialize_disconnect(mqtt_buffer, BUF_SIZE); + tx_len = rc; + rc = rc <= 0 ? -EINVAL : 0; + if (rc != 0) { + return -EINVAL; + } + + tcp_tx(ctx, mqtt_buffer, tx_len); + return 0; +} + +int mqtt_publish(struct net_context *ctx, char *topic, char *msg) +{ + MQTTString topic_str = MQTTString_initializer; + unsigned char pkt_type; + unsigned char dup; + unsigned short pkt_id; + size_t tx_len; + size_t rx_len; + int rc = 0; + + topic_str.cstring = topic; + rc = MQTTSerialize_publish(mqtt_buffer, BUF_SIZE, 0, 0, 0, 0, + topic_str, (unsigned char *)msg, + strlen(msg)); + tx_len = rc; + rc = rc <= 0 ? -EINVAL : 0; + if (rc != 0) { + return rc; + } + + tcp_tx(ctx, mqtt_buffer, tx_len); + + rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE); + if (rc != 0) { + return -EIO; + } + + rc = MQTTDeserialize_ack(&pkt_type, &dup, &pkt_id, mqtt_buffer, + rx_len); + return rc == 1 ? 0 : -EINVAL; +} + +int mqtt_pingreq(struct net_context *ctx) +{ + unsigned char pkt_type; + unsigned char dup; + unsigned short pkt_id; + size_t tx_len; + size_t rx_len; + int rc; + + rc = MQTTSerialize_pingreq(mqtt_buffer, BUF_SIZE); + + tx_len = rc; + rc = rc <= 0 ? -EINVAL : 0; + if (rc != 0) { + return -EINVAL; + } + + tcp_tx(ctx, mqtt_buffer, tx_len); + + rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE); + if (rc != 0) { + return -EIO; + } + + rc = MQTTDeserialize_ack(&pkt_type, &dup, &pkt_id, mqtt_buffer, + rx_len); + return rc == 1 ? 0 : -EINVAL; +} + +int mqtt_publish_read(struct net_context *ctx) +{ + MQTTString received_topic; + unsigned char dup; + unsigned char retained; + unsigned short msg_id; + unsigned char *msg; + size_t rx_len; + int qos; + int msg_len; + int rc; + + rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE); + if (rc != 0) { + return -EIO; + } + + rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msg_id, + &received_topic, &msg, &msg_len, + mqtt_buffer, rx_len); + rc = rc == 1 ? 0 : -EIO; + if (rc == 0) { + printf("\n\tReceived message: %.*s\n\n", msg_len, msg); + } + return rc; +} + + +int mqtt_subscribe(struct net_context *ctx, char *topic) +{ + MQTTString topic_str = MQTTString_initializer; + unsigned short submsg_id; + size_t rx_len; + size_t tx_len; + int msg_id = 1; + int req_qos = 0; + int sub_count; + int granted_qos; + int rc = 0; + + topic_str.cstring = topic; + rc = MQTTSerialize_subscribe(mqtt_buffer, BUF_SIZE, 0, msg_id, 1, + &topic_str, &req_qos); + tx_len = rc; + rc = rc <= 0 ? -EINVAL : 0; + if (rc != 0) { + return -EINVAL; + } + + tcp_tx(ctx, mqtt_buffer, tx_len); + + rc = tcp_rx(ctx, mqtt_buffer, &rx_len, BUF_SIZE); + if (rc != 0) { + return -EIO; + } + + rc = MQTTDeserialize_suback(&submsg_id, 1, &sub_count, &granted_qos, + mqtt_buffer, rx_len); + + return rc != 1 ? -EINVAL : granted_qos; +} + diff --git a/samples/net/paho_mqtt_client/src/mqtt.h b/samples/net/paho_mqtt_client/src/mqtt.h new file mode 100644 index 00000000000..dc2a0d3a9ef --- /dev/null +++ b/samples/net/paho_mqtt_client/src/mqtt.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _MQTT_H_ +#define _MQTT_H_ + +#include + +int mqtt_connect(struct net_context *ctx, char *client_name); +int mqtt_disconnect(struct net_context *ctx); +int mqtt_publish(struct net_context *ctx, char *topic, char *msg); +int mqtt_publish_read(struct net_context *ctx); +int mqtt_subscribe(struct net_context *ctx, char *topic); +int mqtt_pingreq(struct net_context *ctx); + +#endif diff --git a/samples/net/paho_mqtt_client/src/tcp.c b/samples/net/paho_mqtt_client/src/tcp.c new file mode 100644 index 00000000000..82125f30955 --- /dev/null +++ b/samples/net/paho_mqtt_client/src/tcp.c @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "tcp.h" + +#include +#include +#include + +#include +#include + +uip_ipaddr_t uip_hostaddr = { { CLIENT_IPADDR0, CLIENT_IPADDR1, + CLIENT_IPADDR2, CLIENT_IPADDR3 } }; + +uip_ipaddr_t uip_netmask = { { NETMASK0, NETMASK1, NETMASK2, NETMASK3 } }; + + +#define CLIENT_IP_ADDR { { { CLIENT_IPADDR0, CLIENT_IPADDR1, \ + CLIENT_IPADDR2, CLIENT_IPADDR3 } } } + +#define SERVER_IP_ADDR { { { SERVER_IPADDR0, SERVER_IPADDR1, \ + SERVER_IPADDR2, SERVER_IPADDR3 } } } + +#define INET_FAMILY AF_INET + + +int tcp_tx(struct net_context *ctx, uint8_t *buf, size_t size) +{ + int rc = 0; + uint8_t *ptr; + struct net_buf *nbuf = NULL; + + nbuf = ip_buf_get_tx(ctx); + if (nbuf == NULL) { + printf("[%s:%d] Unable to get buffer\n", __func__, __LINE__); + return -1; + } + + ptr = net_buf_add(nbuf, size); + memcpy(ptr, buf, size); + ip_buf_appdatalen(nbuf) = size; + + do { + rc = net_send(nbuf); + if (rc >= 0) { + ip_buf_unref(nbuf); + return 0; + } + switch (rc) { + case -EINPROGRESS: + printf("%s: no connection yet, try again\n", __func__); + fiber_sleep(TCP_RETRY_TIMEOUT); + break; + case -EAGAIN: + case -ECONNRESET: + printf("%s: no connection, try again later\n", __func__); + fiber_sleep(TCP_RETRY_TIMEOUT); + break; + default: + printf("%s: sending %d bytes failed\n", + __func__, size); + ip_buf_unref(nbuf); + return -EIO; + } + } while (1); + + return 0; +} + +int tcp_rx(struct net_context *ctx, uint8_t *buf, size_t *read_bytes, size_t size) +{ + int rc; + struct net_buf *nbuf; + + nbuf = net_receive(ctx, TCP_RX_TIMEOUT); + rc = -EIO; + if (nbuf != NULL) { + *read_bytes = ip_buf_appdatalen(nbuf); + if (*read_bytes > size) { + rc = -ENOMEM; + } else { + memcpy(buf, ip_buf_appdata(nbuf), *read_bytes); + rc = 0; + } + ip_buf_unref(nbuf); + } + return rc; +} + +int tcp_init(struct net_context **ctx) +{ + static struct in_addr server_addr = SERVER_IP_ADDR; + static struct in_addr client_addr = CLIENT_IP_ADDR; + static struct net_addr server; + static struct net_addr client; + + server.in_addr = server_addr; + server.family = AF_INET; + + client.in_addr = client_addr; + client.family = AF_INET; + + *ctx = net_context_get(IPPROTO_TCP, + &server, SERVER_MQTT_PORT, + &client, CLIENT_MQTT_PORT); + if (*ctx == NULL) { + printf("%s: Unable to get network context\n", __func__); + return -EINVAL; + } + return 0; +} diff --git a/samples/net/paho_mqtt_client/src/tcp.h b/samples/net/paho_mqtt_client/src/tcp.h new file mode 100644 index 00000000000..04b4a4ade02 --- /dev/null +++ b/samples/net/paho_mqtt_client/src/tcp.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2016 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TCP_H_ +#define _TCP_H_ + +#include + +int tcp_init(struct net_context **ctx); +int tcp_tx(struct net_context *ctx, uint8_t *buf, size_t size); +int tcp_rx(struct net_context *ctx, uint8_t *buf, size_t *read_bytes, size_t size); + + +#endif