net: remove sample implementing NATS

A sample implementing NATS protocol that is not part of the Zephyr
networking subsystem. The implementation is not maintained and only
served as a proof of concept.

Related to #20017

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2020-01-22 09:35:53 -05:00
commit 91920268e0
7 changed files with 0 additions and 1210 deletions

View file

@ -1,8 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.13.1)
include($ENV{ZEPHYR_BASE}/cmake/app/boilerplate.cmake NO_POLICY_SCOPE)
project(nats)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

View file

@ -1,163 +0,0 @@
.. _NATS_Client_Sample:
NATS Client Implementation Sample
#################################
Overview
********
`NATS <http://nats.io/documentation/internals/nats-protocol/>`__ is a
publisher/subscriber protocol implemented on top of TCP. It is specified in
`NATS Protocol documentation <http://nats.io/documentation/internals/nats-protocol/>`__,
and this is a sample implementation for Zephyr using the new IP stack.
The API is loosely based off of the `Golang API
<https://github.com/nats-io/go-nats>`__.
With this sample, it's possible to subscribe/unsubscribe to a given subject,
and be notified of changes asynchronously. In order to conserve resources,
the implementation does not keep track of subscribed subjects; that
must be performed by the application itself, so it can ignore unknown/undesired
subjects.
TLS is not supported yet, although basic authentication is. The client will indicate
if it supports username/password if a certain callback is set in the ``struct
nats``. This callback will then be called, and the user must copy the
username/password to the supplied user/pass buffers.
Content might be also published for a given subject.
The sample application lets one observe the subject "led0", and turn it
"on", "off", or "toggle" its value. Changing the value will, if supported,
act on a status LED on the development board. The new status will be
published.
Also worth noting is that most of the networking and GPIO boilerplate has
been shamelessly copied from the IRC bot example. (Curiously, both
protocols are similar.)
Requirements
************
To test the sample, build the Zephyr application for your platform. This
has only been tested with the QEMU emulator as provided by the Zephyr SDK,
but it should work with other supported hardware as long as they have enough
memory, the network stack has TCP enabled, and the connectivity hardware is
supported.
As far as the software goes, this has been tested with the official `gnatsd
<https://github.com/nats-io/gnatsd>`__ for the server, and the official
`go-nats <https://github.com/nats-io/go-nats>`__ client library. Both the
server and clients were set up as per instructions found in their respective
``README.md`` files.
The client was a one-off test that is basically the same code provided in
the `Basic Usage
<https://github.com/nats-io/go-nats/blob/e6bb81b5a5f37ef7bf364bb6276e13813086c6ee/README.md#basic-usage>`__
section as found in the ``go-nats`` README file, however, subscribing to the
topic used in this sample: ``led0``, and publishing values as described
above (``on``, ``off``, and ``toggle``).
Library Usage
*************
Allocate enough space for a ``struct nats``, setting a few callbacks so
that you're notified as events happen:
::
struct nats nats_ctx = {
.on_auth_required = on_auth_required,
.on_message = on_message
};
The ``on_auth_required()`` and ``on_message()`` functions are part of
your application, and each must have these signatures:
::
int on_auth_required(struct nats *nats, char **user, char **pass);
int on_message(struct nats *nats, struct nats_msg *msg);
Both functions should return 0 to signal that they could successfully
handle their role, and a negative value, if they couldn't for any
reason. It's recommended to use a negative integer as provided by
errno.h in order to ease debugging.
The first function, ``on_auth_required()``, is called if the server
notifies that it requires authentication. It's not going to be called if
that's not the case, so it is optional. However, if the server asks for
credentials and this function is not provided, the connection will be
closed and an error will be returned by ``nats_connect()``.
The second function, ``on_message()``, will be called whenever the
server has been notified of a value change. The ``struct nats_msg`` has the
following fields:
::
struct nats_msg {
const char *subject;
const char *sid;
const char *reply_to;
};
The field ``reply_to`` may be passed directly to ``nats_publish()``,
in order to publish a reply to this message. If it's ``NULL`` (no
reply-to field in the message from the server), the
``nats_publish()`` function will not reply to a specific mailbox and
will just update the topic value.
In order to manage topic subscription, these functions can be used:
::
int nats_subscribe(struct nats *nats, const char *subject,
const char *queue_group, const char *sid);
``subject`` and ``sid`` are validated so that they're actually valid
per the protocol rules. ``-EINVAL`` is returned if they're not.
If ``queue_group`` is NULL, it's not sent to the server.
::
int nats_unsubscribe(struct nats *nats, const char *sid,
size_t max_msgs);
``sid`` is validated so it's actually valid per the protocol rules.
-EINVAL is returned if it's not.
``max_msgs`` specifies the number of messages that the server will
send before actually unsubscribing the message. Can be 0 to
immediately unsubscribe. (See note below.)
Both of these functions will return ``-ENOMEM`` if they couldn't build
the message to transmit to the server. They can also return any error
that ``net_context_send()`` can return.
Note: In order to conserve resources, the library implementation will not
make note of subscribed topics. Both ``nats_subscribe()`` and
``nats_unsubscribe()`` functions will only notify the server that the client
is either interested or uninterested in a particular topic. The
``on_message()`` callback may be called to notify of changes in topics that
have not been subscribed to (or have been recently unsubscribed). It's up
to the application to decide to ignore the message.
Topics can be published by using the following function:
::
int nats_publish(struct nats *nats, const char *subject,
const char *reply_to, const char *payload,
size_t payload_len);
As usual, ``subject`` is validated and ``-EINVAL`` will be returned if
it's in an invalid format. The ``reply_to`` field can be ``NULL``, in
which case, subscribers to this topic won't receive this information as
well.
As ``net_subscribe()`` and ``net_unsubscribe()``, this function can
return ``-ENOMEM`` or any other errors that ``net_context_send()``
returns.

