diff --git a/packages/base/any/onlp/APKG.yml b/packages/base/any/onlp/APKG.yml index a245d8ec..4597e63d 100644 --- a/packages/base/any/onlp/APKG.yml +++ b/packages/base/any/onlp/APKG.yml @@ -26,6 +26,12 @@ packages: builds/onlp-platform/$BUILD_DIR/${TOOLCHAIN}/bin/libonlp-platform.so : $libdir/ builds/onlp-platform-defaults/$BUILD_DIR/${TOOLCHAIN}/bin/libonlp-platform-defaults.so : $libdir/ builds/onlpd/$BUILD_DIR/${TOOLCHAIN}/bin/onlpd : $bindir/ + ${ONL}/packages/base/any/onlp/src/onlpdump.py: $bindir/ + ${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/__init__.py: ${PY_INSTALL}/onlp/ + ${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/onlp: ${PY_INSTALL}/onlp/onlp + ${ONL}/packages/base/any/onlp/src/onlp/module/python/onlp/test: ${PY_INSTALL}/onlp/test + ${ONL}/packages/base/any/onlp/src/onlplib/module/python/onlp/onlplib: ${PY_INSTALL}/onlp/onlplib + ${ONL}/packages/base/any/onlp/src/sff/module/python/onlp/sff: ${PY_INSTALL}/onlp/sff init: $ONL/packages/base/any/onlp/src/onlpd.init diff --git a/packages/base/any/onlp/src/onlp/module/auto/make.mk b/packages/base/any/onlp/src/onlp/module/auto/make.mk index 95a06896..dbf853f1 100644 --- a/packages/base/any/onlp/src/onlp/module/auto/make.mk +++ b/packages/base/any/onlp/src/onlp/module/auto/make.mk @@ -24,5 +24,5 @@ ############################################################ onlp_AUTO_DEFS := module/auto/onlp.yml -onlp_AUTO_DIRS := module/inc/onlp module/src module/py +onlp_AUTO_DIRS := module/inc/onlp module/src module/python/onlp/onlp include $(BUILDER)/auto.mk diff --git a/packages/base/any/onlp/src/onlp/module/auto/onlp.yml b/packages/base/any/onlp/src/onlp/module/auto/onlp.yml index f784e320..ac82e314 100644 --- a/packages/base/any/onlp/src/onlp/module/auto/onlp.yml +++ b/packages/base/any/onlp/src/onlp/module/auto/onlp.yml @@ -109,6 +109,17 @@ oid_types: &oid_types - MODULE : 6 - RTC : 7 +# OID dump options +oid_dump: &oid_dump +- RECURSE +- EVEN_IF_ABSENT + +# OID show options +oid_show: &oid_show +- RECURSE +- EXTENDED +- YAML + # SFP Control sfp_control: &sfp_control - RESET @@ -255,6 +266,14 @@ definitions: onlp_oid_type: tag: oid members: *oid_types + onlp_oid_show: + tag: oid + members: *oid_show + flags: True + onlp_oid_dump: + tag: oid + members: *oid_dump + flags: True onlp_thermal_status: tag: thermal members: *thermal_status diff --git a/packages/base/any/onlp/src/onlp/module/inc/onlp/oids.h b/packages/base/any/onlp/src/onlp/module/inc/onlp/oids.h index 20ce5746..13df3243 100644 --- a/packages/base/any/onlp/src/onlp/module/inc/onlp/oids.h +++ b/packages/base/any/onlp/src/onlp/module/inc/onlp/oids.h @@ -47,6 +47,19 @@ typedef uint32_t onlp_oid_t; /* */ +/** onlp_oid_dump */ +typedef enum onlp_oid_dump_e { + ONLP_OID_DUMP_RECURSE = (1 << 0), + ONLP_OID_DUMP_EVEN_IF_ABSENT = (1 << 1), +} onlp_oid_dump_t; + +/** onlp_oid_show */ +typedef enum onlp_oid_show_e { + ONLP_OID_SHOW_RECURSE = (1 << 0), + ONLP_OID_SHOW_EXTENDED = (1 << 1), + ONLP_OID_SHOW_YAML = (1 << 2), +} onlp_oid_show_t; + /** onlp_oid_type */ typedef enum onlp_oid_type_e { ONLP_OID_TYPE_SYS = 1, @@ -121,13 +134,6 @@ typedef struct onlp_oid_hdr_s { } onlp_oid_hdr_t; -#define ONLP_OID_DUMP_F_RECURSE 0x1 -#define ONLP_OID_DUMP_F_EVEN_IF_ABSENT 0x2 - -#define ONLP_OID_SHOW_F_RECURSE 0x1 -#define ONLP_OID_SHOW_F_EXTENDED 0x2 -#define ONLP_OID_SHOW_F_YAML 0x4 - void onlp_oid_dump(onlp_oid_t oid, aim_pvs_t* pvs, uint32_t flags); void onlp_oid_table_dump(onlp_oid_table_t table, aim_pvs_t* pvs, uint32_t flags); @@ -199,6 +205,48 @@ int onlp_oid_hdr_get(onlp_oid_t oid, onlp_oid_hdr_t* hdr); * *****************************************************************************/ /* */ +/** Enum names. */ +const char* onlp_oid_dump_name(onlp_oid_dump_t e); + +/** Enum values. */ +int onlp_oid_dump_value(const char* str, onlp_oid_dump_t* e, int substr); + +/** Enum descriptions. */ +const char* onlp_oid_dump_desc(onlp_oid_dump_t e); + +/** Enum validator. */ +int onlp_oid_dump_valid(onlp_oid_dump_t e); + +/** validator */ +#define ONLP_OID_DUMP_VALID(_e) \ + (onlp_oid_dump_valid((_e))) + +/** onlp_oid_dump_map table. */ +extern aim_map_si_t onlp_oid_dump_map[]; +/** onlp_oid_dump_desc_map table. */ +extern aim_map_si_t onlp_oid_dump_desc_map[]; + +/** Enum names. */ +const char* onlp_oid_show_name(onlp_oid_show_t e); + +/** Enum values. */ +int onlp_oid_show_value(const char* str, onlp_oid_show_t* e, int substr); + +/** Enum descriptions. */ +const char* onlp_oid_show_desc(onlp_oid_show_t e); + +/** Enum validator. */ +int onlp_oid_show_valid(onlp_oid_show_t e); + +/** validator */ +#define ONLP_OID_SHOW_VALID(_e) \ + (onlp_oid_show_valid((_e))) + +/** onlp_oid_show_map table. */ +extern aim_map_si_t onlp_oid_show_map[]; +/** onlp_oid_show_desc_map table. */ +extern aim_map_si_t onlp_oid_show_desc_map[]; + /** Enum names. */ const char* onlp_oid_type_name(onlp_oid_type_t e); diff --git a/packages/base/any/onlp/src/onlp/module/inc/onlp/onlp.x b/packages/base/any/onlp/src/onlp/module/inc/onlp/onlp.x index ebcf42da..988bcf3c 100644 --- a/packages/base/any/onlp/src/onlp/module/inc/onlp/onlp.x +++ b/packages/base/any/onlp/src/onlp/module/inc/onlp/onlp.x @@ -48,6 +48,8 @@ ONLP_ENUMERATION_ENTRY(onlp_fan_status, "") ONLP_ENUMERATION_ENTRY(onlp_led_caps, "") ONLP_ENUMERATION_ENTRY(onlp_led_mode, "") ONLP_ENUMERATION_ENTRY(onlp_led_status, "") +ONLP_ENUMERATION_ENTRY(onlp_oid_dump, "") +ONLP_ENUMERATION_ENTRY(onlp_oid_show, "") ONLP_ENUMERATION_ENTRY(onlp_oid_type, "") ONLP_ENUMERATION_ENTRY(onlp_psu_caps, "") ONLP_ENUMERATION_ENTRY(onlp_psu_status, "") diff --git a/packages/base/any/onlp/src/onlp/module/python/onlp/__init__.py b/packages/base/any/onlp/src/onlp/module/python/onlp/__init__.py new file mode 100644 index 00000000..cd682130 --- /dev/null +++ b/packages/base/any/onlp/src/onlp/module/python/onlp/__init__.py @@ -0,0 +1,4 @@ +"""__init__.py + +Module init for onlp. +""" diff --git a/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/__init__.py b/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/__init__.py new file mode 100644 index 00000000..022764e4 --- /dev/null +++ b/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/__init__.py @@ -0,0 +1,652 @@ +"""__init__.py + +Module init for onlp.onlp +""" + +import ctypes + +libonlp = ctypes.cdll.LoadLibrary("libonlp.so") +libonlp.onlp_init() + +import ctypes.util +libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library("c")) + +import onlp.onlplib +import onlp.sff + +from onlp.onlp.enums import * + +# AIM/aim_memory.h + +class aim_void_p(ctypes.c_void_p): + """Generic data allocated by AIM.""" + def __del__(self): + libonlp.aim_free(self) + +class aim_char_p(aim_void_p): + """AIM data that is a printable string.""" + + def __init__(self, stringOrAddress): + + if stringOrAddress is None: + aim_void_p.__init__(self, stringOrAddress) + return + + if isinstance(stringOrAddress, aim_void_p): + aim_void_p.__init__(self, stringOrAddress) + return + + if isinstance(stringOrAddress, basestring): + cs = ctypes.c_char_p(stringOrAddress) + ptr = libonlp.aim_malloc(len(stringOrAddress)+1) + libc.strcpy(ptr, ctypes.addressof(cs)) + aim_void_p.__init__(self, ptr) + return + + if type(stringOrAddress) == int: + aim_void_p.__init__(self, stringOrAddress) + return + + raise ValueError("invalid initializer for aim_char_p: %s" + % repr(stringOrAddress)) + + def string_at(self): + if self.value: + return ctypes.string_at(self.value) + else: + return None + + def __eq__(self, other): + return (isinstance(other, aim_char_p) + and (self.string_at()==other.string_at())) + + def __neq__(self, other): + return not self == other + + def __hash__(self): + return hash(self.string_at()) + +def aim_memory_init_prototypes(): + + libonlp.aim_malloc.restype = aim_void_p + libonlp.aim_malloc.argtypes = (ctypes.c_size_t,) + + libonlp.aim_free.restype = None + libonlp.aim_free.argtypes = (aim_void_p,) + +# AIM/aim_object.h + +aim_object_dtor = ctypes.CFUNCTYPE(None, ctypes.c_void_p) + +class aim_object(ctypes.Structure): + _fields_ = [("_id", ctypes.c_char_p,), + ("subtype", ctypes.c_int,), + ("cookie", ctypes.c_void_p,), + ("destructor", aim_object_dtor,),] + +# AIM/aim_pvs.h +# AIM/aim_pvs_*.h + +aim_vprintf_f = ctypes.CFUNCTYPE(ctypes.c_int) + +class aim_pvs(ctypes.Structure): + _fields_ = [("object", aim_object,), + ("description", ctypes.c_char_p,), + ("vprintf", aim_vprintf_f,), + ("enabled", ctypes.c_int,), + ("counter", ctypes.c_uint32,), + ("isatty", aim_vprintf_f,),] + +def aim_pvs_init_prototypes(): + + libonlp.aim_pvs_buffer_create.restype = ctypes.POINTER(aim_pvs) + + libonlp.aim_pvs_destroy.restype = None + libonlp.aim_pvs_destroy.argtypes = (ctypes.POINTER(aim_pvs),) + + libonlp.aim_pvs_buffer_size.restype = ctypes.c_int + libonlp.aim_pvs_buffer_size.argtypes = (ctypes.POINTER(aim_pvs),) + + libonlp.aim_pvs_buffer_get.restype = aim_char_p + libonlp.aim_pvs_buffer_get.argtypes = (ctypes.POINTER(aim_pvs),) + + libonlp.aim_pvs_buffer_reset.restype = None + libonlp.aim_pvs_buffer_reset.argtypes = (ctypes.POINTER(aim_pvs),) + +# AIM/aim_bitmap.h + +aim_bitmap_word = ctypes.c_uint32 +AIM_BITMAP_BITS_PER_WORD = 32 + +def AIM_BITMAP_WORD_COUNT(bitcount): + return ((bitcount // AIM_BITMAP_BITS_PER_WORD) + + (1 if (bitcount % AIM_BITMAP_BITS_PER_WORD) else 0)) + +# ugh, most of aim_bitmap.h is inline C + +def AIM_BITMAP_HDR_WORD_GET(hdr, word): + """Return a specific ctypes word.""" + return hdr.words[word] + +def AIM_BITMAP_HDR_BIT_WORD_GET(hdr, bit): + """Return the ctypes word holding this bit.""" + return hdr.words[bit/AIM_BITMAP_BITS_PER_WORD] + +def AIM_BITMAP_HDR_BIT_WORD_SET(hdr, bit, word): + """Return the ctypes word holding this bit.""" + hdr.words[bit/AIM_BITMAP_BITS_PER_WORD] = word + +def AIM_BITMAP_BIT_POS(bit): + return (1<<(bit % AIM_BITMAP_BITS_PER_WORD)) + +def AIM_BITMAP_INIT(bitmap, count): + """Initialize a static bitmap.""" + libc.memset(ctypes.byref(bitmap), 0, ctypes.sizeof(bitmap)) + bitmap.hdr.maxbit = count + bitmap.hdr.words = ctypes.cast(ctypes.byref(bitmap.words), ctypes.POINTER(ctypes.c_uint)) + bitmap.hdr.wordcount = AIM_BITMAP_WORD_COUNT(count) + +class aim_bitmap_hdr(ctypes.Structure): + _fields_ = [("wordcount", ctypes.c_int,), + ("words", ctypes.POINTER(aim_bitmap_word),), + ("maxbit", ctypes.c_int,), + ("allocated", ctypes.c_int,),] + +class aim_bitmap(ctypes.Structure): + _fields_ = [("hdr", aim_bitmap_hdr,),] + + @classmethod + def fromAim(cls, bitcount): + """Return a pointer to a bitmap from aim_alloc(). + + Pre-initialized; needs to be freed with aim_free(). + """ + return libonlp.aim_bitmap_alloc(None, bitcount) + +class aim_bitmap256(aim_bitmap): + """Statically-allocated AIM bitmap.""" + _fields_ = [("words", aim_bitmap_word * AIM_BITMAP_WORD_COUNT(256),),] + + def __init__(self): + super(aim_bitmap256, self).__init__() + AIM_BITMAP_INIT(self, 255) + +def aim_bitmap_set(hdr, bit): + word = AIM_BITMAP_HDR_BIT_WORD_GET(hdr, bit) + word |= AIM_BITMAP_BIT_POS(bit) + AIM_BITMAP_HDR_BIT_WORD_SET(hdr, bit, word) + +def aim_bitmap_clr(hdr, bit): + word = AIM_BITMAP_HDR_BIT_WORD_GET(hdr, bit) + word &= ~(AIM_BITMAP_BIT_POS(bit)) + AIM_BITMAP_HDR_BIT_WORD_SET(hdr, bit, word) + +def aim_bitmap_mod(hdr, bit, value): + if value: + aim_bitmap_set(hdr, bit) + else: + aim_bitmap_clr(hdr, bit) + +def aim_bitmap_get(hdr, bit): + val = AIM_BITMAP_HDR_BIT_WORD_GET(hdr,bit) & AIM_BITMAP_BIT_POS(bit) + return 1 if val else 0 + +# Huh, these is inline too, but calls into glibc memset + +def aim_bitmap_set_all(hdr): + libc.memset(ctypes.byref(hdr.words), 0xFF, hdr.wordcount*ctypes.sizeof(aim_bitmap_word)) + +def aim_bitmap_clr_all(hdr): + libc.memset(ctypes.byref(hdr.words), 0x00, hdr.wordcount*ctypes.sizeof(aim_bitmap_word)) + +# XXX aim_bitmap_count is left out + +def aim_bitmap_init_prototypes(): + + libonlp.aim_bitmap_alloc.restype = ctypes.POINTER(aim_bitmap) + libonlp.aim_bitmap_alloc.argtypes = (ctypes.POINTER(aim_bitmap), ctypes.c_int,) + + libonlp.aim_bitmap_free.restype = None + libonlp.aim_bitmap_free.argtypes = (ctypes.POINTER(aim_bitmap),) + +# onlp.yml + +##ONLP_CONFIG_INFO_STR_MAX = int(libonlp.onlp_config_lookup("ONLP_CONFIG_INFO_STR_MAX")) +ONLP_CONFIG_INFO_STR_MAX = 64 +# prototype for onlp_config_lookup is not defined yet, see below + +# onlp/oids.h + +onlp_oid = ctypes.c_uint32 + +ONLP_OID_SYS = (ONLP_OID_TYPE.SYS<<24) | 1 +# XXX not a config option + +ONLP_OID_DESC_SIZE = 128 +ONLP_OID_TABLE_SIZE = 32 +# XXX not a config option + +class OidTableIterator(object): + + def __init__(self, hdr): + self.hdr = hdr + self.idx = 0 + + def __iter__(self): + return self + + def next(self): + if self.idx >= ONLP_OID_TABLE_SIZE: + raise StopIteration + oid = self.hdr.coids[self.idx] + self.idx += 1 + if oid == 0: + raise StopIteration + oidHdr = onlp_oid_hdr() + libonlp.onlp_oid_hdr_get(oid, ctypes.byref(oidHdr)) + return oidHdr + +class onlp_oid_hdr(ctypes.Structure): + + _fields_ = [("_id", onlp_oid,), + ("description", ctypes.c_char * ONLP_OID_DESC_SIZE,), + ("poid", onlp_oid,), + ("coids", onlp_oid * ONLP_OID_TABLE_SIZE,),] + + def getType(self): + return self._id >> 24 + + def isSystem(self): + return self.getType() == ONLP_OID_TYPE.SYS + def isThermal(self): + return self.getType() == ONLP_OID_TYPE.THERMAL + def isFan(self): + return self.getType() == ONLP_OID_TYPE.FAN + def isPsu(self): + return self.getType() == ONLP_OID_TYPE.PSU + def isLed(self): + return self.getType() == ONLP_OID_TYPE.LED + def isModule(self): + return self.getType() == ONLP_OID_TYPE.MODULE + def isRtc(self): + return self.getType() == ONLP_OID_TYPE.RTC + + def children(self): + return OidTableIterator(self) + +onlp_oid_iterate_f = ctypes.CFUNCTYPE(ctypes.c_int, onlp_oid, ctypes.c_void_p) + +def onlp_oid_init_prototypes(): + + libonlp.onlp_oid_dump.restype = None + libonlp.onlp_oid_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_oid_table_dump.restype = None + libonlp.onlp_oid_table_dump.argtypes = (ctypes.POINTER(onlp_oid), ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_oid_show.restype = None + libonlp.onlp_oid_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_oid_table_show.restype = None + libonlp.onlp_oid_table_show.argtypes = (ctypes.POINTER(onlp_oid), ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_oid_iterate.restype = ctypes.c_int + libonlp.onlp_oid_iterate.argtypes = (onlp_oid, ctypes.c_int, + onlp_oid_iterate_f, ctypes.c_void_p,) + + libonlp.onlp_oid_hdr_get.restype = ctypes.c_int + libonlp.onlp_oid_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr,)) + +# onlp/sys.h + +class onlp_sys_info(ctypes.Structure): + + initialized = False + + _fields_ = [("hdr", onlp_oid_hdr,), + ("onie_info", onlp.onlplib.onlp_onie_info,), + ("platform_info", onlp.onlplib.onlp_platform_info,),] + + def __del__(self): + if self.initialized: + libonlp.onlp_sys_info_free(ctypes.byref(self)) + +def onlp_sys_init_prototypes(): + + libonlp.onlp_sys_init.restype = ctypes.c_int + + libonlp.onlp_sys_info_get.restype = ctypes.c_int + libonlp.onlp_sys_info_get.argtypes = (ctypes.POINTER(onlp_sys_info),) + + libonlp.onlp_sys_info_free.restype = None + libonlp.onlp_sys_info_get.argtypes = (ctypes.POINTER(onlp_sys_info),) + + libonlp.onlp_sys_hdr_get.restype = ctypes.c_int + libonlp.onlp_sys_hdr_get.argtypes = (ctypes.POINTER(onlp_oid_hdr),) + + libonlp.onlp_sys_dump.restype = None + libonlp.onlp_sys_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_sys_show.restype = None + libonlp.onlp_sys_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_sys_ioctl.restype = ctypes.c_int + # leave the parameters empty (varargs) + + ##libonlp.onlp_sys_vioctl.restype = ctypes.c_int + # NOTE that ctypes cannot automatically handle va_list + + libonlp.onlp_sys_platform_manage_start.restype = ctypes.c_int + libonlp.onlp_sys_platform_manage_start.argtypes = (ctypes.c_int,) + + libonlp.onlp_sys_platform_manage_stop.restype = ctypes.c_int + libonlp.onlp_sys_platform_manage_stop.argtypes = (ctypes.c_int,) + + libonlp.onlp_sys_platform_manage_join.restype = ctypes.c_int + + libonlp.onlp_sys_platform_manage_now.restype = None + + libonlp.onlp_sys_debug.restype = ctypes.c_int + libonlp.onlp_sys_debug.argtypes = (ctypes.POINTER(aim_pvs), ctypes.c_int, + ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),) + +# onlp/fan.h + +class onlp_fan_info(ctypes.Structure): + _fields_ = [("hdr", onlp_oid_hdr,), + ("status", ctypes.c_uint32,), + ("caps", ctypes.c_uint32,), + ("rpm", ctypes.c_int,), + ("percentage", ctypes.c_int,), + ("mode", ctypes.c_uint32,), + ("model", ctypes.c_char * ONLP_CONFIG_INFO_STR_MAX,), + ("serial", ctypes.c_char * ONLP_CONFIG_INFO_STR_MAX,),] + + def isPresent(self): + return self.status & ONLP_FAN_STATUS.PRESENT + + def isMissing(self): + return not self.PRESENT() + + def isFailed(self): + return self.status & ONLP_FAN_STATUS.FAILED + + def isNormal(self): + return self.isPresent() and not self.isFailed() + +def onlp_fan_init_prototypes(): + + libonlp.onlp_fan_init.restype = None + + libonlp.onlp_fan_info_get.restype = ctypes.c_int + libonlp.onlp_fan_info_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_fan_info),) + + libonlp.onlp_fan_status_get.restype = ctypes.c_int + libonlp.onlp_fan_status_get.argtypes = (onlp_oid, ctypes.POINTER(ctypes.c_uint32),) + + libonlp.onlp_fan_hdr_get.restype = ctypes.c_int + libonlp.onlp_fan_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr),) + + libonlp.onlp_fan_rpm_set.restype = ctypes.c_int + libonlp.onlp_fan_rpm_set.argtypes = (onlp_oid, ctypes.c_int,) + + libonlp.onlp_fan_percentage_set.restype = ctypes.c_int + libonlp.onlp_fan_percentage_set.argtypes = (onlp_oid, ctypes.c_int,) + + libonlp.onlp_fan_mode_set.restype = ctypes.c_int + libonlp.onlp_fan_mode_set.argtypes = (onlp_oid, ctypes.c_uint32,) + + libonlp.onlp_fan_dir_set.restype = ctypes.c_int + libonlp.onlp_fan_dir_set.argtypes = (onlp_oid, ctypes.c_uint32,) + + libonlp.onlp_fan_dump.restype = None + libonlp.onlp_fan_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_fan_show.restype = None + libonlp.onlp_fan_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + +# onlp/led.h + +class onlp_led_info(ctypes.Structure): + _fields_ = [("hdr", onlp_oid_hdr,), + ("status", ctypes.c_uint32,), + ("caps", ctypes.c_uint32,), + ("mode", ctypes.c_uint32,), + ("character", ctypes.c_char,),] + +def onlp_led_init_prototypes(): + + libonlp.onlp_led_init.restype = None + + libonlp.onlp_led_info_get.restype = ctypes.c_int + libonlp.onlp_led_info_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_led_info),) + + libonlp.onlp_led_status_get.restype = ctypes.c_int + libonlp.onlp_led_status_get.argtypes = (onlp_oid, ctypes.POINTER(ctypes.c_uint32),) + + libonlp.onlp_led_hdr_get.restype = ctypes.c_int + libonlp.onlp_led_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr),) + + libonlp.onlp_led_set.restype = ctypes.c_int + libonlp.onlp_led_set.argtypes = (onlp_oid, ctypes.c_int,) + + libonlp.onlp_led_mode_set.restype = ctypes.c_int + libonlp.onlp_led_mode_set.argtypes = (onlp_oid, ctypes.c_uint32,) + + libonlp.onlp_led_char_set.restype = ctypes.c_int + libonlp.onlp_led_char_set.argtypes = (onlp_oid, ctypes.c_char,) + + libonlp.onlp_led_dump.restype = None + libonlp.onlp_led_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + + libonlp.onlp_led_show.restype = None + libonlp.onlp_led_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs), ctypes.c_uint32,) + +# onlp/onlp_config.h + +# don't need the actual config structure, since we'll be using lookups + +def onlp_config_init_prototypes(): + + libonlp.onlp_config_lookup.restype = ctypes.c_char_p + libonlp.onlp_config_lookup.argtypes = (ctypes.c_char_p,) + + libonlp.onlp_config_show.restype = ctypes.c_int + libonlp.onlp_config_show.argtypes = (ctypes.POINTER(aim_pvs),) + +# onlp/thermal.h + +class onlp_thermal_info_thresholds(ctypes.Structure): + _fields_ = [('warning', ctypes.c_int,), + ('error', ctypes.c_int,), + ('shutdown', ctypes.c_int,),] + +class onlp_thermal_info(ctypes.Structure): + _fields_ = [('hdr', onlp_oid_hdr,), + ('status', ctypes.c_uint32,), + ('caps', ctypes.c_uint32,), + ('mcelcius', ctypes.c_int,), + ('thresholds', onlp_thermal_info_thresholds,),] + +def onlp_thermal_init_prototypes(): + + libonlp.onlp_thermal_init.restype = ctypes.c_int + + libonlp.onlp_thermal_info_get.restype = ctypes.c_int + libonlp.onlp_thermal_info_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_thermal_info),) + + libonlp.onlp_thermal_status_get.restype = ctypes.c_int + libonlp.onlp_thermal_status_get.argtypes = (onlp_oid, ctypes.POINTER(ctypes.c_uint32),) + + libonlp.onlp_thermal_hdr_get.restype = ctypes.c_int + libonlp.onlp_thermal_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr),) + + libonlp.onlp_thermal_ioctl.restype = ctypes.c_int + + libonlp.onlp_thermal_dump.restype = None + libonlp.onlp_thermal_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs),) + + libonlp.onlp_thermal_show.restype = None + libonlp.onlp_thermal_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs),) + +# onlp/psu.h + +class onlp_psu_info(ctypes.Structure): + _fields_ = [("hdr", onlp_oid_hdr,), + ("model", ctypes.c_char * ONLP_CONFIG_INFO_STR_MAX,), + ("serial", ctypes.c_char * ONLP_CONFIG_INFO_STR_MAX,), + ("status", ctypes.c_uint32,), + ("caps", ctypes.c_uint32,), + ("mvin", ctypes.c_int,), + ("mvout", ctypes.c_int,), + ("miin", ctypes.c_int,), + ("miout", ctypes.c_int,), + ("mpin", ctypes.c_int,), + ("mpout", ctypes.c_int,),] + +def onlp_psu_init_prototypes(): + + libonlp.onlp_psu_init.restype = ctypes.c_int + + libonlp.onlp_psu_info_get.restype = ctypes.c_int + libonlp.onlp_psu_info_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_psu_info),) + + libonlp.onlp_psu_status_get.restype = ctypes.c_int + libonlp.onlp_psu_status_get.argtypes = (onlp_oid, ctypes.POINTER(ctypes.c_uint32),) + + libonlp.onlp_psu_hdr_get.restype = ctypes.c_int + libonlp.onlp_psu_hdr_get.argtypes = (onlp_oid, ctypes.POINTER(onlp_oid_hdr),) + + libonlp.onlp_psu_ioctl.restype = ctypes.c_int + + libonlp.onlp_psu_dump.restype = None + libonlp.onlp_psu_dump.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs),) + + libonlp.onlp_psu_show.restype = None + libonlp.onlp_psu_show.argtypes = (onlp_oid, ctypes.POINTER(aim_pvs),) + +# sff/sff.h + +def sff_init_prototypes(): + + libonlp.sff_sfp_type_get.restype = onlp.sff.sff_sfp_type + libonlp.sff_sfp_type_get.argtypes = (ctypes.POINTER(ctypes.c_ubyte),) + + libonlp.sff_module_type_get.restype = onlp.sff.sff_module_type + libonlp.sff_module_type_get.argtypes = (ctypes.POINTER(ctypes.c_ubyte),) + + libonlp.sff_media_type_get.restype = onlp.sff.sff_media_type + libonlp.sff_media_type_get.argtypes = (onlp.sff.sff_module_type,) + + libonlp.sff_module_caps_get.restype = ctypes.c_int + libonlp.sff_module_caps_get.argtypes = (onlp.sff.sff_module_type, ctypes.POINTER(ctypes.c_uint32),) + + libonlp.sff_eeprom_parse.restype = ctypes.c_int + libonlp.sff_eeprom_parse.argtypes = (ctypes.POINTER(onlp.sff.sff_eeprom), ctypes.POINTER(ctypes.c_ubyte),) + + libonlp.sff_eeprom_parse_file.restype = ctypes.c_int + libonlp.sff_eeprom_parse_file.argtypes = (ctypes.POINTER(onlp.sff.sff_eeprom), ctypes.c_char_p,) + + libonlp.sff_eeprom_invalidate.restype = None + libonlp.sff_eeprom_invalidate.argtypes = (ctypes.POINTER(onlp.sff.sff_eeprom),) + + libonlp.sff_eeprom_validate.restype = ctypes.c_int + libonlp.sff_eeprom_validate.argtypes = (ctypes.POINTER(onlp.sff.sff_eeprom), ctypes.c_int,) + + libonlp.sff_info_show.restype = None + libonlp.sff_info_show.argtypes = (ctypes.POINTER(onlp.sff.sff_info), ctypes.POINTER(aim_pvs),) + + libonlp.sff_info_init.restype = ctypes.c_int + libonlp.sff_info_init.argtypes = (ctypes.POINTER(onlp.sff.sff_info), onlp.sff.sff_module_type, + ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, + ctypes.c_int,) + +# onlp/sff.h + +onlp_sfp_bitmap = aim_bitmap256 + +onlp_sfp_control = ctypes.c_int + +def onlp_sfp_init_prototypes(): + + libonlp.onlp_sfp_init.restype = ctypes.c_int + + libonlp.onlp_sfp_bitmap_t_init.restype = None + libonlp.onlp_sfp_bitmap_t_init.argtypes = (ctypes.POINTER(onlp_sfp_bitmap),) + + libonlp.onlp_sfp_bitmap_get.restype = ctypes.c_int + libonlp.onlp_sfp_bitmap_get.argtypes = (ctypes.POINTER(onlp_sfp_bitmap),) + + libonlp.onlp_sfp_port_valid.restype = ctypes.c_int + libonlp.onlp_sfp_port_valid.argtypes = (ctypes.c_int,) + + libonlp.onlp_sfp_is_present.restype = ctypes.c_int + libonlp.onlp_sfp_is_present.argtypes = (ctypes.c_int,) + + libonlp.onlp_sfp_presence_bitmap_get.restype = ctypes.c_int + libonlp.onlp_sfp_presence_bitmap_get.argtypes = (ctypes.POINTER(onlp_sfp_bitmap),) + + libonlp.onlp_sfp_eeprom_read.restype = ctypes.c_int + libonlp.onlp_sfp_eeprom_read.argtypes = (ctypes.c_int, ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte,)),) + + libonlp.onlp_sfp_dom_read.restype = ctypes.c_int + libonlp.onlp_sfp_dom_read.argtypes = (ctypes.c_int, ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte)),) + + libonlp.onlp_sfp_denit.restype = ctypes.c_int + + libonlp.onlp_sfp_rx_los_bitmap_get.restype = ctypes.c_int + libonlp.onlp_sfp_rx_los_bitmap_get.argtypes = (ctypes.POINTER(onlp_sfp_bitmap),) + + libonlp.onlp_sfp_dev_readb.restype = ctypes.c_int + libonlp.onlp_sfp_dev_readb.argtypes = (ctypes.c_int, ctypes.c_ubyte, ctypes.c_ubyte,) + + libonlp.onlp_sfp_dev_writeb.restype = ctypes.c_int + libonlp.onlp_sfp_dev_writeb.argtypes = (ctypes.c_int, ctypes.c_ubyte, ctypes.c_ubyte, ctypes.c_ubyte) + + libonlp.onlp_sfp_dev_readw.restype = ctypes.c_int + libonlp.onlp_sfp_dev_readw.argtypes = (ctypes.c_int, ctypes.c_ubyte, ctypes.c_ubyte,) + + libonlp.onlp_sfp_dev_writew.restype = ctypes.c_int + libonlp.onlp_sfp_dev_writew.argtypes = (ctypes.c_int, ctypes.c_ubyte, ctypes.c_ubyte, ctypes.c_ushort) + + libonlp.onlp_sfp_dump.restype = None + libonlp.onlp_sfp_dump.argtypes = (ctypes.POINTER(aim_pvs),) + + libonlp.onlp_sfp_ioctl.restype = ctypes.c_int + + libonlp.onlp_sfp_post_insert.restype = ctypes.c_int + libonlp.onlp_sfp_post_insert.argtypes = (ctypes.c_int, ctypes.POINTER(onlp.sff.sff_info),) + + libonlp.onlp_sfp_control_set.restype = ctypes.c_int + libonlp.onlp_sfp_control_set.argtypes = (ctypes.c_int, onlp_sfp_control, ctypes.c_int,) + + libonlp.onlp_sfp_control_get.restype = ctypes.c_int + libonlp.onlp_sfp_control_get.argtypes = (ctypes.c_int, onlp_sfp_control, ctypes.POINTER(ctypes.c_int)) + + libonlp.onlp_sfp_control_flags_get.restype = ctypes.c_int + libonlp.onlp_sfp_control_flags_get.argtyeps = (ctypes.c_int, ctypes.POINTER(ctypes.c_uint32),) + +# onlp/onlp.h + +def init_prototypes(): + aim_memory_init_prototypes() + aim_pvs_init_prototypes() + aim_bitmap_init_prototypes() + onlp_oid_init_prototypes() + onlp_sys_init_prototypes() + onlp_fan_init_prototypes() + onlp_led_init_prototypes() + + onlp_config_init_prototypes() + + strMax = int(libonlp.onlp_config_lookup("ONLP_CONFIG_INFO_STR_MAX")) + if ONLP_CONFIG_INFO_STR_MAX != strMax: + raise AssertionError("ONLP_CONFIG_INFO_STR_MAX changed from %d to %d" + % (ONLP_CONFIG_INFO_STR_MAX, strMax,)) + + onlp_thermal_init_prototypes() + onlp_psu_init_prototypes() + sff_init_prototypes() + onlp_sfp_init_prototypes() + +init_prototypes() diff --git a/packages/base/any/onlp/src/onlp/module/py/enums.py b/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/enums.py similarity index 94% rename from packages/base/any/onlp/src/onlp/module/py/enums.py rename to packages/base/any/onlp/src/onlp/module/python/onlp/onlp/enums.py index 65cd75ec..a747c9bc 100644 --- a/packages/base/any/onlp/src/onlp/module/py/enums.py +++ b/packages/base/any/onlp/src/onlp/module/python/onlp/onlp/enums.py @@ -88,6 +88,17 @@ class ONLP_LED_STATUS(Enumeration): ON = (1 << 2) +class ONLP_OID_DUMP(Enumeration): + RECURSE = (1 << 0) + EVEN_IF_ABSENT = (1 << 1) + + +class ONLP_OID_SHOW(Enumeration): + RECURSE = (1 << 0) + EXTENDED = (1 << 1) + YAML = (1 << 2) + + class ONLP_OID_TYPE(Enumeration): SYS = 1 THERMAL = 2 diff --git a/packages/base/any/onlp/src/onlp/module/python/onlp/test/OnlpApiTest.py b/packages/base/any/onlp/src/onlp/module/python/onlp/test/OnlpApiTest.py new file mode 100644 index 00000000..121ae40d --- /dev/null +++ b/packages/base/any/onlp/src/onlp/module/python/onlp/test/OnlpApiTest.py @@ -0,0 +1,1531 @@ +"""OnlpApiTest.py + +Test the API bindings. +""" + +import ctypes +import unittest +import logging +import re +import time +import subprocess +import random +import tempfile +import os + +import onlp.onlp +import onlp.sff + +libonlp = onlp.onlp.libonlp + +def isVirtual(): + with open("/etc/onl/platform") as fd: + buf = fd.read() + return "bigswitch" in buf + +def skipIfVirtual(reason="this test only works with a physical device"): + return unittest.skipIf(isVirtual(), reason) + +class OnlpTestMixin(object): + + def setUp(self): + + self.log = logging.getLogger("onlp") + self.log.setLevel(logging.DEBUG) + + self.aim_pvs_buffer_p = libonlp.aim_pvs_buffer_create() + self.aim_pvs_buffer = self.aim_pvs_buffer_p.contents + + def tearDown(self): + + libonlp.aim_pvs_destroy(self.aim_pvs_buffer_p) + + def assertStatusOK(self, sts): + if sts == onlp.onlp.ONLP_STATUS.OK: + return + raise AssertionError("invalid ONLP status %s" % onlp.onlp.ONLP_STATUS.name(sts)) + +class InitTest(OnlpTestMixin, + unittest.TestCase): + + def setUp(self): + OnlpTestMixin.setUp(self) + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testInit(self): + """Verify that the library can be loaded.""" + pass + + def testBuffer(self): + """Verify that the AIM buffer type is usable.""" + + nullString = onlp.onlp.aim_char_p(None) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertEqual(nullString, buf) + + libonlp.aim_printf(self.aim_pvs_buffer_p, "hello\n") + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertEqual("hello\n", buf.string_at()) + + libonlp.aim_printf(self.aim_pvs_buffer_p, "world\n") + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertEqual("hello\nworld\n", buf.string_at()) + + libonlp.aim_printf(self.aim_pvs_buffer_p, "%d\n", 42) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertEqual("hello\nworld\n42\n", buf.string_at()) + + libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertEqual(nullString, buf) + +class OnlpTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/onlp.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testPlatformDump(self): + """Verify basic platform dump output.""" + + flags = 0 + libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("System Information:", bufStr) + self.assertIn("thermal @ 1", bufStr) + + def testPlatformDumpFlags(self): + """Verify platform dump flags are honored.""" + + flags = 0 + libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("psu @ 1", bufStr) + self.assertNotIn("PSU-1 Fan", bufStr) + + libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p) + + flags = onlp.onlp.ONLP_OID_DUMP.RECURSE + libonlp.onlp_platform_dump(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("psu @ 1", bufStr) + self.assertIn("PSU-1 Fan", bufStr) + + # hard to test onlp.onlp.ONLP_OID_DUMP.RECURSE, + # since it depends on whether a specific component is inserted + + def testPlatformShow(self): + """Verify basic platform show output.""" + + flags = 0 + libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("System Information:", bufStr) + + def testPlatformShowFlags(self): + """Verify that onlp_platform_show honors flags.""" + + flags = 0 + libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertNotIn("PSU 1", bufStr) + self.assertNotIn("PSU-1 Fan", bufStr) + + libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p) + + flags = onlp.onlp.ONLP_OID_SHOW.RECURSE + libonlp.onlp_platform_show(self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("PSU 1", bufStr) + self.assertIn("PSU-1 Fan", bufStr) + +class SysHdrMixin(object): + + def auditOidHdr(self, hdr): + + self.assertEqual(0, hdr.poid) + if hdr._id == onlp.onlp.ONLP_OID_SYS: + pass + elif hdr._id == 0: + self.log.warn("invalid system OID 0") + else: + raise AssertionError("invalid system OID") + # root OID + + coids = [x for x in hdr.children()] + self.assert_(coids) + self.assertLess(len(coids), onlp.onlp.ONLP_OID_TABLE_SIZE) + + def _oidType(oid): + return onlp.onlp.ONLP_OID_TYPE.name(oid._id>>24) + + for coid in coids: + self.log.info("oid %d (%s): %s", + coid._id, _oidType(coid), coid.description) + if _oidType(coid) is None: + raise AssertionError("invalid oid") + self.assertEqual(hdr._id, coid.poid) + + # if it's a PSU, verify that it has fans + if _oidType(coid) == "PSU": + foids = [x for x in coid.children()] + for foid in foids: + self.log.info("oid %d (%s): %s", + foid._id, _oidType(foid), foid.description) + if _oidType(foid) in ["FAN", "THERMAL",]: + pass + else: + raise AssertionError("invalid child oid") + # parent should the the PSU or the chassis + if coid._id == foid.poid: + pass + elif hdr._id == foid.poid: + pass + else: + raise AssertionError("invalid parent OID") + +class SysTest(OnlpTestMixin, + SysHdrMixin, + unittest.TestCase): + """Test interfaces in onlp/sys.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_sys_init() + self.sys_info = onlp.onlp.onlp_sys_info() + + libonlp.onlp_sys_info_get(ctypes.byref(self.sys_info)) + self.sys_info.initialized = True + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testNoop(self): + pass + + def testOnieInfo(self): + """Verify the ONIE fields.""" + + product_re = re.compile("[a-z]*[a-zA-Z0-9_. -]") + m = product_re.match(self.sys_info.onie_info.product_name) + if m is None: + raise AssertionError("invalid product name %s" + % repr(self.sys_info.onie_info.product_name)) + + vendor_re = re.compile("[A-Z][a-z]*[a-zA-Z0-9_. -]") + m = vendor_re.match(self.sys_info.onie_info.vendor) + if m is None: + raise AssertionError("invalid vendor %s" + % repr(self.sys_info.onie_info.vendor)) + + self.assertIn('.', self.sys_info.onie_info.onie_version) + + # see if there are any vendor extensions + # if there are any, make sure the are well-formed + for vx in self.sys_info.onie_info.vx_list: + sz = vx.size + self.assertLessEqual(sz, 256) + + def testPlatformInfo(self): + """Verify the platform info fields.""" + # XXX VM platforms have null for both + pass + + def testSysHeaderOnie(self): + """Test the sys_hdr data that is in the sys_info.""" + self.auditOidHdr(self.sys_info.hdr) + + def testSysHeader(self): + """Test the sys_hdr data available via sys_hdr_get.""" + + hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_sys_hdr_get(ctypes.byref(hdr)) + self.auditOidHdr(hdr) + + def testManage(self): + """Verify we can start/stop platform management.""" + + try: + + subprocess.check_call(('service', 'onlpd', 'stop',)) + + code = libonlp.onlp_sys_platform_manage_init() + self.assertGreaterEqual(code, 0) + + code = libonlp.onlp_sys_platform_manage_start(0) + self.assertGreaterEqual(code, 0) + + for i in range(10): + libonlp.onlp_sys_platform_manage_now() + time.sleep(0.25) + + code = libonlp.onlp_sys_platform_manage_stop(0) + self.assertGreaterEqual(code, 0) + + time.sleep(2.0) + + code = libonlp.onlp_sys_platform_manage_join(0) + self.assertGreaterEqual(code, 0) + + finally: + subprocess.check_call(('service', 'onlpd', 'start',)) + + def testSysDump(self): + """Test the SYS OID debug dump.""" + + oid = onlp.onlp.ONLP_OID_SYS + flags = 0 + libonlp.onlp_sys_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("System Information", bufStr) + + libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p) + + # this is not the system OID + + oid = self.sys_info.hdr.coids[0] + libonlp.onlp_sys_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIsNone(bufStr) + + def testSysShow(self): + """Test the OID status.""" + + oid = onlp.onlp.ONLP_OID_SYS + flags = 0 + libonlp.onlp_sys_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("System Information", bufStr) + + libonlp.aim_pvs_buffer_reset(self.aim_pvs_buffer_p) + + # this is not the system OID + + oid = self.sys_info.hdr.coids[0] + libonlp.onlp_sys_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIsNone(bufStr) + + def testSysIoctl(self): + """Test the IOCTL interface.""" + + # no such ioctl + + code = libonlp.onlp_sys_ioctl(9999) + self.assertEqual(onlp.onlp.ONLP_STATUS.E_UNSUPPORTED, code) + +class OidIterator(object): + + def __init__(self, log): + self.log = log or logging.getLogger("visit") + + def visit(self, oid, cookie): + return onlp.onlp.ONLP_STATUS.OK + + def cvisit(self): + def _v(oid, cookie): + try: + return self.visit(oid, cookie) + except: + self.log.exception("visitor failed") + return onlp.onlp.ONLP_STATUS.E_GENERIC + return onlp.onlp.onlp_oid_iterate_f(_v) + +class OidTest(OnlpTestMixin, + SysHdrMixin, + unittest.TestCase): + """Test interfaces in onlp/oids.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + self.hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_oid_hdr_get(onlp.onlp.ONLP_OID_SYS, ctypes.byref(self.hdr)) + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testSystemOid(self): + """Audit the system oid.""" + self.auditOidHdr(self.hdr) + + def testOidIter(self): + """Test the oid iteration functions.""" + + class V1(OidIterator): + + def __init__(self, cookie, log): + super(V1, self).__init__(log) + self.cookie = cookie + self.oids = [] + + def visit(self, oid, cookie): + if cookie != self.cookie: + raise AssertionError("invalid cookie") + self.log.info("found oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + oidType = 0 + cookie = 0xdeadbeef + + # gather all OIDs + + v1 = V1(cookie, log=self.log.getChild("v1")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, oidType, v1.cvisit(), cookie) + self.assert_(v1.oids) + oids = list(v1.oids) + + # filter based on OID type + + oidType = onlp.onlp.ONLP_OID_TYPE.PSU + v1b = V1(cookie, log=self.log.getChild("v1b")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, oidType, v1b.cvisit(), cookie) + self.assert_(v1b.oids) + self.assertLess(len(v1b.oids), len(oids)) + + # validate error recovery + + oidType = 0 + v2 = V1(cookie+1, log=self.log.getChild("v2")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, oidType, v2.cvisit(), cookie) + self.assertEqual([], v2.oids) + + # validate early exit + + class V3(OidIterator): + + def __init__(self, log): + super(V3, self).__init__(log) + self.oids = [] + + def visit(self, oid, cookie): + if oid == cookie: + return onlp.onlp.ONLP_STATUS.E_GENERIC + self.log.info("found oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + v3 = V3(log=self.log.getChild("v3")) + cookie = oids[4] + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, oidType, v3.cvisit(), cookie) + self.assertEqual(4, len(v3.oids)) + + def testOidDump(self): + oid = self.hdr.coids[0] + flags = 0 + libonlp.onlp_oid_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertIn("Description:", buf.string_at()) + + def testOidTableDump(self): + tbl = self.hdr.coids + flags = 0 + libonlp.onlp_oid_table_dump(tbl, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + lines = buf.string_at().splitlines(False) + lines = [x for x in lines if 'Description' in x] + self.assertGreater(len(lines), 1) + + def testOidShow(self): + oid = self.hdr.coids[0] + flags = 0 + libonlp.onlp_oid_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertIn("Description:", buf.string_at()) + + def testOidTableShow(self): + tbl = self.hdr.coids + flags = 0 + libonlp.onlp_oid_table_show(tbl, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + lines = buf.string_at().splitlines(False) + lines = [x for x in lines if 'Description' in x] + self.assertGreater(len(lines), 1) + +class FanTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/fan.h.""" + + FAN_MODE_VALID = False + # 'fan mode' configuration is not implemented + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_sys_init() + libonlp.onlp_fan_init() + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def auditFanOid(self, oid): + """Test the power-on behavior of a fan.""" + + hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_fan_hdr_get(oid, ctypes.byref(hdr)) + self.assertEqual(oid, hdr._id) + + fan = onlp.onlp.onlp_fan_info() + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + + self.assertEqual(oid, fan.hdr._id) + self.assert_(fan.model) + self.assert_(fan.serial) + self.log.info("auditing fan %d: %s (S/N %s)", + oid & 0xFFFFFF, + fan.model, fan.serial) + + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.B2F: + pass + elif fan.caps & onlp.onlp.ONLP_FAN_CAPS.F2B: + pass + else: + raise AssertionError("invalid fan caps") + + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_RPM: + self.assertGreater(fan.rpm, 0) + else: + self.log.warn("fan does not support RPM get") + + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_PERCENTAGE: + self.assertGreater(fan.percentage, 0) + self.assertLessEqual(fan.percentage, 100) + else: + self.log.warn("fan does not support PCT get") + + if self.FAN_MODE_VALID: + self.assertNotEqual(onlp.onlp.ONLP_FAN_MODE.OFF, fan.mode) + # default, fan should be running + + self.assert_(onlp.onlp.ONLP_FAN_STATUS.PRESENT & fan.status) + # default, fan should be present + + if fan.status & onlp.onlp.ONLP_FAN_STATUS.B2F: + self.assert_(onlp.onlp.ONLP_ONLP_FAN_CAPS.B2F) + elif fan.status & onlp.onlp.ONLP_FAN_STATUS.F2B: + self.assert_(onlp.onlp.ONLP_FAN_CAPS.F2B) + else: + self.log.warn("fan direction not supported") + + # retrieve fan status separately + sts = ctypes.c_uint() + libonlp.onlp_fan_status_get(oid, ctypes.byref(sts)) + self.assert_(onlp.onlp.ONLP_FAN_STATUS.PRESENT & sts.value) + + # try to manipulate the fan speed + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.SET_RPM: + self.auditFanRpm(oid) + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.SET_PERCENTAGE: + self.auditFanPct(oid) + if (self.FAN_MODE_VALID + and (fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_RPM + or fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_PERCENTAGE)): + self.auditFanMode(oid) + if (fan.caps & onlp.onlp.ONLP_FAN_CAPS.F2B + and fan.caps & onlp.onlp.ONLP_FAN_CAPS.B2F): + self.auditFanDir(oid) + + flags = 0 + libonlp.onlp_fan_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("Fan", bufStr) + + libonlp.onlp_fan_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("Fan", bufStr) + + def auditFanRpm(self, oid): + """Try to adjust the fan RPM. + + Note that the maximum fan speed is not know ahead of time. + Also note the mechanicals here: + - fan spin-up takes several seconds + - fan spin-down takes much longer than spin-up + - actual target fan speeds are set by the driver + - for safety reasons there may not be an 'off' setting + """ + + fan = onlp.onlp.onlp_fan_info() + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + if self.FAN_MODE_VALID: + self.assertEqual(fan.mode, onlp.onlp.ONLP_FAN_MODE.MAX) + minRpm = maxRpm = curRpm = fan.rpm + + speeds = [] + pcts = [] + try: + subprocess.check_call(('service', 'onlpd', 'stop',)) + + self.log.info("probing for max fan speed") + nspeed = curRpm + while True: + self.log.info("current fan rpm is %d", nspeed) + self.log.info("trying higher fan rpm is %d", nspeed * 2) + libonlp.onlp_fan_rpm_set(oid, nspeed * 2) + time.sleep(5.0) + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + self.log.info("probed fan speed is %d", fan.rpm) + if fan.rpm > (nspeed * 125 // 100): + nspeed = fan.rpm + continue + + self.log.info("max fan speed is %d", fan.rpm) + maxRpm = fan.rpm + break + + self.log.info("probing for min fan speed") + nspeed = curRpm + while True: + self.log.info("setting fan rpm to %d", nspeed) + self.log.info("trying lower fan rpm is %d", nspeed // 2) + libonlp.onlp_fan_rpm_set(oid, nspeed // 2) + + time.sleep(10.0) + # spin-down is slower than spin-up + + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + self.log.info("probed fan speed is %d", fan.rpm) + if fan.rpm < (nspeed * 75 // 100): + nspeed = fan.rpm + continue + + self.log.info("min fan speed is %d", fan.rpm) + minRpm = fan.rpm + break + + self.assertLess(minRpm, maxRpm) + + self.log.info("cycling through fan speeds") + for nspeed in range(minRpm, maxRpm, (maxRpm-minRpm)//3): + self.log.info("setting fan rpm to %d", nspeed) + libonlp.onlp_fan_rpm_set(oid, nspeed) + time.sleep(5.0) + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + speeds.append(fan.rpm) + pcts.append(fan.percentage) + + finally: + libonlp.onlp_fan_rpm_set(oid, curRpm) + libonlp.onlp_fan_mode_set(oid, onlp.onlp.ONLP_FAN_MODE.MAX) + subprocess.check_call(('service', 'onlpd', 'start',)) + + # fan speeds should be monotonically increasing + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_RPM: + self.assertEqual(speeds, sorted(speeds)) + self.assertLess(minRpm * 95 // 100, speeds[0]) + self.assertGreater(maxRpm * 105 // 100, speeds[-1]) + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_PERCENTAGE: + self.assertEqual(pcts, sorted(pcts)) + ##self.assertEqual(0, pcts[0]) + ##self.assertGreater(105, pcts[-1]) + + def auditFanPct(self, oid): + """Try to adjust the fan percentage.""" + + fan = onlp.onlp.onlp_fan_info() + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + if self.FAN_MODE_VALID: + self.assertEqual(fan.mode, onlp.onlp.ONLP_FAN_MODE.MAX) + + speeds = [] + pcts = [] + try: + subprocess.check_call(('service', 'onlpd', 'stop',)) + + libonlp.onlp_fan_percentage_set(oid, 0) + time.sleep(10.0) + # initially spin down the fan + + for npct in [0, 33, 66, 100,]: + self.log.info("setting fan percentage to %d", npct) + libonlp.onlp_fan_percentage_set(oid, npct) + time.sleep(5.0) + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + speeds.append(fan.rpm) + pcts.append(fan.percentage) + finally: + libonlp.onlp_fan_percentage_set(oid, 100) + libonlp.onlp_fan_mode_set(oid, onlp.onlp.ONLP_FAN_MODE.MAX) + subprocess.check_call(('service', 'onlpd', 'start',)) + + # fan speeds should be monotonically increasing + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_RPM: + self.assertEqual(speeds, sorted(speeds)) + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_PERCENTAGE: + self.assertEqual(pcts, sorted(pcts)) + + def auditFanDir(self, oid): + """Try to adjust the fan direction.""" + unittest.skip("not implemented") + + def auditFanMode(self, oid): + """Try to adjust the fan speed using the mode specifier.""" + + fan = onlp.onlp.onlp_fan_info() + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + self.assertEqual(fan.mode, onlp.onlp.ONLP_FAN_MODE.MAX) + + speeds = [] + pcts = [] + try: + subprocess.check_call(('service', 'onlpd', 'stop',)) + for nmode in [onlp.onlp.ONLP_FAN_MODE.OFF, + onlp.onlp.ONLP_FAN_MODE.SLOW, + onlp.onlp.ONLP_FAN_MODE.NORMAL, + onlp.onlp.ONLP_FAN_MODE.FAST, + onlp.onlp.ONLP_FAN_MODE.MAX,]: + self.log.info("setting fan mode to %s", onlp.onlp.ONLP_FAN_MODE.name(nmode)) + libonlp.onlp_fan_mode_set(oid, nmode) + time.sleep(2.0) + libonlp.onlp_fan_info_get(oid, ctypes.byref(fan)) + speeds.append(fan.rpm) + pcts.append(fan.percentage) + finally: + libonlp.onlp_fan_mode_set(oid, onlp.onlp.ONLP_FAN_MODE.MAX) + subprocess.check_call(('service', 'onlpd', 'start',)) + + # fan speeds should be monotonically increasing + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_RPM: + self.assertEqual(speeds, sorted(speeds)) + self.assertEqual(0, speeds[0]) + self.assertGreater(105*maxRpm//100, speeds[-1]) + if fan.caps & onlp.onlp.ONLP_FAN_CAPS.GET_PERCENTAGE: + self.assertEqual(pcts, sorted(pcts)) + self.assertEqual(0, pcts[0]) + self.assertGreater(105, pcts[-1]) + + def testFindFans(self): + """Verify that the system has fans.""" + + class V(OidIterator): + + def __init__(self, log): + super(V, self).__init__(log) + self.oids = [] + + def visit(self, oid, cookie): + self.log.info("found FAN oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + v = V(log=self.log.getChild("fan")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, + onlp.onlp.ONLP_OID_TYPE.FAN, + v.cvisit(), 0) + self.assert_(v.oids) + + self.auditFanOid(v.oids[0]) + +class LedTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/led.h. + + XXX roth -- need to flesh this out using a physical device. + """ + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_sys_init() + libonlp.onlp_led_init() + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testFindLeds(self): + """Verify that the system has LEDs.""" + + class V(OidIterator): + + def __init__(self, log): + super(V, self).__init__(log) + self.oids = [] + + def visit(self, oid, cookie): + self.log.info("found LED oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + v = V(log=self.log.getChild("led")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, + onlp.onlp.ONLP_OID_TYPE.LED, + v.cvisit(), 0) + self.assert_(v.oids) + + self.auditLedOid(v.oids[0]) + + def auditLedOid(self, oid): + + hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_led_hdr_get(oid, ctypes.byref(hdr)) + self.assertEqual(oid, hdr._id) + + led = onlp.onlp.onlp_led_info() + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + + self.assertEqual(oid, led.hdr._id) + + self.assert_(led.caps) + # should support some non-empty set of capabilities + + self.log.info("auditing led %d", + oid & 0xFFFFFF) + + self.assert_(led.status & onlp.onlp.ONLP_LED_STATUS.PRESENT) + + # retrieve led status separately + sts = ctypes.c_uint() + libonlp.onlp_led_status_get(oid, ctypes.byref(sts)) + self.assert_(onlp.onlp.ONLP_LED_STATUS.PRESENT & sts.value) + + try: + subprocess.check_call(('service', 'onlpd', 'stop',)) + + if led.caps & onlp.onlp.ONLP_LED_CAPS.CHAR: + self.auditLedChar(oid) + if (led.caps & onlp.onlp.ONLP_LED_CAPS.ON_OFF + and not self.hasColors(led.caps)): + self.auditLedOnOff(oid) + self.auditLedColors(oid) + self.auditLedBlink(oid) + + finally: + subprocess.check_call(('service', 'onlpd', 'start',)) + + + flags = 0 + libonlp.onlp_led_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("led @", bufStr) + + libonlp.onlp_led_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("led @", bufStr) + + def hasColors(self, caps): + """True if this a color LED.""" + + if caps & onlp.onlp.ONLP_LED_CAPS.RED: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.RED_BLINKING: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.ORANGE: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.ORANGE_BLINKING: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.YELLOW: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.YELLOW_BLINKING: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.GREEN: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.GREEN_BLINKING: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.BLUE: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.BLUE_BLINKING: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.PURPLE: + return True + if caps & onlp.onlp.ONLP_LED_CAPS.PURPLE_BLINKING: + return True + + return False + + def auditLedChar(self, oid): + + led = onlp.onlp.onlp_led_info() + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + saveChar = led.char + + try: + for nchar in ['0', '1', '2', '3',]: + self.log.info("led %d: char '%s'", oid, nchar) + + sts = libonlp.onlp_led_char_set(oid, nchar) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(nchar, led.char) + finally: + libonlp.onlp_led_char_set(oid, saveChar) + + def auditLedOnOff(self, oid): + + led = onlp.onlp.onlp_led_info() + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + saveMode = led.mode + + if saveMode == onlp.onlp.ONLP_LED_MODE.OFF: + pass + elif saveMode == onlp.onlp.ONLP_LED_MODE.ON: + pass + else: + self.log.warn("invalid LED on/off mode %s", + onlp.onlp.ONLP_LED_MODE.name(saveMode)) + + try: + for i in range(4): + self.log.info("led %d: on", oid) + + sts = libonlp.onlp_led_set(oid, 1) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(onlp.onlp.ONLP_LED_MODE.ON, led.mode) + + sts = libonlp.onlp_led_get(oid, 0) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(onlp.onlp.ONLP_LED_MODE.OFF, led.mode) + + finally: + if saveMode == onlp.onlp.ONLP_LED_MODE.OFF: + libonlp.onlp_led_set(oid, 0) + else: + libonlp.onlp_led_set(oid, 1) + + def auditLedColors(self, oid): + + led = onlp.onlp.onlp_led_info() + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + saveMode = led.mode + + allModes = [] + if led.caps & onlp.onlp.ONLP_LED_CAPS.RED: + allModes.append(onlp.onlp.ONLP_LED_MODE.RED) + if led.caps & onlp.onlp.ONLP_LED_CAPS.ORANGE: + allModes.append(onlp.onlp.ONLP_LED_MODE.ORANGE) + if led.caps & onlp.onlp.ONLP_LED_CAPS.YELLOW: + allModes.append(onlp.onlp.ONLP_LED_MODE.YELLOW) + if led.caps & onlp.onlp.ONLP_LED_CAPS.GREEN: + allModes.append(onlp.onlp.ONLP_LED_MODE.GREEN) + if led.caps & onlp.onlp.ONLP_LED_CAPS.BLUE: + allModes.append(onlp.onlp.ONLP_LED_MODE.BLUE) + if led.caps & onlp.onlp.ONLP_LED_CAPS.PURPLE: + allModes.append(onlp.onlp.ONLP_LED_MODE.PURPLE) + if led.caps & onlp.onlp.ONLP_LED_CAPS.AUTO: + allModes.append(onlp.onlp.ONLP_LED_MODE.AUTO) + + if not allModes: + unittest.skip("colors not supported") + return + self.log.info("found %d supported colors", len(allModes)) + + try: + for ncolor in allModes: + self.log.info("led %d: color '%s'", oid, onlp.onlp.ONLP_LED_MODE.name(ncolor)) + + sts = libonlp.onlp_led_mode_set(oid, ncolor) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(ncolor, led.mode) + + self.log.info("led %d: OFF", oid) + + sts = libonlp.onlp_led_mode_set(oid, onlp.onlp.ONLP_LED_MODE.OFF) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(onlp.onlp.ONLP_LED_MODE.OFF, led.mode) + + finally: + libonlp.onlp_led_mode_set(oid, saveMode) + + def auditLedBlink(self, oid): + + led = onlp.onlp.onlp_led_info() + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + saveMode = led.mode + + allModes = [] + if led.caps & onlp.onlp.ONLP_LED_CAPS.RED_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.RED_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.ORANGE_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.ORANGE_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.YELLOW_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.YELLOW_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.GREEN_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.GREEN_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.BLUE_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.BLUE_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.PURPLE_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.PURPLE_BLINKING) + if led.caps & onlp.onlp.ONLP_LED_CAPS.AUTO_BLINKING: + allModes.append(onlp.onlp.ONLP_LED_MODE.AUTO_BLINKING) + + if not allModes: + unittest.skip("blinking colors not supported") + return + self.log.info("found %d supported blink colors", len(allModes)) + + try: + for ncolor in allModes: + self.log.info("led %d: blink color '%s'", oid, onlp.onlp.ONLP_LED_MODE.name(ncolor)) + + sts = libonlp.onlp_led_mode_set(oid, ncolor) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(ncolor, led.mode) + + self.log.info("led %d: OFF", oid) + + sts = libonlp.onlp_led_mode_set(oid, onlp.onlp.ONLP_LED_MODE.OFF) + self.assertStatusOK(sts) + + time.sleep(1.0) + + libonlp.onlp_led_info_get(oid, ctypes.byref(led)) + self.assertEqual(onlp.onlp.ONLP_LED_MODE.OFF, led.mode) + + finally: + libonlp.onlp_led_mode_set(oid, saveMode) + +class ConfigTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/onlp_config.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testConfig(self): + + s = libonlp.onlp_config_lookup("ONLP_CONFIG_INFO_STR_MAX") + self.assertEqual('64', s) + + s = libonlp.onlp_config_lookup("foo") + self.assertIsNone(s) + + def testConfigShow(self): + + libonlp.onlp_config_show(self.aim_pvs_buffer_p) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + self.assertIn("ONLP_CONFIG_INFO_STR_MAX = 64\n", buf.string_at()) + +class ThermalTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/thermal.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_thermal_init() + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testFindThermal(self): + + class V(OidIterator): + + def __init__(self, log): + super(V, self).__init__(log) + self.oids = [] + + def visit(self, oid, cookie): + self.log.info("found thermal oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + v = V(log=self.log.getChild("thermal")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, + onlp.onlp.ONLP_OID_TYPE.THERMAL, + v.cvisit(), 0) + self.assert_(v.oids) + + self.auditThermalOid(v.oids[0]) + + def auditThermalOid(self, oid): + + hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_thermal_hdr_get(oid, ctypes.byref(hdr)) + self.assertEqual(oid, hdr._id) + + thm = onlp.onlp.onlp_thermal_info() + libonlp.onlp_thermal_info_get(oid, ctypes.byref(thm)) + + self.assertEqual(oid, thm.hdr._id) + + self.assert_(thm.caps) + # should support some non-empty set of capabilities + + self.assert_(thm.caps & onlp.onlp.ONLP_THERMAL_CAPS.GET_TEMPERATURE) + # sensor should at least report temperature + + self.log.info("auditing thermal %d", + oid & 0xFFFFFF) + + self.assert_(thm.status & onlp.onlp.ONLP_THERMAL_STATUS.PRESENT) + # sensor should be present + + self.assertGreater(thm.mcelcius, 20000) + self.assertLess(thm.mcelcius, 35000) + # temperature should be non-crazy + + # retrieve thermal status separately + sts = ctypes.c_uint() + libonlp.onlp_thermal_status_get(oid, ctypes.byref(sts)) + self.assert_(onlp.onlp.ONLP_THERMAL_STATUS.PRESENT & sts.value) + + # test ioctl + code = libonlp.onlp_thermal_ioctl(9999) + self.assertEqual(onlp.onlp.ONLP_STATUS.E_UNSUPPORTED, code) + + flags = 0 + libonlp.onlp_thermal_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("thermal @", bufStr) + + libonlp.onlp_thermal_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("thermal @", bufStr) + +class PsuTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/psu.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_psu_init() + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + def testFindPsu(self): + + class V(OidIterator): + + def __init__(self, log): + super(V, self).__init__(log) + self.oids = [] + + def visit(self, oid, cookie): + self.log.info("found psu oid %d", oid) + self.oids.append(oid) + return onlp.onlp.ONLP_STATUS.OK + + v = V(log=self.log.getChild("psu")) + libonlp.onlp_oid_iterate(onlp.onlp.ONLP_OID_SYS, + onlp.onlp.ONLP_OID_TYPE.PSU, + v.cvisit(), 0) + self.assert_(v.oids) + + self.auditPsuOid(v.oids[0]) + + def auditPsuOid(self, oid): + + hdr = onlp.onlp.onlp_oid_hdr() + libonlp.onlp_psu_hdr_get(oid, ctypes.byref(hdr)) + self.assertEqual(oid, hdr._id) + + psu = onlp.onlp.onlp_psu_info() + libonlp.onlp_psu_info_get(oid, ctypes.byref(psu)) + + self.assertEqual(oid, psu.hdr._id) + + self.assert_(psu.caps + & (onlp.onlp.ONLP_PSU_CAPS.AC + | onlp.onlp.ONLP_PSU_CAPS.DC12 + | onlp.onlp.ONLP_PSU_CAPS.DC48)) + # should support some non-empty set of capabilities + + self.log.info("auditing psu %d", + oid & 0xFFFFFF) + + self.assert_(psu.status & onlp.onlp.ONLP_PSU_STATUS.PRESENT) + # sensor should be present + + if (psu.caps + & onlp.onlp.ONLP_PSU_CAPS.AC + & onlp.onlp.ONLP_PSU_CAPS.VOUT): + self.assertGreater(psu.mvout, 100000) + self.assertLess(psu.mvout, 125000) + if (psu.caps + & onlp.onlp.ONLP_PSU_CAPS.DC12 + & onlp.onlp.ONLP_PSU_CAPS.VOUT): + self.assertGreater(psu.mvout, 11000) + self.assertLess(psu.mvout, 13000) + if (psu.caps + & onlp.onlp.ONLP_PSU_CAPS.DC48 + & onlp.onlp.ONLP_PSU_CAPS.VOUT): + self.assertGreater(psu.mvout, 47000) + self.assertLess(psu.mvout, 49000) + # output voltage should be non-crazy + + # retrieve psu status separately + sts = ctypes.c_uint() + libonlp.onlp_psu_status_get(oid, ctypes.byref(sts)) + self.assert_(onlp.onlp.ONLP_PSU_STATUS.PRESENT & sts.value) + + # test ioctl + code = libonlp.onlp_psu_ioctl(9999) + self.assertEqual(onlp.onlp.ONLP_STATUS.E_UNSUPPORTED, code) + + flags = 0 + libonlp.onlp_psu_dump(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("psu @", bufStr) + + libonlp.onlp_psu_show(oid, self.aim_pvs_buffer_p, flags) + buf = libonlp.aim_pvs_buffer_get(self.aim_pvs_buffer_p) + bufStr = buf.string_at() + self.assertIn("psu @", bufStr) + +class Eeprom(ctypes.Structure): + _fields_ = [('eeprom', ctypes.c_ubyte * 256,),] + +class SfpTest(OnlpTestMixin, + unittest.TestCase): + """Test interfaces in onlp/sfp.h.""" + + def setUp(self): + OnlpTestMixin.setUp(self) + + libonlp.onlp_sfp_init() + + self.bitmap = onlp.onlp.aim_bitmap256() + + def tearDown(self): + OnlpTestMixin.tearDown(self) + + libonlp.onlp_sfp_denit() + + def bitmap2list(self, bitmap=None): + outBits = [] + bitmap = bitmap or self.bitmap + for pos in range(256): + outBits.append(onlp.onlp.aim_bitmap_get(bitmap.hdr, pos)) + return outBits + + def testBitmap(self): + """Verify that our aim_bitmap implementation is sound.""" + + refBits = [] + for pos in range(256): + val = random.randint(0, 1) + refBits.append(val) + onlp.onlp.aim_bitmap_mod(self.bitmap.hdr, pos, val) + + for i in range(1000): + pos = random.randint(0, 255) + val = refBits[pos] ^ 1 + refBits[pos] = val + onlp.onlp.aim_bitmap_mod(self.bitmap.hdr, pos, val) + + self.assertEqual(refBits, self.bitmap2list()) + + refBits = [0] * 256 + libonlp.onlp_sfp_bitmap_t_init(ctypes.byref(self.bitmap)) + self.assertEqual(refBits, self.bitmap2list()) + + def testValid(self): + """Test for valid SFP ports.""" + + libonlp.onlp_sfp_bitmap_t_init(ctypes.byref(self.bitmap)) + sts = libonlp.onlp_sfp_bitmap_get(ctypes.byref(self.bitmap)) + self.assertStatusOK(sts) + refBits = [0] * 256 + + ports = [x[0] for x in enumerate(self.bitmap2list()) if x[1]] + self.log.info("found %d SFP ports", len(ports)) + self.assert_(ports) + + self.assertEqual(0, ports[0]) + self.assertEqual(len(ports)-1, ports[-1]) + # make sure the ports are contiguous, starting from 1 + + # make sure the per-port valid bits are correct + for i in range(256): + valid = libonlp.onlp_sfp_port_valid(i) + if i < len(ports): + self.assertEqual(1, valid) + else: + self.assertEqual(0, valid) + + # see if any of them are present + # XXX this test requires at least one of the SFPs to be present. + bm = onlp.onlp.aim_bitmap256() + sts = libonlp.onlp_sfp_presence_bitmap_get(ctypes.byref(bm)) + self.assertStatusOK(sts) + present = [x[0] for x in enumerate(self.bitmap2list(bm)) if x[1]] + self.log.info("found %d SFPs", len(present)) + self.assert_(present) + + presentSet = set(present) + portSet = set(ports) + + for port in presentSet: + if port not in portSet: + raise AssertionError("invalid SFP %d not valid" + % (port,)) + + for i in range(256): + valid = libonlp.onlp_sfp_is_present(i) + if i in presentSet: + self.assertEqual(1, valid) + elif i in portSet: + self.assertEqual(0, valid) + else: + self.assertGreater(0, valid) + + # test the rx_los bitmap + # (tough to be more detailed since it depends on connectivity) + sts = libonlp.onlp_sfp_rx_los_bitmap_get(ctypes.byref(bm)) + if sts != onlp.onlp.ONLP_STATUS.E_UNSUPPORTED: + self.assertStatusOK(sts) + rxLos = [x[0] for x in enumerate(self.bitmap2list(bm)) if x[1]] + + # any port exhibiting rx_los should actually be a port + for i in rxLos: + self.assertIn(i, portSet) + + # any missing SFP should *NOT* be exhibiting rx_los + rxLosSet = set(rxLos) + for i in portSet: + if not i in presentSet: + self.assertNotIn(i, rxLosSet) + + port = ports[0] + + self.auditIoctl(port) + self.auditControl(port) + + eeprom = ctypes.POINTER(ctypes.c_ubyte)() + sts = libonlp.onlp_sfp_eeprom_read(port, ctypes.byref(eeprom)) + self.assertStatusOK(sts) + + try: + + # try to read in the data manually + for i in range(128): + b = libonlp.onlp_sfp_dev_readb(port, 0x50, i) + if b != eeprom[i]: + raise AssertionError("eeprom mismatch at 0x50.%d" % i) + + monType = eeprom[92] & 0x40 + # See e.g. https://www.optcore.net/wp-content/uploads/2017/04/SFF_8472.pdf + + self.auditEeprom(eeprom) + + finally: + ptr = onlp.onlp.aim_void_p(ctypes.cast(eeprom, ctypes.c_void_p).value) + del ptr + + if monType: + + domData = ctypes.POINTER(ctypes.c_ubyte)() + sts = libonlp.onlp_sfp_dom_read(port, ctypes.byref(domData)) + self.assertStatusOK(sts) + + try: + self.auditDom(domData) + finally: + ptr = onlp.onlp.aim_void_p(ctypes.cast(domData, ctypes.c_void_p).value) + del ptr + + def auditEeprom(self, eeprom): + """Audit that the entries for this SFP are valid.""" + + sffEeprom = onlp.sff.sff_eeprom() + sts = libonlp.sff_eeprom_parse(ctypes.byref(sffEeprom), eeprom) + self.assertStatusOK(sts) + + self.assertEqual(1, sffEeprom.identified) + + # XXX info strings include space padding + vendor = sffEeprom.info.vendor.strip() + self.assert_(vendor) + model = sffEeprom.info.model.strip() + self.assert_(model) + serial = sffEeprom.info.serial.strip() + self.assert_(serial) + + self.log.info("found SFP: %s %s (S/N %s)", + vendor, model, serial) + + self.log.info("%s (%s %s)", + sffEeprom.info.module_type_name, + sffEeprom.info.media_type_name, + sffEeprom.info.sfp_type_name) + + sffType = libonlp.sff_sfp_type_get(eeprom) + self.assertEqual(sffType, sffEeprom.info.sfp_type) + + moduleType = libonlp.sff_module_type_get(eeprom) + self.assertEqual(moduleType, sffEeprom.info.module_type) + + mediaType = libonlp.sff_media_type_get(sffEeprom.info.module_type) + self.assertEqual(mediaType, sffEeprom.info.media_type) + + caps = ctypes.c_uint32() + sts = libonlp.sff_module_caps_get(sffEeprom.info.module_type, ctypes.byref(caps)) + self.assertStatusOK(sts) + self.assert_(caps) + cl = [] + for i in range(32): + fl = 1< +class SFF_MEDIA_TYPE(Enumeration): + COPPER = 0 + FIBER = 1 + + +class SFF_MODULE_CAPS(Enumeration): + F_100 = 1 + F_1G = 2 + F_10G = 4 + F_25G = 8 + F_40G = 16 + F_100G = 32 + + +class SFF_MODULE_TYPE(Enumeration): + _100G_AOC = 0 + _100G_BASE_CR4 = 1 + _100G_BASE_SR4 = 2 + _100G_BASE_LR4 = 3 + _100G_CWDM4 = 4 + _100G_PSM4 = 5 + _40G_BASE_CR4 = 6 + _40G_BASE_SR4 = 7 + _40G_BASE_LR4 = 8 + _40G_BASE_LM4 = 9 + _40G_BASE_ACTIVE = 10 + _40G_BASE_CR = 11 + _40G_BASE_SR2 = 12 + _40G_BASE_SM4 = 13 + _40G_BASE_ER4 = 14 + _25G_BASE_CR = 15 + _10G_BASE_SR = 16 + _10G_BASE_LR = 17 + _10G_BASE_LRM = 18 + _10G_BASE_ER = 19 + _10G_BASE_CR = 20 + _10G_BASE_SX = 21 + _10G_BASE_LX = 22 + _10G_BASE_ZR = 23 + _10G_BASE_SRL = 24 + _1G_BASE_SX = 25 + _1G_BASE_LX = 26 + _1G_BASE_CX = 27 + _1G_BASE_T = 28 + _100_BASE_LX = 29 + _100_BASE_FX = 30 + _4X_MUX = 31 + + +class SFF_SFP_TYPE(Enumeration): + SFP = 0 + QSFP = 1 + QSFP_PLUS = 2 + QSFP28 = 3 + +# diff --git a/packages/base/any/onlp/src/sff/module/src/sff_config.c b/packages/base/any/onlp/src/sff/module/src/sff_config.c index 400b7a72..cf770933 100644 --- a/packages/base/any/onlp/src/sff/module/src/sff_config.c +++ b/packages/base/any/onlp/src/sff/module/src/sff_config.c @@ -70,7 +70,7 @@ sff_config_lookup(const char* setting) { int i; for(i = 0; sff_config_settings[i].name; i++) { - if(strcmp(sff_config_settings[i].name, setting)) { + if(!strcmp(sff_config_settings[i].name, setting)) { return sff_config_settings[i].value; } } diff --git a/packages/base/any/oom-shim/src/utest/main.c b/packages/base/any/oom-shim/src/utest/main.c index e7e9e030..7ada8e55 100644 --- a/packages/base/any/oom-shim/src/utest/main.c +++ b/packages/base/any/oom-shim/src/utest/main.c @@ -115,9 +115,9 @@ aim_main(int argc, char* argv[]) /* Example Platform Dump */ onlp_init(); - onlp_platform_dump(&aim_pvs_stdout, ONLP_OID_DUMP_F_RECURSE); + onlp_platform_dump(&aim_pvs_stdout, ONLP_OID_DUMP_RECURSE); onlp_oid_iterate(0, 0, iter__, NULL); - onlp_platform_show(&aim_pvs_stdout, ONLP_OID_SHOW_F_RECURSE|ONLP_OID_SHOW_F_EXTENDED); + onlp_platform_show(&aim_pvs_stdout, ONLP_OID_SHOW_RECURSE|ONLP_OID_SHOW_EXTENDED); if(argv[1] && !strcmp("manage", argv[1])) { onlp_sys_platform_manage_start();