From 2cdd95aa29b487301c896f6a35144d218f512f5d Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Mon, 22 Jun 2015 16:23:59 +0300 Subject: [PATCH] Adding OsloRPC server and client * Using new RPC interface OsloRPCServer and OsloRPCClient are created. TODO (next commits): - integrate new RPC interface in Mistral - make RPC implementation configurable - unit tests Partially implements blueprint mistral-alternative-rpc Co-Authored-By: Dawid Deja Change-Id: I6c9770a5b84509529abc14dff2b4a9f6e3411951 --- .../api/controllers/v2/action_execution.py | 2 +- mistral/api/controllers/v2/execution.py | 2 +- mistral/api/controllers/v2/task.py | 2 +- mistral/cmd/launch.py | 3 +- mistral/engine/actions.py | 2 +- .../engine/{rpc_direct => rpc}/__init__.py | 0 mistral/engine/{rpc_direct => rpc}/base.py | 0 .../{rpc_direct => rpc}/kombu/__init__.py | 0 .../engine/{rpc_direct => rpc}/kombu/base.py | 0 .../kombu/examples/__init__.py | 0 .../kombu/examples/client.py | 2 +- .../kombu/examples/server.py | 2 +- .../{rpc_direct => rpc}/kombu/kombu_client.py | 4 +- .../{rpc_direct => rpc}/kombu/kombu_server.py | 4 +- mistral/engine/rpc/oslo/__init__.py | 0 mistral/engine/rpc/oslo/oslo_client.py | 48 ++++++++++++++++ mistral/engine/rpc/oslo/oslo_server.py | 56 +++++++++++++++++++ mistral/engine/{ => rpc}/rpc.py | 0 mistral/engine/workflows.py | 2 +- mistral/services/periodic.py | 2 +- .../unit/api/v2/test_action_executions.py | 2 +- mistral/tests/unit/api/v2/test_executions.py | 4 +- mistral/tests/unit/api/v2/test_tasks.py | 2 +- mistral/tests/unit/engine/base.py | 3 +- mistral/tests/unit/engine/test_environment.py | 2 +- .../unit/services/test_trigger_service.py | 2 +- 26 files changed, 127 insertions(+), 19 deletions(-) rename mistral/engine/{rpc_direct => rpc}/__init__.py (100%) rename mistral/engine/{rpc_direct => rpc}/base.py (100%) rename mistral/engine/{rpc_direct => rpc}/kombu/__init__.py (100%) rename mistral/engine/{rpc_direct => rpc}/kombu/base.py (100%) rename mistral/engine/{rpc_direct => rpc}/kombu/examples/__init__.py (100%) rename mistral/engine/{rpc_direct => rpc}/kombu/examples/client.py (95%) rename mistral/engine/{rpc_direct => rpc}/kombu/examples/server.py (96%) rename mistral/engine/{rpc_direct => rpc}/kombu/kombu_client.py (98%) rename mistral/engine/{rpc_direct => rpc}/kombu/kombu_server.py (97%) create mode 100644 mistral/engine/rpc/oslo/__init__.py create mode 100644 mistral/engine/rpc/oslo/oslo_client.py create mode 100644 mistral/engine/rpc/oslo/oslo_server.py rename mistral/engine/{ => rpc}/rpc.py (100%) diff --git a/mistral/api/controllers/v2/action_execution.py b/mistral/api/controllers/v2/action_execution.py index bcf5ace2f..efe3d8830 100644 --- a/mistral/api/controllers/v2/action_execution.py +++ b/mistral/api/controllers/v2/action_execution.py @@ -25,7 +25,7 @@ from mistral.api.controllers import resource from mistral.api.controllers.v2 import types from mistral import context from mistral.db.v2 import api as db_api -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.utils import rest_utils from mistral.workflow import states diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index a7550f9bf..b5b04a47f 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -25,7 +25,7 @@ from mistral.api.controllers.v2 import task from mistral.api.controllers.v2 import types from mistral import context from mistral.db.v2 import api as db_api -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.services import workflows as wf_service from mistral.utils import rest_utils diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 322f310ca..382ebc464 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -27,7 +27,7 @@ from mistral.api.controllers.v2 import action_execution from mistral.api.controllers.v2 import types from mistral import context from mistral.db.v2 import api as db_api -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.utils import rest_utils from mistral.workbook import parser as spec_parser diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index cd1efa5ac..7b5198e80 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -17,6 +17,7 @@ import sys import eventlet + eventlet.monkey_patch( os=True, select=True, @@ -54,7 +55,7 @@ from mistral import context as ctx from mistral.db.v2 import api as db_api from mistral.engine import default_engine as def_eng from mistral.engine import default_executor as def_executor -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral.services import expiration_policy from mistral.services import scheduler from mistral.utils import profiler diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 0cf4deeed..b12dec286 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -20,7 +20,7 @@ from osprofiler import profiler import six from mistral.db.v2 import api as db_api -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral.engine import utils as e_utils from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc diff --git a/mistral/engine/rpc_direct/__init__.py b/mistral/engine/rpc/__init__.py similarity index 100% rename from mistral/engine/rpc_direct/__init__.py rename to mistral/engine/rpc/__init__.py diff --git a/mistral/engine/rpc_direct/base.py b/mistral/engine/rpc/base.py similarity index 100% rename from mistral/engine/rpc_direct/base.py rename to mistral/engine/rpc/base.py diff --git a/mistral/engine/rpc_direct/kombu/__init__.py b/mistral/engine/rpc/kombu/__init__.py similarity index 100% rename from mistral/engine/rpc_direct/kombu/__init__.py rename to mistral/engine/rpc/kombu/__init__.py diff --git a/mistral/engine/rpc_direct/kombu/base.py b/mistral/engine/rpc/kombu/base.py similarity index 100% rename from mistral/engine/rpc_direct/kombu/base.py rename to mistral/engine/rpc/kombu/base.py diff --git a/mistral/engine/rpc_direct/kombu/examples/__init__.py b/mistral/engine/rpc/kombu/examples/__init__.py similarity index 100% rename from mistral/engine/rpc_direct/kombu/examples/__init__.py rename to mistral/engine/rpc/kombu/examples/__init__.py diff --git a/mistral/engine/rpc_direct/kombu/examples/client.py b/mistral/engine/rpc/kombu/examples/client.py similarity index 95% rename from mistral/engine/rpc_direct/kombu/examples/client.py rename to mistral/engine/rpc/kombu/examples/client.py index 2304f2240..bb9d737a2 100644 --- a/mistral/engine/rpc_direct/kombu/examples/client.py +++ b/mistral/engine/rpc/kombu/examples/client.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral.engine.rpc_direct.kombu import kombu_client +from mistral.engine.rpc.kombu import kombu_client # Example of using Kombu based RPC client. diff --git a/mistral/engine/rpc_direct/kombu/examples/server.py b/mistral/engine/rpc/kombu/examples/server.py similarity index 96% rename from mistral/engine/rpc_direct/kombu/examples/server.py rename to mistral/engine/rpc/kombu/examples/server.py index 93f24de87..85b8374a6 100644 --- a/mistral/engine/rpc_direct/kombu/examples/server.py +++ b/mistral/engine/rpc/kombu/examples/server.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral.engine.rpc_direct.kombu import kombu_server +from mistral.engine.rpc.kombu import kombu_server # Simple example of endpoint of RPC server, which just diff --git a/mistral/engine/rpc_direct/kombu/kombu_client.py b/mistral/engine/rpc/kombu/kombu_client.py similarity index 98% rename from mistral/engine/rpc_direct/kombu/kombu_client.py rename to mistral/engine/rpc/kombu/kombu_client.py index 21366ba04..dcaa63a03 100644 --- a/mistral/engine/rpc_direct/kombu/kombu_client.py +++ b/mistral/engine/rpc/kombu/kombu_client.py @@ -18,8 +18,8 @@ import time import kombu from oslo_log import log as logging -from mistral.engine.rpc_direct import base as rpc_base -from mistral.engine.rpc_direct.kombu import base as kombu_base +from mistral.engine.rpc import base as rpc_base +from mistral.engine.rpc.kombu import base as kombu_base from mistral import exceptions as exc from mistral import utils diff --git a/mistral/engine/rpc_direct/kombu/kombu_server.py b/mistral/engine/rpc/kombu/kombu_server.py similarity index 97% rename from mistral/engine/rpc_direct/kombu/kombu_server.py rename to mistral/engine/rpc/kombu/kombu_server.py index 090f5add7..14ff53166 100644 --- a/mistral/engine/rpc_direct/kombu/kombu_server.py +++ b/mistral/engine/rpc/kombu/kombu_server.py @@ -19,8 +19,8 @@ import kombu from oslo_log import log as logging from mistral import context as auth_context -from mistral.engine.rpc_direct import base as rpc_base -from mistral.engine.rpc_direct.kombu import base as kombu_base +from mistral.engine.rpc import base as rpc_base +from mistral.engine.rpc.kombu import base as kombu_base from mistral import exceptions as exc diff --git a/mistral/engine/rpc/oslo/__init__.py b/mistral/engine/rpc/oslo/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/rpc/oslo/oslo_client.py b/mistral/engine/rpc/oslo/oslo_client.py new file mode 100644 index 000000000..4e44c3743 --- /dev/null +++ b/mistral/engine/rpc/oslo/oslo_client.py @@ -0,0 +1,48 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import oslo_messaging as messaging + +from mistral import context as auth_ctx +from mistral.engine.rpc import base as rpc_base +from mistral.engine.rpc import rpc + + +class OsloRPCClient(rpc_base.RPCClient): + def __init__(self, conf): + super(OsloRPCClient, self).__init__(conf) + self.topic = conf.get('topic', '') + + serializer = auth_ctx.RpcContextSerializer( + auth_ctx.JsonPayloadSerializer()) + + self._client = messaging.RPCClient( + rpc.get_transport(), + messaging.Target(topic=self.topic), + serializer=serializer + ) + + def sync_call(self, ctx, method, target=None, **kwargs): + return self._client.prepare(topic=self.topic, server=target).call( + ctx, + method, + **kwargs + ) + + def async_call(self, ctx, method, target=None, **kwargs): + return self._client.prepare(topic=self.topic, server=target).cast( + ctx, + method, + **kwargs + ) diff --git a/mistral/engine/rpc/oslo/oslo_server.py b/mistral/engine/rpc/oslo/oslo_server.py new file mode 100644 index 000000000..6191d7fb0 --- /dev/null +++ b/mistral/engine/rpc/oslo/oslo_server.py @@ -0,0 +1,56 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import oslo_messaging as messaging + +from mistral import context as ctx +from mistral.engine.rpc import base as rpc_base +from mistral.engine.rpc import rpc + + +LOG = logging.getLogger(__name__) + + +class OsloRPCServer(rpc_base.RPCServer): + def __init__(self, conf): + super(OsloRPCServer, self).__init__(conf) + + self.topic = conf.get('topic', '') + self.server_id = conf.get('server_id', '') + self.queue = self.topic + self.routing_key = self.topic + self.channel = None + self.connection = None + self.endpoints = [] + + def register_endpoint(self, endpoint): + self.endpoints.append(endpoint) + + def run(self): + target = messaging.Target( + topic=self.topic, + server=self.server_id + ) + + server = messaging.get_rpc_server( + rpc.get_transport(), + target, + self.endpoints, + executor='eventlet', + serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) + ) + + server.start() + server.wait() diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc/rpc.py similarity index 100% rename from mistral/engine/rpc.py rename to mistral/engine/rpc/rpc.py diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 65f4f2c04..efb339ff5 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -23,7 +23,7 @@ import six from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import dispatcher -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral.engine import utils as eng_utils from mistral import exceptions as exc from mistral.services import scheduler diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 32568cf0f..804eb0d86 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -21,7 +21,7 @@ from oslo_service import threadgroup from mistral import context as auth_ctx from mistral.db.v2 import api as db_api_v2 -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.services import security from mistral.services import triggers diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index 5eae73792..b3a7276e8 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -23,7 +23,7 @@ from oslo_config import cfg from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.tests.unit.api import base from mistral.workflow import states diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 23cc19ccb..3b503264a 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -17,6 +17,7 @@ import copy import datetime import json + import mock import uuid from webtest import app as webtest_app @@ -24,12 +25,13 @@ from webtest import app as webtest_app from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import api as sql_db_api from mistral.db.v2.sqlalchemy import models -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.tests.unit.api import base from mistral import utils from mistral.workflow import states + WF_EX = models.WorkflowExecution( id='123e4567-e89b-12d3-a456-426655440000', workflow_name='some', diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index 521326802..5c3ba2da7 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -20,7 +20,7 @@ import mock from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.tests.unit.api import base from mistral.workflow import data_flow diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 56bfadcdc..568ca3b38 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -22,11 +22,12 @@ from mistral import context as ctx from mistral.db.v2 import api as db_api from mistral.engine import default_engine as def_eng from mistral.engine import default_executor as def_exec -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral.services import scheduler from mistral.tests.unit import base from mistral.workflow import states + LOG = logging.getLogger(__name__) # Default delay and timeout in seconds for await_xxx() functions. diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 29a2bcb66..c8c414435 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -17,7 +17,7 @@ from oslo_config import cfg from mistral.db.v2 import api as db_api from mistral.engine import default_executor -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base diff --git a/mistral/tests/unit/services/test_trigger_service.py b/mistral/tests/unit/services/test_trigger_service.py index bc25aea34..1bd278b5b 100644 --- a/mistral/tests/unit/services/test_trigger_service.py +++ b/mistral/tests/unit/services/test_trigger_service.py @@ -17,7 +17,7 @@ import eventlet import mock from oslo_config import cfg -from mistral.engine import rpc +from mistral.engine.rpc import rpc from mistral import exceptions as exc from mistral.services import periodic from mistral.services import security