Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds support for server-side (streaming) cursors to incrementally consume large result sets without buffering everything in memory.
Changes:
- Introduces
StreamCursor/AsyncStreamCursorand addsstream_results=Trueoption toConnection.cursor()/AsyncConnection.cursor(). - Tracks active streaming cursor per connection to block conflicting operations during interactive transactions.
- Expands unit + integration test coverage and updates README with streaming usage examples.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| ydb_dbapi/utils.py | Prevents double-wrapping of already-normalized DBAPI errors in handle_ydb_errors. |
| ydb_dbapi/cursors.py | Refactors shared cursor state into BaseCursor and adds sync/async streaming cursors. |
| ydb_dbapi/connections.py | Adds stream_results flag, tracks active stream cursor, and blocks commit/rollback/queries while streaming. |
| ydb_dbapi/init.py | Exports stream cursor classes publicly. |
| tests/test_cursors_unit.py | Adds unit tests for stream cursor buffering/rowcount semantics with fake iterators/pools. |
| tests/test_cursors.py | Adds integration tests validating streaming behavior and transaction-session blocking. |
| tests/test_connections.py | Minor formatting-only changes. |
| README.md | Documents how to use streaming cursors for sync and async connections. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _finish_stream(self) -> None: | ||
| self._stream = None | ||
| self._release_owned_session() | ||
| self._clear_current_cursor() | ||
| self._finish_query() | ||
|
|
||
| @handle_ydb_errors | ||
| @invalidate_cursor_on_ydb_error | ||
| def _load_next_result_set(self) -> bool: | ||
| if self._stream is None: | ||
| return False | ||
|
|
||
| try: | ||
| result_set = next(self._stream) | ||
| except StopIteration: | ||
| self._finish_stream() | ||
| return False |
There was a problem hiding this comment.
On normal stream exhaustion (StopIteration), the code releases the session but never explicitly closes/exits the SyncResponseContextIterator context. If the YDB iterator relies on __exit__ for cleanup (network resources, server-side cancellation, etc.), this may leak resources. Consider ensuring the iterator is always exited/closed (e.g., wrap iteration in a context manager and/or call the iterator’s close/exit equivalent inside _finish_stream, including the cancel() path).
| async def _finish_stream(self) -> None: | ||
| self._stream = None | ||
| await self._release_owned_session() | ||
| self._clear_current_cursor() | ||
| self._finish_query() | ||
|
|
||
| @handle_ydb_errors | ||
| @invalidate_cursor_on_ydb_error | ||
| async def _load_next_result_set(self) -> bool: | ||
| if self._stream is None: | ||
| return False | ||
|
|
||
| try: | ||
| result_set = await self._stream.__anext__() | ||
| except StopAsyncIteration: | ||
| await self._finish_stream() | ||
| return False |
There was a problem hiding this comment.
Same as the sync stream cursor: after StopAsyncIteration, the iterator is dropped and the session is released, but the AsyncResponseContextIterator is not explicitly exited/closed. If the iterator’s __aexit__ performs required cleanup, this can leave resources open. Consider consistently async with-wrapping the stream lifetime (or invoking its close/exit API) so both normal completion and error/close paths clean up deterministically.
| For large result sets you can use a server-side cursor that streams result | ||
| sets incrementally instead of buffering the whole response in memory: |
There was a problem hiding this comment.
The docs describe how to enable stream_results, but they don’t mention the key behavioral constraint enforced by the code: during interactive transactions, an active streaming cursor can block other queries and commit/rollback until the stream is fully consumed or explicitly closed (raising ProgrammingError otherwise). Consider adding a short note here (and in the async section) explaining that callers must consume all rows or call cursor.close() before issuing other operations on the same connection/transaction.
| def maybe_await(obj: callable) -> any: | ||
| if not iscoroutine(obj): | ||
| return obj | ||
| return await_only(obj) |
There was a problem hiding this comment.
These annotations are likely unintended: callable is a built-in, not a typing annotation, and any should be typing.Any. Since this helper is passed arbitrary objects (often None), annotate it as taking object (or Any) and returning Any, and import the appropriate typing symbols.
| def _set_current_cursor(self, cursor: AsyncStreamCursor) -> None: ... | ||
|
|
||
|
|
||
| class BaseStreamCursorTestSuit: |
There was a problem hiding this comment.
The class name uses TestSuit (missing 'e'); the standard spelling is TestSuite. Renaming improves readability/searchability across the test code.
Closes: #36