Implement scrubber util

Scrubber is a utility that cleans up artifacts that have been marked
as deleted when the delayed_delete mechanism is activated. Enabling
the delayed_delete option means that after a delete API call the
artifact changes its status to 'deleted', but its data is not removed
immediately. Later (according to the scrubber settings), the scrubber
detects artifacts with the 'deleted' status and deletes them
completely.

The scrubber can be run either as a periodic background task or once
as a standalone tool.

Implements-blueprint: scrubber

Change-Id: Icada538521be927bf8e42512fa2d0be9c97c73ff
Mike Fedosin 2017-06-15 03:48:39 +03:00
parent ead2d541e4
commit fb60b14159
6 changed files with 465 additions and 5 deletions
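
Illustration (not part of this change): the delayed-delete lifecycle
described in the commit message is a two-step state machine; the delete
call only marks, the scrubber purges later. A toy, self-contained
Python sketch:

# toy in-memory model of the delayed-delete lifecycle (not Glare code)
artifacts = {'art-1': {'status': 'active', 'blobs': ['binary data']}}

def delete(af_id):
    # with delayed_delete on, the delete API call only flips the status
    artifacts[af_id]['status'] = 'deleted'

def scrub():
    # the scrubber later purges everything left in 'deleted' status
    for af_id, af in list(artifacts.items()):
        if af['status'] == 'deleted':
            af['blobs'] = []          # reclaim backend storage
            del artifacts[af_id]      # drop the record itself

delete('art-1')
assert artifacts['art-1']['status'] == 'deleted'   # data still present
scrub()
assert 'art-1' not in artifacts                    # fully gone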

glare/cmd/scrubber.py Normal file

@@ -0,0 +1,73 @@
#!/usr/bin/env python
# Copyright 2017 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Glare Scrub Service
"""
import os
import sys

# If ../glare/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir,
                                                os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'glare', '__init__.py')):
    sys.path.insert(0, possible_topdir)

import eventlet
import glance_store
from oslo_config import cfg
from oslo_log import log as logging

from glare.common import config
from glare import scrubber

eventlet.patcher.monkey_patch(all=False, socket=True, time=True, select=True,
                              thread=True, os=True)

CONF = cfg.CONF
logging.register_options(CONF)
CONF.set_default(name='use_stderr', default=True)


def main():
    CONF.register_cli_opts(scrubber.scrubber_cmd_cli_opts, group='scrubber')
    CONF.register_opts(scrubber.scrubber_cmd_opts, group='scrubber')
    try:
        config.parse_args()
        logging.setup(CONF, 'glare')

        glance_store.register_opts(config.CONF)
        glance_store.create_stores(config.CONF)
        glance_store.verify_default_store()

        app = scrubber.Scrubber()

        if CONF.scrubber.daemon:
            server = scrubber.Daemon(CONF.scrubber.wakeup_time)
            server.start(app)
            server.wait()
        else:
            app.run()
    except RuntimeError as e:
        sys.exit("ERROR: %s" % e)


if __name__ == '__main__':
    main()
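
Illustration (not part of this change): register_cli_opts above is what
exposes the daemon switch on the command line. With oslo.config, an
option registered under the ``scrubber`` group surfaces as
``--scrubber-daemon`` (short form ``-D``); the exact flag spelling here
is an assumption based on oslo.config's group-prefix convention. A
minimal standalone sketch:

from oslo_config import cfg

conf = cfg.ConfigOpts()   # a private ConfigOpts, separate from glare's CONF
conf.register_cli_opts(
    [cfg.BoolOpt('daemon', short='D', default=False)], group='scrubber')

conf(args=['--scrubber-daemon'])   # assumed flag; short form: ['-D']
print(conf.scrubber.daemon)        # -> True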

glare/opts.py

@@ -30,6 +30,7 @@ import glare.common.wsgi
 import glare.notification
 import glare.objects.base
 import glare.objects.meta.registry
+import glare.scrubber

 _artifacts_opts = [
     (None, list(itertools.chain(
@@ -45,7 +46,11 @@ _artifacts_opts = [
         glare.objects.meta.registry.registry_options))),
     profiler.list_opts()[0],
     ('paste_deploy', glare.common.config.paste_deploy_opts),
-    ('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts)
+    ('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts),
+    ('scrubber',
+     glare.scrubber.scrubber_opts +
+     glare.scrubber.scrubber_cmd_opts +
+     glare.scrubber.scrubber_cmd_cli_opts)
 ]
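
Illustration (not part of this change): each tuple above pairs a config
group with its option list so oslo-config-generator can emit a sample
config file (setup.cfg wires this up via the oslo.config.opts entry
point). A minimal sketch of the same (group, opts) shape, with
hypothetical options:

import itertools

from oslo_config import cfg

scrubber_opts = [cfg.IntOpt('scrub_time', default=0, min=0)]
scrubber_cmd_opts = [cfg.IntOpt('wakeup_time', default=300, min=0)]

def list_opts():
    # the (group, opts) pairs the generator consumes
    return [('scrubber', itertools.chain(scrubber_opts,
                                         scrubber_cmd_opts))]

for group, opts in list_opts():
    for opt in opts:
        print(group, opt.name, opt.default)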

glare/scrubber.py Normal file

@@ -0,0 +1,171 @@
# Copyright 2017 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from oslo_config import cfg
from oslo_log import log as logging

from glare.api.middleware import context
from glare.common import exception
from glare.common import store_api
from glare.db.sqlalchemy import api as db_api
from glare.i18n import _

LOG = logging.getLogger(__name__)

scrubber_opts = [
    cfg.IntOpt('scrub_time', default=0, min=0,
               help=_("""
The amount of time, in seconds, to delay artifact scrubbing.

When delayed delete is turned on, an artifact is put into ``deleted``
state upon deletion until the scrubber deletes its data. Typically,
soon after the artifact is put into ``deleted`` state, it is available
for scrubbing. However, scrubbing can be delayed until a later point
using this configuration option. This option denotes the time period
an artifact spends in ``deleted`` state before it is available for
scrubbing.

It is important to realize that this has storage implications. The
larger the ``scrub_time``, the longer the time to reclaim backend
storage from deleted artifacts.

Possible values:
    * Any non-negative integer

Related options:
    * ``delayed_delete``
""")),
    cfg.IntOpt('scrub_pool_size', default=1, min=1,
               help=_("""
The size of thread pool to be used for scrubbing artifacts.

When there are a large number of artifacts to scrub, it is beneficial
to scrub artifacts in parallel so that the scrub queue stays in control
and the backend storage is reclaimed in a timely fashion. This
configuration option denotes the maximum number of artifacts to be
scrubbed in parallel. The default value is one, which signifies serial
scrubbing. Any value above one indicates parallel scrubbing.

Possible values:
    * Any non-zero positive integer

Related options:
    * ``delayed_delete``
""")),
]
scrubber_cmd_opts = [
    cfg.IntOpt('wakeup_time', default=300, min=0,
               help=_("""
Time interval, in seconds, between scrubber runs in daemon mode.

Scrubber can be run either as a cron job or daemon. When run as a
daemon, this configuration time specifies the time period between two
runs. When the scrubber wakes up, it fetches and scrubs all ``deleted``
artifacts that are available for scrubbing after taking ``scrub_time``
into consideration.

If the ``wakeup_time`` is set to a large number, there may be a large
number of artifacts to be scrubbed for each run. Also, this impacts how
quickly the backend storage is reclaimed.

Possible values:
    * Any non-negative integer

Related options:
    * ``daemon``
    * ``delayed_delete``
"""))
]

scrubber_cmd_cli_opts = [
    cfg.BoolOpt('daemon',
                short='D',
                default=False,
                help=_("""
Run scrubber as a daemon.

This boolean configuration option indicates whether scrubber should
run as a long-running process that wakes up at regular intervals to
scrub artifacts. The wake up interval can be specified using the
configuration option ``wakeup_time``.

If this configuration option is set to ``False``, which is the default
value, scrubber runs once to scrub artifacts and exits. In this case,
if the operator wishes to implement continuous scrubbing of artifacts,
scrubber needs to be scheduled as a cron job.

Possible values:
    * True
    * False

Related options:
    * ``wakeup_time``
"""))
]
CONF = cfg.CONF
CONF.register_opts(scrubber_opts, group='scrubber')


class Daemon(object):
    def __init__(self, wakeup_time=300, threads=100):
        LOG.info("Starting Daemon: wakeup_time=%(wakeup_time)s "
                 "threads=%(threads)s",
                 {'wakeup_time': wakeup_time, 'threads': threads})
        self.wakeup_time = wakeup_time
        self.event = eventlet.event.Event()
        # This pool is used for periodic instantiation of scrubber
        self.daemon_pool = eventlet.greenpool.GreenPool(threads)

    def start(self, application):
        self._run(application)

    def wait(self):
        try:
            self.event.wait()
        except KeyboardInterrupt:
            LOG.info("Daemon Shutdown on KeyboardInterrupt")

    def _run(self, application):
        LOG.debug("Running scrubber application")
        self.daemon_pool.spawn_n(application.run, self.event)
        eventlet.spawn_after(self.wakeup_time, self._run, application)
        LOG.debug("Next run scheduled in %s seconds", self.wakeup_time)
class Scrubber(object):
    def __init__(self):
        self.context = context.RequestContext()
        self.context.is_admin = True
        self.pool = eventlet.greenpool.GreenPool(
            CONF.scrubber.scrub_pool_size)

    def run(self, event=None):
        while True:
            artifacts = db_api._get_all(
                context=self.context,
                session=db_api.get_session(),
                limit=CONF.scrubber.scrub_pool_size,
                sort=[],
                filters=[('status', None, 'eq', None, 'deleted')])
            if not artifacts:
                break
            # consume the iterator so the whole batch is scrubbed before
            # the next db query
            list(self.pool.imap(self._scrub_artifact, artifacts))

    def _scrub_artifact(self, af):
        LOG.info("Begin scrubbing of artifact %s", af.id)
        for blob in af.blobs:
            if not blob.external:
                try:
                    store_api.delete_blob(blob.url, context=self.context)
                except exception.NotFound:
                    # data has already been removed
                    pass
        LOG.info("Blobs successfully deleted for artifact %s", af.id)
        # delete the artifact itself
        db_api.delete(self.context, af.id, db_api.get_session())
        LOG.info("Artifact %s was scrubbed", af.id)

glare/tests/functional/__init__.py

@@ -336,6 +336,56 @@ paste.filter_factory =
 """


+class ScrubberDaemon(Server):
+    """
+    Server object that starts/stops/manages the Scrubber server
+    """
+
+    def __init__(self, test_dir, policy_file, daemon=False, **kwargs):
+        # NOTE(jkoelker): Set the port to 0 since we actually don't listen
+        super(ScrubberDaemon, self).__init__(test_dir, 0)
+        self.server_name = 'scrubber'
+        self.server_module = 'glare.cmd.%s' % self.server_name
+        self.daemon = daemon
+
+        self.blob_dir = os.path.join(self.test_dir, "artifacts")
+        self.scrub_time = 5
+        self.pid_file = os.path.join(self.test_dir, "scrubber.pid")
+        self.log_file = os.path.join(self.test_dir, "scrubber.log")
+        self.lock_path = self.test_dir
+
+        default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
+        self.sql_connection = os.environ.get('GLARE_TEST_SQL_CONNECTION',
+                                             default_sql_connection)
+        self.policy_file = policy_file
+        self.policy_default_rule = 'default'
+
+        self.conf_base = """[DEFAULT]
+debug = %(debug)s
+log_file = %(log_file)s
+[scrubber]
+daemon = %(daemon)s
+wakeup_time = 2
+scrub_time = %(scrub_time)s
+[glance_store]
+filesystem_store_datadir=%(blob_dir)s
+[oslo_policy]
+policy_file = %(policy_file)s
+policy_default_rule = %(policy_default_rule)s
+[database]
+connection = %(sql_connection)s
+idle_timeout = 3600
+"""
+
+    def start(self, expect_exit=True, expected_exitcode=0, **kwargs):
+        if 'daemon' in kwargs:
+            expect_exit = False
+        return super(ScrubberDaemon, self).start(
+            expect_exit=expect_exit,
+            expected_exitcode=expected_exitcode,
+            **kwargs)
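
Illustration (not part of this change): conf_base above is an old-style
%-template; the test harness fills it from the server's attribute dict
before writing the config file. A standalone sketch with hypothetical
values:

conf_base = """[scrubber]
daemon = %(daemon)s
wakeup_time = 2
scrub_time = %(scrub_time)s
"""

values = {'daemon': True, 'scrub_time': 5}   # hypothetical attribute dict
print(conf_base % values)
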
class FunctionalTest(test_utils.BaseTestCase):

    """Base test class for any test that wants to test the actual
@@ -353,6 +403,8 @@ class FunctionalTest(test_utils.BaseTestCase):
         self.api_protocol = 'http'
         self.glare_port, glare_sock = test_utils.get_unused_port_and_socket()

+        self.include_scrubber = False
+
         self.tracecmd = tracecmd_osmap.get(platform.system())

         conf_dir = os.path.join(self.test_dir, 'etc')
@@ -365,7 +417,10 @@ class FunctionalTest(test_utils.BaseTestCase):
             self.policy_file,
             sock=glare_sock)
-        self.pid_files = [self.glare_server.pid_file]
+        self.scrubber_daemon = ScrubberDaemon(self.test_dir, self.policy_file)
+
+        self.pid_files = [self.glare_server.pid_file,
+                          self.scrubber_daemon.pid_file]
         self.files_to_destroy = []
         self.launched_servers = []
@@ -379,6 +434,7 @@ class FunctionalTest(test_utils.BaseTestCase):
         super(FunctionalTest, self).tearDown()

         self.glare_server.dump_log('glare_server')
+        self.scrubber_daemon.dump_log('scrubber_daemon')

     def set_policy_rules(self, rules):
         with open(self.policy_file, 'w') as fap:
@@ -423,7 +479,8 @@ class FunctionalTest(test_utils.BaseTestCase):
         # server is dead. This eliminates the possibility of a race
         # between a child process listening on a port actually dying
         # and a new process being started
-        servers = [self.glare_server]
+        servers = [self.glare_server,
+                   self.scrubber_daemon]
         for s in servers:
             try:
                 s.stop()
@@ -510,6 +567,12 @@ class FunctionalTest(test_utils.BaseTestCase):
         self.start_with_retry(self.glare_server, 'glare_port', 3, **kwargs)

+        if self.include_scrubber:
+            exitcode, out, err = self.scrubber_daemon.start(**kwargs)
+            self.assertEqual(0, exitcode,
+                             "Failed to spin up the Scrubber daemon. "
+                             "Got: %s" % err)
+
     def ping_server(self, port):
         """Simple ping on the port. If responsive, return True, else
         return False.
@@ -580,7 +643,7 @@ class FunctionalTest(test_utils.BaseTestCase):
         return msg if expect_launch else None

-    def stop_server(self, server, name):
+    def stop_server(self, server):
         """Called to stop a single server in a normal fashion.

         :param server: the server to stop
@@ -589,7 +652,10 @@ class FunctionalTest(test_utils.BaseTestCase):
         server.stop()

     def stop_servers(self):
-        self.stop_server(self.glare_server, 'Glare server')
+        self.stop_server(self.glare_server)
+        if self.include_scrubber:
+            self.stop_server(self.scrubber_daemon)
+
         self._reset_database(self.glare_server.sql_connection)

glare/tests/functional/test_scrubber.py Normal file

@@ -0,0 +1,144 @@
# Copyright 2017 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time

from oslo_serialization import jsonutils
from six.moves import range

from glare.tests import functional
from glare.tests.functional import base
from glare.tests.utils import execute


class TestScrubber(base.TestArtifact):
    """Test that delayed_delete works and the scrubber deletes"""

    def setUp(self):
        functional.FunctionalTest.setUp(self)
        self.include_scrubber = True
        self.set_user('user1')
        self.glare_server.deployment_flavor = 'noauth'
        self.glare_server.enabled_artifact_types = ','.join(
            self.enabled_types)
        self.glare_server.custom_artifact_types_modules = (
            'glare.tests.sample_artifact')

    def _create_sample_artifact(self):
        art = self.create_artifact({'name': 'test_art',
                                    'version': '1.0'})
        url = '/sample_artifact/%s' % art['id']
        headers = {'Content-Type': 'application/octet-stream'}
        # upload data to blob
        self.put(url=url + '/small_blob', data='aaaaaa',
                 headers=headers)
        # upload a couple of blobs to dict_of_blobs
        self.put(url + '/dict_of_blobs/blob1', data='bbbb',
                 headers=headers)
        self.put(url + '/dict_of_blobs/blob2', data='cccc',
                 headers=headers)
        # add external location
        body = jsonutils.dumps(
            {'url': 'https://www.apache.org/licenses/LICENSE-2.0.txt',
             'md5': "fake", 'sha1': "fake_sha", "sha256": "fake_sha256"})
        headers = {'Content-Type':
                   'application/vnd+openstack.glare-custom-location+json'}
        self.put(url=url + '/blob', data=body, status=200,
                 headers=headers)
        return url
    def test_scrubber_delayed_delete(self):
        """
        Test that artifacts don't get deleted immediately and that the
        scrubber scrubs them.
        """
        self.start_servers(delayed_delete=True, daemon=True,
                           **self.__dict__.copy())
        url = self._create_sample_artifact()

        # create another artifact
        art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'})

        # delete sample artifact
        self.delete(url=url)
        art = self.get(url)
        self.assertEqual('deleted', art['status'])

        self.wait_for_scrub(url)

        # check that the second artifact wasn't removed
        art = self.get('/sample_artifact/%s' % art2['id'])
        self.assertEqual('drafted', art['status'])
    def test_scrubber_app(self):
        """
        Test that the scrubber script runs successfully when not in
        daemon mode.
        """
        self.start_servers(delayed_delete=True,
                           **self.__dict__.copy())
        url = self._create_sample_artifact()

        # wait for the scrub time on the artifacts to pass
        time.sleep(self.scrubber_daemon.scrub_time)

        # create another artifact
        art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'})

        # delete sample artifact
        self.delete(url=url)
        art = self.get(url)
        self.assertEqual('deleted', art['status'])

        # scrub artifacts and make sure they are deleted
        exe_cmd = "%s -m glare.cmd.scrubber" % sys.executable
        cmd = ("%s --config-file %s" %
               (exe_cmd, self.scrubber_daemon.conf_file_name))
        exitcode, out, err = execute(cmd, raise_error=False)
        self.assertEqual(0, exitcode)

        self.wait_for_scrub(url)

        # check that the second artifact wasn't removed
        art = self.get('/sample_artifact/%s' % art2['id'])
        self.assertEqual('drafted', art['status'])
    def wait_for_scrub(self, url):
        """
        The build servers sometimes take longer than 15 seconds
        to scrub. Give it up to 5 min, checking every 5 seconds.
        When/if the artifact is gone, bail immediately.
        """
        wait_for = 300  # seconds
        check_every = 5  # seconds
        for _ in range(wait_for // check_every):
            time.sleep(check_every)
            try:
                self.get(url, status=404)
                return
            except Exception:
                pass
        else:
            self.fail("Artifact wasn't scrubbed")

setup.cfg

@@ -31,6 +31,7 @@ data_files =
 console_scripts =
     glare-api = glare.cmd.api:main
     glare-db-manage = glare.cmd.db_manage:main
+    glare-scrubber = glare.cmd.scrubber:main

 oslo.config.opts =
     glare = glare.opts:list_artifacts_opts

 oslo.policy.enforcer =