diff --git a/doc/changelog.rst b/doc/changelog.rst index b45cc0dbb..820472f5d 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -47,6 +47,10 @@ Changes and Deprecations: and disabling it is not recommended, see `does TCP keepalive time affect MongoDB Deployments? `_ +- Deprecated :meth:`~pymongo.collection.Collection.initialize_ordered_bulk_op`, + :meth:`~pymongo.collection.Collection.initialize_unordered_bulk_op`, and + :class:`~pymongo.bulk.BulkOperationBuilder`. Use + :meth:`~pymongo.collection.Collection.bulk_write` instead. - Deprecated :const:`~bson.json_util.STRICT_JSON_OPTIONS`. Use :const:`~bson.json_util.RELAXED_JSON_OPTIONS` or :const:`~bson.json_util.CANONICAL_JSON_OPTIONS` instead. diff --git a/doc/examples/bulk.rst b/doc/examples/bulk.rst index 15a94128b..fb8fb1afb 100644 --- a/doc/examples/bulk.rst +++ b/doc/examples/bulk.rst @@ -47,26 +47,24 @@ Ordered Bulk Write Operations ............................. Ordered bulk write operations are batched and sent to the server in the -order provided for serial execution. The return value is a document -describing the type and count of operations performed. +order provided for serial execution. The return value is an instance of +:class:`~pymongo.results.BulkWriteResult` describing the type and count +of operations performed. .. doctest:: :options: +NORMALIZE_WHITESPACE >>> from pprint import pprint - >>> - >>> bulk = db.test.initialize_ordered_bulk_op() - >>> # Remove all documents from the previous example. - ... - >>> bulk.find({}).remove() - >>> bulk.insert({'_id': 1}) - >>> bulk.insert({'_id': 2}) - >>> bulk.insert({'_id': 3}) - >>> bulk.find({'_id': 1}).update({'$set': {'foo': 'bar'}}) - >>> bulk.find({'_id': 4}).upsert().update({'$inc': {'j': 1}}) - >>> bulk.find({'j': 1}).replace_one({'j': 2}) - >>> result = bulk.execute() - >>> pprint(result) + >>> from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne + >>> result = db.test.bulk_write([ + ... DeleteMany({}), # Remove all documents from the previous example. + ... InsertOne({'_id': 1}), + ... InsertOne({'_id': 2}), + ... InsertOne({'_id': 3}), + ... UpdateOne({'_id': 1}, {'$set': {'foo': 'bar'}}), + ... UpdateOne({'_id': 4}, {'$inc': {'j': 1}}, upsert=True), + ... ReplaceOne({'j': 1}, {'j': 2})]) + >>> pprint(result.bulk_api_result) {'nInserted': 3, 'nMatched': 2, 'nModified': 2, @@ -91,15 +89,14 @@ the failure. .. doctest:: :options: +NORMALIZE_WHITESPACE + >>> from pymongo import InsertOne, DeleteOne, ReplaceOne >>> from pymongo.errors import BulkWriteError - >>> bulk = db.test.initialize_ordered_bulk_op() - >>> bulk.find({'j': 2}).replace_one({'i': 5}) - >>> # Violates the unique key constraint on _id. - ... - >>> bulk.insert({'_id': 4}) - >>> bulk.find({'i': 5}).remove_one() + >>> requests = [ + ... ReplaceOne({'j': 2}, {'i': 5}), + ... InsertOne({'_id': 4}), # Violates the unique key constraint on _id. + ... DeleteOne({'i': 5})] >>> try: - ... bulk.execute() + ... db.test.bulk_write(requests) ... except BulkWriteError as bwe: ... pprint(bwe.details) ... @@ -131,13 +128,13 @@ and fourth operations succeed. .. doctest:: :options: +NORMALIZE_WHITESPACE - >>> bulk = db.test.initialize_unordered_bulk_op() - >>> bulk.insert({'_id': 1}) - >>> bulk.find({'_id': 2}).remove_one() - >>> bulk.insert({'_id': 3}) - >>> bulk.find({'_id': 4}).replace_one({'i': 1}) + >>> requests = [ + ... InsertOne({'_id': 1}), + ... DeleteOne({'_id': 2}), + ... InsertOne({'_id': 3}), + ... ReplaceOne({'_id': 4}, {'i': 1})] >>> try: - ... bulk.execute() + ... db.test.bulk_write(requests, ordered=False) ... except BulkWriteError as bwe: ... pprint(bwe.details) ... @@ -160,22 +157,17 @@ and fourth operations succeed. Write Concern ............. -By default bulk operations are executed with the +Bulk operations are executed with the :attr:`~pymongo.collection.Collection.write_concern` of the collection they -are executed against. A custom write concern can be passed to the -:meth:`~pymongo.bulk.BulkOperationBuilder.execute` method. Write concern -errors (e.g. wtimeout) will be reported after all operations are attempted, -regardless of execution order. +are executed against. Write concern errors (e.g. wtimeout) will be reported +after all operations are attempted, regardless of execution order. :: - - >>> bulk = db.test.initialize_ordered_bulk_op() - >>> bulk.insert({'a': 0}) - >>> bulk.insert({'a': 1}) - >>> bulk.insert({'a': 2}) - >>> bulk.insert({'a': 3}) + >>> from pymongo import WriteConcern + >>> coll = db.get_collection( + ... 'test', write_concern=WriteConcern(w=3, wtimeout=1)) >>> try: - ... bulk.execute({'w': 3, 'wtimeout': 1}) + ... coll.bulk_write([InsertOne({'a': i}) for i in range(4)]) ... except BulkWriteError as bwe: ... pprint(bwe.details) ... diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 2749fa1a6..abdf3b444 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -599,14 +599,14 @@ class BulkWriteOperation(object): class BulkOperationBuilder(object): - """An interface for executing a batch of write operations. + """**DEPRECATED**: An interface for executing a batch of write operations. """ __slots__ = '__bulk' def __init__(self, collection, ordered=True, bypass_document_validation=False): - """Initialize a new BulkOperationBuilder instance. + """**DEPRECATED**: Initialize a new BulkOperationBuilder instance. :Parameters: - `collection`: A :class:`~pymongo.collection.Collection` instance. @@ -623,6 +623,10 @@ class BulkOperationBuilder(object): .. note:: `bypass_document_validation` requires server version **>= 3.2** + .. versionchanged:: 3.5 + Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` + instead. + .. versionchanged:: 3.2 Added bypass_document_validation support """ diff --git a/pymongo/collection.py b/pymongo/collection.py index a22cb4f17..da4b09ca1 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -338,7 +338,7 @@ class Collection(common.BaseObject): read_concern or self.read_concern) def initialize_unordered_bulk_op(self, bypass_document_validation=False): - """Initialize an unordered batch of write operations. + """**DEPRECATED** - Initialize an unordered batch of write operations. Operations will be performed on the server in arbitrary order, possibly in parallel. All operations will be attempted. @@ -355,15 +355,21 @@ class Collection(common.BaseObject): .. note:: `bypass_document_validation` requires server version **>= 3.2** + .. versionchanged:: 3.5 + Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` + instead. + .. versionchanged:: 3.2 - Added bypass_document_validation support + Added bypass_document_validation support .. versionadded:: 2.7 """ + warnings.warn("initialize_unordered_bulk_op is deprecated", + DeprecationWarning, stacklevel=2) return BulkOperationBuilder(self, False, bypass_document_validation) def initialize_ordered_bulk_op(self, bypass_document_validation=False): - """Initialize an ordered batch of write operations. + """**DEPRECATED** - Initialize an ordered batch of write operations. Operations will be performed on the server serially, in the order provided. If an error occurs all remaining operations @@ -381,11 +387,17 @@ class Collection(common.BaseObject): .. note:: `bypass_document_validation` requires server version **>= 3.2** + .. versionchanged:: 3.5 + Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write` + instead. + .. versionchanged:: 3.2 - Added bypass_document_validation support + Added bypass_document_validation support .. versionadded:: 2.7 """ + warnings.warn("initialize_ordered_bulk_op is deprecated", + DeprecationWarning, stacklevel=2) return BulkOperationBuilder(self, True, bypass_document_validation) def bulk_write(self, requests, ordered=True, diff --git a/test/test_bulk.py b/test/test_bulk.py index 8b6638d3a..9545346d1 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -128,51 +128,7 @@ class BulkTestBase(IntegrationTest): class TestBulk(BulkTestBase): def test_empty(self): - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(InvalidOperation, bulk.execute) - - def test_find(self): - # find() requires a selector. - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.find) - self.assertRaises(TypeError, bulk.find, 'foo') - # No error. - bulk.find({}) - - @client_context.require_version_min(3, 1, 9, -1) - def test_bypass_document_validation_bulk_op(self): - - # Test insert - self.coll.insert_one({"z": 0}) - self.db.command(SON([("collMod", "test"), - ("validator", {"z": {"$gte": 0}})])) - bulk = self.coll.initialize_ordered_bulk_op( - bypass_document_validation=False) - bulk.insert({"z": -1}) # error - self.assertRaises(BulkWriteError, bulk.execute) - self.assertEqual(0, self.coll.count({"z": -1})) - - bulk = self.coll.initialize_ordered_bulk_op( - bypass_document_validation=True) - bulk.insert({"z": -1}) - bulk.execute() - self.assertEqual(1, self.coll.count({"z": -1})) - - self.coll.insert_one({"z": 0}) - self.db.command(SON([("collMod", "test"), - ("validator", {"z": {"$gte": 0}})])) - bulk = self.coll.initialize_unordered_bulk_op( - bypass_document_validation=False) - bulk.insert({"z": -1}) # error - self.assertRaises(BulkWriteError, bulk.execute) - self.assertEqual(1, self.coll.count({"z": -1})) - - bulk = self.coll.initialize_unordered_bulk_op( - bypass_document_validation=True) - bulk.insert({"z": -1}) - bulk.execute() - self.assertEqual(2, self.coll.count({"z": -1})) - self.coll.drop() + self.assertRaises(InvalidOperation, self.coll.bulk_write, []) def test_insert(self): expected = { @@ -186,44 +142,10 @@ class TestBulk(BulkTestBase): 'writeConcernErrors': [] } - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.insert, 1) - - # find() before insert() is prohibited. - self.assertRaises(AttributeError, lambda: bulk.find({}).insert({})) - - # We don't allow multiple documents per call. - self.assertRaises(TypeError, bulk.insert, [{}, {}]) - self.assertRaises(TypeError, bulk.insert, ({} for _ in range(2))) - - bulk.insert({}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(1, self.coll.count()) - doc = self.coll.find_one() - self.assertTrue(oid_generated_on_client(doc['_id'])) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.insert({}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(2, self.coll.count()) - result = self.coll.bulk_write([InsertOne({})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(1, result.inserted_count) - self.assertEqual(3, self.coll.count()) - - def test_insert_check_keys(self): - bulk = self.coll.initialize_ordered_bulk_op() - bulk.insert({'$dollar': 1}) - self.assertRaises(InvalidDocument, bulk.execute) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.insert({'a.b': 1}) - self.assertRaises(InvalidDocument, bulk.execute) + self.assertEqual(1, self.coll.count()) def test_update(self): @@ -239,88 +161,12 @@ class TestBulk(BulkTestBase): } self.coll.insert_many([{}, {}]) - bulk = self.coll.initialize_ordered_bulk_op() - - # update() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.update({'$set': {'x': 1}})) - - self.assertRaises(TypeError, bulk.find({}).update, 1) - self.assertRaises(ValueError, bulk.find({}).update, {}) - - # All fields must be $-operators. - self.assertRaises(ValueError, bulk.find({}).update, {'foo': 'bar'}) - bulk.find({}).update({'$set': {'foo': 'bar'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 2) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) result = self.coll.bulk_write([UpdateMany({}, {'$set': {'foo': 'bar'}})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(2, result.matched_count) self.assertTrue(result.modified_count in (2, None)) - # All fields must be $-operators -- validated server-side. - bulk = self.coll.initialize_ordered_bulk_op() - updates = SON([('$set', {'x': 1}), ('y', 1)]) - bulk.find({}).update(updates) - self.assertRaises(BulkWriteError, bulk.execute) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).update({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 2, - 'nModified': 2, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 2) - - self.coll.insert_one({'x': 1}) - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).update({'$set': {'x': 42}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(1, self.coll.find({'x': 42}).count()) - - # Second time, x is already 42 so nModified is 0. - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 42}).update({'$set': {'x': 42}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - def test_update_one(self): expected = { @@ -336,46 +182,12 @@ class TestBulk(BulkTestBase): self.coll.insert_many([{}, {}]) - bulk = self.coll.initialize_ordered_bulk_op() - - # update_one() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.update_one({'$set': {'x': 1}})) - - self.assertRaises(TypeError, bulk.find({}).update_one, 1) - self.assertRaises(ValueError, bulk.find({}).update_one, {}) - self.assertRaises(ValueError, bulk.find({}).update_one, {'foo': 'bar'}) - bulk.find({}).update_one({'$set': {'foo': 'bar'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) result = self.coll.bulk_write([UpdateOne({}, {'$set': {'foo': 'bar'}})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(1, result.matched_count) self.assertTrue(result.modified_count in (1, None)) - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).update_one({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - - # All fields must be $-operators -- validated server-side. - bulk = self.coll.initialize_ordered_bulk_op() - updates = SON([('$set', {'x': 1}), ('y', 1)]) - bulk.find({}).update_one(updates) - self.assertRaises(BulkWriteError, bulk.execute) - def test_replace_one(self): expected = { @@ -391,33 +203,11 @@ class TestBulk(BulkTestBase): self.coll.insert_many([{}, {}]) - bulk = self.coll.initialize_ordered_bulk_op() - self.assertRaises(TypeError, bulk.find({}).replace_one, 1) - self.assertRaises(ValueError, - bulk.find({}).replace_one, {'$set': {'foo': 'bar'}}) - bulk.find({}).replace_one({'foo': 'bar'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) - - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) result = self.coll.bulk_write([ReplaceOne({}, {'foo': 'bar'})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(1, result.matched_count) self.assertTrue(result.modified_count in (1, None)) - self.coll.delete_many({}) - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).replace_one({'bim': 'baz'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - def test_remove(self): # Test removing all documents, ordered. expected = { @@ -432,90 +222,12 @@ class TestBulk(BulkTestBase): } self.coll.insert_many([{}, {}]) - bulk = self.coll.initialize_ordered_bulk_op() - - # remove() must be preceded by find(). - self.assertRaises(AttributeError, lambda: bulk.remove()) - bulk.find({}).remove() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 0) - - self.coll.insert_many([{}, {}]) result = self.coll.bulk_write([DeleteMany({})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(2, result.deleted_count) - # Test removing some documents, ordered. - self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) - - bulk = self.coll.initialize_ordered_bulk_op() - - bulk.find({'x': 1}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.count(), 2) - self.coll.delete_many({}) - - # Test removing all documents, unordered. - self.coll.insert_many([{}, {}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - # Test removing some documents, unordered. - self.assertEqual(self.coll.count(), 0) - - self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).remove() - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 2, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.count(), 2) - self.coll.delete_many({}) - def test_remove_one(self): - - bulk = self.coll.initialize_ordered_bulk_op() - - # remove_one() must be preceded by find(). - self.assertRaises(AttributeError, lambda: bulk.remove_one()) - # Test removing one document, empty selector. - # First ordered, then unordered. self.coll.insert_many([{}, {}]) expected = { 'nMatched': 0, @@ -528,53 +240,12 @@ class TestBulk(BulkTestBase): 'writeConcernErrors': [] } - bulk.find({}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 1) - - self.coll.insert_one({}) result = self.coll.bulk_write([DeleteOne({})]) self.assertEqualResponse(expected, result.bulk_api_result) self.assertEqual(1, result.deleted_count) self.assertEqual(self.coll.count(), 1) - self.coll.insert_one({}) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual(self.coll.count(), 1) - - # Test removing one document, with a selector. - # First ordered, then unordered. - self.coll.insert_one({'x': 1}) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({'x': 1}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) - self.coll.insert_one({'x': 1}) - - bulk = self.coll.initialize_unordered_bulk_op() - bulk.find({'x': 1}).remove_one() - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) - def test_upsert(self): - bulk = self.coll.initialize_ordered_bulk_op() - - # upsert() requires find() first. - self.assertRaises( - AttributeError, - lambda: bulk.upsert()) expected = { 'nMatched': 0, @@ -587,11 +258,6 @@ class TestBulk(BulkTestBase): # Note, in MongoDB 2.4 the server won't return the # "upserted" field unless _id is an ObjectId - bulk.find({}).upsert().replace_one({'foo': 'bar'}) - result = bulk.execute() - self.assertEqualResponse(expected, result) - - self.coll.delete_many({}) result = self.coll.bulk_write([ReplaceOne({}, {'foo': 'bar'}, upsert=True)]) @@ -602,355 +268,20 @@ class TestBulk(BulkTestBase): self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({}).upsert().update_one({'$set': {'bim': 'baz'}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) - - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({}).upsert().update({'$set': {'bim': 'bop'}}) - # Non-upsert, no matches. - bulk.find({'x': 1}).update({'$set': {'x': 2}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 0, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - self.assertEqual(self.coll.find({'bim': 'bop'}).count(), 1) - self.assertEqual(self.coll.find({'x': 2}).count(), 0) - - def test_upsert_large(self): - big = 'a' * (client_context.client.max_bson_size - 37) - bulk = self.coll.initialize_ordered_bulk_op() - bulk.find({'x': 1}).upsert().update({'$set': {'s': big}}) - result = bulk.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [{'index': 0, '_id': '...'}]}, - result) - - self.assertEqual(1, self.coll.find({'x': 1}).count()) - - def test_client_generated_upsert_id(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.find({'_id': 0}).upsert().update_one({'$set': {'a': 0}}) - batch.find({'a': 1}).upsert().replace_one({'_id': 1}) - if not client_context.version.at_least(2, 6, 0): - # This case is only possible in MongoDB versions before 2.6. - batch.find({'_id': 3}).upsert().replace_one({'_id': 2}) - else: - # This is just here to make the counts right in all cases. - batch.find({'_id': 2}).upsert().replace_one({'_id': 2}) - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 3, - 'nInserted': 0, - 'nRemoved': 0, - 'upserted': [{'index': 0, '_id': 0}, - {'index': 1, '_id': 1}, - {'index': 2, '_id': 2}]}, - result) - - def test_single_ordered_batch(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 1}).update_one({'$set': {'b': 1}}) - batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) - batch.insert({'a': 3}) - batch.find({'a': 3}).remove() - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 1, - 'nInserted': 2, - 'nRemoved': 1, - 'upserted': [{'index': 2, '_id': '...'}]}, - result) - - def test_single_error_ordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 3, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_multiple_error_ordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.find({'b': 3}).upsert().update_one({'$set': {'a': 2}}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 4, 'a': 3}) - batch.insert({'b': 5, 'a': 1}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_single_unordered_batch(self): - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 1}).update_one({'$set': {'b': 1}}) - batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) - batch.insert({'a': 3}) - batch.find({'a': 3}).remove() - result = batch.execute() - self.assertEqualResponse( - {'nMatched': 1, - 'nModified': 1, - 'nUpserted': 1, - 'nInserted': 2, - 'nRemoved': 1, - 'upserted': [{'index': 2, '_id': '...'}], - 'writeErrors': [], - 'writeConcernErrors': []}, - result) - - def test_single_error_unordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) - batch.insert({'b': 3, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 1, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 2}, - 'u': {'$set': {'a': 1}}, - 'multi': False, - 'upsert': True}}]}, - result) - - def test_multiple_error_unordered_batch(self): - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.find({'b': 2}).upsert().update_one({'$set': {'a': 3}}) - batch.find({'b': 3}).upsert().update_one({'$set': {'a': 4}}) - batch.find({'b': 4}).upsert().update_one({'$set': {'a': 3}}) - batch.insert({'b': 5, 'a': 2}) - batch.insert({'b': 6, 'a': 1}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - # Assume the update at index 1 runs before the update at index 3, - # although the spec does not require it. Same for inserts. - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 2, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [ - {'index': 1, '_id': '...'}, - {'index': 2, '_id': '...'}], - 'writeConcernErrors': [], - 'writeErrors': [ - {'index': 3, - 'code': 11000, - 'errmsg': '...', - 'op': {'q': {'b': 4}, - 'u': {'$set': {'a': 3}}, - 'multi': False, - 'upsert': True}}, - {'index': 5, - 'code': 11000, - 'errmsg': '...', - 'op': {'_id': '...', 'b': 6, 'a': 1}}]}, - result) - - def test_large_inserts_ordered(self): - big = 'x' * self.coll.database.client.max_bson_size - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.insert({'big': big}) - batch.insert({'b': 2, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(1, result['nInserted']) - - self.coll.delete_many({}) - - big = 'x' * (1024 * 1024 * 4) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1, 'big': big}) - batch.insert({'a': 2, 'big': big}) - batch.insert({'a': 3, 'big': big}) - batch.insert({'a': 4, 'big': big}) - batch.insert({'a': 5, 'big': big}) - batch.insert({'a': 6, 'big': big}) - result = batch.execute() - - self.assertEqual(6, result['nInserted']) - self.assertEqual(6, self.coll.count()) - - def test_large_inserts_unordered(self): - big = 'x' * self.coll.database.client.max_bson_size - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'b': 1, 'a': 1}) - batch.insert({'big': big}) - batch.insert({'b': 2, 'a': 2}) - - try: - batch.execute() - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - - self.coll.delete_many({}) - - big = 'x' * (1024 * 1024 * 4) - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1, 'big': big}) - batch.insert({'a': 2, 'big': big}) - batch.insert({'a': 3, 'big': big}) - batch.insert({'a': 4, 'big': big}) - batch.insert({'a': 5, 'big': big}) - batch.insert({'a': 6, 'big': big}) - result = batch.execute() - - self.assertEqual(6, result['nInserted']) - self.assertEqual(6, self.coll.count()) - def test_numerous_inserts(self): # Ensure we don't exceed server's 1000-document batch size limit. n_docs = 2100 - batch = self.coll.initialize_unordered_bulk_op() - for _ in range(n_docs): - batch.insert({}) - - result = batch.execute() - self.assertEqual(n_docs, result['nInserted']) + requests = [InsertOne({}) for _ in range(n_docs)] + result = self.coll.bulk_write(requests, ordered=False) + self.assertEqual(n_docs, result.inserted_count) self.assertEqual(n_docs, self.coll.count()) # Same with ordered bulk. - self.coll.delete_many({}) - batch = self.coll.initialize_ordered_bulk_op() - for _ in range(n_docs): - batch.insert({}) - - result = batch.execute() - self.assertEqual(n_docs, result['nInserted']) + self.coll.drop() + result = self.coll.bulk_write(requests) + self.assertEqual(n_docs, result.inserted_count) self.assertEqual(n_docs, self.coll.count()) - def test_multiple_execution(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({}) - batch.execute() - self.assertRaises(InvalidOperation, batch.execute) - def test_generator_insert(self): def gen(): yield {'a': 1, 'b': 1} @@ -962,248 +293,6 @@ class TestBulk(BulkTestBase): result = self.coll.insert_many(gen()) self.assertEqual(5, len(result.inserted_ids)) - -class TestBulkWriteConcern(BulkTestBase): - - @classmethod - def setUpClass(cls): - super(TestBulkWriteConcern, cls).setUpClass() - cls.w = client_context.w - cls.secondary = None - if cls.w > 1: - for member in client_context.ismaster['hosts']: - if member != client_context.ismaster['primary']: - cls.secondary = single_client(*partition_node(member)) - break - - # We tested wtimeout errors by specifying a write concern greater than - # the number of members, but in MongoDB 2.7.8+ this causes a different - # sort of error, "Not enough data-bearing nodes". In recent servers we - # use a failpoint to pause replication on a secondary. - cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) - - def cause_wtimeout(self, batch): - if self.need_replication_stopped: - if not client_context.test_commands_enabled: - raise SkipTest("Test commands must be enabled.") - - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='alwaysOn') - - try: - return batch.execute({'w': self.w, 'wtimeout': 1}) - finally: - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='off') - else: - return batch.execute({'w': self.w + 1, 'wtimeout': 1}) - - def test_fsync_and_j(self): - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - self.assertRaises( - ConfigurationError, - batch.execute, {'fsync': True, 'j': True}) - - @client_context.require_replica_set - def test_write_concern_failure_ordered(self): - # Ensure we don't raise on wnote. - batch = self.coll.initialize_ordered_bulk_op() - batch.find({"something": "that does no exist"}).remove() - self.assertTrue(batch.execute({"w": self.w})) - - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.insert({'a': 2}) - - # Replication wtimeout is a 'soft' error. - # It shouldn't stop batch processing. - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 2, - 'nRemoved': 0, - 'upserted': [], - 'writeErrors': []}, - result) - - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 0) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], string_type)) - - self.coll.delete_many({}) - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - - # Fail due to write concern support as well - # as duplicate key error on ordered batch. - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().replace_one({'b': 1}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [{'index': 1, '_id': '...'}], - 'writeErrors': [ - {'index': 2, - 'code': 11000, - 'errmsg': '...', - 'op': {'_id': '...', 'a': 1}}]}, - result) - - self.assertTrue(len(result['writeConcernErrors']) > 1) - failed = result['writeErrors'][0] - self.assertTrue("duplicate" in failed['errmsg']) - - @client_context.require_replica_set - def test_write_concern_failure_unordered(self): - # Ensure we don't raise on wnote. - batch = self.coll.initialize_unordered_bulk_op() - batch.find({"something": "that does no exist"}).remove() - self.assertTrue(batch.execute({"w": self.w})) - - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) - batch.insert({'a': 2}) - - # Replication wtimeout is a 'soft' error. - # It shouldn't stop batch processing. - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - self.assertEqual(1, result['nUpserted']) - self.assertEqual(0, len(result['writeErrors'])) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 1) - - self.coll.delete_many({}) - self.coll.create_index('a', unique=True) - self.addCleanup(self.coll.drop_index, [('a', 1)]) - - # Fail due to write concern support as well - # as duplicate key error on unordered batch. - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, - 'b': 1}}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - self.cause_wtimeout(batch) - except BulkWriteError as exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqual(2, result['nInserted']) - self.assertEqual(1, result['nUpserted']) - self.assertEqual(1, len(result['writeErrors'])) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 1) - - failed = result['writeErrors'][0] - self.assertEqual(2, failed['index']) - self.assertEqual(11000, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], string_type)) - self.assertEqual(1, failed['op']['a']) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], string_type)) - - upserts = result['upserted'] - self.assertEqual(1, len(upserts)) - self.assertEqual(1, upserts[0]['index']) - self.assertTrue(upserts[0].get('_id')) - - -class TestBulkNoResults(BulkTestBase): - - def test_no_results_ordered_success(self): - - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - - def test_no_results_ordered_failure(self): - - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.insert({'_id': 1}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 3 == self.coll.count(), - 'insert 3 documents') - - def test_no_results_unordered_success(self): - - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - - def test_no_results_unordered_failure(self): - - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'_id': 1}) - batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) - batch.insert({'_id': 2}) - batch.insert({'_id': 1}) - batch.find({'_id': 1}).remove_one() - self.assertTrue(batch.execute({'w': 0}) is None) - wait_until(lambda: 2 == self.coll.count(), - 'insert 2 documents') - self.assertTrue(self.coll.find_one({'_id': 1}) is None) - def test_bulk_write_no_results(self): coll = self.coll.with_options(write_concern=WriteConcern(w=0)) @@ -1227,16 +316,16 @@ class TestBulkNoResults(BulkTestBase): self.coll.bulk_write([{}]) -class TestBulkAuthorization(BulkTestBase): +class BulkAuthorizationTestBase(BulkTestBase): @classmethod @client_context.require_auth @client_context.require_version_min(2, 5, 3) def setUpClass(cls): - super(TestBulkAuthorization, cls).setUpClass() + super(BulkAuthorizationTestBase, cls).setUpClass() def setUp(self): - super(TestBulkAuthorization, self).setUp() + super(BulkAuthorizationTestBase, self).setUp() self.db.add_user('readonly', 'pw', roles=['read']) self.db.command( 'createRole', 'noremove', @@ -1252,30 +341,33 @@ class TestBulkAuthorization(BulkTestBase): self.db.command('dropRole', 'noremove') remove_all_users(self.db) + +class TestBulkAuthorization(BulkAuthorizationTestBase): + def test_readonly(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. - cli = rs_or_single_client_noauth() - db = cli.pymongo_test - coll = db.test - db.authenticate('readonly', 'pw') - bulk = coll.initialize_ordered_bulk_op() - bulk.insert({'x': 1}) - self.assertRaises(OperationFailure, bulk.execute) + cli = rs_or_single_client_noauth(username='readonly', password='pw', + authSource='pymongo_test') + coll = cli.pymongo_test.test + coll.find_one() + self.assertRaises(OperationFailure, coll.bulk_write, + [InsertOne({'x': 1})]) def test_no_remove(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. - cli = rs_or_single_client_noauth() - db = cli.pymongo_test - coll = db.test - db.authenticate('noremove', 'pw') - bulk = coll.initialize_ordered_bulk_op() - bulk.insert({'x': 1}) - bulk.find({'x': 2}).upsert().replace_one({'x': 2}) - bulk.find({}).remove() # Prohibited. - bulk.insert({'x': 3}) # Never attempted. - self.assertRaises(OperationFailure, bulk.execute) + cli = rs_or_single_client_noauth(username='noremove', password='pw', + authSource='pymongo_test') + coll = cli.pymongo_test.test + coll.find_one() + requests = [ + InsertOne({'x': 1}), + ReplaceOne({'x': 2}, {'x': 2}, upsert=True), + DeleteMany({}), # Prohibited. + InsertOne({'x': 3}), # Never attempted. + ] + self.assertRaises(OperationFailure, coll.bulk_write, requests) self.assertEqual(set([1, 2]), set(self.coll.distinct('x'))) if __name__ == "__main__": diff --git a/test/test_collation.py b/test/test_collation.py index 764b24a30..49f4d8e21 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -101,10 +101,15 @@ class TestCollation(unittest.TestCase): cls.client = rs_or_single_client(event_listeners=[cls.listener]) cls.db = cls.client.pymongo_test cls.collation = Collation('en_US') + cls.warn_context = warnings.catch_warnings() + cls.warn_context.__enter__() + warnings.simplefilter("ignore", DeprecationWarning) @classmethod def tearDownClass(cls): monitoring._LISTENERS = cls.saved_listeners + cls.warn_context.__exit__() + cls.warn_context = None def tearDown(self): self.listener.results.clear() @@ -179,12 +184,10 @@ class TestCollation(unittest.TestCase): @raisesConfigurationErrorForOldMongoDB def test_group(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self.db.test.group('foo', {'foo': {'$gt': 42}}, {}, - 'function(a, b) { return a; }', - collation=self.collation) - self.assertCollationInLastCommand() + self.db.test.group('foo', {'foo': {'$gt': 42}}, {}, + 'function(a, b) { return a; }', + collation=self.collation) + self.assertCollationInLastCommand() @raisesConfigurationErrorForOldMongoDB def test_map_reduce(self): diff --git a/test/test_legacy_api.py b/test/test_legacy_api.py index a9810b6eb..9b113b466 100644 --- a/test/test_legacy_api.py +++ b/test/test_legacy_api.py @@ -28,10 +28,13 @@ from bson.code import Code from bson.codec_options import CodecOptions from bson.dbref import DBRef from bson.objectid import ObjectId +from bson.py3compat import string_type from bson.son import SON from pymongo import ASCENDING, DESCENDING from pymongo.database import Database -from pymongo.errors import (ConfigurationError, +from pymongo.common import partition_node +from pymongo.errors import (BulkWriteError, + ConfigurationError, CursorNotFound, DocumentTooLarge, DuplicateKeyError, @@ -47,25 +50,40 @@ from pymongo.son_manipulator import (AutoReference, from pymongo.write_concern import WriteConcern from test import client_context, qcheck, unittest, SkipTest from test.test_client import IntegrationTest -from test.utils import (ignore_deprecations, - joinall, +from test.test_bulk import BulkTestBase, BulkAuthorizationTestBase +from test.utils import (joinall, + oid_generated_on_client, + remove_all_users, rs_or_single_client, + rs_or_single_client_noauth, + single_client, wait_until) +class DeprecationFilter(object): + + def __init__(self, action="ignore"): + """Start filtering deprecations.""" + self.warn_context = warnings.catch_warnings() + self.warn_context.__enter__() + warnings.simplefilter(action, DeprecationWarning) + + def stop(self): + """Stop filtering deprecations.""" + self.warn_context.__exit__() + self.warn_context = None + + class TestDeprecations(IntegrationTest): @classmethod def setUpClass(cls): super(TestDeprecations, cls).setUpClass() - cls.warn_context = warnings.catch_warnings() - cls.warn_context.__enter__() - warnings.simplefilter("error", DeprecationWarning) + cls.deprecation_filter = DeprecationFilter("error") @classmethod def tearDownClass(cls): - cls.warn_context.__exit__() - cls.warn_context = None + cls.deprecation_filter.stop() def test_save_deprecation(self): self.assertRaises( @@ -108,14 +126,11 @@ class TestLegacy(IntegrationTest): def setUpClass(cls): super(TestLegacy, cls).setUpClass() cls.w = client_context.w - cls.warn_context = warnings.catch_warnings() - cls.warn_context.__enter__() - warnings.simplefilter("ignore", DeprecationWarning) + cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): - cls.warn_context.__exit__() - cls.warn_context = None + cls.deprecation_filter.stop() def test_insert_find_one(self): # Tests legacy insert. @@ -1424,5 +1439,1098 @@ class TestLegacy(IntegrationTest): self.assertEqual(Database(c, 'foo'), c.get_default_database()) +class TestLegacyBulk(BulkTestBase): + + @classmethod + def setUpClass(cls): + super(TestLegacyBulk, cls).setUpClass() + cls.deprecation_filter = DeprecationFilter() + + @classmethod + def tearDownClass(cls): + cls.deprecation_filter.stop() + + def test_empty(self): + bulk = self.coll.initialize_ordered_bulk_op() + self.assertRaises(InvalidOperation, bulk.execute) + + def test_find(self): + # find() requires a selector. + bulk = self.coll.initialize_ordered_bulk_op() + self.assertRaises(TypeError, bulk.find) + self.assertRaises(TypeError, bulk.find, 'foo') + # No error. + bulk.find({}) + + @client_context.require_version_min(3, 1, 9, -1) + def test_bypass_document_validation_bulk_op(self): + + # Test insert + self.coll.insert_one({"z": 0}) + self.db.command(SON([("collMod", "test"), + ("validator", {"z": {"$gte": 0}})])) + bulk = self.coll.initialize_ordered_bulk_op( + bypass_document_validation=False) + bulk.insert({"z": -1}) # error + self.assertRaises(BulkWriteError, bulk.execute) + self.assertEqual(0, self.coll.count({"z": -1})) + + bulk = self.coll.initialize_ordered_bulk_op( + bypass_document_validation=True) + bulk.insert({"z": -1}) + bulk.execute() + self.assertEqual(1, self.coll.count({"z": -1})) + + self.coll.insert_one({"z": 0}) + self.db.command(SON([("collMod", "test"), + ("validator", {"z": {"$gte": 0}})])) + bulk = self.coll.initialize_unordered_bulk_op( + bypass_document_validation=False) + bulk.insert({"z": -1}) # error + self.assertRaises(BulkWriteError, bulk.execute) + self.assertEqual(1, self.coll.count({"z": -1})) + + bulk = self.coll.initialize_unordered_bulk_op( + bypass_document_validation=True) + bulk.insert({"z": -1}) + bulk.execute() + self.assertEqual(2, self.coll.count({"z": -1})) + self.coll.drop() + + def test_insert(self): + expected = { + 'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + + bulk = self.coll.initialize_ordered_bulk_op() + self.assertRaises(TypeError, bulk.insert, 1) + + # find() before insert() is prohibited. + self.assertRaises(AttributeError, lambda: bulk.find({}).insert({})) + + # We don't allow multiple documents per call. + self.assertRaises(TypeError, bulk.insert, [{}, {}]) + self.assertRaises(TypeError, bulk.insert, ({} for _ in range(2))) + + bulk.insert({}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(1, self.coll.count()) + doc = self.coll.find_one() + self.assertTrue(oid_generated_on_client(doc['_id'])) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.insert({}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(2, self.coll.count()) + + def test_insert_check_keys(self): + bulk = self.coll.initialize_ordered_bulk_op() + bulk.insert({'$dollar': 1}) + self.assertRaises(InvalidDocument, bulk.execute) + + bulk = self.coll.initialize_ordered_bulk_op() + bulk.insert({'a.b': 1}) + self.assertRaises(InvalidDocument, bulk.execute) + + def test_update(self): + + expected = { + 'nMatched': 2, + 'nModified': 2, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_ordered_bulk_op() + + # update() requires find() first. + self.assertRaises( + AttributeError, + lambda: bulk.update({'$set': {'x': 1}})) + + self.assertRaises(TypeError, bulk.find({}).update, 1) + self.assertRaises(ValueError, bulk.find({}).update, {}) + + # All fields must be $-operators. + self.assertRaises(ValueError, bulk.find({}).update, {'foo': 'bar'}) + bulk.find({}).update({'$set': {'foo': 'bar'}}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 2) + + # All fields must be $-operators -- validated server-side. + bulk = self.coll.initialize_ordered_bulk_op() + updates = SON([('$set', {'x': 1}), ('y', 1)]) + bulk.find({}).update(updates) + self.assertRaises(BulkWriteError, bulk.execute) + + self.coll.delete_many({}) + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({}).update({'$set': {'bim': 'baz'}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 2, + 'nModified': 2, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 2) + + self.coll.insert_one({'x': 1}) + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({'x': 1}).update({'$set': {'x': 42}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(1, self.coll.find({'x': 42}).count()) + + # Second time, x is already 42 so nModified is 0. + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({'x': 42}).update({'$set': {'x': 42}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + def test_update_one(self): + + expected = { + 'nMatched': 1, + 'nModified': 1, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_ordered_bulk_op() + + # update_one() requires find() first. + self.assertRaises( + AttributeError, + lambda: bulk.update_one({'$set': {'x': 1}})) + + self.assertRaises(TypeError, bulk.find({}).update_one, 1) + self.assertRaises(ValueError, bulk.find({}).update_one, {}) + self.assertRaises(ValueError, bulk.find({}).update_one, {'foo': 'bar'}) + bulk.find({}).update_one({'$set': {'foo': 'bar'}}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) + + self.coll.delete_many({}) + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({}).update_one({'$set': {'bim': 'baz'}}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) + + # All fields must be $-operators -- validated server-side. + bulk = self.coll.initialize_ordered_bulk_op() + updates = SON([('$set', {'x': 1}), ('y', 1)]) + bulk.find({}).update_one(updates) + self.assertRaises(BulkWriteError, bulk.execute) + + def test_replace_one(self): + + expected = { + 'nMatched': 1, + 'nModified': 1, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_ordered_bulk_op() + self.assertRaises(TypeError, bulk.find({}).replace_one, 1) + self.assertRaises(ValueError, + bulk.find({}).replace_one, {'$set': {'foo': 'bar'}}) + bulk.find({}).replace_one({'foo': 'bar'}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) + + self.coll.delete_many({}) + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({}).replace_one({'bim': 'baz'}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) + + def test_remove(self): + # Test removing all documents, ordered. + expected = { + 'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 2, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_ordered_bulk_op() + + # remove() must be preceded by find(). + self.assertRaises(AttributeError, lambda: bulk.remove()) + bulk.find({}).remove() + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.count(), 0) + + # Test removing some documents, ordered. + self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) + + bulk = self.coll.initialize_ordered_bulk_op() + + bulk.find({'x': 1}).remove() + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 2, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(self.coll.count(), 2) + self.coll.delete_many({}) + + # Test removing all documents, unordered. + self.coll.insert_many([{}, {}]) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({}).remove() + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 2, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + # Test removing some documents, unordered. + self.assertEqual(self.coll.count(), 0) + + self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({'x': 1}).remove() + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 2, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(self.coll.count(), 2) + self.coll.delete_many({}) + + def test_remove_one(self): + + bulk = self.coll.initialize_ordered_bulk_op() + + # remove_one() must be preceded by find(). + self.assertRaises(AttributeError, lambda: bulk.remove_one()) + + # Test removing one document, empty selector. + # First ordered, then unordered. + self.coll.insert_many([{}, {}]) + expected = { + 'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 1, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': [] + } + + bulk.find({}).remove_one() + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.count(), 1) + + self.coll.insert_one({}) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({}).remove_one() + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual(self.coll.count(), 1) + + # Test removing one document, with a selector. + # First ordered, then unordered. + self.coll.insert_one({'x': 1}) + + bulk = self.coll.initialize_ordered_bulk_op() + bulk.find({'x': 1}).remove_one() + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) + self.coll.insert_one({'x': 1}) + + bulk = self.coll.initialize_unordered_bulk_op() + bulk.find({'x': 1}).remove_one() + result = bulk.execute() + self.assertEqualResponse(expected, result) + + self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) + + def test_upsert(self): + bulk = self.coll.initialize_ordered_bulk_op() + + # upsert() requires find() first. + self.assertRaises( + AttributeError, + lambda: bulk.upsert()) + + expected = { + 'nMatched': 0, + 'nModified': 0, + 'nUpserted': 1, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': '...'}] + } + + # Note, in MongoDB 2.4 the server won't return the + # "upserted" field unless _id is an ObjectId + bulk.find({}).upsert().replace_one({'foo': 'bar'}) + result = bulk.execute() + self.assertEqualResponse(expected, result) + + bulk = self.coll.initialize_ordered_bulk_op() + bulk.find({}).upsert().update_one({'$set': {'bim': 'baz'}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) + + bulk = self.coll.initialize_ordered_bulk_op() + bulk.find({}).upsert().update({'$set': {'bim': 'bop'}}) + # Non-upsert, no matches. + bulk.find({'x': 1}).update({'$set': {'x': 2}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 0, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + self.assertEqual(self.coll.find({'bim': 'bop'}).count(), 1) + self.assertEqual(self.coll.find({'x': 2}).count(), 0) + + def test_upsert_large(self): + big = 'a' * (client_context.client.max_bson_size - 37) + bulk = self.coll.initialize_ordered_bulk_op() + bulk.find({'x': 1}).upsert().update({'$set': {'s': big}}) + result = bulk.execute() + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 1, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': '...'}]}, + result) + + self.assertEqual(1, self.coll.find({'x': 1}).count()) + + def test_client_generated_upsert_id(self): + batch = self.coll.initialize_ordered_bulk_op() + batch.find({'_id': 0}).upsert().update_one({'$set': {'a': 0}}) + batch.find({'a': 1}).upsert().replace_one({'_id': 1}) + if not client_context.version.at_least(2, 6, 0): + # This case is only possible in MongoDB versions before 2.6. + batch.find({'_id': 3}).upsert().replace_one({'_id': 2}) + else: + # This is just here to make the counts right in all cases. + batch.find({'_id': 2}).upsert().replace_one({'_id': 2}) + result = batch.execute() + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': 0}, + {'index': 1, '_id': 1}, + {'index': 2, '_id': 2}]}, + result) + + def test_single_ordered_batch(self): + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 1}).update_one({'$set': {'b': 1}}) + batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) + batch.insert({'a': 3}) + batch.find({'a': 3}).remove() + result = batch.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 1, + 'nInserted': 2, + 'nRemoved': 1, + 'upserted': [{'index': 2, '_id': '...'}]}, + result) + + def test_single_error_ordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) + batch.insert({'b': 3, 'a': 2}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_multiple_error_ordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) + batch.find({'b': 3}).upsert().update_one({'$set': {'a': 2}}) + batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) + batch.insert({'b': 4, 'a': 3}) + batch.insert({'b': 5, 'a': 1}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_single_unordered_batch(self): + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 1}).update_one({'$set': {'b': 1}}) + batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) + batch.insert({'a': 3}) + batch.find({'a': 3}).remove() + result = batch.execute() + self.assertEqualResponse( + {'nMatched': 1, + 'nModified': 1, + 'nUpserted': 1, + 'nInserted': 2, + 'nRemoved': 1, + 'upserted': [{'index': 2, '_id': '...'}], + 'writeErrors': [], + 'writeConcernErrors': []}, + result) + + def test_single_error_unordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) + batch.insert({'b': 3, 'a': 2}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 1, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 2}, + 'u': {'$set': {'a': 1}}, + 'multi': False, + 'upsert': True}}]}, + result) + + def test_multiple_error_unordered_batch(self): + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.find({'b': 2}).upsert().update_one({'$set': {'a': 3}}) + batch.find({'b': 3}).upsert().update_one({'$set': {'a': 4}}) + batch.find({'b': 4}).upsert().update_one({'$set': {'a': 3}}) + batch.insert({'b': 5, 'a': 2}) + batch.insert({'b': 6, 'a': 1}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + # Assume the update at index 1 runs before the update at index 3, + # although the spec does not require it. Same for inserts. + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 2, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [ + {'index': 1, '_id': '...'}, + {'index': 2, '_id': '...'}], + 'writeConcernErrors': [], + 'writeErrors': [ + {'index': 3, + 'code': 11000, + 'errmsg': '...', + 'op': {'q': {'b': 4}, + 'u': {'$set': {'a': 3}}, + 'multi': False, + 'upsert': True}}, + {'index': 5, + 'code': 11000, + 'errmsg': '...', + 'op': {'_id': '...', 'b': 6, 'a': 1}}]}, + result) + + def test_large_inserts_ordered(self): + big = 'x' * self.coll.database.client.max_bson_size + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(1, result['nInserted']) + + self.coll.delete_many({}) + + big = 'x' * (1024 * 1024 * 4) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1, 'big': big}) + batch.insert({'a': 2, 'big': big}) + batch.insert({'a': 3, 'big': big}) + batch.insert({'a': 4, 'big': big}) + batch.insert({'a': 5, 'big': big}) + batch.insert({'a': 6, 'big': big}) + result = batch.execute() + + self.assertEqual(6, result['nInserted']) + self.assertEqual(6, self.coll.count()) + + def test_large_inserts_unordered(self): + big = 'x' * self.coll.database.client.max_bson_size + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'b': 1, 'a': 1}) + batch.insert({'big': big}) + batch.insert({'b': 2, 'a': 2}) + + try: + batch.execute() + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + + self.coll.delete_many({}) + + big = 'x' * (1024 * 1024 * 4) + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1, 'big': big}) + batch.insert({'a': 2, 'big': big}) + batch.insert({'a': 3, 'big': big}) + batch.insert({'a': 4, 'big': big}) + batch.insert({'a': 5, 'big': big}) + batch.insert({'a': 6, 'big': big}) + result = batch.execute() + + self.assertEqual(6, result['nInserted']) + self.assertEqual(6, self.coll.count()) + + def test_numerous_inserts(self): + # Ensure we don't exceed server's 1000-document batch size limit. + n_docs = 2100 + batch = self.coll.initialize_unordered_bulk_op() + for _ in range(n_docs): + batch.insert({}) + + result = batch.execute() + self.assertEqual(n_docs, result['nInserted']) + self.assertEqual(n_docs, self.coll.count()) + + # Same with ordered bulk. + self.coll.delete_many({}) + batch = self.coll.initialize_ordered_bulk_op() + for _ in range(n_docs): + batch.insert({}) + + result = batch.execute() + self.assertEqual(n_docs, result['nInserted']) + self.assertEqual(n_docs, self.coll.count()) + + def test_multiple_execution(self): + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({}) + batch.execute() + self.assertRaises(InvalidOperation, batch.execute) + + def test_generator_insert(self): + def gen(): + yield {'a': 1, 'b': 1} + yield {'a': 1, 'b': 2} + yield {'a': 2, 'b': 3} + yield {'a': 3, 'b': 5} + yield {'a': 5, 'b': 8} + + result = self.coll.insert_many(gen()) + self.assertEqual(5, len(result.inserted_ids)) + + +class TestLegacyBulkNoResults(BulkTestBase): + + @classmethod + def setUpClass(cls): + super(TestLegacyBulkNoResults, cls).setUpClass() + cls.deprecation_filter = DeprecationFilter() + + @classmethod + def tearDownClass(cls): + cls.deprecation_filter.stop() + + def tearDown(self): + self.coll.delete_many({}) + + def test_no_results_ordered_success(self): + + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'_id': 1}) + batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) + batch.insert({'_id': 2}) + batch.find({'_id': 1}).remove_one() + self.assertTrue(batch.execute({'w': 0}) is None) + wait_until(lambda: 2 == self.coll.count(), + 'insert 2 documents') + + def test_no_results_ordered_failure(self): + + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'_id': 1}) + batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) + batch.insert({'_id': 2}) + batch.insert({'_id': 1}) + batch.find({'_id': 1}).remove_one() + self.assertTrue(batch.execute({'w': 0}) is None) + wait_until(lambda: 3 == self.coll.count(), + 'insert 3 documents') + + def test_no_results_unordered_success(self): + + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'_id': 1}) + batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) + batch.insert({'_id': 2}) + batch.find({'_id': 1}).remove_one() + self.assertTrue(batch.execute({'w': 0}) is None) + wait_until(lambda: 2 == self.coll.count(), + 'insert 2 documents') + + def test_no_results_unordered_failure(self): + + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'_id': 1}) + batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) + batch.insert({'_id': 2}) + batch.insert({'_id': 1}) + batch.find({'_id': 1}).remove_one() + self.assertTrue(batch.execute({'w': 0}) is None) + wait_until(lambda: 2 == self.coll.count(), + 'insert 2 documents') + self.assertEqual(self.coll.find_one({'_id': 1}), None) + + +class TestLegacyBulkWriteConcern(BulkTestBase): + + @classmethod + def setUpClass(cls): + super(TestLegacyBulkWriteConcern, cls).setUpClass() + cls.w = client_context.w + cls.secondary = None + if cls.w > 1: + for member in client_context.ismaster['hosts']: + if member != client_context.ismaster['primary']: + cls.secondary = single_client(*partition_node(member)) + break + + # We tested wtimeout errors by specifying a write concern greater than + # the number of members, but in MongoDB 2.7.8+ this causes a different + # sort of error, "Not enough data-bearing nodes". In recent servers we + # use a failpoint to pause replication on a secondary. + cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) + cls.deprecation_filter = DeprecationFilter() + + @classmethod + def tearDownClass(cls): + cls.deprecation_filter.stop() + + def cause_wtimeout(self, batch): + if self.need_replication_stopped: + if not client_context.test_commands_enabled: + raise SkipTest("Test commands must be enabled.") + + self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='alwaysOn') + + try: + return batch.execute({'w': self.w, 'wtimeout': 1}) + finally: + self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='off') + else: + return batch.execute({'w': self.w + 1, 'wtimeout': 1}) + + def test_fsync_and_j(self): + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + self.assertRaises( + ConfigurationError, + batch.execute, {'fsync': True, 'j': True}) + + @client_context.require_replica_set + def test_write_concern_failure_ordered(self): + # Ensure we don't raise on wnote. + batch = self.coll.initialize_ordered_bulk_op() + batch.find({"something": "that does no exist"}).remove() + self.assertTrue(batch.execute({"w": self.w})) + + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + batch.insert({'a': 2}) + + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + self.cause_wtimeout(batch) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': []}, + result) + + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 0) + + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], string_type)) + + self.coll.delete_many({}) + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + + # Fail due to write concern support as well + # as duplicate key error on ordered batch. + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 3}).upsert().replace_one({'b': 1}) + batch.insert({'a': 1}) + batch.insert({'a': 2}) + try: + self.cause_wtimeout(batch) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 1, + 'nInserted': 1, + 'nRemoved': 0, + 'upserted': [{'index': 1, '_id': '...'}], + 'writeErrors': [ + {'index': 2, + 'code': 11000, + 'errmsg': '...', + 'op': {'_id': '...', 'a': 1}}]}, + result) + + self.assertTrue(len(result['writeConcernErrors']) > 1) + failed = result['writeErrors'][0] + self.assertTrue("duplicate" in failed['errmsg']) + + @client_context.require_replica_set + def test_write_concern_failure_unordered(self): + # Ensure we don't raise on wnote. + batch = self.coll.initialize_unordered_bulk_op() + batch.find({"something": "that does no exist"}).remove() + self.assertTrue(batch.execute({"w": self.w})) + + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) + batch.insert({'a': 2}) + + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + self.cause_wtimeout(batch) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.assertEqual(1, result['nUpserted']) + self.assertEqual(0, len(result['writeErrors'])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 1) + + self.coll.delete_many({}) + self.coll.create_index('a', unique=True) + self.addCleanup(self.coll.drop_index, [('a', 1)]) + + # Fail due to write concern support as well + # as duplicate key error on unordered batch. + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, + 'b': 1}}) + batch.insert({'a': 1}) + batch.insert({'a': 2}) + try: + self.cause_wtimeout(batch) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.assertEqual(1, result['nUpserted']) + self.assertEqual(1, len(result['writeErrors'])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 1) + + failed = result['writeErrors'][0] + self.assertEqual(2, failed['index']) + self.assertEqual(11000, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], string_type)) + self.assertEqual(1, failed['op']['a']) + + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], string_type)) + + upserts = result['upserted'] + self.assertEqual(1, len(upserts)) + self.assertEqual(1, upserts[0]['index']) + self.assertTrue(upserts[0].get('_id')) + + +class TestLegacyBulkAuthorization(BulkAuthorizationTestBase): + + @classmethod + def setUpClass(cls): + super(TestLegacyBulkAuthorization, cls).setUpClass() + cls.deprecation_filter = DeprecationFilter() + + @classmethod + def tearDownClass(cls): + cls.deprecation_filter.stop() + + def test_readonly(self): + # We test that an authorization failure aborts the batch and is raised + # as OperationFailure. + cli = rs_or_single_client_noauth() + db = cli.pymongo_test + coll = db.test + db.authenticate('readonly', 'pw') + bulk = coll.initialize_ordered_bulk_op() + bulk.insert({'x': 1}) + self.assertRaises(OperationFailure, bulk.execute) + + def test_no_remove(self): + # We test that an authorization failure aborts the batch and is raised + # as OperationFailure. + cli = rs_or_single_client_noauth() + db = cli.pymongo_test + coll = db.test + db.authenticate('noremove', 'pw') + bulk = coll.initialize_ordered_bulk_op() + bulk.insert({'x': 1}) + bulk.find({'x': 2}).upsert().replace_one({'x': 2}) + bulk.find({}).remove() # Prohibited. + bulk.insert({'x': 3}) # Never attempted. + self.assertRaises(OperationFailure, bulk.execute) + self.assertEqual(set([1, 2]), set(self.coll.distinct('x'))) + if __name__ == "__main__": unittest.main()