diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java index a57dd858b45..b38dffeb0f8 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java @@ -30,6 +30,11 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) { return; } + if (!config.isTraceEnabled()) { + LOGGER.debug("LLM Observability is disabled: tracing is disabled"); + return; + } + sco.createRemaining(config); String mlApp = config.getLlmObsMlApp(); diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java index 6532829cfa6..996590514b3 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java @@ -5,7 +5,9 @@ import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTraceApiInfo; import datadog.trace.api.DDTraceId; +import datadog.trace.api.ProductTraceSource; import datadog.trace.api.WellKnownTags; +import datadog.trace.api.internal.TraceSegment; import datadog.trace.api.llmobs.LLMObs; import datadog.trace.api.llmobs.LLMObsContext; import datadog.trace.api.llmobs.LLMObsSpan; @@ -110,6 +112,12 @@ public DDLLMObsSpan( } span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID); scope = LLMObsContext.attach(span.context()); + + // Mark this span as originating from LLM Observability product + TraceSegment segment = AgentTracer.get().getTraceSegment(); + if (segment != null) { + segment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS); + } } @Override diff --git a/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/LLMObsSystemTest.groovy b/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/LLMObsSystemTest.groovy new file mode 100644 index 00000000000..ef2f6c82dd4 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/LLMObsSystemTest.groovy @@ -0,0 +1,53 @@ +package datadog.trace.llmobs + +import datadog.communication.ddagent.SharedCommunicationObjects +import datadog.trace.test.util.DDSpecification +import okhttp3.HttpUrl + +class LLMObsSystemTest extends DDSpecification { + + void 'start disabled when llmobs is disabled'() { + setup: + injectSysConfig('llmobs.enabled', 'false') + rebuildConfig() + final inst = Mock(java.lang.instrument.Instrumentation) + final sco = Mock(SharedCommunicationObjects) + + when: + LLMObsSystem.start(inst, sco) + + then: + 0 * sco._ + } + + void 'start disabled when trace is disabled'() { + setup: + injectSysConfig('llmobs.enabled', 'true') + injectSysConfig('trace.enabled', 'false') + rebuildConfig() + final inst = Mock(java.lang.instrument.Instrumentation) + final sco = Mock(SharedCommunicationObjects) + + when: + LLMObsSystem.start(inst, sco) + + then: + 0 * sco._ + } + + void 'start enabled when apm tracing disabled but llmobs enabled'() { + setup: + injectSysConfig('llmobs.enabled', 'true') + injectSysConfig('apm.tracing.enabled', 'false') + rebuildConfig() + final inst = Mock(java.lang.instrument.Instrumentation) + final sco = Mock(SharedCommunicationObjects) + sco.agentUrl = HttpUrl.parse('http://localhost:8126') + + when: + LLMObsSystem.start(inst, sco) + + then: + 1 * sco.createRemaining(_) + } +} diff --git a/dd-smoke-tests/apm-tracing-disabled/src/main/java/datadog/smoketest/apmtracingdisabled/Controller.java b/dd-smoke-tests/apm-tracing-disabled/src/main/java/datadog/smoketest/apmtracingdisabled/Controller.java index 3bb55197614..61f1e4ffef0 100644 --- a/dd-smoke-tests/apm-tracing-disabled/src/main/java/datadog/smoketest/apmtracingdisabled/Controller.java +++ b/dd-smoke-tests/apm-tracing-disabled/src/main/java/datadog/smoketest/apmtracingdisabled/Controller.java @@ -1,5 +1,7 @@ package datadog.smoketest.apmtracingdisabled; +import datadog.trace.api.llmobs.LLMObs; +import datadog.trace.api.llmobs.LLMObsSpan; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.opentracing.Span; import io.opentracing.util.GlobalTracer; @@ -73,6 +75,17 @@ public void write( } } + @GetMapping("/llmobs/test") + public String llmobsTest() { + // Create LLMObs span using public API + LLMObsSpan llmSpan = + LLMObs.startLLMSpan("llmobs-test-operation", "gpt-4", "openai", null, null); + llmSpan.annotateIO("test input", "test output"); + llmSpan.finish(); + + return "LLMObs test completed"; + } + private String forceKeepSpan() { final Span span = GlobalTracer.get().activeSpan(); if (span != null) { diff --git a/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsApmDisabledSmokeTest.groovy b/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsApmDisabledSmokeTest.groovy new file mode 100644 index 00000000000..ef1c6b8f7aa --- /dev/null +++ b/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsApmDisabledSmokeTest.groovy @@ -0,0 +1,92 @@ +package datadog.smoketest.apmtracingdisabled + +import datadog.trace.api.sampling.PrioritySampling +import okhttp3.Request + +class LlmObsApmDisabledSmokeTest extends AbstractApmTracingDisabledSmokeTest { + + static final String LLMOBS_SERVICE_NAME = "llmobs-apm-disabled-test" + + static final String[] LLMOBS_APM_DISABLED_PROPERTIES = [ + "-Ddd.apm.tracing.enabled=false", + "-Ddd.llmobs.enabled=true", + "-Ddd.llmobs.ml-app=test-app", + "-Ddd.service.name=${LLMOBS_SERVICE_NAME}", + ] + + @Override + ProcessBuilder createProcessBuilder() { + return createProcess(LLMOBS_APM_DISABLED_PROPERTIES) + } + + void 'When APM disabled and LLMObs enabled, LLMObs spans should be kept and APM spans should be dropped'() { + setup: + final llmobsUrl = "http://localhost:${httpPort}/rest-api/llmobs/test" + final llmobsRequest = new Request.Builder().url(llmobsUrl).get().build() + + final apmUrl = "http://localhost:${httpPort}/rest-api/greetings" + final apmRequest = new Request.Builder().url(apmUrl).get().build() + + when: "Create LLMObs span" + final llmobsResponse = client.newCall(llmobsRequest).execute() + + then: "LLMObs request should succeed" + llmobsResponse.successful + + when: "Create regular APM span" + final apmResponse = client.newCall(apmRequest).execute() + + then: "APM request should succeed" + apmResponse.successful + + and: "Wait for traces" + waitForTraceCount(2) + + and: "LLMObs trace should be kept (SAMPLER_KEEP)" + def llmobsTrace = traces.find { trace -> + trace.spans.find { span -> + span.meta["http.url"] == llmobsUrl + } + } + assert llmobsTrace != null + // The LLMObs child span should have LLMObs tags + def llmobsChildSpan = llmobsTrace.spans.find { span -> + span.meta["_ml_obs_tag.model_name"] == "gpt-4" + } + assert llmobsChildSpan != null : "LLMObs child span with model_name=gpt-4 should exist" + + and: "Regular APM trace should be dropped (SAMPLER_DROP)" + def apmTrace = traces.find { trace -> + trace.spans.find { span -> + span.meta["http.url"] == apmUrl + } + } + assert apmTrace != null + checkRootSpanPrioritySampling(apmTrace, PrioritySampling.SAMPLER_DROP) + + and: "No NPE or errors in logs" + !isLogPresent { it.contains("NullPointerException") } + !isLogPresent { it.contains("ERROR") } + } + + void 'LLMObs spans should have PROPAGATED_TRACE_SOURCE tag set'() { + setup: + final llmobsUrl = "http://localhost:${httpPort}/rest-api/llmobs/test" + final llmobsRequest = new Request.Builder().url(llmobsUrl).get().build() + + when: + final response = client.newCall(llmobsRequest).execute() + + then: + response.successful + waitForTraceCount(1) + + and: "LLMObs span should be created successfully" + def trace = traces[0] + assert trace != null + def llmobsSpan = trace.spans.find { span -> + span.meta["_ml_obs_tag.model_name"] == "gpt-4" + } + assert llmobsSpan != null : "LLMObs span with model_name should exist" + } +} diff --git a/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsTraceDisabledSmokeTest.groovy b/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsTraceDisabledSmokeTest.groovy new file mode 100644 index 00000000000..5501a9cc845 --- /dev/null +++ b/dd-smoke-tests/apm-tracing-disabled/src/test/groovy/datadog/smoketest/apmtracingdisabled/LlmObsTraceDisabledSmokeTest.groovy @@ -0,0 +1,34 @@ +package datadog.smoketest.apmtracingdisabled + +import okhttp3.Request + +class LlmObsTraceDisabledSmokeTest extends AbstractApmTracingDisabledSmokeTest { + + static final String[] LLMOBS_TRACE_DISABLED_PROPERTIES = [ + "-Ddd.trace.enabled=false", + "-Ddd.llmobs.enabled=true", + "-Ddd.llmobs.ml-app=test-app", + "-Ddd.service.name=llmobs-trace-disabled-test", + ] + + @Override + ProcessBuilder createProcessBuilder() { + return createProcess(LLMOBS_TRACE_DISABLED_PROPERTIES) + } + + void 'DD_TRACE_ENABLED=false with DD_LLMOBS_ENABLED=true should disable LLMObs gracefully'() { + setup: + final llmobsUrl = "http://localhost:${httpPort}/rest-api/llmobs/test" + final llmobsRequest = new Request.Builder().url(llmobsUrl).get().build() + + when: "Call LLMObs endpoint" + final response = client.newCall(llmobsRequest).execute() + + then: "Request should succeed" + response.successful + response.code() == 200 + + and: "LLMObs disabled message in logs" + isLogPresent { it.contains("LLM Observability is disabled: tracing is disabled") } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/AsmStandaloneSampler.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/AsmStandaloneSampler.java deleted file mode 100644 index 54a2d750700..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/sampling/AsmStandaloneSampler.java +++ /dev/null @@ -1,58 +0,0 @@ -package datadog.trace.common.sampling; - -import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_DROP; -import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_KEEP; - -import datadog.trace.api.sampling.SamplingMechanism; -import datadog.trace.core.CoreSpan; -import java.time.Clock; -import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is designed to only allow 1 APM trace per minute as standalone ASM is only interested - * in the traces containing ASM events. But the service catalog and the billing need a continuous - * ingestion of at least at 1 trace per minute to consider a service as being live and billable. In - * the absence of ASM events, no APM traces must be sent, so we need to let some regular APM traces - * go through, even in the absence of ASM events. - */ -public class AsmStandaloneSampler implements Sampler, PrioritySampler { - - private static final Logger log = LoggerFactory.getLogger(AsmStandaloneSampler.class); - private static final int RATE_IN_MILLISECONDS = 60000; // 1 minute - - private final AtomicLong lastSampleTime; - private final Clock clock; - - public AsmStandaloneSampler(final Clock clock) { - this.clock = clock; - this.lastSampleTime = new AtomicLong(clock.millis() - RATE_IN_MILLISECONDS); - } - - @Override - public > boolean sample(final T span) { - // Priority sampling sends all traces to the core agent, including traces marked dropped. - // This allows the core agent to collect stats on all traces. - return true; - } - - @Override - public > void setSamplingPriority(final T span) { - - if (shouldSample()) { - log.debug("Set SAMPLER_KEEP for span {}", span.getSpanId()); - span.setSamplingPriority(SAMPLER_KEEP, SamplingMechanism.APPSEC); - } else { - log.debug("Set SAMPLER_DROP for span {}", span.getSpanId()); - span.setSamplingPriority(SAMPLER_DROP, SamplingMechanism.APPSEC); - } - } - - private boolean shouldSample() { - long now = clock.millis(); - return lastSampleTime.updateAndGet( - lastTime -> now - lastTime >= RATE_IN_MILLISECONDS ? now : lastTime) - == now; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/Sampler.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/Sampler.java index af1045e39df..346744b3677 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/sampling/Sampler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/sampling/Sampler.java @@ -12,6 +12,7 @@ import datadog.trace.api.sampling.SamplingRule; import datadog.trace.core.CoreSpan; import java.time.Clock; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -36,9 +37,17 @@ final class Builder { public static Sampler forConfig(final Config config, final TraceConfig traceConfig) { Sampler sampler; if (config != null) { - if (!config.isApmTracingEnabled() && isAsmEnabled(config)) { - log.debug("APM is disabled. Only 1 trace per minute will be sent."); - return new AsmStandaloneSampler(Clock.systemUTC()); + if (!config.isApmTracingEnabled()) { + List active = new ArrayList<>(); + if (config.isLlmObsEnabled()) active.add(StandaloneProduct.LLMOBS); + if (isAsmEnabled(config)) active.add(StandaloneProduct.ASM); + if (active.isEmpty()) { + log.debug("APM is disabled. All APM traces will be dropped."); + return new ForcePrioritySampler( + PrioritySampling.SAMPLER_DROP, SamplingMechanism.DEFAULT); + } + log.debug("APM is disabled, standalone products active: {}.", active); + return new StandaloneSampler(active, Clock.systemUTC()); } final Map serviceRules = config.getTraceSamplingServiceRules(); final Map operationRules = config.getTraceSamplingOperationRules(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneProduct.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneProduct.java new file mode 100644 index 00000000000..d1aa3081308 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneProduct.java @@ -0,0 +1,52 @@ +package datadog.trace.common.sampling; + +import datadog.trace.api.ProductTraceSource; +import datadog.trace.api.sampling.SamplingMechanism; + +/** + * Represents a standalone product that can function when APM tracing is disabled. Each product + * defines which traces to keep, which sampling mechanism to report, and whether it requires a + * 1-per-minute billing trace for service catalog / billing purposes. + * + *

