# fenix/fenix/utils/service.py
# Copyright (c) 2018 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from importlib import import_module
try:
    # Python 3: load a workflow plugin module directly from a file path.
    # Uses importlib.util instead of the deprecated
    # SourceFileLoader.load_module() (deprecated since 3.4, removed in 3.12).
    from importlib.util import module_from_spec
    from importlib.util import spec_from_file_location

    def source_loader_workflow_instance(mname, mpath, conf, session_id, data):
        """Load module ``mname`` from file ``mpath`` and instantiate its
        Workflow class as ``Workflow(conf, session_id, data)``.
        """
        spec = spec_from_file_location(mname, mpath)
        module = module_from_spec(spec)
        spec.loader.exec_module(module)
        return module.Workflow(conf, session_id, data)
except ImportError:
    # Python 2 fallback: imp.load_source.
    from imp import load_source

    def source_loader_workflow_instance(mname, mpath, conf, session_id, data):
        """Load module ``mname`` from file ``mpath`` and instantiate its
        Workflow class as ``Workflow(conf, session_id, data)``.
        """
        mi = load_source(mname, mpath)
        return mi.Workflow(conf, session_id, data)
import os
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from shutil import rmtree
from uuid import uuid1 as generate_uuid
from fenix import context
from fenix.utils.download import download_url
import fenix.utils.identity_auth
# Hard cap on concurrently tracked maintenance workflow sessions;
# admin_create_session refuses to create more than this many.
MAX_SESSIONS = 3
# Module-level logger and the global oslo.config configuration handle.
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# Engine service configuration options, registered on the global CONF
# below (alongside the service_user identity options).
opts = [
    cfg.StrOpt('host',
               default="127.0.0.1",
               help="API host IP"),
    cfg.IntOpt('port',
               default=5000,
               help="API port to use."),
    cfg.IntOpt('project_maintenance_reply',
               default=40,
               help="Project maintenance reply confirmation time in seconds"),
    cfg.IntOpt('project_scale_in_reply',
               default=60,
               help="Project scale in reply confirmation time in seconds"),
    cfg.StrOpt('local_cache_dir',
               default="/tmp",
               help="Local cache directory"),
    # Bug fix: the two options below have integer defaults (a retry count
    # and a wait time in seconds) and were declared StrOpt; IntOpt gives
    # them proper typed parsing from the config file.
    cfg.IntOpt('live_migration_retries',
               default=5,
               help="Number of live migration retries"),
    cfg.IntOpt('live_migration_wait_time',
               default=600,
               help="How long to wait live migration to be done"),
]
CONF.register_opts(fenix.utils.identity_auth.os_opts, group='service_user')
CONF.register_opts(opts)
class RPCClient(object):
    """Thin convenience wrapper around an oslo.messaging RPC client."""

    def __init__(self, target):
        super(RPCClient, self).__init__()
        transport = messaging.get_rpc_transport(cfg.CONF)
        self._client = messaging.RPCClient(target=target,
                                           transport=transport)

    def cast(self, name, **kwargs):
        """Fire-and-forget RPC, sent with the current request context."""
        self._client.cast(context.current().to_dict(), name, **kwargs)

    def call(self, name, **kwargs):
        """Blocking RPC, sent with an empty context dict; returns the reply."""
        reply = self._client.call({}, name, **kwargs)
        return reply
