PYTHON-981 - Implement ReadConcern.
This commit is contained in:
parent
5396444f3c
commit
dde4a658b5
@ -21,6 +21,7 @@ from pymongo import common
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.monitoring import _EventListeners
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.read_concern import ReadConcern
|
||||
from pymongo.read_preferences import make_read_preference
|
||||
from pymongo.ssl_support import get_ssl_context
|
||||
from pymongo.write_concern import WriteConcern
|
||||
@ -55,6 +56,12 @@ def _parse_write_concern(options):
|
||||
return WriteConcern(concern, wtimeout, j, fsync)
|
||||
|
||||
|
||||
def _parse_read_concern(options):
|
||||
"""Parse read concern options."""
|
||||
concern = options.get('readconcernlevel')
|
||||
return ReadConcern(concern)
|
||||
|
||||
|
||||
def _parse_ssl_options(options):
|
||||
"""Parse ssl options."""
|
||||
use_ssl = options.get('ssl')
|
||||
@ -122,6 +129,7 @@ class ClientOptions(object):
|
||||
self.__read_preference = _parse_read_preference(options)
|
||||
self.__replica_set_name = options.get('replicaset')
|
||||
self.__write_concern = _parse_write_concern(options)
|
||||
self.__read_concern = _parse_read_concern(options)
|
||||
self.__connect = options.get('connect')
|
||||
|
||||
@property
|
||||
@ -173,3 +181,8 @@ class ClientOptions(object):
|
||||
def write_concern(self):
|
||||
"""A :class:`~pymongo.write_concern.WriteConcern` instance."""
|
||||
return self.__write_concern
|
||||
|
||||
@property
|
||||
def read_concern(self):
|
||||
"""A :class:`~pymongo.read_concern.ReadConcern` instance."""
|
||||
return self.__read_concern
|
||||
|
||||
@ -35,6 +35,7 @@ from pymongo.cursor import Cursor
|
||||
from pymongo.errors import ConfigurationError, InvalidName, OperationFailure
|
||||
from pymongo.helpers import _check_write_command_response
|
||||
from pymongo.operations import _WriteOp, IndexModel
|
||||
from pymongo.read_concern import DEFAULT_READ_CONCERN
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.results import (BulkWriteResult,
|
||||
DeleteResult,
|
||||
@ -71,7 +72,8 @@ class Collection(common.BaseObject):
|
||||
"""
|
||||
|
||||
def __init__(self, database, name, create=False, codec_options=None,
|
||||
read_preference=None, write_concern=None, **kwargs):
|
||||
read_preference=None, write_concern=None, read_concern=None,
|
||||
**kwargs):
|
||||
"""Get / create a Mongo collection.
|
||||
|
||||
Raises :class:`TypeError` if `name` is not an instance of
|
||||
@ -100,9 +102,15 @@ class Collection(common.BaseObject):
|
||||
- `write_concern` (optional): An instance of
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) database.write_concern is used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) database.read_concern is used.
|
||||
- `**kwargs` (optional): additional keyword arguments will
|
||||
be passed as options for the create collection command
|
||||
|
||||
.. versionchanged:: 3.2
|
||||
Added the read_concern option.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Added the codec_options, read_preference, and write_concern options.
|
||||
Removed the uuid_subtype attribute.
|
||||
@ -128,7 +136,8 @@ class Collection(common.BaseObject):
|
||||
super(Collection, self).__init__(
|
||||
codec_options or database.codec_options,
|
||||
read_preference or database.read_preference,
|
||||
write_concern or database.write_concern)
|
||||
write_concern or database.write_concern,
|
||||
read_concern or database.read_concern)
|
||||
|
||||
if not isinstance(name, string_type):
|
||||
raise TypeError("name must be an instance "
|
||||
@ -164,7 +173,8 @@ class Collection(common.BaseObject):
|
||||
|
||||
def _command(self, sock_info, command, slave_ok=False,
|
||||
read_preference=None,
|
||||
codec_options=None, check=True, allowable_errors=None):
|
||||
codec_options=None, check=True, allowable_errors=None,
|
||||
read_concern=DEFAULT_READ_CONCERN):
|
||||
"""Internal command helper.
|
||||
|
||||
:Parameters:
|
||||
@ -175,6 +185,8 @@ class Collection(common.BaseObject):
|
||||
:class:`~bson.codec_options.CodecOptions`.
|
||||
- `check`: raise OperationFailure if there are errors
|
||||
- `allowable_errors`: errors to ignore if `check` is True
|
||||
- `read_concern` (optional) - An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`.
|
||||
|
||||
:Returns:
|
||||
|
||||
@ -188,7 +200,8 @@ class Collection(common.BaseObject):
|
||||
read_preference or self.read_preference,
|
||||
codec_options or self.codec_options,
|
||||
check,
|
||||
allowable_errors)
|
||||
allowable_errors,
|
||||
read_concern=read_concern)
|
||||
|
||||
def __create(self, options):
|
||||
"""Sends a create command with the given options.
|
||||
@ -254,7 +267,8 @@ class Collection(common.BaseObject):
|
||||
return self.__database
|
||||
|
||||
def with_options(
|
||||
self, codec_options=None, read_preference=None, write_concern=None):
|
||||
self, codec_options=None, read_preference=None,
|
||||
write_concern=None, read_concern=None):
|
||||
"""Get a clone of this collection changing the specified settings.
|
||||
|
||||
>>> coll1.read_preference
|
||||
@ -279,13 +293,18 @@ class Collection(common.BaseObject):
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) the :attr:`write_concern` of this :class:`Collection`
|
||||
is used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) the :attr:`read_concern` of this :class:`Collection`
|
||||
is used.
|
||||
"""
|
||||
return Collection(self.__database,
|
||||
self.__name,
|
||||
False,
|
||||
codec_options or self.codec_options,
|
||||
read_preference or self.read_preference,
|
||||
write_concern or self.write_concern)
|
||||
write_concern or self.write_concern,
|
||||
read_concern or self.read_concern)
|
||||
|
||||
def initialize_unordered_bulk_op(self):
|
||||
"""Initialize an unordered batch of write operations.
|
||||
@ -1059,7 +1078,8 @@ class Collection(common.BaseObject):
|
||||
('numCursors', num_cursors)])
|
||||
|
||||
with self._socket_for_reads() as (sock_info, slave_ok):
|
||||
result = self._command(sock_info, cmd, slave_ok)
|
||||
result = self._command(sock_info, cmd, slave_ok,
|
||||
read_concern=self.read_concern)
|
||||
|
||||
return [CommandCursor(self, cursor['cursor'], sock_info.address)
|
||||
for cursor in result['cursors']]
|
||||
@ -1068,7 +1088,8 @@ class Collection(common.BaseObject):
|
||||
"""Internal count helper."""
|
||||
with self._socket_for_reads() as (sock_info, slave_ok):
|
||||
res = self._command(sock_info, cmd, slave_ok,
|
||||
allowable_errors=["ns missing"])
|
||||
allowable_errors=["ns missing"],
|
||||
read_concern=self.read_concern)
|
||||
if res.get("errmsg", "") == "ns missing":
|
||||
return 0
|
||||
return int(res["n"])
|
||||
@ -1522,7 +1543,8 @@ class Collection(common.BaseObject):
|
||||
|
||||
cmd.update(kwargs)
|
||||
|
||||
result = self._command(sock_info, cmd, slave_ok)
|
||||
result = self._command(sock_info, cmd, slave_ok,
|
||||
read_concern=self.read_concern)
|
||||
|
||||
if "cursor" in result:
|
||||
cursor = result["cursor"]
|
||||
@ -1653,7 +1675,8 @@ class Collection(common.BaseObject):
|
||||
kwargs["query"] = filter
|
||||
cmd.update(kwargs)
|
||||
with self._socket_for_reads() as (sock_info, slave_ok):
|
||||
return self._command(sock_info, cmd, slave_ok)["values"]
|
||||
return self._command(sock_info, cmd, slave_ok,
|
||||
read_concern=self.read_concern)["values"]
|
||||
|
||||
def map_reduce(self, map, reduce, out, full_response=False, **kwargs):
|
||||
"""Perform a map/reduce operation on this collection.
|
||||
|
||||
@ -25,6 +25,7 @@ from bson.py3compat import string_type, integer_types, iteritems
|
||||
from pymongo.auth import MECHANISMS
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.monitoring import _validate_event_listeners
|
||||
from pymongo.read_concern import ReadConcern
|
||||
from pymongo.read_preferences import (read_pref_mode_from_name,
|
||||
_ServerMode)
|
||||
from pymongo.ssl_support import validate_cert_reqs
|
||||
@ -423,6 +424,7 @@ VALIDATORS = {
|
||||
'ssl_cert_reqs': validate_cert_reqs,
|
||||
'ssl_ca_certs': validate_readable,
|
||||
'ssl_match_hostname': validate_boolean_or_string,
|
||||
'readconcernlevel': validate_string_or_none,
|
||||
'read_preference': validate_read_preference,
|
||||
'readpreference': validate_read_preference_mode,
|
||||
'readpreferencetags': validate_read_preference_tags,
|
||||
@ -495,7 +497,8 @@ class BaseObject(object):
|
||||
SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO MONGODB.
|
||||
"""
|
||||
|
||||
def __init__(self, codec_options, read_preference, write_concern):
|
||||
def __init__(self, codec_options, read_preference, write_concern,
|
||||
read_concern):
|
||||
|
||||
if not isinstance(codec_options, CodecOptions):
|
||||
raise TypeError("codec_options must be an instance of "
|
||||
@ -513,6 +516,11 @@ class BaseObject(object):
|
||||
"pymongo.write_concern.WriteConcern")
|
||||
self.__write_concern = write_concern
|
||||
|
||||
if not isinstance(read_concern, ReadConcern):
|
||||
raise TypeError("read_concern must be an instance of "
|
||||
"pymongo.read_concern.ReadConcern")
|
||||
self.__read_concern = read_concern
|
||||
|
||||
@property
|
||||
def codec_options(self):
|
||||
"""Read only access to the :class:`~bson.codec_options.CodecOptions`
|
||||
@ -538,3 +546,11 @@ class BaseObject(object):
|
||||
The :attr:`read_preference` attribute is now read only.
|
||||
"""
|
||||
return self.__read_preference
|
||||
|
||||
@property
|
||||
def read_concern(self):
|
||||
"""Read only access to the read concern of this instance.
|
||||
|
||||
.. versionadded:: 3.2
|
||||
"""
|
||||
return self.__read_concern
|
||||
|
||||
@ -184,6 +184,7 @@ class Cursor(object):
|
||||
|
||||
self.__codec_options = collection.codec_options
|
||||
self.__read_preference = collection.read_preference
|
||||
self.__read_concern = collection.read_concern
|
||||
|
||||
self.__query_flags = cursor_type
|
||||
if self.__read_preference != ReadPreference.PRIMARY:
|
||||
@ -985,7 +986,8 @@ class Cursor(object):
|
||||
self.__codec_options,
|
||||
self.__read_preference,
|
||||
self.__limit,
|
||||
self.__batch_size))
|
||||
self.__batch_size,
|
||||
self.__read_concern))
|
||||
if not self.__id:
|
||||
self.__killed = True
|
||||
elif self.__id: # Get More
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
import warnings
|
||||
|
||||
from bson.code import Code
|
||||
from bson.codec_options import CodecOptions
|
||||
from bson.codec_options import CodecOptions, DEFAULT_CODEC_OPTIONS
|
||||
from bson.dbref import DBRef
|
||||
from bson.objectid import ObjectId
|
||||
from bson.py3compat import iteritems, string_type, _unicode
|
||||
@ -51,8 +51,8 @@ class Database(common.BaseObject):
|
||||
"""A Mongo database.
|
||||
"""
|
||||
|
||||
def __init__(self, client, name, codec_options=None,
|
||||
read_preference=None, write_concern=None):
|
||||
def __init__(self, client, name, codec_options=None, read_preference=None,
|
||||
write_concern=None, read_concern=None):
|
||||
"""Get a database by client and name.
|
||||
|
||||
Raises :class:`TypeError` if `name` is not an instance of
|
||||
@ -71,9 +71,15 @@ class Database(common.BaseObject):
|
||||
- `write_concern` (optional): An instance of
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) client.write_concern is used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) client.read_concern is used.
|
||||
|
||||
.. mongodoc:: databases
|
||||
|
||||
.. versionchanged:: 3.2
|
||||
Added the read_concern option.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Added the codec_options, read_preference, and write_concern options.
|
||||
:class:`~pymongo.database.Database` no longer returns an instance
|
||||
@ -89,7 +95,8 @@ class Database(common.BaseObject):
|
||||
super(Database, self).__init__(
|
||||
codec_options or client.codec_options,
|
||||
read_preference or client.read_preference,
|
||||
write_concern or client.write_concern)
|
||||
write_concern or client.write_concern,
|
||||
read_concern or client.read_concern)
|
||||
|
||||
if not isinstance(name, string_type):
|
||||
raise TypeError("name must be an instance "
|
||||
@ -225,8 +232,8 @@ class Database(common.BaseObject):
|
||||
"""
|
||||
return Collection(self, name)
|
||||
|
||||
def get_collection(self, name, codec_options=None,
|
||||
read_preference=None, write_concern=None):
|
||||
def get_collection(self, name, codec_options=None, read_preference=None,
|
||||
write_concern=None, read_concern=None):
|
||||
"""Get a :class:`~pymongo.collection.Collection` with the given name
|
||||
and options.
|
||||
|
||||
@ -259,12 +266,18 @@ class Database(common.BaseObject):
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) the :attr:`write_concern` of this :class:`Database` is
|
||||
used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) the :attr:`read_concern` of this :class:`Database` is
|
||||
used.
|
||||
"""
|
||||
return Collection(
|
||||
self, name, False, codec_options, read_preference, write_concern)
|
||||
self, name, False, codec_options, read_preference,
|
||||
write_concern, read_concern)
|
||||
|
||||
def create_collection(self, name, codec_options=None,
|
||||
read_preference=None, write_concern=None, **kwargs):
|
||||
read_preference=None, write_concern=None,
|
||||
read_concern=None, **kwargs):
|
||||
"""Create a new :class:`~pymongo.collection.Collection` in this
|
||||
database.
|
||||
|
||||
@ -298,6 +311,10 @@ class Database(common.BaseObject):
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) the :attr:`write_concern` of this :class:`Database` is
|
||||
used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) the :attr:`read_concern` of this :class:`Database` is
|
||||
used.
|
||||
- `**kwargs` (optional): additional keyword arguments will
|
||||
be passed as options for the create collection command
|
||||
|
||||
@ -311,7 +328,8 @@ class Database(common.BaseObject):
|
||||
raise CollectionInvalid("collection %s already exists" % name)
|
||||
|
||||
return Collection(self, name, True, codec_options,
|
||||
read_preference, write_concern, **kwargs)
|
||||
read_preference, write_concern,
|
||||
read_concern, **kwargs)
|
||||
|
||||
def _apply_incoming_manipulators(self, son, collection):
|
||||
"""Apply incoming manipulators to `son`."""
|
||||
@ -351,7 +369,7 @@ class Database(common.BaseObject):
|
||||
|
||||
def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
|
||||
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
|
||||
codec_options=CodecOptions(), **kwargs):
|
||||
codec_options=DEFAULT_CODEC_OPTIONS, **kwargs):
|
||||
"""Internal command helper."""
|
||||
if isinstance(command, string_type):
|
||||
command = SON([(command, value)])
|
||||
@ -367,7 +385,7 @@ class Database(common.BaseObject):
|
||||
|
||||
def command(self, command, value=1, check=True,
|
||||
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
|
||||
codec_options=CodecOptions(), **kwargs):
|
||||
codec_options=DEFAULT_CODEC_OPTIONS, **kwargs):
|
||||
"""Issue a MongoDB command.
|
||||
|
||||
Send command `command` to the database and return the
|
||||
|
||||
@ -32,6 +32,7 @@ from pymongo.errors import (CursorNotFound,
|
||||
WriteConcernError,
|
||||
WTimeoutError)
|
||||
from pymongo.message import _Query, _convert_exception
|
||||
from pymongo.read_concern import DEFAULT_READ_CONCERN
|
||||
|
||||
|
||||
_UUNDER = u("_")
|
||||
@ -240,7 +241,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn,
|
||||
"""Simple query helper for retrieving a first (and possibly only) batch."""
|
||||
query = _Query(
|
||||
0, db, coll, 0, ntoreturn, query, None,
|
||||
codec_options, read_preference, 0, ntoreturn)
|
||||
codec_options, read_preference, 0, 0, DEFAULT_READ_CONCERN)
|
||||
|
||||
name = next(iter(cmd))
|
||||
duration = None
|
||||
|
||||
@ -34,6 +34,7 @@ try:
|
||||
except ImportError:
|
||||
_use_c = False
|
||||
from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure
|
||||
from pymongo.read_concern import DEFAULT_READ_CONCERN
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
|
||||
@ -155,14 +156,18 @@ _MODIFIERS = SON([
|
||||
|
||||
|
||||
def _gen_explain_command(
|
||||
coll, spec, projection, skip, limit, batch_size, options):
|
||||
coll, spec, projection, skip, limit, batch_size,
|
||||
options, read_concern):
|
||||
"""Generate an explain command document."""
|
||||
cmd = _gen_find_command(
|
||||
coll, spec, projection, skip, limit, batch_size, options)
|
||||
if read_concern.level:
|
||||
return SON([('explain', cmd), ('readConcern', read_concern.document)])
|
||||
return SON([('explain', cmd)])
|
||||
|
||||
|
||||
def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options):
|
||||
def _gen_find_command(coll, spec, projection, skip, limit, batch_size,
|
||||
options, read_concern=DEFAULT_READ_CONCERN):
|
||||
"""Generate a find command document."""
|
||||
cmd = SON([('find', coll)])
|
||||
if '$query' in spec:
|
||||
@ -181,6 +186,8 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options):
|
||||
cmd['singleBatch'] = True
|
||||
if batch_size:
|
||||
cmd['batchSize'] = batch_size
|
||||
if read_concern.level:
|
||||
cmd['readConcern'] = read_concern.document
|
||||
|
||||
if options:
|
||||
cmd.update([(opt, True)
|
||||
@ -205,10 +212,11 @@ class _Query(object):
|
||||
|
||||
__slots__ = ('flags', 'db', 'coll', 'ntoskip', 'ntoreturn', 'spec',
|
||||
'fields', 'codec_options', 'read_preference', 'limit',
|
||||
'batch_size', 'name')
|
||||
'batch_size', 'name', 'read_concern')
|
||||
|
||||
def __init__(self, flags, db, coll, ntoskip, ntoreturn, spec, fields,
|
||||
codec_options, read_preference, limit, batch_size):
|
||||
codec_options, read_preference, limit,
|
||||
batch_size, read_concern):
|
||||
self.flags = flags
|
||||
self.db = db
|
||||
self.coll = coll
|
||||
@ -218,6 +226,7 @@ class _Query(object):
|
||||
self.fields = fields
|
||||
self.codec_options = codec_options
|
||||
self.read_preference = read_preference
|
||||
self.read_concern = read_concern
|
||||
self.limit = limit
|
||||
self.batch_size = batch_size
|
||||
self.name = 'find'
|
||||
@ -231,10 +240,11 @@ class _Query(object):
|
||||
self.name = 'explain'
|
||||
return _gen_explain_command(
|
||||
self.coll, self.spec, self.fields, self.ntoskip,
|
||||
self.limit, self.batch_size, self.flags), self.db
|
||||
self.limit, self.batch_size, self.flags,
|
||||
self.read_concern), self.db
|
||||
return _gen_find_command(
|
||||
self.coll, self.spec, self.fields, self.ntoskip, self.limit,
|
||||
self.batch_size, self.flags), self.db
|
||||
self.batch_size, self.flags, self.read_concern), self.db
|
||||
|
||||
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
|
||||
"""Get a query message, possibly setting the slaveOk bit."""
|
||||
|
||||
@ -223,6 +223,15 @@ class MongoClient(common.BaseObject):
|
||||
this to ``False`` as that could make your application vulnerable to
|
||||
man-in-the-middle attacks.
|
||||
|
||||
| **Read Concern options:**
|
||||
| (If not set explicitly, this will use the server default)
|
||||
|
||||
- `readConcernLevel`: (string) The read concern level specifies the
|
||||
level of isolation for read operations. For example, a read
|
||||
operation using a read concern level of ``majority`` will only
|
||||
return data that has been written to a majority of nodes. If the
|
||||
level is left unspecified, the server default will be used.
|
||||
|
||||
.. mongodoc:: connections
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
@ -345,7 +354,8 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
super(MongoClient, self).__init__(options.codec_options,
|
||||
options.read_preference,
|
||||
options.write_concern)
|
||||
options.write_concern,
|
||||
options.read_concern)
|
||||
|
||||
self.__all_credentials = {}
|
||||
creds = options.credentials
|
||||
@ -1036,8 +1046,8 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
return self[self.__default_database_name]
|
||||
|
||||
def get_database(self, name, codec_options=None,
|
||||
read_preference=None, write_concern=None):
|
||||
def get_database(self, name, codec_options=None, read_preference=None,
|
||||
write_concern=None, read_concern=None):
|
||||
"""Get a :class:`~pymongo.database.Database` with the given name and
|
||||
options.
|
||||
|
||||
@ -1070,9 +1080,14 @@ class MongoClient(common.BaseObject):
|
||||
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
|
||||
default) the :attr:`write_concern` of this :class:`MongoClient` is
|
||||
used.
|
||||
- `read_concern` (optional): An instance of
|
||||
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
|
||||
default) the :attr:`read_concern` of this :class:`MongoClient` is
|
||||
used.
|
||||
"""
|
||||
return database.Database(
|
||||
self, name, codec_options, read_preference, write_concern)
|
||||
self, name, codec_options, read_preference,
|
||||
write_concern, read_concern)
|
||||
|
||||
@property
|
||||
def is_locked(self):
|
||||
|
||||
@ -31,6 +31,7 @@ except ImportError:
|
||||
|
||||
from pymongo import helpers, message
|
||||
from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure
|
||||
from pymongo.read_concern import DEFAULT_READ_CONCERN
|
||||
|
||||
_UNPACK_INT = struct.Struct("<i").unpack
|
||||
|
||||
@ -38,7 +39,8 @@ _UNPACK_INT = struct.Struct("<i").unpack
|
||||
def command(sock, dbname, spec, slave_ok, is_mongos,
|
||||
read_preference, codec_options, check=True,
|
||||
allowable_errors=None, address=None,
|
||||
check_keys=False, listeners=None, max_bson_size=None):
|
||||
check_keys=False, listeners=None, max_bson_size=None,
|
||||
read_concern=DEFAULT_READ_CONCERN):
|
||||
"""Execute a command over the socket, or raise socket.error.
|
||||
|
||||
:Parameters:
|
||||
@ -55,6 +57,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
|
||||
- `check_keys`: if True, check `spec` for invalid keys
|
||||
- `listeners`: An instance of :class:`~pymongo.monitoring.EventListeners`
|
||||
- `max_bson_size`: The maximum encoded bson size for this server
|
||||
- `read_concern`: The read concern for this command.
|
||||
"""
|
||||
name = next(iter(spec))
|
||||
ns = dbname + '.$cmd'
|
||||
@ -63,6 +66,8 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
|
||||
orig = spec
|
||||
if is_mongos:
|
||||
spec = message._maybe_add_read_preference(spec, read_preference)
|
||||
if read_concern.level:
|
||||
spec['readConcern'] = read_concern.document
|
||||
|
||||
publish = listeners is not None and listeners.enabled_for_commands
|
||||
if publish:
|
||||
|
||||
@ -22,6 +22,7 @@ from bson.py3compat import u, itervalues
|
||||
from pymongo import auth, helpers, thread_util
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
ConnectionFailure,
|
||||
ConfigurationError,
|
||||
DocumentTooLarge,
|
||||
NetworkTimeout,
|
||||
NotMasterError,
|
||||
@ -31,6 +32,7 @@ from pymongo.monotonic import time as _time
|
||||
from pymongo.network import (command,
|
||||
receive_message,
|
||||
socket_closed)
|
||||
from pymongo.read_concern import DEFAULT_READ_CONCERN
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
|
||||
@ -181,7 +183,8 @@ class SocketInfo(object):
|
||||
def command(self, dbname, spec, slave_ok=False,
|
||||
read_preference=ReadPreference.PRIMARY,
|
||||
codec_options=DEFAULT_CODEC_OPTIONS, check=True,
|
||||
allowable_errors=None, check_keys=False):
|
||||
allowable_errors=None, check_keys=False,
|
||||
read_concern=DEFAULT_READ_CONCERN):
|
||||
"""Execute a command or raise ConnectionFailure or OperationFailure.
|
||||
|
||||
:Parameters:
|
||||
@ -193,12 +196,19 @@ class SocketInfo(object):
|
||||
- `check`: raise OperationFailure if there are errors
|
||||
- `allowable_errors`: errors to ignore if `check` is True
|
||||
- `check_keys`: if True, check `spec` for invalid keys
|
||||
- `read_concern`: The read concern for this command.
|
||||
"""
|
||||
if self.max_wire_version < 4 and not read_concern.ok_for_legacy:
|
||||
raise ConfigurationError(
|
||||
'read concern level of %s is not valid '
|
||||
'with a max wire version of %d.'
|
||||
% (read_concern.level, self.max_wire_version))
|
||||
try:
|
||||
return command(self.sock, dbname, spec, slave_ok,
|
||||
self.is_mongos, read_preference, codec_options,
|
||||
check, allowable_errors, self.address,
|
||||
check_keys, self.listeners, self.max_bson_size)
|
||||
check_keys, self.listeners, self.max_bson_size,
|
||||
read_concern)
|
||||
except OperationFailure:
|
||||
raise
|
||||
# Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
|
||||
|
||||
76
pymongo/read_concern.py
Normal file
76
pymongo/read_concern.py
Normal file
@ -0,0 +1,76 @@
|
||||
# Copyright 2015 MongoDB, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License",
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Tools for working with read concerns."""
|
||||
|
||||
from bson.py3compat import string_type
|
||||
|
||||
|
||||
class ReadConcern(object):
|
||||
"""ReadConcern
|
||||
|
||||
:Parameters:
|
||||
- `level`: (string) The read concern level specifies the level of
|
||||
isolation for read operations. For example, a read operation using a
|
||||
read concern level of ``majority`` will only return data that has been
|
||||
written to a majority of nodes. If the level is left unspecified, the
|
||||
server default will be used.
|
||||
|
||||
.. versionadded:: 3.2
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, level=None):
|
||||
if level is None or isinstance(level, string_type):
|
||||
self.__level = level
|
||||
else:
|
||||
raise TypeError(
|
||||
'level must be a string or None.')
|
||||
|
||||
@property
|
||||
def level(self):
|
||||
"""The read concern level."""
|
||||
return self.__level
|
||||
|
||||
@property
|
||||
def ok_for_legacy(self):
|
||||
"""Return ``True`` if this read concern is compatible with
|
||||
old wire protocol versions."""
|
||||
return self.level is None or self.level == 'local'
|
||||
|
||||
@property
|
||||
def document(self):
|
||||
"""The document representation of this read concern.
|
||||
|
||||
.. note::
|
||||
:class:`ReadConcern` is immutable. Mutating the value of
|
||||
:attr:`document` does not mutate this :class:`ReadConcern`.
|
||||
"""
|
||||
doc = {}
|
||||
if self.__level:
|
||||
doc['level'] = self.level
|
||||
return doc
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, ReadConcern):
|
||||
return self.document == other.document
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
if self.level:
|
||||
return 'ReadConcern(%s)' % self.level
|
||||
return 'ReadConcern()'
|
||||
|
||||
|
||||
DEFAULT_READ_CONCERN = ReadConcern()
|
||||
@ -18,7 +18,8 @@ import contextlib
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from pymongo.message import _convert_exception
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.message import _Query, _convert_exception
|
||||
from pymongo.response import Response, ExhaustResponse
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
|
||||
@ -91,7 +92,17 @@ class Server(object):
|
||||
if publish:
|
||||
start = datetime.now()
|
||||
|
||||
use_find_cmd = (sock_info.max_wire_version >= 4 and not exhaust)
|
||||
use_find_cmd = False
|
||||
if sock_info.max_wire_version >= 4:
|
||||
if not exhaust:
|
||||
use_find_cmd = True
|
||||
elif (isinstance(operation, _Query) and
|
||||
not operation.read_concern.ok_for_legacy):
|
||||
raise ConfigurationError(
|
||||
'read concern level of %s is not valid '
|
||||
'with a max wire version of %d.'
|
||||
% (operation.read_concern.level,
|
||||
sock_info.max_wire_version))
|
||||
|
||||
message = operation.get_message(
|
||||
set_slave_okay, sock_info.is_mongos, use_find_cmd)
|
||||
|
||||
@ -19,8 +19,6 @@ import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
import pymongo
|
||||
@ -31,7 +29,7 @@ from pymongo.errors import OperationFailure
|
||||
from pymongo.read_preferences import make_read_preference
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from test import unittest, client_context
|
||||
from test.utils import single_client, wait_until
|
||||
from test.utils import single_client, wait_until, EventListener
|
||||
|
||||
# Location of JSON test specifications.
|
||||
_TEST_PATH = os.path.join(
|
||||
@ -45,21 +43,6 @@ def camel_to_snake(camel):
|
||||
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', snake).lower()
|
||||
|
||||
|
||||
class EventListener(monitoring.CommandListener):
|
||||
|
||||
def __init__(self):
|
||||
self.results = defaultdict(list)
|
||||
|
||||
def started(self, event):
|
||||
self.results['started'].append(event)
|
||||
|
||||
def succeeded(self, event):
|
||||
self.results['succeeded'].append(event)
|
||||
|
||||
def failed(self, event):
|
||||
self.results['failed'].append(event)
|
||||
|
||||
|
||||
class TestAllScenarios(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
|
||||
@ -42,6 +42,7 @@ from pymongo.errors import (CollectionInvalid,
|
||||
ExecutionTimeout,
|
||||
InvalidName,
|
||||
OperationFailure)
|
||||
from pymongo.read_concern import ReadConcern
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from test import (client_context,
|
||||
@ -95,12 +96,15 @@ class TestDatabaseNoConnect(unittest.TestCase):
|
||||
def test_get_collection(self):
|
||||
codec_options = CodecOptions(tz_aware=True)
|
||||
write_concern = WriteConcern(w=2, j=True)
|
||||
read_concern = ReadConcern('majority')
|
||||
coll = self.client.pymongo_test.get_collection(
|
||||
'foo', codec_options, ReadPreference.SECONDARY, write_concern)
|
||||
'foo', codec_options, ReadPreference.SECONDARY, write_concern,
|
||||
read_concern)
|
||||
self.assertEqual('foo', coll.name)
|
||||
self.assertEqual(codec_options, coll.codec_options)
|
||||
self.assertEqual(ReadPreference.SECONDARY, coll.read_preference)
|
||||
self.assertEqual(write_concern, coll.write_concern)
|
||||
self.assertEqual(read_concern, coll.read_concern)
|
||||
|
||||
def test_getattr(self):
|
||||
db = self.client.pymongo_test
|
||||
|
||||
@ -17,8 +17,6 @@ import sys
|
||||
import time
|
||||
import warnings
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
@ -30,22 +28,7 @@ from pymongo.errors import NotMasterError, OperationFailure
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from test import unittest, client_context, client_knobs
|
||||
from test.utils import single_client, wait_until
|
||||
|
||||
|
||||
class EventListener(monitoring.CommandListener):
|
||||
|
||||
def __init__(self):
|
||||
self.results = defaultdict(list)
|
||||
|
||||
def started(self, event):
|
||||
self.results['started'].append(event)
|
||||
|
||||
def succeeded(self, event):
|
||||
self.results['succeeded'].append(event)
|
||||
|
||||
def failed(self, event):
|
||||
self.results['failed'].append(event)
|
||||
from test.utils import single_client, wait_until, EventListener
|
||||
|
||||
|
||||
class TestCommandMonitoring(unittest.TestCase):
|
||||
|
||||
112
test/test_read_concern.py
Normal file
112
test/test_read_concern.py
Normal file
@ -0,0 +1,112 @@
|
||||
# Copyright 2015 MongoDB, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Test the read_concern module."""
|
||||
|
||||
import pymongo
|
||||
|
||||
from bson.son import SON
|
||||
from pymongo import monitoring
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.read_concern import ReadConcern
|
||||
|
||||
from test import client_context, pair, unittest
|
||||
from test.utils import single_client, EventListener
|
||||
|
||||
|
||||
class TestReadConcern(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
@client_context.require_connection
|
||||
def setUpClass(cls):
|
||||
cls.listener = EventListener()
|
||||
cls.saved_listeners = monitoring._LISTENERS
|
||||
# Don't use any global subscribers.
|
||||
monitoring._LISTENERS = monitoring._Listeners([])
|
||||
cls.client = single_client(event_listeners=[cls.listener])
|
||||
cls.db = cls.client.pymongo_test
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
monitoring._LISTENERS = cls.saved_listeners
|
||||
|
||||
def tearDown(self):
|
||||
self.db.coll.drop()
|
||||
self.listener.results.clear()
|
||||
|
||||
def test_read_concern(self):
|
||||
rc = ReadConcern()
|
||||
self.assertIsNone(rc.level)
|
||||
self.assertTrue(rc.ok_for_legacy)
|
||||
|
||||
rc = ReadConcern('majority')
|
||||
self.assertEqual('majority', rc.level)
|
||||
self.assertFalse(rc.ok_for_legacy)
|
||||
|
||||
rc = ReadConcern('local')
|
||||
self.assertEqual('local', rc.level)
|
||||
self.assertTrue(rc.ok_for_legacy)
|
||||
|
||||
self.assertRaises(TypeError, ReadConcern, 42)
|
||||
|
||||
def test_read_concern_uri(self):
|
||||
uri = 'mongodb://%s/?readConcernLevel=majority' % (pair,)
|
||||
client = pymongo.MongoClient(uri)
|
||||
self.assertEqual(ReadConcern('majority'), client.read_concern)
|
||||
|
||||
@client_context.require_version_max(3, 1)
|
||||
def test_invalid_read_concern(self):
|
||||
coll = self.db.get_collection(
|
||||
'coll', read_concern=ReadConcern('majority'))
|
||||
self.assertRaisesRegexp(
|
||||
ConfigurationError,
|
||||
'read concern level of majority is not valid '
|
||||
'with a max wire version of [0-3]',
|
||||
coll.count)
|
||||
|
||||
@client_context.require_version_min(3, 1, 9, -1)
|
||||
def test_find_command(self):
|
||||
# readConcern not sent in command if not specified.
|
||||
coll = self.db.coll
|
||||
tuple(coll.find({'field': 'value'}))
|
||||
self.assertNotIn('readConcern',
|
||||
self.listener.results['started'][0].command)
|
||||
|
||||
self.listener.results.clear()
|
||||
|
||||
# Explicitly set readConcern to 'local'.
|
||||
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
|
||||
tuple(coll.find({'field': 'value'}))
|
||||
self.assertEqual(
|
||||
SON([('find', 'coll'),
|
||||
('filter', {'field': 'value'}),
|
||||
('readConcern', {'level': 'local'})]),
|
||||
self.listener.results['started'][0].command)
|
||||
|
||||
@client_context.require_version_min(3, 1, 9, -1)
|
||||
def test_command_cursor(self):
|
||||
# readConcern not sent in command if not specified.
|
||||
coll = self.db.coll
|
||||
tuple(coll.aggregate([{'$match': {'field': 'value'}}]))
|
||||
self.assertNotIn('readConcern',
|
||||
self.listener.results['started'][0].command)
|
||||
|
||||
self.listener.results.clear()
|
||||
|
||||
# Explicitly set readConcern to 'local'.
|
||||
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
|
||||
tuple(coll.aggregate([{'$match': {'field': 'value'}}]))
|
||||
self.assertEqual(
|
||||
{'level': 'local'},
|
||||
self.listener.results['started'][0].command['readConcern'])
|
||||
@ -22,9 +22,11 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
import warnings
|
||||
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
|
||||
from pymongo import MongoClient
|
||||
from pymongo import MongoClient, monitoring
|
||||
from pymongo.errors import AutoReconnect, OperationFailure
|
||||
from pymongo.server_selectors import (any_server_selector,
|
||||
writable_server_selector)
|
||||
@ -37,6 +39,21 @@ from test import (client_context,
|
||||
from test.version import Version
|
||||
|
||||
|
||||
class EventListener(monitoring.CommandListener):
|
||||
|
||||
def __init__(self):
|
||||
self.results = defaultdict(list)
|
||||
|
||||
def started(self, event):
|
||||
self.results['started'].append(event)
|
||||
|
||||
def succeeded(self, event):
|
||||
self.results['succeeded'].append(event)
|
||||
|
||||
def failed(self, event):
|
||||
self.results['failed'].append(event)
|
||||
|
||||
|
||||
def _connection_string_noauth(h, p):
|
||||
if h.startswith("mongodb://"):
|
||||
return h
|
||||
|
||||
Loading…
Reference in New Issue
Block a user