View file

@ -1,23 +0,0 @@
CONFIG_INIT_STACKS=y
CONFIG_NET_DHCPV4=y
CONFIG_NET_IF_UNICAST_IPV4_ADDR_COUNT=3
CONFIG_NET_IPV6=y
CONFIG_NET_IF_UNICAST_IPV6_ADDR_COUNT=3
CONFIG_NET_LOG=y
CONFIG_NET_MAX_CONTEXTS=10
CONFIG_NET_SHELL=y
CONFIG_NET_STATISTICS=y
CONFIG_NET_TCP=y
CONFIG_NETWORKING=y
CONFIG_DNS_RESOLVER=n
CONFIG_PRINTK=y
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_MY_IPV6_ADDR="2001:db8::1"
CONFIG_NET_CONFIG_PEER_IPV6_ADDR="2001:db8::2"
CONFIG_TEST_RANDOM_GENERATOR=y
CONFIG_JSON_LIBRARY=y

View file

@ -1,7 +0,0 @@
sample:
name: NATS Client
tests:
sample.net.nats:
harness: net
depends_on: netif
tags: net nats

View file

@ -1,330 +0,0 @@
/*
* Copyright (c) 2017 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <logging/log.h>
LOG_MODULE_REGISTER(net_nats_sample, LOG_LEVEL_DBG);
#include <drivers/gpio.h>
#include <net/net_context.h>
#include <net/net_core.h>
#include <net/net_if.h>
#include <zephyr.h>
#include "nats.h"
/* LED */
#ifndef DT_ALIAS_LED0_GPIOS_CONTROLLER
#ifdef LED0_GPIO_PORT
#define DT_ALIAS_LED0_GPIOS_CONTROLLER LED0_GPIO_PORT
#else
#define DT_ALIAS_LED0_GPIOS_CONTROLLER "(fail)"
#define DT_ALIAS_LED0_GPIOS_PIN 0
#endif
#endif
#define LED_GPIO_NAME DT_ALIAS_LED0_GPIOS_CONTROLLER
#define LED_PIN DT_ALIAS_LED0_GPIOS_PIN
static struct device *led0;
static bool fake_led;
/* Network Config */
#if defined(CONFIG_NET_IPV6)
#define NATS_AF_INET AF_INET6
#define NATS_SOCKADDR_IN sockaddr_in6
#if defined(CONFIG_NET_CONFIG_MY_IPV6_ADDR)
#define NATS_LOCAL_IP_ADDR CONFIG_NET_CONFIG_MY_IPV6_ADDR
#else
#define NATS_LOCAL_IP_ADDR "2001:db8::1"
#endif /* CONFIG_NET_CONFIG_MY_IPV6_ADDR */
#if defined(CONFIG_NET_CONFIG_PEER_IPV6_ADDR)
#define NATS_PEER_IP_ADDR CONFIG_NET_CONFIG_PEER_IPV6_ADDR
#else
#define NATS_PEER_IP_ADDR "2001:db8::2"
#endif /* CONFIG_NET_CONFIG_PEER_IPV6_ADDR */
#else /* CONFIG_NET_IPV4 */
#define NATS_AF_INET AF_INET
#define NATS_SOCKADDR_IN sockaddr_in
#if defined(CONFIG_NET_CONFIG_MY_IPV4_ADDR)
#define NATS_LOCAL_IP_ADDR CONFIG_NET_CONFIG_MY_IPV4_ADDR
#else
#define NATS_LOCAL_IP_ADDR "192.168.0.1"
#endif /* CONFIG_NET_CONFIG_MY_IPV4_ADDR */
#if defined(CONFIG_NET_CONFIG_PEER_IPV4_ADDR)
#define NATS_PEER_IP_ADDR CONFIG_NET_CONFIG_PEER_IPV4_ADDR
#else
#define NATS_PEER_IP_ADDR "192.168.0.2"
#endif /* CONFIG_NET_CONFIG_PEER_IPV4_ADDR */
#endif
/* DNS API */
#define DNS_PORT 53
#define DNS_SLEEP_MSECS 400
/* Default server */
#define DEFAULT_PORT 4222
static void panic(const char *msg)
{
LOG_ERR("Panic: %s", msg);
for (;;) {
k_sleep(K_FOREVER);
}
}
static int in_addr_set(sa_family_t family,
const char *ip_addr,
int port,
struct sockaddr *_sockaddr)
{
int rc = 0;
_sockaddr->sa_family = family;
if (ip_addr) {
if (family == AF_INET6) {
rc = net_addr_pton(family,
ip_addr,
&net_sin6(_sockaddr)->sin6_addr);
} else {
rc = net_addr_pton(family,
ip_addr,
&net_sin(_sockaddr)->sin_addr);
}
if (rc < 0) {
LOG_ERR("Invalid IP address: %s", log_strdup(ip_addr));
return -EINVAL;
}
}
if (port >= 0) {
if (family == AF_INET6) {
net_sin6(_sockaddr)->sin6_port = htons(port);
} else {
net_sin(_sockaddr)->sin_port = htons(port);
}
}
return rc;
}
static void initialize_network(void)
{
struct net_if *iface;
LOG_INF("Initializing network");
iface = net_if_get_default();
if (!iface) {
panic("No default network interface");
}
/* TODO: IPV6 DHCP */
#if defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4)
net_dhcpv4_start(iface);
/* delay so DHCPv4 can assign IP */
/* TODO: add a timeout/retry */
LOG_INF("Waiting for DHCP ...");
do {
k_sleep(K_SECONDS(1));
} while (net_ipv4_is_addr_unspecified(&iface->dhcpv4.requested_ip));
LOG_INF("Done!");
/* TODO: add a timeout */
LOG_INF("Waiting for IP assginment ...");
do {
k_sleep(K_SECONDS(1));
} while (!net_ipv4_is_my_addr(&iface->dhcpv4.requested_ip));
LOG_INF("Done!");
#else
struct sockaddr addr;
if (in_addr_set(NATS_AF_INET, NATS_LOCAL_IP_ADDR, 0,
&addr) < 0) {
LOG_ERR("Invalid IP address: %s",
NATS_LOCAL_IP_ADDR);
}
#if defined(CONFIG_NET_IPV6)
net_if_ipv6_addr_add(iface,
&net_sin6(&addr)->sin6_addr,
NET_ADDR_MANUAL, 0);
#else
net_if_ipv4_addr_add(iface,
&net_sin(&addr)->sin_addr,
NET_ADDR_MANUAL, 0);
#endif
#endif /* CONFIG_NET_IPV4 && CONFIG_NET_DHCPV4 */
}
static bool read_led(void)
{
u32_t led = 0U;
int r;
if (!led0) {
return fake_led;
}
r = gpio_pin_read(led0, LED_PIN, &led);
if (r < 0) {
return false;
}
return !led;
}
static void write_led(const struct nats *nats,
const struct nats_msg *msg,
bool state)
{
char *pubstate;
int ret;
if (!led0) {
fake_led = state;
} else {
gpio_pin_write(led0, LED_PIN, !state);
}
pubstate = state ? "on" : "off";
ret = nats_publish(nats, "led0", 0, msg->reply_to, 0,
pubstate, strlen(pubstate));
if (ret < 0) {
printk("Failed to publish: %d\n", ret);
} else {
printk("*** Turning LED %s\n", pubstate);
}
}
static int on_msg_received(const struct nats *nats,
const struct nats_msg *msg)
{
if (!strcmp(msg->subject, "led0")) {
if (msg->payload_len == 2 && !strcmp(msg->payload, "on")) {
write_led(nats, msg, true);
return 0;
}
if (msg->payload_len == 3 && !strcmp(msg->payload, "off")) {
write_led(nats, msg, false);
return 0;
}
if (msg->payload_len == 6 && !strcmp(msg->payload, "toggle")) {
write_led(nats, msg, !read_led());
return 0;
}
return -EINVAL;
}
return -ENOENT;
}
static void initialize_hardware(void)
{
LOG_INF("Initializing hardware");
led0 = device_get_binding(LED_GPIO_NAME);
if (led0) {
gpio_pin_configure(led0, LED_PIN, GPIO_DIR_OUT);
}
}
static int connect(struct nats *nats, u16_t port)
{
#if defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4)
struct net_if *iface;
#endif
struct sockaddr dst_addr, src_addr;
int ret;
LOG_INF("Connecting...");
ret = net_context_get(NATS_AF_INET, SOCK_STREAM, IPPROTO_TCP,
&nats->conn);
if (ret < 0) {
LOG_DBG("Could not get new context: %d", ret);
return ret;
}
/* TODO: IPV6 DHCP */
#if defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4)
iface = net_if_get_default();
net_ipaddr_copy(&net_sin(&src_addr)->sin_addr,
&iface->dhcpv4.requested_ip);
ret = in_addr_set(NATS_AF_INET, NULL, 0, &src_addr);
#else
ret = in_addr_set(NATS_AF_INET, NATS_LOCAL_IP_ADDR,
0, &src_addr);
if (ret < 0) {
goto connect_exit;
}
#endif
ret = in_addr_set(NATS_AF_INET, NATS_PEER_IP_ADDR,
port, &dst_addr);
if (ret < 0) {
goto connect_exit;
}
ret = net_context_bind(nats->conn, &src_addr,
sizeof(struct NATS_SOCKADDR_IN));
if (ret < 0) {
LOG_DBG("Could not bind to local address: %d", -ret);
goto connect_exit;
}
ret = nats_connect(nats, &dst_addr, sizeof(struct NATS_SOCKADDR_IN));
if (!ret) {
return 0;
}
connect_exit:
net_context_put(nats->conn);
return ret;
}
static void nats_client(void)
{
struct nats nats = {
.on_message = on_msg_received
};
LOG_INF("NATS Client Sample");
initialize_network();
initialize_hardware();
if (connect(&nats, DEFAULT_PORT) < 0) {
panic("Could not connect to NATS server");
}
if (nats_subscribe(&nats, "led0", 0, NULL, 0,
"sub132984012384098", 0) < 0) {
panic("Could not subscribe to `led0` topic");
}
}
void main(void)
{
nats_client();
}