To add a new standalone product: + * + *

    + *
  1. Add an enum entry here. + *
  2. Add one line in {@link Sampler.Builder#forConfig}. + *
  3. Update {@code ProductTraceSource.STANDALONE_PRODUCTS_MASK} to include the new product's + * {@link ProductTraceSource} bit — otherwise {@code + * TraceCollector.setSamplingPriorityIfNecessary} will not recognize the product's traces. + *
+ */ +public enum StandaloneProduct { + + /** + * LLM Observability: keeps all LLMOBS-marked traces. No billing trace is needed because LLMObs + * requires capturing every LLM interaction. + */ + LLMOBS(ProductTraceSource.LLMOBS, SamplingMechanism.DEFAULT, false), + + /** + * Application Security Management: keeps all ASM-marked traces and allows 1 APM trace per minute + * so the service catalog and billing can detect the service as live. + */ + ASM(ProductTraceSource.ASM, SamplingMechanism.APPSEC, true); + + /** The {@link ProductTraceSource} bit used to identify traces belonging to this product. */ + public final int traceSourceBit; + + /** The sampling mechanism to report when a trace is kept for this product. */ + public final byte samplingMechanism; + + /** + * Whether this product requires a billing trace (1 APM trace per minute) even in the absence of + * product-marked spans. + */ + public final boolean needsBillingTrace; + + StandaloneProduct(int traceSourceBit, byte samplingMechanism, boolean needsBillingTrace) { + this.traceSourceBit = traceSourceBit; + this.samplingMechanism = samplingMechanism; + this.needsBillingTrace = needsBillingTrace; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneSampler.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneSampler.java new file mode 100644 index 00000000000..586cdcfe9cb --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/sampling/StandaloneSampler.java @@ -0,0 +1,102 @@ +package datadog.trace.common.sampling; + +import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_DROP; +import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_KEEP; + +import datadog.trace.api.ProductTraceSource; +import datadog.trace.api.sampling.SamplingMechanism; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.DDSpan; +import java.time.Clock; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A unified sampler for when APM tracing is disabled but one or more standalone products are + * active. + * + *

For each span, the sampler checks whether the root span's {@link ProductTraceSource} bitfield + * matches any of the active products (in list order). The first match wins: the trace is kept with + * that product's sampling mechanism. + * + *

For APM-only traces (no product match): + * + *

    + *
  • If any active product requires a billing trace ({@link StandaloneProduct#needsBillingTrace} + * ), one APM trace per minute is allowed through with {@link SamplingMechanism#APPSEC}. + *
  • Otherwise all APM-only traces are dropped with {@link SamplingMechanism#DEFAULT}. + *
+ */ +public class StandaloneSampler implements Sampler, PrioritySampler { + + private static final Logger log = LoggerFactory.getLogger(StandaloneSampler.class); + private static final int RATE_IN_MILLISECONDS = 60000; // 1 minute + + private final List activeProducts; + private final boolean needsBillingTrace; + private final byte billingMechanism; + private final AtomicLong lastSampleTime; + private final Clock clock; + + public StandaloneSampler(final List activeProducts, final Clock clock) { + this.activeProducts = activeProducts; + this.clock = clock; + this.needsBillingTrace = activeProducts.stream().anyMatch(p -> p.needsBillingTrace); + this.billingMechanism = + activeProducts.stream() + .filter(p -> p.needsBillingTrace) + .map(p -> p.samplingMechanism) + .findFirst() + .orElse(SamplingMechanism.DEFAULT); + this.lastSampleTime = new AtomicLong(clock.millis() - RATE_IN_MILLISECONDS); + } + + List getActiveProducts() { + return activeProducts; + } + + @Override + public > boolean sample(final T span) { + // Priority sampling sends all traces to the core agent, including traces marked dropped, + // so the agent can collect stats on all traces. + return true; + } + + @Override + public > void setSamplingPriority(final T span) { + T rootSpan = span.getLocalRootSpan(); + if (rootSpan instanceof DDSpan) { + DDSpan ddRootSpan = (DDSpan) rootSpan; + int traceSource = ddRootSpan.context().getPropagationTags().getTraceSource(); + for (StandaloneProduct product : activeProducts) { + if (ProductTraceSource.isProductMarked(traceSource, product.traceSourceBit)) { + log.debug("Set SAMPLER_KEEP for {} span {}", product.name(), span.getSpanId()); + span.setSamplingPriority(SAMPLER_KEEP, product.samplingMechanism); + return; + } + } + } + // APM-only trace: rate-limit for billing if required, otherwise drop. + if (needsBillingTrace) { + if (shouldSample()) { + log.debug("Set SAMPLER_KEEP for billing APM span {}", span.getSpanId()); + span.setSamplingPriority(SAMPLER_KEEP, billingMechanism); + } else { + log.debug("Set SAMPLER_DROP for APM span {}", span.getSpanId()); + span.setSamplingPriority(SAMPLER_DROP, billingMechanism); + } + } else { + log.debug("Set SAMPLER_DROP for APM-only span {}", span.getSpanId()); + span.setSamplingPriority(SAMPLER_DROP, SamplingMechanism.DEFAULT); + } + } + + private boolean shouldSample() { + long now = clock.millis(); + return lastSampleTime.updateAndGet( + lastTime -> now - lastTime >= RATE_IN_MILLISECONDS ? now : lastTime) + == now; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java index 777fc3889bf..fabe6b1bfdd 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/TraceCollector.java @@ -65,10 +65,10 @@ public void setSamplingPriorityIfNecessary() { DDSpan rootSpan = getRootSpan(); if (traceConfig.sampler instanceof PrioritySampler && rootSpan != null) { // Ignore the force-keep priority in the absence of propagated _dd.p.ts span tag marked for - // ASM. + // any standalone product (ASM, LLMOBS, …). if ((!Config.get().isApmTracingEnabled() - && !ProductTraceSource.isProductMarked( - rootSpan.context().getPropagationTags().getTraceSource(), ProductTraceSource.ASM)) + && !ProductTraceSource.isAnyStandaloneProductMarked( + rootSpan.context().getPropagationTags().getTraceSource())) || rootSpan.context().getSamplingPriority() == PrioritySampling.UNSET) { ((PrioritySampler) traceConfig.sampler).setSamplingPriority(rootSpan); } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/AsmStandaloneSamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/AsmStandaloneSamplerTest.groovy deleted file mode 100644 index c97d6f47013..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/AsmStandaloneSamplerTest.groovy +++ /dev/null @@ -1,58 +0,0 @@ -package datadog.trace.common.sampling - -import datadog.trace.common.writer.ListWriter -import datadog.trace.core.test.DDCoreSpecification -import datadog.trace.api.sampling.PrioritySampling - -import java.time.Clock -import java.util.concurrent.atomic.AtomicLong - -class AsmStandaloneSamplerTest extends DDCoreSpecification{ - - def writer = new ListWriter() - - void "test setSamplingPriority"(){ - setup: - def current = new AtomicLong(System.currentTimeMillis()) - final Clock clock = Mock(Clock) { - millis() >> { - current.get() - } - } - def sampler = new AsmStandaloneSampler(clock) - def tracer = tracerBuilder().writer(writer).sampler(sampler).build() - - when: - def span1 = tracer.buildSpan("test").start() - sampler.setSamplingPriority(span1) - - then: - 1 * clock.millis() >> { - current.updateAndGet(value -> value + 1000) - } // increment in one second - span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP - - when: - def span2 = tracer.buildSpan("test2").start() - sampler.setSamplingPriority(span2) - - then: - 1 * clock.millis() >> { - current.updateAndGet(value -> value + 1000) - } // increment in one second - span2.getSamplingPriority() == PrioritySampling.SAMPLER_DROP - - when: - def span3 = tracer.buildSpan("test3").start() - sampler.setSamplingPriority(span3) - - then: "Mock one minute later" - clock.millis() >> { - current.updateAndGet(value -> value + 60000) - } // increment in one minute - span3.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP - - cleanup: - tracer.close() - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/SamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/SamplerTest.groovy index 706f45ea43f..397ab2cf3fd 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/SamplerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/SamplerTest.groovy @@ -2,68 +2,34 @@ package datadog.trace.common.sampling import datadog.trace.api.Config import datadog.trace.test.util.DDSpecification +import spock.lang.Unroll -class SamplerTest extends DDSpecification{ +class SamplerTest extends DDSpecification { - void "test that AsmStandaloneSampler is selected when apm tracing disabled and appsec enabled is enabled"() { + @Unroll + def 'sampler selection: apmEnabled=#apmEnabled llmobs=#llmobs appsec=#appsec iast=#iast sca=#sca → #expectedType.simpleName with activeProducts=#expectedProducts'() { setup: - System.setProperty("dd.apm.tracing.enabled", "false") - System.setProperty("dd.appsec.enabled", "true") - Config config = new Config() + if (!apmEnabled) injectSysConfig("dd.apm.tracing.enabled", "false") + if (llmobs) injectSysConfig("dd.llmobs.enabled", "true") + if (appsec) injectSysConfig("dd.appsec.enabled", "true") + if (iast) injectSysConfig("dd.iast.enabled", "true") + if (sca) injectSysConfig("dd.appsec.sca.enabled", "true") when: - Sampler sampler = Sampler.Builder.forConfig(config, null) + Sampler sampler = Sampler.Builder.forConfig(Config.get(), null) then: - sampler instanceof AsmStandaloneSampler - } - - void "test that AsmStandaloneSampler is selected when apm tracing disabled and iast enabled is enabled"() { - setup: - System.setProperty("dd.apm.tracing.enabled", "false") - System.setProperty("dd.iast.enabled", "true") - Config config = new Config() - - when: - Sampler sampler = Sampler.Builder.forConfig(config, null) - - then: - sampler instanceof AsmStandaloneSampler - } - - void "test that AsmStandaloneSampler is selected when apm tracing disabled and sca enabled is enabled"() { - setup: - System.setProperty("dd.apm.tracing.enabled", "false") - System.setProperty("dd.appsec.sca.enabled", "true") - Config config = new Config() - - when: - Sampler sampler = Sampler.Builder.forConfig(config, null) - - then: - sampler instanceof AsmStandaloneSampler - } - - void "test that AsmStandaloneSampler is not selected when apm tracing and asm not enabled"() { - setup: - System.setProperty("dd.apm.tracing.enabled", "false") - Config config = new Config() - - when: - Sampler sampler = Sampler.Builder.forConfig(config, null) - - then: - !(sampler instanceof AsmStandaloneSampler) - } - - void "test that AsmStandaloneSampler is not selected when apm tracing enabled and asm not enabled"() { - setup: - Config config = new Config() - - when: - Sampler sampler = Sampler.Builder.forConfig(config, null) - - then: - !(sampler instanceof AsmStandaloneSampler) + expectedType.isInstance(sampler) + expectedProducts == null || (sampler as StandaloneSampler).getActiveProducts() == expectedProducts + + where: + apmEnabled | llmobs | appsec | iast | sca || expectedType | expectedProducts + true | false | false | false | false || RateByServiceTraceSampler | null + false | true | false | false | false || StandaloneSampler | [StandaloneProduct.LLMOBS] + false | false | true | false | false || StandaloneSampler | [StandaloneProduct.ASM] + false | false | false | true | false || StandaloneSampler | [StandaloneProduct.ASM] + false | false | false | false | true || StandaloneSampler | [StandaloneProduct.ASM] + false | true | true | false | false || StandaloneSampler | [StandaloneProduct.LLMOBS, StandaloneProduct.ASM] + false | false | false | false | false || ForcePrioritySampler | null } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/StandaloneSamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/StandaloneSamplerTest.groovy new file mode 100644 index 00000000000..3c3c102ba1a --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/StandaloneSamplerTest.groovy @@ -0,0 +1,240 @@ +package datadog.trace.common.sampling + +import datadog.trace.api.ProductTraceSource +import datadog.trace.api.sampling.PrioritySampling +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter +import datadog.trace.core.test.DDCoreSpecification + +import java.time.Clock +import java.util.concurrent.atomic.AtomicLong + +class StandaloneSamplerTest extends DDCoreSpecification { + + def writer = new ListWriter() + + void "LLMOBS only: LLMOBS-marked spans are kept with DEFAULT mechanism"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "llm-call").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS) + sampler.setSamplingPriority(span) + scope.close() + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-0" + + cleanup: + tracer.close() + } + + void "LLMOBS only: APM-only spans are dropped with DEFAULT mechanism"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "http-request").start() + sampler.setSamplingPriority(span) + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP + !span.context().getPropagationTags().createTagMap().containsKey("_dd.p.dm") + + cleanup: + tracer.close() + } + + void "ASM only: ASM-marked spans are kept with APPSEC mechanism"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.ASM], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "http-request").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM) + sampler.setSamplingPriority(span) + scope.close() + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-5" + + cleanup: + tracer.close() + } + + void "ASM only: APM-only spans are rate-limited to 1 per minute with APPSEC mechanism"() { + setup: + def current = new AtomicLong(System.currentTimeMillis()) + final Clock clock = Mock(Clock) { + millis() >> { + current.get() + } + } + def sampler = new StandaloneSampler([StandaloneProduct.ASM], clock) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: "first APM span — kept for billing" + def span1 = tracer.buildSpan("testInstrumentation", "apm-request").start() + sampler.setSamplingPriority(span1) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 1000) + } + span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span1.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-5" + + when: "second APM span within the same minute — dropped" + def span2 = tracer.buildSpan("testInstrumentation", "apm-request2").start() + sampler.setSamplingPriority(span2) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 1000) + } + span2.getSamplingPriority() == PrioritySampling.SAMPLER_DROP + !span2.context().getPropagationTags().createTagMap().containsKey("_dd.p.dm") + + when: "third APM span after 1 minute — kept again" + def span3 = tracer.buildSpan("testInstrumentation", "apm-request3").start() + sampler.setSamplingPriority(span3) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 60000) + } + span3.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span3.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-5" + + cleanup: + tracer.close() + } + + void "LLMOBS+ASM: LLMOBS-marked spans are kept with DEFAULT mechanism"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS, StandaloneProduct.ASM], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "llm-call").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS) + sampler.setSamplingPriority(span) + scope.close() + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-0" + + cleanup: + tracer.close() + } + + void "LLMOBS+ASM: ASM-marked spans are kept with APPSEC mechanism"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS, StandaloneProduct.ASM], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "http-request").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM) + sampler.setSamplingPriority(span) + scope.close() + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-5" + + cleanup: + tracer.close() + } + + void "LLMOBS+ASM: span with both LLMOBS and ASM bits set is kept with DEFAULT mechanism (LLMOBS wins)"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS, StandaloneProduct.ASM], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "waf-llm-request").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS | ProductTraceSource.ASM) + sampler.setSamplingPriority(span) + scope.close() + + then: + span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + span.context().getPropagationTags().createTagMap().get("_dd.p.dm") == "-0" + + cleanup: + tracer.close() + } + + void "LLMOBS+ASM: APM-only spans are rate-limited with APPSEC mechanism"() { + setup: + def current = new AtomicLong(System.currentTimeMillis()) + final Clock clock = Mock(Clock) { + millis() >> { + current.get() + } + } + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS, StandaloneProduct.ASM], clock) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: "first APM span" + def span1 = tracer.buildSpan("testInstrumentation", "apm-request").start() + sampler.setSamplingPriority(span1) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 1000) + } + span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + + when: "second APM span within the same minute" + def span2 = tracer.buildSpan("testInstrumentation", "apm-request2").start() + sampler.setSamplingPriority(span2) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 1000) + } + span2.getSamplingPriority() == PrioritySampling.SAMPLER_DROP + + when: "third APM span after 1 minute" + def span3 = tracer.buildSpan("testInstrumentation", "apm-request3").start() + sampler.setSamplingPriority(span3) + + then: + 1 * clock.millis() >> { + current.updateAndGet(v -> v + 60000) + } + span3.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP + + cleanup: + tracer.close() + } + + void "sample() always returns true"() { + setup: + def sampler = new StandaloneSampler([StandaloneProduct.LLMOBS], Clock.systemUTC()) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "test").start() + + then: + sampler.sample(span) == true + + cleanup: + tracer.close() + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/TraceCollectorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/TraceCollectorTest.groovy new file mode 100644 index 00000000000..12b44329779 --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/TraceCollectorTest.groovy @@ -0,0 +1,40 @@ +package datadog.trace.core + +import datadog.trace.api.ProductTraceSource +import datadog.trace.api.sampling.PrioritySampling +import datadog.trace.api.sampling.SamplingMechanism +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.sampling.StandaloneProduct +import datadog.trace.common.sampling.StandaloneSampler +import datadog.trace.common.writer.ListWriter +import datadog.trace.core.test.DDCoreSpecification + +import java.time.Clock + +class TraceCollectorTest extends DDCoreSpecification { + + def writer = new ListWriter() + + void "setSamplingPriorityIfNecessary: sampler is skipped when APM disabled, standalone product flag set, and priority already non-UNSET"() { + setup: + injectSysConfig("dd.apm.tracing.enabled", "false") + def sampler = Spy(StandaloneSampler, constructorArgs: [[StandaloneProduct.LLMOBS], Clock.systemUTC()]) + def tracer = tracerBuilder().writer(writer).sampler(sampler).build() + + when: + def span = tracer.buildSpan("testInstrumentation", "llm-request").start() + def scope = tracer.activateSpan(span) + tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS) + span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.MANUAL) + scope.close() + span.finish() + writer.waitForTraces(1) + + then: + 0 * sampler.setSamplingPriority(_) + span.getSamplingPriority() == PrioritySampling.USER_KEEP + + cleanup: + tracer.close() + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/ProductTraceSource.java b/internal-api/src/main/java/datadog/trace/api/ProductTraceSource.java index 792829dde3a..ad6afb1a0ff 100644 --- a/internal-api/src/main/java/datadog/trace/api/ProductTraceSource.java +++ b/internal-api/src/main/java/datadog/trace/api/ProductTraceSource.java @@ -22,6 +22,10 @@ public class ProductTraceSource { public static final int DSM = 0x04; public static final int DJM = 0x08; public static final int DBM = 0x10; + public static final int LLMOBS = 0x20; + + /** Bitmask of all products that can operate in standalone mode (without APM tracing). */ + private static final int STANDALONE_PRODUCTS_MASK = ASM | LLMOBS; /** Updates the bitfield by setting the bit corresponding to a specific product. */ public static int updateProduct(int bitfield, int product) { @@ -33,6 +37,11 @@ public static boolean isProductMarked(final int bitfield, int product) { return (bitfield & product) != 0; // Check if the bit is set } + /** Returns true if the bitfield contains a mark for any standalone product. */ + public static boolean isAnyStandaloneProductMarked(final int bitfield) { + return (bitfield & STANDALONE_PRODUCTS_MASK) != 0; + } + /** * Converts the current bitfield to a two-character hexadecimal string. * diff --git a/internal-api/src/test/groovy/datadog/trace/api/ProductTraceSourceTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/ProductTraceSourceTest.groovy index bc376154217..a01f7bd58b9 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/ProductTraceSourceTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/ProductTraceSourceTest.groovy @@ -30,6 +30,21 @@ class ProductTraceSourceTest extends DDSpecification { ProductTraceSource.DSM | ProductTraceSource.ASM | false } + void 'test isAnyStandaloneProductMarked'(){ + when: + final result = ProductTraceSource.isAnyStandaloneProductMarked(value) + + then: + result == expected + + where: + value | expected + ProductTraceSource.UNSET | false + ProductTraceSource.APM | false + ProductTraceSource.ASM | true + ProductTraceSource.LLMOBS | true + } + void 'test getBitfieldHex'(){ when: final result = ProductTraceSource.getBitfieldHex(value)