PYTHON-4611 Prefer non deprecated cryptography apis (#1770)

This commit is contained in:
Shane Harvey 2024-08-06 10:50:52 -07:00 committed by GitHub
parent a5d519775d
commit da2465f2c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 21 deletions

View File

@ -19,7 +19,7 @@ from __future__ import annotations
from collections import namedtuple
from datetime import datetime as _datetime
from datetime import timezone
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional
from pymongo.lock import _create_lock
@ -27,6 +27,22 @@ if TYPE_CHECKING:
from cryptography.x509.ocsp import OCSPRequest, OCSPResponse
def _next_update(value: OCSPResponse) -> Optional[_datetime]:
"""Compat helper to return the response's next_update_utc."""
# Added in cryptography 43.0.0.
if hasattr(value, "next_update_utc"):
return value.next_update_utc
return value.next_update
def _this_update(value: OCSPResponse) -> Optional[_datetime]:
"""Compat helper to return the response's this_update_utc."""
# Added in cryptography 43.0.0.
if hasattr(value, "this_update_utc"):
return value.this_update_utc
return value.this_update
class _OCSPCache:
"""A cache for OCSP responses."""
@ -62,25 +78,30 @@ class _OCSPCache:
# As per the OCSP protocol, if the response's nextUpdate field is
# not set, the responder is indicating that newer revocation
# information is available all the time.
if value.next_update is None:
next_update = _next_update(value)
if next_update is None:
self._data.pop(cache_key, None)
return
this_update = _this_update(value)
if this_update is None:
return
now = _datetime.now(tz=timezone.utc)
if this_update.tzinfo is None:
# Make naive to match cryptography.
now = now.replace(tzinfo=None)
# Do nothing if the response is invalid.
if not (
value.this_update
<= _datetime.now(tz=timezone.utc).replace(tzinfo=None)
< value.next_update
):
if not (this_update <= now < next_update):
return
# Cache new response OR update cached response if new response
# has longer validity.
cached_value = self._data.get(cache_key, None)
if cached_value is None or (
cached_value.next_update is not None
and cached_value.next_update < value.next_update
):
if cached_value is None:
self._data[cache_key] = value
return
cached_next_update = _next_update(cached_value)
if cached_next_update is not None and cached_next_update < next_update:
self._data[cache_key] = value
def __getitem__(self, item: OCSPRequest) -> OCSPResponse:
@ -95,13 +116,15 @@ class _OCSPCache:
value = self._data[cache_key]
# Return cached response if it is still valid.
assert value.this_update is not None
assert value.next_update is not None
if (
value.this_update
<= _datetime.now(tz=timezone.utc).replace(tzinfo=None)
< value.next_update
):
this_update = _this_update(value)
next_update = _next_update(value)
assert this_update is not None
assert next_update is not None
now = _datetime.now(tz=timezone.utc)
if this_update.tzinfo is None:
# Make naive to match cryptography.
now = now.replace(tzinfo=None)
if this_update <= now < next_update:
return value
self._data.pop(cache_key, None)

View File

@ -58,6 +58,7 @@ from requests import post as _post
from requests.exceptions import RequestException as _RequestException
from pymongo import _csot
from pymongo.ocsp_cache import _next_update, _this_update
if TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric import (
@ -275,13 +276,18 @@ def _verify_response(issuer: Certificate, response: OCSPResponse) -> int:
# Note that we are not using a "tolerance period" as discussed in
# https://tools.ietf.org/rfc/rfc5019.txt?
now = _datetime.now(tz=timezone.utc).replace(tzinfo=None)
this_update = _this_update(response)
now = _datetime.now(tz=timezone.utc)
if this_update and this_update.tzinfo is None:
# Make naive to match cryptography.
now = now.replace(tzinfo=None)
# RFC6960, Section 3.2, Number 5
if response.this_update > now:
if this_update and this_update > now:
_LOGGER.debug("thisUpdate is in the future")
return 0
# RFC6960, Section 3.2, Number 6
if response.next_update and response.next_update < now:
next_update = _next_update(response)
if next_update and next_update < now:
_LOGGER.debug("nextUpdate is in the past")
return 0
return 1