Removing dependency on eventlet and oslo.service

Change-Id: I453e9b86d4edfedd63cc59e47bf745e166ff836f
This commit is contained in:
Adam Harwell 2017-04-08 05:13:21 +09:00
parent 84b5078798
commit 9027154a5a
7 changed files with 57 additions and 54 deletions

View File

@ -134,6 +134,7 @@ function octavia_configure {
iniuncomment $OCTAVIA_CONF haproxy_amphora rest_request_read_timeout
iniuncomment $OCTAVIA_CONF controller_worker amp_active_retries
iniuncomment $OCTAVIA_CONF controller_worker amp_active_wait_sec
iniuncomment $OCTAVIA_CONF controller_worker workers
# devstack optimizations for tempest runs
iniset $OCTAVIA_CONF haproxy_amphora connection_max_retries 1500
@ -142,6 +143,7 @@ function octavia_configure {
iniset $OCTAVIA_CONF haproxy_amphora rest_request_read_timeout ${OCTAVIA_AMP_READ_TIMEOUT}
iniset $OCTAVIA_CONF controller_worker amp_active_retries 100
iniset $OCTAVIA_CONF controller_worker amp_active_wait_sec 2
iniset $OCTAVIA_CONF controller_worker workers 2
if [[ -a $OCTAVIA_SSH_DIR ]] ; then
rm -rf $OCTAVIA_SSH_DIR

View File

@ -147,6 +147,7 @@
# rest_request_read_timeout = 60
[controller_worker]
# workers = 1
# amp_active_retries = 10
# amp_active_wait_sec = 10
# Glance parameters to extract image ID to use for amphora. Only one of

View File

@ -14,15 +14,14 @@
import sys
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg # noqa: E402
from oslo_reports import guru_meditation_report as gmr # noqa: E402
from oslo_service import service # noqa: E402
import cotyledon
from cotyledon import oslo_config_glue
from oslo_config import cfg
from oslo_reports import guru_meditation_report as gmr
from octavia.common import service as octavia_service # noqa: E402
from octavia.controller.queue import consumer # noqa: E402
from octavia import version # noqa: E402
from octavia.common import service as octavia_service
from octavia.controller.queue import consumer
from octavia import version
CONF = cfg.CONF
@ -32,5 +31,8 @@ def main():
gmr.TextGuruMeditation.setup_autorun(version)
launcher = service.launch(CONF, consumer.Consumer())
launcher.wait()
sm = cotyledon.ServiceManager()
sm.add(consumer.ConsumerService, workers=CONF.controller_worker.workers,
args=(CONF,))
oslo_config_glue.setup(sm, CONF)
sm.run()

View File

@ -216,6 +216,9 @@ haproxy_amphora_opts = [
]
controller_worker_opts = [
cfg.IntOpt('workers',
default=1, min=1,
help='Number of workers for the controller-worker service.'),
cfg.IntOpt('amp_active_retries',
default=10,
help=_('Retry attempts to wait for Amphora to become active')),

View File

@ -12,11 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
import cotyledon
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_service import service
from octavia.controller.queue import endpoint
from octavia.i18n import _LI
@ -24,38 +23,34 @@ from octavia.i18n import _LI
LOG = logging.getLogger(__name__)
class Consumer(service.Service):
class ConsumerService(cotyledon.Service):
def __init__(self):
super(Consumer, self).__init__()
self.server = None
def __init__(self, worker_id, conf):
super(ConsumerService, self).__init__(worker_id)
self.conf = conf
self.topic = conf.oslo_messaging.topic
self.server = conf.host
self.endpoints = [endpoint.Endpoint()]
self.access_policy = dispatcher.DefaultRPCAccessPolicy
self.message_listener = None
def start(self):
topic = cfg.CONF.oslo_messaging.topic
server = cfg.CONF.host
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic=topic, server=server, fanout=False)
endpoints = [endpoint.Endpoint()]
access_policy = dispatcher.DefaultRPCAccessPolicy
self.server = messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet',
access_policy=access_policy)
def run(self):
LOG.info(_LI('Starting consumer...'))
self.server.start()
super(Consumer, self).start()
transport = messaging.get_transport(self.conf)
target = messaging.Target(topic=self.topic, server=self.server,
fanout=False)
self.message_listener = messaging.get_rpc_server(
transport, target, self.endpoints,
executor='threading', access_policy=self.access_policy)
self.message_listener.start()
def stop(self, graceful=False):
if self.server:
def terminate(self, graceful=False):
if self.message_listener:
LOG.info(_LI('Stopping consumer...'))
self.server.stop()
self.message_listener.stop()
if graceful:
LOG.info(
_LI('Consumer successfully stopped. Waiting for final '
'messages to be processed...'))
self.server.wait()
super(Consumer, self).stop(graceful=graceful)
def reset(self):
if self.server:
self.server.reset()
super(Consumer, self).reset()
self.message_listener.wait()
super(ConsumerService, self).terminate()

View File

@ -34,9 +34,10 @@ class TestConsumer(base.TestCase):
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(group="oslo_messaging", topic='foo_topic')
conf.config(host='test-hostname')
self.conf = conf.conf
def test_consumer_start(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
def test_consumer_run(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
mock_get_transport_rv = mock.Mock()
mock_get_transport.return_value = mock_get_transport_rv
mock_rpc_server_rv = mock.Mock()
@ -46,7 +47,7 @@ class TestConsumer(base.TestCase):
mock_target_rv = mock.Mock()
mock_target.return_value = mock_target_rv
consumer.Consumer().start()
consumer.ConsumerService(1, self.conf).run()
mock_get_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic='foo_topic',
@ -57,27 +58,27 @@ class TestConsumer(base.TestCase):
mock_rpc_server.assert_called_once_with(mock_get_transport_rv,
mock_target_rv,
[mock_endpoint_rv],
executor='eventlet',
executor='threading',
access_policy=access_policy)
def test_consumer_stop(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
def test_consumer_terminate(self, mock_rpc_server, mock_endpoint,
mock_target, mock_get_transport):
mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv
cons = consumer.Consumer()
cons.start()
cons.stop()
cons = consumer.ConsumerService(1, self.conf)
cons.run()
cons.terminate()
mock_rpc_server_rv.stop.assert_called_once_with()
self.assertFalse(mock_rpc_server_rv.wait.called)
def test_consumer_graceful_stop(self, mock_rpc_server, mock_endpoint,
mock_target, mock_get_transport):
def test_consumer_graceful_terminate(self, mock_rpc_server, mock_endpoint,
mock_target, mock_get_transport):
mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv
cons = consumer.Consumer()
cons.start()
cons.stop(graceful=True)
cons = consumer.ConsumerService(1, self.conf)
cons.run()
cons.terminate(graceful=True)
mock_rpc_server_rv.stop.assert_called_once_with()
mock_rpc_server_rv.wait.assert_called_once_with()

View File

@ -2,11 +2,11 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
alembic>=0.8.10 # MIT
cotyledon>=1.3.0 # Apache-2.0
pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
pbr!=2.1.0,>=2.0.0 # Apache-2.0
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
Babel!=2.4.0,>=2.3.4 # BSD
eventlet!=0.18.3,>=0.18.2 # MIT
requests!=2.12.2,!=2.13.0,>=2.10.0 # Apache-2.0
rfc3986>=0.3.1 # Apache-2.0
keystoneauth1>=2.18.0 # Apache-2.0
@ -24,7 +24,6 @@ oslo.messaging>=5.19.0 # Apache-2.0
oslo.middleware>=3.10.0 # Apache-2.0
oslo.policy>=1.17.0 # Apache-2.0
oslo.reports>=0.6.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0
oslo.utils>=3.20.0 # Apache-2.0
pyasn1 # BSD
pyasn1-modules # BSD