diff --git a/test/__init__.py b/test/__init__.py index b8394bd13..e60736e3e 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -852,6 +852,10 @@ class ClientContext: def max_write_batch_size(self): return (self.hello)["maxWriteBatchSize"] + @property + def max_message_size_bytes(self): + return (self.hello)["maxMessageSizeBytes"] + # Reusable client context client_context = ClientContext() diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 4098d9af8..a95a9e31b 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -854,6 +854,10 @@ class AsyncClientContext: async def max_write_batch_size(self): return (await self.hello)["maxWriteBatchSize"] + @property + async def max_message_size_bytes(self): + return (await self.hello)["maxMessageSizeBytes"] + # Reusable client context async_client_context = AsyncClientContext() diff --git a/test/asynchronous/test_client_bulk_write.py b/test/asynchronous/test_client_bulk_write.py index f55b3082b..7ce989bbe 100644 --- a/test/asynchronous/test_client_bulk_write.py +++ b/test/asynchronous/test_client_bulk_write.py @@ -56,20 +56,24 @@ class TestClientBulkWrite(AsyncIntegrationTest): # https://github.com/mongodb/specifications/tree/master/source/crud/tests class TestClientBulkWriteCRUD(AsyncIntegrationTest): + async def asyncSetUp(self): + self.max_write_batch_size = await async_client_context.max_write_batch_size + self.max_bson_object_size = await async_client_context.max_bson_size + self.max_message_size_bytes = await async_client_context.max_message_size_bytes + @async_client_context.require_version_min(8, 0, 0, -24) async def test_batch_splits_if_num_operations_too_large(self): listener = OvertCommandListener() client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append(InsertOne(namespace="db.coll", document={"a": "b"})) self.addAsyncCleanup(client.db["coll"].drop) result = await client.bulk_write(models=models) - self.assertEqual(result.inserted_count, max_write_batch_size + 1) + self.assertEqual(result.inserted_count, self.max_write_batch_size + 1) bulk_write_events = [] for event in listener.started_events: @@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.assertEqual(len(bulk_write_events), 2) first_event, second_event = bulk_write_events - self.assertEqual(len(first_event.command["ops"]), max_write_batch_size) + self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size) self.assertEqual(len(second_event.command["ops"]), 1) self.assertEqual(first_event.operation_id, second_event.operation_id) @@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - 500) for _ in range(num_models): models.append( InsertOne( @@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): retryWrites=False, ) self.addAsyncCleanup(client.aclose) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] fail_command = { "configureFailPoint": "failCommand", @@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type] self.assertIsNotNone(context.exception.partial_result) self.assertEqual( - context.exception.partial_result.inserted_count, max_write_batch_size + 1 + context.exception.partial_result.inserted_count, self.max_write_batch_size + 1 ) bulk_write_events = [] @@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await collection.drop() await collection.insert_one(document={"_id": 1}) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): with self.assertRaises(ClientBulkWriteException) as context: await client.bulk_write(models=models, ordered=False) - self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type] + self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type] bulk_write_events = [] for event in listener.started_events: @@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await collection.drop() await collection.insert_one(document={"_id": 1}) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] async with client.start_session() as session: await session.start_transaction() models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 1}, @@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - b_repeated = "b" * max_bson_object_size + b_repeated = "b" * self.max_bson_object_size # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] @@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) async def _setup_namespace_test_models(self): - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] + # See prose test specification below for details on these calculations. + # https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations + _EXISTING_BULK_WRITE_BYTES = 1122 + _OPERATION_DOC_BYTES = 57 + _NAMESPACE_DOC_BYTES = 217 - ops_bytes = max_message_size_bytes - 1122 - num_models = ops_bytes // max_bson_object_size - remainder_bytes = ops_bytes % max_bson_object_size + # When compression is enabled, max_message_size is + # smaller to account for compression message header. + if async_client_context.client_options.get("compressors"): + max_message_size_bytes = self.max_message_size_bytes - 16 + else: + max_message_size_bytes = self.max_message_size_bytes + + ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES + num_models = ops_bytes // self.max_bson_object_size + remainder_bytes = ops_bytes % self.max_bson_object_size models = [] - b_repeated = "b" * (max_bson_object_size - 57) + b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES) for _ in range(num_models): models.append( InsertOne( @@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): document={"a": b_repeated}, ) ) - if remainder_bytes >= 217: + if remainder_bytes >= _NAMESPACE_DOC_BYTES: num_models += 1 - b_repeated = "b" * (remainder_bytes - 57) + b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES) models.append( InsertOne( namespace="db.coll", @@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client() self.addAsyncCleanup(client.aclose) - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - # Document too large. - b_repeated = "b" * max_message_size_bytes + b_repeated = "b" * self.max_message_size_bytes models = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(InvalidOperation) as context: await client.bulk_write(models=models) self.assertIn("cannot do an empty bulk write", context.exception._message) # Namespace too large. - c_repeated = "c" * max_message_size_bytes + c_repeated = "c" * self.max_message_size_bytes namespace = f"db.{c_repeated}" models = [InsertOne(namespace=namespace, document={"a": "b"})] with self.assertRaises(InvalidOperation) as context: @@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteTimeout(AsyncIntegrationTest): + async def asyncSetUp(self): + self.max_write_batch_size = await async_client_context.max_write_batch_size + self.max_bson_object_size = await async_client_context.max_bson_size + self.max_message_size_bytes = await async_client_context.max_message_size_bytes + @async_client_context.require_version_min(8, 0, 0, -24) @async_client_context.require_failCommand_fail_point async def test_timeout_in_multi_batch_bulk_write(self): + _OVERHEAD = 500 + internal_client = await async_rs_or_single_client(timeoutMS=None) self.addAsyncCleanup(internal_client.aclose) @@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 2}, @@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD) for _ in range(num_models): models.append( InsertOne( diff --git a/test/test_client_bulk_write.py b/test/test_client_bulk_write.py index facf2971a..639990880 100644 --- a/test/test_client_bulk_write.py +++ b/test/test_client_bulk_write.py @@ -56,20 +56,24 @@ class TestClientBulkWrite(IntegrationTest): # https://github.com/mongodb/specifications/tree/master/source/crud/tests class TestClientBulkWriteCRUD(IntegrationTest): + def setUp(self): + self.max_write_batch_size = client_context.max_write_batch_size + self.max_bson_object_size = client_context.max_bson_size + self.max_message_size_bytes = client_context.max_message_size_bytes + @client_context.require_version_min(8, 0, 0, -24) def test_batch_splits_if_num_operations_too_large(self): listener = OvertCommandListener() client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append(InsertOne(namespace="db.coll", document={"a": "b"})) self.addCleanup(client.db["coll"].drop) result = client.bulk_write(models=models) - self.assertEqual(result.inserted_count, max_write_batch_size + 1) + self.assertEqual(result.inserted_count, self.max_write_batch_size + 1) bulk_write_events = [] for event in listener.started_events: @@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.assertEqual(len(bulk_write_events), 2) first_event, second_event = bulk_write_events - self.assertEqual(len(first_event.command["ops"]), max_write_batch_size) + self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size) self.assertEqual(len(second_event.command["ops"]), 1) self.assertEqual(first_event.operation_id, second_event.operation_id) @@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - 500) for _ in range(num_models): models.append( InsertOne( @@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(IntegrationTest): retryWrites=False, ) self.addCleanup(client.close) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] fail_command = { "configureFailPoint": "failCommand", @@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): } with self.fail_point(fail_command): models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type] self.assertIsNotNone(context.exception.partial_result) self.assertEqual( - context.exception.partial_result.inserted_count, max_write_batch_size + 1 + context.exception.partial_result.inserted_count, self.max_write_batch_size + 1 ) bulk_write_events = [] @@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): collection.drop() collection.insert_one(document={"_id": 1}) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): with self.assertRaises(ClientBulkWriteException) as context: client.bulk_write(models=models, ordered=False) - self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type] + self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type] bulk_write_events = [] for event in listener.started_events: @@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): collection.drop() collection.insert_one(document={"_id": 1}) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] with client.start_session() as session: session.start_transaction() models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 1}, @@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): } with self.fail_point(fail_command): models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - b_repeated = "b" * max_bson_object_size + b_repeated = "b" * self.max_bson_object_size # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] @@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(IntegrationTest): client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) def _setup_namespace_test_models(self): - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] + # See prose test specification below for details on these calculations. + # https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations + _EXISTING_BULK_WRITE_BYTES = 1122 + _OPERATION_DOC_BYTES = 57 + _NAMESPACE_DOC_BYTES = 217 - ops_bytes = max_message_size_bytes - 1122 - num_models = ops_bytes // max_bson_object_size - remainder_bytes = ops_bytes % max_bson_object_size + # When compression is enabled, max_message_size is + # smaller to account for compression message header. + if client_context.client_options.get("compressors"): + max_message_size_bytes = self.max_message_size_bytes - 16 + else: + max_message_size_bytes = self.max_message_size_bytes + + ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES + num_models = ops_bytes // self.max_bson_object_size + remainder_bytes = ops_bytes % self.max_bson_object_size models = [] - b_repeated = "b" * (max_bson_object_size - 57) + b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES) for _ in range(num_models): models.append( InsertOne( @@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): document={"a": b_repeated}, ) ) - if remainder_bytes >= 217: + if remainder_bytes >= _NAMESPACE_DOC_BYTES: num_models += 1 - b_repeated = "b" * (remainder_bytes - 57) + b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES) models.append( InsertOne( namespace="db.coll", @@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client() self.addCleanup(client.close) - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - # Document too large. - b_repeated = "b" * max_message_size_bytes + b_repeated = "b" * self.max_message_size_bytes models = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(InvalidOperation) as context: client.bulk_write(models=models) self.assertIn("cannot do an empty bulk write", context.exception._message) # Namespace too large. - c_repeated = "c" * max_message_size_bytes + c_repeated = "c" * self.max_message_size_bytes namespace = f"db.{c_repeated}" models = [InsertOne(namespace=namespace, document={"a": "b"})] with self.assertRaises(InvalidOperation) as context: @@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(IntegrationTest): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteTimeout(IntegrationTest): + def setUp(self): + self.max_write_batch_size = client_context.max_write_batch_size + self.max_bson_object_size = client_context.max_bson_size + self.max_message_size_bytes = client_context.max_message_size_bytes + @client_context.require_version_min(8, 0, 0, -24) @client_context.require_failCommand_fail_point def test_timeout_in_multi_batch_bulk_write(self): + _OVERHEAD = 500 + internal_client = rs_or_single_client(timeoutMS=None) self.addCleanup(internal_client.close) @@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 2}, @@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(IntegrationTest): } with self.fail_point(fail_command): models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD) for _ in range(num_models): models.append( InsertOne(