Make sure the context is correctly passed through the rpc

This adds a rpc context serializer to pass the context to the engine and
executor.

Change-Id: I6e76c7e28f475bb815f9184041bc8b848249d419
This commit is contained in:
Angus Salkeld 2014-06-19 16:22:20 +10:00
parent f814baae8c
commit 60d90e5660
6 changed files with 59 additions and 17 deletions

View File

@ -40,6 +40,7 @@ from wsgiref import simple_server
from mistral.api import app from mistral.api import app
from mistral import config from mistral import config
from mistral import context
from mistral import engine from mistral import engine
from mistral.engine import executor from mistral.engine import executor
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
@ -49,23 +50,27 @@ LOG = logging.getLogger(__name__)
def launch_executor(transport): def launch_executor(transport):
serializer = context.RpcContextSerializer(context.JsonPayloadSerializer())
target = messaging.Target(topic=cfg.CONF.executor.topic, target = messaging.Target(topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host) server=cfg.CONF.executor.host)
# Since engine and executor are tightly coupled, use the engine # Since engine and executor are tightly coupled, use the engine
# configuration to decide which executor to get. # configuration to decide which executor to get.
endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)] endpoints = [executor.get_executor(cfg.CONF.engine.engine, transport)]
server = messaging.get_rpc_server( server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet') transport, target, endpoints, executor='eventlet',
serializer=serializer)
server.start() server.start()
server.wait() server.wait()
def launch_engine(transport): def launch_engine(transport):
serializer = context.RpcContextSerializer(context.JsonPayloadSerializer())
target = messaging.Target(topic=cfg.CONF.engine.topic, target = messaging.Target(topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host) server=cfg.CONF.engine.host)
endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)] endpoints = [engine.get_engine(cfg.CONF.engine.engine, transport)]
server = messaging.get_rpc_server( server = messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet') transport, target, endpoints, executor='eventlet',
serializer=serializer)
server.start() server.start()
server.wait() server.wait()

View File

@ -15,9 +15,11 @@
# limitations under the License. # limitations under the License.
import eventlet import eventlet
from oslo import messaging
from pecan import hooks from pecan import hooks
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral.openstack.common import jsonutils
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
from mistral import utils from mistral import utils
@ -123,6 +125,36 @@ def context_from_headers(headers):
) )
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RpcContextSerializer(messaging.Serializer):
def __init__(self, base=None):
self._base = base or messaging.NoOpSerializer()
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
ctx = MistralContext(**context)
set_ctx(ctx)
return ctx
class ContextHook(hooks.PecanHook): class ContextHook(hooks.PecanHook):
def before(self, state): def before(self, state):
request_ctx = context_from_headers(state.request.headers) request_ctx = context_from_headers(state.request.headers)

View File

