From d6e7a82bdb14827920a951187115266101acad1a Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 1 May 2026 18:46:03 -0400 Subject: [PATCH] PYTHON-5676 Phase 1: Remove dead legacy OP_QUERY code from run_operation() Since MIN_SUPPORTED_WIRE_VERSION=8 (MongoDB 4.2+), use_command() always returns True and OP_MSG is always used. Remove the unreachable OP_QUERY branches from Server.run_operation(), along with the dead else-branches in Cursor and CommandCursor that referenced _OpReply. - Hard-code use_cmd=True / apply_timeout=True in run_operation() - Always use user_fields=_CURSOR_DOC_FIELDS and legacy_response=False - Remove isinstance(reply, _OpMsg) exhaust check (reply is always _OpMsg) - Hard-code from_command=True in Response/PinnedResponse construction - Remove dead else-branches with assert isinstance(response.data, _OpReply) in Cursor._send_message and CommandCursor._send_message - Add scope document PYTHON-5676-scope.md outlining all four phases --- PYTHON-5676-scope.md | 183 +++++++++++++++++++++++++ pymongo/asynchronous/command_cursor.py | 13 +- pymongo/asynchronous/cursor.py | 34 ++--- pymongo/asynchronous/server.py | 56 +++----- pymongo/synchronous/command_cursor.py | 13 +- pymongo/synchronous/cursor.py | 34 ++--- pymongo/synchronous/server.py | 56 +++----- 7 files changed, 251 insertions(+), 138 deletions(-) create mode 100644 PYTHON-5676-scope.md diff --git a/PYTHON-5676-scope.md b/PYTHON-5676-scope.md new file mode 100644 index 000000000..56a7fc0da --- /dev/null +++ b/PYTHON-5676-scope.md @@ -0,0 +1,183 @@ +# PYTHON-5676: Consolidate Command Execution Logic — Scope Document + +## Context + +PyMongo has accumulated several distinct code paths for executing commands against the server. When cross-cutting logic (e.g., backpressure, CSOT, monitoring) needs to be added or changed, engineers must find and update multiple paths. The goal is a single central path for all database operations, making future changes local to one place. + +PYTHON-1357 (Refactor Cursor and CommandCursor) is now Closed — the prerequisite work is done. + +--- + +## Current Command Execution Paths + +All paths are duplicated across `pymongo/asynchronous/` (source of truth) and `pymongo/synchronous/` (auto-generated via `tools/synchro.sh`). + +### Path 1 — Standard commands (most operations) +``` +Collection._command() / Database.command() / conn.command() (direct) + → Connection.command() [pool.py:344] — session, write concern, server API, reauth + → network.command() [network.py:61] — encode OP_MSG, APM, logging, CSOT, encryption, send/recv +``` +Used by: insert, update, delete, aggregate, distinct, find_one, createIndex, dropCollection, db.command(), etc. + +### Path 2 — Cursor operations (find / getMore) +``` +Cursor._send_message() + → MongoClient._run_operation() [mongo_client.py:1911] + → Server.run_operation() [server.py:138] — send/recv + FULL APM/logging (≈80 lines, near-identical to network.command()) +``` +**Bypasses `network.command()` entirely.** Has its own APM/monitoring and logging code. + +### Path 3 — Acknowledged bulk writes (collection-level) +``` +_Bulk._execute_batch() + → _Bulk.write_command() [bulk.py:244] — APM/logging wrapper (≈80 lines) + → Connection.write_command() [pool.py:480] — pre-encoded bytes, raw send+recv, no APM +``` +**Bypasses both `Connection.command()` and `network.command()`.** Note: for *encrypted* bulk writes, this path already uses `Connection.command()` — the non-encrypted path is the outlier. + +### Path 4 — Unacknowledged bulk writes +``` +_Bulk._execute_batch_unack() + → _Bulk.unack_write() [bulk.py:329] — APM/logging wrapper (≈70 lines) + → Connection.unack_write() [pool.py:469] — raw send only, no recv +``` +**Bypasses `Connection.command()` and `network.command()`.** + +### Path 5 — Client-level bulk writes (cross-collection) +`client_bulk.py` mirrors Paths 3 and 4 with its own `write_command()` and `unack_write()` wrappers — another full copy of the APM/logging boilerplate. + +--- + +## Key Findings + +**1. APM/logging code is copy-pasted 5+ times.** +`Server.run_operation()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` all contain the same ~70-80 line block of `_COMMAND_LOGGER.isEnabledFor(DEBUG)` + `_debug_log(...)` + `listeners.publish_command_start/success/failure(...)`. The reference implementation is `network.command()`. + +**2. Dead legacy OP_QUERY code in `Server.run_operation()`.** +`MIN_SUPPORTED_WIRE_VERSION = 8` (MongoDB 4.2+, `common.py`). OP_MSG was introduced at wire version 6. The `use_cmd = False` branch (legacy OP_QUERY path) in `run_operation()` can never be reached for any currently supported server. The `_Query.use_command()` check at `message.py:1622` confirms: for wire version ≥ 8, it is unconditionally True. + +**3. `_op_msg()` already handles bulk write Type 1 sections.** +`message._op_msg()` (line 394) detects insert/update/delete commands, pops the documents field, and encodes them as a Type 1 section. The non-encrypted bulk write path bypasses this and does its own single-pass encode+batch-size-check via `_do_batched_op_msg`. Unifying would require separating batch determination from encoding (currently combined for performance). + +**4. Encrypted bulk writes already use `Connection.command()`.** +In `_Bulk._execute_batch()` (bulk.py:444), the `if self.is_encrypted:` branch calls `bwc.conn.command()`. The non-encrypted `else` branch goes through `Connection.write_command()`. These should be the same path. + +**5. Async is generated from async source.** +All `pymongo/synchronous/*.py` files are auto-generated from `pymongo/asynchronous/*.py` via `tools/synchro.sh`. All code changes must be made in the `asynchronous/` files only. + +--- + +## Proposed Consolidation: Phased Approach + +### Phase 1 — Remove dead OP_QUERY code from `run_operation()` *(Low risk)* + +**What:** Delete the `use_cmd = False` branches from `Server.run_operation()`. Remove the conditional `if use_cmd: ... else: ...` blocks for `user_fields`/`legacy_response` and response-building. + +**Files:** `pymongo/asynchronous/server.py`, `pymongo/asynchronous/cursor.py` (remove the dead `else` branches referencing `_OpReply`) + +**Why now:** This is purely dead code removal — no behavior change. It simplifies Phase 3 significantly and is independently safe. + +**Verification:** Full test suite; specifically `test/test_cursor.py` and `test/test_unified_format.py`. + +--- + +### Phase 2 — Extract shared APM/logging helpers *(Low risk)* + +**What:** Extract the duplicated APM/logging block into shared helper functions in `pymongo/helpers_shared.py` (or a new `pymongo/command_helpers.py`). All five duplicate sites call the helpers instead of inlining the code. + +```python +# No sync/async split needed — these functions contain no I/O +def _log_and_publish_command_started(client, listeners, cmd, dbname, request_id, conn): + ... + + +def _log_and_publish_command_succeeded( + client, listeners, cmd, dbname, request_id, conn, reply, duration +): + ... + + +def _log_and_publish_command_failed( + client, listeners, cmd, dbname, request_id, conn, failure, duration +): + ... +``` + +**Files:** `pymongo/helpers_shared.py` (or new `pymongo/command_helpers.py`), `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py`, `pymongo/asynchronous/bulk.py`, `pymongo/asynchronous/client_bulk.py` + +**Why this matters:** This is the direct solution to the stated problem. When future cross-cutting logic needs to be added to "command execution", there is one place to add it. + +**Note on APM operationId:** `_BulkWriteContext` passes `op_id` (not `request_id`) as the `operationId` in APM events. This is spec-required behavior that must be preserved in the helpers. + +**Verification:** `test/test_command_monitoring.py`, `test/test_monitoring.py`, `test/test_unified_format.py`. + +--- + +### Phase 3 — Unify `Server.run_operation()` with `network.command()` *(Medium risk)* + +**What:** Refactor `network.command()` into a two-layer structure: + +- `_network_command_core()` — performs all work (encode, APM, send, recv) and returns `(response_doc, raw_reply, request_id, duration)` +- `command()` — thin wrapper returning just `response_doc` (existing callers unchanged) + +`Server.run_operation()` (after Phase 1+2) calls `_network_command_core()` for the actual transport, then wraps the result in `Response`/`PinnedResponse`. + +**Key complexity:** `RawBatchCursor._unpack_response()` (`cursor.py:1196`) calls `response.raw_response(cursor_id)` on the raw `_OpMsg` object — it needs the raw reply, not just the decoded dict. The `_network_command_core()` design satisfies this by exposing `raw_reply`. The `@_handle_reauth` decorator on `run_operation()` must also be preserved. + +**Files:** `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py` + +**Result:** Cursor operations (find/getMore) fully share the command execution path with all other operations. Single place to add transport-level logic. + +**Verification:** `test/test_cursor.py`, exhaust cursor tests, `test/test_encryption.py`, `test/test_csot.py`. + +--- + +### Phase 4 — Unify non-encrypted bulk write path with `Connection.command()` *(Higher risk, defer)* + +**What:** Change `_Bulk._execute_batch()` to use `Connection.command()` for non-encrypted bulk writes, matching the already-unified encrypted path. Requires `_BulkWriteContext.batch_command()` to return a command dict instead of pre-encoded bytes. + +**The challenge:** `_do_batched_op_msg` performs batch-size determination and encoding in a single pass (including a C extension: `_cmessage._batched_op_msg`). Separating batch determination from encoding requires a two-pass approach, adding encoding overhead. **Needs benchmarking before committing.** + +`_EncryptedBulkWriteContext.batch_command()` encodes to bytes and then deserializes back via `_inflate_bson` — aligning the non-encrypted path with this approach may be the right model. + +**Result after Phase 4:** `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, and `_ClientBulk.unack_write()` have no callers and can be removed. + +**Gate:** Bulk write benchmarks before/after. Suggested regression threshold: ≤2% throughput degradation. + +--- + +## Definition of Done + +- All database operations route through `Connection.command()` → `network.command()` for actual transport +- APM/logging code exists in one location +- `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` are removed (or reduced to thin wrappers) +- No performance regression (bulk write benchmarks) +- No behavioral regression (full spec test suite passes) + +--- + +## Risk Summary + +| Phase | Risk | Effort | Value | +|-------|------|--------|-------| +| 1 — Remove dead OP_QUERY code | Low | ~1 day | Medium (simplification) | +| 2 — Extract APM helpers | Low | ~2 days | **High** (solves the stated problem) | +| 3 — Unify run_operation with network.command | Medium | ~3 days | High (true path consolidation) | +| 4 — Unify bulk write bytes path | Higher | ~1 week + benchmarking | Medium (removes last bypass) | + +Phases 1 and 2 can be done together in one PR. Phase 3 should be a separate PR. Phase 4 should be gated on benchmarking. + +--- + +## Critical Files + +| File | Role | +|------|------| +| `pymongo/asynchronous/network.py:61` | Central `command()` — reference implementation | +| `pymongo/asynchronous/server.py:138` | `run_operation()` — cursor path bypass | +| `pymongo/asynchronous/pool.py:344,469,480` | `Connection.command`, `unack_write`, `write_command` | +| `pymongo/asynchronous/bulk.py:244,329,444` | `_Bulk.write_command`, `unack_write`, `_execute_batch` | +| `pymongo/asynchronous/client_bulk.py:229,320` | `_ClientBulk.write_command`, `unack_write` | +| `pymongo/message.py:394,695,1622` | `_op_msg`, `_BulkWriteContext.batch_command`, `_Query.use_command` | +| `pymongo/common.py` | `MIN_SUPPORTED_WIRE_VERSION = 8` | diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 5a59c67a1..34194899e 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -189,15 +189,10 @@ class AsyncCommandCursor(_AsyncCursorBase[_DocumentType]): if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: await self.close() diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index a60c082ad..f7c167177 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -1020,29 +1020,23 @@ class AsyncCursor(_AsyncCursorBase[_DocumentType]): cmd_name = operation.name docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. + ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f21230617..aaf7ef82b 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -37,7 +37,7 @@ from pymongo.logger import ( _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _convert_exception, _GetMore, _Query from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: @@ -161,13 +161,13 @@ class Server: publish = listeners.enabled_for_commands start = datetime.now() - use_cmd = operation.use_command(conn) + operation.use_command(conn) more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come - cmd, dbn = await self.operation_to_command(operation, conn, use_cmd) + cmd, dbn = await self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 else: - message = operation.get_message(read_preference, conn, use_cmd) + message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): @@ -208,23 +208,16 @@ class Server: reply = await conn.receive_message(request_id) # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True docs = unpack_res( reply, operation.cursor_id, operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, ) - if use_cmd: - first = docs[0] - await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + first = docs[0] + await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] except Exception as exc: duration = datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -263,18 +256,8 @@ class Server: ) raise duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + # Must publish in find / getMore / explain command response format. + res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, @@ -308,21 +291,14 @@ class Server: # Decrypt response. client = operation.client # type: ignore[assignment] if client and client._encrypter: - if use_cmd: - decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) + decrypted = await client._encrypter.decrypt(reply.raw_command_response()) + docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) response: Response if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) + more_to_come = reply.more_to_come if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( @@ -331,7 +307,7 @@ class Server: conn=conn, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, more_to_come=more_to_come, ) @@ -341,7 +317,7 @@ class Server: address=self._description.address, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, ) diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 34f60c654..b2023ad5d 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -189,15 +189,10 @@ class CommandCursor(_CursorBase[_DocumentType]): if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: self.close() diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 5a721d8e0..909671dac 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -1018,29 +1018,23 @@ class Cursor(_CursorBase[_DocumentType]): cmd_name = operation.name docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. + ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918..6073117f5 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -36,7 +36,7 @@ from pymongo.logger import ( _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _convert_exception, _GetMore, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth @@ -161,13 +161,13 @@ class Server: publish = listeners.enabled_for_commands start = datetime.now() - use_cmd = operation.use_command(conn) + operation.use_command(conn) more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come - cmd, dbn = self.operation_to_command(operation, conn, use_cmd) + cmd, dbn = self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 else: - message = operation.get_message(read_preference, conn, use_cmd) + message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): @@ -208,23 +208,16 @@ class Server: reply = conn.receive_message(request_id) # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True docs = unpack_res( reply, operation.cursor_id, operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, ) - if use_cmd: - first = docs[0] - operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + first = docs[0] + operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] except Exception as exc: duration = datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -263,18 +256,8 @@ class Server: ) raise duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + # Must publish in find / getMore / explain command response format. + res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, @@ -308,21 +291,14 @@ class Server: # Decrypt response. client = operation.client # type: ignore[assignment] if client and client._encrypter: - if use_cmd: - decrypted = client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) + decrypted = client._encrypter.decrypt(reply.raw_command_response()) + docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) response: Response if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) + more_to_come = reply.more_to_come if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( @@ -331,7 +307,7 @@ class Server: conn=conn, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, more_to_come=more_to_come, ) @@ -341,7 +317,7 @@ class Server: address=self._description.address, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, )