feat: 流程任务支持并发限制 --story=134711116#749
Conversation
# Reviewed, transaction id: 81044
There was a problem hiding this comment.
Review Summary
本 PR 实现了流程任务的并发限制功能,通过 Redis Set 跟踪运行中任务,在任务启动/恢复/子流程启动时检查并发上限。整体设计合理,信号处理器中的清理逻辑覆盖了 FINISHED/REVOKED/FAILED 三种终态。
主要关注点:
- ⚡ 并发检查存在竞态条件(TOCTOU):
check_template_concurrency与update_running_task之间无原子保证,高并发下可能超限。建议使用 Redis Lua 脚本实现 check-and-add 原子操作。 ⚠️ ConcurrencyControlConfig.validate未处理非数字输入,会抛出ValueError而非ValidationError。⚠️ redis_inst_check装饰器在 Redis 不可用时返回None,而get_running_task_count标注返回int,调用方check_template_concurrency中running_count >= max_running会因None >= N异常。- ✨
default_value = 0应为"0"以保持与其他 TEXT 类型配置的一致性。
| {"space_id": space_id, "config_names": "concurrency_control"} | ||
| ) | ||
| space_configs = space_infos_result.get("data", {}).get("configs", {}) | ||
| max_running = int(space_configs.get("concurrency_control", 0)) |
There was a problem hiding this comment.
⚡ 竞态条件:check_template_concurrency 中先读 scard 再在外部 sadd,两步非原子。高并发下多个请求可能同时通过检查并超出并发上限。建议用 Redis Lua 脚本将 check+add 合并为原子操作。
| @@ -166,3 +167,79 @@ def extract_extra_info(constants, keys=None): | |||
| for key in list(constants.keys()) if not keys else keys: | |||
| extra_info.update({key: {"name": constants[key]["name"], "value": constants[key]["value"]}}) | |||
| return json.dumps(extra_info, ensure_ascii=False) | |||
There was a problem hiding this comment.
redis_inst_check 装饰器在 Redis 不可用时执行裸 return(返回 None),但本函数标注 -> int。下游 check_template_concurrency 中 running_count >= max_running 会抛 TypeError。建议改为 running_count = get_running_task_count(template_id) or 0。
| control = True | ||
|
|
||
| @classmethod | ||
| def validate(cls, value: str): |
There was a problem hiding this comment.
int(value) 对非数字输入会抛 ValueError,未被上层 Serializer 捕获为 400,而是返回 500。建议:```python
try:
num = int(value)
except (TypeError, ValueError):
raise ValidationError("...")
|
|
||
| class ConcurrencyControlConfig(BaseSpaceConfig): | ||
| name = "concurrency_control" | ||
| desc = _("流程并发控制") |
There was a problem hiding this comment.
✨ 其他 TEXT 类型配置的 default_value 均为字符串(如 "1h"、"true"),此处用整数 0 不一致,建议改为 "0"。
| logger.exception(f"[_remove_task_from_running_set] root_id={root_id} error: {e}") | ||
|
|
||
|
|
||
| @receiver(post_set_state) |
There was a problem hiding this comment.
to_state == FAILED 且 node_id != root_id(即单节点失败但整体流程未终止)时,也会执行 _remove_task_from_running_set。这会导致任务仍在运行但已从并发计数中移除,其他任务可趁机启动超出实际并发上限。建议仅在 node_id == root_id 时移除。
# Reviewed, transaction id: 81049
There was a problem hiding this comment.
Incremental Review Summary
最新提交 33852dd 修复了测试断言数量(12→13, 11→12),使其与新增的 ConcurrencyControlConfig 保持一致。修复正确。
之前报告的问题状态
| 问题 | 状态 |
|---|---|
| ⚡ TOCTOU 竞态条件(check + add 非原子) | 仍然存在,建议后续迭代用 Lua 脚本优化 |
redis_inst_check 返回 None 导致下游 TypeError |
仍然存在 |
ConcurrencyControlConfig.validate 中 int(value) 对非数字抛 ValueError 未捕获 |
仍然存在 |
✨ default_value = 0 应为字符串 "0" |
仍然存在 |
node_id == root_id 就移除任务 |
仍然存在(见下方补充说明) |
补充说明:FAILED 分支的移除逻辑
重新审视后,FAILED 分支在 node_id != root_id(单节点失败)时移除任务的设计 可能是有意为之:节点失败后任务暂停执行,此时释放并发槽位;后续用户 retry/skip 时会重新 check_template_concurrency + update_running_task(action="add") 加回。但这意味着:
- 在节点失败到用户 retry 之间,并发计数会少算一个正在「暂停等待处理」的任务
- 如果目标是严格控制同时「占用引擎资源」的任务数,当前行为合理
- 如果目标是控制「未完成」的任务数,则不应在单节点失败时移除
建议在代码注释中明确该设计意图,避免后续维护者误解。
本次增量提交无新问题。
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #749 +/- ##
===========================================
+ Coverage 82.14% 82.96% +0.82%
===========================================
Files 296 307 +11
Lines 17925 18254 +329
===========================================
+ Hits 14725 15145 +420
+ Misses 3200 3109 -91 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Reviewed, transaction id: 81044