Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b3ec122
refactor: move human input schema ownership into dify
QuantumGhost Jun 30, 2026
56ff71e
feat: wire dify hitl callback into graphon
QuantumGhost Jun 30, 2026
f1ddc94
test: update hitl coverage for graphon v060
QuantumGhost Jun 30, 2026
581f34f
fix: remove remaining graphon hitl rendering dependency
QuantumGhost Jun 30, 2026
400b603
test: cover human input submitted rendering
QuantumGhost Jun 30, 2026
13a7837
fix: preserve hitl snapshot rendering semantics
QuantumGhost Jun 30, 2026
a7f14c6
test: verify human input resume node execution
QuantumGhost Jul 1, 2026
d88d8bf
refactor: share default human input session binding
QuantumGhost Jul 1, 2026
6587464
refactor: remove object-typed hitl boundary annotations
QuantumGhost Jul 1, 2026
3f8905b
fix: distinguish hitl timeout from global expiration
QuantumGhost Jul 1, 2026
99d1cf3
fix: align hitl timeout semantics with dify
QuantumGhost Jul 1, 2026
bb30e55
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 1, 2026
4e350c8
chore(api): fix type
QuantumGhost Jul 1, 2026
1f8e084
Merge remote-tracking branch 'origin/codex/hitl-graphon-v060-migratio…
QuantumGhost Jul 1, 2026
e68d81f
fix(api): fix typing issues
QuantumGhost Jul 1, 2026
4c0a059
WIP
QuantumGhost Jul 1, 2026
79b7b95
fix: stop writing legacy hitl pause reasons
QuantumGhost Jul 1, 2026
7cfc4de
test: align pause-reason fallout across unit suite
QuantumGhost Jul 1, 2026
f43eb54
Fix pytest_dify env isolation test
QuantumGhost Jul 2, 2026
56b370d
Restore pause reasons list shape
QuantumGhost Jul 2, 2026
a989676
Fix empty pause reasons test
QuantumGhost Jul 2, 2026
c088ebd
Fix ask_human expired resume semantics
QuantumGhost Jul 2, 2026
ab7ef3c
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 2, 2026
8a08e23
fix: tighten HITL pause boundary handling
QuantumGhost Jul 2, 2026
ce8834a
Merge remote-tracking branch 'origin/codex/hitl-graphon-v060-migratio…
QuantumGhost Jul 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .superpowers/sdd/hitl-timeout-semantics-impl-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# HITL timeout semantics implementation report

## What changed

- Updated `api/core/workflow/nodes/human_input/callback.py` so `DifyHITLCallback` now preserves Dify's timeout split at the boundary:
- `HumanInputFormStatus.TIMEOUT` returns the graphon timeout branch via `Expired(selected_handle="__timeout__", ...)`.
- `HumanInputFormStatus.EXPIRED` is treated as an invalid resume state and raises `AssertionError`.
- `HumanInputFormStatus.WAITING` with a past global deadline is treated as an invalid resume state and raises `AssertionError`.
- `HumanInputFormStatus.WAITING` with only the node-level deadline expired still returns the timeout branch.
- Added `created_at` to `HumanInputFormEntity` and `_HumanInputFormEntityImpl` so the callback can compute the global deadline using Dify's shared `HUMAN_INPUT_GLOBAL_TIMEOUT_SECONDS` invariant.
- Kept the submitted and pause flows unchanged.
- Added focused unit coverage in `api/tests/unit_tests/core/workflow/test_human_input_callback.py` for:
- node timeout branch
- global expiration rejection
- waiting-form past node deadline timeout
- waiting-form past global deadline rejection

## Verification

- `uv run --project api pytest -o addopts='' api/tests/unit_tests/core/workflow/test_human_input_callback.py api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py -q`
- `git diff --check`

## Result

- The focused test set is expected to pass with the new `created_at` boundary in place.
- No unrelated files were modified.

## Concerns

