Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ HINDSIGHT_API_LLM_BASE_URL=https://api.openai.com/v1
HINDSIGHT_API_HOST=0.0.0.0
HINDSIGHT_API_PORT=8888
HINDSIGHT_API_LOG_LEVEL=info
# Optional retain chunking override for structured logs/transcripts.
# Unset uses HINDSIGHT_API_RETAIN_CHUNK_SIZE as the structured-chunk limit.
# HINDSIGHT_API_RETAIN_STRUCTURED_CHUNK_SIZE=

# Base Path / Reverse Proxy Support (Optional)
# Set these when deploying behind a reverse proxy with path-based routing
Expand Down
19 changes: 17 additions & 2 deletions hindsight-api-slim/hindsight_api/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,14 @@ class CreateBankRequest(BaseModel):
)
retain_chunk_size: int | None = Field(
default=None,
description="Maximum token size for each content chunk during retain.",
description="Target maximum characters for each content chunk during retain.",
)
retain_structured_chunk_size: int | None = Field(
default=None,
description=(
"Maximum characters for a single JSONL line or conversation turn to keep whole during retain. "
"Defaults to retain_chunk_size when unset."
),
)
enable_observations: bool | None = Field(
default=None,
Expand Down Expand Up @@ -1189,6 +1196,7 @@ def get_config_updates(self) -> dict[str, Any]:
"retain_extraction_mode",
"retain_custom_instructions",
"retain_chunk_size",
"retain_structured_chunk_size",
"enable_observations",
"observations_mission",
):
Expand Down Expand Up @@ -1994,7 +2002,14 @@ class BankTemplateConfig(BaseModel):
retain_custom_instructions: str | None = Field(
default=None, description="Custom extraction prompt (when mode='custom')"
)
retain_chunk_size: int | None = Field(default=None, description="Max token size for each content chunk")
retain_chunk_size: int | None = Field(default=None, description="Target max characters for each content chunk")
retain_structured_chunk_size: int | None = Field(
default=None,
description=(
"Max characters for a single JSONL line or conversation turn to keep whole; "
"defaults to retain_chunk_size when unset"
),
)
enable_observations: bool | None = Field(default=None, description="Toggle observation consolidation")
observations_mission: str | None = Field(default=None, description="Controls what gets synthesised")
disposition_skepticism: int | None = Field(default=None, ge=1, le=5, description="Skepticism trait (1-5)")
Expand Down
28 changes: 28 additions & 0 deletions hindsight-api-slim/hindsight_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ def normalize_config_dict(config: dict[str, Any]) -> dict[str, Any]:
# Retain settings
ENV_RETAIN_MAX_COMPLETION_TOKENS = "HINDSIGHT_API_RETAIN_MAX_COMPLETION_TOKENS"
ENV_RETAIN_CHUNK_SIZE = "HINDSIGHT_API_RETAIN_CHUNK_SIZE"
ENV_RETAIN_STRUCTURED_CHUNK_SIZE = "HINDSIGHT_API_RETAIN_STRUCTURED_CHUNK_SIZE"
ENV_RETAIN_EXTRACT_CAUSAL_LINKS = "HINDSIGHT_API_RETAIN_EXTRACT_CAUSAL_LINKS"
ENV_RETAIN_EXTRACTION_MODE = "HINDSIGHT_API_RETAIN_EXTRACTION_MODE"
ENV_RETAIN_MISSION = "HINDSIGHT_API_RETAIN_MISSION"
Expand Down Expand Up @@ -1077,6 +1078,25 @@ def _parse_optional_positive_int(name: str, raw: str | None) -> int | None:
return _parse_positive_int(name, raw, 1)


def _validate_retain_chunking_int(name: str, value: Any) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError(f"{name} must be an integer, got {value!r}")
if value < 1:
raise ValueError(f"{name} must be >= 1, got {value}")
return value


