swift/swift/common/memcached.py

319 lines
12 KiB
Python

# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Lucid comes with memcached: v1.4.2. Protocol documentation for that
version is at:
http://github.com/memcached/memcached/blob/1.4.2/doc/protocol.txt
"""
import cPickle as pickle
import logging
import socket
import time
from bisect import bisect
from hashlib import md5
DEFAULT_MEMCACHED_PORT = 11211
CONN_TIMEOUT = 0.3
IO_TIMEOUT = 2.0
PICKLE_FLAG = 1
NODE_WEIGHT = 50
PICKLE_PROTOCOL = 2
TRY_COUNT = 3
# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
# will be considered failed for ERROR_LIMIT_DURATION seconds.
ERROR_LIMIT_COUNT = 10
ERROR_LIMIT_TIME = 60
ERROR_LIMIT_DURATION = 60
def md5hash(key):
return md5(key).hexdigest()
class MemcacheConnectionError(Exception):
pass
class MemcacheRing(object):
"""
Simple, consistent-hashed memcache client.
"""
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
io_timeout=IO_TIMEOUT, tries=TRY_COUNT):
self._ring = {}
self._errors = dict(((serv, []) for serv in servers))
self._error_limited = dict(((serv, 0) for serv in servers))
for server in sorted(servers):
for i in xrange(NODE_WEIGHT):
self._ring[md5hash('%s-%s' % (server, i))] = server
self._tries = tries if tries <= len(servers) else len(servers)
self._sorted = sorted(self._ring.keys())
self._client_cache = dict(((server, []) for server in servers))
self._connect_timeout = connect_timeout
self._io_timeout = io_timeout
def _exception_occurred(self, server, e, action='talking'):
if isinstance(e, socket.timeout):
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
{'action': action, 'server': server})
else:
logging.exception(_("Error %(action)s to memcached: %(server)s"),
{'action': action, 'server': server})
now = time.time()
self._errors[server].append(time.time())
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._errors[server] = [err for err in self._errors[server]
if err > now - ERROR_LIMIT_TIME]
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._error_limited[server] = now + ERROR_LIMIT_DURATION
logging.error(_('Error limiting server %s'), server)
def _get_conns(self, key):
"""
Retrieves a server conn from the pool, or connects a new one.
Chooses the server based on a consistent hash of "key".
"""
pos = bisect(self._sorted, key)
served = []
while len(served) < self._tries:
pos = (pos + 1) % len(self._sorted)
server = self._ring[self._sorted[pos]]
if server in served:
continue
served.append(server)
if self._error_limited[server] > time.time():
continue
try:
fp, sock = self._client_cache[server].pop()
yield server, fp, sock
except IndexError:
try:
if ':' in server:
host, port = server.split(':')
else:
host = server
port = DEFAULT_MEMCACHED_PORT
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(self._connect_timeout)
sock.connect((host, int(port)))
sock.settimeout(self._io_timeout)
yield server, sock.makefile(), sock
except Exception, e:
self._exception_occurred(server, e, 'connecting')
def _return_conn(self, server, fp, sock):
""" Returns a server connection to the pool """
self._client_cache[server].append((fp, sock))
def set(self, key, value, serialize=True, timeout=0):
"""
Set a key/value pair in memcache
:param key: key
:param value: value
:param serialize: if True, value is pickled before sending to memcache
:param timeout: ttl in memcache
"""
key = md5hash(key)
if timeout > 0:
timeout += time.time()
flags = 0
if serialize:
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \
(key, flags, timeout, len(value), value))
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def get(self, key):
"""
Gets the object specified by key. It will also unpickle the object
before returning if it is pickled in memcache.
:param key: key
:returns: value of the key in memcache
"""
key = md5hash(key)
value = None
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('get %s\r\n' % key)
line = fp.readline().strip().split()
while line[0].upper() != 'END':
if line[0].upper() == 'VALUE' and line[1] == key:
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
value = pickle.loads(value)
fp.readline()
line = fp.readline().strip().split()
self._return_conn(server, fp, sock)
return value
except Exception, e:
self._exception_occurred(server, e)
def incr(self, key, delta=1, timeout=0):
"""
Increments a key which has a numeric value by delta.
If the key can't be found, it's added as delta or 0 if delta < 0.
If passed a negative number, will use memcached's decr. Returns
the int stored in memcached
Note: The data memcached stores as the result of incr/decr is
an unsigned int. decr's that result in a number below 0 are
stored as 0.
:param key: key
:param delta: amount to add to the value of key (or set as the value
if the key is not found) will be cast to an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
key = md5hash(key)
command = 'incr'
if delta < 0:
command = 'decr'
delta = str(abs(int(delta)))
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('%s %s %s\r\n' % (command, key, delta))
line = fp.readline().strip().split()
if line[0].upper() == 'NOT_FOUND':
add_val = delta
if command == 'decr':
add_val = '0'
sock.sendall('add %s %d %d %s\r\n%s\r\n' % \
(key, 0, timeout, len(add_val), add_val))
line = fp.readline().strip().split()
if line[0].upper() == 'NOT_STORED':
sock.sendall('%s %s %s\r\n' % (command, key, delta))
line = fp.readline().strip().split()
ret = int(line[0].strip())
else:
ret = int(add_val)
else:
ret = int(line[0].strip())
self._return_conn(server, fp, sock)
return ret
except Exception, e:
self._exception_occurred(server, e)
raise MemcacheConnectionError("No Memcached connections succeeded.")
def decr(self, key, delta=1, timeout=0):
"""
Decrements a key which has a numeric value by delta. Calls incr with
-delta.
:param key: key
:param delta: amount to subtract to the value of key (or set the
value to 0 if the key is not found) will be cast to
an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
self.incr(key, delta=-delta, timeout=timeout)
def delete(self, key):
"""
Deletes a key/value pair from memcache.
:param key: key to be deleted
"""
key = md5hash(key)
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('delete %s noreply\r\n' % key)
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def set_multi(self, mapping, server_key, serialize=True, timeout=0):
"""
Sets multiple key/value pairs in memcache.
:param mapping: dictonary of keys and values to be set in memcache
:param servery_key: key to use in determining which server in the ring
is used
:param serialize: if True, value is pickled before sending to memcache
:param timeout: ttl for memcache
"""
server_key = md5hash(server_key)
if timeout > 0:
timeout += time.time()
msg = ''
for key, value in mapping.iteritems():
key = md5hash(key)
flags = 0
if serialize:
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
msg += ('set %s %d %d %s noreply\r\n%s\r\n' %
(key, flags, timeout, len(value), value))
for (server, fp, sock) in self._get_conns(server_key):
try:
sock.sendall(msg)
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def get_multi(self, keys, server_key):
"""
Gets multiple values from memcache for the given keys.
:param keys: keys for values to be retrieved from memcache
:param servery_key: key to use in determining which server in the ring
is used
:returns: list of values
"""
server_key = md5hash(server_key)
keys = [md5hash(key) for key in keys]
for (server, fp, sock) in self._get_conns(server_key):
try:
sock.sendall('get %s\r\n' % ' '.join(keys))
line = fp.readline().strip().split()
responses = {}
while line[0].upper() != 'END':
if line[0].upper() == 'VALUE':
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
value = pickle.loads(value)
responses[line[1]] = value
fp.readline()
line = fp.readline().strip().split()
values = []
for key in keys:
if key in responses:
values.append(responses[key])
else:
values.append(None)
self._return_conn(server, fp, sock)
return values
except Exception, e:
self._exception_occurred(server, e)