kernel: pipes: add pipe flush routines

Adds two routines to flush pipe objects:
   k_pipe_flush()
     - This routine flushes the entire pipe. That includes both
     the pipe's buffer and all pended writers. It is equivalent
     to reading everything into a giant temporary buffer which
     is then discarded.
   k_pipe_buffer_flush()
     - This routine flushes only the pipe's buffer (if it exists).
     It is equivalent to reading a maximum of "buffer size" bytes
     into a temporary buffer which is then discarded.

Signed-off-by: Peter Mitsis <peter.mitsis@intel.com>
This commit is contained in:
Peter Mitsis 2021-10-04 12:36:22 -04:00 committed by Anas Nashif
commit d1353a4584
7 changed files with 193 additions and 30 deletions

View file

@ -4925,6 +4925,35 @@ __syscall size_t k_pipe_read_avail(struct k_pipe *pipe);
*/ */
__syscall size_t k_pipe_write_avail(struct k_pipe *pipe); __syscall size_t k_pipe_write_avail(struct k_pipe *pipe);
/**
* @brief Flush the pipe of write data
*
* This routine flushes the pipe. Flushing the pipe is equivalent to reading
* both all the data in the pipe's buffer and all the data waiting to go into
* that pipe into a large temporary buffer and discarding the buffer. Any
* writers that were previously pended become unpended.
*
* @param pipe Address of the pipe.
*
* @return N/A
*/
__syscall void k_pipe_flush(struct k_pipe *pipe);
/**
* @brief Flush the pipe's internal buffer
*
* This routine flushes the pipe's internal buffer. This is equivalent to
* reading up to N bytes from the pipe (where N is the size of the pipe's
* buffer) into a temporary buffer and then discarding that buffer. If there
* were writers previously pending, then some may unpend as they try to fill
* up the pipe's emptied buffer.
*
* @param pipe Address of the pipe.
*
* @return N/A
*/
__syscall void k_pipe_buffer_flush(struct k_pipe *pipe);
/** @} */ /** @} */
/** /**

View file

@ -1589,6 +1589,30 @@ void sys_trace_idle(void);
*/ */
#define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret)
/**
* @brief Trace Pipe flush entry
* @param pipe Pipe object
*/
#define sys_port_trace_k_pipe_flush_enter(pipe)
/**
* @brief Trace Pipe flush exit
* @param pipe Pipe object
*/
#define sys_port_trace_k_pipe_flush_exit(pipe)
/**
* @brief Trace Pipe buffer flush entry
* @param pipe Pipe object
*/
#define sys_port_trace_k_pipe_buffer_flush_enter(pipe)
/**
* @brief Trace Pipe buffer flush exit
* @param pipe Pipe object
*/
#define sys_port_trace_k_pipe_buffer_flush_exit(pipe)
/** /**
* @brief Trace Pipe put attempt entry * @brief Trace Pipe put attempt entry
* @param pipe Pipe object * @param pipe Pipe object

View file

@ -26,6 +26,11 @@ struct k_pipe_desc {
size_t bytes_to_xfer; /* # bytes left to transfer */ size_t bytes_to_xfer; /* # bytes left to transfer */
}; };
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
void *data, size_t bytes_to_read,
size_t *bytes_read, size_t min_xfer,
k_timeout_t timeout);
void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{ {
pipe->buffer = buffer; pipe->buffer = buffer;
@ -78,6 +83,55 @@ static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
#include <syscalls/k_pipe_alloc_init_mrsh.c> #include <syscalls/k_pipe_alloc_init_mrsh.c>
#endif #endif
void z_impl_k_pipe_flush(struct k_pipe *pipe)
{
size_t bytes_read;
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe);
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
(void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0,
K_NO_WAIT);
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
}
#ifdef CONFIG_USERSPACE
void z_vrfy_k_pipe_flush(struct k_pipe *pipe)
{
Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
z_impl_k_pipe_flush(pipe);
}
#include <syscalls/k_pipe_flush_mrsh.c>
#endif
void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)
{
size_t bytes_read;
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe);
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
if (pipe->buffer != NULL) {
(void) pipe_get_internal(key, pipe, NULL, pipe->size,
&bytes_read, 0, K_NO_WAIT);
}
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
}
#ifdef CONFIG_USERSPACE
void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe)
{
Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
z_impl_k_pipe_buffer_flush(pipe);
}
#endif
int k_pipe_cleanup(struct k_pipe *pipe) int k_pipe_cleanup(struct k_pipe *pipe)
{ {
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe); SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);
@ -127,6 +181,11 @@ static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
size_t num_bytes = MIN(dest_size, src_size); size_t num_bytes = MIN(dest_size, src_size);
const unsigned char *end = src + num_bytes; const unsigned char *end = src + num_bytes;
if (dest == NULL) {
/* Data is being flushed. Pretend the data was copied. */
return num_bytes;
}
while (src != end) { while (src != end) {
*dest = *src; *dest = *src;
dest++; dest++;
@ -187,13 +246,16 @@ static size_t pipe_buffer_get(struct k_pipe *pipe,
size_t bytes_copied; size_t bytes_copied;
size_t run_length; size_t run_length;
size_t num_bytes_read = 0; size_t num_bytes_read = 0;
size_t dest_off;
int i; int i;
for (i = 0; i < 2; i++) { for (i = 0; i < 2; i++) {
run_length = MIN(pipe->bytes_used, run_length = MIN(pipe->bytes_used,
pipe->size - pipe->read_index); pipe->size - pipe->read_index);
bytes_copied = pipe_xfer(dest + num_bytes_read, dest_off = (dest == NULL) ? 0 : num_bytes_read;
bytes_copied = pipe_xfer(dest + dest_off,
dest_size - num_bytes_read, dest_size - num_bytes_read,
pipe->buffer + pipe->read_index, pipe->buffer + pipe->read_index,
run_length); run_length);
@ -490,42 +552,32 @@ int z_vrfy_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
#include <syscalls/k_pipe_put_mrsh.c> #include <syscalls/k_pipe_put_mrsh.c>
#endif #endif
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
size_t *bytes_read, size_t min_xfer, k_timeout_t timeout) void *data, size_t bytes_to_read,
size_t *bytes_read, size_t min_xfer,
k_timeout_t timeout)
{ {
struct k_thread *writer; struct k_thread *writer;
struct k_pipe_desc *desc; struct k_pipe_desc *desc;
sys_dlist_t xfer_list; sys_dlist_t xfer_list;
size_t num_bytes_read = 0; size_t num_bytes_read = 0;
size_t data_off;
size_t bytes_copied; size_t bytes_copied;
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);
CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EINVAL);
return -EINVAL;
}
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
/* /*
* Create a list of "working readers" into which the data will be * Create a list of "working readers" into which the data will be
* directly copied. * directly copied.
*/ */
if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers, if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
pipe->bytes_used, bytes_to_read, pipe->bytes_used, bytes_to_read,
min_xfer, timeout)) { min_xfer, timeout)) {
k_spin_unlock(&pipe->lock, key); k_spin_unlock(&pipe->lock, key);
*bytes_read = 0; *bytes_read = 0;
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EIO);
return -EIO; return -EIO;
} }
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);
z_sched_lock(); z_sched_lock();
k_spin_unlock(&pipe->lock, key); k_spin_unlock(&pipe->lock, key);
@ -549,7 +601,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
sys_dlist_get(&xfer_list); sys_dlist_get(&xfer_list);
while ((thread != NULL) && (num_bytes_read < bytes_to_read)) { while ((thread != NULL) && (num_bytes_read < bytes_to_read)) {
desc = (struct k_pipe_desc *)thread->base.swap_data; desc = (struct k_pipe_desc *)thread->base.swap_data;
bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read, data_off = (data == NULL) ? 0 : num_bytes_read;
bytes_copied = pipe_xfer((uint8_t *)data + data_off,
bytes_to_read - num_bytes_read, bytes_to_read - num_bytes_read,
desc->buffer, desc->bytes_to_xfer); desc->buffer, desc->bytes_to_xfer);
@ -573,7 +626,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
if ((writer != NULL) && (num_bytes_read < bytes_to_read)) { if ((writer != NULL) && (num_bytes_read < bytes_to_read)) {
desc = (struct k_pipe_desc *)writer->base.swap_data; desc = (struct k_pipe_desc *)writer->base.swap_data;
bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read, data_off = (data == NULL) ? 0 : num_bytes_read;
bytes_copied = pipe_xfer((uint8_t *)data + data_off,
bytes_to_read - num_bytes_read, bytes_to_read - num_bytes_read,
desc->buffer, desc->bytes_to_xfer); desc->buffer, desc->bytes_to_xfer);
@ -615,8 +669,6 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
*bytes_read = num_bytes_read; *bytes_read = num_bytes_read;
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);
return 0; return 0;
} }
@ -627,12 +679,21 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
*bytes_read = num_bytes_read; *bytes_read = num_bytes_read;
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);
return 0; return 0;
} }
/* Not all data was read */ /*
* Not all data was read. It is important to note that when this
* routine is invoked by either of the flush routines() both the <data>
* and <timeout> parameters are set to NULL and K_NO_WAIT respectively.
* Consequently, neither k_pipe_flush() nor k_pipe_buffer_flush()
* will block.
*
* However, this routine may also be invoked by k_pipe_get() and there
* is no enforcement of <data> being non-NULL when called from
* kernel-space. That restriction is enforced when called from
* user-space.
*/
struct k_pipe_desc pipe_desc; struct k_pipe_desc pipe_desc;
@ -640,6 +701,8 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read; pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;
if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) { if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);
_current->base.swap_data = &pipe_desc; _current->base.swap_data = &pipe_desc;
k_spinlock_key_t key2 = k_spin_lock(&pipe->lock); k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);
@ -658,6 +721,28 @@ int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
return ret; return ret;
} }
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);
CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe,
timeout, -EINVAL);
return -EINVAL;
}
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
int ret = pipe_get_internal(key, pipe, data, bytes_to_read, bytes_read,
min_xfer, timeout);
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);
return ret;
}
#ifdef CONFIG_USERSPACE #ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
size_t *bytes_read, size_t min_xfer, k_timeout_t timeout) size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)

