Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2024-08-12 17:23:59 -05:00
commit 248448a6a2
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
14 changed files with 160 additions and 100 deletions

View File

@ -435,6 +435,9 @@ functions:
if [ -n "${TEST_INDEX_MANAGEMENT}" ]; then
export TEST_INDEX_MANAGEMENT=1
fi
if [ -n "${SKIP_CSOT_TESTS}" ]; then
export SKIP_CSOT_TESTS=1
fi
GREEN_FRAMEWORK=${GREEN_FRAMEWORK} \
PYTHON_BINARY=${PYTHON_BINARY} \
@ -2072,6 +2075,8 @@ axes:
skip_EC2_auth_test: true
skip_ECS_auth_test: true
skip_web_identity_auth_test: true
# CSOT tests are unreliable on our slow macOS hosts.
SKIP_CSOT_TESTS: true
- id: macos-arm64
display_name: "macOS Arm64"
run_on: macos-14-arm64
@ -2079,6 +2084,8 @@ axes:
skip_EC2_auth_test: true
skip_ECS_auth_test: true
skip_web_identity_auth_test: true
# CSOT tests are unreliable on our slow macOS hosts.
SKIP_CSOT_TESTS: true
- id: rhel7
display_name: "RHEL 7.x"
run_on: rhel79-small
@ -2121,6 +2128,8 @@ axes:
skip_EC2_auth_test: true
skip_web_identity_auth_test: true
venv_bin_dir: "Scripts"
# CSOT tests are unreliable on our slow Windows hosts.
SKIP_CSOT_TESTS: true
# Test with authentication?
- id: auth

View File

@ -60,8 +60,18 @@ def run(framework_name, *args):
# Monkey-patch.
FRAMEWORKS[framework_name]()
arg_list = list(args)
# Never run async tests with a framework
if len(arg_list) <= 1:
arg_list.extend(["-m", "not default_async and default"])
else:
for i in range(len(arg_list) - 1):
if "-m" in arg_list[i]:
arg_list[i + 1] = f"not default_async and {arg_list[i + 1]}"
# Run the tests.
sys.exit(pytest.main(list(args)))
sys.exit(pytest.main(arg_list))
def main():

View File

@ -41,7 +41,7 @@ features = ["test"]
[envs.test.scripts]
test = "pytest -v --durations=5 --maxfail=10 {args}"
test-eg = "bash ./.evergreen/run-tests.sh {args}"
test-async = "test test/asynchronous/ {args}"
test-async = "pytest -v --durations=5 --maxfail=10 -m default_async {args}"
test-mockupdb = ["pip install -U git+https://github.com/ajdavis/mongo-mockup-db@master", "test -m mockupdb"]
[envs.encryption]

View File

@ -70,7 +70,7 @@ zstd = ["requirements/zstd.txt"]
[tool.pytest.ini_options]
minversion = "7"
addopts = ["-ra", "--strict-config", "--strict-markers", "--junitxml=xunit-results/TEST-results.xml", "-m default"]
addopts = ["-ra", "--strict-config", "--strict-markers", "--junitxml=xunit-results/TEST-results.xml", "-m default or default_async"]
testpaths = ["test"]
log_cli_level = "INFO"
faulthandler_timeout = 1500
@ -108,6 +108,7 @@ markers = [
"load_balancer: load balancer tests",
"mockupdb: tests that rely on mockupdb",
"default: default test suite",
"default_async: default async test suite",
]
[tool.mypy]

View File

@ -852,6 +852,10 @@ class ClientContext:
def max_write_batch_size(self):
return (self.hello)["maxWriteBatchSize"]
@property
def max_message_size_bytes(self):
return (self.hello)["maxMessageSizeBytes"]
# Reusable client context
client_context = ClientContext()

View File

@ -854,6 +854,10 @@ class AsyncClientContext:
async def max_write_batch_size(self):
return (await self.hello)["maxWriteBatchSize"]
@property
async def max_message_size_bytes(self):
return (await self.hello)["maxMessageSizeBytes"]
# Reusable client context
async_client_context = AsyncClientContext()

View File

@ -1,5 +1,6 @@
from __future__ import annotations
from test import pytest_conf
from test.asynchronous import async_setup, async_teardown
import pytest_asyncio
@ -14,7 +15,4 @@ async def test_setup_and_teardown():
await async_teardown()
def pytest_collection_modifyitems(items, config):
for item in items:
if not any(item.iter_markers()):
item.add_marker("default")
pytest_collection_modifyitems = pytest_conf.pytest_collection_modifyitems

