PYTHON-1860 Use OP_MSG for find/aggregate_raw_batches when supported (#622)

(cherry picked from commit 209d5009e6)
This commit is contained in:
Prashant Mital 2021-05-19 12:05:35 -07:00 committed by Prashant Mital
parent 2ac2da09d5
commit 0cbd7a2fa4
No known key found for this signature in database
GPG Key ID: 8EFE2B468F727B75
5 changed files with 50 additions and 14 deletions

View File

@ -1062,6 +1062,16 @@ def _decode_selective(rawdoc, fields, codec_options):
return doc
def _convert_raw_document_lists_to_streams(document):
cursor = document.get('cursor')
if cursor:
for key in ('firstBatch', 'nextBatch'):
batch = cursor.get(key)
if batch:
stream = b"".join(doc.raw for doc in batch)
cursor[key] = [stream]
def _decode_all_selective(data, codec_options, fields):
"""Decode BSON data to a single document while using user-provided
custom decoding logic.

View File

@ -16,6 +16,7 @@
from collections import deque
from bson import _convert_raw_document_lists_to_streams
from bson.py3compat import integer_types
from pymongo.errors import (ConnectionFailure,
InvalidOperation,
@ -302,7 +303,13 @@ class RawBatchCommandCursor(CommandCursor):
def _unpack_response(self, response, cursor_id, codec_options,
user_fields=None, legacy_response=False):
return response.raw_response(cursor_id)
raw_response = response.raw_response(
cursor_id, user_fields=user_fields)
if not legacy_response:
# OP_MSG returns firstBatch/nextBatch documents as a BSON array
# Re-assemble the array of documents into a document stream
_convert_raw_document_lists_to_streams(raw_response[0])
return raw_response
def __getitem__(self, index):
raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")

View File

@ -19,7 +19,7 @@ import warnings
from collections import deque
from bson import RE_TYPE
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
from bson.code import Code
from bson.py3compat import (iteritems,
integer_types,
@ -1137,7 +1137,6 @@ class Cursor(object):
limit = min(limit, self.__batch_size)
else:
limit = self.__batch_size
# Exhaust cursors don't send getMore messages.
g = self._getmore_class(self.__dbname,
self.__collname,
@ -1303,7 +1302,13 @@ class RawBatchCursor(Cursor):
def _unpack_response(self, response, cursor_id, codec_options,
user_fields=None, legacy_response=False):
return response.raw_response(cursor_id)
raw_response = response.raw_response(
cursor_id, user_fields=user_fields)
if not legacy_response:
# OP_MSG returns firstBatch/nextBatch documents as a BSON array
# Re-assemble the array of documents into a document stream
_convert_raw_document_lists_to_streams(raw_response[0])
return raw_response
def explain(self):
"""Returns an explain plan record for this cursor.

View File

@ -28,10 +28,12 @@ import bson
from bson import (CodecOptions,
decode,
encode,
_decode_selective,
_dict_to_bson,
_make_c_string)
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.raw_bson import _inflate_bson, DEFAULT_RAW_BSON_OPTIONS
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument)
from bson.py3compat import b, StringIO
from bson.son import SON
@ -442,28 +444,30 @@ class _GetMore(object):
return get_more(ns, self.ntoreturn, self.cursor_id, ctx)
# TODO: Use OP_MSG once the server is able to respond with document streams.
class _RawBatchQuery(_Query):
def use_command(self, socket_info, exhaust):
# Compatibility checks.
super(_RawBatchQuery, self).use_command(socket_info, exhaust)
# Use OP_MSG when available.
if socket_info.op_msg_enabled and not exhaust:
return True
return False
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawBatchQuery, self).get_message(
set_slave_ok, sock_info, False)
set_slave_ok, sock_info, use_cmd)
class _RawBatchGetMore(_GetMore):
def use_command(self, socket_info, exhaust):
# Use OP_MSG when available.
if socket_info.op_msg_enabled and not exhaust:
return True
return False
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawBatchGetMore, self).get_message(
set_slave_ok, sock_info, False)
set_slave_ok, sock_info, use_cmd)
class _CursorAddress(tuple):
@ -1492,7 +1496,7 @@ class _OpReply(object):
self.number_returned = number_returned
self.documents = documents
def raw_response(self, cursor_id=None):
def raw_response(self, cursor_id=None, user_fields=None):
"""Check the response header from the database, without decoding BSON.
Check the response for errors and unpack.
@ -1602,8 +1606,15 @@ class _OpMsg(object):
self.flags = flags
self.payload_document = payload_document
def raw_response(self, cursor_id=None):
raise NotImplementedError
def raw_response(self, cursor_id=None, user_fields={}):
"""
cursor_id is ignored
user_fields is used to determine which fields must not be decoded
"""
inflated_response = _decode_selective(
RawBSONDocument(self.payload_document), user_fields,
DEFAULT_RAW_BSON_OPTIONS)
return [inflated_response]
def unpack_response(self, cursor_id=None,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,

View File

@ -41,6 +41,7 @@ from pymongo.errors import (ConfigurationError,
OperationFailure)
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import (client_context,
unittest,
IntegrationTest)
@ -1526,6 +1527,8 @@ class TestRawBatchCursor(IntegrationTest):
@client_context.require_version_min(3, 2)
def test_read_concern(self):
self.db.get_collection(
"test", write_concern=WriteConcern(w="majority")).insert_one({})
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
next(c.find_raw_batches())