From 653ea8b8d29de07fa31aacf753a0ab884f90d0d2 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 4 Sep 2024 14:53:21 -0700 Subject: [PATCH 01/10] PYTHON-4164 Document support for KMIP delegated master_key (#1830) --- doc/changelog.rst | 2 ++ pymongo/asynchronous/encryption.py | 3 +++ pymongo/synchronous/encryption.py | 3 +++ .../spec/legacy/fle2v2-Rangev2-Compact.json | 3 ++- 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/doc/changelog.rst b/doc/changelog.rst index 42a4fdf50..2d574ee8c 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -11,6 +11,8 @@ PyMongo 4.9 brings a number of improvements including: - Added support for In-Use Encryption range queries with MongoDB 8.0. Added :attr:`~pymongo.encryption.Algorithm.RANGE`. ``sparsity`` and ``trim_factor`` are now optional in :class:`~pymongo.encryption_options.RangeOpts`. +- Added support for the "delegated" option for the KMIP ``master_key`` in + :meth:`~pymongo.encryption.ClientEncryption.create_data_key`. - pymongocrypt>=1.10 is now required for :ref:`In-Use Encryption` support. - Added :meth:`~pymongo.cursor.Cursor.to_list` to :class:`~pymongo.cursor.Cursor`, :class:`~pymongo.command_cursor.CommandCursor`, diff --git a/pymongo/asynchronous/encryption.py b/pymongo/asynchronous/encryption.py index c4cb886df..c9e3cadd6 100644 --- a/pymongo/asynchronous/encryption.py +++ b/pymongo/asynchronous/encryption.py @@ -764,6 +764,9 @@ class AsyncClientEncryption(Generic[_DocumentType]): Secret Data managed object. - `endpoint` (string): Optional. Host with optional port, e.g. "example.vault.azure.net:". + - `delegated` (bool): Optional. If True (recommended), the + KMIP server will perform encryption and decryption. If + delegated is not provided, defaults to false. :param key_alt_names: An optional list of string alternate names used to reference a key. If a key is created with alternate diff --git a/pymongo/synchronous/encryption.py b/pymongo/synchronous/encryption.py index 2efa99597..3849cf3f2 100644 --- a/pymongo/synchronous/encryption.py +++ b/pymongo/synchronous/encryption.py @@ -762,6 +762,9 @@ class ClientEncryption(Generic[_DocumentType]): Secret Data managed object. - `endpoint` (string): Optional. Host with optional port, e.g. "example.vault.azure.net:". + - `delegated` (bool): Optional. If True (recommended), the + KMIP server will perform encryption and decryption. If + delegated is not provided, defaults to false. :param key_alt_names: An optional list of string alternate names used to reference a key. If a key is created with alternate diff --git a/test/client-side-encryption/spec/legacy/fle2v2-Rangev2-Compact.json b/test/client-side-encryption/spec/legacy/fle2v2-Rangev2-Compact.json index 59241927c..bba9f2553 100644 --- a/test/client-side-encryption/spec/legacy/fle2v2-Rangev2-Compact.json +++ b/test/client-side-encryption/spec/legacy/fle2v2-Rangev2-Compact.json @@ -6,7 +6,8 @@ "replicaset", "sharded", "load-balanced" - ] + ], + "serverless": "forbid" } ], "database_name": "default", From e27b428914279b7846bed7660836aa1d31986a9b Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 4 Sep 2024 14:53:32 -0700 Subject: [PATCH 02/10] PYTHON-4150 Document compatibility with MongoDB 3.6 will soon be dropped (#1829) --- doc/changelog.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/changelog.rst b/doc/changelog.rst index 2d574ee8c..c5a4f47d7 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -4,6 +4,9 @@ Changelog Changes in Version 4.9.0 ------------------------- +.. warning:: Driver support for MongoDB 3.6 reached end of life in April 2024. + PyMongo 4.9 will be the last release to support MongoDB 3.6. + PyMongo 4.9 brings a number of improvements including: - Added support for MongoDB 8.0. From 4d4813070dfb79581c755253d0fe69dec43bc3be Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 4 Sep 2024 19:40:37 -0500 Subject: [PATCH 03/10] PYTHON-4667 Handle $clusterTime from error responses in client Bulk Write (#1822) --- pymongo/asynchronous/bulk.py | 5 ++++- pymongo/asynchronous/client_bulk.py | 8 +++++++- pymongo/synchronous/bulk.py | 5 ++++- pymongo/synchronous/client_bulk.py | 8 +++++++- test/mockupdb/test_cluster_time.py | 31 ++++++++++++++++++++++++++--- 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index c200899dd..9fd673693 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -281,6 +281,7 @@ class _AsyncBulk: ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] + await client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: duration = datetime.datetime.now() - bwc.start_time if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -308,6 +309,9 @@ class _AsyncBulk: if bwc.publish: bwc._fail(request_id, failure, duration) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] raise finally: bwc.start_time = datetime.datetime.now() @@ -449,7 +453,6 @@ class _AsyncBulk: else: request_id, msg, to_send = bwc.batch_command(cmd, ops) result = await self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type] - await client._process_response(result, bwc.session) # type: ignore[arg-type] return result, to_send # type: ignore[return-value] diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index b9ab6b876..15a0369f4 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -283,6 +283,8 @@ class _AsyncClientBulk: ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] + # Process the response from the server. + await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: duration = datetime.datetime.now() - bwc.start_time if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -312,6 +314,11 @@ class _AsyncClientBulk: bwc._fail(request_id, failure, duration) # Top-level error will be embedded in ClientBulkWriteException. reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + await self.client._process_response({}, bwc.session) # type: ignore[arg-type] finally: bwc.start_time = datetime.datetime.now() return reply # type: ignore[return-value] @@ -431,7 +438,6 @@ class _AsyncClientBulk: result = await self.write_command( bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client ) # type: ignore[arg-type] - await self.client._process_response(result, bwc.session) # type: ignore[arg-type] return result, to_send_ops, to_send_ns # type: ignore[return-value] async def _process_results_cursor( diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 4da64c4a7..27fcff620 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -281,6 +281,7 @@ class _Bulk: ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] + client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: duration = datetime.datetime.now() - bwc.start_time if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -308,6 +309,9 @@ class _Bulk: if bwc.publish: bwc._fail(request_id, failure, duration) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + client._process_response(exc.details, bwc.session) # type: ignore[arg-type] raise finally: bwc.start_time = datetime.datetime.now() @@ -449,7 +453,6 @@ class _Bulk: else: request_id, msg, to_send = bwc.batch_command(cmd, ops) result = self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type] - client._process_response(result, bwc.session) # type: ignore[arg-type] return result, to_send # type: ignore[return-value] diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 106e5dcbb..23af231d1 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -283,6 +283,8 @@ class _ClientBulk: ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] + # Process the response from the server. + self.client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: duration = datetime.datetime.now() - bwc.start_time if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -312,6 +314,11 @@ class _ClientBulk: bwc._fail(request_id, failure, duration) # Top-level error will be embedded in ClientBulkWriteException. reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + self.client._process_response({}, bwc.session) # type: ignore[arg-type] finally: bwc.start_time = datetime.datetime.now() return reply # type: ignore[return-value] @@ -429,7 +436,6 @@ class _ClientBulk: """Executes a batch of bulkWrite server commands (ack).""" request_id, msg, to_send_ops, to_send_ns = bwc.batch_command(cmd, ops, namespaces) result = self.write_command(bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client) # type: ignore[arg-type] - self.client._process_response(result, bwc.session) # type: ignore[arg-type] return result, to_send_ops, to_send_ns # type: ignore[return-value] def _process_results_cursor( diff --git a/test/mockupdb/test_cluster_time.py b/test/mockupdb/test_cluster_time.py index f3ab0a6c5..979484317 100644 --- a/test/mockupdb/test_cluster_time.py +++ b/test/mockupdb/test_cluster_time.py @@ -29,21 +29,22 @@ except ImportError: from bson import Timestamp from pymongo import DeleteMany, InsertOne, MongoClient, UpdateOne +from pymongo.errors import OperationFailure pytestmark = pytest.mark.mockupdb class TestClusterTime(unittest.TestCase): - def cluster_time_conversation(self, callback, replies): + def cluster_time_conversation(self, callback, replies, max_wire_version=6): cluster_time = Timestamp(0, 0) server = MockupDB() - # First test all commands include $clusterTime with wire version 6. + # First test all commands include $clusterTime with max_wire_version. _ = server.autoresponds( "ismaster", { "minWireVersion": 0, - "maxWireVersion": 6, + "maxWireVersion": max_wire_version, "$clusterTime": {"clusterTime": cluster_time}, }, ) @@ -166,6 +167,30 @@ class TestClusterTime(unittest.TestCase): request.reply(reply) client.close() + def test_collection_bulk_error(self): + def callback(client: MongoClient[dict]) -> None: + with self.assertRaises(OperationFailure): + client.db.collection.bulk_write([InsertOne({}), InsertOne({})]) + + self.cluster_time_conversation( + callback, + [{"ok": 0, "errmsg": "mock error"}], + ) + + def test_client_bulk_error(self): + def callback(client: MongoClient[dict]) -> None: + with self.assertRaises(OperationFailure): + client.bulk_write( + [ + InsertOne({}, namespace="db.collection"), + InsertOne({}, namespace="db.collection"), + ] + ) + + self.cluster_time_conversation( + callback, [{"ok": 0, "errmsg": "mock error"}], max_wire_version=25 + ) + if __name__ == "__main__": unittest.main() From 26c55048d4ba760b10d79ce155ae7ae5fb5e9989 Mon Sep 17 00:00:00 2001 From: Jib Date: Thu, 5 Sep 2024 09:39:55 -0400 Subject: [PATCH 04/10] PYTHON-4631: Pushed PREPARE_SHELL creation into an env.sh file (#1788) Co-authored-by: Steven Silvester --- .evergreen/config.yml | 143 +++++++++++----------------- .evergreen/scripts/configure-env.sh | 53 +++++++++++ 2 files changed, 106 insertions(+), 90 deletions(-) create mode 100644 .evergreen/scripts/configure-env.sh diff --git a/.evergreen/config.yml b/.evergreen/config.yml index 8388c7215..c8e314f6c 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -34,51 +34,14 @@ functions: # Applies the subitted patch, if any # Deprecated. Should be removed. But still needed for certain agents (ZAP) - command: git.apply_patch - # Make an evergreen exapanstion file with dynamic values - - command: shell.exec + # Make an evergreen expansion file with dynamic values + - command: subprocess.exec params: + include_expansions_in_env: ["is_patch", "project", "version_id"] + binary: bash working_dir: "src" - script: | - set +x - # Get the current unique version of this checkout - if [ "${is_patch}" = "true" ]; then - CURRENT_VERSION=$(git describe)-patch-${version_id} - else - CURRENT_VERSION=latest - fi - - export DRIVERS_TOOLS="$(dirname $(pwd))/drivers-tools" - export PROJECT_DIRECTORY="$(pwd)" - - # Python has cygwin path problems on Windows. Detect prospective mongo-orchestration home directory - if [ "Windows_NT" = "$OS" ]; then # Magic variable in cygwin - export DRIVERS_TOOLS=$(cygpath -m $DRIVERS_TOOLS) - export PROJECT_DIRECTORY=$(cygpath -m $PROJECT_DIRECTORY) - fi - - export MONGO_ORCHESTRATION_HOME="$DRIVERS_TOOLS/.evergreen/orchestration" - export MONGODB_BINARIES="$DRIVERS_TOOLS/mongodb/bin" - - cat < expansion.yml - CURRENT_VERSION: "$CURRENT_VERSION" - DRIVERS_TOOLS: "$DRIVERS_TOOLS" - MONGO_ORCHESTRATION_HOME: "$MONGO_ORCHESTRATION_HOME" - MONGODB_BINARIES: "$MONGODB_BINARIES" - PROJECT_DIRECTORY: "$PROJECT_DIRECTORY" - PREPARE_SHELL: | - set -o errexit - export SKIP_LEGACY_SHELL=1 - export DRIVERS_TOOLS="$DRIVERS_TOOLS" - export MONGO_ORCHESTRATION_HOME="$MONGO_ORCHESTRATION_HOME" - export MONGODB_BINARIES="$MONGODB_BINARIES" - export PROJECT_DIRECTORY="$PROJECT_DIRECTORY" - - export TMPDIR="$MONGO_ORCHESTRATION_HOME/db" - export PATH="$MONGODB_BINARIES:$PATH" - export PROJECT="${project}" - export PIP_QUIET=1 - EOT - + args: + - .evergreen/scripts/configure-env.sh # Load the expansion file to make an evergreen variable with the current unique version - command: expansions.update params: @@ -88,14 +51,14 @@ functions: - command: shell.exec params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh set -o xtrace rm -rf $DRIVERS_TOOLS - if [ "${project}" = "drivers-tools" ]; then + if [ "$PROJECT" = "drivers-tools" ]; then # If this was a patch build, doing a fresh clone would not actually test the patch - cp -R ${PROJECT_DIRECTORY}/ $DRIVERS_TOOLS + cp -R ${PROJECT_DIRECTORY}/ ${DRIVERS_TOOLS} else - git clone https://github.com/mongodb-labs/drivers-evergreen-tools.git $DRIVERS_TOOLS + git clone https://github.com/mongodb-labs/drivers-evergreen-tools.git ${DRIVERS_TOOLS} fi echo "{ \"releases\": { \"default\": \"$MONGODB_BINARIES\" }}" > $MONGO_ORCHESTRATION_HOME/orchestration.config @@ -129,12 +92,12 @@ functions: script: | # Download all the task coverage files. aws s3 cp --recursive s3://${bucket_name}/coverage/${revision}/${version_id}/coverage/ coverage/ - - command: shell.exec + - command: subprocess.exec params: working_dir: "src" - script: | - ${PREPARE_SHELL} - bash .evergreen/combine-coverage.sh + binary: bash + args: + - .evergreen/combine-coverage.sh # Upload the resulting html coverage report. - command: shell.exec params: @@ -164,7 +127,7 @@ functions: - command: shell.exec params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh set -o xtrace mkdir out_dir find $MONGO_ORCHESTRATION_HOME -name \*.log -exec sh -c 'x="{}"; mv $x $PWD/out_dir/$(basename $(dirname $x))_$(basename $x)' \; @@ -266,7 +229,7 @@ functions: - command: shell.exec params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh set -o xtrace # Enable core dumps if enabled on the machine @@ -325,13 +288,13 @@ functions: type: setup params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh bash ${DRIVERS_TOOLS}/.evergreen/atlas_data_lake/pull-mongohouse-image.sh - command: shell.exec type: setup params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh bash ${DRIVERS_TOOLS}/.evergreen/atlas_data_lake/run-mongohouse-image.sh sleep 1 docker ps @@ -340,7 +303,7 @@ functions: - command: shell.exec params: script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh set -o xtrace bash ${DRIVERS_TOOLS}/.evergreen/stop-orchestration.sh @@ -350,7 +313,7 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -o xtrace PYTHON_BINARY=${PYTHON_BINARY} MOD_WSGI_VERSION=${MOD_WSGI_VERSION} \ MOD_WSGI_EMBEDDED=${MOD_WSGI_EMBEDDED} PROJECT_DIRECTORY=${PROJECT_DIRECTORY} \ @@ -362,7 +325,7 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -o xtrace export PYTHON_BINARY=${PYTHON_BINARY} bash ${PROJECT_DIRECTORY}/.evergreen/hatch.sh test:test-mockupdb @@ -373,7 +336,7 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -o xtrace PYTHON_BINARY=${PYTHON_BINARY} bash ${PROJECT_DIRECTORY}/.evergreen/hatch.sh doctest:test @@ -385,7 +348,7 @@ functions: background: true include_expansions_in_env: ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"] script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh if [ -n "${test_encryption}" ]; then ./.evergreen/hatch.sh encryption:setup fi @@ -397,7 +360,7 @@ functions: script: | # Disable xtrace set +x - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh if [ -n "${MONGODB_STARTED}" ]; then export PYMONGO_MUST_CONNECT=true fi @@ -497,7 +460,7 @@ functions: shell: "bash" working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh .evergreen/run-mongodb-aws-test.sh regular "run aws auth test with assume role credentials": @@ -507,7 +470,7 @@ functions: shell: "bash" working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh .evergreen/run-mongodb-aws-test.sh assume-role "run aws auth test with aws EC2 credentials": @@ -521,7 +484,7 @@ functions: echo "This platform does not support the EC2 auth test, skipping..." exit 0 fi - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh .evergreen/run-mongodb-aws-test.sh ec2 "run aws auth test with aws web identity credentials": @@ -535,7 +498,7 @@ functions: echo "This platform does not support the web identity auth test, skipping..." exit 0 fi - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh # Test with and without AWS_ROLE_SESSION_NAME set. .evergreen/run-mongodb-aws-test.sh web-identity AWS_ROLE_SESSION_NAME="test" \ @@ -558,7 +521,7 @@ functions: working_dir: "src" shell: bash script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh .evergreen/run-mongodb-aws-test.sh env-creds "run aws auth test with aws credentials and session token as environment variables": @@ -568,7 +531,7 @@ functions: working_dir: "src" shell: bash script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh .evergreen/run-mongodb-aws-test.sh session-creds "run aws ECS auth test": @@ -582,12 +545,12 @@ functions: echo "This platform does not support the ECS auth test, skipping..." exit 0 fi - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -ex cd ${DRIVERS_TOOLS}/.evergreen/auth_aws . ./activate-authawsvenv.sh . aws_setup.sh ecs - export MONGODB_BINARIES="${MONGODB_BINARIES}"; + export MONGODB_BINARIES="$MONGODB_BINARIES"; export PROJECT_DIRECTORY="${PROJECT_DIRECTORY}"; python aws_tester.py ecs cd - @@ -597,9 +560,9 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh . .evergreen/hatch.sh encryption:teardown - rm -rf $DRIVERS_TOOLS || true + rm -rf ${DRIVERS_TOOLS} || true rm -f ./secrets-export.sh || true "fix absolute paths": @@ -607,7 +570,7 @@ functions: params: script: | set +x - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh for filename in $(find ${DRIVERS_TOOLS} -name \*.json); do perl -p -i -e "s|ABSOLUTE_PATH_REPLACEMENT_TOKEN|${DRIVERS_TOOLS}|g" $filename done @@ -617,20 +580,20 @@ functions: params: script: | set +x - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh for i in $(find ${DRIVERS_TOOLS}/.evergreen ${PROJECT_DIRECTORY}/.evergreen -name \*.sh); do cat $i | tr -d '\r' > $i.new mv $i.new $i done # Copy client certificate because symlinks do not work on Windows. - cp ${DRIVERS_TOOLS}/.evergreen/x509gen/client.pem ${MONGO_ORCHESTRATION_HOME}/lib/client.pem + cp ${DRIVERS_TOOLS}/.evergreen/x509gen/client.pem $MONGO_ORCHESTRATION_HOME/lib/client.pem "make files executable": - command: shell.exec params: script: | set +x - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh for i in $(find ${DRIVERS_TOOLS}/.evergreen ${PROJECT_DIRECTORY}/.evergreen -name \*.sh); do chmod +x $i done @@ -640,7 +603,7 @@ functions: params: script: | set +x - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh echo '{"results": [{ "status": "FAIL", "test_file": "Build", "log_raw": "No test-results.json found was created" } ]}' > ${PROJECT_DIRECTORY}/test-results.json "install dependencies": @@ -648,7 +611,7 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -o xtrace file="${PROJECT_DIRECTORY}/.evergreen/install-dependencies.sh" # Don't use ${file} syntax here because evergreen treats it as an empty expansion. @@ -679,10 +642,10 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh TEST_OCSP=1 \ PYTHON_BINARY=${PYTHON_BINARY} \ - CA_FILE="$DRIVERS_TOOLS/.evergreen/ocsp/${OCSP_ALGORITHM}/ca.pem" \ + CA_FILE="${DRIVERS_TOOLS}/.evergreen/ocsp/${OCSP_ALGORITHM}/ca.pem" \ OCSP_TLS_SHOULD_SUCCEED="${OCSP_TLS_SHOULD_SUCCEED}" \ bash ${PROJECT_DIRECTORY}/.evergreen/hatch.sh test:test-eg @@ -691,7 +654,7 @@ functions: params: background: true script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd ${DRIVERS_TOOLS}/.evergreen/ocsp . ./activate-ocspvenv.sh python ocsp_mock.py \ @@ -704,7 +667,7 @@ functions: params: background: true script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd ${DRIVERS_TOOLS}/.evergreen/ocsp . ./activate-ocspvenv.sh python ocsp_mock.py \ @@ -719,7 +682,7 @@ functions: params: background: true script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd ${DRIVERS_TOOLS}/.evergreen/ocsp . ./activate-ocspvenv.sh python ocsp_mock.py \ @@ -732,7 +695,7 @@ functions: params: background: true script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd ${DRIVERS_TOOLS}/.evergreen/ocsp . ./activate-ocspvenv.sh python ocsp_mock.py \ @@ -774,7 +737,7 @@ functions: params: shell: "bash" script: | - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd "${DRIVERS_TOOLS}/.evergreen/auth_aws" if [ -f "./aws_e2e_setup.json" ]; then . ./activate-authawsvenv.sh @@ -794,7 +757,7 @@ functions: params: working_dir: "src" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh PROJECT_DIRECTORY=${PROJECT_DIRECTORY} bash ${PROJECT_DIRECTORY}/.evergreen/run-perf-tests.sh "attach benchmark test results": @@ -1889,7 +1852,7 @@ tasks: shell: bash script: |- set -o errexit - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd src git add . git commit -m "add files" @@ -1906,7 +1869,7 @@ tasks: shell: bash script: |- set -o errexit - ${PREPARE_SHELL} + . src/.evergreen/scripts/env.sh cd src git add . git commit -m "add files" @@ -1974,7 +1937,7 @@ tasks: working_dir: "src" shell: "bash" script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh export PYTHON_BINARY=/opt/mongodbtoolchain/v4/bin/python3 export LIBMONGOCRYPT_URL=https://s3.amazonaws.com/mciuploads/libmongocrypt/debian11/master/latest/libmongocrypt.tar.gz SKIP_SERVERS=1 bash ./.evergreen/setup-encryption.sh @@ -2051,7 +2014,7 @@ tasks: shell: "bash" working_dir: src script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -x export CONFIG=$PROJECT_DIRECTORY/.github/reviewers.txt export SCRIPT="$DRIVERS_TOOLS/.evergreen/github_app/assign-reviewer.sh" @@ -2067,7 +2030,7 @@ tasks: shell: "bash" working_dir: src script: | - ${PREPARE_SHELL} + . .evergreen/scripts/env.sh set -x export BASE_SHA=${revision} export HEAD_SHA=${github_commit} diff --git a/.evergreen/scripts/configure-env.sh b/.evergreen/scripts/configure-env.sh new file mode 100644 index 000000000..0c9c8bb03 --- /dev/null +++ b/.evergreen/scripts/configure-env.sh @@ -0,0 +1,53 @@ +#!/bin/bash -ex + +# Get the current unique version of this checkout +# shellcheck disable=SC2154 +if [ "$is_patch" = "true" ]; then + # shellcheck disable=SC2154 + CURRENT_VERSION="$(git describe)-patch-$version_id" +else + CURRENT_VERSION=latest +fi + +PROJECT_DIRECTORY="$(pwd)" +DRIVERS_TOOLS="$(dirname $PROJECT_DIRECTORY)/drivers-tools" + +# Python has cygwin path problems on Windows. Detect prospective mongo-orchestration home directory +if [ "Windows_NT" = "$OS" ]; then # Magic variable in cygwin + DRIVERS_TOOLS=$(cygpath -m $DRIVERS_TOOLS) + PROJECT_DIRECTORY=$(cygpath -m $PROJECT_DIRECTORY) +fi + +SCRIPT_DIR="$PROJECT_DIRECTORY/.evergreen/scripts" + +if [ -f "$SCRIPT_DIR/env.sh" ]; then + echo "Reading $SCRIPT_DIR/env.sh file" + . "$SCRIPT_DIR/env.sh" + exit 0 +fi + +export MONGO_ORCHESTRATION_HOME="$DRIVERS_TOOLS/.evergreen/orchestration" +export MONGODB_BINARIES="$DRIVERS_TOOLS/mongodb/bin" + +cat < $SCRIPT_DIR/env.sh +set -o errexit +export PROJECT_DIRECTORY="$PROJECT_DIRECTORY" +export CURRENT_VERSION="$CURRENT_VERSION" +export SKIP_LEGACY_SHELL=1 +export DRIVERS_TOOLS="$DRIVERS_TOOLS" +export MONGO_ORCHESTRATION_HOME="$MONGO_ORCHESTRATION_HOME" +export MONGODB_BINARIES="$MONGODB_BINARIES" +export PROJECT_DIRECTORY="$PROJECT_DIRECTORY" + +export TMPDIR="$MONGO_ORCHESTRATION_HOME/db" +export PATH="$MONGODB_BINARIES:$PATH" +# shellcheck disable=SC2154 +export PROJECT="$project" +export PIP_QUIET=1 +EOT + +# Add these expansions to make it easier to call out tests scripts from the EVG yaml +cat < expansion.yml +DRIVERS_TOOLS: "$DRIVERS_TOOLS" +PROJECT_DIRECTORY: "$PROJECT_DIRECTORY" +EOT From 6e9bf1e4a822cb480bd68c576d4ffb82f8b347d6 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 5 Sep 2024 10:20:32 -0400 Subject: [PATCH 05/10] PYTHON-4708 - Convert test.qcheck to async (#1832) --- test/asynchronous/qcheck.py | 255 ++++++++++++++++++++++++++++ test/asynchronous/test_grid_file.py | 18 +- test/qcheck.py | 7 +- test/test_grid_file.py | 14 +- tools/synchro.py | 1 + 5 files changed, 280 insertions(+), 15 deletions(-) create mode 100644 test/asynchronous/qcheck.py diff --git a/test/asynchronous/qcheck.py b/test/asynchronous/qcheck.py new file mode 100644 index 000000000..190a7f1a9 --- /dev/null +++ b/test/asynchronous/qcheck.py @@ -0,0 +1,255 @@ +# Copyright 2009-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import datetime +import random +import re +import sys +import traceback + +sys.path[0:0] = [""] + +from bson.dbref import DBRef +from bson.objectid import ObjectId +from bson.son import SON + +_IS_SYNC = False + +gen_target = 100 +reduction_attempts = 10 +examples = 5 + + +def lift(value): + return lambda: value + + +def choose_lifted(generator_list): + return lambda: random.choice(generator_list) + + +def my_map(generator, function): + return lambda: function(generator()) + + +def choose(list): + return lambda: random.choice(list)() + + +def gen_range(start, stop): + return lambda: random.randint(start, stop) + + +def gen_int(): + max_int = 2147483647 + return lambda: random.randint(-max_int - 1, max_int) + + +def gen_float(): + return lambda: (random.random() - 0.5) * sys.maxsize + + +def gen_boolean(): + return lambda: random.choice([True, False]) + + +def gen_printable_char(): + return lambda: chr(random.randint(32, 126)) + + +def gen_printable_string(gen_length): + return lambda: "".join(gen_list(gen_printable_char(), gen_length)()) + + +def gen_char(set=None): + return lambda: bytes([random.randint(0, 255)]) + + +def gen_string(gen_length): + return lambda: b"".join(gen_list(gen_char(), gen_length)()) + + +def gen_unichar(): + return lambda: chr(random.randint(1, 0xFFF)) + + +def gen_unicode(gen_length): + return lambda: "".join([x for x in gen_list(gen_unichar(), gen_length)() if x not in ".$"]) + + +def gen_list(generator, gen_length): + return lambda: [generator() for _ in range(gen_length())] + + +def gen_datetime(): + return lambda: datetime.datetime( + random.randint(1970, 2037), + random.randint(1, 12), + random.randint(1, 28), + random.randint(0, 23), + random.randint(0, 59), + random.randint(0, 59), + random.randint(0, 999) * 1000, + ) + + +def gen_dict(gen_key, gen_value, gen_length): + def a_dict(gen_key, gen_value, length): + result = {} + for _ in range(length): + result[gen_key()] = gen_value() + return result + + return lambda: a_dict(gen_key, gen_value, gen_length()) + + +def gen_regexp(gen_length): + # TODO our patterns only consist of one letter. + # this is because of a bug in CPython's regex equality testing, + # which I haven't quite tracked down, so I'm just ignoring it... + def pattern(): + return "".join(gen_list(choose_lifted("a"), gen_length)()) + + def gen_flags(): + flags = 0 + if random.random() > 0.5: + flags = flags | re.IGNORECASE + if random.random() > 0.5: + flags = flags | re.MULTILINE + if random.random() > 0.5: + flags = flags | re.VERBOSE + + return flags + + return lambda: re.compile(pattern(), gen_flags()) + + +def gen_objectid(): + return lambda: ObjectId() + + +def gen_dbref(): + collection = gen_unicode(gen_range(0, 20)) + return lambda: DBRef(collection(), gen_mongo_value(1, True)()) + + +def gen_mongo_value(depth, ref): + choices = [ + gen_unicode(gen_range(0, 50)), + gen_printable_string(gen_range(0, 50)), + my_map(gen_string(gen_range(0, 1000)), bytes), + gen_int(), + gen_float(), + gen_boolean(), + gen_datetime(), + gen_objectid(), + lift(None), + ] + if ref: + choices.append(gen_dbref()) + if depth > 0: + choices.append(gen_mongo_list(depth, ref)) + choices.append(gen_mongo_dict(depth, ref)) + return choose(choices) + + +def gen_mongo_list(depth, ref): + return gen_list(gen_mongo_value(depth - 1, ref), gen_range(0, 10)) + + +def gen_mongo_dict(depth, ref=True): + return my_map( + gen_dict(gen_unicode(gen_range(0, 20)), gen_mongo_value(depth - 1, ref), gen_range(0, 10)), + SON, + ) + + +def simplify(case): # TODO this is a hack + if isinstance(case, SON) and "$ref" not in case: + simplified = SON(case) # make a copy! + if random.choice([True, False]): + # delete + simplified_keys = list(simplified) + if not len(simplified_keys): + return (False, case) + simplified.pop(random.choice(simplified_keys)) + return (True, simplified) + else: + # simplify a value + simplified_items = list(simplified.items()) + if not len(simplified_items): + return (False, case) + (key, value) = random.choice(simplified_items) + (success, value) = simplify(value) + simplified[key] = value + return (success, success and simplified or case) + if isinstance(case, list): + simplified = list(case) + if random.choice([True, False]): + # delete + if not len(simplified): + return (False, case) + simplified.pop(random.randrange(len(simplified))) + return (True, simplified) + else: + # simplify an item + if not len(simplified): + return (False, case) + index = random.randrange(len(simplified)) + (success, value) = simplify(simplified[index]) + simplified[index] = value + return (success, success and simplified or case) + return (False, case) + + +async def reduce(case, predicate, reductions=0): + for _ in range(reduction_attempts): + (reduced, simplified) = simplify(case) + if reduced and not await predicate(simplified): + return await reduce(simplified, predicate, reductions + 1) + return (reductions, case) + + +async def isnt(predicate): + async def is_not(x): + return not await predicate(x) + + return is_not + + +async def check(predicate, generator): + counter_examples = [] + for _ in range(gen_target): + case = generator() + try: + if not await predicate(case): + reduction = await reduce(case, predicate) + counter_examples.append("after {} reductions: {!r}".format(*reduction)) + except: + counter_examples.append(f"{case!r} : {traceback.format_exc()}") + return counter_examples + + +async def check_unittest(test, predicate, generator): + counter_examples = await check(predicate, generator) + if counter_examples: + failures = len(counter_examples) + message = "\n".join([" -> %s" % f for f in counter_examples[:examples]]) + message = "found %d counter examples, displaying first %d:\n%s" % ( + failures, + min(failures, examples), + message, + ) + test.fail(message) diff --git a/test/asynchronous/test_grid_file.py b/test/asynchronous/test_grid_file.py index 7071fc76f..6d589dc01 100644 --- a/test/asynchronous/test_grid_file.py +++ b/test/asynchronous/test_grid_file.py @@ -21,17 +21,21 @@ import io import sys import zipfile from io import BytesIO -from test.asynchronous import AsyncIntegrationTest, AsyncUnitTest, async_client_context +from test.asynchronous import ( + AsyncIntegrationTest, + AsyncUnitTest, + async_client_context, + qcheck, + unittest, +) from pymongo.asynchronous.database import AsyncDatabase sys.path[0:0] = [""] -from test import IntegrationTest, qcheck, unittest -from test.utils import EventListener, async_rs_or_single_client, rs_or_single_client +from test.utils import EventListener, async_rs_or_single_client from bson.objectid import ObjectId -from gridfs import GridFS from gridfs.asynchronous.grid_file import ( _SEEK_CUR, _SEEK_END, @@ -44,7 +48,7 @@ from gridfs.asynchronous.grid_file import ( from gridfs.errors import NoFile from pymongo import AsyncMongoClient from pymongo.asynchronous.helpers import aiter, anext -from pymongo.errors import ConfigurationError, InvalidOperation, ServerSelectionTimeoutError +from pymongo.errors import ConfigurationError, ServerSelectionTimeoutError from pymongo.message import _CursorAddress _IS_SYNC = False @@ -407,8 +411,6 @@ class AsyncTestGridFile(AsyncIntegrationTest): g = AsyncGridOut(self.db.fs, f._id) self.assertEqual(random_string, await g.read()) - # TODO: https://jira.mongodb.org/browse/PYTHON-4708 - @async_client_context.require_sync async def test_small_chunks(self): self.files = 0 self.chunks = 0 @@ -431,7 +433,7 @@ class AsyncTestGridFile(AsyncIntegrationTest): self.assertEqual(data, await g.read(10) + await g.read(10)) return True - qcheck.check_unittest(self, helper, qcheck.gen_string(qcheck.gen_range(0, 20))) + await qcheck.check_unittest(self, helper, qcheck.gen_string(qcheck.gen_range(0, 20))) async def test_seek(self): f = AsyncGridIn(self.db.fs, chunkSize=3) diff --git a/test/qcheck.py b/test/qcheck.py index 8339bc376..842580cbf 100644 --- a/test/qcheck.py +++ b/test/qcheck.py @@ -25,6 +25,8 @@ from bson.dbref import DBRef from bson.objectid import ObjectId from bson.son import SON +_IS_SYNC = True + gen_target = 100 reduction_attempts = 10 examples = 5 @@ -221,7 +223,10 @@ def reduce(case, predicate, reductions=0): def isnt(predicate): - return lambda x: not predicate(x) + def is_not(x): + return not predicate(x) + + return is_not def check(predicate, generator): diff --git a/test/test_grid_file.py b/test/test_grid_file.py index 0e806eb5c..bd89235b7 100644 --- a/test/test_grid_file.py +++ b/test/test_grid_file.py @@ -21,17 +21,21 @@ import io import sys import zipfile from io import BytesIO -from test import IntegrationTest, UnitTest, client_context +from test import ( + IntegrationTest, + UnitTest, + client_context, + qcheck, + unittest, +) from pymongo.synchronous.database import Database sys.path[0:0] = [""] -from test import IntegrationTest, qcheck, unittest from test.utils import EventListener, rs_or_single_client from bson.objectid import ObjectId -from gridfs import GridFS from gridfs.errors import NoFile from gridfs.synchronous.grid_file import ( _SEEK_CUR, @@ -43,7 +47,7 @@ from gridfs.synchronous.grid_file import ( GridOutCursor, ) from pymongo import MongoClient -from pymongo.errors import ConfigurationError, InvalidOperation, ServerSelectionTimeoutError +from pymongo.errors import ConfigurationError, ServerSelectionTimeoutError from pymongo.message import _CursorAddress from pymongo.synchronous.helpers import iter, next @@ -405,8 +409,6 @@ class TestGridFile(IntegrationTest): g = GridOut(self.db.fs, f._id) self.assertEqual(random_string, g.read()) - # TODO: https://jira.mongodb.org/browse/PYTHON-4708 - @client_context.require_sync def test_small_chunks(self): self.files = 0 self.chunks = 0 diff --git a/tools/synchro.py b/tools/synchro.py index f4019f0bb..dfe3854e2 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -159,6 +159,7 @@ converted_tests = [ "conftest.py", "pymongo_mocks.py", "utils_spec_runner.py", + "qcheck.py", "test_bulk.py", "test_client.py", "test_client_bulk_write.py", From 2742a000c49bd8bf528c08c351f390c4c5d82cb6 Mon Sep 17 00:00:00 2001 From: Iris <58442094+sleepyStick@users.noreply.github.com> Date: Thu, 5 Sep 2024 09:05:24 -0700 Subject: [PATCH 06/10] PYTHON-4730 Fix Failing Async Bulk Tests (#1831) --- test/asynchronous/test_bulk.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 24111ad7c..b5f2eefde 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -26,8 +26,8 @@ sys.path[0:0] = [""] from test.asynchronous import AsyncIntegrationTest, async_client_context, remove_all_users, unittest from test.utils import ( async_rs_or_single_client_noauth, + async_single_client, async_wait_until, - single_client, ) from bson.binary import Binary, UuidRepresentation @@ -817,7 +817,7 @@ class AsyncBulkAuthorizationTestBase(AsyncBulkTestBase): roles=[], ) - async_client_context.create_user(self.db.name, "noremove", "pw", ["noremove"]) + await async_client_context.create_user(self.db.name, "noremove", "pw", ["noremove"]) async def asyncTearDown(self): await self.db.command("dropRole", "noremove") @@ -919,7 +919,7 @@ class AsyncTestBulkAuthorization(AsyncBulkAuthorizationTestBase): username="readonly", password="pw", authSource="pymongo_test" ) coll = cli.pymongo_test.test - coll.find_one() + await coll.find_one() with self.assertRaises(OperationFailure): await coll.bulk_write([InsertOne({"x": 1})]) @@ -930,7 +930,7 @@ class AsyncTestBulkAuthorization(AsyncBulkAuthorizationTestBase): username="noremove", password="pw", authSource="pymongo_test" ) coll = cli.pymongo_test.test - coll.find_one() + await coll.find_one() requests = [ InsertOne({"x": 1}), ReplaceOne({"x": 2}, {"x": 2}, upsert=True), @@ -954,7 +954,7 @@ class AsyncTestBulkWriteConcern(AsyncBulkTestBase): if cls.w is not None and cls.w > 1: for member in (await async_client_context.hello)["hosts"]: if member != (await async_client_context.hello)["primary"]: - cls.secondary = single_client(*partition_node(member)) + cls.secondary = await async_single_client(*partition_node(member)) break @classmethod From 350413032257614dfb175999830eaf5846882373 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 5 Sep 2024 11:28:49 -0700 Subject: [PATCH 07/10] PYTHON-4663 Fix coverity warnings in datetime decoding change (#1835) --- bson/_cbsonmodule.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/bson/_cbsonmodule.c b/bson/_cbsonmodule.c index cc498f448..3e9d5ecc2 100644 --- a/bson/_cbsonmodule.c +++ b/bson/_cbsonmodule.c @@ -383,10 +383,11 @@ static int millis_from_datetime_ms(PyObject* dt, long long* out){ static PyObject* decode_datetime(PyObject* self, long long millis, const codec_options_t* options){ PyObject* naive = NULL; PyObject* replace = NULL; - PyObject* args = NULL; - PyObject* kwargs = NULL; PyObject* value = NULL; struct module_state *state = GETSTATE(self); + if (!state) { + goto invalid; + } if (options->datetime_conversion == DATETIME_MS){ return datetime_ms_from_millis(self, millis); } @@ -414,8 +415,8 @@ static PyObject* decode_datetime(PyObject* self, long long millis, const codec_o Py_DECREF(utcoffset); return 0; } - min_millis_offset = (PyDateTime_DELTA_GET_DAYS(utcoffset) * 86400 + - PyDateTime_DELTA_GET_SECONDS(utcoffset)) * 1000 + + min_millis_offset = (PyDateTime_DELTA_GET_DAYS(utcoffset) * (int64_t)86400 + + PyDateTime_DELTA_GET_SECONDS(utcoffset)) * (int64_t)1000 + (PyDateTime_DELTA_GET_MICROSECONDS(utcoffset) / 1000); } Py_DECREF(utcoffset); @@ -433,8 +434,8 @@ static PyObject* decode_datetime(PyObject* self, long long millis, const codec_o Py_DECREF(utcoffset); return 0; } - max_millis_offset = (PyDateTime_DELTA_GET_DAYS(utcoffset) * 86400 + - PyDateTime_DELTA_GET_SECONDS(utcoffset)) * 1000 + + max_millis_offset = (PyDateTime_DELTA_GET_DAYS(utcoffset) * (int64_t)86400 + + PyDateTime_DELTA_GET_SECONDS(utcoffset)) * (int64_t)1000 + (PyDateTime_DELTA_GET_MICROSECONDS(utcoffset) / 1000); } Py_DECREF(utcoffset); @@ -487,8 +488,6 @@ static PyObject* decode_datetime(PyObject* self, long long millis, const codec_o invalid: Py_XDECREF(naive); Py_XDECREF(replace); - Py_XDECREF(args); - Py_XDECREF(kwargs); return value; } From 25f724badbef2d266ed01e75fb8f21f895c2b3b1 Mon Sep 17 00:00:00 2001 From: Iris <58442094+sleepyStick@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:09:43 -0700 Subject: [PATCH 08/10] PYTHON-4727 Migrate test_monitoring.py to async (#1834) --- test/asynchronous/test_monitoring.py | 1280 ++++++++++++++++++++++++++ test/test_monitoring.py | 51 +- tools/synchro.py | 1 + 3 files changed, 1316 insertions(+), 16 deletions(-) create mode 100644 test/asynchronous/test_monitoring.py diff --git a/test/asynchronous/test_monitoring.py b/test/asynchronous/test_monitoring.py new file mode 100644 index 000000000..3f6563ee5 --- /dev/null +++ b/test/asynchronous/test_monitoring.py @@ -0,0 +1,1280 @@ +# Copyright 2015-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import copy +import datetime +import sys +import time +from typing import Any + +sys.path[0:0] = [""] + +from test.asynchronous import ( + AsyncIntegrationTest, + async_client_context, + client_knobs, + sanitize_cmd, + unittest, +) +from test.utils import ( + EventListener, + async_rs_or_single_client, + async_single_client, + async_wait_until, +) + +from bson.int64 import Int64 +from bson.objectid import ObjectId +from bson.son import SON +from pymongo import CursorType, DeleteOne, InsertOne, UpdateOne, monitoring +from pymongo.asynchronous.command_cursor import AsyncCommandCursor +from pymongo.asynchronous.helpers import anext +from pymongo.errors import AutoReconnect, NotPrimaryError, OperationFailure +from pymongo.read_preferences import ReadPreference +from pymongo.write_concern import WriteConcern + +_IS_SYNC = False + + +class AsyncTestCommandMonitoring(AsyncIntegrationTest): + listener: EventListener + + @classmethod + @async_client_context.require_connection + async def _setup_class(cls): + await super()._setup_class() + cls.listener = EventListener() + cls.client = await async_rs_or_single_client( + event_listeners=[cls.listener], retryWrites=False + ) + + @classmethod + async def _tearDown_class(cls): + await cls.client.close() + await super()._tearDown_class() + + async def asyncTearDown(self): + self.listener.reset() + await super().asyncTearDown() + + async def test_started_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(SON([("ping", 1)]), started.command) + self.assertEqual("ping", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + async def test_succeeded_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertEqual("ping", succeeded.command_name) + self.assertEqual(await self.client.address, succeeded.connection_id) + self.assertEqual(1, succeeded.reply.get("ok")) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + + async def test_failed_simple(self): + try: + await self.client.pymongo_test.command("oops!") + except OperationFailure: + pass + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertEqual("oops!", failed.command_name) + self.assertEqual(await self.client.address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertTrue(isinstance(failed.duration_micros, int)) + + async def test_find_one(self): + await self.client.pymongo_test.test.find_one() + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("find", "test"), ("filter", {}), ("limit", 1), ("singleBatch", True)]), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + async def test_find_and_get_more(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(10)]) + self.listener.reset() + cursor = self.client.pymongo_test.test.find(projection={"_id": False}, batch_size=4) + for _ in range(4): + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [("find", "test"), ("filter", {}), ("projection", {"_id": False}), ("batchSize", 4)] + ), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["id"], cursor_id) + self.assertEqual(csr["ns"], "pymongo_test.test") + self.assertEqual(csr["firstBatch"], [{} for _ in range(4)]) + + self.listener.reset() + # Next batch. Exhausting the cursor could cause a getMore + # that returns id of 0 and no results. + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), + started.command, + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("getMore", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["id"], cursor_id) + self.assertEqual(csr["ns"], "pymongo_test.test") + self.assertEqual(csr["nextBatch"], [{} for _ in range(4)]) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_find_with_explain(self): + cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))]) + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_one({}) + self.listener.reset() + coll = self.client.pymongo_test.test + # Test that we publish the unwrapped command. + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + res = await coll.find().explain() + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(cmd, started.command) + self.assertEqual("explain", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("explain", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(await self.client.address, succeeded.connection_id) + self.assertEqual(res, succeeded.reply) + + async def _test_find_options(self, query, expected_cmd): + coll = self.client.pymongo_test.test + await coll.drop() + await coll.create_index("x") + await coll.insert_many([{"x": i} for i in range(5)]) + + # Test that we publish the unwrapped command. + self.listener.reset() + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + + cursor = coll.find(**query) + + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(expected_cmd, started.command) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(await self.client.address, succeeded.connection_id) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_find_options(self): + query = { + "filter": {}, + "hint": [("x", 1)], + "max_time_ms": 10000, + "max": {"x": 10}, + "min": {"x": -10}, + "return_key": True, + "show_record_id": True, + "projection": {"x": False}, + "skip": 1, + "no_cursor_timeout": True, + "sort": [("_id", 1)], + "allow_partial_results": True, + "comment": "this is a test", + "batch_size": 2, + } + + cmd = { + "find": "test", + "filter": {}, + "hint": SON([("x", 1)]), + "comment": "this is a test", + "maxTimeMS": 10000, + "max": {"x": 10}, + "min": {"x": -10}, + "returnKey": True, + "showRecordId": True, + "sort": SON([("_id", 1)]), + "projection": {"x": False}, + "skip": 1, + "batchSize": 2, + "noCursorTimeout": True, + "allowPartialResults": True, + } + + if async_client_context.version < (4, 1, 0, -1): + query["max_scan"] = 10 + cmd["maxScan"] = 10 + + await self._test_find_options(query, cmd) + + @async_client_context.require_version_max(3, 7, 2) + async def test_find_snapshot(self): + # Test "snapshot" parameter separately, can't combine with "sort". + query = {"filter": {}, "snapshot": True} + + cmd = {"find": "test", "filter": {}, "snapshot": True} + + await self._test_find_options(query, cmd) + + async def test_command_and_get_more(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{"x": 1} for _ in range(10)]) + self.listener.reset() + coll = self.client.pymongo_test.test + # Test that we publish the unwrapped command. + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + cursor = await coll.aggregate([{"$project": {"_id": False, "x": 1}}], batchSize=4) + for _ in range(4): + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [ + ("aggregate", "test"), + ("pipeline", [{"$project": {"_id": False, "x": 1}}]), + ("cursor", {"batchSize": 4}), + ] + ), + started.command, + ) + self.assertEqual("aggregate", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("aggregate", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_cursor = { + "id": cursor_id, + "ns": "pymongo_test.test", + "firstBatch": [{"x": 1} for _ in range(4)], + } + self.assertEqualCommand(expected_cursor, succeeded.reply.get("cursor")) + + self.listener.reset() + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), + started.command, + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("getMore", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_result = { + "cursor": { + "id": cursor_id, + "ns": "pymongo_test.test", + "nextBatch": [{"x": 1} for _ in range(4)], + }, + "ok": 1.0, + } + self.assertEqualReply(expected_result, succeeded.reply) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_get_more_failure(self): + address = await self.client.address + coll = self.client.pymongo_test.test + cursor_id = Int64(12345) + cursor_doc = {"id": cursor_id, "firstBatch": [], "ns": coll.full_name} + cursor = AsyncCommandCursor(coll, cursor_doc, address) + try: + await anext(cursor) + except Exception: + pass + started = self.listener.started_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + failed = self.listener.failed_events[0] + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test")]), started.command + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertTrue(isinstance(failed.duration_micros, int)) + self.assertEqual("getMore", failed.command_name) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertEqual(cursor.address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + + @async_client_context.require_replica_set + @async_client_context.require_secondaries_count(1) + async def test_not_primary_error(self): + address = next(iter(await async_client_context.client.secondaries)) + client = await async_single_client(*address, event_listeners=[self.listener]) + # Clear authentication command results from the listener. + await client.admin.command("ping") + self.listener.reset() + error = None + try: + await client.pymongo_test.test.find_one_and_delete({}) + except NotPrimaryError as exc: + error = exc.errors + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertEqual("findAndModify", failed.command_name) + self.assertEqual(address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertTrue(isinstance(failed.duration_micros, int)) + self.assertEqual(error, failed.failure) + + @async_client_context.require_no_mongos + async def test_exhaust(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(11)]) + self.listener.reset() + cursor = self.client.pymongo_test.test.find( + projection={"_id": False}, batch_size=5, cursor_type=CursorType.EXHAUST + ) + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [("find", "test"), ("filter", {}), ("projection", {"_id": False}), ("batchSize", 5)] + ), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(cursor.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_result = { + "cursor": { + "id": cursor_id, + "ns": "pymongo_test.test", + "firstBatch": [{} for _ in range(5)], + }, + "ok": 1, + } + self.assertEqualReply(expected_result, succeeded.reply) + + self.listener.reset() + tuple(await cursor.to_list()) + self.assertEqual(0, len(self.listener.failed_events)) + for event in self.listener.started_events: + self.assertTrue(isinstance(event, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 5)]), + event.command, + ) + self.assertEqual("getMore", event.command_name) + self.assertEqual(cursor.address, event.connection_id) + self.assertEqual("pymongo_test", event.database_name) + self.assertTrue(isinstance(event.request_id, int)) + for event in self.listener.succeeded_events: + self.assertTrue(isinstance(event, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(event.duration_micros, int)) + self.assertEqual("getMore", event.command_name) + self.assertTrue(isinstance(event.request_id, int)) + self.assertEqual(cursor.address, event.connection_id) + # Last getMore receives a response with cursor id 0. + self.assertEqual(0, self.listener.succeeded_events[-1].reply["cursor"]["id"]) + + async def test_kill_cursors(self): + with client_knobs(kill_cursor_frequency=0.01): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(10)]) + cursor = self.client.pymongo_test.test.find().batch_size(5) + await anext(cursor) + cursor_id = cursor.cursor_id + self.listener.reset() + await cursor.close() + await asyncio.sleep(2) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + # There could be more than one cursor_id here depending on + # when the thread last ran. + self.assertIn(cursor_id, started.command["cursors"]) + self.assertEqual("killCursors", started.command_name) + self.assertIs(type(started.connection_id), tuple) + self.assertEqual(cursor.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("killCursors", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertIs(type(succeeded.connection_id), tuple) + self.assertEqual(cursor.address, succeeded.connection_id) + # There could be more than one cursor_id here depending on + # when the thread last ran. + self.assertTrue( + cursor_id in succeeded.reply["cursorsUnknown"] + or cursor_id in succeeded.reply["cursorsKilled"] + ) + + async def test_non_bulk_writes(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + # Implied write concern insert_one + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # Unacknowledged insert_one + self.listener.reset() + coll = coll.with_options(write_concern=WriteConcern(w=0)) + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ("writeConcern", {"w": 0}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertEqualReply(succeeded.reply, {"ok": 1}) + + # Explicit write concern insert_one + self.listener.reset() + coll = coll.with_options(write_concern=WriteConcern(w=1)) + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # delete_many + self.listener.reset() + res = await coll.delete_many({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"x": 1}), ("limit", 0)])]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("delete", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(res.deleted_count, reply.get("n")) + + # replace_one + self.listener.reset() + oid = ObjectId() + res = await coll.replace_one({"_id": oid}, {"_id": oid, "x": 1}, upsert=True) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"_id": oid}), + ("u", {"_id": oid, "x": 1}), + ("multi", False), + ("upsert", True), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + self.assertEqual([{"index": 0, "_id": oid}], reply.get("upserted")) + + # update_one + self.listener.reset() + res = await coll.update_one({"x": 1}, {"$inc": {"x": 1}}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"x": 1}), + ("u", {"$inc": {"x": 1}}), + ("multi", False), + ("upsert", False), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # update_many + self.listener.reset() + res = await coll.update_many({"x": 2}, {"$inc": {"x": 1}}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"x": 2}), + ("u", {"$inc": {"x": 1}}), + ("multi", True), + ("upsert", False), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # delete_one + self.listener.reset() + _ = await coll.delete_one({"x": 3}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"x": 3}), ("limit", 1)])]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("delete", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + self.assertEqual(0, await coll.count_documents({})) + + # write errors + await coll.insert_one({"_id": 1}) + try: + self.listener.reset() + await coll.insert_one({"_id": 1}) + except OperationFailure: + pass + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": 1}]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(0, reply.get("n")) + errors = reply.get("writeErrors") + self.assertIsInstance(errors, list) + error = errors[0] + self.assertEqual(0, error.get("index")) + self.assertIsInstance(error.get("code"), int) + self.assertIsInstance(error.get("errmsg"), str) + + async def test_insert_many(self): + # This always uses the bulk API. + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + big = "x" * (1024 * 1024 * 4) + docs = [{"_id": i, "big": big} for i in range(6)] + await coll.insert_many(docs) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + documents = [] + count = 0 + operation_id = started[0].operation_id + self.assertIsInstance(operation_id, int) + for start, succeed in zip(started, succeeded): + self.assertIsInstance(start, monitoring.CommandStartedEvent) + cmd = sanitize_cmd(start.command) + self.assertEqual(["insert", "ordered", "documents"], list(cmd.keys())) + self.assertEqual(coll.name, cmd["insert"]) + self.assertIs(True, cmd["ordered"]) + documents.extend(cmd["documents"]) + self.assertEqual("pymongo_test", start.database_name) + self.assertEqual("insert", start.command_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + reply = succeed.reply + self.assertEqual(1, reply.get("ok")) + count += reply.get("n", 0) + self.assertEqual(documents, docs) + self.assertEqual(6, count) + + async def test_insert_many_unacknowledged(self): + coll = self.client.pymongo_test.test + await coll.drop() + unack_coll = coll.with_options(write_concern=WriteConcern(w=0)) + self.listener.reset() + + # Force two batches on legacy servers. + big = "x" * (1024 * 1024 * 12) + docs = [{"_id": i, "big": big} for i in range(6)] + await unack_coll.insert_many(docs) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + documents = [] + operation_id = started[0].operation_id + self.assertIsInstance(operation_id, int) + for start, succeed in zip(started, succeeded): + self.assertIsInstance(start, monitoring.CommandStartedEvent) + cmd = sanitize_cmd(start.command) + cmd.pop("writeConcern", None) + self.assertEqual(["insert", "ordered", "documents"], list(cmd.keys())) + self.assertEqual(coll.name, cmd["insert"]) + self.assertIs(True, cmd["ordered"]) + documents.extend(cmd["documents"]) + self.assertEqual("pymongo_test", start.database_name) + self.assertEqual("insert", start.command_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + self.assertEqual(1, succeed.reply.get("ok")) + self.assertEqual(documents, docs) + + async def check(): + return await coll.count_documents({}) == 6 + + await async_wait_until(check, "insert documents with w=0") + + async def test_bulk_write(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + await coll.bulk_write( + [ + InsertOne({"_id": 1}), + UpdateOne({"_id": 1}, {"$set": {"x": 1}}), + DeleteOne({"_id": 1}), + ] + ) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + operation_id = started[0].operation_id + pairs = list(zip(started, succeeded)) + self.assertEqual(3, len(pairs)) + for start, succeed in pairs: + self.assertIsInstance(start, monitoring.CommandStartedEvent) + self.assertEqual("pymongo_test", start.database_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + + expected = SON([("insert", coll.name), ("ordered", True), ("documents", [{"_id": 1}])]) + self.assertEqualCommand(expected, started[0].command) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"_id": 1}), + ("u", {"$set": {"x": 1}}), + ("multi", False), + ("upsert", False), + ] + ) + ], + ), + ] + ) + self.assertEqualCommand(expected, started[1].command) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"_id": 1}), ("limit", 1)])]), + ] + ) + self.assertEqualCommand(expected, started[2].command) + + @async_client_context.require_failCommand_fail_point + async def test_bulk_write_command_network_error(self): + coll = self.client.pymongo_test.test + self.listener.reset() + + insert_network_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "closeConnection": True, + }, + } + async with self.fail_point(insert_network_error): + with self.assertRaises(AutoReconnect): + await coll.bulk_write([InsertOne({"_id": 1})]) + failed = self.listener.failed_events + self.assertEqual(1, len(failed)) + event = failed[0] + self.assertEqual(event.command_name, "insert") + self.assertIsInstance(event.failure, dict) + self.assertEqual(event.failure["errtype"], "AutoReconnect") + self.assertTrue(event.failure["errmsg"]) + + @async_client_context.require_failCommand_fail_point + async def test_bulk_write_command_error(self): + coll = self.client.pymongo_test.test + self.listener.reset() + + insert_command_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "await acloseAsyncConnection": False, + "errorCode": 10107, # Not primary + }, + } + async with self.fail_point(insert_command_error): + with self.assertRaises(NotPrimaryError): + await coll.bulk_write([InsertOne({"_id": 1})]) + failed = self.listener.failed_events + self.assertEqual(1, len(failed)) + event = failed[0] + self.assertEqual(event.command_name, "insert") + self.assertIsInstance(event.failure, dict) + self.assertEqual(event.failure["code"], 10107) + self.assertTrue(event.failure["errmsg"]) + + async def test_write_errors(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + try: + await coll.bulk_write( + [ + InsertOne({"_id": 1}), + InsertOne({"_id": 1}), + InsertOne({"_id": 1}), + DeleteOne({"_id": 1}), + ], + ordered=False, + ) + except OperationFailure: + pass + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + operation_id = started[0].operation_id + pairs = list(zip(started, succeeded)) + errors = [] + for start, succeed in pairs: + self.assertIsInstance(start, monitoring.CommandStartedEvent) + self.assertEqual("pymongo_test", start.database_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + if "writeErrors" in succeed.reply: + errors.extend(succeed.reply["writeErrors"]) + + self.assertEqual(2, len(errors)) + fields = {"index", "code", "errmsg"} + for error in errors: + self.assertTrue(fields.issubset(set(error))) + + async def test_first_batch_helper(self): + # Regardless of server version and use of helpers._first_batch + # this test should still pass. + self.listener.reset() + tuple(await (await self.client.pymongo_test.test.list_indexes()).to_list()) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON([("listIndexes", "test"), ("cursor", {})]) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("listIndexes", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertTrue("cursor" in succeeded.reply) + self.assertTrue("ok" in succeeded.reply) + + self.listener.reset() + + @async_client_context.require_version_max(6, 1, 99) + async def test_sensitive_commands(self): + listeners = self.client._event_listeners + + self.listener.reset() + cmd = SON([("getnonce", 1)]) + listeners.publish_command_start(cmd, "pymongo_test", 12345, await self.client.address, None) # type: ignore[arg-type] + delta = datetime.timedelta(milliseconds=100) + listeners.publish_command_success( + delta, + {"nonce": "e474f4561c5eb40b", "ok": 1.0}, + "getnonce", + 12345, + await self.client.address, # type: ignore[arg-type] + None, + database_name="pymongo_test", + ) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + self.assertEqual({}, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("getnonce", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertEqual(succeeded.duration_micros, 100000) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertEqual({}, succeeded.reply) + + +class AsyncTestGlobalListener(AsyncIntegrationTest): + listener: EventListener + saved_listeners: Any + + @classmethod + @async_client_context.require_connection + async def _setup_class(cls): + await super()._setup_class() + cls.listener = EventListener() + # We plan to call register(), which internally modifies _LISTENERS. + cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS) + monitoring.register(cls.listener) + cls.client = await async_single_client() + # Get one (authenticated) socket in the pool. + await cls.client.pymongo_test.command("ping") + + @classmethod + async def _tearDown_class(cls): + monitoring._LISTENERS = cls.saved_listeners + await cls.client.close() + await super()._tearDown_class() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.listener.reset() + + async def test_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(SON([("ping", 1)]), started.command) + self.assertEqual("ping", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + +class AsyncTestEventClasses(unittest.IsolatedAsyncioTestCase): + def test_command_event_repr(self): + request_id, connection_id, operation_id, db_name = 1, ("localhost", 27017), 2, "admin" + event = monitoring.CommandStartedEvent( + {"ping": 1}, db_name, request_id, connection_id, operation_id + ) + self.assertEqual( + repr(event), + "", + ) + delta = datetime.timedelta(milliseconds=100) + event = monitoring.CommandSucceededEvent( + delta, {"ok": 1}, "ping", request_id, connection_id, operation_id, database_name=db_name + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.CommandFailedEvent( + delta, {"ok": 0}, "ping", request_id, connection_id, operation_id, database_name=db_name + ) + self.assertEqual( + repr(event), + "", + ) + + def test_server_heartbeat_event_repr(self): + connection_id = ("localhost", 27017) + event = monitoring.ServerHeartbeatStartedEvent(connection_id) + self.assertEqual( + repr(event), "" + ) + delta = 0.1 + event = monitoring.ServerHeartbeatSucceededEvent( + delta, + {"ok": 1}, # type: ignore[arg-type] + connection_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerHeartbeatFailedEvent( + delta, + "ERROR", # type: ignore[arg-type] + connection_id, + ) + self.assertEqual( + repr(event), + "", + ) + + def test_server_event_repr(self): + server_address = ("localhost", 27017) + topology_id = ObjectId("000000000000000000000001") + event = monitoring.ServerOpeningEvent(server_address, topology_id) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerDescriptionChangedEvent( + "PREV", # type: ignore[arg-type] + "NEW", # type: ignore[arg-type] + server_address, + topology_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerClosedEvent(server_address, topology_id) + self.assertEqual( + repr(event), + "", + ) + + def test_topology_event_repr(self): + topology_id = ObjectId("000000000000000000000001") + event = monitoring.TopologyOpenedEvent(topology_id) + self.assertEqual(repr(event), "") + event = monitoring.TopologyDescriptionChangedEvent( + "PREV", # type: ignore[arg-type] + "NEW", # type: ignore[arg-type] + topology_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.TopologyClosedEvent(topology_id) + self.assertEqual(repr(event), "") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index ed6a3d0bc..8322e2991 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import asyncio import copy import datetime import sys @@ -21,8 +22,19 @@ from typing import Any sys.path[0:0] = [""] -from test import IntegrationTest, client_context, client_knobs, sanitize_cmd, unittest -from test.utils import EventListener, rs_or_single_client, single_client, wait_until +from test import ( + IntegrationTest, + client_context, + client_knobs, + sanitize_cmd, + unittest, +) +from test.utils import ( + EventListener, + rs_or_single_client, + single_client, + wait_until, +) from bson.int64 import Int64 from bson.objectid import ObjectId @@ -31,23 +43,26 @@ from pymongo import CursorType, DeleteOne, InsertOne, UpdateOne, monitoring from pymongo.errors import AutoReconnect, NotPrimaryError, OperationFailure from pymongo.read_preferences import ReadPreference from pymongo.synchronous.command_cursor import CommandCursor +from pymongo.synchronous.helpers import next from pymongo.write_concern import WriteConcern +_IS_SYNC = True + class TestCommandMonitoring(IntegrationTest): listener: EventListener @classmethod @client_context.require_connection - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.listener = EventListener() cls.client = rs_or_single_client(event_listeners=[cls.listener], retryWrites=False) @classmethod - def tearDownClass(cls): + def _tearDown_class(cls): cls.client.close() - super().tearDownClass() + super()._tearDown_class() def tearDown(self): self.listener.reset() @@ -171,7 +186,7 @@ class TestCommandMonitoring(IntegrationTest): self.assertEqual(csr["nextBatch"], [{} for _ in range(4)]) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_find_with_explain(self): cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))]) @@ -230,7 +245,7 @@ class TestCommandMonitoring(IntegrationTest): self.assertEqual(self.client.address, succeeded.connection_id) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_find_options(self): query = { @@ -356,7 +371,7 @@ class TestCommandMonitoring(IntegrationTest): self.assertEqualReply(expected_result, succeeded.reply) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_get_more_failure(self): address = self.client.address @@ -451,7 +466,7 @@ class TestCommandMonitoring(IntegrationTest): self.assertEqualReply(expected_result, succeeded.reply) self.listener.reset() - tuple(cursor) + tuple(cursor.to_list()) self.assertEqual(0, len(self.listener.failed_events)) for event in self.listener.started_events: self.assertTrue(isinstance(event, monitoring.CommandStartedEvent)) @@ -898,7 +913,11 @@ class TestCommandMonitoring(IntegrationTest): self.assertEqual(succeed.operation_id, operation_id) self.assertEqual(1, succeed.reply.get("ok")) self.assertEqual(documents, docs) - wait_until(lambda: coll.count_documents({}) == 6, "insert documents with w=0") + + def check(): + return coll.count_documents({}) == 6 + + wait_until(check, "insert documents with w=0") def test_bulk_write(self): coll = self.client.pymongo_test.test @@ -1058,7 +1077,7 @@ class TestCommandMonitoring(IntegrationTest): # Regardless of server version and use of helpers._first_batch # this test should still pass. self.listener.reset() - tuple(self.client.pymongo_test.test.list_indexes()) + tuple((self.client.pymongo_test.test.list_indexes()).to_list()) started = self.listener.started_events[0] succeeded = self.listener.succeeded_events[0] self.assertEqual(0, len(self.listener.failed_events)) @@ -1119,8 +1138,8 @@ class TestGlobalListener(IntegrationTest): @classmethod @client_context.require_connection - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.listener = EventListener() # We plan to call register(), which internally modifies _LISTENERS. cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS) @@ -1130,10 +1149,10 @@ class TestGlobalListener(IntegrationTest): cls.client.pymongo_test.command("ping") @classmethod - def tearDownClass(cls): + def _tearDown_class(cls): monitoring._LISTENERS = cls.saved_listeners cls.client.close() - super().tearDownClass() + super()._tearDown_class() def setUp(self): super().setUp() diff --git a/tools/synchro.py b/tools/synchro.py index dfe3854e2..f38a83f12 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -172,6 +172,7 @@ converted_tests = [ "test_session.py", "test_transactions.py", "test_client_context.py", + "test_monitoring.py", ] sync_test_files = [ From 29bbf77dad0867ca19bf9769ca3fddc19cadec23 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 5 Sep 2024 14:18:48 -0700 Subject: [PATCH 09/10] PYTHON-4607 Use LogRecord.getMessage() not LogRecord.message (#1837) --- test/asynchronous/test_client.py | 2 +- test/asynchronous/test_logger.py | 14 +++++++------- test/test_client.py | 2 +- test/test_logger.py | 14 +++++++------- test/unified_format.py | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index d4f09cde3..97cbdf6db 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -617,7 +617,7 @@ class AsyncClientUnitTest(AsyncUnitTest): mock_get_hosts.return_value = [(host, 1)] AsyncMongoClient(host) AsyncMongoClient(multi_host) - logs = [record.message for record in cm.records if record.name == "pymongo.client"] + logs = [record.getMessage() for record in cm.records if record.name == "pymongo.client"] self.assertEqual(len(logs), 7) @patch("pymongo.srv_resolver._SrvResolver.get_hosts") diff --git a/test/asynchronous/test_logger.py b/test/asynchronous/test_logger.py index 7a5884651..b219d530e 100644 --- a/test/asynchronous/test_logger.py +++ b/test/asynchronous/test_logger.py @@ -37,15 +37,15 @@ class TestLogger(AsyncIntegrationTest): with self.assertLogs("pymongo.command", level="DEBUG") as cm: await db.test.insert_many(docs) - cmd_started_log = json_util.loads(cm.records[0].message) + cmd_started_log = json_util.loads(cm.records[0].getMessage()) self.assertEqual(len(cmd_started_log["command"]), _DEFAULT_DOCUMENT_LENGTH + 3) - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertLessEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3) with self.assertLogs("pymongo.command", level="DEBUG") as cm: await db.test.find({}).to_list() - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3) async def test_configured_truncation_limit(self): @@ -55,14 +55,14 @@ class TestLogger(AsyncIntegrationTest): with self.assertLogs("pymongo.command", level="DEBUG") as cm: await db.command(cmd) - cmd_started_log = json_util.loads(cm.records[0].message) + cmd_started_log = json_util.loads(cm.records[0].getMessage()) self.assertEqual(len(cmd_started_log["command"]), 5 + 3) - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertLessEqual(len(cmd_succeeded_log["reply"]), 5 + 3) with self.assertRaises(OperationFailure): await db.command({"notARealCommand": True}) - cmd_failed_log = json_util.loads(cm.records[-1].message) + cmd_failed_log = json_util.loads(cm.records[-1].getMessage()) self.assertEqual(len(cmd_failed_log["failure"]), 5 + 3) async def test_truncation_multi_byte_codepoints(self): @@ -78,7 +78,7 @@ class TestLogger(AsyncIntegrationTest): with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": length}): with self.assertLogs("pymongo.command", level="DEBUG") as cm: await self.db.test.insert_one({"x": multi_byte_char_str}) - cmd_started_log = json_util.loads(cm.records[0].message)["command"] + cmd_started_log = json_util.loads(cm.records[0].getMessage())["command"] cmd_started_log = cmd_started_log[:-3] last_3_bytes = cmd_started_log.encode()[-3:].decode() diff --git a/test/test_client.py b/test/test_client.py index 22e94dcdd..785139d6a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -611,7 +611,7 @@ class ClientUnitTest(UnitTest): mock_get_hosts.return_value = [(host, 1)] MongoClient(host) MongoClient(multi_host) - logs = [record.message for record in cm.records if record.name == "pymongo.client"] + logs = [record.getMessage() for record in cm.records if record.name == "pymongo.client"] self.assertEqual(len(logs), 7) @patch("pymongo.srv_resolver._SrvResolver.get_hosts") diff --git a/test/test_logger.py b/test/test_logger.py index d6c30b68a..c0011ec3a 100644 --- a/test/test_logger.py +++ b/test/test_logger.py @@ -36,15 +36,15 @@ class TestLogger(IntegrationTest): with self.assertLogs("pymongo.command", level="DEBUG") as cm: db.test.insert_many(docs) - cmd_started_log = json_util.loads(cm.records[0].message) + cmd_started_log = json_util.loads(cm.records[0].getMessage()) self.assertEqual(len(cmd_started_log["command"]), _DEFAULT_DOCUMENT_LENGTH + 3) - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertLessEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3) with self.assertLogs("pymongo.command", level="DEBUG") as cm: db.test.find({}).to_list() - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3) def test_configured_truncation_limit(self): @@ -54,14 +54,14 @@ class TestLogger(IntegrationTest): with self.assertLogs("pymongo.command", level="DEBUG") as cm: db.command(cmd) - cmd_started_log = json_util.loads(cm.records[0].message) + cmd_started_log = json_util.loads(cm.records[0].getMessage()) self.assertEqual(len(cmd_started_log["command"]), 5 + 3) - cmd_succeeded_log = json_util.loads(cm.records[1].message) + cmd_succeeded_log = json_util.loads(cm.records[1].getMessage()) self.assertLessEqual(len(cmd_succeeded_log["reply"]), 5 + 3) with self.assertRaises(OperationFailure): db.command({"notARealCommand": True}) - cmd_failed_log = json_util.loads(cm.records[-1].message) + cmd_failed_log = json_util.loads(cm.records[-1].getMessage()) self.assertEqual(len(cmd_failed_log["failure"]), 5 + 3) def test_truncation_multi_byte_codepoints(self): @@ -77,7 +77,7 @@ class TestLogger(IntegrationTest): with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": length}): with self.assertLogs("pymongo.command", level="DEBUG") as cm: self.db.test.insert_one({"x": multi_byte_char_str}) - cmd_started_log = json_util.loads(cm.records[0].message)["command"] + cmd_started_log = json_util.loads(cm.records[0].getMessage())["command"] cmd_started_log = cmd_started_log[:-3] last_3_bytes = cmd_started_log.encode()[-3:].decode() diff --git a/test/unified_format.py b/test/unified_format.py index e4ebf677e..d35aed435 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -1925,7 +1925,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): for log in log_list: if log.module == "ocsp_support": continue - data = json_util.loads(log.message) + data = json_util.loads(log.getMessage()) client = data.pop("clientId") if "clientId" in data else data.pop("topologyId") client_to_log[client].append( { From 044d92cc146aa009c0ab714184dee0a36628cbe8 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Thu, 5 Sep 2024 19:34:01 -0500 Subject: [PATCH 10/10] PYTHON-4706 Allow running pytest directly without hatch (#1824) --- .evergreen/config.yml | 9 ++++++++- .evergreen/hatch.sh | 12 +++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/.evergreen/config.yml b/.evergreen/config.yml index c8e314f6c..e718266ef 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -410,6 +410,7 @@ functions: SSL=${SSL} \ TEST_DATA_LAKE=${TEST_DATA_LAKE} \ MONGODB_API_VERSION=${MONGODB_API_VERSION} \ + SKIP_HATCH=${SKIP_HATCH} \ bash ${PROJECT_DIRECTORY}/.evergreen/hatch.sh test:test-eg "run enterprise auth tests": @@ -561,7 +562,9 @@ functions: working_dir: "src" script: | . .evergreen/scripts/env.sh - . .evergreen/hatch.sh encryption:teardown + if [ -f $DRIVERS_TOOLS/.evergreen/csfle/secrets-export.sh ]; then + . .evergreen/hatch.sh encryption:teardown + fi rm -rf ${DRIVERS_TOOLS} || true rm -f ./secrets-export.sh || true @@ -2083,10 +2086,14 @@ axes: display_name: "RHEL 8.3 (zSeries)" run_on: rhel83-zseries-small batchtime: 10080 # 7 days + variables: + SKIP_HATCH: true - id: rhel81-power8 display_name: "RHEL 8.1 (POWER8)" run_on: rhel81-power8-small batchtime: 10080 # 7 days + variables: + SKIP_HATCH: true - id: rhel82-arm64 display_name: "RHEL 8.2 (ARM64)" run_on: rhel82-arm64-small diff --git a/.evergreen/hatch.sh b/.evergreen/hatch.sh index 843839410..db0da2f4d 100644 --- a/.evergreen/hatch.sh +++ b/.evergreen/hatch.sh @@ -8,7 +8,17 @@ if [ -z "$PYTHON_BINARY" ]; then PYTHON_BINARY=$(find_python3) fi -if $PYTHON_BINARY -m hatch --version; then +# Check if we should skip hatch and run the tests directly. +if [ -n "$SKIP_HATCH" ]; then + ENV_NAME=testenv-$RANDOM + createvirtualenv "$PYTHON_BINARY" $ENV_NAME + # shellcheck disable=SC2064 + trap "deactivate; rm -rf $ENV_NAME" EXIT HUP + python -m pip install -e ".[test]" + run_hatch() { + bash ./.evergreen/run-tests.sh + } +elif $PYTHON_BINARY -m hatch --version; then run_hatch() { $PYTHON_BINARY -m hatch run "$@" }