From dde4a658b567dcbc2a7385f28d79b59297f2f6b9 Mon Sep 17 00:00:00 2001 From: Luke Lovett Date: Fri, 18 Sep 2015 15:52:40 -0700 Subject: [PATCH] PYTHON-981 - Implement ReadConcern. --- pymongo/client_options.py | 13 ++++ pymongo/collection.py | 43 +++++++--- pymongo/common.py | 18 ++++- pymongo/cursor.py | 4 +- pymongo/database.py | 40 +++++++--- pymongo/helpers.py | 3 +- pymongo/message.py | 22 ++++-- pymongo/mongo_client.py | 23 +++++- pymongo/network.py | 7 +- pymongo/pool.py | 14 +++- pymongo/read_concern.py | 76 ++++++++++++++++++ pymongo/server.py | 15 +++- test/test_command_monitoring_spec.py | 19 +---- test/test_database.py | 6 +- test/test_monitoring.py | 19 +---- test/test_read_concern.py | 112 +++++++++++++++++++++++++++ test/utils.py | 19 ++++- 17 files changed, 376 insertions(+), 77 deletions(-) create mode 100644 pymongo/read_concern.py create mode 100644 test/test_read_concern.py diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 815a8c060..732a04cee 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -21,6 +21,7 @@ from pymongo import common from pymongo.errors import ConfigurationError from pymongo.monitoring import _EventListeners from pymongo.pool import PoolOptions +from pymongo.read_concern import ReadConcern from pymongo.read_preferences import make_read_preference from pymongo.ssl_support import get_ssl_context from pymongo.write_concern import WriteConcern @@ -55,6 +56,12 @@ def _parse_write_concern(options): return WriteConcern(concern, wtimeout, j, fsync) +def _parse_read_concern(options): + """Parse read concern options.""" + concern = options.get('readconcernlevel') + return ReadConcern(concern) + + def _parse_ssl_options(options): """Parse ssl options.""" use_ssl = options.get('ssl') @@ -122,6 +129,7 @@ class ClientOptions(object): self.__read_preference = _parse_read_preference(options) self.__replica_set_name = options.get('replicaset') self.__write_concern = _parse_write_concern(options) + self.__read_concern = _parse_read_concern(options) self.__connect = options.get('connect') @property @@ -173,3 +181,8 @@ class ClientOptions(object): def write_concern(self): """A :class:`~pymongo.write_concern.WriteConcern` instance.""" return self.__write_concern + + @property + def read_concern(self): + """A :class:`~pymongo.read_concern.ReadConcern` instance.""" + return self.__read_concern diff --git a/pymongo/collection.py b/pymongo/collection.py index 0d038c498..5e83a772e 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -35,6 +35,7 @@ from pymongo.cursor import Cursor from pymongo.errors import ConfigurationError, InvalidName, OperationFailure from pymongo.helpers import _check_write_command_response from pymongo.operations import _WriteOp, IndexModel +from pymongo.read_concern import DEFAULT_READ_CONCERN from pymongo.read_preferences import ReadPreference from pymongo.results import (BulkWriteResult, DeleteResult, @@ -71,7 +72,8 @@ class Collection(common.BaseObject): """ def __init__(self, database, name, create=False, codec_options=None, - read_preference=None, write_concern=None, **kwargs): + read_preference=None, write_concern=None, read_concern=None, + **kwargs): """Get / create a Mongo collection. Raises :class:`TypeError` if `name` is not an instance of @@ -100,9 +102,15 @@ class Collection(common.BaseObject): - `write_concern` (optional): An instance of :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) database.write_concern is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) database.read_concern is used. - `**kwargs` (optional): additional keyword arguments will be passed as options for the create collection command + .. versionchanged:: 3.2 + Added the read_concern option. + .. versionchanged:: 3.0 Added the codec_options, read_preference, and write_concern options. Removed the uuid_subtype attribute. @@ -128,7 +136,8 @@ class Collection(common.BaseObject): super(Collection, self).__init__( codec_options or database.codec_options, read_preference or database.read_preference, - write_concern or database.write_concern) + write_concern or database.write_concern, + read_concern or database.read_concern) if not isinstance(name, string_type): raise TypeError("name must be an instance " @@ -164,7 +173,8 @@ class Collection(common.BaseObject): def _command(self, sock_info, command, slave_ok=False, read_preference=None, - codec_options=None, check=True, allowable_errors=None): + codec_options=None, check=True, allowable_errors=None, + read_concern=DEFAULT_READ_CONCERN): """Internal command helper. :Parameters: @@ -175,6 +185,8 @@ class Collection(common.BaseObject): :class:`~bson.codec_options.CodecOptions`. - `check`: raise OperationFailure if there are errors - `allowable_errors`: errors to ignore if `check` is True + - `read_concern` (optional) - An instance of + :class:`~pymongo.read_concern.ReadConcern`. :Returns: @@ -188,7 +200,8 @@ class Collection(common.BaseObject): read_preference or self.read_preference, codec_options or self.codec_options, check, - allowable_errors) + allowable_errors, + read_concern=read_concern) def __create(self, options): """Sends a create command with the given options. @@ -254,7 +267,8 @@ class Collection(common.BaseObject): return self.__database def with_options( - self, codec_options=None, read_preference=None, write_concern=None): + self, codec_options=None, read_preference=None, + write_concern=None, read_concern=None): """Get a clone of this collection changing the specified settings. >>> coll1.read_preference @@ -279,13 +293,18 @@ class Collection(common.BaseObject): :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) the :attr:`write_concern` of this :class:`Collection` is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) the :attr:`read_concern` of this :class:`Collection` + is used. """ return Collection(self.__database, self.__name, False, codec_options or self.codec_options, read_preference or self.read_preference, - write_concern or self.write_concern) + write_concern or self.write_concern, + read_concern or self.read_concern) def initialize_unordered_bulk_op(self): """Initialize an unordered batch of write operations. @@ -1059,7 +1078,8 @@ class Collection(common.BaseObject): ('numCursors', num_cursors)]) with self._socket_for_reads() as (sock_info, slave_ok): - result = self._command(sock_info, cmd, slave_ok) + result = self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern) return [CommandCursor(self, cursor['cursor'], sock_info.address) for cursor in result['cursors']] @@ -1068,7 +1088,8 @@ class Collection(common.BaseObject): """Internal count helper.""" with self._socket_for_reads() as (sock_info, slave_ok): res = self._command(sock_info, cmd, slave_ok, - allowable_errors=["ns missing"]) + allowable_errors=["ns missing"], + read_concern=self.read_concern) if res.get("errmsg", "") == "ns missing": return 0 return int(res["n"]) @@ -1522,7 +1543,8 @@ class Collection(common.BaseObject): cmd.update(kwargs) - result = self._command(sock_info, cmd, slave_ok) + result = self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern) if "cursor" in result: cursor = result["cursor"] @@ -1653,7 +1675,8 @@ class Collection(common.BaseObject): kwargs["query"] = filter cmd.update(kwargs) with self._socket_for_reads() as (sock_info, slave_ok): - return self._command(sock_info, cmd, slave_ok)["values"] + return self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern)["values"] def map_reduce(self, map, reduce, out, full_response=False, **kwargs): """Perform a map/reduce operation on this collection. diff --git a/pymongo/common.py b/pymongo/common.py index befaa1e72..dc1af4c6f 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -25,6 +25,7 @@ from bson.py3compat import string_type, integer_types, iteritems from pymongo.auth import MECHANISMS from pymongo.errors import ConfigurationError from pymongo.monitoring import _validate_event_listeners +from pymongo.read_concern import ReadConcern from pymongo.read_preferences import (read_pref_mode_from_name, _ServerMode) from pymongo.ssl_support import validate_cert_reqs @@ -423,6 +424,7 @@ VALIDATORS = { 'ssl_cert_reqs': validate_cert_reqs, 'ssl_ca_certs': validate_readable, 'ssl_match_hostname': validate_boolean_or_string, + 'readconcernlevel': validate_string_or_none, 'read_preference': validate_read_preference, 'readpreference': validate_read_preference_mode, 'readpreferencetags': validate_read_preference_tags, @@ -495,7 +497,8 @@ class BaseObject(object): SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO MONGODB. """ - def __init__(self, codec_options, read_preference, write_concern): + def __init__(self, codec_options, read_preference, write_concern, + read_concern): if not isinstance(codec_options, CodecOptions): raise TypeError("codec_options must be an instance of " @@ -513,6 +516,11 @@ class BaseObject(object): "pymongo.write_concern.WriteConcern") self.__write_concern = write_concern + if not isinstance(read_concern, ReadConcern): + raise TypeError("read_concern must be an instance of " + "pymongo.read_concern.ReadConcern") + self.__read_concern = read_concern + @property def codec_options(self): """Read only access to the :class:`~bson.codec_options.CodecOptions` @@ -538,3 +546,11 @@ class BaseObject(object): The :attr:`read_preference` attribute is now read only. """ return self.__read_preference + + @property + def read_concern(self): + """Read only access to the read concern of this instance. + + .. versionadded:: 3.2 + """ + return self.__read_concern diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 300f07d0b..f0f732b44 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -184,6 +184,7 @@ class Cursor(object): self.__codec_options = collection.codec_options self.__read_preference = collection.read_preference + self.__read_concern = collection.read_concern self.__query_flags = cursor_type if self.__read_preference != ReadPreference.PRIMARY: @@ -985,7 +986,8 @@ class Cursor(object): self.__codec_options, self.__read_preference, self.__limit, - self.__batch_size)) + self.__batch_size, + self.__read_concern)) if not self.__id: self.__killed = True elif self.__id: # Get More diff --git a/pymongo/database.py b/pymongo/database.py index 50ca1f48d..9250cbb13 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -17,7 +17,7 @@ import warnings from bson.code import Code -from bson.codec_options import CodecOptions +from bson.codec_options import CodecOptions, DEFAULT_CODEC_OPTIONS from bson.dbref import DBRef from bson.objectid import ObjectId from bson.py3compat import iteritems, string_type, _unicode @@ -51,8 +51,8 @@ class Database(common.BaseObject): """A Mongo database. """ - def __init__(self, client, name, codec_options=None, - read_preference=None, write_concern=None): + def __init__(self, client, name, codec_options=None, read_preference=None, + write_concern=None, read_concern=None): """Get a database by client and name. Raises :class:`TypeError` if `name` is not an instance of @@ -71,9 +71,15 @@ class Database(common.BaseObject): - `write_concern` (optional): An instance of :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) client.write_concern is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) client.read_concern is used. .. mongodoc:: databases + .. versionchanged:: 3.2 + Added the read_concern option. + .. versionchanged:: 3.0 Added the codec_options, read_preference, and write_concern options. :class:`~pymongo.database.Database` no longer returns an instance @@ -89,7 +95,8 @@ class Database(common.BaseObject): super(Database, self).__init__( codec_options or client.codec_options, read_preference or client.read_preference, - write_concern or client.write_concern) + write_concern or client.write_concern, + read_concern or client.read_concern) if not isinstance(name, string_type): raise TypeError("name must be an instance " @@ -225,8 +232,8 @@ class Database(common.BaseObject): """ return Collection(self, name) - def get_collection(self, name, codec_options=None, - read_preference=None, write_concern=None): + def get_collection(self, name, codec_options=None, read_preference=None, + write_concern=None, read_concern=None): """Get a :class:`~pymongo.collection.Collection` with the given name and options. @@ -259,12 +266,18 @@ class Database(common.BaseObject): :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) the :attr:`write_concern` of this :class:`Database` is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) the :attr:`read_concern` of this :class:`Database` is + used. """ return Collection( - self, name, False, codec_options, read_preference, write_concern) + self, name, False, codec_options, read_preference, + write_concern, read_concern) def create_collection(self, name, codec_options=None, - read_preference=None, write_concern=None, **kwargs): + read_preference=None, write_concern=None, + read_concern=None, **kwargs): """Create a new :class:`~pymongo.collection.Collection` in this database. @@ -298,6 +311,10 @@ class Database(common.BaseObject): :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) the :attr:`write_concern` of this :class:`Database` is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) the :attr:`read_concern` of this :class:`Database` is + used. - `**kwargs` (optional): additional keyword arguments will be passed as options for the create collection command @@ -311,7 +328,8 @@ class Database(common.BaseObject): raise CollectionInvalid("collection %s already exists" % name) return Collection(self, name, True, codec_options, - read_preference, write_concern, **kwargs) + read_preference, write_concern, + read_concern, **kwargs) def _apply_incoming_manipulators(self, son, collection): """Apply incoming manipulators to `son`.""" @@ -351,7 +369,7 @@ class Database(common.BaseObject): def _command(self, sock_info, command, slave_ok=False, value=1, check=True, allowable_errors=None, read_preference=ReadPreference.PRIMARY, - codec_options=CodecOptions(), **kwargs): + codec_options=DEFAULT_CODEC_OPTIONS, **kwargs): """Internal command helper.""" if isinstance(command, string_type): command = SON([(command, value)]) @@ -367,7 +385,7 @@ class Database(common.BaseObject): def command(self, command, value=1, check=True, allowable_errors=None, read_preference=ReadPreference.PRIMARY, - codec_options=CodecOptions(), **kwargs): + codec_options=DEFAULT_CODEC_OPTIONS, **kwargs): """Issue a MongoDB command. Send command `command` to the database and return the diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 48deb89b7..e5caf8ac3 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -32,6 +32,7 @@ from pymongo.errors import (CursorNotFound, WriteConcernError, WTimeoutError) from pymongo.message import _Query, _convert_exception +from pymongo.read_concern import DEFAULT_READ_CONCERN _UUNDER = u("_") @@ -240,7 +241,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn, """Simple query helper for retrieving a first (and possibly only) batch.""" query = _Query( 0, db, coll, 0, ntoreturn, query, None, - codec_options, read_preference, 0, ntoreturn) + codec_options, read_preference, 0, 0, DEFAULT_READ_CONCERN) name = next(iter(cmd)) duration = None diff --git a/pymongo/message.py b/pymongo/message.py index 0b7c77db9..0e1246f94 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -34,6 +34,7 @@ try: except ImportError: _use_c = False from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure +from pymongo.read_concern import DEFAULT_READ_CONCERN from pymongo.read_preferences import ReadPreference @@ -155,14 +156,18 @@ _MODIFIERS = SON([ def _gen_explain_command( - coll, spec, projection, skip, limit, batch_size, options): + coll, spec, projection, skip, limit, batch_size, + options, read_concern): """Generate an explain command document.""" cmd = _gen_find_command( coll, spec, projection, skip, limit, batch_size, options) + if read_concern.level: + return SON([('explain', cmd), ('readConcern', read_concern.document)]) return SON([('explain', cmd)]) -def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options): +def _gen_find_command(coll, spec, projection, skip, limit, batch_size, + options, read_concern=DEFAULT_READ_CONCERN): """Generate a find command document.""" cmd = SON([('find', coll)]) if '$query' in spec: @@ -181,6 +186,8 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options): cmd['singleBatch'] = True if batch_size: cmd['batchSize'] = batch_size + if read_concern.level: + cmd['readConcern'] = read_concern.document if options: cmd.update([(opt, True) @@ -205,10 +212,11 @@ class _Query(object): __slots__ = ('flags', 'db', 'coll', 'ntoskip', 'ntoreturn', 'spec', 'fields', 'codec_options', 'read_preference', 'limit', - 'batch_size', 'name') + 'batch_size', 'name', 'read_concern') def __init__(self, flags, db, coll, ntoskip, ntoreturn, spec, fields, - codec_options, read_preference, limit, batch_size): + codec_options, read_preference, limit, + batch_size, read_concern): self.flags = flags self.db = db self.coll = coll @@ -218,6 +226,7 @@ class _Query(object): self.fields = fields self.codec_options = codec_options self.read_preference = read_preference + self.read_concern = read_concern self.limit = limit self.batch_size = batch_size self.name = 'find' @@ -231,10 +240,11 @@ class _Query(object): self.name = 'explain' return _gen_explain_command( self.coll, self.spec, self.fields, self.ntoskip, - self.limit, self.batch_size, self.flags), self.db + self.limit, self.batch_size, self.flags, + self.read_concern), self.db return _gen_find_command( self.coll, self.spec, self.fields, self.ntoskip, self.limit, - self.batch_size, self.flags), self.db + self.batch_size, self.flags, self.read_concern), self.db def get_message(self, set_slave_ok, is_mongos, use_cmd=False): """Get a query message, possibly setting the slaveOk bit.""" diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 1de70974e..c68e6dc3b 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -223,6 +223,15 @@ class MongoClient(common.BaseObject): this to ``False`` as that could make your application vulnerable to man-in-the-middle attacks. + | **Read Concern options:** + | (If not set explicitly, this will use the server default) + + - `readConcernLevel`: (string) The read concern level specifies the + level of isolation for read operations. For example, a read + operation using a read concern level of ``majority`` will only + return data that has been written to a majority of nodes. If the + level is left unspecified, the server default will be used. + .. mongodoc:: connections .. versionchanged:: 3.0 @@ -345,7 +354,8 @@ class MongoClient(common.BaseObject): super(MongoClient, self).__init__(options.codec_options, options.read_preference, - options.write_concern) + options.write_concern, + options.read_concern) self.__all_credentials = {} creds = options.credentials @@ -1036,8 +1046,8 @@ class MongoClient(common.BaseObject): return self[self.__default_database_name] - def get_database(self, name, codec_options=None, - read_preference=None, write_concern=None): + def get_database(self, name, codec_options=None, read_preference=None, + write_concern=None, read_concern=None): """Get a :class:`~pymongo.database.Database` with the given name and options. @@ -1070,9 +1080,14 @@ class MongoClient(common.BaseObject): :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the default) the :attr:`write_concern` of this :class:`MongoClient` is used. + - `read_concern` (optional): An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) the :attr:`read_concern` of this :class:`MongoClient` is + used. """ return database.Database( - self, name, codec_options, read_preference, write_concern) + self, name, codec_options, read_preference, + write_concern, read_concern) @property def is_locked(self): diff --git a/pymongo/network.py b/pymongo/network.py index 2f06613ed..23c1421d6 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -31,6 +31,7 @@ except ImportError: from pymongo import helpers, message from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure +from pymongo.read_concern import DEFAULT_READ_CONCERN _UNPACK_INT = struct.Struct("= 4 and not exhaust) + use_find_cmd = False + if sock_info.max_wire_version >= 4: + if not exhaust: + use_find_cmd = True + elif (isinstance(operation, _Query) and + not operation.read_concern.ok_for_legacy): + raise ConfigurationError( + 'read concern level of %s is not valid ' + 'with a max wire version of %d.' + % (operation.read_concern.level, + sock_info.max_wire_version)) message = operation.get_message( set_slave_okay, sock_info.is_mongos, use_find_cmd) diff --git a/test/test_command_monitoring_spec.py b/test/test_command_monitoring_spec.py index d130e2a78..c3f88f9c5 100644 --- a/test/test_command_monitoring_spec.py +++ b/test/test_command_monitoring_spec.py @@ -19,8 +19,6 @@ import os import re import sys -from collections import defaultdict - sys.path[0:0] = [""] import pymongo @@ -31,7 +29,7 @@ from pymongo.errors import OperationFailure from pymongo.read_preferences import make_read_preference from pymongo.write_concern import WriteConcern from test import unittest, client_context -from test.utils import single_client, wait_until +from test.utils import single_client, wait_until, EventListener # Location of JSON test specifications. _TEST_PATH = os.path.join( @@ -45,21 +43,6 @@ def camel_to_snake(camel): return re.sub('([a-z0-9])([A-Z])', r'\1_\2', snake).lower() -class EventListener(monitoring.CommandListener): - - def __init__(self): - self.results = defaultdict(list) - - def started(self, event): - self.results['started'].append(event) - - def succeeded(self, event): - self.results['succeeded'].append(event) - - def failed(self, event): - self.results['failed'].append(event) - - class TestAllScenarios(unittest.TestCase): @classmethod diff --git a/test/test_database.py b/test/test_database.py index d92c3a6cf..d1f465663 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -42,6 +42,7 @@ from pymongo.errors import (CollectionInvalid, ExecutionTimeout, InvalidName, OperationFailure) +from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern from test import (client_context, @@ -95,12 +96,15 @@ class TestDatabaseNoConnect(unittest.TestCase): def test_get_collection(self): codec_options = CodecOptions(tz_aware=True) write_concern = WriteConcern(w=2, j=True) + read_concern = ReadConcern('majority') coll = self.client.pymongo_test.get_collection( - 'foo', codec_options, ReadPreference.SECONDARY, write_concern) + 'foo', codec_options, ReadPreference.SECONDARY, write_concern, + read_concern) self.assertEqual('foo', coll.name) self.assertEqual(codec_options, coll.codec_options) self.assertEqual(ReadPreference.SECONDARY, coll.read_preference) self.assertEqual(write_concern, coll.write_concern) + self.assertEqual(read_concern, coll.read_concern) def test_getattr(self): db = self.client.pymongo_test diff --git a/test/test_monitoring.py b/test/test_monitoring.py index bc4d8dcf7..371ec4846 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -17,8 +17,6 @@ import sys import time import warnings -from collections import defaultdict - sys.path[0:0] = [""] from bson.objectid import ObjectId @@ -30,22 +28,7 @@ from pymongo.errors import NotMasterError, OperationFailure from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern from test import unittest, client_context, client_knobs -from test.utils import single_client, wait_until - - -class EventListener(monitoring.CommandListener): - - def __init__(self): - self.results = defaultdict(list) - - def started(self, event): - self.results['started'].append(event) - - def succeeded(self, event): - self.results['succeeded'].append(event) - - def failed(self, event): - self.results['failed'].append(event) +from test.utils import single_client, wait_until, EventListener class TestCommandMonitoring(unittest.TestCase): diff --git a/test/test_read_concern.py b/test/test_read_concern.py new file mode 100644 index 000000000..1468e2c65 --- /dev/null +++ b/test/test_read_concern.py @@ -0,0 +1,112 @@ +# Copyright 2015 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the read_concern module.""" + +import pymongo + +from bson.son import SON +from pymongo import monitoring +from pymongo.errors import ConfigurationError +from pymongo.read_concern import ReadConcern + +from test import client_context, pair, unittest +from test.utils import single_client, EventListener + + +class TestReadConcern(unittest.TestCase): + + @classmethod + @client_context.require_connection + def setUpClass(cls): + cls.listener = EventListener() + cls.saved_listeners = monitoring._LISTENERS + # Don't use any global subscribers. + monitoring._LISTENERS = monitoring._Listeners([]) + cls.client = single_client(event_listeners=[cls.listener]) + cls.db = cls.client.pymongo_test + + @classmethod + def tearDownClass(cls): + monitoring._LISTENERS = cls.saved_listeners + + def tearDown(self): + self.db.coll.drop() + self.listener.results.clear() + + def test_read_concern(self): + rc = ReadConcern() + self.assertIsNone(rc.level) + self.assertTrue(rc.ok_for_legacy) + + rc = ReadConcern('majority') + self.assertEqual('majority', rc.level) + self.assertFalse(rc.ok_for_legacy) + + rc = ReadConcern('local') + self.assertEqual('local', rc.level) + self.assertTrue(rc.ok_for_legacy) + + self.assertRaises(TypeError, ReadConcern, 42) + + def test_read_concern_uri(self): + uri = 'mongodb://%s/?readConcernLevel=majority' % (pair,) + client = pymongo.MongoClient(uri) + self.assertEqual(ReadConcern('majority'), client.read_concern) + + @client_context.require_version_max(3, 1) + def test_invalid_read_concern(self): + coll = self.db.get_collection( + 'coll', read_concern=ReadConcern('majority')) + self.assertRaisesRegexp( + ConfigurationError, + 'read concern level of majority is not valid ' + 'with a max wire version of [0-3]', + coll.count) + + @client_context.require_version_min(3, 1, 9, -1) + def test_find_command(self): + # readConcern not sent in command if not specified. + coll = self.db.coll + tuple(coll.find({'field': 'value'})) + self.assertNotIn('readConcern', + self.listener.results['started'][0].command) + + self.listener.results.clear() + + # Explicitly set readConcern to 'local'. + coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) + tuple(coll.find({'field': 'value'})) + self.assertEqual( + SON([('find', 'coll'), + ('filter', {'field': 'value'}), + ('readConcern', {'level': 'local'})]), + self.listener.results['started'][0].command) + + @client_context.require_version_min(3, 1, 9, -1) + def test_command_cursor(self): + # readConcern not sent in command if not specified. + coll = self.db.coll + tuple(coll.aggregate([{'$match': {'field': 'value'}}])) + self.assertNotIn('readConcern', + self.listener.results['started'][0].command) + + self.listener.results.clear() + + # Explicitly set readConcern to 'local'. + coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) + tuple(coll.aggregate([{'$match': {'field': 'value'}}])) + self.assertEqual( + {'level': 'local'}, + self.listener.results['started'][0].command['readConcern']) diff --git a/test/utils.py b/test/utils.py index 344ab8822..d643f4229 100644 --- a/test/utils.py +++ b/test/utils.py @@ -22,9 +22,11 @@ import sys import threading import time import warnings + +from collections import defaultdict from functools import partial -from pymongo import MongoClient +from pymongo import MongoClient, monitoring from pymongo.errors import AutoReconnect, OperationFailure from pymongo.server_selectors import (any_server_selector, writable_server_selector) @@ -37,6 +39,21 @@ from test import (client_context, from test.version import Version +class EventListener(monitoring.CommandListener): + + def __init__(self): + self.results = defaultdict(list) + + def started(self, event): + self.results['started'].append(event) + + def succeeded(self, event): + self.results['succeeded'].append(event) + + def failed(self, event): + self.results['failed'].append(event) + + def _connection_string_noauth(h, p): if h.startswith("mongodb://"): return h