PYTHON-1332 Only gossip cluster time on >= 3.6.

Do not add readConcern to explain.
Test explain with collation.
This commit is contained in:
Shane Harvey 2017-11-16 09:02:19 -08:00
parent 36e585bdbf
commit 2a05236f61
13 changed files with 113 additions and 109 deletions

View File

@ -269,7 +269,7 @@ class _Bulk(object):
while run.idx_offset < len(run.ops):
if session and retryable:
cmd['txnNumber'] = session._transaction_id()
client._send_cluster_time(cmd, session)
sock_info.send_cluster_time(cmd, session, client)
check_keys = run.op_type == _INSERT
ops = islice(run.ops, run.idx_offset, None)
# Run as many ops as possible.

View File

@ -1945,15 +1945,15 @@ class Collection(common.BaseObject):
if exc.code != 26:
raise
cursor = {'id': 0, 'firstBatch': []}
return CommandCursor(coll, cursor, sock_info.address, session=s,
return CommandCursor(coll, cursor, sock_info.address,
session=s,
explicit_session=session is not None)
else:
namespace = _UJOIN % (self.__database.name, "system.indexes")
res = message._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, slave_ok, codec_options,
ReadPreference.PRIMARY, cmd,
self.database.client._event_listeners, session=None)
self.database.client._event_listeners)
cursor = res["cursor"]
# Note that a collection can only have 64 indexes, so there
# will never be a getMore call.

View File

