Implement read preferences for distributing reads among replica set members PYTHON-367

Replace the 'mongo' dict with a Member object everywhere in ReplicaSetConnection.
A handful of commands obey read preferences; most are always sent to primary.
Track a 5-sample moving average of each replica set member's ping time.
Connection detects whether it's connected to primary, secondary, or mongos.
This commit is contained in:
A. Jesse Jiryu Davis 2012-07-24 23:40:30 -04:00
parent 70bf190581
commit f275b2291a
27 changed files with 1619 additions and 386 deletions

View File

@ -22,6 +22,8 @@
.. autoattribute:: database
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. autoattribute:: uuid_subtype
.. automethod:: get_lasterror_options

View File

@ -17,11 +17,15 @@
.. autoattribute:: host
.. autoattribute:: port
.. autoattribute:: is_primary
.. autoattribute:: is_mongos
.. autoattribute:: nodes
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: slave_okay
.. autoattribute:: safe
.. autoattribute:: is_locked

View File

@ -20,6 +20,8 @@
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. automethod:: get_lasterror_options
.. automethod:: set_lasterror_options

View File

@ -13,7 +13,7 @@
Alias for :class:`pymongo.replica_set_connection.ReplicaSetConnection`.
.. autoclass:: pymongo.ReadPreference
.. autoclass:: pymongo.read_preferences.ReadPreference
.. autofunction:: has_c
Sub-modules:

View File

@ -21,6 +21,8 @@
.. autoattribute:: primary
.. autoattribute:: secondaries
.. autoattribute:: read_preference
.. autoattribute:: tag_sets
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware

View File

@ -100,7 +100,7 @@ Important New Features:
automatic failover handling and periodically checks the state of the
replica set to handle issues like primary stepdown or secondaries
being removed for backup operations. Read preferences are defined through
:class:`~pymongo.ReadPreference`.
:class:`~pymongo.read_preferences.ReadPreference`.
- PyMongo supports the new BSON binary subtype 4 for UUIDs. The default
subtype to use can be set through
:attr:`~pymongo.collection.Collection.uuid_subtype`

View File

@ -53,8 +53,9 @@ Additionally, it will use a background greenlet instead of a background thread
to monitor the state of the replica set.
Using :meth:`~pymongo.replica_set_connection.ReplicaSetConnection.start_request()`
with :class:`~pymongo.ReadPreference` PRIMARY ensures that the current greenlet
uses the same socket for all operations until a call to :meth:`end_request()`.
with :class:`~pymongo.read_preferences.ReadPreference` PRIMARY ensures that the
current greenlet uses the same socket for all operations until a call to
:meth:`end_request()`.
You must `install Gevent <http://gevent.org/>`_ to use
:class:`~pymongo.replica_set_connection.ReplicaSetConnection`

View File

@ -155,31 +155,113 @@ the operation will succeed::
ReplicaSetConnection
--------------------
In Pymongo-2.1 a new ReplicaSetConnection class was added that provides
some new features not supported in the original Connection class. The most
important of these is the ability to distribute queries to the secondary
members of a replica set. To connect using ReplicaSetConnection just
provide a host:port pair and the name of the replica set::
Using a :class:`~pymongo.replica_set_connection.ReplicaSetConnection` instead
of a simple :class:`~pymongo.connection.Connection` offers two key features:
secondary reads and replica set health monitoring. To connect using
`ReplicaSetConnection` just provide a host:port pair and the name of the
replica set::
>>> from pymongo import ReplicaSetConnection
>>> ReplicaSetConnection("morton.local:27017", replicaSet='foo')
ReplicaSetConnection([u'morton.local:27019', u'morton.local:27017', u'morton.local:27018'])
Secondary Reads
'''''''''''''''
By default an instance of ReplicaSetConnection will only send queries to
the primary member of the replica set. To use secondary members for queries
we have to change the read preference::
the primary member of the replica set. To use secondaries for queries
we have to change the :class:`~pymongo.read_preference.ReadPreference`::
>>> db = ReplicaSetConnection("morton.local:27017", replicaSet='foo').test
>>> from pymongo import ReadPreference
>>> db.read_preference = ReadPreference.SECONDARY
>>> from pymongo.read_preference import ReadPreference
>>> db.read_preference = ReadPreference.SECONDARY_PREFERRED
Now all queries will be sent to the secondary members of the set. If there are
no secondary members the primary will be used as a fallback. If you have
queries you would prefer to never send to the primary you can specify that
using the SECONDARY_ONLY read preference::
using the ``SECONDARY`` read preference::
>>> db.read_preference = ReadPreference.SECONDARY_ONLY
>>> db.read_preference = ReadPreference.SECONDARY
Read preference can be set on a connection, database, collection, or on a
per-query basis.
per-query basis, e.g.::
>>> db.collection.find_one(read_preference=ReadPreference.PRIMARY)
Reads are configured using three options: **read_preference**, **tag_sets**,
and **secondary_acceptable_latency_ms**.
**read_preference**:
- ``PRIMARY``: Read from the primary. This is the default, and provides the
strongest consistency. If no primary is available, raise
:class:`~pymongo.errors.AutoReconnect`.
- ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
none, read from a secondary matching your choice of ``tag_sets`` and
``secondary_acceptable_latency_ms``.
- ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and
``secondary_acceptable_latency_ms``. If no matching secondary is available,
raise :class:`~pymongo.errors.AutoReconnect`.
- ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of
``tag_sets`` and ``secondary_acceptable_latency_ms`` if available, otherwise
from primary (regardless of the primary's tags and latency).
- ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and
``secondary_acceptable_latency_ms``.
**tag_sets**:
Replica-set members can be `tagged
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_ according to any
criteria you choose. By default, ReplicaSetConnection ignores tags when
choosing a member to read from, but it can be configured with the ``tag_sets``
parameter. ``tag_sets`` must be a list of dictionaries, each dict providing tag
values that the replica set member must match. ReplicaSetConnection tries each
set of tags in turn until it finds a set of tags with at least one matching
member. For example, to prefer reads from the New York data center, but fall
back to the San Francisco data center, tag your replica set members according
to their location and create a ReplicaSetConnection like so:
>>> rsc = ReplicaSetConnection(
... "morton.local:27017",
... replicaSet='foo'
... read_preference=ReadPreference.SECONDARY,
... tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}]
... )
ReplicaSetConnection tries to find secondaries in New York, then San Francisco,
and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an
additional fallback, specify a final, empty tag set, ``{}``, which means "read
from any member that matches the mode, ignoring tags."
**secondary_acceptable_latency_ms**:
If multiple members match the mode and tag sets, ReplicaSetConnection reads
from among the nearest members, chosen according to ping time. By default,
only members whose ping times are within 15 milliseconds of the nearest
are used for queries. You can choose to distribute reads among members with
higher latencies by setting ``secondary_acceptable_latency_ms`` to a larger
number. In that case, ReplicaSetConnection distributes reads among matching
members within ``secondary_acceptable_latency_ms`` of the closest member's
ping time.
Health Monitoring
'''''''''''''''''
When ReplicaSetConnection is initialized it launches a background task to
monitor the replica set for changes in:
* Health: detect when a member goes down or comes up, or if a different member
becomes primary
* Configuration: detect changes in tags
* Latency: track a moving average of each member's ping time
Replica-set monitoring ensures queries are continually routed to the proper
members as the state of the replica set changes.
It is critical to call
:meth:`~pymongo.replica_set_connection.ReplicaSetConnection.close` to terminate
the monitoring task before your process exits.

View File

