Skip to content

Stream Cursors#35

Draft
vgvoleg wants to merge 1 commit intomainfrom
stream_cursor
Draft

Stream Cursors#35
vgvoleg wants to merge 1 commit intomainfrom
stream_cursor

Conversation

@vgvoleg
Copy link
Copy Markdown
Collaborator

@vgvoleg vgvoleg commented Apr 24, 2026

Closes: #36

@vgvoleg vgvoleg requested a review from Copilot April 24, 2026 12:48
@vgvoleg vgvoleg marked this pull request as draft April 24, 2026 12:50
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds support for server-side (streaming) cursors to incrementally consume large result sets without buffering everything in memory.

Changes:

  • Introduces StreamCursor / AsyncStreamCursor and adds stream_results=True option to Connection.cursor() / AsyncConnection.cursor().
  • Tracks active streaming cursor per connection to block conflicting operations during interactive transactions.
  • Expands unit + integration test coverage and updates README with streaming usage examples.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
ydb_dbapi/utils.py Prevents double-wrapping of already-normalized DBAPI errors in handle_ydb_errors.
ydb_dbapi/cursors.py Refactors shared cursor state into BaseCursor and adds sync/async streaming cursors.
ydb_dbapi/connections.py Adds stream_results flag, tracks active stream cursor, and blocks commit/rollback/queries while streaming.
ydb_dbapi/init.py Exports stream cursor classes publicly.
tests/test_cursors_unit.py Adds unit tests for stream cursor buffering/rowcount semantics with fake iterators/pools.
tests/test_cursors.py Adds integration tests validating streaming behavior and transaction-session blocking.
tests/test_connections.py Minor formatting-only changes.
README.md Documents how to use streaming cursors for sync and async connections.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ydb_dbapi/cursors.py
Comment on lines +450 to +466
def _finish_stream(self) -> None:
self._stream = None
self._release_owned_session()
self._clear_current_cursor()
self._finish_query()

@handle_ydb_errors
@invalidate_cursor_on_ydb_error
def _load_next_result_set(self) -> bool:
if self._stream is None:
return False

try:
result_set = next(self._stream)
except StopIteration:
self._finish_stream()
return False
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On normal stream exhaustion (StopIteration), the code releases the session but never explicitly closes/exits the SyncResponseContextIterator context. If the YDB iterator relies on __exit__ for cleanup (network resources, server-side cancellation, etc.), this may leak resources. Consider ensuring the iterator is always exited/closed (e.g., wrap iteration in a context manager and/or call the iterator’s close/exit equivalent inside _finish_stream, including the cancel() path).

Copilot uses AI. Check for mistakes.
Comment thread ydb_dbapi/cursors.py
Comment on lines +865 to +881
async def _finish_stream(self) -> None:
self._stream = None
await self._release_owned_session()
self._clear_current_cursor()
self._finish_query()

@handle_ydb_errors
@invalidate_cursor_on_ydb_error
async def _load_next_result_set(self) -> bool:
if self._stream is None:
return False

try:
result_set = await self._stream.__anext__()
except StopAsyncIteration:
await self._finish_stream()
return False
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the sync stream cursor: after StopAsyncIteration, the iterator is dropped and the session is released, but the AsyncResponseContextIterator is not explicitly exited/closed. If the iterator’s __aexit__ performs required cleanup, this can leave resources open. Consider consistently async with-wrapping the stream lifetime (or invoking its close/exit API) so both normal completion and error/close paths clean up deterministically.

Copilot uses AI. Check for mistakes.
Comment thread README.md
Comment on lines +40 to +41
For large result sets you can use a server-side cursor that streams result
sets incrementally instead of buffering the whole response in memory:
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs describe how to enable stream_results, but they don’t mention the key behavioral constraint enforced by the code: during interactive transactions, an active streaming cursor can block other queries and commit/rollback until the stream is fully consumed or explicitly closed (raising ProgrammingError otherwise). Consider adding a short note here (and in the async section) explaining that callers must consume all rows or call cursor.close() before issuing other operations on the same connection/transaction.

Copilot uses AI. Check for mistakes.
Comment on lines +16 to +19
def maybe_await(obj: callable) -> any:
if not iscoroutine(obj):
return obj
return await_only(obj)
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These annotations are likely unintended: callable is a built-in, not a typing annotation, and any should be typing.Any. Since this helper is passed arbitrary objects (often None), annotate it as taking object (or Any) and returning Any, and import the appropriate typing symbols.

Copilot uses AI. Check for mistakes.
def _set_current_cursor(self, cursor: AsyncStreamCursor) -> None: ...


class BaseStreamCursorTestSuit:
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class name uses TestSuit (missing 'e'); the standard spelling is TestSuite. Renaming improves readability/searchability across the test code.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Server-side streaming cursors

2 participants