Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions hindsight-api-slim/hindsight_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ def normalize_config_dict(config: dict[str, Any]) -> dict[str, Any]:
# Retain settings
ENV_RETAIN_MAX_COMPLETION_TOKENS = "HINDSIGHT_API_RETAIN_MAX_COMPLETION_TOKENS"
ENV_RETAIN_CHUNK_SIZE = "HINDSIGHT_API_RETAIN_CHUNK_SIZE"
ENV_RETAIN_CHUNK_OVERFLOW_FACTOR = "HINDSIGHT_API_RETAIN_CHUNK_OVERFLOW_FACTOR"
ENV_RETAIN_EXTRACT_CAUSAL_LINKS = "HINDSIGHT_API_RETAIN_EXTRACT_CAUSAL_LINKS"
ENV_RETAIN_EXTRACTION_MODE = "HINDSIGHT_API_RETAIN_EXTRACTION_MODE"
ENV_RETAIN_MISSION = "HINDSIGHT_API_RETAIN_MISSION"
Expand Down Expand Up @@ -818,6 +819,9 @@ def _parse_strategy_boosts(raw: str | None) -> dict[str, str]:
# Retain settings
DEFAULT_RETAIN_MAX_COMPLETION_TOKENS = 64000 # Max tokens for fact extraction LLM call
DEFAULT_RETAIN_CHUNK_SIZE = 3000 # Max chars per chunk for fact extraction
DEFAULT_RETAIN_CHUNK_OVERFLOW_FACTOR = (
1.5 # A whole JSONL line / conversation turn is kept intact up to this × chunk size
)
DEFAULT_RETAIN_EXTRACT_CAUSAL_LINKS = True # Extract causal links between facts
DEFAULT_RETAIN_EXTRACTION_MODE = "concise" # Extraction mode: "concise", "verbose", or "custom"
RETAIN_EXTRACTION_MODES = ("concise", "verbose", "custom", "verbatim", "chunks") # Allowed extraction modes
Expand Down Expand Up @@ -1066,6 +1070,25 @@ def _parse_positive_int(name: str, raw: str | None, default: int) -> int:
return parsed


def _parse_chunk_overflow_factor(raw: str | None) -> float:
"""Parse the retain chunk overflow factor (a float >= 1.0).

The factor is how far a single JSONL line / conversation turn may exceed the
chunk-size budget while still being kept whole. A value below 1.0 would split
units that already fit the budget, so it fails fast. Falls back to
``DEFAULT_RETAIN_CHUNK_OVERFLOW_FACTOR`` when unset/empty.
"""
if raw is None or raw == "":
return DEFAULT_RETAIN_CHUNK_OVERFLOW_FACTOR
try:
parsed = float(raw)
except ValueError as e:
raise ValueError(f"{ENV_RETAIN_CHUNK_OVERFLOW_FACTOR} must be a number, got {raw!r}") from e
if parsed < 1.0:
raise ValueError(f"{ENV_RETAIN_CHUNK_OVERFLOW_FACTOR} must be >= 1.0, got {parsed}")
return parsed


def _parse_optional_positive_int(name: str, raw: str | None) -> int | None:
"""Parse an optional env var that must be a positive integer when set."""
if raw is None or raw == "":
Expand Down Expand Up @@ -1426,6 +1449,7 @@ class HindsightConfig:
# Retain settings
retain_max_completion_tokens: int
retain_chunk_size: int
retain_chunk_overflow_factor: float # Keep a JSONL line / conversation turn whole up to this × chunk size
retain_extract_causal_links: bool
retain_extraction_mode: str
retain_mission: str | None
Expand Down Expand Up @@ -1649,6 +1673,7 @@ class HindsightConfig:
"mcp_enabled_tools",
# Retention settings (behavioral)
"retain_chunk_size",
"retain_chunk_overflow_factor",
"retain_extraction_mode",
"retain_mission",
"retain_custom_instructions",
Expand Down Expand Up @@ -2294,6 +2319,7 @@ def from_env(cls) -> "HindsightConfig":
os.getenv(ENV_RETAIN_MAX_COMPLETION_TOKENS, str(DEFAULT_RETAIN_MAX_COMPLETION_TOKENS))
),
retain_chunk_size=int(os.getenv(ENV_RETAIN_CHUNK_SIZE, str(DEFAULT_RETAIN_CHUNK_SIZE))),
retain_chunk_overflow_factor=_parse_chunk_overflow_factor(os.getenv(ENV_RETAIN_CHUNK_OVERFLOW_FACTOR)),
retain_extract_causal_links=os.getenv(
ENV_RETAIN_EXTRACT_CAUSAL_LINKS, str(DEFAULT_RETAIN_EXTRACT_CAUSAL_LINKS)
).lower()
Expand Down
39 changes: 31 additions & 8 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ def _is_oracledb_integrity_error(e: Exception) -> bool:
return isinstance(e, oracledb.IntegrityError)


