diff --git a/openstack/block_storage/v3/_proxy.py b/openstack/block_storage/v3/_proxy.py index 11b72952d..1fe43d933 100644 --- a/openstack/block_storage/v3/_proxy.py +++ b/openstack/block_storage/v3/_proxy.py @@ -349,3 +349,42 @@ class Proxy(_base_proxy.BaseBlockStorageProxy): to delete failed to occur in the specified seconds. """ return resource.wait_for_delete(self, res, interval, wait) + + def _get_cleanup_dependencies(self): + return { + 'block_storage': { + 'before': [] + } + } + + def _service_cleanup(self, dry_run=True, status_queue=None): + if self._connection.has_service('object-store'): + # Volume backups require object-store to be available, even for + # listing + for obj in self.backups(details=False): + if status_queue: + status_queue.put(obj) + if not dry_run: + self.delete_backup(obj) + + snapshots = [] + for obj in self.snapshots(details=False): + if status_queue: + snapshots.append(obj) + status_queue.put(obj) + if not dry_run: + self.delete_snapshot(obj) + + # Before deleting volumes need to wait for snapshots to be deleted + for obj in snapshots: + try: + self.wait_for_delete(obj) + except exceptions.SDKException: + # Well, did our best, still try further + pass + + for obj in self.volumes(details=False): + if status_queue: + status_queue.put(obj) + if not dry_run: + self.delete_volume(obj) diff --git a/openstack/cloud/openstackcloud.py b/openstack/cloud/openstackcloud.py index 390ca1503..ba37a0824 100755 --- a/openstack/cloud/openstackcloud.py +++ b/openstack/cloud/openstackcloud.py @@ -11,6 +11,7 @@ # limitations under the License. import copy import functools +import queue import six # import types so that we can reference ListType in sphinx param declarations. # We can't just use list, because sphinx gets confused by @@ -35,6 +36,7 @@ from openstack.cloud import _utils import openstack.config from openstack.config import cloud_region as cloud_region_mod from openstack import proxy +from openstack import utils DEFAULT_SERVER_AGE = 5 DEFAULT_PORT_AGE = 5 @@ -749,3 +751,55 @@ class _OpenStackCloudMixin(object): return True else: return False + + def project_cleanup(self, dry_run=True, + wait_timeout=120, status_queue=None): + """Cleanup the project resources. + + Cleanup all resources in all services, which provide cleanup methods. + + :param bool dry_run: Cleanup or only list identified resources. + :param int wait_timeout: Maximum amount of time given to each service + to comlete the cleanup. + :param queue status_queue: a threading queue object used to get current + process status. The queue contain processed resources. + """ + dependencies = {} + get_dep_fn_name = '_get_cleanup_dependencies' + cleanup_fn_name = '_service_cleanup' + if not status_queue: + status_queue = queue.Queue() + for service in self.config.get_enabled_services(): + if hasattr(self, service): + proxy = getattr(self, service) + if (proxy + and hasattr(proxy, get_dep_fn_name) + and hasattr(proxy, cleanup_fn_name)): + deps = getattr(proxy, get_dep_fn_name)() + if deps: + dependencies.update(deps) + dep_graph = utils.TinyDAG() + for k, v in dependencies.items(): + dep_graph.add_node(k) + for dep in v['before']: + dep_graph.add_node(dep) + dep_graph.add_edge(k, dep) + + for service in dep_graph.walk(timeout=wait_timeout): + fn = None + if hasattr(self, service): + proxy = getattr(self, service) + cleanup_fn = getattr(proxy, cleanup_fn_name, None) + if cleanup_fn: + fn = functools.partial(cleanup_fn, dry_run=dry_run, + status_queue=status_queue) + if fn: + self._pool_executor.submit(cleanup_task, dep_graph, + service, fn) + else: + dep_graph.node_done(service) + + +def cleanup_task(graph, service, fn): + fn() + graph.node_done(service) diff --git a/openstack/compute/v2/_proxy.py b/openstack/compute/v2/_proxy.py index 0eef18930..a897509eb 100644 --- a/openstack/compute/v2/_proxy.py +++ b/openstack/compute/v2/_proxy.py @@ -1472,3 +1472,23 @@ class Proxy(proxy.Proxy): server_id = self._get_resource(_server.Server, server).id return self._get(_server_diagnostics.ServerDiagnostics, server_id=server_id, requires_id=False) + + def _get_cleanup_dependencies(self): + return { + 'compute': { + 'before': ['block_storage', 'network', 'identity'] + } + } + + def _service_cleanup(self, dry_run=True, status_queue=None): + for obj in self.servers(details=False): + self._service_cleanup_del_res(self.delete_server, + obj, + dry_run, + status_queue) + + for obj in self.keypairs(): + self._service_cleanup_del_res(self.delete_keypair, + obj, + dry_run, + status_queue) diff --git a/openstack/config/cloud_region.py b/openstack/config/cloud_region.py index b44c89afc..95d6735e0 100644 --- a/openstack/config/cloud_region.py +++ b/openstack/config/cloud_region.py @@ -363,6 +363,21 @@ class CloudRegion(object): services.append("_".join(key.split('_')[:-2])) return list(set(services)) + def get_enabled_services(self): + services = set() + + all_services = [k['service_type'] for k in + self._service_type_manager.services] + all_services.extend(k[4:] for k in + self.config.keys() if k.startswith('has_')) + + for srv in all_services: + ep = self.get_endpoint_from_catalog(srv) + if ep: + services.add(srv.replace('-', '_')) + + return services + def get_auth_args(self): return self.config.get('auth', {}) diff --git a/openstack/connection.py b/openstack/connection.py index 7937326f4..2ee65ec06 100644 --- a/openstack/connection.py +++ b/openstack/connection.py @@ -480,7 +480,7 @@ class Connection( attr_name.replace('-', '_'), property(fget=getter) ) - self.config.enable_service(attr_name) + self.config.enable_service(service.service_type) def authorize(self): """Authorize this Connection diff --git a/openstack/dns/v2/_proxy.py b/openstack/dns/v2/_proxy.py index ff357e927..610e9e830 100644 --- a/openstack/dns/v2/_proxy.py +++ b/openstack/dns/v2/_proxy.py @@ -511,3 +511,14 @@ class Proxy(proxy.Proxy): :rtype: :class:`~openstack.dns.v2.zone_transfer.ZoneTransferAccept` """ return self._create(_zone_transfer.ZoneTransferAccept, **attrs) + + def _get_cleanup_dependencies(self): + # DNS may depend on floating ip + return { + 'dns': { + 'before': ['network'] + } + } + + def _service_cleanup(self, dry_run=True, status_queue=False): + pass diff --git a/openstack/network/v2/_proxy.py b/openstack/network/v2/_proxy.py index f6f69580c..caf7842c7 100644 --- a/openstack/network/v2/_proxy.py +++ b/openstack/network/v2/_proxy.py @@ -3990,3 +3990,49 @@ class Proxy(proxy.Proxy): floatingip = self._get_resource(_floating_ip.FloatingIP, floating_ip) return self._update(_port_forwarding.PortForwarding, port_forwarding, floatingip_id=floatingip.id, **attrs) + + def _get_cleanup_dependencies(self): + return { + 'network': { + 'before': ['identity'] + } + } + + def _service_cleanup(self, dry_run=True, status_queue=None): + for obj in self.ips(): + self._service_cleanup_del_res(self.delete_ip, obj, dry_run, + status_queue) + + for obj in self.security_groups(): + if obj.name != 'default': + self._service_cleanup_del_res( + self.delete_security_group, obj, + dry_run, status_queue) + + for port in self.ports(): + if port.device_owner in ['network:router_interface', + 'network:router_interface_distributed']: + if status_queue: + status_queue.put(obj) + if not dry_run: + try: + self.remove_interface_from_router( + router=port.device_id, + port_id=port.id) + except exceptions.SDKException: + self.log.error('Cannot delete object %s' % obj) + + for obj in self.routers(): + self._service_cleanup_del_res( + self.delete_router, obj, + dry_run, status_queue) + + for obj in self.subnets(): + self._service_cleanup_del_res( + self.delete_subnet, obj, + dry_run, status_queue) + + for obj in self.networks(): + self._service_cleanup_del_res( + self.delete_network, obj, + dry_run, status_queue) diff --git a/openstack/orchestration/v1/_proxy.py b/openstack/orchestration/v1/_proxy.py index b1eeaaf79..37bbaede2 100644 --- a/openstack/orchestration/v1/_proxy.py +++ b/openstack/orchestration/v1/_proxy.py @@ -479,3 +479,21 @@ class Proxy(proxy.Proxy): except Exception as e: raise exceptions.SDKException( "Error in processing template files: %s" % str(e)) + + def _get_cleanup_dependencies(self): + return { + 'orchestration': { + 'before': ['compute', 'network', 'identity'] + } + } + + def _service_cleanup(self, dry_run=True, status_queue=None): + stacks = [] + for obj in self.stacks(): + stacks.append(obj) + self._project_cleanup_del_res( + self.delete_stack, obj, + dry_run, status_queue) + + for stack in stacks: + self.wait_for_delete(stack) diff --git a/openstack/proxy.py b/openstack/proxy.py index 2ea253038..6c1456ab9 100644 --- a/openstack/proxy.py +++ b/openstack/proxy.py @@ -521,6 +521,22 @@ class Proxy(adapter.Adapter): res = self._get_resource(resource_type, value, **attrs) return res.head(self, base_path=base_path) + def _get_cleanup_dependencies(self): + return None + + def _service_cleanup(self, dry_run=True, status_queue=None): + return None + + def _service_cleanup_del_res(self, del_fn, obj, dry_run=True, + status_queue=None): + if status_queue: + status_queue.put(obj) + if not dry_run: + try: + del_fn(obj) + except exceptions.SDKException as e: + self.log.error('Cannot delete resource %s: %s', obj, str(e)) + def _json_response(response, result_key=None, error_message=None): """Temporary method to use to bridge from ShadeAdapter to SDK calls.""" diff --git a/openstack/tests/functional/base.py b/openstack/tests/functional/base.py index 1a19153ba..f3e4f2a67 100644 --- a/openstack/tests/functional/base.py +++ b/openstack/tests/functional/base.py @@ -51,6 +51,8 @@ class BaseFunctionalTest(base.TestCase): _disable_keep_alive(self.conn) self._demo_name = os.environ.get('OPENSTACKSDK_DEMO_CLOUD', 'devstack') + self._demo_name_alt = os.environ.get('OPENSTACKSDK_DEMO_CLOUD_ALT', + 'devstack-alt') self._op_name = os.environ.get( 'OPENSTACKSDK_OPERATOR_CLOUD', 'devstack-admin') @@ -73,6 +75,13 @@ class BaseFunctionalTest(base.TestCase): self.user_cloud = connection.Connection(config=user_config) _disable_keep_alive(self.user_cloud) + # This cloud is used by the project_cleanup test, so you can't rely on + # it + user_config_alt = self.config.get_one( + cloud=self._demo_name_alt, **kwargs) + self.user_cloud_alt = connection.Connection(config=user_config_alt) + _disable_keep_alive(self.user_cloud_alt) + def _set_operator_cloud(self, **kwargs): operator_config = self.config.get_one( cloud=self._op_name, **kwargs) diff --git a/openstack/tests/functional/cloud/test_project_cleanup.py b/openstack/tests/functional/cloud/test_project_cleanup.py new file mode 100644 index 000000000..01a152cda --- /dev/null +++ b/openstack/tests/functional/cloud/test_project_cleanup.py @@ -0,0 +1,86 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +test_project_cleanup +---------------------------------- + +Functional tests for project cleanup methods. +""" +import queue + +from openstack.tests.functional import base + + +class TestProjectCleanup(base.BaseFunctionalTest): + + _wait_for_timeout_key = 'OPENSTACKSDK_FUNC_TEST_TIMEOUT_CLEANUP' + + def setUp(self): + super(TestProjectCleanup, self).setUp() + self.conn = self.user_cloud_alt + self.network_name = self.getUniqueString('network') + + def _create_network_resources(self): + conn = self.conn + self.net = conn.network.create_network( + name=self.network_name, + ) + self.subnet = conn.network.create_subnet( + name=self.getUniqueString('subnet'), + network_id=self.net.id, + cidr='192.169.1.0/24', + ip_version=4, + ) + self.router = conn.network.create_router( + name=self.getUniqueString('router') + ) + conn.network.add_interface_to_router( + self.router.id, + subnet_id=self.subnet.id) + + def _test_cleanup(self): + self._create_network_resources() + status_queue = queue.Queue() + self.conn.project_cleanup( + dry_run=True, + wait_timeout=120, + status_queue=status_queue) + + objects = [] + while not status_queue.empty(): + objects.append(status_queue.get()) + + net_names = list(obj.name for obj in objects) + self.assertIn(self.network_name, net_names) + + # Ensure network still exists + net = self.conn.network.get_network(self.net.id) + self.assertEqual(net.name, self.net.name) + + self.conn.project_cleanup( + dry_run=False, + wait_timeout=600, + status_queue=status_queue) + + objects = [] + while not status_queue.empty(): + objects.append(status_queue.get()) + + nets = self.conn.network.networks() + net_names = list(obj.name for obj in nets) + # Since we might not have enough privs to drop all nets - ensure + # we do not have our known one + self.assertNotIn(self.network_name, net_names) + + def test_cleanup(self): + self._test_cleanup() diff --git a/openstack/tests/unit/test_utils.py b/openstack/tests/unit/test_utils.py index c16cfe2dc..570709222 100644 --- a/openstack/tests/unit/test_utils.py +++ b/openstack/tests/unit/test_utils.py @@ -17,10 +17,15 @@ import mock import sys from openstack.tests.unit import base +import concurrent.futures + +import testtools + import fixtures import os_service_types import openstack +from openstack import exceptions from openstack import utils @@ -168,3 +173,73 @@ class TestOsServiceTypesVersion(base.TestCase): "This project must be pinned to the latest version of " "os-service-types. Please bump requirements.txt and " "lower-constraints.txt accordingly.") + + +class TestTinyDAG(base.TestCase): + test_graph = { + 'a': ['b', 'd', 'f'], + 'b': ['c', 'd'], + 'c': ['d'], + 'd': ['e'], + 'e': [], + 'f': ['e'], + 'g': ['e'] + } + + def _verify_order(self, test_graph, test_list): + for k, v in test_graph.items(): + for dep in v: + self.assertTrue(test_list.index(k) < test_list.index(dep)) + + def test_from_dict(self): + sot = utils.TinyDAG() + sot.from_dict(self.test_graph) + + def test_topological_sort(self): + sot = utils.TinyDAG() + sot.from_dict(self.test_graph) + sorted_list = sot.topological_sort() + self._verify_order(sot.graph, sorted_list) + self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) + + def test_walk(self): + sot = utils.TinyDAG() + sot.from_dict(self.test_graph) + sorted_list = [] + for node in sot.walk(): + sorted_list.append(node) + sot.node_done(node) + self._verify_order(sot.graph, sorted_list) + self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) + + def test_walk_parallel(self): + sot = utils.TinyDAG() + sot.from_dict(self.test_graph) + sorted_list = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: + for node in sot.walk(timeout=1): + executor.submit(test_walker_fn, sot, node, sorted_list) + self._verify_order(sot.graph, sorted_list) + print(sorted_list) + self.assertEqual(len(self.test_graph.keys()), len(sorted_list)) + + def test_walk_raise(self): + sot = utils.TinyDAG() + sot.from_dict(self.test_graph) + bad_node = 'f' + with testtools.ExpectedException(exceptions.SDKException): + for node in sot.walk(timeout=1): + if node != bad_node: + sot.node_done(node) + + def test_add_node_after_edge(self): + sot = utils.TinyDAG() + sot.add_node('a') + sot.add_edge('a', 'b') + sot.add_node('a') + self.assertEqual(sot._graph['a'], set('b')) + + +def test_walker_fn(graph, node, lst): + lst.append(node) + graph.node_done(node) diff --git a/openstack/utils.py b/openstack/utils.py index efab5cce5..cca475de5 100644 --- a/openstack/utils.py +++ b/openstack/utils.py @@ -10,7 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. +import queue import string +import threading import time import six @@ -190,3 +192,110 @@ def maximum_supported_microversion(adapter, client_maximum): result = min(client_max, server_max) return discover.version_to_string(result) + + +class TinyDAG(six.Iterator): + """Tiny DAG + + Bases on the Kahn's algorithm, and enables parallel visiting of the nodes + (parallel execution of the workflow items). + """ + + def __init__(self, data=None): + self._reset() + self._lock = threading.Lock() + if data and isinstance(data, dict): + self.from_dict(data) + + def _reset(self): + self._graph = dict() + self._wait_timeout = 120 + + @property + def graph(self): + """Get graph as adjacency dict + """ + return self._graph + + def add_node(self, node): + self._graph.setdefault(node, set()) + + def add_edge(self, u, v): + self._graph[u].add(v) + + def from_dict(self, data): + self._reset() + for k, v in data.items(): + self.add_node(k) + for dep in v: + self.add_edge(k, dep) + + def walk(self, timeout=None): + """Start the walking from the beginning. + """ + if timeout: + self._wait_timeout = timeout + return self + + def __iter__(self): + self._start_traverse() + return self + + def __next__(self): + # Start waiting if it is expected to get something + # (counting down from graph length to 0). + if (self._it_cnt > 0): + self._it_cnt -= 1 + try: + res = self._queue.get( + block=True, + timeout=self._wait_timeout) + return res + + except queue.Empty: + raise exceptions.SDKException('Timeout waiting for ' + 'cleanup task to complete') + else: + raise StopIteration + + def node_done(self, node): + """Mark node as "processed" and put following items into the queue""" + self._done.add(node) + + for v in self._graph[node]: + self._run_in_degree[v] -= 1 + if self._run_in_degree[v] == 0: + self._queue.put(v) + + def _start_traverse(self): + """Initialize graph traversing""" + self._run_in_degree = self._get_in_degree() + self._queue = queue.Queue() + self._done = set() + self._it_cnt = len(self._graph) + + for k, v in self._run_in_degree.items(): + if v == 0: + self._queue.put(k) + + def _get_in_degree(self): + """Calculate the in_degree (count incoming) for nodes""" + _in_degree = dict() + _in_degree = {u: 0 for u in self._graph.keys()} + for u in self._graph: + for v in self._graph[u]: + _in_degree[v] += 1 + + return _in_degree + + def topological_sort(self): + """Return the graph nodes in the topological order""" + result = [] + for node in self: + result.append(node) + self.node_done(node) + + return result + + def size(self): + return len(self._graph.keys()) diff --git a/releasenotes/notes/add_project_cleanup-39c3517b25a5372e.yaml b/releasenotes/notes/add_project_cleanup-39c3517b25a5372e.yaml new file mode 100644 index 000000000..f99c66fc9 --- /dev/null +++ b/releasenotes/notes/add_project_cleanup-39c3517b25a5372e.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Project cleanup functionality. It provides a single method in the + connection object, which calls cleanup method in all supported services + (both part of the SDK itself and all "imported" in the runtime or through + the vendor_hook functionality). Cleanup is working in multiple threads + where possible (no dependencies between services). diff --git a/tox.ini b/tox.ini index 43283d415..2072fcbf7 100644 --- a/tox.ini +++ b/tox.ini @@ -36,6 +36,7 @@ setenv = OS_TEST_TIMEOUT=600 OPENSTACKSDK_FUNC_TEST_TIMEOUT_LOAD_BALANCER=600 OPENSTACKSDK_EXAMPLE_CONFIG_KEY=functional + OPENSTACKSDK_FUNC_TEST_TIMEOUT_PROJECT_CLEANUP=1200 commands = stestr --test-path ./openstack/tests/functional/{env:OPENSTACKSDK_TESTS_SUBDIR:} run --serial {posargs} stestr slowest