From d838607b2f2d112e867ce50dfae397e8eb0af633 Mon Sep 17 00:00:00 2001 From: Eyal Date: Tue, 24 Dec 2019 13:40:37 +0200 Subject: [PATCH] Add coordination support for devstack This way we can test the service api later on gate In order to use etcd in gate few changes were made: * All identifiers must be byte type (group type, member id) * Tooz has a built-in mechanizm for heartbeat no need to implement it * Need to use eventlet monkey patch since etcd client uses blocking methods * Services name must be identical to LAUNCH_OPTIONS used in cli * Gate coordination url should be define with a schema of etcd+http which is the etcd gateway and works better then just etcd Change-Id: I772651e33eada4a5c2149bfa867095c277eddeed --- devstack/plugin.sh | 6 ++++ mistral/api/app.py | 3 ++ mistral/event_engine/event_engine_server.py | 2 +- mistral/service/coordination.py | 40 ++++----------------- 4 files changed, 16 insertions(+), 35 deletions(-) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 287d64946..35406d179 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -105,6 +105,12 @@ function configure_mistral { if [ "$MISTRAL_USE_MOD_WSGI" == "True" ]; then _config_mistral_apache_wsgi fi + + if [[ ! -z "$MISTRAL_COORDINATION_URL" ]]; then + iniset $MISTRAL_CONF_FILE coordination backend_url "$MISTRAL_COORDINATION_URL" + elif is_service_enabled etcd3; then + iniset $MISTRAL_CONF_FILE coordination backend_url "etcd3+http://${SERVICE_HOST}:$ETCD_PORT" + fi } diff --git a/mistral/api/app.py b/mistral/api/app.py index 0e02f3576..3cea22e2f 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet +eventlet.monkey_patch() + from oslo_config import cfg import oslo_middleware.cors as cors_middleware import oslo_middleware.http_proxy_to_wsgi as http_proxy_to_wsgi_middleware diff --git a/mistral/event_engine/event_engine_server.py b/mistral/event_engine/event_engine_server.py index 1d76122be..73ad0d3fc 100644 --- a/mistral/event_engine/event_engine_server.py +++ b/mistral/event_engine/event_engine_server.py @@ -35,7 +35,7 @@ class EventEngineServer(service_base.MistralService): """ def __init__(self, event_engine): - super(EventEngineServer, self).__init__('event_engine_group') + super(EventEngineServer, self).__init__('event-engine_group') self._event_engine = event_engine self._rpc_server = None diff --git a/mistral/service/coordination.py b/mistral/service/coordination.py index 26a4952b7..596ab4402 100644 --- a/mistral/service/coordination.py +++ b/mistral/service/coordination.py @@ -17,7 +17,6 @@ import six from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log -from oslo_service import threadgroup import tenacity import tooz.coordination @@ -40,7 +39,7 @@ class ServiceCoordinator(object): def __init__(self, my_id=None): self._coordinator = None - self._my_id = my_id or utils.get_process_identifier() + self._my_id = six.b(my_id or utils.get_process_identifier()) self._started = False def start(self): @@ -53,7 +52,7 @@ class ServiceCoordinator(object): self._my_id ) - self._coordinator.start() + self._coordinator.start(start_heart=True) self._started = True LOG.info('Coordination backend started successfully.') @@ -78,30 +77,13 @@ class ServiceCoordinator(object): def is_active(self): return self._coordinator and self._started - def heartbeat(self): - if not self.is_active(): - # Re-connect. - self.start() - - if not self.is_active(): - LOG.debug("Coordination backend didn't start.") - return - - try: - self._coordinator.heartbeat() - except tooz.coordination.ToozError as e: - LOG.exception('Error sending a heartbeat to coordination ' - 'backend. %s', six.text_type(e)) - - self._started = False - @tenacity.retry(stop=tenacity.stop_after_attempt(5)) def join_group(self, group_id): if not self.is_active() or not group_id: return try: - join_req = self._coordinator.join_group(group_id) + join_req = self._coordinator.join_group(six.b(group_id)) join_req.get() LOG.info( @@ -114,7 +96,7 @@ class ServiceCoordinator(object): except tooz.coordination.MemberAlreadyExist: return except tooz.coordination.GroupNotCreated as e: - create_grp_req = self._coordinator.create_group(group_id) + create_grp_req = self._coordinator.create_group(six.b(group_id)) try: create_grp_req.get() @@ -126,7 +108,7 @@ class ServiceCoordinator(object): def leave_group(self, group_id): if self.is_active(): - self._coordinator.leave_group(group_id) + self._coordinator.leave_group(six.b(group_id)) LOG.info( 'Left service group:%s, member:%s', @@ -143,7 +125,7 @@ class ServiceCoordinator(object): if not self.is_active(): return [] - get_members_req = self._coordinator.get_members(group_id) + get_members_req = self._coordinator.get_members(six.b(group_id)) try: members = get_members_req.get() @@ -178,7 +160,6 @@ def get_service_coordinator(my_id=None): class Service(object): def __init__(self, group_type): self.group_type = group_type - self._tg = None @lockutils.synchronized('service_coordinator') def register_membership(self): @@ -194,17 +175,8 @@ class Service(object): if service_coordinator.is_active(): service_coordinator.join_group(self.group_type) - self._tg = threadgroup.ThreadGroup() - - self._tg.add_timer( - cfg.CONF.coordination.heartbeat_interval, - service_coordinator.heartbeat - ) - def stop(self): service_coordinator = get_service_coordinator() if service_coordinator.is_active(): - self._tg.stop() - service_coordinator.stop()