@dataclass
class RetainChunkParams:
"""Bank-resolved parameters that govern how retain content is chunked.

Both fields must mirror what the orchestrator uses so that chunk-count
estimates (for per-document ``chunk_index`` offsets) match the chunk
boundaries the orchestrator actually produces.
"""

chunk_size: int
overflow_factor: float


@dataclass
class _SubBatchSplit:
"""Result of packing retain contents into sub-batches.
Expand Down Expand Up @@ -3225,7 +3238,7 @@ async def retain_batch_async(
# with, so the offsets match the chunk_index values it assigns.
from .retain import fact_extraction, fact_storage

sub_chunk_size = await self._resolve_retain_chunk_size(bank_id, request_context, strategy)
chunk_params = await self._resolve_retain_chunk_params(bank_id, request_context, strategy)
chunk_offsets: dict[str, int] = {}

# In update_mode="append", retain_batch prepends the existing document
Expand All @@ -3248,7 +3261,7 @@ async def retain_batch_async(
existing_text = await fact_storage.get_document_content(conn, bank_id, append_doc_id)
if existing_text:
append_prepend_chunks[append_doc_id] = len(
fact_extraction.chunk_text(existing_text, sub_chunk_size)
fact_extraction.chunk_text(existing_text, chunk_params.chunk_size, chunk_params.overflow_factor)
)

for i, (sub_batch, sub_origins) in enumerate(zip(sub_batches, origin_indices), 1):
Expand Down Expand Up @@ -3302,7 +3315,13 @@ async def retain_batch_async(
# document continues the sequence.
if sub_doc_id:
sub_chunk_count = sum(
len(fact_extraction.chunk_text(item.get("content", "") or "", sub_chunk_size))
len(
fact_extraction.chunk_text(
item.get("content", "") or "",
chunk_params.chunk_size,
chunk_params.overflow_factor,
)
)
for item in sub_batch
)
# retain_batch only prepends the existing body on the global
Expand Down Expand Up @@ -3413,27 +3432,31 @@ async def _submit_post_insert_maintenance(
except Exception as e:
logger.warning(f"Failed to submit graph maintenance task for bank {bank_id}: {e}")

async def _resolve_retain_chunk_size(
async def _resolve_retain_chunk_params(
self,
bank_id: str,
request_context: "RequestContext",
strategy: str | None,
) -> int:
"""Resolve the effective ``retain_chunk_size`` for a bank.
) -> "RetainChunkParams":
"""Resolve the effective chunking parameters for a bank.

Mirrors the bank-config + strategy resolution that
``_retain_batch_async_internal`` applies before handing config to the
orchestrator, so chunk-count estimates used for per-document
chunk_index offsets match the chunk_index values the orchestrator
actually assigns.
actually assigns. Both the chunk size and the overflow factor must match,
since the overflow factor affects how oversized units are chunked.
"""
from hindsight_api.config_resolver import apply_strategy

