PYTHON-4691 Fix non-UTC timezones with DATETIME_CLAMP/DATETIME_AUTO (#1811)

This commit is contained in:
Shane Harvey 2024-08-28 11:39:03 -07:00 committed by GitHub
parent 9d3b5033fa
commit 28697df6f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 98 additions and 20 deletions

View File

@@ -114,17 +114,40 @@ class DatetimeMS:
return self._value
def _datetime_to_millis(dtm: datetime.datetime) -> int:
    """Convert datetime to milliseconds since epoch UTC.

    :param dtm: A naive or timezone-aware datetime. Naive values are
        interpreted as already being in UTC.
    :return: Whole milliseconds since 1970-01-01T00:00:00 UTC. Any
        sub-millisecond part of ``dtm.microsecond`` is truncated.
    """
    # Look the offset up once; it is None for naive datetimes.
    offset = dtm.utcoffset()
    if offset is not None:
        # Shift the wall-clock fields to UTC. calendar.timegm() below reads
        # only the time tuple and treats it as UTC, ignoring tzinfo.
        dtm = dtm - offset
    return int(calendar.timegm(dtm.timetuple()) * 1000 + dtm.microsecond // 1000)
# The smallest and largest values the datetime type can hold, tagged with the
# module's `utc` tzinfo.
_MIN_UTC = datetime.datetime.min.replace(tzinfo=utc)
_MAX_UTC = datetime.datetime.max.replace(tzinfo=utc)
# The same two bounds as integer milliseconds since the epoch. Because
# _datetime_to_millis truncates microseconds, _MAX_UTC_MS corresponds to
# datetime.max with microsecond=999000.
_MIN_UTC_MS = _datetime_to_millis(_MIN_UTC)
_MAX_UTC_MS = _datetime_to_millis(_MAX_UTC)
# Inclusive and exclusive min and max for timezones.
# Timezones are hashed by their offset, which is a timedelta
# and therefore there are more than 24 possible timezones.
@functools.lru_cache(maxsize=None)
def _min_datetime_ms(tz: datetime.timezone = datetime.timezone.utc) -> int:
    """Return the lowest epoch-millisecond value usable with ``tz``.

    The bound is computed arithmetically from ``_MIN_UTC_MS`` and the zone's
    UTC offset. Converting ``datetime.min.replace(tzinfo=tz)`` directly is
    avoided because subtracting a positive offset from ``datetime.min``
    raises ``OverflowError``.

    :param tz: The target timezone; results are cached per ``tz``.
    :return: ``_MIN_UTC_MS`` for zones at or ahead of UTC. For zones behind
        UTC the bound is raised by the size of the offset, so the local
        wall-clock value does not fall below ``datetime.min``.
    """
    delta = tz.utcoffset(_MIN_UTC)
    if delta is not None:
        # Whole milliseconds in the offset (timedelta fields are normalized).
        offset_millis = (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000
    else:
        offset_millis = 0
    # A positive offset would push the bound below _MIN_UTC_MS, so max()
    # keeps it at the UTC minimum.
    return max(_MIN_UTC_MS, _MIN_UTC_MS - offset_millis)
@functools.lru_cache(maxsize=None)
def _max_datetime_ms(tz: datetime.timezone = datetime.timezone.utc) -> int:
    """Return the highest epoch-millisecond value usable with ``tz``.

    The bound is computed arithmetically from ``_MAX_UTC_MS`` and the zone's
    UTC offset. Converting ``datetime.max.replace(tzinfo=tz)`` directly is
    avoided because subtracting a negative offset from ``datetime.max``
    raises ``OverflowError``.

    :param tz: The target timezone; results are cached per ``tz``.
    :return: ``_MAX_UTC_MS`` for zones at or behind UTC. For zones ahead of
        UTC the bound is lowered by the size of the offset, so the local
        wall-clock value does not exceed ``datetime.max``.
    """
    delta = tz.utcoffset(_MAX_UTC)
    if delta is not None:
        # Whole milliseconds in the offset (timedelta fields are normalized).
        offset_millis = (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000
    else:
        offset_millis = 0
    # A negative offset would push the bound above _MAX_UTC_MS, so min()
    # keeps it at the UTC maximum.
    return min(_MAX_UTC_MS, _MAX_UTC_MS - offset_millis)
def _millis_to_datetime(
@@ -162,10 +185,3 @@ def _millis_to_datetime(
return DatetimeMS(millis)
else:
raise ValueError("datetime_conversion must be an element of DatetimeConversion")
def _datetime_to_millis(dtm: datetime.datetime) -> int:
    """Convert datetime to milliseconds since epoch UTC.

    :param dtm: A naive or timezone-aware datetime. Naive values are
        interpreted as already being in UTC.
    :return: Whole milliseconds since 1970-01-01T00:00:00 UTC. Any
        sub-millisecond part of ``dtm.microsecond`` is truncated.
    """
    # Look the offset up once; it is None for naive datetimes.
    offset = dtm.utcoffset()
    if offset is not None:
        # Shift the wall-clock fields to UTC. calendar.timegm() below reads
        # only the time tuple and treats it as UTC, ignoring tzinfo.
        dtm = dtm - offset
    return int(calendar.timegm(dtm.timetuple()) * 1000 + dtm.microsecond // 1000)

View File

@@ -1252,54 +1252,116 @@ class TestDatetimeConversion(unittest.TestCase):
def test_clamping(self):
    # Test clamping from below and above.
    # With DATETIME_CLAMP and an explicit UTC tzinfo, a value one millisecond
    # outside the datetime range must decode to the nearest representable bound.
    opts = CodecOptions(
        datetime_conversion=DatetimeConversion.DATETIME_CLAMP,
        tz_aware=True,
        tzinfo=datetime.timezone.utc,
    )
    below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1)})
    dec_below = decode(below, opts)
    self.assertEqual(
        dec_below["x"], datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    )

    above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1)})
    dec_above = decode(above, opts)
    # The upper bound is compared at millisecond precision (999000 microseconds).
    self.assertEqual(
        dec_above["x"],
        datetime.datetime.max.replace(tzinfo=datetime.timezone.utc, microsecond=999000),
    )
def test_tz_clamping_local(self):
    # Naive clamping to local tz.
    opts = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=False)
    # The amount subtracted/added is 24 * 60 * 60 = 86400 in the millisecond
    # units of DatetimeMS; any value outside the range exercises the clamp.
    below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 24 * 60 * 60)})
    dec_below = decode(below, opts)
    self.assertEqual(dec_below["x"], datetime.datetime.min)

    above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 24 * 60 * 60)})
    dec_above = decode(above, opts)
    # The upper bound is compared at millisecond precision (999000 microseconds).
    self.assertEqual(
        dec_above["x"],
        datetime.datetime.max.replace(microsecond=999000),
    )
