Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2025-03-01 07:16:53 -06:00
commit 1d39d876de
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
38 changed files with 555 additions and 244 deletions

View File

@ -2,38 +2,21 @@
set -eu
# Enable core dumps if enabled on the machine
# Copied from https://github.com/mongodb/mongo/blob/master/etc/evergreen.yml
if [ -f /proc/self/coredump_filter ]; then
# Set the shell process (and its children processes) to dump ELF headers (bit 4),
# anonymous shared mappings (bit 1), and anonymous private mappings (bit 0).
echo 0x13 >/proc/self/coredump_filter
if [ -f /sbin/sysctl ]; then
# Check that the core pattern is set explicitly on our distro image instead
# of being the OS's default value. This ensures that coredump names are consistent
# across distros and can be picked up by Evergreen.
core_pattern=$(/sbin/sysctl -n "kernel.core_pattern")
if [ "$core_pattern" = "dump_%e.%p.core" ]; then
echo "Enabling coredumps"
ulimit -c unlimited
fi
fi
fi
if [ "$(uname -s)" = "Darwin" ]; then
core_pattern_mac=$(/usr/sbin/sysctl -n "kern.corefile")
if [ "$core_pattern_mac" = "dump_%N.%P.core" ]; then
echo "Enabling coredumps"
ulimit -c unlimited
fi
fi
HERE=$(dirname ${BASH_SOURCE:-$0})
HERE="$( cd -- "$HERE" > /dev/null 2>&1 && pwd )"
ROOT=$(dirname "$(dirname $HERE)")
if [ -z "${TEST_CRYPT_SHARED:-}" ]; then
export SKIP_CRYPT_SHARED=1
fi
# Override the tls files if applicable.
if [ "${SSL:-}" == "ssl" ]; then
export TLS_CERT_KEY_FILE=${ROOT}/test/certificates/client.pem
export TLS_PEM_KEY_FILE=${ROOT}/test/certificates/server.pem
export TLS_CA_FILE=${ROOT}/test/certificates/ca.pem
fi
MONGODB_VERSION=${VERSION:-} \
TOPOLOGY=${TOPOLOGY:-} \
AUTH=${AUTH:-} \

View File

@ -76,7 +76,9 @@ EOT
# Write the .env file for drivers-tools.
rm -rf $DRIVERS_TOOLS
git clone https://github.com/mongodb-labs/drivers-evergreen-tools.git $DRIVERS_TOOLS
BRANCH=master
ORG=mongodb-labs
git clone --branch $BRANCH https://github.com/$ORG/drivers-evergreen-tools.git $DRIVERS_TOOLS
cat <<EOT > ${DRIVERS_TOOLS}/.env
SKIP_LEGACY_SHELL=1

View File

