Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/action_kit_sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- fix: prevent data races and panics in the action stop/heartbeat handling — guard the shared `stopEvents` slice with a mutex, make `heartbeat.Monitor.Stop` idempotent, and make `RecordHeartbeat` a non-blocking, closed-safe send, so concurrent stop/status/timeout paths can no longer crash the extension (double-close / send-on-closed-channel / slice race)

## 1.3.1

- Update dependencies
Expand Down
12 changes: 9 additions & 3 deletions go/action_kit_sdk/action_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
registeredActions = make(map[string]interface{})
statePersister = state_persister.NewInmemoryStatePersister()
stopEvents = make([]stopEvent, 0, 10)
stopEventsMu sync.Mutex
heartbeatMonitors = sync.Map{}
)

Expand Down Expand Up @@ -270,14 +271,17 @@ func recordHeartbeat(executionId uuid.UUID) {
}

func stopMonitorHeartbeat(executionId uuid.UUID) {
monitor, _ := heartbeatMonitors.Load(executionId)
if monitor != nil {
// LoadAndDelete so that when two paths stop the same execution concurrently (the HTTP
// stop handler and the heartbeat-timeout goroutine) only one gets the monitor; Stop is
// idempotent regardless.
if monitor, ok := heartbeatMonitors.LoadAndDelete(executionId); ok {
monitor.(*heartbeat.Monitor).Stop()
heartbeatMonitors.Delete(executionId)
}
}

func markAsStopped(executionId uuid.UUID, reason string) {
stopEventsMu.Lock()
defer stopEventsMu.Unlock()
if len(stopEvents) > 100 {
stopEvents = stopEvents[1:]
}
Expand All @@ -289,6 +293,8 @@ func markAsStopped(executionId uuid.UUID, reason string) {
}

func getStopEvent(executionId uuid.UUID) *stopEvent {
stopEventsMu.Lock()
defer stopEventsMu.Unlock()
for _, event := range stopEvents {
if event.executionId == executionId {
return &event
Expand Down
15 changes: 15 additions & 0 deletions go/action_kit_sdk/action_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,25 @@ package action_kit_sdk
import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)

// TestStopEvents_concurrent_access exercises markAsStopped (write) and getStopEvent (read)
// from many goroutines, mirroring the HTTP stop/status handlers, the heartbeat-timeout
// goroutine and the signal handler racing on the shared stopEvents slice. Run under -race.
func TestStopEvents_concurrent_access(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
id := uuid.New()
wg.Add(2)
go func() { defer wg.Done(); markAsStopped(id, "test") }()
go func() { defer wg.Done(); _ = getStopEvent(id) }()
}
wg.Wait()
}

// This test reproduced an issue in which new heartbeats
// would not be processed anymore and led to a stop of the experiment.
func TestHeartbeat_should_not_timeout(t *testing.T) {
Expand Down
26 changes: 24 additions & 2 deletions go/action_kit_sdk/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ package heartbeat

import (
"github.com/rs/zerolog/log"
"sync"
"time"
)

type Monitor struct {
pulse chan time.Time
mu sync.Mutex
pulse chan time.Time
closed bool
}

func Notify(ch chan<- time.Time, interval, timeout time.Duration) *Monitor {
Expand Down Expand Up @@ -71,10 +74,29 @@ func Notify(ch chan<- time.Time, interval, timeout time.Duration) *Monitor {

func (h *Monitor) RecordHeartbeat() {
log.Trace().Msg("received heartbeat")
h.pulse <- time.Now()
h.mu.Lock()
defer h.mu.Unlock()
if h.closed {
return
}
// Non-blocking send: once Stop has closed the channel we must not send (that panics),
// and if the buffer is full the reader only needs to see recent activity — so dropping
// a beat is fine and must never block the caller (an HTTP status handler goroutine).
select {
case h.pulse <- time.Now():
default:
}
}

// Stop is idempotent: concurrent or repeated Stop calls (e.g. the HTTP stop handler and
// the heartbeat-timeout goroutine both stopping the same execution) close the channel once.
func (h *Monitor) Stop() {
h.mu.Lock()
defer h.mu.Unlock()
if h.closed {
return
}
log.Debug().Msg("closing heartbeat channel")
h.closed = true
close(h.pulse)
}
24 changes: 24 additions & 0 deletions go/action_kit_sdk/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,35 @@ package heartbeat

import (
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)

// TestMonitor_concurrent_stop_and_record hammers RecordHeartbeat with concurrent and
// repeated Stop calls: it must never panic (double close / send on closed channel), and a
// RecordHeartbeat after Stop must be a no-op. Run under -race.
func TestMonitor_concurrent_stop_and_record(t *testing.T) {
ch := make(chan time.Time, 1)
hb := Notify(ch, time.Hour, time.Hour)

var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() { defer wg.Done(); hb.RecordHeartbeat() }()
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() { defer wg.Done(); hb.Stop() }()
}
wg.Wait()

// After Stop, further heartbeats are dropped rather than panicking.
hb.RecordHeartbeat()
hb.Stop()
}

func TestHeartbeat_should_timeout(t *testing.T) {
ch := make(chan time.Time)
hb := Notify(ch, 300*time.Millisecond, 150*time.Millisecond)
Expand Down
Loading