@ -739,8 +739,7 @@ class Database(common.BaseObject):
return _first_batch(sock_info, "admin", "$cmd.sys.inprog",
spec, -1, True, self.codec_options,
ReadPreference.PRIMARY, cmd,
self.client._event_listeners,
session=None)
self.client._event_listeners)
def profiling_level(self, session=None):
"""Get the database's current profiling level.

View File

@ -170,31 +170,8 @@ _MODIFIERS = SON([
('$snapshot', 'snapshot')])
def _gen_explain_command(
coll, spec, projection, skip, limit, batch_size,
options, read_concern, session, client):
"""Generate an explain command document."""
cmd = _gen_find_command(coll, spec, projection, skip, limit, batch_size,
options, session=None, client=None)
if read_concern.level:
explain = SON([('explain', cmd), ('readConcern', read_concern.document)])
else:
explain = SON([('explain', cmd)])
if session:
explain['lsid'] = session._use_lsid()
if (session.options.causal_consistency
and session.operation_time is not None):
explain.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
client._send_cluster_time(explain, session)
return explain
def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
session, client, read_concern=DEFAULT_READ_CONCERN,
collation=None):
read_concern, collation=None):
"""Generate a find command document."""
cmd = SON([('find', coll)])
if '$query' in spec:
@ -225,19 +202,10 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
cmd.update([(opt, True)
for opt, val in _OPTIONS.items()
if options & val])
if session:
cmd['lsid'] = session._use_lsid()
if (session.options.causal_consistency
and session.operation_time is not None):
cmd.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
if client:
client._send_cluster_time(cmd, session)
return cmd
def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
session, client):
def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms):
"""Generate a getMore command document."""
cmd = SON([('getMore', cursor_id),
('collection', coll)])
@ -245,9 +213,6 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
cmd['batchSize'] = batch_size
if max_await_time_ms is not None:
cmd['maxTimeMS'] = max_await_time_ms
if session:
cmd['lsid'] = session._use_lsid()
client._send_cluster_time(cmd, session)
return cmd
@ -299,23 +264,32 @@ class _Query(object):
return use_find_cmd
def as_command(self):
def as_command(self, sock_info):
"""Return a find command document for this query.
Should be called *after* get_message.
"""
if '$explain' in self.spec:
explain = '$explain' in self.spec
cmd = _gen_find_command(
self.coll, self.spec, self.fields, self.ntoskip,
self.limit, self.batch_size, self.flags, self.read_concern,
self.collation)
if explain:
self.name = 'explain'
return _gen_explain_command(
self.coll, self.spec, self.fields, self.ntoskip,
self.limit, self.batch_size, self.flags,
self.read_concern, self.session, self.client), self.db
return _gen_find_command(self.coll, self.spec, self.fields,
self.ntoskip, self.limit, self.batch_size,
self.flags, self.session, self.client,
self.read_concern, self.collation), self.db
cmd = SON([('explain', cmd)])
session = self.session
if session:
cmd['lsid'] = session._use_lsid()
# Explain does not support readConcern.
if (not explain and session.options.causal_consistency
and session.operation_time is not None):
cmd.setdefault(
'readConcern', {})[
'afterClusterTime'] = session.operation_time
sock_info.send_cluster_time(cmd, session, self.client)
return cmd, self.db
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
"""Get a query message, possibly setting the slaveOk bit."""
if set_slave_ok:
# Set the slaveOk bit.
@ -328,7 +302,7 @@ class _Query(object):
if use_cmd:
ns = _UJOIN % (self.db, "$cmd")
spec = self.as_command()[0]
spec = self.as_command(sock_info)[0]
ntoreturn = -1 # All DB commands return 1 document
else:
# OP_QUERY treats ntoreturn of -1 and 1 the same, return
@ -341,7 +315,7 @@ class _Query(object):
else:
ntoreturn = self.limit
if is_mongos:
if sock_info.is_mongos:
spec = _maybe_add_read_preference(spec,
self.read_preference)
@ -372,22 +346,25 @@ class _GetMore(object):
sock_info.validate_session(self.client, self.session)
return sock_info.max_wire_version >= 4 and not exhaust
def as_command(self):
def as_command(self, sock_info):
"""Return a getMore command document for this query."""
return _gen_get_more_command(self.cursor_id, self.coll,
self.ntoreturn,
self.max_await_time_ms,
self.session,
self.client), self.db
cmd = _gen_get_more_command(self.cursor_id, self.coll,
self.ntoreturn,
self.max_await_time_ms)
def get_message(self, dummy0, dummy1, use_cmd=False):
if self.session:
cmd['lsid'] = self.session._use_lsid()
sock_info.send_cluster_time(cmd, self.session, self.client)
return cmd, self.db
def get_message(self, dummy0, sock_info, use_cmd=False):
"""Get a getmore message."""
ns = _UJOIN % (self.db, self.coll)
if use_cmd:
ns = _UJOIN % (self.db, "$cmd")
spec = self.as_command()[0]
spec = self.as_command(sock_info)[0]
return query(0, ns, 0, -1, spec, None, self.codec_options)
@ -401,20 +378,20 @@ class _RawBatchQuery(_Query):
return False
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawBatchQuery, self).get_message(set_slave_ok, is_mongos,
False)
return super(_RawBatchQuery, self).get_message(
set_slave_ok, sock_info, False)
class _RawBatchGetMore(_GetMore):
def use_command(self, socket_info, exhaust):
return False
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawBatchGetMore, self).get_message(set_slave_ok, is_mongos,
False)
return super(_RawBatchGetMore, self).get_message(
set_slave_ok, sock_info, False)
class _CursorAddress(tuple):
@ -982,12 +959,11 @@ class _OpReply(object):
def _first_batch(sock_info, db, coll, query, ntoreturn,
slave_ok, codec_options, read_preference, cmd, listeners,
session):
slave_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, query, None, codec_options,
read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, session,
read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, None,
None)
name = next(iter(cmd))
@ -995,8 +971,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn,
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(slave_ok,
sock_info.is_mongos)
request_id, msg, max_doc_size = query.get_message(slave_ok, sock_info)
if publish:
encoding_duration = datetime.datetime.now() - start

View File

@ -1679,8 +1679,7 @@ class MongoClient(common.BaseObject):
message._first_batch(sock_info, "admin", "$cmd.sys.unlock",
{}, -1, True, self.codec_options,
ReadPreference.PRIMARY, cmd,
self._event_listeners,
session=None)
self._event_listeners)
def __enter__(self):
return self

View File

@ -120,10 +120,8 @@ class Monitor(object):
start = _time()
try:
cluster_time = self._topology.max_cluster_time()
# If the server type is unknown, send metadata with first check.
return self._check_once(metadata=metadata,
cluster_time=cluster_time)
return self._check_once(metadata=metadata)
except ReferenceError:
raise
except Exception as error:
@ -142,9 +140,7 @@ class Monitor(object):
# Always send metadata: this is a new connection.
start = _time()
try:
cluster_time = self._topology.max_cluster_time()
return self._check_once(metadata=self._pool.opts.metadata,
cluster_time=cluster_time)
return self._check_once(metadata=self._pool.opts.metadata)
except ReferenceError:
raise
except Exception as error:
@ -155,7 +151,7 @@ class Monitor(object):
self._avg_round_trip_time.reset()
return default
def _check_once(self, metadata=None, cluster_time=None):
def _check_once(self, metadata=None):
"""A single attempt to call ismaster.
Returns a ServerDescription, or raises an exception.
@ -165,7 +161,7 @@ class Monitor(object):
self._listeners.publish_server_heartbeat_started(address)
with self._pool.get_socket({}) as sock_info:
response, round_trip_time = self._check_with_socket(
sock_info, metadata=metadata, cluster_time=cluster_time)
sock_info, metadata=metadata)
self._avg_round_trip_time.add_sample(round_trip_time)
sd = ServerDescription(
address=address,
@ -177,7 +173,7 @@ class Monitor(object):
return sd
def _check_with_socket(self, sock_info, metadata=None, cluster_time=None):
def _check_with_socket(self, sock_info, metadata=None):
"""Return (IsMaster, round_trip_time).
Can raise ConnectionFailure or OperationFailure.
@ -185,8 +181,10 @@ class Monitor(object):
cmd = SON([('ismaster', 1)])
if metadata is not None:
cmd['client'] = metadata
if cluster_time is not None:
cmd['$clusterTime'] = cluster_time
if self._server_description.max_wire_version >= 6:
cluster_time = self._topology.max_cluster_time()
if cluster_time is not None:
cmd['$clusterTime'] = cluster_time
start = _time()
request_id, msg, max_doc_size = message.query(
0, 'admin.$cmd', 0, -1, cmd,

View File

@ -34,9 +34,8 @@ try:
except ImportError:
_SELECT_ERROR = OSError
from bson import SON
from pymongo import helpers, message
from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES
from pymongo.common import MAX_MESSAGE_SIZE
from pymongo.errors import (AutoReconnect,
NotMasterError,
OperationFailure,
@ -53,7 +52,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
check_keys=False, listeners=None, max_bson_size=None,
read_concern=None,
parse_write_concern_error=False,
collation=None, retryable_write=False):
collation=None):
"""Execute a command over the socket, or raise socket.error.
:Parameters:
@ -76,20 +75,10 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
- `parse_write_concern_error`: Whether to parse the ``writeConcernError``
field in the command response.
- `collation`: The collation for this command.
- `retryable_write`: True if this command is a retryable write.
"""
name = next(iter(spec))
ns = dbname + '.$cmd'
flags = 4 if slave_ok else 0
if (client or session) and not isinstance(spec, ORDERED_TYPES):
# Ensure command name remains in first place.
spec = SON(spec)
if session:
spec['lsid'] = session._use_lsid()
if retryable_write:
spec['txnNumber'] = session._transaction_id()
if client:
client._send_cluster_time(spec, session)
# Publish the original command document, perhaps with lsid and $clusterTime.
orig = spec

View File

@ -33,7 +33,7 @@ from bson import DEFAULT_CODEC_OPTIONS
from bson.py3compat import imap, itervalues, _unicode, integer_types
from bson.son import SON
from pymongo import auth, helpers, thread_util, __version__
from pymongo.common import MAX_MESSAGE_SIZE
from pymongo.common import MAX_MESSAGE_SIZE, ORDERED_TYPES
from pymongo.errors import (AutoReconnect,
ConnectionFailure,
ConfigurationError,
@ -477,6 +477,15 @@ class SocketInfo(object):
elif self.max_wire_version < 5 and collation is not None:
raise ConfigurationError(
'Must be connected to MongoDB 3.4+ to use a collation.')
if (client or session) and not isinstance(spec, ORDERED_TYPES):
# Ensure command name remains in first place.
spec = SON(spec)
if session:
spec['lsid'] = session._use_lsid()
if retryable_write:
spec['txnNumber'] = session._transaction_id()
self.send_cluster_time(spec, session, client)
try:
return command(self.sock, dbname, spec, slave_ok,
self.is_mongos, read_preference, codec_options,
@ -484,8 +493,7 @@ class SocketInfo(object):
self.address, check_keys, self.listeners,
self.max_bson_size, read_concern,
parse_write_concern_error=parse_write_concern_error,
collation=collation,
retryable_write=retryable_write)
collation=collation)
except OperationFailure:
raise
# Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
@ -615,6 +623,11 @@ class SocketInfo(object):
except Exception:
pass
def send_cluster_time(self, command, session, client):
"""Add cluster time for MongoDB >= 3.6."""
if self.max_wire_version >= 6 and client:
client._send_cluster_time(command, session)
def _raise_connection_failure(self, error):
# Catch *all* exceptions from socket methods and close the socket. In
# regular Python, socket operations only raise socket.error, even if

View File

@ -91,12 +91,12 @@ class Server(object):
use_find_cmd = operation.use_command(sock_info, exhaust)
message = operation.get_message(
set_slave_okay, sock_info.is_mongos, use_find_cmd)
set_slave_okay, sock_info, use_find_cmd)
request_id, data, max_doc_size = self._split_message(message)
if publish:
encoding_duration = datetime.now() - start
cmd, dbn = operation.as_command()
cmd, dbn = operation.as_command(sock_info)
listeners.publish_command_start(
cmd, dbn, request_id, sock_info.address)
start = datetime.now()

View File

@ -114,10 +114,13 @@ class TestCollation(unittest.TestCase):
def tearDown(self):
self.listener.results.clear()
def last_command_started(self):
return self.listener.results['started'][-1].command
def assertCollationInLastCommand(self):
self.assertEqual(
self.collation.document,
self.listener.results['started'][-1].command['collation'])
self.last_command_started()['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_create_collection(self):
@ -182,6 +185,15 @@ class TestCollation(unittest.TestCase):
next(self.db.test.find(collation=self.collation))
self.assertCollationInLastCommand()
@raisesConfigurationErrorForOldMongoDB
def test_explain_command(self):
self.listener.results.clear()
self.db.test.find(collation=self.collation).explain()
# The collation should be part of the explained command.
self.assertEqual(
self.collation.document,
self.last_command_started()['explain']['collation'])
@raisesConfigurationErrorForOldMongoDB
def test_group(self):
self.db.test.group('foo', {'foo': {'$gt': 42}}, {},

View File

@ -54,6 +54,7 @@ from pymongo.errors import (ConfigurationError,
from pymongo.message import _COMMAND_OVERHEAD, _gen_find_command
from pymongo.mongo_client import MongoClient
from pymongo.operations import *
from pymongo.read_concern import DEFAULT_READ_CONCERN
from pymongo.read_preferences import ReadPreference
from pymongo.results import (InsertOneResult,
InsertManyResult,
@ -2262,7 +2263,8 @@ class TestCollection(IntegrationTest):
def test_find_command_generation(self):
cmd = _gen_find_command('coll', {'$query': {'foo': 1}, '$dumb': 2},
None, 0, 0, 0, None, None, None)
None, 0, 0, 0, None, DEFAULT_READ_CONCERN,
None)
self.assertEqual(
cmd.to_dict(),
SON([('find', 'coll'),

View File

@ -347,6 +347,18 @@ class TestCursor(IntegrationTest):
# "cursor" pre MongoDB 2.7.6, "executionStats" post
self.assertTrue("cursor" in b or "executionStats" in b)
def test_explain_with_read_concern(self):
# Do not add readConcern level to explain.
listener = WhiteListEventListener("explain")
client = rs_or_single_client(event_listeners=[listener])
self.addCleanup(client.close)
coll = client.pymongo_test.test.with_options(
read_concern=ReadConcern(level="local"))
self.assertTrue(coll.find().explain())
started = listener.results['started']
self.assertEqual(len(started), 1)
self.assertNotIn("readConern", started[0].command)
def test_hint(self):
db = self.db
self.assertRaises(TypeError, db.test.find().hint, 5.5)

View File

@ -707,6 +707,8 @@ class TestCausalConsistency(unittest.TestCase):
@client_context.require_no_standalone
def test_reads(self):
# Make sure the collection exists.
self.client.pymongo_test.test.insert_one({})
self._test_reads(
lambda coll, session: list(coll.aggregate([], session=session)))
self._test_reads(
@ -858,9 +860,12 @@ class TestCausalConsistency(unittest.TestCase):
lambda coll, session: coll.map_reduce(
'function() {}', 'function() {}', 'mrout', session=session))
# It's not a write, but currentOp also doesn't support readConcern
# They are not writes, but currentOp and explain also don't support
# readConcern.
self._test_no_read_concern(
lambda coll, session: coll.database.current_op(session=session))
self._test_no_read_concern(
lambda coll, session: coll.find({}, session=session).explain())
@client_context.require_no_standalone
def test_get_more_does_not_include_read_concern(self):