Files
OpCore-Simplify/Scripts/resource_fetcher.py
Hoang Hong Quan 0e608a56ce Add GUI Support for OpCore Simplify (#512)
* Refactor OpCore-Simplify to GUI version

* New ConfigEditor

* Add requirement checks and installation in launchers

* Add GitHub Actions workflow to generate manifest.json

* Set compression level for asset

* Skip .git and __pycache__ folders

* Refactor update process to include integrity checker

* Add SMBIOS model selection

* Update README.md

* Update to main branch
2025-12-30 14:19:47 +07:00

196 lines
8.6 KiB
Python

from Scripts import integrity_checker
from Scripts import utils
import ssl
import os
import json
import plistlib
import socket
import sys
import gzip
import zlib
import time
if sys.version_info >= (3, 0):
from urllib.request import urlopen, Request
from urllib.error import URLError
else:
import urllib2
from urllib2 import urlopen, Request, URLError
MAX_ATTEMPTS = 3
class ResourceFetcher:
def __init__(self, utils_instance=None, integrity_checker_instance=None, headers=None):
self.request_headers = headers or {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
self.utils = utils_instance if utils_instance else utils.Utils()
self.buffer_size = 16 * 1024
self.ssl_context = self.create_ssl_context()
self.integrity_checker = integrity_checker_instance if integrity_checker_instance else integrity_checker.IntegrityChecker()
def create_ssl_context(self):
try:
cafile = ssl.get_default_verify_paths().openssl_cafile
if not os.path.exists(cafile):
import certifi
cafile = certifi.where()
ssl_context = ssl.create_default_context(cafile=cafile)
self.utils.log_message("[RESOURCE FETCHER] Created SSL context", level="INFO")
except Exception as e:
ssl_context = ssl._create_unverified_context()
self.utils.log_message("[RESOURCE FETCHER] Created unverified SSL context", level="INFO")
return ssl_context
def _make_request(self, resource_url, timeout=10):
try:
headers = dict(self.request_headers)
headers["Accept-Encoding"] = "gzip, deflate"
return urlopen(Request(resource_url, headers=headers), timeout=timeout, context=self.ssl_context)
except socket.timeout as e:
self.utils.log_message("[RESOURCE FETCHER] Timeout error: {}".format(e), level="ERROR", to_build_log=True)
except ssl.SSLError as e:
self.utils.log_message("[RESOURCE FETCHER] SSL error: {}".format(e), level="ERROR", to_build_log=True)
except (URLError, socket.gaierror) as e:
self.utils.log_message("[RESOURCE FETCHER] Connection error: {}".format(e), level="ERROR", to_build_log=True)
except Exception as e:
self.utils.log_message("[RESOURCE FETCHER] Request failed: {}".format(e), level="ERROR", to_build_log=True)
return None
def fetch_and_parse_content(self, resource_url, content_type=None):
attempt = 0
response = None
self.utils.log_message("[RESOURCE FETCHER] Fetching and parsing content from {}".format(resource_url), level="INFO")
while attempt < MAX_ATTEMPTS:
response = self._make_request(resource_url)
if not response:
attempt += 1
self.utils.log_message("[RESOURCE FETCHER] Failed to fetch content from {}. Retrying...".format(resource_url), level="WARNING", to_build_log=True)
continue
if response.getcode() == 200:
break
attempt += 1
if not response:
self.utils.log_message("[RESOURCE FETCHER] Failed to fetch content from {}".format(resource_url), level="ERROR", to_build_log=True)
return None
content = response.read()
if response.info().get("Content-Encoding") == "gzip" or content.startswith(b"\x1f\x8b"):
try:
content = gzip.decompress(content)
except Exception as e:
self.utils.log_message("[RESOURCE FETCHER] Failed to decompress gzip content: {}".format(e), level="ERROR", to_build_log=True)
elif response.info().get("Content-Encoding") == "deflate":
try:
content = zlib.decompress(content)
except Exception as e:
self.utils.log_message("[RESOURCE FETCHER] Failed to decompress deflate content: {}".format(e), level="ERROR", to_build_log=True)
try:
if content_type == "json":
return json.loads(content)
elif content_type == "plist":
return plistlib.loads(content)
else:
return content.decode("utf-8")
except Exception as e:
self.utils.log_message("[RESOURCE FETCHER] Error parsing content as {}: {}".format(content_type, e), level="ERROR", to_build_log=True)
return None
def _download_with_progress(self, response, local_file):
total_size = response.getheader("Content-Length")
if total_size:
total_size = int(total_size)
bytes_downloaded = 0
start_time = time.time()
last_time = start_time
last_bytes = 0
speeds = []
speed_str = "-- KB/s"
while True:
chunk = response.read(self.buffer_size)
if not chunk:
break
local_file.write(chunk)
bytes_downloaded += len(chunk)
current_time = time.time()
time_diff = current_time - last_time
if time_diff > 0.5:
current_speed = (bytes_downloaded - last_bytes) / time_diff
speeds.append(current_speed)
if len(speeds) > 5:
speeds.pop(0)
avg_speed = sum(speeds) / len(speeds)
if avg_speed < 1024*1024:
speed_str = "{:.1f} KB/s".format(avg_speed/1024)
else:
speed_str = "{:.1f} MB/s".format(avg_speed/(1024*1024))
last_time = current_time
last_bytes = bytes_downloaded
if total_size:
percent = int(bytes_downloaded / total_size * 100)
bar_length = 40
filled = int(bar_length * bytes_downloaded / total_size)
bar = "" * filled + "" * (bar_length - filled)
progress = "{} [{}] {:3d}% {:.1f}/{:.1f}MB".format(speed_str, bar, percent, bytes_downloaded/(1024*1024), total_size/(1024*1024))
else:
progress = "{} {:.1f}MB downloaded".format(speed_str, bytes_downloaded/(1024*1024))
self.utils.log_message("[RESOURCE FETCHER] Download progress: {}".format(progress), level="INFO", to_build_log=True)
def download_and_save_file(self, resource_url, destination_path, sha256_hash=None):
attempt = 0
self.utils.log_message("[RESOURCE FETCHER] Downloading and saving file from {} to {}".format(resource_url, destination_path), level="INFO")
while attempt < MAX_ATTEMPTS:
attempt += 1
response = self._make_request(resource_url)
if not response:
self.utils.log_message("[RESOURCE FETCHER] Failed to fetch content from {}. Retrying...".format(resource_url), level="WARNING", to_build_log=True)
continue
with open(destination_path, "wb") as local_file:
self._download_with_progress(response, local_file)
if os.path.exists(destination_path) and os.path.getsize(destination_path) > 0:
if sha256_hash:
self.utils.log_message("[RESOURCE FETCHER] Verifying SHA256 checksum...", level="INFO", to_build_log=True)
downloaded_hash = self.integrity_checker.get_sha256(destination_path)
if downloaded_hash.lower() == sha256_hash.lower():
self.utils.log_message("[RESOURCE FETCHER] Checksum verified successfully.", level="INFO", to_build_log=True)
return True
else:
self.utils.log_message("[RESOURCE FETCHER] Checksum mismatch! Removing file and retrying download...", level="WARNING", to_build_log=True)
os.remove(destination_path)
continue
else:
self.utils.log_message("[RESOURCE FETCHER] No SHA256 hash provided. Downloading file without verification.", level="INFO", to_build_log=True)
return True
if os.path.exists(destination_path):
os.remove(destination_path)
if attempt < MAX_ATTEMPTS:
self.utils.log_message("[RESOURCE FETCHER] Download failed for {}. Retrying...".format(resource_url), level="WARNING", to_build_log=True)
self.utils.log_message("[RESOURCE FETCHER] Failed to download {} after {} attempts.".format(resource_url, MAX_ATTEMPTS), level="ERROR", to_build_log=True)
return False