update host state when updating clusterhost state.

Change-Id: I9c208870de9cc6e6dd8310c32455a951b1f09585
This commit is contained in:
xiaodongwang 2014-08-04 16:33:03 -07:00
parent ce17c1c3db
commit 21cc07bed3
11 changed files with 181 additions and 91 deletions

@ -88,16 +88,13 @@ def update_progress(cluster_hosts):
clusterid = cluster.id
adapter = cluster.adapter
os_installer = adapter.os_installer
os_installer = adapter.adapter_os_installer
os_installer_name = os_installer.instance_name
package_installer = adapter.package_installer
package_installer = adapter.adapter_package_installer
package_installer_name = package_installer.instance_name
distributed_system = cluster.distributed_system
distributed_system_name = distributed_system.name
host = session.query(models.Host).first()
os_name = host.os_name
distributed_system_name = cluster.distributed_system_name
os_name = cluster.os_name
os_names[clusterid] = os_name
distributed_systems[clusterid] = distributed_system_name

@ -238,6 +238,17 @@ def show_user(user_id):
)
@app.route("/current-user", methods=['GET'])
@log_user_action
@login_required
def show_current_user():
"""Get user."""
data = _get_request_args()
return utils.make_json_response(
200, user_api.get_current_user(current_user, **data)
)
@app.route("/users/<int:user_id>", methods=['PUT'])
@log_user_action
@login_required

