PYTHON-1318 Remove initialize_unordered_bulk_op and initialize_ordered_bulk_op (#692)

PYTHON-2436 Unskip test_large_inserts_ordered on MongoDB 5.0.
This commit is contained in:
Shane Harvey 2021-08-05 17:58:15 -07:00 committed by GitHub
parent a28b05bf24
commit 69dee51b90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 632 additions and 1395 deletions

View File

@ -67,6 +67,4 @@
.. automethod:: options
.. automethod:: map_reduce
.. automethod:: inline_map_reduce
.. automethod:: initialize_unordered_bulk_op
.. automethod:: initialize_ordered_bulk_op
.. automethod:: count

View File

@ -45,6 +45,10 @@ Breaking Changes in 4.0
- Removed :meth:`pymongo.collection.Collection.update`.
- Removed :meth:`pymongo.collection.Collection.remove`.
- Removed :meth:`pymongo.collection.Collection.find_and_modify`.
- Removed :meth:`pymongo.collection.Collection.initialize_ordered_bulk_op`,
:meth:`pymongo.collection.Collection.initialize_unordered_bulk_op`, and
:class:`pymongo.bulk.BulkOperationBuilder`. Use
:meth:`pymongo.collection.Collection.bulk_write` instead.
- Removed :meth:`pymongo.collection.Collection.group`.
- Removed the ``useCursor`` option for
:meth:`~pymongo.collection.Collection.aggregate`.

View File

@ -315,6 +315,51 @@ Can be changed to this::
replaced_doc = collection.find_one_and_replace({'b': 1}, {'c': 1})
deleted_doc = collection.find_one_and_delete({'c': 1})
Collection.initialize_ordered_bulk_op and initialize_unordered_bulk_op is removed
.................................................................................
Removed :meth:`pymongo.collection.Collection.initialize_ordered_bulk_op`
and :class:`pymongo.bulk.BulkOperationBuilder`. Use
:meth:`pymongo.collection.Collection.bulk_write` instead. Code like this::
batch = coll.initialize_ordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 1}).update_one({'$set': {'b': 1}})
batch.find({'a': 2}).upsert().replace_one({'b': 2})
batch.find({'a': 3}).remove()
result = batch.execute()
Can be changed to this::
coll.bulk_write([
InsertOne({'a': 1}),
UpdateOne({'a': 1}, {'$set': {'b': 1}}),
ReplaceOne({'a': 2}, {'b': 2}, upsert=True),
DeleteOne({'a': 3}),
])
Collection.initialize_unordered_bulk_op is removed
..................................................
Removed :meth:`pymongo.collection.Collection.initialize_unordered_bulk_op`.
Use :meth:`pymongo.collection.Collection.bulk_write` instead. Code like this::
batch = coll.initialize_unordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 1}).update_one({'$set': {'b': 1}})
batch.find({'a': 2}).upsert().replace_one({'b': 2})
batch.find({'a': 3}).remove()
result = batch.execute()
Can be changed to this::
coll.bulk_write([
InsertOne({'a': 1}),
UpdateOne({'a': 1}, {'$set': {'b': 1}}),
ReplaceOne({'a': 2}, {'b': 2}, upsert=True),
DeleteOne({'a': 3}),
], ordered=False)
Collection.group is removed
...........................

View File

