PYTHON-1818 Support custom type encoding in watch pipelines

(cherry picked from commit 9cca2a7d2c)
This commit is contained in:
Shane Harvey 2019-04-19 13:57:24 -07:00
parent 824b58ac60
commit 353be17179
2 changed files with 26 additions and 1 deletions

View File

@ -66,9 +66,11 @@ class ChangeStream(object):
self._orig_codec_options = target.codec_options
if target.codec_options.type_registry._decoder_map:
self._decode_custom = True
# Keep the type registry so that we support encoding custom types
# in the pipeline.
self._target = target.with_options(
codec_options=target.codec_options.with_options(
document_class=RawBSONDocument, type_registry=None))
document_class=RawBSONDocument))
else:
self._target = target

View File

@ -777,6 +777,29 @@ class ChangeStreamsWCustomTypesTestMixin(object):
self.kill_change_stream_cursor(change_stream)
self.insert_and_check(change_stream, input_docs[2], expected_docs[2])
def test_custom_type_in_pipeline(self):
codecopts = CodecOptions(type_registry=TypeRegistry([
UndecipherableIntEncoder(), UppercaseTextDecoder()]))
self.create_targets(codec_options=codecopts)
input_docs = [
{'_id': UndecipherableInt64Type(1), 'data': 'hello'},
{'_id': 2, 'data': 'world'},
{'_id': UndecipherableInt64Type(3), 'data': '!'}]
expected_docs = [
{'_id': 2, 'data': 'WORLD'},
{'_id': 3, 'data': '!'}]
# UndecipherableInt64Type should be encoded with the TypeRegistry.
change_stream = self.change_stream(
[{'$match': {'documentKey._id': {
'$gte': UndecipherableInt64Type(2)}}}])
self.input_target.insert_one(input_docs[0])
self.insert_and_check(change_stream, input_docs[1], expected_docs[0])
self.kill_change_stream_cursor(change_stream)
self.insert_and_check(change_stream, input_docs[2], expected_docs[1])
def test_break_resume_token(self):
# Get one document from a change stream to determine resumeToken type.
self.create_targets()