From d1353a4584c7af1fb7ea896505275a16f0cbeb07 Mon Sep 17 00:00:00 2001 From: Peter Mitsis Date: Mon, 4 Oct 2021 12:36:22 -0400 Subject: [PATCH] kernel: pipes: add pipe flush routines Adds two routines to flush pipe objects: k_pipe_flush() - This routine flushes the entire pipe. That includes both the pipe's buffer and all pended writers. It is equivalent to reading everything into a giant temporary buffer which is then discarded. k_pipe_buffer_flush() - This routine flushes only the pipe's buffer (if it exists). It is equivalent to reading a maximum of "buffer size" bytes into a temporary buffer which is then discarded. Signed-off-by: Peter Mitsis --- include/kernel.h | 29 +++++ include/tracing/tracing.h | 24 ++++ kernel/pipes.c | 145 ++++++++++++++++++----- subsys/tracing/ctf/tracing_ctf.h | 4 + subsys/tracing/sysview/tracing_sysview.h | 4 + subsys/tracing/test/tracing_test.h | 13 ++ subsys/tracing/user/tracing_user.h | 4 + 7 files changed, 193 insertions(+), 30 deletions(-) diff --git a/include/kernel.h b/include/kernel.h index 63938ef999e..4e1cd6d4459 100644 --- a/include/kernel.h +++ b/include/kernel.h @@ -4925,6 +4925,35 @@ __syscall size_t k_pipe_read_avail(struct k_pipe *pipe); */ __syscall size_t k_pipe_write_avail(struct k_pipe *pipe); +/** + * @brief Flush the pipe of write data + * + * This routine flushes the pipe. Flushing the pipe is equivalent to reading + * both all the data in the pipe's buffer and all the data waiting to go into + * that pipe into a large temporary buffer and discarding the buffer. Any + * writers that were previously pended become unpended. + * + * @param pipe Address of the pipe. + * + * @return N/A + */ +__syscall void k_pipe_flush(struct k_pipe *pipe); + +/** + * @brief Flush the pipe's internal buffer + * + * This routine flushes the pipe's internal buffer. This is equivalent to + * reading up to N bytes from the pipe (where N is the size of the pipe's + * buffer) into a temporary buffer and then discarding that buffer. If there + * were writers previously pending, then some may unpend as they try to fill + * up the pipe's emptied buffer. + * + * @param pipe Address of the pipe. + * + * @return N/A + */ +__syscall void k_pipe_buffer_flush(struct k_pipe *pipe); + /** @} */ /** diff --git a/include/tracing/tracing.h b/include/tracing/tracing.h index 9461427f62f..d09b324a35c 100644 --- a/include/tracing/tracing.h +++ b/include/tracing/tracing.h @@ -1589,6 +1589,30 @@ void sys_trace_idle(void); */ #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) +/** + * @brief Trace Pipe flush entry + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_flush_enter(pipe) + +/** + * @brief Trace Pipe flush exit + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_flush_exit(pipe) + +/** + * @brief Trace Pipe buffer flush entry + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) + +/** + * @brief Trace Pipe buffer flush exit + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) + /** * @brief Trace Pipe put attempt entry * @param pipe Pipe object diff --git a/kernel/pipes.c b/kernel/pipes.c index 83d1e6f8bf6..78fbcaeaedf 100644 --- a/kernel/pipes.c +++ b/kernel/pipes.c @@ -26,6 +26,11 @@ struct k_pipe_desc { size_t bytes_to_xfer; /* # bytes left to transfer */ }; +static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe, + void *data, size_t bytes_to_read, + size_t *bytes_read, size_t min_xfer, + k_timeout_t timeout); + void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) { pipe->buffer = buffer; @@ -78,6 +83,55 @@ static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size) #include #endif +void z_impl_k_pipe_flush(struct k_pipe *pipe) +{ + size_t bytes_read; + + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe); + + k_spinlock_key_t key = k_spin_lock(&pipe->lock); + + (void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0, + K_NO_WAIT); + + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe); +} + +#ifdef CONFIG_USERSPACE +void z_vrfy_k_pipe_flush(struct k_pipe *pipe) +{ + Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + + z_impl_k_pipe_flush(pipe); +} +#include +#endif + +void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe) +{ + size_t bytes_read; + + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe); + + k_spinlock_key_t key = k_spin_lock(&pipe->lock); + + if (pipe->buffer != NULL) { + (void) pipe_get_internal(key, pipe, NULL, pipe->size, + &bytes_read, 0, K_NO_WAIT); + } + + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe); +} + +#ifdef CONFIG_USERSPACE +void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe) +{ + Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + + z_impl_k_pipe_buffer_flush(pipe); +} +#endif + int k_pipe_cleanup(struct k_pipe *pipe) { SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe); @@ -127,6 +181,11 @@ static size_t pipe_xfer(unsigned char *dest, size_t dest_size, size_t num_bytes = MIN(dest_size, src_size); const unsigned char *end = src + num_bytes; + if (dest == NULL) { + /* Data is being flushed. Pretend the data was copied. */ + return num_bytes; + } + while (src != end) { *dest = *src; dest++; @@ -187,16 +246,19 @@ static size_t pipe_buffer_get(struct k_pipe *pipe, size_t bytes_copied; size_t run_length; size_t num_bytes_read = 0; + size_t dest_off; int i; for (i = 0; i < 2; i++) { run_length = MIN(pipe->bytes_used, pipe->size - pipe->read_index); - bytes_copied = pipe_xfer(dest + num_bytes_read, - dest_size - num_bytes_read, - pipe->buffer + pipe->read_index, - run_length); + dest_off = (dest == NULL) ? 0 : num_bytes_read; + + bytes_copied = pipe_xfer(dest + dest_off, + dest_size - num_bytes_read, + pipe->buffer + pipe->read_index, + run_length); num_bytes_read += bytes_copied; pipe->bytes_used -= bytes_copied; @@ -461,7 +523,7 @@ int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write, k_spinlock_key_t key2 = k_spin_lock(&pipe->lock); z_sched_unlock_no_reschedule(); (void)z_pend_curr(&pipe->lock, key2, - &pipe->wait_q.writers, timeout); + &pipe->wait_q.writers, timeout); } else { k_sched_unlock(); } @@ -469,7 +531,7 @@ int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write, *bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer; int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer, - bytes_to_write); + bytes_to_write); SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret); return ret; } @@ -490,42 +552,32 @@ int z_vrfy_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write, #include #endif -int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, - size_t *bytes_read, size_t min_xfer, k_timeout_t timeout) +static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe, + void *data, size_t bytes_to_read, + size_t *bytes_read, size_t min_xfer, + k_timeout_t timeout) { struct k_thread *writer; struct k_pipe_desc *desc; sys_dlist_t xfer_list; size_t num_bytes_read = 0; + size_t data_off; size_t bytes_copied; - SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout); - - CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) { - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EINVAL); - - return -EINVAL; - } - - k_spinlock_key_t key = k_spin_lock(&pipe->lock); - /* * Create a list of "working readers" into which the data will be * directly copied. */ + if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers, pipe->bytes_used, bytes_to_read, min_xfer, timeout)) { k_spin_unlock(&pipe->lock, key); *bytes_read = 0; - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EIO); - return -EIO; } - SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout); - z_sched_lock(); k_spin_unlock(&pipe->lock, key); @@ -549,7 +601,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, sys_dlist_get(&xfer_list); while ((thread != NULL) && (num_bytes_read < bytes_to_read)) { desc = (struct k_pipe_desc *)thread->base.swap_data; - bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read, + data_off = (data == NULL) ? 0 : num_bytes_read; + bytes_copied = pipe_xfer((uint8_t *)data + data_off, bytes_to_read - num_bytes_read, desc->buffer, desc->bytes_to_xfer); @@ -573,7 +626,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, if ((writer != NULL) && (num_bytes_read < bytes_to_read)) { desc = (struct k_pipe_desc *)writer->base.swap_data; - bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read, + data_off = (data == NULL) ? 0 : num_bytes_read; + bytes_copied = pipe_xfer((uint8_t *)data + data_off, bytes_to_read - num_bytes_read, desc->buffer, desc->bytes_to_xfer); @@ -615,8 +669,6 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, *bytes_read = num_bytes_read; - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0); - return 0; } @@ -627,12 +679,21 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, *bytes_read = num_bytes_read; - SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0); - return 0; } - /* Not all data was read */ + /* + * Not all data was read. It is important to note that when this + * routine is invoked by either of the flush routines() both the + * and parameters are set to NULL and K_NO_WAIT respectively. + * Consequently, neither k_pipe_flush() nor k_pipe_buffer_flush() + * will block. + * + * However, this routine may also be invoked by k_pipe_get() and there + * is no enforcement of being non-NULL when called from + * kernel-space. That restriction is enforced when called from + * user-space. + */ struct k_pipe_desc pipe_desc; @@ -640,6 +701,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read; if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) { + SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout); + _current->base.swap_data = &pipe_desc; k_spinlock_key_t key2 = k_spin_lock(&pipe->lock); @@ -653,11 +716,33 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, *bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer; int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer, - bytes_to_read); + bytes_to_read); SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret); return ret; } +int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, + size_t *bytes_read, size_t min_xfer, k_timeout_t timeout) +{ + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout); + + CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) { + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, + timeout, -EINVAL); + + return -EINVAL; + } + + k_spinlock_key_t key = k_spin_lock(&pipe->lock); + + int ret = pipe_get_internal(key, pipe, data, bytes_to_read, bytes_read, + min_xfer, timeout); + + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret); + + return ret; +} + #ifdef CONFIG_USERSPACE int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, size_t *bytes_read, size_t min_xfer, k_timeout_t timeout) diff --git a/subsys/tracing/ctf/tracing_ctf.h b/subsys/tracing/ctf/tracing_ctf.h index 85845701945..34043bdb2bf 100644 --- a/subsys/tracing/ctf/tracing_ctf.h +++ b/subsys/tracing/ctf/tracing_ctf.h @@ -270,6 +270,10 @@ extern "C" { #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) +#define sys_port_trace_k_pipe_flush_enter(pipe) +#define sys_port_trace_k_pipe_flush_exit(pipe) +#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) +#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) #define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret) diff --git a/subsys/tracing/sysview/tracing_sysview.h b/subsys/tracing/sysview/tracing_sysview.h index b27d09ac45a..ac50c4dcd90 100644 --- a/subsys/tracing/sysview/tracing_sysview.h +++ b/subsys/tracing/sysview/tracing_sysview.h @@ -520,6 +520,10 @@ void sys_trace_thread_info(struct k_thread *thread); #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) +#define sys_port_trace_k_pipe_flush_enter(pipe) +#define sys_port_trace_k_pipe_flush_exit(pipe) +#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) +#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) #define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret) diff --git a/subsys/tracing/test/tracing_test.h b/subsys/tracing/test/tracing_test.h index 3f9f3d281bb..2bba22e41d4 100644 --- a/subsys/tracing/test/tracing_test.h +++ b/subsys/tracing/test/tracing_test.h @@ -345,6 +345,15 @@ #define sys_port_trace_k_pipe_alloc_init_enter(pipe) sys_trace_k_pipe_alloc_init_enter(pipe, size) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) \ sys_trace_k_pipe_alloc_init_exit(pipe, size, ret) +#define sys_port_trace_k_pipe_flush_enter(pipe) \ + sys_trace_k_pipe_flush_enter(pipe) +#define sys_port_trace_k_pipe_flush_exit(pipe) \ + sys_trace_k_pipe_flush_exit(pipe) +#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) \ + sys_trace_k_pipe_buffer_flush_enter(pipe) +#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) \ + sys_trace_k_pipe_buffer_flush_exit(pipe) + #define sys_port_trace_k_pipe_put_enter(pipe, timeout) \ sys_trace_k_pipe_put_enter(pipe, data, bytes_to_write, bytes_written, min_xfer, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout) \ @@ -607,6 +616,10 @@ void sys_trace_k_pipe_cleanup_enter(struct k_pipe *pipe); void sys_trace_k_pipe_cleanup_exit(struct k_pipe *pipe, int ret); void sys_trace_k_pipe_alloc_init_enter(struct k_pipe *pipe, size_t size); void sys_trace_k_pipe_alloc_init_exit(struct k_pipe *pipe, size_t size, int ret); +void sys_trace_k_pipe_flush_enter(struct k_pipe *pipe); +void sys_trace_k_pipe_flush_exit(struct k_pipe *pipe); +void sys_trace_k_pipe_buffer_flush_enter(struct k_pipe *pipe); +void sys_trace_k_pipe_buffer_flush_exit(struct k_pipe *pipe); void sys_trace_k_pipe_put_enter(struct k_pipe *pipe, void *data, size_t bytes_to_write, size_t *bytes_written, size_t min_xfer, k_timeout_t timeout); void sys_trace_k_pipe_put_blocking(struct k_pipe *pipe, void *data, size_t bytes_to_write, diff --git a/subsys/tracing/user/tracing_user.h b/subsys/tracing/user/tracing_user.h index 861ae271707..7d057ae1991 100644 --- a/subsys/tracing/user/tracing_user.h +++ b/subsys/tracing/user/tracing_user.h @@ -251,6 +251,10 @@ void sys_trace_idle(void); #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) +#define sys_port_trace_k_pipe_flush_enter(pipe) +#define sys_port_trace_k_pipe_flush_exit(pipe) +#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) +#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) #define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret)