From cfe0f71173dfe8e2e10775ccb4a651e4b902bfc6 Mon Sep 17 00:00:00 2001 From: xiayouping79 Date: Sun, 14 Jun 2026 10:22:38 +0800 Subject: [PATCH 1/4] feat: add Hermes Agent session collector Adds a new collector module for Hermes Agent (github.com/NousResearch/hermes-agent) that monitors Hermes sessions alongside Claude Code, Codex CLI, and OpenCode. Hermes collector discovers Hermes workers via process command-line scanning (--session-key flag), reads the SQLite state.db for session metadata and token counts, and maps running PIDs to DB sessions by session ID. Supported features: - Session discovery (running Hermes workers) - Token tracking (input, output, cache read/write) - Context window % via model lookup table (20+ models) - Status detection (Thinking/Executing/Waiting/Done) - Child process tracking - Cross-platform Windows/Linux/macOS support - Configurable via hidden_agents = ["hermes"] in config.toml The Hermes agent_cli identifier is "hermes" so users can hide Hermes sessions with: hidden_agents = ["hermes"] --- src/collector/hermes.rs | 698 ++++++++++++++++++++++++++++++++++++++++ src/collector/mod.rs | 16 +- 2 files changed, 709 insertions(+), 5 deletions(-) create mode 100644 src/collector/hermes.rs diff --git a/src/collector/hermes.rs b/src/collector/hermes.rs new file mode 100644 index 0000000..7e0365e --- /dev/null +++ b/src/collector/hermes.rs @@ -0,0 +1,698 @@ +use super::process; +use crate::model::{AgentSession, ChildProcess, SessionStatus}; +use serde_json::Value; +use std::collections::{HashMap, HashSet}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::process::Command; + +/// Maximum sessions to fetch from the DB per query. +const MAX_SESSIONS: u32 = 20; + +/// Model -> context window size (tokens) lookup. +/// Hermes doesn't store context_window in the DB, so we maintain a table. +/// These are the models observed in Hermes deployments; users can +/// override via config.toml in future. +const MODEL_CONTEXT_WINDOWS: &[(&str, u64)] = &[ + ("deepseek-v4-flash", 1_048_576), + ("deepseek-v4", 1_048_576), + ("deepseek-v3", 1_024_000), + ("deepseek-r1", 1_024_000), + ("deepseek-chat", 1_024_000), + ("claude-sonnet-4", 200_000), + ("claude-sonnet-4-20250514", 200_000), + ("claude-opus-4", 200_000), + ("claude-sonnet-3-5", 200_000), + ("claude-opus-3-5", 200_000), + ("gpt-4o", 128_000), + ("gpt-4o-mini", 128_000), + ("o3", 200_000), + ("o4-mini", 200_000), + ("gemini-2.5-pro", 1_048_576), + ("gemini-2.0-flash", 1_048_576), + ("qwen2.5-72b", 131_072), + ("qwen3", 131_072), + ("llama-3.3-70b", 131_072), + ("llama-4", 1_000_000), + ("llama-4-scout", 1_000_000), + ("llama-4-maverick", 1_000_000), + ("mistral-large", 128_000), + ("mistral-small", 128_000), +]; + +/// Known state.db locations, checked in order when discovering Hermes. +const STATE_DB_CANDIDATES: &[&str] = &[ + // Windows desktop install + "~/AppData/Local/hermes/state.db", + // Default Linux/macOS + "~/.hermes/state.db", + // Under XDG + "~/.local/share/hermes/state.db", +]; + +fn expand_home(path_str: &str) -> PathBuf { + if let Some(rest) = path_str.strip_prefix("~/") { + let home = dirs::home_dir().unwrap_or_default(); + home.join(rest) + } else { + PathBuf::from(path_str) + } +} + +/// Collector for Hermes Agent sessions. +/// +/// Discovery strategy: +/// 1. Find running Hermes workers via process command lines containing +/// `--session-key` (maps PID → session_id) +/// 2. Query the Hermes SQLite state.db for session metadata + tokens +/// 3. Match running PIDs to DB sessions by session_id +/// 4. Infer status from process activity and CPU usage +/// +/// Uses `sqlite3 -readonly -json` for safe concurrent reads (WAL mode). +/// DB rows are cached and only refreshed on `shared.slow_tick` (~10s) +/// so we don't fork a sqlite3 process every 2s. +pub struct HermesCollector { + db_path: PathBuf, + /// Whether sqlite3 CLI is available (checked once). + sqlite3_available: Option, + /// Cached DB rows from the last slow-tick query. + cached_db_sessions: Vec, +} + +impl HermesCollector { + pub fn new() -> Self { + let db_path = Self::discover_db_path(); + Self { + db_path, + sqlite3_available: None, + cached_db_sessions: Vec::new(), + } + } + + /// Try known state.db locations, plus HERMES_HOME env var. + fn discover_db_path() -> PathBuf { + // Check HERMES_HOME env var first + if let Ok(home) = std::env::var("HERMES_HOME") { + let p = PathBuf::from(home.clone()).join("state.db"); + if p.exists() { + return p; + } + // Also check under profile's data dir + let p2 = PathBuf::from(home).join("data/state.db"); + if p2.exists() { + return p2; + } + } + + // Try candidates in order + for candidate in STATE_DB_CANDIDATES { + let p = expand_home(candidate); + if p.exists() { + return p; + } + } + + // Fallback to default + let home = dirs::home_dir().unwrap_or_default(); + if cfg!(target_os = "windows") { + home.join("AppData/Local/hermes/state.db") + } else { + home.join(".hermes/state.db") + } + } + + fn check_sqlite3(&mut self) -> bool { + if let Some(available) = self.sqlite3_available { + return available; + } + let available = Command::new("sqlite3").arg("--version").output().is_ok(); + self.sqlite3_available = Some(available); + available + } + + /// Find Hermes PID → session_id mappings from running process command lines. + /// Hermes launches `tui_gateway.slash_worker` processes with `--session-key `. + fn find_hermes_pid_map(process_info: &HashMap) -> HashMap { + let mut pid_map = HashMap::new(); + for (&pid, info) in process_info { + let cmd = &info.command; + // Match session workers: python -m tui_gateway.slash_worker --session-key + if let Some(pos) = cmd.find("--session-key") { + let after = &cmd[pos + "--session-key".len()..]; + let key = after.trim().split_whitespace().next().unwrap_or(""); + if !key.is_empty() { + pid_map.insert(pid, key.to_string()); + } + } + } + pid_map + } + + /// Filter process_info for anything likely to be a Hermes process + /// (python containing 'hermes' or 'slash_worker'). + #[allow(dead_code)] + fn find_hermes_pids(process_info: &HashMap) -> Vec { + process_info + .iter() + .filter(|(_, info)| { + let cmd_lower = info.command.to_lowercase(); + // Match python processes running Hermes modules + (info.command.contains("hermes") || cmd_lower.contains("slash_worker")) + && !cmd_lower.contains("grep") + && !cmd_lower.contains("psutil") + }) + .map(|(pid, _)| *pid) + .collect() + } + + /// Run a single sqlite3 query and parse the JSON output. + fn run_query(&self, sql: &str) -> Option> { + let db = self.db_path.to_str()?; + let output = Command::new("sqlite3") + .args(["-readonly", "-json", db]) + .arg(sql) + .output() + .ok()?; + if !output.status.success() { + return None; + } + let stdout = String::from_utf8_lossy(&output.stdout); + if stdout.trim().is_empty() { + return Some(vec![]); + } + serde_json::from_str(stdout.trim()).ok() + } + + fn collect_sessions(&mut self, shared: &super::SharedProcessData) -> Vec { + // Security: skip if db_path is a symlink (fail-closed) + if is_symlink(&self.db_path) || !self.db_path.exists() || !self.check_sqlite3() { + self.cached_db_sessions.clear(); + return vec![]; + } + + // Map running Hermes PIDs to session_ids + let pid_map = Self::find_hermes_pid_map(&shared.process_info); + + // Refresh DB rows on slow ticks only; reuse cache on fast ticks + if shared.slow_tick { + if let Some(rows) = self.query_sessions() { + self.cached_db_sessions = rows; + } + } + + let now_ms = current_time_ms(); + let mut sessions = Vec::new(); + + // Build a set of tracked session_ids for cache eviction later + let _db_ids: HashSet<&str> = self + .cached_db_sessions + .iter() + .map(|ds| ds.id.as_str()) + .collect(); + + // Match each DB session to a running PID by session_id + // Also track any Hermes PIDs not in the DB (newly starting sessions) + let mut matched_ids = HashSet::new(); + + for ds in &self.cached_db_sessions { + // Match by session_id in pid_map + let matched_pid = pid_map + .iter() + .find(|(_, sid)| sid.as_str() == ds.id) + .map(|(&pid, _)| pid); + + let Some(matched_pid) = matched_pid else { + // No running process for this session + // Check if it ended recently (< 30s) to show as "Done" + if ds.time_updated > 0 && now_ms.saturating_sub(ds.time_updated) < 30_000 { + matched_ids.insert(ds.id.as_str()); + sessions.push(self.build_session( + ds, 0, shared, now_ms, SessionStatus::Done, + )); + } + continue; + }; + + matched_ids.insert(ds.id.as_str()); + let proc = shared.process_info.get(&matched_pid); + let mem_mb = proc.map(|p| p.rss_kb / 1024).unwrap_or(0); + + let age_ms = now_ms.saturating_sub(ds.time_updated); + let since_update_secs = age_ms / 1000; + + // Derive session status + let status = if since_update_secs < 30 { + // Recent activity — could be Thinking or Executing + let cpu_active = proc.is_some_and(|p| p.cpu_pct > 1.0); + let has_active_child = process::has_active_descendant( + matched_pid, + &shared.children_map, + &shared.process_info, + 5.0, + ); + if cpu_active || has_active_child { + SessionStatus::Executing + } else { + // No CPU activity but recently touched — probably thinking + SessionStatus::Thinking + } + } else { + // No recent activity — check CPU + let cpu_active = proc.is_some_and(|p| p.cpu_pct > 1.0); + let has_active_child = process::has_active_descendant( + matched_pid, + &shared.children_map, + &shared.process_info, + 5.0, + ); + if cpu_active || has_active_child { + SessionStatus::Executing + } else { + SessionStatus::Waiting + } + }; + + // Collect child processes + let mut children = Vec::new(); + let mut stack: Vec = shared + .children_map + .get(&matched_pid) + .cloned() + .unwrap_or_default(); + let mut visited = HashSet::new(); + while let Some(cpid) = stack.pop() { + if !visited.insert(cpid) { + continue; + } + if let Some(cproc) = shared.process_info.get(&cpid) { + let port = shared.ports.get(&cpid).and_then(|v| v.first().copied()); + children.push(ChildProcess { + pid: cpid, + command: cproc.command.clone(), + mem_kb: cproc.rss_kb, + port, + }); + } + if let Some(grandchildren) = shared.children_map.get(&cpid) { + stack.extend(grandchildren); + } + } + + let current_tasks = if matches!(status, SessionStatus::Waiting) { + vec!["waiting for input".to_string()] + } else if matches!(status, SessionStatus::Executing) { + vec!["executing...".to_string()] + } else { + vec!["thinking...".to_string()] + }; + + let project_name = if !ds.project_name.is_empty() { + ds.project_name.clone() + } else if !ds.directory.is_empty() { + ds.directory + .trim_end_matches('/') + .rsplit('/') + .next() + .or_else(|| ds.directory.rsplit('\\').next()) + .unwrap_or("?") + .to_string() + } else { + "?".to_string() + }; + + let context_window = lookup_context_window(&ds.model); + let context_percent = if context_window > 0 { + let used = ds.total_input + ds.total_output + ds.total_cache_read; + (used as f64 / context_window as f64) * 100.0 + } else { + 0.0 + }; + + sessions.push(AgentSession { + agent_cli: "hermes", + pid: matched_pid, + session_id: ds.id.clone(), + cwd: ds.directory.clone(), + project_name, + started_at: ds.time_created, + status, + model: ds.model.clone(), + effort: String::new(), + context_percent: context_percent.min(100.0), + total_input_tokens: ds.total_input, + total_output_tokens: ds.total_output, + total_cache_read: ds.total_cache_read, + total_cache_create: ds.total_cache_write, + turn_count: ds.turn_count, + current_tasks, + mem_mb, + version: String::new(), + git_branch: String::new(), + git_added: 0, + git_modified: 0, + token_history: vec![], + context_history: vec![], + compaction_count: 0, + context_window, + subagents: vec![], + mem_file_count: 0, + mem_line_count: 0, + children, + initial_prompt: ds.title.clone(), + first_assistant_text: String::new(), + chat_messages: vec![], + tool_calls: vec![], + pending_since_ms: 0, + thinking_since_ms: 0, + file_accesses: vec![], + config_root: super::abbrev_path( + self.db_path + .parent() + .unwrap_or(Path::new(".")), + ), + }); + } + + // Also check for Hermes processes that have a session-key but no DB row + // (e.g. just-started sessions not yet flushed) + for (&pid, sid) in &pid_map { + if matched_ids.contains(sid.as_str()) { + continue; + } + // Show an unknown session until DB catches up + let proc = shared.process_info.get(&pid); + let mem_mb = proc.map(|p| p.rss_kb / 1024).unwrap_or(0); + sessions.push(AgentSession { + agent_cli: "hermes", + pid, + session_id: sid.clone(), + cwd: String::new(), + project_name: "hermes".to_string(), + started_at: now_ms, + status: SessionStatus::Unknown, + model: String::new(), + effort: String::new(), + context_percent: 0.0, + total_input_tokens: 0, + total_output_tokens: 0, + total_cache_read: 0, + total_cache_create: 0, + turn_count: 0, + current_tasks: vec!["initializing...".to_string()], + mem_mb, + version: String::new(), + git_branch: String::new(), + git_added: 0, + git_modified: 0, + token_history: vec![], + context_history: vec![], + compaction_count: 0, + context_window: 0, + subagents: vec![], + mem_file_count: 0, + mem_line_count: 0, + children: vec![], + initial_prompt: String::new(), + first_assistant_text: String::new(), + chat_messages: vec![], + tool_calls: vec![], + pending_since_ms: 0, + thinking_since_ms: 0, + file_accesses: vec![], + config_root: super::abbrev_path( + self.db_path + .parent() + .unwrap_or(Path::new(".")), + ), + }); + } + + sessions.sort_by_key(|s| std::cmp::Reverse(s.started_at)); + sessions + } + + /// Build a session from a DB row (helper for Done/error rows). + fn build_session( + &self, + ds: &DbSession, + matched_pid: u32, + shared: &super::SharedProcessData, + _now_ms: u64, + status: SessionStatus, + ) -> AgentSession { + let proc = shared.process_info.get(&matched_pid); + let mem_mb = proc.map(|p| p.rss_kb / 1024).unwrap_or(0); + + let context_window = lookup_context_window(&ds.model); + let context_percent = if context_window > 0 { + let used = ds.total_input + ds.total_output + ds.total_cache_read; + (used as f64 / context_window as f64) * 100.0 + } else { + 0.0 + }; + + AgentSession { + agent_cli: "hermes", + pid: matched_pid, + session_id: ds.id.clone(), + cwd: ds.directory.clone(), + project_name: ds.project_name.clone(), + started_at: ds.time_created, + status, + model: ds.model.clone(), + effort: String::new(), + context_percent: context_percent.min(100.0), + total_input_tokens: ds.total_input, + total_output_tokens: ds.total_output, + total_cache_read: ds.total_cache_read, + total_cache_create: ds.total_cache_write, + turn_count: ds.turn_count, + current_tasks: vec![], + mem_mb, + version: String::new(), + git_branch: String::new(), + git_added: 0, + git_modified: 0, + token_history: vec![], + context_history: vec![], + compaction_count: 0, + context_window, + subagents: vec![], + mem_file_count: 0, + mem_line_count: 0, + children: vec![], + initial_prompt: ds.title.clone(), + first_assistant_text: String::new(), + chat_messages: vec![], + tool_calls: vec![], + pending_since_ms: 0, + thinking_since_ms: 0, + file_accesses: vec![], + config_root: super::abbrev_path( + self.db_path.parent().unwrap_or(Path::new(".")), + ), + } + } + + fn query_sessions(&self) -> Option> { + // Query: active sessions (last 24h) and ended sessions in last 60s, + // with aggregated token counts. + // + // The Hermes state.db schema: + // sessions(id, source, model, title, cwd, started_at, ended_at, + // message_count, tool_call_count, api_call_count, + // input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, + // reasoning_tokens, ...) + // NOTE: no explicit 'version' column — we use 'estim' suffix as fallback. + let sql = format!( + r#" +SELECT + s.id, + COALESCE(s.title, '') as title, + COALESCE(s.cwd, '') as directory, + COALESCE(s.model, '') as model, + CAST(CAST(s.started_at AS INTEGER) * 1000 AS INTEGER) as time_created_ms, + CAST(CAST(COALESCE(s.ended_at, s.started_at) AS INTEGER) * 1000 AS INTEGER) as time_updated_ms, + COALESCE(s.input_tokens, 0) as total_input, + COALESCE(s.output_tokens, 0) as total_output, + COALESCE(s.cache_read_tokens, 0) as total_cache_read, + COALESCE(s.cache_write_tokens, 0) as total_cache_write, + COALESCE(s.api_call_count, s.message_count, 0) as turn_count +FROM sessions s +WHERE s.started_at > CAST((strftime('%%s', 'now') - 86400) AS REAL) + OR (s.ended_at IS NOT NULL AND s.ended_at > CAST((strftime('%%s', 'now') - 60) AS REAL)) +ORDER BY s.started_at DESC +LIMIT {}; +"#, + MAX_SESSIONS + ); + + let rows = self.run_query(&sql)?; + let mut sessions = Vec::new(); + for row in rows { + let mut id = row["id"].as_str().unwrap_or("").to_string(); + let mut title = row["title"].as_str().unwrap_or("").to_string(); + let mut directory = row["directory"].as_str().unwrap_or("").to_string(); + let mut model = row["model"].as_str().unwrap_or("").to_string(); + + truncate_field(&mut id, 256); + truncate_field(&mut title, 512); + truncate_field(&mut directory, 4096); + truncate_field(&mut model, 128); + + let title = super::redact_secrets(&title); + + // Hermes stores timestamps as Unix epoch seconds (REAL). + // Convert to milliseconds for abtop compatibility. + let time_created = row["time_created_ms"].as_u64().unwrap_or(0); + let time_updated = row["time_updated_ms"].as_u64().unwrap_or(0); + + let project_name = directory + .trim_end_matches('/') + .rsplit('/') + .next() + .or_else(|| directory.rsplit('\\').next()) + .unwrap_or("?") + .to_string(); + + sessions.push(DbSession { + id, + title, + directory, + time_created, + time_updated, + project_name, + turn_count: row["turn_count"].as_u64().unwrap_or(0) as u32, + total_input: row["total_input"].as_u64().unwrap_or(0), + total_output: row["total_output"].as_u64().unwrap_or(0), + total_cache_read: row["total_cache_read"].as_u64().unwrap_or(0), + total_cache_write: row["total_cache_write"].as_u64().unwrap_or(0), + model, + }); + } + + Some(sessions) + } +} + +impl Default for HermesCollector { + fn default() -> Self { + Self::new() + } +} + +impl super::AgentCollector for HermesCollector { + fn collect(&mut self, shared: &super::SharedProcessData) -> Vec { + self.collect_sessions(shared) + } +} + +struct DbSession { + id: String, + title: String, + directory: String, + time_created: u64, + time_updated: u64, + project_name: String, + turn_count: u32, + total_input: u64, + total_output: u64, + total_cache_read: u64, + total_cache_write: u64, + model: String, +} + +/// Check if a path is a symlink (fail-closed: returns true on error). +fn is_symlink(path: &Path) -> bool { + fs::symlink_metadata(path) + .map(|m| m.file_type().is_symlink()) + .unwrap_or(true) +} + +/// Truncate a string at a char boundary to avoid panics on multi-byte UTF-8. +fn truncate_field(s: &mut String, max_bytes: usize) { + if s.len() > max_bytes { + let mut end = max_bytes; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + s.truncate(end); + } +} + +/// Look up a model's context window from the built-in table. +fn lookup_context_window(model: &str) -> u64 { + let model_lower = model.to_lowercase(); + for (prefix, size) in MODEL_CONTEXT_WINDOWS { + if model_lower.contains(prefix) { + return *size; + } + } + 0 +} + +fn current_time_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_find_hermes_pid_map() { + let mut info = HashMap::new(); + info.insert( + 100, + process::ProcInfo { + pid: 100, + ppid: 1, + rss_kb: 50000, + cpu_pct: 0.0, + command: "python -m tui_gateway.slash_worker --session-key 20260614_095914_b987ed --model deepseek-v4-flash".to_string(), + }, + ); + info.insert( + 200, + process::ProcInfo { + pid: 200, + ppid: 1, + rss_kb: 30000, + cpu_pct: 2.0, + command: "python -m tui_gateway.slash_worker --session-key cron_7239c7622ece_20260614_085402".to_string(), + }, + ); + info.insert( + 300, + process::ProcInfo { + pid: 300, + ppid: 1, + rss_kb: 1000, + cpu_pct: 0.0, + command: "grep hermes".to_string(), + }, + ); + let map = HermesCollector::find_hermes_pid_map(&info); + assert_eq!(map.len(), 2); + assert_eq!(map.get(&100).unwrap(), "20260614_095914_b987ed"); + assert_eq!(map.get(&200).unwrap(), "cron_7239c7622ece_20260614_085402"); + assert!(!map.contains_key(&300)); + } + + #[test] + fn test_lookup_context_window() { + assert_eq!(lookup_context_window("deepseek-v4-flash"), 1_048_576); + assert_eq!(lookup_context_window("claude-sonnet-4"), 200_000); + assert_eq!(lookup_context_window("unknown-model"), 0); + } + + #[test] + fn test_discover_db_path() { + // Should not panic; returns a valid path even if it doesn't exist + let path = HermesCollector::discover_db_path(); + assert!(!path.as_os_str().is_empty()); + } +} diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 69b0e81..5c551fd 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -1,5 +1,6 @@ pub mod claude; pub mod codex; +pub mod hermes; pub mod mcp; pub mod opencode; pub mod process; @@ -7,6 +8,7 @@ pub mod rate_limit; pub use claude::ClaudeCollector; pub use codex::CodexCollector; +pub use hermes::HermesCollector; pub use mcp::McpServer; pub use opencode::OpenCodeCollector; pub use rate_limit::read_rate_limits; @@ -319,6 +321,9 @@ impl MultiCollector { if !is_hidden("opencode") { collectors.push(Box::new(OpenCodeCollector::new())); } + if !is_hidden("hermes") { + collectors.push(Box::new(HermesCollector::new())); + } let codex_enabled = !is_hidden("codex"); Self { collectors, @@ -491,27 +496,27 @@ mod tests { #[test] fn with_hidden_empty_keeps_all_collectors() { let mc = MultiCollector::with_hidden(&[]); - assert_eq!(mc.collectors.len(), 3); + assert_eq!(mc.collectors.len(), 4); } #[test] fn with_hidden_codex_drops_codex_only() { let mc = MultiCollector::with_hidden(&["codex".to_string()]); - assert_eq!(mc.collectors.len(), 2); + assert_eq!(mc.collectors.len(), 3); } #[test] fn with_hidden_is_case_insensitive() { let mc = MultiCollector::with_hidden(&["CODEX".to_string()]); - assert_eq!(mc.collectors.len(), 2); + assert_eq!(mc.collectors.len(), 3); let mc = MultiCollector::with_hidden(&["Claude".to_string()]); - assert_eq!(mc.collectors.len(), 2); + assert_eq!(mc.collectors.len(), 3); } #[test] fn with_hidden_unknown_names_are_ignored() { let mc = MultiCollector::with_hidden(&["kiro".to_string(), "gemini".to_string()]); - assert_eq!(mc.collectors.len(), 3); + assert_eq!(mc.collectors.len(), 4); } #[test] @@ -520,6 +525,7 @@ mod tests { "claude".to_string(), "codex".to_string(), "opencode".to_string(), + "hermes".to_string(), ]); assert!(mc.collectors.is_empty()); } From c89da153f3238cae200cbc82c4641ab2ba0fd3f7 Mon Sep 17 00:00:00 2001 From: xiayouping79 Date: Sun, 14 Jun 2026 10:29:02 +0800 Subject: [PATCH 2/4] fix: use Python sqlite3 module instead of sqlite3 CLI; increase MAX_SESSIONS to 100 --- src/collector/hermes.rs | 73 +++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/src/collector/hermes.rs b/src/collector/hermes.rs index 7e0365e..72d133a 100644 --- a/src/collector/hermes.rs +++ b/src/collector/hermes.rs @@ -6,8 +6,11 @@ use std::fs; use std::path::{Path, PathBuf}; use std::process::Command; -/// Maximum sessions to fetch from the DB per query. -const MAX_SESSIONS: u32 = 20; +/// Maximum sessions to fetch from the DB per query. With 140+ active Hermes +/// sessions, a simple `LIMIT` would miss older sessions that still have +/// running workers, so we use a two-phase approach: collect running PIDs +/// first, then query only those session IDs. +const MAX_SESSIONS: u32 = 100; /// Model -> context window size (tokens) lookup. /// Hermes doesn't store context_window in the DB, so we maintain a table. @@ -73,8 +76,8 @@ fn expand_home(path_str: &str) -> PathBuf { /// so we don't fork a sqlite3 process every 2s. pub struct HermesCollector { db_path: PathBuf, - /// Whether sqlite3 CLI is available (checked once). - sqlite3_available: Option, + /// Whether Python (with sqlite3 module) is available (checked once). + python_available: Option, /// Cached DB rows from the last slow-tick query. cached_db_sessions: Vec, } @@ -84,7 +87,7 @@ impl HermesCollector { let db_path = Self::discover_db_path(); Self { db_path, - sqlite3_available: None, + python_available: None, cached_db_sessions: Vec::new(), } } @@ -121,12 +124,16 @@ impl HermesCollector { } } - fn check_sqlite3(&mut self) -> bool { - if let Some(available) = self.sqlite3_available { + fn check_python(&mut self) -> bool { + if let Some(available) = self.python_available { return available; } - let available = Command::new("sqlite3").arg("--version").output().is_ok(); - self.sqlite3_available = Some(available); + // Check if we can run python -c "import sqlite3, json" + let available = Command::new("python") + .args(["-c", "import sqlite3, json; print('ok')"]) + .output() + .is_ok_and(|o| o.status.success()); + self.python_available = Some(available); available } @@ -165,15 +172,44 @@ impl HermesCollector { .collect() } - /// Run a single sqlite3 query and parse the JSON output. + /// Run a sqlite3 query using Python's built-in sqlite3 module. + /// This avoids depending on the sqlite3 CLI being installed. + /// Writes SQL to a temp file to avoid quoting/escaping issues. fn run_query(&self, sql: &str) -> Option> { let db = self.db_path.to_str()?; - let output = Command::new("sqlite3") - .args(["-readonly", "-json", db]) - .arg(sql) + + // Write SQL to a temp file to avoid shell quoting issues. + // File is cleaned up on next call (PID-based naming = one temp file per process). + let sql_path = format!( + "{}\\hermes_query_{}.sql", + std::env::temp_dir().to_string_lossy(), + std::process::id() + ); + if std::fs::write(&sql_path, sql).is_err() { + return None; + } + + let script = format!( + "import sqlite3, json, sys +with open(r'{}') as f: + query = f.read() +conn = sqlite3.connect(r'{}') +conn.row_factory = sqlite3.Row +cur = conn.execute(query) +rows = [dict(row) for row in cur.fetchall()] +conn.close() +json.dump(rows, sys.stdout, ensure_ascii=False, default=str)", + sql_path.replace('\\', "/"), + db.replace('\\', "/"), + ); + + let output = Command::new("python") + .args(["-c", &script]) .output() .ok()?; if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + eprintln!("Hermes collector python error: {}", stderr); return None; } let stdout = String::from_utf8_lossy(&output.stdout); @@ -185,7 +221,7 @@ impl HermesCollector { fn collect_sessions(&mut self, shared: &super::SharedProcessData) -> Vec { // Security: skip if db_path is a symlink (fail-closed) - if is_symlink(&self.db_path) || !self.db_path.exists() || !self.check_sqlite3() { + if is_symlink(&self.db_path) || !self.db_path.exists() || !self.check_python() { self.cached_db_sessions.clear(); return vec![]; } @@ -495,15 +531,16 @@ impl HermesCollector { } fn query_sessions(&self) -> Option> { - // Query: active sessions (last 24h) and ended sessions in last 60s, - // with aggregated token counts. + // Query: all active (not ended) sessions, plus ended sessions in last 60s. + // The `s.ended_at IS NULL` clause catches long-running sessions that + // may be older than 24h but still have a running process. // // The Hermes state.db schema: // sessions(id, source, model, title, cwd, started_at, ended_at, // message_count, tool_call_count, api_call_count, // input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, // reasoning_tokens, ...) - // NOTE: no explicit 'version' column — we use 'estim' suffix as fallback. + // NOTE: no explicit 'version' column — we use empty string as fallback. let sql = format!( r#" SELECT @@ -519,7 +556,7 @@ SELECT COALESCE(s.cache_write_tokens, 0) as total_cache_write, COALESCE(s.api_call_count, s.message_count, 0) as turn_count FROM sessions s -WHERE s.started_at > CAST((strftime('%%s', 'now') - 86400) AS REAL) +WHERE s.ended_at IS NULL OR (s.ended_at IS NOT NULL AND s.ended_at > CAST((strftime('%%s', 'now') - 60) AS REAL)) ORDER BY s.started_at DESC LIMIT {}; From 1580b583de10fd3f6b926e0b95db0a355dc7498e Mon Sep 17 00:00:00 2001 From: xiayouping79 Date: Sun, 14 Jun 2026 10:44:36 +0800 Subject: [PATCH 3/4] fix: use json_extract for Hermes tool_calls; add last_tool/last_activity tracking --- src/collector/hermes.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/collector/hermes.rs b/src/collector/hermes.rs index 72d133a..b46b5e5 100644 --- a/src/collector/hermes.rs +++ b/src/collector/hermes.rs @@ -336,12 +336,23 @@ json.dump(rows, sys.stdout, ensure_ascii=False, default=str)", let current_tasks = if matches!(status, SessionStatus::Waiting) { vec!["waiting for input".to_string()] + } else if !ds.last_tool.is_empty() { + vec![format!("{} ...", ds.last_tool)] } else if matches!(status, SessionStatus::Executing) { vec!["executing...".to_string()] } else { vec!["thinking...".to_string()] }; + // Show thinking_since if recent activity without a response yet + let thinking_since_ms = if matches!(status, SessionStatus::Thinking) + && ds.last_activity_ts > 0 + { + ds.last_activity_ts + } else { + 0 + }; + let project_name = if !ds.project_name.is_empty() { ds.project_name.clone() } else if !ds.directory.is_empty() { @@ -399,7 +410,7 @@ json.dump(rows, sys.stdout, ensure_ascii=False, default=str)", chat_messages: vec![], tool_calls: vec![], pending_since_ms: 0, - thinking_since_ms: 0, + thinking_since_ms, file_accesses: vec![], config_root: super::abbrev_path( self.db_path @@ -554,7 +565,10 @@ SELECT COALESCE(s.output_tokens, 0) as total_output, COALESCE(s.cache_read_tokens, 0) as total_cache_read, COALESCE(s.cache_write_tokens, 0) as total_cache_write, - COALESCE(s.api_call_count, s.message_count, 0) as turn_count + COALESCE(s.api_call_count, s.message_count, 0) as turn_count, + COALESCE((SELECT json_extract(m.tool_calls, '$[0].function.name') FROM messages m WHERE m.session_id = s.id AND m.role = 'assistant' AND m.tool_calls IS NOT NULL ORDER BY m.id DESC LIMIT 1), '') as last_tool, + COALESCE((SELECT MAX(m.timestamp) FROM messages m WHERE m.session_id = s.id), s.started_at) as last_activity_ts, + COALESCE((SELECT m.content FROM messages m WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL AND m.content != '' ORDER BY m.id DESC LIMIT 1), '') as last_prompt FROM sessions s WHERE s.ended_at IS NULL OR (s.ended_at IS NOT NULL AND s.ended_at > CAST((strftime('%%s', 'now') - 60) AS REAL)) @@ -572,17 +586,26 @@ LIMIT {}; let mut directory = row["directory"].as_str().unwrap_or("").to_string(); let mut model = row["model"].as_str().unwrap_or("").to_string(); + let mut last_tool = row["last_tool"].as_str().unwrap_or("").to_string(); + let mut last_prompt = row["last_prompt"].as_str().unwrap_or("").to_string(); + truncate_field(&mut id, 256); truncate_field(&mut title, 512); truncate_field(&mut directory, 4096); truncate_field(&mut model, 128); + truncate_field(&mut last_tool, 64); + truncate_field(&mut last_prompt, 512); let title = super::redact_secrets(&title); + let last_prompt = super::redact_secrets(&last_prompt); // Hermes stores timestamps as Unix epoch seconds (REAL). // Convert to milliseconds for abtop compatibility. let time_created = row["time_created_ms"].as_u64().unwrap_or(0); let time_updated = row["time_updated_ms"].as_u64().unwrap_or(0); + let last_activity_ts = row["last_activity_ts"].as_f64().map(|t| (t * 1000.0) as u64).unwrap_or(0); + // If last_activity_ts is somehow 0, fall back to time_updated + let last_activity_ts = if last_activity_ts == 0 { time_updated } else { last_activity_ts }; let project_name = directory .trim_end_matches('/') @@ -598,6 +621,7 @@ LIMIT {}; directory, time_created, time_updated, + last_activity_ts, project_name, turn_count: row["turn_count"].as_u64().unwrap_or(0) as u32, total_input: row["total_input"].as_u64().unwrap_or(0), @@ -605,6 +629,8 @@ LIMIT {}; total_cache_read: row["total_cache_read"].as_u64().unwrap_or(0), total_cache_write: row["total_cache_write"].as_u64().unwrap_or(0), model, + last_tool, + last_prompt, }); } @@ -637,6 +663,12 @@ struct DbSession { total_cache_read: u64, total_cache_write: u64, model: String, + /// Last tool call name from the assistant (e.g. "terminal", "read_file", "web_search") + last_tool: String, + /// Timestamp of the most recent message (epoch ms), or started_at if no messages + last_activity_ts: u64, + /// Last user prompt text (truncated for display) + last_prompt: String, } /// Check if a path is a symlink (fail-closed: returns true on error). From b11d162fca085ef21c5ad6df9009ba8f0d5c0d1d Mon Sep 17 00:00:00 2001 From: xiayouping79 Date: Sun, 14 Jun 2026 11:06:49 +0800 Subject: [PATCH 4/4] feat: populate chat messages, tool calls, and session summary from Hermes messages table --- src/collector/hermes.rs | 195 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 191 insertions(+), 4 deletions(-) diff --git a/src/collector/hermes.rs b/src/collector/hermes.rs index b46b5e5..0bdac9a 100644 --- a/src/collector/hermes.rs +++ b/src/collector/hermes.rs @@ -239,6 +239,20 @@ json.dump(rows, sys.stdout, ensure_ascii=False, default=str)", let now_ms = current_time_ms(); let mut sessions = Vec::new(); + // Collect active session IDs for chat data query + let active_sids: Vec = self + .cached_db_sessions + .iter() + .filter(|ds| pid_map.values().any(|sid| sid.as_str() == ds.id)) + .map(|ds| ds.id.clone()) + .collect(); + // Only fetch chat data on slow ticks (expensive) + let chat_data = if shared.slow_tick { + query_chat_data(&self.db_path, &active_sids) + } else { + HashMap::new() + }; + // Build a set of tracked session_ids for cache eviction later let _db_ids: HashSet<&str> = self .cached_db_sessions @@ -405,10 +419,23 @@ json.dump(rows, sys.stdout, ensure_ascii=False, default=str)", mem_file_count: 0, mem_line_count: 0, children, - initial_prompt: ds.title.clone(), - first_assistant_text: String::new(), - chat_messages: vec![], - tool_calls: vec![], + initial_prompt: chat_data + .get(&ds.id) + .map(|d| d.0.clone()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| ds.title.clone()), + first_assistant_text: chat_data + .get(&ds.id) + .map(|d| d.1.clone()) + .unwrap_or_default(), + chat_messages: chat_data + .get(&ds.id) + .map(|d| d.2.clone()) + .unwrap_or_default(), + tool_calls: chat_data + .get(&ds.id) + .map(|d| d.3.clone()) + .unwrap_or_default(), pending_since_ms: 0, thinking_since_ms, file_accesses: vec![], @@ -650,6 +677,166 @@ impl super::AgentCollector for HermesCollector { } } +/// Batch-query chat messages and tool calls for active sessions. +fn query_chat_data( + db_path: &std::path::Path, + session_ids: &[String], +) -> HashMap, Vec)> { + use crate::model::{ChatMessage, ChatRole, ToolCall}; + use crate::model::MAX_CHAT_MESSAGES; + if session_ids.is_empty() || !db_path.exists() { + return HashMap::new(); + } + + let db_str = db_path.to_str().unwrap_or("").replace('\\', "/"); + let ids_json = serde_json::to_string(session_ids).unwrap_or_else(|_| "[]".to_string()); + let ids_py = ids_json.replace('\'', "'\\''"); + + let script = format!( + r#"import sqlite3, json, sys +ids = json.loads(r'{}') +db = r'{}' +placeholders = ','.join(['?'] * len(ids)) +query = """SELECT m.session_id, m.role, m.content, m.tool_calls, + CAST(CAST(m.timestamp AS REAL) * 1000 AS INTEGER) as ts_ms +FROM messages m +WHERE m.session_id IN ({}) +ORDER BY m.id ASC""" +conn = sqlite3.connect(db) +conn.row_factory = sqlite3.Row +cur = conn.execute(query, ids) +rows = [dict(row) for row in cur.fetchall()] +conn.close() +json.dump(rows, sys.stdout, ensure_ascii=False, default=str) +"#, + ids_py, + db_str, + placeholders_str(session_ids.len()), + ); + + let output = std::process::Command::new("python") + .args(["-c", &script]) + .output() + .ok(); + let output = match output { + Some(o) if o.status.success() => o, + _ => return HashMap::new(), + }; + + let stdout = String::from_utf8_lossy(&output.stdout); + if stdout.trim().is_empty() { + return HashMap::new(); + } + + let rows: Vec = match serde_json::from_str(stdout.trim()) { + Ok(r) => r, + Err(_) => return HashMap::new(), + }; + + let mut result: HashMap, Vec)> = + HashMap::new(); + + for row in &rows { + let sid = match row["session_id"].as_str() { + Some(s) => s.to_string(), + None => continue, + }; + let role = row["role"].as_str().unwrap_or(""); + let content = row["content"].as_str().unwrap_or("").to_string(); + + let entry = result.entry(sid).or_insert_with(|| { + ( + String::new(), // initial_prompt + String::new(), // first_assistant_text + Vec::new(), // chat_messages + Vec::new(), // tool_calls + ) + }); + + match role { + "user" => { + if entry.0.is_empty() && !content.is_empty() { + entry.0 = content.clone(); + } + if !content.is_empty() { + entry.2.push(ChatMessage { + role: ChatRole::User, + text: super::redact_secrets(&content), + }); + } + } + "assistant" => { + if entry.1.is_empty() && !content.is_empty() { + entry.1 = content.clone(); + } + // Extract tool calls from JSON tool_calls column + if let Some(tc_val) = row["tool_calls"].as_str() { + if let Ok(tcs) = serde_json::from_str::>(tc_val) { + for tc in &tcs { + if let Some(name) = tc["function"]["name"].as_str() { + let arg = tc["function"]["arguments"] + .as_str() + .unwrap_or("") + .to_string(); + entry.3.push(ToolCall { + name: name.to_string(), + arg: truncate_arg(&arg), + duration_ms: 0, + }); + } + } + } + } + if !content.is_empty() { + entry.2.push(ChatMessage { + role: ChatRole::Assistant, + text: super::redact_secrets(&content), + }); + } + } + _ => {} // skip tool role messages + } + } + + // Trim chat_messages to MAX_CHAT_MESSAGES + for entry in result.values_mut() { + if entry.2.len() > MAX_CHAT_MESSAGES { + let start = entry.2.len() - MAX_CHAT_MESSAGES; + let tail = entry.2.drain(start..).collect(); + entry.2 = tail; + } + } + + result +} + +/// Build a placeholder string for SQL IN clause. +fn placeholders_str(count: usize) -> String { + let mut s = String::with_capacity(count * 2); + for i in 0..count { + if i > 0 { + s.push(','); + } + s.push('?'); + s.push_str(&(i + 1).to_string()); + } + s +} + +/// Truncate a tool call argument for display. +fn truncate_arg(arg: &str) -> String { + if arg.len() > 80 { + // Find last char boundary at or before 77 + let mut end = 77; + while end > 0 && !arg.is_char_boundary(end) { + end -= 1; + } + format!("{}...", &arg[..end]) + } else { + arg.to_string() + } +} + struct DbSession { id: String, title: String,