From df018e88e2ab0cf9dd7710093ba0bb6d2f57ee4a Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Thu, 5 Oct 2017 16:03:35 -0400 Subject: [PATCH] PYTHON-1332 - Gossip $clusterTime --- pymongo/_cmessagemodule.c | 71 +++++++++++++++++ pymongo/bulk.py | 7 +- pymongo/collection.py | 30 ++++---- pymongo/command_cursor.py | 39 +++++----- pymongo/common.py | 8 ++ pymongo/cursor.py | 29 +++---- pymongo/database.py | 25 +++--- pymongo/ismaster.py | 4 + pymongo/message.py | 50 ++++++++---- pymongo/mongo_client.py | 13 +++- pymongo/network.py | 19 +++-- pymongo/pool.py | 11 ++- pymongo/server_description.py | 7 +- pymongo/topology.py | 25 ++++++ test/__init__.py | 24 +++++- test/test_client.py | 2 +- test/test_collection.py | 2 +- test/test_discovery_and_monitoring.py | 30 +++++++- test/test_monitoring.py | 98 ++++++++++++------------ test/test_read_concern.py | 8 +- test/test_session.py | 106 +++++++++++++++++++++++++- test/utils.py | 11 +-- 22 files changed, 459 insertions(+), 160 deletions(-) diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c index 5f5040283..60099dbcd 100644 --- a/pymongo/_cmessagemodule.c +++ b/pymongo/_cmessagemodule.c @@ -388,6 +388,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { struct module_state *state = GETSTATE(self); int request_id = rand(); + PyObject* cluster_time = NULL; unsigned int flags; char* collection_name = NULL; int collection_name_length; @@ -429,6 +430,34 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { PyErr_NoMemory(); return NULL; } + + /* Pop $clusterTime from dict and write it at the end, avoiding an error + * from the $-prefix and check_keys. + * + * If "dict" is a defaultdict we don't want to call PyMapping_GetItemString + * on it. That would **create** an _id where one didn't previously exist + * (PYTHON-871). + */ + if (PyDict_Check(query)) { + cluster_time = PyDict_GetItemString(query, "$clusterTime"); + if (cluster_time) { + /* PyDict_GetItemString returns a borrowed reference. */ + Py_INCREF(cluster_time); + if (-1 == PyMapping_DelItemString(query, "$clusterTime")) { + destroy_codec_options(&options); + PyMem_Free(collection_name); + return NULL; + } + } + } else if (PyMapping_HasKeyString(query, "$clusterTime")) { + cluster_time = PyMapping_GetItemString(query, "$clusterTime"); + if (!cluster_time + || -1 == PyMapping_DelItemString(query, "$clusterTime")) { + destroy_codec_options(&options); + PyMem_Free(collection_name); + return NULL; + } + } if (!buffer_write_int32(buffer, (int32_t)request_id) || !buffer_write_bytes(buffer, "\x00\x00\x00\x00\xd4\x07\x00\x00", 8) || !buffer_write_int32(buffer, (int32_t)flags) || @@ -439,6 +468,7 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { destroy_codec_options(&options); buffer_free(buffer); PyMem_Free(collection_name); + Py_XDECREF(cluster_time); return NULL; } @@ -447,8 +477,49 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) { destroy_codec_options(&options); buffer_free(buffer); PyMem_Free(collection_name); + Py_XDECREF(cluster_time); return NULL; } + + /* back up a byte and write $clusterTime */ + if (cluster_time) { + int length; + char zero = 0; + + buffer_update_position(buffer, buffer_get_position(buffer) - 1); + if (!write_pair(state->_cbson, buffer, "$clusterTime", 12, cluster_time, + 0, &options, 1)) { + destroy_codec_options(&options); + buffer_free(buffer); + PyMem_Free(collection_name); + Py_DECREF(cluster_time); + return NULL; + } + + if (!buffer_write_bytes(buffer, &zero, 1)) { + destroy_codec_options(&options); + buffer_free(buffer); + PyMem_Free(collection_name); + Py_DECREF(cluster_time); + return NULL; + } + + length = buffer_get_position(buffer) - begin; + buffer_write_int32_at_position(buffer, begin, (int32_t)length); + + /* undo popping $clusterTime */ + if (-1 == PyMapping_SetItemString( + query, "$clusterTime", cluster_time)) { + destroy_codec_options(&options); + buffer_free(buffer); + PyMem_Free(collection_name); + Py_DECREF(cluster_time); + return NULL; + } + + Py_DECREF(cluster_time); + } + max_size = buffer_get_position(buffer) - begin; if (field_selector != Py_None) { diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 1ca8a7ec2..179e8ad4d 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -307,7 +307,8 @@ class _Bulk(object): } op_id = _randint() db_name = self.collection.database.name - listeners = self.collection.database.client._event_listeners + client = self.collection.database.client + listeners = client._event_listeners with self.collection.database.client._tmp_session(session) as s: # sock_info.command checks auth, but we use sock_info.write_command. @@ -321,6 +322,7 @@ class _Bulk(object): cmd['bypassDocumentValidation'] = True if s: cmd['lsid'] = s._use_lsid() + client._send_cluster_time(cmd) bwc = _BulkWriteContext(db_name, cmd, sock_info, op_id, listeners, s) @@ -329,6 +331,9 @@ class _Bulk(object): run.ops, True, self.collection.codec_options, bwc) _merge_command(run, full_result, results) + last_result = results[-1][1] + client._receive_cluster_time(last_result) + # We're supposed to continue if errors are # at the write concern level (e.g. wtimeout) if self.ordered and full_result['writeErrors']: diff --git a/pymongo/collection.py b/pymongo/collection.py index 135139408..cdf48b960 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -31,6 +31,7 @@ from pymongo import (common, message) from pymongo.bulk import BulkOperationBuilder, _Bulk from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor +from pymongo.common import ORDERED_TYPES from pymongo.collation import validate_collation_or_none from pymongo.change_stream import ChangeStream from pymongo.cursor import Cursor, RawBatchCursor @@ -47,12 +48,6 @@ from pymongo.results import (BulkWriteResult, UpdateResult) from pymongo.write_concern import WriteConcern -try: - from collections import OrderedDict - _ORDERED_TYPES = (SON, OrderedDict) -except ImportError: - _ORDERED_TYPES = (SON,) - _NO_OBJ_ERROR = "No matching object found" _UJOIN = u"%s.%s" @@ -243,7 +238,8 @@ class Collection(common.BaseObject): write_concern=write_concern, parse_write_concern_error=parse_write_concern_error, collation=collation, - session=s) + session=s, + client=self.__database.client) def __create(self, options, collation, session): """Sends a create command with the given options. @@ -573,7 +569,8 @@ class Collection(common.BaseObject): command, codec_options=self.__write_response_codec_options, check_keys=check_keys, - session=s) + session=s, + client=self.__database.client) _check_write_command_response([(0, result)]) else: # Legacy OP_INSERT. @@ -811,7 +808,9 @@ class Collection(common.BaseObject): self.__database.name, command, codec_options=self.__write_response_codec_options, - session=s).copy() + session=s, + client=self.__database.client).copy() + _check_write_command_response([(0, result)]) # Add the updatedExisting field for compatibility. if result.get('n') and 'upserted' not in result: @@ -1089,7 +1088,8 @@ class Collection(common.BaseObject): self.__database.name, command, codec_options=self.__write_response_codec_options, - session=s) + session=s, + client=self.__database.client) _check_write_command_response([(0, result)]) return result else: @@ -2043,7 +2043,8 @@ class Collection(common.BaseObject): parse_write_concern_error=dollar_out, read_concern=read_concern, collation=collation, - session=session) + session=session, + client=self.__database.client) if "cursor" in result: cursor = result["cursor"] @@ -2349,8 +2350,9 @@ class Collection(common.BaseObject): if sock_info.max_wire_version >= 5 and self.write_concern: cmd['writeConcern'] = self.write_concern.document cmd.update(kwargs) - sock_info.command('admin', cmd, parse_write_concern_error=True, - session=s) + return sock_info.command( + 'admin', cmd, parse_write_concern_error=True, + session=s, client=self.__database.client) def distinct(self, key, filter=None, session=None, **kwargs): """Get a list of distinct values for `key` among all documents @@ -2974,7 +2976,7 @@ class Collection(common.BaseObject): kwargs['sort'] = helpers._index_document(sort) # Accept OrderedDict, SON, and dict with len == 1 so we # don't break existing code already using find_and_modify. - elif (isinstance(sort, _ORDERED_TYPES) or + elif (isinstance(sort, ORDERED_TYPES) or isinstance(sort, dict) and len(sort) == 1): warnings.warn("Passing mapping types for `sort` is deprecated," " use a list of (key, direction) pairs instead", diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 79bd9a45a..1ff58ef8f 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -129,6 +129,10 @@ class CommandCursor(object): client = self.__collection.database.client listeners = client._event_listeners publish = listeners.enabled_for_commands + start = datetime.datetime.now() + + def duration(): return datetime.datetime.now() - start + try: response = client._send_message_with_response( operation, address=self.__address) @@ -140,27 +144,24 @@ class CommandCursor(object): kill() raise - cmd_duration = response.duration rqst_id = response.request_id from_command = response.from_command reply = response.data - if publish: - start = datetime.datetime.now() try: docs = self._unpack_response(reply, self.__id, self.__collection.codec_options) if from_command: + client._receive_cluster_time(docs[0]) helpers._check_command_response(docs[0]) except OperationFailure as exc: kill() if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, exc.details, "getMore", rqst_id, self.__address) + duration(), exc.details, "getMore", rqst_id, self.__address) raise except NotMasterError as exc: @@ -169,17 +170,15 @@ class CommandCursor(object): kill() if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, exc.details, "getMore", rqst_id, self.__address) + duration(), exc.details, "getMore", rqst_id, self.__address) client._reset_server_and_request_check(self.address) raise except Exception as exc: if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, _convert_exception(exc), "getMore", rqst_id, + duration(), _convert_exception(exc), "getMore", rqst_id, self.__address) raise @@ -187,19 +186,22 @@ class CommandCursor(object): cursor = docs[0]['cursor'] documents = cursor['nextBatch'] self.__id = cursor['id'] + if publish: + listeners.publish_command_success( + duration(), docs[0], "getMore", rqst_id, + self.__address) else: documents = docs self.__id = reply.cursor_id - if publish: - duration = (datetime.datetime.now() - start) + cmd_duration - # Must publish in getMore command response format. - res = {"cursor": {"id": self.__id, - "ns": self.__collection.full_name, - "nextBatch": documents}, - "ok": 1} - listeners.publish_command_success( - duration, res, "getMore", rqst_id, self.__address) + if publish: + # Must publish in getMore command response format. + res = {"cursor": {"id": self.__id, + "ns": self.__collection.full_name, + "nextBatch": documents}, + "ok": 1} + listeners.publish_command_success( + duration(), res, "getMore", rqst_id, self.__address) if self.__id == 0: kill() @@ -227,6 +229,7 @@ class CommandCursor(object): self.__id, self.__collection.codec_options, self.__session, + self.__collection.database.client, self.__max_await_time_ms)) else: # Cursor id is zero nothing else to return self.__killed = True diff --git a/pymongo/common.py b/pymongo/common.py index a4d810d01..17d107b10 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -19,6 +19,7 @@ import collections import datetime import warnings +from bson import SON from bson.binary import (STANDARD, PYTHON_LEGACY, JAVA_LEGACY, CSHARP_LEGACY) from bson.codec_options import CodecOptions @@ -32,6 +33,13 @@ from pymongo.read_preferences import _MONGOS_MODES, _ServerMode from pymongo.ssl_support import validate_cert_reqs from pymongo.write_concern import WriteConcern +try: + from collections import OrderedDict + ORDERED_TYPES = (SON, OrderedDict) +except ImportError: + ORDERED_TYPES = (SON,) + + # Defaults until we connect to a server and get updated limits. MAX_BSON_SIZE = 16 * (1024 ** 2) MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 14a8eedd2..37376c5d9 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -904,6 +904,9 @@ class Cursor(object): listeners = client._event_listeners publish = listeners.enabled_for_commands from_command = False + start = datetime.datetime.now() + + def duration(): return datetime.datetime.now() - start if operation: kwargs = { @@ -924,7 +927,6 @@ class Cursor(object): cmd_name = operation.name reply = response.data - cmd_duration = response.duration rqst_id = response.request_id from_command = response.from_command except AutoReconnect: @@ -948,28 +950,23 @@ class Cursor(object): cmd['maxTimeMS'] = self.__max_time_ms listeners.publish_command_start( cmd, self.__collection.database.name, 0, self.__address) - start = datetime.datetime.now() try: reply = self.__exhaust_mgr.sock.receive_message(None) except Exception as exc: if publish: - duration = datetime.datetime.now() - start listeners.publish_command_failure( - duration, _convert_exception(exc), cmd_name, rqst_id, + duration(), _convert_exception(exc), cmd_name, rqst_id, self.__address) if isinstance(exc, ConnectionFailure): self.__die() raise - if publish: - cmd_duration = datetime.datetime.now() - start - if publish: - start = datetime.datetime.now() try: docs = self._unpack_response(response=reply, cursor_id=self.__id, codec_options=self.__codec_options) if from_command: + client._receive_cluster_time(docs[0]) helpers._check_command_response(docs[0]) except OperationFailure as exc: self.__killed = True @@ -978,9 +975,8 @@ class Cursor(object): self.__die() if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, exc.details, cmd_name, rqst_id, self.__address) + duration(), exc.details, cmd_name, rqst_id, self.__address) # If this is a tailable cursor the error is likely # due to capped collection roll over. Setting @@ -998,22 +994,19 @@ class Cursor(object): self.__die() if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, exc.details, cmd_name, rqst_id, self.__address) + duration(), exc.details, cmd_name, rqst_id, self.__address) client._reset_server_and_request_check(self.__address) raise except Exception as exc: if publish: - duration = (datetime.datetime.now() - start) + cmd_duration listeners.publish_command_failure( - duration, _convert_exception(exc), cmd_name, rqst_id, + duration(), _convert_exception(exc), cmd_name, rqst_id, self.__address) raise if publish: - duration = (datetime.datetime.now() - start) + cmd_duration # Must publish in find / getMore / explain command response format. if from_command: res = docs[0] @@ -1028,7 +1021,7 @@ class Cursor(object): else: res["cursor"]["nextBatch"] = docs listeners.publish_command_success( - duration, res, cmd_name, rqst_id, self.__address) + duration(), res, cmd_name, rqst_id, self.__address) if from_command and cmd_name != "explain": cursor = docs[0]['cursor'] @@ -1085,7 +1078,8 @@ class Cursor(object): self.__batch_size, self.__read_concern, self.__collation, - self.__session) + self.__session, + self.__collection.database.client) self.__send_message(q) if not self.__id: self.__killed = True @@ -1107,6 +1101,7 @@ class Cursor(object): self.__id, self.__codec_options, self.__session, + self.__collection.database.client, self.__max_await_time_ms) self.__send_message(g) diff --git a/pymongo/database.py b/pymongo/database.py index 9ced4df2b..edd0a260a 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -434,19 +434,21 @@ class Database(common.BaseObject): check, allowable_errors, parse_write_concern_error=parse_write_concern_error, - session=session) + session=session, + client=self.__client) with self.__client._tmp_session(session) as s: return sock_info.command( - self.__name, - command, - slave_ok, - read_preference, - codec_options, - check, - allowable_errors, - parse_write_concern_error=parse_write_concern_error, - session=s) + self.__name, + command, + slave_ok, + read_preference, + codec_options, + check, + allowable_errors, + parse_write_concern_error=parse_write_concern_error, + session=s, + client=self.__client) def command(self, command, value=1, check=True, allowable_errors=None, read_preference=ReadPreference.PRIMARY, @@ -719,7 +721,8 @@ class Database(common.BaseObject): with self.__client._socket_for_writes() as sock_info: if sock_info.max_wire_version >= 4: with self.__client._tmp_session(session) as s: - return sock_info.command("admin", cmd, session=s) + return sock_info.command("admin", cmd, session=s, + client=self.__client) else: spec = {"$all": True} if include_all else {} x = _first_batch(sock_info, "admin", "$cmd.sys.inprog", diff --git a/pymongo/ismaster.py b/pymongo/ismaster.py index 1c76c4bf8..c97a10585 100644 --- a/pymongo/ismaster.py +++ b/pymongo/ismaster.py @@ -127,6 +127,10 @@ class IsMaster(object): def election_id(self): return self._doc.get('electionId') + @property + def cluster_time(self): + return self._doc.get('$clusterTime') + @property def logical_session_timeout_minutes(self): return self._doc.get('logicalSessionTimeoutMinutes') diff --git a/pymongo/message.py b/pymongo/message.py index 09e823c09..2fddb6660 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -172,10 +172,10 @@ _MODIFIERS = SON([ def _gen_explain_command( coll, spec, projection, skip, limit, batch_size, - options, read_concern, session): + options, read_concern, session, client): """Generate an explain command document.""" cmd = _gen_find_command(coll, spec, projection, skip, limit, batch_size, - options, session=None) + options, session=None, client=None) if read_concern.level: explain = SON([('explain', cmd), ('readConcern', read_concern.document)]) else: @@ -184,11 +184,12 @@ def _gen_explain_command( if session: explain['lsid'] = session._use_lsid() + client._send_cluster_time(explain) return explain def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options, - session, read_concern=DEFAULT_READ_CONCERN, + session, client, read_concern=DEFAULT_READ_CONCERN, collation=None): """Generate a find command document.""" cmd = SON([('find', coll)]) @@ -222,11 +223,13 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options, if options & val]) if session: cmd['lsid'] = session._use_lsid() + if client: + client._send_cluster_time(cmd) return cmd def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms, - session): + session, client): """Generate a getMore command document.""" cmd = SON([('getMore', cursor_id), ('collection', coll)]) @@ -236,6 +239,7 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms, cmd['maxTimeMS'] = max_await_time_ms if session: cmd['lsid'] = session._use_lsid() + client._send_cluster_time(cmd) return cmd @@ -245,11 +249,11 @@ class _Query(object): __slots__ = ('flags', 'db', 'coll', 'ntoskip', 'spec', 'fields', 'codec_options', 'read_preference', 'limit', 'batch_size', 'name', 'read_concern', 'collation', - 'session') + 'session', 'client') def __init__(self, flags, db, coll, ntoskip, spec, fields, codec_options, read_preference, limit, - batch_size, read_concern, collation, session): + batch_size, read_concern, collation, session, client): self.flags = flags self.db = db self.coll = coll @@ -263,6 +267,7 @@ class _Query(object): self.batch_size = batch_size self.collation = collation self.session = session + self.client = client self.name = 'find' def use_command(self, sock_info, exhaust): @@ -296,11 +301,11 @@ class _Query(object): return _gen_explain_command( self.coll, self.spec, self.fields, self.ntoskip, self.limit, self.batch_size, self.flags, - self.read_concern, self.session), self.db + self.read_concern, self.session, self.client), self.db return _gen_find_command(self.coll, self.spec, self.fields, self.ntoskip, self.limit, self.batch_size, - self.flags, self.session, self.read_concern, - self.collation), self.db + self.flags, self.session, self.client, + self.read_concern, self.collation), self.db def get_message(self, set_slave_ok, is_mongos, use_cmd=False): """Get a query message, possibly setting the slaveOk bit.""" @@ -340,19 +345,20 @@ class _GetMore(object): """A getmore operation.""" __slots__ = ('db', 'coll', 'ntoreturn', 'cursor_id', 'max_await_time_ms', - 'codec_options', 'session') + 'codec_options', 'session', 'client') name = 'getMore' def __init__(self, db, coll, ntoreturn, cursor_id, codec_options, session, - max_await_time_ms=None): + client, max_await_time_ms=None): self.db = db self.coll = coll self.ntoreturn = ntoreturn self.cursor_id = cursor_id self.codec_options = codec_options - self.max_await_time_ms = max_await_time_ms self.session = session + self.client = client + self.max_await_time_ms = max_await_time_ms def use_command(self, sock_info, exhaust): sock_info.check_session_auth_matches(self.session) @@ -363,7 +369,8 @@ class _GetMore(object): return _gen_get_more_command(self.cursor_id, self.coll, self.ntoreturn, self.max_await_time_ms, - self.session), self.db + self.session, + self.client), self.db def get_message(self, dummy0, dummy1, use_cmd=False): """Get a getmore message.""" @@ -507,7 +514,19 @@ def query(options, collection_name, num_to_skip, data += bson._make_c_string(collection_name) data += struct.pack("= 4 and namespace is not None: - sock_info.command(db, spec, session=session) + sock_info.command(db, spec, session=session, client=self) else: if publish: start = datetime.datetime.now() @@ -1271,6 +1271,14 @@ class MongoClient(common.BaseObject): else: yield None + def _send_cluster_time(self, command): + cluster_time = self._topology.max_cluster_time() + if cluster_time: + command['$clusterTime'] = cluster_time + + def _receive_cluster_time(self, reply): + self._topology.receive_cluster_time(reply.get('$clusterTime')) + def server_info(self, session=None): """Get information about the MongoDB server we're connected to. @@ -1476,7 +1484,8 @@ class MongoClient(common.BaseObject): if sock_info.max_wire_version >= 4: try: with self._tmp_session(session) as s: - sock_info.command("admin", cmd, session=s) + sock_info.command( + "admin", cmd, session=s, client=self) except OperationFailure as exc: # Ignore "DB not locked" to replicate old behavior if exc.code != 125: diff --git a/pymongo/network.py b/pymongo/network.py index 2538dac67..14b2dc89a 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -36,7 +36,7 @@ except ImportError: from bson import SON from pymongo import helpers, message -from pymongo.common import MAX_MESSAGE_SIZE +from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES from pymongo.errors import (AutoReconnect, NotMasterError, OperationFailure, @@ -49,7 +49,7 @@ _UNPACK_HEADER = struct.Struct("" uses bson.timestamp.Timestamp's comparison operator. + if (not self._max_cluster_time + or cluster_time['clusterTime'] > + self._max_cluster_time['clusterTime']): + self._max_cluster_time = cluster_time + + def receive_cluster_time(self, cluster_time): + with self._lock: + self._receive_cluster_time_no_lock(cluster_time) + def request_check_all(self, wait_time=5): """Wake all monitors, wait for at least one to check its server.""" with self._lock: diff --git a/test/__init__.py b/test/__init__.py index b53a08119..2b53b1bf1 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -497,7 +497,29 @@ class ClientContext(object): client_context = ClientContext() -class IntegrationTest(unittest.TestCase): +def sanitize_cmd(cmd): + cp = cmd.copy() + cp.pop('$clusterTime', None) + cp.pop('lsid', None) + return cp + + +def sanitize_reply(reply): + cp = reply.copy() + cp.pop('$clusterTime', None) + cp.pop('operationTime', None) + return cp + + +class PyMongoTestCase(unittest.TestCase): + def assertEqualCommand(self, expected, actual, msg=None): + self.assertEqual(expected, sanitize_cmd(actual), msg) + + def assertEqualReply(self, expected, actual, msg=None): + self.assertEqual(expected, sanitize_reply(actual), msg) + + +class IntegrationTest(PyMongoTestCase): """Base class for TestCases that need a connection to MongoDB to pass.""" @classmethod diff --git a/test/test_client.py b/test/test_client.py index 1bb7316db..9171ff4bc 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1055,7 +1055,7 @@ class TestClient(IntegrationTest): client._send_message_with_response( operation=message._GetMore('pymongo_test', 'collection', 101, 1234, client.codec_options, - None), + None, client), address=('not-a-member', 27017)) def test_heartbeat_frequency_ms(self): diff --git a/test/test_collection.py b/test/test_collection.py index bc36c6b42..a7fd360d8 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -2233,7 +2233,7 @@ class TestCollection(IntegrationTest): def test_find_command_generation(self): cmd = _gen_find_command('coll', {'$query': {'foo': 1}, '$dumb': 2}, - None, 0, 0, 0, None, None) + None, 0, 0, 0, None, None, None) self.assertEqual( cmd.to_dict(), SON([('find', 'coll'), diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index d8021d937..5b27e6a7f 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -20,7 +20,7 @@ import threading sys.path[0:0] = [""] -from bson import json_util +from bson import json_util, Timestamp from pymongo import common from pymongo.errors import ConfigurationError from pymongo.topology import Topology @@ -209,5 +209,33 @@ def create_tests(): create_tests() + +class TestClusterTimeComparison(unittest.TestCase): + def test_cluster_time_comparison(self): + t = create_mock_topology('mongodb://host') + + def send_cluster_time(time, inc, should_update): + old = t.max_cluster_time() + new = {'clusterTime': Timestamp(time, inc)} + got_ismaster(t, + ('host', 27017), + {'ok': 1, + 'minWireVersion': 0, + 'maxWireVersion': 6, + '$clusterTime': new}) + + actual = t.max_cluster_time() + if should_update: + self.assertEqual(actual, new) + else: + self.assertEqual(actual, old) + + send_cluster_time(0, 1, True) + send_cluster_time(2, 2, True) + send_cluster_time(2, 1, False) + send_cluster_time(1, 3, False) + send_cluster_time(2, 3, True) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 65820b233..afcfb8b87 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -28,14 +28,18 @@ from pymongo.command_cursor import CommandCursor from pymongo.errors import NotMasterError, OperationFailure from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern -from test import unittest, client_context, client_knobs +from test import (client_context, + client_knobs, + PyMongoTestCase, + sanitize_cmd, + unittest) from test.utils import (EventListener, rs_or_single_client, single_client, wait_until) -class TestCommandMonitoring(unittest.TestCase): +class TestCommandMonitoring(PyMongoTestCase): @classmethod @client_context.require_connection @@ -63,7 +67,7 @@ class TestCommandMonitoring(unittest.TestCase): isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual(SON([('ismaster', 1)]), started.command) + self.assertEqualCommand(SON([('ismaster', 1)]), started.command) self.assertEqual('ismaster', started.command_name) self.assertEqual(self.client.address, started.connection_id) self.assertEqual('pymongo_test', started.database_name) @@ -114,7 +118,7 @@ class TestCommandMonitoring(unittest.TestCase): isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('find', 'test'), ('filter', {}), ('limit', 1), @@ -141,7 +145,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('find', 'test'), ('filter', {}), ('projection', {'_id': False}), @@ -173,7 +177,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('getMore', cursor_id), ('collection', 'test'), ('batchSize', 4)]), @@ -214,7 +218,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual(cmd, started.command) + self.assertEqualCommand(cmd, started.command) self.assertEqual('explain', started.command_name) self.assertEqual(self.client.address, started.connection_id) self.assertEqual('pymongo_test', started.database_name) @@ -249,7 +253,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual(expected_cmd, started.command) + self.assertEqualCommand(expected_cmd, started.command) self.assertEqual('find', started.command_name) self.assertEqual(self.client.address, started.connection_id) self.assertEqual('pymongo_test', started.database_name) @@ -332,7 +336,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('aggregate', 'test'), ('pipeline', [{'$project': {'_id': False, 'x': 1}}]), ('cursor', {'batchSize': 4})]), @@ -350,7 +354,7 @@ class TestCommandMonitoring(unittest.TestCase): expected_cursor = {'id': cursor_id, 'ns': 'pymongo_test.test', 'firstBatch': [{'x': 1} for _ in range(4)]} - self.assertEqual(expected_cursor, succeeded.reply.get('cursor')) + self.assertEqualCommand(expected_cursor, succeeded.reply.get('cursor')) self.listener.results.clear() next(cursor) @@ -361,7 +365,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('getMore', cursor_id), ('collection', 'test'), ('batchSize', 4)]), @@ -380,8 +384,8 @@ class TestCommandMonitoring(unittest.TestCase): 'cursor': {'id': cursor_id, 'ns': 'pymongo_test.test', 'nextBatch': [{'x': 1} for _ in range(4)]}, - 'ok': 1} - self.assertEqual(expected_result, succeeded.reply) + 'ok': 1.0} + self.assertEqualReply(expected_result, succeeded.reply) finally: # Exhaust the cursor to avoid kill cursors. tuple(cursor) @@ -401,7 +405,7 @@ class TestCommandMonitoring(unittest.TestCase): failed = results['failed'][0] self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('getMore', 12345), ('collection', 'test')]), started.command) @@ -462,7 +466,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('find', 'test'), ('filter', {}), ('projection', {'_id': False}), @@ -483,7 +487,7 @@ class TestCommandMonitoring(unittest.TestCase): 'ns': 'pymongo_test.test', 'firstBatch': [{} for _ in range(5)]}, 'ok': 1} - self.assertEqual(expected_result, succeeded.reply) + self.assertEqualReply(expected_result, succeeded.reply) self.listener.results.clear() tuple(cursor) @@ -493,7 +497,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual( + self.assertEqualCommand( SON([('getMore', cursor_id), ('collection', 'test'), ('batchSize', 5)]), @@ -513,7 +517,7 @@ class TestCommandMonitoring(unittest.TestCase): 'ns': 'pymongo_test.test', 'nextBatch': [{} for _ in range(5)]}, 'ok': 1} - self.assertEqual(expected_result, succeeded.reply) + self.assertEqualReply(expected_result, succeeded.reply) def test_kill_cursors(self): with client_knobs(kill_cursor_frequency=0.01): @@ -566,7 +570,7 @@ class TestCommandMonitoring(unittest.TestCase): expected = SON([('insert', coll.name), ('ordered', True), ('documents', [{'_id': res.inserted_id, 'x': 1}])]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -593,7 +597,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('documents', [{'_id': res.inserted_id, 'x': 1}]), ('writeConcern', {'w': 0})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -603,7 +607,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(started.command_name, succeeded.command_name) self.assertEqual(started.request_id, succeeded.request_id) self.assertEqual(started.connection_id, succeeded.connection_id) - self.assertEqual(succeeded.reply, {'ok': 1}) + self.assertEqualReply(succeeded.reply, {'ok': 1}) # Explicit write concern insert_one self.listener.results.clear() @@ -618,7 +622,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('documents', [{'_id': res.inserted_id, 'x': 1}]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -645,7 +649,7 @@ class TestCommandMonitoring(unittest.TestCase): ('deletes', [SON([('q', {'x': 1}), ('limit', 0)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('delete', started.command_name) self.assertIsInstance(started.request_id, int) @@ -675,7 +679,7 @@ class TestCommandMonitoring(unittest.TestCase): ('multi', False), ('upsert', True)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -705,7 +709,7 @@ class TestCommandMonitoring(unittest.TestCase): ('multi', False), ('upsert', False)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -734,7 +738,7 @@ class TestCommandMonitoring(unittest.TestCase): ('multi', True), ('upsert', False)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -761,7 +765,7 @@ class TestCommandMonitoring(unittest.TestCase): ('deletes', [SON([('q', {'x': 3}), ('limit', 1)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('delete', started.command_name) self.assertIsInstance(started.request_id, int) @@ -793,7 +797,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('documents', [{'_id': 1}]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -831,7 +835,7 @@ class TestCommandMonitoring(unittest.TestCase): expected = SON([('insert', coll.name), ('ordered', True), ('documents', [{'_id': _id, 'x': 1}])]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -857,7 +861,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('documents', [{'_id': _id, 'x': 1}]), ('writeConcern', {'w': 0})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -881,7 +885,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('documents', [{'_id': _id, 'x': 1}]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('insert', started.command_name) self.assertIsInstance(started.request_id, int) @@ -908,7 +912,7 @@ class TestCommandMonitoring(unittest.TestCase): ('deletes', [SON([('q', {'x': 1}), ('limit', 0)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('delete', started.command_name) self.assertIsInstance(started.request_id, int) @@ -938,7 +942,7 @@ class TestCommandMonitoring(unittest.TestCase): ('multi', False), ('upsert', True)])]), ('writeConcern', {'w': 1})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -967,7 +971,7 @@ class TestCommandMonitoring(unittest.TestCase): ('u', {'$inc': {'x': 1}}), ('multi', False), ('upsert', False)])])]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -995,7 +999,7 @@ class TestCommandMonitoring(unittest.TestCase): ('u', {'$inc': {'x': 1}}), ('multi', True), ('upsert', False)])])]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('update', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1021,7 +1025,7 @@ class TestCommandMonitoring(unittest.TestCase): ('ordered', True), ('deletes', [SON([('q', {'x': 3}), ('limit', 1)])])]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('delete', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1056,7 +1060,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertIsInstance(operation_id, int) for start, succeed in zip(started, succeeded): self.assertIsInstance(start, monitoring.CommandStartedEvent) - cmd = start.command + cmd = sanitize_cmd(start.command) self.assertEqual(['insert', 'ordered', 'documents'], list(cmd.keys())) self.assertEqual(coll.name, cmd['insert']) @@ -1102,7 +1106,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertIsInstance(operation_id, int) for start, succeed in zip(started, succeeded): self.assertIsInstance(start, monitoring.CommandStartedEvent) - cmd = start.command + cmd = sanitize_cmd(start.command) self.assertEqual(['insert', 'ordered', 'documents'], list(cmd.keys())) self.assertEqual(coll.name, cmd['insert']) @@ -1156,19 +1160,19 @@ class TestCommandMonitoring(unittest.TestCase): expected = SON([('insert', coll.name), ('ordered', True), ('documents', [{'_id': 1}])]) - self.assertEqual(expected, started[0].command) + self.assertEqualCommand(expected, started[0].command) expected = SON([('update', coll.name), ('ordered', True), ('updates', [SON([('q', {'_id': 1}), ('u', {'$set': {'x': 1}}), ('multi', False), ('upsert', False)])])]) - self.assertEqual(expected, started[1].command) + self.assertEqualCommand(expected, started[1].command) expected = SON([('delete', coll.name), ('ordered', True), ('deletes', [SON([('q', {'_id': 1}), ('limit', 1)])])]) - self.assertEqual(expected, started[2].command) + self.assertEqualCommand(expected, started[2].command) def test_write_errors(self): coll = self.client.pymongo_test.test @@ -1221,7 +1225,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON([('listCollections', 1), ('cursor', {})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('listCollections', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1239,7 +1243,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON([('listIndexes', 'test'), ('cursor', {})]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('pymongo_test', started.database_name) self.assertEqual('listIndexes', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1257,7 +1261,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON([('currentOp', 1), ('$all', True)]) - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('admin', started.database_name) self.assertEqual('currentOp', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1280,7 +1284,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual(0, len(results['failed'])) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = {'fsyncUnlock': 1} - self.assertEqual(expected, started.command) + self.assertEqualCommand(expected, started.command) self.assertEqual('admin', started.database_name) self.assertEqual('fsyncUnlock', started.command_name) self.assertIsInstance(started.request_id, int) @@ -1320,7 +1324,7 @@ class TestCommandMonitoring(unittest.TestCase): self.assertEqual({}, succeeded.reply) -class TestGlobalListener(unittest.TestCase): +class TestGlobalListener(PyMongoTestCase): @classmethod @client_context.require_connection @@ -1350,7 +1354,7 @@ class TestGlobalListener(unittest.TestCase): isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue( isinstance(started, monitoring.CommandStartedEvent)) - self.assertEqual(SON([('ismaster', 1)]), started.command) + self.assertEqualCommand(SON([('ismaster', 1)]), started.command) self.assertEqual('ismaster', started.command_name) self.assertEqual(self.client.address, started.connection_id) self.assertEqual('pymongo_test', started.database_name) diff --git a/test/test_read_concern.py b/test/test_read_concern.py index 1eaa34c4e..f501cf2ed 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -14,18 +14,16 @@ """Test the read_concern module.""" -import pymongo - from bson.son import SON from pymongo import monitoring from pymongo.errors import ConfigurationError, OperationFailure from pymongo.read_concern import ReadConcern -from test import client_context, unittest +from test import client_context, PyMongoTestCase from test.utils import single_client, rs_or_single_client, EventListener -class TestReadConcern(unittest.TestCase): +class TestReadConcern(PyMongoTestCase): @classmethod @client_context.require_connection @@ -89,7 +87,7 @@ class TestReadConcern(unittest.TestCase): # Explicitly set readConcern to 'local'. coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) tuple(coll.find({'field': 'value'})) - self.assertEqual( + self.assertEqualCommand( SON([('find', 'coll'), ('filter', {'field': 'value'}), ('readConcern', {'level': 'local'})]), diff --git a/test/test_session.py b/test/test_session.py index 1d928e026..01f8f2762 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -25,15 +25,12 @@ from pymongo.errors import (ConfigurationError, InvalidOperation, OperationFailure) from pymongo.monotonic import time as _time -from test import IntegrationTest, client_context, db_user, db_pwd +from test import IntegrationTest, client_context, db_user, db_pwd, SkipTest from test.utils import ignore_deprecations, rs_or_single_client, EventListener # Ignore auth commands like saslStart, so we can assert lsid is in all commands. class SessionTestListener(EventListener): - def __init__(self): - super(SessionTestListener, self).__init__(ignore_lsid=False) - def started(self, event): if not event.command_name.startswith('sasl'): super(SessionTestListener, self).started(event) @@ -739,3 +736,104 @@ class TestSessionsNotSupported(IntegrationTest): with self.assertRaisesRegex( ConfigurationError, "Sessions are not supported"): self.client.start_session() + + +class TestClusterTime(IntegrationTest): + def setUp(self): + super(TestClusterTime, self).setUp() + if '$clusterTime' not in client_context.ismaster: + raise SkipTest('$clusterTime not supported') + + @ignore_deprecations + def test_cluster_time(self): + listener = SessionTestListener() + # Prevent heartbeats from updating $clusterTime between operations. + client = rs_or_single_client(event_listeners=[listener], + heartbeatFrequencyMS=999999) + collection = client.pymongo_test.collection + # Prepare for tests of find() and aggregate(). + collection.insert_many([{} for _ in range(10)]) + self.addCleanup(collection.drop) + self.addCleanup(client.pymongo_test.collection2.drop) + + def bulk_insert(ordered): + if ordered: + bulk = collection.initialize_ordered_bulk_op() + else: + bulk = collection.initialize_unordered_bulk_op() + bulk.insert({}) + bulk.execute() + + def rename_and_drop(): + # Ensure collection exists. + collection.insert_one({}) + collection.rename('collection2') + client.pymongo_test.collection2.drop() + + def insert_and_find(): + cursor = collection.find().batch_size(1) + for _ in range(10): + # Advance the cluster time. + collection.insert_one({}) + next(cursor) + + cursor.close() + + def insert_and_aggregate(): + cursor = collection.aggregate([], batchSize=1).batch_size(1) + for _ in range(5): + # Advance the cluster time. + collection.insert_one({}) + next(cursor) + + cursor.close() + + ops = [ + # Tests from Driver Sessions Spec. + ('ping', lambda: client.admin.command('ping')), + ('aggregate', lambda: list(collection.aggregate([]))), + ('find', lambda: list(collection.find())), + ('insert_one', lambda: collection.insert_one({})), + + # Additional PyMongo tests. + ('insert_and_find', insert_and_find), + ('insert_and_aggregate', insert_and_aggregate), + ('update_one', + lambda: collection.update_one({}, {'$set': {'x': 1}})), + ('update_many', + lambda: collection.update_many({}, {'$set': {'x': 1}})), + ('delete_one', lambda: collection.delete_one({})), + ('delete_many', lambda: collection.delete_many({})), + ('bulk_write', lambda: collection.bulk_write([InsertOne({})])), + ('ordered bulk', lambda: bulk_insert(True)), + ('unordered bulk', lambda: bulk_insert(False)), + ('rename_and_drop', rename_and_drop), + ] + + for name, f in ops: + listener.results.clear() + # Call f() twice, insert to advance clusterTime, call f() again. + f() + f() + collection.insert_one({}) + f() + + self.assertGreaterEqual(len(listener.results['started']), 1) + for i, event in enumerate(listener.results['started']): + self.assertTrue( + '$clusterTime' in event.command, + "%s sent no $clusterTime with %s" % ( + f.__name__, event.command_name)) + + if i > 0: + succeeded = listener.results['succeeded'][i - 1] + self.assertTrue( + '$clusterTime' in succeeded.reply, + "%s received no $clusterTime with %s" % ( + f.__name__, succeeded.command_name)) + + self.assertTrue( + event.command['$clusterTime']['clusterTime'] >= + succeeded.reply['$clusterTime']['clusterTime'], + "%s sent wrong $clusterTime with %s" % ( + f.__name__, event.command_name)) diff --git a/test/utils.py b/test/utils.py index d4ac7bcb0..7f072747c 100644 --- a/test/utils.py +++ b/test/utils.py @@ -29,14 +29,12 @@ from functools import partial from pymongo import MongoClient, monitoring from pymongo.errors import AutoReconnect, OperationFailure -from pymongo.monitoring import CommandStartedEvent from pymongo.server_selectors import (any_server_selector, writable_server_selector) from pymongo.write_concern import WriteConcern from test import (client_context, db_user, db_pwd) -from test.version import Version IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=1000) @@ -63,17 +61,10 @@ class WhiteListEventListener(monitoring.CommandListener): class EventListener(monitoring.CommandListener): - def __init__(self, ignore_lsid=True): - self.ignore_lsid = ignore_lsid + def __init__(self): self.results = defaultdict(list) def started(self, event): - if self.ignore_lsid and 'lsid' in event.command: - cmd = event.command.copy() - cmd.pop('lsid', None) - event = CommandStartedEvent(cmd, event.database_name, - event.request_id, event.connection_id, - event.operation_id) self.results['started'].append(event) def succeeded(self, event):