diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 0189c2eac..76d024d9c 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -302,7 +302,8 @@ class _Bulk(object): if retryable and not self.started_retryable_write: session._start_retryable_write() self.started_retryable_write = True - session._apply_to(cmd, retryable, ReadPreference.PRIMARY) + session._apply_to(cmd, retryable, ReadPreference.PRIMARY, + sock_info) sock_info.send_cluster_time(cmd, session, client) ops = islice(run.ops, run.idx_offset, None) # Run as many ops as possible in one command. diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 5f4402de6..5471a358d 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -866,7 +866,7 @@ class ClientSession(object): return self._transaction.opts.read_preference return None - def _apply_to(self, command, is_retryable, read_preference): + def _apply_to(self, command, is_retryable, read_preference, sock_info): self._check_ended() self._server_session.last_use = monotonic.time() @@ -891,7 +891,7 @@ class ClientSession(object): rc = self._transaction.opts.read_concern.document if rc: command['readConcern'] = rc - self._update_read_concern(command) + self._update_read_concern(command, sock_info) command['txnNumber'] = self._server_session.transaction_id command['autocommit'] = False @@ -900,12 +900,15 @@ class ClientSession(object): self._check_ended() self._server_session.inc_transaction_id() - def _update_read_concern(self, cmd): + def _update_read_concern(self, cmd, sock_info): if (self.options.causal_consistency and self.operation_time is not None): cmd.setdefault('readConcern', {})[ 'afterClusterTime'] = self.operation_time if self.options.snapshot: + if sock_info.max_wire_version < 13: + raise ConfigurationError( + 'Snapshot reads require MongoDB 5.0 or later') rc = cmd.setdefault('readConcern', {}) rc['level'] = 'snapshot' if self._snapshot_time is not None: diff --git a/pymongo/message.py b/pymongo/message.py index 82319419e..ac5c1f2c5 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -315,10 +315,10 @@ class _Query(object): session = self.session sock_info.add_server_api(cmd) if session: - session._apply_to(cmd, False, self.read_preference) + session._apply_to(cmd, False, self.read_preference, sock_info) # Explain does not support readConcern. if not explain and not session.in_transaction: - session._update_read_concern(cmd) + session._update_read_concern(cmd, sock_info) sock_info.send_cluster_time(cmd, session, self.client) # Support auto encryption client = self.client @@ -420,7 +420,7 @@ class _GetMore(object): self.max_await_time_ms) if self.session: - self.session._apply_to(cmd, False, self.read_preference) + self.session._apply_to(cmd, False, self.read_preference, sock_info) sock_info.add_server_api(cmd) sock_info.send_cluster_time(cmd, self.session, self.client) # Support auto encryption diff --git a/pymongo/network.py b/pymongo/network.py index d965f6dfd..0bd4aba7c 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -94,7 +94,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos, if read_concern.level: spec['readConcern'] = read_concern.document if session: - session._update_read_concern(spec) + session._update_read_concern(spec, sock_info) if collation is not None: spec['collation'] = collation diff --git a/pymongo/pool.py b/pymongo/pool.py index b1eb49009..62ccca2e2 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -736,7 +736,8 @@ class SocketInfo(object): self.add_server_api(spec) if session: - session._apply_to(spec, retryable_write, read_preference) + session._apply_to(spec, retryable_write, read_preference, + self) self.send_cluster_time(spec, session, client) listeners = self.listeners if publish_events else None unacknowledged = write_concern and not write_concern.acknowledged diff --git a/test/sessions/unified/snapshot-sessions-not-supported-client-error.json b/test/sessions/unified/snapshot-sessions-not-supported-client-error.json new file mode 100644 index 000000000..129aa8d74 --- /dev/null +++ b/test/sessions/unified/snapshot-sessions-not-supported-client-error.json @@ -0,0 +1,113 @@ +{ + "description": "snapshot-sessions-not-supported-client-error", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "maxServerVersion": "4.4.99" + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Client error on find with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + } + ] +} diff --git a/test/sessions/unified/snapshot-sessions-not-supported-server-error.json b/test/sessions/unified/snapshot-sessions-not-supported-server-error.json index b6ce00216..79213f314 100644 --- a/test/sessions/unified/snapshot-sessions-not-supported-server-error.json +++ b/test/sessions/unified/snapshot-sessions-not-supported-server-error.json @@ -3,11 +3,7 @@ "schemaVersion": "1.0", "runOnRequirements": [ { - "minServerVersion": "3.6", - "maxServerVersion": "4.4.99" - }, - { - "minServerVersion": "3.6", + "minServerVersion": "5.0", "topologies": [ "single" ] @@ -20,11 +16,6 @@ "observeEvents": [ "commandStartedEvent", "commandFailedEvent" - ], - "ignoreCommandMonitoringEvents": [ - "findAndModify", - "insert", - "update" ] } }, @@ -106,6 +97,91 @@ ] } ] + }, + { + "description": "Server returns an error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "aggregate" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "distinct" + } + } + ] + } + ] } ] }