From 87ca1c332995e7406718ce78e36b5ced120ae80a Mon Sep 17 00:00:00 2001 From: Robert Lubos Date: Tue, 29 Nov 2022 17:08:40 +0100 Subject: [PATCH] net: zperf: Make UDP/TCP servers restartable Make the TCP/UDP server functionality restartable. Provide a public API to stop the TCP/UDP server. Signed-off-by: Robert Lubos --- include/zephyr/net/zperf.h | 14 ++++ subsys/net/lib/zperf/zperf_common.c | 2 + subsys/net/lib/zperf/zperf_internal.h | 5 +- subsys/net/lib/zperf/zperf_shell.c | 12 --- subsys/net/lib/zperf/zperf_tcp_receiver.c | 97 ++++++++++++++--------- subsys/net/lib/zperf/zperf_udp_receiver.c | 80 ++++++++++++++----- 6 files changed, 138 insertions(+), 72 deletions(-) diff --git a/include/zephyr/net/zperf.h b/include/zephyr/net/zperf.h index e008db83b37..f93fe8e7e34 100644 --- a/include/zephyr/net/zperf.h +++ b/include/zephyr/net/zperf.h @@ -149,6 +149,20 @@ int zperf_udp_download(const struct zperf_download_params *param, int zperf_tcp_download(const struct zperf_download_params *param, zperf_callback callback, void *user_data); +/** + * @brief Stop UDP server. + * + * @return 0 if server was stopped successfully, a negative error code otherwise. + */ +int zperf_udp_download_stop(void); + +/** + * @brief Stop TCP server. + * + * @return 0 if server was stopped successfully, a negative error code otherwise. + */ +int zperf_tcp_download_stop(void); + #ifdef __cplusplus } #endif diff --git a/subsys/net/lib/zperf/zperf_common.c b/subsys/net/lib/zperf/zperf_common.c index 1890f95d1de..f65b2c60898 100644 --- a/subsys/net/lib/zperf/zperf_common.c +++ b/subsys/net/lib/zperf/zperf_common.c @@ -114,6 +114,8 @@ static int zperf_init(const struct device *unused) zperf_udp_uploader_init(); zperf_tcp_uploader_init(); + zperf_udp_receiver_init(); + zperf_tcp_receiver_init(); return 0; } diff --git a/subsys/net/lib/zperf/zperf_internal.h b/subsys/net/lib/zperf/zperf_internal.h index 1f8223a42c6..102443deb65 100644 --- a/subsys/net/lib/zperf/zperf_internal.h +++ b/subsys/net/lib/zperf/zperf_internal.h @@ -92,9 +92,6 @@ extern void connect_ap(char *ssid); const struct in_addr *zperf_get_default_if_in4_addr(void); const struct in6_addr *zperf_get_default_if_in6_addr(void); -void zperf_tcp_stopped(void); -void zperf_tcp_started(void); - int zperf_prepare_upload_sock(const struct sockaddr *peer_addr, int tos, int proto); @@ -103,5 +100,7 @@ uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps); void zperf_async_work_submit(struct k_work *work); void zperf_udp_uploader_init(void); void zperf_tcp_uploader_init(void); +void zperf_udp_receiver_init(void); +void zperf_tcp_receiver_init(void); #endif /* __ZPERF_INTERNAL_H */ diff --git a/subsys/net/lib/zperf/zperf_shell.c b/subsys/net/lib/zperf/zperf_shell.c index 6834deb3c2a..455e3e3ae5d 100644 --- a/subsys/net/lib/zperf/zperf_shell.c +++ b/subsys/net/lib/zperf/zperf_shell.c @@ -1012,18 +1012,6 @@ static int cmd_connectap(const struct shell *sh, size_t argc, char *argv[]) return 0; } -static bool tcp_running; - -void zperf_tcp_stopped(void) -{ - tcp_running = false; -} - -void zperf_tcp_started(void) -{ - tcp_running = true; -} - static void tcp_session_cb(enum zperf_status status, struct zperf_results *result, void *user_data) diff --git a/subsys/net/lib/zperf/zperf_tcp_receiver.c b/subsys/net/lib/zperf/zperf_tcp_receiver.c index 309f11646dd..4dd53da8284 100644 --- a/subsys/net/lib/zperf/zperf_tcp_receiver.c +++ b/subsys/net/lib/zperf/zperf_tcp_receiver.c @@ -25,8 +25,6 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL); static struct sockaddr_in6 *in6_addr_my; static struct sockaddr_in *in4_addr_my; -static bool init_done; - #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE) #define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8) #else @@ -42,6 +40,7 @@ static bool init_done; #define SOCK_ID_MAX 4 #define TCP_RECEIVER_BUF_SIZE 1500 +#define POLL_TIMEOUT_MS 100 static K_THREAD_STACK_DEFINE(tcp_receiver_stack_area, TCP_RECEIVER_STACK_SIZE); static struct k_thread tcp_receiver_thread_data; @@ -49,6 +48,9 @@ static struct k_thread tcp_receiver_thread_data; static zperf_callback tcp_session_cb; static void *tcp_user_data; static bool tcp_server_running; +static bool tcp_server_stop; +static uint16_t tcp_server_port; +static K_SEM_DEFINE(tcp_server_run, 0, 1); static void tcp_received(int sock, size_t datalen) { @@ -95,8 +97,6 @@ static void tcp_received(int sock, size_t datalen) tcp_user_data); } - zperf_tcp_stopped(); - session->state = STATE_NULL; } @@ -109,13 +109,9 @@ static void tcp_received(int sock, size_t datalen) } } -void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) +static void tcp_server_session(void) { - ARG_UNUSED(ptr1); - ARG_UNUSED(ptr3); - static uint8_t buf[TCP_RECEIVER_BUF_SIZE]; - int port = POINTER_TO_INT(ptr2); struct zsock_pollfd fds[SOCK_ID_MAX] = { 0 }; int ret; @@ -132,7 +128,7 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) IPPROTO_TCP); if (fds[SOCK_ID_IPV4_LISTEN].fd < 0) { NET_ERR("Cannot create IPv4 network socket."); - goto cleanup; + goto error; } if (MY_IP4ADDR && strlen(MY_IP4ADDR)) { @@ -149,13 +145,13 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) in4_addr = zperf_get_default_if_in4_addr(); if (!in4_addr) { NET_ERR("Unable to get IPv4 by default"); - goto cleanup; + goto error; } memcpy(&in4_addr_my->sin_addr, in4_addr, sizeof(struct in_addr)); } - in4_addr_my->sin_port = htons(port); + in4_addr_my->sin_port = htons(tcp_server_port); NET_INFO("Binding to %s", net_sprint_ipv4_addr(&in4_addr_my->sin_addr)); @@ -166,13 +162,13 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) if (ret < 0) { NET_ERR("Cannot bind IPv4 UDP port %d (%d)", ntohs(in4_addr_my->sin_port), errno); - goto cleanup; + goto error; } ret = zsock_listen(fds[SOCK_ID_IPV4_LISTEN].fd, 1); if (ret < 0) { NET_ERR("Cannot listen IPv4 TCP (%d)", errno); - goto cleanup; + goto error; } fds[SOCK_ID_IPV4_LISTEN].events = ZSOCK_POLLIN; @@ -187,7 +183,7 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) IPPROTO_TCP); if (fds[SOCK_ID_IPV6_LISTEN].fd < 0) { NET_ERR("Cannot create IPv6 network socket."); - goto cleanup; + goto error; } if (MY_IP6ADDR && strlen(MY_IP6ADDR)) { @@ -205,13 +201,13 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) in6_addr = zperf_get_default_if_in6_addr(); if (!in6_addr) { NET_ERR("Unable to get IPv4 by default"); - goto cleanup; + goto error; } memcpy(&in6_addr_my->sin6_addr, in6_addr, sizeof(struct in6_addr)); } - in6_addr_my->sin6_port = htons(port); + in6_addr_my->sin6_port = htons(tcp_server_port); NET_INFO("Binding to %s", net_sprint_ipv6_addr(&in6_addr_my->sin6_addr)); @@ -222,31 +218,35 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) if (ret < 0) { NET_ERR("Cannot bind IPv6 UDP port %d (%d)", ntohs(in6_addr_my->sin6_port), errno); - goto cleanup; + goto error; } ret = zsock_listen(fds[SOCK_ID_IPV6_LISTEN].fd, 1); if (ret < 0) { NET_ERR("Cannot listen IPv6 TCP (%d)", errno); - goto cleanup; + goto error; } fds[SOCK_ID_IPV6_LISTEN].events = ZSOCK_POLLIN; } - NET_INFO("Listening on port %d", port); - - /* TODO Investigate started/stopped logic */ - zperf_tcp_started(); - init_done = true; + NET_INFO("Listening on port %d", tcp_server_port); while (true) { - ret = zsock_poll(fds, ARRAY_SIZE(fds), -1); + ret = zsock_poll(fds, ARRAY_SIZE(fds), POLL_TIMEOUT_MS); if (ret < 0) { NET_ERR("TCP receiver poll error (%d)", errno); + goto error; + } + + if (tcp_server_stop) { goto cleanup; } + if (ret == 0) { + continue; + } + for (int i = 0; i < ARRAY_SIZE(fds); i++) { struct sockaddr addr; socklen_t addrlen = sizeof(addr); @@ -255,7 +255,7 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) (fds[i].revents & ZSOCK_POLLNVAL)) { NET_ERR("TCP receiver IPv%d socket error", (i <= SOCK_ID_IPV4_DATA) ? 4 : 6); - goto cleanup; + goto error; } if (!(fds[i].revents & ZSOCK_POLLIN)) { @@ -271,7 +271,7 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) if (sock < 0) { NET_ERR("TCP receiver IPv%d accept error", (i <= SOCK_ID_IPV4_DATA) ? 4 : 6); - goto cleanup; + goto error; } if (i == SOCK_ID_IPV4_LISTEN && @@ -298,7 +298,7 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) NET_ERR("recv failed on IPv%d socket (%d)", (i <= SOCK_ID_IPV4_DATA) ? 4 : 6, errno); - goto cleanup; + goto error; } tcp_received(fds[i].fd, ret); @@ -313,12 +313,13 @@ void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) } } -cleanup: +error: if (tcp_session_cb != NULL) { tcp_session_cb(ZPERF_SESSION_ERROR, NULL, tcp_user_data); } +cleanup: for (int i = 0; i < ARRAY_SIZE(fds); i++) { if (fds[i].fd >= 0) { zsock_close(fds[i].fd); @@ -326,18 +327,28 @@ cleanup: } } -void zperf_tcp_receiver_init(int port) +void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) { - if (init_done) { - zperf_tcp_started(); - return; - } + ARG_UNUSED(ptr1); + ARG_UNUSED(ptr2); + ARG_UNUSED(ptr3); + while (true) { + k_sem_take(&tcp_server_run, K_FOREVER); + + tcp_server_session(); + + tcp_server_running = false; + } +} + +void zperf_tcp_receiver_init(void) +{ k_thread_create(&tcp_receiver_thread_data, tcp_receiver_stack_area, K_THREAD_STACK_SIZEOF(tcp_receiver_stack_area), tcp_receiver_thread, - NULL, INT_TO_POINTER(port), NULL, + NULL, NULL, NULL, TCP_RECEIVER_THREAD_PRIORITY, IS_ENABLED(CONFIG_USERSPACE) ? K_USER | K_INHERIT_PERMS : 0, @@ -357,9 +368,23 @@ int zperf_tcp_download(const struct zperf_download_params *param, tcp_session_cb = callback; tcp_user_data = user_data; + tcp_server_port = param->port; tcp_server_running = true; + tcp_server_stop = false; - zperf_tcp_receiver_init(param->port); + k_sem_give(&tcp_server_run); + + return 0; +} + +int zperf_tcp_download_stop(void) +{ + if (!tcp_server_running) { + return -EALREADY; + } + + tcp_server_stop = true; + tcp_session_cb = NULL; return 0; } diff --git a/subsys/net/lib/zperf/zperf_udp_receiver.c b/subsys/net/lib/zperf/zperf_udp_receiver.c index f5e7486772f..f96acde6f49 100644 --- a/subsys/net/lib/zperf/zperf_udp_receiver.c +++ b/subsys/net/lib/zperf/zperf_udp_receiver.c @@ -38,6 +38,7 @@ static struct sockaddr_in *in4_addr_my; #define SOCK_ID_MAX 2 #define UDP_RECEIVER_BUF_SIZE 1500 +#define POLL_TIMEOUT_MS 100 static K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE); static struct k_thread udp_receiver_thread_data; @@ -45,6 +46,9 @@ static struct k_thread udp_receiver_thread_data; static zperf_callback udp_session_cb; static void *udp_user_data; static bool udp_server_running; +static bool udp_server_stop; +static uint16_t udp_server_port; +static K_SEM_DEFINE(udp_server_run, 0, 1); static inline void build_reply(struct zperf_udp_datagram *hdr, struct zperf_server_hdr *stat, @@ -225,13 +229,9 @@ static void udp_received(int sock, const struct sockaddr *addr, uint8_t *data, } } -void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) +static void udp_server_session(void) { - ARG_UNUSED(ptr1); - ARG_UNUSED(ptr3); - static uint8_t buf[UDP_RECEIVER_BUF_SIZE]; - int port = POINTER_TO_INT(ptr2); struct zsock_pollfd fds[SOCK_ID_MAX] = { 0 }; int ret; @@ -248,7 +248,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) IPPROTO_UDP); if (fds[SOCK_ID_IPV4].fd < 0) { NET_ERR("Cannot create IPv4 network socket."); - goto cleanup; + goto error; } if (MY_IP4ADDR && strlen(MY_IP4ADDR)) { @@ -265,7 +265,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) in4_addr = zperf_get_default_if_in4_addr(); if (!in4_addr) { NET_ERR("Unable to get IPv4 by default"); - goto cleanup; + goto error; } memcpy(&in4_addr_my->sin_addr, in4_addr, sizeof(struct in_addr)); @@ -274,7 +274,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) NET_INFO("Binding to %s", net_sprint_ipv4_addr(&in4_addr_my->sin_addr)); - in4_addr_my->sin_port = htons(port); + in4_addr_my->sin_port = htons(udp_server_port); ret = zsock_bind(fds[SOCK_ID_IPV4].fd, (struct sockaddr *)in4_addr_my, @@ -283,7 +283,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) NET_ERR("Cannot bind IPv4 UDP port %d (%d)", ntohs(in4_addr_my->sin_port), errno); - goto cleanup; + goto error; } fds[SOCK_ID_IPV4].events = ZSOCK_POLLIN; @@ -298,7 +298,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) IPPROTO_UDP); if (fds[SOCK_ID_IPV6].fd < 0) { NET_ERR("Cannot create IPv4 network socket."); - goto cleanup; + goto error; } if (MY_IP6ADDR && strlen(MY_IP6ADDR)) { @@ -316,7 +316,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) in6_addr = zperf_get_default_if_in6_addr(); if (!in6_addr) { NET_ERR("Unable to get IPv4 by default"); - goto cleanup; + goto error; } memcpy(&in6_addr_my->sin6_addr, in6_addr, sizeof(struct in6_addr)); @@ -325,7 +325,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) NET_INFO("Binding to %s", net_sprint_ipv6_addr(&in6_addr_my->sin6_addr)); - in6_addr_my->sin6_port = htons(port); + in6_addr_my->sin6_port = htons(udp_server_port); ret = zsock_bind(fds[SOCK_ID_IPV6].fd, (struct sockaddr *)in6_addr_my, @@ -334,21 +334,29 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) NET_ERR("Cannot bind IPv6 UDP port %d (%d)", ntohs(in6_addr_my->sin6_port), ret); - goto cleanup; + goto error; } fds[SOCK_ID_IPV6].events = ZSOCK_POLLIN; } - NET_INFO("Listening on port %d", port); + NET_INFO("Listening on port %d", udp_server_port); while (true) { - ret = zsock_poll(fds, ARRAY_SIZE(fds), -1); + ret = zsock_poll(fds, ARRAY_SIZE(fds), POLL_TIMEOUT_MS); if (ret < 0) { NET_ERR("UDP receiver poll error (%d)", errno); + goto error; + } + + if (udp_server_stop) { goto cleanup; } + if (ret == 0) { + continue; + } + for (int i = 0; i < ARRAY_SIZE(fds); i++) { struct sockaddr addr; socklen_t addrlen = sizeof(addr); @@ -357,7 +365,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) (fds[i].revents & ZSOCK_POLLNVAL)) { NET_ERR("UDP receiver IPv%d socket error", (i == SOCK_ID_IPV4) ? 4 : 6); - goto cleanup; + goto error; } if (!(fds[i].revents & ZSOCK_POLLIN)) { @@ -369,18 +377,19 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) if (ret < 0) { NET_ERR("recv failed on IPv%d socket (%d)", (i == SOCK_ID_IPV4) ? 4 : 6, errno); - goto cleanup; + goto error; } udp_received(fds[i].fd, &addr, buf, ret); } } -cleanup: +error: if (udp_session_cb != NULL) { udp_session_cb(ZPERF_SESSION_ERROR, NULL, udp_user_data); } +cleanup: for (int i = 0; i < ARRAY_SIZE(fds); i++) { if (fds[i].fd >= 0) { zsock_close(fds[i].fd); @@ -388,13 +397,28 @@ cleanup: } } -static void zperf_udp_receiver_init(int port) +static void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3) +{ + ARG_UNUSED(ptr1); + ARG_UNUSED(ptr2); + ARG_UNUSED(ptr3); + + while (true) { + k_sem_take(&udp_server_run, K_FOREVER); + + udp_server_session(); + + udp_server_running = false; + } +} + +void zperf_udp_receiver_init(void) { k_thread_create(&udp_receiver_thread_data, udp_receiver_stack_area, K_THREAD_STACK_SIZEOF(udp_receiver_stack_area), udp_receiver_thread, - NULL, INT_TO_POINTER(port), NULL, + NULL, NULL, NULL, UDP_RECEIVER_THREAD_PRIORITY, IS_ENABLED(CONFIG_USERSPACE) ? K_USER | K_INHERIT_PERMS : 0, @@ -414,9 +438,23 @@ int zperf_udp_download(const struct zperf_download_params *param, udp_session_cb = callback; udp_user_data = user_data; + udp_server_port = param->port; udp_server_running = true; + udp_server_stop = false; - zperf_udp_receiver_init(param->port); + k_sem_give(&udp_server_run); + + return 0; +} + +int zperf_udp_download_stop(void) +{ + if (!udp_server_running) { + return -EALREADY; + } + + udp_server_stop = true; + udp_session_cb = NULL; return 0; }