# Source: murano-agent/muranoagent/app.py (206 lines, 7.5 KiB, Python)

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import win32
import sys
import os
from execution_plan_runner import ExecutionPlanRunner
from execution_plan_queue import ExecutionPlanQueue
from execution_result import ExecutionResult
from openstack.common import log as logging
from openstack.common import service
from muranoagent.common.config import CONF
from muranoagent.common.messaging import MqClient, Message
from exceptions import AgentException
from time import sleep
from bunch import Bunch
import semver
import types
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Execution-plan format version supported by this agent;
# MuranoAgent._verify_plan compares each incoming plan's 'FormatVersion'
# against it.
format_version = '2.0.0'
class MuranoAgent(service.Service):
    """Service that receives execution plans, runs them and reports results.

    Plans are read from the RabbitMQ input queue, validated and stored in an
    ExecutionPlanQueue, executed through ExecutionPlanRunner, and the
    resulting ExecutionResult is published to the configured result
    exchange.
    """

    def __init__(self):
        """Create the plan queue, then initialise the base service."""
        self._queue = ExecutionPlanQueue()
        super(MuranoAgent, self).__init__()

    @staticmethod
    def _load_package(name):
        """Import the package *name*; log and skip it if the import fails.

        :param name: package name importable through ``sys.path``.
        """
        try:
            log.debug('Loading plugin %s', name)
            __import__(name)
        except Exception:
            # Best effort: a package that fails to import is logged with its
            # traceback and loading continues with the next one.
            log.warn('Cannot load package %s', name, exc_info=True)

    def _load(self):
        """Import every sub-directory of ``executors`` as a package."""
        path = os.path.join(os.path.dirname(__file__), 'executors')
        # Make the executor packages importable by their bare directory name.
        sys.path.insert(1, path)
        for entry in os.listdir(path):
            if os.path.isdir(os.path.join(path, entry)):
                MuranoAgent._load_package(entry)

    def start(self):
        """Load the executors and run the processing loop.

        An error raised by one iteration is logged and the loop resumes
        after a 5 second pause.  The loop ends when the message iterator is
        exhausted, which happens once ``_wait_plan`` has handled a
        KeyboardInterrupt.
        """
        self._load()
        msg_iterator = self._wait_plan()
        while True:
            try:
                self._loop_func(msg_iterator)
            except StopIteration:
                # _wait_plan() has finished.  StopIteration is an Exception
                # subclass, so without this branch the generic handler below
                # would swallow it and retry the dead iterator forever.
                break
            except Exception as ex:
                log.exception(ex)
                sleep(5)

    def _loop_func(self, msg_iterator):
        """Perform one unit of work, in priority order.

        1. Send one pending execution result, if there is one.
        2. Otherwise run one queued execution plan, if there is one.
        3. Otherwise advance *msg_iterator* to poll for a new message.

        :param msg_iterator: generator returned by ``_wait_plan``.
        :raises StopIteration: when *msg_iterator* is exhausted.
        """
        result, timestamp = self._queue.get_execution_plan_result()
        if result is not None:
            # The stored result is removed only after it has been sent.
            if self._send_result(result):
                self._queue.remove(timestamp)
            return

        plan = self._queue.get_execution_plan()
        if plan is not None:
            self._run(plan)
            return

        # Builtin next() rather than the Python 2-only .next() method.
        next(msg_iterator)

    def _run(self, plan):
        """Execute *plan* and queue its outcome, whether success or error.

        :param plan: execution plan taken from the plan queue.
        """
        with ExecutionPlanRunner(plan) as runner:
            try:
                result = runner.run()
                execution_result = ExecutionResult.from_result(result, plan)
                self._queue.put_execution_result(execution_result, plan)
            except Exception as ex:
                # Any failure (including one while storing the success
                # result) is converted into an error result for the plan.
                execution_result = ExecutionResult.from_error(ex, plan)
                self._queue.put_execution_result(execution_result, plan)

    def _send_result(self, result):
        """Publish *result* to the configured result exchange.

        :param result: mapping-like execution result; its ``'SourceID'``
            entry (if any) becomes the message id.
        :returns: ``True`` after the message has been sent; failures
            propagate as exceptions.
        """
        with self._create_rmq_client() as mq:
            msg = Message()
            msg.body = result
            msg.id = result.get('SourceID')
            mq.send(message=msg,
                    key=CONF.rabbitmq.result_routing_key,
                    exchange=CONF.rabbitmq.result_exchange)
        return True

    def _create_rmq_client(self):
        """Build an MqClient from the ``CONF.rabbitmq`` settings.

        :returns: a new, not yet entered, MqClient.
        """
        rabbitmq = CONF.rabbitmq
        # Blank, whitespace-only or unset ca_certs all mean "no CA bundle".
        ca_certs = (rabbitmq.ca_certs or '').strip() or None
        connection_params = {
            'login': rabbitmq.login,
            'password': rabbitmq.password,
            'host': rabbitmq.host,
            'port': rabbitmq.port,
            'virtual_host': rabbitmq.virtual_host,
            'ssl': rabbitmq.ssl,
            'ca_certs': ca_certs,
        }
        return MqClient(**connection_params)

    def _wait_plan(self):
        """Generator that consumes messages from the input queue.

        Each received message whose body is a dict is passed to
        ``_handle_message``; every received message is then acknowledged and
        control is yielded back to the caller.  On a communication error the
        connection is re-created after a delay that starts at 5 seconds and
        grows by 20% per consecutive failure, capped at 60 seconds.  A
        KeyboardInterrupt ends the generator.

        NOTE(review): the source this was reconstructed from had lost its
        indentation; the nesting of ``yield`` and of the delay reset below
        should be confirmed against the canonical file.
        """
        delay = 5
        while True:
            try:
                with self._create_rmq_client() as mq:
                    with mq.open(CONF.rabbitmq.input_queue,
                                 prefetch_count=1) as subscription:
                        while True:
                            msg = subscription.get_message(timeout=5)
                            if msg is not None and isinstance(msg.body, dict):
                                self._handle_message(msg)

                            if msg is not None:
                                # Non-dict bodies are acknowledged too, so
                                # they are dropped rather than redelivered.
                                msg.ack()
                                yield
                            # A successful poll resets the retry back-off.
                            delay = 5
            except KeyboardInterrupt:
                break
            except Exception:
                log.warn('Communication error', exc_info=True)
                sleep(delay)
                delay = min(delay * 1.2, 60)

    def _handle_message(self, msg):
        """Validate an incoming plan and queue it, or report why it is bad.

        :param msg: received message whose ``body`` is a dict.
        """
        log.debug('Received message body: %s', msg.body)
        # Fall back to the transport-level id when the plan carries none.
        if 'ID' not in msg.body and msg.id:
            msg.body['ID'] = msg.id
        err = self._verify_plan(msg.body)
        if err is None:
            self._queue.put_execution_plan(msg.body)
        else:
            try:
                execution_result = ExecutionResult.from_error(
                    err, Bunch(msg.body))
                self._send_result(execution_result)
            except ValueError:
                log.warn('Execution result is not produced')

    def _verify_plan(self, plan):
        """Check the structure of an execution plan.

        :param plan: plan dictionary taken from the message body.
        :returns: ``None`` when the plan is acceptable, otherwise an
            AgentException describing the first problem found (code 3 for
            an unsupported ``FormatVersion``, code 2 for a malformed plan).
            The exception is returned, not raised.
        """
        min_version = '2.0.0'
        plan_format_version = plan.get('FormatVersion', '1.0.0')
        try:
            supported = (
                semver.compare(plan_format_version, min_version) >= 0 and
                semver.compare(plan_format_version, format_version) <= 0)
        except (TypeError, ValueError):
            # Not a parsable semantic version; report it as unsupported
            # instead of letting the error escape to the consumer loop.
            supported = False
        if not supported:
            if format_version != min_version:
                range_str = 'in range {0}-{1}'.format(
                    min_version, format_version)
            else:
                range_str = 'equal to {0}'.format(format_version)
            return AgentException(
                3,
                'Unsupported format version {0} (must be {1})'.format(
                    plan_format_version, range_str))

        # Optional top-level sections must be dictionaries when present.
        for attr in ('Scripts', 'Files', 'Options'):
            if attr in plan and not isinstance(plan[attr], dict):
                return AgentException(
                    2, '{0} is not a dictionary'.format(attr))

        plan_files = plan.get('Files', {})

        for name, script in plan.get('Scripts', {}).items():
            for attr in ('Type', 'EntryPoint'):
                if attr not in script or not isinstance(
                        script[attr], types.StringTypes):
                    return AgentException(
                        2, 'Incorrect {0} entry in script {1}'.format(
                            attr, name))

            if not isinstance(script.get('Options', {}), dict):
                return AgentException(
                    2, 'Incorrect Options entry in script {0}'.format(name))

            # Every file a script refers to must be declared in the plan.
            if script['EntryPoint'] not in plan_files:
                return AgentException(
                    2, 'Script {0} misses entry point {1}'.format(
                        name, script['EntryPoint']))

            for additional_file in script.get('Files', []):
                if additional_file not in plan_files:
                    return AgentException(
                        2, 'Script {0} misses file {1}'.format(
                            name, additional_file))

        for key, plan_file in plan_files.items():
            for attr in ('BodyType', 'Body', 'Name'):
                if attr not in plan_file:
                    return AgentException(
                        2, 'Incorrect {0} entry in file {1}'.format(
                            attr, key))

            if plan_file['BodyType'] not in ('Text', 'Base64'):
                return AgentException(
                    2, 'Incorrect BodyType in file {0}'.format(key))

        return None