Merge "Add support for filters into the project cleanup"

This commit is contained in:
Zuul 2020-08-11 22:53:33 +00:00 committed by Gerrit Code Review
commit ff2729ef2f
9 changed files with 548 additions and 73 deletions

View File

@ -421,23 +421,45 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
}
}
def _service_cleanup(self, dry_run=True, status_queue=None):
def _service_cleanup(self, dry_run=True, client_status_queue=None,
identified_resources=None,
filters=None, resource_evaluation_fn=None):
if self._connection.has_service('object-store'):
# Volume backups require object-store to be available, even for
# listing
backups = []
for obj in self.backups(details=False):
if status_queue:
status_queue.put(obj)
if not dry_run:
self.delete_backup(obj)
need_delete = self._service_cleanup_del_res(
self.delete_backup,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
if not dry_run and need_delete:
backups.append(obj)
# Before deleting snapshots need to wait for backups to be deleted
for obj in backups:
try:
self.wait_for_delete(obj)
except exceptions.SDKException:
# Well, did our best, still try further
pass
snapshots = []
for obj in self.snapshots(details=False):
if status_queue:
need_delete = self._service_cleanup_del_res(
self.delete_snapshot,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
if not dry_run and need_delete:
snapshots.append(obj)
status_queue.put(obj)
if not dry_run:
self.delete_snapshot(obj)
# Before deleting volumes need to wait for snapshots to be deleted
for obj in snapshots:
@ -447,8 +469,12 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
# Well, did our best, still try further
pass
for obj in self.volumes(details=False):
if status_queue:
status_queue.put(obj)
if not dry_run:
self.delete_volume(obj)
for obj in self.volumes(details=True):
self._service_cleanup_del_res(
self.delete_volume,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)

View File

@ -751,8 +751,14 @@ class _OpenStackCloudMixin:
else:
return False
def project_cleanup(self, dry_run=True,
wait_timeout=120, status_queue=None):
def project_cleanup(
self,
dry_run=True,
wait_timeout=120,
status_queue=None,
filters=None,
resource_evaluation_fn=None
):
"""Cleanup the project resources.
Cleanup all resources in all services, which provide cleanup methods.
@ -762,6 +768,12 @@ class _OpenStackCloudMixin:
to comlete the cleanup.
:param queue status_queue: a threading queue object used to get current
process status. The queue contain processed resources.
:param dict filters: Additional filters for the cleanup (only resources
matching all filters will be deleted, if there are no other
dependencies).
:param resource_evaluation_fn: A callback function, which will be
invoked for each resurce and must return True/False depending on
whether resource need to be deleted or not.
"""
dependencies = {}
get_dep_fn_name = '_get_cleanup_dependencies'
@ -771,9 +783,11 @@ class _OpenStackCloudMixin:
for service in self.config.get_enabled_services():
if hasattr(self, service):
proxy = getattr(self, service)
if (proxy
and hasattr(proxy, get_dep_fn_name)
and hasattr(proxy, cleanup_fn_name)):
if (
proxy
and hasattr(proxy, get_dep_fn_name)
and hasattr(proxy, cleanup_fn_name)
):
deps = getattr(proxy, get_dep_fn_name)()
if deps:
dependencies.update(deps)
@ -783,6 +797,10 @@ class _OpenStackCloudMixin:
for dep in v['before']:
dep_graph.add_node(dep)
dep_graph.add_edge(k, dep)
for dep in v.get('after', []):
dep_graph.add_edge(dep, k)
cleanup_resources = dict()
for service in dep_graph.walk(timeout=wait_timeout):
fn = None
@ -790,11 +808,18 @@ class _OpenStackCloudMixin:
proxy = getattr(self, service)
cleanup_fn = getattr(proxy, cleanup_fn_name, None)
if cleanup_fn:
fn = functools.partial(cleanup_fn, dry_run=dry_run,
status_queue=status_queue)
fn = functools.partial(
cleanup_fn,
dry_run=dry_run,
client_status_queue=status_queue,
identified_resources=cleanup_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn
)
if fn:
self._pool_executor.submit(cleanup_task, dep_graph,
service, fn)
self._pool_executor.submit(
cleanup_task, dep_graph, service, fn
)
else:
dep_graph.node_done(service)
@ -807,5 +832,10 @@ class _OpenStackCloudMixin:
def cleanup_task(graph, service, fn):
fn()
graph.node_done(service)
try:
fn()
except Exception:
log = _log.setup_logging('openstack.project_cleanup')
log.exception('Error in the %s cleanup function' % service)
finally:
graph.node_done(service)

View File

@ -1495,9 +1495,28 @@ class Proxy(proxy.Proxy):
}
}
def _service_cleanup(self, dry_run=True, status_queue=None):
for obj in self.servers(details=False):
self._service_cleanup_del_res(self.delete_server,
obj,
dry_run,
status_queue)
def _service_cleanup(self, dry_run=True, client_status_queue=None,
identified_resources=None,
filters=None, resource_evaluation_fn=None):
servers = []
for obj in self.servers():
need_delete = self._service_cleanup_del_res(
self.delete_server,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
if not dry_run and need_delete:
# In the dry run we identified, that server will go. To propely
# identify consequences we need to tell others, that the port
# will disappear as well
for port in self._connection.network.ports(device_id=obj.id):
identified_resources[port.id] = port
servers.append(obj)
# We actually need to wait for servers to really disappear, since they
# might be still holding ports on the subnet
for server in servers:
self.wait_for_delete(server)

View File

@ -520,5 +520,7 @@ class Proxy(proxy.Proxy):
}
}
def _service_cleanup(self, dry_run=True, status_queue=False):
def _service_cleanup(self, dry_run=True, client_status_queue=False,
identified_resources=None,
filters=None, resource_evaluation_fn=None):
pass

