PYTHON-1332 - Gossip $clusterTime

This commit is contained in:
A. Jesse Jiryu Davis 2017-10-05 16:03:35 -04:00
parent f0b847adb8
commit df018e88e2
22 changed files with 459 additions and 160 deletions

View File

@ -388,6 +388,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
struct module_state *state = GETSTATE(self);
int request_id = rand();
PyObject* cluster_time = NULL;
unsigned int flags;
char* collection_name = NULL;
int collection_name_length;
@ -429,6 +430,34 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
PyErr_NoMemory();
return NULL;
}
/* Pop $clusterTime from dict and write it at the end, avoiding an error
* from the $-prefix and check_keys.
*
* If "dict" is a defaultdict we don't want to call PyMapping_GetItemString
* on it. That would **create** an _id where one didn't previously exist
* (PYTHON-871).
*/
if (PyDict_Check(query)) {
cluster_time = PyDict_GetItemString(query, "$clusterTime");
if (cluster_time) {
/* PyDict_GetItemString returns a borrowed reference. */
Py_INCREF(cluster_time);
if (-1 == PyMapping_DelItemString(query, "$clusterTime")) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
}
} else if (PyMapping_HasKeyString(query, "$clusterTime")) {
cluster_time = PyMapping_GetItemString(query, "$clusterTime");
if (!cluster_time
|| -1 == PyMapping_DelItemString(query, "$clusterTime")) {
destroy_codec_options(&options);
PyMem_Free(collection_name);
return NULL;
}
}
if (!buffer_write_int32(buffer, (int32_t)request_id) ||
!buffer_write_bytes(buffer, "\x00\x00\x00\x00\xd4\x07\x00\x00", 8) ||
!buffer_write_int32(buffer, (int32_t)flags) ||
@ -439,6 +468,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
destroy_codec_options(&options);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_XDECREF(cluster_time);
return NULL;
}
@ -447,8 +477,49 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
destroy_codec_options(&options);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_XDECREF(cluster_time);
return NULL;
}
/* back up a byte and write $clusterTime */
if (cluster_time) {
int length;
char zero = 0;
buffer_update_position(buffer, buffer_get_position(buffer) - 1);
if (!write_pair(state->_cbson, buffer, "$clusterTime", 12, cluster_time,
0, &options, 1)) {
destroy_codec_options(&options);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_DECREF(cluster_time);
return NULL;
}
if (!buffer_write_bytes(buffer, &zero, 1)) {
destroy_codec_options(&options);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_DECREF(cluster_time);
return NULL;
}
length = buffer_get_position(buffer) - begin;
buffer_write_int32_at_position(buffer, begin, (int32_t)length);
/* undo popping $clusterTime */
if (-1 == PyMapping_SetItemString(
query, "$clusterTime", cluster_time)) {
destroy_codec_options(&options);
buffer_free(buffer);
PyMem_Free(collection_name);
Py_DECREF(cluster_time);
return NULL;
}
Py_DECREF(cluster_time);
}
max_size = buffer_get_position(buffer) - begin;
if (field_selector != Py_None) {

View File

@ -307,7 +307,8 @@ class _Bulk(object):
}
op_id = _randint()
db_name = self.collection.database.name
listeners = self.collection.database.client._event_listeners
client = self.collection.database.client
listeners = client._event_listeners
with self.collection.database.client._tmp_session(session) as s:
# sock_info.command checks auth, but we use sock_info.write_command.
@ -321,6 +322,7 @@ class _Bulk(object):
cmd['bypassDocumentValidation'] = True
if s:
cmd['lsid'] = s._use_lsid()
client._send_cluster_time(cmd)
bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id,
listeners, s)
@ -329,6 +331,9 @@ class _Bulk(object):
run.ops, True, self.collection.codec_options, bwc)
_merge_command(run, full_result, results)
last_result = results[-1][1]
client._receive_cluster_time(last_result)
# We're supposed to continue if errors are
# at the write concern level (e.g. wtimeout)
if self.ordered and full_result['writeErrors']:

View File