@ -521,183 +521,3 @@ class _Bulk(object):
self.execute_no_results(sock_info, generator)
else:
return self.execute_command(generator, write_concern, session)
class BulkUpsertOperation(object):
"""An interface for adding upsert operations.
"""
__slots__ = ('__selector', '__bulk', '__collation')
def __init__(self, selector, bulk, collation):
self.__selector = selector
self.__bulk = bulk
self.__collation = collation
def update_one(self, update):
"""Update one document matching the selector.
:Parameters:
- `update` (dict): the update operations to apply
"""
self.__bulk.add_update(self.__selector,
update, multi=False, upsert=True,
collation=self.__collation)
def update(self, update):
"""Update all documents matching the selector.
:Parameters:
- `update` (dict): the update operations to apply
"""
self.__bulk.add_update(self.__selector,
update, multi=True, upsert=True,
collation=self.__collation)
def replace_one(self, replacement):
"""Replace one entire document matching the selector criteria.
:Parameters:
- `replacement` (dict): the replacement document
"""
self.__bulk.add_replace(self.__selector, replacement, upsert=True,
collation=self.__collation)
class BulkWriteOperation(object):
"""An interface for adding update or remove operations.
"""
__slots__ = ('__selector', '__bulk', '__collation')
def __init__(self, selector, bulk, collation):
self.__selector = selector
self.__bulk = bulk
self.__collation = collation
def update_one(self, update):
"""Update one document matching the selector criteria.
:Parameters:
- `update` (dict): the update operations to apply
"""
self.__bulk.add_update(self.__selector, update, multi=False,
collation=self.__collation)
def update(self, update):
"""Update all documents matching the selector criteria.
:Parameters:
- `update` (dict): the update operations to apply
"""
self.__bulk.add_update(self.__selector, update, multi=True,
collation=self.__collation)
def replace_one(self, replacement):
"""Replace one entire document matching the selector criteria.
:Parameters:
- `replacement` (dict): the replacement document
"""
self.__bulk.add_replace(self.__selector, replacement,
collation=self.__collation)
def remove_one(self):
"""Remove a single document matching the selector criteria.
"""
self.__bulk.add_delete(self.__selector, _DELETE_ONE,
collation=self.__collation)
def remove(self):
"""Remove all documents matching the selector criteria.
"""
self.__bulk.add_delete(self.__selector, _DELETE_ALL,
collation=self.__collation)
def upsert(self):
"""Specify that all chained update operations should be
upserts.
:Returns:
- A :class:`BulkUpsertOperation` instance, used to add
update operations to this bulk operation.
"""
return BulkUpsertOperation(self.__selector, self.__bulk,
self.__collation)
class BulkOperationBuilder(object):
"""**DEPRECATED**: An interface for executing a batch of write operations.
"""
__slots__ = '__bulk'
def __init__(self, collection, ordered=True,
bypass_document_validation=False):
"""**DEPRECATED**: Initialize a new BulkOperationBuilder instance.
:Parameters:
- `collection`: A :class:`~pymongo.collection.Collection` instance.
- `ordered` (optional): If ``True`` all operations will be executed
serially, in the order provided, and the entire execution will
abort on the first error. If ``False`` operations will be executed
in arbitrary order (possibly in parallel on the server), reporting
any errors that occurred after attempting all operations. Defaults
to ``True``.
- `bypass_document_validation`: (optional) If ``True``, allows the
write to opt-out of document level validation. Default is
``False``.
.. note:: `bypass_document_validation` requires server version
**>= 3.2**
.. versionchanged:: 3.5
Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write`
instead.
.. versionchanged:: 3.2
Added bypass_document_validation support
"""
self.__bulk = _Bulk(collection, ordered, bypass_document_validation)
def find(self, selector, collation=None):
"""Specify selection criteria for bulk operations.
:Parameters:
- `selector` (dict): the selection criteria for update
and remove operations.
- `collation` (optional): An instance of
:class:`~pymongo.collation.Collation`. This option is only
supported on MongoDB 3.4 and above.
:Returns:
- A :class:`BulkWriteOperation` instance, used to add
update and remove operations to this bulk operation.
.. versionchanged:: 3.4
Added the `collation` option.
"""
validate_is_mapping("selector", selector)
return BulkWriteOperation(selector, self.__bulk, collation)
def insert(self, document):
"""Insert a single document.
:Parameters:
- `document` (dict): the document to insert
.. seealso:: :ref:`writes-and-ids`
"""
self.__bulk.add_insert(document)
def execute(self, write_concern=None):
"""Execute all provided operations.
:Parameters:
- write_concern (optional): the write concern for this bulk
execution.
"""
if write_concern is not None:
write_concern = WriteConcern(**write_concern)
return self.__bulk.execute(write_concern, session=None)

View File

