fix: update service state to failed after exhausting DNS retries

Change-Id: I4eea482dca0bd60f92e4b9c6b0b091504fe86907
Related-Bug: 1494430
This commit is contained in:
Sriram Madapusi Vasudevan 2015-09-10 14:11:57 -04:00
parent 6b26c9a918
commit 0165725b6d
8 changed files with 399 additions and 96 deletions

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import json
import time
@ -30,6 +32,15 @@ LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
# Retry budget for the DNS mapping task; registered under the same
# 'driver:dns' group the DNS driver uses, so one setting governs both.
DNS_OPTIONS = [
    cfg.IntOpt(
        'retries',
        default=5,
        help='Total number of Retries after Exponentially Backing Off'),
]
DNS_GROUP = 'driver:dns'
conf.register_opts(DNS_OPTIONS, group=DNS_GROUP)
class CreateProviderServicesTask(task.Task):
default_provides = "responders"
@ -72,7 +83,7 @@ class CreateProviderServicesTask(task.Task):
class CreateServiceDNSMappingTask(task.Task):
default_provides = "dns_responder"
def execute(self, responders, retry_sleep_time):
def execute(self, responders, retry_sleep_time, project_id, service_id):
service_controller, dns = \
memoized_controllers.task_controllers('poppy', 'dns')
dns_responder = dns.create(responders)
@ -100,12 +111,53 @@ class CreateServiceDNSMappingTask(task.Task):
dns_responder[provider_name]))
return dns_responder
def revert(self, responders, retry_sleep_time,
           project_id, service_id, **kwargs):
    """Back off and retry DNS mapping; mark the service failed when spent.

    Taskflow invokes revert() on the same task instance for every retry
    of the flow, so per-instance counters (``retry_index`` and
    ``retry_progress``) record how many reverts have happened. Until the
    configured retry budget is exhausted, sleep for ``retry_sleep_time``
    seconds so the flow can be retried; once it is exhausted, persist
    'failed' provider details for the service so it does not stay stuck.

    :param responders: list of per-provider responder dicts
    :param retry_sleep_time: seconds to sleep before the next retry,
        or None to skip sleeping
    :param project_id: project owning the service
    :param service_id: service whose state may be set to failed
    """
    if self.name in kwargs['flow_failures'].keys():
        retries = conf[DNS_GROUP].retries
        # Lazily initialize the counter on the first revert.
        if not hasattr(self, 'retry_index'):
            self.retry_index = 0
        self.retry_index = self.retry_index + 1
        # retry_progress is kept for observability; the exhaustion check
        # uses the integer counter rather than comparing the accumulated
        # float (current_progress * retry_index) against 1.0, which is
        # unreliable when 1.0 / retries is not exactly representable.
        self.retry_progress = self.retry_index / retries
        if self.retry_index >= retries:
            LOG.warn('Maximum retry attempts of '
                     '{0} reached for Task {1}'.format(retries, self.name))
            LOG.warn('Setting of state of service_id: '
                     '{0} and project_id: {1} '
                     'to failed'.format(service_id, project_id))
            provider_details_dict = {}
            result = kwargs['result']
            # Record the same failure details for every provider
            # involved in this flow.
            for responder in responders:
                for provider_name in responder:
                    provider_details_dict[provider_name] = (
                        provider_details.ProviderDetail(
                            error_info=result.traceback_str,
                            status='failed',
                            error_message='Failed after '
                                          '{0} DNS '
                                          'retries'.format(retries),
                            error_class=str(result.exc_info[0])))
            # serialize provider_details_dict
            for provider_name in provider_details_dict:
                provider_details_dict[provider_name] = (
                    provider_details_dict[provider_name].to_dict())
            update_provider_details = common.UpdateProviderDetailTask()
            update_provider_details.execute(provider_details_dict,
                                            project_id,
                                            service_id)
        else:
            LOG.warn('Sleeping for {0} seconds and '
                     'retrying'.format(retry_sleep_time))
            if retry_sleep_time is not None:
                time.sleep(retry_sleep_time)
