From 756c29234be617b11c768d05bd71499fa95adf3a Mon Sep 17 00:00:00 2001
From: Vitaly Gridnev
Date: Wed, 11 Mar 2015 17:47:50 +0300
Subject: [PATCH] Add usages of poll util for service modules

This patch adds the last usages of the poll utils for the service
modules and adds new options to the sahara config for that.

Implements bp: add-timeouts-for-polling

Change-Id: I8eed14767a6856faede352854e599bd486539fda
---
 sahara/config.py                          |  5 +-
 sahara/service/direct_engine.py           | 55 +++++++++--------
 sahara/service/engine.py                  | 72 +++++++++++++----------
 sahara/service/volumes.py                 | 68 +++++++++------------
 sahara/tests/unit/service/test_volumes.py |  6 +-
 sahara/utils/poll_utils.py                | 71 +++++++++++++++++++++-
 6 files changed, 174 insertions(+), 103 deletions(-)

diff --git a/sahara/config.py b/sahara/config.py
index f05ec837..cae8e6fc 100644
--- a/sahara/config.py
+++ b/sahara/config.py
@@ -126,12 +126,12 @@ def list_opts():
     from sahara import main as sahara_main
     from sahara.service.edp import job_utils
     from sahara.service import periodic
-    from sahara.service import volumes
     from sahara.utils import cluster_progress_ops as cpo
     from sahara.utils.openstack import heat
     from sahara.utils.openstack import neutron
     from sahara.utils.openstack import nova
     from sahara.utils.openstack import swift
+    from sahara.utils import poll_utils
     from sahara.utils import proxy
     from sahara.utils import wsgi
@@ -149,10 +149,11 @@ def list_opts():
                          sahara_main.opts,
                          job_utils.opts,
                          periodic.periodic_opts,
-                         volumes.opts,
                          proxy.opts,
                          cpo.event_log_opts,
                          wsgi.wsgi_opts)),
+        (poll_utils.timeouts.name,
+         itertools.chain(poll_utils.timeouts_opts)),
         (api.conductor_group.name,
          itertools.chain(api.conductor_opts)),
         (cinder.cinder_group.name,
diff --git a/sahara/service/direct_engine.py b/sahara/service/direct_engine.py
index 6f20fbc3..c70c1d9b 100644
--- a/sahara/service/direct_engine.py
+++ b/sahara/service/direct_engine.py
@@ -31,7 +31,7 @@
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import general as g
 from sahara.utils.openstack import neutron
 from sahara.utils.openstack import nova
-
+from sahara.utils import poll_utils
 conductor = c.API
 CONF = cfg.CONF
@@ -423,6 +423,19 @@ class DirectEngine(e.Engine):
         networks.assign_floating_ip(instance.instance_id,
                                     node_group.floating_ip_pool)
+    @poll_utils.poll_status(
+        'await_for_instances_active',
+        _("Wait for instances to become active"), sleep=1)
+    def _check_active(self, active_ids, cluster, instances):
+        if not g.check_cluster_exists(cluster):
+            return True
+        for instance in instances:
+            if instance.id not in active_ids:
+                if self._check_if_active(instance):
+                    active_ids.add(instance.id)
+                    cpo.add_successful_event(instance)
+        return len(instances) == len(active_ids)
+
     def _await_active(self, cluster, instances):
         """Await all instances are in Active status and available."""
         if not instances:
             return
@@ -433,20 +446,27 @@ class DirectEngine(e.Engine):
             len(instances))
         active_ids = set()
-        while len(active_ids) != len(instances):
-            if not g.check_cluster_exists(cluster):
-                return
-            for instance in instances:
-                if instance.id not in active_ids:
-                    if self._check_if_active(instance):
-                        active_ids.add(instance.id)
-                        cpo.add_successful_event(instance)
-
-            context.sleep(1)
+        self._check_active(active_ids, cluster, instances)
         LOG.info(_LI("Cluster {cluster_id}: all instances are active").format(
             cluster_id=cluster.id))
+    @poll_utils.poll_status(
+        'delete_instances_timeout',
+        _("Wait for instances to be deleted"), sleep=1)
+    def _check_deleted(self, deleted_ids, cluster, instances):
+        if not 
g.check_cluster_exists(cluster): + return True + + for instance in instances: + if instance.id not in deleted_ids: + if self._check_if_deleted(instance): + LOG.debug("Instance {instance} is deleted".format( + instance=instance.instance_name)) + deleted_ids.add(instance.id) + cpo.add_successful_event(instance) + return len(deleted_ids) == len(instances) + def _await_deleted(self, cluster, instances): """Await all instances are deleted.""" if not instances: @@ -455,18 +475,7 @@ class DirectEngine(e.Engine): cluster.id, _("Wait for instances to be deleted"), len(instances)) deleted_ids = set() - while len(deleted_ids) != len(instances): - if not g.check_cluster_exists(cluster): - return - for instance in instances: - if instance.id not in deleted_ids: - if self._check_if_deleted(instance): - LOG.debug("Instance {instance} is deleted".format( - instance=instance.instance_name)) - deleted_ids.add(instance.id) - cpo.add_successful_event(instance) - - context.sleep(1) + self._check_deleted(deleted_ids, cluster, instances) @cpo.event_wrapper(mark_successful_on_exit=False) def _check_if_active(self, instance): diff --git a/sahara/service/engine.py b/sahara/service/engine.py index 9594ddbb..339ab5d1 100644 --- a/sahara/service/engine.py +++ b/sahara/service/engine.py @@ -30,6 +30,7 @@ from sahara.utils import cluster_progress_ops as cpo from sahara.utils import edp from sahara.utils import general as g from sahara.utils.openstack import nova +from sahara.utils import poll_utils from sahara.utils import remote LOG = logging.getLogger(__name__) @@ -65,6 +66,17 @@ class Engine(object): image_id = node_group.get_image_id() return nova.client().images.get(image_id).username + @poll_utils.poll_status('ips_assign_timeout', _("Assign IPs"), sleep=1) + def _ips_assign(self, ips_assigned, cluster, instances): + if not g.check_cluster_exists(cluster): + return True + for instance in instances: + if instance.id not in ips_assigned: + if networks.init_instances_ips(instance): + ips_assigned.add(instance.id) + cpo.add_successful_event(instance) + return len(ips_assigned) == len(instances) + def _await_networks(self, cluster, instances): if not instances: return @@ -72,16 +84,7 @@ class Engine(object): cpo.add_provisioning_step(cluster.id, _("Assign IPs"), len(instances)) ips_assigned = set() - while len(ips_assigned) != len(instances): - if not g.check_cluster_exists(cluster): - return - for instance in instances: - if instance.id not in ips_assigned: - if networks.init_instances_ips(instance): - ips_assigned.add(instance.id) - cpo.add_successful_event(instance) - - context.sleep(1) + self._ips_assign(ips_assigned, cluster, instances) LOG.info( _LI("Cluster {cluster_id}: all instances have IPs assigned") @@ -101,30 +104,35 @@ class Engine(object): LOG.info(_LI("Cluster {cluster_id}: all instances are accessible") .format(cluster_id=cluster.id)) + @poll_utils.poll_status( + 'wait_until_accessible', _("Wait for instance accessibility"), + sleep=5) + def _is_accessible(self, instance): + try: + # check if ssh is accessible and cloud-init + # script is finished generating authorized_keys + exit_code, stdout = instance.remote().execute_command( + "ls .ssh/authorized_keys", raise_when_error=False) + + if exit_code == 0: + LOG.debug( + 'Instance {instance_name} is accessible'.format( + instance_name=instance.instance_name)) + return True + except Exception as ex: + LOG.debug("Can't login to node {instance_name} {mgmt_ip}, " + "reason {reason}".format( + instance_name=instance.instance_name, + 
mgmt_ip=instance.management_ip, reason=ex)) + return False + + if not g.check_cluster_exists(instance.cluster): + return True + return False + @cpo.event_wrapper(mark_successful_on_exit=True) def _wait_until_accessible(self, instance): - while True: - try: - # check if ssh is accessible and cloud-init - # script is finished generating authorized_keys - exit_code, stdout = instance.remote().execute_command( - "ls .ssh/authorized_keys", raise_when_error=False) - - if exit_code == 0: - LOG.debug( - 'Instance {instance_name} is accessible'.format( - instance_name=instance.instance_name)) - return - except Exception as ex: - LOG.debug("Can't login to node {instance_name} {mgmt_ip}, " - "reason {reason}".format( - instance_name=instance.instance_name, - mgmt_ip=instance.management_ip, reason=ex)) - - context.sleep(5) - - if not g.check_cluster_exists(instance.cluster): - return + self._is_accessible(instance) def _configure_instances(self, cluster): """Configure active instances. diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py index 27139931..3f3bf6b1 100644 --- a/sahara/service/volumes.py +++ b/sahara/service/volumes.py @@ -15,31 +15,22 @@ from oslo_config import cfg from oslo_log import log as logging -from oslo_utils import timeutils as tu from sahara import conductor as c from sahara import context from sahara import exceptions as ex from sahara.i18n import _ from sahara.i18n import _LE -from sahara.i18n import _LW from sahara.utils import cluster_progress_ops as cpo from sahara.utils.openstack import cinder from sahara.utils.openstack import nova +from sahara.utils import poll_utils conductor = c.API LOG = logging.getLogger(__name__) - -opts = [ - cfg.IntOpt( - 'detach_volume_timeout', default=300, - help='Timeout for detaching volumes from instance (in seconds).') -] - CONF = cfg.CONF -CONF.register_opts(opts) CONF.import_opt('api_version', 'sahara.utils.openstack.cinder', group='cinder') @@ -73,18 +64,11 @@ def attach_to_instances(instances): _attach_volumes_to_node, instance.node_group, instance) +@poll_utils.poll_status( + 'await_attach_volumes', _("Await for attaching volumes to instances"), + sleep=2) def _await_attach_volumes(instance, devices): - timeout = 10 - step = 2 - while timeout > 0: - if _count_attached_devices(instance, devices) == len(devices): - return - - timeout -= step - context.sleep(step) - - raise ex.SystemError(_("Error attach volume to instance %s") % - instance.instance_name) + return _count_attached_devices(instance, devices) == len(devices) @cpo.event_wrapper(mark_successful_on_exit=True) @@ -114,6 +98,16 @@ def _attach_volumes_to_node(node_group, instance): .format(id=instance.instance_id)) +@poll_utils.poll_status( + 'volume_available_timeout', _("Await for volume become available"), + sleep=1) +def _await_available(volume): + volume = cinder.get_volume(volume.id) + if volume.status == 'error': + raise ex.SystemError(_("Volume %s has error status") % volume.id) + return volume.status == 'available' + + def _create_attach_volume(ctx, instance, size, volume_type, volume_local_to_instance, name=None, availability_zone=None): @@ -131,13 +125,7 @@ def _create_attach_volume(ctx, instance, size, volume_type, volume = cinder.client().volumes.create(**kwargs) conductor.append_volume(ctx, instance, volume.id) - - while volume.status != 'available': - volume = cinder.get_volume(volume.id) - if volume.status == 'error': - raise ex.SystemError(_("Volume %s has error status") % volume.id) - - context.sleep(1) + _await_available(volume) resp = 
nova.client().volumes.create_server_volume( instance.instance_id, volume.id, None) @@ -223,6 +211,15 @@ def detach_from_instance(instance): _delete_volume(volume_id) +@poll_utils.poll_status( + 'detach_volume_timeout', _("Await for volume become detached"), sleep=2) +def _await_detach(volume_id): + volume = cinder.get_volume(volume_id) + if volume.status not in ['available', 'error']: + return False + return True + + def _detach_volume(instance, volume_id): volume = cinder.get_volume(volume_id) try: @@ -233,21 +230,10 @@ def _detach_volume(instance, volume_id): except Exception: LOG.error(_LE("Can't detach volume {id}").format(id=volume.id)) - detach_timeout = CONF.detach_volume_timeout + detach_timeout = CONF.timeouts.detach_volume_timeout LOG.debug("Waiting {timeout} seconds to detach {id} volume".format( timeout=detach_timeout, id=volume_id)) - s_time = tu.utcnow() - while tu.delta_seconds(s_time, tu.utcnow()) < detach_timeout: - volume = cinder.get_volume(volume_id) - if volume.status not in ['available', 'error']: - context.sleep(2) - else: - LOG.debug("Volume {id} has been detached".format(id=volume_id)) - return - else: - LOG.warning(_LW("Can't detach volume {volume}. " - "Current status of volume: {status}").format( - volume=volume_id, status=volume.status)) + _await_detach(volume_id) def _delete_volume(volume_id): diff --git a/sahara/tests/unit/service/test_volumes.py b/sahara/tests/unit/service/test_volumes.py index ae20bd0c..b54ffc4d 100644 --- a/sahara/tests/unit/service/test_volumes.py +++ b/sahara/tests/unit/service/test_volumes.py @@ -117,16 +117,18 @@ class TestAttachVolume(base.SaharaWithDbTestCase): self.assertEqual(2, p_await.call_count) self.assertEqual(4, p_mount.call_count) + @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0) @mock.patch('sahara.context.sleep') @mock.patch('sahara.service.volumes._count_attached_devices') - def test_await_attach_volume(self, dev_count, p_sleep): + def test_await_attach_volume(self, dev_count, p_sleep, p_get_cons): + self.override_config('await_attach_volumes', 0, group='timeouts') dev_count.return_value = 2 p_sleep.return_value = None instance = r.InstanceResource({'instance_id': '123454321', 'instance_name': 'instt'}) self.assertIsNone(volumes._await_attach_volumes( instance, ['/dev/vda', '/dev/vdb'])) - self.assertRaises(ex.SystemError, volumes._await_attach_volumes, + self.assertRaises(ex.TimeoutException, volumes._await_attach_volumes, instance, ['/dev/vda', '/dev/vdb', '/dev/vdc']) def test_count_attached_devices(self): diff --git a/sahara/utils/poll_utils.py b/sahara/utils/poll_utils.py index c88cec38..731c49fd 100644 --- a/sahara/utils/poll_utils.py +++ b/sahara/utils/poll_utils.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import functools + +from oslo_config import cfg from oslo_log import log as logging from oslo_utils import timeutils @@ -26,6 +29,46 @@ LOG = logging.getLogger(__name__) DEFAULT_TIMEOUT = 10800 DEFAULT_SLEEP_TIME = 5 +timeouts_opts = [ + # engine opts + cfg.IntOpt('ips_assign_timeout', + default=DEFAULT_TIMEOUT, + help="Assign IPs timeout, in seconds"), + cfg.IntOpt('wait_until_accessible', + default=DEFAULT_TIMEOUT, + help="Wait for instance accessibility, in seconds"), + + # direct engine opts + cfg.IntOpt('delete_instances_timeout', + default=DEFAULT_TIMEOUT, + help="Wait for instances to be deleted, in seconds"), + cfg.IntOpt('await_for_instances_active', + default=DEFAULT_TIMEOUT, + help="Wait for instances to become active, in seconds"), + + # volumes opts + cfg.IntOpt( + 'detach_volume_timeout', default=300, + help='Timeout for detaching volumes from instance, in seconds', + deprecated_name='detach_volume_timeout', + deprecated_group=None), + + cfg.IntOpt('volume_available_timeout', + default=DEFAULT_TIMEOUT, + help="Wait for volumes to become available, in seconds"), + + cfg.IntOpt('await_attach_volumes', + default=10, + help="Wait for attaching volumes to instances, in seconds") +] + +timeouts = cfg.OptGroup(name='timeouts', + title='Sahara timeouts') + +CONF = cfg.CONF +CONF.register_group(timeouts) +CONF.register_opts(timeouts_opts, group=timeouts) + def _get_consumed(started_at): return timeutils.delta_seconds(started_at, timeutils.utcnow()) @@ -39,8 +82,8 @@ def _get_current_value(cluster, option): return option.default_value -def poll(get_status, kwargs, operation_name=None, timeout_name=None, - timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME, +def poll(get_status, kwargs=None, args=None, operation_name=None, + timeout_name=None, timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME, exception_strategy='raise'): """This util poll status of object obj during some timeout. @@ -63,11 +106,15 @@ def poll(get_status, kwargs, operation_name=None, timeout_name=None, # We shouldn't raise TimeoutException if incorrect timeout specified and # status is ok now. In such way we should execute get_status at least once. at_least_once = True + if not kwargs: + kwargs = {} + if not args: + args = () while at_least_once or _get_consumed(start_time) < timeout: at_least_once = False try: - status = get_status(**kwargs) + status = get_status(*args, **kwargs) except BaseException: if exception_strategy == 'raise': raise @@ -109,3 +156,21 @@ def plugin_option_poll(cluster, get_status, option, operation_name, sleep_time, } poll(**poll_description) + + +def poll_status(option, operation_name, sleep): + def decorator(f): + @functools.wraps(f) + def handler(*args, **kwargs): + poll_description = { + 'get_status': f, + 'kwargs': kwargs, + 'args': args, + 'timeout': getattr(CONF.timeouts, option), + 'operation_name': operation_name, + 'timeout_name': option, + 'sleep': sleep, + } + poll(**poll_description) + return handler + return decorator
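
A note for reviewers who have not used the new helper yet: the standalone sketch below illustrates the polling pattern this patch applies across the service modules. A status-check function is wrapped by a decorator, and the wrapper keeps calling it until it returns True or a named timeout expires. This is a simplified stand-in, not the module itself: the TIMEOUTS dict replaces the real [timeouts] config group, TimeoutException and the toy readiness condition are local to the sketch, and poll() omits the exception_strategy and cluster-aware timeout handling found in sahara/utils/poll_utils.py.

# Standalone sketch of the decorator-driven polling used above. The real
# helpers live in sahara/utils/poll_utils.py and read their timeouts from
# the [timeouts] config group; a plain dict stands in for CONF.timeouts here.
import functools
import time

TIMEOUTS = {'await_attach_volumes': 10}  # stand-in for CONF.timeouts


class TimeoutException(Exception):
    pass


def poll(get_status, args=None, kwargs=None, operation_name=None,
         timeout=10800, sleep=5):
    """Call get_status(*args, **kwargs) until it returns True or timeout."""
    args = args or ()
    kwargs = kwargs or {}
    start = time.time()
    at_least_once = True
    while at_least_once or time.time() - start < timeout:
        at_least_once = False
        if get_status(*args, **kwargs):
            return
        time.sleep(sleep)
    raise TimeoutException("Operation '%s' timed out after %s seconds"
                           % (operation_name, timeout))


def poll_status(option, operation_name, sleep):
    """Decorator form: the wrapped function becomes the status check."""
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            poll(get_status=f, args=args, kwargs=kwargs,
                 timeout=TIMEOUTS[option],
                 operation_name=operation_name, sleep=sleep)
        return handler
    return decorator


# Usage mirrors the patched modules: return True when the awaited condition
# holds, and let the decorator handle sleeping and the timeout.
_state = {'polls': 0}


@poll_status('await_attach_volumes', "Await for attaching volumes", sleep=2)
def _all_devices_attached():
    _state['polls'] += 1
    return _state['polls'] >= 3  # toy condition: "ready" on the third check


_all_devices_attached()  # returns after ~4 seconds, or raises on timeout

The point of the decorator is that the waiting loop, the sleep interval, and the timeout lookup live in one place, which is why the open-coded while loops in direct_engine.py, engine.py, and volumes.py collapse to single calls in this patch.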