Make the dark data watcher work with sharded containers
Be willing to accept shards instead of objects when querying containers. If we receive shards, be willing to query them looking for the object. Change-Id: I0d8dd42f81b97dddd6cf8910afaef4ba85e67d27 Partial-Bug: #1925346
This commit is contained in:
parent
365db20275
commit
6b91334298
@ -42,6 +42,10 @@ servers agree, it will silently fail to detect anything if even one
|
|||||||
of container servers in the ring is down or unreacheable. This is
|
of container servers in the ring is down or unreacheable. This is
|
||||||
done in the interest of operators who run with action=delete.
|
done in the interest of operators who run with action=delete.
|
||||||
|
|
||||||
|
If a container is sharded, there is a small edgecase where an object row could
|
||||||
|
be misplaced. So it is recommended to always start with action=log, before
|
||||||
|
your confident to run action=delete.
|
||||||
|
|
||||||
Finally, keep in mind that Dark Data watcher needs the container
|
Finally, keep in mind that Dark Data watcher needs the container
|
||||||
ring to operate, but runs on an object node. This can come up if
|
ring to operate, but runs on an object node. This can come up if
|
||||||
cluster has nodes separated by function.
|
cluster has nodes separated by function.
|
||||||
@ -57,7 +61,7 @@ from eventlet import Timeout
|
|||||||
from swift.common.direct_client import direct_get_container
|
from swift.common.direct_client import direct_get_container
|
||||||
from swift.common.exceptions import ClientException, QuarantineRequest
|
from swift.common.exceptions import ClientException, QuarantineRequest
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import split_path, Timestamp
|
from swift.common.utils import split_path, Namespace, Timestamp
|
||||||
|
|
||||||
|
|
||||||
class ContainerError(Exception):
|
class ContainerError(Exception):
|
||||||
@ -114,7 +118,7 @@ class DarkDataWatcher(object):
|
|||||||
|
|
||||||
obj_path = object_metadata['name']
|
obj_path = object_metadata['name']
|
||||||
try:
|
try:
|
||||||
obj_info = get_info_1(self.container_ring, obj_path, self.logger)
|
obj_info = get_info_1(self.container_ring, obj_path)
|
||||||
except ContainerError:
|
except ContainerError:
|
||||||
self.tot_unknown += 1
|
self.tot_unknown += 1
|
||||||
return
|
return
|
||||||
@ -137,39 +141,73 @@ class DarkDataWatcher(object):
|
|||||||
#
|
#
|
||||||
# Get the information for 1 object from container server
|
# Get the information for 1 object from container server
|
||||||
#
|
#
|
||||||
def get_info_1(container_ring, obj_path, logger):
|
def get_info_1(container_ring, obj_path):
|
||||||
|
|
||||||
path_comps = split_path(obj_path, 1, 3, True)
|
path_comps = split_path(obj_path, 1, 3, True)
|
||||||
account_name = path_comps[0]
|
account_name = path_comps[0]
|
||||||
container_name = path_comps[1]
|
container_name = path_comps[1]
|
||||||
obj_name = path_comps[2]
|
obj_name = path_comps[2]
|
||||||
|
visited = set()
|
||||||
|
|
||||||
container_part, container_nodes = \
|
def check_container(account_name, container_name):
|
||||||
container_ring.get_nodes(account_name, container_name)
|
record_type = 'auto'
|
||||||
|
if (account_name, container_name) in visited:
|
||||||
|
# Already queried; So we have a last ditch effort and specifically
|
||||||
|
# ask for object data as this could be pointing back to the root
|
||||||
|
# If the container doesn't have objects then this will return
|
||||||
|
# no objects and break the loop.
|
||||||
|
record_type = 'object'
|
||||||
|
else:
|
||||||
|
visited.add((account_name, container_name))
|
||||||
|
|
||||||
if not container_nodes:
|
container_part, container_nodes = \
|
||||||
raise ContainerError()
|
container_ring.get_nodes(account_name, container_name)
|
||||||
|
if not container_nodes:
|
||||||
|
raise ContainerError()
|
||||||
|
|
||||||
# Perhaps we should do something about the way we select the container
|
# Perhaps we should do something about the way we select the container
|
||||||
# nodes. For now we just shuffle. It spreads the load, but it does not
|
# nodes. For now we just shuffle. It spreads the load, but it does not
|
||||||
# improve upon the the case when some nodes are down, so auditor slows
|
# improve upon the the case when some nodes are down, so auditor slows
|
||||||
# to a crawl (if this plugin is enabled).
|
# to a crawl (if this plugin is enabled).
|
||||||
random.shuffle(container_nodes)
|
random.shuffle(container_nodes)
|
||||||
|
|
||||||
err_flag = 0
|
err_flag = 0
|
||||||
for node in container_nodes:
|
shards = set()
|
||||||
try:
|
for node in container_nodes:
|
||||||
headers, objs = direct_get_container(
|
try:
|
||||||
node, container_part, account_name, container_name,
|
# The prefix+limit trick is used when a traditional listing
|
||||||
prefix=obj_name, limit=1)
|
# is returned, while includes is there for shards.
|
||||||
except (ClientException, Timeout):
|
# See the how GET routes it in swift/container/server.py.
|
||||||
# Something is wrong with that server, treat as an error.
|
headers, objs_or_shards = direct_get_container(
|
||||||
err_flag += 1
|
node, container_part, account_name, container_name,
|
||||||
continue
|
prefix=obj_name, limit=1,
|
||||||
if objs and objs[0]['name'] == obj_name:
|
extra_params={'includes': obj_name, 'states': 'listing'},
|
||||||
return objs[0]
|
headers={'X-Backend-Record-Type': record_type})
|
||||||
|
except (ClientException, Timeout):
|
||||||
|
# Something is wrong with that server, treat as an error.
|
||||||
|
err_flag += 1
|
||||||
|
continue
|
||||||
|
if headers.get('X-Backend-Record-Type') == 'shard':
|
||||||
|
# When using includes=obj_name, we don't need to anything
|
||||||
|
# like find_shard_range(obj_name, ... objs_or_shards).
|
||||||
|
if len(objs_or_shards) != 0:
|
||||||
|
namespace = Namespace(objs_or_shards[0]['name'],
|
||||||
|
objs_or_shards[0]['lower'],
|
||||||
|
objs_or_shards[0]['upper'])
|
||||||
|
shards.add((namespace.account, namespace.container))
|
||||||
|
continue
|
||||||
|
if objs_or_shards and objs_or_shards[0]['name'] == obj_name:
|
||||||
|
return objs_or_shards[0]
|
||||||
|
|
||||||
# We only report the object as dark if all known servers agree that it is.
|
# If we got back some shards, recurse
|
||||||
if err_flag:
|
for account_name, container_name in shards:
|
||||||
raise ContainerError()
|
res = check_container(account_name, container_name)
|
||||||
return None
|
if res:
|
||||||
|
return res
|
||||||
|
|
||||||
|
# We only report the object as dark if all known servers agree to it.
|
||||||
|
if err_flag:
|
||||||
|
raise ContainerError()
|
||||||
|
return None
|
||||||
|
|
||||||
|
return check_container(account_name, container_name)
|
||||||
|
@ -909,7 +909,7 @@ class TestAuditor(TestAuditorBase):
|
|||||||
kwargs['zero_byte_fps'] = 50
|
kwargs['zero_byte_fps'] = 50
|
||||||
self.auditor.run_audit(**kwargs)
|
self.auditor.run_audit(**kwargs)
|
||||||
self.assertFalse(os.path.isdir(quarantine_path))
|
self.assertFalse(os.path.isdir(quarantine_path))
|
||||||
del(kwargs['zero_byte_fps'])
|
del (kwargs['zero_byte_fps'])
|
||||||
clear_auditor_status(self.devices, 'objects')
|
clear_auditor_status(self.devices, 'objects')
|
||||||
self.auditor.run_audit(**kwargs)
|
self.auditor.run_audit(**kwargs)
|
||||||
self.assertTrue(os.path.isdir(quarantine_path))
|
self.assertTrue(os.path.isdir(quarantine_path))
|
||||||
@ -1760,7 +1760,8 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
writer.commit(timestamp)
|
writer.commit(timestamp)
|
||||||
|
|
||||||
def fake_direct_get_container(node, part, account, container,
|
def fake_direct_get_container(node, part, account, container,
|
||||||
prefix=None, limit=None):
|
prefix=None, limit=None,
|
||||||
|
extra_params=None, headers=None):
|
||||||
self.assertEqual(part, 1)
|
self.assertEqual(part, 1)
|
||||||
self.assertEqual(limit, 1)
|
self.assertEqual(limit, 1)
|
||||||
|
|
||||||
@ -1877,7 +1878,8 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
for cur in scenario:
|
for cur in scenario:
|
||||||
|
|
||||||
def fake_direct_get_container(node, part, account, container,
|
def fake_direct_get_container(node, part, account, container,
|
||||||
prefix=None, limit=None):
|
prefix=None, limit=None,
|
||||||
|
extra_params=None, headers=None):
|
||||||
self.assertEqual(part, 1)
|
self.assertEqual(part, 1)
|
||||||
self.assertEqual(limit, 1)
|
self.assertEqual(limit, 1)
|
||||||
|
|
||||||
@ -1936,6 +1938,275 @@ class TestAuditWatchers(TestAuditorBase):
|
|||||||
' '.join(words[3:]), cur['cr'])
|
' '.join(words[3:]), cur['cr'])
|
||||||
self.fail(msg=msg)
|
self.fail(msg=msg)
|
||||||
|
|
||||||
|
def test_dark_data_with_sharding(self):
|
||||||
|
|
||||||
|
# We use the EC object because it's all alone in its fake container.
|
||||||
|
main_acc = self.disk_file_ec._account
|
||||||
|
shard_acc = ".shards_%s" % main_acc
|
||||||
|
cont = self.disk_file_ec._container
|
||||||
|
|
||||||
|
def fake_direct_get_container(node, part, account, container,
|
||||||
|
prefix=None, limit=None,
|
||||||
|
extra_params=None, headers=None):
|
||||||
|
self.assertEqual(part, 1)
|
||||||
|
self.assertEqual(limit, 1)
|
||||||
|
self.assertIn('X-Backend-Record-Type', headers)
|
||||||
|
self.assertEqual(headers['X-Backend-Record-Type'], 'auto')
|
||||||
|
|
||||||
|
if account == shard_acc:
|
||||||
|
# Listing shards - just shortcut with a made-up response.
|
||||||
|
entry = {'bytes': 30968411,
|
||||||
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
|
'name': '%s' % prefix,
|
||||||
|
'content_type': 'video/mp4',
|
||||||
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
|
return {'X-Backend-Record-Type': 'object'}, [entry]
|
||||||
|
|
||||||
|
else:
|
||||||
|
if account == main_acc and container == cont:
|
||||||
|
# The root container has no listing but has a shard range.
|
||||||
|
entry = {
|
||||||
|
'name': '%s/%s' % (shard_acc, cont),
|
||||||
|
'timestamp': '1630106063.23826',
|
||||||
|
'lower': '',
|
||||||
|
'upper': '',
|
||||||
|
'object_count': 1,
|
||||||
|
'bytes_used': 1024,
|
||||||
|
'meta_timestamp': '1630106063.23826',
|
||||||
|
'deleted': 0,
|
||||||
|
'state': 'sharded',
|
||||||
|
'state_timestamp': '1630106063.23826',
|
||||||
|
'epoch': None,
|
||||||
|
'reported': 1,
|
||||||
|
'tombstones': -1}
|
||||||
|
return {'X-Backend-Record-Type': 'shard'}, [entry]
|
||||||
|
|
||||||
|
else:
|
||||||
|
# It's an un-sharded container, no tricks.
|
||||||
|
entry = {'bytes': 30968411,
|
||||||
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
|
'name': '%s' % prefix,
|
||||||
|
'content_type': 'video/mp4',
|
||||||
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
|
return {}, [entry]
|
||||||
|
|
||||||
|
conf = self.conf.copy()
|
||||||
|
conf['watchers'] = 'test_watcher1'
|
||||||
|
conf['__file__'] = '/etc/swift/swift.conf'
|
||||||
|
|
||||||
|
ret_config = {'test_watcher1': {'action': 'log', 'grace_age': '0'}}
|
||||||
|
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||||
|
return_value=ret_config), \
|
||||||
|
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||||
|
side_effect=[DarkDataWatcher]):
|
||||||
|
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||||
|
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||||
|
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||||
|
fake_direct_get_container):
|
||||||
|
my_auditor.run_audit(mode='once')
|
||||||
|
|
||||||
|
log_lines = self.logger.get_lines_for_level('info')
|
||||||
|
self.assertIn(
|
||||||
|
'[audit-watcher test_watcher1] total unknown 0 ok 3 dark 0',
|
||||||
|
log_lines)
|
||||||
|
|
||||||
|
def test_dark_data_with_sharding_fallback_to_root(self):
|
||||||
|
|
||||||
|
# We use the EC object because it's all alone in its fake container.
|
||||||
|
main_acc = self.disk_file_ec._account
|
||||||
|
shard_acc = ".shards_%s" % main_acc
|
||||||
|
cont = self.disk_file_ec._container
|
||||||
|
call_stack = []
|
||||||
|
|
||||||
|
def fake_direct_get_container(node, part, account, container,
|
||||||
|
prefix=None, limit=None,
|
||||||
|
extra_params=None, headers=None):
|
||||||
|
self.assertEqual(part, 1)
|
||||||
|
self.assertEqual(limit, 1)
|
||||||
|
call_stack.append((account, container, headers))
|
||||||
|
|
||||||
|
if account == shard_acc:
|
||||||
|
# return a shard listing that actaully points to the root OSR
|
||||||
|
entry = {
|
||||||
|
'name': '%s/%s' % (main_acc, cont),
|
||||||
|
'timestamp': '1630106063.23826',
|
||||||
|
'lower': '',
|
||||||
|
'upper': '',
|
||||||
|
'object_count': 1,
|
||||||
|
'bytes_used': 1024,
|
||||||
|
'meta_timestamp': '1630106063.23826',
|
||||||
|
'deleted': 0,
|
||||||
|
'state': 'sharded',
|
||||||
|
'state_timestamp': '1630106063.23826',
|
||||||
|
'epoch': None,
|
||||||
|
'reported': 1,
|
||||||
|
'tombstones': -1}
|
||||||
|
return {'X-Backend-Record-Type': 'shard'}, [entry]
|
||||||
|
|
||||||
|
else:
|
||||||
|
if account == main_acc and container == cont:
|
||||||
|
if headers['X-Backend-Record-Type'] == 'auto':
|
||||||
|
# The root container has no listing but has a shard
|
||||||
|
# range.
|
||||||
|
entry = {
|
||||||
|
'name': '%s/%s' % (shard_acc, cont),
|
||||||
|
'timestamp': '1630106063.23826',
|
||||||
|
'lower': '',
|
||||||
|
'upper': '',
|
||||||
|
'object_count': 1,
|
||||||
|
'bytes_used': 1024,
|
||||||
|
'meta_timestamp': '1630106063.23826',
|
||||||
|
'deleted': 0,
|
||||||
|
'state': 'sharded',
|
||||||
|
'state_timestamp': '1630106063.23826',
|
||||||
|
'epoch': None,
|
||||||
|
'reported': 1,
|
||||||
|
'tombstones': -1}
|
||||||
|
return {'X-Backend-Record-Type': 'shard'}, [entry]
|
||||||
|
else:
|
||||||
|
# we've come back with a direct record-type = object
|
||||||
|
self.assertEqual(headers['X-Backend-Record-Type'],
|
||||||
|
'object')
|
||||||
|
# let's give them the obj, they've tried hard enough.
|
||||||
|
entry = {'bytes': 30968411,
|
||||||
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
|
'name': '%s' % prefix,
|
||||||
|
'content_type': 'video/mp4',
|
||||||
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
|
return {'X-Backend-Record-Type': 'object'}, [entry]
|
||||||
|
|
||||||
|
else:
|
||||||
|
# It's an un-sharded container, no tricks.
|
||||||
|
entry = {'bytes': 30968411,
|
||||||
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
|
'name': '%s' % prefix,
|
||||||
|
'content_type': 'video/mp4',
|
||||||
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
|
return {}, [entry]
|
||||||
|
|
||||||
|
conf = self.conf.copy()
|
||||||
|
conf['watchers'] = 'test_watcher1'
|
||||||
|
conf['__file__'] = '/etc/swift/swift.conf'
|
||||||
|
|
||||||
|
ret_config = {'test_watcher1': {'action': 'log', 'grace_age': '0'}}
|
||||||
|
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||||
|
return_value=ret_config), \
|
||||||
|
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||||
|
side_effect=[DarkDataWatcher]):
|
||||||
|
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||||
|
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||||
|
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||||
|
fake_direct_get_container):
|
||||||
|
my_auditor.run_audit(mode='once')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
call_stack[-3:],
|
||||||
|
[(main_acc, cont, {'X-Backend-Record-Type': 'auto'}),
|
||||||
|
(shard_acc, cont, {'X-Backend-Record-Type': 'auto'}),
|
||||||
|
(main_acc, cont, {'X-Backend-Record-Type': 'object'})])
|
||||||
|
|
||||||
|
log_lines = self.logger.get_lines_for_level('info')
|
||||||
|
self.assertIn(
|
||||||
|
'[audit-watcher test_watcher1] total unknown 0 ok 3 dark 0',
|
||||||
|
log_lines)
|
||||||
|
|
||||||
|
def test_dark_data_with_sharding_fallback_to_root_no_objects(self):
|
||||||
|
|
||||||
|
# We use the EC object because it's all alone in its fake container.
|
||||||
|
main_acc = self.disk_file_ec._account
|
||||||
|
shard_acc = ".shards_%s" % main_acc
|
||||||
|
cont = self.disk_file_ec._container
|
||||||
|
call_stack = []
|
||||||
|
|
||||||
|
def fake_direct_get_container(node, part, account, container,
|
||||||
|
prefix=None, limit=None,
|
||||||
|
extra_params=None, headers=None):
|
||||||
|
self.assertEqual(part, 1)
|
||||||
|
self.assertEqual(limit, 1)
|
||||||
|
call_stack.append((account, container, headers))
|
||||||
|
|
||||||
|
if account == shard_acc:
|
||||||
|
# return a shard listing that actaully points to the root OSR
|
||||||
|
entry = {
|
||||||
|
'name': '%s/%s' % (main_acc, cont),
|
||||||
|
'timestamp': '1630106063.23826',
|
||||||
|
'lower': '',
|
||||||
|
'upper': '',
|
||||||
|
'object_count': 1,
|
||||||
|
'bytes_used': 1024,
|
||||||
|
'meta_timestamp': '1630106063.23826',
|
||||||
|
'deleted': 0,
|
||||||
|
'state': 'sharded',
|
||||||
|
'state_timestamp': '1630106063.23826',
|
||||||
|
'epoch': None,
|
||||||
|
'reported': 1,
|
||||||
|
'tombstones': -1}
|
||||||
|
return {'X-Backend-Record-Type': 'shard'}, [entry]
|
||||||
|
|
||||||
|
else:
|
||||||
|
if account == main_acc and container == cont:
|
||||||
|
if headers['X-Backend-Record-Type'] == 'auto':
|
||||||
|
# The root container has no listing but has a shard
|
||||||
|
# range.
|
||||||
|
entry = {
|
||||||
|
'name': '%s/%s' % (shard_acc, cont),
|
||||||
|
'timestamp': '1630106063.23826',
|
||||||
|
'lower': '',
|
||||||
|
'upper': '',
|
||||||
|
'object_count': 1,
|
||||||
|
'bytes_used': 1024,
|
||||||
|
'meta_timestamp': '1630106063.23826',
|
||||||
|
'deleted': 0,
|
||||||
|
'state': 'sharded',
|
||||||
|
'state_timestamp': '1630106063.23826',
|
||||||
|
'epoch': None,
|
||||||
|
'reported': 1,
|
||||||
|
'tombstones': -1}
|
||||||
|
return {'X-Backend-Record-Type': 'shard'}, [entry]
|
||||||
|
else:
|
||||||
|
# we've come back with a direct record-type = object
|
||||||
|
self.assertEqual(headers['X-Backend-Record-Type'],
|
||||||
|
'object')
|
||||||
|
return {'X-Backend-Record-Type': 'object'}, []
|
||||||
|
|
||||||
|
else:
|
||||||
|
# It's an un-sharded container, no tricks.
|
||||||
|
entry = {'bytes': 30968411,
|
||||||
|
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||||
|
'name': '%s' % prefix,
|
||||||
|
'content_type': 'video/mp4',
|
||||||
|
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||||
|
return {}, [entry]
|
||||||
|
|
||||||
|
conf = self.conf.copy()
|
||||||
|
conf['watchers'] = 'test_watcher1'
|
||||||
|
conf['__file__'] = '/etc/swift/swift.conf'
|
||||||
|
|
||||||
|
ret_config = {'test_watcher1': {'action': 'log', 'grace_age': '0'}}
|
||||||
|
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||||
|
return_value=ret_config), \
|
||||||
|
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||||
|
side_effect=[DarkDataWatcher]):
|
||||||
|
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||||
|
|
||||||
|
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||||
|
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||||
|
fake_direct_get_container):
|
||||||
|
my_auditor.run_audit(mode='once')
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
call_stack[-3:],
|
||||||
|
[(main_acc, cont, {'X-Backend-Record-Type': 'auto'}),
|
||||||
|
(shard_acc, cont, {'X-Backend-Record-Type': 'auto'}),
|
||||||
|
(main_acc, cont, {'X-Backend-Record-Type': 'object'})])
|
||||||
|
|
||||||
|
log_lines = self.logger.get_lines_for_level('info')
|
||||||
|
self.assertIn(
|
||||||
|
'[audit-watcher test_watcher1] total unknown 0 ok 2 dark 1',
|
||||||
|
log_lines)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user