PYTHON-5344 and PYTHON-5403 Allow Instantiated MongoClients to Send Client Metadata On-Demand (#2358)

This commit is contained in:
Iris 2025-06-24 09:34:53 -07:00 committed by GitHub
parent e2bfa9a590
commit 65f7c54208
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 599 additions and 12 deletions

View File

@ -131,6 +131,9 @@ do
gridfs)
cpjson gridfs/tests gridfs
;;
handshake)
cpjson mongodb-handshake/tests handshake
;;
index|index-management)
cpjson index-management/tests index_management
;;

View File

@ -7,6 +7,9 @@ PyMongo 4.14 brings a number of changes including:
- Added :attr:`bson.codec_options.TypeRegistry.codecs` and :attr:`bson.codec_options.TypeRegistry.fallback_encoder` properties
to allow users to directly access the type codecs and fallback encoder for a given :class:`bson.codec_options.TypeRegistry`.
- Added :meth:`pymongo.asynchronous.mongo_client.AsyncMongoClient.append_metadata` and
:meth:`pymongo.mongo_client.MongoClient.append_metadata` to allow instantiated MongoClients to send client metadata
on-demand
- Introduces a minor breaking change. When encoding :class:`bson.binary.BinaryVector`, a ``ValueError`` will be raised
if the 'padding' metadata field is < 0 or > 7, or non-zero for any type other than PACKED_BIT.

View File

@ -70,6 +70,7 @@ from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.settings import TopologySettings
from pymongo.asynchronous.topology import Topology, _ErrorContext
from pymongo.client_options import ClientOptions
from pymongo.driver_info import DriverInfo
from pymongo.errors import (
AutoReconnect,
BulkWriteError,
@ -1040,6 +1041,20 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
self._kill_cursors_executor = executor
self._opened = False
def append_metadata(self, driver_info: DriverInfo) -> None:
"""Appends the given metadata to existing driver metadata.
:param driver_info: a :class:`~pymongo.driver_info.DriverInfo`
.. versionadded:: 4.14
"""
if not isinstance(driver_info, DriverInfo):
raise TypeError(
f"driver_info must be an instance of DriverInfo, not {type(driver_info)}"
)
self._options.pool_options._update_metadata(driver_info)
def _should_pin_cursor(self, session: Optional[AsyncClientSession]) -> Optional[bool]:
return self._options.load_balanced and not (session and session.in_transaction)

View File

@ -376,18 +376,7 @@ class PoolOptions:
"async",
)
if driver:
if driver.name:
self.__metadata["driver"]["name"] = "{}|{}".format(
self.__metadata["driver"]["name"],
driver.name,
)
if driver.version:
self.__metadata["driver"]["version"] = "{}|{}".format(
_METADATA["driver"]["version"],
driver.version,
)
if driver.platform:
self.__metadata["platform"] = "{}|{}".format(_METADATA["platform"], driver.platform)
self._update_metadata(driver)
env = _metadata_env()
if env:
@ -395,6 +384,25 @@ class PoolOptions:
_truncate_metadata(self.__metadata)
def _update_metadata(self, driver: DriverInfo) -> None:
"""Updates the client's metadata"""
metadata = copy.deepcopy(self.__metadata)
if driver.name:
metadata["driver"]["name"] = "{}|{}".format(
metadata["driver"]["name"],
driver.name,
)
if driver.version:
metadata["driver"]["version"] = "{}|{}".format(
metadata["driver"]["version"],
driver.version,
)
if driver.platform:
metadata["platform"] = "{}|{}".format(metadata["platform"], driver.platform)
self.__metadata = metadata
@property
def _credentials(self) -> Optional[MongoCredential]:
"""A :class:`~pymongo.auth.MongoCredentials` instance or None."""

View File

