Implemented TaskFlow distributed task driver to replace subprocess workers

Change-Id: Ic64ea2a1f3cb0b8e0b5fa952c39ba95884958e17
This commit is contained in:
tonytan4ever 2015-01-23 16:24:27 -05:00 committed by Sriram Madapusi Vasudevan
parent 938dc1f17d
commit 32fc357d48
35 changed files with 973 additions and 291 deletions

View File

@ -100,12 +100,23 @@ installed and running.
$ pip install -e .
8. Install and start the ZooKeeper server::
8. Start the Poppy server::
http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
or, more easily, run a ZooKeeper Docker container:
https://registry.hub.docker.com/u/jplock/zookeeper/
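A typical invocation of that image (illustrative, assuming ZooKeeper's default client port)::
$ docker run -d -p 2181:2181 jplock/zookeeper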
9. Start the Poppy TaskFlow worker::
$ poppy-worker
10. Start the Poppy server::
$ poppy-server
9. Test out that Poppy is working by requesting the home doc (with a sample project ID)::
11. Test out that Poppy is working by requesting the home doc (with a sample project ID)::
$ curl -i -X GET http://0.0.0.0:8888/v1.0/123

View File

@ -46,6 +46,9 @@ providers = mock,fastly,akamai
# DNS driver module (e.g. default, designate, rackspace)
dns = default
# distributed_task driver module (e.g. TaskFlow, Celery, OsloMessaging)
distributed_task = taskflow
[drivers:transport:limits]
max_services_per_page = 20
@ -77,6 +80,14 @@ replication_strategy = class:SimpleStrategy, replication_factor:1
[drivers:storage:mockdb]
database = poppy
[drivers:distributed_task:taskflow]
jobboard_backend_type = zookeeper
persistent_backend_type = zookeeper
jobboard_backend_host = <your_transport_server(s)>
jobboard_backend_port = <your_transport_port>
persistent_backend_host = <your_transport_server(s)>
persistent_backend_port = <your_transport_port>
[drivers:dns:rackspace]
username = "<operator_username>"
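For reference, here is the new [drivers:distributed_task:taskflow] section filled in for a single local ZooKeeper node (an illustrative example only; the placeholders above stay generic on purpose):
[drivers:distributed_task:taskflow]
jobboard_backend_type = zookeeper
persistent_backend_type = zookeeper
jobboard_backend_host = localhost
jobboard_backend_port = 2181
persistent_backend_host = localhost
persistent_backend_port = 2181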

View File

@ -53,6 +53,8 @@ _DRIVER_OPTIONS = [
help='Provider driver(s) to use'),
cfg.StrOpt('dns', default='default',
help='DNS driver to use'),
cfg.StrOpt('distributed_task', default='taskflow',
help='distributed_task driver to use'),
]
_DRIVER_GROUP = 'drivers'
@ -153,7 +155,8 @@ class Bootstrap(object):
manager_type = 'poppy.manager'
manager_name = self.driver_conf.manager
args = [self.conf, self.storage, self.provider, self.dns]
args = [self.conf, self.storage, self.provider, self.dns,
self.distributed_task]
try:
mgr = driver.DriverManager(namespace=manager_type,
@ -189,5 +192,31 @@ class Bootstrap(object):
except RuntimeError as exc:
LOG.exception(exc)
@decorators.lazy_property(write=False)
def distributed_task(self):
"""distributed task driver.
:returns distributed task driver
"""
LOG.debug("loading distributed task")
# create the driver manager to load the appropriate drivers
distributed_task_type = 'poppy.distributed_task'
distributed_task_name = self.driver_conf.distributed_task
args = [self.conf]
LOG.debug((u'Loading distributed_task driver: %s'),
distributed_task_name)
try:
mgr = driver.DriverManager(namespace=distributed_task_type,
name=distributed_task_name,
invoke_on_load=True,
invoke_args=args)
return mgr.driver
except RuntimeError as exc:
LOG.exception(exc)
def run(self):
self.transport.listen()
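For callers, the new lazy property behaves like the existing storage, provider, and dns properties. A usage sketch (assumes a poppy config file is present at the default location):
from oslo.config import cfg
from poppy import bootstrap

conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
b = bootstrap.Bootstrap(conf)
# stevedore resolves the name set by the [drivers] distributed_task option
print(b.distributed_task.vendor_name)  # 'TaskFlow'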

View File

@ -13,30 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import sys
try:
import uwsgi
use_uwsgi = True
except ImportError:
use_uwsgi = False
from oslo.config import cfg
from poppy import bootstrap
from poppy.openstack.common import log
LOG = log.getLogger(__name__)
if use_uwsgi:
executable = os.path.join(uwsgi.opt['virtualenv'], 'bin', 'python')
else:
executable = sys.executable
def run():
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def main(*args):
cmd_list = [executable] + list(args[1:])
LOG.info("Starting subprocess %s")
subprocess.Popen(cmd_list, stdout=sys.stdout, env=os.environ.copy())
sys.exit()
if __name__ == '__main__':
main(*sys.argv)
b = bootstrap.Bootstrap(conf)
b.distributed_task.services_controller.run_task_worker()

View File

@ -0,0 +1,21 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.distributed_task.base import driver
from poppy.distributed_task.base import services
Driver = driver.DistributedTaskDriverBase
ServicesController = services.ServicesControllerBase

View File

@ -0,0 +1,31 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class DistributedTaskControllerBase(object):
"""Top-level class for controllers.
:param driver: Instance of the driver
instantiating this controller.
"""
def __init__(self, driver):
self._driver = driver

View File

@ -0,0 +1,37 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class DistributedTaskDriverBase(object):
"""Interface definition for distributed task queue driver.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
"""
def __init__(self, conf):
self._conf = conf
@property
def conf(self):
"""conf
:returns conf
"""
return self._conf

View File

@ -0,0 +1,36 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from poppy.distributed_task.base import controller
@six.add_metaclass(abc.ABCMeta)
class ServicesControllerBase(controller.DistributedTaskControllerBase):
"""Services Controller Base class."""
def __init__(self, driver):
super(ServicesControllerBase, self).__init__(driver)
def submit_task(self):
"""submit a task .
:raises NotImplementedError
"""
raise NotImplementedError
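Concrete controllers override submit_task(). A hypothetical minimal subclass that runs flows inline instead of posting them to a jobboard (illustration only, not part of this commit):
from taskflow import engines

from poppy.distributed_task.base import services


class InlineServicesController(services.ServicesControllerBase):
    """Hypothetical controller: run each flow synchronously."""

    def submit_task(self, flow_factory, **kwargs):
        # kwargs become the engine store that feeds each task's execute()
        engines.run(flow_factory(), store=kwargs)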

View File

@ -0,0 +1,21 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TaskFlow distributed task driver for CDN"""
from poppy.distributed_task.taskflow import driver
# Hoist classes into package namespace
Driver = driver.TaskFlowDistributedTaskDriver

View File

@ -0,0 +1,28 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exports TaskFlow distributed task controllers.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name. Fields mapping will be
updated and documented in each controller class.
"""
from poppy.distributed_task.taskflow import services
ServicesController = services.ServicesController

View File

@ -0,0 +1,102 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from oslo.config import cfg
from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends
from poppy.distributed_task import base
from poppy.distributed_task.taskflow import controllers
LOG = logging.getLogger(__name__)
TASKFLOW_OPTIONS = [
cfg.StrOpt('jobboard_backend_type', default='zookeeper',
help='Default jobboard backend type'),
cfg.StrOpt('persistent_backend_type', default='zookeeper',
help='Default jobboard persistent backend type'),
cfg.ListOpt('jobboard_backend_host', default=['localhost'],
help='default jobboard backend server host'),
cfg.IntOpt('jobboard_backend_port', default=2181, help='default'
' jobboard backend server port (e.g. amqp)'),
cfg.ListOpt('persistent_backend_host', default=['localhost'],
help='default persistent backend server host'),
cfg.IntOpt('persistent_backend_port', default=2181, help='default'
' persistent backend server port (e.g. amqp)'),
]
TASKFLOW_GROUP = 'drivers:distributed_task:taskflow'
class TaskFlowDistributedTaskDriver(base.Driver):
"""TaskFlow distributed task Driver."""
def __init__(self, conf):
super(TaskFlowDistributedTaskDriver, self).__init__(conf)
conf.register_opts(TASKFLOW_OPTIONS, group=TASKFLOW_GROUP)
self.distributed_task_conf = conf[TASKFLOW_GROUP]
job_backends_hosts = ','.join(['%s:%s' % (
host, self.distributed_task_conf.jobboard_backend_port)
for host in
self.distributed_task_conf.jobboard_backend_host])
self.jobboard_backend_conf = {
# This topic could become more complicated
"board": self.distributed_task_conf.jobboard_backend_type,
"hosts": job_backends_hosts,
"path": "/taskflow/jobs/poppy_service_jobs",
}
persistence_backends_hosts = ','.join(['%s:%s' % (
host, self.distributed_task_conf.persistent_backend_port)
for host in
self.distributed_task_conf.persistent_backend_host])
self.persistence_backend_conf = {
# This topic could become more complicated
"connection": self.distributed_task_conf.persistent_backend_type,
"hosts": persistence_backends_hosts,
}
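# With the defaults above (localhost:2181) these two dicts become:
#   jobboard_backend_conf    -> {'board': 'zookeeper',
#                                'hosts': 'localhost:2181',
#                                'path': '/taskflow/jobs/poppy_service_jobs'}
#   persistence_backend_conf -> {'connection': 'zookeeper',
#                                'hosts': 'localhost:2181'}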
def is_alive(self):
"""Health check for TaskFlow worker."""
return True
def persistence(self):
return persistence_backends.backend(
self.persistence_backend_conf.copy())
def job_board(self, conf, persistence, **kwargs):
return job_backends.backend(
'poppy_service_jobs',
conf.copy(), persistence=persistence)
@property
def vendor_name(self):
"""storage name.
:returns 'TaskFlow'
"""
return 'TaskFlow'
@property
def services_controller(self):
"""services_controller.
:returns service controller
"""
return controllers.ServicesController(self)

View File

@ -13,26 +13,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import os
import sys
from oslo.config import cfg
from taskflow.patterns import linear_flow
from taskflow import task
from poppy import bootstrap
from poppy.model.helpers import provider_details
from poppy.openstack.common import log
LOG = log.getLogger(__file__)
logging.basicConfig(level=logging.ERROR,
format='%(levelname)s: %(message)s',
stream=sys.stdout)
LOG = logging.getLogger('Poppy Service Tasks')
LOG.setLevel(logging.DEBUG)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def service_create_worker(providers_list_json, project_id, service_id):
LOG.setLevel(logging.INFO)
def service_create_task_func(providers_list_json,
project_id, service_id):
bootstrap_obj = bootstrap.Bootstrap(conf)
service_controller = bootstrap_obj.manager.services_controller
storage_controller = service_controller.storage_controller
@ -41,16 +48,17 @@ def service_create_worker(providers_list_json, project_id, service_id):
try:
service_obj = storage_controller.get(project_id, service_id)
except ValueError:
LOG.info('Creating service {0} from Poppy failed. '
'No such service exists'.format(service_id))
sys.exit(0)
msg = 'Creating service {0} from Poppy failed. ' \
'No such service exists'.format(service_id)
LOG.info(msg)
raise Exception(msg)
responders = []
# try to create all service from each provider
for provider in providers_list:
LOG.info('Starting to create service from %s' % provider)
responder = service_controller.provider_wrapper.create(
service_controller._driver.providers[provider],
service_controller._driver.providers[provider.lower()],
service_obj)
responders.append(responder)
LOG.info('Create service from %s complete...' % provider)
@ -97,23 +105,24 @@ def service_create_worker(providers_list_json, project_id, service_id):
storage_controller.update(project_id, service_id, service_obj)
storage_controller._driver.close_connection()
LOG.info('Create service worker process %s complete...' %
str(os.getpid()))
LOG.info('Create service worker task complete...')
if __name__ == '__main__':
bootstrap_obj = bootstrap.Bootstrap(conf)
class CreateServiceTask(task.Task):
default_provides = "service_created"
parser = argparse.ArgumentParser(description='Create service async worker'
' script arg parser')
def execute(self, providers_list_json,
project_id, service_id):
LOG.info('Start executing create service task...')
service_create_task_func(
providers_list_json,
project_id, service_id)
return True
parser.add_argument('providers_list_json', action="store")
parser.add_argument('project_id', action="store")
parser.add_argument('service_id', action="store")
result = parser.parse_args()
providers_list_json = result.providers_list_json
project_id = result.project_id
service_id = result.service_id
LOG.logger.setLevel(logging.INFO)
service_create_worker(providers_list_json, project_id, service_id)
def create_service():
flow = linear_flow.Flow('Creating poppy-service').add(
CreateServiceTask(),
)
return flow
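The engine injects each execute() argument by name from the job's store, which is how the kwargs passed to submit_task() reach CreateServiceTask. A minimal self-contained sketch of that mechanism (EchoTask is hypothetical):
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task


class EchoTask(task.Task):

    def execute(self, project_id, service_id):
        # arguments are resolved by name from the engine's store
        print(project_id, service_id)


engines.run(linear_flow.Flow('echo').add(EchoTask()),
            store={'project_id': '123', 'service_id': 'abc'})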

View File

@ -13,23 +13,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import os
import sys
from oslo.config import cfg
from taskflow.patterns import linear_flow
from taskflow import task
from poppy import bootstrap
from poppy.openstack.common import log
LOG = log.getLogger(__file__)
logging.basicConfig(level=logging.ERROR,
format='%(levelname)s: %(message)s',
stream=sys.stdout)
LOG = logging.getLogger('Poppy Service Tasks')
LOG.setLevel(logging.DEBUG)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def service_delete_worker(project_id, service_id):
LOG.setLevel(logging.INFO)
def service_delete_task_func(provider_details,
project_id, service_id):
bootstrap_obj = bootstrap.Bootstrap(conf)
service_controller = bootstrap_obj.manager.services_controller
storage_controller = service_controller.storage_controller
@ -98,16 +107,18 @@ def service_delete_worker(project_id, service_id):
str(os.getpid()))
if __name__ == '__main__':
bootstrap_obj = bootstrap.Bootstrap(conf)
class DeleteServiceTask(task.Task):
default_provides = "service_deleted"
parser = argparse.ArgumentParser(description='Delete service async worker'
' script arg parser')
def execute(self, provider_details, project_id, service_id):
LOG.info('Start executing delete service task...')
service_delete_task_func(provider_details,
project_id, service_id)
return True
parser.add_argument('project_id', action="store")
parser.add_argument('service_id', action="store")
result = parser.parse_args()
project_id = result.project_id
service_id = result.service_id
service_delete_worker(project_id, service_id)
def delete_service():
flow = linear_flow.Flow('Deleting poppy-service').add(
DeleteServiceTask(),
)
return flow

View File

@ -13,27 +13,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import os
import sys
from oslo.config import cfg
from taskflow.patterns import linear_flow
from taskflow import task
from poppy import bootstrap
from poppy.openstack.common import log
from poppy.transport.pecan.models.request import (
provider_details as req_provider_details
)
LOG = log.getLogger(__file__)
logging.basicConfig(level=logging.ERROR,
format='%(levelname)s: %(message)s',
stream=sys.stdout)
LOG = logging.getLogger('Poppy Service Tasks')
LOG.setLevel(logging.DEBUG)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def service_purge_worker(provider_details,
project_id, service_id, purge_url):
LOG.setLevel(logging.INFO)
def service_purge_task_func(provider_details,
project_id, service_id, purge_url):
bootstrap_obj = bootstrap.Bootstrap(conf)
service_controller = bootstrap_obj.manager.services_controller
provider_details = json.loads(provider_details)
@ -96,20 +103,18 @@ def service_purge_worker(provider_details,
str(os.getpid()))
if __name__ == '__main__':
bootstrap_obj = bootstrap.Bootstrap(conf)
class PurgeServiceTask(task.Task):
default_provides = "service_purged"
parser = argparse.ArgumentParser(description='Delete service async worker'
' script arg parser')
def execute(self, provider_details, project_id, service_id, purge_url):
LOG.info('Start executing purge service task...')
service_purge_task_func(
provider_details, project_id, service_id, purge_url)
return True
parser.add_argument('provider_details', action="store")
parser.add_argument('project_id', action="store")
parser.add_argument('service_id', action="store")
parser.add_argument('purge_url', action="store")
result = parser.parse_args()
provider_details = result.provider_details
project_id = result.project_id
service_id = result.service_id
purge_url = result.purge_url
service_purge_worker(provider_details, project_id, service_id, purge_url)
def purge_service():
flow = linear_flow.Flow('Purge poppy-service').add(
PurgeServiceTask(),
)
return flow

View File

@ -13,26 +13,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import os
import sys
from oslo.config import cfg
from taskflow.patterns import linear_flow
from taskflow import task
from poppy import bootstrap
from poppy.model.helpers import provider_details
from poppy.openstack.common import log
from poppy.transport.pecan.models.request import service
LOG = log.getLogger(__file__)
logging.basicConfig(level=logging.ERROR,
format='%(levelname)s: %(message)s',
stream=sys.stdout)
LOG = logging.getLogger('Poppy Service Tasks')
LOG.setLevel(logging.DEBUG)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def update_worker(project_id, service_id,
service_old, service_obj):
LOG.setLevel(logging.INFO)
def service_update_task_func(project_id, service_id,
service_old, service_obj):
bootstrap_obj = bootstrap.Bootstrap(conf)
service_controller = bootstrap_obj.manager.services_controller
@ -118,20 +126,19 @@ def update_worker(project_id, service_id,
str(os.getpid()))
if __name__ == '__main__':
bootstrap_obj = bootstrap.Bootstrap(conf)
class UpdateServiceTask(task.Task):
default_provides = "service_updated"
parser = argparse.ArgumentParser(description='Delete service async worker'
' script arg parser')
def execute(self, project_id, service_id, service_old, service_obj):
LOG.info('Start executing update service task...')
service_update_task_func(
project_id, service_id,
service_old, service_obj)
return True
parser.add_argument('project_id', action="store")
parser.add_argument('service_id', action="store")
parser.add_argument('service_old', action="store")
parser.add_argument('service_obj', action="store")
result = parser.parse_args()
project_id = result.project_id
service_id = result.service_id
service_old = result.service_old
service_obj = result.service_obj
update_worker(project_id, service_id, service_old, service_obj)
def update_service():
flow = linear_flow.Flow('Updating poppy-service').add(
UpdateServiceTask(),
)
return flow

View File

@ -0,0 +1,84 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.utils import uuidutils
from taskflow.conductors import single_threaded
from taskflow import engines
from taskflow.persistence import logbook
from poppy.distributed_task import base
from poppy.openstack.common import log
LOG = log.getLogger(__name__)
class ServicesController(base.ServicesController):
def __init__(self, driver):
super(ServicesController, self).__init__(driver)
self.driver = driver
self.jobboard_backend_conf = self.driver.jobboard_backend_conf
@property
def persistence(self):
return self.driver.persistence()
def submit_task(self, flow_factory, **kwargs):
"""submit a task.
"""
with self.persistence as persistence:
with self.driver.job_board(
self.jobboard_backend_conf.copy(),
persistence=persistence) as board:
job_id = uuidutils.generate_uuid()
job_name = '-'.join([flow_factory.__name__, job_id])
job_logbook = logbook.LogBook(job_name)
flow_detail = logbook.FlowDetail(job_name,
uuidutils.generate_uuid())
factory_args = ()
factory_kwargs = {}
engines.save_factory_details(flow_detail, flow_factory,
factory_args, factory_kwargs)
job_logbook.add(flow_detail)
persistence.get_connection().save_logbook(job_logbook)
job_details = {
'store': kwargs
}
job = board.post(job_name,
book=job_logbook,
details=job_details)
LOG.info("%s posted" % (job))
def run_task_worker(self):
"""Run a task flow worker (conductor).
"""
with self.persistence as persistence:
with self.driver.job_board(
self.jobboard_backend_conf.copy(),
persistence=persistence) as board:
conductor = single_threaded.SingleThreadedConductor(
"Poppy service worker conductor", board, persistence,
engine='serial')
conductor.run()
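Taken together, submit_task() is the producer side and run_task_worker() the consumer side of the same ZooKeeper jobboard. A usage sketch built from this commit's classes (assumes a reachable ZooKeeper; the literal argument values are illustrative):
from oslo.config import cfg

from poppy.distributed_task.taskflow import driver
from poppy.distributed_task.taskflow.flow import create_service

conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
dt_driver = driver.TaskFlowDistributedTaskDriver(conf)

# producer: save the flow factory details and post a job to the jobboard
dt_driver.services_controller.submit_task(
    create_service.create_service,
    providers_list_json='["fastly"]',
    project_id='123',
    service_id='abc')

# consumer (normally the poppy-worker process): claim jobs and run flows
# dt_driver.services_controller.run_task_worker()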

View File

@ -21,11 +21,12 @@ import six
@six.add_metaclass(abc.ABCMeta)
class ManagerDriverBase(object):
"""Base class for driver manager."""
def __init__(self, conf, storage, providers, dns):
def __init__(self, conf, storage, providers, dns, distributed_task):
self._conf = conf
self._storage = storage
self._providers = providers
self._dns = dns
self._distributed_task = distributed_task
@property
def conf(self):
@ -55,6 +56,10 @@ class ManagerDriverBase(object):
def dns(self):
return self._dns
@property
def distributed_task(self):
return self._distributed_task
@abc.abstractproperty
def services_controller(self):
"""Returns the driver's services controller

View File

@ -23,11 +23,11 @@ from poppy.manager.default import controllers
class DefaultManagerDriver(base.Driver):
"""Default Manager Driver."""
def __init__(self, conf, storage, providers, dns):
def __init__(self, conf, storage, providers, dns, distributed_task):
super(DefaultManagerDriver, self).__init__(
conf, storage, providers, dns)
conf, storage, providers, dns, distributed_task)
@decorators.lazy_property(write=False)
@decorators.lazy_property(write=True)
def services_controller(self):
return controllers.Services(self)

View File

@ -14,18 +14,14 @@
# limitations under the License.
import json
import os
import subprocess
import sys
try:
import uwsgi
use_uwsgi = True
except ImportError:
use_uwsgi = False
import jsonpatch
from poppy.common import errors
from poppy.distributed_task.taskflow.flow import create_service
from poppy.distributed_task.taskflow.flow import delete_service
from poppy.distributed_task.taskflow.flow import purge_service
from poppy.distributed_task.taskflow.flow import update_service
from poppy.manager import base
from poppy.model import service
from poppy.openstack.common import log
@ -45,6 +41,8 @@ class DefaultServicesController(base.ServicesController):
self.storage_controller = self._driver.storage.services_controller
self.flavor_controller = self._driver.storage.flavors_controller
self.dns_controller = self._driver.dns.services_controller
self.distributed_task_controller = (
self._driver.distributed_task.services_controller)
def _get_provider_details(self, project_id, service_id):
try:
@ -98,24 +96,12 @@ class DefaultServicesController(base.ServicesController):
except ValueError as e:
raise e
proxy_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'sub_process_proxy.py')
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'create_service_worker.py')
if use_uwsgi:
executable = os.path.join(uwsgi.opt['virtualenv'], 'bin', 'python')
else:
executable = sys.executable
cmd_list = [executable,
proxy_path,
script_path,
json.dumps(providers),
project_id, service_id]
LOG.info('Starting create service subprocess: %s' % cmd_list)
p = subprocess.Popen(cmd_list, env=os.environ.copy())
p.communicate()
kwargs = {'providers_list_json': json.dumps(providers),
'project_id': project_id,
'service_id': service_id,
}
self.distributed_task_controller.submit_task(
create_service.create_service, **kwargs)
return
@ -167,25 +153,15 @@ class DefaultServicesController(base.ServicesController):
provider_details[provider].status = u'update_in_progress'
self.storage_controller.update(project_id, service_id, service_old)
proxy_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'sub_process_proxy.py')
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'update_service_worker.py')
if use_uwsgi:
executable = os.path.join(uwsgi.opt['virtualenv'], 'bin', 'python')
else:
executable = sys.executable
cmd_list = [executable,
proxy_path,
script_path,
project_id, service_id,
json.dumps(service_old.to_dict()),
json.dumps(service_new.to_dict())]
LOG.info('Starting update service subprocess: %s' % cmd_list)
p = subprocess.Popen(cmd_list, env=os.environ.copy())
p.communicate()
kwargs = {
'project_id': project_id,
'service_id': service_id,
'service_old': json.dumps(service_old.to_dict()),
'service_obj': json.dumps(service_new.to_dict())
}
self.distributed_task_controller.submit_task(
update_service.update_service, **kwargs)
return
@ -198,6 +174,9 @@ class DefaultServicesController(base.ServicesController):
"""
service_obj = self.storage_controller.get(project_id, service_id)
# get provider details for this service
provider_details = self._get_provider_details(project_id, service_id)
# change each provider detail's status to delete_in_progress
for provider in service_obj.provider_details:
service_obj.provider_details[provider].status = (
@ -205,23 +184,15 @@ class DefaultServicesController(base.ServicesController):
self.storage_controller.update(project_id, service_id, service_obj)
proxy_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'sub_process_proxy.py')
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'delete_service_worker.py')
if use_uwsgi:
executable = os.path.join(uwsgi.opt['virtualenv'], 'bin', 'python')
else:
executable = sys.executable
cmd_list = [executable,
proxy_path,
script_path,
project_id, service_id]
LOG.info('Starting delete service subprocess: %s' % cmd_list)
p = subprocess.Popen(cmd_list, env=os.environ.copy())
p.communicate()
kwargs = {
"provider_details": json.dumps(
dict([(k, v.to_dict()) for k, v in provider_details.items()])),
"project_id": project_id,
"service_id": service_id
}
self.distributed_task_controller.submit_task(
delete_service.delete_service, **kwargs)
return
@ -230,26 +201,15 @@ class DefaultServicesController(base.ServicesController):
provider_details = self._get_provider_details(project_id, service_id)
# possible validation of purge url here...
proxy_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'sub_process_proxy.py')
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'service_async_workers',
'purge_service_worker.py')
if use_uwsgi:
executable = os.path.join(uwsgi.opt['virtualenv'], 'bin', 'python')
else:
executable = sys.executable
cmd_list = [executable,
proxy_path,
script_path,
json.dumps(dict([(k, v.to_dict())
for k, v in provider_details.items()])),
project_id, service_id,
str(purge_url)]
kwargs = {
'provider_details': json.dumps(
dict([(k, v.to_dict()) for k, v in provider_details.items()])),
'project_id': project_id,
'service_id': service_id,
'purge_url': str(purge_url)
}
LOG.info('Starting purge service subprocess: %s' % cmd_list)
p = subprocess.Popen(cmd_list, env=os.environ.copy())
p.communicate()
self.distributed_task_controller.submit_task(
purge_service.purge_service, **kwargs)
return

View File

@ -15,13 +15,14 @@
import copy
import json
import logging
from poppy.common import decorators
from poppy.common import util
from poppy.openstack.common import log
from poppy.provider import base
LOG = log.getLogger(__name__)
# to log inside a taskflow worker, use the stdlib logging module directly
LOG = logging.getLogger('Poppy Service Tasks')
class ServiceController(base.ServiceBase):
@ -101,9 +102,8 @@ class ServiceController(base.ServiceBase):
policy_name=dp),
data=json.dumps(post_data),
headers=self.request_header)
# Use print for now as LOG.info will not in subprocess
print('akamai response code: %s' % resp.status_code)
print('akamai response text: %s' % resp.text)
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('akamai response text: %s' % resp.text)
if resp.status_code != 200:
raise RuntimeError(resp.text)
@ -112,8 +112,8 @@ class ServiceController(base.ServiceBase):
ids.append(dp_obj)
# TODO(tonytan4ever): leave empty links for now
# may need to work with dns integration
print('Creating policy %s on domain %s complete' %
(dp, classified_domain.domain))
LOG.info('Creating policy %s on domain %s complete' %
(dp, classified_domain.domain))
provider_access_url = self._get_provider_access_url(
classified_domain, dp)
links.append({'href': provider_access_url,
@ -161,9 +161,9 @@ class ServiceController(base.ServiceBase):
headers=self.request_header)
# if the policy is not found with provider, create it
if resp.status_code == 404:
print('akamai response code: %s' % resp.status_code)
print('upserting service with akamai: %s' %
service_obj.service_id)
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('upserting service with '
'akamai: %s' % service_obj.service_id)
return self.create(service_obj)
elif resp.status_code != 200:
raise RuntimeError(resp.text)
@ -224,7 +224,7 @@ class ServiceController(base.ServiceBase):
classified_domain.protocol)):
# in this case we should update existing policy
# instead of create a new policy
print('Start to update policy %s' % dp)
LOG.info('Start to update policy %s' % dp)
# TODO(tonytan4ever): also classify domains based on
# their protocols. http and https domains needs to be
# created with separate base urls.
@ -239,7 +239,7 @@ class ServiceController(base.ServiceBase):
'protocol': classified_domain.protocol}
policies.remove(dp_obj)
else:
print('Start to create new policy %s' % dp)
LOG.info('Start to create new policy %s' % dp)
resp = self.policy_api_client.put(
self.policy_api_base_url.format(
configuration_number=(
@ -247,8 +247,8 @@ class ServiceController(base.ServiceBase):
policy_name=dp),
data=json.dumps(policy_content),
headers=self.request_header)
print('akamai response code: %s' % resp.status_code)
print('akamai response text: %s' % resp.text)
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('akamai response text: %s' % resp.text)
if resp.status_code != 200:
raise RuntimeError(resp.text)
dp_obj = {'policy_name': dp,
@ -256,8 +256,8 @@ class ServiceController(base.ServiceBase):
ids.append(dp_obj)
# TODO(tonytan4ever): leave empty links for now
# may need to work with dns integration
print('Creating/Updating policy %s on domain %s '
'complete' % (dp, classified_domain.domain))
LOG.info('Creating/Updating policy %s on domain %s '
'complete' % (dp, classified_domain.domain))
provider_access_url = self._get_provider_access_url(
classified_domain, dp)
links.append({'href': provider_access_url,
@ -272,18 +272,18 @@ class ServiceController(base.ServiceBase):
configuration_number = self._get_configuration_number(
util.dict2obj(policy))
print('Starting to delete old policy %s' %
policy['policy_name'])
LOG.info('Starting to delete old policy %s' %
policy['policy_name'])
resp = self.policy_api_client.delete(
self.policy_api_base_url.format(
configuration_number=configuration_number,
policy_name=policy['policy_name']))
print('akamai response code: %s' % resp.status_code)
print('akamai response text: %s' % resp.text)
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('akamai response text: %s' % resp.text)
if resp.status_code != 200:
raise RuntimeError(resp.text)
print('Delete old policy %s complete' %
policy['policy_name'])
LOG.info('Delete old policy %s complete' %
policy['policy_name'])
except Exception:
return self.responder.failed("failed to update service")
@ -335,17 +335,17 @@ class ServiceController(base.ServiceBase):
# post new policies back with Akamai Policy API
try:
print('Start to update policy %s ' % policy)
LOG.info('Start to update policy %s ' % policy)
resp = self.policy_api_client.put(
self.policy_api_base_url.format(
configuration_number=configuration_number,
policy_name=policy['policy_name']),
data=json.dumps(policy_content),
headers=self.request_header)
print('akamai response code: %s' % resp.status_code)
print('akamai response text: %s' % resp.text)
print('Update policy %s complete' %
policy['policy_name'])
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('akamai response text: %s' % resp.text)
LOG.info('Update policy %s complete' %
policy['policy_name'])
except Exception:
return self.responder.failed("failed to update service")
provider_access_url = self._get_provider_access_url(
@ -371,7 +371,7 @@ class ServiceController(base.ServiceBase):
return self.responder.failed(str(e))
try:
for policy in policies:
print('Starting to delete policy %s' % policy)
LOG.info('Starting to delete policy %s' % policy)
# TODO(tonytan4ever): needs to look at if service
# domain is an https domain, if it is then a different
# base url is needed
@ -382,8 +382,8 @@ class ServiceController(base.ServiceBase):
self.policy_api_base_url.format(
configuration_number=configuration_number,
policy_name=policy['policy_name']))
print('akamai response code: %s' % resp.status_code)
print('akamai response text: %s' % resp.text)
LOG.info('akamai response code: %s' % resp.status_code)
LOG.info('akamai response text: %s' % resp.text)
if resp.status_code != 200:
raise RuntimeError(resp.text)
except Exception as e:

View File

@ -34,4 +34,5 @@ from poppy import bootstrap
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
app = bootstrap.Bootstrap(conf).transport.app

View File

@ -0,0 +1,3 @@
taskflow
-e git+https://github.com/python-zk/kazoo.git#egg=kazoo
zake

View File

@ -33,6 +33,7 @@ upload-dir = doc/build/html
[entry_points]
console_scripts =
poppy-server = poppy.cmd.server:run
poppy-worker = poppy.cmd.task_flow_worker:run
poppy.transport =
pecan = poppy.transport.pecan:Driver
@ -56,6 +57,10 @@ poppy.provider =
maxcdn = poppy.provider.maxcdn:Driver
akamai = poppy.provider.akamai:Driver
poppy.distributed_task =
taskflow = poppy.distributed_task.taskflow:Driver
[wheel]
universal = 1
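The poppy.distributed_task entry point group above is what Bootstrap's DriverManager call resolves. A sketch of the lookup stevedore performs (mirroring the bootstrap code):
from oslo.config import cfg
from stevedore import driver

conf = cfg.CONF
# namespace = the entry point group; name = the key left of '='
mgr = driver.DriverManager(namespace='poppy.distributed_task',
                           name='taskflow',
                           invoke_on_load=True,
                           invoke_args=(conf,))
task_driver = mgr.driver  # an instance of poppy.distributed_task.taskflow:Driver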

View File

@ -43,8 +43,8 @@ class TestBase(fixtures.BaseTestFixture):
cls.auth_config.user_name,
cls.auth_config.api_key)
else:
auth_token = 'dummy'
project_id = 'dummy'
auth_token = str(uuid.uuid4())
project_id = str(uuid.uuid4())
cls.test_config = config.TestConfig()

View File

@ -19,7 +19,7 @@ server = {
}
app = {
'root': 'tests.functional.transport.pecan.mock.MockPecanEndpoint',
'root': 'tests.functional.transport.pecan.mock_ep.MockPecanEndpoint',
'modules': ['tests.functional.transport.pecan.pecan_app'],
'debug': True,
'errors': {

View File

@ -15,6 +15,7 @@
import os
import mock
from oslo.config import cfg
import webtest
@ -33,7 +34,17 @@ class BaseFunctionalTest(base.TestCase):
))))
conf_path = os.path.join(tests_path, 'etc', 'default_functional.conf')
cfg.CONF(args=[], default_config_files=[conf_path])
poppy_wsgi = bootstrap.Bootstrap(cfg.CONF).transport.app
b_obj = bootstrap.Bootstrap(cfg.CONF)
# mock the persistence part for taskflow distributed_task
mock_persistence = mock.Mock()
mock_persistence.__enter__ = mock.Mock()
mock_persistence.__exit__ = mock.Mock()
b_obj.distributed_task.persistence = mock.Mock()
b_obj.distributed_task.persistence.return_value = mock_persistence
b_obj.distributed_task.job_board = mock.Mock()
b_obj.distributed_task.job_board.return_value = (
mock_persistence.copy())
poppy_wsgi = b_obj.transport.app
self.app = webtest.app.TestApp(poppy_wsgi)

View File

View File

@ -0,0 +1,44 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for TaskFlow distributed_task driver implementation."""
from oslo.config import cfg
from poppy.distributed_task.taskflow import driver
from tests.unit import base
class TestDriver(base.TestCase):
def setUp(self):
super(TestDriver, self).setUp()
self.conf = cfg.ConfigOpts()
self.distributed_task_driver = (
driver.TaskFlowDistributedTaskDriver(self.conf))
def test_init(self):
self.assertTrue(self.distributed_task_driver is not None)
def test_vendor_name(self):
self.assertEqual('TaskFlow', self.distributed_task_driver.vendor_name)
def test_is_alive(self):
self.assertEqual(True, self.distributed_task_driver.is_alive())
def test_service_controller(self):
self.assertTrue(self.distributed_task_driver.services_controller
is not None)

View File

@ -0,0 +1,57 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for TaskFlow distributed_task service_controller."""
import mock
from oslo.config import cfg
from poppy.distributed_task.taskflow import driver
from tests.unit import base
class TestServiceController(base.TestCase):
def setUp(self):
super(TestServiceController, self).setUp()
self.conf = cfg.ConfigOpts()
self.distributed_task_driver = (
driver.TaskFlowDistributedTaskDriver(self.conf))
self.mock_persistence_n_board = mock.Mock()
self.mock_persistence_n_board.__enter__ = mock.Mock()
self.mock_persistence_n_board.__exit__ = mock.Mock()
self.distributed_task_driver.persistence = mock.Mock()
self.distributed_task_driver.persistence.return_value = (
self.mock_persistence_n_board)
self.distributed_task_driver.job_board = mock.Mock()
self.distributed_task_driver.job_board.return_value = (
self.mock_persistence_n_board.copy())
def test_persistence(self):
self.assertTrue(self.distributed_task_driver.persistence is not None)
def test_submit_task(self):
flow_factory = mock.Mock()
self.distributed_task_driver.services_controller.submit_task(
flow_factory,
**{})
self.mock_persistence_n_board.get_connection.assert_called()
self.mock_persistence_n_board.post.assert_called()
def test_run_task_worker(self):
self.distributed_task_driver.services_controller.run_task_worker()
self.distributed_task_driver.persistence.assert_called()
self.distributed_task_driver.job_board.assert_called()

View File

@ -29,7 +29,9 @@ class DefaultManagerFlavorTests(base.TestCase):
@mock.patch('poppy.storage.base.driver.StorageDriverBase')
@mock.patch('poppy.provider.base.driver.ProviderDriverBase')
@mock.patch('poppy.dns.base.driver.DNSDriverBase')
def setUp(self, mock_driver, mock_provider, mock_dns):
@mock.patch('poppy.distributed_task.base.driver.DistributedTaskDriverBase')
def setUp(self, mock_driver, mock_provider, mock_dns,
mock_distributed_task):
super(DefaultManagerFlavorTests, self).setUp()
# create mocked config and driver
@ -42,7 +44,8 @@ class DefaultManagerFlavorTests(base.TestCase):
manager_driver = driver.DefaultManagerDriver(cfg.CONF,
mock_driver,
mock_provider,
mock_dns)
mock_dns,
mock_distributed_task)
# stubbed driver
self.fc = flavors.DefaultFlavorsController(manager_driver)

View File

@ -20,8 +20,13 @@ import ddt
import mock
from oslo.config import cfg
from poppy.distributed_task.taskflow.flow import create_service
from poppy.distributed_task.taskflow.flow import delete_service
from poppy.distributed_task.taskflow.flow import purge_service
from poppy.manager.default import driver
from poppy.manager.default.service_async_workers import purge_service_worker
from poppy.manager.default import services
from poppy.model import flavor
from poppy.model.helpers import provider_details
@ -32,13 +37,18 @@ from tests.unit import base
@ddt.ddt
class DefaultManagerServiceTests(base.TestCase):
@mock.patch('poppy.bootstrap.Bootstrap')
@mock.patch('poppy.dns.base.driver.DNSDriverBase')
@mock.patch('poppy.storage.base.driver.StorageDriverBase')
def setUp(self, mock_storage, mock_dns):
@mock.patch('poppy.distributed_task.base.driver.DistributedTaskDriverBase')
def setUp(self, mock_bootstrap, mock_dns, mock_storage,
mock_distributed_task):
super(DefaultManagerServiceTests, self).setUp()
# create mocked config and driver
conf = cfg.ConfigOpts()
self.bootstrap_obj = mock_bootstrap(conf)
# mock a stevedore provider extension
def get_provider_by_name(name):
@ -55,11 +65,13 @@ class DefaultManagerServiceTests(base.TestCase):
manager_driver = driver.DefaultManagerDriver(conf,
mock_storage,
mock_providers,
mock_dns)
mock_dns,
mock_distributed_task)
# stubbed driver
self.sc = services.DefaultServicesController(manager_driver)
self.bootstrap_obj.manager = manager_driver
self.bootstrap_obj.manager.services_controller = self.sc
self.project_id = str(uuid.uuid4())
self.service_name = str(uuid.uuid4())
self.service_id = str(uuid.uuid4())
@ -98,6 +110,40 @@ class DefaultManagerServiceTests(base.TestCase):
self.service_obj = service.load_from_json(self.service_json)
@mock.patch('poppy.bootstrap.Bootstrap')
def mock_purge_service(self, mock_bootstrap):
mock_bootstrap.return_value = self.bootstrap_obj
purge_service.service_purge_task_func(
json.dumps(dict([(k, v.to_dict())
for k, v in
self.provider_details.items()])),
self.project_id,
self.service_id,
str(None))
@mock.patch('poppy.bootstrap.Bootstrap')
def mock_delete_service(self, mock_bootstrap):
mock_bootstrap.return_value = self.bootstrap_obj
delete_service.service_delete_task_func(
json.dumps(dict([(k, v.to_dict())
for k, v in
self.provider_details.items()])),
self.project_id,
self.service_id)
def mock_create_service(self, provider_details_json):
@mock.patch('poppy.bootstrap.Bootstrap')
def bootstrap_mock_create(mock_bootstrap):
mock_bootstrap.return_value = self.bootstrap_obj
res = create_service.service_create_task_func(
providers_list_json=json.dumps(provider_details_json),
project_id=self.project_id,
service_id=self.service_id,
)
self.assertIsNone(res)
bootstrap_mock_create()
def test_create(self):
service_obj = service.load_from_json(self.service_json)
# fake one return value
@ -187,59 +233,58 @@ class DefaultManagerServiceTests(base.TestCase):
providers = self.sc._driver.providers
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return_mock = {
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [{'href': 'www.mysite.com',
'rel': 'access_url'}],
'status': 'deploy_in_progress'
}
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return_mock = {
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [{'href': 'www.mysite.com',
'rel': 'access_url'}],
'status': 'deploy_in_progress'
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == 'fastly':
return_mock = {
'Fastly': {'error': "fail to create service",
'error_detail': 'Fastly Create failed'
' because of XYZ'}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == 'fastly':
return_mock = {
'Fastly': {'error': "fail to create service",
'error_detail': 'Fastly Create failed'
' because of XYZ'}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=service_controller)
)
else:
return_mock = {
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{'href': 'www.mysite.com',
'rel': 'access_url'}]
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=service_controller)
)
else:
return_mock = {
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{'href': 'www.mysite.com',
'rel': 'access_url'}]
}
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
}
service_controller = mock.Mock(
create=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
providers.__getitem__.side_effect = get_provider_extension_by_name
# ensure the manager calls the storage driver with the appropriate data
self.sc.storage_controller.create.assert_called_once()
self.mock_create_service(provider_details_json)
@ddt.file_data('service_update.json')
def test_update(self, update_json):
@ -308,11 +353,140 @@ class DefaultManagerServiceTests(base.TestCase):
self.sc.delete(self.project_id, self.service_id)
# ensure the manager calls the storage driver with the appropriate data
sc.get.assert_called_once_with(self.project_id, self.service_id)
sc.update.assert_called_once_with(self.project_id,
self.service_id,
self.service_obj)
# break into 2 lines.
sc = self.sc.storage_controller
sc.get_provider_details.assert_called_once_with(
self.project_id,
self.service_id)
@ddt.file_data('data_provider_details.json')
def test_delete_service_worker_success(self, provider_details_json):
self.provider_details = {}
for provider_name in provider_details_json:
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
providers = self.sc._driver.providers
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return_mock = {
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == 'maxcdn':
return_mock = {
'MaxCDN': {'id': "pullzone345"}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=service_controller)
)
else:
return_mock = {
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
providers.__getitem__.side_effect = get_provider_extension_by_name
self.mock_delete_service()
@ddt.file_data('data_provider_details.json')
def test_delete_service_worker_with_error(self, provider_details_json):
self.provider_details = {}
for provider_name in provider_details_json:
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
providers = self.sc._driver.providers
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return mock.Mock(
obj=mock.Mock(
provider_name='CloudFront',
service_controller=mock.Mock(
delete=mock.Mock(
return_value={
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}}),
)))
elif name == 'maxcdn':
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=mock.Mock(
delete=mock.Mock(return_value={
'MaxCDN': {'error': "fail to delete service",
'error_detail':
'MaxCDN delete service'
' failed because of XYZ'}})
)
))
else:
return mock.Mock(
obj=mock.Mock(
provider_name=name.title(),
service_controller=mock.Mock(
delete=mock.Mock(
return_value={
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}}),
)))
providers.__getitem__.side_effect = get_provider_extension_by_name
self.mock_delete_service()
@ddt.file_data('data_provider_details.json')
def test_purge(self, provider_details_json):
self.provider_details = {}
@ -400,13 +574,7 @@ class DefaultManagerServiceTests(base.TestCase):
providers.__getitem__.side_effect = get_provider_extension_by_name
purge_service_worker.service_purge_worker(
json.dumps(dict([(k, v.to_dict())
for k, v in
self.provider_details.items()])),
self.project_id,
self.service_id,
str(None))
self.mock_purge_service()
@ddt.file_data('data_provider_details.json')
def test_purge_service_worker_with_error(self, provider_details_json):
@ -454,10 +622,4 @@ class DefaultManagerServiceTests(base.TestCase):
providers.__getitem__.side_effect = get_provider_extension_by_name
purge_service_worker.service_purge_worker(
json.dumps(dict([(k, v.to_dict())
for k, v in
self.provider_details.items()])),
self.project_id,
self.service_id,
str(None))
self.mock_purge_service()