diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml index b5adab0..be0387c 100644 --- a/.github/workflows/build-release.yml +++ b/.github/workflows/build-release.yml @@ -136,19 +136,22 @@ jobs: cd deploy pyinstaller "OneDragon ScriptChainer.spec" pyinstaller "OneDragon ScriptChainer Runner.spec" + Copy-Item "dist/OneDragon ScriptChainer Runner.exe" -Destination "dist/OneDragon ScriptChainer/" -Force - name: Upload Editor if: ${{ env.CREATE_RELEASE == 'false' }} uses: actions/upload-artifact@v4 with: - name: Editor - path: deploy/dist/OneDragon ScriptChainer.exe + name: Editor-r${{ github.run_number }} + include-hidden-files: true + path: deploy/dist/OneDragon ScriptChainer/ - name: Upload Runner if: ${{ env.CREATE_RELEASE == 'false' }} uses: actions/upload-artifact@v4 with: - name: Runner + name: Runner-r${{ github.run_number }} + include-hidden-files: true path: deploy/dist/OneDragon ScriptChainer Runner.exe - name: Upload Dist @@ -156,8 +159,9 @@ jobs: uses: actions/upload-artifact@v4 with: name: dist + include-hidden-files: true if-no-files-found: error - path: deploy/dist + path: deploy/dist/OneDragon ScriptChainer/ release: runs-on: windows-latest @@ -173,7 +177,7 @@ jobs: uses: actions/download-artifact@v4 with: name: dist - path: . + path: OneDragon ScriptChainer - name: Prepare release packages shell: pwsh @@ -182,8 +186,7 @@ jobs: run: | $version = $env:RELEASE_VERSION - # 在根目录打包两个 exe - Compress-Archive -Path "OneDragon ScriptChainer.exe","OneDragon ScriptChainer Runner.exe" -DestinationPath "OneDragon-ScriptChainer-$version.zip" -Force + Compress-Archive -Path "OneDragon ScriptChainer" -DestinationPath "OneDragon-ScriptChainer-$version.zip" -Force - name: Generate Changelog @@ -310,10 +313,12 @@ jobs: ### 使用方法 1. 下载 `OneDragon-ScriptChainer-${{ needs.build.outputs.version }}.zip` - 2. 解压到任意目录 + 2. 设置中可以自更新。如需手动更新,请先删除旧版的 `.runtime` 文件夹,再解压覆盖 3. 运行 `OneDragon ScriptChainer.exe` 创建和编辑脚本链 4. 运行 `OneDragon ScriptChainer Runner.exe` 执行脚本链 + > 注意:`OneDragon ScriptChainer.exe` 所在目录包含运行时依赖(`.runtime` 文件夹),请勿单独移动该文件。 + ### 命令行参数 `OneDragon ScriptChainer.exe` 支持以下参数: diff --git a/.gitignore b/.gitignore index d84134f..03a6a5b 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ __pycache__/ .log/ config/script_chain/ config/custom.yml -config/push.yml \ No newline at end of file +config/push.yml +config/env.yml \ No newline at end of file diff --git a/config/project.yml b/config/project.yml index d9df2b3..7899041 100644 --- a/config/project.yml +++ b/config/project.yml @@ -6,7 +6,6 @@ github_ssh_repository: "git@github.com:OneDragon-Anything/OneDragon-ScriptChaine gitee_https_repository: "https://gitee.com/OneDragon-Anything/OneDragon-ScriptChainer.git" gitee_ssh_repository: "git@gitee.com:OneDragon-Anything/OneDragon-ScriptChainer.git" project_git_branch: "main" -requirements: "requirements-prod.txt" screen_standard_width: 1920 screen_standard_height: 1080 pip_source: "https://pypi.tuna.tsinghua.edu.cn/simple" diff --git a/deploy/OneDragon ScriptChainer Runner.spec b/deploy/OneDragon ScriptChainer Runner.spec index 7567860..e111db0 100644 --- a/deploy/OneDragon ScriptChainer Runner.spec +++ b/deploy/OneDragon ScriptChainer Runner.spec @@ -5,7 +5,7 @@ a = Analysis( ['..\\src\\script_chainer\\win_exe\\script_runner.py'], pathex=[], binaries=[], - datas=[('../config/project.yml', 'config')], + datas=[('../config/project.yml', 'resources/config')], hiddenimports=[], hookspath=[], hooksconfig={}, diff --git a/deploy/OneDragon ScriptChainer.spec b/deploy/OneDragon ScriptChainer.spec index 9136733..03beccb 100644 --- a/deploy/OneDragon ScriptChainer.spec +++ b/deploy/OneDragon ScriptChainer.spec @@ -6,8 +6,8 @@ a = Analysis( pathex=[], binaries=[], datas=[ - ('../config/project.yml', 'config'), - ('../assets', 'assets') + ('../config/project.yml', 'resources/config'), + ('../assets', 'resources/assets') ], hiddenimports=[], hookspath=[], @@ -22,16 +22,14 @@ pyz = PYZ(a.pure) exe = EXE( pyz, a.scripts, - a.binaries, - a.datas, [], + exclude_binaries=True, name='OneDragon ScriptChainer', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], - runtime_tmpdir=None, console=True, disable_windowed_traceback=False, argv_emulation=False, @@ -40,4 +38,15 @@ exe = EXE( entitlements_file=None, uac_admin=False, icon=['..\\assets\\ui\\editor_icon.ico'], + contents_directory='.runtime', +) + +coll = COLLECT( + exe, + a.binaries, + a.datas, + strip=False, + upx=True, + upx_exclude=[], + name='OneDragon ScriptChainer', ) diff --git a/src/one_dragon/base/config/yaml_config.py b/src/one_dragon/base/config/yaml_config.py index 4db354a..4a3fb1f 100644 --- a/src/one_dragon/base/config/yaml_config.py +++ b/src/one_dragon/base/config/yaml_config.py @@ -1,6 +1,5 @@ import os import shutil -from typing import Optional, List from one_dragon.base.config.yaml_operator import YamlOperator from one_dragon.utils import os_utils @@ -8,21 +7,28 @@ class YamlConfig(YamlOperator): - def __init__(self, - module_name: str, - instance_idx: Optional[int] = None, - sub_dir: Optional[List[str]] = None, - sample: bool = False, copy_from_sample: bool = False, - is_mock: bool = False): - self.instance_idx: Optional[int] = instance_idx + def __init__( + self, + module_name: str, + backup_module_name: str | None = None, + instance_idx: int | None = None, + sub_dir: list[str] | None = None, + sample: bool = False, copy_from_sample: bool = False, + read_sample_only: bool = False, + is_mock: bool = False + ): + self.instance_idx: int | None = instance_idx """传入时 该配置为一个的脚本实例独有的配置""" - self.sub_dir: Optional[List[str]] = sub_dir + self.sub_dir: list[str] | None = sub_dir """配置所在的子目录""" self.module_name: str = module_name """配置文件名称""" + self.backup_module_name: str | None = backup_module_name + """备用的配置文件名称 主要用于配置文件改名时做迁移使用""" + self.is_mock: bool = is_mock """mock情况下 不读取文件 也不会实际保存 用于测试""" @@ -32,32 +38,66 @@ def __init__(self, self._copy_from_sample: bool = copy_from_sample """配置文件不存在时 是否从sample文件中读取""" - YamlOperator.__init__(self, self._get_yaml_file_path()) + self._read_sample_only: bool = read_sample_only + """是否只读取sample文件(即使.yml文件存在也只读sample)""" + + file_path, write_file_path, copy_on_write_source_path = self._get_yaml_file_paths() + YamlOperator.__init__(self, file_path) + self._write_file_path = write_file_path + self._copy_on_write_source_path = copy_on_write_source_path - def _get_yaml_file_path(self) -> Optional[str]: + def _get_yaml_file_paths(self) -> tuple[str | None, str | None, str | None]: """ - 获取配置文件的路径 - 如果只有sample文件,就复制一个到实例文件夹下 + 获取配置文件的读取路径、写入路径 :return: """ if self.is_mock: - return None + return None, None, None sub_dir = ['config'] if self.instance_idx is not None: sub_dir.append('%02d' % self.instance_idx) if self.sub_dir is not None: sub_dir = sub_dir + self.sub_dir - yml_path = os.path.join(os_utils.get_path_under_work_dir(*sub_dir), f'{self.module_name}.yml') - sample_yml_path = os.path.join(os_utils.get_path_under_work_dir(*sub_dir), f'{self.module_name}.sample.yml') - usage_yml_path = sample_yml_path if self._sample and not os.path.exists(yml_path) else yml_path - use_sample = self._sample and not os.path.exists(yml_path) + dir_path = os_utils.get_path_under_work_dir(*sub_dir) + + yml_path = os.path.join(dir_path, f'{self.module_name}.yml') + sample_yml_path = os.path.join(dir_path, f'{self.module_name}.sample.yml') + + # 只读sample文件模式 + if self._read_sample_only and os.path.exists(sample_yml_path): + return sample_yml_path, sample_yml_path, None + + # 指定文件存在时 直接使用 + if os.path.exists(yml_path): + return yml_path, yml_path, None - if use_sample and self._copy_from_sample: - shutil.copyfile(sample_yml_path, yml_path) - usage_yml_path = yml_path + # 备用文件存在时 复制使用 + if self.backup_module_name is not None: + backup_yml_path = os.path.join(dir_path, f'{self.backup_module_name}.yml') + if os.path.exists(backup_yml_path): + shutil.copyfile(backup_yml_path, yml_path) + return yml_path, yml_path, None - return usage_yml_path + # 最后看是否有示例文件 + if self._sample and os.path.exists(sample_yml_path): + if self._copy_from_sample: + shutil.copyfile(sample_yml_path, yml_path) + return yml_path, yml_path, None + return sample_yml_path, yml_path, sample_yml_path + + # 冻结环境回退到 MEIPASS/resources + frozen_path = os_utils.get_resource_path(*sub_dir, f'{self.module_name}.yml') + if os.path.exists(frozen_path): + return frozen_path, yml_path, frozen_path + + return yml_path, yml_path, None + + def _get_yaml_file_path(self) -> str | None: + file_path, write_file_path, copy_on_write_source_path = self._get_yaml_file_paths() + self._write_file_path = write_file_path + self._copy_on_write_source_path = copy_on_write_source_path + return file_path @property def is_sample(self) -> bool: @@ -65,11 +105,13 @@ def is_sample(self) -> bool: 是否样例文件 :return: """ + if self.file_path is None: + return False return self.file_path.endswith('.sample.yml') def get_prop_adapter(self, prop: str, - getter_convert: Optional[str] = None, - setter_convert: Optional[str] = None): + getter_convert: str | None = None, + setter_convert: str | None = None): """ 获取一个配置适配器 :param prop: 配置字段 @@ -77,7 +119,9 @@ def get_prop_adapter(self, prop: str, :param setter_convert: 设置时的转换器 :return: """ - from one_dragon_qt.widgets.setting_card.yaml_config_adapter import YamlConfigAdapter + from one_dragon_qt.widgets.setting_card.yaml_config_adapter import ( + YamlConfigAdapter, + ) return YamlConfigAdapter( config=self, field=prop, diff --git a/src/one_dragon/base/config/yaml_operator.py b/src/one_dragon/base/config/yaml_operator.py index 0c089b8..45fc0b5 100644 --- a/src/one_dragon/base/config/yaml_operator.py +++ b/src/one_dragon/base/config/yaml_operator.py @@ -1,36 +1,56 @@ +import copy import os -import sys -from typing import Optional +import shutil import yaml +from one_dragon.utils import yaml_utils from one_dragon.utils.log_utils import log +cached_yaml_data: dict[str, tuple[float, dict | list]] = {} -def get_temp_config_path(file_path: str) -> str: - """ - 优先检查PyInstaller运行时的_MEIPASS目录下是否有对应的yml文件 - 有则返回该路径,否则返回原路径 - """ - if hasattr(sys, '_MEIPASS'): - mei_path = os.path.join(sys._MEIPASS, 'config', os.path.basename(file_path)) - if os.path.exists(mei_path): - return mei_path - return file_path + +def read_cache_or_load(file_path: str) -> dict | list: + cached = cached_yaml_data.get(file_path) + last_modify = os.path.getmtime(file_path) + if cached is not None and cached[0] == last_modify: + return copy.deepcopy(cached[1]) + + with open(file_path, encoding="utf-8") as file: + log.debug(f"加载yaml: {file_path}") + data = yaml_utils.safe_load(file) + if data is None: + data = {} + if not isinstance(data, dict | list): + raise TypeError(f"YAML root must be a dict or list: {file_path}") + cached_yaml_data[file_path] = (last_modify, data) + return copy.deepcopy(data) + + +def invalidate_cache(file_path: str | None) -> None: + if file_path is None: + return + cached_yaml_data.pop(file_path, None) class YamlOperator: - def __init__(self, file_path: Optional[str] = None): + def __init__(self, file_path: str | None = None): """ yml文件的操作器 :param file_path: yml文件的路径。不传入时认为是mock,用于测试。 """ - self.file_path: str = get_temp_config_path(file_path) if file_path else None + self.file_path: str | None = file_path """yml文件的路径""" - self.data: dict = {} + self._write_file_path: str | None = file_path + """实际写入路径 兼容 onedir 下读写路径分离""" + + self._copy_on_write_source_path: str | None = None + """首次写入前需要复制到写入路径的来源文件""" + + self.data: dict | list = {} """存放数据的地方""" self.__read_from_file() @@ -46,8 +66,7 @@ def __read_from_file(self) -> None: return try: - with open(self.file_path, 'r', encoding='utf-8') as file: - self.data = yaml.safe_load(file) + self.data = read_cache_or_load(self.file_path) except Exception: log.error(f'文件读取失败 将使用默认值 {self.file_path}', exc_info=True) return @@ -55,12 +74,42 @@ def __read_from_file(self) -> None: if self.data is None: self.data = {} + def _ensure_write_path_ready(self) -> bool: + write_path = self._get_write_path() + if write_path is None: + return False + + parent_dir = os.path.dirname(write_path) + if parent_dir: + os.makedirs(parent_dir, exist_ok=True) + + if self._copy_on_write_source_path is not None and not os.path.exists(write_path): + shutil.copyfile(self._copy_on_write_source_path, write_path) + + self._copy_on_write_source_path = None + return True + + def _get_write_path(self) -> str | None: + if self._copy_on_write_source_path is None: + return self.file_path if self.file_path is not None else self._write_file_path + return self._write_file_path if self._write_file_path is not None else self.file_path + def save(self): - if self.file_path is None: + if not self._ensure_write_path_ready(): + return + + write_path = self._get_write_path() + if write_path is None: return - with open(self.file_path, 'w', encoding='utf-8') as file: + with open(write_path, 'w', encoding='utf-8') as file: yaml.dump(self.data, file, allow_unicode=True, sort_keys=False) + invalidate_cache(write_path) + + if self.file_path != write_path: + self.file_path = write_path + if hasattr(self, 'old_file_path'): + self.old_file_path = write_path def save_diy(self, text: str): """ @@ -68,17 +117,29 @@ def save_diy(self, text: str): :param text: 自定义的文本 :return: """ - if self.file_path is None: + if not self._ensure_write_path_ready(): return - with open(self.file_path, "w", encoding="utf-8") as file: + write_path = self._get_write_path() + if write_path is None: + return + + with open(write_path, "w", encoding="utf-8") as file: file.write(text) + invalidate_cache(write_path) + + if self.file_path != write_path: + self.file_path = write_path + if hasattr(self, 'old_file_path'): + self.old_file_path = write_path def get(self, prop: str, value=None): + if not isinstance(self.data, dict): + return value return self.data.get(prop, value) def update(self, key: str, value, save: bool = True): - if self.data is None: + if not isinstance(self.data, dict): self.data = {} if key in self.data and not isinstance(value, list) and self.data[key] == value: return @@ -91,12 +152,16 @@ def delete(self): 删除配置文件 :return: """ + if self.file_path is None: + return if os.path.exists(self.file_path): os.remove(self.file_path) + invalidate_cache(self.file_path) + @property def is_file_exists(self) -> bool: """ 配置文件是否存在 :return: """ - return os.path.exists(self.file_path) + return bool(self.file_path) and os.path.exists(self.file_path) diff --git a/src/one_dragon/base/web/__init__.py b/src/one_dragon/base/web/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/one_dragon/base/web/common_downloader.py b/src/one_dragon/base/web/common_downloader.py new file mode 100644 index 0000000..32e401e --- /dev/null +++ b/src/one_dragon/base/web/common_downloader.py @@ -0,0 +1,105 @@ +import os +from collections.abc import Callable + +from one_dragon.utils import http_utils +from one_dragon.utils.log_utils import log + + +class CommonDownloaderParam: + + def __init__( + self, + save_file_path: str, + save_file_name: str, + github_release_download_url: str | None = None, + gitee_release_download_url: str | None = None, + mirror_chan_download_url: str | None = None, + check_existed_list: list[str] | None = None, + unzip_dir_path: str | None = None, + ): + """ + 一个通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载 + + Args: + save_file_path (str): 文件保存的路径 + save_file_name (str): 文件保存的名称 + github_release_download_url (Optional[str], optional): Github Release下载地址. Defaults to None. + gitee_release_download_url (Optional[str], optional): Gitee Release下载地址. Defaults to None. + mirror_chan_download_url (Optional[str], optional): Mirror酱下载地址. Defaults to None. + check_existed_list (Optional[list[str]], optional): 需要检查文件是否存在的列表 完整路径的列表. Defaults to None. + unzip_dir_path (Optional[str], optional): 解压目录路径,如果为None则解压到save_file_path. Defaults to None. + """ + self.save_file_path: str = save_file_path + self.save_file_name: str = save_file_name + self.github_release_download_url: str | None = github_release_download_url + self.gitee_release_download_url: str | None = gitee_release_download_url + self.mirror_chan_download_url: str | None = mirror_chan_download_url + self.check_existed_list: list[str] = [] if check_existed_list is None else check_existed_list + self.unzip_dir_path: str | None = unzip_dir_path + + +class CommonDownloader: + + def __init__( + self, + param: CommonDownloaderParam, + ) -> None: + """ + 一个通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载 + + Args: + param (CommonDownloaderParam): 下载参数 + """ + self.param: CommonDownloaderParam = param + + def download( + self, + download_by_github: bool = True, + download_by_gitee: bool = False, + download_by_mirror_chan: bool = False, + proxy_url: str | None = None, + ghproxy_url: str | None = None, + skip_if_existed: bool = True, + progress_signal: dict[str, str | None] | None = None, + progress_callback: Callable[[float, str], None] | None = None + ) -> bool: + if skip_if_existed and self.is_file_existed(): + return True + + download_url: str = '' + if download_by_github and self.param.github_release_download_url is not None: + if ghproxy_url is not None: + download_url=f'{ghproxy_url}/{self.param.github_release_download_url}' + else: + download_url = self.param.github_release_download_url + elif download_by_gitee and self.param.gitee_release_download_url is not None: + download_url = self.param.gitee_release_download_url + elif download_by_mirror_chan and self.param.mirror_chan_download_url is not None: + download_url = self.param.mirror_chan_download_url + + if download_url == '': + log.error('没有指定下载方法或对应的下载地址') + return False + + return http_utils.download_file( + download_url=download_url, + save_file_path=os.path.join(self.param.save_file_path, self.param.save_file_name), + proxy=proxy_url, + progress_signal=progress_signal, + progress_callback=progress_callback) + + def is_file_existed(self) -> bool: + """ + 判断所需文件是否都已经存在了 + + Returns: + bool: 是否都存在 + """ + if not self.param.check_existed_list: + return False + all_existed: bool = True + for file_name in self.param.check_existed_list: + if not os.path.exists(file_name): + all_existed = False + break + return all_existed diff --git a/src/one_dragon/base/web/zip_downloader.py b/src/one_dragon/base/web/zip_downloader.py new file mode 100644 index 0000000..cf2e915 --- /dev/null +++ b/src/one_dragon/base/web/zip_downloader.py @@ -0,0 +1,98 @@ +import os +from collections.abc import Callable + +from one_dragon.base.web.common_downloader import ( + CommonDownloader, + CommonDownloaderParam, +) +from one_dragon.utils import file_utils +from one_dragon.utils.log_utils import log + + +class ZipDownloader(CommonDownloader): + + def __init__( + self, + param: CommonDownloaderParam, + ) -> None: + """ + 一个Zip的通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载 下载后进行文件解压 + + Args: + param (CommonDownloaderParam): 下载参数 + """ + CommonDownloader.__init__( + self, + param=param, + ) + + def download( + self, + download_by_github: bool = True, + download_by_gitee: bool = False, + download_by_mirror_chan: bool = False, + proxy_url: str | None = None, + ghproxy_url: str | None = None, + skip_if_existed: bool = True, + progress_signal: dict[str, str | None] | None = None, + progress_callback: Callable[[float, str], None] | None = None + ) -> bool: + for i in range(2): + download_result = CommonDownloader.download( + self, + download_by_github=download_by_github, + download_by_gitee=download_by_gitee, + download_by_mirror_chan=download_by_mirror_chan, + proxy_url=proxy_url, + ghproxy_url=ghproxy_url, + skip_if_existed=skip_if_existed if i == 0 else False, # 第2次重试时必定重新下载 + progress_signal=progress_signal, + progress_callback=progress_callback, + ) + + if not download_result: + return download_result + + unzip_result = self.unzip() + if unzip_result: + break + else: # 可能压缩包下载不完整 解压不成功 重新下载 + log.warning('疑似压缩包损毁 重新下载') + continue + + # 解压有可能失败 最后再判断一次解压产物是否已经存在 + return CommonDownloader.is_file_existed(self) + + def unzip(self) -> bool: + """ + 对目标压缩包进行解压 + """ + # 文件已存在则不解压 + exists = CommonDownloader.is_file_existed(self) + if exists: + return True + + zip_file_path = os.path.join(self.param.save_file_path, self.param.save_file_name) + if not os.path.exists(zip_file_path): + return False + + # 使用指定的解压路径,如果没有指定则使用save_file_path + unzip_dir = self.param.unzip_dir_path or self.param.save_file_path + os.makedirs(unzip_dir, exist_ok=True) + file_utils.unzip_file(zip_file_path=zip_file_path, unzip_dir_path=unzip_dir) + log.info(f"解压完成 {zip_file_path} 到 {unzip_dir}") + + # 最后判断压缩包以外的文件是否完整了 完整了才说明解压成功 + return CommonDownloader.is_file_existed(self) + + def is_file_existed(self) -> bool: + """ + 检查文件是否存在 + 额外判断压缩包是否已经存在了 + """ + exists = CommonDownloader.is_file_existed(self) + if exists: + return True + + zip_file_path = os.path.join(self.param.save_file_path, self.param.save_file_name) + return os.path.exists(zip_file_path) diff --git a/src/one_dragon/envs/ghproxy_service.py b/src/one_dragon/envs/ghproxy_service.py index 0b4f88d..522f923 100644 --- a/src/one_dragon/envs/ghproxy_service.py +++ b/src/one_dragon/envs/ghproxy_service.py @@ -1,5 +1,6 @@ import re -import urllib.request + +import requests from one_dragon.envs.env_config import EnvConfig from one_dragon.utils.log_utils import log @@ -10,35 +11,40 @@ class GhProxyService: def __init__(self, env_config: EnvConfig): self.env_config = env_config - def update_proxy_url(self) -> None: + def update_proxy_url(self) -> bool: """ 更新免费代理的url :return: """ url = 'https://ghproxy.link/js/src_views_home_HomeView_vue.js' # 打开 https://ghproxy.link/ 后找到的js文件 + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Referer': 'https://ghproxy.link/' + } try: - with urllib.request.urlopen(url, timeout=10) as response: - js_content: str = response.read().decode('utf-8') + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + js_content = response.text except Exception: log.error('自动获取免费代理地址失败', exc_info=True) - return + return False url_prefix = ' 标签 有多个时候忽略 等待后续再处理 if another_url_prefix_idx != -1: log.error('自动获取免费代理地址失败 有多个 标签') - return + return False proxy_url = js_content[url_prefix_idx + len(url_prefix):url_suffix_idx] # 判断 proxy_url 是一个 https 开头的域名 不包含任何路径 例如 https://ghfast.top @@ -47,10 +53,11 @@ def update_proxy_url(self) -> None: # 使用正则表达式匹配 if not re.match(pattern, proxy_url): log.error('自动获取免费代理地址失败 提取域名不合法 %s', proxy_url) - return + return False log.info('自动获取免费代理地址成功 %s', proxy_url) self.env_config.gh_proxy_url = proxy_url + return True def __debug(): @@ -59,4 +66,4 @@ def __debug(): if __name__ == '__main__': - __debug() \ No newline at end of file + __debug() diff --git a/src/one_dragon/envs/project_config.py b/src/one_dragon/envs/project_config.py index 6059715..275d0f1 100644 --- a/src/one_dragon/envs/project_config.py +++ b/src/one_dragon/envs/project_config.py @@ -4,7 +4,7 @@ class ProjectConfig(YamlConfig): def __init__(self): - super().__init__(module_name='project') + YamlConfig.__init__(self, module_name='project') self.project_name = self.get('project_name') self.python_version = self.get('python_version') @@ -14,9 +14,14 @@ def __init__(self): self.gitee_https_repository = self.get('gitee_https_repository') self.gitee_ssh_repository = self.get('gitee_ssh_repository') self.project_git_branch = self.get('project_git_branch') - self.requirements = self.get('requirements') + self.env_archive_name = f'{self.project_name}-Environment.zip' + self.game_executable_name = self.get('game_executable_name', '') self.screen_standard_width = int(self.get('screen_standard_width')) self.screen_standard_height = int(self.get('screen_standard_height')) + self.notice_url = self.get('notice_url') self.qq_link = self.get('qq_link') + self.quick_start_link = self.get('quick_start_link') # 链接 - 快速开始 + self.home_page_link = self.get('home_page_link') # 链接 - 主页 + self.doc_link = self.get('doc_link') # 链接 - 文档 diff --git a/src/one_dragon/utils/http_utils.py b/src/one_dragon/utils/http_utils.py index 639c62f..01de465 100644 --- a/src/one_dragon/utils/http_utils.py +++ b/src/one_dragon/utils/http_utils.py @@ -1,57 +1,97 @@ import time import urllib.request -from typing import Optional, Callable +from collections.abc import Callable +from pathlib import Path +from one_dragon.utils.i18_utils import gt from one_dragon.utils.log_utils import log def download_file(download_url: str, save_file_path: str, - proxy: Optional[str] = None, - progress_callback: Optional[Callable[[float, str], None]] = None) -> bool: + proxy: str | None = None, progress_signal: dict[str, str | None] | None = None, + progress_callback: Callable[[float, str], None] | None = None) -> bool: """ 下载文件 :param download_url: 下载的url :param save_file_path: 保存的文件路径,包含文件名 :param proxy: 使用的代理地址 + :param progress_signal: 进度信号字典,当字典中 'signal' 键的值为 'cancel' 时会取消下载 :param progress_callback: 下载进度的回调,进度发生改变时,通过该方法通知调用方。 :return: 是否下载成功 """ - if proxy is not None: - proxy_handler = urllib.request.ProxyHandler( - {'http': proxy, 'https': proxy}) - opener = urllib.request.build_opener(proxy_handler) - urllib.request.install_opener(opener) + proxy_handler = ( + urllib.request.ProxyHandler({'http': proxy, 'https': proxy}) + if proxy is not None else urllib.request.ProxyHandler({}) + ) + opener = urllib.request.build_opener(proxy_handler) last_log_time = time.time() + save_path = Path(save_file_path) + save_path.parent.mkdir(parents=True, exist_ok=True) - def log_download_progress(block_num, block_size, total_size): + def log_download_progress(downloaded_bytes: int, total_size: int) -> None: nonlocal last_log_time now = time.time() if now - last_log_time < 1: return last_log_time = now - downloaded = block_num * block_size / 1024.0 / 1024.0 - total_size_mb = total_size / 1024.0 / 1024.0 - progress = downloaded / total_size_mb - msg = f"正在下载 {downloaded:.2f}/{total_size_mb:.2f} MB ({progress * 100:.2f}%)" + + downloaded_mb = downloaded_bytes / 1024.0 / 1024.0 + if total_size > 0: + total_size_mb = total_size / 1024.0 / 1024.0 + progress = downloaded_bytes / total_size + msg = f"{gt('正在下载')} {downloaded_mb:.2f}/{total_size_mb:.2f} MB ({progress * 100:.2f}%)" + else: + progress = 0 + msg = f"{gt('正在下载')} {downloaded_mb:.2f} MB" + log.info(msg) if progress_callback is not None: progress_callback(progress, msg) try: - msg = f'开始下载 {download_url}' + msg = f"{gt('开始下载')} {download_url}" log.info(msg) if progress_callback is not None: progress_callback(0, msg) - _, __ = urllib.request.urlretrieve(download_url, save_file_path, log_download_progress) - msg = f'下载完成 {save_file_path}' + + request = urllib.request.Request(download_url) + with opener.open(request, timeout=60) as response, save_path.open('wb') as file: + total_size = int(response.headers.get('Content-Length', '0') or 0) + downloaded_bytes = 0 + chunk_size = 1024 * 64 + + while True: + if progress_signal is not None and progress_signal.get('signal') == 'cancel': + raise DownloadCancelledError("下载已取消") + + chunk = response.read(chunk_size) + if not chunk: + break + + file.write(chunk) + downloaded_bytes += len(chunk) + log_download_progress(downloaded_bytes, total_size) + + msg = f"{gt('下载完成')} {save_file_path}" log.info(msg) if progress_callback is not None: progress_callback(1, msg) return True + except DownloadCancelledError: + save_path.unlink(missing_ok=True) + msg = f"{gt('下载已取消')}" + log.info(msg) + if progress_callback is not None: + progress_callback(0, msg) + return False except Exception as e: - msg = f'下载失败 {e}' + save_path.unlink(missing_ok=True) + msg = f"{gt('下载失败')} {e}" if progress_callback is not None: progress_callback(0, msg) - log.error(msg, exec=True) + log.error(msg, exc_info=True) return False + +class DownloadCancelledError(Exception): + pass diff --git a/src/one_dragon/utils/i18_utils.py b/src/one_dragon/utils/i18_utils.py index 003e9bf..9bf1439 100644 --- a/src/one_dragon/utils/i18_utils.py +++ b/src/one_dragon/utils/i18_utils.py @@ -10,12 +10,12 @@ def get_translations(model: str, lang: str): """ - 加载语音 + 加载语言 :param model: 模块 将ocr 界面 日志等翻译区分开来 :param lang: 语言 :return: """ - translate_path = os_utils.get_path_under_work_dir('assets', 'text', 'output') + translate_path = os_utils.get_resource_path('assets', 'text', 'output') lang_dir = os.path.join(translate_path, lang, 'LC_MESSAGES', f'{model}.mo') # 未有对应的文本mo文件 if not os.path.exists(lang_dir): diff --git a/src/one_dragon/utils/os_utils.py b/src/one_dragon/utils/os_utils.py index 6075106..8668b49 100644 --- a/src/one_dragon/utils/os_utils.py +++ b/src/one_dragon/utils/os_utils.py @@ -35,7 +35,7 @@ def get_path_under_work_dir(*sub_paths: str) -> str: def get_resource_path(*sub_paths: str) -> str: """获取资源文件路径。 - 优先查找工作目录下的路径,不存在时回退到 PyInstaller _MEIPASS。 + 优先查找工作目录下的路径,不存在时回退到 PyInstaller _MEIPASS/resources。 """ work_path = os.path.join(get_work_dir(), *sub_paths) if os.path.exists(work_path): diff --git a/src/one_dragon_qt/view/like_interface.py b/src/one_dragon_qt/view/like_interface.py index 9693d65..7f9d17f 100644 --- a/src/one_dragon_qt/view/like_interface.py +++ b/src/one_dragon_qt/view/like_interface.py @@ -36,13 +36,14 @@ def get_content_widget(self) -> QWidget: url='https://one-dragon.com/other/zh/like/like.html') content.add_widget(cafe_opt) - img_label = ImageLabel() - img = cv2_utils.read_image(os.path.join(os_utils.get_path_under_work_dir('assets', 'ui'), 'sponsor_wechat.png')) - image = Cv2Image(img) - img_label.setImage(image) - img_label.setFixedWidth(250) - img_label.setFixedHeight(250) - content.add_widget(img_label) + img = cv2_utils.read_image(os_utils.get_resource_path('assets', 'ui', 'sponsor_wechat.png')) + if img is not None: + img_label = ImageLabel() + image = Cv2Image(img) + img_label.setImage(image) + img_label.setFixedWidth(250) + img_label.setFixedHeight(250) + content.add_widget(img_label) content.add_stretch(1) return content diff --git a/src/one_dragon_qt/view/setting/setting_instance_interface.py b/src/one_dragon_qt/view/setting/setting_instance_interface.py index c42e8c9..a95cd4b 100644 --- a/src/one_dragon_qt/view/setting/setting_instance_interface.py +++ b/src/one_dragon_qt/view/setting/setting_instance_interface.py @@ -374,8 +374,9 @@ def _on_instance_delete(self, idx: int) -> None: self._init_content_widget() def _on_game_path_clicked(self) -> None: + executable_name = self.ctx.project_config.game_executable_name or 'game.exe' file_path, _ = QFileDialog.getOpenFileName( - self, f"{gt('选择你的')} ZenlessZoneZero.exe", filter="Exe (*.exe)" + self, f"{gt('选择你的')} {executable_name}", filter="Exe (*.exe)" ) if file_path is not None and file_path.endswith(".exe"): log.info(f"{gt('选择路径')} {file_path}") diff --git a/src/script_chainer/context/script_chainer_context.py b/src/script_chainer/context/script_chainer_context.py index ad78074..959e653 100644 --- a/src/script_chainer/context/script_chainer_context.py +++ b/src/script_chainer/context/script_chainer_context.py @@ -6,10 +6,12 @@ from one_dragon.base.push.push_service import PushService from one_dragon.custom.custom_config import CustomConfig from one_dragon.envs.env_config import EnvConfig +from one_dragon.envs.ghproxy_service import GhProxyService from one_dragon.envs.project_config import ProjectConfig from one_dragon.utils import os_utils from one_dragon.utils.log_utils import log from script_chainer.config.script_config import ScriptChainConfig +from script_chainer.services.github_update_service import GithubUpdateService ONE_DRAGON_CONTEXT_EXECUTOR = ThreadPoolExecutor(thread_name_prefix='one_dragon_context', max_workers=1) @@ -20,6 +22,8 @@ def __init__(self): self.project_config: ProjectConfig = ProjectConfig() self.env_config: EnvConfig = EnvConfig() self.custom_config: CustomConfig = CustomConfig() + self.gh_proxy_service: GhProxyService = GhProxyService(self.env_config) + self.github_update_service: GithubUpdateService = GithubUpdateService(self) self.push_service: PushService = PushService(self) self.notify_config: NotifyConfig = NotifyConfig(None, {}) self._init_lock = threading.Lock() diff --git a/src/script_chainer/gui/page/editor_setting_interface.py b/src/script_chainer/gui/page/editor_setting_interface.py index 9e95e8f..66f98d2 100644 --- a/src/script_chainer/gui/page/editor_setting_interface.py +++ b/src/script_chainer/gui/page/editor_setting_interface.py @@ -1,28 +1,272 @@ +from PySide6.QtCore import Qt, QThread, Signal from PySide6.QtGui import QColor -from PySide6.QtWidgets import QWidget +from PySide6.QtWidgets import QApplication, QWidget from qfluentwidgets import ( ColorDialog, + ComboBox, FluentIcon, + HyperlinkButton, + PrimaryPushButton, + PushButton, SettingCardGroup, Theme, setTheme, ) from one_dragon.custom.custom_config import ThemeEnum +from one_dragon.envs.env_config import ProxyTypeEnum +from one_dragon.utils import os_utils +from one_dragon.utils.i18_utils import gt +from one_dragon.utils.log_utils import log from one_dragon_qt.services.theme_manager import ThemeManager from one_dragon_qt.widgets.column import Column from one_dragon_qt.widgets.setting_card.combo_box_setting_card import ( ComboBoxSettingCard, ) +from one_dragon_qt.widgets.setting_card.multi_push_setting_card import ( + MultiPushSettingCard, +) from one_dragon_qt.widgets.setting_card.push_setting_card import PushSettingCard +from one_dragon_qt.widgets.setting_card.switch_setting_card import SwitchSettingCard +from one_dragon_qt.widgets.setting_card.text_setting_card import TextSettingCard from one_dragon_qt.widgets.vertical_scroll_interface import VerticalScrollInterface from script_chainer.context.script_chainer_context import ScriptChainerContext +from script_chainer.services.github_update_service import GithubUpdateService + + +class GithubUpdateRunner(QThread): + + progress_changed = Signal(float, str) + update_finished = Signal(bool, str) + + def __init__(self, service: GithubUpdateService, target_version: str): + QThread.__init__(self) + self.service = service + self.target_version = target_version + self.progress_signal: dict[str, str | None] = {'signal': None} + + def run(self) -> None: + success, message = self.service.download_and_restart( + self.target_version, + self._on_progress, + self.progress_signal, + ) + self.update_finished.emit(success, message) + + def _on_progress(self, progress: float, message: str) -> None: + self.progress_changed.emit(progress, message) + + def cancel(self) -> None: + self.progress_signal['signal'] = 'cancel' + + +class GithubUpdateChecker(QThread): + + check_finished = Signal(bool, str, str, str) + + def __init__(self, ctx: ScriptChainerContext): + QThread.__init__(self) + self.ctx = ctx + + def run(self) -> None: + try: + latest_stable, latest_beta = self.ctx.github_update_service.get_latest_tags() + self.check_finished.emit(True, latest_stable, latest_beta, '') + except Exception as e: + log.error('检查 GitHub 更新失败', exc_info=True) + self.check_finished.emit(False, '', '', str(e)) + + +class GhProxyUpdateRunner(QThread): + + update_finished = Signal(bool, str) + + def __init__(self, ctx: ScriptChainerContext): + QThread.__init__(self) + self.ctx = ctx + + def run(self) -> None: + success = False + try: + success = self.ctx.gh_proxy_service.update_proxy_url() + finally: + self.update_finished.emit(success, self.ctx.env_config.gh_proxy_url) + + +class GithubUpdateCard(MultiPushSettingCard): + + def __init__(self, ctx: ScriptChainerContext, parent=None): + self.ctx = ctx + self.current_version = '' + self.latest_stable_version = '' + self.latest_beta_version = '' + self.target_version = '' + + self.channel_combo = ComboBox() + self.channel_combo.addItem(gt('正式版'), userData='stable') + self.channel_combo.addItem(gt('测试版'), userData='beta') + self.channel_combo.setCurrentIndex(0) + self.channel_combo.currentIndexChanged.connect(self._on_channel_changed) + + self.check_btn = PushButton(gt('检查')) + self.check_btn.clicked.connect(self.check_and_update_display) + + self.update_btn = PrimaryPushButton(text=gt('更新')) + self.update_btn.clicked.connect(self._on_update_clicked) + + self.cancel_btn = PushButton(gt('取消')) + self.cancel_btn.clicked.connect(self._on_cancel_clicked) + self.cancel_btn.setVisible(False) + + self.version_checker = GithubUpdateChecker(ctx) + self.version_checker.check_finished.connect(self._on_version_check_finished) + + self.update_runner: GithubUpdateRunner | None = None + + MultiPushSettingCard.__init__( + self, + btn_list=[self.channel_combo, self.check_btn, self.update_btn, self.cancel_btn], + icon=FluentIcon.SYNC, + title='程序更新', + content='检查中...', + parent=parent, + ) + self.check_and_update_display() + + def check_and_update_display(self) -> None: + if self.version_checker.isRunning() or self._is_update_running(): + return + + self.cancel_btn.setVisible(False) + self.cancel_btn.setEnabled(False) + self.setContent(gt('正在检查 GitHub 最新版本...')) + self.channel_combo.setDisabled(True) + self.check_btn.setDisabled(True) + self.update_btn.setDisabled(True) + self.update_btn.setText(gt('检查中')) + self.version_checker.start() + + def _on_version_check_finished( + self, + success: bool, + latest_stable_version: str, + latest_beta_version: str, + message: str, + ) -> None: + self.channel_combo.setEnabled(True) + self.check_btn.setEnabled(True) + + if not success: + self.latest_stable_version = '' + self.latest_beta_version = '' + self.target_version = '' + self.setContent(f"{gt('检查更新失败')}: {message}") + self.update_btn.setText(gt('重试')) + self.update_btn.setDisabled(True) + return + + self.current_version = self._current_version() + self.latest_stable_version = latest_stable_version + self.latest_beta_version = latest_beta_version + self._update_display_by_channel() + + def _on_channel_changed(self, _index: int) -> None: + if self.version_checker.isRunning() or self._is_update_running(): + return + self._update_display_by_channel() + + def _update_display_by_channel(self) -> None: + channel = self.channel_combo.currentData() + channel_name = gt('测试版') if channel == 'beta' else gt('正式版') + self.target_version = self.latest_beta_version if channel == 'beta' else self.latest_stable_version + + if not self.target_version: + self.setContent(f"{channel_name}{gt('暂无可用版本')}") + self.update_btn.setText(gt('不可用')) + self.update_btn.setDisabled(True) + return + + if not os_utils.run_in_exe(): + self.setContent( + f"{gt('当前版本')}: {self.current_version}; " + f"{channel_name}: {self.target_version}; " + f"{gt('当前不是发布版,无法自动更新')}" + ) + self.update_btn.setText(gt('不可用')) + self.update_btn.setDisabled(True) + elif self.current_version == self.target_version: + self.setContent(f"{gt('已是最新版本')} {self.current_version}") + self.update_btn.setText(gt('已最新')) + self.update_btn.setDisabled(True) + else: + self.setContent( + f"{gt('可更新')} {gt('当前版本')}: {self.current_version}; " + f"{channel_name}: {self.target_version}" + ) + self.update_btn.setText(gt('更新')) + self.update_btn.setEnabled(True) + + def _on_update_clicked(self) -> None: + if self._is_update_running(): + return + + self.channel_combo.setDisabled(True) + self.check_btn.setDisabled(True) + self.update_btn.setDisabled(True) + self.update_btn.setText(gt('更新中')) + self.cancel_btn.setVisible(True) + self.cancel_btn.setEnabled(True) + self.setContent(gt('正在准备 GitHub 更新...')) + self.update_runner = GithubUpdateRunner(self.ctx.github_update_service, self.target_version) + self.update_runner.progress_changed.connect(self._on_update_progress) + self.update_runner.update_finished.connect(self._on_update_finished) + self.update_runner.start() + + def _on_cancel_clicked(self) -> None: + if not self._is_update_running() or self.update_runner is None: + return + + self.update_runner.cancel() + self.cancel_btn.setDisabled(True) + self.update_btn.setText(gt('取消中')) + self.setContent(gt('正在取消下载...')) + + def _on_update_progress(self, progress: float, message: str) -> None: + if progress > 0: + self.setContent(f'{message} {progress:.0%}') + else: + self.setContent(message) + + def _on_update_finished(self, success: bool, message: str) -> None: + self.cancel_btn.setVisible(False) + self.cancel_btn.setEnabled(False) + self.setContent(message) + if success: + self.update_btn.setText(gt('重启中')) + QApplication.quit() + return + + self.channel_combo.setEnabled(True) + self.check_btn.setEnabled(True) + self.update_btn.setText(gt('更新') if message == gt('下载已取消') else gt('重试')) + self.update_btn.setEnabled(True) + + def _is_update_running(self) -> bool: + return self.update_runner is not None and self.update_runner.isRunning() + + def _current_version(self) -> str: + from one_dragon.version import __version__ + + return __version__ class EditorSettingInterface(VerticalScrollInterface): def __init__(self, ctx: ScriptChainerContext, parent=None): self.ctx: ScriptChainerContext = ctx + self._gh_proxy_auto_refreshed = False + self.gh_proxy_update_runner = GhProxyUpdateRunner(ctx) + self.gh_proxy_update_runner.update_finished.connect(self._on_gh_proxy_update_finished) VerticalScrollInterface.__init__( self, @@ -35,13 +279,14 @@ def __init__(self, ctx: ScriptChainerContext, parent=None): def get_content_widget(self) -> QWidget: content_widget = Column() - content_widget.add_widget(self.get_basic_group()) + content_widget.add_widget(self.get_appearance_group()) + content_widget.add_widget(self.get_update_group()) content_widget.add_stretch(1) return content_widget - def get_basic_group(self) -> SettingCardGroup: - group = SettingCardGroup('基础') + def get_appearance_group(self) -> SettingCardGroup: + group = SettingCardGroup(gt('外观')) self.theme_opt = ComboBoxSettingCard( icon=FluentIcon.CONSTRACT, title='界面主题', @@ -57,9 +302,71 @@ def get_basic_group(self) -> SettingCardGroup: return group + def get_update_group(self) -> SettingCardGroup: + group = SettingCardGroup(gt('更新')) + + self.github_update_opt = GithubUpdateCard(self.ctx) + group.addSettingCard(self.github_update_opt) + + self.proxy_type_opt = ComboBoxSettingCard( + icon=FluentIcon.GLOBE, + title='代理类型', + options_enum=ProxyTypeEnum, + ) + self.proxy_type_opt.value_changed.connect(self._on_proxy_type_changed) + group.addSettingCard(self.proxy_type_opt) + + self.personal_proxy_input = TextSettingCard( + icon=FluentIcon.WIFI, + title='个人代理', + input_placeholder='http://127.0.0.1:8080', + ) + self.personal_proxy_input.value_changed.connect(self._on_proxy_changed) + group.addSettingCard(self.personal_proxy_input) + + self.gh_proxy_url_opt = TextSettingCard( + icon=FluentIcon.GLOBE, + title='GitHub 代理', + ) + group.addSettingCard(self.gh_proxy_url_opt) + + self.auto_fetch_gh_proxy_url_opt = SwitchSettingCard( + icon=FluentIcon.SYNC, + title='自动获取免费代理地址', + content='获取失败时 可前往 https://ghproxy.link/ 查看自行更新', + ) + self.fetch_gh_proxy_url_btn = PushButton(gt('获取'), self) + self.fetch_gh_proxy_url_btn.clicked.connect(self.on_fetch_gh_proxy_url_clicked) + self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addWidget( + self.fetch_gh_proxy_url_btn, + 0, + Qt.AlignmentFlag.AlignRight, + ) + self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addSpacing(16) + + self.goto_gh_proxy_link_btn = HyperlinkButton('https://ghproxy.link', gt('前往'), self) + self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addWidget( + self.goto_gh_proxy_link_btn, + 0, + Qt.AlignmentFlag.AlignRight, + ) + self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addSpacing(16) + + group.addSettingCard(self.auto_fetch_gh_proxy_url_opt) + + return group + def on_interface_shown(self) -> None: VerticalScrollInterface.on_interface_shown(self) self.theme_opt.init_with_adapter(self.ctx.custom_config.get_prop_adapter('theme')) + self.proxy_type_opt.init_with_adapter(self.ctx.env_config.get_prop_adapter('proxy_type')) + self.personal_proxy_input.init_with_adapter(self.ctx.env_config.get_prop_adapter('personal_proxy')) + self.gh_proxy_url_opt.init_with_adapter(self.ctx.env_config.get_prop_adapter('gh_proxy_url')) + self.auto_fetch_gh_proxy_url_opt.init_with_adapter( + self.ctx.env_config.get_prop_adapter('auto_fetch_gh_proxy_url') + ) + self.update_proxy_ui() + self.refresh_gh_proxy_url_by_config() def on_theme_changed(self, index: int, value: str) -> None: """主题改变。 @@ -84,3 +391,46 @@ def _update_custom_theme_color(self, color: QColor) -> None: self.ctx.custom_config.theme_color = color_tuple ThemeManager.set_theme_color(color_tuple) + def _on_proxy_type_changed(self, index: int, value: str) -> None: + self.update_proxy_ui() + self._on_proxy_changed() + + def _on_proxy_changed(self) -> None: + self.ctx.env_config.init_system_proxy() + + def on_fetch_gh_proxy_url_clicked(self) -> None: + self.start_gh_proxy_url_refresh() + + def refresh_gh_proxy_url_by_config(self) -> None: + if self._gh_proxy_auto_refreshed: + return + if not self.ctx.env_config.auto_fetch_gh_proxy_url: + return + if self.ctx.env_config.proxy_type != ProxyTypeEnum.GHPROXY.value.value: + return + self.start_gh_proxy_url_refresh() + + def start_gh_proxy_url_refresh(self) -> None: + if self.gh_proxy_update_runner.isRunning(): + return + self.fetch_gh_proxy_url_btn.setDisabled(True) + self.gh_proxy_update_runner.start() + + def _on_gh_proxy_update_finished(self, success: bool, proxy_url: str) -> None: + self._gh_proxy_auto_refreshed = success + self.gh_proxy_url_opt.setValue(proxy_url, emit_signal=False) + self.fetch_gh_proxy_url_btn.setEnabled(True) + + def update_proxy_ui(self) -> None: + if self.ctx.env_config.proxy_type == ProxyTypeEnum.GHPROXY.value.value: + self.personal_proxy_input.hide() + self.gh_proxy_url_opt.show() + self.auto_fetch_gh_proxy_url_opt.show() + elif self.ctx.env_config.proxy_type == ProxyTypeEnum.PERSONAL.value.value: + self.personal_proxy_input.show() + self.gh_proxy_url_opt.hide() + self.auto_fetch_gh_proxy_url_opt.hide() + elif self.ctx.env_config.proxy_type == ProxyTypeEnum.NONE.value.value: + self.personal_proxy_input.hide() + self.gh_proxy_url_opt.hide() + self.auto_fetch_gh_proxy_url_opt.hide() diff --git a/src/script_chainer/services/github_update_service.py b/src/script_chainer/services/github_update_service.py new file mode 100644 index 0000000..4e559e8 --- /dev/null +++ b/src/script_chainer/services/github_update_service.py @@ -0,0 +1,298 @@ +import json +import os +import shutil +import subprocess +import sys +import urllib.parse +import urllib.request +import zipfile +from collections.abc import Callable +from pathlib import Path, PurePosixPath +from typing import TYPE_CHECKING + +from one_dragon.base.web.common_downloader import ( + CommonDownloader, + CommonDownloaderParam, +) +from one_dragon.utils import os_utils +from one_dragon.utils.i18_utils import gt +from one_dragon.utils.log_utils import log + +if TYPE_CHECKING: + from script_chainer.context.script_chainer_context import ScriptChainerContext + +ProgressCallback = Callable[[float, str], None] + +APP_EXE_NAMES = ( + 'OneDragon ScriptChainer.exe', + 'OneDragon ScriptChainer Runner.exe', +) +RELEASE_ZIP_PREFIX = 'OneDragon-ScriptChainer' +UPDATE_DIR_NAME = 'github_update' +RUNTIME_DIR_NAME = '.runtime' + + +class GithubUpdateService: + """下载 GitHub Release,并在进程退出后替换发布版文件。""" + + def __init__(self, ctx: 'ScriptChainerContext'): + self.ctx = ctx + + def download_and_restart( + self, + target_tag: str | None = None, + progress_callback: ProgressCallback | None = None, + progress_signal: dict[str, str | None] | None = None, + ) -> tuple[bool, str]: + if not os_utils.run_in_exe(): + return False, gt('当前不是发布版,无法自动更新') + + work_dir = Path(os_utils.get_work_dir()) + update_dir = work_dir / '.temp' / UPDATE_DIR_NAME + stage_dir = update_dir / 'stage' + + try: + if update_dir.exists(): + shutil.rmtree(update_dir) + stage_dir.mkdir(parents=True, exist_ok=True) + + if progress_callback is not None: + progress_callback(0, gt('正在获取 GitHub 更新版本')) + latest_tag = target_tag or self.get_latest_tag() + downloader_param = self._get_downloader_param(latest_tag, update_dir) + release_zip_name = downloader_param.save_file_name + zip_path = update_dir / release_zip_name + downloader = CommonDownloader(downloader_param) + proxy = self.ctx.env_config.personal_proxy if self.ctx.env_config.is_personal_proxy else None + ghproxy_url = self.ctx.env_config.gh_proxy_url if self.ctx.env_config.is_gh_proxy else None + if not downloader.download( + proxy_url=proxy, + ghproxy_url=ghproxy_url, + skip_if_existed=False, + progress_signal=progress_signal, + progress_callback=progress_callback, + ): + if progress_signal is not None and progress_signal.get('signal') == 'cancel': + return False, gt('下载已取消') + return False, gt('下载 GitHub 更新失败') + + if progress_signal is not None and progress_signal.get('signal') == 'cancel': + return False, gt('下载已取消') + + if progress_callback is not None: + progress_callback(1, gt('正在解压更新包')) + self._extract_release(zip_path, stage_dir) + + update_items = self._get_update_items(stage_dir) + if not update_items: + return False, gt('更新包中没有可替换文件') + + script_path = self._write_apply_script(work_dir, update_dir, update_items) + self._start_apply_script(script_path, work_dir) + return True, gt('更新包已下载完成,程序将重启并替换文件') + except Exception as e: + log.error('准备 GitHub 更新失败', exc_info=True) + return False, f"{gt('准备 GitHub 更新失败')}: {e}" + + def get_latest_tags(self) -> tuple[str, str]: + stable_tag = self.get_latest_tag() + try: + beta_tag = self.get_latest_beta_tag() + except Exception: + log.error('获取 GitHub 最新测试版失败', exc_info=True) + beta_tag = '' + return stable_tag, beta_tag + + def get_latest_tag(self) -> str: + repo_path = self._get_github_repo_path() + release_url = f'https://api.github.com/repos/{repo_path}/releases/latest' + with self._open_request(release_url, headers={'User-Agent': 'OneDragon-ScriptChainer'}) as response: + release = json.loads(response.read().decode('utf-8')) + + tag = str(release.get('tag_name', '')).strip() + if not tag: + raise RuntimeError(f'无法解析最新版本: {release_url}') + return tag + + def get_latest_beta_tag(self) -> str: + repo_path = self._get_github_repo_path() + releases_url = f'https://api.github.com/repos/{repo_path}/releases?per_page=30' + with self._open_request(releases_url, headers={'User-Agent': 'OneDragon-ScriptChainer'}) as response: + releases = json.loads(response.read().decode('utf-8')) + + for release in releases: + if release.get('draft'): + continue + if release.get('prerelease'): + tag = str(release.get('tag_name', '')).strip() + if tag: + return tag + + return '' + + def _get_github_repo_path(self) -> str: + parsed = urllib.parse.urlparse(self.ctx.project_config.github_homepage) + repo_path = parsed.path.strip('/') + if repo_path.endswith('.git'): + repo_path = repo_path[:-4] + if repo_path.count('/') < 1: + raise RuntimeError(f'无法解析 GitHub 仓库地址: {self.ctx.project_config.github_homepage}') + return repo_path + + def _open_request( + self, + url: str, + method: str = 'GET', + headers: dict[str, str] | None = None, + ): + request = urllib.request.Request(url, method=method, headers=headers or {}) + return self._build_opener().open(request, timeout=15) + + def _build_opener(self): + proxy = self.ctx.env_config.personal_proxy if self.ctx.env_config.is_personal_proxy else None + proxy_handler = ( + urllib.request.ProxyHandler({'http': proxy, 'https': proxy}) + if proxy is not None else urllib.request.ProxyHandler({}) + ) + return urllib.request.build_opener(proxy_handler) + + def _get_downloader_param(self, latest_tag: str, update_dir: Path) -> CommonDownloaderParam: + release_zip_name = f'{RELEASE_ZIP_PREFIX}-{latest_tag}.zip' + download_url = ( + f'{self.ctx.project_config.github_homepage}' + f'/releases/download/{latest_tag}/{release_zip_name}' + ) + + return CommonDownloaderParam( + save_file_path=str(update_dir), + save_file_name=release_zip_name, + github_release_download_url=download_url, + ) + + def _extract_release(self, zip_path: Path, stage_dir: Path) -> None: + stage_root = stage_dir.resolve() + with zipfile.ZipFile(zip_path) as zip_file: + root_dir = self._detect_root_dir(zip_file) + for info in zip_file.infolist(): + if info.is_dir(): + continue + + parts = PurePosixPath(info.filename.replace('\\', '/')).parts + if root_dir is not None and parts and parts[0] == root_dir: + parts = parts[1:] + if not parts or '..' in parts: + continue + if not self._should_extract(parts): + continue + + target_path = stage_dir.joinpath(*parts) + if not self._is_relative_to(target_path.resolve(), stage_root): + continue + + target_path.parent.mkdir(parents=True, exist_ok=True) + with zip_file.open(info) as source, target_path.open('wb') as target: + shutil.copyfileobj(source, target) + + def _detect_root_dir(self, zip_file: zipfile.ZipFile) -> str | None: + file_parts = [ + PurePosixPath(info.filename.replace('\\', '/')).parts + for info in zip_file.infolist() + if not info.is_dir() + ] + if not file_parts: + return None + + first_parts = [parts[0] for parts in file_parts if len(parts) > 1] + if len(first_parts) != len(file_parts): + return None + + root_dir = first_parts[0] + return root_dir if all(part == root_dir for part in first_parts) else None + + def _should_extract(self, parts: tuple[str, ...]) -> bool: + top_name = parts[0] + if top_name == RUNTIME_DIR_NAME: + return True + return len(parts) == 1 and top_name.lower().endswith('.exe') + + def _get_update_items(self, stage_dir: Path) -> list[Path]: + items: list[Path] = [] + for name in (*APP_EXE_NAMES, RUNTIME_DIR_NAME): + item_path = stage_dir / name + if item_path.exists(): + items.append(item_path) + + known_names = {item.name for item in items} + for item_path in stage_dir.glob('*.exe'): + if item_path.name not in known_names: + items.append(item_path) + return items + + def _write_apply_script( + self, + work_dir: Path, + update_dir: Path, + update_items: list[Path], + ) -> Path: + lines = [ + "$ErrorActionPreference = 'Stop'", + f'$ProcessIdToWait = {os.getpid()}', + f'$CurrentExe = {self._ps_quote(sys.executable)}', + f'$WorkDir = {self._ps_quote(str(work_dir))}', + 'Start-Sleep -Milliseconds 500', + 'try { Wait-Process -Id $ProcessIdToWait -Timeout 30 -ErrorAction SilentlyContinue } catch {}', + ] + + for item_path in update_items: + target_path = work_dir / item_path.name + backup_path = work_dir / f'{item_path.name}.bak' + lines.extend([ + f'$Source = {self._ps_quote(str(item_path))}', + f'$Target = {self._ps_quote(str(target_path))}', + f'$Backup = {self._ps_quote(str(backup_path))}', + 'if (Test-Path -LiteralPath $Backup) { Remove-Item -LiteralPath $Backup -Recurse -Force }', + 'if (Test-Path -LiteralPath $Target) { Move-Item -LiteralPath $Target -Destination $Backup -Force }', + 'try {', + ' Move-Item -LiteralPath $Source -Destination $Target -Force', + ' if (Test-Path -LiteralPath $Backup) { Remove-Item -LiteralPath $Backup -Recurse -Force }', + '} catch {', + ' if (Test-Path -LiteralPath $Backup) {', + ' if (Test-Path -LiteralPath $Target) { Remove-Item -LiteralPath $Target -Recurse -Force }', + ' Move-Item -LiteralPath $Backup -Destination $Target -Force', + ' }', + ' throw', + '}', + ]) + + lines.extend([ + 'Start-Process -FilePath $CurrentExe -WorkingDirectory $WorkDir', + f'Remove-Item -LiteralPath {self._ps_quote(str(update_dir))} -Recurse -Force', + ]) + + script_path = update_dir / 'apply_update.ps1' + script_path.write_text('\n'.join(lines), encoding='utf-8') + return script_path + + def _start_apply_script(self, script_path: Path, work_dir: Path) -> None: + subprocess.Popen( + [ + 'powershell', + '-NoProfile', + '-ExecutionPolicy', + 'Bypass', + '-File', + str(script_path), + ], + cwd=str(work_dir), + creationflags=subprocess.CREATE_NO_WINDOW, + ) + + def _ps_quote(self, value: str) -> str: + return "'" + value.replace("'", "''") + "'" + + def _is_relative_to(self, path: Path, parent: Path) -> bool: + try: + path.relative_to(parent) + return True + except ValueError: + return False diff --git a/src/script_chainer/win_exe/script_runner.py b/src/script_chainer/win_exe/script_runner.py index 49a1990..171f9a8 100644 --- a/src/script_chainer/win_exe/script_runner.py +++ b/src/script_chainer/win_exe/script_runner.py @@ -653,7 +653,7 @@ def run_chain(chain_name: str = '01', shutdown_delay: int = 0, debug_index: int log.error(f'初始化上下文实例失败: {e}') try: - if not chain_config.is_file_exists(): + if not chain_config.is_file_exists: print_message(f'脚本链配置不存在 {chain_name}', "ERROR") else: attach_targets = chain_config.compute_attach_targets()