diff --git a/src/utils/ring_buffer.c b/src/utils/ring_buffer.cpp similarity index 56% rename from src/utils/ring_buffer.c rename to src/utils/ring_buffer.cpp index 8afbdb4d1..91ea8e5c2 100644 --- a/src/utils/ring_buffer.c +++ b/src/utils/ring_buffer.cpp @@ -1,6 +1,7 @@ /** - * @file utils/ring_buffer.c + * @file utils/ring_buffer.cpp * @author Martin Pulec + * @author Martin Piatka */ /* * Copyright (c) 2011-2019 CESNET, z. s. p. o. @@ -40,11 +41,12 @@ #include #include #include +#include struct ring_buffer { char *data; int len; - volatile int start, end; + std::atomic start, end; }; struct ring_buffer *ring_buffer_init(int size) { @@ -66,31 +68,49 @@ void ring_buffer_destroy(struct ring_buffer *ring) { } int ring_buffer_read(struct ring_buffer * ring, char *out, int max_len) { - int end = ring->end; /* to avoid changes under our hand */ - int read_len = end - ring->start; + /* end index is modified by the writer thread, use acquire order to ensure + * that all writes by the writer thread made before the modification are + * observable in this (reader) thread */ + int end = std::atomic_load_explicit(&ring->end, std::memory_order_acquire); + // start index is modified only by this (reader) thread, so relaxed is enough + int start = std::atomic_load_explicit(&ring->start, std::memory_order_relaxed); + int read_len = end - start; if(read_len < 0) read_len += ring->len; if(read_len > max_len) read_len = max_len; - if(ring->start + read_len <= ring->len) { - memcpy(out, ring->data + ring->start, read_len); + if(start + read_len <= ring->len) { + memcpy(out, ring->data + start, read_len); } else { - int to_end = ring->len - ring->start; - memcpy(out, ring->data + ring->start, to_end); + int to_end = ring->len - start; + memcpy(out, ring->data + start, to_end); memcpy(out + to_end, ring->data, read_len - to_end); } - ring->start = (ring->start + read_len) % ring->len; + + /* Use release order to ensure that all reads are completed (no reads + * or writes in the current thread can be reordered after this store). + */ + std::atomic_store_explicit(&ring->start, (start + read_len) % ring->len, std::memory_order_release); return read_len; } void ring_buffer_flush(struct ring_buffer * buf) { - buf->start = buf->end = 0; + /* This should only be called while the buffer is not being read or + * written. The only way to safely flush without locking is by reading + * all available data from the reader thread. + */ + buf->start = 0; + buf->end = 0; } void ring_buffer_write(struct ring_buffer * ring, const char *in, int len) { - int to_end; + int start = std::atomic_load_explicit(&ring->start, std::memory_order_acquire); + + // end index is modified only by this (writer) thread, so relaxed is enough + int end = std::atomic_load_explicit(&ring->start, std::memory_order_relaxed); + if(len > ring->len) { fprintf(stderr, "Warning: too long write request for ring buffer (%d B)!!!\n", len); @@ -98,9 +118,8 @@ void ring_buffer_write(struct ring_buffer * ring, const char *in, int len) { } /* detect overrun */ { - int start = ring->start; - int read_len_old = ring->end - start; - int read_len_new = ((ring->end + len) % ring->len) - start; + int read_len_old = end - start; + int read_len_new = ((end + len) % ring->len) - start; if(read_len_old < 0) read_len_old += ring->len; @@ -111,14 +130,19 @@ void ring_buffer_write(struct ring_buffer * ring, const char *in, int len) { } } - to_end = ring->len - ring->end; + int to_end = ring->len - end; if(len <= to_end) { - memcpy(ring->data + ring->end, in, len); + memcpy(ring->data + end, in, len); } else { - memcpy(ring->data + ring->end, in, to_end); + memcpy(ring->data + end, in, to_end); memcpy(ring->data, in + to_end, len - to_end); } - ring->end = (ring->end + len) % ring->len; + + /* Use release order to ensure that all writes to the buffer are + * completed before advancing the end index (no reads or writes in the + * current thread can be reordered after this store). + */ + std::atomic_store_explicit(&ring->end, (end + len) % ring->len, std::memory_order_release); } int ring_get_size(struct ring_buffer * ring) { @@ -127,7 +151,23 @@ int ring_get_size(struct ring_buffer * ring) { int ring_get_current_size(struct ring_buffer * ring) { - return (ring->end - ring->start + ring->len) % ring->len; + /* This is called from both reader and writer thread. + * + * Writer case: + * If the reader modifies start index under our feet, it doesn't + * matter, because reader can only make the current size smaller. That + * means the writer may calculate less free space, but never more than + * really available. + * + * Reader case: + * If the writer modifies end index under our feet, it doesn't matter, + * because the writer can only make current size bigger. That means the + * reader may calculate less size for reading, but the read data is + * always valid. + */ + int start = std::atomic_load_explicit(&ring->start, std::memory_order_acquire); + int end = std::atomic_load_explicit(&ring->end, std::memory_order_acquire); + return (end - start + ring->len) % ring->len; } int ring_get_available_write_size(struct ring_buffer * ring){ diff --git a/src/utils/ring_buffer.h b/src/utils/ring_buffer.h index ace179d83..9d82e9a65 100644 --- a/src/utils/ring_buffer.h +++ b/src/utils/ring_buffer.h @@ -69,7 +69,9 @@ int ring_buffer_read(struct ring_buffer * ring, char *out, int max_len); void ring_buffer_write(struct ring_buffer * ring, const char *in, int len); int ring_get_size(struct ring_buffer * ring); /** - * Flushes all data from ring buffer + * Flushes all data from ring buffer. Not thread safe - needs external + * synchronization to ensure that this is not called while the buffer is being + * read or written. */ void ring_buffer_flush(struct ring_buffer *ring); /**