feat(bigtable): add ClientConfigurationManager + EnableSession config#19986
feat(bigtable): add ClientConfigurationManager + EnableSession config#19986sushanb wants to merge 1 commit into
Conversation
Introduce the ClientConfigurationManager, which polls the service via GetClientConfiguration on a fixed interval and fans the parsed config out to registered listeners. Includes the manager (Start / Close / listener registration / RPC retry / validity-window fallback) and a test suite covering construction, polling, listener fan-out, retry on transient errors, and Close shutdown semantics. Wire it into Client behind a new ClientConfig.EnableSession flag (off by default). When set, NewClientWithConfig constructs the manager using the data-plane BigtableClient and the instance-scoped request metadata, then starts it; Close stops the manager first so it cannot fire listener callbacks against state that is about to be torn down. Future session-pool PRs will key off the configuration surface this manager exposes. For now it only polls. Part of the series of small PRs replacing googleapis#19980.
There was a problem hiding this comment.
Code Review
This pull request introduces a ClientConfigurationManager to dynamically poll and manage client configurations from the Bigtable control plane. The review feedback identifies several critical issues: the background polling loop is bound to a potentially short-lived context in Start which could prematurely terminate polling; a race condition in addListener can allow stale configurations to overwrite newer ones; an integer overflow in the exponential backoff calculation (1<<i) can cause a runtime panic in rand.Intn; and the fallback mechanism fails to reset the validity window (m.validUntil), resulting in redundant fallback notifications during prolonged outages.
| func (m *ClientConfigurationManager) Start(ctx context.Context) { | ||
| btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId) | ||
| // We need a context for the initial poll. | ||
| m.pollsWG.Add(1) | ||
| go func() { | ||
| defer m.pollsWG.Done() | ||
| pollCtx, cancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer cancel() | ||
| m.poll(pollCtx) | ||
| }() | ||
|
|
||
| // Start background polling | ||
| go m.pollingLoop(ctx) | ||
| } | ||
|
|
There was a problem hiding this comment.
The background polling loop and the initial poll are currently bound to the lifecycle of the context passed to Start(ctx). In typical usage, the context passed to client initialization (NewClientWithConfig) is short-lived and canceled shortly after client creation. If this context is canceled, the background polling loop will terminate immediately, and the initial poll may be cut short.
To ensure the configuration manager continues to poll for the lifetime of the client, the background polling loop should be run with context.Background(). Similarly, the initial poll should use context.Background() with a timeout to avoid premature cancellation.
| func (m *ClientConfigurationManager) Start(ctx context.Context) { | |
| btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId) | |
| // We need a context for the initial poll. | |
| m.pollsWG.Add(1) | |
| go func() { | |
| defer m.pollsWG.Done() | |
| pollCtx, cancel := context.WithTimeout(ctx, 5*time.Second) | |
| defer cancel() | |
| m.poll(pollCtx) | |
| }() | |
| // Start background polling | |
| go m.pollingLoop(ctx) | |
| } | |
| // Start begins the polling process. | |
| func (m *ClientConfigurationManager) Start(ctx context.Context) { | |
| btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId) | |
| // We need a context for the initial poll. | |
| m.pollsWG.Add(1) | |
| go func() { | |
| defer m.pollsWG.Done() | |
| pollCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancel() | |
| m.poll(pollCtx) | |
| }() | |
| // Start background polling | |
| go m.pollingLoop(context.Background()) | |
| } |
There was a problem hiding this comment.
wrong. ctx is from Newclient so it is long lived.
| func (m *ClientConfigurationManager) addListener(listener configListener) func() { | ||
| m.mu.Lock() | ||
| id := m.nextListenerID | ||
| m.nextListenerID++ | ||
| if m.listeners == nil { | ||
| m.listeners = make(map[int]configListener) | ||
| } | ||
| m.listeners[id] = listener | ||
|
|
||
| cfg := m.currentConfig.Clone() | ||
| seq := m.configSeq | ||
| m.mu.Unlock() | ||
|
|
||
| btopt.Debugf(m.logger, "bigtable: adding configuration listener (id: %d)", id) | ||
| listener(cfg, seq) | ||
|
|
||
| return func() { | ||
| m.mu.Lock() | ||
| delete(m.listeners, id) | ||
| m.mu.Unlock() | ||
| } | ||
| } |
There was a problem hiding this comment.
There is a subtle race condition in addListener where a concurrent poll can complete and notify the newly added listener with a newer configuration sequence number before the initial notification listener(cfg, seq) is executed. This results in the listener receiving the older configuration sequence after the newer one, causing it to silently overwrite the newer configuration with the stale one.
While the test TestManagerNotifyListeners_Race implements a sequence check to handle this, the actual production listeners registered via AddSessionPoolListener and AddSessionLoadListener do not have any sequence checks and are vulnerable to this race.
To prevent this, we can wrap the listener in addListener to enforce monotonic sequence numbers using atomic.Int64.
func (m *ClientConfigurationManager) addListener(listener configListener) func() {
m.mu.Lock()
id := m.nextListenerID
m.nextListenerID++
var lastSeq atomic.Int64
lastSeq.Store(-1)
wrappedListener := func(cfg clientConfig, seq int64) {
for {
current := lastSeq.Load()
if seq <= current {
return
}
if lastSeq.CompareAndSwap(current, seq) {
listener(cfg, seq)
return
}
}
}
if m.listeners == nil {
m.listeners = make(map[int]configListener)
}
m.listeners[id] = wrappedListener
cfg := m.currentConfig.Clone()
seq := m.configSeq
m.mu.Unlock()
btopt.Debugf(m.logger, "bigtable: adding configuration listener (id: %d)", id)
wrappedListener(cfg, seq)
return func() {
m.mu.Lock()
delete(m.listeners, id)
m.mu.Unlock()
}
}| if i < maxRetries { | ||
| delay := time.Duration(rand.Intn(1<<i)) * time.Second | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-time.After(delay): | ||
| } | ||
| } |
There was a problem hiding this comment.
If maxRetries is configured to a large value (or if the server returns an extreme value), 1<<i can overflow the signed integer type, resulting in a negative value or zero. Passing a non-positive value to rand.Intn will cause a runtime panic, crashing the client.
Additionally, without a cap, the backoff delay can grow excessively large (e.g., hours or days). Capping the backoff limit at a reasonable maximum (e.g., 30 seconds) prevents both integer overflow panics and excessively long retry delays.
| if i < maxRetries { | |
| delay := time.Duration(rand.Intn(1<<i)) * time.Second | |
| select { | |
| case <-ctx.Done(): | |
| return | |
| case <-time.After(delay): | |
| } | |
| } | |
| if i < maxRetries { | |
| backoffLimit := 1 << i | |
| if backoffLimit > 30 { | |
| backoffLimit = 30 | |
| } | |
| delay := time.Duration(rand.Intn(backoffLimit)) * time.Second | |
| select { | |
| case <-ctx.Done(): | |
| return | |
| case <-time.After(delay): | |
| } | |
| } |
| // Fall back to default configuration if validity window has expired | ||
| if time.Now().After(m.validUntil) { | ||
| btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config") | ||
| m.currentConfig = m.defaultConfig | ||
| m.configSeq++ | ||
| seq = m.configSeq | ||
| listeners = make([]configListener, 0, len(m.listeners)) | ||
| for _, l := range m.listeners { | ||
| listeners = append(listeners, l) | ||
| } | ||
| cfgToNotify = m.currentConfig | ||
| } |
There was a problem hiding this comment.
When the client configuration validity window expires and the poll fails, the manager falls back to the default configuration. However, because m.validUntil is not reset or updated during this fallback, every subsequent failed poll (which occurs on every polling interval) will repeatedly trigger the fallback block. This results in redundant sequence number increments and unnecessary listener notifications with the same default configuration during a prolonged outage.
Resetting m.validUntil to a far-future time (similar to initialization) upon fallback prevents these redundant notifications.
| // Fall back to default configuration if validity window has expired | |
| if time.Now().After(m.validUntil) { | |
| btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config") | |
| m.currentConfig = m.defaultConfig | |
| m.configSeq++ | |
| seq = m.configSeq | |
| listeners = make([]configListener, 0, len(m.listeners)) | |
| for _, l := range m.listeners { | |
| listeners = append(listeners, l) | |
| } | |
| cfgToNotify = m.currentConfig | |
| } | |
| // Fall back to default configuration if validity window has expired | |
| if time.Now().After(m.validUntil) { | |
| btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config") | |
| m.currentConfig = m.defaultConfig | |
| m.configSeq++ | |
| seq = m.configSeq | |
| m.validUntil = time.Now().Add(time.Hour * 24 * 365 * 100) | |
| listeners = make([]configListener, 0, len(m.listeners)) | |
| for _, l := range m.listeners { | |
| listeners = append(listeners, l) | |
| } | |
| cfgToNotify = m.currentConfig | |
| } |
Summary
ClientConfigurationManagerinbigtable/internal/transport: pollsGetClientConfigurationon a fixed interval, parses the response into a typedclientConfig, and fans changes out to registered listeners. Supports RPC retry, validity-window fallback, andClose()shutdown that waits for in-flight polls.Closesemantics.Clientbehind a newClientConfig.EnableSessionflag (off by default). When set,NewClientWithConfigconstructs the manager using the data-planeBigtableClientand the instance-scoped request metadata, then starts it;Closestops the manager first so it cannot fire listener callbacks against state that is about to be torn down.Future session-pool PRs will key off the configuration surface this manager exposes. For now it only polls.
Part of the series of small PRs replacing #19980.
Test plan
go build ./...inbigtable/go vet ./...inbigtable/go test -short ./internal/transport/... .inbigtable/— all pass