net: tcp2: Implement the retransmission for the outgoing data

In order to support the retransmission for the outgoing data:

1. The outgoing data packet is appended to the send_data queue
   in net_tcp_queue_data().

2. tcp_send_queued_data() is called and will use tcp_send_data()
   to sends queued but unsent data packet by packet
   until there's an unsent data and the receiver's window isn't full.

   tcp_send_queued_data() subscribes send_data_timer
   that will handle retrasmissions with tcp_resend_data().

3. tcp_send_data() peeks a single chunk of data from the send_data
   queue that will not exceed the maximum segment size
   until the the receiver's window is full.

   tcp_send_data() uses conn->seq and conn->unack_len as the sequence
   number for the TCP packet.

   conn->unacked_len is advanced on each send.

4. On data acknowledgment:

   - acknowledged amount of data is removed from the beginning
     of the send_data queue
   - conn->seq is advanced by the acknowledged amount
   - conn->unacked_len is decremented by the acknowledged amount
   - send_data_timer is cancelled
   - tcp_send_queued_data() is called to send queued but
     prevoiusly unsent data

5. On timeout, tcp_resend_data() will reset conn->unack_len,
   peek one packet from the beginning of the send_queue and resend,
   terminating the connection on retries exceeded.

   Meanwhile the outgoing data tcp_send_queued_data() is just
   appended to the send_data but not sent.

   In case of the acknowledgement, tcp_send_queued_data() will
   start sending multiple packets until the receiver's window
   is full.

Signed-off-by: Oleg Zhurakivskyy <oleg.zhurakivskyy@intel.com>
This commit is contained in:
Oleg Zhurakivskyy 2020-05-15 18:17:37 +03:00 committed by Carles Cufí
commit 6096efb3da
2 changed files with 249 additions and 5 deletions

View file

