Merge "Add Progress Update feature to new compass-core" into dev/experimental

This commit is contained in:
Jenkins 2014-08-03 21:16:05 +00:00 committed by Gerrit Code Review
commit 814f35d22e
11 changed files with 564 additions and 237 deletions

View File

@ -0,0 +1,113 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to update status and installing progress of the given cluster.
.. moduleauthor:: Xiaodong Wang <xiaodongwang@huawei.com>
"""
import logging
from compass.actions import util
from compass.db.api import database
from compass.db import models
from compass.log_analyzor import progress_calculator
from compass.utils import setting_wrapper as setting
def _cluster_filter(cluster):
"""filter cluster."""
if not cluster.state:
logging.error('there is no state for cluster %s',
cluster.id)
return False
if cluster.state.state != 'INSTALLING':
logging.error('the cluster %s state %s is not installing',
cluster.id, cluster.state.state)
return False
return True
def _host_filter(host):
"""filter host."""
if not host.state:
logging.error('there is no state for host %s',
host.id)
return False
if host.state.state != 'INSTALLING':
logging.error('the host %s state %s is not installing',
host.id, host.state.state)
return False
return True
def update_progress(cluster_hosts):
"""Update status and installing progress of the given cluster.
:param cluster_hosts: clusters and hosts in each cluster to update.
:type cluster_hosts: dict of int or str to list of int or str
.. note::
The function should be called out of the database session scope.
In the function, it will update the database cluster_state and
host_state table for the deploying cluster and hosts.
The function will also query log_progressing_history table to get
the lastest installing progress and the position of log it has
processed in the last run. The function uses these information to
avoid recalculate the progress from the beginning of the log file.
After the progress got updated, these information will be stored back
to the log_progressing_history for next time run.
"""
with util.lock('log_progressing', blocking=False) as lock:
if not lock:
logging.error(
'failed to acquire lock to calculate installation progress')
return
logging.info('update installing progress of cluster_hosts: %s',
cluster_hosts)
os_names = {}
distributed_systems = {}
with database.session() as session:
cluster = session.query(models.Cluster).first()
clusterid = cluster.id
adapter = cluster.adapter
os_installer = adapter.os_installer
os_installer_name = os_installer.instance_name
package_installer = adapter.package_installer
package_installer_name = package_installer.instance_name
distributed_system = cluster.distributed_system
distributed_system_name = distributed_system.name
host = session.query(models.Host).first()
os_name = host.os_name
os_names[clusterid] = os_name
distributed_systems[clusterid] = distributed_system_name
clusterhosts = cluster.clusterhosts
hostids = [clusterhost.host.id for clusterhost in clusterhosts]
cluster_hosts.update({clusterid: hostids})
progress_calculator.update_progress(
os_installer_name,
os_names,
package_installer_name,
distributed_systems,
cluster_hosts)

View File

@ -48,3 +48,66 @@ def lock(lock_name, blocking=True, timeout=10):
instance_lock.acquired_until = 0
instance_lock.release()
logging.debug('released lock %s', lock_name)
"""
def update_cluster_hosts(cluster_hosts,
cluster_filter=None, host_filter=None):
session = database.current_session()
os_versions = {}
target_systems = {}
updated_cluster_hosts = {}
clusters = session.query(models.Cluster).all()
for cluster in clusters:
if cluster_hosts and (
cluster.id not in cluster_hosts and
str(cluster.id) not in cluster_hosts and
cluster.name not in cluster_hosts
):
logging.debug('ignore cluster %s sinc it is not in %s',
cluster.id, cluster_hosts)
continue
adapter = cluster.adapter
if not cluster.adapter:
logging.error('there is no adapter for cluster %s',
cluster.id)
continue
if cluster_filter and not cluster_filter(cluster):
logging.debug('filter cluster %s', cluster.id)
continue
updated_cluster_hosts[cluster.id] = []
os_versions[cluster.id] = 'CentOS-6.5-x86_64'
target_systems[cluster.id] = 'openstack'
if cluster.id in cluster_hosts:
hosts = cluster_hosts[cluster.id]
elif str(cluster.id) in cluster_hosts:
hosts = cluster_hosts[str(cluster.id)]
elif cluster.name in cluster_hosts:
hosts = cluster_hosts[cluster.name]
else:
hosts = []
if not hosts:
hosts = [host.id for host in cluster.hosts]
for host in cluster.hosts:
if (
host.id not in hosts and
str(host.id) not in hosts and
host.hostname not in hosts
):
logging.debug('ignore host %s which is not in %s',
host.id, hosts)
continue
if host_filter and not host_filter(host):
logging.debug('filter host %s', host.id)
continue
updated_cluster_hosts[cluster.id].append(host.id)
return (updated_cluster_hosts, os_versions, target_systems)
"""

View File

@ -155,6 +155,6 @@ def get_adapters_internal(session):
else:
logging.info(
'ignore adapter %s since it is not deployable',
adapter_dict
adapter.to_dict()
)
return adapter_mapping

View File

@ -1204,7 +1204,7 @@ def update_cluster_host_state(
@utils.supported_filters(
optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS
)
@database.run_in_session()
## @database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
)
@ -1223,7 +1223,7 @@ def update_clusterhost_state(
@utils.supported_filters(
optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS
)
@database.run_in_session()
## @database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_CLUSTER_STATE
)

View File

@ -589,7 +589,7 @@ def get_host_state(session, getter, host_id, **kwargs):
@utils.supported_filters(UPDATED_STATE_FIELDS)
@database.run_in_session()
## @database.run_in_session()
@user_api.check_user_permission_in_session(
permission.PERMISSION_UPDATE_HOST_STATE
)

View File

@ -21,6 +21,7 @@ import simplejson as json
from sqlalchemy import BigInteger
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import ColumnDefault
from sqlalchemy import DateTime
from sqlalchemy import Enum
from sqlalchemy.ext.declarative import declarative_base
@ -1885,3 +1886,46 @@ class Network(BASE, TimestampMixin, HelperMixin):
raise exception.InvalidParameter(
'subnet %s format is uncorrect' % self.subnet
)
class LogProgressingHistory(BASE):
"""host installing log history for each file.
:param id: int, identity as primary key.
:param pathname: str, the full path of the installing log file. unique.
:param position: int, the position of the log file it has processed.
:param partial_line: str, partial line of the log.
:param progressing: float, indicate the installing progress between 0 to 1.
:param message: str, str, the installing message.
:param severity: Enum, the installing message severity.
('ERROR', 'WARNING', 'INFO')
:param line_matcher_name: str, the line matcher name of the log processor.
:param update_timestamp: datetime, the latest timestamp the entry updated.
"""
__tablename__ = 'log_progressing_history'
id = Column(Integer, primary_key=True)
pathname = Column(String(80), unique=True)
position = Column(Integer, ColumnDefault(0))
partial_line = Column(Text)
percentage = Column(Float, ColumnDefault(0.0))
message = Column(Text)
severity = Column(Enum('ERROR', 'WARNING', 'INFO'), ColumnDefault('INFO'))
line_matcher_name = Column(String(80), ColumnDefault('start'))
update_timestamp = Column(DateTime, default=datetime.datetime.now(),
onupdate=datetime.datetime.now())
def __init__(self, **kwargs):
super(LogProgressingHistory, self).__init__(**kwargs)
def __repr__(self):
return (
'LogProgressingHistory[%r: position %r,'
'partial_line %r,percentage %r,message %r,'
'severity %r]'
) % (
self.pathname, self.position,
self.partial_line,
self.percentage,
self.message,
self.severity
)

View File

@ -19,11 +19,19 @@
import logging
import re
from compass.db import database
from compass.db.model import Cluster
from compass.db.model import ClusterHost
from compass.db.api import cluster as cluster_api
from compass.db.api import database
from compass.db.api import host as host_api
from compass.db.api import user as user_api
from compass.db.models import Cluster
from compass.db.models import ClusterHost
from compass.db.models import Host
from compass.log_analyzor.line_matcher import Progress
import datetime
class AdapterItemMatcher(object):
"""Progress matcher for the os installing or package installing."""
@ -162,155 +170,101 @@ class AdapterMatcher(object):
@classmethod
def _get_host_progress(cls, hostid):
"""Get Host Progress from database.
"""Get Host Progress from HostState."""
.. notes::
The function should be called in database session.
"""
session = database.current_session()
host = session.query(
ClusterHost
Host
).filter_by(id=hostid).first()
if not host:
logging.error(
'there is no host for %s in ClusterHost', hostid)
'there is no host for %s in Host', hostid)
return None, None, None
if not host.state:
logging.error('there is no related HostState for %s',
hostid)
return host.fullname, None, None
return host.name, None, None
"""
return (
host.fullname,
host.name,
host.state.state,
Progress(host.state.progress,
Progress(host.state.percentage,
host.state.message,
host.state.severity))
"""
return {
'os': (
host.fullname,
host.state.state,
Progress(host.state.os_progress,
host.state.os_message,
host.state.os_severity)),
'package': (
host.fullname,
host.state.state,
Progress(host.state.progress,
host.state.message,
host.state.severity))}
@classmethod
def _update_host_os_progress(cls, hostid, os_progress):
"""Update host progress to database.
def _get_clusterhost_progress(cls, hostid):
"""Get ClusterHost progress from ClusterHostState."""
session = database.current_session()
clusterhost = session.query(
ClusterHost
).filter_by(id=hostid).first()
if not clusterhost:
logging.error(
'there is no clusterhost for %s in ClusterHost',
hostid
)
return None, None, None
if not clusterhost.state:
logging.error(
'there is no related ClusterHostState for %s',
hostid
)
return clusterhost.name, None, None
return (
clusterhost.name,
clusterhost.state.state,
Progress(clusterhost.state.percentage,
clusterhost.state.message,
clusterhost.state.severity))
@classmethod
def _update_host_progress(cls, hostid, host_progress, updater):
"""Updates host progress to db."""
.. note::
The function should be called in database session.
"""
session = database.current_session()
host = session.query(
ClusterHost).filter_by(id=hostid).first()
Host).filter_by(id=hostid).first()
if not host:
logging.error(
'there is no host for %s in ClusterHost', hostid)
return
'there is no host for %s in table Host',
hostid
)
if not host.state:
logging.error(
'there is no related HostState for %s', hostid)
return
'there is no related HostState for %s',
hostid
)
logging.debug('os progress: %s', os_progress.progress)
if host.state.os_progress > os_progress.progress:
if host.state.percentage > host_progress.progress:
logging.error(
'host %s os_progress is not increased '
'from %s to %s',
hostid, host.state, os_progress)
'host %s progress has not been increased'
' from %s to $s',
hostid, host.state, host_progress
)
return
if (
host.state.os_progress == os_progress.progress and
host.state.os_message == os_progress.message
):
if (host.state.percentage == host_progress.progress and
host.state.message == host_progress.message):
logging.info(
'ignore update host %s progress %s to %s',
hostid, os_progress, host.state)
'host %s update ignored due to same progress'
'in database',
hostid
)
return
host.state.os_progress = os_progress.progress
"""host.state.os_progress = progress.progress"""
host.state.os_message = os_progress.message
if os_progress.severity:
host.state.os_severity = os_progress.severity
host.state.percentage = host_progress.progress
host.state.message = host_progress.message
if host_progress.severity:
host.state.severity = host_progress.severity
if host.state.os_progress >= 1.0:
host.state.os_state = 'OS_READY'
if host.state.os_severity == 'ERROR':
host.state.os_state = 'ERROR'
if host.state.os_state != 'INSTALLING':
host.mutable = True
logging.debug(
'update host %s state %s',
hostid, host.state)
@classmethod
def _update_host_package_progress(cls, hostid, progress):
"""Update host progress to database.
.. note::
The function should be called in database session.
"""
session = database.current_session()
host = session.query(
ClusterHost).filter_by(id=hostid).first()
logging.debug('package progress: %s', progress.progress)
logging.debug('package ssssstate: %s', host.state.state)
if not host:
logging.error(
'there is no host for %s in ClusterHost', hostid)
return
if not host.state:
logging.error(
'there is no related HostState for %s', hostid)
return
if not host.state.state in ['OS_READY', 'INSTALLING']:
logging.error(
'host %s issssss not in INSTALLING state',
hostid)
return
if host.state.progress > progress.progress:
logging.error(
'host %s progress is not increased '
'from %s to %s',
hostid, host.state, progress)
return
if (
host.state.progress == progress.progress and
host.state.message == progress.message
):
logging.info(
'ignore update host %s progress %s to %s',
hostid, progress, host.state)
return
host.state.progress = progress.progress
host.state.message = progress.message
if progress.severity:
host.state.severity = progress.severity
if host.state.progress >= 1.0:
host.state.state = 'READY'
if host.state.percentage >= 1.0:
host.state.state = 'SUCCESSFUL'
if host.state.severity == 'ERROR':
host.state.state = 'ERROR'
@ -318,10 +272,81 @@ class AdapterMatcher(object):
if host.state.state != 'INSTALLING':
host.mutable = True
host_api.update_host_state(
session,
updater,
hostid,
state=host.state.state,
percentage=host.state.percentage,
message=host.state.message,
id=hostid
)
logging.debug(
'update host %s state %s',
hostid, host.state)
@classmethod
def _update_clusterhost_progress(
cls,
hostid,
clusterhost_progress,
updater
):
session = database.current_session()
clusterhost = session.query(
ClusterHost).filter_by(id=hostid).first()
if not clusterhost.state:
logging.error(
'ClusterHost state not found for %s',
hostid)
if clusterhost.state.percentage > clusterhost_progress.progress:
logging.error(
'clusterhost %s state has not been increased'
' from %s to %s',
hostid, clusterhost.state, clusterhost_progress
)
return
if (clusterhost.state.percentage == clusterhost_progress.progress and
clusterhost.state.message == clusterhost_progress.message):
logging.info(
'clusterhost %s update ignored due to same progress'
'in database',
hostid
)
return
clusterhost.state.percentage = clusterhost_progress.progress
clusterhost.state.message = clusterhost_progress.message
if clusterhost_progress.severity:
clusterhost.state.severity = clusterhost_progress.severity
if clusterhost.state.percentage >= 1.0:
clusterhost.state.state = 'SUCCESSFUL'
if clusterhost.state.severity == 'ERROR':
clusterhost.state.state = 'ERROR'
if clusterhost.state.state != 'INSTALLING':
clusterhost.mutable = True
cluster_api.update_clusterhost_state(
session,
updater,
hostid,
state=clusterhost.state.state,
percentage=clusterhost.state.percentage,
message=clusterhost.state.message
)
logging.debug(
'update clusterhost %s state %s',
hostid, clusterhost.state)
@classmethod
def _update_cluster_progress(cls, clusterid):
"""Update cluster installing progress to database.
@ -351,18 +376,41 @@ class AdapterMatcher(object):
cluster_progress = 0.0
cluster_messages = {}
cluster_severities = set([])
cluster_installing_hosts = 0
cluster_failed_hosts = 0
hostids = []
for host in cluster.hosts:
clusterhosts = cluster.clusterhosts
hosts = [clusterhost.host for clusterhost in clusterhosts]
for host in hosts:
if host.state:
hostids.append(host.id)
cluster_progress += host.state.progress
cluster_progress += host.state.percentage
if host.state.message:
cluster_messages[host.hostname] = host.state.message
cluster_messages[host.name] = host.state.message
if host.state.severity:
cluster_severities.add(host.state.severity)
cluster.state.progress = cluster_progress / len(hostids)
for clusterhost in clusterhosts:
if clusterhost.state:
cluster_progress += clusterhost.state.percentage
if clusterhost.state.state == 'INSTALLING':
cluster_installing_hosts += 1
elif (clusterhost.host.state.state not in
['ERROR', 'INITIALIZED'] and
clusterhost.state.state != 'ERORR'):
cluster_installing_hosts += 1
elif (clusterhost.state.state == 'ERROR' or
clusterhost.host.state.state == 'ERROR'):
cluster_failed_hosts += 1
if clusterhost.state.message:
cluster_messages[host.name] = clusterhost.state.message
if clusterhost.state.severity:
cluster_severities.add(clusterhost.state.severity)
cluster.state.percentage = cluster_progress / (len(hostids) * 2)
cluster.state.message = '\n'.join(
[
'%s: %s' % (hostname, message)
@ -374,7 +422,7 @@ class AdapterMatcher(object):
cluster.state.severity = severity
break
if cluster.state.progress >= 1.0:
if cluster.state.percentage >= 1.0:
cluster.state.state = 'READY'
if cluster.state.severity == 'ERROR':
@ -383,85 +431,92 @@ class AdapterMatcher(object):
if cluster.state.state != 'INSTALLING':
cluster.mutable = True
cluster.state.installing_hosts = cluster_installing_hosts
cluster.state.total_hosts = len(clusterhosts)
cluster.state.failed_hosts = cluster_failed_hosts
cluster.state.completed_hosts = cluster.state.total_hosts - \
cluster.state.installing_hosts - cluster.state.failed_hosts
logging.debug(
'update cluster %s state %s',
clusterid, cluster.state)
def update_progress(self, clusterid, hostids):
"""Update cluster progress and hosts progresses.
:param clusterid: the id of the cluster to update.
:type clusterid: int.
:param hostids: the ids of the hosts to update.
:type hostids: list of int.
"""
logging.debug('printing os_matcher %s', self.__str__())
host_os_progresses = {}
host_package_progresses = {}
host_progresses = {}
clusterhost_progresses = {}
updater = user_api.get_user_object(
'admin@abc.com',
expire_timestamp=datetime.datetime.now() +
datetime.timedelta(seconds=10000))
with database.session():
for hostid in hostids:
host_overall_state = (
self._get_host_progress(hostid))
logging.debug('host overall state: %s', host_overall_state)
fullname, host_state, host_os_progress = (
host_overall_state['os'])
_, _, host_package_progress = host_overall_state['package']
if (not fullname or
not host_os_progress or
not host_package_progress):
host_name, host_state, host_progress = \
self._get_host_progress(hostid)
_, clusterhost_state, clusterhost_progress = \
self._get_clusterhost_progress(hostid)
if (not host_name or
not host_progress or
not clusterhost_progress):
logging.error(
'nothing to update host %s',
fullname)
host_name)
continue
logging.debug('got host %s state %s os_progress %s'
'package_progress %s',
fullname,
host_state, host_os_progress,
host_package_progress)
logging.debug('got host %s host_state: %s '
'host_progress: %s, '
'clusterhost_state: %s, '
'clusterhost_progress: %s ',
host_name,
host_state,
host_progress,
clusterhost_state,
clusterhost_progress)
host_os_progresses[hostid] = (
fullname, host_state, host_os_progress)
host_package_progresses[hostid] = (
fullname, host_state, host_package_progress)
host_progresses[hostid] = (
host_name, host_state, host_progress)
clusterhost_progresses[hostid] = (
host_name, clusterhost_state, clusterhost_progress)
for hostid, host_value in host_os_progresses.items():
fullname, host_state, host_os_progress = host_value
if host_state == 'INSTALLING' and host_os_progress.progress < 1.0:
self.os_matcher_.update_progress(
fullname, host_os_progress)
else:
logging.error(
'there is no need to update host %s '
'OS progress: state %s os_progress %s',
fullname, host_state, host_os_progress)
for hostid, host_value in host_package_progresses.items():
fullname, host_state, host_package_progress = host_value
for hostid, host_value in host_progresses.items():
host_name, host_state, host_progress = host_value
if (host_state == 'INSTALLING' and
host_package_progress.progress < 1.0):
self.package_matcher_.update_progress(
fullname, host_package_progress)
host_progress.progress < 1.0):
self.os_matcher_.update_progress(
host_name, host_progress)
else:
logging.error(
'there is no need to update host %s '
'Package progress: state %s package_progress %s',
fullname, host_state, host_package_progress)
'progress: state %s progress %s',
host_name, host_state, host_progress)
for hostid, clusterhost_value in clusterhost_progresses.items():
host_name, clusterhost_state, clusterhost_progress = \
clusterhost_value
if (clusterhost_state == 'INSTALLING' and
clusterhost_progress.progress < 1.0):
self.package_matcher_.update_progress(
host_name, clusterhost_progress)
else:
logging.error(
'no need to update clusterhost %s'
'progress: state %s progress %s',
host_name, clusterhost_state, clusterhost_progress)
with database.session():
for hostid in hostids:
if hostid not in host_os_progresses:
if hostid not in host_progresses:
continue
if hostid not in clusterhost_progresses:
continue
if hostid not in host_package_progresses:
continue
_, _, host_os_progress = host_os_progresses[hostid]
_, _, host_package_progress = host_package_progresses[hostid]
self._update_host_os_progress(hostid, host_os_progress)
self._update_host_package_progress(
_, _, host_progress = host_progresses[hostid]
_, _, clusterhost_progress = clusterhost_progresses[hostid]
self._update_host_progress(hostid, host_progress, updater)
self._update_clusterhost_progress(
hostid,
host_package_progress
clusterhost_progress,
updater
)
self._update_cluster_progress(clusterid)

View File

@ -19,8 +19,8 @@
import logging
import os.path
from compass.db import database
from compass.db.model import LogProgressingHistory
from compass.db.api import database
from compass.db.models import LogProgressingHistory
from compass.log_analyzor.line_matcher import Progress
from compass.utils import setting_wrapper as setting
@ -109,7 +109,7 @@ class FileReader(object):
in the last run, the progress, the message and the
severity it has got in the last run.
"""
with database.session() as session:
session = database.current_session()
history = session.query(
LogProgressingHistory
).filter_by(
@ -119,7 +119,7 @@ class FileReader(object):
self.position_ = history.position
self.partial_line_ = history.partial_line
line_matcher_name = history.line_matcher_name
progress = Progress(history.progress,
progress = Progress(history.percentage,
history.message,
history.severity)
else:
@ -138,10 +138,9 @@ class FileReader(object):
The function should be called out of database session.
It updates the log_processing_history table.
"""
with database.session() as session:
session = database.current_session()
history = session.query(LogProgressingHistory).filter_by(
pathname=self.pathname_).first()
if history:
if history.position >= self.position_:
logging.error(
@ -163,7 +162,7 @@ class FileReader(object):
pathname=self.pathname_, position=self.position_,
partial_line=self.partial_line_,
line_matcher_name=line_matcher_name,
progress=progress.progress,
percentage=progress.progress,
message=progress.message,
severity=progress.severity)
session.merge(history)

View File

@ -433,7 +433,7 @@ def _get_package_adapter_matcher(package_installer, target_system):
else:
logging.debug('configuration %s does not match %s and %s',
configuration, target_system, package_installer)
logging.error('No configuration found for os installer %s os %s',
logging.error('No configuration found for package installer %s os %s',
package_installer, target_system)
return None

View File

@ -24,6 +24,7 @@ from celery.signals import setup_logging
from compass.actions import deploy
from compass.actions import poll_switch
from compass.actions import reinstall
from compass.actions import update_progress
from compass.db.api import adapter_holder as adapter_api
from compass.db.api import database
from compass.db.api import metadata_holder as metadata_api
@ -118,3 +119,16 @@ def reset_host(host_id):
:type cluster_hosts: dict of int to list of int
"""
pass
@celery.task(name='compass.tasks.update_progress')
def update_clusters_progress(cluster_hosts):
"""Calculate the installing progress of the given cluster.
:param cluster_hosts: the cluster and hosts of each cluster to update.
:type cluster_hosts: dict of int to list of int
"""
try:
update_progress.update_progress(cluster_hosts)
except Exception as error:
logging.exception(error)

View File

@ -190,3 +190,42 @@ def load_configs(
raise error
configs.append(config_locals)
return configs
def is_instance(instance, expected_types):
"""Check instance type is in one of expected types.
:param instance: instance to check the type.
:param expected_types: types to check if instance type is in them.
:type expected_types: list of type
:returns: True if instance type is in expect_types.
"""
for expected_type in expected_types:
if isinstance(instance, expected_type):
return True
return False
def get_clusters_from_str(clusters_str):
"""get clusters from string."""
clusters = {}
for cluster_and_hosts in clusters_str.split(';'):
if not cluster_and_hosts:
continue
if ':' in cluster_and_hosts:
cluster_str, hosts_str = cluster_and_hosts.split(
':', 1)
else:
cluster_str = cluster_and_hosts
hosts_str = ''
hosts = [
host for host in hosts_str.split(',')
if host
]
clusters[cluster_str] = hosts
return clusters