def validate_retain_chunking_config(retain_chunk_size: Any, retain_structured_chunk_size: Any) -> None:
"""Validate retain chunking size fields."""
_validate_retain_chunking_int("HINDSIGHT_API_RETAIN_CHUNK_SIZE", retain_chunk_size)
if retain_structured_chunk_size is None:
return
_validate_retain_chunking_int(
"HINDSIGHT_API_RETAIN_STRUCTURED_CHUNK_SIZE",
retain_structured_chunk_size,
)


def _parse_optional_choice(name: str, raw: str | None, allowed: frozenset[str]) -> str | None:
"""Parse an optional string env var constrained to a small allowlist."""
if raw is None or raw == "":
Expand Down Expand Up @@ -1430,6 +1450,7 @@ class HindsightConfig:
# Retain settings
retain_max_completion_tokens: int
retain_chunk_size: int
retain_structured_chunk_size: int | None
retain_extract_causal_links: bool
retain_extraction_mode: str
retain_mission: str | None
Expand Down Expand Up @@ -1657,6 +1678,7 @@ class HindsightConfig:
"mcp_enabled_tools",
# Retention settings (behavioral)
"retain_chunk_size",
"retain_structured_chunk_size",
"retain_extraction_mode",
"retain_mission",
"retain_custom_instructions",
Expand Down Expand Up @@ -1816,6 +1838,8 @@ def validate(self) -> None:
"disabling observations/consolidation. Reflect will return HTTP 400."
)

validate_retain_chunking_config(self.retain_chunk_size, self.retain_structured_chunk_size)

# RETAIN_MAX_COMPLETION_TOKENS must be greater than RETAIN_CHUNK_SIZE
# to ensure the LLM has enough output capacity to extract facts from chunks
# (not applicable when provider is "none" since no LLM calls are made)
Expand Down Expand Up @@ -2303,6 +2327,10 @@ def from_env(cls) -> "HindsightConfig":
os.getenv(ENV_RETAIN_MAX_COMPLETION_TOKENS, str(DEFAULT_RETAIN_MAX_COMPLETION_TOKENS))
),
retain_chunk_size=int(os.getenv(ENV_RETAIN_CHUNK_SIZE, str(DEFAULT_RETAIN_CHUNK_SIZE))),
retain_structured_chunk_size=_parse_optional_positive_int(
ENV_RETAIN_STRUCTURED_CHUNK_SIZE,
os.getenv(ENV_RETAIN_STRUCTURED_CHUNK_SIZE),
),
retain_extract_causal_links=os.getenv(
ENV_RETAIN_EXTRACT_CAUSAL_LINKS, str(DEFAULT_RETAIN_EXTRACT_CAUSAL_LINKS)
).lower()
Expand Down
97 changes: 78 additions & 19 deletions hindsight-api-slim/hindsight_api/config_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
HindsightConfig,
_get_raw_config,
normalize_config_dict,
validate_retain_chunking_config,
)
from hindsight_api.engine.memory_engine import fq_table
from hindsight_api.extensions.tenant import TenantExtension
Expand All @@ -29,6 +30,27 @@
logger = logging.getLogger(__name__)


def _validate_retain_strategy_chunking(base_config: HindsightConfig, strategies: Any) -> None:
"""Validate retain strategy chunking with the same semantics as apply_strategy()."""
if not isinstance(strategies, dict):
return
configurable = HindsightConfig.get_configurable_fields()
for strategy_name, overrides in strategies.items():
if not isinstance(overrides, dict):
continue
filtered = {k: v for k, v in overrides.items() if k in configurable}
if not filtered:
continue
try:
resolved = replace(base_config, **filtered)
validate_retain_chunking_config(
resolved.retain_chunk_size,
resolved.retain_structured_chunk_size,
)
except ValueError as e:
raise ValueError(f"Invalid retain strategy {strategy_name!r}: {e}") from e


class ConfigResolver:
"""Resolves hierarchical configuration with tenant/bank overrides."""

