diff --git a/fenix/api/v1/app.py b/fenix/api/v1/app.py index 7ede1c1..aa6eb81 100644 --- a/fenix/api/v1/app.py +++ b/fenix/api/v1/app.py @@ -34,7 +34,7 @@ api_opts = [ help="API host IP"), cfg.IntOpt('port', default=5000, - help="API port to use."), + help="API port to use.") ] CONF = cfg.CONF @@ -49,9 +49,8 @@ def setup_app(pecan_config=None, debug=False, argv=None): hooks.DBHook(), hooks.ContextHook(), hooks.RPCHook()] - app = pecan.make_app(pecan_config.app.root, - debug=CONF.debug, + debug=True, hooks=app_hooks, wrap_app=middleware.ParsableErrorMiddleware, guess_content_type_from_ext=False) diff --git a/fenix/api/v1/controllers/__init__.py b/fenix/api/v1/controllers/__init__.py index bc9bbc4..0c24c86 100644 --- a/fenix/api/v1/controllers/__init__.py +++ b/fenix/api/v1/controllers/__init__.py @@ -36,6 +36,12 @@ class V1Controller(rest.RestController): session = controller.SessionController() # maps to v1/maintenance/{session_id}/{project_id} project = controller.ProjectController() + # maps to v1/maintenance/{session_id}/{project_id}/{instance_id} + project_instance = controller.ProjectInstanceController() + # maps to v1/instance/{instance_id} + instance = controller.InstanceController() + # maps to v1/instance_group/{group_id} + instance_group = controller.InstanceGroupController() @pecan.expose() def _route(self, args): @@ -44,7 +50,6 @@ class V1Controller(rest.RestController): It allows to map controller URL with correct controller instance. By default, it maps with the same name. """ - try: route = self._routes.get(args[0], args[0]) depth = len(args) @@ -53,10 +58,17 @@ class V1Controller(rest.RestController): args[0] = 'http404-nonexistingcontroller' elif depth == 1: args[0] = route - elif depth == 2 and route == "maintenance": - args[0] = "session" + elif depth == 2: + if route == "maintenance": + args[0] = "session" + elif route in ["instance", "instance_group"]: + args[0] = route + else: + args[0] = 'http404-nonexistingcontroller' elif depth == 3 and route == "maintenance": args[0] = "project" + elif depth == 4 and route == "maintenance": + args[0] = "project_instance" else: args[0] = 'http404-nonexistingcontroller' except IndexError: diff --git a/fenix/api/v1/controllers/maintenance.py b/fenix/api/v1/controllers/maintenance.py index 1f9161f..65b27a4 100644 --- a/fenix/api/v1/controllers/maintenance.py +++ b/fenix/api/v1/controllers/maintenance.py @@ -45,7 +45,10 @@ class ProjectController(rest.RestController): abort(400) engine_data = self.engine_rpcapi.project_get_session(session_id, project_id) - response.text = jsonutils.dumps(engine_data) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) # PUT /v1/maintenance// @policy.authorize('maintenance:session:project', 'put') @@ -55,7 +58,33 @@ class ProjectController(rest.RestController): engine_data = self.engine_rpcapi.project_update_session(session_id, project_id, data) - response.text = jsonutils.dumps(engine_data) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) + + +class ProjectInstanceController(rest.RestController): + + name = 'project_instance' + + def __init__(self): + self.engine_rpcapi = maintenance.EngineRPCAPI() + + # PUT /v1/maintenance/// + @policy.authorize('maintenance:session:project:instance', 'put') + @expose(content_type='application/json') + def put(self, session_id, project_id, instance_id): + data = json.loads(request.body.decode('utf8')) + engine_data = ( + 
self.engine_rpcapi.project_update_session_instance(session_id, + project_id, + instance_id, + data)) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) class SessionController(rest.RestController): @@ -76,7 +105,10 @@ class SessionController(rest.RestController): if session is None: response.status = 404 return {"error": "Invalid session"} - response.text = jsonutils.dumps(session) + try: + response.text = jsonutils.dumps(session) + except TypeError: + response.body = jsonutils.dumps(session) # PUT /v1/maintenance/ @policy.authorize('maintenance:session', 'put') @@ -84,7 +116,10 @@ class SessionController(rest.RestController): def put(self, session_id): data = json.loads(request.body.decode('utf8')) engine_data = self.engine_rpcapi.admin_update_session(session_id, data) - response.text = jsonutils.dumps(engine_data) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) # DELETE /v1/maintenance/ @policy.authorize('maintenance:session', 'delete') @@ -94,7 +129,10 @@ class SessionController(rest.RestController): LOG.error("Unexpected data") abort(400) engine_data = self.engine_rpcapi.admin_delete_session(session_id) - response.text = jsonutils.dumps(engine_data) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) class MaintenanceController(rest.RestController): @@ -112,16 +150,120 @@ class MaintenanceController(rest.RestController): LOG.error("Unexpected data") abort(400) sessions = self.engine_rpcapi.admin_get() - response.text = jsonutils.dumps(sessions) + try: + response.text = jsonutils.dumps(sessions) + except TypeError: + response.body = jsonutils.dumps(sessions) # POST /v1/maintenance @policy.authorize('maintenance', 'post') @expose(content_type='application/json') def post(self): - LOG.debug("POST /v1/maintenance") data = json.loads(request.body.decode('utf8')) session = self.engine_rpcapi.admin_create_session(data) if session is None: response.status = 509 return {"error": "Too many sessions"} - response.text = jsonutils.dumps(session) + try: + response.text = jsonutils.dumps(session) + except TypeError: + response.body = jsonutils.dumps(session) + + +class InstanceController(rest.RestController): + + name = 'instance' + + def __init__(self): + self.engine_rpcapi = maintenance.EngineRPCAPI() + + # GET /v1/instance/ + @policy.authorize('instance', 'get') + @expose(content_type='application/json') + def get(self, instance_id): + if request.body: + LOG.error("Unexpected data") + abort(400) + session = self.engine_rpcapi.get_instance(instance_id) + if session is None: + response.status = 404 + return {"error": "Invalid session"} + try: + response.text = jsonutils.dumps(session) + except TypeError: + response.body = jsonutils.dumps(session) + + # PUT /v1/instance/ + @policy.authorize('instance', 'put') + @expose(content_type='application/json') + def put(self, instance_id): + data = json.loads(request.body.decode('utf8')) + engine_data = self.engine_rpcapi.update_instance(instance_id, + data) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) + + # DELETE /v1/instance/ + @policy.authorize('instance', 'delete') + @expose(content_type='application/json') + def delete(self, instance_id): + if request.body: + LOG.error("Unexpected data") + abort(400) + engine_data = self.engine_rpcapi.delete_instance(instance_id) + try: 
+ response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) + + +class InstanceGroupController(rest.RestController): + + name = 'instance_group' + + def __init__(self): + self.engine_rpcapi = maintenance.EngineRPCAPI() + + # GET /v1/instance_group/ + @policy.authorize('instance_group', 'get') + @expose(content_type='application/json') + def get(self, group_id): + if request.body: + LOG.error("Unexpected data") + abort(400) + session = self.engine_rpcapi.get_instance_group(group_id) + if session is None: + response.status = 404 + return {"error": "Invalid session"} + try: + response.text = jsonutils.dumps(session) + except TypeError: + response.body = jsonutils.dumps(session) + + # PUT /v1/instance_group/ + @policy.authorize('instance_group', 'put') + @expose(content_type='application/json') + def put(self, group_id): + data = json.loads(request.body.decode('utf8')) + engine_data = ( + self.engine_rpcapi.update_instance_group(group_id, data)) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) + + # DELETE /v1/instance_group/ + @policy.authorize('instance_group', 'delete') + @expose(content_type='application/json') + def delete(self, group_id): + if request.body: + LOG.error("Unexpected data") + abort(400) + engine_data = ( + self.engine_rpcapi.delete_instance_group(group_id)) + try: + response.text = jsonutils.dumps(engine_data) + except TypeError: + response.body = jsonutils.dumps(engine_data) diff --git a/fenix/api/v1/maintenance.py b/fenix/api/v1/maintenance.py index f309a35..956de21 100644 --- a/fenix/api/v1/maintenance.py +++ b/fenix/api/v1/maintenance.py @@ -58,3 +58,40 @@ class EngineRPCAPI(service.RPCClient): """Update maintenance workflow session project state""" return self.call('project_update_session', session_id=session_id, project_id=project_id, data=data) + + def project_update_session_instance(self, session_id, project_id, + instance_id, data): + """Update maintenance workflow session project instance project_state + + """ + return self.call('project_update_session_instance', + session_id=session_id, + project_id=project_id, + instance_id=instance_id, + data=data) + + def get_instance(self, instance_id): + """Get internal instance""" + return self.call('get_instance', instance_id=instance_id) + + def update_instance(self, instance_id, data): + """Update internal instance""" + return self.call('update_instance', instance_id=instance_id, + data=data) + + def delete_instance(self, instance_id): + """Delete internal instance""" + return self.call('delete_instance', instance_id=instance_id) + + def get_instance_group(self, group_id): + """Get internal instance group""" + return self.call('get_instance_group', group_id=group_id) + + def update_instance_group(self, group_id, data): + """Update internal instance group""" + return self.call('update_instance_group', group_id=group_id, + data=data) + + def delete_instance_group(self, group_id): + """Delete internal instance group""" + return self.call('delete_instance_group', group_id=group_id) diff --git a/fenix/db/api.py b/fenix/db/api.py index 3797a05..79250b0 100644 --- a/fenix/db/api.py +++ b/fenix/db/api.py @@ -197,3 +197,38 @@ def create_instances(instances): def remove_instance(session_id, instance_id): return IMPL.remove_instance(session_id, instance_id) + + +def update_project_instance(values): + return IMPL.update_project_instance(values) + + +def remove_project_instance(instance_id): + return 
IMPL.remove_project_instance(instance_id) + + +def project_instance_get(instance_id): + return IMPL.project_instance_get(instance_id) + + +def group_instances_get(group_id): + return IMPL.group_instances_get(group_id) + + +def instance_group_get(group_id): + return IMPL.instance_group_get(group_id) + + +def instance_group_get_detailed(group_id): + instances = group_instances_get(group_id) + group = instance_group_get(group_id) + group['instance_ids'] = instances + return group + + +def update_instance_group(values): + return IMPL.update_instance_group(values) + + +def remove_instance_group(instance_id): + return IMPL.remove_instance_group(instance_id) diff --git a/fenix/db/migration/alembic_migrations/versions/001_initial.py b/fenix/db/migration/alembic_migrations/versions/001_initial.py index 681a0f9..b15bd3f 100644 --- a/fenix/db/migration/alembic_migrations/versions/001_initial.py +++ b/fenix/db/migration/alembic_migrations/versions/001_initial.py @@ -141,6 +141,37 @@ def upgrade(): name='_session_local_file_uc'), sa.PrimaryKeyConstraint('id')) + op.create_table( + 'project_instances', + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('instance_id', sa.String(36), primary_key=True, + default=_generate_unicode_uuid), + sa.Column('project_id', sa.String(36), nullable=True), + sa.Column('group_id', sa.String(36), nullable=False), + sa.Column('instance_name', sa.String(length=255), + nullable=False), + sa.Column('max_interruption_time', sa.Integer, nullable=False), + sa.Column('migration_type', sa.String(36), nullable=False), + sa.Column('resource_mitigation', sa.Boolean, default=False), + sa.Column('lead_time', sa.Integer, nullable=False), + sa.PrimaryKeyConstraint('instance_id')) + + op.create_table( + 'instance_groups', + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('group_id', sa.String(36), primary_key=True), + sa.Column('project_id', sa.String(36), nullable=True), + sa.Column('group_name', sa.String(length=255), nullable=False), + sa.Column('anti_affinity_group', sa.Boolean, default=False), + sa.Column('max_instances_per_host', sa.Integer, + nullable=False), + sa.Column('max_impacted_members', sa.Integer, nullable=False), + sa.Column('recovery_time', sa.Integer, nullable=False), + sa.Column('resource_mitigation', sa.Boolean, default=False), + sa.PrimaryKeyConstraint('group_id')) + def downgrade(): op.drop_table('sessions') @@ -149,3 +180,5 @@ def downgrade(): op.drop_table('instances') op.drop_table('action_plugins') op.drop_table('downloads') + op.drop_table('project_instances') + op.drop_table('instance_groups') diff --git a/fenix/db/sqlalchemy/api.py b/fenix/db/sqlalchemy/api.py index 84df43d..20c5590 100644 --- a/fenix/db/sqlalchemy/api.py +++ b/fenix/db/sqlalchemy/api.py @@ -60,6 +60,9 @@ def setup_db(): models.MaintenanceHost.metadata.create_all(engine) models.MaintenanceProject.metadata.create_all(engine) models.MaintenanceInstance.metadata.create_all(engine) + models.ProjectInstance.metadata.create_all(engine) + models.MaintenanceProject.metadata.create_all(engine) + models.InstanceGroup.metadata.create_all(engine) except sa.exc.OperationalError as e: LOG.error("Database registration exception: %s", e) return False @@ -499,3 +502,90 @@ def remove_instance(session_id, instance_id): model='instances') session.delete(minstance) + + +# Project instances +def _project_instance_get(session, instance_id): + query = 
model_query(models.ProjectInstance, session) + return query.filter_by(instance_id=instance_id).first() + + +def project_instance_get(instance_id): + return _project_instance_get(get_session(), instance_id) + + +def update_project_instance(values): + values = values.copy() + minstance = models.ProjectInstance() + minstance.update(values) + + session = get_session() + with session.begin(): + try: + minstance.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=minstance.__class__.__name__, columns=e.columns) + + return project_instance_get(minstance.instance_id) + + +def remove_project_instance(instance_id): + session = get_session() + with session.begin(): + minstance = _project_instance_get(session, instance_id) + if not minstance: + # raise not found error + raise db_exc.FenixDBNotFound(session, instance_id=instance_id, + model='project_instances') + + session.delete(minstance) + + +# Instances groups +def _instance_group_get(session, group_id): + query = model_query(models.InstanceGroup, session) + return query.filter_by(group_id=group_id).first() + + +def instance_group_get(group_id): + return _instance_group_get(get_session(), group_id) + + +def _group_instances_get(session, group_id): + query = model_query(models.ProjectInstance, session) + return query.filter_by(group_id=group_id).all() + + +def group_instances_get(group_id): + return _group_instances_get(get_session(), group_id) + + +def update_instance_group(values): + values = values.copy() + minstance = models.InstanceGroup() + minstance.update(values) + + session = get_session() + with session.begin(): + try: + minstance.save(session=session) + except common_db_exc.DBDuplicateEntry as e: + # raise exception about duplicated columns (e.columns) + raise db_exc.FenixDBDuplicateEntry( + model=minstance.__class__.__name__, columns=e.columns) + + return instance_group_get(minstance.group_id) + + +def remove_instance_group(group_id): + session = get_session() + with session.begin(): + minstance = _instance_group_get(session, group_id) + if not minstance: + # raise not found error + raise db_exc.FenixDBNotFound(session, group_id=group_id, + model='instance_groups') + + session.delete(minstance) diff --git a/fenix/db/sqlalchemy/models.py b/fenix/db/sqlalchemy/models.py index 62ecc33..837746e 100644 --- a/fenix/db/sqlalchemy/models.py +++ b/fenix/db/sqlalchemy/models.py @@ -154,3 +154,39 @@ class MaintenanceDownload(mb.FenixBase): def to_dict(self): return super(MaintenanceDownload, self).to_dict() + + +class ProjectInstance(mb.FenixBase): + """Project instances""" + + __tablename__ = 'project_instances' + + instance_id = sa.Column(sa.String(36), primary_key=True) + project_id = sa.Column(sa.String(36), nullable=True) + group_id = sa.Column(sa.String(36), nullable=False) + instance_name = sa.Column(sa.String(length=255), nullable=False) + max_interruption_time = sa.Column(sa.Integer, nullable=False) + migration_type = sa.Column(sa.String(36), nullable=False) + resource_mitigation = sa.Column(sa.Boolean, default=False) + lead_time = sa.Column(sa.Integer, nullable=False) + + def to_dict(self): + return super(ProjectInstance, self).to_dict() + + +class InstanceGroup(mb.FenixBase): + """Instances groups""" + + __tablename__ = 'instance_groups' + + group_id = sa.Column(sa.String(36), primary_key=True) + project_id = sa.Column(sa.String(36), nullable=True) + group_name = sa.Column(sa.String(length=255), nullable=False) + 
anti_affinity_group = sa.Column(sa.Boolean, default=False) + max_instances_per_host = sa.Column(sa.Integer, nullable=True) + max_impacted_members = sa.Column(sa.Integer, nullable=False) + recovery_time = sa.Column(sa.Integer, nullable=False) + resource_mitigation = sa.Column(sa.Boolean, default=False) + + def to_dict(self): + return super(InstanceGroup, self).to_dict() diff --git a/fenix/policies/__init__.py b/fenix/policies/__init__.py index f046bb0..759a09c 100644 --- a/fenix/policies/__init__.py +++ b/fenix/policies/__init__.py @@ -13,15 +13,21 @@ import itertools from fenix.policies import base +from fenix.policies import instance +from fenix.policies import instance_group from fenix.policies import maintenance from fenix.policies import project +from fenix.policies import project_instance from fenix.policies import session def list_rules(): return itertools.chain( base.list_rules(), + instance.list_rules(), + instance_group.list_rules(), maintenance.list_rules(), session.list_rules(), project.list_rules(), + project_instance.list_rules() ) diff --git a/fenix/policies/instance.py b/fenix/policies/instance.py new file mode 100644 index 0000000..98bd05a --- /dev/null +++ b/fenix/policies/instance.py @@ -0,0 +1,57 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from fenix.policies import base + +POLICY_ROOT = 'fenix:instance:%s' + +instance_policies = [ + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'get', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for showing instance API.', + operations=[ + { + 'path': '/{api_version}/instance/{instance_id}/', + 'method': 'GET' + } + ] + ), + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'put', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for updating instance API.', + operations=[ + { + 'path': '/{api_version}/instance/{instance_id}/', + 'method': 'PUT' + } + ] + ), + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'delete', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for deleting instance API', + operations=[ + { + 'path': '/{api_version}/instance/{instance_id}', + 'method': 'DELETE' + } + ] + ) +] + + +def list_rules(): + return instance_policies diff --git a/fenix/policies/instance_group.py b/fenix/policies/instance_group.py new file mode 100644 index 0000000..a9adffd --- /dev/null +++ b/fenix/policies/instance_group.py @@ -0,0 +1,57 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
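Reviewer note (not part of the patch): the new project_instances and instance_groups tables and models above define the per-instance and per-group constraints a tenant can register through the new PUT /v1/instance/{instance_id} and PUT /v1/instance_group/{group_id} endpoints. A rough usage sketch follows; the endpoint, token and all field values are made up for illustration, only the field names come from the models added in this change.

    import requests

    # Hypothetical deployment values.
    base = "http://fenix-api:5000/v1"
    headers = {"X-Auth-Token": "<token>", "Content-Type": "application/json"}

    instance_constraints = {
        "instance_id": "<instance-uuid>",
        "project_id": "<project-uuid>",
        "group_id": "<group-uuid>",
        "instance_name": "vnf-worker-1",
        "max_interruption_time": 120,   # seconds of interruption the VNF tolerates
        "migration_type": "LIVE_MIGRATE",
        "resource_mitigation": True,
        "lead_time": 60,                # seconds of warning needed before a move
    }
    group_constraints = {
        "group_id": "<group-uuid>",
        "project_id": "<project-uuid>",
        "group_name": "vnf-workers",
        "anti_affinity_group": False,
        "max_instances_per_host": 2,
        "max_impacted_members": 1,      # members that may be moved at the same time
        "recovery_time": 10,            # seconds a member needs to recover after a move
        "resource_mitigation": True,
    }

    requests.put(base + "/instance/" + instance_constraints["instance_id"],
                 headers=headers, json=instance_constraints)
    requests.put(base + "/instance_group/" + group_constraints["group_id"],
                 headers=headers, json=group_constraints)

Note that update_instance()/update_instance_group() hand the request body straight to db_api, so the id fields are repeated inside the payload.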
+ +from oslo_policy import policy + +from fenix.policies import base + +POLICY_ROOT = 'fenix:instance_group:%s' + +instance_group_policies = [ + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'get', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for showing instance_group API.', + operations=[ + { + 'path': '/{api_version}/instance_group/{group_id}/', + 'method': 'GET' + } + ] + ), + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'put', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for updating instance_group API.', + operations=[ + { + 'path': '/{api_version}/instance_group/{group_id}/', + 'method': 'PUT' + } + ] + ), + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'delete', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for deleting instance_group API', + operations=[ + { + 'path': '/{api_version}/instance_group/{group_id}', + 'method': 'DELETE' + } + ] + ) +] + + +def list_rules(): + return instance_group_policies diff --git a/fenix/policies/project_instance.py b/fenix/policies/project_instance.py new file mode 100644 index 0000000..bdd93d1 --- /dev/null +++ b/fenix/policies/project_instance.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from fenix.policies import base + +POLICY_ROOT = 'fenix:maintenance:session:project:instance:%s' + +project_policies = [ + policy.DocumentedRuleDefault( + name=POLICY_ROOT % 'put', + check_str=base.RULE_ADMIN_OR_OWNER, + description='Policy rule for setting project session affected ' + 'instance action and state API.', + operations=[ + { + 'path': '/{api_version}/maintenance/{seission_id}/' + '{project_id}/{instance_id}', + 'method': 'PUT' + } + ] + ) +] + + +def list_rules(): + return project_policies diff --git a/fenix/utils/service.py b/fenix/utils/service.py index 2352e43..d761b09 100644 --- a/fenix/utils/service.py +++ b/fenix/utils/service.py @@ -36,6 +36,7 @@ from shutil import rmtree from uuid import uuid1 as generate_uuid from fenix import context +from fenix.db import api as db_api from fenix.utils.download import download_url import fenix.utils.identity_auth @@ -191,6 +192,47 @@ class EngineEndpoint(object): data["instance_actions"].copy()) return data + def project_update_session_instance(self, ctx, session_id, project_id, + instance_id, data): + """Update maintenance workflow session project instance state""" + LOG.info("EngineEndpoint: project_update_session_instance") + session_obj = self.workflow_sessions[session_id] + instance = session_obj.instance_by_id(instance_id) + instance.project_state = data["state"] + if "instance_action" in data: + instance.action = data["instance_action"] + return data + + def get_instance(self, ctx, instance_id): + LOG.info("EngineEndpoint: get_instance") + instance = db_api.project_instance_get(instance_id) + return instance + + def update_instance(self, ctx, instance_id, data): + LOG.info("EngineEndpoint: update_instance") + instance = db_api.update_project_instance(data) + return instance 
+ + def delete_instance(self, ctx, instance_id): + LOG.info("EngineEndpoint: delete_instance") + db_api.remove_project_instance(instance_id) + return {} + + def get_instance_group(self, ctx, group_id): + LOG.info("EngineEndpoint: get_instance_group") + instance_group = db_api.instance_group_get_detailed(group_id) + return instance_group + + def update_instance_group(self, ctx, group_id, data): + LOG.info("EngineEndpoint: update_instance_group") + instance_group = db_api.update_instance_group(data) + return instance_group + + def delete_instance_group(self, ctx, group_id): + LOG.info("EngineEndpoint: delete_instance_group") + db_api.remove_instance_group(group_id) + return {} + class RPCServer(service.Service): diff --git a/fenix/utils/thread.py b/fenix/utils/thread.py new file mode 100644 index 0000000..a035b14 --- /dev/null +++ b/fenix/utils/thread.py @@ -0,0 +1,27 @@ +# Copyright (c) 2019 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def run_async(func): + from functools import wraps + from threading import Thread + + @wraps(func) + def async_func(*args, **kwargs): + thread = Thread(target=func, args=args, kwargs=kwargs) + thread.start() + return thread + + return async_func diff --git a/fenix/workflow/workflow.py b/fenix/workflow/workflow.py index 7f85f20..54c0a7e 100644 --- a/fenix/workflow/workflow.py +++ b/fenix/workflow/workflow.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. 
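Reviewer note: a minimal usage sketch for the run_async helper added above in fenix/utils/thread.py -- the decorated function is executed in its own Thread and the call returns that Thread so the caller can join() it. The vnf.py workflow below relies on exactly this pattern for instance_action() and actions_to_have_empty_host(). The function name here is hypothetical.

    from fenix.utils.thread import run_async

    @run_async
    def move_instance(instance_id):
        # long-running work runs in a separate thread
        ...

    thr = move_instance("<instance-uuid>")  # returns immediately with the Thread
    thr.join()                              # block until the move has finished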
import aodhclient.client as aodhclient +from aodhclient.exceptions import BadRequest from ast import literal_eval import collections from oslo_log import log as logging @@ -293,6 +294,10 @@ class BaseWorkflow(Thread): if instance.host == host and instance.project_id == project] + def instances_by_host(self, host): + return [instance for instance in self.instances if + instance.host == host] + def instance_action_by_project_reply(self, project, instance_id): return self.proj_instance_actions[project][instance_id] @@ -416,9 +421,13 @@ class BaseWorkflow(Thread): LOG.info("%s: done" % self.session_id) def projects_listen_alarm(self, match_event): - match_projects = ([str(alarm['project_id']) for alarm in - self.aodh.alarm.list() if - str(alarm['event_rule']['event_type']) == + try: + alarms = self.aodh.alarm.list({'all_projects': True}) + except BadRequest: + # Train or earlier + alarms = self.aodh.alarm.list() + match_projects = ([str(alarm['project_id']) for alarm in alarms + if str(alarm['event_rule']['event_type']) == match_event]) all_projects_match = True for project in self.project_names(): @@ -435,6 +444,10 @@ class BaseWorkflow(Thread): self.session_id, project_id) + if "/maintenance/" not in instance_ids: + # Single instance + reply_url += "/%s" % instance_ids[0] + payload = dict(project_id=project_id, instance_ids=instance_ids, allowed_actions=allowed_actions, @@ -513,3 +526,28 @@ class BaseWorkflow(Thread): pnames = self._project_names_in_state(projects, state) LOG.error('%s: projects not answered: %s' % (self.session_id, pnames)) return False + + def wait_instance_reply_state(self, state, instance, timer_name): + state_ack = 'ACK_%s' % state + state_nack = 'NACK_%s' % state + LOG.info('wait_instance_reply_state: %s' % instance.instance_id) + while not self.is_timer_expired(timer_name): + answer = instance.project_state + if answer == state: + pass + else: + self.stop_timer(timer_name) + if answer == state_ack: + LOG.info('%s in: %s' % (instance.instance_id, answer)) + return True + elif answer == state_nack: + LOG.info('%s in: %s' % (instance.instance_id, answer)) + return False + else: + LOG.error('%s in invalid state: %s' % + (instance.instance_id, answer)) + return False + time.sleep(1) + LOG.error('%s: timer %s expired' % + (self.session_id, timer_name)) + return False diff --git a/fenix/workflow/workflows/vnf.py b/fenix/workflow/workflows/vnf.py new file mode 100644 index 0000000..471aee7 --- /dev/null +++ b/fenix/workflow/workflows/vnf.py @@ -0,0 +1,1087 @@ +# Copyright (c) 2019 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
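Reviewer note: with the per-instance reply_url and wait_instance_reply_state() added above, the project-side manager is expected to confirm or refuse each proposed instance action by PUTting its answer to the new per-instance maintenance URL (handled by ProjectInstanceController earlier in this patch). A hedged sketch with placeholder endpoint, ids and token -- in practice the URL comes from the notification payload:

    import requests

    url = ("http://fenix-api:5000/v1/maintenance/"
           "<session-id>/<project-id>/<instance-id>")
    headers = {"X-Auth-Token": "<token>", "Content-Type": "application/json"}
    # ACK_<state> accepts the action for this instance, NACK_<state> refuses it;
    # PREPARE_MAINTENANCE is one of the states used by the vnf workflow below.
    body = {"instance_action": "LIVE_MIGRATE",
            "state": "ACK_PREPARE_MAINTENANCE"}
    requests.put(url, headers=headers, json=body)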
+import datetime +from importlib import import_module +try: + from importlib.machinery import SourceFileLoader + + def mod_loader_action_instance(mname, mpath, session_instance, + ap_db_instance): + mi = SourceFileLoader(mname, mpath).load_module() + return mi.ActionPlugin(session_instance, ap_db_instance) +except ImportError: + from imp import load_source + + def mod_loader_action_instance(mname, mpath, session_instance, + ap_db_instance): + mi = load_source(mname, mpath) + return mi.ActionPlugin(session_instance, ap_db_instance) + +from novaclient import API_MAX_VERSION as nova_max_version +import novaclient.client as novaclient +from novaclient.exceptions import BadRequest + +import os +from oslo_log import log as logging +import time + +from fenix.db import api as db_api +from fenix.db import exceptions as db_exc +from fenix.utils.thread import run_async +from fenix.utils.time import datetime_to_str +from fenix.utils.time import is_time_after_time +from fenix.utils.time import reply_time_str +from fenix.utils.time import time_now_str + + +from fenix.workflow.workflow import BaseWorkflow + +LOG = logging.getLogger(__name__) + + +class Workflow(BaseWorkflow): + + def __init__(self, conf, session_id, data): + super(Workflow, self).__init__(conf, session_id, data) + nova_version = 2.53 + self.nova = novaclient.Client(nova_version, session=self.auth_session) + max_nova_server_ver = float(self.nova.versions.get_current().version) + max_nova_client_ver = float(nova_max_version.get_string()) + if max_nova_server_ver > 2.53 and max_nova_client_ver > 2.53: + if max_nova_client_ver <= max_nova_server_ver: + nova_version = max_nova_client_ver + else: + nova_version = max_nova_server_ver + self.nova = novaclient.Client(nova_version, + session=self.auth_session) + if not self.hosts: + self.hosts = self._init_hosts_by_services() + else: + self._init_update_hosts() + LOG.info("%s: initialized. 
Nova version %f" % (self.session_id, + nova_version)) + + LOG.info('%s: Execute pre action plugins' % (self.session_id)) + self.maintenance_by_plugin_type("localhost", "pre") + # How many members of each instance group are currently affected + self.group_impacted_members = {} + + def _init_hosts_by_services(self): + LOG.info("%s: Dicovering hosts by Nova services" % self.session_id) + hosts = [] + hostnames = [] + controllers = self.nova.services.list(binary='nova-conductor') + for controller in controllers: + host = {} + service_host = str(controller.__dict__.get(u'host')) + if service_host in hostnames: + continue + host['hostname'] = service_host + hostnames.append(service_host) + host['type'] = 'controller' + if str(controller.__dict__.get(u'status')) == 'disabled': + LOG.error("%s: %s nova-conductor disabled before maintenance" + % (self.session_id, service_host)) + raise Exception("%s: %s already disabled" + % (self.session_id, service_host)) + host['disabled'] = False + host['details'] = str(controller.__dict__.get(u'id')) + host['maintained'] = False + hosts.append(host) + + computes = self.nova.services.list(binary='nova-compute') + for compute in computes: + host = {} + service_host = str(compute.__dict__.get(u'host')) + host['hostname'] = service_host + host['type'] = 'compute' + if str(compute.__dict__.get(u'status')) == 'disabled': + LOG.error("%s: %s nova-compute disabled before maintenance" + % (self.session_id, service_host)) + raise Exception("%s: %s already disabled" + % (self.session_id, service_host)) + host['disabled'] = False + host['details'] = str(compute.__dict__.get(u'id')) + host['maintained'] = False + hosts.append(host) + + return db_api.create_hosts_by_details(self.session_id, hosts) + + def _init_update_hosts(self): + LOG.info("%s: Update given hosts" % self.session_id) + controllers = self.nova.services.list(binary='nova-conductor') + computes = self.nova.services.list(binary='nova-compute') + + for host in self.hosts: + hostname = host.hostname + host.disabled = False + host.maintained = False + match = [compute for compute in computes if + hostname == compute.host] + if match: + host.type = 'compute' + if match[0].status == 'disabled': + LOG.error("%s: %s nova-compute disabled before maintenance" + % (self.session_id, hostname)) + raise Exception("%s: %s already disabled" + % (self.session_id, hostname)) + host.details = match[0].id + continue + if ([controller for controller in controllers if + hostname == controller.host]): + host.type = 'controller' + continue + host.type = 'other' + + def disable_host_nova_compute(self, hostname): + LOG.info('%s: disable nova-compute on host %s' % (self.session_id, + hostname)) + host = self.get_host_by_name(hostname) + try: + self.nova.services.disable_log_reason(host.details, "maintenance") + except TypeError: + LOG.debug('%s: Using old API to disable nova-compute on host %s' % + (self.session_id, hostname)) + self.nova.services.disable_log_reason(hostname, "nova-compute", + "maintenance") + host.disabled = True + + def enable_host_nova_compute(self, hostname): + LOG.info('%s: enable nova-compute on host %s' % (self.session_id, + hostname)) + host = self.get_host_by_name(hostname) + try: + self.nova.services.enable(host.details) + except TypeError: + LOG.debug('%s: Using old API to enable nova-compute on host %s' % + (self.session_id, hostname)) + self.nova.services.enable(hostname, "nova-compute") + host.disabled = False + + def get_compute_hosts(self): + return [host.hostname for host in self.hosts + if host.type 
== 'compute'] + + def get_empty_computes(self): + all_computes = self.get_compute_hosts() + instance_computes = [] + for instance in self.instances: + if instance.host not in instance_computes: + instance_computes.append(instance.host) + return [host for host in all_computes if host not in instance_computes] + + def get_instance_details(self, instance): + network_interfaces = next(iter(instance.addresses.values())) + for network_interface in network_interfaces: + _type = network_interface.get('OS-EXT-IPS:type') + if _type == "floating": + LOG.info('Instance with floating ip: %s %s' % + (instance.id, instance.name)) + return "floating_ip" + return None + + def _fenix_instance(self, project_id, instance_id, instance_name, host, + state, details, action=None, project_state=None, + action_done=False): + instance = {'session_id': self.session_id, + 'instance_id': instance_id, + 'action': action, + 'project_id': project_id, + 'instance_id': instance_id, + 'project_state': project_state, + 'state': state, + 'instance_name': instance_name, + 'action_done': action_done, + 'host': host, + 'details': details} + return instance + + def initialize_server_info(self): + project_ids = [] + instances = [] + compute_hosts = self.get_compute_hosts() + opts = {'all_tenants': True} + servers = self.nova.servers.list(detailed=True, search_opts=opts) + for server in servers: + try: + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + if host not in compute_hosts: + continue + project_id = str(server.tenant_id) + instance_name = str(server.name) + instance_id = str(server.id) + details = self.get_instance_details(server) + state = str(server.__dict__.get('OS-EXT-STS:vm_state')) + except Exception: + raise Exception('can not get params from server=%s' % server) + instances.append(self._fenix_instance(project_id, instance_id, + instance_name, host, state, + details)) + if project_id not in project_ids: + project_ids.append(project_id) + + if len(project_ids): + self.projects = self.init_projects(project_ids) + else: + LOG.info('%s: No projects on computes under maintenance' % + self.session_id) + if len(instances): + self.instances = self.add_instances(instances) + else: + LOG.info('%s: No instances on computes under maintenance' % + self.session_id) + LOG.info(str(self)) + + def update_instance(self, project_id, instance_id, instance_name, host, + state, details): + if self.instance_id_found(instance_id): + # TBD Might need to update instance variables here if not done + # somewhere else + return + elif self.instance_name_found(instance_name): + # Project has made re-instantiation, remove old add new + old_instance = self.instance_by_name(instance_name) + instance = self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details, + old_instance.action, + old_instance.project_state, + old_instance.action_done) + self.instances.append(self.add_instance(instance)) + self.remove_instance(old_instance) + else: + # Instance new, as project has added instances + instance = self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details) + self.instances.append(self.add_instance(instance)) + + def remove_non_existing_instances(self, instance_ids): + remove_instances = [instance for instance in + self.instances if instance.instance_id not in + instance_ids] + for instance in remove_instances: + # Instance deleted, as project possibly scaled down + self.remove_instance(instance) + + def update_server_info(self): + # TBD This keeps internal instance information up-to-date 
and prints + # it out. Same could be done by updating the information when changed + # Anyhow this also double checks information against Nova + instance_ids = [] + compute_hosts = self.get_compute_hosts() + opts = {'all_tenants': True} + servers = self.nova.servers.list(detailed=True, search_opts=opts) + for server in servers: + try: + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + if host not in compute_hosts: + continue + project_id = str(server.tenant_id) + instance_name = str(server.name) + instance_id = str(server.id) + details = self.get_instance_details(server) + state = str(server.__dict__.get('OS-EXT-STS:vm_state')) + except Exception: + LOG.error('can not get params from server: %s, retry...' % + str(server.id)) + # TBD sometimes cannot get all parameters, this retry can be + # enhanced when caught better + time.sleep(5) + try: + server = self.nova.servers.get(str(server.id)) + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + if host not in compute_hosts: + continue + project_id = str(server.tenant_id) + instance_name = str(server.name) + instance_id = str(server.id) + details = self.get_instance_details(server) + state = str(server.__dict__.get('OS-EXT-STS:vm_state')) + except Exception: + raise Exception('can not get params from server: %s' % + str(server.id)) + LOG.info('got params from server: %s' % str(server.id)) + self.update_instance(project_id, instance_id, instance_name, host, + state, details) + instance_ids.append(instance_id) + self.remove_non_existing_instances(instance_ids) + + LOG.info(str(self)) + + def confirm_maintenance(self): + allowed_actions = [] + actions_at = self.session.maintenance_at + state = 'MAINTENANCE' + self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not None and project not in + project_not_replied): + continue + LOG.info('\nMAINTENANCE to project %s\n' % project) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + reply_at = reply_time_str(self.conf.project_maintenance_reply) + if is_time_after_time(reply_at, actions_at): + LOG.error('%s: No time for project to answer in state: %s' + % (self.session_id, state)) + self.session.state = "MAINTENANCE_FAILED" + return False + metadata = self.session.meta + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_maintenance_reply, + 'MAINTENANCE_TIMEOUT') + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_maintenance failed after retries') + break + else: + LOG.info('confirm_maintenance retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def confirm_scale_in(self): + allowed_actions = [] + actions_at = reply_time_str(self.conf.project_scale_in_reply) + reply_at = actions_at + state = 'SCALE_IN' + self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not None and project not in + project_not_replied): + continue + LOG.info('\nSCALE_IN to project %s\n' % project) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + metadata = self.session.meta + self._project_notify(project, 
instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_scale_in_reply, + 'SCALE_IN_TIMEOUT') + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_scale_in failed after retries') + break + else: + LOG.info('confirm_scale_in retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def need_scale_in(self): + hvisors = self.nova.hypervisors.list(detailed=True) + prev_vcpus = 0 + free_vcpus = 0 + prev_hostname = '' + LOG.info('checking hypervisors for VCPU capacity') + for hvisor in hvisors: + hostname = ( + hvisor.__getattr__('hypervisor_hostname').split(".", 1)[0]) + if hostname not in self.get_compute_hosts(): + continue + vcpus = hvisor.__getattr__('vcpus') + vcpus_used = hvisor.__getattr__('vcpus_used') + if prev_vcpus != 0 and prev_vcpus != vcpus: + raise Exception('%s: %d vcpus on %s does not match to' + '%d on %s' + % (self.session_id, vcpus, hostname, + prev_vcpus, prev_hostname)) + free_vcpus += vcpus - vcpus_used + prev_vcpus = vcpus + prev_hostname = hostname + if free_vcpus >= vcpus: + # TBD vcpu capacity might be too scattered so moving instances from + # one host to other host still might not succeed. At least with + # NUMA and CPU pinning, one should calculate and ask specific + # instances + return False + else: + return True + + def get_vcpus_by_host(self, host, hvisors): + hvisor = ([h for h in hvisors if + h.__getattr__('hypervisor_hostname').split(".", 1)[0] + == host][0]) + vcpus = hvisor.__getattr__('vcpus') + vcpus_used = hvisor.__getattr__('vcpus_used') + return vcpus, vcpus_used + + def find_host_to_be_empty(self, need_empty, weighted_hosts, + act_instances_hosts): + print("need_empty: %s" % need_empty) + hosts_to_be_empty = [] + for vcpus_used in sorted(weighted_hosts.keys()): + print("vcpus_used in weighted_hosts: %s" % vcpus_used) + weighted_candidates = weighted_hosts[vcpus_used] + if len(weighted_candidates) == need_empty: + # Happened to be exact match to needed + hosts_to_be_empty = weighted_hosts[vcpus_used] + print("hosts to be empty: %s" % hosts_to_be_empty) + elif len(weighted_candidates) > need_empty: + # More candidates than we need, dig deeper to act_instances + for act_instances in sorted(act_instances_hosts.keys()): + print("act_instances in act_instances_hosts: %s" + % act_instances) + for candidate in weighted_candidates: + print("candidate: %s" % candidate) + if candidate in act_instances_hosts[act_instances]: + print("host to be empty: %s" % candidate) + hosts_to_be_empty.append(candidate) + if len(hosts_to_be_empty) == need_empty: + break + if len(hosts_to_be_empty) == need_empty: + break + if len(hosts_to_be_empty) == need_empty: + break + if len(hosts_to_be_empty) != need_empty: + print("we failed to search hosts to be empty!!!") + return hosts_to_be_empty + + def make_empty_hosts(self, state): + # TBD disable nova-compute on empty host or parallel to be empty + # host so cannot move instance to there. 
Otherwise make math + # to find the target host and give it to move operation + # done with temporarily_disabled_hosts + temporarily_disabled_hosts = [] + hvisors = self.nova.hypervisors.list(detailed=True) + weighted_hosts = {} + act_instances_hosts = {} + self.empty_hosts = [] + full_capacity = 0 + used_capacity = 0 + for host in self.get_compute_hosts(): + vcpus, vcpus_used = self.get_vcpus_by_host(host, hvisors) + full_capacity += vcpus + used_capacity += vcpus_used + act_instances = 0 + for project in self.project_names(): + for instance in (self.instances_by_host_and_project(host, + project)): + if instance.details and "floating_ip" in instance.details: + act_instances += 1 + if vcpus_used == 0: + self.empty_hosts.append(host) + temporarily_disabled_hosts.append(host) + self.disable_host_nova_compute(host) + elif vcpus_used == vcpus: + # We do not choose full host + continue + else: + if vcpus_used not in weighted_hosts: + weighted_hosts[vcpus_used] = [host] + else: + weighted_hosts[vcpus_used].append(host) + if act_instances not in act_instances_hosts: + act_instances_hosts[act_instances] = [host] + else: + act_instances_hosts[act_instances].append(host) + # how many empty hosts possible + parallel_hosts = int((full_capacity - used_capacity) / vcpus) + need_empty = parallel_hosts - len(self.empty_hosts) + + if need_empty != 0: + hosts_to_be_empty = self.find_host_to_be_empty(need_empty, + weighted_hosts, + act_instances_hosts) + thrs = [] + for host in hosts_to_be_empty: + temporarily_disabled_hosts.append(host) + self.disable_host_nova_compute(host) + thrs.append(self.actions_to_have_empty_host(host, state)) + LOG.info("waiting hosts %s to be empty..." % hosts_to_be_empty) + for thr in thrs: + thr.join() + LOG.info("hosts %s empty..." % hosts_to_be_empty) + for host in temporarily_disabled_hosts: + self.enable_host_nova_compute(host) + return True + + def confirm_instance_action(self, instance, state): + instance_id = instance.instance_id + LOG.info('%s to instance %s' % (state, instance_id)) + allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] + try: + instance_constraints = db_api.project_instance_get(instance_id) + wait_time = instance_constraints.lead_time + LOG.info("%s actions_at from constraints lead_time: %s" % + (instance_id, wait_time)) + except db_exc.FenixDBNotFound: + wait_time = self.conf.project_maintenance_reply + actions_at = reply_time_str(wait_time) + reply_at = actions_at + instance.project_state = state + metadata = self.session.meta + retry = 2 + replied = False + while not replied: + metadata = self.session.meta + self._project_notify(instance.project_id, [instance_id], + allowed_actions, actions_at, reply_at, + state, metadata) + timer = '%s_%s_TIMEOUT' % (state, instance_id) + self.start_timer(self.conf.project_maintenance_reply, timer) + replied = self.wait_instance_reply_state(state, instance, timer) + if not replied: + if retry == 0: + LOG.info('confirm_instance_action for %s failed after ' + 'retries' % instance.instance_id) + break + else: + LOG.info('confirm_instance_action for %s retry' + % instance.instance_id) + else: + break + retry -= 1 + return replied + + def confirm_maintenance_complete(self): + state = 'MAINTENANCE_COMPLETE' + metadata = self.session.meta + actions_at = reply_time_str(self.conf.project_scale_in_reply) + reply_at = actions_at + self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not 
None and project not in + project_not_replied): + continue + LOG.info('%s to project %s' % (state, project)) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + allowed_actions = [] + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_scale_in_reply, + '%s_TIMEOUT' % state) + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_maintenance_complete failed after ' + 'retries') + break + else: + LOG.info('confirm_maintenance_complete retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def notify_action_done(self, instance): + instance_ids = [instance.instance_id] + project = instance.project_id + allowed_actions = [] + actions_at = None + reply_at = None + state = "INSTANCE_ACTION_DONE" + instance.project_state = state + metadata = "{}" + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + + @run_async + def instance_action(self, instance, state, target_host=None): + if not self.confirm_instance_action(instance, state): + raise Exception('%s: instance %s action %s ' + 'confirmation failed' % + (self.session_id, instance.instance_id, + instance.action)) + # TBD from constraints or override in instance.action + LOG.info('Action %s instance %s ' % (instance.action, + instance.instance_id)) + try: + instance_constraints = ( + db_api.project_instance_get(instance.instance_id)) + group_id = instance_constraints.group_id + instance_group = db_api.instance_group_get(group_id) + if group_id not in self.group_impacted_members: + self.group_impacted_members[group_id] = 0 + max_parallel = instance_group.max_impacted_members + LOG.info("%s - instance_group: %s max_impacted_members: %s " + "recovery_time: %s" % + (instance.instance_id, instance_group.group_name, + max_parallel, instance_group.recovery_time)) + except db_exc.FenixDBNotFound: + raise Exception('failed to get %s constraints' % + (instance.instance_id)) + while max_parallel < self.group_impacted_members[group_id]: + LOG.info('%s waiting in group queue / max_parallel %s/%s' % + (instance.instance_id, + self.group_impacted_members[group_id], + max_parallel)) + time.sleep(5) + self.group_impacted_members[group_id] += 1 + LOG.debug("%s Reserved / max_impacted_members: %s/%s" % + (instance.instance_id, self.group_impacted_members[group_id], + max_parallel)) + if instance.action == 'MIGRATE': + if not self.migrate_server(instance, target_host): + return False + self.notify_action_done(instance) + elif instance.action == 'OWN_ACTION': + pass + elif instance.action == 'LIVE_MIGRATE': + if not self.live_migrate_server(instance, target_host): + return False + self.notify_action_done(instance) + else: + self.group_impacted_members[group_id] -= 1 + raise Exception('%s: instance %s action ' + '%s not supported' % + (self.session_id, instance.instance_id, + instance.action)) + # We need to obey recovery time for instance group before + # decrease self.group_impacted_members[group_id] to allow + # one more instances of same group to be affected by any move + # operation + if instance_group.recovery_time > 0: + LOG.debug("%s wait VNF to recover from move for %ssec" + % (instance.instance_id, + instance_group.recovery_time)) + time.sleep(instance_group.recovery_time) + 
self.group_impacted_members[group_id] -= 1 + LOG.debug("%s Reservation freed. remain / max_impacted_members: %s/%s" + % (instance.instance_id, + self.group_impacted_members[group_id], + max_parallel)) + + @run_async + def actions_to_have_empty_host(self, host, state, target_host=None): + thrs = [] + LOG.info('actions_to_have_empty_host %s' % host) + instances = self.instances_by_host(host) + if not instances: + raise Exception('No instances on host: %s' % host) + for instance in instances: + LOG.info('move %s from %s' % (instance.instance_name, host)) + thrs.append(self.instance_action(instance, state, + target_host)) + for thr in thrs: + thr.join() + return self._wait_host_empty(host) + + def _wait_host_empty(self, host): + hid = self.nova.hypervisors.search(host)[0].id + vcpus_used_last = 0 + # wait 4min to get host emptys + for j in range(48): + hvisor = self.nova.hypervisors.get(hid) + vcpus_used = hvisor.__getattr__('vcpus_used') + if vcpus_used > 0: + if vcpus_used != vcpus_used_last or vcpus_used_last == 0: + LOG.info('%s still has %d vcpus reserved. wait...' + % (host, vcpus_used)) + vcpus_used_last = vcpus_used + time.sleep(5) + else: + LOG.info('%s empty' % host) + return True + LOG.info('%s host still not empty' % host) + return False + + def live_migrate_server(self, instance, target_host=None): + server_id = instance.instance_id + server = self.nova.servers.get(server_id) + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + orig_host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + LOG.info('live_migrate_server %s state %s host %s to %s' % + (server_id, instance.state, orig_host, target_host)) + orig_vm_state = instance.state + last_vm_status = str(server.__dict__.get('status')) + last_migration_status = "active" + try: + server.live_migrate(host=target_host) + waited = 0 + migrate_retries = 0 + while waited != self.conf.live_migration_wait_time: + time.sleep(1) + server = self.nova.servers.get(server_id) + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + vm_status = str(server.__dict__.get('status')) + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + instance.host = host + if vm_status != last_vm_status: + LOG.info('instance %s status changed: %s' % (server_id, + vm_status)) + if instance.state == 'error': + LOG.error('instance %s live migration failed' + % server_id) + return False + elif orig_vm_state != instance.state: + LOG.info('instance %s state changed: %s' % (server_id, + instance.state)) + elif host != orig_host: + LOG.info('instance %s live migrated to host %s' % + (server_id, host)) + return True + migration = ( + self.nova.migrations.list(instance_uuid=server_id)[0]) + if migration.status == 'error': + if migrate_retries == self.conf.live_migration_retries: + LOG.error('instance %s live migration failed after ' + '%d retries' % + (server_id, + self.conf.live_migration_retries)) + return False + # When live migrate fails it can fail fast after calling + # To have Nova time to be ready for next live migration + # There needs to be enough time to wait before retry + # And waiting more on next retry have better chance to + # Have live migration finally through + time.sleep(2 * (migrate_retries + 5)) + LOG.info('instance %s live migration failed, retry' + % server_id) + server.live_migrate(host=target_host) + waited = 0 + migrate_retries = migrate_retries + 1 + elif migration.status != last_migration_status: + LOG.info('instance %s live migration status changed: %s' + % (server_id, migration.status)) + waited = waited + 1 + 
last_migration_status = migration.status + last_vm_status = vm_status + LOG.error('instance %s live migration did not finish in %ss, ' + 'state: %s' % (server_id, waited, instance.state)) + except Exception as e: + LOG.error('server %s live migration failed, Exception=%s' % + (server_id, e)) + return False + + def migrate_server(self, instance, target_host=None): + server_id = instance.instance_id + server = self.nova.servers.get(server_id) + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + orig_host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + LOG.info('migrate_server %s state %s host %s to %s' % + (server_id, instance.state, orig_host, target_host)) + last_vm_state = instance.state + retry_migrate = 7 + while True: + try: + server.migrate(host=target_host) + time.sleep(5) + retries = 48 + while instance.state != 'resized' and retries > 0: + # try to confirm within 4min + server = self.nova.servers.get(server_id) + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + instance.state = server.__dict__.get('OS-EXT-STS:vm_state') + if instance.state == 'resized': + server.confirm_resize() + LOG.info('instance %s migration resized to host %s' % + (server_id, host)) + instance.host = host + return True + if last_vm_state != instance.state: + LOG.info('instance %s state changed: %s' % (server_id, + instance.state)) + if instance.state == 'error': + LOG.error('instance %s migration failed, state: %s' + % (server_id, instance.state)) + instance.host = host + return False + time.sleep(5) + retries = retries - 1 + last_vm_state = instance.state + # Timout waiting state to change + break + + except BadRequest: + if retry_migrate == 0: + LOG.error('server %s migrate failed after retries' % + server_id) + return False + # Might take time for scheduler to sync inconsistent instance + # list for host. 
+                    download_plugin_file = "%s/%s.py" % (download_plugin_dir,
+                                                         ap.plugin)
+                    LOG.info("%s: Trying from: %s" % (self.session_id,
+                                                      download_plugin_file))
+                    if os.path.isfile(download_plugin_file):
+                        ap_instance = (
+                            mod_loader_action_instance(ap_name,
+                                                       download_plugin_file,
+                                                       self,
+                                                       ap_db_instance))
+                    else:
+                        raise Exception('%s: could not find action plugin %s' %
+                                        (self.session_id, ap.plugin))
+
+                ap_instance.run()
+                if ap_db_instance.state:
+                    LOG.info('%s: %s finished with %s host %s' %
+                             (self.session_id, ap.plugin,
+                              ap_db_instance.state, hostname))
+                    if 'FAILED' in ap_db_instance.state:
+                        raise Exception('%s: %s finished with %s host %s' %
+                                        (self.session_id, ap.plugin,
+                                         ap_db_instance.state, hostname))
+                else:
+                    raise Exception('%s: %s reported no state for host %s' %
+                                    (self.session_id, ap.plugin, hostname))
+                # If ap_db_instance failed, we keep it so its state remains
+                # available
+                db_api.remove_action_plugin_instance(ap_db_instance)
+        else:
+            LOG.info("%s: No action plug-ins with type %s" %
+                     (self.session_id, plugin_type))
+
+    @run_async
+    def host_maintenance_async(self, hostname):
+        self.host_maintenance(hostname)
+
+    def host_maintenance(self, hostname):
+        host = self.get_host_by_name(hostname)
+        if host.type == "compute":
+            self._wait_host_empty(hostname)
+        LOG.info('IN_MAINTENANCE %s' % hostname)
+        self._admin_notify(self.conf.service_user.os_project_name,
+                           hostname,
+                           'IN_MAINTENANCE',
+                           self.session_id)
+        for plugin_type in ["host", host.type]:
+            LOG.info('%s: Execute %s action plugins' % (self.session_id,
+                                                        plugin_type))
+            self.maintenance_by_plugin_type(hostname, plugin_type)
+        self._admin_notify(self.conf.service_user.os_project_name,
+                           hostname,
+                           'MAINTENANCE_COMPLETE',
+                           self.session_id)
+        if host.type == "compute":
+            self.enable_host_nova_compute(hostname)
+        LOG.info('MAINTENANCE_COMPLETE %s' % hostname)
+        host.maintained = True
+
+    def maintenance(self):
+        LOG.info("%s: maintenance called" % self.session_id)
+        self.initialize_server_info()
+
+        if not self.projects_listen_alarm('maintenance.scheduled'):
+            self.session.state = 'MAINTENANCE_FAILED'
+            return
+
+        if not self.confirm_maintenance():
+            self.session.state = 'MAINTENANCE_FAILED'
+            return
+
+        maintenance_empty_hosts = self.get_empty_computes()
+
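+        # NOTE: Decide the next workflow state: scale in if there is not
+        # enough free capacity for an empty host, prepare maintenance (move
+        # instances to free a host) if there is capacity but no empty host
+        # yet, or start maintenance right away if an empty host exists.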
+        if len(maintenance_empty_hosts) == 0:
+            if self.need_scale_in():
+                LOG.info('%s: Need to scale in to get capacity for '
+                         'empty host' % (self.session_id))
+                self.session.state = 'SCALE_IN'
+            else:
+                LOG.info('%s: Free capacity, but need empty host' %
+                         (self.session_id))
+                self.session.state = 'PREPARE_MAINTENANCE'
+        else:
+            LOG.info('Empty host found')
+            self.session.state = 'START_MAINTENANCE'
+
+        if self.session.maintenance_at > datetime.datetime.utcnow():
+            time_now = time_now_str()
+            LOG.info('Time now: %s maintenance starts: %s....' %
+                     (time_now, datetime_to_str(self.session.maintenance_at)))
+            td = self.session.maintenance_at - datetime.datetime.utcnow()
+            self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
+            while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
+                time.sleep(1)
+
+        time_now = time_now_str()
+        LOG.info('Time to start maintenance: %s' % time_now)
+
+    def scale_in(self):
+        LOG.info("%s: scale in" % self.session_id)
+        # TBD: we currently ask blindly to scale in so that at least one
+        # compute becomes empty. With NUMA and CPU pinning, and considering
+        # how many instances can be affected at the same time, we should
+        # calculate and request the scaling of specific instances
+        if not self.confirm_scale_in():
+            self.session.state = 'MAINTENANCE_FAILED'
+            return
+        # TBD: it takes time before free-capacity information is updated.
+        # We should make sure removed instances also have their VCPUs freed
+        self.update_server_info()
+        maintenance_empty_hosts = self.get_empty_computes()
+
+        if len(maintenance_empty_hosts) == 0:
+            if self.need_scale_in():
+                LOG.info('%s: Need to scale in more to get capacity for '
+                         'empty host' % (self.session_id))
+                self.session.state = 'SCALE_IN'
+            else:
+                LOG.info('%s: Free capacity, but need empty host' %
+                         (self.session_id))
+                self.session.state = 'PREPARE_MAINTENANCE'
+        else:
+            LOG.info('Empty host found')
+            for host in maintenance_empty_hosts:
+                self._wait_host_empty(host)
+            self.session.state = 'START_MAINTENANCE'
+
+    def prepare_maintenance(self):
+        LOG.info("%s: prepare_maintenance called" % self.session_id)
+        if not self.make_empty_hosts('PREPARE_MAINTENANCE'):
+            LOG.error('make_empty_hosts failed')
+            self.session.state = 'MAINTENANCE_FAILED'
+        else:
+            self.session.state = 'START_MAINTENANCE'
+        self.update_server_info()
+
+    def start_maintenance(self):
+        LOG.info("%s: start_maintenance called" % self.session_id)
+        empty_hosts = self.get_empty_computes()
+        if not empty_hosts:
+            LOG.info("%s: No empty host to be maintained" % self.session_id)
+            self.session.state = 'MAINTENANCE_FAILED'
+            return
+        maintained_hosts = self.get_maintained_hosts_by_type('compute')
+        if not maintained_hosts:
+            computes = self.get_compute_hosts()
+            for compute in computes:
+                # When we start to maintain compute hosts, the nova-compute
+                # service is disabled on all of them, so projects cannot get
+                # instances scheduled onto hosts that are not yet maintained
+                self.disable_host_nova_compute(compute)
+            for host in self.get_controller_hosts():
+                # TBD: one might need to change this. For now all controllers
+                # are maintained serially
+                self.host_maintenance(host)
+        # First we maintain all empty compute hosts
+        thrs = []
+        for host in empty_hosts:
+            thrs.append(self.host_maintenance_async(host))
+        for thr in thrs:
+            thr.join()
+        self.session.state = 'PLANNED_MAINTENANCE'
+
+    def planned_maintenance(self):
+        LOG.info("%s: planned_maintenance called" % self.session_id)
+        maintained_hosts = self.get_maintained_hosts_by_type('compute')
+        compute_hosts = self.get_compute_hosts()
+        not_maintained_hosts = ([host for host in compute_hosts if host
+                                 not in maintained_hosts])
+        empty_compute_hosts = self.get_empty_computes()
+        parallel = len(empty_compute_hosts)
+        not_maintained = len(not_maintained_hosts)
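+        # NOTE: Maintain the remaining hosts in batches: each round moves
+        # instances away from as many not-yet-maintained hosts as there are
+        # empty hosts, maintains the freed hosts, and then recalculates the
+        # empty hosts for the next round.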
+        while not_maintained:
+            if not_maintained < parallel:
+                parallel = not_maintained
+            thrs = []
+            for index in range(parallel):
+                shost = not_maintained_hosts[index]
+                thost = empty_compute_hosts[index]
+                thrs.append(
+                    self.actions_to_have_empty_host(shost,
+                                                    'PLANNED_MAINTENANCE',
+                                                    thost))
+            for thr in thrs:
+                thr.join()
+            thrs = []
+            for index in range(parallel):
+                host = not_maintained_hosts[index]
+                thrs.append(self.host_maintenance_async(host))
+            for thr in thrs:
+                thr.join()
+            empty_compute_hosts = self.get_empty_computes()
+            del not_maintained_hosts[:parallel]
+            parallel = len(empty_compute_hosts)
+            not_maintained = len(not_maintained_hosts)
+            self.update_server_info()
+
+        LOG.info("%s: planned_maintenance done" % self.session_id)
+        self.session.state = 'MAINTENANCE_COMPLETE'
+
+    def maintenance_complete(self):
+        LOG.info("%s: maintenance_complete called" % self.session_id)
+        LOG.info('%s: Execute post action plugins' % self.session_id)
+        self.maintenance_by_plugin_type("localhost", "post")
+        LOG.info('Projects may still need to scale back up to full '
+                 'capacity')
+        if not self.confirm_maintenance_complete():
+            self.session.state = 'MAINTENANCE_FAILED'
+            return
+        self.update_server_info()
+        self.session.state = 'MAINTENANCE_DONE'
+
+    def maintenance_done(self):
+        pass
+
+    def maintenance_failed(self):
+        LOG.info("%s: maintenance_failed called" % self.session_id)
+
+    def cleanup(self):
+        LOG.info("%s: cleanup" % self.session_id)
+        db_api.remove_session(self.session_id)
diff --git a/tox.ini b/tox.ini
index 748607c..d8f021e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
 [tox]
 minversion = 2.0
-envlist = py36,py35,py27,pep8,docs
+envlist = py36,py35,pep8,docs
 skipsdist = True
 
 [testenv]