Change the method for retrieving bootloader and kexts download information

This commit is contained in:
Hoang Hong Quan
2024-09-29 02:35:39 +07:00
parent bfc9869214
commit 89515ab5c6
2 changed files with 43 additions and 35 deletions

View File

@@ -6,7 +6,6 @@ import os
import tempfile
import shutil
import subprocess
from datetime import datetime, timedelta
class gatheringFiles:
def __init__(self):
@@ -19,53 +18,59 @@ class gatheringFiles:
self.ock_files_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "OCK_Files")
self.bootloader_kexts_data_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "bootloader_kexts_data.json")
self.download_history_file = os.path.join(self.ock_files_dir, "history.json")
def get_product_index(self, product_list, target_product_name):
for index, product in enumerate(product_list):
if target_product_name == product.get("product_name"):
return index
return None
def get_bootloader_kexts_data(self):
last_updated_str = (self.utils.read_file(self.bootloader_kexts_data_path) or {}).get("last_updated", "2024-07-25T12:00:00")
last_updated = datetime.fromisoformat(last_updated_str)
download_urls = self.utils.read_file(self.bootloader_kexts_data_path)
current_time = datetime.now()
if current_time - last_updated < timedelta(minutes=15):
return
download_urls = []
if not isinstance(download_urls, list):
download_urls = []
dortania_builds_data = self.fetcher.fetch_and_parse_content(self.dortania_builds_url, "json")
seen_repos = set()
def add_product_to_download_urls(products):
if isinstance(products, dict):
products = [products]
for product in products:
product_index = self.get_product_index(download_urls, product.get("product_name"))
if product_index is None:
download_urls.append(product)
else:
download_urls[product_index] = product
for kext in kexts:
if kext.download_info:
download_urls.append({"product_name": kext.name, **kext.download_info})
add_product_to_download_urls({"product_name": kext.name, **kext.download_info})
elif kext.github_repo and kext.github_repo.get("repo") not in seen_repos:
name = kext.github_repo.get("repo")
seen_repos.add(name)
if name in dortania_builds_data:
download_urls.append({
add_product_to_download_urls({
"product_name": name,
"id": dortania_builds_data[name]["versions"][0]["release"]["id"],
"url": dortania_builds_data[name]["versions"][0]["links"]["release"]
})
else:
if self.github.check_ratelimit():
download_urls.extend(self.github.get_latest_release(kext.github_repo.get("owner"), kext.github_repo.get("repo")))
add_product_to_download_urls(self.github.get_latest_release(kext.github_repo.get("owner"), kext.github_repo.get("repo")))
if self.github.check_ratelimit():
download_urls.extend(self.github.get_latest_release("wjz304", "OpenCore_Patch_Build"))
add_product_to_download_urls(self.github.get_latest_release("wjz304", "OpenCore_Patch_Build"))
sorted_data = sorted(download_urls, key=lambda x:x["product_name"])
sorted_download_urls = sorted(download_urls, key=lambda x:x["product_name"])
self.utils.create_folder(self.ock_files_dir)
self.utils.write_file(self.bootloader_kexts_data_path, {
"download_urls": sorted_data,
"last_updated": current_time.isoformat()
})
self.utils.write_file(self.bootloader_kexts_data_path, sorted_download_urls)
return sorted_data
def product_index_in_history(self, product_name, download_history):
for index, item in enumerate(download_history):
if product_name in item["product_name"]:
return index
return None
return sorted_download_urls
def move_bootloader_kexts_to_product_directory(self, product_name):
if not os.path.exists(self.temporary_dir):
@@ -103,30 +108,33 @@ class gatheringFiles:
return True
def gathering_bootloader_kexts(self):
download_history = self.utils.read_file(self.download_history_file) or []
download_history = self.utils.read_file(self.download_history_file)
if not isinstance(download_history, list):
download_history = []
self.utils.create_folder(self.temporary_dir)
for product_data in (self.utils.read_file(self.bootloader_kexts_data_path) or {}).get("download_urls", []):
product_index = self.product_index_in_history(product_data.get("product_name"), download_history)
if not product_index is None and product_data.get("id") == download_history[product_index].get("id"):
for product in self.utils.read_file(self.bootloader_kexts_data_path) or []:
product_index = self.get_product_index(download_history, product.get("product_name"))
if not product_index is None and product.get("id") == download_history[product_index].get("id"):
continue
asset_dir = os.path.join(self.ock_files_dir, product_data.get("product_name"))
asset_dir = os.path.join(self.ock_files_dir, product.get("product_name"))
self.utils.create_folder(asset_dir, remove_content=True)
zip_path = os.path.join(self.temporary_dir, product_data.get("product_name")) + ".zip"
self.fetcher.download_and_save_file(product_data.get("url"), zip_path)
zip_path = os.path.join(self.temporary_dir, product.get("product_name")) + ".zip"
self.fetcher.download_and_save_file(product.get("url"), zip_path)
self.utils.extract_zip_file(zip_path)
if self.move_bootloader_kexts_to_product_directory(product_data.get("product_name")):
if self.move_bootloader_kexts_to_product_directory(product.get("product_name")):
if product_index is None:
download_history.append({
"product_name": product_data.get("product_name"),
"id": product_data.get("id")
"product_name": product.get("product_name"),
"id": product.get("id")
})
else:
download_history[product_index]["id"] = product_data.get("id")
download_history[product_index]["id"] = product.get("id")
self.utils.write_file(self.download_history_file, download_history)

View File

@@ -22,7 +22,7 @@ class SMBIOS:
download_history = self.utils.read_file(self.g.download_history_file)
if download_history:
product_index = self.g.product_index_in_history("OpenCore", download_history)
product_index = self.g.get_product_index(download_history, "OpenCore")
if product_index:
download_history.pop(product_index)