Refactor file gathering logic and implement additional logging

This commit is contained in:
Hoang Hong Quan
2025-03-23 01:45:40 +07:00
parent 9e51313b4f
commit 2bbbfd4e77
2 changed files with 158 additions and 86 deletions

View File

@@ -30,15 +30,6 @@ class OCPE:
self.u = utils.Utils()
self.result_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "Results")
def gathering_files(self, macos_version):
self.u.head("Gathering Files")
print("")
print("Please wait for download OpenCorePkg, kexts and macserial...")
print("")
self.o.get_bootloader_kexts_data(self.k.kexts)
self.o.gather_bootloader_kexts(self.k.kexts, macos_version)
def select_hardware_report(self):
self.hardware_sniffer = self.o.gather_hardware_sniffer()
self.ac.dsdt = self.ac.acpi.acpi_tables = None
@@ -368,7 +359,9 @@ class OCPE:
smbios_model = self.s.customize_smbios_model(customized_hardware, smbios_model, macos_version)
self.s.smbios_specific_options(customized_hardware, smbios_model, macos_version, self.ac.patches, self.k)
elif option == 6:
self.gathering_files(macos_version)
if not self.o.gather_bootloader_kexts(self.k.kexts, macos_version):
continue
self.build_opencore_efi(customized_hardware, disabled_devices, smbios_model, macos_version, needs_oclp)
self.results(customized_hardware, smbios_model, self.k.kexts)

View File

