Merge "Create a new connection when a process fork has been detected"
This commit is contained in:
commit
6e4ccfa60c
@ -57,8 +57,11 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
self._correlation = {} # map of correlation-id to response queue
|
||||
self._ready = False
|
||||
self._on_ready = on_ready
|
||||
rname = "Consumer-%s:src=[dynamic]:tgt=replies" % uuid.uuid4().hex
|
||||
self._receiver = connection.create_receiver("replies",
|
||||
event_handler=self)
|
||||
event_handler=self,
|
||||
name=rname)
|
||||
|
||||
# capacity determines the maximum number of reply messages this link
|
||||
# can receive. As messages are received and credit is consumed, this
|
||||
# driver will 'top up' the credit back to max capacity. This number
|
||||
@ -253,8 +256,6 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self.group_request_prefix = \
|
||||
config.oslo_messaging_amqp.group_request_prefix
|
||||
self._container_name = config.oslo_messaging_amqp.container_name
|
||||
if not self._container_name:
|
||||
self._container_name = "container-%s" % uuid.uuid4().hex
|
||||
self.idle_timeout = config.oslo_messaging_amqp.idle_timeout
|
||||
self.trace_protocol = config.oslo_messaging_amqp.trace
|
||||
self.ssl_ca_file = config.oslo_messaging_amqp.ssl_ca_file
|
||||
@ -290,14 +291,13 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._tasks.put(task)
|
||||
self._schedule_task_processing()
|
||||
|
||||
def destroy(self):
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
"""Shutdown the messaging service."""
|
||||
if self.processor:
|
||||
self.processor.wakeup(lambda: self._start_shutdown())
|
||||
LOG.info("Waiting for eventloop to exit")
|
||||
self.processor.join()
|
||||
LOG.debug("Waiting for eventloop to exit")
|
||||
self.processor.shutdown(wait, timeout)
|
||||
self.processor = None
|
||||
LOG.info("Eventloop exited, driver shut down")
|
||||
LOG.debug("Eventloop exited, driver shut down")
|
||||
|
||||
# The remaining methods are reserved to run from the eventloop thread only!
|
||||
# They must not be invoked directly!
|
||||
|
@ -21,6 +21,7 @@ messaging protocol. The driver sends messages and creates subscriptions via
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
@ -190,25 +191,36 @@ class ProtonDriver(base.BaseDriver):
|
||||
|
||||
super(ProtonDriver, self).__init__(conf, url, default_exchange,
|
||||
allowed_remote_exmods)
|
||||
# TODO(grs): handle authentication etc
|
||||
self._hosts = url.hosts
|
||||
self._conf = conf
|
||||
self._default_exchange = default_exchange
|
||||
|
||||
# Create a Controller that connects to the messaging service:
|
||||
self._ctrl = controller.Controller(url.hosts, default_exchange, conf)
|
||||
|
||||
# lazy connection setup - don't cause the controller to connect until
|
||||
# lazy connection setup - don't create the controller until
|
||||
# after the first messaging request:
|
||||
self._connect_called = False
|
||||
self._ctrl = None
|
||||
self._pid = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _ensure_connect_called(func):
|
||||
"""Causes the controller to connect to the messaging service when it is
|
||||
first used. It is safe to push tasks to it whether connected or not,
|
||||
but those tasks won't be processed until connection completes.
|
||||
"""Causes a new controller to be created when the messaging service is
|
||||
first used by the current process. It is safe to push tasks to it
|
||||
whether connected or not, but those tasks won't be processed until
|
||||
connection completes.
|
||||
"""
|
||||
def wrap(self, *args, **kws):
|
||||
with self._lock:
|
||||
connect_called = self._connect_called
|
||||
self._connect_called = True
|
||||
if not connect_called:
|
||||
old_pid = self._pid
|
||||
self._pid = os.getpid()
|
||||
|
||||
if old_pid != self._pid:
|
||||
if self._ctrl is not None:
|
||||
LOG.warning("Process forked after connection established!")
|
||||
self._ctrl.shutdown(wait=False)
|
||||
# Create a Controller that connects to the messaging service:
|
||||
self._ctrl = controller.Controller(self._hosts,
|
||||
self._default_exchange,
|
||||
self._conf)
|
||||
self._ctrl.connect()
|
||||
return func(self, *args, **kws)
|
||||
return wrap
|
||||
@ -277,6 +289,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
|
||||
def cleanup(self):
|
||||
"""Release all resources."""
|
||||
LOG.debug("Cleaning up ProtonDriver")
|
||||
self._ctrl.destroy()
|
||||
self._ctrl = None
|
||||
if self._ctrl:
|
||||
self._ctrl.shutdown()
|
||||
self._ctrl = None
|
||||
LOG.info("AMQP 1.0 messaging driver shutdown")
|
||||
|
@ -235,7 +235,7 @@ class Thread(threading.Thread):
|
||||
|
||||
# Configure a container
|
||||
if container_name is None:
|
||||
container_name = uuid.uuid4().hex
|
||||
container_name = "Container-" + uuid.uuid4().hex
|
||||
self._container = pyngus.Container(container_name)
|
||||
|
||||
self.name = "Thread for Proton container: %s" % self._container.name
|
||||
@ -245,25 +245,27 @@ class Thread(threading.Thread):
|
||||
|
||||
def wakeup(self, request=None):
|
||||
"""Wake up the eventloop thread, Optionally providing a callable to run
|
||||
when the eventloop wakes up.
|
||||
when the eventloop wakes up. Thread safe.
|
||||
"""
|
||||
self._requests.wakeup(request)
|
||||
|
||||
def shutdown(self, wait=True, timeout=None):
|
||||
"""Shutdown the eventloop thread. Thread safe.
|
||||
"""
|
||||
LOG.debug("eventloop shutdown requested")
|
||||
self._shutdown = True
|
||||
self.wakeup()
|
||||
if wait:
|
||||
self.join(timeout=timeout)
|
||||
LOG.debug("eventloop shutdown complete")
|
||||
|
||||
# the following methods are not thread safe - they must be run from the
|
||||
# eventloop thread
|
||||
|
||||
def schedule(self, request, delay):
|
||||
"""Invoke request after delay seconds."""
|
||||
self._schedule.schedule(request, delay)
|
||||
|
||||
def destroy(self):
|
||||
"""Stop the processing thread, releasing all resources.
|
||||
"""
|
||||
LOG.debug("Stopping Proton container %s", self._container.name)
|
||||
self.wakeup(lambda: self.shutdown())
|
||||
self.join()
|
||||
|
||||
def shutdown(self):
|
||||
LOG.info("eventloop shutdown requested")
|
||||
self._shutdown = True
|
||||
|
||||
def connect(self, host, handler, properties=None, name=None):
|
||||
"""Get a _SocketConnection to a peer represented by url."""
|
||||
key = name or "%s:%i" % (host.hostname, host.port)
|
||||
@ -311,6 +313,13 @@ class Thread(threading.Thread):
|
||||
str(serror))
|
||||
continue
|
||||
raise # assuming fatal...
|
||||
|
||||
# don't process any I/O or timers if woken up by a shutdown:
|
||||
# if we've been forked we don't want to do I/O on the parent's
|
||||
# sockets
|
||||
if self._shutdown:
|
||||
break
|
||||
|
||||
readable, writable, ignore = results
|
||||
|
||||
for r in readable:
|
||||
|
Loading…
x
Reference in New Issue
Block a user