Initial checkin of pythonic installer library

- u-boot is currently broken
This commit is contained in:
Carl D. Roth
2016-05-02 12:42:37 -07:00
parent 58cf4e5036
commit dc0178c984
9 changed files with 3315 additions and 0 deletions

View File

@@ -0,0 +1,234 @@
"""App.py
top-level install app
"""
import subprocess
import sys, os
import logging
import imp
import glob
import distutils.sysconfig
from InstallUtils import InitrdContext
from InstallUtils import SubprocessMixin
import ConfUtils, BaseInstall
# locate the platform-config files using SWI path rules
sys.path.append("/usr/lib/python%s/dist-packages"
% (distutils.sysconfig.get_python_version(),))
import onl.platform.base
import onl.platform.current
class App(SubprocessMixin):
def __init__(self, log=None):
if log is not None:
self.log = log
else:
self.log = logging.getLogger(self.__class__.__name__)
self.installer = None
self.machineConf = None
self.installerConf = None
self.platform = None
def run(self):
self.log.info("getting installer configuration")
self.machineConf = ConfUtils.MachineConf()
self.installerConf = ConfUtils.InstallerConf()
##self.log.info("using native GRUB")
##self.grubEnv = ConfUtils.GrubEnv(log=self.log.getChild("grub"))
pat = "/mnt/onie-boot/onie/initrd.img*"
l = glob.glob(pat)
if l:
initrd = l[0]
self.log.info("using native ONIE initrd+chroot GRUB (%s)", initrd)
initrdDir = InitrdContext.mkChroot(initrd, log=self.log)
self.grubEnv = ConfUtils.ChrootGrubEnv(initrdDir,
bootDir="/mnt/onie-boot",
path="/grub/grubenv",
log=self.log.getChild("grub"))
# direct access using ONIE initrd as a chroot
# (will need to fix up bootDir and bootPart later)
else:
self.log.info("using proxy GRUB")
self.grubEnv = ConfUtils.ProxyGrubEnv(self.installerConf,
bootDir="/mnt/onie-boot",
path="/grub/grubenv",
chroot=False,
log=self.log.getChild("grub"))
# indirect access through chroot host
# (will need to fix up bootDir and bootPart later)
if os.path.exists(ConfUtils.UbootEnv.SETENV):
self.ubootEnv = ConfUtils.UbootEnv(log=self.log.getChild("u-boot"))
else:
self.ubootEnv = None
self.log.info("ONL Installer %s", self.installerConf.onl_version)
code = self.findPlatform()
if code: return code
self.onlPlatform = onl.platform.current.OnlPlatform()
if 'grub' in self.onlPlatform.platform_config:
self.log.info("trying a GRUB based installer")
iklass = BaseInstall.GrubInstaller
elif 'flat_image_tree' in self.onlPlatform.platform_config:
self.log.info("trying a U-Boot based installer")
iklass = BaseInstall.UbootInstaller
else:
self.log.error("cannot detect installer type")
return 1
# run the platform-specific installer
self.installer = iklass(machineConf=self.machineConf,
installerConf=self.installerConf,
platformConf=self.onlPlatform.platform_config,
grubEnv=self.grubEnv,
ubootEnv=self.ubootEnv,
log=self.log)
try:
code = self.installer.run()
except:
self.log.exception("installer failed")
code = 1
if self.log.level < logging.INFO:
self.post_mortem()
if code: return code
if getattr(self.installer, 'grub', False):
code = self.finalizeGrub()
if code: return code
if getattr(self.installer, 'uboot', False):
code = self.finalizeUboot()
if code: return code
self.log.info("Install finished.")
return 0
def findPlatform(self):
plat = getattr(self.machineConf, 'onie_platform', None)
arch = getattr(self.machineConf, 'onie_arch', None)
if plat and arch:
self.log.info("ONL installer running under ONIE.")
plat = plat.replace('_', '-')
self.installerConf.installer_platform = plat
self.installerConf.installer_arch = arch
else:
self.log.error("The installation platform cannot be determined.")
self.log.error("It does not appear that we are running under ONIE or the ONL loader.")
self.log.error("If you know what you are doing you can re-run this installer")
self.log.error("with an explicit 'installer_platform=<platform>' setting,")
self.log.error("though this is unlikely to be the correct procedure at this point.")
return 1
self.log.info("Detected platform %s", self.installerConf.installer_platform)
self.installerConf.installer_platform_dir = ("/lib/platform-config/%s"
% (self.installerConf.installer_platform,))
if not os.path.isdir(self.installerConf.installer_platform_dir):
self.log.error("This installer does not support the %s platform.",
self.installerConf.installer_platform)
self.log.error("Available platforms are:")
for d in os.listdir("/lib/platform-config"):
self.log.error(" %s", d)
self.log.error("Installation cannot continue.")
return 1
return 0
def finalizeGrub(self):
def _m(src, dst):
val = getattr(self.installerConf, src, None)
if val is not None:
setattr(self.grubEnv, dst, val)
else:
delattr(self.grubEnv, dst)
_m('installer_md5', 'onl_installer_md5')
_m('onl_version', 'onl_installer_version')
_m('installer_url', 'onl_installer_url')
return 0
def finalizeUboot(self):
if self.installer.platform.isOnie():
def _m(src, dst):
val = getattr(self.installerConf, src, None)
if val is not None:
setattr(self.ubootEnv, dst, val)
else:
delattr(self.ubootEnv, dst)
_m('installer_md5', 'onl_installer_md5')
_m('onl_version', 'onl_installer_version')
_m('installer_url', 'onl_installer_url')
else:
self.log.info("To configure U-Boot to boot ONL automatically, reboot the switch,")
self.log.info("enter the U-Boot shell, and run these 2 commands:")
self.log.info("=> setenv bootcmd '%s'", self.installer.platform.str_bootcmd())
self.log.info("saveenv")
return 0
def shutdown(self):
installer, self.installer = self.installer, None
if installer is not None:
installer.shutdown()
def post_mortem(self):
self.log.info("re-attaching to tty")
fdno = os.open("/dev/console", os.O_RDWR)
os.dup2(fdno, sys.stdin.fileno())
os.dup2(fdno, sys.stdout.fileno())
os.dup2(fdno, sys.stderr.fileno())
os.close(fdno)
self.log.info("entering Python debugger (installer_debug=1)")
import pdb
pdb.post_mortem(sys.exc_info()[2])
@classmethod
def main(cls):
logging.basicConfig()
logger = logging.getLogger("install")
logger.setLevel(logging.DEBUG)
# send to ONIE log
hnd = logging.FileHandler("/dev/console")
logger.addHandler(hnd)
logger.propagate = False
debug = 'installer_debug' in os.environ
if debug:
logger.setLevel(logging.DEBUG)
app = cls(log=logger)
try:
code = app.run()
except:
logger.exception("runner failed")
code = 1
if debug:
app.post_mortem()
app.shutdown()
sys.exit(code)
main = App.main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,708 @@
"""BaseInstall.py
Base classes for installers.
"""
import os, stat
import subprocess
import re
import tempfile
import logging
import StringIO
import parted
import yaml
from InstallUtils import MountContext, BlkidParser, PartedParser, SubprocessMixin
import onl.YamlUtils
class Base:
class installmeta:
def __init__(self,
installerConf=None,
machineConf=None,
platformConf=None,
grubEnv=None, ubootEnv=None):
self.installerConf = installerConf
self.machineConf = machineConf
self.platformConf = platformConf
self.grubEnv = grubEnv
self.ubootEnv = ubootEnv
def isOnie(self):
if self.machineConf is None: return False
plat = getattr(self.machineConf, 'onie_platform', None)
return plat is not None
def __init__(self,
machineConf=None, installerConf=None, platformConf=None,
grubEnv=None, ubootEnv=None,
log=None):
self.machineConf = machineConf
self.installerConf = installerConf
self.im = self.installmeta(installerConf=installerConf,
machineConf=machineConf,
platformConf=platformConf,
grubEnv=grubEnv,
ubootEnv = ubootEnv)
self.grubEnv = grubEnv
self.ubootEnv = ubootEnv
self.log = log or logging.getLogger(self.__class__.__name__)
def run(self):
self.log.error("not implemented")
return 1
def shutdown(self):
pass
def installSwi(self):
swis = [x for x in os.listdir(self.installerConf.installer_dir) if x.endswith('.swi')]
if not swis:
self.log.info("No ONL Software Image available for installation.")
self.log.info("Post-install ZTN installation will be required.")
return
if len(swis) > 1:
self.log.warn("Multiple SWIs found in installer: %s", " ".join(swis))
return
base = swis[0]
src = os.path.join(self.installerConf.installer_dir, base)
self.log.info("Installing ONL Software Image (%s)...", base)
dev = self.blkidParts['ONL-IMAGES']
with MountContext(dev.device, log=self.log) as ctx:
dst = os.path.join(ctx.dir, base)
self.copy2(src, dst)
return 0
def backupConfig(self, dev):
"""Back up the ONL-CONFIG partition for later restore."""
self.configArchive = tempfile.mktemp(prefix="onl-config-",
suffix=".tar.gz")
self.log.info("backing up ONL-CONFIG partition %s to %s",
dev, self.configArchive)
with MountContext(dev, log=self.log) as ctx:
self.log.debug("+ tar -zcf %s -C %s .",
self.configArchive, ctx.dir)
pipe = subprocess.Popen(["tar", "-zcf", self.configArchive, ".",],
cwd=ctx.dir)
pipe.communicate()
code = pipe.wait()
if code:
raise SystemExit("backup of ONL-CONFIG failed")
def restoreConfig(self, dev):
"""Restore the saved ONL-CONFIG."""
archive, self.configArchive = self.configArchive, None
self.log.info("restoring ONL-CONFIG archive %s to %s",
archive, dev)
with MountContext(dev, log=self.log) as ctx:
self.log.debug("+ tar -zxf %s -C %s",
archive, ctx.dir)
pipe = subprocess.Popen(["tar", "-zxf", archive,],
cwd=ctx.dir)
pipe.communicate()
code = pipe.wait()
if code:
raise SystemExit("backup of ONL-CONFIG failed")
self.unlink(archive)
GRUB_TPL = """\
#serial --port=0x3f8 --speed=115200 --word=8 --parity=no --stop=1
serial %(serial)s
terminal_input serial
terminal_output serial
set timeout=5
menuentry OpenNetworkLinux {
search --no-floppy --label --set=root ONL-BOOT
echo 'Loading Open Network Linux ...'
insmod gzio
insmod part_msdos
#linux /kernel-3.9.6-x86-64-all nopat console=ttyS0,115200n8 onl_platform=x86-64-kvm-x86-64-r0
linux /%(kernel)s %(args)s onl_platform=%(platform)s
initrd /%(initrd)s
}
# Menu entry to chainload ONIE
menuentry ONIE {
search --no-floppy --label --set=root ONIE-BOOT
echo 'Loading ONIE ...'
chainloader +1
}
"""
class GrubInstaller(SubprocessMixin, Base):
"""Installer for grub-based systems (x86)."""
class installmeta(Base.installmeta):
grub = True
def __init__(self, *args, **kwargs):
Base.__init__(self, *args, **kwargs)
self.device = None
self.minpart = None
self.nextBlock = None
self.blkidParts = []
self.partedDevice = None
self.partedDisk = None
self.configArchive = None
# backup of ONL-CONFIG during re-partitioning
def findGpt(self):
self.blkidParts = BlkidParser(log=self.log.getChild("blkid"))
deviceOrLabel = self.im.platformConf['grub']['device']
if deviceOrLabel.startswith('/dev'):
tgtDevice, tgtLabel = deviceOrLabel, None
else:
tgtDevice, tgtLabel = None, deviceOrLabel
# enumerate labeled partitions to try to identify
# the boot device
for part in self.blkidParts:
dev, partno = part.splitDev()
if tgtLabel is not None and tgtLabel == part.label:
if not len(partno):
self.log.error("cannot use whole disk")
return 1
if self.device is None:
self.device = dev
else:
self.log.error("found multiple devices: %s, %s",
dev, self.device)
return 1
elif tgtDevice is not None and tgtDevice == dev:
if not len(partno):
self.log.error("cannot use whole disk")
return 1
if self.device is None:
self.device = dev
else:
self.log.error("found multiple devices: %s, %s",
dev, self.device)
return 1
if self.device is None:
self.log.error("cannot find an install device")
return 1
# optionally back up a config partition
# if it's on the boot device
for part in self.blkidParts:
dev, partno = part.splitDev()
if dev == self.device and part.label == 'ONL-CONFIG':
self.backupConfig(part.device)
self.partedDevice = parted.getDevice(self.device)
self.partedDisk = parted.newDisk(self.partedDevice)
# enumerate the partitions that will stay and go
minpart = -1
for part in self.partedDisk.partitions:
if part.getFlag(parted.PARTITION_HIDDEN):
minpart = max(minpart, part.number+1)
continue
# else, the partition should exist
blkidParts = [x for x in self.blkidParts if x.device == part.path]
if not blkidParts:
self.log.warn("cannot identify partition %s", part)
continue
blkidPart = blkidParts[0]
if not blkidPart.isOnieReserved(): continue
# else, check the GPT label for reserved-ness
if (part.name
and ('GRUB' in part.name
or 'ONIE-BOOT' in part.name
or 'DIAG' in part.name)):
minpart = max(minpart, part.number+1)
if minpart < 0:
self.log.error("cannot find an install partition")
return 1
self.minpart = minpart
return 0
def deletePartitions(self):
nextBlock = -1
for part in self.partedDisk.partitions:
self.log.info("examining %s part %d",
self.partedDisk.device.path, part.number)
if part.number < self.minpart:
self.log.info("skip this part")
nextBlock = max(part.geometry.start+part.geometry.length,
nextBlock)
else:
self.log.info("deleting this part")
self.partedDisk.removePartition(part)
if nextBlock < 0:
self.log.error("cannot find a starting block")
return 1
self.nextBlock = nextBlock
return 0
def partitionGpt(self):
constraint = self.partedDevice.optimalAlignedConstraint
# default partition layout constraint
devices = {}
def _u2s(sz, u):
bsz = sz * u
bsz = bsz + self.partedDevice.physicalSectorSize - 1
return bsz / self.partedDevice.physicalSectorSize
UNITS = {
'GiB' : 1024 * 1024 * 1024,
'G' : 1000 * 1000 * 1000,
'MiB' : 1024 * 1024,
'M' : 1000 * 1000,
'KiB' : 1024,
'K' : 1000,
}
for part in self.im.platformConf['installer']:
label, sz = list(part.items())[0]
if type(sz) == dict:
sz, fmt = sz['='], sz.get('format', 'ext4')
else:
fmt = 'ext4'
cnt = None
for ul, ub in UNITS.items():
if sz.endswith(ul):
cnt = _u2s(int(sz[:-len(ul)], 10), ub)
break
if sz == '100%':
cnt = self.partedDevice.getLength() - self.nextBlock
if cnt is None:
self.log.error("invalid size (no units) for %s: %s",
part, sz)
return 1
start = self.nextBlock
end = start + cnt - 1
if end <= self.partedDevice.getLength():
self.log.info("Allocating %d sectors for %s",
cnt, label)
else:
self.log.warn("%s: start sector %d, end sector %d, max %d",
label, start, end,
self.partedDevice.getLength())
self.log.error("invalid partition %s [%s] (too big)",
label, sz)
return 1
geom = parted.Geometry(device=self.partedDevice,
start=start, length=end-start+1)
fs = parted.FileSystem(type=fmt, geometry=geom)
part = parted.Partition(disk=self.partedDisk,
type=parted.PARTITION_NORMAL,
fs=fs,
geometry=geom)
part.getPedPartition().set_name(label)
self.partedDisk.addPartition(part, constraint=constraint)
self.partedDisk.commit()
self.log.info("Formatting %s (%s) as %s",
part.path, label, fmt)
if fmt == 'msdos':
cmd = ('mkdosfs', '-n', label, part.path,)
else:
cmd = ('mkfs.%s' % fmt, '-L', label, part.path,)
self.check_call(cmd, vmode=self.V1)
self.nextBlock, self.minpart = end+1, self.minpart+1
devices[label] = part.path
if label == 'ONL-CONFIG' and self.configArchive is not None:
self.restoreConfig(part.path)
self.blkidParts = BlkidParser(log=self.log.getChild("blkid"))
# re-read the partitions
return 0
def installBootConfig(self):
dev = self.blkidParts['ONL-BOOT']
self.log.info("Installing boot-config to %s", dev.device)
src = os.path.join(self.installerConf.installer_dir, 'boot-config')
with MountContext(dev.device, log=self.log) as ctx:
dst = os.path.join(ctx.dir, 'boot-config')
self.copy2(src, dst)
with open(src) as fd:
ecf = fd.read().encode('base64', 'strict').strip()
setattr(self.grubEnv, 'boot_config_default', ecf)
return 0
def installLoader(self):
ctx = {}
kernel = self.im.platformConf['grub']['kernel']
ctx['kernel'] = kernel['='] if type(kernel) == dict else kernel
initrd = self.im.platformConf['grub']['initrd']
ctx['initrd'] = initrd['='] if type(initrd) == dict else initrd
ctx['args'] = self.im.platformConf['grub']['args']
ctx['platform'] = self.installerConf.installer_platform
ctx['serial'] = self.im.platformConf['grub']['serial']
cf = GRUB_TPL % ctx
self.log.info("Installing kernel")
dev = self.blkidParts['ONL-BOOT']
with MountContext(dev.device, log=self.log) as ctx:
def _cp(b):
src = os.path.join(self.installerConf.installer_dir, b)
if not os.path.isfile(src): return
if b.startswith('kernel-') or b.startswith('onl-loader-initrd-'):
dst = os.path.join(ctx.dir, b)
self.copy2(src, dst)
[_cp(e) for e in os.listdir(self.installerConf.installer_dir)]
d = os.path.join(ctx.dir, "grub")
self.makedirs(d)
dst = os.path.join(ctx.dir, 'grub/grub.cfg')
with open(dst, "w") as fd:
fd.write(cf)
return 0
def installGrub(self):
self.log.info("Installing GRUB to %s", self.partedDevice.path)
self.grubEnv.install(self.partedDevice.path)
return 0
def installGpt(self):
code = self.findGpt()
if code: return code
self.log.info("Installing to %s starting at partition %d",
self.device, self.minpart)
self.log.info("disk is %s", self.partedDevice.path)
if self.partedDisk.type != 'gpt':
self.log.error("not a GPT partition table")
return 1
if self.partedDevice.sectorSize != 512:
self.log.error("invalid logical block size")
return 1
if self.partedDevice.physicalSectorSize != 512:
self.log.error("invalid physical block size")
return 1
self.log.info("found a disk with %d blocks",
self.partedDevice.getLength())
code = self.deletePartitions()
if code: return code
self.log.info("next usable block is %s", self.nextBlock)
code = self.partitionGpt()
if code: return code
# once we assign the ONL-BOOT partition,
# we can re-target the grub environment
dev = self.blkidParts['ONL-BOOT']
self.grubEnv.__dict__['bootPart'] = dev.device
self.grubEnv.__dict__['bootDir'] = None
code = self.installSwi()
if code: return code
code = self.installLoader()
if code: return code
code = self.installBootConfig()
if code: return code
code = self.installGrub()
if code: return code
self.log.info("ONL loader install successful.")
self.log.info("GRUB installation is required next.")
return 0
def run(self):
if 'grub' not in self.im.platformConf:
self.log.error("platform config is missing a GRUB section")
return 1
label = self.im.platformConf['grub'].get('label', None)
if label != 'gpt':
self.log.error("invalid GRUB label in platform config: %s", label)
return 1
return self.installGpt()
def shutdown(self):
pass
class UbootInstaller(SubprocessMixin, Base):
class installmeta(Base.installmeta):
device = None
uboot = True
loaderBlocks = None
flashBlocks = None
flash2Blocks = None
# block count, or -1 for "rest"
loaderRaw = True
# true for raw loader partition (FIT image)
loaderSrc = None
# default loader source file (auto-detect)
loaderDst = "onl-loader"
# destination path on ONL-BOOT for non-raw installs
bootConf = None
# optional pre-formatted boot-config contents
# (string or list of strings)
bootCmd = None
# pre-formatted string
bootCmds = None
# ... or a list of strings
def str_bootcmd(self):
if self.bootCmd is not None: return self.bootCmd
if self.bootCmds:
return "; ".join(self.bootCmds)
raise ValueError("missing boot commands")
def __init__(self, *args, **kwargs):
kwargs = dict(kwargs)
installerConf = kwargs.pop('installerConf', None)
machineConf = kwargs.pop('machineConf', None)
platformConf = kwargs.pop('platformConf', None)
ubootEnv = kwargs.pop('ubootEnv', None)
self.im = self.installmeta(installerConf=installerConf,
machineConf=machineConf,
platformConf=platormConf,
ubootEnv=ubootEnv)
Base.__init__(self, *args,
machineConf=machineConf, installerConf=installerConf, platformConf=platformConf,
ubootEnv=ubootEnv,
**kwargs)
# XXX roth
self.onlBootDev = None
self.onlConfigDev = None
self.onlDataDev = None
def formatBlockdev(self):
if self.im.loaderBlocks < 0:
self.log.error("no size defined for ONL-BOOT")
return 1
if self.im.flashBlocks < 0:
self.log.error("no size defined for FLASH")
return 1
self.log.info("Formatting %s as %d:%d:%d",
self.im.device,
self.im.loaderBlocks,
self.im.flashBlocks,
self.im.flash2Blocks)
self.check_call(('parted', '-s', self.im.device,
'mklabel', 'msdos',))
start = 1
end = start + self.im.loaderBlocks-1
self.check_call(('parted', '-s', self.im.device,
'unit', 's',
'mkpart', 'primary', 'fat32', str(start)+'s', str(end)+'s',))
self.onlBootDev = self.im.device + '1'
if not self.im.loaderRaw:
cmd = ('mkdosfs', '-n', 'ONL-BOOT', self.onlBootDev,)
self.check_call(cmd, vmode=self.V1)
start = end + 1
end = start + self.im.flashBlocks-1
self.check_call(('parted', '-s', self.im.device,
'unit', 's',
'mkpart', 'primary', 'fat32', str(start)+'s', str(end)+'s',))
self.onlConfigDev = self.im.device + '2'
cmd = ('mkdosfs', '-n', 'FLASH', self.onlConfigDev,)
self.check_call(cmd, vmode=self.V1)
start = end + 1
if self.im.flash2Blocks > -1:
end = start + self.im.flash2Blocks-1
self.check_call(('parted', '-s', self.im.device,
'unit', 's',
'mkpart', 'primary', 'fat32', str(start)+'s', str(end)+'s',))
else:
self.check_call(('parted', '-s', self.im.device,
'unit', 's',
'mkpart', 'primary', 'fat32', str(start)+'s', '100%',))
self.onlDataDev = self.im.device + '3'
cmd = ('mkdosfs', '-n', 'FLASH2', self.onlDataDev,)
self.check_call(cmd, vmode=self.V1)
return 0
def installLoader(self):
loaderSrc = None
for cand in (("%s/%s.itb"
% (self.installerConf.installer_dir,
self.installerConf.installer_platform,)),
os.path.join(self.installerConf.installer_dir, 'powerpc-fit-all.itb'),
self.im.loaderSrc,
("%s/onl.%s.loader"
% (self.installerConf.installer_dir,
self.installerConf.installer_platform,))):
if os.path.exists(cand):
loaderSrc = cand
break
if not loaderSrc:
self.log.error("The platform loader file is missing.")
self.log.error("This is unexpected - %s", loaderSrc)
return 1
self.log.info("Installing the ONL loader...")
if self.im.loaderRaw:
cmd = ('dd',
'if=' + loaderSrc,
'of=' + self.onlBootDev,)
self.check_call(cmd, vmode=self.V2)
else:
with MountContext(self.onlBootDev, log=self.log) as ctx:
dst = os.path.join(ctx, self.im.loaderDst)
self.copy2(loaderSrc, dst)
return 0
def installBootconfig(self):
cf = None
p = os.path.join(self.installerConf.installer_dir, 'boot-config')
if cf is None and os.path.exists(p):
cf = open(p, "r").read()
p = os.path.join(self.installerConf.installer_platform_dir, 'boot-config')
if cf is None and os.path.exists(p):
cf = open(p, "r").read()
if cf is None and self.im.bootConf:
if isinstance(self.im.bootConf, basestring):
cf = self.im.bootConf
else:
cf = "\n".join(cf) + "\n"
if cf is None:
buf = StringIO.StringIO()
buf.write("SWI=images:onl-%s.swi\n"
% (self.platformConf.installer_arch,))
buf.write("NETDEV=ma1\n")
cf = buf.getvalue()
self.log.info("Writing boot-config.")
with MountContext(self.onlConfigDev, log=self.log) as ctx:
dst = os.path.join(ctx.dir, "boot-config")
with open(dst, "w") as fd:
fd.write(cf)
ecf = cf.encode('base64', 'strict').strip()
setattr(self.ubootEnv, 'boot-config-default', ecf)
return 0
def installUbootEnv(self):
# Special access instructions for initrd
off = getattr(self.installerConf, 'initrd_offset', None)
if off is not None:
if self.im.loaderRaw:
a = self.onlBootDev
else:
a = self.installerConf.initrd_archive
s = int(self.installerConf.initrd_offset)
e = s + int(self.installerConf.initrd_size) - 1
self.ubootEnv.onl_installer_initrd = ("%s:%x:%x" % (a, s, e,))
else:
try:
del self.installerConf.onl_installer_initrd
except AttributeError:
pass
if self.im.isOnie():
self.log.info("Setting ONIE nos_bootcmd to boot ONL")
self.ubootEnv.nos_bootcmd = self.im.str_bootcmd()
else:
self.log.warn("U-boot boot setting is not changed")
return 0
def installUboot(self):
st = os.stat(self.im.device)
if not stat.S_ISBLK(st[stat.ST_MODE]):
self.log.error("not a block device: %s",
self.im.device)
return 1
code = self.formatBlockdev()
if code: return code
code = self.installLoader()
if code: return code
code = self.installBootconfig()
if code: return code
code = self.installSwi()
if code: return code
self.log.info("syncing block devices")
self.check_call(('sync',))
# XXX roth probably not needed
code = self.installUbootEnv()
if code: return code
return 0
def run(self):
return self.installUboot()
def shutdown(self):
pass

