From 80e62aed7d63175079422bd6bb4578d45da7f30e Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 14 Nov 2014 17:06:58 -0500 Subject: [PATCH] Create a new connection when a process fork has been detected This patch attempts to deal with applications that have forked the process after connecting to the broker. First, the creation of the connection is delayed until the application attempts to perform a messaging operation. Second, each time the application performs a messaging operation the current process id is checked against the id of the process that created the connection. If the process ids do not match, the application has called os.fork(). The new child process discards the existing connection and creates a new one. Change-Id: I5455cb0f8d380d6b65f1268b34a91355cbb14aa2 Closes-Bug: #1392868 --- .../_drivers/protocols/amqp/controller.py | 16 ++++---- .../_drivers/protocols/amqp/driver.py | 41 ++++++++++++------- .../_drivers/protocols/amqp/eventloop.py | 35 ++++++++++------ 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/oslo/messaging/_drivers/protocols/amqp/controller.py b/oslo/messaging/_drivers/protocols/amqp/controller.py index 7bb5493b0..7f013533e 100644 --- a/oslo/messaging/_drivers/protocols/amqp/controller.py +++ b/oslo/messaging/_drivers/protocols/amqp/controller.py @@ -57,8 +57,11 @@ class Replies(pyngus.ReceiverEventHandler): self._correlation = {} # map of correlation-id to response queue self._ready = False self._on_ready = on_ready + rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex self._receiver = connection.create_receiver("replies", - event_handler=self) + event_handler=self, + name=rname) + # capacity determines the maximum number of reply messages this link # can receive. As messages are received and credit is consumed, this # driver will 'top up' the credit back to max capacity. This number @@ -253,8 +256,6 @@ class Controller(pyngus.ConnectionEventHandler): self.group_request_prefix = \ config.oslo_messaging_amqp.group_request_prefix self._container_name = config.oslo_messaging_amqp.container_name - if not self._container_name: - self._container_name = "container-%s" % uuid.uuid4().hex self.idle_timeout = config.oslo_messaging_amqp.idle_timeout self.trace_protocol = config.oslo_messaging_amqp.trace self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file @@ -290,14 +291,13 @@ class Controller(pyngus.ConnectionEventHandler): self._tasks.put(task) self._schedule_task_processing() - def destroy(self): + def shutdown(self, wait=True, timeout=None): """Shutdown the messaging service.""" if self.processor: - self.processor.wakeup(lambda: self._start_shutdown()) - LOG.info("Waiting for eventloop to exit") - self.processor.join() + LOG.debug("Waiting for eventloop to exit") + self.processor.shutdown(wait, timeout) self.processor = None - LOG.info("Eventloop exited, driver shut down") + LOG.debug("Eventloop exited, driver shut down") # The remaining methods are reserved to run from the eventloop thread only! # They must not be invoked directly! diff --git a/oslo/messaging/_drivers/protocols/amqp/driver.py b/oslo/messaging/_drivers/protocols/amqp/driver.py index c84b90813..f1afb4820 100644 --- a/oslo/messaging/_drivers/protocols/amqp/driver.py +++ b/oslo/messaging/_drivers/protocols/amqp/driver.py @@ -21,6 +21,7 @@ messaging protocol. The driver sends messages and creates subscriptions via """ import logging +import os import threading import time @@ -190,25 +191,36 @@ class ProtonDriver(base.BaseDriver): super(ProtonDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) + # TODO(grs): handle authentication etc + self._hosts = url.hosts + self._conf = conf + self._default_exchange = default_exchange - # Create a Controller that connects to the messaging service: - self._ctrl = controller.Controller(url.hosts, default_exchange, conf) - - # lazy connection setup - don't cause the controller to connect until + # lazy connection setup - don't create the controller until # after the first messaging request: - self._connect_called = False + self._ctrl = None + self._pid = None self._lock = threading.Lock() def _ensure_connect_called(func): - """Causes the controller to connect to the messaging service when it is - first used. It is safe to push tasks to it whether connected or not, - but those tasks won't be processed until connection completes. + """Causes a new controller to be created when the messaging service is + first used by the current process. It is safe to push tasks to it + whether connected or not, but those tasks won't be processed until + connection completes. """ def wrap(self, *args, **kws): with self._lock: - connect_called = self._connect_called - self._connect_called = True - if not connect_called: + old_pid = self._pid + self._pid = os.getpid() + + if old_pid != self._pid: + if self._ctrl is not None: + LOG.warning("Process forked after connection established!") + self._ctrl.shutdown(wait=False) + # Create a Controller that connects to the messaging service: + self._ctrl = controller.Controller(self._hosts, + self._default_exchange, + self._conf) self._ctrl.connect() return func(self, *args, **kws) return wrap @@ -274,6 +286,7 @@ class ProtonDriver(base.BaseDriver): def cleanup(self): """Release all resources.""" - LOG.debug("Cleaning up ProtonDriver") - self._ctrl.destroy() - self._ctrl = None + if self._ctrl: + self._ctrl.shutdown() + self._ctrl = None + LOG.info("AMQP 1.0 messaging driver shutdown") diff --git a/oslo/messaging/_drivers/protocols/amqp/eventloop.py b/oslo/messaging/_drivers/protocols/amqp/eventloop.py index f3d235a57..04ff868f6 100644 --- a/oslo/messaging/_drivers/protocols/amqp/eventloop.py +++ b/oslo/messaging/_drivers/protocols/amqp/eventloop.py @@ -235,7 +235,7 @@ class Thread(threading.Thread): # Configure a container if container_name is None: - container_name = uuid.uuid4().hex + container_name = "Container-" + uuid.uuid4().hex self._container = pyngus.Container(container_name) self.name = "Thread for Proton container: %s" % self._container.name @@ -245,25 +245,27 @@ class Thread(threading.Thread): def wakeup(self, request=None): """Wake up the eventloop thread, Optionally providing a callable to run - when the eventloop wakes up. + when the eventloop wakes up. Thread safe. """ self._requests.wakeup(request) + def shutdown(self, wait=True, timeout=None): + """Shutdown the eventloop thread. Thread safe. + """ + LOG.debug("eventloop shutdown requested") + self._shutdown = True + self.wakeup() + if wait: + self.join(timeout=timeout) + LOG.debug("eventloop shutdown complete") + + # the following methods are not thread safe - they must be run from the + # eventloop thread + def schedule(self, request, delay): """Invoke request after delay seconds.""" self._schedule.schedule(request, delay) - def destroy(self): - """Stop the processing thread, releasing all resources. - """ - LOG.debug("Stopping Proton container %s", self._container.name) - self.wakeup(lambda: self.shutdown()) - self.join() - - def shutdown(self): - LOG.info("eventloop shutdown requested") - self._shutdown = True - def connect(self, host, handler, properties=None, name=None): """Get a _SocketConnection to a peer represented by url.""" key = name or "%s:%i" % (host.hostname, host.port) @@ -311,6 +313,13 @@ class Thread(threading.Thread): str(serror)) continue raise # assuming fatal... + + # don't process any I/O or timers if woken up by a shutdown: + # if we've been forked we don't want to do I/O on the parent's + # sockets + if self._shutdown: + break + readable, writable, ignore = results for r in readable: