diff --git a/.gitignore b/.gitignore index 0b0906e0f..736a62d36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.py[cod] +*.sqlite # C extensions *.so diff --git a/etc/logging.conf.example b/etc/logging.conf.example new file mode 100644 index 000000000..dcedf9f75 --- /dev/null +++ b/etc/logging.conf.example @@ -0,0 +1,32 @@ +[loggers] +keys=root + +[handlers] +keys=consoleHandler, fileHandler + +[formatters] +keys=verboseFormatter, simpleFormatter + +[logger_root] +level=DEBUG +handlers=consoleHandler, fileHandler + +[handler_consoleHandler] +class=StreamHandler +level=INFO +formatter=simpleFormatter +args=(sys.stdout,) + +[handler_fileHandler] +class=FileHandler +level=INFO +formatter=verboseFormatter +args=("/tmp/mistral.log",) + +[formatter_verboseFormatter] +format=%(asctime)s - %(name)s - %(levelname)s - %(message)s +datefmt= + +[formatter_simpleFormatter] +format=%(asctime)s - %(levelname)s - %(message)s +datefmt= diff --git a/etc/mistral.conf.example b/etc/mistral.conf.example new file mode 100644 index 000000000..b43ac17ab --- /dev/null +++ b/etc/mistral.conf.example @@ -0,0 +1,35 @@ +[DEFAULT] +# Show more verbose log output (sets INFO log level output) +verbose = True + +# Show debugging output in logs (sets DEBUG log level output) +debug = False + +# Log to this file +log_file = /tmp/mistral.log + +# Log levels for specific modules +default_log_levels = mistral=INFO,mistral.cmd.api=INFO,mistral.api=DEBUG,wsme=DEBUG + +# Uncomment this option to get more fine-grained control over logging configuration +#log_config_append = etc/logging.conf + +[api] +# Address to bind the API server to +host = 0.0.0.0 + +# Port the bind the API server to +port = 8989 + +[database] +#A valid SQLAlchemy connection string +#connection = mysql://root:password@localhost:3306/mistral +connection = sqlite:///mistral.sqlite + +[rabbit] +rabbit_host = localhost +rabbit_port = 5672 +rabbit_virtual_host = / +rabbit_task_queue = tasks +rabbit_user = guest +rabbit_password = guest diff --git a/mistral/cmd/api.py b/mistral/cmd/api.py index ef375d74d..bfb707bef 100644 --- a/mistral/cmd/api.py +++ b/mistral/cmd/api.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Copyright 2013 - Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/mistral/cmd/task_executor.py b/mistral/cmd/task_executor.py new file mode 100644 index 000000000..fb43fd1a5 --- /dev/null +++ b/mistral/cmd/task_executor.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Script to start instance of Task Executor.""" + +import sys +from oslo.config import cfg +from mistral import config +from mistral.openstack.common import log as logging +from mistral.engine.scalable.executor import executor + +LOG = logging.getLogger('mistral.cmd.task_executor') + + +def main(): + try: + config.parse_args() + logging.setup('Mistral') + + rabbit_opts = cfg.CONF.rabbit + + executor.start(rabbit_opts) + + LOG.info("Mistral Task Executor is listening RabbitMQ" + " [host=%s, port=%s, task_queue=%s]" % + (rabbit_opts.rabbit_host, + rabbit_opts.rabbit_port, + rabbit_opts.rabbit_task_queue)) + except RuntimeError, e: + sys.stderr.write("ERROR: %s\n" % e) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/mistral/common/config.py b/mistral/common/config.py deleted file mode 100644 index 375124f85..000000000 --- a/mistral/common/config.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/env python -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Routines for configuring Glance -""" - -import logging -import logging.config -import logging.handlers -import os -import sys - -from oslo.config import cfg -from paste import deploy - -from muranoapi.openstack.common import log -from muranoapi import __version__ as version - -paste_deploy_opts = [ - cfg.StrOpt('flavor'), - cfg.StrOpt('config_file'), -] - -bind_opts = [ - cfg.StrOpt('bind-host', default='0.0.0.0'), - cfg.IntOpt('bind-port', default='8082'), -] - -reports_opts = [ - cfg.StrOpt('results_exchange', default='task-results'), - cfg.StrOpt('results_queue', default='task-results'), - cfg.StrOpt('reports_exchange', default='task-reports'), - cfg.StrOpt('reports_queue', default='task-reports') -] - -rabbit_opts = [ - cfg.StrOpt('host', default='localhost'), - cfg.IntOpt('port', default=5672), - cfg.StrOpt('login', default='guest'), - cfg.StrOpt('password', default='guest'), - cfg.StrOpt('virtual_host', default='/'), - cfg.BoolOpt('ssl', default=False), - cfg.StrOpt('ca_certs', default='') -] - -db_opts = [ - cfg.BoolOpt('auto_create', default=False, - help=_('A boolean that determines if the database will be ' - 'automatically created.')), -] - -CONF = cfg.CONF -CONF.register_opts(paste_deploy_opts, group='paste_deploy') -CONF.register_cli_opts(bind_opts) -CONF.register_opts(reports_opts, group='reports') -CONF.register_opts(rabbit_opts, group='rabbitmq') -CONF.register_opts(db_opts, group='database') - - -CONF.import_opt('verbose', 'muranoapi.openstack.common.log') -CONF.import_opt('debug', 'muranoapi.openstack.common.log') -CONF.import_opt('log_dir', 'muranoapi.openstack.common.log') -CONF.import_opt('log_file', 'muranoapi.openstack.common.log') -CONF.import_opt('log_config', 'muranoapi.openstack.common.log') -CONF.import_opt('log_format', 'muranoapi.openstack.common.log') -CONF.import_opt('log_date_format', 'muranoapi.openstack.common.log') -CONF.import_opt('use_syslog', 'muranoapi.openstack.common.log') -CONF.import_opt('syslog_log_facility', 'muranoapi.openstack.common.log') - - -cfg.set_defaults(log.log_opts, - default_log_levels=['qpid.messaging=INFO', - 'sqlalchemy=WARN', - 'keystoneclient=INFO', - 'eventlet.wsgi.server=WARN']) - - -def parse_args(args=None, usage=None, default_config_files=None): - CONF(args=args, - project='muranoapi', - version=version, - usage=usage, - default_config_files=default_config_files) - - -def setup_logging(): - """ - Sets up the logging options for a log with supplied name - """ - - if CONF.log_config: - # Use a logging configuration file for all settings... - if os.path.exists(CONF.log_config): - logging.config.fileConfig(CONF.log_config) - return - else: - raise RuntimeError("Unable to locate specified logging " - "config file: %s" % CONF.log_config) - - root_logger = logging.root - if CONF.debug: - root_logger.setLevel(logging.DEBUG) - elif CONF.verbose: - root_logger.setLevel(logging.INFO) - else: - root_logger.setLevel(logging.WARNING) - - formatter = logging.Formatter(CONF.log_format, CONF.log_date_format) - - if CONF.use_syslog: - try: - facility = getattr(logging.handlers.SysLogHandler, - CONF.syslog_log_facility) - except AttributeError: - raise ValueError(_("Invalid syslog facility")) - - handler = logging.handlers.SysLogHandler(address='/dev/log', - facility=facility) - elif CONF.log_file: - logfile = CONF.log_file - if CONF.log_dir: - logfile = os.path.join(CONF.log_dir, logfile) - handler = logging.handlers.WatchedFileHandler(logfile) - else: - handler = logging.StreamHandler(sys.stdout) - - handler.setFormatter(formatter) - root_logger.addHandler(handler) - - -def _get_deployment_flavor(): - """ - Retrieve the paste_deploy.flavor config item, formatted appropriately - for appending to the application name. - """ - flavor = CONF.paste_deploy.flavor - return '' if not flavor else ('-' + flavor) - - -def _get_paste_config_path(): - paste_suffix = '-paste.ini' - conf_suffix = '.conf' - if CONF.config_file: - # Assume paste config is in a paste.ini file corresponding - # to the last config file - path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) - else: - path = CONF.prog + '-paste.ini' - return CONF.find_file(os.path.basename(path)) - - -def _get_deployment_config_file(): - """ - Retrieve the deployment_config_file config item, formatted as an - absolute pathname. - """ - path = CONF.paste_deploy.config_file - if not path: - path = _get_paste_config_path() - if not path: - msg = "Unable to locate paste config file for %s." % CONF.prog - raise RuntimeError(msg) - return os.path.abspath(path) - - -def load_paste_app(app_name=None): - """ - Builds and returns a WSGI app from a paste config file. - - We assume the last config file specified in the supplied ConfigOpts - object is the paste config file. - - :param app_name: name of the application to load - - :raises RuntimeError when config file cannot be located or application - cannot be loaded from config file - """ - if app_name is None: - app_name = CONF.prog - - # append the deployment flavor to the application name, - # in order to identify the appropriate paste pipeline - app_name += _get_deployment_flavor() - - conf_file = _get_deployment_config_file() - - try: - logger = logging.getLogger(__name__) - logger.debug(_("Loading %(app_name)s from %(conf_file)s"), - {'conf_file': conf_file, 'app_name': app_name}) - - app = deploy.loadapp("config:%s" % conf_file, name=app_name) - - # Log the options used when starting if we're in debug mode... - if CONF.debug: - CONF.log_opt_values(logger, logging.DEBUG) - - return app - except (LookupError, ImportError), e: - msg = _("Unable to load %(app_name)s from " - "configuration file %(conf_file)s." - "\nGot: %(e)r") % locals() - logger.error(msg) - raise RuntimeError(msg) diff --git a/mistral/config.py b/mistral/config.py index d58ee27e0..d20dd4609 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -31,10 +31,23 @@ db_opts = [ # TODO: add DB properties. ] +rabbit_opts = [ + cfg.StrOpt('rabbit_host', default='0.0.0.0', + help='RabbitMQ server host name'), + cfg.IntOpt('rabbit_port', default=5672, help='RabbitMQ server port'), + cfg.StrOpt('rabbit_virtual_host', default='/', + help='RabbitMQ server virtual host name'), + cfg.StrOpt('rabbit_task_queue', default='tasks', + help='RabbitMQ tasks queue name'), + cfg.StrOpt('rabbit_user', default='guest', help='RabbitMQ user'), + cfg.StrOpt('rabbit_password', default='guest', help='RabbitMQ password') +] + CONF = cfg.CONF CONF.register_opts(api_opts, group='api') CONF.register_opts(db_opts, group='database') +CONF.register_opts(rabbit_opts, group='rabbit') CONF.import_opt('verbose', 'mistral.openstack.common.log') diff --git a/mistral/db/api.py b/mistral/db/api.py index 5424f9ed8..6488029a5 100644 --- a/mistral/db/api.py +++ b/mistral/db/api.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: replace this module later with a real implementation from mistral.openstack.common.db import api as db_api from mistral.openstack.common import log as logging @@ -36,8 +35,28 @@ def drop_db(): IMPL.drop_db() +# Transaction control. + + +def start_tx(): + IMPL.start_tx() + + +def commit_tx(): + IMPL.commit_tx() + + +def rollback_tx(): + IMPL.rollback_tx() + + +def end_tx(): + IMPL.end_tx() + + # Workbook + def workbook_get(name): return IMPL.workbook_get(name) diff --git a/mistral/db/sqlalchemy/api.py b/mistral/db/sqlalchemy/api.py index 81ce80c58..09048262f 100644 --- a/mistral/db/sqlalchemy/api.py +++ b/mistral/db/sqlalchemy/api.py @@ -82,6 +82,26 @@ def drop_db(): return True +def start_tx(): + # TODO(rakhmerov): implement + raise NotImplemented + + +def commit_tx(): + # TODO(rakhmerov): implement + raise NotImplemented + + +def rollback_tx(): + # TODO(rakhmerov): implement + raise NotImplemented + + +def end_tx(): + # TODO(rakhmerov): implement + raise NotImplemented + + def event_create(values): values = values.copy() event = m.Event() @@ -245,9 +265,8 @@ def execution_delete(workbook_name, execution_id): def execution_create(workbook_name, values): - values = values.copy() execution = m.WorkflowExecution() - execution.update(values) + execution.update(values.copy()) execution.update({'workbook_name': workbook_name}) session = get_session() @@ -276,7 +295,7 @@ def _task_get(workbook_name, execution_id, task_id, session): @to_dict def tasks_get_all(**kwargs): - return _executions_get_all(get_session(), **kwargs) + return _tasks_get_all(get_session(), **kwargs) def _tasks_get_all(session, **kwargs): diff --git a/mistral/db/sqlalchemy/models.py b/mistral/db/sqlalchemy/models.py index 91d502801..150516fc4 100644 --- a/mistral/db/sqlalchemy/models.py +++ b/mistral/db/sqlalchemy/models.py @@ -49,23 +49,18 @@ class Event(mb.MistralBase): class WorkflowExecution(mb.MistralBase): - """Contains info about particular workflow execution""" + """Contains info about particular workflow execution.""" __tablename__ = 'workflow_executions' - __table_args__ = ( - sa.UniqueConstraint('name'), - ) - id = _id_column() - name = sa.Column(sa.String(80)) workbook_name = sa.Column(sa.String(80)) target_task = sa.Column(sa.String(80)) - workflow_state = sa.Column(sa.String(20)) + state = sa.Column(sa.String(20)) class Workbook(mb.MistralBase): - """Contains info about all DSL (workbook) content""" + """Contains info about workbook (including definition in Mistral DSL).""" __tablename__ = 'workbooks' @@ -75,22 +70,23 @@ class Workbook(mb.MistralBase): id = _id_column() name = sa.Column(sa.String(80), primary_key=True) - doc = sa.Column(sa.String(), nullable=True) + definition = sa.Column(sa.String(), nullable=True) description = sa.Column(sa.String()) tags = sa.Column(st.JsonListType()) scope = sa.Column(sa.String()) class Task(mb.MistralBase): - """Contains info about particular task""" + """Contains info about particular task.""" __tablename__ = 'tasks' id = _id_column() name = sa.Column(sa.String(80)) + dependencies = sa.Column(st.JsonListType()) workbook_name = sa.Column(sa.String(80)) execution_id = sa.Column(sa.String(36)) description = sa.Column(sa.String()) - action = sa.Column(sa.String(80)) + action = sa.Column(st.JsonDictType()) state = sa.Column(sa.String(20)) tags = sa.Column(st.JsonListType()) diff --git a/mistral/dsl.py b/mistral/dsl.py index ad6c16f5e..ee74c26a0 100644 --- a/mistral/dsl.py +++ b/mistral/dsl.py @@ -46,6 +46,10 @@ class Parser(object): def get_tasks(self): return self.doc["Workflow"]["tasks"] + def get_action(self, action_name): + # TODO(rakhmerov): it needs to return action definition as a dict + pass + def get_service_name(self): return self.doc['Service']['name'] diff --git a/mistral/engine/engine.py b/mistral/engine/engine.py new file mode 100644 index 000000000..ad2c6b402 --- /dev/null +++ b/mistral/engine/engine.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Facade interface to Mistral Engine that provides control over lifecycle +of workflow executions. +""" + +import sys + +# TODO(rakhmerov): make it configurable +IMPL = sys.modules["mistral.engine.scalable.engine"] + + +def start_workflow_execution(workbook_name, target_task_name): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param workbook_name: Workbook name + :param target_task_name: Target task name + :return: Workflow execution identifier. + """ + IMPL.start_workflow_execution(workbook_name, target_task_name) + + +def stop_workflow_execution(workbook_name, execution_id): + """Stops the workflow execution with the given id. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + """ + IMPL.stop_workflow_execution(workbook_name, execution_id) + + +def convey_task_result(workbook_name, execution_id, task_id, state, result): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :param state: New task state. + :param result: Task result data. + """ + IMPL.convey_task_result(workbook_name, execution_id, task_id, state, + result) + + +def get_workflow_execution_state(workbook_name, execution_id): + """Gets the workflow execution state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Current workflow state. + """ + IMPL.get_workflow_execution_state(workbook_name, execution_id) + + +def get_task_state(workbook_name, execution_id, task_id): + """Gets task state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :return: Current task state. + """ + IMPL.get_task_state(workbook_name, execution_id, task_id) diff --git a/mistral/engine/exception.py b/mistral/engine/exception.py new file mode 100644 index 000000000..e753e1704 --- /dev/null +++ b/mistral/engine/exception.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class EngineException(Exception): + + def __init__(self, message=None): + super(Exception, self).__init__(message) diff --git a/mistral/engine/scalable/__init__.py b/mistral/engine/scalable/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/scalable/engine.py b/mistral/engine/scalable/engine.py new file mode 100644 index 000000000..7f5ed51db --- /dev/null +++ b/mistral/engine/scalable/engine.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import pika +from oslo.config import cfg +from mistral.openstack.common import log as logging +from mistral.db import api as db_api +from mistral import dsl +from mistral.engine import exception +from mistral.engine import states +from mistral.engine.scalable import workflow + + +LOG = logging.getLogger(__name__) + + +def _notify_task_executors(tasks): + opts = cfg.CONF.rabbit + + creds = pika.PlainCredentials(opts.rabbit_user, + opts.rabbit_password) + params = pika.ConnectionParameters(opts.rabbit_host, + opts.rabbit_port, + opts.rabbit_virtual_host, + creds) + + conn = pika.BlockingConnection(params) + LOG.info("Connected to RabbitMQ server [params=%s]" % params) + + try: + channel = conn.channel() + channel.queue_declare(queue=opts.rabbit_task_queue) + + for task in tasks: + msg = json.dumps(task) + channel.basic_publish(exchange='', + routing_key=opts.rabbit_task_queue, + body=msg) + LOG.info("Submitted task for execution: '%s'" % msg) + finally: + conn.close() + + +def start_workflow_execution(workbook_name, target_task_name): + wb = db_api.workbook_get(workbook_name) + wb_dsl = dsl.Parser(wb.definition) + + dsl_tasks = workflow.find_workflow_tasks(wb_dsl, target_task_name) + + db_api.start_tx() + + try: + # Persist execution and tasks in DB. + execution = db_api.execution_create(workbook_name, { + "workbook_name": workbook_name, + "target_task": target_task_name, + "state": states.RUNNING + }) + + tasks = [] + + for dsl_task in dsl_tasks: + task = db_api.task_create(workbook_name, execution["id"], { + "workbook_name": workbook_name, + "execution_id": execution["id"], + "name": dsl_task["name"], + "action": wb_dsl.get_action(dsl_task["action"]), + "state": states.IDLE, + "tags": dsl_task["tags"] + }) + + tasks.append(task) + + _notify_task_executors(tasks) + + db_api.commit_tx() + finally: + db_api.end_tx() + pass + + +def stop_workflow_execution(workbook_name, execution_id): + db_api.execution_update(workbook_name, execution_id, + {"state": states.STOPPED}) + + +def convey_task_result(workbook_name, execution_id, task_id, state, result): + db_api.start_tx() + + try: + # Update task state + task = db_api.task_update(workbook_name, execution_id, task_id, + {"state": state, "result": result}) + + if task["state"] == states.ERROR: + db_api.execution_update(workbook_name, execution_id, { + "state": states.ERROR + }) + + db_api.commit_tx() + return + + execution = db_api.execution_get(workbook_name, execution_id) + + if states.is_stopped_or_finished(execution["state"]): + # The execution has finished or stopped temporarily. + db_api.commit_tx() + return + + # Determine what tasks need to be started. + tasks = db_api.tasks_get(workbook_name, execution_id) + + if workflow.is_finished(tasks): + db_api.commit_tx() + return + + _notify_task_executors(workflow.find_tasks_to_start(tasks)) + + db_api.commit_tx() + finally: + db_api.end_tx() + + +def get_workflow_execution_state(workbook_name, execution_id): + execution = db_api.execution_get(workbook_name, execution_id) + + if not execution: + raise exception.EngineException("Workflow execution not found.") + + return execution["state"] + + +def get_task_state(workbook_name, execution_id, task_id): + task = db_api.task_get(workbook_name, execution_id, task_id) + + if not task: + raise exception.EngineException("Task not found.") + + return task["state"] diff --git a/mistral/engine/scalable/executor/__init__.py b/mistral/engine/scalable/executor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/engine/scalable/executor/action.py b/mistral/engine/scalable/executor/action.py new file mode 100644 index 000000000..5f05630bf --- /dev/null +++ b/mistral/engine/scalable/executor/action.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests + + +class BaseAction(object): + def do_action(self): + pass + + +class RESTAction(BaseAction): + def __init__(self, url, params={}, method="GET", headers=None): + self.url = url + self.params = params + self.method = method + self.headers = headers + + def do_action(self): + requests.request(self.method, self.url, params=self.params, + headers=self.headers) + +# TODO(rakhmerov): add other types of actions. diff --git a/mistral/engine/scalable/executor/executor.py b/mistral/engine/scalable/executor/executor.py new file mode 100644 index 000000000..c678683c1 --- /dev/null +++ b/mistral/engine/scalable/executor/executor.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pika + +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def handle_task(channel, method, properties, body): + channel.basic_ack(delivery_tag=method.delivery_tag) + + LOG.info("Received a message from RabbitMQ: " + body) + #TODO(rakhmerov): implement task execution logic + # 1. Fetch task and execution state from DB + # 2. If execution is in "RUNNING" state and task state is "IDLE" + # then do task action (send a signal) + + +def start(rabbit_opts): + opts = rabbit_opts + + creds = pika.PlainCredentials(opts.rabbit_user, + opts.rabbit_password) + params = pika.ConnectionParameters(opts.rabbit_host, + opts.rabbit_port, + opts.rabbit_virtual_host, + creds) + + conn = pika.BlockingConnection(params) + LOG.info("Connected to RabbitMQ server [params=%s]" % params) + + try: + channel = conn.channel() + channel.queue_declare(queue=opts.rabbit_task_queue) + + LOG.info("Waiting for task messages...") + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(handle_task, + queue=opts.rabbit_task_queue, + no_ack=False) + + channel.start_consuming() + finally: + conn.close() diff --git a/mistral/engine/scalable/workflow.py b/mistral/engine/scalable/workflow.py new file mode 100644 index 000000000..1591611e5 --- /dev/null +++ b/mistral/engine/scalable/workflow.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def find_workflow_tasks(wb_dsl, target_task_name): + # TODO(rakhmerov): implement using networkX + return None + + +def find_tasks_to_start(tasks): + # TODO(rakhmerov): implement using networkX + # We need to analyse graph and see which tasks are ready to start + return tasks + + +def is_finished(tasks): + # TODO(rakhmerov): implement + return False diff --git a/mistral/engine/states.py b/mistral/engine/states.py new file mode 100644 index 000000000..378813081 --- /dev/null +++ b/mistral/engine/states.py @@ -0,0 +1,36 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Valid task and workflow states.""" + +IDLE = 'IDLE' +RUNNING = 'RUNNING' +SUCCESS = 'SUCCESS' +ERROR = 'ERROR' +STOPPED = 'STOPPED' + +_ALL = [IDLE, RUNNING, SUCCESS, ERROR, STOPPED] + + +def is_valid(state): + return state in _ALL + + +def is_finished(state): + return state in [SUCCESS, ERROR] + + +def is_stopped_or_finished(state): + return state == STOPPED or is_finished(state) diff --git a/mistral/scripts/test.py b/mistral/scripts/test.py new file mode 100644 index 000000000..50e759b32 --- /dev/null +++ b/mistral/scripts/test.py @@ -0,0 +1,13 @@ +from mistral import config +from mistral.engine.scalable import engine +from mistral.openstack.common import log as logging + +config.parse_args() +logging.setup("mistral") + +tasks = [] + +for i in range(1000000): + tasks.append({"id": i, "name": "task%s" % i, "execution_id": 1}) + +engine._notify_task_executors(tasks) diff --git a/requirements.txt b/requirements.txt index 52f8ca9db..abeeda35f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ amqplib>=0.6.1 argparse croniter oslo.config>=1.2.0 -requests \ No newline at end of file +requests +pika>=0.9.13 \ No newline at end of file