PYTHON-4650 Fix MongoClient.bulk_write test failure when compression is enabled (#1786)

This commit is contained in:
Shruti Sridhar 2024-08-12 10:21:09 -07:00 committed by GitHub
parent 30b32d00c4
commit 2afbd4b279
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 110 additions and 88 deletions

View File

@ -852,6 +852,10 @@ class ClientContext:
def max_write_batch_size(self):
return (self.hello)["maxWriteBatchSize"]
# Read-only accessor for the "maxMessageSizeBytes" entry of self.hello.
# NOTE(review): presumably the server-advertised wire message size limit,
# in bytes -- confirm against the hello response documentation.
@property
def max_message_size_bytes(self):
return (self.hello)["maxMessageSizeBytes"]
# Reusable client context
client_context = ClientContext()

View File

@ -854,6 +854,10 @@ class AsyncClientContext:
async def max_write_batch_size(self):
return (await self.hello)["maxWriteBatchSize"]
# Accessor for the "maxMessageSizeBytes" entry of the awaited self.hello.
# Because the getter is `async def`, reading this property produces a
# coroutine; callers must `await` it to obtain the value.
# NOTE(review): presumably the server-advertised wire message size limit,
# in bytes -- confirm against the hello response documentation.
@property
async def max_message_size_bytes(self):
return (await self.hello)["maxMessageSizeBytes"]
# Reusable client context
async_client_context = AsyncClientContext()

View File

@ -56,20 +56,24 @@ class TestClientBulkWrite(AsyncIntegrationTest):
# https://github.com/mongodb/specifications/tree/master/source/crud/tests
class TestClientBulkWriteCRUD(AsyncIntegrationTest):
# Test fixture hook: awaits three limits exposed by async_client_context and
# stores them on the test instance as self.max_write_batch_size,
# self.max_bson_object_size and self.max_message_size_bytes.
async def asyncSetUp(self):
self.max_write_batch_size = await async_client_context.max_write_batch_size
# The context property is named max_bson_size; it is stored here under the
# name max_bson_object_size.
self.max_bson_object_size = await async_client_context.max_bson_size
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
async def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(InsertOne(namespace="db.coll", document={"a": "b"}))
self.addAsyncCleanup(client.db["coll"].drop)
result = await client.bulk_write(models=models)
self.assertEqual(result.inserted_count, max_write_batch_size + 1)
self.assertEqual(result.inserted_count, self.max_write_batch_size + 1)
bulk_write_events = []
for event in listener.started_events:
@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
first_event, second_event = bulk_write_events
self.assertEqual(len(first_event.command["ops"]), max_write_batch_size)
self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size)
self.assertEqual(len(second_event.command["ops"]), 1)
self.assertEqual(first_event.operation_id, second_event.operation_id)
@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - 500)
for _ in range(num_models):
models.append(
InsertOne(
@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
retryWrites=False,
)
self.addAsyncCleanup(client.aclose)
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
fail_command = {
"configureFailPoint": "failCommand",
@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type]
self.assertIsNotNone(context.exception.partial_result)
self.assertEqual(
context.exception.partial_result.inserted_count, max_write_batch_size + 1
context.exception.partial_result.inserted_count, self.max_write_batch_size + 1
)
bulk_write_events = []
@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await collection.drop()
await collection.insert_one(document={"_id": 1})
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
with self.assertRaises(ClientBulkWriteException) as context:
await client.bulk_write(models=models, ordered=False)
self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type]
self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type]
bulk_write_events = []
for event in listener.started_events:
@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await collection.drop()
await collection.insert_one(document={"_id": 1})
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
async with client.start_session() as session:
await session.start_transaction()
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
b_repeated = "b" * max_bson_object_size
b_repeated = "b" * self.max_bson_object_size
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
async def _setup_namespace_test_models(self):
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
# See prose test specification below for details on these calculations.
# https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations
_EXISTING_BULK_WRITE_BYTES = 1122
_OPERATION_DOC_BYTES = 57
_NAMESPACE_DOC_BYTES = 217
ops_bytes = max_message_size_bytes - 1122
num_models = ops_bytes // max_bson_object_size
remainder_bytes = ops_bytes % max_bson_object_size
# When compression is enabled, the usable max message size is 16 bytes
# smaller to account for the compressed message's header.
if async_client_context.client_options.get("compressors"):
max_message_size_bytes = self.max_message_size_bytes - 16
else:
max_message_size_bytes = self.max_message_size_bytes
ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES
num_models = ops_bytes // self.max_bson_object_size
remainder_bytes = ops_bytes % self.max_bson_object_size
models = []
b_repeated = "b" * (max_bson_object_size - 57)
b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES)
for _ in range(num_models):
models.append(
InsertOne(
@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
document={"a": b_repeated},
)
)
if remainder_bytes >= 217:
if remainder_bytes >= _NAMESPACE_DOC_BYTES:
num_models += 1
b_repeated = "b" * (remainder_bytes - 57)
b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES)
models.append(
InsertOne(
namespace="db.coll",
@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client()
self.addAsyncCleanup(client.aclose)
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
# Document too large.
b_repeated = "b" * max_message_size_bytes
b_repeated = "b" * self.max_message_size_bytes
models = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(InvalidOperation) as context:
await client.bulk_write(models=models)
self.assertIn("cannot do an empty bulk write", context.exception._message)
# Namespace too large.
c_repeated = "c" * max_message_size_bytes
c_repeated = "c" * self.max_message_size_bytes
namespace = f"db.{c_repeated}"
models = [InsertOne(namespace=namespace, document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteTimeout(AsyncIntegrationTest):
# Test fixture hook: awaits three limits exposed by async_client_context and
# stores them on the test instance as self.max_write_batch_size,
# self.max_bson_object_size and self.max_message_size_bytes.
async def asyncSetUp(self):
self.max_write_batch_size = await async_client_context.max_write_batch_size
# The context property is named max_bson_size; it is stored here under the
# name max_bson_object_size.
self.max_bson_object_size = await async_client_context.max_bson_size
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_failCommand_fail_point
async def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500
internal_client = await async_rs_or_single_client(timeoutMS=None)
self.addAsyncCleanup(internal_client.aclose)
@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
for _ in range(num_models):
models.append(
InsertOne(

View File

@ -56,20 +56,24 @@ class TestClientBulkWrite(IntegrationTest):
# https://github.com/mongodb/specifications/tree/master/source/crud/tests
class TestClientBulkWriteCRUD(IntegrationTest):
# Test fixture hook: reads three limits exposed by client_context and stores
# them on the test instance as self.max_write_batch_size,
# self.max_bson_object_size and self.max_message_size_bytes.
def setUp(self):
self.max_write_batch_size = client_context.max_write_batch_size
# The context property is named max_bson_size; it is stored here under the
# name max_bson_object_size.
self.max_bson_object_size = client_context.max_bson_size
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(InsertOne(namespace="db.coll", document={"a": "b"}))
self.addCleanup(client.db["coll"].drop)
result = client.bulk_write(models=models)
self.assertEqual(result.inserted_count, max_write_batch_size + 1)
self.assertEqual(result.inserted_count, self.max_write_batch_size + 1)
bulk_write_events = []
for event in listener.started_events:
@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
first_event, second_event = bulk_write_events
self.assertEqual(len(first_event.command["ops"]), max_write_batch_size)
self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size)
self.assertEqual(len(second_event.command["ops"]), 1)
self.assertEqual(first_event.operation_id, second_event.operation_id)
@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - 500)
for _ in range(num_models):
models.append(
InsertOne(
@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
retryWrites=False,
)
self.addCleanup(client.close)
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
fail_command = {
"configureFailPoint": "failCommand",
@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type]
self.assertIsNotNone(context.exception.partial_result)
self.assertEqual(
context.exception.partial_result.inserted_count, max_write_batch_size + 1
context.exception.partial_result.inserted_count, self.max_write_batch_size + 1
)
bulk_write_events = []
@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
collection.drop()
collection.insert_one(document={"_id": 1})
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
with self.assertRaises(ClientBulkWriteException) as context:
client.bulk_write(models=models, ordered=False)
self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type]
self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type]
bulk_write_events = []
for event in listener.started_events:
@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
collection.drop()
collection.insert_one(document={"_id": 1})
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
with client.start_session() as session:
session.start_transaction()
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
b_repeated = "b" * max_bson_object_size
b_repeated = "b" * self.max_bson_object_size
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
def _setup_namespace_test_models(self):
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
# See prose test specification below for details on these calculations.
# https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations
_EXISTING_BULK_WRITE_BYTES = 1122
_OPERATION_DOC_BYTES = 57
_NAMESPACE_DOC_BYTES = 217
ops_bytes = max_message_size_bytes - 1122
num_models = ops_bytes // max_bson_object_size
remainder_bytes = ops_bytes % max_bson_object_size
# When compression is enabled, the usable max message size is 16 bytes
# smaller to account for the compressed message's header.
if client_context.client_options.get("compressors"):
max_message_size_bytes = self.max_message_size_bytes - 16
else:
max_message_size_bytes = self.max_message_size_bytes
ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES
num_models = ops_bytes // self.max_bson_object_size
remainder_bytes = ops_bytes % self.max_bson_object_size
models = []
b_repeated = "b" * (max_bson_object_size - 57)
b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES)
for _ in range(num_models):
models.append(
InsertOne(
@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
document={"a": b_repeated},
)
)
if remainder_bytes >= 217:
if remainder_bytes >= _NAMESPACE_DOC_BYTES:
num_models += 1
b_repeated = "b" * (remainder_bytes - 57)
b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES)
models.append(
InsertOne(
namespace="db.coll",
@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client()
self.addCleanup(client.close)
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
# Document too large.
b_repeated = "b" * max_message_size_bytes
b_repeated = "b" * self.max_message_size_bytes
models = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(InvalidOperation) as context:
client.bulk_write(models=models)
self.assertIn("cannot do an empty bulk write", context.exception._message)
# Namespace too large.
c_repeated = "c" * max_message_size_bytes
c_repeated = "c" * self.max_message_size_bytes
namespace = f"db.{c_repeated}"
models = [InsertOne(namespace=namespace, document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(IntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteTimeout(IntegrationTest):
# Test fixture hook: reads three limits exposed by client_context and stores
# them on the test instance as self.max_write_batch_size,
# self.max_bson_object_size and self.max_message_size_bytes.
def setUp(self):
self.max_write_batch_size = client_context.max_write_batch_size
# The context property is named max_bson_size; it is stored here under the
# name max_bson_object_size.
self.max_bson_object_size = client_context.max_bson_size
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_failCommand_fail_point
def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500
internal_client = rs_or_single_client(timeoutMS=None)
self.addCleanup(internal_client.close)
@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
for _ in range(num_models):
models.append(
InsertOne(