diff --git a/oslo/messaging/_drivers/protocols/amqp/controller.py b/oslo/messaging/_drivers/protocols/amqp/controller.py index 7bb5493b0..7f013533e 100644 --- a/oslo/messaging/_drivers/protocols/amqp/controller.py +++ b/oslo/messaging/_drivers/protocols/amqp/controller.py @@ -57,8 +57,11 @@ class Replies(pyngus.ReceiverEventHandler): self._correlation = {} # map of correlation-id to response queue self._ready = False self._on_ready = on_ready + rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex self._receiver = connection.create_receiver("replies", - event_handler=self) + event_handler=self, + name=rname) + # capacity determines the maximum number of reply messages this link # can receive. As messages are received and credit is consumed, this # driver will 'top up' the credit back to max capacity. This number @@ -253,8 +256,6 @@ class Controller(pyngus.ConnectionEventHandler): self.group_request_prefix = \ config.oslo_messaging_amqp.group_request_prefix self._container_name = config.oslo_messaging_amqp.container_name - if not self._container_name: - self._container_name = "container-%s" % uuid.uuid4().hex self.idle_timeout = config.oslo_messaging_amqp.idle_timeout self.trace_protocol = config.oslo_messaging_amqp.trace self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file @@ -290,14 +291,13 @@ class Controller(pyngus.ConnectionEventHandler): self._tasks.put(task) self._schedule_task_processing() - def destroy(self): + def shutdown(self, wait=True, timeout=None): """Shutdown the messaging service.""" if self.processor: - self.processor.wakeup(lambda: self._start_shutdown()) - LOG.info("Waiting for eventloop to exit") - self.processor.join() + LOG.debug("Waiting for eventloop to exit") + self.processor.shutdown(wait, timeout) self.processor = None - LOG.info("Eventloop exited, driver shut down") + LOG.debug("Eventloop exited, driver shut down") # The remaining methods are reserved to run from the eventloop thread only! # They must not be invoked directly! diff --git a/oslo/messaging/_drivers/protocols/amqp/driver.py b/oslo/messaging/_drivers/protocols/amqp/driver.py index c84b90813..f1afb4820 100644 --- a/oslo/messaging/_drivers/protocols/amqp/driver.py +++ b/oslo/messaging/_drivers/protocols/amqp/driver.py @@ -21,6 +21,7 @@ messaging protocol. The driver sends messages and creates subscriptions via """ import logging +import os import threading import time @@ -190,25 +191,36 @@ class ProtonDriver(base.BaseDriver): super(ProtonDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) + # TODO(grs): handle authentication etc + self._hosts = url.hosts + self._conf = conf + self._default_exchange = default_exchange - # Create a Controller that connects to the messaging service: - self._ctrl = controller.Controller(url.hosts, default_exchange, conf) - - # lazy connection setup - don't cause the controller to connect until + # lazy connection setup - don't create the controller until # after the first messaging request: - self._connect_called = False + self._ctrl = None + self._pid = None self._lock = threading.Lock() def _ensure_connect_called(func): - """Causes the controller to connect to the messaging service when it is - first used. It is safe to push tasks to it whether connected or not, - but those tasks won't be processed until connection completes. + """Causes a new controller to be created when the messaging service is + first used by the current process. It is safe to push tasks to it + whether connected or not, but those tasks won't be processed until + connection completes. """ def wrap(self, *args, **kws): with self._lock: - connect_called = self._connect_called - self._connect_called = True - if not connect_called: + old_pid = self._pid + self._pid = os.getpid() + + if old_pid != self._pid: + if self._ctrl is not None: + LOG.warning("Process forked after connection established!") + self._ctrl.shutdown(wait=False) + # Create a Controller that connects to the messaging service: + self._ctrl = controller.Controller(self._hosts, + self._default_exchange, + self._conf) self._ctrl.connect() return func(self, *args, **kws) return wrap @@ -274,6 +286,7 @@ class ProtonDriver(base.BaseDriver): def cleanup(self): """Release all resources.""" - LOG.debug("Cleaning up ProtonDriver") - self._ctrl.destroy() - self._ctrl = None + if self._ctrl: + self._ctrl.shutdown() + self._ctrl = None + LOG.info("AMQP 1.0 messaging driver shutdown") diff --git a/oslo/messaging/_drivers/protocols/amqp/eventloop.py b/oslo/messaging/_drivers/protocols/amqp/eventloop.py index f3d235a57..04ff868f6 100644 --- a/oslo/messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo/messaging/_drivers/protocols/amqp/eventloop.py @@ -235,7 +235,7 @@ class Thread(threading.Thread): # Configure a container if container_name is None: - container_name = uuid.uuid4().hex + container_name = "Container-" + uuid.uuid4().hex self._container = pyngus.Container(container_name) self.name = "Thread for Proton container: %s" % self._container.name @@ -245,25 +245,27 @@ class Thread(threading.Thread): def wakeup(self, request=None): """Wake up the eventloop thread, Optionally providing a callable to run - when the eventloop wakes up. + when the eventloop wakes up. Thread safe. """ self._requests.wakeup(request) + def shutdown(self, wait=True, timeout=None): + """Shutdown the eventloop thread. Thread safe. + """ + LOG.debug("eventloop shutdown requested") + self._shutdown = True + self.wakeup() + if wait: + self.join(timeout=timeout) + LOG.debug("eventloop shutdown complete") + + # the following methods are not thread safe - they must be run from the + # eventloop thread + def schedule(self, request, delay): """Invoke request after delay seconds.""" self._schedule.schedule(request, delay) - def destroy(self): - """Stop the processing thread, releasing all resources. - """ - LOG.debug("Stopping Proton container %s", self._container.name) - self.wakeup(lambda: self.shutdown()) - self.join() - - def shutdown(self): - LOG.info("eventloop shutdown requested") - self._shutdown = True - def connect(self, host, handler, properties=None, name=None): """Get a _SocketConnection to a peer represented by url.""" key = name or "%s:%i" % (host.hostname, host.port) @@ -311,6 +313,13 @@ class Thread(threading.Thread): str(serror)) continue raise # assuming fatal... + + # don't process any I/O or timers if woken up by a shutdown: + # if we've been forked we don't want to do I/O on the parent's + # sockets + if self._shutdown: + break + readable, writable, ignore = results for r in readable: