In a sharded transaction, a session is pinned to the mongos server selected for the initial command. All subsequent commands in the same transaction are routed to the pinned mongos server.
263 lines
8.9 KiB
Python
263 lines
8.9 KiB
Python
# Copyright 2017 MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
|
# may not use this file except in compliance with the License. You
|
|
# may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied. See the License for the specific language governing
|
|
# permissions and limitations under the License.
|
|
|
|
"""ChangeStream cursor to iterate over changes on a collection."""
|
|
|
|
import copy
|
|
|
|
from bson.son import SON
|
|
|
|
from pymongo import common
|
|
from pymongo.collation import validate_collation_or_none
|
|
from pymongo.command_cursor import CommandCursor
|
|
from pymongo.errors import (ConnectionFailure,
|
|
InvalidOperation,
|
|
OperationFailure,
|
|
PyMongoError)
|
|
|
|
|
|
# The change streams spec considers the following server errors from the
|
|
# getMore command non-resumable. All other getMore errors are resumable.
|
|
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
|
|
11601, # Interrupted
|
|
136, # CappedPositionLost
|
|
237, # CursorKilled
|
|
None, # No error code was returned.
|
|
])
|
|
|
|
|
|
class ChangeStream(object):
|
|
"""The internal abstract base class for change stream cursors.
|
|
|
|
Should not be called directly by application developers. Use
|
|
:meth:pymongo.collection.Collection.watch,
|
|
:meth:pymongo.database.Database.watch, or
|
|
:meth:pymongo.mongo_client.MongoClient.watch instead.
|
|
|
|
Defines the interface for change streams. Should be subclassed to
|
|
implement the `ChangeStream._create_cursor` abstract method, and
|
|
the `ChangeStream._database`and ChangeStream._aggregation_target`
|
|
abstract properties.
|
|
"""
|
|
def __init__(self, target, pipeline, full_document, resume_after,
|
|
max_await_time_ms, batch_size, collation,
|
|
start_at_operation_time, session):
|
|
if pipeline is None:
|
|
pipeline = []
|
|
elif not isinstance(pipeline, list):
|
|
raise TypeError("pipeline must be a list")
|
|
|
|
common.validate_string_or_none('full_document', full_document)
|
|
validate_collation_or_none(collation)
|
|
common.validate_non_negative_integer_or_none("batchSize", batch_size)
|
|
|
|
self._target = target
|
|
self._pipeline = copy.deepcopy(pipeline)
|
|
self._full_document = full_document
|
|
self._resume_token = copy.deepcopy(resume_after)
|
|
self._max_await_time_ms = max_await_time_ms
|
|
self._batch_size = batch_size
|
|
self._collation = collation
|
|
self._start_at_operation_time = start_at_operation_time
|
|
self._session = session
|
|
self._cursor = self._create_cursor()
|
|
|
|
@property
|
|
def _aggregation_target(self):
|
|
"""The argument to pass to the aggregate command."""
|
|
raise NotImplementedError
|
|
|
|
@property
|
|
def _database(self):
|
|
"""The database against which the aggregation commands for
|
|
this ChangeStream will be run. """
|
|
raise NotImplementedError
|
|
|
|
def _pipeline_options(self):
|
|
options = {}
|
|
if self._full_document is not None:
|
|
options['fullDocument'] = self._full_document
|
|
if self._resume_token is not None:
|
|
options['resumeAfter'] = self._resume_token
|
|
if self._start_at_operation_time is not None:
|
|
options['startAtOperationTime'] = self._start_at_operation_time
|
|
return options
|
|
|
|
def _full_pipeline(self):
|
|
"""Return the full aggregation pipeline for this ChangeStream."""
|
|
options = self._pipeline_options()
|
|
full_pipeline = [{'$changeStream': options}]
|
|
full_pipeline.extend(self._pipeline)
|
|
return full_pipeline
|
|
|
|
def _run_aggregation_cmd(self, session, explicit_session):
|
|
"""Run the full aggregation pipeline for this ChangeStream and return
|
|
the corresponding CommandCursor.
|
|
"""
|
|
read_preference = self._target._read_preference_for(session)
|
|
client = self._database.client
|
|
with client._socket_for_reads(
|
|
read_preference, session) as (sock_info, slave_ok):
|
|
pipeline = self._full_pipeline()
|
|
cmd = SON([("aggregate", self._aggregation_target),
|
|
("pipeline", pipeline),
|
|
("cursor", {})])
|
|
|
|
result = sock_info.command(
|
|
self._database.name,
|
|
cmd,
|
|
slave_ok,
|
|
read_preference,
|
|
self._target.codec_options,
|
|
parse_write_concern_error=True,
|
|
read_concern=self._target.read_concern,
|
|
collation=self._collation,
|
|
session=session,
|
|
client=self._database.client)
|
|
|
|
cursor = result["cursor"]
|
|
|
|
if (self._start_at_operation_time is None and
|
|
self._resume_token is None and
|
|
cursor.get("_id") is None and
|
|
sock_info.max_wire_version >= 7):
|
|
self._start_at_operation_time = result["operationTime"]
|
|
|
|
ns = cursor["ns"]
|
|
_, collname = ns.split(".", 1)
|
|
aggregation_collection = self._database.get_collection(
|
|
collname, codec_options=self._target.codec_options,
|
|
read_preference=read_preference,
|
|
write_concern=self._target.write_concern,
|
|
read_concern=self._target.read_concern
|
|
)
|
|
|
|
return CommandCursor(
|
|
aggregation_collection, cursor, sock_info.address,
|
|
batch_size=self._batch_size or 0,
|
|
max_await_time_ms=self._max_await_time_ms,
|
|
session=session, explicit_session=explicit_session
|
|
)
|
|
|
|
def _create_cursor(self):
|
|
with self._database.client._tmp_session(self._session, close=False) as s:
|
|
return self._run_aggregation_cmd(
|
|
session=s,
|
|
explicit_session=self._session is not None
|
|
)
|
|
|
|
def _resume(self):
|
|
"""Reestablish this change stream after a resumable error."""
|
|
try:
|
|
self._cursor.close()
|
|
except PyMongoError:
|
|
pass
|
|
self._cursor = self._create_cursor()
|
|
|
|
def close(self):
|
|
"""Close this ChangeStream."""
|
|
self._cursor.close()
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
"""Advance the cursor.
|
|
|
|
This method blocks until the next change document is returned or an
|
|
unrecoverable error is raised.
|
|
|
|
Raises :exc:`StopIteration` if this ChangeStream is closed.
|
|
"""
|
|
while True:
|
|
try:
|
|
change = self._cursor.next()
|
|
except ConnectionFailure:
|
|
self._resume()
|
|
continue
|
|
except OperationFailure as exc:
|
|
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
|
|
raise
|
|
self._resume()
|
|
continue
|
|
try:
|
|
resume_token = change['_id']
|
|
except KeyError:
|
|
self.close()
|
|
raise InvalidOperation(
|
|
"Cannot provide resume functionality when the resume "
|
|
"token is missing.")
|
|
self._resume_token = copy.copy(resume_token)
|
|
self._start_at_operation_time = None
|
|
return change
|
|
|
|
__next__ = next
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.close()
|
|
|
|
|
|
class CollectionChangeStream(ChangeStream):
|
|
"""Class for creating a change stream on a collection.
|
|
|
|
Should not be called directly by application developers. Use
|
|
helper method :meth:`pymongo.collection.Collection.watch` instead.
|
|
|
|
.. versionadded: 3.6
|
|
.. mongodoc:: changeStreams
|
|
"""
|
|
@property
|
|
def _aggregation_target(self):
|
|
return self._target.name
|
|
|
|
@property
|
|
def _database(self):
|
|
return self._target.database
|
|
|
|
|
|
class DatabaseChangeStream(ChangeStream):
|
|
"""Class for creating a change stream on all collections in a database.
|
|
|
|
Should not be called directly by application developers. Use
|
|
helper method :meth:`pymongo.database.Database.watch` instead.
|
|
|
|
.. versionadded: 3.7
|
|
.. mongodoc:: changeStreams
|
|
"""
|
|
@property
|
|
def _aggregation_target(self):
|
|
return 1
|
|
|
|
@property
|
|
def _database(self):
|
|
return self._target
|
|
|
|
|
|
class ClusterChangeStream(DatabaseChangeStream):
|
|
"""Class for creating a change stream on all collections on a cluster.
|
|
|
|
Should not be called directly by application developers. Use
|
|
helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.
|
|
|
|
.. versionadded: 3.7
|
|
.. mongodoc:: changeStreams
|
|
"""
|
|
def _pipeline_options(self):
|
|
options = super(ClusterChangeStream, self)._pipeline_options()
|
|
options["allChangesForCluster"] = True
|
|
return options
|