From abfa0d35bcbc99107dfac71f58c4b1606dc7656a Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 2 Feb 2022 13:53:58 -0600 Subject: [PATCH] PYTHON-3075 bulk_write does not apply CodecOptions to upserted_ids result (#840) --- pymongo/message.py | 13 +++---- pymongo/pool.py | 4 +-- test/test_bulk.py | 76 +++++++++++++++++++++++++++++++++++++++++ test/test_encryption.py | 33 +++++++++++++++++- 4 files changed, 117 insertions(+), 9 deletions(-) diff --git a/pymongo/message.py b/pymongo/message.py index 584528c2f..f632214a0 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -32,6 +32,7 @@ from bson import (CodecOptions, _decode_selective, _dict_to_bson, _make_c_string) +from bson import codec_options from bson.int64 import Int64 from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument) @@ -798,7 +799,7 @@ class _BulkWriteContext(object): self._start(cmd, request_id, docs) start = datetime.datetime.now() try: - reply = self.sock_info.write_command(request_id, msg) + reply = self.sock_info.write_command(request_id, msg, self.codec) if self.publish: duration = (datetime.datetime.now() - start) + duration self._succeed(request_id, reply, duration) @@ -866,7 +867,7 @@ class _EncryptedBulkWriteContext(_BulkWriteContext): batched_cmd, to_send = self._batch_command(cmd, docs) result = self.sock_info.command( self.db_name, batched_cmd, - codec_options=_UNICODE_REPLACE_CODEC_OPTIONS, + codec_options=self.codec, session=self.session, client=client) return result, to_send @@ -1205,9 +1206,9 @@ class _OpReply(object): return bson._decode_all_selective( self.documents, codec_options, user_fields) - def command_response(self): + def command_response(self, codec_options): """Unpack a command response.""" - docs = self.unpack_response() + docs = self.unpack_response(codec_options=codec_options) assert self.number_returned == 1 return docs[0] @@ -1273,9 +1274,9 @@ class _OpMsg(object): return bson._decode_all_selective( self.payload_document, codec_options, user_fields) - def command_response(self): + def command_response(self, codec_options): """Unpack a command response.""" - return self.unpack_response()[0] + return self.unpack_response(codec_options=codec_options)[0] def raw_command_response(self): """Return the bytes of the command response.""" diff --git a/pymongo/pool.py b/pymongo/pool.py index a0868c991..70920d5b2 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -775,7 +775,7 @@ class SocketInfo(object): self._raise_if_not_writable(True) self.send_message(msg, max_doc_size) - def write_command(self, request_id, msg): + def write_command(self, request_id, msg, codec_options): """Send "insert" etc. command, returning response as a dict. Can raise ConnectionFailure or OperationFailure. @@ -786,7 +786,7 @@ class SocketInfo(object): """ self.send_message(msg, 0) reply = self.receive_message(request_id) - result = reply.command_response() + result = reply.command_response(codec_options) # Raises NotPrimaryError or OperationFailure. helpers._check_command_response(result, self.max_wire_version) diff --git a/test/test_bulk.py b/test/test_bulk.py index f93cd6c76..08740a437 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -15,9 +15,13 @@ """Test the bulk API.""" import sys +import uuid +from bson.binary import UuidRepresentation +from bson.codec_options import CodecOptions sys.path[0:0] = [""] +from bson import Binary from bson.objectid import ObjectId from pymongo.common import partition_node from pymongo.errors import (BulkWriteError, @@ -376,6 +380,78 @@ class TestBulk(BulkTestBase): {'index': 2, '_id': 2}]}, result.bulk_api_result) + def test_upsert_uuid_standard(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + uuids = [uuid.uuid4() for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + def test_upsert_uuid_unspecified(self): + options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED) + coll = self.coll.with_options(codec_options=options) + uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + def test_upsert_uuid_standard_subdocuments(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + ids = [ + {'f': Binary(bytes(i)), 'f2': uuid.uuid4()} + for i in range(3) + ] + + result = coll.bulk_write([ + UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True), + ]) + + # The `Binary` values are returned as `bytes` objects. + for _id in ids: + _id['f'] = bytes(_id['f']) + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': ids[0]}, + {'index': 1, '_id': ids[1]}, + {'index': 2, '_id': ids[2]}]}, + result.bulk_api_result) + def test_single_ordered_batch(self): result = self.coll.bulk_write([ InsertOne({'a': 1}), diff --git a/test/test_encryption.py b/test/test_encryption.py index 88acadfba..8e47d4452 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -29,6 +29,7 @@ sys.path[0:0] = [""] from bson import encode, json_util from bson.binary import (Binary, + UuidRepresentation, JAVA_LEGACY, STANDARD, UUID_SUBTYPE) @@ -50,13 +51,14 @@ from pymongo.errors import (BulkWriteError, ServerSelectionTimeoutError, WriteError) from pymongo.mongo_client import MongoClient -from pymongo.operations import InsertOne +from pymongo.operations import InsertOne, ReplaceOne, UpdateOne from pymongo.write_concern import WriteConcern from test import (unittest, CA_PEM, CLIENT_PEM, client_context, IntegrationTest, PyMongoTestCase) +from test.test_bulk import BulkTestBase from test.utils import (TestCreator, camel_to_snake_args, OvertCommandListener, @@ -313,6 +315,35 @@ class TestClientSimple(EncryptionIntegrationTest): client.admin.command('ping') +class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest): + + def test_upsert_uuid_standard_encrypte(self): + opts = AutoEncryptionOpts(KMS_PROVIDERS, 'keyvault.datakeys') + client = rs_or_single_client(auto_encryption_opts=opts) + self.addCleanup(client.close) + + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + encrypted_coll = client.pymongo_test.test + coll = encrypted_coll.with_options(codec_options=options) + uuids = [uuid.uuid4() for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + class TestClientMaxWireVersion(IntegrationTest): @classmethod