From 2b667df14f55f669d2b4cb4126f4501b4c30420e Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 26 Feb 2025 11:16:44 -0800 Subject: [PATCH 1/9] PYTHON-5120 Reduce configureFailPoint duplication in tests (#2131) --- test/__init__.py | 17 +++++++++++------ test/asynchronous/__init__.py | 17 +++++++++++------ test/asynchronous/test_connection_monitoring.py | 7 +------ test/asynchronous/test_transactions.py | 7 +------ test/asynchronous/unified_format.py | 8 ++------ test/asynchronous/utils_spec_runner.py | 9 ++------- test/test_connection_monitoring.py | 7 +------ test/test_transactions.py | 7 +------ test/unified_format.py | 8 ++------ test/utils_spec_runner.py | 9 ++------- 10 files changed, 34 insertions(+), 62 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index 6eda00bde..307780271 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -933,17 +933,22 @@ class PyMongoTestCase(unittest.TestCase): def assertEqualReply(self, expected, actual, msg=None): self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg) + @staticmethod + def configure_fail_point(client, command_args, off=False): + cmd = {"configureFailPoint": "failCommand"} + cmd.update(command_args) + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + client.admin.command(cmd) + @contextmanager def fail_point(self, command_args): - cmd_on = SON([("configureFailPoint", "failCommand")]) - cmd_on.update(command_args) - client_context.client.admin.command(cmd_on) + self.configure_fail_point(client_context.client, command_args) try: yield finally: - client_context.client.admin.command( - "configureFailPoint", cmd_on["configureFailPoint"], mode="off" - ) + self.configure_fail_point(client_context.client, command_args, off=True) @contextmanager def fork( diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index b3b0ca93e..f03fcf4ee 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -935,17 +935,22 @@ class AsyncPyMongoTestCase(unittest.TestCase): def assertEqualReply(self, expected, actual, msg=None): self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg) + @staticmethod + async def configure_fail_point(client, command_args, off=False): + cmd = {"configureFailPoint": "failCommand"} + cmd.update(command_args) + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + await client.admin.command(cmd) + @asynccontextmanager async def fail_point(self, command_args): - cmd_on = SON([("configureFailPoint", "failCommand")]) - cmd_on.update(command_args) - await async_client_context.client.admin.command(cmd_on) + await self.configure_fail_point(async_client_context.client, command_args) try: yield finally: - await async_client_context.client.admin.command( - "configureFailPoint", cmd_on["configureFailPoint"], mode="off" - ) + await self.configure_fail_point(async_client_context.client, command_args, off=True) @contextmanager def fork( diff --git a/test/asynchronous/test_connection_monitoring.py b/test/asynchronous/test_connection_monitoring.py index a68b2a90c..cdf4887ba 100644 --- a/test/asynchronous/test_connection_monitoring.py +++ b/test/asynchronous/test_connection_monitoring.py @@ -211,15 +211,10 @@ class AsyncTestCMAP(AsyncIntegrationTest): self.check_object(actual, expected) self.assertIn(message, str(actual)) - async def _set_fail_point(self, client, command_args): - cmd = SON([("configureFailPoint", "failCommand")]) - cmd.update(command_args) - await client.admin.command(cmd) - async def set_fail_point(self, command_args): if not async_client_context.supports_failCommand_fail_point: self.skipTest("failCommand fail point must be supported") - await self._set_fail_point(self.client, command_args) + await self.configure_fail_point(self.client, command_args) async def run_scenario(self, scenario_def, test): """Run a CMAP spec test.""" diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index d11d0a977..5f75746a4 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -410,15 +410,10 @@ class TestTransactionsConvenientAPI(AsyncTransactionsBase): for address in async_client_context.mongoses: self.mongos_clients.append(await self.async_single_client("{}:{}".format(*address))) - async def _set_fail_point(self, client, command_args): - cmd = {"configureFailPoint": "failCommand"} - cmd.update(command_args) - await client.admin.command(cmd) - async def set_fail_point(self, command_args): clients = self.mongos_clients if self.mongos_clients else [self.client] for client in clients: - await self._set_fail_point(client, command_args) + await self.configure_fail_point(client, command_args) @async_client_context.require_transactions async def test_callback_raises_custom_error(self): diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index 695f58ee2..c315e8694 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -1008,12 +1008,8 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest): if not async_client_context.test_commands_enabled: self.skipTest("Test commands must be enabled") - cmd_on = SON([("configureFailPoint", "failCommand")]) - cmd_on.update(command_args) - await client.admin.command(cmd_on) - self.addAsyncCleanup( - client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off" - ) + await self.configure_fail_point(client, command_args) + self.addAsyncCleanup(self.configure_fail_point, client, command_args, off=True) async def _testOperation_failPoint(self, spec): await self.__set_fail_point( diff --git a/test/asynchronous/utils_spec_runner.py b/test/asynchronous/utils_spec_runner.py index 11d88850f..7530ba36a 100644 --- a/test/asynchronous/utils_spec_runner.py +++ b/test/asynchronous/utils_spec_runner.py @@ -264,15 +264,10 @@ class AsyncSpecRunner(AsyncIntegrationTest): async def asyncTearDown(self) -> None: self.knobs.disable() - async def _set_fail_point(self, client, command_args): - cmd = SON([("configureFailPoint", "failCommand")]) - cmd.update(command_args) - await client.admin.command(cmd) - async def set_fail_point(self, command_args): clients = self.mongos_clients if self.mongos_clients else [self.client] for client in clients: - await self._set_fail_point(client, command_args) + await self.configure_fail_point(client, command_args) async def targeted_fail_point(self, session, fail_point): """Run the targetedFailPoint test operation. @@ -281,7 +276,7 @@ class AsyncSpecRunner(AsyncIntegrationTest): """ clients = {c.address: c for c in self.mongos_clients} client = clients[session._pinned_address] - await self._set_fail_point(client, fail_point) + await self.configure_fail_point(client, fail_point) self.addAsyncCleanup(self.set_fail_point, {"mode": "off"}) def assert_session_pinned(self, session): diff --git a/test/test_connection_monitoring.py b/test/test_connection_monitoring.py index 810d44093..3987f2b68 100644 --- a/test/test_connection_monitoring.py +++ b/test/test_connection_monitoring.py @@ -211,15 +211,10 @@ class TestCMAP(IntegrationTest): self.check_object(actual, expected) self.assertIn(message, str(actual)) - def _set_fail_point(self, client, command_args): - cmd = SON([("configureFailPoint", "failCommand")]) - cmd.update(command_args) - client.admin.command(cmd) - def set_fail_point(self, command_args): if not client_context.supports_failCommand_fail_point: self.skipTest("failCommand fail point must be supported") - self._set_fail_point(self.client, command_args) + self.configure_fail_point(self.client, command_args) def run_scenario(self, scenario_def, test): """Run a CMAP spec test.""" diff --git a/test/test_transactions.py b/test/test_transactions.py index 949b88e60..7a8dcd0f0 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -402,15 +402,10 @@ class TestTransactionsConvenientAPI(TransactionsBase): for address in client_context.mongoses: self.mongos_clients.append(self.single_client("{}:{}".format(*address))) - def _set_fail_point(self, client, command_args): - cmd = {"configureFailPoint": "failCommand"} - cmd.update(command_args) - client.admin.command(cmd) - def set_fail_point(self, command_args): clients = self.mongos_clients if self.mongos_clients else [self.client] for client in clients: - self._set_fail_point(client, command_args) + self.configure_fail_point(client, command_args) @client_context.require_transactions def test_callback_raises_custom_error(self): diff --git a/test/unified_format.py b/test/unified_format.py index 73dee10dd..d5698f5a7 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -999,12 +999,8 @@ class UnifiedSpecTestMixinV1(IntegrationTest): if not client_context.test_commands_enabled: self.skipTest("Test commands must be enabled") - cmd_on = SON([("configureFailPoint", "failCommand")]) - cmd_on.update(command_args) - client.admin.command(cmd_on) - self.addCleanup( - client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off" - ) + self.configure_fail_point(client, command_args) + self.addCleanup(self.configure_fail_point, client, command_args, off=True) def _testOperation_failPoint(self, spec): self.__set_fail_point( diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index 98949431d..ac4031e82 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -264,15 +264,10 @@ class SpecRunner(IntegrationTest): def tearDown(self) -> None: self.knobs.disable() - def _set_fail_point(self, client, command_args): - cmd = SON([("configureFailPoint", "failCommand")]) - cmd.update(command_args) - client.admin.command(cmd) - def set_fail_point(self, command_args): clients = self.mongos_clients if self.mongos_clients else [self.client] for client in clients: - self._set_fail_point(client, command_args) + self.configure_fail_point(client, command_args) def targeted_fail_point(self, session, fail_point): """Run the targetedFailPoint test operation. @@ -281,7 +276,7 @@ class SpecRunner(IntegrationTest): """ clients = {c.address: c for c in self.mongos_clients} client = clients[session._pinned_address] - self._set_fail_point(client, fail_point) + self.configure_fail_point(client, fail_point) self.addCleanup(self.set_fail_point, {"mode": "off"}) def assert_session_pinned(self, session): From 324ed1730ff0c7146b8b49e08e27cd4ae74d8b07 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 26 Feb 2025 13:21:07 -0600 Subject: [PATCH 2/9] DRIVERS-3119 Add options to provide certificate and CA files (#2159) --- .../scripts/bootstrap-mongo-orchestration.sh | 37 +++++-------------- .evergreen/scripts/configure-env.sh | 4 +- .evergreen/scripts/prepare-resources.sh | 24 ------------ .evergreen/scripts/setup-system.sh | 29 ++++++++++++++- 4 files changed, 41 insertions(+), 53 deletions(-) delete mode 100755 .evergreen/scripts/prepare-resources.sh diff --git a/.evergreen/scripts/bootstrap-mongo-orchestration.sh b/.evergreen/scripts/bootstrap-mongo-orchestration.sh index 8f7d9d0ae..af38edd09 100755 --- a/.evergreen/scripts/bootstrap-mongo-orchestration.sh +++ b/.evergreen/scripts/bootstrap-mongo-orchestration.sh @@ -2,38 +2,21 @@ set -eu - -# Enable core dumps if enabled on the machine -# Copied from https://github.com/mongodb/mongo/blob/master/etc/evergreen.yml -if [ -f /proc/self/coredump_filter ]; then - # Set the shell process (and its children processes) to dump ELF headers (bit 4), - # anonymous shared mappings (bit 1), and anonymous private mappings (bit 0). - echo 0x13 >/proc/self/coredump_filter - - if [ -f /sbin/sysctl ]; then - # Check that the core pattern is set explicitly on our distro image instead - # of being the OS's default value. This ensures that coredump names are consistent - # across distros and can be picked up by Evergreen. - core_pattern=$(/sbin/sysctl -n "kernel.core_pattern") - if [ "$core_pattern" = "dump_%e.%p.core" ]; then - echo "Enabling coredumps" - ulimit -c unlimited - fi - fi -fi - -if [ "$(uname -s)" = "Darwin" ]; then - core_pattern_mac=$(/usr/sbin/sysctl -n "kern.corefile") - if [ "$core_pattern_mac" = "dump_%N.%P.core" ]; then - echo "Enabling coredumps" - ulimit -c unlimited - fi -fi +HERE=$(dirname ${BASH_SOURCE:-$0}) +HERE="$( cd -- "$HERE" > /dev/null 2>&1 && pwd )" +ROOT=$(dirname "$(dirname $HERE)") if [ -z "${TEST_CRYPT_SHARED:-}" ]; then export SKIP_CRYPT_SHARED=1 fi +# Override the tls files if applicable. +if [ "${SSL:-}" == "ssl" ]; then + export TLS_CERT_KEY_FILE=${ROOT}/test/certificates/client.pem + export TLS_PEM_KEY_FILE=${ROOT}/test/certificates/server.pem + export TLS_CA_FILE=${ROOT}/test/certificates/ca.pem +fi + MONGODB_VERSION=${VERSION:-} \ TOPOLOGY=${TOPOLOGY:-} \ AUTH=${AUTH:-} \ diff --git a/.evergreen/scripts/configure-env.sh b/.evergreen/scripts/configure-env.sh index 551541356..f23af8a81 100755 --- a/.evergreen/scripts/configure-env.sh +++ b/.evergreen/scripts/configure-env.sh @@ -76,7 +76,9 @@ EOT # Write the .env file for drivers-tools. rm -rf $DRIVERS_TOOLS -git clone https://github.com/mongodb-labs/drivers-evergreen-tools.git $DRIVERS_TOOLS +BRANCH=master +ORG=mongodb-labs +git clone --branch $BRANCH https://github.com/$ORG/drivers-evergreen-tools.git $DRIVERS_TOOLS cat < ${DRIVERS_TOOLS}/.env SKIP_LEGACY_SHELL=1 diff --git a/.evergreen/scripts/prepare-resources.sh b/.evergreen/scripts/prepare-resources.sh deleted file mode 100755 index f5285a39d..000000000 --- a/.evergreen/scripts/prepare-resources.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -set -eu - -HERE=$(dirname ${BASH_SOURCE:-$0}) -pushd $HERE -. env.sh - -popd - -# Copy PyMongo's test certificates over driver-evergreen-tools' -cp ${PROJECT_DIRECTORY}/test/certificates/* ${DRIVERS_TOOLS}/.evergreen/x509gen/ - -# Replace MongoOrchestration's client certificate. -cp ${PROJECT_DIRECTORY}/test/certificates/client.pem ${MONGO_ORCHESTRATION_HOME}/lib/client.pem - -if [ -w /etc/hosts ]; then - SUDO="" -else - SUDO="sudo" -fi - -# Add 'server' and 'hostname_not_in_cert' as a hostnames -echo "127.0.0.1 server" | $SUDO tee -a /etc/hosts -echo "127.0.0.1 hostname_not_in_cert" | $SUDO tee -a /etc/hosts diff --git a/.evergreen/scripts/setup-system.sh b/.evergreen/scripts/setup-system.sh index d78d924f6..0ab08ff01 100755 --- a/.evergreen/scripts/setup-system.sh +++ b/.evergreen/scripts/setup-system.sh @@ -7,8 +7,35 @@ pushd "$(dirname "$(dirname $HERE)")" echo "Setting up system..." bash .evergreen/scripts/configure-env.sh source .evergreen/scripts/env.sh -bash .evergreen/scripts/prepare-resources.sh bash $DRIVERS_TOOLS/.evergreen/setup.sh bash .evergreen/scripts/install-dependencies.sh popd + +# Enable core dumps if enabled on the machine +# Copied from https://github.com/mongodb/mongo/blob/master/etc/evergreen.yml +if [ -f /proc/self/coredump_filter ]; then + # Set the shell process (and its children processes) to dump ELF headers (bit 4), + # anonymous shared mappings (bit 1), and anonymous private mappings (bit 0). + echo 0x13 >/proc/self/coredump_filter + + if [ -f /sbin/sysctl ]; then + # Check that the core pattern is set explicitly on our distro image instead + # of being the OS's default value. This ensures that coredump names are consistent + # across distros and can be picked up by Evergreen. + core_pattern=$(/sbin/sysctl -n "kernel.core_pattern") + if [ "$core_pattern" = "dump_%e.%p.core" ]; then + echo "Enabling coredumps" + ulimit -c unlimited + fi + fi +fi + +if [ "$(uname -s)" = "Darwin" ]; then + core_pattern_mac=$(/usr/sbin/sysctl -n "kern.corefile") + if [ "$core_pattern_mac" = "dump_%N.%P.core" ]; then + echo "Enabling coredumps" + ulimit -c unlimited + fi +fi + echo "Setting up system... done." From f5aeac3cccde06ab1166d44f63d5ebae116d554e Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 26 Feb 2025 13:21:46 -0600 Subject: [PATCH 3/9] DRIVERS-3058 Include nsType field in ChangeStreamDocument (#2157) --- .../unified/change-streams-nsType.json | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 test/change_streams/unified/change-streams-nsType.json diff --git a/test/change_streams/unified/change-streams-nsType.json b/test/change_streams/unified/change-streams-nsType.json new file mode 100644 index 000000000..1861c9a5e --- /dev/null +++ b/test/change_streams/unified/change-streams-nsType.json @@ -0,0 +1,145 @@ +{ + "description": "change-streams-nsType", + "schemaVersion": "1.7", + "runOnRequirements": [ + { + "minServerVersion": "8.1.0", + "topologies": [ + "replicaset", + "sharded" + ], + "serverless": "forbid" + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + } + ], + "tests": [ + { + "description": "nsType is present when creating collections", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create", + "nsType": "collection" + } + } + ] + }, + { + "description": "nsType is present when creating timeseries", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo", + "timeseries": { + "timeField": "time", + "metaField": "meta", + "granularity": "minutes" + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create", + "nsType": "timeseries" + } + } + ] + }, + { + "description": "nsType is present when creating views", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo", + "viewOn": "testName" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create", + "nsType": "view" + } + } + ] + } + ] +} From 61feccacfefaf342ad673bf320b68d839bbdf66c Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 26 Feb 2025 13:23:02 -0600 Subject: [PATCH 4/9] DRIVERS-2915 Add ENVIRONMENT auth mechanism property to test URIs (#2160) --- test/connection_string/test/valid-options.json | 7 ++++--- test/connection_string/test/valid-warnings.json | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/test/connection_string/test/valid-options.json b/test/connection_string/test/valid-options.json index 6c86172d0..e094bcf60 100644 --- a/test/connection_string/test/valid-options.json +++ b/test/connection_string/test/valid-options.json @@ -40,7 +40,7 @@ }, { "description": "Colon in a key value pair", - "uri": "mongodb://example.com/?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://test-cluster", + "uri": "mongodb://example.com/?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://test-cluster,ENVIRONMENT:azure", "valid": true, "warning": false, "hosts": [ @@ -53,9 +53,10 @@ "auth": null, "options": { "authmechanismProperties": { - "TOKEN_RESOURCE": "mongodb://test-cluster" + "TOKEN_RESOURCE": "mongodb://test-cluster", + "ENVIRONMENT": "azure" } } } ] -} +} \ No newline at end of file diff --git a/test/connection_string/test/valid-warnings.json b/test/connection_string/test/valid-warnings.json index daf814a75..c46a8311c 100644 --- a/test/connection_string/test/valid-warnings.json +++ b/test/connection_string/test/valid-warnings.json @@ -96,7 +96,7 @@ }, { "description": "Comma in a key value pair causes a warning", - "uri": "mongodb://localhost?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://host1%2Chost2", + "uri": "mongodb://localhost?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://host1%2Chost2,ENVIRONMENT:azure", "valid": true, "warning": true, "hosts": [ @@ -112,4 +112,4 @@ } } ] -} +} \ No newline at end of file From 85ca6f1d9fa71badeeee2b80db7ec89dc4bef0f4 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 26 Feb 2025 13:18:04 -0800 Subject: [PATCH 5/9] PYTHON-4579 Stop gossiping $clusterTime on SDAM connections (#1925) --- pymongo/asynchronous/monitor.py | 16 +++------- pymongo/asynchronous/network.py | 4 +++ pymongo/asynchronous/pool.py | 13 +++++---- pymongo/asynchronous/topology.py | 1 - pymongo/synchronous/monitor.py | 16 +++------- pymongo/synchronous/network.py | 4 +++ pymongo/synchronous/pool.py | 13 +++++---- pymongo/synchronous/topology.py | 1 - test/asynchronous/test_session.py | 42 ++++++++++++++++++++++++--- test/test_discovery_and_monitoring.py | 18 +++++------- test/test_session.py | 38 ++++++++++++++++++++++-- 11 files changed, 112 insertions(+), 54 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 15289af4d..d7f87b718 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -21,11 +21,11 @@ import atexit import logging import time import weakref -from typing import TYPE_CHECKING, Any, Mapping, Optional, cast +from typing import TYPE_CHECKING, Any, Optional from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum -from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled +from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _async_create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage @@ -255,13 +255,7 @@ class Monitor(MonitorBase): self._conn_id = None start = time.monotonic() try: - try: - return await self._check_once() - except (OperationFailure, NotPrimaryError) as exc: - # Update max cluster time even when hello fails. - details = cast(Mapping[str, Any], exc.details) - await self._topology.receive_cluster_time(details.get("$clusterTime")) - raise + return await self._check_once() except ReferenceError: raise except Exception as error: @@ -358,7 +352,6 @@ class Monitor(MonitorBase): Can raise ConnectionFailure or OperationFailure. """ - cluster_time = self._topology.max_cluster_time() start = time.monotonic() if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). @@ -368,13 +361,12 @@ class Monitor(MonitorBase): ): # Initiate streaming hello (MongoDB 4.4+). response = await conn._hello( - cluster_time, self._server_description.topology_version, self._settings.heartbeat_frequency, ) else: # New connection handshake or polling hello (MongoDB <4.4). - response = await conn._hello(cluster_time, None, None) + response = await conn._hello(None, None) duration = _monotonic_duration(start) return response, duration diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index d17aead12..c7a5580ec 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -207,6 +207,10 @@ async def command( ) response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time if client: await client._process_response(response_doc, session) if check: diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 1da695c5c..698558aa5 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -102,7 +102,7 @@ if TYPE_CHECKING: from pymongo.pyopenssl_context import _sslConn from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode - from pymongo.typings import ClusterTime, _Address, _CollationIn + from pymongo.typings import _Address, _CollationIn from pymongo.write_concern import WriteConcern try: @@ -310,6 +310,8 @@ class AsyncConnection: self.connect_rtt = 0.0 self._client_id = pool._client_id self.creation_time = time.monotonic() + # For gossiping $clusterTime from the connection handshake to the client. + self._cluster_time = None def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -374,11 +376,10 @@ class AsyncConnection: return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} async def hello(self) -> Hello: - return await self._hello(None, None, None) + return await self._hello(None, None) async def _hello( self, - cluster_time: Optional[ClusterTime], topology_version: Optional[Any], heartbeat_frequency: Optional[int], ) -> Hello[dict[str, Any]]: @@ -401,9 +402,6 @@ class AsyncConnection: if self.opts.connect_timeout: self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency) - if not performing_handshake and cluster_time is not None: - cmd["$clusterTime"] = cluster_time - creds = self.opts._credentials if creds: if creds.mechanism == "DEFAULT" and creds.username: @@ -1316,6 +1314,9 @@ class Pool: conn.close_conn(ConnectionClosedReason.ERROR) raise + if handler: + await handler.client._topology.receive_cluster_time(conn._cluster_time) + return conn @contextlib.asynccontextmanager diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 19fc76b0d..bb003bbfd 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -501,7 +501,6 @@ class Topology: self._description = new_td await self._update_servers() - self._receive_cluster_time_no_lock(server_description.cluster_time) if self._publish_tp and not suppress_event: assert self._events is not None diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 802ba4742..c39a57c39 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -21,11 +21,11 @@ import atexit import logging import time import weakref -from typing import TYPE_CHECKING, Any, Mapping, Optional, cast +from typing import TYPE_CHECKING, Any, Optional from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum -from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled +from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage @@ -253,13 +253,7 @@ class Monitor(MonitorBase): self._conn_id = None start = time.monotonic() try: - try: - return self._check_once() - except (OperationFailure, NotPrimaryError) as exc: - # Update max cluster time even when hello fails. - details = cast(Mapping[str, Any], exc.details) - self._topology.receive_cluster_time(details.get("$clusterTime")) - raise + return self._check_once() except ReferenceError: raise except Exception as error: @@ -356,7 +350,6 @@ class Monitor(MonitorBase): Can raise ConnectionFailure or OperationFailure. """ - cluster_time = self._topology.max_cluster_time() start = time.monotonic() if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). @@ -366,13 +359,12 @@ class Monitor(MonitorBase): ): # Initiate streaming hello (MongoDB 4.4+). response = conn._hello( - cluster_time, self._server_description.topology_version, self._settings.heartbeat_frequency, ) else: # New connection handshake or polling hello (MongoDB <4.4). - response = conn._hello(cluster_time, None, None) + response = conn._hello(None, None) duration = _monotonic_duration(start) return response, duration diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7206dca73..543b069bf 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -207,6 +207,10 @@ def command( ) response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time if client: client._process_response(response_doc, session) if check: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 978f0ae39..e575710ff 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -102,7 +102,7 @@ if TYPE_CHECKING: from pymongo.synchronous.auth import _AuthContext from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler - from pymongo.typings import ClusterTime, _Address, _CollationIn + from pymongo.typings import _Address, _CollationIn from pymongo.write_concern import WriteConcern try: @@ -310,6 +310,8 @@ class Connection: self.connect_rtt = 0.0 self._client_id = pool._client_id self.creation_time = time.monotonic() + # For gossiping $clusterTime from the connection handshake to the client. + self._cluster_time = None def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -374,11 +376,10 @@ class Connection: return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} def hello(self) -> Hello: - return self._hello(None, None, None) + return self._hello(None, None) def _hello( self, - cluster_time: Optional[ClusterTime], topology_version: Optional[Any], heartbeat_frequency: Optional[int], ) -> Hello[dict[str, Any]]: @@ -401,9 +402,6 @@ class Connection: if self.opts.connect_timeout: self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency) - if not performing_handshake and cluster_time is not None: - cmd["$clusterTime"] = cluster_time - creds = self.opts._credentials if creds: if creds.mechanism == "DEFAULT" and creds.username: @@ -1310,6 +1308,9 @@ class Pool: conn.close_conn(ConnectionClosedReason.ERROR) raise + if handler: + handler.client._topology.receive_cluster_time(conn._cluster_time) + return conn @contextlib.contextmanager diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 6a8503c6c..2bc893454 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -501,7 +501,6 @@ class Topology: self._description = new_td self._update_servers() - self._receive_cluster_time_no_lock(server_description.cluster_time) if self._publish_tp and not suppress_event: assert self._events is not None diff --git a/test/asynchronous/test_session.py b/test/asynchronous/test_session.py index 03d1032b5..568d392cd 100644 --- a/test/asynchronous/test_session.py +++ b/test/asynchronous/test_session.py @@ -36,8 +36,10 @@ from test.asynchronous import ( async_client_context, unittest, ) +from test.asynchronous.helpers import client_knobs from test.utils import ( EventListener, + HeartbeatEventListener, OvertCommandListener, async_wait_until, ) @@ -1135,12 +1137,10 @@ class TestClusterTime(AsyncIntegrationTest): if "$clusterTime" not in (await async_client_context.hello): raise SkipTest("$clusterTime not supported") + # Sessions prose test: 3) $clusterTime in commands async def test_cluster_time(self): listener = SessionTestListener() - # Prevent heartbeats from updating $clusterTime between operations. - client = await self.async_rs_or_single_client( - event_listeners=[listener], heartbeatFrequencyMS=999999 - ) + client = await self.async_rs_or_single_client(event_listeners=[listener]) collection = client.pymongo_test.collection # Prepare for tests of find() and aggregate(). await collection.insert_many([{} for _ in range(10)]) @@ -1219,6 +1219,40 @@ class TestClusterTime(AsyncIntegrationTest): f"{f.__name__} sent wrong $clusterTime with {event.command_name}", ) + # Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands + async def test_cluster_time_not_used_by_sdam(self): + heartbeat_listener = HeartbeatEventListener() + cmd_listener = OvertCommandListener() + with client_knobs(min_heartbeat_interval=0.01): + c1 = await self.async_single_client( + event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10 + ) + cluster_time = (await c1.admin.command({"ping": 1}))["$clusterTime"] + self.assertEqual(c1._topology.max_cluster_time(), cluster_time) + + # Advance the server's $clusterTime by performing an insert via another client. + await self.db.test.insert_one({"advance": "$clusterTime"}) + # Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events. + heartbeat_listener.reset() + + async def next_heartbeat(): + events = heartbeat_listener.events + for i in range(len(events) - 1): + if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent): + if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent): + return True + return False + + await async_wait_until( + next_heartbeat, "never found pair of heartbeat started + succeeded events" + ) + # Assert that C1's max $clusterTime is still the same and has not been updated by SDAM. + cmd_listener.reset() + await c1.admin.command({"ping": 1}) + started = cmd_listener.started_events[0] + self.assertEqual(started.command_name, "ping") + self.assertEqual(started.command["$clusterTime"], cluster_time) + if __name__ == "__main__": unittest.main() diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index ce7a52f1a..70dcfc5b4 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase): def test_cluster_time_comparison(self): t = create_mock_topology("mongodb://host") - def send_cluster_time(time, inc, should_update): + def send_cluster_time(time, inc): old = t.max_cluster_time() new = {"clusterTime": Timestamp(time, inc)} got_hello( @@ -259,16 +259,14 @@ class TestClusterTimeComparison(unittest.TestCase): ) actual = t.max_cluster_time() - if should_update: - self.assertEqual(actual, new) - else: - self.assertEqual(actual, old) + # We never update $clusterTime from monitoring connections. + self.assertEqual(actual, old) - send_cluster_time(0, 1, True) - send_cluster_time(2, 2, True) - send_cluster_time(2, 1, False) - send_cluster_time(1, 3, False) - send_cluster_time(2, 3, True) + send_cluster_time(0, 1) + send_cluster_time(2, 2) + send_cluster_time(2, 1) + send_cluster_time(1, 3) + send_cluster_time(2, 3) class TestIgnoreStaleErrors(IntegrationTest): diff --git a/test/test_session.py b/test/test_session.py index 175a28249..e80ab4189 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -36,8 +36,10 @@ from test import ( client_context, unittest, ) +from test.helpers import client_knobs from test.utils import ( EventListener, + HeartbeatEventListener, OvertCommandListener, wait_until, ) @@ -1121,10 +1123,10 @@ class TestClusterTime(IntegrationTest): if "$clusterTime" not in (client_context.hello): raise SkipTest("$clusterTime not supported") + # Sessions prose test: 3) $clusterTime in commands def test_cluster_time(self): listener = SessionTestListener() - # Prevent heartbeats from updating $clusterTime between operations. - client = self.rs_or_single_client(event_listeners=[listener], heartbeatFrequencyMS=999999) + client = self.rs_or_single_client(event_listeners=[listener]) collection = client.pymongo_test.collection # Prepare for tests of find() and aggregate(). collection.insert_many([{} for _ in range(10)]) @@ -1203,6 +1205,38 @@ class TestClusterTime(IntegrationTest): f"{f.__name__} sent wrong $clusterTime with {event.command_name}", ) + # Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands + def test_cluster_time_not_used_by_sdam(self): + heartbeat_listener = HeartbeatEventListener() + cmd_listener = OvertCommandListener() + with client_knobs(min_heartbeat_interval=0.01): + c1 = self.single_client( + event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10 + ) + cluster_time = (c1.admin.command({"ping": 1}))["$clusterTime"] + self.assertEqual(c1._topology.max_cluster_time(), cluster_time) + + # Advance the server's $clusterTime by performing an insert via another client. + self.db.test.insert_one({"advance": "$clusterTime"}) + # Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events. + heartbeat_listener.reset() + + def next_heartbeat(): + events = heartbeat_listener.events + for i in range(len(events) - 1): + if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent): + if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent): + return True + return False + + wait_until(next_heartbeat, "never found pair of heartbeat started + succeeded events") + # Assert that C1's max $clusterTime is still the same and has not been updated by SDAM. + cmd_listener.reset() + c1.admin.command({"ping": 1}) + started = cmd_listener.started_events[0] + self.assertEqual(started.command_name, "ping") + self.assertEqual(started.command["$clusterTime"], cluster_time) + if __name__ == "__main__": unittest.main() From c9a85ad321f98caf314fc9da7a367e94d661abac Mon Sep 17 00:00:00 2001 From: Iris <58442094+sleepyStick@users.noreply.github.com> Date: Thu, 27 Feb 2025 08:05:23 -0800 Subject: [PATCH 6/9] PYTHON-5090 Convert test.test_monitor to async (#2106) --- test/asynchronous/test_monitor.py | 121 ++++++++++++++++++++++++++++++ test/test_monitor.py | 29 ++++++- tools/synchro.py | 1 + 3 files changed, 147 insertions(+), 4 deletions(-) create mode 100644 test/asynchronous/test_monitor.py diff --git a/test/asynchronous/test_monitor.py b/test/asynchronous/test_monitor.py new file mode 100644 index 000000000..2705fbda3 --- /dev/null +++ b/test/asynchronous/test_monitor.py @@ -0,0 +1,121 @@ +# Copyright 2014-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the monitor module.""" +from __future__ import annotations + +import asyncio +import gc +import subprocess +import sys +import warnings +from functools import partial + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest +from test.utils import ( + ServerAndTopologyEventListener, + async_wait_until, +) + +from pymongo.periodic_executor import _EXECUTORS + +_IS_SYNC = False + + +def unregistered(ref): + gc.collect() + return ref not in _EXECUTORS + + +def get_executors(client): + executors = [] + for server in client._topology._servers.values(): + executors.append(server._monitor._executor) + executors.append(server._monitor._rtt_monitor._executor) + executors.append(client._kill_cursors_executor) + executors.append(client._topology._Topology__events_executor) + return [e for e in executors if e is not None] + + +class TestMonitor(AsyncIntegrationTest): + async def create_client(self): + listener = ServerAndTopologyEventListener() + client = await self.unmanaged_async_single_client(event_listeners=[listener]) + await connected(client) + return client + + async def test_cleanup_executors_on_client_del(self): + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + # Each executor stores a weakref to itself in _EXECUTORS. + executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors] + + del executors + del client + + for ref, name in executor_refs: + await async_wait_until( + partial(unregistered, ref), f"unregister executor: {name}", timeout=5 + ) + + def resource_warning_caught(): + gc.collect() + for warning in w: + if ( + issubclass(warning.category, ResourceWarning) + and "Call AsyncMongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): + return True + return False + + await async_wait_until(resource_warning_caught, "catch resource warning") + + async def test_cleanup_executors_on_client_close(self): + client = await self.create_client() + executors = get_executors(client) + self.assertEqual(len(executors), 4) + + await client.close() + + for executor in executors: + await async_wait_until( + lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5 + ) + + @async_client_context.require_sync + def test_no_thread_start_runtime_err_on_shutdown(self): + """Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread + on process shutdown.""" + command = [ + sys.executable, + "-c", + "from pymongo import AsyncMongoClient; c = AsyncMongoClient()", + ] + completed_process: subprocess.CompletedProcess = subprocess.run( + command, capture_output=True + ) + + self.assertFalse(completed_process.stderr) + self.assertFalse(completed_process.stdout) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitor.py b/test/test_monitor.py index a704f3d8c..0fb7eb9ca 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -15,6 +15,7 @@ """Test the monitor module.""" from __future__ import annotations +import asyncio import gc import subprocess import sys @@ -23,7 +24,7 @@ from functools import partial sys.path[0:0] = [""] -from test import IntegrationTest, connected, unittest +from test import IntegrationTest, client_context, connected, unittest from test.utils import ( ServerAndTopologyEventListener, wait_until, @@ -31,6 +32,8 @@ from test.utils import ( from pymongo.periodic_executor import _EXECUTORS +_IS_SYNC = True + def unregistered(ref): gc.collect() @@ -55,8 +58,8 @@ class TestMonitor(IntegrationTest): return client def test_cleanup_executors_on_client_del(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") client = self.create_client() executors = get_executors(client) self.assertEqual(len(executors), 4) @@ -70,6 +73,19 @@ class TestMonitor(IntegrationTest): for ref, name in executor_refs: wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5) + def resource_warning_caught(): + gc.collect() + for warning in w: + if ( + issubclass(warning.category, ResourceWarning) + and "Call MongoClient.close() to safely shut down your client and free up resources." + in str(warning.message) + ): + return True + return False + + wait_until(resource_warning_caught, "catch resource warning") + def test_cleanup_executors_on_client_close(self): client = self.create_client() executors = get_executors(client) @@ -80,10 +96,15 @@ class TestMonitor(IntegrationTest): for executor in executors: wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5) + @client_context.require_sync def test_no_thread_start_runtime_err_on_shutdown(self): """Test we silence noisy runtime errors fired when the MongoClient spawns a new thread on process shutdown.""" - command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"] + command = [ + sys.executable, + "-c", + "from pymongo import MongoClient; c = MongoClient()", + ] completed_process: subprocess.CompletedProcess = subprocess.run( command, capture_output=True ) diff --git a/tools/synchro.py b/tools/synchro.py index 39c53b435..877a68353 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -226,6 +226,7 @@ converted_tests = [ "test_load_balancer.py", "test_logger.py", "test_max_staleness.py", + "test_monitor.py", "test_monitoring.py", "test_mongos_load_balancing.py", "test_on_demand_csfle.py", From e52965eea4f578d0359c43e8ef4b49711d2133f1 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 27 Feb 2025 15:07:50 -0800 Subject: [PATCH 7/9] Remove redundant branch in GridFS (#2064) --- gridfs/asynchronous/grid_file.py | 17 +++++++---------- gridfs/synchronous/grid_file.py | 17 +++++++---------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/gridfs/asynchronous/grid_file.py b/gridfs/asynchronous/grid_file.py index baa88d480..3f3179c45 100644 --- a/gridfs/asynchronous/grid_file.py +++ b/gridfs/asynchronous/grid_file.py @@ -1301,11 +1301,8 @@ class AsyncGridIn: raise ValueError("cannot write to a closed file") try: - if isinstance(data, AsyncGridOut): - read = data.read - else: - # file-like - read = data.read + # file-like + read = data.read except AttributeError: # string if not isinstance(data, (str, bytes)): @@ -1317,7 +1314,7 @@ class AsyncGridIn: raise TypeError( "must specify an encoding for file in order to write str" ) from None - read = io.BytesIO(data).read # type: ignore[assignment] + read = io.BytesIO(data).read if inspect.iscoroutinefunction(read): await self._write_async(read) @@ -1331,15 +1328,15 @@ class AsyncGridIn: except BaseException: await self.abort() raise - self._buffer.write(to_write) # type: ignore - if len(to_write) < space: # type: ignore + self._buffer.write(to_write) + if len(to_write) < space: return # EOF or incomplete await self._flush_buffer() to_write = read(self.chunk_size) - while to_write and len(to_write) == self.chunk_size: # type: ignore + while to_write and len(to_write) == self.chunk_size: await self._flush_data(to_write) to_write = read(self.chunk_size) - self._buffer.write(to_write) # type: ignore + self._buffer.write(to_write) async def _write_async(self, read: Any) -> None: if self._buffer.tell() > 0: diff --git a/gridfs/synchronous/grid_file.py b/gridfs/synchronous/grid_file.py index ea0b53cfb..35386857d 100644 --- a/gridfs/synchronous/grid_file.py +++ b/gridfs/synchronous/grid_file.py @@ -1291,11 +1291,8 @@ class GridIn: raise ValueError("cannot write to a closed file") try: - if isinstance(data, GridOut): - read = data.read - else: - # file-like - read = data.read + # file-like + read = data.read except AttributeError: # string if not isinstance(data, (str, bytes)): @@ -1307,7 +1304,7 @@ class GridIn: raise TypeError( "must specify an encoding for file in order to write str" ) from None - read = io.BytesIO(data).read # type: ignore[assignment] + read = io.BytesIO(data).read if inspect.iscoroutinefunction(read): self._write_async(read) @@ -1321,15 +1318,15 @@ class GridIn: except BaseException: self.abort() raise - self._buffer.write(to_write) # type: ignore - if len(to_write) < space: # type: ignore + self._buffer.write(to_write) + if len(to_write) < space: return # EOF or incomplete self._flush_buffer() to_write = read(self.chunk_size) - while to_write and len(to_write) == self.chunk_size: # type: ignore + while to_write and len(to_write) == self.chunk_size: self._flush_data(to_write) to_write = read(self.chunk_size) - self._buffer.write(to_write) # type: ignore + self._buffer.write(to_write) def _write_async(self, read: Any) -> None: if self._buffer.tell() > 0: From 080c1c61212594e4de4792cf055b904a570c4359 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 28 Feb 2025 10:48:36 -0800 Subject: [PATCH 8/9] PYTHON-5166 Allow Database.command to run bulkWrite commands (#2164) --- doc/changelog.rst | 21 ++++++++++++++++++++- pymongo/message.py | 2 +- test/asynchronous/test_database.py | 15 +++++++++++++++ test/test_database.py | 15 +++++++++++++++ 4 files changed, 51 insertions(+), 2 deletions(-) diff --git a/doc/changelog.rst b/doc/changelog.rst index ee66bb178..fcad842de 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,7 +1,26 @@ Changelog ========= -Changes in Version 4.11.0 (YYYY/MM/DD) +Changes in Version 4.11.2 (YYYY/MM/DD) +-------------------------------------- + +Version 4.11.2 is a bug fix release. + +- Fixed a bug where :meth:`~pymongo.database.Database.command` would fail when attempting to run the bulkWrite command. + +Issues Resolved +............... + +See the `PyMongo 4.11.2 release notes in JIRA`_ for the list of resolved issues in this release. + +.. _PyMongo 4.11.2 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=42506 + +Changes in Version 4.11.1 (2025/02/10) +-------------------------------------- + +- Fixed support for prebuilt ``ppc64le`` and ``s390x`` wheels. + +Changes in Version 4.11.0 (2025/01/28) -------------------------------------- .. warning:: PyMongo 4.11 drops support for Python 3.8 and PyPy 3.9: Python 3.9+ or PyPy 3.10+ is now required. diff --git a/pymongo/message.py b/pymongo/message.py index 10c9edb5c..8e2fd6f99 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -105,7 +105,7 @@ _FIELD_MAP = { "insert": "documents", "update": "updates", "delete": "deletes", - "bulkWrite": "bulkWrite", + "bulkWrite": "ops", } _UNICODE_REPLACE_CODEC_OPTIONS: CodecOptions[Mapping[str, Any]] = CodecOptions( diff --git a/test/asynchronous/test_database.py b/test/asynchronous/test_database.py index 55a8cc3ab..2bbf763ab 100644 --- a/test/asynchronous/test_database.py +++ b/test/asynchronous/test_database.py @@ -430,6 +430,21 @@ class TestDatabase(AsyncIntegrationTest): for doc in result["cursor"]["firstBatch"]: self.assertTrue(isinstance(doc["r"], Regex)) + async def test_command_bulkWrite(self): + # Ensure bulk write commands can be run directly via db.command(). + if async_client_context.version.at_least(8, 0): + await self.client.admin.command( + { + "bulkWrite": 1, + "nsInfo": [{"ns": self.db.test.full_name}], + "ops": [{"insert": 0, "document": {}}], + } + ) + await self.db.command({"insert": "test", "documents": [{}]}) + await self.db.command({"update": "test", "updates": [{"q": {}, "u": {"$set": {"x": 1}}}]}) + await self.db.command({"delete": "test", "deletes": [{"q": {}, "limit": 1}]}) + await self.db.test.drop() + async def test_cursor_command(self): db = self.client.pymongo_test await db.test.drop() diff --git a/test/test_database.py b/test/test_database.py index aad9089bd..48cca921b 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -425,6 +425,21 @@ class TestDatabase(IntegrationTest): for doc in result["cursor"]["firstBatch"]: self.assertTrue(isinstance(doc["r"], Regex)) + def test_command_bulkWrite(self): + # Ensure bulk write commands can be run directly via db.command(). + if client_context.version.at_least(8, 0): + self.client.admin.command( + { + "bulkWrite": 1, + "nsInfo": [{"ns": self.db.test.full_name}], + "ops": [{"insert": 0, "document": {}}], + } + ) + self.db.command({"insert": "test", "documents": [{}]}) + self.db.command({"update": "test", "updates": [{"q": {}, "u": {"$set": {"x": 1}}}]}) + self.db.command({"delete": "test", "deletes": [{"q": {}, "limit": 1}]}) + self.db.test.drop() + def test_cursor_command(self): db = self.client.pymongo_test db.test.drop() From e28f49c51098592268ace56c49b8110691c178b7 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 28 Feb 2025 15:24:28 -0800 Subject: [PATCH 9/9] PYTHON-5164 Fix mockupdb TestClusterTime (#2163) --- test/mockupdb/test_cluster_time.py | 49 +++--------------------------- 1 file changed, 5 insertions(+), 44 deletions(-) diff --git a/test/mockupdb/test_cluster_time.py b/test/mockupdb/test_cluster_time.py index ea879b7ea..42ca91697 100644 --- a/test/mockupdb/test_cluster_time.py +++ b/test/mockupdb/test_cluster_time.py @@ -123,50 +123,11 @@ class TestClusterTime(PyMongoTestCase): client = self.simple_client(server.uri, heartbeatFrequencyMS=500) - request = server.receives("ismaster") - # No $clusterTime in first ismaster, only in subsequent ones - self.assertNotIn("$clusterTime", request) - request.ok(reply) - - # Next exchange: client returns first clusterTime, we send the second. - request = server.receives("ismaster") - self.assertIn("$clusterTime", request) - self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time) - cluster_time = Timestamp(cluster_time.time, cluster_time.inc + 1) - reply["$clusterTime"] = {"clusterTime": cluster_time} - request.reply(reply) - - # Third exchange: client returns second clusterTime. - request = server.receives("ismaster") - self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time) - - # Return command error with a new clusterTime. - cluster_time = Timestamp(cluster_time.time, cluster_time.inc + 1) - error = { - "ok": 0, - "code": 211, - "errmsg": "Cache Reader No keys found for HMAC ...", - "$clusterTime": {"clusterTime": cluster_time}, - } - request.reply(error) - - # PyMongo 3.11+ closes the monitoring connection on command errors. - - # Fourth exchange: the Monitor closes the connection and runs the - # handshake on a new connection. - request = server.receives("ismaster") - # No $clusterTime in first ismaster, only in subsequent ones - self.assertNotIn("$clusterTime", request) - - # Reply without $clusterTime. - reply.pop("$clusterTime") - request.reply(reply) - - # Fifth exchange: the Monitor attempt uses the clusterTime from - # the previous isMaster error. - request = server.receives("ismaster") - self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time) - request.reply(reply) + for _ in range(3): + request = server.receives("ismaster") + # No $clusterTime in heartbeats or handshakes. + self.assertNotIn("$clusterTime", request) + request.ok(reply) client.close() def test_collection_bulk_error(self):