Get rid of coordination

We deprecated this feature in the previous release of mistral, let's get
rid of this now.

Change-Id: I0164e2a38c174156510d65f1c326f6fb646687bd
Signed-off-by: Arnaud M <arnaud.morin@gmail.com>
This commit is contained in:
Arnaud M 2025-04-23 23:44:02 +02:00
parent 73221b74c4
commit 6a2e451e21
16 changed files with 10 additions and 567 deletions

View File

@ -193,30 +193,6 @@ Note: using CLI, Environment can be created via JSON or YAML file.
:webprefix: /v2/environments
Services
--------
Through the service management API, a system administrator or operator can
retrieve information about the Mistral services in the system, including each
service's group and identifier. Internally, this feature makes use of the tooz
library, which requires a coordination backend to be installed (the most
commonly used at present is ZooKeeper). Please refer to the official tooz
documentation for more detailed instructions.
There are currently three service groups in the Mistral architecture,
namely api_group, engine_group and executor_group. The service identifier
contains the name of the host that the service is running on and the process
identifier of the service on that host.
.. autotype:: mistral.api.controllers.v2.resources.Service
:members:
.. autotype:: mistral.api.controllers.v2.resources.Services
:members:
.. rest-controller:: mistral.api.controllers.v2.service:ServicesController
:webprefix: /v2/services
Validation
----------

View File

