Skip to content

feat: 流程任务支持并发限制 --story=134711116#749

Open
guohelu wants to merge 2 commits into
TencentBlueKing:developfrom
guohelu:develop_0526
Open

feat: 流程任务支持并发限制 --story=134711116#749
guohelu wants to merge 2 commits into
TencentBlueKing:developfrom
guohelu:develop_0526

Conversation

@guohelu

@guohelu guohelu commented May 27, 2026

Copy link
Copy Markdown
Collaborator

Reviewed, transaction id: 81044

@github-actions github-actions Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Summary

本 PR 实现了流程任务的并发限制功能,通过 Redis Set 跟踪运行中任务,在任务启动/恢复/子流程启动时检查并发上限。整体设计合理,信号处理器中的清理逻辑覆盖了 FINISHED/REVOKED/FAILED 三种终态。

主要关注点:

  1. 并发检查存在竞态条件(TOCTOU)check_template_concurrencyupdate_running_task 之间无原子保证,高并发下可能超限。建议使用 Redis Lua 脚本实现 check-and-add 原子操作。
  2. ⚠️ ConcurrencyControlConfig.validate 未处理非数字输入,会抛出 ValueError 而非 ValidationError
  3. ⚠️ redis_inst_check 装饰器在 Redis 不可用时返回 None,而 get_running_task_count 标注返回 int,调用方 check_template_concurrencyrunning_count >= max_running 会因 None >= N 异常。
  4. default_value = 0 应为 "0" 以保持与其他 TEXT 类型配置的一致性。

Comment thread bkflow/task/utils.py
{"space_id": space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
max_running = int(space_configs.get("concurrency_control", 0))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

竞态条件check_template_concurrency 中先读 scard 再在外部 sadd,两步非原子。高并发下多个请求可能同时通过检查并超出并发上限。建议用 Redis Lua 脚本将 check+add 合并为原子操作。

Comment thread bkflow/task/utils.py
@@ -166,3 +167,79 @@ def extract_extra_info(constants, keys=None):
for key in list(constants.keys()) if not keys else keys:
extra_info.update({key: {"name": constants[key]["name"], "value": constants[key]["value"]}})
return json.dumps(extra_info, ensure_ascii=False)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ redis_inst_check 装饰器在 Redis 不可用时执行裸 return(返回 None),但本函数标注 -> int。下游 check_template_concurrencyrunning_count >= max_running 会抛 TypeError。建议改为 running_count = get_running_task_count(template_id) or 0

Comment thread bkflow/space/configs.py
control = True

@classmethod
def validate(cls, value: str):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ int(value) 对非数字输入会抛 ValueError,未被上层 Serializer 捕获为 400,而是返回 500。建议:```python
try:
num = int(value)
except (TypeError, ValueError):
raise ValidationError("...")

Comment thread bkflow/space/configs.py

class ConcurrencyControlConfig(BaseSpaceConfig):
name = "concurrency_control"
desc = _("流程并发控制")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✨ 其他 TEXT 类型配置的 default_value 均为字符串(如 "1h""true"),此处用整数 0 不一致,建议改为 "0"

logger.exception(f"[_remove_task_from_running_set] root_id={root_id} error: {e}")


@receiver(post_set_state)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️to_state == FAILEDnode_id != root_id(即单节点失败但整体流程未终止)时,也会执行 _remove_task_from_running_set。这会导致任务仍在运行但已从并发计数中移除,其他任务可趁机启动超出实际并发上限。建议仅在 node_id == root_id 时移除。

# Reviewed, transaction id: 81049

@github-actions github-actions Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incremental Review Summary

最新提交 33852dd 修复了测试断言数量(12→13, 11→12),使其与新增的 ConcurrencyControlConfig 保持一致。修复正确。

之前报告的问题状态

问题 状态
⚡ TOCTOU 竞态条件(check + add 非原子) 仍然存在,建议后续迭代用 Lua 脚本优化
⚠️ redis_inst_check 返回 None 导致下游 TypeError 仍然存在
⚠️ ConcurrencyControlConfig.validateint(value) 对非数字抛 ValueError 未捕获 仍然存在
default_value = 0 应为字符串 "0" 仍然存在
⚠️ FAILED 状态未检查 node_id == root_id 就移除任务 仍然存在(见下方补充说明)

补充说明:FAILED 分支的移除逻辑

重新审视后,FAILED 分支在 node_id != root_id(单节点失败)时移除任务的设计 可能是有意为之:节点失败后任务暂停执行,此时释放并发槽位;后续用户 retry/skip 时会重新 check_template_concurrency + update_running_task(action="add") 加回。但这意味着:

  1. 在节点失败到用户 retry 之间,并发计数会少算一个正在「暂停等待处理」的任务
  2. 如果目标是严格控制同时「占用引擎资源」的任务数,当前行为合理
  3. 如果目标是控制「未完成」的任务数,则不应在单节点失败时移除

建议在代码注释中明确该设计意图,避免后续维护者误解。

本次增量提交无新问题。

@codecov-commenter

codecov-commenter commented May 27, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 55.20833% with 43 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.96%. Comparing base (ea70462) to head (33852dd).
⚠️ Report is 22 commits behind head on develop.

Files with missing lines Patch % Lines
bkflow/task/utils.py 56.52% 20 Missing ⚠️
bkflow/task/signals/handlers.py 18.18% 9 Missing ⚠️
...components/collections/subprocess_plugin/v1_0_0.py 12.50% 7 Missing ⚠️
bkflow/space/configs.py 72.72% 3 Missing ⚠️
bkflow/task/celery/tasks.py 60.00% 2 Missing ⚠️
bkflow/task/views.py 86.66% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #749      +/-   ##
===========================================
+ Coverage    82.14%   82.96%   +0.82%     
===========================================
  Files          296      307      +11     
  Lines        17925    18254     +329     
===========================================
+ Hits         14725    15145     +420     
+ Misses        3200     3109      -91     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants