kernel: pipes: rewrite pipes implementation
This new implementation of pipes has a number of advantages over the previous one:

1. The schedule locking is eliminated, which both makes the code safer for SMP and allows pipes to be used from ISR context.

2. The code used to be structured with separate paths for copying to/from a waiting thread's buffer and the pipe buffer. That unnecessary duplication has been replaced with a simpler scatter-gather copy model.

3. The manner in which the "working list" is generated has also been simplified. It no longer tries to use the thread's queuing node. Instead, the k_pipe_desc structure (whose instances are now part of the k_thread structure) has been extended to contain additional fields, including a node for use with a linked list. As this impacts the k_thread structure, pipes are now configurable in the kernel via CONFIG_PIPES.

Fixes #47061

Signed-off-by: Peter Mitsis <peter.mitsis@intel.com>
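To make point 2 concrete, here is a minimal, self-contained sketch of the scatter-gather idea in the spirit of this rewrite (the names desc and desc_copy are hypothetical; the real code uses struct _pipe_desc and the pipe_write() routine shown in the diff below). Both the waiters' buffers and the pipe's ring buffer are described by the same descriptor type, so one copy loop serves every source/destination combination:

#include <stddef.h>
#include <string.h>

struct desc {                   /* analogous to struct _pipe_desc        */
        unsigned char *buffer;  /* current position in the buffer        */
        size_t bytes_to_xfer;   /* bytes still wanted/available          */
        struct desc *next;      /* stand-in for the sys_dnode_t linkage  */
};

/* Drain 'src' descriptors into 'dest' descriptors until either runs out. */
static size_t desc_copy(struct desc *src, struct desc *dest)
{
        size_t total = 0;

        while ((src != NULL) && (dest != NULL)) {
                size_t n = (src->bytes_to_xfer < dest->bytes_to_xfer)
                           ? src->bytes_to_xfer : dest->bytes_to_xfer;

                memcpy(dest->buffer, src->buffer, n);
                total += n;

                src->buffer += n;
                src->bytes_to_xfer -= n;
                dest->buffer += n;
                dest->bytes_to_xfer -= n;

                if (src->bytes_to_xfer == 0) {
                        src = src->next;    /* next source descriptor      */
                }
                if (dest->bytes_to_xfer == 0) {
                        dest = dest->next;  /* next destination descriptor */
                }
        }

        return total;
}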
parent 2bc23e0543
commit f86027ffb7
7 changed files with 356 additions and 427 deletions
@@ -39,6 +39,20 @@ struct __thread_entry {
 };
 #endif

+struct k_thread;
+
+/*
+ * This _pipe_desc structure is used by the pipes kernel module when
+ * CONFIG_PIPES has been selected.
+ */
+
+struct _pipe_desc {
+        sys_dnode_t node;
+        unsigned char *buffer;      /* Position in src/dest buffer */
+        size_t bytes_to_xfer;       /* # bytes left to transfer */
+        struct k_thread *thread;    /* Back pointer to pended thread */
+};
+
 /* can be used for creating 'dummy' threads, e.g. for pending on objects */
 struct _thread_base {

@@ -320,6 +334,11 @@ struct k_thread {
         struct k_mem_paging_stats_t paging_stats;
 #endif

+#ifdef CONFIG_PIPES
+        /** Pipe descriptor used with blocking k_pipe operations */
+        struct _pipe_desc pipe_desc;
+#endif
+
         /** arch-specifics: must always be at the end */
         struct _thread_arch arch;
 };
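The embedded sys_dnode_t is what lets a pipe call chain these descriptors into an ad-hoc working list without borrowing the thread's scheduler queue node. A hedged sketch using the kernel's dlist API (struct my_desc and the two helper functions are hypothetical, simplified stand-ins):

#include <zephyr/sys/dlist.h>

struct my_desc {                 /* simplified stand-in for struct _pipe_desc */
        sys_dnode_t node;
        unsigned char *buffer;
        size_t bytes_to_xfer;
};

void collect(sys_dlist_t *work_list, struct my_desc *d)
{
        /* Linking uses the descriptor's own node; the thread's
         * scheduler queue node is left alone, so the thread can
         * stay pended on its wait queue while it is on this list.
         */
        sys_dlist_append(work_list, &d->node);
}

void drain(sys_dlist_t *work_list)
{
        struct my_desc *d;

        SYS_DLIST_FOR_EACH_CONTAINER(work_list, d, node) {
                /* ...copy up to d->bytes_to_xfer bytes via d->buffer... */
        }
}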
@@ -36,7 +36,6 @@ list(APPEND kernel_files
         mailbox.c
         msg_q.c
         mutex.c
-        pipes.c
         queue.c
         sem.c
         stack.c

@@ -81,6 +80,7 @@ target_sources_ifdef(CONFIG_ATOMIC_OPERATIONS_C kernel PRIVATE atomic_c.c)
 target_sources_ifdef(CONFIG_MMU kernel PRIVATE mmu.c)
 target_sources_ifdef(CONFIG_POLL kernel PRIVATE poll.c)
 target_sources_ifdef(CONFIG_EVENTS kernel PRIVATE events.c)
+target_sources_ifdef(CONFIG_PIPES kernel PRIVATE pipes.c)
 target_sources_ifdef(CONFIG_SCHED_THREAD_USAGE kernel PRIVATE usage.c)

 if(${CONFIG_KERNEL_MEM_POOL})
@@ -554,6 +554,13 @@ config EVENTS
           Note that setting this option slightly increases the size of the
           thread structure.

+config PIPES
+        bool "Pipe objects"
+        help
+          This option enables kernel pipes. A pipe is a kernel object that
+          allows a thread to send a byte stream to another thread. Pipes can
+          be used to synchronously transfer chunks of data in whole or in part.
+
 config KERNEL_MEM_POOL
         bool "Use Kernel Memory Pool"
         default y
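Because pipes become opt-in with this change, an application must now set CONFIG_PIPES=y (for example in prj.conf) before the pipe API is available. A minimal, hypothetical producer under that assumption, using the long-standing static-definition macro:

#include <zephyr/kernel.h>

/* Statically define a pipe with a 128-byte ring buffer,
 * 4-byte aligned. Requires CONFIG_PIPES=y after this commit.
 */
K_PIPE_DEFINE(my_pipe, 128, 4);

void producer(void)
{
        const char msg[] = "hello";
        size_t written;

        /* Accept a partial write; never block. */
        (void)k_pipe_put(&my_pipe, (void *)msg, sizeof(msg),
                         &written, 0, K_NO_WAIT);
}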
kernel/pipes.c (754 changed lines)
@@ -21,11 +21,6 @@
 #include <kernel_internal.h>
 #include <zephyr/sys/check.h>

-struct k_pipe_desc {
-        unsigned char *buffer;      /* Position in src/dest buffer */
-        size_t bytes_to_xfer;       /* # bytes left to transfer */
-};
-
 static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
                              void *data, size_t bytes_to_read,
                              size_t *bytes_read, size_t min_xfer,
@@ -35,9 +30,9 @@ void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
 {
         pipe->buffer = buffer;
         pipe->size = size;
-        pipe->bytes_used = 0;
-        pipe->read_index = 0;
-        pipe->write_index = 0;
+        pipe->bytes_used = 0U;
+        pipe->read_index = 0U;
+        pipe->write_index = 0U;
         pipe->lock = (struct k_spinlock){};
         z_waitq_init(&pipe->wait_q.writers);
         z_waitq_init(&pipe->wait_q.readers);
@@ -64,7 +59,7 @@ int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
                         ret = -ENOMEM;
                 }
         } else {
-                k_pipe_init(pipe, NULL, 0);
+                k_pipe_init(pipe, NULL, 0U);
                 ret = 0;
         }

@@ -91,7 +86,7 @@ void z_impl_k_pipe_flush(struct k_pipe *pipe)

         k_spinlock_key_t key = k_spin_lock(&pipe->lock);

-        (void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0,
+        (void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0U,
                                  K_NO_WAIT);

         SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
@@ -117,7 +112,9 @@ void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)

         if (pipe->buffer != NULL) {
                 (void) pipe_get_internal(key, pipe, NULL, pipe->size,
-                                          &bytes_read, 0, K_NO_WAIT);
+                                          &bytes_read, 0U, K_NO_WAIT);
+        } else {
+                k_spin_unlock(&pipe->lock, key);
         }

         SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
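Both flush entry points shown above funnel into pipe_get_internal() with a NULL destination, so the data is discarded rather than copied. A hedged usage sketch (my_pipe is assumed to be defined elsewhere):

#include <zephyr/kernel.h>

extern struct k_pipe my_pipe;   /* assumed to be defined elsewhere */

void reset_stream(void)
{
        /* Discard buffered bytes plus anything pended writers are offering
         * (the (size_t)-1 "read everything" path above).
         */
        k_pipe_flush(&my_pipe);

        /* Or: discard only what sits in the pipe's own ring buffer. */
        k_pipe_buffer_flush(&my_pipe);
}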
@@ -156,16 +153,16 @@ int k_pipe_cleanup(struct k_pipe *pipe)
                  * pipe. Reset the pipe's counters to prevent malfunction.
                  */

-                pipe->size = 0;
-                pipe->bytes_used = 0;
-                pipe->read_index = 0;
-                pipe->write_index = 0;
+                pipe->size = 0U;
+                pipe->bytes_used = 0U;
+                pipe->read_index = 0U;
+                pipe->write_index = 0U;
                 pipe->flags &= ~K_PIPE_FLAG_ALLOC;
         }

         k_spin_unlock(&pipe->lock, key);

-        SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0);
+        SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0U);

         return 0;
 }
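A short lifecycle sketch for the dynamically allocated case that k_pipe_cleanup() serves (make_temp_pipe is a hypothetical name; note that cleanup can fail with -EAGAIN if threads are still waiting on the pipe):

#include <zephyr/kernel.h>

int make_temp_pipe(struct k_pipe *pipe)
{
        /* Heap-allocate a 64-byte ring buffer (requires a kernel heap). */
        int ret = k_pipe_alloc_init(pipe, 64);

        if (ret == 0) {
                /* ... use the pipe ... */

                /* Frees the buffer and resets the counters as shown above. */
                ret = k_pipe_cleanup(pipe);
        }
        return ret;
}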
@@ -179,191 +176,80 @@ static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
                         const unsigned char *src, size_t src_size)
 {
         size_t num_bytes = MIN(dest_size, src_size);
-        const unsigned char *end = src + num_bytes;

         if (dest == NULL) {
                 /* Data is being flushed. Pretend the data was copied. */
                 return num_bytes;
         }

-        while (src != end) {
-                *dest = *src;
-                dest++;
-                src++;
-        }
+        (void) memcpy(dest, src, num_bytes);

         return num_bytes;
 }

+/**
+ * @brief Populate pipe descriptors for copying to/from waiters' buffers
+ *
+ * This routine cycles through the waiters on the wait queue and creates
+ * a list of threads that will have data directly copied to / read from
+ * their buffers. This list helps us avoid double copying later.
+ *
+ * @return # of bytes available for direct copying
+ */
+static size_t pipe_waiter_list_populate(sys_dlist_t *list,
+                                        _wait_q_t *wait_q,
+                                        size_t bytes_to_xfer)
+{
+        struct k_thread *thread;
+        struct _pipe_desc *curr;
+        size_t num_bytes = 0U;
+
+        _WAIT_Q_FOR_EACH(wait_q, thread) {
+                curr = (struct _pipe_desc *)thread->base.swap_data;
+
+                sys_dlist_append(list, &curr->node);
+
+                num_bytes += curr->bytes_to_xfer;
+                if (num_bytes >= bytes_to_xfer) {
+                        break;
+                }
+        }
+
+        return num_bytes;
+}
+
 /**
- * @brief Put data from @a src into the pipe's circular buffer
+ * @brief Populate pipe descriptors for copying to/from pipe buffer
  *
- * Modifies the following fields in @a pipe:
- *        buffer, bytes_used, write_index
- *
- * @return Number of bytes written to the pipe's circular buffer
+ * This routine is only called if the pipe buffer is not empty (when reading),
+ * or if not full (when writing).
  */
-static size_t pipe_buffer_put(struct k_pipe *pipe,
-                              const unsigned char *src, size_t src_size)
+static size_t pipe_buffer_list_populate(sys_dlist_t *list,
+                                        struct _pipe_desc *desc,
+                                        unsigned char *buffer,
+                                        size_t size,
+                                        size_t start,
+                                        size_t end)
 {
-        size_t bytes_copied;
-        size_t run_length;
-        size_t num_bytes_written = 0;
-        int i;
+        sys_dlist_append(list, &desc[0].node);

+        desc[0].thread = NULL;
+        desc[0].buffer = &buffer[start];

-        for (i = 0; i < 2; i++) {
-                run_length = MIN(pipe->size - pipe->bytes_used,
-                                 pipe->size - pipe->write_index);
-
-                bytes_copied = pipe_xfer(pipe->buffer + pipe->write_index,
-                                         run_length,
-                                         src + num_bytes_written,
-                                         src_size - num_bytes_written);
-
-                num_bytes_written += bytes_copied;
-                pipe->bytes_used += bytes_copied;
-                pipe->write_index += bytes_copied;
-                if (pipe->write_index == pipe->size) {
-                        pipe->write_index = 0;
-                }
+        if (start < end) {
+                desc[0].bytes_to_xfer = end - start;
+                return end - start;
         }

-        return num_bytes_written;
-}
+        desc[0].bytes_to_xfer = size - start;

-/**
- * @brief Get data from the pipe's circular buffer
- *
- * Modifies the following fields in @a pipe:
- *        bytes_used, read_index
- *
- * @return Number of bytes read from the pipe's circular buffer
- */
-static size_t pipe_buffer_get(struct k_pipe *pipe,
-                              unsigned char *dest, size_t dest_size)
-{
-        size_t bytes_copied;
-        size_t run_length;
-        size_t num_bytes_read = 0;
-        size_t dest_off;
-        int i;
+        desc[1].thread = NULL;
+        desc[1].buffer = &buffer[0];
+        desc[1].bytes_to_xfer = end;

-        for (i = 0; i < 2; i++) {
-                run_length = MIN(pipe->bytes_used,
-                                 pipe->size - pipe->read_index);
+        sys_dlist_append(list, &desc[1].node);

-                dest_off = (dest == NULL) ? 0 : num_bytes_read;
-
-                bytes_copied = pipe_xfer(dest + dest_off,
-                                         dest_size - num_bytes_read,
-                                         pipe->buffer + pipe->read_index,
-                                         run_length);
-
-                num_bytes_read += bytes_copied;
-                pipe->bytes_used -= bytes_copied;
-                pipe->read_index += bytes_copied;
-                if (pipe->read_index == pipe->size) {
-                        pipe->read_index = 0;
-                }
-        }
-
-        return num_bytes_read;
-}
-
-/**
- * @brief Prepare a working set of readers/writers
- *
- * Prepare a list of "working threads" into/from which the data
- * will be directly copied. This list is useful as it is used to ...
- *
- *  1. avoid double copying
- *  2. minimize interrupt latency as interrupts are unlocked
- *     while copying data
- *  3. ensure a timeout can not make the request impossible to satisfy
- *
- * The list is populated with previously pended threads that will be ready to
- * run after the pipe call is complete.
- *
- * Important things to remember when reading from the pipe ...
- * 1. If there are writers in @a wait_q, then the pipe's buffer is full.
- * 2. Conversely if the pipe's buffer is not full, there are no writers.
- * 3. The amount of available data in the pipe is the sum of the bytes used in
- *    the pipe (@a pipe_space) and all the requests from the waiting writers.
- * 4. Since data is read from the pipe's buffer first, the working set must
- *    include writers that will (try to) re-fill the pipe's buffer afterwards.
- *
- * Important things to remember when writing to the pipe ...
- * 1. If there are readers in @a wait_q, then the pipe's buffer is empty.
- * 2. Conversely if the pipe's buffer is not empty, then there are no readers.
- * 3. The amount of space available in the pipe is the sum of the bytes unused
- *    in the pipe (@a pipe_space) and all the requests from the waiting readers.
- *
- * @return false if request is unsatisfiable, otherwise true
- */
-static bool pipe_xfer_prepare(sys_dlist_t *xfer_list,
-                              struct k_thread **waiter,
-                              _wait_q_t *wait_q,
-                              size_t pipe_space,
-                              size_t bytes_to_xfer,
-                              size_t min_xfer,
-                              k_timeout_t timeout)
-{
-        struct k_thread *thread;
-        struct k_pipe_desc *desc;
-        size_t num_bytes = 0;
-
-        if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
-                _WAIT_Q_FOR_EACH(wait_q, thread) {
-                        desc = (struct k_pipe_desc *)thread->base.swap_data;
-
-                        num_bytes += desc->bytes_to_xfer;
-
-                        if (num_bytes >= bytes_to_xfer) {
-                                break;
-                        }
-                }
-
-                if (num_bytes + pipe_space < min_xfer) {
-                        return false;
-                }
-        }
-
-        /*
-         * Either @a timeout is not K_NO_WAIT (so the thread may pend) or
-         * the entire request can be satisfied. Generate the working list.
-         */
-
-        sys_dlist_init(xfer_list);
-        num_bytes = 0;
-
-        while ((thread = z_waitq_head(wait_q)) != NULL) {
-                desc = (struct k_pipe_desc *)thread->base.swap_data;
-                num_bytes += desc->bytes_to_xfer;
-
-                if (num_bytes > bytes_to_xfer) {
-                        /*
-                         * This request can not be fully satisfied.
-                         * Do not remove it from the wait_q.
-                         * Do not abort its timeout (if applicable).
-                         * Do not add it to the transfer list
-                         */
-                        break;
-                }
-
-                /*
-                 * This request can be fully satisfied.
-                 * Remove it from the wait_q.
-                 * Abort its timeout.
-                 * Add it to the transfer list.
-                 */
-                z_unpend_thread(thread);
-                sys_dlist_append(xfer_list, &thread->base.qnode_dlist);
-        }
-
-        *waiter = (num_bytes > bytes_to_xfer) ? thread : NULL;
-
-        return true;
+        return size - start + end;
 }
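pipe_buffer_list_populate() above encodes the one non-obvious bit of ring-buffer arithmetic: the region between two indices is either one contiguous run (start < end) or two runs that wrap through index 0. The same split in isolation (hypothetical names; as in the kernel helper, callers guarantee the region described is non-empty):

#include <stddef.h>

struct run {
        size_t offset;   /* where the run starts in the ring buffer */
        size_t len;      /* how many contiguous bytes it covers     */
};

/* Describe [start, end) in a ring of 'size' bytes as 1 or 2 runs. */
static int ring_runs(size_t size, size_t start, size_t end, struct run out[2])
{
        if (start < end) {
                /* Contiguous case: one run. */
                out[0] = (struct run){ start, end - start };
                return 1;
        }

        /* Wrapped case: tail of the buffer, then the head. */
        out[0] = (struct run){ start, size - start };
        out[1] = (struct run){ 0, end };
        return 2;
}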
@@ -373,8 +259,8 @@ static bool pipe_xfer_prepare(sys_dlist_t *xfer_list,
  *   >= Minimum       0              0
  *    < Minimum      -EIO*          -EAGAIN
  *
- * * The "-EIO No Wait" case was already checked when the "working set"
- *   was created in  _pipe_xfer_prepare().
+ * * The "-EIO No Wait" case was already checked after the list of pipe
+ *   descriptors was created.
  *
  * @return See table above
  */
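A hedged restatement of that table in plain C, mirroring the pipe_return_code() logic used later in this hunk (parameter names are illustrative):

#include <errno.h>
#include <stddef.h>

static int xfer_result(size_t min_xfer, size_t bytes_remaining,
                       size_t bytes_requested)
{
        /* Transferred at least the caller's minimum: success. */
        if (bytes_requested - bytes_remaining >= min_xfer) {
                return 0;
        }

        /* Fell short after waiting: -EAGAIN. (The -EIO/no-wait case
         * was rejected earlier, before any copying started.)
         */
        return -EAGAIN;
}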
@@ -392,147 +278,170 @@ static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
         return -EAGAIN;
 }

+/**
+ * @brief Copy data from source(s) to destination(s)
+ */
+
+static size_t pipe_write(struct k_pipe *pipe, sys_dlist_t *src_list,
+                         sys_dlist_t *dest_list, bool *reschedule)
+{
+        struct _pipe_desc *src;
+        struct _pipe_desc *dest;
+        size_t bytes_copied;
+        size_t num_bytes_written = 0U;
+
+        src = (struct _pipe_desc *)sys_dlist_get(src_list);
+        dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
+
+        while ((src != NULL) && (dest != NULL)) {
+                bytes_copied = pipe_xfer(dest->buffer, dest->bytes_to_xfer,
+                                         src->buffer, src->bytes_to_xfer);
+
+                num_bytes_written += bytes_copied;
+
+                dest->buffer += bytes_copied;
+                dest->bytes_to_xfer -= bytes_copied;
+
+                src->buffer += bytes_copied;
+                src->bytes_to_xfer -= bytes_copied;
+
+                if (dest->thread == NULL) {
+
+                        /* Writing to the pipe buffer. Update details. */
+
+                        pipe->bytes_used += bytes_copied;
+                        pipe->write_index += bytes_copied;
+                        if (pipe->write_index >= pipe->size) {
+                                pipe->write_index -= pipe->size;
+                        }
+                } else if (dest->bytes_to_xfer == 0U) {
+
+                        /* A thread's read request has been satisfied. */
+
+                        (void) z_sched_wake(&pipe->wait_q.readers, 0, NULL);
+
+                        *reschedule = true;
+                }
+
+                if (src->bytes_to_xfer == 0U) {
+                        src = (struct _pipe_desc *)sys_dlist_get(src_list);
+                }
+
+                if (dest->bytes_to_xfer == 0U) {
+                        dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
+                }
+        }
+
+        return num_bytes_written;
+}
+
 int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
                       size_t *bytes_written, size_t min_xfer,
                       k_timeout_t timeout)
 {
-        struct k_thread *reader;
-        struct k_pipe_desc *desc;
-        sys_dlist_t xfer_list;
-        size_t num_bytes_written = 0;
-        size_t bytes_copied;
+        struct _pipe_desc pipe_desc[2];
+        struct _pipe_desc *src_desc;
+        sys_dlist_t dest_list;
+        sys_dlist_t src_list;
+        size_t bytes_can_write;
+        bool reschedule_needed = false;

         __ASSERT(((arch_is_in_isr() == false) ||
                   K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

         SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

         CHECKIF((min_xfer > bytes_to_write) || bytes_written == NULL) {
-                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EINVAL);
+                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
+                                               -EINVAL);

                 return -EINVAL;
         }

+        sys_dlist_init(&src_list);
+        sys_dlist_init(&dest_list);
+
         k_spinlock_key_t key = k_spin_lock(&pipe->lock);

         /*
-         * Create a list of "working readers" into which the data will be
-         * directly copied.
+         * First, write to any waiting readers, if any exist.
+         * Second, write to the pipe buffer, if it exists.
          */

-        if (!pipe_xfer_prepare(&xfer_list, &reader, &pipe->wait_q.readers,
-                               pipe->size - pipe->bytes_used, bytes_to_write,
-                               min_xfer, timeout)) {
-                k_spin_unlock(&pipe->lock, key);
-                *bytes_written = 0;
+        bytes_can_write = pipe_waiter_list_populate(&dest_list,
                                                     &pipe->wait_q.readers,
+                                                    bytes_to_write);

-                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EIO);
+        if (pipe->bytes_used != pipe->size) {
+                bytes_can_write += pipe_buffer_list_populate(&dest_list,
+                                                             pipe_desc,
+                                                             pipe->buffer,
+                                                             pipe->size,
+                                                             pipe->write_index,
+                                                             pipe->read_index);
+        }
+
+        if ((bytes_can_write < min_xfer) &&
+            (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {
+
+                /* The request can not be fulfilled. */
+
+                k_spin_unlock(&pipe->lock, key);
+                *bytes_written = 0U;
+
+                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe,
+                                               timeout, -EIO);

                 return -EIO;
         }

+        src_desc = &_current->pipe_desc;
+
+        src_desc->buffer = data;
+        src_desc->bytes_to_xfer = bytes_to_write;
+        src_desc->thread = _current;
+        sys_dlist_append(&src_list, &src_desc->node);
+
+        *bytes_written = pipe_write(pipe, &src_list,
+                                    &dest_list, &reschedule_needed);
+
+        /*
+         * The immediate success conditions below are backwards
+         * compatible with an earlier pipe implementation.
+         */
+
+        if ((*bytes_written == bytes_to_write) ||
+            (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
+            ((*bytes_written >= min_xfer) && (min_xfer > 0U))) {
+
+                /* The minimum amount of data has been copied */
+
+                if (reschedule_needed) {
+                        z_reschedule(&pipe->lock, key);
+                } else {
+                        k_spin_unlock(&pipe->lock, key);
+                }
+
+                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);
+
+                return 0;
+        }
+
+        /* The minimum amount of data has not been copied. Block. */
+
+        SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);
+
-        z_sched_lock();
-        k_spin_unlock(&pipe->lock, key);
+        _current->base.swap_data = src_desc;

-        /*
-         * 1. 'xfer_list' currently contains a list of reader threads that can
-         *    have their read requests fulfilled by the current call.
-         * 2. 'reader' if not NULL points to a thread on the reader wait_q
-         *    that can get some of its requested data.
-         * 3. Interrupts are unlocked but the scheduler is locked to allow
-         *    ticks to be delivered but no scheduling to occur
-         * 4. If 'reader' times out while we are copying data, not only do we
-         *    still have a pointer to it, but it can not execute until this call
-         *    is complete so it is still safe to copy data to it.
-         */
+        z_sched_wait(&pipe->lock, key, &pipe->wait_q.writers, timeout, NULL);

-        struct k_thread *thread = (struct k_thread *)
-                                  sys_dlist_get(&xfer_list);
-        while (thread != NULL) {
-                desc = (struct k_pipe_desc *)thread->base.swap_data;
-                bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
-                                         (uint8_t *)data + num_bytes_written,
-                                         bytes_to_write - num_bytes_written);
+        *bytes_written = bytes_to_write - src_desc->bytes_to_xfer;

-                num_bytes_written += bytes_copied;
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-
-                /* The thread's read request has been satisfied. Ready it. */
-                z_ready_thread(thread);
-
-                thread = (struct k_thread *)sys_dlist_get(&xfer_list);
-        }
-
-        /*
-         * Copy any data to the reader that we left on the wait_q.
-         * It is possible no data will be copied.
-         */
-        if (reader != NULL) {
-                desc = (struct k_pipe_desc *)reader->base.swap_data;
-                bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
-                                         (uint8_t *)data + num_bytes_written,
-                                         bytes_to_write - num_bytes_written);
-
-                num_bytes_written += bytes_copied;
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-        }
-
-        /*
-         * As much data as possible has been directly copied to any waiting
-         * readers. Add as much as possible to the pipe's circular buffer.
-         */
-
-        num_bytes_written +=
-                pipe_buffer_put(pipe, (uint8_t *)data + num_bytes_written,
-                                bytes_to_write - num_bytes_written);
-
-        if (num_bytes_written == bytes_to_write) {
-                *bytes_written = num_bytes_written;
-                k_sched_unlock();
-
-                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);
-
-                return 0;
-        }
-
-        if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
-            && num_bytes_written >= min_xfer
-            && min_xfer > 0U) {
-                *bytes_written = num_bytes_written;
-                k_sched_unlock();
-
-                SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);
-
-                return 0;
-        }
-
-        /* Not all data was copied */
-
-        struct k_pipe_desc pipe_desc;
-
-        pipe_desc.buffer = (uint8_t *)data + num_bytes_written;
-        pipe_desc.bytes_to_xfer = bytes_to_write - num_bytes_written;
-
-        if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
-                _current->base.swap_data = &pipe_desc;
-                /*
-                 * Lock interrupts and unlock the scheduler before
-                 * manipulating the writers wait_q.
-                 */
-                k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);
-                z_sched_unlock_no_reschedule();
-                (void)z_pend_curr(&pipe->lock, key2,
-                                  &pipe->wait_q.writers, timeout);
-        } else {
-                k_sched_unlock();
-        }
-
-        *bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer;
-
-        int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
+        int ret = pipe_return_code(min_xfer, src_desc->bytes_to_xfer,
                                    bytes_to_write);

         SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

         return ret;
 }
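A usage sketch of the semantics preserved above: k_pipe_put() succeeds once at least min_xfer bytes are accepted, and reports -EAGAIN if the timeout expires first (my_pipe is assumed to be defined elsewhere):

#include <zephyr/kernel.h>

extern struct k_pipe my_pipe;   /* assumed to be defined elsewhere */

int send_at_least_half(const void *buf, size_t len)
{
        size_t written;

        /* Returns 0 once at least len/2 bytes were accepted within
         * 100 ms; -EAGAIN if the deadline passes with less copied.
         */
        return k_pipe_put(&my_pipe, (void *)buf, len, &written,
                          len / 2, K_MSEC(100));
}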
@@ -557,173 +466,158 @@ static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
                              size_t *bytes_read, size_t min_xfer,
                              k_timeout_t timeout)
 {
-        struct k_thread *writer;
-        struct k_pipe_desc *desc;
-        sys_dlist_t xfer_list;
-        size_t num_bytes_read = 0;
-        size_t data_off;
+        sys_dlist_t src_list;
+        struct _pipe_desc pipe_desc[2];
+        struct _pipe_desc *dest_desc;
+        struct _pipe_desc *src_desc;
+        size_t num_bytes_read = 0U;
+        size_t bytes_copied;
+        size_t bytes_can_read = 0U;
+        bool reschedule_needed = false;

         /*
-         * Create a list of "working readers" into which the data will be
-         * directly copied.
+         * Data copying takes place in the following order.
+         * 1. Copy data from the pipe buffer to the receive buffer.
+         * 2. Copy data from the waiting writer(s) to the receive buffer.
+         * 3. Refill the pipe buffer from the waiting writer(s).
          */

-        if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
-                               pipe->bytes_used, bytes_to_read,
-                               min_xfer, timeout)) {
+        sys_dlist_init(&src_list);
+
+        if (pipe->bytes_used != 0) {
+                bytes_can_read = pipe_buffer_list_populate(&src_list,
+                                                           pipe_desc,
+                                                           pipe->buffer,
+                                                           pipe->size,
+                                                           pipe->read_index,
+                                                           pipe->write_index);
+        }
+
+        bytes_can_read += pipe_waiter_list_populate(&src_list,
+                                                    &pipe->wait_q.writers,
+                                                    bytes_to_read);
+
+        if ((bytes_can_read < min_xfer) &&
+            (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {
+
+                /* The request can not be fulfilled. */
+
                 k_spin_unlock(&pipe->lock, key);
                 *bytes_read = 0;

                 return -EIO;
         }

-        z_sched_lock();
+        dest_desc = &_current->pipe_desc;
+
+        dest_desc->buffer = data;
+        dest_desc->bytes_to_xfer = bytes_to_read;
+        dest_desc->thread = _current;
+
+        src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
+        while (src_desc != NULL) {
+                bytes_copied = pipe_xfer(dest_desc->buffer,
+                                         dest_desc->bytes_to_xfer,
+                                         src_desc->buffer,
+                                         src_desc->bytes_to_xfer);
+
+                num_bytes_read += bytes_copied;
+
+                src_desc->buffer += bytes_copied;
+                src_desc->bytes_to_xfer -= bytes_copied;
+
+                if (dest_desc->buffer != NULL) {
+                        dest_desc->buffer += bytes_copied;
+                }
+                dest_desc->bytes_to_xfer -= bytes_copied;
+
+                if (src_desc->thread == NULL) {
+
+                        /* Reading from the pipe buffer. Update details. */
+
+                        pipe->bytes_used -= bytes_copied;
+                        pipe->read_index += bytes_copied;
+                        if (pipe->read_index >= pipe->size) {
+                                pipe->read_index -= pipe->size;
+                        }
+                } else if (src_desc->bytes_to_xfer == 0U) {
+
+                        /* The thread's write request has been satisfied. */
+
+                        (void) z_sched_wake(&pipe->wait_q.writers, 0, NULL);
+
+                        reschedule_needed = true;
+                }
+                src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
+        }
+
+        if (pipe->bytes_used != pipe->size) {
+                sys_dlist_t pipe_list;
+
+                /*
+                 * The pipe is not full. If there are any waiting writers,
+                 * refill the pipe.
+                 */
+
+                sys_dlist_init(&src_list);
+                sys_dlist_init(&pipe_list);
+
+                (void) pipe_waiter_list_populate(&src_list,
+                                                 &pipe->wait_q.writers,
+                                                 pipe->size - pipe->bytes_used);
+
+                (void) pipe_buffer_list_populate(&pipe_list, pipe_desc,
+                                                 pipe->buffer, pipe->size,
+                                                 pipe->write_index,
+                                                 pipe->read_index);
+
+                (void) pipe_write(pipe, &src_list,
+                                  &pipe_list, &reschedule_needed);
+        }
+
+        /*
+         * The immediate success conditions below are backwards
+         * compatible with an earlier pipe implementation.
+         */
+
+        if ((num_bytes_read == bytes_to_read) ||
+            (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
+            ((num_bytes_read >= min_xfer) && (min_xfer > 0U))) {
+
+                /* The minimum amount of data has been copied */
+
+                *bytes_read = num_bytes_read;
+                if (reschedule_needed) {
+                        z_reschedule(&pipe->lock, key);
+                } else {
+                        k_spin_unlock(&pipe->lock, key);
+                }
+
+                return 0;
+        }

-        num_bytes_read = pipe_buffer_get(pipe, data, bytes_to_read);
-
-        /*
-         * 1. 'xfer_list' currently contains a list of writer threads that can
-         *    have their write requests fulfilled by the current call.
-         * 2. 'writer' if not NULL points to a thread on the writer wait_q
-         *    that can post some of its requested data.
-         * 3. Data will be copied from each writer's buffer to either the
-         *    reader's buffer and/or to the pipe's circular buffer.
-         * 4. Interrupts are unlocked but the scheduler is locked to allow
-         *    ticks to be delivered but no scheduling to occur
-         * 5. If 'writer' times out while we are copying data, not only do we
-         *    still have a pointer to it, but it can not execute until this
-         *    call is complete so it is still safe to copy data from it.
-         */
-
-        struct k_thread *thread = (struct k_thread *)
-                                  sys_dlist_get(&xfer_list);
-        while ((thread != NULL) && (num_bytes_read < bytes_to_read)) {
-                desc = (struct k_pipe_desc *)thread->base.swap_data;
-                data_off = (data == NULL) ? 0 : num_bytes_read;
-                bytes_copied = pipe_xfer((uint8_t *)data + data_off,
-                                         bytes_to_read - num_bytes_read,
-                                         desc->buffer, desc->bytes_to_xfer);
-
-                num_bytes_read += bytes_copied;
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-
-                /*
-                 * It is expected that the write request will be satisfied.
-                 * However, if the read request was satisfied before the
-                 * write request was satisfied, then the write request must
-                 * finish later when writing to the pipe's circular buffer.
-                 */
-                if (num_bytes_read == bytes_to_read) {
-                        break;
-                }
-                z_ready_thread(thread);
-
-                thread = (struct k_thread *)sys_dlist_get(&xfer_list);
-        }
-
-        if ((writer != NULL) && (num_bytes_read < bytes_to_read)) {
-                desc = (struct k_pipe_desc *)writer->base.swap_data;
-                data_off = (data == NULL) ? 0 : num_bytes_read;
-                bytes_copied = pipe_xfer((uint8_t *)data + data_off,
-                                         bytes_to_read - num_bytes_read,
-                                         desc->buffer, desc->bytes_to_xfer);
-
-                num_bytes_read += bytes_copied;
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-        }
-
-        /*
-         * Copy as much data as possible from the writers (if any)
-         * into the pipe's circular buffer.
-         */
-
-        while (thread != NULL) {
-                desc = (struct k_pipe_desc *)thread->base.swap_data;
-                bytes_copied = pipe_buffer_put(pipe, desc->buffer,
-                                               desc->bytes_to_xfer);
-
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-
-                /* Write request has been satisfied */
-                z_ready_thread(thread);
-
-                thread = (struct k_thread *)sys_dlist_get(&xfer_list);
-        }
-
-        if (writer != NULL) {
-                desc = (struct k_pipe_desc *)writer->base.swap_data;
-                bytes_copied = pipe_buffer_put(pipe, desc->buffer,
-                                               desc->bytes_to_xfer);
-
-                desc->buffer += bytes_copied;
-                desc->bytes_to_xfer -= bytes_copied;
-        }
-
-        if (num_bytes_read == bytes_to_read) {
-                k_sched_unlock();
-
-                *bytes_read = num_bytes_read;
-
-                return 0;
-        }
-
-        if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
-            && num_bytes_read >= min_xfer
-            && min_xfer > 0U) {
-                k_sched_unlock();
-
-                *bytes_read = num_bytes_read;
-
-                return 0;
-        }
+        /* The minimum amount of data has not been copied. Block. */

         /*
          * Not all data was read. It is important to note that when this
          * routine is invoked by either of the flush routines() both the <data>
          * and <timeout> parameters are set to NULL and K_NO_WAIT respectively.
          * Consequently, neither k_pipe_flush() nor k_pipe_buffer_flush()
          * will block.
          *
          * However, this routine may also be invoked by k_pipe_get() and there
          * is no enforcement of <data> being non-NULL when called from
          * kernel-space. That restriction is enforced when called from
          * user-space.
          */

-        struct k_pipe_desc pipe_desc;
-
-        pipe_desc.buffer = (uint8_t *)data + num_bytes_read;
-        pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;
-
-        if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
         SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

-                _current->base.swap_data = &pipe_desc;
-                k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);
+        _current->base.swap_data = dest_desc;

-                z_sched_unlock_no_reschedule();
-                (void)z_pend_curr(&pipe->lock, key2,
-                                  &pipe->wait_q.readers, timeout);
-        } else {
-                k_sched_unlock();
-        }
+        z_sched_wait(&pipe->lock, key, &pipe->wait_q.readers, timeout, NULL);

-        *bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer;
+        *bytes_read = bytes_to_read - dest_desc->bytes_to_xfer;

-        int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
+        int ret = pipe_return_code(min_xfer, dest_desc->bytes_to_xfer,
                                    bytes_to_read);

         SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);

         return ret;
 }

 int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
                       size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
 {
         __ASSERT(((arch_is_in_isr() == false) ||
                   K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

         SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

         CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
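And the matching read side of the API, blocking until at least one byte is available (my_pipe is again assumed to be defined elsewhere):

#include <zephyr/kernel.h>

extern struct k_pipe my_pipe;   /* assumed to be defined elsewhere */

void consumer(void)
{
        unsigned char chunk[32];
        size_t got;

        /* Block until at least 1 byte arrives; read up to sizeof(chunk). */
        if (k_pipe_get(&my_pipe, chunk, sizeof(chunk), &got, 1,
                       K_FOREVER) == 0) {
                /* ...process 'got' bytes... */
        }
}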
@@ -492,9 +492,11 @@ static void unref_check(struct z_object *ko, uintptr_t index)
          * specifically needs to happen depends on the object type.
          */
         switch (ko->type) {
+#ifdef CONFIG_PIPES
         case K_OBJ_PIPE:
                 k_pipe_cleanup((struct k_pipe *)ko->name);
                 break;
+#endif
         case K_OBJ_MSGQ:
                 k_msgq_cleanup((struct k_msgq *)ko->name);
                 break;
@@ -244,6 +244,7 @@ config NET_SOCKETS_CAN_RECEIVERS
 config NET_SOCKETPAIR
         bool "Support for the socketpair syscall [EXPERIMENTAL]"
         select EXPERIMENTAL
+        select PIPES
         depends on HEAP_MEM_POOL_SIZE != 0
         help
           Choose y here if you would like to use the socketpair(2)
@@ -30,8 +30,10 @@ struct k_spinlock _track_list_k_msgq_lock;
 struct k_mbox *_track_list_k_mbox;
 struct k_spinlock _track_list_k_mbox_lock;

+#ifdef CONFIG_PIPES
 struct k_pipe *_track_list_k_pipe;
 struct k_spinlock _track_list_k_pipe_lock;
+#endif

 struct k_queue *_track_list_k_queue;
 struct k_spinlock _track_list_k_queue_lock;

@@ -95,11 +97,13 @@ void sys_track_k_mbox_init(struct k_mbox *mbox)
                 SYS_TRACK_LIST_PREPEND(_track_list_k_mbox, mbox));
 }

+#ifdef CONFIG_PIPES
 void sys_track_k_pipe_init(struct k_pipe *pipe)
 {
         SYS_PORT_TRACING_TYPE_MASK(k_pipe,
                 SYS_TRACK_LIST_PREPEND(_track_list_k_pipe, pipe));
 }
+#endif

 void sys_track_k_queue_init(struct k_queue *queue)
 {

@@ -132,8 +136,10 @@ static int sys_track_static_init(const struct device *arg)
         SYS_PORT_TRACING_TYPE_MASK(k_mbox,
                 SYS_TRACK_STATIC_INIT(k_mbox));

+#ifdef CONFIG_PIPES
         SYS_PORT_TRACING_TYPE_MASK(k_pipe,
                 SYS_TRACK_STATIC_INIT(k_pipe));
+#endif

         SYS_PORT_TRACING_TYPE_MASK(k_queue,
                 SYS_TRACK_STATIC_INIT(k_queue));