diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 4c4f79a5f8..cb31ba57ec 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -134,6 +134,7 @@ function octavia_configure { iniuncomment $OCTAVIA_CONF haproxy_amphora rest_request_read_timeout iniuncomment $OCTAVIA_CONF controller_worker amp_active_retries iniuncomment $OCTAVIA_CONF controller_worker amp_active_wait_sec + iniuncomment $OCTAVIA_CONF controller_worker workers # devstack optimizations for tempest runs iniset $OCTAVIA_CONF haproxy_amphora connection_max_retries 1500 @@ -142,6 +143,7 @@ function octavia_configure { iniset $OCTAVIA_CONF haproxy_amphora rest_request_read_timeout ${OCTAVIA_AMP_READ_TIMEOUT} iniset $OCTAVIA_CONF controller_worker amp_active_retries 100 iniset $OCTAVIA_CONF controller_worker amp_active_wait_sec 2 + iniset $OCTAVIA_CONF controller_worker workers 2 if [[ -a $OCTAVIA_SSH_DIR ]] ; then rm -rf $OCTAVIA_SSH_DIR diff --git a/etc/octavia.conf b/etc/octavia.conf index 9715435e51..49866b20fb 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -147,6 +147,7 @@ # rest_request_read_timeout = 60 [controller_worker] +# workers = 1 # amp_active_retries = 10 # amp_active_wait_sec = 10 # Glance parameters to extract image ID to use for amphora. Only one of diff --git a/octavia/cmd/octavia_worker.py b/octavia/cmd/octavia_worker.py index 71c96c22b5..94a976fb2d 100644 --- a/octavia/cmd/octavia_worker.py +++ b/octavia/cmd/octavia_worker.py @@ -14,15 +14,14 @@ import sys -import eventlet -eventlet.monkey_patch() -from oslo_config import cfg # noqa: E402 -from oslo_reports import guru_meditation_report as gmr # noqa: E402 -from oslo_service import service # noqa: E402 +import cotyledon +from cotyledon import oslo_config_glue +from oslo_config import cfg +from oslo_reports import guru_meditation_report as gmr -from octavia.common import service as octavia_service # noqa: E402 -from octavia.controller.queue import consumer # noqa: E402 -from octavia import version # noqa: E402 +from octavia.common import service as octavia_service +from octavia.controller.queue import consumer +from octavia import version CONF = cfg.CONF @@ -32,5 +31,8 @@ def main(): gmr.TextGuruMeditation.setup_autorun(version) - launcher = service.launch(CONF, consumer.Consumer()) - launcher.wait() + sm = cotyledon.ServiceManager() + sm.add(consumer.ConsumerService, workers=CONF.controller_worker.workers, + args=(CONF,)) + oslo_config_glue.setup(sm, CONF) + sm.run() diff --git a/octavia/common/config.py b/octavia/common/config.py index 674bd1da8a..d8fccbe369 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -216,6 +216,9 @@ haproxy_amphora_opts = [ ] controller_worker_opts = [ + cfg.IntOpt('workers', + default=1, min=1, + help='Number of workers for the controller-worker service.'), cfg.IntOpt('amp_active_retries', default=10, help=_('Retry attempts to wait for Amphora to become active')), diff --git a/octavia/controller/queue/consumer.py b/octavia/controller/queue/consumer.py index 96c1ce8c74..bd088e2160 100644 --- a/octavia/controller/queue/consumer.py +++ b/octavia/controller/queue/consumer.py @@ -12,11 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_config import cfg +import cotyledon from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher -from oslo_service import service from octavia.controller.queue import endpoint from octavia.i18n import _LI @@ -24,38 +23,34 @@ from octavia.i18n import _LI LOG = logging.getLogger(__name__) -class Consumer(service.Service): +class ConsumerService(cotyledon.Service): - def __init__(self): - super(Consumer, self).__init__() - self.server = None + def __init__(self, worker_id, conf): + super(ConsumerService, self).__init__(worker_id) + self.conf = conf + self.topic = conf.oslo_messaging.topic + self.server = conf.host + self.endpoints = [endpoint.Endpoint()] + self.access_policy = dispatcher.DefaultRPCAccessPolicy + self.message_listener = None - def start(self): - topic = cfg.CONF.oslo_messaging.topic - server = cfg.CONF.host - transport = messaging.get_transport(cfg.CONF) - target = messaging.Target(topic=topic, server=server, fanout=False) - endpoints = [endpoint.Endpoint()] - access_policy = dispatcher.DefaultRPCAccessPolicy - self.server = messaging.get_rpc_server(transport, target, endpoints, - executor='eventlet', - access_policy=access_policy) + def run(self): LOG.info(_LI('Starting consumer...')) - self.server.start() - super(Consumer, self).start() + transport = messaging.get_transport(self.conf) + target = messaging.Target(topic=self.topic, server=self.server, + fanout=False) + self.message_listener = messaging.get_rpc_server( + transport, target, self.endpoints, + executor='threading', access_policy=self.access_policy) + self.message_listener.start() - def stop(self, graceful=False): - if self.server: + def terminate(self, graceful=False): + if self.message_listener: LOG.info(_LI('Stopping consumer...')) - self.server.stop() + self.message_listener.stop() if graceful: LOG.info( _LI('Consumer successfully stopped. Waiting for final ' 'messages to be processed...')) - self.server.wait() - super(Consumer, self).stop(graceful=graceful) - - def reset(self): - if self.server: - self.server.reset() - super(Consumer, self).reset() + self.message_listener.wait() + super(ConsumerService, self).terminate() diff --git a/octavia/tests/unit/controller/queue/test_consumer.py b/octavia/tests/unit/controller/queue/test_consumer.py index 23f1d96e19..e9cea5c91d 100644 --- a/octavia/tests/unit/controller/queue/test_consumer.py +++ b/octavia/tests/unit/controller/queue/test_consumer.py @@ -34,9 +34,10 @@ class TestConsumer(base.TestCase): conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) conf.config(group="oslo_messaging", topic='foo_topic') conf.config(host='test-hostname') + self.conf = conf.conf - def test_consumer_start(self, mock_rpc_server, mock_endpoint, mock_target, - mock_get_transport): + def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target, + mock_get_transport): mock_get_transport_rv = mock.Mock() mock_get_transport.return_value = mock_get_transport_rv mock_rpc_server_rv = mock.Mock() @@ -46,7 +47,7 @@ class TestConsumer(base.TestCase): mock_target_rv = mock.Mock() mock_target.return_value = mock_target_rv - consumer.Consumer().start() + consumer.ConsumerService(1, self.conf).run() mock_get_transport.assert_called_once_with(cfg.CONF) mock_target.assert_called_once_with(topic='foo_topic', @@ -57,27 +58,27 @@ class TestConsumer(base.TestCase): mock_rpc_server.assert_called_once_with(mock_get_transport_rv, mock_target_rv, [mock_endpoint_rv], - executor='eventlet', + executor='threading', access_policy=access_policy) - def test_consumer_stop(self, mock_rpc_server, mock_endpoint, mock_target, - mock_get_transport): + def test_consumer_terminate(self, mock_rpc_server, mock_endpoint, + mock_target, mock_get_transport): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv - cons = consumer.Consumer() - cons.start() - cons.stop() + cons = consumer.ConsumerService(1, self.conf) + cons.run() + cons.terminate() mock_rpc_server_rv.stop.assert_called_once_with() self.assertFalse(mock_rpc_server_rv.wait.called) - def test_consumer_graceful_stop(self, mock_rpc_server, mock_endpoint, - mock_target, mock_get_transport): + def test_consumer_graceful_terminate(self, mock_rpc_server, mock_endpoint, + mock_target, mock_get_transport): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv - cons = consumer.Consumer() - cons.start() - cons.stop(graceful=True) + cons = consumer.ConsumerService(1, self.conf) + cons.run() + cons.terminate(graceful=True) mock_rpc_server_rv.stop.assert_called_once_with() mock_rpc_server_rv.wait.assert_called_once_with() diff --git a/requirements.txt b/requirements.txt index c765030dfe..1c71e74a85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,11 +2,11 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. alembic>=0.8.10 # MIT +cotyledon>=1.3.0 # Apache-2.0 pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD pbr!=2.1.0,>=2.0.0 # Apache-2.0 SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT Babel!=2.4.0,>=2.3.4 # BSD -eventlet!=0.18.3,>=0.18.2 # MIT requests!=2.12.2,!=2.13.0,>=2.10.0 # Apache-2.0 rfc3986>=0.3.1 # Apache-2.0 keystoneauth1>=2.18.0 # Apache-2.0 @@ -24,7 +24,6 @@ oslo.messaging>=5.19.0 # Apache-2.0 oslo.middleware>=3.10.0 # Apache-2.0 oslo.policy>=1.17.0 # Apache-2.0 oslo.reports>=0.6.0 # Apache-2.0 -oslo.service>=1.10.0 # Apache-2.0 oslo.utils>=3.20.0 # Apache-2.0 pyasn1 # BSD pyasn1-modules # BSD