From 9242d51a0876b27fdaf49ae274035962a19f75d7 Mon Sep 17 00:00:00 2001
From: Ilya Shakhat
Date: Tue, 3 Jun 2014 18:57:34 +0400
Subject: [PATCH] Improved utility to dump and restore memcached data

The utility dumps all known key-value pairs from memcached. Connection
parameters are provided via config file. The utility also accepts the
following params:

  --file FILE, -f FILE  File name where to store data
  --restore, -r         Restore data into memcached
  --min-compress-len MIN_COMPRESS_LEN, -m MIN_COMPRESS_LEN
                        The threshold length to kick in auto-compression

Change-Id: Ied6bd2b95cccba42be5a69e13812b2206a0c73b9
---
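
Note for reviewers: the dump file written by export_data() is simply a
stream of independently pickled (key, value) tuples, which
read_records_from_fd() consumes until EOFError. A minimal round-trip
sketch of that format follows; the file name and sample pairs are made
up for illustration:

    import pickle

    # Write a stream of (key, value) tuples, one pickle.dump() call per
    # pair: the same layout export_data() produces.
    sample = [('repos', [{'uri': 'git://example.org/sample'}]),
              ('record:count', 2)]
    with open('sample.dump', 'wb') as fd:
        for pair in sample:
            pickle.dump(pair, fd)

    # Read the stream back until EOFError, as read_records_from_fd() does.
    pairs = []
    with open('sample.dump', 'rb') as fd:
        while True:
            try:
                pairs.append(pickle.load(fd))
            except EOFError:
                break
    assert pairs == sample
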
 stackalytics/processor/dump.py | 137 ++++++++++++++++++++++++++-------
 1 file changed, 109 insertions(+), 28 deletions(-)

diff --git a/stackalytics/processor/dump.py b/stackalytics/processor/dump.py
index c5b467c92..9d00f4301 100644
--- a/stackalytics/processor/dump.py
+++ b/stackalytics/processor/dump.py
@@ -16,25 +16,40 @@
 import pickle
 import sys
 
+import memcache
 from oslo.config import cfg
+import re
+import six
+from six.moves.urllib import parse
 
 from stackalytics.openstack.common import log as logging
 from stackalytics.processor import config
-from stackalytics.processor import runtime_storage
+from stackalytics.processor import utils
 
 LOG = logging.getLogger(__name__)
 
 OPTS = [
-    cfg.BoolOpt('reverse',
+    cfg.BoolOpt('restore',
                 short='r',
-                help='Load data to runtime storage'),
+                help='Restore data into memcached'),
     cfg.StrOpt('file',
                short='f',
-               help='File name'),
+               help='File name where to store data'),
+    cfg.IntOpt('min-compress-len', default=0,
+               short='m',
+               help='The threshold length to kick in auto-compression'),
 ]
 
+SINGLE_KEYS = ['module_groups', 'project_types', 'repos', 'releases',
+               'companies', 'last_update_members_date', 'last_member_index',
+               'runtime_storage_update_time']
+ARRAY_KEYS = ['record', 'user']
+BULK_READ_SIZE = 64
+MEMCACHED_URI_PREFIX = r'^memcached:\/\/'
+
+
 def read_records_from_fd(fd):
     while True:
         try:
@@ -44,31 +59,98 @@ def read_records_from_fd(fd):
         yield record
 
 
-def import_data(runtime_storage_inst, fd):
+def store_bucket(memcached_inst, bucket):
+    res = memcached_inst.set_multi(bucket,
+                                   min_compress_len=cfg.CONF.min_compress_len)
+    if res:
+        LOG.critical('Failed to set values in memcached: %s', res)
+        raise Exception('memcached set_multi operation failed')
+
+
+def import_data(memcached_inst, fd):
+    LOG.info('Importing data into memcached')
     bucket = {}
-    count = 0
-    for record in read_records_from_fd(fd):
-        count += 1
-        if len(bucket) < runtime_storage.RECORD_ID_PREFIX:
-            bucket[record['record_id']] = record
+    for key, value in read_records_from_fd(fd):
+        if len(bucket) < BULK_READ_SIZE:
+            bucket[key] = value
         else:
-            if not runtime_storage_inst.memcached.set_multi(
-                    bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-                LOG.critical('Failed to set_multi in memcached')
-                raise Exception('Failed to set_multi in memcached')
+            store_bucket(memcached_inst, bucket)
             bucket = {}
     if bucket:
-        if not runtime_storage_inst.memcached.set_multi(
-                bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-            LOG.critical('Failed to set_multi in memcached')
-            raise Exception('Failed to set_multi in memcached')
-
-    runtime_storage_inst._set_record_count(count)
+        store_bucket(memcached_inst, bucket)
 
 
-def export_data(runtime_storage_inst, fd):
-    for record in runtime_storage_inst.get_all_records():
-        pickle.dump(record, fd)
+def get_repo_keys(memcached_inst):
+    for repo in (memcached_inst.get('repos') or []):
+        uri = repo['uri']
+        branches = set(['master'])
+        for release in repo.get('releases') or []:
+            if 'branch' in release:
+                branches.add(release['branch'])
+
+        for branch in branches:
+            yield 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
+            yield 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
+
+
+def export_data(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+
+    for key in SINGLE_KEYS:
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in get_repo_keys(memcached_inst):
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in ARRAY_KEYS:
+        key_count = key + ':count'
+        count = memcached_inst.get(key_count) or 0
+        pickle.dump((key_count, memcached_inst.get(key_count)), fd)
+
+        key_prefix = key + ':'
+
+        for record_id_set in utils.make_range(0, count, BULK_READ_SIZE):
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    record_id_set, key_prefix)):
+                pickle.dump((key_prefix + str(k), v), fd)
+
+    for user_seq in range(memcached_inst.get('user:count') or 0):
+        user = memcached_inst.get('user:%s' % user_seq)
+        if user:
+            if user.get('user_id'):
+                pickle.dump(('user:%s' % user['user_id'], user), fd)
+            if user.get('launchpad_id'):
+                pickle.dump(('user:%s' % user['launchpad_id'], user), fd)
+            for email in user.get('emails') or []:
+                pickle.dump(('user:%s' % email, user), fd)
+
+
+def export_data_universal(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+    slabs = memcached_inst.get_slabs()
+    for slab_number, slab in six.iteritems(slabs[0][1]):
+        count = int(slab['number'])
+        keys = memcached_inst.get_stats(
+            'cachedump %s %s' % (slab_number, count))[0][1].keys()
+
+        n = 0
+        while n < count:
+            LOG.debug('Dumping slab %s, start record %s', slab_number, n)
+
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    keys[n: min(count, n + BULK_READ_SIZE)])):
+                pickle.dump((k, v), fd)
+
+            n += BULK_READ_SIZE
+
+
+def _connect_to_memcached(uri):
+    stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
+    if stripped:
+        storage_uri = stripped.split(',')
+        return memcache.Client(storage_uri)
+    else:
+        raise Exception('Invalid storage uri %s' % uri)
 
 
 def main():
@@ -83,23 +165,22 @@ def main():
     logging.setup('stackalytics')
     LOG.info('Logging enabled')
 
-    runtime_storage_inst = runtime_storage.get_runtime_storage(
-        cfg.CONF.runtime_storage_uri)
+    memcached_inst = _connect_to_memcached(cfg.CONF.runtime_storage_uri)
 
     filename = cfg.CONF.file
 
-    if cfg.CONF.reverse:
+    if cfg.CONF.restore:
         if filename:
             fd = open(filename, 'r')
         else:
            fd = sys.stdin
-        import_data(runtime_storage_inst, fd)
+        import_data(memcached_inst, fd)
     else:
        if filename:
            fd = open(filename, 'w')
        else:
            fd = sys.stdout
-        export_data(runtime_storage_inst, fd)
+        export_data(memcached_inst, fd)
 
 
 if __name__ == '__main__':
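
A minimal sketch of the bucketed set_multi() write pattern that
store_bucket() and import_data() implement above, assuming a local
memcached on 127.0.0.1:11211 (the address, key names, and the
store_pairs() helper are illustrative, not part of the patch).
python-memcache's set_multi() returns the list of keys that failed to
store, and values longer than min_compress_len are zlib-compressed
before being sent; the default of 0 leaves compression off, matching
the patch's default. The sketch adds the incoming pair to the bucket
before deciding whether to flush, so no pair is skipped when a bucket
fills up:

    import memcache

    BULK_READ_SIZE = 64  # same bucket size as the patch


    def store_pairs(client, pairs, min_compress_len=0):
        # Accumulate pairs into a bucket and push each full bucket with a
        # single set_multi() round trip instead of one set() per key.
        bucket = {}
        for key, value in pairs:
            bucket[key] = value
            if len(bucket) >= BULK_READ_SIZE:
                failed = client.set_multi(
                    bucket, min_compress_len=min_compress_len)
                if failed:
                    raise Exception('failed to store keys: %s' % failed)
                bucket = {}
        if bucket:
            # Flush the remainder.
            failed = client.set_multi(
                bucket, min_compress_len=min_compress_len)
            if failed:
                raise Exception('failed to store keys: %s' % failed)


    client = memcache.Client(['127.0.0.1:11211'])
    store_pairs(client, [('key:%d' % i, i) for i in range(200)],
                min_compress_len=16384)

Batching writes this way trades a little memory for far fewer network
round trips, which is why the utility buckets restores instead of
issuing one set() per restored key.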