View File

@@ -0,0 +1,240 @@
"""BaseRecovery.py
Base classes for recovery.
"""
import subprocess, os, stat
import tempfile
import binascii
import glob
import logging
from InstallUtils import TempdirContext, MountContext, SubprocessMixin, ProcMountsParser
from InstallUtils import InitrdContext, BlkidParser
from ConfUtils import ChrootGrubEnv
class Base(SubprocessMixin):
class recovermeta:
bootConfig = "/mnt/flash/boot-config"
bootConfigDfl = "/etc/boot-config.default"
@property
def needRecovery(self):
if os.path.exists('/mnt/flash/.notmounted'): return True
if os.path.exists('/mnt/flash2/.notmounted'): return True
return False
def __init__(self,
ubootEnv=None,
log=None):
self.platform = self.recovermeta()
self.ubootEnv = ubootEnv
self.log = log or logging.getLogger(self.__class__.__name__)
def recoverFull(self):
self.log.error("not implemented")
return 1
def recoverConfig(self):
if os.path.exists(self.platform.bootConfig): return 0
self.copy2(self.platform.bootConfigDfl, self.platform.bootConfig)
return 0
def run(self):
if self.platform.needRecovery:
self.log.info("Attempting recovery")
code = self.recoverFull()
if code: return code
code = self.recoverConfig()
if code: return code
return 0
def umountAny(self, device=None, label=None):
p = ProcMountsParser()
if label is not None:
b = BlkidParser(log=self.log)
for e in b.parts:
if label == e.label:
device = e.device
break
for m in p.mounts:
if device is not None and device in m.device:
try:
self.check_call(('umount', m.device,),
vmode=self.V1)
except CalledProcessError, what:
self.log.warn("cannot umount %s: %s",
m.device, str(what))
return 0
def shutdown(self):
pass
class GrubRecovery(Base):
class recovermeta(Base.recovermeta):
pass
def recoverX86(self):
def _u(l):
self.umountAny(label=l)
def _l(l):
try:
return self.check_output(('blkid', '-L', l,)).strip()
except subprocess.CalledProcessError:
return None
def _r(l):
_u(l)
dev = _l(l)
if dev is not None:
self.log.info("Recovering %s partition", l)
self.check_call(('mkdosfs', '-n', l, dev,),
vmode=self.V1)
_r('FLASH')
_r('FLASH2')
return 0
def recoverGrubConfig(self):
with MountContext(label='ONIE-BOOT', log=self.log) as octx:
pat = "%s/onie/initrd.img*" % octx.dir
l = glob.glob(pat)
if not l:
raise ValueError("cannot find ONIE initrd")
initrd = l[0]
with InitrdContext(initrd=initrd, log=self.log) as ictx:
# copy the Switch Light grubenv out of its GRUB directory
dst = os.path.join(ictx.dir, "tmp/grubenv")
with MountContext(label='SL-BOOT', log=self.log) as sctx:
src = os.path.join(sctx.dir, "grub/grubenv")
self.copy2(src, dst)
# use the ONIE runtime's GRUB tools to read it
grubEnv = ChrootGrubEnv(ictx.dir, mounted=True,
bootDir="/",
path="/tmp/grubenv",
log=self.log)
buf = getattr(grubEnv, 'boot_config_default', None)
if buf is None:
raise ValueError("Cannot recover filesystem(s) -- missing boot_config_default.")
if buf == "":
raise ValueError("Cannot recover filesystem(s) -- empty boot_config_default.")
try:
buf = buf.decode('base64', 'strict')
except binascii.Error:
raise ValueError("Cannot recover filesystem(s) -- corrupted boot_config_default.")
if "SWI=flash" in buf:
raise ValueError("Cannot recover filesystem(s) -- local SWI cannot be recovered.")
with MountContext(label='FLASH', log=self.log) as ctx:
dst = os.path.join(ctx.dir, 'boot-config')
with open(dst, "w") as fd:
self.log.debug("+ cat > %s", dst)
fd.write(buf)
return 0
def recoverFull(self):
self.log.info("Recovering flash partitions.")
code = self.recoverX86()
if code: return code
code = self.recoverGrubConfig()
if code: return code
self.check_call(('initmounts',))
return 0
class UbootRecovery(Base):
class recovermeta(Base.recovermeta):
def __init__(self, ubootEnv=None):
self.ubootEnv = ubootEnv
device = None
# fill this in per-platform
@property
def bootConfigEnv(self):
if self.ubootEnv is None:
raise ValueError("missing u-boot environment tools")
buf = getattr(self.ubootEnv, 'boot-config-default', None)
if buf is None:
raise ValueError("Cannot recover filesystem(s) -- missing boot-config-default.")
if buf == "":
raise ValueError("Cannot recover filesystem(s) -- empty boot-config-default.")
try:
buf = buf.decode('base64', 'strict')
except binascii.Error:
raise ValueError("Cannot recover filesystem(s) -- corrupted boot-config-default.")
if "SWI=flash" in buf:
raise ValueError("Cannot recover filesystem(s) -- local SWI cannot be recovered.")
return buf
def __init__(self,
ubootEnv=None,
log=None):
self.ubootEnv = ubootEnv
self.platform = self.recovermeta(ubootEnv=ubootEnv)
self.log = log or logging.getLogger(self.__class__.__name__)
self.flashDev = self.platform.device + '2'
self.flash2Dev = self.platform.device + '3'
def recoverUboot(self):
if not os.path.exists(self.platform.device):
self.log.error("missing block device, cannot recover")
return 1
st = os.stat(self.platform.device)
if not stat.S_ISBLK(st[stat.ST_MODE]):
self.log.error("invalid block device")
return 1
code = self.umountAny(device=self.platform.device)
if code: return code
self.log.info("Re-formatting %s", self.platform.device)
cmd = ('mkdosfs', '-n', 'FLASH', self.flashDev,)
self.check_call(cmd, vmode=self.V1)
cmd = ('mkdosfs', '-n', 'FLASH2', self.flash2Dev,)
self.check_call(cmd, vmode=self.V1)
return 0
def recoverUbootConfig(self):
with MountContext(self.flashDev, log=self.log) as ctx:
dst = os.path.join(ctx.dir, 'boot-config')
with open(dst, "w") as fd:
self.log.debug("+ cat > %s", dst)
fd.write(self.platform.bootConfigEnv)
return 0
def recoverFull(self):
code = self.recoverUboot()
if code: return code
self.recoverUbootConfig()
if code: return code
self.log.info("syncing block devices")
self.check_call(('sync',))
# XXX roth probably not needed
self.check_call(('initmounts',))
return 0

