Merge "Fix the driver shutdown/failover logic"

This commit is contained in:
Jenkins 2016-04-04 16:59:42 +00:00 committed by Gerrit Code Review
commit 340589e47d
4 changed files with 275 additions and 138 deletions

View File

@ -50,6 +50,46 @@ class Task(object):
"""This method will be run on the eventloop thread."""
class Sender(pyngus.SenderEventHandler):
"""A single outgoing link to a given address"""
def __init__(self, address):
self._address = address
self._link = None
def attach(self, connection):
# open a link to the destination
sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex,
self._address,
self._address)
self._link = connection.create_sender(name=sname,
source_address=self._address,
target_address=self._address)
self._link.open()
def detach(self):
# close the link
if self._link:
self._link.close()
def destroy(self):
# drop reference to link. The link will be freed when the
# connection is destroyed
self._link = None
def send(self, message, callback):
# send message out the link, invoke callback when acked
self._link.send(message, delivery_callback=callback)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("sender_remote_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.error(_LE("Outgoing link to %(addr) failed. error=%(error)"),
{"addr": self._address, "error": error})
class Replies(pyngus.ReceiverEventHandler):
"""This is the receiving link for all reply messages. Messages are routed
to the proper Listener's incoming queue using the correlation-id header in
@ -73,8 +113,13 @@ class Replies(pyngus.ReceiverEventHandler):
self._credit = 0
self._receiver.open()
def detach(self):
# close the link
self._receiver.close()
def destroy(self):
self._correlation = None
# drop reference to link. Link will be freed when the connection is
# released.
self._receiver = None
def ready(self):
@ -119,11 +164,17 @@ class Replies(pyngus.ReceiverEventHandler):
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
receiver link has initiated closing the connection.
"""
# TODO(kgiusti) Unclear if this error will ever occur (as opposed to
# the Connection failing instead). Log for now, possibly implement a
# recovery strategy if necessary.
LOG.error(_LE("Reply subscription closed by peer: %s"),
(pn_condition or "no error given"))
# TODO(kgiusti) Log for now, possibly implement a recovery strategy if
# necessary.
if pn_condition:
LOG.error(_LE("Reply subscription closed by peer: %s"),
pn_condition)
receiver.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.error(_LE("Link to reply queue %(addr) failed. error=%(error)"),
{"addr": self._address, "error": error})
def message_received(self, receiver, message, handle):
"""This is a Pyngus callback, invoked by Pyngus when a new message
@ -162,14 +213,13 @@ class Server(pyngus.ReceiverEventHandler):
self._incoming = incoming
self._addresses = addresses
self._capacity = 500 # credit per link
self._receivers = None
self._receivers = []
self._id = subscription_id
def attach(self, connection):
"""Create receiver links over the given connection for all the
configured addresses.
"""
self._receivers = []
for a in self._addresses:
props = {"snd-settle-mode": "settled"}
rname = "Consumer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, a, a)
@ -187,8 +237,18 @@ class Server(pyngus.ReceiverEventHandler):
r.open()
self._receivers.append(r)
def destroy(self):
self._receivers = None
def detach(self):
# close the links
for receiver in self._receivers:
receiver.close()
def reset(self):
# destroy the links, but keep the addresses around since we may be
# failing over. Since links are destroyed, this cannot be called from
# any of the following ReceiverLink callbacks.
for r in self._receivers:
r.destroy()
self._receivers = []
# Pyngus ReceiverLink event callbacks:
@ -196,12 +256,19 @@ class Server(pyngus.ReceiverEventHandler):
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
receiver link has initiated closing the connection.
"""
vals = {
"addr": receiver.source_address or receiver.target_address,
"err_msg": pn_condition or "no error given"
}
LOG.error(_LE("Server subscription %(addr)s closed "
"by peer: %(err_msg)s"), vals)
if pn_condition:
vals = {
"addr": receiver.source_address or receiver.target_address,
"err_msg": pn_condition
}
LOG.error(_LE("Server subscription %(addr)s closed "
"by peer: %(err_msg)s"), vals)
receiver.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.error(_LE("Listener link queue %(addr) failed. error=%(error)"),
{"addr": self._address, "error": error})
def message_received(self, receiver, message, handle):
"""This is a Pyngus callback, invoked by Pyngus when a new message
@ -323,22 +390,16 @@ class Controller(pyngus.ConnectionEventHandler):
self._tasks.put(task)
self._schedule_task_processing()
def shutdown(self, wait=True, timeout=None):
def shutdown(self, timeout=None):
"""Shutdown the messaging service."""
LOG.info(_LI("Shutting down the AMQP 1.0 connection"))
if self.processor:
self.processor.wakeup(lambda: self._start_shutdown())
LOG.debug("Waiting for eventloop to exit")
self.processor.shutdown(wait, timeout)
self.processor.join(timeout)
self._hard_reset()
self.processor.destroy()
self.processor = None
self._tasks = None
self._senders = None
for servers in self._servers.values():
for server in servers.values():
server.destroy()
self._servers.clear()
self._socket_connection = None
if self._replies:
self._replies.destroy()
self._replies = None
LOG.debug("Eventloop exited, driver shut down")
# The remaining methods are reserved to run from the eventloop thread only!
@ -426,16 +487,10 @@ class Controller(pyngus.ConnectionEventHandler):
def _sender(self, address):
# if we already have a sender for that address, use it
# else establish the sender and cache it
if address in self._senders:
sender = self._senders[address]
else:
sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex,
address, address)
conn = self._socket_connection.connection
sender = conn.create_sender(source_address=address,
target_address=address,
name=sname)
sender.open()
sender = self._senders.get(address)
if sender is None:
sender = Sender(address)
sender.attach(self._socket_connection.connection)
self._senders[address] = sender
return sender
@ -446,7 +501,7 @@ class Controller(pyngus.ConnectionEventHandler):
"""
address = str(addr)
message.address = address
self._sender(address).send(message, delivery_callback=callback)
self._sender(address).send(message, callback)
def _server_address(self, target):
return self._concatenate([self.server_request_prefix,
@ -540,17 +595,24 @@ class Controller(pyngus.ConnectionEventHandler):
self._replies and self._replies.ready())
def _start_shutdown(self):
"""Called when the driver destroys the controller, this method attempts
to cleanly close the AMQP connection to the peer.
"""Called when the application is closing the transport.
Attempt to cleanly flush/close all links.
"""
LOG.info(_LI("Shutting down AMQP connection"))
self._closing = True
if self._socket_connection.connection.active:
if (self._socket_connection
and self._socket_connection.connection
and self._socket_connection.connection.active):
# try a clean shutdown
for sender in self._senders.values():
sender.detach()
for servers in self._servers.values():
for server in servers.values():
server.detach()
self._replies.detach()
self._socket_connection.connection.close()
else:
# don't wait for a close from the remote, may never happen
self._complete_shutdown()
self.processor.shutdown()
# reply link active callback:
@ -567,7 +629,7 @@ class Controller(pyngus.ConnectionEventHandler):
def socket_error(self, error):
"""Called by eventloop when a socket error occurs."""
LOG.debug("Socket failure: %s", error)
LOG.error(_LE("Socket failure: %s"), error)
self._handle_connection_loss()
# Pyngus connection event callbacks (and their helpers), all invoked from
@ -612,14 +674,12 @@ class Controller(pyngus.ConnectionEventHandler):
"""This is a Pyngus callback, invoked by Pyngus when the peer has
requested that the connection be closed.
"""
if not self._closing:
# The messaging service/broker is trying to shut down the
# connection. Acknowledge the close, and try to reconnect/failover
# later once the connection has closed (connection_closed is
# called).
LOG.info(_LI("Connection closed by peer: %s"),
reason or "no reason given")
self._socket_connection.connection.close()
# The messaging service/broker is trying to shut down the
# connection. Acknowledge the close, and try to reconnect/failover
# later once the connection has closed (connection_closed is called).
if reason:
LOG.info(_LI("Connection closed by peer: %s"), reason)
self._socket_connection.connection.close()
def sasl_done(self, connection, pn_sasl, outcome):
"""This is a Pyngus callback invoked when the SASL handshake
@ -635,28 +695,19 @@ class Controller(pyngus.ConnectionEventHandler):
'username': self.hosts.current.username})
# connection failure will be handled later
def _complete_shutdown(self):
"""The AMQP Connection has closed, and the driver shutdown is complete.
Clean up controller resources and exit.
"""
self._socket_connection.close()
self.processor.shutdown()
LOG.info(_LI("Messaging has shutdown"))
def _handle_connection_loss(self):
"""The connection to the messaging service has been lost. Try to
reestablish the connection/failover.
reestablish the connection/failover if not shutting down the driver.
"""
if self._closing:
# we're in the middle of shutting down the driver anyways,
# just consider it done:
self._complete_shutdown()
self.processor.shutdown()
else:
# for some reason, we've lost the connection to the messaging
# service. Try to re-establish the connection:
if not self._reconnecting:
self._reconnecting = True
self._replies = None
LOG.info(_LI("delaying reconnect attempt for %d seconds"),
self._delay)
self.processor.schedule(lambda: self._do_reconnect(),
@ -668,14 +719,29 @@ class Controller(pyngus.ConnectionEventHandler):
"""Invoked on connection/socket failure, failover and re-connect to the
messaging service.
"""
# note well: since this method destroys the connection, it cannot be
# invoked directly from a pyngus callback. Use processor.schedule() to
# run this method on the main loop instead.
if not self._closing:
self._hard_reset()
self._reconnecting = False
self._senders = {}
self._socket_connection.reset()
host = self.hosts.next()
LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"),
{'hostname': host.hostname, 'port': host.port})
self._socket_connection.connect(host)
def _hard_reset(self):
"""Reset the controller to its pre-connection state"""
# note well: since this method destroys the connection, it cannot be
# invoked directly from a pyngus callback. Use processor.schedule() to
# run this method on the main loop instead.
for sender in self._senders.values():
sender.destroy()
self._senders.clear()
for servers in self._servers.values():
for server in servers.values():
# discard links, but keep servers around to re-attach if
# failing over
server.reset()
if self._replies:
self._replies.destroy()
self._replies = None
if self._socket_connection:
self._socket_connection.reset()