@ -31,6 +31,7 @@ from pymongo import (common,
message)
from pymongo.bulk import BulkOperationBuilder, _Bulk
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
from pymongo.common import ORDERED_TYPES
from pymongo.collation import validate_collation_or_none
from pymongo.change_stream import ChangeStream
from pymongo.cursor import Cursor, RawBatchCursor
@ -47,12 +48,6 @@ from pymongo.results import (BulkWriteResult,
UpdateResult)
from pymongo.write_concern import WriteConcern
try:
from collections import OrderedDict
_ORDERED_TYPES = (SON, OrderedDict)
except ImportError:
_ORDERED_TYPES = (SON,)
_NO_OBJ_ERROR = "No matching object found"
_UJOIN = u"%s.%s"
@ -243,7 +238,8 @@ class Collection(common.BaseObject):
write_concern=write_concern,
parse_write_concern_error=parse_write_concern_error,
collation=collation,
session=s)
session=s,
client=self.__database.client)
def __create(self, options, collation, session):
"""Sends a create command with the given options.
@ -573,7 +569,8 @@ class Collection(common.BaseObject):
command,
codec_options=self.__write_response_codec_options,
check_keys=check_keys,
session=s)
session=s,
client=self.__database.client)
_check_write_command_response([(0, result)])
else:
# Legacy OP_INSERT.
@ -811,7 +808,9 @@ class Collection(common.BaseObject):
self.__database.name,
command,
codec_options=self.__write_response_codec_options,
session=s).copy()
session=s,
client=self.__database.client).copy()
_check_write_command_response([(0, result)])
# Add the updatedExisting field for compatibility.
if result.get('n') and 'upserted' not in result:
@ -1089,7 +1088,8 @@ class Collection(common.BaseObject):
self.__database.name,
command,
codec_options=self.__write_response_codec_options,
session=s)
session=s,
client=self.__database.client)
_check_write_command_response([(0, result)])
return result
else:
@ -2043,7 +2043,8 @@ class Collection(common.BaseObject):
parse_write_concern_error=dollar_out,
read_concern=read_concern,
collation=collation,
session=session)
session=session,
client=self.__database.client)
if "cursor" in result:
cursor = result["cursor"]
@ -2349,8 +2350,9 @@ class Collection(common.BaseObject):
if sock_info.max_wire_version >= 5 and self.write_concern:
cmd['writeConcern'] = self.write_concern.document
cmd.update(kwargs)
sock_info.command('admin', cmd, parse_write_concern_error=True,
session=s)
return sock_info.command(
'admin', cmd, parse_write_concern_error=True,
session=s, client=self.__database.client)
def distinct(self, key, filter=None, session=None, **kwargs):
"""Get a list of distinct values for `key` among all documents
@ -2974,7 +2976,7 @@ class Collection(common.BaseObject):
kwargs['sort'] = helpers._index_document(sort)
# Accept OrderedDict, SON, and dict with len == 1 so we
# don't break existing code already using find_and_modify.
elif (isinstance(sort, _ORDERED_TYPES) or
elif (isinstance(sort, ORDERED_TYPES) or
isinstance(sort, dict) and len(sort) == 1):
warnings.warn("Passing mapping types for `sort` is deprecated,"
" use a list of (key, direction) pairs instead",

View File

@ -129,6 +129,10 @@ class CommandCursor(object):
client = self.__collection.database.client
listeners = client._event_listeners
publish = listeners.enabled_for_commands
start = datetime.datetime.now()
def duration(): return datetime.datetime.now() - start
try:
response = client._send_message_with_response(
operation, address=self.__address)
@ -140,27 +144,24 @@ class CommandCursor(object):
kill()
raise
cmd_duration = response.duration
rqst_id = response.request_id
from_command = response.from_command
reply = response.data
if publish:
start = datetime.datetime.now()
try:
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
client._receive_cluster_time(docs[0])
helpers._check_command_response(docs[0])
except OperationFailure as exc:
kill()
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, exc.details, "getMore", rqst_id, self.__address)
duration(), exc.details, "getMore", rqst_id, self.__address)
raise
except NotMasterError as exc:
@ -169,17 +170,15 @@ class CommandCursor(object):
kill()
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, exc.details, "getMore", rqst_id, self.__address)
duration(), exc.details, "getMore", rqst_id, self.__address)
client._reset_server_and_request_check(self.address)
raise
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, _convert_exception(exc), "getMore", rqst_id,
duration(), _convert_exception(exc), "getMore", rqst_id,
self.__address)
raise
@ -187,19 +186,22 @@ class CommandCursor(object):
cursor = docs[0]['cursor']
documents = cursor['nextBatch']
self.__id = cursor['id']
if publish:
listeners.publish_command_success(
duration(), docs[0], "getMore", rqst_id,
self.__address)
else:
documents = docs
self.__id = reply.cursor_id
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
# Must publish in getMore command response format.
res = {"cursor": {"id": self.__id,
"ns": self.__collection.full_name,
"nextBatch": documents},
"ok": 1}
listeners.publish_command_success(
duration, res, "getMore", rqst_id, self.__address)
if publish:
# Must publish in getMore command response format.
res = {"cursor": {"id": self.__id,
"ns": self.__collection.full_name,
"nextBatch": documents},
"ok": 1}
listeners.publish_command_success(
duration(), res, "getMore", rqst_id, self.__address)
if self.__id == 0:
kill()
@ -227,6 +229,7 @@ class CommandCursor(object):
self.__id,
self.__collection.codec_options,
self.__session,
self.__collection.database.client,
self.__max_await_time_ms))
else: # Cursor id is zero nothing else to return
self.__killed = True

View File

@ -19,6 +19,7 @@ import collections
import datetime
import warnings
from bson import SON
from bson.binary import (STANDARD, PYTHON_LEGACY,
JAVA_LEGACY, CSHARP_LEGACY)
from bson.codec_options import CodecOptions
@ -32,6 +33,13 @@ from pymongo.read_preferences import _MONGOS_MODES, _ServerMode
from pymongo.ssl_support import validate_cert_reqs
from pymongo.write_concern import WriteConcern
try:
from collections import OrderedDict
ORDERED_TYPES = (SON, OrderedDict)
except ImportError:
ORDERED_TYPES = (SON,)
# Defaults until we connect to a server and get updated limits.
MAX_BSON_SIZE = 16 * (1024 ** 2)
MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE

View File

