From 30b32d00c45b0e79fce822d23a283f562c1291f9 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 12 Aug 2024 10:10:19 -0700 Subject: [PATCH 1/3] PYTHON-4649 Skip CSOT tests on slow Windows and macOS hosts (#1784) --- .evergreen/config.yml | 9 +++++++++ test/test_csot.py | 1 + test/unified_format.py | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/.evergreen/config.yml b/.evergreen/config.yml index 1e218e2c7..0df4bdcef 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -435,6 +435,9 @@ functions: if [ -n "${TEST_INDEX_MANAGEMENT}" ]; then export TEST_INDEX_MANAGEMENT=1 fi + if [ -n "${SKIP_CSOT_TESTS}" ]; then + export SKIP_CSOT_TESTS=1 + fi GREEN_FRAMEWORK=${GREEN_FRAMEWORK} \ PYTHON_BINARY=${PYTHON_BINARY} \ @@ -2072,6 +2075,8 @@ axes: skip_EC2_auth_test: true skip_ECS_auth_test: true skip_web_identity_auth_test: true + # CSOT tests are unreliable on our slow macOS hosts. + SKIP_CSOT_TESTS: true - id: macos-arm64 display_name: "macOS Arm64" run_on: macos-14-arm64 @@ -2079,6 +2084,8 @@ axes: skip_EC2_auth_test: true skip_ECS_auth_test: true skip_web_identity_auth_test: true + # CSOT tests are unreliable on our slow macOS hosts. + SKIP_CSOT_TESTS: true - id: rhel7 display_name: "RHEL 7.x" run_on: rhel79-small @@ -2121,6 +2128,8 @@ axes: skip_EC2_auth_test: true skip_web_identity_auth_test: true venv_bin_dir: "Scripts" + # CSOT tests are unreliable on our slow Windows hosts. + SKIP_CSOT_TESTS: true # Test with authentication? - id: auth diff --git a/test/test_csot.py b/test/test_csot.py index e8ee92d4a..64210b4d6 100644 --- a/test/test_csot.py +++ b/test/test_csot.py @@ -77,6 +77,7 @@ class TestCSOT(IntegrationTest): @client_context.require_change_streams def test_change_stream_can_resume_after_timeouts(self): coll = self.db.test + coll.insert_one({}) with coll.watch() as stream: with pymongo.timeout(0.1): with self.assertRaises(PyMongoError) as ctx: diff --git a/test/unified_format.py b/test/unified_format.py index 0322d83cc..d978ef84d 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -145,6 +145,8 @@ from pymongo.topology_description import TopologyDescription from pymongo.typings import _Address from pymongo.write_concern import WriteConcern +SKIP_CSOT_TESTS = os.getenv("SKIP_CSOT_TESTS") + JSON_OPTS = json_util.JSONOptions(tz_aware=False) IS_INTERRUPTED = False @@ -1953,6 +1955,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest): self.assertListEqual(sorted_expected_documents, actual_documents) def run_scenario(self, spec, uri=None): + if "csot" in self.id().lower() and SKIP_CSOT_TESTS: + raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") + # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. From 2afbd4b2791e619e3983dcc8a04b33e5fc4c4844 Mon Sep 17 00:00:00 2001 From: Shruti Sridhar <77828382+shruti-sridhar@users.noreply.github.com> Date: Mon, 12 Aug 2024 10:21:09 -0700 Subject: [PATCH 2/3] PYTHON-4650 Fix MongoClient.bulk_write test failure when compression is enabled (#1786) --- test/__init__.py | 4 + test/asynchronous/__init__.py | 4 + test/asynchronous/test_client_bulk_write.py | 95 +++++++++++---------- test/test_client_bulk_write.py | 95 +++++++++++---------- 4 files changed, 110 insertions(+), 88 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index b8394bd13..e60736e3e 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -852,6 +852,10 @@ class ClientContext: def max_write_batch_size(self): return (self.hello)["maxWriteBatchSize"] + @property + def max_message_size_bytes(self): + return (self.hello)["maxMessageSizeBytes"] + # Reusable client context client_context = ClientContext() diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 4098d9af8..a95a9e31b 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -854,6 +854,10 @@ class AsyncClientContext: async def max_write_batch_size(self): return (await self.hello)["maxWriteBatchSize"] + @property + async def max_message_size_bytes(self): + return (await self.hello)["maxMessageSizeBytes"] + # Reusable client context async_client_context = AsyncClientContext() diff --git a/test/asynchronous/test_client_bulk_write.py b/test/asynchronous/test_client_bulk_write.py index f55b3082b..7ce989bbe 100644 --- a/test/asynchronous/test_client_bulk_write.py +++ b/test/asynchronous/test_client_bulk_write.py @@ -56,20 +56,24 @@ class TestClientBulkWrite(AsyncIntegrationTest): # https://github.com/mongodb/specifications/tree/master/source/crud/tests class TestClientBulkWriteCRUD(AsyncIntegrationTest): + async def asyncSetUp(self): + self.max_write_batch_size = await async_client_context.max_write_batch_size + self.max_bson_object_size = await async_client_context.max_bson_size + self.max_message_size_bytes = await async_client_context.max_message_size_bytes + @async_client_context.require_version_min(8, 0, 0, -24) async def test_batch_splits_if_num_operations_too_large(self): listener = OvertCommandListener() client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append(InsertOne(namespace="db.coll", document={"a": "b"})) self.addAsyncCleanup(client.db["coll"].drop) result = await client.bulk_write(models=models) - self.assertEqual(result.inserted_count, max_write_batch_size + 1) + self.assertEqual(result.inserted_count, self.max_write_batch_size + 1) bulk_write_events = [] for event in listener.started_events: @@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.assertEqual(len(bulk_write_events), 2) first_event, second_event = bulk_write_events - self.assertEqual(len(first_event.command["ops"]), max_write_batch_size) + self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size) self.assertEqual(len(second_event.command["ops"]), 1) self.assertEqual(first_event.operation_id, second_event.operation_id) @@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - 500) for _ in range(num_models): models.append( InsertOne( @@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): retryWrites=False, ) self.addAsyncCleanup(client.aclose) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] fail_command = { "configureFailPoint": "failCommand", @@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type] self.assertIsNotNone(context.exception.partial_result) self.assertEqual( - context.exception.partial_result.inserted_count, max_write_batch_size + 1 + context.exception.partial_result.inserted_count, self.max_write_batch_size + 1 ) bulk_write_events = [] @@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await collection.drop() await collection.insert_one(document={"_id": 1}) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): with self.assertRaises(ClientBulkWriteException) as context: await client.bulk_write(models=models, ordered=False) - self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type] + self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type] bulk_write_events = [] for event in listener.started_events: @@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await collection.drop() await collection.insert_one(document={"_id": 1}) - max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] async with client.start_session() as session: await session.start_transaction() models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 1}, @@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client(event_listeners=[listener]) self.addAsyncCleanup(client.aclose) - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - b_repeated = "b" * max_bson_object_size + b_repeated = "b" * self.max_bson_object_size # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] @@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) async def _setup_namespace_test_models(self): - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] + # See prose test specification below for details on these calculations. + # https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations + _EXISTING_BULK_WRITE_BYTES = 1122 + _OPERATION_DOC_BYTES = 57 + _NAMESPACE_DOC_BYTES = 217 - ops_bytes = max_message_size_bytes - 1122 - num_models = ops_bytes // max_bson_object_size - remainder_bytes = ops_bytes % max_bson_object_size + # When compression is enabled, max_message_size is + # smaller to account for compression message header. + if async_client_context.client_options.get("compressors"): + max_message_size_bytes = self.max_message_size_bytes - 16 + else: + max_message_size_bytes = self.max_message_size_bytes + + ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES + num_models = ops_bytes // self.max_bson_object_size + remainder_bytes = ops_bytes % self.max_bson_object_size models = [] - b_repeated = "b" * (max_bson_object_size - 57) + b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES) for _ in range(num_models): models.append( InsertOne( @@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): document={"a": b_repeated}, ) ) - if remainder_bytes >= 217: + if remainder_bytes >= _NAMESPACE_DOC_BYTES: num_models += 1 - b_repeated = "b" * (remainder_bytes - 57) + b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES) models.append( InsertOne( namespace="db.coll", @@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): client = await async_rs_or_single_client() self.addAsyncCleanup(client.aclose) - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] - # Document too large. - b_repeated = "b" * max_message_size_bytes + b_repeated = "b" * self.max_message_size_bytes models = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(InvalidOperation) as context: await client.bulk_write(models=models) self.assertIn("cannot do an empty bulk write", context.exception._message) # Namespace too large. - c_repeated = "c" * max_message_size_bytes + c_repeated = "c" * self.max_message_size_bytes namespace = f"db.{c_repeated}" models = [InsertOne(namespace=namespace, document={"a": "b"})] with self.assertRaises(InvalidOperation) as context: @@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteTimeout(AsyncIntegrationTest): + async def asyncSetUp(self): + self.max_write_batch_size = await async_client_context.max_write_batch_size + self.max_bson_object_size = await async_client_context.max_bson_size + self.max_message_size_bytes = await async_client_context.max_message_size_bytes + @async_client_context.require_version_min(8, 0, 0, -24) @async_client_context.require_failCommand_fail_point async def test_timeout_in_multi_batch_bulk_write(self): + _OVERHEAD = 500 + internal_client = await async_rs_or_single_client(timeoutMS=None) self.addAsyncCleanup(internal_client.aclose) @@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest): self.addAsyncCleanup(collection.drop) await collection.drop() - max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"] - max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 2}, @@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest): } async with self.fail_point(fail_command): models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD) for _ in range(num_models): models.append( InsertOne( diff --git a/test/test_client_bulk_write.py b/test/test_client_bulk_write.py index facf2971a..639990880 100644 --- a/test/test_client_bulk_write.py +++ b/test/test_client_bulk_write.py @@ -56,20 +56,24 @@ class TestClientBulkWrite(IntegrationTest): # https://github.com/mongodb/specifications/tree/master/source/crud/tests class TestClientBulkWriteCRUD(IntegrationTest): + def setUp(self): + self.max_write_batch_size = client_context.max_write_batch_size + self.max_bson_object_size = client_context.max_bson_size + self.max_message_size_bytes = client_context.max_message_size_bytes + @client_context.require_version_min(8, 0, 0, -24) def test_batch_splits_if_num_operations_too_large(self): listener = OvertCommandListener() client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append(InsertOne(namespace="db.coll", document={"a": "b"})) self.addCleanup(client.db["coll"].drop) result = client.bulk_write(models=models) - self.assertEqual(result.inserted_count, max_write_batch_size + 1) + self.assertEqual(result.inserted_count, self.max_write_batch_size + 1) bulk_write_events = [] for event in listener.started_events: @@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.assertEqual(len(bulk_write_events), 2) first_event, second_event = bulk_write_events - self.assertEqual(len(first_event.command["ops"]), max_write_batch_size) + self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size) self.assertEqual(len(second_event.command["ops"]), 1) self.assertEqual(first_event.operation_id, second_event.operation_id) @@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - 500) for _ in range(num_models): models.append( InsertOne( @@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(IntegrationTest): retryWrites=False, ) self.addCleanup(client.close) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] fail_command = { "configureFailPoint": "failCommand", @@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): } with self.fail_point(fail_command): models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type] self.assertIsNotNone(context.exception.partial_result) self.assertEqual( - context.exception.partial_result.inserted_count, max_write_batch_size + 1 + context.exception.partial_result.inserted_count, self.max_write_batch_size + 1 ) bulk_write_events = [] @@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): collection.drop() collection.insert_one(document={"_id": 1}) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): with self.assertRaises(ClientBulkWriteException) as context: client.bulk_write(models=models, ordered=False) - self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type] + self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type] bulk_write_events = [] for event in listener.started_events: @@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): collection.drop() collection.insert_one(document={"_id": 1}) - max_write_batch_size = (client_context.hello)["maxWriteBatchSize"] models = [] - for _ in range(max_write_batch_size + 1): + for _ in range(self.max_write_batch_size + 1): models.append( InsertOne( namespace="db.coll", @@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] with client.start_session() as session: session.start_transaction() models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 1}, @@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(IntegrationTest): } with self.fail_point(fail_command): models = [] - a_repeated = "a" * (max_bson_object_size // 2) - b_repeated = "b" * (max_bson_object_size // 2) + a_repeated = "a" * (self.max_bson_object_size // 2) + b_repeated = "b" * (self.max_bson_object_size // 2) models.append( UpdateOne( namespace="db.coll", @@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - b_repeated = "b" * max_bson_object_size + b_repeated = "b" * self.max_bson_object_size # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] @@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(IntegrationTest): client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) def _setup_namespace_test_models(self): - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] + # See prose test specification below for details on these calculations. + # https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations + _EXISTING_BULK_WRITE_BYTES = 1122 + _OPERATION_DOC_BYTES = 57 + _NAMESPACE_DOC_BYTES = 217 - ops_bytes = max_message_size_bytes - 1122 - num_models = ops_bytes // max_bson_object_size - remainder_bytes = ops_bytes % max_bson_object_size + # When compression is enabled, max_message_size is + # smaller to account for compression message header. + if client_context.client_options.get("compressors"): + max_message_size_bytes = self.max_message_size_bytes - 16 + else: + max_message_size_bytes = self.max_message_size_bytes + + ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES + num_models = ops_bytes // self.max_bson_object_size + remainder_bytes = ops_bytes % self.max_bson_object_size models = [] - b_repeated = "b" * (max_bson_object_size - 57) + b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES) for _ in range(num_models): models.append( InsertOne( @@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(IntegrationTest): document={"a": b_repeated}, ) ) - if remainder_bytes >= 217: + if remainder_bytes >= _NAMESPACE_DOC_BYTES: num_models += 1 - b_repeated = "b" * (remainder_bytes - 57) + b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES) models.append( InsertOne( namespace="db.coll", @@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(IntegrationTest): client = rs_or_single_client() self.addCleanup(client.close) - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] - # Document too large. - b_repeated = "b" * max_message_size_bytes + b_repeated = "b" * self.max_message_size_bytes models = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(InvalidOperation) as context: client.bulk_write(models=models) self.assertIn("cannot do an empty bulk write", context.exception._message) # Namespace too large. - c_repeated = "c" * max_message_size_bytes + c_repeated = "c" * self.max_message_size_bytes namespace = f"db.{c_repeated}" models = [InsertOne(namespace=namespace, document={"a": "b"})] with self.assertRaises(InvalidOperation) as context: @@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(IntegrationTest): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteTimeout(IntegrationTest): + def setUp(self): + self.max_write_batch_size = client_context.max_write_batch_size + self.max_bson_object_size = client_context.max_bson_size + self.max_message_size_bytes = client_context.max_message_size_bytes + @client_context.require_version_min(8, 0, 0, -24) @client_context.require_failCommand_fail_point def test_timeout_in_multi_batch_bulk_write(self): + _OVERHEAD = 500 + internal_client = rs_or_single_client(timeoutMS=None) self.addCleanup(internal_client.close) @@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(IntegrationTest): self.addCleanup(collection.drop) collection.drop() - max_bson_object_size = (client_context.hello)["maxBsonObjectSize"] - max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"] fail_command = { "configureFailPoint": "failCommand", "mode": {"times": 2}, @@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(IntegrationTest): } with self.fail_point(fail_command): models = [] - num_models = int(max_message_size_bytes / max_bson_object_size + 1) - b_repeated = "b" * (max_bson_object_size - 500) + num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1) + b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD) for _ in range(num_models): models.append( InsertOne( From a232b657d01030d2bc2b40db068ebb49f8b964a4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 12 Aug 2024 10:23:43 -0700 Subject: [PATCH 3/3] PYTHON-4613 Skip async tests when testing eventlet/gevent (#1780) --- green_framework_test.py | 12 +++++++++++- hatch.toml | 2 +- pyproject.toml | 3 ++- test/asynchronous/conftest.py | 6 ++---- test/conftest.py | 7 ++----- test/pytest_conf.py | 16 ++++++++++++++++ tools/synchro.py | 1 + 7 files changed, 35 insertions(+), 12 deletions(-) create mode 100644 test/pytest_conf.py diff --git a/green_framework_test.py b/green_framework_test.py index 65025798c..037d0279c 100644 --- a/green_framework_test.py +++ b/green_framework_test.py @@ -60,8 +60,18 @@ def run(framework_name, *args): # Monkey-patch. FRAMEWORKS[framework_name]() + arg_list = list(args) + + # Never run async tests with a framework + if len(arg_list) <= 1: + arg_list.extend(["-m", "not default_async and default"]) + else: + for i in range(len(arg_list) - 1): + if "-m" in arg_list[i]: + arg_list[i + 1] = f"not default_async and {arg_list[i + 1]}" + # Run the tests. - sys.exit(pytest.main(list(args))) + sys.exit(pytest.main(arg_list)) def main(): diff --git a/hatch.toml b/hatch.toml index 25d113e5d..8b1cf93e3 100644 --- a/hatch.toml +++ b/hatch.toml @@ -41,7 +41,7 @@ features = ["test"] [envs.test.scripts] test = "pytest -v --durations=5 --maxfail=10 {args}" test-eg = "bash ./.evergreen/run-tests.sh {args}" -test-async = "test test/asynchronous/ {args}" +test-async = "pytest -v --durations=5 --maxfail=10 -m default_async {args}" test-mockupdb = ["pip install -U git+https://github.com/ajdavis/mongo-mockup-db@master", "test -m mockupdb"] [envs.encryption] diff --git a/pyproject.toml b/pyproject.toml index cfd994f56..4380b57e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ zstd = ["requirements/zstd.txt"] [tool.pytest.ini_options] minversion = "7" -addopts = ["-ra", "--strict-config", "--strict-markers", "--junitxml=xunit-results/TEST-results.xml", "-m default"] +addopts = ["-ra", "--strict-config", "--strict-markers", "--junitxml=xunit-results/TEST-results.xml", "-m default or default_async"] testpaths = ["test"] log_cli_level = "INFO" faulthandler_timeout = 1500 @@ -108,6 +108,7 @@ markers = [ "load_balancer: load balancer tests", "mockupdb: tests that rely on mockupdb", "default: default test suite", + "default_async: default async test suite", ] [tool.mypy] diff --git a/test/asynchronous/conftest.py b/test/asynchronous/conftest.py index 398ba4265..f5bcd953a 100644 --- a/test/asynchronous/conftest.py +++ b/test/asynchronous/conftest.py @@ -1,5 +1,6 @@ from __future__ import annotations +from test import pytest_conf from test.asynchronous import async_setup, async_teardown import pytest_asyncio @@ -14,7 +15,4 @@ async def test_setup_and_teardown(): await async_teardown() -def pytest_collection_modifyitems(items, config): - for item in items: - if not any(item.iter_markers()): - item.add_marker("default") +pytest_collection_modifyitems = pytest_conf.pytest_collection_modifyitems diff --git a/test/conftest.py b/test/conftest.py index 39d29355b..431dd152f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,6 +1,6 @@ from __future__ import annotations -from test import setup, teardown +from test import pytest_conf, setup, teardown import pytest @@ -14,7 +14,4 @@ def test_setup_and_teardown(): teardown() -def pytest_collection_modifyitems(items, config): - for item in items: - if not any(item.iter_markers()): - item.add_marker("default") +pytest_collection_modifyitems = pytest_conf.pytest_collection_modifyitems diff --git a/test/pytest_conf.py b/test/pytest_conf.py new file mode 100644 index 000000000..75f3e7432 --- /dev/null +++ b/test/pytest_conf.py @@ -0,0 +1,16 @@ +from __future__ import annotations + + +def pytest_collection_modifyitems(items, config): + sync_items = [] + async_items = [ + item + for item in items + if "asynchronous" in item.fspath.dirname or sync_items.append(item) # type: ignore[func-returns-value] + ] + for item in async_items: + if not any(item.iter_markers()): + item.add_marker("default_async") + for item in sync_items: + if not any(item.iter_markers()): + item.add_marker("default") diff --git a/tools/synchro.py b/tools/synchro.py index e0af50229..5711e1f84 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -95,6 +95,7 @@ replacements = { "aclose": "close", "async-transactions-ref": "transactions-ref", "async-snapshot-reads-ref": "snapshot-reads-ref", + "default_async": "default", } docstring_replacements: dict[tuple[str, str], str] = {