diff --git a/glare/cmd/scrubber.py b/glare/cmd/scrubber.py new file mode 100644 index 0000000..693ecfc --- /dev/null +++ b/glare/cmd/scrubber.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +# Copyright 2017 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Glare Scrub Service +""" + +import os +import sys + +# If ../glare/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(possible_topdir, 'glare', '__init__.py')): + sys.path.insert(0, possible_topdir) +import eventlet + +import glance_store +from oslo_config import cfg +from oslo_log import log as logging + +from glare.common import config +from glare import scrubber + +eventlet.patcher.monkey_patch(all=False, socket=True, time=True, select=True, + thread=True, os=True) + +CONF = cfg.CONF +logging.register_options(CONF) +CONF.set_default(name='use_stderr', default=True) + + +def main(): + CONF.register_cli_opts(scrubber.scrubber_cmd_cli_opts, group='scrubber') + CONF.register_opts(scrubber.scrubber_cmd_opts, group='scrubber') + + try: + config.parse_args() + logging.setup(CONF, 'glare') + + glance_store.register_opts(config.CONF) + glance_store.create_stores(config.CONF) + glance_store.verify_default_store() + + app = scrubber.Scrubber() + + if CONF.scrubber.daemon: + server = 
scrubber.Daemon(CONF.scrubber.wakeup_time) + server.start(app) + server.wait() + else: + app.run() + except RuntimeError as e: + sys.exit("ERROR: %s" % e) + + +if __name__ == '__main__': + main() diff --git a/glare/opts.py b/glare/opts.py index 0530384..83bfd8b 100644 --- a/glare/opts.py +++ b/glare/opts.py @@ -30,6 +30,7 @@ import glare.common.wsgi import glare.notification import glare.objects.base import glare.objects.meta.registry +import glare.scrubber _artifacts_opts = [ (None, list(itertools.chain( @@ -45,7 +46,11 @@ _artifacts_opts = [ glare.objects.meta.registry.registry_options))), profiler.list_opts()[0], ('paste_deploy', glare.common.config.paste_deploy_opts), - ('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts) + ('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts), + ('scrubber', + glare.scrubber.scrubber_opts + + glare.scrubber.scrubber_cmd_opts + + glare.scrubber.scrubber_cmd_cli_opts) ] diff --git a/glare/scrubber.py b/glare/scrubber.py new file mode 100644 index 0000000..52d81b4 --- /dev/null +++ b/glare/scrubber.py @@ -0,0 +1,171 @@ +# Copyright 2017 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import eventlet +from oslo_config import cfg +from oslo_log import log as logging + +from glare.api.middleware import context +from glare.common import exception +from glare.common import store_api +from glare.db.sqlalchemy import api as db_api +from glare.i18n import _ + +LOG = logging.getLogger(__name__) + +scrubber_opts = [ + cfg.IntOpt('scrub_time', default=0, min=0, + help=_(""" +The amount of time, in seconds, to delay artifact scrubbing. +When delayed delete is turned on, an artifact is put into +``deleted`` state upon deletion until the scrubber deletes its data. +Typically, soon +after the artifact is put into ``deleted`` state, it is available +for scrubbing. However, scrubbing can be delayed until a later point +using this configuration option. This option denotes the time period +an artifact spends in ``deleted`` state before it is available for +scrubbing. +It is important to realize that this has storage implications. The +larger the ``scrub_time``, the longer the time to reclaim backend +storage from deleted artifacts. +Possible values: + * Any non-negative integer +Related options: + * ``delayed_delete`` +""")), + cfg.IntOpt('scrub_pool_size', default=1, min=1, + help=_(""" +The size of thread pool to be used for scrubbing artifacts. +When there are a large number of artifacts to scrub, it is +beneficial to scrub artifacts in parallel so that the scrub queue +stays in control and the backend storage is reclaimed in a timely +fashion. This configuration option denotes the maximum number of +artifacts to be scrubbed in parallel. The default value is one, +which signifies serial scrubbing. Any value above one indicates +parallel scrubbing. +Possible values: + * Any non-zero positive integer +Related options: + * ``delayed_delete`` +""")), +] + +scrubber_cmd_opts = [ + cfg.IntOpt('wakeup_time', default=300, min=0, + help=_(""" +Time interval, in seconds, between scrubber runs in daemon mode. +Scrubber can be run either as a cron job or daemon. 
When run as a +daemon, this configuration time specifies the time period between +two runs. When the scrubber wakes up, it fetches and scrubs all +``deleted`` artifacts that are available for scrubbing after taking +``scrub_time`` into consideration. +If the ``wakeup_time`` is set to a large number, there may be a large +number of artifacts to be scrubbed for each run. Also, this impacts +how quickly the backend storage is reclaimed. +Possible values: + * Any non-negative integer +Related options: + * ``daemon`` + * ``delayed_delete`` +""")) +] + +scrubber_cmd_cli_opts = [ + cfg.BoolOpt('daemon', + short='D', + default=False, + help=_(""" +Run scrubber as a daemon. +This boolean configuration option indicates whether scrubber should +run as a long-running process that wakes up at regular intervals to +scrub artifacts. The wake up interval can be specified using the +configuration option ``wakeup_time``. +If this configuration option is set to ``False``, which is the +default value, scrubber runs once to scrub artifacts and exits. +In this case, if the operator wishes to implement continuous +scrubbing of artifacts, scrubber needs to be scheduled as a cron job. 
+Possible values:
+    * True
+    * False
+Related options:
+    * ``wakeup_time``
+"""))
+]
+
+CONF = cfg.CONF
+CONF.register_opts(scrubber_opts, group='scrubber')
+
+
+class Daemon(object):
+    """Periodically runs a scrubber application in a green-thread pool.
+
+    :param wakeup_time: seconds between consecutive scrubber runs
+    :param threads: size of the pool used to spawn scrubber runs
+    """
+
+    def __init__(self, wakeup_time=300, threads=100):
+        LOG.info("Starting Daemon: wakeup_time=%(wakeup_time)s "
+                 "threads=%(threads)s",
+                 {'wakeup_time': wakeup_time, 'threads': threads})
+        self.wakeup_time = wakeup_time
+        self.event = eventlet.event.Event()
+        # This pool is used for periodic instantiation of scrubber
+        self.daemon_pool = eventlet.greenpool.GreenPool(threads)
+
+    def start(self, application):
+        """Schedule the first run of *application* immediately."""
+        self._run(application)
+
+    def wait(self):
+        """Block until shutdown; Ctrl-C terminates the daemon cleanly."""
+        try:
+            self.event.wait()
+        except KeyboardInterrupt:
+            LOG.info("Daemon Shutdown on KeyboardInterrupt")
+
+    def _run(self, application):
+        """Run *application* once, then reschedule after ``wakeup_time``."""
+        LOG.debug("Running scrubber application")
+        self.daemon_pool.spawn_n(application.run, self.event)
+        eventlet.spawn_after(self.wakeup_time, self._run, application)
+        LOG.debug("Next run scheduled in %s seconds", self.wakeup_time)
+
+
+class Scrubber(object):
+    """Deletes blob data and DB records of artifacts in ``deleted`` state."""
+
+    def __init__(self):
+        # Scrubbing requires admin rights to see and delete any artifact.
+        self.context = context.RequestContext()
+        self.context.is_admin = True
+        self.pool = eventlet.greenpool.GreenPool(
+            CONF.scrubber.scrub_pool_size)
+
+    def run(self, event=None):
+        """Scrub all currently available ``deleted`` artifacts, in batches.
+
+        :param event: unused; accepted so Daemon can pass its shutdown event
+        """
+        while True:
+            artifacts = db_api._get_all(
+                context=self.context,
+                session=db_api.get_session(),
+                limit=CONF.scrubber.scrub_pool_size,
+                sort=[],
+                filters=[('status', None, 'eq', None, 'deleted')])
+            if not artifacts:
+                break
+            # imap is lazy: consume it fully so every artifact in this
+            # batch is actually scrubbed before we re-query the database,
+            # otherwise the same rows would be fetched again forever.
+            list(self.pool.imap(self._scrub_artifact, artifacts))
+
+    def _scrub_artifact(self, af):
+        """Delete all internal blobs of *af*, then the artifact itself."""
+        LOG.info("Begin scrubbing of artifact %s", af.id)
+        for blob in af.blobs:
+            if not blob.external:
+                try:
+                    # Use the admin request context, not the imported
+                    # ``context`` module.
+                    store_api.delete_blob(blob.url, context=self.context)
+                except exception.NotFound:
+                    # data has already been removed
+                    pass
+        LOG.info("Blobs successfully deleted for artifact %s", af.id)
+
+        # delete artifact itself
+        db_api.delete(self.context, af.id, db_api.get_session())
+
+        LOG.info("Artifact %s was scrubbed", af.id)
diff --git a/glare/tests/functional/__init__.py b/glare/tests/functional/__init__.py index 28b9a8d..860fb21 100644 --- a/glare/tests/functional/__init__.py +++ b/glare/tests/functional/__init__.py @@ -336,6 +336,56 @@ paste.filter_factory = """ +class ScrubberDaemon(Server): + """ + Server object that starts/stops/manages the Scrubber server + """ + + def __init__(self, test_dir, policy_file, daemon=False, **kwargs): + # NOTE(jkoelker): Set the port to 0 since we actually don't listen + super(ScrubberDaemon, self).__init__(test_dir, 0) + self.server_name = 'scrubber' + self.server_module = 'glare.cmd.%s' % self.server_name + self.daemon = daemon + + self.blob_dir = os.path.join(self.test_dir, "artifacts") + self.scrub_time = 5 + self.pid_file = os.path.join(self.test_dir, "scrubber.pid") + self.log_file = os.path.join(self.test_dir, "scrubber.log") + self.lock_path = self.test_dir + + default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir + self.sql_connection = os.environ.get('GLARE_TEST_SQL_CONNECTION', + default_sql_connection) + self.policy_file = policy_file + self.policy_default_rule = 'default' + + self.conf_base = """[DEFAULT] +debug = %(debug)s +log_file = %(log_file)s +[scrubber] +daemon = %(daemon)s +wakeup_time = 2 +scrub_time = %(scrub_time)s +[glance_store] +filesystem_store_datadir=%(blob_dir)s +[oslo_policy] +policy_file = %(policy_file)s +policy_default_rule = %(policy_default_rule)s +[database] +connection = %(sql_connection)s +idle_timeout = 3600 +""" + + def start(self, expect_exit=True, expected_exitcode=0, **kwargs): + if 'daemon' in kwargs: + expect_exit = False + return super(ScrubberDaemon, self).start( + expect_exit=expect_exit, + expected_exitcode=expected_exitcode, + **kwargs) + + class FunctionalTest(test_utils.BaseTestCase): """Base test class for any test that wants to test the actual @@ -353,6 +403,8 @@ class FunctionalTest(test_utils.BaseTestCase): self.api_protocol = 'http' self.glare_port, glare_sock = 
test_utils.get_unused_port_and_socket() + self.include_scrubber = False + self.tracecmd = tracecmd_osmap.get(platform.system()) conf_dir = os.path.join(self.test_dir, 'etc') @@ -365,7 +417,10 @@ class FunctionalTest(test_utils.BaseTestCase): self.policy_file, sock=glare_sock) - self.pid_files = [self.glare_server.pid_file] + self.scrubber_daemon = ScrubberDaemon(self.test_dir, self.policy_file) + + self.pid_files = [self.glare_server.pid_file, + self.scrubber_daemon.pid_file] self.files_to_destroy = [] self.launched_servers = [] @@ -379,6 +434,7 @@ class FunctionalTest(test_utils.BaseTestCase): super(FunctionalTest, self).tearDown() self.glare_server.dump_log('glare_server') + self.scrubber_daemon.dump_log('scrubber_daemon') def set_policy_rules(self, rules): with open(self.policy_file, 'w') as fap: @@ -423,7 +479,8 @@ class FunctionalTest(test_utils.BaseTestCase): # server is dead. This eliminates the possibility of a race # between a child process listening on a port actually dying # and a new process being started - servers = [self.glare_server] + servers = [self.glare_server, + self.scrubber_daemon] for s in servers: try: s.stop() @@ -510,6 +567,12 @@ class FunctionalTest(test_utils.BaseTestCase): self.start_with_retry(self.glare_server, 'glare_port', 3, **kwargs) + if self.include_scrubber: + exitcode, out, err = self.scrubber_daemon.start(**kwargs) + self.assertEqual(0, exitcode, + "Failed to spin up the Scrubber daemon. " + "Got: %s" % err) + def ping_server(self, port): """Simple ping on the port. If responsive, return True, else return False. @@ -580,7 +643,7 @@ class FunctionalTest(test_utils.BaseTestCase): return msg if expect_launch else None - def stop_server(self, server, name): + def stop_server(self, server): """Called to stop a single server in a normal fashion. 
:param server: the server to stop @@ -589,7 +652,10 @@ class FunctionalTest(test_utils.BaseTestCase): server.stop() def stop_servers(self): - self.stop_server(self.glare_server, 'Glare server') + self.stop_server(self.glare_server) + + if self.include_scrubber: + self.stop_server(self.scrubber_daemon) self._reset_database(self.glare_server.sql_connection) diff --git a/glare/tests/functional/test_scrubber.py b/glare/tests/functional/test_scrubber.py new file mode 100644 index 0000000..bffa65a --- /dev/null +++ b/glare/tests/functional/test_scrubber.py @@ -0,0 +1,144 @@ +# Copyright 2017 - Nokia Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys +import time + +from oslo_serialization import jsonutils +from six.moves import range + +from glare.tests import functional +from glare.tests.functional import base +from glare.tests.utils import execute + + +class TestScrubber(base.TestArtifact): + + """Test that delayed_delete works and the scrubber deletes""" + + def setUp(self): + functional.FunctionalTest.setUp(self) + + self.include_scrubber = True + self.set_user('user1') + self.glare_server.deployment_flavor = 'noauth' + + self.glare_server.enabled_artifact_types = ','.join( + self.enabled_types) + self.glare_server.custom_artifact_types_modules = ( + 'glare.tests.sample_artifact') + + def _create_sample_artifact(self): + art = self.create_artifact({'name': 'test_art', + 'version': '1.0'}) + + url = '/sample_artifact/%s' % art['id'] + headers = {'Content-Type': 'application/octet-stream'} + + # upload data to blob + self.put(url=url + '/small_blob', data='aaaaaa', + headers=headers) + + # upload a couple of blobs to dict_of_blobs + self.put(url + '/dict_of_blobs/blob1', data='bbbb', + headers=headers) + self.put(url + '/dict_of_blobs/blob2', data='cccc', + headers=headers) + + # add external location + body = jsonutils.dumps( + {'url': 'https://www.apache.org/licenses/LICENSE-2.0.txt', + 'md5': "fake", 'sha1': "fake_sha", "sha256": "fake_sha256"}) + headers = {'Content-Type': + 'application/vnd+openstack.glare-custom-location+json'} + self.put(url=url + '/blob', data=body, status=200, + headers=headers) + + return url + + def test_scrubber_delayed_delete(self): + """ + Test that artifacts don't get deleted immediately and that the scrubber + scrubs them. 
+ """ + self.start_servers(delayed_delete=True, daemon=True, + **self.__dict__.copy()) + + url = self._create_sample_artifact() + + # create another artifact + art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'}) + + # delete sample artifact + self.delete(url=url) + art = self.get(url) + self.assertEqual('deleted', art['status']) + + self.wait_for_scrub(url) + + # check that the second artifact wasn't removed + art = self.get('/sample_artifact/%s' % art2['id']) + self.assertEqual('drafted', art['status']) + + def test_scrubber_app(self): + """ + Test that the scrubber script runs successfully when not in + daemon mode. + """ + self.start_servers(delayed_delete=True, + **self.__dict__.copy()) + + url = self._create_sample_artifact() + + # wait for the scrub time on the artifacts to pass + time.sleep(self.scrubber_daemon.scrub_time) + + # create another artifact + art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'}) + + # delete sample artifact + self.delete(url=url) + art = self.get(url) + self.assertEqual('deleted', art['status']) + + # scrub artifacts and make sure they are deleted + exe_cmd = "%s -m glare.cmd.scrubber" % sys.executable + cmd = ("%s --config-file %s" % + (exe_cmd, self.scrubber_daemon.conf_file_name)) + exitcode, out, err = execute(cmd, raise_error=False) + self.assertEqual(0, exitcode) + + self.wait_for_scrub(url) + + # check that the second artifact wasn't removed + art = self.get('/sample_artifact/%s' % art2['id']) + self.assertEqual('drafted', art['status']) + + def wait_for_scrub(self, url): + """ + The build servers sometimes take longer than 15 seconds + to scrub. Give it up to 5 min, checking every 5 seconds. + When/if it flips to deleted, bail immediately. 
+ """ + wait_for = 300 # seconds + check_every = 5 # seconds + for _ in range(wait_for // check_every): + time.sleep(check_every) + try: + self.get(url, status=404) + return + except Exception: + pass + else: + self.fail("Artifact wasn't scrubbed") diff --git a/setup.cfg b/setup.cfg index 3c49087..058d675 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,7 @@ data_files = console_scripts = glare-api = glare.cmd.api:main glare-db-manage = glare.cmd.db_manage:main + glare-scrubber = glare.cmd.scrubber:main oslo.config.opts = glare = glare.opts:list_artifacts_opts oslo.policy.enforcer =