def test_tz_clamping_utc(self):
    # Aware clamping default utc.
    opts = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True)
    # The amount subtracted/added is 24 * 60 * 60 = 86400 in the millisecond
    # units of DatetimeMS; any value outside the range exercises the clamp.
    below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 24 * 60 * 60)})
    dec_below = decode(below, opts)
    self.assertEqual(
        dec_below["x"], datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    )

    above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 24 * 60 * 60)})
    dec_above = decode(above, opts)
    # The upper bound is compared at millisecond precision (999000 microseconds).
    self.assertEqual(
        dec_above["x"],
        datetime.datetime.max.replace(tzinfo=datetime.timezone.utc, microsecond=999000),
    )
def test_tz_clamping_non_utc(self):
    # DATETIME_CLAMP with a fixed-offset tzinfo (+1h and -1h): values inside
    # the representable window round-trip unchanged, values outside it decode
    # to the window's lower or upper edge.
    one_ms = datetime.timedelta(milliseconds=1)
    hour_ms = 60 * 60 * 1000
    overshoots = (1, hour_ms, 1 + hour_ms)
    for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]:
        opts = CodecOptions(
            datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True, tzinfo=tz
        )
        # Min/max values in this timezone which can be represented in both BSON and datetime UTC.
        try:
            lowest = datetime.datetime.min.replace(tzinfo=utc).astimezone(tz)
        except OverflowError:
            # The UTC minimum is not expressible in this zone; use the zone's own minimum.
            lowest = datetime.datetime.min.replace(tzinfo=tz)
        try:
            highest = datetime.datetime.max.replace(tzinfo=utc, microsecond=999000).astimezone(
                tz
            )
        except OverflowError:
            # The UTC maximum is not expressible in this zone; use the zone's own maximum.
            highest = datetime.datetime.max.replace(tzinfo=tz, microsecond=999000)

        # Values at and just inside both edges must survive a round trip.
        for value in [lowest, lowest + one_ms, highest - one_ms, highest]:
            decoded = decode(encode({"x": value}), opts)
            self.assertEqual(decoded["x"], value)

        # Values below either the zone-specific or the naive minimum clamp upward.
        low_bases = (_datetime_to_millis(lowest), _datetime_to_millis(datetime.datetime.min))
        below_range = [DatetimeMS(base - step) for base in low_bases for step in overshoots]
        for value in below_range:
            decoded = decode(encode({"x": value}), opts)
            self.assertEqual(decoded["x"], lowest)

        # Values above either the zone-specific or the naive maximum clamp downward.
        high_bases = (_datetime_to_millis(highest), _datetime_to_millis(datetime.datetime.max))
        above_range = [DatetimeMS(base + step) for base in high_bases for step in overshoots]
        for value in above_range:
            decoded = decode(encode({"x": value}), opts)
            self.assertEqual(decoded["x"], highest)
def test_tz_clamping_non_utc_simple(self):
    # An ordinary in-range datetime must decode to the same instant in a
    # non-UTC zone under every datetime-returning conversion mode.
    naive = datetime.datetime(2024, 8, 23)
    payload = encode({"d": naive})
    self.assertEqual(decode(payload)["d"], naive)
    modes = [
        DatetimeConversion.DATETIME,
        DatetimeConversion.DATETIME_CLAMP,
        DatetimeConversion.DATETIME_AUTO,
    ]
    for mode in modes:
        for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]:
            opts = CodecOptions(datetime_conversion=mode, tz_aware=True, tzinfo=tz)
            actual = decode(payload, opts)["d"]
            # Expected value: the naive input read as UTC, then shifted into tz.
            expected = naive.replace(tzinfo=utc).astimezone(tz)
            self.assertEqual(actual, expected)
def test_datetime_auto(self):
# Naive auto, in range.
opts1 = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_AUTO)