@ -26,6 +26,7 @@ from stevedore import driver
# the submodules are referenced. # the submodules are referenced.
cfg.CONF.import_opt('workflow_trace_log_name', 'mistral.config') cfg.CONF.import_opt('workflow_trace_log_name', 'mistral.config')
from mistral import context as auth_context
from mistral.db import api as db_api from mistral.db import api as db_api
from mistral import dsl_parser as parser from mistral import dsl_parser as parser
from mistral.engine import data_flow from mistral.engine import data_flow
@ -71,7 +72,7 @@ class Engine(object):
and target task. and target task.
:param cntx: a request context dict :param cntx: a request context dict
:type cntx: dict :type cntx: MistralContext
:param kwargs: a dict of method arguments :param kwargs: a dict of method arguments
:type kwargs: dict :type kwargs: dict
:return: Workflow execution. :return: Workflow execution.
@ -387,7 +388,8 @@ class Engine(object):
retries, break_on, delay_sec = task_spec.get_retry_parameters() retries, break_on, delay_sec = task_spec.get_retry_parameters()
if delay_sec > 0: if delay_sec > 0:
# Run the task after the specified delay. # Run the task after the specified delay.
eventlet.spawn_after(delay_sec, run_delayed_task) eventlet.spawn_after(delay_sec, run_delayed_task,
context=auth_context.ctx())
else: else:
LOG.warn("No delay specified for task(id=%s) name=%s. Not " LOG.warn("No delay specified for task(id=%s) name=%s. Not "
"scheduling for execution." % (task['id'], task['name'])) "scheduling for execution." % (task['id'], task['name']))
@ -402,8 +404,11 @@ class EngineClient(object):
:param transport: a messaging transport handle :param transport: a messaging transport handle
:type transport: Transport :type transport: Transport
""" """
serializer = auth_context.RpcContextSerializer(
auth_context.JsonPayloadSerializer())
target = messaging.Target(topic=cfg.CONF.engine.topic) target = messaging.Target(topic=cfg.CONF.engine.topic)
self._client = messaging.RPCClient(transport, target) self._client = messaging.RPCClient(transport, target,
serializer=serializer)
def start_workflow_execution(self, workbook_name, task_name, context=None): def start_workflow_execution(self, workbook_name, task_name, context=None):
"""Starts a workflow execution based on the specified workbook name """Starts a workflow execution based on the specified workbook name
@ -414,8 +419,7 @@ class EngineClient(object):
:param context: Execution context which defines a workflow input :param context: Execution context which defines a workflow input
:return: Workflow execution. :return: Workflow execution.
""" """
# TODO(m4dcoder): refactor auth context cntx = auth_context.ctx()
cntx = {}
kwargs = {'workbook_name': workbook_name, kwargs = {'workbook_name': workbook_name,
'task_name': task_name, 'task_name': task_name,
'context': context} 'context': context}
@ -465,8 +469,7 @@ class EngineClient(object):
:param execution_id: Workflow execution id. :param execution_id: Workflow execution id.
:return: Current workflow state. :return: Current workflow state.
""" """
# TODO(m4dcoder): refactor auth context cntx = auth_context.ctx()
cntx = {}
kwargs = {'workbook_name': workbook_name, kwargs = {'workbook_name': workbook_name,
'execution_id': execution_id} 'execution_id': execution_id}
return self._client.call( return self._client.call(
@ -480,8 +483,7 @@ class EngineClient(object):
:param task_id: Task id. :param task_id: Task id.
:return: Current task state. :return: Current task state.
""" """
# TODO(m4dcoder): refactor auth context cntx = auth_context.ctx()
cntx = {}
kwargs = {'workbook_name': workbook_name, kwargs = {'workbook_name': workbook_name,
'executioin_id': execution_id, 'executioin_id': execution_id,
'task_id': task_id} 'task_id': task_id}

View File

@ -15,6 +15,7 @@
from oslo.config import cfg from oslo.config import cfg
from oslo import messaging from oslo import messaging
from mistral import context as auth_context
from mistral import engine from mistral import engine
from mistral.engine import executor from mistral.engine import executor
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
@ -30,10 +31,8 @@ class DefaultEngine(engine.Engine):
self.transport = messaging.get_transport(cfg.CONF) self.transport = messaging.get_transport(cfg.CONF)
exctr = executor.ExecutorClient(self.transport) exctr = executor.ExecutorClient(self.transport)
for task in tasks: for task in tasks:
# TODO(m4dcoder): Fill request context argument with auth info
context = {}
exctr.handle_task(context, task=task)
LOG.info("Submitted task for execution: '%s'" % task) LOG.info("Submitted task for execution: '%s'" % task)
exctr.handle_task(auth_context.ctx(), task=task)
def _run_tasks(self, tasks): def _run_tasks(self, tasks):
# TODO(rakhmerov): # TODO(rakhmerov):

View File

@ -83,7 +83,7 @@ class DefaultExecutor(executor.Executor):
"""Handle the execution of the workbook task. """Handle the execution of the workbook task.
:param cntx: a request context dict :param cntx: a request context dict
:type cntx: dict :type cntx: MistralContext
:param kwargs: a dict of method arguments :param kwargs: a dict of method arguments
:type kwargs: dict :type kwargs: dict
""" """

View File

@ -19,6 +19,7 @@ from oslo import messaging
import six import six
from stevedore import driver from stevedore import driver
from mistral import context as auth_context
from mistral import engine from mistral import engine
from mistral.openstack.common import log as logging from mistral.openstack.common import log as logging
@ -57,14 +58,17 @@ class ExecutorClient(object):
:param transport: a messaging transport handle :param transport: a messaging transport handle
:type transport: Transport :type transport: Transport
""" """
serializer = auth_context.RpcContextSerializer(
auth_context.JsonPayloadSerializer())
target = messaging.Target(topic=cfg.CONF.executor.topic) target = messaging.Target(topic=cfg.CONF.executor.topic)
self._client = messaging.RPCClient(transport, target) self._client = messaging.RPCClient(transport, target,
serializer=serializer)
def handle_task(self, cntx, **kwargs): def handle_task(self, cntx, **kwargs):
"""Send the task request to the Executor for execution. """Send the task request to the Executor for execution.
:param cntx: a request context dict :param cntx: a request context dict
:type cntx: dict :type cntx: MistralContext
:param kwargs: a dict of method arguments :param kwargs: a dict of method arguments
:type kwargs: dict :type kwargs: dict
""" """