PYTHON-3075 bulk_write does not apply CodecOptions to upserted_ids result (#840)

This commit is contained in:
Steven Silvester 2022-02-02 13:53:58 -06:00 committed by GitHub
parent aa60c2a2c0
commit abfa0d35bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 9 deletions

View File

@ -32,6 +32,7 @@ from bson import (CodecOptions,
_decode_selective,
_dict_to_bson,
_make_c_string)
from bson import codec_options
from bson.int64 import Int64
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument)
@ -798,7 +799,7 @@ class _BulkWriteContext(object):
self._start(cmd, request_id, docs)
start = datetime.datetime.now()
try:
reply = self.sock_info.write_command(request_id, msg)
reply = self.sock_info.write_command(request_id, msg, self.codec)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(request_id, reply, duration)
@ -866,7 +867,7 @@ class _EncryptedBulkWriteContext(_BulkWriteContext):
batched_cmd, to_send = self._batch_command(cmd, docs)
result = self.sock_info.command(
self.db_name, batched_cmd,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
codec_options=self.codec,
session=self.session, client=client)
return result, to_send
@ -1205,9 +1206,9 @@ class _OpReply(object):
return bson._decode_all_selective(
self.documents, codec_options, user_fields)
def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
docs = self.unpack_response()
docs = self.unpack_response(codec_options=codec_options)
assert self.number_returned == 1
return docs[0]
@ -1273,9 +1274,9 @@ class _OpMsg(object):
return bson._decode_all_selective(
self.payload_document, codec_options, user_fields)
def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
return self.unpack_response()[0]
return self.unpack_response(codec_options=codec_options)[0]
def raw_command_response(self):
"""Return the bytes of the command response."""

View File

@ -775,7 +775,7 @@ class SocketInfo(object):
self._raise_if_not_writable(True)
self.send_message(msg, max_doc_size)
def write_command(self, request_id, msg):
def write_command(self, request_id, msg, codec_options):
"""Send "insert" etc. command, returning response as a dict.
Can raise ConnectionFailure or OperationFailure.
@ -786,7 +786,7 @@ class SocketInfo(object):
"""
self.send_message(msg, 0)
reply = self.receive_message(request_id)
result = reply.command_response()
result = reply.command_response(codec_options)
# Raises NotPrimaryError or OperationFailure.
helpers._check_command_response(result, self.max_wire_version)

View File

@ -15,9 +15,13 @@
"""Test the bulk API."""
import sys
import uuid
from bson.binary import UuidRepresentation
from bson.codec_options import CodecOptions
sys.path[0:0] = [""]
from bson import Binary
from bson.objectid import ObjectId
from pymongo.common import partition_node
from pymongo.errors import (BulkWriteError,
@ -376,6 +380,78 @@ class TestBulk(BulkTestBase):
{'index': 2, '_id': 2}]},
result.bulk_api_result)
def test_upsert_uuid_standard(self):
    """bulk_write reports upserted UUID ``_id`` values unchanged.

    The collection is configured with the STANDARD UUID representation
    and the upserted ids in the result are compared with the
    ``uuid.UUID`` values that were sent.
    """
    std_opts = CodecOptions(
        uuid_representation=UuidRepresentation.STANDARD)
    coll = self.coll.with_options(codec_options=std_opts)
    sent_ids = [uuid.uuid4() for _ in range(3)]

    requests = [
        UpdateOne({'_id': sent_ids[0]}, {'$set': {'a': 0}}, upsert=True),
        ReplaceOne({'a': 1}, {'_id': sent_ids[1]}, upsert=True),
        # Third upsert is only present to keep the counts right in
        # all cases.
        ReplaceOne({'_id': sent_ids[2]}, {'_id': sent_ids[2]}, upsert=True),
    ]
    result = coll.bulk_write(requests)

    expected = {
        'nMatched': 0,
        'nModified': 0,
        'nUpserted': 3,
        'nInserted': 0,
        'nRemoved': 0,
        'upserted': [{'index': position, '_id': sent}
                     for position, sent in enumerate(sent_ids)],
    }
    self.assertEqualResponse(expected, result.bulk_api_result)
