Enables creating container for log_delivery

This patch creates a Swift container for log delivery when log_delivery
is enabled.

Change-Id: Ifbabec501a1f8578d5ed5e97feda8e144f93f468
This commit is contained in:
Sriram Madapusi Vasudevan
2015-04-08 18:49:27 -04:00
parent d10bb01b85
commit add80eb0c9
13 changed files with 387 additions and 19 deletions

View File

@@ -140,3 +140,10 @@ akamai_http_config_number = 'MY_AKAMAI_HTTP_CONFIG_NUMBER'
akamai_https_config_number = 'MY_AKAMAI_HTTPS_CONFIG_NUMBER'
san_cert_cnames = "MY_SAN_CERT_LIST"
san_cert_hostname_limit = "MY_SAN_HOSTNAMES_LIMIT"
[log_delivery]
identity_url = OPENSTACK_IDENTITY_URL
preferred_dcs = DC1,DC2
container_name = .CDN_ACCESS_LOGS

View File

@@ -39,8 +39,9 @@ def create_service():
provides='retry_sleep_time')).add(
create_service_tasks.CreateServiceDNSMappingTask(
rebind=['responders'])),
create_service_tasks.CreateLogDeliveryContainerTask(),
create_service_tasks.GatherProviderDetailsTask(
rebind=['responders', 'dns_responder']),
rebind=['responders', 'dns_responder', 'log_responders']),
common.UpdateProviderDetailTask(rebind=['provider_details_dict'])
)
return flow

View File

@@ -39,8 +39,9 @@ def update_service():
provides='retry_sleep_time')).add(
update_service_tasks.UpdateServiceDNSMappingTask(
rebind=['responders'])),
update_service_tasks.UpdateLogDeliveryContainerTask(),
update_service_tasks.GatherProviderDetailsTask(
rebind=['responders', 'dns_responder']),
rebind=['responders', 'dns_responder', 'log_responders']),
update_service_tasks.UpdateProviderDetailsTask_Errors(
rebind=['provider_details_dict_errors_tuple'])
)

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import json
import requests
from oslo.config import cfg
from taskflow import task
@@ -31,6 +32,86 @@ LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
LOG_DELIVERY_OPTIONS = [
cfg.StrOpt('identity_url', default='',
help='OpenStack Identity URL'),
cfg.StrOpt('container_name', default='.CDN_ACCESS_LOGS',
help='Swift container to put logs'),
cfg.ListOpt('preferred_dcs', default=['DC1', 'DC2'],
help='Preferred DCs to create container'),
]
LOG_DELIVERY_GROUP = 'log_delivery'
def create_log_delivery_container(project_id, auth_token):
    """Create a Swift container for log delivery for the given tenant.

    Authenticates against the configured identity endpoint using the
    tenant's token, locates an object-store endpoint in one of the
    preferred DCs, and creates the log container there.

    :param project_id: tenant id used as the keystone tenantName
    :param auth_token: token scoped to the tenant
    :returns: list ``[public_container_url, internal_container_url]`` on
        success, or an empty list when authentication, endpoint lookup,
        or container creation fails
    """
    # log delivery enabled, create log delivery container for the user
    conf.register_opts(LOG_DELIVERY_OPTIONS, group=LOG_DELIVERY_GROUP)
    identity_url = conf['log_delivery']['identity_url']
    container_name = conf['log_delivery']['container_name']
    preferred_dcs = conf['log_delivery']['preferred_dcs']

    payload = {
        "auth": {
            "tenantName": project_id,
            "token": {
                "id": auth_token
            }
        }
    }
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(identity_url,
                                 data=json.dumps(payload),
                                 headers=headers)
        # ValueError covers a non-JSON (e.g. HTML error page) body from
        # keystone; KeyError covers a JSON error document that lacks the
        # service catalog.
        catalog = response.json()
        services = catalog['access']['serviceCatalog']
    except (KeyError, ValueError):
        LOG.info("Could not authenticate "
                 "with keystone : {0}".format(response.text))
        LOG.info("Skipping container {0} creation".format(container_name))
        return []

    swifturl_public = None
    swifturl_internal = None
    for service in services:
        if service['type'] == 'object-store':
            endpoints = service['endpoints']
            for endpoint in endpoints:
                if endpoint['region'] in preferred_dcs:
                    # TODO(obulpathi): Add both public and private urls.
                    # Only internal urls does not work because, not all
                    # containers are accessible from all DC's using
                    # internal urls
                    swifturl_public = endpoint['publicURL']
                    swifturl_internal = endpoint['internalURL']
                    break

    if swifturl_public and swifturl_internal:
        public_container_url = '{0}/{1}'.format(swifturl_public,
                                                container_name)
        internal_container_url = '{0}/{1}'.format(swifturl_internal,
                                                  container_name)

        headers = {'Content-Type': 'application/json',
                   'X-Auth-Token': auth_token}
        LOG.info('Starting to '
                 'create container {0}'.format(public_container_url))
        response = requests.put(public_container_url,
                                None,
                                headers=headers)
        if response.ok:
            LOG.info('Created container {0}'.format(public_container_url))
            log_responders = [public_container_url, internal_container_url]
            return log_responders
        else:
            LOG.info('Error creating '
                     'container {0}'.format(public_container_url))
            return []
    else:
        # no object-store endpoint found in a preferred DC
        return []
