Merge "Improved utility to dump and restore memcached data"

Jenkins (committed by Gerrit Code Review), 2014-06-04 12:14:02 +00:00

@@ -16,25 +16,40 @@
 import pickle
 import sys

+import memcache
 from oslo.config import cfg
+import re
+import six
+from six.moves.urllib import parse

 from stackalytics.openstack.common import log as logging
 from stackalytics.processor import config
 from stackalytics.processor import runtime_storage
+from stackalytics.processor import utils


 LOG = logging.getLogger(__name__)

 OPTS = [
-    cfg.BoolOpt('reverse',
+    cfg.BoolOpt('restore',
                 short='r',
-                help='Load data to runtime storage'),
+                help='Restore data into memcached'),
     cfg.StrOpt('file',
                short='f',
-               help='File name'),
+               help='File name where to store data'),
+    cfg.StrOpt('min-compress-len', default=0,
+               short='m',
+               help='The threshold length to kick in auto-compression'),
 ]

+SINGLE_KEYS = ['module_groups', 'project_types', 'repos', 'releases',
+               'companies', 'last_update_members_date', 'last_member_index',
+               'runtime_storage_update_time']
+ARRAY_KEYS = ['record', 'user']
+BULK_READ_SIZE = 64
+MEMCACHED_URI_PREFIX = r'^memcached:\/\/'
+

 def read_records_from_fd(fd):
     while True:
         try:
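
Note: the body of read_records_from_fd falls between the two hunks and is not shown. Since export_data below writes a stream of pickled (key, value) tuples and import_data drains this generator, an equivalent reader would presumably look like the following sketch (an illustration, not the verbatim source):

    import pickle

    def read_records_from_fd(fd):
        # Yield unpickled objects one by one until the stream is exhausted.
        while True:
            try:
                record = pickle.load(fd)
            except EOFError:
                break
            yield record
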
@@ -44,31 +59,98 @@ def read_records_from_fd(fd):
         yield record


-def import_data(runtime_storage_inst, fd):
+def store_bucket(memcached_inst, bucket):
+    res = memcached_inst.set_multi(bucket,
+                                   min_compress_len=cfg.CONF.min_compress_len)
+    if res:
+        LOG.critical('Failed to set values in memcached: %s', res)
+        raise Exception('memcached set_multi operation is failed')
+
+
+def import_data(memcached_inst, fd):
+    LOG.info('Importing data into memcached')
     bucket = {}
-    count = 0
-    for record in read_records_from_fd(fd):
-        count += 1
-        if len(bucket) < runtime_storage.RECORD_ID_PREFIX:
-            bucket[record['record_id']] = record
+    for key, value in read_records_from_fd(fd):
+        if len(bucket) < BULK_READ_SIZE:
+            bucket[key] = value
         else:
-            if not runtime_storage_inst.memcached.set_multi(
-                    bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-                LOG.critical('Failed to set_multi in memcached')
-                raise Exception('Failed to set_multi in memcached')
+            store_bucket(memcached_inst, bucket)
             bucket = {}
     if bucket:
-        if not runtime_storage_inst.memcached.set_multi(
-                bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-            LOG.critical('Failed to set_multi in memcached')
-            raise Exception('Failed to set_multi in memcached')
-    runtime_storage_inst._set_record_count(count)
+        store_bucket(memcached_inst, bucket)


-def export_data(runtime_storage_inst, fd):
-    for record in runtime_storage_inst.get_all_records():
-        pickle.dump(record, fd)
+def get_repo_keys(memcached_inst):
+    for repo in (memcached_inst.get('repos') or []):
+        uri = repo['uri']
+        branches = set(['master'])
+        for release in repo.get('releases'):
+            if 'branch' in release:
+                branches.add(release['branch'])
+        for branch in branches:
+            yield 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
+            yield 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
+
+
+def export_data(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+    for key in SINGLE_KEYS:
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in get_repo_keys(memcached_inst):
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in ARRAY_KEYS:
+        key_count = key + ':count'
+        count = memcached_inst.get(key_count) or 0
+        pickle.dump((key_count, memcached_inst.get(key_count)), fd)
+
+        key_prefix = key + ':'
+        for record_id_set in utils.make_range(0, count, BULK_READ_SIZE):
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    record_id_set, key_prefix)):
+                pickle.dump((key_prefix + str(k), v), fd)
+
+    for user_seq in range(memcached_inst.get('user:count') or 0):
+        user = memcached_inst.get('user:%s' % user_seq)
+        if user:
+            if user.get('user_id'):
+                pickle.dump(('user:%s' % user['user_id'], user), fd)
+            if user.get('launchpad_id'):
+                pickle.dump(('user:%s' % user['launchpad_id'], user), fd)
+            for email in user.get('emails') or []:
+                pickle.dump(('user:%s' % email, user), fd)
+
+
+def export_data_universal(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+    slabs = memcached_inst.get_slabs()
+    for slab_number, slab in six.iteritems(slabs[0][1]):
+        count = int(slab['number'])
+        keys = memcached_inst.get_stats(
+            'cachedump %s %s' % (slab_number, count))[0][1].keys()
+        n = 0
+        while n < count:
+            LOG.debug('Dumping slab %s, start record %s', slab_number, n)
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    keys[n: min(count, n + BULK_READ_SIZE)])):
+                pickle.dump((k, v), fd)
+            n += BULK_READ_SIZE
+
+
+def _connect_to_memcached(uri):
+    stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
+    if stripped:
+        storage_uri = stripped.split(',')
+        return memcache.Client(storage_uri)
+    else:
+        raise Exception('Invalid storage uri %s' % uri)


 def main():
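
Two details of the new export path are worth illustrating. First, python-memcached's set_multi returns the list of keys that failed to store (an empty list on success), which is why the new store_bucket treats a truthy result as fatal. Second, the repo keys and the URI handling look like this (the repository URI below is only an example):

    from six.moves.urllib import parse

    # Repo keys from get_repo_keys(): 'vcs:'/'rcs:' + quoted URI + ':' + branch.
    uri = 'git://git.openstack.org/openstack/nova'  # example URI
    print('vcs:' + parse.quote_plus(uri) + ':master')
    # -> vcs:git%3A%2F%2Fgit.openstack.org%2Fopenstack%2Fnova:master

    # _connect_to_memcached() strips the scheme and splits on commas, so
    # 'memcached://127.0.0.1:11211,10.0.0.2:11211' becomes
    # memcache.Client(['127.0.0.1:11211', '10.0.0.2:11211']).
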
@@ -83,23 +165,22 @@ def main():
     logging.setup('stackalytics')
     LOG.info('Logging enabled')

-    runtime_storage_inst = runtime_storage.get_runtime_storage(
-        cfg.CONF.runtime_storage_uri)
+    memcached_inst = _connect_to_memcached(cfg.CONF.runtime_storage_uri)

     filename = cfg.CONF.file

-    if cfg.CONF.reverse:
+    if cfg.CONF.restore:
         if filename:
             fd = open(filename, 'r')
         else:
             fd = sys.stdin
-        import_data(runtime_storage_inst, fd)
+        import_data(memcached_inst, fd)
     else:
         if filename:
             fd = open(filename, 'w')
         else:
             fd = sys.stdout
-        export_data(runtime_storage_inst, fd)
+        export_data(memcached_inst, fd)


 if __name__ == '__main__':
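
A minimal round trip with the reworked utility might look as follows. This is a sketch: it assumes a memcached instance on localhost and that the module is importable as stackalytics.processor.dump (the module name is an assumption); text-mode file handles match this Python 2-era code.

    from stackalytics.processor import dump

    # Assumed module path and local memcached; adjust for your deployment.
    inst = dump._connect_to_memcached('memcached://127.0.0.1:11211')

    # Dump everything known to the processor into a file...
    with open('runtime_storage.dump', 'w') as fd:
        dump.export_data(inst, fd)

    # ...and load it back later (equivalent to running with --restore/-r).
    with open('runtime_storage.dump', 'r') as fd:
        dump.import_data(inst, fd)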