PYTHON-4925 Fix test bugs in $$matchAsDocument and $$matchAsRoot (#1988)

Fixes a bug where the driverConnectionId field was missing from "server heartbeat failed" log messages.
Avoids sending "upsert": False since various client.bulkWrite spec tests assume this field is only sent when it's True.
This commit is contained in:
Shane Harvey 2024-11-05 12:19:51 -08:00 committed by GitHub
parent 466d0a188f
commit 0733c4da44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 609 additions and 65 deletions

View File

@ -140,8 +140,8 @@ class _AsyncBulk:
self,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
upsert: bool = False,
multi: bool,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
hint: Union[str, dict[str, Any], None] = None,
@ -149,9 +149,9 @@ class _AsyncBulk:
) -> None:
"""Create an update document and add it to the list of ops."""
validate_ok_for_update(update)
cmd: dict[str, Any] = dict( # noqa: C406
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
)
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
@ -173,14 +173,16 @@ class _AsyncBulk:
self,
selector: Mapping[str, Any],
replacement: Mapping[str, Any],
upsert: bool = False,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create a replace document and add it to the list of ops."""
validate_ok_for_replace(replacement)
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
cmd: dict[str, Any] = {"q": selector, "u": replacement}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
@ -200,7 +202,7 @@ class _AsyncBulk:
hint: Union[str, dict[str, Any], None] = None,
) -> None:
"""Create a delete document and add it to the list of ops."""
cmd = {"q": selector, "limit": limit}
cmd: dict[str, Any] = {"q": selector, "limit": limit}
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation

View File

@ -106,20 +106,13 @@ class _AsyncClientBulk:
self.bypass_doc_val = bypass_document_validation
self.comment = comment
self.verbose_results = verbose_results
self.ops: list[tuple[str, Mapping[str, Any]]] = []
self.namespaces: list[str] = []
self.idx_offset: int = 0
self.total_ops: int = 0
self.executed = False
self.uses_upsert = False
self.uses_collation = False
self.uses_array_filters = False
self.uses_hint_update = False
self.uses_hint_delete = False
self.uses_sort = False
self.is_retryable = self.client.options.retry_writes
self.retrying = False
self.started_retryable_write = False
@ -144,7 +137,7 @@ class _AsyncClientBulk:
namespace: str,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
multi: bool,
upsert: Optional[bool] = None,
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
@ -160,19 +153,16 @@ class _AsyncClientBulk:
"multi": multi,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if array_filters is not None:
self.uses_array_filters = True
cmd["arrayFilters"] = array_filters
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
if multi:
# A bulk_write containing an update_many is not retryable.
@ -200,16 +190,13 @@ class _AsyncClientBulk:
"multi": False,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
self.ops.append(("replace", cmd))
self.namespaces.append(namespace)
@ -226,7 +213,6 @@ class _AsyncClientBulk:
"""Create a delete document and add it to the list of ops."""
cmd = {"delete": -1, "filter": selector, "multi": multi}
if hint is not None:
self.uses_hint_delete = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True

View File

@ -149,6 +149,7 @@ class Monitor(MonitorBase):
self._listeners = self._settings._pool_options._event_listeners
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
self._cancel_context: Optional[_CancellationContext] = None
self._conn_id: Optional[int] = None
self._rtt_monitor = _RttMonitor(
topology,
topology_settings,
@ -243,6 +244,7 @@ class Monitor(MonitorBase):
Returns a ServerDescription.
"""
self._conn_id = None
start = time.monotonic()
try:
try:
@ -272,6 +274,7 @@ class Monitor(MonitorBase):
awaited=awaited,
durationMS=duration * 1000,
failure=error,
driverConnectionId=self._conn_id,
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
)
await self._reset_connection()
@ -314,6 +317,8 @@ class Monitor(MonitorBase):
)
self._cancel_context = conn.cancel_context
# Record the connection id so we can later attach it to the failed log message.
self._conn_id = conn.id
response, round_trip_time = await self._check_with_socket(conn)
if not response.awaitable:
self._rtt_monitor.add_sample(round_trip_time)

View File

