Add Progress Update feature to new compass-core

temporarily commented out run_in_session in util
Separated DB tables for clusterhost and host state.

Change-Id: Ib4e4c6c1931bbbf62b0b77bc994bffecb0fe27bd
This commit is contained in:
root 2014-08-02 12:27:34 -07:00 committed by Xicheng Chang
parent 0e607e4563
commit c24e622578
11 changed files with 564 additions and 237 deletions

View File

@ -0,0 +1,113 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to update status and installing progress of the given cluster.
.. moduleauthor:: Xiaodong Wang <xiaodongwang@huawei.com>
"""
import logging
from compass.actions import util
from compass.db.api import database
from compass.db import models
from compass.log_analyzor import progress_calculator
from compass.utils import setting_wrapper as setting
def _cluster_filter(cluster):
"""filter cluster."""
if not cluster.state:
logging.error('there is no state for cluster %s',
cluster.id)
return False
if cluster.state.state != 'INSTALLING':
logging.error('the cluster %s state %s is not installing',
cluster.id, cluster.state.state)
return False
return True
def _host_filter(host):
"""filter host."""
if not host.state:
logging.error('there is no state for host %s',
host.id)
return False
if host.state.state != 'INSTALLING':
logging.error('the host %s state %s is not installing',
host.id, host.state.state)
return False
return True
def update_progress(cluster_hosts):
"""Update status and installing progress of the given cluster.
:param cluster_hosts: clusters and hosts in each cluster to update.
:type cluster_hosts: dict of int or str to list of int or str
.. note::
The function should be called out of the database session scope.
In the function, it will update the database cluster_state and
host_state table for the deploying cluster and hosts.
The function will also query log_progressing_history table to get
the lastest installing progress and the position of log it has
processed in the last run. The function uses these information to
avoid recalculate the progress from the beginning of the log file.
After the progress got updated, these information will be stored back
to the log_progressing_history for next time run.
"""
with util.lock('log_progressing', blocking=False) as lock:
if not lock:
logging.error(
'failed to acquire lock to calculate installation progress')
return
logging.info('update installing progress of cluster_hosts: %s',
cluster_hosts)
os_names = {}
distributed_systems = {}
with database.session() as session:
cluster = session.query(models.Cluster).first()
clusterid = cluster.id
adapter = cluster.adapter
os_installer = adapter.os_installer
os_installer_name = os_installer.instance_name
package_installer = adapter.package_installer
package_installer_name = package_installer.instance_name
distributed_system = cluster.distributed_system
distributed_system_name = distributed_system.name
host = session.query(models.Host).first()
os_name = host.os_name
os_names[clusterid] = os_name
distributed_systems[clusterid] = distributed_system_name
clusterhosts = cluster.clusterhosts
hostids = [clusterhost.host.id for clusterhost in clusterhosts]
cluster_hosts.update({clusterid: hostids})
progress_calculator.update_progress(
os_installer_name,
os_names,
package_installer_name,
distributed_systems,
cluster_hosts)

View File

@ -48,3 +48,66 @@ def lock(lock_name, blocking=True, timeout=10):
instance_lock.acquired_until = 0 instance_lock.acquired_until = 0
instance_lock.release() instance_lock.release()
logging.debug('released lock %s', lock_name) logging.debug('released lock %s', lock_name)
"""
def update_cluster_hosts(cluster_hosts,
cluster_filter=None, host_filter=None):
session = database.current_session()
os_versions = {}
target_systems = {}
updated_cluster_hosts = {}
clusters = session.query(models.Cluster).all()
for cluster in clusters:
if cluster_hosts and (
cluster.id not in cluster_hosts and
str(cluster.id) not in cluster_hosts and
cluster.name not in cluster_hosts
):
logging.debug('ignore cluster %s sinc it is not in %s',
cluster.id, cluster_hosts)
continue
adapter = cluster.adapter
if not cluster.adapter:
logging.error('there is no adapter for cluster %s',
cluster.id)
continue
if cluster_filter and not cluster_filter(cluster):
logging.debug('filter cluster %s', cluster.id)
continue
updated_cluster_hosts[cluster.id] = []
os_versions[cluster.id] = 'CentOS-6.5-x86_64'
target_systems[cluster.id] = 'openstack'
if cluster.id in cluster_hosts:
hosts = cluster_hosts[cluster.id]
elif str(cluster.id) in cluster_hosts:
hosts = cluster_hosts[str(cluster.id)]
elif cluster.name in cluster_hosts:
hosts = cluster_hosts[cluster.name]
else:
hosts = []
if not hosts:
hosts = [host.id for host in cluster.hosts]
for host in cluster.hosts:
if (
host.id not in hosts and
str(host.id) not in hosts and
host.hostname not in hosts
):
logging.debug('ignore host %s which is not in %s',
host.id, hosts)
continue
if host_filter and not host_filter(host):
logging.debug('filter host %s', host.id)
continue
updated_cluster_hosts[cluster.id].append(host.id)
return (updated_cluster_hosts, os_versions, target_systems)
"""

View File

@ -155,6 +155,6 @@ def get_adapters_internal(session):
else: else:
logging.info( logging.info(
'ignore adapter %s since it is not deployable', 'ignore adapter %s since it is not deployable',
adapter_dict adapter.to_dict()
) )
return adapter_mapping return adapter_mapping

View File

@ -1204,7 +1204,7 @@ def update_cluster_host_state(
@utils.supported_filters( @utils.supported_filters(
optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS
) )
@database.run_in_session() ## @database.run_in_session()
@user_api.check_user_permission_in_session( @user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTERHOST_STATE permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
) )
@ -1223,7 +1223,7 @@ def update_clusterhost_state(
@utils.supported_filters( @utils.supported_filters(
optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS
) )
@database.run_in_session() ## @database.run_in_session()
@user_api.check_user_permission_in_session( @user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTER_STATE permission.PERMISSION_UPDATE_CLUSTER_STATE
) )

View File

@ -589,7 +589,7 @@ def get_host_state(session, getter, host_id, **kwargs):
@utils.supported_filters(UPDATED_STATE_FIELDS) @utils.supported_filters(UPDATED_STATE_FIELDS)
@database.run_in_session() ## @database.run_in_session()
@user_api.check_user_permission_in_session( @user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_HOST_STATE permission.PERMISSION_UPDATE_HOST_STATE
) )

View File

@ -21,6 +21,7 @@ import simplejson as json
from sqlalchemy import BigInteger from sqlalchemy import BigInteger
from sqlalchemy import Boolean from sqlalchemy import Boolean
from sqlalchemy import Column from sqlalchemy import Column
from sqlalchemy import ColumnDefault
from sqlalchemy import DateTime from sqlalchemy import DateTime
from sqlalchemy import Enum from sqlalchemy import Enum
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
@ -1885,3 +1886,46 @@ class Network(BASE, TimestampMixin, HelperMixin):
raise exception.InvalidParameter( raise exception.InvalidParameter(
'subnet %s format is uncorrect' % self.subnet 'subnet %s format is uncorrect' % self.subnet
) )
class LogProgressingHistory(BASE):
"""host installing log history for each file.
:param id: int, identity as primary key.
:param pathname: str, the full path of the installing log file. unique.
:param position: int, the position of the log file it has processed.
:param partial_line: str, partial line of the log.
:param progressing: float, indicate the installing progress between 0 to 1.
:param message: str, str, the installing message.
:param severity: Enum, the installing message severity.
('ERROR', 'WARNING', 'INFO')
:param line_matcher_name: str, the line matcher name of the log processor.
:param update_timestamp: datetime, the latest timestamp the entry updated.
"""
__tablename__ = 'log_progressing_history'
id = Column(Integer, primary_key=True)
pathname = Column(String(80), unique=True)
position = Column(Integer, ColumnDefault(0))
partial_line = Column(Text)
percentage = Column(Float, ColumnDefault(0.0))
message = Column(Text)
severity = Column(Enum('ERROR', 'WARNING', 'INFO'), ColumnDefault('INFO'))
line_matcher_name = Column(String(80), ColumnDefault('start'))
update_timestamp = Column(DateTime, default=datetime.datetime.now(),
onupdate=datetime.datetime.now())
def __init__(self, **kwargs):
super(LogProgressingHistory, self).__init__(**kwargs)
def __repr__(self):
return (
'LogProgressingHistory[%r: position %r,'
'partial_line %r,percentage %r,message %r,'
'severity %r]'
) % (
self.pathname, self.position,
self.partial_line,
self.percentage,
self.message,
self.severity
)

View File

@ -19,11 +19,19 @@
import logging import logging
import re import re
from compass.db import database from compass.db.api import cluster as cluster_api
from compass.db.model import Cluster from compass.db.api import database
from compass.db.model import ClusterHost from compass.db.api import host as host_api
from compass.db.api import user as user_api
from compass.db.models import Cluster
from compass.db.models import ClusterHost
from compass.db.models import Host
from compass.log_analyzor.line_matcher import Progress from compass.log_analyzor.line_matcher import Progress
import datetime
class AdapterItemMatcher(object): class AdapterItemMatcher(object):
"""Progress matcher for the os installing or package installing.""" """Progress matcher for the os installing or package installing."""
@ -162,155 +170,101 @@ class AdapterMatcher(object):
@classmethod @classmethod
def _get_host_progress(cls, hostid): def _get_host_progress(cls, hostid):
"""Get Host Progress from database. """Get Host Progress from HostState."""
.. notes::
The function should be called in database session.
"""
session = database.current_session() session = database.current_session()
host = session.query( host = session.query(
ClusterHost Host
).filter_by(id=hostid).first() ).filter_by(id=hostid).first()
if not host: if not host:
logging.error( logging.error(
'there is no host for %s in ClusterHost', hostid) 'there is no host for %s in Host', hostid)
return None, None, None return None, None, None
if not host.state: if not host.state:
logging.error('there is no related HostState for %s', logging.error('there is no related HostState for %s',
hostid) hostid)
return host.fullname, None, None return host.name, None, None
"""
return ( return (
host.fullname, host.name,
host.state.state, host.state.state,
Progress(host.state.progress, Progress(host.state.percentage,
host.state.message, host.state.message,
host.state.severity)) host.state.severity))
"""
return {
'os': (
host.fullname,
host.state.state,
Progress(host.state.os_progress,
host.state.os_message,
host.state.os_severity)),
'package': (
host.fullname,
host.state.state,
Progress(host.state.progress,
host.state.message,
host.state.severity))}
@classmethod @classmethod
def _update_host_os_progress(cls, hostid, os_progress): def _get_clusterhost_progress(cls, hostid):
"""Update host progress to database. """Get ClusterHost progress from ClusterHostState."""
session = database.current_session()
clusterhost = session.query(
ClusterHost
).filter_by(id=hostid).first()
if not clusterhost:
logging.error(
'there is no clusterhost for %s in ClusterHost',
hostid
)
return None, None, None
if not clusterhost.state:
logging.error(
'there is no related ClusterHostState for %s',
hostid
)
return clusterhost.name, None, None
return (
clusterhost.name,
clusterhost.state.state,
Progress(clusterhost.state.percentage,
clusterhost.state.message,
clusterhost.state.severity))
@classmethod
def _update_host_progress(cls, hostid, host_progress, updater):
"""Updates host progress to db."""
.. note::
The function should be called in database session.
"""
session = database.current_session() session = database.current_session()
host = session.query( host = session.query(
ClusterHost).filter_by(id=hostid).first() Host).filter_by(id=hostid).first()
if not host: if not host:
logging.error( logging.error(
'there is no host for %s in ClusterHost', hostid) 'there is no host for %s in table Host',
return hostid
)
if not host.state: if not host.state:
logging.error( logging.error(
'there is no related HostState for %s', hostid) 'there is no related HostState for %s',
return hostid
)
logging.debug('os progress: %s', os_progress.progress) if host.state.percentage > host_progress.progress:
if host.state.os_progress > os_progress.progress:
logging.error( logging.error(
'host %s os_progress is not increased ' 'host %s progress has not been increased'
'from %s to %s', ' from %s to $s',
hostid, host.state, os_progress) hostid, host.state, host_progress
)
return return
if (
host.state.os_progress == os_progress.progress and if (host.state.percentage == host_progress.progress and
host.state.os_message == os_progress.message host.state.message == host_progress.message):
):
logging.info( logging.info(
'ignore update host %s progress %s to %s', 'host %s update ignored due to same progress'
hostid, os_progress, host.state) 'in database',
hostid
)
return return
host.state.os_progress = os_progress.progress host.state.percentage = host_progress.progress
"""host.state.os_progress = progress.progress""" host.state.message = host_progress.message
host.state.os_message = os_progress.message if host_progress.severity:
if os_progress.severity: host.state.severity = host_progress.severity
host.state.os_severity = os_progress.severity
if host.state.os_progress >= 1.0: if host.state.percentage >= 1.0:
host.state.os_state = 'OS_READY' host.state.state = 'SUCCESSFUL'
if host.state.os_severity == 'ERROR':
host.state.os_state = 'ERROR'
if host.state.os_state != 'INSTALLING':
host.mutable = True
logging.debug(
'update host %s state %s',
hostid, host.state)
@classmethod
def _update_host_package_progress(cls, hostid, progress):
"""Update host progress to database.
.. note::
The function should be called in database session.
"""
session = database.current_session()
host = session.query(
ClusterHost).filter_by(id=hostid).first()
logging.debug('package progress: %s', progress.progress)
logging.debug('package ssssstate: %s', host.state.state)
if not host:
logging.error(
'there is no host for %s in ClusterHost', hostid)
return
if not host.state:
logging.error(
'there is no related HostState for %s', hostid)
return
if not host.state.state in ['OS_READY', 'INSTALLING']:
logging.error(
'host %s issssss not in INSTALLING state',
hostid)
return
if host.state.progress > progress.progress:
logging.error(
'host %s progress is not increased '
'from %s to %s',
hostid, host.state, progress)
return
if (
host.state.progress == progress.progress and
host.state.message == progress.message
):
logging.info(
'ignore update host %s progress %s to %s',
hostid, progress, host.state)
return
host.state.progress = progress.progress
host.state.message = progress.message
if progress.severity:
host.state.severity = progress.severity
if host.state.progress >= 1.0:
host.state.state = 'READY'
if host.state.severity == 'ERROR': if host.state.severity == 'ERROR':
host.state.state = 'ERROR' host.state.state = 'ERROR'
@ -318,10 +272,81 @@ class AdapterMatcher(object):
if host.state.state != 'INSTALLING': if host.state.state != 'INSTALLING':
host.mutable = True host.mutable = True
host_api.update_host_state(
session,
updater,
hostid,
state=host.state.state,
percentage=host.state.percentage,
message=host.state.message,
id=hostid
)
logging.debug( logging.debug(
'update host %s state %s', 'update host %s state %s',
hostid, host.state) hostid, host.state)
@classmethod
def _update_clusterhost_progress(
cls,
hostid,
clusterhost_progress,
updater
):
session = database.current_session()
clusterhost = session.query(
ClusterHost).filter_by(id=hostid).first()
if not clusterhost.state:
logging.error(
'ClusterHost state not found for %s',
hostid)
if clusterhost.state.percentage > clusterhost_progress.progress:
logging.error(
'clusterhost %s state has not been increased'
' from %s to %s',
hostid, clusterhost.state, clusterhost_progress
)
return
if (clusterhost.state.percentage == clusterhost_progress.progress and
clusterhost.state.message == clusterhost_progress.message):
logging.info(
'clusterhost %s update ignored due to same progress'
'in database',
hostid
)
return
clusterhost.state.percentage = clusterhost_progress.progress
clusterhost.state.message = clusterhost_progress.message
if clusterhost_progress.severity:
clusterhost.state.severity = clusterhost_progress.severity
if clusterhost.state.percentage >= 1.0:
clusterhost.state.state = 'SUCCESSFUL'
if clusterhost.state.severity == 'ERROR':
clusterhost.state.state = 'ERROR'
if clusterhost.state.state != 'INSTALLING':
clusterhost.mutable = True
cluster_api.update_clusterhost_state(
session,
updater,
hostid,
state=clusterhost.state.state,
percentage=clusterhost.state.percentage,
message=clusterhost.state.message
)
logging.debug(
'update clusterhost %s state %s',
hostid, clusterhost.state)
@classmethod @classmethod
def _update_cluster_progress(cls, clusterid): def _update_cluster_progress(cls, clusterid):
"""Update cluster installing progress to database. """Update cluster installing progress to database.
@ -351,18 +376,41 @@ class AdapterMatcher(object):
cluster_progress = 0.0 cluster_progress = 0.0
cluster_messages = {} cluster_messages = {}
cluster_severities = set([]) cluster_severities = set([])
cluster_installing_hosts = 0
cluster_failed_hosts = 0
hostids = [] hostids = []
for host in cluster.hosts: clusterhosts = cluster.clusterhosts
hosts = [clusterhost.host for clusterhost in clusterhosts]
for host in hosts:
if host.state: if host.state:
hostids.append(host.id) hostids.append(host.id)
cluster_progress += host.state.progress cluster_progress += host.state.percentage
if host.state.message: if host.state.message:
cluster_messages[host.hostname] = host.state.message cluster_messages[host.name] = host.state.message
if host.state.severity: if host.state.severity:
cluster_severities.add(host.state.severity) cluster_severities.add(host.state.severity)
cluster.state.progress = cluster_progress / len(hostids) for clusterhost in clusterhosts:
if clusterhost.state:
cluster_progress += clusterhost.state.percentage
if clusterhost.state.state == 'INSTALLING':
cluster_installing_hosts += 1
elif (clusterhost.host.state.state not in
['ERROR', 'INITIALIZED'] and
clusterhost.state.state != 'ERORR'):
cluster_installing_hosts += 1
elif (clusterhost.state.state == 'ERROR' or
clusterhost.host.state.state == 'ERROR'):
cluster_failed_hosts += 1
if clusterhost.state.message:
cluster_messages[host.name] = clusterhost.state.message
if clusterhost.state.severity:
cluster_severities.add(clusterhost.state.severity)
cluster.state.percentage = cluster_progress / (len(hostids) * 2)
cluster.state.message = '\n'.join( cluster.state.message = '\n'.join(
[ [
'%s: %s' % (hostname, message) '%s: %s' % (hostname, message)
@ -374,7 +422,7 @@ class AdapterMatcher(object):
cluster.state.severity = severity cluster.state.severity = severity
break break
if cluster.state.progress >= 1.0: if cluster.state.percentage >= 1.0:
cluster.state.state = 'READY' cluster.state.state = 'READY'
if cluster.state.severity == 'ERROR': if cluster.state.severity == 'ERROR':
@ -383,85 +431,92 @@ class AdapterMatcher(object):
if cluster.state.state != 'INSTALLING': if cluster.state.state != 'INSTALLING':
cluster.mutable = True cluster.mutable = True
cluster.state.installing_hosts = cluster_installing_hosts
cluster.state.total_hosts = len(clusterhosts)
cluster.state.failed_hosts = cluster_failed_hosts
cluster.state.completed_hosts = cluster.state.total_hosts - \
cluster.state.installing_hosts - cluster.state.failed_hosts
logging.debug( logging.debug(
'update cluster %s state %s', 'update cluster %s state %s',
clusterid, cluster.state) clusterid, cluster.state)
def update_progress(self, clusterid, hostids): def update_progress(self, clusterid, hostids):
"""Update cluster progress and hosts progresses.
:param clusterid: the id of the cluster to update. host_progresses = {}
:type clusterid: int. clusterhost_progresses = {}
:param hostids: the ids of the hosts to update. updater = user_api.get_user_object(
:type hostids: list of int. 'admin@abc.com',
""" expire_timestamp=datetime.datetime.now() +
logging.debug('printing os_matcher %s', self.__str__()) datetime.timedelta(seconds=10000))
host_os_progresses = {}
host_package_progresses = {}
with database.session(): with database.session():
for hostid in hostids: for hostid in hostids:
host_overall_state = ( host_name, host_state, host_progress = \
self._get_host_progress(hostid)) self._get_host_progress(hostid)
logging.debug('host overall state: %s', host_overall_state) _, clusterhost_state, clusterhost_progress = \
fullname, host_state, host_os_progress = ( self._get_clusterhost_progress(hostid)
host_overall_state['os'])
_, _, host_package_progress = host_overall_state['package'] if (not host_name or
if (not fullname or not host_progress or
not host_os_progress or not clusterhost_progress):
not host_package_progress):
logging.error( logging.error(
'nothing to update host %s', 'nothing to update host %s',
fullname) host_name)
continue continue
logging.debug('got host %s state %s os_progress %s' logging.debug('got host %s host_state: %s '
'package_progress %s', 'host_progress: %s, '
fullname, 'clusterhost_state: %s, '
host_state, host_os_progress, 'clusterhost_progress: %s ',
host_package_progress) host_name,
host_state,
host_progress,
clusterhost_state,
clusterhost_progress)
host_os_progresses[hostid] = ( host_progresses[hostid] = (
fullname, host_state, host_os_progress) host_name, host_state, host_progress)
host_package_progresses[hostid] = ( clusterhost_progresses[hostid] = (
fullname, host_state, host_package_progress) host_name, clusterhost_state, clusterhost_progress)
for hostid, host_value in host_os_progresses.items(): for hostid, host_value in host_progresses.items():
fullname, host_state, host_os_progress = host_value host_name, host_state, host_progress = host_value
if host_state == 'INSTALLING' and host_os_progress.progress < 1.0: if (host_state == 'INSTALLING' and
self.os_matcher_.update_progress( host_progress.progress < 1.0):
fullname, host_os_progress) self.os_matcher_.update_progress(
else: host_name, host_progress)
logging.error( else:
'there is no need to update host %s ' logging.error(
'OS progress: state %s os_progress %s', 'there is no need to update host %s '
fullname, host_state, host_os_progress) 'progress: state %s progress %s',
host_name, host_state, host_progress)
for hostid, host_value in host_package_progresses.items(): for hostid, clusterhost_value in clusterhost_progresses.items():
fullname, host_state, host_package_progress = host_value host_name, clusterhost_state, clusterhost_progress = \
if (host_state == 'INSTALLING' and clusterhost_value
host_package_progress.progress < 1.0): if (clusterhost_state == 'INSTALLING' and
self.package_matcher_.update_progress( clusterhost_progress.progress < 1.0):
fullname, host_package_progress) self.package_matcher_.update_progress(
else: host_name, clusterhost_progress)
logging.error( else:
'there is no need to update host %s ' logging.error(
'Package progress: state %s package_progress %s', 'no need to update clusterhost %s'
fullname, host_state, host_package_progress) 'progress: state %s progress %s',
host_name, clusterhost_state, clusterhost_progress)
with database.session():
for hostid in hostids: for hostid in hostids:
if hostid not in host_os_progresses: if hostid not in host_progresses:
continue
if hostid not in clusterhost_progresses:
continue continue
if hostid not in host_package_progresses: _, _, host_progress = host_progresses[hostid]
continue _, _, clusterhost_progress = clusterhost_progresses[hostid]
self._update_host_progress(hostid, host_progress, updater)
_, _, host_os_progress = host_os_progresses[hostid] self._update_clusterhost_progress(
_, _, host_package_progress = host_package_progresses[hostid]
self._update_host_os_progress(hostid, host_os_progress)
self._update_host_package_progress(
hostid, hostid,
host_package_progress clusterhost_progress,
updater
) )
self._update_cluster_progress(clusterid) self._update_cluster_progress(clusterid)

View File

@ -19,8 +19,8 @@
import logging import logging
import os.path import os.path
from compass.db import database from compass.db.api import database
from compass.db.model import LogProgressingHistory from compass.db.models import LogProgressingHistory
from compass.log_analyzor.line_matcher import Progress from compass.log_analyzor.line_matcher import Progress
from compass.utils import setting_wrapper as setting from compass.utils import setting_wrapper as setting
@ -109,24 +109,24 @@ class FileReader(object):
in the last run, the progress, the message and the in the last run, the progress, the message and the
severity it has got in the last run. severity it has got in the last run.
""" """
with database.session() as session: session = database.current_session()
history = session.query( history = session.query(
LogProgressingHistory LogProgressingHistory
).filter_by( ).filter_by(
pathname=self.pathname_ pathname=self.pathname_
).first() ).first()
if history: if history:
self.position_ = history.position self.position_ = history.position
self.partial_line_ = history.partial_line self.partial_line_ = history.partial_line
line_matcher_name = history.line_matcher_name line_matcher_name = history.line_matcher_name
progress = Progress(history.progress, progress = Progress(history.percentage,
history.message, history.message,
history.severity) history.severity)
else: else:
line_matcher_name = 'start' line_matcher_name = 'start'
progress = Progress(0.0, '', None) progress = Progress(0.0, '', None)
return line_matcher_name, progress return line_matcher_name, progress
def update_history(self, line_matcher_name, progress): def update_history(self, line_matcher_name, progress):
"""Update log_progressing_history table. """Update log_progressing_history table.
@ -138,37 +138,36 @@ class FileReader(object):
The function should be called out of database session. The function should be called out of database session.
It updates the log_processing_history table. It updates the log_processing_history table.
""" """
with database.session() as session: session = database.current_session()
history = session.query(LogProgressingHistory).filter_by( history = session.query(LogProgressingHistory).filter_by(
pathname=self.pathname_).first() pathname=self.pathname_).first()
if history:
if history.position >= self.position_:
logging.error(
'%s history position %s is ahead of current '
'position %s',
self.pathname_,
history.position,
self.position_)
return
if history: history.position = self.position_
if history.position >= self.position_: history.partial_line = self.partial_line_
logging.error( history.line_matcher_name = line_matcher_name
'%s history position %s is ahead of current ' history.progress = progress.progress
'position %s', history.message = progress.message
self.pathname_, history.severity = progress.severity
history.position, else:
self.position_) history = LogProgressingHistory(
return pathname=self.pathname_, position=self.position_,
partial_line=self.partial_line_,
history.position = self.position_ line_matcher_name=line_matcher_name,
history.partial_line = self.partial_line_ percentage=progress.progress,
history.line_matcher_name = line_matcher_name message=progress.message,
history.progress = progress.progress severity=progress.severity)
history.message = progress.message session.merge(history)
history.severity = progress.severity logging.debug('update file %s to history %s',
else: self.pathname_, history)
history = LogProgressingHistory(
pathname=self.pathname_, position=self.position_,
partial_line=self.partial_line_,
line_matcher_name=line_matcher_name,
progress=progress.progress,
message=progress.message,
severity=progress.severity)
session.merge(history)
logging.debug('update file %s to history %s',
self.pathname_, history)
def readline(self): def readline(self):
"""Generate each line of the log file.""" """Generate each line of the log file."""

View File

@ -433,7 +433,7 @@ def _get_package_adapter_matcher(package_installer, target_system):
else: else:
logging.debug('configuration %s does not match %s and %s', logging.debug('configuration %s does not match %s and %s',
configuration, target_system, package_installer) configuration, target_system, package_installer)
logging.error('No configuration found for os installer %s os %s', logging.error('No configuration found for package installer %s os %s',
package_installer, target_system) package_installer, target_system)
return None return None

View File

@ -24,6 +24,7 @@ from celery.signals import setup_logging
from compass.actions import deploy from compass.actions import deploy
from compass.actions import poll_switch from compass.actions import poll_switch
from compass.actions import reinstall from compass.actions import reinstall
from compass.actions import update_progress
from compass.db.api import adapter_holder as adapter_api from compass.db.api import adapter_holder as adapter_api
from compass.db.api import database from compass.db.api import database
from compass.db.api import metadata_holder as metadata_api from compass.db.api import metadata_holder as metadata_api
@ -118,3 +119,16 @@ def reset_host(host_id):
:type cluster_hosts: dict of int to list of int :type cluster_hosts: dict of int to list of int
""" """
pass pass
@celery.task(name='compass.tasks.update_progress')
def update_clusters_progress(cluster_hosts):
"""Calculate the installing progress of the given cluster.
:param cluster_hosts: the cluster and hosts of each cluster to update.
:type cluster_hosts: dict of int to list of int
"""
try:
update_progress.update_progress(cluster_hosts)
except Exception as error:
logging.exception(error)

View File

@ -190,3 +190,42 @@ def load_configs(
raise error raise error
configs.append(config_locals) configs.append(config_locals)
return configs return configs
def is_instance(instance, expected_types):
"""Check instance type is in one of expected types.
:param instance: instance to check the type.
:param expected_types: types to check if instance type is in them.
:type expected_types: list of type
:returns: True if instance type is in expect_types.
"""
for expected_type in expected_types:
if isinstance(instance, expected_type):
return True
return False
def get_clusters_from_str(clusters_str):
"""get clusters from string."""
clusters = {}
for cluster_and_hosts in clusters_str.split(';'):
if not cluster_and_hosts:
continue
if ':' in cluster_and_hosts:
cluster_str, hosts_str = cluster_and_hosts.split(
':', 1)
else:
cluster_str = cluster_and_hosts
hosts_str = ''
hosts = [
host for host in hosts_str.split(',')
if host
]
clusters[cluster_str] = hosts
return clusters