Merge "Let developers/operators add watchers to object audit"
This commit is contained in:
commit
92b15b0436
126
doc/source/development_watchers.rst
Normal file
@ -0,0 +1,126 @@
================
Auditor Watchers
================

--------
Overview
--------

The duty of auditors is to guard Swift against corruption in the
storage media. But because auditors crawl all objects, they can be
used to program Swift to operate on every object. This is done through
an API known as a "watcher".

Watchers do not have any private view into the cluster.
An operator can write a standalone program that walks the
directories and performs any desired inspection or maintenance.
What the watcher framework brings to the table is a way to do the
same job easily, under the resource restrictions already in place
for the auditor.

Operations performed by watchers are often site-specific, or else
they would be incorporated into Swift already. However, the code in
the tree provides a reference implementation for convenience.
It is located in swift/obj/watchers/dark_data.py and implements
the so-called "Dark Data Watcher".

Currently, only the object auditor supports watchers.

-------------
The API class
-------------

The implementation of a watcher is a Python class that may look like this::

    class MyWatcher(object):

        def __init__(self, conf, logger, **kwargs):
            pass

        def start(self, audit_type, **kwargs):
            pass

        def see_object(self, object_metadata, policy_index, partition,
                       data_file_path, **kwargs):
            pass

        def end(self, **kwargs):
            pass

Arguments to watcher methods are passed as keyword arguments,
and methods are expected to tolerate new, unknown arguments.

The method __init__() is used to save the configuration and logger
at the start of the plug-in.

The method start() is invoked when the auditor starts a pass.
It usually resets counters. The argument `audit_type` is the string
`"ALL"` or `"ZBF"`, according to the type of the auditor running
the watcher. Watchers that talk to the network tend to hang off the
ALL-type auditor; the lightweight ones are okay with the ZBF-type.

The method end() is the closing bracket for start(). It is typically
used to log something, or dump some statistics.

The method see_object() is called when the auditor has completed an
audit of an object. This is where most of the work is done.

The protocol for see_object() allows it to raise a special exception,
QuarantineRequest. The auditor catches it and quarantines the object.
In general, it's okay for watcher methods to throw exceptions, so
an author of a watcher plugin does not have to catch them explicitly
with a try/except block; they can just be permitted to bubble up naturally.
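
For illustration, here is a small but complete watcher in the shape of
the template above. The class name and its `size_threshold` option are
made up for this example; only the method signatures come from the API::

    class LargeObjectWatcher(object):
        """Count objects larger than a configurable threshold."""

        def __init__(self, conf, logger, **kwargs):
            # Watcher-specific options arrive merged into conf;
            # 'size_threshold' is a hypothetical option for this sketch.
            self.threshold = int(conf.get('size_threshold', 1024 * 1024))
            self.logger = logger

        def start(self, audit_type, **kwargs):
            self.large = 0

        def see_object(self, object_metadata, **kwargs):
            if int(object_metadata.get('Content-Length', 0)) > self.threshold:
                self.large += 1

        def end(self, **kwargs):
            self.logger.info("objects over threshold: %d", self.large)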

-------------------
Loading the plugins
-------------------

The Swift auditor loads watcher classes from eggs, so it is necessary
to wrap the class in a package and provide an entry point for it::

    $ cat /usr/lib/python3.8/site-p*/mywatcher*egg-info/entry_points.txt
    [swift.object_audit_watcher]
    mywatcherentry = mywatcher:MyWatcher
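
In setup.py terms, the plugin package would register that entry point
roughly like this (a sketch; the package name and layout are assumed)::

    from setuptools import setup

    setup(
        name='mywatcher',
        version='1.0',
        py_modules=['mywatcher'],
        entry_points={
            'swift.object_audit_watcher': [
                'mywatcherentry = mywatcher:MyWatcher',
            ],
        },
    )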

The operator tells the Swift auditor what plugins to load by adding them
to object-server.conf in the section [object-auditor]. It is also
possible to pass parameters, which arrive in the conf argument of
the method __init__()::

    [object-auditor]
    watchers = mywatcher#mywatcherentry,swift#dark_data

    [object-auditor:watcher:mywatcher#mywatcherentry]
    myparam=testing2020
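
Inside the watcher, such a parameter is then read from the configuration
dictionary, for example (a sketch reusing the hypothetical myparam above)::

    def __init__(self, conf, logger, **kwargs):
        self.myparam = conf.get('myparam', 'some-default')
        self.logger = logger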

Do not forget to remove the watcher from the auditors when done.
Although the API itself is very lightweight, it is common for watchers
to incur a significant performance penalty: they can talk to networked
services or access additional objects.

-----------------
Dark Data Watcher
-----------------

The watcher API should be considered under development. Operators who
need extensions are welcome to report any needs for more arguments
to see_object(). For now, start by copying the provided template watcher
swift/obj/watchers/dark_data.py and see if it is sufficient.

The name "Dark Data" refers to the scientific hypothesis of Dark Matter,
which supposes that the universe contains much more matter than we can
observe. Dark data in Swift is objects that exist on disk but are not
accounted for in any container.

The experience of running large-scale clusters suggests that Swift does
not have any particular bugs that trigger the creation of dark data. So,
this is an exercise in writing watchers, with a plausible function.

When enabled, the Dark Data watcher drags down the cluster's overall
performance, as mentioned above. Of course, the load increase can be
mitigated as usual, but at the expense of the total time taken by
the auditor's pass.

Finally, keep in mind that the Dark Data watcher needs the container
ring to operate, but runs on an object node. This can come up if the
cluster has nodes separated by function.
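
Putting it together, enabling the reference watcher and selecting its
action (one of log, delete, or quarantine, per the class's own
validation) looks like this in object-server.conf::

    [object-auditor]
    watchers = swift#dark_data

    [object-auditor:watcher:swift#dark_data]
    action = log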
@ -88,6 +88,7 @@ Developer Documentation
    development_auth
    development_middleware
    development_ondisk_backends
    development_watchers

Administrator Documentation
===========================
@ -480,6 +480,27 @@ use = egg:swift#recon
# to 86400 (1 day).
# rsync_tempfile_timeout = auto

# A comma-separated list of watcher entry points. This lets operators
# programmatically see audited objects.
#
# The entry point group name is "swift.object_audit_watcher". If your
# setup.py has something like this:
#
#     entry_points={'swift.object_audit_watcher': [
#         'some_watcher = some_module:Watcher']}
#
# then you would enable it with "watchers = some_package#some_watcher".
# For example, the built-in reference implementation is enabled as
# "watchers = swift#dark_data".
#
# watchers =

# Watcher-specific parameters can be added after "object-auditor:watcher:"
# like the following (note that entry points are qualified by package#):
#
# [object-auditor:watcher:swift#dark_data]
# action=log

[object-expirer]
# If this is true, this expirer will execute tasks from the legacy expirer
# task queue; at least one object server should run with
# dequeue_from_legacy = true
@ -133,6 +133,9 @@ swift.diskfile =
    replication.fs = swift.obj.diskfile:DiskFileManager
    erasure_coding.fs = swift.obj.diskfile:ECDiskFileManager

swift.object_audit_watcher =
    dark_data = swift.obj.watchers.dark_data:DarkDataWatcher

[egg_info]
tag_build =
tag_date = 0
@ -235,6 +235,10 @@ class UnknownSecretIdError(EncryptionException):
    pass


class QuarantineRequest(SwiftException):
    pass


class ClientException(Exception):

    def __init__(self, msg, http_scheme='', http_host='', http_port='',
@ -3071,6 +3071,27 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
    return conf


def parse_prefixed_conf(conf_file, prefix):
    """
    Search the config file for any common-prefix sections and load those
    sections to a dict mapping the after-prefix reference to options.

    :param conf_file: the file name of the config to parse
    :param prefix: the common prefix of the sections
    :return: a dict mapping the after-prefix reference -> dict of options
    :raises ValueError: if a config section has an invalid name
    """

    ret_config = {}
    all_conf = readconf(conf_file)
    for section, options in all_conf.items():
        if not section.startswith(prefix):
            continue
        target_ref = section[len(prefix):]
        ret_config[target_ref] = options
    return ret_config
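
# Usage sketch (illustrative, not part of the change): given a config file
# with a section such as
#     [object-auditor:watcher:mywatcher#mywatcherentry]
#     myparam = testing2020
# calling parse_prefixed_conf(conf_file, 'object-auditor:watcher:') yields
#     {'mywatcher#mywatcherentry': {'myparam': 'testing2020'}}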


def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
    """
    Ensure that a pickle file gets written to disk. The file
@ -25,19 +25,23 @@ from contextlib import closing
from eventlet import Timeout

from swift.obj import diskfile, replicator
from swift.common.utils import (
    get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
    unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
    DiskFileDeleted, DiskFileExpired
    DiskFileDeleted, DiskFileExpired, QuarantineRequest
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
    config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
    listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
    readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)


class AuditorWorker(object):
    """Walk through file system to audit objects"""

    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0,
                 watcher_defs=None):
        if watcher_defs is None:
            watcher_defs = {}
        self.conf = conf
        self.logger = logger
        self.devices = devices
@ -95,6 +99,11 @@ class AuditorWorker(object):
        self.stats_buckets = dict(
            [(s, 0) for s in self.stats_sizes + ['OVER']])

        self.watchers = [
            WatcherWrapper(wdef['klass'], name, wdef['conf'], logger)
            for name, wdef in watcher_defs.items()]
        logger.debug("%d audit watcher(s) loaded", len(self.watchers))

    def create_recon_nested_dict(self, top_level_key, device_list, item):
        if device_list:
            device_key = ''.join(sorted(device_list))
@ -114,6 +123,8 @@ class AuditorWorker(object):
              '%(description)s)') %
            {'mode': mode, 'audi_type': self.auditor_type,
             'description': description})
        for watcher in self.watchers:
            watcher.start(self.auditor_type)
        begin = reported = time.time()
        self.total_bytes_processed = 0
        self.total_files_processed = 0
@ -187,6 +198,8 @@ class AuditorWorker(object):
             'frate': self.total_files_processed / elapsed,
             'brate': self.total_bytes_processed / elapsed,
             'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
        for watcher in self.watchers:
            watcher.end()
        if self.stats_sizes:
            self.logger.info(
                _('Object audit stats: %s') % json.dumps(self.stats_buckets))
@ -259,6 +272,15 @@ class AuditorWorker(object):
                            incr_by=chunk_len)
                        self.bytes_processed += chunk_len
                        self.total_bytes_processed += chunk_len
            for watcher in self.watchers:
                try:
                    watcher.see_object(
                        metadata,
                        df._ondisk_info['data_file'])
                except QuarantineRequest:
                    raise df._quarantine(
                        df._data_file,
                        "Requested by %s" % watcher.watcher_name)
        except DiskFileQuarantined as err:
            self.quarantines += 1
            self.logger.error(_('ERROR Object %(obj)s failed audit and was'
@ -303,6 +325,20 @@ class ObjectAuditor(Daemon):
        self.rcache = join(self.recon_cache_path, "object.recon")
        self.interval = int(conf.get('interval', 30))

        watcher_names = set(list_from_csv(conf.get('watchers', '')))
        # Normally '__file__' is always in config, but tests neglect it often.
        watcher_configs = \
            parse_prefixed_conf(conf['__file__'], 'object-auditor:watcher:') \
            if '__file__' in conf else {}
        self.watcher_defs = {}
        for name in watcher_names:
            self.logger.debug("Loading entry point '%s'", name)
            wconf = dict(conf)
            wconf.update(watcher_configs.get(name, {}))
            self.watcher_defs[name] = {
                'conf': wconf,
                'klass': load_pkg_resource("swift.object_audit_watcher", name)}

    def _sleep(self):
        time.sleep(self.interval)

@ -318,7 +354,8 @@ class ObjectAuditor(Daemon):
        device_dirs = kwargs.get('device_dirs')
        worker = AuditorWorker(self.conf, self.logger, self.rcache,
                               self.devices,
                               zero_byte_only_at_fps=zero_byte_only_at_fps)
                               zero_byte_only_at_fps=zero_byte_only_at_fps,
                               watcher_defs=self.watcher_defs)
        worker.audit_all_objects(mode=mode, device_dirs=device_dirs)

    def fork_child(self, zero_byte_fps=False, sleep_between_zbf_scanner=False,
@ -438,3 +475,62 @@ class ObjectAuditor(Daemon):
                                 **kwargs)
        except (Exception, Timeout) as err:
            self.logger.exception(_('ERROR auditing: %s'), err)


class WatcherWrapper(object):
    """
    Run the user-supplied watcher.

    Simple and gets the job done. Note that we aren't doing anything
    to isolate ourselves from hangs or file descriptor leaks
    in the plugins.
    """

    def __init__(self, watcher_class, watcher_name, conf, logger):
        self.watcher_name = watcher_name
        self.watcher_in_error = False
        self.logger = PrefixLoggerAdapter(logger, {})
        self.logger.set_prefix('[audit-watcher %s] ' % watcher_name)

        try:
            self.watcher = watcher_class(conf, self.logger)
        except (Exception, Timeout):
            self.logger.exception('Error initializing watcher')
            self.watcher_in_error = True

    def start(self, audit_type):
        if self.watcher_in_error:
            return  # can't trust the state of the thing; bail
        try:
            self.watcher.start(audit_type=audit_type)
        except (Exception, Timeout):
            self.logger.exception('Error starting watcher')
            self.watcher_in_error = True

    def see_object(self, meta, data_file_path):
        if self.watcher_in_error:
            return  # can't trust the state of the thing; bail
        kwargs = {'object_metadata': meta,
                  'data_file_path': data_file_path}
        try:
            self.watcher.see_object(**kwargs)
        except QuarantineRequest:
            # Avoid extra logging.
            raise
        except (Exception, Timeout):
            self.logger.exception(
                'Error in see_object(meta=%r, data_file_path=%r)',
                meta, data_file_path)
            # Do *not* flag watcher as being in an error state; a failure
            # to process one object shouldn't impact the ability to process
            # others.

    def end(self):
        if self.watcher_in_error:
            return  # can't trust the state of the thing; bail
        kwargs = {}
        try:
            self.watcher.end(**kwargs)
        except (Exception, Timeout):
            self.logger.exception('Error ending watcher')
            self.watcher_in_error = True
0
swift/obj/watchers/__init__.py
Normal file
146
swift/obj/watchers/dark_data.py
Normal file
@ -0,0 +1,146 @@
# Copyright (c) 2019 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This is an audit watcher that manages the dark data in the cluster.
# Since the API for audit watchers is intended to use external plugins,
# this code is invoked as if it were external: through pkg_resources.
# Our setup.py comes pre-configured for convenience, but the operator has
# to enable this watcher explicitly by adding DarkDataWatcher to watchers=
# in object-server.conf. The default is off, as if this does not exist.
# Which is for the best, because of the large performance impact of this.
#

import os
import random
import shutil

from eventlet import Timeout

from swift.common.direct_client import direct_get_container
from swift.common.exceptions import ClientException, QuarantineRequest
from swift.common.ring import Ring
from swift.common.utils import split_path


class ContainerError(Exception):
    pass


class DarkDataWatcher(object):
    def __init__(self, conf, logger):

        self.logger = logger

        swift_dir = '/etc/swift'
        self.container_ring = Ring(swift_dir, ring_name='container')
        self.dark_data_policy = conf.get('action')
        if self.dark_data_policy not in ['log', 'delete', 'quarantine']:
            self.logger.warning(
                "Dark data action %r unknown, defaults to action = 'log'" %
                (self.dark_data_policy,))
            self.dark_data_policy = 'log'

    def start(self, audit_type, **other_kwargs):
        self.is_zbf = audit_type == 'ZBF'
        self.tot_unknown = 0
        self.tot_dark = 0
        self.tot_okay = 0

    def policy_based_object_handling(self, data_file_path, metadata):
        obj_path = metadata['name']

        if self.dark_data_policy == "quarantine":
            self.logger.info("quarantining dark data %s" % obj_path)
            raise QuarantineRequest
        elif self.dark_data_policy == "log":
            self.logger.info("reporting dark data %s" % obj_path)
        elif self.dark_data_policy == "delete":
            obj_dir = os.path.dirname(data_file_path)
            self.logger.info("deleting dark data %s" % obj_dir)
            shutil.rmtree(obj_dir)

    def see_object(self, object_metadata, data_file_path, **other_kwargs):

        # No point in loading the container servers with unnecessary requests.
        if self.is_zbf:
            return

        obj_path = object_metadata['name']
        try:
            obj_info = get_info_1(self.container_ring, obj_path, self.logger)
        except ContainerError:
            self.tot_unknown += 1
            return

        if obj_info is None:
            self.tot_dark += 1
            self.policy_based_object_handling(data_file_path, object_metadata)
        else:
            # OK, object is there, but in the future we might want to verify
            # more. Watch out for versioned objects, EC, and all that.
            self.tot_okay += 1

    def end(self, **other_kwargs):
        if self.is_zbf:
            return
        self.logger.info("total unknown %d ok %d dark %d" %
                         (self.tot_unknown, self.tot_okay, self.tot_dark))


#
# Get the information for 1 object from container server
#
def get_info_1(container_ring, obj_path, logger):

    path_comps = split_path(obj_path, 1, 3, True)
    account_name = path_comps[0]
    container_name = path_comps[1]
    obj_name = path_comps[2]

    container_part, container_nodes = \
        container_ring.get_nodes(account_name, container_name)

    if not container_nodes:
        raise ContainerError()

    # Perhaps we should do something about the way we select the container
    # nodes. For now we just shuffle. It spreads the load, but it does not
    # improve upon the case when some nodes are down, so the auditor slows
    # to a crawl (if this plugin is enabled).
    random.shuffle(container_nodes)

    dark_flag = 0
    for node in container_nodes:
        try:
            headers, objs = direct_get_container(
                node, container_part, account_name, container_name,
                prefix=obj_name, limit=1)
        except (ClientException, Timeout):
            # Something is wrong with that server, treat as an error.
            continue
        if not objs or objs[0]['name'] != obj_name:
            dark_flag += 1
            continue
        return objs[0]

    # We do not ask for a quorum of container servers to know the object.
    # Even if 1 server knows the object, we return with the info above.
    # So, we only end here when all servers either have no record of the
    # object or error out. In that case, even one non-error server means
    # that the object is dark.
    if dark_flag:
        return None
    raise ContainerError()
@ -35,7 +35,7 @@ from swift.common.ring import Ring
from swift.common.utils import Watchdog, get_logger, \
    get_remote_client, split_path, config_true_value, generate_trans_id, \
    affinity_key_function, affinity_locality_predicate, list_from_csv, \
    register_swift_info, readconf, config_auto_int_value
    register_swift_info, parse_prefixed_conf, config_auto_int_value
from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \
    ObjectControllerRouter, InfoController
@ -773,15 +773,8 @@ def parse_per_policy_config(conf):
    :return: a dict mapping policy reference -> dict of policy options
    :raises ValueError: if a policy config section has an invalid name
    """
    policy_config = {}
    all_conf = readconf(conf['__file__'])
    policy_section_prefix = conf['__name__'] + ':policy:'
    for section, options in all_conf.items():
        if not section.startswith(policy_section_prefix):
            continue
        policy_ref = section[len(policy_section_prefix):]
        policy_config[policy_ref] = options
    return policy_config
    return parse_prefixed_conf(conf['__file__'], policy_section_prefix)


def app_factory(global_conf, **local_conf):
@ -416,6 +416,11 @@ class ProbeTest(unittest.TestCase):
    def tearDown(self):
        Manager(['all']).kill()

    def assertLengthEqual(self, obj, length):
        obj_len = len(obj)
        self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
            obj, obj_len, length))

    def device_dir(self, node):
        server_type, config_number = get_server_number(
            (node['ip'], node['port']), self.ipport2server)
183
test/probe/test_dark_data.py
Normal file
@ -0,0 +1,183 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import unittest

import os
import uuid
import shutil

from datetime import datetime
from six.moves.configparser import ConfigParser

from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest
from swift.common import manager
from swift.common.storage_policy import get_policy_string
from swift.common.manager import Manager, Server
from swift.common.utils import readconf


CONF_SECTION = 'object-auditor:watcher:swift#dark_data'


class TestDarkDataDeletion(ReplProbeTest):
    # NB: could be 'quarantine' in another test
    action = 'delete'

    def setUp(self):
        """
        Reset all environment and start all servers.
        """
        super(TestDarkDataDeletion, self).setUp()

        self.conf_dest = \
            os.path.join('/tmp/',
                         datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f'))
        os.mkdir(self.conf_dest)

        object_server_dir = os.path.join(self.conf_dest, 'object-server')
        os.mkdir(object_server_dir)

        for conf_file in Server('object-auditor').conf_files():
            config = readconf(conf_file)
            if 'object-auditor' not in config:
                continue  # *somebody* should be set up to run the auditor
            config['object-auditor'].update(
                {'watchers': 'swift#dark_data'})
            # Note that this setdefault business may mean the watcher doesn't
            # pick up DEFAULT values, but that (probably?) won't matter
            config.setdefault(CONF_SECTION, {}).update(
                {'action': self.action})

            parser = ConfigParser()
            for section in ('object-auditor', CONF_SECTION):
                parser.add_section(section)
                for option, value in config[section].items():
                    parser.set(section, option, value)

            file_name = os.path.basename(conf_file)
            if file_name.endswith('.d'):
                # Work around conf.d setups (like you might see with VSAIO)
                file_name = file_name[:-2]
            with open(os.path.join(object_server_dir, file_name), 'w') as fp:
                parser.write(fp)

        self.container_name = 'container-%s' % uuid.uuid4()
        self.object_name = 'object-%s' % uuid.uuid4()
        self.brain = BrainSplitter(self.url, self.token, self.container_name,
                                   self.object_name, 'object',
                                   policy=self.policy)

    def tearDown(self):
        shutil.rmtree(self.conf_dest)

    def gather_object_files_by_ext(self):
        result = collections.defaultdict(set)
        for node in self.brain.nodes:
            for path, _, files in os.walk(os.path.join(
                    self.device_dir(node),
                    get_policy_string('objects', self.policy))):
                for file in files:
                    if file in ('.lock', 'hashes.pkl', 'hashes.invalid'):
                        continue
                    _, ext = os.path.splitext(file)
                    result[ext].add(os.path.join(path, file))
        return result

    def test_dark_data(self):
        self.brain.put_container()
        self.brain.put_object()
        self.brain.stop_handoff_half()
        self.brain.delete_object()
        Manager(['object-updater']).once()
        Manager(['container-replicator']).once()

        # Sanity check:
        # * all containers are empty
        # * primaries that are still up have two .ts files
        # * primary that's down has one .data file
        for index, (headers, items) in self.direct_get_container(
                container=self.container_name).items():
            self.assertEqual(headers['X-Container-Object-Count'], '0')
            self.assertEqual(items, [])

        files = self.gather_object_files_by_ext()
        self.assertLengthEqual(files, 2)
        self.assertLengthEqual(files['.ts'], 2)
        self.assertLengthEqual(files['.data'], 1)

        # Simulate a reclaim_age passing,
        # so the tombstones all got cleaned up
        for file_path in files['.ts']:
            os.unlink(file_path)

        # Old node gets reintroduced to the cluster
        self.brain.start_handoff_half()
        # ...so replication thinks it's got some work to do
        Manager(['object-replicator']).once()

        # Now we're back to *three* .data files
        files = self.gather_object_files_by_ext()
        self.assertLengthEqual(files, 1)
        self.assertLengthEqual(files['.data'], 3)

        # But that's OK, audit watchers to the rescue!
        old_swift_dir = manager.SWIFT_DIR
        manager.SWIFT_DIR = self.conf_dest
        try:
            Manager(['object-auditor']).once()
        finally:
            manager.SWIFT_DIR = old_swift_dir

        # Verify that the policy was applied.
        self.check_on_disk_files(files['.data'])

    def check_on_disk_files(self, files):
        for file_path in files:
            # File's not there
            self.assertFalse(os.path.exists(file_path))
            # And it's not quarantined, either!
            self.assertPathDoesNotExist(os.path.join(
                file_path[:file_path.index('objects')], 'quarantined'))

    def assertPathExists(self, path):
        msg = "Expected path %r to exist, but it doesn't" % path
        self.assertTrue(os.path.exists(path), msg)

    def assertPathDoesNotExist(self, path):
        msg = "Expected path %r to not exist, but it does" % path
        self.assertFalse(os.path.exists(path), msg)


class TestDarkDataQuarantining(TestDarkDataDeletion):
    action = 'quarantine'

    def check_on_disk_files(self, files):
        for file_path in files:
            # File's not there
            self.assertPathDoesNotExist(file_path)
            # Got quarantined
            parts = file_path.split(os.path.sep)
            quarantine_dir = parts[:parts.index('objects')] + ['quarantined']
            quarantine_path = os.path.sep.join(
                quarantine_dir + ['objects'] + parts[-2:])
            self.assertPathExists(quarantine_path)


if __name__ == "__main__":
    unittest.main()
@ -242,11 +242,6 @@ class BaseTestContainerSharding(ReplProbeTest):
                          '\n '.join(result['other']))
        return result

    def assertLengthEqual(self, obj, length):
        obj_len = len(obj)
        self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
            obj, obj_len, length))

    def assert_dict_contains(self, expected_items, actual_dict):
        ignored = set(expected_items) ^ set(actual_dict)
        filtered_actual = {k: actual_dict[k]
@ -12,32 +12,35 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

import unittest
import json
import mock
import os
import sys
import pkg_resources
import signal
import time
import string
import sys
import time
import xattr
from shutil import rmtree
from tempfile import mkdtemp
import textwrap
from os.path import dirname, basename
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
                       DEFAULT_TEST_EC_TYPE, skip_if_no_xattrs)
from test.unit import (
    debug_logger, DEFAULT_TEST_EC_TYPE,
    make_timestamp_iter, patch_policies, skip_if_no_xattrs)
from test.unit.obj.common import write_diskfile
from swift.obj import auditor, replicator
from swift.obj.watchers.dark_data import DarkDataWatcher
from swift.obj.diskfile import (
    DiskFile, write_metadata, invalidate_hash, get_data_dir,
    DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
    get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
from swift.common.utils import (
    mkdirs, normalize_timestamp, Timestamp, readconf, md5)
    mkdirs, normalize_timestamp, Timestamp, readconf, md5, PrefixLoggerAdapter)
from swift.common.storage_policy import (
    ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
from test.unit.obj.common import write_diskfile

_mocked_policies = [
    StoragePolicy(0, 'zero', False),
@ -60,8 +63,33 @@ def works_only_once(callable_thing, exception):
    return only_once


@patch_policies(_mocked_policies)
class TestAuditor(unittest.TestCase):
def no_audit_watchers(group, name=None):
    if group == 'swift.object_audit_watcher':
        return iter([])
    else:
        return pkg_resources.iter_entry_points(group, name)


class FakeRing1(object):

    def __init__(self, swift_dir, ring_name=None):
        return

    def get_nodes(self, *args, **kwargs):
        x = 1
        node1 = {'ip': '10.0.0.%s' % x,
                 'replication_ip': '10.0.0.%s' % x,
                 'port': 6200 + x,
                 'replication_port': 6200 + x,
                 'device': 'sda',
                 'zone': x % 3,
                 'region': x % 2,
                 'id': x,
                 'handoff_index': 1}
        return (1, [node1])


class TestAuditorBase(unittest.TestCase):

    def setUp(self):
        skip_if_no_xattrs()
@ -113,7 +141,7 @@ class TestAuditor(unittest.TestCase):
        # diskfiles for policy 0, 1, 2
        self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
                                                  policy=POLICIES[0])
        self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
        self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c2',
                                                     'o', policy=POLICIES[1])
        self.disk_file_ec = self.ec_df_mgr.get_diskfile(
            'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
@ -121,6 +149,10 @@ class TestAuditor(unittest.TestCase):
    def tearDown(self):
        rmtree(os.path.dirname(self.testdir), ignore_errors=1)


@patch_policies(_mocked_policies)
class TestAuditor(TestAuditorBase):

    def test_worker_conf_parms(self):
        def check_common_defaults():
            self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
@ -1532,5 +1564,163 @@ class TestAuditor(unittest.TestCase):
                         .format(outstanding_pids))


@mock.patch('pkg_resources.iter_entry_points', no_audit_watchers)
@patch_policies(_mocked_policies)
class TestAuditWatchers(TestAuditorBase):

    def setUp(self):
        super(TestAuditWatchers, self).setUp()

        timestamp = Timestamp(time.time())

        data = b'0' * 1024
        etag = md5()
        with self.disk_file.create() as writer:
            writer.write(data)
            etag.update(data)
            etag = etag.hexdigest()
            metadata = {
                'ETag': etag,
                'X-Timestamp': timestamp.internal,
                'Content-Length': str(len(data)),
                'X-Object-Meta-Flavor': 'banana',
            }
            writer.put(metadata)

        data = b'1' * 2048
        etag = md5()
        with self.disk_file_p1.create() as writer:
            writer.write(data)
            etag.update(data)
            etag = etag.hexdigest()
            metadata = {
                'ETag': etag,
                'X-Timestamp': timestamp.internal,
                'Content-Length': str(len(data)),
                'X-Object-Meta-Flavor': 'orange',
            }
            writer.put(metadata)

    def test_watchers(self):

        calls = []

        class TestWatcher(object):
            def __init__(self, conf, logger):
                self._started = False
                self._ended = False
                calls.append(["__init__", conf, logger])

                # Make sure the logger is capable of quacking like a logger
                logger.debug("getting started")

            def start(self, audit_type, **other_kwargs):
                if self._started:
                    raise Exception("don't call it twice")
                self._started = True
                calls.append(['start', audit_type])

            def see_object(self, object_metadata,
                           data_file_path, **other_kwargs):
                calls.append(['see_object', object_metadata,
                              data_file_path, other_kwargs])

            def end(self, **other_kwargs):
                if self._ended:
                    raise Exception("don't call it twice")
                self._ended = True
                calls.append(['end'])

        conf = self.conf.copy()
        conf['watchers'] = 'test_watcher1'
        conf['__file__'] = '/etc/swift/swift.conf'
        ret_config = {'swift#dark_data': {'action': 'log'}}
        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
                        return_value=ret_config), \
                mock.patch('swift.obj.auditor.load_pkg_resource',
                           side_effect=[TestWatcher]) as mock_load, \
                mock.patch('swift.obj.auditor.get_logger',
                           lambda *a, **kw: self.logger):
            my_auditor = auditor.ObjectAuditor(conf)

        self.assertEqual(mock_load.mock_calls, [
            mock.call('swift.object_audit_watcher', 'test_watcher1'),
        ])

        my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))

        self.assertEqual(len(calls), 5)

        self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
        self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
        self.assertIs(calls[0][2].logger, self.logger)

        self.assertEqual(calls[1], ["start", "ZBF"])

        self.assertEqual(calls[2][0], "see_object")
        self.assertEqual(calls[3][0], "see_object")

        # The order in which the auditor finds things on the filesystem is
        # irrelevant; what matters is that it finds all the things.
        calls[2:4] = sorted(calls[2:4], key=lambda item: item[1]['name'])

        self.assertDictContainsSubset({'name': '/a/c/o',
                                       'X-Object-Meta-Flavor': 'banana'},
                                      calls[2][1])
        self.assertIn('node/sda/objects/0/', calls[2][2])  # data_file_path
        self.assertTrue(calls[2][2].endswith('.data'))  # data_file_path
        self.assertEqual({}, calls[2][3])

        self.assertDictContainsSubset({'name': '/a/c2/o',
                                       'X-Object-Meta-Flavor': 'orange'},
                                      calls[3][1])
        self.assertIn('node/sda/objects-1/0/', calls[3][2])  # data_file_path
        self.assertTrue(calls[3][2].endswith('.data'))  # data_file_path
        self.assertEqual({}, calls[3][3])

        self.assertEqual(calls[4], ["end"])

        log_lines = self.logger.get_lines_for_level('debug')
        self.assertIn(
            "[audit-watcher test_watcher1] getting started",
            log_lines)

    def test_builtin_watchers(self):

        conf = self.conf.copy()
        conf['watchers'] = 'test_watcher1'
        conf['__file__'] = '/etc/swift/swift.conf'
        ret_config = {'swift#dark_data': {'action': 'log'}}
        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
                        return_value=ret_config), \
                mock.patch('swift.obj.auditor.load_pkg_resource',
                           side_effect=[DarkDataWatcher]):
            my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)

        def fake_direct_get_container(node, part, account, container,
                                      prefix=None, limit=None):
            self.assertEqual(part, 1)
            self.assertEqual(limit, 1)
            # The returned entry is not abbreviated, but is full of nonsense.
            entry = {'bytes': 30968411,
                     'hash': '60303f4122966fe5925f045eb52d1129',
                     'name': '%s' % prefix,
                     'content_type': 'video/mp4',
                     'last_modified': '2017-08-15T03:30:57.693210'}
            return {}, [entry]

        with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
                mock.patch("swift.obj.watchers.dark_data.direct_get_container",
                           fake_direct_get_container):
            my_auditor.run_audit(mode='once')

        # N.B. We want to check for ok files instead of dark because
        # if anything goes wrong inside, we want it to fail the test.
        log_lines = self.logger.get_lines_for_level('info')
        self.assertIn(
            '[audit-watcher test_watcher1] total unknown 0 ok 2 dark 0',
            log_lines)


if __name__ == '__main__':
    unittest.main()