Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pyrit/backend/mappers/attack_mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ def attack_result_to_summary(
last_preview = stats.last_message_preview
labels = dict(stats.labels) if stats.labels else {}

# Resolution order for created_at: explicit metadata override, then the
# persisted AttackResult.timestamp, and finally datetime.now() as a
# last-resort fallback for never-persisted results.
created_str = ar.metadata.get("created_at")
updated_str = ar.metadata.get("updated_at")
created_at = datetime.fromisoformat(created_str) if created_str else datetime.now(timezone.utc)
if created_str:
created_at = datetime.fromisoformat(created_str)
elif ar.timestamp is not None:
created_at = ar.timestamp
else:
created_at = datetime.now(timezone.utc)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an observation: I'm trying to find out where the metadata fields "create_at" is even set ... I can't seem to find any reference to it being set ...

I think we can fix that in a later change, @romanlutz if you know where this is populated, can you correct me please?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left this alone since you flagged for a later change. Happy to fold it in here if you'd rather not split it.

updated_at = datetime.fromisoformat(updated_str) if updated_str else created_at

aid = ar.get_attack_strategy_identifier()
Expand Down
3 changes: 2 additions & 1 deletion pyrit/memory/memory_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ def __init__(self, *, entry: AttackResult):
ref.conversation_id for ref in entry.get_conversations_by_type(ConversationType.ADVERSARIAL)
] or None

self.timestamp = datetime.now(tz=timezone.utc)
self.timestamp = entry.timestamp or datetime.now(tz=timezone.utc)
self.pyrit_version = pyrit.__version__

@staticmethod
Expand Down Expand Up @@ -894,6 +894,7 @@ def get_attack_result(self) -> AttackResult:
outcome_reason=self.outcome_reason,
related_conversations=related_conversations,
metadata=self.attack_metadata or {},
timestamp=_ensure_utc(self.timestamp) or datetime.now(tz=timezone.utc),
)


Expand Down
4 changes: 4 additions & 0 deletions pyrit/models/attack_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import functools
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, TypeVar

Expand Down Expand Up @@ -81,6 +82,9 @@ class AttackResult(StrategyResult):
# Optional reason for the outcome, providing additional context
outcome_reason: Optional[str] = None

# Wall-clock time the result was created or persisted.
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

# Flexible conversation refs (nothing unused)
related_conversations: set[ConversationReference] = field(default_factory=set)

Expand Down
44 changes: 44 additions & 0 deletions tests/unit/backend/test_mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,50 @@ def test_message_count_from_stats(self) -> None:

assert summary.message_count == 5

def test_created_at_prefers_ar_timestamp_when_metadata_absent(self) -> None:
"""When metadata['created_at'] is absent but ar.timestamp is set, use ar.timestamp."""
persisted_ts = datetime(2026, 4, 17, 12, 0, 0, tzinfo=timezone.utc)
ar = AttackResult(
conversation_id="attack-1",
objective="test",
outcome=AttackOutcome.SUCCESS,
timestamp=persisted_ts,
)
summary = attack_result_to_summary(ar, stats=ConversationStats(message_count=0))

assert summary.created_at == persisted_ts
assert summary.updated_at == persisted_ts

def test_created_at_metadata_still_wins_over_ar_timestamp(self) -> None:
"""When both metadata['created_at'] and ar.timestamp are set, metadata wins (backward compat)."""
metadata_ts = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
ar_ts = datetime(2026, 4, 17, 12, 0, 0, tzinfo=timezone.utc)
ar = AttackResult(
conversation_id="attack-1",
objective="test",
outcome=AttackOutcome.SUCCESS,
timestamp=ar_ts,
metadata={"created_at": metadata_ts.isoformat()},
)
summary = attack_result_to_summary(ar, stats=ConversationStats(message_count=0))

assert summary.created_at == metadata_ts

def test_created_at_falls_back_to_now_when_both_absent(self) -> None:
"""When neither metadata nor ar.timestamp is set, fall back to datetime.now()."""
ar = AttackResult(
conversation_id="attack-1",
objective="test",
outcome=AttackOutcome.SUCCESS,
)
ar.timestamp = None # type: ignore[assignment]

