PYTHON-4807 - Specify how to handle unacknowledged+(ordered|verbose|m… (#1979)

This commit is contained in:
Noah Stapp 2024-10-29 12:28:33 -04:00 committed by GitHub
parent dfb6a9a4f3
commit 2f1227c504
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 159 additions and 6 deletions

View File

@ -2354,6 +2354,13 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
if not write_concern:
write_concern = self.write_concern
if write_concern and not write_concern.acknowledged and verbose_results:
raise InvalidOperation(
"Cannot request unacknowledged write concern and verbose results"
)
elif write_concern and not write_concern.acknowledged and ordered:
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")
common.validate_list("models", models)
blk = _AsyncClientBulk(

View File

@ -2342,6 +2342,13 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
if not write_concern:
write_concern = self.write_concern
if write_concern and not write_concern.acknowledged and verbose_results:
raise InvalidOperation(
"Cannot request unacknowledged write concern and verbose results"
)
elif write_concern and not write_concern.acknowledged and ordered:
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")
common.validate_list("models", models)
blk = _ClientBulk(

View File

@ -401,12 +401,16 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
await client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
await client.bulk_write(
models=models_insert, ordered=False, write_concern=WriteConcern(w=0)
)
# Replace document.
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
await client.bulk_write(
models=models_replace, ordered=False, write_concern=WriteConcern(w=0)
)
async def _setup_namespace_test_models(self):
# See prose test specification below for details on these calculations.
@ -590,6 +594,44 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(result.update_results[1].did_upsert, True)
self.assertEqual(result.update_results[2].did_upsert, False)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
collection = client.db["coll"]
self.addAsyncCleanup(collection.drop)
await collection.drop()
await client.db.command({"create": "db.coll"})
b_repeated = "b" * (self.max_bson_object_size - 500)
models = [
InsertOne(namespace="db.coll", document={"a": b_repeated})
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
]
listener.reset()
res = await client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
self.assertEqual(False, res.acknowledged)
events = listener.started_events
self.assertEqual(2, len(events))
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size),
len(events[0].command["ops"]),
)
self.assertEqual(1, len(events[1].command["ops"]))
self.assertEqual(events[0].operation_id, events[1].operation_id)
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
self.assertEqual({"w": 0}, events[1].command["writeConcern"])
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
await collection.count_documents({}),
)
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(AsyncIntegrationTest):

View File

@ -91,7 +91,8 @@
}
}
}
]
],
"ordered": false
},
"expectResult": {
"insertedCount": {
@ -158,7 +159,7 @@
"command": {
"bulkWrite": 1,
"errorsOnly": true,
"ordered": true,
"ordered": false,
"ops": [
{
"insert": 0,

View File

@ -450,6 +450,64 @@
}
}
]
},
{
"description": "Requesting unacknowledged write with verboseResults is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"verboseResults": true,
"ordered": false,
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and verbose results"
}
}
]
},
{
"description": "Requesting unacknowledged write with ordered is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and ordered writes"
}
}
]
}
]
}

View File

@ -401,12 +401,12 @@ class TestClientBulkWriteCRUD(IntegrationTest):
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
client.bulk_write(models=models_insert, ordered=False, write_concern=WriteConcern(w=0))
# Replace document.
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
client.bulk_write(models=models_replace, ordered=False, write_concern=WriteConcern(w=0))
def _setup_namespace_test_models(self):
# See prose test specification below for details on these calculations.
@ -590,6 +590,44 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(result.update_results[1].did_upsert, True)
self.assertEqual(result.update_results[2].did_upsert, False)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
collection = client.db["coll"]
self.addCleanup(collection.drop)
collection.drop()
client.db.command({"create": "db.coll"})
b_repeated = "b" * (self.max_bson_object_size - 500)
models = [
InsertOne(namespace="db.coll", document={"a": b_repeated})
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
]
listener.reset()
res = client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
self.assertEqual(False, res.acknowledged)
events = listener.started_events
self.assertEqual(2, len(events))
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size),
len(events[0].command["ops"]),
)
self.assertEqual(1, len(events[1].command["ops"]))
self.assertEqual(events[0].operation_id, events[1].operation_id)
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
self.assertEqual({"w": 0}, events[1].command["writeConcern"])
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
collection.count_documents({}),
)
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(IntegrationTest):