@ -1,24 +0,0 @@
#!/bin/bash
set -eu
HERE=$(dirname ${BASH_SOURCE:-$0})
pushd $HERE
. env.sh
popd
# Copy PyMongo's test certificates over driver-evergreen-tools'
cp ${PROJECT_DIRECTORY}/test/certificates/* ${DRIVERS_TOOLS}/.evergreen/x509gen/
# Replace MongoOrchestration's client certificate.
cp ${PROJECT_DIRECTORY}/test/certificates/client.pem ${MONGO_ORCHESTRATION_HOME}/lib/client.pem
if [ -w /etc/hosts ]; then
SUDO=""
else
SUDO="sudo"
fi
# Add 'server' and 'hostname_not_in_cert' as a hostnames
echo "127.0.0.1 server" | $SUDO tee -a /etc/hosts
echo "127.0.0.1 hostname_not_in_cert" | $SUDO tee -a /etc/hosts

View File

@ -7,8 +7,35 @@ pushd "$(dirname "$(dirname $HERE)")"
echo "Setting up system..."
bash .evergreen/scripts/configure-env.sh
source .evergreen/scripts/env.sh
bash .evergreen/scripts/prepare-resources.sh
bash $DRIVERS_TOOLS/.evergreen/setup.sh
bash .evergreen/scripts/install-dependencies.sh
popd
# Enable core dumps if enabled on the machine
# Copied from https://github.com/mongodb/mongo/blob/master/etc/evergreen.yml
if [ -f /proc/self/coredump_filter ]; then
# Set the shell process (and its children processes) to dump ELF headers (bit 4),
# anonymous shared mappings (bit 1), and anonymous private mappings (bit 0).
echo 0x13 >/proc/self/coredump_filter
if [ -f /sbin/sysctl ]; then
# Check that the core pattern is set explicitly on our distro image instead
# of being the OS's default value. This ensures that coredump names are consistent
# across distros and can be picked up by Evergreen.
core_pattern=$(/sbin/sysctl -n "kernel.core_pattern")
if [ "$core_pattern" = "dump_%e.%p.core" ]; then
echo "Enabling coredumps"
ulimit -c unlimited
fi
fi
fi
if [ "$(uname -s)" = "Darwin" ]; then
core_pattern_mac=$(/usr/sbin/sysctl -n "kern.corefile")
if [ "$core_pattern_mac" = "dump_%N.%P.core" ]; then
echo "Enabling coredumps"
ulimit -c unlimited
fi
fi
echo "Setting up system... done."

View File

@ -1,7 +1,26 @@
Changelog
=========
Changes in Version 4.11.0 (YYYY/MM/DD)
Changes in Version 4.11.2 (YYYY/MM/DD)
--------------------------------------
Version 4.11.2 is a bug fix release.
- Fixed a bug where :meth:`~pymongo.database.Database.command` would fail when attempting to run the bulkWrite command.
Issues Resolved
...............
See the `PyMongo 4.11.2 release notes in JIRA`_ for the list of resolved issues in this release.
.. _PyMongo 4.11.2 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=42506
Changes in Version 4.11.1 (2025/02/10)
--------------------------------------
- Fixed support for prebuilt ``ppc64le`` and ``s390x`` wheels.
Changes in Version 4.11.0 (2025/01/28)
--------------------------------------
.. warning:: PyMongo 4.11 drops support for Python 3.8 and PyPy 3.9: Python 3.9+ or PyPy 3.10+ is now required.

View File

@ -1301,11 +1301,8 @@ class AsyncGridIn:
raise ValueError("cannot write to a closed file")
try:
if isinstance(data, AsyncGridOut):
read = data.read
else:
# file-like
read = data.read
# file-like
read = data.read
except AttributeError:
# string
if not isinstance(data, (str, bytes)):
@ -1317,7 +1314,7 @@ class AsyncGridIn:
raise TypeError(
"must specify an encoding for file in order to write str"
) from None
read = io.BytesIO(data).read # type: ignore[assignment]
read = io.BytesIO(data).read
if inspect.iscoroutinefunction(read):
await self._write_async(read)
@ -1331,15 +1328,15 @@ class AsyncGridIn:
except BaseException:
await self.abort()
raise
self._buffer.write(to_write) # type: ignore
if len(to_write) < space: # type: ignore
self._buffer.write(to_write)
if len(to_write) < space:
return # EOF or incomplete
await self._flush_buffer()
to_write = read(self.chunk_size)
while to_write and len(to_write) == self.chunk_size: # type: ignore
while to_write and len(to_write) == self.chunk_size:
await self._flush_data(to_write)
to_write = read(self.chunk_size)
self._buffer.write(to_write) # type: ignore
self._buffer.write(to_write)
async def _write_async(self, read: Any) -> None:
if self._buffer.tell() > 0:

View File

@ -1291,11 +1291,8 @@ class GridIn:
raise ValueError("cannot write to a closed file")
try:
if isinstance(data, GridOut):
read = data.read
else:
# file-like
read = data.read
# file-like
read = data.read
except AttributeError:
# string
if not isinstance(data, (str, bytes)):
@ -1307,7 +1304,7 @@ class GridIn:
raise TypeError(
"must specify an encoding for file in order to write str"
) from None
read = io.BytesIO(data).read # type: ignore[assignment]
read = io.BytesIO(data).read
if inspect.iscoroutinefunction(read):
self._write_async(read)
@ -1321,15 +1318,15 @@ class GridIn:
except BaseException:
self.abort()
raise
self._buffer.write(to_write) # type: ignore
if len(to_write) < space: # type: ignore
self._buffer.write(to_write)
if len(to_write) < space:
return # EOF or incomplete
self._flush_buffer()
to_write = read(self.chunk_size)
while to_write and len(to_write) == self.chunk_size: # type: ignore
while to_write and len(to_write) == self.chunk_size:
self._flush_data(to_write)
to_write = read(self.chunk_size)
self._buffer.write(to_write) # type: ignore
self._buffer.write(to_write)
def _write_async(self, read: Any) -> None:
if self._buffer.tell() > 0:

View File

@ -21,11 +21,11 @@ import atexit
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional
from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _async_create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@ -255,13 +255,7 @@ class Monitor(MonitorBase):
self._conn_id = None
start = time.monotonic()
try:
try:
return await self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
await self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return await self._check_once()
except ReferenceError:
raise
except Exception as error:
@ -358,7 +352,6 @@ class Monitor(MonitorBase):
Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
@ -368,13 +361,12 @@ class Monitor(MonitorBase):
):
# Initiate streaming hello (MongoDB 4.4+).
response = await conn._hello(
cluster_time,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = await conn._hello(cluster_time, None, None)
response = await conn._hello(None, None)
duration = _monotonic_duration(start)
return response, duration

View File

@ -207,6 +207,10 @@ async def command(
)
response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
await client._process_response(response_doc, session)
if check:

View File

@ -102,7 +102,7 @@ if TYPE_CHECKING:
from pymongo.pyopenssl_context import _sslConn
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import _ServerMode
from pymongo.typings import ClusterTime, _Address, _CollationIn
from pymongo.typings import _Address, _CollationIn
from pymongo.write_concern import WriteConcern
try:
@ -310,6 +310,8 @@ class AsyncConnection:
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -374,11 +376,10 @@ class AsyncConnection:
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
async def hello(self) -> Hello:
return await self._hello(None, None, None)
return await self._hello(None, None)
async def _hello(
self,
cluster_time: Optional[ClusterTime],
topology_version: Optional[Any],
heartbeat_frequency: Optional[int],
) -> Hello[dict[str, Any]]:
@ -401,9 +402,6 @@ class AsyncConnection:
if self.opts.connect_timeout:
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)
if not performing_handshake and cluster_time is not None:
cmd["$clusterTime"] = cluster_time
creds = self.opts._credentials
if creds:
if creds.mechanism == "DEFAULT" and creds.username:
@ -1316,6 +1314,9 @@ class Pool:
conn.close_conn(ConnectionClosedReason.ERROR)
raise
if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)
return conn
@contextlib.asynccontextmanager

View File

@ -501,7 +501,6 @@ class Topology:
self._description = new_td
await self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)
if self._publish_tp and not suppress_event:
assert self._events is not None

View File

@ -105,7 +105,7 @@ _FIELD_MAP = {
"insert": "documents",
"update": "updates",
"delete": "deletes",
"bulkWrite": "bulkWrite",
"bulkWrite": "ops",
}
_UNICODE_REPLACE_CODEC_OPTIONS: CodecOptions[Mapping[str, Any]] = CodecOptions(

View File

@ -21,11 +21,11 @@ import atexit
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional
from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
@ -253,13 +253,7 @@ class Monitor(MonitorBase):
self._conn_id = None
start = time.monotonic()
try:
try:
return self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return self._check_once()
except ReferenceError:
raise
except Exception as error:
@ -356,7 +350,6 @@ class Monitor(MonitorBase):
Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
@ -366,13 +359,12 @@ class Monitor(MonitorBase):
):
# Initiate streaming hello (MongoDB 4.4+).
response = conn._hello(
cluster_time,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = conn._hello(cluster_time, None, None)
response = conn._hello(None, None)
duration = _monotonic_duration(start)
return response, duration

View File

@ -207,6 +207,10 @@ def command(
)
response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
client._process_response(response_doc, session)
if check:

View File

@ -102,7 +102,7 @@ if TYPE_CHECKING:
from pymongo.synchronous.auth import _AuthContext
from pymongo.synchronous.client_session import ClientSession
from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler
from pymongo.typings import ClusterTime, _Address, _CollationIn
from pymongo.typings import _Address, _CollationIn
from pymongo.write_concern import WriteConcern
try:
@ -310,6 +310,8 @@ class Connection:
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -374,11 +376,10 @@ class Connection:
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
def hello(self) -> Hello:
return self._hello(None, None, None)
return self._hello(None, None)
def _hello(
self,
cluster_time: Optional[ClusterTime],
topology_version: Optional[Any],
heartbeat_frequency: Optional[int],
) -> Hello[dict[str, Any]]:
@ -401,9 +402,6 @@ class Connection:
if self.opts.connect_timeout:
self.set_conn_timeout(self.opts.connect_timeout + heartbeat_frequency)
if not performing_handshake and cluster_time is not None:
cmd["$clusterTime"] = cluster_time
creds = self.opts._credentials
if creds:
if creds.mechanism == "DEFAULT" and creds.username:
@ -1310,6 +1308,9 @@ class Pool:
conn.close_conn(ConnectionClosedReason.ERROR)
raise
if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)
return conn
@contextlib.contextmanager

View File

@ -501,7 +501,6 @@ class Topology:
self._description = new_td
self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)
if self._publish_tp and not suppress_event:
assert self._events is not None

View File

@ -933,17 +933,22 @@ class PyMongoTestCase(unittest.TestCase):
def assertEqualReply(self, expected, actual, msg=None):
self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg)
@staticmethod
def configure_fail_point(client, command_args, off=False):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
client.admin.command(cmd)
@contextmanager
def fail_point(self, command_args):
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
client_context.client.admin.command(cmd_on)
self.configure_fail_point(client_context.client, command_args)
try:
yield
finally:
client_context.client.admin.command(
"configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
self.configure_fail_point(client_context.client, command_args, off=True)
@contextmanager
def fork(

View File

@ -935,17 +935,22 @@ class AsyncPyMongoTestCase(unittest.TestCase):
def assertEqualReply(self, expected, actual, msg=None):
self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg)
@staticmethod
async def configure_fail_point(client, command_args, off=False):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
await client.admin.command(cmd)
@asynccontextmanager
async def fail_point(self, command_args):
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
await async_client_context.client.admin.command(cmd_on)
await self.configure_fail_point(async_client_context.client, command_args)
try:
yield
finally:
await async_client_context.client.admin.command(
"configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
await self.configure_fail_point(async_client_context.client, command_args, off=True)
@contextmanager
def fork(

View File

@ -211,15 +211,10 @@ class AsyncTestCMAP(AsyncIntegrationTest):
self.check_object(actual, expected)
self.assertIn(message, str(actual))
async def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
if not async_client_context.supports_failCommand_fail_point:
self.skipTest("failCommand fail point must be supported")
await self._set_fail_point(self.client, command_args)
await self.configure_fail_point(self.client, command_args)
async def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""

View File

@ -430,6 +430,21 @@ class TestDatabase(AsyncIntegrationTest):
for doc in result["cursor"]["firstBatch"]:
self.assertTrue(isinstance(doc["r"], Regex))
async def test_command_bulkWrite(self):
# Ensure bulk write commands can be run directly via db.command().
if async_client_context.version.at_least(8, 0):
await self.client.admin.command(
{
"bulkWrite": 1,
"nsInfo": [{"ns": self.db.test.full_name}],
"ops": [{"insert": 0, "document": {}}],
}
)
await self.db.command({"insert": "test", "documents": [{}]})
await self.db.command({"update": "test", "updates": [{"q": {}, "u": {"$set": {"x": 1}}}]})
await self.db.command({"delete": "test", "deletes": [{"q": {}, "limit": 1}]})
await self.db.test.drop()
async def test_cursor_command(self):
db = self.client.pymongo_test
await db.test.drop()

View File

@ -0,0 +1,121 @@
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the monitor module."""
from __future__ import annotations
import asyncio
import gc
import subprocess
import sys
import warnings
from functools import partial
sys.path[0:0] = [""]
from test.asynchronous import AsyncIntegrationTest, async_client_context, connected, unittest
from test.utils import (
ServerAndTopologyEventListener,
async_wait_until,
)
from pymongo.periodic_executor import _EXECUTORS
_IS_SYNC = False
def unregistered(ref):
gc.collect()
return ref not in _EXECUTORS
def get_executors(client):
executors = []
for server in client._topology._servers.values():
executors.append(server._monitor._executor)
executors.append(server._monitor._rtt_monitor._executor)
executors.append(client._kill_cursors_executor)
executors.append(client._topology._Topology__events_executor)
return [e for e in executors if e is not None]
class TestMonitor(AsyncIntegrationTest):
async def create_client(self):
listener = ServerAndTopologyEventListener()
client = await self.unmanaged_async_single_client(event_listeners=[listener])
await connected(client)
return client
async def test_cleanup_executors_on_client_del(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
client = await self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)
# Each executor stores a weakref to itself in _EXECUTORS.
executor_refs = [(r, r()._name) for r in _EXECUTORS.copy() if r() in executors]
del executors
del client
for ref, name in executor_refs:
await async_wait_until(
partial(unregistered, ref), f"unregister executor: {name}", timeout=5
)
def resource_warning_caught():
gc.collect()
for warning in w:
if (
issubclass(warning.category, ResourceWarning)
and "Call AsyncMongoClient.close() to safely shut down your client and free up resources."
in str(warning.message)
):
return True
return False
await async_wait_until(resource_warning_caught, "catch resource warning")
async def test_cleanup_executors_on_client_close(self):
client = await self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)
await client.close()
for executor in executors:
await async_wait_until(
lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5
)
@async_client_context.require_sync
def test_no_thread_start_runtime_err_on_shutdown(self):
"""Test we silence noisy runtime errors fired when the AsyncMongoClient spawns a new thread
on process shutdown."""
command = [
sys.executable,
"-c",
"from pymongo import AsyncMongoClient; c = AsyncMongoClient()",
]
completed_process: subprocess.CompletedProcess = subprocess.run(
command, capture_output=True
)
self.assertFalse(completed_process.stderr)
self.assertFalse(completed_process.stdout)
if __name__ == "__main__":
unittest.main()

View File

@ -36,8 +36,10 @@ from test.asynchronous import (
async_client_context,
unittest,
)
from test.asynchronous.helpers import client_knobs
from test.utils import (
EventListener,
HeartbeatEventListener,
OvertCommandListener,
async_wait_until,
)
@ -1135,12 +1137,10 @@ class TestClusterTime(AsyncIntegrationTest):
if "$clusterTime" not in (await async_client_context.hello):
raise SkipTest("$clusterTime not supported")
# Sessions prose test: 3) $clusterTime in commands
async def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = await self.async_rs_or_single_client(
event_listeners=[listener], heartbeatFrequencyMS=999999
)
client = await self.async_rs_or_single_client(event_listeners=[listener])
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
await collection.insert_many([{} for _ in range(10)])
@ -1219,6 +1219,40 @@ class TestClusterTime(AsyncIntegrationTest):
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
)
# Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands
async def test_cluster_time_not_used_by_sdam(self):
heartbeat_listener = HeartbeatEventListener()
cmd_listener = OvertCommandListener()
with client_knobs(min_heartbeat_interval=0.01):
c1 = await self.async_single_client(
event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10
)
cluster_time = (await c1.admin.command({"ping": 1}))["$clusterTime"]
self.assertEqual(c1._topology.max_cluster_time(), cluster_time)
# Advance the server's $clusterTime by performing an insert via another client.
await self.db.test.insert_one({"advance": "$clusterTime"})
# Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events.
heartbeat_listener.reset()
async def next_heartbeat():
events = heartbeat_listener.events
for i in range(len(events) - 1):
if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent):
if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent):
return True
return False
await async_wait_until(
next_heartbeat, "never found pair of heartbeat started + succeeded events"
)
# Assert that C1's max $clusterTime is still the same and has not been updated by SDAM.
cmd_listener.reset()
await c1.admin.command({"ping": 1})
started = cmd_listener.started_events[0]
self.assertEqual(started.command_name, "ping")
self.assertEqual(started.command["$clusterTime"], cluster_time)
if __name__ == "__main__":
unittest.main()

View File

@ -410,15 +410,10 @@ class TestTransactionsConvenientAPI(AsyncTransactionsBase):
for address in async_client_context.mongoses:
self.mongos_clients.append(await self.async_single_client("{}:{}".format(*address)))
async def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
await self._set_fail_point(client, command_args)
await self.configure_fail_point(client, command_args)
@async_client_context.require_transactions
async def test_callback_raises_custom_error(self):

View File

@ -1008,12 +1008,8 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
if not async_client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled")
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
await client.admin.command(cmd_on)
self.addAsyncCleanup(
client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
await self.configure_fail_point(client, command_args)
self.addAsyncCleanup(self.configure_fail_point, client, command_args, off=True)
async def _testOperation_failPoint(self, spec):
await self.__set_fail_point(

View File

@ -264,15 +264,10 @@ class AsyncSpecRunner(AsyncIntegrationTest):
async def asyncTearDown(self) -> None:
self.knobs.disable()
async def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
await self._set_fail_point(client, command_args)
await self.configure_fail_point(client, command_args)
async def targeted_fail_point(self, session, fail_point):
"""Run the targetedFailPoint test operation.
@ -281,7 +276,7 @@ class AsyncSpecRunner(AsyncIntegrationTest):
"""
clients = {c.address: c for c in self.mongos_clients}
client = clients[session._pinned_address]
await self._set_fail_point(client, fail_point)
await self.configure_fail_point(client, fail_point)
self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})
def assert_session_pinned(self, session):

View File

@ -0,0 +1,145 @@
{
"description": "change-streams-nsType",
"schemaVersion": "1.7",
"runOnRequirements": [
{
"minServerVersion": "8.1.0",
"topologies": [
"replicaset",
"sharded"
],
"serverless": "forbid"
}
],
"createEntities": [
{
"client": {
"id": "client0",
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
}
],
"tests": [
{
"description": "nsType is present when creating collections",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "database0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "create",
"nsType": "collection"
}
}
]
},
{
"description": "nsType is present when creating timeseries",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "database0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo",
"timeseries": {
"timeField": "time",
"metaField": "meta",
"granularity": "minutes"
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "create",
"nsType": "timeseries"
}
}
]
},
{
"description": "nsType is present when creating views",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "database0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo",
"viewOn": "testName"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "create",
"nsType": "view"
}
}
]
}
]
}

View File

@ -40,7 +40,7 @@
},
{
"description": "Colon in a key value pair",
"uri": "mongodb://example.com/?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://test-cluster",
"uri": "mongodb://example.com/?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://test-cluster,ENVIRONMENT:azure",
"valid": true,
"warning": false,
"hosts": [
@ -53,9 +53,10 @@
"auth": null,
"options": {
"authmechanismProperties": {
"TOKEN_RESOURCE": "mongodb://test-cluster"
"TOKEN_RESOURCE": "mongodb://test-cluster",
"ENVIRONMENT": "azure"
}
}
}
]
}
}

View File

@ -96,7 +96,7 @@
},
{
"description": "Comma in a key value pair causes a warning",
"uri": "mongodb://localhost?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://host1%2Chost2",
"uri": "mongodb://localhost?authMechanism=MONGODB-OIDC&authMechanismProperties=TOKEN_RESOURCE:mongodb://host1%2Chost2,ENVIRONMENT:azure",
"valid": true,
"warning": true,
"hosts": [
@ -112,4 +112,4 @@
}
}
]
}
}

View File

@ -123,50 +123,11 @@ class TestClusterTime(PyMongoTestCase):
client = self.simple_client(server.uri, heartbeatFrequencyMS=500)
request = server.receives("ismaster")
# No $clusterTime in first ismaster, only in subsequent ones
self.assertNotIn("$clusterTime", request)
request.ok(reply)
# Next exchange: client returns first clusterTime, we send the second.
request = server.receives("ismaster")
self.assertIn("$clusterTime", request)
self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time)
cluster_time = Timestamp(cluster_time.time, cluster_time.inc + 1)
reply["$clusterTime"] = {"clusterTime": cluster_time}
request.reply(reply)
# Third exchange: client returns second clusterTime.
request = server.receives("ismaster")
self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time)
# Return command error with a new clusterTime.
cluster_time = Timestamp(cluster_time.time, cluster_time.inc + 1)
error = {
"ok": 0,
"code": 211,
"errmsg": "Cache Reader No keys found for HMAC ...",
"$clusterTime": {"clusterTime": cluster_time},
}
request.reply(error)
# PyMongo 3.11+ closes the monitoring connection on command errors.
# Fourth exchange: the Monitor closes the connection and runs the
# handshake on a new connection.
request = server.receives("ismaster")
# No $clusterTime in first ismaster, only in subsequent ones
self.assertNotIn("$clusterTime", request)
# Reply without $clusterTime.
reply.pop("$clusterTime")
request.reply(reply)
# Fifth exchange: the Monitor attempt uses the clusterTime from
# the previous isMaster error.
request = server.receives("ismaster")
self.assertEqual(request["$clusterTime"]["clusterTime"], cluster_time)
request.reply(reply)
for _ in range(3):
request = server.receives("ismaster")
# No $clusterTime in heartbeats or handshakes.
self.assertNotIn("$clusterTime", request)
request.ok(reply)
client.close()
def test_collection_bulk_error(self):

View File

@ -211,15 +211,10 @@ class TestCMAP(IntegrationTest):
self.check_object(actual, expected)
self.assertIn(message, str(actual))
def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
if not client_context.supports_failCommand_fail_point:
self.skipTest("failCommand fail point must be supported")
self._set_fail_point(self.client, command_args)
self.configure_fail_point(self.client, command_args)
def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""

View File

@ -425,6 +425,21 @@ class TestDatabase(IntegrationTest):
for doc in result["cursor"]["firstBatch"]:
self.assertTrue(isinstance(doc["r"], Regex))
def test_command_bulkWrite(self):
# Ensure bulk write commands can be run directly via db.command().
if client_context.version.at_least(8, 0):
self.client.admin.command(
{
"bulkWrite": 1,
"nsInfo": [{"ns": self.db.test.full_name}],
"ops": [{"insert": 0, "document": {}}],
}
)
self.db.command({"insert": "test", "documents": [{}]})
self.db.command({"update": "test", "updates": [{"q": {}, "u": {"$set": {"x": 1}}}]})
self.db.command({"delete": "test", "deletes": [{"q": {}, "limit": 1}]})
self.db.test.drop()
def test_cursor_command(self):
db = self.client.pymongo_test
db.test.drop()

View File

@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
def test_cluster_time_comparison(self):
t = create_mock_topology("mongodb://host")
def send_cluster_time(time, inc, should_update):
def send_cluster_time(time, inc):
old = t.max_cluster_time()
new = {"clusterTime": Timestamp(time, inc)}
got_hello(
@ -259,16 +259,14 @@ class TestClusterTimeComparison(unittest.TestCase):
)
actual = t.max_cluster_time()
if should_update:
self.assertEqual(actual, new)
else:
self.assertEqual(actual, old)
# We never update $clusterTime from monitoring connections.
self.assertEqual(actual, old)
send_cluster_time(0, 1, True)
send_cluster_time(2, 2, True)
send_cluster_time(2, 1, False)
send_cluster_time(1, 3, False)
send_cluster_time(2, 3, True)
send_cluster_time(0, 1)
send_cluster_time(2, 2)
send_cluster_time(2, 1)
send_cluster_time(1, 3)
send_cluster_time(2, 3)
class TestIgnoreStaleErrors(IntegrationTest):

View File

@ -15,6 +15,7 @@
"""Test the monitor module."""
from __future__ import annotations
import asyncio
import gc
import subprocess
import sys
@ -23,7 +24,7 @@ from functools import partial
sys.path[0:0] = [""]
from test import IntegrationTest, connected, unittest
from test import IntegrationTest, client_context, connected, unittest
from test.utils import (
ServerAndTopologyEventListener,
wait_until,
@ -31,6 +32,8 @@ from test.utils import (
from pymongo.periodic_executor import _EXECUTORS
_IS_SYNC = True
def unregistered(ref):
gc.collect()
@ -55,8 +58,8 @@ class TestMonitor(IntegrationTest):
return client
def test_cleanup_executors_on_client_del(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
client = self.create_client()
executors = get_executors(client)
self.assertEqual(len(executors), 4)
@ -70,6 +73,19 @@ class TestMonitor(IntegrationTest):
for ref, name in executor_refs:
wait_until(partial(unregistered, ref), f"unregister executor: {name}", timeout=5)
def resource_warning_caught():
gc.collect()
for warning in w:
if (
issubclass(warning.category, ResourceWarning)
and "Call MongoClient.close() to safely shut down your client and free up resources."
in str(warning.message)
):
return True
return False
wait_until(resource_warning_caught, "catch resource warning")
def test_cleanup_executors_on_client_close(self):
client = self.create_client()
executors = get_executors(client)
@ -80,10 +96,15 @@ class TestMonitor(IntegrationTest):
for executor in executors:
wait_until(lambda: executor._stopped, f"closed executor: {executor._name}", timeout=5)
@client_context.require_sync
def test_no_thread_start_runtime_err_on_shutdown(self):
"""Test we silence noisy runtime errors fired when the MongoClient spawns a new thread
on process shutdown."""
command = [sys.executable, "-c", "from pymongo import MongoClient; c = MongoClient()"]
command = [
sys.executable,
"-c",
"from pymongo import MongoClient; c = MongoClient()",
]
completed_process: subprocess.CompletedProcess = subprocess.run(
command, capture_output=True
)

View File

@ -36,8 +36,10 @@ from test import (
client_context,
unittest,
)
from test.helpers import client_knobs
from test.utils import (
EventListener,
HeartbeatEventListener,
OvertCommandListener,
wait_until,
)
@ -1121,10 +1123,10 @@ class TestClusterTime(IntegrationTest):
if "$clusterTime" not in (client_context.hello):
raise SkipTest("$clusterTime not supported")
# Sessions prose test: 3) $clusterTime in commands
def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = self.rs_or_single_client(event_listeners=[listener], heartbeatFrequencyMS=999999)
client = self.rs_or_single_client(event_listeners=[listener])
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
collection.insert_many([{} for _ in range(10)])
@ -1203,6 +1205,38 @@ class TestClusterTime(IntegrationTest):
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
)
# Sessions prose test: 20) Drivers do not gossip `$clusterTime` on SDAM commands
def test_cluster_time_not_used_by_sdam(self):
heartbeat_listener = HeartbeatEventListener()
cmd_listener = OvertCommandListener()
with client_knobs(min_heartbeat_interval=0.01):
c1 = self.single_client(
event_listeners=[heartbeat_listener, cmd_listener], heartbeatFrequencyMS=10
)
cluster_time = (c1.admin.command({"ping": 1}))["$clusterTime"]
self.assertEqual(c1._topology.max_cluster_time(), cluster_time)
# Advance the server's $clusterTime by performing an insert via another client.
self.db.test.insert_one({"advance": "$clusterTime"})
# Wait until the client C1 processes the next pair of SDAM heartbeat started + succeeded events.
heartbeat_listener.reset()
def next_heartbeat():
events = heartbeat_listener.events
for i in range(len(events) - 1):
if isinstance(events[i], monitoring.ServerHeartbeatStartedEvent):
if isinstance(events[i + 1], monitoring.ServerHeartbeatSucceededEvent):
return True
return False
wait_until(next_heartbeat, "never found pair of heartbeat started + succeeded events")
# Assert that C1's max $clusterTime is still the same and has not been updated by SDAM.
cmd_listener.reset()
c1.admin.command({"ping": 1})
started = cmd_listener.started_events[0]
self.assertEqual(started.command_name, "ping")
self.assertEqual(started.command["$clusterTime"], cluster_time)
if __name__ == "__main__":
unittest.main()

View File

@ -402,15 +402,10 @@ class TestTransactionsConvenientAPI(TransactionsBase):
for address in client_context.mongoses:
self.mongos_clients.append(self.single_client("{}:{}".format(*address)))
def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
self._set_fail_point(client, command_args)
self.configure_fail_point(client, command_args)
@client_context.require_transactions
def test_callback_raises_custom_error(self):

View File

@ -999,12 +999,8 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
if not client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled")
cmd_on = SON([("configureFailPoint", "failCommand")])
cmd_on.update(command_args)
client.admin.command(cmd_on)
self.addCleanup(
client.admin.command, "configureFailPoint", cmd_on["configureFailPoint"], mode="off"
)
self.configure_fail_point(client, command_args)
self.addCleanup(self.configure_fail_point, client, command_args, off=True)
def _testOperation_failPoint(self, spec):
self.__set_fail_point(

View File

@ -264,15 +264,10 @@ class SpecRunner(IntegrationTest):
def tearDown(self) -> None:
self.knobs.disable()
def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
self._set_fail_point(client, command_args)
self.configure_fail_point(client, command_args)
def targeted_fail_point(self, session, fail_point):
"""Run the targetedFailPoint test operation.
@ -281,7 +276,7 @@ class SpecRunner(IntegrationTest):
"""
clients = {c.address: c for c in self.mongos_clients}
client = clients[session._pinned_address]
self._set_fail_point(client, fail_point)
self.configure_fail_point(client, fail_point)
self.addCleanup(self.set_fail_point, {"mode": "off"})
def assert_session_pinned(self, session):

View File

@ -226,6 +226,7 @@ converted_tests = [
"test_load_balancer.py",
"test_logger.py",
"test_max_staleness.py",
"test_monitor.py",
"test_monitoring.py",
"test_mongos_load_balancing.py",
"test_on_demand_csfle.py",