basic auto reconnection
This commit is contained in:
parent
17788927fd
commit
f751f6b0a4
@ -68,7 +68,6 @@ class Connection(object):
|
||||
if pool_size <= 0:
|
||||
raise ValueError("pool_size must be positive")
|
||||
|
||||
# host and port for the master
|
||||
self.__host = None
|
||||
self.__port = None
|
||||
|
||||
@ -82,18 +81,13 @@ class Connection(object):
|
||||
|
||||
self.__pool_size = pool_size
|
||||
self.__auto_start_request = auto_start_request
|
||||
|
||||
# map from threads to sockets
|
||||
self.__thread_map = {}
|
||||
|
||||
# count of how many threads are mapped to each socket
|
||||
self.__thread_count = [0 for _ in range(self.__pool_size)]
|
||||
|
||||
# locks to be used around socket-level operations
|
||||
self.__locks = [threading.Lock() for _ in range(self.__pool_size)]
|
||||
|
||||
# sockets that make up the pool
|
||||
self.__sockets = [None for _ in range(self.__pool_size)]
|
||||
self.__currently_resetting = False
|
||||
|
||||
if _connect:
|
||||
self.__find_master()
|
||||
@ -178,6 +172,8 @@ class Connection(object):
|
||||
address of the master.
|
||||
"""
|
||||
_logger.debug("finding master")
|
||||
self.__host = None
|
||||
self.__port = None
|
||||
for (host, port) in self.__nodes:
|
||||
_logger.debug("trying %r:%r" % (host, port))
|
||||
try:
|
||||
@ -208,6 +204,8 @@ class Connection(object):
|
||||
|
||||
Connect to the master if this is a paired connection.
|
||||
"""
|
||||
if self.host() is None or self.port() is None:
|
||||
self.__find_master()
|
||||
_logger.debug("connecting socket %s..." % socket_number)
|
||||
if self.__sockets[socket_number]:
|
||||
_logger.debug("closing previous connection")
|
||||
@ -225,6 +223,32 @@ class Connection(object):
|
||||
self.__sockets[socket_number] = None
|
||||
raise ConnectionFailure("could not connect to %r" % self.__nodes)
|
||||
|
||||
def __reset(self):
|
||||
"""Reset everything and start connecting again.
|
||||
|
||||
Closes all open sockets and resets them to None. Re-finds the master.
|
||||
|
||||
This should be done in case of a connection failure or a "not master"
|
||||
error.
|
||||
"""
|
||||
if self.__currently_resetting:
|
||||
return
|
||||
self.__currently_resetting = True
|
||||
|
||||
for i in range(self.__pool_size):
|
||||
# prevent all operations during the reset
|
||||
self.__locks[i].acquire()
|
||||
if self.__sockets[i] is not None:
|
||||
self.__sockets[i].close()
|
||||
self.__sockets[i] = None
|
||||
|
||||
try:
|
||||
self.__find_master()
|
||||
finally:
|
||||
self.__currently_resetting = False
|
||||
for i in range(self.__pool_size):
|
||||
self.__locks[i].release()
|
||||
|
||||
def set_cursor_manager(self, manager_class):
|
||||
"""Set this connections cursor manager.
|
||||
|
||||
@ -268,8 +292,12 @@ class Connection(object):
|
||||
self.__thread_map[thread] = sock
|
||||
self.__thread_count[sock] += 1
|
||||
|
||||
if not self.__sockets[sock]:
|
||||
self.__connect(sock)
|
||||
try:
|
||||
if not self.__sockets[sock]:
|
||||
self.__connect(sock)
|
||||
except:
|
||||
self.__locks[sock].release()
|
||||
raise
|
||||
return sock
|
||||
|
||||
def __send_message_on_socket(self, operation, data, sock):
|
||||
@ -284,9 +312,12 @@ class Connection(object):
|
||||
|
||||
total_sent = 0
|
||||
while total_sent < len(to_send):
|
||||
sent = sock.send(to_send[total_sent:])
|
||||
try:
|
||||
sent = sock.send(to_send[total_sent:])
|
||||
except socket.error:
|
||||
raise ConnectionFailure("connection closed, resetting")
|
||||
if sent == 0:
|
||||
raise ConnectionFailure("connection closed")
|
||||
raise ConnectionFailure("connection closed, resetting")
|
||||
total_sent += sent
|
||||
|
||||
return request_id
|
||||
@ -303,14 +334,22 @@ class Connection(object):
|
||||
"""
|
||||
sock_number = self.__get_socket()
|
||||
sock = self.__sockets[sock_number]
|
||||
self.__send_message_on_socket(operation, data, sock)
|
||||
self.__locks[sock_number].release()
|
||||
try:
|
||||
self.__send_message_on_socket(operation, data, sock)
|
||||
self.__locks[sock_number].release()
|
||||
except ConnectionFailure:
|
||||
self.__locks[sock_number].release()
|
||||
self.__reset()
|
||||
raise
|
||||
|
||||
def __receive_message_on_socket(self, operation, request_id, sock):
|
||||
def receive(length):
|
||||
message = ""
|
||||
while len(message) < length:
|
||||
chunk = sock.recv(length - len(message))
|
||||
try:
|
||||
chunk = sock.recv(length - len(message))
|
||||
except socket.error:
|
||||
raise ConnectionFailure("connection closed")
|
||||
if chunk == "":
|
||||
raise ConnectionFailure("connection closed")
|
||||
message += chunk
|
||||
@ -345,10 +384,15 @@ class Connection(object):
|
||||
|
||||
sock_number = self.__get_socket()
|
||||
sock = self.__sockets[sock_number]
|
||||
request_id = self.__send_message_on_socket(operation, data, sock)
|
||||
result = self.__receive_message_on_socket(1, request_id, sock)
|
||||
self.__locks[sock_number].release()
|
||||
return result
|
||||
try:
|
||||
request_id = self.__send_message_on_socket(operation, data, sock)
|
||||
result = self.__receive_message_on_socket(1, request_id, sock)
|
||||
self.__locks[sock_number].release()
|
||||
return result
|
||||
except ConnectionFailure:
|
||||
self.__locks[sock_number].release()
|
||||
self.__reset()
|
||||
raise
|
||||
|
||||
def start_request(self):
|
||||
"""Start a "request".
|
||||
|
||||
Loading…
Reference in New Issue
Block a user