View file

@ -270,6 +270,10 @@ extern "C" {
#define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret)
#define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_enter(pipe)
#define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret)
#define sys_port_trace_k_pipe_flush_enter(pipe)
#define sys_port_trace_k_pipe_flush_exit(pipe)
#define sys_port_trace_k_pipe_buffer_flush_enter(pipe)
#define sys_port_trace_k_pipe_buffer_flush_exit(pipe)
#define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_enter(pipe, timeout)
#define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout)
#define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret)

View file

@ -520,6 +520,10 @@ void sys_trace_thread_info(struct k_thread *thread);
#define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret)
#define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_enter(pipe)
#define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret)
#define sys_port_trace_k_pipe_flush_enter(pipe)
#define sys_port_trace_k_pipe_flush_exit(pipe)
#define sys_port_trace_k_pipe_buffer_flush_enter(pipe)
#define sys_port_trace_k_pipe_buffer_flush_exit(pipe)
#define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_enter(pipe, timeout)
#define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout)
#define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret)

View file

@ -345,6 +345,15 @@
#define sys_port_trace_k_pipe_alloc_init_enter(pipe) sys_trace_k_pipe_alloc_init_enter(pipe, size) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) sys_trace_k_pipe_alloc_init_enter(pipe, size)
#define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) \ #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) \
sys_trace_k_pipe_alloc_init_exit(pipe, size, ret) sys_trace_k_pipe_alloc_init_exit(pipe, size, ret)
#define sys_port_trace_k_pipe_flush_enter(pipe) \
sys_trace_k_pipe_flush_enter(pipe)
#define sys_port_trace_k_pipe_flush_exit(pipe) \
sys_trace_k_pipe_flush_exit(pipe)
#define sys_port_trace_k_pipe_buffer_flush_enter(pipe) \
sys_trace_k_pipe_buffer_flush_enter(pipe)
#define sys_port_trace_k_pipe_buffer_flush_exit(pipe) \
sys_trace_k_pipe_buffer_flush_exit(pipe)
#define sys_port_trace_k_pipe_put_enter(pipe, timeout) \ #define sys_port_trace_k_pipe_put_enter(pipe, timeout) \
sys_trace_k_pipe_put_enter(pipe, data, bytes_to_write, bytes_written, min_xfer, timeout) sys_trace_k_pipe_put_enter(pipe, data, bytes_to_write, bytes_written, min_xfer, timeout)
#define sys_port_trace_k_pipe_put_blocking(pipe, timeout) \ #define sys_port_trace_k_pipe_put_blocking(pipe, timeout) \
@ -607,6 +616,10 @@ void sys_trace_k_pipe_cleanup_enter(struct k_pipe *pipe);
void sys_trace_k_pipe_cleanup_exit(struct k_pipe *pipe, int ret); void sys_trace_k_pipe_cleanup_exit(struct k_pipe *pipe, int ret);
void sys_trace_k_pipe_alloc_init_enter(struct k_pipe *pipe, size_t size); void sys_trace_k_pipe_alloc_init_enter(struct k_pipe *pipe, size_t size);
void sys_trace_k_pipe_alloc_init_exit(struct k_pipe *pipe, size_t size, int ret); void sys_trace_k_pipe_alloc_init_exit(struct k_pipe *pipe, size_t size, int ret);
void sys_trace_k_pipe_flush_enter(struct k_pipe *pipe);
void sys_trace_k_pipe_flush_exit(struct k_pipe *pipe);
void sys_trace_k_pipe_buffer_flush_enter(struct k_pipe *pipe);
void sys_trace_k_pipe_buffer_flush_exit(struct k_pipe *pipe);
void sys_trace_k_pipe_put_enter(struct k_pipe *pipe, void *data, size_t bytes_to_write, void sys_trace_k_pipe_put_enter(struct k_pipe *pipe, void *data, size_t bytes_to_write,
size_t *bytes_written, size_t min_xfer, k_timeout_t timeout); size_t *bytes_written, size_t min_xfer, k_timeout_t timeout);
void sys_trace_k_pipe_put_blocking(struct k_pipe *pipe, void *data, size_t bytes_to_write, void sys_trace_k_pipe_put_blocking(struct k_pipe *pipe, void *data, size_t bytes_to_write,

View file

@ -251,6 +251,10 @@ void sys_trace_idle(void);
#define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret)
#define sys_port_trace_k_pipe_alloc_init_enter(pipe) #define sys_port_trace_k_pipe_alloc_init_enter(pipe)
#define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret)
#define sys_port_trace_k_pipe_flush_enter(pipe)
#define sys_port_trace_k_pipe_flush_exit(pipe)
#define sys_port_trace_k_pipe_buffer_flush_enter(pipe)
#define sys_port_trace_k_pipe_buffer_flush_exit(pipe)
#define sys_port_trace_k_pipe_put_enter(pipe, timeout) #define sys_port_trace_k_pipe_put_enter(pipe, timeout)
#define sys_port_trace_k_pipe_put_blocking(pipe, timeout) #define sys_port_trace_k_pipe_put_blocking(pipe, timeout)
#define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret) #define sys_port_trace_k_pipe_put_exit(pipe, timeout, ret)