PYTHON-5676 Phase 1: Remove dead legacy OP_QUERY code from run_operation()

Since MIN_SUPPORTED_WIRE_VERSION=8 (MongoDB 4.2+), use_command() always
returns True and OP_MSG is always used. Remove the unreachable OP_QUERY
branches from Server.run_operation(), along with the dead else-branches
in Cursor and CommandCursor that referenced _OpReply.

- Hard-code use_cmd=True / apply_timeout=True in run_operation()
- Always use user_fields=_CURSOR_DOC_FIELDS and legacy_response=False
- Remove isinstance(reply, _OpMsg) exhaust check (reply is always _OpMsg)
- Hard-code from_command=True in Response/PinnedResponse construction
- Remove dead else-branches with assert isinstance(response.data, _OpReply)
  in Cursor._send_message and CommandCursor._send_message
- Add scope document PYTHON-5676-scope.md outlining all four phases
Jeffrey 'Alex' Clark 2026-05-01 18:46:03 -04:00
parent f4219bdca2
commit d6e7a82bdb
7 changed files with 251 additions and 138 deletions

PYTHON-5676-scope.md

@ -0,0 +1,183 @@
# PYTHON-5676: Consolidate Command Execution Logic — Scope Document
## Context
PyMongo has accumulated several distinct code paths for executing commands against the server. When cross-cutting logic (e.g., backpressure, CSOT, monitoring) needs to be added or changed, engineers must find and update multiple paths. The goal is a single central path for all database operations, making future changes local to one place.
PYTHON-1357 (Refactor Cursor and CommandCursor) is now Closed — the prerequisite work is done.
---
## Current Command Execution Paths
All paths are duplicated across `pymongo/asynchronous/` (source of truth) and `pymongo/synchronous/` (auto-generated via `tools/synchro.sh`).
### Path 1 — Standard commands (most operations)
```
Collection._command() / Database.command() / conn.command() (direct)
→ Connection.command() [pool.py:344] — session, write concern, server API, reauth
→ network.command() [network.py:61] — encode OP_MSG, APM, logging, CSOT, encryption, send/recv
```
Used by: insert, update, delete, aggregate, distinct, find_one, createIndex, dropCollection, db.command(), etc.
### Path 2 — Cursor operations (find / getMore)
```
Cursor._send_message()
→ MongoClient._run_operation() [mongo_client.py:1911]
→ Server.run_operation() [server.py:138] — send/recv + FULL APM/logging (≈80 lines, near-identical to network.command())
```
**Bypasses `network.command()` entirely.** Has its own APM/monitoring and logging code.
### Path 3 — Acknowledged bulk writes (collection-level)
```
_Bulk._execute_batch()
→ _Bulk.write_command() [bulk.py:244] — APM/logging wrapper (≈80 lines)
→ Connection.write_command() [pool.py:480] — pre-encoded bytes, raw send+recv, no APM
```
**Bypasses both `Connection.command()` and `network.command()`.** Note: for *encrypted* bulk writes, this path already uses `Connection.command()` — the non-encrypted path is the outlier.
### Path 4 — Unacknowledged bulk writes
```
_Bulk._execute_batch_unack()
→ _Bulk.unack_write() [bulk.py:329] — APM/logging wrapper (≈70 lines)
→ Connection.unack_write() [pool.py:469] — raw send only, no recv
```
**Bypasses `Connection.command()` and `network.command()`.**
### Path 5 — Client-level bulk writes (cross-collection)
`client_bulk.py` mirrors Paths 3 and 4 with its own `write_command()` and `unack_write()` wrappers — another full copy of the APM/logging boilerplate.
---
## Key Findings
**1. APM/logging code is copy-pasted 5+ times.**
`Server.run_operation()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` all contain the same ~70-80 line block of `_COMMAND_LOGGER.isEnabledFor(DEBUG)` + `_debug_log(...)` + `listeners.publish_command_start/success/failure(...)`. The reference implementation is `network.command()`.
**2. Dead legacy OP_QUERY code in `Server.run_operation()`.**
`MIN_SUPPORTED_WIRE_VERSION = 8` (MongoDB 4.2+, `common.py`). OP_MSG was introduced at wire version 6. The `use_cmd = False` branch (legacy OP_QUERY path) in `run_operation()` can never be reached for any currently supported server. The `_Query.use_command()` check at `message.py:1622` confirms: for wire version ≥ 8, it is unconditionally True.
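The reachability argument can be checked with a small model. This is a sketch only: `FakeConn`, the simplified `use_command()` and `legacy_cases()` are hypothetical stand-ins, and the rule encoded here (only exhaust cursors on servers below wire version 8 ever took the legacy path) is an assumption about the shape of the real check, not a copy of `message.py`.

```python
from dataclasses import dataclass

MIN_SUPPORTED_WIRE_VERSION = 8  # value this document quotes from common.py


@dataclass
class FakeConn:
    """Hypothetical stand-in for a connection; only the wire version matters here."""

    max_wire_version: int


def use_command(conn: FakeConn, exhaust: bool) -> bool:
    """Assumed model of the check: only exhaust cursors on pre-8 servers go legacy."""
    return (not exhaust) or conn.max_wire_version >= 8


def legacy_cases(max_version: int = 30) -> list[tuple[int, bool]]:
    """Every (wire_version, exhaust) pair on a supported server that would pick OP_QUERY."""
    return [
        (version, exhaust)
        for version in range(MIN_SUPPORTED_WIRE_VERSION, max_version + 1)
        for exhaust in (False, True)
        if not use_command(FakeConn(version), exhaust)
    ]
```

Under this model `legacy_cases()` is empty for any upper bound, which is the sense in which the `use_cmd = False` branch is dead.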
**3. `_op_msg()` already handles bulk write Type 1 sections.**
`message._op_msg()` (line 394) detects insert/update/delete commands, pops the documents field, and encodes them as a Type 1 section. The non-encrypted bulk write path bypasses this and does its own single-pass encode+batch-size-check via `_do_batched_op_msg`. Unifying would require separating batch determination from encoding (currently combined for performance).
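For readers unfamiliar with the wire format, the following sketch shows the byte layout of an OP_MSG Type 1 (document sequence) section. It is not PyMongo's encoder: `encode_type1_section` is a hypothetical helper, and the documents are passed in already BSON-encoded so the example needs no driver import.

```python
import struct


def encode_type1_section(identifier: str, encoded_docs: list[bytes]) -> bytes:
    """Build an OP_MSG section of kind 1 (document sequence).

    Layout: kind byte (0x01), int32 size, C-string identifier, then the
    BSON documents back to back. The size covers everything after the kind
    byte, including the four size bytes themselves.
    """
    ident = identifier.encode("utf-8") + b"\x00"
    payload = b"".join(encoded_docs)
    size = 4 + len(ident) + len(payload)
    return b"\x01" + struct.pack("<i", size) + ident + payload


# An empty BSON document is five bytes: int32 length (5) plus a trailing NUL.
EMPTY_BSON_DOC = struct.pack("<i", 5) + b"\x00"

section = encode_type1_section("documents", [EMPTY_BSON_DOC, EMPTY_BSON_DOC])
```

Because the size prefix depends on the total length of the documents, the batch must be known before the section header can be written, which is why batching and encoding are coupled in the single-pass path.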
**4. Encrypted bulk writes already use `Connection.command()`.**
In `_Bulk._execute_batch()` (bulk.py:444), the `if self.is_encrypted:` branch calls `bwc.conn.command()`. The non-encrypted `else` branch goes through `Connection.write_command()`. These should be the same path.
**5. Sync is generated from async source.**
All `pymongo/synchronous/*.py` files are auto-generated from `pymongo/asynchronous/*.py` via `tools/synchro.sh`. All code changes must be made in the `asynchronous/` files only.
---
## Proposed Consolidation: Phased Approach
### Phase 1 — Remove dead OP_QUERY code from `run_operation()` *(Low risk)*
**What:** Delete the `use_cmd = False` branches from `Server.run_operation()`. Remove the conditional `if use_cmd: ... else: ...` blocks for `user_fields`/`legacy_response` and response-building.
**Files:** `pymongo/asynchronous/server.py`, `pymongo/asynchronous/cursor.py` (remove the dead `else` branches referencing `_OpReply`)
**Why now:** This is purely dead code removal — no behavior change. It simplifies Phase 3 significantly and is independently safe.
**Verification:** Full test suite; specifically `test/test_cursor.py` and `test/test_unified_format.py`.
---
### Phase 2 — Extract shared APM/logging helpers *(Low risk)*
**What:** Extract the duplicated APM/logging block into shared helper functions in `pymongo/helpers_shared.py` (or a new `pymongo/command_helpers.py`). All five duplicate sites call the helpers instead of inlining the code.
```python
# No sync/async split needed: these functions contain no I/O
def _log_and_publish_command_started(client, listeners, cmd, dbname, request_id, conn):
    ...


def _log_and_publish_command_succeeded(
    client, listeners, cmd, dbname, request_id, conn, reply, duration
):
    ...


def _log_and_publish_command_failed(
    client, listeners, cmd, dbname, request_id, conn, failure, duration
):
    ...
```
**Files:** `pymongo/helpers_shared.py` (or new `pymongo/command_helpers.py`), `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py`, `pymongo/asynchronous/bulk.py`, `pymongo/asynchronous/client_bulk.py`
**Why this matters:** This is the direct solution to the stated problem. When future cross-cutting logic needs to be added to "command execution", there is one place to add it.
**Note on APM operationId:** `_BulkWriteContext` passes `op_id` (not `request_id`) as the `operationId` in APM events. This is spec-required behavior that must be preserved in the helpers.
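One way a shared helper could preserve the `operationId` rule is to accept an optional `op_id` and fall back to `request_id` when none is given. The sketch below assumes that fallback; `RecordingListeners`, `publish_started` and `log_and_publish_started` are hypothetical names with simplified signatures, not the real listeners API.

```python
from typing import Any, Optional


class RecordingListeners:
    """Hypothetical stand-in for the APM listeners; it only records events."""

    def __init__(self) -> None:
        self.started: list[dict[str, Any]] = []

    def publish_started(
        self, cmd: dict[str, Any], dbname: str, request_id: int, operation_id: int
    ) -> None:
        self.started.append(
            {
                "command": cmd,
                "database": dbname,
                "request_id": request_id,
                "operation_id": operation_id,
            }
        )


def log_and_publish_started(
    listeners: RecordingListeners,
    cmd: dict[str, Any],
    dbname: str,
    request_id: int,
    op_id: Optional[int] = None,
) -> None:
    """Publish a started event, keeping a caller-supplied op_id as operationId."""
    operation_id = request_id if op_id is None else op_id
    listeners.publish_started(cmd, dbname, request_id, operation_id)


listeners = RecordingListeners()
# Cursor-style call site: no op_id, so operationId equals the request id.
log_and_publish_started(listeners, {"find": "coll"}, "db", request_id=11)
# Bulk-style call site: two batches of one bulk write share a single op_id.
log_and_publish_started(listeners, {"insert": "coll"}, "db", request_id=12, op_id=7)
log_and_publish_started(listeners, {"insert": "coll"}, "db", request_id=13, op_id=7)
```

Keeping `op_id` as an explicit parameter means the bulk call sites stay responsible for choosing it, and the helper never has to know which path it is serving.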
**Verification:** `test/test_command_monitoring.py`, `test/test_monitoring.py`, `test/test_unified_format.py`.
---
### Phase 3 — Unify `Server.run_operation()` with `network.command()` *(Medium risk)*
**What:** Refactor `network.command()` into a two-layer structure:
- `_network_command_core()` — performs all work (encode, APM, send, recv) and returns `(response_doc, raw_reply, request_id, duration)`
- `command()` — thin wrapper returning just `response_doc` (existing callers unchanged)
`Server.run_operation()` (after Phase 1+2) calls `_network_command_core()` for the actual transport, then wraps the result in `Response`/`PinnedResponse`.
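The two-layer split might look roughly like the sketch below. Everything except the names `_network_command_core` and `command` is an assumption made for illustration: the `send_recv` callable stands in for the real encode/send/receive work, `CoreResult` is a hypothetical container for the four return values, and `run_operation` returns a plain dict instead of `Response`/`PinnedResponse`.

```python
import time
from typing import Any, Callable, NamedTuple

# Hypothetical transport: takes a command dict, returns (decoded doc, raw reply bytes).
Transport = Callable[[dict[str, Any]], tuple[dict[str, Any], bytes]]


class CoreResult(NamedTuple):
    response_doc: dict[str, Any]
    raw_reply: bytes
    request_id: int
    duration: float


def _network_command_core(send_recv: Transport, cmd: dict[str, Any], request_id: int) -> CoreResult:
    """Do all the work once and expose both the decoded and the raw reply."""
    start = time.monotonic()
    response_doc, raw_reply = send_recv(cmd)
    return CoreResult(response_doc, raw_reply, request_id, time.monotonic() - start)


def command(send_recv: Transport, cmd: dict[str, Any], request_id: int) -> dict[str, Any]:
    """Thin wrapper: existing callers keep receiving only the decoded document."""
    return _network_command_core(send_recv, cmd, request_id).response_doc


def run_operation(send_recv: Transport, cmd: dict[str, Any], request_id: int) -> dict[str, Any]:
    """Cursor path: same core call, but the raw reply is kept for raw-batch consumers."""
    result = _network_command_core(send_recv, cmd, request_id)
    return {
        "docs": [result.response_doc],
        "data": result.raw_reply,
        "request_id": result.request_id,
        "duration": result.duration,
    }


def fake_transport(cmd: dict[str, Any]) -> tuple[dict[str, Any], bytes]:
    """Canned reply used only to exercise the sketch."""
    return {"ok": 1, "cursor": {"id": 0, "firstBatch": []}}, b"raw-bytes"
```

Both entry points go through one function, so transport-level changes land in `_network_command_core()` and neither caller needs to be touched.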
**Key complexity:** `RawBatchCursor._unpack_response()` (`cursor.py:1196`) calls `response.raw_response(cursor_id)` on the raw `_OpMsg` object — it needs the raw reply, not just the decoded dict. The `_network_command_core()` design satisfies this by exposing `raw_reply`. The `@_handle_reauth` decorator on `run_operation()` must also be preserved.
**Files:** `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py`
**Result:** Cursor operations (find/getMore) fully share the command execution path with all other operations. Single place to add transport-level logic.
**Verification:** `test/test_cursor.py`, exhaust cursor tests, `test/test_encryption.py`, `test/test_csot.py`.
---
### Phase 4 — Unify non-encrypted bulk write path with `Connection.command()` *(Higher risk, defer)*
**What:** Change `_Bulk._execute_batch()` to use `Connection.command()` for non-encrypted bulk writes, matching the already-unified encrypted path. Requires `_BulkWriteContext.batch_command()` to return a command dict instead of pre-encoded bytes.
**The challenge:** `_do_batched_op_msg` performs batch-size determination and encoding in a single pass (including a C extension: `_cmessage._batched_op_msg`). Separating batch determination from encoding requires a two-pass approach, adding encoding overhead. **Needs benchmarking before committing.**
`_EncryptedBulkWriteContext.batch_command()` encodes to bytes and then deserializes back via `_inflate_bson` — aligning the non-encrypted path with this approach may be the right model.
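The encoding overhead of a two-pass approach can be illustrated with a toy model. This is not how `_do_batched_op_msg` works: JSON stands in for BSON, and `CountingEncoder`, `choose_batch` and `build_command` are hypothetical names used only to count how often each document gets encoded.

```python
import json
from typing import Any


class CountingEncoder:
    """Stand-in for BSON encoding (JSON here) that counts how often it runs."""

    def __init__(self) -> None:
        self.calls = 0

    def encode(self, doc: dict[str, Any]) -> bytes:
        self.calls += 1
        return json.dumps(doc, separators=(",", ":")).encode("utf-8")


def choose_batch(docs: list[dict[str, Any]], max_bytes: int, encoder: CountingEncoder) -> int:
    """Pass 1: measure documents and return how many fit in one batch.

    The first document is always accepted so that an oversized document is
    reported by the server rather than looping forever here.
    """
    total = 0
    count = 0
    for doc in docs:
        size = len(encoder.encode(doc))
        if count > 0 and total + size > max_bytes:
            break
        total += size
        count += 1
    return count


def build_command(collection: str, docs: list[dict[str, Any]], count: int) -> dict[str, Any]:
    """Build the plain command dict that a generic command path would encode again."""
    return {"insert": collection, "documents": docs[:count]}


encoder = CountingEncoder()
docs = [{"_id": i} for i in range(5)]
count = choose_batch(docs, max_bytes=25, encoder=encoder)
cmd = build_command("coll", docs, count)
# Pass 2: the generic path encodes the chosen documents a second time.
wire_docs = [encoder.encode(doc) for doc in cmd["documents"]]
```

In this toy run two documents fit in the batch, yet the encoder runs five times: three in the sizing pass (including the document that did not fit) and two more when the command is encoded. That duplicated work is what the benchmark gate needs to quantify.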
**Result after Phase 4:** `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, and `_ClientBulk.unack_write()` have no callers and can be removed.
**Gate:** Bulk write benchmarks before/after. Suggested regression threshold: ≤2% throughput degradation.
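The gate can be made mechanical with a small check. This is a sketch under the assumption that the benchmark reports throughput as operations per second; `throughput_regression` and `passes_gate` are hypothetical helpers, not part of any existing benchmark harness.

```python
def throughput_regression(baseline_ops: float, candidate_ops: float) -> float:
    """Fractional throughput loss relative to baseline (negative means faster)."""
    if baseline_ops <= 0:
        raise ValueError("baseline throughput must be positive")
    return (baseline_ops - candidate_ops) / baseline_ops


def passes_gate(baseline_ops: float, candidate_ops: float, max_regression: float = 0.02) -> bool:
    """True when the candidate loses no more than max_regression of baseline throughput."""
    return throughput_regression(baseline_ops, candidate_ops) <= max_regression
```

For example, a drop from 1000 to 985 ops/s is a 1.5% regression and passes, while a drop to 970 ops/s is 3% and fails.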
---
## Definition of Done
- All database operations route through `Connection.command()` → `network.command()` for actual transport
- APM/logging code exists in one location
- `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` are removed (or reduced to thin wrappers)
- No performance regression (bulk write benchmarks)
- No behavioral regression (full spec test suite passes)
---
## Risk Summary
| Phase | Risk | Effort | Value |
|-------|------|--------|-------|
| 1 — Remove dead OP_QUERY code | Low | ~1 day | Medium (simplification) |
| 2 — Extract APM helpers | Low | ~2 days | **High** (solves the stated problem) |
| 3 — Unify run_operation with network.command | Medium | ~3 days | High (true path consolidation) |
| 4 — Unify bulk write bytes path | Higher | ~1 week + benchmarking | Medium (removes last bypass) |
Phases 1 and 2 can be done together in one PR. Phase 3 should be a separate PR. Phase 4 should be gated on benchmarking.
---
## Critical Files
| File | Role |
|------|------|
| `pymongo/asynchronous/network.py:61` | Central `command()` — reference implementation |
| `pymongo/asynchronous/server.py:138` | `run_operation()` — cursor path bypass |
| `pymongo/asynchronous/pool.py:344,469,480` | `Connection.command`, `unack_write`, `write_command` |
| `pymongo/asynchronous/bulk.py:244,329,444` | `_Bulk.write_command`, `unack_write`, `_execute_batch` |
| `pymongo/asynchronous/client_bulk.py:229,320` | `_ClientBulk.write_command`, `unack_write` |
| `pymongo/message.py:394,695,1622` | `_op_msg`, `_BulkWriteContext.batch_command`, `_Query.use_command` |
| `pymongo/common.py` | `MIN_SUPPORTED_WIRE_VERSION = 8` |


@ -189,15 +189,10 @@ class AsyncCommandCursor(_AsyncCursorBase[_DocumentType]):
if isinstance(response, PinnedResponse):
if not self._sock_mgr:
self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type]
if response.from_command:
cursor = response.docs[0]["cursor"]
documents = cursor["nextBatch"]
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
self._id = cursor["id"]
else:
documents = response.docs
assert isinstance(response.data, _OpReply)
self._id = response.data.cursor_id
cursor = response.docs[0]["cursor"]
documents = cursor["nextBatch"]
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
self._id = cursor["id"]
if self._id == 0:
await self.close()


@ -1020,29 +1020,23 @@ class AsyncCursor(_AsyncCursorBase[_DocumentType]):
cmd_name = operation.name
docs = response.docs
if response.from_command:
if cmd_name != "explain":
cursor = docs[0]["cursor"]
self._id = cursor["id"]
if cmd_name == "find":
documents = cursor["firstBatch"]
# Update the namespace used for future getMore commands.
ns = cursor.get("ns")
if ns:
self._dbname, self._collname = ns.split(".", 1)
else:
documents = cursor["nextBatch"]
self._data = deque(documents)
self._retrieved += len(documents)
if cmd_name != "explain":
cursor = docs[0]["cursor"]
self._id = cursor["id"]
if cmd_name == "find":
documents = cursor["firstBatch"]
# Update the namespace used for future getMore commands.
ns = cursor.get("ns")
if ns:
self._dbname, self._collname = ns.split(".", 1)
else:
self._id = 0
self._data = deque(docs)
self._retrieved += len(docs)
documents = cursor["nextBatch"]
self._data = deque(documents)
self._retrieved += len(documents)
else:
assert isinstance(response.data, _OpReply)
self._id = response.data.cursor_id
self._id = 0
self._data = deque(docs)
self._retrieved += response.data.number_returned
self._retrieved += len(docs)
if self._id == 0:
# Don't wait for garbage collection to call __del__, return the


@ -37,7 +37,7 @@ from pymongo.logger import (
_debug_log,
_SDAMStatusMessage,
)
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
from pymongo.message import _convert_exception, _GetMore, _Query
from pymongo.response import PinnedResponse, Response
if TYPE_CHECKING:
@ -161,13 +161,13 @@ class Server:
publish = listeners.enabled_for_commands
start = datetime.now()
use_cmd = operation.use_command(conn)
operation.use_command(conn)
more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come
cmd, dbn = await self.operation_to_command(operation, conn, use_cmd)
cmd, dbn = await self.operation_to_command(operation, conn, True)
if more_to_come:
request_id = 0
else:
message = operation.get_message(read_preference, conn, use_cmd)
message = operation.get_message(read_preference, conn, True)
request_id, data, max_doc_size = self._split_message(message)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
@ -208,23 +208,16 @@ class Server:
reply = await conn.receive_message(request_id)
# Unpack and check for command errors.
if use_cmd:
user_fields = _CURSOR_DOC_FIELDS
legacy_response = False
else:
user_fields = None
legacy_response = True
docs = unpack_res(
reply,
operation.cursor_id,
operation.codec_options,
legacy_response=legacy_response,
user_fields=user_fields,
legacy_response=False,
user_fields=_CURSOR_DOC_FIELDS,
)
if use_cmd:
first = docs[0]
await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
first = docs[0]
await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
except Exception as exc:
duration = datetime.now() - start
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@ -263,18 +256,8 @@ class Server:
)
raise
duration = datetime.now() - start
# Must publish in find / getMore / explain command response
# format.
if use_cmd:
res = docs[0]
elif operation.name == "explain":
res = docs[0] if docs else {}
else:
res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr]
if operation.name == "find":
res["cursor"]["firstBatch"] = docs
else:
res["cursor"]["nextBatch"] = docs
# Must publish in find / getMore / explain command response format.
res = docs[0]
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
@ -308,21 +291,14 @@ class Server:
# Decrypt response.
client = operation.client # type: ignore[assignment]
if client and client._encrypter:
if use_cmd:
decrypted = await client._encrypter.decrypt(reply.raw_command_response())
docs = _decode_all_selective(decrypted, operation.codec_options, user_fields)
decrypted = await client._encrypter.decrypt(reply.raw_command_response())
docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS)
response: Response
if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type]
conn.pin_cursor()
if isinstance(reply, _OpMsg):
# In OP_MSG, the server keeps sending only if the
# more_to_come flag is set.
more_to_come = reply.more_to_come
else:
# In OP_REPLY, the server keeps sending until cursor_id is 0.
more_to_come = bool(operation.exhaust and reply.cursor_id)
more_to_come = reply.more_to_come
if operation.conn_mgr:
operation.conn_mgr.update_exhaust(more_to_come)
response = PinnedResponse(
@ -331,7 +307,7 @@ class Server:
conn=conn,
duration=duration,
request_id=request_id,
from_command=use_cmd,
from_command=True,
docs=docs,
more_to_come=more_to_come,
)
@ -341,7 +317,7 @@ class Server:
address=self._description.address,
duration=duration,
request_id=request_id,
from_command=use_cmd,
from_command=True,
docs=docs,
)


@ -189,15 +189,10 @@ class CommandCursor(_CursorBase[_DocumentType]):
if isinstance(response, PinnedResponse):
if not self._sock_mgr:
self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type]
if response.from_command:
cursor = response.docs[0]["cursor"]
documents = cursor["nextBatch"]
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
self._id = cursor["id"]
else:
documents = response.docs
assert isinstance(response.data, _OpReply)
self._id = response.data.cursor_id
cursor = response.docs[0]["cursor"]
documents = cursor["nextBatch"]
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
self._id = cursor["id"]
if self._id == 0:
self.close()


@ -1018,29 +1018,23 @@ class Cursor(_CursorBase[_DocumentType]):
cmd_name = operation.name
docs = response.docs
if response.from_command:
if cmd_name != "explain":
cursor = docs[0]["cursor"]
self._id = cursor["id"]
if cmd_name == "find":
documents = cursor["firstBatch"]
# Update the namespace used for future getMore commands.
ns = cursor.get("ns")
if ns:
self._dbname, self._collname = ns.split(".", 1)
else:
documents = cursor["nextBatch"]
self._data = deque(documents)
self._retrieved += len(documents)
if cmd_name != "explain":
cursor = docs[0]["cursor"]
self._id = cursor["id"]
if cmd_name == "find":
documents = cursor["firstBatch"]
# Update the namespace used for future getMore commands.
ns = cursor.get("ns")
if ns:
self._dbname, self._collname = ns.split(".", 1)
else:
self._id = 0
self._data = deque(docs)
self._retrieved += len(docs)
documents = cursor["nextBatch"]
self._data = deque(documents)
self._retrieved += len(documents)
else:
assert isinstance(response.data, _OpReply)
self._id = response.data.cursor_id
self._id = 0
self._data = deque(docs)
self._retrieved += response.data.number_returned
self._retrieved += len(docs)
if self._id == 0:
# Don't wait for garbage collection to call __del__, return the


@ -36,7 +36,7 @@ from pymongo.logger import (
_debug_log,
_SDAMStatusMessage,
)
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
from pymongo.message import _convert_exception, _GetMore, _Query
from pymongo.response import PinnedResponse, Response
from pymongo.synchronous.helpers import _handle_reauth
@ -161,13 +161,13 @@ class Server:
publish = listeners.enabled_for_commands
start = datetime.now()
use_cmd = operation.use_command(conn)
operation.use_command(conn)
more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come
cmd, dbn = self.operation_to_command(operation, conn, use_cmd)
cmd, dbn = self.operation_to_command(operation, conn, True)
if more_to_come:
request_id = 0
else:
message = operation.get_message(read_preference, conn, use_cmd)
message = operation.get_message(read_preference, conn, True)
request_id, data, max_doc_size = self._split_message(message)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
@ -208,23 +208,16 @@ class Server:
reply = conn.receive_message(request_id)
# Unpack and check for command errors.
if use_cmd:
user_fields = _CURSOR_DOC_FIELDS
legacy_response = False
else:
user_fields = None
legacy_response = True
docs = unpack_res(
reply,
operation.cursor_id,
operation.codec_options,
legacy_response=legacy_response,
user_fields=user_fields,
legacy_response=False,
user_fields=_CURSOR_DOC_FIELDS,
)
if use_cmd:
first = docs[0]
operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
first = docs[0]
operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type]
_check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type]
except Exception as exc:
duration = datetime.now() - start
if isinstance(exc, (NotPrimaryError, OperationFailure)):
@ -263,18 +256,8 @@ class Server:
)
raise
duration = datetime.now() - start
# Must publish in find / getMore / explain command response
# format.
if use_cmd:
res = docs[0]
elif operation.name == "explain":
res = docs[0] if docs else {}
else:
res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr]
if operation.name == "find":
res["cursor"]["firstBatch"] = docs
else:
res["cursor"]["nextBatch"] = docs
# Must publish in find / getMore / explain command response format.
res = docs[0]
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
@ -308,21 +291,14 @@ class Server:
# Decrypt response.
client = operation.client # type: ignore[assignment]
if client and client._encrypter:
if use_cmd:
decrypted = client._encrypter.decrypt(reply.raw_command_response())
docs = _decode_all_selective(decrypted, operation.codec_options, user_fields)
decrypted = client._encrypter.decrypt(reply.raw_command_response())
docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS)
response: Response
if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type]
conn.pin_cursor()
if isinstance(reply, _OpMsg):
# In OP_MSG, the server keeps sending only if the
# more_to_come flag is set.
more_to_come = reply.more_to_come
else:
# In OP_REPLY, the server keeps sending until cursor_id is 0.
more_to_come = bool(operation.exhaust and reply.cursor_id)
more_to_come = reply.more_to_come
if operation.conn_mgr:
operation.conn_mgr.update_exhaust(more_to_come)
response = PinnedResponse(
@ -331,7 +307,7 @@ class Server:
conn=conn,
duration=duration,
request_id=request_id,
from_command=use_cmd,
from_command=True,
docs=docs,
more_to_come=more_to_come,
)
@ -341,7 +317,7 @@ class Server:
address=self._description.address,
duration=duration,
request_id=request_id,
from_command=use_cmd,
from_command=True,
docs=docs,
)