taish: notification support

Signed-off-by: Wataru Ishida <ishida@nel-america.com>
This commit is contained in:
Wataru Ishida
2019-05-23 16:56:10 -07:00
committed by Wataru Ishida
parent 347fc472db
commit 99385ad110
5 changed files with 271 additions and 20 deletions

View File

@@ -23,8 +23,12 @@ lib: proto $(LIB_OBJS)
.cpp.o:
$(CC) $(CFLAGS) $(INCLUDE) -c -o $@ $<
proto: proto/tai.proto
proto: lib/tai.grpc.pb.cc lib/tai.grpc.pb.h lib/tai.pb.cc lib/tai.pb.h
lib/tai.pb.cc lib/tai.pb.h: proto/tai.proto
protoc --cpp_out=./lib -I proto tai.proto
lib/tai.grpc.pb.cc lib/tai.grpc.pb.h: proto/tai.proto
protoc --grpc_out=./lib --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` -I proto tai.proto
docker:

View File

@@ -33,7 +33,8 @@ def show_help(hidden=False):
d = [['list', 'list detected TAI objects'],
['list-attr', 'list attributes of the selected TAI object'],
['set <attr-name> <attr-value>', 'set an attribute'],
['get <attr-name> [<attr-value>]', 'get an attribute']]
['get <attr-name> [<attr-value>]', 'get an attribute'],
['monitor', 'monitor state change']]
print(tabulate(d, headers=['command', 'description']))
def show_modules(modules):
@@ -66,6 +67,24 @@ def list_attr(stub, module, netif, hostif):
d.append([m.short_name, 'ro' if m.is_readonly else 'r/w', m.usage, 'custom' if m.attr_id > TAI_ATTR_CUSTOM_RANGE_START else 'official'])
print(tabulate(d, headers=['name', 'type', 'value', 'range']))
def monitor(stub, module, netif, hostif):
req = tai_pb2.MonitorRequest()
req.oid = netif.oid
m = get_attribute_metadata(stub, module, netif, hostif)
try:
for res in stub.Monitor(req):
a = [ v.short_name for v in m if v.attr_id == res.attribute.attr_id ]
if len(a) == 1:
print('{} | {}'.format(a[0], res.attribute.value))
elif len(a) == 0:
print('0x{:x} | {}'.format(res.attribute.attr_id, res.attribute.value))
else:
print('error: more than one metadata matched for id 0x{:x}: {}'.format(res.attribute.attr_id, a))
except KeyboardInterrupt:
pass
def get_attr(stub, module, netif, hostif, cmds):
if not module:
print ('no module selected.')
@@ -169,7 +188,7 @@ class TAIShellCompleter(Completer):
if len(t) == 1:
cmds = ['list', 'module', 'help', 'quit', 'exit']
if self.module != None:
cmds += ['list-attr', 'netif', 'hostif', 'set', 'get']
cmds += ['list-attr', 'netif', 'hostif', 'set', 'get', 'monitor']
for c in cmds:
if c.startswith(t[0]):
@@ -277,6 +296,8 @@ def loop(stub, modules):
print(get_attr(stub, module, netif, hostif, cmds))
elif cmd == 'set':
set_attr(stub, module, netif, hostif, cmds)
elif cmd == 'monitor':
monitor(stub, module, netif, hostif)
elif cmd in ['exit', 'quit', 'q']:
if netif:
netif = None
@@ -358,6 +379,8 @@ def main():
set_attr(stub, module, netif, hostif, args)
elif args[0] == 'help':
show_help()
elif args[0] == 'monitor':
monitor(stub, module, netif, hostif)
else:
print('unknown cmd: {}'.format(args[0]))
show_help()

View File

@@ -26,6 +26,11 @@
#include "tai.h"
#include "tai.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <map>
#include <iostream>
struct tai_api_module_t
{
@@ -62,6 +67,46 @@ class TAIAPIModuleList {
uint32_t m_netif_size;
};
struct tai_notification_t {
tai_object_id_t oid;
tai_attribute_t const * const attr;
};
struct tai_subscription_t {
std::mutex mtx;
std::queue<tai_notification_t> q;
std::condition_variable cv;
};
class TAINotifier {
public:
TAINotifier() {};
int notify(const tai_notification_t& n);
int subscribe(void* id, tai_subscription_t* s) {
std::unique_lock<std::mutex> lk(mtx);
if ( m.find(id) != m.end() ) {
return -1;
}
m[id] = s;
return 0;
}
int desubscribe(void* id) {
std::unique_lock<std::mutex> lk(mtx);
if ( m.find(id) == m.end() ) {
return -1;
}
m.erase(id);
return 0;
}
int size() {
std::unique_lock<std::mutex> lk(mtx);
return m.size();
}
private:
std::map<void*, tai_subscription_t*> m;
std::mutex mtx;
};
class TAIServiceImpl final : public tai::TAI::Service {
public:
TAIServiceImpl(const tai_api_method_table_t* const api) : m_api(api) {};
@@ -70,8 +115,19 @@ class TAIServiceImpl final : public tai::TAI::Service {
::grpc::Status GetAttributeMetadata(::grpc::ServerContext* context, const ::tai::GetAttributeMetadataRequest* request, ::tai::GetAttributeMetadataResponse* response);
::grpc::Status GetAttribute(::grpc::ServerContext* context, const ::tai::GetAttributeRequest* request, ::tai::GetAttributeResponse* response);
::grpc::Status SetAttribute(::grpc::ServerContext* context, const ::tai::SetAttributeRequest* request, ::tai::SetAttributeResponse* response);
::grpc::Status Monitor(::grpc::ServerContext* context, const ::tai::MonitorRequest* request, ::grpc::ServerWriter< ::tai::MonitorResponse>* writer);
void notify(tai_object_id_t oid, tai_attribute_t const * const attribute);
private:
TAINotifier* get_notifier(tai_object_id_t oid) {
std::unique_lock<std::mutex> lk(m_mtx);
if ( m_notifiers.find(oid) == m_notifiers.end() ) {
m_notifiers[oid] = new TAINotifier();
}
return m_notifiers[oid];
}
const tai_api_method_table_t* const m_api;
std::map<tai_object_id_t, TAINotifier*> m_notifiers;
std::mutex m_mtx;
};
#endif // __TAIGRPC_HPP__

View File

@@ -23,10 +23,36 @@
#include "taigrpc.hpp"
#include "taimetadata.h"
#include <sstream>
#include <chrono>
using grpc::Status;
using grpc::StatusCode;
static int _serialize_attribute(const tai_attr_metadata_t* meta, const tai_attribute_t* attr, std::string& out) {
tai_serialize_option_t option{true, true, true};
size_t buf_size = 64;
char bbuf[64] = {0}, *buf = bbuf;
auto count = tai_serialize_attribute(buf, buf_size, meta, attr, &option);
if ( count < 0 ) {
return count;
}
if ( count > buf_size ) {
buf_size = count + 1;
buf = new char[buf_size];
count = tai_serialize_attribute(buf, buf_size, meta, attr, &option);
if ( count < 0 || count > buf_size ) {
return count;
}
out = buf;
}
out = buf;
// check if buf is dynamically allocated
if ( buf != bbuf ) {
delete[] buf;
}
return 0;
}
TAIAPIModuleList::TAIAPIModuleList(uint32_t module_size, uint32_t hostif_size, uint32_t netif_size) : m_module_size(module_size), m_hostif_size(hostif_size), m_netif_size(netif_size) {
m_list.count = module_size;
m_list.list = new tai_api_module_t[module_size];
@@ -278,8 +304,8 @@ static void convert_metadata(const tai_attr_metadata_t* const src, ::tai::Attrib
attr.id = id;
tai_serialize_option_t option{true, true, true};
tai_alloc_info_t alloc_info = { .list_size = 16 };
char bbuf[64] = {0}, *buf = bbuf;
size_t buf_size = 64, count = 0;
size_t count = 0;
std::string value;
again:
if( tai_metadata_alloc_attr_value(meta, &attr, &alloc_info) != TAI_STATUS_SUCCESS ) {
@@ -319,27 +345,15 @@ again:
goto err;
}
count = tai_serialize_attribute(buf, buf_size, meta, &attr, &option);
if ( count < 0 ) {
if ( _serialize_attribute(meta, &attr, value) != 0 ) {
ret = TAI_STATUS_FAILURE;
goto err;
}
if ( count > buf_size ) {
buf_size = count + 1;
buf = new char[buf_size];
count = tai_serialize_attribute(buf, buf_size, meta, &attr, &option);
if ( count < 0 || count > buf_size ) {
goto err;
}
}
res = response->mutable_attribute();
res->set_attr_id(id);
res->set_value(buf, count);
res->set_value(value);
err:
// check if buf is dynamically allocated
if ( buf != bbuf ) {
delete[] buf;
}
if ( tai_metadata_free_attr_value(meta, &attr, &alloc_info) != TAI_STATUS_SUCCESS ) {
return Status(StatusCode::UNKNOWN, "failed to free value");
}
@@ -398,3 +412,148 @@ err:
}
return Status::OK;
}
int TAINotifier::notify(const tai_notification_t& n) {
auto oid = n.oid;
auto type = tai_object_type_query(oid);
auto meta = tai_metadata_get_attr_metadata(type, n.attr->id);
tai_alloc_info_t alloc_info;
alloc_info.reference = n.attr;
std::unique_lock<std::mutex> lk(mtx);
for ( auto& s : m ) {
tai_attribute_t *attr = new tai_attribute_t();
attr->id = n.attr->id;
if ( tai_metadata_alloc_attr_value(meta, attr, &alloc_info) != TAI_STATUS_SUCCESS ) {
return -1;
}
if ( tai_metadata_deepcopy_attr_value(meta, n.attr, attr) != TAI_STATUS_SUCCESS ) {
return -1;
}
tai_notification_t nn{ oid, attr };
auto v = s.second;
std::unique_lock<std::mutex> lk(v->mtx);
v->q.push(nn);
v->cv.notify_one();
}
return 0;
}
void monitor_callback(void* context, tai_object_id_t oid, tai_attribute_t const * const attribute) {
if ( context == nullptr ) {
return;
}
auto impl = static_cast<TAIServiceImpl*>(context);
impl->notify(oid, attribute);
}
void TAIServiceImpl::notify(tai_object_id_t oid, tai_attribute_t const * const attribute) {
std::unique_lock<std::mutex> lk(m_mtx);
auto n = m_notifiers[oid];
if ( n != nullptr ) {
n->notify(tai_notification_t{oid, attribute});
}
}
::grpc::Status TAIServiceImpl::Monitor(::grpc::ServerContext* context, const ::tai::MonitorRequest* request, ::grpc::ServerWriter< ::tai::MonitorResponse>* writer) {
auto oid = request->oid();
auto type = tai_object_type_query(oid);
tai_attribute_t attr = {0};
tai_status_t ret;
switch (type) {
case TAI_OBJECT_TYPE_NETWORKIF:
attr.id = TAI_NETWORK_INTERFACE_ATTR_NOTIFY;
ret = m_api->netif_api->get_network_interface_attribute(oid, &attr);
break;
default:
return Status(StatusCode::UNKNOWN, "unsupported object type");
}
if ( ret != TAI_STATUS_SUCCESS ) {
std::stringstream ss;
ss << "failed to get notify attribute: ret:" << std::hex << -ret;
return Status(StatusCode::UNKNOWN, ss.str());
}
auto notifier = get_notifier(oid);
if ( attr.value.notification.notify == nullptr ) {
attr.value.notification.notify = monitor_callback;
attr.value.notification.context = this;
ret = m_api->netif_api->set_network_interface_attribute(oid, &attr);
if ( ret != TAI_STATUS_SUCCESS ) {
std::stringstream ss;
ss << "failed to set notify attribute: ret:" << std::hex << -ret;
return Status(StatusCode::UNKNOWN, ss.str());
}
} else if ( attr.value.notification.notify != nullptr && notifier->size() == 0 ) {
return Status(StatusCode::UNKNOWN, "notify attribute is set by others");
}
tai_subscription_t s;
if ( notifier->subscribe(writer, &s) < 0 ) {
return Status(StatusCode::UNKNOWN, "failed to subscribe");
}
std::unique_lock<std::mutex> lk(s.mtx);
while(true) {
std::chrono::seconds sec(1);
auto pred = s.cv.wait_for(lk, sec, [&]{ return !s.q.empty(); });
if ( !pred ) { // queue is empty
if ( context->IsCancelled() ) {
break;
}
// writer is still alive continue
continue;
}
auto n = s.q.front();
s.q.pop();
::tai::MonitorResponse res;
auto a = res.mutable_attribute();
auto meta = tai_metadata_get_attr_metadata(type, n.attr->id);
a->set_attr_id(n.attr->id);
std::string value;
_serialize_attribute(meta, n.attr, value);
a->set_value(value);
tai_attribute_t *attr = const_cast<tai_attribute_t*>(n.attr);
if ( tai_metadata_free_attr_value(meta, attr, nullptr) < 0 ) {
return Status(StatusCode::UNKNOWN, "failed to free attribute");
}
delete attr;
if (!writer->Write(res)) {
break;
}
}
if ( notifier->desubscribe(writer) < 0 ) {
return Status(StatusCode::UNKNOWN, "failed to desubscribe");
}
if ( notifier->size() == 0 ) {
std::unique_lock<std::mutex> lk(m_mtx);
delete notifier;
m_notifiers.erase(oid);
attr.value.notification.notify = nullptr;
attr.value.notification.context = nullptr;
ret = m_api->netif_api->set_network_interface_attribute(oid, &attr);
if ( ret != TAI_STATUS_SUCCESS ) {
std::stringstream ss;
ss << "failed to clear notify attribute: ret:" << std::hex << -ret;
return Status(StatusCode::UNKNOWN, ss.str());
}
}
return Status::OK;
}

View File

@@ -8,6 +8,7 @@ service TAI {
rpc GetAttributeMetadata(GetAttributeMetadataRequest) returns (GetAttributeMetadataResponse);
rpc GetAttribute(GetAttributeRequest) returns (GetAttributeResponse);
rpc SetAttribute(SetAttributeRequest) returns (SetAttributeResponse);
rpc Monitor(MonitorRequest) returns (stream MonitorResponse);
}
enum TAIObjectType {
@@ -61,6 +62,14 @@ message SetAttributeRequest {
message SetAttributeResponse {
}
message MonitorRequest {
uint64 oid = 1;
}
message MonitorResponse {
Attribute attribute = 1;
}
message Attribute {
uint64 attr_id = 1;
string value = 2;