class EngineEndpoint(object):
    """RPC endpoint implementing the Fenix maintenance workflow API.

    Keeps an in-memory map of session_id -> running workflow instance and
    dispatches the admin/project RPC methods onto it.
    """

    def __init__(self):
        # session_id (UUID string) -> workflow instance (a started thread)
        self.workflow_sessions = {}

    def _validate_session(self, session_id):
        """Return True if session_id refers to a known session."""
        # Membership test on the dict directly; .keys() was redundant.
        return session_id in self.workflow_sessions

    def admin_get(self, ctx):
        """Get maintenance workflow sessions"""
        LOG.info("EngineEndpoint: admin_get")
        # Materialize to a list: a dict_keys view is not JSON-serializable
        # over the RPC/API boundary.
        return {"sessions": list(self.workflow_sessions)}

    def admin_create_session(self, ctx, data):
        """Create maintenance workflow session thread"""
        LOG.info("EngineEndpoint: admin_create_session")
        LOG.info("data: %s" % data)
        # ">=" so the cap also holds if the count ever overshoots; the
        # original "==" would stop guarding in that case.
        if len(self.workflow_sessions) >= MAX_SESSIONS:
            LOG.error("Too many sessions: %d" % MAX_SESSIONS)
            return None
        session_id = str(generate_uuid())
        if "workflow" not in data:
            workflow = "fenix.workflow.workflows.default"
        else:
            workflow = "fenix.workflow.workflows.%s" % data["workflow"]
        LOG.info("Workflow plugin module: %s" % workflow)
        session_dir = "%s/%s" % (CONF.local_cache_dir, session_id)
        os.mkdir(session_dir)
        if "download" in data:
            os.mkdir(session_dir + "/workflow")
            os.mkdir(session_dir + "/actions")
            for url in data["download"]:
                download_url(session_dir, str(url))
        try:
            wf_plugin = getattr(import_module(workflow), 'Workflow')
            self.workflow_sessions[session_id] = wf_plugin(CONF,
                                                           session_id,
                                                           data)
        except ImportError:
            # No built-in plugin by that name: fall back to a workflow
            # module downloaded into this session's cache directory.
            download_plugin_dir = session_dir + "/workflow/"
            download_plugin_file = "%s/%s.py" % (download_plugin_dir,
                                                 data["workflow"])
            if os.path.isfile(download_plugin_file):
                self.workflow_sessions[session_id] = (
                    source_loader_workflow_instance(workflow,
                                                    download_plugin_file,
                                                    CONF,
                                                    session_id,
                                                    data))
            else:
                # Bug fix: session_id is a local variable; the original
                # referenced self.session_id, which does not exist on this
                # class and would raise AttributeError instead of the
                # intended exception.
                raise Exception('%s: could not find workflow plugin %s' %
                                (session_id, data["workflow"]))
        self.workflow_sessions[session_id].start()
        return {"session_id": session_id}

    def admin_get_session(self, ctx, session_id):
        """Get maintenance workflow session details"""
        if not self._validate_session(session_id):
            return None
        LOG.info("EngineEndpoint: admin_get_session")
        return ({"session_id": session_id, "state":
                self.workflow_sessions[session_id].session.state})

    def admin_delete_session(self, ctx, session_id):
        """Delete maintenance workflow session thread"""
        LOG.info("EngineEndpoint: admin_delete_session")
        # Validate like the sibling accessors instead of raising KeyError
        # on an unknown session id.
        if not self._validate_session(session_id):
            return None
        self.workflow_sessions[session_id].cleanup()
        self.workflow_sessions[session_id].stop()
        self.workflow_sessions.pop(session_id)
        # Remove the session's on-disk cache directory as well.
        session_dir = "%s/%s" % (CONF.local_cache_dir, session_id)
        rmtree(session_dir)
        return {}

    def admin_update_session(self, ctx, session_id, data):
        """Update maintenance workflow session"""
        LOG.info("EngineEndpoint: admin_update_session")
        # TBD Update data to workflow and return updated data
        return data

    def project_get_session(self, ctx, session_id, project_id):
        """Get maintenance workflow session project specific details"""
        if not self._validate_session(session_id):
            return None
        LOG.info("EngineEndpoint: project_get_session")
        instance_ids = (self.workflow_sessions[session_id].
                        state_instance_ids(project_id))
        return {"instance_ids": instance_ids}

    def project_update_session(self, ctx, session_id, project_id, data):
        """Update maintenance workflow session project state"""
        LOG.info("EngineEndpoint: project_update_session")
        session_obj = self.workflow_sessions[session_id]
        project = session_obj.project(project_id)
        project.state = data["state"]
        if "instance_actions" in data:
            # Copy so later caller-side mutation of the payload cannot
            # change the stored actions.
            session_obj.proj_instance_actions[project_id] = (
                data["instance_actions"].copy())
        return data
class RPCServer(service.Service):
    """oslo.service wrapper hosting the Fenix engine RPC server."""

    def __init__(self, target):
        super(RPCServer, self).__init__()
        transport = messaging.get_rpc_transport(cfg.CONF)
        endpoints = [EngineEndpoint()]
        self._server = messaging.get_rpc_server(target=target,
                                                transport=transport,
                                                endpoints=endpoints,
                                                executor='eventlet')

    def start(self):
        """Start the service, then run the RPC server in its thread group."""
        super(RPCServer, self).start()
        self.tg.add_thread(self._server.start)

    def stop(self):
        """Stop the service, then shut the RPC server down."""
        super(RPCServer, self).stop()
        self._server.stop()
def prepare_service(argv=[]):
logging.setup(cfg.CONF, 'fenix')