@ -249,6 +249,11 @@ static int tcp_conn_unref(struct tcp *conn)
tcp_send_queue_flush(conn);
if (k_delayed_work_remaining_get(&conn->send_data_timer)) {
k_delayed_work_cancel(&conn->send_data_timer);
}
tcp_pkt_unref(conn->send_data);
k_delayed_work_cancel(&conn->timewait_timer);
memset(conn, 0, sizeof(*conn));
@ -606,6 +611,159 @@ static void tcp_out(struct tcp *conn, u8_t flags)
tcp_out_ext(conn, flags, NULL /* no data */, conn->seq);
}
static int tcp_pkt_pull(struct net_pkt *pkt, size_t len)
{
int total = net_pkt_get_len(pkt);
int ret = 0;
if (len > total) {
ret = -EINVAL;
goto out;
}
net_pkt_cursor_init(pkt);
net_pkt_set_overwrite(pkt, true);
net_pkt_pull(pkt, len);
net_pkt_trim_buffer(pkt);
out:
return ret;
}
static void tcp_pkt_peek(struct net_pkt *to, struct net_pkt *from, size_t pos,
size_t len)
{
net_pkt_cursor_init(to);
net_pkt_cursor_init(from);
if (pos) {
net_pkt_set_overwrite(from, true);
net_pkt_skip(from, pos);
}
net_pkt_copy(to, from, len);
}
static bool tcp_window_full(struct tcp *conn)
{
bool window_full = !(conn->unacked_len < conn->send_win);
NET_DBG("conn: %p window_full=%hu", conn, window_full);
return window_full;
}
static int tcp_unsent_len(struct tcp *conn)
{
int unsent_len;
if (conn->unacked_len > conn->send_data_total) {
NET_ERR("total=%zu, unacked_len=%d",
conn->send_data_total, conn->unacked_len);
unsent_len = -ERANGE;
goto out;
}
unsent_len = conn->send_data_total - conn->unacked_len;
out:
NET_DBG("unsent_len=%d", unsent_len);
return unsent_len;
}
static int tcp_send_data(struct tcp *conn)
{
int ret = 0;
int pos, len;
struct net_pkt *pkt;
pos = conn->unacked_len;
len = MIN3(conn->send_data_total - conn->unacked_len,
conn->send_win - conn->unacked_len,
conn_mss(conn));
pkt = tcp_pkt_alloc(conn, len);
if (!pkt) {
NET_ERR("conn: %p packet allocation failed, len=%d", conn, len);
ret = -ENOBUFS;
goto out;
}
tcp_pkt_peek(pkt, conn->send_data, pos, len);
tcp_out_ext(conn, PSH | ACK, pkt, conn->seq + conn->unacked_len);
conn->unacked_len += len;
out:
conn_send_data_dump(conn);
return ret;
}
/* Send all queued but unsent data from the send_data packet by packet
* until the receiver's window is full. */
static int tcp_send_queued_data(struct tcp *conn)
{
int ret = 0;
bool subscribe = false;
if (conn->data_mode == TCP_DATA_MODE_RESEND) {
goto out;
}
while (tcp_unsent_len(conn) > 0) {
if (tcp_window_full(conn)) {
subscribe = true;
break;
}
ret = tcp_send_data(conn);
if (ret < 0) {
break;
}
}
if (conn->unacked_len) {
subscribe = true;
}
if (k_delayed_work_remaining_get(&conn->send_data_timer)) {
subscribe = false;
}
if (subscribe) {
conn->send_data_retries = 0;
k_delayed_work_submit(&conn->send_data_timer, K_MSEC(tcp_rto));
}
out:
return ret;
}
static void tcp_resend_data(struct k_work *work)
{
struct tcp *conn = CONTAINER_OF(work, struct tcp, send_data_timer);
bool conn_unref = false;
NET_DBG("send_data_retries=%hu", conn->send_data_retries);
if (conn->send_data_retries >= tcp_retries) {
NET_DBG("conn: %p close, data retransmissions exceeded", conn);
conn_unref = true;
goto out;
}
conn->data_mode = TCP_DATA_MODE_RESEND;
conn->unacked_len = 0;
tcp_send_data(conn);
conn->send_data_retries++;
k_delayed_work_submit(&conn->send_data_timer, K_MSEC(tcp_rto));
out:
if (conn_unref) {
tcp_conn_unref(conn);
}
}
static void tcp_timewait_timeout(struct k_work *work)
{
struct tcp *conn = CONTAINER_OF(work, struct tcp, timewait_timer);
@ -649,6 +807,9 @@ static struct tcp *tcp_conn_alloc(void)
k_delayed_work_init(&conn->timewait_timer, tcp_timewait_timeout);
conn->send_data = tcp_pkt_alloc(conn, 0);
k_delayed_work_init(&conn->send_data_timer, tcp_resend_data);
tcp_conn_ref(conn);
sys_slist_append(&tcp_conns, (sys_snode_t *)conn);
@ -938,6 +1099,49 @@ next_state:
break;
}
if (th && net_tcp_seq_cmp(th_ack(th), conn->seq) > 0) {
u32_t len_acked = th_ack(th) - conn->seq;
NET_DBG("conn: %p len_acked=%u", conn, len_acked);
if ((conn->send_data_total < len_acked) ||
(tcp_pkt_pull(conn->send_data,
len_acked) < 0)) {
NET_ERR("conn: %p, Invalid len_acked=%u "
"(total=%zu)", conn, len_acked,
conn->send_data_total);
tcp_out(conn, RST);
conn_state(conn, TCP_CLOSED);
break;
}
conn->send_data_total -= len_acked;
conn->unacked_len -= len_acked;
conn_seq(conn, + len_acked);
conn_send_data_dump(conn);
if (!k_delayed_work_remaining_get(&conn->send_data_timer)) {
NET_ERR("conn: %p, Missing a subscription "
"of the send_data queue timer", conn);
tcp_out(conn, RST);
conn_state(conn, TCP_CLOSED);
break;
}
conn->send_data_retries = 0;
k_delayed_work_cancel(&conn->send_data_timer);
if (conn->data_mode == TCP_DATA_MODE_RESEND) {
conn->unacked_len = 0;
}
conn->data_mode = TCP_DATA_MODE_SEND;
if (tcp_send_queued_data(conn) < 0) {
tcp_out(conn, RST);
conn_state(conn, TCP_CLOSED);
break;
}
}
if (len) {
if (th_seq(th) == conn->ack) {
if (tcp_data_get(conn, pkt) < 0) {
@ -1051,21 +1255,38 @@ int net_tcp_update_recv_wnd(struct net_context *context, s32_t delta)
return -EPROTONOSUPPORT;
}
/* net context wants to queue data for the TCP connection */
/* net_context queues the outgoing data for the TCP connection */
int net_tcp_queue_data(struct net_context *context, struct net_pkt *pkt)
{
struct tcp *conn = context->tcp;
int ret = 0;
NET_DBG("conn: %p, len: %zu", conn, net_pkt_get_len(pkt));
size_t len;
if (!conn || conn->state != TCP_ESTABLISHED) {
ret = -ENOTCONN;
goto out;
}
tcp_out(conn, PSH | ACK, pkt);
out:
k_mutex_lock(&conn->lock, K_FOREVER);
len = net_pkt_get_len(pkt);
net_pkt_append_buffer(conn->send_data, pkt->buffer);
conn->send_data_total += len;
NET_DBG("conn: %p Queued %zu bytes (total %zu)", conn, len,
conn->send_data_total);
pkt->buffer = NULL;
tcp_pkt_unref(pkt);
ret = tcp_send_queued_data(conn);
if (ret < 0) {
k_mutex_unlock(&conn->lock);
tcp_conn_unref(conn);
goto out;
}
k_mutex_unlock(&conn->lock);
out:
return ret;
}

View file

@ -8,6 +8,10 @@
#define is(_a, _b) (strcmp((_a), (_b)) == 0)
#ifndef MIN3
#define MIN3(_a, _b, _c) MIN((_a), MIN((_b), (_c)))
#endif
#define th_seq(_x) ntohl((_x)->th_seq)
#define th_ack(_x) ntohl((_x)->th_ack)
@ -89,6 +93,19 @@
(_conn)->state = _s; \
})
#define conn_send_data_dump(_conn) \
({ \
NET_DBG("conn: %p total=%zd, unacked_len=%d, " \
"send_win=%hu, mss=%hu", \
(_conn), net_pkt_get_len((_conn)->send_data), \
conn->unacked_len, conn->send_win, \
conn_mss((_conn))); \
NET_DBG("conn: %p send_data_timer=%hu, send_data_retries=%hu", \
(_conn), \
(bool)k_delayed_work_remaining_get(&(_conn)->send_data_timer),\
(_conn)->send_data_retries); \
})
#define TCPOPT_END 0
#define TCPOPT_NOP 1
#define TCPOPT_MAXSEG 2
@ -141,6 +158,11 @@ enum tcp_state {
TCP_CLOSED
};
enum tcp_data_mode {
TCP_DATA_MODE_SEND = 0,
TCP_DATA_MODE_RESEND = 1
};
union tcp_endpoint {
struct sockaddr sa;
struct sockaddr_in sin;
@ -174,6 +196,7 @@ struct tcp { /* TCP connection */
size_t send_data_total;
u8_t send_data_retries;
int unacked_len;
enum tcp_data_mode data_mode;
bool in_retransmission;
size_t send_retries;
struct k_delayed_work timewait_timer;