@ -54,7 +54,8 @@ class GridFS(object):
self.__collection = database[collection]
self.__files = self.__collection.files
self.__chunks = self.__collection.chunks
if not database.slave_okay and not database.read_preference:
connection = database.connection
if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__chunks.ensure_index([("files_id", ASCENDING),
("n", ASCENDING)],
unique=True)
@ -158,7 +159,7 @@ class GridFS(object):
:Parameters:
- `filename`: ``"filename"`` of the file to get, or `None`
- `version` (optional): version of the file to get (defualts
- `version` (optional): version of the file to get (defaults
to -1, the most recent version uploaded)
- `**kwargs` (optional): find files by custom metadata.
@ -168,8 +169,8 @@ class GridFS(object):
Accept keyword arguments to find files by custom metadata.
.. versionadded:: 1.9
"""
database = self.__database
if not database.slave_okay and not database.read_preference:
connection = self.__database.connection
if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__files.ensure_index([("filename", ASCENDING),
("uploadDate", DESCENDING)])

View File

@ -48,49 +48,6 @@ SLOW_ONLY = 1
ALL = 2
"""Profile all operations."""
class ReadPreference:
"""An enum that defines the read preferences supported by PyMongo.
+----------------------+--------------------------------------------------+
| Connection type | Read Preference |
+======================+================+================+================+
| |`PRIMARY` |`SECONDARY` |`SECONDARY_ONLY`|
+----------------------+----------------+----------------+----------------+
|Connection to a single|Queries are |Queries are |Same as |
|host. |allowed if the |allowed if the |`SECONDARY` |
| |connection is to|connection is to| |
| |the replica set |the replica set | |
| |primary. |primary or a | |
| | |secondary. | |
+----------------------+----------------+----------------+----------------+
|Connection to a |Queries are sent|Queries are |Same as |
|mongos. |to the primary |distributed |`SECONDARY` |
| |of a shard. |among shard | |
| | |secondaries. | |
| | |Queries are sent| |
| | |to the primary | |
| | |if no | |
| | |secondaries are | |
| | |available. | |
| | | | |
+----------------------+----------------+----------------+----------------+
|ReplicaSetConnection |Queries are sent|Queries are |Queries are |
| |to the primary |distributed |never sent to |
| |of the replica |among replica |the replica set |
| |set. |set secondaries.|primary. An |
| | |Queries are sent|exception is |
| | |to the primary |raised if no |
| | |if no |secondary is |
| | |secondaries are |available. |
| | |available. | |
| | | | |
+----------------------+----------------+----------------+----------------+
"""
PRIMARY = 0
SECONDARY = 1
SECONDARY_ONLY = 2
version_tuple = (2, 2, 1, '+')
def get_version_string():
@ -103,6 +60,7 @@ version = get_version_string()
from pymongo.connection import Connection
from pymongo.replica_set_connection import ReplicaSetConnection
from pymongo.read_preferences import ReadPreference
def has_c():
"""Is the C extension installed?

View File

@ -23,7 +23,7 @@ from pymongo import (common,
helpers,
message)
from pymongo.cursor import Cursor
from pymongo.errors import ConfigurationError, InvalidName, InvalidOperation
from pymongo.errors import ConfigurationError, InvalidName
def _gen_index_name(keys):
@ -74,11 +74,14 @@ class Collection(common.BaseObject):
.. mongodoc:: collections
"""
super(Collection,
self).__init__(slave_okay=database.slave_okay,
read_preference=database.read_preference,
safe=database.safe,
**(database.get_lasterror_options()))
super(Collection, self).__init__(
slave_okay=database.slave_okay,
read_preference=database.read_preference,
tag_sets=database.tag_sets,
secondary_acceptable_latency_ms=(
database.secondary_acceptable_latency_ms),
safe=database.safe,
**(database.get_lasterror_options()))
if not isinstance(name, basestring):
raise TypeError("name must be an instance "
@ -590,13 +593,20 @@ class Collection(common.BaseObject):
:class:`~pymongo.connection.Connection`-level default
- `read_preference` (optional): The read preference for
this query.
- `tag_sets` (optional): The tag sets for this query.
- `secondary_acceptable_latency_ms` (optional): Any replica-set
member whose ping time is within secondary_acceptable_latency_ms of
the nearest member may accept reads. Default 15 milliseconds.
.. note:: The `manipulate` parameter may default to False in
a future release.
.. note:: The `max_scan` parameter requires server
version **>= 1.5.1**
.. versionadded:: 2.2.1+
The `tag_sets` and `secondary_acceptable_latency_ms` parameters.
.. versionadded:: 1.11+
The `await_data`, `partial`, and `manipulate` parameters.
@ -619,6 +629,11 @@ class Collection(common.BaseObject):
kwargs['slave_okay'] = self.slave_okay
if not 'read_preference' in kwargs:
kwargs['read_preference'] = self.read_preference
if not 'tag_sets' in kwargs:
kwargs['tag_sets'] = self.tag_sets
if not 'secondary_acceptable_latency_ms' in kwargs:
kwargs['secondary_acceptable_latency_ms'] = (
self.secondary_acceptable_latency_ms)
return Cursor(self, *args, **kwargs)
def count(self):
@ -924,7 +939,6 @@ class Collection(common.BaseObject):
return self.__database.command("aggregate", self.__name,
pipeline=pipeline,
read_preference=self.read_preference,
slave_okay=self.slave_okay,
_use_master=use_master)
@ -949,9 +963,10 @@ class Collection(common.BaseObject):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
:attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
`slave_okay` attribute of this instance is set to `True` the group
command will be sent to a secondary or slave.
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
the (deprecated) `slave_okay` attribute of this instance is set to
`True`, the group command will be sent to a secondary or slave.
:Parameters:
- `key`: fields to group by (see above description)
@ -989,6 +1004,9 @@ class Collection(common.BaseObject):
return self.__database.command("group", group,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
tag_sets=self.tag_sets,
secondary_acceptable_latency_ms=(
self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master)["retval"]
@ -1089,10 +1107,20 @@ class Collection(common.BaseObject):
raise TypeError("'out' must be an instance of "
"%s or dict" % (basestring.__name__,))
if isinstance(out, dict) and out.get('inline'):
must_use_master = False
else:
must_use_master = True
response = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
map=map, reduce=reduce,
out=out, **kwargs)
read_preference=self.read_preference,
tag_sets=self.tag_sets,
secondary_acceptable_latency_ms=(
self.secondary_acceptable_latency_ms),
out=out, _use_master=must_use_master,
**kwargs)
if full_response or not response.get('result'):
return response
@ -1117,9 +1145,10 @@ class Collection(common.BaseObject):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
:attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
`slave_okay` attribute of this instance is set to `True` the inline
map reduce will be run on a secondary or slave.
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
the (deprecated) `slave_okay` attribute of this instance is set to
`True`, the inline map reduce will be run on a secondary or slave.
:Parameters:
- `map`: map function (as a JavaScript string)
@ -1142,6 +1171,9 @@ class Collection(common.BaseObject):
res = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
tag_sets=self.tag_sets,
secondary_acceptable_latency_ms=(
self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master,
map=map, reduce=reduce,

View File

@ -15,8 +15,9 @@
"""Functions and classes common to multiple pymongo modules."""
import warnings
from pymongo import read_preferences
from pymongo import ReadPreference
from pymongo.read_preferences import ReadPreference
from pymongo.errors import ConfigurationError
@ -83,31 +84,60 @@ def validate_int_or_basestring(option, value):
"integer or a string" % (option,))
def validate_positive_float(option, value):
"""Validates that 'value' is a float, or can be converted to one, and is
positive.
"""
err = ConfigurationError("%s must be a positive int or float" % (option,))
try:
value = float(value)
except (ValueError, TypeError):
raise err
if value <= 0:
raise err
return value
def validate_timeout_or_none(option, value):
"""Validates a timeout specified in milliseconds returning
a value in floating point seconds.
"""
if value is None:
return value
try:
value = float(value)
except (ValueError, TypeError):
raise ConfigurationError("%s must be an "
"instance of int or float" % (option,))
if value <= 0:
raise ConfigurationError("%s must be a positive integer" % (option,))
return value / 1000.0
return validate_positive_float(option, value) / 1000.0
def validate_read_preference(dummy, value):
"""Validate read preference for a ReplicaSetConnection.
"""
if value not in range(ReadPreference.PRIMARY,
ReadPreference.SECONDARY_ONLY + 1):
if value not in read_preferences.modes:
raise ConfigurationError("Not a valid read preference")
return value
def validate_tag_sets(dummy, value):
"""Validate tag sets for a ReplicaSetConnection.
"""
if value is None:
return [{}]
if not isinstance(value, list):
raise ConfigurationError((
"Tag sets %s invalid, must be a list" ) % repr(value))
if len(value) == 0:
raise ConfigurationError((
"Tag sets %s invalid, must be None or contain at least one set of"
" tags") % repr(value))
for tags in value:
if not isinstance(tags, dict):
raise ConfigurationError(
"Tag set %s invalid, must be a dict" % repr(tags))
return value
# jounal is an alias for j,
# wtimeoutms is an alias for wtimeout
VALIDATORS = {
@ -125,6 +155,9 @@ VALIDATORS = {
'sockettimeoutms': validate_timeout_or_none,
'ssl': validate_boolean,
'read_preference': validate_read_preference,
'tag_sets': validate_tag_sets,
'secondaryacceptablelatencyms': validate_positive_float,
'secondary_acceptable_latency_ms': validate_positive_float,
'auto_start_request': validate_boolean,
'use_greenlets': validate_boolean,
}
@ -160,9 +193,16 @@ class BaseObject(object):
self.__slave_okay = False
self.__read_pref = ReadPreference.PRIMARY
self.__tag_sets = [{}]
self.__secondary_acceptable_latency_ms = 15
self.__safe = False
self.__safe_opts = {}
self.__set_options(options)
if (self.__read_pref == ReadPreference.PRIMARY
and self.__tag_sets != [{}]
):
raise ConfigurationError(
"ReadPreference PRIMARY cannot be combined with tags")
def __set_safe_option(self, option, value, check=False):
"""Validates and sets getlasterror options for this
@ -183,6 +223,14 @@ class BaseObject(object):
self.__slave_okay = validate_boolean(option, value)
elif option == 'read_preference':
self.__read_pref = validate_read_preference(option, value)
elif option == 'tag_sets':
self.__tag_sets = validate_tag_sets(option, value)
elif option in (
'secondaryAcceptableLatencyMS',
'secondary_acceptable_latency_ms'
):
self.__secondary_acceptable_latency_ms = \
validate_positive_float(option, value)
elif option == 'safe':
self.__safe = validate_boolean(option, value)
elif option in SAFE_OPTIONS:
@ -211,9 +259,9 @@ class BaseObject(object):
slave_okay = property(__get_slave_okay, __set_slave_okay)
def __get_read_pref(self):
"""The read preference for this instance.
"""The read preference mode for this instance.
See :class:`~pymongo.ReadPreference` for available options.
See :class:`~pymongo.read_preferences.ReadPreference` for available options.
.. versionadded:: 2.1
"""
@ -224,6 +272,47 @@ class BaseObject(object):
self.__read_pref = validate_read_preference('read_preference', value)
read_preference = property(__get_read_pref, __set_read_pref)
def __get_acceptable_latency(self):
"""Any replica-set member whose ping time is within
secondary_acceptable_latency_ms of the nearest member may accept
reads. Defaults to 15 milliseconds.
See :class:`~pymongo.read_preferences.ReadPreference`.
.. versionadded:: 2.2.1+
"""
return self.__secondary_acceptable_latency_ms
def __set_acceptable_latency(self, value):
"""Property setter for secondary_acceptable_latency_ms"""
self.__secondary_acceptable_latency_ms = (validate_positive_float(
'secondary_acceptable_latency_ms', value))
secondary_acceptable_latency_ms = property(
__get_acceptable_latency, __set_acceptable_latency)
def __get_tag_sets(self):
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
read only from members whose ``dc`` tag has the value ``"ny"``.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." ReplicaSetConnection tries each set of tags in turn
until it finds a set of tags with at least one matching member.
.. seealso:: `Data-Center Awareness
<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
.. versionadded:: 2.2.1+
"""
return self.__tag_sets
def __set_tag_sets(self, value):
"""Property setter for tag_sets"""
self.__tag_sets = validate_tag_sets('tag_sets', value)
tag_sets = property(__get_tag_sets, __set_tag_sets)
def __get_safe(self):
"""Use getlasterror with every write operation?

View File

@ -130,7 +130,7 @@ class Connection(common.BaseObject):
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- `j` or `journal`: Block until write operations have been commited
- `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without journaling.
Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@ -154,7 +154,8 @@ class Connection(common.BaseObject):
before timing out.
- `ssl`: If True, create the connection to the server using SSL.
- `read_preference`: The read preference for this connection.
See :class:`~pymongo.ReadPreference` for available options.
See :class:`~pymongo.read_preferences.ReadPreference` for available
options.
- `auto_start_request`: If True (the default), each thread that
accesses this Connection has a socket allocated to it for the
thread's lifetime. This ensures consistent reads, even if you read
@ -228,6 +229,8 @@ class Connection(common.BaseObject):
self.__nodes = seeds
self.__host = None
self.__port = None
self.__is_primary = False
self.__is_mongos = False
for option, value in kwargs.iteritems():
option, value = common.validate(option, value)
@ -425,6 +428,23 @@ class Connection(common.BaseObject):
"""
return self.__port
@property
def is_primary(self):
"""If this Connection is connected to a standalone, a replica-set
primary, or the master of a master-slave set.
.. versionadded:: 2.2.1+
"""
return self.__is_primary
@property
def is_mongos(self):
"""If this Connection is connected to mongos.
.. versionadded:: 2.2.1+
"""
return self.__is_mongos
@property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
@ -507,7 +527,7 @@ class Connection(common.BaseObject):
def __try_node(self, node):
"""Try to connect to this node and see if it works
for our connection type.
for our connection type. Returns ((host, port), is_primary, is_mongos).
:Parameters:
- `node`: The (host, port) pair to try.
@ -539,7 +559,7 @@ class Connection(common.BaseObject):
# TODO: Rework this for PYTHON-368 (mongos high availability).
if not self.__nodes:
self.__nodes = set([node])
return node
return node, True, response.get('msg', '') == 'isdbgrid'
elif "primary" in response:
candidate = _partition_node(response["primary"])
return self.__try_node(candidate)
@ -550,7 +570,7 @@ class Connection(common.BaseObject):
# Direct connection
if response.get("arbiterOnly", False):
raise ConfigurationError("%s:%d is an arbiter" % node)
return node
return node, response['ismaster'], response.get('msg', '') == 'isdbgrid'
def __find_node(self, seeds=None):
"""Find a host, port pair suitable for our connection type.
@ -570,24 +590,27 @@ class Connection(common.BaseObject):
In either case a connection to an arbiter will never succeed.
Sets __host and __port so that :attr:`host` and :attr:`port`
will return the address of the connected host.
will return the address of the connected host. Sets __is_primary to
True if this is a primary or master, else False.
"""
errors = []
# self.__nodes may change size as we iterate.
candidates = seeds or self.__nodes.copy()
for candidate in candidates:
try:
node = self.__try_node(candidate)
if node:
return node
node, is_primary, is_mongos = self.__try_node(candidate)
self.__is_primary = is_primary
self.__is_mongos = is_mongos
return node
except Exception, why:
errors.append(str(why))
# Try any hosts we discovered that were not in the seed list.
for candidate in self.__nodes - candidates:
try:
node = self.__try_node(candidate)
if node:
return node
node, is_primary, is_mongos = self.__try_node(candidate)
self.__is_primary = is_primary
self.__is_mongos = is_mongos
return node
except Exception, why:
errors.append(str(why))
# Couldn't find a suitable host.

View File

@ -16,9 +16,8 @@
from bson.code import Code
from bson.son import SON
from pymongo import (helpers,
message,
ReadPreference)
from pymongo import helpers, message, read_preferences
from pymongo.read_preferences import ReadPreference
from pymongo.errors import (InvalidOperation,
AutoReconnect)
@ -43,7 +42,8 @@ class Cursor(object):
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, as_class=None, slave_okay=False,
await_data=False, partial=False, manipulate=True,
read_preference=ReadPreference.PRIMARY,
read_preference=ReadPreference.PRIMARY, tag_sets=[{}],
secondary_acceptable_latency_ms=None,
_must_use_master=False, _uuid_subtype=None, **kwargs):
"""Create a new cursor.
@ -113,6 +113,8 @@ class Cursor(object):
self.__slave_okay = slave_okay
self.__manipulate = manipulate
self.__read_preference = read_preference
self.__tag_sets = tag_sets
self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
self.__tz_aware = collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
@ -179,6 +181,9 @@ class Cursor(object):
copy.__partial = self.__partial
copy.__manipulate = self.__manipulate
copy.__read_preference = self.__read_preference
copy.__tag_sets = self.__tag_sets
copy.__secondary_acceptable_latency_ms = (
self.__secondary_acceptable_latency_ms)
copy.__must_use_master = self.__must_use_master
copy.__uuid_subtype = self.__uuid_subtype
copy.__query_flags = self.__query_flags
@ -217,6 +222,14 @@ class Cursor(object):
operators["$snapshot"] = True
if self.__max_scan:
operators["$maxScan"] = self.__max_scan
if self.__collection.database.connection.is_mongos:
read_pref = {
'mode': read_preferences.mongos_mode(self.__read_preference)}
if self.__tag_sets and self.__tag_sets != [{}]:
read_pref['tags'] = self.__tag_sets
operators['$readPreference'] = read_pref
if operators:
# Make a shallow copy so we can cleanly rewind or clone.
@ -243,7 +256,9 @@ class Cursor(object):
options = self.__query_flags
if self.__tailable:
options |= _QUERY_OPTIONS["tailable_cursor"]
if self.__slave_okay or self.__read_preference:
if (self.__slave_okay
or self.__read_preference != ReadPreference.PRIMARY
):
options |= _QUERY_OPTIONS["slave_okay"]
if not self.__timeout:
options |= _QUERY_OPTIONS["no_timeout"]
@ -474,8 +489,10 @@ class Cursor(object):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
(deprecated) `slave_okay` is `True` the count command will be sent to
if `read_preference` is not
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
:attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
(deprecated) `slave_okay` is `True`, the count command will be sent to
a secondary or slave.
:Parameters:
@ -494,6 +511,9 @@ class Cursor(object):
command = {"query": self.__spec, "fields": self.__fields}
command['read_preference'] = self.__read_preference
command['tag_sets'] = self.__tag_sets
command['secondary_acceptable_latency_ms'] = (
self.__secondary_acceptable_latency_ms)
command['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
command['_use_master'] = use_master
@ -522,7 +542,8 @@ class Cursor(object):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
if `read_preference` is
not :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
(deprecated) `slave_okay` is `True` the distinct command will be sent
to a secondary or slave.
@ -544,6 +565,9 @@ class Cursor(object):
options["query"] = self.__spec
options['read_preference'] = self.__read_preference
options['tag_sets'] = self.__tag_sets
options['secondary_acceptable_latency_ms'] = (
self.__secondary_acceptable_latency_ms)
options['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
options['_use_master'] = use_master
@ -629,6 +653,9 @@ class Cursor(object):
db = self.__collection.database
kwargs = {"_must_use_master": self.__must_use_master}
kwargs["read_preference"] = self.__read_preference
kwargs["tag_sets"] = self.__tag_sets
kwargs["secondary_acceptable_latency_ms"] = (
self.__secondary_acceptable_latency_ms)
if self.__connection_id is not None:
kwargs["_connection_to_use"] = self.__connection_id
kwargs.update(self.__kwargs)

View File

@ -26,6 +26,7 @@ from pymongo.errors import (CollectionInvalid,
InvalidName,
OperationFailure)
from pymongo.son_manipulator import ObjectIdInjector
from pymongo import read_preferences as rp
def _check_name(name):
@ -62,6 +63,9 @@ class Database(common.BaseObject):
super(Database,
self).__init__(slave_okay=connection.slave_okay,
read_preference=connection.read_preference,
tag_sets=connection.tag_sets,
secondary_acceptable_latency_ms=(
connection.secondary_acceptable_latency_ms),
safe=connection.safe,
**(connection.get_lasterror_options()))
@ -313,9 +317,23 @@ class Database(common.BaseObject):
in this list will be ignored by error-checking
- `uuid_subtype` (optional): The BSON binary subtype to use
for a UUID used in this command.
- `read_preference`: The read preference for this connection.
See :class:`~pymongo.read_preferences.ReadPreference` for available
options.
- `tag_sets`: Read from replica-set members with these tags.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." ReplicaSetConnection tries each set of tags in turn
until it finds a set of tags with at least one matching member.
- `secondary_acceptable_latency_ms`: Any replica-set member whose
ping time is within secondary_acceptable_latency_ms of the nearest
member may accept reads. Default 15 milliseconds.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
.. versionchanged:: 2.2.1+
Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added support for `as_class` - the class you want to use for
the resulting documents
@ -332,15 +350,35 @@ class Database(common.BaseObject):
if isinstance(command, basestring):
command = SON([(command, value)])
command_name = command.keys()[0]
must_use_master = kwargs.pop('_use_master', False)
if command_name.lower() not in rp.secondary_ok_commands:
must_use_master = True
# Special-case: mapreduce can go to secondaries only if inline
if command_name == 'mapreduce':
out = command.get('out') or kwargs.get('out')
if not isinstance(out, dict) or not out.get('inline'):
must_use_master = True
extra_opts = {
'as_class': kwargs.pop('as_class', None),
'read_preference': kwargs.pop('read_preference',
self.read_preference),
'slave_okay': kwargs.pop('slave_okay', self.slave_okay),
'_must_use_master': kwargs.pop('_use_master', True),
'_must_use_master': must_use_master,
'_uuid_subtype': uuid_subtype
}
if not must_use_master:
extra_opts['read_preference'] = kwargs.pop(
'read_preference',
self.read_preference)
extra_opts['tag_sets'] = kwargs.pop(
'tag_sets',
self.tag_sets)
extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
'secondary_acceptable_latency_ms',
self.secondary_acceptable_latency_ms)
fields = kwargs.get('fields')
if fields is not None and not isinstance(fields, dict):
kwargs['fields'] = helpers._fields_list_to_dict(fields)

View File

@ -39,6 +39,9 @@ class AutoReconnect(ConnectionFailure):
will continue to raise this exception until the first successful
connection is made).
"""
def __init__(self, message='', errors=None):
self.errors = errors or []
ConnectionFailure.__init__(self, message)
class ConfigurationError(PyMongoError):

View File

@ -39,8 +39,8 @@ class MasterSlaveConnection(BaseObject):
to create this `MasterSlaveConnection` can themselves make use of
connection pooling, etc. 'Connection' instances used as slaves should
be created with the read_preference option set to
:attr:`~pymongo.ReadPreference.SECONDARY`. Safe options are
inherited from `master` and can be changed in this instance.
:attr:`~pymongo.read_preferences.ReadPreference.SECONDARY`. Safe
options are inherited from `master` and can be changed in this instance.
Raises TypeError if `master` is not an instance of `Connection` or
slaves is not a list of at least one `Connection` instances.
@ -85,6 +85,14 @@ class MasterSlaveConnection(BaseObject):
def slaves(self):
return self.__slaves
@property
def is_mongos(self):
"""If this MasterSlaveConnection is connected to mongos (always False)
.. versionadded:: 2.2.1+
"""
return False
def get_document_class(self):
return self.__document_class

208
pymongo/read_preferences.py Normal file
View File

@ -0,0 +1,208 @@
# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License",
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for choosing which member of a replica set to read from."""
import random
from collections import deque
from pymongo.errors import ConfigurationError
class ReadPreference:
"""An enum that defines the read preferences supported by PyMongo. Used in
three cases:
:class:`~pymongo.connection.Connection` to a single host:
* `PRIMARY`: Queries are allowed if the connection is to the replica set
primary.
* `PRIMARY_PREFERRED`: Queries are allowed if the connection is to the
primary or a secondary.
* `SECONDARY`: Queries are allowed if the connection is to a secondary.
* `SECONDARY_PREFERRED`: Same as `PRIMARY_PREFERRED`.
* `NEAREST`: Same as `PRIMARY_PREFERRED`.
:class:`~pymongo.connection.Connection` to a mongos, with a sharded cluster
of replica sets:
* `PRIMARY`: Queries are sent to the primary of a shard.
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
otherwise a secondary.
* `SECONDARY`: Queries are distributed among shard secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
or the primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members of a shard.
:class:`~pymongo.replica_set_connection.ReplicaSetConnection`:
* `PRIMARY`: Queries are sent to the primary of the replica set.
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
otherwise a secondary.
* `SECONDARY`: Queries are distributed among secondaries. An error
is raised if no secondaries are available.
* `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
or the primary if no secondary is available.
* `NEAREST`: Queries are distributed among all members.
"""
PRIMARY = 0
PRIMARY_PREFERRED = 1
SECONDARY = 2
SECONDARY_ONLY = 2
SECONDARY_PREFERRED = 3
NEAREST = 4
# For formatting error messages
modes = {
ReadPreference.PRIMARY: 'PRIMARY',
ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
ReadPreference.SECONDARY: 'SECONDARY',
ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
ReadPreference.NEAREST: 'NEAREST',
}
def select_primary(members):
for member in members:
if member.is_primary:
if member.up:
return member
else:
return None
return None
def select_member_with_tags(members, tags, secondary_only, latency):
candidates = []
for candidate in members:
if not candidate.up:
continue
if secondary_only and candidate.is_primary:
continue
if candidate.matches_tags(tags):
candidates.append(candidate)
if not candidates:
return None
# ping_time is in seconds
fastest = min([candidate.get_avg_ping_time() for candidate in candidates])
near_candidates = [
candidate for candidate in candidates
if candidate.get_avg_ping_time() - fastest < latency / 1000.]
return random.choice(near_candidates)
def select_member(
members,
mode=ReadPreference.PRIMARY,
tag_sets=None,
latency=15
):
"""Return a Member or None.
"""
if tag_sets is None:
tag_sets = [{}]
# For brevity
PRIMARY = ReadPreference.PRIMARY
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
SECONDARY = ReadPreference.SECONDARY
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
NEAREST = ReadPreference.NEAREST
if mode == PRIMARY:
if tag_sets != [{}]:
raise ConfigurationError("PRIMARY cannot be combined with tags")
return select_primary(members)
elif mode == PRIMARY_PREFERRED:
candidate_primary = select_member(members, PRIMARY, [{}], latency)
if candidate_primary:
return candidate_primary
else:
return select_member(members, SECONDARY, tag_sets, latency)
elif mode == SECONDARY:
for tags in tag_sets:
candidate = select_member_with_tags(members, tags, True, latency)
if candidate:
return candidate
return None
elif mode == SECONDARY_PREFERRED:
candidate_secondary = select_member(
members, SECONDARY, tag_sets, latency)
if candidate_secondary:
return candidate_secondary
else:
return select_member(members, PRIMARY, [{}], latency)
elif mode == NEAREST:
for tags in tag_sets:
candidate = select_member_with_tags(members, tags, False, latency)
if candidate:
return candidate
# Ran out of tags.
return None
else:
raise ConfigurationError("Invalid mode %s" % repr(mode))
"""Commands that may be sent to replica-set secondaries, depending on
ReadPreference and tags. All other commands are always run on the primary.
"""
secondary_ok_commands = set([
"group", "aggregate", "collstats", "dbstats", "count", "distinct",
"geonear", "geosearch", "geowalk", "mapreduce",
])
class MovingAverage(object):
"""Tracks a moving average. Not thread-safe.
"""
def __init__(self, window_sz):
self.window_sz = window_sz
self.samples = deque()
self.total = 0
def update(self, sample):
self.samples.append(sample)
self.total += sample
if len(self.samples) > self.window_sz:
self.total -= self.samples.popleft()
def get(self):
if self.samples:
return self.total / float(len(self.samples))
else:
return None
def mongos_mode(mode):
return {
ReadPreference.PRIMARY: 'primary',
ReadPreference.PRIMARY_PREFERRED: 'primaryPreferred',
ReadPreference.SECONDARY: 'secondary',
ReadPreference.SECONDARY_PREFERRED: 'secondaryPreferred',
ReadPreference.NEAREST: 'nearest',
}[mode]

View File

@ -47,8 +47,9 @@ from pymongo import (common,
helpers,
message,
pool,
uri_parser,
ReadPreference)
uri_parser)
from pymongo.read_preferences import (
ReadPreference, select_member, modes, MovingAverage)
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
@ -58,7 +59,7 @@ from pymongo.errors import (AutoReconnect,
EMPTY = b("")
MAX_BSON_SIZE = 4 * 1024 * 1024
MAX_RETRY = 3
def _partition_node(node):
"""Split a host:port string returned from mongod/s into
@ -77,24 +78,32 @@ def _partition_node(node):
class Monitor(object):
"""Base class for replica set monitors.
"""
def __init__(self, rsc, interval, event_class):
_refresh_interval = 30
def __init__(self, rsc, event_class):
self.rsc = weakref.proxy(rsc, self.shutdown)
self.interval = interval
self.event = event_class()
self.stopped = False
def shutdown(self, dummy):
"""Signal the monitor to shutdown.
"""
self.stopped = True
self.event.set()
def schedule_refresh(self):
"""Refresh immediately
"""
self.event.set()
def monitor(self):
"""Run until the RSC is collected or an
unexpected error occurs.
"""
while not self.event.isSet():
self.event.wait(self.interval)
if self.event.isSet():
while True:
self.event.wait(Monitor._refresh_interval)
if self.stopped:
break
self.event.clear()
try:
self.rsc.refresh()
except AutoReconnect:
@ -108,8 +117,8 @@ class Monitor(object):
class MonitorThread(Monitor, threading.Thread):
"""Thread based replica set monitor.
"""
def __init__(self, rsc, interval=5):
Monitor.__init__(self, rsc, interval, threading.Event)
def __init__(self, rsc):
Monitor.__init__(self, rsc, threading.Event)
threading.Thread.__init__(self)
self.setName("ReplicaSetMonitorThread")
@ -123,13 +132,16 @@ have_gevent = False
try:
from gevent import Greenlet
from gevent.event import Event
# Used by ReplicaSetConnection
from gevent.local import local as gevent_local
have_gevent = True
class MonitorGreenlet(Monitor, Greenlet):
"""Greenlet based replica set monitor.
"""
def __init__(self, rsc, interval=5):
Monitor.__init__(self, rsc, interval, Event)
def __init__(self, rsc):
Monitor.__init__(self, rsc, Event)
Greenlet.__init__(self)
# Don't override `run` in a Greenlet. Add _run instead.
@ -144,6 +156,69 @@ except ImportError:
pass
class Member(object):
"""Represent one member of a replica set
"""
# For unittesting only. Use under no circumstances!
_host_to_ping_time = {}
def __init__(self, host, ismaster_response, ping_time, connection_pool):
self.host = host
self.pool = connection_pool
self.ping_time = MovingAverage(5)
self.update(ismaster_response, ping_time)
def update(self, ismaster_response, ping_time):
self.is_primary = ismaster_response['ismaster']
self.max_bson_size = ismaster_response.get(
'maxBsonObjectSize', MAX_BSON_SIZE)
self.tags = ismaster_response.get('tags', {})
self.record_ping_time(ping_time)
self.up = True
def get_avg_ping_time(self):
"""Get a moving average of this member's ping times
"""
if self.host in Member._host_to_ping_time:
# Simulate ping times for unittesting
return Member._host_to_ping_time[self.host]
return self.ping_time.get()
def record_ping_time(self, ping_time):
self.ping_time.update(ping_time)
def matches_mode(self, mode):
if mode == ReadPreference.PRIMARY and not self.is_primary:
return False
if mode == ReadPreference.SECONDARY and self.is_primary:
return False
return True
def matches_tags(self, tags):
"""Return True if this member's tags are a superset of the passed-in
tags. E.g., if this member is tagged {'dc': 'ny', 'rack': '1'},
then it matches {'dc': 'ny'}.
"""
for key, value in tags.items():
if key not in self.tags or self.tags[key] != value:
return False
return True
def matches_tag_sets(self, tag_sets):
"""Return True if this member matches any of the tag sets, e.g.
[{'dc': 'ny'}, {'dc': 'la'}, {}]
"""
for tags in tag_sets:
if self.matches_tags(tags):
return True
return False
class ReplicaSetConnection(common.BaseObject):
"""Connection to a MongoDB replica set.
"""
@ -198,7 +273,7 @@ class ReplicaSetConnection(common.BaseObject):
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- `j` or `journal`: Block until write operations have been commited
- `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without
journaling. Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@ -217,16 +292,23 @@ class ReplicaSetConnection(common.BaseObject):
before timing out.
- `ssl`: If True, create the connection to the servers using SSL.
- `read_preference`: The read preference for this connection.
See :class:`~pymongo.ReadPreference` for available options.
See :class:`~pymongo.read_preferences.ReadPreference` for available
- `tag_sets`: Read from replica-set members with these tags.
To specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags." ReplicaSetConnection tries each set of tags in turn
until it finds a set of tags with at least one matching member.
- `secondary_acceptable_latency_ms`: Any replica-set member whose
ping time is within secondary_acceptable_latency_ms of the nearest
member may accept reads. Default 15 milliseconds.
- `auto_start_request`: If True (the default), each thread that
accesses this :class:`ReplicaSetConnection` has a socket allocated
to it for the thread's lifetime, for each member of the set. For
:class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
ensures consistent reads, even if you read after an unsafe
write. For read preferences other than PRIMARY, there are no
consistency guarantees. (The semantics of auto_start_request,
:class:`~pymongo.ReadPreference`, and :class:`ReplicaSetConnection`
may change in future releases of PyMongo.)
:class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
auto_start_request=True ensures consistent reads, even if you read
after an unsafe write. For read preferences other than PRIMARY,
there are no consistency guarantees.
- `use_greenlets` (optional): if ``True``, use a background Greenlet
instead of a background thread to monitor state of replica set.
:meth:`start_request()` will ensure that the current greenlet uses
@ -246,6 +328,8 @@ class ReplicaSetConnection(common.BaseObject):
connection.Connection.
.. versionchanged:: 2.2.1+
Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added `auto_start_request` and `use_greenlets` options.
Added support for `host`, `port`, and `network_timeout` keyword
@ -258,7 +342,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__arbiters = set()
self.__writer = None
self.__readers = []
self.__pools = {}
self.__members = {}
self.__index_cache = {}
self.__auth_credentials = {}
@ -266,6 +350,7 @@ class ReplicaSetConnection(common.BaseObject):
'max_pool_size', max_pool_size)
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__document_class = document_class
self.__monitor = None
# Compatibility with connection.Connection
host = kwargs.pop('host', hosts_or_uri)
@ -311,6 +396,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__auto_start_request = self.__opts.get('auto_start_request', True)
self.__in_request = self.__auto_start_request
self.__reset_pinned_hosts()
self.__name = self.__opts.get('replicaset')
if not self.__name:
raise ConfigurationError("the replicaSet "
@ -358,7 +444,6 @@ class ReplicaSetConnection(common.BaseObject):
self.__monitor.setDaemon(True)
self.__monitor.start()
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
"""
@ -498,6 +583,14 @@ class ReplicaSetConnection(common.BaseObject):
"""
return self.__arbiters
@property
def is_mongos(self):
"""If this ReplicaSetConnection is connected to mongos (always False)
.. versionadded:: 2.2.1+
"""
return False
@property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
@ -530,7 +623,7 @@ class ReplicaSetConnection(common.BaseObject):
0 if no primary is available.
"""
if self.__writer:
return self.__pools[self.__writer]['max_bson_size']
return self.__members[self.__writer].max_bson_size
return 0
@property
@ -539,14 +632,17 @@ class ReplicaSetConnection(common.BaseObject):
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
Returns (response, ping_time in seconds).
"""
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
start = time.time()
sock_info.sock.sendall(msg)
response = self.__recv_msg(1, rqst_id, sock_info)
end = time.time()
response = helpers._unpack_response(response)['data'][0]
msg = "command %r failed: %%s" % spec
helpers._check_command_response(response, None, msg)
return response
return response, end - start
def __auth(self, sock_info, dbname, user, passwd):
"""Authenticate socket against database `dbname`.
@ -563,52 +659,86 @@ class ReplicaSetConnection(common.BaseObject):
def __is_master(self, host):
"""Directly call ismaster.
Returns (response, connection_pool, ping_time in seconds).
"""
mpool = self.pool_class(host, self.__max_pool_size,
self.__net_timeout, self.__conn_timeout,
self.__use_ssl)
sock_info = mpool.get_socket()
connection_pool = self.pool_class(
host, self.__max_pool_size, self.__net_timeout, self.__conn_timeout,
self.__use_ssl)
sock_info = connection_pool.get_socket()
try:
response = self.__simple_command(
response, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
mpool.maybe_return_socket(sock_info)
return response, mpool
connection_pool.maybe_return_socket(sock_info)
return response, connection_pool, ping_time
except (ConnectionFailure, socket.error):
mpool.discard_socket(sock_info)
connection_pool.discard_socket(sock_info)
raise
def __update_pools(self):
"""Update the mapping of (host, port) pairs to connection pools.
"""
primary = None
secondaries = []
for host in self.__hosts:
mongo, sock_info = None, None
member, sock_info = None, None
try:
if host in self.__pools:
mongo = self.__pools[host]
sock_info = self.__socket(mongo)
res = self.__simple_command(sock_info, 'admin', {'ismaster': 1})
mongo['pool'].maybe_return_socket(sock_info)
if host in self.__members:
member = self.__members[host]
sock_info = self.__socket(member)
res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1})
member.pool.maybe_return_socket(sock_info)
member.update(res, ping_time)
else:
res, conn = self.__is_master(host)
bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
self.__pools[host] = {'pool': conn,
'last_checkout': time.time(),
'max_bson_size': bson_max}
res, connection_pool, ping_time = self.__is_master(host)
self.__members[host] = Member(
host=host,
ismaster_response=res,
ping_time=ping_time,
connection_pool=connection_pool)
except (ConnectionFailure, socket.error):
if mongo:
mongo['pool'].discard_socket(sock_info)
if member:
member.pool.discard_socket(sock_info)
self.__members.pop(member.host, None)
continue
# Only use hosts that are currently in 'secondary' state
# as readers.
if res['secondary']:
secondaries.append(host)
elif res['ismaster']:
self.__writer = host
primary = host
if primary != self.__writer:
self.__reset_pinned_hosts()
self.__writer = primary
self.__readers = secondaries
def __schedule_refresh(self):
self.__monitor.schedule_refresh()
def __pin_host(self, host):
# After first successful read in a request, continue reading from same
# member until read preferences change, host goes down, or
# end_request(). This offers a small assurance that reads won't jump
# around in time.
self.__threadlocal.host = host
def __pinned_host(self):
return getattr(self.__threadlocal, 'host', None)
def __unpin_host(self):
self.__threadlocal.host = None
def __reset_pinned_hosts(self):
if self.__opts.get('use_greenlets', False):
self.__threadlocal = gevent_local()
else:
self.__threadlocal = threading.local()
def refresh(self):
"""Iterate through the existing host list, or possibly the
seed list, to update the list of hosts and arbiters in this
@ -619,16 +749,16 @@ class ReplicaSetConnection(common.BaseObject):
hosts = set()
for node in nodes:
mongo, sock_info = None, None
member, sock_info = None, None
try:
if node in self.__pools:
mongo = self.__pools[node]
sock_info = self.__socket(mongo)
response = self.__simple_command(sock_info, 'admin',
{'ismaster': 1})
mongo['pool'].maybe_return_socket(sock_info)
if node in self.__members:
member = self.__members[node]
sock_info = self.__socket(member)
response, _ = self.__simple_command(
sock_info, 'admin', {'ismaster': 1})
member.pool.maybe_return_socket(sock_info)
else:
response, _ = self.__is_master(node)
response, _, _ = self.__is_master(node)
# Check that this host is part of the given replica set.
set_name = response.get('setName')
@ -650,8 +780,8 @@ class ReplicaSetConnection(common.BaseObject):
hosts.update([_partition_node(h)
for h in response["passives"]])
except (ConnectionFailure, socket.error), why:
if mongo:
mongo['pool'].discard_socket(sock_info)
if member:
member.pool.discard_socket(sock_info)
errors.append("%s:%d: %s" % (node[0], node[1], str(why)))
if hosts:
self.__hosts = hosts
@ -666,27 +796,28 @@ class ReplicaSetConnection(common.BaseObject):
def __check_is_primary(self, host):
"""Checks if this host is the primary for the replica set.
"""
mongo, sock_info = None, None
member, sock_info = None, None
try:
if host in self.__pools:
mongo = self.__pools[host]
sock_info = self.__socket(mongo)
res = self.__simple_command(
if host in self.__members:
member = self.__members[host]
sock_info = self.__socket(member)
res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
else:
res, conn = self.__is_master(host)
bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
self.__pools[host] = {'pool': conn,
'last_checkout': time.time(),
'max_bson_size': bson_max}
res, connection_pool, ping_time = self.__is_master(host)
self.__members[host] = Member(
host=host,
ismaster_response=res,
ping_time=ping_time,
connection_pool=connection_pool)
except (ConnectionFailure, socket.error), why:
if mongo:
mongo['pool'].discard_socket(sock_info)
if member:
member.pool.discard_socket(sock_info)
raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why)))
if mongo and sock_info:
mongo['pool'].maybe_return_socket(sock_info)
if member and sock_info:
member.pool.maybe_return_socket(sock_info)
if res["ismaster"]:
return host
@ -704,7 +835,9 @@ class ReplicaSetConnection(common.BaseObject):
if one exists.
"""
if self.__writer:
return self.__pools[self.__writer]
primary = self.__members[self.__writer]
if primary.up:
return primary
# This is either the first connection or we had a failover.
self.refresh()
@ -713,21 +846,20 @@ class ReplicaSetConnection(common.BaseObject):
for candidate in self.__hosts:
try:
self.__writer = self.__check_is_primary(candidate)
return self.__pools[self.__writer]
return self.__members[self.__writer]
except (ConnectionFailure, socket.error), why:
errors.append(str(why))
# Couldn't find the primary.
raise AutoReconnect(', '.join(errors))
def __socket(self, mongo):
def __socket(self, member):
"""Get a SocketInfo from the pool.
"""
mpool = mongo['pool']
if self.__auto_start_request:
# No effect if a request already started
self.start_request()
sock_info = mpool.get_socket()
sock_info = member.pool.get_socket()
if self.__auth_credentials:
self.__check_auth(sock_info)
@ -736,9 +868,9 @@ class ReplicaSetConnection(common.BaseObject):
def disconnect(self):
"""Disconnect from the replica set primary.
"""
mongo = self.__pools.get(self.__writer)
if mongo and 'pool' in mongo:
mongo['pool'].reset()
member = self.__members.get(self.__writer)
if member:
member.pool.reset()
self.__writer = None
def close(self):
@ -757,7 +889,7 @@ class ReplicaSetConnection(common.BaseObject):
self.__monitor.join(1.0)
self.__monitor = None
self.__writer = None
self.__pools = {}
self.__members = {}
def __check_response_to_last_error(self, response):
"""Check a response to a lastError message for errors.
@ -851,15 +983,14 @@ class ReplicaSetConnection(common.BaseObject):
- `safe`: check getLastError status after sending the message
"""
if _connection_to_use in (None, -1):
mongo = self.__find_primary()
member = self.__find_primary()
else:
mongo = self.__pools[_connection_to_use]
member = self.__members[_connection_to_use]
sock_info = None
try:
sock_info = self.__socket(mongo)
rqst_id, data = self.__check_bson_size(msg,
mongo['max_bson_size'])
sock_info = self.__socket(member)
rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
# Safe mode. We pack the message together with a lastError
# message and send both. We then get the response (to the
@ -869,101 +1000,166 @@ class ReplicaSetConnection(common.BaseObject):
if safe:
response = self.__recv_msg(1, rqst_id, sock_info)
rv = self.__check_response_to_last_error(response)
mongo['pool'].maybe_return_socket(sock_info)
member.pool.maybe_return_socket(sock_info)
return rv
except(ConnectionFailure, socket.error), why:
mongo['pool'].discard_socket(sock_info)
member.pool.discard_socket(sock_info)
if _connection_to_use in (None, -1):
self.disconnect()
raise AutoReconnect(str(why))
except:
mongo['pool'].discard_socket(sock_info)
member.pool.discard_socket(sock_info)
raise
def __send_and_receive(self, mongo, msg, **kwargs):
def __send_and_receive(self, member, msg, **kwargs):
"""Send a message on the given socket and return the response data.
"""
sock_info = None
try:
sock_info = self.__socket(mongo)
sock_info = self.__socket(member)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs['network_timeout'])
rqst_id, data = self.__check_bson_size(msg,
mongo['max_bson_size'])
rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
response = self.__recv_msg(1, rqst_id, sock_info)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(self.__net_timeout)
mongo['pool'].maybe_return_socket(sock_info)
member.pool.maybe_return_socket(sock_info)
return response
except (ConnectionFailure, socket.error), why:
host, port = mongo['pool'].pair
mongo['pool'].discard_socket(sock_info)
host, port = member.pool.pair
member.pool.discard_socket(sock_info)
raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
except:
mongo['pool'].discard_socket(sock_info)
member.pool.discard_socket(sock_info)
raise
def __try_read(self, member, msg, **kwargs):
"""Attempt a read from a member; on failure mark the member "down" and
wake up the monitor thread to refresh as soon as possible.
"""
try:
return self.__send_and_receive(member, msg, **kwargs)
except AutoReconnect:
member.up = False
self.__schedule_refresh()
raise
def _send_message_with_response(self, msg, _connection_to_use=None,
_must_use_master=False, **kwargs):
"""Send a message to Mongo and return the response.
Sends the given message and returns the response.
Sends the given message and returns (host used, response).
:Parameters:
- `msg`: (request_id, data) pair making up the message to send
"""
read_pref = kwargs.get('read_preference', ReadPreference.PRIMARY)
mongo = None
# If we've disconnected since last read, trigger refresh
try:
self.__find_primary()
except AutoReconnect:
# We'll throw an error later
pass
tag_sets = kwargs.get('tag_sets', [{}])
mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
if _must_use_master:
mode = ReadPreference.PRIMARY
tag_sets = [{}]
secondary_acceptable_latency_ms = kwargs.get(
'secondary_acceptable_latency_ms',
self.secondary_acceptable_latency_ms)
member = None
try:
if _connection_to_use is not None:
if _connection_to_use == -1:
mongo = self.__find_primary()
member = self.__find_primary()
else:
mongo = self.__pools[_connection_to_use]
return mongo['pool'].pair, self.__send_and_receive(mongo,
msg,
**kwargs)
elif _must_use_master or not read_pref:
mongo = self.__find_primary()
return mongo['pool'].pair, self.__send_and_receive(mongo,
msg,
**kwargs)
member = self.__members[_connection_to_use]
return member.pool.pair, self.__try_read(
member, msg, **kwargs)
except AutoReconnect:
if mongo == self.__pools.get(self.__writer):
if member == self.__members.get(self.__writer):
self.disconnect()
raise
errors = []
for host in helpers.shuffled(self.__readers):
pinned_member = self.__members.get(self.__pinned_host())
if (pinned_member
and pinned_member.matches_mode(mode)
and pinned_member.matches_tag_sets(tag_sets)
and pinned_member.up
):
try:
mongo = self.__pools[host]
return host, self.__send_and_receive(mongo, msg, **kwargs)
return (
pinned_member.host,
self.__try_read(pinned_member, msg, **kwargs))
except AutoReconnect, why:
if _must_use_master or mode == ReadPreference.PRIMARY:
self.disconnect()
raise
else:
errors.append(str(why))
# No pinned member, or pinned member down or doesn't match read pref
self.__unpin_host()
members = self.__members.copy().values()
while len(errors) < MAX_RETRY:
member = select_member(
members=members,
mode=mode,
tag_sets=tag_sets,
latency=secondary_acceptable_latency_ms)
if not member:
# Ran out of members to try
break
try:
# Sets member.up False on failure, so select_member won't try
# it again.
response = self.__try_read(member, msg, **kwargs)
# Success
if self.in_request():
# Keep reading from this member in this thread / greenlet
self.__pin_host(member.host)
return member.host, response
except AutoReconnect, why:
errors.append(str(why))
# Fallback to primary
if read_pref == ReadPreference.SECONDARY:
try:
mongo = self.__find_primary()
return mongo['pool'].pair, self.__send_and_receive(mongo,
msg,
**kwargs)
except AutoReconnect, why:
self.disconnect()
errors.append(str(why))
raise AutoReconnect(', '.join(errors))
members.remove(member)
# Ran out of tries
if mode == ReadPreference.PRIMARY:
msg = "No replica set primary available for query"
elif mode == ReadPreference.SECONDARY:
msg = "No replica set secondary available for query"
else:
msg = "No replica set members available for query"
msg += " with ReadPreference %s" % modes[mode]
if tag_sets != [{}]:
msg += " and tags " + repr(tag_sets)
raise AutoReconnect(msg, errors)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. For
:class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
ensures consistent reads, even if you read after an unsafe write. For
read preferences other than PRIMARY, there are no consistency
guarantees.
:class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
auto_start_request=True ensures consistent reads, even if you read
after an unsafe write. For read preferences other than PRIMARY, there
are no consistency guarantees.
In Python 2.6 and above, or in Python 2.5 with
"from __future__ import with_statement", :meth:`start_request` can be
@ -983,9 +1179,8 @@ class ReplicaSetConnection(common.BaseObject):
The :class:`~pymongo.pool.Request` return value.
:meth:`start_request` previously returned None
"""
for mongo in self.__pools.values():
if 'pool' in mongo:
mongo['pool'].start_request()
for member in self.__members.values():
member.pool.start_request()
self.__in_request = True
return pool.Request(self)
@ -1011,11 +1206,11 @@ class ReplicaSetConnection(common.BaseObject):
in the middle of a sequence of operations in which ordering is
important. This could lead to unexpected results.
"""
for mongo in self.__pools.values():
if 'pool' in mongo:
mongo['pool'].end_request()
for member in self.__members.values():
member.pool.end_request()
self.__in_request = False
self.__unpin_host()
def __eq__(self, other):
# XXX: Implement this?

