mirror of
https://github.com/outbackdingo/OpCore-Simplify.git
synced 2026-01-27 10:19:49 +00:00
Add GUI Support for OpCore Simplify (#512)
* Refactor OpCore-Simplify to GUI version * New ConfigEditor * Add requirement checks and installation in launchers * Add GitHub Actions workflow to generate manifest.json * Set compression level for asset * Skip .git and __pycache__ folders * Refactor update process to include integrity checker * Add SMBIOS model selection * Update README.md * Update to main branch
This commit is contained in:
367
updater.py
367
updater.py
@@ -1,201 +1,260 @@
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
from PyQt6.QtCore import QThread, pyqtSignal
|
||||
|
||||
from Scripts import resource_fetcher
|
||||
from Scripts import github
|
||||
from Scripts import run
|
||||
from Scripts import utils
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
from Scripts import integrity_checker
|
||||
from Scripts.custom_dialogs import show_update_dialog, show_info, show_confirmation
|
||||
|
||||
class UpdateCheckerThread(QThread):
|
||||
update_available = pyqtSignal(dict)
|
||||
check_failed = pyqtSignal(str)
|
||||
no_update = pyqtSignal()
|
||||
|
||||
def __init__(self, updater_instance):
|
||||
super().__init__()
|
||||
self.updater = updater_instance
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
remote_manifest = self.updater.get_remote_manifest()
|
||||
if not remote_manifest:
|
||||
self.check_failed.emit("Could not fetch update information from GitHub.\n\nPlease check your internet connection and try again later.")
|
||||
return
|
||||
|
||||
local_manifest = self.updater.get_local_manifest()
|
||||
if not local_manifest:
|
||||
self.check_failed.emit("Could not generate local manifest.\n\nPlease try again later.")
|
||||
return
|
||||
|
||||
files_to_update = self.updater.compare_manifests(local_manifest, remote_manifest)
|
||||
if not files_to_update:
|
||||
self.no_update.emit()
|
||||
else:
|
||||
self.update_available.emit(files_to_update)
|
||||
except Exception as e:
|
||||
self.check_failed.emit("An error occurred during update check:\n\n{}".format(str(e)))
|
||||
|
||||
class Updater:
|
||||
def __init__(self):
|
||||
self.github = github.Github()
|
||||
self.fetcher = resource_fetcher.ResourceFetcher()
|
||||
self.run = run.Run().run
|
||||
self.utils = utils.Utils()
|
||||
self.sha_version = os.path.join(os.path.dirname(os.path.realpath(__file__)), "sha_version.txt")
|
||||
def __init__(self, utils_instance=None, github_instance=None, resource_fetcher_instance=None, run_instance=None, integrity_checker_instance=None):
|
||||
self.utils = utils_instance if utils_instance else utils.Utils()
|
||||
self.github = github_instance if github_instance else github.Github(utils_instance=self.utils)
|
||||
self.fetcher = resource_fetcher_instance if resource_fetcher_instance else resource_fetcher.ResourceFetcher(utils_instance=self.utils)
|
||||
self.run = run_instance.run if run_instance else run.Run().run
|
||||
self.integrity_checker = integrity_checker_instance if integrity_checker_instance else integrity_checker.IntegrityChecker(utils_instance=self.utils)
|
||||
self.remote_manifest_url = "https://nightly.link/lzhoang2801/OpCore-Simplify/workflows/generate-manifest/main/manifest.json.zip"
|
||||
self.download_repo_url = "https://github.com/lzhoang2801/OpCore-Simplify/archive/refs/heads/main.zip"
|
||||
self.temporary_dir = tempfile.mkdtemp()
|
||||
self.current_step = 0
|
||||
self.root_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
def get_current_sha_version(self):
|
||||
print("Checking current version...")
|
||||
try:
|
||||
current_sha_version = self.utils.read_file(self.sha_version)
|
||||
|
||||
if not current_sha_version:
|
||||
print("SHA version information is missing.")
|
||||
return "missing_sha_version"
|
||||
|
||||
return current_sha_version.decode()
|
||||
except Exception as e:
|
||||
print("Error reading current SHA version: {}".format(str(e)))
|
||||
return "error_reading_sha_version"
|
||||
|
||||
def get_latest_sha_version(self):
|
||||
print("Fetching latest version from GitHub...")
|
||||
try:
|
||||
commits = self.github.get_commits("lzhoang2801", "OpCore-Simplify")
|
||||
return commits["commitGroups"][0]["commits"][0]["oid"]
|
||||
except Exception as e:
|
||||
print("Error fetching latest SHA version: {}".format(str(e)))
|
||||
def get_remote_manifest(self, dialog=None):
|
||||
if dialog:
|
||||
dialog.update_progress(10, "Fetching remote manifest...")
|
||||
|
||||
return None
|
||||
try:
|
||||
temp_manifest_zip_path = os.path.join(self.temporary_dir, "remote_manifest.json.zip")
|
||||
success = self.fetcher.download_and_save_file(self.remote_manifest_url, temp_manifest_zip_path)
|
||||
|
||||
if not success or not os.path.exists(temp_manifest_zip_path):
|
||||
return None
|
||||
|
||||
def download_update(self):
|
||||
self.current_step += 1
|
||||
print("")
|
||||
print("Step {}: Creating temporary directory...".format(self.current_step))
|
||||
self.utils.extract_zip_file(temp_manifest_zip_path, self.temporary_dir)
|
||||
|
||||
remote_manifest_path = os.path.join(self.temporary_dir, "manifest.json")
|
||||
manifest_data = self.utils.read_file(remote_manifest_path)
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(20, "Manifest downloaded successfully")
|
||||
|
||||
return manifest_data
|
||||
except Exception as e:
|
||||
self.utils.log_message("[UPDATER] Error fetching remote manifest: {}".format(str(e)), level="ERROR")
|
||||
return None
|
||||
|
||||
def get_local_manifest(self, dialog=None):
|
||||
if dialog:
|
||||
dialog.update_progress(40, "Generating local manifest...")
|
||||
|
||||
try:
|
||||
manifest_data = self.integrity_checker.generate_folder_manifest(self.root_dir, save_manifest=False)
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(50, "Local manifest generated")
|
||||
|
||||
return manifest_data
|
||||
except Exception as e:
|
||||
self.utils.log_message("[UPDATER] Error generating local manifest: {}".format(str(e)), level="ERROR")
|
||||
return None
|
||||
|
||||
def compare_manifests(self, local_manifest, remote_manifest):
|
||||
if not local_manifest or not remote_manifest:
|
||||
return None
|
||||
|
||||
files_to_update = {
|
||||
"modified": [],
|
||||
"missing": [],
|
||||
"new": []
|
||||
}
|
||||
|
||||
local_files = set(local_manifest.keys())
|
||||
remote_files = set(remote_manifest.keys())
|
||||
|
||||
for file_path in local_files & remote_files:
|
||||
if local_manifest[file_path] != remote_manifest[file_path]:
|
||||
files_to_update["modified"].append(file_path)
|
||||
|
||||
files_to_update["missing"] = list(remote_files - local_files)
|
||||
|
||||
files_to_update["new"] = list(local_files - remote_files)
|
||||
|
||||
total_changes = len(files_to_update["modified"]) + len(files_to_update["missing"])
|
||||
|
||||
return files_to_update if total_changes > 0 else None
|
||||
|
||||
def download_update(self, dialog=None):
|
||||
if dialog:
|
||||
dialog.update_progress(60, "Creating temporary directory...")
|
||||
|
||||
try:
|
||||
self.utils.create_folder(self.temporary_dir)
|
||||
print(" Temporary directory created.")
|
||||
|
||||
self.current_step += 1
|
||||
print("Step {}: Downloading update package...".format(self.current_step))
|
||||
print(" ", end="")
|
||||
file_path = os.path.join(self.temporary_dir, os.path.basename(self.download_repo_url))
|
||||
self.fetcher.download_and_save_file(self.download_repo_url, file_path)
|
||||
if dialog:
|
||||
dialog.update_progress(65, "Downloading update package...")
|
||||
|
||||
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
|
||||
print(" Update package downloaded ({:.1f} KB)".format(os.path.getsize(file_path)/1024))
|
||||
|
||||
self.current_step += 1
|
||||
print("Step {}: Extracting files...".format(self.current_step))
|
||||
self.utils.extract_zip_file(file_path)
|
||||
print(" Files extracted successfully")
|
||||
return True
|
||||
else:
|
||||
print(" Download failed or file is empty")
|
||||
file_path = os.path.join(self.temporary_dir, "update.zip")
|
||||
success = self.fetcher.download_and_save_file(self.download_repo_url, file_path)
|
||||
|
||||
if not success or not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
|
||||
return False
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(75, "Extracting files...")
|
||||
|
||||
self.utils.extract_zip_file(file_path, self.temporary_dir)
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(80, "Files extracted successfully")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
print(" Error during download/extraction: {}".format(str(e)))
|
||||
self.utils.log_message("[UPDATER] Error during download/extraction: {}".format(str(e)), level="ERROR")
|
||||
return False
|
||||
|
||||
def update_files(self):
|
||||
self.current_step += 1
|
||||
print("Step {}: Updating files...".format(self.current_step))
|
||||
|
||||
def update_files(self, files_to_update, dialog=None):
|
||||
if not files_to_update:
|
||||
return True
|
||||
|
||||
try:
|
||||
target_dir = os.path.join(self.temporary_dir, "OpCore-Simplify-main")
|
||||
|
||||
if not os.path.exists(target_dir):
|
||||
target_dir = os.path.join(self.temporary_dir, "main", "OpCore-Simplify-main")
|
||||
|
||||
if not os.path.exists(target_dir):
|
||||
print(" Could not locate extracted files directory")
|
||||
self.utils.log_message("[UPDATER] Target directory not found: {}".format(target_dir), level="ERROR")
|
||||
return False
|
||||
|
||||
file_paths = self.utils.find_matching_paths(target_dir, type_filter="file")
|
||||
|
||||
total_files = len(file_paths)
|
||||
print(" Found {} files to update".format(total_files))
|
||||
all_files = files_to_update["modified"] + files_to_update["missing"]
|
||||
total_files = len(all_files)
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(85, "Updating {} files...".format(total_files))
|
||||
|
||||
updated_count = 0
|
||||
for index, (path, type) in enumerate(file_paths, start=1):
|
||||
source = os.path.join(target_dir, path)
|
||||
destination = source.replace(target_dir, os.path.dirname(os.path.realpath(__file__)))
|
||||
for index, relative_path in enumerate(all_files, start=1):
|
||||
source = os.path.join(target_dir, relative_path)
|
||||
|
||||
if not os.path.exists(source):
|
||||
self.utils.log_message("[UPDATER] Source file not found: {}".format(source), level="ERROR")
|
||||
continue
|
||||
|
||||
destination = os.path.join(self.root_dir, relative_path)
|
||||
|
||||
self.utils.create_folder(os.path.dirname(destination))
|
||||
|
||||
print(" Updating [{}/{}]: {}".format(index, total_files, os.path.basename(path)), end="\r")
|
||||
self.utils.log_message("[UPDATER] Updating [{}/{}]: {}".format(index, total_files, os.path.basename(relative_path)), level="INFO")
|
||||
if dialog:
|
||||
progress = 85 + int((index / total_files) * 10)
|
||||
dialog.update_progress(progress, "Updating [{}/{}]: {}".format(index, total_files, os.path.basename(relative_path)))
|
||||
|
||||
try:
|
||||
shutil.move(source, destination)
|
||||
updated_count += 1
|
||||
|
||||
if ".command" in os.path.splitext(path)[-1] and os.name != "nt":
|
||||
if ".command" in os.path.splitext(relative_path)[-1] and os.name != "nt":
|
||||
self.run({
|
||||
"args": ["chmod", "+x", destination]
|
||||
})
|
||||
except Exception as e:
|
||||
print(" Failed to update {}: {}".format(path, str(e)))
|
||||
self.utils.log_message("[UPDATER] Failed to update {}: {}".format(relative_path, str(e)), level="ERROR")
|
||||
|
||||
print("")
|
||||
print(" Successfully updated {}/{} files".format(updated_count, total_files))
|
||||
if dialog:
|
||||
dialog.update_progress(95, "Successfully updated {}/{} files".format(updated_count, total_files))
|
||||
|
||||
self.current_step += 1
|
||||
print("Step {}: Cleaning up temporary files...".format(self.current_step))
|
||||
shutil.rmtree(self.temporary_dir)
|
||||
print(" Cleanup complete")
|
||||
if os.path.exists(self.temporary_dir):
|
||||
shutil.rmtree(self.temporary_dir)
|
||||
|
||||
if dialog:
|
||||
dialog.update_progress(100, "Update completed!")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
print(" Error during file update: {}".format(str(e)))
|
||||
self.utils.log_message("[UPDATER] Error during file update: {}".format(str(e)), level="ERROR")
|
||||
return False
|
||||
|
||||
def save_latest_sha_version(self, latest_sha):
|
||||
try:
|
||||
self.utils.write_file(self.sha_version, latest_sha.encode())
|
||||
self.current_step += 1
|
||||
print("Step {}: Version information updated.".format(self.current_step))
|
||||
return True
|
||||
except Exception as e:
|
||||
print("Failed to save version information: {}".format(str(e)))
|
||||
return False
|
||||
|
||||
def run_update(self):
|
||||
self.utils.head("Check for Updates")
|
||||
print("")
|
||||
|
||||
def run_update(self):
|
||||
checker_thread = UpdateCheckerThread(self)
|
||||
|
||||
current_sha_version = self.get_current_sha_version()
|
||||
latest_sha_version = self.get_latest_sha_version()
|
||||
|
||||
print("")
|
||||
|
||||
if latest_sha_version is None:
|
||||
print("Could not verify the latest version from GitHub.")
|
||||
print("Current script SHA version: {}".format(current_sha_version))
|
||||
print("Please check your internet connection and try again later.")
|
||||
print("")
|
||||
def on_update_available(files_to_update):
|
||||
checker_thread.quit()
|
||||
checker_thread.wait()
|
||||
|
||||
while True:
|
||||
user_input = self.utils.request_input("Do you want to skip the update process? (yes/No): ").strip().lower()
|
||||
if user_input == "yes":
|
||||
print("")
|
||||
print("Update process skipped.")
|
||||
return False
|
||||
elif user_input == "no":
|
||||
print("")
|
||||
print("Continuing with update using default version check...")
|
||||
latest_sha_version = "update_forced_by_user"
|
||||
break
|
||||
else:
|
||||
print("\033[91mInvalid selection, please try again.\033[0m\n\n")
|
||||
else:
|
||||
print("Current script SHA version: {}".format(current_sha_version))
|
||||
print("Latest script SHA version: {}".format(latest_sha_version))
|
||||
|
||||
print("")
|
||||
|
||||
if latest_sha_version != current_sha_version:
|
||||
print("Update available!")
|
||||
print("Updating from version {} to {}".format(current_sha_version, latest_sha_version))
|
||||
print("")
|
||||
print("Starting update process...")
|
||||
if not show_confirmation("An update is available!", "Would you like to update now?", yes_text="Update", no_text="Later"):
|
||||
return False
|
||||
|
||||
if not self.download_update():
|
||||
print("")
|
||||
print(" Update failed: Could not download or extract update package")
|
||||
|
||||
dialog = show_update_dialog("Updating", "Starting update process...")
|
||||
dialog.show()
|
||||
|
||||
try:
|
||||
if not self.download_update(dialog):
|
||||
dialog.close()
|
||||
show_info("Update Failed", "Could not download or extract update package.\n\nPlease check your internet connection and try again.")
|
||||
return
|
||||
|
||||
if not self.update_files(files_to_update, dialog):
|
||||
dialog.close()
|
||||
show_info("Update Failed", "Could not update files.\n\nPlease try again later.")
|
||||
return
|
||||
|
||||
dialog.close()
|
||||
show_info("Update Complete", "Update completed successfully!\n\nThe program needs to restart to complete the update process.")
|
||||
|
||||
os.execv(sys.executable, ["python3"] + sys.argv)
|
||||
except Exception as e:
|
||||
dialog.close()
|
||||
self.utils.log_message("[UPDATER] Error during update: {}".format(str(e)), level="ERROR")
|
||||
show_info("Update Error", "An error occurred during the update process:\n\n{}".format(str(e)))
|
||||
finally:
|
||||
if os.path.exists(self.temporary_dir):
|
||||
self.current_step += 1
|
||||
print("Step {}: Cleaning up temporary files...".format(self.current_step))
|
||||
shutil.rmtree(self.temporary_dir)
|
||||
print(" Cleanup complete")
|
||||
|
||||
return False
|
||||
|
||||
if not self.update_files():
|
||||
print("")
|
||||
print(" Update failed: Could not update files")
|
||||
return False
|
||||
|
||||
if not self.save_latest_sha_version(latest_sha_version):
|
||||
print("")
|
||||
print(" Update completed but version information could not be saved")
|
||||
|
||||
print("")
|
||||
print("Update completed successfully!")
|
||||
print("")
|
||||
print("The program needs to restart to complete the update process.")
|
||||
return True
|
||||
else:
|
||||
print("You are already using the latest version")
|
||||
return False
|
||||
try:
|
||||
shutil.rmtree(self.temporary_dir)
|
||||
except:
|
||||
pass
|
||||
|
||||
def on_check_failed(error_message):
|
||||
checker_thread.quit()
|
||||
checker_thread.wait()
|
||||
show_info("Update Check Failed", error_message)
|
||||
|
||||
def on_no_update():
|
||||
checker_thread.quit()
|
||||
checker_thread.wait()
|
||||
|
||||
checker_thread.update_available.connect(on_update_available)
|
||||
checker_thread.check_failed.connect(on_check_failed)
|
||||
checker_thread.no_update.connect(on_no_update)
|
||||
|
||||
checker_thread.start()
|
||||
Reference in New Issue
Block a user