Expand All @@ -46,6 +68,26 @@ def __init__(self, backend: "DatabaseBackend", tenant_extension: TenantExtension
self._configurable_fields = HindsightConfig.get_configurable_fields()
self._credential_fields = HindsightConfig.get_credential_fields()

async def _resolve_parent_config_dict(self, bank_id: str, context: RequestContext | None = None) -> dict[str, Any]:
"""Resolve global + tenant config before bank-level overrides."""
config_dict = asdict(self._global_config)

if self.tenant_extension and context:
try:
tenant_overrides = await self.tenant_extension.get_tenant_config(context)
if tenant_overrides:
# Normalize keys and filter to configurable fields only
normalized_tenant = normalize_config_dict(tenant_overrides)
configurable_tenant = {k: v for k, v in normalized_tenant.items() if k in self._configurable_fields}
config_dict.update(configurable_tenant)
logger.debug(
f"Applied tenant config overrides for bank {bank_id}: {list(configurable_tenant.keys())}"
)
except Exception as e:
logger.warning(f"Failed to load tenant config for bank {bank_id}: {e}")

return config_dict

async def resolve_full_config(self, bank_id: str, context: RequestContext | None = None) -> HindsightConfig:
"""
Resolve full HindsightConfig for a bank with hierarchical overrides applied.
Expand All @@ -65,23 +107,7 @@ async def resolve_full_config(self, bank_id: str, context: RequestContext | None
Returns:
Complete HindsightConfig with hierarchical overrides applied
"""
# Start with global config (all fields)
config_dict = asdict(self._global_config)

# Load tenant config overrides (if tenant extension available)
if self.tenant_extension and context:
try:
tenant_overrides = await self.tenant_extension.get_tenant_config(context)
if tenant_overrides:
# Normalize keys and filter to configurable fields only
normalized_tenant = normalize_config_dict(tenant_overrides)
configurable_tenant = {k: v for k, v in normalized_tenant.items() if k in self._configurable_fields}
config_dict.update(configurable_tenant)
logger.debug(
f"Applied tenant config overrides for bank {bank_id}: {list(configurable_tenant.keys())}"
)
except Exception as e:
logger.warning(f"Failed to load tenant config for bank {bank_id}: {e}")
config_dict = await self._resolve_parent_config_dict(bank_id, context)

# Load bank config overrides
bank_overrides = await self._load_bank_config(bank_id)
Expand All @@ -92,6 +118,10 @@ async def resolve_full_config(self, bank_id: str, context: RequestContext | None
# Return full config object (dataclass doesn't have __init__ that accepts kwargs, so we update the object)
# Create a new config instance by copying the global config and updating fields
resolved_config = HindsightConfig(**config_dict)
validate_retain_chunking_config(
resolved_config.retain_chunk_size,
resolved_config.retain_structured_chunk_size,
)
return resolved_config

async def get_bank_config(self, bank_id: str, context: RequestContext | None = None) -> dict[str, Any]:
Expand Down Expand Up @@ -266,6 +296,32 @@ async def update_bank_config(
# Validate recall budget fields
_validate_recall_budget_updates(normalized_updates)

chunking_fields_updated = (
"retain_chunk_size" in normalized_updates
or "retain_structured_chunk_size" in normalized_updates
or "retain_strategies" in normalized_updates
)
if chunking_fields_updated:
config_dict = await self._resolve_parent_config_dict(bank_id, context)
active_bank_overrides = await self._load_bank_config(bank_id)
active_bank_overrides.update(
{
key: value
for key, value in normalized_updates.items()
if key in self._configurable_fields and value is not None
}
)
for key, value in normalized_updates.items():
if key in self._configurable_fields and value is None:
active_bank_overrides.pop(key, None)
config_dict.update(active_bank_overrides)
base_config = HindsightConfig(**config_dict)
validate_retain_chunking_config(
base_config.retain_chunk_size,
base_config.retain_structured_chunk_size,
)
_validate_retain_strategy_chunking(base_config, base_config.retain_strategies)

# Persist the override. Banks are created lazily (on first retain), so a
# PATCH that precedes any ingestion would otherwise UPDATE zero rows and
# silently no-op while returning 200. Ensure the bank row exists first
Expand Down Expand Up @@ -364,7 +420,8 @@ def apply_strategy(config: HindsightConfig, strategy_name: str) -> HindsightConf
A strategy is a named set of hierarchical field overrides stored in
config.retain_strategies. Any field in _HIERARCHICAL_FIELDS can be
overridden, including retain_extraction_mode, retain_chunk_size,
entity_labels, entities_allow_free_form, etc.
retain_structured_chunk_size, entity_labels,
entities_allow_free_form, etc.

Unknown strategy names log a warning and return config unchanged.
Unknown or non-hierarchical fields in the strategy are silently ignored.
Expand All @@ -386,4 +443,6 @@ def apply_strategy(config: HindsightConfig, strategy_name: str) -> HindsightConf
return config

logger.debug(f"Applying retain strategy '{strategy_name}': {list(filtered.keys())}")
return replace(config, **filtered)
resolved = replace(config, **filtered)
validate_retain_chunking_config(resolved.retain_chunk_size, resolved.retain_structured_chunk_size)
return resolved
33 changes: 26 additions & 7 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ class _SubBatchSplit:
document_body_overrides: list[str | None] = field(default_factory=list)


@dataclass(frozen=True)
class _RetainChunkingConfig:
chunk_size: int
structured_chunk_size: int | None


def _split_contents_into_sub_batches(
contents: list[RetainContentDict],
tokens_per_batch: int,
Expand Down Expand Up @@ -3211,7 +3217,7 @@ async def retain_batch_async(
# with, so the offsets match the chunk_index values it assigns.
from .retain import fact_extraction, fact_storage

sub_chunk_size = await self._resolve_retain_chunk_size(bank_id, request_context, strategy)
chunking_config = await self._resolve_retain_chunking_config(bank_id, request_context, strategy)
chunk_offsets: dict[str, int] = {}

# In update_mode="append", retain_batch prepends the existing document
Expand All @@ -3234,7 +3240,11 @@ async def retain_batch_async(
existing_text = await fact_storage.get_document_content(conn, bank_id, append_doc_id)
if existing_text:
append_prepend_chunks[append_doc_id] = len(
fact_extraction.chunk_text(existing_text, sub_chunk_size)
fact_extraction.chunk_text(
existing_text,
chunking_config.chunk_size,
structured_chunk_size=chunking_config.structured_chunk_size,
)
)

for i, (sub_batch, sub_origins) in enumerate(zip(sub_batches, origin_indices), 1):
Expand Down Expand Up @@ -3288,7 +3298,13 @@ async def retain_batch_async(
# document continues the sequence.
if sub_doc_id:
sub_chunk_count = sum(
len(fact_extraction.chunk_text(item.get("content", "") or "", sub_chunk_size))
len(
fact_extraction.chunk_text(
item.get("content", "") or "",
chunking_config.chunk_size,
structured_chunk_size=chunking_config.structured_chunk_size,
)
)
for item in sub_batch
)
# retain_batch only prepends the existing body on the global
Expand Down Expand Up @@ -3399,13 +3415,13 @@ async def _submit_post_insert_maintenance(
except Exception as e:
logger.warning(f"Failed to submit graph maintenance task for bank {bank_id}: {e}")

async def _resolve_retain_chunk_size(
async def _resolve_retain_chunking_config(
self,
bank_id: str,
request_context: "RequestContext",
strategy: str | None,
) -> int:
"""Resolve the effective ``retain_chunk_size`` for a bank.
) -> _RetainChunkingConfig:
"""Resolve the effective retain chunking settings for a bank.

Mirrors the bank-config + strategy resolution that
``_retain_batch_async_internal`` applies before handing config to the
Expand All @@ -3419,7 +3435,10 @@ async def _resolve_retain_chunk_size(
effective_strategy = strategy or resolved_config.retain_default_strategy
if effective_strategy:
resolved_config = apply_strategy(resolved_config, effective_strategy)
return getattr(resolved_config, "retain_chunk_size", 3000)
return _RetainChunkingConfig(
chunk_size=getattr(resolved_config, "retain_chunk_size", 3000),
structured_chunk_size=getattr(resolved_config, "retain_structured_chunk_size", None),
)

async def _retain_batch_async_internal(
self,
Expand Down
Loading