Test Pool(use_greenlets=True) when Gevent is not installed. PYTHON-561
This commit is contained in:
parent
58b5ecec92
commit
7d0240a88a
@ -14,16 +14,19 @@
|
||||
|
||||
"""Tests for connection-pooling with greenlets and Gevent"""
|
||||
|
||||
import gc
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
from pymongo import pool
|
||||
from pymongo.errors import ConfigurationError
|
||||
from test import host, port
|
||||
from test.utils import looplet
|
||||
from test.test_pooling_base import (
|
||||
_TestPooling, _TestMaxPoolSize, _TestMaxOpenSockets,
|
||||
_TestPoolSocketSharing, _TestWaitQueueMultiple)
|
||||
_TestPoolSocketSharing, _TestWaitQueueMultiple, has_gevent)
|
||||
|
||||
|
||||
class TestPoolingGevent(_TestPooling, unittest.TestCase):
|
||||
@ -189,5 +192,49 @@ class TestWaitQueueMultipleGevent(_TestWaitQueueMultiple, unittest.TestCase):
|
||||
use_greenlets = True
|
||||
|
||||
|
||||
class TestUseGreenletsWithoutGevent(unittest.TestCase):
|
||||
def test_use_greenlets_without_gevent(self):
|
||||
# Verify that Pool(use_greenlets=True) raises ConfigurationError if
|
||||
# Gevent is not installed, and that its destructor runs without error.
|
||||
if has_gevent:
|
||||
raise SkipTest(
|
||||
"Gevent is installed, can't test what happens calling "
|
||||
"Pool(use_greenlets=True) when Gevent is unavailable")
|
||||
|
||||
# Possible outcomes of __del__.
|
||||
DID_NOT_RUN, RAISED, SUCCESS = range(3)
|
||||
outcome = [DID_NOT_RUN]
|
||||
|
||||
class TestPool(pool.Pool):
|
||||
def __del__(self):
|
||||
try:
|
||||
pool.Pool.__del__(self) # Pool is old-style, no super()
|
||||
outcome[0] = SUCCESS
|
||||
except:
|
||||
outcome[0] = RAISED
|
||||
|
||||
# Pool raises ConfigurationError, "The Gevent module is not available".
|
||||
self.assertRaises(
|
||||
ConfigurationError,
|
||||
TestPool,
|
||||
pair=(host, port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
use_ssl=False,
|
||||
use_greenlets=True)
|
||||
|
||||
# Convince Jython or PyPy to call __del__.
|
||||
for _ in range(10):
|
||||
if outcome[0] == DID_NOT_RUN:
|
||||
gc.collect()
|
||||
time.sleep(0.1)
|
||||
|
||||
if outcome[0] == DID_NOT_RUN:
|
||||
self.fail("Pool.__del__ didn't run")
|
||||
elif outcome[0] == RAISED:
|
||||
self.fail("Pool.__del__ raised exception")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user