@ -904,6 +904,9 @@ class Cursor(object):
listeners = client._event_listeners
publish = listeners.enabled_for_commands
from_command = False
start = datetime.datetime.now()
def duration(): return datetime.datetime.now() - start
if operation:
kwargs = {
@ -924,7 +927,6 @@ class Cursor(object):
cmd_name = operation.name
reply = response.data
cmd_duration = response.duration
rqst_id = response.request_id
from_command = response.from_command
except AutoReconnect:
@ -948,28 +950,23 @@ class Cursor(object):
cmd['maxTimeMS'] = self.__max_time_ms
listeners.publish_command_start(
cmd, self.__collection.database.name, 0, self.__address)
start = datetime.datetime.now()
try:
reply = self.__exhaust_mgr.sock.receive_message(None)
except Exception as exc:
if publish:
duration = datetime.datetime.now() - start
listeners.publish_command_failure(
duration, _convert_exception(exc), cmd_name, rqst_id,
duration(), _convert_exception(exc), cmd_name, rqst_id,
self.__address)
if isinstance(exc, ConnectionFailure):
self.__die()
raise
if publish:
cmd_duration = datetime.datetime.now() - start
if publish:
start = datetime.datetime.now()
try:
docs = self._unpack_response(response=reply,
cursor_id=self.__id,
codec_options=self.__codec_options)
if from_command:
client._receive_cluster_time(docs[0])
helpers._check_command_response(docs[0])
except OperationFailure as exc:
self.__killed = True
@ -978,9 +975,8 @@ class Cursor(object):
self.__die()
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, exc.details, cmd_name, rqst_id, self.__address)
duration(), exc.details, cmd_name, rqst_id, self.__address)
# If this is a tailable cursor the error is likely
# due to capped collection roll over. Setting
@ -998,22 +994,19 @@ class Cursor(object):
self.__die()
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, exc.details, cmd_name, rqst_id, self.__address)
duration(), exc.details, cmd_name, rqst_id, self.__address)
client._reset_server_and_request_check(self.__address)
raise
except Exception as exc:
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
listeners.publish_command_failure(
duration, _convert_exception(exc), cmd_name, rqst_id,
duration(), _convert_exception(exc), cmd_name, rqst_id,
self.__address)
raise
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
# Must publish in find / getMore / explain command response format.
if from_command:
res = docs[0]
@ -1028,7 +1021,7 @@ class Cursor(object):
else:
res["cursor"]["nextBatch"] = docs
listeners.publish_command_success(
duration, res, cmd_name, rqst_id, self.__address)
duration(), res, cmd_name, rqst_id, self.__address)
if from_command and cmd_name != "explain":
cursor = docs[0]['cursor']
@ -1085,7 +1078,8 @@ class Cursor(object):
self.__batch_size,
self.__read_concern,
self.__collation,
self.__session)
self.__session,
self.__collection.database.client)
self.__send_message(q)
if not self.__id:
self.__killed = True
@ -1107,6 +1101,7 @@ class Cursor(object):
self.__id,
self.__codec_options,
self.__session,
self.__collection.database.client,
self.__max_await_time_ms)
self.__send_message(g)

View File

