Lay a foundation for the project cleanup

During the Berlin Summit 2018 it was decided to start implementing
project cleanup in the SDK. This change lays the foundation for that:
the required bits plus a few examples in the services to demonstrate
the interface.

In follow-up changes the following should be done:
- review cleanup in the service proxies (both the ones implemented here
and the remaining ones)
- describe the feature in the documentation
- start consuming this service from the OSC (help is needed deciding
where it should be implemented - core OSC, separate project, OSC plugin
in SDK, ...)
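
For illustration, a minimal consumer-side sketch of the new interface
(hedged, not part of this change - based on the project_cleanup
signature and the functional test below; the cloud name is a
placeholder):

    import queue

    import openstack

    conn = openstack.connect(cloud='devstack-alt')
    status_queue = queue.Queue()

    # Dry run: identified resources are only reported via the queue.
    conn.project_cleanup(dry_run=True, status_queue=status_queue)
    while not status_queue.empty():
        print(status_queue.get())

    # Real run: delete everything the enabled services can clean up.
    conn.project_cleanup(dry_run=False, wait_timeout=600,
                         status_queue=status_queue)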

Change-Id: Ie29d10475e8d1af1beea3cc5b45984f2be9236ef
Artem Goncharov 2019-12-20 19:27:55 +01:00
parent 60a24f6c47
commit d4ddd5ce99
15 changed files with 508 additions and 1 deletion

View File

@@ -349,3 +349,42 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
            to delete failed to occur in the specified seconds.
        """
        return resource.wait_for_delete(self, res, interval, wait)

    def _get_cleanup_dependencies(self):
        return {
            'block_storage': {
                'before': []
            }
        }
    def _service_cleanup(self, dry_run=True, status_queue=None):
        if self._connection.has_service('object-store'):
            # Volume backups require object-store to be available, even for
            # listing
            for obj in self.backups(details=False):
                if status_queue:
                    status_queue.put(obj)
                if not dry_run:
                    self.delete_backup(obj)

        snapshots = []
        for obj in self.snapshots(details=False):
            if status_queue:
                status_queue.put(obj)
            if not dry_run:
                self.delete_snapshot(obj)
                snapshots.append(obj)

        # Before deleting volumes we need to wait for their snapshots to be
        # deleted
        for obj in snapshots:
            try:
                self.wait_for_delete(obj)
            except exceptions.SDKException:
                # Well, we did our best; keep going anyway
                pass

        for obj in self.volumes(details=False):
            if status_queue:
                status_queue.put(obj)
            if not dry_run:
                self.delete_volume(obj)

View File

@@ -11,6 +11,7 @@
# limitations under the License.

import copy
import functools
import queue

import six

# import types so that we can reference ListType in sphinx param declarations.
# We can't just use list, because sphinx gets confused by
@@ -35,6 +36,7 @@ from openstack.cloud import _utils
import openstack.config
from openstack.config import cloud_region as cloud_region_mod
from openstack import proxy
from openstack import utils

DEFAULT_SERVER_AGE = 5
DEFAULT_PORT_AGE = 5
@@ -749,3 +751,55 @@ class _OpenStackCloudMixin(object):
            return True
        else:
            return False

    def project_cleanup(self, dry_run=True,
                        wait_timeout=120, status_queue=None):
        """Cleanup the project resources.

        Cleans up all resources in all services which provide cleanup
        methods.

        :param bool dry_run: Only identify and report resources instead of
            deleting them.
        :param int wait_timeout: Maximum amount of time given to each service
            to complete the cleanup.
        :param queue status_queue: a threading queue object used to get
            current process status. The queue contains processed resources.
        """
        dependencies = {}
        get_dep_fn_name = '_get_cleanup_dependencies'
        cleanup_fn_name = '_service_cleanup'
        if not status_queue:
            status_queue = queue.Queue()
        # Collect dependency declarations from every enabled service proxy
        # that implements the cleanup interface.
        for service in self.config.get_enabled_services():
            if hasattr(self, service):
                proxy = getattr(self, service)
                if (proxy
                        and hasattr(proxy, get_dep_fn_name)
                        and hasattr(proxy, cleanup_fn_name)):
                    deps = getattr(proxy, get_dep_fn_name)()
                    if deps:
                        dependencies.update(deps)
        dep_graph = utils.TinyDAG()
        for k, v in dependencies.items():
            dep_graph.add_node(k)
            for dep in v['before']:
                dep_graph.add_node(dep)
                dep_graph.add_edge(k, dep)

        # Walk the graph and run each service's cleanup in the thread pool;
        # a node is released for its dependents once its task calls
        # node_done.
        for service in dep_graph.walk(timeout=wait_timeout):
            fn = None
            if hasattr(self, service):
                proxy = getattr(self, service)
                cleanup_fn = getattr(proxy, cleanup_fn_name, None)
                if cleanup_fn:
                    fn = functools.partial(cleanup_fn, dry_run=dry_run,
                                           status_queue=status_queue)
            if fn:
                self._pool_executor.submit(cleanup_task, dep_graph,
                                           service, fn)
            else:
                dep_graph.node_done(service)


def cleanup_task(graph, service, fn):
    fn()
    graph.node_done(service)

View File

@@ -1472,3 +1472,23 @@ class Proxy(proxy.Proxy):
        server_id = self._get_resource(_server.Server, server).id
        return self._get(_server_diagnostics.ServerDiagnostics,
                         server_id=server_id, requires_id=False)

    def _get_cleanup_dependencies(self):
        # 'before' means compute resources must be cleaned up before the
        # listed services are processed.
        return {
            'compute': {
                'before': ['block_storage', 'network', 'identity']
            }
        }

    def _service_cleanup(self, dry_run=True, status_queue=None):
        for obj in self.servers(details=False):
            self._service_cleanup_del_res(self.delete_server,
                                          obj,
                                          dry_run,
                                          status_queue)
        for obj in self.keypairs():
            self._service_cleanup_del_res(self.delete_keypair,
                                          obj,
                                          dry_run,
                                          status_queue)

View File

@@ -363,6 +363,21 @@ class CloudRegion(object):
                services.append("_".join(key.split('_')[:-2]))
        return list(set(services))

    def get_enabled_services(self):
        services = set()

        all_services = [k['service_type'] for k in
                        self._service_type_manager.services]
        all_services.extend(k[4:] for k in
                            self.config.keys() if k.startswith('has_'))

        for srv in all_services:
            ep = self.get_endpoint_from_catalog(srv)
            if ep:
                services.add(srv.replace('-', '_'))

        return services

    def get_auth_args(self):
        return self.config.get('auth', {})

View File

@@ -480,7 +480,7 @@ class Connection(
                attr_name.replace('-', '_'),
                property(fget=getter)
            )
-            self.config.enable_service(attr_name)
+            self.config.enable_service(service.service_type)

    def authorize(self):
        """Authorize this Connection

View File

@@ -511,3 +511,14 @@ class Proxy(proxy.Proxy):
        :rtype: :class:`~openstack.dns.v2.zone_transfer.ZoneTransferAccept`
        """
        return self._create(_zone_transfer.ZoneTransferAccept, **attrs)

    def _get_cleanup_dependencies(self):
        # DNS may depend on floating IPs
        return {
            'dns': {
                'before': ['network']
            }
        }

    def _service_cleanup(self, dry_run=True, status_queue=None):
        pass

View File

@@ -3990,3 +3990,49 @@ class Proxy(proxy.Proxy):
        floatingip = self._get_resource(_floating_ip.FloatingIP, floating_ip)
        return self._update(_port_forwarding.PortForwarding, port_forwarding,
                            floatingip_id=floatingip.id, **attrs)

    def _get_cleanup_dependencies(self):
        return {
            'network': {
                'before': ['identity']
            }
        }

    def _service_cleanup(self, dry_run=True, status_queue=None):
        for obj in self.ips():
            self._service_cleanup_del_res(self.delete_ip, obj, dry_run,
                                          status_queue)

        for obj in self.security_groups():
            if obj.name != 'default':
                self._service_cleanup_del_res(
                    self.delete_security_group, obj,
                    dry_run, status_queue)

        # Router interfaces must be detached before routers and subnets
        # can be deleted.
        for port in self.ports():
            if port.device_owner in ['network:router_interface',
                                     'network:router_interface_distributed']:
                if status_queue:
                    status_queue.put(port)
                if not dry_run:
                    try:
                        self.remove_interface_from_router(
                            router=port.device_id,
                            port_id=port.id)
                    except exceptions.SDKException:
                        self.log.error('Cannot delete port %s', port)

        for obj in self.routers():
            self._service_cleanup_del_res(
                self.delete_router, obj,
                dry_run, status_queue)
        for obj in self.subnets():
            self._service_cleanup_del_res(
                self.delete_subnet, obj,
                dry_run, status_queue)
        for obj in self.networks():
            self._service_cleanup_del_res(
                self.delete_network, obj,
                dry_run, status_queue)

View File

@@ -479,3 +479,21 @@ class Proxy(proxy.Proxy):
        except Exception as e:
            raise exceptions.SDKException(
                "Error in processing template files: %s" % str(e))

    def _get_cleanup_dependencies(self):
        return {
            'orchestration': {
                'before': ['compute', 'network', 'identity']
            }
        }

    def _service_cleanup(self, dry_run=True, status_queue=None):
        stacks = []
        for obj in self.stacks():
            self._service_cleanup_del_res(
                self.delete_stack, obj,
                dry_run, status_queue)
            if not dry_run:
                stacks.append(obj)
        for stack in stacks:
            self.wait_for_delete(stack)

View File

@@ -521,6 +521,22 @@ class Proxy(adapter.Adapter):
        res = self._get_resource(resource_type, value, **attrs)
        return res.head(self, base_path=base_path)

    def _get_cleanup_dependencies(self):
        return None

    def _service_cleanup(self, dry_run=True, status_queue=None):
        return None

    def _service_cleanup_del_res(self, del_fn, obj, dry_run=True,
                                 status_queue=None):
        if status_queue:
            status_queue.put(obj)
        if not dry_run:
            try:
                del_fn(obj)
            except exceptions.SDKException as e:
                self.log.error('Cannot delete resource %s: %s', obj, str(e))


def _json_response(response, result_key=None, error_message=None):
    """Temporary method to use to bridge from ShadeAdapter to SDK calls."""

View File

@@ -51,6 +51,8 @@ class BaseFunctionalTest(base.TestCase):
        _disable_keep_alive(self.conn)

        self._demo_name = os.environ.get('OPENSTACKSDK_DEMO_CLOUD', 'devstack')
        self._demo_name_alt = os.environ.get('OPENSTACKSDK_DEMO_CLOUD_ALT',
                                             'devstack-alt')
        self._op_name = os.environ.get(
            'OPENSTACKSDK_OPERATOR_CLOUD', 'devstack-admin')
@@ -73,6 +75,13 @@ class BaseFunctionalTest(base.TestCase):
        self.user_cloud = connection.Connection(config=user_config)
        _disable_keep_alive(self.user_cloud)

        # This cloud is used by the project_cleanup test, so you can't rely
        # on its resources persisting.
        user_config_alt = self.config.get_one(
            cloud=self._demo_name_alt, **kwargs)
        self.user_cloud_alt = connection.Connection(config=user_config_alt)
        _disable_keep_alive(self.user_cloud_alt)

    def _set_operator_cloud(self, **kwargs):
        operator_config = self.config.get_one(
            cloud=self._op_name, **kwargs)

View File

@@ -0,0 +1,86 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
test_project_cleanup
----------------------------------

Functional tests for project cleanup methods.
"""
import queue

from openstack.tests.functional import base


class TestProjectCleanup(base.BaseFunctionalTest):

    _wait_for_timeout_key = 'OPENSTACKSDK_FUNC_TEST_TIMEOUT_PROJECT_CLEANUP'

    def setUp(self):
        super(TestProjectCleanup, self).setUp()
        self.conn = self.user_cloud_alt
        self.network_name = self.getUniqueString('network')

    def _create_network_resources(self):
        conn = self.conn
        self.net = conn.network.create_network(
            name=self.network_name,
        )
        self.subnet = conn.network.create_subnet(
            name=self.getUniqueString('subnet'),
            network_id=self.net.id,
            cidr='192.169.1.0/24',
            ip_version=4,
        )
        self.router = conn.network.create_router(
            name=self.getUniqueString('router')
        )
        conn.network.add_interface_to_router(
            self.router.id,
            subnet_id=self.subnet.id)

    def _test_cleanup(self):
        self._create_network_resources()
        status_queue = queue.Queue()

        # First a dry run: nothing should actually be deleted.
        self.conn.project_cleanup(
            dry_run=True,
            wait_timeout=120,
            status_queue=status_queue)

        objects = []
        while not status_queue.empty():
            objects.append(status_queue.get())

        net_names = list(obj.name for obj in objects)
        self.assertIn(self.network_name, net_names)

        # Ensure the network still exists
        net = self.conn.network.get_network(self.net.id)
        self.assertEqual(net.name, self.net.name)

        # Now the real run.
        self.conn.project_cleanup(
            dry_run=False,
            wait_timeout=600,
            status_queue=status_queue)

        objects = []
        while not status_queue.empty():
            objects.append(status_queue.get())

        nets = self.conn.network.networks()
        net_names = list(obj.name for obj in nets)
        # Since we might not have enough privileges to drop all networks,
        # only ensure our known one is gone.
        self.assertNotIn(self.network_name, net_names)

    def test_cleanup(self):
        self._test_cleanup()

View File

@@ -17,10 +17,15 @@ import mock
import sys
from openstack.tests.unit import base
import concurrent.futures
import testtools
import fixtures
import os_service_types
import openstack
from openstack import exceptions
from openstack import utils
@@ -168,3 +173,73 @@ class TestOsServiceTypesVersion(base.TestCase):
            "This project must be pinned to the latest version of "
            "os-service-types. Please bump requirements.txt and "
            "lower-constraints.txt accordingly.")


class TestTinyDAG(base.TestCase):

    test_graph = {
        'a': ['b', 'd', 'f'],
        'b': ['c', 'd'],
        'c': ['d'],
        'd': ['e'],
        'e': [],
        'f': ['e'],
        'g': ['e']
    }

    def _verify_order(self, test_graph, test_list):
        for k, v in test_graph.items():
            for dep in v:
                self.assertTrue(test_list.index(k) < test_list.index(dep))

    def test_from_dict(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)

    def test_topological_sort(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = sot.topological_sort()
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph.keys()), len(sorted_list))

    def test_walk(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = []
        for node in sot.walk():
            sorted_list.append(node)
            sot.node_done(node)
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph.keys()), len(sorted_list))

    def test_walk_parallel(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        sorted_list = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) \
                as executor:
            for node in sot.walk(timeout=1):
                executor.submit(test_walker_fn, sot, node, sorted_list)
        self._verify_order(sot.graph, sorted_list)
        self.assertEqual(len(self.test_graph.keys()), len(sorted_list))

    def test_walk_raise(self):
        sot = utils.TinyDAG()
        sot.from_dict(self.test_graph)
        bad_node = 'f'
        with testtools.ExpectedException(exceptions.SDKException):
            for node in sot.walk(timeout=1):
                if node != bad_node:
                    sot.node_done(node)

    def test_add_node_after_edge(self):
        sot = utils.TinyDAG()
        sot.add_node('a')
        sot.add_edge('a', 'b')
        sot.add_node('a')
        self.assertEqual(sot._graph['a'], set('b'))


def test_walker_fn(graph, node, lst):
    lst.append(node)
    graph.node_done(node)

View File

@@ -10,7 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.

import queue
import string
import threading
import time

import six
@@ -190,3 +192,110 @@ def maximum_supported_microversion(adapter, client_maximum):
    result = min(client_max, server_max)

    return discover.version_to_string(result)


class TinyDAG(six.Iterator):
    """Tiny DAG

    Based on Kahn's algorithm; enables parallel visiting of the nodes
    (parallel execution of the workflow items).
    """

    def __init__(self, data=None):
        self._reset()
        self._lock = threading.Lock()
        if data and isinstance(data, dict):
            self.from_dict(data)

    def _reset(self):
        self._graph = dict()
        self._wait_timeout = 120

    @property
    def graph(self):
        """Get graph as adjacency dict"""
        return self._graph

    def add_node(self, node):
        self._graph.setdefault(node, set())

    def add_edge(self, u, v):
        self._graph[u].add(v)

    def from_dict(self, data):
        self._reset()
        for k, v in data.items():
            self.add_node(k)
            for dep in v:
                self.add_edge(k, dep)

    def walk(self, timeout=None):
        """Start the walk from the beginning"""
        if timeout:
            self._wait_timeout = timeout
        return self

    def __iter__(self):
        self._start_traverse()
        return self

    def __next__(self):
        # Start waiting if it is expected to get something
        # (counting down from graph length to 0).
        if self._it_cnt > 0:
            self._it_cnt -= 1
            try:
                res = self._queue.get(
                    block=True,
                    timeout=self._wait_timeout)
                return res
            except queue.Empty:
                raise exceptions.SDKException('Timeout waiting for '
                                              'cleanup task to complete')
        else:
            raise StopIteration()

    def node_done(self, node):
        """Mark node as "processed" and put its followers into the queue"""
        self._done.add(node)

        for v in self._graph[node]:
            self._run_in_degree[v] -= 1
            if self._run_in_degree[v] == 0:
                self._queue.put(v)

    def _start_traverse(self):
        """Initialize graph traversal"""
        self._run_in_degree = self._get_in_degree()
        self._queue = queue.Queue()
        self._done = set()
        self._it_cnt = len(self._graph)

        for k, v in self._run_in_degree.items():
            if v == 0:
                self._queue.put(k)

    def _get_in_degree(self):
        """Calculate the in-degree (count of incoming edges) per node"""
        _in_degree = {u: 0 for u in self._graph.keys()}
        for u in self._graph:
            for v in self._graph[u]:
                _in_degree[v] += 1

        return _in_degree

    def topological_sort(self):
        """Return the graph nodes in topological order"""
        result = []
        for node in self:
            result.append(node)
            self.node_done(node)

        return result

    def size(self):
        return len(self._graph.keys())
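

# Hedged usage sketch (illustrative only, not part of this change): this is
# how project_cleanup and the unit tests above drive TinyDAG. The node names
# are examples; an edge 'a' -> 'b' means "a must be processed before b".
#
#     dag = TinyDAG({'compute': ['network'], 'network': ['identity'],
#                    'identity': []})
#     for node in dag.walk(timeout=60):
#         # ... process the node, possibly in a worker thread ...
#         dag.node_done(node)  # releases the node's dependents
#     # yields: compute, then network, then identity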

View File

@@ -0,0 +1,8 @@
---
features:
  - |
    Add project cleanup functionality. It provides a single method in the
    connection object, which calls the cleanup method of every supported
    service (both those that are part of the SDK itself and any "imported"
    at runtime or through the vendor_hook functionality). Cleanup runs in
    multiple threads where possible (i.e. when there are no dependencies
    between services).

View File

@@ -36,6 +36,7 @@ setenv =
    OS_TEST_TIMEOUT=600
    OPENSTACKSDK_FUNC_TEST_TIMEOUT_LOAD_BALANCER=600
    OPENSTACKSDK_EXAMPLE_CONFIG_KEY=functional
    OPENSTACKSDK_FUNC_TEST_TIMEOUT_PROJECT_CLEANUP=1200
commands = stestr --test-path ./openstack/tests/functional/{env:OPENSTACKSDK_TESTS_SUBDIR:} run --serial {posargs}
    stestr slowest