From 9c33bbde6923b26f111572ae967a3b97a8ab12f2 Mon Sep 17 00:00:00 2001 From: Prashanth Pai Date: Tue, 20 Jan 2015 12:14:32 +0530 Subject: [PATCH] Allow rsync to use compression From rsync's man page: -z, --compress With this option, rsync compresses the file data as it is sent to the destination machine, which reduces the amount of data being transmitted -- something that is useful over a slow connection. A configurable option has been added to allow rsync to compress, but only if the remote node is in a different region than the local one. NOTE: Compressing objects that are already compressed (for example: .tar.gz, .mp3) might slow down the syncing process. On-wire compression can also be extended to ssync later in a different change if required. In case of ssync, we could explore faster compression libraries like lz4. rsync uses zlib which is slow but offers a higher compression ratio. Change-Id: Ic9b9cbff9b5e68bef8257b522cc352fc3544db3c Signed-off-by: Prashanth Pai --- etc/account-server.conf-sample | 5 +++ etc/container-server.conf-sample | 5 +++ etc/object-server.conf-sample | 7 +++ swift/common/db_replicator.py | 62 +++++++++++++++++++++----- swift/container/replicator.py | 5 ++- swift/obj/replicator.py | 7 +++ test/unit/common/test_db_replicator.py | 59 +++++++++++++++++++----- test/unit/obj/test_replicator.py | 37 +++++++++++++++ 8 files changed, 163 insertions(+), 24 deletions(-) diff --git a/etc/account-server.conf-sample b/etc/account-server.conf-sample index 6a7fcb929b..98c97acf6f 100644 --- a/etc/account-server.conf-sample +++ b/etc/account-server.conf-sample @@ -114,6 +114,11 @@ use = egg:swift#recon # of run_pause. # run_pause = 30 # +# Allow rsync to compress data which is transmitted to destination node +# during sync. However, this is applicable only when destination node is in +# a different region than the local one. 
+# rsync_compress = no +# # recon_cache_path = /var/cache/swift [account-auditor] diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index de511368ad..7405a3d250 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -115,6 +115,11 @@ use = egg:swift#recon # of run_pause. # run_pause = 30 # +# Allow rsync to compress data which is transmitted to destination node +# during sync. However, this is applicable only when destination node is in +# a different region than the local one. +# rsync_compress = no +# # recon_cache_path = /var/cache/swift [container-updater] diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index b594a9576f..933f30f2f1 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -174,6 +174,13 @@ use = egg:swift#recon # passed to rsync for io op timeout # rsync_io_timeout = 30 # +# Allow rsync to compress data which is transmitted to destination node +# during sync. However, this is applicable only when destination node is in +# a different region than the local one. +# NOTE: Compressing objects that are already compressed (for example: +# .tar.gz, .mp3) might slow down the syncing process. 
+# rsync_compress = no +# # node_timeout = # max duration of an http request; this is for REPLICATE finalization calls and # so should be longer than node_timeout diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index e456beed75..334cf74347 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -167,6 +167,8 @@ class Replicator(Daemon): self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no')) self.node_timeout = int(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) + self.rsync_compress = config_true_value( + conf.get('rsync_compress', 'no')) self.reclaim_age = float(conf.get('reclaim_age', 86400 * 7)) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) @@ -209,13 +211,16 @@ class Replicator(Daemon): ('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty', 'diff_capped')])) - def _rsync_file(self, db_file, remote_file, whole_file=True): + def _rsync_file(self, db_file, remote_file, whole_file=True, + different_region=False): """ Sync a single file using rsync. Used by _rsync_db to handle syncing. :param db_file: file to be synced :param remote_file: remote location to sync the DB file to :param whole-file: if True, uses rsync's --whole-file flag + :param different_region: if True, the destination node is in a + different region :returns: True if the sync was successful, False otherwise """ @@ -224,6 +229,12 @@ class Replicator(Daemon): '--contimeout=%s' % int(math.ceil(self.conn_timeout))] if whole_file: popen_args.append('--whole-file') + + if self.rsync_compress and different_region: + # Allow for compression, but only if the remote node is in + # a different region than the local one. 
+ popen_args.append('--compress') + popen_args.extend([db_file, remote_file]) proc = subprocess.Popen(popen_args) proc.communicate() @@ -233,7 +244,8 @@ class Replicator(Daemon): return proc.returncode == 0 def _rsync_db(self, broker, device, http, local_id, - replicate_method='complete_rsync', replicate_timeout=None): + replicate_method='complete_rsync', replicate_timeout=None, + different_region=False): """ Sync a whole db using rsync. @@ -243,6 +255,8 @@ class Replicator(Daemon): :param local_id: unique ID of the local database replica :param replicate_method: remote operation to perform after rsync :param replicate_timeout: timeout to wait in seconds + :param different_region: if True, the destination node is in a + different region """ device_ip = rsync_ip(device['replication_ip']) if self.vm_test_mode: @@ -253,14 +267,17 @@ class Replicator(Daemon): remote_file = '%s::%s/%s/tmp/%s' % ( device_ip, self.server_type, device['device'], local_id) mtime = os.path.getmtime(broker.db_file) - if not self._rsync_file(broker.db_file, remote_file): + if not self._rsync_file(broker.db_file, remote_file, + different_region=different_region): return False # perform block-level sync if the db was modified during the first sync if os.path.exists(broker.db_file + '-journal') or \ os.path.getmtime(broker.db_file) > mtime: # grab a lock so nobody else can modify it with broker.lock(): - if not self._rsync_file(broker.db_file, remote_file, False): + if not self._rsync_file(broker.db_file, remote_file, + whole_file=False, + different_region=different_region): return False with Timeout(replicate_timeout or self.node_timeout): response = http.replicate(replicate_method, local_id) @@ -363,7 +380,8 @@ class Replicator(Daemon): 'put_timestamp', 'delete_timestamp', 'metadata') return tuple(info[key] for key in sync_args_order) - def _repl_to_node(self, node, broker, partition, info): + def _repl_to_node(self, node, broker, partition, info, + different_region=False): """ Replicate a 
database to a node. @@ -373,6 +391,8 @@ class Replicator(Daemon): :param info: DB info as a dictionary of {'max_row', 'hash', 'id', 'created_at', 'put_timestamp', 'delete_timestamp', 'metadata'} + :param different_region: if True, the destination node is in a + different region :returns: True if successful, False otherwise """ @@ -382,13 +402,16 @@ class Replicator(Daemon): response = http.replicate('sync', *sync_args) if not response: return False - return self._handle_sync_response(node, response, info, broker, http) + return self._handle_sync_response(node, response, info, broker, http, + different_region=different_region) - def _handle_sync_response(self, node, response, info, broker, http): + def _handle_sync_response(self, node, response, info, broker, http, + different_region=False): if response.status == HTTP_NOT_FOUND: # completely missing, rsync self.stats['rsync'] += 1 self.logger.increment('rsyncs') - return self._rsync_db(broker, node, http, info['id']) + return self._rsync_db(broker, node, http, info['id'], + different_region=different_region) elif response.status == HTTP_INSUFFICIENT_STORAGE: raise DriveNotMounted() elif response.status >= 200 and response.status < 300: @@ -403,7 +426,8 @@ class Replicator(Daemon): self.logger.increment('remote_merges') return self._rsync_db(broker, node, http, info['id'], replicate_method='rsync_then_merge', - replicate_timeout=(info['count'] / 2000)) + replicate_timeout=(info['count'] / 2000), + different_region=different_region) # else send diffs over to the remote server return self._usync_db(max(rinfo['point'], local_sync), broker, http, rinfo['id'], info['id']) @@ -470,6 +494,11 @@ class Replicator(Daemon): return responses = [] nodes = self.ring.get_part_nodes(int(partition)) + local_dev = None + for node in nodes: + if node['id'] == node_id: + local_dev = node + break if shouldbehere: shouldbehere = bool([n for n in nodes if n['id'] == node_id]) # See Footnote [1] for an explanation of the repl_nodes 
assignment. @@ -478,10 +507,23 @@ class Replicator(Daemon): i += 1 repl_nodes = nodes[i + 1:] + nodes[:i] more_nodes = self.ring.get_more_nodes(int(partition)) + if not local_dev: + # Check further if local device is a handoff node + for node in more_nodes: + if node['id'] == node_id: + local_dev = node + break for node in repl_nodes: + different_region = False + if local_dev and local_dev['region'] != node['region']: + # This additional information will help later if we + # want to handle syncing to a node in different + # region with some optimizations. + different_region = True success = False try: - success = self._repl_to_node(node, broker, partition, info) + success = self._repl_to_node(node, broker, partition, info, + different_region) except DriveNotMounted: repl_nodes.append(more_nodes.next()) self.logger.error(_('ERROR Remote drive not mounted %s'), node) diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 8974535251..8d3bfce7f8 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -59,7 +59,8 @@ class ContainerReplicator(db_replicator.Replicator): 'storage_policy_index')) return sync_args - def _handle_sync_response(self, node, response, info, broker, http): + def _handle_sync_response(self, node, response, info, broker, http, + different_region): parent = super(ContainerReplicator, self) if is_success(response.status): remote_info = json.loads(response.data) @@ -74,7 +75,7 @@ class ContainerReplicator(db_replicator.Replicator): broker.merge_timestamps(*(remote_info[key] for key in sync_timestamps)) rv = parent._handle_sync_response( - node, response, info, broker, http) + node, response, info, broker, http, different_region) return rv def find_local_handoff_for_part(self, part): diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index b3df0ce28f..eb65eb3879 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -76,6 +76,8 @@ class ObjectReplicator(Daemon): 
self.rsync_timeout = int(conf.get('rsync_timeout', 900)) self.rsync_io_timeout = conf.get('rsync_io_timeout', '30') self.rsync_bwlimit = conf.get('rsync_bwlimit', '0') + self.rsync_compress = config_true_value( + conf.get('rsync_compress', 'no')) self.http_timeout = int(conf.get('http_timeout', 60)) self.lockup_timeout = int(conf.get('lockup_timeout', 1800)) self.recon_cache_path = conf.get('recon_cache_path', @@ -183,6 +185,11 @@ class ObjectReplicator(Daemon): '--contimeout=%s' % self.rsync_io_timeout, '--bwlimit=%s' % self.rsync_bwlimit, ] + if self.rsync_compress and \ + job['region'] != node['region']: + # Allow for compression, but only if the remote node is in + # a different region than the local one. + args.append('--compress') node_ip = rsync_ip(node['replication_ip']) if self.vm_test_mode: rsync_module = '%s::object%s' % (node_ip, node['replication_port']) diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 0f3cc72e94..e50aa68dae 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -92,22 +92,23 @@ class FakeRingWithNodes(object): class Ring(object): devs = [dict( id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6000, device='sdb', - meta='' + meta='', replication_ip='1.1.1.1', replication_port=6000, region=1 ), dict( id=2, weight=10.0, zone=2, ip='1.1.1.2', port=6000, device='sdb', - meta='' + meta='', replication_ip='1.1.1.2', replication_port=6000, region=2 ), dict( id=3, weight=10.0, zone=3, ip='1.1.1.3', port=6000, device='sdb', - meta='' + meta='', replication_ip='1.1.1.3', replication_port=6000, region=1 ), dict( id=4, weight=10.0, zone=4, ip='1.1.1.4', port=6000, device='sdb', - meta='' + meta='', replication_ip='1.1.1.4', replication_port=6000, region=2 ), dict( id=5, weight=10.0, zone=5, ip='1.1.1.5', port=6000, device='sdb', - meta='' + meta='', replication_ip='1.1.1.5', replication_port=6000, region=1 ), dict( id=6, weight=10.0, zone=6, ip='1.1.1.6', 
port=6000, device='sdb', - meta='')] + meta='', replication_ip='1.1.1.6', replication_port=6000, region=2 + )] def __init__(self, path, reload_time=15, ring_name=None): pass @@ -334,9 +335,26 @@ class TestDBReplicator(unittest.TestCase): '/some/file', 'remote:/some_file'],) self.assertEqual(exp_args, process.args) + def test_rsync_file_popen_args_different_region_and_rsync_compress(self): + replicator = TestReplicator({}) + for rsync_compress in (False, True): + replicator.rsync_compress = rsync_compress + for different_region in (False, True): + with _mock_process(0) as process: + replicator._rsync_file('/some/file', 'remote:/some_file', + False, different_region) + if rsync_compress and different_region: + # --compress arg should be passed to rsync binary + # only when rsync_compress option is enabled + # AND destination node is in a different + # region + self.assertTrue('--compress' in process.args[0]) + else: + self.assertFalse('--compress' in process.args[0]) + def test_rsync_db(self): replicator = TestReplicator({}) - replicator._rsync_file = lambda *args: True + replicator._rsync_file = lambda *args, **kwargs: True fake_device = {'replication_ip': '127.0.0.1', 'device': 'sda1'} replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd') @@ -355,7 +373,8 @@ class TestDBReplicator(unittest.TestCase): self.db_file = db_file self.remote_file = remote_file - def _rsync_file(self_, db_file, remote_file, whole_file=True): + def _rsync_file(self_, db_file, remote_file, whole_file=True, + different_region=False): self.assertEqual(self_.db_file, db_file) self.assertEqual(self_.remote_file, remote_file) self_._rsync_file_called = True @@ -403,7 +422,8 @@ class TestDBReplicator(unittest.TestCase): self.broker = broker self._rsync_file_call_count = 0 - def _rsync_file(self_, db_file, remote_file, whole_file=True): + def _rsync_file(self_, db_file, remote_file, whole_file=True, + different_region=False): self_._rsync_file_call_count += 1 if 
self_._rsync_file_call_count == 1: self.assertEquals(True, whole_file) @@ -630,6 +650,20 @@ class TestDBReplicator(unittest.TestCase): [(('Found /path/to/file for /a%20c%20t/c%20o%20n when it should ' 'be on partition 0; will replicate out and remove.',), {})]) + def test_replicate_object_different_region(self): + db_replicator.ring = FakeRingWithNodes() + replicator = TestReplicator({}) + replicator._repl_to_node = mock.Mock() + # For node_id = 1, one replica in same region(1) and other is in a + # different region(2). Refer: FakeRingWithNodes + replicator._replicate_object('0', '/path/to/file', 1) + # different_region was set True and passed to _repl_to_node() + self.assertEqual(replicator._repl_to_node.call_args_list[0][0][-1], + True) + # different_region was set False and passed to _repl_to_node() + self.assertEqual(replicator._repl_to_node.call_args_list[1][0][-1], + False) + def test_delete_db(self): db_replicator.lock_parent_directory = lock_parent_directory replicator = TestReplicator({}, logger=unit.FakeLogger()) @@ -1202,7 +1236,8 @@ class TestReplToNode(unittest.TestCase): mock.call(self.broker, self.fake_node, self.http, self.fake_info['id'], replicate_method='rsync_then_merge', - replicate_timeout=(self.fake_info['count'] / 2000)) + replicate_timeout=(self.fake_info['count'] / 2000), + different_region=False) ]) def test_repl_to_node_already_in_sync(self): @@ -1217,13 +1252,13 @@ class TestReplToNode(unittest.TestCase): def test_repl_to_node_not_found(self): self.http = ReplHttp('{"id": 3, "point": -1}', set_status=404) self.assertEquals(self.replicator._repl_to_node( - self.fake_node, self.broker, '0', self.fake_info), True) + self.fake_node, self.broker, '0', self.fake_info, False), True) self.replicator.logger.increment.assert_has_calls([ mock.call.increment('rsyncs') ]) self.replicator._rsync_db.assert_has_calls([ mock.call(self.broker, self.fake_node, self.http, - self.fake_info['id']) + self.fake_info['id'], different_region=False) ]) def 
test_repl_to_node_drive_not_mounted(self): diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index bf1c5bcb52..0bb86794ee 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -1116,6 +1116,43 @@ class TestObjectReplicator(unittest.TestCase): '/a83', headers=self.headers)) mock_http.assert_has_calls(reqs, any_order=True) + def test_rsync_compress_different_region(self): + self.assertEqual(self.replicator.sync_method, self.replicator.rsync) + jobs = self.replicator.collect_jobs() + _m_rsync = mock.Mock(return_value=0) + _m_os_path_exists = mock.Mock(return_value=True) + with mock.patch.object(self.replicator, '_rsync', _m_rsync): + with mock.patch('os.path.exists', _m_os_path_exists): + for job in jobs: + self.assertTrue('region' in job) + for node in job['nodes']: + for rsync_compress in (True, False): + self.replicator.rsync_compress = rsync_compress + ret = \ + self.replicator.sync(node, job, + ['fake_suffix']) + self.assertTrue(ret) + if node['region'] != job['region']: + if rsync_compress: + # --compress arg should be passed to rsync + # binary only when rsync_compress option is + # enabled AND destination node is in a + # different region + self.assertTrue('--compress' in + _m_rsync.call_args[0][0]) + else: + self.assertFalse('--compress' in + _m_rsync.call_args[0][0]) + else: + self.assertFalse('--compress' in + _m_rsync.call_args[0][0]) + self.assertEqual( + _m_os_path_exists.call_args_list[-1][0][0], + os.path.join(job['path'], 'fake_suffix')) + self.assertEqual( + _m_os_path_exists.call_args_list[-2][0][0], + os.path.join(job['path'])) + if __name__ == '__main__': unittest.main()