Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.util.Random
import scala.util.control.Breaks
import scala.concurrent.{Await, ExecutionContext, Future}
import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, OperationCancelledException, SparkBridgeImplementationInternal}
import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, OperationCancelledException, SparkBridgeImplementationInternal, Strings}


// scalastyle:off underscore.import
Expand Down Expand Up @@ -177,11 +177,50 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
None
}
} else {
validateEndLsnReachedOrFail()
Some(false)
}
}
}

/**
 * Defensive guard for bounded change feed reads (`endLsn` defined).
 *
 * Called when the underlying paginator signals that there are no more pages. It validates
 * that the latest continuation token has actually advanced to `endLsn` for every
 * sub-feed-range. If it has not, the SDK terminated the change feed read prematurely
 * (see issue #49380) and silently completing would surface as missing rows downstream.
 * Failing the task with `IllegalStateException` lets Spark retry/abort instead of
 * returning a truncated result.
 *
 * When no `endLsn` is configured this method is a no-op.
 *
 * @throws IllegalStateException if `endLsn` is defined and either
 *                               (a) no continuation token has been recorded at all,
 *                               (b) the recorded continuation contains no sub-range tokens, or
 *                               (c) the lowest sub-range LSN in the recorded continuation is
 *                               still below `endLsn`.
 */
private[this] def validateEndLsnReachedOrFail(): Unit = {
  this.endLsn match {
    case None => // no bound configured (e.g. batch mode draining to 304s) - nothing to validate
    case Some(boundLsn) =>
      val continuation = lastContinuationToken.get()

      // (a) The paginator completed without ever handing back a continuation.
      if (Strings.isNullOrWhiteSpace(continuation)) {
        throw new IllegalStateException(
          "Bounded change feed read terminated before any page was returned. " +
            s"Expected to reach endLsn=$boundLsn but no continuation was produced. " +
            s"Context: $operationContextString")
      }

      val tokens = SparkBridgeImplementationInternal
        .extractContinuationTokensFromChangeFeedStateJson(continuation)

      // (b) Must be checked before minBy below, which would fail on an empty collection
      // with a far less helpful error.
      if (tokens.isEmpty) {
        throw new IllegalStateException(
          "Bounded change feed read terminated with a continuation that has no sub-range tokens. " +
            s"Expected to reach endLsn=$boundLsn. Continuation=$continuation. " +
            s"Context: $operationContextString")
      }

      // (c) The slowest sub-range decides: every sub-range has to have reached the bound.
      val minLsn = tokens.minBy(_._2)._2
      if (minLsn < boundLsn) {
        throw new IllegalStateException(
          s"Bounded change feed read terminated before reaching endLsn=$boundLsn. " +
            s"Lowest sub-range LSN in latest continuation=$minLsn. " +
            s"Continuation=$continuation. Context: $operationContextString")
      }
  }
}