@ -29,7 +29,7 @@ from pymongo import (common,
message)
from pymongo.aggregation import (_CollectionAggregationCommand,
_CollectionRawAggregationCommand)
from pymongo.bulk import BulkOperationBuilder, _Bulk
from pymongo.bulk import _Bulk
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
from pymongo.collation import validate_collation_or_none
from pymongo.change_stream import CollectionChangeStream
@ -358,69 +358,6 @@ class Collection(common.BaseObject):
write_concern or self.write_concern,
read_concern or self.read_concern)
def initialize_unordered_bulk_op(self, bypass_document_validation=False):
"""**DEPRECATED** - Initialize an unordered batch of write operations.
Operations will be performed on the server in arbitrary order,
possibly in parallel. All operations will be attempted.
:Parameters:
- `bypass_document_validation`: (optional) If ``True``, allows the
write to opt-out of document level validation. Default is
``False``.
Returns a :class:`~pymongo.bulk.BulkOperationBuilder` instance.
See :ref:`unordered_bulk` for examples.
.. note:: `bypass_document_validation` requires server version
**>= 3.2**
.. versionchanged:: 3.5
Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write`
instead.
.. versionchanged:: 3.2
Added bypass_document_validation support
.. versionadded:: 2.7
"""
warnings.warn("initialize_unordered_bulk_op is deprecated",
DeprecationWarning, stacklevel=2)
return BulkOperationBuilder(self, False, bypass_document_validation)
def initialize_ordered_bulk_op(self, bypass_document_validation=False):
"""**DEPRECATED** - Initialize an ordered batch of write operations.
Operations will be performed on the server serially, in the
order provided. If an error occurs all remaining operations
are aborted.
:Parameters:
- `bypass_document_validation`: (optional) If ``True``, allows the
write to opt-out of document level validation. Default is
``False``.
Returns a :class:`~pymongo.bulk.BulkOperationBuilder` instance.
See :ref:`ordered_bulk` for examples.
.. note:: `bypass_document_validation` requires server version
**>= 3.2**
.. versionchanged:: 3.5
Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write`
instead.
.. versionchanged:: 3.2
Added bypass_document_validation support
.. versionadded:: 2.7
"""
warnings.warn("initialize_ordered_bulk_op is deprecated",
DeprecationWarning, stacklevel=2)
return BulkOperationBuilder(self, True, bypass_document_validation)
def bulk_write(self, requests, ordered=True,
bypass_document_validation=False, session=None):
"""Send a batch of write operations to the server.

View File

@ -19,16 +19,20 @@ import sys
sys.path[0:0] = [""]
from bson.objectid import ObjectId
from pymongo.operations import *
from pymongo.errors import (ConfigurationError,
from pymongo.common import partition_node
from pymongo.errors import (BulkWriteError,
ConfigurationError,
InvalidOperation,
OperationFailure)
from pymongo.operations import *
from pymongo.write_concern import WriteConcern
from test import (client_context,
unittest,
IntegrationTest)
from test.utils import (remove_all_users,
rs_or_single_client_noauth)
rs_or_single_client_noauth,
single_client,
wait_until)
class BulkTestBase(IntegrationTest):
@ -37,8 +41,7 @@ class BulkTestBase(IntegrationTest):
def setUpClass(cls):
super(BulkTestBase, cls).setUpClass()
cls.coll = cls.db.test
ismaster = client_context.client.admin.command('ismaster')
cls.has_write_commands = (ismaster.get("maxWireVersion", 0) > 1)
cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0))
def setUp(self):
super(BulkTestBase, self).setUp()
@ -48,11 +51,7 @@ class BulkTestBase(IntegrationTest):
"""Compare response from bulk.execute() to expected response."""
for key, value in expected.items():
if key == 'nModified':
if self.has_write_commands:
self.assertEqual(value, actual['nModified'])
else:
# Legacy servers don't include nModified in the response.
self.assertFalse('nModified' in actual)
self.assertEqual(value, actual['nModified'])
elif key == 'upserted':
expected_upserts = value
actual_upserts = actual['upserted']
@ -182,7 +181,7 @@ class TestBulk(BulkTestBase):
self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={})
def test_array_filters_unacknowledged(self):
coll = self.coll.with_options(write_concern=WriteConcern(w=0))
coll = self.coll_w0
update_one = UpdateOne(
{}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
update_many = UpdateMany(
@ -338,9 +337,7 @@ class TestBulk(BulkTestBase):
self.assertEqual(5, len(result.inserted_ids))
def test_bulk_write_no_results(self):
coll = self.coll.with_options(write_concern=WriteConcern(w=0))
result = coll.bulk_write([InsertOne({})])
result = self.coll_w0.bulk_write([InsertOne({})])
self.assertFalse(result.acknowledged)
self.assertRaises(InvalidOperation, lambda: result.inserted_count)
self.assertRaises(InvalidOperation, lambda: result.matched_count)
@ -359,6 +356,294 @@ class TestBulk(BulkTestBase):
with self.assertRaises(TypeError):
self.coll.bulk_write([{}])
def test_upsert_large(self):
big = 'a' * (client_context.client.max_bson_size - 37)
result = self.coll.bulk_write([
UpdateOne({'x': 1}, {'$set': {'s': big}}, upsert=True)])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 1,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': '...'}]},
result.bulk_api_result)
self.assertEqual(1, self.coll.count_documents({'x': 1}))
def test_client_generated_upsert_id(self):
result = self.coll.bulk_write([
UpdateOne({'_id': 0}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': 1}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': 2}, {'_id': 2}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': 0},
{'index': 1, '_id': 1},
{'index': 2, '_id': 2}]},
result.bulk_api_result)
def test_single_ordered_batch(self):
result = self.coll.bulk_write([
InsertOne({'a': 1}),
UpdateOne({'a': 1}, {'$set': {'b': 1}}),
UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True),
InsertOne({'a': 3}),
DeleteOne({'a': 3}),
])
self.assertEqualResponse(
{'nMatched': 1,
'nModified': 1,
'nUpserted': 1,
'nInserted': 2,
'nRemoved': 1,
'upserted': [{'index': 2, '_id': '...'}]},
result.bulk_api_result)
def test_single_error_ordered_batch(self):
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
requests = [
InsertOne({'b': 1, 'a': 1}),
UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
InsertOne({'b': 3, 'a': 2}),
]
try:
self.coll.bulk_write(requests)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 1,
'nRemoved': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [
{'index': 1,
'code': 11000,
'errmsg': '...',
'op': {'q': {'b': 2},
'u': {'$set': {'a': 1}},
'multi': False,
'upsert': True}}]},
result)
def test_multiple_error_ordered_batch(self):
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
requests = [
InsertOne({'b': 1, 'a': 1}),
UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
UpdateOne({'b': 3}, {'$set': {'a': 2}}, upsert=True),
UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
InsertOne({'b': 4, 'a': 3}),
InsertOne({'b': 5, 'a': 1}),
]
try:
self.coll.bulk_write(requests)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 1,
'nRemoved': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [
{'index': 1,
'code': 11000,
'errmsg': '...',
'op': {'q': {'b': 2},
'u': {'$set': {'a': 1}},
'multi': False,
'upsert': True}}]},
result)
def test_single_unordered_batch(self):
requests = [
InsertOne({'a': 1}),
UpdateOne({'a': 1}, {'$set': {'b': 1}}),
UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True),
InsertOne({'a': 3}),
DeleteOne({'a': 3}),
]
result = self.coll.bulk_write(requests, ordered=False)
self.assertEqualResponse(
{'nMatched': 1,
'nModified': 1,
'nUpserted': 1,
'nInserted': 2,
'nRemoved': 1,
'upserted': [{'index': 2, '_id': '...'}],
'writeErrors': [],
'writeConcernErrors': []},
result.bulk_api_result)
def test_single_error_unordered_batch(self):
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
requests = [
InsertOne({'b': 1, 'a': 1}),
UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
InsertOne({'b': 3, 'a': 2}),
]
try:
self.coll.bulk_write(requests, ordered=False)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 2,
'nRemoved': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [
{'index': 1,
'code': 11000,
'errmsg': '...',
'op': {'q': {'b': 2},
'u': {'$set': {'a': 1}},
'multi': False,
'upsert': True}}]},
result)
def test_multiple_error_unordered_batch(self):
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
requests = [
InsertOne({'b': 1, 'a': 1}),
UpdateOne({'b': 2}, {'$set': {'a': 3}}, upsert=True),
UpdateOne({'b': 3}, {'$set': {'a': 4}}, upsert=True),
UpdateOne({'b': 4}, {'$set': {'a': 3}}, upsert=True),
InsertOne({'b': 5, 'a': 2}),
InsertOne({'b': 6, 'a': 1}),
]
try:
self.coll.bulk_write(requests, ordered=False)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
# Assume the update at index 1 runs before the update at index 3,
# although the spec does not require it. Same for inserts.
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 2,
'nInserted': 2,
'nRemoved': 0,
'upserted': [
{'index': 1, '_id': '...'},
{'index': 2, '_id': '...'}],
'writeConcernErrors': [],
'writeErrors': [
{'index': 3,
'code': 11000,
'errmsg': '...',
'op': {'q': {'b': 4},
'u': {'$set': {'a': 3}},
'multi': False,
'upsert': True}},
{'index': 5,
'code': 11000,
'errmsg': '...',
'op': {'_id': '...', 'b': 6, 'a': 1}}]},
result)
def test_large_inserts_ordered(self):
big = 'x' * self.coll.database.client.max_bson_size
requests = [
InsertOne({'b': 1, 'a': 1}),
InsertOne({'big': big}),
InsertOne({'b': 2, 'a': 2}),
]
try:
self.coll.bulk_write(requests)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(1, result['nInserted'])
self.coll.delete_many({})
big = 'x' * (1024 * 1024 * 4)
result = self.coll.bulk_write([
InsertOne({'a': 1, 'big': big}),
InsertOne({'a': 2, 'big': big}),
InsertOne({'a': 3, 'big': big}),
InsertOne({'a': 4, 'big': big}),
InsertOne({'a': 5, 'big': big}),
InsertOne({'a': 6, 'big': big}),
])
self.assertEqual(6, result.inserted_count)
self.assertEqual(6, self.coll.count_documents({}))
def test_large_inserts_unordered(self):
big = 'x' * self.coll.database.client.max_bson_size
requests = [
InsertOne({'b': 1, 'a': 1}),
InsertOne({'big': big}),
InsertOne({'b': 2, 'a': 2}),
]
try:
self.coll.bulk_write(requests, ordered=False)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(2, result['nInserted'])
self.coll.delete_many({})
big = 'x' * (1024 * 1024 * 4)
result = self.coll.bulk_write([
InsertOne({'a': 1, 'big': big}),
InsertOne({'a': 2, 'big': big}),
InsertOne({'a': 3, 'big': big}),
InsertOne({'a': 4, 'big': big}),
InsertOne({'a': 5, 'big': big}),
InsertOne({'a': 6, 'big': big}),
], ordered=False)
self.assertEqual(6, result.inserted_count)
self.assertEqual(6, self.coll.count_documents({}))
class BulkAuthorizationTestBase(BulkTestBase):
@ -387,6 +672,73 @@ class BulkAuthorizationTestBase(BulkTestBase):
remove_all_users(self.db)
class TestBulkUnacknowledged(BulkTestBase):
def tearDown(self):
self.coll.delete_many({})
def test_no_results_ordered_success(self):
requests = [
InsertOne({'a': 1}),
UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True),
InsertOne({'a': 2}),
DeleteOne({'a': 1}),
]
result = self.coll_w0.bulk_write(requests)
self.assertFalse(result.acknowledged)
wait_until(lambda: 2 == self.coll.count_documents({}),
'insert 2 documents')
wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
'removed {"_id": 1}')
def test_no_results_ordered_failure(self):
requests = [
InsertOne({'_id': 1}),
UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True),
InsertOne({'_id': 2}),
# Fails with duplicate key error.
InsertOne({'_id': 1}),
# Should not be executed since the batch is ordered.
DeleteOne({'_id': 1}),
]
result = self.coll_w0.bulk_write(requests)
self.assertFalse(result.acknowledged)
wait_until(lambda: 3 == self.coll.count_documents({}),
'insert 3 documents')
self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1}))
def test_no_results_unordered_success(self):
requests = [
InsertOne({'a': 1}),
UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True),
InsertOne({'a': 2}),
DeleteOne({'a': 1}),
]
result = self.coll_w0.bulk_write(requests, ordered=False)
self.assertFalse(result.acknowledged)
wait_until(lambda: 2 == self.coll.count_documents({}),
'insert 2 documents')
wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
'removed {"_id": 1}')
def test_no_results_unordered_failure(self):
requests = [
InsertOne({'_id': 1}),
UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True),
InsertOne({'_id': 2}),
# Fails with duplicate key error.
InsertOne({'_id': 1}),
# Should be executed since the batch is unordered.
DeleteOne({'_id': 1}),
]
result = self.coll_w0.bulk_write(requests, ordered=False)
self.assertFalse(result.acknowledged)
wait_until(lambda: 2 == self.coll.count_documents({}),
'insert 2 documents')
wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
'removed {"_id": 1}')
class TestBulkAuthorization(BulkAuthorizationTestBase):
def test_readonly(self):
@ -415,5 +767,205 @@ class TestBulkAuthorization(BulkAuthorizationTestBase):
self.assertRaises(OperationFailure, coll.bulk_write, requests)
self.assertEqual(set([1, 2]), set(self.coll.distinct('x')))
class TestBulkWriteConcern(BulkTestBase):
@classmethod
def setUpClass(cls):
super(TestBulkWriteConcern, cls).setUpClass()
cls.w = client_context.w
cls.secondary = None
if cls.w > 1:
for member in client_context.ismaster['hosts']:
if member != client_context.ismaster['primary']:
cls.secondary = single_client(*partition_node(member))
break
# We tested wtimeout errors by specifying a write concern greater than
# the number of members, but in MongoDB 2.7.8+ this causes a different
# sort of error, "Not enough data-bearing nodes". In recent servers we
# use a failpoint to pause replication on a secondary.
cls.need_replication_stopped = client_context.version.at_least(2, 7, 8)
@classmethod
def tearDownClass(cls):
if cls.secondary:
cls.secondary.close()
def cause_wtimeout(self, requests, ordered):
if self.need_replication_stopped:
if not client_context.test_commands_enabled:
self.skipTest("Test commands must be enabled.")
self.secondary.admin.command('configureFailPoint',
'rsSyncApplyStop',
mode='alwaysOn')
try:
coll = self.coll.with_options(
write_concern=WriteConcern(w=self.w, wtimeout=1))
return coll.bulk_write(requests, ordered=ordered)
finally:
self.secondary.admin.command('configureFailPoint',
'rsSyncApplyStop',
mode='off')
else:
coll = self.coll.with_options(
write_concern=WriteConcern(w=self.w + 1, wtimeout=1))
return coll.bulk_write(requests, ordered=ordered)
@client_context.require_replica_set
@client_context.require_secondaries_count(1)
def test_write_concern_failure_ordered(self):
# Ensure we don't raise on wnote.
coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w))
result = coll_ww.bulk_write([
DeleteOne({"something": "that does no exist"})])
self.assertTrue(result.acknowledged)
requests = [
InsertOne({'a': 1}),
InsertOne({'a': 2})
]
# Replication wtimeout is a 'soft' error.
# It shouldn't stop batch processing.
try:
self.cause_wtimeout(requests, ordered=True)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 2,
'nRemoved': 0,
'upserted': [],
'writeErrors': []},
result)
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 0)
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], str))
self.coll.delete_many({})
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
# Fail due to write concern support as well
# as duplicate key error on ordered batch.
requests = [
InsertOne({'a': 1}),
ReplaceOne({'a': 3}, {'b': 1}, upsert=True),
InsertOne({'a': 1}),
InsertOne({'a': 2}),
]
try:
self.cause_wtimeout(requests, ordered=True)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 1,
'nInserted': 1,
'nRemoved': 0,
'upserted': [{'index': 1, '_id': '...'}],
'writeErrors': [
{'index': 2,
'code': 11000,
'errmsg': '...',
'op': {'_id': '...', 'a': 1}}]},
result)
self.assertTrue(len(result['writeConcernErrors']) > 1)
failed = result['writeErrors'][0]
self.assertTrue("duplicate" in failed['errmsg'])
@client_context.require_replica_set
@client_context.require_secondaries_count(1)
def test_write_concern_failure_unordered(self):
# Ensure we don't raise on wnote.
coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w))
result = coll_ww.bulk_write([
DeleteOne({"something": "that does no exist"})], ordered=False)
self.assertTrue(result.acknowledged)
requests = [
InsertOne({'a': 1}),
UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True),
InsertOne({'a': 2}),
]
# Replication wtimeout is a 'soft' error.
# It shouldn't stop batch processing.
try:
self.cause_wtimeout(requests, ordered=False)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(2, result['nInserted'])
self.assertEqual(1, result['nUpserted'])
self.assertEqual(0, len(result['writeErrors']))
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 1)
self.coll.delete_many({})
self.coll.create_index('a', unique=True)
self.addCleanup(self.coll.drop_index, [('a', 1)])
# Fail due to write concern support as well
# as duplicate key error on unordered batch.
requests = [
InsertOne({'a': 1}),
UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True),
InsertOne({'a': 1}),
InsertOne({'a': 2}),
]
try:
self.cause_wtimeout(requests, ordered=False)
except BulkWriteError as exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(2, result['nInserted'])
self.assertEqual(1, result['nUpserted'])
self.assertEqual(1, len(result['writeErrors']))
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 1)
failed = result['writeErrors'][0]
self.assertEqual(2, failed['index'])
self.assertEqual(11000, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], str))
self.assertEqual(1, failed['op']['a'])
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], str))
upserts = result['upserted']
self.assertEqual(1, len(upserts))
self.assertEqual(1, upserts[0]['index'])
self.assertTrue(upserts[0].get('_id'))
if __name__ == "__main__":
unittest.main()

View File

@ -282,49 +282,6 @@ class TestCollation(IntegrationTest):
check_ops(delete_cmd['deletes'])
check_ops(update_cmd['updates'])
@raisesConfigurationErrorForOldMongoDB
def test_bulk(self):
bulk = self.db.test.initialize_ordered_bulk_op()
bulk.find({'noCollation': 42}).remove_one()
bulk.find({'noCollation': 42}).remove()
bulk.find({'foo': 42}, collation=self.collation).remove_one()
bulk.find({'foo': 42}, collation=self.collation).remove()
bulk.find({'noCollation': 24}).replace_one({'bar': 42})
bulk.find({'noCollation': 84}).upsert().update_one(
{'$set': {'foo': 10}})
bulk.find({'noCollation': 45}).update({'$set': {'bar': 42}})
bulk.find({'foo': 24}, collation=self.collation).replace_one(
{'foo': 42})
bulk.find({'foo': 84}, collation=self.collation).upsert().update_one(
{'$set': {'foo': 10}})
bulk.find({'foo': 45}, collation=self.collation).update({
'$set': {'foo': 42}})
bulk.execute()
delete_cmd = self.listener.results['started'][0].command
update_cmd = self.listener.results['started'][1].command
def check_ops(ops):
for op in ops:
if 'noCollation' in op['q']:
self.assertNotIn('collation', op)
else:
self.assertEqual(self.collation.document,
op['collation'])
check_ops(delete_cmd['deletes'])
check_ops(update_cmd['updates'])
@client_context.require_version_max(3, 3, 8)
def test_mixed_bulk_collation(self):
bulk = self.db.test.initialize_unordered_bulk_op()
bulk.find({'foo': 42}).upsert().update_one(
{'$set': {'bar': 10}})
bulk.find({'foo': 43}, collation=self.collation).remove_one()
with self.assertRaises(ConfigurationError):
bulk.execute()
self.assertIsNone(self.db.test.find_one({'foo': 42}))
@raisesConfigurationErrorForOldMongoDB
def test_indexes_same_keys_different_collations(self):
self.db.test.drop()
@ -356,11 +313,6 @@ class TestCollation(IntegrationTest):
collection.update_one(
{'hello': 'world'}, {'$set': {'hello': 'moon'}},
collation=self.collation)
bulk = collection.initialize_ordered_bulk_op()
bulk.find({'hello': 'world'}, collation=self.collation).update_one(
{'$set': {'hello': 'moon'}})
with self.assertRaises(ConfigurationError):
bulk.execute()
update_one = UpdateOne({'hello': 'world'}, {'$set': {'hello': 'moon'}},
collation=self.collation)
with self.assertRaises(ConfigurationError):

View File

@ -789,6 +789,20 @@ class TestCollection(IntegrationTest):
self.assertFalse(result.acknowledged)
self.assertEqual(20, db.test.count_documents({}))
def test_insert_many_generator(self):
coll = self.db.test
coll.delete_many({})
def gen():
yield {'a': 1, 'b': 1}
yield {'a': 1, 'b': 2}
yield {'a': 2, 'b': 3}
yield {'a': 3, 'b': 5}
yield {'a': 5, 'b': 8}
result = coll.insert_many(gen())
self.assertEqual(5, len(result.inserted_ids))
def test_insert_many_invalid(self):
db = self.db

File diff suppressed because it is too large Load Diff

View File

@ -1081,14 +1081,6 @@ class TestClusterTime(IntegrationTest):
self.addCleanup(collection.drop)
self.addCleanup(client.pymongo_test.collection2.drop)
def bulk_insert(ordered):
if ordered:
bulk = collection.initialize_ordered_bulk_op()
else:
bulk = collection.initialize_unordered_bulk_op()
bulk.insert({})
bulk.execute()
def rename_and_drop():
# Ensure collection exists.
collection.insert_one({})
@ -1130,8 +1122,6 @@ class TestClusterTime(IntegrationTest):
('delete_one', lambda: collection.delete_one({})),
('delete_many', lambda: collection.delete_many({})),
('bulk_write', lambda: collection.bulk_write([InsertOne({})])),
('ordered bulk', lambda: bulk_insert(True)),
('unordered bulk', lambda: bulk_insert(False)),
('rename_and_drop', rename_and_drop),
]