View File

@ -120,35 +120,53 @@ def start_replica_set(members, fresh=True):
expected_arbiters += 1
expected_secondaries = len(members) - expected_arbiters - 1
while True:
# Wait for 5 minutes for replica set to come up
patience = 5
for _ in range(patience * 60 / 2):
time.sleep(2)
try:
if (len(get_primary()) == 1 and
if (get_primary() and
len(get_secondaries()) == expected_secondaries and
len(get_arbiters()) == expected_arbiters):
break
except pymongo.errors.AutoReconnect:
# Keep waiting
pass
else:
kill_all_members()
raise Exception(
"Replica set still not initalized after %s minutes" % patience)
return primary, set_name
# Connect to a random member
def get_connection():
return pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
def get_members_in_state(state):
c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
status = c.admin.command('replSetGetStatus')
status = get_connection().admin.command('replSetGetStatus')
members = status['members']
return [k['name'] for k in members if k['state'] == state]
def get_primary():
return get_members_in_state(1)
try:
primaries = get_members_in_state(1)
assert len(primaries) <= 1
if primaries:
return primaries[0]
except pymongo.errors.AutoReconnect:
pass
return None
def get_random_secondary():
secondaries = get_members_in_state(2)
if len(secondaries):
return [secondaries[random.randrange(0, len(secondaries))]]
return secondaries
return random.choice(secondaries)
return None
def get_secondaries():
@ -160,13 +178,11 @@ def get_arbiters():
def get_passives():
c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
return c.admin.command('ismaster').get('passives', [])
return get_connection().admin.command('ismaster').get('passives', [])
def get_hosts():
c = pymongo.Connection(nodes.keys(), slave_okay=True, use_greenlets=use_greenlets)
return c.admin.command('ismaster').get('hosts', [])
return get_connection().admin.command('ismaster').get('hosts', [])
def get_hidden_members():
@ -182,15 +198,24 @@ def get_hidden_members():
return secondaries
def get_tags(member):
config = get_connection().local.system.replset.find_one()
for m in config['members']:
if m['host'] == member:
return m.get('tags', {})
raise Exception('member %s not in config' % repr(member))
def kill_primary(sig=2):
primary = get_primary()
kill_members(primary, sig)
kill_members([primary], sig)
return primary
def kill_secondary(sig=2):
secondary = get_random_secondary()
kill_members(secondary, sig)
kill_members([secondary], sig)
return secondary

View File

@ -21,98 +21,84 @@ import replset_tools
from replset_tools import use_greenlets
from pymongo import (ReplicaSetConnection,
from pymongo import (replica_set_connection,
ReplicaSetConnection,
ReadPreference)
from pymongo.replica_set_connection import Member, Monitor
from pymongo.connection import Connection, _partition_node
from pymongo.errors import AutoReconnect, ConnectionFailure
from test import utils
class TestReadPreference(unittest.TestCase):
# Override default 30-second interval for faster testing
Monitor._refresh_interval = MONITOR_INTERVAL = 0.5
class TestSecondaryConnection(unittest.TestCase):
def setUp(self):
members = [{}, {}, {'arbiterOnly': True}]
res = replset_tools.start_replica_set(members)
self.seed, self.name = res
def test_read_preference(self):
c = ReplicaSetConnection(
def test_secondary_connection(self):
self.c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
self.assertTrue(bool(len(c.secondaries)))
db = c.pymongo_test
db.test.remove({}, safe=True, w=len(c.secondaries))
self.assertTrue(bool(len(self.c.secondaries)))
db = self.c.pymongo_test
db.test.remove({}, safe=True, w=len(self.c.secondaries))
# Force replication...
w = len(c.secondaries) + 1
w = len(self.c.secondaries) + 1
db.test.insert({'foo': 'bar'}, safe=True, w=w)
# Test direct connection to a secondary
host, port = replset_tools.get_secondaries()[0].split(':')
port = int(port)
conn = Connection(
host, port, slave_okay=True, use_greenlets=use_greenlets)
self.assertEqual(host, conn.host)
self.assertEqual(port, conn.port)
self.assert_(conn.pymongo_test.test.find_one())
conn = Connection(
host, port,
read_preference=ReadPreference.SECONDARY,
use_greenlets=use_greenlets)
self.assertEqual(host, conn.host)
self.assertEqual(port, conn.port)
self.assert_(conn.pymongo_test.test.find_one())
# Test direct connection to a primary or secondary
primary_host, primary_port = replset_tools.get_primary().split(':')
primary_port = int(primary_port)
secondary_host, secondary_port = replset_tools.get_secondaries()[0].split(':')
secondary_port = int(secondary_port)
self.assertTrue(Connection(
primary_host, primary_port, use_greenlets=use_greenlets).is_primary)
self.assertTrue(Connection(
primary_host, primary_port, use_greenlets=use_greenlets,
read_preference=ReadPreference.PRIMARY_PREFERRED).is_primary)
self.assertTrue(Connection(
primary_host, primary_port, use_greenlets=use_greenlets,
read_preference=ReadPreference.SECONDARY_PREFERRED).is_primary)
self.assertTrue(Connection(
primary_host, primary_port, use_greenlets=use_greenlets,
read_preference=ReadPreference.NEAREST).is_primary)
self.assertTrue(Connection(
primary_host, primary_port, use_greenlets=use_greenlets,
read_preference=ReadPreference.SECONDARY).is_primary)
for kwargs in [
{'read_preference': ReadPreference.PRIMARY_PREFERRED},
{'read_preference': ReadPreference.SECONDARY},
{'read_preference': ReadPreference.SECONDARY_PREFERRED},
{'read_preference': ReadPreference.NEAREST},
{'slave_okay': True},
]:
conn = Connection(
secondary_host, secondary_port, use_greenlets=use_greenlets, **kwargs)
self.assertEqual(secondary_host, conn.host)
self.assertEqual(secondary_port, conn.port)
self.assertFalse(conn.is_primary)
self.assert_(conn.pymongo_test.test.find_one())
# Test direct connection to an arbiter
host = replset_tools.get_arbiters()[0]
secondary_host = replset_tools.get_arbiters()[0]
self.assertRaises(
ConnectionFailure, Connection, host, use_greenlets=use_greenlets)
# Test PRIMARY
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertEqual(cursor._Cursor__connection_id, c.primary)
# Test SECONDARY with a secondary
db.read_preference = ReadPreference.SECONDARY
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
# Test SECONDARY_ONLY with a secondary
db.read_preference = ReadPreference.SECONDARY_ONLY
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
# Test SECONDARY with no secondary
killed = replset_tools.kill_all_secondaries()
sleep(5) # Let monitor thread notice change
self.assertTrue(bool(len(killed)))
db.read_preference = ReadPreference.SECONDARY
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertEqual(cursor._Cursor__connection_id, c.primary)
# Test SECONDARY_ONLY with no secondary
db.read_preference = ReadPreference.SECONDARY_ONLY
for _ in xrange(10):
cursor = db.test.find()
self.assertRaises(AutoReconnect, cursor.next)
replset_tools.restart_members(killed)
# Test PRIMARY with no primary (should raise an exception)
db.read_preference = ReadPreference.PRIMARY
cursor = db.test.find()
cursor.next()
self.assertEqual(cursor._Cursor__connection_id, c.primary)
killed = replset_tools.kill_primary()
self.assertTrue(bool(len(killed)))
self.assertRaises(AutoReconnect, db.test.find_one)
ConnectionFailure, Connection, secondary_host, use_greenlets=use_greenlets)
def tearDown(self):
self.c.close()
replset_tools.kill_all_members()
@ -125,36 +111,43 @@ class TestPassiveAndHidden(unittest.TestCase):
self.seed, self.name = res
def test_passive_and_hidden(self):
c = ReplicaSetConnection(
self.c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
db = c.pymongo_test
db.test.remove({}, safe=True, w=len(c.secondaries))
w = len(c.secondaries) + 1
db = self.c.pymongo_test
w = len(self.c.secondaries) + 1
db.test.remove({}, safe=True, w=w)
db.test.insert({'foo': 'bar'}, safe=True, w=w)
db.read_preference = ReadPreference.SECONDARY
passives = replset_tools.get_passives()
passives = [_partition_node(member) for member in passives]
hidden = replset_tools.get_hidden_members()
hidden = [_partition_node(member) for member in hidden]
self.assertEqual(c.secondaries, set(passives))
self.assertEqual(self.c.secondaries, set(passives))
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertTrue(cursor._Cursor__connection_id not in hidden)
for mode in (
ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED
):
db.read_preference = mode
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertTrue(cursor._Cursor__connection_id in passives)
self.assertTrue(cursor._Cursor__connection_id not in hidden)
replset_tools.kill_members(replset_tools.get_passives(), 2)
sleep(5) # Let monitor thread notice change
sleep(2 * MONITOR_INTERVAL)
db.read_preference = ReadPreference.SECONDARY_PREFERRED
for _ in xrange(10):
cursor = db.test.find()
cursor.next()
self.assertEqual(cursor._Cursor__connection_id, c.primary)
self.assertEqual(cursor._Cursor__connection_id, self.c.primary)
def tearDown(self):
self.c.close()
replset_tools.kill_all_members()
class TestHealthMonitor(unittest.TestCase):
def setUp(self):
@ -168,18 +161,18 @@ class TestHealthMonitor(unittest.TestCase):
primary = c.primary
secondaries = c.secondaries
# Wait for new primary to be elected
def primary_changed():
for _ in xrange(30):
if c.primary != primary:
if c.primary and c.primary != primary:
return True
sleep(1)
return False
killed = replset_tools.kill_primary()
sleep(5) # Let monitor thread notice change
self.assertTrue(bool(len(killed)))
self.assertTrue(primary_changed())
self.assertTrue(secondaries != c.secondaries)
self.assertNotEqual(secondaries, c.secondaries)
def test_secondary_failure(self):
c = ReplicaSetConnection(
@ -197,13 +190,13 @@ class TestHealthMonitor(unittest.TestCase):
return False
killed = replset_tools.kill_secondary()
sleep(5) # Let monitor thread notice change
sleep(2 * MONITOR_INTERVAL)
self.assertTrue(bool(len(killed)))
self.assertEqual(primary, c.primary)
self.assertTrue(readers_changed())
secondaries = c.secondaries
replset_tools.restart_members(killed)
replset_tools.restart_members([killed])
self.assertEqual(primary, c.primary)
self.assertTrue(readers_changed())
@ -212,7 +205,7 @@ class TestHealthMonitor(unittest.TestCase):
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
self.assertTrue(bool(len(c.secondaries)))
primary = c.primary
secondaries = c.secondaries
secondaries = c.secondaries.copy()
def primary_changed():
for _ in xrange(30):
@ -223,7 +216,11 @@ class TestHealthMonitor(unittest.TestCase):
replset_tools.stepdown_primary()
self.assertTrue(primary_changed())
self.assertTrue(secondaries != c.secondaries)
# There can be a delay between finding the primary and updating
# secondaries
sleep(5)
self.assertNotEqual(secondaries, c.secondaries)
def tearDown(self):
replset_tools.kill_all_members()
@ -272,7 +269,8 @@ class TestReadWithFailover(unittest.TestCase):
def test_read_with_failover(self):
c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
self.seed, replicaSet=self.name, use_greenlets=use_greenlets,
auto_start_request=False)
self.assertTrue(bool(len(c.secondaries)))
def iter_cursor(cursor):
@ -284,15 +282,15 @@ class TestReadWithFailover(unittest.TestCase):
w = len(c.secondaries) + 1
db.test.remove({}, safe=True, w=w)
# Force replication
db.test.insert([{'foo': i} for i in xrange(10)],
safe=True, w=w)
db.test.insert([{'foo': i} for i in xrange(10)], safe=True, w=w)
self.assertEqual(10, db.test.count())
db.read_preference = ReadPreference.SECONDARY
db.read_preference = ReadPreference.SECONDARY_PREFERRED
cursor = db.test.find().batch_size(5)
cursor.next()
self.assertEqual(5, cursor._Cursor__retrieved)
killed = replset_tools.kill_primary()
self.assertTrue(cursor._Cursor__connection_id in c.secondaries)
replset_tools.kill_primary()
# Primary failure shouldn't interrupt the cursor
self.assertTrue(iter_cursor(cursor))
self.assertEqual(10, cursor._Cursor__retrieved)
@ -300,16 +298,316 @@ class TestReadWithFailover(unittest.TestCase):
def tearDown(self):
replset_tools.kill_all_members()
class TestReadPreference(unittest.TestCase):
def setUp(self):
members = [
# primary
{'tags': {'dc': 'ny', 'name': 'primary'}},
# secondary
{'tags': {'dc': 'la', 'name': 'secondary'}, 'priority': 0},
# other_secondary
{'tags': {'dc': 'ny', 'name': 'other_secondary'}, 'priority': 0},
]
res = replset_tools.start_replica_set(members)
self.seed, self.name = res
primary = replset_tools.get_primary()
self.primary = _partition_node(primary)
self.primary_tags = replset_tools.get_tags(primary)
# Make sure priority worked
self.assertEqual('primary', self.primary_tags['name'])
self.primary_dc = {'dc': self.primary_tags['dc']}
secondaries = replset_tools.get_secondaries()
(secondary, ) = [
s for s in secondaries
if replset_tools.get_tags(s)['name'] == 'secondary']
self.secondary = _partition_node(secondary)
self.secondary_tags = replset_tools.get_tags(secondary)
self.secondary_dc = {'dc': self.secondary_tags['dc']}
(other_secondary, ) = [
s for s in secondaries
if replset_tools.get_tags(s)['name'] == 'other_secondary']
self.other_secondary = _partition_node(other_secondary)
self.other_secondary_tags = replset_tools.get_tags(other_secondary)
self.other_secondary_dc = {'dc': self.other_secondary_tags['dc']}
self.c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets)
self.db = self.c.pymongo_test
self.w = len(self.c.secondaries) + 1
self.db.test.remove({}, safe=True, w=self.w)
self.db.test.insert(
[{'foo': i} for i in xrange(10)], safe=True, w=self.w)
self.clear_ping_times()
def set_ping_time(self, host, ping_time_seconds):
Member._host_to_ping_time[host] = ping_time_seconds
def clear_ping_times(self):
Member._host_to_ping_time.clear()
def test_read_preference(self):
# This is long, but we put all the tests in one function to save time
# on setUp, which takes about 30 seconds to bring up a replica set.
# We pass through four states:
#
# 1. A primary and two secondaries
# 2. Primary down
# 3. Primary up, one secondary down
# 4. Primary up, all secondaries down
#
# For each state, we verify the behavior of PRIMARY,
# PRIMARY_PREFERRED, SECONDARY, SECONDARY_PREFERRED, and NEAREST
c = ReplicaSetConnection(
self.seed, replicaSet=self.name, use_greenlets=use_greenlets,
auto_start_request=False)
def assertReadFrom(member, *args, **kwargs):
utils.assertReadFrom(self, c, member, *args, **kwargs)
def assertReadFromAll(members, *args, **kwargs):
utils.assertReadFromAll(self, c, members, *args, **kwargs)
def unpartition_node(node):
host, port = node
return '%s:%s' % (host, port)
# To make the code terser, copy modes and hosts into local scope
PRIMARY = ReadPreference.PRIMARY
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
SECONDARY = ReadPreference.SECONDARY
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
NEAREST = ReadPreference.NEAREST
primary = self.primary
secondary = self.secondary
other_secondary = self.other_secondary
bad_tag = {'bad': 'tag'}
# 1. THREE MEMBERS UP -------------------------------------------------
# PRIMARY
assertReadFrom(primary, PRIMARY)
# PRIMARY_PREFERRED
# Trivial: mode and tags both match
assertReadFrom(primary, PRIMARY_PREFERRED, self.primary_dc)
# Secondary matches but not primary, choose primary
assertReadFrom(primary, PRIMARY_PREFERRED, self.secondary_dc)
# Chooses primary, ignoring tag sets
assertReadFrom(primary, PRIMARY_PREFERRED, self.primary_dc)
# Chooses primary, ignoring tag sets
assertReadFrom(primary, PRIMARY_PREFERRED, bad_tag)
assertReadFrom(primary, PRIMARY_PREFERRED, [bad_tag, {}])
# SECONDARY
assertReadFromAll([secondary, other_secondary], SECONDARY)
# SECONDARY_PREFERRED
assertReadFromAll([secondary, other_secondary], SECONDARY_PREFERRED)
# Multiple tags
assertReadFrom(secondary, SECONDARY_PREFERRED, self.secondary_tags)
# Fall back to primary if it's the only one matching the tags
assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'primary'})
# No matching secondaries
assertReadFrom(primary, SECONDARY_PREFERRED, bad_tag)
# Fall back from non-matching tag set to matching set
assertReadFromAll([secondary, other_secondary],
SECONDARY_PREFERRED, [bad_tag, {}])
assertReadFrom(other_secondary,
SECONDARY_PREFERRED, [bad_tag, {'dc': 'ny'}])
# NEAREST
self.clear_ping_times()
assertReadFromAll([primary, secondary, other_secondary], NEAREST)
assertReadFromAll([primary, other_secondary],
NEAREST, [bad_tag, {'dc': 'ny'}])
self.set_ping_time(primary, 0)
self.set_ping_time(secondary, .03) # 30 ms
self.set_ping_time(other_secondary, 10)
# Nearest member, no tags
assertReadFrom(primary, NEAREST)
# Tags override nearness
assertReadFrom(primary, NEAREST, {'name': 'primary'})
assertReadFrom(secondary, NEAREST, self.secondary_dc)
# Make secondary fast
self.set_ping_time(primary, .03) # 30 ms
self.set_ping_time(secondary, 0)
assertReadFrom(secondary, NEAREST)
# Other secondary fast
self.set_ping_time(secondary, 10)
self.set_ping_time(other_secondary, 0)
assertReadFrom(other_secondary, NEAREST)
# High secondaryAcceptableLatencyMS, should read from all members
assertReadFromAll(
[primary, secondary, other_secondary],
NEAREST, secondary_acceptable_latency_ms=1000*1000)
self.clear_ping_times()
assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}])
# 2. PRIMARY DOWN -----------------------------------------------------
killed = replset_tools.kill_primary()
# Let monitor notice primary's gone
sleep(2 * MONITOR_INTERVAL)
# PRIMARY
assertReadFrom(None, PRIMARY)
# PRIMARY_PREFERRED
# No primary, choose matching secondary
assertReadFromAll([secondary, other_secondary], PRIMARY_PREFERRED)
assertReadFrom(secondary, PRIMARY_PREFERRED, {'name': 'secondary'})
# No primary or matching secondary
assertReadFrom(None, PRIMARY_PREFERRED, bad_tag)
# SECONDARY
assertReadFromAll([secondary, other_secondary], SECONDARY)
# Only primary matches
assertReadFrom(None, SECONDARY, {'name': 'primary'})
# No matching secondaries
assertReadFrom(None, SECONDARY, bad_tag)
# SECONDARY_PREFERRED
assertReadFromAll([secondary, other_secondary], SECONDARY_PREFERRED)
# Mode and tags both match
assertReadFrom(secondary, SECONDARY_PREFERRED, {'name': 'secondary'})
# NEAREST
self.clear_ping_times()
assertReadFromAll([secondary, other_secondary], NEAREST)
# 3. PRIMARY UP, ONE SECONDARY DOWN -----------------------------------
replset_tools.restart_members([killed])
for _ in range(30):
if replset_tools.get_primary():
break
sleep(1)
else:
self.fail("Primary didn't come back up")
replset_tools.kill_members([unpartition_node(secondary)], 2)
self.assertTrue(Connection(
unpartition_node(primary), use_greenlets=use_greenlets,
slave_okay=True
).admin.command('ismaster')['ismaster'])
sleep(2 * MONITOR_INTERVAL)
# PRIMARY
assertReadFrom(primary, PRIMARY)
# PRIMARY_PREFERRED
assertReadFrom(primary, PRIMARY_PREFERRED)
# SECONDARY
assertReadFrom(other_secondary, SECONDARY)
assertReadFrom(other_secondary, SECONDARY, self.other_secondary_dc)
# Only the down secondary matches
assertReadFrom(None, SECONDARY, {'name': 'secondary'})
# SECONDARY_PREFERRED
assertReadFrom(other_secondary, SECONDARY_PREFERRED)
assertReadFrom(
other_secondary, SECONDARY_PREFERRED, self.other_secondary_dc)
# The secondary matching the tag is down, use primary
assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'secondary'})
# NEAREST
assertReadFromAll([primary, other_secondary], NEAREST)
assertReadFrom(other_secondary, NEAREST, {'name': 'other_secondary'})
assertReadFrom(primary, NEAREST, {'name': 'primary'})
# 4. PRIMARY UP, ALL SECONDARIES DOWN ---------------------------------
replset_tools.kill_members([unpartition_node(other_secondary)], 2)
self.assertTrue(Connection(
unpartition_node(primary), use_greenlets=use_greenlets,
slave_okay=True
).admin.command('ismaster')['ismaster'])
# PRIMARY
assertReadFrom(primary, PRIMARY)
# PRIMARY_PREFERRED
assertReadFrom(primary, PRIMARY_PREFERRED)
assertReadFrom(primary, PRIMARY_PREFERRED, self.secondary_dc)
# SECONDARY
assertReadFrom(None, SECONDARY)
assertReadFrom(None, SECONDARY, self.other_secondary_dc)
assertReadFrom(None, SECONDARY, {'dc': 'ny'})
# SECONDARY_PREFERRED
assertReadFrom(primary, SECONDARY_PREFERRED)
assertReadFrom(primary, SECONDARY_PREFERRED, self.secondary_dc)
assertReadFrom(primary, SECONDARY_PREFERRED, {'name': 'secondary'})
assertReadFrom(primary, SECONDARY_PREFERRED, {'dc': 'ny'})
# NEAREST
assertReadFrom(primary, NEAREST)
assertReadFrom(None, NEAREST, self.secondary_dc)
assertReadFrom(None, NEAREST, {'name': 'secondary'})
# Even if primary's slow, still read from it
self.set_ping_time(primary, 100)
assertReadFrom(primary, NEAREST)
assertReadFrom(None, NEAREST, self.secondary_dc)
self.clear_ping_times()
def tearDown(self):
self.c.close()
replset_tools.kill_all_members()
self.clear_ping_times()
if __name__ == '__main__':
if use_greenlets:
print('Using Gevent')
import gevent
print('gevent version %s' % gevent.__version__)
if gevent.__version__ == '0.13.6':
if gevent.__version__ >= '0.13.6':
print('method %s' % gevent.core.get_method())
else:
print(gevent.hub.get_hub())
from gevent import monkey
monkey.patch_socket()
sleep = gevent.sleep

