Use the hello command instead:
doc = client.admin.command('hello')
max_bson_size = doc['maxBsonObjectSize']
max_message_size = doc['maxMessageSizeBytes']
max_write_batch_size = doc['maxWriteBatchSize']
Also add documentation for TopologyDescription.apply_selector.
950 lines
33 KiB
Python
950 lines
33 KiB
Python
# Copyright 2014-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the bulk API."""
|
|
|
|
import sys
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from bson.objectid import ObjectId
|
|
from pymongo.common import partition_node
|
|
from pymongo.errors import (BulkWriteError,
|
|
ConfigurationError,
|
|
InvalidOperation,
|
|
OperationFailure)
|
|
from pymongo.operations import *
|
|
from pymongo.write_concern import WriteConcern
|
|
from test import (client_context,
|
|
unittest,
|
|
IntegrationTest)
|
|
from test.utils import (remove_all_users,
|
|
rs_or_single_client_noauth,
|
|
single_client,
|
|
wait_until)
|
|
|
|
|
|
class BulkTestBase(IntegrationTest):
    """Shared fixtures and response-comparison helpers for bulk tests."""

    @classmethod
    def setUpClass(cls):
        """Bind the ``test`` collection plus a w=0 variant of it.

        ``cls.db`` is presumably provided by ``IntegrationTest.setUpClass``
        -- it is not defined in this file.
        """
        super(BulkTestBase, cls).setUpClass()
        cls.coll = cls.db.test
        cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0))

    def setUp(self):
        """Drop the test collection so each test starts from scratch."""
        super(BulkTestBase, self).setUp()
        self.coll.drop()

    def _assertEqualEntries(self, key, expected, actual, compare):
        """Check two lists entry by entry with ``compare``.

        The lengths are asserted first because ``zip`` would otherwise
        silently ignore surplus entries on either side.
        """
        self.assertEqual(
            len(expected), len(actual),
            'Expected %d elements in "%s", got %d' % (
                len(expected), key, len(actual)))

        for e, a in zip(expected, actual):
            compare(e, a)

    def assertEqualResponse(self, expected, actual):
        """Compare response from bulk.execute() to expected response.

        Only the keys present in ``expected`` are checked. ``'upserted'``
        and ``'writeErrors'`` are compared entry by entry (allowing the
        ``'...'`` placeholder, see the helpers below); every other key is
        compared for plain equality.
        """
        for key, value in expected.items():
            if key == 'nModified':
                self.assertEqual(value, actual['nModified'])
            elif key == 'upserted':
                self._assertEqualEntries(
                    key, value, actual['upserted'], self.assertEqualUpsert)
            elif key == 'writeErrors':
                self._assertEqualEntries(
                    key, value, actual['writeErrors'],
                    self.assertEqualWriteError)
            else:
                self.assertEqual(
                    actual.get(key), value,
                    '%r value of %r does not match expected %r' %
                    (key, actual.get(key), value))

    def assertEqualUpsert(self, expected, actual):
        """Compare one entry of bulk.execute()['upserted'] to expected.

        Like: {'index': 0, '_id': ObjectId()}

        An expected ``'_id'`` of ``'...'`` matches any value, as long as
        the key is present in ``actual``.
        """
        self.assertEqual(expected['index'], actual['index'])
        if expected['_id'] == '...':
            # Unspecified value.
            self.assertIn('_id', actual)
        else:
            self.assertEqual(expected['_id'], actual['_id'])

    def assertEqualWriteError(self, expected, actual):
        """Compare one entry of bulk.execute()['writeErrors'] to expected.

        Like: {'index': 0, 'code': 123, 'errmsg': '...', 'op': { ... }}

        An expected ``'errmsg'`` or ``op['_id']`` of ``'...'`` matches any
        value, as long as the key is present in ``actual``.
        """
        self.assertEqual(expected['index'], actual['index'])
        self.assertEqual(expected['code'], actual['code'])
        if expected['errmsg'] == '...':
            # Unspecified value.
            self.assertIn('errmsg', actual)
        else:
            self.assertEqual(expected['errmsg'], actual['errmsg'])

        # Work on copies so the caller's dicts are not mutated by pop().
        expected_op = expected['op'].copy()
        actual_op = actual['op'].copy()
        if expected_op.get('_id') == '...':
            # Unspecified _id.
            self.assertIn('_id', actual_op)
            actual_op.pop('_id')
            expected_op.pop('_id')

        self.assertEqual(expected_op, actual_op)


