diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 804a8d0c2..bef1ddf85 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -269,7 +269,7 @@ class _Bulk(object): while run.idx_offset < len(run.ops): if session and retryable: cmd['txnNumber'] = session._transaction_id() - client._send_cluster_time(cmd, session) + sock_info.send_cluster_time(cmd, session, client) check_keys = run.op_type == _INSERT ops = islice(run.ops, run.idx_offset, None) # Run as many ops as possible. diff --git a/pymongo/collection.py b/pymongo/collection.py index 6271cd59c..29aa3e2ea 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -1945,15 +1945,15 @@ class Collection(common.BaseObject): if exc.code != 26: raise cursor = {'id': 0, 'firstBatch': []} - return CommandCursor(coll, cursor, sock_info.address, session=s, + return CommandCursor(coll, cursor, sock_info.address, + session=s, explicit_session=session is not None) else: - namespace = _UJOIN % (self.__database.name, "system.indexes") res = message._first_batch( sock_info, self.__database.name, "system.indexes", {"ns": self.__full_name}, 0, slave_ok, codec_options, ReadPreference.PRIMARY, cmd, - self.database.client._event_listeners, session=None) + self.database.client._event_listeners) cursor = res["cursor"] # Note that a collection can only have 64 indexes, so there # will never be a getMore call. diff --git a/pymongo/database.py b/pymongo/database.py index 635992155..8949ad8a6 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -739,8 +739,7 @@ class Database(common.BaseObject): return _first_batch(sock_info, "admin", "$cmd.sys.inprog", spec, -1, True, self.codec_options, ReadPreference.PRIMARY, cmd, - self.client._event_listeners, - session=None) + self.client._event_listeners) def profiling_level(self, session=None): """Get the database's current profiling level. diff --git a/pymongo/message.py b/pymongo/message.py index 8773b8da5..bfbc60183 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -170,31 +170,8 @@ _MODIFIERS = SON([ ('$snapshot', 'snapshot')]) -def _gen_explain_command( - coll, spec, projection, skip, limit, batch_size, - options, read_concern, session, client): - """Generate an explain command document.""" - cmd = _gen_find_command(coll, spec, projection, skip, limit, batch_size, - options, session=None, client=None) - if read_concern.level: - explain = SON([('explain', cmd), ('readConcern', read_concern.document)]) - else: - explain = SON([('explain', cmd)]) - - if session: - explain['lsid'] = session._use_lsid() - if (session.options.causal_consistency - and session.operation_time is not None): - explain.setdefault( - 'readConcern', {})['afterClusterTime'] = session.operation_time - - client._send_cluster_time(explain, session) - return explain - - def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options, - session, client, read_concern=DEFAULT_READ_CONCERN, - collation=None): + read_concern, collation=None): """Generate a find command document.""" cmd = SON([('find', coll)]) if '$query' in spec: @@ -225,19 +202,10 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options, cmd.update([(opt, True) for opt, val in _OPTIONS.items() if options & val]) - if session: - cmd['lsid'] = session._use_lsid() - if (session.options.causal_consistency - and session.operation_time is not None): - cmd.setdefault( - 'readConcern', {})['afterClusterTime'] = session.operation_time - if client: - client._send_cluster_time(cmd, session) return cmd -def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms, - session, client): +def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms): """Generate a getMore command document.""" cmd = SON([('getMore', cursor_id), ('collection', coll)]) @@ -245,9 +213,6 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms, cmd['batchSize'] = batch_size if max_await_time_ms is not None: cmd['maxTimeMS'] = max_await_time_ms - if session: - cmd['lsid'] = session._use_lsid() - client._send_cluster_time(cmd, session) return cmd @@ -299,23 +264,32 @@ class _Query(object): return use_find_cmd - def as_command(self): + def as_command(self, sock_info): """Return a find command document for this query. Should be called *after* get_message. """ - if '$explain' in self.spec: + explain = '$explain' in self.spec + cmd = _gen_find_command( + self.coll, self.spec, self.fields, self.ntoskip, + self.limit, self.batch_size, self.flags, self.read_concern, + self.collation) + if explain: self.name = 'explain' - return _gen_explain_command( - self.coll, self.spec, self.fields, self.ntoskip, - self.limit, self.batch_size, self.flags, - self.read_concern, self.session, self.client), self.db - return _gen_find_command(self.coll, self.spec, self.fields, - self.ntoskip, self.limit, self.batch_size, - self.flags, self.session, self.client, - self.read_concern, self.collation), self.db + cmd = SON([('explain', cmd)]) + session = self.session + if session: + cmd['lsid'] = session._use_lsid() + # Explain does not support readConcern. + if (not explain and session.options.causal_consistency + and session.operation_time is not None): + cmd.setdefault( + 'readConcern', {})[ + 'afterClusterTime'] = session.operation_time + sock_info.send_cluster_time(cmd, session, self.client) + return cmd, self.db - def get_message(self, set_slave_ok, is_mongos, use_cmd=False): + def get_message(self, set_slave_ok, sock_info, use_cmd=False): """Get a query message, possibly setting the slaveOk bit.""" if set_slave_ok: # Set the slaveOk bit. @@ -328,7 +302,7 @@ class _Query(object): if use_cmd: ns = _UJOIN % (self.db, "$cmd") - spec = self.as_command()[0] + spec = self.as_command(sock_info)[0] ntoreturn = -1 # All DB commands return 1 document else: # OP_QUERY treats ntoreturn of -1 and 1 the same, return @@ -341,7 +315,7 @@ class _Query(object): else: ntoreturn = self.limit - if is_mongos: + if sock_info.is_mongos: spec = _maybe_add_read_preference(spec, self.read_preference) @@ -372,22 +346,25 @@ class _GetMore(object): sock_info.validate_session(self.client, self.session) return sock_info.max_wire_version >= 4 and not exhaust - def as_command(self): + def as_command(self, sock_info): """Return a getMore command document for this query.""" - return _gen_get_more_command(self.cursor_id, self.coll, - self.ntoreturn, - self.max_await_time_ms, - self.session, - self.client), self.db + cmd = _gen_get_more_command(self.cursor_id, self.coll, + self.ntoreturn, + self.max_await_time_ms) - def get_message(self, dummy0, dummy1, use_cmd=False): + if self.session: + cmd['lsid'] = self.session._use_lsid() + sock_info.send_cluster_time(cmd, self.session, self.client) + return cmd, self.db + + def get_message(self, dummy0, sock_info, use_cmd=False): """Get a getmore message.""" ns = _UJOIN % (self.db, self.coll) if use_cmd: ns = _UJOIN % (self.db, "$cmd") - spec = self.as_command()[0] + spec = self.as_command(sock_info)[0] return query(0, ns, 0, -1, spec, None, self.codec_options) @@ -401,20 +378,20 @@ class _RawBatchQuery(_Query): return False - def get_message(self, set_slave_ok, is_mongos, use_cmd=False): + def get_message(self, set_slave_ok, sock_info, use_cmd=False): # Always pass False for use_cmd. - return super(_RawBatchQuery, self).get_message(set_slave_ok, is_mongos, - False) + return super(_RawBatchQuery, self).get_message( + set_slave_ok, sock_info, False) class _RawBatchGetMore(_GetMore): def use_command(self, socket_info, exhaust): return False - def get_message(self, set_slave_ok, is_mongos, use_cmd=False): + def get_message(self, set_slave_ok, sock_info, use_cmd=False): # Always pass False for use_cmd. - return super(_RawBatchGetMore, self).get_message(set_slave_ok, is_mongos, - False) + return super(_RawBatchGetMore, self).get_message( + set_slave_ok, sock_info, False) class _CursorAddress(tuple): @@ -982,12 +959,11 @@ class _OpReply(object): def _first_batch(sock_info, db, coll, query, ntoreturn, - slave_ok, codec_options, read_preference, cmd, listeners, - session): + slave_ok, codec_options, read_preference, cmd, listeners): """Simple query helper for retrieving a first (and possibly only) batch.""" query = _Query( 0, db, coll, 0, query, None, codec_options, - read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, session, + read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, None, None) name = next(iter(cmd)) @@ -995,8 +971,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn, if publish: start = datetime.datetime.now() - request_id, msg, max_doc_size = query.get_message(slave_ok, - sock_info.is_mongos) + request_id, msg, max_doc_size = query.get_message(slave_ok, sock_info) if publish: encoding_duration = datetime.datetime.now() - start diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index e21129e7e..604456608 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1679,8 +1679,7 @@ class MongoClient(common.BaseObject): message._first_batch(sock_info, "admin", "$cmd.sys.unlock", {}, -1, True, self.codec_options, ReadPreference.PRIMARY, cmd, - self._event_listeners, - session=None) + self._event_listeners) def __enter__(self): return self diff --git a/pymongo/monitor.py b/pymongo/monitor.py index a0e689455..1881ddc75 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -120,10 +120,8 @@ class Monitor(object): start = _time() try: - cluster_time = self._topology.max_cluster_time() # If the server type is unknown, send metadata with first check. - return self._check_once(metadata=metadata, - cluster_time=cluster_time) + return self._check_once(metadata=metadata) except ReferenceError: raise except Exception as error: @@ -142,9 +140,7 @@ class Monitor(object): # Always send metadata: this is a new connection. start = _time() try: - cluster_time = self._topology.max_cluster_time() - return self._check_once(metadata=self._pool.opts.metadata, - cluster_time=cluster_time) + return self._check_once(metadata=self._pool.opts.metadata) except ReferenceError: raise except Exception as error: @@ -155,7 +151,7 @@ class Monitor(object): self._avg_round_trip_time.reset() return default - def _check_once(self, metadata=None, cluster_time=None): + def _check_once(self, metadata=None): """A single attempt to call ismaster. Returns a ServerDescription, or raises an exception. @@ -165,7 +161,7 @@ class Monitor(object): self._listeners.publish_server_heartbeat_started(address) with self._pool.get_socket({}) as sock_info: response, round_trip_time = self._check_with_socket( - sock_info, metadata=metadata, cluster_time=cluster_time) + sock_info, metadata=metadata) self._avg_round_trip_time.add_sample(round_trip_time) sd = ServerDescription( address=address, @@ -177,7 +173,7 @@ class Monitor(object): return sd - def _check_with_socket(self, sock_info, metadata=None, cluster_time=None): + def _check_with_socket(self, sock_info, metadata=None): """Return (IsMaster, round_trip_time). Can raise ConnectionFailure or OperationFailure. @@ -185,8 +181,10 @@ class Monitor(object): cmd = SON([('ismaster', 1)]) if metadata is not None: cmd['client'] = metadata - if cluster_time is not None: - cmd['$clusterTime'] = cluster_time + if self._server_description.max_wire_version >= 6: + cluster_time = self._topology.max_cluster_time() + if cluster_time is not None: + cmd['$clusterTime'] = cluster_time start = _time() request_id, msg, max_doc_size = message.query( 0, 'admin.$cmd', 0, -1, cmd, diff --git a/pymongo/network.py b/pymongo/network.py index 8d76412dc..ce76ea8c4 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -34,9 +34,8 @@ try: except ImportError: _SELECT_ERROR = OSError -from bson import SON from pymongo import helpers, message -from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES +from pymongo.common import MAX_MESSAGE_SIZE from pymongo.errors import (AutoReconnect, NotMasterError, OperationFailure, @@ -53,7 +52,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos, check_keys=False, listeners=None, max_bson_size=None, read_concern=None, parse_write_concern_error=False, - collation=None, retryable_write=False): + collation=None): """Execute a command over the socket, or raise socket.error. :Parameters: @@ -76,20 +75,10 @@ def command(sock, dbname, spec, slave_ok, is_mongos, - `parse_write_concern_error`: Whether to parse the ``writeConcernError`` field in the command response. - `collation`: The collation for this command. - - `retryable_write`: True if this command is a retryable write. """ name = next(iter(spec)) ns = dbname + '.$cmd' flags = 4 if slave_ok else 0 - if (client or session) and not isinstance(spec, ORDERED_TYPES): - # Ensure command name remains in first place. - spec = SON(spec) - if session: - spec['lsid'] = session._use_lsid() - if retryable_write: - spec['txnNumber'] = session._transaction_id() - if client: - client._send_cluster_time(spec, session) # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec diff --git a/pymongo/pool.py b/pymongo/pool.py index 289d72e5c..61ebf2609 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -33,7 +33,7 @@ from bson import DEFAULT_CODEC_OPTIONS from bson.py3compat import imap, itervalues, _unicode, integer_types from bson.son import SON from pymongo import auth, helpers, thread_util, __version__ -from pymongo.common import MAX_MESSAGE_SIZE +from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES from pymongo.errors import (AutoReconnect, ConnectionFailure, ConfigurationError, @@ -477,6 +477,15 @@ class SocketInfo(object): elif self.max_wire_version < 5 and collation is not None: raise ConfigurationError( 'Must be connected to MongoDB 3.4+ to use a collation.') + + if (client or session) and not isinstance(spec, ORDERED_TYPES): + # Ensure command name remains in first place. + spec = SON(spec) + if session: + spec['lsid'] = session._use_lsid() + if retryable_write: + spec['txnNumber'] = session._transaction_id() + self.send_cluster_time(spec, session, client) try: return command(self.sock, dbname, spec, slave_ok, self.is_mongos, read_preference, codec_options, @@ -484,8 +493,7 @@ class SocketInfo(object): self.address, check_keys, self.listeners, self.max_bson_size, read_concern, parse_write_concern_error=parse_write_concern_error, - collation=collation, - retryable_write=retryable_write) + collation=collation) except OperationFailure: raise # Catch socket.error, KeyboardInterrupt, etc. and close ourselves. @@ -615,6 +623,11 @@ class SocketInfo(object): except Exception: pass + def send_cluster_time(self, command, session, client): + """Add cluster time for MongoDB >= 3.6.""" + if self.max_wire_version >= 6 and client: + client._send_cluster_time(command, session) + def _raise_connection_failure(self, error): # Catch *all* exceptions from socket methods and close the socket. In # regular Python, socket operations only raise socket.error, even if diff --git a/pymongo/server.py b/pymongo/server.py index 58277a319..5f982c8c7 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -91,12 +91,12 @@ class Server(object): use_find_cmd = operation.use_command(sock_info, exhaust) message = operation.get_message( - set_slave_okay, sock_info.is_mongos, use_find_cmd) + set_slave_okay, sock_info, use_find_cmd) request_id, data, max_doc_size = self._split_message(message) if publish: encoding_duration = datetime.now() - start - cmd, dbn = operation.as_command() + cmd, dbn = operation.as_command(sock_info) listeners.publish_command_start( cmd, dbn, request_id, sock_info.address) start = datetime.now() diff --git a/test/test_collation.py b/test/test_collation.py index 71133f3e5..ef46713c1 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -114,10 +114,13 @@ class TestCollation(unittest.TestCase): def tearDown(self): self.listener.results.clear() + def last_command_started(self): + return self.listener.results['started'][-1].command + def assertCollationInLastCommand(self): self.assertEqual( self.collation.document, - self.listener.results['started'][-1].command['collation']) + self.last_command_started()['collation']) @raisesConfigurationErrorForOldMongoDB def test_create_collection(self): @@ -182,6 +185,15 @@ class TestCollation(unittest.TestCase): next(self.db.test.find(collation=self.collation)) self.assertCollationInLastCommand() + @raisesConfigurationErrorForOldMongoDB + def test_explain_command(self): + self.listener.results.clear() + self.db.test.find(collation=self.collation).explain() + # The collation should be part of the explained command. + self.assertEqual( + self.collation.document, + self.last_command_started()['explain']['collation']) + @raisesConfigurationErrorForOldMongoDB def test_group(self): self.db.test.group('foo', {'foo': {'$gt': 42}}, {}, diff --git a/test/test_collection.py b/test/test_collection.py index fad990bc3..f4fe34b62 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -54,6 +54,7 @@ from pymongo.errors import (ConfigurationError, from pymongo.message import _COMMAND_OVERHEAD, _gen_find_command from pymongo.mongo_client import MongoClient from pymongo.operations import * +from pymongo.read_concern import DEFAULT_READ_CONCERN from pymongo.read_preferences import ReadPreference from pymongo.results import (InsertOneResult, InsertManyResult, @@ -2262,7 +2263,8 @@ class TestCollection(IntegrationTest): def test_find_command_generation(self): cmd = _gen_find_command('coll', {'$query': {'foo': 1}, '$dumb': 2}, - None, 0, 0, 0, None, None, None) + None, 0, 0, 0, None, DEFAULT_READ_CONCERN, + None) self.assertEqual( cmd.to_dict(), SON([('find', 'coll'), diff --git a/test/test_cursor.py b/test/test_cursor.py index 6428ee065..54b3c72e9 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -347,6 +347,18 @@ class TestCursor(IntegrationTest): # "cursor" pre MongoDB 2.7.6, "executionStats" post self.assertTrue("cursor" in b or "executionStats" in b) + def test_explain_with_read_concern(self): + # Do not add readConcern level to explain. + listener = WhiteListEventListener("explain") + client = rs_or_single_client(event_listeners=[listener]) + self.addCleanup(client.close) + coll = client.pymongo_test.test.with_options( + read_concern=ReadConcern(level="local")) + self.assertTrue(coll.find().explain()) + started = listener.results['started'] + self.assertEqual(len(started), 1) + self.assertNotIn("readConern", started[0].command) + def test_hint(self): db = self.db self.assertRaises(TypeError, db.test.find().hint, 5.5) diff --git a/test/test_session.py b/test/test_session.py index 78718b363..e6c2ccdec 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -707,6 +707,8 @@ class TestCausalConsistency(unittest.TestCase): @client_context.require_no_standalone def test_reads(self): + # Make sure the collection exists. + self.client.pymongo_test.test.insert_one({}) self._test_reads( lambda coll, session: list(coll.aggregate([], session=session))) self._test_reads( @@ -858,9 +860,12 @@ class TestCausalConsistency(unittest.TestCase): lambda coll, session: coll.map_reduce( 'function() {}', 'function() {}', 'mrout', session=session)) - # It's not a write, but currentOp also doesn't support readConcern + # They are not writes, but currentOp and explain also don't support + # readConcern. self._test_no_read_concern( lambda coll, session: coll.database.current_op(session=session)) + self._test_no_read_concern( + lambda coll, session: coll.find({}, session=session).explain()) @client_context.require_no_standalone def test_get_more_does_not_include_read_concern(self):