View File

@ -199,19 +199,28 @@ class ProtonDriver(base.BaseDriver):
"""
def wrap(self, *args, **kws):
with self._lock:
# check to see if a fork was done after the Controller and its
# I/O thread was spawned. old_pid will be None the first time
# this is called which will cause the Controller to be created.
old_pid = self._pid
self._pid = os.getpid()
if old_pid != self._pid:
if self._ctrl is not None:
LOG.warning(_LW("Process forked after connection "
"established!"))
self._ctrl.shutdown(wait=False)
# Create a Controller that connects to the messaging service:
self._ctrl = controller.Controller(self._hosts,
self._default_exchange,
self._conf)
self._ctrl.connect()
if old_pid != self._pid:
if self._ctrl is not None:
# fork was called after the Controller was created, and
# we are now executing as the child process. Do not
# touch the existing Controller - it is owned by the
# parent. Best we can do here is simply drop it and
# hope we get lucky.
LOG.warning(_LW("Process forked after connection "
"established!"))
self._ctrl = None
# Create a Controller that connects to the messaging
# service:
self._ctrl = controller.Controller(self._hosts,
self._default_exchange,
self._conf)
self._ctrl.connect()
return func(self, *args, **kws)
return wrap

View File

@ -76,7 +76,7 @@ class _SocketConnection(object):
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_input()
self.connection.close()
self.connection.close_output()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
@ -90,7 +90,7 @@ class _SocketConnection(object):
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_output()
self.connection.close()
self.connection.close_input()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
@ -161,6 +161,7 @@ class _SocketConnection(object):
def close(self):
if self.socket:
self.socket.close()
self.socket = None
class Schedule(object):
@ -257,15 +258,18 @@ class Thread(threading.Thread):
"""
self._requests.wakeup(request)
def shutdown(self, wait=True, timeout=None):
def shutdown(self):
"""Shutdown the eventloop thread. Thread safe.
"""
LOG.debug("eventloop shutdown requested")
self._shutdown = True
self.wakeup()
if wait:
self.join(timeout=timeout)
LOG.debug("eventloop shutdown complete")
def destroy(self):
# release the container. This can only be called after the eventloop
# thread exited
self._container.destroy()
self._container = None
# the following methods are not thread safe - they must be run from the
# eventloop thread
@ -322,12 +326,6 @@ class Thread(threading.Thread):
continue
raise # assuming fatal...
# don't process any I/O or timers if woken up by a shutdown:
# if we've been forked we don't want to do I/O on the parent's
# sockets
if self._shutdown:
break
readable, writable, ignore = results
for r in readable:
@ -345,4 +343,3 @@ class Thread(threading.Thread):
LOG.info(_LI("eventloop thread exiting, container=%s"),
self._container.name)
self._container.destroy()