class TestBulk(BulkTestBase):
    """Tests for acknowledged ``bulk_write`` and ``insert_many`` calls."""

    def _failed_bulk_write(self, requests, **kwargs):
        """Run ``bulk_write`` expecting a BulkWriteError.

        Asserts that the error is raised with code 65 and returns the
        exception's ``details`` document for further checks.
        """
        with self.assertRaises(BulkWriteError) as ctx:
            self.coll.bulk_write(requests, **kwargs)
        self.assertEqual(ctx.exception.code, 65)
        return ctx.exception.details

    def test_empty(self):
        self.assertRaises(InvalidOperation, self.coll.bulk_write, [])

    def test_insert(self):
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 1,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        result = self.coll.bulk_write([InsertOne({})])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(1, result.inserted_count)
        self.assertEqual(1, self.coll.count_documents({}))

    def _test_update_many(self, update):
        """Apply ``update`` to two empty documents via UpdateMany."""
        expected = {
            'nMatched': 2,
            'nModified': 2,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }
        self.coll.insert_many([{}, {}])

        result = self.coll.bulk_write([UpdateMany({}, update)])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(2, result.matched_count)
        self.assertIn(result.modified_count, (2, None))

    def test_update_many(self):
        self._test_update_many({'$set': {'foo': 'bar'}})

    @client_context.require_version_min(4, 1, 11)
    def test_update_many_pipeline(self):
        self._test_update_many([{'$set': {'foo': 'bar'}}])

    def test_array_filters_validation(self):
        self.assertRaises(TypeError, UpdateMany, {}, {}, array_filters={})
        self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={})

    def test_array_filters_unacknowledged(self):
        coll = self.coll_w0
        update_one = UpdateOne(
            {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
        update_many = UpdateMany(
            {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}])
        self.assertRaises(ConfigurationError, coll.bulk_write, [update_one])
        self.assertRaises(ConfigurationError, coll.bulk_write, [update_many])

    def _test_update_one(self, update):
        """Apply ``update`` via UpdateOne with two candidate documents."""
        expected = {
            'nMatched': 1,
            'nModified': 1,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        self.coll.insert_many([{}, {}])

        result = self.coll.bulk_write([UpdateOne({}, update)])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (1, None))

    def test_update_one(self):
        self._test_update_one({'$set': {'foo': 'bar'}})

    @client_context.require_version_min(4, 1, 11)
    def test_update_one_pipeline(self):
        self._test_update_one([{'$set': {'foo': 'bar'}}])

    def test_replace_one(self):
        expected = {
            'nMatched': 1,
            'nModified': 1,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        self.coll.insert_many([{}, {}])

        result = self.coll.bulk_write([ReplaceOne({}, {'foo': 'bar'})])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (1, None))

    def test_remove(self):
        # Test removing all documents, ordered.
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 2,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }
        self.coll.insert_many([{}, {}])

        result = self.coll.bulk_write([DeleteMany({})])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(2, result.deleted_count)

    def test_remove_one(self):
        # Test removing one document, empty selector.
        self.coll.insert_many([{}, {}])
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 1,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        result = self.coll.bulk_write([DeleteOne({})])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(1, result.deleted_count)
        self.assertEqual(self.coll.count_documents({}), 1)

    def test_upsert(self):
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 1,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [{'index': 0, '_id': '...'}]
        }

        result = self.coll.bulk_write([ReplaceOne({},
                                                  {'foo': 'bar'},
                                                  upsert=True)])
        self.assertEqualResponse(expected, result.bulk_api_result)
        self.assertEqual(1, result.upserted_count)
        self.assertEqual(1, len(result.upserted_ids))
        self.assertIsInstance(result.upserted_ids.get(0), ObjectId)

        self.assertEqual(self.coll.count_documents({'foo': 'bar'}), 1)

    def test_numerous_inserts(self):
        # Ensure we don't exceed server's maxWriteBatchSize size limit.
        n_docs = client_context.max_write_batch_size + 100
        requests = [InsertOne({}) for _ in range(n_docs)]
        result = self.coll.bulk_write(requests, ordered=False)
        self.assertEqual(n_docs, result.inserted_count)
        self.assertEqual(n_docs, self.coll.count_documents({}))

        # Same with ordered bulk.
        self.coll.drop()
        result = self.coll.bulk_write(requests)
        self.assertEqual(n_docs, result.inserted_count)
        self.assertEqual(n_docs, self.coll.count_documents({}))

    def test_bulk_max_message_size(self):
        self.coll.delete_many({})
        self.addCleanup(self.coll.delete_many, {})
        _16_MB = 16 * 1000 * 1000
        # Generate a list of documents such that the first batched OP_MSG is
        # as close as possible to the 48MB limit.
        docs = [
            {'_id': 1, 'l': 's' * _16_MB},
            {'_id': 2, 'l': 's' * _16_MB},
            {'_id': 3, 'l': 's' * (_16_MB - 10000)},
        ]
        # Fill in the remaining ~10000 bytes with small documents.
        docs.extend({'_id': i} for i in range(4, 10000))
        result = self.coll.insert_many(docs)
        self.assertEqual(len(docs), len(result.inserted_ids))

    def test_generator_insert(self):
        def gen():
            yield {'a': 1, 'b': 1}
            yield {'a': 1, 'b': 2}
            yield {'a': 2, 'b': 3}
            yield {'a': 3, 'b': 5}
            yield {'a': 5, 'b': 8}

        result = self.coll.insert_many(gen())
        self.assertEqual(5, len(result.inserted_ids))

    def test_bulk_write_no_results(self):
        result = self.coll_w0.bulk_write([InsertOne({})])
        self.assertFalse(result.acknowledged)
        # Every result accessor must raise on an unacknowledged result.
        for attr in ('inserted_count', 'matched_count', 'modified_count',
                     'deleted_count', 'upserted_count', 'upserted_ids'):
            self.assertRaises(InvalidOperation, getattr, result, attr)

    def test_bulk_write_invalid_arguments(self):
        # The requests argument must be a list.
        generator = (InsertOne({}) for _ in range(10))
        with self.assertRaises(TypeError):
            self.coll.bulk_write(generator)

        # Document is not wrapped in a bulk write operation.
        with self.assertRaises(TypeError):
            self.coll.bulk_write([{}])

    def test_upsert_large(self):
        big = 'a' * (client_context.max_bson_size - 37)
        result = self.coll.bulk_write([
            UpdateOne({'x': 1}, {'$set': {'s': big}}, upsert=True)])
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 1,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [{'index': 0, '_id': '...'}]},
            result.bulk_api_result)

        self.assertEqual(1, self.coll.count_documents({'x': 1}))

    def test_client_generated_upsert_id(self):
        result = self.coll.bulk_write([
            UpdateOne({'_id': 0}, {'$set': {'a': 0}}, upsert=True),
            ReplaceOne({'a': 1}, {'_id': 1}, upsert=True),
            # This is just here to make the counts right in all cases.
            ReplaceOne({'_id': 2}, {'_id': 2}, upsert=True),
        ])
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 3,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [{'index': 0, '_id': 0},
                          {'index': 1, '_id': 1},
                          {'index': 2, '_id': 2}]},
            result.bulk_api_result)

    def test_single_ordered_batch(self):
        result = self.coll.bulk_write([
            InsertOne({'a': 1}),
            UpdateOne({'a': 1}, {'$set': {'b': 1}}),
            UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True),
            InsertOne({'a': 3}),
            DeleteOne({'a': 3}),
        ])
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 1,
             'nInserted': 2,
             'nRemoved': 1,
             'upserted': [{'index': 2, '_id': '...'}]},
            result.bulk_api_result)

    def test_single_error_ordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
            InsertOne({'b': 3, 'a': 2}),
        ]
        result = self._failed_bulk_write(requests)

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_multiple_error_ordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
            UpdateOne({'b': 3}, {'$set': {'a': 2}}, upsert=True),
            UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
            InsertOne({'b': 4, 'a': 3}),
            InsertOne({'b': 5, 'a': 1}),
        ]
        result = self._failed_bulk_write(requests)

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_single_unordered_batch(self):
        requests = [
            InsertOne({'a': 1}),
            UpdateOne({'a': 1}, {'$set': {'b': 1}}),
            UpdateOne({'a': 2}, {'$set': {'b': 2}}, upsert=True),
            InsertOne({'a': 3}),
            DeleteOne({'a': 3}),
        ]
        result = self.coll.bulk_write(requests, ordered=False)
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 1,
             'nInserted': 2,
             'nRemoved': 1,
             'upserted': [{'index': 2, '_id': '...'}],
             'writeErrors': [],
             'writeConcernErrors': []},
            result.bulk_api_result)

    def test_single_error_unordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            UpdateOne({'b': 2}, {'$set': {'a': 1}}, upsert=True),
            InsertOne({'b': 3, 'a': 2}),
        ]
        result = self._failed_bulk_write(requests, ordered=False)

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_multiple_error_unordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            UpdateOne({'b': 2}, {'$set': {'a': 3}}, upsert=True),
            UpdateOne({'b': 3}, {'$set': {'a': 4}}, upsert=True),
            UpdateOne({'b': 4}, {'$set': {'a': 3}}, upsert=True),
            InsertOne({'b': 5, 'a': 2}),
            InsertOne({'b': 6, 'a': 1}),
        ]
        result = self._failed_bulk_write(requests, ordered=False)

        # Assume the update at index 1 runs before the update at index 3,
        # although the spec does not require it. Same for inserts.
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 2,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [
                 {'index': 1, '_id': '...'},
                 {'index': 2, '_id': '...'}],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 3,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 4},
                         'u': {'$set': {'a': 3}},
                         'multi': False,
                         'upsert': True}},
                 {'index': 5,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'_id': '...', 'b': 6, 'a': 1}}]},
            result)

    def test_large_inserts_ordered(self):
        big = 'x' * client_context.max_bson_size
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            InsertOne({'big': big}),
            InsertOne({'b': 2, 'a': 2}),
        ]
        result = self._failed_bulk_write(requests)

        # Ordered: only the insert before the failing one is applied.
        self.assertEqual(1, result['nInserted'])

        self.coll.delete_many({})

        big = 'x' * (1024 * 1024 * 4)
        result = self.coll.bulk_write(
            [InsertOne({'a': i, 'big': big}) for i in range(1, 7)])

        self.assertEqual(6, result.inserted_count)
        self.assertEqual(6, self.coll.count_documents({}))

    def test_large_inserts_unordered(self):
        big = 'x' * client_context.max_bson_size
        requests = [
            InsertOne({'b': 1, 'a': 1}),
            InsertOne({'big': big}),
            InsertOne({'b': 2, 'a': 2}),
        ]
        result = self._failed_bulk_write(requests, ordered=False)

        # Unordered: both inserts around the failing one are applied.
        self.assertEqual(2, result['nInserted'])

        self.coll.delete_many({})

        big = 'x' * (1024 * 1024 * 4)
        result = self.coll.bulk_write(
            [InsertOne({'a': i, 'big': big}) for i in range(1, 7)],
            ordered=False)

        self.assertEqual(6, result.inserted_count)
        self.assertEqual(6, self.coll.count_documents({}))


