diff --git a/doc/source/transport.rst b/doc/source/transport.rst index f914269d9..fc6a7cfb1 100644 --- a/doc/source/transport.rst +++ b/doc/source/transport.rst @@ -14,3 +14,17 @@ Transport .. autoclass:: TransportHost .. autofunction:: set_transport_defaults + + +About fork oslo.messaging transport object +------------------------------------------ + +oslo.messaging can't ensure that forking a process that shares the same +transport object is safe for the library consumer, because it relies on +different 3rd party libraries that don't ensure that too, but in certain +case/driver it works: + +* rabbit: works only if no connection have already been established. +* qpid: doesn't work (qpid library have a global state that use fd + that can't be resetted) +* amqp1: works diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 28bde7ca9..25083c35c 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -16,6 +16,7 @@ import functools import itertools import logging +import os import random import time @@ -490,6 +491,7 @@ class Connection(object): random.shuffle(self.brokers_params) self.brokers = itertools.cycle(self.brokers_params) + self._initial_pid = os.getpid() self.reconnect() def _connect(self, broker): @@ -578,6 +580,21 @@ class Connection(object): LOG.debug("Re-established AMQP queues") def ensure(self, error_callback, method, retry=None): + + current_pid = os.getpid() + if self._initial_pid != current_pid: + # NOTE(sileht): + # to get the same level of fork support that rabbit driver have + # (ie: allow fork before the first connection established) + # we could use the kombu workaround: + # https://github.com/celery/kombu/blob/master/kombu/transport/ + # qpid_patches.py#L67 + LOG.warn("Process forked! " + "This can results to unpredictable behavior. " + "See: http://docs.openstack.org/developer/" + "oslo.messaging/transport.html") + self._initial_pid = current_pid + while True: try: return method() diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 939a3cec2..2fda9b40c 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -15,6 +15,7 @@ import functools import itertools import logging +import os import socket import ssl import time @@ -472,6 +473,8 @@ class Connection(object): hostname, port, virtual_host) + self._initial_pid = os.getpid() + self.do_consume = True self.channel = None @@ -553,6 +556,14 @@ class Connection(object): retry = N means N retries """ + current_pid = os.getpid() + if self._initial_pid != current_pid: + LOG.warn("Process forked after connection established! " + "This can results to unpredictable behavior. " + "See: http://docs.openstack.org/developer/" + "oslo.messaging/transport.html") + self._initial_pid = current_pid + if retry is None: retry = self.max_retries if retry is None or retry < 0: