lib/os: P4 Work Queue: Pooled Parallel Preemptible Priority-based

This adds a somewhat special purpose IPC mechanism.  It's intended for
applications which have a "work queue" like architecture of discrete
callback items, but which need the ability to schedule those items
independently in separate threads across multiple CPUs.  So P4 Work
items:

1. Can run at any Zephyr scheduler priority and with any deadline
   (this feature assumes EDF scheduling is enabled)

2. Can be submitted at any time and from any context, including being
   resubmitted from within their own handler.

3. Will preempt any lower priority work as soon as they are runnable,
   according to the standard rules of Zephyr priority scheduling.

4. Run from a pool of worker threads that can be allocated efficiently
   (i.e. you need as many as the number of CPUs plus the number of
   preempted in-progress items, but no more).

Signed-off-by: Andy Ross <andrew.j.ross@intel.com>
This commit is contained in:
Andy Ross 2020-07-28 13:14:23 -07:00 committed by Anas Nashif
commit d2eadfa162
4 changed files with 381 additions and 0 deletions

View file

@ -125,6 +125,8 @@
Z_ITERABLE_SECTION_ROM(settings_handler_static, 4)
#endif
Z_ITERABLE_SECTION_ROM(k_p4wq_initparam, 4)
#if defined(CONFIG_EMUL)
SECTION_DATA_PROLOGUE(emulators_section,,)
{

163
include/sys/p4wq.h Normal file
View file

@ -0,0 +1,163 @@
/*
* Copyright (c) 2020 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_INCLUDE_SYS_P4WQ_H_
#define ZEPHYR_INCLUDE_SYS_P4WQ_H_
#include <kernel.h>
/* Zephyr Pooled Parallel Preemptible Priority-based Work Queues */
struct k_p4wq_work;
/**
* P4 Queue handler callback
*/
typedef void (*k_p4wq_handler_t)(struct k_p4wq_work *work);
/**
* @brief P4 Queue Work Item
*
* User-populated struct representing a single work item. The
* priority and deadline fields are interpreted as thread scheduling
* priorities, exactly as per k_thread_priority_set() and
* k_thread_deadline_set().
*/
struct k_p4wq_work {
/* Filled out by submitting code */
int32_t priority;
int32_t deadline;
k_p4wq_handler_t handler;
/* reserved for implementation */
union {
struct rbnode rbnode;
sys_dlist_t dlnode;
};
struct k_thread *thread;
};
/**
* @brief P4 Queue
*
* Kernel pooled parallel preemptible priority-based work queue
*/
struct k_p4wq {
struct k_spinlock lock;
/* Pending threads waiting for work items
*
* FIXME: a waitq isn't really the right data structure here.
* Wait queues are priority-sorted, but we don't want that
* sorting overhead since we're effectively doing it ourselves
* by directly mutating the priority when a thread is
* unpended. We just want "blocked threads on a list", but
* there's no clean scheduler API for that.
*/
_wait_q_t waitq;
/* Work items waiting for processing */
struct rbtree queue;
/* Work items in progress */
sys_dlist_t active;
};
struct k_p4wq_initparam {
uint32_t num;
uintptr_t stack_size;
struct k_p4wq *queue;
struct k_thread *threads;
struct z_thread_stack_element *stacks;
};
/**
* @brief Statically initialize a P4 Work Queue
*
* Statically defines a struct k_p4wq object with the specified number
* of threads which will be initialized at boot and ready for use on
* entry to main().
*
* @param name Symbol name of the struct k_p4wq that will be defined
* @param n_threads Number of threads in the work queue pool
* @param stack_sz Requested stack size of each thread, in bytes
*/
#define K_P4WQ_DEFINE(name, n_threads, stack_sz) \
static K_THREAD_STACK_ARRAY_DEFINE(_p4stacks_##name, \
n_threads, stack_sz); \
static struct k_thread _p4threads_##name[n_threads]; \
static struct k_p4wq name; \
static const Z_STRUCT_SECTION_ITERABLE(k_p4wq_initparam, \
_init_##name) = { \
.num = n_threads, \
.stack_size = stack_sz, \
.threads = _p4threads_##name, \
.stacks = &(_p4stacks_##name[0][0]), \
.queue = &name, \
}
/**
* @brief Initialize P4 Queue
*
* Initializes a P4 Queue object. These objects much be initialized
* via this function (or statically using K_P4WQ_DEFINE) before any
* other API calls are made on it.
*
* @param queue P4 Queue to initialize
*/
void k_p4wq_init(struct k_p4wq *queue);
/**
* @brief Dynamically add a thread object to a P4 Queue pool
*
* Adds a thread to the pool managed by a P4 queue. The thread object
* must not be in use. If k_thread_create() has previously been
* called on it, it must be aborted before being given to the queue.
*
* @param queue P4 Queue to which to add the thread
* @param thread Uninitialized/aborted thread object to add
* @param stack Thread stack memory
* @param stack_size Thread stack size
*/
void k_p4wq_add_thread(struct k_p4wq *queue, struct k_thread *thread,
k_thread_stack_t *stack,
size_t stack_size);
/**
* @brief Submit work item to a P4 queue
*
* Submits the specified work item to the queue. The caller must have
* already initialized the relevant fields of the struct. The queue
* will execute the handler when CPU time is available and when no
* higher-priority work items are available. The handler may be
* invoked on any CPU.
*
* The caller must not mutate the struct while it is stored in the
* queue. The memory should remain unchanged until k_p4wq_cancel() is
* called or until the entry to the handler function.
*
* @note This call is a scheduling point, so if the submitted item (or
* any other ready thread) has a higher priority than the current
* thread and the current thread has a preemptible priority then the
* caller will yield.
*
* @param queue P4 Queue to which to submit
* @param item P4 work item to be submitted
*/
void k_p4wq_submit(struct k_p4wq *queue, struct k_p4wq_work *item);
/**
* @brief Cancel submitted P4 work item
*
* Cancels a previously-submitted work item and removes it from the
* queue. Returns true if the item was found in the queue and
* removed. If the function returns false, either the item was never
* submitted, has already been executed, or is still running.
*
* @return true if the item was successfully removed, otherwise false
*/
bool k_p4wq_cancel(struct k_p4wq *queue, struct k_p4wq_work *item);
#endif /* ZEPHYR_INCLUDE_SYS_P4WQ_H_ */

View file

@ -33,3 +33,10 @@ zephyr_sources_ifdef(CONFIG_RING_BUFFER ring_buffer.c)
zephyr_sources_ifdef(CONFIG_ASSERT assert.c)
zephyr_sources_ifdef(CONFIG_USERSPACE mutex.c)
zephyr_sources_ifdef(CONFIG_SCHED_DEADLINE p4wq.c)
zephyr_library_include_directories(
${ZEPHYR_BASE}/kernel/include
${ZEPHYR_BASE}/arch/${ARCH}/include
)

209
lib/os/p4wq.c Normal file
View file

@ -0,0 +1,209 @@
/*
* Copyright (c) 2020 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <logging/log.h>
#include <sys/p4wq.h>
#include <wait_q.h>
#include <ksched.h>
#include <init.h>
LOG_MODULE_REGISTER(p4wq);
struct device;
static void set_prio(struct k_thread *th, struct k_p4wq_work *item)
{
__ASSERT_NO_MSG(!IS_ENABLED(CONFIG_SMP) || !z_is_thread_queued(th));
th->base.prio = item->priority;
th->base.prio_deadline = item->deadline;
}
static bool rb_lessthan(struct rbnode *a, struct rbnode *b)
{
struct k_p4wq_work *aw = CONTAINER_OF(a, struct k_p4wq_work, rbnode);
struct k_p4wq_work *bw = CONTAINER_OF(b, struct k_p4wq_work, rbnode);
if (aw->priority != bw->priority) {
return aw->priority > bw->priority;
}
if (aw->deadline != bw->deadline) {
return aw->deadline - bw->deadline > 0;
}
return (uintptr_t)a < (uintptr_t)b;
}
/* Slightly different semantics: rb_lessthan must be perfectly
* symmetric (to produce a single tree structure) and will use the
* pointer value to break ties where priorities are equal, here we
* tolerate equality as meaning "not lessthan"
*/
static inline bool item_lessthan(struct k_p4wq_work *a, struct k_p4wq_work *b)
{
if (a->priority > b->priority) {
return true;
} else if ((a->priority == b->priority) &&
(a->deadline != b->deadline)) {
return a->deadline - b->deadline > 0;
}
return false;
}
static FUNC_NORETURN void p4wq_loop(void *p0, void *p1, void *p2)
{
ARG_UNUSED(p1);
ARG_UNUSED(p2);
struct k_p4wq *queue = p0;
k_spinlock_key_t k = k_spin_lock(&queue->lock);
while (true) {
struct rbnode *r = rb_get_max(&queue->queue);
if (r) {
struct k_p4wq_work *w
= CONTAINER_OF(r, struct k_p4wq_work, rbnode);
rb_remove(&queue->queue, r);
w->thread = _current;
sys_dlist_append(&queue->active, &w->dlnode);
set_prio(_current, w);
k_spin_unlock(&queue->lock, k);
w->handler(w);
k = k_spin_lock(&queue->lock);
/* Remove from the active list only if it
* wasn't resubmitted already
*/
if (w->thread == _current) {
sys_dlist_remove(&w->dlnode);
w->thread = NULL;
}
} else {
z_pend_curr(&queue->lock, k, &queue->waitq, K_FOREVER);
k = k_spin_lock(&queue->lock);
}
}
}
void k_p4wq_init(struct k_p4wq *queue)
{
memset(queue, 0, sizeof(*queue));
z_waitq_init(&queue->waitq);
queue->queue.lessthan_fn = rb_lessthan;
sys_dlist_init(&queue->active);
}
void k_p4wq_add_thread(struct k_p4wq *queue, struct k_thread *thread,
k_thread_stack_t *stack,
size_t stack_size)
{
k_thread_create(thread, stack, stack_size,
p4wq_loop, queue, NULL, NULL,
K_HIGHEST_THREAD_PRIO, 0, K_NO_WAIT);
}
static int static_init(const struct device *dev)
{
ARG_UNUSED(dev);
Z_STRUCT_SECTION_FOREACH(k_p4wq_initparam, pp) {
k_p4wq_init(pp->queue);
for (int i = 0; i < pp->num; i++) {
uintptr_t ssz = K_THREAD_STACK_LEN(pp->stack_size);
k_p4wq_add_thread(pp->queue, &pp->threads[i],
&pp->stacks[ssz * i],
pp->stack_size);
}
}
return 0;
}
/* We spawn a bunch of high priority threads, use the "SMP" initlevel
* so they can initialize in parallel instead of serially on the main
* CPU.
*/
SYS_INIT(static_init, SMP, 99);
void k_p4wq_submit(struct k_p4wq *queue, struct k_p4wq_work *item)
{
k_spinlock_key_t k = k_spin_lock(&queue->lock);
/* Input is a delta time from now (to match
* k_thread_deadline_set()), but we store and use the absolute
* cycle count.
*/
item->deadline += k_cycle_get_32();
/* Resubmission from within handler? Remove from active list */
if (item->thread == _current) {
sys_dlist_remove(&item->dlnode);
item->thread = NULL;
}
__ASSERT_NO_MSG(item->thread == NULL);
rb_insert(&queue->queue, &item->rbnode);
/* If there were other items already ahead of it in the queue,
* then we don't need to revisit active thread state and can
* return.
*/
if (rb_get_max(&queue->queue) != &item->rbnode) {
goto out;
}
/* Check the list of active (running or preempted) items, if
* there are at least an "active target" of those that are
* higher priority than the new item, then no one needs to be
* preempted and we can return.
*/
struct k_p4wq_work *wi;
uint32_t n_beaten_by = 0, active_target = CONFIG_MP_NUM_CPUS;
SYS_DLIST_FOR_EACH_CONTAINER(&queue->active, wi, dlnode) {
if (!item_lessthan(wi, item)) {
n_beaten_by++;
}
}
if (n_beaten_by >= active_target) {
goto out;
}
/* Grab a thread, set its priority and queue it. If there are
* no threads available to unpend, this is a soft runtime
* error: we are breaking our promise about run order.
* Complain.
*/
struct k_thread *th = z_unpend_first_thread(&queue->waitq);
if (th == NULL) {
LOG_WRN("Out of worker threads, priority guarantee violated");
goto out;
}
set_prio(th, item);
z_ready_thread(th);
z_reschedule(&queue->lock, k);
return;
out:
k_spin_unlock(&queue->lock, k);
}
bool k_p4wq_cancel(struct k_p4wq *queue, struct k_p4wq_work *item)
{
k_spinlock_key_t k = k_spin_lock(&queue->lock);
bool ret = rb_contains(&queue->queue, &item->rbnode);
if (ret) {
rb_remove(&queue->queue, &item->rbnode);
}
k_spin_unlock(&queue->lock, k);
return ret;
}