View File

@@ -0,0 +1,390 @@
"""ConfUtils.py
Config interfaces to different backend mechanisms.
"""
import os
import logging
import subprocess
from InstallUtils import SubprocessMixin, ChrootSubprocessMixin, MountContext
class ConfBase:
def __init__(self):
self._parse()
def _parse(self):
raise NotImplementedError
def _feedLine(self, line):
line = line.strip()
if not line: return
idx = line.find('=')
if idx < 0:
raise ValueError("invalid line in %s: %s"
% (self.path, line,))
key, val = line[:idx], line[idx+1:]
if val[:1] == '"' and val[-1:] == '"':
val = val[1:-1]
if val[:1] == "'" and val[-1:] == "'":
val = val[1:-1]
self.__dict__['_data'][key] = val
def __getattr__(self, attr, *args):
if len(args) == 1:
return self.__dict__['_data'].get(attr, args[0])
elif len(args) == 0:
try:
return self.__dict__['_data'][attr]
except KeyError, what:
raise AttributeError(str(what))
else:
raise ValueError("extra arguments")
def __setattr__(self, attr, val):
self.__dict__['_data'][attr] = val
class ConfFileBase(ConfBase):
PATH = None
# Override me
def __init__(self, path=None):
self.__dict__['path'] = path or self.PATH
ConfBase.__init__(self)
def _parse(self):
self.__dict__['_data'] = {}
with open(self.path) as fd:
for line in fd.xreadlines():
self._feedLine(line)
class MachineConf(ConfFileBase):
PATH = "/etc/machine.conf"
class InstallerConf(ConfFileBase):
PATH = "/etc/onl/installer.conf"
class ConfBuf(ConfBase):
def __init__(self, buf):
self.__dict__['buf'] = buf
ConfBase.__init__(self)
def _parse(self):
self.__dict__['_data'] = {}
for line in self.buf.splitlines():
self._feedLine(line)
class GrubEnv(SubprocessMixin):
INSTALL = "grub-install"
EDITENV = "grub-editenv"
# system default
ENV_PATH = "/grub/grubenv"
# override me
def __init__(self,
bootDir=None, bootPart=None,
path=None,
log=None):
if bootDir and bootPart:
raise ValueError("cannot specify bootDir and bootPart")
if not bootDir and not bootPart:
raise ValueError("missing bootDir or bootPart")
self.__dict__['bootDir'] = bootDir
self.__dict__['bootPart'] = bootPart
# location of GRUB boot files (mounted directory or unmounted partition)
self.__dict__['path'] = path or self.ENV_PATH
# path to grubenv, relative to above
self.__dict__['log'] = log or logging.getLogger("grub")
def mountCtx(self, device):
return MountContext(device, fsType='ext4', log=self.log)
def asDict(self):
if self.bootPart:
with self.mountCtx(self.bootPart) as ctx:
p = os.path.join(ctx.dir, self.path.lstrip('/'))
buf = self.check_output((self.EDITENV, p, 'list',)).strip()
else:
p = os.path.join(self.bootDir, self.path.lstrip('/'))
buf = self.check_output((self.EDITENV, p, 'list',)).strip()
cf = ConfBuf(buf)
return cf.__dict__['_data']
toDict = asDict
def __getattr__(self, *args):
args = list(args)
attr = args.pop(0)
d = self.asDict()
if args:
return d.get(attr, args[0])
try:
return d[attr]
except KeyError, what:
raise AttributeError(str(what))
def __setattr__(self, attr, val):
if self.bootPart:
with self.mountCtx(self.bootPart) as ctx:
p = os.path.join(ctx.dir, self.path.lstrip('/'))
cmd = (self.EDITENV, p, 'set', ("%s=%s" % (attr, val,)),)
self.check_call(cmd)
else:
p = os.path.join(self.bootDir, self.path.lstrip('/'))
cmd = (self.EDITENV, p, 'set', ("%s=%s" % (attr, val,)),)
self.check_call(cmd)
def __delattr__(self, attr):
if self.bootPart:
with self.mountCtx(self.bootPart) as ctx:
p = os.path.join(ctx.dir, self.path.lstrip('/'))
cmd = (self.EDITENV, p, 'unset', attr,)
self.check_call(cmd)
else:
p = os.path.join(self.bootDir, self.path.lstrip('/'))
cmd = (self.EDITENV, p, 'unset', attr,)
self.check_call(cmd)
def install(self, device):
if self.bootDir is not None:
self.check_call((self.INSTALL, '--boot-directory=' + self.bootDir, device,))
elif self.bootPart is not None:
with self.mountCtx(self.bootPart) as ctx:
self.check_call((self.INSTALL, '--boot-directory=' + ctx.dir, device,))
else:
self.check_call((self.INSTALL, device,))
class ChrootGrubEnv(ChrootSubprocessMixin, GrubEnv):
def __init__(self,
chrootDir,
mounted=False,
bootDir=None, bootPart=None,
path=None,
log=None):
self.__dict__['chrootDir'] = chrootDir
self.__dict__['mounted'] = mounted
GrubEnv.__init__(self,
bootDir=bootDir, bootPart=bootPart,
path=path,
log=log)
def mountCtx(self, device):
return MountContext(device,
chroot=self.chrootDir, fsType='ext4',
log=self.log)
class ProxyGrubEnv:
"""Pretend to manipulate the GRUB environment.
Instead, write a trace of shell commands to a log
so that e.g. the chroot's host can execute it with
the proper GRUB runtime.
"""
INSTALL = "grub-install"
EDITENV = "grub-editenv"
# system defaults
ENV_PATH = "/grub/grubenv"
# override this
def __init__(self,
installerConf,
bootDir=None, chroot=True, bootPart=None,
path=None,
log=None):
self.__dict__['installerConf'] = installerConf
# installer state, to retrieve e.g. chroot directory and trace log
if bootDir and bootPart:
raise ValueError("cannot specify bootDir and bootPart")
if not bootDir and not bootPart:
raise ValueError("missing bootDir or bootPart")
self.__dict__['bootDir'] = bootDir
self.__dict__['bootPart'] = bootPart
# location of GRUB boot files (mounted directory or unmounted partition)
self.__dict__['chroot'] = chroot
# True of the bootDir is inside the chroot,
# else bootDir is in the host's file namespace
self.__dict__['path'] = path or self.ENV_PATH
# path to grubenv, relative to above
self.__dict__['log'] = log or logging.getLogger("grub")
def asDict(self):
raise NotImplementedError("proxy grubenv list not implemented")
toDict = asDict
def __getattr__(self, *args):
raise NotImplementedError("proxy grubenv list not implemented")
def __setattr__(self, attr, val):
self.log.warn("deferring commands to %s...", self.installerConf.installer_postinst)
cmds = []
if self.bootDir and self.chroot:
p = os.path.join(self.installerConf.installer_chroot,
self.bootDir.lstrip('/'),
self.path.lstrip('/'))
cmds.append(("%s %s set %s=\"%s\"" % (self.EDITENV, p, attr, val,)))
elif self.bootDir:
p = os.path.join(self.bootDir,
self.path.lstrip('/'))
cmds.append(("%s %s set %s=\"%s\"" % (self.EDITENV, p, attr, val,)))
else:
p = ("${mpt}/%s"
% (self.path.lstrip('/'),))
cmds.append("mpt=$(mktemp -t -d)")
cmds.append("mount %s $mpt" % self.bootPart)
cmds.append(("sts=0; %s %s set %s=\"%s\" || sts=$?"
% (self.EDITENV, p, attr, val,)))
cmds.append("umount $mpt")
cmds.append("rmdir $mpt")
cmds.append("test $sts -eq 0")
with open(self.installerConf.installer_postinst, "a") as fd:
for cmd in cmds:
self.log.debug("+ [PROXY] " + cmd)
fd.write(cmd)
fd.write("\n")
def __delattr__(self, attr):
self.log.warn("deferring commands to %s...", self.installerConf.installer_postinst)
cmds = []
if self.bootDir and self.chroot:
p = os.path.join(self.installerConf.installer_chroot,
self.bootDir.lstrip('/'),
self.path.lstrip('/'))
cmds.append(("%s %s unset %s" % (self.EDITENV, p, attr,)))
elif self.bootDir:
p = os.path.join(self.bootDir,
self.path.lstrip('/'))
cmds.append(("%s %s unset %s" % (self.EDITENV, p, attr,)))
else:
p = ("$mpt%s"
% (self.path.lstrip('/'),))
cmds.append("mpt=$(mktemp -t -d)")
cmds.append("mount %s $mpt" % self.bootPart)
cmds.append(("sts=0; %s %s unset %s || sts=$?"
% (self.EDITENV, p, attr,)))
cmds.append("umount $mpt")
cmds.append("rmdir $mpt")
cmds.append("test $sts -eq 0")
with open(self.installerConf.installer_postinst, "a") as fd:
for cmd in cmds:
self.log.debug("+ [PROXY] " + cmd)
fd.write(cmd)
fd.write("\n")
def install(self, device):
self.log.warn("deferring commands to %s...", self.installerConf.installer_postinst)
cmds = []
if self.bootDir and self.chroot:
p = os.pat.join(self.installerConf.installer_chroot,
self.bootDir.lstrip('/'))
cmds.append(("%s --boot-directory=\"%s\" %s" % (self.INSTALL, p, device,)))
elif self.bootDir:
p = self.bootDir
cmds.append(("%s --boot-directory=\"%s\" %s" % (self.INSTALL, p, device,)))
elif self.bootPart:
cmds.append("mpt=$(mktemp -t -d)")
cmds.append("mount %s $mpt" % self.bootPart)
cmds.append(("sts=0; %s --boot-directory=\"$mpt\" %s || sts=$?"
% (self.INSTALL, device,)))
cmds.append("umount $mpt")
cmds.append("rmdir $mpt")
cmds.append("test $sts -eq 0")
else:
cmds.append(("%s %s"
% (self.INSTALL, device,)))
with open(self.installerConf.installer_postinst, "a") as fd:
for cmd in cmds:
self.log.debug("+ [PROXY] " + cmd)
fd.write(cmd)
fd.write("\n")
class UbootEnv(SubprocessMixin):
# ha ha, loader and SWI use different paths
if os.path.exists("/usr/sbin/fw_setenv"):
SETENV = "/usr/sbin/fw_setenv"
elif os.path.exists("/usr/bin/fw_setenv"):
SETENV = "/usr/bin/fw_setenv"
else:
SETENV = "/bin/false"
if os.path.exists("/usr/sbin/fw_printenv"):
PRINTENV = "/usr/sbin/fw_printenv"
elif os.path.exists("/usr/bin/fw_printenv"):
PRINTENV = "/usr/bin/fw_printenv"
else:
PRINTENV = "/bin/false"
def __init__(self, log=None):
self.__dict__['log'] = log or logging.getLogger("u-boot")
self.__dict__['hasForceUpdate'] = False
try:
out = self.check_output((self.SETENV, '--help',),
stderr=subprocess.STDOUT)
if "-f" in out and "Force update" in out:
self.__dict__['hasForceUpdate'] = True
except subprocess.CalledProcessError:
if self.SETENV != '/bin/false':
raise
def __getattr__(self, *args):
args = list(args)
attr = args.pop(0)
with open(os.devnull, "w") as nfd:
try:
out = self.check_output((self.PRINTENV, '-n', attr,),
stderr=nfd.fileno())
except subprocess.CalledProcessError:
out = None
if out is not None: return out
if args:
return args[0]
raise AttributeError("firmware tag not found")
def __setattr__(self, attr, val):
if self.hasForceUpdate:
self.check_call((self.SETENV, '-f', attr, val,))
else:
self.check_call((self.SETENV, attr, val,))
def __delattr__(self, attr):
if self.hasForceUpdate:
self.check_call((self.SETENV, '-f', attr,))
else:
self.check_call((self.SETENV, attr,))
def asDict(self):
buf = self.check_output((self.PRINTENV,)).strip()
return ConfBuf(buf).__dict__['_data']
toDict = asDict