@ -62,6 +62,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry
from bson.timestamp import Timestamp
from pymongo import _csot, common, helpers_shared, periodic_executor
from pymongo.client_options import ClientOptions
from pymongo.driver_info import DriverInfo
from pymongo.errors import (
AutoReconnect,
BulkWriteError,
@ -1040,6 +1041,20 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
self._kill_cursors_executor = executor
self._opened = False
def append_metadata(self, driver_info: DriverInfo) -> None:
"""Appends the given metadata to existing driver metadata.
:param driver_info: a :class:`~pymongo.driver_info.DriverInfo`
.. versionadded:: 4.14
"""
if not isinstance(driver_info, DriverInfo):
raise TypeError(
f"driver_info must be an instance of DriverInfo, not {type(driver_info)}"
)
self._options.pool_options._update_metadata(driver_info)
def _should_pin_cursor(self, session: Optional[ClientSession]) -> Optional[bool]:
return self._options.load_balanced and not (session and session.in_transaction)

View File

@ -0,0 +1,215 @@
# Copyright 2013-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import os
import pathlib
import time
import unittest
from test.asynchronous import AsyncIntegrationTest
from test.asynchronous.unified_format import generate_test_classes
from test.utils_shared import CMAPListener
from typing import Any, Optional
import pytest
from pymongo import AsyncMongoClient
from pymongo.driver_info import DriverInfo
from pymongo.monitoring import ConnectionClosedEvent
try:
from mockupdb import MockupDB, OpMsgReply
_HAVE_MOCKUPDB = True
except ImportError:
_HAVE_MOCKUPDB = False
pytestmark = pytest.mark.mockupdb
_IS_SYNC = False
# Location of JSON test specifications.
if _IS_SYNC:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "handshake", "unified")
else:
_TEST_PATH = os.path.join(
pathlib.Path(__file__).resolve().parent.parent, "handshake", "unified"
)
# Generate unified tests.
globals().update(generate_test_classes(_TEST_PATH, module=__name__))
def _get_handshake_driver_info(request):
assert "client" in request
return request["client"]
class TestClientMetadataProse(AsyncIntegrationTest):
async def asyncSetUp(self):
await super().asyncSetUp()
self.server = MockupDB()
self.handshake_req = None
def respond(r):
if "ismaster" in r:
# then this is a handshake request
self.handshake_req = r
return r.reply(OpMsgReply(maxWireVersion=13))
self.server.autoresponds(respond)
self.server.run()
self.addAsyncCleanup(self.server.stop)
async def send_ping_and_get_metadata(
self, client: AsyncMongoClient, is_handshake: bool
) -> tuple[str, Optional[str], Optional[str], dict[str, Any]]:
# reset if handshake request
if is_handshake:
self.handshake_req: Optional[dict] = None
await client.admin.command("ping")
metadata = _get_handshake_driver_info(self.handshake_req)
driver_metadata = metadata["driver"]
name, version, platform = (
driver_metadata["name"],
driver_metadata["version"],
metadata["platform"],
)
return name, version, platform, metadata
async def check_metadata_added(
self,
client: AsyncMongoClient,
add_name: str,
add_version: Optional[str],
add_platform: Optional[str],
) -> None:
# send initial metadata
name, version, platform, metadata = await self.send_ping_and_get_metadata(client, True)
# wait for connection to become idle
await asyncio.sleep(0.005)
# add new metadata
client.append_metadata(DriverInfo(add_name, add_version, add_platform))
new_name, new_version, new_platform, new_metadata = await self.send_ping_and_get_metadata(
client, True
)
self.assertEqual(new_name, f"{name}|{add_name}" if add_name is not None else name)
self.assertEqual(
new_version,
f"{version}|{add_version}" if add_version is not None else version,
)
self.assertEqual(
new_platform,
f"{platform}|{add_platform}" if add_platform is not None else platform,
)
metadata.pop("driver")
metadata.pop("platform")
new_metadata.pop("driver")
new_metadata.pop("platform")
self.assertEqual(metadata, new_metadata)
async def test_append_metadata(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
await self.check_metadata_added(client, "framework", "2.0", "Framework Platform")
async def test_append_metadata_platform_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
await self.check_metadata_added(client, "framework", "2.0", None)
async def test_append_metadata_version_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
await self.check_metadata_added(client, "framework", None, "Framework Platform")
async def test_append_metadata_platform_version_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
await self.check_metadata_added(client, "framework", None, None)
async def test_multiple_successive_metadata_updates(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string, maxIdleTimeMS=1, connect=False
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
await self.check_metadata_added(client, "framework", "2.0", "Framework Platform")
async def test_multiple_successive_metadata_updates_platform_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
await self.check_metadata_added(client, "framework", "2.0", None)
async def test_multiple_successive_metadata_updates_version_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
await self.check_metadata_added(client, "framework", None, "Framework Platform")
async def test_multiple_successive_metadata_updates_platform_version_none(self):
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
await self.check_metadata_added(client, "framework", None, None)
async def test_doesnt_update_established_connections(self):
listener = CMAPListener()
client = await self.async_rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
event_listeners=[listener],
)
# send initial metadata
name, version, platform, metadata = await self.send_ping_and_get_metadata(client, True)
self.assertIsNotNone(name)
self.assertIsNotNone(version)
self.assertIsNotNone(platform)
# add data
add_name, add_version, add_platform = "framework", "2.0", "Framework Platform"
client.append_metadata(DriverInfo(add_name, add_version, add_platform))
# check new data isn't sent
self.handshake_req: Optional[dict] = None
await client.admin.command("ping")
self.assertIsNone(self.handshake_req)
self.assertEqual(listener.event_count(ConnectionClosedEvent), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -75,6 +75,7 @@ from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.encryption import AsyncClientEncryption
from pymongo.asynchronous.helpers import anext
from pymongo.driver_info import DriverInfo
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
from pymongo.errors import (
AutoReconnect,
@ -813,6 +814,11 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
self.__raise_if_unsupported("close", target, NonLazyCursor, AsyncCommandCursor)
return await target.close()
async def _clientOperation_appendMetadata(self, target, *args, **kwargs):
info_opts = kwargs["driver_info_options"]
driver_info = DriverInfo(info_opts["name"], info_opts["version"], info_opts["platform"])
target.append_metadata(driver_info)
async def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs):
if "opts" in kwargs:
kwargs.update(camel_to_snake_args(kwargs.pop("opts")))

View File

@ -0,0 +1,100 @@
{
"description": "client metadata is not propagated to the server",
"schemaVersion": "1.9",
"runOnRequirements": [
{
"minServerVersion": "6.0"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeEvents": [
"commandSucceededEvent",
"commandFailedEvent",
"connectionClosedEvent",
"connectionCreatedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "test"
}
}
],
"tests": [
{
"description": "metadata append does not create new connections or close existing ones and no hello command is sent",
"operations": [
{
"name": "runCommand",
"object": "database",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "appendMetadata",
"object": "client",
"arguments": {
"driverInfoOptions": {
"name": "framework",
"version": "2.0",
"platform": "Framework Platform"
}
}
},
{
"name": "runCommand",
"object": "database",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionCreatedEvent": {}
}
]
},
{
"client": "client",
"eventType": "command",
"events": [
{
"commandSucceededEvent": {
"commandName": "ping"
}
},
{
"commandSucceededEvent": {
"commandName": "ping"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,215 @@
# Copyright 2013-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import os
import pathlib
import time
import unittest
from test import IntegrationTest
from test.unified_format import generate_test_classes
from test.utils_shared import CMAPListener
from typing import Any, Optional
import pytest
from pymongo import MongoClient
from pymongo.driver_info import DriverInfo
from pymongo.monitoring import ConnectionClosedEvent
try:
from mockupdb import MockupDB, OpMsgReply
_HAVE_MOCKUPDB = True
except ImportError:
_HAVE_MOCKUPDB = False
pytestmark = pytest.mark.mockupdb
_IS_SYNC = True
# Location of JSON test specifications.
if _IS_SYNC:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "handshake", "unified")
else:
_TEST_PATH = os.path.join(
pathlib.Path(__file__).resolve().parent.parent, "handshake", "unified"
)
# Generate unified tests.
globals().update(generate_test_classes(_TEST_PATH, module=__name__))
def _get_handshake_driver_info(request):
assert "client" in request
return request["client"]
class TestClientMetadataProse(IntegrationTest):
def setUp(self):
super().setUp()
self.server = MockupDB()
self.handshake_req = None
def respond(r):
if "ismaster" in r:
# then this is a handshake request
self.handshake_req = r
return r.reply(OpMsgReply(maxWireVersion=13))
self.server.autoresponds(respond)
self.server.run()
self.addCleanup(self.server.stop)
def send_ping_and_get_metadata(
self, client: MongoClient, is_handshake: bool
) -> tuple[str, Optional[str], Optional[str], dict[str, Any]]:
# reset if handshake request
if is_handshake:
self.handshake_req: Optional[dict] = None
client.admin.command("ping")
metadata = _get_handshake_driver_info(self.handshake_req)
driver_metadata = metadata["driver"]
name, version, platform = (
driver_metadata["name"],
driver_metadata["version"],
metadata["platform"],
)
return name, version, platform, metadata
def check_metadata_added(
self,
client: MongoClient,
add_name: str,
add_version: Optional[str],
add_platform: Optional[str],
) -> None:
# send initial metadata
name, version, platform, metadata = self.send_ping_and_get_metadata(client, True)
# wait for connection to become idle
time.sleep(0.005)
# add new metadata
client.append_metadata(DriverInfo(add_name, add_version, add_platform))
new_name, new_version, new_platform, new_metadata = self.send_ping_and_get_metadata(
client, True
)
self.assertEqual(new_name, f"{name}|{add_name}" if add_name is not None else name)
self.assertEqual(
new_version,
f"{version}|{add_version}" if add_version is not None else version,
)
self.assertEqual(
new_platform,
f"{platform}|{add_platform}" if add_platform is not None else platform,
)
metadata.pop("driver")
metadata.pop("platform")
new_metadata.pop("driver")
new_metadata.pop("platform")
self.assertEqual(metadata, new_metadata)
def test_append_metadata(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
self.check_metadata_added(client, "framework", "2.0", "Framework Platform")
def test_append_metadata_platform_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
self.check_metadata_added(client, "framework", "2.0", None)
def test_append_metadata_version_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
self.check_metadata_added(client, "framework", None, "Framework Platform")
def test_append_metadata_platform_version_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
)
self.check_metadata_added(client, "framework", None, None)
def test_multiple_successive_metadata_updates(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string, maxIdleTimeMS=1, connect=False
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
self.check_metadata_added(client, "framework", "2.0", "Framework Platform")
def test_multiple_successive_metadata_updates_platform_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
self.check_metadata_added(client, "framework", "2.0", None)
def test_multiple_successive_metadata_updates_version_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
self.check_metadata_added(client, "framework", None, "Framework Platform")
def test_multiple_successive_metadata_updates_platform_version_none(self):
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
)
client.append_metadata(DriverInfo("library", "1.2", "Library Platform"))
self.check_metadata_added(client, "framework", None, None)
def test_doesnt_update_established_connections(self):
listener = CMAPListener()
client = self.rs_or_single_client(
"mongodb://" + self.server.address_string,
maxIdleTimeMS=1,
driver=DriverInfo("library", "1.2", "Library Platform"),
event_listeners=[listener],
)
# send initial metadata
name, version, platform, metadata = self.send_ping_and_get_metadata(client, True)
self.assertIsNotNone(name)
self.assertIsNotNone(version)
self.assertIsNotNone(platform)
# add data
add_name, add_version, add_platform = "framework", "2.0", "Framework Platform"
client.append_metadata(DriverInfo(add_name, add_version, add_platform))
# check new data isn't sent
self.handshake_req: Optional[dict] = None
client.admin.command("ping")
self.assertIsNone(self.handshake_req)
self.assertEqual(listener.event_count(ConnectionClosedEvent), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -67,6 +67,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.objectid import ObjectId
from gridfs import GridFSBucket, GridOut, NoFile
from pymongo import ASCENDING, CursorType, MongoClient, _csot
from pymongo.driver_info import DriverInfo
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
from pymongo.errors import (
AutoReconnect,
@ -810,6 +811,11 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.__raise_if_unsupported("close", target, NonLazyCursor, CommandCursor)
return target.close()
def _clientOperation_appendMetadata(self, target, *args, **kwargs):
info_opts = kwargs["driver_info_options"]
driver_info = DriverInfo(info_opts["name"], info_opts["version"], info_opts["platform"])
target.append_metadata(driver_info)
def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs):
if "opts" in kwargs:
kwargs.update(camel_to_snake_args(kwargs.pop("opts")))

View File

@ -212,6 +212,7 @@ converted_tests = [
"test_client.py",
"test_client_bulk_write.py",
"test_client_context.py",
"test_client_metadata.py",
"test_collation.py",
"test_collection.py",
"test_collection_management.py",