# Copyright (c) 2018 OpenStack Foundation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from importlib import import_module
try:
    from importlib.machinery import SourceFileLoader

    def source_loader_workflow_instance(mname, mpath, conf, session_id, data):
        """Load module ``mname`` from file ``mpath`` and build its Workflow.

        Returns ``Workflow(conf, session_id, data)`` from the loaded module.
        """
        mi = SourceFileLoader(mname, mpath).load_module()
        return mi.Workflow(conf, session_id, data)
except ImportError:
    # Interpreters without importlib.machinery fall back to imp.
    from imp import load_source

    def source_loader_workflow_instance(mname, mpath, conf, session_id, data):
        """Load module ``mname`` from file ``mpath`` and build its Workflow.

        Returns ``Workflow(conf, session_id, data)`` from the loaded module.
        """
        mi = load_source(mname, mpath)
        return mi.Workflow(conf, session_id, data)

import os
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from shutil import rmtree
from uuid import uuid1 as generate_uuid

from fenix import context
from fenix.utils.download import download_url
import fenix.utils.identity_auth

# Upper bound on simultaneously existing workflow sessions.
MAX_SESSIONS = 3

LOG = logging.getLogger(__name__)

CONF = cfg.CONF

opts = [
    cfg.StrOpt('host',
               default="127.0.0.1",
               help="API host IP"),
    cfg.IntOpt('port',
               default=5000,
               help="API port to use."),
    cfg.IntOpt('project_maintenance_reply',
               default=40,
               help="Project maintenance reply confirmation time in seconds"),
    cfg.IntOpt('project_scale_in_reply',
               default=60,
               help="Project scale in reply confirmation time in seconds"),
    cfg.StrOpt('local_cache_dir',
               default="/tmp",
               help="Local cache directory"),
    # These two are numeric values, so they are declared as IntOpt to match
    # their integer defaults.
    cfg.IntOpt('live_migration_retries',
               default=5,
               help="Number of live migration retries"),
    cfg.IntOpt('live_migration_wait_time',
               default=600,
               help="How long to wait live migration to be done"),
]

CONF.register_opts(fenix.utils.identity_auth.os_opts, group='service_user')
CONF.register_opts(opts)


class RPCClient(object):
    """Thin wrapper around an oslo.messaging RPC client for one target."""

    def __init__(self, target):
        super(RPCClient, self).__init__()
        self._client = messaging.RPCClient(
            target=target,
            transport=messaging.get_rpc_transport(cfg.CONF),
        )

    def cast(self, name, **kwargs):
        """Cast method ``name`` with the current context serialized."""
        ctx = context.current()
        self._client.cast(ctx.to_dict(), name, **kwargs)

    def call(self, name, **kwargs):
        """Call method ``name`` with an empty context; return the reply."""
        return self._client.call({}, name, **kwargs)