class UpdateProviderDetailTask(task.Task):
@@ -44,6 +125,14 @@ class UpdateProviderDetailTask(task.Task):
storage_controller = service_controller.storage_controller
service_obj = storage_controller.get(project_id, service_id)
service_obj.provider_details = provider_details_dict
enabled = lambda provider: any([True if 'log_delivery'
in access_url else False
for access_url
in provider.access_urls])
if not all(map(enabled, provider_details_dict.values())):
service_obj.log_delivery.enabled = False
storage_controller.update(project_id, service_id, service_obj)
storage_controller._driver.close_connection()

View File

@@ -20,6 +20,7 @@ from oslo.config import cfg
from taskflow import task
from poppy import bootstrap
from poppy.distributed_task.taskflow.task import common
from poppy.model.helpers import provider_details
from poppy.openstack.common import log
@@ -84,10 +85,37 @@ class CreateServiceDNSMappingTask(task.Task):
time.sleep(retry_sleep_time)
class CreateLogDeliveryContainerTask(task.Task):
    """Create the Swift log-delivery container during service creation."""

    default_provides = "log_responders"

    def execute(self, project_id, auth_token, service_id):
        """Return log container URLs, or [] when log delivery is off.

        Raises Exception when the service does not exist in storage.
        """
        controllers = bootstrap.Bootstrap(conf).manager.services_controller
        storage = controllers.storage_controller
        try:
            service_obj = storage.get(project_id, service_id)
            storage._driver.close_connection()
        except ValueError:
            msg = ('Creating service {0} from Poppy failed. '
                   'No such service exists'.format(service_id))
            LOG.info(msg)
            raise Exception(msg)

        if service_obj.log_delivery.enabled:
            # log delivery enabled, create log delivery container for the user
            return common.create_log_delivery_container(project_id,
                                                        auth_token)
        # if log delivery is not enabled, return
        return []
