lib: os: ring_buffer: Fix race condition

The ring buffer claims that no synchronization is needed
when there is a single producer and a single consumer.
However, recent changes have broken that promise: the
index rewind mechanism was modifying both head and tail
when consuming. This patch fixes that by splitting index
rewinding so that the producer rewinds only the tail and
the consumer rewinds only the head.

Signed-off-by: Krzysztof Chruscinski <krzysztof.chruscinski@nordicsemi.no>
commit 58942f3f13
Author:    Krzysztof Chruscinski
Date:      2021-07-30 08:29:48 +02:00
Committer: Carles Cufí

2 changed files with 87 additions and 58 deletions
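The scheme the patch moves to is easiest to see in isolation. Below is a minimal sketch, not the Zephyr implementation (the names, the threshold value, and the produce/consume wrappers are invented for illustration): each side writes only its own index, the rewind amount is a multiple of the buffer size so the data positions derived from the indexes do not move, and a consumer that observes a rewound tail compensates locally instead of writing to the producer's index.

#include <stdint.h>

#define BUF_SIZE   64u
#define THRESHOLD  0xF0000000u /* arbitrary "close to 32-bit wrap" limit */

struct toy_ring {
	volatile uint32_t head; /* written by the consumer only */
	volatile uint32_t tail; /* written by the producer only */
};

/* A multiple of BUF_SIZE, so head % BUF_SIZE and tail % BUF_SIZE
 * (the actual data positions) are unchanged by a rewind.
 */
static uint32_t rewind_value(void)
{
	return BUF_SIZE * (THRESHOLD / BUF_SIZE);
}

/* Producer side: advances, and past the threshold rewinds, tail only. */
static void produce(struct toy_ring *r, uint32_t n)
{
	uint32_t tail = r->tail + n;

	r->tail = (tail > THRESHOLD) ? (tail - rewind_value()) : tail;
}

/* Consumer side: if tail reads "behind" head, the producer rewound it;
 * undo the rewind locally and advance head only.
 */
static void consume(struct toy_ring *r, uint32_t n)
{
	uint32_t rew = (r->tail < r->head) ? rewind_value() : 0;

	r->head = r->head + n - rew;
}

With that split in mind, the diff below only has to teach each API entry point which index it owns.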

include/sys/ring_buffer.h

@@ -174,10 +174,7 @@ static inline void ring_buf_init(struct ring_buf *buf,
  *
  * @return 1 if the ring buffer is empty, or 0 if not.
  */
-static inline int ring_buf_is_empty(struct ring_buf *buf)
-{
-	return (buf->head == buf->tail);
-}
+int ring_buf_is_empty(struct ring_buf *buf);
 
 /**
  * @brief Reset ring buffer state.
@@ -198,10 +195,7 @@ static inline void ring_buf_reset(struct ring_buf *buf)
  *
  * @return Ring buffer free space (in 32-bit words or bytes).
  */
-static inline uint32_t ring_buf_space_get(struct ring_buf *buf)
-{
-	return buf->size - (buf->tail - buf->head);
-}
+uint32_t ring_buf_space_get(struct ring_buf *buf);
 
 /**
  * @brief Return ring buffer capacity.
@@ -222,10 +216,7 @@ static inline uint32_t ring_buf_capacity_get(struct ring_buf *buf)
  *
  * @return Ring buffer space used (in 32-bit words or bytes).
  */
-static inline uint32_t ring_buf_size_get(struct ring_buf *buf)
-{
-	return buf->tail - buf->head;
-}
+uint32_t ring_buf_size_get(struct ring_buf *buf);
 
 /**
  * @brief Write a data item to a ring buffer.
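All three getters keep their signatures but lose their inline definitions: once only the producer rewinds tail, head can be numerically larger than tail, so the getters must compensate and now live in lib/os/ring_buffer.c (second file below). Callers are unaffected. A minimal consumer-side sketch, assuming a byte-mode buffer declared with RING_BUF_DECLARE and the include path of this Zephyr era:

#include <sys/ring_buffer.h>

RING_BUF_DECLARE(rb, 64); /* 64-byte ring buffer */

/* Drain whatever is currently queued (single consumer context). */
void drain(void)
{
	uint8_t *data;
	uint32_t n;

	while (!ring_buf_is_empty(&rb)) {
		/* May grant less than requested, e.g. at the wrap point. */
		n = ring_buf_get_claim(&rb, &data, ring_buf_size_get(&rb));
		/* ... process data[0..n-1] ... */
		ring_buf_get_finish(&rb, n);
	}
}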

lib/os/ring_buffer.c

@@ -36,56 +36,48 @@ static uint32_t mod(struct ring_buf *buf, uint32_t val)
 	return likely(buf->mask) ? val & buf->mask : val % buf->size;
 }
 
-/* Check if indexes did not progress too far (too close to 32-bit wrapping).
- * If so, then reduce all indexes by an arbitrary value.
- */
-static void item_indexes_rewind(struct ring_buf *buf)
+static uint32_t get_rewind_value(uint32_t buf_size, uint32_t threshold)
 {
-	uint32_t rew;
-	uint32_t threshold = ring_buf_get_rewind_threshold();
-
-	if (buf->head < threshold) {
-		return;
-	}
-
-	rew = buf->size * (threshold / buf->size);
-
-	k_spinlock_key_t key = k_spin_lock(&buf->lock);
-
-	buf->tail -= rew;
-	buf->head -= rew;
-	k_spin_unlock(&buf->lock, key);
+	return buf_size * (threshold / buf_size);
 }
 
-/* Check if indexes did not progresses too far (too close to 32-bit wrapping).
- * If so, then rewind all indexes by an arbitrary value. For byte mode temporary
- * indexes must also be reduced.
- */
-static void byte_indexes_rewind(struct ring_buf *buf)
+int ring_buf_is_empty(struct ring_buf *buf)
 {
-	uint32_t rew;
-	uint32_t threshold = ring_buf_get_rewind_threshold();
+	uint32_t tail = buf->tail;
+	uint32_t head = buf->head;
 
-	/* Checking head since it is the smallest index. */
-	if (buf->head < threshold) {
-		return;
+	if (tail < head) {
+		tail += get_rewind_value(buf->size,
+					 ring_buf_get_rewind_threshold());
 	}
 
-	rew = buf->size * (threshold / buf->size);
+	return (head == tail);
+}
 
-	k_spinlock_key_t key = k_spin_lock(&buf->lock);
+uint32_t ring_buf_size_get(struct ring_buf *buf)
+{
+	uint32_t tail = buf->tail;
+	uint32_t head = buf->head;
 
-	buf->tail -= rew;
-	buf->head -= rew;
-	buf->misc.byte_mode.tmp_head -= rew;
-	buf->misc.byte_mode.tmp_tail -= rew;
-	k_spin_unlock(&buf->lock, key);
+	if (tail < head) {
+		tail += get_rewind_value(buf->size,
+					 ring_buf_get_rewind_threshold());
+	}
+
+	return tail - head;
+}
+
+uint32_t ring_buf_space_get(struct ring_buf *buf)
+{
+	return buf->size - ring_buf_size_get(buf);
 }
 
 int ring_buf_item_put(struct ring_buf *buf, uint16_t type, uint8_t value,
 		      uint32_t *data, uint8_t size32)
 {
 	uint32_t i, space, index, rc;
+	uint32_t threshold = ring_buf_get_rewind_threshold();
+	uint32_t rew;
 
 	space = ring_buf_space_get(buf);
 	if (space >= (size32 + 1)) {
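Why the compensation works: get_rewind_value() rounds the threshold down to a multiple of the buffer size, so subtracting it from an index never changes mod(buf, index), and a consumer still holding a pre-rewind head can recover the true fill level by undoing the rewind locally. A worked example with illustrative numbers (the real threshold comes from ring_buf_get_rewind_threshold()):

#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint32_t size = 64, threshold = 0xF0000000u; /* illustrative values */
	uint32_t rew = size * (threshold / size);    /* 0xF0000000, a multiple of 64 */

	uint32_t head = 0xF0000008u;        /* consumer index, not rewound yet */
	uint32_t tail = 0xF0000010u - rew;  /* producer already rewound: 0x10 */

	if (tail < head) {   /* detect the rewind ... */
		tail += rew; /* ... and undo it locally */
	}
	assert(tail - head == 8); /* 8 bytes still queued */
	return 0;
}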
@@ -108,7 +100,14 @@ int ring_buf_item_put(struct ring_buf *buf, uint16_t type, uint8_t value,
 			}
 		}
 
-		buf->tail = buf->tail + size32 + 1;
+		/* Check if indexes shall be rewound. */
+		if (buf->tail > threshold) {
+			rew = get_rewind_value(buf->size, threshold);
+		} else {
+			rew = 0;
+		}
+
+		buf->tail = buf->tail + (size32 + 1 - rew);
 		rc = 0U;
 	} else {
 		buf->misc.item_mode.dropped_put_count++;
@@ -123,9 +122,19 @@ int ring_buf_item_get(struct ring_buf *buf, uint16_t *type, uint8_t *value,
 {
 	struct ring_element *header;
 	uint32_t i, index;
+	uint32_t tail = buf->tail;
+	uint32_t rew;
 
-	if (ring_buf_is_empty(buf)) {
+	/* Tail is always ahead; if it is not, it is only because it got rewound. */
+	if (tail < buf->head) {
+		/* Locally undo rewind to get tail aligned with head. */
+		rew = get_rewind_value(buf->size,
+				       ring_buf_get_rewind_threshold());
+		tail += rew;
+	} else if (ring_buf_is_empty(buf)) {
 		return -EAGAIN;
+	} else {
+		rew = 0;
 	}
 
 	header = (struct ring_element *) &buf->buf.buf32[mod(buf, buf->head)];
@@ -153,9 +162,8 @@ int ring_buf_item_get(struct ring_buf *buf, uint16_t *type, uint8_t *value,
 		}
 	}
 
-	buf->head = buf->head + header->length + 1;
-
-	item_indexes_rewind(buf);
+	/* Include potential rewinding */
+	buf->head = buf->head + header->length + 1 - rew;
 
 	return 0;
 }
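For context, the item-mode pair just patched is used as below; with one producing and one consuming context, put now writes only tail and get writes only head. A minimal sketch (the buffer name, type, value, and sizes are arbitrary):

#include <sys/ring_buffer.h>

RING_BUF_ITEM_DECLARE_SIZE(item_rb, 16); /* 16 32-bit words */

void producer(void)
{
	uint32_t payload[2] = { 0x11111111, 0x22222222 };

	/* Returns -EMSGSIZE (and counts a drop) when there is no room. */
	(void)ring_buf_item_put(&item_rb, 0x42, 0, payload, 2);
}

void consumer(void)
{
	uint32_t payload[2];
	uint16_t type;
	uint8_t value;
	uint8_t size32 = 2; /* in: capacity of payload[]; out: words read */

	if (ring_buf_item_get(&item_rb, &type, &value, payload, &size32) == 0) {
		/* payload[0..size32-1] is valid here */
	}
}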
@@ -197,11 +205,21 @@ uint32_t ring_buf_put_claim(struct ring_buf *buf, uint8_t **data, uint32_t size)
 int ring_buf_put_finish(struct ring_buf *buf, uint32_t size)
 {
+	uint32_t rew;
+	uint32_t threshold = ring_buf_get_rewind_threshold();
+
 	if ((buf->tail + size) > (buf->head + buf->size)) {
 		return -EINVAL;
 	}
 
-	buf->tail += size;
+	/* Check if indexes shall be rewound. */
+	if (buf->tail > threshold) {
+		rew = get_rewind_value(buf->size, threshold);
+	} else {
+		rew = 0;
+	}
+
+	buf->tail += (size - rew);
 
 	buf->misc.byte_mode.tmp_tail = buf->tail;
 
 	return 0;
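The byte-mode producer path follows the same tail-only rule: ring_buf_put_claim() only reads the indexes, and the rewind happens inside ring_buf_put_finish(), which only the producer calls. A sketch of that side (the buffer name and sizes are illustrative):

#include <sys/ring_buffer.h>

RING_BUF_DECLARE(byte_rb, 128);

void producer_isr(void)
{
	uint8_t *ptr;
	uint32_t len;

	len = ring_buf_put_claim(&byte_rb, &ptr, 16); /* may grant < 16 */
	/* ... fill ptr[0..len-1] ... */
	(void)ring_buf_put_finish(&byte_rb, len); /* tail advances (and may
						   * rewind) in this call */
}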
@@ -231,9 +249,17 @@ uint32_t ring_buf_put(struct ring_buf *buf, const uint8_t *data, uint32_t size)
 uint32_t ring_buf_get_claim(struct ring_buf *buf, uint8_t **data, uint32_t size)
 {
 	uint32_t space, granted_size, trail_size, tmp_head_mod;
+	uint32_t tail = buf->tail;
 
+	/* Tail is always ahead; if it is not, it is only because it got rewound. */
+	if (tail < buf->misc.byte_mode.tmp_head) {
+		/* Locally, increment it to the pre-rewind value. */
+		tail += get_rewind_value(buf->size,
+					 ring_buf_get_rewind_threshold());
+	}
+
 	tmp_head_mod = mod(buf, buf->misc.byte_mode.tmp_head);
-	space = buf->tail - buf->misc.byte_mode.tmp_head;
+	space = tail - buf->misc.byte_mode.tmp_head;
 	trail_size = buf->size - tmp_head_mod;
 
 	/* Limit requested size to available size. */
@@ -250,15 +276,27 @@ uint32_t ring_buf_get_claim(struct ring_buf *buf, uint8_t **data, uint32_t size)
 int ring_buf_get_finish(struct ring_buf *buf, uint32_t size)
 {
-	if ((buf->head + size) > buf->tail) {
+	uint32_t tail = buf->tail;
+	uint32_t rew;
+
+	/* Tail is always ahead; if it is not, it is only because it got rewound. */
+	if (tail < buf->misc.byte_mode.tmp_head) {
+		/* Tail was rewound. Locally, increment it to the pre-rewind value. */
+		rew = get_rewind_value(buf->size,
+				       ring_buf_get_rewind_threshold());
+		tail += rew;
+	} else {
+		rew = 0;
+	}
+
+	if ((buf->head + size) > tail) {
 		return -EINVAL;
 	}
 
-	buf->head += size;
+	/* Include potential rewinding. */
+	buf->head += (size - rew);
 	buf->misc.byte_mode.tmp_head = buf->head;
 
-	byte_indexes_rewind(buf);
-
 	return 0;
 }
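And the matching consumer side: the compensation above is local to the call, and the only shared index ring_buf_get_finish() writes is head (tmp_head is consumer-private), so the single-producer/single-consumer contract holds again. A sketch, reusing byte_rb from the producer sketch above:

void consumer_thread_fn(void)
{
	uint8_t *ptr;
	uint32_t len;

	len = ring_buf_get_claim(&byte_rb, &ptr, 16); /* may grant < 16 */
	/* ... consume ptr[0..len-1] ... */
	(void)ring_buf_get_finish(&byte_rb, len); /* head advances (and any
						   * rewind is absorbed) here */
}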