From c8887c6b6678a07a0bde4789af40345b3f89ee88 Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Thu, 4 Apr 2013 18:10:25 +0400 Subject: [PATCH] Licenses added --- conductor/bin/conductor | 25 +- conductor/conductor/app.py | 195 ++++---- conductor/conductor/cloud_formation.py | 211 +++++---- conductor/conductor/commands/__init__.py | 15 + .../conductor/commands/cloud_formation.py | 357 ++++++++------- conductor/conductor/commands/command.py | 16 + conductor/conductor/commands/dispatcher.py | 81 ++-- conductor/conductor/config.py | 423 +++++++++--------- conductor/conductor/function_context.py | 16 + conductor/conductor/helpers.py | 15 + conductor/conductor/rabbitmq.py | 268 +++++------ conductor/conductor/reporting.py | 15 + conductor/conductor/version.py | 2 - conductor/conductor/windows_agent.py | 71 +-- conductor/conductor/workflow.py | 18 +- conductor/conductor/xml_code_engine.py | 28 +- 16 files changed, 966 insertions(+), 790 deletions(-) diff --git a/conductor/bin/conductor b/conductor/bin/conductor index 4c938b80..e92f28a8 100644 --- a/conductor/bin/conductor +++ b/conductor/bin/conductor @@ -1,20 +1,17 @@ -#!/usr/bin/env python -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. +# Copyright (c) 2013 Mirantis Inc. # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. import sys diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py index 35e746bc..e70f59e8 100644 --- a/conductor/conductor/app.py +++ b/conductor/conductor/app.py @@ -1,90 +1,105 @@ -import datetime -import glob -import sys -import traceback - -import anyjson -from conductor.openstack.common import service -from workflow import Workflow -from commands.dispatcher import CommandDispatcher -from openstack.common import log as logging -from config import Config -import reporting -import rabbitmq - -import windows_agent -import cloud_formation - -config = Config(sys.argv[1] if len(sys.argv) > 1 else None) - -log = logging.getLogger(__name__) - - -def task_received(task, message_id): - with rabbitmq.RmqClient() as rmqclient: - try: - log.info('Starting processing task {0}: {1}'.format( - message_id, anyjson.dumps(task))) - reporter = reporting.Reporter(rmqclient, message_id, task['id']) - - command_dispatcher = CommandDispatcher( - task['name'], rmqclient, task['token'], task['tenant_id']) - workflows = [] - for path in glob.glob("data/workflows/*.xml"): - log.debug('Loading XML {0}'.format(path)) - workflow = Workflow(path, task, command_dispatcher, config, - reporter) - workflows.append(workflow) - - while True: - try: - while True: - result = False - for workflow in workflows: - if workflow.execute(): - result = True - if not result: - break - if not command_dispatcher.execute_pending(): - break - except Exception as ex: - log.exception(ex) - break - - command_dispatcher.close() - finally: - del task['token'] - result_msg = rabbitmq.Message() - result_msg.body = task - result_msg.id = message_id - - rmqclient.send(message=result_msg, key='task-results') - log.info('Finished processing task {0}. Result = {1}'.format( - message_id, anyjson.dumps(task))) - - -class ConductorWorkflowService(service.Service): - def __init__(self): - super(ConductorWorkflowService, self).__init__() - - def start(self): - super(ConductorWorkflowService, self).start() - self.tg.add_thread(self._start_rabbitmq) - - def stop(self): - super(ConductorWorkflowService, self).stop() - - def _start_rabbitmq(self): - while True: - try: - with rabbitmq.RmqClient() as rmq: - rmq.declare('tasks', 'tasks') - rmq.declare('task-results') - with rmq.open('tasks') as subscription: - while True: - msg = subscription.get_message() - self.tg.add_thread( - task_received, msg.body, msg.id) - except Exception as ex: - log.exception(ex) - +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import glob +import sys +import traceback + +import anyjson +from conductor.openstack.common import service +from workflow import Workflow +from commands.dispatcher import CommandDispatcher +from openstack.common import log as logging +from config import Config +import reporting +import rabbitmq + +import windows_agent +import cloud_formation + +config = Config(sys.argv[1] if len(sys.argv) > 1 else None) + +log = logging.getLogger(__name__) + + +def task_received(task, message_id): + with rabbitmq.RmqClient() as rmqclient: + try: + log.info('Starting processing task {0}: {1}'.format( + message_id, anyjson.dumps(task))) + reporter = reporting.Reporter(rmqclient, message_id, task['id']) + + command_dispatcher = CommandDispatcher( + task['name'], rmqclient, task['token'], task['tenant_id']) + workflows = [] + for path in glob.glob("data/workflows/*.xml"): + log.debug('Loading XML {0}'.format(path)) + workflow = Workflow(path, task, command_dispatcher, config, + reporter) + workflows.append(workflow) + + while True: + try: + while True: + result = False + for workflow in workflows: + if workflow.execute(): + result = True + if not result: + break + if not command_dispatcher.execute_pending(): + break + except Exception as ex: + log.exception(ex) + break + + command_dispatcher.close() + finally: + del task['token'] + result_msg = rabbitmq.Message() + result_msg.body = task + result_msg.id = message_id + + rmqclient.send(message=result_msg, key='task-results') + log.info('Finished processing task {0}. Result = {1}'.format( + message_id, anyjson.dumps(task))) + + +class ConductorWorkflowService(service.Service): + def __init__(self): + super(ConductorWorkflowService, self).__init__() + + def start(self): + super(ConductorWorkflowService, self).start() + self.tg.add_thread(self._start_rabbitmq) + + def stop(self): + super(ConductorWorkflowService, self).stop() + + def _start_rabbitmq(self): + while True: + try: + with rabbitmq.RmqClient() as rmq: + rmq.declare('tasks', 'tasks') + rmq.declare('task-results') + with rmq.open('tasks') as subscription: + while True: + msg = subscription.get_message() + self.tg.add_thread( + task_received, msg.body, msg.id) + except Exception as ex: + log.exception(ex) + diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py index a1283553..8f2b9a5e 100644 --- a/conductor/conductor/cloud_formation.py +++ b/conductor/conductor/cloud_formation.py @@ -1,98 +1,113 @@ -import base64 - -import xml_code_engine -import config -from random import choice -import time -import string - - -def update_cf_stack(engine, context, body, template, - mappings, arguments, **kwargs): - command_dispatcher = context['/commandDispatcher'] - - callback = lambda result: engine.evaluate_content( - body.find('success'), context) - - command_dispatcher.execute( - name='cf', command='CreateOrUpdate', template=template, - mappings=mappings, arguments=arguments, callback=callback) - - -def delete_cf_stack(engine, context, body, **kwargs): - command_dispatcher = context['/commandDispatcher'] - - callback = lambda result: engine.evaluate_content( - body.find('success'), context) - - command_dispatcher.execute( - name='cf', command='Delete', callback=callback) - - -def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs): - settings = config.CONF.rabbitmq - - with open('data/init.ps1') as init_script_file: - with open('data/templates/agent-config/{0}.template'.format( - template)) as template_file: - init_script = init_script_file.read() - template_data = template_file.read() - template_data = template_data.replace( - '%RABBITMQ_HOST%', settings.host) - template_data = template_data.replace( - '%RABBITMQ_INPUT_QUEUE%', - '-'.join([str(context['/dataSource']['name']), - str(service), str(unit)]).lower() - ) - template_data = template_data.replace( - '%RESULT_QUEUE%', - '-execution-results-{0}'.format( - str(context['/dataSource']['name'])).lower()) - - init_script = init_script.replace( - '%WINDOWS_AGENT_CONFIG_BASE64%', - base64.b64encode(template_data)) - - init_script = init_script.replace('%INTERNAL_HOSTNAME%', hostname) - - return init_script - -counter = 0 - - -def int2base(x, base): - digs = string.digits + string.lowercase - if x < 0: sign = -1 - elif x==0: return '0' - else: sign = 1 - x *= sign - digits = [] - while x: - digits.append(digs[x % base]) - x /= base - if sign < 0: - digits.append('-') - digits.reverse() - return ''.join(digits) - - -def generate_hostname(**kwargs): - global counter - prefix = ''.join(choice(string.lowercase) for _ in range(5)) - timestamp = int2base(int(time.time() * 1000), 36)[:8] - suffix = int2base(counter, 36) - counter = (counter + 1) % 1296 - return prefix + timestamp + suffix - - -xml_code_engine.XmlCodeEngine.register_function( - update_cf_stack, "update-cf-stack") - -xml_code_engine.XmlCodeEngine.register_function( - delete_cf_stack, "delete-cf-stack") - -xml_code_engine.XmlCodeEngine.register_function( - prepare_user_data, "prepare-user-data") - -xml_code_engine.XmlCodeEngine.register_function( - generate_hostname, "generate-hostname") +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 + +import xml_code_engine +import config +from random import choice +import time +import string + + +def update_cf_stack(engine, context, body, template, + mappings, arguments, **kwargs): + command_dispatcher = context['/commandDispatcher'] + + callback = lambda result: engine.evaluate_content( + body.find('success'), context) + + command_dispatcher.execute( + name='cf', command='CreateOrUpdate', template=template, + mappings=mappings, arguments=arguments, callback=callback) + + +def delete_cf_stack(engine, context, body, **kwargs): + command_dispatcher = context['/commandDispatcher'] + + callback = lambda result: engine.evaluate_content( + body.find('success'), context) + + command_dispatcher.execute( + name='cf', command='Delete', callback=callback) + + +def prepare_user_data(context, hostname, service, unit, + template='Default', **kwargs): + settings = config.CONF.rabbitmq + + with open('data/init.ps1') as init_script_file: + with open('data/templates/agent-config/{0}.template'.format( + template)) as template_file: + init_script = init_script_file.read() + template_data = template_file.read() + template_data = template_data.replace( + '%RABBITMQ_HOST%', settings.host) + template_data = template_data.replace( + '%RABBITMQ_INPUT_QUEUE%', + '-'.join([str(context['/dataSource']['name']), + str(service), str(unit)]).lower() + ) + template_data = template_data.replace( + '%RESULT_QUEUE%', + '-execution-results-{0}'.format( + str(context['/dataSource']['name'])).lower()) + + init_script = init_script.replace( + '%WINDOWS_AGENT_CONFIG_BASE64%', + base64.b64encode(template_data)) + + init_script = init_script.replace('%INTERNAL_HOSTNAME%', hostname) + + return init_script + +counter = 0 + +def int2base(x, base): + digs = string.digits + string.lowercase + if x < 0: sign = -1 + elif x==0: return '0' + else: sign = 1 + x *= sign + digits = [] + while x: + digits.append(digs[x % base]) + x /= base + if sign < 0: + digits.append('-') + digits.reverse() + return ''.join(digits) + + +def generate_hostname(**kwargs): + global counter + prefix = ''.join(choice(string.lowercase) for _ in range(5)) + timestamp = int2base(int(time.time() * 1000), 36)[:8] + suffix = int2base(counter, 36) + counter = (counter + 1) % 1296 + return prefix + timestamp + suffix + + +xml_code_engine.XmlCodeEngine.register_function( + update_cf_stack, "update-cf-stack") + +xml_code_engine.XmlCodeEngine.register_function( + delete_cf_stack, "delete-cf-stack") + +xml_code_engine.XmlCodeEngine.register_function( + prepare_user_data, "prepare-user-data") + +xml_code_engine.XmlCodeEngine.register_function( + generate_hostname, "generate-hostname") diff --git a/conductor/conductor/commands/__init__.py b/conductor/conductor/commands/__init__.py index 551f6ea8..1fa2c573 100644 --- a/conductor/conductor/commands/__init__.py +++ b/conductor/conductor/commands/__init__.py @@ -1 +1,16 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import command diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py index 409c652c..9cc7a1ca 100644 --- a/conductor/conductor/commands/cloud_formation.py +++ b/conductor/conductor/commands/cloud_formation.py @@ -1,169 +1,188 @@ -import anyjson -import eventlet - -import jsonpath -from conductor.openstack.common import log as logging -import conductor.helpers -from command import CommandBase -import conductor.config -from heatclient.client import Client -import heatclient.exc -from keystoneclient.v2_0 import client as ksclient -import types - -log = logging.getLogger(__name__) - - -class HeatExecutor(CommandBase): - def __init__(self, stack, token, tenant_id): - self._update_pending_list = [] - self._delete_pending_list = [] - self._stack = stack - settings = conductor.config.CONF.heat - - client = ksclient.Client(endpoint=settings.auth_url) - auth_data = client.tokens.authenticate( - tenant_id=tenant_id, - token=token) - - scoped_token = auth_data.id - - heat_url = jsonpath.jsonpath(auth_data.serviceCatalog, - "$[?(@.name == 'heat')].endpoints[0].publicURL")[0] - - self._heat_client = Client('1', heat_url, - token_only=True, token=scoped_token) - - def execute(self, command, callback, **kwargs): - log.debug('Got command {0} on stack {1}'.format(command, self._stack)) - - if command == 'CreateOrUpdate': - return self._execute_create_update( - kwargs['template'], - kwargs['mappings'], - kwargs['arguments'], - callback) - elif command == 'Delete': - return self._execute_delete(callback) - - def _execute_create_update(self, template, mappings, arguments, callback): - with open('data/templates/cf/%s.template' % template) as template_file: - template_data = template_file.read() - - template_data = conductor.helpers.transform_json( - anyjson.loads(template_data), mappings) - - self._update_pending_list.append({ - 'template': template_data, - 'arguments': arguments, - 'callback': callback - }) - - def _execute_delete(self, callback): - self._delete_pending_list.append({ - 'callback': callback - }) - - def has_pending_commands(self): - return len(self._update_pending_list) + \ - len(self._delete_pending_list) > 0 - - def execute_pending(self): - r1 = self._execute_pending_updates() - r2 = self._execute_pending_deletes() - return r1 or r2 - - def _execute_pending_updates(self): - if not len(self._update_pending_list): - return False - - template, arguments = self._get_current_template() - stack_exists = (template != {}) - - for t in self._update_pending_list: - template = conductor.helpers.merge_dicts( - template, t['template'], max_levels=2) - arguments = conductor.helpers.merge_dicts( - arguments, t['arguments'], max_levels=1) - - log.info( - 'Executing heat template {0} with arguments {1} on stack {2}' - .format(anyjson.dumps(template), arguments, self._stack)) - - if stack_exists: - self._heat_client.stacks.update( - stack_id=self._stack, - parameters=arguments, - template=template) - log.debug( - 'Waiting for the stack {0} to be update'.format(self._stack)) - self._wait_state('UPDATE_COMPLETE') - log.info('Stack {0} updated'.format(self._stack)) - else: - self._heat_client.stacks.create( - stack_name=self._stack, - parameters=arguments, - template=template) - log.debug('Waiting for the stack {0} to be create'.format( - self._stack)) - self._wait_state('CREATE_COMPLETE') - log.info('Stack {0} created'.format(self._stack)) - - pending_list = self._update_pending_list - self._update_pending_list = [] - - for item in pending_list: - item['callback'](True) - - return True - - def _execute_pending_deletes(self): - if not len(self._delete_pending_list): - return False - - log.debug('Deleting stack {0}'.format(self._stack)) - try: - self._heat_client.stacks.delete( - stack_id=self._stack) - log.debug( - 'Waiting for the stack {0} to be deleted'.format(self._stack)) - self._wait_state(['DELETE_COMPLETE', '']) - log.info('Stack {0} deleted'.format(self._stack)) - except Exception as ex: - log.exception(ex) - - pending_list = self._delete_pending_list - self._delete_pending_list = [] - - for item in pending_list: - item['callback'](True) - return True - - def _get_current_template(self): - try: - stack_info = self._heat_client.stacks.get(stack_id=self._stack) - template = self._heat_client.stacks.template( - stack_id='{0}/{1}'.format(stack_info.stack_name, stack_info.id)) - return template, stack_info.parameters - except heatclient.exc.HTTPNotFound: - return {}, {} - - def _wait_state(self, state): - if isinstance(state, types.ListType): - states = state - else: - states = [state] - - while True: - try: - status = self._heat_client.stacks.get( - stack_id=self._stack).stack_status - except heatclient.exc.HTTPNotFound: - status = '' - - if 'IN_PROGRESS' in status: - eventlet.sleep(1) - continue - if status not in states: - raise EnvironmentError() - return +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import anyjson +import eventlet +import types +import jsonpath + +from conductor.openstack.common import log as logging +import conductor.helpers +from command import CommandBase +import conductor.config +from heatclient.client import Client +import heatclient.exc +from keystoneclient.v2_0 import client as ksclient + +log = logging.getLogger(__name__) + + +class HeatExecutor(CommandBase): + def __init__(self, stack, token, tenant_id): + self._update_pending_list = [] + self._delete_pending_list = [] + self._stack = stack + settings = conductor.config.CONF.heat + + client = ksclient.Client(endpoint=settings.auth_url) + auth_data = client.tokens.authenticate( + tenant_id=tenant_id, + token=token) + + scoped_token = auth_data.id + + heat_url = jsonpath.jsonpath( + auth_data.serviceCatalog, + "$[?(@.name == 'heat')].endpoints[0].publicURL")[0] + + self._heat_client = Client( + '1', + heat_url, + token_only=True, + token=scoped_token) + + def execute(self, command, callback, **kwargs): + log.debug('Got command {0} on stack {1}'.format(command, self._stack)) + + if command == 'CreateOrUpdate': + return self._execute_create_update( + kwargs['template'], + kwargs['mappings'], + kwargs['arguments'], + callback) + elif command == 'Delete': + return self._execute_delete(callback) + + def _execute_create_update(self, template, mappings, arguments, callback): + with open('data/templates/cf/%s.template' % template) as template_file: + template_data = template_file.read() + + template_data = conductor.helpers.transform_json( + anyjson.loads(template_data), mappings) + + self._update_pending_list.append({ + 'template': template_data, + 'arguments': arguments, + 'callback': callback + }) + + def _execute_delete(self, callback): + self._delete_pending_list.append({ + 'callback': callback + }) + + def has_pending_commands(self): + return len(self._update_pending_list) + \ + len(self._delete_pending_list) > 0 + + def execute_pending(self): + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() + return r1 or r2 + + def _execute_pending_updates(self): + if not len(self._update_pending_list): + return False + + template, arguments = self._get_current_template() + stack_exists = (template != {}) + + for t in self._update_pending_list: + template = conductor.helpers.merge_dicts( + template, t['template'], max_levels=2) + arguments = conductor.helpers.merge_dicts( + arguments, t['arguments'], max_levels=1) + + log.info( + 'Executing heat template {0} with arguments {1} on stack {2}' + .format(anyjson.dumps(template), arguments, self._stack)) + + if stack_exists: + self._heat_client.stacks.update( + stack_id=self._stack, + parameters=arguments, + template=template) + log.debug( + 'Waiting for the stack {0} to be update'.format(self._stack)) + self._wait_state('UPDATE_COMPLETE') + log.info('Stack {0} updated'.format(self._stack)) + else: + self._heat_client.stacks.create( + stack_name=self._stack, + parameters=arguments, + template=template) + log.debug('Waiting for the stack {0} to be create'.format( + self._stack)) + self._wait_state('CREATE_COMPLETE') + log.info('Stack {0} created'.format(self._stack)) + + pending_list = self._update_pending_list + self._update_pending_list = [] + + for item in pending_list: + item['callback'](True) + + return True + + def _execute_pending_deletes(self): + if not len(self._delete_pending_list): + return False + + log.debug('Deleting stack {0}'.format(self._stack)) + try: + self._heat_client.stacks.delete( + stack_id=self._stack) + log.debug( + 'Waiting for the stack {0} to be deleted'.format(self._stack)) + self._wait_state(['DELETE_COMPLETE', '']) + log.info('Stack {0} deleted'.format(self._stack)) + except Exception as ex: + log.exception(ex) + + pending_list = self._delete_pending_list + self._delete_pending_list = [] + + for item in pending_list: + item['callback'](True) + return True + + def _get_current_template(self): + try: + stack_info = self._heat_client.stacks.get(stack_id=self._stack) + template = self._heat_client.stacks.template( + stack_id='{0}/{1}'.format(stack_info.stack_name, stack_info.id)) + return template, stack_info.parameters + except heatclient.exc.HTTPNotFound: + return {}, {} + + def _wait_state(self, state): + if isinstance(state, types.ListType): + states = state + else: + states = [state] + + while True: + try: + status = self._heat_client.stacks.get( + stack_id=self._stack).stack_status + except heatclient.exc.HTTPNotFound: + status = '' + + if 'IN_PROGRESS' in status: + eventlet.sleep(1) + continue + if status not in states: + raise EnvironmentError() + return diff --git a/conductor/conductor/commands/command.py b/conductor/conductor/commands/command.py index ad2d469b..606c7293 100644 --- a/conductor/conductor/commands/command.py +++ b/conductor/conductor/commands/command.py @@ -1,3 +1,19 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + class CommandBase(object): def execute(self, **kwargs): pass diff --git a/conductor/conductor/commands/dispatcher.py b/conductor/conductor/commands/dispatcher.py index 92e11835..37014459 100644 --- a/conductor/conductor/commands/dispatcher.py +++ b/conductor/conductor/commands/dispatcher.py @@ -1,33 +1,48 @@ -import command -import cloud_formation -import windows_agent - - -class CommandDispatcher(command.CommandBase): - def __init__(self, environment_id, rmqclient, token, tenant_id): - self._command_map = { - 'cf': cloud_formation.HeatExecutor(environment_id, token, tenant_id), - 'agent': windows_agent.WindowsAgentExecutor( - environment_id, rmqclient) - } - - def execute(self, name, **kwargs): - self._command_map[name].execute(**kwargs) - - def execute_pending(self): - result = False - for command in self._command_map.values(): - result |= command.execute_pending() - - return result - - def has_pending_commands(self): - result = False - for command in self._command_map.values(): - result |= command.has_pending_commands() - - return result - - def close(self): - for t in self._command_map.values(): - t.close() +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import command +import cloud_formation +import windows_agent + + +class CommandDispatcher(command.CommandBase): + def __init__(self, environment, rmqclient, token, tenant_id): + self._command_map = { + 'cf': cloud_formation.HeatExecutor(environment, token, tenant_id), + 'agent': windows_agent.WindowsAgentExecutor( + environment, rmqclient) + } + + def execute(self, name, **kwargs): + self._command_map[name].execute(**kwargs) + + def execute_pending(self): + result = False + for command in self._command_map.values(): + result |= command.execute_pending() + + return result + + def has_pending_commands(self): + result = False + for command in self._command_map.values(): + result |= command.has_pending_commands() + + return result + + def close(self): + for t in self._command_map.values(): + t.close() diff --git a/conductor/conductor/config.py b/conductor/conductor/config.py index a5caea7e..bb804dab 100644 --- a/conductor/conductor/config.py +++ b/conductor/conductor/config.py @@ -1,213 +1,210 @@ -#!/usr/bin/env python -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Routines for configuring Glance -""" - -import logging -import logging.config -import logging.handlers -import os -import sys - -from oslo.config import cfg -from paste import deploy - -from conductor.version import version_info as version -from ConfigParser import SafeConfigParser - -paste_deploy_opts = [ - cfg.StrOpt('flavor'), - cfg.StrOpt('config_file'), -] - -rabbit_opts = [ - cfg.StrOpt('host', default='localhost'), - cfg.IntOpt('port', default=5672), - cfg.StrOpt('login', default='guest'), - cfg.StrOpt('password', default='guest'), - cfg.StrOpt('virtual_host', default='/'), -] - -heat_opts = [ - cfg.StrOpt('auth_url'), -] - -CONF = cfg.CONF -CONF.register_opts(paste_deploy_opts, group='paste_deploy') -CONF.register_opts(rabbit_opts, group='rabbitmq') -CONF.register_opts(heat_opts, group='heat') - - -CONF.import_opt('verbose', 'conductor.openstack.common.log') -CONF.import_opt('debug', 'conductor.openstack.common.log') -CONF.import_opt('log_dir', 'conductor.openstack.common.log') -CONF.import_opt('log_file', 'conductor.openstack.common.log') -CONF.import_opt('log_config', 'conductor.openstack.common.log') -CONF.import_opt('log_format', 'conductor.openstack.common.log') -CONF.import_opt('log_date_format', 'conductor.openstack.common.log') -CONF.import_opt('use_syslog', 'conductor.openstack.common.log') -CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log') - - -def parse_args(args=None, usage=None, default_config_files=None): - CONF(args=args, - project='conductor', - version=version.cached_version_string(), - usage=usage, - default_config_files=default_config_files) - - -def setup_logging(): - """ - Sets up the logging options for a log with supplied name - """ - - if CONF.log_config: - # Use a logging configuration file for all settings... - if os.path.exists(CONF.log_config): - logging.config.fileConfig(CONF.log_config) - return - else: - raise RuntimeError("Unable to locate specified logging " - "config file: %s" % CONF.log_config) - - root_logger = logging.root - if CONF.debug: - root_logger.setLevel(logging.DEBUG) - elif CONF.verbose: - root_logger.setLevel(logging.INFO) - else: - root_logger.setLevel(logging.WARNING) - - formatter = logging.Formatter(CONF.log_format, CONF.log_date_format) - - if CONF.use_syslog: - try: - facility = getattr(logging.handlers.SysLogHandler, - CONF.syslog_log_facility) - except AttributeError: - raise ValueError(_("Invalid syslog facility")) - - handler = logging.handlers.SysLogHandler(address='/dev/log', - facility=facility) - elif CONF.log_file: - logfile = CONF.log_file - if CONF.log_dir: - logfile = os.path.join(CONF.log_dir, logfile) - handler = logging.handlers.WatchedFileHandler(logfile) - else: - handler = logging.StreamHandler(sys.stdout) - - handler.setFormatter(formatter) - root_logger.addHandler(handler) - - -def _get_deployment_flavor(): - """ - Retrieve the paste_deploy.flavor config item, formatted appropriately - for appending to the application name. - """ - flavor = CONF.paste_deploy.flavor - return '' if not flavor else ('-' + flavor) - - -def _get_paste_config_path(): - paste_suffix = '-paste.ini' - conf_suffix = '.conf' - if CONF.config_file: - # Assume paste config is in a paste.ini file corresponding - # to the last config file - path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) - else: - path = CONF.prog + '-paste.ini' - return CONF.find_file(os.path.basename(path)) - - -def _get_deployment_config_file(): - """ - Retrieve the deployment_config_file config item, formatted as an - absolute pathname. - """ - path = CONF.paste_deploy.config_file - if not path: - path = _get_paste_config_path() - if not path: - msg = "Unable to locate paste config file for %s." % CONF.prog - raise RuntimeError(msg) - return os.path.abspath(path) - - -def load_paste_app(app_name=None): - """ - Builds and returns a WSGI app from a paste config file. - - We assume the last config file specified in the supplied ConfigOpts - object is the paste config file. - - :param app_name: name of the application to load - - :raises RuntimeError when config file cannot be located or application - cannot be loaded from config file - """ - if app_name is None: - app_name = CONF.prog - - # append the deployment flavor to the application name, - # in order to identify the appropriate paste pipeline - app_name += _get_deployment_flavor() - - conf_file = _get_deployment_config_file() - - try: - logger = logging.getLogger(__name__) - logger.debug(_("Loading %(app_name)s from %(conf_file)s"), - {'conf_file': conf_file, 'app_name': app_name}) - - app = deploy.loadapp("config:%s" % conf_file, name=app_name) - - # Log the options used when starting if we're in debug mode... - if CONF.debug: - CONF.log_opt_values(logger, logging.DEBUG) - - return app - except (LookupError, ImportError), e: - msg = _("Unable to load %(app_name)s from " - "configuration file %(conf_file)s." - "\nGot: %(e)r") % locals() - logger.error(msg) - raise RuntimeError(msg) - - -class Config(object): - CONFIG_PATH = './etc/app.config' - - def __init__(self, filename=None): - self.config = SafeConfigParser() - self.config.read(filename or self.CONFIG_PATH) - - def get_setting(self, section, name, default=None): - if not self.config.has_option(section, name): - return default - return self.config.get(section, name) - - def __getitem__(self, item): - parts = item.rsplit('.', 1) - return self.get_setting( - parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1]) +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Routines for configuring Glance +""" + +import logging +import logging.config +import logging.handlers +import os +import sys + +from oslo.config import cfg +from paste import deploy + +from conductor.version import version_info as version +from ConfigParser import SafeConfigParser + +paste_deploy_opts = [ + cfg.StrOpt('flavor'), + cfg.StrOpt('config_file'), +] + +rabbit_opts = [ + cfg.StrOpt('host', default='localhost'), + cfg.IntOpt('port', default=5672), + cfg.StrOpt('login', default='guest'), + cfg.StrOpt('password', default='guest'), + cfg.StrOpt('virtual_host', default='/'), +] + +heat_opts = [ + cfg.StrOpt('auth_url'), +] + +CONF = cfg.CONF +CONF.register_opts(paste_deploy_opts, group='paste_deploy') +CONF.register_opts(rabbit_opts, group='rabbitmq') +CONF.register_opts(heat_opts, group='heat') + + +CONF.import_opt('verbose', 'conductor.openstack.common.log') +CONF.import_opt('debug', 'conductor.openstack.common.log') +CONF.import_opt('log_dir', 'conductor.openstack.common.log') +CONF.import_opt('log_file', 'conductor.openstack.common.log') +CONF.import_opt('log_config', 'conductor.openstack.common.log') +CONF.import_opt('log_format', 'conductor.openstack.common.log') +CONF.import_opt('log_date_format', 'conductor.openstack.common.log') +CONF.import_opt('use_syslog', 'conductor.openstack.common.log') +CONF.import_opt('syslog_log_facility', 'conductor.openstack.common.log') + + +def parse_args(args=None, usage=None, default_config_files=None): + CONF(args=args, + project='conductor', + version=version.cached_version_string(), + usage=usage, + default_config_files=default_config_files) + + +def setup_logging(): + """ + Sets up the logging options for a log with supplied name + """ + + if CONF.log_config: + # Use a logging configuration file for all settings... + if os.path.exists(CONF.log_config): + logging.config.fileConfig(CONF.log_config) + return + else: + raise RuntimeError("Unable to locate specified logging " + "config file: %s" % CONF.log_config) + + root_logger = logging.root + if CONF.debug: + root_logger.setLevel(logging.DEBUG) + elif CONF.verbose: + root_logger.setLevel(logging.INFO) + else: + root_logger.setLevel(logging.WARNING) + + formatter = logging.Formatter(CONF.log_format, CONF.log_date_format) + + if CONF.use_syslog: + try: + facility = getattr(logging.handlers.SysLogHandler, + CONF.syslog_log_facility) + except AttributeError: + raise ValueError(_("Invalid syslog facility")) + + handler = logging.handlers.SysLogHandler(address='/dev/log', + facility=facility) + elif CONF.log_file: + logfile = CONF.log_file + if CONF.log_dir: + logfile = os.path.join(CONF.log_dir, logfile) + handler = logging.handlers.WatchedFileHandler(logfile) + else: + handler = logging.StreamHandler(sys.stdout) + + handler.setFormatter(formatter) + root_logger.addHandler(handler) + + +def _get_deployment_flavor(): + """ + Retrieve the paste_deploy.flavor config item, formatted appropriately + for appending to the application name. + """ + flavor = CONF.paste_deploy.flavor + return '' if not flavor else ('-' + flavor) + + +def _get_paste_config_path(): + paste_suffix = '-paste.ini' + conf_suffix = '.conf' + if CONF.config_file: + # Assume paste config is in a paste.ini file corresponding + # to the last config file + path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) + else: + path = CONF.prog + '-paste.ini' + return CONF.find_file(os.path.basename(path)) + + +def _get_deployment_config_file(): + """ + Retrieve the deployment_config_file config item, formatted as an + absolute pathname. + """ + path = CONF.paste_deploy.config_file + if not path: + path = _get_paste_config_path() + if not path: + msg = "Unable to locate paste config file for %s." % CONF.prog + raise RuntimeError(msg) + return os.path.abspath(path) + + +def load_paste_app(app_name=None): + """ + Builds and returns a WSGI app from a paste config file. + + We assume the last config file specified in the supplied ConfigOpts + object is the paste config file. + + :param app_name: name of the application to load + + :raises RuntimeError when config file cannot be located or application + cannot be loaded from config file + """ + if app_name is None: + app_name = CONF.prog + + # append the deployment flavor to the application name, + # in order to identify the appropriate paste pipeline + app_name += _get_deployment_flavor() + + conf_file = _get_deployment_config_file() + + try: + logger = logging.getLogger(__name__) + logger.debug(_("Loading %(app_name)s from %(conf_file)s"), + {'conf_file': conf_file, 'app_name': app_name}) + + app = deploy.loadapp("config:%s" % conf_file, name=app_name) + + # Log the options used when starting if we're in debug mode... + if CONF.debug: + CONF.log_opt_values(logger, logging.DEBUG) + + return app + except (LookupError, ImportError), e: + msg = _("Unable to load %(app_name)s from " + "configuration file %(conf_file)s." + "\nGot: %(e)r") % locals() + logger.error(msg) + raise RuntimeError(msg) + + +class Config(object): + CONFIG_PATH = './etc/app.config' + + def __init__(self, filename=None): + self.config = SafeConfigParser() + self.config.read(filename or self.CONFIG_PATH) + + def get_setting(self, section, name, default=None): + if not self.config.has_option(section, name): + return default + return self.config.get(section, name) + + def __getitem__(self, item): + parts = item.rsplit('.', 1) + return self.get_setting( + parts[0] if len(parts) == 2 else 'DEFAULT', parts[-1]) diff --git a/conductor/conductor/function_context.py b/conductor/conductor/function_context.py index e27b6db8..e210fd7b 100644 --- a/conductor/conductor/function_context.py +++ b/conductor/conductor/function_context.py @@ -1,3 +1,19 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + class Context(object): def __init__(self, parent=None): self._parent = parent diff --git a/conductor/conductor/helpers.py b/conductor/conductor/helpers.py index 435a35be..da196560 100644 --- a/conductor/conductor/helpers.py +++ b/conductor/conductor/helpers.py @@ -1,3 +1,18 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import types diff --git a/conductor/conductor/rabbitmq.py b/conductor/conductor/rabbitmq.py index 7183deed..f9d5d4c6 100644 --- a/conductor/conductor/rabbitmq.py +++ b/conductor/conductor/rabbitmq.py @@ -1,127 +1,141 @@ -from eventlet import patcher -puka = patcher.import_patched('puka') -#import puka -import anyjson -import config - - -class RmqClient(object): - def __init__(self): - settings = config.CONF.rabbitmq - self._client = puka.Client('amqp://{0}:{1}@{2}:{3}/{4}'.format( - settings.login, - settings.password, - settings.host, - settings.port, - settings.virtual_host - )) - self._connected = False - - def __enter__(self): - self.connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - return False - - def connect(self): - if not self._connected: - promise = self._client.connect() - self._client.wait(promise, timeout=10000) - self._connected = True - - def close(self): - if self._connected: - self._client.close() - self._connected = False - - def declare(self, queue, exchange=None): - promise = self._client.queue_declare(str(queue), durable=True) - self._client.wait(promise) - - if exchange: - promise = self._client.exchange_declare(str(exchange), durable=True) - self._client.wait(promise) - promise = self._client.queue_bind( - str(queue), str(exchange), routing_key=str(queue)) - self._client.wait(promise) - - def send(self, message, key, exchange='', timeout=None): - if not self._connected: - raise RuntimeError('Not connected to RabbitMQ') - - headers = { 'message_id': message.id } - - promise = self._client.basic_publish( - exchange=str(exchange), - routing_key=str(key), - body=anyjson.dumps(message.body), - headers=headers) - self._client.wait(promise, timeout=timeout) - - def open(self, queue): - if not self._connected: - raise RuntimeError('Not connected to RabbitMQ') - - return Subscription(self._client, queue) - - -class Subscription(object): - def __init__(self, client, queue): - self._client = client - self._queue = queue - self._promise = None - self._lastMessage = None - - def __enter__(self): - self._promise = self._client.basic_consume( - queue=self._queue, - prefetch_count=1) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._ack_last() - promise = self._client.basic_cancel(self._promise) - self._client.wait(promise) - return False - - def _ack_last(self): - if self._lastMessage: - self._client.basic_ack(self._lastMessage) - self._lastMessage = None - - def get_message(self, timeout=None): - if not self._promise: - raise RuntimeError( - "Subscription object must be used within 'with' block") - self._ack_last() - self._lastMessage = self._client.wait(self._promise, timeout=timeout) - #print self._lastMessage - msg = Message() - msg.body = anyjson.loads(self._lastMessage['body']) - msg.id = self._lastMessage['headers'].get('message_id') - return msg - - -class Message(object): - def __init__(self): - self._body = {} - self._id = '' - - @property - def body(self): - return self._body - - @body.setter - def body(self, value): - self._body = value - - @property - def id(self): - return self._id - - @id.setter - def id(self, value): - self._id = value or '' - +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from eventlet import patcher +puka = patcher.import_patched('puka') +#import puka +import anyjson +import config + + +class RmqClient(object): + def __init__(self): + settings = config.CONF.rabbitmq + self._client = puka.Client('amqp://{0}:{1}@{2}:{3}/{4}'.format( + settings.login, + settings.password, + settings.host, + settings.port, + settings.virtual_host + )) + self._connected = False + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + def connect(self): + if not self._connected: + promise = self._client.connect() + self._client.wait(promise, timeout=10000) + self._connected = True + + def close(self): + if self._connected: + self._client.close() + self._connected = False + + def declare(self, queue, exchange=None): + promise = self._client.queue_declare(str(queue), durable=True) + self._client.wait(promise) + + if exchange: + promise = self._client.exchange_declare(str(exchange), durable=True) + self._client.wait(promise) + promise = self._client.queue_bind( + str(queue), str(exchange), routing_key=str(queue)) + self._client.wait(promise) + + def send(self, message, key, exchange='', timeout=None): + if not self._connected: + raise RuntimeError('Not connected to RabbitMQ') + + headers = { 'message_id': message.id } + + promise = self._client.basic_publish( + exchange=str(exchange), + routing_key=str(key), + body=anyjson.dumps(message.body), + headers=headers) + self._client.wait(promise, timeout=timeout) + + def open(self, queue): + if not self._connected: + raise RuntimeError('Not connected to RabbitMQ') + + return Subscription(self._client, queue) + + +class Subscription(object): + def __init__(self, client, queue): + self._client = client + self._queue = queue + self._promise = None + self._lastMessage = None + + def __enter__(self): + self._promise = self._client.basic_consume( + queue=self._queue, + prefetch_count=1) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._ack_last() + promise = self._client.basic_cancel(self._promise) + self._client.wait(promise) + return False + + def _ack_last(self): + if self._lastMessage: + self._client.basic_ack(self._lastMessage) + self._lastMessage = None + + def get_message(self, timeout=None): + if not self._promise: + raise RuntimeError( + "Subscription object must be used within 'with' block") + self._ack_last() + self._lastMessage = self._client.wait(self._promise, timeout=timeout) + msg = Message() + msg.body = anyjson.loads(self._lastMessage['body']) + msg.id = self._lastMessage['headers'].get('message_id') + return msg + + +class Message(object): + def __init__(self): + self._body = {} + self._id = '' + + @property + def body(self): + return self._body + + @body.setter + def body(self, value): + self._body = value + + @property + def id(self): + return self._id + + @id.setter + def id(self, value): + self._id = value or '' + diff --git a/conductor/conductor/reporting.py b/conductor/conductor/reporting.py index b6c1458d..d9d5fdf9 100644 --- a/conductor/conductor/reporting.py +++ b/conductor/conductor/reporting.py @@ -1,3 +1,18 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import xml_code_engine import rabbitmq diff --git a/conductor/conductor/version.py b/conductor/conductor/version.py index 736f2407..f17cad4c 100644 --- a/conductor/conductor/version.py +++ b/conductor/conductor/version.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/conductor/conductor/windows_agent.py b/conductor/conductor/windows_agent.py index e25b5530..7470228c 100644 --- a/conductor/conductor/windows_agent.py +++ b/conductor/conductor/windows_agent.py @@ -1,29 +1,44 @@ -import xml_code_engine - -from openstack.common import log as logging -log = logging.getLogger(__name__) - - -def send_command(engine, context, body, template, service, host, mappings=None, - result=None, **kwargs): - if not mappings: - mappings = {} - command_dispatcher = context['/commandDispatcher'] - - def callback(result_value): - log.info( - 'Received result from {2} for {0}: {1}'.format( - template, result_value, host)) - if result is not None: - context[result] = result_value['Result'] - - success_handler = body.find('success') - if success_handler is not None: - engine.evaluate_content(success_handler, context) - - command_dispatcher.execute( - name='agent', template=template, mappings=mappings, - host=host, service=service, callback=callback) - - +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import xml_code_engine + +from openstack.common import log as logging +log = logging.getLogger(__name__) + + +def send_command(engine, context, body, template, service, host, mappings=None, + result=None, **kwargs): + if not mappings: + mappings = {} + command_dispatcher = context['/commandDispatcher'] + + def callback(result_value): + log.info( + 'Received result from {2} for {0}: {1}'.format( + template, result_value, host)) + if result is not None: + context[result] = result_value['Result'] + + success_handler = body.find('success') + if success_handler is not None: + engine.evaluate_content(success_handler, context) + + command_dispatcher.execute( + name='agent', template=template, mappings=mappings, + host=host, service=service, callback=callback) + + xml_code_engine.XmlCodeEngine.register_function(send_command, "send-command") \ No newline at end of file diff --git a/conductor/conductor/workflow.py b/conductor/conductor/workflow.py index e0f61987..53dde023 100644 --- a/conductor/conductor/workflow.py +++ b/conductor/conductor/workflow.py @@ -1,3 +1,18 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import jsonpath import types import re @@ -124,9 +139,6 @@ class Workflow(object): def _rule_func(match, context, body, engine, limit=0, name=None, **kwargs): position = context['__dataSource_currentPosition'] or [] - # data = context['__dataSource_currentObj'] - # if data is None: - # data = context['/dataSource'] position, match = Workflow._get_relative_position(match, context) data = Workflow._get_path(context['/dataSource'], position) match = re.sub(r'@\.([\w.]+)', diff --git a/conductor/conductor/xml_code_engine.py b/conductor/conductor/xml_code_engine.py index 42c18a1e..8661ead4 100644 --- a/conductor/conductor/xml_code_engine.py +++ b/conductor/conductor/xml_code_engine.py @@ -1,4 +1,18 @@ -#from lxml import etree +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import xml.etree.ElementTree as etree import types @@ -121,15 +135,3 @@ XmlCodeEngine.register_function(_function_func, "function") XmlCodeEngine.register_function(_null_func, "null") XmlCodeEngine.register_function(_true_func, "true") XmlCodeEngine.register_function(_false_func, "false") - - -def xprint(context, body, **kwargs): - print "------------------------ start ------------------------" - for arg in kwargs: - print "%s = %s" % (arg, kwargs[arg]) - print 'context = ', context - print 'body = %s (%s)' % (body, body.text) - print "------------------------- end -------------------------" - - -XmlCodeEngine.register_function(xprint, "print")