@ -332,7 +332,7 @@ class ReplaceOne(Generic[_DocumentType]):
self,
filter: Mapping[str, Any],
replacement: Union[_DocumentType, RawBSONDocument],
upsert: bool = False,
upsert: Optional[bool] = None,
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
namespace: Optional[str] = None,
@ -693,7 +693,7 @@ class UpdateMany(_UpdateOp):
self._filter,
self._doc,
True,
bool(self._upsert),
self._upsert,
collation=validate_collation_or_none(self._collation),
array_filters=self._array_filters,
hint=self._hint,

View File

@ -140,8 +140,8 @@ class _Bulk:
self,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
upsert: bool = False,
multi: bool,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
hint: Union[str, dict[str, Any], None] = None,
@ -149,9 +149,9 @@ class _Bulk:
) -> None:
"""Create an update document and add it to the list of ops."""
validate_ok_for_update(update)
cmd: dict[str, Any] = dict( # noqa: C406
[("q", selector), ("u", update), ("multi", multi), ("upsert", upsert)]
)
cmd: dict[str, Any] = {"q": selector, "u": update, "multi": multi}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
@ -173,14 +173,16 @@ class _Bulk:
self,
selector: Mapping[str, Any],
replacement: Mapping[str, Any],
upsert: bool = False,
upsert: Optional[bool],
collation: Optional[Mapping[str, Any]] = None,
hint: Union[str, dict[str, Any], None] = None,
sort: Optional[Mapping[str, Any]] = None,
) -> None:
"""Create a replace document and add it to the list of ops."""
validate_ok_for_replace(replacement)
cmd = {"q": selector, "u": replacement, "multi": False, "upsert": upsert}
cmd: dict[str, Any] = {"q": selector, "u": replacement}
if upsert is not None:
cmd["upsert"] = upsert
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
@ -200,7 +202,7 @@ class _Bulk:
hint: Union[str, dict[str, Any], None] = None,
) -> None:
"""Create a delete document and add it to the list of ops."""
cmd = {"q": selector, "limit": limit}
cmd: dict[str, Any] = {"q": selector, "limit": limit}
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation

View File

@ -106,20 +106,13 @@ class _ClientBulk:
self.bypass_doc_val = bypass_document_validation
self.comment = comment
self.verbose_results = verbose_results
self.ops: list[tuple[str, Mapping[str, Any]]] = []
self.namespaces: list[str] = []
self.idx_offset: int = 0
self.total_ops: int = 0
self.executed = False
self.uses_upsert = False
self.uses_collation = False
self.uses_array_filters = False
self.uses_hint_update = False
self.uses_hint_delete = False
self.uses_sort = False
self.is_retryable = self.client.options.retry_writes
self.retrying = False
self.started_retryable_write = False
@ -144,7 +137,7 @@ class _ClientBulk:
namespace: str,
selector: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
multi: bool = False,
multi: bool,
upsert: Optional[bool] = None,
collation: Optional[Mapping[str, Any]] = None,
array_filters: Optional[list[Mapping[str, Any]]] = None,
@ -160,19 +153,16 @@ class _ClientBulk:
"multi": multi,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if array_filters is not None:
self.uses_array_filters = True
cmd["arrayFilters"] = array_filters
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
if multi:
# A bulk_write containing an update_many is not retryable.
@ -200,16 +190,13 @@ class _ClientBulk:
"multi": False,
}
if upsert is not None:
self.uses_upsert = True
cmd["upsert"] = upsert
if hint is not None:
self.uses_hint_update = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True
cmd["collation"] = collation
if sort is not None:
self.uses_sort = True
cmd["sort"] = sort
self.ops.append(("replace", cmd))
self.namespaces.append(namespace)
@ -226,7 +213,6 @@ class _ClientBulk:
"""Create a delete document and add it to the list of ops."""
cmd = {"delete": -1, "filter": selector, "multi": multi}
if hint is not None:
self.uses_hint_delete = True
cmd["hint"] = hint
if collation is not None:
self.uses_collation = True

View File

