Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,43 @@
package reporter

import (
"github.com/parca-dev/oomprof/oomprof"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/otel/log"
)

// ParcaReporter is the parca-agent reporter API: it accepts profile trace
// events (via the embedded TraceReporter) and hands out OTel logs Loggers for
// any in-process producer that wants to ship records via the agent's shared
// OTLP/gRPC connection.
// ParcaReporter is the parca-agent superset of the otel ebpf-profiler
// reporter interfaces. It carries everything `main.go` needs to drive the
// agent end-to-end:
//
// Consumers that only need to publish logs (e.g. the probes BPF service, or
// the logrus -> OTLP hook) should depend on this interface rather than the
// concrete implementation, so they remain independent of profile-side code.
// - reporter.Reporter (ReportTraceEvent + Start + Stop)
// - reporter.ExecutableReporter (ReportExecutable)
// - ReportMetrics (for the otel-side metrics fan-in)
// - ReportMemoryTraces (the dedicated path for oomprof and any
// future memory-attributed-trace producer)
// - Logger (OTel logs for in-process producers)
//
// `ReportMemoryTraces` exists so memory profiles don't have to ride on the
// TraceReporter contract. Callers pass a per-process batch of
// stacktrace+counter samples and the implementation writes the
// inuse/alloc rows directly. The signature mirrors the upstream
// oomprof.Reporter contract because oomprof is the only producer today;
// if a non-oomprof producer ever appears we can extract a parca-agent-
// local type then.
type ParcaReporter interface {
reporter.TraceReporter
reporter.Reporter
reporter.ExecutableReporter

// ReportMetrics fans otel-side metric updates back into the agent's
// prometheus registry.
ReportMetrics(timestamp uint32, ids []uint32, values []int64)

// ReportMemoryTraces emits one or more memory-attributed traces for a
// single process snapshot. The samples slice carries the stacks plus
// alloc/free counters; meta carries the per-process attribution
// (PID, comm, executable path, build ID). Implementations should
// hold their writer lock at most once per call.
ReportMemoryTraces(samples []oomprof.Sample, meta oomprof.SampleMeta) error

// Logger returns an OTel logs Logger bound to the given instrumentation
// scope name. Callers should use distinct scope names per producer (e.g.
Expand Down
46 changes: 46 additions & 0 deletions reporter/oomprof_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Apache License 2.0.
* See the file "LICENSE" for details.
*/

package reporter

import "github.com/parca-dev/oomprof/oomprof"

// oomprofBatchSize bounds how many samples we ship to the reporter in a
// single ReportMemoryTraces call. Memory traces share a writer lock with
// every other v2 sample row, so we want each lock acquisition to do a
// bounded amount of work. oomprof can deliver large bursts when a process
// is rapidly allocating.
const oomprofBatchSize = 100

// oomprofAdapter implements oomprof.Reporter on top of a ParcaReporter,
// chunking the incoming sample batch and forwarding each chunk through
// ReportMemoryTraces. It holds the ParcaReporter interface (not the
// concrete impl) so tests and future producers can stub it.
type oomprofAdapter struct {
rep ParcaReporter
}

func newOOMProfAdapter(r ParcaReporter) *oomprofAdapter {
return &oomprofAdapter{rep: r}
}

// SampleEvents satisfies oomprof.Reporter. Each chunk of at most
// oomprofBatchSize samples is handed to the reporter as a single
// ReportMemoryTraces call.
func (a *oomprofAdapter) SampleEvents(
samples []oomprof.Sample, meta oomprof.SampleMeta,
) error {
for i := 0; i < len(samples); i += oomprofBatchSize {
end := i + oomprofBatchSize
if end > len(samples) {
end = len(samples)
}
if err := a.rep.ReportMemoryTraces(samples[i:end], meta); err != nil {
return err
}
}
return nil
}
201 changes: 97 additions & 104 deletions reporter/parca_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ import (
"github.com/parca-dev/parca-agent/reporter/metadata"
)

// Assert that we implement the full Reporter interface.
var _ reporter.Reporter = (*arrowReporter)(nil)
// Assert that we implement the full ParcaReporter interface (which itself
// embeds otel's reporter.Reporter, so the otel contract is covered too).
var _ ParcaReporter = (*arrowReporter)(nil)

// GPU sample reporting. PC samples are reported as a raw sample count
// (gpu_pcsample/count); the per-sample weight — NsPerSample = 2^SamplingFactor
Expand Down Expand Up @@ -110,10 +111,10 @@ type labelRetrievalResult struct {
keep bool
}

// use 0xFF to represent a special frame type for oomprof memory samples.
const oomprofMemoryFrame libpf.FrameType = 0xFF

// arrowReporter receives and transforms information to be OTLP/profiles compliant.
// arrowReporter is the concrete arrow-row builder behind the
// ParcaReporter interface. It transforms otel trace events and
// parca-agent-specific memory traces into OTLP/profiles-compliant arrow
// records and ships them to the configured backend.
type arrowReporter struct {
// client for the connection to the receiver.
client profilestoregrpc.ProfileStoreServiceClient
Expand Down Expand Up @@ -285,6 +286,10 @@ func maybeFixTruncation(s string, maxLen int) (string, bool) {
}

// ReportTraceEvent enqueues reported trace events for the OTLP reporter.
//
// Memory-origin traces do not flow through this method — they take the
// dedicated ReportMemoryTraces path so a memory batch can hold the writer
// lock once for many rows.
func (r *arrowReporter) ReportTraceEvent(trace *libpf.Trace,
meta *samples.TraceEventMeta,
) error {
Expand Down Expand Up @@ -361,34 +366,6 @@ func (r *arrowReporter) ReportTraceEvent(trace *libpf.Trace,
writeSample(meta.Value, int64(time.Second.Nanoseconds()), 1e9/int64(r.samplesPerSecond), "parca_agent", "wallclock", "nanoseconds", "samples", "count")
r.sampleWriter.Temporality.AppendString("delta")
r.offcpuSamples.Inc()
case support.TraceOriginMemory:
mod, ok := meta.OriginData.(*oomprof.Sample)
if !ok {
log.Warnf("memory trace event missing OriginData (got %T)", meta.OriginData)
break
}
// TODO: this isn't necessarily correct and should be extracted from the Go process somehow.
memPeriod := int64(512 * 1024) // 512 KiB
// Write 4 memory samples
// 1. inuse_objects (Allocs - Frees)
if mod.Allocs != mod.Frees {
r.sampleWriter.Temporality.AppendNull()
writeSample(int64(mod.Allocs-mod.Frees), 0, memPeriod, "memory", "inuse_objects", "count", "space", "bytes")
}
// 2. inuse_space (AllocBytes - FreeBytes)
if mod.AllocBytes != mod.FreeBytes {
r.sampleWriter.Temporality.AppendNull()
writeSample(int64(mod.AllocBytes-mod.FreeBytes), 0, memPeriod, "memory", "inuse_space", "bytes", "space", "bytes")
}
if r.reportAllocs {
// 3. alloc_objects
r.sampleWriter.Temporality.AppendNull()
writeSample(int64(mod.Allocs), 0, memPeriod, "memory", "alloc_objects", "count", "space", "bytes")
// 4. alloc_space
r.sampleWriter.Temporality.AppendNull()
writeSample(int64(mod.AllocBytes), 0, memPeriod, "memory", "alloc_space", "bytes", "space", "bytes")
}
r.memorySamples.Inc()
case support.TraceOriginCuda:
if r.mergeGpuProfiles {
r.sampleWriter.Label("gpu_view").AppendString("kernel_time")
Expand Down Expand Up @@ -423,9 +400,12 @@ func (r *arrowReporter) ReportTraceEvent(trace *libpf.Trace,
return nil
}

// reportTraceEventV2 handles trace events using the v2 schema with inline stacktraces.
// reportTraceEventV2 handles trace events using the v2 schema with inline
// stacktraces. Memory-origin traces do not pass through this method —
// they are written by ReportMemoryTraces.
func (r *arrowReporter) reportTraceEventV2(trace *libpf.Trace, traceHash libpf.TraceHash,
meta *samples.TraceEventMeta, labelResult labelRetrievalResult) error {
meta *samples.TraceEventMeta, labelResult labelRetrievalResult,
) error {

r.sampleWriterV2Mu.Lock()
defer r.sampleWriterV2Mu.Unlock()
Expand All @@ -437,25 +417,6 @@ func (r *arrowReporter) reportTraceEventV2(trace *libpf.Trace, traceHash libpf.T
case support.TraceOriginOffCPU:
r.writeSampleV2(trace, traceHash, meta, labelResult, meta.Value, uint64(time.Second.Nanoseconds()), 0, true, "parca_agent", "wallclock", "nanoseconds", "samples", "count")
r.offcpuSamples.Inc()
case support.TraceOriginMemory:
mod, ok := meta.OriginData.(*oomprof.Sample)
if !ok {
log.Warnf("memory trace event missing OriginData (got %T)", meta.OriginData)
break
}
log.Infof("Received memory trace event for TID %d, PID %d, comm %s", meta.TID, meta.PID, meta.Comm)
memPeriod := int64(512 * 1024) // 512 KiB
if mod.Allocs != mod.Frees {
r.writeSampleV2(trace, traceHash, meta, labelResult, int64(mod.Allocs-mod.Frees), 0, memPeriod, false, "memory", "inuse_objects", "count", "space", "bytes")
}
if mod.AllocBytes != mod.FreeBytes {
r.writeSampleV2(trace, traceHash, meta, labelResult, int64(mod.AllocBytes-mod.FreeBytes), 0, memPeriod, false, "memory", "inuse_space", "bytes", "space", "bytes")
}
if r.reportAllocs {
r.writeSampleV2(trace, traceHash, meta, labelResult, int64(mod.Allocs), 0, memPeriod, false, "memory", "alloc_objects", "count", "space", "bytes")
r.writeSampleV2(trace, traceHash, meta, labelResult, int64(mod.AllocBytes), 0, memPeriod, false, "memory", "alloc_space", "bytes", "space", "bytes")
}
r.memorySamples.Inc()
case support.TraceOriginCuda:
if r.mergeGpuProfiles {
r.sampleWriterV2.Label("gpu_view").AppendString("kernel_time")
Expand Down Expand Up @@ -667,12 +628,6 @@ func (r *arrowReporter) appendLocationV2(frame libpf.Frame) uint32 {
StartLine: 0,
}))

case oomprofMemoryFrame:
b.locFrameType.AppendString(libpf.NativeFrame.String())
b.locMappingFile.AppendString(frame.SourceFile.String())
b.locMappingID.AppendString(frame.FunctionName.String())
// No lines for oomprof frames

default:
// Interpreted frames (Python, Ruby, V8 etc.)
// Forward the Mapping's GnuBuildID when present so the
Expand Down Expand Up @@ -886,55 +841,100 @@ func (r *arrowReporter) ReportHostMetadataBlocking(_ context.Context,
return nil
}

// SampleEvents implements the oomprof.Reporter interface.
// It converts oomprof samples to trace events and reports them.
func (r *arrowReporter) SampleEvents(oomprofSamples []oomprof.Sample, meta oomprof.SampleMeta) error {
log.Debugf("Received %d oomprof samples for PID %d, comm: %s", len(oomprofSamples), meta.PID, meta.Comm)
// memorySamplePeriod is the assumed inter-allocation period used as the
// pprof "period" for memory rows. 512 KiB matches the previous behavior;
// long term this should be derived from the target process.
const memorySamplePeriod int64 = 512 * 1024

// The process is potentially gone and if it was in a container the path maybe gone too so
// we have to make do with the BuildID/Comm/ExecutablePath in the SampleMeta. So we're gonna
// hack it and put in a special frame type that stashes the BuildID instead of relying
// on the usual FileID mechanism.
// ReportMemoryTraces emits inuse / alloc rows for a batch of memory-
// attributed traces. All samples share `meta` (one process snapshot), so
// labels are computed once and the v2 writer lock is taken once for the
// whole call.
//
// The agent v2 schema is the only target — memory profiles never went
// through the v1 path in production. The trace's call stack is encoded as
// a libpf.Trace of native frames with the build ID stashed on
// FunctionName so the location builder can synthesize a mapping for the
// possibly-gone process.
func (r *arrowReporter) ReportMemoryTraces(
memSamples []oomprof.Sample, meta oomprof.SampleMeta,
) error {
if !r.useV2Schema {
// v1 never carried memory profiles end-to-end; drop loudly so
// misconfigurations are obvious.
return fmt.Errorf("ReportMemoryTraces requires the v2 schema; v1 memory reporting is unsupported")
}
if len(memSamples) == 0 {
return nil
}
log.Debugf("Received %d oomprof samples for PID %d, comm: %s", len(memSamples), meta.PID, meta.Comm)

pid := libpf.PID(meta.PID)
comm := libpf.Intern(meta.Comm)
labelResult := r.labelsForTID(pid, pid, comm, 0, support.TraceOriginUnknown, nil)
if !labelResult.keep {
r.skippedByRelabeling.Inc()
log.Debugf("Skipping %d memory traces for PID %d, filtered by relabeling", len(memSamples), meta.PID)
return nil
}

// Intern the per-process attributes once for the whole batch.
buildID := libpf.Intern(meta.BuildID)
execPath := libpf.Intern(meta.ExecutablePath)
var customLabels map[libpf.String]libpf.String
if len(meta.CustomLabels) > 0 {
customLabels = make(map[libpf.String]libpf.String, len(meta.CustomLabels))
for k, v := range meta.CustomLabels {
customLabels[libpf.Intern(k)] = libpf.Intern(v)
}
}

// Create trace event metadata
traceEventMeta := &samples.TraceEventMeta{
Timestamp: libpf.UnixTime64(meta.Timestamp),
Comm: libpf.Intern(meta.Comm),
Origin: support.TraceOriginMemory,
Comm: comm,
Origin: support.TraceOriginUnknown,
ProcessName: libpf.Intern(meta.ProcessName),
ExecutablePath: libpf.Intern(meta.ExecutablePath),
PID: libpf.PID(meta.PID),
TID: libpf.PID(meta.PID), // For oomprof, TID is same as PID
ExecutablePath: execPath,
PID: pid,
TID: pid, // oomprof samples carry the process, not a single TID.
}

for _, sample := range oomprofSamples {
// Create a trace from the oomprof sample
t := &libpf.Trace{}
r.sampleWriterV2Mu.Lock()
defer r.sampleWriterV2Mu.Unlock()

for i := range memSamples {
s := &memSamples[i]

// Convert addresses to frames
for _, addr := range sample.Addresses {
t := &libpf.Trace{CustomLabels: customLabels}
for _, addr := range s.Addresses {
t.Frames.Append(&libpf.Frame{
Type: oomprofMemoryFrame,
Type: libpf.NativeFrame,
AddressOrLineno: libpf.AddressOrLineno(addr),
FunctionName: libpf.Intern(meta.BuildID), // Stash the BuildID here
SourceFile: libpf.Intern(meta.ExecutablePath), // MappingFile
FunctionName: buildID, // Stash the BuildID for the location builder.
SourceFile: execPath, // Stash the executable path.
})
}
traceHash := traceutil.HashTrace(t)

// TODO - should we make oomprof's meta use libpf.String ?
if meta.CustomLabels != nil {
t.CustomLabels = make(map[libpf.String]libpf.String, len(meta.CustomLabels))
for k, v := range meta.CustomLabels {
t.CustomLabels[libpf.Intern(k)] = libpf.Intern(v)
}
if s.Allocs != s.Frees {
r.writeSampleV2(t, traceHash, traceEventMeta, labelResult,
int64(s.Allocs-s.Frees), 0, memorySamplePeriod, false,
"memory", "inuse_objects", "count", "space", "bytes")
}

traceEventMeta.OriginData = &sample

// Report the trace event
if err := r.ReportTraceEvent(t, traceEventMeta); err != nil {
return fmt.Errorf("failed to report oomprof trace event: %w", err)
if s.AllocBytes != s.FreeBytes {
r.writeSampleV2(t, traceHash, traceEventMeta, labelResult,
int64(s.AllocBytes-s.FreeBytes), 0, memorySamplePeriod, false,
"memory", "inuse_space", "bytes", "space", "bytes")
}
if r.reportAllocs {
r.writeSampleV2(t, traceHash, traceEventMeta, labelResult,
int64(s.Allocs), 0, memorySamplePeriod, false,
"memory", "alloc_objects", "count", "space", "bytes")
r.writeSampleV2(t, traceHash, traceEventMeta, labelResult,
int64(s.AllocBytes), 0, memorySamplePeriod, false,
"memory", "alloc_space", "bytes", "space", "bytes")
}
r.memorySamples.Inc()
}
return nil
}
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func New(
useV2Schema bool,
mergeGpuProfiles bool,
grpcConn *grpc.ClientConn,
) (*arrowReporter, error) {
) (ParcaReporter, error) {
if offlineModeConfig != nil && !disableSymbolUpload {
return nil, errors.New("Illogical configuration: offline mode with symbol upload enabled")
}
Expand Down Expand Up @@ -1220,7 +1220,7 @@ func New(
ReportAlloc: enableAllocs,
}

state, err := oomprof.SetupWithReporter(context.TODO(), config, r)
state, err := oomprof.SetupWithReporter(context.TODO(), config, newOOMProfAdapter(r))
if err != nil {
close(r.stopSignal)
return nil, fmt.Errorf("failed to setup oomprof: %w", err)
Expand Down Expand Up @@ -1923,13 +1923,6 @@ func (r *arrowReporter) buildStacktraceRecord(ctx context.Context, stacktraceIDs
w.FunctionFilename.AppendNull()
w.FunctionStartLine.Append(0)

case oomprofMemoryFrame:
// This is a special frame that is used to report OOMProf samples.
w.FrameType.AppendString(libpf.NativeFrame.String())
w.MappingFile.AppendString(frame.SourceFile.String())
w.MappingBuildID.AppendString(frame.FunctionName.String())
w.Lines.Append(false)
isComplete = false
default:
w.FrameType.AppendString(frame.Type.String())

Expand Down
Loading