class GatherProviderDetailsTask(task.Task):
default_provides = "provider_details_dict"
def execute(self, responders, dns_responder):
def execute(self, responders, dns_responder, log_responders):
provider_details_dict = {}
for responder in responders:
for provider_name in responder:
@@ -111,6 +139,8 @@ class GatherProviderDetailsTask(task.Task):
error_message=error_msg))
else:
access_urls = dns_responder[provider_name]['access_urls']
if log_responders:
access_urls.append({'log_delivery': log_responders})
provider_details_dict[provider_name] = (
provider_details.ProviderDetail(
provider_service_id=responder[provider_name]['id'],

View File

@@ -20,6 +20,7 @@ from oslo.config import cfg
from taskflow import task
from poppy import bootstrap
from poppy.distributed_task.taskflow.task import common
from poppy.model.helpers import provider_details
from poppy.openstack.common import log
from poppy.transport.pecan.models.request import service
@@ -92,10 +93,29 @@ class UpdateServiceDNSMappingTask(task.Task):
time.sleep(retry_sleep_time)
class UpdateLogDeliveryContainerTask(task.Task):
    """Create the Swift log-delivery container when a PATCH enables it."""

    default_provides = "log_responders"

    def execute(self, project_id, auth_token, service_old, service_obj):
        """Return log container URLs, or [] when no container is needed.

        :param project_id: tenant id
        :param auth_token: token scoped to the tenant
        :param service_old: JSON string of the service before the update
        :param service_obj: JSON string of the service after the update
        """
        service_old_json = json.loads(service_old)
        service_obj_json = json.loads(service_obj)
        # check if log delivery is enabled in this PATCH
        # NOTE: return [] (not None) so "log_responders" always carries a
        # list, matching CreateLogDeliveryContainerTask's contract.
        if service_old_json['log_delivery']['enabled']:
            # already enabled before this PATCH; container already exists
            return []
        if not service_obj_json['log_delivery']['enabled']:
            # still disabled after this PATCH; nothing to create
            return []
        log_responders = common.create_log_delivery_container(
            project_id, auth_token)
        return log_responders
class GatherProviderDetailsTask(task.Task):
default_provides = "provider_details_dict_errors_tuple"
def execute(self, responders, dns_responder, project_id,
def execute(self, responders, dns_responder, log_responders, project_id,
service_id, service_obj):
bootstrap_obj = bootstrap.Bootstrap(conf)
@@ -127,6 +147,12 @@ class GatherProviderDetailsTask(task.Task):
error_message=error_msg))
else:
access_urls = dns_responder[provider_name]['access_urls']
if log_responders:
access_urls.append({'log_delivery': log_responders})
provider_details_dict[provider_name] = (
provider_details.ProviderDetail(
provider_service_id=responder[provider_name]['id'],
access_urls=access_urls))
provider_details_dict[provider_name] = (
provider_details.ProviderDetail(
provider_service_id=responder[provider_name]['id'],

View File

@@ -403,7 +403,8 @@ class ServicesController(base.ServicesBase):
access_urls = provider_detail.access_urls
old_access_urls_map[provider_name] = {'access_urls': access_urls}
for access_url in access_urls:
old_domains.add(access_url['domain'])
if 'domain' in access_url:
old_domains.add(access_url['domain'])
# if there is a provider error, don't try dns update
for responder in responders:

View File

@@ -52,7 +52,7 @@ class ServicesControllerBase(controller.ManagerControllerBase):
raise NotImplementedError
@abc.abstractmethod
def create(self, project_id, service_obj):
def create(self, project_id, auth_token, service_obj):
"""create
:param project_id

View File

@@ -135,7 +135,7 @@ class DefaultServicesController(base.ServicesController):
"""
return self.storage_controller.get(project_id, service_id)
def create(self, project_id, service_json):
def create(self, project_id, auth_token, service_json):
"""create.
:param project_id
@@ -179,6 +179,7 @@ class DefaultServicesController(base.ServicesController):
kwargs = {
'providers_list_json': json.dumps(providers),
'project_id': project_id,
'auth_token': auth_token,
'service_id': service_id,
'time_seconds': self.determine_sleep_times()
}
@@ -188,7 +189,7 @@ class DefaultServicesController(base.ServicesController):
return service_obj
def update(self, project_id, service_id, service_updates):
def update(self, project_id, service_id, auth_token, service_updates):
"""update.
:param project_id
@@ -271,6 +272,7 @@ class DefaultServicesController(base.ServicesController):
kwargs = {
'project_id': project_id,
'service_id': service_id,
'auth_token': auth_token,
'service_old': json.dumps(service_old.to_dict()),
'service_obj': json.dumps(service_new.to_dict()),
'time_seconds': self.determine_sleep_times()

View File

@@ -162,8 +162,9 @@ class ServicesController(base.Controller, hooks.HookController):
service_json_dict = json.loads(pecan.request.body.decode('utf-8'))
service_id = None
try:
service_obj = services_controller.create(
self.project_id, service_json_dict)
service_obj = services_controller.create(self.project_id,
self.auth_token,
service_json_dict)
service_id = service_obj.service_id
except LookupError as e: # error handler for no flavor
pecan.abort(400, detail=str(e))
@@ -211,7 +212,7 @@ class ServicesController(base.Controller, hooks.HookController):
try:
services_controller.update(
self.project_id, service_id, service_updates)
self.project_id, service_id, self.auth_token, service_updates)
except exceptions.ValidationFailed as e:
pecan.abort(400, detail=str(e))
except LookupError as e: # error handler for no flavor

View File

@@ -63,3 +63,6 @@ class ContextHook(hooks.PecanHook):
"tenant", None)
state.controller.__self__.base_url = getattr(local.store.context,
"base_url", None)
'''Attach auth_token as a member variable to the controller.'''
state.controller.__self__.auth_token = getattr(local.store.context,
"auth_token", None)

View File

@@ -68,9 +68,14 @@ class Model(collections.OrderedDict):
# add the access urls
access_urls = provider_detail.access_urls
for access_url in access_urls:
self["links"].append(link.Model(
access_url['operator_url'],
'access_url'))
if 'operator_url' in access_url:
self['links'].append(link.Model(
access_url['operator_url'],
'access_url'))
elif 'log_delivery' in access_url:
self['links'].append(link.Model(
access_url['log_delivery'],
'log_delivery'))
# add any errors
error_message = provider_detail.error_message

View File

@@ -14,16 +14,19 @@
# limitations under the License.
import json
import random
import uuid
import ddt
import mock
from oslo.config import cfg
import requests
from poppy.distributed_task.taskflow.task import common
from poppy.distributed_task.taskflow.task import create_service_tasks
from poppy.distributed_task.taskflow.task import delete_service_tasks
from poppy.distributed_task.taskflow.task import purge_service_tasks
from poppy.distributed_task.taskflow.task import update_service_tasks
from poppy.manager.default import driver
@@ -34,6 +37,20 @@ from poppy.transport.pecan.models.request import service
from tests.unit import base
class Response(object):
    """Minimal stand-in for a ``requests`` response object in tests.

    Exposes only the ``ok`` property and ``json()`` method that the
    code under test consumes.
    """

    def __init__(self, resp_status, resp_json=None):
        # Stored verbatim; ``ok`` and ``json()`` simply echo them back.
        self.resp_status, self.resp_json = resp_status, resp_json

    @property
    def ok(self):
        """The status flag supplied by the test (truthy on success)."""
        return self.resp_status

    def json(self):
        """Return the canned JSON payload (``None`` when not supplied)."""
        return self.resp_json
@ddt.ddt
class DefaultManagerServiceTests(base.TestCase):
@@ -75,6 +92,7 @@ class DefaultManagerServiceTests(base.TestCase):
self.project_id = str(uuid.uuid4())
self.service_name = str(uuid.uuid4())
self.service_id = str(uuid.uuid4())
self.auth_token = str(uuid.uuid4())
self.service_json = {
"name": self.service_name,
"domains": [
@@ -105,7 +123,10 @@ class DefaultManagerServiceTests(base.TestCase):
]
}
],
"flavor_id": "standard"
"flavor_id": "standard",
"log_delivery": {
"enabled": False
},
}
self.service_obj = service.load_from_json(self.service_json)
@@ -162,14 +183,65 @@ class DefaultManagerServiceTests(base.TestCase):
create_dns = create_service_tasks.CreateServiceDNSMappingTask()
dns_responder = create_dns.execute(responders, 0)
gather_provider = create_service_tasks.GatherProviderDetailsTask()
log_responder = \
create_service_tasks.CreateLogDeliveryContainerTask()
provider_details_dict = \
gather_provider.execute(responders, dns_responder)
gather_provider.execute(responders,
dns_responder,
log_responder)
update_provider_details = common.UpdateProviderDetailTask()
update_provider_details.execute(provider_details_dict,
self.project_id, self.service_id)
bootstrap_mock_create()
def mock_update_service(self, provider_details_json):
@mock.patch('poppy.bootstrap.Bootstrap')
def bootstrap_mock_update(mock_bootstrap):
mock_bootstrap.return_value = self.bootstrap_obj
service_old = json.dumps(self.service_json)
update_provider = update_service_tasks.UpdateProviderServicesTask()
service_updates = self.service_json.copy()
service_updates['log_delivery'] = {
'enabled': True
}
service_updates_json = json.dumps(service_updates)
responders = update_provider.execute(
service_old,
service_updates_json
)
update_dns = update_service_tasks.UpdateServiceDNSMappingTask()
dns_responder = update_dns.execute(responders, 0, service_old,
service_updates_json)
log_delivery_update = \
update_service_tasks.UpdateLogDeliveryContainerTask()
log_responder = log_delivery_update.execute(self.project_id,
self.auth_token,
service_old,
service_updates_json)
gather_provider = update_service_tasks.GatherProviderDetailsTask()
provider_details_dict = \
gather_provider.execute(responders,
dns_responder,
log_responder,
self.project_id,
self.service_id,
service_old)
update_provider_details = \
update_service_tasks.UpdateProviderDetailsTask_Errors()
update_provider_details.execute(provider_details_dict,
self.project_id,
self.service_id,
service_old,
service_updates_json)
bootstrap_mock_update()
def test_create(self):
# fake one return value
self.sc.flavor_controller.get.return_value = flavor.Flavor(
@@ -204,7 +276,7 @@ class DefaultManagerServiceTests(base.TestCase):
elif name == "fastly":
return_mock = mock.Mock(
return_value={
'Fastly': {'error': "fail to create servcice",
'Fastly': {'error': "fail to create service",
'error_detail': 'Fastly Create failed'
' because of XYZ'}})
@@ -233,7 +305,9 @@ class DefaultManagerServiceTests(base.TestCase):
providers.__getitem__.side_effect = get_provider_extension_by_name
service_obj = self.sc.create(self.project_id, self.service_json)
service_obj = self.sc.create(self.project_id,
self.auth_token,
self.service_json)
# ensure the manager calls the storage driver with the appropriate data
self.sc.storage_controller.create.assert_called_once_with(
@@ -311,6 +385,131 @@ class DefaultManagerServiceTests(base.TestCase):
self.mock_create_service(provider_details_json)
@ddt.file_data('data_provider_details.json')
def test_update_service_worker_success_and_failure(self,
provider_details_json):
self.provider_details = {}
for provider_name in provider_details_json:
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get('id', None)
access_urls = provider_detail_dict.get('access_urls', None)
status = provider_detail_dict.get('status', u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
providers = self.sc._driver.providers
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return_mock = {
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [{'href': 'www.mysite.com',
'rel': 'access_url'}],
'status': 'deploy_in_progress'
}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == 'fastly':
return_mock = {
'Fastly': {'error': "fail to create servcice",
'error_detail': 'Fastly Create failed'
' because of XYZ'}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=service_controller)
)
else:
return_mock = {
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{'href': 'www.mysite.com',
'rel': 'access_url'}]
}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
providers.__getitem__.side_effect = get_provider_extension_by_name
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
LOG_DELIVERY_OPTIONS = [
cfg.ListOpt('preferred_dcs', default=['DC1', 'DC2'],
help='Preferred DCs to create container'),
]
LOG_DELIVERY_GROUP = 'log_delivery'
conf.register_opts(LOG_DELIVERY_OPTIONS, group=LOG_DELIVERY_GROUP)
region = random.choice(conf['log_delivery']['preferred_dcs'])
catalog_json = {
'access': {
'serviceCatalog': [
{
'type': 'object-store',
'endpoints': [
{
'region': region,
'publicURL': 'public',
'internalURL': 'private'
}
]
}
]
}
}
# NOTE(TheSriram): Successful update
with mock.patch.object(requests,
'post',
return_value=Response(True, catalog_json)):
with mock.patch.object(requests,
'put',
return_value=Response(True)):
self.mock_update_service(provider_details_json)
# NOTE(TheSriram): Unsuccessful update due to keystone
with mock.patch.object(requests,
'post',
return_value=Response(False, catalog_json)):
with mock.patch.object(requests,
'put',
return_value=Response(True)):
self.mock_update_service(provider_details_json)
# NOTE(TheSriram): Unsuccessful update due to swift
with mock.patch.object(requests,
'post',
return_value=Response(True, catalog_json)):
with mock.patch.object(requests,
'put',
return_value=Response(False)):
self.mock_update_service(provider_details_json)
@ddt.file_data('service_update.json')
def test_update(self, update_json):
provider_details_dict = {
@@ -347,7 +546,10 @@ class DefaultManagerServiceTests(base.TestCase):
}
])
self.sc.update(self.project_id, self.service_id, service_updates)
self.sc.update(self.project_id,
self.service_id,
self.auth_token,
service_updates)
# ensure the manager calls the storage driver with the appropriate data
self.sc.storage_controller.update.assert_called_once()