diff --git a/compass/actions/update_progress.py b/compass/actions/update_progress.py new file mode 100644 index 00000000..96b000d4 --- /dev/null +++ b/compass/actions/update_progress.py @@ -0,0 +1,113 @@ +# Copyright 2014 Huawei Technologies Co. Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module to update status and installing progress of the given cluster. + + .. moduleauthor:: Xiaodong Wang +""" +import logging + +from compass.actions import util +from compass.db.api import database +from compass.db import models +from compass.log_analyzor import progress_calculator +from compass.utils import setting_wrapper as setting + + +def _cluster_filter(cluster): + """filter cluster.""" + if not cluster.state: + logging.error('there is no state for cluster %s', + cluster.id) + return False + + if cluster.state.state != 'INSTALLING': + logging.error('the cluster %s state %s is not installing', + cluster.id, cluster.state.state) + return False + + return True + + +def _host_filter(host): + """filter host.""" + if not host.state: + logging.error('there is no state for host %s', + host.id) + return False + + if host.state.state != 'INSTALLING': + logging.error('the host %s state %s is not installing', + host.id, host.state.state) + return False + + return True + + +def update_progress(cluster_hosts): + """Update status and installing progress of the given cluster. + + :param cluster_hosts: clusters and hosts in each cluster to update. + :type cluster_hosts: dict of int or str to list of int or str + + .. note:: + The function should be called out of the database session scope. + In the function, it will update the database cluster_state and + host_state table for the deploying cluster and hosts. + + The function will also query log_progressing_history table to get + the lastest installing progress and the position of log it has + processed in the last run. The function uses these information to + avoid recalculate the progress from the beginning of the log file. + After the progress got updated, these information will be stored back + to the log_progressing_history for next time run. + """ + with util.lock('log_progressing', blocking=False) as lock: + if not lock: + logging.error( + 'failed to acquire lock to calculate installation progress') + return + + logging.info('update installing progress of cluster_hosts: %s', + cluster_hosts) + os_names = {} + distributed_systems = {} + with database.session() as session: + cluster = session.query(models.Cluster).first() + clusterid = cluster.id + + adapter = cluster.adapter + os_installer = adapter.os_installer + os_installer_name = os_installer.instance_name + package_installer = adapter.package_installer + package_installer_name = package_installer.instance_name + + distributed_system = cluster.distributed_system + distributed_system_name = distributed_system.name + + host = session.query(models.Host).first() + os_name = host.os_name + os_names[clusterid] = os_name + distributed_systems[clusterid] = distributed_system_name + + clusterhosts = cluster.clusterhosts + hostids = [clusterhost.host.id for clusterhost in clusterhosts] + cluster_hosts.update({clusterid: hostids}) + + progress_calculator.update_progress( + os_installer_name, + os_names, + package_installer_name, + distributed_systems, + cluster_hosts) diff --git a/compass/actions/util.py b/compass/actions/util.py index 84b89681..29c2a44e 100644 --- a/compass/actions/util.py +++ b/compass/actions/util.py @@ -48,3 +48,66 @@ def lock(lock_name, blocking=True, timeout=10): instance_lock.acquired_until = 0 instance_lock.release() logging.debug('released lock %s', lock_name) + +""" +def update_cluster_hosts(cluster_hosts, + cluster_filter=None, host_filter=None): + session = database.current_session() + os_versions = {} + target_systems = {} + updated_cluster_hosts = {} + clusters = session.query(models.Cluster).all() + for cluster in clusters: + if cluster_hosts and ( + cluster.id not in cluster_hosts and + str(cluster.id) not in cluster_hosts and + cluster.name not in cluster_hosts + ): + logging.debug('ignore cluster %s sinc it is not in %s', + cluster.id, cluster_hosts) + continue + + adapter = cluster.adapter + if not cluster.adapter: + logging.error('there is no adapter for cluster %s', + cluster.id) + continue + + if cluster_filter and not cluster_filter(cluster): + logging.debug('filter cluster %s', cluster.id) + continue + + updated_cluster_hosts[cluster.id] = [] + os_versions[cluster.id] = 'CentOS-6.5-x86_64' + target_systems[cluster.id] = 'openstack' + + if cluster.id in cluster_hosts: + hosts = cluster_hosts[cluster.id] + elif str(cluster.id) in cluster_hosts: + hosts = cluster_hosts[str(cluster.id)] + elif cluster.name in cluster_hosts: + hosts = cluster_hosts[cluster.name] + else: + hosts = [] + + if not hosts: + hosts = [host.id for host in cluster.hosts] + + for host in cluster.hosts: + if ( + host.id not in hosts and + str(host.id) not in hosts and + host.hostname not in hosts + ): + logging.debug('ignore host %s which is not in %s', + host.id, hosts) + continue + + if host_filter and not host_filter(host): + logging.debug('filter host %s', host.id) + continue + + updated_cluster_hosts[cluster.id].append(host.id) + + return (updated_cluster_hosts, os_versions, target_systems) +""" diff --git a/compass/db/api/adapter.py b/compass/db/api/adapter.py index 1dd792b5..694e95e1 100644 --- a/compass/db/api/adapter.py +++ b/compass/db/api/adapter.py @@ -155,6 +155,6 @@ def get_adapters_internal(session): else: logging.info( 'ignore adapter %s since it is not deployable', - adapter_dict + adapter.to_dict() ) return adapter_mapping diff --git a/compass/db/api/cluster.py b/compass/db/api/cluster.py index 8804c0dc..0fa0cf3a 100644 --- a/compass/db/api/cluster.py +++ b/compass/db/api/cluster.py @@ -1204,7 +1204,7 @@ def update_cluster_host_state( @utils.supported_filters( optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS ) -@database.run_in_session() +## @database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_CLUSTERHOST_STATE ) @@ -1223,7 +1223,7 @@ def update_clusterhost_state( @utils.supported_filters( optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS ) -@database.run_in_session() +## @database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_CLUSTER_STATE ) diff --git a/compass/db/api/host.py b/compass/db/api/host.py index fefd7304..db2a2e57 100644 --- a/compass/db/api/host.py +++ b/compass/db/api/host.py @@ -589,7 +589,7 @@ def get_host_state(session, getter, host_id, **kwargs): @utils.supported_filters(UPDATED_STATE_FIELDS) -@database.run_in_session() +## @database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_HOST_STATE ) diff --git a/compass/db/models.py b/compass/db/models.py index ae2e23ef..faf6c7f1 100644 --- a/compass/db/models.py +++ b/compass/db/models.py @@ -21,6 +21,7 @@ import simplejson as json from sqlalchemy import BigInteger from sqlalchemy import Boolean from sqlalchemy import Column +from sqlalchemy import ColumnDefault from sqlalchemy import DateTime from sqlalchemy import Enum from sqlalchemy.ext.declarative import declarative_base @@ -1885,3 +1886,46 @@ class Network(BASE, TimestampMixin, HelperMixin): raise exception.InvalidParameter( 'subnet %s format is uncorrect' % self.subnet ) + + +class LogProgressingHistory(BASE): + """host installing log history for each file. + + :param id: int, identity as primary key. + :param pathname: str, the full path of the installing log file. unique. + :param position: int, the position of the log file it has processed. + :param partial_line: str, partial line of the log. + :param progressing: float, indicate the installing progress between 0 to 1. + :param message: str, str, the installing message. + :param severity: Enum, the installing message severity. + ('ERROR', 'WARNING', 'INFO') + :param line_matcher_name: str, the line matcher name of the log processor. + :param update_timestamp: datetime, the latest timestamp the entry updated. + """ + __tablename__ = 'log_progressing_history' + id = Column(Integer, primary_key=True) + pathname = Column(String(80), unique=True) + position = Column(Integer, ColumnDefault(0)) + partial_line = Column(Text) + percentage = Column(Float, ColumnDefault(0.0)) + message = Column(Text) + severity = Column(Enum('ERROR', 'WARNING', 'INFO'), ColumnDefault('INFO')) + line_matcher_name = Column(String(80), ColumnDefault('start')) + update_timestamp = Column(DateTime, default=datetime.datetime.now(), + onupdate=datetime.datetime.now()) + + def __init__(self, **kwargs): + super(LogProgressingHistory, self).__init__(**kwargs) + + def __repr__(self): + return ( + 'LogProgressingHistory[%r: position %r,' + 'partial_line %r,percentage %r,message %r,' + 'severity %r]' + ) % ( + self.pathname, self.position, + self.partial_line, + self.percentage, + self.message, + self.severity + ) diff --git a/compass/log_analyzor/adapter_matcher.py b/compass/log_analyzor/adapter_matcher.py index ff1a0abb..f0dadf6e 100644 --- a/compass/log_analyzor/adapter_matcher.py +++ b/compass/log_analyzor/adapter_matcher.py @@ -19,11 +19,19 @@ import logging import re -from compass.db import database -from compass.db.model import Cluster -from compass.db.model import ClusterHost +from compass.db.api import cluster as cluster_api +from compass.db.api import database +from compass.db.api import host as host_api +from compass.db.api import user as user_api + +from compass.db.models import Cluster +from compass.db.models import ClusterHost +from compass.db.models import Host + from compass.log_analyzor.line_matcher import Progress +import datetime + class AdapterItemMatcher(object): """Progress matcher for the os installing or package installing.""" @@ -162,155 +170,101 @@ class AdapterMatcher(object): @classmethod def _get_host_progress(cls, hostid): - """Get Host Progress from database. + """Get Host Progress from HostState.""" - .. notes:: - The function should be called in database session. - """ session = database.current_session() host = session.query( - ClusterHost + Host ).filter_by(id=hostid).first() if not host: logging.error( - 'there is no host for %s in ClusterHost', hostid) + 'there is no host for %s in Host', hostid) return None, None, None if not host.state: logging.error('there is no related HostState for %s', hostid) - return host.fullname, None, None + return host.name, None, None - """ return ( - host.fullname, + host.name, host.state.state, - Progress(host.state.progress, + Progress(host.state.percentage, host.state.message, host.state.severity)) - """ - return { - 'os': ( - host.fullname, - host.state.state, - Progress(host.state.os_progress, - host.state.os_message, - host.state.os_severity)), - 'package': ( - host.fullname, - host.state.state, - Progress(host.state.progress, - host.state.message, - host.state.severity))} @classmethod - def _update_host_os_progress(cls, hostid, os_progress): - """Update host progress to database. + def _get_clusterhost_progress(cls, hostid): + """Get ClusterHost progress from ClusterHostState.""" + + session = database.current_session() + clusterhost = session.query( + ClusterHost + ).filter_by(id=hostid).first() + if not clusterhost: + logging.error( + 'there is no clusterhost for %s in ClusterHost', + hostid + ) + return None, None, None + + if not clusterhost.state: + logging.error( + 'there is no related ClusterHostState for %s', + hostid + ) + return clusterhost.name, None, None + + return ( + clusterhost.name, + clusterhost.state.state, + Progress(clusterhost.state.percentage, + clusterhost.state.message, + clusterhost.state.severity)) + + @classmethod + def _update_host_progress(cls, hostid, host_progress, updater): + """Updates host progress to db.""" - .. note:: - The function should be called in database session. - """ session = database.current_session() host = session.query( - ClusterHost).filter_by(id=hostid).first() + Host).filter_by(id=hostid).first() if not host: logging.error( - 'there is no host for %s in ClusterHost', hostid) - return + 'there is no host for %s in table Host', + hostid + ) if not host.state: logging.error( - 'there is no related HostState for %s', hostid) - return + 'there is no related HostState for %s', + hostid + ) - logging.debug('os progress: %s', os_progress.progress) - - if host.state.os_progress > os_progress.progress: + if host.state.percentage > host_progress.progress: logging.error( - 'host %s os_progress is not increased ' - 'from %s to %s', - hostid, host.state, os_progress) + 'host %s progress has not been increased' + ' from %s to $s', + hostid, host.state, host_progress + ) return - if ( - host.state.os_progress == os_progress.progress and - host.state.os_message == os_progress.message - ): + + if (host.state.percentage == host_progress.progress and + host.state.message == host_progress.message): logging.info( - 'ignore update host %s progress %s to %s', - hostid, os_progress, host.state) + 'host %s update ignored due to same progress' + 'in database', + hostid + ) return - host.state.os_progress = os_progress.progress - """host.state.os_progress = progress.progress""" - host.state.os_message = os_progress.message - if os_progress.severity: - host.state.os_severity = os_progress.severity + host.state.percentage = host_progress.progress + host.state.message = host_progress.message + if host_progress.severity: + host.state.severity = host_progress.severity - if host.state.os_progress >= 1.0: - host.state.os_state = 'OS_READY' - - if host.state.os_severity == 'ERROR': - host.state.os_state = 'ERROR' - - if host.state.os_state != 'INSTALLING': - host.mutable = True - - logging.debug( - 'update host %s state %s', - hostid, host.state) - - @classmethod - def _update_host_package_progress(cls, hostid, progress): - """Update host progress to database. - - .. note:: - The function should be called in database session. - """ - session = database.current_session() - host = session.query( - ClusterHost).filter_by(id=hostid).first() - - logging.debug('package progress: %s', progress.progress) - logging.debug('package ssssstate: %s', host.state.state) - if not host: - logging.error( - 'there is no host for %s in ClusterHost', hostid) - return - - if not host.state: - logging.error( - 'there is no related HostState for %s', hostid) - return - - if not host.state.state in ['OS_READY', 'INSTALLING']: - logging.error( - 'host %s issssss not in INSTALLING state', - hostid) - return - - if host.state.progress > progress.progress: - logging.error( - 'host %s progress is not increased ' - 'from %s to %s', - hostid, host.state, progress) - return - - if ( - host.state.progress == progress.progress and - host.state.message == progress.message - ): - logging.info( - 'ignore update host %s progress %s to %s', - hostid, progress, host.state) - return - - host.state.progress = progress.progress - host.state.message = progress.message - if progress.severity: - host.state.severity = progress.severity - - if host.state.progress >= 1.0: - host.state.state = 'READY' + if host.state.percentage >= 1.0: + host.state.state = 'SUCCESSFUL' if host.state.severity == 'ERROR': host.state.state = 'ERROR' @@ -318,10 +272,81 @@ class AdapterMatcher(object): if host.state.state != 'INSTALLING': host.mutable = True + host_api.update_host_state( + session, + updater, + hostid, + state=host.state.state, + percentage=host.state.percentage, + message=host.state.message, + id=hostid + ) + logging.debug( 'update host %s state %s', hostid, host.state) + @classmethod + def _update_clusterhost_progress( + cls, + hostid, + clusterhost_progress, + updater + ): + + session = database.current_session() + clusterhost = session.query( + ClusterHost).filter_by(id=hostid).first() + + if not clusterhost.state: + logging.error( + 'ClusterHost state not found for %s', + hostid) + + if clusterhost.state.percentage > clusterhost_progress.progress: + logging.error( + 'clusterhost %s state has not been increased' + ' from %s to %s', + hostid, clusterhost.state, clusterhost_progress + ) + return + + if (clusterhost.state.percentage == clusterhost_progress.progress and + clusterhost.state.message == clusterhost_progress.message): + logging.info( + 'clusterhost %s update ignored due to same progress' + 'in database', + hostid + ) + return + + clusterhost.state.percentage = clusterhost_progress.progress + clusterhost.state.message = clusterhost_progress.message + if clusterhost_progress.severity: + clusterhost.state.severity = clusterhost_progress.severity + + if clusterhost.state.percentage >= 1.0: + clusterhost.state.state = 'SUCCESSFUL' + + if clusterhost.state.severity == 'ERROR': + clusterhost.state.state = 'ERROR' + + if clusterhost.state.state != 'INSTALLING': + clusterhost.mutable = True + + cluster_api.update_clusterhost_state( + session, + updater, + hostid, + state=clusterhost.state.state, + percentage=clusterhost.state.percentage, + message=clusterhost.state.message + ) + + logging.debug( + 'update clusterhost %s state %s', + hostid, clusterhost.state) + @classmethod def _update_cluster_progress(cls, clusterid): """Update cluster installing progress to database. @@ -351,18 +376,41 @@ class AdapterMatcher(object): cluster_progress = 0.0 cluster_messages = {} cluster_severities = set([]) + cluster_installing_hosts = 0 + cluster_failed_hosts = 0 hostids = [] - for host in cluster.hosts: + clusterhosts = cluster.clusterhosts + hosts = [clusterhost.host for clusterhost in clusterhosts] + for host in hosts: if host.state: hostids.append(host.id) - cluster_progress += host.state.progress + cluster_progress += host.state.percentage if host.state.message: - cluster_messages[host.hostname] = host.state.message + cluster_messages[host.name] = host.state.message if host.state.severity: cluster_severities.add(host.state.severity) - cluster.state.progress = cluster_progress / len(hostids) + for clusterhost in clusterhosts: + if clusterhost.state: + cluster_progress += clusterhost.state.percentage + if clusterhost.state.state == 'INSTALLING': + cluster_installing_hosts += 1 + elif (clusterhost.host.state.state not in + ['ERROR', 'INITIALIZED'] and + clusterhost.state.state != 'ERORR'): + cluster_installing_hosts += 1 + elif (clusterhost.state.state == 'ERROR' or + clusterhost.host.state.state == 'ERROR'): + cluster_failed_hosts += 1 + + if clusterhost.state.message: + cluster_messages[host.name] = clusterhost.state.message + + if clusterhost.state.severity: + cluster_severities.add(clusterhost.state.severity) + + cluster.state.percentage = cluster_progress / (len(hostids) * 2) cluster.state.message = '\n'.join( [ '%s: %s' % (hostname, message) @@ -374,7 +422,7 @@ class AdapterMatcher(object): cluster.state.severity = severity break - if cluster.state.progress >= 1.0: + if cluster.state.percentage >= 1.0: cluster.state.state = 'READY' if cluster.state.severity == 'ERROR': @@ -383,85 +431,92 @@ class AdapterMatcher(object): if cluster.state.state != 'INSTALLING': cluster.mutable = True + cluster.state.installing_hosts = cluster_installing_hosts + cluster.state.total_hosts = len(clusterhosts) + cluster.state.failed_hosts = cluster_failed_hosts + cluster.state.completed_hosts = cluster.state.total_hosts - \ + cluster.state.installing_hosts - cluster.state.failed_hosts + logging.debug( 'update cluster %s state %s', clusterid, cluster.state) def update_progress(self, clusterid, hostids): - """Update cluster progress and hosts progresses. - :param clusterid: the id of the cluster to update. - :type clusterid: int. - :param hostids: the ids of the hosts to update. - :type hostids: list of int. - """ - logging.debug('printing os_matcher %s', self.__str__()) - host_os_progresses = {} - host_package_progresses = {} + host_progresses = {} + clusterhost_progresses = {} + updater = user_api.get_user_object( + 'admin@abc.com', + expire_timestamp=datetime.datetime.now() + + datetime.timedelta(seconds=10000)) with database.session(): for hostid in hostids: - host_overall_state = ( - self._get_host_progress(hostid)) - logging.debug('host overall state: %s', host_overall_state) - fullname, host_state, host_os_progress = ( - host_overall_state['os']) - _, _, host_package_progress = host_overall_state['package'] - if (not fullname or - not host_os_progress or - not host_package_progress): + host_name, host_state, host_progress = \ + self._get_host_progress(hostid) + _, clusterhost_state, clusterhost_progress = \ + self._get_clusterhost_progress(hostid) + + if (not host_name or + not host_progress or + not clusterhost_progress): logging.error( 'nothing to update host %s', - fullname) + host_name) continue - logging.debug('got host %s state %s os_progress %s' - 'package_progress %s', - fullname, - host_state, host_os_progress, - host_package_progress) + logging.debug('got host %s host_state: %s ' + 'host_progress: %s, ' + 'clusterhost_state: %s, ' + 'clusterhost_progress: %s ', + host_name, + host_state, + host_progress, + clusterhost_state, + clusterhost_progress) - host_os_progresses[hostid] = ( - fullname, host_state, host_os_progress) - host_package_progresses[hostid] = ( - fullname, host_state, host_package_progress) + host_progresses[hostid] = ( + host_name, host_state, host_progress) + clusterhost_progresses[hostid] = ( + host_name, clusterhost_state, clusterhost_progress) - for hostid, host_value in host_os_progresses.items(): - fullname, host_state, host_os_progress = host_value - if host_state == 'INSTALLING' and host_os_progress.progress < 1.0: - self.os_matcher_.update_progress( - fullname, host_os_progress) - else: - logging.error( - 'there is no need to update host %s ' - 'OS progress: state %s os_progress %s', - fullname, host_state, host_os_progress) + for hostid, host_value in host_progresses.items(): + host_name, host_state, host_progress = host_value + if (host_state == 'INSTALLING' and + host_progress.progress < 1.0): + self.os_matcher_.update_progress( + host_name, host_progress) + else: + logging.error( + 'there is no need to update host %s ' + 'progress: state %s progress %s', + host_name, host_state, host_progress) - for hostid, host_value in host_package_progresses.items(): - fullname, host_state, host_package_progress = host_value - if (host_state == 'INSTALLING' and - host_package_progress.progress < 1.0): - self.package_matcher_.update_progress( - fullname, host_package_progress) - else: - logging.error( - 'there is no need to update host %s ' - 'Package progress: state %s package_progress %s', - fullname, host_state, host_package_progress) + for hostid, clusterhost_value in clusterhost_progresses.items(): + host_name, clusterhost_state, clusterhost_progress = \ + clusterhost_value + if (clusterhost_state == 'INSTALLING' and + clusterhost_progress.progress < 1.0): + self.package_matcher_.update_progress( + host_name, clusterhost_progress) + else: + logging.error( + 'no need to update clusterhost %s' + 'progress: state %s progress %s', + host_name, clusterhost_state, clusterhost_progress) - with database.session(): for hostid in hostids: - if hostid not in host_os_progresses: + if hostid not in host_progresses: + continue + if hostid not in clusterhost_progresses: continue - if hostid not in host_package_progresses: - continue - - _, _, host_os_progress = host_os_progresses[hostid] - _, _, host_package_progress = host_package_progresses[hostid] - self._update_host_os_progress(hostid, host_os_progress) - self._update_host_package_progress( + _, _, host_progress = host_progresses[hostid] + _, _, clusterhost_progress = clusterhost_progresses[hostid] + self._update_host_progress(hostid, host_progress, updater) + self._update_clusterhost_progress( hostid, - host_package_progress + clusterhost_progress, + updater ) self._update_cluster_progress(clusterid) diff --git a/compass/log_analyzor/file_matcher.py b/compass/log_analyzor/file_matcher.py index 28a3bdbc..535de8a4 100644 --- a/compass/log_analyzor/file_matcher.py +++ b/compass/log_analyzor/file_matcher.py @@ -19,8 +19,8 @@ import logging import os.path -from compass.db import database -from compass.db.model import LogProgressingHistory +from compass.db.api import database +from compass.db.models import LogProgressingHistory from compass.log_analyzor.line_matcher import Progress from compass.utils import setting_wrapper as setting @@ -109,24 +109,24 @@ class FileReader(object): in the last run, the progress, the message and the severity it has got in the last run. """ - with database.session() as session: - history = session.query( - LogProgressingHistory - ).filter_by( - pathname=self.pathname_ - ).first() - if history: - self.position_ = history.position - self.partial_line_ = history.partial_line - line_matcher_name = history.line_matcher_name - progress = Progress(history.progress, - history.message, - history.severity) - else: - line_matcher_name = 'start' - progress = Progress(0.0, '', None) + session = database.current_session() + history = session.query( + LogProgressingHistory + ).filter_by( + pathname=self.pathname_ + ).first() + if history: + self.position_ = history.position + self.partial_line_ = history.partial_line + line_matcher_name = history.line_matcher_name + progress = Progress(history.percentage, + history.message, + history.severity) + else: + line_matcher_name = 'start' + progress = Progress(0.0, '', None) - return line_matcher_name, progress + return line_matcher_name, progress def update_history(self, line_matcher_name, progress): """Update log_progressing_history table. @@ -138,37 +138,36 @@ class FileReader(object): The function should be called out of database session. It updates the log_processing_history table. """ - with database.session() as session: - history = session.query(LogProgressingHistory).filter_by( - pathname=self.pathname_).first() + session = database.current_session() + history = session.query(LogProgressingHistory).filter_by( + pathname=self.pathname_).first() + if history: + if history.position >= self.position_: + logging.error( + '%s history position %s is ahead of current ' + 'position %s', + self.pathname_, + history.position, + self.position_) + return - if history: - if history.position >= self.position_: - logging.error( - '%s history position %s is ahead of current ' - 'position %s', - self.pathname_, - history.position, - self.position_) - return - - history.position = self.position_ - history.partial_line = self.partial_line_ - history.line_matcher_name = line_matcher_name - history.progress = progress.progress - history.message = progress.message - history.severity = progress.severity - else: - history = LogProgressingHistory( - pathname=self.pathname_, position=self.position_, - partial_line=self.partial_line_, - line_matcher_name=line_matcher_name, - progress=progress.progress, - message=progress.message, - severity=progress.severity) - session.merge(history) - logging.debug('update file %s to history %s', - self.pathname_, history) + history.position = self.position_ + history.partial_line = self.partial_line_ + history.line_matcher_name = line_matcher_name + history.progress = progress.progress + history.message = progress.message + history.severity = progress.severity + else: + history = LogProgressingHistory( + pathname=self.pathname_, position=self.position_, + partial_line=self.partial_line_, + line_matcher_name=line_matcher_name, + percentage=progress.progress, + message=progress.message, + severity=progress.severity) + session.merge(history) + logging.debug('update file %s to history %s', + self.pathname_, history) def readline(self): """Generate each line of the log file.""" diff --git a/compass/log_analyzor/progress_calculator.py b/compass/log_analyzor/progress_calculator.py index 487d61c0..539eb359 100644 --- a/compass/log_analyzor/progress_calculator.py +++ b/compass/log_analyzor/progress_calculator.py @@ -433,7 +433,7 @@ def _get_package_adapter_matcher(package_installer, target_system): else: logging.debug('configuration %s does not match %s and %s', configuration, target_system, package_installer) - logging.error('No configuration found for os installer %s os %s', + logging.error('No configuration found for package installer %s os %s', package_installer, target_system) return None diff --git a/compass/tasks/tasks.py b/compass/tasks/tasks.py index 1a6b5d79..9001f12b 100644 --- a/compass/tasks/tasks.py +++ b/compass/tasks/tasks.py @@ -24,6 +24,7 @@ from celery.signals import setup_logging from compass.actions import deploy from compass.actions import poll_switch from compass.actions import reinstall +from compass.actions import update_progress from compass.db.api import adapter_holder as adapter_api from compass.db.api import database from compass.db.api import metadata_holder as metadata_api @@ -118,3 +119,16 @@ def reset_host(host_id): :type cluster_hosts: dict of int to list of int """ pass + + +@celery.task(name='compass.tasks.update_progress') +def update_clusters_progress(cluster_hosts): + """Calculate the installing progress of the given cluster. + + :param cluster_hosts: the cluster and hosts of each cluster to update. + :type cluster_hosts: dict of int to list of int + """ + try: + update_progress.update_progress(cluster_hosts) + except Exception as error: + logging.exception(error) diff --git a/compass/utils/util.py b/compass/utils/util.py index bc747c67..8489579d 100644 --- a/compass/utils/util.py +++ b/compass/utils/util.py @@ -190,3 +190,42 @@ def load_configs( raise error configs.append(config_locals) return configs + + +def is_instance(instance, expected_types): + """Check instance type is in one of expected types. + + :param instance: instance to check the type. + :param expected_types: types to check if instance type is in them. + :type expected_types: list of type + + :returns: True if instance type is in expect_types. + """ + for expected_type in expected_types: + if isinstance(instance, expected_type): + return True + + return False + + +def get_clusters_from_str(clusters_str): + """get clusters from string.""" + clusters = {} + for cluster_and_hosts in clusters_str.split(';'): + if not cluster_and_hosts: + continue + + if ':' in cluster_and_hosts: + cluster_str, hosts_str = cluster_and_hosts.split( + ':', 1) + else: + cluster_str = cluster_and_hosts + hosts_str = '' + + hosts = [ + host for host in hosts_str.split(',') + if host + ] + clusters[cluster_str] = hosts + + return clusters