Use oslo.service loopingcall
Instead of the locally implemented looping call classes use the standard once from oslo.service. This allows us to automatically switch between the eventlet and the native threading based implementation when nova-compute starts supporting both. Change-Id: Ida95f739d5f1c41595b07175fdb5239d7cf4d4fb Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
@@ -25,11 +25,10 @@ import logging
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_context import context
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import reflection
|
||||
|
||||
from oslo_vmware._i18n import _
|
||||
from oslo_vmware.common import loopingcall
|
||||
from oslo_vmware import exceptions
|
||||
from oslo_vmware import pbm
|
||||
from oslo_vmware import vim
|
||||
@@ -45,85 +44,6 @@ def _trunc_id(session_id):
|
||||
return session_id[-5:]
|
||||
|
||||
|
||||
# TODO(vbala) Move this class to excutils.py.
|
||||
class RetryDecorator:
|
||||
"""Decorator for retrying a function upon suggested exceptions.
|
||||
|
||||
The decorated function is retried for the given number of times, and the
|
||||
sleep time between the retries is incremented until max sleep time is
|
||||
reached. If the max retry count is set to -1, then the decorated function
|
||||
is invoked indefinitely until an exception is thrown, and the caught
|
||||
exception is not in the list of suggested exceptions.
|
||||
"""
|
||||
|
||||
def __init__(self, max_retry_count=-1, inc_sleep_time=10,
|
||||
max_sleep_time=60, exceptions=()):
|
||||
"""Configure the retry object using the input params.
|
||||
|
||||
:param max_retry_count: maximum number of times the given function must
|
||||
be retried when one of the input 'exceptions'
|
||||
is caught. When set to -1, it will be retried
|
||||
indefinitely until an exception is thrown
|
||||
and the caught exception is not in param
|
||||
exceptions.
|
||||
:param inc_sleep_time: incremental time in seconds for sleep time
|
||||
between retries
|
||||
:param max_sleep_time: max sleep time in seconds beyond which the sleep
|
||||
time will not be incremented using param
|
||||
inc_sleep_time. On reaching this threshold,
|
||||
max_sleep_time will be used as the sleep time.
|
||||
:param exceptions: suggested exceptions for which the function must be
|
||||
retried
|
||||
"""
|
||||
self._max_retry_count = max_retry_count
|
||||
self._inc_sleep_time = inc_sleep_time
|
||||
self._max_sleep_time = max_sleep_time
|
||||
self._exceptions = exceptions
|
||||
self._retry_count = 0
|
||||
self._sleep_time = 0
|
||||
|
||||
def __call__(self, f):
|
||||
func_name = reflection.get_callable_name(f)
|
||||
|
||||
def _func(*args, **kwargs):
|
||||
result = None
|
||||
try:
|
||||
if self._retry_count:
|
||||
LOG.debug("Invoking %(func_name)s; retry count is "
|
||||
"%(retry_count)d.",
|
||||
{'func_name': func_name,
|
||||
'retry_count': self._retry_count})
|
||||
result = f(*args, **kwargs)
|
||||
except self._exceptions:
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
LOG.warning("Exception which is in the suggested list "
|
||||
"of exceptions occurred while invoking "
|
||||
"function: %s.",
|
||||
func_name,
|
||||
exc_info=True)
|
||||
if (self._max_retry_count != -1 and
|
||||
self._retry_count >= self._max_retry_count):
|
||||
LOG.error("Cannot retry upon suggested exception "
|
||||
"since retry count (%(retry_count)d) "
|
||||
"reached max retry count "
|
||||
"(%(max_retry_count)d).",
|
||||
{'retry_count': self._retry_count,
|
||||
'max_retry_count': self._max_retry_count})
|
||||
else:
|
||||
ctxt.reraise = False
|
||||
self._retry_count += 1
|
||||
self._sleep_time += self._inc_sleep_time
|
||||
return self._sleep_time
|
||||
raise loopingcall.LoopingCallDone(result)
|
||||
|
||||
def func(*args, **kwargs):
|
||||
loop = loopingcall.DynamicLoopingCall(_func, *args, **kwargs)
|
||||
evt = loop.start(periodic_interval_max=self._max_sleep_time)
|
||||
return evt.wait()
|
||||
|
||||
return func
|
||||
|
||||
|
||||
class VMwareAPISession:
|
||||
"""Setup a session with the server and handles all calls made to it.
|
||||
|
||||
@@ -226,7 +146,8 @@ class VMwareAPISession:
|
||||
self._pbm.set_soap_cookie(self._vim.get_http_cookie())
|
||||
return self._pbm
|
||||
|
||||
@RetryDecorator(exceptions=(exceptions.VimConnectionException,))
|
||||
@loopingcall.RetryDecorator(
|
||||
exceptions=(exceptions.VimConnectionException,))
|
||||
@lockutils.synchronized('oslo_vmware_api_lock')
|
||||
def _create_session(self):
|
||||
"""Establish session with the server."""
|
||||
@@ -292,9 +213,10 @@ class VMwareAPISession:
|
||||
VimSessionOverLoadException, VimConnectionException
|
||||
"""
|
||||
|
||||
@RetryDecorator(max_retry_count=self._api_retry_count,
|
||||
exceptions=(exceptions.VimSessionOverLoadException,
|
||||
exceptions.VimConnectionException))
|
||||
@loopingcall.RetryDecorator(
|
||||
max_retry_count=self._api_retry_count,
|
||||
exceptions=(exceptions.VimSessionOverLoadException,
|
||||
exceptions.VimConnectionException))
|
||||
def _invoke_api(module, method, *args, **kwargs):
|
||||
try:
|
||||
api_method = getattr(module, method)
|
||||
|
||||
@@ -1,143 +0,0 @@
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoopingCallDone(Exception):
|
||||
"""Exception to break out and stop a LoopingCall.
|
||||
|
||||
The poll-function passed to LoopingCall can raise this exception to
|
||||
break out of the loop normally. This is somewhat analogous to
|
||||
StopIteration.
|
||||
|
||||
An optional return-value can be included as the argument to the exception;
|
||||
this return-value will be returned by LoopingCall.wait()
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, retvalue=True):
|
||||
""":param retvalue: Value that LoopingCall.wait() should return."""
|
||||
self.retvalue = retvalue
|
||||
|
||||
|
||||
class LoopingCallBase:
|
||||
def __init__(self, f=None, *args, **kw):
|
||||
self.args = args
|
||||
self.kw = kw
|
||||
self.f = f
|
||||
self._running = False
|
||||
self.done = None
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
||||
|
||||
|
||||
class FixedIntervalLoopingCall(LoopingCallBase):
|
||||
"""A fixed interval looping call."""
|
||||
|
||||
def start(self, interval, initial_delay=None):
|
||||
self._running = True
|
||||
done = event.Event()
|
||||
|
||||
def _inner():
|
||||
if initial_delay:
|
||||
greenthread.sleep(initial_delay)
|
||||
|
||||
try:
|
||||
while self._running:
|
||||
start = timeutils.utcnow()
|
||||
self.f(*self.args, **self.kw)
|
||||
end = timeutils.utcnow()
|
||||
if not self._running:
|
||||
break
|
||||
delay = interval - timeutils.delta_seconds(start, end)
|
||||
if delay <= 0:
|
||||
LOG.warning('task run outlasted interval '
|
||||
'by %s sec',
|
||||
-delay)
|
||||
greenthread.sleep(delay if delay > 0 else 0)
|
||||
except LoopingCallDone as e:
|
||||
self.stop()
|
||||
done.send(e.retvalue)
|
||||
except Exception:
|
||||
done.send_exception(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
done.send(True)
|
||||
|
||||
self.done = done
|
||||
|
||||
greenthread.spawn_n(_inner)
|
||||
return self.done
|
||||
|
||||
|
||||
# TODO(mikal): this class name is deprecated in Havana and should be removed
|
||||
# in the I release
|
||||
LoopingCall = FixedIntervalLoopingCall
|
||||
|
||||
|
||||
class DynamicLoopingCall(LoopingCallBase):
|
||||
"""A looping call which sleeps until the next known event.
|
||||
|
||||
The function called should return how long to sleep for before being
|
||||
called again.
|
||||
"""
|
||||
|
||||
def start(self, initial_delay=None, periodic_interval_max=None):
|
||||
self._running = True
|
||||
done = event.Event()
|
||||
|
||||
def _inner():
|
||||
if initial_delay:
|
||||
greenthread.sleep(initial_delay)
|
||||
|
||||
try:
|
||||
while self._running:
|
||||
idle = self.f(*self.args, **self.kw)
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
if periodic_interval_max is not None:
|
||||
idle = min(idle, periodic_interval_max)
|
||||
LOG.debug('Dynamic looping call sleeping for %.02f '
|
||||
'seconds', idle)
|
||||
greenthread.sleep(idle)
|
||||
except LoopingCallDone as e:
|
||||
self.stop()
|
||||
done.send(e.retvalue)
|
||||
except Exception:
|
||||
done.send_exception(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
done.send(True)
|
||||
|
||||
self.done = done
|
||||
|
||||
greenthread.spawn(_inner)
|
||||
return self.done
|
||||
@@ -22,9 +22,9 @@ import tarfile
|
||||
|
||||
from eventlet import timeout
|
||||
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import units
|
||||
from oslo_vmware._i18n import _
|
||||
from oslo_vmware.common import loopingcall
|
||||
from oslo_vmware import constants
|
||||
from oslo_vmware import exceptions
|
||||
from oslo_vmware import image_util
|
||||
|
||||
@@ -29,71 +29,6 @@ from oslo_vmware.tests import base
|
||||
from oslo_vmware import vim_util
|
||||
|
||||
|
||||
class RetryDecoratorTest(base.TestCase):
|
||||
"""Tests for retry decorator class."""
|
||||
|
||||
def test_retry(self):
|
||||
result = "RESULT"
|
||||
|
||||
@api.RetryDecorator()
|
||||
def func(*args, **kwargs):
|
||||
return result
|
||||
|
||||
self.assertEqual(result, func())
|
||||
|
||||
def func2(*args, **kwargs):
|
||||
return result
|
||||
|
||||
retry = api.RetryDecorator()
|
||||
self.assertEqual(result, retry(func2)())
|
||||
self.assertTrue(retry._retry_count == 0)
|
||||
|
||||
def test_retry_with_expected_exceptions(self):
|
||||
result = "RESULT"
|
||||
responses = [exceptions.VimSessionOverLoadException(None),
|
||||
exceptions.VimSessionOverLoadException(None),
|
||||
result]
|
||||
|
||||
def func(*args, **kwargs):
|
||||
response = responses.pop(0)
|
||||
if isinstance(response, Exception):
|
||||
raise response
|
||||
return response
|
||||
|
||||
sleep_time_incr = 0.01
|
||||
retry_count = 2
|
||||
retry = api.RetryDecorator(10, sleep_time_incr, 10,
|
||||
(exceptions.VimSessionOverLoadException,))
|
||||
self.assertEqual(result, retry(func)())
|
||||
self.assertTrue(retry._retry_count == retry_count)
|
||||
self.assertEqual(retry_count * sleep_time_incr, retry._sleep_time)
|
||||
|
||||
def test_retry_with_max_retries(self):
|
||||
responses = [exceptions.VimSessionOverLoadException(None),
|
||||
exceptions.VimSessionOverLoadException(None),
|
||||
exceptions.VimSessionOverLoadException(None)]
|
||||
|
||||
def func(*args, **kwargs):
|
||||
response = responses.pop(0)
|
||||
if isinstance(response, Exception):
|
||||
raise response
|
||||
return response
|
||||
|
||||
retry = api.RetryDecorator(2, 0, 0,
|
||||
(exceptions.VimSessionOverLoadException,))
|
||||
self.assertRaises(exceptions.VimSessionOverLoadException, retry(func))
|
||||
self.assertTrue(retry._retry_count == 2)
|
||||
|
||||
def test_retry_with_unexpected_exception(self):
|
||||
|
||||
def func(*args, **kwargs):
|
||||
raise exceptions.VimException(None)
|
||||
|
||||
retry = api.RetryDecorator()
|
||||
self.assertRaises(exceptions.VimException, retry(func))
|
||||
self.assertTrue(retry._retry_count == 0)
|
||||
|
||||
|
||||
class VMwareAPISessionTest(base.TestCase):
|
||||
"""Tests for VMwareAPISession."""
|
||||
|
||||
|
||||
@@ -313,7 +313,7 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
|
||||
@mock.patch.object(image_transfer, '_start_transfer')
|
||||
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
|
||||
@mock.patch('oslo_vmware.common.loopingcall.FixedIntervalLoopingCall')
|
||||
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall')
|
||||
def test_copy_stream_optimized_disk(
|
||||
self, loopingcall, vmdk_read_handle, start_transfer):
|
||||
|
||||
|
||||
@@ -14,3 +14,4 @@ requests>=2.14.2 # Apache-2.0
|
||||
urllib3>=1.21.1 # MIT
|
||||
oslo.concurrency>=3.26.0 # Apache-2.0
|
||||
oslo.context>=2.19.2 # Apache-2.0
|
||||
oslo.service>=4.3.0 # Apache-2.0
|
||||
|
||||
Reference in New Issue
Block a user