diff --git a/include/net/mqtt.h b/include/net/mqtt.h index 0e49f1ff222..8105b638747 100644 --- a/include/net/mqtt.h +++ b/include/net/mqtt.h @@ -9,6 +9,7 @@ #include #include +#include #ifdef __cplusplus extern "C" { @@ -61,11 +62,15 @@ enum mqtt_app { * the state of the received and sent messages. */ struct mqtt_ctx { - /** IP stack context structure */ - struct net_context *net_ctx; - /** Network timeout for tx and rx routines */ + /** Net app context structure */ + struct net_app_ctx net_app_ctx; + s32_t net_init_timeout; s32_t net_timeout; + /** Connectivity */ + char *peer_addr_str; + u16_t peer_port; + /** Callback executed when a MQTT CONNACK msg is received and validated. * If this function pointer is not used, must be set to NULL. */ @@ -176,6 +181,23 @@ struct mqtt_ctx { */ int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type); +/** + * Release the MQTT context structure + * + * @param ctx MQTT context structure + * @retval 0 on success, and <0 if error + */ +int mqtt_close(struct mqtt_ctx *ctx); + +/** + * Connect to an MQTT broker + * + * @param ctx MQTT context structure + * @retval 0 on success, and <0 if error + */ + +int mqtt_connect(struct mqtt_ctx *ctx); + /** * Sends the MQTT CONNECT message * diff --git a/samples/net/mqtt_publisher/src/config.h b/samples/net/mqtt_publisher/src/config.h index 6f7e448a658..2bfa295732e 100644 --- a/samples/net/mqtt_publisher/src/config.h +++ b/samples/net/mqtt_publisher/src/config.h @@ -29,6 +29,7 @@ #define APP_SLEEP_MSECS 500 #define APP_TX_RX_TIMEOUT 300 +#define APP_NET_INIT_TIMEOUT 10000 #define APP_CONNECT_TRIES 10 diff --git a/samples/net/mqtt_publisher/src/main.c b/samples/net/mqtt_publisher/src/main.c index 051849f7d1d..c85d643a469 100644 --- a/samples/net/mqtt_publisher/src/main.c +++ b/samples/net/mqtt_publisher/src/main.c @@ -61,15 +61,11 @@ struct mqtt_client_ctx { void *publish_data; }; -/* This is the network context structure. */ -static struct net_context *net_ctx; - /* The mqtt client struct */ static struct mqtt_client_ctx client_ctx; /* This routine sets some basic properties for the network context variable */ -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port); +static int network_setup(void); /* The signature of this routine must match the connect callback declared at * the mqtt.h header. @@ -243,45 +239,24 @@ static void publisher(void) { int i, rc; - /* The net_ctx variable must be ready BEFORE passing it to the MQTT API. - */ - for (i = 0; i < CONN_TRIES; i++) { - rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR, - SERVER_PORT); - if (!rc) { - goto connected; - } - } - - PRINT_RESULT("network_setup", rc); - goto exit_app; - -connected: /* Set everything to 0 and later just assign the required fields. */ memset(&client_ctx, 0x00, sizeof(client_ctx)); - /* The network context is the only field that must be set BEFORE - * calling the mqtt_init routine. - */ - client_ctx.mqtt_ctx.net_ctx = net_ctx; - /* connect, disconnect and malformed may be set to NULL */ client_ctx.mqtt_ctx.connect = connect_cb; client_ctx.mqtt_ctx.disconnect = disconnect_cb; client_ctx.mqtt_ctx.malformed = malformed_cb; + client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT; client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT; + client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR; + client_ctx.mqtt_ctx.peer_port = SERVER_PORT; + /* Publisher apps TX the MQTT PUBLISH msg */ client_ctx.mqtt_ctx.publish_tx = publish_cb; - rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER); - PRINT_RESULT("mqtt_init", rc); - if (rc != 0) { - goto exit_app; - } - /* The connect message will be sent to the MQTT server (broker). * If clean_session here is 0, the mqtt_ctx clean_session variable * will be set to 0 also. Please don't do that, set always to 1. @@ -295,6 +270,30 @@ connected: client_ctx.disconnect_data = "DISCONNECTED"; client_ctx.publish_data = "PUBLISH"; + rc = network_setup(); + PRINT_RESULT("network_setup", rc); + if (rc < 0) { + return; + } + + rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER); + PRINT_RESULT("mqtt_init", rc); + if (rc != 0) { + return; + } + + for (i = 0; i < CONN_TRIES; i++) { + rc = mqtt_connect(&client_ctx.mqtt_ctx); + PRINT_RESULT("mqtt_connect", rc); + if (!rc) { + goto connected; + } + } + + goto exit_app; + +connected: + rc = try_to_connect(&client_ctx); PRINT_RESULT("try_to_connect", rc); if (rc != 0) { @@ -327,34 +326,12 @@ connected: PRINT_RESULT("mqtt_tx_disconnect", rc); exit_app: - net_context_put(net_ctx); + + mqtt_close(&client_ctx.mqtt_ctx); + printk("\nBye!\n"); } -static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port) -{ - void *ptr; - int rc; - -#ifdef CONFIG_NET_IPV6 - net_sin6(sock_addr)->sin6_port = htons(port); - sock_addr->family = AF_INET6; - ptr = &(net_sin6(sock_addr)->sin6_addr); - rc = net_addr_pton(AF_INET6, addr, ptr); -#else - net_sin(sock_addr)->sin_port = htons(port); - sock_addr->family = AF_INET; - ptr = &(net_sin(sock_addr)->sin_addr); - rc = net_addr_pton(AF_INET, addr, ptr); -#endif - - if (rc) { - printk("Invalid IP address: %s\n", addr); - } - - return rc; -} - #if defined(CONFIG_NET_L2_BLUETOOTH) static bool bt_connected; @@ -378,24 +355,13 @@ struct bt_conn_cb bt_conn_cb = { }; #endif -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port) +static int network_setup(void) { -#ifdef CONFIG_NET_IPV6 - socklen_t addr_len = sizeof(struct sockaddr_in6); - sa_family_t family = AF_INET6; - -#else - socklen_t addr_len = sizeof(struct sockaddr_in); - sa_family_t family = AF_INET; -#endif - struct sockaddr server_sock, local_sock; - void *p; - int rc; #if defined(CONFIG_NET_L2_BLUETOOTH) const char *progress_mark = "/-\\|"; int i = 0; + int rc; rc = bt_enable(NULL); if (rc) { @@ -420,57 +386,7 @@ static int network_setup(struct net_context **net_ctx, const char *local_addr, printk("\n"); #endif - rc = set_addr(&local_sock, local_addr, 0); - if (rc) { - printk("set_addr (local) error\n"); - return rc; - } - -#ifdef CONFIG_NET_IPV6 - p = net_if_ipv6_addr_add(net_if_get_default(), - &net_sin6(&local_sock)->sin6_addr, - NET_ADDR_MANUAL, 0); -#else - p = net_if_ipv4_addr_add(net_if_get_default(), - &net_sin(&local_sock)->sin_addr, - NET_ADDR_MANUAL, 0); -#endif - - if (!p) { - return -EINVAL; - } - - rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx); - if (rc) { - printk("net_context_get error\n"); - return rc; - } - - rc = net_context_bind(*net_ctx, &local_sock, addr_len); - if (rc) { - printk("net_context_bind error\n"); - goto lb_exit; - } - - rc = set_addr(&server_sock, server_addr, server_port); - if (rc) { - printk("set_addr (server) error\n"); - goto lb_exit; - } - - rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL, - APP_SLEEP_MSECS, NULL); - if (rc) { - printk("net_context_connect error\n"); - goto lb_exit; - } - return 0; - -lb_exit: - net_context_put(*net_ctx); - - return rc; } void main(void) diff --git a/subsys/net/lib/mqtt/Kconfig b/subsys/net/lib/mqtt/Kconfig index a22f03fcb9f..a5fc60f17ca 100644 --- a/subsys/net/lib/mqtt/Kconfig +++ b/subsys/net/lib/mqtt/Kconfig @@ -9,6 +9,7 @@ config MQTT_LIB bool "MQTT Library Support" default n + select NET_APP_CLIENT help Enable the Zephyr MQTT Library diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index bbfb698316b..5f960e09101 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -42,7 +43,8 @@ int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg) goto exit_connect; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { rc = -ENOMEM; goto exit_connect; @@ -51,7 +53,8 @@ int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg) net_pkt_frag_add(tx, data); data = NULL; - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { net_pkt_unref(tx); } @@ -79,7 +82,8 @@ int mqtt_tx_disconnect(struct mqtt_ctx *ctx) return -EINVAL; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { return -ENOMEM; } @@ -90,7 +94,8 @@ int mqtt_tx_disconnect(struct mqtt_ctx *ctx) goto exit_disconnect; } - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { goto exit_disconnect; } @@ -152,7 +157,8 @@ int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, u16_t id, return -EINVAL; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { return -ENOMEM; } @@ -163,7 +169,8 @@ int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, u16_t id, goto exit_send; } - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { goto exit_send; } @@ -215,7 +222,8 @@ int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg) goto exit_publish; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { rc = -ENOMEM; goto exit_publish; @@ -224,7 +232,8 @@ int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg) net_pkt_frag_add(tx, data); data = NULL; - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { net_pkt_unref(tx); } @@ -251,7 +260,8 @@ int mqtt_tx_pingreq(struct mqtt_ctx *ctx) return -EINVAL; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { return -ENOMEM; } @@ -262,7 +272,8 @@ int mqtt_tx_pingreq(struct mqtt_ctx *ctx) goto exit_pingreq; } - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { goto exit_pingreq; } @@ -296,7 +307,8 @@ int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items, goto exit_subs; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { rc = -ENOMEM; goto exit_subs; @@ -305,7 +317,8 @@ int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items, net_pkt_frag_add(tx, data); data = NULL; - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { net_pkt_unref(tx); } @@ -339,7 +352,8 @@ int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items, goto exit_unsub; } - tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout); + tx = net_app_get_net_pkt(&ctx->net_app_ctx, + AF_UNSPEC, ctx->net_timeout); if (tx == NULL) { rc = -ENOMEM; goto exit_unsub; @@ -348,7 +362,8 @@ int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items, net_pkt_frag_add(tx, data); data = NULL; - rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL); + rc = net_app_send_pkt(&ctx->net_app_ctx, + tx, NULL, 0, ctx->net_timeout, NULL); if (rc < 0) { net_pkt_unref(tx); } @@ -743,13 +758,13 @@ int mqtt_parser(struct mqtt_ctx *ctx, struct net_pkt *rx) } static -void mqtt_recv(struct net_context *net_ctx, struct net_pkt *pkt, int status, +void mqtt_recv(struct net_app_ctx *ctx, struct net_pkt *pkt, int status, void *data) { struct mqtt_ctx *mqtt = (struct mqtt_ctx *)data; /* net_ctx is already referenced to by the mqtt_ctx struct */ - ARG_UNUSED(net_ctx); + ARG_UNUSED(ctx); if (status || !pkt) { return; @@ -765,6 +780,49 @@ lb_exit: net_pkt_unref(pkt); } +int mqtt_connect(struct mqtt_ctx *ctx) +{ + int rc = 0; + + if (!ctx) { + return -EFAULT; + } + + rc = net_app_init_tcp_client(&ctx->net_app_ctx, + NULL, + NULL, + ctx->peer_addr_str, + ctx->peer_port, + ctx->net_init_timeout, + ctx); + if (rc < 0) { + goto error_connect; + } + + rc = net_app_set_cb(&ctx->net_app_ctx, + NULL, + mqtt_recv, + NULL, + NULL); + if (rc < 0) { + goto error_connect; + } + + rc = net_app_connect(&ctx->net_app_ctx, ctx->net_timeout); + if (rc < 0) { + goto error_connect; + } + + return rc; + +error_connect: + /* clean net app context, so mqtt_connect() can be called repeatedly */ + net_app_close(&ctx->net_app_ctx); + net_app_release(&ctx->net_app_ctx); + + return rc; +} + int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type) { /* So far, only clean session = 1 is supported */ @@ -774,10 +832,19 @@ int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type) ctx->app_type = app_type; ctx->rcv = mqtt_parser; - /* Install the receiver callback, timeout is set to K_NO_WAIT. - * In this case, no return code is evaluated. - */ - (void)net_context_recv(ctx->net_ctx, mqtt_recv, K_NO_WAIT, ctx); + return 0; +} + +int mqtt_close(struct mqtt_ctx *ctx) +{ + if (!ctx) { + return -EFAULT; + } + + if (ctx->net_app_ctx.is_init) { + net_app_close(&ctx->net_app_ctx); + net_app_release(&ctx->net_app_ctx); + } return 0; } diff --git a/tests/net/lib/mqtt_publisher/src/config.h b/tests/net/lib/mqtt_publisher/src/config.h index f9f96c085cf..d8e0f9923be 100644 --- a/tests/net/lib/mqtt_publisher/src/config.h +++ b/tests/net/lib/mqtt_publisher/src/config.h @@ -29,6 +29,7 @@ #define APP_SLEEP_MSECS 500 #define APP_TX_RX_TIMEOUT 300 +#define APP_NET_INIT_TIMEOUT 1000 #define APP_CONNECT_TRIES 10 diff --git a/tests/net/lib/mqtt_publisher/src/test_mqtt_publish.c b/tests/net/lib/mqtt_publisher/src/test_mqtt_publish.c index 2d9129e87a5..b6a2c419ba0 100644 --- a/tests/net/lib/mqtt_publisher/src/test_mqtt_publish.c +++ b/tests/net/lib/mqtt_publisher/src/test_mqtt_publish.c @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -56,16 +57,9 @@ struct mqtt_client_ctx { /* This is mqtt payload message. */ char payload[] = "DOORS:OPEN_QoSx"; -/* This is the network context structure. */ -static struct net_context *net_ctx; - /* The mqtt client struct */ static struct mqtt_client_ctx client_ctx; -/* This routine sets some basic properties for the network context variable */ -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port); - /* The signature of this routine must match the connect callback declared at * the mqtt.h header. */ @@ -223,37 +217,24 @@ static int init_network(void) { int rc; - /* The net_ctx variable must be ready BEFORE passing it to the MQTT API. - */ - rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR, SERVER_PORT); - if (rc != 0) { - goto exit_app; - } - /* Set everything to 0 and later just assign the required fields. */ memset(&client_ctx, 0x00, sizeof(client_ctx)); - /* The network context is the only field that must be set BEFORE - * calling the mqtt_init routine. - */ - client_ctx.mqtt_ctx.net_ctx = net_ctx; - /* connect, disconnect and malformed may be set to NULL */ client_ctx.mqtt_ctx.connect = connect_cb; client_ctx.mqtt_ctx.disconnect = disconnect_cb; client_ctx.mqtt_ctx.malformed = malformed_cb; + client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT; client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT; + client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR; + client_ctx.mqtt_ctx.peer_port = SERVER_PORT; + /* Publisher apps TX the MQTT PUBLISH msg */ client_ctx.mqtt_ctx.publish_tx = publish_cb; - rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER); - if (rc != 0) { - goto exit_app; - } - /* The connect message will be sent to the MQTT server (broker). * If clean_session here is 0, the mqtt_ctx clean_session variable * will be set to 0 also. Please don't do that, set always to 1. @@ -267,10 +248,20 @@ static int init_network(void) client_ctx.disconnect_data = "DISCONNECTED"; client_ctx.publish_data = "PUBLISH"; + rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_PUBLISHER); + if (rc != 0) { + goto exit_app; + } + + rc = mqtt_connect(&client_ctx.mqtt_ctx); + if (!rc) { + goto exit_app; + } + return TC_PASS; exit_app: - net_context_put(net_ctx); + mqtt_close(&client_ctx.mqtt_ctx); return TC_FAIL; } @@ -327,99 +318,6 @@ static int test_disconnect(void) return TC_PASS; } -static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port) -{ - void *ptr; - int rc; - -#ifdef CONFIG_NET_IPV6 - net_sin6(sock_addr)->sin6_port = htons(port); - sock_addr->family = AF_INET6; - ptr = &(net_sin6(sock_addr)->sin6_addr); - rc = net_addr_pton(AF_INET6, addr, ptr); -#else - net_sin(sock_addr)->sin_port = htons(port); - sock_addr->family = AF_INET; - ptr = &(net_sin(sock_addr)->sin_addr); - rc = net_addr_pton(AF_INET, addr, ptr); -#endif - - if (rc) { - TC_PRINT("Invalid IP address: %s\n", addr); - } - - return rc; -} - -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port) -{ -#ifdef CONFIG_NET_IPV6 - socklen_t addr_len = sizeof(struct sockaddr_in6); - sa_family_t family = AF_INET6; - -#else - socklen_t addr_len = sizeof(struct sockaddr_in); - sa_family_t family = AF_INET; -#endif - struct sockaddr server_sock, local_sock; - void *p; - int rc; - - rc = set_addr(&local_sock, local_addr, 0); - if (rc) { - TC_PRINT("set_addr (local) error\n"); - return TC_FAIL; - } - -#ifdef CONFIG_NET_IPV6 - p = net_if_ipv6_addr_add(net_if_get_default(), - &net_sin6(&local_sock)->sin6_addr, - NET_ADDR_MANUAL, 0); -#else - p = net_if_ipv4_addr_add(net_if_get_default(), - &net_sin(&local_sock)->sin_addr, - NET_ADDR_MANUAL, 0); -#endif - - if (!p) { - return TC_FAIL; - } - - rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx); - if (rc) { - TC_PRINT("net_context_get error\n"); - return TC_FAIL; - } - - rc = net_context_bind(*net_ctx, &local_sock, addr_len); - if (rc) { - TC_PRINT("net_context_bind error\n"); - goto lb_exit; - } - - rc = set_addr(&server_sock, server_addr, server_port); - if (rc) { - TC_PRINT("set_addr (server) error\n"); - goto lb_exit; - } - - rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL, - APP_SLEEP_MSECS, NULL); - if (rc) { - TC_PRINT("net_context_connect error\n" - "Is the server (broker) up and running?\n"); - goto lb_exit; - } - - return TC_PASS; - -lb_exit: - net_context_put(*net_ctx); - - return TC_FAIL; -} - void test_mqtt_init(void) { zassert_true(init_network() == TC_PASS, NULL); diff --git a/tests/net/lib/mqtt_subscriber/src/config.h b/tests/net/lib/mqtt_subscriber/src/config.h index f9f96c085cf..d8e0f9923be 100644 --- a/tests/net/lib/mqtt_subscriber/src/config.h +++ b/tests/net/lib/mqtt_subscriber/src/config.h @@ -29,6 +29,7 @@ #define APP_SLEEP_MSECS 500 #define APP_TX_RX_TIMEOUT 300 +#define APP_NET_INIT_TIMEOUT 1000 #define APP_CONNECT_TRIES 10 diff --git a/tests/net/lib/mqtt_subscriber/src/test_mqtt_subscribe.c b/tests/net/lib/mqtt_subscriber/src/test_mqtt_subscribe.c index 1b923a71e1f..cc9ce90f263 100644 --- a/tests/net/lib/mqtt_subscriber/src/test_mqtt_subscribe.c +++ b/tests/net/lib/mqtt_subscriber/src/test_mqtt_subscribe.c @@ -60,16 +60,9 @@ struct mqtt_client_ctx { void *unsubscribe_data; }; -/* This is the network context structure. */ -static struct net_context *net_ctx; - /* The mqtt client struct */ static struct mqtt_client_ctx client_ctx; -/* This routine sets some basic properties for the network context variable */ -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port); - /* The signature of this routine must match the connect callback declared at * the mqtt.h header. */ @@ -249,21 +242,9 @@ static int init_network(void) { int rc; - /* The net_ctx variable must be ready BEFORE passing it to the MQTT API. - */ - rc = network_setup(&net_ctx, ZEPHYR_ADDR, SERVER_ADDR, SERVER_PORT); - if (rc != 0) { - goto exit_app; - } - /* Set everything to 0 and later just assign the required fields. */ memset(&client_ctx, 0x00, sizeof(client_ctx)); - /* The network context is the only field that must be set BEFORE - * calling the mqtt_init routine. - */ - client_ctx.mqtt_ctx.net_ctx = net_ctx; - /* connect, disconnect and malformed may be set to NULL */ client_ctx.mqtt_ctx.connect = connect_cb; @@ -275,8 +256,14 @@ static int init_network(void) client_ctx.mqtt_ctx.unsubscribe = unsubscribe_cb; + client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT; + client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT; + client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR; + + client_ctx.mqtt_ctx.peer_port = SERVER_PORT; + /* Publisher apps TX the MQTT PUBLISH msg */ client_ctx.mqtt_ctx.publish_rx = publish_rx_cb; @@ -299,10 +286,20 @@ static int init_network(void) client_ctx.subscribe_data = "SUBSCRIBE"; client_ctx.unsubscribe_data = "UNSUBSCRIBE"; + rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_SUBSCRIBER); + if (rc != 0) { + goto exit_app; + } + + rc = mqtt_connect(&client_ctx.mqtt_ctx); + if (!rc) { + goto exit_app; + } + return TC_PASS; exit_app: - net_context_put(net_ctx); + mqtt_close(&client_ctx.mqtt_ctx); return TC_FAIL; } @@ -364,99 +361,6 @@ static int test_disconnect(void) return TC_PASS; } -static int set_addr(struct sockaddr *sock_addr, const char *addr, u16_t port) -{ - void *ptr; - int rc; - -#ifdef CONFIG_NET_IPV6 - net_sin6(sock_addr)->sin6_port = htons(port); - sock_addr->family = AF_INET6; - ptr = &(net_sin6(sock_addr)->sin6_addr); - rc = net_addr_pton(AF_INET6, addr, ptr); -#else - net_sin(sock_addr)->sin_port = htons(port); - sock_addr->family = AF_INET; - ptr = &(net_sin(sock_addr)->sin_addr); - rc = net_addr_pton(AF_INET, addr, ptr); -#endif - - if (rc) { - printk("Invalid IP address: %s\n", addr); - } - - return rc; -} - -static int network_setup(struct net_context **net_ctx, const char *local_addr, - const char *server_addr, u16_t server_port) -{ -#ifdef CONFIG_NET_IPV6 - socklen_t addr_len = sizeof(struct sockaddr_in6); - sa_family_t family = AF_INET6; - -#else - socklen_t addr_len = sizeof(struct sockaddr_in); - sa_family_t family = AF_INET; -#endif - struct sockaddr server_sock, local_sock; - void *p; - int rc; - - rc = set_addr(&local_sock, local_addr, 0); - if (rc) { - printk("set_addr (local) error\n"); - return rc; - } - -#ifdef CONFIG_NET_IPV6 - p = net_if_ipv6_addr_add(net_if_get_default(), - &net_sin6(&local_sock)->sin6_addr, - NET_ADDR_MANUAL, 0); -#else - p = net_if_ipv4_addr_add(net_if_get_default(), - &net_sin(&local_sock)->sin_addr, - NET_ADDR_MANUAL, 0); -#endif - - if (!p) { - return -EINVAL; - } - - rc = net_context_get(family, SOCK_STREAM, IPPROTO_TCP, net_ctx); - if (rc) { - printk("net_context_get error\n"); - return rc; - } - - rc = net_context_bind(*net_ctx, &local_sock, addr_len); - if (rc) { - printk("net_context_bind error\n"); - goto lb_exit; - } - - rc = set_addr(&server_sock, server_addr, server_port); - if (rc) { - printk("set_addr (server) error\n"); - goto lb_exit; - } - - rc = net_context_connect(*net_ctx, &server_sock, addr_len, NULL, - APP_SLEEP_MSECS, NULL); - if (rc) { - printk("net_context_connect error\n" - "Is the server (broker) up and running?\n"); - goto lb_exit; - } - - return 0; - -lb_exit: - net_context_put(*net_ctx); - - return rc; -} - void test_mqtt_init(void) { zassert_true(init_network() == TC_PASS, NULL);