@ -1055,7 +1055,7 @@ def review_cluster(session, reviewer, cluster_id, review={}, **kwargs):
deployed_os_config, host.os_id, True
)
host_api.validate_host(session, host)
host.config_validated = True
utils.update_db_object(session, host, config_validated=True)
package_config = cluster.package_config
if package_config:
metadata_api.validate_package_config(
@ -1070,8 +1070,8 @@ def review_cluster(session, reviewer, cluster_id, review={}, **kwargs):
deployed_package_config,
cluster.adapter_id, True
)
clusterhost.config_validated = True
cluster.config_validated = True
utils.update_db_object(session, clusterhost, config_validated=True)
utils.update_db_object(session, cluster, config_validated=True)
return {
'cluster': cluster,
'clusterhosts': cluster.clusterhosts
@ -1108,16 +1108,8 @@ def deploy_cluster(
)
is_cluster_editable(session, cluster, deployer)
is_cluster_validated(session, cluster)
utils.update_db_object(
session, cluster.state, state='INITIALIZED'
)
utils.update_db_object(session, cluster.state, state='INITIALIZED')
for clusterhost in clusterhosts:
if cluster.distributed_system:
is_clusterhost_validated(session, clusterhost)
utils.update_db_object(
session, clusterhost.state,
state='INITIALIZED'
)
host = clusterhost.host
if host_api.is_host_editable(
session, host, deployer,
@ -1126,8 +1118,11 @@ def deploy_cluster(
host_api.is_host_validated(
session, host
)
utils.update_db_object(session, host.state, state='INITIALIZED')
if cluster.distributed_system:
is_clusterhost_validated(session, clusterhost)
utils.update_db_object(
session, host.state, state='INITIALIZED'
session, clusterhost.state, state='INITIALIZED'
)
celery_client.celery.send_task(
@ -1208,7 +1203,7 @@ def update_cluster_host_state(
@utils.supported_filters(
optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS
)
## @database.run_in_session()
@database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
)
@ -1227,7 +1222,7 @@ def update_clusterhost_state(
@utils.supported_filters(
optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS
)
## @database.run_in_session()
@database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTER_STATE
)

@ -84,7 +84,7 @@ RESP_STATE_FIELDS = [
'id', 'state', 'percentage', 'message'
]
UPDATED_STATE_FIELDS = [
'id', 'state', 'percentage', 'message'
'state', 'percentage', 'message'
]
@ -588,8 +588,8 @@ def get_host_state(session, getter, host_id, **kwargs):
).state_dict()
@utils.supported_filters(UPDATED_STATE_FIELDS)
## @database.run_in_session()
@utils.supported_filters(optional_support_keys=UPDATED_STATE_FIELDS)
@database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_HOST_STATE
)

@ -293,6 +293,19 @@ def get_user(
)
@utils.supported_filters()
@database.run_in_session()
@utils.wrap_to_dict(RESP_FIELDS)
def get_current_user(
session, getter,
exception_when_missing=True, **kwargs
):
"""get field dict of a user."""
return utils.get_db_object(
session, models.User, exception_when_missing, id=getter.id
)
@utils.supported_filters(
optional_support_keys=SUPPORTED_FIELDS
)

@ -173,7 +173,7 @@ class FieldMixin(HelperMixin):
field_type_data = Column(
'field_type',
Enum('basestring', 'int', 'float', 'list', 'bool'),
default='basestring'
ColumnDefault('basestring')
)
display_type = Column(
Enum(
@ -181,7 +181,7 @@ class FieldMixin(HelperMixin):
'multiselect', 'combobox', 'text',
'multitext', 'password'
),
default='text'
ColumnDefault('text')
)
validator_data = Column('validator', Text)
js_validator = Column(Text)
@ -270,13 +270,13 @@ class StateMixin(TimestampMixin, HelperMixin):
'UNINITIALIZED', 'INITIALIZED',
'INSTALLING', 'SUCCESSFUL', 'ERROR'
),
default='UNINITIIALIZED'
ColumnDefault('UNINITIALIZED')
)
percentage = Column(Float, default=0.0)
message = Column(Text, default='')
severity = Column(
Enum('INFO', 'WARNING', 'ERROR'),
default='INFO'
ColumnDefault('INFO')
)
def update(self):
@ -284,13 +284,14 @@ class StateMixin(TimestampMixin, HelperMixin):
self.percentage = 0.0
self.severity = 'INFO'
self.message = ''
if self.severity == 'ERROR':
self.state = 'ERROR'
if self.state == 'INSTALLING':
if self.severity == 'ERROR':
self.state = 'ERROR'
elif self.percentage >= 1.0:
self.state = 'SUCCESSFUL'
self.percentage = 1.0
if self.state == 'SUCCESSFUL':
self.percentage = 1.0
if self.percentage >= 1.0:
self.state = 'SUCCESSFUL'
self.percentage = 1.0
super(StateMixin, self).update()
@ -392,6 +393,18 @@ class ClusterHostState(BASE, StateMixin):
primary_key=True
)
def update(self):
host_state = self.host.state
if self.state == 'INITIALIZED':
if host_state.state in ['UNINITIALIZED']:
host_state.state = 'INITIALIZED'
host_state.update()
elif self.state == 'INSTALLING':
if host_state.state in ['UNINITIALIZED', 'INITIALIZED']:
host_state.state = 'INSTALLING'
host_state.update()
super(ClusterHostState, self).update()
class ClusterHost(BASE, TimestampMixin, HelperMixin):
"""ClusterHost table."""
@ -429,6 +442,15 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
self.state = ClusterHostState()
super(ClusterHost, self).__init__(**kwargs)
def update(self):
if self.host.reinstall_os:
if self.state in ['SUCCESSFUL', 'ERROR']:
if self.config_validated:
self.state.state = 'INITIALIZED'
else:
self.state.state = 'UNINITIALIZED'
self.state.update()
@property
def name(self):
return '%s.%s' % (self.host.name, self.cluster.name)
@ -440,6 +462,10 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
@patched_package_config.setter
def patched_package_config(self, value):
self.package_config = util.merge_dict(dict(self.package_config), value)
logging.info(
'patch clusterhost %s package config: %s',
self.id, value
)
self.config_validated = False
@property
@ -451,6 +477,10 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
package_config = dict(self.package_config)
package_config.update(value)
self.package_config = package_config
logging.info(
'put clusterhost %s package config: %s',
self.id, value
)
self.config_validated = False
@property
@ -460,7 +490,7 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
@patched_os_config.setter
def patched_os_config(self, value):
host = self.host
host.os_config = util.merge_dict(dict(host.os_config), value)
host.patched_os_config = value
@property
def put_os_config(self):
@ -469,9 +499,7 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin):
@put_os_config.setter
def put_os_config(self, value):
host = self.host
os_config = dict(host.os_config)
os_config.update(value)
host.os_config = os_config
host.put_os_config = value
@hybrid_property
def distributed_system_name(self):
@ -586,6 +614,26 @@ class HostState(BASE, StateMixin):
host = self.host
if self.state == 'INSTALLING':
host.reinstall_os = False
for clusterhost in self.host.clusterhosts:
if clusterhost.state in [
'SUCCESSFUL', 'ERROR'
]:
clusterhost.state = 'INSTALLING'
clusterhost.state.update()
elif self.state == 'UNINITIALIZED':
for clusterhost in self.host.clusterhosts:
if clusterhost.state in [
'INITIALIZED', 'INSTALLING', 'SUCCESSFUL', 'ERROR'
]:
clusterhost.state = 'UNINITIALIZED'
clusterhost.state.update()
elif self.state == 'INITIALIZED':
for clusterhost in self.host.clusterhosts:
if clusterhost.state in [
'INSTALLING', 'SUCCESSFUL', 'ERROR'
]:
clusterhost.state = 'INITIALIZED'
clusterhost.state.update()
super(HostState, self).update()
@ -643,6 +691,7 @@ class Host(BASE, TimestampMixin, HelperMixin):
@patched_os_config.setter
def patched_os_config(self, value):
self.os_config = util.merge_dict(dict(self.os_config), value)
logging.info('patch host os config in %s: %s', self.id, value)
self.config_validated = False
@property
@ -654,11 +703,11 @@ class Host(BASE, TimestampMixin, HelperMixin):
os_config = dict(self.os_config)
os_config.update(value)
self.os_config = os_config
logging.info('put host os config in %s: %s', self.id, value)
self.config_validated = False
def __init__(self, id, **kwargs):
self.id = id
self.name = str(self.id)
self.state = HostState()
super(Host, self).__init__(**kwargs)
@ -669,7 +718,12 @@ class Host(BASE, TimestampMixin, HelperMixin):
def update(self):
if self.reinstall_os:
self.state = HostState()
if self.state in ['SUCCESSFUL', 'ERROR']:
if self.config_validated:
self.state.state = 'INITIALIZED'
else:
self.state.state = 'UNINITIALIZED'
self.state.update()
os = self.os
if os:
self.os_name = os.name
@ -864,7 +918,12 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
def update(self):
if self.reinstall_distributed_system:
self.state = ClusterState()
if self.state in ['SUCCESSFUL', 'ERROR']:
if self.config_validated:
self.state.state = 'INITIALIZED'
else:
self.state.state = 'UNINITIALIZED'
self.state.update()
os = self.os
if os:
self.os_name = os.name
@ -876,14 +935,10 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
self.adapter_name = adapter.name
self.distributed_system = adapter.adapter_distributed_system
self.distributed_system_name = self.distributed_system.name
self.put_package_config = {
'roles': [role.name for role in adapter.roles]
}
else:
self.adapter_name = None
self.distributed_system = None
self.distributed_system_name = None
self.package_config = {}
super(Cluster, self).update()
def validate(self):
@ -926,6 +981,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
@patched_os_config.setter
def patched_os_config(self, value):
self.os_config = util.merge_dict(dict(self.os_config), value)
logging.info('patch cluster %s os config: %s', self.id, value)
self.config_validated = False
@property
@ -937,6 +993,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
os_config = dict(self.os_config)
os_config.update(value)
self.os_config = os_config
logging.info('put cluster %s os config: %s', self.id, value)
self.config_validated = False
@property
@ -947,6 +1004,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
def patched_package_config(self, value):
package_config = dict(self.package_config)
self.package_config = util.merge_dict(package_config, value)
logging.info('patch cluster %s package config: %s', self.id, value)
self.config_validated = False
@property
@ -958,6 +1016,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin):
package_config = dict(self.package_config)
package_config.update(value)
self.package_config = package_config
logging.info('put cluster %s package config: %s', self.id, value)
self.config_validated = False
@hybrid_property
@ -1345,7 +1404,7 @@ class Switch(BASE, HelperMixin, TimestampMixin):
state = Column(Enum('initialized', 'unreachable', 'notsupported',
'repolling', 'error', 'under_monitoring',
name='switch_state'),
default='initialized')
ColumnDefault('initialized'))
filters = Column(JSONEncoded, default=[])
switch_machines = relationship(
SwitchMachine,

@ -200,7 +200,7 @@ class AdapterMatcher(object):
session = database.current_session()
clusterhost = session.query(
ClusterHost
).filter_by(id=hostid).first()
).filter_by(host_id=hostid).first()
if not clusterhost:
logging.error(
'there is no clusterhost for %s in ClusterHost',
@ -226,7 +226,7 @@ class AdapterMatcher(object):
def _update_host_progress(cls, hostid, host_progress, updater):
"""Updates host progress to db."""
state = ''
state = 'INSTALLING'
with database.session() as session:
host = session.query(
Host).filter_by(id=hostid).first()
@ -265,13 +265,13 @@ class AdapterMatcher(object):
if host.state.severity == 'ERROR':
state = 'ERROR'
logging.info('update host state by %s', updater)
host_api.update_host_state(
updater,
hostid,
state=state,
percentage=host_progress.progress,
message=host_progress.message,
id=hostid
message=host_progress.message
)
logging.debug(
@ -281,15 +281,16 @@ class AdapterMatcher(object):
@classmethod
def _update_clusterhost_progress(
cls,
clusterid,
hostid,
clusterhost_progress,
updater
):
clusterhost_state = ''
clusterhost_state = 'INSTALLING'
with database.session() as session:
clusterhost = session.query(
ClusterHost).filter_by(id=hostid).first()
ClusterHost).filter_by(host_id=hostid).first()
if not clusterhost.state:
logging.error(
@ -322,8 +323,10 @@ class AdapterMatcher(object):
if clusterhost.state.severity == 'ERROR':
clusterhost_state = 'ERROR'
cluster_api.update_clusterhost_state(
logging.info('updatge clusterhost state by %s', updater)
cluster_api.update_cluster_host_state(
updater,
clusterid,
hostid,
state=clusterhost_state,
percentage=clusterhost_progress.progress,
@ -364,40 +367,44 @@ class AdapterMatcher(object):
cluster_messages = {}
cluster_severities = set([])
cluster_installing_hosts = 0
cluster_completed_hosts = 0
cluster_failed_hosts = 0
hostids = []
clusterhosts = cluster.clusterhosts
hosts = [clusterhost.host for clusterhost in clusterhosts]
for host in hosts:
if host.state:
hostids.append(host.id)
cluster_progress += host.state.percentage
if not cluster.distributed_system:
hosts = [clusterhost.host for clusterhost in clusterhosts]
for host in hosts:
if host.state:
cluster_progress += host.state.percentage
if host.state.state == 'INSTALLING':
cluster_installing_hosts += 1
elif host.state.state == 'SUCCESSFUL':
cluster_completed_hosts += 1
elif host.state.state == 'ERROR':
cluster_failed_hosts += 1
if host.state.message:
cluster_messages[host.name] = host.state.message
if host.state.severity:
cluster_severities.add(host.state.severity)
else:
for clusterhost in clusterhosts:
if clusterhost.state:
cluster_progress += clusterhost.state.percentage
if clusterhost.state.state == 'INSTALLING':
cluster_installing_hosts += 1
elif clusterhost.state.state == 'SUCCESSFUL':
cluster_completed_hosts += 1
elif clusterhost.state.state == 'ERROR':
cluster_failed_hosts += 1
if clusterhost.state.message:
cluster_messages[clusterhost.name] = (
clusterhost.state.message
)
if clusterhost.state.severity:
cluster_severities.add(clusterhost.state.severity)
for clusterhost in clusterhosts:
if clusterhost.state:
cluster_progress += clusterhost.state.percentage
if clusterhost.state.state == 'INSTALLING':
cluster_installing_hosts += 1
elif (clusterhost.host.state.state not in
['ERROR', 'INITIALIZED'] and
clusterhost.state.state != 'ERORR'):
cluster_installing_hosts += 1
elif (clusterhost.state.state == 'ERROR' or
clusterhost.host.state.state == 'ERROR'):
cluster_failed_hosts += 1
if clusterhost.state.message:
cluster_messages[host.name] = clusterhost.state.message
if clusterhost.state.severity:
cluster_severities.add(clusterhost.state.severity)
cluster.state.percentage = cluster_progress / (len(hostids) * 2)
cluster.state.percentage = (
float(cluster_completed_hosts) / float(cluster.state.total_hosts)
)
cluster.state.message = '\n'.join(
[
'%s: %s' % (hostname, message)
@ -410,19 +417,15 @@ class AdapterMatcher(object):
break
if cluster.state.percentage >= 1.0:
cluster.state.state = 'READY'
cluster.state.state = 'SUCCESSFUL'
if cluster.state.severity == 'ERROR':
cluster.state.state = 'ERROR'
if cluster.state.state != 'INSTALLING':
cluster.mutable = True
cluster.state.installing_hosts = cluster_installing_hosts
cluster.state.total_hosts = len(clusterhosts)
cluster.state.failed_hosts = cluster_failed_hosts
cluster.state.completed_hosts = cluster.state.total_hosts - \
cluster.state.installing_hosts - cluster.state.failed_hosts
cluster.state.completed_hosts = cluster_completed_hosts
logging.debug(
'update cluster %s state %s',
@ -433,7 +436,9 @@ class AdapterMatcher(object):
host_progresses = {}
clusterhost_progresses = {}
updater = user_api.get_user_object(
'admin@abc.com'
'admin@abc.com',
expire_timestamp=datetime.datetime.now() +
datetime.timedelta(seconds=10000)
)
with database.session():
for hostid in hostids:
@ -500,6 +505,7 @@ class AdapterMatcher(object):
_, _, clusterhost_progress = clusterhost_progresses[hostid]
self._update_host_progress(hostid, host_progress, updater)
self._update_clusterhost_progress(
clusterid,
hostid,
clusterhost_progress,
updater

@ -19,6 +19,7 @@
import logging
from celery.signals import celeryd_init
from celery.signals import setup_logging
from compass.actions import deploy
from compass.actions import poll_switch
@ -44,6 +45,14 @@ def global_celery_init(**_):
metadata_api.load_metadatas()
@setup_logging.connect()
def tasks_setup_logging(**_):
"""Setup logging options from compass setting."""
flags.init()
flags.OPTIONS.logfile = setting.CELERY_LOGFILE
logsetting.init()
@celery.task(name='compass.tasks.pollswitch')
def pollswitch(
poller_email, ip_addr, credentials,
@ -129,6 +138,7 @@ def update_clusters_progress(cluster_hosts):
:param cluster_hosts: the cluster and hosts of each cluster to update.
:type cluster_hosts: dict of int to list of int
"""
logging.info('update_clusters_progress: %s', cluster_hosts)
try:
update_progress.update_progress(cluster_hosts)
except Exception as error:

@ -4,7 +4,7 @@ flask-restful
flask-sqlalchemy
flask-login
celery
cheetah
cheetah==2.4.1
netaddr
paramiko==1.7.5
simplejson
@ -14,6 +14,5 @@ redis
flask-wtf
itsdangerous
importlib
MySQL-python
lockfile
daemon

@ -26,7 +26,7 @@ SUSE=/etc/SuSE-release
CELERY=$CeleryPath
if [ -f $DEBIAN ]; then
. /lib/lsb/init_functions
. /lib/lsb/init-functions
elif [ -f $SUSE -a -r /etc/rc.status ]; then
. /etc/rc.status
else

@ -26,7 +26,7 @@ SUSE=/etc/SuSE-release
PYTHON=$Python
if [ -f $DEBIAN ]; then
. /lib/lsb/init_functions
. /lib/lsb/init-functions
elif [ -f $SUSE -a -r /etc/rc.status ]; then
. /etc/rc.status
else