View File

@ -516,9 +516,9 @@ class TestFailover(test_utils.BaseTestCase):
if broker.isAlive():
broker.stop()
def test_broker_failover(self):
"""Simulate failover of one broker to another."""
def _failover(self, fail_brokers):
self._brokers[0].start()
# self.config(trace=True, group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="my-topic")
@ -541,11 +541,10 @@ class TestFailover(test_utils.BaseTestCase):
self.assertEqual(self._brokers[0].topic_count, 1)
self.assertEqual(self._brokers[0].direct_count, 1)
# fail broker 0 and start broker 1:
self._brokers[0].stop()
self._brokers[1].start()
# invoke failover method
fail_brokers(self._brokers[0], self._brokers[1])
# wait for listener links to re-establish
# wait for listener links to re-establish on broker 1
# 4 = 3 links per listener + 1 for the global reply queue
predicate = lambda: self._brokers[1].sender_link_count == 4
_wait_until(predicate, 30)
@ -571,10 +570,40 @@ class TestFailover(test_utils.BaseTestCase):
self._brokers[1].stop()
driver.cleanup()
def test_broker_crash(self):
"""Simulate a failure of one broker."""
def _meth(broker0, broker1):
# fail broker 0 and start broker 1:
broker0.stop()
time.sleep(0.5)
broker1.start()
self._failover(_meth)
def test_broker_shutdown(self):
"""Simulate a normal shutdown of a broker."""
def _meth(broker0, broker1):
broker0.stop(clean=True)
time.sleep(0.5)
broker1.start()
self._failover(_meth)
def test_heartbeat_failover(self):
"""Simulate broker heartbeat timeout."""
def _meth(broker0, broker1):
# keep alive heartbeat from broker 0 will stop, which should force
# failover to broker 1 in about two seconds
broker0.pause()
broker1.start()
self.config(idle_timeout=2, group="oslo_messaging_amqp")
self._failover(_meth)
self._brokers[0].stop()
def test_listener_failover(self):
"""Verify that Listeners are re-established after failover.
"""Verify that Listeners sharing the same topic are re-established
after failover.
"""
self._brokers[0].start()
# self.config(trace=True, group="oslo_messaging_amqp")
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="my-topic")
@ -596,11 +625,11 @@ class TestFailover(test_utils.BaseTestCase):
_wait_until(predicate, 30)
self.assertTrue(predicate())
# fail broker 0 and start broker 1:
self._brokers[0].stop()
# start broker 1 then shutdown broker 0:
self._brokers[1].start()
self._brokers[0].stop(clean=True)
# wait again for 7 sending links to re-establish
# wait again for 7 sending links to re-establish on broker 1
predicate = lambda: self._brokers[1].sender_link_count == 7
_wait_until(predicate, 30)
self.assertTrue(predicate())
@ -617,8 +646,8 @@ class TestFailover(test_utils.BaseTestCase):
listener2.join(timeout=30)
self.assertFalse(listener1.isAlive() or listener2.isAlive())
self._brokers[1].stop()
driver.cleanup()
self._brokers[1].stop()
class FakeBroker(threading.Thread):
@ -659,23 +688,19 @@ class FakeBroker(threading.Thread):
self.connection.open()
self.sender_links = set()
self.receiver_links = set()
self.closed = False
self.dead_links = set()
def destroy(self):
"""Destroy the test connection."""
# destroy modifies the set, so make a copy
tmp = self.sender_links.copy()
while tmp:
link = tmp.pop()
link.destroy()
# destroy modifies the set, so make a copy
tmp = self.receiver_links.copy()
while tmp:
link = tmp.pop()
for link in self.sender_links | self.receiver_links:
link.destroy()
self.sender_links.clear()
self.receiver_links.clear()
self.dead_links.clear()
self.connection.destroy()
self.connection = None
self.socket.close()
self.socket = None
def fileno(self):
"""Allows use of this in a select() call."""
@ -685,18 +710,23 @@ class FakeBroker(threading.Thread):
"""Called when socket is read-ready."""
try:
pyngus.read_socket_input(self.connection, self.socket)
self.connection.process(time.time())
except socket.error:
pass
self.connection.process(time.time())
self._socket_error()
def send_output(self):
"""Called when socket is write-ready."""
try:
pyngus.write_socket_output(self.connection,
self.socket)
self.connection.process(time.time())
except socket.error:
pass
self.connection.process(time.time())
self._socket_error()
def _socket_error(self):
self.connection.close_input()
self.connection.close_output()
# the broker will clean up in its main loop
# Pyngus ConnectionEventHandler callbacks:
@ -710,7 +740,6 @@ class FakeBroker(threading.Thread):
def connection_closed(self, connection):
"""Connection close completed."""
self.server.connection_count -= 1
self.closed = True # main loop will destroy
def connection_failed(self, connection, error):
"""Connection failure detected."""
@ -757,10 +786,10 @@ class FakeBroker(threading.Thread):
def destroy(self):
"""Destroy the link."""
self._cleanup()
conn = self.conn
self.conn = None
conn.sender_links.remove(self)
conn.dead_links.discard(self)
if self.link:
self.link.destroy()
self.link = None
@ -774,6 +803,7 @@ class FakeBroker(threading.Thread):
self.server.remove_route(self.link.source_address,
self)
self.routed = False
self.conn.dead_links.add(self)
# Pyngus SenderEventHandler callbacks:
@ -783,12 +813,14 @@ class FakeBroker(threading.Thread):
self.routed = True
def sender_remote_closed(self, sender_link, error):
self._cleanup()
self.link.close()
def sender_closed(self, sender_link):
self.server.sender_link_count -= 1
self.destroy()
self._cleanup()
def sender_failed(self, sender_link, error):
self.sender_closed(sender_link)
class ReceiverLink(pyngus.ReceiverEventHandler):
"""An AMQP Receiving link."""
@ -808,6 +840,7 @@ class FakeBroker(threading.Thread):
conn = self.conn
self.conn = None
conn.receiver_links.remove(self)
conn.dead_links.discard(self)
if self.link:
self.link.destroy()
self.link = None
@ -822,7 +855,10 @@ class FakeBroker(threading.Thread):
def receiver_closed(self, receiver_link):
self.server.receiver_link_count -= 1
self.destroy()
self.conn.dead_links.add(self)
def receiver_failed(self, receiver_link, error):
self.receiver_closed(receiver_link)
def message_received(self, receiver_link, message, handle):
"""Forward this message out the proper sending link."""
@ -863,6 +899,7 @@ class FakeBroker(threading.Thread):
% (self.host, self.port))
self._connections = {}
self._sources = {}
self._pause = threading.Event()
# count of messages forwarded, by messaging pattern
self.direct_count = 0
self.topic_count = 0
@ -878,14 +915,26 @@ class FakeBroker(threading.Thread):
"""Start the server."""
LOG.debug("Starting Test Broker on %s:%d", self.host, self.port)
self._shutdown = False
self._closing = False
self.daemon = True
self._pause.set()
self._my_socket.listen(10)
super(FakeBroker, self).start()
def stop(self):
"""Shutdown the server."""
def pause(self):
self._pause.clear()
os.write(self._wakeup_pipe[1], b'!')
def stop(self, clean=False):
"""Stop the server."""
# If clean is True, attempt a clean shutdown by closing all open
# links/connections first. Otherwise force an immediate disconnect
LOG.debug("Stopping test Broker %s:%d", self.host, self.port)
self._shutdown = True
if clean:
self._closing = 1
else:
self._shutdown = True
self._pause.set()
os.write(self._wakeup_pipe[1], b'!')
self.join()
LOG.debug("Test Broker %s:%d stopped", self.host, self.port)
@ -894,6 +943,7 @@ class FakeBroker(threading.Thread):
"""Process I/O and timer events until the broker is stopped."""
LOG.debug("Test Broker on %s:%d started", self.host, self.port)
while not self._shutdown:
self._pause.wait()
readers, writers, timers = self.container.need_processing()
# map pyngus Connections back to _TestConnections:
@ -915,16 +965,19 @@ class FakeBroker(threading.Thread):
worked = set()
for r in readable:
if r is self._my_socket:
# new inbound connection request received,
# create a new Connection for it:
client_socket, client_address = self._my_socket.accept()
name = str(client_address)
conn = FakeBroker.Connection(self, client_socket, name,
self._sasl_mechanisms,
self._user_credentials,
self._sasl_config_dir,
self._sasl_config_name)
self._connections[conn.name] = conn
# new inbound connection request received
sock, addr = self._my_socket.accept()
if not self._closing:
# create a new Connection for it:
name = str(addr)
conn = FakeBroker.Connection(self, sock, name,
self._sasl_mechanisms,
self._user_credentials,
self._sasl_config_dir,
self._sasl_config_name)
self._connections[conn.name] = conn
else:
sock.close() # drop it
elif r is self._wakeup_pipe[0]:
os.read(self._wakeup_pipe[0], 512)
else:
@ -943,14 +996,26 @@ class FakeBroker(threading.Thread):
w.send_output()
worked.add(w)
# clean up any closed connections:
# clean up any closed connections or links:
while worked:
conn = worked.pop()
if conn.closed:
if conn.connection.closed:
del self._connections[conn.name]
conn.destroy()
else:
while conn.dead_links:
conn.dead_links.pop().destroy()
# Shutting down
if self._closing and not self._connections:
self._shutdown = True
elif self._closing == 1:
# start closing connections
self._closing = 2
for conn in self._connections.values():
conn.connection.close()
# Shutting down. Any open links are just disconnected - the peer will
# see a socket close.
self._my_socket.close()
for conn in self._connections.values():
conn.destroy()