Improved utility to dump and restore memcached data

The utility dumps all known key-value pairs from memcached. Connection
parameters are provided via the config file. The utility also accepts the
following parameters:
  --file FILE, -f FILE  File name where to store data
  --restore, -r         Restore data into memcached
  --min-compress-len MIN_COMPRESS_LEN, -m MIN_COMPRESS_LEN
                        The threshold length to kick in auto-compression
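
The dump file is a plain stream of pickled (key, value) tuples, one per
record, so it can also be read back outside of this utility. A minimal
sketch (the dump file name here is only an example):

    import pickle

    with open('memcached.dump', 'rb') as fd:  # hypothetical file name
        while True:
            try:
                key, value = pickle.load(fd)
            except EOFError:
                break
            print(key)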

Change-Id: Ied6bd2b95cccba42be5a69e13812b2206a0c73b9
Author: Ilya Shakhat
Date: 2014-06-03 18:57:34 +04:00
parent 07dcb64ae1
commit 9242d51a08


@@ -16,25 +16,40 @@
 import pickle
 import sys
 
+import memcache
 from oslo.config import cfg
+import re
+import six
+from six.moves.urllib import parse
 
 from stackalytics.openstack.common import log as logging
 from stackalytics.processor import config
-from stackalytics.processor import runtime_storage
+from stackalytics.processor import utils
 
 LOG = logging.getLogger(__name__)
 
 OPTS = [
-    cfg.BoolOpt('reverse',
+    cfg.BoolOpt('restore',
                 short='r',
-                help='Load data to runtime storage'),
+                help='Restore data into memcached'),
     cfg.StrOpt('file',
                short='f',
-               help='File name'),
+               help='File name where to store data'),
+    cfg.StrOpt('min-compress-len', default=0,
+               short='m',
+               help='The threshold length to kick in auto-compression'),
 ]
 
+SINGLE_KEYS = ['module_groups', 'project_types', 'repos', 'releases',
+               'companies', 'last_update_members_date', 'last_member_index',
+               'runtime_storage_update_time']
+ARRAY_KEYS = ['record', 'user']
+BULK_READ_SIZE = 64
+MEMCACHED_URI_PREFIX = r'^memcached:\/\/'
+
 
 def read_records_from_fd(fd):
     while True:
         try:
@@ -44,31 +59,98 @@ def read_records_from_fd(fd):
         yield record
 
 
-def import_data(runtime_storage_inst, fd):
+def store_bucket(memcached_inst, bucket):
+    res = memcached_inst.set_multi(bucket,
+                                   min_compress_len=cfg.CONF.min_compress_len)
+    if res:
+        LOG.critical('Failed to set values in memcached: %s', res)
+        raise Exception('memcached set_multi operation is failed')
+
+
+def import_data(memcached_inst, fd):
+    LOG.info('Importing data into memcached')
     bucket = {}
-    count = 0
-    for record in read_records_from_fd(fd):
-        count += 1
-        if len(bucket) < runtime_storage.RECORD_ID_PREFIX:
-            bucket[record['record_id']] = record
+
+    for key, value in read_records_from_fd(fd):
+        if len(bucket) < BULK_READ_SIZE:
+            bucket[key] = value
         else:
-            if not runtime_storage_inst.memcached.set_multi(
-                    bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-                LOG.critical('Failed to set_multi in memcached')
-                raise Exception('Failed to set_multi in memcached')
+            store_bucket(memcached_inst, bucket)
             bucket = {}
     if bucket:
-        if not runtime_storage_inst.memcached.set_multi(
-                bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
-            LOG.critical('Failed to set_multi in memcached')
-            raise Exception('Failed to set_multi in memcached')
-    runtime_storage_inst._set_record_count(count)
+        store_bucket(memcached_inst, bucket)
 
 
-def export_data(runtime_storage_inst, fd):
-    for record in runtime_storage_inst.get_all_records():
-        pickle.dump(record, fd)
+def get_repo_keys(memcached_inst):
+    for repo in (memcached_inst.get('repos') or []):
+        uri = repo['uri']
+        branches = set(['master'])
+        for release in repo.get('releases'):
+            if 'branch' in release:
+                branches.add(release['branch'])
+        for branch in branches:
+            yield 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
+            yield 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
+
+
+def export_data(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+
+    for key in SINGLE_KEYS:
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in get_repo_keys(memcached_inst):
+        pickle.dump((key, memcached_inst.get(key)), fd)
+
+    for key in ARRAY_KEYS:
+        key_count = key + ':count'
+        count = memcached_inst.get(key_count) or 0
+        pickle.dump((key_count, memcached_inst.get(key_count)), fd)
+
+        key_prefix = key + ':'
+        for record_id_set in utils.make_range(0, count, BULK_READ_SIZE):
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    record_id_set, key_prefix)):
+                pickle.dump((key_prefix + str(k), v), fd)
+
+    for user_seq in range(memcached_inst.get('user:count') or 0):
+        user = memcached_inst.get('user:%s' % user_seq)
+        if user:
+            if user.get('user_id'):
+                pickle.dump(('user:%s' % user['user_id'], user), fd)
+            if user.get('launchpad_id'):
+                pickle.dump(('user:%s' % user['launchpad_id'], user), fd)
+            for email in user.get('emails') or []:
+                pickle.dump(('user:%s' % email, user), fd)
+
+
+def export_data_universal(memcached_inst, fd):
+    LOG.info('Exporting data from memcached')
+    slabs = memcached_inst.get_slabs()
+    for slab_number, slab in six.iteritems(slabs[0][1]):
+        count = int(slab['number'])
+        keys = memcached_inst.get_stats(
+            'cachedump %s %s' % (slab_number, count))[0][1].keys()
+
+        n = 0
+        while n < count:
+            LOG.debug('Dumping slab %s, start record %s', slab_number, n)
+
+            for k, v in six.iteritems(memcached_inst.get_multi(
+                    keys[n: min(count, n + BULK_READ_SIZE)])):
+                pickle.dump((k, v), fd)
+
+            n += BULK_READ_SIZE
+
+
+def _connect_to_memcached(uri):
+    stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
+    if stripped:
+        storage_uri = stripped.split(',')
+        return memcache.Client(storage_uri)
+    else:
+        raise Exception('Invalid storage uri %s' % uri)
 
 
 def main():
@@ -83,23 +165,22 @@ def main():
     logging.setup('stackalytics')
     LOG.info('Logging enabled')
 
-    runtime_storage_inst = runtime_storage.get_runtime_storage(
-        cfg.CONF.runtime_storage_uri)
+    memcached_inst = _connect_to_memcached(cfg.CONF.runtime_storage_uri)
 
     filename = cfg.CONF.file
 
-    if cfg.CONF.reverse:
+    if cfg.CONF.restore:
         if filename:
             fd = open(filename, 'r')
         else:
             fd = sys.stdin
-        import_data(runtime_storage_inst, fd)
+        import_data(memcached_inst, fd)
     else:
         if filename:
             fd = open(filename, 'w')
         else:
             fd = sys.stdout
-        export_data(runtime_storage_inst, fd)
+        export_data(memcached_inst, fd)
 
 
 if __name__ == '__main__':
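
For reference, the batched-write pattern that store_bucket() and
import_data() rely on can be exercised in isolation. A minimal sketch,
assuming a memcached instance at 127.0.0.1:11211 (the address and sample
data are hypothetical):

    import memcache

    BULK_READ_SIZE = 64  # same bucket size the utility uses

    mc = memcache.Client(['127.0.0.1:11211'])  # assumed local instance
    bucket = {}
    for key, value in [('k1', 'v1'), ('k2', 'v2')]:
        bucket[key] = value
        if len(bucket) >= BULK_READ_SIZE:
            failed = mc.set_multi(bucket)  # returns keys that were not stored
            if failed:
                raise Exception('memcached set_multi failed: %s' % failed)
            bucket = {}
    if bucket:
        failed = mc.set_multi(bucket)
        if failed:
            raise Exception('memcached set_multi failed: %s' % failed)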