View file

@ -1,632 +0,0 @@
/*
* Copyright (c) 2017 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <logging/log.h>
LOG_MODULE_DECLARE(net_nats_sample, LOG_LEVEL_DBG);
#include <ctype.h>
#include <errno.h>
#include <data/json.h>
#include <sys/printk.h>
#include <sys/util.h>
#include <net/net_pkt.h>
#include <net/net_context.h>
#include <net/net_core.h>
#include <net/net_if.h>
#include <stdbool.h>
#include <zephyr/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zephyr.h>
#include "nats.h"
#define CMD_BUF_LEN 256
struct nats_info {
const char *server_id;
const char *version;
const char *go;
const char *host;
size_t max_payload;
u16_t port;
bool ssl_required;
bool auth_required;
};
struct io_vec {
const void *base;
size_t len;
};
static bool is_subject_valid(const char *subject, size_t len)
{
size_t pos;
char last = '\0';
if (!subject) {
return false;
}
for (pos = 0; pos < len; last = subject[pos++]) {
switch (subject[pos]) {
case '>':
if (subject[pos + 1] != '\0') {
return false;
}
break;
case '.':
case '*':
if (last == subject[pos]) {
return false;
}
break;
default:
if (isalnum((unsigned char)subject[pos])) {
continue;
}
return false;
}
}
return true;
}
static bool is_sid_valid(const char *sid, size_t len)
{
size_t pos;
if (!sid) {
return false;
}
for (pos = 0; pos < len; pos++) {
if (!isalnum((unsigned char)sid[pos])) {
return false;
}
}
return true;
}
#define TRANSMITV_LITERAL(lit_) { .base = lit_, .len = sizeof(lit_) - 1 }
static int transmitv(struct net_context *conn, int iovcnt,
struct io_vec *iov)
{
u8_t buf[1024];
int i, pos;
for (i = 0, pos = 0; i < iovcnt; pos += iov[i].len, i++) {
memcpy(&buf[pos], iov[i].base, iov[i].len);
}
return net_context_send(conn, buf, pos, NULL, K_NO_WAIT, NULL);
}
static inline int transmit(struct net_context *conn, const char buffer[],
size_t len)
{
return transmitv(conn, 1, (struct io_vec[]) {
{ .base = buffer, .len = len },
});
}
#define FIELD(struct_, member_, type_) { \
.field_name = #member_, \
.field_name_len = sizeof(#member_) - 1, \
.offset = offsetof(struct_, member_), \
.type = type_ \
}
static int handle_server_info(struct nats *nats, char *payload, size_t len,
struct net_buf *buf, u16_t offset)
{
static const struct json_obj_descr descr[] = {
FIELD(struct nats_info, server_id, JSON_TOK_STRING),
FIELD(struct nats_info, version, JSON_TOK_STRING),
FIELD(struct nats_info, go, JSON_TOK_STRING),
FIELD(struct nats_info, host, JSON_TOK_STRING),
FIELD(struct nats_info, port, JSON_TOK_NUMBER),
FIELD(struct nats_info, auth_required, JSON_TOK_TRUE),
FIELD(struct nats_info, ssl_required, JSON_TOK_TRUE),
FIELD(struct nats_info, max_payload, JSON_TOK_NUMBER),
};
struct nats_info info = {};
char user[32], pass[64];
size_t user_len = sizeof(user), pass_len = sizeof(pass);
int ret;
ret = json_obj_parse(payload, len, descr, ARRAY_SIZE(descr), &info);
if (ret < 0) {
return -EINVAL;
}
if (info.ssl_required) {
return -ENOTSUP;
}
if (!info.auth_required) {
return 0;
}
if (!nats->on_auth_required) {
return -EPERM;
}
ret = nats->on_auth_required(nats, user, &user_len, pass, &pass_len);
if (ret < 0) {
return ret;
}
ret = json_escape(user, &user_len, sizeof(user));
if (ret < 0) {
return ret;
}
ret = json_escape(pass, &pass_len, sizeof(pass));
if (ret < 0) {
return ret;
}
return transmitv(nats->conn, 5, (struct io_vec[]) {
TRANSMITV_LITERAL("CONNECT {\"user\":\""),
{ .base = user, .len = user_len },
TRANSMITV_LITERAL("\",\"pass\":\""),
{ .base = pass, .len = pass_len },
TRANSMITV_LITERAL("\"}\r\n"),
});
}
#undef FIELD
static bool char_in_set(char chr, const char *set)
{
const char *ptr;
for (ptr = set; *ptr; ptr++) {
if (*ptr == chr) {
return true;
}
}
return false;
}
static char *strsep(char *strp, const char *delims)
{
const char *delim;
char *ptr;
if (!strp) {
return NULL;
}
for (delim = delims; *delim; delim++) {
ptr = strchr(strp, *delim);
if (ptr) {
*ptr = '\0';
for (ptr++; *ptr; ptr++) {
if (!char_in_set(*ptr, delims)) {
break;
}
}
return ptr;
}
}
return NULL;
}
static int copy_pkt_to_buf(struct net_buf *src, u16_t offset,
char *dst, size_t dst_size, size_t n_bytes)
{
u16_t to_copy;
u16_t copied;
if (dst_size < n_bytes) {
return -ENOMEM;
}
while (src && offset >= src->len) {
offset -= src->len;
src = src->frags;
}
for (copied = 0U; src && n_bytes > 0; offset = 0U) {
to_copy = MIN(n_bytes, src->len - offset);
memcpy(dst + copied, (char *)src->data + offset, to_copy);
copied += to_copy;
n_bytes -= to_copy;
src = src->frags;
}
if (n_bytes > 0) {
return -ENOMEM;
}
return 0;
}
static int handle_server_msg(struct nats *nats, char *payload, size_t len,
struct net_buf *buf, u16_t offset)
{
char *subject, *sid, *reply_to, *bytes, *end_ptr;
char prev_end = payload[len];
size_t payload_size;
/* strsep() uses strchr(), ensure payload is NUL-terminated */
payload[len] = '\0';
/* Slice the tokens */
subject = payload;
sid = strsep(subject, " \t");
reply_to = strsep(sid, " \t");
bytes = strsep(reply_to, " \t");
if (!bytes) {
if (!reply_to) {
return -EINVAL;
}
bytes = reply_to;
reply_to = NULL;
}
/* Parse the payload size */
errno = 0;
payload_size = strtoul(bytes, &end_ptr, 10);
payload[len] = prev_end;
if (errno != 0) {
return -errno;
}
if (!end_ptr) {
return -EINVAL;
}
if (payload_size >= CMD_BUF_LEN - len) {
return -ENOMEM;
}
if (copy_pkt_to_buf(buf, offset, end_ptr, CMD_BUF_LEN - len,
payload_size) < 0) {
return -ENOMEM;
}
end_ptr[payload_size] = '\0';
return nats->on_message(nats, &(struct nats_msg) {
.subject = subject,
.sid = sid,
.reply_to = reply_to,
.payload = end_ptr,
.payload_len = payload_size,
});
}
static int handle_server_ping(struct nats *nats, char *payload, size_t len,
struct net_buf *buf, u16_t offset)
{
static const char pong[] = "PONG\r\n";
return transmit(nats->conn, pong, sizeof(pong) - 1);
}
static int ignore(struct nats *nats, char *payload, size_t len,
struct net_buf *buf, u16_t offset)
{
/* FIXME: Notify user of success/errors. This would require
* maintaining information of what was the last sent command in
* order to provide the best error information for the user.
* Without VERBOSE set, these won't be sent -- but be cautious and
* ignore them just in case.
*/
return 0;
}
#define CMD(cmd_, handler_) { \
.op = cmd_, \
.len = sizeof(cmd_) - 1, \
.handle = handler_ \
}
static int handle_server_cmd(struct nats *nats, char *cmd, size_t len,
struct net_buf *buf, u16_t offset)
{
static const struct {
const char *op;
size_t len;
int (*handle)(struct nats *nats, char *payload, size_t len,
struct net_buf *buf, u16_t offset);
} cmds[] = {
CMD("INFO", handle_server_info),
CMD("MSG", handle_server_msg),
CMD("PING", handle_server_ping),
CMD("+OK", ignore),
CMD("-ERR", ignore),
};
size_t i;
char *payload;
size_t payload_len;
payload = strsep(cmd, " \t");
if (!payload) {
payload = strsep(cmd, "\r");
if (!payload) {
return -EINVAL;
}
}
payload_len = len - (size_t)(payload - cmd);
len = (size_t)(payload - cmd - 1);
for (i = 0; i < ARRAY_SIZE(cmds); i++) {
if (len != cmds[i].len) {
continue;
}
if (!strncmp(cmds[i].op, cmd, len)) {
return cmds[i].handle(nats, payload, payload_len,
buf, offset);
}
}
return -ENOENT;
}
#undef CMD
int nats_subscribe(const struct nats *nats,
const char *subject, size_t subject_len,
const char *queue_group, size_t queue_group_len,
const char *sid, size_t sid_len)
{
if (!is_subject_valid(subject, subject_len)) {
return -EINVAL;
}
if (!is_sid_valid(sid, sid_len)) {
return -EINVAL;
}
if (queue_group) {
return transmitv(nats->conn, 7, (struct io_vec[]) {
TRANSMITV_LITERAL("SUB "),
{
.base = subject,
.len = subject_len ?
subject_len : strlen(subject)
},
TRANSMITV_LITERAL(" "),
{
.base = queue_group,
.len = queue_group_len ?
queue_group_len : strlen(queue_group)
},
TRANSMITV_LITERAL(" "),
{
.base = sid,
.len = sid_len ? sid_len : strlen(sid)
},
TRANSMITV_LITERAL("\r\n")
});
}
return transmitv(nats->conn, 5, (struct io_vec[]) {
TRANSMITV_LITERAL("SUB "),
{
.base = subject,
.len = subject_len ? subject_len : strlen(subject)
},
TRANSMITV_LITERAL(" "),
{
.base = sid,
.len = sid_len ? sid_len : strlen(sid)
},
TRANSMITV_LITERAL("\r\n")
});
}
int nats_unsubscribe(const struct nats *nats,
const char *sid, size_t sid_len, size_t max_msgs)
{
if (!is_sid_valid(sid, sid_len)) {
return -EINVAL;
}
if (max_msgs) {
char max_msgs_str[3 * sizeof(size_t)];
int ret;
ret = snprintk(max_msgs_str, sizeof(max_msgs_str),
"%zu", max_msgs);
if (ret < 0 || ret >= (int)sizeof(max_msgs_str)) {
return -ENOMEM;
}
return transmitv(nats->conn, 5, (struct io_vec[]) {
TRANSMITV_LITERAL("UNSUB "),
{
.base = sid,
.len = sid_len ? sid_len : strlen(sid)
},
TRANSMITV_LITERAL(" "),
{ .base = max_msgs_str, .len = ret },
TRANSMITV_LITERAL("\r\n"),
});
}
return transmitv(nats->conn, 3, (struct io_vec[]) {
TRANSMITV_LITERAL("UNSUB "),
{
.base = sid,
.len = sid_len ? sid_len : strlen(sid)
},
TRANSMITV_LITERAL("\r\n")
});
}
int nats_publish(const struct nats *nats,
const char *subject, size_t subject_len,
const char *reply_to, size_t reply_to_len,
const char *payload, size_t payload_len)
{
char payload_len_str[3 * sizeof(size_t)];
int ret;
if (!is_subject_valid(subject, subject_len)) {
return -EINVAL;
}
ret = snprintk(payload_len_str, sizeof(payload_len_str), "%zu",
payload_len);
if (ret < 0 || ret >= (int)sizeof(payload_len_str)) {
return -ENOMEM;
}
if (reply_to) {
return transmitv(nats->conn, 9, (struct io_vec[]) {
TRANSMITV_LITERAL("PUB "),
{
.base = subject,
.len = subject_len ?
subject_len : strlen(subject)
},
TRANSMITV_LITERAL(" "),
{
.base = reply_to,
.len = reply_to_len ?
reply_to_len : strlen(reply_to)
},
TRANSMITV_LITERAL(" "),
{ .base = payload_len_str, .len = ret },
TRANSMITV_LITERAL("\r\n"),
{ .base = payload, .len = payload_len },
TRANSMITV_LITERAL("\r\n"),
});
}
return transmitv(nats->conn, 7, (struct io_vec[]) {
TRANSMITV_LITERAL("PUB "),
{
.base = subject,
.len = subject_len ? subject_len : strlen(subject)
},
TRANSMITV_LITERAL(" "),
{ .base = payload_len_str, .len = ret },
TRANSMITV_LITERAL("\r\n"),
{ .base = payload, .len = payload_len },
TRANSMITV_LITERAL("\r\n"),
});
}
static void receive_cb(struct net_context *ctx,
struct net_pkt *pkt,
union net_ip_header *ip_hdr,
union net_proto_header *proto_hdr,
int status,
void *user_data)
{
struct nats *nats = user_data;
char cmd_buf[CMD_BUF_LEN];
struct net_buf *tmp;
u16_t pos = 0U, cmd_len = 0U;
size_t len;
u8_t *end_of_line;
if (!pkt) {
/* FIXME: How to handle disconnection? */
return;
}
if (status) {
/* FIXME: How to handle connectio error? */
net_pkt_unref(pkt);
return;
}
tmp = pkt->cursor.buf;
if (!tmp) {
net_pkt_unref(pkt);
return;
}
pos = pkt->cursor.pos - tmp->data;
while (tmp) {
len = tmp->len - pos;
end_of_line = memchr((u8_t *)tmp->data + pos, '\n', len);
if (end_of_line) {
len = end_of_line - ((u8_t *)tmp->data + pos);
}
if (cmd_len + len > sizeof(cmd_buf)) {
break;
}
if (net_pkt_read(pkt, (u8_t *)(cmd_buf + cmd_len), len)) {
break;
}
cmd_len += len;
if (end_of_line) {
u8_t dummy;
int ret;
if (net_pkt_read_u8(pkt, &dummy)) {
break;
}
cmd_buf[cmd_len] = '\0';
ret = handle_server_cmd(nats, cmd_buf, cmd_len,
tmp, pos);
if (ret < 0) {
/* FIXME: What to do with unhandled messages? */
break;
}
cmd_len = 0U;
}
tmp = pkt->cursor.buf;
pos = pkt->cursor.pos - tmp->data;
}
net_pkt_unref(pkt);
}
int nats_connect(struct nats *nats, struct sockaddr *addr, socklen_t addrlen)
{
int ret;
ret = net_context_connect(nats->conn, addr, addrlen,
NULL, K_FOREVER, NULL);
if (ret < 0) {
return ret;
}
return net_context_recv(nats->conn, receive_cb, K_NO_WAIT, nats);
}
int nats_disconnect(struct nats *nats)
{
int ret;
ret = net_context_put(nats->conn);
if (ret < 0) {
return ret;
}
nats->conn = NULL;
return 0;
}

View file

@ -1,47 +0,0 @@
/*
* Copyright (c) 2017 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef __NATS_H
#define __NATS_H
#include <zephyr/types.h>
#include <net/net_context.h>
struct nats_msg {
const char *subject;
const char *sid;
const char *reply_to;
const char *payload;
size_t payload_len;
};
struct nats {
struct net_context *conn;
int (*on_auth_required)(const struct nats *nats,
char *user, size_t *user_len,
char *pass, size_t *pass_len);
int (*on_message)(const struct nats *nats,
const struct nats_msg *msg);
};
int nats_connect(struct nats *nats, struct sockaddr *addr, socklen_t addrlen);
int nats_disconnect(struct nats *nats);
int nats_subscribe(const struct nats *nats,
const char *subject, size_t subject_len,
const char *queue_group, size_t queue_group_len,
const char *sid, size_t sid_len);
int nats_unsubscribe(const struct nats *nats,
const char *sid, size_t sid_len,
size_t max_msgs);
int nats_publish(const struct nats *nats,
const char *subject, size_t subject_len,
const char *reply_to, size_t reply_to_len,
const char *payload, size_t payload_len);
#endif /* __NATS_H */