# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

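"""Dump/restore tool for the Stackalytics runtime storage.

Exports the contents of the memcached-backed runtime storage as a stream
of pickled (key, value) records, or restores such a stream back into
memcached.
"""
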
import pickle
import re
import sys

import memcache
from oslo_config import cfg
from oslo_log import log as logging
import six

from stackalytics.processor import config
from stackalytics.processor import utils


CONF = cfg.CONF
LOG = logging.getLogger(__name__)

OPTS = [
    cfg.BoolOpt('restore',
                short='r',
                help='Restore data into memcached'),
    cfg.StrOpt('file',
               short='f',
               help='The name of the file to store data'),
    cfg.IntOpt('min-compress-len', default=0,
               short='m',
               help='The threshold length to kick in auto-compression'),
]


SINGLE_KEYS = ['module_groups', 'project_types', 'repos', 'releases',
               'companies', 'last_update_members_date', 'last_member_index',
               'runtime_storage_update_time']
ARRAY_KEYS = ['record', 'user']
BULK_READ_SIZE = 64
MEMCACHED_URI_PREFIX = r'^memcached:\/\/'


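# Yield (key, value) tuples unpickled from the given file object until EOF.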
def read_records_from_fd(fd):
    while True:
        try:
            record = pickle.load(fd)
        except EOFError:
            break
        yield record


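# Store a dict of records into memcached with a single set_multi call.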
def store_bucket(memcached_inst, bucket):
    LOG.debug('Store bucket of records into memcached')
    res = memcached_inst.set_multi(bucket,
                                   min_compress_len=CONF.min_compress_len)
    if res:
        LOG.critical('Failed to set values in memcached: %s', res)
        raise Exception('memcached set_multi operation failed')


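# Read pickled (key, value) records from fd and store them into memcached
# in buckets of BULK_READ_SIZE items.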
def import_data(memcached_inst, fd):
    LOG.info('Importing data into memcached')
    bucket = {}
    for key, value in read_records_from_fd(fd):
        LOG.debug('Reading record key %s, value %s', key, value)
        if len(bucket) == BULK_READ_SIZE:
            store_bucket(memcached_inst, bucket)
            bucket = {}
        bucket[key] = value
    if bucket:
        store_bucket(memcached_inst, bucket)
    LOG.info('Import of data into memcached completed')


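# Yield the memcached keys derived from the stored list of repos:
# per-module bug timestamps and per-branch vcs/rcs state keys.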
def get_repo_keys(memcached_inst):
    for repo in (memcached_inst.get('repos') or []):
        uri = repo['uri']
        quoted_uri = six.moves.urllib.parse.quote_plus(uri)

        yield 'bug_modified_since-%s' % repo['module']

        branches = {repo.get('default_branch', 'master')}
        for release in repo.get('releases') or []:
            if 'branch' in release:
                branches.add(release['branch'])

        for branch in branches:
            yield 'vcs:%s:%s' % (quoted_uri, branch)
            yield 'rcs:%s:%s' % (quoted_uri, branch)


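# Dump the contents of memcached into fd as a stream of pickled
# (key, value) tuples: scalar keys, repo-derived keys, the record/user
# arrays read in bulk, and the per-user lookup aliases.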
def export_data(memcached_inst, fd):
    LOG.info('Exporting data from memcached')

    for key in SINGLE_KEYS:
        pickle.dump((key, memcached_inst.get(key)), fd)

    for key in get_repo_keys(memcached_inst):
        pickle.dump((key, memcached_inst.get(key)), fd)

    for key in ARRAY_KEYS:
        key_count = key + ':count'
        count = memcached_inst.get(key_count) or 0
        pickle.dump((key_count, memcached_inst.get(key_count)), fd)

        key_prefix = key + ':'

        for record_id_set in utils.make_range(0, count + 1, BULK_READ_SIZE):
            # memcache limits the size of returned data to a specific yet
            # unknown chunk size; the code should verify that all requested
            # records are returned and be able to fall back to one-by-one
            # retrieval
            chunk = memcached_inst.get_multi(record_id_set, key_prefix)
            if len(chunk) < len(record_id_set):
                # retrieve one-by-one
                for record_id in record_id_set:
                    key = key_prefix + str(record_id)
                    pickle.dump((key, memcached_inst.get(key)), fd)
            else:
                # dump the whole chunk
                for k, v in six.iteritems(chunk):
                    pickle.dump((key_prefix + str(k), v), fd)

    for user_seq in range((memcached_inst.get('user:count') or 0) + 1):
        user = memcached_inst.get('user:%s' % user_seq)
        if user:
            if user.get('user_id'):
                pickle.dump((('user:%s' % user['user_id']).encode('utf8'),
                             user), fd)
            if user.get('launchpad_id'):
                pickle.dump(('user:%s' % user['launchpad_id'], user), fd)
            for hostname, ids in user.get('gerrit_ids', {}).items():
                for gerrit_id in ids:
                    pickle.dump(('user:gerrit:%s:%s' % (hostname, gerrit_id),
                                 user),
                                fd)
            if user.get('member_id'):
                pickle.dump(('user:member:%s' % user['member_id'], user), fd)
            for email in user.get('emails') or []:
                pickle.dump((('user:%s' % email).encode('utf8'), user), fd)

    LOG.info('Export of data from memcached completed')


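# Parse a memcached://host:port[,host:port...] URI and return a
# memcache.Client connected to the listed servers.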
def _connect_to_memcached(uri):
    stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
    if stripped:
        storage_uri = stripped.split(',')
        return memcache.Client(storage_uri)
    else:
        raise Exception('Invalid storage uri %s' % uri)


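# Example invocation (assuming the module is installed as a console script,
# here hypothetically named "stackalytics-dump"; the runtime-storage-uri
# option comes from config.CONNECTION_OPTS):
#   stackalytics-dump --runtime-storage-uri memcached://127.0.0.1:11211 \
#       --file dump.pickle                        # export memcached to a file
#   stackalytics-dump --restore --file dump.pickle  # restore it back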
def main():
    utils.init_config_and_logging(config.CONNECTION_OPTS + OPTS)

    memcached_inst = _connect_to_memcached(CONF.runtime_storage_uri)

    filename = CONF.file

    if CONF.restore:
        if filename:
            # pickle streams are binary, so open the file in binary mode
            fd = open(filename, 'rb')
        else:
            fd = sys.stdin
        import_data(memcached_inst, fd)
    else:
        if filename:
            fd = open(filename, 'wb')
        else:
            fd = sys.stdout
        export_data(memcached_inst, fd)


if __name__ == '__main__':
    main()