# Source: mongo-python-driver/test/test_json_util.py
#
# Upstream listing metadata: 581 lines, 23 KiB, Python.
#

# Copyright 2009-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test some utilities for working with JSON and PyMongo."""
import datetime
import json
import re
import sys
import uuid
from typing import Any, List, MutableMapping
from bson.codec_options import CodecOptions, DatetimeConversion
sys.path[0:0] = [""]
from test import IntegrationTest, unittest
from bson import EPOCH_AWARE, EPOCH_NAIVE, SON, DatetimeMS, json_util
from bson.binary import (
ALL_UUID_REPRESENTATIONS,
MD5_SUBTYPE,
STANDARD,
USER_DEFINED_SUBTYPE,
Binary,
UuidRepresentation,
)
from bson.code import Code
from bson.datetime_ms import _max_datetime_ms
from bson.dbref import DBRef
from bson.int64 import Int64
from bson.json_util import (
LEGACY_JSON_OPTIONS,
DatetimeRepresentation,
JSONMode,
JSONOptions,
)
from bson.max_key import MaxKey
from bson.min_key import MinKey
from bson.objectid import ObjectId
from bson.regex import Regex
from bson.timestamp import Timestamp
from bson.tz_util import FixedOffset, utc
# Shared fixture: LEGACY-mode options with strict number-long, strict UUID and
# ISO-8601 datetime output switched on. Several tests below dump with it.
STRICT_JSON_OPTIONS = JSONOptions(
    json_mode=JSONMode.LEGACY,
    datetime_representation=DatetimeRepresentation.ISO8601,
    strict_number_long=True,
    strict_uuid=True,
)
class TestJsonUtil(unittest.TestCase):
    """Tests for ``bson.json_util`` encoding and decoding.

    Test methods are described with ``#`` comments rather than docstrings so
    that verbose test-runner output keeps showing the method names.
    """

    def round_tripped(self, doc, **kwargs):
        """Return ``doc`` after one ``dumps``/``loads`` cycle.

        The same keyword arguments (typically ``json_options``) are forwarded
        to both ``json_util.dumps`` and ``json_util.loads``.
        """
        return json_util.loads(json_util.dumps(doc, **kwargs), **kwargs)

    def round_trip(self, doc, **kwargs):
        """Assert that ``doc`` compares equal to its round-tripped form."""
        self.assertEqual(doc, self.round_tripped(doc, **kwargs))

    def test_basic(self):
        # A plain string-valued document survives a round trip.
        self.round_trip({"hello": "world"})

    def test_loads_bytes(self):
        # loads() accepts bytes and bytearray input as well as str.
        string = b'{"hello": "world"}'
        self.assertEqual(json_util.loads(bytes(string)), {"hello": "world"})
        self.assertEqual(json_util.loads(bytearray(string)), {"hello": "world"})

    def test_json_options_with_options(self):
        # with_options() returns options carrying the overridden value.
        opts = JSONOptions(
            datetime_representation=DatetimeRepresentation.NUMBERLONG, json_mode=JSONMode.LEGACY
        )
        self.assertEqual(opts.datetime_representation, DatetimeRepresentation.NUMBERLONG)
        opts2 = opts.with_options(
            datetime_representation=DatetimeRepresentation.ISO8601, json_mode=JSONMode.LEGACY
        )
        self.assertEqual(opts2.datetime_representation, DatetimeRepresentation.ISO8601)

        opts = JSONOptions(strict_number_long=True, json_mode=JSONMode.LEGACY)
        self.assertEqual(opts.strict_number_long, True)
        opts2 = opts.with_options(strict_number_long=False)
        self.assertEqual(opts2.strict_number_long, False)

        opts = json_util.CANONICAL_JSON_OPTIONS
        self.assertNotEqual(opts.uuid_representation, UuidRepresentation.JAVA_LEGACY)
        opts2 = opts.with_options(uuid_representation=UuidRepresentation.JAVA_LEGACY)
        self.assertEqual(opts2.uuid_representation, UuidRepresentation.JAVA_LEGACY)
        self.assertEqual(opts2.document_class, dict)
        # Overriding one field must keep the previously overridden one.
        opts3 = opts2.with_options(document_class=SON)
        self.assertEqual(opts3.uuid_representation, UuidRepresentation.JAVA_LEGACY)
        self.assertEqual(opts3.document_class, SON)

    def test_objectid(self):
        self.round_trip({"id": ObjectId()})

    def test_dbref(self):
        self.round_trip({"ref": DBRef("foo", 5)})
        self.round_trip({"ref": DBRef("foo", 5, "db")})
        self.round_trip({"ref": DBRef("foo", ObjectId())})

        # Check order.
        self.assertEqual(
            '{"$ref": "collection", "$id": 1, "$db": "db"}',
            json_util.dumps(DBRef("collection", 1, "db")),
        )

    def test_datetime(self):
        # Covers datetime round trips, the accepted "$date" input spellings,
        # the dumps() output for each datetime representation, and the
        # tz_aware / tzinfo options.
        tz_aware_opts = json_util.DEFAULT_JSON_OPTIONS.with_options(tz_aware=True)
        # only millis, not micros
        self.round_trip(
            {"date": datetime.datetime(2009, 12, 9, 15, 49, 45, 191000, utc)},
            json_options=tz_aware_opts,
        )
        self.round_trip({"date": datetime.datetime(2009, 12, 9, 15, 49, 45, 191000)})

        # Every spelling below denotes the Unix epoch.
        for jsn in [
            '{"dt": { "$date" : "1970-01-01T00:00:00.000+0000"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000000+0000"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000+00:00"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000000+00:00"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000000+00"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000Z"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000000Z"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00Z"}}',
            '{"dt": {"$date": "1970-01-01T00:00:00.000"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00"}}',
            '{"dt": { "$date" : "1970-01-01T00:00:00.000000"}}',
            '{"dt": { "$date" : "1969-12-31T16:00:00.000-0800"}}',
            '{"dt": { "$date" : "1969-12-31T16:00:00.000000-0800"}}',
            '{"dt": { "$date" : "1969-12-31T16:00:00.000-08:00"}}',
            '{"dt": { "$date" : "1969-12-31T16:00:00.000000-08:00"}}',
            '{"dt": { "$date" : "1969-12-31T16:00:00.000000-08"}}',
            '{"dt": { "$date" : "1970-01-01T01:00:00.000+0100"}}',
            '{"dt": { "$date" : "1970-01-01T01:00:00.000000+0100"}}',
            '{"dt": { "$date" : "1970-01-01T01:00:00.000+01:00"}}',
            '{"dt": { "$date" : "1970-01-01T01:00:00.000000+01:00"}}',
            '{"dt": { "$date" : "1970-01-01T01:00:00.000000+01"}}',
        ]:
            self.assertEqual(EPOCH_AWARE, json_util.loads(jsn, json_options=tz_aware_opts)["dt"])
            self.assertEqual(EPOCH_NAIVE, json_util.loads(jsn)["dt"])

        # Numeric "$date" values, bare and wrapped in "$numberLong".
        dtm = datetime.datetime(1, 1, 1, 1, 1, 1, 0, utc)
        jsn = '{"dt": {"$date": -62135593139000}}'
        self.assertEqual(dtm, json_util.loads(jsn, json_options=tz_aware_opts)["dt"])
        jsn = '{"dt": {"$date": {"$numberLong": "-62135593139000"}}}'
        self.assertEqual(dtm, json_util.loads(jsn, json_options=tz_aware_opts)["dt"])

        # Test dumps format
        pre_epoch = {"dt": datetime.datetime(1, 1, 1, 1, 1, 1, 10000, utc)}
        post_epoch = {"dt": datetime.datetime(1972, 1, 1, 1, 1, 1, 10000, utc)}
        self.assertEqual(
            '{"dt": {"$date": {"$numberLong": "-62135593138990"}}}', json_util.dumps(pre_epoch)
        )
        self.assertEqual(
            '{"dt": {"$date": "1972-01-01T01:01:01.010Z"}}', json_util.dumps(post_epoch)
        )
        self.assertEqual(
            '{"dt": {"$date": -62135593138990}}',
            json_util.dumps(pre_epoch, json_options=LEGACY_JSON_OPTIONS),
        )
        self.assertEqual(
            '{"dt": {"$date": 63075661010}}',
            json_util.dumps(post_epoch, json_options=LEGACY_JSON_OPTIONS),
        )
        self.assertEqual(
            '{"dt": {"$date": {"$numberLong": "-62135593138990"}}}',
            json_util.dumps(pre_epoch, json_options=STRICT_JSON_OPTIONS),
        )
        self.assertEqual(
            '{"dt": {"$date": "1972-01-01T01:01:01.010Z"}}',
            json_util.dumps(post_epoch, json_options=STRICT_JSON_OPTIONS),
        )

        number_long_options = JSONOptions(
            datetime_representation=DatetimeRepresentation.NUMBERLONG, json_mode=JSONMode.LEGACY
        )
        self.assertEqual(
            '{"dt": {"$date": {"$numberLong": "63075661010"}}}',
            json_util.dumps(post_epoch, json_options=number_long_options),
        )
        self.assertEqual(
            '{"dt": {"$date": {"$numberLong": "-62135593138990"}}}',
            json_util.dumps(pre_epoch, json_options=number_long_options),
        )

        # ISO8601 mode assumes naive datetimes are UTC
        pre_epoch_naive = {"dt": datetime.datetime(1, 1, 1, 1, 1, 1, 10000)}
        post_epoch_naive = {"dt": datetime.datetime(1972, 1, 1, 1, 1, 1, 10000)}
        self.assertEqual(
            '{"dt": {"$date": {"$numberLong": "-62135593138990"}}}',
            json_util.dumps(pre_epoch_naive, json_options=STRICT_JSON_OPTIONS),
        )
        self.assertEqual(
            '{"dt": {"$date": "1972-01-01T01:01:01.010Z"}}',
            json_util.dumps(post_epoch_naive, json_options=STRICT_JSON_OPTIONS),
        )

        # Test tz_aware and tzinfo options
        self.assertEqual(
            datetime.datetime(1972, 1, 1, 1, 1, 1, 10000, utc),
            json_util.loads(
                '{"dt": {"$date": "1972-01-01T01:01:01.010+0000"}}', json_options=tz_aware_opts
            )["dt"],
        )
        self.assertEqual(
            datetime.datetime(1972, 1, 1, 1, 1, 1, 10000, utc),
            json_util.loads(
                '{"dt": {"$date": "1972-01-01T01:01:01.010+0000"}}',
                json_options=JSONOptions(tz_aware=True, tzinfo=utc),
            )["dt"],
        )
        self.assertEqual(
            datetime.datetime(1972, 1, 1, 1, 1, 1, 10000),
            json_util.loads(
                '{"dt": {"$date": "1972-01-01T01:01:01.010+0000"}}',
                json_options=JSONOptions(tz_aware=False),
            )["dt"],
        )
        self.round_trip(pre_epoch_naive, json_options=JSONOptions(tz_aware=False))

        # Test a non-utc timezone
        pacific = FixedOffset(-8 * 60, "US/Pacific")
        aware_datetime = {"dt": datetime.datetime(2002, 10, 27, 6, 0, 0, 10000, pacific)}
        self.assertEqual(
            '{"dt": {"$date": "2002-10-27T06:00:00.010-0800"}}',
            json_util.dumps(aware_datetime, json_options=STRICT_JSON_OPTIONS),
        )
        self.round_trip(
            aware_datetime,
            json_options=JSONOptions(json_mode=JSONMode.LEGACY, tz_aware=True, tzinfo=pacific),
        )
        self.round_trip(
            aware_datetime,
            json_options=JSONOptions(
                datetime_representation=DatetimeRepresentation.ISO8601,
                json_mode=JSONMode.LEGACY,
                tz_aware=True,
                tzinfo=pacific,
            ),
        )

    def test_datetime_ms(self):
        # Covers DatetimeMS encoding under each datetime representation and
        # decoding into DatetimeMS via DatetimeConversion.DATETIME_MS.

        # Test ISO8601 in-range
        dat_min = {"x": DatetimeMS(0)}
        dat_max = {"x": DatetimeMS(_max_datetime_ms())}
        # These options were previously built but never passed on; they are
        # now supplied explicitly so the section exercises what it names.
        # NOTE(review): expected to behave like the default options here, so
        # the assertions are unchanged - confirm against bson.json_util.
        opts = JSONOptions(datetime_representation=DatetimeRepresentation.ISO8601)

        self.assertEqual(
            dat_min["x"].as_datetime(CodecOptions(tz_aware=False)),
            json_util.loads(json_util.dumps(dat_min, json_options=opts), json_options=opts)["x"],
        )
        self.assertEqual(
            dat_max["x"].as_datetime(CodecOptions(tz_aware=False)),
            json_util.loads(json_util.dumps(dat_max, json_options=opts), json_options=opts)["x"],
        )

        # Test ISO8601 out-of-range
        dat_min = {"x": DatetimeMS(-1)}
        dat_max = {"x": DatetimeMS(_max_datetime_ms() + 1)}

        self.assertEqual(
            '{"x": {"$date": {"$numberLong": "-1"}}}',
            json_util.dumps(dat_min, json_options=opts),
        )
        self.assertEqual(
            '{"x": {"$date": {"$numberLong": "' + str(int(dat_max["x"])) + '"}}}',
            json_util.dumps(dat_max, json_options=opts),
        )

        # Test legacy.
        opts = JSONOptions(
            datetime_representation=DatetimeRepresentation.LEGACY, json_mode=JSONMode.LEGACY
        )
        self.assertEqual('{"x": {"$date": "-1"}}', json_util.dumps(dat_min, json_options=opts))
        self.assertEqual(
            '{"x": {"$date": "' + str(int(dat_max["x"])) + '"}}',
            json_util.dumps(dat_max, json_options=opts),
        )

        # Test regular.
        opts = JSONOptions(
            datetime_representation=DatetimeRepresentation.NUMBERLONG, json_mode=JSONMode.LEGACY
        )
        self.assertEqual(
            '{"x": {"$date": {"$numberLong": "-1"}}}', json_util.dumps(dat_min, json_options=opts)
        )
        self.assertEqual(
            '{"x": {"$date": {"$numberLong": "' + str(int(dat_max["x"])) + '"}}}',
            json_util.dumps(dat_max, json_options=opts),
        )

        # Test decode from datetime.datetime to DatetimeMS
        dat_min = {"x": datetime.datetime.min}
        dat_max = {"x": DatetimeMS(_max_datetime_ms()).as_datetime(CodecOptions(tz_aware=False))}
        opts = JSONOptions(
            datetime_representation=DatetimeRepresentation.ISO8601,
            datetime_conversion=DatetimeConversion.DATETIME_MS,
        )

        self.assertEqual(
            DatetimeMS(dat_min["x"]),
            json_util.loads(json_util.dumps(dat_min), json_options=opts)["x"],
        )
        self.assertEqual(
            DatetimeMS(dat_max["x"]),
            json_util.loads(json_util.dumps(dat_max), json_options=opts)["x"],
        )

    def test_regex_object_hook(self):
        # Extended JSON format regular expression.
        pat = "a*b"
        json_re = '{"$regex": "%s", "$options": "u"}' % pat
        loaded = json_util.object_hook(json.loads(json_re))
        self.assertIsInstance(loaded, Regex)
        self.assertEqual(pat, loaded.pattern)
        self.assertEqual(re.U, loaded.flags)

    def test_regex(self):
        # Patterns and flags survive a round trip, for both compiled patterns
        # and bson Regex instances.
        for regex_instance in (re.compile("a*b", re.IGNORECASE), Regex("a*b", re.IGNORECASE)):
            res = self.round_tripped({"r": regex_instance})["r"]

            self.assertEqual("a*b", res.pattern)
            # The flag check always uses a bson Regex, not regex_instance.
            res = self.round_tripped({"r": Regex("a*b", re.IGNORECASE)})["r"]
            self.assertEqual("a*b", res.pattern)
            self.assertEqual(re.IGNORECASE, res.flags)

        unicode_options = re.I | re.M | re.S | re.U | re.X
        regex = re.compile("a*b", unicode_options)
        res = self.round_tripped({"r": regex})["r"]
        self.assertEqual(unicode_options, res.flags)

        # Some tools may not add $options if no flags are set.
        res = json_util.loads('{"r": {"$regex": "a*b"}}')["r"]
        self.assertEqual(0, res.flags)

        self.assertEqual(
            Regex(".*", "ilm"), json_util.loads('{"r": {"$regex": ".*", "$options": "ilm"}}')["r"]
        )

        # Check order.
        self.assertEqual(
            '{"$regularExpression": {"pattern": ".*", "options": "mx"}}',
            json_util.dumps(Regex(".*", re.M | re.X)),
        )

        self.assertEqual(
            '{"$regularExpression": {"pattern": ".*", "options": "mx"}}',
            json_util.dumps(re.compile(b".*", re.M | re.X)),
        )

        self.assertEqual(
            '{"$regex": ".*", "$options": "mx"}',
            json_util.dumps(Regex(".*", re.M | re.X), json_options=LEGACY_JSON_OPTIONS),
        )

    def test_regex_validation(self):
        # A "$regex" key with a non-string value is left as a plain document.
        non_str_types = [10, {}, []]
        docs = [{"$regex": i} for i in non_str_types]
        for doc in docs:
            self.assertEqual(doc, json_util.loads(json.dumps(doc)))

        # An empty string is still a valid pattern.
        doc = {"$regex": ""}
        self.assertIsInstance(json_util.loads(json.dumps(doc)), Regex)

    def test_minkey(self):
        self.round_trip({"m": MinKey()})

    def test_maxkey(self):
        self.round_trip({"m": MaxKey()})

    def test_timestamp(self):
        # Round trip plus a check of the exact key order in the output.
        dct = {"ts": Timestamp(4, 13)}
        res = json_util.dumps(dct, default=json_util.default)
        rtdct = json_util.loads(res)
        self.assertEqual(dct, rtdct)
        self.assertEqual('{"ts": {"$timestamp": {"t": 4, "i": 13}}}', res)

    def test_uuid_default(self):
        # Cannot directly encode native UUIDs with the default
        # uuid_representation.
        doc = {"uuid": uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")}
        with self.assertRaisesRegex(ValueError, "cannot encode native uuid"):
            json_util.dumps(doc)
        legacy_jsn = '{"uuid": {"$uuid": "f47ac10b58cc4372a5670e02b2c3d479"}}'
        expected = {"uuid": Binary(b"\xf4z\xc1\x0bX\xccCr\xa5g\x0e\x02\xb2\xc3\xd4y", 4)}
        self.assertEqual(json_util.loads(legacy_jsn), expected)

    def test_uuid(self):
        # Native UUID encoding and decoding under explicit representations.
        doc = {"uuid": uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")}
        uuid_legacy_opts = LEGACY_JSON_OPTIONS.with_options(
            uuid_representation=UuidRepresentation.PYTHON_LEGACY
        )
        self.round_trip(doc, json_options=uuid_legacy_opts)
        self.assertEqual(
            '{"uuid": {"$uuid": "f47ac10b58cc4372a5670e02b2c3d479"}}',
            json_util.dumps(doc, json_options=LEGACY_JSON_OPTIONS),
        )
        self.assertEqual(
            '{"uuid": {"$binary": "9HrBC1jMQ3KlZw4CssPUeQ==", "$type": "03"}}',
            json_util.dumps(
                doc,
                json_options=STRICT_JSON_OPTIONS.with_options(
                    uuid_representation=UuidRepresentation.PYTHON_LEGACY
                ),
            ),
        )
        self.assertEqual(
            '{"uuid": {"$binary": "9HrBC1jMQ3KlZw4CssPUeQ==", "$type": "04"}}',
            json_util.dumps(
                doc,
                json_options=JSONOptions(
                    strict_uuid=True, json_mode=JSONMode.LEGACY, uuid_representation=STANDARD
                ),
            ),
        )
        self.assertEqual(
            doc,
            json_util.loads(
                '{"uuid": {"$binary": "9HrBC1jMQ3KlZw4CssPUeQ==", "$type": "03"}}',
                json_options=uuid_legacy_opts,
            ),
        )
        for uuid_representation in set(ALL_UUID_REPRESENTATIONS) - {UuidRepresentation.UNSPECIFIED}:
            options = JSONOptions(
                strict_uuid=True, json_mode=JSONMode.LEGACY, uuid_representation=uuid_representation
            )
            self.round_trip(doc, json_options=options)
            # Ignore UUID representation when decoding BSON binary subtype 4.
            self.assertEqual(
                doc,
                json_util.loads(
                    '{"uuid": {"$binary": "9HrBC1jMQ3KlZw4CssPUeQ==", "$type": "04"}}',
                    json_options=options,
                ),
            )

    def test_uuid_uuid_rep_unspecified(self):
        _uuid = uuid.uuid4()
        options = JSONOptions(
            strict_uuid=True,
            json_mode=JSONMode.LEGACY,
            uuid_representation=UuidRepresentation.UNSPECIFIED,
        )

        # Cannot directly encode native UUIDs with UNSPECIFIED.
        doc = {"uuid": _uuid}
        with self.assertRaises(ValueError):
            json_util.dumps(doc, json_options=options)

        # All UUID subtypes are decoded as Binary with UNSPECIFIED.
        # subtype 3
        doc = {"uuid": Binary(_uuid.bytes, subtype=3)}
        ext_json_str = json_util.dumps(doc)
        self.assertEqual(doc, json_util.loads(ext_json_str, json_options=options))
        # subtype 4
        doc = {"uuid": Binary(_uuid.bytes, subtype=4)}
        ext_json_str = json_util.dumps(doc)
        self.assertEqual(doc, json_util.loads(ext_json_str, json_options=options))
        # $uuid-encoded fields
        doc = {"uuid": Binary(_uuid.bytes, subtype=4)}
        ext_json_str = json_util.dumps({"uuid": _uuid}, json_options=LEGACY_JSON_OPTIONS)
        self.assertEqual(doc, json_util.loads(ext_json_str, json_options=options))

    def test_binary(self):
        # Binary values of several subtypes, including legacy "$type" forms.
        bin_type_dict = {"bin": b"\x00\x01\x02\x03\x04"}
        md5_type_dict = {
            "md5": Binary(b" n7\x18\xaf\t/\xd1\xd1/\x80\xca\xe7q\xcc\xac", MD5_SUBTYPE)
        }
        custom_type_dict = {"custom": Binary(b"hello", USER_DEFINED_SUBTYPE)}

        self.round_trip(bin_type_dict)
        self.round_trip(md5_type_dict)
        self.round_trip(custom_type_dict)

        # Binary with subtype 0 is decoded into bytes in Python 3.
        decoded = json_util.loads('{"bin": {"$binary": "AAECAwQ=", "$type": "00"}}')["bin"]
        # Exact type comparison on purpose: the result must be plain bytes.
        self.assertEqual(type(decoded), bytes)

        # PYTHON-443 ensure old type formats are supported
        json_bin_dump = json_util.dumps(bin_type_dict, json_options=LEGACY_JSON_OPTIONS)
        self.assertIn('"$type": "00"', json_bin_dump)
        self.assertEqual(
            bin_type_dict, json_util.loads('{"bin": {"$type": 0, "$binary": "AAECAwQ="}}')
        )
        json_bin_dump = json_util.dumps(md5_type_dict, json_options=LEGACY_JSON_OPTIONS)
        # Check order.
        self.assertEqual(
            '{"md5": {"$binary": "IG43GK8JL9HRL4DK53HMrA==", "$type": "05"}}', json_bin_dump
        )

        self.assertEqual(
            md5_type_dict,
            json_util.loads('{"md5": {"$type": 5, "$binary": "IG43GK8JL9HRL4DK53HMrA=="}}'),
        )

        json_bin_dump = json_util.dumps(custom_type_dict, json_options=LEGACY_JSON_OPTIONS)
        self.assertIn('"$type": "80"', json_bin_dump)
        self.assertEqual(
            custom_type_dict,
            json_util.loads('{"custom": {"$type": 128, "$binary": "aGVsbG8="}}'),
        )

        # Handle mongoexport where subtype >= 128
        self.assertEqual(
            128,
            json_util.loads('{"custom": {"$type": "ffffff80", "$binary": "aGVsbG8="}}')[
                "custom"
            ].subtype,
        )

        self.assertEqual(
            255,
            json_util.loads('{"custom": {"$type": "ffffffff", "$binary": "aGVsbG8="}}')[
                "custom"
            ].subtype,
        )

    def test_code(self):
        self.round_trip({"code": Code("function x() { return 1; }")})

        code = Code("return z", z=2)
        res = json_util.dumps(code)
        self.assertEqual(code, json_util.loads(res))

        # Check order.
        self.assertEqual('{"$code": "return z", "$scope": {"z": 2}}', res)

        no_scope = Code("function() {}")
        self.assertEqual('{"$code": "function() {}"}', json_util.dumps(no_scope))

    def test_undefined(self):
        # "$undefined" decodes to None.
        jsn = '{"name": {"$undefined": true}}'
        self.assertIsNone(json_util.loads(jsn)["name"])

    def test_numberlong(self):
        # "$numberLong" decodes to Int64; output form depends on the options.
        jsn = '{"weight": {"$numberLong": "65535"}}'
        self.assertEqual(json_util.loads(jsn)["weight"], Int64(65535))
        self.assertEqual(json_util.dumps({"weight": Int64(65535)}), '{"weight": 65535}')
        json_options = JSONOptions(strict_number_long=True, json_mode=JSONMode.LEGACY)
        self.assertEqual(json_util.dumps({"weight": Int64(65535)}, json_options=json_options), jsn)

    def test_loads_document_class(self):
        # document_class dict should always work
        self.assertEqual(
            {"foo": "bar"},
            json_util.loads('{"foo": "bar"}', json_options=JSONOptions(document_class=dict)),
        )
        self.assertEqual(
            SON([("foo", "bar"), ("b", 1)]),
            json_util.loads('{"foo": "bar", "b": 1}', json_options=JSONOptions(document_class=SON)),
        )
class TestJsonUtilRoundtrip(IntegrationTest):
    """Round-trip test that reads documents back through ``self.db``."""

    def test_cursor(self):
        # Documents inserted into a collection and dumped straight from the
        # find() cursor must all reappear after loads().
        db = self.db
        db.drop_collection("test")
        docs: List[MutableMapping[str, Any]] = [
            {"foo": [1, 2]},
            {"bar": {"hello": "world"}},
            {"code": Code("function x() { return 1; }")},
            {"bin": Binary(b"\x00\x01\x02\x03\x04", USER_DEFINED_SUBTYPE)},
            {"dbref": {"_ref": DBRef("simple", ObjectId("509b8db456c02c5ab7e63c34"))}},
        ]

        db.test.insert_many(docs)
        reloaded_docs = json_util.loads(json_util.dumps(db.test.find()))
        for doc in docs:
            # assertIn reports the missing document when the check fails.
            self.assertIn(doc, reloaded_docs)
# Run the tests in this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()