diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/protocols/amqp/controller.py index 1dd91f7ec..91a5b0baa 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/protocols/amqp/controller.py @@ -50,6 +50,46 @@ class Task(object): """This method will be run on the eventloop thread.""" +class Sender(pyngus.SenderEventHandler): + """A single outgoing link to a given address""" + def __init__(self, address): + self._address = address + self._link = None + + def attach(self, connection): + # open a link to the destination + sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, + self._address, + self._address) + self._link = connection.create_sender(name=sname, event_handler=self, + source_address=self._address, + target_address=self._address) + self._link.open() + + def detach(self): + # close the link + if self._link: + self._link.close() + + def destroy(self): + # drop reference to link. The link will be freed when the + # connection is destroyed + self._link = None + + def send(self, message, callback): + # send message out the link, invoke callback when acked + self._link.send(message, delivery_callback=callback) + + def sender_remote_closed(self, sender_link, pn_condition): + LOG.debug("sender_remote_closed condition=%s", pn_condition) + sender_link.close() + + def sender_failed(self, sender_link, error): + """Protocol error occurred.""" + LOG.error(_LE("Outgoing link to %(addr)s failed. error=%(error)s"), + {"addr": self._address, "error": error}) + + class Replies(pyngus.ReceiverEventHandler): """This is the receiving link for all reply messages. Messages are routed to the proper Listener's incoming queue using the correlation-id header in @@ -73,8 +113,13 @@ class Replies(pyngus.ReceiverEventHandler): self._credit = 0 self._receiver.open() + def detach(self): + # close the link + self._receiver.close() + def destroy(self): - self._correlation = None + # drop reference to link. 
Link will be freed when the connection is + # released. self._receiver = None def ready(self): @@ -119,11 +164,17 @@ class Replies(pyngus.ReceiverEventHandler): """This is a Pyngus callback, invoked by Pyngus when the peer of this receiver link has initiated closing the connection. """ - # TODO(kgiusti) Unclear if this error will ever occur (as opposed to - # the Connection failing instead). Log for now, possibly implement a - # recovery strategy if necessary. - LOG.error(_LE("Reply subscription closed by peer: %s"), - (pn_condition or "no error given")) + # TODO(kgiusti) Log for now, possibly implement a recovery strategy if + # necessary. + if pn_condition: + LOG.error(_LE("Reply subscription closed by peer: %s"), + pn_condition) + receiver.close() + + def receiver_failed(self, receiver_link, error): + """Protocol error occurred.""" + LOG.error(_LE("Link to reply queue %(addr)s failed. error=%(error)s"), + {"addr": receiver_link.source_address, "error": error}) def message_received(self, receiver, message, handle): """This is a Pyngus callback, invoked by Pyngus when a new message @@ -162,14 +213,13 @@ class Server(pyngus.ReceiverEventHandler): self._incoming = incoming self._addresses = addresses self._capacity = 500 # credit per link - self._receivers = None + self._receivers = [] self._id = subscription_id def attach(self, connection): """Create receiver links over the given connection for all the configured addresses. """ - self._receivers = [] for a in self._addresses: props = {"snd-settle-mode": "settled"} rname = "Consumer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, a, a) @@ -187,8 +237,18 @@ class Server(pyngus.ReceiverEventHandler): r.open() self._receivers.append(r) - def destroy(self): - self._receivers = None + def detach(self): + # close the links + for receiver in self._receivers: + receiver.close() + + def reset(self): + # destroy the links, but keep the addresses around since we may be + # failing over. 
Since links are destroyed, this cannot be called from + # any of the following ReceiverLink callbacks. + for r in self._receivers: + r.destroy() + self._receivers = [] # Pyngus ReceiverLink event callbacks: @@ -196,12 +256,19 @@ class Server(pyngus.ReceiverEventHandler): """This is a Pyngus callback, invoked by Pyngus when the peer of this receiver link has initiated closing the connection. """ - vals = { - "addr": receiver.source_address or receiver.target_address, - "err_msg": pn_condition or "no error given" - } - LOG.error(_LE("Server subscription %(addr)s closed " - "by peer: %(err_msg)s"), vals) + if pn_condition: + vals = { + "addr": receiver.source_address or receiver.target_address, + "err_msg": pn_condition + } + LOG.error(_LE("Server subscription %(addr)s closed " + "by peer: %(err_msg)s"), vals) + receiver.close() + + def receiver_failed(self, receiver_link, error): + """Protocol error occurred.""" + LOG.error(_LE("Listener link queue %(addr)s failed. error=%(error)s"), + {"addr": receiver_link.source_address, "error": error}) def message_received(self, receiver, message, handle): """This is a Pyngus callback, invoked by Pyngus when a new message @@ -323,22 +390,16 @@ class Controller(pyngus.ConnectionEventHandler): self._tasks.put(task) self._schedule_task_processing() - def shutdown(self, wait=True, timeout=None): + def shutdown(self, timeout=None): """Shutdown the messaging service.""" + LOG.info(_LI("Shutting down the AMQP 1.0 connection")) if self.processor: + self.processor.wakeup(lambda: self._start_shutdown()) LOG.debug("Waiting for eventloop to exit") - self.processor.shutdown(wait, timeout) + self.processor.join(timeout) + self._hard_reset() + self.processor.destroy() self.processor = None - self._tasks = None - self._senders = None - for servers in self._servers.values(): - for server in servers.values(): - server.destroy() - self._servers.clear() - self._socket_connection = None - if self._replies: - self._replies.destroy() - self._replies = None 
LOG.debug("Eventloop exited, driver shut down") # The remaining methods are reserved to run from the eventloop thread only! @@ -426,16 +487,10 @@ class Controller(pyngus.ConnectionEventHandler): def _sender(self, address): # if we already have a sender for that address, use it # else establish the sender and cache it - if address in self._senders: - sender = self._senders[address] - else: - sname = "Producer-%s:src=%s:tgt=%s" % (uuid.uuid4().hex, - address, address) - conn = self._socket_connection.connection - sender = conn.create_sender(source_address=address, - target_address=address, - name=sname) - sender.open() + sender = self._senders.get(address) + if sender is None: + sender = Sender(address) + sender.attach(self._socket_connection.connection) self._senders[address] = sender return sender @@ -446,7 +501,7 @@ class Controller(pyngus.ConnectionEventHandler): """ address = str(addr) message.address = address - self._sender(address).send(message, delivery_callback=callback) + self._sender(address).send(message, callback) def _server_address(self, target): return self._concatenate([self.server_request_prefix, @@ -540,17 +595,24 @@ class Controller(pyngus.ConnectionEventHandler): self._replies and self._replies.ready()) def _start_shutdown(self): - """Called when the driver destroys the controller, this method attempts - to cleanly close the AMQP connection to the peer. + """Called when the application is closing the transport. + Attempt to cleanly flush/close all links. 
""" - LOG.info(_LI("Shutting down AMQP connection")) self._closing = True - if self._socket_connection.connection.active: + if (self._socket_connection + and self._socket_connection.connection + and self._socket_connection.connection.active): # try a clean shutdown + for sender in self._senders.values(): + sender.detach() + for servers in self._servers.values(): + for server in servers.values(): + server.detach() + self._replies.detach() self._socket_connection.connection.close() else: # don't wait for a close from the remote, may never happen - self._complete_shutdown() + self.processor.shutdown() # reply link active callback: @@ -567,7 +629,7 @@ class Controller(pyngus.ConnectionEventHandler): def socket_error(self, error): """Called by eventloop when a socket error occurs.""" - LOG.debug("Socket failure: %s", error) + LOG.error(_LE("Socket failure: %s"), error) self._handle_connection_loss() # Pyngus connection event callbacks (and their helpers), all invoked from @@ -612,14 +674,12 @@ class Controller(pyngus.ConnectionEventHandler): """This is a Pyngus callback, invoked by Pyngus when the peer has requested that the connection be closed. """ - if not self._closing: - # The messaging service/broker is trying to shut down the - # connection. Acknowledge the close, and try to reconnect/failover - # later once the connection has closed (connection_closed is - # called). - LOG.info(_LI("Connection closed by peer: %s"), - reason or "no reason given") - self._socket_connection.connection.close() + # The messaging service/broker is trying to shut down the + # connection. Acknowledge the close, and try to reconnect/failover + # later once the connection has closed (connection_closed is called). 
+ if reason: + LOG.info(_LI("Connection closed by peer: %s"), reason) + self._socket_connection.connection.close() def sasl_done(self, connection, pn_sasl, outcome): """This is a Pyngus callback invoked when the SASL handshake @@ -635,28 +695,19 @@ class Controller(pyngus.ConnectionEventHandler): 'username': self.hosts.current.username}) # connection failure will be handled later - def _complete_shutdown(self): - """The AMQP Connection has closed, and the driver shutdown is complete. - Clean up controller resources and exit. - """ - self._socket_connection.close() - self.processor.shutdown() - LOG.info(_LI("Messaging has shutdown")) - def _handle_connection_loss(self): """The connection to the messaging service has been lost. Try to - reestablish the connection/failover. + reestablish the connection/failover if not shutting down the driver. """ if self._closing: # we're in the middle of shutting down the driver anyways, # just consider it done: - self._complete_shutdown() + self.processor.shutdown() else: # for some reason, we've lost the connection to the messaging # service. Try to re-establish the connection: if not self._reconnecting: self._reconnecting = True - self._replies = None LOG.info(_LI("delaying reconnect attempt for %d seconds"), self._delay) self.processor.schedule(lambda: self._do_reconnect(), @@ -668,14 +719,29 @@ class Controller(pyngus.ConnectionEventHandler): """Invoked on connection/socket failure, failover and re-connect to the messaging service. """ - # note well: since this method destroys the connection, it cannot be - # invoked directly from a pyngus callback. Use processor.schedule() to - # run this method on the main loop instead. 
if not self._closing: + self._hard_reset() self._reconnecting = False - self._senders = {} - self._socket_connection.reset() host = self.hosts.next() LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"), {'hostname': host.hostname, 'port': host.port}) self._socket_connection.connect(host) + + def _hard_reset(self): + """Reset the controller to its pre-connection state""" + # note well: since this method destroys the connection, it cannot be + # invoked directly from a pyngus callback. Use processor.schedule() to + # run this method on the main loop instead. + for sender in self._senders.values(): + sender.destroy() + self._senders.clear() + for servers in self._servers.values(): + for server in servers.values(): + # discard links, but keep servers around to re-attach if + # failing over + server.reset() + if self._replies: + self._replies.destroy() + self._replies = None + if self._socket_connection: + self._socket_connection.reset() diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index 68fbbf4d8..9194cccca 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -199,19 +199,28 @@ class ProtonDriver(base.BaseDriver): """ def wrap(self, *args, **kws): with self._lock: + # check to see if a fork was done after the Controller and its + # I/O thread was spawned. old_pid will be None the first time + # this is called which will cause the Controller to be created. 
old_pid = self._pid self._pid = os.getpid() - if old_pid != self._pid: - if self._ctrl is not None: - LOG.warning(_LW("Process forked after connection " - "established!")) - self._ctrl.shutdown(wait=False) - # Create a Controller that connects to the messaging service: - self._ctrl = controller.Controller(self._hosts, - self._default_exchange, - self._conf) - self._ctrl.connect() + if old_pid != self._pid: + if self._ctrl is not None: + # fork was called after the Controller was created, and + # we are now executing as the child process. Do not + # touch the existing Controller - it is owned by the + # parent. Best we can do here is simply drop it and + # hope we get lucky. + LOG.warning(_LW("Process forked after connection " + "established!")) + self._ctrl = None + # Create a Controller that connects to the messaging + # service: + self._ctrl = controller.Controller(self._hosts, + self._default_exchange, + self._conf) + self._ctrl.connect() return func(self, *args, **kws) return wrap diff --git a/oslo_messaging/_drivers/protocols/amqp/eventloop.py b/oslo_messaging/_drivers/protocols/amqp/eventloop.py index a9a828da4..dfe073034 100644 --- a/oslo_messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo_messaging/_drivers/protocols/amqp/eventloop.py @@ -76,7 +76,7 @@ class _SocketConnection(object): except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER self.connection.close_input() - self.connection.close() + self.connection.close_output() self._handler.socket_error(str(e)) return pyngus.Connection.EOS @@ -90,7 +90,7 @@ class _SocketConnection(object): except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER self.connection.close_output() - self.connection.close() + self.connection.close_input() self._handler.socket_error(str(e)) return pyngus.Connection.EOS @@ -161,6 +161,7 @@ class _SocketConnection(object): def close(self): if self.socket: self.socket.close() + self.socket = None class 
Schedule(object): @@ -257,15 +258,18 @@ class Thread(threading.Thread): """ self._requests.wakeup(request) - def shutdown(self, wait=True, timeout=None): + def shutdown(self): """Shutdown the eventloop thread. Thread safe. """ LOG.debug("eventloop shutdown requested") self._shutdown = True self.wakeup() - if wait: - self.join(timeout=timeout) - LOG.debug("eventloop shutdown complete") + + def destroy(self): + # release the container. This can only be called after the eventloop + # thread exited + self._container.destroy() + self._container = None # the following methods are not thread safe - they must be run from the # eventloop thread @@ -322,12 +326,6 @@ class Thread(threading.Thread): continue raise # assuming fatal... - # don't process any I/O or timers if woken up by a shutdown: - # if we've been forked we don't want to do I/O on the parent's - # sockets - if self._shutdown: - break - readable, writable, ignore = results for r in readable: @@ -345,4 +343,3 @@ class Thread(threading.Thread): LOG.info(_LI("eventloop thread exiting, container=%s"), self._container.name) - self._container.destroy() diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index b011a6383..b4f7b0459 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -516,9 +516,9 @@ class TestFailover(test_utils.BaseTestCase): if broker.isAlive(): broker.stop() - def test_broker_failover(self): - """Simulate failover of one broker to another.""" + def _failover(self, fail_brokers): self._brokers[0].start() + # self.config(trace=True, group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="my-topic") @@ -541,11 +541,10 @@ class TestFailover(test_utils.BaseTestCase): self.assertEqual(self._brokers[0].topic_count, 1) self.assertEqual(self._brokers[0].direct_count, 1) - # fail broker 0 and start broker 1: - self._brokers[0].stop() - 
self._brokers[1].start() + # invoke failover method + fail_brokers(self._brokers[0], self._brokers[1]) - # wait for listener links to re-establish + # wait for listener links to re-establish on broker 1 # 4 = 3 links per listener + 1 for the global reply queue predicate = lambda: self._brokers[1].sender_link_count == 4 _wait_until(predicate, 30) @@ -571,10 +570,40 @@ class TestFailover(test_utils.BaseTestCase): self._brokers[1].stop() driver.cleanup() + def test_broker_crash(self): + """Simulate a failure of one broker.""" + def _meth(broker0, broker1): + # fail broker 0 and start broker 1: + broker0.stop() + time.sleep(0.5) + broker1.start() + self._failover(_meth) + + def test_broker_shutdown(self): + """Simulate a normal shutdown of a broker.""" + def _meth(broker0, broker1): + broker0.stop(clean=True) + time.sleep(0.5) + broker1.start() + self._failover(_meth) + + def test_heartbeat_failover(self): + """Simulate broker heartbeat timeout.""" + def _meth(broker0, broker1): + # keep alive heartbeat from broker 0 will stop, which should force + # failover to broker 1 in about two seconds + broker0.pause() + broker1.start() + self.config(idle_timeout=2, group="oslo_messaging_amqp") + self._failover(_meth) + self._brokers[0].stop() + def test_listener_failover(self): - """Verify that Listeners are re-established after failover. + """Verify that Listeners sharing the same topic are re-established + after failover. 
""" self._brokers[0].start() + # self.config(trace=True, group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="my-topic") @@ -596,11 +625,11 @@ class TestFailover(test_utils.BaseTestCase): _wait_until(predicate, 30) self.assertTrue(predicate()) - # fail broker 0 and start broker 1: - self._brokers[0].stop() + # start broker 1 then shutdown broker 0: self._brokers[1].start() + self._brokers[0].stop(clean=True) - # wait again for 7 sending links to re-establish + # wait again for 7 sending links to re-establish on broker 1 predicate = lambda: self._brokers[1].sender_link_count == 7 _wait_until(predicate, 30) self.assertTrue(predicate()) @@ -617,8 +646,8 @@ class TestFailover(test_utils.BaseTestCase): listener2.join(timeout=30) self.assertFalse(listener1.isAlive() or listener2.isAlive()) - self._brokers[1].stop() driver.cleanup() + self._brokers[1].stop() class FakeBroker(threading.Thread): @@ -659,23 +688,19 @@ class FakeBroker(threading.Thread): self.connection.open() self.sender_links = set() self.receiver_links = set() - self.closed = False + self.dead_links = set() def destroy(self): """Destroy the test connection.""" - # destroy modifies the set, so make a copy - tmp = self.sender_links.copy() - while tmp: - link = tmp.pop() - link.destroy() - # destroy modifies the set, so make a copy - tmp = self.receiver_links.copy() - while tmp: - link = tmp.pop() + for link in self.sender_links | self.receiver_links: link.destroy() + self.sender_links.clear() + self.receiver_links.clear() + self.dead_links.clear() self.connection.destroy() self.connection = None self.socket.close() + self.socket = None def fileno(self): """Allows use of this in a select() call.""" @@ -685,18 +710,23 @@ class FakeBroker(threading.Thread): """Called when socket is read-ready.""" try: pyngus.read_socket_input(self.connection, self.socket) + self.connection.process(time.time()) except socket.error: - pass - 
self.connection.process(time.time()) + self._socket_error() def send_output(self): """Called when socket is write-ready.""" try: pyngus.write_socket_output(self.connection, self.socket) + self.connection.process(time.time()) except socket.error: - pass - self.connection.process(time.time()) + self._socket_error() + + def _socket_error(self): + self.connection.close_input() + self.connection.close_output() + # the broker will clean up in its main loop # Pyngus ConnectionEventHandler callbacks: @@ -710,7 +740,6 @@ class FakeBroker(threading.Thread): def connection_closed(self, connection): """Connection close completed.""" self.server.connection_count -= 1 - self.closed = True # main loop will destroy def connection_failed(self, connection, error): """Connection failure detected.""" @@ -757,10 +786,10 @@ class FakeBroker(threading.Thread): def destroy(self): """Destroy the link.""" - self._cleanup() conn = self.conn self.conn = None conn.sender_links.remove(self) + conn.dead_links.discard(self) if self.link: self.link.destroy() self.link = None @@ -774,6 +803,7 @@ class FakeBroker(threading.Thread): self.server.remove_route(self.link.source_address, self) self.routed = False + self.conn.dead_links.add(self) # Pyngus SenderEventHandler callbacks: @@ -783,12 +813,14 @@ class FakeBroker(threading.Thread): self.routed = True def sender_remote_closed(self, sender_link, error): - self._cleanup() self.link.close() def sender_closed(self, sender_link): self.server.sender_link_count -= 1 - self.destroy() + self._cleanup() + + def sender_failed(self, sender_link, error): + self.sender_closed(sender_link) class ReceiverLink(pyngus.ReceiverEventHandler): """An AMQP Receiving link.""" @@ -808,6 +840,7 @@ class FakeBroker(threading.Thread): conn = self.conn self.conn = None conn.receiver_links.remove(self) + conn.dead_links.discard(self) if self.link: self.link.destroy() self.link = None @@ -822,7 +855,10 @@ class FakeBroker(threading.Thread): def receiver_closed(self, 
receiver_link): self.server.receiver_link_count -= 1 - self.destroy() + self.conn.dead_links.add(self) + + def receiver_failed(self, receiver_link, error): + self.receiver_closed(receiver_link) def message_received(self, receiver_link, message, handle): """Forward this message out the proper sending link.""" @@ -863,6 +899,7 @@ class FakeBroker(threading.Thread): % (self.host, self.port)) self._connections = {} self._sources = {} + self._pause = threading.Event() # count of messages forwarded, by messaging pattern self.direct_count = 0 self.topic_count = 0 @@ -878,14 +915,26 @@ class FakeBroker(threading.Thread): """Start the server.""" LOG.debug("Starting Test Broker on %s:%d", self.host, self.port) self._shutdown = False + self._closing = False self.daemon = True + self._pause.set() self._my_socket.listen(10) super(FakeBroker, self).start() - def stop(self): - """Shutdown the server.""" + def pause(self): + self._pause.clear() + os.write(self._wakeup_pipe[1], b'!') + + def stop(self, clean=False): + """Stop the server.""" + # If clean is True, attempt a clean shutdown by closing all open + # links/connections first. 
Otherwise force an immediate disconnect LOG.debug("Stopping test Broker %s:%d", self.host, self.port) - self._shutdown = True + if clean: + self._closing = 1 + else: + self._shutdown = True + self._pause.set() os.write(self._wakeup_pipe[1], b'!') self.join() LOG.debug("Test Broker %s:%d stopped", self.host, self.port) @@ -894,6 +943,7 @@ class FakeBroker(threading.Thread): """Process I/O and timer events until the broker is stopped.""" LOG.debug("Test Broker on %s:%d started", self.host, self.port) while not self._shutdown: + self._pause.wait() readers, writers, timers = self.container.need_processing() # map pyngus Connections back to _TestConnections: @@ -915,16 +965,19 @@ class FakeBroker(threading.Thread): worked = set() for r in readable: if r is self._my_socket: - # new inbound connection request received, - # create a new Connection for it: - client_socket, client_address = self._my_socket.accept() - name = str(client_address) - conn = FakeBroker.Connection(self, client_socket, name, - self._sasl_mechanisms, - self._user_credentials, - self._sasl_config_dir, - self._sasl_config_name) - self._connections[conn.name] = conn + # new inbound connection request received + sock, addr = self._my_socket.accept() + if not self._closing: + # create a new Connection for it: + name = str(addr) + conn = FakeBroker.Connection(self, sock, name, + self._sasl_mechanisms, + self._user_credentials, + self._sasl_config_dir, + self._sasl_config_name) + self._connections[conn.name] = conn + else: + sock.close() # drop it elif r is self._wakeup_pipe[0]: os.read(self._wakeup_pipe[0], 512) else: @@ -943,14 +996,26 @@ class FakeBroker(threading.Thread): w.send_output() worked.add(w) - # clean up any closed connections: + # clean up any closed connections or links: while worked: conn = worked.pop() - if conn.closed: + if conn.connection.closed: del self._connections[conn.name] conn.destroy() + else: + while conn.dead_links: + conn.dead_links.pop().destroy() - # Shutting down + if 
self._closing and not self._connections: + self._shutdown = True + elif self._closing == 1: + # start closing connections + self._closing = 2 + for conn in self._connections.values(): + conn.connection.close() + + # Shutting down. Any open links are just disconnected - the peer will + # see a socket close. self._my_socket.close() for conn in self._connections.values(): conn.destroy()