View File

@ -56,20 +56,24 @@ class TestClientBulkWrite(AsyncIntegrationTest):
# https://github.com/mongodb/specifications/tree/master/source/crud/tests
class TestClientBulkWriteCRUD(AsyncIntegrationTest):
async def asyncSetUp(self):
self.max_write_batch_size = await async_client_context.max_write_batch_size
self.max_bson_object_size = await async_client_context.max_bson_size
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
async def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(InsertOne(namespace="db.coll", document={"a": "b"}))
self.addAsyncCleanup(client.db["coll"].drop)
result = await client.bulk_write(models=models)
self.assertEqual(result.inserted_count, max_write_batch_size + 1)
self.assertEqual(result.inserted_count, self.max_write_batch_size + 1)
bulk_write_events = []
for event in listener.started_events:
@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
first_event, second_event = bulk_write_events
self.assertEqual(len(first_event.command["ops"]), max_write_batch_size)
self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size)
self.assertEqual(len(second_event.command["ops"]), 1)
self.assertEqual(first_event.operation_id, second_event.operation_id)
@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - 500)
for _ in range(num_models):
models.append(
InsertOne(
@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
retryWrites=False,
)
self.addAsyncCleanup(client.aclose)
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
fail_command = {
"configureFailPoint": "failCommand",
@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type]
self.assertIsNotNone(context.exception.partial_result)
self.assertEqual(
context.exception.partial_result.inserted_count, max_write_batch_size + 1
context.exception.partial_result.inserted_count, self.max_write_batch_size + 1
)
bulk_write_events = []
@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await collection.drop()
await collection.insert_one(document={"_id": 1})
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
with self.assertRaises(ClientBulkWriteException) as context:
await client.bulk_write(models=models, ordered=False)
self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type]
self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type]
bulk_write_events = []
for event in listener.started_events:
@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await collection.drop()
await collection.insert_one(document={"_id": 1})
max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
async with client.start_session() as session:
await session.start_transaction()
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client(event_listeners=[listener])
self.addAsyncCleanup(client.aclose)
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
b_repeated = "b" * max_bson_object_size
b_repeated = "b" * self.max_bson_object_size
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
async def _setup_namespace_test_models(self):
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
# See prose test specification below for details on these calculations.
# https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations
_EXISTING_BULK_WRITE_BYTES = 1122
_OPERATION_DOC_BYTES = 57
_NAMESPACE_DOC_BYTES = 217
ops_bytes = max_message_size_bytes - 1122
num_models = ops_bytes // max_bson_object_size
remainder_bytes = ops_bytes % max_bson_object_size
# When compression is enabled, max_message_size is
# smaller to account for compression message header.
if async_client_context.client_options.get("compressors"):
max_message_size_bytes = self.max_message_size_bytes - 16
else:
max_message_size_bytes = self.max_message_size_bytes
ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES
num_models = ops_bytes // self.max_bson_object_size
remainder_bytes = ops_bytes % self.max_bson_object_size
models = []
b_repeated = "b" * (max_bson_object_size - 57)
b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES)
for _ in range(num_models):
models.append(
InsertOne(
@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
document={"a": b_repeated},
)
)
if remainder_bytes >= 217:
if remainder_bytes >= _NAMESPACE_DOC_BYTES:
num_models += 1
b_repeated = "b" * (remainder_bytes - 57)
b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES)
models.append(
InsertOne(
namespace="db.coll",
@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
client = await async_rs_or_single_client()
self.addAsyncCleanup(client.aclose)
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
# Document too large.
b_repeated = "b" * max_message_size_bytes
b_repeated = "b" * self.max_message_size_bytes
models = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(InvalidOperation) as context:
await client.bulk_write(models=models)
self.assertIn("cannot do an empty bulk write", context.exception._message)
# Namespace too large.
c_repeated = "c" * max_message_size_bytes
c_repeated = "c" * self.max_message_size_bytes
namespace = f"db.{c_repeated}"
models = [InsertOne(namespace=namespace, document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteTimeout(AsyncIntegrationTest):
async def asyncSetUp(self):
self.max_write_batch_size = await async_client_context.max_write_batch_size
self.max_bson_object_size = await async_client_context.max_bson_size
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_failCommand_fail_point
async def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500
internal_client = await async_rs_or_single_client(timeoutMS=None)
self.addAsyncCleanup(internal_client.aclose)
@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest):
self.addAsyncCleanup(collection.drop)
await collection.drop()
max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(AsyncIntegrationTest):
}
async with self.fail_point(fail_command):
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
for _ in range(num_models):
models.append(
InsertOne(

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from test import setup, teardown
from test import pytest_conf, setup, teardown
import pytest
@ -14,7 +14,4 @@ def test_setup_and_teardown():
teardown()
def pytest_collection_modifyitems(items, config):
for item in items:
if not any(item.iter_markers()):
item.add_marker("default")
pytest_collection_modifyitems = pytest_conf.pytest_collection_modifyitems

16
test/pytest_conf.py Normal file
View File

@ -0,0 +1,16 @@
from __future__ import annotations
def pytest_collection_modifyitems(items, config):
sync_items = []
async_items = [
item
for item in items
if "asynchronous" in item.fspath.dirname or sync_items.append(item) # type: ignore[func-returns-value]
]
for item in async_items:
if not any(item.iter_markers()):
item.add_marker("default_async")
for item in sync_items:
if not any(item.iter_markers()):
item.add_marker("default")

View File

@ -56,20 +56,24 @@ class TestClientBulkWrite(IntegrationTest):
# https://github.com/mongodb/specifications/tree/master/source/crud/tests
class TestClientBulkWriteCRUD(IntegrationTest):
def setUp(self):
self.max_write_batch_size = client_context.max_write_batch_size
self.max_bson_object_size = client_context.max_bson_size
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(InsertOne(namespace="db.coll", document={"a": "b"}))
self.addCleanup(client.db["coll"].drop)
result = client.bulk_write(models=models)
self.assertEqual(result.inserted_count, max_write_batch_size + 1)
self.assertEqual(result.inserted_count, self.max_write_batch_size + 1)
bulk_write_events = []
for event in listener.started_events:
@ -78,7 +82,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
first_event, second_event = bulk_write_events
self.assertEqual(len(first_event.command["ops"]), max_write_batch_size)
self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size)
self.assertEqual(len(second_event.command["ops"]), 1)
self.assertEqual(first_event.operation_id, second_event.operation_id)
@ -88,12 +92,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - 500)
for _ in range(num_models):
models.append(
InsertOne(
@ -126,7 +127,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
retryWrites=False,
)
self.addCleanup(client.close)
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
fail_command = {
"configureFailPoint": "failCommand",
@ -138,7 +138,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -152,7 +152,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(context.exception.write_concern_errors), 2) # type: ignore[arg-type]
self.assertIsNotNone(context.exception.partial_result)
self.assertEqual(
context.exception.partial_result.inserted_count, max_write_batch_size + 1
context.exception.partial_result.inserted_count, self.max_write_batch_size + 1
)
bulk_write_events = []
@ -172,9 +172,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
collection.drop()
collection.insert_one(document={"_id": 1})
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -184,7 +183,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
with self.assertRaises(ClientBulkWriteException) as context:
client.bulk_write(models=models, ordered=False)
self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1) # type: ignore[arg-type]
self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1) # type: ignore[arg-type]
bulk_write_events = []
for event in listener.started_events:
@ -203,9 +202,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
collection.drop()
collection.insert_one(document={"_id": 1})
max_write_batch_size = (client_context.hello)["maxWriteBatchSize"]
models = []
for _ in range(max_write_batch_size + 1):
for _ in range(self.max_write_batch_size + 1):
models.append(
InsertOne(
namespace="db.coll",
@ -233,10 +231,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -275,12 +272,11 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
with client.start_session() as session:
session.start_transaction()
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -319,7 +315,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
@ -327,8 +322,8 @@ class TestClientBulkWriteCRUD(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
a_repeated = "a" * (max_bson_object_size // 2)
b_repeated = "b" * (max_bson_object_size // 2)
a_repeated = "a" * (self.max_bson_object_size // 2)
b_repeated = "b" * (self.max_bson_object_size // 2)
models.append(
UpdateOne(
namespace="db.coll",
@ -370,8 +365,7 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
b_repeated = "b" * max_bson_object_size
b_repeated = "b" * self.max_bson_object_size
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
@ -384,15 +378,25 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
def _setup_namespace_test_models(self):
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
# See prose test specification below for details on these calculations.
# https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations
_EXISTING_BULK_WRITE_BYTES = 1122
_OPERATION_DOC_BYTES = 57
_NAMESPACE_DOC_BYTES = 217
ops_bytes = max_message_size_bytes - 1122
num_models = ops_bytes // max_bson_object_size
remainder_bytes = ops_bytes % max_bson_object_size
# When compression is enabled, max_message_size is
# smaller to account for compression message header.
if client_context.client_options.get("compressors"):
max_message_size_bytes = self.max_message_size_bytes - 16
else:
max_message_size_bytes = self.max_message_size_bytes
ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES
num_models = ops_bytes // self.max_bson_object_size
remainder_bytes = ops_bytes % self.max_bson_object_size
models = []
b_repeated = "b" * (max_bson_object_size - 57)
b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES)
for _ in range(num_models):
models.append(
InsertOne(
@ -400,9 +404,9 @@ class TestClientBulkWriteCRUD(IntegrationTest):
document={"a": b_repeated},
)
)
if remainder_bytes >= 217:
if remainder_bytes >= _NAMESPACE_DOC_BYTES:
num_models += 1
b_repeated = "b" * (remainder_bytes - 57)
b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES)
models.append(
InsertOne(
namespace="db.coll",
@ -485,17 +489,15 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client = rs_or_single_client()
self.addCleanup(client.close)
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
# Document too large.
b_repeated = "b" * max_message_size_bytes
b_repeated = "b" * self.max_message_size_bytes
models = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(InvalidOperation) as context:
client.bulk_write(models=models)
self.assertIn("cannot do an empty bulk write", context.exception._message)
# Namespace too large.
c_repeated = "c" * max_message_size_bytes
c_repeated = "c" * self.max_message_size_bytes
namespace = f"db.{c_repeated}"
models = [InsertOne(namespace=namespace, document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -522,9 +524,16 @@ class TestClientBulkWriteCRUD(IntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteTimeout(IntegrationTest):
def setUp(self):
self.max_write_batch_size = client_context.max_write_batch_size
self.max_bson_object_size = client_context.max_bson_size
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_failCommand_fail_point
def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500
internal_client = rs_or_single_client(timeoutMS=None)
self.addCleanup(internal_client.close)
@ -532,8 +541,6 @@ class TestClientBulkWriteTimeout(IntegrationTest):
self.addCleanup(collection.drop)
collection.drop()
max_bson_object_size = (client_context.hello)["maxBsonObjectSize"]
max_message_size_bytes = (client_context.hello)["maxMessageSizeBytes"]
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
@ -541,8 +548,8 @@ class TestClientBulkWriteTimeout(IntegrationTest):
}
with self.fail_point(fail_command):
models = []
num_models = int(max_message_size_bytes / max_bson_object_size + 1)
b_repeated = "b" * (max_bson_object_size - 500)
num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
for _ in range(num_models):
models.append(
InsertOne(

View File

@ -77,6 +77,7 @@ class TestCSOT(IntegrationTest):
@client_context.require_change_streams
def test_change_stream_can_resume_after_timeouts(self):
coll = self.db.test
coll.insert_one({})
with coll.watch() as stream:
with pymongo.timeout(0.1):
with self.assertRaises(PyMongoError) as ctx:

View File

@ -145,6 +145,8 @@ from pymongo.topology_description import TopologyDescription
from pymongo.typings import _Address
from pymongo.write_concern import WriteConcern
SKIP_CSOT_TESTS = os.getenv("SKIP_CSOT_TESTS")
JSON_OPTS = json_util.JSONOptions(tz_aware=False)
IS_INTERRUPTED = False
@ -1953,6 +1955,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.assertListEqual(sorted_expected_documents, actual_documents)
def run_scenario(self, spec, uri=None):
if "csot" in self.id().lower() and SKIP_CSOT_TESTS:
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
# Kill all sessions before and after each test to prevent an open
# transaction (from a test failure) from blocking collection/database
# operations during test set up and tear down.

View File

@ -95,6 +95,7 @@ replacements = {
"aclose": "close",
"async-transactions-ref": "transactions-ref",
"async-snapshot-reads-ref": "snapshot-reads-ref",
"default_async": "default",
}
docstring_replacements: dict[tuple[str, str], str] = {