diff --git a/OpCore-Simplify.py b/OpCore-Simplify.py index 979bdcf..1324c0f 100644 --- a/OpCore-Simplify.py +++ b/OpCore-Simplify.py @@ -276,6 +276,9 @@ class OCPE: if not tool_path in tool_loaded: files_to_remove.append(os.path.join(tools_directory, tool_path)) + if "manifest.json" in os.listdir(self.result_dir): + files_to_remove.append(os.path.join(self.result_dir, "manifest.json")) + for file_path in files_to_remove: try: if os.path.isdir(file_path): @@ -421,7 +424,12 @@ class OCPE: self.s.smbios_specific_options(customized_hardware, smbios_model, macos_version, self.ac.patches, self.k) continue - if not self.o.gather_bootloader_kexts(self.k.kexts, macos_version): + try: + self.o.gather_bootloader_kexts(self.k.kexts, macos_version) + except Exception as e: + print("\033[91mError: {}\033[0m".format(e)) + print("") + self.u.request_input("Press Enter to continue...") continue self.build_opencore_efi(customized_hardware, disabled_devices, smbios_model, macos_version, needs_oclp) diff --git a/Scripts/gathering_files.py b/Scripts/gathering_files.py index 1690376..9fb20f3 100644 --- a/Scripts/gathering_files.py +++ b/Scripts/gathering_files.py @@ -1,5 +1,6 @@ from Scripts import github from Scripts import kext_maestro +from Scripts import integrity_checker from Scripts import resource_fetcher from Scripts import utils import os @@ -15,6 +16,7 @@ class gatheringFiles: self.github = github.Github() self.kext = kext_maestro.KextMaestro() self.fetcher = resource_fetcher.ResourceFetcher() + self.integrity_checker = integrity_checker.IntegrityChecker() self.dortania_builds_url = "https://raw.githubusercontent.com/dortania/build-repo/builds/latest.json" self.ocbinarydata_url = "https://github.com/acidanthera/OcBinaryData/archive/refs/heads/master.zip" self.amd_vanilla_patches_url = "https://raw.githubusercontent.com/AMD-OSX/AMD_Vanilla/beta/patches.plist" @@ -22,7 +24,6 @@ class gatheringFiles: self.hyper_threading_patches_url = "https://github.com/b00t0x/CpuTopologyRebuild/raw/refs/heads/master/patches_ht.plist" self.temporary_dir = self.utils.get_temporary_dir() self.ock_files_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "OCK_Files") - self.bootloader_kexts_data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "bootloader_kexts_data.json") self.download_history_file = os.path.join(self.ock_files_dir, "history.json") def get_product_index(self, product_list, product_name_name): @@ -31,58 +32,55 @@ class gatheringFiles: return index return None - def get_bootloader_kexts_data(self, kexts): - download_urls = self.utils.read_file(self.bootloader_kexts_data_path) - - if not isinstance(download_urls, list): - download_urls = [] - + def update_download_database(self, kexts, download_history): dortania_builds_data = self.fetcher.fetch_and_parse_content(self.dortania_builds_url, "json") seen_repos = set() - def add_product_to_download_urls(products): + def add_product_to_download_database(products): if isinstance(products, dict): products = [products] for product in products: - product_index = self.get_product_index(download_urls, product.get("product_name")) + if not product or not product.get("product_name"): + continue + + product_index = self.get_product_index(download_history, product.get("product_name")) if product_index is None: - download_urls.append(product) + download_history.append(product) else: - download_urls[product_index] = product + download_history[product_index].update(product) for kext in kexts: if not kext.checked: continue if kext.download_info: - add_product_to_download_urls({"product_name": kext.name, **kext.download_info}) + if not kext.download_info.get("sha256"): + kext.download_info["sha256"] = None + add_product_to_download_database({"product_name": kext.name, **kext.download_info}) elif kext.github_repo and kext.github_repo.get("repo") not in seen_repos: name = kext.github_repo.get("repo") seen_repos.add(name) if name in dortania_builds_data: - add_product_to_download_urls({ + add_product_to_download_database({ "product_name": name, "id": dortania_builds_data[name]["versions"][0]["release"]["id"], - "url": dortania_builds_data[name]["versions"][0]["links"]["release"] + "url": dortania_builds_data[name]["versions"][0]["links"]["release"], + "sha256": dortania_builds_data[name]["versions"][0]["hashes"]["release"]["sha256"] }) else: latest_release = self.github.get_latest_release(kext.github_repo.get("owner"), kext.github_repo.get("repo")) or {} - add_product_to_download_urls(latest_release.get("assets")) + add_product_to_download_database(latest_release.get("assets")) - add_product_to_download_urls({ + add_product_to_download_database({ "product_name": "OpenCorePkg", "id": dortania_builds_data["OpenCorePkg"]["versions"][0]["release"]["id"], - "url": dortania_builds_data["OpenCorePkg"]["versions"][0]["links"]["release"] + "url": dortania_builds_data["OpenCorePkg"]["versions"][0]["links"]["release"], + "sha256": dortania_builds_data["OpenCorePkg"]["versions"][0]["hashes"]["release"]["sha256"] }) - sorted_download_urls = sorted(download_urls, key=lambda x:x["product_name"]) - - self.utils.create_folder(self.ock_files_dir) - self.utils.write_file(self.bootloader_kexts_data_path, sorted_download_urls) - - return sorted_download_urls + return sorted(download_history, key=lambda x:x["product_name"]) def move_bootloader_kexts_to_product_directory(self, product_name): if not os.path.exists(self.temporary_dir): @@ -145,11 +143,10 @@ class gatheringFiles: print("Please wait for download OpenCorePkg, kexts and macserial...") download_history = self.utils.read_file(self.download_history_file) - if not isinstance(download_history, list): download_history = [] - bootloader_kext_urls = self.get_bootloader_kexts_data(kexts) + download_database = self.update_download_database(kexts, download_history) self.utils.create_folder(self.temporary_dir) @@ -183,104 +180,95 @@ class gatheringFiles: elif product_name == "UTBDefault": product_name = "USBToolBox" - product_download_index = self.get_product_index(bootloader_kext_urls, product_name) + product_download_index = self.get_product_index(download_database, product_name) if product_download_index is None: - if product.github_repo: - product_download_index = self.get_product_index(bootloader_kext_urls, product.github_repo.get("repo")) + if hasattr(product, 'github_repo') and product.github_repo: + product_download_index = self.get_product_index(download_database, product.github_repo.get("repo")) - if product_download_index is not None: - _, product_id, product_download_url = bootloader_kext_urls[product_download_index].values() + if product_download_index is None: + print("\n") + print("Could not find download URL for {}.".format(product_name)) + continue - if product_download_url in seen_download_urls: - continue - seen_download_urls.add(product_download_url) - else: - product_id = product_download_url = None + product_info = download_database[product_download_index] + product_id = product_info.get("id") + product_download_url = product_info.get("url") + sha256_hash = product_info.get("sha256") + + if product_download_url in seen_download_urls: + continue + seen_download_urls.add(product_download_url) product_history_index = self.get_product_index(download_history, product_name) + asset_dir = os.path.join(self.ock_files_dir, product_name) + manifest_path = os.path.join(asset_dir, "manifest.json") + + if product_history_index is not None: + history_item = download_history[product_history_index] + is_latest_id = (product_id == history_item.get("id")) + folder_is_valid, _ = self.integrity_checker.verify_folder_integrity(asset_dir, manifest_path) + + if is_latest_id and folder_is_valid: + print(f"\nLatest version of {product_name} already downloaded.") + continue print("") - if product_history_index is None: - print("Please wait for download {}...".format(product_name)) - else: - if product_id == download_history[product_history_index].get("id"): - print("Latest version of {} already downloaded.".format(product_name)) - continue - else: - print("Updating {}...".format(product_name)) - - if product_download_url is not None: - print("from " + product_download_url) + print("Updating" if product_history_index is not None else "Please wait for download", end=" ") + print("{}...".format(product_name)) + print("") + if product_download_url: + print("from {}".format(product_download_url)) + print("") else: + print("") print("Could not find download URL for {}.".format(product_name)) - print("Please try again later.") print("") self.utils.request_input() shutil.rmtree(self.temporary_dir, ignore_errors=True) return False - print("") zip_path = os.path.join(self.temporary_dir, product_name) + ".zip" - self.fetcher.download_and_save_file(product_download_url, zip_path) - - if not os.path.exists(zip_path): - if product_history_index is not None: - print("Using previously version of {}.".format(product_name)) + if not self.fetcher.download_and_save_file(product_download_url, zip_path, sha256_hash): + folder_is_valid, _ = self.integrity_checker.verify_folder_integrity(asset_dir, manifest_path) + if product_history_index is not None and folder_is_valid: + print("Using previously downloaded version of {}.".format(product_name)) continue else: - print("") - print("Could not download {} at this time.".format(product_name)) - print("Please try again later.") - print("") - self.utils.request_input() - shutil.rmtree(self.temporary_dir, ignore_errors=True) - return False - + raise Exception("Could not download {} at this time. Please try again later.".format(product_name)) + self.utils.extract_zip_file(zip_path) - - asset_dir = os.path.join(self.ock_files_dir, product_name) self.utils.create_folder(asset_dir, remove_content=True) while True: - zip_files = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".zip") - - if not zip_files: + nested_zip_files = self.utils.find_matching_paths(os.path.join(self.temporary_dir, product_name), extension_filter=".zip") + if not nested_zip_files: break - - for zip_file, file_type in zip_files: + for zip_file, _ in nested_zip_files: full_zip_path = os.path.join(self.temporary_dir, product_name, zip_file) self.utils.extract_zip_file(full_zip_path) os.remove(full_zip_path) if "OpenCore" in product_name: - zip_path = os.path.join(self.temporary_dir, "OcBinaryData.zip") + oc_binary_data_zip_path = os.path.join(self.temporary_dir, "OcBinaryData.zip") print("") print("Please wait for download OcBinaryData...") - print("from " + self.ocbinarydata_url) + print("from {}".format(self.ocbinarydata_url)) print("") - self.fetcher.download_and_save_file(self.ocbinarydata_url, zip_path) + self.fetcher.download_and_save_file(self.ocbinarydata_url, oc_binary_data_zip_path) - if not os.path.exists(zip_path): + if not os.path.exists(oc_binary_data_zip_path): print("") print("Could not download OcBinaryData at this time.") - print("Please try again later.") - print("") + print("Please try again later.\n") self.utils.request_input() shutil.rmtree(self.temporary_dir, ignore_errors=True) return False - self.utils.extract_zip_file(zip_path) + self.utils.extract_zip_file(oc_binary_data_zip_path) if self.move_bootloader_kexts_to_product_directory(product_name): - if product_history_index is None: - download_history.append({ - "product_name": product_name, - "id": product_id - }) - else: - download_history[product_history_index]["id"] = product_id - - self.utils.write_file(self.download_history_file, download_history) + self.integrity_checker.generate_folder_manifest(asset_dir, manifest_path) + self._update_download_history(download_history, product_name, product_id, product_download_url, sha256_hash) shutil.rmtree(self.temporary_dir, ignore_errors=True) return True @@ -300,65 +288,92 @@ class gatheringFiles: self.utils.request_input() return [] - def gather_hardware_sniffer(self): - if os_name != "Windows": - return - - self.utils.head("Gathering Files") - print("") - print("Please wait for download Hardware Sniffer") - print("") - - product_name = "Hardware-Sniffer-CLI.exe" - - hardware_sniffer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), product_name) - - download_history = self.utils.read_file(self.download_history_file) - - if not isinstance(download_history, list): - download_history = [] - - product_id = product_download_url = None - - latest_release = self.github.get_latest_release("lzhoang2801", "Hardware-Sniffer") or {} - - for product in latest_release.get("assets"): - if product.get("product_name") == product_name.split(".")[0]: - _, product_id, product_download_url = product.values() - + def _update_download_history(self, download_history, product_name, product_id, product_url, sha256_hash): product_history_index = self.get_product_index(download_history, product_name) - - print("") - if product_history_index == None: - print("Please wait for download {}...".format(product_name)) - else: - if product_id == download_history[product_history_index].get("id") and os.path.exists(hardware_sniffer_path): - print("Latest version of {} already downloaded.".format(product_name)) - return hardware_sniffer_path - else: - print("Updating {}...".format(product_name)) - - if product_download_url: - print("from " + product_download_url) - else: - print("Could not find download URL for {}.".format(product_name)) - print("Please try again later.") - print("") - self.utils.request_input() - return - print("") - - self.fetcher.download_and_save_file(product_download_url, hardware_sniffer_path) + entry = { + "product_name": product_name, + "id": product_id, + "url": product_url, + "sha256": sha256_hash + } + if product_history_index is None: - download_history.append({ - "product_name": product_name, - "id": product_id - }) + download_history.append(entry) else: - download_history[product_history_index]["id"] = product_id + download_history[product_history_index].update(entry) self.utils.create_folder(os.path.dirname(self.download_history_file)) self.utils.write_file(self.download_history_file, download_history) + + def gather_hardware_sniffer(self): + if os_name != "Windows": + return - return hardware_sniffer_path \ No newline at end of file + self.utils.head("Gathering Hardware Sniffer") + + PRODUCT_NAME = "Hardware-Sniffer-CLI.exe" + REPO_OWNER = "lzhoang2801" + REPO_NAME = "Hardware-Sniffer" + + destination_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), PRODUCT_NAME) + + latest_release = self.github.get_latest_release(REPO_OWNER, REPO_NAME) or {} + + product_id = None + product_download_url = None + sha256_hash = None + + asset_name = PRODUCT_NAME.split('.')[0] + for asset in latest_release.get("assets", []): + if asset.get("product_name") == asset_name: + product_id = asset.get("id") + product_download_url = asset.get("url") + sha256_hash = asset.get("sha256") + break + + if not all([product_id, product_download_url, sha256_hash]): + print("") + print("Could not find release information for {}.".format(PRODUCT_NAME)) + print("Please try again later.") + print("") + self.utils.request_input() + raise Exception("Could not find release information for {}.".format(PRODUCT_NAME)) + + download_history = self.utils.read_file(self.download_history_file) + if not isinstance(download_history, list): + download_history = [] + + product_history_index = self.get_product_index(download_history, PRODUCT_NAME) + + if product_history_index is not None: + history_item = download_history[product_history_index] + is_latest_id = (product_id == history_item.get("id")) + + file_is_valid = False + if os.path.exists(destination_path): + local_hash = self.integrity_checker.get_sha256(destination_path) + file_is_valid = (sha256_hash == local_hash) + + if is_latest_id and file_is_valid: + print("") + print("Latest version of {} already downloaded.".format(PRODUCT_NAME)) + return destination_path + + print("") + print("Updating" if product_history_index is not None else "Please wait for download", end=" ") + print("{}...".format(PRODUCT_NAME)) + print("") + print("from {}".format(product_download_url)) + print("") + + if not self.fetcher.download_and_save_file(product_download_url, destination_path, sha256_hash): + manual_download_url = f"https://github.com/{REPO_OWNER}/{REPO_NAME}/releases/latest" + print("Go to {} to download {} manually.".format(manual_download_url, PRODUCT_NAME)) + print("") + self.utils.request_input() + raise Exception("Failed to download {}.".format(PRODUCT_NAME)) + + self._update_download_history(download_history, PRODUCT_NAME, product_id, product_download_url, sha256_hash) + + return destination_path \ No newline at end of file diff --git a/Scripts/github.py b/Scripts/github.py index 330d1e7..ba9fcf1 100644 --- a/Scripts/github.py +++ b/Scripts/github.py @@ -82,32 +82,37 @@ class Github: assets = [] in_li_block = False - download_link = None for line in response.splitlines(): if "