- The callback now fails fast on invalid resume states by design. That is intentional, but any caller that previously relied on `EXPIRED` being mapped to the timeout branch will now see an assertion failure instead.
2 changes: 1 addition & 1 deletion api/controllers/console/app/workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
with_current_user,
)
from core.workflow.human_input_forms import load_form_tokens_by_form_id as _load_form_tokens_by_form_id
from core.workflow.nodes.human_input.pause_reason import HumanInputRequired
from extensions.ext_database import db
from fields.base import ResponseModel
from fields.workflow_run_fields import (
Expand All @@ -33,7 +34,6 @@
WorkflowRunNodeExecutionResponse,
WorkflowRunPaginationResponse,
)
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus
from libs.archive_storage import ArchiveStorageNotConfiguredError, get_archive_storage
from libs.custom_inputs import time_duration
Expand Down
2 changes: 1 addition & 1 deletion api/controllers/service_api/app/human_input_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from controllers.service_api.schema import expect_with_user
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface
from core.workflow.nodes.human_input.entities import FormInputConfig
from extensions.ext_database import db
from fields.base import ResponseModel
from graphon.nodes.human_input.entities import FormInputConfig
from libs.helper import to_timestamp
from models.model import App, EndUser
from services.human_input_service import Form, FormNotFoundError, HumanInputService
Expand Down
2 changes: 1 addition & 1 deletion api/controllers/web/human_input_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from controllers.web import web_ns
from controllers.web.error import WebFormRateLimitExceededError
from controllers.web.site import serialize_app_site_payload
from core.workflow.nodes.human_input.entities import FormInputConfig
from extensions.ext_database import db
from graphon.nodes.human_input.entities import FormInputConfig
from libs.helper import RateLimiter, extract_remote_ip, to_timestamp
from models.account import TenantStatus
from models.model import App, Site
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.workflow.file_reference import resolve_file_record_id
from core.workflow.nodes.human_input.pause_reason import HumanInputRequired
from core.workflow.system_variables import build_system_variables
from extensions.ext_database import db
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import WorkflowExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.model_runtime.utils.encoders import jsonable_encoder
Expand Down
2 changes: 1 addition & 1 deletion api/core/app/apps/common/workflow_response_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@
enrich_human_input_pause_reasons,
resolve_human_input_pause_reason_inputs,
)
from core.workflow.nodes.human_input.pause_reason import HumanInputRequired
from core.workflow.system_variables import SystemVariableKey, system_variables_to_mapping
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from graphon.entities import WorkflowStartReason
from graphon.entities.pause_reason import HumanInputRequired
from graphon.enums import (
BuiltinNodeTypes,
WorkflowExecutionStatus,
Expand Down
13 changes: 10 additions & 3 deletions api/core/app/apps/workflow_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@
QueueWorkflowSucceededEvent,
)
from core.rag.entities import RetrievalSourceMetadata
from core.repositories.human_input_repository import HumanInputFormSubmissionRepository
from core.workflow.node_factory import (
DifyGraphInitContext,
DifyNodeFactory,
get_default_root_node_id,
resolve_workflow_node_class,
)
from core.workflow.nodes.human_input.boundary import enrich_graph_pause_reasons
from core.workflow.nodes.human_input.pause_reason import HumanInputRequired
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
Expand All @@ -51,7 +54,6 @@
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
from graphon.entities.graph_config import NodeConfigDictAdapter
from graphon.entities.pause_reason import HumanInputRequired
from graphon.graph import Graph
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import (
Expand Down Expand Up @@ -426,10 +428,15 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent):
case GraphRunPausedEvent():
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()
self._enqueue_human_input_notifications(event.reasons)
enriched_reasons = enrich_graph_pause_reasons(
reasons=event.reasons,
form_repository=HumanInputFormSubmissionRepository(),
variable_pool=runtime_state.variable_pool,
)
self._enqueue_human_input_notifications(enriched_reasons)
self._publish_event(
QueueWorkflowPausedEvent(
reasons=event.reasons,
reasons=enriched_reasons,
outputs=event.outputs,
paused_nodes=paused_nodes,
)
Expand Down
2 changes: 1 addition & 1 deletion api/core/app/entities/queue_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities import RetrievalSourceMetadata
from core.workflow.nodes.human_input.pause_reason import PauseReason
from graphon.entities import WorkflowStartReason
from graphon.entities.pause_reason import PauseReason
from graphon.enums import NodeType, WorkflowNodeExecutionMetadataKey
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from graphon.variables.segments import Segment
Expand Down
6 changes: 3 additions & 3 deletions api/core/app/entities/task_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities import RetrievalSourceMetadata
from core.workflow.nodes.human_input.entities import FormInputConfig, UserActionConfig
from core.workflow.nodes.human_input.pause_reason import DifyHITLEventType
from graphon.entities import WorkflowStartReason
from graphon.entities.pause_reason import PauseReasonType
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMUsage
from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig


class AnnotationReplyAccount(BaseModel):
Expand Down Expand Up @@ -307,7 +307,7 @@ class HumanInputRequiredPauseReasonPayload(BaseModel):
``human_input_required`` events are available.
"""

TYPE: Literal[PauseReasonType.HUMAN_INPUT_REQUIRED] = PauseReasonType.HUMAN_INPUT_REQUIRED
TYPE: Literal[DifyHITLEventType.HUMAN_INPUT_REQUIRED] = DifyHITLEventType.HUMAN_INPUT_REQUIRED
form_id: str
node_id: str
node_title: str
Expand Down
12 changes: 11 additions & 1 deletion api/core/app/layers/pause_state_persist_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from sqlalchemy.orm import Session, sessionmaker

from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
from core.repositories.human_input_repository import HumanInputFormSubmissionRepository
from core.workflow.nodes.human_input.boundary import enrich_graph_pause_reasons
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent, GraphRunPausedEvent
Expand Down Expand Up @@ -126,12 +128,20 @@ def on_event(self, event: GraphEngineEvent) -> None:
SystemVariableKey.WORKFLOW_EXECUTION_ID,
)
assert workflow_run_id is not None
# NOTE(QuantumGhost): Dify owns the pause-reason semantics that cross the
# persistence boundary. Graphon session ids are translated back to form ids
# here so repository/model layers only handle Dify-owned pause reasons.
pause_reasons = enrich_graph_pause_reasons(
reasons=event.reasons,
form_repository=HumanInputFormSubmissionRepository(),
variable_pool=self.graph_runtime_state.variable_pool,
)
repo = self._get_repo()
repo.create_workflow_pause(
workflow_run_id=workflow_run_id,
state_owner_user_id=self._state_owner_user_id,
state=state.dumps(),
pause_reasons=event.reasons,
pause_reasons=pause_reasons,
)

@override
Expand Down
2 changes: 1 addition & 1 deletion api/core/entities/execution_extra_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from pydantic import BaseModel, ConfigDict, Field, JsonValue

from graphon.nodes.human_input.entities import FormInputConfig, UserActionConfig
from core.workflow.nodes.human_input.entities import FormInputConfig, UserActionConfig
from models.execution_extra_content import ExecutionContentType


Expand Down
20 changes: 19 additions & 1 deletion api/core/plugin/impl/model_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import hashlib
import logging
from collections.abc import Generator, Iterable, Sequence
from collections.abc import Generator, Iterable, Mapping, Sequence
from typing import IO, Any, Literal, cast, overload, override

from pydantic import ValidationError
Expand Down Expand Up @@ -285,6 +285,7 @@ def invoke_llm(
tools: list[PromptMessageTool] | None,
stop: Sequence[str] | None,
stream: Literal[False],
request_metadata: Mapping[str, object] | None = None,
) -> LLMResult: ...

@overload
Expand All @@ -299,6 +300,7 @@ def invoke_llm(
tools: list[PromptMessageTool] | None,
stop: Sequence[str] | None,
stream: Literal[True],
request_metadata: Mapping[str, object] | None = None,
) -> Generator[LLMResultChunk, None, None]: ...

@override
Expand All @@ -313,7 +315,9 @@ def invoke_llm(
tools: list[PromptMessageTool] | None,
stop: Sequence[str] | None,
stream: bool,
request_metadata: Mapping[str, object] | None = None,
) -> LLMResult | Generator[LLMResultChunk, None, None]:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
result = self.client.invoke_llm(
tenant_id=self.tenant_id,
Expand Down Expand Up @@ -489,7 +493,9 @@ def invoke_text_embedding(
credentials: dict[str, Any],
texts: list[str],
input_type: EmbeddingInputType,
request_metadata: Mapping[str, object] | None = None,
) -> EmbeddingResult:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_text_embedding(
tenant_id=self.tenant_id,
Expand All @@ -511,7 +517,9 @@ def invoke_multimodal_embedding(
credentials: dict[str, Any],
documents: list[dict[str, Any]],
input_type: EmbeddingInputType,
request_metadata: Mapping[str, object] | None = None,
) -> EmbeddingResult:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_multimodal_embedding(
tenant_id=self.tenant_id,
Expand Down Expand Up @@ -555,7 +563,9 @@ def invoke_rerank(
docs: list[str],
score_threshold: float | None,
top_n: int | None,
request_metadata: Mapping[str, object] | None = None,
) -> RerankResult:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_rerank(
tenant_id=self.tenant_id,
Expand All @@ -581,7 +591,9 @@ def invoke_multimodal_rerank(
docs: list[MultimodalRerankInput],
score_threshold: float | None,
top_n: int | None,
request_metadata: Mapping[str, object] | None = None,
) -> RerankResult:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_multimodal_rerank(
tenant_id=self.tenant_id,
Expand All @@ -605,7 +617,9 @@ def invoke_tts(
credentials: dict[str, Any],
content_text: str,
voice: str,
request_metadata: Mapping[str, object] | None = None,
) -> Iterable[bytes]:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_tts(
tenant_id=self.tenant_id,
Expand Down Expand Up @@ -646,7 +660,9 @@ def invoke_speech_to_text(
model: str,
credentials: dict[str, Any],
file: IO[bytes],
request_metadata: Mapping[str, object] | None = None,
) -> str:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_speech_to_text(
tenant_id=self.tenant_id,
Expand All @@ -666,7 +682,9 @@ def invoke_moderation(
model: str,
credentials: dict[str, Any],
text: str,
request_metadata: Mapping[str, object] | None = None,
) -> bool:
del request_metadata
plugin_id, provider_name = self._split_provider(provider)
return self.client.invoke_moderation(
tenant_id=self.tenant_id,
Expand Down
19 changes: 17 additions & 2 deletions api/core/repositories/human_input_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
InteractiveSurfaceDeliveryMethod,
is_human_input_webapp_enabled,
)
from graphon.nodes.human_input.entities import FormDefinition, HumanInputNodeData
from graphon.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
from core.workflow.nodes.human_input.entities import FormDefinition, HumanInputNodeData
from core.workflow.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
from models.account import Account, TenantAccountJoin
Expand Down Expand Up @@ -93,6 +93,9 @@ def rendered_content(self) -> str: ...
@property
def selected_action_id(self) -> str | None: ...

@property
def created_at(self) -> datetime: ...

@property
def submitted_data(self) -> Mapping[str, Any] | None: ...

Expand Down Expand Up @@ -178,6 +181,11 @@ def rendered_content(self) -> str:
def selected_action_id(self) -> str | None:
return self._form_model.selected_action_id

@property
@override
def created_at(self) -> datetime:
return self._form_model.created_at

@property
@override
def submitted_data(self) -> Mapping[str, Any] | None:
Expand Down Expand Up @@ -572,6 +580,13 @@ def get_by_token(self, form_token: str) -> HumanInputFormRecord | None:
return None
return HumanInputFormRecord.from_models(recipient_model.form, recipient_model)

def get_by_form_id(self, form_id: str) -> HumanInputFormRecord | None:
with session_factory.create_session() as session:
form_model = session.get(HumanInputForm, form_id)
if form_model is None:
return None
return HumanInputFormRecord.from_models(form_model, None)

def get_by_form_id_and_recipient_type(
self,
form_id: str,
Expand Down
12 changes: 8 additions & 4 deletions api/core/workflow/human_input_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
from enum import StrEnum
from typing import Any, NamedTuple

from graphon.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType
from graphon.nodes.human_input.entities import FormInputConfig, SelectInputConfig
from graphon.nodes.human_input.enums import ValueSourceType
from core.workflow.nodes.human_input.entities import FormInputConfig, SelectInputConfig
from core.workflow.nodes.human_input.enums import ValueSourceType
from core.workflow.nodes.human_input.pause_reason import (
DifyHITLEventType,
HumanInputRequired,
PauseReason,
)
from graphon.runtime.graph_runtime_state_protocol import ReadOnlyVariablePool
from graphon.variables import ArrayStringSegment
from models.human_input import ApprovalChannel, RecipientType
Expand Down Expand Up @@ -97,7 +101,7 @@ def enrich_human_input_pause_reasons(
enriched: list[dict[str, Any]] = []
for reason in reasons:
updated = dict(reason)
if updated.get("TYPE") == PauseReasonType.HUMAN_INPUT_REQUIRED:
if updated.get("TYPE") == DifyHITLEventType.HUMAN_INPUT_REQUIRED:
form_id = updated.get("form_id")
if isinstance(form_id, str):
disposition = dispositions_by_form_id.get(form_id)
Expand Down
Loading
Loading