Support -d <devs> and -p <partitions> in DB replicators.
Similar to the object replicator and reconstructor, these arguments are comma-separated lists of device names and partitions, respectively, on which the account or container replicator will operate. Other devices and partitions are ignored. Change-Id: Ic108f5c38f700ac4c7bcf8315bf4c55306951361
This commit is contained in:
parent
47fed6f2f9
commit
b08c70d38e
@ -14,10 +14,21 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import optparse
|
||||||
|
|
||||||
from swift.account.replicator import AccountReplicator
|
from swift.account.replicator import AccountReplicator
|
||||||
from swift.common.utils import parse_options
|
from swift.common.utils import parse_options
|
||||||
from swift.common.daemon import run_daemon
|
from swift.common.daemon import run_daemon
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
conf_file, options = parse_options(once=True)
|
parser = optparse.OptionParser("%prog CONFIG [options]")
|
||||||
|
parser.add_option('-d', '--devices',
|
||||||
|
help=('Replicate only given devices. '
|
||||||
|
'Comma-separated list. '
|
||||||
|
'Only has effect if --once is used.'))
|
||||||
|
parser.add_option('-p', '--partitions',
|
||||||
|
help=('Replicate only given partitions. '
|
||||||
|
'Comma-separated list. '
|
||||||
|
'Only has effect if --once is used.'))
|
||||||
|
conf_file, options = parse_options(parser=parser, once=True)
|
||||||
run_daemon(AccountReplicator, conf_file, **options)
|
run_daemon(AccountReplicator, conf_file, **options)
|
||||||
|
@ -14,10 +14,21 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import optparse
|
||||||
|
|
||||||
from swift.container.replicator import ContainerReplicator
|
from swift.container.replicator import ContainerReplicator
|
||||||
from swift.common.utils import parse_options
|
from swift.common.utils import parse_options
|
||||||
from swift.common.daemon import run_daemon
|
from swift.common.daemon import run_daemon
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
conf_file, options = parse_options(once=True)
|
parser = optparse.OptionParser("%prog CONFIG [options]")
|
||||||
|
parser.add_option('-d', '--devices',
|
||||||
|
help=('Replicate only given devices. '
|
||||||
|
'Comma-separated list. '
|
||||||
|
'Only has effect if --once is used.'))
|
||||||
|
parser.add_option('-p', '--partitions',
|
||||||
|
help=('Replicate only given partitions. '
|
||||||
|
'Comma-separated list. '
|
||||||
|
'Only has effect if --once is used.'))
|
||||||
|
conf_file, options = parse_options(parser=parser, once=True)
|
||||||
run_daemon(ContainerReplicator, conf_file, **options)
|
run_daemon(ContainerReplicator, conf_file, **options)
|
||||||
|
@ -33,7 +33,7 @@ from swift.common.direct_client import quote
|
|||||||
from swift.common.utils import get_logger, whataremyips, storage_directory, \
|
from swift.common.utils import get_logger, whataremyips, storage_directory, \
|
||||||
renamer, mkdirs, lock_parent_directory, config_true_value, \
|
renamer, mkdirs, lock_parent_directory, config_true_value, \
|
||||||
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
|
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
|
||||||
json, Timestamp
|
json, Timestamp, list_from_csv
|
||||||
from swift.common import ring
|
from swift.common import ring
|
||||||
from swift.common.ring.utils import is_local_device
|
from swift.common.ring.utils import is_local_device
|
||||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
|
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
|
||||||
@ -47,6 +47,20 @@ from swift.common.swob import Response, HTTPNotFound, HTTPNoContent, \
|
|||||||
DEBUG_TIMINGS_THRESHOLD = 10
|
DEBUG_TIMINGS_THRESHOLD = 10
|
||||||
|
|
||||||
|
|
||||||
|
def parse_overrides(daemon_kwargs):
|
||||||
|
devices = list_from_csv(daemon_kwargs.get('devices', ''))
|
||||||
|
if not devices:
|
||||||
|
devices = Everything()
|
||||||
|
|
||||||
|
partitions = [
|
||||||
|
int(part) for part in
|
||||||
|
list_from_csv(daemon_kwargs.get('partitions', ''))]
|
||||||
|
if not partitions:
|
||||||
|
partitions = Everything()
|
||||||
|
|
||||||
|
return devices, partitions
|
||||||
|
|
||||||
|
|
||||||
def quarantine_db(object_file, server_type):
|
def quarantine_db(object_file, server_type):
|
||||||
"""
|
"""
|
||||||
In the case that a corrupt file is found, move it to a quarantined area to
|
In the case that a corrupt file is found, move it to a quarantined area to
|
||||||
@ -93,8 +107,7 @@ def roundrobin_datadirs(datadirs):
|
|||||||
|
|
||||||
def walk_datadir(datadir, node_id, part_filter):
|
def walk_datadir(datadir, node_id, part_filter):
|
||||||
partitions = [pd for pd in os.listdir(datadir)
|
partitions = [pd for pd in os.listdir(datadir)
|
||||||
if looks_like_partition(pd)
|
if looks_like_partition(pd) and part_filter(pd)]
|
||||||
and (part_filter is None or part_filter(pd))]
|
|
||||||
random.shuffle(partitions)
|
random.shuffle(partitions)
|
||||||
for partition in partitions:
|
for partition in partitions:
|
||||||
part_dir = os.path.join(datadir, partition)
|
part_dir = os.path.join(datadir, partition)
|
||||||
@ -136,6 +149,15 @@ def roundrobin_datadirs(datadirs):
|
|||||||
its.remove(it)
|
its.remove(it)
|
||||||
|
|
||||||
|
|
||||||
|
class Everything(object):
|
||||||
|
"""
|
||||||
|
A container that contains everything. If "e" is an instance of
|
||||||
|
Everything, then "x in e" is true for all x.
|
||||||
|
"""
|
||||||
|
def __contains__(self, element):
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class ReplConnection(BufferedHTTPConnection):
|
class ReplConnection(BufferedHTTPConnection):
|
||||||
"""
|
"""
|
||||||
Helper to simplify REPLICATEing to a remote server.
|
Helper to simplify REPLICATEing to a remote server.
|
||||||
@ -634,12 +656,22 @@ class Replicator(Daemon):
|
|||||||
return match.groups()[0]
|
return match.groups()[0]
|
||||||
return "UNKNOWN"
|
return "UNKNOWN"
|
||||||
|
|
||||||
def handoffs_only_filter(self, device_id):
|
def _partition_dir_filter(self, device_id, handoffs_only,
|
||||||
|
partitions_to_replicate):
|
||||||
|
|
||||||
def filt(partition_dir):
|
def filt(partition_dir):
|
||||||
partition = int(partition_dir)
|
partition = int(partition_dir)
|
||||||
primary_node_ids = [
|
if handoffs_only:
|
||||||
d['id'] for d in self.ring.get_part_nodes(partition)]
|
primary_node_ids = [
|
||||||
return device_id not in primary_node_ids
|
d['id'] for d in self.ring.get_part_nodes(partition)]
|
||||||
|
if device_id in primary_node_ids:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if partition not in partitions_to_replicate:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
return filt
|
return filt
|
||||||
|
|
||||||
def report_up_to_date(self, full_info):
|
def report_up_to_date(self, full_info):
|
||||||
@ -647,6 +679,8 @@ class Replicator(Daemon):
|
|||||||
|
|
||||||
def run_once(self, *args, **kwargs):
|
def run_once(self, *args, **kwargs):
|
||||||
"""Run a replication pass once."""
|
"""Run a replication pass once."""
|
||||||
|
devices_to_replicate, partitions_to_replicate = parse_overrides(kwargs)
|
||||||
|
|
||||||
self._zero_stats()
|
self._zero_stats()
|
||||||
dirs = []
|
dirs = []
|
||||||
ips = whataremyips(self.bind_ip)
|
ips = whataremyips(self.bind_ip)
|
||||||
@ -676,15 +710,21 @@ class Replicator(Daemon):
|
|||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
_('Skipping %(device)s as it is not mounted') % node)
|
_('Skipping %(device)s as it is not mounted') % node)
|
||||||
continue
|
continue
|
||||||
|
if node['device'] not in devices_to_replicate:
|
||||||
|
self.logger.debug(
|
||||||
|
'Skipping device %s due to given arguments',
|
||||||
|
node['device'])
|
||||||
|
continue
|
||||||
unlink_older_than(
|
unlink_older_than(
|
||||||
os.path.join(self.root, node['device'], 'tmp'),
|
os.path.join(self.root, node['device'], 'tmp'),
|
||||||
time.time() - self.reclaim_age)
|
time.time() - self.reclaim_age)
|
||||||
datadir = os.path.join(self.root, node['device'], self.datadir)
|
datadir = os.path.join(self.root, node['device'], self.datadir)
|
||||||
if os.path.isdir(datadir):
|
if os.path.isdir(datadir):
|
||||||
self._local_device_ids.add(node['id'])
|
self._local_device_ids.add(node['id'])
|
||||||
filt = (self.handoffs_only_filter(node['id'])
|
part_filt = self._partition_dir_filter(
|
||||||
if self.handoffs_only else None)
|
node['id'], self.handoffs_only,
|
||||||
dirs.append((datadir, node['id'], filt))
|
partitions_to_replicate)
|
||||||
|
dirs.append((datadir, node['id'], part_filt))
|
||||||
if not found_local:
|
if not found_local:
|
||||||
self.logger.error("Can't find itself %s with port %s in ring "
|
self.logger.error("Can't find itself %s with port %s in ring "
|
||||||
"file, not replicating",
|
"file, not replicating",
|
||||||
|
@ -1221,7 +1221,7 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
|
|
||||||
node_id = 1
|
node_id = 1
|
||||||
results = list(db_replicator.roundrobin_datadirs(
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
[(datadir, node_id, None)]))
|
[(datadir, node_id, lambda p: True)]))
|
||||||
expected = [
|
expected = [
|
||||||
('450', os.path.join(datadir, db_path), node_id),
|
('450', os.path.join(datadir, db_path), node_id),
|
||||||
]
|
]
|
||||||
@ -1243,13 +1243,13 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
set(os.listdir(datadir)))
|
set(os.listdir(datadir)))
|
||||||
|
|
||||||
results = list(db_replicator.roundrobin_datadirs(
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
[(datadir, node_id, None)]))
|
[(datadir, node_id, lambda p: True)]))
|
||||||
self.assertEqual(results, expected)
|
self.assertEqual(results, expected)
|
||||||
self.assertEqual({'1054', '1060', '450'},
|
self.assertEqual({'1054', '1060', '450'},
|
||||||
set(os.listdir(datadir)))
|
set(os.listdir(datadir)))
|
||||||
|
|
||||||
results = list(db_replicator.roundrobin_datadirs(
|
results = list(db_replicator.roundrobin_datadirs(
|
||||||
[(datadir, node_id, None)]))
|
[(datadir, node_id, lambda p: True)]))
|
||||||
self.assertEqual(results, expected)
|
self.assertEqual(results, expected)
|
||||||
# non db file in '1060' dir is not deleted and exception is handled
|
# non db file in '1060' dir is not deleted and exception is handled
|
||||||
self.assertEqual({'1060', '450'},
|
self.assertEqual({'1060', '450'},
|
||||||
@ -1336,8 +1336,8 @@ class TestDBReplicator(unittest.TestCase):
|
|||||||
mock.patch(base + 'random.shuffle', _shuffle), \
|
mock.patch(base + 'random.shuffle', _shuffle), \
|
||||||
mock.patch(base + 'os.rmdir', _rmdir):
|
mock.patch(base + 'os.rmdir', _rmdir):
|
||||||
|
|
||||||
datadirs = [('/srv/node/sda/containers', 1, None),
|
datadirs = [('/srv/node/sda/containers', 1, lambda p: True),
|
||||||
('/srv/node/sdb/containers', 2, None)]
|
('/srv/node/sdb/containers', 2, lambda p: True)]
|
||||||
results = list(db_replicator.roundrobin_datadirs(datadirs))
|
results = list(db_replicator.roundrobin_datadirs(datadirs))
|
||||||
# The results show that the .db files are returned, the devices
|
# The results show that the .db files are returned, the devices
|
||||||
# interleaved.
|
# interleaved.
|
||||||
@ -1584,6 +1584,71 @@ class TestHandoffsOnly(unittest.TestCase):
|
|||||||
'bcbcbcbc15d3835053d568c57e2c83b5',
|
'bcbcbcbc15d3835053d568c57e2c83b5',
|
||||||
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
|
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
|
||||||
|
|
||||||
|
def test_override_partitions(self):
|
||||||
|
replicator = TestReplicator({
|
||||||
|
'devices': self.root,
|
||||||
|
'bind_port': 6201,
|
||||||
|
'mount_check': 'no',
|
||||||
|
})
|
||||||
|
|
||||||
|
with patch.object(db_replicator, 'whataremyips',
|
||||||
|
return_value=['10.0.0.1']), \
|
||||||
|
patch.object(replicator, '_replicate_object') as mock_repl, \
|
||||||
|
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
|
||||||
|
replicator.run_once(partitions="0,2")
|
||||||
|
|
||||||
|
self.assertEqual(sorted(mock_repl.mock_calls), [
|
||||||
|
mock.call('0', os.path.join(
|
||||||
|
self.root, 'sdp', 'containers', '0', '220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220.db'), 0),
|
||||||
|
mock.call('2', os.path.join(
|
||||||
|
self.root, 'sdq', 'containers', '2', '3b5',
|
||||||
|
'bcbcbcbc15d3835053d568c57e2c83b5',
|
||||||
|
'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])
|
||||||
|
|
||||||
|
def test_override_devices(self):
|
||||||
|
replicator = TestReplicator({
|
||||||
|
'devices': self.root,
|
||||||
|
'bind_port': 6201,
|
||||||
|
'mount_check': 'no',
|
||||||
|
})
|
||||||
|
|
||||||
|
with patch.object(db_replicator, 'whataremyips',
|
||||||
|
return_value=['10.0.0.1']), \
|
||||||
|
patch.object(replicator, '_replicate_object') as mock_repl, \
|
||||||
|
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
|
||||||
|
replicator.run_once(devices="sdp")
|
||||||
|
|
||||||
|
self.assertEqual(sorted(mock_repl.mock_calls), [
|
||||||
|
mock.call('0', os.path.join(
|
||||||
|
self.root, 'sdp', 'containers', '0', '220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220.db'), 0),
|
||||||
|
mock.call('1', os.path.join(
|
||||||
|
self.root, 'sdp', 'containers', '1', '98d',
|
||||||
|
'abababab2b5368158355e799323b498d',
|
||||||
|
'abababab2b5368158355e799323b498d.db'), 0)])
|
||||||
|
|
||||||
|
def test_override_devices_and_partitions(self):
|
||||||
|
replicator = TestReplicator({
|
||||||
|
'devices': self.root,
|
||||||
|
'bind_port': 6201,
|
||||||
|
'mount_check': 'no',
|
||||||
|
})
|
||||||
|
|
||||||
|
with patch.object(db_replicator, 'whataremyips',
|
||||||
|
return_value=['10.0.0.1']), \
|
||||||
|
patch.object(replicator, '_replicate_object') as mock_repl, \
|
||||||
|
patch.object(replicator, 'ring', self.FakeRing3Nodes()):
|
||||||
|
replicator.run_once(partitions="0,2", devices="sdp")
|
||||||
|
|
||||||
|
self.assertEqual(sorted(mock_repl.mock_calls), [
|
||||||
|
mock.call('0', os.path.join(
|
||||||
|
self.root, 'sdp', 'containers', '0', '220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220',
|
||||||
|
'010101013cf2b7979af9eaa71cb67220.db'), 0)])
|
||||||
|
|
||||||
|
|
||||||
class TestReplToNode(unittest.TestCase):
|
class TestReplToNode(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user