mirror of
https://github.com/Telecominfraproject/openafc_final.git
synced 2025-11-01 10:37:51 +00:00
185 lines
6.1 KiB
Python
Executable File
185 lines
6.1 KiB
Python
Executable File
#!/usr/bin/python
|
|
''' This program takes a source RPM repository (or collection of packages)
|
|
An moves files out of the source tree into a destination directory based
|
|
on the package name and architecture.
|
|
'''
|
|
|
|
import os
|
|
import argparse
|
|
import re
|
|
import shutil
|
|
import rpm
|
|
import logging
|
|
import time
|
|
from .util import clear_path, find_bin
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def any_match(matchers, value):
|
|
''' Determine if a matcher function list is non-empty and has any matches
|
|
for a value. '''
|
|
return any(item(value) for item in matchers)
|
|
|
|
|
|
class Matcher(object):
|
|
''' Keep track of a named item to match, and whether or not any
|
|
candidate text has matched yet.
|
|
|
|
:param re_match: A regular expression pattern to match.
|
|
:type re_match: str
|
|
'''
|
|
|
|
def __init__(self, re_match):
|
|
self._re = re.compile('^' + re_match + '$')
|
|
self._matched = False
|
|
|
|
def matched(self):
|
|
''' Determine if this matcher has ever matched a value.
|
|
:return: True if any match has occurred.
|
|
'''
|
|
return self._matched
|
|
|
|
def __str__(self):
|
|
return self._re.pattern
|
|
|
|
def _check(self, text):
|
|
match = self._re.match(text)
|
|
return bool(match is not None)
|
|
|
|
def __call__(self, text):
|
|
if self._check(text):
|
|
self._matched = True
|
|
return True
|
|
return False
|
|
|
|
|
|
def main(argv):
|
|
parser = argparse.ArgumentParser(
|
|
argv[0],
|
|
description='Move RPM packages into a repository tree.'
|
|
)
|
|
parser.add_argument(
|
|
'--log-level',
|
|
dest='log_level',
|
|
default='info',
|
|
metavar='LEVEL',
|
|
help='Console logging lowest level displayed. Defaults to info.')
|
|
parser.add_argument(
|
|
'--include-pattern',
|
|
type=str,
|
|
action='append',
|
|
default=[],
|
|
help='A regexp pattern to determine if a package "name.arch" (not file name) is taken.')
|
|
parser.add_argument(
|
|
'--include-list',
|
|
type=str,
|
|
help='A text file containing an include pattern on each line.')
|
|
parser.add_argument('--ignore-dupes', action='store_true', default=False,
|
|
help='Ignore duplicate versions of a package name.')
|
|
parser.add_argument(
|
|
'--ignore-input-signatures',
|
|
action='store_true',
|
|
default=False,
|
|
help='Ignore package signatures on inputs. Packages may be re-signed and those signatures lost anyway.')
|
|
parser.add_argument('inpath', type=str, nargs='+',
|
|
help='The input path to scan for files.')
|
|
parser.add_argument('outpath', type=str,
|
|
help='The output directory to move files into.')
|
|
args = parser.parse_args(argv[1:])
|
|
|
|
log_level_name = args.log_level.upper()
|
|
logging.basicConfig(level=log_level_name, format='%(message)s')
|
|
|
|
# include if matching
|
|
pkg_incl = []
|
|
for pat in args.include_pattern:
|
|
pkg_incl.append(Matcher(pat))
|
|
if args.include_list:
|
|
with open(args.include_list, 'r') as listfile:
|
|
for line in listfile:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
pkg_incl.append(Matcher(line))
|
|
# exclude if matching
|
|
pkg_excl = []
|
|
|
|
# read package name, not file name
|
|
rpm_trans = rpm.TransactionSet()
|
|
if args.ignore_input_signatures:
|
|
rpm_trans.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
|
|
|
|
# Clean up first
|
|
tgtpath = args.outpath
|
|
if not os.path.exists(tgtpath):
|
|
os.makedirs(tgtpath)
|
|
LOGGER.info('Clearing output: %s', tgtpath)
|
|
clear_path(tgtpath)
|
|
|
|
# Error on duplicates
|
|
seen_fullnames = {}
|
|
|
|
for inpath in args.inpath:
|
|
LOGGER.info('Scanning input: %s', inpath)
|
|
for (dirpath, dirnames, filenames) in os.walk(inpath):
|
|
for filename in filenames:
|
|
# Only care about RPM files
|
|
if not filename.endswith('.rpm'):
|
|
continue
|
|
|
|
# get package info
|
|
src_path = os.path.join(dirpath, filename)
|
|
rel_path = os.path.relpath(src_path, inpath)
|
|
with open(src_path, 'rb') as rpmfile:
|
|
try:
|
|
head = rpm_trans.hdrFromFdno(rpmfile.fileno())
|
|
except rpm.error as err:
|
|
LOGGER.warning('Bad header in "%s"', rel_path)
|
|
raise
|
|
pkgname = head[rpm.RPMTAG_NAME]
|
|
# source rpms have no source name,
|
|
# so use the same name as 'yum' does
|
|
if not head[rpm.RPMTAG_SOURCERPM]:
|
|
archname = 'src'
|
|
else:
|
|
archname = head[rpm.RPMTAG_ARCH]
|
|
|
|
fullname = '{0}.{1}'.format(pkgname, archname)
|
|
LOGGER.info('File %s name %s', rel_path, fullname)
|
|
|
|
if not any_match(pkg_incl, fullname):
|
|
LOGGER.info('Ignoring name %s', fullname)
|
|
continue
|
|
if pkg_excl and any_match(pkg_excl, fullname):
|
|
LOGGER.info('Excluding name %s', fullname)
|
|
continue
|
|
|
|
# check for dupes
|
|
if fullname in seen_fullnames and not args.ignore_dupes:
|
|
raise ValueError(
|
|
'Duplicate package "{0}" in "{1}" and "{2}"'.format(
|
|
fullname, rel_path, seen_fullnames[fullname]))
|
|
seen_fullnames[fullname] = rel_path
|
|
|
|
# move, preserving permission/time and relative tree structure
|
|
dst_path = os.path.join(tgtpath, rel_path)
|
|
dst_dir = os.path.dirname(dst_path)
|
|
if not os.path.exists(dst_dir):
|
|
os.makedirs(dst_dir)
|
|
LOGGER.info('Moving %s to %s', fullname, dst_path)
|
|
shutil.move(src_path, dst_path)
|
|
|
|
unmatched = set()
|
|
for matcher in pkg_incl:
|
|
if not matcher.matched():
|
|
unmatched.add(str(matcher))
|
|
if unmatched:
|
|
raise RuntimeError(
|
|
'Unmatched include names: {0}'.format(', '.join(unmatched)))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
sys.exit(main(sys.argv))
|