Merge "Add coordination feature to mistral service"
This commit is contained in:
commit
29ff8478d2
@ -19,6 +19,7 @@ import pecan
|
||||
|
||||
from mistral.api import access_control
|
||||
from mistral import context as ctx
|
||||
from mistral import coordination
|
||||
from mistral.db.v2 import api as db_api_v2
|
||||
from mistral.services import periodic
|
||||
|
||||
@ -49,6 +50,8 @@ def setup_app(config=None):
|
||||
|
||||
periodic.setup()
|
||||
|
||||
coordination.Service('api_group').register_membership()
|
||||
|
||||
app = pecan.make_app(
|
||||
app_conf.pop('root'),
|
||||
hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()],
|
||||
|
@ -52,6 +52,7 @@ from mistral import version
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -73,6 +74,8 @@ def launch_executor(transport):
|
||||
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
|
||||
)
|
||||
|
||||
executor_v2.register_membership()
|
||||
|
||||
server.start()
|
||||
server.wait()
|
||||
|
||||
@ -102,6 +105,8 @@ def launch_engine(transport):
|
||||
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
|
||||
)
|
||||
|
||||
engine_v2.register_membership()
|
||||
|
||||
server.start()
|
||||
server.wait()
|
||||
|
||||
|
@ -14,8 +14,10 @@
|
||||
|
||||
import six
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import threadgroup
|
||||
from retrying import retry
|
||||
import tooz.coordination
|
||||
|
||||
@ -24,6 +26,8 @@ from mistral import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
_SERVICE_COORDINATOR = None
|
||||
|
||||
|
||||
class ServiceCoordinator(object):
|
||||
"""Service coordinator.
|
||||
@ -143,3 +147,56 @@ class ServiceCoordinator(object):
|
||||
LOG.warning('Group %s does not exist.', group_id)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def cleanup_service_coordinator():
|
||||
"""Intends to be used by tests to recreate service coordinator."""
|
||||
|
||||
global _SERVICE_COORDINATOR
|
||||
|
||||
_SERVICE_COORDINATOR = None
|
||||
|
||||
|
||||
def get_service_coordinator(my_id=None):
|
||||
global _SERVICE_COORDINATOR
|
||||
|
||||
if not _SERVICE_COORDINATOR:
|
||||
_SERVICE_COORDINATOR = ServiceCoordinator(my_id=my_id)
|
||||
_SERVICE_COORDINATOR.start()
|
||||
|
||||
return _SERVICE_COORDINATOR
|
||||
|
||||
|
||||
class Service(object):
|
||||
def __init__(self, group_type):
|
||||
self.group_type = group_type
|
||||
self._tg = None
|
||||
|
||||
@lockutils.synchronized('service_coordinator')
|
||||
def register_membership(self):
|
||||
"""Registers group membership.
|
||||
|
||||
Because this method will be invoked on each service startup almost at
|
||||
the same time, so it must be synchronized, in case all the services
|
||||
are started within same process.
|
||||
"""
|
||||
|
||||
service_coordinator = get_service_coordinator()
|
||||
|
||||
if service_coordinator.is_active():
|
||||
service_coordinator.join_group(self.group_type)
|
||||
|
||||
self._tg = threadgroup.ThreadGroup()
|
||||
|
||||
self._tg.add_timer(
|
||||
cfg.CONF.coordination.heartbeat_interval,
|
||||
service_coordinator.heartbeat
|
||||
)
|
||||
|
||||
def stop(self):
|
||||
service_coordinator = get_service_coordinator()
|
||||
|
||||
if service_coordinator.is_active():
|
||||
self._tg.stop()
|
||||
|
||||
service_coordinator.stop()
|
@ -18,6 +18,7 @@ import traceback
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral import coordination
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import action_handler
|
||||
@ -43,10 +44,12 @@ LOG = logging.getLogger(__name__)
|
||||
# the submodules are referenced.
|
||||
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
class DefaultEngine(base.Engine, coordination.Service):
|
||||
def __init__(self, engine_client):
|
||||
self._engine_client = engine_client
|
||||
|
||||
coordination.Service.__init__(self, 'engine_group')
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def start_workflow(self, wf_name, wf_input, description='', **params):
|
||||
wf_exec_id = None
|
||||
|
@ -13,9 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral.actions import action_factory as a_f
|
||||
from mistral import coordination
|
||||
from mistral.engine import base
|
||||
from mistral import exceptions as exc
|
||||
from mistral.utils import inspect_utils as i_u
|
||||
@ -25,10 +27,12 @@ from mistral.workflow import utils as wf_utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultExecutor(base.Executor):
|
||||
class DefaultExecutor(base.Executor, coordination.Service):
|
||||
def __init__(self, engine_client):
|
||||
self._engine_client = engine_client
|
||||
|
||||
coordination.Service.__init__(self, 'executor_group')
|
||||
|
||||
def run_action(self, action_ex_id, action_class_str, attributes,
|
||||
action_params):
|
||||
"""Runs action.
|
||||
|
@ -12,15 +12,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral import coordination
|
||||
from mistral.tests import base
|
||||
from mistral.utils import coordination
|
||||
|
||||
|
||||
class CoordinationTest(base.BaseTest):
|
||||
class ServiceCoordinatorTest(base.BaseTest):
|
||||
def setUp(self):
|
||||
super(CoordinationTest, self).setUp()
|
||||
super(ServiceCoordinatorTest, self).setUp()
|
||||
|
||||
def test_start(self):
|
||||
cfg.CONF.set_default(
|
||||
@ -116,3 +117,31 @@ class CoordinationTest(base.BaseTest):
|
||||
|
||||
self.assertEqual(0, len(members_after))
|
||||
self.assertEqual(set([]), members_after)
|
||||
|
||||
|
||||
class ServiceTest(base.BaseTest):
|
||||
def setUp(self):
|
||||
super(ServiceTest, self).setUp()
|
||||
|
||||
# Re-intialize the global service coordinator object, in order to use
|
||||
# new coordination configuration.
|
||||
coordination.cleanup_service_coordinator()
|
||||
|
||||
@mock.patch('mistral.utils.get_process_identifier', return_value='fake_id')
|
||||
def test_register_membership(self, mock_get_identifier):
|
||||
cfg.CONF.set_default('backend_url', 'zake://', 'coordination')
|
||||
|
||||
srv = coordination.Service('fake_group')
|
||||
srv.register_membership()
|
||||
|
||||
self.addCleanup(srv.stop)
|
||||
|
||||
srv_coordinator = coordination.get_service_coordinator()
|
||||
|
||||
self.assertIsNotNone(srv_coordinator)
|
||||
self.assertTrue(srv_coordinator.is_active())
|
||||
|
||||
members = srv_coordinator.get_members('fake_group')
|
||||
|
||||
mock_get_identifier.assert_called_once_with()
|
||||
self.assertEqual(set(['fake_id']), members)
|
@ -12,6 +12,7 @@ keystonemiddleware>=1.5.0
|
||||
kombu>=3.0.7
|
||||
mock>=1.0
|
||||
networkx>=1.8
|
||||
oslo.concurrency>=2.3.0 # Apache-2.0
|
||||
oslo.config>=1.11.0 # Apache-2.0
|
||||
oslo.db>=1.10.0 # Apache-2.0
|
||||
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user