@ -434,19 +434,21 @@ class Database(common.BaseObject):
check,
allowable_errors,
parse_write_concern_error=parse_write_concern_error,
session=session)
session=session,
client=self.__client)
with self.__client._tmp_session(session) as s:
return sock_info.command(
self.__name,
command,
slave_ok,
read_preference,
codec_options,
check,
allowable_errors,
parse_write_concern_error=parse_write_concern_error,
session=s)
self.__name,
command,
slave_ok,
read_preference,
codec_options,
check,
allowable_errors,
parse_write_concern_error=parse_write_concern_error,
session=s,
client=self.__client)
def command(self, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
@ -719,7 +721,8 @@ class Database(common.BaseObject):
with self.__client._socket_for_writes() as sock_info:
if sock_info.max_wire_version >= 4:
with self.__client._tmp_session(session) as s:
return sock_info.command("admin", cmd, session=s)
return sock_info.command("admin", cmd, session=s,
client=self.__client)
else:
spec = {"$all": True} if include_all else {}
x = _first_batch(sock_info, "admin", "$cmd.sys.inprog",

View File

@ -127,6 +127,10 @@ class IsMaster(object):
def election_id(self):
return self._doc.get('electionId')
@property
def cluster_time(self):
return self._doc.get('$clusterTime')
@property
def logical_session_timeout_minutes(self):
return self._doc.get('logicalSessionTimeoutMinutes')

View File

@ -172,10 +172,10 @@ _MODIFIERS = SON([
def _gen_explain_command(
coll, spec, projection, skip, limit, batch_size,
options, read_concern, session):
options, read_concern, session, client):
"""Generate an explain command document."""
cmd = _gen_find_command(coll, spec, projection, skip, limit, batch_size,
options, session=None)
options, session=None, client=None)
if read_concern.level:
explain = SON([('explain', cmd), ('readConcern', read_concern.document)])
else:
@ -184,11 +184,12 @@ def _gen_explain_command(
if session:
explain['lsid'] = session._use_lsid()
client._send_cluster_time(explain)
return explain
def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
session, read_concern=DEFAULT_READ_CONCERN,
session, client, read_concern=DEFAULT_READ_CONCERN,
collation=None):
"""Generate a find command document."""
cmd = SON([('find', coll)])
@ -222,11 +223,13 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
if options & val])
if session:
cmd['lsid'] = session._use_lsid()
if client:
client._send_cluster_time(cmd)
return cmd
def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
session):
session, client):
"""Generate a getMore command document."""
cmd = SON([('getMore', cursor_id),
('collection', coll)])
@ -236,6 +239,7 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
cmd['maxTimeMS'] = max_await_time_ms
if session:
cmd['lsid'] = session._use_lsid()
client._send_cluster_time(cmd)
return cmd
@ -245,11 +249,11 @@ class _Query(object):
__slots__ = ('flags', 'db', 'coll', 'ntoskip', 'spec',
'fields', 'codec_options', 'read_preference', 'limit',
'batch_size', 'name', 'read_concern', 'collation',
'session')
'session', 'client')
def __init__(self, flags, db, coll, ntoskip, spec, fields,
codec_options, read_preference, limit,
batch_size, read_concern, collation, session):
batch_size, read_concern, collation, session, client):
self.flags = flags
self.db = db
self.coll = coll
@ -263,6 +267,7 @@ class _Query(object):
self.batch_size = batch_size
self.collation = collation
self.session = session
self.client = client
self.name = 'find'
def use_command(self, sock_info, exhaust):
@ -296,11 +301,11 @@ class _Query(object):
return _gen_explain_command(
self.coll, self.spec, self.fields, self.ntoskip,
self.limit, self.batch_size, self.flags,
self.read_concern, self.session), self.db
self.read_concern, self.session, self.client), self.db
return _gen_find_command(self.coll, self.spec, self.fields,
self.ntoskip, self.limit, self.batch_size,
self.flags, self.session, self.read_concern,
self.collation), self.db
self.flags, self.session, self.client,
self.read_concern, self.collation), self.db
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
"""Get a query message, possibly setting the slaveOk bit."""
@ -340,19 +345,20 @@ class _GetMore(object):
"""A getmore operation."""
__slots__ = ('db', 'coll', 'ntoreturn', 'cursor_id', 'max_await_time_ms',
'codec_options', 'session')
'codec_options', 'session', 'client')
name = 'getMore'
def __init__(self, db, coll, ntoreturn, cursor_id, codec_options, session,
max_await_time_ms=None):
client, max_await_time_ms=None):
self.db = db
self.coll = coll
self.ntoreturn = ntoreturn
self.cursor_id = cursor_id
self.codec_options = codec_options
self.max_await_time_ms = max_await_time_ms
self.session = session
self.client = client
self.max_await_time_ms = max_await_time_ms
def use_command(self, sock_info, exhaust):
sock_info.check_session_auth_matches(self.session)
@ -363,7 +369,8 @@ class _GetMore(object):
return _gen_get_more_command(self.cursor_id, self.coll,
self.ntoreturn,
self.max_await_time_ms,
self.session), self.db
self.session,
self.client), self.db
def get_message(self, dummy0, dummy1, use_cmd=False):
"""Get a getmore message."""
@ -507,7 +514,19 @@ def query(options, collection_name, num_to_skip,
data += bson._make_c_string(collection_name)
data += struct.pack("<i", num_to_skip)
data += struct.pack("<i", num_to_return)
encoded = bson.BSON.encode(query, check_keys, opts)
if check_keys:
# Temporarily remove $clusterTime to avoid an error from the $-prefix.
cluster_time = query.pop('$clusterTime', None)
encoded = bson.BSON.encode(query, True, opts)
if cluster_time is not None:
extra = bson._name_value_to_bson(
b"$clusterTime\x00", cluster_time, False, opts)
encoded = (
bson._PACK_INT(len(encoded) + len(extra))
+ encoded[4:-1] + extra + b'\x00')
query['$clusterTime'] = cluster_time
else:
encoded = bson.BSON.encode(query, False, opts)
data += encoded
max_bson_size = len(encoded)
if field_selector is not None:
@ -977,7 +996,8 @@ def _first_batch(sock_info, db, coll, query, ntoreturn,
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, query, None, codec_options,
read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, session)
read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, session,
None)
name = next(iter(cmd))
publish = listeners.enabled_for_commands

View File

