fix the issue with offsetOutOfRange #485

Open
dethlex wants to merge 2 commits into lovoo:master from dethlex:fix-outside-offsets
@dethlex dethlex commented Mar 10, 2026

Summary
Handle ErrOffsetOutOfRange in goka's partition table recovery by falling back to sarama.OffsetOldest instead of crashing the processor.

Problem

  Production error:
  error consuming %TOPIC%: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition

Goka's findOffsetToLoad() adjusts offsets below the broker's oldest, but a race condition exists: the oldest offset can shift between the metadata fetch and the actual ConsumePartition call (e.g. due to log compaction or retention cleanup), causing ErrOffsetOutOfRange.

Fix

When ConsumePartition returns ErrOffsetOutOfRange, retry with sarama.OffsetOldest (lets the broker resolve the actual earliest offset) instead of failing the partition setup.

@norbertklawikowski norbertklawikowski left a comment

Contributor

Thanks for creating the PR!

Comment thread partition_table.go
partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err)
if errors.Is(err, sarama.ErrOffsetOutOfRange) {

Contributor

Handling of errors other than sarama.ErrOffsetOutOfRange is missing here.

Comment thread partition_table.go

// load messages and stop when you're at HWM
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)
if loadErr != nil && errors.Is(loadErr, sarama.ErrOffsetOutOfRange) {

Contributor

Is it possible for an ErrOffsetOutOfRange to be returned on the Errors channel of the consumer inside loadMessages? It seems it can only be returned by chooseStartingOffset, which is called from ConsumePartition, i.e. while constructing a PartitionConsumer.

Handling it here would mean the error can occur while consuming from a PartitionConsumer, and in that case retrying only once does not seem sufficient. But I cannot see how this could happen at all.
