samples: net: zperf: Rewrite download part to use sockets

Rewrite the TCP/UDP download part of the zperf sample to use socket API.
For UDP, performance impact is negligible (< 1 Mbps), for TCP it's
noticable, but still throughputs can be considered satisfactory (up to
~75 Mbps).

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
Robert Lubos 2022-06-02 15:05:54 +02:00 committed by Carles Cufí
commit fd2fab1a49
4 changed files with 365 additions and 313 deletions

View file

@ -19,64 +19,44 @@ LOG_MODULE_DECLARE(net_zperf_sample, LOG_LEVEL_DBG);
static struct session sessions[SESSION_PROTO_END][SESSION_MAX];
/* Get session from a given packet */
struct session *get_session(struct net_pkt *pkt,
union net_ip_header *ip_hdr,
union net_proto_header *proto_hdr,
struct session *get_session(const struct sockaddr *addr,
enum session_proto proto)
{
struct session *active = NULL;
struct session *free = NULL;
struct in6_addr ipv6 = { };
struct in_addr ipv4 = { };
struct net_udp_hdr *udp_hdr;
int i = 0;
uint16_t port;
if (!pkt) {
printk("Error! null pkt detected.\n");
return NULL;
}
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
if (proto != SESSION_TCP && proto != SESSION_UDP) {
printk("Error! unsupported proto.\n");
return NULL;
}
udp_hdr = proto_hdr->udp;
/* Get tuple of the remote connection */
port = udp_hdr->src_port;
if (net_pkt_family(pkt) == AF_INET6) {
net_ipv6_addr_copy_raw((uint8_t *)&ipv6, ip_hdr->ipv6->src);
} else if (net_pkt_family(pkt) == AF_INET) {
net_ipv4_addr_copy_raw((uint8_t *)&ipv4, ip_hdr->ipv4->src);
} else {
printk("Error! unsupported protocol %d\n",
net_pkt_family(pkt));
return NULL;
}
/* Check whether we already have an active session */
while (!active && i < SESSION_MAX) {
struct session *ptr = &sessions[proto][i];
#if defined(CONFIG_NET_IPV4)
if (ptr->port == port &&
net_pkt_family(pkt) == AF_INET &&
net_ipv4_addr_cmp(&ptr->ip.in_addr, &ipv4)) {
if (IS_ENABLED(CONFIG_NET_IPV4) &&
addr->sa_family == AF_INET &&
ptr->ip.family == AF_INET &&
ptr->port == addr4->sin_port &&
net_ipv4_addr_cmp(&ptr->ip.in_addr, &addr4->sin_addr)) {
/* We found an active session */
active = ptr;
} else
#endif
#if defined(CONFIG_NET_IPV6)
if (ptr->port == port &&
net_pkt_family(pkt) == AF_INET6 &&
net_ipv6_addr_cmp(&ptr->ip.in6_addr, &ipv6)) {
break;
}
if (IS_ENABLED(CONFIG_NET_IPV6) &&
addr->sa_family == AF_INET6 &&
ptr->ip.family == AF_INET6 &&
ptr->port == addr6->sin6_port &&
net_ipv6_addr_cmp(&ptr->ip.in6_addr, &addr6->sin6_addr)) {
/* We found an active session */
active = ptr;
} else
#endif
break;
}
if (!free && (ptr->state == STATE_NULL ||
ptr->state == STATE_COMPLETED)) {
/* We found a free slot - just in case */
@ -89,30 +69,30 @@ struct session *get_session(struct net_pkt *pkt,
/* If no active session then create a new one */
if (!active && free) {
active = free;
active->port = port;
#if defined(CONFIG_NET_IPV6)
if (net_pkt_family(pkt) == AF_INET6) {
net_ipaddr_copy(&active->ip.in6_addr, &ipv6);
if (IS_ENABLED(CONFIG_NET_IPV4) && addr->sa_family == AF_INET) {
active->port = addr4->sin_port;
active->ip.family = AF_INET;
net_ipaddr_copy(&active->ip.in_addr, &addr4->sin_addr);
} else if (IS_ENABLED(CONFIG_NET_IPV6) &&
addr->sa_family == AF_INET6) {
active->port = addr6->sin6_port;
active->ip.family = AF_INET6;
net_ipaddr_copy(&active->ip.in6_addr, &addr6->sin6_addr);
}
#endif
#if defined(CONFIG_NET_IPV4)
if (net_pkt_family(pkt) == AF_INET) {
net_ipaddr_copy(&active->ip.in_addr, &ipv4);
}
#endif
}
return active;
}
struct session *get_tcp_session(struct net_context *ctx)
/* TODO Unify session handling */
struct session *get_tcp_session(int sock)
{
struct session *free = NULL;
int i = 0;
if (!ctx) {
printk("Error! null context detected.\n");
if (sock < 0) {
printk("Error! Invalid socket.\n");
return NULL;
}
@ -120,7 +100,7 @@ struct session *get_tcp_session(struct net_context *ctx)
while (i < SESSION_MAX) {
struct session *ptr = &sessions[SESSION_TCP][i];
if (ptr->ctx == ctx) {
if (ptr->sock == sock) {
return ptr;
}
@ -135,7 +115,7 @@ struct session *get_tcp_session(struct net_context *ctx)
}
if (free) {
free->ctx = ctx;
free->sock = sock;
}
return free;

View file

@ -39,7 +39,7 @@ struct session {
struct net_addr ip;
/* TCP session */
struct net_context *ctx;
int sock;
enum state state;
@ -58,11 +58,9 @@ struct session {
struct zperf_server_hdr stat;
};
struct session *get_session(struct net_pkt *pkt,
union net_ip_header *ip_hdr,
union net_proto_header *proto_hdr,
struct session *get_session(const struct sockaddr *addr,
enum session_proto proto);
struct session *get_tcp_session(struct net_context *ctx);
struct session *get_tcp_session(int sock);
void zperf_session_init(void);
void zperf_reset_session_stats(struct session *session);

View file

@ -14,9 +14,7 @@ LOG_MODULE_DECLARE(net_zperf_sample, LOG_LEVEL_DBG);
#include <zephyr/sys/printk.h>
#include <zephyr/net/net_core.h>
#include <zephyr/net/net_ip.h>
#include <zephyr/net/net_pkt.h>
#include <zephyr/net/socket.h>
#include "zperf.h"
#include "zperf_internal.h"
@ -30,28 +28,35 @@ LOG_MODULE_DECLARE(net_zperf_sample, LOG_LEVEL_DBG);
static struct sockaddr_in6 *in6_addr_my;
static struct sockaddr_in *in4_addr_my;
const struct shell *tcp_shell;
static bool init_done;
static void tcp_received(struct net_context *context,
struct net_pkt *pkt,
union net_ip_header *ip_hdr,
union net_proto_header *proto_hdr,
int status,
void *user_data)
#if IS_ENABLED(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
#else
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif
#define TCP_RECEIVER_STACK_SIZE 2048
#define SOCK_ID_IPV4_LISTEN 0
#define SOCK_ID_IPV4_DATA 1
#define SOCK_ID_IPV6_LISTEN 2
#define SOCK_ID_IPV6_DATA 3
#define SOCK_ID_MAX 4
#define TCP_RECEIVER_BUF_SIZE 1500
K_THREAD_STACK_DEFINE(tcp_receiver_stack_area, TCP_RECEIVER_STACK_SIZE);
struct k_thread tcp_receiver_thread_data;
static void tcp_received(const struct shell *shell, int sock, size_t datalen)
{
const struct shell *shell = tcp_shell;
struct session *session;
int64_t time;
int len = 0;
if (!shell) {
printk("Shell is not set!\n");
return;
}
time = k_uptime_ticks();
session = get_tcp_session(context);
session = get_tcp_session(sock);
if (!session) {
shell_fprintf(shell, SHELL_WARNING, "Cannot get a session!\n");
return;
@ -69,13 +74,9 @@ static void tcp_received(struct net_context *context,
__fallthrough;
case STATE_ONGOING:
session->counter++;
session->length += datalen;
if (pkt) {
len = net_pkt_remaining_data(pkt);
session->length += len;
}
if (pkt == NULL && status == 0) { /* EOF */
if (datalen == 0) { /* EOF */
uint32_t rate_in_kbps;
uint32_t duration;
@ -108,13 +109,9 @@ static void tcp_received(struct net_context *context,
zperf_tcp_stopped();
net_context_unref(context);
session->state = STATE_NULL;
}
if (pkt) {
(void)net_context_update_recv_wnd(context, len);
}
break;
case STATE_LAST_PACKET_RECEIVED:
@ -122,60 +119,33 @@ static void tcp_received(struct net_context *context,
default:
shell_fprintf(shell, SHELL_WARNING, "Unsupported case\n");
}
if (pkt) {
net_pkt_unref(pkt);
}
}
static void tcp_accepted(struct net_context *context,
struct sockaddr *addr,
socklen_t addrlen,
int error,
void *user_data)
void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
const struct shell *shell = user_data;
ARG_UNUSED(ptr3);
static uint8_t buf[TCP_RECEIVER_BUF_SIZE];
const struct shell *shell = ptr1;
int port = POINTER_TO_INT(ptr2);
struct pollfd fds[SOCK_ID_MAX] = { 0 };
int ret;
ret = net_context_recv(context, tcp_received, K_NO_WAIT, user_data);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot receive TCP packet (family %d)",
net_context_get_family(context));
}
}
void zperf_tcp_receiver_init(const struct shell *shell, int port)
{
static bool init_done;
struct net_context *context4 = NULL;
struct net_context *context6 = NULL;
const struct in_addr *in4_addr = NULL;
const struct in6_addr *in6_addr = NULL;
int ret;
if (init_done) {
zperf_tcp_started();
return;
}
tcp_shell = shell;
if (IS_ENABLED(CONFIG_NET_IPV6)) {
in6_addr_my = zperf_get_sin6();
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
fds[i].fd = -1;
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
const struct in_addr *in4_addr = NULL;
in4_addr_my = zperf_get_sin();
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
ret = net_context_get(AF_INET, SOCK_STREAM, IPPROTO_TCP,
&context4);
if (ret < 0) {
fds[SOCK_ID_IPV4_LISTEN].fd = socket(AF_INET, SOCK_STREAM,
IPPROTO_TCP);
if (fds[SOCK_ID_IPV4_LISTEN].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get IPv4 TCP network context.\n");
return;
"Cannot create IPv4 network socket.\n");
goto cleanup;
}
if (MY_IP4ADDR && strlen(MY_IP4ADDR)) {
@ -200,19 +170,43 @@ void zperf_tcp_receiver_init(const struct shell *shell, int port)
sizeof(struct in_addr));
}
in4_addr_my->sin_port = htons(port);
shell_fprintf(shell, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv4_addr(&in4_addr_my->sin_addr));
in4_addr_my->sin_port = htons(port);
ret = bind(fds[SOCK_ID_IPV4_LISTEN].fd,
(struct sockaddr *)in4_addr_my,
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv4 UDP port %d (%d)\n",
ntohs(in4_addr_my->sin_port),
errno);
goto cleanup;
}
ret = listen(fds[SOCK_ID_IPV4_LISTEN].fd, 1);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot listen IPv4 TCP (%d)", errno);
goto cleanup;
}
fds[SOCK_ID_IPV4_LISTEN].events = POLLIN;
}
if (IS_ENABLED(CONFIG_NET_IPV6)) {
ret = net_context_get(AF_INET6, SOCK_STREAM, IPPROTO_TCP,
&context6);
if (ret < 0) {
const struct in6_addr *in6_addr = NULL;
in6_addr_my = zperf_get_sin6();
fds[SOCK_ID_IPV6_LISTEN].fd = socket(AF_INET6, SOCK_STREAM,
IPPROTO_TCP);
if (fds[SOCK_ID_IPV6_LISTEN].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get IPv6 TCP network context.\n");
return;
"Cannot create IPv6 network socket.\n");
goto cleanup;
}
if (MY_IP6ADDR && strlen(MY_IP6ADDR)) {
@ -238,71 +232,141 @@ void zperf_tcp_receiver_init(const struct shell *shell, int port)
sizeof(struct in6_addr));
}
in6_addr_my->sin6_port = htons(port);
shell_fprintf(shell, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv6_addr(&in6_addr_my->sin6_addr));
in6_addr_my->sin6_port = htons(port);
}
if (IS_ENABLED(CONFIG_NET_IPV6) && context6) {
ret = net_context_bind(context6,
(struct sockaddr *)in6_addr_my,
sizeof(struct sockaddr_in6));
ret = bind(fds[SOCK_ID_IPV6_LISTEN].fd,
(struct sockaddr *)in6_addr_my,
sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv6 TCP port %d (%d)\n",
ntohs(in6_addr_my->sin6_port), ret);
return;
"Cannot bind IPv6 UDP port %d (%d)\n",
ntohs(in6_addr_my->sin6_port),
errno);
goto cleanup;
}
ret = net_context_listen(context6, 0);
ret = listen(fds[SOCK_ID_IPV6_LISTEN].fd, 1);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot listen IPv6 TCP (%d)", ret);
return;
"Cannot listen IPv6 TCP (%d)", errno);
goto cleanup;
}
ret = net_context_accept(context6, tcp_accepted, K_NO_WAIT,
NULL);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot receive IPv6 TCP packets (%d)",
ret);
return;
}
}
if (IS_ENABLED(CONFIG_NET_IPV4) && context4) {
ret = net_context_bind(context4,
(struct sockaddr *)in4_addr_my,
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv4 TCP port %d (%d)\n",
ntohs(in4_addr_my->sin_port), ret);
return;
}
ret = net_context_listen(context4, 0);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot listen IPv4 TCP (%d)", ret);
return;
}
ret = net_context_accept(context4, tcp_accepted, K_NO_WAIT,
(void *)shell);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot receive IPv4 TCP packets (%d)",
ret);
return;
}
fds[SOCK_ID_IPV6_LISTEN].events = POLLIN;
}
shell_fprintf(shell, SHELL_NORMAL,
"Listening on port %d\n", port);
/* TODO Investigate started/stopped logic */
zperf_tcp_started();
init_done = true;
while (true) {
ret = poll(fds, ARRAY_SIZE(fds), -1);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"TCP receiver poll error (%d)\n",
errno);
goto cleanup;
}
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
struct sockaddr addr;
socklen_t addrlen = sizeof(addr);
if ((fds[i].revents & POLLERR) ||
(fds[i].revents & POLLNVAL)) {
shell_fprintf(
shell, SHELL_WARNING,
"TCP receiver IPv%d socket error\n",
(i <= SOCK_ID_IPV4_DATA) ? 4 : 6);
goto cleanup;
}
if (!(fds[i].revents & POLLIN)) {
continue;
}
switch (i) {
case SOCK_ID_IPV4_LISTEN:
case SOCK_ID_IPV6_LISTEN:{
int sock = accept(fds[i].fd, &addr, &addrlen);
if (sock < 0) {
shell_fprintf(
shell, SHELL_WARNING,
"TCP receiver IPv%d accept error\n",
(i <= SOCK_ID_IPV4_DATA) ? 4 : 6);
goto cleanup;
}
if (i == SOCK_ID_IPV4_LISTEN &&
fds[SOCK_ID_IPV4_DATA].fd < 0) {
fds[SOCK_ID_IPV4_DATA].fd = sock;
fds[SOCK_ID_IPV4_DATA].events = POLLIN;
} else if (i == SOCK_ID_IPV6_LISTEN &&
fds[SOCK_ID_IPV6_DATA].fd < 0) {
fds[SOCK_ID_IPV6_DATA].fd = sock;
fds[SOCK_ID_IPV6_DATA].events = POLLIN;
} else {
/* Too many connections. */
close(sock);
break;
}
break;
}
case SOCK_ID_IPV4_DATA:
case SOCK_ID_IPV6_DATA:
ret = recv(fds[i].fd, buf, sizeof(buf), 0);
if (ret < 0) {
shell_fprintf(
shell, SHELL_WARNING,
"recv failed on IPv%d socket (%d)\n",
(i <= SOCK_ID_IPV4_DATA) ? 4 : 6,
errno);
goto cleanup;
}
tcp_received(shell, fds[i].fd, ret);
if (ret == 0) {
close(fds[i].fd);
fds[i].fd = -1;
}
break;
}
}
}
cleanup:
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
if (fds[i].fd >= 0) {
close(fds[i].fd);
}
}
}
void zperf_tcp_receiver_init(const struct shell *shell, int port)
{
if (init_done) {
zperf_tcp_started();
return;
}
k_thread_create(&tcp_receiver_thread_data,
tcp_receiver_stack_area,
K_THREAD_STACK_SIZEOF(tcp_receiver_stack_area),
tcp_receiver_thread,
(void *)shell, INT_TO_POINTER(port), NULL,
TCP_RECEIVER_THREAD_PRIORITY,
IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
K_INHERIT_PERMS : 0,
K_NO_WAIT);
}

