diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst
index b01e4a4f1..7ebd0bfa5 100644
--- a/doc/api/pymongo/collection.rst
+++ b/doc/api/pymongo/collection.rst
@@ -22,6 +22,8 @@
.. autoattribute:: database
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. autoattribute:: uuid_subtype
.. automethod:: get_lasterror_options
diff --git a/doc/api/pymongo/connection.rst b/doc/api/pymongo/connection.rst
index 3cdcfb219..238686960 100644
--- a/doc/api/pymongo/connection.rst
+++ b/doc/api/pymongo/connection.rst
@@ -17,11 +17,15 @@
.. autoattribute:: host
.. autoattribute:: port
+ .. autoattribute:: is_primary
+ .. autoattribute:: is_mongos
.. autoattribute:: nodes
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: slave_okay
.. autoattribute:: safe
.. autoattribute:: is_locked
diff --git a/doc/api/pymongo/database.rst b/doc/api/pymongo/database.rst
index 0f4d4b5db..359d6a949 100644
--- a/doc/api/pymongo/database.rst
+++ b/doc/api/pymongo/database.rst
@@ -20,6 +20,8 @@
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. automethod:: get_lasterror_options
.. automethod:: set_lasterror_options
diff --git a/doc/api/pymongo/index.rst b/doc/api/pymongo/index.rst
index 862a4bd47..c10baaf2b 100644
--- a/doc/api/pymongo/index.rst
+++ b/doc/api/pymongo/index.rst
@@ -13,7 +13,7 @@
Alias for :class:`pymongo.replica_set_connection.ReplicaSetConnection`.
- .. autoclass:: pymongo.ReadPreference
+ .. autoclass:: pymongo.read_preferences.ReadPreference
.. autofunction:: has_c
Sub-modules:
diff --git a/doc/api/pymongo/replica_set_connection.rst b/doc/api/pymongo/replica_set_connection.rst
index 41b854149..3283dae50 100644
--- a/doc/api/pymongo/replica_set_connection.rst
+++ b/doc/api/pymongo/replica_set_connection.rst
@@ -21,6 +21,8 @@
.. autoattribute:: primary
.. autoattribute:: secondaries
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware
diff --git a/doc/changelog.rst b/doc/changelog.rst
index 97fabc4b2..a1ee74f12 100644
--- a/doc/changelog.rst
+++ b/doc/changelog.rst
@@ -100,7 +100,7 @@ Important New Features:
automatic failover handling and periodically checks the state of the
replica set to handle issues like primary stepdown or secondaries
being removed for backup operations. Read preferences are defined through
- :class:`~pymongo.ReadPreference`.
+ :class:`~pymongo.read_preferences.ReadPreference`.
- PyMongo supports the new BSON binary subtype 4 for UUIDs. The default
subtype to use can be set through
:attr:`~pymongo.collection.Collection.uuid_subtype`
diff --git a/doc/examples/gevent.rst b/doc/examples/gevent.rst
index 2ede458d2..83720b241 100644
--- a/doc/examples/gevent.rst
+++ b/doc/examples/gevent.rst
@@ -53,8 +53,9 @@ Additionally, it will use a background greenlet instead of a background thread
to monitor the state of the replica set.
Using :meth:`~pymongo.replica_set_connection.ReplicaSetConnection.start_request()`
-with :class:`~pymongo.ReadPreference` PRIMARY ensures that the current greenlet
-uses the same socket for all operations until a call to :meth:`end_request()`.
+with :class:`~pymongo.read_preferences.ReadPreference` PRIMARY ensures that the
+current greenlet uses the same socket for all operations until a call to
+:meth:`end_request()`.
You must `install Gevent <http://gevent.org/>`_ to use
:class:`~pymongo.replica_set_connection.ReplicaSetConnection`
diff --git a/doc/examples/replica_set.rst b/doc/examples/replica_set.rst
index 191e1b490..3552d50b3 100644
--- a/doc/examples/replica_set.rst
+++ b/doc/examples/replica_set.rst
@@ -155,31 +155,113 @@ the operation will succeed::
ReplicaSetConnection
--------------------
-In Pymongo-2.1 a new ReplicaSetConnection class was added that provides
-some new features not supported in the original Connection class. The most
-important of these is the ability to distribute queries to the secondary
-members of a replica set. To connect using ReplicaSetConnection just
-provide a host:port pair and the name of the replica set::
+Using a :class:`~pymongo.replica_set_connection.ReplicaSetConnection` instead
+of a simple :class:`~pymongo.connection.Connection` offers two key features:
+secondary reads and replica set health monitoring. To connect using
+`ReplicaSetConnection` just provide a host:port pair and the name of the
+replica set::
>>> from pymongo import ReplicaSetConnection
>>> ReplicaSetConnection("morton.local:27017", replicaSet='foo')
ReplicaSetConnection([u'morton.local:27019', u'morton.local:27017', u'morton.local:27018'])
+Secondary Reads
+'''''''''''''''
+
By default an instance of ReplicaSetConnection will only send queries to
-the primary member of the replica set. To use secondary members for queries
-we have to change the read preference::
+the primary member of the replica set. To use secondaries for queries
+we have to change the :class:`~pymongo.read_preferences.ReadPreference`::
>>> db = ReplicaSetConnection("morton.local:27017", replicaSet='foo').test
- >>> from pymongo import ReadPreference
- >>> db.read_preference = ReadPreference.SECONDARY
+ >>> from pymongo.read_preferences import ReadPreference
+ >>> db.read_preference = ReadPreference.SECONDARY_PREFERRED
Now all queries will be sent to the secondary members of the set. If there are
no secondary members the primary will be used as a fallback. If you have
queries you would prefer to never send to the primary you can specify that
-using the SECONDARY_ONLY read preference::
+using the ``SECONDARY`` read preference::
- >>> db.read_preference = ReadPreference.SECONDARY_ONLY
+ >>> db.read_preference = ReadPreference.SECONDARY
Read preference can be set on a connection, database, collection, or on a
-per-query basis.
+per-query basis, e.g.::
+ >>> db.collection.find_one(read_preference=ReadPreference.PRIMARY)
+
+Reads are configured using three options: **read_preference**, **tag_sets**,
+and **secondary_acceptable_latency_ms**.
+
+**read_preference**:
+
+- ``PRIMARY``: Read from the primary. This is the default, and provides the
+ strongest consistency. If no primary is available, raise
+ :class:`~pymongo.errors.AutoReconnect`.
+
+- ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
+ none, read from a secondary matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``.
+
+- ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``. If no matching secondary is available,
+ raise :class:`~pymongo.errors.AutoReconnect`.
+
+- ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of
+ ``tag_sets`` and ``secondary_acceptable_latency_ms`` if available, otherwise
+ from primary (regardless of the primary's tags and latency).
+
+- ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``.
+
+**tag_sets**:
+
+Replica-set members can be `tagged
+<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_ according to any
+criteria you choose. By default, ReplicaSetConnection ignores tags when
+choosing a member to read from, but it can be configured with the ``tag_sets``
+parameter. ``tag_sets`` must be a list of dictionaries, each dict providing tag
+values that the replica set member must match. ReplicaSetConnection tries each
+set of tags in turn until it finds a set of tags with at least one matching
+member. For example, to prefer reads from the New York data center, but fall
+back to the San Francisco data center, tag your replica set members according
+to their location and create a ReplicaSetConnection like so:
+
+ >>> rsc = ReplicaSetConnection(
+ ... "morton.local:27017",
+ ... replicaSet='foo',
+ ... read_preference=ReadPreference.SECONDARY,
+ ... tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}]
+ ... )
+
+ReplicaSetConnection tries to find secondaries in New York, then San Francisco,
+and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an
+additional fallback, specify a final, empty tag set, ``{}``, which means "read
+from any member that matches the mode, ignoring tags."
+
+**secondary_acceptable_latency_ms**:
+
+If multiple members match the mode and tag sets, ReplicaSetConnection reads
+from among the nearest members, chosen according to ping time. By default,
+only members whose ping times are within 15 milliseconds of the nearest
+are used for queries. You can choose to distribute reads among members with
+higher latencies by setting ``secondary_acceptable_latency_ms`` to a larger
+number. In that case, ReplicaSetConnection distributes reads among matching
+members within ``secondary_acceptable_latency_ms`` of the closest member's
+ping time.
+
+Health Monitoring
+'''''''''''''''''
+
+When ReplicaSetConnection is initialized it launches a background task to
+monitor the replica set for changes in:
+
+* Health: detect when a member goes down or comes up, or if a different member
+ becomes primary
+* Configuration: detect changes in tags
+* Latency: track a moving average of each member's ping time
+
+Replica-set monitoring ensures queries are continually routed to the proper
+members as the state of the replica set changes.
+
+It is critical to call
+:meth:`~pymongo.replica_set_connection.ReplicaSetConnection.close` to terminate
+the monitoring task before your process exits.
diff --git a/gridfs/__init__.py b/gridfs/__init__.py
index dedc4dbd2..9b92e8927 100644
--- a/gridfs/__init__.py
+++ b/gridfs/__init__.py
@@ -54,7 +54,8 @@ class GridFS(object):
self.__collection = database[collection]
self.__files = self.__collection.files
self.__chunks = self.__collection.chunks
- if not database.slave_okay and not database.read_preference:
+ connection = database.connection
+ if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__chunks.ensure_index([("files_id", ASCENDING),
("n", ASCENDING)],
unique=True)
@@ -158,7 +159,7 @@ class GridFS(object):
:Parameters:
- `filename`: ``"filename"`` of the file to get, or `None`
- - `version` (optional): version of the file to get (defualts
+ - `version` (optional): version of the file to get (defaults
to -1, the most recent version uploaded)
- `**kwargs` (optional): find files by custom metadata.
@@ -168,8 +169,8 @@ class GridFS(object):
Accept keyword arguments to find files by custom metadata.
.. versionadded:: 1.9
"""
- database = self.__database
- if not database.slave_okay and not database.read_preference:
+ connection = self.__database.connection
+ if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__files.ensure_index([("filename", ASCENDING),
("uploadDate", DESCENDING)])
diff --git a/pymongo/__init__.py b/pymongo/__init__.py
index e332e58d8..3f8f9ba30 100644
--- a/pymongo/__init__.py
+++ b/pymongo/__init__.py
@@ -48,49 +48,6 @@ SLOW_ONLY = 1
ALL = 2
"""Profile all operations."""
-class ReadPreference:
- """An enum that defines the read preferences supported by PyMongo.
-
- +----------------------+--------------------------------------------------+
- | Connection type | Read Preference |
- +======================+================+================+================+
- | |`PRIMARY` |`SECONDARY` |`SECONDARY_ONLY`|
- +----------------------+----------------+----------------+----------------+
- |Connection to a single|Queries are |Queries are |Same as |
- |host. |allowed if the |allowed if the |`SECONDARY` |
- | |connection is to|connection is to| |
- | |the replica set |the replica set | |
- | |primary. |primary or a | |
- | | |secondary. | |
- +----------------------+----------------+----------------+----------------+
- |Connection to a |Queries are sent|Queries are |Same as |
- |mongos. |to the primary |distributed |`SECONDARY` |
- | |of a shard. |among shard | |
- | | |secondaries. | |
- | | |Queries are sent| |
- | | |to the primary | |
- | | |if no | |
- | | |secondaries are | |
- | | |available. | |
- | | | | |
- +----------------------+----------------+----------------+----------------+
- |ReplicaSetConnection |Queries are sent|Queries are |Queries are |
- | |to the primary |distributed |never sent to |
- | |of the replica |among replica |the replica set |
- | |set. |set secondaries.|primary. An |
- | | |Queries are sent|exception is |
- | | |to the primary |raised if no |
- | | |if no |secondary is |
- | | |secondaries are |available. |
- | | |available. | |
- | | | | |
- +----------------------+----------------+----------------+----------------+
- """
-
- PRIMARY = 0
- SECONDARY = 1
- SECONDARY_ONLY = 2
-
version_tuple = (2, 2, 1, '+')
def get_version_string():
@@ -103,6 +60,7 @@ version = get_version_string()
from pymongo.connection import Connection
from pymongo.replica_set_connection import ReplicaSetConnection
+from pymongo.read_preferences import ReadPreference
def has_c():
"""Is the C extension installed?
diff --git a/pymongo/collection.py b/pymongo/collection.py
index 3b93042ca..88cd2c945 100644
--- a/pymongo/collection.py
+++ b/pymongo/collection.py
@@ -23,7 +23,7 @@ from pymongo import (common,
helpers,
message)
from pymongo.cursor import Cursor
-from pymongo.errors import ConfigurationError, InvalidName, InvalidOperation
+from pymongo.errors import ConfigurationError, InvalidName
def _gen_index_name(keys):
@@ -74,11 +74,14 @@ class Collection(common.BaseObject):
.. mongodoc:: collections
"""
- super(Collection,
- self).__init__(slave_okay=database.slave_okay,
- read_preference=database.read_preference,
- safe=database.safe,
- **(database.get_lasterror_options()))
+ super(Collection, self).__init__(
+ slave_okay=database.slave_okay,
+ read_preference=database.read_preference,
+ tag_sets=database.tag_sets,
+ secondary_acceptable_latency_ms=(
+ database.secondary_acceptable_latency_ms),
+ safe=database.safe,
+ **(database.get_lasterror_options()))
if not isinstance(name, basestring):
raise TypeError("name must be an instance "
@@ -590,13 +593,20 @@ class Collection(common.BaseObject):
:class:`~pymongo.connection.Connection`-level default
- `read_preference` (optional): The read preference for
this query.
+ - `tag_sets` (optional): The tag sets for this query.
+ - `secondary_acceptable_latency_ms` (optional): Any replica-set
+ member whose ping time is within secondary_acceptable_latency_ms of
+ the nearest member may accept reads. Default 15 milliseconds.
.. note:: The `manipulate` parameter may default to False in
a future release.
.. note:: The `max_scan` parameter requires server
version **>= 1.5.1**
-
+
+ .. versionadded:: 2.2.1+
+ The `tag_sets` and `secondary_acceptable_latency_ms` parameters.
+
.. versionadded:: 1.11+
The `await_data`, `partial`, and `manipulate` parameters.
@@ -619,6 +629,11 @@ class Collection(common.BaseObject):
kwargs['slave_okay'] = self.slave_okay
if not 'read_preference' in kwargs:
kwargs['read_preference'] = self.read_preference
+ if not 'tag_sets' in kwargs:
+ kwargs['tag_sets'] = self.tag_sets
+ if not 'secondary_acceptable_latency_ms' in kwargs:
+ kwargs['secondary_acceptable_latency_ms'] = (
+ self.secondary_acceptable_latency_ms)
return Cursor(self, *args, **kwargs)
def count(self):
@@ -924,7 +939,6 @@ class Collection(common.BaseObject):
return self.__database.command("aggregate", self.__name,
pipeline=pipeline,
- read_preference=self.read_preference,
slave_okay=self.slave_okay,
_use_master=use_master)
@@ -949,9 +963,10 @@ class Collection(common.BaseObject):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
- :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
- `slave_okay` attribute of this instance is set to `True` the group
- command will be sent to a secondary or slave.
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ the (deprecated) `slave_okay` attribute of this instance is set to
+ `True`, the group command will be sent to a secondary or slave.
:Parameters:
- `key`: fields to group by (see above description)
@@ -989,6 +1004,9 @@ class Collection(common.BaseObject):
return self.__database.command("group", group,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master)["retval"]
@@ -1089,10 +1107,20 @@ class Collection(common.BaseObject):
raise TypeError("'out' must be an instance of "
"%s or dict" % (basestring.__name__,))
+ if isinstance(out, dict) and out.get('inline'):
+ must_use_master = False
+ else:
+ must_use_master = True
+
response = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
map=map, reduce=reduce,
- out=out, **kwargs)
+ read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
+ out=out, _use_master=must_use_master,
+ **kwargs)
if full_response or not response.get('result'):
return response
@@ -1117,9 +1145,10 @@ class Collection(common.BaseObject):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
- :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
- `slave_okay` attribute of this instance is set to `True` the inline
- map reduce will be run on a secondary or slave.
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ the (deprecated) `slave_okay` attribute of this instance is set to
+ `True`, the inline map reduce will be run on a secondary or slave.
:Parameters:
- `map`: map function (as a JavaScript string)
@@ -1142,6 +1171,9 @@ class Collection(common.BaseObject):
res = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master,
map=map, reduce=reduce,
diff --git a/pymongo/common.py b/pymongo/common.py
index 345221ac8..acdb88980 100644
--- a/pymongo/common.py
+++ b/pymongo/common.py
@@ -15,8 +15,9 @@
"""Functions and classes common to multiple pymongo modules."""
import warnings
+from pymongo import read_preferences
-from pymongo import ReadPreference
+from pymongo.read_preferences import ReadPreference
from pymongo.errors import ConfigurationError
@@ -83,31 +84,60 @@ def validate_int_or_basestring(option, value):
"integer or a string" % (option,))
+def validate_positive_float(option, value):
+ """Validates that 'value' is a float, or can be converted to one, and is
+ positive.
+ """
+ err = ConfigurationError("%s must be a positive int or float" % (option,))
+ try:
+ value = float(value)
+ except (ValueError, TypeError):
+ raise err
+ if value <= 0:
+ raise err
+
+ return value
+
+
def validate_timeout_or_none(option, value):
"""Validates a timeout specified in milliseconds returning
a value in floating point seconds.
"""
if value is None:
return value
- try:
- value = float(value)
- except (ValueError, TypeError):
- raise ConfigurationError("%s must be an "
- "instance of int or float" % (option,))
- if value <= 0:
- raise ConfigurationError("%s must be a positive integer" % (option,))
- return value / 1000.0
+ return validate_positive_float(option, value) / 1000.0
def validate_read_preference(dummy, value):
"""Validate read preference for a ReplicaSetConnection.
"""
- if value not in range(ReadPreference.PRIMARY,
- ReadPreference.SECONDARY_ONLY + 1):
+ if value not in read_preferences.modes:
raise ConfigurationError("Not a valid read preference")
return value
+def validate_tag_sets(dummy, value):
+ """Validate tag sets for a ReplicaSetConnection.
+ """
+ if value is None:
+ return [{}]
+
+ if not isinstance(value, list):
+ raise ConfigurationError((
+ "Tag sets %s invalid, must be a list" ) % repr(value))
+ if len(value) == 0:
+ raise ConfigurationError((
+ "Tag sets %s invalid, must be None or contain at least one set of"
+ " tags") % repr(value))
+
+ for tags in value:
+ if not isinstance(tags, dict):
+ raise ConfigurationError(
+ "Tag set %s invalid, must be a dict" % repr(tags))
+
+ return value
+
+
# jounal is an alias for j,
# wtimeoutms is an alias for wtimeout
VALIDATORS = {
@@ -125,6 +155,9 @@ VALIDATORS = {
'sockettimeoutms': validate_timeout_or_none,
'ssl': validate_boolean,
'read_preference': validate_read_preference,
+ 'tag_sets': validate_tag_sets,
+ 'secondaryacceptablelatencyms': validate_positive_float,
+ 'secondary_acceptable_latency_ms': validate_positive_float,
'auto_start_request': validate_boolean,
'use_greenlets': validate_boolean,
}
@@ -160,9 +193,16 @@ class BaseObject(object):
self.__slave_okay = False
self.__read_pref = ReadPreference.PRIMARY
+ self.__tag_sets = [{}]
+ self.__secondary_acceptable_latency_ms = 15
self.__safe = False
self.__safe_opts = {}
self.__set_options(options)
+ if (self.__read_pref == ReadPreference.PRIMARY
+ and self.__tag_sets != [{}]
+ ):
+ raise ConfigurationError(
+ "ReadPreference PRIMARY cannot be combined with tags")
def __set_safe_option(self, option, value, check=False):
"""Validates and sets getlasterror options for this
@@ -183,6 +223,14 @@ class BaseObject(object):
self.__slave_okay = validate_boolean(option, value)
elif option == 'read_preference':
self.__read_pref = validate_read_preference(option, value)
+ elif option == 'tag_sets':
+ self.__tag_sets = validate_tag_sets(option, value)
+ elif option in (
+ 'secondaryAcceptableLatencyMS',
+ 'secondary_acceptable_latency_ms'
+ ):
+ self.__secondary_acceptable_latency_ms = \
+ validate_positive_float(option, value)
elif option == 'safe':
self.__safe = validate_boolean(option, value)
elif option in SAFE_OPTIONS:
@@ -211,9 +259,9 @@ class BaseObject(object):
slave_okay = property(__get_slave_okay, __set_slave_okay)
def __get_read_pref(self):
- """The read preference for this instance.
+ """The read preference mode for this instance.
- See :class:`~pymongo.ReadPreference` for available options.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available options.
.. versionadded:: 2.1
"""
@@ -224,6 +272,47 @@ class BaseObject(object):
self.__read_pref = validate_read_preference('read_preference', value)
read_preference = property(__get_read_pref, __set_read_pref)
+
+ def __get_acceptable_latency(self):
+ """Any replica-set member whose ping time is within
+ secondary_acceptable_latency_ms of the nearest member may accept
+ reads. Defaults to 15 milliseconds.
+
+ See :class:`~pymongo.read_preferences.ReadPreference`.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__secondary_acceptable_latency_ms
+
+ def __set_acceptable_latency(self, value):
+ """Property setter for secondary_acceptable_latency_ms"""
+ self.__secondary_acceptable_latency_ms = (validate_positive_float(
+ 'secondary_acceptable_latency_ms', value))
+
+ secondary_acceptable_latency_ms = property(
+ __get_acceptable_latency, __set_acceptable_latency)
+
+ def __get_tag_sets(self):
+ """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
+ read only from members whose ``dc`` tag has the value ``"ny"``.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+
+ .. seealso:: `Data-Center Awareness
+ <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__tag_sets
+
+ def __set_tag_sets(self, value):
+ """Property setter for tag_sets"""
+ self.__tag_sets = validate_tag_sets('tag_sets', value)
+
+ tag_sets = property(__get_tag_sets, __set_tag_sets)
def __get_safe(self):
"""Use getlasterror with every write operation?
diff --git a/pymongo/connection.py b/pymongo/connection.py
index cf0287987..9a42b5bda 100644
--- a/pymongo/connection.py
+++ b/pymongo/connection.py
@@ -130,7 +130,7 @@ class Connection(common.BaseObject):
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- - `j` or `journal`: Block until write operations have been commited
+ - `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without journaling.
Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@@ -154,7 +154,8 @@ class Connection(common.BaseObject):
before timing out.
- `ssl`: If True, create the connection to the server using SSL.
- `read_preference`: The read preference for this connection.
- See :class:`~pymongo.ReadPreference` for available options.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available
+ options.
- `auto_start_request`: If True (the default), each thread that
accesses this Connection has a socket allocated to it for the
thread's lifetime. This ensures consistent reads, even if you read
@@ -228,6 +229,8 @@ class Connection(common.BaseObject):
self.__nodes = seeds
self.__host = None
self.__port = None
+ self.__is_primary = False
+ self.__is_mongos = False
for option, value in kwargs.iteritems():
option, value = common.validate(option, value)
@@ -425,6 +428,23 @@ class Connection(common.BaseObject):
"""
return self.__port
+ @property
+ def is_primary(self):
+ """If this Connection is connected to a standalone, a replica-set
+ primary, or the master of a master-slave set.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__is_primary
+
+ @property
+ def is_mongos(self):
+ """If this Connection is connected to mongos.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__is_mongos
+
@property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
@@ -507,7 +527,7 @@ class Connection(common.BaseObject):
def __try_node(self, node):
"""Try to connect to this node and see if it works
- for our connection type.
+ for our connection type. Returns ((host, port), is_primary, is_mongos).
:Parameters:
- `node`: The (host, port) pair to try.
@@ -539,7 +559,7 @@ class Connection(common.BaseObject):
# TODO: Rework this for PYTHON-368 (mongos high availability).
if not self.__nodes:
self.__nodes = set([node])
- return node
+ return node, True, response.get('msg', '') == 'isdbgrid'
elif "primary" in response:
candidate = _partition_node(response["primary"])
return self.__try_node(candidate)
@@ -550,7 +570,7 @@ class Connection(common.BaseObject):
# Direct connection
if response.get("arbiterOnly", False):
raise ConfigurationError("%s:%d is an arbiter" % node)
- return node
+ return node, response['ismaster'], response.get('msg', '') == 'isdbgrid'
def __find_node(self, seeds=None):
"""Find a host, port pair suitable for our connection type.
@@ -570,24 +590,27 @@ class Connection(common.BaseObject):
In either case a connection to an arbiter will never succeed.
Sets __host and __port so that :attr:`host` and :attr:`port`
- will return the address of the connected host.
+ will return the address of the connected host. Sets __is_primary to
+ True if this is a primary or master, else False.
"""
errors = []
# self.__nodes may change size as we iterate.
candidates = seeds or self.__nodes.copy()
for candidate in candidates:
try:
- node = self.__try_node(candidate)
- if node:
- return node
+ node, is_primary, is_mongos = self.__try_node(candidate)
+ self.__is_primary = is_primary
+ self.__is_mongos = is_mongos
+ return node
except Exception, why:
errors.append(str(why))
# Try any hosts we discovered that were not in the seed list.
for candidate in self.__nodes - candidates:
try:
- node = self.__try_node(candidate)
- if node:
- return node
+ node, is_primary, is_mongos = self.__try_node(candidate)
+ self.__is_primary = is_primary
+ self.__is_mongos = is_mongos
+ return node
except Exception, why:
errors.append(str(why))
# Couldn't find a suitable host.
diff --git a/pymongo/cursor.py b/pymongo/cursor.py
index d0964d4b1..1fbb31b3a 100644
--- a/pymongo/cursor.py
+++ b/pymongo/cursor.py
@@ -16,9 +16,8 @@
from bson.code import Code
from bson.son import SON
-from pymongo import (helpers,
- message,
- ReadPreference)
+from pymongo import helpers, message, read_preferences
+from pymongo.read_preferences import ReadPreference
from pymongo.errors import (InvalidOperation,
AutoReconnect)
@@ -43,7 +42,8 @@ class Cursor(object):
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, as_class=None, slave_okay=False,
await_data=False, partial=False, manipulate=True,
- read_preference=ReadPreference.PRIMARY,
+ read_preference=ReadPreference.PRIMARY, tag_sets=[{}],
+ secondary_acceptable_latency_ms=None,
_must_use_master=False, _uuid_subtype=None, **kwargs):
"""Create a new cursor.
@@ -113,6 +113,8 @@ class Cursor(object):
self.__slave_okay = slave_okay
self.__manipulate = manipulate
self.__read_preference = read_preference
+ self.__tag_sets = tag_sets
+ self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
self.__tz_aware = collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
@@ -179,6 +181,9 @@ class Cursor(object):
copy.__partial = self.__partial
copy.__manipulate = self.__manipulate
copy.__read_preference = self.__read_preference
+ copy.__tag_sets = self.__tag_sets
+ copy.__secondary_acceptable_latency_ms = (
+ self.__secondary_acceptable_latency_ms)
copy.__must_use_master = self.__must_use_master
copy.__uuid_subtype = self.__uuid_subtype
copy.__query_flags = self.__query_flags
@@ -217,6 +222,14 @@ class Cursor(object):
operators["$snapshot"] = True
if self.__max_scan:
operators["$maxScan"] = self.__max_scan
+ if self.__collection.database.connection.is_mongos:
+ read_pref = {
+ 'mode': read_preferences.mongos_mode(self.__read_preference)}
+
+ if self.__tag_sets and self.__tag_sets != [{}]:
+ read_pref['tags'] = self.__tag_sets
+
+ operators['$readPreference'] = read_pref
if operators:
# Make a shallow copy so we can cleanly rewind or clone.
@@ -243,7 +256,9 @@ class Cursor(object):
options = self.__query_flags
if self.__tailable:
options |= _QUERY_OPTIONS["tailable_cursor"]
- if self.__slave_okay or self.__read_preference:
+ if (self.__slave_okay
+ or self.__read_preference != ReadPreference.PRIMARY
+ ):
options |= _QUERY_OPTIONS["slave_okay"]
if not self.__timeout:
options |= _QUERY_OPTIONS["no_timeout"]
@@ -474,8 +489,10 @@ class Cursor(object):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
- if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
- (deprecated) `slave_okay` is `True` the count command will be sent to
+ if `read_preference` is not
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ (deprecated) `slave_okay` is `True`, the count command will be sent to
a secondary or slave.
:Parameters:
@@ -494,6 +511,9 @@ class Cursor(object):
command = {"query": self.__spec, "fields": self.__fields}
command['read_preference'] = self.__read_preference
+ command['tag_sets'] = self.__tag_sets
+ command['secondary_acceptable_latency_ms'] = (
+ self.__secondary_acceptable_latency_ms)
command['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
command['_use_master'] = use_master
@@ -522,7 +542,8 @@ class Cursor(object):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
- if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
+ if `read_preference` is
+ not :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
(deprecated) `slave_okay` is `True` the distinct command will be sent
to a secondary or slave.
@@ -544,6 +565,9 @@ class Cursor(object):
options["query"] = self.__spec
options['read_preference'] = self.__read_preference
+ options['tag_sets'] = self.__tag_sets
+ options['secondary_acceptable_latency_ms'] = (
+ self.__secondary_acceptable_latency_ms)
options['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
options['_use_master'] = use_master
@@ -629,6 +653,9 @@ class Cursor(object):
db = self.__collection.database
kwargs = {"_must_use_master": self.__must_use_master}
kwargs["read_preference"] = self.__read_preference
+ kwargs["tag_sets"] = self.__tag_sets
+ kwargs["secondary_acceptable_latency_ms"] = (
+ self.__secondary_acceptable_latency_ms)
if self.__connection_id is not None:
kwargs["_connection_to_use"] = self.__connection_id
kwargs.update(self.__kwargs)
diff --git a/pymongo/database.py b/pymongo/database.py
index 54c9ad53c..1a6fb0af9 100644
--- a/pymongo/database.py
+++ b/pymongo/database.py
@@ -26,6 +26,7 @@ from pymongo.errors import (CollectionInvalid,
InvalidName,
OperationFailure)
from pymongo.son_manipulator import ObjectIdInjector
+from pymongo import read_preferences as rp
def _check_name(name):
@@ -62,6 +63,9 @@ class Database(common.BaseObject):
super(Database,
self).__init__(slave_okay=connection.slave_okay,
read_preference=connection.read_preference,
+ tag_sets=connection.tag_sets,
+ secondary_acceptable_latency_ms=(
+ connection.secondary_acceptable_latency_ms),
safe=connection.safe,
**(connection.get_lasterror_options()))
@@ -313,9 +317,23 @@ class Database(common.BaseObject):
in this list will be ignored by error-checking
- `uuid_subtype` (optional): The BSON binary subtype to use
for a UUID used in this command.
+ - `read_preference`: The read preference for this command.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available
+ options.
+ - `tag_sets`: Read from replica-set members with these tags.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+ - `secondary_acceptable_latency_ms`: Any replica-set member whose
+ ping time is within secondary_acceptable_latency_ms of the nearest
+ member may accept reads. Default 15 milliseconds.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
+ .. versionchanged:: 2.2.1+
+ Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added support for `as_class` - the class you want to use for
the resulting documents
@@ -332,15 +350,35 @@ class Database(common.BaseObject):
if isinstance(command, basestring):
command = SON([(command, value)])
+ command_name = command.keys()[0]
+ must_use_master = kwargs.pop('_use_master', False)
+ if command_name.lower() not in rp.secondary_ok_commands:
+ must_use_master = True
+
+ # Special-case: mapreduce can go to secondaries only if inline
+ if command_name == 'mapreduce':
+ out = command.get('out') or kwargs.get('out')
+ if not isinstance(out, dict) or not out.get('inline'):
+ must_use_master = True
+
extra_opts = {
'as_class': kwargs.pop('as_class', None),
- 'read_preference': kwargs.pop('read_preference',
- self.read_preference),
'slave_okay': kwargs.pop('slave_okay', self.slave_okay),
- '_must_use_master': kwargs.pop('_use_master', True),
+ '_must_use_master': must_use_master,
'_uuid_subtype': uuid_subtype
}
+ if not must_use_master:
+ extra_opts['read_preference'] = kwargs.pop(
+ 'read_preference',
+ self.read_preference)
+ extra_opts['tag_sets'] = kwargs.pop(
+ 'tag_sets',
+ self.tag_sets)
+ extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
+ 'secondary_acceptable_latency_ms',
+ self.secondary_acceptable_latency_ms)
+
fields = kwargs.get('fields')
if fields is not None and not isinstance(fields, dict):
kwargs['fields'] = helpers._fields_list_to_dict(fields)
diff --git a/pymongo/errors.py b/pymongo/errors.py
index 294cb1856..a52baad3a 100644
--- a/pymongo/errors.py
+++ b/pymongo/errors.py
@@ -39,6 +39,9 @@ class AutoReconnect(ConnectionFailure):
will continue to raise this exception until the first successful
connection is made).
"""
+ def __init__(self, message='', errors=None):
+ self.errors = errors or []
+ ConnectionFailure.__init__(self, message)
class ConfigurationError(PyMongoError):
diff --git a/pymongo/master_slave_connection.py b/pymongo/master_slave_connection.py
index e5f477fb7..7b76b5c43 100644
--- a/pymongo/master_slave_connection.py
+++ b/pymongo/master_slave_connection.py
@@ -39,8 +39,8 @@ class MasterSlaveConnection(BaseObject):
to create this `MasterSlaveConnection` can themselves make use of
connection pooling, etc. 'Connection' instances used as slaves should
be created with the read_preference option set to
- :attr:`~pymongo.ReadPreference.SECONDARY`. Safe options are
- inherited from `master` and can be changed in this instance.
+ :attr:`~pymongo.read_preferences.ReadPreference.SECONDARY`. Safe
+ options are inherited from `master` and can be changed in this instance.
Raises TypeError if `master` is not an instance of `Connection` or
slaves is not a list of at least one `Connection` instances.
@@ -85,6 +85,14 @@ class MasterSlaveConnection(BaseObject):
def slaves(self):
return self.__slaves
+ @property
+ def is_mongos(self):
+ """If this MasterSlaveConnection is connected to mongos (always False)
+
+ .. versionadded:: 2.2.1+
+ """
+ return False
+
def get_document_class(self):
return self.__document_class
diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py
new file mode 100644
index 000000000..1c34bf07e
--- /dev/null
+++ b/pymongo/read_preferences.py
@@ -0,0 +1,208 @@
+# Copyright 2012 10gen, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for choosing which member of a replica set to read from."""
+
+import random
+from collections import deque
+
+from pymongo.errors import ConfigurationError
+
+
+class ReadPreference:
+ """An enum that defines the read preferences supported by PyMongo. Used in
+ three cases:
+
+ :class:`~pymongo.connection.Connection` to a single host:
+
+ * `PRIMARY`: Queries are allowed if the connection is to the replica set
+ primary.
+ * `PRIMARY_PREFERRED`: Queries are allowed if the connection is to the
+ primary or a secondary.
+ * `SECONDARY`: Queries are allowed if the connection is to a secondary.
+ * `SECONDARY_PREFERRED`: Same as `PRIMARY_PREFERRED`.
+ * `NEAREST`: Same as `PRIMARY_PREFERRED`.
+
+ :class:`~pymongo.connection.Connection` to a mongos, with a sharded cluster
+ of replica sets:
+
+ * `PRIMARY`: Queries are sent to the primary of a shard.
+ * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
+ otherwise a secondary.
+ * `SECONDARY`: Queries are distributed among shard secondaries. An error
+ is raised if no secondaries are available.
+ * `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
+ or the primary if no secondary is available.
+ * `NEAREST`: Queries are distributed among all members of a shard.
+
+ :class:`~pymongo.replica_set_connection.ReplicaSetConnection`:
+
+ * `PRIMARY`: Queries are sent to the primary of the replica set.
+ * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
+ otherwise a secondary.
+ * `SECONDARY`: Queries are distributed among secondaries. An error
+ is raised if no secondaries are available.
+ * `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
+ or the primary if no secondary is available.
+ * `NEAREST`: Queries are distributed among all members.
+ """
+
+ PRIMARY = 0
+ PRIMARY_PREFERRED = 1
+ SECONDARY = 2
+ SECONDARY_ONLY = 2
+ SECONDARY_PREFERRED = 3
+ NEAREST = 4
+
+# For formatting error messages
+modes = {
+ ReadPreference.PRIMARY: 'PRIMARY',
+ ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
+ ReadPreference.SECONDARY: 'SECONDARY',
+ ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
+ ReadPreference.NEAREST: 'NEAREST',
+}
+
+def select_primary(members):
+ for member in members:
+ if member.is_primary:
+ if member.up:
+ return member
+ else:
+ return None
+
+ return None
+
+
+def select_member_with_tags(members, tags, secondary_only, latency):
+ candidates = []
+
+ for candidate in members:
+ if not candidate.up:
+ continue
+
+ if secondary_only and candidate.is_primary:
+ continue
+
+ if candidate.matches_tags(tags):
+ candidates.append(candidate)
+
+ if not candidates:
+ return None
+
+ # ping_time is in seconds
+ fastest = min([candidate.get_avg_ping_time() for candidate in candidates])
+ near_candidates = [
+ candidate for candidate in candidates
+ if candidate.get_avg_ping_time() - fastest < latency / 1000.]
+
+ return random.choice(near_candidates)
+
+
+def select_member(
+ members,
+ mode=ReadPreference.PRIMARY,
+ tag_sets=None,
+ latency=15
+):
+ """Return a Member or None.
+ """
+ if tag_sets is None:
+ tag_sets = [{}]
+
+ # For brevity
+ PRIMARY = ReadPreference.PRIMARY
+ PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
+ SECONDARY = ReadPreference.SECONDARY
+ SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
+ NEAREST = ReadPreference.NEAREST
+
+ if mode == PRIMARY:
+ if tag_sets != [{}]:
+ raise ConfigurationError("PRIMARY cannot be combined with tags")
+ return select_primary(members)
+
+ elif mode == PRIMARY_PREFERRED:
+ candidate_primary = select_member(members, PRIMARY, [{}], latency)
+ if candidate_primary:
+ return candidate_primary
+ else:
+ return select_member(members, SECONDARY, tag_sets, latency)
+
+ elif mode == SECONDARY:
+ for tags in tag_sets:
+ candidate = select_member_with_tags(members, tags, True, latency)
+ if candidate:
+ return candidate
+
+ return None
+
+ elif mode == SECONDARY_PREFERRED:
+ candidate_secondary = select_member(
+ members, SECONDARY, tag_sets, latency)
+ if candidate_secondary:
+ return candidate_secondary
+ else:
+ return select_member(members, PRIMARY, [{}], latency)
+
+ elif mode == NEAREST:
+ for tags in tag_sets:
+ candidate = select_member_with_tags(members, tags, False, latency)
+ if candidate:
+ return candidate
+
+ # Ran out of tags.
+ return None
+
+ else:
+ raise ConfigurationError("Invalid mode %s" % repr(mode))
+
+
+"""Commands that may be sent to replica-set secondaries, depending on
+ ReadPreference and tags. All other commands are always run on the primary.
+"""
+secondary_ok_commands = set([
+ "group", "aggregate", "collstats", "dbstats", "count", "distinct",
+ "geonear", "geosearch", "geowalk", "mapreduce",
+])
+
+
+class MovingAverage(object):
+ """Tracks a moving average. Not thread-safe.
+ """
+ def __init__(self, window_sz):
+ self.window_sz = window_sz
+ self.samples = deque()
+ self.total = 0
+
+ def update(self, sample):
+ self.samples.append(sample)
+ self.total += sample
+ if len(self.samples) > self.window_sz:
+ self.total -= self.samples.popleft()
+
+ def get(self):
+ if self.samples:
+ return self.total / float(len(self.samples))
+ else:
+ return None
+
+def mongos_mode(mode):
+ return {
+ ReadPreference.PRIMARY: 'primary',
+ ReadPreference.PRIMARY_PREFERRED: 'primaryPreferred',
+ ReadPreference.SECONDARY: 'secondary',
+ ReadPreference.SECONDARY_PREFERRED: 'secondaryPreferred',
+ ReadPreference.NEAREST: 'nearest',
+ }[mode]
diff --git a/pymongo/replica_set_connection.py b/pymongo/replica_set_connection.py
index acf1b6f54..850f19e7e 100644
--- a/pymongo/replica_set_connection.py
+++ b/pymongo/replica_set_connection.py
@@ -47,8 +47,9 @@ from pymongo import (common,
helpers,
message,
pool,
- uri_parser,
- ReadPreference)
+ uri_parser)
+from pymongo.read_preferences import (
+ ReadPreference, select_member, modes, MovingAverage)
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
@@ -58,7 +59,7 @@ from pymongo.errors import (AutoReconnect,
EMPTY = b("")
MAX_BSON_SIZE = 4 * 1024 * 1024
-
+MAX_RETRY = 3
def _partition_node(node):
"""Split a host:port string returned from mongod/s into
@@ -77,24 +78,32 @@ def _partition_node(node):
class Monitor(object):
"""Base class for replica set monitors.
"""
- def __init__(self, rsc, interval, event_class):
+ _refresh_interval = 30
+ def __init__(self, rsc, event_class):
self.rsc = weakref.proxy(rsc, self.shutdown)
- self.interval = interval
self.event = event_class()
+ self.stopped = False
def shutdown(self, dummy):
"""Signal the monitor to shutdown.
"""
+ self.stopped = True
+ self.event.set()
+
+ def schedule_refresh(self):
+ """Refresh immediately
+ """
self.event.set()
def monitor(self):
"""Run until the RSC is collected or an
unexpected error occurs.
"""
- while not self.event.isSet():
- self.event.wait(self.interval)
- if self.event.isSet():
+ while True:
+ self.event.wait(Monitor._refresh_interval)
+ if self.stopped:
break
+ self.event.clear()
try:
self.rsc.refresh()
except AutoReconnect:
@@ -108,8 +117,8 @@ class Monitor(object):
class MonitorThread(Monitor, threading.Thread):
"""Thread based replica set monitor.
"""
- def __init__(self, rsc, interval=5):
- Monitor.__init__(self, rsc, interval, threading.Event)
+ def __init__(self, rsc):
+ Monitor.__init__(self, rsc, threading.Event)
threading.Thread.__init__(self)
self.setName("ReplicaSetMonitorThread")
@@ -123,13 +132,16 @@ have_gevent = False
try:
from gevent import Greenlet
from gevent.event import Event
+
+ # Used by ReplicaSetConnection
+ from gevent.local import local as gevent_local
have_gevent = True
class MonitorGreenlet(Monitor, Greenlet):
"""Greenlet based replica set monitor.
"""
- def __init__(self, rsc, interval=5):
- Monitor.__init__(self, rsc, interval, Event)
+ def __init__(self, rsc):
+ Monitor.__init__(self, rsc, Event)
Greenlet.__init__(self)
# Don't override `run` in a Greenlet. Add _run instead.
@@ -144,6 +156,69 @@ except ImportError:
pass
+class Member(object):
+ """Represent one member of a replica set
+ """
+ # For unit testing only. Do not use under any other circumstances!
+ _host_to_ping_time = {}
+
+ def __init__(self, host, ismaster_response, ping_time, connection_pool):
+ self.host = host
+ self.pool = connection_pool
+ self.ping_time = MovingAverage(5)
+ self.update(ismaster_response, ping_time)
+
+ def update(self, ismaster_response, ping_time):
+ self.is_primary = ismaster_response['ismaster']
+ self.max_bson_size = ismaster_response.get(
+ 'maxBsonObjectSize', MAX_BSON_SIZE)
+ self.tags = ismaster_response.get('tags', {})
+ self.record_ping_time(ping_time)
+ self.up = True
+
+ def get_avg_ping_time(self):
+ """Get a moving average of this member's ping times
+ """
+ if self.host in Member._host_to_ping_time:
+ # Simulate ping times for unittesting
+ return Member._host_to_ping_time[self.host]
+
+ return self.ping_time.get()
+
+ def record_ping_time(self, ping_time):
+ self.ping_time.update(ping_time)
+
+ def matches_mode(self, mode):
+ if mode == ReadPreference.PRIMARY and not self.is_primary:
+ return False
+
+ if mode == ReadPreference.SECONDARY and self.is_primary:
+ return False
+
+ return True
+
+ def matches_tags(self, tags):
+ """Return True if this member's tags are a superset of the passed-in
+ tags. E.g., if this member is tagged {'dc': 'ny', 'rack': '1'},
+ then it matches {'dc': 'ny'}.
+ """
+ for key, value in tags.items():
+ if key not in self.tags or self.tags[key] != value:
+ return False
+
+ return True
+
+ def matches_tag_sets(self, tag_sets):
+ """Return True if this member matches any of the tag sets, e.g.
+ [{'dc': 'ny'}, {'dc': 'la'}, {}]
+ """
+ for tags in tag_sets:
+ if self.matches_tags(tags):
+ return True
+
+ return False
+
+
class ReplicaSetConnection(common.BaseObject):
"""Connection to a MongoDB replica set.
"""
@@ -198,7 +273,7 @@ class ReplicaSetConnection(common.BaseObject):
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- - `j` or `journal`: Block until write operations have been commited
+ - `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without
journaling. Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@@ -217,16 +292,23 @@ class ReplicaSetConnection(common.BaseObject):
before timing out.
- `ssl`: If True, create the connection to the servers using SSL.
- `read_preference`: The read preference for this connection.
- See :class:`~pymongo.ReadPreference` for available options.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available options.
+ - `tag_sets`: Read from replica-set members with these tags.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+ - `secondary_acceptable_latency_ms`: Any replica-set member whose
+ ping time is within secondary_acceptable_latency_ms of the nearest
+ member may accept reads. Default 15 milliseconds.
- `auto_start_request`: If True (the default), each thread that
accesses this :class:`ReplicaSetConnection` has a socket allocated
to it for the thread's lifetime, for each member of the set. For
- :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
- ensures consistent reads, even if you read after an unsafe
- write. For read preferences other than PRIMARY, there are no
- consistency guarantees. (The semantics of auto_start_request,
- :class:`~pymongo.ReadPreference`, and :class:`ReplicaSetConnection`
- may change in future releases of PyMongo.)
+ :class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
+ auto_start_request=True ensures consistent reads, even if you read
+ after an unsafe write. For read preferences other than PRIMARY,
+ there are no consistency guarantees.
- `use_greenlets` (optional): if ``True``, use a background Greenlet
instead of a background thread to monitor state of replica set.
:meth:`start_request()` will ensure that the current greenlet uses
@@ -246,6 +328,8 @@ class ReplicaSetConnection(common.BaseObject):
connection.Connection.
+ .. versionchanged:: 2.2.1+
+ Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added `auto_start_request` and `use_greenlets` options.
Added support for `host`, `port`, and `network_timeout` keyword
@@ -258,7 +342,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__arbiters = set()
self.__writer = None
self.__readers = []
- self.__pools = {}
+ self.__members = {}
self.__index_cache = {}
self.__auth_credentials = {}
@@ -266,6 +350,7 @@ class ReplicaSetConnection(common.BaseObject):
'max_pool_size', max_pool_size)
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__document_class = document_class
+ self.__monitor = None
# Compatibility with connection.Connection
host = kwargs.pop('host', hosts_or_uri)
@@ -311,6 +396,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__auto_start_request = self.__opts.get('auto_start_request', True)
self.__in_request = self.__auto_start_request
+ self.__reset_pinned_hosts()
self.__name = self.__opts.get('replicaset')
if not self.__name:
raise ConfigurationError("the replicaSet "
@@ -358,7 +444,6 @@ class ReplicaSetConnection(common.BaseObject):
self.__monitor.setDaemon(True)
self.__monitor.start()
-
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
"""
@@ -498,6 +583,14 @@ class ReplicaSetConnection(common.BaseObject):
"""
return self.__arbiters
+ @property
+ def is_mongos(self):
+ """If this ReplicaSetConnection is connected to mongos (always False)
+
+ .. versionadded:: 2.2.1+
+ """
+ return False
+
@property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
@@ -530,7 +623,7 @@ class ReplicaSetConnection(common.BaseObject):
0 if no primary is available.
"""
if self.__writer:
- return self.__pools[self.__writer]['max_bson_size']
+ return self.__members[self.__writer].max_bson_size
return 0
@property
@@ -539,14 +632,17 @@ class ReplicaSetConnection(common.BaseObject):
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
+ Returns (response, ping_time in seconds).
"""
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
+ start = time.time()
sock_info.sock.sendall(msg)
response = self.__recv_msg(1, rqst_id, sock_info)
+ end = time.time()
response = helpers._unpack_response(response)['data'][0]
msg = "command %r failed: %%s" % spec
helpers._check_command_response(response, None, msg)
- return response
+ return response, end - start
def __auth(self, sock_info, dbname, user, passwd):
"""Authenticate socket against database `dbname`.
@@ -563,52 +659,86 @@ class ReplicaSetConnection(common.BaseObject):
def __is_master(self, host):
"""Directly call ismaster.
+ Returns (response, connection_pool, ping_time in seconds).
"""
- mpool = self.pool_class(host, self.__max_pool_size,
- self.__net_timeout, self.__conn_timeout,
- self.__use_ssl)
- sock_info = mpool.get_socket()
+ connection_pool = self.pool_class(
+ host, self.__max_pool_size, self.__net_timeout, self.__conn_timeout,
+ self.__use_ssl)
+
+ sock_info = connection_pool.get_socket()
try:
- response = self.__simple_command(
+ response, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
- mpool.maybe_return_socket(sock_info)
- return response, mpool
+ connection_pool.maybe_return_socket(sock_info)
+ return response, connection_pool, ping_time
except (ConnectionFailure, socket.error):
- mpool.discard_socket(sock_info)
+ connection_pool.discard_socket(sock_info)
raise
def __update_pools(self):
"""Update the mapping of (host, port) pairs to connection pools.
"""
+ primary = None
secondaries = []
for host in self.__hosts:
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if host in self.__pools:
- mongo = self.__pools[host]
- sock_info = self.__socket(mongo)
- res = self.__simple_command(sock_info, 'admin', {'ismaster': 1})
- mongo['pool'].maybe_return_socket(sock_info)
+ if host in self.__members:
+ member = self.__members[host]
+ sock_info = self.__socket(member)
+ res, ping_time = self.__simple_command(
+ sock_info, 'admin', {'ismaster': 1})
+ member.pool.maybe_return_socket(sock_info)
+ member.update(res, ping_time)
else:
- res, conn = self.__is_master(host)
- bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
- self.__pools[host] = {'pool': conn,
- 'last_checkout': time.time(),
- 'max_bson_size': bson_max}
+ res, connection_pool, ping_time = self.__is_master(host)
+ self.__members[host] = Member(
+ host=host,
+ ismaster_response=res,
+ ping_time=ping_time,
+ connection_pool=connection_pool)
except (ConnectionFailure, socket.error):
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
+ self.__members.pop(member.host, None)
continue
# Only use hosts that are currently in 'secondary' state
# as readers.
if res['secondary']:
secondaries.append(host)
elif res['ismaster']:
- self.__writer = host
+ primary = host
+
+ if primary != self.__writer:
+ self.__reset_pinned_hosts()
+
+ self.__writer = primary
self.__readers = secondaries
+ def __schedule_refresh(self):
+ self.__monitor.schedule_refresh()
+
+ def __pin_host(self, host):
+ # After first successful read in a request, continue reading from same
+ # member until read preferences change, host goes down, or
+ # end_request(). This offers a small assurance that reads won't jump
+ # around in time.
+ self.__threadlocal.host = host
+
+ def __pinned_host(self):
+ return getattr(self.__threadlocal, 'host', None)
+
+ def __unpin_host(self):
+ self.__threadlocal.host = None
+
+ def __reset_pinned_hosts(self):
+ if self.__opts.get('use_greenlets', False):
+ self.__threadlocal = gevent_local()
+ else:
+ self.__threadlocal = threading.local()
+
def refresh(self):
"""Iterate through the existing host list, or possibly the
seed list, to update the list of hosts and arbiters in this
@@ -619,16 +749,16 @@ class ReplicaSetConnection(common.BaseObject):
hosts = set()
for node in nodes:
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if node in self.__pools:
- mongo = self.__pools[node]
- sock_info = self.__socket(mongo)
- response = self.__simple_command(sock_info, 'admin',
- {'ismaster': 1})
- mongo['pool'].maybe_return_socket(sock_info)
+ if node in self.__members:
+ member = self.__members[node]
+ sock_info = self.__socket(member)
+ response, _ = self.__simple_command(
+ sock_info, 'admin', {'ismaster': 1})
+ member.pool.maybe_return_socket(sock_info)
else:
- response, _ = self.__is_master(node)
+ response, _, _ = self.__is_master(node)
# Check that this host is part of the given replica set.
set_name = response.get('setName')
@@ -650,8 +780,8 @@ class ReplicaSetConnection(common.BaseObject):
hosts.update([_partition_node(h)
for h in response["passives"]])
except (ConnectionFailure, socket.error), why:
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
errors.append("%s:%d: %s" % (node[0], node[1], str(why)))
if hosts:
self.__hosts = hosts
@@ -666,27 +796,28 @@ class ReplicaSetConnection(common.BaseObject):
def __check_is_primary(self, host):
"""Checks if this host is the primary for the replica set.
"""
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if host in self.__pools:
- mongo = self.__pools[host]
- sock_info = self.__socket(mongo)
- res = self.__simple_command(
+ if host in self.__members:
+ member = self.__members[host]
+ sock_info = self.__socket(member)
+ res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
else:
- res, conn = self.__is_master(host)
- bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
- self.__pools[host] = {'pool': conn,
- 'last_checkout': time.time(),
- 'max_bson_size': bson_max}
+ res, connection_pool, ping_time = self.__is_master(host)
+ self.__members[host] = Member(
+ host=host,
+ ismaster_response=res,
+ ping_time=ping_time,
+ connection_pool=connection_pool)
except (ConnectionFailure, socket.error), why:
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why)))
- if mongo and sock_info:
- mongo['pool'].maybe_return_socket(sock_info)
+ if member and sock_info:
+ member.pool.maybe_return_socket(sock_info)
if res["ismaster"]:
return host
@@ -704,7 +835,9 @@ class ReplicaSetConnection(common.BaseObject):
if one exists.
"""
if self.__writer:
- return self.__pools[self.__writer]
+ primary = self.__members[self.__writer]
+ if primary.up:
+ return primary
# This is either the first connection or we had a failover.
self.refresh()
@@ -713,21 +846,20 @@ class ReplicaSetConnection(common.BaseObject):
for candidate in self.__hosts:
try:
self.__writer = self.__check_is_primary(candidate)
- return self.__pools[self.__writer]
+ return self.__members[self.__writer]
except (ConnectionFailure, socket.error), why:
errors.append(str(why))
# Couldn't find the primary.
raise AutoReconnect(', '.join(errors))
- def __socket(self, mongo):
+ def __socket(self, member):
"""Get a SocketInfo from the pool.
"""
- mpool = mongo['pool']
if self.__auto_start_request:
# No effect if a request already started
self.start_request()
- sock_info = mpool.get_socket()
+ sock_info = member.pool.get_socket()
if self.__auth_credentials:
self.__check_auth(sock_info)
@@ -736,9 +868,9 @@ class ReplicaSetConnection(common.BaseObject):
def disconnect(self):
"""Disconnect from the replica set primary.
"""
- mongo = self.__pools.get(self.__writer)
- if mongo and 'pool' in mongo:
- mongo['pool'].reset()
+ member = self.__members.get(self.__writer)
+ if member:
+ member.pool.reset()
self.__writer = None
def close(self):
@@ -757,7 +889,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__monitor.join(1.0)
self.__monitor = None
self.__writer = None
- self.__pools = {}
+ self.__members = {}
def __check_response_to_last_error(self, response):
"""Check a response to a lastError message for errors.
@@ -851,15 +983,14 @@ class ReplicaSetConnection(common.BaseObject):
- `safe`: check getLastError status after sending the message
"""
if _connection_to_use in (None, -1):
- mongo = self.__find_primary()
+ member = self.__find_primary()
else:
- mongo = self.__pools[_connection_to_use]
+ member = self.__members[_connection_to_use]
sock_info = None
try:
- sock_info = self.__socket(mongo)
- rqst_id, data = self.__check_bson_size(msg,
- mongo['max_bson_size'])
+ sock_info = self.__socket(member)
+ rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
# Safe mode. We pack the message together with a lastError
# message and send both. We then get the response (to the
@@ -869,101 +1000,166 @@ class ReplicaSetConnection(common.BaseObject):
if safe:
response = self.__recv_msg(1, rqst_id, sock_info)
rv = self.__check_response_to_last_error(response)
- mongo['pool'].maybe_return_socket(sock_info)
+ member.pool.maybe_return_socket(sock_info)
return rv
except(ConnectionFailure, socket.error), why:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
if _connection_to_use in (None, -1):
self.disconnect()
raise AutoReconnect(str(why))
except:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
raise
- def __send_and_receive(self, mongo, msg, **kwargs):
+ def __send_and_receive(self, member, msg, **kwargs):
"""Send a message on the given socket and return the response data.
"""
sock_info = None
try:
- sock_info = self.__socket(mongo)
+ sock_info = self.__socket(member)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs['network_timeout'])
- rqst_id, data = self.__check_bson_size(msg,
- mongo['max_bson_size'])
+ rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
response = self.__recv_msg(1, rqst_id, sock_info)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(self.__net_timeout)
- mongo['pool'].maybe_return_socket(sock_info)
+ member.pool.maybe_return_socket(sock_info)
return response
except (ConnectionFailure, socket.error), why:
- host, port = mongo['pool'].pair
- mongo['pool'].discard_socket(sock_info)
+ host, port = member.pool.pair
+ member.pool.discard_socket(sock_info)
raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
except:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
+ raise
+
+ def __try_read(self, member, msg, **kwargs):
+ """Attempt a read from a member; on failure mark the member "down" and
+ wake up the monitor thread to refresh as soon as possible.
+ """
+ try:
+ return self.__send_and_receive(member, msg, **kwargs)
+ except AutoReconnect:
+ member.up = False
+ self.__schedule_refresh()
raise
def _send_message_with_response(self, msg, _connection_to_use=None,
_must_use_master=False, **kwargs):
"""Send a message to Mongo and return the response.
- Sends the given message and returns the response.
+ Sends the given message and returns (host used, response).
:Parameters:
- `msg`: (request_id, data) pair making up the message to send
"""
- read_pref = kwargs.get('read_preference', ReadPreference.PRIMARY)
- mongo = None
+
+ # If we've disconnected since last read, trigger refresh
+ try:
+ self.__find_primary()
+ except AutoReconnect:
+ # We'll throw an error later
+ pass
+
+ tag_sets = kwargs.get('tag_sets', [{}])
+ mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
+ if _must_use_master:
+ mode = ReadPreference.PRIMARY
+ tag_sets = [{}]
+
+ secondary_acceptable_latency_ms = kwargs.get(
+ 'secondary_acceptable_latency_ms',
+ self.secondary_acceptable_latency_ms)
+
+ member = None
try:
if _connection_to_use is not None:
if _connection_to_use == -1:
- mongo = self.__find_primary()
+ member = self.__find_primary()
else:
- mongo = self.__pools[_connection_to_use]
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
- elif _must_use_master or not read_pref:
- mongo = self.__find_primary()
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
+ member = self.__members[_connection_to_use]
+ return member.pool.pair, self.__try_read(
+ member, msg, **kwargs)
except AutoReconnect:
- if mongo == self.__pools.get(self.__writer):
+ if member == self.__members.get(self.__writer):
self.disconnect()
raise
errors = []
- for host in helpers.shuffled(self.__readers):
+ pinned_member = self.__members.get(self.__pinned_host())
+ if (pinned_member
+ and pinned_member.matches_mode(mode)
+ and pinned_member.matches_tag_sets(tag_sets)
+ and pinned_member.up
+ ):
try:
- mongo = self.__pools[host]
- return host, self.__send_and_receive(mongo, msg, **kwargs)
+ return (
+ pinned_member.host,
+ self.__try_read(pinned_member, msg, **kwargs))
+ except AutoReconnect, why:
+ if _must_use_master or mode == ReadPreference.PRIMARY:
+ self.disconnect()
+ raise
+ else:
+ errors.append(str(why))
+
+ # No pinned member, or pinned member down or doesn't match read pref
+ self.__unpin_host()
+
+ members = self.__members.copy().values()
+
+ while len(errors) < MAX_RETRY:
+ member = select_member(
+ members=members,
+ mode=mode,
+ tag_sets=tag_sets,
+ latency=secondary_acceptable_latency_ms)
+
+ if not member:
+ # Ran out of members to try
+ break
+
+ try:
+ # Sets member.up False on failure, so select_member won't try
+ # it again.
+ response = self.__try_read(member, msg, **kwargs)
+
+ # Success
+ if self.in_request():
+ # Keep reading from this member in this thread / greenlet
+ self.__pin_host(member.host)
+ return member.host, response
except AutoReconnect, why:
errors.append(str(why))
- # Fallback to primary
- if read_pref == ReadPreference.SECONDARY:
- try:
- mongo = self.__find_primary()
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
- except AutoReconnect, why:
- self.disconnect()
- errors.append(str(why))
- raise AutoReconnect(', '.join(errors))
+ members.remove(member)
+
+ # Ran out of tries
+ if mode == ReadPreference.PRIMARY:
+ msg = "No replica set primary available for query"
+ elif mode == ReadPreference.SECONDARY:
+ msg = "No replica set secondary available for query"
+ else:
+ msg = "No replica set members available for query"
+
+ msg += " with ReadPreference %s" % modes[mode]
+
+ if tag_sets != [{}]:
+ msg += " and tags " + repr(tag_sets)
+
+ raise AutoReconnect(msg, errors)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. For
- :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
- ensures consistent reads, even if you read after an unsafe write. For
- read preferences other than PRIMARY, there are no consistency
- guarantees.
+ :class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
+ auto_start_request=True ensures consistent reads, even if you read
+ after an unsafe write. For read preferences other than PRIMARY, there
+ are no consistency guarantees.
In Python 2.6 and above, or in Python 2.5 with
"from __future__ import with_statement", :meth:`start_request` can be
@@ -983,9 +1179,8 @@ class ReplicaSetConnection(common.BaseObject):
The :class:`~pymongo.pool.Request` return value.
:meth:`start_request` previously returned None
"""
- for mongo in self.__pools.values():
- if 'pool' in mongo:
- mongo['pool'].start_request()
+ for member in self.__members.values():
+ member.pool.start_request()
self.__in_request = True
return pool.Request(self)
@@ -1011,11 +1206,11 @@ class ReplicaSetConnection(common.BaseObject):
in the middle of a sequence of operations in which ordering is
important. This could lead to unexpected results.
"""
- for mongo in self.__pools.values():
- if 'pool' in mongo:
- mongo['pool'].end_request()
+ for member in self.__members.values():
+ member.pool.end_request()
self.__in_request = False
+ self.__unpin_host()
def __eq__(self, other):
# XXX: Implement this?
diff --git a/test/replica/replset_tools.py b/test/replica/replset_tools.py
index 142265ba1..61fdbffd5 100644
--- a/test/replica/replset_tools.py
+++ b/test/replica/replset_tools.py
@@ -120,35 +120,53 @@ def start_replica_set(members, fresh=True):
expected_arbiters += 1
expected_secondaries = len(members) - expected_arbiters - 1
- while True:
+ # Wait for 5 minutes for replica set to come up
+ patience = 5
+ for _ in range(patience * 60 / 2):
time.sleep(2)
try:
- if (len(get_primary()) == 1 and
+ if (get_primary() and
len(get_secondaries()) == expected_secondaries and
len(get_arbiters()) == expected_arbiters):
break
except pymongo.errors.AutoReconnect:
# Keep waiting
pass
+ else:
+ kill_all_members()
+ raise Exception(
+ "Replica set still not initialized after %s minutes" % patience)
return primary, set_name
+# Connect to a random member
+def get_connection():
+ return pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
+
+
def get_members_in_state(state):
- c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
- status = c.admin.command('replSetGetStatus')
+ status = get_connection().admin.command('replSetGetStatus')
members = status['members']
return [k['name'] for k in members if k['state'] == state]
def get_primary():
- return get_members_in_state(1)
+ try:
+ primaries = get_members_in_state(1)
+ assert len(primaries) <= 1
+ if primaries:
+ return primaries[0]
+ except pymongo.errors.AutoReconnect:
+ pass
+
+ return None
def get_random_secondary():
secondaries = get_members_in_state(2)
if len(secondaries):
- return [secondaries[random.randrange(0, len(secondaries))]]
- return secondaries
+ return random.choice(secondaries)
+ return None
def get_secondaries():
@@ -160,13 +178,11 @@ def get_arbiters():
def get_passives():
- c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
- return c.admin.command('ismaster').get('passives', [])
+ return get_connection().admin.command('ismaster').get('passives', [])
def get_hosts():
- c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
- return c.admin.command('ismaster').get('hosts', [])
+ return get_connection().admin.command('ismaster').get('hosts', [])
def get_hidden_members():
@@ -182,15 +198,24 @@ def get_hidden_members():
return secondaries
+def get_tags(member):
+ config = get_connection().local.system.replset.find_one()
+ for m in config['members']:
+ if m['host'] == member:
+ return m.get('tags', {})
+
+ raise Exception('member %s not in config' % repr(member))
+
+
def kill_primary(sig=2):
primary = get_primary()
- kill_members(primary, sig)
+ kill_members([primary], sig)
return primary
def kill_secondary(sig=2):
secondary = get_random_secondary()
- kill_members(secondary, sig)
+ kill_members([secondary], sig)
return secondary
diff --git a/test/replica/test_replica_set.py b/test/replica/test_replica_set.py
index c249bb37d..97239e617 100644
--- a/test/replica/test_replica_set.py
+++ b/test/replica/test_replica_set.py
@@ -21,98 +21,84 @@ import replset_tools
from replset_tools import use_greenlets
-from pymongo import (ReplicaSetConnection,
+from pymongo import (replica_set_connection,
+ ReplicaSetConnection,
ReadPreference)
+from pymongo.replica_set_connection import Member, Monitor
from pymongo.connection import Connection, _partition_node
from pymongo.errors import AutoReconnect, ConnectionFailure
+from test import utils
-class TestReadPreference(unittest.TestCase):
+
+# Override default 30-second interval for faster testing
+Monitor._refresh_interval = MONITOR_INTERVAL = 0.5
+
+
+class TestSecondaryConnection(unittest.TestCase):
def setUp(self):
members = [{}, {}, {'arbiterOnly': True}]
res = replset_tools.start_replica_set(members)
self.seed, self.name = res
- def test_read_preference(self):
- c = ReplicaSetConnection(
+ def test_secondary_connection(self):
+ self.c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
- self.assertTrue(bool(len(c.secondaries)))
- db = c.pymongo_test
- db.test.remove({}, safe=True, w=len(c.secondaries))
+ self.assertTrue(bool(len(self.c.secondaries)))
+ db = self.c.pymongo_test
+ db.test.remove({}, safe=True, w=len(self.c.secondaries))
# Force replication...
- w = len(c.secondaries) + 1
+ w = len(self.c.secondaries) + 1
db.test.insert({'foo': 'bar'}, safe=True, w=w)
- # Test direct connection to a secondary
- host, port = replset_tools.get_secondaries()[0].split(':')
- port = int(port)
- conn = Connection(
- host, port, slave_okay=True, use_greenlets=use_greenlets)
- self.assertEqual(host, conn.host)
- self.assertEqual(port, conn.port)
- self.assert_(conn.pymongo_test.test.find_one())
- conn = Connection(
- host, port,
- read_preference=ReadPreference.SECONDARY,
- use_greenlets=use_greenlets)
- self.assertEqual(host, conn.host)
- self.assertEqual(port, conn.port)
- self.assert_(conn.pymongo_test.test.find_one())
+ # Test direct connection to a primary or secondary
+ primary_host, primary_port = replset_tools.get_primary().split(':')
+ primary_port = int(primary_port)
+ secondary_host, secondary_port = replset_tools.get_secondaries()[0].split(':')
+ secondary_port = int(secondary_port)
+
+ self.assertTrue(Connection(
+ primary_host, primary_port, use_greenlets=use_greenlets).is_primary)
+
+ self.assertTrue(Connection(
+ primary_host, primary_port, use_greenlets=use_greenlets,
+ read_preference=ReadPreference.PRIMARY_PREFERRED).is_primary)
+
+ self.assertTrue(Connection(
+ primary_host, primary_port, use_greenlets=use_greenlets,
+ read_preference=ReadPreference.SECONDARY_PREFERRED).is_primary)
+
+ self.assertTrue(Connection(
+ primary_host, primary_port, use_greenlets=use_greenlets,
+ read_preference=ReadPreference.NEAREST).is_primary)
+
+ self.assertTrue(Connection(
+ primary_host, primary_port, use_greenlets=use_greenlets,
+ read_preference=ReadPreference.SECONDARY).is_primary)
+
+ for kwargs in [
+ {'read_preference': ReadPreference.PRIMARY_PREFERRED},
+ {'read_preference': ReadPreference.SECONDARY},
+ {'read_preference': ReadPreference.SECONDARY_PREFERRED},
+ {'read_preference': ReadPreference.NEAREST},
+ {'slave_okay': True},
+ ]:
+ conn = Connection(
+ secondary_host, secondary_port, use_greenlets=use_greenlets, **kwargs)
+ self.assertEqual(secondary_host, conn.host)
+ self.assertEqual(secondary_port, conn.port)
+ self.assertFalse(conn.is_primary)
+ self.assert_(conn.pymongo_test.test.find_one())
# Test direct connection to an arbiter
- host = replset_tools.get_arbiters()[0]
+ secondary_host = replset_tools.get_arbiters()[0]
self.assertRaises(
- ConnectionFailure, Connection, host, use_greenlets=use_greenlets)
-
- # Test PRIMARY
- for _ in xrange(10):
- cursor = db.test.find()
- cursor.next()
- self.assertEqual(cursor._Cursor__connection_id, c.primary)
-
- # Test SECONDARY with a secondary
- db.read_preference = ReadPreference.SECONDARY
- for _ in xrange(10):
- cursor = db.test.find()
- cursor.next()
- self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
-
- # Test SECONDARY_ONLY with a secondary
- db.read_preference = ReadPreference.SECONDARY_ONLY
- for _ in xrange(10):
- cursor = db.test.find()
- cursor.next()
- self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
-
- # Test SECONDARY with no secondary
- killed = replset_tools.kill_all_secondaries()
- sleep(5) # Let monitor thread notice change
- self.assertTrue(bool(len(killed)))
- db.read_preference = ReadPreference.SECONDARY
- for _ in xrange(10):
- cursor = db.test.find()
- cursor.next()
- self.assertEqual(cursor._Cursor__connection_id, c.primary)
-
- # Test SECONDARY_ONLY with no secondary
- db.read_preference = ReadPreference.SECONDARY_ONLY
- for _ in xrange(10):
- cursor = db.test.find()
- self.assertRaises(AutoReconnect, cursor.next)
-
- replset_tools.restart_members(killed)
- # Test PRIMARY with no primary (should raise an exception)
- db.read_preference = ReadPreference.PRIMARY
- cursor = db.test.find()
- cursor.next()
- self.assertEqual(cursor._Cursor__connection_id, c.primary)
- killed = replset_tools.kill_primary()
- self.assertTrue(bool(len(killed)))
- self.assertRaises(AutoReconnect, db.test.find_one)
+ ConnectionFailure, Connection, secondary_host, use_greenlets=use_greenlets)
def tearDown(self):
+ self.c.close()
replset_tools.kill_all_members()
@@ -125,36 +111,43 @@ class TestPassiveAndHidden(unittest.TestCase):
self.seed, self.name = res
def test_passive_and_hidden(self):
- c = ReplicaSetConnection(
+ self.c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
- db = c.pymongo_test
- db.test.remove({}, safe=True, w=len(c.secondaries))
- w = len(c.secondaries) + 1
+ db = self.c.pymongo_test
+ w = len(self.c.secondaries) + 1
+ db.test.remove({}, safe=True, w=w)
db.test.insert({'foo': 'bar'}, safe=True, w=w)
- db.read_preference = ReadPreference.SECONDARY
passives = replset_tools.get_passives()
passives = [_partition_node(member) for member in passives]
hidden = replset_tools.get_hidden_members()
hidden = [_partition_node(member) for member in hidden]
- self.assertEqual(c.secondaries, set(passives))
+ self.assertEqual(self.c.secondaries, set(passives))
- for _ in xrange(10):
- cursor = db.test.find()
- cursor.next()
- self.assertTrue(cursor._Cursor__connection_id not in hidden)
+ for mode in (
+ ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED
+ ):
+ db.read_preference = mode
+ for _ in xrange(10):
+ cursor = db.test.find()
+ cursor.next()
+ self.assertTrue(cursor._Cursor__connection_id in passives)
+ self.assertTrue(cursor._Cursor__connection_id not in hidden)
replset_tools.kill_members(replset_tools.get_passives(), 2)
- sleep(5) # Let monitor thread notice change
+ sleep(2 * MONITOR_INTERVAL)
+ db.read_preference = ReadPreference.SECONDARY_PREFERRED
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
- self.assertEqual(cursor._Cursor__connection_id, c.primary)
+ self.assertEqual(cursor._Cursor__connection_id, self.c.primary)
def tearDown(self):
+ self.c.close()
replset_tools.kill_all_members()
+
class TestHealthMonitor(unittest.TestCase):
def setUp(self):
@@ -168,18 +161,18 @@ class TestHealthMonitor(unittest.TestCase):
primary = c.primary
secondaries = c.secondaries
+ # Wait for new primary to be elected
def primary_changed():
for _ in xrange(30):
- if c.primary != primary:
+ if c.primary and c.primary != primary:
return True
sleep(1)
return False
killed = replset_tools.kill_primary()
- sleep(5) # Let monitor thread notice change
self.assertTrue(bool(len(killed)))
self.assertTrue(primary_changed())
- self.assertTrue(secondaries != c.secondaries)
+ self.assertNotEqual(secondaries, c.secondaries)
def test_secondary_failure(self):
c = ReplicaSetConnection(
@@ -197,13 +190,13 @@ class TestHealthMonitor(unittest.TestCase):
return False
killed = replset_tools.kill_secondary()
- sleep(5) # Let monitor thread notice change
+ sleep(2 * MONITOR_INTERVAL)
self.assertTrue(bool(len(killed)))
self.assertEqual(primary, c.primary)
self.assertTrue(readers_changed())
secondaries = c.secondaries
- replset_tools.restart_members(killed)
+ replset_tools.restart_members([killed])
self.assertEqual(primary, c.primary)
self.assertTrue(readers_changed())
@@ -212,7 +205,7 @@ class TestHealthMonitor(unittest.TestCase):
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
self.assertTrue(bool(len(c.secondaries)))
primary = c.primary
- secondaries = c.secondaries
+ secondaries = c.secondaries.copy()
def primary_changed():
for _ in xrange(30):
@@ -223,7 +216,11 @@ class TestHealthMonitor(unittest.TestCase):
replset_tools.stepdown_primary()
self.assertTrue(primary_changed())
- self.assertTrue(secondaries != c.secondaries)
+
+ # There can be a delay between finding the primary and updating
+ # secondaries
+ sleep(5)
+ self.assertNotEqual(secondaries, c.secondaries)
def tearDown(self):
replset_tools.kill_all_members()
@@ -272,7 +269,8 @@ class TestReadWithFailover(unittest.TestCase):
def test_read_with_failover(self):
c = ReplicaSetConnection(
- self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
+ self.seed, replicaSet=self.name, use_greenlets=use_greenlets,
+ auto_start_request=False)
self.assertTrue(bool(len(c.secondaries)))
def iter_cursor(cursor):
@@ -284,15 +282,15 @@ class TestReadWithFailover(unittest.TestCase):
w = len(c.secondaries) + 1
db.test.remove({}, safe=True, w=w)
# Force replication
- db.test.insert([{'foo': i} for i in xrange(10)],
- safe=True, w=w)
+ db.test.insert([{'foo': i} for i in xrange(10)], safe=True, w=w)
self.assertEqual(10, db.test.count())
- db.read_preference = ReadPreference.SECONDARY
+ db.read_preference = ReadPreference.SECONDARY_PREFERRED
cursor = db.test.find().batch_size(5)
cursor.next()
self.assertEqual(5, cursor._Cursor__retrieved)
- killed = replset_tools.kill_primary()
+ self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
+ replset_tools.kill_primary()
# Primary failure shouldn't interrupt the cursor
self.assertTrue(iter_cursor(cursor))
self.assertEqual(10, cursor._Cursor__retrieved)
@@ -300,16 +298,316 @@ class TestReadWithFailover(unittest.TestCase):
def tearDown(self):
replset_tools.kill_all_members()
+
+class TestReadPreference(unittest.TestCase):
+ def setUp(self):
+ members = [
+ # primary
+ {'tags': {'dc': 'ny', 'name': 'primary'}},
+
+ # secondary
+ {'tags': {'dc': 'la', 'name': 'secondary'}, 'priority': 0},
+
+ # other_secondary
+ {'tags': {'dc': 'ny', 'name': 'other_secondary'}, 'priority': 0},
+ ]
+
+ res = replset_tools.start_replica_set(members)
+ self.seed, self.name = res
+
+ primary = replset_tools.get_primary()
+ self.primary = _partition_node(primary)
+ self.primary_tags = replset_tools.get_tags(primary)
+ # Make sure priority worked
+ self.assertEqual('primary', self.primary_tags['name'])
+
+ self.primary_dc = {'dc': self.primary_tags['dc']}
+
+ secondaries = replset_tools.get_secondaries()
+
+ (secondary, ) = [
+ s for s in secondaries
+ if replset_tools.get_tags(s)['name'] == 'secondary']
+
+ self.secondary = _partition_node(secondary)
+ self.secondary_tags = replset_tools.get_tags(secondary)
+ self.secondary_dc = {'dc': self.secondary_tags['dc']}
+
+ (other_secondary, ) = [
+ s for s in secondaries
+ if replset_tools.get_tags(s)['name'] == 'other_secondary']
+
+ self.other_secondary = _partition_node(other_secondary)
+ self.other_secondary_tags = replset_tools.get_tags(other_secondary)
+ self.other_secondary_dc = {'dc': self.other_secondary_tags['dc']}
+
+ self.c = ReplicaSetConnection(
+ self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
+ self.db = self.c.pymongo_test
+ self.w = len(self.c.secondaries) + 1
+ self.db.test.remove({}, safe=True, w=self.w)
+ self.db.test.insert(
+ [{'foo': i} for i in xrange(10)], safe=True, w=self.w)
+
+ self.clear_ping_times()
+
+ def set_ping_time(self, host, ping_time_seconds):
+ Member._host_to_ping_time[host] = ping_time_seconds
+
+ def clear_ping_times(self):
+ Member._host_to_ping_time.clear()
+
+ def test_read_preference(self):
+ # This is long, but we put all the tests in one function to save time
+ # on setUp, which takes about 30 seconds to bring up a replica set.
+ # We pass through four states:
+ #
+ # 1. A primary and two secondaries
+ # 2. Primary down
+ # 3. Primary up, one secondary down
+ # 4. Primary up, all secondaries down
+ #
+ # For each state, we verify the behavior of PRIMARY,
+ # PRIMARY_PREFERRED, SECONDARY, SECONDARY_PREFERRED, and NEAREST
+ c = ReplicaSetConnection(
+ self.seed, replicaSet=self.name, use_greenlets=use_greenlets,
+ auto_start_request=False)
+
+ def assertReadFrom(member, *args, **kwargs):
+ utils.assertReadFrom(self, c, member, *args, **kwargs)
+
+ def assertReadFromAll(members, *args, **kwargs):
+ utils.assertReadFromAll(self, c, members, *args, **kwargs)
+
+ def unpartition_node(node):
+ host, port = node
+ return '%s:%s' % (host, port)
+
+ # To make the code terser, copy modes and hosts into local scope
+ PRIMARY = ReadPreference.PRIMARY
+ PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
+ SECONDARY = ReadPreference.SECONDARY
+ SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
+ NEAREST = ReadPreference.NEAREST
+
+ primary = self.primary
+ secondary = self.secondary
+ other_secondary = self.other_secondary
+
+ bad_tag = {'bad': 'tag'}
+
+ # 1. THREE MEMBERS UP -------------------------------------------------
+ # PRIMARY
+ assertReadFrom(primary, PRIMARY)
+
+ # PRIMARY_PREFERRED
+ # Trivial: mode and tags both match
+ assertReadFrom(primary, PRIMARY_PREFERRED, self.primary_dc)
+
+ # Secondary matches but not primary, choose primary
+ assertReadFrom(primary, PRIMARY_PREFERRED, self.secondary_dc)
+
+ # Chooses primary, ignoring tag sets
+ assertReadFrom(primary, PRIMARY_PREFERRED, self.primary_dc)
+
+ # Chooses primary, ignoring tag sets
+ assertReadFrom(primary, PRIMARY_PREFERRED, bad_tag)
+ assertReadFrom(primary, PRIMARY_PREFERRED, [bad_tag, {}])
+
+ # SECONDARY
+ assertReadFromAll([secondary, other_secondary], SECONDARY)
+
+ # SECONDARY_PREFERRED
+ assertReadFromAll([secondary, other_secondary], SECONDARY_PREFERRED)
+
+ # Multiple tags
+ assertReadFrom(secondary, SECONDARY_PREFERRED, self.secondary_tags)
+
+ # Fall back to primary if it's the only one matching the tags
+ assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'primary'})
+
+ # No matching secondaries
+ assertReadFrom(primary, SECONDARY_PREFERRED, bad_tag)
+
+ # Fall back from non-matching tag set to matching set
+ assertReadFromAll([secondary, other_secondary],
+ SECONDARY_PREFERRED, [bad_tag, {}])
+
+ assertReadFrom(other_secondary,
+ SECONDARY_PREFERRED, [bad_tag, {'dc': 'ny'}])
+
+ # NEAREST
+ self.clear_ping_times()
+
+ assertReadFromAll([primary, secondary, other_secondary], NEAREST)
+
+ assertReadFromAll([primary, other_secondary],
+ NEAREST, [bad_tag, {'dc': 'ny'}])
+
+ self.set_ping_time(primary, 0)
+ self.set_ping_time(secondary, .03) # 30 ms
+ self.set_ping_time(other_secondary, 10)
+
+ # Nearest member, no tags
+ assertReadFrom(primary, NEAREST)
+
+ # Tags override nearness
+ assertReadFrom(primary, NEAREST, {'name': 'primary'})
+ assertReadFrom(secondary, NEAREST, self.secondary_dc)
+
+ # Make secondary fast
+ self.set_ping_time(primary, .03) # 30 ms
+ self.set_ping_time(secondary, 0)
+
+ assertReadFrom(secondary, NEAREST)
+
+ # Other secondary fast
+ self.set_ping_time(secondary, 10)
+ self.set_ping_time(other_secondary, 0)
+
+ assertReadFrom(other_secondary, NEAREST)
+
+ # High secondaryAcceptableLatencyMS, should read from all members
+ assertReadFromAll(
+ [primary, secondary, other_secondary],
+ NEAREST, secondary_acceptable_latency_ms=1000*1000)
+
+ self.clear_ping_times()
+
+ assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}])
+
+ # 2. PRIMARY DOWN -----------------------------------------------------
+ killed = replset_tools.kill_primary()
+
+ # Let monitor notice primary's gone
+ sleep(2 * MONITOR_INTERVAL)
+
+ # PRIMARY
+ assertReadFrom(None, PRIMARY)
+
+ # PRIMARY_PREFERRED
+ # No primary, choose matching secondary
+ assertReadFromAll([secondary, other_secondary], PRIMARY_PREFERRED)
+ assertReadFrom(secondary, PRIMARY_PREFERRED, {'name': 'secondary'})
+
+ # No primary or matching secondary
+ assertReadFrom(None, PRIMARY_PREFERRED, bad_tag)
+
+ # SECONDARY
+ assertReadFromAll([secondary, other_secondary], SECONDARY)
+
+ # Only primary matches
+ assertReadFrom(None, SECONDARY, {'name': 'primary'})
+
+ # No matching secondaries
+ assertReadFrom(None, SECONDARY, bad_tag)
+
+ # SECONDARY_PREFERRED
+ assertReadFromAll([secondary, other_secondary], SECONDARY_PREFERRED)
+
+ # Mode and tags both match
+ assertReadFrom(secondary, SECONDARY_PREFERRED, {'name': 'secondary'})
+
+ # NEAREST
+ self.clear_ping_times()
+
+ assertReadFromAll([secondary, other_secondary], NEAREST)
+
+ # 3. PRIMARY UP, ONE SECONDARY DOWN -----------------------------------
+ replset_tools.restart_members([killed])
+
+ for _ in range(30):
+ if replset_tools.get_primary():
+ break
+ sleep(1)
+ else:
+ self.fail("Primary didn't come back up")
+
+ replset_tools.kill_members([unpartition_node(secondary)], 2)
+ self.assertTrue(Connection(
+ unpartition_node(primary), use_greenlets=use_greenlets,
+ slave_okay=True
+ ).admin.command('ismaster')['ismaster'])
+
+ sleep(2 * MONITOR_INTERVAL)
+
+ # PRIMARY
+ assertReadFrom(primary, PRIMARY)
+
+ # PRIMARY_PREFERRED
+ assertReadFrom(primary, PRIMARY_PREFERRED)
+
+ # SECONDARY
+ assertReadFrom(other_secondary, SECONDARY)
+ assertReadFrom(other_secondary, SECONDARY, self.other_secondary_dc)
+
+ # Only the down secondary matches
+ assertReadFrom(None, SECONDARY, {'name': 'secondary'})
+
+ # SECONDARY_PREFERRED
+ assertReadFrom(other_secondary, SECONDARY_PREFERRED)
+ assertReadFrom(
+ other_secondary, SECONDARY_PREFERRED, self.other_secondary_dc)
+
+ # The secondary matching the tag is down, use primary
+ assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'secondary'})
+
+ # NEAREST
+ assertReadFromAll([primary, other_secondary], NEAREST)
+ assertReadFrom(other_secondary, NEAREST, {'name': 'other_secondary'})
+ assertReadFrom(primary, NEAREST, {'name': 'primary'})
+
+ # 4. PRIMARY UP, ALL SECONDARIES DOWN ---------------------------------
+ replset_tools.kill_members([unpartition_node(other_secondary)], 2)
+ self.assertTrue(Connection(
+ unpartition_node(primary), use_greenlets=use_greenlets,
+ slave_okay=True
+ ).admin.command('ismaster')['ismaster'])
+
+ # PRIMARY
+ assertReadFrom(primary, PRIMARY)
+
+ # PRIMARY_PREFERRED
+ assertReadFrom(primary, PRIMARY_PREFERRED)
+ assertReadFrom(primary, PRIMARY_PREFERRED, self.secondary_dc)
+
+ # SECONDARY
+ assertReadFrom(None, SECONDARY)
+ assertReadFrom(None, SECONDARY, self.other_secondary_dc)
+ assertReadFrom(None, SECONDARY, {'dc': 'ny'})
+
+ # SECONDARY_PREFERRED
+ assertReadFrom(primary, SECONDARY_PREFERRED)
+ assertReadFrom(primary, SECONDARY_PREFERRED, self.secondary_dc)
+ assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'secondary'})
+ assertReadFrom(primary, SECONDARY_PREFERRED, {'dc': 'ny'})
+
+ # NEAREST
+ assertReadFrom(primary, NEAREST)
+ assertReadFrom(None, NEAREST, self.secondary_dc)
+ assertReadFrom(None, NEAREST, {'name': 'secondary'})
+
+ # Even if primary's slow, still read from it
+ self.set_ping_time(primary, 100)
+ assertReadFrom(primary, NEAREST)
+ assertReadFrom(None, NEAREST, self.secondary_dc)
+
+ self.clear_ping_times()
+
+ def tearDown(self):
+ self.c.close()
+ replset_tools.kill_all_members()
+ self.clear_ping_times()
+
+
if __name__ == '__main__':
if use_greenlets:
print('Using Gevent')
import gevent
print('gevent version %s' % gevent.__version__)
- if gevent.__version__ == '0.13.6':
+ if gevent.__version__ >= '0.13.6':
print('method %s' % gevent.core.get_method())
- else:
- print(gevent.hub.get_hub())
from gevent import monkey
monkey.patch_socket()
sleep = gevent.sleep
diff --git a/test/test_gridfs.py b/test/test_gridfs.py
index 7566e6c89..8e76d6d55 100644
--- a/test/test_gridfs.py
+++ b/test/test_gridfs.py
@@ -16,6 +16,11 @@
"""Tests for the gridfs package.
"""
+from gridfs.grid_file import GridIn
+from pymongo.connection import Connection
+from pymongo.errors import AutoReconnect
+from pymongo.read_preferences import ReadPreference
+from test.test_replica_set_connection import TestConnectionReplicaSetBase
try:
from io import BytesIO as StringIO
@@ -341,5 +346,46 @@ class TestGridfs(unittest.TestCase):
)
+class TestGridfsReplicaSet(TestConnectionReplicaSetBase):
+ def test_gridfs_replica_set(self):
+ rsc = self._get_connection(
+ w=self.w, wtimeout=5000,
+ read_preference=ReadPreference.SECONDARY)
+
+ try:
+ fs = gridfs.GridFS(rsc.pymongo_test)
+ oid = fs.put(b('foo'))
+ content = fs.get(oid).read()
+ self.assertEqual(b('foo'), content)
+ finally:
+ rsc.close()
+
+ def test_gridfs_secondary(self):
+ primary_host, primary_port = self.primary
+ primary_connection = Connection(primary_host, primary_port)
+
+ secondary_host, secondary_port = self.secondaries[0]
+ for secondary_connection in [
+ Connection(secondary_host, secondary_port, slave_okay=True),
+ Connection(secondary_host, secondary_port,
+ read_preference=ReadPreference.SECONDARY),
+ ]:
+ primary_connection.pymongo_test.drop_collection("fs.files")
+ primary_connection.pymongo_test.drop_collection("fs.chunks")
+
+ # Should detect it's connected to secondary and not attempt to
+ # create index
+ fs = gridfs.GridFS(secondary_connection.pymongo_test)
+
+ # This won't detect secondary, raises error
+ self.assertRaises(AutoReconnect, fs.put, b('foo'))
+
+ def tearDown(self):
+ rsc = self._get_connection()
+ rsc.pymongo_test.drop_collection('fs.files')
+ rsc.pymongo_test.drop_collection('fs.chunks')
+ rsc.close()
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/test/test_master_slave_connection.py b/test/test_master_slave_connection.py
index 9b654cf57..3ee99e0e2 100644
--- a/test/test_master_slave_connection.py
+++ b/test/test_master_slave_connection.py
@@ -62,6 +62,16 @@ class TestMasterSlaveConnection(unittest.TestCase):
self.connection = MasterSlaveConnection(self.master, self.slaves)
self.db = self.connection.pymongo_test
+ def tearDown(self):
+ try:
+ self.db.test.drop_indexes()
+ except Exception:
+ # Tests like test_disconnect can monkey with the connection in ways
+ # that make this fail
+ pass
+
+ super(TestMasterSlaveConnection, self).tearDown()
+
def test_types(self):
self.assertRaises(TypeError, MasterSlaveConnection, 1)
self.assertRaises(TypeError, MasterSlaveConnection, self.master, 1)
diff --git a/test/test_replica_set_connection.py b/test/test_replica_set_connection.py
index c3a8c96c9..333088422 100644
--- a/test/test_replica_set_connection.py
+++ b/test/test_replica_set_connection.py
@@ -14,6 +14,7 @@
"""Test the replica_set_connection module."""
+import copy
import datetime
import os
import signal
@@ -23,14 +24,15 @@ import time
import thread
import traceback
import unittest
+
sys.path[0:0] = [""]
from nose.plugins.skip import SkipTest
from bson.son import SON
from bson.tz_util import utc
-from pymongo import ReadPreference
from pymongo.connection import Connection
+from pymongo.read_preferences import ReadPreference
from pymongo.replica_set_connection import ReplicaSetConnection
from pymongo.replica_set_connection import _partition_node
from pymongo.database import Database
@@ -40,7 +42,7 @@ from pymongo.errors import (AutoReconnect,
InvalidName,
OperationFailure)
from test import version
-from test.utils import delay
+from test.utils import delay, assertReadFrom, assertReadFromAll, read_from_which_host
host = os.environ.get("DB_IP", 'localhost')
@@ -82,6 +84,10 @@ class TestConnectionReplicaSetBase(unittest.TestCase):
][0]
self.primary = _partition_node(primary_info['name'])
+ self.secondaries = [
+ _partition_node(m['name']) for m in repl_set_status['members']
+ if m['stateStr'] == 'SECONDARY'
+ ]
else:
raise SkipTest()
@@ -113,28 +119,65 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertEqual(c.primary, self.primary)
self.assertEqual(c.hosts, self.hosts)
self.assertEqual(c.arbiters, self.arbiters)
- self.assertEqual(c.read_preference, ReadPreference.PRIMARY)
self.assertEqual(c.max_pool_size, 10)
self.assertEqual(c.document_class, dict)
self.assertEqual(c.tz_aware, False)
- self.assertEqual(c.slave_okay, False)
- self.assertEqual(c.safe, False)
+
+ # Make sure RSC's properties are copied to Database and Collection
+ for obj in c, c.pymongo_test, c.pymongo_test.test:
+ self.assertEqual(obj.read_preference, ReadPreference.PRIMARY)
+ self.assertEqual(obj.tag_sets, [{}])
+ self.assertEqual(obj.secondary_acceptable_latency_ms, 15)
+ self.assertEqual(obj.slave_okay, False)
+ self.assertEqual(obj.safe, False)
+
+ cursor = c.pymongo_test.test.find()
+ self.assertEqual(
+ ReadPreference.PRIMARY, cursor._Cursor__read_preference)
+ self.assertEqual([{}], cursor._Cursor__tag_sets)
+ self.assertEqual(15, cursor._Cursor__secondary_acceptable_latency_ms)
+ self.assertEqual(False, cursor._Cursor__slave_okay)
c.close()
+ tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
c = ReplicaSetConnection(pair, replicaSet=self.name, max_pool_size=25,
document_class=SON, tz_aware=True,
slaveOk=False, safe=True,
- read_preference=ReadPreference.SECONDARY)
+ read_preference=ReadPreference.SECONDARY,
+ tag_sets=copy.deepcopy(tag_sets),
+ secondary_acceptable_latency_ms=77)
c.admin.command('ping')
self.assertEqual(c.primary, self.primary)
self.assertEqual(c.hosts, self.hosts)
self.assertEqual(c.arbiters, self.arbiters)
- self.assertEqual(c.read_preference, ReadPreference.SECONDARY)
self.assertEqual(c.max_pool_size, 25)
self.assertEqual(c.document_class, SON)
self.assertEqual(c.tz_aware, True)
- self.assertEqual(c.slave_okay, False)
- self.assertEqual(c.safe, True)
+
+ for obj in c, c.pymongo_test, c.pymongo_test.test:
+ self.assertEqual(obj.read_preference, ReadPreference.SECONDARY)
+ self.assertEqual(obj.tag_sets, tag_sets)
+ self.assertEqual(obj.secondary_acceptable_latency_ms, 77)
+ self.assertEqual(obj.slave_okay, False)
+ self.assertEqual(obj.safe, True)
+
+ cursor = c.pymongo_test.test.find()
+ self.assertEqual(
+ ReadPreference.SECONDARY, cursor._Cursor__read_preference)
+ self.assertEqual(tag_sets, cursor._Cursor__tag_sets)
+ self.assertEqual(77, cursor._Cursor__secondary_acceptable_latency_ms)
+ self.assertEqual(False, cursor._Cursor__slave_okay)
+
+ cursor = c.pymongo_test.test.find(
+ read_preference=ReadPreference.NEAREST,
+ tag_sets=[{'dc':'ny'}, {}],
+ secondary_acceptable_latency_ms=123)
+
+ self.assertEqual(
+ ReadPreference.NEAREST, cursor._Cursor__read_preference)
+ self.assertEqual([{'dc':'ny'}, {}], cursor._Cursor__tag_sets)
+ self.assertEqual(123, cursor._Cursor__secondary_acceptable_latency_ms)
+ self.assertEqual(False, cursor._Cursor__slave_okay)
if version.at_least(c, (1, 7, 4)):
self.assertEqual(c.max_bson_size, 16777216)
@@ -315,17 +358,17 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertRaises(TypeError, iterate)
connection.close()
- def test_close(self):
+ def test_disconnect(self):
c = self._get_connection()
coll = c.foo.bar
- c.close()
- c.close()
+ c.disconnect()
+ c.disconnect()
coll.count()
- c.close()
- c.close()
+ c.disconnect()
+ c.disconnect()
coll.count()
@@ -614,7 +657,8 @@ class TestConnection(TestConnectionReplicaSetBase):
# auto_start_request should default to True
conn = self._get_connection()
- pools = [mongo['pool'] for mongo in conn._ReplicaSetConnection__pools.values()]
+ pools = [mongo.pool for mongo in
+ conn._ReplicaSetConnection__members.values()]
self.assertTrue(conn.auto_start_request)
self.assertTrue(conn.in_request())
@@ -630,6 +674,7 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertFalse(pool.in_request())
conn.start_request()
self.assertTrue(conn.in_request())
+ conn.close()
conn = self._get_connection(auto_start_request=False)
self.assertFalse(conn.in_request())
@@ -637,6 +682,72 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertTrue(conn.in_request())
conn.end_request()
self.assertFalse(conn.in_request())
+ conn.close()
+
+ def test_schedule_refresh(self):
+ # Check that scheduling a refresh makes the connection mark its
+ # members as up again after they have been flagged down by hand.
+ #
+ # Monitor thread starts waiting for _refresh_interval, 30 seconds
+ conn = self._get_connection()
+
+ # Reconnect if necessary
+ conn.pymongo_test.test.find_one()
+
+ # Manually flag every secondary, and then the primary, as down by
+ # writing to the connection's private member table.
+ secondaries = conn.secondaries
+ for secondary in secondaries:
+ conn._ReplicaSetConnection__members[secondary].up = False
+
+ conn._ReplicaSetConnection__members[conn.primary].up = False
+
+ # Wake up monitor thread
+ conn._ReplicaSetConnection__schedule_refresh()
+
+ # Refresh interval is 30 seconds; scheduling a refresh tells the
+ # monitor thread / greenlet to start a refresh now. We still need to
+ # sleep a few seconds for it to complete.
+ time.sleep(5)
+ for secondary in secondaries:
+ self.assertTrue(conn._ReplicaSetConnection__members[secondary].up,
+ "ReplicaSetConnection didn't detect secondary is up")
+
+ self.assertTrue(conn._ReplicaSetConnection__members[conn.primary].up,
+ "ReplicaSetConnection didn't detect primary is up")
+
+ conn.close()
+
+ def test_pinned_member(self):
+ # Check which members serve reads outside a request, inside a request
+ # (where repeated reads are expected to keep hitting one member), and
+ # again after the request has ended.
+ #
+ # NOTE(review): the very large secondary_acceptable_latency_ms
+ # presumably keeps any member from being excluded on latency grounds
+ # -- confirm.
+ conn = self._get_connection(
+ auto_start_request=False, secondary_acceptable_latency_ms=1000*1000)
+
+ host = read_from_which_host(conn, ReadPreference.SECONDARY)
+ self.assertTrue(host in conn.secondaries)
+
+ # No pinning since we're not in a request
+ assertReadFromAll(
+ self, conn, conn.secondaries, ReadPreference.SECONDARY)
+
+ assertReadFromAll(
+ self, conn, list(conn.secondaries) + [conn.primary],
+ ReadPreference.NEAREST)
+
+ # Inside a request: whichever secondary serves the first read must
+ # also serve the following SECONDARY reads.
+ conn.start_request()
+ host = read_from_which_host(conn, ReadPreference.SECONDARY)
+ self.assertTrue(host in conn.secondaries)
+ assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
+
+ # Repin
+ primary = read_from_which_host(conn, ReadPreference.PRIMARY)
+ self.assertEqual(conn.primary, primary)
+ assertReadFrom(self, conn, primary, ReadPreference.NEAREST)
+
+ # Repin again
+ host = read_from_which_host(conn, ReadPreference.SECONDARY)
+ self.assertTrue(host in conn.secondaries)
+ assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
+
+ # Unpin
+ conn.end_request()
+ assertReadFromAll(
+ self, conn, list(conn.secondaries) + [conn.primary],
+ ReadPreference.NEAREST)
+ # NOTE(review): unlike test_schedule_refresh, this test never calls
+ # conn.close() -- confirm whether the connection should be closed here.
if __name__ == "__main__":
diff --git a/test/test_threads.py b/test/test_threads.py
index 934ef171a..950472283 100644
--- a/test/test_threads.py
+++ b/test/test_threads.py
@@ -25,9 +25,7 @@ from test.test_connection import get_connection
from pymongo.connection import Connection
from pymongo.replica_set_connection import ReplicaSetConnection
from pymongo.pool import SocketInfo, _closed
-from pymongo.errors import (AutoReconnect,
- OperationFailure,
- DuplicateKeyError)
+from pymongo.errors import AutoReconnect, OperationFailure
def get_pool(connection):
@@ -35,8 +33,8 @@ def get_pool(connection):
return connection._Connection__pool
elif isinstance(connection, ReplicaSetConnection):
writer = connection._ReplicaSetConnection__writer
- pools = connection._ReplicaSetConnection__pools
- return pools[writer]['pool']
+ pools = connection._ReplicaSetConnection__members
+ return pools[writer].pool
else:
raise TypeError(str(connection))
@@ -447,5 +445,6 @@ class TestThreads(BaseTestThreads, unittest.TestCase):
class TestThreadsAuth(BaseTestThreadsAuth, unittest.TestCase):
pass
+
if __name__ == "__main__":
unittest.main()
diff --git a/test/test_threads_replica_set_connection.py b/test/test_threads_replica_set_connection.py
index b363c7b32..b7ff73802 100644
--- a/test/test_threads_replica_set_connection.py
+++ b/test/test_threads_replica_set_connection.py
@@ -15,18 +15,13 @@
"""Test that pymongo is thread safe."""
import unittest
-import os
-from nose.plugins.skip import SkipTest
from pymongo.replica_set_connection import ReplicaSetConnection
-from test.test_threads import (IgnoreAutoReconnect,
- BaseTestThreads,
- BaseTestThreadsAuth)
+from test.test_threads import BaseTestThreads, BaseTestThreadsAuth
from test.test_replica_set_connection import (TestConnectionReplicaSetBase,
pair)
-from pymongo.errors import AutoReconnect
class TestThreadsReplicaSet(TestConnectionReplicaSetBase, BaseTestThreads):
def setUp(self):
@@ -70,6 +65,7 @@ class TestThreadsAuthReplicaSet(TestConnectionReplicaSetBase, BaseTestThreadsAut
"""
return ReplicaSetConnection(pair, replicaSet=self.name)
+
if __name__ == "__main__":
suite = unittest.TestSuite([
unittest.makeSuite(TestThreadsReplicaSet),
diff --git a/test/utils.py b/test/utils.py
index 054af5acb..ccdae4a17 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -15,6 +15,9 @@
"""Utilities for testing pymongo
"""
+from pymongo.errors import AutoReconnect
+
+
def delay(sec):
# Javascript sleep() only available in MongoDB since version ~1.9
return '''function() {
@@ -48,3 +51,73 @@ def joinall(threads):
def is_mongos(conn):
res = conn.admin.command('ismaster')
return res.get('msg', '') == 'isdbgrid'
+
+def read_from_which_host(
+ rsc,
+ mode,
+ tag_sets=None,
+ secondary_acceptable_latency_ms=15
+):
+ """Read from a ReplicaSetConnection with the given Read Preference mode,
+ tags, and acceptable latency, and report which member served the read.
+
+ The settings are applied by assigning them on the `pymongo_test`
+ database object obtained from `rsc`. A `find()` on its `test` collection
+ is then advanced by one document; an empty collection is fine.
+
+ Returns the cursor's connection id (the 'host:port' which was read
+ from), or None if iterating the cursor raises AutoReconnect.
+ NOTE(review): the exact form of the connection id (string vs. tuple) is
+ not visible here -- confirm.
+
+ :Parameters:
+ - `rsc`: A ReplicaSetConnection
+ - `mode`: A ReadPreference
+ - `tag_sets`: List of dicts of tags for data-center-aware reads. A
+ single dict is wrapped in a list; None or an empty value becomes
+ `[{}]`.
+ - `secondary_acceptable_latency_ms`: a float (defaults to 15)
+ """
+ db = rsc.pymongo_test
+ db.read_preference = mode
+ if isinstance(tag_sets, dict):
+ tag_sets = [tag_sets]
+ db.tag_sets = tag_sets or [{}]
+ db.secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
+
+ cursor = db.test.find()
+ try:
+ try:
+ cursor.next()
+ except StopIteration:
+ # No documents in collection, that's fine
+ pass
+
+ return cursor._Cursor__connection_id
+ except AutoReconnect:
+ return None
+
+def assertReadFrom(testcase, rsc, member, *args, **kwargs):
+ """Check that a query with the given mode, tag_sets, and
+ secondary_acceptable_latency_ms reads from the expected replica-set
+ member. The read is repeated 10 times and every read must match.
+
+ :Parameters:
+ - `testcase`: A unittest.TestCase
+ - `rsc`: A ReplicaSetConnection
+ - `member`: the member expected to be used, in the same form that
+ read_from_which_host returns (it is compared for equality with that
+ return value)
+ - `mode`: A ReadPreference
+ - `tag_sets`: List of dicts of tags for data-center-aware reads
+ - `secondary_acceptable_latency_ms`: a float
+
+ `mode`, `tag_sets`, and `secondary_acceptable_latency_ms` are taken from
+ *args / **kwargs and passed through to read_from_which_host unchanged.
+ """
+ for _ in range(10):
+ testcase.assertEqual(member, read_from_which_host(rsc, *args, **kwargs))
+
+def assertReadFromAll(testcase, rsc, members, *args, **kwargs):
+ """Check that queries with the given mode, tag_sets, and
+ secondary_acceptable_latency_ms read from exactly the given set of
+ replica-set members: 100 reads are performed, and the set of members
+ actually used must equal `members`.
+
+ :Parameters:
+ - `testcase`: A unittest.TestCase
+ - `rsc`: A ReplicaSetConnection
+ - `members`: Sequence of the members expected to be used, each in the
+ same form that read_from_which_host returns
+ - `mode`: A ReadPreference
+ - `tag_sets`: List of dicts of tags for data-center-aware reads
+ - `secondary_acceptable_latency_ms`: a float
+
+ `mode`, `tag_sets`, and `secondary_acceptable_latency_ms` are taken from
+ *args / **kwargs and passed through to read_from_which_host unchanged.
+ """
+ members = set(members)
+ used = set()
+ for _ in range(100):
+ used.add(read_from_which_host(rsc, *args, **kwargs))
+
+ testcase.assertEqual(members, used)