Fix the driver shutdown/failover logic
Re-work much of the shutdown/failover logic. Failover due to a broker hang or broker-initiated shutdown works properly and new unit tests have been added for these cases. The shutdown code has been fixed to correctly shutdown the background thread and release resources. Socket failure and link failure handling has been improved. Fork handling code works properly. Closes-Bug: #1554046 Change-Id: I5aea2f8212e60964fb78eae30078fcf672d7f447
This commit is contained in:
@@ -50,6 +50,46 @@ class Task(object):
|
||||
"""This method will be run on the eventloop thread."""
|
||||
|
||||
|
||||
class Sender(pyngus.SenderEventHandler):
|
||||
"""A single outgoing link to a given address"""
|
||||
def __init__(self, address):
|
||||
self._address = address
|
||||
self._link = None
|
||||
|
||||
def attach(self, connection):
|
||||
# open a link to the destination
|
||||
sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex,
|
||||
self._address,
|
||||
self._address)
|
||||
self._link = connection.create_sender(name=sname,
|
||||
source_address=self._address,
|
||||
target_address=self._address)
|
||||
self._link.open()
|
||||
|
||||
def detach(self):
|
||||
# close the link
|
||||
if self._link:
|
||||
self._link.close()
|
||||
|
||||
def destroy(self):
|
||||
# drop reference to link. The link will be freed when the
|
||||
# connection is destroyed
|
||||
self._link = None
|
||||
|
||||
def send(self, message, callback):
|
||||
# send message out the link, invoke callback when acked
|
||||
self._link.send(message, delivery_callback=callback)
|
||||
|
||||
def sender_remote_closed(self, sender_link, pn_condition):
|
||||
LOG.debug("sender_remote_closed condition=%s", pn_condition)
|
||||
sender_link.close()
|
||||
|
||||
def sender_failed(self, sender_link, error):
|
||||
"""Protocol error occurred."""
|
||||
LOG.error(_LE("Outgoing link to %(addr) failed. error=%(error)"),
|
||||
{"addr": self._address, "error": error})
|
||||
|
||||
|
||||
class Replies(pyngus.ReceiverEventHandler):
|
||||
"""This is the receiving link for all reply messages. Messages are routed
|
||||
to the proper Listener's incoming queue using the correlation-id header in
|
||||
@@ -73,8 +113,13 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
self._credit = 0
|
||||
self._receiver.open()
|
||||
|
||||
def detach(self):
|
||||
# close the link
|
||||
self._receiver.close()
|
||||
|
||||
def destroy(self):
|
||||
self._correlation = None
|
||||
# drop reference to link. Link will be freed when the connection is
|
||||
# released.
|
||||
self._receiver = None
|
||||
|
||||
def ready(self):
|
||||
@@ -119,11 +164,17 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
|
||||
receiver link has initiated closing the connection.
|
||||
"""
|
||||
# TODO(kgiusti) Unclear if this error will ever occur (as opposed to
|
||||
# the Connection failing instead). Log for now, possibly implement a
|
||||
# recovery strategy if necessary.
|
||||
LOG.error(_LE("Reply subscription closed by peer: %s"),
|
||||
(pn_condition or "no error given"))
|
||||
# TODO(kgiusti) Log for now, possibly implement a recovery strategy if
|
||||
# necessary.
|
||||
if pn_condition:
|
||||
LOG.error(_LE("Reply subscription closed by peer: %s"),
|
||||
pn_condition)
|
||||
receiver.close()
|
||||
|
||||
def receiver_failed(self, receiver_link, error):
|
||||
"""Protocol error occurred."""
|
||||
LOG.error(_LE("Link to reply queue %(addr) failed. error=%(error)"),
|
||||
{"addr": self._address, "error": error})
|
||||
|
||||
def message_received(self, receiver, message, handle):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when a new message
|
||||
@@ -162,14 +213,13 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
self._incoming = incoming
|
||||
self._addresses = addresses
|
||||
self._capacity = 500 # credit per link
|
||||
self._receivers = None
|
||||
self._receivers = []
|
||||
self._id = subscription_id
|
||||
|
||||
def attach(self, connection):
|
||||
"""Create receiver links over the given connection for all the
|
||||
configured addresses.
|
||||
"""
|
||||
self._receivers = []
|
||||
for a in self._addresses:
|
||||
props = {"snd-settle-mode": "settled"}
|
||||
rname = "Consumer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, a, a)
|
||||
@@ -187,8 +237,18 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
r.open()
|
||||
self._receivers.append(r)
|
||||
|
||||
def destroy(self):
|
||||
self._receivers = None
|
||||
def detach(self):
|
||||
# close the links
|
||||
for receiver in self._receivers:
|
||||
receiver.close()
|
||||
|
||||
def reset(self):
|
||||
# destroy the links, but keep the addresses around since we may be
|
||||
# failing over. Since links are destroyed, this cannot be called from
|
||||
# any of the following ReceiverLink callbacks.
|
||||
for r in self._receivers:
|
||||
r.destroy()
|
||||
self._receivers = []
|
||||
|
||||
# Pyngus ReceiverLink event callbacks:
|
||||
|
||||
@@ -196,12 +256,19 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
|
||||
receiver link has initiated closing the connection.
|
||||
"""
|
||||
vals = {
|
||||
"addr": receiver.source_address or receiver.target_address,
|
||||
"err_msg": pn_condition or "no error given"
|
||||
}
|
||||
LOG.error(_LE("Server subscription %(addr)s closed "
|
||||
"by peer: %(err_msg)s"), vals)
|
||||
if pn_condition:
|
||||
vals = {
|
||||
"addr": receiver.source_address or receiver.target_address,
|
||||
"err_msg": pn_condition
|
||||
}
|
||||
LOG.error(_LE("Server subscription %(addr)s closed "
|
||||
"by peer: %(err_msg)s"), vals)
|
||||
receiver.close()
|
||||
|
||||
def receiver_failed(self, receiver_link, error):
|
||||
"""Protocol error occurred."""
|
||||
LOG.error(_LE("Listener link queue %(addr) failed. error=%(error)"),
|
||||
{"addr": self._address, "error": error})
|
||||
|
||||
def message_received(self, receiver, message, handle):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when a new message
|
||||
@@ -323,22 +390,16 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._tasks.put(task)
|
||||
self._schedule_task_processing()
|
||||
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
def shutdown(self, timeout=None):
|
||||
"""Shutdown the messaging service."""
|
||||
LOG.info(_LI("Shutting down the AMQP 1.0 connection"))
|
||||
if self.processor:
|
||||
self.processor.wakeup(lambda: self._start_shutdown())
|
||||
LOG.debug("Waiting for eventloop to exit")
|
||||
self.processor.shutdown(wait, timeout)
|
||||
self.processor.join(timeout)
|
||||
self._hard_reset()
|
||||
self.processor.destroy()
|
||||
self.processor = None
|
||||
self._tasks = None
|
||||
self._senders = None
|
||||
for servers in self._servers.values():
|
||||
for server in servers.values():
|
||||
server.destroy()
|
||||
self._servers.clear()
|
||||
self._socket_connection = None
|
||||
if self._replies:
|
||||
self._replies.destroy()
|
||||
self._replies = None
|
||||
LOG.debug("Eventloop exited, driver shut down")
|
||||
|
||||
# The remaining methods are reserved to run from the eventloop thread only!
|
||||
@@ -426,16 +487,10 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
def _sender(self, address):
|
||||
# if we already have a sender for that address, use it
|
||||
# else establish the sender and cache it
|
||||
if address in self._senders:
|
||||
sender = self._senders[address]
|
||||
else:
|
||||
sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex,
|
||||
address, address)
|
||||
conn = self._socket_connection.connection
|
||||
sender = conn.create_sender(source_address=address,
|
||||
target_address=address,
|
||||
name=sname)
|
||||
sender.open()
|
||||
sender = self._senders.get(address)
|
||||
if sender is None:
|
||||
sender = Sender(address)
|
||||
sender.attach(self._socket_connection.connection)
|
||||
self._senders[address] = sender
|
||||
return sender
|
||||
|
||||
@@ -446,7 +501,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
"""
|
||||
address = str(addr)
|
||||
message.address = address
|
||||
self._sender(address).send(message, delivery_callback=callback)
|
||||
self._sender(address).send(message, callback)
|
||||
|
||||
def _server_address(self, target):
|
||||
return self._concatenate([self.server_request_prefix,
|
||||
@@ -540,17 +595,24 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._replies and self._replies.ready())
|
||||
|
||||
def _start_shutdown(self):
|
||||
"""Called when the driver destroys the controller, this method attempts
|
||||
to cleanly close the AMQP connection to the peer.
|
||||
"""Called when the application is closing the transport.
|
||||
Attempt to cleanly flush/close all links.
|
||||
"""
|
||||
LOG.info(_LI("Shutting down AMQP connection"))
|
||||
self._closing = True
|
||||
if self._socket_connection.connection.active:
|
||||
if (self._socket_connection
|
||||
and self._socket_connection.connection
|
||||
and self._socket_connection.connection.active):
|
||||
# try a clean shutdown
|
||||
for sender in self._senders.values():
|
||||
sender.detach()
|
||||
for servers in self._servers.values():
|
||||
for server in servers.values():
|
||||
server.detach()
|
||||
self._replies.detach()
|
||||
self._socket_connection.connection.close()
|
||||
else:
|
||||
# don't wait for a close from the remote, may never happen
|
||||
self._complete_shutdown()
|
||||
self.processor.shutdown()
|
||||
|
||||
# reply link active callback:
|
||||
|
||||
@@ -567,7 +629,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
|
||||
def socket_error(self, error):
|
||||
"""Called by eventloop when a socket error occurs."""
|
||||
LOG.debug("Socket failure: %s", error)
|
||||
LOG.error(_LE("Socket failure: %s"), error)
|
||||
self._handle_connection_loss()
|
||||
|
||||
# Pyngus connection event callbacks (and their helpers), all invoked from
|
||||
@@ -612,14 +674,12 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when the peer has
|
||||
requested that the connection be closed.
|
||||
"""
|
||||
if not self._closing:
|
||||
# The messaging service/broker is trying to shut down the
|
||||
# connection. Acknowledge the close, and try to reconnect/failover
|
||||
# later once the connection has closed (connection_closed is
|
||||
# called).
|
||||
LOG.info(_LI("Connection closed by peer: %s"),
|
||||
reason or "no reason given")
|
||||
self._socket_connection.connection.close()
|
||||
# The messaging service/broker is trying to shut down the
|
||||
# connection. Acknowledge the close, and try to reconnect/failover
|
||||
# later once the connection has closed (connection_closed is called).
|
||||
if reason:
|
||||
LOG.info(_LI("Connection closed by peer: %s"), reason)
|
||||
self._socket_connection.connection.close()
|
||||
|
||||
def sasl_done(self, connection, pn_sasl, outcome):
|
||||
"""This is a Pyngus callback invoked when the SASL handshake
|
||||
@@ -635,28 +695,19 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
'username': self.hosts.current.username})
|
||||
# connection failure will be handled later
|
||||
|
||||
def _complete_shutdown(self):
|
||||
"""The AMQP Connection has closed, and the driver shutdown is complete.
|
||||
Clean up controller resources and exit.
|
||||
"""
|
||||
self._socket_connection.close()
|
||||
self.processor.shutdown()
|
||||
LOG.info(_LI("Messaging has shutdown"))
|
||||
|
||||
def _handle_connection_loss(self):
|
||||
"""The connection to the messaging service has been lost. Try to
|
||||
reestablish the connection/failover.
|
||||
reestablish the connection/failover if not shutting down the driver.
|
||||
"""
|
||||
if self._closing:
|
||||
# we're in the middle of shutting down the driver anyways,
|
||||
# just consider it done:
|
||||
self._complete_shutdown()
|
||||
self.processor.shutdown()
|
||||
else:
|
||||
# for some reason, we've lost the connection to the messaging
|
||||
# service. Try to re-establish the connection:
|
||||
if not self._reconnecting:
|
||||
self._reconnecting = True
|
||||
self._replies = None
|
||||
LOG.info(_LI("delaying reconnect attempt for %d seconds"),
|
||||
self._delay)
|
||||
self.processor.schedule(lambda: self._do_reconnect(),
|
||||
@@ -668,14 +719,29 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
"""Invoked on connection/socket failure, failover and re-connect to the
|
||||
messaging service.
|
||||
"""
|
||||
# note well: since this method destroys the connection, it cannot be
|
||||
# invoked directly from a pyngus callback. Use processor.schedule() to
|
||||
# run this method on the main loop instead.
|
||||
if not self._closing:
|
||||
self._hard_reset()
|
||||
self._reconnecting = False
|
||||
self._senders = {}
|
||||
self._socket_connection.reset()
|
||||
host = self.hosts.next()
|
||||
LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"),
|
||||
{'hostname': host.hostname, 'port': host.port})
|
||||
self._socket_connection.connect(host)
|
||||
|
||||
def _hard_reset(self):
|
||||
"""Reset the controller to its pre-connection state"""
|
||||
# note well: since this method destroys the connection, it cannot be
|
||||
# invoked directly from a pyngus callback. Use processor.schedule() to
|
||||
# run this method on the main loop instead.
|
||||
for sender in self._senders.values():
|
||||
sender.destroy()
|
||||
self._senders.clear()
|
||||
for servers in self._servers.values():
|
||||
for server in servers.values():
|
||||
# discard links, but keep servers around to re-attach if
|
||||
# failing over
|
||||
server.reset()
|
||||
if self._replies:
|
||||
self._replies.destroy()
|
||||
self._replies = None
|
||||
if self._socket_connection:
|
||||
self._socket_connection.reset()
|
||||
|
||||
@@ -199,19 +199,28 @@ class ProtonDriver(base.BaseDriver):
|
||||
"""
|
||||
def wrap(self, *args, **kws):
|
||||
with self._lock:
|
||||
# check to see if a fork was done after the Controller and its
|
||||
# I/O thread was spawned. old_pid will be None the first time
|
||||
# this is called which will cause the Controller to be created.
|
||||
old_pid = self._pid
|
||||
self._pid = os.getpid()
|
||||
|
||||
if old_pid != self._pid:
|
||||
if self._ctrl is not None:
|
||||
LOG.warning(_LW("Process forked after connection "
|
||||
"established!"))
|
||||
self._ctrl.shutdown(wait=False)
|
||||
# Create a Controller that connects to the messaging service:
|
||||
self._ctrl = controller.Controller(self._hosts,
|
||||
self._default_exchange,
|
||||
self._conf)
|
||||
self._ctrl.connect()
|
||||
if old_pid != self._pid:
|
||||
if self._ctrl is not None:
|
||||
# fork was called after the Controller was created, and
|
||||
# we are now executing as the child process. Do not
|
||||
# touch the existing Controller - it is owned by the
|
||||
# parent. Best we can do here is simply drop it and
|
||||
# hope we get lucky.
|
||||
LOG.warning(_LW("Process forked after connection "
|
||||
"established!"))
|
||||
self._ctrl = None
|
||||
# Create a Controller that connects to the messaging
|
||||
# service:
|
||||
self._ctrl = controller.Controller(self._hosts,
|
||||
self._default_exchange,
|
||||
self._conf)
|
||||
self._ctrl.connect()
|
||||
return func(self, *args, **kws)
|
||||
return wrap
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ class _SocketConnection(object):
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_input()
|
||||
self.connection.close()
|
||||
self.connection.close_output()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
@@ -90,7 +90,7 @@ class _SocketConnection(object):
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_output()
|
||||
self.connection.close()
|
||||
self.connection.close_input()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
@@ -161,6 +161,7 @@ class _SocketConnection(object):
|
||||
def close(self):
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
|
||||
|
||||
class Schedule(object):
|
||||
@@ -257,15 +258,18 @@ class Thread(threading.Thread):
|
||||
"""
|
||||
self._requests.wakeup(request)
|
||||
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
def shutdown(self):
|
||||
"""Shutdown the eventloop thread. Thread safe.
|
||||
"""
|
||||
LOG.debug("eventloop shutdown requested")
|
||||
self._shutdown = True
|
||||
self.wakeup()
|
||||
if wait:
|
||||
self.join(timeout=timeout)
|
||||
LOG.debug("eventloop shutdown complete")
|
||||
|
||||
def destroy(self):
|
||||
# release the container. This can only be called after the eventloop
|
||||
# thread exited
|
||||
self._container.destroy()
|
||||
self._container = None
|
||||
|
||||
# the following methods are not thread safe - they must be run from the
|
||||
# eventloop thread
|
||||
@@ -322,12 +326,6 @@ class Thread(threading.Thread):
|
||||
continue
|
||||
raise # assuming fatal...
|
||||
|
||||
# don't process any I/O or timers if woken up by a shutdown:
|
||||
# if we've been forked we don't want to do I/O on the parent's
|
||||
# sockets
|
||||
if self._shutdown:
|
||||
break
|
||||
|
||||
readable, writable, ignore = results
|
||||
|
||||
for r in readable:
|
||||
@@ -345,4 +343,3 @@ class Thread(threading.Thread):
|
||||
|
||||
LOG.info(_LI("eventloop thread exiting, container=%s"),
|
||||
self._container.name)
|
||||
self._container.destroy()
|
||||
|
||||
@@ -516,9 +516,9 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
if broker.isAlive():
|
||||
broker.stop()
|
||||
|
||||
def test_broker_failover(self):
|
||||
"""Simulate failover of one broker to another."""
|
||||
def _failover(self, fail_brokers):
|
||||
self._brokers[0].start()
|
||||
# self.config(trace=True, group="oslo_messaging_amqp")
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
|
||||
target = oslo_messaging.Target(topic="my-topic")
|
||||
@@ -541,11 +541,10 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
self.assertEqual(self._brokers[0].topic_count, 1)
|
||||
self.assertEqual(self._brokers[0].direct_count, 1)
|
||||
|
||||
# fail broker 0 and start broker 1:
|
||||
self._brokers[0].stop()
|
||||
self._brokers[1].start()
|
||||
# invoke failover method
|
||||
fail_brokers(self._brokers[0], self._brokers[1])
|
||||
|
||||
# wait for listener links to re-establish
|
||||
# wait for listener links to re-establish on broker 1
|
||||
# 4 = 3 links per listener + 1 for the global reply queue
|
||||
predicate = lambda: self._brokers[1].sender_link_count == 4
|
||||
_wait_until(predicate, 30)
|
||||
@@ -571,10 +570,40 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
self._brokers[1].stop()
|
||||
driver.cleanup()
|
||||
|
||||
def test_broker_crash(self):
|
||||
"""Simulate a failure of one broker."""
|
||||
def _meth(broker0, broker1):
|
||||
# fail broker 0 and start broker 1:
|
||||
broker0.stop()
|
||||
time.sleep(0.5)
|
||||
broker1.start()
|
||||
self._failover(_meth)
|
||||
|
||||
def test_broker_shutdown(self):
|
||||
"""Simulate a normal shutdown of a broker."""
|
||||
def _meth(broker0, broker1):
|
||||
broker0.stop(clean=True)
|
||||
time.sleep(0.5)
|
||||
broker1.start()
|
||||
self._failover(_meth)
|
||||
|
||||
def test_heartbeat_failover(self):
|
||||
"""Simulate broker heartbeat timeout."""
|
||||
def _meth(broker0, broker1):
|
||||
# keep alive heartbeat from broker 0 will stop, which should force
|
||||
# failover to broker 1 in about two seconds
|
||||
broker0.pause()
|
||||
broker1.start()
|
||||
self.config(idle_timeout=2, group="oslo_messaging_amqp")
|
||||
self._failover(_meth)
|
||||
self._brokers[0].stop()
|
||||
|
||||
def test_listener_failover(self):
|
||||
"""Verify that Listeners are re-established after failover.
|
||||
"""Verify that Listeners sharing the same topic are re-established
|
||||
after failover.
|
||||
"""
|
||||
self._brokers[0].start()
|
||||
# self.config(trace=True, group="oslo_messaging_amqp")
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
|
||||
target = oslo_messaging.Target(topic="my-topic")
|
||||
@@ -596,11 +625,11 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
_wait_until(predicate, 30)
|
||||
self.assertTrue(predicate())
|
||||
|
||||
# fail broker 0 and start broker 1:
|
||||
self._brokers[0].stop()
|
||||
# start broker 1 then shutdown broker 0:
|
||||
self._brokers[1].start()
|
||||
self._brokers[0].stop(clean=True)
|
||||
|
||||
# wait again for 7 sending links to re-establish
|
||||
# wait again for 7 sending links to re-establish on broker 1
|
||||
predicate = lambda: self._brokers[1].sender_link_count == 7
|
||||
_wait_until(predicate, 30)
|
||||
self.assertTrue(predicate())
|
||||
@@ -617,8 +646,8 @@ class TestFailover(test_utils.BaseTestCase):
|
||||
listener2.join(timeout=30)
|
||||
self.assertFalse(listener1.isAlive() or listener2.isAlive())
|
||||
|
||||
self._brokers[1].stop()
|
||||
driver.cleanup()
|
||||
self._brokers[1].stop()
|
||||
|
||||
|
||||
class FakeBroker(threading.Thread):
|
||||
@@ -659,23 +688,19 @@ class FakeBroker(threading.Thread):
|
||||
self.connection.open()
|
||||
self.sender_links = set()
|
||||
self.receiver_links = set()
|
||||
self.closed = False
|
||||
self.dead_links = set()
|
||||
|
||||
def destroy(self):
|
||||
"""Destroy the test connection."""
|
||||
# destroy modifies the set, so make a copy
|
||||
tmp = self.sender_links.copy()
|
||||
while tmp:
|
||||
link = tmp.pop()
|
||||
link.destroy()
|
||||
# destroy modifies the set, so make a copy
|
||||
tmp = self.receiver_links.copy()
|
||||
while tmp:
|
||||
link = tmp.pop()
|
||||
for link in self.sender_links | self.receiver_links:
|
||||
link.destroy()
|
||||
self.sender_links.clear()
|
||||
self.receiver_links.clear()
|
||||
self.dead_links.clear()
|
||||
self.connection.destroy()
|
||||
self.connection = None
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
|
||||
def fileno(self):
|
||||
"""Allows use of this in a select() call."""
|
||||
@@ -685,18 +710,23 @@ class FakeBroker(threading.Thread):
|
||||
"""Called when socket is read-ready."""
|
||||
try:
|
||||
pyngus.read_socket_input(self.connection, self.socket)
|
||||
self.connection.process(time.time())
|
||||
except socket.error:
|
||||
pass
|
||||
self.connection.process(time.time())
|
||||
self._socket_error()
|
||||
|
||||
def send_output(self):
|
||||
"""Called when socket is write-ready."""
|
||||
try:
|
||||
pyngus.write_socket_output(self.connection,
|
||||
self.socket)
|
||||
self.connection.process(time.time())
|
||||
except socket.error:
|
||||
pass
|
||||
self.connection.process(time.time())
|
||||
self._socket_error()
|
||||
|
||||
def _socket_error(self):
|
||||
self.connection.close_input()
|
||||
self.connection.close_output()
|
||||
# the broker will clean up in its main loop
|
||||
|
||||
# Pyngus ConnectionEventHandler callbacks:
|
||||
|
||||
@@ -710,7 +740,6 @@ class FakeBroker(threading.Thread):
|
||||
def connection_closed(self, connection):
|
||||
"""Connection close completed."""
|
||||
self.server.connection_count -= 1
|
||||
self.closed = True # main loop will destroy
|
||||
|
||||
def connection_failed(self, connection, error):
|
||||
"""Connection failure detected."""
|
||||
@@ -757,10 +786,10 @@ class FakeBroker(threading.Thread):
|
||||
|
||||
def destroy(self):
|
||||
"""Destroy the link."""
|
||||
self._cleanup()
|
||||
conn = self.conn
|
||||
self.conn = None
|
||||
conn.sender_links.remove(self)
|
||||
conn.dead_links.discard(self)
|
||||
if self.link:
|
||||
self.link.destroy()
|
||||
self.link = None
|
||||
@@ -774,6 +803,7 @@ class FakeBroker(threading.Thread):
|
||||
self.server.remove_route(self.link.source_address,
|
||||
self)
|
||||
self.routed = False
|
||||
self.conn.dead_links.add(self)
|
||||
|
||||
# Pyngus SenderEventHandler callbacks:
|
||||
|
||||
@@ -783,12 +813,14 @@ class FakeBroker(threading.Thread):
|
||||
self.routed = True
|
||||
|
||||
def sender_remote_closed(self, sender_link, error):
|
||||
self._cleanup()
|
||||
self.link.close()
|
||||
|
||||
def sender_closed(self, sender_link):
|
||||
self.server.sender_link_count -= 1
|
||||
self.destroy()
|
||||
self._cleanup()
|
||||
|
||||
def sender_failed(self, sender_link, error):
|
||||
self.sender_closed(sender_link)
|
||||
|
||||
class ReceiverLink(pyngus.ReceiverEventHandler):
|
||||
"""An AMQP Receiving link."""
|
||||
@@ -808,6 +840,7 @@ class FakeBroker(threading.Thread):
|
||||
conn = self.conn
|
||||
self.conn = None
|
||||
conn.receiver_links.remove(self)
|
||||
conn.dead_links.discard(self)
|
||||
if self.link:
|
||||
self.link.destroy()
|
||||
self.link = None
|
||||
@@ -822,7 +855,10 @@ class FakeBroker(threading.Thread):
|
||||
|
||||
def receiver_closed(self, receiver_link):
|
||||
self.server.receiver_link_count -= 1
|
||||
self.destroy()
|
||||
self.conn.dead_links.add(self)
|
||||
|
||||
def receiver_failed(self, receiver_link, error):
|
||||
self.receiver_closed(receiver_link)
|
||||
|
||||
def message_received(self, receiver_link, message, handle):
|
||||
"""Forward this message out the proper sending link."""
|
||||
@@ -863,6 +899,7 @@ class FakeBroker(threading.Thread):
|
||||
% (self.host, self.port))
|
||||
self._connections = {}
|
||||
self._sources = {}
|
||||
self._pause = threading.Event()
|
||||
# count of messages forwarded, by messaging pattern
|
||||
self.direct_count = 0
|
||||
self.topic_count = 0
|
||||
@@ -878,14 +915,26 @@ class FakeBroker(threading.Thread):
|
||||
"""Start the server."""
|
||||
LOG.debug("Starting Test Broker on %s:%d", self.host, self.port)
|
||||
self._shutdown = False
|
||||
self._closing = False
|
||||
self.daemon = True
|
||||
self._pause.set()
|
||||
self._my_socket.listen(10)
|
||||
super(FakeBroker, self).start()
|
||||
|
||||
def stop(self):
|
||||
"""Shutdown the server."""
|
||||
def pause(self):
|
||||
self._pause.clear()
|
||||
os.write(self._wakeup_pipe[1], b'!')
|
||||
|
||||
def stop(self, clean=False):
|
||||
"""Stop the server."""
|
||||
# If clean is True, attempt a clean shutdown by closing all open
|
||||
# links/connections first. Otherwise force an immediate disconnect
|
||||
LOG.debug("Stopping test Broker %s:%d", self.host, self.port)
|
||||
self._shutdown = True
|
||||
if clean:
|
||||
self._closing = 1
|
||||
else:
|
||||
self._shutdown = True
|
||||
self._pause.set()
|
||||
os.write(self._wakeup_pipe[1], b'!')
|
||||
self.join()
|
||||
LOG.debug("Test Broker %s:%d stopped", self.host, self.port)
|
||||
@@ -894,6 +943,7 @@ class FakeBroker(threading.Thread):
|
||||
"""Process I/O and timer events until the broker is stopped."""
|
||||
LOG.debug("Test Broker on %s:%d started", self.host, self.port)
|
||||
while not self._shutdown:
|
||||
self._pause.wait()
|
||||
readers, writers, timers = self.container.need_processing()
|
||||
|
||||
# map pyngus Connections back to _TestConnections:
|
||||
@@ -915,16 +965,19 @@ class FakeBroker(threading.Thread):
|
||||
worked = set()
|
||||
for r in readable:
|
||||
if r is self._my_socket:
|
||||
# new inbound connection request received,
|
||||
# create a new Connection for it:
|
||||
client_socket, client_address = self._my_socket.accept()
|
||||
name = str(client_address)
|
||||
conn = FakeBroker.Connection(self, client_socket, name,
|
||||
self._sasl_mechanisms,
|
||||
self._user_credentials,
|
||||
self._sasl_config_dir,
|
||||
self._sasl_config_name)
|
||||
self._connections[conn.name] = conn
|
||||
# new inbound connection request received
|
||||
sock, addr = self._my_socket.accept()
|
||||
if not self._closing:
|
||||
# create a new Connection for it:
|
||||
name = str(addr)
|
||||
conn = FakeBroker.Connection(self, sock, name,
|
||||
self._sasl_mechanisms,
|
||||
self._user_credentials,
|
||||
self._sasl_config_dir,
|
||||
self._sasl_config_name)
|
||||
self._connections[conn.name] = conn
|
||||
else:
|
||||
sock.close() # drop it
|
||||
elif r is self._wakeup_pipe[0]:
|
||||
os.read(self._wakeup_pipe[0], 512)
|
||||
else:
|
||||
@@ -943,14 +996,26 @@ class FakeBroker(threading.Thread):
|
||||
w.send_output()
|
||||
worked.add(w)
|
||||
|
||||
# clean up any closed connections:
|
||||
# clean up any closed connections or links:
|
||||
while worked:
|
||||
conn = worked.pop()
|
||||
if conn.closed:
|
||||
if conn.connection.closed:
|
||||
del self._connections[conn.name]
|
||||
conn.destroy()
|
||||
else:
|
||||
while conn.dead_links:
|
||||
conn.dead_links.pop().destroy()
|
||||
|
||||
# Shutting down
|
||||
if self._closing and not self._connections:
|
||||
self._shutdown = True
|
||||
elif self._closing == 1:
|
||||
# start closing connections
|
||||
self._closing = 2
|
||||
for conn in self._connections.values():
|
||||
conn.connection.close()
|
||||
|
||||
# Shutting down. Any open links are just disconnected - the peer will
|
||||
# see a socket close.
|
||||
self._my_socket.close()
|
||||
for conn in self._connections.values():
|
||||
conn.destroy()
|
||||
|
||||
Reference in New Issue
Block a user