private def hasBufferedNext: Boolean = {
currentItemIterator match {
case Some(iterator) => if (iterator.hasNext && validateNextLsn(iterator)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package com.azure.cosmos.spark

import com.azure.cosmos.CosmosException
import com.azure.cosmos.implementation.SparkRowItem
import com.azure.cosmos.implementation.changefeed.common.{ChangeFeedMode, ChangeFeedStartFromInternal, ChangeFeedStateV1}
import com.azure.cosmos.implementation.feedranges.{FeedRangeContinuation, FeedRangeEpkImpl}
import com.azure.cosmos.implementation.query.CompositeContinuationToken
import com.azure.cosmos.models.{FeedResponse, ModelBridgeInternal}
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.util.UtilBridgeInternal
Expand All @@ -13,6 +16,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import reactor.core.publisher.Flux

import java.time.Duration
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

Expand Down Expand Up @@ -180,6 +184,89 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
factoryCallCount.get shouldEqual 1
}

"Bounded change feed read" should "throw IllegalStateException when paginator stops before reaching endLsn" in {
val containerRid = "testContainerRid"
val midLsn = 15L
val boundLsn = 20L
val continuationToken = buildChangeFeedStateJson(containerRid, midLsn)

val iterator = new TransientIOErrorsRetryingIterator(
_ => buildSinglePageFluxWithContinuation(continuationToken),
pageSize,
1,
None,
Some(boundLsn)
)
iterator.maxRetryIntervalInMs = 5

val ex = intercept[IllegalStateException] {
iterator.count(_ => true)
}
ex.getMessage should include (s"endLsn=$boundLsn")
ex.getMessage should include (s"continuation=$midLsn")
.or(include(s"LSN in latest continuation=$midLsn"))
}

"Bounded change feed read" should "complete normally when continuation has reached endLsn" in {
val containerRid = "testContainerRid"
val boundLsn = 20L
val continuationToken = buildChangeFeedStateJson(containerRid, boundLsn)

val iterator = new TransientIOErrorsRetryingIterator(
_ => buildSinglePageFluxWithContinuation(continuationToken),
pageSize,
1,
None,
Some(boundLsn)
)
iterator.maxRetryIntervalInMs = 5

// Drain the iterator - should not throw
noException should be thrownBy iterator.count(_ => true)
}

"Bounded change feed read" should "throw IllegalStateException when no continuation is produced" in {
val boundLsn = 20L

val iterator = new TransientIOErrorsRetryingIterator(
_ => UtilBridgeInternal.createCosmosPagedFlux(_ => Flux.empty[FeedResponse[SparkRowItem]]()),
pageSize,
1,
None,
Some(boundLsn)
)
iterator.maxRetryIntervalInMs = 5

val ex = intercept[IllegalStateException] {
iterator.count(_ => true)
}
ex.getMessage should include (s"endLsn=$boundLsn")
ex.getMessage should include ("no continuation was produced")
}

/**
 * Builds the string form of a `ChangeFeedStateV1` for the given container whose
 * continuation holds a single composite token over the full EPK range.
 *
 * @param containerRid container resource id stored in both the state and its continuation
 * @param lsn          LSN to embed; it is written as the token value wrapped in double quotes
 * @return `toString` of the assembled change feed state (incremental mode, start from beginning)
 */
private def buildChangeFeedStateJson(containerRid: String, lsn: Long): String = {
  val entireRange = FeedRangeEpkImpl.forFullRange()

  // The token value is the LSN surrounded by literal double quotes.
  val quotedLsn = "\"" + lsn + "\""
  val singleToken = new CompositeContinuationToken(quotedLsn, entireRange.getRange)

  val feedRangeContinuation = FeedRangeContinuation.create(
    containerRid,
    entireRange,
    Collections.singletonList(singleToken))

  val state = new ChangeFeedStateV1(
    containerRid,
    entireRange,
    ChangeFeedMode.INCREMENTAL,
    ChangeFeedStartFromInternal.createFromBeginning(),
    feedRangeContinuation)

  state.toString
}

/**
 * Creates a paged flux that emits exactly one feed response: no items, an empty header map,
 * and the supplied continuation token.
 *
 * @param continuationToken continuation to set on the single emitted page
 */
private def buildSinglePageFluxWithContinuation(continuationToken: String) = {
  val noItems = Collections.emptyList[SparkRowItem]()
  val noHeaders = new ConcurrentHashMap[String, String]()

  val onlyPage = ModelBridgeInternal.createFeedResponse(noItems, noHeaders)
  ModelBridgeInternal.setFeedResponseContinuationToken(continuationToken, onlyPage)

  UtilBridgeInternal.createCosmosPagedFlux(_ => Flux.just(onlyPage))
}

"TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
// Regression test for the empty-page drain scenario: when the SDK is configured with
// emptyPagesAllowed=true the iterator must surface many consecutive empty
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)

#### Other Changes

Expand Down