Add a multithreaded stress test (#199)

* Add a multithreaded stress test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add noqa

* add a docstring for the new test

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Nathan Goldbaum 2025-05-03 04:45:47 -06:00 committed by GitHub
parent 9143d9092b
commit b8321fa3b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,5 +1,10 @@
# SPDX-License-Identifier: MIT
import secrets
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest import mock
import pytest
@ -194,3 +199,38 @@ class TestPasswordHasher:
hash = ph.hash("hello")
assert ph.verify(hash, "hello") is True
def test_multithreaded_hashing():
"""
Hash passwords in a thread pool and check for thread safety
"""
hasher = PasswordHasher(parallelism=2)
num_passwords = 100
passwords = [secrets.token_urlsafe(15) for _ in range(num_passwords)]
def closure(b, passwords):
b.wait()
for password in passwords:
assert hasher.verify(hasher.hash(password), password)
max_workers = 4
chunks = [passwords[i::max_workers] for i in range(max_workers)]
orig_interval = sys.getswitchinterval()
with ThreadPoolExecutor(max_workers=max_workers) as tpe:
barrier = threading.Barrier(max_workers)
futures = []
try:
sys.setswitchinterval(0.00001)
for chunk in chunks:
futures.append(tpe.submit(closure, barrier, chunk)) # noqa: PERF401
finally:
sys.setswitchinterval(orig_interval)
if len(futures) < max_workers:
barrier.abort()
for f in futures:
f.result()