class BulkAuthorizationTestBase(BulkTestBase):
    """Base class that provisions restricted users for authorization tests.

    Each test gets a ``readonly`` user (built-in ``read`` role) and a
    ``noremove`` user whose custom role allows insert, update and find on
    ``pymongo_test.test``.
    """

    @classmethod
    @client_context.require_auth
    @client_context.require_no_api_version
    def setUpClass(cls):
        super(BulkAuthorizationTestBase, cls).setUpClass()

    def setUp(self):
        """Create the ``readonly`` user, the ``noremove`` role and its user."""
        super(BulkAuthorizationTestBase, self).setUp()
        client_context.create_user(
            self.db.name, 'readonly', 'pw', ['read'])
        self.db.command(
            'createRole', 'noremove',
            privileges=[{
                'actions': ['insert', 'update', 'find'],
                'resource': {'db': 'pymongo_test', 'collection': 'test'}
            }],
            roles=[])

        client_context.create_user(self.db.name, 'noremove', 'pw', ['noremove'])

    def tearDown(self):
        """Drop the custom role and remove every user created in setUp."""
        try:
            self.db.command('dropRole', 'noremove')
        finally:
            # Remove the users even if dropping the role failed, so they
            # cannot leak into later tests.
            remove_all_users(self.db)
            super(BulkAuthorizationTestBase, self).tearDown()