View File

@ -4014,22 +4014,81 @@ class Proxy(proxy.Proxy):
}
}
def _service_cleanup(self, dry_run=True, status_queue=None):
for obj in self.ips():
self._service_cleanup_del_res(self.delete_ip, obj, dry_run,
status_queue)
def _service_cleanup(self, dry_run=True, client_status_queue=None,
identified_resources=None,
filters=None, resource_evaluation_fn=None):
project_id = self.get_project_id()
# Delete floating_ips in the project if no filters defined OR all
# filters are matching and port_id is empty
for obj in self.ips(project_id=project_id):
self._service_cleanup_del_res(
self.delete_ip,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=fip_cleanup_evaluation)
for obj in self.security_groups():
# Delete (try to delete) all security groups in the project
# Let's hope we can't drop SG in use
for obj in self.security_groups(project_id=project_id):
if obj.name != 'default':
self._service_cleanup_del_res(
self.delete_security_group, obj,
dry_run, status_queue)
self.delete_security_group,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
for port in self.ports():
if port.device_owner in ['network:router_interface',
'network:router_interface_distributed']:
if status_queue:
status_queue.put(obj)
# Networks are crazy, try to delete router+net+subnet
# if there are no "other" ports allocated on the net
for net in self.networks(project_id=project_id):
network_has_ports_allocated = False
router_if = list()
for port in self.ports(
project_id=project_id,
network_id=net.id
):
self.log.debug('Looking at port %s' % port)
if port.device_owner in [
'network:router_interface',
'network:router_interface_distributed'
]:
router_if.append(port)
elif port.device_owner == 'network:dhcp':
# we don't treat DHCP as a real port
continue
elif (
identified_resources
and port.device_id not in identified_resources
):
# It seems some no other service identified this resource
# to be deleted. We can assume it doesn't count
network_has_ports_allocated = True
if network_has_ports_allocated:
# If some ports are on net - we cannot delete it
continue
self.log.debug('Network %s should be deleted' % net)
# __Check__ if we need to drop network according to filters
network_must_be_deleted = self._service_cleanup_del_res(
self.delete_network,
net,
dry_run=True,
client_status_queue=None,
identified_resources=None,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
if not network_must_be_deleted:
# If not - check another net
continue
# otherwise disconnect router, drop net, subnet, router
# Disconnect
for port in router_if:
if client_status_queue:
client_status_queue.put(port)
if not dry_run:
try:
self.remove_interface_from_router(
@ -4037,18 +4096,72 @@ class Proxy(proxy.Proxy):
port_id=port.id)
except exceptions.SDKException:
self.log.error('Cannot delete object %s' % obj)
# router disconnected, drop it
self._service_cleanup_del_res(
self.delete_router,
self.get_router(port.device_id),
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=None,
resource_evaluation_fn=None)
# Drop all subnets in the net (no further conditions)
for obj in self.subnets(
project_id=project_id,
network_id=net.id
):
self._service_cleanup_del_res(
self.delete_subnet,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=None,
resource_evaluation_fn=None)
# And now the network itself (we are here definitely only if we
# need that)
self._service_cleanup_del_res(
self.delete_network,
net,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=None,
resource_evaluation_fn=None)
# It might happen, that we have routers not attached to anything
for obj in self.routers():
self._service_cleanup_del_res(
self.delete_router, obj,
dry_run, status_queue)
ports = list(self.ports(device_id=obj.id))
if len(ports) == 0:
self._service_cleanup_del_res(
self.delete_router,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=None,
resource_evaluation_fn=None)
for obj in self.subnets():
self._service_cleanup_del_res(
self.delete_subnet, obj,
dry_run, status_queue)
for obj in self.networks():
self._service_cleanup_del_res(
self.delete_network, obj,
dry_run, status_queue)
def fip_cleanup_evaluation(obj, identified_resources=None, filters=None):
"""Determine whether Floating IP should be deleted
:param Resource obj: Floating IP object
:param dict identified_resources: Optional dictionary with resources
identified by other services for deletion.
:param dict filters: dictionary with parameters
"""
if (
filters is not None
and (
obj.port_id is not None
and identified_resources
and obj.port_id not in identified_resources
)
):
# If filters are set, but port is not empty and will not be empty -
# skip
return False
else:
return True

View File

@ -487,13 +487,21 @@ class Proxy(proxy.Proxy):
}
}
def _service_cleanup(self, dry_run=True, status_queue=None):
def _service_cleanup(self, dry_run=True, client_status_queue=None,
identified_resources=None,
filters=None, resource_evaluation_fn=None):
stacks = []
for obj in self.stacks():
stacks.append(obj)
self._project_cleanup_del_res(
self.delete_stack, obj,
dry_run, status_queue)
need_delete = self._service_cleanup_del_res(
self.delete_stack,
obj,
dry_run=dry_run,
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
if not dry_run and need_delete:
stacks.append(obj)
for stack in stacks:
self.wait_for_delete(stack)

View File

@ -15,6 +15,7 @@ try:
JSONDecodeError = simplejson.scanner.JSONDecodeError
except ImportError:
JSONDecodeError = ValueError
import iso8601
import urllib
from keystoneauth1 import adapter
@ -565,18 +566,79 @@ class Proxy(adapter.Adapter):
def _get_cleanup_dependencies(self):
return None
def _service_cleanup(self, dry_run=True, status_queue=None):
def _service_cleanup(self, dry_run=True, client_status_queue=None,
identified_resources=None, filters=None,
resource_evaluation_fn=None):
return None
def _service_cleanup_del_res(self, del_fn, obj, dry_run=True,
status_queue=None):
if status_queue:
status_queue.put(obj)
if not dry_run:
try:
del_fn(obj)
except exceptions.SDKException as e:
self.log.error('Cannot delete resource %s: %s', obj, str(e))
client_status_queue=None,
identified_resources=None,
filters=None,
resource_evaluation_fn=None):
need_delete = False
try:
if (
resource_evaluation_fn
and callable(resource_evaluation_fn)
):
# Ask a user-provided evaluation function if we need to delete
# the resource
need_del = resource_evaluation_fn(obj, filters,
identified_resources)
if isinstance(need_del, bool):
# Just double check function returned bool
need_delete = need_del
else:
need_delete = \
self._service_cleanup_resource_filters_evaluation(
obj,
filters=filters)
if need_delete:
if client_status_queue:
# Put into queue for client status info
client_status_queue.put(obj)
if identified_resources is not None:
# Put into internal dict shared between threads so that
# other services might know which other resources were
# identified
identified_resources[obj.id] = obj
if not dry_run:
del_fn(obj)
except Exception as e:
self.log.exception('Cannot delete resource %s: %s', obj, str(e))
return need_delete
def _service_cleanup_resource_filters_evaluation(self, obj, filters=None):
part_cond = []
if filters is not None and isinstance(filters, dict):
for k, v in filters.items():
try:
res_val = None
if k == 'created_at' and hasattr(obj, 'created_at'):
res_val = getattr(obj, 'created_at')
if k == 'updated_at' and hasattr(obj, 'updated_at'):
res_val = getattr(obj, 'updated_at')
if res_val:
res_date = iso8601.parse_date(res_val)
cmp_date = iso8601.parse_date(v)
if res_date and cmp_date and res_date <= cmp_date:
part_cond.append(True)
else:
part_cond.append(False)
else:
# There are filters set, but we can't get required
# attribute, so skip the resource
self.log.debug('Requested cleanup attribute %s is not '
'available on the resource' % k)
part_cond.append(False)
except Exception:
self.log.exception('Error during condition evaluation')
if all(part_cond):
return True
else:
return False
def _json_response(response, result_key=None, error_message=None):

