PYTHON-2777 Raise client side error for snapshot reads on <5.0 (#659)

(cherry picked from commit 4152600ae6)
This commit is contained in:
Shane Harvey 2021-06-29 14:40:48 -07:00 committed by Shane Harvey
parent ee97eae027
commit 99e21c3ada
7 changed files with 213 additions and 19 deletions

View File

@ -302,7 +302,8 @@ class _Bulk(object):
if retryable and not self.started_retryable_write:
session._start_retryable_write()
self.started_retryable_write = True
session._apply_to(cmd, retryable, ReadPreference.PRIMARY)
session._apply_to(cmd, retryable, ReadPreference.PRIMARY,
sock_info)
sock_info.send_cluster_time(cmd, session, client)
ops = islice(run.ops, run.idx_offset, None)
# Run as many ops as possible in one command.

View File

@ -866,7 +866,7 @@ class ClientSession(object):
return self._transaction.opts.read_preference
return None
def _apply_to(self, command, is_retryable, read_preference):
def _apply_to(self, command, is_retryable, read_preference, sock_info):
self._check_ended()
self._server_session.last_use = monotonic.time()
@ -891,7 +891,7 @@ class ClientSession(object):
rc = self._transaction.opts.read_concern.document
if rc:
command['readConcern'] = rc
self._update_read_concern(command)
self._update_read_concern(command, sock_info)
command['txnNumber'] = self._server_session.transaction_id
command['autocommit'] = False
@ -900,12 +900,15 @@ class ClientSession(object):
self._check_ended()
self._server_session.inc_transaction_id()
def _update_read_concern(self, cmd):
def _update_read_concern(self, cmd, sock_info):
if (self.options.causal_consistency
and self.operation_time is not None):
cmd.setdefault('readConcern', {})[
'afterClusterTime'] = self.operation_time
if self.options.snapshot:
if sock_info.max_wire_version < 13:
raise ConfigurationError(
'Snapshot reads require MongoDB 5.0 or later')
rc = cmd.setdefault('readConcern', {})
rc['level'] = 'snapshot'
if self._snapshot_time is not None:

View File

@ -315,10 +315,10 @@ class _Query(object):
session = self.session
sock_info.add_server_api(cmd)
if session:
session._apply_to(cmd, False, self.read_preference)
session._apply_to(cmd, False, self.read_preference, sock_info)
# Explain does not support readConcern.
if not explain and not session.in_transaction:
session._update_read_concern(cmd)
session._update_read_concern(cmd, sock_info)
sock_info.send_cluster_time(cmd, session, self.client)
# Support auto encryption
client = self.client
@ -420,7 +420,7 @@ class _GetMore(object):
self.max_await_time_ms)
if self.session:
self.session._apply_to(cmd, False, self.read_preference)
self.session._apply_to(cmd, False, self.read_preference, sock_info)
sock_info.add_server_api(cmd)
sock_info.send_cluster_time(cmd, self.session, self.client)
# Support auto encryption

View File

@ -94,7 +94,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
if read_concern.level:
spec['readConcern'] = read_concern.document
if session:
session._update_read_concern(spec)
session._update_read_concern(spec, sock_info)
if collation is not None:
spec['collation'] = collation

View File

@ -736,7 +736,8 @@ class SocketInfo(object):
self.add_server_api(spec)
if session:
session._apply_to(spec, retryable_write, read_preference)
session._apply_to(spec, retryable_write, read_preference,
self)
self.send_cluster_time(spec, session, client)
listeners = self.listeners if publish_events else None
unacknowledged = write_concern and not write_concern.acknowledged

View File

@ -0,0 +1,113 @@
{
"description": "snapshot-sessions-not-supported-client-error",
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"maxServerVersion": "4.4.99"
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
},
{
"session": {
"id": "session0",
"client": "client0",
"sessionOptions": {
"snapshot": true
}
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "Client error on find with snapshot",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {}
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
},
{
"description": "Client error on aggregate with snapshot",
"operations": [
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"session": "session0",
"pipeline": []
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
},
{
"description": "Client error on distinct with snapshot",
"operations": [
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
}
]
}

View File

@ -3,11 +3,7 @@
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"maxServerVersion": "4.4.99"
},
{
"minServerVersion": "3.6",
"minServerVersion": "5.0",
"topologies": [
"single"
]
@ -20,11 +16,6 @@
"observeEvents": [
"commandStartedEvent",
"commandFailedEvent"
],
"ignoreCommandMonitoringEvents": [
"findAndModify",
"insert",
"update"
]
}
},
@ -106,6 +97,91 @@
]
}
]
},
{
"description": "Server returns an error on aggregate with snapshot",
"operations": [
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"session": "session0",
"pipeline": []
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
}
]
}
]
},
{
"description": "Server returns an error on distinct with snapshot",
"operations": [
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandFailedEvent": {
"commandName": "distinct"
}
}
]
}
]
}
]
}