before = datetime.now(timezone.utc)
summary = attack_result_to_summary(ar, stats=ConversationStats(message_count=0))
after = datetime.now(timezone.utc)

assert before <= summary.created_at <= after

"""Tests for pyrit_scores_to_dto function."""

def test_maps_scores(self) -> None:
Expand Down
75 changes: 74 additions & 1 deletion tests/unit/models/test_attack_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# Licensed under the MIT license.

import warnings
from datetime import datetime, timezone

from pyrit.identifiers import ComponentIdentifier
from pyrit.identifiers.atomic_attack_identifier import build_atomic_attack_identifier
from pyrit.models.attack_result import AttackResult
from pyrit.memory.memory_models import AttackResultEntry
from pyrit.models.attack_result import AttackOutcome, AttackResult


class TestAttackResultDeprecation:
Expand Down Expand Up @@ -133,3 +135,74 @@ def test_constructor_with_no_identifier_at_all(self) -> None:
result = AttackResult(conversation_id="c1", objective="test")
assert result.atomic_attack_identifier is None
assert result.get_attack_strategy_identifier() is None


class TestAttackResultTimestamp:
"""Tests for the AttackResult.timestamp field and its round-trip through AttackResultEntry."""

def test_timestamp_defaults_to_now_utc_when_not_set(self) -> None:
"""AttackResult constructed without a timestamp gets a tz-aware UTC default."""
before = datetime.now(timezone.utc)
result = AttackResult(conversation_id="c1", objective="test")
after = datetime.now(timezone.utc)

assert result.timestamp is not None
assert result.timestamp.tzinfo is timezone.utc
assert before <= result.timestamp <= after

def test_timestamp_accepts_and_preserves_aware_datetime(self) -> None:
"""A tz-aware datetime passed to the constructor is stored as-is."""
ts = datetime(2026, 4, 17, 12, 0, 0, tzinfo=timezone.utc)
result = AttackResult(conversation_id="c1", objective="test", timestamp=ts)
assert result.timestamp == ts

def test_entry_preserves_timestamp_from_attack_result(self) -> None:
"""Constructing AttackResultEntry from an AttackResult preserves its timestamp."""
persisted_ts = datetime(2026, 4, 17, 12, 0, 0, tzinfo=timezone.utc)
original = AttackResult(
conversation_id="c1",
objective="test",
timestamp=persisted_ts,
)
entry = AttackResultEntry(entry=original)
assert entry.timestamp == persisted_ts

def test_entry_falls_back_to_now_when_attack_result_timestamp_missing(self) -> None:
"""If AttackResult.timestamp is explicitly None, entry stamps datetime.now()."""
original = AttackResult(conversation_id="c1", objective="test")
original.timestamp = None # type: ignore[assignment]

before = datetime.now(timezone.utc)
entry = AttackResultEntry(entry=original)
after = datetime.now(timezone.utc)

assert entry.timestamp is not None
assert entry.timestamp.tzinfo is timezone.utc
assert before <= entry.timestamp <= after

def test_timestamp_roundtrips_through_attack_result_entry(self) -> None:
"""AttackResultEntry.timestamp is surfaced on the hydrated AttackResult."""
original = AttackResult(
conversation_id="c1",
objective="test",
outcome=AttackOutcome.SUCCESS,
)
entry = AttackResultEntry(entry=original)
persisted_ts = datetime(2026, 4, 17, 12, 0, 0, tzinfo=timezone.utc)
entry.timestamp = persisted_ts

hydrated = entry.get_attack_result()

assert hydrated.timestamp == persisted_ts

def test_naive_entry_timestamp_is_normalized_to_utc_on_hydration(self) -> None:
"""SQLite returns naive datetimes; hydration must attach UTC tzinfo."""
original = AttackResult(conversation_id="c1", objective="test")
entry = AttackResultEntry(entry=original)
entry.timestamp = datetime(2026, 4, 17, 12, 0, 0) # noqa: DTZ001

hydrated = entry.get_attack_result()

assert hydrated.timestamp is not None
assert hydrated.timestamp.tzinfo is timezone.utc
assert hydrated.timestamp.replace(tzinfo=None) == datetime(2026, 4, 17, 12, 0, 0) # noqa: DTZ001