View File

@ -48,9 +48,46 @@ class TestProjectCleanup(base.BaseFunctionalTest):
self.router.id,
subnet_id=self.subnet.id)
def _test_cleanup(self):
def test_cleanup(self):
self._create_network_resources()
status_queue = queue.Queue()
# First round - check no resources are old enough
self.conn.project_cleanup(
dry_run=True,
wait_timeout=120,
status_queue=status_queue,
filters={'created_at': '2000-01-01'})
self.assertTrue(status_queue.empty())
# Second round - resource evaluation function return false, ensure
# nothing identified
self.conn.project_cleanup(
dry_run=True,
wait_timeout=120,
status_queue=status_queue,
filters={'created_at': '2200-01-01'},
resource_evaluation_fn=lambda x, y, z: False)
self.assertTrue(status_queue.empty())
# Third round - filters set too low
self.conn.project_cleanup(
dry_run=True,
wait_timeout=120,
status_queue=status_queue,
filters={'created_at': '2200-01-01'})
objects = []
while not status_queue.empty():
objects.append(status_queue.get())
# At least known networks should be identified
net_names = list(obj.name for obj in objects)
self.assertIn(self.network_name, net_names)
# Fourth round - dry run with no filters, ensure everything identified
self.conn.project_cleanup(
dry_run=True,
wait_timeout=120,
@ -67,6 +104,7 @@ class TestProjectCleanup(base.BaseFunctionalTest):
net = self.conn.network.get_network(self.net.id)
self.assertEqual(net.name, self.net.name)
# Last round - do a real cleanup
self.conn.project_cleanup(
dry_run=False,
wait_timeout=600,
@ -81,6 +119,3 @@ class TestProjectCleanup(base.BaseFunctionalTest):
# Since we might not have enough privs to drop all nets - ensure
# we do not have our known one
self.assertNotIn(self.network_name, net_names)
def test_cleanup(self):
self._test_cleanup()

View File

@ -14,6 +14,7 @@ from testscenarios import load_tests_apply_scenarios as load_tests # noqa
from unittest import mock
import munch
import queue
from openstack import exceptions
from openstack import proxy
@ -518,3 +519,182 @@ class TestExtractName(base.TestCase):
results = proxy.Proxy(mock.Mock())._extract_name(self.url)
self.assertEqual(self.parts, results)
class TestProxyCleanup(base.TestCase):
def setUp(self):
super(TestProxyCleanup, self).setUp()
self.session = mock.Mock()
self.session._sdk_connection = self.cloud
self.fake_id = 1
self.fake_name = "fake_name"
self.fake_result = "fake_result"
self.res = mock.Mock(spec=resource.Resource)
self.res.id = self.fake_id
self.res.created_at = '2020-01-02T03:04:05'
self.res.updated_at = '2020-01-03T03:04:05'
self.res_no_updated = mock.Mock(spec=resource.Resource)
self.res_no_updated.created_at = '2020-01-02T03:04:05'
self.sot = proxy.Proxy(self.session)
self.delete_mock = mock.Mock()
def test_filters_evaluation_created_at(self):
self.assertTrue(
self.sot._service_cleanup_resource_filters_evaluation(
self.res,
filters={
'created_at': '2020-02-03T00:00:00'
}
)
)
def test_filters_evaluation_created_at_not(self):
self.assertFalse(
self.sot._service_cleanup_resource_filters_evaluation(
self.res,
filters={
'created_at': '2020-01-01T00:00:00'
}
)
)
def test_filters_evaluation_updated_at(self):
self.assertTrue(
self.sot._service_cleanup_resource_filters_evaluation(
self.res,
filters={
'updated_at': '2020-02-03T00:00:00'
}
)
)
def test_filters_evaluation_updated_at_not(self):
self.assertFalse(
self.sot._service_cleanup_resource_filters_evaluation(
self.res,
filters={
'updated_at': '2020-01-01T00:00:00'
}
)
)
def test_filters_evaluation_updated_at_missing(self):
self.assertFalse(
self.sot._service_cleanup_resource_filters_evaluation(
self.res_no_updated,
filters={
'updated_at': '2020-01-01T00:00:00'
}
)
)
def test_filters_empty(self):
self.assertTrue(
self.sot._service_cleanup_resource_filters_evaluation(
self.res_no_updated
)
)
def test_service_cleanup_dry_run(self):
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=True
)
)
self.delete_mock.assert_not_called()
def test_service_cleanup_dry_run_default(self):
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res
)
)
self.delete_mock.assert_not_called()
def test_service_cleanup_real_run(self):
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
)
)
self.delete_mock.assert_called_with(self.res)
def test_service_cleanup_real_run_identified_resources(self):
rd = dict()
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
identified_resources=rd
)
)
self.delete_mock.assert_called_with(self.res)
self.assertEqual(self.res, rd[self.res.id])
def test_service_cleanup_resource_evaluation_false(self):
self.assertFalse(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
resource_evaluation_fn=lambda x, y, z: False
)
)
self.delete_mock.assert_not_called()
def test_service_cleanup_resource_evaluation_true(self):
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
resource_evaluation_fn=lambda x, y, z: True
)
)
self.delete_mock.assert_called()
def test_service_cleanup_resource_evaluation_override_filters(self):
self.assertFalse(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
resource_evaluation_fn=lambda x, y, z: False,
filters={'created_at': '2200-01-01'}
)
)
def test_service_cleanup_filters(self):
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
filters={'created_at': '2200-01-01'}
)
)
self.delete_mock.assert_called()
def test_service_cleanup_queue(self):
q = queue.Queue()
self.assertTrue(
self.sot._service_cleanup_del_res(
self.delete_mock,
self.res,
dry_run=False,
client_status_queue=q,
filters={'created_at': '2200-01-01'}
)
)
self.assertEqual(self.res, q.get_nowait())