@ -26,7 +26,6 @@ from mistral import config as m_config
from mistral import context as ctx
from mistral.db.v2 import api as db_api_v2
from mistral.rpc import base as rpc
from mistral.service import coordination
from mistral.services import periodic
@ -62,8 +61,6 @@ def setup_app(config=None):
if cfg.CONF.cron_trigger.enabled:
periodic.setup()
coordination.Service('api_group').register_membership()
app = pecan.make_app(
app_conf.pop('root'),
hooks=lambda: [ctx.AuthHook(), maintenance.MaintenanceHook(),

View File

@ -26,7 +26,6 @@ from mistral.api.controllers.v2 import dynamic_action
from mistral.api.controllers.v2 import environment
from mistral.api.controllers.v2 import event_trigger
from mistral.api.controllers.v2 import execution
from mistral.api.controllers.v2 import service
from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import workbook
from mistral.api.controllers.v2 import workflow
@ -58,7 +57,6 @@ class Controller(object):
cron_triggers = cron_trigger.CronTriggersController()
environments = environment.EnvironmentController()
action_executions = action_execution.ActionExecutionsController()
services = service.ServicesController()
event_triggers = event_trigger.EventTriggersController()
@wsme_pecan.wsexpose(RootResource)

View File

@ -1,84 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from pecan import rest
import tooz.coordination
import wsmeext.pecan as wsme_pecan
from mistral.api import access_control as acl
from mistral.api.controllers.v2 import resources
from mistral import context
from mistral import exceptions as exc
from mistral.service import coordination
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
class ServicesController(rest.RestController):
    """REST controller exposing the members of the Mistral service groups."""

    @rest_utils.wrap_wsme_controller_exception
    @wsme_pecan.wsexpose(resources.Services)
    def get_all(self):
        """Return all services.

        :return: A ``resources.Services`` object holding one
            ``resources.Service`` per member found in each service group.
        :raises CoordinationNotSupportedException: if no coordination
            backend URL is configured.
        :raises CoordinationException: if the coordinator is not active or
            the backend fails while listing group members.
        """
        acl.enforce('services:list', context.ctx())

        LOG.debug("Fetch services.")

        if not cfg.CONF.coordination.backend_url:
            raise exc.CoordinationNotSupportedException(
                "Service API is not supported."
            )

        service_coordinator = coordination.get_service_coordinator()

        if not service_coordinator.is_active():
            raise exc.CoordinationException(
                "Failed to connect to coordination backend."
            )

        # Should be the same as LAUNCH_OPTIONS in launch.py
        # At the moment there is a duplication, need to solve it.
        # We cannot depend on launch.py since it uses eventlet monkey patch
        # under wsgi it causes problems
        #
        # A tuple (rather than a set) keeps the order of the returned
        # services stable from one request/process to the next.
        # NOTE(review): no 'scheduler' entry here although a scheduler server
        # registers under 'scheduler_group' - confirm this is intentional.
        mistral_services = ('api', 'engine', 'executor',
                            'event-engine', 'notifier')

        services_list = []

        try:
            for service_name in mistral_services:
                group = '%s_group' % service_name

                services_list.extend(
                    resources.Service.from_dict(
                        {
                            'type': group,
                            'name': member
                        }
                    )
                    for member in service_coordinator.get_members(group)
                )
        except tooz.coordination.ToozError as e:
            # ToozError is raised when the network is interrupted or the
            # connection to the backend has been shut down manually.
            # Chain the original error so its traceback is not lost.
            raise exc.CoordinationException(
                "Failed to get service members from coordination backend. %s"
                % str(e)
            ) from e

        return resources.Services(services=services_list)

View File

@ -632,23 +632,6 @@ context_versioning_opts = [
)
]
# Options of the 'coordination' config group. Both entries are flagged
# deprecated_for_removal and carry a deprecated_reason.
coordination_opts = [
# URL of the coordination backend; declared as a secret option.
cfg.StrOpt(
'backend_url',
secret=True,
deprecated_for_removal=True,
deprecated_reason='Coordination will be removed from mistral code',
help=_('The backend URL to be used for coordination')
),
# Heartbeat period in seconds (defaults to 5.0); per its deprecated_reason
# the option is unused and has no effect.
cfg.FloatOpt(
'heartbeat_interval',
default=5.0,
deprecated_for_removal=True,
deprecated_reason='This option has been unused and has had no effect',
help=_('Number of seconds between heartbeats for coordination.')
)
]
profiler_opts = profiler.list_opts()[0][1]
profiler_opts.append(
cfg.StrOpt(
@ -807,7 +790,6 @@ CRON_TRIGGER_GROUP = 'cron_trigger'
EVENT_ENGINE_GROUP = 'event_engine'
NOTIFIER_GROUP = 'notifier'
PECAN_GROUP = 'pecan'
COORDINATION_GROUP = 'coordination'
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
ACTION_LOGGING_GROUP = 'action_logging'
@ -851,7 +833,6 @@ CONF.register_opts(context_versioning_opts, group=CONTEXT_VERSIONING_GROUP)
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
CONF.register_opts(profiler_opts, group=PROFILER_GROUP)
CONF.register_opts(keycloak_oidc_opts, group=KEYCLOAK_OIDC_GROUP)
CONF.register_opts(yaql_opts, group=YAQL_GROUP)
@ -896,7 +877,6 @@ def list_opts():
(CRON_TRIGGER_GROUP, cron_trigger_opts),
(NOTIFIER_GROUP, notifier_opts),
(PECAN_GROUP, pecan_opts),
(COORDINATION_GROUP, coordination_opts),
(EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts),
(PROFILER_GROUP, profiler_opts),
(KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts),

View File

@ -44,12 +44,11 @@ class EngineServer(service_base.MistralService):
"""Engine server.
This class manages engine life-cycle and gets registered as an RPC
endpoint to process engine specific calls. It also registers a
cluster member associated with this instance of engine.
endpoint to process engine specific calls.
"""
def __init__(self, engine, setup_profiler=True):
super(EngineServer, self).__init__('engine_group', setup_profiler)
super(EngineServer, self).__init__(setup_profiler)
self.engine = engine
self._rpc_server = None

View File

@ -29,13 +29,11 @@ class EventEngineServer(service_base.MistralService):
"""RPC EventEngine server.
This class manages event engine life-cycle and gets registered as
an RPC endpoint to process event engine specific calls. It also
registers a cluster member associated with this instance of event
engine.
an RPC endpoint to process event engine specific calls.
"""
def __init__(self, event_engine):
super(EventEngineServer, self).__init__('event-engine_group')
super(EventEngineServer, self).__init__()
self._event_engine = event_engine
self._rpc_server = None

View File

@ -31,12 +31,11 @@ class ExecutorServer(service_base.MistralService):
"""Executor server.
This class manages executor life-cycle and gets registered as an RPC
endpoint to process executor specific calls. It also registers a
cluster member associated with this instance of executor.
endpoint to process executor specific calls.
"""
def __init__(self, executor, setup_profiler=True):
super(ExecutorServer, self).__init__('executor_group', setup_profiler)
super(ExecutorServer, self).__init__(setup_profiler)
self.executor = executor
self._rpc_server = None

View File

@ -27,10 +27,7 @@ LOG = logging.getLogger(__name__)
class NotificationServer(service_base.MistralService):
def __init__(self, notifier, setup_profiler=True):
super(NotificationServer, self).__init__(
'notifier_group',
setup_profiler
)
super(NotificationServer, self).__init__(setup_profiler)
self.notifier = notifier
self._rpc_server = None

View File

@ -33,10 +33,7 @@ class SchedulerServer(service_base.MistralService):
"""
def __init__(self, scheduler, setup_profiler=True):
super(SchedulerServer, self).__init__(
'scheduler_group',
setup_profiler
)
super(SchedulerServer, self).__init__(setup_profiler)
self.scheduler = scheduler
self._rpc_server = None

View File

@ -17,8 +17,6 @@ import threading
from oslo_log import log as logging
from oslo_service import service
from mistral.service import coordination
LOG = logging.getLogger(__name__)
@ -26,11 +24,10 @@ class MistralService(service.Service):
"""Base class for Mistral services.
The term 'service' here means any Mistral component that can run as
an independent process and thus can be registered as a cluster member.
an independent process.
"""
def __init__(self, cluster_group, setup_profiler=True):
def __init__(self, setup_profiler=True):
super(MistralService, self).__init__()
self.cluster_member = coordination.Service(cluster_group)
self._setup_profiler = setup_profiler
self._started = threading.Event()
@ -44,7 +41,6 @@ class MistralService(service.Service):
def start(self):
super(MistralService, self).start()
self.cluster_member.register_membership()
def stop(self, graceful=False):
super(MistralService, self).stop(graceful)
@ -53,7 +49,3 @@ class MistralService(service.Service):
# TODO(rakhmerov): Probably we could also take care of an RPC server
# if it exists for this particular service type. Take a look at
# executor and engine servers.
# TODO(rakhmerov): This method is not implemented correctly now
# (not thread-safe). Uncomment this call once it's fixed.
# self.cluster_member.stop()

View File

@ -1,183 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
import tenacity
import tooz.coordination
from mistral_lib import utils
LOG = log.getLogger(__name__)
_SERVICE_COORDINATOR = None
class ServiceCoordinator(object):
    """Service coordinator.

    This class uses the `tooz` library to manage group membership.

    The underlying tooz coordinator is started with ``start_heart=True``
    (see :meth:`start`), which is how other agents learn that this agent
    is still alive.
    """

    def __init__(self, my_id=None):
        """Initialize an inactive coordinator.

        :param my_id: Member identifier. When falsy, the identifier returned
            by ``utils.get_process_identifier()`` is used instead.
        """
        self._coordinator = None
        self._my_id = my_id or utils.get_process_identifier()
        # The member id is handed to tooz as bytes.
        self._my_id = self._my_id.encode("latin-1")
        self._started = False

    def start(self):
        """Connect to the configured coordination backend, if any.

        Does nothing when no ``backend_url`` is configured. A ``ToozError``
        raised while connecting is logged and leaves the coordinator
        inactive; it is not propagated to the caller.
        """
        backend_url = cfg.CONF.coordination.backend_url

        if backend_url:
            try:
                self._coordinator = tooz.coordination.get_coordinator(
                    backend_url,
                    self._my_id
                )

                self._coordinator.start(start_heart=True)
                self._started = True

                LOG.info('Coordination backend started successfully.')
            except tooz.coordination.ToozError as e:
                self._started = False

                LOG.exception('Error connecting to coordination backend. '
                              '%s', str(e))

    def stop(self):
        """Stop the coordinator if it is active.

        A ``ToozError`` raised while stopping is logged as a warning. The
        coordinator is always reset to the inactive state afterwards.
        """
        if not self.is_active():
            return

        try:
            self._coordinator.stop()
        except tooz.coordination.ToozError:
            LOG.warning('Error connecting to coordination backend.')
        finally:
            self._coordinator = None
            self._started = False

    def is_active(self):
        """Tell whether the coordinator exists and has been started.

        :return: ``True`` or ``False`` (always a real boolean).
        """
        return bool(self._coordinator and self._started)

    @tenacity.retry(stop=tenacity.stop_after_attempt(5))
    def join_group(self, group_id):
        """Join the given group, creating the group first if needed.

        Does nothing when the coordinator is inactive or ``group_id`` is
        falsy. Already being a member of the group is not an error.

        :param group_id: Name of the group to join (text).
        """
        if not self.is_active() or not group_id:
            return

        try:
            join_req = self._coordinator.join_group(group_id.encode("latin-1"))
            join_req.get()

            LOG.info(
                'Joined service group:%s, member:%s',
                group_id,
                self._my_id
            )

            return
        except tooz.coordination.MemberAlreadyExist:
            return
        except tooz.coordination.GroupNotCreated:
            create_grp_req = self._coordinator.create_group(
                group_id.encode("latin-1"))

            try:
                create_grp_req.get()
            except tooz.coordination.GroupAlreadyExist:
                pass

            # Re-raise exception to join group again (the retry decorator
            # calls this method once more).
            raise

    def leave_group(self, group_id):
        """Leave the given group; no-op when the coordinator is inactive.

        :param group_id: Name of the group to leave (text).
        """
        if self.is_active():
            # NOTE(review): the result of the leave request is not waited
            # for here, unlike the join/create requests in join_group().
            self._coordinator.leave_group(group_id.encode("latin-1"))

            LOG.info(
                'Left service group:%s, member:%s',
                group_id,
                self._my_id
            )

    def get_members(self, group_id):
        """Gets members of coordination group.

        ToozError exception must be handled when this function is invoked, we
        leave it to the invoker for the handling decision.

        :param group_id: Name of the group to inspect (text).
        :return: The members reported by the backend, or an empty list when
            the coordinator is inactive or the group does not exist.
        """
        if not self.is_active():
            return []

        get_members_req = self._coordinator.get_members(
            group_id.encode("latin-1"))

        try:
            members = get_members_req.get()

            LOG.debug('Members of group %s: %s', group_id, members)

            return members
        except tooz.coordination.GroupNotCreated:
            LOG.warning('Group %s does not exist.', group_id)

            return []
def cleanup_service_coordinator():
    """Drop the cached coordinator so that the next lookup builds a new one.

    Intends to be used by tests to recreate service coordinator.
    """
    global _SERVICE_COORDINATOR

    _SERVICE_COORDINATOR = None
def get_service_coordinator(my_id=None):
    """Return the module-wide coordinator, creating and starting it once.

    :param my_id: Member identifier passed to ``ServiceCoordinator``; only
        used when a new coordinator has to be created.
    :return: The cached ``ServiceCoordinator`` instance.
    """
    global _SERVICE_COORDINATOR

    # Fast path: a coordinator has already been created.
    if _SERVICE_COORDINATOR:
        return _SERVICE_COORDINATOR

    _SERVICE_COORDINATOR = ServiceCoordinator(my_id=my_id)
    _SERVICE_COORDINATOR.start()

    return _SERVICE_COORDINATOR
class Service(object):
    """A member of one coordination group, identified by its group type."""

    def __init__(self, group_type):
        """Remember the group this service joins on registration.

        :param group_type: Name of the coordination group.
        """
        self.group_type = group_type

    @lockutils.synchronized('service_coordinator')
    def register_membership(self):
        """Registers group membership.

        Because this method will be invoked on each service startup almost at
        the same time, so it must be synchronized, in case all the services
        are started within same process.
        """
        coordinator = get_service_coordinator()

        if not coordinator.is_active():
            return

        coordinator.join_group(self.group_type)

    def stop(self):
        """Stop the shared service coordinator if it is active."""
        coordinator = get_service_coordinator()

        if not coordinator.is_active():
            return

        coordinator.stop()

View File

@ -1,75 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from oslo_config import cfg
import tooz.coordination
from webtest import app as webtest_app
from mistral.service import coordination
from mistral.tests.unit.api import base
class TestServicesController(base.APITest):
    """API tests for the ``/v2/services`` endpoint."""

    @staticmethod
    def _fresh_coordinator(backend_url, **coordinator_kwargs):
        """Set the backend URL default and rebuild the global coordinator."""
        cfg.CONF.set_default('backend_url', backend_url, 'coordination')

        coordination.cleanup_service_coordinator()

        return coordination.get_service_coordinator(**coordinator_kwargs)

    def test_get_all(self):
        coordinator = self._fresh_coordinator('zake://', my_id='service1')
        coordinator.join_group('api_group')

        response = self.app.get('/v2/services')

        self.assertEqual(200, response.status_int)

        services = response.json['services']
        expected = [{"name": "service1", "type": "api_group"}]

        self.assertEqual(1, len(services))
        self.assertCountEqual(expected, response.json['services'])

    def test_get_all_without_backend(self):
        self._fresh_coordinator(None)

        error = self.assertRaises(
            webtest_app.AppError,
            self.app.get,
            '/v2/services',
        )

        self.assertIn('Service API is not supported', error.args[0])

    @mock.patch('mistral.service.coordination.ServiceCoordinator.get_members',
                side_effect=tooz.coordination.ToozError('error message'))
    def test_get_all_with_get_members_error(self, mock_get_members):
        self._fresh_coordinator('zake://')

        error = self.assertRaises(
            webtest_app.AppError,
            self.app.get,
            '/v2/services',
        )

        self.assertIn(
            'Failed to get service members from coordination backend',
            error.args[0]
        )

View File

@ -1,146 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from oslo_config import cfg
from mistral.service import coordination
from mistral.tests.unit import base
class ServiceCoordinatorTest(base.BaseTest):
    """Unit tests for ``coordination.ServiceCoordinator``."""

    @staticmethod
    def _set_backend(url):
        """Set the default of the ``[coordination] backend_url`` option."""
        cfg.CONF.set_default('backend_url', url, 'coordination')

    def test_start(self):
        self._set_backend('zake://')

        coord = coordination.ServiceCoordinator('fake_id')
        coord.start()

        self.assertTrue(coord.is_active())

    def test_start_without_backend(self):
        self._set_backend(None)

        coord = coordination.ServiceCoordinator()
        coord.start()

        self.assertFalse(coord.is_active())

    def test_stop_not_active(self):
        self._set_backend(None)

        coord = coordination.ServiceCoordinator()
        coord.start()
        coord.stop()

        self.assertFalse(coord.is_active())

    def test_stop(self):
        self._set_backend('zake://')

        coord = coordination.ServiceCoordinator()
        coord.start()
        coord.stop()

        self.assertFalse(coord.is_active())

    def test_join_group_not_active(self):
        self._set_backend(None)

        coord = coordination.ServiceCoordinator()
        coord.start()
        coord.join_group('fake_group')

        found = coord.get_members('fake_group')

        self.assertFalse(coord.is_active())
        self.assertEqual(0, len(found))

    def test_join_group_and_get_members(self):
        self._set_backend('zake://')

        coord = coordination.ServiceCoordinator(my_id='fake_id')
        coord.start()
        coord.join_group('fake_group')

        found = coord.get_members('fake_group')

        self.assertEqual(1, len(found))
        self.assertCountEqual(('fake_id'.encode("latin-1"),), found)

    def test_join_group_and_leave_group(self):
        self._set_backend('zake://')

        coord = coordination.ServiceCoordinator(my_id='fake_id')
        coord.start()
        coord.join_group('fake_group')

        # Snapshot the membership on both sides of the leave call.
        joined = coord.get_members('fake_group')
        coord.leave_group('fake_group')
        remaining = coord.get_members('fake_group')

        self.assertEqual(1, len(joined))
        self.assertEqual({'fake_id'.encode("latin-1")}, joined)

        self.assertEqual(0, len(remaining))
        self.assertEqual(set(), remaining)
class ServiceTest(base.BaseTest):
    """Unit tests for ``coordination.Service``."""

    def setUp(self):
        super(ServiceTest, self).setUp()

        # Start every test from a clean slate: discard the global service
        # coordinator so that it picks up the new coordination configuration.
        coordination.cleanup_service_coordinator()

    @mock.patch('mistral_lib.utils.get_process_identifier',
                return_value='fake_id')
    def test_register_membership(self, mock_get_identifier):
        cfg.CONF.set_default('backend_url', 'zake://', 'coordination')

        service = coordination.Service('fake_group')
        service.register_membership()

        self.addCleanup(service.stop)

        coordinator = coordination.get_service_coordinator()

        self.assertIsNotNone(coordinator)
        self.assertTrue(coordinator.is_active())

        group_members = coordinator.get_members('fake_group')

        mock_get_identifier.assert_called_once_with()
        self.assertEqual({'fake_id'.encode("latin-1")}, group_members)

View File

@ -37,5 +37,4 @@ SQLAlchemy>=1.2.5 # MIT
stevedore>=1.20.0 # Apache-2.0
WSME>=0.8.0 # MIT
yaql>=3.0.0 # Apache 2.0 License
tooz>=1.58.0 # Apache-2.0

View File

@ -10,4 +10,3 @@ tempest>=21.0.0 # Apache-2.0
stestr>=2.0.0 # Apache-2.0
testtools>=2.2.0 # MIT
WebTest>= 3.0.0 # Apache-2.0
zake>=0.1.6 # Apache-2.0