View File

@@ -0,0 +1,578 @@
"""Fit.py
Parse FIT files.
"""
import os, sys
import logging
import struct
import argparse
import time
class FdtProperty:
def __init__(self, name, offset, sz):
self.name = name
self.offset = offset
self.sz = sz
class FdtNode:
def __init__(self, name):
self.name = name
self.properties = {}
self.nodes = {}
class Parser:
FDT_MAGIC = 0xd00dfeed
FDT_BEGIN_NODE = 1
FDT_END_NODE = 2
FDT_PROP = 3
FDT_NOP = 4
FDT_END = 9
def __init__(self, path=None, stream=None, log=None):
self.log = log or logging.getLogger(self.__class__.__name__)
self.path = path
self.stream = stream
self.rootNodes = {}
self._parse()
def _parse(self):
if self.stream is not None:
try:
pos = self.stream.tell()
self._parseStream(self.stream)
finally:
self.stream.seek(pos, 0)
elif self.path is not None:
with open(self.path) as fd:
self._parseStream(fd)
else:
raise ValueError("missing file or stream")
def _parseStream(self, fd):
strings = {}
buf = fd.read(40)
hdr = list(struct.unpack(">10I", buf))
magic = hdr.pop(0)
if magic != self.FDT_MAGIC:
raise ValueError("missing magic")
self.fdtSize = hdr.pop(0)
self.structPos = hdr.pop(0)
self.stringPos = hdr.pop(0)
self.version = hdr.pop(0)
if self.version < 17:
raise ValueError("invalid format version")
hdr.pop(0) # last compatible version
hdr.pop(0) # boot cpu
self.stringSize = hdr.pop(0)
self.structSize = hdr.pop(0)
fd.seek(self.structPos, 0)
def _align():
pos = fd.tell()
pos = (pos+3) & ~3
fd.seek(pos, 0)
def _label():
buf = ""
while True:
c = fd.read(1)
if c == '\x00': break
if c:
buf += c
return buf
def _string(off):
if off in strings:
return strings[off]
pos = fd.tell()
fd.seek(self.stringPos, 0)
fd.seek(off, 1)
buf = _label()
fd.seek(pos)
return buf
nodeStack = []
while True:
buf = fd.read(4)
s = list(struct.unpack(">I", buf))
tag = s.pop(0)
if tag == self.FDT_BEGIN_NODE:
name = _label()
_align()
newNode = FdtNode(name)
if nodeStack:
if name in nodeStack[-1].nodes:
raise ValueError("duplicate node")
nodeStack[-1].nodes[name] = newNode
nodeStack.append(newNode)
else:
if name in self.rootNodes:
raise ValueError("duplicate node")
self.rootNodes[name] = newNode
nodeStack.append(newNode)
continue
if tag == self.FDT_PROP:
buf = fd.read(8)
s = list(struct.unpack(">2I", buf))
plen = s.pop(0)
nameoff = s.pop(0)
name = _string(nameoff)
pos = fd.tell()
fd.seek(plen, 1)
_align()
newProp = FdtProperty(name, pos, plen)
if nodeStack:
if name in nodeStack[-1].properties:
raise ValueError("duplicate property")
nodeStack[-1].properties[name] = newProp
else:
raise ValueError("property with no node")
continue
if tag == self.FDT_END_NODE:
if nodeStack:
nodeStack.pop(-1)
else:
raise ValueError("missing begin node")
continue
if tag == self.FDT_NOP:
print "NOP"
continue
if tag == self.FDT_END:
if nodeStack:
raise ValueError("missing end node(s)")
break
raise ValueError("invalid tag %d" % tag)
def report(self, stream=sys.stdout):
q = [(x, "") for x in self.rootNodes.values()]
while q:
n, pfx = q.pop(0)
name = n.name or "/"
stream.write("%s%s\n" % (pfx, name,))
if n.properties:
stream.write("\n")
for p in n.properties.values():
stream.write("%s %s (%d bytes)\n"
% (pfx, p.name, p.sz,))
if n.properties:
stream.write("\n")
pfx2 = pfx + " "
q[0:0] = [(x, pfx2) for x in n.nodes.values()]
def getNode(self, path):
if path == '/':
return self.rootNodes.get('', None)
els = path.split('/')
n = None
while els:
b = els.pop(0)
if n is None:
if b not in self.rootNodes: return None
n = self.rootNodes[b]
else:
if b not in n.nodes: return None
n = n.nodes[b]
return n
def getNodeProperty(self, node, propName):
if propName not in node.properties: return None
prop = node.properties[propName]
def _get(fd):
fd.seek(self.structPos, 0)
fd.seek(prop.offset)
buf = fd.read(prop.sz)
if buf[-1] == '\x00':
return buf[:-1]
return buf
if self.stream is not None:
return _get(self.stream)
else:
with open(self.path) as fd:
return _get(fd)
def dumpNodeProperty(self, node, propIsh, outPath):
if isinstance(propIsh, FdtProperty):
prop = propIsh
else:
if propIsh not in node.properties:
raise ValueError("missing property")
prop = node.properties[propIsh]
def _dump(fd):
with open(outPath, "w") as wfd:
fd.seek(prop.offset, 0)
buf = fd.read(prop.sz)
wfd.write(buf)
if self.stream is not None:
try:
pos = self.stream.tell()
_dump(self.stream)
finally:
self.stream.seek(pos, 0)
else:
with open(self.path) as fd:
_dump(fd)
def getInitrdNode(self, profile=None):
"""U-boot mechanism to retrieve boot profile."""
node = self.getNode('/configurations')
if node is None:
self.log.warn("missing /configurations node")
return None
if profile is not None:
if profile not in node.nodes:
self.log.warn("missing profile %s", profile)
return None
node = node.nodes[profile]
elif 'default' in node.properties:
pf = self.getNodeProperty(node, 'default')
self.log.debug("default profile is %s", pf)
node = node.nodes[pf]
else:
pf = node.nodes.keys()[0]
self.log.debug("using profile %s", pf)
node = node.nodes[pf]
if 'ramdisk' not in node.properties:
self.log.warn("ramdisk property not found")
return None
rdName = self.getNodeProperty(node, 'ramdisk')
self.log.debug("retrieving ramdisk %s", rdName)
node = self.getNode('/images/' + rdName)
return node
class DumpRunner:
def __init__(self, stream,
log=None):
self.log = log or logging.getLogger(self.__class__.__name__)
self.stream = stream
def run(self):
p = Parser(stream=self.stream, log=self.log)
p.report()
return 0
def shutdown(self):
stream, self.stream = self.stream, None
if stream is not None: stream.close()
class ExtractBase:
def __init__(self, stream,
initrd=False, profile=None, path=None,
property=None,
log=None):
self.log = log or logging.getLogger(self.__class__.__name__)
self.stream = stream
self.initrd = initrd
self.profile = profile
self.path = path
self.property = property
self.parser = None
self.node = None
self.dataProp = None
def run(self):
self.parser = Parser(stream=self.stream, log=self.log)
if self.path is not None:
self.node = self.parser.getNode(self.path)
if self.node is None:
self.log.error("cannot find path")
return 1
elif self.initrd:
self.node = self.parser.getInitrdNode(profile=self.profile)
if self.node is None:
self.log.error("cannot find initrd")
return 1
else:
self.log.error("missing path or initrd")
return 1
def _t(n):
if n is None: return
self.dataProp = self.dataProp or self.node.properties.get(n, None)
_t(self.property)
_t('data')
_t('value')
if self.dataProp is None:
self.log.error("cannot find %s property", self.property)
return 1
return self._handleParsed()
def _handleParsed(self):
raise NotImplementedError
def shutdown(self):
stream, self.stream = self.stream, None
if stream is not None: stream.close()
class ExtractRunner(ExtractBase):
def __init__(self, stream,
outStream=None,
initrd=False, profile=None, path=None,
property=None,
text=False, numeric=False, timestamp=False, hex=False,
log=None):
ExtractBase.__init__(self, stream,
initrd=initrd, profile=profile,
path=path,
property=property,
log=log)
self.outStream = outStream
self.text = text
self.numeric = numeric
self.timestamp = timestamp
self.hex = hex
def _handleParsed(self):
if (self.numeric or self.timestamp) and self.dataProp.sz != 4:
self.log.error("invalid size for number")
return 1
def _dump(rfd, wfd):
rfd.seek(self.dataProp.offset, 0)
buf = rfd.read(self.dataProp.sz)
if self.text:
if buf[-1:] != '\x00':
self.log.error("missing NUL terminator")
return 1
wfd.write(buf[:-1])
return 0
if self.numeric:
n = struct.unpack(">I", buf)[0]
wfd.write(str(n))
return 0
if self.timestamp:
n = struct.unpack(">I", buf)[0]
wfd.write(time.ctime(n))
return 0
if self.hex:
for c in buf:
wfd.write("%02x" % ord(c))
return 0
wfd.write(buf)
return 0
if self.outStream is not None:
return _dump(self.stream, self.outStream)
else:
return _dump(self.stream, sys.stdout)
class OffsetRunner(ExtractBase):
def __init__(self, stream,
initrd=False, profile=None, path=None,
property=None,
log=None):
ExtractBase.__init__(self, stream,
initrd=initrd, profile=profile,
path=path,
property=property,
log=log)
def _handleParsed(self):
start = self.dataProp.offset
self.log.debug("first byte is %d", start)
end = start + self.dataProp.sz - 1
self.log.debug("data size is %d", self.dataProp.sz)
self.log.debug("last byte is %d", end)
sys.stdout.write("%s %s\n" % (start, end,))
return 0
USAGE = """\
pyfit [OPTIONS] dump|extract ...
"""
EPILOG = """\
Payload for 'offset' and 'extract' is specified as a given
PROPERTY for a tree node at PATH.
Alternately, the initrd/ramdisk can be specified with '--initrd',
using the PROFILE machine configuration. If no PROFILE is specified,
the built-in default configuration from the FDT is used.
"""
DESC="""\
Extract or examine FIT file contents.
"""
DUMP_USAGE = """\
pyfit [OPTIONS] dump FIT-FILE
"""
EXTRACT_USAGE = """\
pyfit [OPTIONS] extract [OPTIONS] FIT-FILE
"""
EXTRACT_EPILOG = """\
Extracts payload to OUTPUT or to stdout if not specified.
Output can be optionally reformatted
as a NUL-terminated string ('--text'),
as a decimal number ('--number'),
as a UNIX timestamp ('--timestamp'),
or as hex data ('--hex').
Numbers and timestamps must be 4-byte payloads.
"""
OFFSET_USAGE = """\
pyfit [OPTIONS] offset [OPTIONS] FIT-FILE
"""
OFFSET_EPILOG = """\
Outputs the first and last byte offsets, inclusive, containing the
payload.
"""
class App:
def __init__(self, log=None):
self.log = log or logging.getLogger("pyfit")
def run(self):
ap = argparse.ArgumentParser(usage=USAGE,
description=DESC,
epilog=EPILOG)
ap.add_argument('-q', '--quiet', action='store_true',
help="Suppress log messages")
ap.add_argument('-v', '--verbose', action='store_true',
help="Add more logging")
sp = ap.add_subparsers()
apd = sp.add_parser('dump',
help="Dump tree structure",
usage=DUMP_USAGE)
apd.set_defaults(mode='dump')
apd.add_argument('fit-file', type=open,
help="FIT file")
apx = sp.add_parser('extract',
help="Extract items",
usage=EXTRACT_USAGE,
epilog=EXTRACT_EPILOG)
apx.set_defaults(mode='extract')
apx.add_argument('fit-file', type=open,
help="FIT file")
apx.add_argument('-o', '--output',
type=argparse.FileType('wb', 0),
help="File destination")
apx.add_argument('--initrd', action="store_true",
help="Extract platform initrd")
apx.add_argument('--profile', type=str,
help="Platform profile for initrd selection")
apx.add_argument('--path', type=str,
help="Tree path to extract")
apx.add_argument('--property', type=str,
help="Node property to extract")
apx.add_argument('--text', action='store_true',
help="Format property as text")
apx.add_argument('--numeric', action='store_true',
help="Format property as a number")
apx.add_argument('--hex', action='store_true',
help="Format property as hex")
apx.add_argument('--timestamp', action='store_true',
help="Format property as a date")
apo = sp.add_parser('offset',
help="Extract item offset",
usage=OFFSET_USAGE,
epilog=OFFSET_EPILOG)
apo.set_defaults(mode='offset')
apo.add_argument('fit-file', type=open,
help="FIT file")
apo.add_argument('--initrd', action="store_true",
help="Extract platform initrd")
apo.add_argument('--profile', type=str,
help="Platform profile for initrd selection")
apo.add_argument('--path', type=str,
help="Tree path to extract")
apo.add_argument('--property', type=str,
help="Node property to extract")
try:
args = ap.parse_args()
except SystemExit, what:
return what.code
if args.quiet:
self.log.setLevel(logging.ERROR)
if args.verbose:
self.log.setLevel(logging.DEBUG)
if args.mode == 'dump':
r = DumpRunner(getattr(args, 'fit-file'), log=self.log)
elif args.mode == 'extract':
r = ExtractRunner(getattr(args, 'fit-file'),
outStream=args.output,
path=args.path,
initrd=args.initrd, profile=args.profile,
property=args.property,
text=args.text, numeric=args.numeric,
timestamp=args.timestamp, hex=args.hex,
log=self.log)
elif args.mode == 'offset':
r = OffsetRunner(getattr(args, 'fit-file'),
path=args.path,
initrd=args.initrd, profile=args.profile,
property=args.property,
log=self.log)
else:
self.log.error("invalid mode")
return 1
try:
code = r.run()
except:
self.log.exception("runner failed")
code = 1
r.shutdown()
return code
def shutdown(self):
pass
@classmethod
def main(cls):
logging.basicConfig()
logger = logging.getLogger("pyfit")
app = cls(log=logger)
try:
code = app.run()
except:
logger.exception("app failed")
code = 1
app.shutdown()
sys.exit(code)
main = App.main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,829 @@
"""InstallUtils.py
"""
import os, sys
import stat
import logging
import subprocess
import tempfile
import string
import shutil
class SubprocessMixin:
V1 = "V1"
V2 = "V2"
def check_call(self, *args, **kwargs):
args = list(args)
kwargs = dict(kwargs)
cwd = kwargs.pop('cwd', None)
if cwd is not None:
self.log.debug("+ cd " + cwd)
if args:
cmd = args.pop(0)
else:
cmd = kwargs.pop('cmd')
vmode = kwargs.pop('vmode', None)
if vmode == self.V1 and self.log.isEnabledFor(logging.DEBUG):
if isinstance(cmd, basestring):
raise ValueError("vmode=V1 requires a list")
cmd = list(cmd)
cmd[1:1] = ['-v',]
if vmode == self.V2 and self.log.isEnabledFor(logging.DEBUG):
stdout = kwargs.pop('stdout', None)
stderr = kwargs.pop('stderr', None)
if stdout is not None:
raise ValueError("vmode=V2 conflicts with stdout")
if stderr is not None and stderr != subprocess.STDOUT:
raise ValueError("vmode=V2 conflicts with stderr")
fno, v2Out = tempfile.mkstemp(prefix='subprocess-',
suffix='out')
kwargs['stdout'] = fno
kwargs['stderr'] = subprocess.STDOUT
if isinstance(cmd, basestring):
self.log.debug("+ " + cmd)
else:
self.log.debug("+ " + " ".join(cmd))
if vmode == self.V2 and self.log.isEnabledFor(logging.DEBUG):
try:
subprocess.check_call(cmd, *args, cwd=cwd, **kwargs)
finally:
with open(v2Out) as fd:
sys.stderr.write(fd.read())
os.unlink(v2Out)
else:
subprocess.check_call(cmd, *args, cwd=cwd, **kwargs)
def check_output(self, *args, **kwargs):
args = list(args)
kwargs = dict(kwargs)
cwd = kwargs.pop('cwd', None)
if cwd is not None:
self.log.debug("+ cd " + cwd)
if args:
cmd = args.pop(0)
else:
cmd = kwargs.pop('cmd')
vmode = kwargs.pop('vmode', None)
if vmode == self.V1 and self.log.isEnabledFor(logging.DEBUG):
if isinstance(cmd, basestring):
raise ValueError("vmode=V1 requires a list")
cmd = list(cmd)
cmd[1:1] = ['-v',]
if vmode == self.V2 and self.log.isEnabledFor(logging.DEBUG):
stdout = kwargs.pop('stdout', None)
stderr = kwargs.pop('stderr', None)
if stdout is not None:
raise ValueError("vmode=V2 conflicts with stdout")
if stderr is not None and stderr != subprocess.STDOUT:
raise ValueError("vmode=V2 conflicts with stderr")
fno, v2Out = tempfile.mkstemp(prefix='subprocess-',
suffix='out')
kwargs['stderr'] = fno
if isinstance(cmd, basestring):
self.log.debug("+ " + cmd)
else:
self.log.debug("+ " + " ".join(cmd))
if vmode == self.V2 and self.log.isEnabledFor(logging.DEBUG):
try:
return subprocess.check_output(cmd, *args, cwd=cwd, **kwargs)
finally:
with open(v2Out) as fd:
sys.stderr.write(fd.read())
os.unlink(v2Out)
else:
return subprocess.check_output(cmd, *args, cwd=cwd, **kwargs)
def rmdir(self, path):
self.log.debug("+ /bin/rmdir %s", path)
os.rmdir(path)
def unlink(self, path):
self.log.debug("+ /bin/rm %s", path)
os.unlink(path)
def rmtree(self, path):
self.log.debug("+ /bin/rm -fr %s", path)
shutil.rmtree(path)
def mkdtemp(self, *args, **kwargs):
path = tempfile.mkdtemp(*args, **kwargs)
self.log.debug("+ /bin/mkdir %s", path)
return path
def copy2(self, src, dst):
self.log.debug("+ /bin/cp -a %s %s", src, dst)
shutil.copy2(src, dst)
def copyfile(self, src, dst):
self.log.debug("+ /bin/cp %s %s", src, dst)
shutil.copyfile(src, dst)
def mkdir(self, path):
self.log.debug("+ /bin/mkdir %s", path)
os.mkdir(path)
def makedirs(self, path):
self.log.debug("+ /bin/mkdir -p %s", path)
os.makedirs(path)
def symlink(self, tgt, dst):
self.log.debug("+ /bin/ln -s %s %s", tgt, dst)
os.symlink(tgt, dst)
class TempdirContext(SubprocessMixin):
def __init__(self, prefix=None, suffix=None, chroot=None, log=None):
self.prefix = prefix
self.suffix = suffix
self.chroot = chroot
self.dir = None
self.hostDir = None
self.log = log or logging.getLogger("mount")
def __enter__(self):
if self.chroot is not None:
self.hostDir = self.mkdtemp(prefix=self.prefix,
suffix=self.suffix,
dir=self.chroot + "/tmp")
self.dir = self.hostDir[len(self.chroot):]
else:
self.dir = self.hostDir = self.mkdtemp(prefix=self.prefix,
suffix=self.suffix)
return self
def __exit__(self, type, value, tb):
if self.path: self.rmtree(self.hostDir)
return False
class MountContext(SubprocessMixin):
def __init__(self, device=None, chroot=None, label=None, fsType=None, log=None):
self.device = device
self.chroot = chroot
self.label = label
self.fsType = fsType
self.dir = None
self.hostDir = None
self.mounted = False
self.log = log or logging.getLogger("mount")
if self.device and self.label:
raise ValueError("cannot specify device and label")
if not self.device and not self.label:
raise ValueError("no device or label specified")
def __enter__(self):
dev = self.device
if dev is None:
try:
dev = self.check_output(('blkid', '-L', self.label,)).strip()
except subprocess.CalledProcessError, what:
raise ValueError("cannot find label %s: %s"
% (self.label, str(what),))
if self.chroot is not None:
self.hostDir = self.mkdtemp(prefix="mount-",
suffix=".d",
dir=self.chroot + "/tmp")
self.dir = self.hostDir[len(self.chroot):]
else:
self.dir = self.hostDir = self.mkdtemp(prefix="mount-",
suffix=".d")
if self.fsType is not None:
cmd = ('mount', '-t', self.fsType, dev, self.hostDir,)
else:
cmd = ('mount', dev, self.hostDir,)
self.check_call(cmd, vmode=self.V1)
self.mounted = True
return self
def __exit__(self, type, value, tb):
mounted = False
if self.mounted:
p = ProcMountsParser()
for e in p.mounts:
if e.dir == self.hostDir:
mounted = True
break
# really mounted?
# maybe unmounted e.g. if inside a chroot
if mounted:
cmd = ('umount', self.hostDir,)
self.check_call(cmd, vmode=self.V1)
self.rmdir(self.hostDir)
return False
class BlkidEntry:
def __init__(self, device, **kwargs):
self.device = device
kwargs = dict(kwargs)
self.label = kwargs.pop('label', None)
self.uuid = kwargs.pop('uuid', None)
self.fsType = kwargs.pop('fsType', None)
@classmethod
def fromLine(cls, line):
line = line.strip()
p = line.find(':')
if p < 0:
raise ValueError("invalid blkid output %s"
% line)
dev, line = line[:p], line[p+1:].strip()
attrs = {}
while line:
p = line.find('=')
if p < 0:
raise ValueError("invalid blkid output %s"
% line)
key = line[:p].lower()
if line[p+1:p+2] == "'":
q = line.find("'", p+2)
if q < 0:
val, line = line[p+1:], ""
else:
val, line = line[p+2:q], line[q+1:].strip()
elif line[p+1:p+2] == '"':
q = line.find('"', p+2)
if q < 0:
val, line = line[p+1:], ""
else:
val, line = line[p+2:q], line[q+1:].strip()
else:
q = line.find(" ", p+1)
if q < 0:
val, line = line[p+1:], ""
else:
val, line = line[p+1:], line[:q].strip()
if key == 'type': key = 'fsType'
attrs[key] = val
return cls(dev, **attrs)
def splitDev(self):
dev, part = self.device, ""
while dev[-1:] in string.digits:
dev, part = dev[:-1], dev[-1] + part
return dev, part
def isOnieReserved(self):
if self.label is None: return False
if 'GRUB' in self.label: return True
if 'ONIE-BOOT' in self.label: return True
if 'DIAG' in self.label: return True
return False
class BlkidParser(SubprocessMixin):
def __init__(self, log=None):
self.log = log or logging.getLogger("blkid")
self.parse()
def parse(self):
cmd = ('blkid',)
lines = self.check_output(cmd).splitlines()
self.parts = [BlkidEntry.fromLine(line) for line in lines]
def __getitem__(self, idxOrName):
if type(idxOrName) == int:
return self.parts[idxOrName]
for part in self.parts:
if part.label == idxOrName: return part
if part.uuid == idxOrName: return part
raise IndexError("cannot find partition %s" % repr(idxOrName))
def __len__(self):
return len(self.parts)
class ProcMtdEntry:
def __init__(self,
charDevice, blockDevice,
offset, size, eraseSize,
label=None):
self.charDevice = charDevice
self.blockDevice = blockDevice
self.offset = offset
self.size = size
self.eraseSize = eraseSize
self.label = label
@classmethod
def fromLine(cls, line, offset=0):
buf = line.strip()
p = buf.find(':')
if p < 0:
raise ValueError("invalid /proc/mtd entry %s"
% line)
dev, buf = buf[:p], buf[p+1:].strip()
dev = '/dev/' + dev
if not os.path.exists(dev):
raise ValueError("invalid /proc/mtd entry %s (missing device)"
% line)
st = os.stat(dev)
if stat.S_ISBLK(st.st_mode):
cdev, bdev = None, dev
elif stat.S_ISCHR(st.st_mode):
cdev, bdev = dev, None
else:
cdev, bdev = None, None
if cdev and not bdev:
if cdev.startswith("/dev/mtd") and not cdev.startswith("/dev/mtdblock"):
bdev = "/dev/mtdblock" + cdev[8:]
if not os.path.exists(bdev):
raise ValueError("invalid /proc/mtd entry %s (cannot find block device)"
% line)
st = os.stat(bdev)
if not stat.S_ISBLK(st.st_mode):
raise ValueError("invalid /proc/mtd entry %s (cannot find block device)"
% line)
else:
raise ValueError("invalid /proc/mtd entry %s (cannot find block device)"
% line)
elif not bdev:
raise ValueError("invalid /proc/mtd entry %s (not a block or char device)"
% line)
p = buf.find(" ")
if p < 0:
raise ValueError("invalid /proc/mtd entry %s (missing size)"
% line)
sz, buf = buf[:p], buf[p+1:].strip()
sz = int(sz, 16)
if not buf:
raise ValueError("invalid /proc/mtd entry %s (missing erase size)"
% line)
p = buf.find(" ")
if p < 0:
esz, buf = buf, ""
else:
esz, buf = buf[:p], buf[p+1:].strip()
esz = int(esz, 16)
if not buf:
label = None
elif len(buf) > 1 and buf[0:1] == "'" and buf[-1:] == "'":
label = buf[1:-1]
elif len(buf) > 1 and buf[0:1] == '"' and buf[-1:] == '"':
label = buf[1:-1]
else:
label = buf
return cls(cdev, bdev, offset, sz, esz, label=label)
class ProcMtdParser():
def __init__(self, log=None):
self.log = log or logging.getLogger("blkid")
self.parse()
def parse(self):
self.parts = []
offset = 0
if os.path.exists("/proc/mtd"):
with open("/proc/mtd") as fd:
for line in fd.xreadlines():
if line.startswith("dev:"):
pass
else:
part = ProcMtdEntry.fromLine(line, offset=offset)
offset += part.size
self.parts.append(part)
def __getitem__(self, idxOrName):
if type(idxOrName) == int:
return self.parts[idxOrName]
for part in self.parts:
if part.label == idxOrName: return part
raise IndexError("cannot find MTD partition %s" % repr(idxOrName))
def __len__(self):
return len(self.parts)
class PartedDiskEntry:
def __init__(self, device, blocks, lbsz, pbsz,
model=None, typ=None, flags=[]):
self.device = device
self.blocks = blocks
self.lbsz = lbsz
self.pbsz = pbsz
self.model = model
self.typ = typ
self.flags = flags
@classmethod
def fromLine(cls, line):
line = line.strip()
if not line.endswith(';'):
raise ValueError("invalid parted line %s" % line)
line = line[:-1]
rec = line.split(':')
def _s():
secs = rec.pop(0)
if secs[-1:] != 's':
raise ValueError("invalid sector count %s" % secs)
return int(secs[:-1])
dev = rec.pop(0)
blocks = _s()
model = rec.pop(0) or None
lbsz = int(rec.pop(0), 10)
pbsz = int(rec.pop(0), 10)
typ = rec.pop(0)
label = rec.pop(0) or None
flags = rec.pop(0)
flags = [x.strip() for x in flags.split(',')]
if rec:
raise ValueError("invalid parted line %s" % line)
return cls(dev, blocks, lbsz, pbsz,
model=model, typ=typ,
flags=flags)
class PartedPartEntry:
def __init__(self, part, start, end, sz,
fs=None, label=None, flags=[]):
self.part = part
self.start = start
self.end = end
self.sz = sz
self.fs = fs
self.label = label
self.flags = flags
@classmethod
def fromLine(cls, line):
line = line.strip()
if not line.endswith(';'):
raise ValueError("invalid parted line %s" % line)
line = line[:-1]
rec = line.split(':')
def _s():
secs = rec.pop(0)
if secs[-1:] != 's':
raise ValueError("invalid sector count %s" % secs)
return int(secs[:-1])
part = int(rec.pop(0), 10)
if part < 1:
raise ValueError("invalid partition %d" % part)
start = _s()
end = _s()
sz = _s()
fs = rec.pop(0) or None
label = rec.pop(0) or None
flags = rec.pop(0)
flags = [x.strip() for x in flags.split(',')]
if rec:
raise ValueError("invalid parted line %s" % line)
return cls(part, start, end, sz,
fs=fs, label=label,
flags=flags)
class PartedParser(SubprocessMixin):
def __init__(self, device, log=None):
self.device = device
self.log = log or logging.getLogger("parted")
self.parse()
def parse(self):
cmd = ('parted', '-m', self.device,
'unit', 's',
'print',)
lines = self.check_output(cmd).splitlines()
self.disk = None
parts = {}
for line in lines:
if line.startswith('/dev/'):
self.disk = PartedDiskEntry.fromLine(line)
elif line[0:1] in string.digits:
ent = PartedPartEntry.fromLine(line)
if ent.part in parts:
raise ValueError("duplicate partition")
parts[ent.part] = ent
self.parts = []
for partno in sorted(parts.keys()):
self.parts.append(parts[partno])
if self.disk is None:
raise ValueError("no partition table found")
def __len__(self):
return len(self.parts)
class ProcMountsEntry:
def __init__(self, device, dir, fsType, flags={}):
self.device = device
self.dir = dir
self.fsType = fsType
self.flags = flags
@classmethod
def fromLine(cls, line):
buf = line.strip()
idx = buf.find(' ')
if idx < 0:
raise ValueError("invalid /proc/mounts line %s", line)
device, buf = buf[:idx], buf[idx+1:].strip()
idx = buf.find(' ')
if idx < 0:
raise ValueError("invalid /proc/mounts line %s", line)
dir, buf = buf[:idx], buf[idx+1:].strip()
idx = buf.find(' ')
if idx < 0:
raise ValueError("invalid /proc/mounts line %s", line)
fsType, buf = buf[:idx], buf[idx+1:].strip()
idx = buf.rfind(' ')
if idx < 0:
raise ValueError("invalid /proc/mounts line %s", line)
buf, _ = buf[:idx], buf[idx+1:].strip()
idx = buf.rfind(' ')
if idx < 0:
buf = ""
else:
buf, _ = buf[:idx], buf[idx+1:].strip()
flags = {}
if buf:
for flag in buf.split(','):
idx = flag.find('=')
if idx > -1:
key, val = flag[:idx], flag[idx+1:]
else:
key, val = flag, True
flags[key] = val
return cls(device, dir, fsType, flags)
class ProcMountsParser:
def __init__(self):
self.parse()
def parse(self):
self.mounts = []
with open("/proc/mounts") as fd:
for line in fd.readlines():
self.mounts.append(ProcMountsEntry.fromLine(line))
class InitrdContext(SubprocessMixin):
def __init__(self, initrd=None, dir=None, log=None):
if initrd is None and dir is None:
raise ValueError("missing initrd or initrd dir")
if initrd and dir:
raise ValueError("cannot specify initrd and initrd dir")
self.initrd = initrd
self.dir = dir
self.hlog = log or logging.getLogger("mount")
self.ilog = self.hlog.getChild("initrd")
self.ilog.setLevel(logging.INFO)
self.log = self.hlog
def _unpack(self):
self.dir = self.mkdtemp(prefix="chroot-",
suffix=".d")
with open(self.initrd) as fd:
mbuf = fd.read(1024)
if mbuf[0:2] == "\x1f\x8b":
c1 = ('gzip', '-dc', self.initrd,)
elif mbuf[0:2] == "BZ":
c1 = ('bzip2', '-dc', self.initrd,)
elif mbuf[0:6] == "\xfd7zXZ\x00":
c1 = ('xz', '-dc', self.initrd,)
else:
raise ValueError("cannot decode initrd")
c2 = ('cpio', '-imd',)
self.log.debug("+ %s | %s",
" ".join(c1), " ".join(c2))
p1 = subprocess.Popen(c1,
stdout=subprocess.PIPE)
if self.log.isEnabledFor(logging.DEBUG):
p2 = subprocess.Popen(c2,
cwd=self.dir,
stdin=p1.stdout)
else:
p2 = subprocess.Popen(c2,
cwd=self.dir,
stdin=p1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
c1 = p1.wait()
out, _ = p2.communicate()
c2 = p2.wait()
if c2 and out:
sys.stderr.write(out)
if c1 or c2:
raise ValueError("initrd unpack failed")
def _prepDirs(self):
dev2 = os.path.join(self.dir, "dev")
if not os.path.exists(dev2):
self.mkdir(dev2)
for e in os.listdir(dev2):
dst = os.path.join(dev2, e)
if os.path.islink(dst):
self.unlink(dst)
elif os.path.isdir(dst):
self.rmtree(dst)
else:
self.unlink(dst)
for e in os.listdir("/dev"):
src = os.path.join("/dev", e)
dst = os.path.join(dev2, e)
if os.path.islink(src):
self.symlink(os.readlink(src), dst)
elif os.path.isdir(src):
self.mkdir(dst)
elif os.path.isfile(src):
self.copy2(src, dst)
else:
st = os.stat(src)
if stat.S_ISBLK(st.st_mode):
maj, min = os.major(st.st_rdev), os.minor(st.st_rdev)
self.log.debug("+ mknod %s b %d %d", dst, maj, min)
os.mknod(dst, st.st_mode, st.st_rdev)
elif stat.S_ISCHR(st.st_mode):
maj, min = os.major(st.st_rdev), os.minor(st.st_rdev)
self.log.debug("+ mknod %s c %d %d", dst, maj, min)
os.mknod(dst, st.st_mode, st.st_rdev)
else:
self.log.debug("skipping device %s", src)
dst = os.path.join(self.dir, "dev/pts")
if not os.path.exists(dst):
self.mkdir(dst)
if 'TMPDIR' in os.environ:
dst = self.dir + os.environ['TMPDIR']
if not os.path.exists(dst):
self.makedirs(dst)
def __enter__(self):
if self.initrd is not None:
self.log.debug("extracting initrd %s", self.initrd)
self._unpack()
self.log.debug("preparing chroot in %s", self.dir)
try:
self.log = self.ilog
self._prepDirs()
finally:
self.log = self.hlog
dst = os.path.join(self.dir, "proc")
cmd = ('mount', '-t', 'proc', 'proc', dst,)
self.check_call(cmd, vmode=self.V1)
dst = os.path.join(self.dir, "sys")
cmd = ('mount', '-t', 'sysfs', 'sysfs', dst,)
self.check_call(cmd, vmode=self.V1)
dst = os.path.join(self.dir, "dev/pts")
cmd = ('mount', '-t', 'devpts', 'devpts', dst,)
self.check_call(cmd, vmode=self.V1)
return self
def __exit__(self, type, value, tb):
p = ProcMountsParser()
dirs = [e.dir for e in p.mounts if e.dir.startswith(self.dir)]
# XXX probabaly also kill files here
# umount any nested mounts
self.log.debug("un-mounting mounts points in chroot %s", self.dir)
dirs.sort(reverse=True)
for p in dirs:
cmd = ('umount', p,)
self.check_call(cmd, vmode=self.V1)
if self.initrd is not None:
self.log.debug("cleaning up chroot in %s", self.dir)
self.rmtree(self.dir)
else:
self.log.debug("saving chroot in %s", self.dir)
return False
@classmethod
def mkChroot(self, initrd, log=None):
with InitrdContext(initrd=initrd, log=log) as ctx:
initrdDir = ctx.dir
ctx.initrd = None
# save the unpacked directory, do not clean it up
# (it's inside this chroot anyway)
return initrdDir
class ChrootSubprocessMixin:
chrootDir = None
mounted = False
# initialize this in a concrete class
def check_call(self, *args, **kwargs):
args = list(args)
kwargs = dict(kwargs)
cwd = kwargs.pop('cwd', None)
if cwd is not None:
self.log.debug("+ cd " + cwd)
if args:
cmd = args.pop(0)
else:
cmd = kwargs.pop('cmd')
if isinstance(cmd, basestring):
cmd = ('chroot', self.chrootDir,
'/bin/sh', '-c', 'IFS=;' + cmd,)
else:
cmd = ['chroot', self.chrootDir,] + list(cmd)
if not self.mounted:
with InitrdContext(dir=self.chrootDir, log=self.log) as ctx:
self.log.debug("+ " + " ".join(cmd))
subprocess.check_call(cmd, *args, cwd=cwd, **kwargs)
else:
self.log.debug("+ " + " ".join(cmd))
subprocess.check_call(cmd, *args, cwd=cwd, **kwargs)
def check_output(self, *args, **kwargs):
args = list(args)
kwargs = dict(kwargs)
cwd = kwargs.pop('cwd', None)
if cwd is not None:
self.log.debug("+ cd " + cwd)
if args:
cmd = args.pop(0)
else:
cmd = kwargs.pop('cmd')
if isinstance(cmd, basestring):
cmd = ('chroot', self.chrootDir,
'/bin/sh', '-c', 'IFS=;' + cmd,)
else:
cmd = ['chroot', self.chrootDir,] + list(cmd)
if not self.mounted:
with InitrdContext(self.chrootDir, log=self.log) as ctx:
self.log.debug("+ " + " ".join(cmd))
return subprocess.check_output(cmd, *args, cwd=cwd, **kwargs)
else:
self.log.debug("+ " + " ".join(cmd))
return subprocess.check_output(cmd, *args, cwd=cwd, **kwargs)

View File

@@ -0,0 +1,81 @@
"""RecoverApp.py
Application-level code for Switch Light recovery.
"""
import os, sys
import imp
import logging
from ConfUtils import UbootEnv
class App:
def __init__(self, log=None):
if log is not None:
self.log = log
else:
self.log = logging.getLogger(self.__class__.__name__)
self.recovery = None
def run(self):
if os.path.exists(UbootEnv.SETENV):
self.ubootEnv = UbootEnv(log=self.log.getChild("u-boot"))
else:
self.ubootEnv = None
# load the platform-specific blob
if os.path.exists("/etc/onl/platform"):
with open("/etc/onl/platform") as fd:
plat = fd.read().strip()
else:
self.log.error("cannot recover non-ONL platform")
return 1
p = ("/lib/platform-config/%s/python/recover.py"
% (plat,))
if not os.path.exists(p):
self.log.error("missing recover profile %s", p)
return 1
mod = imp.load_source("platform_recover", p)
# run the platform-specific installer
self.recovery = mod.Recovery(ubootEnv=self.ubootEnv,
log=self.log)
try:
code = self.recovery.run()
except:
self.log.exception("recovery failed")
code = 1
if code: return code
return 0
def shutdown(self):
recovery, self.recovery = self.recovery, None
if recovery is not None:
recovery.shutdown()
@classmethod
def main(cls):
logging.basicConfig()
logger = logging.getLogger("recover")
logger.setLevel(logging.DEBUG)
app = cls(log=logger)
try:
code = app.run()
except:
logger.exception("runner failed")
code = 1
app.shutdown()
sys.exit(code)
main = App.main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,251 @@
"""ShellApp.py
"""
import os, sys
import glob
import tempfile
import logging
import subprocess
import argparse
import string
import struct
from InstallUtils import InitrdContext, MountContext
from InstallUtils import SubprocessMixin
from InstallUtils import ProcMountsParser, ProcMtdParser
import Fit
class AppBase(SubprocessMixin):
@property
def PROG(self):
raise NotImplementedError
def __init__(self, command=None, log=None):
if log is not None:
self.log = log
else:
self.log = logging.getLogger(self.__class__.__name__)
self.command = command
def _runInitrdShell(self, initrd):
with InitrdContext(initrd=initrd, log=self.log) as ctx:
if self.command is not None:
cmd = ('chroot', ctx.dir,
'/bin/sh', '-c', 'IFS=;' + self.command)
else:
cmd = ('chroot', ctx.dir,
'/bin/sh', '-i')
try:
self.check_call(cmd)
except subprocess.CalledProcessError, what:
pass
return 0
def _runMtdShell(self, device):
self.log.debug("parsing FIT image in %s", device)
p = Fit.Parser(path=device, log=self.log)
node = p.getInitrdNode()
if node is None:
self.log.error("cannot find initrd node in FDT")
return 1
prop = node.properties.get('data', None)
if prop is None:
self.log.error("cannot find initrd data property in FDT")
return 1
with open(device) as fd:
self.log.debug("reading initrd at [%x:%x]",
prop.offset, prop.offset+prop.sz)
fd.seek(prop.offset, 0)
buf = fd.read(prop.sz)
try:
fno, initrd = tempfile.mkstemp(prefix="initrd-",
suffix=".img")
self.log.debug("+ cat > %s", initrd)
with os.fdopen(fno, "w") as fd:
fd.write(buf)
return self._runInitrdShell(initrd)
finally:
self.unlink(initrd)
def shutdown(self):
pass
@classmethod
def main(cls):
logging.basicConfig()
logger = logging.getLogger(cls.PROG)
logger.setLevel(logging.INFO)
ap = argparse.ArgumentParser(prog=cls.PROG)
ap.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose logging')
ap.add_argument('-q', '--quiet', action='store_true',
help='Suppress logging')
ap.add_argument('-c', type=str, dest='command',
help='Run a batch command')
try:
args = ap.parse_args()
except SystemExit, what:
sys.exit(what.code)
if args.verbose:
logger.setLevel(logging.DEBUG)
if args.quiet:
logger.setLevel(logging.ERROR)
app = cls(command=args.command, log=logger)
try:
code = app.run()
except:
logger.exception("runner failed")
code = 1
app.shutdown()
sys.exit(code)
class Onie(AppBase):
PROG = "onie-shell"
def run(self):
def _g(d):
pat = os.path.join(d, "onie/initrd.img*")
l = glob.glob(pat)
if l: return l[0]
return None
# try to find onie initrd on a mounted fs (GRUB)
initrd = _g("/mnt/onie-boot")
if initrd is not None:
self.log.debug("found ONIE initrd at %s", initrd)
return self._runInitrdShell(initrd)
# try to find the onie boot partition elsewhere
pm = ProcMountsParser()
try:
dev = self.check_output(('blkid', '-L', 'ONIE-BOOT',)).strip()
except subprocess.CalledProcessError, what:
dev = None
if dev is not None:
self.log.debug("found ONIE boot device %s", dev)
parts = [p for p in pm.mounts if p.device == dev]
if parts:
onieDir = parts[0]
self.log.debug("found ONIE boot mounted at %s", onieDir)
initrd = _g(onieDir)
if initrd is not None:
self.log.debug("found ONIE initrd at %s", initrd)
return _runInitrdShell(initrd)
else:
self.log.error("cannot find ONIE initrd")
return 1
else:
with MountContext(dev, fsType='ext4', log=self.log) as ctx:
initrd = _g(ctx.dir)
if initrd is not None:
self.log.debug("found ONIE initrd at %s", initrd)
return self._runInitrdShell(initrd)
else:
self.log.error("cannot find ONIE initrd")
return 1
# grovel through MTD devices (u-boot)
pm = ProcMtdParser(log=self.log)
parts = [p for p in pm.parts if p.label == "onie"]
if parts:
part = parts[0]
self.log.debug("found ONIE MTD device %s",
part.charDevice or part.blockDevice)
return self._runMtdShell(part.blockDevice)
elif pm.parts:
self.log.error("cannot find ONIE MTD device")
return 1
self.log.error("cannot find ONIE initrd")
return 1
class Loader(AppBase):
PROG = "loader-shell"
def run(self):
def _g(d):
pat = os.path.join(d, "initrd-*")
l = glob.glob(pat)
if l: return l[0]
return None
# try to find the loader boot partition as a formatted block device
pm = ProcMountsParser()
try:
dev = self.check_output(('blkid', '-L', 'SL-BOOT',)).strip()
except subprocess.CalledProcessError, what:
dev = None
if dev is not None:
self.log.debug("found loader device %s", dev)
parts = [p for p in pm.mounts if p.device == dev]
if parts:
loaderDir = parts[0]
self.log.debug("found loader device mounted at %s", loaderDir)
initrd = _g(loaderDir)
if initrd is not None:
self.log.debug("found loader initrd at %s", initrd)
return _runInitrdShell(initrd)
else:
self.log.error("cannot find loader initrd")
return 1
else:
with MountContext(dev, fsType='ext4', log=self.log) as ctx:
initrd = _g(ctx.dir)
if initrd is not None:
self.log.debug("found loader initrd at %s", initrd)
return self._runInitrdShell(initrd)
else:
self.log.error("cannot find loader initrd")
return 1
# try to find the loader partition on the same desk as /mnt/flash
try:
flashDev = self.check_output(('blkid', '-L', 'FLASH',)).strip()
except subprocess.CalledProcessError, what:
flashDev = None
if flashDev is not None:
self.log.debug("found flash device hint %s", flashDev)
loaderDev = flashDev
while loaderDev and loaderDev[-1] in string.digits:
loaderDev = loaderDev[:-1]
loaderDev = loaderDev + '1'
with open(loaderDev) as fd:
buf = fd.read(4)
magic = struct.unpack(">I", buf)[0]
if magic == Fit.Parser.FDT_MAGIC:
self.log.debug("found loader device %s", loaderDev)
return self._runMtdShell(loaderDev)
else:
self.log.error("bad FDT signature on %s %x",
loaderDev, magic)
return 1
# grovel through MTD devices (u-boot)
pm = ProcMtdParser(log=self.log)
parts = [p for p in pm.parts if p.label == "sl-boot"]
if parts:
part = parts[0]
self.log.debug("found loader MTD device %s",
part.charDevice or part.blockDevice)
return self._runMtdShell(part.blockDevice)
elif pm.parts:
self.log.error("cannot find loader MTD device")
return 1
self.log.error("cannot find loader initrd")
return 1
main = Onie.main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,4 @@
"""__init__.py
Module setup for switchlight.install
"""