Add usages of poll utils for service modules

This patch adds the last usages of poll utils for service
modules and adds new timeout options to the sahara config for them.

Implements bp: add-timeouts-for-polling

Change-Id: I8eed14767a6856faede352854e599bd486539fda
Vitaly Gridnev 2015-03-11 17:47:50 +03:00
parent a8ddaf4801
commit 756c29234b
6 changed files with 174 additions and 103 deletions
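For orientation before the per-file diffs: the patch moves the per-operation timeouts into a single [timeouts] config group (registered in the poll_utils changes at the end of this commit). A hypothetical sahara.conf snippet overriding two of the new options could look like this — option names and defaults come from this patch, while the override values are made up for illustration:

[timeouts]
# seconds to wait for instances to become active (default: 10800)
await_for_instances_active = 3600
# seconds to wait for a volume to detach; was detach_volume_timeout
# in [DEFAULT] before this patch (default: 300)
detach_volume_timeout = 600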

sahara/opts.py

@@ -126,12 +126,12 @@ def list_opts():
from sahara import main as sahara_main
from sahara.service.edp import job_utils
from sahara.service import periodic
from sahara.service import volumes
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils.openstack import heat
from sahara.utils.openstack import neutron
from sahara.utils.openstack import nova
from sahara.utils.openstack import swift
from sahara.utils import poll_utils
from sahara.utils import proxy
from sahara.utils import wsgi
@@ -149,10 +149,11 @@ def list_opts():
sahara_main.opts,
job_utils.opts,
periodic.periodic_opts,
volumes.opts,
proxy.opts,
cpo.event_log_opts,
wsgi.wsgi_opts)),
(poll_utils.timeouts.name,
itertools.chain(poll_utils.timeouts_opts)),
(api.conductor_group.name,
itertools.chain(api.conductor_opts)),
(cinder.cinder_group.name,

sahara/service/direct_engine.py

@@ -31,7 +31,7 @@ from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g
from sahara.utils.openstack import neutron
from sahara.utils.openstack import nova
from sahara.utils import poll_utils
conductor = c.API
CONF = cfg.CONF
@@ -423,6 +423,19 @@ class DirectEngine(e.Engine):
networks.assign_floating_ip(instance.instance_id,
node_group.floating_ip_pool)
@poll_utils.poll_status(
'await_for_instances_active',
_("Wait for instances to become active"), sleep=1)
def _check_active(self, active_ids, cluster, instances):
if not g.check_cluster_exists(cluster):
return True
for instance in instances:
if instance.id not in active_ids:
if self._check_if_active(instance):
active_ids.add(instance.id)
cpo.add_successful_event(instance)
return len(instances) == len(active_ids)
def _await_active(self, cluster, instances):
"""Await all instances are in Active status and available."""
if not instances:
@@ -433,20 +446,27 @@ class DirectEngine(e.Engine):
len(instances))
active_ids = set()
while len(active_ids) != len(instances):
if not g.check_cluster_exists(cluster):
return
for instance in instances:
if instance.id not in active_ids:
if self._check_if_active(instance):
active_ids.add(instance.id)
cpo.add_successful_event(instance)
context.sleep(1)
self._check_active(active_ids, cluster, instances)
LOG.info(_LI("Cluster {cluster_id}: all instances are active").format(
cluster_id=cluster.id))
@poll_utils.poll_status(
'delete_instances_timeout',
_("Wait for instances to be deleted"), sleep=1)
def _check_deleted(self, deleted_ids, cluster, instances):
if not g.check_cluster_exists(cluster):
return True
for instance in instances:
if instance.id not in deleted_ids:
if self._check_if_deleted(instance):
LOG.debug("Instance {instance} is deleted".format(
instance=instance.instance_name))
deleted_ids.add(instance.id)
cpo.add_successful_event(instance)
return len(deleted_ids) == len(instances)
def _await_deleted(self, cluster, instances):
"""Await all instances are deleted."""
if not instances:
@@ -455,18 +475,7 @@ class DirectEngine(e.Engine):
cluster.id, _("Wait for instances to be deleted"), len(instances))
deleted_ids = set()
while len(deleted_ids) != len(instances):
if not g.check_cluster_exists(cluster):
return
for instance in instances:
if instance.id not in deleted_ids:
if self._check_if_deleted(instance):
LOG.debug("Instance {instance} is deleted".format(
instance=instance.instance_name))
deleted_ids.add(instance.id)
cpo.add_successful_event(instance)
context.sleep(1)
self._check_deleted(deleted_ids, cluster, instances)
@cpo.event_wrapper(mark_successful_on_exit=False)
def _check_if_active(self, instance):

sahara/service/engine.py

@@ -30,6 +30,7 @@ from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp
from sahara.utils import general as g
from sahara.utils.openstack import nova
from sahara.utils import poll_utils
from sahara.utils import remote
LOG = logging.getLogger(__name__)
@@ -65,6 +66,17 @@ class Engine(object):
image_id = node_group.get_image_id()
return nova.client().images.get(image_id).username
@poll_utils.poll_status('ips_assign_timeout', _("Assign IPs"), sleep=1)
def _ips_assign(self, ips_assigned, cluster, instances):
if not g.check_cluster_exists(cluster):
return True
for instance in instances:
if instance.id not in ips_assigned:
if networks.init_instances_ips(instance):
ips_assigned.add(instance.id)
cpo.add_successful_event(instance)
return len(ips_assigned) == len(instances)
def _await_networks(self, cluster, instances):
if not instances:
return
@@ -72,16 +84,7 @@ class Engine(object):
cpo.add_provisioning_step(cluster.id, _("Assign IPs"), len(instances))
ips_assigned = set()
while len(ips_assigned) != len(instances):
if not g.check_cluster_exists(cluster):
return
for instance in instances:
if instance.id not in ips_assigned:
if networks.init_instances_ips(instance):
ips_assigned.add(instance.id)
cpo.add_successful_event(instance)
context.sleep(1)
self._ips_assign(ips_assigned, cluster, instances)
LOG.info(
_LI("Cluster {cluster_id}: all instances have IPs assigned")
@@ -101,9 +104,10 @@ class Engine(object):
LOG.info(_LI("Cluster {cluster_id}: all instances are accessible")
.format(cluster_id=cluster.id))
@cpo.event_wrapper(mark_successful_on_exit=True)
def _wait_until_accessible(self, instance):
while True:
@poll_utils.poll_status(
'wait_until_accessible', _("Wait for instance accessibility"),
sleep=5)
def _is_accessible(self, instance):
try:
# check if ssh is accessible and cloud-init
# script is finished generating authorized_keys
@@ -114,17 +118,21 @@ class Engine(object):
LOG.debug(
'Instance {instance_name} is accessible'.format(
instance_name=instance.instance_name))
return
return True
except Exception as ex:
LOG.debug("Can't login to node {instance_name} {mgmt_ip}, "
"reason {reason}".format(
instance_name=instance.instance_name,
mgmt_ip=instance.management_ip, reason=ex))
context.sleep(5)
return False
if not g.check_cluster_exists(instance.cluster):
return
return True
return False
@cpo.event_wrapper(mark_successful_on_exit=True)
def _wait_until_accessible(self, instance):
self._is_accessible(instance)
def _configure_instances(self, cluster):
"""Configure active instances.

sahara/service/volumes.py

@@ -15,31 +15,22 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils as tu
from sahara import conductor as c
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.i18n import _LW
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils.openstack import cinder
from sahara.utils.openstack import nova
from sahara.utils import poll_utils
conductor = c.API
LOG = logging.getLogger(__name__)
opts = [
cfg.IntOpt(
'detach_volume_timeout', default=300,
help='Timeout for detaching volumes from instance (in seconds).')
]
CONF = cfg.CONF
CONF.register_opts(opts)
CONF.import_opt('api_version', 'sahara.utils.openstack.cinder',
group='cinder')
@@ -73,18 +64,11 @@ def attach_to_instances(instances):
_attach_volumes_to_node, instance.node_group, instance)
@poll_utils.poll_status(
'await_attach_volumes', _("Wait for volumes to be attached to instances"),
sleep=2)
def _await_attach_volumes(instance, devices):
timeout = 10
step = 2
while timeout > 0:
if _count_attached_devices(instance, devices) == len(devices):
return
timeout -= step
context.sleep(step)
raise ex.SystemError(_("Error attach volume to instance %s") %
instance.instance_name)
return _count_attached_devices(instance, devices) == len(devices)
@cpo.event_wrapper(mark_successful_on_exit=True)
@@ -114,6 +98,16 @@ def _attach_volumes_to_node(node_group, instance):
.format(id=instance.instance_id))
@poll_utils.poll_status(
'volume_available_timeout', _("Wait for volume to become available"),
sleep=1)
def _await_available(volume):
volume = cinder.get_volume(volume.id)
if volume.status == 'error':
raise ex.SystemError(_("Volume %s has error status") % volume.id)
return volume.status == 'available'
def _create_attach_volume(ctx, instance, size, volume_type,
volume_local_to_instance, name=None,
availability_zone=None):
@@ -131,13 +125,7 @@ def _create_attach_volume(ctx, instance, size, volume_type,
volume = cinder.client().volumes.create(**kwargs)
conductor.append_volume(ctx, instance, volume.id)
while volume.status != 'available':
volume = cinder.get_volume(volume.id)
if volume.status == 'error':
raise ex.SystemError(_("Volume %s has error status") % volume.id)
context.sleep(1)
_await_available(volume)
resp = nova.client().volumes.create_server_volume(
instance.instance_id, volume.id, None)
@@ -223,6 +211,15 @@ def detach_from_instance(instance):
_delete_volume(volume_id)
@poll_utils.poll_status(
'detach_volume_timeout', _("Wait for volume to become detached"), sleep=2)
def _await_detach(volume_id):
volume = cinder.get_volume(volume_id)
if volume.status not in ['available', 'error']:
return False
return True
def _detach_volume(instance, volume_id):
volume = cinder.get_volume(volume_id)
try:
@@ -233,21 +230,10 @@ def _detach_volume(instance, volume_id):
except Exception:
LOG.error(_LE("Can't detach volume {id}").format(id=volume.id))
detach_timeout = CONF.detach_volume_timeout
detach_timeout = CONF.timeouts.detach_volume_timeout
LOG.debug("Waiting {timeout} seconds to detach {id} volume".format(
timeout=detach_timeout, id=volume_id))
s_time = tu.utcnow()
while tu.delta_seconds(s_time, tu.utcnow()) < detach_timeout:
volume = cinder.get_volume(volume_id)
if volume.status not in ['available', 'error']:
context.sleep(2)
else:
LOG.debug("Volume {id} has been detached".format(id=volume_id))
return
else:
LOG.warning(_LW("Can't detach volume {volume}. "
"Current status of volume: {status}").format(
volume=volume_id, status=volume.status))
_await_detach(volume_id)
def _delete_volume(volume_id):

sahara/tests/unit/service/test_volumes.py

@@ -117,16 +117,18 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
self.assertEqual(2, p_await.call_count)
self.assertEqual(4, p_mount.call_count)
@mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
@mock.patch('sahara.context.sleep')
@mock.patch('sahara.service.volumes._count_attached_devices')
def test_await_attach_volume(self, dev_count, p_sleep):
def test_await_attach_volume(self, dev_count, p_sleep, p_get_cons):
self.override_config('await_attach_volumes', 0, group='timeouts')
dev_count.return_value = 2
p_sleep.return_value = None
instance = r.InstanceResource({'instance_id': '123454321',
'instance_name': 'instt'})
self.assertIsNone(volumes._await_attach_volumes(
instance, ['/dev/vda', '/dev/vdb']))
self.assertRaises(ex.SystemError, volumes._await_attach_volumes,
self.assertRaises(ex.TimeoutException, volumes._await_attach_volumes,
instance, ['/dev/vda', '/dev/vdb', '/dev/vdc'])
def test_count_attached_devices(self):

sahara/utils/poll_utils.py

@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
@@ -26,6 +29,46 @@ LOG = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 10800
DEFAULT_SLEEP_TIME = 5
timeouts_opts = [
# engine opts
cfg.IntOpt('ips_assign_timeout',
default=DEFAULT_TIMEOUT,
help="Assign IPs timeout, in seconds"),
cfg.IntOpt('wait_until_accessible',
default=DEFAULT_TIMEOUT,
help="Wait for instance accessibility, in seconds"),
# direct engine opts
cfg.IntOpt('delete_instances_timeout',
default=DEFAULT_TIMEOUT,
help="Wait for instances to be deleted, in seconds"),
cfg.IntOpt('await_for_instances_active',
default=DEFAULT_TIMEOUT,
help="Wait for instances to become active, in seconds"),
# volumes opts
cfg.IntOpt(
'detach_volume_timeout', default=300,
help='Timeout for detaching volumes from instance, in seconds',
deprecated_name='detach_volume_timeout',
deprecated_group=None),
cfg.IntOpt('volume_available_timeout',
default=DEFAULT_TIMEOUT,
help="Wait for volumes to become available, in seconds"),
cfg.IntOpt('await_attach_volumes',
default=10,
help="Wait for attaching volumes to instances, in seconds")
]
timeouts = cfg.OptGroup(name='timeouts',
title='Sahara timeouts')
CONF = cfg.CONF
CONF.register_group(timeouts)
CONF.register_opts(timeouts_opts, group=timeouts)
def _get_consumed(started_at):
return timeutils.delta_seconds(started_at, timeutils.utcnow())
@@ -39,8 +82,8 @@ def _get_current_value(cluster, option):
return option.default_value
def poll(get_status, kwargs, operation_name=None, timeout_name=None,
timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME,
def poll(get_status, kwargs=None, args=None, operation_name=None,
timeout_name=None, timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME,
exception_strategy='raise'):
"""This util poll status of object obj during some timeout.
@@ -63,11 +106,15 @@ def poll(get_status, kwargs=None, args=None, operation_name=None,
# We shouldn't raise TimeoutException if an incorrect timeout is specified
# and the status is already ok; hence get_status is executed at least once.
at_least_once = True
if not kwargs:
kwargs = {}
if not args:
args = ()
while at_least_once or _get_consumed(start_time) < timeout:
at_least_once = False
try:
status = get_status(**kwargs)
status = get_status(*args, **kwargs)
except BaseException:
if exception_strategy == 'raise':
raise
@@ -109,3 +156,21 @@ def plugin_option_poll(cluster, get_status, option, operation_name, sleep_time,
}
poll(**poll_description)
def poll_status(option, operation_name, sleep):
def decorator(f):
@functools.wraps(f)
def handler(*args, **kwargs):
poll_description = {
'get_status': f,
'kwargs': kwargs,
'args': args,
'timeout': getattr(CONF.timeouts, option),
'operation_name': operation_name,
'timeout_name': option,
'sleep': sleep,
}
poll(**poll_description)
return handler
return decorator
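The decorator above turns a boolean "check once" predicate into a blocking poll, which is what lets the service modules drop their hand-written while loops. A minimal standalone sketch of the same pattern (simplified: no oslo.config lookup, no cluster events, and a plain TimeoutError instead of sahara's TimeoutException):

import functools
import time


def poll_status(timeout, operation_name, sleep):
    # Simplified stand-in for sahara.utils.poll_utils.poll_status:
    # repeatedly call the wrapped predicate until it returns True
    # or the timeout (in seconds) expires.
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            start = time.time()
            while not f(*args, **kwargs):
                if time.time() - start >= timeout:
                    raise TimeoutError("'%s' timed out after %s seconds"
                                       % (operation_name, timeout))
                time.sleep(sleep)
        return handler
    return decorator


# Usage mirrors the service-module changes above: the wrapped function
# only answers "done yet?", while the decorator owns the loop and sleep.
@poll_status(timeout=10, operation_name="wait for flag", sleep=1)
def _check_ready(state):
    return state.get('ready', False)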