samples/net: Add a NATS subscriber application

This sample code demonstrates how to write a NATS subscriber
application. Code is self-documented and a README file is also
included.

This sample code uses the netz API.

The NATS protocol specification is available at:
http://nats.io/documentation/internals/nats-protocol/

Origin: Original

Jira: ZEP-415
Jira: ZEP-573
Jira: ZEP-597

Change-Id: I4b7e56bb6c2a934012b33039ea5b313b14f3b4c5
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-07-27 19:15:12 -05:00 committed by Inaky Perez-Gonzalez
commit 4172b3afab
6 changed files with 324 additions and 0 deletions

View file

@ -0,0 +1 @@
obj-y += src/

View file

@ -0,0 +1,23 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
KERNEL_TYPE = nano
BOARD = galileo
CONF_FILE = prj_galileo.conf
SOURCE_DIR = $(ZEPHYR_BASE)/samples/net/nats_clients/subscriber
include $(ZEPHYR_BASE)/Makefile.inc

View file

@ -0,0 +1,94 @@
NATS subscriber
This sample code demonstrates how to write a NATS subscriber sample
application.
See: http://nats.io/documentation/internals/nats-protocol/ for more
information about the NATS protocol.
Requirements
------------
* Ethernet LAN for testing purposes.
* Galileo Gen 2 Development Board.
* USB-UART cable: CH340, FTDI 6-pin or equivalent for debugging.
* NATS server, subscriber and publisher applications. See:
http://nats.io/documentation/tutorials/gnatsd-install/
http://nats.io/documentation/clients/nats-clients/
https://github.com/nats-io/nats
Building instructions
---------------------
* Read src/main.c, change the IP addresses according to the LAN
environment.
* On a terminal window, type:
make pristine && make
* To load the binary into the Galileo Dev Board, follow the steps
indicated here:
https://www.zephyrproject.org/doc/board/galileo.html
Usage
-----
* Follow nats.io documentation to setup the server and client
applications.
* Start the NATS server: open a terminal window and type:
gnatsd -D
The terminal window must display something like this:
[2045] 2016/07/27 17:40:20.928840 [INF] Starting nats-server version 0.8.2
[2045] 2016/07/27 17:40:20.928913 [DBG] Go build version go1.5.4
[2045] 2016/07/27 17:40:20.928922 [INF] Listening for client connections on 0.0.0.0:4222
[2045] 2016/07/27 17:40:20.929035 [DBG] Server id is X6QdoKdcVHDeWQcVIpJcb5
[2045] 2016/07/27 17:40:20.929048 [INF] Server is ready
* Connect the USB-UART cable to the Galileo. Open a terminal and run:
screen /dev/ttyUSB0 115200
* Connect Galileo to the LAN, Turn on the board.
* On the screen terminal window, the following text will appear:
WARNING: no console will be available to OS
error: no suitable video mode found.
--------------------------------
--------------------------------
* Run the NATS publisher client. Open a terminal window and type:
nats-pub sensors DOOR89:CLOSED
The terminal window will display:
Published [sensors] : 'DOOR89:CLOSED'
* On the screen terminal window, the following message will appear:
--------------------------------
Subject: [7] sensors
Sid: [2] z1
Reply-to: not found
Payload: [13] DOOR89:CLOSED
--------------------------------
Final remarks
-------------
Full NATS support is not yet achieved. However, current publisher
and subscriber routines are enough to write basic NATS applications.

View file

@ -0,0 +1,22 @@
CONFIG_MINIMAL_LIBC_EXTENDED=y
CONFIG_STDOUT_CONSOLE=y
CONFIG_NETWORKING=y
CONFIG_ETHERNET=y
CONFIG_ETH_DW=y
CONFIG_NANO_TIMEOUTS=y
CONFIG_NETWORKING_WITH_TCP=y
CONFIG_NETWORKING_WITH_IPV4=y
CONFIG_NETWORKING_IPV6_NO_ND=y
CONFIG_IP_BUF_RX_SIZE=4
CONFIG_IP_BUF_TX_SIZE=4
CONFIG_NETZ=y
# Uncomment the following variables for debugging
#CONFIG_NETWORKING_WITH_LOGGING=y
#CONFIG_NETWORK_IP_STACK_DEBUG_NET_BUF=y
#CONFIG_NET_BUF_DEBUG=y