class TestBulkUnacknowledged(BulkTestBase):
    """Tests for ``bulk_write`` with an unacknowledged (w=0) write concern.

    Unacknowledged writes return no server result, so each test polls the
    collection with ``wait_until`` to observe the effects.
    """

    def tearDown(self):
        self.coll.delete_many({})

    def test_no_results_ordered_success(self):
        requests = [
            InsertOne({'a': 1}),
            UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True),
            InsertOne({'a': 2}),
            DeleteOne({'a': 1}),
        ]
        result = self.coll_w0.bulk_write(requests)
        self.assertFalse(result.acknowledged)
        wait_until(lambda: 2 == self.coll.count_documents({}),
                   'insert 2 documents')
        # The documents are keyed by 'a' (their _id is auto-generated), so
        # the delete has to be verified with the same filter it used.
        wait_until(lambda: self.coll.find_one({'a': 1}) is None,
                   'removed {"a": 1}')

    def test_no_results_ordered_failure(self):
        requests = [
            InsertOne({'_id': 1}),
            UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True),
            InsertOne({'_id': 2}),
            # Fails with duplicate key error.
            InsertOne({'_id': 1}),
            # Should not be executed since the batch is ordered.
            DeleteOne({'_id': 1}),
        ]
        result = self.coll_w0.bulk_write(requests)
        self.assertFalse(result.acknowledged)
        wait_until(lambda: 3 == self.coll.count_documents({}),
                   'insert 3 documents')
        self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1}))

    def test_no_results_unordered_success(self):
        requests = [
            InsertOne({'a': 1}),
            UpdateOne({'a': 3}, {'$set': {'b': 1}}, upsert=True),
            InsertOne({'a': 2}),
            DeleteOne({'a': 1}),
        ]
        result = self.coll_w0.bulk_write(requests, ordered=False)
        self.assertFalse(result.acknowledged)
        wait_until(lambda: 2 == self.coll.count_documents({}),
                   'insert 2 documents')
        # The documents are keyed by 'a' (their _id is auto-generated), so
        # the delete has to be verified with the same filter it used.
        wait_until(lambda: self.coll.find_one({'a': 1}) is None,
                   'removed {"a": 1}')

    def test_no_results_unordered_failure(self):
        requests = [
            InsertOne({'_id': 1}),
            UpdateOne({'_id': 3}, {'$set': {'b': 1}}, upsert=True),
            InsertOne({'_id': 2}),
            # Fails with duplicate key error.
            InsertOne({'_id': 1}),
            # Should be executed since the batch is unordered.
            DeleteOne({'_id': 1}),
        ]
        result = self.coll_w0.bulk_write(requests, ordered=False)
        self.assertFalse(result.acknowledged)
        wait_until(lambda: 2 == self.coll.count_documents({}),
                   'insert 2 documents')
        wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
                   'removed {"_id": 1}')


