Added error handling for memcached operations

closes bug 1314558

Change-Id: I1cea329176924883c06579f320657a065fee0873
This commit is contained in:
pkholkin
2014-05-07 16:15:22 +04:00
parent ef2ea873d7
commit bbe1d302b6
2 changed files with 43 additions and 33 deletions

View File

@@ -52,12 +52,16 @@ def import_data(runtime_storage_inst, fd):
if len(bucket) < runtime_storage.RECORD_ID_PREFIX: if len(bucket) < runtime_storage.RECORD_ID_PREFIX:
bucket[record['record_id']] = record bucket[record['record_id']] = record
else: else:
runtime_storage_inst.memcached.set_multi( if not runtime_storage_inst.memcached.set_multi(
bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX) bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
LOG.critical('Failed to set_multi in memcached')
raise Exception('Failed to set_multi in memcached')
bucket = {} bucket = {}
if bucket: if bucket:
runtime_storage_inst.memcached.set_multi( if not runtime_storage_inst.memcached.set_multi(
bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX) bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX):
LOG.critical('Failed to set_multi in memcached')
raise Exception('Failed to set_multi in memcached')
runtime_storage_inst._set_record_count(count) runtime_storage_inst._set_record_count(count)

View File

@@ -83,22 +83,21 @@ class MemcachedStorage(RuntimeStorage):
if not merge_handler: if not merge_handler:
record['record_id'] = record_id record['record_id'] = record_id
LOG.debug('Update record %s', record) LOG.debug('Update record %s', record)
self.memcached.set(self._get_record_name(record_id), self.set_by_key(self._get_record_name(record_id), record)
record)
else: else:
original = self.memcached.get(self._get_record_name( original = self.get_by_key(self._get_record_name(
record_id)) record_id))
if merge_handler(original, record): if merge_handler(original, record):
LOG.debug('Update record with merge %s', record) LOG.debug('Update record with merge %s', record)
self.memcached.set(self._get_record_name(record_id), self.set_by_key(self._get_record_name(record_id),
original) original)
else: else:
# insert record # insert record
record_id = self._get_record_count() record_id = self._get_record_count()
record['record_id'] = record_id record['record_id'] = record_id
self.record_index[record['primary_key']] = record_id self.record_index[record['primary_key']] = record_id
LOG.debug('Insert new record %s', record) LOG.debug('Insert new record %s', record)
self.memcached.set(self._get_record_name(record_id), record) self.set_by_key(self._get_record_name(record_id), record)
self._set_record_count(record_id + 1) self._set_record_count(record_id + 1)
self._commit_update(record_id) self._commit_update(record_id)
@@ -109,7 +108,7 @@ class MemcachedStorage(RuntimeStorage):
continue continue
record_id = self.record_index[correction['primary_key']] record_id = self.record_index[correction['primary_key']]
original = self.memcached.get(self._get_record_name(record_id)) original = self.get_by_key(self._get_record_name(record_id))
need_update = False need_update = False
for field, value in six.iteritems(correction): for field, value in six.iteritems(correction):
@@ -118,7 +117,7 @@ class MemcachedStorage(RuntimeStorage):
original[field] = value original[field] = value
if need_update: if need_update:
self.memcached.set(self._get_record_name(record_id), original) self.set_by_key(self._get_record_name(record_id), original)
self._commit_update(record_id) self._commit_update(record_id)
def inc_user_count(self): def inc_user_count(self):
@@ -134,16 +133,22 @@ class MemcachedStorage(RuntimeStorage):
return self.memcached.get(key.encode('utf8')) return self.memcached.get(key.encode('utf8'))
def set_by_key(self, key, value): def set_by_key(self, key, value):
self.memcached.set(key.encode('utf8'), value) if not self.memcached.set(key.encode('utf8'), value):
LOG.critical('Failed to store data in memcached: '
'key %(key)s, value %(value)s',
{'key': key, 'value': value})
raise Exception('Memcached set failed')
def delete_by_key(self, key): def delete_by_key(self, key):
self.memcached.delete(key.encode('utf8')) if not self.memcached.delete(key.encode('utf8')):
LOG.critical('Failed to delete data from memcached: key %s', key)
raise Exception('Memcached delete failed')
def get_update(self, pid): def get_update(self, pid):
last_update = self.memcached.get('pid:%s' % pid) last_update = self.get_by_key('pid:%s' % pid)
update_count = self._get_update_count() update_count = self._get_update_count()
self.memcached.set('pid:%s' % pid, update_count) self.set_by_key('pid:%s' % pid, update_count)
self._set_pids(pid) self._set_pids(pid)
if not last_update: if not last_update:
@@ -159,23 +164,23 @@ class MemcachedStorage(RuntimeStorage):
yield i yield i
def active_pids(self, pids): def active_pids(self, pids):
stored_pids = self.memcached.get('pids') or set() stored_pids = self.get_by_key('pids') or set()
for pid in stored_pids: for pid in stored_pids:
if pid not in pids: if pid not in pids:
LOG.debug('Purge dead uwsgi pid %s from pids list', pid) LOG.debug('Purge dead uwsgi pid %s from pids list', pid)
self.memcached.delete('pid:%s' % pid) self.delete_by_key('pid:%s' % pid)
self.memcached.set('pids', pids) self.set_by_key('pids', pids)
# remove unneeded updates # remove unneeded updates
min_update = self._get_update_count() min_update = self._get_update_count()
for pid in pids: for pid in pids:
n = self.memcached.get('pid:%s' % pid) n = self.get_by_key('pid:%s' % pid)
if n: if n:
if n < min_update: if n < min_update:
min_update = n min_update = n
first_valid_update = self.memcached.get('first_valid_update') or 0 first_valid_update = self.get_by_key('first_valid_update') or 0
LOG.debug('Purge polled updates from %(first)s to %(min)s', LOG.debug('Purge polled updates from %(first)s to %(min)s',
{'first': first_valid_update, 'min': min_update}) {'first': first_valid_update, 'min': min_update})
@@ -183,28 +188,29 @@ class MemcachedStorage(RuntimeStorage):
BULK_DELETE_SIZE): BULK_DELETE_SIZE):
if not self.memcached.delete_multi(delete_id_set, if not self.memcached.delete_multi(delete_id_set,
key_prefix=UPDATE_ID_PREFIX): key_prefix=UPDATE_ID_PREFIX):
raise Exception('Failed to delete from memcache') LOG.critical('Failed to delete_multi from memcached')
raise Exception('Failed to delete_multi from memcached')
self.memcached.set('first_valid_update', min_update) self.set_by_key('first_valid_update', min_update)
def _get_update_count(self): def _get_update_count(self):
return self.memcached.get('update:count') or 0 return self.get_by_key('update:count') or 0
def _set_pids(self, pid): def _set_pids(self, pid):
pids = self.memcached.get('pids') or set() pids = self.get_by_key('pids') or set()
if pid in pids: if pid in pids:
return return
pids.add(pid) pids.add(pid)
self.memcached.set('pids', pids) self.set_by_key('pids', pids)
def _get_record_name(self, record_id): def _get_record_name(self, record_id):
return RECORD_ID_PREFIX + str(record_id) return RECORD_ID_PREFIX + str(record_id)
def _get_record_count(self): def _get_record_count(self):
return self.memcached.get('record:count') or 0 return self.get_by_key('record:count') or 0
def _set_record_count(self, count): def _set_record_count(self, count):
self.memcached.set('record:count', count) self.set_by_key('record:count', count)
def get_all_records(self): def get_all_records(self):
for record_id_set in utils.make_range(0, self._get_record_count(), for record_id_set in utils.make_range(0, self._get_record_count(),
@@ -215,16 +221,16 @@ class MemcachedStorage(RuntimeStorage):
def _commit_update(self, record_id): def _commit_update(self, record_id):
count = self._get_update_count() count = self._get_update_count()
self.memcached.set(UPDATE_ID_PREFIX + str(count), record_id) self.set_by_key(UPDATE_ID_PREFIX + str(count), record_id)
self.memcached.set('update:count', count + 1) self.set_by_key('update:count', count + 1)
def _init_user_count(self): def _init_user_count(self):
if not self.memcached.get('user:count'): if not self.get_by_key('user:count'):
self.memcached.set('user:count', 1) self.set_by_key('user:count', 1)
def get_runtime_storage(uri): def get_runtime_storage(uri):
LOG.debug('Runtime storage is requested for uri %s' % uri) LOG.debug('Runtime storage is requested for uri %s', uri)
match = re.search(MEMCACHED_URI_PREFIX, uri) match = re.search(MEMCACHED_URI_PREFIX, uri)
if match: if match:
return MemcachedStorage(uri) return MemcachedStorage(uri)