net: zperf: Add public API to start TCP/UDP server

Add public API for zperf download functionality. The TCP/UDP server
modules are decoupled from shell, allowing to trigger download directly
from the application code. The shell submodule makes use of this new
public API.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2022-11-23 17:42:49 +01:00 committed by Carles Cufí
commit cd4f7cbc61
5 changed files with 324 additions and 225 deletions

View file

@ -16,7 +16,6 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
#include <zephyr/net/zperf.h>
#include "zperf_internal.h"
#include "shell_utils.h"
#include "zperf_session.h"
/* To get net_sprint_ipv{4|6}_addr() */
@ -40,8 +39,12 @@ static struct sockaddr_in *in4_addr_my;
#define UDP_RECEIVER_BUF_SIZE 1500
K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE);
struct k_thread udp_receiver_thread_data;
static K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE);
static struct k_thread udp_receiver_thread_data;
static zperf_callback udp_session_cb;
static void *udp_user_data;
static bool udp_server_running;
static inline void build_reply(struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat,
@ -71,8 +74,7 @@ static inline void build_reply(struct zperf_udp_datagram *hdr,
#define BUF_SIZE sizeof(struct zperf_udp_datagram) + \
sizeof(struct zperf_server_hdr)
static int zperf_receiver_send_stat(const struct shell *sh,
int sock, const struct sockaddr *addr,
static int zperf_receiver_send_stat(int sock, const struct sockaddr *addr,
struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat)
{
@ -86,15 +88,13 @@ static int zperf_receiver_send_stat(const struct shell *sh,
sizeof(struct sockaddr_in6) :
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
" Cannot send data to peer (%d)", errno);
NET_ERR("Cannot send data to peer (%d)", errno);
}
return ret;
}
static void udp_received(const struct shell *sh, int sock,
const struct sockaddr *addr, uint8_t *data,
static void udp_received(int sock, const struct sockaddr *addr, uint8_t *data,
size_t datalen)
{
struct zperf_udp_datagram *hdr;
@ -104,8 +104,7 @@ static void udp_received(const struct shell *sh, int sock,
int32_t id;
if (datalen < sizeof(struct zperf_udp_datagram)) {
shell_fprintf(sh, SHELL_WARNING,
"Short iperf packet!\n");
NET_WARN("Short iperf packet!");
return;
}
@ -114,8 +113,7 @@ static void udp_received(const struct shell *sh, int sock,
session = get_session(addr, SESSION_UDP);
if (!session) {
shell_fprintf(sh, SHELL_WARNING,
"Cannot get a session!\n");
NET_ERR("Cannot get a session!");
return;
}
@ -128,44 +126,33 @@ static void udp_received(const struct shell *sh, int sock,
/* Session is already completed: Resend the stat packet
* and continue
*/
if (zperf_receiver_send_stat(sh, sock, addr, hdr,
if (zperf_receiver_send_stat(sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Failed to send the packet\n");
NET_ERR("Failed to send the packet");
}
} else {
/* Start a new session! */
shell_fprintf(sh, SHELL_NORMAL,
"New session started.\n");
zperf_reset_session_stats(session);
session->state = STATE_ONGOING;
session->start_time = time;
/* Start a new session! */
if (udp_session_cb != NULL) {
udp_session_cb(ZPERF_SESSION_STARTED, NULL,
udp_user_data);
}
}
break;
case STATE_ONGOING:
if (id < 0) { /* Negative id means session end. */
uint32_t rate_in_kbps;
struct zperf_results results = { 0 };
uint32_t duration;
shell_fprintf(sh, SHELL_NORMAL, "End of session!\n");
duration = k_ticks_to_us_ceil32(time -
session->start_time);
/* Update state machine */
session->state = STATE_COMPLETED;
/* Compute baud rate */
if (duration != 0U) {
rate_in_kbps = (uint32_t)
((session->length * 8ULL *
(uint64_t)USEC_PER_SEC) /
((uint64_t)duration * 1024ULL));
} else {
rate_in_kbps = 0U;
}
/* Fill statistics */
session->stat.flags = 0x80000000;
session->stat.total_len1 = session->length >> 32;
@ -179,37 +166,23 @@ static void udp_received(const struct shell *sh, int sock,
session->stat.jitter1 = 0;
session->stat.jitter2 = session->jitter;
if (zperf_receiver_send_stat(sh, sock, addr, hdr,
if (zperf_receiver_send_stat(sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Failed to send the packet\n");
NET_ERR("Failed to send the packet");
}
shell_fprintf(sh, SHELL_NORMAL,
" duration:\t\t");
print_number(sh, duration, TIME_US, TIME_US_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\n");
results.nb_packets_rcvd = session->counter;
results.nb_packets_lost = session->error;
results.nb_packets_outorder = session->outorder;
results.total_len = session->length;
results.time_in_us = duration;
results.jitter_in_us = session->jitter;
results.packet_size = session->length / session->counter;
shell_fprintf(sh, SHELL_NORMAL,
" received packets:\t%u\n",
session->counter);
shell_fprintf(sh, SHELL_NORMAL,
" nb packets lost:\t%u\n",
session->error);
shell_fprintf(sh, SHELL_NORMAL,
" nb packets outorder:\t%u\n",
session->outorder);
shell_fprintf(sh, SHELL_NORMAL,
" jitter:\t\t\t");
print_number(sh, session->jitter, TIME_US,
TIME_US_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\n");
shell_fprintf(sh, SHELL_NORMAL,
" rate:\t\t\t");
print_number(sh, rate_in_kbps, KBPS, KBPS_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\n");
if (udp_session_cb != NULL) {
udp_session_cb(ZPERF_SESSION_FINISHED, &results,
udp_user_data);
}
} else {
/* Update counter */
session->counter++;
@ -254,10 +227,10 @@ static void udp_received(const struct shell *sh, int sock,
void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
ARG_UNUSED(ptr1);
ARG_UNUSED(ptr3);
static uint8_t buf[UDP_RECEIVER_BUF_SIZE];
const struct shell *sh = ptr1;
int port = POINTER_TO_INT(ptr2);
struct zsock_pollfd fds[SOCK_ID_MAX] = { 0 };
int ret;
@ -274,18 +247,16 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
fds[SOCK_ID_IPV4].fd = zsock_socket(AF_INET, SOCK_DGRAM,
IPPROTO_UDP);
if (fds[SOCK_ID_IPV4].fd < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Cannot create IPv4 network socket.\n");
NET_ERR("Cannot create IPv4 network socket.");
goto cleanup;
}
if (MY_IP4ADDR && strlen(MY_IP4ADDR)) {
/* Use setting IP */
ret = zperf_get_ipv4_addr(sh, MY_IP4ADDR,
ret = zperf_get_ipv4_addr(MY_IP4ADDR,
&in4_addr_my->sin_addr);
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Unable to set IPv4\n");
NET_WARN("Unable to set IPv4");
goto use_existing_ipv4;
}
} else {
@ -293,16 +264,15 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
/* Use existing IP */
in4_addr = zperf_get_default_if_in4_addr();
if (!in4_addr) {
shell_fprintf(sh, SHELL_WARNING,
"Unable to get IPv4 by default\n");
NET_ERR("Unable to get IPv4 by default");
goto cleanup;
}
memcpy(&in4_addr_my->sin_addr, in4_addr,
sizeof(struct in_addr));
}
shell_fprintf(sh, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv4_addr(&in4_addr_my->sin_addr));
NET_INFO("Binding to %s",
net_sprint_ipv4_addr(&in4_addr_my->sin_addr));
in4_addr_my->sin_port = htons(port);
@ -310,10 +280,9 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
(struct sockaddr *)in4_addr_my,
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Cannot bind IPv4 UDP port %d (%d)\n",
ntohs(in4_addr_my->sin_port),
errno);
NET_ERR("Cannot bind IPv4 UDP port %d (%d)",
ntohs(in4_addr_my->sin_port),
errno);
goto cleanup;
}
@ -328,19 +297,17 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
fds[SOCK_ID_IPV6].fd = zsock_socket(AF_INET6, SOCK_DGRAM,
IPPROTO_UDP);
if (fds[SOCK_ID_IPV6].fd < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Cannot create IPv4 network socket.\n");
NET_ERR("Cannot create IPv4 network socket.");
goto cleanup;
}
if (MY_IP6ADDR && strlen(MY_IP6ADDR)) {
/* Use setting IP */
ret = zperf_get_ipv6_addr(sh, MY_IP6ADDR,
ret = zperf_get_ipv6_addr(MY_IP6ADDR,
MY_PREFIX_LEN_STR,
&in6_addr_my->sin6_addr);
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Unable to set IPv6\n");
NET_WARN("Unable to set IPv6");
goto use_existing_ipv6;
}
} else {
@ -348,16 +315,15 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
/* Use existing IP */
in6_addr = zperf_get_default_if_in6_addr();
if (!in6_addr) {
shell_fprintf(sh, SHELL_WARNING,
"Unable to get IPv4 by default\n");
NET_ERR("Unable to get IPv4 by default");
goto cleanup;
}
memcpy(&in6_addr_my->sin6_addr, in6_addr,
sizeof(struct in6_addr));
}
shell_fprintf(sh, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv6_addr(&in6_addr_my->sin6_addr));
NET_INFO("Binding to %s",
net_sprint_ipv6_addr(&in6_addr_my->sin6_addr));
in6_addr_my->sin6_port = htons(port);
@ -365,25 +331,21 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
(struct sockaddr *)in6_addr_my,
sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
"Cannot bind IPv6 UDP port %d (%d)\n",
ntohs(in6_addr_my->sin6_port),
ret);
NET_ERR("Cannot bind IPv6 UDP port %d (%d)",
ntohs(in6_addr_my->sin6_port),
ret);
goto cleanup;
}
fds[SOCK_ID_IPV6].events = ZSOCK_POLLIN;
}
shell_fprintf(sh, SHELL_NORMAL,
"Listening on port %d\n", port);
NET_INFO("Listening on port %d", port);
while (true) {
ret = zsock_poll(fds, ARRAY_SIZE(fds), -1);
if (ret < 0) {
shell_fprintf(sh, SHELL_WARNING,
"UDP receiver poll error (%d)\n",
errno);
NET_ERR("UDP receiver poll error (%d)", errno);
goto cleanup;
}
@ -393,9 +355,7 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
if ((fds[i].revents & ZSOCK_POLLERR) ||
(fds[i].revents & ZSOCK_POLLNVAL)) {
shell_fprintf(
sh, SHELL_WARNING,
"UDP receiver IPv%d socket error\n",
NET_ERR("UDP receiver IPv%d socket error",
(i == SOCK_ID_IPV4) ? 4 : 6);
goto cleanup;
}
@ -407,18 +367,20 @@ void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
ret = zsock_recvfrom(fds[i].fd, buf, sizeof(buf), 0,
&addr, &addrlen);
if (ret < 0) {
shell_fprintf(
sh, SHELL_WARNING,
"recv failed on IPv%d socket (%d)\n",
NET_ERR("recv failed on IPv%d socket (%d)",
(i == SOCK_ID_IPV4) ? 4 : 6, errno);
goto cleanup;
}
udp_received(sh, fds[i].fd, &addr, buf, ret);
udp_received(fds[i].fd, &addr, buf, ret);
}
}
cleanup:
if (udp_session_cb != NULL) {
udp_session_cb(ZPERF_SESSION_ERROR, NULL, udp_user_data);
}
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
if (fds[i].fd >= 0) {
zsock_close(fds[i].fd);
@ -426,15 +388,35 @@ cleanup:
}
}
void zperf_udp_receiver_init(const struct shell *sh, int port)
static void zperf_udp_receiver_init(int port)
{
k_thread_create(&udp_receiver_thread_data,
udp_receiver_stack_area,
K_THREAD_STACK_SIZEOF(udp_receiver_stack_area),
udp_receiver_thread,
(void *)sh, INT_TO_POINTER(port), NULL,
NULL, INT_TO_POINTER(port), NULL,
UDP_RECEIVER_THREAD_PRIORITY,
IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
K_INHERIT_PERMS : 0,
K_NO_WAIT);
}
int zperf_udp_download(const struct zperf_download_params *param,
zperf_callback callback, void *user_data)
{
if (param == NULL || callback == NULL) {
return -EINVAL;
}
if (udp_server_running) {
return -EALREADY;
}
udp_session_cb = callback;
udp_user_data = user_data;
udp_server_running = true;
zperf_udp_receiver_init(param->port);
return 0;
}