Merge "Warn user if needed when the process is forked"
This commit is contained in:
commit
1834167b4f
@ -14,3 +14,17 @@ Transport
|
|||||||
.. autoclass:: TransportHost
|
.. autoclass:: TransportHost
|
||||||
|
|
||||||
.. autofunction:: set_transport_defaults
|
.. autofunction:: set_transport_defaults
|
||||||
|
|
||||||
|
|
||||||
|
About fork oslo.messaging transport object
|
||||||
|
------------------------------------------
|
||||||
|
|
||||||
|
oslo.messaging can't ensure that forking a process that shares the same
|
||||||
|
transport object is safe for the library consumer, because it relies on
|
||||||
|
different 3rd party libraries that don't ensure that too, but in certain
|
||||||
|
case/driver it works:
|
||||||
|
|
||||||
|
* rabbit: works only if no connection have already been established.
|
||||||
|
* qpid: doesn't work (qpid library have a global state that use fd
|
||||||
|
that can't be resetted)
|
||||||
|
* amqp1: works
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
import functools
|
import functools
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@ -490,6 +491,7 @@ class Connection(object):
|
|||||||
random.shuffle(self.brokers_params)
|
random.shuffle(self.brokers_params)
|
||||||
self.brokers = itertools.cycle(self.brokers_params)
|
self.brokers = itertools.cycle(self.brokers_params)
|
||||||
|
|
||||||
|
self._initial_pid = os.getpid()
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
def _connect(self, broker):
|
def _connect(self, broker):
|
||||||
@ -578,6 +580,21 @@ class Connection(object):
|
|||||||
LOG.debug("Re-established AMQP queues")
|
LOG.debug("Re-established AMQP queues")
|
||||||
|
|
||||||
def ensure(self, error_callback, method, retry=None):
|
def ensure(self, error_callback, method, retry=None):
|
||||||
|
|
||||||
|
current_pid = os.getpid()
|
||||||
|
if self._initial_pid != current_pid:
|
||||||
|
# NOTE(sileht):
|
||||||
|
# to get the same level of fork support that rabbit driver have
|
||||||
|
# (ie: allow fork before the first connection established)
|
||||||
|
# we could use the kombu workaround:
|
||||||
|
# https://github.com/celery/kombu/blob/master/kombu/transport/
|
||||||
|
# qpid_patches.py#L67
|
||||||
|
LOG.warn("Process forked! "
|
||||||
|
"This can results to unpredictable behavior. "
|
||||||
|
"See: http://docs.openstack.org/developer/"
|
||||||
|
"oslo.messaging/transport.html")
|
||||||
|
self._initial_pid = current_pid
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
return method()
|
return method()
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
import functools
|
import functools
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import time
|
import time
|
||||||
@ -492,6 +493,8 @@ class Connection(object):
|
|||||||
hostname, port,
|
hostname, port,
|
||||||
virtual_host)
|
virtual_host)
|
||||||
|
|
||||||
|
self._initial_pid = os.getpid()
|
||||||
|
|
||||||
self.do_consume = True
|
self.do_consume = True
|
||||||
|
|
||||||
self.channel = None
|
self.channel = None
|
||||||
@ -569,6 +572,14 @@ class Connection(object):
|
|||||||
retry = N means N retries
|
retry = N means N retries
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
current_pid = os.getpid()
|
||||||
|
if self._initial_pid != current_pid:
|
||||||
|
LOG.warn("Process forked after connection established! "
|
||||||
|
"This can results to unpredictable behavior. "
|
||||||
|
"See: http://docs.openstack.org/developer/"
|
||||||
|
"oslo.messaging/transport.html")
|
||||||
|
self._initial_pid = current_pid
|
||||||
|
|
||||||
if retry is None:
|
if retry is None:
|
||||||
retry = self.max_retries
|
retry = self.max_retries
|
||||||
if retry is None or retry < 0:
|
if retry is None or retry < 0:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user