Producer/Consumer: Refactor to use Queue policies

Previously the Producer and Consumer interfaces tracked the
Consumer and Producer respectively at the other end of the
queue that they interacted with.  This was done to avoid
modifying the queue implementation, but resulted in a rougher
interface that required additional initialization steps and
prevented alternative configurations; many producers and one
consumer for example.

This commit uses the new queue policies to track this
information.  The new direct policy behaves as the old producer
and consumers did.  Now the producers and consumers are just
named references to the queue that they work on and a convenient
location for a notification callback when the queue is updated in
a way that is relevent to the producer or consumer.

All users of Producer and Consumer have been updated including the
stream adaptors which are in use by the echo test code and the
mcdp28x0 driver.  Use of the stream adaptors has also been
simplified.

Signed-off-by: Anton Staaf <robotboy@chromium.org>

BRANCH=None
BUG=None
TEST=make buildall -j
     Manual testing of Ryu (P5) and discovery board echo task

Change-Id: I704be6378a31b4e20f5063295eff9943e4900409
Reviewed-on: https://chromium-review.googlesource.com/271792
Reviewed-by: Anton Staaf <robotboy@chromium.org>
Commit-Queue: Anton Staaf <robotboy@chromium.org>
Trybot-Ready: Anton Staaf <robotboy@chromium.org>
Tested-by: Anton Staaf <robotboy@chromium.org>
This commit is contained in:
Anton Staaf
2015-05-13 13:26:12 -07:00
committed by ChromeOS Commit Bot
parent a0ebf0a008
commit 855646e36b
17 changed files with 271 additions and 373 deletions

View File

@@ -13,6 +13,7 @@
#include "console.h"
#include "panic.h"
#include "task.h"
#include "queue_policies.h"
#include "stream_adaptor.h"
#include "timer.h"
#include "usart-stm32f0.h"
@@ -29,49 +30,23 @@ static void out_ready(struct out_stream const *stream)
task_wake(TASK_ID_ECHO);
}
#define USART_STREAM_CONFIG(NAME, \
HW, \
BAUD, \
RX_SIZE, \
TX_SIZE, \
IN_READY, \
OUT_READY) \
\
static struct queue const CONCAT2(NAME, _rx_queue) = \
QUEUE_NULL(RX_SIZE, uint8_t); \
static struct queue const CONCAT2(NAME, _tx_queue) = \
QUEUE_NULL(TX_SIZE, uint8_t); \
\
struct usart_config const NAME; \
\
IN_STREAM_FROM_PRODUCER(CONCAT2(NAME, _in), \
NAME.producer, \
CONCAT2(NAME, _rx_queue), \
IN_READY) \
OUT_STREAM_FROM_CONSUMER(CONCAT2(NAME, _out), \
NAME.consumer, \
CONCAT2(NAME, _tx_queue), \
OUT_READY) \
\
USART_CONFIG(NAME, \
HW, \
BAUD, \
CONCAT2(NAME, _rx_queue), \
CONCAT2(NAME, _tx_queue), \
CONCAT2(NAME, _in).consumer, \
CONCAT2(NAME, _out).producer)
USART_STREAM_CONFIG(usart1, usart1_hw, 115200, 64, 64, in_ready, NULL);
USART_STREAM_CONFIG(usart3, usart3_hw, 115200, 64, 64, in_ready, NULL);
USART_STREAM_CONFIG(usart4, usart4_hw, 115200, 64, 64, in_ready, NULL);
static struct queue const usb_rx_queue = QUEUE_NULL(256, uint8_t);
static struct queue const usb_tx_queue = QUEUE_NULL(256, uint8_t);
/*
* Forward declare all device configurations so that they can be used to
* construct appropriate queue policies within the IO_STREAM_CONFIG macro.
*/
struct usart_config const usart1;
struct usart_config const usart3;
struct usart_config const usart4;
struct usb_stream_config const usb_stream1;
IN_STREAM_FROM_PRODUCER(usb_in, usb_stream1.producer, usb_rx_queue, in_ready)
OUT_STREAM_FROM_CONSUMER(usb_out, usb_stream1.consumer, usb_tx_queue, out_ready)
IO_STREAM_CONFIG(usart1, 64, 64, in_ready, NULL)
IO_STREAM_CONFIG(usart3, 64, 64, in_ready, NULL)
IO_STREAM_CONFIG(usart4, 64, 64, in_ready, NULL)
IO_STREAM_CONFIG(usb_stream1, 256, 256, in_ready, out_ready)
USART_CONFIG(usart1, usart1_hw, 115200, usart1_rx_queue, usart1_tx_queue)
USART_CONFIG(usart3, usart3_hw, 115200, usart3_rx_queue, usart3_tx_queue)
USART_CONFIG(usart4, usart4_hw, 115200, usart4_rx_queue, usart4_tx_queue)
USB_STREAM_CONFIG(usb_stream1,
USB_IFACE_STREAM,
@@ -79,10 +54,8 @@ USB_STREAM_CONFIG(usb_stream1,
USB_EP_STREAM,
64,
64,
usb_rx_queue,
usb_tx_queue,
usb_in.consumer,
usb_out.producer)
usb_stream1_rx_queue,
usb_stream1_tx_queue)
struct stream_console_state {
size_t wrote;
@@ -106,7 +79,9 @@ struct stream_console_config {
STREAM_CONSOLE_CONFIG(usart1_stream_console, &usart1_in.in, &usart1_out.out)
STREAM_CONSOLE_CONFIG(usart3_stream_console, &usart3_in.in, &usart3_out.out)
STREAM_CONSOLE_CONFIG(usart4_stream_console, &usart4_in.in, &usart4_out.out)
STREAM_CONSOLE_CONFIG(usb_stream1_console, &usb_in.in, &usb_out.out)
STREAM_CONSOLE_CONFIG(usb_stream1_console,
&usb_stream1_in.in,
&usb_stream1_out.out)
static struct stream_console_config const *const consoles[] = {
&usart1_stream_console,

View File

@@ -25,6 +25,7 @@
#include "motion_sense.h"
#include "power.h"
#include "power_button.h"
#include "queue_policies.h"
#include "registers.h"
#include "spi.h"
#include "task.h"
@@ -210,58 +211,51 @@ BUILD_ASSERT(ARRAY_SIZE(usb_strings) == USB_STR_COUNT);
* Define AP and SH console forwarding queues and associated USART and USB
* stream endpoints.
*/
struct usart_config const ap_usart;
struct usart_config const sh_usart;
static struct queue const ap_usart_to_usb = QUEUE_NULL(64, uint8_t);
static struct queue const usb_to_ap_usart = QUEUE_NULL(64, uint8_t);
static struct queue const sh_usart_to_usb = QUEUE_NULL(64, uint8_t);
static struct queue const usb_to_sh_usart = QUEUE_NULL(64, uint8_t);
struct usb_stream_config const ap_usb;
struct usb_stream_config const sh_usb;
struct usb_stream_config const usb_ap_stream;
struct usb_stream_config const usb_sh_stream;
static struct queue const ap_usart_to_usb = QUEUE_DIRECT(64, uint8_t,
ap_usart.producer,
ap_usb.consumer);
static struct queue const ap_usb_to_usart = QUEUE_DIRECT(64, uint8_t,
ap_usb.producer,
ap_usart.consumer);
static struct queue const sh_usart_to_usb = QUEUE_DIRECT(64, uint8_t,
sh_usart.producer,
sh_usb.consumer);
static struct queue const sh_usb_to_usart = QUEUE_DIRECT(64, uint8_t,
sh_usb.producer,
sh_usart.consumer);
USART_CONFIG(usart1,
usart1_hw,
115200,
ap_usart_to_usb,
usb_to_ap_usart,
usb_ap_stream.consumer,
usb_ap_stream.producer)
USART_CONFIG(usart3,
usart3_hw,
115200,
sh_usart_to_usb,
usb_to_sh_usart,
usb_sh_stream.consumer,
usb_sh_stream.producer)
USART_CONFIG(ap_usart, usart1_hw, 115200, ap_usart_to_usb, ap_usb_to_usart)
USART_CONFIG(sh_usart, usart3_hw, 115200, sh_usart_to_usb, sh_usb_to_usart)
#define AP_USB_STREAM_RX_SIZE 16
#define AP_USB_STREAM_TX_SIZE 16
USB_STREAM_CONFIG(usb_ap_stream,
USB_STREAM_CONFIG(ap_usb,
USB_IFACE_AP_STREAM,
USB_STR_AP_STREAM_NAME,
USB_EP_AP_STREAM,
AP_USB_STREAM_RX_SIZE,
AP_USB_STREAM_TX_SIZE,
usb_to_ap_usart,
ap_usart_to_usb,
usart1.consumer,
usart1.producer)
ap_usb_to_usart,
ap_usart_to_usb)
#define SH_USB_STREAM_RX_SIZE 16
#define SH_USB_STREAM_TX_SIZE 16
USB_STREAM_CONFIG(usb_sh_stream,
USB_STREAM_CONFIG(sh_usb,
USB_IFACE_SH_STREAM,
USB_STR_SH_STREAM_NAME,
USB_EP_SH_STREAM,
SH_USB_STREAM_RX_SIZE,
SH_USB_STREAM_TX_SIZE,
usb_to_sh_usart,
sh_usart_to_usb,
usart3.consumer,
usart3.producer)
sh_usb_to_usart,
sh_usart_to_usb)
/* Initialize board. */
static void board_init(void)
@@ -307,11 +301,11 @@ static void board_init(void)
* Initialize AP and SH console forwarding USARTs and queues.
*/
queue_init(&ap_usart_to_usb);
queue_init(&usb_to_ap_usart);
queue_init(&ap_usb_to_usart);
queue_init(&sh_usart_to_usb);
queue_init(&usb_to_sh_usart);
usart_init(&usart1);
usart_init(&usart3);
queue_init(&sh_usb_to_usart);
usart_init(&ap_usart);
usart_init(&sh_usart);
/*
* Enable CC lines after all GPIO have been initialized. Note, it is

View File

@@ -23,6 +23,7 @@
#include "lid_switch.h"
#include "power.h"
#include "power_button.h"
#include "queue_policies.h"
#include "registers.h"
#include "spi.h"
#include "task.h"
@@ -208,58 +209,51 @@ BUILD_ASSERT(ARRAY_SIZE(usb_strings) == USB_STR_COUNT);
* Define AP and SH console forwarding queues and associated USART and USB
* stream endpoints.
*/
struct usart_config const ap_usart;
struct usart_config const sh_usart;
static struct queue const ap_usart_to_usb = QUEUE_NULL(64, uint8_t);
static struct queue const usb_to_ap_usart = QUEUE_NULL(64, uint8_t);
static struct queue const sh_usart_to_usb = QUEUE_NULL(64, uint8_t);
static struct queue const usb_to_sh_usart = QUEUE_NULL(64, uint8_t);
struct usb_stream_config const ap_usb;
struct usb_stream_config const sh_usb;
struct usb_stream_config const usb_ap_stream;
struct usb_stream_config const usb_sh_stream;
static struct queue const ap_usart_to_usb = QUEUE_DIRECT(64, uint8_t,
ap_usart.producer,
ap_usb.consumer);
static struct queue const ap_usb_to_usart = QUEUE_DIRECT(64, uint8_t,
ap_usb.producer,
ap_usart.consumer);
static struct queue const sh_usart_to_usb = QUEUE_DIRECT(64, uint8_t,
sh_usart.producer,
sh_usb.consumer);
static struct queue const sh_usb_to_usart = QUEUE_DIRECT(64, uint8_t,
sh_usb.producer,
sh_usart.consumer);
USART_CONFIG(usart1,
usart1_hw,
115200,
ap_usart_to_usb,
usb_to_ap_usart,
usb_ap_stream.consumer,
usb_ap_stream.producer)
USART_CONFIG(usart3,
usart3_hw,
115200,
sh_usart_to_usb,
usb_to_sh_usart,
usb_sh_stream.consumer,
usb_sh_stream.producer)
USART_CONFIG(ap_usart, usart1_hw, 115200, ap_usart_to_usb, ap_usb_to_usart)
USART_CONFIG(sh_usart, usart3_hw, 115200, sh_usart_to_usb, sh_usb_to_usart)
#define AP_USB_STREAM_RX_SIZE 16
#define AP_USB_STREAM_TX_SIZE 16
USB_STREAM_CONFIG(usb_ap_stream,
USB_STREAM_CONFIG(ap_usb,
USB_IFACE_AP_STREAM,
USB_STR_AP_STREAM_NAME,
USB_EP_AP_STREAM,
AP_USB_STREAM_RX_SIZE,
AP_USB_STREAM_TX_SIZE,
usb_to_ap_usart,
ap_usart_to_usb,
usart1.consumer,
usart1.producer)
ap_usb_to_usart,
ap_usart_to_usb)
#define SH_USB_STREAM_RX_SIZE 16
#define SH_USB_STREAM_TX_SIZE 16
USB_STREAM_CONFIG(usb_sh_stream,
USB_STREAM_CONFIG(sh_usb,
USB_IFACE_SH_STREAM,
USB_STR_SH_STREAM_NAME,
USB_EP_SH_STREAM,
SH_USB_STREAM_RX_SIZE,
SH_USB_STREAM_TX_SIZE,
usb_to_sh_usart,
sh_usart_to_usb,
usart3.consumer,
usart3.producer)
sh_usb_to_usart,
sh_usart_to_usb)
/* Initialize board. */
static void board_init(void)
@@ -305,11 +299,11 @@ static void board_init(void)
* Initialize AP and SH console forwarding USARTs and queues.
*/
queue_init(&ap_usart_to_usb);
queue_init(&usb_to_ap_usart);
queue_init(&ap_usb_to_usart);
queue_init(&sh_usart_to_usb);
queue_init(&usb_to_sh_usart);
usart_init(&usart1);
usart_init(&usart3);
queue_init(&sh_usb_to_usart);
usart_init(&ap_usart);
usart_init(&sh_usart);
/*
* Enable CC lines after all GPIO have been initialized. Note, it is

View File

@@ -142,7 +142,7 @@ static void usart_interrupt_tx(struct usart_config const *config)
intptr_t base = config->hw->base;
uint8_t byte;
if (consumer_read_unit(&config->consumer, &byte)) {
if (queue_remove_unit(config->consumer.queue, &byte)) {
STM32_USART_TDR(base) = byte;
/*
@@ -171,7 +171,7 @@ static void usart_interrupt_rx(struct usart_config const *config)
intptr_t base = config->hw->base;
uint8_t byte = STM32_USART_RDR(base);
if (!producer_write_unit(&config->producer, &byte))
if (!queue_add_unit(config->producer.queue, &byte))
atomic_add((uint32_t *) &config->state->rx_dropped, 1);
}

View File

@@ -104,11 +104,7 @@ extern struct producer_ops const usart_producer_ops;
* HW is the name of the usart_hw_config provided by the variant specific code.
*
* RX_QUEUE and TX_QUEUE are the names of the RX and TX queues that this USART
* should write to and read from respectively. They must match the queues
* that the CONSUMER and PRODUCER read from and write to respectively.
*
* CONSUMER and PRODUCER are the names of the consumer and producer objects at
* the other ends of the RX and TX queues respectively.
* should write to and read from respectively.
*/
/*
* The following assertions can not be made because they require access to
@@ -116,16 +112,12 @@ extern struct producer_ops const usart_producer_ops;
*
* BUILD_ASSERT(RX_QUEUE.unit_bytes == 1);
* BUILD_ASSERT(TX_QUEUE.unit_bytes == 1);
* BUILD_ASSERT(PRODUCER.queue == &TX_QUEUE);
* BUILD_ASSERT(CONSUMER.queue == &RX_QUEUE);
*/
#define USART_CONFIG(NAME, \
HW, \
BAUD, \
RX_QUEUE, \
TX_QUEUE, \
CONSUMER, \
PRODUCER) \
TX_QUEUE) \
\
static struct usart_state CONCAT2(NAME, _state); \
struct usart_config const NAME = { \
@@ -133,14 +125,12 @@ extern struct producer_ops const usart_producer_ops;
.state = &CONCAT2(NAME, _state), \
.baud = BAUD, \
.consumer = { \
.producer = &PRODUCER, \
.queue = &TX_QUEUE, \
.ops = &usart_consumer_ops, \
.queue = &TX_QUEUE, \
.ops = &usart_consumer_ops, \
}, \
.producer = { \
.consumer = &CONSUMER, \
.queue = &RX_QUEUE, \
.ops = &usart_producer_ops, \
.queue = &RX_QUEUE, \
.ops = &usart_producer_ops, \
}, \
};

View File

@@ -27,16 +27,16 @@ static size_t rx_read(struct usb_stream_config const *config)
if (count >= queue_space(config->producer.queue))
return 0;
return producer_write_memcpy(&config->producer,
(void *) address,
count,
memcpy_from_usbram);
return queue_add_memcpy(config->producer.queue,
(void *) address,
count,
memcpy_from_usbram);
}
static size_t tx_write(struct usb_stream_config const *config)
{
uintptr_t address = btable_ep[config->endpoint].tx_addr;
size_t count = consumer_read_memcpy(&config->consumer,
size_t count = queue_remove_memcpy(config->consumer.queue,
(void *) address,
config->tx_size,
memcpy_to_usbram);

View File

@@ -95,11 +95,7 @@ extern struct producer_ops const usb_stream_producer_ops;
* parameters are dictated by the USB peripheral.
*
* RX_QUEUE and TX_QUEUE are the names of the RX and TX queues that this driver
* should write to and read from respectively. They must match the queues
* that the CONSUMER and PRODUCER read from and write to respectively.
*
* CONSUMER and PRODUCER are the names of the consumer and producer objects at
* the other ends of the RX and TX queues respectively.
* should write to and read from respectively.
*/
/*
* The following assertions can not be made because they require access to
@@ -109,8 +105,6 @@ extern struct producer_ops const usb_stream_producer_ops;
* BUILD_ASSERT(TX_QUEUE.buffer_units >= TX_SIZE);
* BUILD_ASSERT(RX_QUEUE.unit_bytes == 1);
* BUILD_ASSERT(TX_QUEUE.unit_bytes == 1);
* BUILD_ASSERT(PRODUCER.queue == &TX_QUEUE);
* BUILD_ASSERT(CONSUMER.queue == &RX_QUEUE);
*/
#define USB_STREAM_CONFIG(NAME, \
INTERFACE, \
@@ -119,9 +113,7 @@ extern struct producer_ops const usb_stream_producer_ops;
RX_SIZE, \
TX_SIZE, \
RX_QUEUE, \
TX_QUEUE, \
CONSUMER, \
PRODUCER) \
TX_QUEUE) \
\
BUILD_ASSERT(RX_SIZE <= USB_MAX_PACKET_SIZE); \
BUILD_ASSERT(TX_SIZE <= USB_MAX_PACKET_SIZE); \
@@ -145,14 +137,12 @@ extern struct producer_ops const usb_stream_producer_ops;
.rx_ram = CONCAT2(NAME, _ep_rx_buffer), \
.tx_ram = CONCAT2(NAME, _ep_tx_buffer), \
.consumer = { \
.producer = &PRODUCER, \
.queue = &TX_QUEUE, \
.ops = &usb_stream_consumer_ops, \
.queue = &TX_QUEUE, \
.ops = &usb_stream_consumer_ops, \
}, \
.producer = { \
.consumer = &CONSUMER, \
.queue = &RX_QUEUE, \
.ops = &usb_stream_producer_ops, \
.queue = &RX_QUEUE, \
.ops = &usb_stream_producer_ops, \
}, \
}; \
const struct usb_interface_descriptor \