View File

@ -16,6 +16,11 @@
"""Tests for the gridfs package.
"""
from gridfs.grid_file import GridIn
from pymongo.connection import Connection
from pymongo.errors import AutoReconnect
from pymongo.read_preferences import ReadPreference
from test.test_replica_set_connection import TestConnectionReplicaSetBase
try:
from io import BytesIO as StringIO
@ -341,5 +346,46 @@ class TestGridfs(unittest.TestCase):
)
class TestGridfsReplicaSet(TestConnectionReplicaSetBase):
def test_gridfs_replica_set(self):
rsc = self._get_connection(
w=self.w, wtimeout=5000,
read_preference=ReadPreference.SECONDARY)
try:
fs = gridfs.GridFS(rsc.pymongo_test)
oid = fs.put(b('foo'))
content = fs.get(oid).read()
self.assertEqual(b('foo'), content)
finally:
rsc.close()
def test_gridfs_secondary(self):
primary_host, primary_port = self.primary
primary_connection = Connection(primary_host, primary_port)
secondary_host, secondary_port = self.secondaries[0]
for secondary_connection in [
Connection(secondary_host, secondary_port, slave_okay=True),
Connection(secondary_host, secondary_port,
read_preference=ReadPreference.SECONDARY),
]:
primary_connection.pymongo_test.drop_collection("fs.files")
primary_connection.pymongo_test.drop_collection("fs.chunks")
# Should detect it's connected to secondary and not attempt to
# create index
fs = gridfs.GridFS(secondary_connection.pymongo_test)
# This won't detect secondary, raises error
self.assertRaises(AutoReconnect, fs.put, b('foo'))
def tearDown(self):
rsc = self._get_connection()
rsc.pymongo_test.drop_collection('fs.files')
rsc.pymongo_test.drop_collection('fs.chunks')
rsc.close()
if __name__ == "__main__":
unittest.main()

