samples/net: Add a NATS publisher application

This sample code demonstrates how to write a NATS publisher
application. Code is self-documented and a README file is also
included.

This sample code uses the netz API.

The NATS protocol specification is available at:
http://nats.io/documentation/internals/nats-protocol/

Origin: Original

Jira: ZEP-415
Jira: ZEP-573
Jira: ZEP-596

Change-Id: Ieb6e6e5f9bd48f34246b6d8c1bd9af3bbbe016bb
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-07-27 19:03:24 -05:00 committed by Inaky Perez-Gonzalez
commit 78aa6ea050
9 changed files with 1251 additions and 0 deletions

View file

@ -0,0 +1,21 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
KERNEL_TYPE = nano
BOARD = galileo
CONF_FILE = prj_galileo.conf
include $(ZEPHYR_BASE)/Makefile.inc

View file

@ -0,0 +1,108 @@
NATS publisher
This sample code demonstrates how to write a NATS publisher sample
application.
See: http://nats.io/documentation/internals/nats-protocol/ for more
information about the NATS protocol.
Requirements
------------
* Ethernet LAN for testing purposes.
* Galileo Gen 2 Development Board.
* USB-UART cable: CH340, FTDI 6-pin or equivalent for debugging.
* NATS server, subscriber and publisher applications. See:
http://nats.io/documentation/tutorials/gnatsd-install/
http://nats.io/documentation/clients/nats-clients/
https://github.com/nats-io/nats
Building instructions
---------------------
* Read src/main.c, change the IP addresses according to the LAN
environment.
* On a terminal window, type:
make pristine && make
* To load the binary into the Galileo Dev Board, follow the steps
indicated here:
https://www.zephyrproject.org/doc/board/galileo.html
Usage
-----
* Follow nats.io documentation to setup the server and client
applications.
* Start the NATS server: open a terminal window and type:
gnatsd -D
The terminal window must display something like this:
[2045] 2016/07/27 17:40:20.928840 [INF] Starting nats-server version 0.8.2
[2045] 2016/07/27 17:40:20.928913 [DBG] Go build version go1.5.4
[2045] 2016/07/27 17:40:20.928922 [INF] Listening for client connections on 0.0.0.0:4222
[2045] 2016/07/27 17:40:20.929035 [DBG] Server id is X6QdoKdcVHDeWQcVIpJcb5
[2045] 2016/07/27 17:40:20.929048 [INF] Server is ready
* Start the NATS subscriber client. Open another terminal window and type:
nats-sub -t sensors
This command will start the NATS subscriber client. The terminal
window will show the following text:
Listening on [sensors]
* Connect the USB-UART cable to the Galileo. Open a terminal and run:
screen /dev/ttyUSB0 115200
* Connect Galileo to the LAN, Turn on the board.
* On the screen terminal window, the following text will appear:
WARNING: no console will be available to OS
error: no suitable video mode found.
--------------------------------
NATS PUB: OK
--------------------------------
NATS PUB: OK
--------------------------------
NATS PUB: OK
--------------------------------
* The NATS subscriber terminal window will display:
2016/07/27 17:44:22 [#1] Received on [sensors]: 'DOOR3:OPEN'
2016/07/27 17:44:23 [#2] Received on [sensors]: 'DOOR3:OPEN'
2016/07/27 17:44:24 [#3] Received on [sensors]: 'DOOR3:OPEN'
* If the NATS server sends a PING message:
[30741] 2016/07/27 18:58:28.289650 [DBG] 192.168.1.110:1025 - cid:1 - Client Ping Timer
the application will print:
--------------------------------
Ping-pong message processed
NATS PUB: OK
--------------------------------
Final remarks
-------------
Full NATS support is not yet achieved. However, current publisher
and subscriber routines are enough to write basic NATS applications.

View file

@ -0,0 +1,22 @@
CONFIG_MINIMAL_LIBC_EXTENDED=y
CONFIG_STDOUT_CONSOLE=y
CONFIG_NETWORKING=y
CONFIG_ETHERNET=y
CONFIG_ETH_DW=y
CONFIG_NANO_TIMEOUTS=y
CONFIG_NETWORKING_WITH_TCP=y
CONFIG_NETWORKING_WITH_IPV4=y
CONFIG_NETWORKING_IPV6_NO_ND=y
CONFIG_IP_BUF_RX_SIZE=4
CONFIG_IP_BUF_TX_SIZE=4
CONFIG_NETZ=y
# Uncomment the following variables for debugging
#CONFIG_NETWORKING_WITH_LOGGING=y
#CONFIG_NETWORK_IP_STACK_DEBUG_NET_BUF=y
#CONFIG_NET_BUF_DEBUG=y

View file

@ -0,0 +1,19 @@
#
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
obj-y += main.o
obj-y += nats_pack.o
obj-y += nats_client.o

View file

@ -0,0 +1,124 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <zephyr.h>
#include <stdio.h>
#include <errno.h>
/* Network for Zephyr API - netz */
#include <netz.h>
#include "nats_client.h"
#define STACK_SIZE 1024
uint8_t stack[STACK_SIZE];
/* Change this value to modify the size of the tx and rx buffers */
#define BUF_SIZE 256
uint8_t tx_raw_buf[BUF_SIZE];
uint8_t rx_raw_buf[BUF_SIZE];
#define SLEEP_TIME 30
#define RC_STR(rc) (rc == 0 ? "OK" : "ERROR")
int handle_msg(struct nats_clapp_ctx_t *ctx);
void fiber(void)
{
/* tx_buf and rx_buf are application-level buffers */
struct app_buf_t tx_buf = APP_BUF_INIT(tx_raw_buf,
sizeof(tx_raw_buf), 0);
struct app_buf_t rx_buf = APP_BUF_INIT(rx_raw_buf,
sizeof(rx_raw_buf), 0);
/* netz context is initialized with default values. See netz.h */
struct netz_ctx_t netz_ctx = NETZ_CTX_INIT;
struct nats_cl_ctx_t nats_client = NATS_CL_INIT;
struct nats_clapp_ctx_t nats = NATS_CLAPP_INIT(&nats_client, &netz_ctx,
&tx_buf, &rx_buf);
int rc;
/* First we configure network related stuff */
netz_host_ipv4(&netz_ctx, 192, 168, 1, 110);
netz_netmask_ipv4(&netz_ctx, 255, 255, 255, 0);
/* NATS server address and port */
netz_remote_ipv4(&netz_ctx, 192, 168, 1, 10, 4222);
rc = nats_connect(&nats, "zephyr", 1);
if (rc != 0) {
printf("[%s:%d] Unable to connect to NATS server: %d\n",
__func__, __LINE__, rc);
return;
}
do {
printf("--------------------------------\n");
/* A better message handling routine must be implemented:
* if a PING message is received and not processed, the server
* will disconnect us.
*/
handle_msg(&nats);
fiber_sleep(SLEEP_TIME);
rc = nats_pub(&nats, "sensors", "INBOX.1", "DOOR3:OPEN");
printf("NATS PUB: %s\n", RC_STR(rc));
fiber_sleep(SLEEP_TIME);
} while (1);
}
void main(void)
{
net_init();
task_fiber_start(stack, STACK_SIZE, (nano_fiber_entry_t)fiber,
0, 0, 7, 0);
}
int handle_msg(struct nats_clapp_ctx_t *ctx)
{
int rc;
rc = netz_rx(ctx->netz_ctx, ctx->rx_buf);
if (rc != 0) {
return -EIO;
}
/* ping from the server */
rc = nats_unpack_ping(ctx->rx_buf);
if (rc == 0) {
rc = nats_pack_pong(ctx->tx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return -EIO;
}
printf("Ping-pong message processed\n");
return 0;
}
return rc;
}

View file

@ -0,0 +1,150 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "nats_client.h"
#include "nats_pack.h"
#include <errno.h>
int nats_connect(struct nats_clapp_ctx_t *ctx, char *client_name, int verbose)
{
int rc;
ctx->nats->name = client_name;
ctx->nats->verbose = verbose ? 1 : 0;
rc = netz_tcp(ctx->netz_ctx);
if (rc != 0) {
return -EIO;
}
rc = netz_rx(ctx->netz_ctx, ctx->rx_buf);
if (rc != 0) {
return -EIO;
}
rc = nats_unpack_info(ctx->rx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = nats_pack_quickcon(ctx->tx_buf, client_name, verbose);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return -EIO;
}
return nats_read_ok(ctx);
}
int nats_pub(struct nats_clapp_ctx_t *ctx, char *subject, char *reply_to,
char *payload)
{
int rc;
rc = nats_pack_pub(ctx->tx_buf, subject, reply_to, payload);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return -EIO;
}
return nats_read_ok(ctx);
}
int nats_sub(struct nats_clapp_ctx_t *ctx, char *subject, char *queue_grp,
char *sid)
{
int rc;
rc = nats_pack_sub(ctx->tx_buf, subject, queue_grp, sid);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return rc;
}
return nats_read_ok(ctx);
}
int nats_unsub(struct nats_clapp_ctx_t *ctx, char *sid, int max_msgs)
{
int rc;
rc = nats_pack_unsub(ctx->tx_buf, sid, max_msgs);
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return rc;
}
return nats_read_ok(ctx);
}
int nats_read_ok(struct nats_clapp_ctx_t *ctx)
{
int rc;
if (ctx->nats->verbose) {
rc = netz_rx(ctx->netz_ctx, ctx->rx_buf);
if (rc != 0) {
return -EIO;
}
rc = nats_find_msg(ctx->rx_buf, "+OK");
if (rc != 0) {
return -EINVAL;
}
}
return 0;
}
int nats_ping_pong(struct nats_clapp_ctx_t *ctx)
{
int rc;
rc = nats_pack_ping(ctx->tx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(ctx->netz_ctx, ctx->tx_buf);
if (rc != 0) {
return -EIO;
}
rc = netz_rx(ctx->netz_ctx, ctx->rx_buf);
if (rc != 0) {
return -EIO;
}
rc = nats_unpack_pong(ctx->rx_buf);
if (rc != 0) {
return -EINVAL;
}
return 0;
}

View file

@ -0,0 +1,106 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _NATS_CLIENT_H_
#define _NATS_CLIENT_H_
#include <netz.h>
#include "nats_pack.h"
/**
* @brief nats_clapp_ctx_t NATS Client Application Context structure
*/
struct nats_clapp_ctx_t {
struct nats_cl_ctx_t *nats;
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
};
/**
* @brief NATS_CLAPP_INIT Context initializer
*/
#define NATS_CLAPP_INIT(cl, netz, tx, rx) { .nats = cl, \
.netz_ctx = netz, \
.tx_buf = tx, \
.rx_buf = rx}
/**
* @brief nats_connect Connects to a NATS server
* @param [in] ctx NATS Client Application Context structure
* @param [in] client_name Client idientifier
* @param [in] verbose Verbose mode
* @return 0 on success
* @return -EIO on network error
* @return -EINVAL
*/
int nats_connect(struct nats_clapp_ctx_t *ctx, char *client_name, int verbose);
/**
* @brief nats_pub Sends the NATS PUB message
* @param [in] ctx NATS Client Application Context structure
* @param [in] subject Message subject
* @param [in] reply_to Reply-to field
* @param [in] payload Message payload
* @return 0 on success
* @return -EIO on network error
* @return -EINVAL
*/
int nats_pub(struct nats_clapp_ctx_t *ctx, char *subject, char *reply_to,
char *payload);
/**
* @brief nats_sub Sends the NATS SUB message
* @param [in] ctx NATS Client Application Context structure
* @param [in] subject Subscription subject
* @param [in] queue_grp Queue group
* @param [in] sid Subscription Identifier
* @return 0 on success
* @return -EIO on network error
* @return -EINVAL
*/
int nats_sub(struct nats_clapp_ctx_t *ctx, char *subject, char *queue_grp,
char *sid);
/**
* @brief nats_unsub Send the NATS UNSUB message
* @param [in] ctx NATS Client Application Context structure
* @param [in] sid Subscription Identifier
* @param [in] max_msgs Max messages field
* @return
*/
int nats_unsub(struct nats_clapp_ctx_t *ctx, char *sid, int max_msgs);
/**
* @brief nats_read_ok Reads the +OK message
* @param [in] ctx NATS Client Application Context structure
* @return 0 on success
* @return -EIO on error
* @return -EINVAL
*/
int nats_read_ok(struct nats_clapp_ctx_t *ctx);
/**
* @brief nats_ping_pong Sends a NATS PING msg and waits for
* a NATS PONG msg from the NATS server
* @param [in] ctx NATS Client Application Context structure
* @return 0 on success
* @return -EIO on network error
* @return -EINVAL
*/
int nats_ping_pong(struct nats_clapp_ctx_t *ctx);
#endif