View File

@@ -7,7 +7,7 @@
#
common-y=util.o
common-y+=version.o printf.o queue.o producer.o consumer.o
common-y+=version.o printf.o queue.o queue_policies.o
common-$(CONFIG_ADC)+=adc.o
common-$(CONFIG_ALS)+=als.o

View File

@@ -1,40 +0,0 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* Consumer methods
*/
#include "consumer.h"
#include "producer.h"
void consumer_notify_directly(struct consumer const *consumer, size_t count)
{
if (count && consumer->ops->written)
consumer->ops->written(consumer, count);
}
size_t consumer_read_unit(struct consumer const *consumer, void *unit)
{
size_t removed = queue_remove_unit(consumer->queue, unit);
producer_notify_directly(consumer->producer, removed);
return removed;
}
size_t consumer_read_memcpy(struct consumer const *consumer,
void *units,
size_t count,
void *(*memcpy)(void *dest,
void const *src,
size_t n))
{
size_t removed = queue_remove_memcpy(consumer->queue,
units,
count,
memcpy);
producer_notify_directly(consumer->producer, removed);
return removed;
}

View File

@@ -1,40 +0,0 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* Producer methods
*/
#include "consumer.h"
#include "producer.h"
void producer_notify_directly(struct producer const *producer, size_t count)
{
if (count && producer->ops->read)
producer->ops->read(producer, count);
}
size_t producer_write_unit(struct producer const *producer, void const *unit)
{
size_t added = queue_add_unit(producer->queue, unit);
consumer_notify_directly(producer->consumer, added);
return added;
}
size_t producer_write_memcpy(struct producer const *producer,
void const *units,
size_t count,
void *(*memcpy)(void *dest,
void const *src,
size_t n))
{
size_t added = queue_add_memcpy(producer->queue,
units,
count,
memcpy);
consumer_notify_directly(producer->consumer, added);
return added;
}