View File

@ -62,6 +62,16 @@ class TestMasterSlaveConnection(unittest.TestCase):
self.connection = MasterSlaveConnection(self.master, self.slaves)
self.db = self.connection.pymongo_test
def tearDown(self):
try:
self.db.test.drop_indexes()
except Exception:
# Tests like test_disconnect can monkey with the connection in ways
# that make this fail
pass
super(TestMasterSlaveConnection, self).tearDown()
def test_types(self):
self.assertRaises(TypeError, MasterSlaveConnection, 1)
self.assertRaises(TypeError, MasterSlaveConnection, self.master, 1)

View File

@ -14,6 +14,7 @@
"""Test the replica_set_connection module."""
import copy
import datetime
import os
import signal
@ -23,14 +24,15 @@ import time
import thread
import traceback
import unittest
sys.path[0:0] = [""]
from nose.plugins.skip import SkipTest
from bson.son import SON
from bson.tz_util import utc
from pymongo import ReadPreference
from pymongo.connection import Connection
from pymongo.read_preferences import ReadPreference
from pymongo.replica_set_connection import ReplicaSetConnection
from pymongo.replica_set_connection import _partition_node
from pymongo.database import Database
@ -40,7 +42,7 @@ from pymongo.errors import (AutoReconnect,
InvalidName,
OperationFailure)
from test import version
from test.utils import delay
from test.utils import delay, assertReadFrom, assertReadFromAll, read_from_which_host
host = os.environ.get("DB_IP", 'localhost')
@ -82,6 +84,10 @@ class TestConnectionReplicaSetBase(unittest.TestCase):
][0]
self.primary = _partition_node(primary_info['name'])
self.secondaries = [
_partition_node(m['name']) for m in repl_set_status['members']
if m['stateStr'] == 'SECONDARY'
]
else:
raise SkipTest()
@ -113,28 +119,65 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertEqual(c.primary, self.primary)
self.assertEqual(c.hosts, self.hosts)
self.assertEqual(c.arbiters, self.arbiters)
self.assertEqual(c.read_preference, ReadPreference.PRIMARY)
self.assertEqual(c.max_pool_size, 10)
self.assertEqual(c.document_class, dict)
self.assertEqual(c.tz_aware, False)
self.assertEqual(c.slave_okay, False)
self.assertEqual(c.safe, False)
# Make sure RSC's properties are copied to Database and Collection
for obj in c, c.pymongo_test, c.pymongo_test.test:
self.assertEqual(obj.read_preference, ReadPreference.PRIMARY)
self.assertEqual(obj.tag_sets, [{}])
self.assertEqual(obj.secondary_acceptable_latency_ms, 15)
self.assertEqual(obj.slave_okay, False)
self.assertEqual(obj.safe, False)
cursor = c.pymongo_test.test.find()
self.assertEqual(
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
self.assertEqual([{}], cursor._Cursor__tag_sets)
self.assertEqual(15, cursor._Cursor__secondary_acceptable_latency_ms)
self.assertEqual(False, cursor._Cursor__slave_okay)
c.close()
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
c = ReplicaSetConnection(pair, replicaSet=self.name, max_pool_size=25,
document_class=SON, tz_aware=True,
slaveOk=False, safe=True,
read_preference=ReadPreference.SECONDARY)
read_preference=ReadPreference.SECONDARY,
tag_sets=copy.deepcopy(tag_sets),
secondary_acceptable_latency_ms=77)
c.admin.command('ping')
self.assertEqual(c.primary, self.primary)
self.assertEqual(c.hosts, self.hosts)
self.assertEqual(c.arbiters, self.arbiters)
self.assertEqual(c.read_preference, ReadPreference.SECONDARY)
self.assertEqual(c.max_pool_size, 25)
self.assertEqual(c.document_class, SON)
self.assertEqual(c.tz_aware, True)
self.assertEqual(c.slave_okay, False)
self.assertEqual(c.safe, True)
for obj in c, c.pymongo_test, c.pymongo_test.test:
self.assertEqual(obj.read_preference, ReadPreference.SECONDARY)
self.assertEqual(obj.tag_sets, tag_sets)
self.assertEqual(obj.secondary_acceptable_latency_ms, 77)
self.assertEqual(obj.slave_okay, False)
self.assertEqual(obj.safe, True)
cursor = c.pymongo_test.test.find()
self.assertEqual(
ReadPreference.SECONDARY, cursor._Cursor__read_preference)
self.assertEqual(tag_sets, cursor._Cursor__tag_sets)
self.assertEqual(77, cursor._Cursor__secondary_acceptable_latency_ms)
self.assertEqual(False, cursor._Cursor__slave_okay)
cursor = c.pymongo_test.test.find(
read_preference=ReadPreference.NEAREST,
tag_sets=[{'dc':'ny'}, {}],
secondary_acceptable_latency_ms=123)
self.assertEqual(
ReadPreference.NEAREST, cursor._Cursor__read_preference)
self.assertEqual([{'dc':'ny'}, {}], cursor._Cursor__tag_sets)
self.assertEqual(123, cursor._Cursor__secondary_acceptable_latency_ms)
self.assertEqual(False, cursor._Cursor__slave_okay)
if version.at_least(c, (1, 7, 4)):
self.assertEqual(c.max_bson_size, 16777216)
@ -315,17 +358,17 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertRaises(TypeError, iterate)
connection.close()
def test_close(self):
def test_disconnect(self):
c = self._get_connection()
coll = c.foo.bar
c.close()
c.close()
c.disconnect()
c.disconnect()
coll.count()
c.close()
c.close()
c.disconnect()
c.disconnect()
coll.count()
@ -614,7 +657,8 @@ class TestConnection(TestConnectionReplicaSetBase):
# auto_start_request should default to True
conn = self._get_connection()
pools = [mongo['pool'] for mongo in conn._ReplicaSetConnection__pools.values()]
pools = [mongo.pool for mongo in
conn._ReplicaSetConnection__members.values()]
self.assertTrue(conn.auto_start_request)
self.assertTrue(conn.in_request())
@ -630,6 +674,7 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertFalse(pool.in_request())
conn.start_request()
self.assertTrue(conn.in_request())
conn.close()
conn = self._get_connection(auto_start_request=False)
self.assertFalse(conn.in_request())
@ -637,6 +682,72 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertTrue(conn.in_request())
conn.end_request()
self.assertFalse(conn.in_request())
conn.close()
def test_schedule_refresh(self):
# Monitor thread starts waiting for _refresh_interval, 30 seconds
conn = self._get_connection()
# Reconnect if necessary
conn.pymongo_test.test.find_one()
secondaries = conn.secondaries
for secondary in secondaries:
conn._ReplicaSetConnection__members[secondary].up = False
conn._ReplicaSetConnection__members[conn.primary].up = False
# Wake up monitor thread
conn._ReplicaSetConnection__schedule_refresh()
# Refresh interval is 30 seconds; scheduling a refresh tells the
# monitor thread / greenlet to start a refresh now. We still need to
# sleep a few seconds for it to complete.
time.sleep(5)
for secondary in secondaries:
self.assertTrue(conn._ReplicaSetConnection__members[secondary].up,
"ReplicaSetConnection didn't detect secondary is up")
self.assertTrue(conn._ReplicaSetConnection__members[conn.primary].up,
"ReplicaSetConnection didn't detect primary is up")
conn.close()
def test_pinned_member(self):
conn = self._get_connection(
auto_start_request=False, secondary_acceptable_latency_ms=1000*1000)
host = read_from_which_host(conn, ReadPreference.SECONDARY)
self.assertTrue(host in conn.secondaries)
# No pinning since we're not in a request
assertReadFromAll(
self, conn, conn.secondaries, ReadPreference.SECONDARY)
assertReadFromAll(
self, conn, list(conn.secondaries) + [conn.primary],
ReadPreference.NEAREST)
conn.start_request()
host = read_from_which_host(conn, ReadPreference.SECONDARY)
self.assertTrue(host in conn.secondaries)
assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
# Repin
primary = read_from_which_host(conn, ReadPreference.PRIMARY)
self.assertEqual(conn.primary, primary)
assertReadFrom(self, conn, primary, ReadPreference.NEAREST)
# Repin again
host = read_from_which_host(conn, ReadPreference.SECONDARY)
self.assertTrue(host in conn.secondaries)
assertReadFrom(self, conn, host, ReadPreference.SECONDARY)
# Unpin
conn.end_request()
assertReadFromAll(
self, conn, list(conn.secondaries) + [conn.primary],
ReadPreference.NEAREST)
if __name__ == "__main__":