View file

@ -13,9 +13,7 @@ LOG_MODULE_DECLARE(net_zperf_sample, LOG_LEVEL_DBG);
#include <zephyr/zephyr.h>
#include <zephyr/sys/printk.h>
#include <zephyr/net/net_core.h>
#include <zephyr/net/net_pkt.h>
#include <zephyr/net/udp.h>
#include <zephyr/net/socket.h>
#include "zperf.h"
#include "zperf_internal.h"
@ -29,27 +27,22 @@ LOG_MODULE_DECLARE(net_zperf_sample, LOG_LEVEL_DBG);
static struct sockaddr_in6 *in6_addr_my;
static struct sockaddr_in *in4_addr_my;
static inline void set_dst_addr(const struct shell *shell,
sa_family_t family,
struct net_pkt *pkt,
union net_ip_header *ip_hdr,
struct net_udp_hdr *udp_hdr,
struct sockaddr *dst_addr)
{
if (IS_ENABLED(CONFIG_NET_IPV6) && family == AF_INET6) {
net_ipv6_addr_copy_raw((uint8_t *)&net_sin6(dst_addr)->sin6_addr,
ip_hdr->ipv6->src);
net_sin6(dst_addr)->sin6_family = AF_INET6;
net_sin6(dst_addr)->sin6_port = udp_hdr->src_port;
}
#if IS_ENABLED(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
#else
#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif
if (IS_ENABLED(CONFIG_NET_IPV4) && family == AF_INET) {
net_ipv4_addr_copy_raw((uint8_t *)&net_sin(dst_addr)->sin_addr,
ip_hdr->ipv4->src);
net_sin(dst_addr)->sin_family = AF_INET;
net_sin(dst_addr)->sin_port = udp_hdr->src_port;
}
}
#define UDP_RECEIVER_STACK_SIZE 2048
#define SOCK_ID_IPV4 0
#define SOCK_ID_IPV6 1
#define SOCK_ID_MAX 2
#define UDP_RECEIVER_BUF_SIZE 1500
K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE);
struct k_thread udp_receiver_thread_data;
static inline void build_reply(struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat,
@ -80,72 +73,51 @@ static inline void build_reply(struct zperf_udp_datagram *hdr,
sizeof(struct zperf_server_hdr)
static int zperf_receiver_send_stat(const struct shell *shell,
struct net_context *context,
struct net_pkt *pkt,
union net_ip_header *ip_hdr,
struct net_udp_hdr *udp_hdr,
int sock, const struct sockaddr *addr,
struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat)
{
uint8_t reply[BUF_SIZE];
struct sockaddr dst_addr;
int ret;
shell_fprintf(shell, SHELL_NORMAL,
"Received %d bytes\n", net_pkt_remaining_data(pkt));
set_dst_addr(shell, net_pkt_family(pkt),
pkt, ip_hdr, udp_hdr, &dst_addr);
build_reply(hdr, stat, reply);
ret = net_context_sendto(context, reply, BUF_SIZE, &dst_addr,
net_pkt_family(pkt) == AF_INET6 ?
sizeof(struct sockaddr_in6) :
sizeof(struct sockaddr_in),
NULL, K_NO_WAIT, NULL);
ret = sendto(sock, reply, sizeof(reply), 0, addr,
addr->sa_family == AF_INET6 ?
sizeof(struct sockaddr_in6) :
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
" Cannot send data to peer (%d)", ret);
" Cannot send data to peer (%d)", errno);
}
return ret;
}
static void udp_received(struct net_context *context,
struct net_pkt *pkt,
union net_ip_header *ip_hdr,
union net_proto_header *proto_hdr,
int status,
void *user_data)
static void udp_received(const struct shell *shell, int sock,
const struct sockaddr *addr, uint8_t *data,
size_t datalen)
{
NET_PKT_DATA_ACCESS_DEFINE(zperf, struct zperf_udp_datagram);
struct net_udp_hdr *udp_hdr = proto_hdr->udp;
const struct shell *shell = user_data;
struct zperf_udp_datagram *hdr;
struct session *session;
int32_t transit_time;
int64_t time;
int32_t id;
if (!pkt) {
if (datalen < sizeof(struct zperf_udp_datagram)) {
shell_fprintf(shell, SHELL_WARNING,
"Short iperf packet!\n");
return;
}
hdr = (struct zperf_udp_datagram *)net_pkt_get_data(pkt, &zperf);
if (!hdr) {
shell_fprintf(shell, SHELL_WARNING,
"Short iperf packet!\n");
goto out;
}
hdr = (struct zperf_udp_datagram *)data;
time = k_uptime_ticks();
session = get_session(pkt, ip_hdr, proto_hdr, SESSION_UDP);
session = get_session(addr, SESSION_UDP);
if (!session) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get a session!\n");
goto out;
return;
}
id = ntohl(hdr->id);
@ -157,8 +129,7 @@ static void udp_received(struct net_context *context,
/* Session is already completed: Resend the stat packet
* and continue
*/
if (zperf_receiver_send_stat(shell, context, pkt,
ip_hdr, udp_hdr, hdr,
if (zperf_receiver_send_stat(shell, sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Failed to send the packet\n");
@ -209,8 +180,7 @@ static void udp_received(struct net_context *context,
session->stat.jitter1 = 0;
session->stat.jitter2 = session->jitter;
if (zperf_receiver_send_stat(shell, context, pkt,
ip_hdr, udp_hdr, hdr,
if (zperf_receiver_send_stat(shell, sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Failed to send the packet\n");
@ -244,7 +214,7 @@ static void udp_received(struct net_context *context,
} else {
/* Update counter */
session->counter++;
session->length += net_pkt_remaining_data(pkt);
session->length += datalen;
/* Compute jitter */
transit_time = time_delta(
@ -281,35 +251,32 @@ static void udp_received(struct net_context *context,
default:
break;
}
out:
net_pkt_unref(pkt);
}
void zperf_udp_receiver_init(const struct shell *shell, int port)
void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
struct net_context *context4 = NULL;
struct net_context *context6 = NULL;
const struct in_addr *in4_addr = NULL;
const struct in6_addr *in6_addr = NULL;
ARG_UNUSED(ptr3);
static uint8_t buf[UDP_RECEIVER_BUF_SIZE];
const struct shell *shell = ptr1;
int port = POINTER_TO_INT(ptr2);
struct pollfd fds[SOCK_ID_MAX] = { 0 };
int ret;
if (IS_ENABLED(CONFIG_NET_IPV6)) {
in6_addr_my = zperf_get_sin6();
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
fds[i].fd = -1;
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
const struct in_addr *in4_addr = NULL;
in4_addr_my = zperf_get_sin();
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
ret = net_context_get(AF_INET, SOCK_DGRAM, IPPROTO_UDP,
&context4);
if (ret < 0) {
fds[SOCK_ID_IPV4].fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (fds[SOCK_ID_IPV4].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get IPv4 network context.\n");
return;
"Cannot create IPv4 network socket.\n");
goto cleanup;
}
if (MY_IP4ADDR && strlen(MY_IP4ADDR)) {
@ -328,7 +295,7 @@ void zperf_udp_receiver_init(const struct shell *shell, int port)
if (!in4_addr) {
shell_fprintf(shell, SHELL_WARNING,
"Unable to get IPv4 by default\n");
return;
goto cleanup;
}
memcpy(&in4_addr_my->sin_addr, in4_addr,
sizeof(struct in_addr));
@ -339,27 +306,30 @@ void zperf_udp_receiver_init(const struct shell *shell, int port)
in4_addr_my->sin_port = htons(port);
if (context4) {
ret = net_context_bind(context4,
(struct sockaddr *)in4_addr_my,
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv4 UDP port %d (%d)\n",
ntohs(in4_addr_my->sin_port),
ret);
return;
}
ret = bind(fds[SOCK_ID_IPV4].fd,
(struct sockaddr *)in4_addr_my,
sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv4 UDP port %d (%d)\n",
ntohs(in4_addr_my->sin_port),
errno);
goto cleanup;
}
fds[SOCK_ID_IPV4].events = POLLIN;
}
if (IS_ENABLED(CONFIG_NET_IPV6)) {
ret = net_context_get(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
&context6);
if (ret < 0) {
const struct in6_addr *in6_addr = NULL;
in6_addr_my = zperf_get_sin6();
fds[SOCK_ID_IPV6].fd = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
if (fds[SOCK_ID_IPV6].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get IPv6 network context.\n");
return;
"Cannot create IPv4 network socket.\n");
goto cleanup;
}
if (MY_IP6ADDR && strlen(MY_IP6ADDR)) {
@ -379,7 +349,7 @@ void zperf_udp_receiver_init(const struct shell *shell, int port)
if (!in6_addr) {
shell_fprintf(shell, SHELL_WARNING,
"Unable to get IPv4 by default\n");
return;
goto cleanup;
}
memcpy(&in6_addr_my->sin6_addr, in6_addr,
sizeof(struct in6_addr));
@ -390,40 +360,80 @@ void zperf_udp_receiver_init(const struct shell *shell, int port)
in6_addr_my->sin6_port = htons(port);
if (context6) {
ret = net_context_bind(context6,
(struct sockaddr *)in6_addr_my,
sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot bind IPv6 UDP port %d (%d)\n",
ntohs(in6_addr_my->sin6_port),
ret);
return;
}
}
}
if (IS_ENABLED(CONFIG_NET_IPV6)) {
ret = net_context_recv(context6, udp_received, K_NO_WAIT,
(void *)shell);
ret = bind(fds[SOCK_ID_IPV6].fd,
(struct sockaddr *)in6_addr_my,
sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot receive IPv6 UDP packets\n");
return;
"Cannot bind IPv6 UDP port %d (%d)\n",
ntohs(in6_addr_my->sin6_port),
ret);
goto cleanup;
}
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
ret = net_context_recv(context4, udp_received, K_NO_WAIT,
(void *)shell);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot receive IPv4 UDP packets\n");
return;
}
fds[SOCK_ID_IPV6].events = POLLIN;
}
shell_fprintf(shell, SHELL_NORMAL,
"Listening on port %d\n", port);
while (true) {
ret = poll(fds, ARRAY_SIZE(fds), -1);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
"UDP receiver poll error (%d)\n",
errno);
goto cleanup;
}
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
struct sockaddr addr;
socklen_t addrlen = sizeof(addr);
if ((fds[i].revents & POLLERR) ||
(fds[i].revents & POLLNVAL)) {
shell_fprintf(
shell, SHELL_WARNING,
"UDP receiver IPv%d socket error\n",
(i == SOCK_ID_IPV4) ? 4 : 6);
goto cleanup;
}
if (!(fds[i].revents & POLLIN)) {
continue;
}
ret = recvfrom(fds[i].fd, buf, sizeof(buf), 0, &addr,
&addrlen);
if (ret < 0) {
shell_fprintf(
shell, SHELL_WARNING,
"recv failed on IPv%d socket (%d)\n",
(i == SOCK_ID_IPV4) ? 4 : 6, errno);
goto cleanup;
}
udp_received(shell, fds[i].fd, &addr, buf, ret);
}
}
cleanup:
for (int i = 0; i < ARRAY_SIZE(fds); i++) {
if (fds[i].fd >= 0) {
close(fds[i].fd);
}
}
}
void zperf_udp_receiver_init(const struct shell *shell, int port)
{
k_thread_create(&udp_receiver_thread_data,
udp_receiver_stack_area,
K_THREAD_STACK_SIZEOF(udp_receiver_stack_area),
udp_receiver_thread,
(void *)shell, INT_TO_POINTER(port), NULL,
UDP_RECEIVER_THREAD_PRIORITY,
IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
K_INHERIT_PERMS : 0,
K_NO_WAIT);
}