base cluster verifications implementation

This change implements the basic pieces required for
cluster verifications:

 * API modifications;
 * DB implementation and conductor ops;
 * basic health checks;
 * periodic job.
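
A verification is started through the ordinary cluster update call by
passing a verification status; a minimal usage sketch (illustrative only,
using the service API touched by this change, error handling omitted):

    from sahara.service import api

    cluster_id = '<cluster uuid>'  # placeholder

    # "START" is currently the only value accepted by the status enum
    # in the cluster update schema.
    api.update_cluster(cluster_id, {'verification': {'status': 'START'}})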

Partially implements blueprint: cluster-verification
Change-Id: I1b975c9a5e5241c660de66a04bf559fc8f960873
Vitaly Gridnev 2016-01-28 17:47:03 +03:00
parent 8b30734bd3
commit d1f1d58fa9
22 changed files with 996 additions and 8 deletions


@@ -546,6 +546,47 @@ class LocalApi(object):
return self._manager.cluster_event_add(
context, provision_step, values)
@r.wrap(r.ClusterVerificationResource)
def cluster_verification_add(self, context, cluster_id, values):
"""Return created verification for the specified cluster."""
return self._manager.cluster_verification_add(
context, _get_id(cluster_id), values)
@r.wrap(r.ClusterVerificationResource)
def cluster_verification_get(self, context, verification_id):
"""Return verification with the specified verification_id."""
return self._manager.cluster_verification_get(
context, _get_id(verification_id))
@r.wrap(r.ClusterVerificationResource)
def cluster_verification_update(self, context, verification_id, values):
"""Return updated verification with the specified verification_id."""
return self._manager.cluster_verification_update(
context, _get_id(verification_id), values)
def cluster_verification_delete(self, context, verification_id):
""""Delete verification with the specified id."""
return self._manager.cluster_verification_delete(
context, _get_id(verification_id))
@r.wrap(r.ClusterHealthCheckResource)
def cluster_health_check_add(self, context, verification_id, values):
"""Return created health check in the specified verification."""
return self._manager.cluster_health_check_add(
context, _get_id(verification_id), values)
@r.wrap(r.ClusterHealthCheckResource)
def cluster_health_check_get(self, context, health_check_id):
"""Return health check with the specified health_check_id."""
return self._manager.cluster_health_check_get(
context, _get_id(health_check_id))
@r.wrap(r.ClusterHealthCheckResource)
def cluster_health_check_update(self, context, health_check_id, values):
"""Return updated health check with the specified health_check_id."""
return self._manager.cluster_health_check_update(
context, _get_id(health_check_id), values)
class RemoteApi(LocalApi):
"""Conductor API that does updates via RPC to the ConductorManager."""


@@ -714,3 +714,36 @@ class ConductorManager(db_base.Base):
def cluster_event_add(self, context, provision_step, values):
"""Assign new event to the specified provision step."""
return self.db.cluster_event_add(context, provision_step, values)
# Cluster verifications / health checks ops
def cluster_verification_add(self, context, cluster_id, values):
"""Return created verification for the specified cluster."""
return self.db.cluster_verification_add(context, cluster_id, values)
def cluster_verification_get(self, context, verification_id):
"""Return verification with the specified verification_id."""
return self.db.cluster_verification_get(context, verification_id)
def cluster_verification_update(self, context, verification_id, values):
"""Return updated verification with the specified verification_id."""
return self.db.cluster_verification_update(
context, verification_id, values)
def cluster_verification_delete(self, context, verification_id):
""""Delete verification with the specified id."""
return self.db.cluster_verification_delete(context, verification_id)
def cluster_health_check_add(self, context, verification_id, values):
"""Return created health check in the specified verification."""
return self.db.cluster_health_check_add(
context, verification_id, values)
def cluster_health_check_get(self, context, health_check_id):
"""Return health check with the specified health_check_id."""
return self.db.cluster_health_check_get(context, health_check_id)
def cluster_health_check_update(self, context, health_check_id, values):
"""Return updated health check with the specified health_check_id."""
return self.db.cluster_health_check_update(
context, health_check_id, values)


@@ -336,3 +336,24 @@ class ClusterEvent(object):
successful
step_id
"""
class ClusterVerification(object):
"""An object representing cluster verification
id
cluster_id
status
checks
"""
class ClusterHealthCheck(object):
"""An object representing health check
id
verification_id
status
description
name
"""


@@ -207,6 +207,17 @@ class ClusterTemplateResource(Resource, objects.ClusterTemplate):
}
class ClusterHealthCheckResource(Resource, objects.ClusterHealthCheck):
_resource_name = 'cluster_health_check'
class ClusterVerificationResource(Resource, objects.ClusterVerification):
_resource_name = 'cluster_verification'
_children = {
'checks': (ClusterHealthCheckResource, 'verification')
}
class ClusterResource(Resource, objects.Cluster):
def sanitize_cluster_configs(self, cluster_configs):
@@ -218,7 +229,8 @@ class ClusterResource(Resource, objects.Cluster):
_children = {
'node_groups': (NodeGroupResource, 'cluster'),
'cluster_template': (ClusterTemplateResource, None)
'cluster_template': (ClusterTemplateResource, None),
'verification': (ClusterVerificationResource, 'cluster')
}
_filter_fields = ['management_private_key', 'extra', 'rollback_info',


@@ -552,3 +552,46 @@ def cluster_provision_progress_update(context, cluster_id):
def cluster_event_add(context, provision_step, values):
"""Assign new event to the specified provision step."""
return IMPL.cluster_event_add(context, provision_step, values)
# Health verifications / checks ops
@to_dict
def cluster_verification_add(context, cluster_id, values):
"""Return created verification for the specified cluster."""
return IMPL.cluster_verification_add(context, cluster_id, values)
@to_dict
def cluster_verification_get(context, verification_id):
"""Return verification with the specified verification_id."""
return IMPL.cluster_verification_get(context, verification_id)
@to_dict
def cluster_verification_update(context, verification_id, values):
"""Return updated verification with the specified verification_id."""
return IMPL.cluster_verification_update(context, verification_id, values)
def cluster_verification_delete(context, verification_id):
""""Delete verification with the specified id."""
return IMPL.cluster_verification_delete(context, verification_id)
@to_dict
def cluster_health_check_add(context, verification_id, values):
"""Return created health check in the specified verification."""
return IMPL.cluster_health_check_add(context, verification_id, values)
@to_dict
def cluster_health_check_get(context, health_check_id):
"""Return health check with the specified health_check_id."""
return IMPL.cluster_health_check_get(context, health_check_id)
@to_dict
def cluster_health_check_update(context, health_check_id, values):
"""Return updated health check with the specified health_check_id."""
return IMPL.cluster_health_check_update(context, health_check_id, values)


@@ -0,0 +1,63 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""health-check
Revision ID: 030
Revises: 029
Create Date: 2016-01-26 16:11:46.008367
"""
# revision identifiers, used by Alembic.
revision = '030'
down_revision = '029'
from alembic import op
import sqlalchemy as sa
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade():
op.create_table(
'cluster_verifications',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('cluster_id', sa.String(length=36), nullable=True),
sa.Column('status', sa.String(length=15), nullable=True),
sa.ForeignKeyConstraint(['cluster_id'], ['clusters.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id', 'cluster_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET)
op.create_table(
'cluster_health_checks',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('verification_id', sa.String(length=36), nullable=True),
sa.Column('status', sa.String(length=15), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('name', sa.String(length=80), nullable=True),
sa.ForeignKeyConstraint(
['verification_id'], ['cluster_verifications.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id', 'verification_id'),
mysql_engine=MYSQL_ENGINE,
mysql_charset=MYSQL_CHARSET)


@@ -1369,3 +1369,113 @@ def cluster_event_add(context, step_id, values):
session.add(event)
return event.id
# Cluster verifications / health check ops
def _cluster_verification_get(context, session, verification_id):
# tenant id is not present
query = model_query(m.ClusterVerification, context, session,
project_only=False)
return query.filter_by(id=verification_id).first()
def cluster_verification_get(context, verification_id):
return _cluster_verification_get(context, get_session(), verification_id)
def cluster_verification_add(context, cluster_id, values):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if not cluster:
raise ex.NotFoundException(
cluster_id, _("Cluster id '%s' not found!"))
verification = m.ClusterVerification()
values['cluster_id'] = cluster_id
verification.update(values)
session.add(verification)
return verification
def cluster_verification_update(context, verification_id, values):
session = get_session()
with session.begin():
verification = _cluster_verification_get(
context, session, verification_id)
if not verification:
raise ex.NotFoundException(
verification_id, _("Verification id '%s' not found!"))
verification.update(values)
return verification
def cluster_verification_delete(context, verification_id):
session = get_session()
with session.begin():
verification = _cluster_verification_get(
context, session, verification_id)
if not verification:
raise ex.NotFoundException(
verification_id, _("Verification id '%s' not found!"))
for check in verification.checks:
session.delete(check)
session.delete(verification)
def _cluster_health_check_get(context, session, health_check_id):
# tenant id is not present
query = model_query(m.ClusterHealthCheck, context, session,
project_only=False)
return query.filter_by(id=health_check_id).first()
def cluster_health_check_get(context, health_check_id):
return _cluster_health_check_get(context, get_session(), health_check_id)
def cluster_health_check_add(context, verification_id, values):
session = get_session()
with session.begin():
verification = _cluster_verification_get(
context, session, verification_id)
if not verification:
raise ex.NotFoundException(
verification_id, _("Verification id '%s' not found!"))
health_check = m.ClusterHealthCheck()
values['verification_id'] = verification_id
values['tenant_id'] = context.tenant_id
health_check.update(values)
session.add(health_check)
return health_check
def cluster_health_check_update(context, health_check_id, values):
session = get_session()
with session.begin():
health_check = _cluster_health_check_get(
context, session, health_check_id)
if not health_check:
raise ex.NotFoundException(
health_check_id, _("Health check id '%s' not found!"))
health_check.update(values)
return health_check


@@ -72,6 +72,8 @@ class Cluster(mb.SaharaBase):
cascade="all,delete",
backref='cluster',
lazy='joined')
verification = relationship('ClusterVerification', cascade="all,delete",
backref="cluster", lazy='joined')
node_groups = relationship('NodeGroup', cascade="all,delete",
backref='cluster', lazy='joined')
cluster_template_id = sa.Column(sa.String(36),
@@ -87,6 +89,8 @@ class Cluster(mb.SaharaBase):
d['node_groups'] = [ng.to_dict() for ng in self.node_groups]
d['provision_progress'] = [pp.to_dict(show_progress) for pp in
self.provision_progress]
if self.verification:
d['verification'] = self.verification[0].to_dict()
return d
@@ -498,3 +502,38 @@ class ClusterProvisionStep(mb.SaharaBase):
if show_progress:
d['events'] = [event.to_dict() for event in self.events]
return d
class ClusterVerification(mb.SaharaBase):
"""ClusterVerification represent results of cluster health checks."""
__tablename__ = 'cluster_verifications'
__table_args__ = (sa.UniqueConstraint('id', 'cluster_id'),)
id = _id_column()
cluster_id = sa.Column(
sa.String(36), sa.ForeignKey('clusters.id'))
status = sa.Column(sa.String(15))
checks = relationship(
'ClusterHealthCheck', cascade="all,delete",
backref='ClusterVerification', lazy='joined')
def to_dict(self):
base = super(ClusterVerification, self).to_dict()
base['checks'] = [check.to_dict() for check in self.checks]
return base
class ClusterHealthCheck(mb.SaharaBase):
"""ClusterHealthCheck respresent cluster health check."""
__tablename__ = 'cluster_health_checks'
__table_args__ = (sa.UniqueConstraint('id', 'verification_id'),)
id = _id_column()
verification_id = sa.Column(
sa.String(36), sa.ForeignKey('cluster_verifications.id'))
status = sa.Column(sa.String(15))
description = sa.Column(sa.Text)
name = sa.Column(sa.String(80))


@@ -33,7 +33,7 @@ class SaharaException(Exception):
def __str__(self):
return self.message
def __init__(self, message=None, code=None):
def __init__(self, message=None, code=None, inject_error_id=True):
self.uuid = uuidutils.generate_uuid()
if code:
@@ -41,9 +41,10 @@ class SaharaException(Exception):
if message:
self.message = message
# Add Error UUID to the message
self.message = (_('%(message)s\nError ID: %(id)s')
% {'message': self.message, 'id': self.uuid})
if inject_error_id:
# Add Error UUID to the message if required
self.message = (_('%(message)s\nError ID: %(id)s')
% {'message': self.message, 'id': self.uuid})
super(SaharaException, self).__init__(
'%s: %s' % (self.code, self.message))


@@ -22,6 +22,7 @@ from six.moves.urllib import parse as urlparse
from sahara import conductor as c
from sahara import context
from sahara.plugins import base as plugin_base
from sahara.service.health import verification_base
from sahara.service import quotas
from sahara.utils import cluster as c_u
from sahara.utils import general as g
@@ -171,6 +172,9 @@ def terminate_cluster(id):
def update_cluster(id, values):
if verification_base.update_verification_required(values):
OPS.handle_verification(id, values)
return conductor.cluster_get(context.ctx(), id)
return conductor.cluster_update(context.ctx(), id, values)


@@ -0,0 +1,46 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
HEALTH_STATUS_GREEN = "GREEN"
HEALTH_STATUS_YELLOW = "YELLOW"
HEALTH_STATUS_RED = "RED"
HEALTH_STATUS_CHECKING = "CHECKING"
HEALTH_STATUS_DONE = [
HEALTH_STATUS_GREEN,
HEALTH_STATUS_YELLOW,
HEALTH_STATUS_RED,
]
VERIFICATIONS_START_OPS = "START"
VERIFICATIONS_OPS = [
VERIFICATIONS_START_OPS,
]
CONF = cfg.CONF
health_opts = [
cfg.BoolOpt('verification_enable', default=True,
help="Option to enable verifications for all clusters"),
cfg.IntOpt('verification_periodic_interval', default=600,
help="Interval between two consecutive periodic tasks for"
"verifications, in seconds.")
]
health_opts_group = cfg.OptGroup(
'cluster_verifications', title='Options to configure verifications')
CONF.register_group(group=health_opts_group)
CONF.register_opts(health_opts, group=health_opts_group)


@@ -0,0 +1,150 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo_log import log as logging
import six
from sahara import conductor
from sahara import context
from sahara import exceptions
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.service.health import common
from sahara.utils import cluster as cluster_utils
cond = conductor.API
LOG = logging.getLogger(__name__)
class BaseHealthError(exceptions.SaharaException):
message_template = _("Cluster health is %(status)s. Reason: %(reason)s")
code = 'HEALTH_ERROR'
status = 'UNKNOWN'
def __init__(self, reason):
message = self.message_template % {
'status': self.status, 'reason': reason}
# Ignoring Error id because it's not really needed
super(BaseHealthError, self).__init__(message, inject_error_id=False)
class RedHealthError(BaseHealthError):
"""Exception to indicate red state of the health check."""
code = "RED_STATE"
status = 'RED'
class YellowHealthError(BaseHealthError):
"""Exception to indicate yellow state of health check."""
code = "YELLOW_STATE"
status = 'YELLOW'
@six.add_metaclass(abc.ABCMeta)
class BasicHealthCheck(object):
def __init__(self, cluster):
"""Initialize a health check for the specified cluster."""
self.cluster = cluster
self.health_check_id = None
@abc.abstractmethod
def get_health_check_name(self):
"""Return name of the health check."""
pass
@abc.abstractmethod
def is_available(self):
"""Validate availability of the health check for the specified cluster
Return True when the health check is available for the specified
cluster and False when it's not.
"""
pass
@abc.abstractmethod
def check_health(self):
"""Check health of the specified cluster
Returns a description if the health check executed successfully. Raises
YellowHealthError to indicate a Yellow result of the health check,
and RedHealthError to indicate a Red result of the health check.
"""
pass
def _indicate_start(self):
vid = self.cluster.verification.id
self.health_check_id = cond.cluster_health_check_add(
context.ctx(), vid, {'status': common.HEALTH_STATUS_CHECKING,
'name': self.get_health_check_name()}).id
def _write_result(self, status, description):
cond.cluster_health_check_update(
context.ctx(), self.health_check_id,
{'status': status, 'description': description})
def execute(self):
if not self.is_available():
return
self._indicate_start()
try:
result = self.check_health()
status = common.HEALTH_STATUS_GREEN
except Exception as exc:
result = six.text_type(exc)
if isinstance(exc, BaseHealthError):
status = exc.status
else:
status = common.HEALTH_STATUS_RED
self._write_result(status, result)
class AllInstancesAccessible(BasicHealthCheck):
def __init__(self, cluster):
super(AllInstancesAccessible, self).__init__(cluster)
def is_available(self):
# always available : )
return True
def get_health_check_name(self):
return "Check of instances accessibility"
@staticmethod
def _check_health_for_instance(instance):
with instance.remote() as r:
r.execute_command("cd /tmp/")
def check_health(self):
instances = cluster_utils.get_instances(self.cluster)
try:
for inst in instances:
self._check_health_for_instance(inst)
except Exception:
LOG.exception(_LE(
"Some instances in the cluster are not available"))
raise RedHealthError(_("Some instances are not available"))
return _("All instances are available")
def get_basic(cluster):
return [AllInstancesAccessible]
def get_health_checks(cluster):
return get_basic(cluster)
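
The abstract class above is the extension point for future plugin-specific
checks; a minimal sketch of what such a check could look like (the class,
the plugin name and any wiring into get_health_checks() are hypothetical;
this change only registers AllInstancesAccessible through get_basic()):

    from sahara.i18n import _
    from sahara.service.health import health_check_base


    class ExampleServiceCheck(health_check_base.BasicHealthCheck):
        """Hypothetical plugin-side check, not part of this change."""

        def get_health_check_name(self):
            return "Example service availability check"

        def is_available(self):
            # run only for clusters of the (hypothetical) plugin it belongs to
            return self.cluster.plugin_name == 'example'

        def check_health(self):
            service_ok = True  # a real check would probe the service here
            if not service_ok:
                raise health_check_base.RedHealthError(
                    _("Example service is not responding"))
            return _("Example service is available")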


@@ -0,0 +1,145 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from oslo_config import cfg
from oslo_log import log as logging
from sahara import conductor
from sahara import context
from sahara import exceptions
from sahara.i18n import _
from sahara.service.health import common
from sahara.service.health import health_check_base
from sahara.utils import cluster as cluster_utils
cond = conductor.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class CannotVerifyError(exceptions.SaharaException):
code = "CANNOT_VERIFY"
message_template = _("Cannot verify cluster. Reason: %s")
def __init__(self, reason):
message = self.message_template % reason
super(CannotVerifyError, self).__init__(message)
def get_possible_ops():
return common.VERIFICATIONS_OPS
def verification_exists(cluster):
try:
if cluster.verification is not None:
return True
except (AttributeError, KeyError):
return False
def validate_verification_ops(cluster, data):
status = data.get('verification', {}).get('status', None)
if not status:
# update doesn't affect verifications
return False
if len(list(data.keys())) != 1:
raise CannotVerifyError(
_("Can't update verification with other updates"))
if status == common.VERIFICATIONS_START_OPS:
return validate_verification_start(cluster)
def validate_verification_start(cluster):
if not CONF.cluster_verifications.verification_enable:
raise CannotVerifyError(_("All verifications are disabled"))
ctx = context.ctx()
cluster = cond.cluster_get(ctx, cluster)
if not cluster or cluster.status != cluster_utils.CLUSTER_STATUS_ACTIVE:
raise CannotVerifyError(_("Cluster is not active or doesn't exists"))
if not verification_exists(cluster):
return True
if cluster.verification.status == common.HEALTH_STATUS_CHECKING:
raise CannotVerifyError(
_("Cluster verification in state %s")
% common.HEALTH_STATUS_CHECKING)
return True
def _prepare_verification_running(ctx, cluster):
if verification_exists(cluster):
vid = cluster.verification.id
# delete all data related to the previous verification
cond.cluster_verification_delete(ctx, vid)
return (cond.cluster_verification_add(
ctx, cluster, {'status': common.HEALTH_STATUS_CHECKING}),
cond.cluster_get(ctx, cluster))
def _execute_health_checks(ctx, cluster):
health_checks = health_check_base.get_health_checks(cluster)
actual = []
with context.ThreadGroup() as tg:
for check in health_checks:
actual_check = check(cluster)
actual.append(actual_check)
tg.spawn('health-check-exc', actual_check.execute)
def _decide_status_for_verification(ctx, verification):
ver = cond.cluster_verification_get(ctx, verification)
cnt = collections.Counter()
for check in ver.checks:
cnt[check.status] += 1
if cnt[common.HEALTH_STATUS_GREEN] == len(ver.checks):
decided_status = common.HEALTH_STATUS_GREEN
elif cnt[common.HEALTH_STATUS_RED] > 0:
decided_status = common.HEALTH_STATUS_RED
else:
decided_status = common.HEALTH_STATUS_YELLOW
return cond.cluster_verification_update(
context.ctx(), ver.id, {'status': decided_status})
def verification_run(cluster):
ctx = context.ctx()
LOG.debug("Running verification for the cluster")
ver, cluster = _prepare_verification_running(ctx, cluster)
_execute_health_checks(ctx, cluster)
return _decide_status_for_verification(ctx, ver)
def handle_verification(cluster, values):
cluster = cond.cluster_get(context.ctx(), cluster)
context.set_current_cluster_id(cluster.id)
values = {} if not values else values
status = values.get('verification', {}).get('status', None)
if status == common.VERIFICATIONS_START_OPS:
verification_run(cluster)
def update_verification_required(values):
if values.get('verification', {}).get('status', None):
return True
return False
def get_verification_periodic_interval():
return CONF.cluster_verifications.verification_periodic_interval
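
Putting the module together, a verification request flows roughly as:
cluster update -> validate_verification_ops() -> OPS.handle_verification()
-> handle_verification() -> verification_run(), which spawns the health
checks and stores the aggregated status. A direct invocation for
illustration (a sketch; in the service this path is reached through the
ops layer rather than called directly):

    from sahara import conductor
    from sahara import context
    from sahara.service.health import verification_base

    cluster_id = '<cluster uuid>'  # placeholder

    # equivalent of an update carrying {'verification': {'status': 'START'}}
    verification_base.handle_verification(
        cluster_id, {'verification': {'status': 'START'}})

    # the aggregated result is stored with the cluster
    cluster = conductor.API.cluster_get(context.ctx(), cluster_id)
    print(cluster.verification['status'], len(cluster.verification['checks']))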


@@ -28,6 +28,7 @@ from sahara.i18n import _
from sahara.i18n import _LE
from sahara.plugins import base as plugin_base
from sahara.service.edp import job_manager
from sahara.service.health import verification_base as ver_base
from sahara.service import ntp_service
from sahara.service import shares
from sahara.service import trusts
@@ -35,7 +36,6 @@ from sahara.utils import cluster as c_u
from sahara.utils import remote
from sahara.utils import rpc as rpc_utils
conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -75,6 +75,10 @@ class LocalOps(object):
context.spawn("Deleting Job Execution %s" % job_execution_id,
_delete_job_execution, job_execution_id)
def handle_verification(self, cluster_id, values):
context.spawn('Handling Verification for cluster %s' % cluster_id,
_handle_verification, cluster_id, values)
def get_engine_type_and_version(self):
return INFRA.get_type_and_version()
@@ -105,6 +109,9 @@ class RemoteOps(rpc_utils.RPCClient):
self.cast('delete_job_execution',
job_execution_id=job_execution_id)
def handle_verification(self, cluster_id, values):
self.cast('handle_verification', cluster_id=cluster_id, values=values)
def get_engine_type_and_version(self):
return self.call('get_engine_type_and_version')
@@ -148,6 +155,10 @@ class OpsServer(rpc_utils.RPCServer):
def delete_job_execution(self, job_execution_id):
_delete_job_execution(job_execution_id)
@request_context
def handle_verification(self, cluster_id, values):
_handle_verification(cluster_id, values)
@request_context
def get_engine_type_and_version(self):
return INFRA.get_type_and_version()
@@ -365,3 +376,7 @@ def _delete_job_execution(job_execution_id):
LOG.error(_LE("Job execution can't be cancelled in time. "
"Deleting it anyway."))
conductor.job_execution_destroy(context.ctx(), job_execution_id)
def _handle_verification(cluster_id, values):
ver_base.handle_verification(cluster_id, values)


@@ -29,6 +29,7 @@ from sahara.i18n import _LW
from sahara.service import api
from sahara.service import coordinator
from sahara.service.edp import job_manager
from sahara.service.health import verification_base as vb
from sahara.service import trusts
from sahara.utils import cluster as c_u
from sahara.utils import edp
@@ -224,6 +225,21 @@ def _make_periodic_tasks():
# Add event log info cleanup
context.ctx().current_instance_info = context.InstanceInfo()
@periodic_task.periodic_task(
spacing=vb.get_verification_periodic_interval())
@set_context
def run_verifications(self, ctx):
LOG.debug("Executing health checks for the clusters")
start_dict = {'verification': {'status': 'START'}}
for cluster in conductor.cluster_get_all(
context.ctx(), status=c_u.CLUSTER_STATUS_ACTIVE):
try:
vb.validate_verification_start(cluster)
api.update_cluster(cluster.id, start_dict)
except vb.CannotVerifyError:
LOG.debug("Skipping running verification "
"on the cluster %s" % cluster.name)
return SaharaPeriodicTasks()


@@ -19,10 +19,10 @@ from sahara import context
import sahara.exceptions as ex
from sahara.i18n import _
import sahara.service.api as api
from sahara.service.health import verification_base
from sahara.service.validations import acl
import sahara.service.validations.base as b
CONF = cfg.CONF
@@ -115,5 +115,8 @@ def check_cluster_delete(cluster_id, **kwargs):
def check_cluster_update(cluster_id, data, **kwargs):
cluster = api.get_cluster(cluster_id)
verification = verification_base.validate_verification_ops(
cluster, data)
acl.check_tenant_for_update(context.current(), cluster)
acl.check_protected_from_update(cluster, data)
if not verification:
acl.check_protected_from_update(cluster, data)


@@ -15,6 +15,7 @@
import copy
from sahara.service.health import verification_base
import sahara.service.validations.cluster_template_schema as ct_schema
from sahara.service.validations import shares
@@ -68,6 +69,14 @@ CLUSTER_UPDATE_SCHEMA = {
"is_protected": {
"type": ["boolean", "null"],
},
"verification": {
"type": "object",
"properties": {
"status": {
"enum": verification_base.get_possible_ops(),
}
},
},
"shares": copy.deepcopy(shares.SHARE_SCHEMA),
},
"additionalProperties": False,


@@ -578,6 +578,35 @@ class SaharaMigrationsCheckers(object):
self.assertFalse(res['is_protected'])
engine.execute(t.delete())
def _check_030(self, engine, data):
health_check_columns = [
'status',
'name',
'description',
'id',
'verification_id',
'created_at',
'updated_at'
]
verification_columns = [
'status',
'id',
'cluster_id',
'created_at',
'updated_at'
]
self.assertColumnCount(engine, 'cluster_verifications',
verification_columns)
self.assertColumnsExist(engine, 'cluster_verifications',
verification_columns)
self.assertColumnCount(engine, 'cluster_health_checks',
health_check_columns)
self.assertColumnsExist(engine, 'cluster_health_checks',
health_check_columns)
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,


@@ -0,0 +1,166 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
from sahara import conductor
from sahara import context
from sahara import exceptions
from sahara.service.health import health_check_base
from sahara.service.health import verification_base
from sahara.tests.unit import base
from sahara.tests.unit.conductor import test_api
class Check(health_check_base.BasicHealthCheck):
def check_health(self):
return "No criminality"
def get_health_check_name(self):
return "James bond check"
def is_available(self):
return True
class RedCheck(Check):
def check_health(self):
raise health_check_base.RedHealthError("Ooouch!")
class YellowCheck(Check):
def check_health(self):
raise health_check_base.YellowHealthError("No problems, boss!")
class TestVerifications(base.SaharaWithDbTestCase):
def setUp(self):
super(TestVerifications, self).setUp()
self.api = conductor.API
def _cluster_sample(self):
ctx = context.ctx()
cluster = self.api.cluster_create(ctx, test_api.SAMPLE_CLUSTER)
return cluster
@mock.patch('sahara.service.health.health_check_base.get_health_checks')
def test_verification_start(self, get_health_checks):
cluster = self._cluster_sample()
get_health_checks.return_value = [Check]
verification_base.handle_verification(cluster, {
'verification': {'status': 'START'}})
cluster = self.api.cluster_get(context.ctx(), cluster)
ver = cluster.verification
self.assertEqual('GREEN', ver['status'])
self.assertEqual(1, len(ver['checks']))
self.assertEqual('No criminality', ver.checks[0]['description'])
id = ver['id']
get_health_checks.return_value = [YellowCheck, Check, Check]
verification_base.handle_verification(cluster, {
'verification': {'status': 'START'}})
cluster = self.api.cluster_get(context.ctx(), cluster)
ver = cluster.verification
self.assertEqual('YELLOW', ver['status'])
self.assertEqual(3, len(ver['checks']))
self.assertNotEqual(ver['id'], id)
get_health_checks.return_value = [RedCheck, YellowCheck]
verification_base.handle_verification(cluster, {
'verification': {'status': 'START'}})
cluster = self.api.cluster_get(context.ctx(), cluster)
ver = cluster.verification
self.assertEqual('RED', ver['status'])
self.assertEqual(2, len(ver['checks']))
self.assertNotEqual(ver['id'], id)
self.assertEqual("James bond check", ver['checks'][0]['name'])
def _validate_exception(self, exc, expected_message):
message = six.text_type(exc)
# removing Error ID
message = message.split('\n')[0]
self.assertEqual(expected_message, message)
def test_conductor_crud_verifications(self):
ctx = context.ctx()
try:
self.api.cluster_verification_add(
ctx, '1', values={'status': 'name'})
except exceptions.NotFoundException as e:
self._validate_exception(e, "Cluster id '1' not found!")
cl = self._cluster_sample()
ver = self.api.cluster_verification_add(
ctx, cl.id, values={'status': 'GREAT!'})
ver = self.api.cluster_verification_get(ctx, ver['id'])
self.assertEqual('GREAT!', ver['status'])
self.api.cluster_verification_update(ctx, ver['id'],
values={'status': "HEY!"})
ver = self.api.cluster_verification_get(ctx, ver['id'])
self.assertEqual('HEY!', ver['status'])
self.assertIsNone(
self.api.cluster_verification_delete(ctx, ver['id']))
try:
self.api.cluster_verification_delete(ctx, ver['id'])
except exceptions.NotFoundException as e:
self._validate_exception(
e, "Verification id '%s' not found!" % ver['id'])
try:
self.api.cluster_verification_update(
ctx, ver['id'], values={'status': "ONE MORE"})
except exceptions.NotFoundException as e:
self._validate_exception(
e, "Verification id '%s' not found!" % ver['id'])
self.assertIsNone(self.api.cluster_verification_get(ctx, ver['id']))
def test_conductor_crud_health_checks(self):
ctx = context.ctx()
try:
self.api.cluster_health_check_add(
ctx, '1', values={'status': 'status'})
except exceptions.NotFoundException as e:
self._validate_exception(e, "Verification id '1' not found!")
cl = self._cluster_sample()
vid = self.api.cluster_verification_add(
ctx, cl.id, values={'status': 'GREAT!'})['id']
hc = self.api.cluster_health_check_add(ctx, vid, {'status': "Sah"})
hc = self.api.cluster_health_check_get(ctx, hc['id'])
self.assertEqual('Sah', hc['status'])
hc = self.api.cluster_health_check_update(
ctx, hc['id'], {'status': "ara"})
hc = self.api.cluster_health_check_get(ctx, hc['id'])
self.assertEqual('ara', hc['status'])
self.api.cluster_verification_delete(ctx, vid)
try:
hc = self.api.cluster_health_check_update(
ctx, hc['id'], {'status': "rulez!"})
except exceptions.NotFoundException as e:
self._validate_exception(
e, "Health check id '%s' not found!" % hc['id'])
self.assertIsNone(self.api.cluster_health_check_get(ctx, hc['id']))


@@ -19,6 +19,7 @@ import testtools
from sahara import exceptions as ex
from sahara.service import api
from sahara.service.health import verification_base
from sahara.service.validations import clusters as c_val
from sahara.service.validations import clusters_schema as c_schema
from sahara.tests.unit.service.validation import utils as u
@@ -98,3 +99,44 @@ class TestClusterUpdateValidation(u.ValidationTestCase):
except ex.UpdateFailedException as e:
self.assert_created_in_another_tenant_exception(e)
raise e
@mock.patch('sahara.conductor.API.cluster_get')
def test_verifications_ops(self, get_cluster_mock):
cluster = tu.create_cluster(
'cluster1', "tenant_1", "fake", "0.1", ['ng1'], status='Active')
get_cluster_mock.return_value = cluster
self.assertIsNone(c_val.check_cluster_update(
cluster, {'verification': {'status': "START"}}))
cluster = tu.create_cluster(
'cluster1', "tenant_1", "fake", "0.1", ['ng1'],
status='Active', verification={'status': "CHECKING"})
get_cluster_mock.return_value = cluster
with testtools.ExpectedException(verification_base.CannotVerifyError):
c_val.check_cluster_update(
cluster, {'verification': {'status': 'START'}})
cluster = tu.create_cluster(
'cluster1', "tenant_1", "fake", "0.1", ['ng1'],
status='Active', verification={'status': "RED"})
get_cluster_mock.return_value = cluster
self.assertIsNone(c_val.check_cluster_update(
cluster, {'verification': {'status': "START"}}))
with testtools.ExpectedException(verification_base.CannotVerifyError):
c_val.check_cluster_update(cluster, {
'is_public': True, 'verification': {'status': "START"}})
# allow verification for protected resource
cluster = tu.create_cluster(
'cluster1', "tenant_1", "fake", "0.1", ['ng1'],
is_protected=True, status='Active')
get_cluster_mock.return_value = cluster
self.assertIsNone(c_val.check_cluster_update(
cluster, {'verification': {'status': "START"}}))
# make sure the protected-resource check still applies to other updates
with testtools.ExpectedException(ex.UpdateFailedException):
try:
c_val.check_cluster_update(cluster.id, {'name': 'new'})
except ex.UpdateFailedException as e:
self.assert_protected_resource_exception(e)
raise e