Initial checkin

- wrapper for some AIM classes (probably the wrong package)
- wrap a subset of libonlp
- wrap onlp.h, sys.h, oids.h
- sample unit tests
- sample onlpdump.py
This commit is contained in:
Carl D. Roth
2017-09-12 11:28:10 -07:00
parent e616647b4f
commit 1547f487a7
7 changed files with 633 additions and 0 deletions

View File

@@ -26,6 +26,11 @@ packages:
builds/onlp-platform/$BUILD_DIR/${TOOLCHAIN}/bin/libonlp-platform.so : $libdir/
builds/onlp-platform-defaults/$BUILD_DIR/${TOOLCHAIN}/bin/libonlp-platform-defaults.so : $libdir/
builds/onlpd/$BUILD_DIR/${TOOLCHAIN}/bin/onlpd : $bindir/
${ONL}/packages/base/any/onlp/src/onlpdump.py: $bindir/
${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/*.py: ${PY_INSTALL}/onlp/
${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/*.py: ${PY_INSTALL}/onlp/onlp/
${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/test/*.py: ${PY_INSTALL}/onlp/test/
${ONL}/packages/base/any/onlp/src/onlplib/module/python/onlp/onlplib/*.py: ${PY_INSTALL}/onlp/onlplib/
init: $ONL/packages/base/any/onlp/src/onlpd.init

View File

@@ -0,0 +1,4 @@
"""__init__.py
Module init for onlp.
"""

View File

@@ -0,0 +1,261 @@
"""__init__.py
Module init for onlp.onlp
"""
import ctypes
libonlp = ctypes.cdll.LoadLibrary("libonlp.so")
import ctypes.util
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library("c"))
import onlp.onlplib
# AIM/aim_memory.h
class aim_void_p(ctypes.c_void_p):
"""Generic data allocated by AIM."""
def __del__(self):
libonlp.aim_free(self)
class aim_char_p(aim_void_p):
"""AIM data that is a printable string."""
def __init__(self, stringOrAddress):
if stringOrAddress is None:
aim_void_p.__init__(self, stringOrAddress)
return
if isinstance(stringOrAddress, aim_void_p):
aim_void_p.__init__(self, stringOrAddress)
return
if isinstance(stringOrAddress, basestring):
cs = ctypes.c_char_p(stringOrAddress)
ptr = libonlp.aim_malloc(len(stringOrAddress)+1)
libc.strcpy(ptr, ctypes.addressof(cs))
aim_void_p.__init__(self, ptr)
return
if type(stringOrAddress) == int:
aim_void_p.__init__(self, stringOrAddress)
return
raise ValueError("invalid initializer for aim_char_p: %s"
% repr(stringOrAddress))
def string_at(self):
if self.value:
return ctypes.string_at(self.value)
else:
return None
def __eq__(self, other):
return (isinstance(other, aim_char_p)
and (self.string_at()==other.string_at()))
def __neq__(self, other):
return not self == other
def __hash__(self):
return hash(self.string_at())
def aim_memory_init_prototypes():
libonlp.aim_malloc.restype = aim_void_p
libonlp.aim_malloc.argtypes = (ctypes.c_size_t,)
libonlp.aim_free.restype = None
libonlp.aim_free.argtypes = (aim_void_p,)
# AIM/aim_object.h
aim_object_dtor = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
class aim_object(ctypes.Structure):
_fields_ = [("_id", ctypes.c_char_p,),
("subtype", ctypes.c_int,),
("cookie", ctypes.c_void_p,),
("destructor", aim_object_dtor,),]
# AIM/aim_pvs.h
# AIM/aim_pvs_*.h
aim_vprintf_f = ctypes.CFUNCTYPE(ctypes.c_int)
class aim_pvs(ctypes.Structure):
_fields_ = [("object", aim_object,),
("description", ctypes.c_char_p,),
("vprintf", aim_vprintf_f,),
("enabled", ctypes.c_int,),
("counter", ctypes.c_uint,),
("isatty", aim_vprintf_f,),]
def aim_pvs_init_prototypes():
libonlp.aim_pvs_buffer_create.restype = ctypes.POINTER(aim_pvs)
libonlp.aim_pvs_destroy.restype = None
libonlp.aim_pvs_destroy.argtypes = (ctypes.POINTER(aim_pvs),)
libonlp.aim_pvs_buffer_size.restype = ctypes.c_int
libonlp.aim_pvs_buffer_size.argtypes = (ctypes.POINTER(aim_pvs),)
libonlp.aim_pvs_buffer_get.restype = aim_char_p
libonlp.aim_pvs_buffer_get.argtypes = (ctypes.POINTER(aim_pvs),)
libonlp.aim_pvs_buffer_reset.restype = None
libonlp.aim_pvs_buffer_reset.argtypes = (ctypes.POINTER(aim_pvs),)
# onlp/oids.h
onlp_oid = ctypes.c_uint
ONLP_OID_DESC_SIZE = 128
ONLP_OID_TABLE_SIZE = 32
ONLP_OID_DUMP_F_RECURSE = 0x1
ONLP_OID_DUMP_F_EVEN_IF_ABSENT = 0x2
ONLP_OID_SHOW_F_RECURSE = 0x1
ONLP_OID_SHOW_F_EXTENDED = 0x2
ONLP_OID_SHOW_F_YAML = 0x4
class OidTableIterator(object):
def __init__(self, hdr):
self.hdr = hdr
self.idx = 0
def __iter__(self):
return self
def next(self):
if self.idx >= ONLP_OID_TABLE_SIZE:
raise StopIteration
oid = self.hdr.coids[self.idx]
self.idx += 1
if oid == 0:
raise StopIteration
oidHdr = onlp_oid_hdr()
libonlp.onlp_oid_hdr_get(oid, ctypes.byref(oidHdr))
return oidHdr
ONLP_OID_TYPE_SYS = 1
ONLP_OID_TYPE_THERMAL = 2
ONLP_OID_TYPE_FAN = 3
ONLP_OID_TYPE_PSU = 4
ONLP_OID_TYPE_LED = 5
ONLP_OID_TYPE_MODULE = 6
ONLP_OID_TYPE_RTC = 7
# XXX roth waiting for enum generator
class onlp_oid_hdr(ctypes.Structure):
_fields_ = [("_id", onlp_oid,),
("description", ctypes.c_char * ONLP_OID_DESC_SIZE,),
("poid", onlp_oid,),
("coids", onlp_oid * ONLP_OID_TABLE_SIZE,),]
def getType(self):
return self._id >> 24
def isSystem(self):
return self.getType() == ONLP_OID_TYPE_SYS
def isThermal(self):
return self.getType() == ONLP_OID_TYPE_THERMAL
def isFan(self):
return self.getType() == ONLP_OID_TYPE_FAN
def isPsu(self):
return self.getType() == ONLP_OID_TYPE_PSU
def isLed(self):
return self.getType() == ONLP_OID_TYPE_LED
def isModule(self):
return self.getType() == ONLP_OID_TYPE_MODULE
def isRtc(self):
return self.getType() == ONLP_OID_TYPE_RTC
def children(self):
return OidTableIterator(self)
onlp_oid_iterate_f = ctypes.CFUNCTYPE(ctypes.c_int, onlp_oid, ctypes.c_void_p)
def onlp_oid_init_prototypes():
#onlp_oid_dump
#onlp_oid_table_dump
#onlp_oid_show
#onlp_oid_table_show
libonlp.onlp_oid_iterate.restype = ctypes.c_int
libonlp.onlp_oid_iterate.argtypes = (onlp_oid, ctypes.c_int,
onlp_oid_iterate_f, ctypes.c_void_p,)
# XXX enum
libonlp.onlp_oid_hdr_get.restype = ctypes.c_int
libonlp.onlp_oid_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr,))
# XXX enum
# onlp/sys.h
class onlp_sys_info(ctypes.Structure):
initialized = False
_fields_ = [("hdr", onlp_oid_hdr,),
("onie_info", onlp.onlplib.onlp_onie_info,),
("platform_info", onlp.onlplib.onlp_platform_info,),]
def __del__(self):
if self.initialized:
libonlp.onlp_sys_info_free(ctypes.byref(self))
def onlp_sys_init_prototypes():
libonlp.onlp_sys_init.restype = ctypes.c_int
libonlp.onlp_sys_info_get.restype = ctypes.c_int
libonlp.onlp_sys_info_get.argtypes = (ctypes.POINTER(onlp_sys_info),)
libonlp.onlp_sys_info_free.restype = None
libonlp.onlp_sys_info_get.argtypes = (ctypes.POINTER(onlp_sys_info),)
libonlp.onlp_sys_hdr_get.restype = ctypes.c_int
libonlp.onlp_sys_hdr_get.argtypes = (ctypes.POINTER(onlp_oid_hdr),)
libonlp.onlp_sys_dump.restype = None
libonlp.onlp_sys_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint,)
libonlp.onlp_sys_show.restype = None
libonlp.onlp_sys_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint,)
libonlp.onlp_sys_ioctl.restype = ctypes.c_int
# leave the parameters empty (varargs)
##libonlp.onlp_sys_vioctl.restype = ctypes.c_int
# NOTE that ctypes cannot automatically handle va_list
libonlp.onlp_sys_platform_manage_start.restype = ctypes.c_int
libonlp.onlp_sys_platform_manage_start.argtypes = (ctypes.c_int,)
libonlp.onlp_sys_platform_manage_stop.restype = ctypes.c_int
libonlp.onlp_sys_platform_manage_stop.argtypes = (ctypes.c_int,)
libonlp.onlp_sys_platform_manage_join.restype = ctypes.c_int
libonlp.onlp_sys_platform_manage_now.restype = None
libonlp.onlp_sys_debug.restype = ctypes.c_int
libonlp.onlp_sys_debug.argtypes = (ctypes.POINTER(aim_pvs), ctypes.c_int,
ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),)
# onlp/onlp.h
def onlp_init():
libonlp.onlp_init()
aim_memory_init_prototypes()
aim_pvs_init_prototypes()
onlp_oid_init_prototypes()
onlp_sys_init_prototypes()

View File

@@ -0,0 +1,236 @@
"""OnlpApiTest.py
Test the API bindings.
"""
import ctypes
import unittest
import logging
import re
import onlp.onlp
onlp.onlp.onlp_init()
libonlp = onlp.onlp.libonlp
class OnlpTestMixin(object):
def setUp(self):
self.log = logging.getLogger("onlp")
self.log.setLevel(logging.DEBUG)
self.aim_pvs_buffer_p = libonlp.aim_pvs_buffer_create()
self.aim_pvs_buffer = self.aim_pvs_buffer_p.contents
def tearDown(self):
libonlp.aim_pvs_destroy(self.aim_pvs_buffer_p)
class InitTest(OnlpTestMixin,
unittest.TestCase):
def setUp(self):
OnlpTestMixin.setUp(self)
def tearDown(self):
OnlpTestMixin.tearDown(self)
def testInit(self):
"""Verify that the library can be loaded."""
pass
def testBuffer(self):
"""Verify that the AIM buffer type is usable."""
nullString = onlp.onlp.aim_char_p(None)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
self.assertEqual(nullString, buf)
libonlp.aim_printf(self.aim_pvs_buffer_p, "hello\n")
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
self.assertEqual("hello\n", buf.string_at())
libonlp.aim_printf(self.aim_pvs_buffer_p, "world\n")
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
self.assertEqual("hello\nworld\n", buf.string_at())
libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
self.assertEqual(nullString, buf)
class OnlpTest(OnlpTestMixin,
unittest.TestCase):
"""Test interfaces in onlp/onlp.h."""
def setUp(self):
OnlpTestMixin.setUp(self)
def tearDown(self):
OnlpTestMixin.tearDown(self)
def testPlatformDump(self):
"""Verify basic platform dump output."""
flags = 0
libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertIn("System Information:", bufStr)
self.assertIn("thermal @ 1", bufStr)
def testPlatformDumpFlags(self):
"""Verify platform dump flags are honored."""
flags = 0
libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertIn("psu @ 1", bufStr)
self.assertNotIn("PSU-1 Fan", bufStr)
libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p)
flags = onlp.onlp.ONLP_OID_DUMP_F_RECURSE
libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertIn("psu @ 1", bufStr)
self.assertIn("PSU-1 Fan", bufStr)
# hard to test onlp.onlp.ONLP_OID_DUMP_F_RECURSE,
# since it depends on whether a specific component is inserted
def testPlatformShow(self):
"""Verify basic platform show output."""
flags = 0
libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertIn("System Information:", bufStr)
def testPlatformShowFlags(self):
"""Verify that onlp_platform_show honors flags."""
flags = 0
libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertNotIn("PSU 1", bufStr)
self.assertNotIn("PSU-1 Fan", bufStr)
libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p)
flags = onlp.onlp.ONLP_OID_SHOW_F_RECURSE
libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags)
buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p)
bufStr = buf.string_at()
self.assertIn("PSU 1", bufStr)
self.assertIn("PSU-1 Fan", bufStr)
class SysTest(OnlpTestMixin,
unittest.TestCase):
"""Test interfaces in onlp/sys.h."""
def setUp(self):
OnlpTestMixin.setUp(self)
libonlp.onlp_sys_init()
self.sys_info = onlp.onlp.onlp_sys_info()
libonlp.onlp_sys_info_get(ctypes.byref(self.sys_info))
self.sys_info.initialized = True
def tearDown(self):
OnlpTestMixin.tearDown(self)
def testNoop(self):
pass
def testOnieInfo(self):
"""Verify the ONIE fields."""
product_re = re.compile("(.*)-(.*)-(.*)")
m = product_re.match(self.sys_info.onie_info.product_name)
self.assertIsNotNone(m)
vendor_re = re.compile("[A-Z][a-z]*[a-zA-Z0-9_. -]")
m = vendor_re.match(self.sys_info.onie_info.vendor)
self.assertIsNotNone(m)
self.assertIn('.', self.sys_info.onie_info.onie_version)
# see if there are any vendor extensions
# if there are any, make sure the are well-formed
for vx in self.sys_info.onie_info.vx_list:
sz = vx.size
self.assert_(sz <= 256)
def testPlatformInfo(self):
"""Verify the platform info fields."""
# XXX VM platforms have null for both
pass
def auditOidHdr(self, hdr):
self.assertEqual(0, hdr.poid)
self.assertEqual(0, hdr._id)
# root OID
coids = [x for x in hdr.children()]
self.assert_(coids)
self.assert_(len(coids) < onlp.onlp.ONLP_OID_TABLE_SIZE)
def _oidType(oid):
if oid.isSystem():
return "sys"
if oid.isThermal():
return "thm"
if oid.isFan():
return "fan"
if oid.isPsu():
return "psu"
if oid.isLed():
return "led"
if oid.isModule():
return "mod"
if oid.isRtc():
return "rtc"
return "unk"
for coid in coids:
self.log.info("oid %d (%s): %s",
coid._id, _oidType(coid), coid.description)
if _oidType(coid) == "unk":
raise AssertionError("invalid oid")
self.assertEqual(hdr._id, coid.poid)
# if it's a PSU, verify that it has fans
if _oidType(coid) == "psu":
foids = [x for x in coid.children()]
for foid in foids:
self.log.info("oid %d (%s): %s",
foid._id, _oidType(foid), foid.description)
self.assertEqual("fan", _oidType(foid))
# parent should the the PSU or the chassis
if coid._id == foid.poid:
pass
elif hdr._id == foid.poid:
pass
else:
raise AssertionError("invalid parent OID")
def testSysHeaderOnie(self):
"""Test the sys_hdr data that is in the sys_info."""
self.auditOidHdr(self.sys_info.hdr)
# test the iteration of the oid table
def testSysHeader(self):
"""Test the sys_hdr data available via sys_hdr_get."""
pass
if __name__ == "__main__":
logging.basicConfig()
unittest.main()

View File

@@ -0,0 +1,4 @@
"""__init__.py
Test code for the onlp Python bindings.
"""

View File

@@ -0,0 +1,36 @@
#!/usr/bin/python
"""onldump.py
Test harness for Python bindings.
"""
import sys
from ctypes import *
import logging
import onlp.onlp
import onlp.onlplib
logging.basicConfig()
logger = logging.getLogger("onlpdump")
logger.setLevel(logging.DEBUG)
onlp.onlp.onlp_init()
libonlp = onlp.onlp.libonlp
si = onlp.onlp.onlp_sys_info()
libonlp.onlp_sys_info_get(byref(si))
logger.info("hello")
import pdb
pdb.set_trace()
##libonlp.onlp_onie_show(byref(si.onie_info), byref(libonlp.aim_pvs_stdout))
libonlp.onlp_platform_dump(libonlp.aim_pvs_stdout,
(onlp.onlp.ONLP_OID_DUMP_F_RECURSE
| onlp.onlp.ONLP_OID_DUMP_F_EVEN_IF_ABSENT))
sys.exit(0)

View File

@@ -0,0 +1,87 @@
"""__init__.py
Module init for onlp.onlplib
"""
import ctypes
# properly belongs in AIM.aim_list
class list_links(ctypes.Structure):
pass
list_links._fields_ = [("prev", ctypes.POINTER(list_links),),
("next", ctypes.POINTER(list_links),),]
class ListIterator(object):
def __init__(self, links, castType=None):
self.links = links
self.castType = castType
self.cur = self.links.links.next
def next(self):
# Hurr, pointer()/POINTER() types are not directly comparable
p1 = ctypes.cast(self.cur, ctypes.c_void_p)
p2 = ctypes.cast(self.links.links.prev, ctypes.c_void_p)
if p1.value == p2.value:
raise StopIteration
cur, self.cur = self.cur, self.cur.contents.next
if self.castType is not None:
cur = ctypes.cast(cur, ctypes.POINTER(self.castType))
return cur.contents
class list_head(ctypes.Structure):
_fields_ = [("links", list_links,),]
links_klass = list_links
def __iter__(self):
if self.links_klass == list_links:
return ListIterator(self)
else:
return ListIterator(self, castType=self.links_klass)
class onlp_onie_vx(list_links):
# NOTE that Python inheritence merges the fields
# with the base class (ctypes-ism)
_fields_ = [("data", ctypes.c_ubyte * 256,),
("size", ctypes.c_int,),]
class onlp_onie_vx_list_head(list_head):
links_klass = onlp_onie_vx
class onlp_onie_info(ctypes.Structure):
_fields_ = [("product_name", ctypes.c_char_p,),
("part_number", ctypes.c_char_p,),
("serial_number", ctypes.c_char_p,),
("mac", ctypes.c_ubyte * 6,),
("manufacture_date", ctypes.c_char_p,),
("device_version", ctypes.c_ubyte,),
("label_revision", ctypes.c_char_p,),
("platform_name", ctypes.c_char_p,),
("onie_version", ctypes.c_char_p,),
("mac_range", ctypes.c_ushort,),
("manufacturer", ctypes.c_char_p,),
("country_code", ctypes.c_char_p,),
("vendor", ctypes.c_char_p,),
("diag_version", ctypes.c_char_p,),
("service_tag", ctypes.c_char_p,),
("crc", ctypes.c_uint,),
#
# Vendor Extensions list, if available.
#
("vx_list", onlp_onie_vx_list_head,),
# Internal/debug
("_hdr_id_string", ctypes.c_char_p,),
("_hdr_version", ctypes.c_ubyte,),
("_hdr_length", ctypes.c_ubyte,),
("_hdr_valid_crc", ctypes.c_ubyte,),]
class onlp_platform_info(ctypes.Structure):
_fields_ = [("cpld_versions", ctypes.c_char_p,),
("other_versions", ctypes.c_char_p,),]