diff --git a/sahara/conductor/api.py b/sahara/conductor/api.py index 87780df91d..d65762ab42 100644 --- a/sahara/conductor/api.py +++ b/sahara/conductor/api.py @@ -546,6 +546,47 @@ class LocalApi(object): return self._manager.cluster_event_add( context, provision_step, values) + @r.wrap(r.ClusterVerificationResource) + def cluster_verification_add(self, context, cluster_id, values): + """Return created verification for the specified cluster.""" + return self._manager.cluster_verification_add( + context, _get_id(cluster_id), values) + + @r.wrap(r.ClusterVerificationResource) + def cluster_verification_get(self, context, verification_id): + """Return verification with the specified verification_id.""" + return self._manager.cluster_verification_get( + context, _get_id(verification_id)) + + @r.wrap(r.ClusterVerificationResource) + def cluster_verification_update(self, context, verification_id, values): + """Return updated verification with the specified verification_id.""" + return self._manager.cluster_verification_update( + context, _get_id(verification_id), values) + + def cluster_verification_delete(self, context, verification_id): + """"Delete verification with the specified id.""" + return self._manager.cluster_verification_delete( + context, _get_id(verification_id)) + + @r.wrap(r.ClusterHealthCheckResource) + def cluster_health_check_add(self, context, verification_id, values): + """Return created health check in the specified verification.""" + return self._manager.cluster_health_check_add( + context, _get_id(verification_id), values) + + @r.wrap(r.ClusterHealthCheckResource) + def cluster_health_check_get(self, context, health_check_id): + """Return health check with the specified health_check_id.""" + return self._manager.cluster_health_check_get( + context, _get_id(health_check_id)) + + @r.wrap(r.ClusterHealthCheckResource) + def cluster_health_check_update(self, context, health_check_id, values): + """Return updated health check with the specified health_check_id.""" + return self._manager.cluster_health_check_update( + context, _get_id(health_check_id), values) + class RemoteApi(LocalApi): """Conductor API that does updates via RPC to the ConductorManager.""" diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index 103639d53b..96fd9608bd 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -714,3 +714,36 @@ class ConductorManager(db_base.Base): def cluster_event_add(self, context, provision_step, values): """Assign new event to the specified provision step.""" return self.db.cluster_event_add(context, provision_step, values) + + # Cluster verifications / health checks ops + + def cluster_verification_add(self, context, cluster_id, values): + """Return created verification for the specified cluster.""" + return self.db.cluster_verification_add(context, cluster_id, values) + + def cluster_verification_get(self, context, verification_id): + """Return verification with the specified verification_id.""" + return self.db.cluster_verification_get(context, verification_id) + + def cluster_verification_update(self, context, verification_id, values): + """Return updated verification with the specified verification_id.""" + return self.db.cluster_verification_update( + context, verification_id, values) + + def cluster_verification_delete(self, context, verification_id): + """"Delete verification with the specified id.""" + return self.db.cluster_verification_delete(context, verification_id) + + def cluster_health_check_add(self, context, 
verification_id, values): + """Return created health check in the specified verification.""" + return self.db.cluster_health_check_add( + context, verification_id, values) + + def cluster_health_check_get(self, context, health_check_id): + """Return health check with the specified health_check_id.""" + return self.db.cluster_health_check_get(context, health_check_id) + + def cluster_health_check_update(self, context, health_check_id, values): + """Return updated health check with the specified health_check_id.""" + return self.db.cluster_health_check_update( + context, health_check_id, values) diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 94f9820a00..11b9373d2d 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -336,3 +336,24 @@ class ClusterEvent(object): successful step_id """ + + +class ClusterVerification(object): + """An object representing cluster verification + + id + cluster_id + status + checks + """ + + +class ClusterHealthCheck(object): + """An object representing health check + + id + verification_id + status + description + name + """ diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py index 98bc53ad88..f8e4b31d74 100644 --- a/sahara/conductor/resource.py +++ b/sahara/conductor/resource.py @@ -207,6 +207,17 @@ class ClusterTemplateResource(Resource, objects.ClusterTemplate): } +class ClusterHealthCheckResource(Resource, objects.ClusterHealthCheck): + _resource_name = 'cluster_health_check' + + +class ClusterVerificationResource(Resource, objects.ClusterVerification): + _resource_name = 'cluster_verification' + _children = { + 'checks': (ClusterHealthCheckResource, 'verification') + } + + class ClusterResource(Resource, objects.Cluster): def sanitize_cluster_configs(self, cluster_configs): @@ -218,7 +229,8 @@ class ClusterResource(Resource, objects.Cluster): _children = { 'node_groups': (NodeGroupResource, 'cluster'), - 'cluster_template': (ClusterTemplateResource, None) + 'cluster_template': (ClusterTemplateResource, None), + 'verification': (ClusterVerificationResource, 'cluster') } _filter_fields = ['management_private_key', 'extra', 'rollback_info', diff --git a/sahara/db/api.py b/sahara/db/api.py index e732b9c5bf..6eb6e62878 100644 --- a/sahara/db/api.py +++ b/sahara/db/api.py @@ -552,3 +552,46 @@ def cluster_provision_progress_update(context, cluster_id): def cluster_event_add(context, provision_step, values): """Assign new event to the specified provision step.""" return IMPL.cluster_event_add(context, provision_step, values) + + +# Health verifications / checks ops + +@to_dict +def cluster_verification_add(context, cluster_id, values): + """Return created verification for the specified cluster.""" + return IMPL.cluster_verification_add(context, cluster_id, values) + + +@to_dict +def cluster_verification_get(context, verification_id): + """Return verification with the specified verification_id.""" + return IMPL.cluster_verification_get(context, verification_id) + + +@to_dict +def cluster_verification_update(context, verification_id, values): + """Return updated verification with the specified verification_id.""" + return IMPL.cluster_verification_update(context, verification_id, values) + + +def cluster_verification_delete(context, verification_id): + """"Delete verification with the specified id.""" + return IMPL.cluster_verification_delete(context, verification_id) + + +@to_dict +def cluster_health_check_add(context, verification_id, values): + """Return created health check in the specified 
verification.""" + return IMPL.cluster_health_check_add(context, verification_id, values) + + +@to_dict +def cluster_health_check_get(context, health_check_id): + """Return health check with the specified health_check_id.""" + return IMPL.cluster_health_check_get(context, health_check_id) + + +@to_dict +def cluster_health_check_update(context, health_check_id, values): + """Return updated health check with the specified health_check_id.""" + return IMPL.cluster_health_check_update(context, health_check_id, values) diff --git a/sahara/db/migration/alembic_migrations/versions/030-health-check.py b/sahara/db/migration/alembic_migrations/versions/030-health-check.py new file mode 100644 index 0000000000..1846c7534b --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/030-health-check.py @@ -0,0 +1,63 @@ +# Copyright 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""health-check + +Revision ID: 029 +Revises: 028 +Create Date: 2016-01-26 16:11:46.008367 + +""" + +# revision identifiers, used by Alembic. +revision = '030' +down_revision = '029' + +from alembic import op +import sqlalchemy as sa + + +MYSQL_ENGINE = 'InnoDB' +MYSQL_CHARSET = 'utf8' + + +def upgrade(): + op.create_table( + 'cluster_verifications', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('cluster_id', sa.String(length=36), nullable=True), + sa.Column('status', sa.String(length=15), nullable=True), + sa.ForeignKeyConstraint(['cluster_id'], ['clusters.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id', 'cluster_id'), + mysql_engine=MYSQL_ENGINE, + mysql_charset=MYSQL_CHARSET) + op.create_table( + 'cluster_health_checks', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('verification_id', sa.String(length=36), nullable=True), + sa.Column('status', sa.String(length=15), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('name', sa.String(length=80), nullable=True), + sa.ForeignKeyConstraint( + ['verification_id'], ['cluster_verifications.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id', 'verification_id'), + mysql_engine=MYSQL_ENGINE, + mysql_charset=MYSQL_CHARSET) diff --git a/sahara/db/sqlalchemy/api.py b/sahara/db/sqlalchemy/api.py index 859f8c9e2e..fbf3ff30c6 100644 --- a/sahara/db/sqlalchemy/api.py +++ b/sahara/db/sqlalchemy/api.py @@ -1369,3 +1369,113 @@ def cluster_event_add(context, step_id, values): session.add(event) return event.id + + +# Cluster verifications / health check ops + +def _cluster_verification_get(context, session, verification_id): + # tenant id is not presented + query = model_query(m.ClusterVerification, context, session, + project_only=False) + return query.filter_by(id=verification_id).first() + + +def 
cluster_verification_get(context, verification_id): + return _cluster_verification_get(context, get_session(), verification_id) + + +def cluster_verification_add(context, cluster_id, values): + session = get_session() + + with session.begin(): + cluster = _cluster_get(context, session, cluster_id) + + if not cluster: + raise ex.NotFoundException( + cluster_id, _("Cluster id '%s' not found!")) + + verification = m.ClusterVerification() + values['cluster_id'] = cluster_id + verification.update(values) + session.add(verification) + + return verification + + +def cluster_verification_update(context, verification_id, values): + session = get_session() + + with session.begin(): + verification = _cluster_verification_get( + context, session, verification_id) + + if not verification: + raise ex.NotFoundException( + verification_id, _("Verification id '%s' not found!")) + + verification.update(values) + return verification + + +def cluster_verification_delete(context, verification_id): + session = get_session() + + with session.begin(): + verification = _cluster_verification_get( + context, session, verification_id) + + if not verification: + raise ex.NotFoundException( + verification_id, _("Verification id '%s' not found!")) + + for check in verification.checks: + session.delete(check) + + session.delete(verification) + + +def _cluster_health_check_get(context, session, health_check_id): + # tenant id is not presented + query = model_query(m.ClusterHealthCheck, context, session, + project_only=False) + return query.filter_by(id=health_check_id).first() + + +def cluster_health_check_get(context, health_check_id): + return _cluster_health_check_get(context, get_session(), health_check_id) + + +def cluster_health_check_add(context, verification_id, values): + session = get_session() + + with session.begin(): + verification = _cluster_verification_get( + context, session, verification_id) + + if not verification: + raise ex.NotFoundException( + verification_id, _("Verification id '%s' not found!")) + + health_check = m.ClusterHealthCheck() + values['verification_id'] = verification_id + values['tenant_id'] = context.tenant_id + health_check.update(values) + session.add(health_check) + + return health_check + + +def cluster_health_check_update(context, health_check_id, values): + session = get_session() + + with session.begin(): + health_check = _cluster_health_check_get( + context, session, health_check_id) + + if not health_check: + raise ex.NotFoundException( + health_check_id, _("Health check id '%s' not found!")) + + health_check.update(values) + + return health_check diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index d1289fdfcf..8ccc40abfe 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -72,6 +72,8 @@ class Cluster(mb.SaharaBase): cascade="all,delete", backref='cluster', lazy='joined') + verification = relationship('ClusterVerification', cascade="all,delete", + backref="cluster", lazy='joined') node_groups = relationship('NodeGroup', cascade="all,delete", backref='cluster', lazy='joined') cluster_template_id = sa.Column(sa.String(36), @@ -87,6 +89,8 @@ class Cluster(mb.SaharaBase): d['node_groups'] = [ng.to_dict() for ng in self.node_groups] d['provision_progress'] = [pp.to_dict(show_progress) for pp in self.provision_progress] + if self.verification: + d['verification'] = self.verification[0].to_dict() return d @@ -498,3 +502,38 @@ class ClusterProvisionStep(mb.SaharaBase): if show_progress: d['events'] = [event.to_dict() for 
event in self.events] return d + + +class ClusterVerification(mb.SaharaBase): + """ClusterVerification represent results of cluster health checks.""" + + __tablename__ = 'cluster_verifications' + + __table_args__ = (sa.UniqueConstraint('id', 'cluster_id'),) + + id = _id_column() + cluster_id = sa.Column( + sa.String(36), sa.ForeignKey('clusters.id')) + status = sa.Column(sa.String(15)) + checks = relationship( + 'ClusterHealthCheck', cascade="all,delete", + backref='ClusterVerification', lazy='joined') + + def to_dict(self): + base = super(ClusterVerification, self).to_dict() + base['checks'] = [check.to_dict() for check in self.checks] + return base + + +class ClusterHealthCheck(mb.SaharaBase): + """ClusterHealthCheck respresent cluster health check.""" + + __tablename__ = 'cluster_health_checks' + __table_args__ = (sa.UniqueConstraint('id', 'verification_id'),) + + id = _id_column() + verification_id = sa.Column( + sa.String(36), sa.ForeignKey('cluster_verifications.id')) + status = sa.Column(sa.String(15)) + description = sa.Column(sa.Text) + name = sa.Column(sa.String(80)) diff --git a/sahara/exceptions.py b/sahara/exceptions.py index ed08099dc2..d7352e3b7a 100644 --- a/sahara/exceptions.py +++ b/sahara/exceptions.py @@ -33,7 +33,7 @@ class SaharaException(Exception): def __str__(self): return self.message - def __init__(self, message=None, code=None): + def __init__(self, message=None, code=None, inject_error_id=True): self.uuid = uuidutils.generate_uuid() if code: @@ -41,9 +41,10 @@ class SaharaException(Exception): if message: self.message = message - # Add Error UUID to the message - self.message = (_('%(message)s\nError ID: %(id)s') - % {'message': self.message, 'id': self.uuid}) + if inject_error_id: + # Add Error UUID to the message if required + self.message = (_('%(message)s\nError ID: %(id)s') + % {'message': self.message, 'id': self.uuid}) super(SaharaException, self).__init__( '%s: %s' % (self.code, self.message)) diff --git a/sahara/service/api.py b/sahara/service/api.py index 4bcc7ad904..47faac7462 100644 --- a/sahara/service/api.py +++ b/sahara/service/api.py @@ -22,6 +22,7 @@ from six.moves.urllib import parse as urlparse from sahara import conductor as c from sahara import context from sahara.plugins import base as plugin_base +from sahara.service.health import verification_base from sahara.service import quotas from sahara.utils import cluster as c_u from sahara.utils import general as g @@ -171,6 +172,9 @@ def terminate_cluster(id): def update_cluster(id, values): + if verification_base.update_verification_required(values): + OPS.handle_verification(id, values) + return conductor.cluster_get(context.ctx(), id) return conductor.cluster_update(context.ctx(), id, values) diff --git a/sahara/service/health/__init__.py b/sahara/service/health/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/service/health/common.py b/sahara/service/health/common.py new file mode 100644 index 0000000000..ac97703d64 --- /dev/null +++ b/sahara/service/health/common.py @@ -0,0 +1,46 @@ +# Copyright (c) 2016 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_config import cfg
+
+HEALTH_STATUS_GREEN = "GREEN"
+HEALTH_STATUS_YELLOW = "YELLOW"
+HEALTH_STATUS_RED = "RED"
+HEALTH_STATUS_CHECKING = "CHECKING"
+HEALTH_STATUS_DONE = [
+    HEALTH_STATUS_GREEN,
+    HEALTH_STATUS_YELLOW,
+    HEALTH_STATUS_RED,
+]
+
+VERIFICATIONS_START_OPS = "START"
+
+VERIFICATIONS_OPS = [
+    VERIFICATIONS_START_OPS,
+]
+
+CONF = cfg.CONF
+
+health_opts = [
+    cfg.BoolOpt('verification_enable', default=True,
+                help="Option to enable verifications for all clusters"),
+    cfg.IntOpt('verification_periodic_interval', default=600,
+               help="Interval between two consecutive periodic tasks for "
+                    "verifications, in seconds.")
+]
+health_opts_group = cfg.OptGroup(
+    'cluster_verifications', title='Options to configure verifications')
+CONF.register_group(group=health_opts_group)
+CONF.register_opts(health_opts, group=health_opts_group)
diff --git a/sahara/service/health/health_check_base.py b/sahara/service/health/health_check_base.py
new file mode 100644
index 0000000000..8f8c38226f
--- /dev/null
+++ b/sahara/service/health/health_check_base.py
@@ -0,0 +1,150 @@
+# Copyright (c) 2016 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+from oslo_log import log as logging
+import six
+
+from sahara import conductor
+from sahara import context
+from sahara import exceptions
+from sahara.i18n import _
+from sahara.i18n import _LE
+from sahara.service.health import common
+from sahara.utils import cluster as cluster_utils
+
+
+cond = conductor.API
+LOG = logging.getLogger(__name__)
+
+
+class BaseHealthError(exceptions.SaharaException):
+    message_template = _("Cluster health is %(status)s. Reason: %(reason)s")
Reason: %(reason)s") + code = 'HEALTH_ERROR' + status = 'UNKNOWN' + + def __init__(self, reason): + message = self.message_template % { + 'status': self.status, 'reason': reason} + # Ignoring Error id because it's not really needed + super(BaseHealthError, self).__init__(message, inject_error_id=False) + + +class RedHealthError(BaseHealthError): + """Exception to indicate red state of the health check.""" + code = "RED_STATE" + status = 'RED' + + +class YellowHealthError(BaseHealthError): + """Exception to indicate yellow state of health check.""" + code = "YELLOW_STATE" + status = 'YELLOW' + + +@six.add_metaclass(abc.ABCMeta) +class BasicHealthCheck(object): + def __init__(self, cluster): + """Initialize a health check for the specified cluster.""" + self.cluster = cluster + self.health_check_id = None + + @abc.abstractmethod + def get_health_check_name(self): + """Return name of the health check.""" + pass + + @abc.abstractmethod + def is_available(self): + """Validate availability of the health check for the specified cluster + + Return True when the health check is available for the specified + cluster and False when it's not. + """ + pass + + @abc.abstractmethod + def check_health(self): + """Check health of the specified cluster + + Returns description if health check executed successfully. Raises + YellowStateError to indicate Yellow result of the health check, + and RedStateError to indicate Red result of the health check. + """ + pass + + def _indicate_start(self): + vid = self.cluster.verification.id + self.health_check_id = cond.cluster_health_check_add( + context.ctx(), vid, {'status': common.HEALTH_STATUS_CHECKING, + 'name': self.get_health_check_name()}).id + + def _write_result(self, status, description): + cond.cluster_health_check_update( + context.ctx(), self.health_check_id, + {'status': status, 'description': description}) + + def execute(self): + if not self.is_available(): + return + self._indicate_start() + try: + result = self.check_health() + status = common.HEALTH_STATUS_GREEN + except Exception as exc: + result = six.text_type(exc) + if isinstance(exc, BaseHealthError): + status = exc.status + else: + status = common.HEALTH_STATUS_RED + self._write_result(status, result) + + +class AllInstancesAccessible(BasicHealthCheck): + def __init__(self, cluster): + super(AllInstancesAccessible, self).__init__(cluster) + + def is_available(self): + # always available : ) + return True + + def get_health_check_name(self): + return "Check of instances accessibility" + + @staticmethod + def _check_health_for_instance(instance): + with instance.remote() as r: + r.execute_command("cd /tmp/") + + def check_health(self): + instances = cluster_utils.get_instances(self.cluster) + try: + for inst in instances: + self._check_health_for_instance(inst) + except Exception: + LOG.exception(_LE( + "Some instances in the cluster are not available")) + raise RedHealthError(_("Some instances are not available")) + + return _("All instances are available") + + +def get_basic(cluster): + return [AllInstancesAccessible] + + +def get_health_checks(cluster): + return get_basic(cluster) diff --git a/sahara/service/health/verification_base.py b/sahara/service/health/verification_base.py new file mode 100644 index 0000000000..8f235b551b --- /dev/null +++ b/sahara/service/health/verification_base.py @@ -0,0 +1,145 @@ +# Copyright (c) 2016 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from sahara import conductor
+from sahara import context
+from sahara import exceptions
+from sahara.i18n import _
+from sahara.service.health import common
+from sahara.service.health import health_check_base
+from sahara.utils import cluster as cluster_utils
+
+
+cond = conductor.API
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class CannotVerifyError(exceptions.SaharaException):
+    code = "CANNOT_VERIFY"
+    message_template = _("Cannot verify cluster. Reason: %s")
+
+    def __init__(self, reason):
+        message = self.message_template % reason
+        super(CannotVerifyError, self).__init__(message)
+
+
+def get_possible_ops():
+    return common.VERIFICATIONS_OPS
+
+
+def verification_exists(cluster):
+    try:
+        if cluster.verification is not None:
+            return True
+    except (AttributeError, KeyError):
+        return False
+
+
+def validate_verification_ops(cluster, data):
+    status = data.get('verification', {}).get('status', None)
+    if not status:
+        # update doesn't affect verifications
+        return False
+    if len(list(data.keys())) != 1:
+        raise CannotVerifyError(
+            _("Can't update verification with other updates"))
+    if status == common.VERIFICATIONS_START_OPS:
+        return validate_verification_start(cluster)
+
+
+def validate_verification_start(cluster):
+    if not CONF.cluster_verifications.verification_enable:
+        raise CannotVerifyError(_("All verifications are disabled"))
+    ctx = context.ctx()
+    cluster = cond.cluster_get(ctx, cluster)
+    if not cluster or cluster.status != cluster_utils.CLUSTER_STATUS_ACTIVE:
+        raise CannotVerifyError(_("Cluster is not active or doesn't exist"))
+    if not verification_exists(cluster):
+        return True
+    if cluster.verification.status == common.HEALTH_STATUS_CHECKING:
+        raise CannotVerifyError(
+            _("Cluster verification in state %s")
+            % common.HEALTH_STATUS_CHECKING)
+    return True
+
+
+def _prepare_verification_running(ctx, cluster):
+    if verification_exists(cluster):
+        vid = cluster.verification.id
+        # to delete all data related to the previous one
+        cond.cluster_verification_delete(ctx, vid)
+    return (cond.cluster_verification_add(
+        ctx, cluster, {'status': common.HEALTH_STATUS_CHECKING}),
+        cond.cluster_get(ctx, cluster))
+
+
+def _execute_health_checks(ctx, cluster):
+    health_checks = health_check_base.get_health_checks(cluster)
+    actual = []
+    with context.ThreadGroup() as tg:
+        for check in health_checks:
+            actual_check = check(cluster)
+            actual.append(actual_check)
+            tg.spawn('health-check-exc', actual_check.execute)
+
+
+def _decide_status_for_verification(ctx, verification):
+    ver = cond.cluster_verification_get(ctx, verification)
+    cnt = collections.Counter()
+    for check in ver.checks:
+        cnt[check.status] += 1
+
+    if cnt[common.HEALTH_STATUS_GREEN] == len(ver.checks):
+        decided_status = common.HEALTH_STATUS_GREEN
+    elif cnt[common.HEALTH_STATUS_RED] > 0:
+        decided_status = common.HEALTH_STATUS_RED
+    else:
+        decided_status = common.HEALTH_STATUS_YELLOW
+
+    return cond.cluster_verification_update(
+        context.ctx(), ver.id, {'status': decided_status})
+
+
+def 
verification_run(cluster): + ctx = context.ctx() + LOG.debug("Running verification for the cluster") + ver, cluster = _prepare_verification_running(ctx, cluster) + _execute_health_checks(ctx, cluster) + return _decide_status_for_verification(ctx, ver) + + +def handle_verification(cluster, values): + cluster = cond.cluster_get(context.ctx(), cluster) + context.set_current_cluster_id(cluster.id) + values = {} if not values else values + status = values.get('verification', {}).get('status', None) + if status == common.VERIFICATIONS_START_OPS: + verification_run(cluster) + + +def update_verification_required(values): + if values.get('verification', {}).get('status', None): + return True + return False + + +def get_verification_periodic_interval(): + return CONF.cluster_verifications.verification_periodic_interval diff --git a/sahara/service/ops.py b/sahara/service/ops.py index f5b7262f56..83dc5dfe91 100644 --- a/sahara/service/ops.py +++ b/sahara/service/ops.py @@ -28,6 +28,7 @@ from sahara.i18n import _ from sahara.i18n import _LE from sahara.plugins import base as plugin_base from sahara.service.edp import job_manager +from sahara.service.health import verification_base as ver_base from sahara.service import ntp_service from sahara.service import shares from sahara.service import trusts @@ -35,7 +36,6 @@ from sahara.utils import cluster as c_u from sahara.utils import remote from sahara.utils import rpc as rpc_utils - conductor = c.API CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -75,6 +75,10 @@ class LocalOps(object): context.spawn("Deleting Job Execution %s" % job_execution_id, _delete_job_execution, job_execution_id) + def handle_verification(self, cluster_id, values): + context.spawn('Handling Verification for cluster %s' % cluster_id, + _handle_verification, cluster_id, values) + def get_engine_type_and_version(self): return INFRA.get_type_and_version() @@ -105,6 +109,9 @@ class RemoteOps(rpc_utils.RPCClient): self.cast('delete_job_execution', job_execution_id=job_execution_id) + def handle_verification(self, cluster_id, values): + self.cast('handle_verification', cluster_id=cluster_id, values=values) + def get_engine_type_and_version(self): return self.call('get_engine_type_and_version') @@ -148,6 +155,10 @@ class OpsServer(rpc_utils.RPCServer): def delete_job_execution(self, job_execution_id): _delete_job_execution(job_execution_id) + @request_context + def handle_verification(self, cluster_id, values): + _handle_verification(cluster_id, values) + @request_context def get_engine_type_and_version(self): return INFRA.get_type_and_version() @@ -365,3 +376,7 @@ def _delete_job_execution(job_execution_id): LOG.error(_LE("Job execution can't be cancelled in time. 
" "Deleting it anyway.")) conductor.job_execution_destroy(context.ctx(), job_execution_id) + + +def _handle_verification(cluster_id, values): + ver_base.handle_verification(cluster_id, values) diff --git a/sahara/service/periodic.py b/sahara/service/periodic.py index 85eb29f476..f261e5f4ae 100644 --- a/sahara/service/periodic.py +++ b/sahara/service/periodic.py @@ -29,6 +29,7 @@ from sahara.i18n import _LW from sahara.service import api from sahara.service import coordinator from sahara.service.edp import job_manager +from sahara.service.health import verification_base as vb from sahara.service import trusts from sahara.utils import cluster as c_u from sahara.utils import edp @@ -224,6 +225,21 @@ def _make_periodic_tasks(): # Add event log info cleanup context.ctx().current_instance_info = context.InstanceInfo() + @periodic_task.periodic_task( + spacing=vb.get_verification_periodic_interval()) + @set_context + def run_verifications(self, ctx): + LOG.debug("Executing health checks for the clusters") + start_dict = {'verification': {'status': 'START'}} + for cluster in conductor.cluster_get_all( + context.ctx(), status=c_u.CLUSTER_STATUS_ACTIVE): + try: + vb.validate_verification_start(cluster) + api.update_cluster(cluster.id, start_dict) + except vb.CannotVerifyError: + LOG.debug("Skipping running verification " + "on the cluster %s" % cluster.name) + return SaharaPeriodicTasks() diff --git a/sahara/service/validations/clusters.py b/sahara/service/validations/clusters.py index c37ccc2de5..aab09ebba0 100644 --- a/sahara/service/validations/clusters.py +++ b/sahara/service/validations/clusters.py @@ -19,10 +19,10 @@ from sahara import context import sahara.exceptions as ex from sahara.i18n import _ import sahara.service.api as api +from sahara.service.health import verification_base from sahara.service.validations import acl import sahara.service.validations.base as b - CONF = cfg.CONF @@ -115,5 +115,8 @@ def check_cluster_delete(cluster_id, **kwargs): def check_cluster_update(cluster_id, data, **kwargs): cluster = api.get_cluster(cluster_id) + verification = verification_base.validate_verification_ops( + cluster, data) acl.check_tenant_for_update(context.current(), cluster) - acl.check_protected_from_update(cluster, data) + if not verification: + acl.check_protected_from_update(cluster, data) diff --git a/sahara/service/validations/clusters_schema.py b/sahara/service/validations/clusters_schema.py index fb0bffb41f..ee6e989359 100644 --- a/sahara/service/validations/clusters_schema.py +++ b/sahara/service/validations/clusters_schema.py @@ -15,6 +15,7 @@ import copy +from sahara.service.health import verification_base import sahara.service.validations.cluster_template_schema as ct_schema from sahara.service.validations import shares @@ -68,6 +69,14 @@ CLUSTER_UPDATE_SCHEMA = { "is_protected": { "type": ["boolean", "null"], }, + "verification": { + "type": "object", + "properties": { + "status": { + "enum": verification_base.get_possible_ops(), + } + }, + }, "shares": copy.deepcopy(shares.SHARE_SCHEMA), }, "additionalProperties": False, diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index 0eeed869ed..6cc9da7322 100644 --- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -578,6 +578,35 @@ class SaharaMigrationsCheckers(object): self.assertFalse(res['is_protected']) engine.execute(t.delete()) + def _check_030(self, engine, data): + health_check_columns = [ + 'status', 
+ 'name', + 'description', + 'id', + 'verification_id', + 'created_at', + 'updated_at' + ] + + verification_columns = [ + 'status', + 'id', + 'cluster_id', + 'created_at', + 'updated_at' + ] + + self.assertColumnCount(engine, 'cluster_verifications', + verification_columns) + self.assertColumnsExist(engine, 'cluster_verifications', + verification_columns) + + self.assertColumnCount(engine, 'cluster_health_checks', + health_check_columns) + self.assertColumnsExist(engine, 'cluster_health_checks', + health_check_columns) + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/service/health/__init__.py b/sahara/tests/unit/service/health/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sahara/tests/unit/service/health/test_verification_base.py b/sahara/tests/unit/service/health/test_verification_base.py new file mode 100644 index 0000000000..aa8dc2f9e8 --- /dev/null +++ b/sahara/tests/unit/service/health/test_verification_base.py @@ -0,0 +1,166 @@ +# Copyright (c) 2016 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import six + +from sahara import conductor +from sahara import context +from sahara import exceptions +from sahara.service.health import health_check_base +from sahara.service.health import verification_base +from sahara.tests.unit import base +from sahara.tests.unit.conductor import test_api + + +class Check(health_check_base.BasicHealthCheck): + def check_health(self): + return "No criminality" + + def get_health_check_name(self): + return "James bond check" + + def is_available(self): + return True + + +class RedCheck(Check): + def check_health(self): + raise health_check_base.RedHealthError("Ooouch!") + + +class YellowCheck(Check): + def check_health(self): + raise health_check_base.YellowHealthError("No problems, boss!") + + +class TestVerifications(base.SaharaWithDbTestCase): + def setUp(self): + super(TestVerifications, self).setUp() + self.api = conductor.API + + def _cluster_sample(self): + ctx = context.ctx() + cluster = self.api.cluster_create(ctx, test_api.SAMPLE_CLUSTER) + return cluster + + @mock.patch('sahara.service.health.health_check_base.get_health_checks') + def test_verification_start(self, get_health_checks): + cluster = self._cluster_sample() + get_health_checks.return_value = [Check] + verification_base.handle_verification(cluster, { + 'verification': {'status': 'START'}}) + cluster = self.api.cluster_get(context.ctx(), cluster) + ver = cluster.verification + self.assertEqual('GREEN', ver['status']) + self.assertEqual(1, len(ver['checks'])) + + self.assertEqual('No criminality', ver.checks[0]['description']) + id = ver['id'] + + get_health_checks.return_value = [YellowCheck, Check, Check] + + verification_base.handle_verification(cluster, { + 'verification': {'status': 'START'}}) + cluster = self.api.cluster_get(context.ctx(), cluster) + ver = cluster.verification + + self.assertEqual('YELLOW', ver['status']) + self.assertEqual(3, 
len(ver['checks'])) + self.assertNotEqual(ver['id'], id) + + get_health_checks.return_value = [RedCheck, YellowCheck] + + verification_base.handle_verification(cluster, { + 'verification': {'status': 'START'}}) + cluster = self.api.cluster_get(context.ctx(), cluster) + ver = cluster.verification + + self.assertEqual('RED', ver['status']) + self.assertEqual(2, len(ver['checks'])) + self.assertNotEqual(ver['id'], id) + self.assertEqual("James bond check", ver['checks'][0]['name']) + + def _validate_exception(self, exc, expected_message): + message = six.text_type(exc) + # removing Error ID + message = message.split('\n')[0] + self.assertEqual(expected_message, message) + + def test_conductor_crud_verifications(self): + ctx = context.ctx() + try: + self.api.cluster_verification_add( + ctx, '1', values={'status': 'name'}) + except exceptions.NotFoundException as e: + self._validate_exception(e, "Cluster id '1' not found!") + + cl = self._cluster_sample() + ver = self.api.cluster_verification_add( + ctx, cl.id, values={'status': 'GREAT!'}) + ver = self.api.cluster_verification_get(ctx, ver['id']) + self.assertEqual('GREAT!', ver['status']) + + self.api.cluster_verification_update(ctx, ver['id'], + values={'status': "HEY!"}) + ver = self.api.cluster_verification_get(ctx, ver['id']) + self.assertEqual('HEY!', ver['status']) + self.assertIsNone( + self.api.cluster_verification_delete(ctx, ver['id'])) + + try: + self.api.cluster_verification_delete(ctx, ver['id']) + except exceptions.NotFoundException as e: + self._validate_exception( + e, "Verification id '%s' not found!" % ver['id']) + try: + self.api.cluster_verification_update( + ctx, ver['id'], values={'status': "ONE MORE"}) + except exceptions.NotFoundException as e: + self._validate_exception( + e, "Verification id '%s' not found!" % ver['id']) + + self.assertIsNone(self.api.cluster_verification_get(ctx, ver['id'])) + + def test_conductor_crud_health_checks(self): + ctx = context.ctx() + try: + self.api.cluster_health_check_add( + ctx, '1', values={'status': 'status'}) + except exceptions.NotFoundException as e: + self._validate_exception(e, "Verification id '1' not found!") + + cl = self._cluster_sample() + vid = self.api.cluster_verification_add( + ctx, cl.id, values={'status': 'GREAT!'})['id'] + + hc = self.api.cluster_health_check_add(ctx, vid, {'status': "Sah"}) + hc = self.api.cluster_health_check_get(ctx, hc['id']) + self.assertEqual('Sah', hc['status']) + + hc = self.api.cluster_health_check_update( + ctx, hc['id'], {'status': "ara"}) + hc = self.api.cluster_health_check_get(ctx, hc['id']) + self.assertEqual('ara', hc['status']) + + self.api.cluster_verification_delete(ctx, vid) + try: + hc = self.api.cluster_health_check_update( + ctx, hc['id'], {'status': "rulez!"}) + except exceptions.NotFoundException as e: + self._validate_exception( + e, "Health check id '%s' not found!" 
% hc['id']) + + self.assertIsNone(self.api.cluster_health_check_get(ctx, hc['id'])) diff --git a/sahara/tests/unit/service/validation/test_cluster_update_validation.py b/sahara/tests/unit/service/validation/test_cluster_update_validation.py index 2ebfeb91f5..0d8d6e93ef 100644 --- a/sahara/tests/unit/service/validation/test_cluster_update_validation.py +++ b/sahara/tests/unit/service/validation/test_cluster_update_validation.py @@ -19,6 +19,7 @@ import testtools from sahara import exceptions as ex from sahara.service import api +from sahara.service.health import verification_base from sahara.service.validations import clusters as c_val from sahara.service.validations import clusters_schema as c_schema from sahara.tests.unit.service.validation import utils as u @@ -98,3 +99,44 @@ class TestClusterUpdateValidation(u.ValidationTestCase): except ex.UpdateFailedException as e: self.assert_created_in_another_tenant_exception(e) raise e + + @mock.patch('sahara.conductor.API.cluster_get') + def test_verifications_ops(self, get_cluster_mock): + cluster = tu.create_cluster( + 'cluster1', "tenant_1", "fake", "0.1", ['ng1'], status='Active') + get_cluster_mock.return_value = cluster + self.assertIsNone(c_val.check_cluster_update( + cluster, {'verification': {'status': "START"}})) + cluster = tu.create_cluster( + 'cluster1', "tenant_1", "fake", "0.1", ['ng1'], + status='Active', verification={'status': "CHECKING"}) + get_cluster_mock.return_value = cluster + with testtools.ExpectedException(verification_base.CannotVerifyError): + c_val.check_cluster_update( + cluster, {'verification': {'status': 'START'}}) + + cluster = tu.create_cluster( + 'cluster1', "tenant_1", "fake", "0.1", ['ng1'], + status='Active', verification={'status': "RED"}) + get_cluster_mock.return_value = cluster + self.assertIsNone(c_val.check_cluster_update( + cluster, {'verification': {'status': "START"}})) + + with testtools.ExpectedException(verification_base.CannotVerifyError): + c_val.check_cluster_update(cluster, { + 'is_public': True, 'verification': {'status': "START"}}) + + # allow verification for protected resource + cluster = tu.create_cluster( + 'cluster1', "tenant_1", "fake", "0.1", ['ng1'], + is_protected=True, status='Active') + get_cluster_mock.return_value = cluster + self.assertIsNone(c_val.check_cluster_update( + cluster, {'verification': {'status': "START"}})) + # just for sure that protected works nicely for other + with testtools.ExpectedException(ex.UpdateFailedException): + try: + c_val.check_cluster_update(cluster.id, {'name': 'new'}) + except ex.UpdateFailedException as e: + self.assert_protected_resource_exception(e) + raise e
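
Note on usage: the change above is driven entirely through the existing cluster-update entry point, so no new REST endpoint is added. The sketch below shows how a verification run is expected to be triggered; cluster_id is a placeholder for an existing ACTIVE cluster, and only update_cluster() and the 'verification'/'status' payload come from this patch.

    from sahara.service import api

    # Per CLUSTER_UPDATE_SCHEMA and validate_verification_ops(), the update
    # body must contain nothing but the verification status change.
    start_request = {'verification': {'status': 'START'}}

    # update_cluster() detects the request via
    # verification_base.update_verification_required() and delegates to
    # OPS.handle_verification(), which runs the health checks asynchronously
    # and stores the aggregated GREEN/YELLOW/RED result on
    # cluster.verification.
    cluster = api.update_cluster(cluster_id, start_request)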
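For reference, a health check is just a BasicHealthCheck subclass implementing the three abstract methods; this patch only wires up AllInstancesAccessible through get_basic()/get_health_checks(). The class below is a hypothetical illustration (its name, messages and logic are invented for the example) and is not part of the change:

    from sahara.i18n import _
    from sahara.service.health import health_check_base
    from sahara.utils import cluster as cluster_utils

    class ExampleHasInstancesCheck(health_check_base.BasicHealthCheck):
        """Hypothetical check: report RED when the cluster has no instances."""

        def get_health_check_name(self):
            return "Example check for instance presence"

        def is_available(self):
            # A real check could inspect self.cluster (plugin, version, ...)
            # to decide whether it applies to this cluster.
            return True

        def check_health(self):
            if not cluster_utils.get_instances(self.cluster):
                raise health_check_base.RedHealthError(
                    _("Cluster has no instances"))
            return _("Cluster has at least one instance")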