class TestBulkAuthorization(BulkAuthorizationTestBase):
    """Check that authorization failures surface as OperationFailure."""

    def test_readonly(self):
        # We test that an authorization failure aborts the batch and is raised
        # as OperationFailure.
        cli = rs_or_single_client_noauth(username='readonly', password='pw',
                                         authSource='pymongo_test')
        # Close the extra client when the test ends so it is not leaked.
        self.addCleanup(cli.close)
        coll = cli.pymongo_test.test
        coll.find_one()
        self.assertRaises(OperationFailure, coll.bulk_write,
                          [InsertOne({'x': 1})])

    def test_no_remove(self):
        # We test that an authorization failure aborts the batch and is raised
        # as OperationFailure.
        cli = rs_or_single_client_noauth(username='noremove', password='pw',
                                         authSource='pymongo_test')
        # Close the extra client when the test ends so it is not leaked.
        self.addCleanup(cli.close)
        coll = cli.pymongo_test.test
        coll.find_one()
        requests = [
            InsertOne({'x': 1}),
            ReplaceOne({'x': 2}, {'x': 2}, upsert=True),
            DeleteMany({}),       # Prohibited.
            InsertOne({'x': 3}),  # Never attempted.
        ]
        self.assertRaises(OperationFailure, coll.bulk_write, requests)
        # Only the two writes preceding the prohibited delete were applied.
        self.assertEqual({1, 2}, set(self.coll.distinct('x')))


