Merge "Fix controller worker graceful shutdown"
This commit is contained in:
commit
2a60c13863
@ -385,6 +385,11 @@ function octavia_configure {
|
|||||||
iniset $OCTAVIA_CONF api_settings bind_port ${OCTAVIA_HA_PORT}
|
iniset $OCTAVIA_CONF api_settings bind_port ${OCTAVIA_HA_PORT}
|
||||||
iniset $OCTAVIA_CONF api_settings bind_host 0.0.0.0
|
iniset $OCTAVIA_CONF api_settings bind_host 0.0.0.0
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# set default graceful_shutdown_timeout to 300 sec (5 minutes)
|
||||||
|
# TODO(gthiemonge) update this value after persistant taskflow commits are
|
||||||
|
# merged
|
||||||
|
iniset $OCTAVIA_CONF DEFAULT graceful_shutdown_timeout 300
|
||||||
}
|
}
|
||||||
|
|
||||||
function create_mgmt_network_interface {
|
function create_mgmt_network_interface {
|
||||||
|
@ -26,3 +26,4 @@ Octavia Configuration Options
|
|||||||
oslo.db
|
oslo.db
|
||||||
oslo.log
|
oslo.log
|
||||||
oslo.messaging
|
oslo.messaging
|
||||||
|
cotyledon
|
||||||
|
@ -16,6 +16,9 @@
|
|||||||
# transport_url = rabbit://<user>:<pass>@server01,<user>:<pass>@server02/<vhost>
|
# transport_url = rabbit://<user>:<pass>@server01,<user>:<pass>@server02/<vhost>
|
||||||
# transport_url =
|
# transport_url =
|
||||||
|
|
||||||
|
# How long in seconds to wait for octavia worker to exit before killing them.
|
||||||
|
# graceful_shutdown_timeout = 60
|
||||||
|
|
||||||
[api_settings]
|
[api_settings]
|
||||||
# bind_host = 127.0.0.1
|
# bind_host = 127.0.0.1
|
||||||
# bind_port = 9876
|
# bind_port = 9876
|
||||||
|
@ -46,11 +46,11 @@ class ConsumerService(cotyledon.Service):
|
|||||||
)
|
)
|
||||||
self.message_listener.start()
|
self.message_listener.start()
|
||||||
|
|
||||||
def terminate(self, graceful=False):
|
def terminate(self):
|
||||||
if self.message_listener:
|
if self.message_listener:
|
||||||
LOG.info('Stopping consumer...')
|
LOG.info('Stopping consumer...')
|
||||||
self.message_listener.stop()
|
self.message_listener.stop()
|
||||||
if graceful:
|
|
||||||
LOG.info('Consumer successfully stopped. Waiting for final '
|
LOG.info('Consumer successfully stopped. Waiting for final '
|
||||||
'messages to be processed...')
|
'messages to be processed...')
|
||||||
self.message_listener.wait()
|
self.message_listener.wait()
|
||||||
|
@ -47,11 +47,11 @@ class ConsumerService(cotyledon.Service):
|
|||||||
)
|
)
|
||||||
self.message_listener.start()
|
self.message_listener.start()
|
||||||
|
|
||||||
def terminate(self, graceful=False):
|
def terminate(self):
|
||||||
if self.message_listener:
|
if self.message_listener:
|
||||||
LOG.info('Stopping V2 consumer...')
|
LOG.info('Stopping V2 consumer...')
|
||||||
self.message_listener.stop()
|
self.message_listener.stop()
|
||||||
if graceful:
|
|
||||||
LOG.info('V2 Consumer successfully stopped. Waiting for '
|
LOG.info('V2 Consumer successfully stopped. Waiting for '
|
||||||
'final messages to be processed...')
|
'final messages to be processed...')
|
||||||
self.message_listener.wait()
|
self.message_listener.wait()
|
||||||
|
@ -58,15 +58,4 @@ class TestConsumer(base.TestRpc):
|
|||||||
cons.run()
|
cons.run()
|
||||||
cons.terminate()
|
cons.terminate()
|
||||||
mock_rpc_server_rv.stop.assert_called_once_with()
|
mock_rpc_server_rv.stop.assert_called_once_with()
|
||||||
self.assertFalse(mock_rpc_server_rv.wait.called)
|
|
||||||
|
|
||||||
@mock.patch.object(messaging, 'get_rpc_server')
|
|
||||||
def test_consumer_graceful_terminate(self, mock_rpc_server):
|
|
||||||
mock_rpc_server_rv = mock.Mock()
|
|
||||||
mock_rpc_server.return_value = mock_rpc_server_rv
|
|
||||||
|
|
||||||
cons = consumer.ConsumerService(1, self.conf)
|
|
||||||
cons.run()
|
|
||||||
cons.terminate(graceful=True)
|
|
||||||
mock_rpc_server_rv.stop.assert_called_once_with()
|
|
||||||
mock_rpc_server_rv.wait.assert_called_once_with()
|
mock_rpc_server_rv.wait.assert_called_once_with()
|
||||||
|
@ -58,15 +58,4 @@ class TestConsumer(base.TestRpc):
|
|||||||
cons.run()
|
cons.run()
|
||||||
cons.terminate()
|
cons.terminate()
|
||||||
mock_rpc_server_rv.stop.assert_called_once_with()
|
mock_rpc_server_rv.stop.assert_called_once_with()
|
||||||
self.assertFalse(mock_rpc_server_rv.wait.called)
|
|
||||||
|
|
||||||
@mock.patch.object(messaging, 'get_rpc_server')
|
|
||||||
def test_consumer_graceful_terminate(self, mock_rpc_server):
|
|
||||||
mock_rpc_server_rv = mock.Mock()
|
|
||||||
mock_rpc_server.return_value = mock_rpc_server_rv
|
|
||||||
|
|
||||||
cons = consumer.ConsumerService(1, self.conf)
|
|
||||||
cons.run()
|
|
||||||
cons.terminate(graceful=True)
|
|
||||||
mock_rpc_server_rv.stop.assert_called_once_with()
|
|
||||||
mock_rpc_server_rv.wait.assert_called_once_with()
|
mock_rpc_server_rv.wait.assert_called_once_with()
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fix a bug that could interrupt resource creation when performing a graceful
|
||||||
|
shutdown of the controller worker and leave resources in a
|
||||||
|
PENDING_CREATE/PENDING_UPDATE/PENDING_DELETE provisioning status. If the
|
||||||
|
duration of an Octavia flow is greater than the 'graceful_shutdown_timeout'
|
||||||
|
configuration value, stopping the Octavia worker can still interrupt the
|
||||||
|
creation of resources.
|
3
tox.ini
3
tox.ini
@ -129,7 +129,8 @@ commands =
|
|||||||
--namespace oslo.db \
|
--namespace oslo.db \
|
||||||
--namespace oslo.log \
|
--namespace oslo.log \
|
||||||
--namespace oslo.messaging \
|
--namespace oslo.messaging \
|
||||||
--namespace keystonemiddleware.auth_token
|
--namespace keystonemiddleware.auth_token \
|
||||||
|
--namespace cotyledon
|
||||||
|
|
||||||
[testenv:genpolicy]
|
[testenv:genpolicy]
|
||||||
basepython = python3
|
basepython = python3
|
||||||
|
Loading…
Reference in New Issue
Block a user