class CreateLogDeliveryContainerTask(task.Task):

View File

@ -13,14 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import json
import time
from oslo_config import cfg
from taskflow import task
from poppy.distributed_task.taskflow.task import common
from poppy.distributed_task.utils import exc_loader
from poppy.distributed_task.utils import memoized_controllers
from poppy.model.helpers import provider_details as pd
from poppy.openstack.common import log
from poppy.transport.pecan.models.request import (
provider_details as req_provider_details
@ -31,6 +35,15 @@ LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
# Retry budget for the DNS mapping task; registered under the same
# 'driver:dns' group the DNS driver uses, so one setting governs both.
DNS_OPTIONS = [
    cfg.IntOpt(
        'retries',
        default=5,
        help='Total number of Retries after Exponentially Backing Off'),
]
DNS_GROUP = 'driver:dns'
conf.register_opts(DNS_OPTIONS, group=DNS_GROUP)
class DeleteProviderServicesTask(task.Task):
default_provides = "responders"
@ -58,7 +71,8 @@ class DeleteProviderServicesTask(task.Task):
class DeleteServiceDNSMappingTask(task.Task):
default_provides = "dns_responder"
def execute(self, provider_details, retry_sleep_time):
def execute(self, provider_details, retry_sleep_time,
responders, project_id, service_id):
service_controller, dns = \
memoized_controllers.task_controllers('poppy', 'dns')
@ -96,12 +110,52 @@ class DeleteServiceDNSMappingTask(task.Task):
return dns_responder
def revert(self, provider_details, retry_sleep_time,
           responders, project_id, service_id, **kwargs):
    """Back off and retry DNS deletion; mark the service failed when spent.

    Taskflow invokes revert() on the same task instance for every retry
    of the flow, so per-instance counters (``retry_index`` and
    ``retry_progress``) record how many reverts have happened. Until the
    configured retry budget is exhausted, sleep for ``retry_sleep_time``
    seconds so the flow can be retried; once it is exhausted, persist
    'failed' provider details for the service so it does not stay stuck.

    :param provider_details: serialized provider details for the service
    :param retry_sleep_time: seconds to sleep before the next retry,
        or None to skip sleeping
    :param responders: list of per-provider responder dicts
    :param project_id: project owning the service
    :param service_id: service whose state may be set to failed
    """
    if self.name in kwargs['flow_failures'].keys():
        retries = conf[DNS_GROUP].retries
        # Lazily initialize the counter on the first revert.
        if not hasattr(self, 'retry_index'):
            self.retry_index = 0
        self.retry_index = self.retry_index + 1
        # retry_progress is kept for observability; the exhaustion check
        # uses the integer counter rather than comparing the accumulated
        # float (current_progress * retry_index) against 1.0, which is
        # unreliable when 1.0 / retries is not exactly representable.
        self.retry_progress = self.retry_index / retries
        if self.retry_index >= retries:
            LOG.warn('Maximum retry attempts of '
                     '{0} reached for Task {1}'.format(retries, self.name))
            LOG.warn('Setting of state of service_id: '
                     '{0} and project_id: {1} '
                     'to failed'.format(service_id, project_id))
            provider_details_dict = {}
            result = kwargs['result']
            # Record the same failure details for every provider
            # involved in this flow.
            for responder in responders:
                for provider_name in responder:
                    provider_details_dict[provider_name] = (
                        pd.ProviderDetail(
                            error_info=result.traceback_str,
                            status='failed',
                            error_message='Failed after '
                                          '{0} DNS '
                                          'retries'.format(retries),
                            error_class=str(result.exc_info[0])))
            # serialize provider_details_dict
            for provider_name in provider_details_dict:
                provider_details_dict[provider_name] = (
                    provider_details_dict[provider_name].to_dict())
            update_provider_details = common.UpdateProviderDetailTask()
            update_provider_details.execute(provider_details_dict,
                                            project_id,
                                            service_id)
        else:
            LOG.warn('Sleeping for {0} seconds and '
                     'retrying'.format(retry_sleep_time))
            if retry_sleep_time is not None:
                time.sleep(retry_sleep_time)
class GatherProviderDetailsTask(task.Task):

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import json
import time
@ -32,6 +34,15 @@ LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
# Retry budget for the DNS mapping task; registered under the same
# 'driver:dns' group the DNS driver uses, so one setting governs both.
DNS_OPTIONS = [
    cfg.IntOpt(
        'retries',
        default=5,
        help='Total number of Retries after Exponentially Backing Off'),
]
DNS_GROUP = 'driver:dns'
conf.register_opts(DNS_OPTIONS, group=DNS_GROUP)
class UpdateProviderServicesTask(task.Task):
default_provides = "responders"
@ -61,7 +72,8 @@ class UpdateProviderServicesTask(task.Task):
class UpdateServiceDNSMappingTask(task.Task):
default_provides = "dns_responder"
def execute(self, responders, retry_sleep_time, service_old, service_obj):
def execute(self, responders, retry_sleep_time,
service_old, service_obj, project_id, service_id):
service_controller, dns = \
memoized_controllers.task_controllers('poppy', 'dns')
service_obj_json = json.loads(service_obj)
@ -100,12 +112,52 @@ class UpdateServiceDNSMappingTask(task.Task):
return dns_responder
def revert(self, responders, retry_sleep_time,
           project_id, service_id, **kwargs):
    """Back off and retry DNS update; mark the service failed when spent.

    Taskflow invokes revert() on the same task instance for every retry
    of the flow, so per-instance counters (``retry_index`` and
    ``retry_progress``) record how many reverts have happened. Until the
    configured retry budget is exhausted, sleep for ``retry_sleep_time``
    seconds so the flow can be retried; once it is exhausted, persist
    'failed' provider details for the service so it does not stay stuck.

    :param responders: list of per-provider responder dicts
    :param retry_sleep_time: seconds to sleep before the next retry,
        or None to skip sleeping
    :param project_id: project owning the service
    :param service_id: service whose state may be set to failed
    """
    if self.name in kwargs['flow_failures'].keys():
        retries = conf[DNS_GROUP].retries
        # Lazily initialize the counter on the first revert.
        if not hasattr(self, 'retry_index'):
            self.retry_index = 0
        self.retry_index = self.retry_index + 1
        # retry_progress is kept for observability; the exhaustion check
        # uses the integer counter rather than comparing the accumulated
        # float (current_progress * retry_index) against 1.0, which is
        # unreliable when 1.0 / retries is not exactly representable.
        self.retry_progress = self.retry_index / retries
        if self.retry_index >= retries:
            LOG.warn('Maximum retry attempts of '
                     '{0} reached for Task {1}'.format(retries, self.name))
            LOG.warn('Setting of state of service_id: '
                     '{0} and project_id: {1} '
                     'to failed'.format(service_id, project_id))
            provider_details_dict = {}
            result = kwargs['result']
            # Record the same failure details for every provider
            # involved in this flow.
            for responder in responders:
                for provider_name in responder:
                    provider_details_dict[provider_name] = (
                        provider_details.ProviderDetail(
                            error_info=result.traceback_str,
                            status='failed',
                            error_message='Failed after '
                                          '{0} DNS '
                                          'retries'.format(retries),
                            error_class=str(result.exc_info[0])))
            # serialize provider_details_dict
            for provider_name in provider_details_dict:
                provider_details_dict[provider_name] = (
                    provider_details_dict[provider_name].to_dict())
            update_provider_details = common.UpdateProviderDetailTask()
            update_provider_details.execute(provider_details_dict,
                                            project_id,
                                            service_id)
        else:
            LOG.warn('Sleeping for {0} seconds and '
                     'retrying'.format(retry_sleep_time))
            if retry_sleep_time is not None:
                time.sleep(retry_sleep_time)
class UpdateLogDeliveryContainerTask(task.Task):

View File

@ -17,6 +17,25 @@ import abc
import six
from oslo_config import cfg
# DNS driver retry/backoff options; every DNSDriverBase subclass
# registers these under the 'driver:dns' group in its __init__.
_DRIVER_DNS_OPTIONS = [
    cfg.IntOpt(
        'retries',
        default=5,
        help='Total number of Retries after Exponentially Backing Off'),
    cfg.IntOpt(
        'min_backoff_range',
        default=20,
        help='Minimum Number of seconds to sleep between retries'),
    cfg.IntOpt(
        'max_backoff_range',
        default=30,
        help='Maximum Number of seconds to sleep between retries'),
]

# Option group name shared with the taskflow DNS tasks.
_DRIVER_DNS_GROUP = 'driver:dns'
@six.add_metaclass(abc.ABCMeta)
class DNSDriverBase(object):
@ -36,6 +55,7 @@ class DNSDriverBase(object):
def __init__(self, conf):
    """Store the config object and register the DNS driver options.

    :param conf: oslo.config ConfigOpts instance; the driver's retry
        and backoff options are registered on it under 'driver:dns'
    """
    self._conf = conf
    self._conf.register_opts(_DRIVER_DNS_OPTIONS, group=_DRIVER_DNS_GROUP)
@abc.abstractmethod
def is_alive(self):

View File

@ -17,7 +17,6 @@ import json
import random
import jsonpatch
from oslo_config import cfg
from poppy.common import errors
from poppy.distributed_task.taskflow.flow import create_service
@ -35,29 +34,7 @@ from poppy.transport.validators.schemas import service as service_schema
LOG = log.getLogger(__name__)
DNS_OPTIONS = [
cfg.IntOpt(
'retries',
default=5,
help='Total number of Retries after Exponentially Backing Off'),
cfg.IntOpt(
'min_backoff_range',
default=20,
help='Minimum Number of seconds to sleep between retries'),
cfg.IntOpt(
'max_backoff_range',
default=30,
help='Maximum Number of seconds to sleep between retries'),
]
PROVIDER_OPTIONS = [
cfg.IntOpt(
'default_cache_ttl',
default=86400,
help='Default ttl to be set, when no caching rules are specified'),
]
DNS_GROUP = 'drivers:dns'
DNS_GROUP = 'driver:dns'
PROVIDER_GROUP = 'drivers:provider'
@ -74,10 +51,6 @@ class DefaultServicesController(base.ServicesController):
self.distributed_task_controller = (
self._driver.distributed_task.services_controller)
self.driver.conf.register_opts(DNS_OPTIONS,
group=DNS_GROUP)
self.driver.conf.register_opts(PROVIDER_OPTIONS,
group=PROVIDER_GROUP)
self.dns_conf = self.driver.conf[DNS_GROUP]
self.provider_conf = self.driver.conf[PROVIDER_GROUP]

View File

@ -17,6 +17,17 @@ import abc
import six
from oslo_config import cfg
# Provider driver options; every ProviderDriverBase subclass registers
# these under the 'drivers:provider' group in its __init__.
_PROVIDER_OPTIONS = [
    cfg.IntOpt(
        'default_cache_ttl',
        default=86400,
        help='Default ttl to be set, when no caching rules are specified'),
]

# Option group name shared with the services controller.
_PROVIDER_GROUP = 'drivers:provider'
@six.add_metaclass(abc.ABCMeta)
class ProviderDriverBase(object):
@ -34,6 +45,7 @@ class ProviderDriverBase(object):
def __init__(self, conf):
    """Store the config object and register the provider options.

    :param conf: oslo.config ConfigOpts instance; the provider options
        are registered on it under the 'drivers:provider' group
    """
    self._conf = conf
    self._conf.register_opts(_PROVIDER_OPTIONS, group=_PROVIDER_GROUP)
@abc.abstractmethod
def is_alive(self):

View File

@ -42,6 +42,25 @@ class TestFlowRuns(base.TestCase):
super(TestFlowRuns, self).setUp()
self.time_factor = 0.001
self.total_retries = 5
self.dns_exception_responder = [
{
'cdn_provider':
{
'error': 'DNSException',
'error_class': 'tests.unit.distributed_task'
'.taskflow.test_flows.DNSException'
}
}
]
self.dns_responder = [
{
'cdn_provider':
{
'success': 'True',
}
}
]
def all_controllers(self):
service_controller, storage_controller = \
@ -50,52 +69,19 @@ class TestFlowRuns(base.TestCase):
memoized_controllers.task_controllers('poppy', 'dns')
return service_controller, storage_controller, dns_controller
def dns_exceptions_and_succeed(self):
    # NOTE(TheSriram): create a chain of mocked return values,
    # to allow for retries, and finally succeed. The last value
    # indicating success, is just shown to indicate
    # that exceptions were not thrown.
    #
    # Each element of the returned list is one complete return value of
    # the mocked DNS call: three failing responses followed by one
    # successful response. Note the brackets: [x] * 3 yields three
    # separate one-responder return values, whereas [x * 3] would yield
    # a single call returning three responders at once.
    dns_responder_returns = [self.dns_exception_responder] * 3
    dns_responder_returns.append(self.dns_responder)
    return dns_responder_returns
def dns_exceptions_only(self):
    # NOTE(TheSriram): create a chain of mocked return values,
    # to allow for retries, and finally fail.
    #
    # One failing DNS response per allowed retry (self.total_retries,
    # matching the conf default) so the flow exhausts its retry budget
    # and marks the service as failed. Note the brackets: [x] * n yields
    # n separate one-responder return values, whereas [x * n] would
    # yield a single call returning n responders at once.
    return [self.dns_exception_responder] * self.total_retries
def patch_create_flow(self, service_controller,
@ -439,7 +425,8 @@ class TestFlowRuns(base.TestCase):
engines.run(delete_service.delete_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_update_flow_dns_exception_with_retry(self, mock_creds):
def test_update_flow_dns_exception_with_retry_and_succeed(self,
mock_creds):
service_id = str(uuid.uuid4())
domains_old = domain.Domain(domain='cdn.poppy.org')
domains_new = domain.Domain(domain='mycdn.poppy.org')
@ -476,7 +463,7 @@ class TestFlowRuns(base.TestCase):
self.patch_update_flow(service_controller, storage_controller,
dns_controller)
dns_controller.update = mock.Mock()
dns_responder_returns = self.dns_exceptions()
dns_responder_returns = self.dns_exceptions_and_succeed()
dns_controller.update._mock_side_effect = (dns_responder for
dns_responder in
@ -485,7 +472,54 @@ class TestFlowRuns(base.TestCase):
engines.run(update_service.update_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_create_flow_dns_exception_with_retry(self, mock_creds):
def test_update_flow_dns_exception_with_retry_and_fail(self, mock_creds):
    """Update flow where every DNS retry raises an exception.

    Feeds the mocked dns_controller.update one exception responder per
    allowed retry (see dns_exceptions_only) and runs the update flow end
    to end, so the flow exhausts its DNS retry budget.
    """
    service_id = str(uuid.uuid4())
    domains_old = domain.Domain(domain='cdn.poppy.org')
    domains_new = domain.Domain(domain='mycdn.poppy.org')
    current_origin = origin.Origin(origin='poppy.org')
    # Old and new service objects differ only in their domain, which is
    # what makes the update flow touch DNS.
    service_old = service.Service(service_id=service_id,
                                  name='poppy cdn service',
                                  domains=[domains_old],
                                  origins=[current_origin],
                                  flavor_id='cdn')
    service_new = service.Service(service_id=service_id,
                                  name='poppy cdn service',
                                  domains=[domains_new],
                                  origins=[current_origin],
                                  flavor_id='cdn')
    kwargs = {
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(service_id),
        # time_factor shrinks the per-retry sleeps so the test is fast
        'time_seconds': [i * self.time_factor for
                         i in range(self.total_retries)],
        'service_old': json.dumps(service_old.to_dict()),
        'service_obj': json.dumps(service_new.to_dict())
    }

    service_controller, storage_controller, dns_controller = \
        self.all_controllers()

    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_update_flow(service_controller, storage_controller,
                               dns_controller)
        dns_controller.update = mock.Mock()
        # Only failing responders are queued, so the flow never succeeds.
        dns_responder_returns = self.dns_exceptions_only()

        dns_controller.update._mock_side_effect = (dns_responder for
                                                   dns_responder in
                                                   dns_responder_returns)

        engines.run(update_service.update_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_create_flow_dns_exception_with_retry_and_succeed(self,
mock_creds):
providers = ['cdn_provider']
kwargs = {
'providers_list_json': json.dumps(providers),
@ -508,7 +542,7 @@ class TestFlowRuns(base.TestCase):
dns_controller)
dns_controller.create = mock.Mock()
dns_responder_returns = self.dns_exceptions()
dns_responder_returns = self.dns_exceptions_and_succeed()
dns_controller.create._mock_side_effect = (dns_responder for
dns_responder in
@ -517,7 +551,40 @@ class TestFlowRuns(base.TestCase):
engines.run(create_service.create_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_delete_flow_dns_exception_with_retry(self, mock_creds):
def test_create_flow_dns_exception_with_retry_and_fail(self, mock_creds):
    """Create flow where every DNS retry raises an exception.

    Feeds the mocked dns_controller.create one exception responder per
    allowed retry (see dns_exceptions_only) and runs the create flow end
    to end, so the flow exhausts its DNS retry budget.
    """
    providers = ['cdn_provider']
    kwargs = {
        'providers_list_json': json.dumps(providers),
        'project_id': json.dumps(str(uuid.uuid4())),
        'auth_token': json.dumps(str(uuid.uuid4())),
        'service_id': json.dumps(str(uuid.uuid4())),
        # time_factor shrinks the per-retry sleeps so the test is fast
        'time_seconds': [i * self.time_factor for
                         i in range(self.total_retries)]
    }

    service_controller, storage_controller, dns_controller = \
        self.all_controllers()

    with MonkeyPatchControllers(service_controller,
                                dns_controller,
                                storage_controller,
                                memoized_controllers.task_controllers):
        self.patch_create_flow(service_controller, storage_controller,
                               dns_controller)
        dns_controller.create = mock.Mock()
        # Only failing responders are queued, so the flow never succeeds.
        dns_responder_returns = self.dns_exceptions_only()

        dns_controller.create._mock_side_effect = (dns_responder for
                                                   dns_responder in
                                                   dns_responder_returns)

        engines.run(create_service.create_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_delete_flow_dns_exception_with_retry_and_succeed(self,
mock_creds):
service_id = str(uuid.uuid4())
domains_old = domain.Domain(domain='cdn.poppy.org')
current_origin = origin.Origin(origin='poppy.org')
@ -548,7 +615,46 @@ class TestFlowRuns(base.TestCase):
self.patch_delete_flow(service_controller, storage_controller,
dns_controller)
dns_controller.delete = mock.Mock()
dns_responder_returns = self.dns_exceptions()
dns_responder_returns = self.dns_exceptions_and_succeed()
dns_controller.delete._mock_side_effect = (dns_responder for
dns_responder in
dns_responder_returns)
engines.run(delete_service.delete_service(), store=kwargs)
@mock.patch('pyrax.set_credentials')
def test_delete_flow_dns_exception_with_retry_and_fail(self, mock_creds):
service_id = str(uuid.uuid4())
domains_old = domain.Domain(domain='cdn.poppy.org')
current_origin = origin.Origin(origin='poppy.org')
service_obj = service.Service(service_id=service_id,
name='poppy cdn service',
domains=[domains_old],
origins=[current_origin],
flavor_id='cdn')
kwargs = {
'project_id': json.dumps(str(uuid.uuid4())),
'service_id': json.dumps(service_id),
'time_seconds': [i * self.time_factor for
i in range(self.total_retries)],
'provider_details': json.dumps(
dict([(k, v.to_dict())
for k, v in service_obj.provider_details.items()]))
}
service_controller, storage_controller, dns_controller = \
self.all_controllers()
with MonkeyPatchControllers(service_controller,
dns_controller,
storage_controller,
memoized_controllers.task_controllers):
self.patch_delete_flow(service_controller, storage_controller,
dns_controller)
dns_controller.delete = mock.Mock()
dns_responder_returns = self.dns_exceptions_only()
dns_controller.delete._mock_side_effect = (dns_responder for
dns_responder in

View File

@ -110,6 +110,36 @@ class DefaultManagerServiceTests(base.TestCase):
# create mocked config and driver
conf = cfg.ConfigOpts()
_DRIVER_DNS_OPTIONS = [
cfg.IntOpt(
'retries',
default=5,
help='Total number of Retries after '
'Exponentially Backing Off'),
cfg.IntOpt(
'min_backoff_range',
default=20,
help='Minimum Number of seconds to sleep between retries'),
cfg.IntOpt(
'max_backoff_range',
default=30,
help='Maximum Number of seconds to sleep between retries'),
]
_PROVIDER_OPTIONS = [
cfg.IntOpt(
'default_cache_ttl',
default=86400,
help='Default ttl to be set, when no caching '
'rules are specified'),
]
_DRIVER_DNS_GROUP = 'driver:dns'
_PROVIDER_GROUP = 'drivers:provider'
conf.register_opts(_PROVIDER_OPTIONS, group=_PROVIDER_GROUP)
conf.register_opts(_DRIVER_DNS_OPTIONS, group=_DRIVER_DNS_GROUP)
self.bootstrap_obj = mock_bootstrap(conf)
# mock a stevedore provider extension
@ -208,7 +238,8 @@ class DefaultManagerServiceTests(base.TestCase):
in self.provider_details.items()]))
responders = delete_provider.execute(provider_details)
delete_dns = delete_service_tasks.DeleteServiceDNSMappingTask()
dns_responders = delete_dns.execute(provider_details, 0)
dns_responders = delete_dns.execute(provider_details, 0, responders,
self.project_id, self.service_id)
gather_provider = delete_service_tasks.GatherProviderDetailsTask()
changed_provider_dict = gather_provider.execute(responders,
@ -232,7 +263,8 @@ class DefaultManagerServiceTests(base.TestCase):
self.project_id,
self.service_id)
create_dns = create_service_tasks.CreateServiceDNSMappingTask()
dns_responder = create_dns.execute(responders, 0)
dns_responder = create_dns.execute(responders, 0, self.project_id,
self.service_id)
gather_provider = create_service_tasks.GatherProviderDetailsTask()
log_responder = \
create_service_tasks.CreateLogDeliveryContainerTask()
@ -264,7 +296,9 @@ class DefaultManagerServiceTests(base.TestCase):
update_dns = update_service_tasks.UpdateServiceDNSMappingTask()
dns_responder = update_dns.execute(responders, 0, service_old,
service_updates_json)
service_updates_json,
self.project_id,
self.service_id)
log_delivery_update = \
update_service_tasks.UpdateLogDeliveryContainerTask()