@ -1140,7 +1140,7 @@ class MongoClient(common.BaseObject):
spec = SON([('killCursors', coll), ('cursors', cursor_ids)])
with server.get_socket(self.__all_credentials) as sock_info:
if sock_info.max_wire_version >= 4 and namespace is not None:
sock_info.command(db, spec, session=session)
sock_info.command(db, spec, session=session, client=self)
else:
if publish:
start = datetime.datetime.now()
@ -1271,6 +1271,14 @@ class MongoClient(common.BaseObject):
else:
yield None
def _send_cluster_time(self, command):
cluster_time = self._topology.max_cluster_time()
if cluster_time:
command['$clusterTime'] = cluster_time
def _receive_cluster_time(self, reply):
self._topology.receive_cluster_time(reply.get('$clusterTime'))
def server_info(self, session=None):
"""Get information about the MongoDB server we're connected to.
@ -1476,7 +1484,8 @@ class MongoClient(common.BaseObject):
if sock_info.max_wire_version >= 4:
try:
with self._tmp_session(session) as s:
sock_info.command("admin", cmd, session=s)
sock_info.command(
"admin", cmd, session=s, client=self)
except OperationFailure as exc:
# Ignore "DB not locked" to replicate old behavior
if exc.code != 125:

View File

@ -36,7 +36,7 @@ except ImportError:
from bson import SON
from pymongo import helpers, message
from pymongo.common import MAX_MESSAGE_SIZE
from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES
from pymongo.errors import (AutoReconnect,
NotMasterError,
OperationFailure,
@ -49,7 +49,7 @@ _UNPACK_HEADER = struct.Struct("<iiii").unpack
def command(sock, dbname, spec, slave_ok, is_mongos,
read_preference, codec_options, session, check=True,
read_preference, codec_options, session, client, check=True,
allowable_errors=None, address=None,
check_keys=False, listeners=None, max_bson_size=None,
read_concern=DEFAULT_READ_CONCERN,
@ -66,6 +66,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
- `session`: optional ClientSession instance.
- `client`: optional MongoClient instance for updating $clusterTime.
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `address`: the (host, port) of `sock`
@ -80,13 +81,15 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
name = next(iter(spec))
ns = dbname + '.$cmd'
flags = 4 if slave_ok else 0
if session is not None:
if spec.__class__ is dict:
# Ensure command name remains in first place.
spec = SON(spec)
if (client or session) and not isinstance(spec, ORDERED_TYPES):
# Ensure command name remains in first place.
spec = SON(spec)
if session:
spec['lsid'] = session._use_lsid()
if client:
client._send_cluster_time(spec)
# Publish the original command document, perhaps with session id.
# Publish the original command document, perhaps with lsid and $clusterTime.
orig = spec
if is_mongos:
spec = message._maybe_add_read_preference(spec, read_preference)
@ -118,6 +121,8 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
unpacked_docs = reply.unpack_response(codec_options=codec_options)
response_doc = unpacked_docs[0]
if client:
client._receive_cluster_time(response_doc)
if check:
helpers._check_command_response(
response_doc, None, allowable_errors,

View File

@ -437,7 +437,8 @@ class SocketInfo(object):
write_concern=None,
parse_write_concern_error=False,
collation=None,
session=None):
session=None,
client=None):
"""Execute a command or raise an error.
:Parameters:
@ -455,6 +456,7 @@ class SocketInfo(object):
``writeConcernError`` field in the command response.
- `collation`: The collation for this command.
- `session`: optional ClientSession instance.
- `client`: optional MongoClient for gossipping $clusterTime.
"""
self.check_session_auth_matches(session)
if self.max_wire_version < 4 and not read_concern.ok_for_legacy:
@ -474,9 +476,9 @@ class SocketInfo(object):
try:
return command(self.sock, dbname, spec, slave_ok,
self.is_mongos, read_preference, codec_options,
session, check, allowable_errors, self.address,
check_keys, self.listeners, self.max_bson_size,
read_concern,
session, client, check, allowable_errors,
self.address, check_keys, self.listeners,
self.max_bson_size, read_concern,
parse_write_concern_error=parse_write_concern_error,
collation=collation)
except OperationFailure:
@ -823,6 +825,7 @@ class Pool:
False,
ReadPreference.PRIMARY,
DEFAULT_CODEC_OPTIONS,
None,
None))
else:
ismaster = None

View File

@ -46,7 +46,7 @@ class ServerDescription(object):
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
'_round_trip_time', '_me', '_is_writable', '_is_readable',
'_ls_timeout_minutes', '_error', '_set_version', '_election_id',
'_last_write_date', '_last_update_time')
'_cluster_time', '_last_write_date', '_last_update_time')
def __init__(
self,
@ -70,6 +70,7 @@ class ServerDescription(object):
self._max_wire_version = ismaster.max_wire_version
self._set_version = ismaster.set_version
self._election_id = ismaster.election_id
self._cluster_time = ismaster.cluster_time
self._is_writable = ismaster.is_writable
self._is_readable = ismaster.is_readable
self._ls_timeout_minutes = ismaster.logical_session_timeout_minutes
@ -150,6 +151,10 @@ class ServerDescription(object):
def election_id(self):
return self._election_id
@property
def cluster_time(self):
return self._cluster_time
@property
def election_tuple(self):
return self._set_version, self._election_id

View File

@ -108,6 +108,7 @@ class Topology(object):
self._condition = self._settings.condition_class(self._lock)
self._servers = {}
self._pid = None
self._max_cluster_time = None
self._session_pool = _ServerSessionPool()
if self._publish_server or self._publish_tp:
@ -266,6 +267,8 @@ class Topology(object):
self._description, server_description)
self._update_servers()
self._receive_cluster_time_no_lock(
server_description.cluster_time)
if self._publish_tp:
self._events.put((
@ -317,6 +320,28 @@ class Topology(object):
"""Return set of arbiter addresses."""
return self._get_replica_set_members(arbiter_server_selector)
def max_cluster_time(self):
"""Return a document, the highest seen $clusterTime."""
return self._max_cluster_time
def _receive_cluster_time_no_lock(self, cluster_time):
# Driver Sessions Spec: "Whenever a driver receives a cluster time from
# a server it MUST compare it to the current highest seen cluster time
# for the deployment. If the new cluster time is higher than the
# highest seen cluster time it MUST become the new highest seen cluster
# time. Two cluster times are compared using only the BsonTimestamp
# value of the clusterTime embedded field."
if cluster_time:
# ">" uses bson.timestamp.Timestamp's comparison operator.
if (not self._max_cluster_time
or cluster_time['clusterTime'] >
self._max_cluster_time['clusterTime']):
self._max_cluster_time = cluster_time
def receive_cluster_time(self, cluster_time):
with self._lock:
self._receive_cluster_time_no_lock(cluster_time)
def request_check_all(self, wait_time=5):
"""Wake all monitors, wait for at least one to check its server."""
with self._lock:

View File

@ -497,7 +497,29 @@ class ClientContext(object):
client_context = ClientContext()
class IntegrationTest(unittest.TestCase):
def sanitize_cmd(cmd):
cp = cmd.copy()
cp.pop('$clusterTime', None)
cp.pop('lsid', None)
return cp
def sanitize_reply(reply):
cp = reply.copy()
cp.pop('$clusterTime', None)
cp.pop('operationTime', None)
return cp
class PyMongoTestCase(unittest.TestCase):
def assertEqualCommand(self, expected, actual, msg=None):
self.assertEqual(expected, sanitize_cmd(actual), msg)
def assertEqualReply(self, expected, actual, msg=None):
self.assertEqual(expected, sanitize_reply(actual), msg)
class IntegrationTest(PyMongoTestCase):
"""Base class for TestCases that need a connection to MongoDB to pass."""
@classmethod

View File

@ -1055,7 +1055,7 @@ class TestClient(IntegrationTest):
client._send_message_with_response(
operation=message._GetMore('pymongo_test', 'collection',
101, 1234, client.codec_options,
None),
None, client),
address=('not-a-member', 27017))
def test_heartbeat_frequency_ms(self):

View File

@ -2233,7 +2233,7 @@ class TestCollection(IntegrationTest):
def test_find_command_generation(self):
cmd = _gen_find_command('coll', {'$query': {'foo': 1}, '$dumb': 2},
None, 0, 0, 0, None, None)
None, 0, 0, 0, None, None, None)
self.assertEqual(
cmd.to_dict(),
SON([('find', 'coll'),

View File

@ -20,7 +20,7 @@ import threading
sys.path[0:0] = [""]
from bson import json_util
from bson import json_util, Timestamp
from pymongo import common
from pymongo.errors import ConfigurationError
from pymongo.topology import Topology
@ -209,5 +209,33 @@ def create_tests():
create_tests()
class TestClusterTimeComparison(unittest.TestCase):
def test_cluster_time_comparison(self):
t = create_mock_topology('mongodb://host')
def send_cluster_time(time, inc, should_update):
old = t.max_cluster_time()
new = {'clusterTime': Timestamp(time, inc)}
got_ismaster(t,
('host', 27017),
{'ok': 1,
'minWireVersion': 0,
'maxWireVersion': 6,
'$clusterTime': new})
actual = t.max_cluster_time()
if should_update:
self.assertEqual(actual, new)
else:
self.assertEqual(actual, old)
send_cluster_time(0, 1, True)
send_cluster_time(2, 2, True)
send_cluster_time(2, 1, False)
send_cluster_time(1, 3, False)
send_cluster_time(2, 3, True)
if __name__ == "__main__":
unittest.main()

View File

@ -28,14 +28,18 @@ from pymongo.command_cursor import CommandCursor
from pymongo.errors import NotMasterError, OperationFailure
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import unittest, client_context, client_knobs
from test import (client_context,
client_knobs,
PyMongoTestCase,
sanitize_cmd,
unittest)
from test.utils import (EventListener,
rs_or_single_client,
single_client,
wait_until)
class TestCommandMonitoring(unittest.TestCase):
class TestCommandMonitoring(PyMongoTestCase):
@classmethod
@client_context.require_connection
@ -63,7 +67,7 @@ class TestCommandMonitoring(unittest.TestCase):
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(SON([('ismaster', 1)]), started.command)
self.assertEqualCommand(SON([('ismaster', 1)]), started.command)
self.assertEqual('ismaster', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
@ -114,7 +118,7 @@ class TestCommandMonitoring(unittest.TestCase):
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('find', 'test'),
('filter', {}),
('limit', 1),
@ -141,7 +145,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('find', 'test'),
('filter', {}),
('projection', {'_id': False}),
@ -173,7 +177,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 4)]),
@ -214,7 +218,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(cmd, started.command)
self.assertEqualCommand(cmd, started.command)
self.assertEqual('explain', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
@ -249,7 +253,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(expected_cmd, started.command)
self.assertEqualCommand(expected_cmd, started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
@ -332,7 +336,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('aggregate', 'test'),
('pipeline', [{'$project': {'_id': False, 'x': 1}}]),
('cursor', {'batchSize': 4})]),
@ -350,7 +354,7 @@ class TestCommandMonitoring(unittest.TestCase):
expected_cursor = {'id': cursor_id,
'ns': 'pymongo_test.test',
'firstBatch': [{'x': 1} for _ in range(4)]}
self.assertEqual(expected_cursor, succeeded.reply.get('cursor'))
self.assertEqualCommand(expected_cursor, succeeded.reply.get('cursor'))
self.listener.results.clear()
next(cursor)
@ -361,7 +365,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 4)]),
@ -380,8 +384,8 @@ class TestCommandMonitoring(unittest.TestCase):
'cursor': {'id': cursor_id,
'ns': 'pymongo_test.test',
'nextBatch': [{'x': 1} for _ in range(4)]},
'ok': 1}
self.assertEqual(expected_result, succeeded.reply)
'ok': 1.0}
self.assertEqualReply(expected_result, succeeded.reply)
finally:
# Exhaust the cursor to avoid kill cursors.
tuple(cursor)
@ -401,7 +405,7 @@ class TestCommandMonitoring(unittest.TestCase):
failed = results['failed'][0]
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('getMore', 12345),
('collection', 'test')]),
started.command)
@ -462,7 +466,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('find', 'test'),
('filter', {}),
('projection', {'_id': False}),
@ -483,7 +487,7 @@ class TestCommandMonitoring(unittest.TestCase):
'ns': 'pymongo_test.test',
'firstBatch': [{} for _ in range(5)]},
'ok': 1}
self.assertEqual(expected_result, succeeded.reply)
self.assertEqualReply(expected_result, succeeded.reply)
self.listener.results.clear()
tuple(cursor)
@ -493,7 +497,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 5)]),
@ -513,7 +517,7 @@ class TestCommandMonitoring(unittest.TestCase):
'ns': 'pymongo_test.test',
'nextBatch': [{} for _ in range(5)]},
'ok': 1}
self.assertEqual(expected_result, succeeded.reply)
self.assertEqualReply(expected_result, succeeded.reply)
def test_kill_cursors(self):
with client_knobs(kill_cursor_frequency=0.01):
@ -566,7 +570,7 @@ class TestCommandMonitoring(unittest.TestCase):
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}])])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -593,7 +597,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('writeConcern', {'w': 0})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -603,7 +607,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertEqual(succeeded.reply, {'ok': 1})
self.assertEqualReply(succeeded.reply, {'ok': 1})
# Explicit write concern insert_one
self.listener.results.clear()
@ -618,7 +622,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -645,7 +649,7 @@ class TestCommandMonitoring(unittest.TestCase):
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -675,7 +679,7 @@ class TestCommandMonitoring(unittest.TestCase):
('multi', False),
('upsert', True)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -705,7 +709,7 @@ class TestCommandMonitoring(unittest.TestCase):
('multi', False),
('upsert', False)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -734,7 +738,7 @@ class TestCommandMonitoring(unittest.TestCase):
('multi', True),
('upsert', False)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -761,7 +765,7 @@ class TestCommandMonitoring(unittest.TestCase):
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -793,7 +797,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('documents', [{'_id': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -831,7 +835,7 @@ class TestCommandMonitoring(unittest.TestCase):
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}])])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -857,7 +861,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('writeConcern', {'w': 0})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -881,7 +885,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -908,7 +912,7 @@ class TestCommandMonitoring(unittest.TestCase):
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -938,7 +942,7 @@ class TestCommandMonitoring(unittest.TestCase):
('multi', False),
('upsert', True)])]),
('writeConcern', {'w': 1})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -967,7 +971,7 @@ class TestCommandMonitoring(unittest.TestCase):
('u', {'$inc': {'x': 1}}),
('multi', False),
('upsert', False)])])])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -995,7 +999,7 @@ class TestCommandMonitoring(unittest.TestCase):
('u', {'$inc': {'x': 1}}),
('multi', True),
('upsert', False)])])])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1021,7 +1025,7 @@ class TestCommandMonitoring(unittest.TestCase):
('ordered', True),
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])])])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1056,7 +1060,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = start.command
cmd = sanitize_cmd(start.command)
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
@ -1102,7 +1106,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = start.command
cmd = sanitize_cmd(start.command)
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
@ -1156,19 +1160,19 @@ class TestCommandMonitoring(unittest.TestCase):
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': 1}])])
self.assertEqual(expected, started[0].command)
self.assertEqualCommand(expected, started[0].command)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': 1}),
('u', {'$set': {'x': 1}}),
('multi', False),
('upsert', False)])])])
self.assertEqual(expected, started[1].command)
self.assertEqualCommand(expected, started[1].command)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'_id': 1}),
('limit', 1)])])])
self.assertEqual(expected, started[2].command)
self.assertEqualCommand(expected, started[2].command)
def test_write_errors(self):
coll = self.client.pymongo_test.test
@ -1221,7 +1225,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('listCollections', 1), ('cursor', {})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('listCollections', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1239,7 +1243,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('listIndexes', 'test'), ('cursor', {})])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('listIndexes', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1257,7 +1261,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('currentOp', 1), ('$all', True)])
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('admin', started.database_name)
self.assertEqual('currentOp', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1280,7 +1284,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = {'fsyncUnlock': 1}
self.assertEqual(expected, started.command)
self.assertEqualCommand(expected, started.command)
self.assertEqual('admin', started.database_name)
self.assertEqual('fsyncUnlock', started.command_name)
self.assertIsInstance(started.request_id, int)
@ -1320,7 +1324,7 @@ class TestCommandMonitoring(unittest.TestCase):
self.assertEqual({}, succeeded.reply)
class TestGlobalListener(unittest.TestCase):
class TestGlobalListener(PyMongoTestCase):
@classmethod
@client_context.require_connection
@ -1350,7 +1354,7 @@ class TestGlobalListener(unittest.TestCase):
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqual(SON([('ismaster', 1)]), started.command)
self.assertEqualCommand(SON([('ismaster', 1)]), started.command)
self.assertEqual('ismaster', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)

View File

@ -14,18 +14,16 @@
"""Test the read_concern module."""
import pymongo
from bson.son import SON
from pymongo import monitoring
from pymongo.errors import ConfigurationError, OperationFailure
from pymongo.read_concern import ReadConcern
from test import client_context, unittest
from test import client_context, PyMongoTestCase
from test.utils import single_client, rs_or_single_client, EventListener
class TestReadConcern(unittest.TestCase):
class TestReadConcern(PyMongoTestCase):
@classmethod
@client_context.require_connection
@ -89,7 +87,7 @@ class TestReadConcern(unittest.TestCase):
# Explicitly set readConcern to 'local'.
coll = self.db.get_collection('coll', read_concern=ReadConcern('local'))
tuple(coll.find({'field': 'value'}))
self.assertEqual(
self.assertEqualCommand(
SON([('find', 'coll'),
('filter', {'field': 'value'}),
('readConcern', {'level': 'local'})]),

View File

@ -25,15 +25,12 @@ from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
from pymongo.monotonic import time as _time
from test import IntegrationTest, client_context, db_user, db_pwd
from test import IntegrationTest, client_context, db_user, db_pwd, SkipTest
from test.utils import ignore_deprecations, rs_or_single_client, EventListener
# Ignore auth commands like saslStart, so we can assert lsid is in all commands.
class SessionTestListener(EventListener):
def __init__(self):
super(SessionTestListener, self).__init__(ignore_lsid=False)
def started(self, event):
if not event.command_name.startswith('sasl'):
super(SessionTestListener, self).started(event)
@ -739,3 +736,104 @@ class TestSessionsNotSupported(IntegrationTest):
with self.assertRaisesRegex(
ConfigurationError, "Sessions are not supported"):
self.client.start_session()
class TestClusterTime(IntegrationTest):
def setUp(self):
super(TestClusterTime, self).setUp()
if '$clusterTime' not in client_context.ismaster:
raise SkipTest('$clusterTime not supported')
@ignore_deprecations
def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = rs_or_single_client(event_listeners=[listener],
heartbeatFrequencyMS=999999)
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
collection.insert_many([{} for _ in range(10)])
self.addCleanup(collection.drop)
self.addCleanup(client.pymongo_test.collection2.drop)
def bulk_insert(ordered):
if ordered:
bulk = collection.initialize_ordered_bulk_op()
else:
bulk = collection.initialize_unordered_bulk_op()
bulk.insert({})
bulk.execute()
def rename_and_drop():
# Ensure collection exists.
collection.insert_one({})
collection.rename('collection2')
client.pymongo_test.collection2.drop()
def insert_and_find():
cursor = collection.find().batch_size(1)
for _ in range(10):
# Advance the cluster time.
collection.insert_one({})
next(cursor)
cursor.close()
def insert_and_aggregate():
cursor = collection.aggregate([], batchSize=1).batch_size(1)
for _ in range(5):
# Advance the cluster time.
collection.insert_one({})
next(cursor)
cursor.close()
ops = [
# Tests from Driver Sessions Spec.
('ping', lambda: client.admin.command('ping')),
('aggregate', lambda: list(collection.aggregate([]))),
('find', lambda: list(collection.find())),
('insert_one', lambda: collection.insert_one({})),
# Additional PyMongo tests.
('insert_and_find', insert_and_find),
('insert_and_aggregate', insert_and_aggregate),
('update_one',
lambda: collection.update_one({}, {'$set': {'x': 1}})),
('update_many',
lambda: collection.update_many({}, {'$set': {'x': 1}})),
('delete_one', lambda: collection.delete_one({})),
('delete_many', lambda: collection.delete_many({})),
('bulk_write', lambda: collection.bulk_write([InsertOne({})])),
('ordered bulk', lambda: bulk_insert(True)),
('unordered bulk', lambda: bulk_insert(False)),
('rename_and_drop', rename_and_drop),
]
for name, f in ops:
listener.results.clear()
# Call f() twice, insert to advance clusterTime, call f() again.
f()
f()
collection.insert_one({})
f()
self.assertGreaterEqual(len(listener.results['started']), 1)
for i, event in enumerate(listener.results['started']):
self.assertTrue(
'$clusterTime' in event.command,
"%s sent no $clusterTime with %s" % (
f.__name__, event.command_name))
if i > 0:
succeeded = listener.results['succeeded'][i - 1]
self.assertTrue(
'$clusterTime' in succeeded.reply,
"%s received no $clusterTime with %s" % (
f.__name__, succeeded.command_name))
self.assertTrue(
event.command['$clusterTime']['clusterTime'] >=
succeeded.reply['$clusterTime']['clusterTime'],
"%s sent wrong $clusterTime with %s" % (
f.__name__, event.command_name))

View File

@ -29,14 +29,12 @@ from functools import partial
from pymongo import MongoClient, monitoring
from pymongo.errors import AutoReconnect, OperationFailure
from pymongo.monitoring import CommandStartedEvent
from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
from pymongo.write_concern import WriteConcern
from test import (client_context,
db_user,
db_pwd)
from test.version import Version
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=1000)
@ -63,17 +61,10 @@ class WhiteListEventListener(monitoring.CommandListener):
class EventListener(monitoring.CommandListener):
def __init__(self, ignore_lsid=True):
self.ignore_lsid = ignore_lsid
def __init__(self):
self.results = defaultdict(list)
def started(self, event):
if self.ignore_lsid and 'lsid' in event.command:
cmd = event.command.copy()
cmd.pop('lsid', None)
event = CommandStartedEvent(cmd, event.database_name,
event.request_id, event.connection_id,
event.operation_id)
self.results['started'].append(event)
def succeeded(self, event):