28
common/queue_policies.c Normal file
View File

@@ -0,0 +1,28 @@
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* Queue policies.
*/
#include "queue_policies.h"
#include "util.h"
#include <stddef.h>
void queue_add_direct(struct queue_policy const *policy, size_t count)
{
struct queue_policy_direct const *direct =
DOWNCAST(policy, struct queue_policy_direct, policy);
if (count && direct->consumer->ops->written)
direct->consumer->ops->written(direct->consumer, count);
}
void queue_remove_direct(struct queue_policy const *policy, size_t count)
{
struct queue_policy_direct const *direct =
DOWNCAST(policy, struct queue_policy_direct, policy);
if (count && direct->producer->ops->read)
direct->producer->ops->read(direct->producer, count);
}

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
@@ -10,56 +10,62 @@
#include "stream_adaptor.h"
#include "util.h"
static size_t in_stream_from_producer_read(struct in_stream const *stream,
uint8_t *buffer,
size_t count)
static size_t in_stream_from_queue_read(struct in_stream const *stream,
uint8_t *buffer,
size_t count)
{
struct in_stream_from_producer const *adaptor =
DOWNCAST(stream, struct in_stream_from_producer, in);
struct in_stream_from_queue const *adaptor =
DOWNCAST(stream, struct in_stream_from_queue, in);
return consumer_read_memcpy(&adaptor->consumer, buffer, count, memcpy);
return queue_remove_memcpy(adaptor->consumer.queue,
buffer,
count,
memcpy);
}
static void in_stream_from_producer_written(struct consumer const *consumer,
size_t count)
static void in_stream_from_queue_written(struct consumer const *consumer,
size_t count)
{
struct in_stream_from_producer const *adaptor =
DOWNCAST(consumer, struct in_stream_from_producer, consumer);
struct in_stream_from_queue const *adaptor =
DOWNCAST(consumer, struct in_stream_from_queue, consumer);
in_stream_ready(&adaptor->in);
}
struct in_stream_ops const in_stream_from_producer_in_stream_ops = {
.read = in_stream_from_producer_read,
struct in_stream_ops const in_stream_from_queue_in_stream_ops = {
.read = in_stream_from_queue_read,
};
struct consumer_ops const in_stream_from_producer_consumer_ops = {
.written = in_stream_from_producer_written,
struct consumer_ops const in_stream_from_queue_consumer_ops = {
.written = in_stream_from_queue_written,
};
static size_t out_stream_from_consumer_write(struct out_stream const *stream,
uint8_t const *buffer,
size_t count)
static size_t out_stream_from_queue_write(struct out_stream const *stream,
uint8_t const *buffer,
size_t count)
{
struct out_stream_from_consumer const *adaptor =
DOWNCAST(stream, struct out_stream_from_consumer, out);
struct out_stream_from_queue const *adaptor =
DOWNCAST(stream, struct out_stream_from_queue, out);
return producer_write_memcpy(&adaptor->producer, buffer, count, memcpy);
return queue_add_memcpy(adaptor->producer.queue,
buffer,
count,
memcpy);
}
static void out_stream_from_consumer_read(struct producer const *producer,
size_t count)
static void out_stream_from_queue_read(struct producer const *producer,
size_t count)
{
struct out_stream_from_consumer const *adaptor =
DOWNCAST(producer, struct out_stream_from_consumer, producer);
struct out_stream_from_queue const *adaptor =
DOWNCAST(producer, struct out_stream_from_queue, producer);
out_stream_ready(&adaptor->out);
}
struct out_stream_ops const out_stream_from_consumer_out_stream_ops = {
.write = out_stream_from_consumer_write,
struct out_stream_ops const out_stream_from_queue_out_stream_ops = {
.write = out_stream_from_queue_write,
};
struct producer_ops const out_stream_from_consumer_producer_ops = {
.read = out_stream_from_consumer_read,
struct producer_ops const out_stream_from_queue_producer_ops = {
.read = out_stream_from_queue_read,
};

View File

@@ -37,17 +37,15 @@ static inline void print_buffer(uint8_t *buf, int cnt)
static inline void print_buffer(uint8_t *buf, int cnt) {}
#endif
struct queue const rx_queue = QUEUE_NULL(MCDP_INBUF_MAX, uint8_t);
struct queue const tx_queue = QUEUE_NULL(MCDP_OUTBUF_MAX, uint8_t);
struct usart_config const usart_mcdp;
IN_STREAM_FROM_PRODUCER(usart_in, usart_mcdp.producer, rx_queue, NULL)
OUT_STREAM_FROM_CONSUMER(usart_out, usart_mcdp.consumer, tx_queue, NULL)
USART_CONFIG(usart_mcdp, CONFIG_MCDP28X0, 115200, rx_queue, tx_queue,
usart_in.consumer, usart_out.producer);
IO_STREAM_CONFIG(usart_mcdp, MCDP_INBUF_MAX, MCDP_OUTBUF_MAX, NULL, NULL);
USART_CONFIG(usart_mcdp,
CONFIG_MCDP28X0,
115200,
usart_mcdp_rx_queue,
usart_mcdp_tx_queue);
/**
* Compute checksum.
@@ -86,13 +84,13 @@ static int tx_serial(const uint8_t *msg, int cnt)
/* 1st byte (not in msg) is always cnt + 2, so seed chksum with that */
uint8_t chksum = compute_checksum(cnt + 2, msg, cnt);
if (out_stream_write(&usart_out.out, &out, 1) != 1)
if (out_stream_write(&usart_mcdp_out.out, &out, 1) != 1)
return EC_ERROR_UNKNOWN;
if (out_stream_write(&usart_out.out, msg, cnt) != cnt)
if (out_stream_write(&usart_mcdp_out.out, msg, cnt) != cnt)
return EC_ERROR_UNKNOWN;
if (out_stream_write(&usart_out.out, &chksum, 1) != 1)
if (out_stream_write(&usart_mcdp_out.out, &chksum, 1) != 1)
return EC_ERROR_UNKNOWN;
return EC_SUCCESS;
@@ -119,10 +117,10 @@ static int rx_serial(uint8_t *msg, int cnt)
size_t read;
int retry = 2;
read = in_stream_read(&usart_in.in, msg, cnt);
read = in_stream_read(&usart_mcdp_in.in, msg, cnt);
while ((read < cnt) && retry) {
usleep(100*MSEC);
read += in_stream_read(&usart_in.in, msg + read,
read += in_stream_read(&usart_mcdp_in.in, msg + read,
cnt - read);
retry--;
}

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
@@ -37,45 +37,11 @@ struct consumer_ops {
struct consumer {
/*
* A consumer references the producer at the other end of the queue.
* This allows the consumer to notify the producer when units are
* removed from the queue.
*/
struct producer const *producer;
/*
* A consumer also references the queue that it is reading from. This
* and the producer reference above could be more flexibly replaced by
* a queue manager object that could handle multiple producer/consumers
* or alternate notification mechanisms. But that complexity is not
* yet warranted.
* A consumer references the queue that it is reading from.
*/
struct queue const *queue;
struct consumer_ops const *ops;
};
/*
* Notify the consumer by calling its written method directly, as opposed to
* from a deferred callback or another task.
*/
void consumer_notify_directly(struct consumer const *consumer, size_t count);
/*
* Read a single unit from the queue and notify the associated producer.
* Return the number of units read.
*/
size_t consumer_read_unit(struct consumer const *consumer, void *unit);
/*
* Read multiple units from the queue, using the provided memcpy like routine
* and notify the producer. Return the number of units read.
*/
size_t consumer_read_memcpy(struct consumer const *consumer,
void *units,
size_t count,
void *(*memcpy)(void *dest,
void const *src,
size_t n));
#endif /* INCLUDE_CONSUMER_H */

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
@@ -31,45 +31,11 @@ struct producer_ops {
struct producer {
/*
* A producer references the consumer at the other end of the queue.
* This allows the producer to notify the consumer when new units are
* added to the queue.
*/
struct consumer const *consumer;
/*
* A producer also references the queue that it is writing into. This
* and the consumer reference above could be more flexibly replaced by
* a queue manager object that could handle multiple producer/consumers
* or alternate notification mechanisms. But that complexity is not
* yet warranted.
* A producer references the queue that it is writing into.
*/
struct queue const *queue;
struct producer_ops const *ops;
};
/*
* Notify the producer by calling its read method directly, as opposed to from
* a deferred callback or another task.
*/
void producer_notify_directly(struct producer const *producer, size_t count);
/*
* Write a single unit to the queue and notify the associated consumer. Return
* the number of units written.
*/
size_t producer_write_unit(struct producer const *producer, void const *unit);
/*
* Write multiple units to the queue, using the provided memcpy like routine
* and notify the consumer. Return the number of units written.
*/
size_t producer_write_memcpy(struct producer const *producer,
void const *units,
size_t count,
void *(*memcpy)(void *dest,
void const *src,
size_t n));
#endif /* INCLUDE_PRODUCER_H */

43
include/queue_policies.h Normal file
View File

@@ -0,0 +1,43 @@
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* Queue policies.
*/
#ifndef INCLUDE_QUEUE_POLICIES_H
#define INCLUDE_QUEUE_POLICIES_H
#include "queue.h"
#include "consumer.h"
#include "producer.h"
/*
* The direct notification policy manages a 1-to-1 producer consumer model.
* When new units are added to the queue the consumer is notified directly, in
* whatever context (interrupt, deferred, task...) that the queue addition
* happened. Similarly, queue removals directly notify the producer.
*/
struct queue_policy_direct {
struct queue_policy policy;
struct producer const *producer;
struct consumer const *consumer;
};
void queue_add_direct(struct queue_policy const *policy, size_t count);
void queue_remove_direct(struct queue_policy const *policy, size_t count);
#define QUEUE_POLICY_DIRECT(PRODUCER, CONSUMER) \
((struct queue_policy_direct const) { \
.policy = { \
.add = queue_add_direct, \
.remove = queue_remove_direct, \
}, \
.producer = &PRODUCER, \
.consumer = &CONSUMER, \
})
#define QUEUE_DIRECT(SIZE, TYPE, PRODUCER, CONSUMER) \
QUEUE(SIZE, TYPE, QUEUE_POLICY_DIRECT(PRODUCER, CONSUMER).policy)
#endif /* INCLUDE_QUEUE_POLICIES_H */

View File

@@ -1,27 +1,27 @@
/* Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
/* Copyright 2015 The Chromium OS Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#ifndef INCLUDE_STREAM_ADAPTOR_H
#define INCLUDE_STREAM_ADAPTOR_H
/* STM32 USART driver for Chrome EC */
#include "common.h"
#include "in_stream.h"
#include "out_stream.h"
#include "consumer.h"
#include "producer.h"
#include "queue.h"
#include "queue_policies.h"
/*
* +..........+ +..........+------+...........+
* . .<------------->. | | .
* . Producer . +---------+ . Consumer | ISFP | In Stream .
* . Producer . +---------+ . Consumer | ISFQ | In Stream .
* . .->| Queue |->. | | .
* +..........+ +---------+ +..........+------+...........+
*/
struct in_stream_from_producer {
struct in_stream_from_queue {
struct consumer consumer;
struct in_stream in;
};
@@ -29,31 +29,30 @@ struct in_stream_from_producer {
/*
*
*/
extern struct in_stream_ops const in_stream_from_producer_in_stream_ops;
extern struct consumer_ops const in_stream_from_producer_consumer_ops;
extern struct in_stream_ops const in_stream_from_queue_in_stream_ops;
extern struct consumer_ops const in_stream_from_queue_consumer_ops;
#define IN_STREAM_FROM_PRODUCER(NAME, PRODUCER, QUEUE, READY) \
struct in_stream_from_producer const NAME = { \
#define IN_STREAM_FROM_QUEUE(NAME, QUEUE, READY) \
struct in_stream_from_queue const NAME = { \
.consumer = { \
.producer = &PRODUCER, \
.queue = &QUEUE, \
.ops = &in_stream_from_producer_consumer_ops, \
.queue = &QUEUE, \
.ops = &in_stream_from_queue_consumer_ops, \
}, \
.in = { \
.ready = READY, \
.ops = &in_stream_from_producer_in_stream_ops, \
.ops = &in_stream_from_queue_in_stream_ops, \
}, \
};
/*
* +..........+ +..........+------+............+
* . .<------------->. | | .
* . Consumer . +---------+ . Producer | OSFC | Out Stream .
* . Consumer . +---------+ . Producer | OSFQ | Out Stream .
* . .<-| Queue |<-. | | .
* +..........+ +---------+ +..........+------+............+
*/
struct out_stream_from_consumer {
struct out_stream_from_queue {
struct producer producer;
struct out_stream out;
};
@@ -61,20 +60,49 @@ struct out_stream_from_consumer {
/*
*
*/
extern struct out_stream_ops const out_stream_from_consumer_out_stream_ops;
extern struct producer_ops const out_stream_from_consumer_producer_ops;
extern struct out_stream_ops const out_stream_from_queue_out_stream_ops;
extern struct producer_ops const out_stream_from_queue_producer_ops;
#define OUT_STREAM_FROM_CONSUMER(NAME, CONSUMER, QUEUE, READY) \
struct out_stream_from_consumer const NAME = { \
#define OUT_STREAM_FROM_QUEUE(NAME, QUEUE, READY) \
struct out_stream_from_queue const NAME = { \
.producer = { \
.consumer = &CONSUMER, \
.queue = &QUEUE, \
.ops = &out_stream_from_consumer_producer_ops, \
.queue = &QUEUE, \
.ops = &out_stream_from_queue_producer_ops, \
}, \
.out = { \
.ready = READY, \
.ops = &out_stream_from_consumer_out_stream_ops, \
.ops = &out_stream_from_queue_out_stream_ops, \
}, \
};
/*
* Given a forward declared device configuration called NAME that implements
* producer and consumer interfaces construct RX/TX queues and expose them as
* streams called <NAME>_in and <NAME>_out.
*/
#define IO_STREAM_CONFIG(NAME, RX_SIZE, TX_SIZE, IN_READY, OUT_READY) \
\
struct in_stream_from_queue const CONCAT2(NAME, _in); \
\
struct queue const CONCAT2(NAME, _rx_queue) = \
QUEUE_DIRECT(RX_SIZE, \
uint8_t, \
NAME.producer, \
CONCAT2(NAME, _in).consumer); \
IN_STREAM_FROM_QUEUE(CONCAT2(NAME, _in), \
CONCAT2(NAME, _rx_queue), \
IN_READY) \
\
\
struct out_stream_from_queue const CONCAT2(NAME, _out); \
\
struct queue const CONCAT2(NAME, _tx_queue) = \
QUEUE_DIRECT(TX_SIZE, \
uint8_t, \
CONCAT2(NAME, _out).producer, \
NAME.consumer); \
OUT_STREAM_FROM_QUEUE(CONCAT2(NAME, _out), \
CONCAT2(NAME, _tx_queue), \
OUT_READY)
#endif /* INCLUDE_STREAM_ADAPTOR_H */