View File

@ -25,9 +25,7 @@ from test.test_connection import get_connection
from pymongo.connection import Connection
from pymongo.replica_set_connection import ReplicaSetConnection
from pymongo.pool import SocketInfo, _closed
from pymongo.errors import (AutoReconnect,
OperationFailure,
DuplicateKeyError)
from pymongo.errors import AutoReconnect, OperationFailure
def get_pool(connection):
@ -35,8 +33,8 @@ def get_pool(connection):
return connection._Connection__pool
elif isinstance(connection, ReplicaSetConnection):
writer = connection._ReplicaSetConnection__writer
pools = connection._ReplicaSetConnection__pools
return pools[writer]['pool']
pools = connection._ReplicaSetConnection__members
return pools[writer].pool
else:
raise TypeError(str(connection))
@ -447,5 +445,6 @@ class TestThreads(BaseTestThreads, unittest.TestCase):
class TestThreadsAuth(BaseTestThreadsAuth, unittest.TestCase):
pass
if __name__ == "__main__":
unittest.main()

View File

@ -15,18 +15,13 @@
"""Test that pymongo is thread safe."""
import unittest
import os
from nose.plugins.skip import SkipTest
from pymongo.replica_set_connection import ReplicaSetConnection
from test.test_threads import (IgnoreAutoReconnect,
BaseTestThreads,
BaseTestThreadsAuth)
from test.test_threads import BaseTestThreads, BaseTestThreadsAuth
from test.test_replica_set_connection import (TestConnectionReplicaSetBase,
pair)
from pymongo.errors import AutoReconnect
class TestThreadsReplicaSet(TestConnectionReplicaSetBase, BaseTestThreads):
def setUp(self):
@ -70,6 +65,7 @@ class TestThreadsAuthReplicaSet(TestConnectionReplicaSetBase, BaseTestThreadsAut
"""
return ReplicaSetConnection(pair, replicaSet=self.name)
if __name__ == "__main__":
suite = unittest.TestSuite([
unittest.makeSuite(TestThreadsReplicaSet),

View File

@ -15,6 +15,9 @@
"""Utilities for testing pymongo
"""
from pymongo.errors import AutoReconnect
def delay(sec):
# Javascript sleep() only available in MongoDB since version ~1.9
return '''function() {
@ -48,3 +51,73 @@ def joinall(threads):
def is_mongos(conn):
res = conn.admin.command('ismaster')
return res.get('msg', '') == 'isdbgrid'
def read_from_which_host(
rsc,
mode,
tag_sets=None,
secondary_acceptable_latency_ms=15
):
"""Read from a ReplicaSetConnection with the given Read Preference mode,
tags, and acceptable latency. Return the 'host:port' which was read from.
:Parameters:
- `rsc`: A ReplicaSetConnection
- `mode`: A ReadPreference
- `tag_sets`: List of dicts of tags for data-center-aware reads
- `secondary_acceptable_latency_ms`: a float
"""
db = rsc.pymongo_test
db.read_preference = mode
if isinstance(tag_sets, dict):
tag_sets = [tag_sets]
db.tag_sets = tag_sets or [{}]
db.secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
cursor = db.test.find()
try:
try:
cursor.next()
except StopIteration:
# No documents in collection, that's fine
pass
return cursor._Cursor__connection_id
except AutoReconnect:
return None
def assertReadFrom(testcase, rsc, member, *args, **kwargs):
"""Check that a query with the given mode, tag_sets, and
secondary_acceptable_latency_ms reads from the expected replica-set
member
:Parameters:
- `testcase`: A unittest.TestCase
- `rsc`: A ReplicaSetConnection
- `member`: replica_set_connection.Member expected to be used
- `mode`: A ReadPreference
- `tag_sets`: List of dicts of tags for data-center-aware reads
- `secondary_acceptable_latency_ms`: a float
"""
for _ in range(10):
testcase.assertEqual(member, read_from_which_host(rsc, *args, **kwargs))
def assertReadFromAll(testcase, rsc, members, *args, **kwargs):
"""Check that a query with the given mode, tag_sets, and
secondary_acceptable_latency_ms can read from any of a set of
replica-set members.
:Parameters:
- `testcase`: A unittest.TestCase
- `rsc`: A ReplicaSetConnection
- `members`: Sequence of replica_set_connection.Member expected to be used
- `mode`: A ReadPreference
- `tag_sets`: List of dicts of tags for data-center-aware reads
- `secondary_acceptable_latency_ms`: a float
"""
members = set(members)
used = set()
for _ in range(100):
used.add(read_from_which_host(rsc, *args, **kwargs))
testcase.assertEqual(members, used)