View file

@ -0,0 +1,21 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
ccflags-y += -I$(SOURCE_DIR)/../publisher/src
obj-y += main.o
obj-y += ../../publisher/src/nats_pack.o
obj-y += ../../publisher/src/nats_client.o

View file

@ -0,0 +1,163 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <zephyr.h>
#include <stdio.h>
#include <errno.h>
/* Network for Zephyr API - netz */
#include <netz.h>
#include "nats_client.h"
#define STACK_SIZE 1024
uint8_t stack[STACK_SIZE];
/* Change this value to modify the size of the tx and rx buffers */
#define BUF_SIZE 256
uint8_t tx_raw_buf[BUF_SIZE];
uint8_t rx_raw_buf[BUF_SIZE];
#define SLEEP_TIME 30
#define RC_STR(rc) (rc == 0 ? "OK" : "ERROR")
int handle_msg(struct nats_clapp_ctx_t *ctx);
/* Unpacks and prints the NATS MSG message */
int unpack_msg(struct app_buf_t *buf);
void fiber(void)
{
/* tx_buf and rx_buf are application-level buffers */
struct app_buf_t tx_buf = APP_BUF_INIT(tx_raw_buf,
sizeof(tx_raw_buf), 0);
struct app_buf_t rx_buf = APP_BUF_INIT(rx_raw_buf,
sizeof(rx_raw_buf), 0);
/* netz context is initialized with default values. See netz.h */
struct netz_ctx_t netz_ctx = NETZ_CTX_INIT;
struct nats_cl_ctx_t nats_client = NATS_CL_INIT;
struct nats_clapp_ctx_t nats = NATS_CLAPP_INIT(&nats_client, &netz_ctx,
&tx_buf, &rx_buf);
int rc;
/* First we configure network related stuff */
netz_host_ipv4(&netz_ctx, 192, 168, 1, 110);
netz_netmask_ipv4(&netz_ctx, 255, 255, 255, 0);
/* NATS server address and port */
netz_remote_ipv4(&netz_ctx, 192, 168, 1, 10, 4222);
rc = nats_connect(&nats, "zephyr", 1);
if (rc != 0) {
printf("[%s:%d] Unable to connect to NATS server: %d\n",
__func__, __LINE__, rc);
return;
}
rc = nats_sub(&nats, "sensors", NULL, "z1");
if (rc != 0) {
printf("[%s:%d] Unable to subscribe: %d\n",
__func__, __LINE__, rc);
return;
}
do {
printf("--------------------------------\n");
handle_msg(&nats);
fiber_sleep(SLEEP_TIME);
} while (1);
}
void main(void)
{
net_init();
task_fiber_start(stack, STACK_SIZE, (nano_fiber_entry_t)fiber,
0, 0, 7, 0);
}
int handle_msg(struct nats_clapp_ctx_t *ctx)
{
int rc;
rc = netz_rx(ctx->netz_ctx, ctx->rx_buf);
if (rc != 0) {
return -EIO;
}
/* ping pong */
rc = nats_unpack_ping(ctx->rx_buf);
if (rc == 0) {
rc = nats_pack_pong(ctx->tx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return -EIO;
}
printf("Ping-pong message processed\n");
return 0;
}
/* msg */
rc = unpack_msg(ctx->rx_buf);
return rc;
}
int unpack_msg(struct app_buf_t *buf)
{
char *str;
int subject_start;
int subject_len;
int sid_start;
int sid_len;
int reply_start;
int reply_len;
int payload_start;
int payload_len;
int rc;
str = (char *)buf->buf;
rc = nats_unpack_msg(buf, &subject_start, &subject_len,
&sid_start, &sid_len, &reply_start, &reply_len,
&payload_start, &payload_len);
if (rc != 0) {
return rc;
}
printf("Subject: [%d] %.*s\n", subject_len, subject_len,
str + subject_start);
printf("Sid: [%d] %.*s\n", sid_len, sid_len, str + sid_start);
if (reply_start > 0) {
printf("Reply-to: [%d] %.*s\n", reply_len,
reply_len, str + reply_start);
} else {
printf("Reply-to: not found\n");
}
printf("Payload: [%d] %.*s\n", payload_len, payload_len,
str + payload_start);
return 0;
}