From 2bbbfd4e775b5c6f184583177110fb517c7ec989 Mon Sep 17 00:00:00 2001 From: Hoang Hong Quan Date: Sun, 23 Mar 2025 01:45:40 +0700 Subject: [PATCH] Refactor file gathering logic and implement additional logging --- OpCore-Simplify.py | 13 +-- Scripts/gathering_files.py | 231 +++++++++++++++++++++++++------------ 2 files changed, 158 insertions(+), 86 deletions(-) diff --git a/OpCore-Simplify.py b/OpCore-Simplify.py index 2176bdf..e5ed744 100644 --- a/OpCore-Simplify.py +++ b/OpCore-Simplify.py @@ -30,15 +30,6 @@ class OCPE: self.u = utils.Utils() self.result_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "Results") - def gathering_files(self, macos_version): - self.u.head("Gathering Files") - print("") - print("Please wait for download OpenCorePkg, kexts and macserial...") - print("") - - self.o.get_bootloader_kexts_data(self.k.kexts) - self.o.gather_bootloader_kexts(self.k.kexts, macos_version) - def select_hardware_report(self): self.hardware_sniffer = self.o.gather_hardware_sniffer() self.ac.dsdt = self.ac.acpi.acpi_tables = None @@ -368,7 +359,9 @@ class OCPE: smbios_model = self.s.customize_smbios_model(customized_hardware, smbios_model, macos_version) self.s.smbios_specific_options(customized_hardware, smbios_model, macos_version, self.ac.patches, self.k) elif option == 6: - self.gathering_files(macos_version) + if not self.o.gather_bootloader_kexts(self.k.kexts, macos_version): + continue + self.build_opencore_efi(customized_hardware, disabled_devices, smbios_model, macos_version, needs_oclp) self.results(customized_hardware, smbios_model, self.k.kexts) diff --git a/Scripts/gathering_files.py b/Scripts/gathering_files.py index e42061d..48935f1 100644 --- a/Scripts/gathering_files.py +++ b/Scripts/gathering_files.py @@ -6,6 +6,9 @@ import os import tempfile import shutil import subprocess +import platform + +os_name = platform.system() class gatheringFiles: def __init__(self): @@ -23,9 +26,9 @@ class gatheringFiles: self.bootloader_kexts_data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "bootloader_kexts_data.json") self.download_history_file = os.path.join(self.ock_files_dir, "history.json") - def get_product_index(self, product_list, target_product_name): + def get_product_index(self, product_list, product_name_name): for index, product in enumerate(product_list): - if target_product_name == product.get("product_name"): + if product_name_name == product.get("product_name"): return index return None @@ -86,13 +89,15 @@ class gatheringFiles: if not os.path.exists(self.temporary_dir): raise FileNotFoundError("The directory {} does not exist.".format(self.temporary_dir)) + temp_product_dir = os.path.join(self.temporary_dir, product_name) + if not "OpenCore" in product_name: - kext_paths = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".kext") + kext_paths = self.utils.find_matching_paths(temp_product_dir, extension_filter=".kext") for kext_path, type in kext_paths: source_kext_path = os.path.join(self.temporary_dir, product_name, kext_path) destination_kext_path = os.path.join(self.ock_files_dir, product_name, os.path.basename(kext_path)) - if "debug" in kext_path.lower() or "Contents" in kext_path or not self.kext.process_kext(os.path.join(self.temporary_dir, product_name), kext_path): + if "debug" in kext_path.lower() or "Contents" in kext_path or not self.kext.process_kext(temp_product_dir, kext_path): continue shutil.move(source_kext_path, destination_kext_path) @@ -105,22 +110,28 @@ class gatheringFiles: destination_config_path = os.path.join(destination_efi_path, "OC", "config.plist") shutil.move(source_config_path, destination_config_path) - ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData", "OcBinaryData-master") + ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData", "OcBinaryData-master") + if os.path.exists(ocbinarydata_dir): background_picker_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "datasets", "background_picker.icns") - if os.path.exists(ocbinarydata_dir): - for name in os.listdir(ocbinarydata_dir): - if name.startswith("."): - continue - shutil.copytree(os.path.join(ocbinarydata_dir, name), os.path.join(destination_efi_path, "OC", name), dirs_exist_ok=True) - resources_image_dir = os.path.join(destination_efi_path, "OC", "Resources", "Image") + product_dir = os.path.join(self.ock_files_dir, product_name) + efi_dirs = self.utils.find_matching_paths(product_dir, name_filter="EFI", type_filter="dir") + + for efi_dir, _ in efi_dirs: + for dir_name in os.listdir(ocbinarydata_dir): + source_dir = os.path.join(ocbinarydata_dir, dir_name) + destination_dir = os.path.join(destination_efi_path, "OC", dir_name) + if os.path.isdir(destination_dir): + shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True) + + resources_image_dir = os.path.join(product_dir, efi_dir, "OC", "Resources", "Image") picker_variants = self.utils.find_matching_paths(resources_image_dir, type_filter="dir") - for picker_variant, type in picker_variants: + for picker_variant, _ in picker_variants: if ".icns" in ", ".join(os.listdir(os.path.join(resources_image_dir, picker_variant))): shutil.copy(background_picker_path, os.path.join(resources_image_dir, picker_variant, "Background.icns")) - macserial_paths = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), name_filter="macserial", type_filter="file") + macserial_paths = self.utils.find_matching_paths(temp_product_dir, name_filter="macserial", type_filter="file") if macserial_paths: - for macserial_path, type in macserial_paths: + for macserial_path, _ in macserial_paths: source_macserial_path = os.path.join(self.temporary_dir, product_name, macserial_path) destination_macserial_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.path.basename(macserial_path)) shutil.move(source_macserial_path, destination_macserial_path) @@ -130,18 +141,21 @@ class gatheringFiles: return True def gather_bootloader_kexts(self, kexts, macos_version): + self.utils.head("Gathering Files") + print("") + print("Please wait for download OpenCorePkg, kexts and macserial...") + download_history = self.utils.read_file(self.download_history_file) if not isinstance(download_history, list): download_history = [] - bootloader_kext_urls = self.utils.read_file(self.bootloader_kexts_data_path) - - if not isinstance(bootloader_kext_urls, list): - bootloader_kext_urls = self.get_bootloader_kexts_data(kexts) + bootloader_kext_urls = self.get_bootloader_kexts_data(kexts) self.utils.create_folder(self.temporary_dir) + seen_download_urls = set() + for product in kexts + [{"Name": "OpenCorePkg"}]: if not isinstance(product, dict) and not product.checked: continue @@ -165,24 +179,64 @@ class gatheringFiles: product_name = "IntelBluetoothFirmware" elif product_name.startswith("VoodooI2C"): product_name = "VoodooI2C" - - try: - product_index = self.get_product_index(bootloader_kext_urls, product_name) - product_download_url = bootloader_kext_urls[product_index].get("url") - product_id = bootloader_kext_urls[product_index].get("id") - except: - continue - - history_index = self.get_product_index(download_history, product_name) - if history_index is not None and product_id == download_history[history_index].get("id"): - continue - asset_dir = os.path.join(self.ock_files_dir, product_name) - self.utils.create_folder(asset_dir, remove_content=True) + product_download_index = self.get_product_index(bootloader_kext_urls, product_name) + if product_download_index is None: + if product.github_repo: + product_download_index = self.get_product_index(bootloader_kext_urls, product.github_repo.get("repo")) + + if product_download_index: + _, product_id, product_download_url = bootloader_kext_urls[product_download_index].values() + + if product_download_url in seen_download_urls: + continue + seen_download_urls.add(product_download_url) + else: + product_id = product_download_url = None + + product_history_index = self.get_product_index(download_history, product_name) + + print("") + if product_history_index == None: + print("Please wait for download {}...".format(product_name)) + else: + if product_id == download_history[product_history_index].get("id"): + print("Latest version of {} already downloaded.".format(product_name)) + continue + else: + print("Updating {}...".format(product_name)) + + if product_download_url: + print("from " + product_download_url) + else: + print("Could not find download URL for {}.".format(product_name)) + print("Please try again later.") + print("") + self.utils.request_input() + shutil.rmtree(self.temporary_dir, ignore_errors=True) + return False + print("") zip_path = os.path.join(self.temporary_dir, product_name) + ".zip" self.fetcher.download_and_save_file(product_download_url, zip_path) + + if not os.path.exists(zip_path): + if product_history_index: + print("Using previously version of {}.".format(product_name)) + continue + else: + print("") + print("Could not download {} at this time.".format(product_name)) + print("Please try again later.") + print("") + self.utils.request_input() + shutil.rmtree(self.temporary_dir, ignore_errors=True) + return False + self.utils.extract_zip_file(zip_path) + + asset_dir = os.path.join(self.ock_files_dir, product_name) + self.utils.create_folder(asset_dir, remove_content=True) while True: zip_files = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".zip") @@ -196,21 +250,37 @@ class gatheringFiles: os.remove(full_zip_path) if "OpenCore" in product_name: - ocbinarydata_dir = os.path.join(self.temporary_dir, "OcBinaryData") - if not os.path.exists(ocbinarydata_dir): - zip_path = ocbinarydata_dir + ".zip" - self.fetcher.download_and_save_file(self.ocbinarydata_url, zip_path) - self.utils.extract_zip_file(zip_path) + zip_path = os.path.join(self.temporary_dir, "OcBinaryData.zip") + print("") + print("Please wait for download OcBinaryData...") + print("from " + self.ocbinarydata_url) + print("") + self.fetcher.download_and_save_file(self.ocbinarydata_url, zip_path) + + if not os.path.exists(zip_path): + print("") + print("Could not download OcBinaryData at this time.") + print("Please try again later.") + print("") + self.utils.request_input() + shutil.rmtree(self.temporary_dir, ignore_errors=True) + return False + + self.utils.extract_zip_file(zip_path) if self.move_bootloader_kexts_to_product_directory(product_name): - if history_index is None: - download_history.append({"product_name": product_name, "id": product_id}) + if product_history_index is None: + download_history.append({ + "product_name": product_name, + "id": product_id + }) else: - download_history[history_index]["id"] = product_id + download_history[product_history_index]["id"] = product_id self.utils.write_file(self.download_history_file, download_history) shutil.rmtree(self.temporary_dir, ignore_errors=True) + return True def get_kernel_patches(self, patches_name, patches_url): try: @@ -218,17 +288,17 @@ class gatheringFiles: return response["Kernel"]["Patch"] except: - print("\n") + print("") print("Unable to download {} at this time".format(patches_name)) print("from " + patches_url) print("") - print("Please try again later or apply them manually. ", end="") - self.utils.request_input() + print("Please try again later or apply them manually.") print("") + self.utils.request_input() return [] def gather_hardware_sniffer(self): - if os.name != "nt": + if os_name != "Windows": return self.utils.head("Gathering Files") @@ -236,47 +306,56 @@ class gatheringFiles: print("Please wait for download Hardware Sniffer") print("") - hardware_sniffer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "Hardware-Sniffer-CLI.exe") + product_name = "Hardware-Sniffer-CLI.exe" - hardware_sniffer_cli = None + hardware_sniffer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), product_name) - try: - download_history = self.utils.read_file(self.download_history_file) + download_history = self.utils.read_file(self.download_history_file) - if not isinstance(download_history, list): - download_history = [] + if not isinstance(download_history, list): + download_history = [] - latest_release = self.github.get_latest_release("lzhoang2801", "Hardware-Sniffer") or {} + product_id = product_download_url = None - for product in latest_release.get("assets"): - if product.get("product_name") == "Hardware-Sniffer-CLI": - hardware_sniffer_cli = product + latest_release = self.github.get_latest_release("lzhoang2801", "Hardware-Sniffer") or {} - history_index = self.get_product_index(download_history, "Hardware-Sniffer-CLI") - if history_index is not None and hardware_sniffer_cli.get("id") == download_history[history_index].get("id"): - if os.path.exists(hardware_sniffer_path): - return hardware_sniffer_path + for product in latest_release.get("assets"): + if product.get("product_name") == product_name: + _, product_id, product_download_url = product.values() + + product_history_index = self.get_product_index(download_history, product_name) - self.fetcher.download_and_save_file(hardware_sniffer_cli.get("url"), hardware_sniffer_path) - - if not os.path.exists(hardware_sniffer_path): - return - - if history_index is None: - download_history.append({"product_name": "Hardware-Sniffer-CLI", "id": hardware_sniffer_cli.get("id")}) + print("") + if product_history_index == None: + print("Please wait for download {}...".format(product_name)) + else: + if product_id == download_history[product_history_index].get("id") and os.path.exists(hardware_sniffer_path): + print("Latest version of {} already downloaded.".format(product_name)) + return hardware_sniffer_path else: - download_history[history_index]["id"] = hardware_sniffer_cli.get("id") - - self.utils.create_folder(os.path.dirname(self.download_history_file)) - self.utils.write_file(self.download_history_file, download_history) + print("Updating {}...".format(product_name)) - return hardware_sniffer_path - except: - print("Could not complete download Hardware Sniffer.") - print("Please download Hardware Sniffer CLI manually and place it in \"Scripts\" folder.") - if hardware_sniffer_cli: - print("You can manually download \"Hardware-Sniffer-CLI.exe\" from:\n {}\n".format(hardware_sniffer_cli.get("url"))) - print("Alternatively, export the hardware report manually to continue.") + if product_download_url: + print("from " + product_download_url) + else: + print("Could not find download URL for {}.".format(product_name)) + print("Please try again later.") print("") self.utils.request_input() - return \ No newline at end of file + return + print("") + + self.fetcher.download_and_save_file(product_download_url, hardware_sniffer_path) + + if product_history_index is None: + download_history.append({ + "product_name": product_name, + "id": product_id + }) + else: + download_history[product_history_index]["id"] = product_id + + self.utils.create_folder(os.path.dirname(self.download_history_file)) + self.utils.write_file(self.download_history_file, download_history) + + return hardware_sniffer_path \ No newline at end of file