swift/test/unit/obj/test_reconstructor.py

2814 lines
117 KiB
Python
Raw Normal View History

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import unittest
import os
from hashlib import md5
import mock
import six.moves.cPickle as pickle
import tempfile
import time
import shutil
import re
import random
import struct
from eventlet import Timeout, sleep
from contextlib import closing, contextmanager
from gzip import GzipFile
from shutil import rmtree
from swift.common import utils
from swift.common.swob import HeaderKeyDict
from swift.common.exceptions import DiskFileError
from swift.obj import diskfile, reconstructor as object_reconstructor
from swift.common import ring
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
POLICIES, EC_POLICY)
from swift.obj.reconstructor import REVERT
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
FabricatedRing, make_timestamp_iter)
@contextmanager
def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
def fake_ssync(daemon, node, job, suffixes):
if ssync_calls is not None:
ssync_calls.append(
{'node': node, 'job': job, 'suffixes': suffixes})
def fake_call():
if response_callback:
response = response_callback(node, job, suffixes)
else:
response = True, {}
return response
return fake_call
with mock.patch('swift.obj.reconstructor.ssync_sender', fake_ssync):
yield fake_ssync
def make_ec_archive_bodies(policy, test_body):
segment_size = policy.ec_segment_size
# split up the body into buffers
chunks = [test_body[x:x + segment_size]
for x in range(0, len(test_body), segment_size)]
# encode the buffers into fragment payloads
fragment_payloads = []
for chunk in chunks:
fragments = policy.pyeclib_driver.encode(chunk)
if not fragments:
break
fragment_payloads.append(fragments)
# join up the fragment payloads per node
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
return ec_archive_bodies
def _create_test_rings(path):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
[0, 1, 2],
[1, 2, 3],
[2, 3, 0]
]
intended_devs = [
{'id': 0, 'device': 'sda1', 'zone': 0, 'ip': '127.0.0.0',
'port': 6000},
{'id': 1, 'device': 'sda1', 'zone': 1, 'ip': '127.0.0.1',
'port': 6000},
{'id': 2, 'device': 'sda1', 'zone': 2, 'ip': '127.0.0.2',
'port': 6000},
{'id': 3, 'device': 'sda1', 'zone': 4, 'ip': '127.0.0.3',
'port': 6000}
]
intended_part_shift = 30
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
f)
testgz = os.path.join(path, 'object-1.ring.gz')
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
f)
def count_stats(logger, key, metric):
count = 0
for record in logger.log_dict[key]:
log_args, log_kwargs = record
m = log_args[0]
if re.match(metric, m):
count += 1
return count
def get_header_frag_index(self, body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
}
@patch_policies([StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
ec_ndata=2, ec_nparity=1)])
class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def setUp(self):
self.testdir = tempfile.mkdtemp()
_create_test_rings(self.testdir)
POLICIES[0].object_ring = ring.Ring(self.testdir, ring_name='object')
POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1')
utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = ''
self.devices = os.path.join(self.testdir, 'node')
os.makedirs(self.devices)
os.mkdir(os.path.join(self.devices, 'sda1'))
self.objects = os.path.join(self.devices, 'sda1',
diskfile.get_data_dir(POLICIES[0]))
self.objects_1 = os.path.join(self.devices, 'sda1',
diskfile.get_data_dir(POLICIES[1]))
os.mkdir(self.objects)
os.mkdir(self.objects_1)
self.parts = {}
self.parts_1 = {}
self.part_nums = ['0', '1', '2']
for part in self.part_nums:
self.parts[part] = os.path.join(self.objects, part)
os.mkdir(self.parts[part])
self.parts_1[part] = os.path.join(self.objects_1, part)
os.mkdir(self.parts_1[part])
self.conf = dict(
swift_dir=self.testdir, devices=self.devices, mount_check='false',
timeout='300', stats_interval='1')
self.logger = debug_logger('test-reconstructor')
self.reconstructor = object_reconstructor.ObjectReconstructor(
self.conf, logger=self.logger)
self.policy = POLICIES[1]
# most of the reconstructor test methods require that there be
# real objects in place, not just part dirs, so we'll create them
# all here....
# part 0: 3C1/hash/xxx-1.data <-- job: sync_only - parnters (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# 061/hash/xxx-1.data <-- included in earlier job (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# /xxx-2.data <-- job: sync_revert to index 2
# part 1: 3C1/hash/xxx-0.data <-- job: sync_only - parnters (FI 0)
# /xxx-1.data <-- job: sync_revert to index 1
# /xxx.durable <-- included in earlier jobs (FI 0, 1)
# 061/hash/xxx-1.data <-- included in earlier job (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# part 2: 3C1/hash/xxx-2.data <-- job: sync_revert to index 2
# /xxx.durable <-- included in earlier job (FI 2)
# 061/hash/xxx-0.data <-- job: sync_revert to index 0
# /xxx.durable <-- included in earlier job (FI 0)
def _create_frag_archives(policy, obj_path, local_id, obj_set):
# we'll create 2 sets of objects in different suffix dirs
# so we cover all the scenarios we want (3 of them)
# 1) part dir with all FI's matching the local node index
# 2) part dir with one local and mix of others
# 3) part dir with no local FI and one or more others
def part_0(set):
if set == 0:
# just the local
return local_id
else:
# onde local and all of another
if obj_num == 0:
return local_id
else:
return (local_id + 1) % 3
def part_1(set):
if set == 0:
# one local and all of another
if obj_num == 0:
return local_id
else:
return (local_id + 2) % 3
else:
# just the local node
return local_id
def part_2(set):
# this part is a handoff in our config (always)
# so lets do a set with indicies from different nodes
if set == 0:
return (local_id + 1) % 3
else:
return (local_id + 2) % 3
# function dictionary for defining test scenarios base on set #
scenarios = {'0': part_0,
'1': part_1,
'2': part_2}
def _create_df(obj_num, part_num):
self._create_diskfile(
part=part_num, object_name='o' + str(obj_set),
policy=policy, frag_index=scenarios[part_num](obj_set),
timestamp=utils.Timestamp(t))
for part_num in self.part_nums:
# create 3 unique objcets per part, each part
# will then have a unique mix of FIs for the
# possible scenarios
for obj_num in range(0, 3):
_create_df(obj_num, part_num)
ips = utils.whataremyips()
for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]:
self.ec_policy = policy
self.ec_obj_ring = self.reconstructor.load_object_ring(
self.ec_policy)
data_dir = diskfile.get_data_dir(self.ec_policy)
for local_dev in [dev for dev in self.ec_obj_ring.devs
if dev and dev['replication_ip'] in ips and
dev['replication_port'] ==
self.reconstructor.port]:
self.ec_local_dev = local_dev
dev_path = os.path.join(self.reconstructor.devices_dir,
self.ec_local_dev['device'])
self.ec_obj_path = os.path.join(dev_path, data_dir)
# create a bunch of FA's to test
t = 1421181937.70054 # time.time()
with mock.patch('swift.obj.diskfile.time') as mock_time:
# since (a) we are using a fixed time here to create
# frags which corresponds to all the hardcoded hashes and
# (b) the EC diskfile will delete its .data file right
# after creating if it has expired, use this horrible hack
# to prevent the reclaim happening
mock_time.time.return_value = 0.0
_create_frag_archives(self.ec_policy, self.ec_obj_path,
self.ec_local_dev['id'], 0)
_create_frag_archives(self.ec_policy, self.ec_obj_path,
self.ec_local_dev['id'], 1)
break
break
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
def _create_diskfile(self, policy=None, part=0, object_name='o',
frag_index=0, timestamp=None, test_data=None):
policy = policy or self.policy
df_mgr = self.reconstructor._df_router[policy]
df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
policy=policy)
with df.create() as writer:
timestamp = timestamp or utils.Timestamp(time.time())
test_data = test_data or 'test data'
writer.write(test_data)
metadata = {
'X-Timestamp': timestamp.internal,
'Content-Length': len(test_data),
'Etag': md5(test_data).hexdigest(),
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
}
writer.put(metadata)
writer.commit(timestamp)
return df
def assert_expected_jobs(self, part_num, jobs):
for job in jobs:
del job['path']
del job['policy']
if 'local_index' in job:
del job['local_index']
job['suffixes'].sort()
expected = []
# part num 0
expected.append(
[{
'sync_to': [{
'index': 2,
'replication_port': 6000,
'zone': 2,
'ip': '127.0.0.2',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.2',
'device': 'sda1',
'id': 2,
}],
'job_type': object_reconstructor.REVERT,
'suffixes': ['061'],
'partition': 0,
'frag_index': 2,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1', 'port': 6000,
},
'hashes': {
'061': {
None: '85b02a5283704292a511078a5c483da5',
2: '0e6e8d48d801dc89fd31904ae3b31229',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
'3c1': {
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
},
}, {
'sync_to': [{
'index': 0,
'replication_port': 6000,
'zone': 0,
'ip': '127.0.0.0',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.0',
'device': 'sda1', 'id': 0,
}, {
'index': 2,
'replication_port': 6000,
'zone': 2,
'ip': '127.0.0.2',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.2',
'device': 'sda1',
'id': 2,
}],
'job_type': object_reconstructor.SYNC,
'sync_diskfile_builder': self.reconstructor.reconstruct_fa,
'suffixes': ['061', '3c1'],
'partition': 0,
'frag_index': 1,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1',
'port': 6000,
},
'hashes':
{
'061': {
None: '85b02a5283704292a511078a5c483da5',
2: '0e6e8d48d801dc89fd31904ae3b31229',
1: '0e6e8d48d801dc89fd31904ae3b31229'
},
'3c1': {
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
},
}]
)
# part num 1
expected.append(
[{
'sync_to': [{
'index': 1,
'replication_port': 6000,
'zone': 2,
'ip': '127.0.0.2',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.2',
'device': 'sda1',
'id': 2,
}],
'job_type': object_reconstructor.REVERT,
'suffixes': ['061', '3c1'],
'partition': 1,
'frag_index': 1,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1',
'port': 6000,
},
'hashes':
{
'061': {
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
'3c1': {
0: '0e6e8d48d801dc89fd31904ae3b31229',
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
},
}, {
'sync_to': [{
'index': 2,
'replication_port': 6000,
'zone': 4,
'ip': '127.0.0.3',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.3',
'device': 'sda1', 'id': 3,
}, {
'index': 1,
'replication_port': 6000,
'zone': 2,
'ip': '127.0.0.2',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.2',
'device': 'sda1',
'id': 2,
}],
'job_type': object_reconstructor.SYNC,
'sync_diskfile_builder': self.reconstructor.reconstruct_fa,
'suffixes': ['3c1'],
'partition': 1,
'frag_index': 0,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1',
'port': 6000,
},
'hashes': {
'061': {
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
'3c1': {
0: '0e6e8d48d801dc89fd31904ae3b31229',
None: '85b02a5283704292a511078a5c483da5',
1: '0e6e8d48d801dc89fd31904ae3b31229',
},
},
}]
)
# part num 2
expected.append(
[{
'sync_to': [{
'index': 0,
'replication_port': 6000,
'zone': 2,
'ip': '127.0.0.2',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.2',
'device': 'sda1', 'id': 2,
}],
'job_type': object_reconstructor.REVERT,
'suffixes': ['061'],
'partition': 2,
'frag_index': 0,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1',
'port': 6000,
},
'hashes': {
'061': {
0: '0e6e8d48d801dc89fd31904ae3b31229',
None: '85b02a5283704292a511078a5c483da5'
},
'3c1': {
None: '85b02a5283704292a511078a5c483da5',
2: '0e6e8d48d801dc89fd31904ae3b31229'
},
},
}, {
'sync_to': [{
'index': 2,
'replication_port': 6000,
'zone': 0,
'ip': '127.0.0.0',
'region': 1,
'port': 6000,
'replication_ip': '127.0.0.0',
'device': 'sda1',
'id': 0,
}],
'job_type': object_reconstructor.REVERT,
'suffixes': ['3c1'],
'partition': 2,
'frag_index': 2,
'device': 'sda1',
'local_dev': {
'replication_port': 6000,
'zone': 1,
'ip': '127.0.0.1',
'region': 1,
'id': 1,
'replication_ip': '127.0.0.1',
'device': 'sda1',
'port': 6000
},
'hashes': {
'061': {
0: '0e6e8d48d801dc89fd31904ae3b31229',
None: '85b02a5283704292a511078a5c483da5'
},
'3c1': {
None: '85b02a5283704292a511078a5c483da5',
2: '0e6e8d48d801dc89fd31904ae3b31229'
},
},
}]
)
def check_jobs(part_num):
try:
expected_jobs = expected[int(part_num)]
except (IndexError, ValueError):
self.fail('Unknown part number %r' % part_num)
expected_by_part_frag_index = dict(
((j['partition'], j['frag_index']), j) for j in expected_jobs)
for job in jobs:
job_key = (job['partition'], job['frag_index'])
if job_key in expected_by_part_frag_index:
for k, value in job.items():
expected_value = \
expected_by_part_frag_index[job_key][k]
try:
if isinstance(value, list):
value.sort()
expected_value.sort()
self.assertEqual(value, expected_value)
except AssertionError as e:
extra_info = \
'\n\n... for %r in part num %s job %r' % (
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
k, part_num, job_key)
raise AssertionError(str(e) + extra_info)
else:
self.fail(
'Unexpected job %r for part num %s - '
'expected jobs where %r' % (
job_key, part_num,
expected_by_part_frag_index.keys()))
for expected_job in expected_jobs:
if expected_job in jobs:
jobs.remove(expected_job)
self.assertFalse(jobs) # that should be all of them
check_jobs(part_num)
def _run_once(self, http_count, extra_devices, override_devices=None):
ring_devs = list(self.policy.object_ring.devs)
for device, parts in extra_devices.items():
device_path = os.path.join(self.devices, device)
os.mkdir(device_path)
for part in range(parts):
os.makedirs(os.path.join(device_path, 'objects-1', str(part)))
# we update the ring to make is_local happy
devs = [dict(d) for d in ring_devs]
for d in devs:
d['device'] = device
self.policy.object_ring.devs.extend(devs)
self.reconstructor.stats_interval = 0
self.process_job = lambda j: sleep(0)
with mocked_http_conn(*[200] * http_count, body=pickle.dumps({})):
with mock_ssync_sender():
self.reconstructor.run_once(devices=override_devices)
def test_run_once(self):
# sda1: 3 is done in setup
extra_devices = {
'sdb1': 4,
'sdc1': 1,
'sdd1': 0,
}
self._run_once(18, extra_devices)
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'devices reconstructed in' not in line:
continue
stat_line = line.split('of', 1)[0].strip()
stats_lines.add(stat_line)
acceptable = set([
'0/3 (0.00%) partitions',
'8/8 (100.00%) partitions',
])
matched = stats_lines & acceptable
self.assertEqual(matched, acceptable,
'missing some expected acceptable:\n%s' % (
'\n'.join(sorted(acceptable - matched))))
self.assertEqual(self.reconstructor.reconstruction_device_count, 4)
self.assertEqual(self.reconstructor.reconstruction_part_count, 8)
self.assertEqual(self.reconstructor.part_count, 8)
def test_run_once_override_devices(self):
# sda1: 3 is done in setup
extra_devices = {
'sdb1': 4,
'sdc1': 1,
'sdd1': 0,
}
self._run_once(2, extra_devices, 'sdc1')
stats_lines = set()
for line in self.logger.get_lines_for_level('info'):
if 'devices reconstructed in' not in line:
continue
stat_line = line.split('of', 1)[0].strip()
stats_lines.add(stat_line)
acceptable = set([
'1/1 (100.00%) partitions',
])
matched = stats_lines & acceptable
self.assertEqual(matched, acceptable,
'missing some expected acceptable:\n%s' % (
'\n'.join(sorted(acceptable - matched))))
self.assertEqual(self.reconstructor.reconstruction_device_count, 1)
self.assertEqual(self.reconstructor.reconstruction_part_count, 1)
self.assertEqual(self.reconstructor.part_count, 1)
def test_get_response(self):
part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
for stat_code in (200, 400):
with mocked_http_conn(stat_code):
resp = self.reconstructor._get_response(node, part,
path='nada',
headers={},
policy=POLICIES[0])
if resp:
self.assertEqual(resp.status, 200)
else:
self.assertEqual(
len(self.reconstructor.logger.log_dict['warning']), 1)
def test_reconstructor_does_not_log_on_404(self):
part = self.part_nums[0]
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
with mocked_http_conn(404):
self.reconstructor._get_response(node, part,
path='some_path',
headers={},
policy=POLICIES[0])
# Make sure that no warnings are emitted for a 404
len_warning_lines = len(self.logger.get_lines_for_level('warning'))
self.assertEqual(len_warning_lines, 0)
def test_reconstructor_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the reconstructor
self.reconstructor._reset_stats()
rmtree(self.objects_1)
os.mkdir(self.objects_1)
os.mkdir(os.path.join(self.objects_1, "burrito"))
jobs = []
for part_info in self.reconstructor.collect_parts():
jobs += self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(len(jobs), 0)
def test_check_ring(self):
testring = tempfile.mkdtemp()
_create_test_rings(testring)
obj_ring = ring.Ring(testring, ring_name='object') # noqa
self.assertTrue(self.reconstructor.check_ring(obj_ring))
orig_check = self.reconstructor.next_check
self.reconstructor.next_check = orig_check - 30
self.assertTrue(self.reconstructor.check_ring(obj_ring))
self.reconstructor.next_check = orig_check
orig_ring_time = obj_ring._mtime
obj_ring._mtime = orig_ring_time - 30
self.assertTrue(self.reconstructor.check_ring(obj_ring))
self.reconstructor.next_check = orig_check - 30
self.assertFalse(self.reconstructor.check_ring(obj_ring))
rmtree(testring, ignore_errors=1)
def test_build_reconstruction_jobs(self):
self.reconstructor.handoffs_first = False
self.reconstructor._reset_stats()
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertTrue(jobs[0]['job_type'] in
(object_reconstructor.SYNC,
object_reconstructor.REVERT))
self.assert_expected_jobs(part_info['partition'], jobs)
self.reconstructor.handoffs_first = True
self.reconstructor._reset_stats()
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertTrue(jobs[0]['job_type'] ==
object_reconstructor.REVERT)
self.assert_expected_jobs(part_info['partition'], jobs)
def test_get_partners(self):
# we're going to perform an exhaustive test of every possible
# combination of partitions and nodes in our custom test ring
# format: [dev_id in question, 'part_num',
# [part_nodes for the given part], left id, right id...]
expected_partners = sorted([
(0, '0', [0, 1, 2], 2, 1), (0, '2', [2, 3, 0], 3, 2),
(1, '0', [0, 1, 2], 0, 2), (1, '1', [1, 2, 3], 3, 2),
(2, '0', [0, 1, 2], 1, 0), (2, '1', [1, 2, 3], 1, 3),
(2, '2', [2, 3, 0], 0, 3), (3, '1', [1, 2, 3], 2, 1),
(3, '2', [2, 3, 0], 2, 0), (0, '0', [0, 1, 2], 2, 1),
(0, '2', [2, 3, 0], 3, 2), (1, '0', [0, 1, 2], 0, 2),
(1, '1', [1, 2, 3], 3, 2), (2, '0', [0, 1, 2], 1, 0),
(2, '1', [1, 2, 3], 1, 3), (2, '2', [2, 3, 0], 0, 3),
(3, '1', [1, 2, 3], 2, 1), (3, '2', [2, 3, 0], 2, 0),
])
got_partners = []
for pol in POLICIES:
obj_ring = pol.object_ring
for part_num in self.part_nums:
part_nodes = obj_ring.get_part_nodes(int(part_num))
primary_ids = [n['id'] for n in part_nodes]
for node in part_nodes:
partners = object_reconstructor._get_partners(
node['index'], part_nodes)
left = partners[0]['id']
right = partners[1]['id']
got_partners.append((
node['id'], part_num, primary_ids, left, right))
self.assertEqual(expected_partners, sorted(got_partners))
def test_collect_parts(self):
self.reconstructor._reset_stats()
parts = []
for part_info in self.reconstructor.collect_parts():
parts.append(part_info['partition'])
self.assertEqual(sorted(parts), [0, 1, 2])
def test_collect_parts_mkdirs_error(self):
def blowup_mkdirs(path):
raise OSError('Ow!')
self.reconstructor._reset_stats()
with mock.patch.object(object_reconstructor, 'mkdirs', blowup_mkdirs):
rmtree(self.objects_1, ignore_errors=1)
parts = []
for part_info in self.reconstructor.collect_parts():
parts.append(part_info['partition'])
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(len(error_lines), 1)
log_args, log_kwargs = self.logger.log_dict['error'][0]
self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!')
def test_removes_zbf(self):
# After running xfs_repair, a partition directory could become a
# zero-byte file. If this happens, the reconstructor should clean it
# up, log something, and move on to the next partition.
# Surprise! Partition dir 1 is actually a zero-byte file.
pol_1_part_1_path = os.path.join(self.objects_1, '1')
rmtree(pol_1_part_1_path)
with open(pol_1_part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
# since our collect_parts job is a generator, that yields directly
# into build_jobs and then spawns it's safe to do the remove_files
# without making reconstructor startup slow
self.reconstructor._reset_stats()
for part_info in self.reconstructor.collect_parts():
self.assertNotEqual(pol_1_part_1_path, part_info['part_path'])
self.assertFalse(os.path.exists(pol_1_part_1_path))
warnings = self.reconstructor.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warnings))
self.assertTrue('Unexpected entity in data dir:' in warnings[0],
'Warning not found in %s' % warnings)
def _make_fake_ssync(self, ssync_calls):
class _fake_ssync(object):
def __init__(self, daemon, node, job, suffixes, **kwargs):
# capture context and generate an available_map of objs
context = {}
context['node'] = node
context['job'] = job
context['suffixes'] = suffixes
self.suffixes = suffixes
self.daemon = daemon
self.job = job
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'],
self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index'))
self.available_map = {}
Add POST capability to ssync for .meta files ssync currently does the wrong thing when replicating object dirs containing both a .data and a .meta file. The ssync sender uses a single PUT to send both object content and metadata to the receiver, using the metadata (.meta file) timestamp. This results in the object content timestamp being advanced to the metadata timestamp, potentially overwriting newer object data on the receiver and causing an inconsistency with the container server record for the object. For example, replicating an object dir with {t0.data(etag=x), t2.meta} to a receiver with t1.data(etag=y) will result in the creation of t2.data(etag=x) on the receiver. However, the container server will continue to list the object as t1(etag=y). This patch modifies ssync to replicate the content of .data and .meta separately using a PUT request for the data (no change) and a POST request for the metadata. In effect, ssync replication replicates the client operations that generated the .data and .meta files so that the result of replication is the same as if the original client requests had persisted on all object servers. Apart from maintaining correct timestamps across sync'd nodes, this has the added benefit of not needing to PUT objects when only the metadata has changed and a POST will suffice. Taking the same example, ssync sender will no longer PUT t0.data but will POST t2.meta resulting in the receiver having t1.data and t2.meta. The changes are backwards compatible: an upgraded sender will only sync data files to a legacy receiver and will not sync meta files (fixing the erroneous behavior described above); a legacy sender will operate as before when sync'ing to an upgraded receiver. Changes: - diskfile API provides methods to get the data file timestamp as distinct from the diskfile timestamp. - diskfile yield_hashes return tuple now passes a dict mapping data and meta (if any) timestamps to their respective values in the timestamp field. - ssync_sender will encode data and meta timestamps in the (hash_path, timestamp) tuple sent to the receiver during missing_checks. - ssync_receiver compares sender's data and meta timestamps to any local diskfile and may specify that only data or meta parts are sent during updates phase by appending a qualifier to the hash returned in its 'wanted' list. - ssync_sender now sends POST subrequests when a meta file exists and its content needs to be replicated. - ssync_sender may send *only* a POST if the receiver indicates that is the only part required to be sync'd. - object server will allow PUT and DELETE with earlier timestamp than a POST - Fixed TODO related to replicated objects with fast-POST and ssync Related spec change-id: I60688efc3df692d3a39557114dca8c5490f7837e Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Closes-Bug: 1501528 Change-Id: I97552d194e5cc342b0a3f4b9800de8aa6b9cb85b
2015-04-22 12:56:50 +01:00
for path, hash_, timestamps in hash_gen:
self.available_map[hash_] = timestamps
context['available_map'] = self.available_map
ssync_calls.append(context)
def __call__(self, *args, **kwargs):
return True, self.available_map
return _fake_ssync
def test_delete_reverted(self):
# verify reconstructor deletes reverted frag indexes after ssync'ing
def visit_obj_dirs(context):
for suff in context['suffixes']:
suff_dir = os.path.join(
context['job']['path'], suff)
for root, dirs, files in os.walk(suff_dir):
for d in dirs:
dirpath = os.path.join(root, d)
files = os.listdir(dirpath)
yield dirpath, files
n_files = n_files_after = 0
# run reconstructor with delete function mocked out to check calls
ssync_calls = []
delete_func =\
'swift.obj.reconstructor.ObjectReconstructor.delete_reverted_objs'
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
with mock.patch(delete_func) as mock_delete:
self.reconstructor.reconstruct()
expected_calls = []
for context in ssync_calls:
if context['job']['job_type'] == REVERT:
for dirpath, files in visit_obj_dirs(context):
# sanity check - expect some files to be in dir,
# may not be for the reverted frag index
self.assertTrue(files)
n_files += len(files)
expected_calls.append(mock.call(context['job'],
context['available_map'],
context['node']['index']))
mock_delete.assert_has_calls(expected_calls, any_order=True)
ssync_calls = []
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
self.reconstructor.reconstruct()
for context in ssync_calls:
if context['job']['job_type'] == REVERT:
data_file_tail = ('#%s.data'
% context['node']['index'])
for dirpath, files in visit_obj_dirs(context):
n_files_after += len(files)
for filename in files:
self.assertFalse(
filename.endswith(data_file_tail))
# sanity check that some files should were deleted
self.assertTrue(n_files > n_files_after)
def test_get_part_jobs(self):
# yeah, this test code expects a specific setup
self.assertEqual(len(self.part_nums), 3)
# OK, at this point we should have 4 loaded parts with one
jobs = []
for partition in os.listdir(self.ec_obj_path):
part_path = os.path.join(self.ec_obj_path, partition)
jobs = self.reconstructor._get_part_jobs(
self.ec_local_dev, part_path, int(partition), self.ec_policy)
self.assert_expected_jobs(partition, jobs)
def assertStatCount(self, stat_method, stat_prefix, expected_count):
count = count_stats(self.logger, stat_method, stat_prefix)
msg = 'expected %s != %s for %s %s' % (
expected_count, count, stat_method, stat_prefix)
self.assertEqual(expected_count, count, msg)
def test_delete_partition(self):
# part 2 is predefined to have all revert jobs
part_path = os.path.join(self.objects_1, '2')
self.assertTrue(os.access(part_path, os.F_OK))
ssync_calls = []
status = [200] * 2
body = pickle.dumps({})
with mocked_http_conn(*status, body=body) as request_log:
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct(override_partitions=[2])
expected_repliate_calls = set([
('127.0.0.0', '/sda1/2/3c1'),
('127.0.0.2', '/sda1/2/061'),
])
found_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_repliate_calls, found_calls)
expected_ssync_calls = sorted([
('127.0.0.0', REVERT, 2, ['3c1']),
('127.0.0.2', REVERT, 2, ['061']),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['job_type'],
c['job']['partition'],
c['suffixes'],
) for c in ssync_calls))
expected_stats = {
('increment', 'partition.delete.count.'): 2,
('timing_since', 'partition.delete.timing'): 2,
}
for stat_key, expected in expected_stats.items():
stat_method, stat_prefix = stat_key
self.assertStatCount(stat_method, stat_prefix, expected)
# part 2 should be totally empty
policy = POLICIES[1]
hash_gen = self.reconstructor._df_router[policy].yield_hashes(
'sda1', '2', policy)
for path, hash_, ts in hash_gen:
self.fail('found %s with %s in %s', (hash_, ts, path))
# but the partition directory and hashes pkl still exist
self.assertTrue(os.access(part_path, os.F_OK))
hashes_path = os.path.join(self.objects_1, '2', diskfile.HASH_FILE)
self.assertTrue(os.access(hashes_path, os.F_OK))
# ... but on next pass
ssync_calls = []
with mocked_http_conn() as request_log:
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct(override_partitions=[2])
# reconstruct won't generate any replicate or ssync_calls
self.assertFalse(request_log.requests)
self.assertFalse(ssync_calls)
# and the partition will get removed!
self.assertFalse(os.access(part_path, os.F_OK))
def test_process_job_all_success(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with mocked_http_conn(*[200] * 12, body=pickle.dumps({})):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
node_count = len(job['sync_to'])
self.reconstructor.process_job(job)
if job['job_type'] == object_reconstructor.REVERT:
self.assertEqual(0, count_stats(
self.logger, 'update_stats', 'suffix.hashes'))
else:
self.assertStatCount('update_stats',
'suffix.hashes',
node_count)
self.assertEqual(node_count, count_stats(
self.logger, 'update_stats', 'suffix.hashes'))
self.assertEqual(node_count, count_stats(
self.logger, 'update_stats', 'suffix.syncs'))
self.assertFalse('error' in
self.logger.all_log_lines())
self.assertEqual(self.reconstructor.suffix_sync, 8)
self.assertEqual(self.reconstructor.suffix_count, 8)
self.assertEqual(len(found_jobs), 6)
def test_process_job_all_insufficient_storage(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with mocked_http_conn(*[507] * 8):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
self.reconstructor.process_job(job)
for line in self.logger.get_lines_for_level('error'):
self.assertTrue('responded as unmounted' in line)
self.assertEqual(0, count_stats(
self.logger, 'update_stats', 'suffix.hashes'))
self.assertEqual(0, count_stats(
self.logger, 'update_stats', 'suffix.syncs'))
self.assertEqual(self.reconstructor.suffix_sync, 0)
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
def test_process_job_all_client_error(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with mocked_http_conn(*[400] * 8):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
self.reconstructor.process_job(job)
for line in self.logger.get_lines_for_level('error'):
self.assertTrue('Invalid response 400' in line)
self.assertEqual(0, count_stats(
self.logger, 'update_stats', 'suffix.hashes'))
self.assertEqual(0, count_stats(
self.logger, 'update_stats', 'suffix.syncs'))
self.assertEqual(self.reconstructor.suffix_sync, 0)
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
def test_process_job_all_timeout(self):
self.reconstructor._reset_stats()
with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 8):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
self.reconstructor.process_job(job)
for line in self.logger.get_lines_for_level('error'):
self.assertTrue('Timeout (Nones)' in line)
self.assertStatCount(
'update_stats', 'suffix.hashes', 0)
self.assertStatCount(
'update_stats', 'suffix.syncs', 0)
self.assertEqual(self.reconstructor.suffix_sync, 0)
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
@patch_policies(with_ec_default=True)
class TestObjectReconstructor(unittest.TestCase):
def setUp(self):
self.policy = POLICIES.default
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
self.policy.object_ring._rtime = time.time() + 3600
self.testdir = tempfile.mkdtemp()
self.devices = os.path.join(self.testdir, 'devices')
self.local_dev = self.policy.object_ring.devs[0]
self.ip = self.local_dev['replication_ip']
self.port = self.local_dev['replication_port']
self.conf = {
'devices': self.devices,
'mount_check': False,
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
'bind_ip': self.ip,
'bind_port': self.port,
}
self.logger = debug_logger('object-reconstructor')
self._configure_reconstructor()
self.policy.object_ring.max_more_nodes = \
self.policy.object_ring.replicas
self.ts_iter = make_timestamp_iter()
def _configure_reconstructor(self, **kwargs):
self.conf.update(kwargs)
self.reconstructor = object_reconstructor.ObjectReconstructor(
self.conf, logger=self.logger)
self.reconstructor._reset_stats()
# some tests bypass build_reconstruction_jobs and go to process_job
# directly, so you end up with a /0 when you try to show the
# percentage of complete jobs as ratio of the total job count
self.reconstructor.job_count = 1
def tearDown(self):
self.reconstructor._reset_stats()
self.reconstructor.stats_line()
shutil.rmtree(self.testdir)
def ts(self):
return next(self.ts_iter)
def test_collect_parts_skips_non_ec_policy_and_device(self):
stub_parts = (371, 78, 419, 834)
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for part in stub_parts:
utils.mkdirs(os.path.join(
self.devices, self.local_dev['device'],
datadir, str(part)))
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
self.assertEqual(found_parts, sorted(stub_parts))
for part_info in part_infos:
self.assertEqual(part_info['local_dev'], self.local_dev)
self.assertEqual(part_info['policy'], self.policy)
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
def test_collect_parts_skips_non_local_devs_servers_per_port(self):
self._configure_reconstructor(devices=self.devices, mount_check=False,
bind_ip=self.ip, bind_port=self.port,
servers_per_port=2)
device_parts = {
'sda': (374,),
'sdb': (179, 807), # w/one-serv-per-port, same IP alone is local
'sdc': (363, 468, 843),
'sdd': (912,), # "not local" via different IP
}
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdb', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port + 1 if dev == 'sdb' else self.port,
} for dev in local_devs]
stub_ring_devs.append({
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
*(device_parts[d] for d in local_devs)))
self.assertEqual(found_parts, expected_parts)
for part_info in part_infos:
self.assertEqual(part_info['policy'], self.policy)
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
dev = part_info['local_dev']
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
def test_collect_parts_multi_device_skips_non_non_local_devs(self):
device_parts = {
'sda': (374,),
'sdb': (179, 807), # "not local" via different port
'sdc': (363, 468, 843),
'sdd': (912,), # "not local" via different IP
}
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port,
} for dev in local_devs]
stub_ring_devs.append({
'device': 'sdb',
'replication_ip': self.ip,
'replication_port': self.port + 1, # not local via port
})
stub_ring_devs.append({
'device': 'sdd',
'replication_ip': '127.0.0.88', # not local via IP
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
*(device_parts[d] for d in local_devs)))
self.assertEqual(found_parts, expected_parts)
for part_info in part_infos:
self.assertEqual(part_info['policy'], self.policy)
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
dev = part_info['local_dev']
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
def test_collect_parts_multi_device_skips_non_ring_devices(self):
device_parts = {
'sda': (374,),
'sdc': (363, 468, 843),
}
for policy in POLICIES:
datadir = diskfile.get_data_dir(policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
'replication_port': self.port,
} for dev in local_devs]
Allow 1+ object-servers-per-disk deployment Enabled by a new > 0 integer config value, "servers_per_port" in the [DEFAULT] config section for object-server and/or replication server configs. The setting's integer value determines how many different object-server workers handle requests for any single unique local port in the ring. In this mode, the parent swift-object-server process continues to run as the original user (i.e. root if low-port binding is required), binds to all ports as defined in the ring, and forks off the specified number of workers per listen socket. The child, per-port servers drop privileges and behave pretty much how object-server workers always have, except that because the ring has unique ports per disk, the object-servers will only be handling requests for a single disk. The parent process detects dead servers and restarts them (with the correct listen socket), starts missing servers when an updated ring file is found with a device on the server with a new port, and kills extraneous servers when their port is found to no longer be in the ring. The ring files are stat'ed at most every "ring_check_interval" seconds, as configured in the object-server config (same default of 15s). Immediately stopping all swift-object-worker processes still works by sending the parent a SIGTERM. Likewise, a SIGHUP to the parent process still causes the parent process to close all listen sockets and exit, allowing existing children to finish serving their existing requests. The drop_privileges helper function now has an optional param to suppress the setsid() call, which otherwise screws up the child workers' process management. The class method RingData.load() can be told to only load the ring metadata (i.e. everything except replica2part2dev_id) with the optional kwarg, header_only=True. This is used to keep the parent and all forked off workers from unnecessarily having full copies of all storage policy rings in memory. A new helper class, swift.common.storage_policy.BindPortsCache, provides a method to return a set of all device ports in all rings for the server on which it is instantiated (identified by its set of IP addresses). The BindPortsCache instance will track mtimes of ring files, so they are not opened more frequently than necessary. This patch includes enhancements to the probe tests and object-replicator/object-reconstructor config plumbing to allow the probe tests to work correctly both in the "normal" config (same IP but unique ports for each SAIO "server") and a server-per-port setup where each SAIO "server" must have a unique IP address and unique port per disk within each "server". The main probe tests only work with 4 servers and 4 disks, but you can see the difference in the rings for the EC probe tests where there are 2 disks per server for a total of 8 disks. Specifically, swift.common.ring.utils.is_local_device() will ignore the ports when the "my_port" argument is None. Then, object-replicator and object-reconstructor both set self.bind_port to None if server_per_port is enabled. Bonus improvement for IPv6 addresses in is_local_device(). This PR for vagrant-swift-all-in-one will aid in testing this patch: https://github.com/swiftstack/vagrant-swift-all-in-one/pull/16/ Also allow SAIO to answer is_local_device() better; common SAIO setups have multiple "servers" all on the same host with different ports for the different "servers" (which happen to match the IPs specified in the rings for the devices on each of those "servers"). However, you can configure the SAIO to have different localhost IP addresses (e.g. 127.0.0.1, 127.0.0.2, etc.) in the ring and in the servers' config files' bind_ip setting. This new whataremyips() implementation combined with a little plumbing allows is_local_device() to accurately answer, even on an SAIO. In the default case (an unspecified bind_ip defaults to '0.0.0.0') as well as an explict "bind to everything" like '0.0.0.0' or '::', whataremyips() behaves as it always has, returning all IP addresses for the server. Also updated probe tests to handle each "server" in the SAIO having a unique IP address. For some (noisy) benchmarks that show servers_per_port=X is at least as good as the same number of "normal" workers: https://gist.github.com/dbishop/c214f89ca708a6b1624a#file-summary-md Benchmarks showing the benefits of I/O isolation with a small number of slow disks: https://gist.github.com/dbishop/fd0ab067babdecfb07ca#file-results-md If you were wondering what the overhead of threads_per_disk looks like: https://gist.github.com/dbishop/1d14755fedc86a161718#file-tabular_results-md DocImpact Change-Id: I2239a4000b41a7e7cc53465ce794af49d44796c6
2015-05-14 22:14:15 -07:00
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
*(device_parts[d] for d in local_devs)))
self.assertEqual(found_parts, expected_parts)
for part_info in part_infos:
self.assertEqual(part_info['policy'], self.policy)
self.assertTrue(part_info['local_dev'] in stub_ring_devs)
dev = part_info['local_dev']
self.assertEqual(part_info['part_path'],
os.path.join(self.devices,
dev['device'],
diskfile.get_data_dir(self.policy),
str(part_info['partition'])))
def test_collect_parts_mount_check(self):
# each device has one part in it
local_devs = ('sda', 'sdb')
for i, dev in enumerate(local_devs):
datadir = diskfile.get_data_dir(self.policy)
utils.mkdirs(os.path.join(
self.devices, dev, datadir, str(i)))
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port
} for dev in local_devs]
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(2, len(part_infos)) # sanity
self.assertEqual(set(int(p['partition']) for p in part_infos),
set([0, 1]))
paths = []
def fake_check_mount(devices, device):
paths.append(os.path.join(devices, device))
return False
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(2, len(part_infos)) # sanity, same jobs
self.assertEqual(set(int(p['partition']) for p in part_infos),
set([0, 1]))
# ... because ismount was not called
self.assertEqual(paths, [])
# ... now with mount check
self._configure_reconstructor(mount_check=True)
self.assertTrue(self.reconstructor.mount_check)
for policy in POLICIES:
self.assertTrue(self.reconstructor._df_router[policy].mount_check)
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual([], part_infos) # sanity, no jobs
# ... because fake_ismount returned False for both paths
self.assertEqual(set(paths), set([
os.path.join(self.devices, dev) for dev in local_devs]))
def fake_check_mount(devices, device):
path = os.path.join(devices, device)
if path.endswith('sda'):
return True
else:
return False
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(1, len(part_infos)) # only sda picked up (part 0)
self.assertEqual(part_infos[0]['partition'], 0)
def test_collect_parts_cleans_tmp(self):
local_devs = ('sda', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port
} for dev in local_devs]
for device in local_devs:
utils.mkdirs(os.path.join(self.devices, device))
fake_unlink = mock.MagicMock()
self.reconstructor.reclaim_age = 1000
now = time.time()
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch('swift.obj.reconstructor.time.time',
return_value=now), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.reconstructor.unlink_older_than',
fake_unlink):
self.assertEqual([], list(self.reconstructor.collect_parts()))
# each local device hash unlink_older_than called on it,
# with now - self.reclaim_age
tmpdir = diskfile.get_tmp_dir(self.policy)
expected = now - 1000
self.assertEqual(fake_unlink.mock_calls, [
mock.call(os.path.join(self.devices, dev, tmpdir), expected)
for dev in local_devs])
def test_collect_parts_creates_datadir(self):
# create just the device path
dev_path = os.path.join(self.devices, self.local_dev['device'])
utils.mkdirs(dev_path)
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]):
self.assertEqual([], list(self.reconstructor.collect_parts()))
datadir_path = os.path.join(dev_path,
diskfile.get_data_dir(self.policy))
self.assertTrue(os.path.exists(datadir_path))
def test_collect_parts_creates_datadir_error(self):
# create just the device path
datadir_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy))
utils.mkdirs(os.path.dirname(datadir_path))
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch('swift.obj.reconstructor.mkdirs',
side_effect=OSError('kaboom!')):
self.assertEqual([], list(self.reconstructor.collect_parts()))
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(len(error_lines), 1)
line = error_lines[0]
self.assertTrue('Unable to create' in line)
self.assertTrue(datadir_path in line)
def test_collect_parts_skips_invalid_paths(self):
datadir_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy))
utils.mkdirs(os.path.dirname(datadir_path))
with open(datadir_path, 'w') as f:
f.write('junk')
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]):
self.assertEqual([], list(self.reconstructor.collect_parts()))
self.assertTrue(os.path.exists(datadir_path))
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(len(error_lines), 1)
line = error_lines[0]
self.assertTrue('Unable to list partitions' in line)
self.assertTrue(datadir_path in line)
def test_collect_parts_removes_non_partition_files(self):
# create some junk next to partitions
datadir_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy))
num_parts = 3
for part in range(num_parts):
utils.mkdirs(os.path.join(datadir_path, str(part)))
junk_file = os.path.join(datadir_path, 'junk')
with open(junk_file, 'w') as f:
f.write('junk')
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]):
part_infos = list(self.reconstructor.collect_parts())
# the file is not included in the part_infos map
self.assertEqual(sorted(p['part_path'] for p in part_infos),
sorted([os.path.join(datadir_path, str(i))
for i in range(num_parts)]))
# and gets cleaned up
self.assertFalse(os.path.exists(junk_file))
def test_collect_parts_overrides(self):
# setup multiple devices, with multiple parts
device_parts = {
'sda': (374, 843),
'sdb': (179, 807),
'sdc': (363, 468, 843),
}
datadir = diskfile.get_data_dir(self.policy)
for dev, parts in device_parts.items():
for part in parts:
utils.mkdirs(os.path.join(
self.devices, dev,
datadir, str(part)))
# we're only going to add sda and sdc into the ring
local_devs = ('sda', 'sdc')
stub_ring_devs = [{
'device': dev,
'replication_ip': self.ip,
'replication_port': self.port
} for dev in local_devs]
expected = (
({}, [
('sda', 374),
('sda', 843),
('sdc', 363),
('sdc', 468),
('sdc', 843),
]),
({'override_devices': ['sda', 'sdc']}, [
('sda', 374),
('sda', 843),
('sdc', 363),
('sdc', 468),
('sdc', 843),
]),
({'override_devices': ['sdc']}, [
('sdc', 363),
('sdc', 468),
('sdc', 843),
]),
({'override_devices': ['sda']}, [
('sda', 374),
('sda', 843),
]),
({'override_devices': ['sdx']}, []),
({'override_partitions': [374]}, [
('sda', 374),
]),
({'override_partitions': [843]}, [
('sda', 843),
('sdc', 843),
]),
({'override_partitions': [843], 'override_devices': ['sda']}, [
('sda', 843),
]),
)
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
for kwargs, expected_parts in expected:
part_infos = list(self.reconstructor.collect_parts(**kwargs))
expected_paths = set(
os.path.join(self.devices, dev, datadir, str(part))
for dev, part in expected_parts)
found_paths = set(p['part_path'] for p in part_infos)
msg = 'expected %r != %r for %r' % (
expected_paths, found_paths, kwargs)
self.assertEqual(expected_paths, found_paths, msg)
def test_build_jobs_creates_empty_hashes(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '0')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 0,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
self.assertEqual(job['job_type'], object_reconstructor.SYNC)
self.assertEqual(job['frag_index'], 0)
self.assertEqual(job['suffixes'], [])
self.assertEqual(len(job['sync_to']), 2)
self.assertEqual(job['partition'], 0)
self.assertEqual(job['path'], part_path)
self.assertEqual(job['hashes'], {})
self.assertEqual(job['policy'], self.policy)
self.assertEqual(job['local_dev'], self.local_dev)
self.assertEqual(job['device'], self.local_dev['device'])
hashes_file = os.path.join(part_path,
diskfile.HASH_FILE)
self.assertTrue(os.path.exists(hashes_file))
suffixes = self.reconstructor._get_hashes(
self.policy, part_path, do_listdir=True)
self.assertEqual(suffixes, {})
def test_build_jobs_no_hashes(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '0')
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 0,
'part_path': part_path,
}
stub_hashes = {}
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)):
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
self.assertEqual(job['job_type'], object_reconstructor.SYNC)
self.assertEqual(job['frag_index'], 0)
self.assertEqual(job['suffixes'], [])
self.assertEqual(len(job['sync_to']), 2)
self.assertEqual(job['partition'], 0)
self.assertEqual(job['path'], part_path)
self.assertEqual(job['hashes'], {})
self.assertEqual(job['policy'], self.policy)
self.assertEqual(job['local_dev'], self.local_dev)
self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_primary(self):
ring = self.policy.object_ring = FabricatedRing()
# find a partition for which we're a primary
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
try:
frag_index = [n['id'] for n in part_nodes].index(
self.local_dev['id'])
except ValueError:
pass
else:
break
else:
self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': partition,
'part_path': part_path,
}
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)):
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
self.assertEqual(job['job_type'], object_reconstructor.SYNC)
self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(job['suffixes'], stub_hashes.keys())
self.assertEqual(set([n['index'] for n in job['sync_to']]),
set([(frag_index + 1) % ring.replicas,
(frag_index - 1) % ring.replicas]))
self.assertEqual(job['partition'], partition)
self.assertEqual(job['path'], part_path)
self.assertEqual(job['hashes'], stub_hashes)
self.assertEqual(job['policy'], self.policy)
self.assertEqual(job['local_dev'], self.local_dev)
self.assertEqual(job['device'], self.local_dev['device'])
def test_build_jobs_handoff(self):
ring = self.policy.object_ring = FabricatedRing()
# find a partition for which we're a handoff
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
if self.local_dev['id'] not in [n['id'] for n in part_nodes]:
break
else:
self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': partition,
'part_path': part_path,
}
# since this part doesn't belong on us it doesn't matter what
# frag_index we have
frag_index = random.randint(0, ring.replicas - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {None: 'hash'},
}
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)):
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
self.assertEqual(job['job_type'], object_reconstructor.REVERT)
self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(sorted(job['suffixes']), sorted(stub_hashes.keys()))
self.assertEqual(len(job['sync_to']), 1)
self.assertEqual(job['sync_to'][0]['index'], frag_index)
self.assertEqual(job['path'], part_path)
self.assertEqual(job['partition'], partition)
self.assertEqual(sorted(job['hashes']), sorted(stub_hashes))
self.assertEqual(job['local_dev'], self.local_dev)
def test_build_jobs_mixed(self):
ring = self.policy.object_ring = FabricatedRing()
# find a partition for which we're a primary
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
try:
frag_index = [n['id'] for n in part_nodes].index(
self.local_dev['id'])
except ValueError:
pass
else:
break
else:
self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': partition,
'part_path': part_path,
}
other_frag_index = random.choice([f for f in range(ring.replicas)
if f != frag_index])
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'456': {other_frag_index: 'hash', None: 'hash'},
'abc': {None: 'hash'},
}
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)):
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(2, len(jobs))
sync_jobs, revert_jobs = [], []
for job in jobs:
self.assertEqual(job['partition'], partition)
self.assertEqual(job['path'], part_path)
self.assertEqual(sorted(job['hashes']), sorted(stub_hashes))
self.assertEqual(job['policy'], self.policy)
self.assertEqual(job['local_dev'], self.local_dev)
self.assertEqual(job['device'], self.local_dev['device'])
{
object_reconstructor.SYNC: sync_jobs,
object_reconstructor.REVERT: revert_jobs,
}[job['job_type']].append(job)
self.assertEqual(1, len(sync_jobs))
job = sync_jobs[0]
self.assertEqual(job['frag_index'], frag_index)
self.assertEqual(sorted(job['suffixes']), sorted(['123', 'abc']))
self.assertEqual(len(job['sync_to']), 2)
self.assertEqual(set([n['index'] for n in job['sync_to']]),
set([(frag_index + 1) % ring.replicas,
(frag_index - 1) % ring.replicas]))
self.assertEqual(1, len(revert_jobs))
job = revert_jobs[0]
self.assertEqual(job['frag_index'], other_frag_index)
self.assertEqual(job['suffixes'], ['456'])
self.assertEqual(len(job['sync_to']), 1)
self.assertEqual(job['sync_to'][0]['index'], other_frag_index)
def test_build_jobs_revert_only_tombstones(self):
ring = self.policy.object_ring = FabricatedRing()
# find a partition for which we're a handoff
for partition in range(2 ** ring.part_power):
part_nodes = ring.get_part_nodes(partition)
if self.local_dev['id'] not in [n['id'] for n in part_nodes]:
break
else:
self.fail("the ring doesn't work: %r" % ring._replica2part2dev_id)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': partition,
'part_path': part_path,
}
# we have no fragment index to hint the jobs where they belong
stub_hashes = {
'123': {None: 'hash'},
'abc': {None: 'hash'},
}
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)):
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(len(jobs), 1)
job = jobs[0]
expected = {
'job_type': object_reconstructor.REVERT,
'frag_index': None,
'suffixes': stub_hashes.keys(),
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
self.assertEqual(ring.replica_count, len(job['sync_to']))
for k, v in expected.items():
msg = 'expected %s != %s for %s' % (
v, job[k], k)
self.assertEqual(v, job[k], msg)
def test_get_suffix_delta(self):
# different
local_suff = {'123': {None: 'abc', 0: 'def'}}
remote_suff = {'456': {None: 'ghi', 0: 'jkl'}}
local_index = 0
remote_index = 0
suffs = self.reconstructor.get_suffix_delta(local_suff,
local_index,
remote_suff,
remote_index)
self.assertEqual(suffs, ['123'])
# now the same
remote_suff = {'123': {None: 'abc', 0: 'def'}}
suffs = self.reconstructor.get_suffix_delta(local_suff,
local_index,
remote_suff,
remote_index)
self.assertEqual(suffs, [])
# now with a mis-matched None key (missing durable)
remote_suff = {'123': {None: 'ghi', 0: 'def'}}
suffs = self.reconstructor.get_suffix_delta(local_suff,
local_index,
remote_suff,
remote_index)
self.assertEqual(suffs, ['123'])
# now with bogus local index
local_suff = {'123': {None: 'abc', 99: 'def'}}
remote_suff = {'456': {None: 'ghi', 0: 'jkl'}}
suffs = self.reconstructor.get_suffix_delta(local_suff,
local_index,
remote_suff,
remote_index)
self.assertEqual(suffs, ['123'])
def test_process_job_primary_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
left_hashes = {
'123': {left_index: 'hash', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'},
}
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
right_hashes = {
'123': {right_index: 'hash', None: 'hash'},
'abc': {right_index: 'hash', None: 'hash'},
}
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
responses = [(200, pickle.dumps(hashes)) for hashes in (
left_hashes, right_hashes)]
codes, body_iter = zip(*responses)
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
('10.0.0.2', '/sdc/0'),
])
self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path'])
for r in request_log.requests))
self.assertEqual(len(ssync_calls), 0)
def test_process_job_primary_not_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
sync_to[0]['index'] = (frag_index - 1) % replicas
left_hashes = {}
sync_to[1]['index'] = (frag_index + 1) % replicas
right_hashes = {}
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
responses = [(200, pickle.dumps(hashes)) for hashes in (
left_hashes, left_hashes, right_hashes, right_hashes)]
codes, body_iter = zip(*responses)
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
('10.0.0.1', '/sdb/0/123-abc'),
('10.0.0.2', '/sdc/0'),
('10.0.0.2', '/sdc/0/123-abc'),
])
self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path'])
for r in request_log.requests))
expected_ssync_calls = sorted([
('10.0.0.1', 0, set(['123', 'abc'])),
('10.0.0.2', 0, set(['123', 'abc'])),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['partition'],
set(c['suffixes']),
) for c in ssync_calls))
def test_process_job_sync_missing_durable(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
# left hand side is in sync
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
left_hashes = {
'123': {left_index: 'hash', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'},
}
# right hand side has fragment, but no durable (None key is whack)
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
right_hashes = {
'123': {right_index: 'hash', None: 'hash'},
'abc': {right_index: 'hash', None: 'different-because-durable'},
}
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
responses = [(200, pickle.dumps(hashes)) for hashes in (
left_hashes, right_hashes, right_hashes)]
codes, body_iter = zip(*responses)
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
('10.0.0.2', '/sdc/0'),
('10.0.0.2', '/sdc/0/abc'),
])
self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path'])
for r in request_log.requests))
expected_ssync_calls = sorted([
('10.0.0.2', 0, ['abc']),
])
self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'],
c['job']['partition'],
c['suffixes'],
) for c in ssync_calls))
def test_process_job_primary_some_in_sync(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [n for n in self.policy.object_ring.devs
if n != self.local_dev][:2]
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas
left_hashes = {
'123': {left_index: 'hashX', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'},
}
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas
right_hashes = {
'123': {right_index: 'hash', None: 'hash'},
}
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
responses = [(200, pickle.dumps(hashes)) for hashes in (
left_hashes, left_hashes, right_hashes, right_hashes)]
codes, body_iter = zip(*responses)
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
('10.0.0.1', '/sdb/0/123'),
('10.0.0.2', '/sdc/0'),
('10.0.0.2', '/sdc/0/abc'),
])
self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path'])
for r in request_log.requests))
self.assertEqual(len(ssync_calls), 2)
self.assertEqual(set(c['node']['index'] for c in ssync_calls),
set([left_index, right_index]))
for call in ssync_calls:
if call['node']['index'] == left_index:
self.assertEqual(call['suffixes'], ['123'])
elif call['node']['index'] == right_index:
self.assertEqual(call['suffixes'], ['abc'])
else:
self.fail('unexpected call %r' % call)
def test_process_job_primary_down(self):
replicas = self.policy.object_ring.replicas
partition = 0
frag_index = random.randint(0, replicas - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
part_nodes = self.policy.object_ring.get_part_nodes(partition)
sync_to = part_nodes[:2]
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'device': self.local_dev['device'],
'local_dev': self.local_dev,
}
non_local = {'called': 0}
def ssync_response_callback(*args):
# in this test, ssync fails on the first (primary sync_to) node
if non_local['called'] >= 1:
return True, {}
non_local['called'] += 1
return False, {}
expected_suffix_calls = set()
for node in part_nodes[:3]:
expected_suffix_calls.update([
(node['replication_ip'], '/%s/0' % node['device']),
(node['replication_ip'], '/%s/0/123-abc' % node['device']),
])
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
expected_ssync_calls = sorted([
('10.0.0.0', 0, set(['123', 'abc'])),
('10.0.0.1', 0, set(['123', 'abc'])),
('10.0.0.2', 0, set(['123', 'abc'])),
])
found_ssync_calls = sorted((
c['node']['ip'],
c['job']['partition'],
set(c['suffixes']),
) for c in ssync_calls)
self.assertEqual(expected_ssync_calls, found_ssync_calls)
def test_process_job_suffix_call_errors(self):
replicas = self.policy.object_ring.replicas
partition = 0
frag_index = random.randint(0, replicas - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
part_nodes = self.policy.object_ring.get_part_nodes(partition)
sync_to = part_nodes[:2]
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'device': self.local_dev['device'],
'local_dev': self.local_dev,
}
expected_suffix_calls = set((
node['replication_ip'], '/%s/0' % node['device']
) for node in part_nodes)
possible_errors = [404, 507, Timeout(), Exception('kaboom!')]
codes = [random.choice(possible_errors)
for r in expected_suffix_calls]
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
self.assertFalse(ssync_calls)
def test_process_job_handoff(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
(sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
])
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
self.assertEqual(len(ssync_calls), 1)
call = ssync_calls[0]
self.assertEqual(call['node'], sync_to[0])
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_revert_to_handoff(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
partition = 0
handoff = next(self.policy.object_ring.get_more_nodes(partition))
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
}
non_local = {'called': 0}
def ssync_response_callback(*args):
# in this test, ssync fails on the first (primary sync_to) node
if non_local['called'] >= 1:
return True, {}
non_local['called'] += 1
return False, {}
expected_suffix_calls = set([
(node['replication_ip'], '/%s/0/123-abc' % node['device'])
for node in (sync_to[0], handoff)
])
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
self.assertEqual(len(ssync_calls), len(expected_suffix_calls))
call = ssync_calls[0]
self.assertEqual(call['node'], sync_to[0])
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_revert_is_handoff(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
partition = 0
handoff_nodes = list(self.policy.object_ring.get_more_nodes(partition))
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': handoff_nodes[-1],
}
def ssync_response_callback(*args):
# in this test ssync always fails, until we encounter ourselves in
# the list of possible handoff's to sync to
return False, {}
expected_suffix_calls = set([
(sync_to[0]['replication_ip'],
'/%s/0/123-abc' % sync_to[0]['device'])
] + [
(node['replication_ip'], '/%s/0/123-abc' % node['device'])
for node in handoff_nodes[:-1]
])
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
# this is ssync call to primary (which fails) plus the ssync call to
# all of the handoffs (except the last one - which is the local_dev)
self.assertEqual(len(ssync_calls), len(handoff_nodes))
call = ssync_calls[0]
self.assertEqual(call['node'], sync_to[0])
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
def test_process_job_revert_cleanup(self):
replicas = self.policy.object_ring.replicas
frag_index = random.randint(0, replicas - 1)
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
sync_to[0]['index'] = frag_index
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
os.makedirs(part_path)
df_mgr = self.reconstructor._df_router[self.policy]
df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
'c', 'data-obj', policy=self.policy)
ts = self.ts()
with df.create() as writer:
test_data = 'test data'
writer.write(test_data)
metadata = {
'X-Timestamp': ts.internal,
'Content-Length': len(test_data),
'Etag': md5(test_data).hexdigest(),
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
}
writer.put(metadata)
writer.commit(ts)
ohash = os.path.basename(df._datadir)
suffix = os.path.basename(os.path.dirname(df._datadir))
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': frag_index,
'suffixes': [suffix],
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': {},
'policy': self.policy,
'local_dev': self.local_dev,
}
def ssync_response_callback(*args):
Add POST capability to ssync for .meta files ssync currently does the wrong thing when replicating object dirs containing both a .data and a .meta file. The ssync sender uses a single PUT to send both object content and metadata to the receiver, using the metadata (.meta file) timestamp. This results in the object content timestamp being advanced to the metadata timestamp, potentially overwriting newer object data on the receiver and causing an inconsistency with the container server record for the object. For example, replicating an object dir with {t0.data(etag=x), t2.meta} to a receiver with t1.data(etag=y) will result in the creation of t2.data(etag=x) on the receiver. However, the container server will continue to list the object as t1(etag=y). This patch modifies ssync to replicate the content of .data and .meta separately using a PUT request for the data (no change) and a POST request for the metadata. In effect, ssync replication replicates the client operations that generated the .data and .meta files so that the result of replication is the same as if the original client requests had persisted on all object servers. Apart from maintaining correct timestamps across sync'd nodes, this has the added benefit of not needing to PUT objects when only the metadata has changed and a POST will suffice. Taking the same example, ssync sender will no longer PUT t0.data but will POST t2.meta resulting in the receiver having t1.data and t2.meta. The changes are backwards compatible: an upgraded sender will only sync data files to a legacy receiver and will not sync meta files (fixing the erroneous behavior described above); a legacy sender will operate as before when sync'ing to an upgraded receiver. Changes: - diskfile API provides methods to get the data file timestamp as distinct from the diskfile timestamp. - diskfile yield_hashes return tuple now passes a dict mapping data and meta (if any) timestamps to their respective values in the timestamp field. - ssync_sender will encode data and meta timestamps in the (hash_path, timestamp) tuple sent to the receiver during missing_checks. - ssync_receiver compares sender's data and meta timestamps to any local diskfile and may specify that only data or meta parts are sent during updates phase by appending a qualifier to the hash returned in its 'wanted' list. - ssync_sender now sends POST subrequests when a meta file exists and its content needs to be replicated. - ssync_sender may send *only* a POST if the receiver indicates that is the only part required to be sync'd. - object server will allow PUT and DELETE with earlier timestamp than a POST - Fixed TODO related to replicated objects with fast-POST and ssync Related spec change-id: I60688efc3df692d3a39557114dca8c5490f7837e Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Closes-Bug: 1501528 Change-Id: I97552d194e5cc342b0a3f4b9800de8aa6b9cb85b
2015-04-22 12:56:50 +01:00
return True, {ohash: {'ts_data': ts}}
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback):
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
self.assertEqual([
(sync_to[0]['replication_ip'], '/%s/0/%s' % (
sync_to[0]['device'], suffix)),
], [
(r['ip'], r['path']) for r in request_log.requests
])
# hashpath is still there, but only the durable remains
files = os.listdir(df._datadir)
self.assertEqual(1, len(files))
self.assertTrue(files[0].endswith('.durable'))
# and more to the point, the next suffix recalc will clean it up
df_mgr = self.reconstructor._df_router[self.policy]
df_mgr.get_hashes(self.local_dev['device'], str(partition), [],
self.policy)
self.assertFalse(os.access(df._datadir, os.F_OK))
def test_process_job_revert_cleanup_tombstone(self):
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
os.makedirs(part_path)
df_mgr = self.reconstructor._df_router[self.policy]
df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
'c', 'data-obj', policy=self.policy)
ts = self.ts()
df.delete(ts)
ohash = os.path.basename(df._datadir)
suffix = os.path.basename(os.path.dirname(df._datadir))
job = {
'job_type': object_reconstructor.REVERT,
'frag_index': None,
'suffixes': [suffix],
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': {},
'policy': self.policy,
'local_dev': self.local_dev,
}
def ssync_response_callback(*args):
Add POST capability to ssync for .meta files ssync currently does the wrong thing when replicating object dirs containing both a .data and a .meta file. The ssync sender uses a single PUT to send both object content and metadata to the receiver, using the metadata (.meta file) timestamp. This results in the object content timestamp being advanced to the metadata timestamp, potentially overwriting newer object data on the receiver and causing an inconsistency with the container server record for the object. For example, replicating an object dir with {t0.data(etag=x), t2.meta} to a receiver with t1.data(etag=y) will result in the creation of t2.data(etag=x) on the receiver. However, the container server will continue to list the object as t1(etag=y). This patch modifies ssync to replicate the content of .data and .meta separately using a PUT request for the data (no change) and a POST request for the metadata. In effect, ssync replication replicates the client operations that generated the .data and .meta files so that the result of replication is the same as if the original client requests had persisted on all object servers. Apart from maintaining correct timestamps across sync'd nodes, this has the added benefit of not needing to PUT objects when only the metadata has changed and a POST will suffice. Taking the same example, ssync sender will no longer PUT t0.data but will POST t2.meta resulting in the receiver having t1.data and t2.meta. The changes are backwards compatible: an upgraded sender will only sync data files to a legacy receiver and will not sync meta files (fixing the erroneous behavior described above); a legacy sender will operate as before when sync'ing to an upgraded receiver. Changes: - diskfile API provides methods to get the data file timestamp as distinct from the diskfile timestamp. - diskfile yield_hashes return tuple now passes a dict mapping data and meta (if any) timestamps to their respective values in the timestamp field. - ssync_sender will encode data and meta timestamps in the (hash_path, timestamp) tuple sent to the receiver during missing_checks. - ssync_receiver compares sender's data and meta timestamps to any local diskfile and may specify that only data or meta parts are sent during updates phase by appending a qualifier to the hash returned in its 'wanted' list. - ssync_sender now sends POST subrequests when a meta file exists and its content needs to be replicated. - ssync_sender may send *only* a POST if the receiver indicates that is the only part required to be sync'd. - object server will allow PUT and DELETE with earlier timestamp than a POST - Fixed TODO related to replicated objects with fast-POST and ssync Related spec change-id: I60688efc3df692d3a39557114dca8c5490f7837e Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Closes-Bug: 1501528 Change-Id: I97552d194e5cc342b0a3f4b9800de8aa6b9cb85b
2015-04-22 12:56:50 +01:00
return True, {ohash: {'ts_data': ts}}
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback):
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
self.assertEqual([
(sync_to[0]['replication_ip'], '/%s/0/%s' % (
sync_to[0]['device'], suffix)),
], [
(r['ip'], r['path']) for r in request_log.requests
])
# hashpath is still there, but it's empty
self.assertEqual([], os.listdir(df._datadir))
def test_reconstruct_fa_no_errors(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
# make a hook point at
# swift.obj.reconstructor.ObjectReconstructor._get_response
called_headers = []
orig_func = object_reconstructor.ObjectReconstructor._get_response
def _get_response_hook(self, node, part, path, headers, policy):
called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy)
codes, body_iter, headers = zip(*responses)
get_response_path = \
'swift.obj.reconstructor.ObjectReconstructor._get_response'
with mock.patch(get_response_path, _get_response_hook):
with mocked_http_conn(
*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
for called_header in called_headers:
called_header = HeaderKeyDict(called_header)
self.assertTrue('Content-Length' in called_header)
self.assertEqual(called_header['Content-Length'], '0')
self.assertTrue('User-Agent' in called_header)
user_agent = called_header['User-Agent']
self.assertTrue(user_agent.startswith('obj-reconstructor'))
def test_reconstruct_fa_errors_works(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(4)
base_responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
base_responses.append((200, body, headers))
# since we're already missing a fragment a +2 scheme can only support
# one additional failure at a time
for error in (Timeout(), 404, Exception('kaboom!')):
responses = base_responses
error_index = random.randint(0, len(responses) - 1)
responses[error_index] = (error, '', '')
codes, body_iter, headers_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_parity_fa_with_data_node_failure(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[-4]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
# make up some data (trim some amount to make it unaligned with
# segment size)
test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4)
responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
for error in (Timeout(), 404, Exception('kaboom!')):
# grab a data node index
error_index = random.randint(0, self.policy.ec_ndata - 1)
responses[error_index] = (error, '', '')
codes, body_iter, headers_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers_iter):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_errors_fails(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
policy = self.policy
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
possible_errors = [404, Timeout(), Exception('kaboom!')]
codes = [random.choice(possible_errors) for i in
range(policy.object_ring.replicas - 1)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
def test_reconstruct_fa_with_mixed_old_etag(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# bad response
bad_headers = {
'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal,
}
# good responses
responses = list()
t1 = next(ts).internal
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': t1})
responses.append((200, body, headers))
# mixed together
error_index = random.randint(0, self.policy.ec_ndata)
error_headers = get_header_frag_index(self,
(responses[error_index])[1])
error_headers.update(bad_headers)
bad_response = (200, '', bad_headers)
responses[error_index] = bad_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_with_mixed_new_etag(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# good responses
responses = list()
t0 = next(ts).internal
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': t0})
responses.append((200, body, headers))
# sanity check before negative test
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# one newer etag can spoil the bunch
new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal})
new_response = (200, '', new_headers)
responses[new_index] = new_response
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, dict(metadata))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
broken_body = ec_archive_bodies[1]
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# add some duplicates
num_duplicates = self.policy.ec_nparity - 1
ec_archive_bodies = (ec_archive_bodies[:num_duplicates] +
ec_archive_bodies)[:-num_duplicates]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
if __name__ == '__main__':
unittest.main()