diff --git a/fenix/api/__init__.py b/fenix/api/__init__.py
new file mode 100644
index 0000000..5996bfb
--- /dev/null
+++ b/fenix/api/__init__.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from flask import Flask
+from oslo_config import cfg
+from oslo_log import log as logging
+import sys
+
+from fenix.api import v1
+
+
+LOG = logging.getLogger(__name__)
+
+api_opts = [
+    cfg.StrOpt('api_config', default="api.conf",
+               help="Configuration file for API service."),
+    cfg.StrOpt('host', default="127.0.0.1", help="API host IP"),
+    cfg.IntOpt('port', default=5000, help="API port to use."),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(api_opts)
+logging.register_options(cfg.CONF)
+cfg.CONF(sys.argv[1:], project='fenix', prog='fenix-api')
+
+
+def create_app(global_config, **local_config):
+    """Return the app built by setup_app(); the arguments are not used."""
+    return setup_app()
+
+
+def setup_app(config=None):
+    """Create the Flask application with the v1 blueprint under '/v1'.
+
+    The config argument is accepted but not used.
+    """
+    flask_app = Flask(__name__, static_folder=None)
+    flask_app.config.update(PROPAGATE_EXCEPTIONS=True)
+    flask_app.register_blueprint(v1.bp, url_prefix='/v1')
+    return flask_app
diff --git a/fenix/api/v1/__init__.py b/fenix/api/v1/__init__.py
new file mode 100644
index 0000000..5879977
--- /dev/null
+++ b/fenix/api/v1/__init__.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from flask import Blueprint
+import flask_restful as restful
+
+from fenix.api.v1.routes import routes
+
+
+bp = Blueprint('v1', __name__)
+api = restful.Api(bp, catch_all_404s=True)
+
+for route in map(dict, routes):  # copy: keep the shared route table intact
+    api.add_resource(route.pop('resource'), *route.pop('urls'), **route)
diff --git a/fenix/api/v1/base.py b/fenix/api/v1/base.py
new file mode 100644
index 0000000..9a6814e
--- /dev/null
+++ b/fenix/api/v1/base.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import decorator
+import flask
+import flask_restful as restful
+import inspect
+import json
+import re
+
+SORT_KEY_SPLITTER = re.compile('[ ,]')
+
+
+class Resource(restful.Resource):
+    method_decorators = []
+
+    def error_response(self, status_code, message):
+        """Build a response whose body is a JSON status/message object."""
+        body = json.dumps({'status': status_code, 'message': message})
+        resp = flask.make_response("{body}\n".format(body=body))
+        resp.status_code = status_code
+        return resp
+
+
+@decorator.decorator
+def http_codes(f, *args, **kwargs):
+    """Call f; on any exception log it and return a generic 500 response.
+
+    The module defining f is expected to expose LOG; when it does not,
+    logging is skipped so that the 500 response is still returned.
+    """
+    try:
+        return f(*args, **kwargs)
+    except Exception as err:
+        # __qualname__ does not exist on Python 2 functions.
+        name = getattr(f, '__qualname__', f.__name__)
+        log = getattr(inspect.getmodule(f), 'LOG', None)
+        if log is not None:
+            log.error('Error during %s: %s' % (name, err))
+        return args[0].error_response(500, 'Unknown Error')
diff --git a/fenix/api/v1/maintenance.py b/fenix/api/v1/maintenance.py
new file mode 100644
index 0000000..afd8a33
--- /dev/null
+++ b/fenix/api/v1/maintenance.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from flask import request
+import json
+from oslo_log import log
+from oslo_serialization import jsonutils
+
+from fenix.api.v1 import base
+from fenix import engine
+from fenix.utils import service
+
+LOG = log.getLogger(__name__)
+
+
+class EngineRPCAPI(service.RPCClient):
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self):
+        """Initiate RPC API client with needed topic and RPC version."""
+        super(EngineRPCAPI, self).__init__(engine.get_target())
+
+    def admin_get(self):
+        """Get maintenance workflow sessions"""
+        return self.call('admin_get')
+
+    def admin_create_session(self, data):
+        """Create maintenance workflow session thread"""
+        return self.call('admin_create_session', data=data)
+
+    def admin_get_session(self, session_id):
+        """Get maintenance workflow session details"""
+        return self.call('admin_get_session', session_id=session_id)
+
+    def admin_delete_session(self, session_id):
+        """Delete maintenance workflow session thread"""
+        return self.call('admin_delete_session', session_id=session_id)
+
+    def admin_update_session(self, session_id):
+        """Update maintenance workflow session"""
+        return self.call('admin_update_session', session_id=session_id)
+
+    def project_get_session(self, session_id, project_id):
+        """Get maintenance workflow session project specific details"""
+        return self.call('project_get_session', session_id=session_id,
+                         project_id=project_id)
+
+    def project_update_session(self, session_id, project_id, data):
+        """Update maintenance workflow session project state"""
+        return self.call('project_update_session', session_id=session_id,
+                         project_id=project_id, data=data)
+
+
+class Maintenance(base.Resource):
+    """Session collection: list the sessions (GET), create one (POST)."""
+    def __init__(self):
+        self.engine_rpcapi = EngineRPCAPI()
+
+    @base.http_codes
+    def get(self):
+        if request.data:
+            LOG.error("Unexpected data")
+            return {}, 400, None
+        LOG.info("admin get")
+        LOG.info("self.engine_rpcapi.admin_get")
+        sessions = self.engine_rpcapi.admin_get()
+        LOG.info("return: %s" % sessions)
+        return jsonutils.to_primitive(sessions), 200, None
+
+    @base.http_codes
+    def post(self):
+        LOG.info("admin post: first")
+        data = json.loads(request.data.decode('utf8'))
+        LOG.info("admin post: %s" % data)
+        session = self.engine_rpcapi.admin_create_session(data)
+        if session is None:
+            return {"error": "Too many sessions"}, 509, None
+        LOG.info("return: %s" % session)
+        return jsonutils.to_primitive(session), 200, None
+
+
+class MaintenanceSession(base.Resource):
+    """One maintenance session, addressed by session_id."""
+    def __init__(self):
+        self.engine_rpcapi = EngineRPCAPI()
+
+    @base.http_codes
+    def get(self, session_id=None):
+        if request.data:
+            LOG.error("Unexpected data")
+            return {}, 400, None
+        LOG.info("admin session_id get")
+        session = self.engine_rpcapi.admin_get_session(session_id)
+        if session is None:
+            return {"error": "Invalid session"}, 404, None
+        return jsonutils.to_primitive(session), 200, None
+
+    @base.http_codes
+    def put(self, session_id=None):
+        data = json.loads(request.data.decode('utf8'))
+        LOG.info("admin session_id put: %s" % data)
+        engine_data = self.engine_rpcapi.admin_update_session(session_id)
+        LOG.info("engine_data: %s" % engine_data)
+        body = {'maintenance': request.base_url, 'session_id': session_id}
+        return jsonutils.to_primitive(body), 200, None
+
+    @base.http_codes
+    def delete(self, session_id=None):
+        if request.data:
+            LOG.error("Unexpected data")
+            return {}, 400, None
+        LOG.info("admin session_id delete")
+        ret = self.engine_rpcapi.admin_delete_session(session_id)
+        LOG.info("return: %s" % ret)
+        return jsonutils.to_primitive(ret), 200, None
+
+
+class MaintenanceSessionProject(base.Resource):
+    """Project specific view of one maintenance session."""
+    def __init__(self):
+        self.engine_rpcapi = EngineRPCAPI()
+
+    @base.http_codes
+    def get(self, session_id=None, project_id=None):
+        if request.data:
+            LOG.error("Unexpected data")
+            return {}, 400, None
+        LOG.info("%s_get" % project_id)
+        engine_data = self.engine_rpcapi.project_get_session(session_id,
+                                                             project_id)
+        LOG.info("engine_data: %s" % engine_data)
+        if engine_data is None:  # the engine does not know this session_id
+            return {"error": "Invalid session"}, 404, None
+        return jsonutils.to_primitive(engine_data), 200, None
+
+    @base.http_codes
+    def put(self, session_id=None, project_id=None):
+        data = json.loads(request.data.decode('utf8'))
+        LOG.info("%s_put: %s" % (project_id, data))
+        engine_data = self.engine_rpcapi.project_update_session(
+            session_id, project_id, data)
+        LOG.info("engine_data: %s" % engine_data)
+        return jsonutils.to_primitive(engine_data), 200, None
diff --git a/fenix/api/v1/routes.py b/fenix/api/v1/routes.py
new file mode 100644
index 0000000..fc22fbf
--- /dev/null
+++ b/fenix/api/v1/routes.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from fenix.api.v1 import maintenance
+
+# URL variables are passed to the resource methods as keyword arguments.
+routes = [
+    dict(resource=maintenance.Maintenance,
+         urls=['/maintenance'], endpoint='maintenance'),
+    dict(resource=maintenance.MaintenanceSession,
+         urls=['/maintenance/<session_id>'],
+         endpoint='maintenance/<session_id>'),
+    dict(resource=maintenance.MaintenanceSessionProject,
+         urls=['/maintenance/<session_id>/<project_id>'],
+         endpoint='maintenance/<session_id>/<project_id>'),
+]
diff --git a/fenix/cmd/api.py b/fenix/cmd/api.py
new file mode 100644
index 0000000..53918bd
--- /dev/null
+++ b/fenix/cmd/api.py
@@ -0,0 +1,37 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from wsgiref import simple_server
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from fenix import api
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+def main():
+    """Serve the Fenix API with wsgiref on CONF.host / CONF.port."""
+    app = api.setup_app()
+    LOG.info("host %s port %s" % (CONF.host, CONF.port))
+    srv = simple_server.make_server(CONF.host, CONF.port, app)
+    LOG.info("fenix-api started")
+    srv.serve_forever()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/fenix/cmd/engine.py b/fenix/cmd/engine.py
new file mode 100644
index 0000000..07ac42f
--- /dev/null
+++ b/fenix/cmd/engine.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+# Monkey patch first, before the remaining modules are imported.
+eventlet.monkey_patch()
+
+import sys
+
+from oslo_config import cfg
+from oslo_service import service
+
+from fenix.engine import service as engine_service
+from fenix.utils import service as service_utils
+
+
+def main():
+    """Parse the configuration, set up logging and run the engine."""
+    cfg.CONF(project='fenix', prog='fenix-engine')
+    service_utils.prepare_service(sys.argv)
+    # Launch the engine service and wait on the returned launcher.
+    launcher = service.launch(cfg.CONF, engine_service.EngineService())
+    launcher.wait()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/fenix/context.py b/fenix/context.py
new file mode 100644
index 0000000..18c5524
--- /dev/null
+++ b/fenix/context.py
@@ -0,0 +1,102 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+
+class BaseContext(object):
+    """Write-once bag of context values limited to the keys in _elements.
+
+    Instances work as context managers: entering pushes the instance on a
+    per-thread stack and current() returns the top of that stack.
+    """
+
+    _elements = set()
+    _context_stack = threading.local()
+
+    def __init__(self, __mapping=None, **kwargs):
+        """Copy a mapping or context, apply kwargs, drop unknown keys."""
+        if __mapping is None:
+            __mapping = {}
+        elif isinstance(__mapping, BaseContext):
+            __mapping = __mapping.__values
+        values = dict(__mapping)
+        values.update(**kwargs)
+        for key in set(values) - self._elements:
+            del values[key]
+        self.__values = values
+
+    def __getattr__(self, name):
+        try:
+            return self.__values[name]
+        except KeyError:
+            if name in self._elements:
+                return None
+            raise AttributeError(name)
+
+    def __setattr__(self, name, value):
+        # NOTE(yorik-sar): only the very first assignment for __values is
+        # allowed. All context arguments should be set at the time the context
+        # object is being created.
+        if self.__dict__:
+            raise Exception(self.__dict__, name, value)
+        super(BaseContext, self).__setattr__(name, value)
+
+    def __enter__(self):
+        local = self._context_stack
+        if not hasattr(local, 'stack'):
+            local.stack = []
+        local.stack.append(self)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        top = self._context_stack.stack.pop()
+        assert top is self, "self should be the top element of the stack"
+
+    @classmethod
+    def current(cls):
+        """Return this thread's innermost context or raise RuntimeError."""
+        try:
+            return cls._context_stack.stack[-1]
+        except (AttributeError, IndexError):
+            raise RuntimeError("Context isn't available here")
+
+    # NOTE(yorik-sar): as long as oslo.rpc requires this
+    def to_dict(self):
+        """Return the underlying values dict (not a copy)."""
+        return self.__values
+
+
+class FenixContext(BaseContext):
+    """Context limited to the user, project, token and role elements."""
+
+    _elements = set([
+        "user_id", "project_id", "auth_token", "service_catalog",
+        "user_name", "project_name", "roles", "is_admin",
+    ])
+
+    @classmethod
+    def elevated(cls):
+        """Copy the current context, if any, and set is_admin=True."""
+        try:
+            ctx = cls.current()
+        except RuntimeError:
+            ctx = None
+        return cls(ctx, is_admin=True)
+
+
+def current():
+    """Return the context that is current for the calling thread."""
+    return FenixContext.current()
diff --git a/fenix/engine/__init__.py b/fenix/engine/__init__.py
new file mode 100644
index 0000000..adb0ff1
--- /dev/null
+++ b/fenix/engine/__init__.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import oslo_messaging as messaging
+
+# Options registered in the 'engine' group.
+opts = [
+    cfg.StrOpt('rpc_topic', default='fenix.engine',
+               help='fenix-engine messages'),
+    cfg.StrOpt('host', default="127.0.0.1",
+               help="Host name of the fenix-engine RPC server"),
+]
+
+CONF = cfg.CONF
+
+CONF.register_opts(opts, 'engine')
+logging.register_options(cfg.CONF)
+RPC_API_VERSION = '1.0'
+
+
+def get_target():
+    """Build the oslo.messaging Target of the engine RPC server."""
+    return messaging.Target(topic=CONF.engine.rpc_topic,
+                            version=RPC_API_VERSION,
+                            server=CONF.engine.host)
diff --git a/fenix/engine/service.py b/fenix/engine/service.py
new file mode 100644
index 0000000..117f0f2
--- /dev/null
+++ b/fenix/engine/service.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from fenix import engine
+from fenix.utils import service as service_utils
+
+opts = [
+    cfg.IntOpt('wait_project_reply', default=120,
+               help='Wait for project reply after message sent to project '),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(opts, 'engine')
+LOG = logging.getLogger(__name__)
+
+
+class EngineService(service_utils.RPCServer):
+    """Service class for the fenix-engine service.
+
+    Responsible for Fenix workflow
+    """
+
+    def __init__(self):
+        """Create the RPC server on the target from engine.get_target()."""
+        super(EngineService, self).__init__(engine.get_target())
+        LOG.info("EngineService init")
+
+    def start(self):
+        """Start the RPC server."""
+        super(EngineService, self).start()
+        LOG.info("EngineService start")
diff --git a/fenix/utils/__init__.py b/fenix/utils/__init__.py
new file mode 100644
index 0000000..34b90cb
--- /dev/null
+++ b/fenix/utils/__init__.py
@@ -0,0 +1,35 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+
+
+class LazyProxy(object):
+    """Proxy that creates klass(*args, **kwargs) on the first method call."""
+
+    def __init__(self, klass, *args, **kwargs):
+        self.klass = klass
+        self.args = args
+        self.kwargs = kwargs
+        self.instance = None
+
+    def __getattr__(self, name):
+        """Return a callable that forwards ``name`` to the real instance."""
+        return functools.partial(self.__run_method, name)
+
+    def __run_method(self, __name, *args, **kwargs):
+        if self.instance is None:
+            self.instance = self.klass(*self.args, **self.kwargs)
+        return getattr(self.instance, __name)(*args, **kwargs)
diff --git a/fenix/utils/identity_auth.py b/fenix/utils/identity_auth.py
new file mode 100644
index 0000000..328be32
--- /dev/null
+++ b/fenix/utils/identity_auth.py
@@ -0,0 +1,51 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from keystoneauth1 import loading
+from keystoneauth1 import session
+
+
+def get_identity_auth(username=None, password=None, project=None):
+    """Build a keystoneauth 'password' auth plugin.
+
+    Explicit arguments take precedence; everything else is read from the
+    OS_* environment variables. OS_AUTH_URL is always required, and
+    OS_USERNAME / OS_PASSWORD are required when not passed in.
+    """
+    env = os.environ
+    project_name = (project or env.get('OS_PROJECT_NAME') or
+                    env.get('OS_TENANT_NAME'))
+    options = dict(
+        auth_url=env['OS_AUTH_URL'],
+        username=username or env['OS_USERNAME'],
+        password=password or env['OS_PASSWORD'],
+        user_domain_name=env.get('OS_USER_DOMAIN_NAME') or 'Default',
+        user_domain_id=env.get('OS_USER_DOMAIN_ID') or 'default',
+        project_name=project_name,
+        tenant_name=project_name,
+        project_domain_name=env.get('OS_PROJECT_DOMAIN_NAME') or 'Default',
+        project_domain_id=env.get('OS_PROJECT_DOMAIN_ID') or 'default',
+    )
+    return loading.get_plugin_loader('password').load_from_options(**options)
+
+
+def get_session(auth=None):
+    """Return a keystoneauth Session for auth (default: from environment)."""
+    if auth is None:
+        auth = get_identity_auth()
+    # OS_CACERT (None when unset) is handed to the session as ``verify``.
+    return session.Session(auth=auth, verify=os.environ.get('OS_CACERT'))
diff --git a/fenix/utils/service.py b/fenix/utils/service.py
new file mode 100644
index 0000000..5e46665
--- /dev/null
+++ b/fenix/utils/service.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from importlib import import_module
+import os
+from oslo_config import cfg
+from oslo_log import log as logging
+import oslo_messaging as messaging
+from oslo_service import service
+from uuid import uuid1 as generate_uuid
+
+from fenix import context
+
+# Maximum number of workflow sessions the engine keeps at the same time.
+MAX_SESSIONS = 3
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+
+opts = [
+    cfg.StrOpt('host', default="127.0.0.1", help="API host IP"),
+    cfg.IntOpt('port', default=5000, help="API port to use."),
+    cfg.StrOpt('workflow_user',
+               default=os.environ.get('OS_USERNAME', 'admin'),
+               help="Workflow user name"),
+    cfg.StrOpt('workflow_password',
+               default=os.environ.get('OS_PASSWORD', 'admin'),
+               help="Workflow user password"),
+    cfg.StrOpt('workflow_project',
+               default=os.environ.get('OS_PROJECT_NAME', 'admin'),
+               help="Workflow project name"),
+    cfg.IntOpt('project_maintenance_reply',
+               default=20,
+               help="Project maintenance reply confirmation time in seconds"),
+    cfg.IntOpt('project_scale_in_reply',
+               default=60,
+               help="Project scale in reply confirmation time in seconds"),
+]
+
+CONF.register_opts(opts)
+
+
+class RPCClient(object):
+    """Client side of the engine RPC API on top of oslo.messaging."""
+
+    def __init__(self, target):
+        super(RPCClient, self).__init__()
+        transport = messaging.get_rpc_transport(cfg.CONF)
+        self._client = messaging.RPCClient(target=target, transport=transport)
+
+    def cast(self, name, **kwargs):
+        """Cast ``name`` with the current context; no result is returned."""
+        ctx = context.current()
+        self._client.cast(ctx.to_dict(), name, **kwargs)
+
+    def call(self, name, **kwargs):
+        """Invoke ``name`` with an empty context and return its result."""
+        return self._client.call({}, name, **kwargs)
+
+
+class EngineEndpoint(object):
+    """RPC endpoint holding the workflow sessions, keyed by session id."""
+
+    def __init__(self):
+        self.workflow_sessions = {}
+
+    def _validate_session(self, session_id):
+        """Return True when session_id names an existing session."""
+        return session_id in self.workflow_sessions
+
+    def admin_get(self, ctx):
+        """Get maintenance workflow sessions"""
+        LOG.info("EngineEndpoint: admin_get")
+        # A list, not a dict view: the result has to be serialized for RPC.
+        return {'sessions': list(self.workflow_sessions)}
+
+    def admin_create_session(self, ctx, data):
+        """Create maintenance workflow session thread"""
+        LOG.info("EngineEndpoint: admin_create_session")
+        LOG.info("data: %s" % data)
+        if len(self.workflow_sessions) >= MAX_SESSIONS:
+            LOG.error("Too many sessions: %d" % MAX_SESSIONS)
+            return None
+        session_id = str(generate_uuid())
+        if "workflow" not in data:
+            workflow = "fenix.workflow.workflows.default"
+        else:
+            workflow = "fenix.workflow.workflows.%s" % data["workflow"]
+        LOG.info("Workflow plugin module: %s" % workflow)
+        # NOTE(review): the module name is taken from the request data.
+        wf_plugin = getattr(import_module(workflow), 'Workflow')
+        self.workflow_sessions[session_id] = wf_plugin(CONF, session_id, data)
+        self.workflow_sessions[session_id].start()
+        return {'session_id': session_id}
+
+    def admin_get_session(self, ctx, session_id):
+        """Get maintenance workflow session details"""
+        if not self._validate_session(session_id):
+            return None
+        LOG.info("EngineEndpoint: admin_get_session")
+        return ({'session_id': session_id, 'state':
+                self.workflow_sessions[session_id].state})
+
+    def admin_delete_session(self, ctx, session_id):
+        """Delete maintenance workflow session thread"""
+        LOG.info("EngineEndpoint: admin_delete_session")
+        self.workflow_sessions[session_id].cleanup()
+        self.workflow_sessions[session_id].stop()
+        self.workflow_sessions.pop(session_id)
+        return {}
+
+    def admin_update_session(self, ctx, session_id):
+        """Update maintenance workflow session"""
+        LOG.info("EngineEndpoint: admin_update_session")
+        return {'session_id': session_id}
+
+    def project_get_session(self, ctx, session_id, project_id):
+        """Get maintenance workflow session project specific details"""
+        if not self._validate_session(session_id):
+            return None
+        LOG.info("EngineEndpoint: project_get_session")
+        instance_ids = (self.workflow_sessions[session_id].session_data.
+                        instance_ids_by_project(project_id))
+        return {'instance_ids': instance_ids}
+
+    def project_update_session(self, ctx, session_id, project_id, data):
+        """Update maintenance workflow session project state"""
+        LOG.info("EngineEndpoint: project_update_session")
+        session_data = self.workflow_sessions[session_id].session_data
+        project = session_data.project(project_id)
+        project.state = data["state"]
+        if 'instance_actions' in data:
+            session_data.proj_instance_actions[project_id] = (
+                data['instance_actions'].copy())
+        return data
+
+
+class RPCServer(service.Service):
+    """oslo.service Service running an RPC server with an EngineEndpoint."""
+
+    def __init__(self, target):
+        super(RPCServer, self).__init__()
+        self._server = messaging.get_rpc_server(
+            target=target,
+            transport=messaging.get_rpc_transport(cfg.CONF),
+            endpoints=[EngineEndpoint()],
+            executor='eventlet')
+
+    def start(self):
+        """Start the service and run the RPC server in its thread group."""
+        super(RPCServer, self).start()
+        self.tg.add_thread(self._server.start)
+
+    def stop(self):
+        """Stop the service, then the RPC server."""
+        super(RPCServer, self).stop()
+        self._server.stop()
+
+
+def prepare_service(argv=None):
+    """Set up oslo.log for the 'fenix' project; argv is not used."""
+    logging.setup(cfg.CONF, 'fenix')
diff --git a/fenix/workflow/__init__.py b/fenix/workflow/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fenix/workflow/workflow.py b/fenix/workflow/workflow.py
new file mode 100644
index 0000000..1a8726b
--- /dev/null
+++ b/fenix/workflow/workflow.py
@@ -0,0 +1,191 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import log as logging
+from oslo_service import threadgroup
+from threading import Thread
+import time
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Instance(object):
+    """An instance of a project, identified by id and name, on a host."""
+
+    def __init__(self, project, instance_id, instance_name, host, ha=False):
+        self.project = project
+        self.instance_id = instance_id
+        self.instance_name = instance_name
+        self.host = host
+        self.ha = ha
+
+    def __str__(self):
+        return "%s: %s" % (self.instance_id, self.instance_name)
+
+    def is_on_host(self, host):
+        """Return True when the instance is located on the given host."""
+        return self.host == host
+
+
+class Project(object):
+    """A project name together with its current state."""
+
+    def __init__(self, name):
+        self.name = name
+        self.state = None
+
+
+class SessionData(object):
+    """Hosts, projects and instances tracked for one maintenance session."""
+
+    def __init__(self, data):
+        self.projects = []
+        self.hosts = data['hosts']
+        self.maintenance_at = str(data['maintenance_at'])
+        self.metadata = data['metadata']
+        self.instances = []
+        self.maintained_hosts = []
+        self.proj_instance_actions = {}
+
+    def get_empty_hosts(self):
+        """Return the session hosts that have no instance on them."""
+        occupied = set(instance.host for instance in self.instances)
+        return [host for host in self.hosts if host not in occupied]
+
+    def add_instance(self, project, instance_id, instance_name, host,
+                     ha=False):
+        """Add an instance and, if new, its project; bad hosts are logged."""
+        if host not in self.hosts:
+            LOG.error('instance %s in invalid host %s' % (instance_id, host))
+        if project not in self.project_names():
+            self.projects.append(Project(project))
+        self.instances.append(Instance(project, instance_id, instance_name,
+                                       host, ha))
+
+    def project(self, name):
+        """Return the Project called name; IndexError if there is none."""
+        return ([project for project in self.projects if
+                 project.name == name][0])
+
+    def project_names(self):
+        return [project.name for project in self.projects]
+
+    def set_projets_state(self, state):
+        for project in self.projects:
+            project.state = state
+
+    def instances_by_project(self, project):
+        return [instance for instance in self.instances if
+                instance.project == project]
+
+    def instance_ids_by_project(self, project):
+        return [instance.instance_id for instance in self.instances if
+                instance.project == project]
+
+    def instances_by_host_and_project(self, host, project):
+        return [instance for instance in self.instances
+                if instance.host == host and instance.project == project]
+
+    def instance_action_by_project_reply(self, project, instance_id):
+        return self.proj_instance_actions[project][instance_id]
+
+    def __str__(self):
+        info = 'Instance info:\n'
+        for host in self.hosts:
+            info += ('%s:\n' % host)
+            for project in self.project_names():
+                instances = self.instances_by_host_and_project(host, project)
+                if instances:
+                    info += (' %s:\n' % project)
+                    for instance in instances:
+                        info += (' %s\n' % instance)
+        return info
+
+
+class BaseWorkflow(Thread):
+    """Thread driving a maintenance session through its state methods."""
+
+    def __init__(self, conf, session_id, data):
+        Thread.__init__(self)
+        self.conf = conf
+        self.session_id = session_id
+        self.stopped = False
+        self.thg = threadgroup.ThreadGroup()
+        self.timer = {}
+        self.state = 'MAINTENANCE'
+        self.session_data = SessionData(data)
+        self.states_methods = {'MAINTENANCE': 'maintenance',
+                               'SCALE_IN': 'scale_in',
+                               'PREPARE_MAINTENANCE': 'prepare_maintenance',
+                               'START_MAINTENANCE': 'start_maintenance',
+                               'PLANNED_MAINTENANCE': 'planned_maintenance',
+                               'MAINTENANCE_COMPLETE': 'maintenance_complete',
+                               'MAINTENANCE_DONE': 'maintenance_done',
+                               'FAILED': 'maintenance_failed'}
+
+    def _timer_expired(self, name):
+        """Timer callback: stop the named timer and forget it."""
+        LOG.info("%s: timer expired %s" % (self.session_id, name))
+        if name in self.timer.keys():
+            self.timer[name].stop()
+            self.thg.timer_done(self.timer[name])
+            self.timer.pop(name)
+
+    def is_timer_expired(self, name):
+        """Return True when no timer called name is registered any more."""
+        return name not in self.timer
+
+    def stop_timer(self, name):
+        """Stop and forget the named timer if it is still registered."""
+        LOG.info("%s: stop_timer %s" % (self.session_id, name))
+        if name in self.timer.keys():
+            self.timer[name].stop()
+            self.thg.timer_done(self.timer[name])
+            self.timer.pop(name)
+
+    def start_timer(self, delay, name):
+        """Start the named timer unless that name is already registered."""
+        LOG.info("%s: start_timer %s" % (self.session_id, name))
+        if name in self.timer.keys():
+            LOG.error("%s: timer exist!" % self.session_id)
+        else:
+            self.timer[name] = self.thg.add_timer(
+                delay, self._timer_expired, delay, name)
+
+    def cleanup(self):
+        LOG.info("%s: cleanup" % self.session_id)
+
+    def stop(self):
+        LOG.info("%s: stop" % self.session_id)
+        self.stopped = True
+
+    def maintenance(self):
+        LOG.error("%s: maintenance method not implemented!" % self.session_id)
+
+    def maintenance_failed(self):
+        LOG.error("%s: maintenance_failed method not implemented!" %
+                  self.session_id)
+
+    def run(self):
+        """Loop over the state methods until the workflow is stopped."""
+        LOG.info("%s: started" % self.session_id)
+        while not self.stopped:
+            if self.state != "MAINTENANCE_DONE" and self.state != "FAILED":
+                statefunc = getattr(self, self.states_methods[self.state])
+                statefunc()
+            else:
+                time.sleep(1)  # IDLE while session removed
+        LOG.info("%s: done" % self.session_id)
diff --git a/fenix/workflow/workflows/__init__.py b/fenix/workflow/workflows/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fenix/workflow/workflows/default.py b/fenix/workflow/workflows/default.py
new file mode 100644
index 0000000..50615d6
--- /dev/null
+++ b/fenix/workflow/workflows/default.py
@@ -0,0 +1,678 @@
+# Copyright (c) 2018 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
# You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
from novaclient.exceptions import BadRequest
from oslo_log import log as logging
import oslo_messaging as messaging
import time

from fenix.utils.identity_auth import get_identity_auth
from fenix.utils.identity_auth import get_session

import aodhclient.client as aodhclient
import novaclient.client as novaclient


from fenix.workflow.workflow import BaseWorkflow

LOG = logging.getLogger(__name__)


class Workflow(BaseWorkflow):
    """Default workflow: empty hosts one by one and maintain them.

    Each state method below is called by ``BaseWorkflow.run`` and sets
    ``self.state`` to the next state, or to 'FAILED'.
    """

    def __init__(self, conf, session_id, data):
        super(Workflow, self).__init__(conf, session_id, data)
        self.url = "http://%s:%s" % (conf.host, conf.port)
        self.auth = get_identity_auth(conf.workflow_user,
                                      conf.workflow_password,
                                      conf.workflow_project)
        self.session = get_session(auth=self.auth)
        self.aodh = aodhclient.Client('2', self.session)
        self.nova = novaclient.Client(version='2.34', session=self.session)
        transport = messaging.get_transport(self.conf)
        self.notif_proj = messaging.Notifier(transport,
                                             'maintenance.planned',
                                             driver='messaging',
                                             topics=['notifications'])
        self.notif_proj = self.notif_proj.prepare(publisher_id='fenix')
        self.notif_admin = messaging.Notifier(transport,
                                              'maintenance.host',
                                              driver='messaging',
                                              topics=['notifications'])
        self.notif_admin = self.notif_admin.prepare(publisher_id='fenix')
        LOG.info("%s: initialized" % self.session_id)

    def cleanup(self):
        LOG.info("%s: cleanup" % self.session_id)

    def stop(self):
        LOG.info("%s: stop" % self.session_id)
        self.stopped = True

    def is_ha_instance(self, instance):
        """Return True when any interface of the server has a floating IP.

        A server without addresses is simply not HA.
        """
        for network_interfaces in instance.addresses.values():
            for network_interface in network_interfaces:
                _type = network_interface.get('OS-EXT-IPS:type')
                if _type == "floating":
                    LOG.info('Instance with floating ip: %s %s' %
                             (instance.id, instance.name))
                    return True
        return False

    def _load_server_info(self, reset=False):
        """List all servers from Nova and add them to the session data.

        :param reset: forget previously known instances first. The reset
            happens only after the Nova listing succeeded.
        :raises Exception: when a server lacks an expected attribute.
        """
        opts = {'all_tenants': True}
        servers = self.nova.servers.list(detailed=True, search_opts=opts)
        if reset:
            # TBD actually update, not regenerate
            self.session_data.instances = []
        for server in servers:
            try:
                host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                project = str(server.tenant_id)
                instance_name = str(server.name)
                instance_id = str(server.id)
                ha = self.is_ha_instance(server)
            except Exception as e:
                raise Exception('can not get params from server=%s: %s' %
                                (server, e))
            self.session_data.add_instance(project,
                                           instance_id,
                                           instance_name,
                                           host,
                                           ha)
        LOG.info(str(self.session_data))

    def initialize_server_info(self):
        """Fill the session data with the servers Nova currently reports."""
        self._load_server_info()

    def update_server_info(self):
        """Regenerate the instance list from Nova."""
        self._load_server_info(reset=True)

    def projects_listen_alarm(self, match_event):
        """Return True when every project has an alarm for ``match_event``.

        Alarms without an ``event_rule`` are ignored.
        """
        match_projects = set()
        for alarm in self.aodh.alarm.list():
            event_rule = alarm.get('event_rule') or {}
            if str(event_rule.get('event_type')) == match_event:
                match_projects.add(str(alarm['project_id']))
        all_projects_match = True
        for project in self.session_data.project_names():
            if project not in match_projects:
                LOG.error('%s: project %s not '
                          'listening to %s' %
                          (self.session_id, project, match_event))
                all_projects_match = False
        return all_projects_match

    def str_to_datetime(self, dt_str):
        """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime."""
        return datetime.datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S')

    def reply_time_str(self, wait):
        """Return the UTC time ``wait`` seconds from now as a string.

        This is the deadline given to projects, so it lies in the future.
        """
        now = datetime.datetime.utcnow()
        reply = now + datetime.timedelta(seconds=wait)
        return (reply.strftime('%Y-%m-%d %H:%M:%S'))

    def is_time_after_time(self, before, after):
        """Return True when ``before`` is later than ``after``.

        Both arguments may be datetimes or 'YYYY-MM-DD HH:MM:SS' strings.
        """
        if isinstance(before, str):
            before = self.str_to_datetime(before)
        if isinstance(after, str):
            after = self.str_to_datetime(after)
        return before > after

    def _project_notify(self, project_id, instance_ids, allowed_actions,
                        actions_at, reply_at, state, metadata):
        """Send a 'maintenance.scheduled' notification to one project."""
        reply_url = '%s/v1/maintenance/%s/%s' % (self.url,
                                                 self.session_id,
                                                 project_id)

        payload = dict(project_id=project_id,
                       instance_ids=instance_ids,
                       allowed_actions=allowed_actions,
                       state=state,
                       actions_at=actions_at,
                       reply_at=reply_at,
                       session_id=self.session_id,
                       metadata=metadata,
                       reply_url=reply_url)

        LOG.info('Sending "maintenance.scheduled" to project: %s' % payload)

        self.notif_proj.info({'some': 'context'}, 'maintenance.scheduled',
                             payload)

    def _admin_notify(self, project, host, state, session_id):
        """Send a 'maintenance.host' notification about ``host``."""
        payload = dict(project_id=project, host=host, state=state,
                       session_id=session_id)

        LOG.info('Sending "maintenance.host": %s' % payload)

        self.notif_admin.info({'some': 'context'}, 'maintenance.host', payload)

    def _instance_ids_url(self, project):
        """Return the URL sent to the project as ``instance_ids``."""
        return '%s/v1/maintenance/%s/%s' % (self.url,
                                            self.session_id,
                                            project)

    def projects_answer(self, state, projects=None):
        """Summarize the replies of the projects to ``state``.

        :param projects: names to check; None means every project. An
            empty list means no project, which counts as acknowledged.
        :returns: 'ACK_<state>' when all checked projects acknowledged,
            otherwise the state of the first project that did not.
        """
        state_ack = 'ACK_%s' % state
        state_nack = 'NACK_%s' % state
        if projects is not None:
            state_projects = ([p for p in self.session_data.projects if
                               p.name in projects])
        else:
            state_projects = self.session_data.projects
        # With nobody to ask the answer is trivially an acknowledgement.
        pstate = state_ack
        for project in state_projects:
            pstate = project.state
            if pstate == state:
                # Still waiting for this project.
                break
            elif pstate == state_ack:
                continue
            elif pstate == state_nack:
                LOG.error('%s: %s from %s' %
                          (self.session_id, pstate, project.name))
                break
            else:
                LOG.error('%s: Project %s in invalid state %s' %
                          (self.session_id, project.name, pstate))
                break
        return pstate

    def wait_projects_state(self, state, timer_name, projects=None):
        """Poll once a second until projects answer or the timer expires.

        :returns: True only when all checked projects acknowledged.
        """
        state_ack = 'ACK_%s' % state
        while not self.is_timer_expired(timer_name):
            answer = self.projects_answer(state, projects)
            if answer != state:
                self.stop_timer(timer_name)
                if answer == state_ack:
                    LOG.info('all projects in: %s' % state_ack)
                    return True
                # NACK or an unexpected state
                return False
            time.sleep(1)
        LOG.error('%s: timer %s expired waiting answer to state %s' %
                  (self.session_id, timer_name, state))
        LOG.error('%s: project states: %s' %
                  (self.session_id,
                   [(p.name, p.state) for p in self.session_data.projects]))
        return False

    def confirm_maintenance(self):
        """Announce MAINTENANCE to all projects and wait for their reply.

        :raises Exception: when the reply deadline is after the planned
            maintenance time.
        """
        allowed_actions = []
        actions_at = self.session_data.maintenance_at
        state = 'MAINTENANCE'
        metadata = self.session_data.metadata
        self.session_data.set_projets_state(state)
        for project in self.session_data.project_names():
            LOG.info('\nMAINTENANCE to project %s\n' % project)
            instance_ids = self._instance_ids_url(project)
            reply_at = self.reply_time_str(self.conf.project_maintenance_reply)
            if self.is_time_after_time(reply_at, actions_at):
                raise Exception('%s: No time for project to'
                                ' answer in state: %s' %
                                (self.session_id, state))
            self._project_notify(project, instance_ids, allowed_actions,
                                 actions_at, reply_at, state, metadata)
        self.start_timer(self.conf.project_maintenance_reply,
                         'MAINTENANCE_TIMEOUT')
        return self.wait_projects_state(state, 'MAINTENANCE_TIMEOUT')

    def confirm_scale_in(self):
        """Ask all projects to scale in and wait for their reply."""
        allowed_actions = []
        actions_at = self.reply_time_str(self.conf.project_scale_in_reply)
        reply_at = actions_at
        state = 'SCALE_IN'
        metadata = self.session_data.metadata
        self.session_data.set_projets_state(state)
        for project in self.session_data.project_names():
            LOG.info('\nSCALE_IN to project %s\n' % project)
            instance_ids = self._instance_ids_url(project)
            self._project_notify(project, instance_ids, allowed_actions,
                                 actions_at, reply_at, state, metadata)
        # The timer must match the deadline advertised in reply_at.
        self.start_timer(self.conf.project_scale_in_reply,
                         'SCALE_IN_TIMEOUT')
        return self.wait_projects_state(state, 'SCALE_IN_TIMEOUT')

    def need_scale_in(self):
        """Return True when free VCPUs do not add up to one whole host.

        :raises Exception: when hypervisors report different VCPU counts.
        """
        hvisors = self.nova.hypervisors.list(detailed=True)
        prev_vcpus = 0
        free_vcpus = 0
        prev_hostname = ''
        LOG.info('checking hypervisors for VCPU capacity')
        for hvisor in hvisors:
            vcpus = getattr(hvisor, 'vcpus')
            vcpus_used = getattr(hvisor, 'vcpus_used')
            hostname = getattr(hvisor, 'hypervisor_hostname')
            if prev_vcpus != 0 and prev_vcpus != vcpus:
                raise Exception('%s: %d vcpus on %s does not match to '
                                '%d on %s'
                                % (self.session_id, vcpus, hostname,
                                   prev_vcpus, prev_hostname))
            free_vcpus += vcpus - vcpus_used
            prev_vcpus = vcpus
            prev_hostname = hostname
        # TBD vcpu capacity might be too scattered so moving instances from
        # one host to other host still might not succeed.
        # prev_vcpus is the per-host VCPU count (0 with no hypervisors).
        return free_vcpus < prev_vcpus

    def get_free_vcpus_by_host(self, host, hvisors):
        """Return the unused VCPUs of ``host`` from the ``hvisors`` list.

        :raises Exception: when ``host`` is not among ``hvisors``.
        """
        for hvisor in hvisors:
            if getattr(hvisor, 'hypervisor_hostname') == host:
                return (getattr(hvisor, 'vcpus') -
                        getattr(hvisor, 'vcpus_used'))
        raise Exception('%s: hypervisor not found for host %s' %
                        (self.session_id, host))

    def find_host_to_be_empty(self):
        """Pick the host that should be emptied next.

        Preferably the host with most free vcpus, no ha instances and least
        instances altogether. Without any host free of HA instances the
        last host of the session is chosen; None when there are no hosts.
        """
        host = None
        host_to_be_empty = None
        host_nonha_instances = 0
        host_free_vcpus = 0
        hvisors = self.nova.hypervisors.list(detailed=True)
        for host in self.session_data.hosts:
            free_vcpus = self.get_free_vcpus_by_host(host, hvisors)
            ha_instances = 0
            nonha_instances = 0
            for instance in self.session_data.instances:
                if instance.host != host:
                    continue
                if instance.ha:
                    ha_instances += 1
                else:
                    nonha_instances += 1
            LOG.info('host %s has %d ha and %d non ha instances %s free '
                     'vcpus' % (host, ha_instances, nonha_instances,
                                free_vcpus))
            if ha_instances:
                # We do not want to choose host with HA instance
                continue
            is_better = (
                # This is first host candidate
                not host_to_be_empty or
                # Choose as most vcpus free
                free_vcpus > host_free_vcpus or
                # Choose as most vcpus free and least instances
                (free_vcpus == host_free_vcpus and
                 nonha_instances < host_nonha_instances))
            if is_better:
                host_to_be_empty = host
                host_nonha_instances = nonha_instances
                # Remember the candidate's free vcpus for later comparisons.
                host_free_vcpus = free_vcpus
        if not host_to_be_empty:
            # No best candidate found, let's choose last host in loop
            host_to_be_empty = host
        LOG.info('host %s selected to be empty' % host_to_be_empty)
        # TBD It might yet not be possible to move instances away from this
        # host if other hosts have vcpu capacity scattered. It should be
        # checked if instances on this host fit to other hosts
        return host_to_be_empty

    def confirm_host_to_be_emptied(self, host, statebase):
        """Ask projects with instances on ``host`` how to move them.

        Only the notified projects are waited for. A host without
        instances is confirmed at once.
        """
        state = statebase
        allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION']
        actions_at = self.reply_time_str(self.conf.project_maintenance_reply)
        reply_at = actions_at
        metadata = self.session_data.metadata
        self.session_data.set_projets_state(statebase)
        projects = []
        for project in self.session_data.project_names():
            instances = (
                self.session_data.instances_by_host_and_project(host, project))
            if not instances:
                continue
            projects.append(project)
            LOG.info('%s to project %s' % (state, project))
            info = "Instances\n"
            for instance in instances:
                info += ('%s\n' % instance)
            LOG.info(info)
            instance_ids = self._instance_ids_url(project)
            self._project_notify(project, instance_ids, allowed_actions,
                                 actions_at, reply_at, state, metadata)
        self.start_timer(self.conf.project_maintenance_reply,
                         '%s_TIMEOUT' % statebase)
        return self.wait_projects_state(state, '%s_TIMEOUT' % statebase,
                                        projects)

    def confirm_maintenance_complete(self):
        """Tell all projects maintenance is complete and wait for reply."""
        state = 'MAINTENANCE_COMPLETE'
        metadata = self.session_data.metadata
        actions_at = self.reply_time_str(self.conf.project_scale_in_reply)
        reply_at = actions_at
        allowed_actions = []
        self.session_data.set_projets_state(state)
        for project in self.session_data.project_names():
            LOG.info('%s to project %s' % (state, project))
            instance_ids = self._instance_ids_url(project)
            self._project_notify(project, instance_ids, allowed_actions,
                                 actions_at, reply_at, state, metadata)
        self.start_timer(self.conf.project_scale_in_reply,
                         '%s_TIMEOUT' % state)
        return self.wait_projects_state(state, '%s_TIMEOUT' % state)

    def notify_action_done(self, project, instance_id):
        """Notify ``project`` that the action on the instance is done."""
        instance_ids = instance_id
        allowed_actions = []
        actions_at = None
        reply_at = None
        state = "INSTANCE_ACTION_DONE"
        metadata = None
        self._project_notify(project, instance_ids, allowed_actions,
                             actions_at, reply_at, state, metadata)

    def actions_to_have_empty_host(self, host):
        """Run the project-chosen action for every instance on ``host``.

        :returns: False when a migration fails or the host does not get
            empty in time, True otherwise.
        :raises Exception: for an action other than MIGRATE or OWN_ACTION.
        """
        # TBD these might be done parallel
        # Iterate over a snapshot of the project names.
        for project in list(self.session_data.proj_instance_actions):
            instances = (
                self.session_data.instances_by_host_and_project(host, project))
            for instance in instances:
                action = (self.session_data.instance_action_by_project_reply(
                          project, instance.instance_id))
                LOG.info('Action %s instance %s ' % (action,
                                                     instance.instance_id))
                if action == 'MIGRATE':
                    if not self.migrate_server(instance.instance_id):
                        return False
                    self.notify_action_done(project, instance.instance_id)
                elif action == 'OWN_ACTION':
                    pass
                else:
                    # TBD LIVE_MIGRATE not supported
                    raise Exception('%s: instance %s action '
                                    '%s not supported' %
                                    (self.session_id, instance.instance_id,
                                     action))
        return self._wait_host_empty(host)

    def _wait_host_empty(self, host):
        """Wait up to 4 minutes for ``host`` to report no used VCPUs."""
        hid = self.nova.hypervisors.search(host)[0].id
        vcpus_used_last = 0
        # wait 4min to get host empty
        for j in range(48):
            hvisor = self.nova.hypervisors.get(hid)
            vcpus_used = getattr(hvisor, 'vcpus_used')
            if vcpus_used > 0:
                # Log only on the first round and when the count changes.
                if vcpus_used != vcpus_used_last or vcpus_used_last == 0:
                    LOG.info('%s still has %d vcpus reserved. wait...'
                             % (host, vcpus_used))
                vcpus_used_last = vcpus_used
                time.sleep(5)
            else:
                LOG.info('%s empty' % host)
                return True
        LOG.info('%s host still not empty' % host)
        return False

    def migrate_server(self, server_id):
        """Cold migrate a server and confirm the resize.

        :returns: True when the migration was confirmed; False on error,
            on an unexpected exception, or on timeout.
        :raises Exception: when Nova keeps rejecting the request
            (BadRequest) after all retries.
        """
        # TBD this method should be enhanced for errors and to have failed
        # instance back to state active instead of error
        server = self.nova.servers.get(server_id)
        vm_state = server.__dict__.get('OS-EXT-STS:vm_state')
        LOG.info('server %s state %s' % (server_id, vm_state))
        last_vm_state = vm_state
        retry_migrate = 5
        while True:
            try:
                server.migrate()
                time.sleep(5)
                retries = 36
                while vm_state != 'resized' and retries > 0:
                    # try to confirm within 3min
                    server = self.nova.servers.get(server_id)
                    vm_state = server.__dict__.get('OS-EXT-STS:vm_state')
                    if vm_state == 'resized':
                        server.confirm_resize()
                        LOG.info('instance %s migration confirmed' %
                                 server_id)
                        return True
                    if last_vm_state != vm_state:
                        LOG.info('instance %s state: %s' % (server_id,
                                 vm_state))
                    if vm_state == 'error':
                        LOG.error('instance %s migration failed, state: %s'
                                  % (server_id, vm_state))
                        return False
                    time.sleep(5)
                    retries = retries - 1
                    last_vm_state = vm_state
                # Timeout waiting state to change
                break

            except BadRequest:
                if retry_migrate == 0:
                    raise Exception('server %s migrate failed' % server_id)
                # Might take time for scheduler to sync inconsistent instance
                # list for host
                retry_time = 180 - (retry_migrate * 30)
                LOG.info('server %s migrate failed, retry in %s sec'
                         % (server_id, retry_time))
                time.sleep(retry_time)
            except Exception as e:
                LOG.error('server %s migration failed, Exception=%s' %
                          (server_id, e))
                return False
            finally:
                # Only the BadRequest path loops again; the decrement makes
                # each retry wait longer and eventually give up.
                retry_migrate = retry_migrate - 1
        LOG.error('instance %s migration timeout, state: %s' %
                  (server_id, vm_state))
        return False

    def host_maintenance(self, host):
        """Placeholder for the actual maintenance of ``host``."""
        LOG.info('maintaining host %s' % host)
        # TBD Here we should call maintenance plugin given in maintenance
        # session creation
        time.sleep(5)

    def _state_by_capacity(self):
        """Return the next state based on empty hosts and free capacity."""
        maintenance_empty_hosts = self.session_data.get_empty_hosts()
        if maintenance_empty_hosts:
            LOG.info('%s: Empty host found: %s' %
                     (self.session_id, maintenance_empty_hosts))
            return 'START_MAINTENANCE'
        if self.need_scale_in():
            LOG.info('%s: Need to scale in to get capacity for '
                     'empty host' % (self.session_id))
            return 'SCALE_IN'
        LOG.info('%s: Free capacity, but need empty host' %
                 (self.session_id))
        return 'PREPARE_MAINTENANCE'

    def _maintain_host(self, host):
        """Maintain one empty host and notify admin before and after."""
        # TBD we wait host VCPUs to report right, but this is not
        # correct place. We should handle this after scale in
        # also this could be made parallel if more than one empty host
        self._wait_host_empty(host)

        LOG.info('IN_MAINTENANCE host %s' % host)
        self._admin_notify(self.conf.workflow_project, host,
                           'IN_MAINTENANCE',
                           self.session_id)
        self.host_maintenance(host)
        self._admin_notify(self.conf.workflow_project, host,
                           'MAINTENANCE_COMPLETE',
                           self.session_id)
        LOG.info('MAINTENANCE_COMPLETE host %s' % host)
        self.session_data.maintained_hosts.append(host)

    def maintenance(self):
        """Initial state: confirm with projects, then wait for start time."""
        LOG.info("%s: maintenance called" % self.session_id)
        self.initialize_server_info()

        if not self.projects_listen_alarm('maintenance.scheduled'):
            self.state = 'FAILED'
            return

        if not self.confirm_maintenance():
            self.state = 'FAILED'
            return

        self.state = self._state_by_capacity()

        maint_at = self.str_to_datetime(
            self.session_data.maintenance_at)
        now = datetime.datetime.utcnow()
        if maint_at > now:
            LOG.info('Time now: %s maintenance starts: %s....' %
                     (now.strftime('%Y-%m-%d %H:%M:%S'),
                      self.session_data.maintenance_at))
            td = maint_at - now
            self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
            while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
                if self.stopped:
                    # Session stopped while waiting; do not block the thread
                    self.stop_timer('MAINTENANCE_START_TIMEOUT')
                    return
                time.sleep(1)

        time_now = (datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
        LOG.info('Time to start maintenance: %s' % time_now)

    def scale_in(self):
        """Ask projects to scale in and decide what to do next."""
        LOG.info("%s: scale in" % self.session_id)

        if not self.confirm_scale_in():
            self.state = 'FAILED'
            return
        # TBD it takes time to have proper information updated about free
        # capacity. Should make sure instances removed has also VCPUs removed
        # Refresh first so that the decision sees the removed instances.
        self.update_server_info()
        self.state = self._state_by_capacity()

    def prepare_maintenance(self):
        """Empty one host so that maintenance can start."""
        LOG.info("%s: prepare_maintenance called" % self.session_id)
        host = self.find_host_to_be_empty()
        if not self.confirm_host_to_be_emptied(host, 'PREPARE_MAINTENANCE'):
            self.state = 'FAILED'
            return
        if not self.actions_to_have_empty_host(host):
            # TBD we found the hard way that we couldn't make host empty and
            # need to scale in more. Things might fail after this if any
            # instance is in error or Nova scheduler cached data corrupted
            # for what instance is on which host
            LOG.info('%s: Failed to empty %s. Need to scale in more to get '
                     'capacity for empty host' % (self.session_id, host))
            self.state = 'SCALE_IN'
        else:
            self.state = 'START_MAINTENANCE'
        self.update_server_info()

    def start_maintenance(self):
        """Maintain every empty host that has not been maintained yet."""
        LOG.info("%s: start_maintenance called" % self.session_id)
        empty_hosts = self.session_data.get_empty_hosts()
        if not empty_hosts:
            LOG.info("%s: No empty host to be maintained" % self.session_id)
            self.state = 'FAILED'
            return
        maintained_hosts = self.session_data.maintained_hosts
        # On the first round nothing is maintained and this is all empty
        # hosts; later it is the hosts emptied in PLANNED_MAINTENANCE.
        hosts = [h for h in empty_hosts if h not in maintained_hosts]
        for host in hosts:
            self._maintain_host(host)
        if [h for h in self.session_data.hosts if h not in maintained_hosts]:
            # Not all host maintained
            self.state = 'PLANNED_MAINTENANCE'
        else:
            self.state = 'MAINTENANCE_COMPLETE'

    def planned_maintenance(self):
        """Empty the next host that still waits for maintenance."""
        LOG.info("%s: planned_maintenance called" % self.session_id)
        maintained_hosts = self.session_data.maintained_hosts
        not_maintained_hosts = ([h for h in self.session_data.hosts if h not in
                                 maintained_hosts])
        LOG.info("%s: Not maintained hosts: %s" % (self.session_id,
                                                   not_maintained_hosts))
        if not not_maintained_hosts:
            # Nothing left to maintain
            self.state = 'MAINTENANCE_COMPLETE'
            return
        host = not_maintained_hosts[0]
        if not self.confirm_host_to_be_emptied(host, 'PLANNED_MAINTENANCE'):
            self.state = 'FAILED'
            return
        if not self.actions_to_have_empty_host(host):
            # Failure in here might indicate action to move instance failed.
            # This might be as Nova VCPU capacity was not yet emptied from
            # expected target hosts
            self.state = 'FAILED'
            return
        self.update_server_info()
        self.state = 'START_MAINTENANCE'

    def maintenance_complete(self):
        """Tell projects that maintenance is done and finish the session."""
        LOG.info("%s: maintenance_complete called" % self.session_id)
        LOG.info('Projects may still need to up scale back to full '
                 'capcity')
        self.confirm_maintenance_complete()
        self.update_server_info()
        self.state = 'MAINTENANCE_DONE'

    def maintenance_done(self):
        pass

    def maintenance_failed(self):
        LOG.info("%s: maintenance_failed called" % self.session_id)