diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml
index b5adab0..be0387c 100644
--- a/.github/workflows/build-release.yml
+++ b/.github/workflows/build-release.yml
@@ -136,19 +136,22 @@ jobs:
cd deploy
pyinstaller "OneDragon ScriptChainer.spec"
pyinstaller "OneDragon ScriptChainer Runner.spec"
+ Copy-Item "dist/OneDragon ScriptChainer Runner.exe" -Destination "dist/OneDragon ScriptChainer/" -Force
- name: Upload Editor
if: ${{ env.CREATE_RELEASE == 'false' }}
uses: actions/upload-artifact@v4
with:
- name: Editor
- path: deploy/dist/OneDragon ScriptChainer.exe
+ name: Editor-r${{ github.run_number }}
+ include-hidden-files: true
+ path: deploy/dist/OneDragon ScriptChainer/
- name: Upload Runner
if: ${{ env.CREATE_RELEASE == 'false' }}
uses: actions/upload-artifact@v4
with:
- name: Runner
+ name: Runner-r${{ github.run_number }}
+ include-hidden-files: true
path: deploy/dist/OneDragon ScriptChainer Runner.exe
- name: Upload Dist
@@ -156,8 +159,9 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: dist
+ include-hidden-files: true
if-no-files-found: error
- path: deploy/dist
+ path: deploy/dist/OneDragon ScriptChainer/
release:
runs-on: windows-latest
@@ -173,7 +177,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: dist
- path: .
+ path: OneDragon ScriptChainer
- name: Prepare release packages
shell: pwsh
@@ -182,8 +186,7 @@ jobs:
run: |
$version = $env:RELEASE_VERSION
- # 在根目录打包两个 exe
- Compress-Archive -Path "OneDragon ScriptChainer.exe","OneDragon ScriptChainer Runner.exe" -DestinationPath "OneDragon-ScriptChainer-$version.zip" -Force
+ Compress-Archive -Path "OneDragon ScriptChainer" -DestinationPath "OneDragon-ScriptChainer-$version.zip" -Force
- name: Generate Changelog
@@ -310,10 +313,12 @@ jobs:
### 使用方法
1. 下载 `OneDragon-ScriptChainer-${{ needs.build.outputs.version }}.zip`
- 2. 解压到任意目录
+ 2. 设置中可以自更新。如需手动更新,请先删除旧版的 `.runtime` 文件夹,再解压覆盖
3. 运行 `OneDragon ScriptChainer.exe` 创建和编辑脚本链
4. 运行 `OneDragon ScriptChainer Runner.exe` 执行脚本链
+ > 注意:`OneDragon ScriptChainer.exe` 所在目录包含运行时依赖(`.runtime` 文件夹),请勿单独移动该文件。
+
### 命令行参数
`OneDragon ScriptChainer.exe` 支持以下参数:
diff --git a/.gitignore b/.gitignore
index d84134f..03a6a5b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,5 @@ __pycache__/
.log/
config/script_chain/
config/custom.yml
-config/push.yml
\ No newline at end of file
+config/push.yml
+config/env.yml
\ No newline at end of file
diff --git a/config/project.yml b/config/project.yml
index d9df2b3..7899041 100644
--- a/config/project.yml
+++ b/config/project.yml
@@ -6,7 +6,6 @@ github_ssh_repository: "git@github.com:OneDragon-Anything/OneDragon-ScriptChaine
gitee_https_repository: "https://gitee.com/OneDragon-Anything/OneDragon-ScriptChainer.git"
gitee_ssh_repository: "git@gitee.com:OneDragon-Anything/OneDragon-ScriptChainer.git"
project_git_branch: "main"
-requirements: "requirements-prod.txt"
screen_standard_width: 1920
screen_standard_height: 1080
pip_source: "https://pypi.tuna.tsinghua.edu.cn/simple"
diff --git a/deploy/OneDragon ScriptChainer Runner.spec b/deploy/OneDragon ScriptChainer Runner.spec
index 7567860..e111db0 100644
--- a/deploy/OneDragon ScriptChainer Runner.spec
+++ b/deploy/OneDragon ScriptChainer Runner.spec
@@ -5,7 +5,7 @@ a = Analysis(
['..\\src\\script_chainer\\win_exe\\script_runner.py'],
pathex=[],
binaries=[],
- datas=[('../config/project.yml', 'config')],
+ datas=[('../config/project.yml', 'resources/config')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
diff --git a/deploy/OneDragon ScriptChainer.spec b/deploy/OneDragon ScriptChainer.spec
index 9136733..03beccb 100644
--- a/deploy/OneDragon ScriptChainer.spec
+++ b/deploy/OneDragon ScriptChainer.spec
@@ -6,8 +6,8 @@ a = Analysis(
pathex=[],
binaries=[],
datas=[
- ('../config/project.yml', 'config'),
- ('../assets', 'assets')
+ ('../config/project.yml', 'resources/config'),
+ ('../assets', 'resources/assets')
],
hiddenimports=[],
hookspath=[],
@@ -22,16 +22,14 @@ pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
- a.binaries,
- a.datas,
[],
+ exclude_binaries=True,
name='OneDragon ScriptChainer',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
- runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
@@ -40,4 +38,15 @@ exe = EXE(
entitlements_file=None,
uac_admin=False,
icon=['..\\assets\\ui\\editor_icon.ico'],
+ contents_directory='.runtime',
+)
+
+coll = COLLECT(
+ exe,
+ a.binaries,
+ a.datas,
+ strip=False,
+ upx=True,
+ upx_exclude=[],
+ name='OneDragon ScriptChainer',
)
diff --git a/src/one_dragon/base/config/yaml_config.py b/src/one_dragon/base/config/yaml_config.py
index 4db354a..4a3fb1f 100644
--- a/src/one_dragon/base/config/yaml_config.py
+++ b/src/one_dragon/base/config/yaml_config.py
@@ -1,6 +1,5 @@
import os
import shutil
-from typing import Optional, List
from one_dragon.base.config.yaml_operator import YamlOperator
from one_dragon.utils import os_utils
@@ -8,21 +7,28 @@
class YamlConfig(YamlOperator):
- def __init__(self,
- module_name: str,
- instance_idx: Optional[int] = None,
- sub_dir: Optional[List[str]] = None,
- sample: bool = False, copy_from_sample: bool = False,
- is_mock: bool = False):
- self.instance_idx: Optional[int] = instance_idx
+ def __init__(
+ self,
+ module_name: str,
+ backup_module_name: str | None = None,
+ instance_idx: int | None = None,
+ sub_dir: list[str] | None = None,
+ sample: bool = False, copy_from_sample: bool = False,
+ read_sample_only: bool = False,
+ is_mock: bool = False
+ ):
+ self.instance_idx: int | None = instance_idx
"""传入时 该配置为一个的脚本实例独有的配置"""
- self.sub_dir: Optional[List[str]] = sub_dir
+ self.sub_dir: list[str] | None = sub_dir
"""配置所在的子目录"""
self.module_name: str = module_name
"""配置文件名称"""
+ self.backup_module_name: str | None = backup_module_name
+ """备用的配置文件名称 主要用于配置文件改名时做迁移使用"""
+
self.is_mock: bool = is_mock
"""mock情况下 不读取文件 也不会实际保存 用于测试"""
@@ -32,32 +38,66 @@ def __init__(self,
self._copy_from_sample: bool = copy_from_sample
"""配置文件不存在时 是否从sample文件中读取"""
- YamlOperator.__init__(self, self._get_yaml_file_path())
+ self._read_sample_only: bool = read_sample_only
+ """是否只读取sample文件(即使.yml文件存在也只读sample)"""
+
+ file_path, write_file_path, copy_on_write_source_path = self._get_yaml_file_paths()
+ YamlOperator.__init__(self, file_path)
+ self._write_file_path = write_file_path
+ self._copy_on_write_source_path = copy_on_write_source_path
- def _get_yaml_file_path(self) -> Optional[str]:
+ def _get_yaml_file_paths(self) -> tuple[str | None, str | None, str | None]:
"""
- 获取配置文件的路径
- 如果只有sample文件,就复制一个到实例文件夹下
+ 获取配置文件的读取路径、写入路径
:return:
"""
if self.is_mock:
- return None
+ return None, None, None
sub_dir = ['config']
if self.instance_idx is not None:
sub_dir.append('%02d' % self.instance_idx)
if self.sub_dir is not None:
sub_dir = sub_dir + self.sub_dir
- yml_path = os.path.join(os_utils.get_path_under_work_dir(*sub_dir), f'{self.module_name}.yml')
- sample_yml_path = os.path.join(os_utils.get_path_under_work_dir(*sub_dir), f'{self.module_name}.sample.yml')
- usage_yml_path = sample_yml_path if self._sample and not os.path.exists(yml_path) else yml_path
- use_sample = self._sample and not os.path.exists(yml_path)
+ dir_path = os_utils.get_path_under_work_dir(*sub_dir)
+
+ yml_path = os.path.join(dir_path, f'{self.module_name}.yml')
+ sample_yml_path = os.path.join(dir_path, f'{self.module_name}.sample.yml')
+
+ # 只读sample文件模式
+ if self._read_sample_only and os.path.exists(sample_yml_path):
+ return sample_yml_path, sample_yml_path, None
+
+ # 指定文件存在时 直接使用
+ if os.path.exists(yml_path):
+ return yml_path, yml_path, None
- if use_sample and self._copy_from_sample:
- shutil.copyfile(sample_yml_path, yml_path)
- usage_yml_path = yml_path
+ # 备用文件存在时 复制使用
+ if self.backup_module_name is not None:
+ backup_yml_path = os.path.join(dir_path, f'{self.backup_module_name}.yml')
+ if os.path.exists(backup_yml_path):
+ shutil.copyfile(backup_yml_path, yml_path)
+ return yml_path, yml_path, None
- return usage_yml_path
+ # 最后看是否有示例文件
+ if self._sample and os.path.exists(sample_yml_path):
+ if self._copy_from_sample:
+ shutil.copyfile(sample_yml_path, yml_path)
+ return yml_path, yml_path, None
+ return sample_yml_path, yml_path, sample_yml_path
+
+ # 冻结环境回退到 MEIPASS/resources
+ frozen_path = os_utils.get_resource_path(*sub_dir, f'{self.module_name}.yml')
+ if os.path.exists(frozen_path):
+ return frozen_path, yml_path, frozen_path
+
+ return yml_path, yml_path, None
+
+ def _get_yaml_file_path(self) -> str | None:
+ file_path, write_file_path, copy_on_write_source_path = self._get_yaml_file_paths()
+ self._write_file_path = write_file_path
+ self._copy_on_write_source_path = copy_on_write_source_path
+ return file_path
@property
def is_sample(self) -> bool:
@@ -65,11 +105,13 @@ def is_sample(self) -> bool:
是否样例文件
:return:
"""
+ if self.file_path is None:
+ return False
return self.file_path.endswith('.sample.yml')
def get_prop_adapter(self, prop: str,
- getter_convert: Optional[str] = None,
- setter_convert: Optional[str] = None):
+ getter_convert: str | None = None,
+ setter_convert: str | None = None):
"""
获取一个配置适配器
:param prop: 配置字段
@@ -77,7 +119,9 @@ def get_prop_adapter(self, prop: str,
:param setter_convert: 设置时的转换器
:return:
"""
- from one_dragon_qt.widgets.setting_card.yaml_config_adapter import YamlConfigAdapter
+ from one_dragon_qt.widgets.setting_card.yaml_config_adapter import (
+ YamlConfigAdapter,
+ )
return YamlConfigAdapter(
config=self,
field=prop,
diff --git a/src/one_dragon/base/config/yaml_operator.py b/src/one_dragon/base/config/yaml_operator.py
index 0c089b8..45fc0b5 100644
--- a/src/one_dragon/base/config/yaml_operator.py
+++ b/src/one_dragon/base/config/yaml_operator.py
@@ -1,36 +1,56 @@
+import copy
import os
-import sys
-from typing import Optional
+import shutil
import yaml
+from one_dragon.utils import yaml_utils
from one_dragon.utils.log_utils import log
+cached_yaml_data: dict[str, tuple[float, dict | list]] = {}
-def get_temp_config_path(file_path: str) -> str:
- """
- 优先检查PyInstaller运行时的_MEIPASS目录下是否有对应的yml文件
- 有则返回该路径,否则返回原路径
- """
- if hasattr(sys, '_MEIPASS'):
- mei_path = os.path.join(sys._MEIPASS, 'config', os.path.basename(file_path))
- if os.path.exists(mei_path):
- return mei_path
- return file_path
+
+def read_cache_or_load(file_path: str) -> dict | list:
+ cached = cached_yaml_data.get(file_path)
+ last_modify = os.path.getmtime(file_path)
+ if cached is not None and cached[0] == last_modify:
+ return copy.deepcopy(cached[1])
+
+ with open(file_path, encoding="utf-8") as file:
+ log.debug(f"加载yaml: {file_path}")
+ data = yaml_utils.safe_load(file)
+ if data is None:
+ data = {}
+ if not isinstance(data, dict | list):
+ raise TypeError(f"YAML root must be a dict or list: {file_path}")
+ cached_yaml_data[file_path] = (last_modify, data)
+ return copy.deepcopy(data)
+
+
+def invalidate_cache(file_path: str | None) -> None:
+ if file_path is None:
+ return
+ cached_yaml_data.pop(file_path, None)
class YamlOperator:
- def __init__(self, file_path: Optional[str] = None):
+ def __init__(self, file_path: str | None = None):
"""
yml文件的操作器
:param file_path: yml文件的路径。不传入时认为是mock,用于测试。
"""
- self.file_path: str = get_temp_config_path(file_path) if file_path else None
+ self.file_path: str | None = file_path
"""yml文件的路径"""
- self.data: dict = {}
+ self._write_file_path: str | None = file_path
+ """实际写入路径 兼容 onedir 下读写路径分离"""
+
+ self._copy_on_write_source_path: str | None = None
+ """首次写入前需要复制到写入路径的来源文件"""
+
+ self.data: dict | list = {}
"""存放数据的地方"""
self.__read_from_file()
@@ -46,8 +66,7 @@ def __read_from_file(self) -> None:
return
try:
- with open(self.file_path, 'r', encoding='utf-8') as file:
- self.data = yaml.safe_load(file)
+ self.data = read_cache_or_load(self.file_path)
except Exception:
log.error(f'文件读取失败 将使用默认值 {self.file_path}', exc_info=True)
return
@@ -55,12 +74,42 @@ def __read_from_file(self) -> None:
if self.data is None:
self.data = {}
+ def _ensure_write_path_ready(self) -> bool:
+ write_path = self._get_write_path()
+ if write_path is None:
+ return False
+
+ parent_dir = os.path.dirname(write_path)
+ if parent_dir:
+ os.makedirs(parent_dir, exist_ok=True)
+
+ if self._copy_on_write_source_path is not None and not os.path.exists(write_path):
+ shutil.copyfile(self._copy_on_write_source_path, write_path)
+
+ self._copy_on_write_source_path = None
+ return True
+
+ def _get_write_path(self) -> str | None:
+ if self._copy_on_write_source_path is None:
+ return self.file_path if self.file_path is not None else self._write_file_path
+ return self._write_file_path if self._write_file_path is not None else self.file_path
+
def save(self):
- if self.file_path is None:
+ if not self._ensure_write_path_ready():
+ return
+
+ write_path = self._get_write_path()
+ if write_path is None:
return
- with open(self.file_path, 'w', encoding='utf-8') as file:
+ with open(write_path, 'w', encoding='utf-8') as file:
yaml.dump(self.data, file, allow_unicode=True, sort_keys=False)
+ invalidate_cache(write_path)
+
+ if self.file_path != write_path:
+ self.file_path = write_path
+ if hasattr(self, 'old_file_path'):
+ self.old_file_path = write_path
def save_diy(self, text: str):
"""
@@ -68,17 +117,29 @@ def save_diy(self, text: str):
:param text: 自定义的文本
:return:
"""
- if self.file_path is None:
+ if not self._ensure_write_path_ready():
return
- with open(self.file_path, "w", encoding="utf-8") as file:
+ write_path = self._get_write_path()
+ if write_path is None:
+ return
+
+ with open(write_path, "w", encoding="utf-8") as file:
file.write(text)
+ invalidate_cache(write_path)
+
+ if self.file_path != write_path:
+ self.file_path = write_path
+ if hasattr(self, 'old_file_path'):
+ self.old_file_path = write_path
def get(self, prop: str, value=None):
+ if not isinstance(self.data, dict):
+ return value
return self.data.get(prop, value)
def update(self, key: str, value, save: bool = True):
- if self.data is None:
+ if not isinstance(self.data, dict):
self.data = {}
if key in self.data and not isinstance(value, list) and self.data[key] == value:
return
@@ -91,12 +152,16 @@ def delete(self):
删除配置文件
:return:
"""
+ if self.file_path is None:
+ return
if os.path.exists(self.file_path):
os.remove(self.file_path)
+ invalidate_cache(self.file_path)
+ @property
def is_file_exists(self) -> bool:
"""
配置文件是否存在
:return:
"""
- return os.path.exists(self.file_path)
+ return bool(self.file_path) and os.path.exists(self.file_path)
diff --git a/src/one_dragon/base/web/__init__.py b/src/one_dragon/base/web/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/one_dragon/base/web/common_downloader.py b/src/one_dragon/base/web/common_downloader.py
new file mode 100644
index 0000000..32e401e
--- /dev/null
+++ b/src/one_dragon/base/web/common_downloader.py
@@ -0,0 +1,105 @@
+import os
+from collections.abc import Callable
+
+from one_dragon.utils import http_utils
+from one_dragon.utils.log_utils import log
+
+
+class CommonDownloaderParam:
+
+ def __init__(
+ self,
+ save_file_path: str,
+ save_file_name: str,
+ github_release_download_url: str | None = None,
+ gitee_release_download_url: str | None = None,
+ mirror_chan_download_url: str | None = None,
+ check_existed_list: list[str] | None = None,
+ unzip_dir_path: str | None = None,
+ ):
+ """
+ 一个通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载
+
+ Args:
+ save_file_path (str): 文件保存的路径
+ save_file_name (str): 文件保存的名称
+ github_release_download_url (Optional[str], optional): Github Release下载地址. Defaults to None.
+ gitee_release_download_url (Optional[str], optional): Gitee Release下载地址. Defaults to None.
+ mirror_chan_download_url (Optional[str], optional): Mirror酱下载地址. Defaults to None.
+ check_existed_list (Optional[list[str]], optional): 需要检查文件是否存在的列表 完整路径的列表. Defaults to None.
+ unzip_dir_path (Optional[str], optional): 解压目录路径,如果为None则解压到save_file_path. Defaults to None.
+ """
+ self.save_file_path: str = save_file_path
+ self.save_file_name: str = save_file_name
+ self.github_release_download_url: str | None = github_release_download_url
+ self.gitee_release_download_url: str | None = gitee_release_download_url
+ self.mirror_chan_download_url: str | None = mirror_chan_download_url
+ self.check_existed_list: list[str] = [] if check_existed_list is None else check_existed_list
+ self.unzip_dir_path: str | None = unzip_dir_path
+
+
+class CommonDownloader:
+
+ def __init__(
+ self,
+ param: CommonDownloaderParam,
+ ) -> None:
+ """
+ 一个通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载
+
+ Args:
+ param (CommonDownloaderParam): 下载参数
+ """
+ self.param: CommonDownloaderParam = param
+
+ def download(
+ self,
+ download_by_github: bool = True,
+ download_by_gitee: bool = False,
+ download_by_mirror_chan: bool = False,
+ proxy_url: str | None = None,
+ ghproxy_url: str | None = None,
+ skip_if_existed: bool = True,
+ progress_signal: dict[str, str | None] | None = None,
+ progress_callback: Callable[[float, str], None] | None = None
+ ) -> bool:
+ if skip_if_existed and self.is_file_existed():
+ return True
+
+ download_url: str = ''
+ if download_by_github and self.param.github_release_download_url is not None:
+ if ghproxy_url is not None:
+ download_url=f'{ghproxy_url}/{self.param.github_release_download_url}'
+ else:
+ download_url = self.param.github_release_download_url
+ elif download_by_gitee and self.param.gitee_release_download_url is not None:
+ download_url = self.param.gitee_release_download_url
+ elif download_by_mirror_chan and self.param.mirror_chan_download_url is not None:
+ download_url = self.param.mirror_chan_download_url
+
+ if download_url == '':
+ log.error('没有指定下载方法或对应的下载地址')
+ return False
+
+ return http_utils.download_file(
+ download_url=download_url,
+ save_file_path=os.path.join(self.param.save_file_path, self.param.save_file_name),
+ proxy=proxy_url,
+ progress_signal=progress_signal,
+ progress_callback=progress_callback)
+
+ def is_file_existed(self) -> bool:
+ """
+ 判断所需文件是否都已经存在了
+
+ Returns:
+ bool: 是否都存在
+ """
+ if not self.param.check_existed_list:
+ return False
+ all_existed: bool = True
+ for file_name in self.param.check_existed_list:
+ if not os.path.exists(file_name):
+ all_existed = False
+ break
+ return all_existed
diff --git a/src/one_dragon/base/web/zip_downloader.py b/src/one_dragon/base/web/zip_downloader.py
new file mode 100644
index 0000000..cf2e915
--- /dev/null
+++ b/src/one_dragon/base/web/zip_downloader.py
@@ -0,0 +1,98 @@
+import os
+from collections.abc import Callable
+
+from one_dragon.base.web.common_downloader import (
+ CommonDownloader,
+ CommonDownloaderParam,
+)
+from one_dragon.utils import file_utils
+from one_dragon.utils.log_utils import log
+
+
+class ZipDownloader(CommonDownloader):
+
+ def __init__(
+ self,
+ param: CommonDownloaderParam,
+ ) -> None:
+ """
+ 一个Zip的通用下载器 可提供3个下载源 并检查文件是否存在 如果存在则不进行下载 下载后进行文件解压
+
+ Args:
+ param (CommonDownloaderParam): 下载参数
+ """
+ CommonDownloader.__init__(
+ self,
+ param=param,
+ )
+
+ def download(
+ self,
+ download_by_github: bool = True,
+ download_by_gitee: bool = False,
+ download_by_mirror_chan: bool = False,
+ proxy_url: str | None = None,
+ ghproxy_url: str | None = None,
+ skip_if_existed: bool = True,
+ progress_signal: dict[str, str | None] | None = None,
+ progress_callback: Callable[[float, str], None] | None = None
+ ) -> bool:
+ for i in range(2):
+ download_result = CommonDownloader.download(
+ self,
+ download_by_github=download_by_github,
+ download_by_gitee=download_by_gitee,
+ download_by_mirror_chan=download_by_mirror_chan,
+ proxy_url=proxy_url,
+ ghproxy_url=ghproxy_url,
+ skip_if_existed=skip_if_existed if i == 0 else False, # 第2次重试时必定重新下载
+ progress_signal=progress_signal,
+ progress_callback=progress_callback,
+ )
+
+ if not download_result:
+ return download_result
+
+ unzip_result = self.unzip()
+ if unzip_result:
+ break
+ else: # 可能压缩包下载不完整 解压不成功 重新下载
+ log.warning('疑似压缩包损毁 重新下载')
+ continue
+
+ # 解压有可能失败 最后再判断一次解压产物是否已经存在
+ return CommonDownloader.is_file_existed(self)
+
+ def unzip(self) -> bool:
+ """
+ 对目标压缩包进行解压
+ """
+ # 文件已存在则不解压
+ exists = CommonDownloader.is_file_existed(self)
+ if exists:
+ return True
+
+ zip_file_path = os.path.join(self.param.save_file_path, self.param.save_file_name)
+ if not os.path.exists(zip_file_path):
+ return False
+
+ # 使用指定的解压路径,如果没有指定则使用save_file_path
+ unzip_dir = self.param.unzip_dir_path or self.param.save_file_path
+ os.makedirs(unzip_dir, exist_ok=True)
+ file_utils.unzip_file(zip_file_path=zip_file_path, unzip_dir_path=unzip_dir)
+ log.info(f"解压完成 {zip_file_path} 到 {unzip_dir}")
+
+ # 最后判断压缩包以外的文件是否完整了 完整了才说明解压成功
+ return CommonDownloader.is_file_existed(self)
+
+ def is_file_existed(self) -> bool:
+ """
+ 检查文件是否存在
+ 额外判断压缩包是否已经存在了
+ """
+ exists = CommonDownloader.is_file_existed(self)
+ if exists:
+ return True
+
+ zip_file_path = os.path.join(self.param.save_file_path, self.param.save_file_name)
+ return os.path.exists(zip_file_path)
diff --git a/src/one_dragon/envs/ghproxy_service.py b/src/one_dragon/envs/ghproxy_service.py
index 0b4f88d..522f923 100644
--- a/src/one_dragon/envs/ghproxy_service.py
+++ b/src/one_dragon/envs/ghproxy_service.py
@@ -1,5 +1,6 @@
import re
-import urllib.request
+
+import requests
from one_dragon.envs.env_config import EnvConfig
from one_dragon.utils.log_utils import log
@@ -10,35 +11,40 @@ class GhProxyService:
def __init__(self, env_config: EnvConfig):
self.env_config = env_config
- def update_proxy_url(self) -> None:
+ def update_proxy_url(self) -> bool:
"""
更新免费代理的url
:return:
"""
url = 'https://ghproxy.link/js/src_views_home_HomeView_vue.js' # 打开 https://ghproxy.link/ 后找到的js文件
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+ 'Referer': 'https://ghproxy.link/'
+ }
try:
- with urllib.request.urlopen(url, timeout=10) as response:
- js_content: str = response.read().decode('utf-8')
+ response = requests.get(url, headers=headers, timeout=10)
+ response.raise_for_status()
+ js_content = response.text
except Exception:
log.error('自动获取免费代理地址失败', exc_info=True)
- return
+ return False
url_prefix = ' 标签 有多个时候忽略 等待后续再处理
if another_url_prefix_idx != -1:
log.error('自动获取免费代理地址失败 有多个 标签')
- return
+ return False
proxy_url = js_content[url_prefix_idx + len(url_prefix):url_suffix_idx]
# 判断 proxy_url 是一个 https 开头的域名 不包含任何路径 例如 https://ghfast.top
@@ -47,10 +53,11 @@ def update_proxy_url(self) -> None:
# 使用正则表达式匹配
if not re.match(pattern, proxy_url):
log.error('自动获取免费代理地址失败 提取域名不合法 %s', proxy_url)
- return
+ return False
log.info('自动获取免费代理地址成功 %s', proxy_url)
self.env_config.gh_proxy_url = proxy_url
+ return True
def __debug():
@@ -59,4 +66,4 @@ def __debug():
if __name__ == '__main__':
- __debug()
\ No newline at end of file
+ __debug()
diff --git a/src/one_dragon/envs/project_config.py b/src/one_dragon/envs/project_config.py
index 6059715..275d0f1 100644
--- a/src/one_dragon/envs/project_config.py
+++ b/src/one_dragon/envs/project_config.py
@@ -4,7 +4,7 @@
class ProjectConfig(YamlConfig):
def __init__(self):
- super().__init__(module_name='project')
+ YamlConfig.__init__(self, module_name='project')
self.project_name = self.get('project_name')
self.python_version = self.get('python_version')
@@ -14,9 +14,14 @@ def __init__(self):
self.gitee_https_repository = self.get('gitee_https_repository')
self.gitee_ssh_repository = self.get('gitee_ssh_repository')
self.project_git_branch = self.get('project_git_branch')
- self.requirements = self.get('requirements')
+ self.env_archive_name = f'{self.project_name}-Environment.zip'
+ self.game_executable_name = self.get('game_executable_name', '')
self.screen_standard_width = int(self.get('screen_standard_width'))
self.screen_standard_height = int(self.get('screen_standard_height'))
+ self.notice_url = self.get('notice_url')
self.qq_link = self.get('qq_link')
+ self.quick_start_link = self.get('quick_start_link') # 链接 - 快速开始
+ self.home_page_link = self.get('home_page_link') # 链接 - 主页
+ self.doc_link = self.get('doc_link') # 链接 - 文档
diff --git a/src/one_dragon/utils/http_utils.py b/src/one_dragon/utils/http_utils.py
index 639c62f..01de465 100644
--- a/src/one_dragon/utils/http_utils.py
+++ b/src/one_dragon/utils/http_utils.py
@@ -1,57 +1,97 @@
import time
import urllib.request
-from typing import Optional, Callable
+from collections.abc import Callable
+from pathlib import Path
+from one_dragon.utils.i18_utils import gt
from one_dragon.utils.log_utils import log
def download_file(download_url: str, save_file_path: str,
- proxy: Optional[str] = None,
- progress_callback: Optional[Callable[[float, str], None]] = None) -> bool:
+ proxy: str | None = None, progress_signal: dict[str, str | None] | None = None,
+ progress_callback: Callable[[float, str], None] | None = None) -> bool:
"""
下载文件
:param download_url: 下载的url
:param save_file_path: 保存的文件路径,包含文件名
:param proxy: 使用的代理地址
+ :param progress_signal: 进度信号字典,当字典中 'signal' 键的值为 'cancel' 时会取消下载
:param progress_callback: 下载进度的回调,进度发生改变时,通过该方法通知调用方。
:return: 是否下载成功
"""
- if proxy is not None:
- proxy_handler = urllib.request.ProxyHandler(
- {'http': proxy, 'https': proxy})
- opener = urllib.request.build_opener(proxy_handler)
- urllib.request.install_opener(opener)
+ proxy_handler = (
+ urllib.request.ProxyHandler({'http': proxy, 'https': proxy})
+ if proxy is not None else urllib.request.ProxyHandler({})
+ )
+ opener = urllib.request.build_opener(proxy_handler)
last_log_time = time.time()
+ save_path = Path(save_file_path)
+ save_path.parent.mkdir(parents=True, exist_ok=True)
- def log_download_progress(block_num, block_size, total_size):
+ def log_download_progress(downloaded_bytes: int, total_size: int) -> None:
nonlocal last_log_time
now = time.time()
if now - last_log_time < 1:
return
last_log_time = now
- downloaded = block_num * block_size / 1024.0 / 1024.0
- total_size_mb = total_size / 1024.0 / 1024.0
- progress = downloaded / total_size_mb
- msg = f"正在下载 {downloaded:.2f}/{total_size_mb:.2f} MB ({progress * 100:.2f}%)"
+
+ downloaded_mb = downloaded_bytes / 1024.0 / 1024.0
+ if total_size > 0:
+ total_size_mb = total_size / 1024.0 / 1024.0
+ progress = downloaded_bytes / total_size
+ msg = f"{gt('正在下载')} {downloaded_mb:.2f}/{total_size_mb:.2f} MB ({progress * 100:.2f}%)"
+ else:
+ progress = 0
+ msg = f"{gt('正在下载')} {downloaded_mb:.2f} MB"
+
log.info(msg)
if progress_callback is not None:
progress_callback(progress, msg)
try:
- msg = f'开始下载 {download_url}'
+ msg = f"{gt('开始下载')} {download_url}"
log.info(msg)
if progress_callback is not None:
progress_callback(0, msg)
- _, __ = urllib.request.urlretrieve(download_url, save_file_path, log_download_progress)
- msg = f'下载完成 {save_file_path}'
+
+ request = urllib.request.Request(download_url)
+ with opener.open(request, timeout=60) as response, save_path.open('wb') as file:
+ total_size = int(response.headers.get('Content-Length', '0') or 0)
+ downloaded_bytes = 0
+ chunk_size = 1024 * 64
+
+ while True:
+ if progress_signal is not None and progress_signal.get('signal') == 'cancel':
+ raise DownloadCancelledError("下载已取消")
+
+ chunk = response.read(chunk_size)
+ if not chunk:
+ break
+
+ file.write(chunk)
+ downloaded_bytes += len(chunk)
+ log_download_progress(downloaded_bytes, total_size)
+
+ msg = f"{gt('下载完成')} {save_file_path}"
log.info(msg)
if progress_callback is not None:
progress_callback(1, msg)
return True
+ except DownloadCancelledError:
+ save_path.unlink(missing_ok=True)
+ msg = f"{gt('下载已取消')}"
+ log.info(msg)
+ if progress_callback is not None:
+ progress_callback(0, msg)
+ return False
except Exception as e:
- msg = f'下载失败 {e}'
+ save_path.unlink(missing_ok=True)
+ msg = f"{gt('下载失败')} {e}"
if progress_callback is not None:
progress_callback(0, msg)
- log.error(msg, exec=True)
+ log.error(msg, exc_info=True)
return False
+
+class DownloadCancelledError(Exception):
+ pass
diff --git a/src/one_dragon/utils/i18_utils.py b/src/one_dragon/utils/i18_utils.py
index 003e9bf..9bf1439 100644
--- a/src/one_dragon/utils/i18_utils.py
+++ b/src/one_dragon/utils/i18_utils.py
@@ -10,12 +10,12 @@
def get_translations(model: str, lang: str):
"""
- 加载语音
+ 加载语言
:param model: 模块 将ocr 界面 日志等翻译区分开来
:param lang: 语言
:return:
"""
- translate_path = os_utils.get_path_under_work_dir('assets', 'text', 'output')
+ translate_path = os_utils.get_resource_path('assets', 'text', 'output')
lang_dir = os.path.join(translate_path, lang, 'LC_MESSAGES', f'{model}.mo')
# 未有对应的文本mo文件
if not os.path.exists(lang_dir):
diff --git a/src/one_dragon/utils/os_utils.py b/src/one_dragon/utils/os_utils.py
index 6075106..8668b49 100644
--- a/src/one_dragon/utils/os_utils.py
+++ b/src/one_dragon/utils/os_utils.py
@@ -35,7 +35,7 @@ def get_path_under_work_dir(*sub_paths: str) -> str:
def get_resource_path(*sub_paths: str) -> str:
"""获取资源文件路径。
- 优先查找工作目录下的路径,不存在时回退到 PyInstaller _MEIPASS。
+ 优先查找工作目录下的路径,不存在时回退到 PyInstaller _MEIPASS/resources。
"""
work_path = os.path.join(get_work_dir(), *sub_paths)
if os.path.exists(work_path):
diff --git a/src/one_dragon_qt/view/like_interface.py b/src/one_dragon_qt/view/like_interface.py
index 9693d65..7f9d17f 100644
--- a/src/one_dragon_qt/view/like_interface.py
+++ b/src/one_dragon_qt/view/like_interface.py
@@ -36,13 +36,14 @@ def get_content_widget(self) -> QWidget:
url='https://one-dragon.com/other/zh/like/like.html')
content.add_widget(cafe_opt)
- img_label = ImageLabel()
- img = cv2_utils.read_image(os.path.join(os_utils.get_path_under_work_dir('assets', 'ui'), 'sponsor_wechat.png'))
- image = Cv2Image(img)
- img_label.setImage(image)
- img_label.setFixedWidth(250)
- img_label.setFixedHeight(250)
- content.add_widget(img_label)
+ img = cv2_utils.read_image(os_utils.get_resource_path('assets', 'ui', 'sponsor_wechat.png'))
+ if img is not None:
+ img_label = ImageLabel()
+ image = Cv2Image(img)
+ img_label.setImage(image)
+ img_label.setFixedWidth(250)
+ img_label.setFixedHeight(250)
+ content.add_widget(img_label)
content.add_stretch(1)
return content
diff --git a/src/one_dragon_qt/view/setting/setting_instance_interface.py b/src/one_dragon_qt/view/setting/setting_instance_interface.py
index c42e8c9..a95cd4b 100644
--- a/src/one_dragon_qt/view/setting/setting_instance_interface.py
+++ b/src/one_dragon_qt/view/setting/setting_instance_interface.py
@@ -374,8 +374,9 @@ def _on_instance_delete(self, idx: int) -> None:
self._init_content_widget()
def _on_game_path_clicked(self) -> None:
+ executable_name = self.ctx.project_config.game_executable_name or 'game.exe'
file_path, _ = QFileDialog.getOpenFileName(
- self, f"{gt('选择你的')} ZenlessZoneZero.exe", filter="Exe (*.exe)"
+ self, f"{gt('选择你的')} {executable_name}", filter="Exe (*.exe)"
)
if file_path is not None and file_path.endswith(".exe"):
log.info(f"{gt('选择路径')} {file_path}")
diff --git a/src/script_chainer/context/script_chainer_context.py b/src/script_chainer/context/script_chainer_context.py
index ad78074..959e653 100644
--- a/src/script_chainer/context/script_chainer_context.py
+++ b/src/script_chainer/context/script_chainer_context.py
@@ -6,10 +6,12 @@
from one_dragon.base.push.push_service import PushService
from one_dragon.custom.custom_config import CustomConfig
from one_dragon.envs.env_config import EnvConfig
+from one_dragon.envs.ghproxy_service import GhProxyService
from one_dragon.envs.project_config import ProjectConfig
from one_dragon.utils import os_utils
from one_dragon.utils.log_utils import log
from script_chainer.config.script_config import ScriptChainConfig
+from script_chainer.services.github_update_service import GithubUpdateService
ONE_DRAGON_CONTEXT_EXECUTOR = ThreadPoolExecutor(thread_name_prefix='one_dragon_context', max_workers=1)
@@ -20,6 +22,8 @@ def __init__(self):
self.project_config: ProjectConfig = ProjectConfig()
self.env_config: EnvConfig = EnvConfig()
self.custom_config: CustomConfig = CustomConfig()
+ self.gh_proxy_service: GhProxyService = GhProxyService(self.env_config)
+ self.github_update_service: GithubUpdateService = GithubUpdateService(self)
self.push_service: PushService = PushService(self)
self.notify_config: NotifyConfig = NotifyConfig(None, {})
self._init_lock = threading.Lock()
diff --git a/src/script_chainer/gui/page/editor_setting_interface.py b/src/script_chainer/gui/page/editor_setting_interface.py
index 9e95e8f..66f98d2 100644
--- a/src/script_chainer/gui/page/editor_setting_interface.py
+++ b/src/script_chainer/gui/page/editor_setting_interface.py
@@ -1,28 +1,272 @@
+from PySide6.QtCore import Qt, QThread, Signal
from PySide6.QtGui import QColor
-from PySide6.QtWidgets import QWidget
+from PySide6.QtWidgets import QApplication, QWidget
from qfluentwidgets import (
ColorDialog,
+ ComboBox,
FluentIcon,
+ HyperlinkButton,
+ PrimaryPushButton,
+ PushButton,
SettingCardGroup,
Theme,
setTheme,
)
from one_dragon.custom.custom_config import ThemeEnum
+from one_dragon.envs.env_config import ProxyTypeEnum
+from one_dragon.utils import os_utils
+from one_dragon.utils.i18_utils import gt
+from one_dragon.utils.log_utils import log
from one_dragon_qt.services.theme_manager import ThemeManager
from one_dragon_qt.widgets.column import Column
from one_dragon_qt.widgets.setting_card.combo_box_setting_card import (
ComboBoxSettingCard,
)
+from one_dragon_qt.widgets.setting_card.multi_push_setting_card import (
+ MultiPushSettingCard,
+)
from one_dragon_qt.widgets.setting_card.push_setting_card import PushSettingCard
+from one_dragon_qt.widgets.setting_card.switch_setting_card import SwitchSettingCard
+from one_dragon_qt.widgets.setting_card.text_setting_card import TextSettingCard
from one_dragon_qt.widgets.vertical_scroll_interface import VerticalScrollInterface
from script_chainer.context.script_chainer_context import ScriptChainerContext
+from script_chainer.services.github_update_service import GithubUpdateService
+
+
+class GithubUpdateRunner(QThread):
+
+ progress_changed = Signal(float, str)
+ update_finished = Signal(bool, str)
+
+ def __init__(self, service: GithubUpdateService, target_version: str):
+ QThread.__init__(self)
+ self.service = service
+ self.target_version = target_version
+ self.progress_signal: dict[str, str | None] = {'signal': None}
+
+ def run(self) -> None:
+ success, message = self.service.download_and_restart(
+ self.target_version,
+ self._on_progress,
+ self.progress_signal,
+ )
+ self.update_finished.emit(success, message)
+
+ def _on_progress(self, progress: float, message: str) -> None:
+ self.progress_changed.emit(progress, message)
+
+ def cancel(self) -> None:
+ self.progress_signal['signal'] = 'cancel'
+
+
+class GithubUpdateChecker(QThread):
+
+ check_finished = Signal(bool, str, str, str)
+
+ def __init__(self, ctx: ScriptChainerContext):
+ QThread.__init__(self)
+ self.ctx = ctx
+
+ def run(self) -> None:
+ try:
+ latest_stable, latest_beta = self.ctx.github_update_service.get_latest_tags()
+ self.check_finished.emit(True, latest_stable, latest_beta, '')
+ except Exception as e:
+ log.error('检查 GitHub 更新失败', exc_info=True)
+ self.check_finished.emit(False, '', '', str(e))
+
+
+class GhProxyUpdateRunner(QThread):
+
+ update_finished = Signal(bool, str)
+
+ def __init__(self, ctx: ScriptChainerContext):
+ QThread.__init__(self)
+ self.ctx = ctx
+
+ def run(self) -> None:
+ success = False
+ try:
+ success = self.ctx.gh_proxy_service.update_proxy_url()
+ finally:
+ self.update_finished.emit(success, self.ctx.env_config.gh_proxy_url)
+
+
+class GithubUpdateCard(MultiPushSettingCard):
+
+ def __init__(self, ctx: ScriptChainerContext, parent=None):
+ self.ctx = ctx
+ self.current_version = ''
+ self.latest_stable_version = ''
+ self.latest_beta_version = ''
+ self.target_version = ''
+
+ self.channel_combo = ComboBox()
+ self.channel_combo.addItem(gt('正式版'), userData='stable')
+ self.channel_combo.addItem(gt('测试版'), userData='beta')
+ self.channel_combo.setCurrentIndex(0)
+ self.channel_combo.currentIndexChanged.connect(self._on_channel_changed)
+
+ self.check_btn = PushButton(gt('检查'))
+ self.check_btn.clicked.connect(self.check_and_update_display)
+
+ self.update_btn = PrimaryPushButton(text=gt('更新'))
+ self.update_btn.clicked.connect(self._on_update_clicked)
+
+ self.cancel_btn = PushButton(gt('取消'))
+ self.cancel_btn.clicked.connect(self._on_cancel_clicked)
+ self.cancel_btn.setVisible(False)
+
+ self.version_checker = GithubUpdateChecker(ctx)
+ self.version_checker.check_finished.connect(self._on_version_check_finished)
+
+ self.update_runner: GithubUpdateRunner | None = None
+
+ MultiPushSettingCard.__init__(
+ self,
+ btn_list=[self.channel_combo, self.check_btn, self.update_btn, self.cancel_btn],
+ icon=FluentIcon.SYNC,
+ title='程序更新',
+ content='检查中...',
+ parent=parent,
+ )
+ self.check_and_update_display()
+
+ def check_and_update_display(self) -> None:
+ if self.version_checker.isRunning() or self._is_update_running():
+ return
+
+ self.cancel_btn.setVisible(False)
+ self.cancel_btn.setEnabled(False)
+ self.setContent(gt('正在检查 GitHub 最新版本...'))
+ self.channel_combo.setDisabled(True)
+ self.check_btn.setDisabled(True)
+ self.update_btn.setDisabled(True)
+ self.update_btn.setText(gt('检查中'))
+ self.version_checker.start()
+
+ def _on_version_check_finished(
+ self,
+ success: bool,
+ latest_stable_version: str,
+ latest_beta_version: str,
+ message: str,
+ ) -> None:
+ self.channel_combo.setEnabled(True)
+ self.check_btn.setEnabled(True)
+
+ if not success:
+ self.latest_stable_version = ''
+ self.latest_beta_version = ''
+ self.target_version = ''
+ self.setContent(f"{gt('检查更新失败')}: {message}")
+ self.update_btn.setText(gt('重试'))
+ self.update_btn.setDisabled(True)
+ return
+
+ self.current_version = self._current_version()
+ self.latest_stable_version = latest_stable_version
+ self.latest_beta_version = latest_beta_version
+ self._update_display_by_channel()
+
+ def _on_channel_changed(self, _index: int) -> None:
+ if self.version_checker.isRunning() or self._is_update_running():
+ return
+ self._update_display_by_channel()
+
+ def _update_display_by_channel(self) -> None:
+ channel = self.channel_combo.currentData()
+ channel_name = gt('测试版') if channel == 'beta' else gt('正式版')
+ self.target_version = self.latest_beta_version if channel == 'beta' else self.latest_stable_version
+
+ if not self.target_version:
+ self.setContent(f"{channel_name}{gt('暂无可用版本')}")
+ self.update_btn.setText(gt('不可用'))
+ self.update_btn.setDisabled(True)
+ return
+
+ if not os_utils.run_in_exe():
+ self.setContent(
+ f"{gt('当前版本')}: {self.current_version}; "
+ f"{channel_name}: {self.target_version}; "
+ f"{gt('当前不是发布版,无法自动更新')}"
+ )
+ self.update_btn.setText(gt('不可用'))
+ self.update_btn.setDisabled(True)
+ elif self.current_version == self.target_version:
+ self.setContent(f"{gt('已是最新版本')} {self.current_version}")
+ self.update_btn.setText(gt('已最新'))
+ self.update_btn.setDisabled(True)
+ else:
+ self.setContent(
+ f"{gt('可更新')} {gt('当前版本')}: {self.current_version}; "
+ f"{channel_name}: {self.target_version}"
+ )
+ self.update_btn.setText(gt('更新'))
+ self.update_btn.setEnabled(True)
+
+ def _on_update_clicked(self) -> None:
+ if self._is_update_running():
+ return
+
+ self.channel_combo.setDisabled(True)
+ self.check_btn.setDisabled(True)
+ self.update_btn.setDisabled(True)
+ self.update_btn.setText(gt('更新中'))
+ self.cancel_btn.setVisible(True)
+ self.cancel_btn.setEnabled(True)
+ self.setContent(gt('正在准备 GitHub 更新...'))
+ self.update_runner = GithubUpdateRunner(self.ctx.github_update_service, self.target_version)
+ self.update_runner.progress_changed.connect(self._on_update_progress)
+ self.update_runner.update_finished.connect(self._on_update_finished)
+ self.update_runner.start()
+
+ def _on_cancel_clicked(self) -> None:
+ if not self._is_update_running() or self.update_runner is None:
+ return
+
+ self.update_runner.cancel()
+ self.cancel_btn.setDisabled(True)
+ self.update_btn.setText(gt('取消中'))
+ self.setContent(gt('正在取消下载...'))
+
+ def _on_update_progress(self, progress: float, message: str) -> None:
+ if progress > 0:
+ self.setContent(f'{message} {progress:.0%}')
+ else:
+ self.setContent(message)
+
+ def _on_update_finished(self, success: bool, message: str) -> None:
+ self.cancel_btn.setVisible(False)
+ self.cancel_btn.setEnabled(False)
+ self.setContent(message)
+ if success:
+ self.update_btn.setText(gt('重启中'))
+ QApplication.quit()
+ return
+
+ self.channel_combo.setEnabled(True)
+ self.check_btn.setEnabled(True)
+ self.update_btn.setText(gt('更新') if message == gt('下载已取消') else gt('重试'))
+ self.update_btn.setEnabled(True)
+
+ def _is_update_running(self) -> bool:
+ return self.update_runner is not None and self.update_runner.isRunning()
+
+ def _current_version(self) -> str:
+ from one_dragon.version import __version__
+
+ return __version__
class EditorSettingInterface(VerticalScrollInterface):
def __init__(self, ctx: ScriptChainerContext, parent=None):
self.ctx: ScriptChainerContext = ctx
+ self._gh_proxy_auto_refreshed = False
+ self.gh_proxy_update_runner = GhProxyUpdateRunner(ctx)
+ self.gh_proxy_update_runner.update_finished.connect(self._on_gh_proxy_update_finished)
VerticalScrollInterface.__init__(
self,
@@ -35,13 +279,14 @@ def __init__(self, ctx: ScriptChainerContext, parent=None):
def get_content_widget(self) -> QWidget:
content_widget = Column()
- content_widget.add_widget(self.get_basic_group())
+ content_widget.add_widget(self.get_appearance_group())
+ content_widget.add_widget(self.get_update_group())
content_widget.add_stretch(1)
return content_widget
- def get_basic_group(self) -> SettingCardGroup:
- group = SettingCardGroup('基础')
+ def get_appearance_group(self) -> SettingCardGroup:
+ group = SettingCardGroup(gt('外观'))
self.theme_opt = ComboBoxSettingCard(
icon=FluentIcon.CONSTRACT, title='界面主题',
@@ -57,9 +302,71 @@ def get_basic_group(self) -> SettingCardGroup:
return group
+ def get_update_group(self) -> SettingCardGroup:
+ group = SettingCardGroup(gt('更新'))
+
+ self.github_update_opt = GithubUpdateCard(self.ctx)
+ group.addSettingCard(self.github_update_opt)
+
+ self.proxy_type_opt = ComboBoxSettingCard(
+ icon=FluentIcon.GLOBE,
+ title='代理类型',
+ options_enum=ProxyTypeEnum,
+ )
+ self.proxy_type_opt.value_changed.connect(self._on_proxy_type_changed)
+ group.addSettingCard(self.proxy_type_opt)
+
+ self.personal_proxy_input = TextSettingCard(
+ icon=FluentIcon.WIFI,
+ title='个人代理',
+ input_placeholder='http://127.0.0.1:8080',
+ )
+ self.personal_proxy_input.value_changed.connect(self._on_proxy_changed)
+ group.addSettingCard(self.personal_proxy_input)
+
+ self.gh_proxy_url_opt = TextSettingCard(
+ icon=FluentIcon.GLOBE,
+ title='GitHub 代理',
+ )
+ group.addSettingCard(self.gh_proxy_url_opt)
+
+ self.auto_fetch_gh_proxy_url_opt = SwitchSettingCard(
+ icon=FluentIcon.SYNC,
+ title='自动获取免费代理地址',
+ content='获取失败时 可前往 https://ghproxy.link/ 查看自行更新',
+ )
+ self.fetch_gh_proxy_url_btn = PushButton(gt('获取'), self)
+ self.fetch_gh_proxy_url_btn.clicked.connect(self.on_fetch_gh_proxy_url_clicked)
+ self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addWidget(
+ self.fetch_gh_proxy_url_btn,
+ 0,
+ Qt.AlignmentFlag.AlignRight,
+ )
+ self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addSpacing(16)
+
+ self.goto_gh_proxy_link_btn = HyperlinkButton('https://ghproxy.link', gt('前往'), self)
+ self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addWidget(
+ self.goto_gh_proxy_link_btn,
+ 0,
+ Qt.AlignmentFlag.AlignRight,
+ )
+ self.auto_fetch_gh_proxy_url_opt.hBoxLayout.addSpacing(16)
+
+ group.addSettingCard(self.auto_fetch_gh_proxy_url_opt)
+
+ return group
+
def on_interface_shown(self) -> None:
VerticalScrollInterface.on_interface_shown(self)
self.theme_opt.init_with_adapter(self.ctx.custom_config.get_prop_adapter('theme'))
+ self.proxy_type_opt.init_with_adapter(self.ctx.env_config.get_prop_adapter('proxy_type'))
+ self.personal_proxy_input.init_with_adapter(self.ctx.env_config.get_prop_adapter('personal_proxy'))
+ self.gh_proxy_url_opt.init_with_adapter(self.ctx.env_config.get_prop_adapter('gh_proxy_url'))
+ self.auto_fetch_gh_proxy_url_opt.init_with_adapter(
+ self.ctx.env_config.get_prop_adapter('auto_fetch_gh_proxy_url')
+ )
+ self.update_proxy_ui()
+ self.refresh_gh_proxy_url_by_config()
def on_theme_changed(self, index: int, value: str) -> None:
"""主题改变。
@@ -84,3 +391,46 @@ def _update_custom_theme_color(self, color: QColor) -> None:
self.ctx.custom_config.theme_color = color_tuple
ThemeManager.set_theme_color(color_tuple)
+ def _on_proxy_type_changed(self, index: int, value: str) -> None:
+ self.update_proxy_ui()
+ self._on_proxy_changed()
+
+ def _on_proxy_changed(self) -> None:
+ self.ctx.env_config.init_system_proxy()
+
+ def on_fetch_gh_proxy_url_clicked(self) -> None:
+ self.start_gh_proxy_url_refresh()
+
+ def refresh_gh_proxy_url_by_config(self) -> None:
+ if self._gh_proxy_auto_refreshed:
+ return
+ if not self.ctx.env_config.auto_fetch_gh_proxy_url:
+ return
+ if self.ctx.env_config.proxy_type != ProxyTypeEnum.GHPROXY.value.value:
+ return
+ self.start_gh_proxy_url_refresh()
+
+ def start_gh_proxy_url_refresh(self) -> None:
+ if self.gh_proxy_update_runner.isRunning():
+ return
+ self.fetch_gh_proxy_url_btn.setDisabled(True)
+ self.gh_proxy_update_runner.start()
+
+ def _on_gh_proxy_update_finished(self, success: bool, proxy_url: str) -> None:
+ self._gh_proxy_auto_refreshed = success
+ self.gh_proxy_url_opt.setValue(proxy_url, emit_signal=False)
+ self.fetch_gh_proxy_url_btn.setEnabled(True)
+
+ def update_proxy_ui(self) -> None:
+ if self.ctx.env_config.proxy_type == ProxyTypeEnum.GHPROXY.value.value:
+ self.personal_proxy_input.hide()
+ self.gh_proxy_url_opt.show()
+ self.auto_fetch_gh_proxy_url_opt.show()
+ elif self.ctx.env_config.proxy_type == ProxyTypeEnum.PERSONAL.value.value:
+ self.personal_proxy_input.show()
+ self.gh_proxy_url_opt.hide()
+ self.auto_fetch_gh_proxy_url_opt.hide()
+ elif self.ctx.env_config.proxy_type == ProxyTypeEnum.NONE.value.value:
+ self.personal_proxy_input.hide()
+ self.gh_proxy_url_opt.hide()
+ self.auto_fetch_gh_proxy_url_opt.hide()
diff --git a/src/script_chainer/services/github_update_service.py b/src/script_chainer/services/github_update_service.py
new file mode 100644
index 0000000..4e559e8
--- /dev/null
+++ b/src/script_chainer/services/github_update_service.py
@@ -0,0 +1,298 @@
+import json
+import os
+import shutil
+import subprocess
+import sys
+import urllib.parse
+import urllib.request
+import zipfile
+from collections.abc import Callable
+from pathlib import Path, PurePosixPath
+from typing import TYPE_CHECKING
+
+from one_dragon.base.web.common_downloader import (
+ CommonDownloader,
+ CommonDownloaderParam,
+)
+from one_dragon.utils import os_utils
+from one_dragon.utils.i18_utils import gt
+from one_dragon.utils.log_utils import log
+
+if TYPE_CHECKING:
+ from script_chainer.context.script_chainer_context import ScriptChainerContext
+
+ProgressCallback = Callable[[float, str], None]
+
+APP_EXE_NAMES = (
+ 'OneDragon ScriptChainer.exe',
+ 'OneDragon ScriptChainer Runner.exe',
+)
+RELEASE_ZIP_PREFIX = 'OneDragon-ScriptChainer'
+UPDATE_DIR_NAME = 'github_update'
+RUNTIME_DIR_NAME = '.runtime'
+
+
+class GithubUpdateService:
+ """下载 GitHub Release,并在进程退出后替换发布版文件。"""
+
+ def __init__(self, ctx: 'ScriptChainerContext'):
+ self.ctx = ctx
+
+ def download_and_restart(
+ self,
+ target_tag: str | None = None,
+ progress_callback: ProgressCallback | None = None,
+ progress_signal: dict[str, str | None] | None = None,
+ ) -> tuple[bool, str]:
+ if not os_utils.run_in_exe():
+ return False, gt('当前不是发布版,无法自动更新')
+
+ work_dir = Path(os_utils.get_work_dir())
+ update_dir = work_dir / '.temp' / UPDATE_DIR_NAME
+ stage_dir = update_dir / 'stage'
+
+ try:
+ if update_dir.exists():
+ shutil.rmtree(update_dir)
+ stage_dir.mkdir(parents=True, exist_ok=True)
+
+ if progress_callback is not None:
+ progress_callback(0, gt('正在获取 GitHub 更新版本'))
+ latest_tag = target_tag or self.get_latest_tag()
+ downloader_param = self._get_downloader_param(latest_tag, update_dir)
+ release_zip_name = downloader_param.save_file_name
+ zip_path = update_dir / release_zip_name
+ downloader = CommonDownloader(downloader_param)
+ proxy = self.ctx.env_config.personal_proxy if self.ctx.env_config.is_personal_proxy else None
+ ghproxy_url = self.ctx.env_config.gh_proxy_url if self.ctx.env_config.is_gh_proxy else None
+ if not downloader.download(
+ proxy_url=proxy,
+ ghproxy_url=ghproxy_url,
+ skip_if_existed=False,
+ progress_signal=progress_signal,
+ progress_callback=progress_callback,
+ ):
+ if progress_signal is not None and progress_signal.get('signal') == 'cancel':
+ return False, gt('下载已取消')
+ return False, gt('下载 GitHub 更新失败')
+
+ if progress_signal is not None and progress_signal.get('signal') == 'cancel':
+ return False, gt('下载已取消')
+
+ if progress_callback is not None:
+ progress_callback(1, gt('正在解压更新包'))
+ self._extract_release(zip_path, stage_dir)
+
+ update_items = self._get_update_items(stage_dir)
+ if not update_items:
+ return False, gt('更新包中没有可替换文件')
+
+ script_path = self._write_apply_script(work_dir, update_dir, update_items)
+ self._start_apply_script(script_path, work_dir)
+ return True, gt('更新包已下载完成,程序将重启并替换文件')
+ except Exception as e:
+ log.error('准备 GitHub 更新失败', exc_info=True)
+ return False, f"{gt('准备 GitHub 更新失败')}: {e}"
+
+ def get_latest_tags(self) -> tuple[str, str]:
+ stable_tag = self.get_latest_tag()
+ try:
+ beta_tag = self.get_latest_beta_tag()
+ except Exception:
+ log.error('获取 GitHub 最新测试版失败', exc_info=True)
+ beta_tag = ''
+ return stable_tag, beta_tag
+
+ def get_latest_tag(self) -> str:
+ repo_path = self._get_github_repo_path()
+ release_url = f'https://api.github.com/repos/{repo_path}/releases/latest'
+ with self._open_request(release_url, headers={'User-Agent': 'OneDragon-ScriptChainer'}) as response:
+ release = json.loads(response.read().decode('utf-8'))
+
+ tag = str(release.get('tag_name', '')).strip()
+ if not tag:
+ raise RuntimeError(f'无法解析最新版本: {release_url}')
+ return tag
+
+ def get_latest_beta_tag(self) -> str:
+ repo_path = self._get_github_repo_path()
+ releases_url = f'https://api.github.com/repos/{repo_path}/releases?per_page=30'
+ with self._open_request(releases_url, headers={'User-Agent': 'OneDragon-ScriptChainer'}) as response:
+ releases = json.loads(response.read().decode('utf-8'))
+
+ for release in releases:
+ if release.get('draft'):
+ continue
+ if release.get('prerelease'):
+ tag = str(release.get('tag_name', '')).strip()
+ if tag:
+ return tag
+
+ return ''
+
+ def _get_github_repo_path(self) -> str:
+ parsed = urllib.parse.urlparse(self.ctx.project_config.github_homepage)
+ repo_path = parsed.path.strip('/')
+ if repo_path.endswith('.git'):
+ repo_path = repo_path[:-4]
+ if repo_path.count('/') < 1:
+ raise RuntimeError(f'无法解析 GitHub 仓库地址: {self.ctx.project_config.github_homepage}')
+ return repo_path
+
+ def _open_request(
+ self,
+ url: str,
+ method: str = 'GET',
+ headers: dict[str, str] | None = None,
+ ):
+ request = urllib.request.Request(url, method=method, headers=headers or {})
+ return self._build_opener().open(request, timeout=15)
+
+ def _build_opener(self):
+ proxy = self.ctx.env_config.personal_proxy if self.ctx.env_config.is_personal_proxy else None
+ proxy_handler = (
+ urllib.request.ProxyHandler({'http': proxy, 'https': proxy})
+ if proxy is not None else urllib.request.ProxyHandler({})
+ )
+ return urllib.request.build_opener(proxy_handler)
+
+ def _get_downloader_param(self, latest_tag: str, update_dir: Path) -> CommonDownloaderParam:
+ release_zip_name = f'{RELEASE_ZIP_PREFIX}-{latest_tag}.zip'
+ download_url = (
+ f'{self.ctx.project_config.github_homepage}'
+ f'/releases/download/{latest_tag}/{release_zip_name}'
+ )
+
+ return CommonDownloaderParam(
+ save_file_path=str(update_dir),
+ save_file_name=release_zip_name,
+ github_release_download_url=download_url,
+ )
+
+ def _extract_release(self, zip_path: Path, stage_dir: Path) -> None:
+ stage_root = stage_dir.resolve()
+ with zipfile.ZipFile(zip_path) as zip_file:
+ root_dir = self._detect_root_dir(zip_file)
+ for info in zip_file.infolist():
+ if info.is_dir():
+ continue
+
+ parts = PurePosixPath(info.filename.replace('\\', '/')).parts
+ if root_dir is not None and parts and parts[0] == root_dir:
+ parts = parts[1:]
+ if not parts or '..' in parts:
+ continue
+ if not self._should_extract(parts):
+ continue
+
+ target_path = stage_dir.joinpath(*parts)
+ if not self._is_relative_to(target_path.resolve(), stage_root):
+ continue
+
+ target_path.parent.mkdir(parents=True, exist_ok=True)
+ with zip_file.open(info) as source, target_path.open('wb') as target:
+ shutil.copyfileobj(source, target)
+
+ def _detect_root_dir(self, zip_file: zipfile.ZipFile) -> str | None:
+ file_parts = [
+ PurePosixPath(info.filename.replace('\\', '/')).parts
+ for info in zip_file.infolist()
+ if not info.is_dir()
+ ]
+ if not file_parts:
+ return None
+
+ first_parts = [parts[0] for parts in file_parts if len(parts) > 1]
+ if len(first_parts) != len(file_parts):
+ return None
+
+ root_dir = first_parts[0]
+ return root_dir if all(part == root_dir for part in first_parts) else None
+
+ def _should_extract(self, parts: tuple[str, ...]) -> bool:
+ top_name = parts[0]
+ if top_name == RUNTIME_DIR_NAME:
+ return True
+ return len(parts) == 1 and top_name.lower().endswith('.exe')
+
+ def _get_update_items(self, stage_dir: Path) -> list[Path]:
+ items: list[Path] = []
+ for name in (*APP_EXE_NAMES, RUNTIME_DIR_NAME):
+ item_path = stage_dir / name
+ if item_path.exists():
+ items.append(item_path)
+
+ known_names = {item.name for item in items}
+ for item_path in stage_dir.glob('*.exe'):
+ if item_path.name not in known_names:
+ items.append(item_path)
+ return items
+
+ def _write_apply_script(
+ self,
+ work_dir: Path,
+ update_dir: Path,
+ update_items: list[Path],
+ ) -> Path:
+ lines = [
+ "$ErrorActionPreference = 'Stop'",
+ f'$ProcessIdToWait = {os.getpid()}',
+ f'$CurrentExe = {self._ps_quote(sys.executable)}',
+ f'$WorkDir = {self._ps_quote(str(work_dir))}',
+ 'Start-Sleep -Milliseconds 500',
+ 'try { Wait-Process -Id $ProcessIdToWait -Timeout 30 -ErrorAction SilentlyContinue } catch {}',
+ ]
+
+ for item_path in update_items:
+ target_path = work_dir / item_path.name
+ backup_path = work_dir / f'{item_path.name}.bak'
+ lines.extend([
+ f'$Source = {self._ps_quote(str(item_path))}',
+ f'$Target = {self._ps_quote(str(target_path))}',
+ f'$Backup = {self._ps_quote(str(backup_path))}',
+ 'if (Test-Path -LiteralPath $Backup) { Remove-Item -LiteralPath $Backup -Recurse -Force }',
+ 'if (Test-Path -LiteralPath $Target) { Move-Item -LiteralPath $Target -Destination $Backup -Force }',
+ 'try {',
+ ' Move-Item -LiteralPath $Source -Destination $Target -Force',
+ ' if (Test-Path -LiteralPath $Backup) { Remove-Item -LiteralPath $Backup -Recurse -Force }',
+ '} catch {',
+ ' if (Test-Path -LiteralPath $Backup) {',
+ ' if (Test-Path -LiteralPath $Target) { Remove-Item -LiteralPath $Target -Recurse -Force }',
+ ' Move-Item -LiteralPath $Backup -Destination $Target -Force',
+ ' }',
+ ' throw',
+ '}',
+ ])
+
+ lines.extend([
+ 'Start-Process -FilePath $CurrentExe -WorkingDirectory $WorkDir',
+ f'Remove-Item -LiteralPath {self._ps_quote(str(update_dir))} -Recurse -Force',
+ ])
+
+ script_path = update_dir / 'apply_update.ps1'
+ script_path.write_text('\n'.join(lines), encoding='utf-8')
+ return script_path
+
+ def _start_apply_script(self, script_path: Path, work_dir: Path) -> None:
+ subprocess.Popen(
+ [
+ 'powershell',
+ '-NoProfile',
+ '-ExecutionPolicy',
+ 'Bypass',
+ '-File',
+ str(script_path),
+ ],
+ cwd=str(work_dir),
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ )
+
+ def _ps_quote(self, value: str) -> str:
+ return "'" + value.replace("'", "''") + "'"
+
+ def _is_relative_to(self, path: Path, parent: Path) -> bool:
+ try:
+ path.relative_to(parent)
+ return True
+ except ValueError:
+ return False
diff --git a/src/script_chainer/win_exe/script_runner.py b/src/script_chainer/win_exe/script_runner.py
index 49a1990..171f9a8 100644
--- a/src/script_chainer/win_exe/script_runner.py
+++ b/src/script_chainer/win_exe/script_runner.py
@@ -653,7 +653,7 @@ def run_chain(chain_name: str = '01', shutdown_delay: int = 0, debug_index: int
log.error(f'初始化上下文实例失败: {e}')
try:
- if not chain_config.is_file_exists():
+ if not chain_config.is_file_exists:
print_message(f'脚本链配置不存在 {chain_name}', "ERROR")
else:
attach_targets = chain_config.compute_attach_targets()