diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index ad3dbaa61..8b2b57289 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -66,9 +66,11 @@ class ChangeStream(object): self._orig_codec_options = target.codec_options if target.codec_options.type_registry._decoder_map: self._decode_custom = True + # Keep the type registry so that we support encoding custom types + # in the pipeline. self._target = target.with_options( codec_options=target.codec_options.with_options( - document_class=RawBSONDocument, type_registry=None)) + document_class=RawBSONDocument)) else: self._target = target diff --git a/test/test_custom_types.py b/test/test_custom_types.py index cc83c1e6d..d7095da38 100644 --- a/test/test_custom_types.py +++ b/test/test_custom_types.py @@ -777,6 +777,29 @@ class ChangeStreamsWCustomTypesTestMixin(object): self.kill_change_stream_cursor(change_stream) self.insert_and_check(change_stream, input_docs[2], expected_docs[2]) + def test_custom_type_in_pipeline(self): + codecopts = CodecOptions(type_registry=TypeRegistry([ + UndecipherableIntEncoder(), UppercaseTextDecoder()])) + self.create_targets(codec_options=codecopts) + + input_docs = [ + {'_id': UndecipherableInt64Type(1), 'data': 'hello'}, + {'_id': 2, 'data': 'world'}, + {'_id': UndecipherableInt64Type(3), 'data': '!'}] + expected_docs = [ + {'_id': 2, 'data': 'WORLD'}, + {'_id': 3, 'data': '!'}] + + # UndecipherableInt64Type should be encoded with the TypeRegistry. + change_stream = self.change_stream( + [{'$match': {'documentKey._id': { + '$gte': UndecipherableInt64Type(2)}}}]) + + self.input_target.insert_one(input_docs[0]) + self.insert_and_check(change_stream, input_docs[1], expected_docs[0]) + self.kill_change_stream_cursor(change_stream) + self.insert_and_check(change_stream, input_docs[2], expected_docs[1]) + def test_break_resume_token(self): # Get one document from a change stream to determine resumeToken type. self.create_targets()