def test_upsert_uuid_unspecified(self):
    """bulk_write reports upserted ``Binary`` UUID ids unchanged.

    The collection is configured with the UNSPECIFIED UUID
    representation, and the ids are sent as ``Binary`` values built
    with ``Binary.from_uuid``.
    """
    unspecified_opts = CodecOptions(
        uuid_representation=UuidRepresentation.UNSPECIFIED)
    coll = self.coll.with_options(codec_options=unspecified_opts)
    sent_ids = []
    for _ in range(3):
        sent_ids.append(Binary.from_uuid(uuid.uuid4()))

    requests = [
        UpdateOne({'_id': sent_ids[0]}, {'$set': {'a': 0}}, upsert=True),
        ReplaceOne({'a': 1}, {'_id': sent_ids[1]}, upsert=True),
        # Third upsert is only present to keep the counts right in
        # all cases.
        ReplaceOne({'_id': sent_ids[2]}, {'_id': sent_ids[2]}, upsert=True),
    ]
    result = coll.bulk_write(requests)

    expected = {
        'nMatched': 0,
        'nModified': 0,
        'nUpserted': 3,
        'nInserted': 0,
        'nRemoved': 0,
        'upserted': [{'index': position, '_id': sent}
                     for position, sent in enumerate(sent_ids)],
    }
    self.assertEqualResponse(expected, result.bulk_api_result)
def test_upsert_uuid_standard_subdocuments(self):
    """bulk_write reports upserted sub-document ids holding UUIDs.

    Each ``_id`` is a sub-document with a ``Binary`` field ``f`` and a
    ``uuid.UUID`` field ``f2``; the collection uses the STANDARD UUID
    representation.
    """
    std_opts = CodecOptions(
        uuid_representation=UuidRepresentation.STANDARD)
    coll = self.coll.with_options(codec_options=std_opts)

    ids = []
    for size in range(3):
        ids.append({'f': Binary(bytes(size)), 'f2': uuid.uuid4()})

    requests = [
        UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True),
        ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True),
        # Third upsert is only present to keep the counts right in
        # all cases.
        ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True),
    ]
    result = coll.bulk_write(requests)

    # The `Binary` values are returned as `bytes` objects, so convert
    # the sent ids in place before comparing.
    for sub_doc in ids:
        sub_doc['f'] = bytes(sub_doc['f'])

    expected = {
        'nMatched': 0,
        'nModified': 0,
        'nUpserted': 3,
        'nInserted': 0,
        'nRemoved': 0,
        'upserted': [{'index': position, '_id': sub_doc}
                     for position, sub_doc in enumerate(ids)],
    }
    self.assertEqualResponse(expected, result.bulk_api_result)
def test_single_ordered_batch(self):
result = self.coll.bulk_write([
InsertOne({'a': 1}),

View File

@ -29,6 +29,7 @@ sys.path[0:0] = [""]
from bson import encode, json_util
from bson.binary import (Binary,
UuidRepresentation,
JAVA_LEGACY,
STANDARD,
UUID_SUBTYPE)
@ -50,13 +51,14 @@ from pymongo.errors import (BulkWriteError,
ServerSelectionTimeoutError,
WriteError)
from pymongo.mongo_client import MongoClient
from pymongo.operations import InsertOne
from pymongo.operations import InsertOne, ReplaceOne, UpdateOne
from pymongo.write_concern import WriteConcern
from test import (unittest, CA_PEM, CLIENT_PEM,
client_context,
IntegrationTest,
PyMongoTestCase)
from test.test_bulk import BulkTestBase
from test.utils import (TestCreator,
camel_to_snake_args,
OvertCommandListener,
@ -313,6 +315,35 @@ class TestClientSimple(EncryptionIntegrationTest):
client.admin.command('ping')
class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest):
    """Bulk write checks run through a client with auto-encryption options."""

    def test_upsert_uuid_standard_encrypte(self):
        """bulk_write reports upserted UUID ids with auto-encryption on.

        The client is created with ``AutoEncryptionOpts`` and the
        collection uses the STANDARD UUID representation.

        NOTE(review): the method name is missing a trailing "d"; it is
        kept as-is so the test id does not change.
        """
        encryption_opts = AutoEncryptionOpts(
            KMS_PROVIDERS, 'keyvault.datakeys')
        client = rs_or_single_client(auto_encryption_opts=encryption_opts)
        self.addCleanup(client.close)

        std_opts = CodecOptions(
            uuid_representation=UuidRepresentation.STANDARD)
        coll = client.pymongo_test.test.with_options(codec_options=std_opts)
        sent_ids = [uuid.uuid4() for _ in range(3)]

        requests = [
            UpdateOne({'_id': sent_ids[0]}, {'$set': {'a': 0}}, upsert=True),
            ReplaceOne({'a': 1}, {'_id': sent_ids[1]}, upsert=True),
            # Third upsert is only present to keep the counts right in
            # all cases.
            ReplaceOne({'_id': sent_ids[2]}, {'_id': sent_ids[2]},
                       upsert=True),
        ]
        result = coll.bulk_write(requests)

        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 3,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [{'index': position, '_id': sent}
                         for position, sent in enumerate(sent_ids)],
        }
        self.assertEqualResponse(expected, result.bulk_api_result)
class TestClientMaxWireVersion(IntegrationTest):
@classmethod