class EngineEndpoint(object):
    """RPC endpoint that owns the maintenance workflow sessions.

    Sessions are kept in ``self.workflow_sessions`` keyed by session id.
    Methods taking a ``session_id`` return ``None`` when the id is unknown.
    """

    def __init__(self):
        self.workflow_sessions = {}

    def _validate_session(self, session_id):
        """Return True if ``session_id`` names an existing session."""
        return session_id in self.workflow_sessions

    def _load_workflow(self, workflow, workflow_name, session_dir,
                       session_id, data):
        """Instantiate the Workflow plugin for a new session.

        The importable module ``workflow`` is tried first. If importing it
        raises ImportError, ``<session_dir>/workflow/<workflow_name>.py`` is
        loaded from source instead.

        Raises Exception if neither source provides the plugin.
        """
        # Only the import is guarded; errors raised by the plugin
        # constructor itself propagate unchanged.
        try:
            wf_plugin = getattr(import_module(workflow), 'Workflow')
        except ImportError:
            download_plugin_file = os.path.join(
                session_dir, "workflow", "%s.py" % workflow_name)
            if not os.path.isfile(download_plugin_file):
                raise Exception('%s: could not find workflow plugin %s' %
                                (session_id, workflow_name))
            # NOTE(review): this executes a file fetched by download_url;
            # confirm the download sources are trusted.
            return source_loader_workflow_instance(
                workflow, download_plugin_file, CONF, session_id, data)
        return wf_plugin(CONF, session_id, data)

    def admin_get(self, ctx):
        """Get maintenance workflow sessions"""
        LOG.info("EngineEndpoint: admin_get")
        # Return a real list, not a dict view, so the reply is a plain
        # data structure.
        return {"sessions": list(self.workflow_sessions)}

    def admin_create_session(self, ctx, data):
        """Create maintenance workflow session thread

        ``data`` may carry "workflow" (plugin name, "default" when absent)
        and "download" (URLs fetched into the session directory).

        Returns ``{"session_id": <id>}``, or ``None`` when MAX_SESSIONS
        sessions already exist. Raises if the session directory cannot be
        set up or the workflow plugin cannot be loaded; in that case the
        session directory is removed again.
        """
        LOG.info("EngineEndpoint: admin_create_session")
        LOG.info("data: %s", data)
        if len(self.workflow_sessions) >= MAX_SESSIONS:
            LOG.error("Too many sessions: %d", MAX_SESSIONS)
            return None

        session_id = str(generate_uuid())
        workflow_name = data.get("workflow", "default")
        workflow = "fenix.workflow.workflows.%s" % workflow_name
        LOG.info("Workflow plugin module: %s", workflow)

        session_dir = os.path.join(CONF.local_cache_dir, session_id)
        os.mkdir(session_dir)
        try:
            if "download" in data:
                os.mkdir(os.path.join(session_dir, "workflow"))
                os.mkdir(os.path.join(session_dir, "actions"))
                for url in data["download"]:
                    download_url(session_dir, str(url))
            session = self._load_workflow(workflow, workflow_name,
                                          session_dir, session_id, data)
        except Exception:
            # The id was never handed out, so nobody could ever delete
            # this directory; do not leave it behind.
            rmtree(session_dir, ignore_errors=True)
            raise

        self.workflow_sessions[session_id] = session
        session.start()
        return {"session_id": session_id}

    def admin_get_session(self, ctx, session_id):
        """Get maintenance workflow session details"""
        if not self._validate_session(session_id):
            return None
        LOG.info("EngineEndpoint: admin_get_session")
        return ({"session_id": session_id, "state":
                self.workflow_sessions[session_id].session.state})

    def admin_delete_session(self, ctx, session_id):
        """Delete maintenance workflow session thread

        Returns ``{}`` on success, ``None`` for an unknown session.
        """
        LOG.info("EngineEndpoint: admin_delete_session")
        if not self._validate_session(session_id):
            return None
        session = self.workflow_sessions[session_id]
        session.cleanup()
        session.stop()
        self.workflow_sessions.pop(session_id)
        session_dir = os.path.join(CONF.local_cache_dir, session_id)
        rmtree(session_dir)
        return {}

    def admin_update_session(self, ctx, session_id, data):
        """Update maintenance workflow session"""
        LOG.info("EngineEndpoint: admin_update_session")
        # TBD Update data to workflow and return updated data
        return data

    def project_get_session(self, ctx, session_id, project_id):
        """Get maintenance workflow session project specific details"""
        if not self._validate_session(session_id):
            return None
        LOG.info("EngineEndpoint: project_get_session")
        instance_ids = (self.workflow_sessions[session_id].
                        state_instance_ids(project_id))
        return {"instance_ids": instance_ids}

    def project_update_session(self, ctx, session_id, project_id, data):
        """Update maintenance workflow session project state

        Sets the project's state from ``data["state"]`` and, when present,
        stores a copy of ``data["instance_actions"]`` for the project.

        Returns ``data``, or ``None`` for an unknown session.
        """
        LOG.info("EngineEndpoint: project_update_session")
        if not self._validate_session(session_id):
            return None
        session_obj = self.workflow_sessions[session_id]
        project = session_obj.project(project_id)
        project.state = data["state"]
        if "instance_actions" in data:
            session_obj.proj_instance_actions[project_id] = (
                data["instance_actions"].copy())
        return data


class RPCServer(service.Service):
    """oslo.service Service running the engine RPC server."""

    def __init__(self, target):
        super(RPCServer, self).__init__()
        self._server = messaging.get_rpc_server(
            target=target,
            transport=messaging.get_rpc_transport(cfg.CONF),
            endpoints=[
                EngineEndpoint()],
            executor='eventlet',
        )

    def start(self):
        """Start the service and run the RPC server in its thread group."""
        super(RPCServer, self).start()
        self.tg.add_thread(self._server.start)

    def stop(self):
        """Stop the service, then the RPC server."""
        super(RPCServer, self).stop()
        self._server.stop()


def prepare_service(argv=None):
    """Set up logging for the 'fenix' product.

    ``argv`` is accepted for interface compatibility and is not used.
    """
    logging.setup(cfg.CONF, 'fenix')