@ -149,6 +149,7 @@ class Monitor(MonitorBase):
self._listeners = self._settings._pool_options._event_listeners
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
self._cancel_context: Optional[_CancellationContext] = None
self._conn_id: Optional[int] = None
self._rtt_monitor = _RttMonitor(
topology,
topology_settings,
@ -243,6 +244,7 @@ class Monitor(MonitorBase):
Returns a ServerDescription.
"""
self._conn_id = None
start = time.monotonic()
try:
try:
@ -272,6 +274,7 @@ class Monitor(MonitorBase):
awaited=awaited,
durationMS=duration * 1000,
failure=error,
driverConnectionId=self._conn_id,
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
)
self._reset_connection()
@ -314,6 +317,8 @@ class Monitor(MonitorBase):
)
self._cancel_context = conn.cancel_context
# Record the connection id so we can later attach it to the failed log message.
self._conn_id = conn.id
response, round_trip_time = self._check_with_socket(conn)
if not response.awaitable:
self._rtt_monitor.add_sample(round_trip_time)

View File

@ -1328,8 +1328,8 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
if log.module == "ocsp_support":
continue
data = json_util.loads(log.getMessage())
client = data.pop("clientId") if "clientId" in data else data.pop("topologyId")
client_to_log[client].append(
client_id = data.get("clientId", data.get("topologyId"))
client_to_log[client_id].append(
{
"level": log.levelname.lower(),
"component": log.name.replace("pymongo.", "", 1),

View File

@ -357,6 +357,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
@ -398,6 +399,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
@ -439,6 +441,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
@ -589,6 +592,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]

View File

@ -324,6 +324,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
@ -475,6 +476,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]

View File

@ -339,6 +339,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
@ -500,6 +501,7 @@
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]

View File

@ -9,9 +9,7 @@
"tests": [
{
"description": "foo",
"operations": [
]
"operations": []
}
]
}

View File

@ -0,0 +1,205 @@
{
"description": "operator-matchAsDocument",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client0"
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "test",
"documents": [
{
"_id": 1,
"json": "{ \"x\": 1, \"y\": 2 }"
},
{
"_id": 2,
"json": "1"
},
{
"_id": 3,
"json": "[ \"foo\" ]"
},
{
"_id": 4,
"json": "{ \"x\" }"
}
]
}
],
"tests": [
{
"description": "matchAsDocument with non-matching filter",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"json": {
"$$matchAsDocument": {
"x": 1,
"y": "two"
}
}
}
]
}
]
},
{
"description": "matchAsDocument evaluates special operators",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"json": {
"$$matchAsDocument": {
"x": 1,
"y": {
"$$exists": false
}
}
}
}
]
}
]
},
{
"description": "matchAsDocument does not permit extra fields",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"json": {
"$$matchAsDocument": {
"x": 1
}
}
}
]
}
]
},
{
"description": "matchAsDocument expects JSON object but given scalar",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 2
},
"limit": 1
},
"expectResult": [
{
"_id": 2,
"json": {
"$$matchAsDocument": {
"$$matchAsRoot": {}
}
}
}
]
}
]
},
{
"description": "matchAsDocument expects JSON object but given array",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 3
},
"limit": 1
},
"expectResult": [
{
"_id": 3,
"json": {
"$$matchAsDocument": {
"$$matchAsRoot": {}
}
}
}
]
}
]
},
{
"description": "matchAsDocument fails to decode Extended JSON",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 4
},
"limit": 1
},
"expectResult": [
{
"_id": 4,
"json": {
"$$matchAsDocument": {
"$$matchAsRoot": {}
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,67 @@
{
"description": "operator-matchAsRoot",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client0"
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "test",
"documents": [
{
"_id": 1,
"x": {
"y": 2,
"z": 3
}
}
]
}
],
"tests": [
{
"description": "matchAsRoot with nested document does not match",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"x": {
"$$matchAsRoot": {
"y": 3
}
}
}
]
}
]
}
]
}

View File

@ -1,5 +1,5 @@
{
"description": "matches-lte-operator",
"description": "operator-lte",
"schemaVersion": "1.9",
"createEntities": [
{

View File

@ -0,0 +1,124 @@
{
"description": "operator-matchAsDocument",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client0"
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "test",
"documents": [
{
"_id": 1,
"json": "{ \"x\": 1, \"y\": 2.0 }"
},
{
"_id": 2,
"json": "{ \"x\": { \"$oid\": \"57e193d7a9cc81b4027498b5\" } }"
}
]
}
],
"tests": [
{
"description": "matchAsDocument performs flexible numeric comparisons",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"json": {
"$$matchAsDocument": {
"x": 1,
"y": 2
}
}
}
]
}
]
},
{
"description": "matchAsDocument evaluates special operators",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"json": {
"$$matchAsDocument": {
"x": 1,
"y": {
"$$exists": true
}
}
}
}
]
}
]
},
{
"description": "matchAsDocument decodes Extended JSON",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 2
},
"limit": 1
},
"expectResult": [
{
"_id": 2,
"json": {
"$$matchAsDocument": {
"x": {
"$$type": "objectId"
}
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,151 @@
{
"description": "operator-matchAsRoot",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client0"
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "test"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "test",
"documents": [
{
"_id": 1,
"x": {
"y": 2,
"z": 3
}
},
{
"_id": 2,
"json": "{ \"x\": 1, \"y\": 2 }"
}
]
}
],
"tests": [
{
"description": "matchAsRoot with nested document",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"x": {
"$$matchAsRoot": {
"y": 2
}
}
}
]
}
]
},
{
"description": "matchAsRoot performs flexible numeric comparisons",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"x": {
"$$matchAsRoot": {
"y": 2
}
}
}
]
}
]
},
{
"description": "matchAsRoot evaluates special operators",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"limit": 1
},
"expectResult": [
{
"_id": 1,
"x": {
"$$matchAsRoot": {
"y": 2,
"z": {
"$$exists": true
}
}
}
}
]
}
]
},
{
"description": "matchAsRoot with matchAsDocument",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 2
},
"limit": 1
},
"expectResult": [
{
"_id": 2,
"json": {
"$$matchAsDocument": {
"$$matchAsRoot": {
"x": 1
}
}
}
}
]
}
]
}
]
}

View File

@ -1314,8 +1314,8 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
if log.module == "ocsp_support":
continue
data = json_util.loads(log.getMessage())
client = data.pop("clientId") if "clientId" in data else data.pop("topologyId")
client_to_log[client].append(
client_id = data.get("clientId", data.get("topologyId"))
client_to_log[client_id].append(
{
"level": log.levelname.lower(),
"component": log.name.replace("pymongo.", "", 1),

View File

@ -433,10 +433,12 @@ class MatchEvaluatorUtil:
self.test.assertLessEqual(actual[key_to_compare], spec)
def _operation_matchAsDocument(self, spec, actual, key_to_compare):
self._match_document(spec, json_util.loads(actual[key_to_compare]), False)
self._match_document(spec, json_util.loads(actual[key_to_compare]), False, test=True)
def _operation_matchAsRoot(self, spec, actual, key_to_compare):
self._match_document(spec, actual, True)
if key_to_compare:
actual = actual[key_to_compare]
self._match_document(spec, actual, True, test=True)
def _evaluate_special_operation(self, opname, spec, actual, key_to_compare):
method_name = "_operation_{}".format(opname.strip("$"))
@ -489,7 +491,7 @@ class MatchEvaluatorUtil:
def _match_document(self, expectation, actual, is_root, test=False):
if self._evaluate_if_special_operation(expectation, actual):
return
return True
self.test.assertIsInstance(actual, abc.Mapping)
for key, value in expectation.items():
@ -521,25 +523,26 @@ class MatchEvaluatorUtil:
self.test.assertIsInstance(actual, abc.MutableSequence)
for e, a in zip(expectation, actual):
if isinstance(e, abc.Mapping):
self._match_document(e, a, is_root=not in_recursive_call, test=test)
res = self._match_document(e, a, is_root=not in_recursive_call, test=test)
else:
self.match_result(e, a, in_recursive_call=True, test=test)
return None
res = self.match_result(e, a, in_recursive_call=True, test=test)
if not res:
return False
return True
# account for flexible numerics in element-wise comparison
if isinstance(expectation, int) or isinstance(expectation, float):
if isinstance(expectation, (int, float)):
if test:
self.test.assertEqual(expectation, actual)
else:
return expectation == actual
return None
else:
if test:
self.test.assertIsInstance(actual, type(expectation))
self.test.assertEqual(expectation, actual)
else:
return isinstance(actual, type(expectation)) and expectation == actual
return None
return True
def match_server_description(self, actual: ServerDescription, spec: dict) -> None:
for field, expected in spec.items():

View File

@ -20,6 +20,7 @@ import contextlib
import copy
import functools
import os
import random
import re
import shutil
import sys
@ -309,6 +310,7 @@ class MockConnection:
def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)
def close_conn(self, reason):
pass