View file

@ -0,0 +1,454 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "nats_pack.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
/* NATS require to send the language and version.
* So we send the GCC version. It could be any string.
*/
#ifdef __GNUC__
#define CLANG "GCC"
#define CLANG_VERSION __VERSION__
#else
#define CLANG "unknown"
#define CLANG_VERSION "unknown"
#endif
#define NATS_STR_TRUE_FALSE(v) (v) != 0 ? "true" : "false"
#define NATS_STR_NULL(str) ((str) == NULL ? "" : (str))
#define NATS_STR_COMMA(comma) (comma) ? "," : ""
#define min(a, b) ((a) < (b) ? (a) : (b))
#define PING_MSG "PING\r\n"
#define PONG_MSG "PONG\r\n"
#define PING_PONG_MSG_LEN 6
#define MSG_MSG_MIN_SIZE 13
static int nats_isblank(int c)
{
return c == ' ' || c == '\t';
}
static int nats_unpack(struct app_buf_t *buf, enum nats_msg_type type);
static int nats_pack(struct app_buf_t *buf, enum nats_msg_type type);
int nats_pack_info(struct app_buf_t *buf, char *server_id, char *version,
char *go, char *host, int port, int auth_req, int ssl_req,
int max_payload)
{
size_t size;
size_t len;
char *str;
int comma;
str = (char *)buf->buf;
size = buf->size;
comma = 0;
len = snprintf(str, size, "INFO {");
if (server_id) {
len += snprintf(str + len, size - len, "\"server_id\":\"%s\"",
server_id);
comma = 1;
}
if (version) {
len += snprintf(str + len, size - len, "%s\"version\":\"%s\"",
NATS_STR_COMMA(comma), version);
comma = 1;
}
if (go) {
len += snprintf(str + len, size - len, "%s\"go\":\"%s\"",
NATS_STR_COMMA(comma), go);
comma = 1;
}
if (host) {
len += snprintf(str + len, size - len, "%s\"host\":\"%s\"",
NATS_STR_COMMA(comma), host);
comma = 1;
}
if (port) {
len += snprintf(str + len, size - len, "%s\"port\":%d",
NATS_STR_COMMA(comma), port);
comma = 1;
}
len += snprintf(str + len, size - len, "%s\"auth_required\":%s",
NATS_STR_COMMA(comma), auth_req ? "true" : "false");
len += snprintf(str + len, size - len, ",\"ssl_required\":%s",
ssl_req ? "true" : "false");
if (max_payload) {
len += snprintf(str + len, size - len, ",\"max_payload\":%d",
max_payload);
}
len += snprintf(str + len, size, "}\r\n");
buf->length = len;
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_pack_connect(struct app_buf_t *buf, int verbose, int pedantic,
int ssl_req, char *auth_token, char *user, char *pass,
char *name, char *lang, char *version)
{
size_t size;
size_t len;
char *str;
str = (char *)buf->buf;
size = buf->size;
len = snprintf(str, size, "CONNECT {\"verbose\":%s,\"pedantic\":%s,"
"\"ssl_required\":%s",
NATS_STR_TRUE_FALSE(verbose),
NATS_STR_TRUE_FALSE(pedantic),
NATS_STR_TRUE_FALSE(ssl_req));
if (auth_token) {
len += snprintf(str + len, size - len,
",\"auth_token\":\"%s\"", auth_token);
}
if (user) {
len += snprintf(str + len, size - len, ",\"user\":\"%s\"",
user);
}
if (pass) {
len += snprintf(str + len, size - len, ",\"pass\":\"%s\"",
pass);
}
if (name) {
len += snprintf(str + len, size - len, ",\"name\":\"%s\"",
name);
}
len += snprintf(str + len, size - len,
",\"lang\":\"%s\",\"version\":\"%s\"}\r\n",
lang, version);
buf->length = len;
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_pack_quickcon(struct app_buf_t *buf, char *name, int verbose)
{
return nats_pack_connect(buf, verbose, 1, 0, NULL, NULL, NULL, name,
CLANG, CLANG_VERSION);
}
int nats_pack_pub(struct app_buf_t *buf, char *subject, char *reply_to,
char *payload)
{
size_t size;
char *str;
str = (char *)buf->buf;
size = buf->size;
buf->length = snprintf(str, size, "PUB %s %s %d\r\n%s\r\n",
subject, NATS_STR_NULL(reply_to),
(int)strlen(NATS_STR_NULL(payload)),
NATS_STR_NULL(payload));
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_pack_sub(struct app_buf_t *buf, char *subject, char *queue_grp,
char *sid)
{
size_t size;
char *str;
str = (char *)buf->buf;
size = buf->size;
buf->length = snprintf(str, size, "SUB %s %s %s\r\n", subject,
NATS_STR_NULL(queue_grp), sid);
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_pack_unsub(struct app_buf_t *buf, char *sid, int max_msgs)
{
size_t size;
char *str;
str = (char *)buf->buf;
size = buf->size;
if (max_msgs > 0) {
buf->length = snprintf(str, size, "UNSUB %s %d\r\n", sid,
max_msgs);
} else {
buf->length = snprintf(str, size, "UNSUB %s\r\n", sid);
}
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_pack_msg(struct app_buf_t *buf, char *subject, char *sid,
char *reply_to, char *payload)
{
size_t size;
char *str;
str = (char *)buf->buf;
size = buf->size;
buf->length = snprintf(str, size, "MSG %s %s %s %d\r\n%s\r\n", subject,
sid, NATS_STR_NULL(reply_to),
(int)strlen(NATS_STR_NULL(payload)),
NATS_STR_NULL(payload));
if (buf->length) {
return 0;
}
return -EINVAL;
}
int nats_unpack_msg(struct app_buf_t *buf,
int *subject_start, int *subject_len,
int *sid_start, int *sid_len,
int *reply_start, int *reply_len,
int *payload_start, int *payload_len)
{
int payload_len_start;
int payload_size;
int size;
int i;
char *str;
str = (char *)buf->buf;
size = (int)buf->length;
if (size < MSG_MSG_MIN_SIZE) {
return -EINVAL;
}
if (str[0] != 'M' || str[1] != 'S' || str[2] != 'G') {
return -EINVAL;
}
/* subject */
for (i = 3; i < size && nats_isblank(str[i]); i++) {
}
if (i == size) {
return -EINVAL;
}
*subject_start = i;
for (; i < size && !nats_isblank(str[i]); i++) {
}
if (i == size) {
return -EINVAL;
}
*subject_len = i - *subject_start;
/* sid */
for (; i < size && nats_isblank(str[i]); i++) {
}
if (i == size) {
return -EINVAL;
}
*sid_start = i;
for (; i < size && !nats_isblank(str[i]); i++) {
}
if (i == size) {
return -EINVAL;
}
*sid_len = i - *sid_start;
/* payload */
if (str[size-1] != '\n' || str[size-2] != '\r') {
return -EINVAL;
}
for (i = size - 3; i >= 0 && str[i] != '\n'; i--) {
}
if (i == 0) {
return -EINVAL;
}
if (str[i] != '\n' || str[i - 1] != '\r') {
return -EINVAL;
}
*payload_start = i + 1;
*payload_len = size - 2 - *payload_start;
/* payload size */
i -= 2;
for (; i >= 0 && isdigit(str[i]); i--) {
}
if (i <= 0 || !nats_isblank(str[i])) {
return -EINVAL;
}
payload_len_start = i + 1;
payload_size = atoi(str + payload_len_start);
if (payload_size != *payload_len) {
return -EINVAL;
}
/* find reply-to after sid and payload size were found */
i = *sid_start + *sid_len;
for (; i < size && nats_isblank(str[i]); i++) {
}
if (i < payload_len_start) {
*reply_start = i;
for (; i < size && !nats_isblank(str[i]); i++) {
}
*reply_len = i - *reply_start;
} else {
*reply_start = *reply_len = -1;
}
return 0;
}
int nats_unpack_info(struct app_buf_t *buf)
{
int rc;
rc = nats_find_msg(buf, "INFO");
/* TODO: evaluate all INFO options */
return rc;
}
static int nats_pack(struct app_buf_t *buf, enum nats_msg_type type)
{
char *str;
size_t size;
str = (char *)buf->buf;
size = buf->size;
switch (type) {
case NATS_MSG_PING:
buf->length = snprintf(str, size, "PING\r\n");
break;
case NATS_MSG_PONG:
buf->length = snprintf(str, size, "PONG\r\n");
break;
default:
return -EINVAL;
}
return 0;
}
int nats_pack_ping(struct app_buf_t *buf)
{
return nats_pack(buf, NATS_MSG_PING);
}
int nats_pack_pong(struct app_buf_t *buf)
{
return nats_pack(buf, NATS_MSG_PONG);
}
static int nats_unpack(struct app_buf_t *buf, enum nats_msg_type type)
{
char *str;
size_t len;
str = (char *)buf->buf;
switch (type) {
case NATS_MSG_PING:
len = min(buf->length, PING_PONG_MSG_LEN);
if (strncmp(str, PING_MSG, len) != 0) {
return -EINVAL;
}
break;
case NATS_MSG_PONG:
len = min(buf->length, PING_PONG_MSG_LEN);
if (strncmp(str, PONG_MSG, len) != 0) {
return -EINVAL;
}
break;
default:
return -EINVAL;
}
return 0;
}
int nats_unpack_ping(struct app_buf_t *buf)
{
return nats_unpack(buf, NATS_MSG_PING);
}
int nats_unpack_pong(struct app_buf_t *buf)
{
return nats_unpack(buf, NATS_MSG_PONG);
}
int nats_find_msg(struct app_buf_t *buf, char *str)
{
size_t size;
char *_buf;
int len;
int i;
_buf = buf->buf;
size = buf->length;
i = 0;
do {
} while (i < size && (isalpha(_buf[i]) == 0 && _buf[i] != '+') && ++i);
len = strlen(str);
if (i + len >= size || strncmp(_buf + i, str, len) != 0) {
return -EINVAL;
}
return 0;
}

View file

@ -0,0 +1,247 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _NATS_PKT_H_
#define _NATS_PKT_H_
#include <app_buf.h>
#include <stdint.h>
#include <stddef.h>
/**
* @brief nats_msg_type NATS message type enum.
*
* @details This sort is arbitrary and is not part of the
* standard.
*/
enum nats_msg_type {
NATS_MSG_INFO = 1,
NATS_MSG_CONNECT,
NATS_MSG_PUB,
NATS_MSG_SUB,
NATS_MSG_UNSUB,
NATS_MSG_MSG,
NATS_MSG_PING,
NATS_MSG_PONG,
NATS_MSG_OK,
NATS_MSG_ERR
};
/**
* @brief nats_cl_ctx_t NATS client context structure
*/
struct nats_cl_ctx_t {
int verbose;
int pedantic;
int ssl_req;
char *user;
char *pass;
char *name;
};
/**
* @brief NATS_CL_INIT NATS client context initializer
*/
#define NATS_CL_INIT { .verbose = 1, .pedantic = 0, .ssl_req = 0, \
.user = NULL, .pass = NULL, .name = NULL }
/**
* @brief nats_pack_info Packs the NATS INFO message
* @param [out] buf app_buf used to store the resultant string
* @param [in] server_id NATS server identifier, may be NULL
* @param [in] version NATS server software version, may be NULL
* @param [in] go golang version used to build the NATS server,
* may be NULL
* @param [in] host NATS server IP address, may be NULL
* @param [in] port NATS server port number, may be NULL
* @param [in] auth_req If 0 no client authentication required
* @param [in] ssl_req If 0 no ssl channel required
* @param [in] max_payload Maximum payload size the server will accept
* from the client
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_info(struct app_buf_t *buf, char *server_id, char *version,
char *go, char *host, int port, int auth_req, int ssl_req,
int max_payload);
/**
* @brief nats_pack_connect Packs the NATS CONNECT message
* @param [out] buf app_buf used to store the resultant string
* @param [in] verbose Turns +OK protocol ACK
* @param [in] pedantic Turns on strict checking
* @param [in] ssl_req Indicates if the user requires a SSL connection
* @param [in] auth_token Client auth token, may be NULL
* @param [in] user Connection username, may be NULL
* @param [in] pass Connection password, may be NULL
* @param [in] name Optional client name, may be NULL
* @param [in] lang Implementation language of the client,
* may be NULL
* @param [in] version Client's version, may be NULL
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_connect(struct app_buf_t *buf, int verbose, int pedantic,
int ssl_req, char *auth_token, char *user, char *pass,
char *name, char *lang, char *version);
/**
* @brief nats_pack_quickcon Packs the NATS CONNECT message
*
* @details This function calls nats_pack_connect with all
* the parameters set to 0 or NULL but name.
*
* @param [out] buf app_buf used to store the resultant string
* @param [in] name Optional client name, may be NULL
* @param [in] verbose 0 for quiet, anything else for verbose
* communication
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_quickcon(struct app_buf_t *buf, char *name, int verbose);
/**
* @brief nats_pack_pub Packs the NATS PUB message
* @param [out] buf app_buf used to store the resultant string
* @param [in] subject Message subject
* @param [in] reply_to Replay inbox subject, may be NULL
* @param payload C-string, may be NULL
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_pub(struct app_buf_t *buf, char *subject, char *reply_to,
char *payload);
/**
* @brief nats_pack_sub Packs the NATS SUB message
* @param [out] buf app_buf used to store the resultant string
* @param [in] subject Subject name to subscribe to
* @param [in] queue_grp Queue group to join to, may be NULL
* @param [in] sid Subscription Identifier
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_sub(struct app_buf_t *buf, char *subject, char *queue_grp,
char *sid);
/**
* @brief nats_pack_unsub Packs the NATS UNSUB message
* @param buf app_buf used to store the resultant string
* @param sid Subscription Identifier
* @param max_msgs Max number of messages
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_unsub(struct app_buf_t *buf, char *sid, int max_msgs);
/**
* @brief nats_pack_msg Packs the NATS MSG message
* @param buf app_buf used to store the resultant string
* @param subject Subject name to subscribe to
* @param sid Subscription Identifier
* @param reply_to Inbox subject for this subscription
* @param payload The message payload data
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_msg(struct app_buf_t *buf, char *subject, char *sid,
char *reply_to, char *payload);
/**
* @brief nats_unpack_msg Unpacks the NATS MSG message
*
* @details This function unpacks a NATS MSG message.
* Output parameters indicate where the field
* was found inside the MSG and the length of
* the field.
* If the reply-to field was found, reply_start
* will be greater than 0, otherwise will be set
* to -1.
*
* @param [in] buf Buffer containing a C-string with the
* NATS MSG message
* @param [out] subject_start Indicates where the subject field starts
* @param [out] subject_len Subject field length
* @param [out] sid_start Indicates where the sid field starts
* @param [out] sid_len sid field length
* @param [out] reply_start Indicates where the reply-to field starts
* @param [out] reply_len reply-to field length
* @param [out] payload_start Indicates where the payload starts
* @param [out] payload_len payload field length
* @return 0 on success
* @return -EINVAL if a malformed MSG was received
*/
int nats_unpack_msg(struct app_buf_t *buf,
int *subject_start, int *subject_len,
int *sid_start, int *sid_len,
int *reply_start, int *reply_len,
int *payload_start, int *payload_len);
/**
* @brief nats_unpack_info Unpacks the NATS INFO message
*
* @details This function unpacks de INFO message.
* INFO msg parser is WIP. So, support
* for this message is not yet finished.
* WARNING: Function signature may change.
*
* @param [in] buf Buffer containing a C-string with the
* NATS INFO message
* @return 0 on success
* @return -EINVAL on error
*/
int nats_unpack_info(struct app_buf_t *buf);
/**
* @brief nats_pack_ping Packs the NATS PING message
* @param [out] buf app_buf used to store the resultant string
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_ping(struct app_buf_t *buf);
/**
* @brief nats_unpack_ping Unacks the NATS PING message
* @param [in] buf Buffer containing a C-string with the
* NATS PING message
* @return 0 on success
* @return -EINVAL on error
*/
int nats_unpack_ping(struct app_buf_t *buf);
/**
* @brief nats_pack_ping Packs the NATS PONG message
* @param [out] buf app_buf used to store the resultant string
* @return 0 on success
* @return -EINVAL on error
*/
int nats_pack_pong(struct app_buf_t *buf);
/**
* @brief nats_unpack_pong Unacks the NATS PONG message
* @param [in] buf Buffer containing a C-string with the
* NATS PONG message
* @return 0 on success
* @return -EINVAL on error
*/
int nats_unpack_pong(struct app_buf_t *buf);
int nats_find_msg(struct app_buf_t *buf, char *str);
#endif