resolved_config = await self._config_resolver.resolve_full_config(bank_id, request_context)
effective_strategy = strategy or resolved_config.retain_default_strategy
if effective_strategy:
resolved_config = apply_strategy(resolved_config, effective_strategy)
return getattr(resolved_config, "retain_chunk_size", 3000)
return RetainChunkParams(
chunk_size=getattr(resolved_config, "retain_chunk_size", 3000),
overflow_factor=getattr(resolved_config, "retain_chunk_overflow_factor", 1.5),
)

async def _retain_batch_async_internal(
self,
Expand Down
35 changes: 23 additions & 12 deletions hindsight-api-slim/hindsight_api/engine/retain/fact_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,18 +447,21 @@ def _split_oversized_unit(text: str, max_chars: int) -> list[str]:
return splitter.split_text(text)


def chunk_text(text: str, max_chars: int) -> list[str]:
def chunk_text(text: str, max_chars: int, overflow_factor: float = _CHUNK_OVERFLOW_FACTOR) -> list[str]:
"""
Split text into chunks, preserving conversation structure when possible.

For JSON conversation arrays (user/assistant turns) and JSONL (newline-delimited
JSON objects), splits at turn/line boundaries so no object is split across chunks.
A single turn/line that overflows is kept whole up to ``_CHUNK_OVERFLOW_FACTOR``×
A single turn/line that overflows is kept whole up to ``overflow_factor``×
the budget, then split as text. For plain text, uses sentence-aware splitting.

Args:
text: Input text to chunk (plain text, JSON conversation, or JSONL)
max_chars: Maximum characters per chunk (default 120k ≈ 30k tokens)
overflow_factor: How far a single turn/line may exceed ``max_chars`` and
still be kept whole (>= 1.0). Raise it to keep larger structured units
(e.g. long JSONL messages) intact; see issue #2136.

Returns:
List of text chunks, roughly under max_chars
Expand All @@ -472,32 +475,36 @@ def chunk_text(text: str, max_chars: int) -> list[str]:
parsed = json.loads(text)
if isinstance(parsed, list) and all(isinstance(turn, dict) for turn in parsed):
# This looks like a conversation - chunk at turn boundaries
return _chunk_conversation(parsed, max_chars)
return _chunk_conversation(parsed, max_chars, overflow_factor)
except (json.JSONDecodeError, ValueError):
pass

# Try to parse as JSONL (newline-delimited JSON objects, e.g. session logs)
jsonl_chunks = _chunk_jsonl(text, max_chars)
jsonl_chunks = _chunk_jsonl(text, max_chars, overflow_factor)
if jsonl_chunks is not None:
return jsonl_chunks

# Fall back to sentence-aware text splitting
return _split_oversized_unit(text, max_chars)


def _chunk_conversation(turns: list[dict], max_chars: int) -> list[str]:
def _chunk_conversation(
turns: list[dict], max_chars: int, overflow_factor: float = _CHUNK_OVERFLOW_FACTOR
) -> list[str]:
"""
Chunk a conversation array at turn boundaries, preserving complete turns.

Args:
turns: List of conversation turn dicts (with 'role' and 'content' keys)
max_chars: Maximum characters per chunk
overflow_factor: How far a single turn may exceed ``max_chars`` and still
be kept whole (>= 1.0).

Returns:
List of JSON-serialized chunks, each containing complete turns
"""

overflow_limit = int(max_chars * _CHUNK_OVERFLOW_FACTOR)
overflow_limit = int(max_chars * overflow_factor)

chunks = []
current_chunk = []
Expand Down Expand Up @@ -536,18 +543,20 @@ def _flush() -> None:
return chunks if chunks else [json.dumps(turns, ensure_ascii=False)]


def _chunk_jsonl(text: str, max_chars: int) -> list[str] | None:
def _chunk_jsonl(text: str, max_chars: int, overflow_factor: float = _CHUNK_OVERFLOW_FACTOR) -> list[str] | None:
"""Chunk newline-delimited JSON (JSONL) at line boundaries.

Detects JSONL — two or more non-empty lines, each a complete JSON object —
and packs whole lines into chunks so no line is split across chunks (multiple
short lines may share a chunk). A line that overflows is kept whole up to
``_CHUNK_OVERFLOW_FACTOR``× the budget, then split as text. Returns ``None``
``overflow_factor``× the budget, then split as text. Returns ``None``
if the input is not JSONL, so the caller falls back to plain-text splitting.

Args:
text: Input text to inspect/chunk.
max_chars: Maximum characters per chunk.
overflow_factor: How far a single line may exceed ``max_chars`` and still
be kept whole (>= 1.0).

Returns:
List of JSONL chunks (lines joined by newline), or ``None`` if not JSONL.
Expand All @@ -564,7 +573,7 @@ def _chunk_jsonl(text: str, max_chars: int) -> list[str] | None:
if not isinstance(obj, dict):
return None

overflow_limit = int(max_chars * _CHUNK_OVERFLOW_FACTOR)
overflow_limit = int(max_chars * overflow_factor)

chunks: list[str] = []
current_chunk: list[str] = []
Expand Down Expand Up @@ -1739,7 +1748,7 @@ async def extract_facts_from_text(
- chunks: List of tuples (chunk_text, fact_count) for each chunk
- usage: Aggregated token usage across all LLM calls
"""
chunks = chunk_text(text, max_chars=config.retain_chunk_size)
chunks = chunk_text(text, max_chars=config.retain_chunk_size, overflow_factor=config.retain_chunk_overflow_factor)

# Log chunk count before starting LLM requests
total_chars = sum(len(c) for c in chunks)
Expand Down Expand Up @@ -1923,7 +1932,9 @@ async def extract_facts_from_contents_batch_api(
prompt, response_schema = _build_extraction_prompt_and_schema(config)

for content_index, item in enumerate(contents):
chunks = chunk_text(item.content, max_chars=config.retain_chunk_size)
chunks = chunk_text(
item.content, max_chars=config.retain_chunk_size, overflow_factor=config.retain_chunk_overflow_factor
)

for chunk_index_in_content, chunk in enumerate(chunks):
all_chunks_info.append((chunk, content_index, chunk_index_in_content, item.event_date, item.context))
Expand Down Expand Up @@ -2344,7 +2355,7 @@ def _extract_facts_chunks(
global_chunk_idx = 0

for content_index, content in enumerate(contents):
chunks = chunk_text(content.content, config.retain_chunk_size)
chunks = chunk_text(content.content, config.retain_chunk_size, config.retain_chunk_overflow_factor)
for chunk in chunks:
chunks_metadata.append(
ChunkMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,10 +864,11 @@ async def retain_batch(
# retain code paths.
chunk_batch_size = getattr(config, "retain_chunk_batch_size", 100)
chunk_size = getattr(config, "retain_chunk_size", 3000)
overflow_factor = getattr(config, "retain_chunk_overflow_factor", 1.5)
all_pre_chunks: list[str] = []
chunk_to_content: list[int] = [] # maps chunk index -> index into contents
for content_idx, content in enumerate(contents):
content_chunks = fact_extraction.chunk_text(content.content, chunk_size)
content_chunks = fact_extraction.chunk_text(content.content, chunk_size, overflow_factor)
all_pre_chunks.extend(content_chunks)
chunk_to_content.extend([content_idx] * len(content_chunks))

Expand Down Expand Up @@ -2249,7 +2250,8 @@ def _chunk_contents_for_delta(contents: list[RetainContent], config) -> dict[int
global_chunk_idx = 0
for content in contents:
chunk_size = getattr(config, "retain_chunk_size", 3000)
chunks = fact_extraction.chunk_text(content.content, chunk_size)
overflow_factor = getattr(config, "retain_chunk_overflow_factor", 1.5)
chunks = fact_extraction.chunk_text(content.content, chunk_size, overflow_factor)
for chunk_text in chunks:
result[global_chunk_idx] = chunk_text
global_chunk_idx += 1
Expand Down
36 changes: 36 additions & 0 deletions hindsight-api-slim/tests/test_chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ def test_chunk_conversation_huge_turn_is_split():
assert len(chunk) <= int(20 * OVERFLOW_FACTOR)


# ---------------------------------------------------------------------------
# Configurable overflow factor (issue #2136)
# ---------------------------------------------------------------------------


def test_overflow_factor_keeps_larger_jsonl_line_whole():
"""A raised overflow_factor keeps a line whole that the default would split."""
# 49-char line, budget 20: default cap 30 splits it, but cap 3x = 60 keeps it.
big = json.dumps({"c": "y" * 40}) # 49 chars
small = json.dumps({"c": "ok"})
text = "\n".join([big, small])
assert len(big) == 49

default = chunk_text(text, max_chars=20)
raised = chunk_text(text, max_chars=20, overflow_factor=3.0)

# Default splits the big line into text fragments (more than 2 chunks).
assert len(default) > 2
# With the raised factor the big line is kept intact as its own chunk.
assert raised == [big, small]


def test_overflow_factor_keeps_larger_conversation_turn_whole():
"""A raised overflow_factor keeps a conversation turn whole, not split as text."""
turns = [{"c": "y" * 40}, {"c": "ok"}]
text = json.dumps(turns)

default = chunk_text(text, max_chars=20)
raised = chunk_text(text, max_chars=20, overflow_factor=4.0)

# Default splits the huge turn into text fragments.
assert len(default) > 2
# With the raised factor every chunk is a valid JSON array of whole turns.
assert [json.loads(c) for c in raised] == [[turns[0]], [turns[1]]]


# ---------------------------------------------------------------------------
# Detection guard
# ---------------------------------------------------------------------------
Expand Down
34 changes: 34 additions & 0 deletions hindsight-api-slim/tests/test_config_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def setup_test_env():
"HINDSIGHT_API_RETAIN_MAX_COMPLETION_TOKENS",
"HINDSIGHT_API_CONSOLIDATION_MAX_COMPLETION_TOKENS",
"HINDSIGHT_API_RETAIN_CHUNK_SIZE",
"HINDSIGHT_API_RETAIN_CHUNK_OVERFLOW_FACTOR",
"HINDSIGHT_API_LLM_PROVIDER",
"HINDSIGHT_API_LLM_MODEL",
"HINDSIGHT_API_LLM_REASONING_EFFORT",
Expand Down Expand Up @@ -106,6 +107,39 @@ def test_valid_retain_config_succeeds():
assert config.retain_chunk_size == 3000


def test_retain_chunk_overflow_factor_defaults_to_1_5():
"""The chunk overflow factor defaults to 1.5 when unset."""
from hindsight_api.config import HindsightConfig

os.environ["HINDSIGHT_API_LLM_PROVIDER"] = "mock"
os.environ.pop("HINDSIGHT_API_RETAIN_CHUNK_OVERFLOW_FACTOR", None)

config = HindsightConfig.from_env()
assert config.retain_chunk_overflow_factor == 1.5


def test_retain_chunk_overflow_factor_reads_from_env():
"""The chunk overflow factor is read from the environment."""
from hindsight_api.config import HindsightConfig

os.environ["HINDSIGHT_API_LLM_PROVIDER"] = "mock"
os.environ["HINDSIGHT_API_RETAIN_CHUNK_OVERFLOW_FACTOR"] = "3"

config = HindsightConfig.from_env()
assert config.retain_chunk_overflow_factor == 3.0


def test_retain_chunk_overflow_factor_below_one_fails():
"""A factor below 1.0 would split units that already fit — fail fast."""
from hindsight_api.config import HindsightConfig

os.environ["HINDSIGHT_API_LLM_PROVIDER"] = "mock"
os.environ["HINDSIGHT_API_RETAIN_CHUNK_OVERFLOW_FACTOR"] = "0.5"

with pytest.raises(ValueError, match="RETAIN_CHUNK_OVERFLOW_FACTOR"):
HindsightConfig.from_env()


def test_semantic_min_similarity_reads_from_env():
"""Semantic retrieval min similarity can be configured at the server level."""
from hindsight_api.config import HindsightConfig
Expand Down
Loading
Loading