PYTHON-1101 - Support sending writeConcern for commands that write.

This commit is contained in:
Luke Lovett 2016-08-02 13:54:55 -07:00
parent b057cd47e8
commit b36a4345fd
11 changed files with 270 additions and 43 deletions

View File

@ -178,7 +178,9 @@ class Collection(common.BaseObject):
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=DEFAULT_READ_CONCERN):
read_concern=DEFAULT_READ_CONCERN,
write_concern=None,
parse_write_concern_error=False):
"""Internal command helper.
:Parameters:
@ -191,6 +193,11 @@ class Collection(common.BaseObject):
- `allowable_errors`: errors to ignore if `check` is True
- `read_concern` (optional) - An instance of
:class:`~pymongo.read_concern.ReadConcern`.
- `write_concern`: An instance of
:class:`~pymongo.write_concern.WriteConcern`. This option is only
valid for MongoDB 3.4 and above.
- `parse_write_concern_error` (optional): Whether to parse a
``writeConcernError`` field in the command response.
:Returns:
@ -198,14 +205,17 @@ class Collection(common.BaseObject):
(result document, address of server the command was run on)
"""
return sock_info.command(self.__database.name,
command,
slave_ok,
read_preference or self.read_preference,
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern)
return sock_info.command(
self.__database.name,
command,
slave_ok,
read_preference or self.read_preference,
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern,
write_concern=write_concern,
parse_write_concern_error=parse_write_concern_error)
def __create(self, options):
"""Sends a create command with the given options.
@ -217,7 +227,9 @@ class Collection(common.BaseObject):
cmd.update(options)
with self._socket_for_writes() as sock_info:
self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY)
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True)
def __getattr__(self, name):
"""Get a sub-collection of this collection by name.
@ -1268,6 +1280,13 @@ class Collection(common.BaseObject):
introduced in MongoDB **2.6** and cannot be used with earlier
versions.
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
.. versionadded:: 3.0
"""
if not isinstance(indexes, list):
@ -1285,7 +1304,9 @@ class Collection(common.BaseObject):
('indexes', list(gen_indexes()))])
with self._socket_for_writes() as sock_info:
self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY)
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True)
return names
def __create_index(self, keys, index_options):
@ -1303,7 +1324,9 @@ class Collection(common.BaseObject):
cmd = SON([('createIndexes', self.name), ('indexes', [index])])
try:
self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY)
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True)
except OperationFailure as exc:
if exc.code in common.COMMAND_NOT_FOUND_CODES:
index["ns"] = self.__full_name
@ -1374,6 +1397,10 @@ class Collection(common.BaseObject):
.. note:: `partialFilterExpression` requires server version **>= 3.2**
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
:Parameters:
- `keys`: a single key or a list of (key, direction)
pairs specifying the index to create
@ -1381,6 +1408,9 @@ class Collection(common.BaseObject):
options (see the above list) should be passed as keyword
arguments
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
.. versionchanged:: 3.2
Added partialFilterExpression to support partial indexes.
.. versionchanged:: 3.0
@ -1436,6 +1466,15 @@ class Collection(common.BaseObject):
Can be used on non-existant collections or collections with no indexes.
Raises OperationFailure on an error.
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
self.__database.client._purge_index(self.__database.name, self.__name)
self.drop_index("*")
@ -1459,6 +1498,15 @@ class Collection(common.BaseObject):
:Parameters:
- `index_or_name`: index (or name of index) to drop
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
name = index_or_name
if isinstance(index_or_name, list):
@ -1474,7 +1522,9 @@ class Collection(common.BaseObject):
self._command(sock_info,
cmd,
read_preference=ReadPreference.PRIMARY,
allowable_errors=["ns not found"])
allowable_errors=["ns not found"],
write_concern=self.write_concern,
parse_write_concern_error=True)
def reindex(self):
"""Rebuilds all indexes on this collection.
@ -1482,11 +1532,22 @@ class Collection(common.BaseObject):
.. warning:: reindex blocks all other operations (indexes
are built in the foreground) and will be slow for large
collections.
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
cmd = SON([("reIndex", self.__name)])
with self._socket_for_writes() as sock_info:
return self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY)
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True)
def list_indexes(self):
"""Get a cursor over the index documents for this collection.
@ -1625,6 +1686,10 @@ class Collection(common.BaseObject):
use :meth:`~pymongo.database.Database.command` instead. An
example is included in the :ref:`aggregate-examples` documentation.
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
:Parameters:
- `pipeline`: a list of aggregation pipeline stages
- `**kwargs` (optional): See list of options above.
@ -1633,6 +1698,9 @@ class Collection(common.BaseObject):
A :class:`~pymongo.command_cursor.CommandCursor` over the result
set.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
.. versionchanged:: 3.0
The :meth:`aggregate` method always returns a CommandCursor. The
pipeline argument must be a list.
@ -1674,18 +1742,25 @@ class Collection(common.BaseObject):
if batch_size is not None:
kwargs["cursor"]["batchSize"] = batch_size
dollar_out = pipeline and '$out' in pipeline[-1]
if (sock_info.max_wire_version >= 5 and dollar_out and
self.write_concern):
cmd['writeConcern'] = self.write_concern.document
cmd.update(kwargs)
# Apply this Collection's read concern if $out is not in the
# pipeline.
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
if pipeline and '$out' in pipeline[-1]:
result = self._command(sock_info, cmd, slave_ok)
if dollar_out:
result = self._command(sock_info, cmd, slave_ok,
parse_write_concern_error=True)
else:
result = self._command(sock_info, cmd, slave_ok,
read_concern=self.read_concern)
else:
result = self._command(sock_info, cmd, slave_ok)
result = self._command(sock_info, cmd, slave_ok,
parse_write_concern_error=dollar_out)
if "cursor" in result:
cursor = result["cursor"]
@ -1764,6 +1839,15 @@ class Collection(common.BaseObject):
- `**kwargs` (optional): additional arguments to the rename command
may be passed as keyword arguments to this helper method
(i.e. ``dropTarget=True``)
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
if not isinstance(new_name, string_type):
raise TypeError("new_name must be an "
@ -1778,9 +1862,11 @@ class Collection(common.BaseObject):
new_name = "%s.%s" % (self.__database.name, new_name)
cmd = SON([("renameCollection", self.__full_name), ("to", new_name)])
cmd.update(kwargs)
with self._socket_for_writes() as sock_info:
sock_info.command('admin', cmd)
if sock_info.max_wire_version >= 5 and self.write_concern:
cmd['writeConcern'] = self.write_concern.document
cmd.update(kwargs)
sock_info.command('admin', cmd, parse_write_concern_error=True)
def distinct(self, key, filter=None, **kwargs):
"""Get a list of distinct values for `key` among all documents
@ -1848,6 +1934,14 @@ class Collection(common.BaseObject):
mapReduce on a secondary use the :meth:`inline_map_reduce` method
instead.
.. note:: The :attr:`~pymongo.collection.Collection.write_concern` of
this collection is automatically applied to this operation (if the
output is not inline) when using MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this collection's write concern automatically to this operation
when connected to MongoDB >= 3.4.
.. seealso:: :doc:`/examples/aggregation`
.. versionchanged:: 2.2
@ -1856,6 +1950,7 @@ class Collection(common.BaseObject):
.. _map reduce command: http://docs.mongodb.org/manual/reference/command/mapReduce/
.. mongodoc:: mapreduce
"""
if not isinstance(out, (string_type, collections.Mapping)):
raise TypeError("'out' must be an instance of "
@ -1865,17 +1960,24 @@ class Collection(common.BaseObject):
("map", map),
("reduce", reduce),
("out", out)])
cmd.update(kwargs)
inline = 'inline' in cmd['out']
with self._socket_for_primary_reads() as (sock_info, slave_ok):
if (sock_info.max_wire_version >= 5 and self.write_concern and
not inline):
cmd['writeConcern'] = self.write_concern.document
cmd.update(kwargs)
if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and
'inline' in cmd['out']):
inline):
# No need to parse 'writeConcernError' here, since the command
# is an inline map reduce.
response = self._command(
sock_info, cmd, slave_ok, ReadPreference.PRIMARY,
read_concern=self.read_concern)
else:
response = self._command(
sock_info, cmd, slave_ok, ReadPreference.PRIMARY)
sock_info, cmd, slave_ok, ReadPreference.PRIMARY,
parse_write_concern_error=not inline)
if full_response or not response.get('result'):
return response

View File

@ -378,19 +378,27 @@ class Database(common.BaseObject):
def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS, **kwargs):
codec_options=DEFAULT_CODEC_OPTIONS,
write_concern=None,
parse_write_concern_error=False, **kwargs):
"""Internal command helper."""
if isinstance(command, string_type):
command = SON([(command, value)])
if sock_info.max_wire_version >= 5 and write_concern:
command['writeConcern'] = write_concern.document
command.update(kwargs)
return sock_info.command(self.__name,
command,
slave_ok,
read_preference,
codec_options,
check,
allowable_errors)
return sock_info.command(
self.__name,
command,
slave_ok,
read_preference,
codec_options,
check,
allowable_errors,
parse_write_concern_error=parse_write_concern_error)
def command(self, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
@ -539,6 +547,15 @@ class Database(common.BaseObject):
:Parameters:
- `name_or_collection`: the name of a collection to drop or the
collection object itself
.. note:: The :attr:`~pymongo.database.Database.write_concern` of
this database is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this database's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
name = name_or_collection
if isinstance(name, Collection):
@ -550,7 +567,13 @@ class Database(common.BaseObject):
self.__client._purge_index(self.__name, name)
self.command("drop", _unicode(name), allowable_errors=["ns not found"])
with self.__client._socket_for_reads(
ReadPreference.PRIMARY) as (sock_info, slave_ok):
return self._command(
sock_info, 'drop', slave_ok, _unicode(name),
allowable_errors=['ns not found'],
write_concern=self.write_concern,
parse_write_concern_error=True)
def validate_collection(self, name_or_collection,
scandata=False, full=False):

View File

@ -145,7 +145,8 @@ def _unpack_response(response,
return result
def _check_command_response(response, msg=None, allowable_errors=None):
def _check_command_response(response, msg=None, allowable_errors=None,
parse_write_concern_error=False):
"""Check the response to a command for errors.
"""
if "ok" not in response:
@ -163,6 +164,10 @@ def _check_command_response(response, msg=None, allowable_errors=None):
response.get("code"),
response)
if parse_write_concern_error and 'writeConcernError' in response:
wce = response['writeConcernError']
raise WriteConcernError(wce['errmsg'], wce['code'], wce)
if not response["ok"]:
details = response

View File

@ -1107,6 +1107,15 @@ class MongoClient(common.BaseObject):
- `name_or_database`: the name of a database to drop, or a
:class:`~pymongo.database.Database` instance representing the
database to drop
.. note:: The :attr:`~pymongo.mongo_client.MongoClient.write_concern` of
this client is automatically applied to this operation when using
MongoDB >= 3.4.
.. versionchanged:: 3.4
Apply this client's write concern automatically to this operation
when connected to MongoDB >= 3.4.
"""
name = name_or_database
if isinstance(name, database.Database):
@ -1117,8 +1126,15 @@ class MongoClient(common.BaseObject):
"of %s or a Database" % (string_type.__name__,))
self._purge_index(name)
self[name].command("dropDatabase",
read_preference=ReadPreference.PRIMARY)
with self._socket_for_reads(
ReadPreference.PRIMARY) as (sock_info, slave_ok):
self[name]._command(
sock_info,
"dropDatabase",
slave_ok=slave_ok,
read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True)
def get_default_database(self):
"""Get the database named in the MongoDB connection URI.

View File

@ -45,7 +45,8 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
read_preference, codec_options, check=True,
allowable_errors=None, address=None,
check_keys=False, listeners=None, max_bson_size=None,
read_concern=DEFAULT_READ_CONCERN):
read_concern=DEFAULT_READ_CONCERN,
parse_write_concern_error=False):
"""Execute a command over the socket, or raise socket.error.
:Parameters:
@ -63,6 +64,8 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
- `listeners`: An instance of :class:`~pymongo.monitoring.EventListeners`
- `max_bson_size`: The maximum encoded bson size for this server
- `read_concern`: The read concern for this command.
- `parse_write_concern_error`: Whether to parse the ``writeConcernError``
field in the command response.
"""
name = next(iter(spec))
ns = dbname + '.$cmd'
@ -99,7 +102,9 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
response_doc = unpacked['data'][0]
if check:
helpers._check_command_response(response_doc, None, allowable_errors)
helpers._check_command_response(
response_doc, None, allowable_errors,
parse_write_concern_error=parse_write_concern_error)
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration

View File

@ -237,7 +237,9 @@ class SocketInfo(object):
read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS, check=True,
allowable_errors=None, check_keys=False,
read_concern=DEFAULT_READ_CONCERN):
read_concern=DEFAULT_READ_CONCERN,
write_concern=None,
parse_write_concern_error=False):
"""Execute a command or raise ConnectionFailure or OperationFailure.
:Parameters:
@ -250,18 +252,24 @@ class SocketInfo(object):
- `allowable_errors`: errors to ignore if `check` is True
- `check_keys`: if True, check `spec` for invalid keys
- `read_concern`: The read concern for this command.
- `write_concern`: The write concern for this command.
- `parse_write_concern_error`: Whether to parse the
``writeConcernError` field in the command response.
"""
if self.max_wire_version < 4 and not read_concern.ok_for_legacy:
raise ConfigurationError(
'read concern level of %s is not valid '
'with a max wire version of %d.'
% (read_concern.level, self.max_wire_version))
if self.max_wire_version >= 5 and write_concern:
spec['writeConcern'] = write_concern.document
try:
return command(self.sock, dbname, spec, slave_ok,
self.is_mongos, read_preference, codec_options,
check, allowable_errors, self.address,
check_keys, self.listeners, self.max_bson_size,
read_concern)
read_concern,
parse_write_concern_error=parse_write_concern_error)
except OperationFailure:
raise
# Catch socket.error, KeyboardInterrupt, etc. and close ourselves.

View File

@ -106,3 +106,5 @@ class WriteConcern(object):
def __ne__(self, other):
return self.document != other.document
def __bool__(self):
return bool(self.document)

View File

@ -40,7 +40,8 @@ from pymongo.errors import (AutoReconnect,
InvalidName,
OperationFailure,
NetworkTimeout,
InvalidURI)
InvalidURI,
WriteConcernError)
from pymongo.monitoring import (ServerHeartbeatListener,
ServerHeartbeatStartedEvent)
from pymongo.mongo_client import MongoClient
@ -64,7 +65,6 @@ from test import (client_context,
from test.pymongo_mocks import MockClient
from test.utils import (assertRaisesExactly,
delay,
HeartbeatEventListener,
remove_all_users,
server_is_master_with_slave,
get_pool,
@ -75,6 +75,7 @@ from test.utils import (assertRaisesExactly,
rs_or_single_client_noauth,
single_client,
lazy_client_trial,
IMPOSSIBLE_WRITE_CONCERN,
NTHREADS)
@ -462,6 +463,12 @@ class TestClient(IntegrationTest):
self.assertIn("pymongo_test", dbs)
self.assertIn("pymongo_test2", dbs)
self.client.drop_database("pymongo_test")
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
wc_client = rs_or_single_client(w=len(client_context.nodes) + 1)
with self.assertRaises(WriteConcernError):
wc_client.drop_database('pymongo_test2')
self.client.drop_database(self.client.pymongo_test2)
raise SkipTest("This test often fails due to SERVER-2329")

View File

@ -16,6 +16,7 @@
"""Test the collection module."""
import contextlib
import re
import sys
import threading
@ -59,7 +60,8 @@ from pymongo.write_concern import WriteConcern
from test.test_client import IntegrationTest
from test.utils import (is_mongos, enable_text_search, get_pool,
rs_or_single_client, single_client,
wait_until, EventListener)
wait_until, EventListener,
IMPOSSIBLE_WRITE_CONCERN)
from test import client_context, host, port, unittest
@ -124,6 +126,29 @@ class TestCollection(IntegrationTest):
def tearDownClass(cls):
cls.db.drop_collection("test_large_limit")
@contextlib.contextmanager
def write_concern_collection(self):
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
with self.assertRaises(WriteConcernError):
# Unsatisfiable write concern.
yield Collection(
self.db, 'test',
write_concern=WriteConcern(w=len(client_context.nodes) + 1))
else:
yield self.db.test
@client_context.require_version_min(3, 3, 9)
def test_create(self):
# No Exception.
db = client_context.rs_or_standalone_client.pymongo_test
db.create_test_no_wc.drop()
Collection(db, name='create_test_no_wc', create=True)
with self.assertRaises(OperationFailure):
Collection(
db, name='create-test-wc',
write_concern=IMPOSSIBLE_WRITE_CONCERN,
create=True)
def test_drop_nonexistent_collection(self):
self.db.drop_collection('test')
self.assertFalse('test' in self.db.collection_names())
@ -181,6 +206,9 @@ class TestCollection(IntegrationTest):
db.test.create_indexes,
[IndexModel('a', unique=True)])
with self.write_concern_collection() as coll:
coll.create_indexes([IndexModel('hello')])
def test_create_index(self):
db = self.db
@ -221,6 +249,9 @@ class TestCollection(IntegrationTest):
self.assertRaises(
DuplicateKeyError, db.test.create_index, 'a', unique=True)
with self.write_concern_collection() as coll:
coll.create_index([('hello', DESCENDING)])
def test_drop_index(self):
db = self.db
db.test.drop_indexes()
@ -247,6 +278,9 @@ class TestCollection(IntegrationTest):
self.assertEqual(len(db.test.index_information()), 2)
self.assertTrue("hello_1" in db.test.index_information())
with self.write_concern_collection() as coll:
coll.drop_index('hello_1')
def test_reindex(self):
db = self.db
db.drop_collection("test")
@ -273,6 +307,9 @@ class TestCollection(IntegrationTest):
else:
check_result(reindexed)
with self.write_concern_collection() as coll:
coll.reindex()
def test_list_indexes(self):
db = self.db
db.test.drop()
@ -1453,6 +1490,11 @@ class TestCollection(IntegrationTest):
self.assertTrue(isinstance(result, CommandCursor))
self.assertEqual([{'foo': [1, 2]}], list(result))
# Test write concern.
out_pipeline = [pipeline, {'$out': 'output-collection'}]
with self.write_concern_collection() as coll:
coll.aggregate(out_pipeline)
def test_aggregate_raw_bson(self):
db = self.db
db.drop_collection("test")
@ -1739,6 +1781,9 @@ class TestCollection(IntegrationTest):
self.assertRaises(OperationFailure, db.foo.rename, "test")
db.foo.rename("test", dropTarget=True)
with self.write_concern_collection() as coll:
coll.rename('foo')
def test_find_one(self):
db = self.db
db.drop_collection("test")
@ -2003,6 +2048,9 @@ class TestCollection(IntegrationTest):
full_response=True)
self.assertEqual(6, full_result["counts"]["emit"])
with self.write_concern_collection() as coll:
coll.map_reduce(map, reduce, 'output')
def test_messages_with_unicode_collection_names(self):
db = self.db

View File

@ -41,7 +41,8 @@ from pymongo.errors import (CollectionInvalid,
ConfigurationError,
ExecutionTimeout,
InvalidName,
OperationFailure)
OperationFailure,
WriteConcernError)
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
@ -55,7 +56,8 @@ from test.utils import (ignore_deprecations,
remove_all_users,
rs_or_single_client_noauth,
rs_or_single_client,
server_started_with_auth)
server_started_with_auth,
IMPOSSIBLE_WRITE_CONCERN)
if PY3:
@ -219,6 +221,12 @@ class TestDatabase(IntegrationTest):
db.drop_collection(db.test.doesnotexist)
if client_context.version.at_least(3, 3, 9) and client_context.is_rs:
db_wc = Database(self.client, 'pymongo_test',
write_concern=IMPOSSIBLE_WRITE_CONCERN)
with self.assertRaises(WriteConcernError):
db_wc.drop_collection('test')
def test_validate_collection(self):
db = self.client.pymongo_test

View File

@ -39,6 +39,9 @@ from test import (client_context,
from test.version import Version
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=len(client_context.nodes) + 1)
class EventListener(monitoring.CommandListener):
def __init__(self):