p4wq: add support for per-CPU, per-queue and delayed threads

Currently P4WQ supports queues with sets of an arbitrary number of
user-provided worker threads. These threads are started
immediately upon initialisation.

This patch adds support for 3 more thread implementation options:

1. queue per thread. It adds a K_P4WQ_ARRAY_DEFINE() macro which
   initialises an array of queues and threads of the same number.
   These threads are then uniquely assigned to respective queues.
2. delayed start. With this option threads aren't started
   immediately upon queue initialisation. Instead a new function
   k_p4wq_enable_static_thread() has to be called to enable those
   threads individually.
3. queue per CPU. With this option the user can assign CPU masks
   to threads when calling k_p4wq_enable_static_thread().
   Otherwise the cpu_mask parameter to that function is ignored.
   Currently enabling this option implies option 2 above. Also, so
   far, to enable queues per CPU the user has to use
   K_P4WQ_ARRAY_DEFINE(), which means this option also implies 1
   above, but both these restrictions can be relaxed in the
   future if required.

Signed-off-by: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
This commit is contained in:
Guennadi Liakhovetski 2021-04-07 15:34:30 +02:00 committed by Anas Nashif
commit 91d90df2a7
2 changed files with 91 additions and 3 deletions

View file

@ -42,6 +42,10 @@ struct k_p4wq_work {
struct k_p4wq *queue;
};
#define K_P4WQ_QUEUE_PER_THREAD BIT(0)
#define K_P4WQ_DELAYED_START BIT(1)
#define K_P4WQ_USER_CPU_MASK BIT(2)
/**
* @brief P4 Queue
*
@ -66,6 +70,9 @@ struct k_p4wq {
/* Work items in progress */
sys_dlist_t active;
/* K_P4WQ_* flags above */
uint32_t flags;
};
struct k_p4wq_initparam {
@ -74,6 +81,7 @@ struct k_p4wq_initparam {
struct k_p4wq *queue;
struct k_thread *threads;
struct z_thread_stack_element *stacks;
uint32_t flags;
};
/**
@ -99,6 +107,33 @@ struct k_p4wq_initparam {
.threads = _p4threads_##name, \
.stacks = &(_p4stacks_##name[0][0]), \
.queue = &name, \
.flags = 0, \
}
/**
 * @brief Statically initialize an array of P4 Work Queues
 *
 * Statically defines an array of struct k_p4wq objects with the specified
 * number of threads which will be initialized at boot and ready for use on
 * entry to main().
 *
 * Each thread is uniquely assigned to one queue of the array
 * (K_P4WQ_QUEUE_PER_THREAD is always set in the generated initparam).
 *
 * @param name Symbol name of the struct k_p4wq array that will be defined
 * @param n_threads Number of threads and work queues
 * @param stack_sz Requested stack size of each thread, in bytes
 * @param flg Additional K_P4WQ_* flags OR'ed with the implied
 *            K_P4WQ_QUEUE_PER_THREAD (e.g. K_P4WQ_DELAYED_START,
 *            K_P4WQ_USER_CPU_MASK)
 */
#define K_P4WQ_ARRAY_DEFINE(name, n_threads, stack_sz, flg) \
static K_THREAD_STACK_ARRAY_DEFINE(_p4stacks_##name, \
n_threads, stack_sz); \
static struct k_thread _p4threads_##name[n_threads]; \
static struct k_p4wq name[n_threads]; \
static const Z_STRUCT_SECTION_ITERABLE(k_p4wq_initparam, \
_init_##name) = { \
.num = n_threads, \
.stack_size = stack_sz, \
.threads = _p4threads_##name, \
.stacks = &(_p4stacks_##name[0][0]), \
.queue = name, \
.flags = K_P4WQ_QUEUE_PER_THREAD | flg, \
}
/**
@ -168,4 +203,7 @@ bool k_p4wq_cancel(struct k_p4wq *queue, struct k_p4wq_work *item);
*/
int k_p4wq_wait(struct k_p4wq_work *work, k_timeout_t timeout);
void k_p4wq_enable_static_thread(struct k_p4wq *queue, struct k_thread *thread,
uint32_t cpu_mask);
#endif /* ZEPHYR_INCLUDE_SYS_P4WQ_H_ */

View file

@ -134,7 +134,8 @@ void k_p4wq_add_thread(struct k_p4wq *queue, struct k_thread *thread,
{
k_thread_create(thread, stack, stack_size,
p4wq_loop, queue, NULL, NULL,
K_HIGHEST_THREAD_PRIO, 0, K_NO_WAIT);
K_HIGHEST_THREAD_PRIO, 0,
queue->flags & K_P4WQ_DELAYED_START ? K_FOREVER : K_NO_WAIT);
}
static int static_init(const struct device *dev)
@ -142,18 +143,67 @@ static int static_init(const struct device *dev)
ARG_UNUSED(dev);
Z_STRUCT_SECTION_FOREACH(k_p4wq_initparam, pp) {
k_p4wq_init(pp->queue);
for (int i = 0; i < pp->num; i++) {
uintptr_t ssz = K_THREAD_STACK_LEN(pp->stack_size);
struct k_p4wq *q = pp->flags & K_P4WQ_QUEUE_PER_THREAD ?
pp->queue + i : pp->queue;
k_p4wq_add_thread(pp->queue, &pp->threads[i],
if (!i || (pp->flags & K_P4WQ_QUEUE_PER_THREAD))
k_p4wq_init(q);
q->flags = pp->flags;
/*
* If the user wants to specify CPU affinity, we have to
* delay starting threads until that has been done
*/
if (q->flags & K_P4WQ_USER_CPU_MASK)
q->flags |= K_P4WQ_DELAYED_START;
k_p4wq_add_thread(q, &pp->threads[i],
&pp->stacks[ssz * i],
pp->stack_size);
if (pp->flags & K_P4WQ_DELAYED_START)
z_mark_thread_as_suspended(&pp->threads[i]);
#ifdef CONFIG_SCHED_CPU_MASK
if (pp->flags & K_P4WQ_USER_CPU_MASK) {
int ret = k_thread_cpu_mask_clear(&pp->threads[i]);
if (ret < 0)
LOG_ERR("Couldn't clear CPU mask: %d", ret);
}
#endif
}
}
return 0;
}
/**
 * @brief Enable a statically defined P4WQ thread whose start was delayed
 *
 * Threads belonging to queues initialised with K_P4WQ_DELAYED_START are
 * created suspended; this function starts one of them.  When the queue
 * carries K_P4WQ_USER_CPU_MASK, @p cpu_mask selects the CPUs the thread
 * may run on; otherwise @p cpu_mask is ignored.
 *
 * @param queue    Queue the thread belongs to (its flags select behaviour)
 * @param thread   Thread to enable
 * @param cpu_mask Bitmask of allowed CPUs, bit N enables CPU N; only
 *                 honoured when K_P4WQ_USER_CPU_MASK is set on the queue
 */
void k_p4wq_enable_static_thread(struct k_p4wq *queue, struct k_thread *thread,
uint32_t cpu_mask)
{
#ifdef CONFIG_SCHED_CPU_MASK
if (queue->flags & K_P4WQ_USER_CPU_MASK) {
unsigned int i;
/* Consume cpu_mask bit by bit: find_lsb_set() returns the
 * 1-based position of the lowest set bit, or 0 when the
 * mask is exhausted, which terminates the loop.
 */
while ((i = find_lsb_set(cpu_mask))) {
int ret = k_thread_cpu_mask_enable(thread, i - 1);
/* Best effort: log and keep going with remaining bits */
if (ret < 0)
LOG_ERR("Couldn't set CPU mask for %u: %d", i, ret);
cpu_mask &= ~BIT(i - 1);
}
}
#endif
/* Delay-started threads were marked suspended at creation time;
 * clear that state before actually starting the thread.
 */
if (queue->flags & K_P4WQ_DELAYED_START) {
z_mark_thread_as_not_suspended(thread);
k_thread_start(thread);
}
}
/* We spawn a bunch of high priority threads, use the "SMP" initlevel
* so they can initialize in parallel instead of serially on the main
* CPU.