From 30589cfa5f3b74bdb663f9c33b24552630e51d9a Mon Sep 17 00:00:00 2001 From: Moshe Elisha Date: Thu, 23 Jul 2015 17:13:37 +0000 Subject: [PATCH] Support large datasets for execution objects Allow the cloud provider to configure the size limit of workflow input, action input, action output, task publish and workflow params This will allow users to execute workflows that handle much bigger datasets from the ones which are supported today. The changes made in order to achieve that goal: * Increase DB columns size so they will no longer be the barrier * Add configuration options to control the limit * Add event listener on the columns to enforce the size limitation Change-Id: If7c29f9325e60ce456e23d5c7b6ceb3477a028d4 Implements: blueprint support-large-datasets --- etc/mistral.conf.sample | 3 + mistral/config.py | 5 +- .../005_increase_execution_columns_size.py | 45 +++++ mistral/db/v2/sqlalchemy/models.py | 58 +++++- mistral/exceptions.py | 9 + .../test_execution_fields_size_limitation.py | 187 ++++++++++++++++++ 6 files changed, 300 insertions(+), 7 deletions(-) create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py create mode 100644 mistral/tests/unit/engine/test_execution_fields_size_limitation.py diff --git a/etc/mistral.conf.sample b/etc/mistral.conf.sample index 66ac6d86..aa985084 100644 --- a/etc/mistral.conf.sample +++ b/etc/mistral.conf.sample @@ -327,6 +327,9 @@ # The version of the executor. (string value) #version = 1.0 +# The default maximum size in KB of large text fields of runtime +# execution objects. Use -1 for no limit. (integer value) +#execution_field_size_limit_kb = 1024 [keystone_authtoken] diff --git a/mistral/config.py b/mistral/config.py index f8103333..b18904fe 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -72,7 +72,10 @@ engine_opts = [ cfg.StrOpt('topic', default='mistral_engine', help='The message topic that the engine listens on.'), cfg.StrOpt('version', default='1.0', - help='The version of the engine.') + help='The version of the engine.'), + cfg.IntOpt('execution_field_size_limit_kb', default=1024, + help='The default maximum size in KB of large text fields ' + 'of runtime execution objects. Use -1 for no limit.'), ] executor_opts = [ diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py new file mode 100644 index 00000000..14b96e65 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py @@ -0,0 +1,45 @@ +# Copyright 2015 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Increase executions_v2 column size from JsonDictType to JsonLongDictType + +Revision ID: 005 +Revises: 004 +Create Date: 2015-07-21 08:48:51.636094 + +""" + +# revision identifiers, used by Alembic. +revision = '005' +down_revision = '004' + +from alembic import op +from mistral.db.sqlalchemy import types as st + + +def upgrade(): + # Changing column types from JsonDictType to JsonLongDictType + op.alter_column('executions_v2', 'runtime_context', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'input', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'params', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'context', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'action_spec', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'published', + type_=st.JsonLongDictType()) diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index adde499f..d0a17add 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -19,14 +19,21 @@ import sqlalchemy as sa from sqlalchemy import event from sqlalchemy.orm import backref from sqlalchemy.orm import relationship +import sys + +from oslo_config import cfg +from oslo_log import log as logging from mistral.db.sqlalchemy import model_base as mb from mistral.db.sqlalchemy import types as st +from mistral import exceptions as exc from mistral import utils # Definition objects. +LOG = logging.getLogger(__name__) + class Definition(mb.MistralSecureModelBase): __abstract__ = True @@ -106,7 +113,7 @@ class Execution(mb.MistralSecureModelBase): # Runtime context like iteration_no of a repeater. # Effectively internal engine properties which will be used to determine # execution of a task. - runtime_context = sa.Column(st.JsonDictType()) + runtime_context = sa.Column(st.JsonLongDictType()) class ActionExecution(Execution): @@ -118,7 +125,7 @@ class ActionExecution(Execution): # Main properties. accepted = sa.Column(sa.Boolean(), default=False) - input = sa.Column(st.JsonDictType(), nullable=True) + input = sa.Column(st.JsonLongDictType(), nullable=True) output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) @@ -131,10 +138,10 @@ class WorkflowExecution(ActionExecution): } # Main properties. - params = sa.Column(st.JsonDictType()) + params = sa.Column(st.JsonLongDictType()) # TODO(rakhmerov): We need to get rid of this field at all. - context = sa.Column(st.JsonDictType()) + context = sa.Column(st.JsonLongDictType()) class TaskExecution(Execution): @@ -145,7 +152,7 @@ class TaskExecution(Execution): } # Main properties. - action_spec = sa.Column(st.JsonDictType()) + action_spec = sa.Column(st.JsonLongDictType()) # Whether the task is fully processed (publishing and calculating commands # after it). It allows to simplify workflow controller implementations @@ -154,7 +161,7 @@ class TaskExecution(Execution): # Data Flow properties. in_context = sa.Column(st.JsonLongDictType()) - published = sa.Column(st.JsonDictType()) + published = sa.Column(st.JsonLongDictType()) for cls in utils.iter_subclasses(Execution): @@ -166,6 +173,40 @@ for cls in utils.iter_subclasses(Execution): retval=True ) + +def validate_long_type_length(cls, field_name, value): + """Makes sure the value does not exceeds the maximum size.""" + if value: + # Get the configured limit. + size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb + + # If the size is unlimited. + if (size_limit_kb < 0): + return + + size_kb = sys.getsizeof(str(value)) / 1024 + if (size_kb > size_limit_kb): + LOG.error( + "Size limit %dKB exceed for class [%s], " + "field %s of size %dKB.", + size_limit_kb, str(cls), field_name, size_kb + ) + raise exc.SizeLimitExceededException(field_name, size_kb, + size_limit_kb) + + +def register_length_validator(attr_name): + """Register an event listener on the attribute that will + validate the size every time a 'set' occurs. + """ + for cls in utils.iter_subclasses(Execution): + if hasattr(cls, attr_name): + event.listen( + getattr(cls, attr_name), + 'set', + lambda t, v, o, i: validate_long_type_length(cls, attr_name, v) + ) + # Many-to-one for 'Execution' and 'TaskExecution'. Execution.task_execution_id = sa.Column( @@ -296,3 +337,8 @@ class CronTrigger(mb.MistralSecureModelBase): # Register all hooks related to secure models. mb.register_secure_model_hooks() + +# Register an event listener to verify that the size of all the long columns +# affected by the user do not exceed the limit configuration. +for attr_name in ['input', 'output', 'params', 'published']: + register_length_validator(attr_name) diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 0dc7ddf1..fdbcab17 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -110,3 +110,12 @@ class InvalidModelException(DSLParsingException): class InvalidResultException(MistralException): http_code = 400 message = "Unable to parse result" + + +class SizeLimitExceededException(MistralException): + http_code = 400 + + def __init__(self, field_name, size_kb, size_limit_kb): + super(SizeLimitExceededException, self).__init__( + "Size of '%s' is %dKB which exceeds the limit of %dKB" + % (field_name, size_kb, size_limit_kb)) diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py new file mode 100644 index 00000000..70f555f1 --- /dev/null +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -0,0 +1,187 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +import testtools + +from mistral.actions import base as actions_base +from mistral.db.v2 import api as db_api +from mistral import exceptions as exc +from mistral.services import workflows as wf_service +from mistral.tests import base as test_base +from mistral.tests.unit.engine import base +from mistral.workflow import utils as wf_utils + +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +WF = """ +--- +version: '2.0' + +wf: + input: + - workflow_input: '__WORKFLOW_INPUT__' + - action_output_length: 0 + tasks: + task1: + action: my_action + input: + action_input: '__ACTION_INPUT__' + action_output_length: <% $.action_output_length %> + publish: + p_var: '__TASK_PUBLISHED__' +""" + + +class MyAction(actions_base.Action): + def __init__(self, action_input, action_output_length): + self.action_input = action_input + self.action_output_length = action_output_length + + def run(self): + return wf_utils.Result( + data=''.join('A' for _ in range(self.action_output_length)) + ) + + def test(self): + raise NotImplementedError + + +def expect_size_limit_exception(field_name): + def logger(test_func): + def wrapped(*args, **kwargs): + with testtools.ExpectedException(exc.SizeLimitExceededException, + value_re="Size of '%s' is 1KB " + "which exceeds the limit" + " of 0KB" % field_name): + return test_func(*args, **kwargs) + + return wrapped + + return logger + + +def generate_workflow(tokens): + new_wf = WF + long_string = ''.join('A' for _ in range(1024)) + for token in tokens: + new_wf = new_wf.replace(token, long_string) + return new_wf + + +class ExecutionFieldsSizeLimitTest(base.EngineTestCase): + def setUp(self): + """Resets the size limit config between tests""" + super(ExecutionFieldsSizeLimitTest, self).setUp() + cfg.CONF.set_default('execution_field_size_limit_kb', 0, + group='engine') + test_base.register_action_class('my_action', MyAction) + + def test_default_limit(self): + cfg.CONF.set_default('execution_field_size_limit_kb', -1, + group='engine') + + new_wf = generate_workflow( + ['__ACTION_INPUT_', '__WORKFLOW_INPUT__', + '__TASK_PUBLISHED__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}) + + self._await(lambda: self.is_execution_success(wf_ex.id)) + + @expect_size_limit_exception('input') + def test_workflow_input_default_value_limit(self): + new_wf = generate_workflow(['__WORKFLOW_INPUT__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + self.engine.start_workflow('wf', {}) + + @expect_size_limit_exception('input') + def test_workflow_input_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + self.engine.start_workflow( + 'wf', + { + 'workflow_input': ''.join('A' for _ in range(1024)) + } + ) + + @expect_size_limit_exception('input') + def test_action_input_limit(self): + new_wf = generate_workflow(['__ACTION_INPUT__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + self.engine.start_workflow('wf', {}) + + def test_action_output_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', { + 'action_output_length': 1024 + }) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual("Size of 'output' is 1KB which exceeds " + "the limit of 0KB", + wf_ex.state_info) + + def test_task_published_limit(self): + new_wf = generate_workflow(['__TASK_PUBLISHED__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual("Size of 'published' is 1KB which exceeds " + "the limit of 0KB", + wf_ex.state_info) + + @expect_size_limit_exception('params') + def test_workflow_params_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + long_string = ''.join('A' for _ in range(1024)) + self.engine.start_workflow('wf', {}, '', env={'param': long_string}) + + def tearDown(self): + """Restores the size limit config to default""" + super(ExecutionFieldsSizeLimitTest, self).tearDown() + cfg.CONF.set_default('execution_field_size_limit_kb', 1024, + group='engine')