class TestBulkWriteConcern(BulkTestBase):
    """Tests for write concern errors reported by ``bulk_write``."""

    @classmethod
    def setUpClass(cls):
        """Record ``client_context.w`` and connect to one non-primary host.

        ``cls.secondary`` stays ``None`` when ``w`` is not greater than 1.
        """
        super(TestBulkWriteConcern, cls).setUpClass()
        cls.w = client_context.w
        cls.secondary = None
        if cls.w > 1:
            for member in client_context.hello['hosts']:
                if member != client_context.hello['primary']:
                    cls.secondary = single_client(*partition_node(member))
                    break

    @classmethod
    def tearDownClass(cls):
        """Close the direct secondary connection, then chain to the parent."""
        if cls.secondary:
            cls.secondary.close()
        super(TestBulkWriteConcern, cls).tearDownClass()

    def cause_wtimeout(self, requests, ordered):
        """Run ``bulk_write`` while replication is paused on the secondary.

        Skips the test when test commands are not enabled. The failpoint
        is always switched off again, even if the write raises.
        """
        if not client_context.test_commands_enabled:
            self.skipTest("Test commands must be enabled.")

        # Use the rsSyncApplyStop failpoint to pause replication on a
        # secondary which will cause a wtimeout error.
        self.secondary.admin.command('configureFailPoint',
                                     'rsSyncApplyStop',
                                     mode='alwaysOn')

        try:
            coll = self.coll.with_options(
                write_concern=WriteConcern(w=self.w, wtimeout=1))
            return coll.bulk_write(requests, ordered=ordered)
        finally:
            self.secondary.admin.command('configureFailPoint',
                                         'rsSyncApplyStop',
                                         mode='off')

    def _wtimeout_details(self, requests, ordered):
        """Run ``cause_wtimeout`` expecting a BulkWriteError.

        Asserts that the error is raised with code 65 and returns the
        exception's ``details`` document. A skip raised inside
        ``cause_wtimeout`` propagates unchanged.
        """
        with self.assertRaises(BulkWriteError) as ctx:
            self.cause_wtimeout(requests, ordered=ordered)
        self.assertEqual(ctx.exception.code, 65)
        return ctx.exception.details

    @client_context.require_replica_set
    @client_context.require_secondaries_count(1)
    def test_write_concern_failure_ordered(self):
        # Ensure we don't raise on wnote.
        coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w))
        result = coll_ww.bulk_write([
            DeleteOne({"something": "that does no exist"})])
        self.assertTrue(result.acknowledged)

        requests = [
            InsertOne({'a': 1}),
            InsertOne({'a': 2})
        ]
        # Replication wtimeout is a 'soft' error.
        # It shouldn't stop batch processing.
        result = self._wtimeout_details(requests, ordered=True)

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': []},
            result)

        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertGreater(len(result['writeConcernErrors']), 0)

        failed = result['writeConcernErrors'][0]
        self.assertEqual(64, failed['code'])
        self.assertIsInstance(failed['errmsg'], str)

        self.coll.delete_many({})
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])

        # Fail due to write concern support as well
        # as duplicate key error on ordered batch.
        requests = [
            InsertOne({'a': 1}),
            ReplaceOne({'a': 3}, {'b': 1}, upsert=True),
            InsertOne({'a': 1}),
            InsertOne({'a': 2}),
        ]
        result = self._wtimeout_details(requests, ordered=True)

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 1,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [{'index': 1, '_id': '...'}],
             'writeErrors': [
                 {'index': 2,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'_id': '...', 'a': 1}}]},
            result)

        self.assertGreater(len(result['writeConcernErrors']), 1)
        failed = result['writeErrors'][0]
        self.assertIn("duplicate", failed['errmsg'])

    @client_context.require_replica_set
    @client_context.require_secondaries_count(1)
    def test_write_concern_failure_unordered(self):
        # Ensure we don't raise on wnote.
        coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w))
        result = coll_ww.bulk_write([
            DeleteOne({"something": "that does no exist"})], ordered=False)
        self.assertTrue(result.acknowledged)

        requests = [
            InsertOne({'a': 1}),
            UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True),
            InsertOne({'a': 2}),
        ]
        # Replication wtimeout is a 'soft' error.
        # It shouldn't stop batch processing.
        result = self._wtimeout_details(requests, ordered=False)

        self.assertEqual(2, result['nInserted'])
        self.assertEqual(1, result['nUpserted'])
        self.assertEqual(0, len(result['writeErrors']))
        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertGreater(len(result['writeConcernErrors']), 1)

        self.coll.delete_many({})
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])

        # Fail due to write concern support as well
        # as duplicate key error on unordered batch.
        requests = [
            InsertOne({'a': 1}),
            UpdateOne({'a': 3}, {'$set': {'a': 3, 'b': 1}}, upsert=True),
            InsertOne({'a': 1}),
            InsertOne({'a': 2}),
        ]
        result = self._wtimeout_details(requests, ordered=False)

        self.assertEqual(2, result['nInserted'])
        self.assertEqual(1, result['nUpserted'])
        self.assertEqual(1, len(result['writeErrors']))
        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertGreater(len(result['writeConcernErrors']), 1)

        failed = result['writeErrors'][0]
        self.assertEqual(2, failed['index'])
        self.assertEqual(11000, failed['code'])
        self.assertIsInstance(failed['errmsg'], str)
        self.assertEqual(1, failed['op']['a'])

        failed = result['writeConcernErrors'][0]
        self.assertEqual(64, failed['code'])
        self.assertIsInstance(failed['errmsg'], str)

        upserts = result['upserted']
        self.assertEqual(1, len(upserts))
        self.assertEqual(1, upserts[0]['index'])
        self.assertTrue(upserts[0].get('_id'))


if __name__ == "__main__":
    # Run this module's tests when the file is executed as a script.
    unittest.main()