@@ -6,6 +6,9 @@ import os
import tempfile
import shutil
import subprocess
import platform
os_name = platform.system()
class gatheringFiles:
def __init__(self):
@@ -23,9 +26,9 @@ class gatheringFiles:
self.bootloader_kexts_data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "bootloader_kexts_data.json")
self.download_history_file = os.path.join(self.ock_files_dir, "history.json")
def get_product_index(self, product_list, target_product_name):
def get_product_index(self, product_list, product_name_name):
for index, product in enumerate(product_list):
if target_product_name == product.get("product_name"):
if product_name_name == product.get("product_name"):
return index
return None
@@ -86,13 +89,15 @@ class gatheringFiles:
if not os.path.exists(self.temporary_dir):
raise FileNotFoundError("The directory {} does not exist.".format(self.temporary_dir))
temp_product_dir = os.path.join(self.temporary_dir, product_name)
if not "OpenCore" in product_name:
kext_paths = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".kext")
kext_paths = self.utils.find_matching_paths(temp_product_dir, extension_filter=".kext")
for kext_path, type in kext_paths:
source_kext_path = os.path.join(self.temporary_dir, product_name, kext_path)
destination_kext_path = os.path.join(self.ock_files_dir, product_name, os.path.basename(kext_path))
if "debug" in kext_path.lower() or "Contents" in kext_path or not self.kext.process_kext(os.path.join(self.temporary_dir, product_name), kext_path):
if "debug" in kext_path.lower() or "Contents" in kext_path or not self.kext.process_kext(temp_product_dir, kext_path):
continue
shutil.move(source_kext_path, destination_kext_path)
@@ -105,22 +110,28 @@ class gatheringFiles:
destination_config_path = os.path.join(destination_efi_path, "OC", "config.plist")
shutil.move(source_config_path, destination_config_path)
ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData", "OcBinaryData-master")
ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData", "OcBinaryData-master")
if os.path.exists(ocbinarydata_dir):
background_picker_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "datasets", "background_picker.icns")
if os.path.exists(ocbinarydata_dir):
for name in os.listdir(ocbinarydata_dir):
if name.startswith("."):
continue
shutil.copytree(os.path.join(ocbinarydata_dir, name), os.path.join(destination_efi_path, "OC", name), dirs_exist_ok=True)
resources_image_dir = os.path.join(destination_efi_path, "OC", "Resources", "Image")
product_dir = os.path.join(self.ock_files_dir, product_name)
efi_dirs = self.utils.find_matching_paths(product_dir, name_filter="EFI", type_filter="dir")
for efi_dir, _ in efi_dirs:
for dir_name in os.listdir(ocbinarydata_dir):
source_dir = os.path.join(ocbinarydata_dir, dir_name)
destination_dir = os.path.join(destination_efi_path, "OC", dir_name)
if os.path.isdir(destination_dir):
shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True)
resources_image_dir = os.path.join(product_dir, efi_dir, "OC", "Resources", "Image")
picker_variants = self.utils.find_matching_paths(resources_image_dir, type_filter="dir")
for picker_variant, type in picker_variants:
for picker_variant, _ in picker_variants:
if ".icns" in ", ".join(os.listdir(os.path.join(resources_image_dir, picker_variant))):
shutil.copy(background_picker_path, os.path.join(resources_image_dir, picker_variant, "Background.icns"))
macserial_paths = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), name_filter="macserial", type_filter="file")
macserial_paths = self.utils.find_matching_paths(temp_product_dir, name_filter="macserial", type_filter="file")
if macserial_paths:
for macserial_path, type in macserial_paths:
for macserial_path, _ in macserial_paths:
source_macserial_path = os.path.join(self.temporary_dir, product_name, macserial_path)
destination_macserial_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.path.basename(macserial_path))
shutil.move(source_macserial_path, destination_macserial_path)
@@ -130,18 +141,21 @@ class gatheringFiles:
return True
def gather_bootloader_kexts(self, kexts, macos_version):
self.utils.head("Gathering Files")
print("")
print("Please wait for download OpenCorePkg, kexts and macserial...")
download_history = self.utils.read_file(self.download_history_file)
if not isinstance(download_history, list):
download_history = []
bootloader_kext_urls = self.utils.read_file(self.bootloader_kexts_data_path)
if not isinstance(bootloader_kext_urls, list):
bootloader_kext_urls = self.get_bootloader_kexts_data(kexts)
bootloader_kext_urls = self.get_bootloader_kexts_data(kexts)
self.utils.create_folder(self.temporary_dir)
seen_download_urls = set()
for product in kexts + [{"Name": "OpenCorePkg"}]:
if not isinstance(product, dict) and not product.checked:
continue
@@ -165,24 +179,64 @@ class gatheringFiles:
product_name = "IntelBluetoothFirmware"
elif product_name.startswith("VoodooI2C"):
product_name = "VoodooI2C"
try:
product_index = self.get_product_index(bootloader_kext_urls, product_name)
product_download_url = bootloader_kext_urls[product_index].get("url")
product_id = bootloader_kext_urls[product_index].get("id")
except:
continue
history_index = self.get_product_index(download_history, product_name)
if history_index is not None and product_id == download_history[history_index].get("id"):
continue
asset_dir = os.path.join(self.ock_files_dir, product_name)
self.utils.create_folder(asset_dir, remove_content=True)
product_download_index = self.get_product_index(bootloader_kext_urls, product_name)
if product_download_index is None:
if product.github_repo:
product_download_index = self.get_product_index(bootloader_kext_urls, product.github_repo.get("repo"))
if product_download_index:
_, product_id, product_download_url = bootloader_kext_urls[product_download_index].values()
if product_download_url in seen_download_urls:
continue
seen_download_urls.add(product_download_url)
else:
product_id = product_download_url = None
product_history_index = self.get_product_index(download_history, product_name)
print("")
if product_history_index == None:
print("Please wait for download {}...".format(product_name))
else:
if product_id == download_history[product_history_index].get("id"):
print("Latest version of {} already downloaded.".format(product_name))
continue
else:
print("Updating {}...".format(product_name))
if product_download_url:
print("from " + product_download_url)
else:
print("Could not find download URL for {}.".format(product_name))
print("Please try again later.")
print("")
self.utils.request_input()
shutil.rmtree(self.temporary_dir, ignore_errors=True)
return False
print("")
zip_path = os.path.join(self.temporary_dir, product_name) + ".zip"
self.fetcher.download_and_save_file(product_download_url, zip_path)
if not os.path.exists(zip_path):
if product_history_index:
print("Using previously version of {}.".format(product_name))
continue
else:
print("")
print("Could not download {} at this time.".format(product_name))
print("Please try again later.")
print("")
self.utils.request_input()
shutil.rmtree(self.temporary_dir, ignore_errors=True)
return False
self.utils.extract_zip_file(zip_path)
asset_dir = os.path.join(self.ock_files_dir, product_name)
self.utils.create_folder(asset_dir, remove_content=True)
while True:
zip_files = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".zip")
@@ -196,21 +250,37 @@ class gatheringFiles:
os.remove(full_zip_path)
if "OpenCore" in product_name:
ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData")
if not os.path.exists(ocbinarydata_dir):
zip_path = ocbinarydata_dir + ".zip"
self.fetcher.download_and_save_file(self.ocbinarydata_url, zip_path)
self.utils.extract_zip_file(zip_path)
zip_path = os.path.join(self.temporary_dir, "OcBinaryData.zip")
print("")
print("Please wait for download OcBinaryData...")
print("from " + self.ocbinarydata_url)
print("")
self.fetcher.download_and_save_file(self.ocbinarydata_url, zip_path)
if not os.path.exists(zip_path):
print("")
print("Could not download OcBinaryData at this time.")
print("Please try again later.")
print("")
self.utils.request_input()
shutil.rmtree(self.temporary_dir, ignore_errors=True)
return False
self.utils.extract_zip_file(zip_path)
if self.move_bootloader_kexts_to_product_directory(product_name):
if history_index is None:
download_history.append({"product_name": product_name, "id": product_id})
if product_history_index is None:
download_history.append({
"product_name": product_name,
"id": product_id
})
else:
download_history[history_index]["id"] = product_id
download_history[product_history_index]["id"] = product_id
self.utils.write_file(self.download_history_file, download_history)
shutil.rmtree(self.temporary_dir, ignore_errors=True)
return True
def get_kernel_patches(self, patches_name, patches_url):
try:
@@ -218,17 +288,17 @@ class gatheringFiles:
return response["Kernel"]["Patch"]
except:
print("\n")
print("")
print("Unable to download {} at this time".format(patches_name))
print("from " + patches_url)
print("")
print("Please try again later or apply them manually. ", end="")
self.utils.request_input()
print("Please try again later or apply them manually.")
print("")
self.utils.request_input()
return []
def gather_hardware_sniffer(self):
if os.name != "nt":
if os_name != "Windows":
return
self.utils.head("Gathering Files")
@@ -236,47 +306,56 @@ class gatheringFiles:
print("Please wait for download Hardware Sniffer")
print("")
hardware_sniffer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "Hardware-Sniffer-CLI.exe")
product_name = "Hardware-Sniffer-CLI.exe"
hardware_sniffer_cli = None
hardware_sniffer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), product_name)
try:
download_history = self.utils.read_file(self.download_history_file)
download_history = self.utils.read_file(self.download_history_file)
if not isinstance(download_history, list):
download_history = []
if not isinstance(download_history, list):
download_history = []
latest_release = self.github.get_latest_release("lzhoang2801", "Hardware-Sniffer") or {}
product_id = product_download_url = None
for product in latest_release.get("assets"):
if product.get("product_name") == "Hardware-Sniffer-CLI":
hardware_sniffer_cli = product
latest_release = self.github.get_latest_release("lzhoang2801", "Hardware-Sniffer") or {}
history_index = self.get_product_index(download_history, "Hardware-Sniffer-CLI")
if history_index is not None and hardware_sniffer_cli.get("id") == download_history[history_index].get("id"):
if os.path.exists(hardware_sniffer_path):
return hardware_sniffer_path
for product in latest_release.get("assets"):
if product.get("product_name") == product_name:
_, product_id, product_download_url = product.values()
product_history_index = self.get_product_index(download_history, product_name)
self.fetcher.download_and_save_file(hardware_sniffer_cli.get("url"), hardware_sniffer_path)
if not os.path.exists(hardware_sniffer_path):
return
if history_index is None:
download_history.append({"product_name": "Hardware-Sniffer-CLI", "id": hardware_sniffer_cli.get("id")})
print("")
if product_history_index == None:
print("Please wait for download {}...".format(product_name))
else:
if product_id == download_history[product_history_index].get("id") and os.path.exists(hardware_sniffer_path):
print("Latest version of {} already downloaded.".format(product_name))
return hardware_sniffer_path
else:
download_history[history_index]["id"] = hardware_sniffer_cli.get("id")
self.utils.create_folder(os.path.dirname(self.download_history_file))
self.utils.write_file(self.download_history_file, download_history)
print("Updating {}...".format(product_name))
return hardware_sniffer_path
except:
print("Could not complete download Hardware Sniffer.")
print("Please download Hardware Sniffer CLI manually and place it in \"Scripts\" folder.")
if hardware_sniffer_cli:
print("You can manually download \"Hardware-Sniffer-CLI.exe\" from:\n {}\n".format(hardware_sniffer_cli.get("url")))
print("Alternatively, export the hardware report manually to continue.")
if product_download_url:
print("from " + product_download_url)
else:
print("Could not find download URL for {}.".format(product_name))
print("Please try again later.")
print("")
self.utils.request_input()
return
return
print("")
self.fetcher.download_and_save_file(product_download_url, hardware_sniffer_path)
if product_history_index is None:
download_history.append({
"product_name": product_name,
"id": product_id
})
else:
download_history[product_history_index]["id"] = product_id
self.utils.create_folder(os.path.dirname(self.download_history_file))
self.utils.write_file(self.download_history_file, download_history)
return hardware_sniffer_path