Per container stats report
In addition to the overall container sync stats report, keeping per-container statistics gives the administrator more insight into the bytes transferred over a specific time period per user account. The per-container stats are crucial for billing purposes and give the operator a 'progress bar' equivalent for the container's replication status.

Change-Id: Ia8abcdaf53e466e8d60a957c76e32c2b2c5dc3fa
parent 572be24cae
commit c96d5c671d
@@ -121,6 +121,50 @@ should be noted there is no way for an end user to detect sync progress or
 problems other than HEADing both containers and comparing the overall
 information.
 
+-----------------------------
+Container Sync Statistics
+-----------------------------
+
+Container Sync INFO level logs contain activity metrics and accounting
+information for insightful tracking.
+Currently two different statistics are collected:
+
+About once an hour or so, accumulated statistics of all operations performed
+by Container Sync are reported to the log file with the following format:
+"Since (time): (sync) synced [(delete) deletes, (put) puts], (skip) skipped,
+(fail) failed"
+time: last report time
+sync: number of containers with sync turned on that were successfully synced
+delete: number of successful DELETE object requests to the target cluster
+put: number of successful PUT object requests to the target cluster
+skip: number of containers whose sync has been turned off, but are not
+yet cleared from the sync store
+fail: number of containers with failure (due to exception, timeout or other
+reason)
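+
+A minimal illustrative example of such a line (the values are hypothetical,
+not taken from a real cluster)::
+
+    Since Thu Apr  1 10:00:00 2021: 12 synced [3 deletes, 9 puts], 2 skipped, 1 failed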
+
+For each container synced, per container statistics are reported with the
+following format:
+Container sync report: (container), time window start: (start), time window
+end: (end), puts: (puts), posts: (posts), deletes: (deletes), bytes: (bytes),
+sync_point1: (point1), sync_point2: (point2), total_rows: (total)
+container: the account/container the statistics are for
+start: report start time
+end: report end time
+puts: number of successful PUT object requests to the target container
+posts: N/A (0)
+deletes: number of successful DELETE object requests to the target container
+bytes: number of bytes sent over the network to the target container
+point1: progress indication - the container's x_container_sync_point1
+point2: progress indication - the container's x_container_sync_point2
+total: number of objects processed at the container
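+
+For example, a report line might look like this (illustrative values only)::
+
+    Container sync report: AUTH_test/images, time window start: 1617270000.0, time window end: 1617270300.0, puts: 42, posts: 0, deletes: 3, bytes: 1048576, sync_point1: 1045, sync_point2: 1040, total_rows: 1100
+
+Comparing (point1) and (point2) against (total) gives the rough progress
+indication mentioned above: here roughly 1045 of the container's 1100 rows
+have been processed.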
+
+It is possible that more than one server syncs a container; therefore, log
+files from all servers need to be evaluated.
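+
+Because the same container may be reported by several servers, a billing
+pipeline has to merge these lines across logs. The following is a minimal
+illustrative sketch of such an aggregation (the regular expression and the
+``bytes_per_container`` helper are assumptions made for this example, not
+part of Swift)::
+
+    import re
+    from collections import defaultdict
+
+    # Matches the per container report format documented above.
+    REPORT_RE = re.compile(
+        r'Container sync report: (?P<container>\S+), '
+        r'time window start: (?P<start>\S+), '
+        r'time window end: (?P<end>\S+), '
+        r'puts: (?P<puts>\d+), '
+        r'posts: (?P<posts>\d+), '
+        r'deletes: (?P<deletes>\d+), '
+        r'bytes: (?P<bytes>\d+), '
+        r'sync_point1: (?P<point1>-?\d+), '
+        r'sync_point2: (?P<point2>-?\d+), '
+        r'total_rows: (?P<total>\d+)')
+
+    def bytes_per_container(log_paths):
+        """Sum synced bytes per account/container across server logs."""
+        totals = defaultdict(int)
+        for path in log_paths:
+            with open(path) as fp:
+                for line in fp:
+                    match = REPORT_RE.search(line)
+                    if match:
+                        totals[match.group('container')] += \
+                            int(match.group('bytes'))
+        return totals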
+
 
 ----------------------------------------------------------
 Using the ``swift`` tool to set up synchronized containers
 ----------------------------------------------------------
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import errno
 import os
 import uuid
@@ -198,6 +199,14 @@ class ContainerSync(Daemon):
         self.container_skips = 0
         #: Number of containers that had a failure of some type.
         self.container_failures = 0
+
+        #: Per container stats, collected while a single container is synced.
+        #: puts - the number of puts that were done for the container
+        #: deletes - the number of deletes that were done for the container
+        #: bytes - the total number of bytes transferred for the container
+        self.container_stats = collections.defaultdict(int)
+        self.container_stats.clear()
+
         #: Time of last stats report.
         self.reported = time()
         self.swift_dir = conf.get('swift_dir', '/etc/swift')
@@ -239,6 +248,7 @@ class ContainerSync(Daemon):
         while True:
             begin = time()
             for path in self.sync_store.synced_containers_generator():
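+                # Reset the per container counters so the upcoming report
+                # covers only this container's sync pass.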
+                self.container_stats.clear()
                 self.container_sync(path)
                 if time() - self.reported >= 3600:  # once an hour
                     self.report()
@@ -282,6 +292,30 @@ class ContainerSync(Daemon):
         self.container_skips = 0
         self.container_failures = 0
 
+    def container_report(self, start, end, sync_point1, sync_point2, info,
+                         max_row):
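+        """Log a single per container sync report line.
+
+        Emitted at the end of each container sync pass, in the format
+        described in the container sync documentation.
+        """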
+        self.logger.info(_('Container sync report: %(container)s, '
+                           'time window start: %(start)s, '
+                           'time window end: %(end)s, '
+                           'puts: %(puts)s, '
+                           'posts: %(posts)s, '
+                           'deletes: %(deletes)s, '
+                           'bytes: %(bytes)s, '
+                           'sync_point1: %(point1)s, '
+                           'sync_point2: %(point2)s, '
+                           'total_rows: %(total)s'),
+                         {'container': '%s/%s' % (info['account'],
+                                                  info['container']),
+                          'start': start,
+                          'end': end,
+                          'puts': self.container_stats['puts'],
+                          'posts': 0,
+                          'deletes': self.container_stats['deletes'],
+                          'bytes': self.container_stats['bytes'],
+                          'point1': sync_point1,
+                          'point2': sync_point2,
+                          'total': max_row})
+
     def container_sync(self, path):
         """
         Checks the given path for a container database, determines if syncing
@@ -339,51 +373,68 @@ class ContainerSync(Daemon):
                     self.container_failures += 1
                     self.logger.increment('failures')
                     return
-                stop_at = time() + self.container_time
+                start_at = time()
+                stop_at = start_at + self.container_time
                 next_sync_point = None
-                while time() < stop_at and sync_point2 < sync_point1:
-                    rows = broker.get_items_since(sync_point2, 1)
-                    if not rows:
-                        break
-                    row = rows[0]
-                    if row['ROWID'] > sync_point1:
-                        break
-                    # This node will only initially sync out one third of the
-                    # objects (if 3 replicas, 1/4 if 4, etc.) and will skip
-                    # problematic rows as needed in case of faults.
-                    # This section will attempt to sync previously skipped
-                    # rows in case the previous attempts by any of the nodes
-                    # didn't succeed.
-                    if not self.container_sync_row(
-                            row, sync_to, user_key, broker, info, realm,
-                            realm_key):
-                        if not next_sync_point:
-                            next_sync_point = sync_point2
-                    sync_point2 = row['ROWID']
-                    broker.set_x_container_sync_points(None, sync_point2)
-                if next_sync_point:
-                    broker.set_x_container_sync_points(None, next_sync_point)
-                while time() < stop_at:
-                    rows = broker.get_items_since(sync_point1, 1)
-                    if not rows:
-                        break
-                    row = rows[0]
-                    key = hash_path(info['account'], info['container'],
-                                    row['name'], raw_digest=True)
-                    # This node will only initially sync out one third of the
-                    # objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
-                    # around to the section above and attempt to sync
-                    # previously skipped rows in case the other nodes didn't
-                    # succeed or in case it failed to do so the first time.
-                    if unpack_from('>I', key)[0] % \
-                            len(nodes) == ordinal:
-                        self.container_sync_row(
-                            row, sync_to, user_key, broker, info, realm,
-                            realm_key)
-                    sync_point1 = row['ROWID']
-                    broker.set_x_container_sync_points(sync_point1, None)
-                self.container_syncs += 1
-                self.logger.increment('syncs')
+                sync_stage_time = start_at
+                try:
+                    while time() < stop_at and sync_point2 < sync_point1:
+                        rows = broker.get_items_since(sync_point2, 1)
+                        if not rows:
+                            break
+                        row = rows[0]
+                        if row['ROWID'] > sync_point1:
+                            break
+                        # This node will only initially sync out one third
+                        # of the objects (if 3 replicas, 1/4 if 4, etc.)
+                        # and will skip problematic rows as needed in case of
+                        # faults.
+                        # This section will attempt to sync previously skipped
+                        # rows in case the previous attempts by any of the
+                        # nodes didn't succeed.
+                        if not self.container_sync_row(
+                                row, sync_to, user_key, broker, info, realm,
+                                realm_key):
+                            if not next_sync_point:
+                                next_sync_point = sync_point2
+                        sync_point2 = row['ROWID']
+                        broker.set_x_container_sync_points(None, sync_point2)
+                    if next_sync_point:
+                        broker.set_x_container_sync_points(None,
+                                                           next_sync_point)
+                    else:
+                        next_sync_point = sync_point2
+                    sync_stage_time = time()
+                    while sync_stage_time < stop_at:
+                        rows = broker.get_items_since(sync_point1, 1)
+                        if not rows:
+                            break
+                        row = rows[0]
+                        key = hash_path(info['account'], info['container'],
+                                        row['name'], raw_digest=True)
+                        # This node will only initially sync out one third of
+                        # the objects (if 3 replicas, 1/4 if 4, etc.).
+                        # It'll come back around to the section above
+                        # and attempt to sync previously skipped rows in case
+                        # the other nodes didn't succeed or in case it failed
+                        # to do so the first time.
+                        if unpack_from('>I', key)[0] % \
+                                len(nodes) == ordinal:
+                            self.container_sync_row(
+                                row, sync_to, user_key, broker, info, realm,
+                                realm_key)
+                        sync_point1 = row['ROWID']
+                        broker.set_x_container_sync_points(sync_point1, None)
+                        sync_stage_time = time()
+                    self.container_syncs += 1
+                    self.logger.increment('syncs')
+                except Exception as ex:
+                    raise ex
+                finally:
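+                    # Emit the per container report even when the pass
+                    # raised, so the progress made so far is still logged.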
+                    self.container_report(start_at, sync_stage_time,
+                                          sync_point1,
+                                          next_sync_point,
+                                          info, broker.get_max_row())
         except (Exception, Timeout):
             self.container_failures += 1
             self.logger.increment('failures')
@@ -506,6 +557,7 @@ class ContainerSync(Daemon):
                     if err.http_status != HTTP_NOT_FOUND:
                         raise
                 self.container_deletes += 1
+                self.container_stats['deletes'] += 1
                 self.logger.increment('deletes')
                 self.logger.timing_since('deletes.timing', start_time)
             else:
@@ -556,6 +608,8 @@ class ContainerSync(Daemon):
                            proxy=self.select_http_proxy(), logger=self.logger,
                            timeout=self.conn_timeout)
                 self.container_puts += 1
+                self.container_stats['puts'] += 1
+                self.container_stats['bytes'] += row['size']
                 self.logger.increment('puts')
                 self.logger.timing_since('puts.timing', start_time)
             except ClientException as err:
@@ -58,6 +58,9 @@ class FakeContainerBroker(object):
         self.sync_point1 = -1
         self.sync_point2 = -1
 
+    def get_max_row(self):
+        return 1
+
     def get_info(self):
         return self.info
 
@@ -736,6 +739,67 @@ class TestContainerSync(unittest.TestCase):
             sync.hash_path = orig_hash_path
             sync.delete_object = orig_delete_object
 
+    def test_container_report(self):
+        container_stats = {'puts': 0,
+                           'deletes': 0,
+                           'bytes': 0}
+
+        def fake_container_sync_row(self, row, sync_to,
+                                    user_key, broker, info, realm, realm_key):
+            if 'deleted' in row:
+                container_stats['deletes'] += 1
+                return True
+
+            container_stats['puts'] += 1
+            container_stats['bytes'] += row['size']
+            return True
+
+        def fake_hash_path(account, container, obj, raw_digest=False):
+            # Ensures that no rows match for the second loop: the ordinal
+            # is 0 and all hashes are 1.
+            return '\x01' * 16
+
+        fcb = FakeContainerBroker(
+            'path',
+            info={'account': 'a', 'container': 'c',
+                  'storage_policy_index': 0,
+                  'x_container_sync_point1': 5,
+                  'x_container_sync_point2': -1},
+            metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
+                      'x-container-sync-key': ('key', 1)},
+            items_since=[{'ROWID': 1, 'name': 'o1', 'size': 0,
+                          'deleted': True},
+                         {'ROWID': 2, 'name': 'o2', 'size': 1010},
+                         {'ROWID': 3, 'name': 'o3', 'size': 0,
+                          'deleted': True},
+                         {'ROWID': 4, 'name': 'o4', 'size': 90},
+                         {'ROWID': 5, 'name': 'o5', 'size': 0}])
+
+        with mock.patch('swift.container.sync.InternalClient'), \
+                mock.patch('swift.container.sync.hash_path',
+                           fake_hash_path), \
+                mock.patch('swift.container.sync.ContainerBroker',
+                           lambda p: fcb):
+            cring = FakeRing()
+            cs = sync.ContainerSync({}, container_ring=cring,
+                                    logger=self.logger)
+            cs.container_stats = container_stats
+            cs._myips = ['10.0.0.0']  # Match
+            cs._myport = 1000  # Match
+            cs.allowed_sync_hosts = ['127.0.0.1']
+            funcType = type(sync.ContainerSync.container_sync_row)
+            cs.container_sync_row = funcType(fake_container_sync_row,
+                                             cs, sync.ContainerSync)
+            cs.container_sync('isa.db')
+            # Succeeds because no rows match in the second loop.
+            log_line = cs.logger.get_lines_for_level('info')[0]
+            lines = log_line.split(',')
+            self.assertEqual('total_rows: 1', lines.pop().strip())
+            self.assertEqual('sync_point2: 5', lines.pop().strip())
+            self.assertEqual('sync_point1: 5', lines.pop().strip())
+            self.assertEqual('bytes: 1100', lines.pop().strip())
+            self.assertEqual('deletes: 2', lines.pop().strip())
+            self.assertEqual('posts: 0', lines.pop().strip())
+            self.assertEqual('puts: 3', lines.pop().strip())
+
     def test_container_sync_row_delete(self):
         self._test_container_sync_row_delete(None, None)
 
@@ -783,7 +847,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertTrue(cs.container_sync_row(
             {'deleted': True,
              'name': 'object',
-             'created_at': created_at}, 'http://sync/to/path',
+             'created_at': created_at,
+             'size': '1000'}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -925,7 +990,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertTrue(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': created_at}, 'http://sync/to/path',
+             'created_at': created_at,
+             'size': 50}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -953,7 +1019,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertTrue(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 60}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -966,7 +1033,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertTrue(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': '1.1'}, 'http://sync/to/path',
+             'created_at': '1.1',
+             'size': 60}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -987,7 +1055,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertFalse(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 70}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -1011,7 +1080,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertFalse(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 80}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -1038,7 +1108,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertFalse(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 90}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -1055,7 +1126,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertFalse(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 50}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -1072,7 +1144,8 @@ class TestContainerSync(unittest.TestCase):
         self.assertFalse(cs.container_sync_row(
             {'deleted': False,
              'name': 'object',
-             'created_at': timestamp.internal}, 'http://sync/to/path',
+             'created_at': timestamp.internal,
+             'size': 50}, 'http://sync/to/path',
             'key', FakeContainerBroker('broker'),
             {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
             realm, realm_key))
@@ -1093,7 +1166,8 @@ class TestContainerSync(unittest.TestCase):
         test_row = {'deleted': False,
                     'name': 'object',
                     'created_at': timestamp.internal,
-                    'etag': '1111'}
+                    'etag': '1111',
+                    'size': 10}
         test_info = {'account': 'a',
                      'container': 'c',
                      'storage_policy_index': 0}