diff --git a/dsl/camel-jbang/camel-jbang-plugin-tui/src/main/java/org/apache/camel/dsl/jbang/core/commands/tui/CamelMonitor.java b/dsl/camel-jbang/camel-jbang-plugin-tui/src/main/java/org/apache/camel/dsl/jbang/core/commands/tui/CamelMonitor.java index ab3a259ca40d7..0c20ff224d2de 100644 --- a/dsl/camel-jbang/camel-jbang-plugin-tui/src/main/java/org/apache/camel/dsl/jbang/core/commands/tui/CamelMonitor.java +++ b/dsl/camel-jbang/camel-jbang-plugin-tui/src/main/java/org/apache/camel/dsl/jbang/core/commands/tui/CamelMonitor.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; @@ -31,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -120,10 +122,10 @@ public class CamelMonitor extends CamelCommand { private static final int TAB_ROUTES = 2; private static final int TAB_CONSUMERS = 3; private static final int TAB_ENDPOINTS = 4; - private static final int TAB_CIRCUIT_BREAKER = 5; - private static final int TAB_HEALTH = 6; - private static final int TAB_HISTORY = 7; - private static final int TAB_TRACE = 8; + private static final int TAB_HEALTH = 5; + private static final int TAB_HISTORY = 6; + private static final int TAB_TRACE = 7; + private static final int TAB_CIRCUIT_BREAKER = 8; // Overview sort columns private static final String[] OVERVIEW_SORT_COLUMNS = { "pid", "name", "version", "status", "total", "fail" }; @@ -182,6 +184,10 @@ public class CamelMonitor extends CamelCommand { private final Map> endpointRemoteSamples = new ConcurrentHashMap<>(); private final Map previousEndpointRemoteTime = new ConcurrentHashMap<>(); + // Load averages (EWMA) — CPU%, per PID (inflight EWMA is read from the management JSON) + private final Map cpuLoadAvg = new ConcurrentHashMap<>(); + private final Map prevCpuSample = new ConcurrentHashMap<>(); + // Overview sort state private String overviewSort = "name"; private int overviewSortIndex = 1; @@ -204,6 +210,7 @@ public class CamelMonitor extends CamelCommand { // Endpoint filter state private boolean showOnlyRemote; + private boolean showEndpointChart = true; // Circuit breaker sort state (default: route = index 0) private String cbSort = "route"; @@ -268,7 +275,10 @@ public class CamelMonitor extends CamelCommand { private String selectedPid; // Diagram state - private boolean chartAllIntegrations = true; + private static final int CHART_ALL = 0; + private static final int CHART_SINGLE = 1; + private static final int CHART_OFF = 2; + private int chartMode = CHART_ALL; private boolean showDiagram; private boolean diagramTextMode; private boolean diagramMetrics = true; @@ -397,16 +407,16 @@ private boolean handleEvent(Event event, TuiRunner runner) { return handleTabKey(TAB_ENDPOINTS); } if (ke.isChar('6')) { - return handleTabKey(TAB_CIRCUIT_BREAKER); + return handleTabKey(TAB_HEALTH); } if (ke.isChar('7')) { - return handleTabKey(TAB_HEALTH); + return handleTabKey(TAB_HISTORY); } if (ke.isChar('8')) { - return handleTabKey(TAB_HISTORY); + return handleTabKey(TAB_TRACE); } if (ke.isChar('9')) { - return handleTabKey(TAB_TRACE); + return handleTabKey(TAB_CIRCUIT_BREAKER); } // Tab cycling @@ -526,9 +536,9 @@ private boolean handleEvent(Event event, TuiRunner runner) { overviewSortReversed = !overviewSortReversed; return true; } - // Overview tab: toggle chart between all integrations and selected only + // Overview tab: cycle chart between all integrations, selected only, and off if (tab == TAB_OVERVIEW && ke.isCharIgnoreCase('a')) { - chartAllIntegrations = !chartAllIntegrations; + chartMode = (chartMode + 1) % 3; return true; } // Overview tab: start/stop all routes for selected integration @@ -580,6 +590,10 @@ private boolean handleEvent(Event event, TuiRunner runner) { showOnlyRemote = !showOnlyRemote; return true; } + if (tab == TAB_ENDPOINTS && ke.isCharIgnoreCase('a')) { + showEndpointChart = !showEndpointChart; + return true; + } // Routes tab: sort and diagram if (tab == TAB_ROUTES && ke.isChar('s')) { @@ -840,6 +854,11 @@ private boolean handleTabKey(int tab) { if (tab != TAB_OVERVIEW) { selectCurrentIntegration(); } + if (tab == TAB_LOG) { + // Reset log state so the next tick tails from the correct file/position + logFilePos = -1; + logLineBuffer.setLength(0); + } if (tab == TAB_HISTORY && selectedPid != null) { refreshHistoryData(List.of(Long.parseLong(selectedPid))); if (!historyEntries.isEmpty()) { @@ -861,28 +880,29 @@ private boolean handleTabKey(int tab) { return true; } + // Returns integrations in the same order the overview table renders them. + // Must be used anywhere that translates a table row index to a PID. + private List sortedOverviewInfos() { + List infos = new ArrayList<>(data.get()); + infos.sort(this::sortOverview); + return infos; + } + private void selectCurrentIntegration() { if (selectedPid != null) { return; } - List infos = data.get().stream().filter(i -> !i.vanishing).toList(); + List infos = sortedOverviewInfos(); Integer sel = overviewTableState.selected(); if (sel != null && sel >= 0 && sel < infos.size()) { selectedPid = infos.get(sel).pid; } else if (infos.size() == 1) { selectedPid = infos.get(0).pid; } - if (selectedPid != null) { - List pids = List.of(Long.parseLong(selectedPid)); - refreshHistoryData(pids); - traceFilePositions.clear(); - traces.set(Collections.emptyList()); - refreshTraceData(pids); - } } private void syncSelectedPidFromOverview() { - List infos = data.get().stream().filter(i -> !i.vanishing).toList(); + List infos = sortedOverviewInfos(); Integer sel = overviewTableState.selected(); String newPid = null; if (sel != null && sel >= 0 && sel < infos.size()) { @@ -893,11 +913,6 @@ private void syncSelectedPidFromOverview() { if (newPid != null && !newPid.equals(selectedPid)) { selectedPid = newPid; resetIntegrationTabState(); - List pids = List.of(Long.parseLong(selectedPid)); - refreshHistoryData(pids); - traceFilePositions.clear(); - traces.set(Collections.emptyList()); - refreshTraceData(pids); } } @@ -943,9 +958,6 @@ private void navigateUp() { syncSelectedPidFromOverview(); } case TAB_ROUTES -> routeTableState.selectPrevious(); - case TAB_CONSUMERS -> consumerTableState.selectPrevious(); - case TAB_HEALTH -> healthTableState.selectPrevious(); - case TAB_ENDPOINTS -> endpointTableState.selectPrevious(); case TAB_CIRCUIT_BREAKER -> cbTableState.selectPrevious(); case TAB_LOG -> { logFollowMode = false; @@ -970,25 +982,13 @@ private void navigateDown() { List infos = data.get().stream().filter(i -> !i.vanishing).toList(); switch (tabsState.selected()) { case TAB_OVERVIEW -> { - overviewTableState.selectNext(infos.size()); + overviewTableState.selectNext(sortedOverviewInfos().size()); syncSelectedPidFromOverview(); } case TAB_ROUTES -> { IntegrationInfo info = findSelectedIntegration(); routeTableState.selectNext(info != null ? info.routes.size() : 0); } - case TAB_CONSUMERS -> { - IntegrationInfo info = findSelectedIntegration(); - consumerTableState.selectNext(info != null ? info.consumers.size() : 0); - } - case TAB_HEALTH -> { - IntegrationInfo info = findSelectedIntegration(); - healthTableState.selectNext(info != null ? getFilteredHealthChecks(info).size() : 0); - } - case TAB_ENDPOINTS -> { - IntegrationInfo info = findSelectedIntegration(); - endpointTableState.selectNext(info != null ? info.endpoints.size() : 0); - } case TAB_CIRCUIT_BREAKER -> { IntegrationInfo info = findSelectedIntegration(); cbTableState.selectNext(info != null ? info.circuitBreakers.size() : 0); @@ -1080,20 +1080,17 @@ private void renderTabs(Frame frame, Rect area) { Tabs tabs = Tabs.builder() .titles( badge(" 1 Overview ", activeCount), - filteredLogEntries.isEmpty() - ? Line.from(" 2 Log ") - : Line.from(Span.raw(" 2 Log "), Span.styled("(*)", Style.EMPTY.fg(Color.YELLOW).bold()), - Span.raw(" ")), + Line.from(" 2 Log "), badge(" 3 Routes ", routeCount), badge(" 4 Consumers ", consumerCount), badge(" 5 Endpoints ", endpointCount), - badgeCb(" 6 Circuit Breaker ", cbCount, cbOpenCount), - badgeHealth(" 7 Health ", healthCount, healthDownCount), - badge(" 8 Last ", historyCount), + badgeHealth(" 6 Health ", healthCount, healthDownCount), + badge(" 7 Last ", historyCount), hasTraces - ? Line.from(Span.raw(" 9 Trace "), Span.styled("(*)", Style.EMPTY.fg(Color.YELLOW).bold()), + ? Line.from(Span.raw(" 8 Trace "), Span.styled("(*)", Style.EMPTY.fg(Color.YELLOW).bold()), Span.raw(" ")) - : Line.from(" 9 Trace ")) + : Line.from(" 8 Trace "), + badgeCb(" 9 Circuit Breaker ", cbCount, cbOpenCount)) .highlightStyle(Style.EMPTY.fg(Color.rgb(0xF6, 0x91, 0x23)).bold()) .divider(Span.styled(" | ", Style.EMPTY.dim())) .build(); @@ -1122,11 +1119,20 @@ private void renderContent(Frame frame, Rect area) { // ---- Tab 1: Overview ---- private void renderOverview(Frame frame, Rect area) { - List infos = new ArrayList<>(data.get()); - infos.sort(this::sortOverview); + List infos = sortedOverviewInfos(); + + // Keep the table selection index tracking the same PID across sort changes and data refreshes + if (selectedPid != null) { + for (int i = 0; i < infos.size(); i++) { + if (selectedPid.equals(infos.get(i).pid)) { + overviewTableState.select(i); + break; + } + } + } - // Split: table (fill) + chart (14 rows: 13 chart + 1 x-axis) if we have data - boolean hasSparkline = !throughputHistory.isEmpty(); + // Split: table (fill) + chart (14 rows: 13 chart + 1 x-axis) if we have data and chart is on + boolean hasSparkline = chartMode != CHART_OFF && !throughputHistory.isEmpty(); List chunks; if (hasSparkline) { chunks = Layout.vertical() @@ -1257,7 +1263,7 @@ private void renderOverview(Frame frame, Rect area) { // Merge throughput histories: all PIDs or selected only long[] mergedTotal = new long[renderPoints]; long[] mergedFailed = new long[renderPoints]; - String chartPid = (!chartAllIntegrations && selectedPid != null) ? selectedPid : null; + String chartPid = (chartMode == CHART_SINGLE && selectedPid != null) ? selectedPid : null; for (int i = 0; i < renderPoints; i++) { for (Map.Entry> e : throughputHistory.entrySet()) { if (chartPid == null || chartPid.equals(e.getKey())) { @@ -1287,7 +1293,7 @@ private void renderOverview(Frame frame, Rect area) { // Styled legend in chart title Line titleLine; - if (!chartAllIntegrations && selectedPid != null) { + if (chartMode == CHART_SINGLE && selectedPid != null) { IntegrationInfo chartSel = findSelectedIntegration(); String chartName = chartSel != null ? TuiHelper.truncate(chartSel.name, 12) : selectedPid; titleLine = Line.from( @@ -1456,6 +1462,22 @@ private void renderOverviewInfoPanel(Frame frame, Rect area) { Span.styled("Thds: ", dim), Span.raw(sel.threadCount + " / " + sel.peakThreadCount))); } + LoadAvg cpu = cpuLoadAvg.get(sel.pid); + boolean hasInfl = sel.inflightLoad01 != null && !sel.inflightLoad01.isEmpty(); + if (cpu != null || hasInfl) { + lines.add(Line.from(Span.raw(""))); + lines.add(Line.from(Span.styled("Load (1m/5m/15m):", dim))); + if (cpu != null) { + lines.add(Line.from( + Span.styled("CPU: ", dim), + Span.raw(cpu.format("%.1f / %.1f / %.1f %%")))); + } + if (hasInfl) { + lines.add(Line.from( + Span.styled("Infl: ", dim), + Span.raw(sel.inflightLoad01 + " / " + sel.inflightLoad05 + " / " + sel.inflightLoad15))); + } + } } else { lines.add(Line.from(Span.raw("-"))); } @@ -1823,8 +1845,6 @@ private void renderConsumers(Frame frame, Rect area) { Constraint.length(10), Constraint.length(22), Constraint.fill()) - .highlightStyle(Style.EMPTY.fg(Color.WHITE).bold().onBlue()) - .highlightSpacing(Table.HighlightSpacing.ALWAYS) .block(Block.builder().borderType(BorderType.ROUNDED) .title(" Consumers sort:" + consumerSort + " ").build()) .build(); @@ -2888,8 +2908,6 @@ private void renderHealth(Frame frame, Rect area) { Constraint.length(12), Constraint.length(6), Constraint.fill()) - .highlightStyle(Style.EMPTY.fg(Color.WHITE).bold().onBlue()) - .highlightSpacing(Table.HighlightSpacing.ALWAYS) .block(Block.builder().borderType(BorderType.ROUNDED).title(title).build()) .build(); @@ -2971,27 +2989,27 @@ private void renderEndpoints(Frame frame, Rect area) { Constraint.length(6), Constraint.length(8), Constraint.fill()) - .highlightStyle(Style.EMPTY.fg(Color.WHITE).bold().onBlue()) - .highlightSpacing(Table.HighlightSpacing.ALWAYS) .block(Block.builder().borderType(BorderType.ROUNDED) .title(" Endpoints sort:" + endpointSort + (showOnlyRemote ? " remote" : "") + " ").build()) .build(); - List chunks = Layout.vertical() - .constraints(Constraint.fill(), Constraint.length(12)) - .split(area); + List chunks = showEndpointChart + ? Layout.vertical().constraints(Constraint.fill(), Constraint.length(16)).split(area) + : List.of(area); frame.renderStatefulWidget(table, chunks.get(0), endpointTableState); - long inTotal = info.endpoints.stream() - .filter(ep -> "in".equals(ep.direction) && (!showOnlyRemote || ep.remote)) - .mapToLong(ep -> ep.hits) - .sum(); - long outTotal = info.endpoints.stream() - .filter(ep -> "out".equals(ep.direction) && (!showOnlyRemote || ep.remote)) - .mapToLong(ep -> ep.hits) - .sum(); - renderEndpointFlow(frame, chunks.get(1), inTotal, outTotal, info.name, info.pid, showOnlyRemote); + if (showEndpointChart) { + long inTotal = info.endpoints.stream() + .filter(ep -> "in".equals(ep.direction) && (!showOnlyRemote || ep.remote)) + .mapToLong(ep -> ep.hits) + .sum(); + long outTotal = info.endpoints.stream() + .filter(ep -> "out".equals(ep.direction) && (!showOnlyRemote || ep.remote)) + .mapToLong(ep -> ep.hits) + .sum(); + renderEndpointFlow(frame, chunks.get(1), inTotal, outTotal, info.name, info.pid, showOnlyRemote); + } } private void renderEndpointFlow( @@ -3878,16 +3896,22 @@ private void renderFooter(Frame frame, Rect area) { if (tab == TAB_OVERVIEW) { hint(spans, "q", "quit"); + if (selectedPid != null) { + hint(spans, "Esc", "unselect"); + } hint(spans, "\u2191\u2193", "navigate"); hint(spans, "s", "sort"); - hint(spans, "a", "chart " + (chartAllIntegrations ? "[all]" : "[single]")); + hint(spans, "a", "chart " + switch (chartMode) { + case CHART_ALL -> "[all]"; + case CHART_SINGLE -> "[single]"; + default -> "[off]"; + }); hint(spans, "Enter", "details"); if (selectedPid != null) { IntegrationInfo selInfo = findSelectedIntegration(); if (selInfo != null) { hint(spans, "p", selInfo.routeStarted > 0 ? "stop" : "start"); } - hint(spans, "Esc", "unselect"); } hint(spans, "1-9", "tabs"); } else if (tab == TAB_ROUTES && showSource) { @@ -3932,14 +3956,13 @@ private void renderFooter(Frame frame, Rect area) { hint(spans, "1-9", "tabs"); } else if (tab == TAB_CONSUMERS) { hint(spans, "Esc", "back"); - hint(spans, "\u2191\u2193", "navigate"); hint(spans, "s", "sort"); hint(spans, "1-9", "tabs"); } else if (tab == TAB_ENDPOINTS) { hint(spans, "Esc", "back"); - hint(spans, "\u2191\u2193", "navigate"); hint(spans, "s", "sort"); hint(spans, "r", "remote" + (showOnlyRemote ? " [on]" : " [off]")); + hint(spans, "a", "chart " + (showEndpointChart ? "[all]" : "[off]")); hint(spans, "1-9", "tabs"); } else if (tab == TAB_CIRCUIT_BREAKER) { hint(spans, "Esc", "back"); @@ -3948,7 +3971,6 @@ private void renderFooter(Frame frame, Rect area) { hint(spans, "1-9", "tabs"); } else if (tab == TAB_HEALTH) { hint(spans, "Esc", "back"); - hint(spans, "\u2191\u2193", "navigate"); hint(spans, "d", "toggle DOWN"); hint(spans, "1-9", "tabs"); } else if (tab == TAB_LOG && showLogLevelPopup) { @@ -4110,6 +4132,7 @@ private void refreshDataSync() { infos.add(info); updateThroughputHistory(info); updateEndpointHistory(info); + updateLoadMetrics(ph, info); } } }); @@ -4140,6 +4163,8 @@ private void refreshDataSync() { endpointRemoteOutHistory.remove(entry.getKey()); endpointRemoteSamples.remove(entry.getKey()); previousEndpointRemoteTime.remove(entry.getKey()); + cpuLoadAvg.remove(entry.getKey()); + prevCpuSample.remove(entry.getKey()); } else if (!livePids.contains(entry.getKey())) { IntegrationInfo ghost = entry.getValue().info; ghost.vanishing = true; @@ -4152,33 +4177,37 @@ private void refreshDataSync() { data.set(infos); - // Refresh log data for the selected integration (incremental tail) - IntegrationInfo selected = findSelectedIntegration(); - if (selected != null) { - if (!selected.pid.equals(logFilePid)) { - // Integration changed: reset all incremental log state - mutableFilteredEntries.clear(); - logFilePos = -1; - logTotalLinesRead = 0; - logEvictedSeen = 0; - logLineBuffer.setLength(0); - } - List newRawLines = new ArrayList<>(); - readNewLogLines(selected.pid, newRawLines); - if (!newRawLines.isEmpty()) { - logTotalLinesRead += newRawLines.size(); - for (String line : newRawLines) { - mutableFilteredEntries.add(parseLogLine(line)); + // Refresh log data only when the Log tab is visible + if (tabsState.selected() == TAB_LOG) { + IntegrationInfo selected = findSelectedIntegration(); + if (selected != null) { + if (!selected.pid.equals(logFilePid)) { + // Integration changed: reset all incremental log state + mutableFilteredEntries.clear(); + logFilePos = -1; + logTotalLinesRead = 0; + logEvictedSeen = 0; + logLineBuffer.setLength(0); } - if (mutableFilteredEntries.size() > MAX_LOG_LINES) { - mutableFilteredEntries.subList(0, mutableFilteredEntries.size() - MAX_LOG_LINES).clear(); + List newRawLines = new ArrayList<>(); + readNewLogLines(selected.pid, newRawLines); + if (!newRawLines.isEmpty()) { + logTotalLinesRead += newRawLines.size(); + for (String line : newRawLines) { + mutableFilteredEntries.add(parseLogLine(line)); + } + if (mutableFilteredEntries.size() > MAX_LOG_LINES) { + mutableFilteredEntries.subList(0, mutableFilteredEntries.size() - MAX_LOG_LINES).clear(); + } + filteredLogEntries = new ArrayList<>(mutableFilteredEntries); } - filteredLogEntries = new ArrayList<>(mutableFilteredEntries); } } - // Refresh trace data - refreshTraceData(pids); + // Refresh trace data only when the Trace tab is visible + if (tabsState.selected() == TAB_TRACE) { + refreshTraceData(pids); + } } catch (Exception e) { // ignore refresh errors } @@ -4313,6 +4342,27 @@ private void refreshTraceData(List pids) { traces.set(allTraces); } + private void updateLoadMetrics(ProcessHandle ph, IntegrationInfo info) { + String pid = info.pid; + + // CPU EWMA — compute % from ProcessHandle CPU duration delta + Optional durOpt = ph.info().totalCpuDuration(); + if (durOpt.isPresent()) { + long cpuNanos = durOpt.get().toNanos(); + long wallMs = System.currentTimeMillis(); + long[] prev = prevCpuSample.get(pid); + if (prev != null) { + long deltaCpuNanos = cpuNanos - prev[0]; + long deltaWallNanos = (wallMs - prev[1]) * 1_000_000L; + if (deltaWallNanos > 0) { + double cpuPct = (double) deltaCpuNanos / deltaWallNanos * 100.0; + cpuLoadAvg.computeIfAbsent(pid, k -> new LoadAvg()).update(Math.max(0, cpuPct)); + } + } + prevCpuSample.put(pid, new long[] { cpuNanos, wallMs }); + } + } + @SuppressWarnings("unchecked") private void readTraceFile(String pid, List allTraces) { Path traceFile = CommandLineHelper.getCamelDir().resolve(pid + "-trace.json"); @@ -4742,6 +4792,9 @@ private IntegrationInfo parseIntegration(ProcessHandle ph, JsonObject root) { info.exchangesTotal = objToLong(stats.get("exchangesTotal")); info.failed = objToLong(stats.get("exchangesFailed")); info.inflight = objToLong(stats.get("exchangesInflight")); + info.inflightLoad01 = objToString(stats.get("load01")); + info.inflightLoad05 = objToString(stats.get("load05")); + info.inflightLoad15 = objToString(stats.get("load15")); info.last = objToString(stats.get("lastProcessingTime")); info.delta = objToString(stats.get("deltaProcessingTime")); long tsStarted = objToLong(stats.get("lastCreatedExchangeTimestamp")); @@ -5070,6 +5123,28 @@ private static long objToLong(Object o) { return TuiHelper.objToLong(o); } + // ---- Load Average ---- + + private static class LoadAvg { + private static final double EXP_1 = Math.exp(-1 / 60.0); + private static final double EXP_5 = Math.exp(-1 / (60.0 * 5.0)); + private static final double EXP_15 = Math.exp(-1 / (60.0 * 15.0)); + + private double load1 = Double.NaN; + private double load5 = Double.NaN; + private double load15 = Double.NaN; + + synchronized void update(double value) { + load1 = Double.isNaN(load1) ? value : value + EXP_1 * (load1 - value); + load5 = Double.isNaN(load5) ? value : value + EXP_5 * (load5 - value); + load15 = Double.isNaN(load15) ? value : value + EXP_15 * (load15 - value); + } + + synchronized String format(String fmt) { + return Double.isNaN(load1) ? "-" : String.format(fmt, load1, load5, load15); + } + } + // ---- Data Classes ---- static class IntegrationInfo { @@ -5091,6 +5166,9 @@ static class IntegrationInfo { long exchangesTotal; long failed; long inflight; + String inflightLoad01; + String inflightLoad05; + String inflightLoad15; String last; String delta; String sinceLastStarted;