From 69dee51b90c631585fa0b0d42628e081666d83d4 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 5 Aug 2021 17:58:15 -0700 Subject: [PATCH] PYTHON-1318 Remove initialize_unordered_bulk_op and initialize_ordered_bulk_op (#692) PYTHON-2436 Unskip test_large_inserts_ordered on MongoDB 5.0. --- doc/api/pymongo/collection.rst | 2 - doc/changelog.rst | 4 + doc/migrate-to-pymongo4.rst | 45 ++ pymongo/bulk.py | 180 ------ pymongo/collection.py | 65 +- test/test_bulk.py | 580 ++++++++++++++++- test/test_collation.py | 48 -- test/test_collection.py | 14 + test/test_legacy_api.py | 1079 +------------------------------- test/test_session.py | 10 - 10 files changed, 632 insertions(+), 1395 deletions(-) diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index 240318c0e..8430c5a06 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -67,6 +67,4 @@ .. automethod:: options .. automethod:: map_reduce .. automethod:: inline_map_reduce - .. automethod:: initialize_unordered_bulk_op - .. automethod:: initialize_ordered_bulk_op .. automethod:: count diff --git a/doc/changelog.rst b/doc/changelog.rst index 1c482cbc3..c1b6bd9f3 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -45,6 +45,10 @@ Breaking Changes in 4.0 - Removed :meth:`pymongo.collection.Collection.update`. - Removed :meth:`pymongo.collection.Collection.remove`. - Removed :meth:`pymongo.collection.Collection.find_and_modify`. +- Removed :meth:`pymongo.collection.Collection.initialize_ordered_bulk_op`, + :meth:`pymongo.collection.Collection.initialize_unordered_bulk_op`, and + :class:`pymongo.bulk.BulkOperationBuilder`. Use + :meth:`pymongo.collection.Collection.bulk_write` instead. - Removed :meth:`pymongo.collection.Collection.group`. - Removed the ``useCursor`` option for :meth:`~pymongo.collection.Collection.aggregate`. diff --git a/doc/migrate-to-pymongo4.rst b/doc/migrate-to-pymongo4.rst index 0cefc21fc..ba3116899 100644 --- a/doc/migrate-to-pymongo4.rst +++ b/doc/migrate-to-pymongo4.rst @@ -315,6 +315,51 @@ Can be changed to this:: replaced_doc = collection.find_one_and_replace({'b': 1}, {'c': 1}) deleted_doc = collection.find_one_and_delete({'c': 1}) +Collection.initialize_ordered_bulk_op and initialize_unordered_bulk_op is removed +................................................................................. + +Removed :meth:`pymongo.collection.Collection.initialize_ordered_bulk_op` +and :class:`pymongo.bulk.BulkOperationBuilder`. Use +:meth:`pymongo.collection.Collection.bulk_write` instead. Code like this:: + + batch = coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 1}).update_one({'$set': {'b': 1}}) + batch.find({'a': 2}).upsert().replace_one({'b': 2}) + batch.find({'a': 3}).remove() + result = batch.execute() + +Can be changed to this:: + + coll.bulk_write([ + InsertOne({'a': 1}), + UpdateOne({'a': 1}, {'$set': {'b': 1}}), + ReplaceOne({'a': 2}, {'b': 2}, upsert=True), + DeleteOne({'a': 3}), + ]) + +Collection.initialize_unordered_bulk_op is removed +.................................................. + +Removed :meth:`pymongo.collection.Collection.initialize_unordered_bulk_op`. +Use :meth:`pymongo.collection.Collection.bulk_write` instead. Code like this:: + + batch = coll.initialize_unordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 1}).update_one({'$set': {'b': 1}}) + batch.find({'a': 2}).upsert().replace_one({'b': 2}) + batch.find({'a': 3}).remove() + result = batch.execute() + +Can be changed to this:: + + coll.bulk_write([ + InsertOne({'a': 1}), + UpdateOne({'a': 1}, {'$set': {'b': 1}}), + ReplaceOne({'a': 2}, {'b': 2}, upsert=True), + DeleteOne({'a': 3}), + ], ordered=False) + Collection.group is removed ........................... diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 893d8a83e..ff3f5974d 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -521,183 +521,3 @@ class _Bulk(object): self.execute_no_results(sock_info, generator) else: return self.execute_command(generator, write_concern, session) - - -class BulkUpsertOperation(object): - """An interface for adding upsert operations. - """ - - __slots__ = ('__selector', '__bulk', '__collation') - - def __init__(self, selector, bulk, collation): - self.__selector = selector - self.__bulk = bulk - self.__collation = collation - - def update_one(self, update): - """Update one document matching the selector. - - :Parameters: - - `update` (dict): the update operations to apply - """ - self.__bulk.add_update(self.__selector, - update, multi=False, upsert=True, - collation=self.__collation) - - def update(self, update): - """Update all documents matching the selector. - - :Parameters: - - `update` (dict): the update operations to apply - """ - self.__bulk.add_update(self.__selector, - update, multi=True, upsert=True, - collation=self.__collation) - - def replace_one(self, replacement): - """Replace one entire document matching the selector criteria. - - :Parameters: - - `replacement` (dict): the replacement document - """ - self.__bulk.add_replace(self.__selector, replacement, upsert=True, - collation=self.__collation) - - -class BulkWriteOperation(object): - """An interface for adding update or remove operations. - """ - - __slots__ = ('__selector', '__bulk', '__collation') - - def __init__(self, selector, bulk, collation): - self.__selector = selector - self.__bulk = bulk - self.__collation = collation - - def update_one(self, update): - """Update one document matching the selector criteria. - - :Parameters: - - `update` (dict): the update operations to apply - """ - self.__bulk.add_update(self.__selector, update, multi=False, - collation=self.__collation) - - def update(self, update): - """Update all documents matching the selector criteria. - - :Parameters: - - `update` (dict): the update operations to apply - """ - self.__bulk.add_update(self.__selector, update, multi=True, - collation=self.__collation) - - def replace_one(self, replacement): - """Replace one entire document matching the selector criteria. - - :Parameters: - - `replacement` (dict): the replacement document - """ - self.__bulk.add_replace(self.__selector, replacement, - collation=self.__collation) - - def remove_one(self): - """Remove a single document matching the selector criteria. - """ - self.__bulk.add_delete(self.__selector, _DELETE_ONE, - collation=self.__collation) - - def remove(self): - """Remove all documents matching the selector criteria. - """ - self.__bulk.add_delete(self.__selector, _DELETE_ALL, - collation=self.__collation) - - def upsert(self): - """Specify that all chained update operations should be - upserts. - - :Returns: - - A :class:`BulkUpsertOperation` instance, used to add - update operations to this bulk operation. - """ - return BulkUpsertOperation(self.__selector, self.__bulk, - self.__collation) - - -class BulkOperationBuilder(object): - """**DEPRECATED**: An interface for executing a batch of write operations. - """ - - __slots__ = '__bulk' - - def __init__(self, collection, ordered=True, - bypass_document_validation=False): - """**DEPRECATED**: Initialize a new BulkOperationBuilder instance. - - :Parameters: - - `collection`: A :class:`~pymongo.collection.Collection` instance. - - `ordered` (optional): If ``True`` all operations will be executed - serially, in the order provided, and the entire execution will - abort on the first error. If ``False`` operations will be executed - in arbitrary order (possibly in parallel on the server), reporting - any errors that occurred after attempting all operations. Defaults - to ``True``. - - `bypass_document_validation`: (optional) If ``True``, allows the - write to opt-out of document level validation. Default is - ``False``. - - .. note:: `bypass_document_validation` requires server version - **>= 3.2** - - .. versionchanged:: 3.5 - Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` - instead. - - .. versionchanged:: 3.2 - Added bypass_document_validation support - """ - self.__bulk = _Bulk(collection, ordered, bypass_document_validation) - - def find(self, selector, collation=None): - """Specify selection criteria for bulk operations. - - :Parameters: - - `selector` (dict): the selection criteria for update - and remove operations. - - `collation` (optional): An instance of - :class:`~pymongo.collation.Collation`. This option is only - supported on MongoDB 3.4 and above. - - :Returns: - - A :class:`BulkWriteOperation` instance, used to add - update and remove operations to this bulk operation. - - .. versionchanged:: 3.4 - Added the `collation` option. - - """ - validate_is_mapping("selector", selector) - return BulkWriteOperation(selector, self.__bulk, collation) - - def insert(self, document): - """Insert a single document. - - :Parameters: - - `document` (dict): the document to insert - - .. seealso:: :ref:`writes-and-ids` - """ - self.__bulk.add_insert(document) - - def execute(self, write_concern=None): - """Execute all provided operations. - - :Parameters: - - write_concern (optional): the write concern for this bulk - execution. - """ - if write_concern is not None: - write_concern = WriteConcern(**write_concern) - return self.__bulk.execute(write_concern, session=None) diff --git a/pymongo/collection.py b/pymongo/collection.py index 2a0ffd6ce..848b39280 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -29,7 +29,7 @@ from pymongo import (common, message) from pymongo.aggregation import (_CollectionAggregationCommand, _CollectionRawAggregationCommand) -from pymongo.bulk import BulkOperationBuilder, _Bulk +from pymongo.bulk import _Bulk from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor from pymongo.collation import validate_collation_or_none from pymongo.change_stream import CollectionChangeStream @@ -358,69 +358,6 @@ class Collection(common.BaseObject): write_concern or self.write_concern, read_concern or self.read_concern) - def initialize_unordered_bulk_op(self, bypass_document_validation=False): - """**DEPRECATED** - Initialize an unordered batch of write operations. - - Operations will be performed on the server in arbitrary order, - possibly in parallel. All operations will be attempted. - - :Parameters: - - `bypass_document_validation`: (optional) If ``True``, allows the - write to opt-out of document level validation. Default is - ``False``. - - Returns a :class:`~pymongo.bulk.BulkOperationBuilder` instance. - - See :ref:`unordered_bulk` for examples. - - .. note:: `bypass_document_validation` requires server version - **>= 3.2** - - .. versionchanged:: 3.5 - Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` - instead. - - .. versionchanged:: 3.2 - Added bypass_document_validation support - - .. versionadded:: 2.7 - """ - warnings.warn("initialize_unordered_bulk_op is deprecated", - DeprecationWarning, stacklevel=2) - return BulkOperationBuilder(self, False, bypass_document_validation) - - def initialize_ordered_bulk_op(self, bypass_document_validation=False): - """**DEPRECATED** - Initialize an ordered batch of write operations. - - Operations will be performed on the server serially, in the - order provided. If an error occurs all remaining operations - are aborted. - - :Parameters: - - `bypass_document_validation`: (optional) If ``True``, allows the - write to opt-out of document level validation. Default is - ``False``. - - Returns a :class:`~pymongo.bulk.BulkOperationBuilder` instance. - - See :ref:`ordered_bulk` for examples. - - .. note:: `bypass_document_validation` requires server version - **>= 3.2** - - .. versionchanged:: 3.5 - Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` - instead. - - .. versionchanged:: 3.2 - Added bypass_document_validation support - - .. versionadded:: 2.7 - """ - warnings.warn("initialize_ordered_bulk_op is deprecated", - DeprecationWarning, stacklevel=2) - return BulkOperationBuilder(self, True, bypass_document_validation) - def bulk_write(self, requests, ordered=True, bypass_document_validation=False, session=None): """Send a batch of write operations to the server. diff --git a/test/test_bulk.py b/test/test_bulk.py index 7d8176e57..9834da55c 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -19,16 +19,20 @@ import sys sys.path[0:0] = [""] from bson.objectid import ObjectId -from pymongo.operations import * -from pymongo.errors import (ConfigurationError, +from pymongo.common import partition_node +from pymongo.errors import (BulkWriteError, + ConfigurationError, InvalidOperation, OperationFailure) +from pymongo.operations import * from pymongo.write_concern import WriteConcern from test import (client_context, unittest, IntegrationTest) from test.utils import (remove_all_users, - rs_or_single_client_noauth) + rs_or_single_client_noauth, + single_client, + wait_until) class BulkTestBase(IntegrationTest): @@ -37,8 +41,7 @@ class BulkTestBase(IntegrationTest): def setUpClass(cls): super(BulkTestBase, cls).setUpClass() cls.coll = cls.db.test - ismaster = client_context.client.admin.command('ismaster') - cls.has_write_commands = (ismaster.get("maxWireVersion", 0) > 1) + cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0)) def setUp(self): super(BulkTestBase, self).setUp() @@ -48,11 +51,7 @@ class BulkTestBase(IntegrationTest): """Compare response from bulk.execute() to expected response.""" for key, value in expected.items(): if key == 'nModified': - if self.has_write_commands: - self.assertEqual(value, actual['nModified']) - else: - # Legacy servers don't include nModified in the response. - self.assertFalse('nModified' in actual) + self.assertEqual(value, actual['nModified']) elif key == 'upserted': expected_upserts = value actual_upserts = actual['upserted'] @@ -182,7 +181,7 @@ class TestBulk(BulkTestBase): self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={}) def test_array_filters_unacknowledged(self): - coll = self.coll.with_options(write_concern=WriteConcern(w=0)) + coll = self.coll_w0 update_one = UpdateOne( {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]) update_many = UpdateMany( @@ -338,9 +337,7 @@ class TestBulk(BulkTestBase): self.assertEqual(5, len(result.inserted_ids)) def test_bulk_write_no_results(self): - - coll = self.coll.with_options(write_concern=WriteConcern(w=0)) - result = coll.bulk_write([InsertOne({})]) + result = self.coll_w0.bulk_write([InsertOne({})]) self.assertFalse(result.acknowledged) self.assertRaises(InvalidOperation, lambda: result.inserted_count) self.assertRaises(InvalidOperation, lambda: result.matched_count) @@ -359,6 +356,294 @@ class TestBulk(BulkTestBase): with self.assertRaises(TypeError): self.coll.bulk_write([{}]) + def test_upsert_large(self): + big = 'a' * (client_context.client.max_bson_size - 37) + result = self.coll.bulk_write([ + UpdateOne({'x': 1}, {'$set': {'s': big}}, upsert=True)]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 1, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': '...'}]}, + result.bulk_api_result) + + self.assertEqual(1, self.coll.count_documents({'x': 1})) + + def test_client_generated_upsert_id(self): + result = self.coll.bulk_write([ + UpdateOne({'_id': 0}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': 1}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': 2}, {'_id': 2}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': 0}, + {'index': 1, '_id': 1}, + {'index': 2, '_id': 2}]}, + result.bulk_api_result) + + def test_single_ordered_batch(self): + result = self.coll.bulk_write([ + InsertOne({'a': 1}), + UpdateOne({'a': 1}, {'$set': {'b': 1}}), + UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True), + InsertOne({'a': 3}), + DeleteOne({'a': 3}), + ]) + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 1, + 'nInserted': 2, + 'nRemoved': 1, + 'upserted': [{'index': 2, '_id': '...'}]}, + result.bulk_api_result) + + def test_single_error_ordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + requests = [ + InsertOne({'b': 1, 'a': 1}), + UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True), + InsertOne({'b': 3, 'a': 2}), + ] + try: + self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_multiple_error_ordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + requests = [ + InsertOne({'b': 1, 'a': 1}), + UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True), + UpdateOne({'b': 3}, {'$set': {'a': 2}}, upsert=True), + UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True), + InsertOne({'b': 4, 'a': 3}), + InsertOne({'b': 5, 'a': 1}), + ] + + try: + self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_single_unordered_batch(self): + requests = [ + InsertOne({'a': 1}), + UpdateOne({'a': 1}, {'$set': {'b': 1}}), + UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True), + InsertOne({'a': 3}), + DeleteOne({'a': 3}), + ] + result = self.coll.bulk_write(requests, ordered=False) + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 1, + 'nInserted': 2, + 'nRemoved': 1, + 'upserted': [{'index': 2, '_id': '...'}], + 'writeErrors': [], + 'writeConcernErrors': []}, + result.bulk_api_result) + + def test_single_error_unordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + requests = [ + InsertOne({'b': 1, 'a': 1}), + UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True), + InsertOne({'b': 3, 'a': 2}), + ] + + try: + self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_multiple_error_unordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + requests = [ + InsertOne({'b': 1, 'a': 1}), + UpdateOne({'b': 2}, {'$set': {'a': 3}}, upsert=True), + UpdateOne({'b': 3}, {'$set': {'a': 4}}, upsert=True), + UpdateOne({'b': 4}, {'$set': {'a': 3}}, upsert=True), + InsertOne({'b': 5, 'a': 2}), + InsertOne({'b': 6, 'a': 1}), + ] + + try: + self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + # Assume the update at index 1 runs before the update at index 3, + # although the spec does not require it. Same for inserts. + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 2, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [ + {'index': 1, '_id': '...'}, + {'index': 2, '_id': '...'}], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 3, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 4}, + 'u': {'$set': {'a': 3}}, + 'multi': False, + 'upsert': True}}, + {'index': 5, + 'code': 11000, + 'errmsg': '...', + 'op': {'_id': '...', 'b': 6, 'a': 1}}]}, + result) + + def test_large_inserts_ordered(self): + big = 'x' * self.coll.database.client.max_bson_size + requests = [ + InsertOne({'b': 1, 'a': 1}), + InsertOne({'big': big}), + InsertOne({'b': 2, 'a': 2}), + ] + + try: + self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(1, result['nInserted']) + + self.coll.delete_many({}) + + big = 'x' * (1024 * 1024 * 4) + result = self.coll.bulk_write([ + InsertOne({'a': 1, 'big': big}), + InsertOne({'a': 2, 'big': big}), + InsertOne({'a': 3, 'big': big}), + InsertOne({'a': 4, 'big': big}), + InsertOne({'a': 5, 'big': big}), + InsertOne({'a': 6, 'big': big}), + ]) + + self.assertEqual(6, result.inserted_count) + self.assertEqual(6, self.coll.count_documents({})) + + def test_large_inserts_unordered(self): + big = 'x' * self.coll.database.client.max_bson_size + requests = [ + InsertOne({'b': 1, 'a': 1}), + InsertOne({'big': big}), + InsertOne({'b': 2, 'a': 2}), + ] + + try: + self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + + self.coll.delete_many({}) + + big = 'x' * (1024 * 1024 * 4) + result = self.coll.bulk_write([ + InsertOne({'a': 1, 'big': big}), + InsertOne({'a': 2, 'big': big}), + InsertOne({'a': 3, 'big': big}), + InsertOne({'a': 4, 'big': big}), + InsertOne({'a': 5, 'big': big}), + InsertOne({'a': 6, 'big': big}), + ], ordered=False) + + self.assertEqual(6, result.inserted_count) + self.assertEqual(6, self.coll.count_documents({})) + class BulkAuthorizationTestBase(BulkTestBase): @@ -387,6 +672,73 @@ class BulkAuthorizationTestBase(BulkTestBase): remove_all_users(self.db) +class TestBulkUnacknowledged(BulkTestBase): + + def tearDown(self): + self.coll.delete_many({}) + + def test_no_results_ordered_success(self): + requests = [ + InsertOne({'a': 1}), + UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True), + InsertOne({'a': 2}), + DeleteOne({'a': 1}), + ] + result = self.coll_w0.bulk_write(requests) + self.assertFalse(result.acknowledged) + wait_until(lambda: 2 == self.coll.count_documents({}), + 'insert 2 documents') + wait_until(lambda: self.coll.find_one({'_id': 1}) is None, + 'removed {"_id": 1}') + + def test_no_results_ordered_failure(self): + requests = [ + InsertOne({'_id': 1}), + UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True), + InsertOne({'_id': 2}), + # Fails with duplicate key error. + InsertOne({'_id': 1}), + # Should not be executed since the batch is ordered. + DeleteOne({'_id': 1}), + ] + result = self.coll_w0.bulk_write(requests) + self.assertFalse(result.acknowledged) + wait_until(lambda: 3 == self.coll.count_documents({}), + 'insert 3 documents') + self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1})) + + def test_no_results_unordered_success(self): + requests = [ + InsertOne({'a': 1}), + UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True), + InsertOne({'a': 2}), + DeleteOne({'a': 1}), + ] + result = self.coll_w0.bulk_write(requests, ordered=False) + self.assertFalse(result.acknowledged) + wait_until(lambda: 2 == self.coll.count_documents({}), + 'insert 2 documents') + wait_until(lambda: self.coll.find_one({'_id': 1}) is None, + 'removed {"_id": 1}') + + def test_no_results_unordered_failure(self): + requests = [ + InsertOne({'_id': 1}), + UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True), + InsertOne({'_id': 2}), + # Fails with duplicate key error. + InsertOne({'_id': 1}), + # Should be executed since the batch is unordered. + DeleteOne({'_id': 1}), + ] + result = self.coll_w0.bulk_write(requests, ordered=False) + self.assertFalse(result.acknowledged) + wait_until(lambda: 2 == self.coll.count_documents({}), + 'insert 2 documents') + wait_until(lambda: self.coll.find_one({'_id': 1}) is None, + 'removed {"_id": 1}') + + class TestBulkAuthorization(BulkAuthorizationTestBase): def test_readonly(self): @@ -415,5 +767,205 @@ class TestBulkAuthorization(BulkAuthorizationTestBase): self.assertRaises(OperationFailure, coll.bulk_write, requests) self.assertEqual(set([1, 2]), set(self.coll.distinct('x'))) + +class TestBulkWriteConcern(BulkTestBase): + + @classmethod + def setUpClass(cls): + super(TestBulkWriteConcern, cls).setUpClass() + cls.w = client_context.w + cls.secondary = None + if cls.w > 1: + for member in client_context.ismaster['hosts']: + if member != client_context.ismaster['primary']: + cls.secondary = single_client(*partition_node(member)) + break + + # We tested wtimeout errors by specifying a write concern greater than + # the number of members, but in MongoDB 2.7.8+ this causes a different + # sort of error, "Not enough data-bearing nodes". In recent servers we + # use a failpoint to pause replication on a secondary. + cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) + + @classmethod + def tearDownClass(cls): + if cls.secondary: + cls.secondary.close() + + def cause_wtimeout(self, requests, ordered): + if self.need_replication_stopped: + if not client_context.test_commands_enabled: + self.skipTest("Test commands must be enabled.") + + self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='alwaysOn') + + try: + coll = self.coll.with_options( + write_concern=WriteConcern(w=self.w, wtimeout=1)) + return coll.bulk_write(requests, ordered=ordered) + finally: + self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='off') + else: + coll = self.coll.with_options( + write_concern=WriteConcern(w=self.w + 1, wtimeout=1)) + return coll.bulk_write(requests, ordered=ordered) + + @client_context.require_replica_set + @client_context.require_secondaries_count(1) + def test_write_concern_failure_ordered(self): + # Ensure we don't raise on wnote. + coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w)) + result = coll_ww.bulk_write([ + DeleteOne({"something": "that does no exist"})]) + self.assertTrue(result.acknowledged) + + requests = [ + InsertOne({'a': 1}), + InsertOne({'a': 2}) + ] + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + self.cause_wtimeout(requests, ordered=True) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': []}, + result) + + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 0) + + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], str)) + + self.coll.delete_many({}) + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + + # Fail due to write concern support as well + # as duplicate key error on ordered batch. + requests = [ + InsertOne({'a': 1}), + ReplaceOne({'a': 3}, {'b': 1}, upsert=True), + InsertOne({'a': 1}), + InsertOne({'a': 2}), + ] + try: + self.cause_wtimeout(requests, ordered=True) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 1, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [{'index': 1, '_id': '...'}], + 'writeErrors': [ + {'index': 2, + 'code': 11000, + 'errmsg': '...', + 'op': {'_id': '...', 'a': 1}}]}, + result) + + self.assertTrue(len(result['writeConcernErrors']) > 1) + failed = result['writeErrors'][0] + self.assertTrue("duplicate" in failed['errmsg']) + + @client_context.require_replica_set + @client_context.require_secondaries_count(1) + def test_write_concern_failure_unordered(self): + # Ensure we don't raise on wnote. + coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w)) + result = coll_ww.bulk_write([ + DeleteOne({"something": "that does no exist"})], ordered=False) + self.assertTrue(result.acknowledged) + + requests = [ + InsertOne({'a': 1}), + UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True), + InsertOne({'a': 2}), + ] + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + self.cause_wtimeout(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.assertEqual(1, result['nUpserted']) + self.assertEqual(0, len(result['writeErrors'])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 1) + + self.coll.delete_many({}) + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + + # Fail due to write concern support as well + # as duplicate key error on unordered batch. + requests = [ + InsertOne({'a': 1}), + UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True), + InsertOne({'a': 1}), + InsertOne({'a': 2}), + ] + try: + self.cause_wtimeout(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.assertEqual(1, result['nUpserted']) + self.assertEqual(1, len(result['writeErrors'])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 1) + + failed = result['writeErrors'][0] + self.assertEqual(2, failed['index']) + self.assertEqual(11000, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], str)) + self.assertEqual(1, failed['op']['a']) + + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], str)) + + upserts = result['upserted'] + self.assertEqual(1, len(upserts)) + self.assertEqual(1, upserts[0]['index']) + self.assertTrue(upserts[0].get('_id')) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_collation.py b/test/test_collation.py index 3aca63d47..2052fc8db 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -282,49 +282,6 @@ class TestCollation(IntegrationTest): check_ops(delete_cmd['deletes']) check_ops(update_cmd['updates']) - @raisesConfigurationErrorForOldMongoDB - def test_bulk(self): - bulk = self.db.test.initialize_ordered_bulk_op() - bulk.find({'noCollation': 42}).remove_one() - bulk.find({'noCollation': 42}).remove() - bulk.find({'foo': 42}, collation=self.collation).remove_one() - bulk.find({'foo': 42}, collation=self.collation).remove() - bulk.find({'noCollation': 24}).replace_one({'bar': 42}) - bulk.find({'noCollation': 84}).upsert().update_one( - {'$set': {'foo': 10}}) - bulk.find({'noCollation': 45}).update({'$set': {'bar': 42}}) - bulk.find({'foo': 24}, collation=self.collation).replace_one( - {'foo': 42}) - bulk.find({'foo': 84}, collation=self.collation).upsert().update_one( - {'$set': {'foo': 10}}) - bulk.find({'foo': 45}, collation=self.collation).update({ - '$set': {'foo': 42}}) - bulk.execute() - - delete_cmd = self.listener.results['started'][0].command - update_cmd = self.listener.results['started'][1].command - - def check_ops(ops): - for op in ops: - if 'noCollation' in op['q']: - self.assertNotIn('collation', op) - else: - self.assertEqual(self.collation.document, - op['collation']) - - check_ops(delete_cmd['deletes']) - check_ops(update_cmd['updates']) - - @client_context.require_version_max(3, 3, 8) - def test_mixed_bulk_collation(self): - bulk = self.db.test.initialize_unordered_bulk_op() - bulk.find({'foo': 42}).upsert().update_one( - {'$set': {'bar': 10}}) - bulk.find({'foo': 43}, collation=self.collation).remove_one() - with self.assertRaises(ConfigurationError): - bulk.execute() - self.assertIsNone(self.db.test.find_one({'foo': 42})) - @raisesConfigurationErrorForOldMongoDB def test_indexes_same_keys_different_collations(self): self.db.test.drop() @@ -356,11 +313,6 @@ class TestCollation(IntegrationTest): collection.update_one( {'hello': 'world'}, {'$set': {'hello': 'moon'}}, collation=self.collation) - bulk = collection.initialize_ordered_bulk_op() - bulk.find({'hello': 'world'}, collation=self.collation).update_one( - {'$set': {'hello': 'moon'}}) - with self.assertRaises(ConfigurationError): - bulk.execute() update_one = UpdateOne({'hello': 'world'}, {'$set': {'hello': 'moon'}}, collation=self.collation) with self.assertRaises(ConfigurationError): diff --git a/test/test_collection.py b/test/test_collection.py index 8a8f4aa68..b8fdbf7ee 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -789,6 +789,20 @@ class TestCollection(IntegrationTest): self.assertFalse(result.acknowledged) self.assertEqual(20, db.test.count_documents({})) + def test_insert_many_generator(self): + coll = self.db.test + coll.delete_many({}) + + def gen(): + yield {'a': 1, 'b': 1} + yield {'a': 1, 'b': 2} + yield {'a': 2, 'b': 3} + yield {'a': 3, 'b': 5} + yield {'a': 5, 'b': 8} + + result = coll.insert_many(gen()) + self.assertEqual(5, len(result.inserted_ids)) + def test_insert_many_invalid(self): db = self.db diff --git a/test/test_legacy_api.py b/test/test_legacy_api.py index 246fe333f..2149f1f81 100644 --- a/test/test_legacy_api.py +++ b/test/test_legacy_api.py @@ -18,23 +18,11 @@ import sys sys.path[0:0] = [""] -from bson.son import SON from pymongo import ASCENDING, GEOHAYSTACK -from pymongo.common import partition_node -from pymongo.errors import (BulkWriteError, - ConfigurationError, - InvalidDocument, - InvalidOperation, - OperationFailure) from pymongo.operations import IndexModel -from test import client_context, unittest, SkipTest +from test import unittest from test.test_client import IntegrationTest -from test.test_bulk import BulkTestBase, BulkAuthorizationTestBase -from test.utils import (DeprecationFilter, - oid_generated_on_process, - rs_or_single_client_noauth, - single_client, - wait_until) +from test.utils import DeprecationFilter class TestDeprecations(IntegrationTest): @@ -58,1068 +46,5 @@ class TestDeprecations(IntegrationTest): DeprecationWarning, self.db.test.create_indexes, indexes) -class TestLegacy(IntegrationTest): - - @classmethod - def setUpClass(cls): - super(TestLegacy, cls).setUpClass() - cls.w = client_context.w - cls.deprecation_filter = DeprecationFilter() - - @classmethod - def tearDownClass(cls): - cls.deprecation_filter.stop() - - -class TestLegacyBulk(BulkTestBase): - - @classmethod - def setUpClass(cls): - super(TestLegacyBulk, cls).setUpClass() - cls.deprecation_filter = DeprecationFilter() - - @classmethod - def tearDownClass(cls): - cls.deprecation_filter.stop() - - def test_empty(self): - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(InvalidOperation, bulk.execute) - - def test_find(self): - # find() requires a selector. - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.find) - self.assertRaises(TypeError, bulk.find, 'foo') - # No error. - bulk.find({}) - - def test_insert(self): - expected = { - 'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.insert, 1) - - # find() before insert() is prohibited. - self.assertRaises(AttributeError, lambda: bulk.find({}).insert({})) - - # We don't allow multiple documents per call. - self.assertRaises(TypeError, bulk.insert, [{}, {}]) - self.assertRaises(TypeError, bulk.insert, ({} for _ in range(2))) - - bulk.insert({}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(1, self.coll.count()) - doc = self.coll.find_one() - self.assertTrue(oid_generated_on_process(doc['_id'])) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.insert({}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(2, self.coll.count()) - - def test_update(self): - - expected = { - 'nMatched': 2, - 'nModified': 2, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_ordered_bulk_op() - - # update() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.update({'$set': {'x': 1}})) - - self.assertRaises(TypeError, bulk.find({}).update, 1) - self.assertRaises(ValueError, bulk.find({}).update, {}) - - # All fields must be $-operators. - self.assertRaises(ValueError, bulk.find({}).update, {'foo': 'bar'}) - bulk.find({}).update({'$set': {'foo': 'bar'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 2) - - # All fields must be $-operators -- validated server-side. - bulk = self.coll.initialize_ordered_bulk_op() - updates = SON([('$set', {'x': 1}), ('y', 1)]) - bulk.find({}).update(updates) - self.assertRaises(BulkWriteError, bulk.execute) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).update({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 2, - 'nModified': 2, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 2) - - self.coll.insert_one({'x': 1}) - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).update({'$set': {'x': 42}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(1, self.coll.find({'x': 42}).count()) - - # Second time, x is already 42 so nModified is 0. - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 42}).update({'$set': {'x': 42}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - def test_update_one(self): - - expected = { - 'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_ordered_bulk_op() - - # update_one() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.update_one({'$set': {'x': 1}})) - - self.assertRaises(TypeError, bulk.find({}).update_one, 1) - self.assertRaises(ValueError, bulk.find({}).update_one, {}) - self.assertRaises(ValueError, bulk.find({}).update_one, {'foo': 'bar'}) - bulk.find({}).update_one({'$set': {'foo': 'bar'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).update_one({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - - # All fields must be $-operators -- validated server-side. - bulk = self.coll.initialize_ordered_bulk_op() - updates = SON([('$set', {'x': 1}), ('y', 1)]) - bulk.find({}).update_one(updates) - self.assertRaises(BulkWriteError, bulk.execute) - - def test_replace_one(self): - - expected = { - 'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.find({}).replace_one, 1) - self.assertRaises(ValueError, - bulk.find({}).replace_one, {'$set': {'foo': 'bar'}}) - bulk.find({}).replace_one({'foo': 'bar'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).replace_one({'bim': 'baz'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - - def test_remove(self): - # Test removing all documents, ordered. - expected = { - 'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_ordered_bulk_op() - - # remove() must be preceded by find(). - self.assertRaises(AttributeError, lambda: bulk.remove()) - bulk.find({}).remove() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 0) - - # Test removing some documents, ordered. - self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) - - bulk = self.coll.initialize_ordered_bulk_op() - - bulk.find({'x': 1}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.count(), 2) - self.coll.delete_many({}) - - # Test removing all documents, unordered. - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - # Test removing some documents, unordered. - self.assertEqual(self.coll.count(), 0) - - self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.count(), 2) - self.coll.delete_many({}) - - def test_remove_one(self): - - bulk = self.coll.initialize_ordered_bulk_op() - - # remove_one() must be preceded by find(). - self.assertRaises(AttributeError, lambda: bulk.remove_one()) - - # Test removing one document, empty selector. - # First ordered, then unordered. - self.coll.insert_many([{}, {}]) - expected = { - 'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 1, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': [] - } - - bulk.find({}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 1) - - self.coll.insert_one({}) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 1) - - # Test removing one document, with a selector. - # First ordered, then unordered. - self.coll.insert_one({'x': 1}) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({'x': 1}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) - self.coll.insert_one({'x': 1}) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) - - def test_upsert(self): - bulk = self.coll.initialize_ordered_bulk_op() - - # upsert() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.upsert()) - - expected = { - 'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [{'index': 0, '_id': '...'}] - } - - bulk.find({}).upsert().replace_one({'foo': 'bar'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({}).upsert().update_one({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({}).upsert().update({'$set': {'bim': 'bop'}}) - # Non-upsert, no matches. - bulk.find({'x': 1}).update({'$set': {'x': 2}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'bop'}).count(), 1) - self.assertEqual(self.coll.find({'x': 2}).count(), 0) - - def test_upsert_large(self): - big = 'a' * (client_context.client.max_bson_size - 37) - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({'x': 1}).upsert().update({'$set': {'s': big}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [{'index': 0, '_id': '...'}]}, - result) - - self.assertEqual(1, self.coll.find({'x': 1}).count()) - - def test_client_generated_upsert_id(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.find({'_id': 0}).upsert().update_one({'$set': {'a': 0}}) - batch.find({'a': 1}).upsert().replace_one({'_id': 1}) - # This is just here to make the counts right in all cases. - batch.find({'_id': 2}).upsert().replace_one({'_id': 2}) - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 3, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [{'index': 0, '_id': 0}, - {'index': 1, '_id': 1}, - {'index': 2, '_id': 2}]}, - result) - - def test_single_ordered_batch(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 1}).update_one({'$set': {'b': 1}}) - batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) - batch.insert({'a': 3}) - batch.find({'a': 3}).remove() - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 1, - 'nInserted': 2, - 'nRemoved': 1, - 'upserted': [{'index': 2, '_id': '...'}]}, - result) - - def test_single_error_ordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 3, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_multiple_error_ordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.find({'b': 3}).upsert().update_one({'$set': {'a': 2}}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 4, 'a': 3}) - batch.insert({'b': 5, 'a': 1}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_single_unordered_batch(self): - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 1}).update_one({'$set': {'b': 1}}) - batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) - batch.insert({'a': 3}) - batch.find({'a': 3}).remove() - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 1, - 'nInserted': 2, - 'nRemoved': 1, - 'upserted': [{'index': 2, '_id': '...'}], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - def test_single_error_unordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 3, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_multiple_error_unordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 3}}) - batch.find({'b': 3}).upsert().update_one({'$set': {'a': 4}}) - batch.find({'b': 4}).upsert().update_one({'$set': {'a': 3}}) - batch.insert({'b': 5, 'a': 2}) - batch.insert({'b': 6, 'a': 1}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - # Assume the update at index 1 runs before the update at index 3, - # although the spec does not require it. Same for inserts. - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 2, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [ - {'index': 1, '_id': '...'}, - {'index': 2, '_id': '...'}], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 3, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 4}, - 'u': {'$set': {'a': 3}}, - 'multi': False, - 'upsert': True}}, - {'index': 5, - 'code': 11000, - 'errmsg': '...', - 'op': {'_id': '...', 'b': 6, 'a': 1}}]}, - result) - - @client_context.require_version_max(4, 8) # PYTHON-2436 - def test_large_inserts_ordered(self): - big = 'x' * self.coll.database.client.max_bson_size - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.insert({'big': big}) - batch.insert({'b': 2, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(1, result['nInserted']) - - self.coll.delete_many({}) - - big = 'x' * (1024 * 1024 * 4) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1, 'big': big}) - batch.insert({'a': 2, 'big': big}) - batch.insert({'a': 3, 'big': big}) - batch.insert({'a': 4, 'big': big}) - batch.insert({'a': 5, 'big': big}) - batch.insert({'a': 6, 'big': big}) - result = batch.execute() - - self.assertEqual(6, result['nInserted']) - self.assertEqual(6, self.coll.count()) - - def test_large_inserts_unordered(self): - big = 'x' * self.coll.database.client.max_bson_size - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.insert({'big': big}) - batch.insert({'b': 2, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - - self.coll.delete_many({}) - - big = 'x' * (1024 * 1024 * 4) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1, 'big': big}) - batch.insert({'a': 2, 'big': big}) - batch.insert({'a': 3, 'big': big}) - batch.insert({'a': 4, 'big': big}) - batch.insert({'a': 5, 'big': big}) - batch.insert({'a': 6, 'big': big}) - result = batch.execute() - - self.assertEqual(6, result['nInserted']) - self.assertEqual(6, self.coll.count()) - - def test_numerous_inserts(self): - # Ensure we don't exceed server's 1000-document batch size limit. - n_docs = 2100 - batch = self.coll.initialize_unordered_bulk_op() - for _ in range(n_docs): - batch.insert({}) - - result = batch.execute() - self.assertEqual(n_docs, result['nInserted']) - self.assertEqual(n_docs, self.coll.count()) - - # Same with ordered bulk. - self.coll.delete_many({}) - batch = self.coll.initialize_ordered_bulk_op() - for _ in range(n_docs): - batch.insert({}) - - result = batch.execute() - self.assertEqual(n_docs, result['nInserted']) - self.assertEqual(n_docs, self.coll.count()) - - def test_multiple_execution(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({}) - batch.execute() - self.assertRaises(InvalidOperation, batch.execute) - - def test_generator_insert(self): - def gen(): - yield {'a': 1, 'b': 1} - yield {'a': 1, 'b': 2} - yield {'a': 2, 'b': 3} - yield {'a': 3, 'b': 5} - yield {'a': 5, 'b': 8} - - result = self.coll.insert_many(gen()) - self.assertEqual(5, len(result.inserted_ids)) - - -class TestLegacyBulkNoResults(BulkTestBase): - - @classmethod - def setUpClass(cls): - super(TestLegacyBulkNoResults, cls).setUpClass() - cls.deprecation_filter = DeprecationFilter() - - @classmethod - def tearDownClass(cls): - cls.deprecation_filter.stop() - - def tearDown(self): - self.coll.delete_many({}) - - def test_no_results_ordered_success(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - wait_until(lambda: self.coll.find_one({'_id': 1}) is None, - 'removed {"_id": 1}') - - def test_no_results_ordered_failure(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - # Fails with duplicate key error. - batch.insert({'_id': 1}) - # Should not be executed since the batch is ordered. - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 3 == self.coll.count(), - 'insert 3 documents') - self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1})) - - def test_no_results_unordered_success(self): - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - wait_until(lambda: self.coll.find_one({'_id': 1}) is None, - 'removed {"_id": 1}') - - def test_no_results_unordered_failure(self): - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - # Fails with duplicate key error. - batch.insert({'_id': 1}) - # Should be executed since the batch is unordered. - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - wait_until(lambda: self.coll.find_one({'_id': 1}) is None, - 'removed {"_id": 1}') - - -class TestLegacyBulkWriteConcern(BulkTestBase): - - @classmethod - def setUpClass(cls): - super(TestLegacyBulkWriteConcern, cls).setUpClass() - cls.w = client_context.w - cls.secondary = None - if cls.w > 1: - for member in client_context.ismaster['hosts']: - if member != client_context.ismaster['primary']: - cls.secondary = single_client(*partition_node(member)) - break - - # We tested wtimeout errors by specifying a write concern greater than - # the number of members, but in MongoDB 2.7.8+ this causes a different - # sort of error, "Not enough data-bearing nodes". In recent servers we - # use a failpoint to pause replication on a secondary. - cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) - cls.deprecation_filter = DeprecationFilter() - - @classmethod - def tearDownClass(cls): - cls.deprecation_filter.stop() - if cls.secondary: - cls.secondary.close() - - def cause_wtimeout(self, batch): - if self.need_replication_stopped: - if not client_context.test_commands_enabled: - raise SkipTest("Test commands must be enabled.") - - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='alwaysOn') - - try: - return batch.execute({'w': self.w, 'wtimeout': 1}) - finally: - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='off') - else: - return batch.execute({'w': self.w + 1, 'wtimeout': 1}) - - def test_fsync_and_j(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - self.assertRaises( - ConfigurationError, - batch.execute, {'fsync': True, 'j': True}) - - @client_context.require_replica_set - def test_write_concern_failure_ordered(self): - # Ensure we don't raise on wnote. - batch = self.coll.initialize_ordered_bulk_op() - batch.find({"something": "that does no exist"}).remove() - self.assertTrue(batch.execute({"w": self.w})) - - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.insert({'a': 2}) - - # Replication wtimeout is a 'soft' error. - # It shouldn't stop batch processing. - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': []}, - result) - - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 0) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], str)) - - self.coll.delete_many({}) - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - - # Fail due to write concern support as well - # as duplicate key error on ordered batch. - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().replace_one({'b': 1}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [{'index': 1, '_id': '...'}], - 'writeErrors': [ - {'index': 2, - 'code': 11000, - 'errmsg': '...', - 'op': {'_id': '...', 'a': 1}}]}, - result) - - self.assertTrue(len(result['writeConcernErrors']) > 1) - failed = result['writeErrors'][0] - self.assertTrue("duplicate" in failed['errmsg']) - - @client_context.require_replica_set - def test_write_concern_failure_unordered(self): - # Ensure we don't raise on wnote. - batch = self.coll.initialize_unordered_bulk_op() - batch.find({"something": "that does no exist"}).remove() - self.assertTrue(batch.execute({"w": self.w})) - - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) - batch.insert({'a': 2}) - - # Replication wtimeout is a 'soft' error. - # It shouldn't stop batch processing. - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - self.assertEqual(1, result['nUpserted']) - self.assertEqual(0, len(result['writeErrors'])) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 1) - - self.coll.delete_many({}) - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - - # Fail due to write concern support as well - # as duplicate key error on unordered batch. - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, - 'b': 1}}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - self.assertEqual(1, result['nUpserted']) - self.assertEqual(1, len(result['writeErrors'])) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 1) - - failed = result['writeErrors'][0] - self.assertEqual(2, failed['index']) - self.assertEqual(11000, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], str)) - self.assertEqual(1, failed['op']['a']) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], str)) - - upserts = result['upserted'] - self.assertEqual(1, len(upserts)) - self.assertEqual(1, upserts[0]['index']) - self.assertTrue(upserts[0].get('_id')) - - -class TestLegacyBulkAuthorization(BulkAuthorizationTestBase): - - @classmethod - def setUpClass(cls): - super(TestLegacyBulkAuthorization, cls).setUpClass() - cls.deprecation_filter = DeprecationFilter() - - @classmethod - def tearDownClass(cls): - cls.deprecation_filter.stop() - - def test_readonly(self): - # We test that an authorization failure aborts the batch and is raised - # as OperationFailure. - cli = rs_or_single_client_noauth( - username='readonly', password='pw', authSource='pymongo_test') - coll = cli.pymongo_test.test - bulk = coll.initialize_ordered_bulk_op() - bulk.insert({'x': 1}) - self.assertRaises(OperationFailure, bulk.execute) - - def test_no_remove(self): - # We test that an authorization failure aborts the batch and is raised - # as OperationFailure. - cli = rs_or_single_client_noauth( - username='noremove', password='pw', authSource='pymongo_test') - coll = cli.pymongo_test.test - bulk = coll.initialize_ordered_bulk_op() - bulk.insert({'x': 1}) - bulk.find({'x': 2}).upsert().replace_one({'x': 2}) - bulk.find({}).remove() # Prohibited. - bulk.insert({'x': 3}) # Never attempted. - self.assertRaises(OperationFailure, bulk.execute) - self.assertEqual(set([1, 2]), set(self.coll.distinct('x'))) - if __name__ == "__main__": unittest.main() diff --git a/test/test_session.py b/test/test_session.py index e92b5bbbb..5bcad4e75 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -1081,14 +1081,6 @@ class TestClusterTime(IntegrationTest): self.addCleanup(collection.drop) self.addCleanup(client.pymongo_test.collection2.drop) - def bulk_insert(ordered): - if ordered: - bulk = collection.initialize_ordered_bulk_op() - else: - bulk = collection.initialize_unordered_bulk_op() - bulk.insert({}) - bulk.execute() - def rename_and_drop(): # Ensure collection exists. collection.insert_one({}) @@ -1130,8 +1122,6 @@ class TestClusterTime(IntegrationTest): ('delete_one', lambda: collection.delete_one({})), ('delete_many', lambda: collection.delete_many({})), ('bulk_write', lambda: collection.bulk_write([InsertOne({})])), - ('ordered bulk', lambda: bulk_insert(True)), - ('unordered bulk', lambda: bulk_insert(False)), ('rename_and_drop', rename_and_drop), ]