This change removes the u() helper from bson.py3compat and all of its uses in the driver and tests. PyPy3 continues to be supported since, even though it is based on python 3.2.5, it has always supported the u string prefix. The README and install docs are now explicit about PyPy(3) support.
994 lines
41 KiB
Python
994 lines
41 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2009-2014 MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the bson module."""
|
|
|
|
import collections
|
|
import datetime
|
|
import re
|
|
import sys
|
|
import uuid
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
import bson
|
|
from bson import (BSON,
|
|
decode_all,
|
|
decode_file_iter,
|
|
decode_iter,
|
|
EPOCH_AWARE,
|
|
is_valid,
|
|
Regex)
|
|
from bson.binary import Binary, UUIDLegacy
|
|
from bson.code import Code
|
|
from bson.codec_options import CodecOptions
|
|
from bson.int64 import Int64
|
|
from bson.objectid import ObjectId
|
|
from bson.dbref import DBRef
|
|
from bson.py3compat import PY3, text_type, iteritems, StringIO
|
|
from bson.son import SON
|
|
from bson.timestamp import Timestamp
|
|
from bson.tz_util import FixedOffset
|
|
from bson.errors import (InvalidBSON,
|
|
InvalidDocument,
|
|
InvalidStringData)
|
|
from bson.max_key import MaxKey
|
|
from bson.min_key import MinKey
|
|
from bson.tz_util import (FixedOffset,
|
|
utc)
|
|
|
|
from test import qcheck, SkipTest, unittest
|
|
|
|
if PY3:
|
|
long = int
|
|
|
|
|
|
class NotADict(collections.MutableMapping):
|
|
"""Non-dict type that implements the mapping protocol."""
|
|
|
|
def __init__(self, initial=None):
|
|
if not initial:
|
|
self._dict = {}
|
|
else:
|
|
self._dict = initial
|
|
|
|
def __iter__(self):
|
|
return iter(self._dict)
|
|
|
|
def __getitem__(self, item):
|
|
return self._dict[item]
|
|
|
|
def __delitem__(self, item):
|
|
del self._dict[item]
|
|
|
|
def __setitem__(self, item, value):
|
|
self._dict[item] = value
|
|
|
|
def __len__(self):
|
|
return len(self._dict)
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, collections.Mapping):
|
|
return all(self.get(k) == other.get(k) for k in self)
|
|
return NotImplemented
|
|
|
|
def __repr__(self):
|
|
return "NotADict(%s)" % repr(self._dict)
|
|
|
|
|
|
class DSTAwareTimezone(datetime.tzinfo):
|
|
|
|
def __init__(self, offset, name, dst_start_month, dst_end_month):
|
|
self.__offset = offset
|
|
self.__dst_start_month = dst_start_month
|
|
self.__dst_end_month = dst_end_month
|
|
self.__name = name
|
|
|
|
def _is_dst(self, dt):
|
|
return self.__dst_start_month <= dt.month <= self.__dst_end_month
|
|
|
|
def utcoffset(self, dt):
|
|
return datetime.timedelta(minutes=self.__offset) + self.dst(dt)
|
|
|
|
def dst(self, dt):
|
|
if self._is_dst(dt):
|
|
return datetime.timedelta(hours=1)
|
|
return datetime.timedelta(0)
|
|
|
|
def tzname(self, dt):
|
|
return self.__name
|
|
|
|
|
|
class TestBSON(unittest.TestCase):
|
|
def assertInvalid(self, data):
|
|
self.assertRaises(InvalidBSON, bson.BSON(data).decode)
|
|
|
|
def check_encode_then_decode(self, doc_class=dict):
|
|
|
|
# Work around http://bugs.jython.org/issue1728
|
|
if sys.platform.startswith('java'):
|
|
doc_class = SON
|
|
|
|
def helper(doc):
|
|
self.assertEqual(doc, (BSON.encode(doc_class(doc))).decode())
|
|
helper({})
|
|
helper({"test": u"hello"})
|
|
self.assertTrue(isinstance(BSON.encode({"hello": "world"})
|
|
.decode()["hello"],
|
|
text_type))
|
|
helper({"mike": -10120})
|
|
helper({"long": Int64(10)})
|
|
helper({"really big long": 2147483648})
|
|
helper({u"hello": 0.0013109})
|
|
helper({"something": True})
|
|
helper({"false": False})
|
|
helper({"an array": [1, True, 3.8, u"world"]})
|
|
helper({"an object": doc_class({"test": u"something"})})
|
|
helper({"a binary": Binary(b"test", 100)})
|
|
helper({"a binary": Binary(b"test", 128)})
|
|
helper({"a binary": Binary(b"test", 254)})
|
|
helper({"another binary": Binary(b"test", 2)})
|
|
helper(SON([(u'test dst', datetime.datetime(1993, 4, 4, 2))]))
|
|
helper(SON([(u'test negative dst',
|
|
datetime.datetime(1, 1, 1, 1, 1, 1))]))
|
|
helper({"big float": float(10000000000)})
|
|
helper({"ref": DBRef("coll", 5)})
|
|
helper({"ref": DBRef("coll", 5, foo="bar", bar=4)})
|
|
helper({"ref": DBRef("coll", 5, "foo")})
|
|
helper({"ref": DBRef("coll", 5, "foo", foo="bar")})
|
|
helper({"ref": Timestamp(1, 2)})
|
|
helper({"foo": MinKey()})
|
|
helper({"foo": MaxKey()})
|
|
helper({"$field": Code("function(){ return true; }")})
|
|
helper({"$field": Code("return function(){ return x; }", scope={'x': False})})
|
|
|
|
def encode_then_decode(doc):
|
|
return doc_class(doc) == BSON.encode(doc).decode(
|
|
CodecOptions(document_class=doc_class))
|
|
|
|
qcheck.check_unittest(self, encode_then_decode,
|
|
qcheck.gen_mongo_dict(3))
|
|
|
|
def test_encode_then_decode(self):
|
|
self.check_encode_then_decode()
|
|
|
|
def test_encode_then_decode_any_mapping(self):
|
|
self.check_encode_then_decode(doc_class=NotADict)
|
|
|
|
def test_encoding_defaultdict(self):
|
|
dct = collections.defaultdict(dict, [('foo', 'bar')])
|
|
BSON.encode(dct)
|
|
self.assertEqual(dct, collections.defaultdict(dict, [('foo', 'bar')]))
|
|
|
|
def test_basic_validation(self):
|
|
self.assertRaises(TypeError, is_valid, 100)
|
|
self.assertRaises(TypeError, is_valid, u"test")
|
|
self.assertRaises(TypeError, is_valid, 10.4)
|
|
|
|
self.assertInvalid(b"test")
|
|
|
|
# the simplest valid BSON document
|
|
self.assertTrue(is_valid(b"\x05\x00\x00\x00\x00"))
|
|
self.assertTrue(is_valid(BSON(b"\x05\x00\x00\x00\x00")))
|
|
|
|
# failure cases
|
|
self.assertInvalid(b"\x04\x00\x00\x00\x00")
|
|
self.assertInvalid(b"\x05\x00\x00\x00\x01")
|
|
self.assertInvalid(b"\x05\x00\x00\x00")
|
|
self.assertInvalid(b"\x05\x00\x00\x00\x00\x00")
|
|
self.assertInvalid(b"\x07\x00\x00\x00\x02a\x00\x78\x56\x34\x12")
|
|
self.assertInvalid(b"\x09\x00\x00\x00\x10a\x00\x05\x00")
|
|
self.assertInvalid(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
|
|
self.assertInvalid(b"\x13\x00\x00\x00\x02foo\x00"
|
|
b"\x04\x00\x00\x00bar\x00\x00")
|
|
self.assertInvalid(b"\x18\x00\x00\x00\x03foo\x00\x0f\x00\x00"
|
|
b"\x00\x10bar\x00\xff\xff\xff\x7f\x00\x00")
|
|
self.assertInvalid(b"\x15\x00\x00\x00\x03foo\x00\x0c"
|
|
b"\x00\x00\x00\x08bar\x00\x01\x00\x00")
|
|
self.assertInvalid(b"\x1c\x00\x00\x00\x03foo\x00"
|
|
b"\x12\x00\x00\x00\x02bar\x00"
|
|
b"\x05\x00\x00\x00baz\x00\x00\x00")
|
|
self.assertInvalid(b"\x10\x00\x00\x00\x02a\x00"
|
|
b"\x04\x00\x00\x00abc\xff\x00")
|
|
|
|
def test_bad_string_lengths(self):
|
|
self.assertInvalid(
|
|
b"\x0c\x00\x00\x00\x02\x00"
|
|
b"\x00\x00\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x12\x00\x00\x00\x02\x00"
|
|
b"\xff\xff\xff\xfffoobar\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x0c\x00\x00\x00\x0e\x00"
|
|
b"\x00\x00\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x12\x00\x00\x00\x0e\x00"
|
|
b"\xff\xff\xff\xfffoobar\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x18\x00\x00\x00\x0c\x00"
|
|
b"\x00\x00\x00\x00\x00RY\xb5j"
|
|
b"\xfa[\xd8A\xd6X]\x99\x00")
|
|
self.assertInvalid(
|
|
b"\x1e\x00\x00\x00\x0c\x00"
|
|
b"\xff\xff\xff\xfffoobar\x00"
|
|
b"RY\xb5j\xfa[\xd8A\xd6X]\x99\x00")
|
|
self.assertInvalid(
|
|
b"\x0c\x00\x00\x00\r\x00"
|
|
b"\x00\x00\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x0c\x00\x00\x00\r\x00"
|
|
b"\xff\xff\xff\xff\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x1c\x00\x00\x00\x0f\x00"
|
|
b"\x15\x00\x00\x00\x00\x00"
|
|
b"\x00\x00\x00\x0c\x00\x00"
|
|
b"\x00\x02\x00\x01\x00\x00"
|
|
b"\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x1c\x00\x00\x00\x0f\x00"
|
|
b"\x15\x00\x00\x00\xff\xff"
|
|
b"\xff\xff\x00\x0c\x00\x00"
|
|
b"\x00\x02\x00\x01\x00\x00"
|
|
b"\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x1c\x00\x00\x00\x0f\x00"
|
|
b"\x15\x00\x00\x00\x01\x00"
|
|
b"\x00\x00\x00\x0c\x00\x00"
|
|
b"\x00\x02\x00\x00\x00\x00"
|
|
b"\x00\x00\x00\x00")
|
|
self.assertInvalid(
|
|
b"\x1c\x00\x00\x00\x0f\x00"
|
|
b"\x15\x00\x00\x00\x01\x00"
|
|
b"\x00\x00\x00\x0c\x00\x00"
|
|
b"\x00\x02\x00\xff\xff\xff"
|
|
b"\xff\x00\x00\x00")
|
|
|
|
def test_random_data_is_not_bson(self):
|
|
qcheck.check_unittest(self, qcheck.isnt(is_valid),
|
|
qcheck.gen_string(qcheck.gen_range(0, 40)))
|
|
|
|
def test_basic_decode(self):
|
|
self.assertEqual({"test": u"hello world"},
|
|
BSON(b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74\x00\x0C"
|
|
b"\x00\x00\x00\x68\x65\x6C\x6C\x6F\x20\x77\x6F"
|
|
b"\x72\x6C\x64\x00\x00").decode())
|
|
self.assertEqual([{"test": u"hello world"}, {}],
|
|
decode_all(b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00\x00"))
|
|
self.assertEqual([{"test": u"hello world"}, {}],
|
|
list(decode_iter(
|
|
b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00\x00")))
|
|
self.assertEqual([{"test": u"hello world"}, {}],
|
|
list(decode_file_iter(StringIO(
|
|
b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00\x00"))))
|
|
|
|
def test_invalid_decodes(self):
|
|
# Invalid object size (not enough bytes in document for even
|
|
# an object size of first object.
|
|
# NOTE: decode_all and decode_iter don't care, not sure if they should?
|
|
self.assertRaises(InvalidBSON, list,
|
|
decode_file_iter(StringIO(b"\x1B")))
|
|
|
|
# An object size that's too small to even include the object size,
|
|
# but is correctly encoded, along with a correct EOO (and no data).
|
|
data = b"\x01\x00\x00\x00\x00"
|
|
self.assertRaises(InvalidBSON, decode_all, data)
|
|
self.assertRaises(InvalidBSON, list, decode_iter(data))
|
|
self.assertRaises(InvalidBSON, list, decode_file_iter(StringIO(data)))
|
|
|
|
# One object, but with object size listed smaller than it is in the
|
|
# data.
|
|
data = (b"\x1A\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00\x00")
|
|
self.assertRaises(InvalidBSON, decode_all, data)
|
|
self.assertRaises(InvalidBSON, list, decode_iter(data))
|
|
self.assertRaises(InvalidBSON, list, decode_file_iter(StringIO(data)))
|
|
|
|
# One object, missing the EOO at the end.
|
|
data = (b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00")
|
|
self.assertRaises(InvalidBSON, decode_all, data)
|
|
self.assertRaises(InvalidBSON, list, decode_iter(data))
|
|
self.assertRaises(InvalidBSON, list, decode_file_iter(StringIO(data)))
|
|
|
|
# One object, sized correctly, with a spot for an EOO, but the EOO
|
|
# isn't 0x00.
|
|
data = (b"\x1B\x00\x00\x00\x0E\x74\x65\x73\x74"
|
|
b"\x00\x0C\x00\x00\x00\x68\x65\x6C\x6C"
|
|
b"\x6f\x20\x77\x6F\x72\x6C\x64\x00\x00"
|
|
b"\x05\x00\x00\x00\xFF")
|
|
self.assertRaises(InvalidBSON, decode_all, data)
|
|
self.assertRaises(InvalidBSON, list, decode_iter(data))
|
|
self.assertRaises(InvalidBSON, list, decode_file_iter(StringIO(data)))
|
|
|
|
def test_data_timestamp(self):
|
|
self.assertEqual({"test": Timestamp(4, 20)},
|
|
BSON(b"\x13\x00\x00\x00\x11\x74\x65\x73\x74\x00\x14"
|
|
b"\x00\x00\x00\x04\x00\x00\x00\x00").decode())
|
|
|
|
def test_basic_encode(self):
|
|
self.assertRaises(TypeError, BSON.encode, 100)
|
|
self.assertRaises(TypeError, BSON.encode, "hello")
|
|
self.assertRaises(TypeError, BSON.encode, None)
|
|
self.assertRaises(TypeError, BSON.encode, [])
|
|
|
|
self.assertEqual(BSON.encode({}), BSON(b"\x05\x00\x00\x00\x00"))
|
|
self.assertEqual(BSON.encode({"test": u"hello world"}),
|
|
b"\x1B\x00\x00\x00\x02\x74\x65\x73\x74\x00\x0C\x00"
|
|
b"\x00\x00\x68\x65\x6C\x6C\x6F\x20\x77\x6F\x72\x6C"
|
|
b"\x64\x00\x00")
|
|
self.assertEqual(BSON.encode({u"mike": 100}),
|
|
b"\x0F\x00\x00\x00\x10\x6D\x69\x6B\x65\x00\x64\x00"
|
|
b"\x00\x00\x00")
|
|
self.assertEqual(BSON.encode({"hello": 1.5}),
|
|
b"\x14\x00\x00\x00\x01\x68\x65\x6C\x6C\x6F\x00\x00"
|
|
b"\x00\x00\x00\x00\x00\xF8\x3F\x00")
|
|
self.assertEqual(BSON.encode({"true": True}),
|
|
b"\x0C\x00\x00\x00\x08\x74\x72\x75\x65\x00\x01\x00")
|
|
self.assertEqual(BSON.encode({"false": False}),
|
|
b"\x0D\x00\x00\x00\x08\x66\x61\x6C\x73\x65\x00\x00"
|
|
b"\x00")
|
|
self.assertEqual(BSON.encode({"empty": []}),
|
|
b"\x11\x00\x00\x00\x04\x65\x6D\x70\x74\x79\x00\x05"
|
|
b"\x00\x00\x00\x00\x00")
|
|
self.assertEqual(BSON.encode({"none": {}}),
|
|
b"\x10\x00\x00\x00\x03\x6E\x6F\x6E\x65\x00\x05\x00"
|
|
b"\x00\x00\x00\x00")
|
|
self.assertEqual(BSON.encode({"test": Binary(b"test", 0)}),
|
|
b"\x14\x00\x00\x00\x05\x74\x65\x73\x74\x00\x04\x00"
|
|
b"\x00\x00\x00\x74\x65\x73\x74\x00")
|
|
self.assertEqual(BSON.encode({"test": Binary(b"test", 2)}),
|
|
b"\x18\x00\x00\x00\x05\x74\x65\x73\x74\x00\x08\x00"
|
|
b"\x00\x00\x02\x04\x00\x00\x00\x74\x65\x73\x74\x00")
|
|
self.assertEqual(BSON.encode({"test": Binary(b"test", 128)}),
|
|
b"\x14\x00\x00\x00\x05\x74\x65\x73\x74\x00\x04\x00"
|
|
b"\x00\x00\x80\x74\x65\x73\x74\x00")
|
|
self.assertEqual(BSON.encode({"test": None}),
|
|
b"\x0B\x00\x00\x00\x0A\x74\x65\x73\x74\x00\x00")
|
|
self.assertEqual(BSON.encode({"date": datetime.datetime(2007, 1, 8,
|
|
0, 30, 11)}),
|
|
b"\x13\x00\x00\x00\x09\x64\x61\x74\x65\x00\x38\xBE"
|
|
b"\x1C\xFF\x0F\x01\x00\x00\x00")
|
|
self.assertEqual(BSON.encode({"regex": re.compile(b"a*b",
|
|
re.IGNORECASE)}),
|
|
b"\x12\x00\x00\x00\x0B\x72\x65\x67\x65\x78\x00\x61"
|
|
b"\x2A\x62\x00\x69\x00\x00")
|
|
self.assertEqual(BSON.encode({"$where": Code("test")}),
|
|
b"\x16\x00\x00\x00\r$where\x00\x05\x00\x00\x00test"
|
|
b"\x00\x00")
|
|
self.assertEqual(BSON.encode({"$field":
|
|
Code("function(){ return true;}", scope=None)}),
|
|
b"+\x00\x00\x00\r$field\x00\x1a\x00\x00\x00"
|
|
b"function(){ return true;}\x00\x00")
|
|
self.assertEqual(BSON.encode({"$field":
|
|
Code("return function(){ return x; }",
|
|
scope={'x': False})}),
|
|
b"=\x00\x00\x00\x0f$field\x000\x00\x00\x00\x1f\x00"
|
|
b"\x00\x00return function(){ return x; }\x00\t\x00"
|
|
b"\x00\x00\x08x\x00\x00\x00\x00")
|
|
a = ObjectId(b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B")
|
|
self.assertEqual(BSON.encode({"oid": a}),
|
|
b"\x16\x00\x00\x00\x07\x6F\x69\x64\x00\x00\x01\x02"
|
|
b"\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x00")
|
|
self.assertEqual(BSON.encode({"ref": DBRef("coll", a)}),
|
|
b"\x2F\x00\x00\x00\x03ref\x00\x25\x00\x00\x00\x02"
|
|
b"$ref\x00\x05\x00\x00\x00coll\x00\x07$id\x00\x00"
|
|
b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x00"
|
|
b"\x00")
|
|
|
|
def test_unknown_type(self):
|
|
# Repr value differs with major python version
|
|
part = "type %r for fieldname 'foo'" % (b'\x13',)
|
|
docs = [
|
|
b'\x0e\x00\x00\x00\x13foo\x00\x01\x00\x00\x00\x00',
|
|
(b'\x16\x00\x00\x00\x04foo\x00\x0c\x00\x00\x00\x130'
|
|
b'\x00\x01\x00\x00\x00\x00\x00'),
|
|
(b' \x00\x00\x00\x04bar\x00\x16\x00\x00\x00\x030\x00\x0e\x00\x00'
|
|
b'\x00\x13foo\x00\x01\x00\x00\x00\x00\x00\x00')]
|
|
for bs in docs:
|
|
try:
|
|
bson.BSON(bs).decode()
|
|
except Exception as exc:
|
|
self.assertTrue(isinstance(exc, InvalidBSON))
|
|
self.assertTrue(part in str(exc))
|
|
else:
|
|
self.fail("Failed to raise an exception.")
|
|
|
|
def test_dbpointer(self):
|
|
# *Note* - DBPointer and DBRef are *not* the same thing. DBPointer
|
|
# is a deprecated BSON type. DBRef is a convention that does not
|
|
# exist in the BSON spec, meant to replace DBPointer. PyMongo does
|
|
# not support creation of the DBPointer type, but will decode
|
|
# DBPointer to DBRef.
|
|
|
|
bs = (b"\x18\x00\x00\x00\x0c\x00\x01\x00\x00"
|
|
b"\x00\x00RY\xb5j\xfa[\xd8A\xd6X]\x99\x00")
|
|
|
|
self.assertEqual({'': DBRef('', ObjectId('5259b56afa5bd841d6585d99'))},
|
|
bson.BSON(bs).decode())
|
|
|
|
def test_bad_dbref(self):
|
|
ref_only = {'ref': {'$ref': 'collection'}}
|
|
id_only = {'ref': {'$id': ObjectId()}}
|
|
|
|
self.assertEqual(DBRef('collection', id=None),
|
|
BSON.encode(ref_only).decode()['ref'])
|
|
self.assertEqual(id_only, BSON.encode(id_only).decode())
|
|
|
|
def test_bytes_as_keys(self):
|
|
doc = {b"foo": 'bar'}
|
|
# Since `bytes` are stored as Binary you can't use them
|
|
# as keys in python 3.x. Using binary data as a key makes
|
|
# no sense in BSON anyway and little sense in python.
|
|
if PY3:
|
|
self.assertRaises(InvalidDocument, BSON.encode, doc)
|
|
else:
|
|
self.assertTrue(BSON.encode(doc))
|
|
|
|
def test_datetime_encode_decode(self):
|
|
# Negative timestamps
|
|
dt1 = datetime.datetime(1, 1, 1, 1, 1, 1, 111000)
|
|
dt2 = BSON.encode({"date": dt1}).decode()["date"]
|
|
self.assertEqual(dt1, dt2)
|
|
|
|
dt1 = datetime.datetime(1959, 6, 25, 12, 16, 59, 999000)
|
|
dt2 = BSON.encode({"date": dt1}).decode()["date"]
|
|
self.assertEqual(dt1, dt2)
|
|
|
|
# Positive timestamps
|
|
dt1 = datetime.datetime(9999, 12, 31, 23, 59, 59, 999000)
|
|
dt2 = BSON.encode({"date": dt1}).decode()["date"]
|
|
self.assertEqual(dt1, dt2)
|
|
|
|
dt1 = datetime.datetime(2011, 6, 14, 10, 47, 53, 444000)
|
|
dt2 = BSON.encode({"date": dt1}).decode()["date"]
|
|
self.assertEqual(dt1, dt2)
|
|
|
|
def test_aware_datetime(self):
|
|
aware = datetime.datetime(1993, 4, 4, 2,
|
|
tzinfo=FixedOffset(555, "SomeZone"))
|
|
as_utc = (aware - aware.utcoffset()).replace(tzinfo=utc)
|
|
self.assertEqual(datetime.datetime(1993, 4, 3, 16, 45, tzinfo=utc),
|
|
as_utc)
|
|
after = BSON.encode({"date": aware}).decode(
|
|
CodecOptions(tz_aware=True))["date"]
|
|
self.assertEqual(utc, after.tzinfo)
|
|
self.assertEqual(as_utc, after)
|
|
|
|
def test_local_datetime(self):
|
|
# Timezone -60 minutes of UTC, with DST between April and July.
|
|
tz = DSTAwareTimezone(60, "sixty-minutes", 4, 7)
|
|
|
|
# It's not DST.
|
|
local = datetime.datetime(year=2025, month=12, hour=2, day=1,
|
|
tzinfo=tz)
|
|
options = CodecOptions(tz_aware=True, tzinfo=tz)
|
|
# Encode with this timezone, then decode to UTC.
|
|
encoded = BSON.encode({'date': local}, codec_options=options)
|
|
self.assertEqual(local.replace(hour=1, tzinfo=None),
|
|
encoded.decode()['date'])
|
|
|
|
# It's DST.
|
|
local = datetime.datetime(year=2025, month=4, hour=1, day=1,
|
|
tzinfo=tz)
|
|
encoded = BSON.encode({'date': local}, codec_options=options)
|
|
self.assertEqual(local.replace(month=3, day=31, hour=23, tzinfo=None),
|
|
encoded.decode()['date'])
|
|
|
|
# Encode UTC, then decode in a different timezone.
|
|
encoded = BSON.encode({'date': local.replace(tzinfo=utc)})
|
|
decoded = encoded.decode(options)['date']
|
|
self.assertEqual(local.replace(hour=3), decoded)
|
|
self.assertEqual(tz, decoded.tzinfo)
|
|
|
|
# Test round-tripping.
|
|
self.assertEqual(
|
|
local,
|
|
(BSON
|
|
.encode({'date': local}, codec_options=options)
|
|
.decode(options)['date']))
|
|
|
|
# Test around the Unix Epoch.
|
|
epochs = (
|
|
EPOCH_AWARE,
|
|
EPOCH_AWARE.astimezone(FixedOffset(120, 'one twenty')),
|
|
EPOCH_AWARE.astimezone(FixedOffset(-120, 'minus one twenty'))
|
|
)
|
|
utc_co = CodecOptions(tz_aware=True)
|
|
for epoch in epochs:
|
|
doc = {'epoch': epoch}
|
|
# We always retrieve datetimes in UTC unless told to do otherwise.
|
|
self.assertEqual(
|
|
EPOCH_AWARE,
|
|
BSON.encode(doc).decode(codec_options=utc_co)['epoch'])
|
|
# Round-trip the epoch.
|
|
local_co = CodecOptions(tz_aware=True, tzinfo=epoch.tzinfo)
|
|
self.assertEqual(
|
|
epoch,
|
|
BSON.encode(doc).decode(codec_options=local_co)['epoch'])
|
|
|
|
def test_naive_decode(self):
|
|
aware = datetime.datetime(1993, 4, 4, 2,
|
|
tzinfo=FixedOffset(555, "SomeZone"))
|
|
naive_utc = (aware - aware.utcoffset()).replace(tzinfo=None)
|
|
self.assertEqual(datetime.datetime(1993, 4, 3, 16, 45), naive_utc)
|
|
after = BSON.encode({"date": aware}).decode()["date"]
|
|
self.assertEqual(None, after.tzinfo)
|
|
self.assertEqual(naive_utc, after)
|
|
|
|
def test_dst(self):
|
|
d = {"x": datetime.datetime(1993, 4, 4, 2)}
|
|
self.assertEqual(d, BSON.encode(d).decode())
|
|
|
|
def test_bad_encode(self):
|
|
if not PY3:
|
|
# Python3 treats this as a unicode string which won't raise
|
|
# an exception. If we passed the string as bytes instead we
|
|
# still wouldn't get an error since we store bytes as BSON
|
|
# binary subtype 0.
|
|
self.assertRaises(InvalidStringData, BSON.encode,
|
|
{"lalala": '\xf4\xe0\xf0\xe1\xc0 Color Touch'})
|
|
evil_list = {'a': []}
|
|
evil_list['a'].append(evil_list)
|
|
evil_dict = {}
|
|
evil_dict['a'] = evil_dict
|
|
# Work around what seems like a regression in python 3.5.0.
|
|
# See http://bugs.python.org/issue25222
|
|
# 100 is an arbitrary choice. The default is 1000 on the machines
|
|
# I have access to.
|
|
depth = sys.getrecursionlimit()
|
|
sys.setrecursionlimit(100)
|
|
try:
|
|
for evil_data in [evil_dict, evil_list]:
|
|
self.assertRaises(RuntimeError, BSON.encode, evil_data)
|
|
finally:
|
|
sys.setrecursionlimit(depth)
|
|
|
|
def test_overflow(self):
|
|
self.assertTrue(BSON.encode({"x": long(9223372036854775807)}))
|
|
self.assertRaises(OverflowError, BSON.encode,
|
|
{"x": long(9223372036854775808)})
|
|
|
|
self.assertTrue(BSON.encode({"x": long(-9223372036854775808)}))
|
|
self.assertRaises(OverflowError, BSON.encode,
|
|
{"x": long(-9223372036854775809)})
|
|
|
|
def test_small_long_encode_decode(self):
|
|
encoded1 = BSON.encode({'x': 256})
|
|
decoded1 = BSON.decode(encoded1)['x']
|
|
self.assertEqual(256, decoded1)
|
|
self.assertEqual(type(256), type(decoded1))
|
|
|
|
encoded2 = BSON.encode({'x': Int64(256)})
|
|
decoded2 = BSON.decode(encoded2)['x']
|
|
expected = Int64(256)
|
|
self.assertEqual(expected, decoded2)
|
|
self.assertEqual(type(expected), type(decoded2))
|
|
|
|
self.assertNotEqual(type(decoded1), type(decoded2))
|
|
|
|
def test_tuple(self):
|
|
self.assertEqual({"tuple": [1, 2]},
|
|
BSON.encode({"tuple": (1, 2)}).decode())
|
|
|
|
def test_uuid(self):
|
|
|
|
id = uuid.uuid4()
|
|
transformed_id = (BSON.encode({"id": id})).decode()["id"]
|
|
|
|
self.assertTrue(isinstance(transformed_id, uuid.UUID))
|
|
self.assertEqual(id, transformed_id)
|
|
self.assertNotEqual(uuid.uuid4(), transformed_id)
|
|
|
|
def test_uuid_legacy(self):
|
|
|
|
id = uuid.uuid4()
|
|
legacy = UUIDLegacy(id)
|
|
self.assertEqual(3, legacy.subtype)
|
|
transformed = (BSON.encode({"uuid": legacy})).decode()["uuid"]
|
|
self.assertTrue(isinstance(transformed, uuid.UUID))
|
|
self.assertEqual(id, transformed)
|
|
self.assertNotEqual(UUIDLegacy(uuid.uuid4()), UUIDLegacy(transformed))
|
|
|
|
# The C extension was segfaulting on unicode RegExs, so we have this test
|
|
# that doesn't really test anything but the lack of a segfault.
|
|
def test_unicode_regex(self):
|
|
regex = re.compile(u'revisi\xf3n')
|
|
BSON.encode({"regex": regex}).decode()
|
|
|
|
def test_non_string_keys(self):
|
|
self.assertRaises(InvalidDocument, BSON.encode, {8.9: "test"})
|
|
|
|
def test_utf8(self):
|
|
w = {u"aéあ": u"aéあ"}
|
|
self.assertEqual(w, BSON.encode(w).decode())
|
|
|
|
# b'a\xe9' == u"aé".encode("iso-8859-1")
|
|
iso8859_bytes = b'a\xe9'
|
|
y = {"hello": iso8859_bytes}
|
|
if PY3:
|
|
# Stored as BSON binary subtype 0.
|
|
out = BSON.encode(y).decode()
|
|
self.assertTrue(isinstance(out['hello'], bytes))
|
|
self.assertEqual(out['hello'], iso8859_bytes)
|
|
else:
|
|
# Python 2.
|
|
try:
|
|
BSON.encode(y)
|
|
except InvalidStringData as e:
|
|
self.assertTrue(repr(iso8859_bytes) in str(e))
|
|
|
|
# The next two tests only make sense in python 2.x since
|
|
# you can't use `bytes` type as document keys in python 3.x.
|
|
x = {u"aéあ".encode("utf-8"): u"aéあ".encode("utf-8")}
|
|
self.assertEqual(w, BSON.encode(x).decode())
|
|
|
|
z = {iso8859_bytes: "hello"}
|
|
self.assertRaises(InvalidStringData, BSON.encode, z)
|
|
|
|
def test_null_character(self):
|
|
doc = {"a": "\x00"}
|
|
self.assertEqual(doc, BSON.encode(doc).decode())
|
|
|
|
# This test doesn't make much sense in Python2
|
|
# since {'a': '\x00'} == {'a': u'\x00'}.
|
|
# Decoding here actually returns {'a': '\x00'}
|
|
doc = {"a": u"\x00"}
|
|
self.assertEqual(doc, BSON.encode(doc).decode())
|
|
|
|
self.assertRaises(InvalidDocument, BSON.encode, {b"\x00": "a"})
|
|
self.assertRaises(InvalidDocument, BSON.encode, {u"\x00": "a"})
|
|
|
|
self.assertRaises(InvalidDocument, BSON.encode,
|
|
{"a": re.compile(b"ab\x00c")})
|
|
self.assertRaises(InvalidDocument, BSON.encode,
|
|
{"a": re.compile(u"ab\x00c")})
|
|
|
|
def test_move_id(self):
|
|
self.assertEqual(b"\x19\x00\x00\x00\x02_id\x00\x02\x00\x00\x00a\x00"
|
|
b"\x02a\x00\x02\x00\x00\x00a\x00\x00",
|
|
BSON.encode(SON([("a", "a"), ("_id", "a")])))
|
|
|
|
self.assertEqual(b"\x2c\x00\x00\x00"
|
|
b"\x02_id\x00\x02\x00\x00\x00b\x00"
|
|
b"\x03b\x00"
|
|
b"\x19\x00\x00\x00\x02a\x00\x02\x00\x00\x00a\x00"
|
|
b"\x02_id\x00\x02\x00\x00\x00a\x00\x00\x00",
|
|
BSON.encode(SON([("b",
|
|
SON([("a", "a"), ("_id", "a")])),
|
|
("_id", "b")])))
|
|
|
|
def test_dates(self):
|
|
doc = {"early": datetime.datetime(1686, 5, 5),
|
|
"late": datetime.datetime(2086, 5, 5)}
|
|
try:
|
|
self.assertEqual(doc, BSON.encode(doc).decode())
|
|
except ValueError:
|
|
# Ignore ValueError when no C ext, since it's probably
|
|
# a problem w/ 32-bit Python - we work around this in the
|
|
# C ext, though.
|
|
if bson.has_c():
|
|
raise
|
|
|
|
def test_custom_class(self):
|
|
self.assertIsInstance(BSON.encode({}).decode(), dict)
|
|
self.assertNotIsInstance(BSON.encode({}).decode(), SON)
|
|
self.assertIsInstance(
|
|
BSON.encode({}).decode(CodecOptions(document_class=SON)),
|
|
SON)
|
|
|
|
self.assertEqual(
|
|
1,
|
|
BSON.encode({"x": 1}).decode(
|
|
CodecOptions(document_class=SON))["x"])
|
|
|
|
x = BSON.encode({"x": [{"y": 1}]})
|
|
self.assertIsInstance(
|
|
x.decode(CodecOptions(document_class=SON))["x"][0], SON)
|
|
|
|
def test_subclasses(self):
|
|
# make sure we can serialize subclasses of native Python types.
|
|
class _myint(int):
|
|
pass
|
|
|
|
class _myfloat(float):
|
|
pass
|
|
|
|
class _myunicode(text_type):
|
|
pass
|
|
|
|
d = {'a': _myint(42), 'b': _myfloat(63.9),
|
|
'c': _myunicode('hello world')
|
|
}
|
|
d2 = BSON.encode(d).decode()
|
|
for key, value in iteritems(d2):
|
|
orig_value = d[key]
|
|
orig_type = orig_value.__class__.__bases__[0]
|
|
self.assertEqual(type(value), orig_type)
|
|
self.assertEqual(value, orig_type(value))
|
|
|
|
def test_ordered_dict(self):
|
|
try:
|
|
from collections import OrderedDict
|
|
except ImportError:
|
|
raise SkipTest("No OrderedDict")
|
|
d = OrderedDict([("one", 1), ("two", 2), ("three", 3), ("four", 4)])
|
|
self.assertEqual(
|
|
d,
|
|
BSON.encode(d).decode(CodecOptions(document_class=OrderedDict)))
|
|
|
|
def test_bson_regex(self):
|
|
# Invalid Python regex, though valid PCRE.
|
|
bson_re1 = Regex(r'[\w-\.]')
|
|
self.assertEqual(r'[\w-\.]', bson_re1.pattern)
|
|
self.assertEqual(0, bson_re1.flags)
|
|
|
|
doc1 = {'r': bson_re1}
|
|
doc1_bson = (
|
|
b'\x11\x00\x00\x00' # document length
|
|
b'\x0br\x00[\\w-\\.]\x00\x00' # r: regex
|
|
b'\x00') # document terminator
|
|
|
|
self.assertEqual(doc1_bson, BSON.encode(doc1))
|
|
self.assertEqual(doc1, BSON(doc1_bson).decode())
|
|
|
|
# Valid Python regex, with flags.
|
|
re2 = re.compile(u'.*', re.I | re.M | re.S | re.U | re.X)
|
|
bson_re2 = Regex(u'.*', re.I | re.M | re.S | re.U | re.X)
|
|
|
|
doc2_with_re = {'r': re2}
|
|
doc2_with_bson_re = {'r': bson_re2}
|
|
doc2_bson = (
|
|
b"\x11\x00\x00\x00" # document length
|
|
b"\x0br\x00.*\x00imsux\x00" # r: regex
|
|
b"\x00") # document terminator
|
|
|
|
self.assertEqual(doc2_bson, BSON.encode(doc2_with_re))
|
|
self.assertEqual(doc2_bson, BSON.encode(doc2_with_bson_re))
|
|
|
|
self.assertEqual(re2.pattern, BSON(doc2_bson).decode()['r'].pattern)
|
|
self.assertEqual(re2.flags, BSON(doc2_bson).decode()['r'].flags)
|
|
|
|
def test_regex_from_native(self):
|
|
self.assertEqual('.*', Regex.from_native(re.compile('.*')).pattern)
|
|
self.assertEqual(0, Regex.from_native(re.compile(b'')).flags)
|
|
|
|
regex = re.compile(b'', re.I | re.L | re.M | re.S | re.X)
|
|
self.assertEqual(
|
|
re.I | re.L | re.M | re.S | re.X,
|
|
Regex.from_native(regex).flags)
|
|
|
|
unicode_regex = re.compile('', re.U)
|
|
self.assertEqual(re.U, Regex.from_native(unicode_regex).flags)
|
|
|
|
def test_regex_hash(self):
|
|
self.assertRaises(TypeError, hash, Regex('hello'))
|
|
|
|
def test_exception_wrapping(self):
|
|
# No matter what exception is raised while trying to decode BSON,
|
|
# the final exception always matches InvalidBSON.
|
|
|
|
# {'s': '\xff'}, will throw attempting to decode utf-8.
|
|
bad_doc = b'\x0f\x00\x00\x00\x02s\x00\x03\x00\x00\x00\xff\x00\x00\x00'
|
|
|
|
with self.assertRaises(InvalidBSON) as context:
|
|
decode_all(bad_doc)
|
|
|
|
self.assertIn("codec can't decode byte 0xff",
|
|
str(context.exception))
|
|
|
|
def test_minkey_maxkey_comparison(self):
|
|
# MinKey's <, <=, >, >=, !=, and ==.
|
|
self.assertTrue(MinKey() < None)
|
|
self.assertTrue(MinKey() < 1)
|
|
self.assertTrue(MinKey() <= 1)
|
|
self.assertTrue(MinKey() <= MinKey())
|
|
self.assertFalse(MinKey() > None)
|
|
self.assertFalse(MinKey() > 1)
|
|
self.assertFalse(MinKey() >= 1)
|
|
self.assertTrue(MinKey() >= MinKey())
|
|
self.assertTrue(MinKey() != 1)
|
|
self.assertFalse(MinKey() == 1)
|
|
self.assertTrue(MinKey() == MinKey())
|
|
|
|
# MinKey compared to MaxKey.
|
|
self.assertTrue(MinKey() < MaxKey())
|
|
self.assertTrue(MinKey() <= MaxKey())
|
|
self.assertFalse(MinKey() > MaxKey())
|
|
self.assertFalse(MinKey() >= MaxKey())
|
|
self.assertTrue(MinKey() != MaxKey())
|
|
self.assertFalse(MinKey() == MaxKey())
|
|
|
|
# MaxKey's <, <=, >, >=, !=, and ==.
|
|
self.assertFalse(MaxKey() < None)
|
|
self.assertFalse(MaxKey() < 1)
|
|
self.assertFalse(MaxKey() <= 1)
|
|
self.assertTrue(MaxKey() <= MaxKey())
|
|
self.assertTrue(MaxKey() > None)
|
|
self.assertTrue(MaxKey() > 1)
|
|
self.assertTrue(MaxKey() >= 1)
|
|
self.assertTrue(MaxKey() >= MaxKey())
|
|
self.assertTrue(MaxKey() != 1)
|
|
self.assertFalse(MaxKey() == 1)
|
|
self.assertTrue(MaxKey() == MaxKey())
|
|
|
|
# MaxKey compared to MinKey.
|
|
self.assertFalse(MaxKey() < MinKey())
|
|
self.assertFalse(MaxKey() <= MinKey())
|
|
self.assertTrue(MaxKey() > MinKey())
|
|
self.assertTrue(MaxKey() >= MinKey())
|
|
self.assertTrue(MaxKey() != MinKey())
|
|
self.assertFalse(MaxKey() == MinKey())
|
|
|
|
def test_minkey_maxkey_hash(self):
|
|
self.assertEqual(hash(MaxKey()), hash(MaxKey()))
|
|
self.assertEqual(hash(MinKey()), hash(MinKey()))
|
|
self.assertNotEqual(hash(MaxKey()), hash(MinKey()))
|
|
|
|
def test_timestamp_comparison(self):
|
|
# Timestamp is initialized with time, inc. Time is the more
|
|
# significant comparand.
|
|
self.assertTrue(Timestamp(1, 0) < Timestamp(2, 17))
|
|
self.assertTrue(Timestamp(2, 0) > Timestamp(1, 0))
|
|
self.assertTrue(Timestamp(1, 7) <= Timestamp(2, 0))
|
|
self.assertTrue(Timestamp(2, 0) >= Timestamp(1, 1))
|
|
self.assertTrue(Timestamp(2, 0) <= Timestamp(2, 0))
|
|
self.assertTrue(Timestamp(2, 0) >= Timestamp(2, 0))
|
|
self.assertFalse(Timestamp(1, 0) > Timestamp(2, 0))
|
|
|
|
# Comparison by inc.
|
|
self.assertTrue(Timestamp(1, 0) < Timestamp(1, 1))
|
|
self.assertTrue(Timestamp(1, 1) > Timestamp(1, 0))
|
|
self.assertTrue(Timestamp(1, 0) <= Timestamp(1, 0))
|
|
self.assertTrue(Timestamp(1, 0) <= Timestamp(1, 1))
|
|
self.assertFalse(Timestamp(1, 0) >= Timestamp(1, 1))
|
|
self.assertTrue(Timestamp(1, 0) >= Timestamp(1, 0))
|
|
self.assertTrue(Timestamp(1, 1) >= Timestamp(1, 0))
|
|
self.assertFalse(Timestamp(1, 1) <= Timestamp(1, 0))
|
|
self.assertTrue(Timestamp(1, 0) <= Timestamp(1, 0))
|
|
self.assertFalse(Timestamp(1, 0) > Timestamp(1, 0))
|
|
|
|
def test_bad_id_keys(self):
|
|
self.assertRaises(InvalidDocument, BSON.encode,
|
|
{"_id": {"$bad": 123}}, True)
|
|
self.assertRaises(InvalidDocument, BSON.encode,
|
|
{"_id": {'$oid': "52d0b971b3ba219fdeb4170e"}}, True)
|
|
BSON.encode({"_id": {'$oid': "52d0b971b3ba219fdeb4170e"}})
|
|
|
|
|
|
class TestCodecOptions(unittest.TestCase):
|
|
def test_document_class(self):
|
|
self.assertRaises(TypeError, CodecOptions, document_class=object)
|
|
self.assertIs(SON, CodecOptions(document_class=SON).document_class)
|
|
|
|
def test_tz_aware(self):
|
|
self.assertRaises(TypeError, CodecOptions, tz_aware=1)
|
|
self.assertFalse(CodecOptions().tz_aware)
|
|
self.assertTrue(CodecOptions(tz_aware=True).tz_aware)
|
|
|
|
def test_uuid_representation(self):
|
|
self.assertRaises(ValueError, CodecOptions, uuid_representation=None)
|
|
self.assertRaises(ValueError, CodecOptions, uuid_representation=7)
|
|
self.assertRaises(ValueError, CodecOptions, uuid_representation=2)
|
|
|
|
def test_tzinfo(self):
|
|
self.assertRaises(TypeError, CodecOptions, tzinfo='pacific')
|
|
tz = FixedOffset(42, 'forty-two')
|
|
self.assertRaises(ValueError, CodecOptions, tzinfo=tz)
|
|
self.assertEqual(tz, CodecOptions(tz_aware=True, tzinfo=tz).tzinfo)
|
|
|
|
def test_codec_options_repr(self):
|
|
r = ("CodecOptions(document_class=dict, tz_aware=False, "
|
|
"uuid_representation=PYTHON_LEGACY, "
|
|
"unicode_decode_error_handler='strict', "
|
|
"tzinfo=None)")
|
|
self.assertEqual(r, repr(CodecOptions()))
|
|
|
|
def test_decode_all_defaults(self):
|
|
# Test decode_all()'s default document_class is dict and tz_aware is
|
|
# False. The default uuid_representation is PYTHON_LEGACY but this
|
|
# decodes same as STANDARD, so all this test proves about UUID decoding
|
|
# is that it's not CSHARP_LEGACY or JAVA_LEGACY.
|
|
doc = {'sub_document': {},
|
|
'uuid': uuid.uuid4(),
|
|
'dt': datetime.datetime.utcnow()}
|
|
|
|
decoded = bson.decode_all(bson.BSON.encode(doc))[0]
|
|
self.assertIsInstance(decoded['sub_document'], dict)
|
|
self.assertEqual(decoded['uuid'], doc['uuid'])
|
|
self.assertIsNone(decoded['dt'].tzinfo)
|
|
|
|
def test_unicode_decode_error_handler(self):
|
|
enc = BSON.encode({"keystr": "foobar"})
|
|
|
|
# Test handling of bad key value.
|
|
invalid_key = BSON(enc[:7] + b'\xe9' + enc[8:])
|
|
replaced_key = b'ke\xef\xbf\xbdstr'.decode('utf-8')
|
|
|
|
dec = BSON.decode(invalid_key, CodecOptions(
|
|
unicode_decode_error_handler="replace"))
|
|
self.assertEqual(dec, {replaced_key: u"foobar"})
|
|
|
|
dec = BSON.decode(invalid_key, CodecOptions(
|
|
unicode_decode_error_handler="ignore"))
|
|
self.assertEqual(dec, {"kestr": "foobar"})
|
|
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_key, CodecOptions(
|
|
unicode_decode_error_handler="strict"))
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_key,
|
|
CodecOptions())
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_key)
|
|
|
|
# Test handing of bad string value.
|
|
invalid_val = BSON(enc[:18] + b'\xe9' + enc[19:])
|
|
replaced_val = b'fo\xef\xbf\xbdbar'.decode('utf-8')
|
|
|
|
dec = BSON.decode(invalid_val, CodecOptions(
|
|
unicode_decode_error_handler="replace"))
|
|
self.assertEqual(dec, {"keystr": replaced_val})
|
|
|
|
dec = BSON.decode(invalid_val, CodecOptions(
|
|
unicode_decode_error_handler="ignore"))
|
|
self.assertEqual(dec, {"keystr": "fobar"})
|
|
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_val, CodecOptions(
|
|
unicode_decode_error_handler="strict"))
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_val,
|
|
CodecOptions())
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_val)
|
|
|
|
# Test handing bad key + bad value.
|
|
invalid_both = BSON(
|
|
enc[:7] + b'\xe9' + enc[8:18] + b'\xe9' + enc[19:])
|
|
|
|
dec = BSON.decode(invalid_both, CodecOptions(
|
|
unicode_decode_error_handler="replace"))
|
|
self.assertEqual(dec, {replaced_key: replaced_val})
|
|
|
|
dec = BSON.decode(invalid_both, CodecOptions(
|
|
unicode_decode_error_handler="ignore"))
|
|
self.assertEqual(dec, {"kestr": "fobar"})
|
|
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_both, CodecOptions(
|
|
unicode_decode_error_handler="strict"))
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_both,
|
|
CodecOptions())
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_both)
|
|
|
|
# Test handling bad error mode.
|
|
dec = BSON.decode(enc, CodecOptions(
|
|
unicode_decode_error_handler="junk"))
|
|
self.assertEqual(dec, {"keystr": "foobar"})
|
|
|
|
self.assertRaises(InvalidBSON, BSON.decode, invalid_both,
|
|
CodecOptions(unicode_decode_error_handler="junk"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|