merge to trunk

David Goetz
2011-03-03 13:48:38 -08:00
19 changed files with 799 additions and 678 deletions

View File

@@ -215,7 +215,7 @@ Configuring each node
Sample configuration files are provided with all defaults in line-by-line comments.
#. If your going to use the DevAuth (the default swift-auth-server), create
#. If you're going to use the DevAuth (the default swift-auth-server), create
`/etc/swift/auth-server.conf` (you can skip this if you're going to use
Swauth)::

View File

@@ -86,6 +86,8 @@ class AccountController(object):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
if container: # put account container
if 'x-cf-trans-id' in req.headers:
broker.pending_timeout = 3
if req.headers.get('x-account-override-deleted', 'no').lower() != \
'yes' and broker.is_deleted():
return HTTPNotFound(request=req)
@@ -138,6 +140,9 @@ class AccountController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
if not container:
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
@@ -166,6 +171,8 @@ class AccountController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_account_broker(drive, part, account)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
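A note on the read-path tuning above: dropping pending_timeout to 0.1s and setting stale_reads_ok means the broker gives up almost immediately on a contended .pending lock and serves possibly-stale counts rather than blocking the HEAD/GET. A minimal, self-contained sketch of that pattern (the broker class and its internals are hypothetical stand-ins)::

    class LockTimeout(Exception):
        """Stand-in for swift.common.exceptions.LockTimeout."""

    class SketchBroker(object):
        """Illustrative only: shows how the read handlers tune the broker."""
        def __init__(self):
            self.pending_timeout = 10     # how long to wait on the .pending lock
            self.stale_reads_ok = False   # may readers skip uncommitted pendings?
            self._info = {'object_count': 0}

        def _commit_puts(self):
            # pretend the .pending lock is always contended
            raise LockTimeout('timed out after %ss' % self.pending_timeout)

        def get_info(self):
            try:
                self._commit_puts()
            except LockTimeout:
                if not self.stale_reads_ok:
                    raise             # strict callers see the timeout
            return self._info         # tolerant callers get cached counts

    broker = SketchBroker()
    broker.pending_timeout = 0.1      # what the GET/HEAD handlers above set
    broker.stale_reads_ok = True
    info = broker.get_info()          # returns possibly-stale info, never blocks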

View File

@@ -27,14 +27,13 @@ import cPickle as pickle
import errno
from random import randint
from tempfile import mkstemp
import traceback
from eventlet import sleep
import simplejson as json
import sqlite3
from swift.common.utils import normalize_timestamp, renamer, \
mkdirs, lock_parent_directory
mkdirs, lock_parent_directory, fallocate
from swift.common.exceptions import LockTimeout
@@ -42,9 +41,8 @@ from swift.common.exceptions import LockTimeout
BROKER_TIMEOUT = 25
#: Pickle protocol to use
PICKLE_PROTOCOL = 2
CONNECT_ATTEMPTS = 4
PENDING_COMMIT_TIMEOUT = 900
AUTOCHECKPOINT = 8192
#: Max number of pending entries
PENDING_CAP = 131072
class DatabaseConnectionError(sqlite3.DatabaseError):
@@ -125,48 +123,48 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
:param okay_to_create: if True, create the DB if it doesn't exist
:returns: DB connection object
"""
# retry logic to address:
# http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html
for attempt in xrange(CONNECT_ATTEMPTS):
try:
connect_time = time.time()
conn = sqlite3.connect(path, check_same_thread=False,
factory=GreenDBConnection, timeout=timeout)
try:
connect_time = time.time()
conn = sqlite3.connect(path, check_same_thread=False,
factory=GreenDBConnection, timeout=timeout)
if path != ':memory:' and not okay_to_create:
# attempt to detect and fail when connect creates the db file
if path != ':memory:' and not okay_to_create:
stat = os.stat(path)
if stat.st_size == 0 and stat.st_ctime >= connect_time:
os.unlink(path)
raise DatabaseConnectionError(path,
'DB file created by connect?')
conn.execute('PRAGMA journal_mode = WAL')
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT)
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
conn.create_function('chexor', 3, chexor)
conn.row_factory = sqlite3.Row
conn.text_factory = str
return conn
except sqlite3.DatabaseError, e:
errstr = traceback.format_exc()
raise DatabaseConnectionError(path, errstr, timeout=timeout)
stat = os.stat(path)
if stat.st_size == 0 and stat.st_ctime >= connect_time:
os.unlink(path)
raise DatabaseConnectionError(path,
'DB file created by connect?')
conn.row_factory = sqlite3.Row
conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
conn.execute('PRAGMA journal_mode = DELETE')
conn.create_function('chexor', 3, chexor)
except sqlite3.DatabaseError:
import traceback
raise DatabaseConnectionError(path, traceback.format_exc(),
timeout=timeout)
return conn
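A note on the retry loop above: the linked sqlite-users thread describes a race where sqlite3.connect() can itself create a zero-length DB file, so the new code detects that, unlinks the file, and retries up to CONNECT_ATTEMPTS times. A standalone sketch of the same shape, minus the Swift-specific pragmas and exception class (names mirror the hunk, but this is not the production code)::

    import os
    import sqlite3
    import time

    CONNECT_ATTEMPTS = 4   # mirrors the constant introduced above

    def connect_with_retry(path, timeout=30, okay_to_create=False):
        """Retry sqlite3.connect() to ride out the zero-byte-file race."""
        last_error = None
        for attempt in xrange(CONNECT_ATTEMPTS):
            try:
                connect_time = time.time()
                conn = sqlite3.connect(path, check_same_thread=False,
                                       timeout=timeout)
                if path != ':memory:' and not okay_to_create:
                    stat = os.stat(path)
                    if stat.st_size == 0 and stat.st_ctime >= connect_time:
                        # connect() created the file itself: drop it and retry
                        os.unlink(path)
                        raise sqlite3.DatabaseError('DB file created by connect?')
                return conn
            except sqlite3.DatabaseError as err:
                last_error = err
        raise last_error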
class DatabaseBroker(object):
"""Encapsulates working with a database."""
def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
account=None, container=None):
account=None, container=None, pending_timeout=10,
stale_reads_ok=False):
""" Encapsulates working with a database. """
self.conn = None
self.db_file = db_file
self.pending_file = self.db_file + '.pending'
self.pending_timeout = pending_timeout
self.stale_reads_ok = stale_reads_ok
self.db_dir = os.path.dirname(db_file)
self.timeout = timeout
self.logger = logger or logging.getLogger()
self.account = account
self.container = container
self._db_version = -1
def initialize(self, put_timestamp=None):
"""
@@ -235,7 +233,7 @@ class DatabaseBroker(object):
conn.close()
with open(tmp_db_file, 'r+b') as fp:
os.fsync(fp.fileno())
with lock_parent_directory(self.db_file, self.timeout):
with lock_parent_directory(self.db_file, self.pending_timeout):
if os.path.exists(self.db_file):
# It's as if there was a "condition" where different parts
# of the system were "racing" each other.
@@ -287,7 +285,6 @@ class DatabaseBroker(object):
self.conn = None
orig_isolation_level = conn.isolation_level
conn.isolation_level = None
conn.execute('PRAGMA journal_mode = DELETE') # remove journal files
conn.execute('BEGIN IMMEDIATE')
try:
yield True
@@ -295,7 +292,6 @@ class DatabaseBroker(object):
pass
try:
conn.execute('ROLLBACK')
conn.execute('PRAGMA journal_mode = WAL') # back to WAL mode
conn.isolation_level = orig_isolation_level
self.conn = conn
except Exception:
@@ -352,6 +348,11 @@ class DatabaseBroker(object):
:param count: number to get
:returns: list of objects between start and end
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
curs = conn.execute('''
SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
@@ -400,7 +401,11 @@ class DatabaseBroker(object):
:returns: dict containing keys: hash, id, created_at, put_timestamp,
delete_timestamp, count, max_row, and metadata
"""
self._commit_puts()
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
query_part1 = '''
SELECT hash, id, created_at, put_timestamp, delete_timestamp,
%s_count AS count,
@@ -450,6 +455,34 @@ class DatabaseBroker(object):
(rec['sync_point'], rec['remote_id']))
conn.commit()
def _preallocate(self):
"""
The idea is to allocate space in front of an expanding db. If it gets
within 512k of a boundary, it allocates to the next boundary.
Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
"""
if self.db_file == ':memory:':
return
MB = (1024 * 1024)
def prealloc_points():
for pm in (1, 2, 5, 10, 25, 50):
yield pm * MB
while True:
pm += 50
yield pm * MB
stat = os.stat(self.db_file)
file_size = stat.st_size
allocated_size = stat.st_blocks * 512
for point in prealloc_points():
if file_size <= point - MB / 2:
prealloc_size = point
break
if allocated_size < prealloc_size:
with open(self.db_file, 'rb+') as fp:
fallocate(fp.fileno(), int(prealloc_size))
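A worked example of the boundaries described in the docstring: a 4.4 MB database is still more than 512 KB below the 5 MB point, so it preallocates to 5 MB, while a 4.6 MB database has entered the 512 KB guard band and jumps to the next point, 10 MB. The boundary math in isolation (no fallocate call, just the point selection)::

    MB = 1024 * 1024

    def prealloc_points():
        """Same progression as above: 1, 2, 5, 10, 25, 50 MB, then +50 MB."""
        for pm in (1, 2, 5, 10, 25, 50):
            yield pm * MB
        while True:
            pm += 50
            yield pm * MB

    def prealloc_size_for(file_size):
        """First point whose 512 KB guard band the file has not yet entered."""
        for point in prealloc_points():
            if file_size <= point - MB / 2:
                return point

    assert prealloc_size_for(int(4.4 * MB)) == 5 * MB
    assert prealloc_size_for(int(4.6 * MB)) == 10 * MB
    assert prealloc_size_for(60 * MB) == 100 * MB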
@property
def metadata(self):
"""
@@ -574,7 +607,7 @@ class ContainerBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
name TEXT UNIQUE,
created_at TEXT,
size INTEGER,
content_type TEXT,
@@ -582,7 +615,7 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE INDEX ix_object_deleted ON object (deleted);
CREATE TRIGGER object_insert AFTER INSERT ON object
BEGIN
@@ -645,15 +678,6 @@ class ContainerBroker(DatabaseBroker):
''', (self.account, self.container, normalize_timestamp(time.time()),
str(uuid4()), put_timestamp))
def _get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
@@ -693,6 +717,11 @@ class ContainerBroker(DatabaseBroker):
:returns: True if the database has no active objects, False otherwise
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
@@ -700,16 +729,17 @@ class ContainerBroker(DatabaseBroker):
def _commit_puts(self, item_list=None):
"""Handles commiting rows in .pending files."""
pending_file = self.db_file + '.pending'
if self.db_file == ':memory:' or not os.path.exists(pending_file):
return
if not os.path.getsize(pending_file):
os.unlink(pending_file)
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
return
if item_list is None:
item_list = []
with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
with open(pending_file, 'r+b') as fp:
with lock_parent_directory(self.pending_file, self.pending_timeout):
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
@@ -722,11 +752,11 @@ class ContainerBroker(DatabaseBroker):
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': pending_file, 'entry': entry})
{'file': self.pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.unlink(pending_file)
os.ftruncate(fp.fileno(), 0)
except OSError, err:
if err.errno != errno.ENOENT:
raise
@@ -744,6 +774,7 @@ class ContainerBroker(DatabaseBroker):
delete
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
self._commit_puts()
with self.get() as conn:
conn.execute("""
DELETE FROM object
@@ -787,9 +818,30 @@ class ContainerBroker(DatabaseBroker):
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted}
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
self.merge_items([record])
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError, err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(
self.pending_file, self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
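A note on the .pending format used above: each entry is a protocol-2 pickle of the record tuple run through the base64 codec, and since base64 output never contains ':', a colon can safely separate entries in the file. A round-trip sketch of one entry (the field values are made up)::

    import cPickle as pickle

    PICKLE_PROTOCOL = 2   # same constant as above

    record = ('obj1', '1299188918.12345', 1024,
              'application/octet-stream', 'd41d8cd98f00b204e9800998ecf8427e', 0)

    # what put_object() appends to the .pending file (after a ':' separator)
    entry = pickle.dumps(record, protocol=PICKLE_PROTOCOL).encode('base64')
    assert ':' not in entry

    # what _commit_puts() does with each colon-separated entry it reads back
    name, timestamp, size, content_type, etag, deleted = \
        pickle.loads(entry.decode('base64'))
    assert name == 'obj1' and size == 1024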
def is_deleted(self, timestamp=None):
"""
@@ -799,6 +851,11 @@ class ContainerBroker(DatabaseBroker):
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
@@ -821,6 +878,11 @@ class ContainerBroker(DatabaseBroker):
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
return conn.execute('''
SELECT account, container, created_at, put_timestamp,
@@ -857,6 +919,11 @@ class ContainerBroker(DatabaseBroker):
:returns: list of object names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
@@ -893,6 +960,11 @@ class ContainerBroker(DatabaseBroker):
:returns: list of tuples of (name, created_at, size, content_type,
etag)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
if path is not None:
prefix = path
if path:
@@ -916,10 +988,7 @@ class ContainerBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self._get_db_version(conn) < 1:
query += ' +deleted = 0 ORDER BY name LIMIT ?'
else:
query += ' deleted = 0 ORDER BY name LIMIT ?'
query += ' +deleted = 0 ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@@ -967,19 +1036,18 @@ class ContainerBroker(DatabaseBroker):
max_rowid = -1
for rec in item_list:
conn.execute('''
DELETE FROM object WHERE name = ? AND created_at < ? AND
deleted IN (0, 1)
DELETE FROM object WHERE name = ? AND
(created_at < ?)
''', (rec['name'], rec['created_at']))
if not conn.execute('''
SELECT name FROM object WHERE name = ? AND
deleted IN (0, 1)
''', (rec['name'],)).fetchall():
try:
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted']]))
except sqlite3.IntegrityError:
pass
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
@@ -1023,7 +1091,7 @@ class AccountBroker(DatabaseBroker):
conn.executescript("""
CREATE TABLE container (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
name TEXT UNIQUE,
put_timestamp TEXT,
delete_timestamp TEXT,
object_count INTEGER,
@@ -1031,9 +1099,8 @@ class AccountBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0
);
CREATE INDEX ix_container_deleted_name ON
container (deleted, name);
CREATE INDEX ix_container_deleted ON container (deleted);
CREATE INDEX ix_container_name ON container (name);
CREATE TRIGGER container_insert AFTER INSERT ON container
BEGIN
UPDATE account_stat
@@ -1097,15 +1164,6 @@ class AccountBroker(DatabaseBroker):
''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp))
def _get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_container_deleted_name' '''):
self._db_version = 1
return self._db_version
def update_put_timestamp(self, timestamp):
"""
Update the put_timestamp. Only modifies it if it is greater than
@@ -1135,16 +1193,17 @@ class AccountBroker(DatabaseBroker):
def _commit_puts(self, item_list=None):
"""Handles commiting rows in .pending files."""
pending_file = self.db_file + '.pending'
if self.db_file == ':memory:' or not os.path.exists(pending_file):
return
if not os.path.getsize(pending_file):
os.unlink(pending_file)
if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
return
if item_list is None:
item_list = []
with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
with open(pending_file, 'r+b') as fp:
with lock_parent_directory(self.pending_file, self.pending_timeout):
self._preallocate()
if not os.path.getsize(self.pending_file):
if item_list:
self.merge_items(item_list)
return
with open(self.pending_file, 'r+b') as fp:
for entry in fp.read().split(':'):
if entry:
try:
@@ -1160,11 +1219,11 @@ class AccountBroker(DatabaseBroker):
except Exception:
self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'),
{'file': pending_file, 'entry': entry})
{'file': self.pending_file, 'entry': entry})
if item_list:
self.merge_items(item_list)
try:
os.unlink(pending_file)
os.ftruncate(fp.fileno(), 0)
except OSError, err:
if err.errno != errno.ENOENT:
raise
@@ -1175,6 +1234,11 @@ class AccountBroker(DatabaseBroker):
:returns: True if the database has no active containers.
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
@@ -1194,6 +1258,7 @@ class AccountBroker(DatabaseBroker):
:param sync_timestamp: max update_at timestamp of sync rows to delete
"""
self._commit_puts()
with self.get() as conn:
conn.execute('''
DELETE FROM container WHERE
@@ -1221,6 +1286,11 @@ class AccountBroker(DatabaseBroker):
:returns: put_timestamp of the container
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
ret = conn.execute('''
SELECT put_timestamp FROM container
@@ -1241,8 +1311,6 @@ class AccountBroker(DatabaseBroker):
:param object_count: number of objects in the container
:param bytes_used: number of bytes used by the container
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
if delete_timestamp > put_timestamp and \
object_count in (None, '', 0, '0'):
deleted = 1
@@ -1253,7 +1321,24 @@ class AccountBroker(DatabaseBroker):
'object_count': object_count,
'bytes_used': bytes_used,
'deleted': deleted}
self.merge_items([record])
if self.db_file == ':memory:':
self.merge_items([record])
return
commit = False
with lock_parent_directory(self.pending_file, self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding; so they are our
# delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, put_timestamp, delete_timestamp, object_count,
bytes_used, deleted),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
if fp.tell() > PENDING_CAP:
commit = True
if commit:
self._commit_puts()
def can_delete_db(self, cutoff):
"""
@@ -1261,6 +1346,7 @@ class AccountBroker(DatabaseBroker):
:returns: True if the account can be deleted, False otherwise
"""
self._commit_puts()
with self.get() as conn:
row = conn.execute('''
SELECT status, put_timestamp, delete_timestamp, container_count
@@ -1286,6 +1372,11 @@ class AccountBroker(DatabaseBroker):
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return True
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
row = conn.execute('''
SELECT put_timestamp, delete_timestamp, container_count, status
@@ -1310,6 +1401,11 @@ class AccountBroker(DatabaseBroker):
delete_timestamp, container_count, object_count,
bytes_used, hash, id)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
with self.get() as conn:
return conn.execute('''
SELECT account, created_at, put_timestamp, delete_timestamp,
@@ -1326,6 +1422,11 @@ class AccountBroker(DatabaseBroker):
:returns: list of container names
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
rv = []
with self.get() as conn:
row = conn.execute('''
@@ -1359,6 +1460,11 @@ class AccountBroker(DatabaseBroker):
:returns: list of tuples of (name, object_count, bytes_used, 0)
"""
try:
self._commit_puts()
except LockTimeout:
if not self.stale_reads_ok:
raise
if delimiter and not prefix:
prefix = ''
orig_marker = marker
@@ -1379,10 +1485,7 @@ class AccountBroker(DatabaseBroker):
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self._get_db_version(conn) < 1:
query += ' +deleted = 0 ORDER BY name LIMIT ?'
else:
query += ' deleted = 0 ORDER BY name LIMIT ?'
query += ' +deleted = 0 ORDER BY name LIMIT ?'
query_args.append(limit - len(results))
curs = conn.execute(query, query_args)
curs.row_factory = None
@@ -1426,39 +1529,51 @@ class AccountBroker(DatabaseBroker):
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted']]
curs = conn.execute('''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
conn.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
curs = conn.execute('''
SELECT name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted
FROM container WHERE name = ? AND
(put_timestamp < ? OR delete_timestamp < ? OR
object_count != ? OR bytes_used != ?)''',
(rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used']))
curs.row_factory = None
row = curs.fetchone()
if row:
row = list(row)
for i in xrange(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp
record[1] = row[1]
if row[2] > record[2]: # Keep newest delete_timestamp
record[2] = row[2]
conn.execute('DELETE FROM container WHERE name = ?',
(record[0],))
# If deleted, mark as such
if record[2] > record[1] and \
record[3] in (None, '', 0, '0'):
record[5] = 1
else:
record[5] = 0
try:
conn.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted)
VALUES (?, ?, ?, ?, ?, ?)
''', record)
except sqlite3.IntegrityError:
continue
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:

View File

@@ -180,9 +180,7 @@ class Replicator(Daemon):
return False
# perform block-level sync if the db was modified during the first sync
if os.path.exists(broker.db_file + '-journal') or \
os.path.exists(broker.db_file + '-wal') or \
os.path.exists(broker.db_file + '-shm') or \
os.path.getmtime(broker.db_file) > mtime:
os.path.getmtime(broker.db_file) > mtime:
# grab a lock so nobody else can modify it
with broker.lock():
if not self._rsync_file(broker.db_file, remote_file, False):
@@ -318,7 +316,7 @@ class Replicator(Daemon):
self.logger.debug(_('Replicating db %s'), object_file)
self.stats['attempted'] += 1
try:
broker = self.brokerclass(object_file)
broker = self.brokerclass(object_file, pending_timeout=30)
broker.reclaim(time.time() - self.reclaim_age,
time.time() - (self.reclaim_age * 2))
info = broker.get_replication_info()

View File

@@ -38,13 +38,17 @@ TRY_COUNT = 3
# will be considered failed for ERROR_LIMIT_DURATION seconds.
ERROR_LIMIT_COUNT = 10
ERROR_LIMIT_TIME = 60
ERROR_LIMIT_DURATION = 300
ERROR_LIMIT_DURATION = 60
def md5hash(key):
return md5(key).hexdigest()
class MemcacheConnectionError(Exception):
pass
class MemcacheRing(object):
"""
Simple, consistent-hashed memcache client.
@@ -180,6 +184,7 @@ class MemcacheRing(object):
:param delta: amount to add to the value of key (or set as the value
if the key is not found) will be cast to an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
key = md5hash(key)
command = 'incr'
@@ -209,6 +214,7 @@ class MemcacheRing(object):
return ret
except Exception, e:
self._exception_occurred(server, e)
raise MemcacheConnectionError("No Memcached connections succeeded.")
def decr(self, key, delta=1, timeout=0):
"""
@@ -220,6 +226,7 @@ class MemcacheRing(object):
value to 0 if the key is not found) will be cast to
an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
self.incr(key, delta=-delta, timeout=timeout)
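With these changes incr()/decr() raise MemcacheConnectionError instead of quietly returning None when every memcache server has failed, so callers that can live without the cache must catch it; the ratelimit middleware below does exactly that. A minimal sketch of such a tolerant caller (assumes swift.common.memcached is importable; the degraded return value is a choice, not part of the API)::

    from swift.common.memcached import MemcacheConnectionError

    def bump_counter(memcache_client, key):
        """Best-effort counter bump: a dead memcache just means 'unknown'."""
        try:
            return memcache_client.incr(key, delta=1)
        except MemcacheConnectionError:
            # every memcache server failed; degrade instead of erroring out
            return None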

View File

@@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound
from swift.common.utils import split_path, cache_from_env, get_logger
from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
class MaxSleepTimeHitError(Exception):
@@ -136,28 +137,31 @@ class RateLimitMiddleware(object):
:param max_rate: maximum rate allowed in requests per second
:raises: MaxSleepTimeHitError if max sleep time is exceeded.
'''
now_m = int(round(time.time() * self.clock_accuracy))
time_per_request_m = int(round(self.clock_accuracy / max_rate))
running_time_m = self.memcache_client.incr(key,
delta=time_per_request_m)
need_to_sleep_m = 0
if (now_m - running_time_m >
self.rate_buffer_seconds * self.clock_accuracy):
next_avail_time = int(now_m + time_per_request_m)
self.memcache_client.set(key, str(next_avail_time),
serialize=False)
else:
need_to_sleep_m = \
max(running_time_m - now_m - time_per_request_m, 0)
try:
now_m = int(round(time.time() * self.clock_accuracy))
time_per_request_m = int(round(self.clock_accuracy / max_rate))
running_time_m = self.memcache_client.incr(key,
delta=time_per_request_m)
need_to_sleep_m = 0
if (now_m - running_time_m >
self.rate_buffer_seconds * self.clock_accuracy):
next_avail_time = int(now_m + time_per_request_m)
self.memcache_client.set(key, str(next_avail_time),
serialize=False)
else:
need_to_sleep_m = \
max(running_time_m - now_m - time_per_request_m, 0)
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
# treat as no-op decrement time
self.memcache_client.decr(key, delta=time_per_request_m)
raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
need_to_sleep_m)
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
# treat as no-op decrement time
self.memcache_client.decr(key, delta=time_per_request_m)
raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
need_to_sleep_m)
return float(need_to_sleep_m) / self.clock_accuracy
return float(need_to_sleep_m) / self.clock_accuracy
except MemcacheConnectionError:
return 0
def handle_ratelimit(self, req, account_name, container_name, obj_name):
'''

View File

@@ -16,6 +16,9 @@
"""
The swift3 middleware will emulate the S3 REST api on top of swift.
The boto python library is necessary to use this middleware (install
the python-boto package if you use Ubuntu).
The following operations are currently supported:
* GET Service
@@ -55,6 +58,7 @@ import rfc822
import hmac
import base64
import errno
import boto.utils
from xml.sax.saxutils import escape as xml_escape
import cgi
@@ -378,31 +382,18 @@ class Swift3Middleware(object):
return ServiceController, d
def get_account_info(self, env, req):
if req.headers.get("content-md5"):
md5 = req.headers.get("content-md5")
else:
md5 = ""
if req.headers.get("content-type"):
content_type = req.headers.get("content-type")
else:
content_type = ""
if req.headers.get("date"):
date = req.headers.get("date")
else:
date = ""
h = req.method + "\n" + md5 + "\n" + content_type + "\n" + date + "\n"
for header in req.headers:
if header.startswith("X-Amz-"):
h += header.lower() + ":" + str(req.headers[header]) + "\n"
h += req.path
try:
account, user, _junk = \
req.headers['Authorization'].split(' ')[-1].split(':')
except Exception:
return None, None
headers = {}
for key in req.headers:
if type(req.headers[key]) == str:
headers[key] = req.headers[key]
h = boto.utils.canonical_string(req.method, req.path_qs, headers)
token = base64.urlsafe_b64encode(h)
return '%s:%s' % (account, user), token
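The token returned above is just the urlsafe-base64 of boto's canonical string, i.e. the S3 string-to-sign; whichever auth backend holds the user's secret can apply the standard S3 v2 signature math to it. A sketch of that math only (this is the generic S3 signing rule, not a description of how DevAuth or Swauth specifically validate)::

    import base64
    import hmac
    from hashlib import sha1

    def s3_signature(secret_key, string_to_sign):
        """Standard S3 (v2) signing: base64(HMAC-SHA1(secret, string))."""
        return base64.encodestring(
            hmac.new(secret_key, string_to_sign, sha1).digest()).strip()

    canonical = 'GET\n\n\nThu, 03 Mar 2011 21:48:38 GMT\n/bucket/key'
    token = base64.urlsafe_b64encode(canonical)     # what the middleware ships
    assert s3_signature('secret', base64.urlsafe_b64decode(token)) == \
        s3_signature('secret', canonical)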

View File

@@ -149,4 +149,12 @@ class Ring(object):
zones.remove(self.devs[part2dev_id[part]]['zone'])
while zones:
zone = zones.pop(part % len(zones))
yield self.zone2devs[zone][part % len(self.zone2devs[zone])]
weighted_node = None
for i in xrange(len(self.zone2devs[zone])):
node = self.zone2devs[zone][(part + i) %
len(self.zone2devs[zone])]
if node.get('weight'):
weighted_node = node
break
if weighted_node:
yield weighted_node

View File

@@ -925,6 +925,17 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
return running_time + time_per_request
class ContextPool(GreenPool):
"GreenPool subclassed to kill its coros when it gets gc'ed"
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
for coro in list(self.coroutines_running):
coro.kill()
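ContextPool is a GreenPool that kills any coroutines still running when the with-block exits, so helpers spawned inside the block can never leak past it (the proxy PUT path below relies on this to clean up its _send_file coroutines). A short usage sketch (assumes eventlet and that ContextPool is importable from swift.common.utils; pump() is hypothetical)::

    from eventlet import Queue, sleep
    from swift.common.utils import ContextPool

    def pump(queue):
        # hypothetical helper: runs until killed when the with-block exits
        while True:
            queue.get()

    queue = Queue(10)
    with ContextPool(5) as pool:
        pool.spawn(pump, queue)
        queue.put('chunk')
        sleep(0)                  # yield so pump() gets scheduled once
    # leaving the block killed pump(); no stray coroutine is left running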
class ModifiedParseResult(ParseResult):
"Parse results class for urlparse."

View File

@@ -160,7 +160,7 @@ class ContainerController(object):
return resp
if existed:
return HTTPNoContent(request=req)
return HTTPAccepted(request=req)
return HTTPNotFound()
def PUT(self, req):
"""Handle HTTP PUT request."""
@@ -219,6 +219,8 @@ class ContainerController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()
@@ -244,6 +246,8 @@ class ContainerController(object):
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
info = broker.get_info()

View File

@@ -31,7 +31,7 @@ import functools
from hashlib import md5
from random import shuffle
from eventlet import sleep, TimeoutError
from eventlet import sleep, GreenPile, Queue, TimeoutError
from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \
@@ -42,7 +42,7 @@ from webob import Request, Response
from swift.common.ring import Ring
from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env
cache_from_env, ContextPool
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
@@ -266,6 +266,7 @@ class SegmentedIterable(object):
class Controller(object):
"""Base WSGI controller class for the proxy"""
server_type = _('Base')
def __init__(self, app):
self.account_name = None
@@ -359,8 +360,6 @@ class Controller(object):
path = '/%s' % account
headers = {'x-cf-trans-id': self.trans_id}
for node in self.iter_nodes(partition, nodes, self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
@@ -433,8 +432,6 @@ class Controller(object):
attempts_left = self.app.container_ring.replica_count
headers = {'x-cf-trans-id': self.trans_id}
for node in self.iter_nodes(partition, nodes, self.app.container_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
@@ -490,36 +487,54 @@ class Controller(object):
:param ring: ring to get handoff nodes from
"""
for node in nodes:
yield node
if not self.error_limited(node):
yield node
for node in ring.get_more_nodes(partition):
yield node
if not self.error_limited(node):
yield node
def get_update_nodes(self, partition, nodes, ring):
""" Returns ring.replica_count nodes; the nodes will not be error
limited, if possible. """
def _make_request(self, nodes, part, method, path, headers, query):
for node in nodes:
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, method, path,
headers=headers, query_string=query)
conn.node = node
with Timeout(self.app.node_timeout):
resp = conn.getresponse()
if 200 <= resp.status < 500:
return resp.status, resp.reason, resp.read()
elif resp.status == 507:
self.error_limit(node)
except Exception:
self.error_limit(node)
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': method, 'path': path})
def make_requests(self, req, ring, part, method, path, headers,
query_string=''):
"""
Attempt to get a non error limited list of nodes.
Sends an HTTP request to multiple nodes and aggregates the results.
It attempts the primary nodes concurrently, then iterates over the
handoff nodes as needed.
:param partition: partition for the nodes
:param nodes: list of node dicts for the partition
:param ring: ring to get handoff nodes from
:returns: list of node dicts that are not error limited (if possible)
:param headers: a list of dicts, where each dict represents one
backend request that should be made.
:returns: a webob Response object
"""
# make a copy so we don't modify caller's list
nodes = list(nodes)
update_nodes = []
for node in self.iter_nodes(partition, nodes, ring):
if self.error_limited(node):
continue
update_nodes.append(node)
if len(update_nodes) >= ring.replica_count:
break
while len(update_nodes) < ring.replica_count:
node = nodes.pop()
if node not in update_nodes:
update_nodes.append(node)
return update_nodes
nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring)
pile = GreenPile(ring.replica_count)
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
head, query_string)
response = [resp for resp in pile if resp]
while len(response) < ring.replica_count:
response.append((503, '', ''))
statuses, reasons, bodies = zip(*response)
return self.best_response(req, statuses, reasons, bodies,
'%s %s' % (self.server_type, req.method))
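make_requests leans on eventlet's GreenPile: one spawn per backend request, and iterating the pile yields each call's return value (including None for failures) in spawn order. A tiny standalone illustration of that idiom and of the 503 padding (assumes eventlet; fake_request is a stand-in for _make_request)::

    from eventlet import GreenPile, sleep

    def fake_request(status):
        sleep(0)                     # pretend to talk to a backend
        if status == 507:
            return None              # _make_request() returns nothing on failure
        return (status, 'OK', '')

    pile = GreenPile(3)
    for status in (200, 507, 201):
        pile.spawn(fake_request, status)

    responses = [resp for resp in pile if resp]   # drop the failed node
    while len(responses) < 3:
        responses.append((503, '', ''))           # pad, as make_requests() does
    statuses, reasons, bodies = zip(*responses)
    assert statuses == (200, 201, 503)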
def best_response(self, req, statuses, reasons, bodies, server_type,
etag=None):
@@ -659,6 +674,7 @@ class Controller(object):
class ObjectController(Controller):
"""WSGI controller for object requests."""
server_type = _('Object')
def __init__(self, app, account_name, container_name, object_name,
**kwargs):
@@ -667,37 +683,6 @@ class ObjectController(Controller):
self.container_name = unquote(container_name)
self.object_name = unquote(object_name)
def node_post_or_delete(self, req, partition, node, path):
"""
Handle common POST/DELETE functionality
:param req: webob.Request object
:param partition: partition for the object
:param node: node dictionary for the object
:param path: path to send for the request
"""
if self.error_limited(node):
return 500, '', ''
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'],
partition, req.method, path, req.headers)
with Timeout(self.app.node_timeout):
response = conn.getresponse()
body = response.read()
if response.status == 507:
self.error_limit(node)
elif response.status >= 500:
self.error_occurred(node,
_('ERROR %(status)d %(body)s From Object Server') %
{'status': response.status, 'body': body[:1024]})
return response.status, response.reason, body
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Trying to %(method)s %(path)s') %
{'method': req.method, 'path': req.path})
return 500, '', ''
def GETorHEAD(self, req):
"""Handle HTTP GET or HEAD requests."""
if 'swift.authorize' in req.environ:
@@ -874,35 +859,50 @@ class ObjectController(Controller):
return aresp
if not containers:
return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(partition, nodes, self.app.object_ring):
container = containers.pop()
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
req.headers['X-Container-Partition'] = container_partition
req.headers['X-Container-Device'] = container['device']
status, reason, body = \
self.node_post_or_delete(req, partition, node, req.path_info)
if 200 <= status < 300 or 400 <= status < 500:
statuses.append(status)
reasons.append(reason)
bodies.append(body)
else:
containers.insert(0, container)
if not containers:
break
while len(statuses) < len(nodes):
statuses.append(503)
reasons.append('')
bodies.append('')
return self.best_response(req, statuses, reasons,
bodies, _('Object POST'))
headers = []
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
headers.append(nheaders)
return self.make_requests(req, self.app.object_ring,
partition, 'POST', req.path_info, headers)
def _send_file(self, conn, path):
"""Method for a file PUT coro"""
while True:
chunk = conn.queue.get()
if not conn.failed:
try:
with ChunkWriteTimeout(self.app.node_timeout):
conn.send(chunk)
except (Exception, ChunkWriteTimeout):
conn.failed = True
self.exception_occurred(conn.node, _('Object'),
_('Trying to write to %s') % path)
conn.queue.task_done()
def _connect_put_node(self, nodes, part, path, headers):
"""Method for a file PUT connect"""
for node in nodes:
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'PUT', path, headers)
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
if resp.status == 100:
conn.node = node
return conn
elif resp.status == 507:
self.error_limit(node)
except:
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
@public
@delay_denial
@@ -916,8 +916,6 @@ class ObjectController(Controller):
return aresp
if not containers:
return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
@@ -925,15 +923,12 @@ class ObjectController(Controller):
content_type_manually_set = True
if not req.headers.get('content-type'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
if not guessed_type:
req.headers['Content-Type'] = 'application/octet-stream'
else:
req.headers['Content-Type'] = guessed_type
req.headers['Content-Type'] = guessed_type or \
'application/octet-stream'
content_type_manually_set = False
error_response = check_object_creation(req, self.object_name)
if error_response:
return error_response
conns = []
data_source = \
iter(lambda: req.body_file.read(self.app.client_chunk_size), '')
source_header = req.headers.get('X-Copy-From')
@@ -984,75 +979,57 @@ class ObjectController(Controller):
if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v
req = new_req
for node in self.iter_nodes(partition, nodes, self.app.object_ring):
container = containers.pop()
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
req.headers['X-Container-Partition'] = container_partition
req.headers['X-Container-Device'] = container['device']
req.headers['Expect'] = '100-continue'
resp = conn = None
if not self.error_limited(node):
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], partition, 'PUT',
req.path_info, req.headers)
conn.node = node
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % req.path)
if conn and resp:
if resp.status == 100:
conns.append(conn)
if not containers:
break
continue
elif resp.status == 507:
self.error_limit(node)
containers.insert(0, container)
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
pile = GreenPile(len(nodes))
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
nheaders['Expect'] = '100-continue'
pile.spawn(self._connect_put_node, node_iter, partition,
req.path_info, nheaders)
conns = [conn for conn in pile if conn]
if len(conns) <= len(nodes) / 2:
self.app.logger.error(
_('Object PUT returning 503, %(conns)s/%(nodes)s '
'required connections'),
{'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
return HTTPServiceUnavailable(request=req)
chunked = req.headers.get('transfer-encoding')
try:
req.bytes_transferred = 0
while True:
with ChunkReadTimeout(self.app.client_timeout):
try:
chunk = data_source.next()
except StopIteration:
if req.headers.get('transfer-encoding'):
chunk = ''
else:
with ContextPool(len(nodes)) as pool:
for conn in conns:
conn.failed = False
conn.queue = Queue(self.app.put_queue_depth)
pool.spawn(self._send_file, conn, req.path)
req.bytes_transferred = 0
while True:
with ChunkReadTimeout(self.app.client_timeout):
try:
chunk = next(data_source)
except StopIteration:
if chunked:
[conn.queue.put('0\r\n\r\n') for conn in conns]
break
len_chunk = len(chunk)
req.bytes_transferred += len_chunk
if req.bytes_transferred > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
try:
with ChunkWriteTimeout(self.app.node_timeout):
if req.headers.get('transfer-encoding'):
conn.send('%x\r\n%s\r\n' % (len_chunk, chunk))
else:
conn.send(chunk)
except (Exception, TimeoutError):
self.exception_occurred(conn.node, _('Object'),
_('Trying to write to %s') % req.path)
conns.remove(conn)
if len(conns) <= len(nodes) / 2:
self.app.logger.error(
_('Object PUT exceptions during send, '
'%(conns)s/%(nodes)s required connections'),
{'conns': len(conns),
'nodes': len(nodes) // 2 + 1})
return HTTPServiceUnavailable(request=req)
if req.headers.get('transfer-encoding') and chunk == '':
break
req.bytes_transferred += len(chunk)
if req.bytes_transferred > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
if not conn.failed:
conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
if chunked else chunk)
else:
conns.remove(conn)
if len(conns) <= len(nodes) / 2:
self.app.logger.error(_('Object PUT exceptions during'
' send, %(conns)s/%(nodes)s required connections'),
{'conns': len(conns), 'nodes': len(nodes) / 2 + 1})
return HTTPServiceUnavailable(request=req)
for conn in conns:
if conn.queue.unfinished_tasks:
conn.queue.join()
conns = [conn for conn in conns if not conn.failed]
except ChunkReadTimeout, err:
self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds)
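The rewritten PUT path gives every backend connection its own bounded Queue plus a _send_file coroutine, so a slow object server back-pressures only its own queue instead of stalling the chunk loop for everyone. A standalone sketch of that fan-out pattern (assumes eventlet and swift.common.utils.ContextPool; the connection class and chunk source are made up)::

    from eventlet import Queue
    from swift.common.utils import ContextPool

    PUT_QUEUE_DEPTH = 10          # mirrors the new put_queue_depth option

    class FakeConn(object):
        """Stands in for one backend connection; send() just records chunks."""
        def __init__(self):
            self.failed = False
            self.sent = []
            self.queue = Queue(PUT_QUEUE_DEPTH)

        def send(self, chunk):
            self.sent.append(chunk)

    def send_file(conn):
        """Per-connection consumer, shaped like ObjectController._send_file."""
        while True:
            chunk = conn.queue.get()
            if not conn.failed:
                conn.send(chunk)
            conn.queue.task_done()

    conns = [FakeConn() for _junk in range(3)]
    with ContextPool(len(conns)) as pool:
        for conn in conns:
            pool.spawn(send_file, conn)
        for chunk in ('first', 'second', 'third'):    # the producer loop
            for conn in conns:
                if not conn.failed:
                    conn.queue.put(chunk)
        for conn in conns:
            if conn.queue.unfinished_tasks:
                conn.queue.join()
    # pool exit kills the consumers; every healthy conn saw every chunk
    assert all(conn.sent == ['first', 'second', 'third'] for conn in conns)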
@@ -1122,35 +1099,18 @@ class ObjectController(Controller):
return aresp
if not containers:
return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(partition, nodes, self.app.object_ring):
container = containers.pop()
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
req.headers['X-Container-Partition'] = container_partition
req.headers['X-Container-Device'] = container['device']
status, reason, body = \
self.node_post_or_delete(req, partition, node, req.path_info)
if 200 <= status < 300 or 400 <= status < 500:
statuses.append(status)
reasons.append(reason)
bodies.append(body)
else:
containers.insert(0, container)
if not containers:
break
while len(statuses) < len(nodes):
statuses.append(503)
reasons.append('')
bodies.append('')
return self.best_response(req, statuses, reasons, bodies,
_('Object DELETE'))
headers = []
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
headers.append(nheaders)
return self.make_requests(req, self.app.object_ring,
partition, 'DELETE', req.path_info, headers)
@public
@delay_denial
@@ -1184,6 +1144,7 @@ class ObjectController(Controller):
class ContainerController(Controller):
"""WSGI controller for container requests"""
server_type = _('Container')
# Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write']
@@ -1259,59 +1220,25 @@ class ContainerController(Controller):
account_partition, accounts = self.account_info(self.account_name)
if not accounts:
return HTTPNotFound(request=req)
accounts = self.get_update_nodes(account_partition, accounts,
self.app.account_ring)
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'x-cf-trans-id': self.trans_id}
headers.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(container_partition, containers,
self.app.container_ring):
if self.error_limited(node):
continue
try:
account = accounts.pop()
headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
headers['X-Account-Partition'] = account_partition
headers['X-Account-Device'] = account['device']
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'PUT',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
accounts.insert(0, account)
except (Exception, TimeoutError):
accounts.insert(0, account)
self.exception_occurred(node, _('Container'),
_('Trying to PUT to %s') % req.path)
if not accounts:
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
headers = []
for account in accounts:
nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
'x-cf-trans-id': self.trans_id,
'X-Account-Host': '%(ip)s:%(port)s' % account,
'X-Account-Partition': account_partition,
'X-Account-Device': account['device']}
nheaders.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-'))
headers.append(nheaders)
if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies,
_('Container PUT'))
return self.make_requests(req, self.app.container_ring,
container_partition, 'PUT', req.path_info, headers)
@public
def POST(self, req):
@@ -1330,43 +1257,13 @@ class ContainerController(Controller):
headers.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(container_partition, containers,
self.app.container_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'POST',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Container'),
_('Trying to POST %s') % req.path)
if len(statuses) >= len(containers):
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies,
_('Container POST'))
return self.make_requests(req, self.app.container_ring,
container_partition, 'POST', req.path_info,
[headers] * len(containers))
@public
def DELETE(self, req):
@@ -1374,65 +1271,21 @@ class ContainerController(Controller):
account_partition, accounts = self.account_info(self.account_name)
if not accounts:
return HTTPNotFound(request=req)
accounts = self.get_update_nodes(account_partition, accounts,
self.app.account_ring)
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'x-cf-trans-id': self.trans_id}
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(container_partition, containers,
self.app.container_ring):
if self.error_limited(node):
continue
try:
account = accounts.pop()
headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
headers['X-Account-Partition'] = account_partition
headers['X-Account-Device'] = account['device']
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], container_partition, 'DELETE',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
accounts.insert(0, account)
except (Exception, TimeoutError):
accounts.insert(0, account)
self.exception_occurred(node, _('Container'),
_('Trying to DELETE %s') % req.path)
if not accounts:
break
while len(statuses) < len(containers):
statuses.append(503)
reasons.append('')
bodies.append('')
headers = []
for account in accounts:
headers.append({'X-Timestamp': normalize_timestamp(time.time()),
'X-Cf-Trans-Id': self.trans_id,
'X-Account-Host': '%(ip)s:%(port)s' % account,
'X-Account-Partition': account_partition,
'X-Account-Device': account['device']})
if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
resp = self.best_response(req, statuses, reasons, bodies,
_('Container DELETE'))
if 200 <= resp.status_int <= 299:
for status in statuses:
if status < 200 or status > 299:
# If even one node doesn't do the delete, we can't be sure
# what the outcome will be once everything is in sync; so
# we 503.
self.app.logger.error(_('Returning 503 because not all '
'container nodes confirmed DELETE'))
return HTTPServiceUnavailable(request=req)
resp = self.make_requests(req, self.app.container_ring,
container_partition, 'DELETE', req.path_info, headers)
if resp.status_int == 202: # Indicates no server had the container
return HTTPNotFound(request=req)
return resp
@@ -1440,6 +1293,7 @@ class ContainerController(Controller):
class AccountController(Controller):
"""WSGI controller for account requests"""
server_type = _('Account')
def __init__(self, app, account_name, **kwargs):
Controller.__init__(self, app)
@@ -1470,42 +1324,10 @@ class AccountController(Controller):
'x-cf-trans-id': self.trans_id}
headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'PUT',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
else:
if source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to PUT to %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies,
_('Account PUT'))
return self.make_requests(req, self.app.account_ring, account_partition,
'PUT', req.path_info, [headers] * len(accounts))
@public
def POST(self, req):
@@ -1519,41 +1341,10 @@ class AccountController(Controller):
'X-CF-Trans-Id': self.trans_id}
headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-'))
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'POST',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to POST %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies,
_('Account POST'))
return self.make_requests(req, self.app.account_ring, account_partition,
'POST', req.path_info, [headers] * len(accounts))
@public
def DELETE(self, req):
@@ -1564,41 +1355,10 @@ class AccountController(Controller):
self.app.account_ring.get_nodes(self.account_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-CF-Trans-Id': self.trans_id}
statuses = []
reasons = []
bodies = []
for node in self.iter_nodes(account_partition, accounts,
self.app.account_ring):
if self.error_limited(node):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], account_partition, 'DELETE',
req.path_info, headers)
with Timeout(self.app.node_timeout):
source = conn.getresponse()
body = source.read()
if 200 <= source.status < 300 \
or 400 <= source.status < 500:
statuses.append(source.status)
reasons.append(source.reason)
bodies.append(body)
elif source.status == 507:
self.error_limit(node)
except (Exception, TimeoutError):
self.exception_occurred(node, _('Account'),
_('Trying to DELETE %s') % req.path)
if len(statuses) >= len(accounts):
break
while len(statuses) < len(accounts):
statuses.append(503)
reasons.append('')
bodies.append('')
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.best_response(req, statuses, reasons, bodies,
_('Account DELETE'))
return self.make_requests(req, self.app.account_ring, account_partition,
'DELETE', req.path_info, [headers] * len(accounts))
class BaseApplication(object):
@@ -1624,6 +1384,7 @@ class BaseApplication(object):
self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.client_timeout = int(conf.get('client_timeout', 60))
self.put_queue_depth = int(conf.get('put_queue_depth', 10))
self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.log_headers = conf.get('log_headers') == 'True'

View File

@@ -159,11 +159,10 @@ class LogProcessor(object):
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account,
container_name,
object_name)
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
return
raise BadFileDownload()
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
@@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon):
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except Exception:
except BadFileDownload:
already_processed_files = set()
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
@@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon):
def multiprocess_collate(processor_args, logs_to_process, worker_count):
'''yield hourly data from logs_to_process'''
'''
yield hourly data from logs_to_process
Every item that this function yields will be added to the processed files
list.
'''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
@@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
for x in logs_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None)
count = 0
in_queue.put(None) # tell the worker to end
while True:
try:
item, data = out_queue.get_nowait()
count += 1
if data:
yield item, data
if count >= len(logs_to_process):
# this implies that one result will come from every request
break
except Queue.Empty:
time.sleep(.1)
for r in results:
r.join()
time.sleep(.01)
else:
if not isinstance(data, BadFileDownload):
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
def collate_worker(processor_args, in_queue, out_queue):
'''worker process for multiprocess_collate'''
p = LogProcessor(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
item = in_queue.get_nowait()
if item is None:
break
except Queue.Empty:
time.sleep(.1)
else:
ret = p.process_one_file(*item)
out_queue.put((item, ret))
except BadFileDownload, err:
ret = err
out_queue.put((item, ret))
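Taken together, the two hunks above move multiprocess_collate and collate_worker to a sentinel-per-worker shutdown, an "all workers dead and queue drained" exit test, and BadFileDownload results pushed onto the output queue so the parent can drop them. A self-contained toy of that coordination pattern (Python 2 spelling; not the real log processor, and it doubles integers instead of parsing logs)::

    import multiprocessing
    import Queue   # the stdlib module is named queue on Python 3
    import time

    def worker(in_q, out_q):
        while True:
            try:
                item = in_q.get_nowait()
            except Queue.Empty:
                time.sleep(.01)
                continue
            if item is None:            # sentinel: no more work
                break
            out_q.put((item, item * 2))

    def collate(items, worker_count=2):
        in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                   for _junk in range(worker_count)]
        for w in workers:
            w.start()
        for item in items:
            in_q.put(item)
        for _junk in range(worker_count):
            in_q.put(None)              # one sentinel per worker
        while True:
            try:
                yield out_q.get_nowait()
            except Queue.Empty:
                if not any(w.is_alive() for w in workers) and out_q.empty():
                    break               # workers done and nothing queued
                time.sleep(.01)
        for w in workers:
            w.join()

    if __name__ == '__main__':
        print(sorted(collate([1, 2, 3])))   # [(1, 2), (2, 4), (3, 6)]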

View File

@@ -70,17 +70,6 @@ class TestContainerFailures(unittest.TestCase):
self.assert_(object1 in [o['name'] for o in
client.get_container(self.url, self.token, container)[1]])
# This fails because all three nodes have to indicate deletion before
# we tell the user it worked. Since the first node 409s (it hasn't got
# the update that the object was deleted yet), the whole request must 503
# (until everything is synced up, then the delete would work).
exc = None
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, err:
exc = err
self.assert_(exc)
self.assert_(exc.http_status, 503)
# Unfortunately, the following might pass or fail, depending on the
# position of the account server associated with the first container
# server we had killed. If the associated happens to be the first
@@ -144,17 +133,6 @@ class TestContainerFailures(unittest.TestCase):
self.assert_(object1 not in [o['name'] for o in
client.get_container(self.url, self.token, container)[1]])
# This fails because all three nodes have to indicate deletion before
# we tell the user it worked. Since the first node 409s (it hasn't got
# the update that the object was deleted yet), the whole request must 503
# (until everything is synced up, then the delete would work).
exc = None
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, err:
exc = err
self.assert_(exc)
self.assert_(exc.http_status, 503)
# Unfortunately, the following might pass or fail, depending on the
# position of the account server associated with the first container
# server we had killed. If the associated happens to be the first

View File

@@ -21,12 +21,14 @@ from webob import Request
from swift.common.middleware import ratelimit
from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
class FakeMemcache(object):
def __init__(self):
self.store = {}
self.error_on_incr = False
def get(self, key):
return self.store.get(key)
@@ -36,6 +38,8 @@ class FakeMemcache(object):
return True
def incr(self, key, delta=1, timeout=0):
if self.error_on_incr:
raise MemcacheConnectionError('Memcache restarting')
self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
if self.store[key] < 0:
self.store[key] = 0
@@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase):
start_response)
self._run(make_app_call, num_calls, current_rate)
def test_restarting_memcache(self):
current_rate = 2
num_calls = 5
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req.environ['swift.cache'] = FakeMemcache()
req.environ['swift.cache'].error_on_incr = True
make_app_call = lambda: self.test_ratelimit(req.environ,
start_response)
begin = time.time()
self._run(make_app_call, num_calls, current_rate, check_time=False)
time_took = time.time() - begin
self.assert_(round(time_took, 1) == 0) # no memcache, no limiting
if __name__ == '__main__':
unittest.main()
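The new test_restarting_memcache pins down the intended fail-open behaviour: when the memcache incr raises MemcacheConnectionError, the middleware stops limiting instead of stalling every request. A tiny illustration of that rule (toy classes; only the exception name comes from the diff)::

    class MemcacheConnectionError(Exception):
        pass

    class ToyCache(object):
        def __init__(self, broken=False):
            self.broken, self.counts = broken, {}

        def incr(self, key, delta=1):
            if self.broken:
                raise MemcacheConnectionError('Memcache restarting')
            self.counts[key] = self.counts.get(key, 0) + delta
            return self.counts[key]

    def allow_request(cache, key, max_rate=2):
        try:
            return cache.incr(key) <= max_rate
        except MemcacheConnectionError:
            return True                      # no memcache, no limiting

    assert allow_request(ToyCache(broken=True), 'a')
    healthy = ToyCache()
    assert [allow_request(healthy, 'a') for _junk in range(3)] == \
        [True, True, False]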

View File

@@ -22,9 +22,14 @@ from webob.exc import HTTPUnauthorized, HTTPCreated, HTTPNoContent,\
HTTPAccepted, HTTPBadRequest, HTTPNotFound, HTTPConflict
import xml.dom.minidom
import simplejson
from nose.plugins.skip import SkipTest
from swift.common.middleware import swift3
try:
from swift.common.middleware import swift3
skip = False
except Exception:
# Skip the swift3 tests if boto is not installed
skip = True
class FakeApp(object):
def __init__(self):
@@ -190,6 +195,8 @@ def start_response(*args):
class TestSwift3(unittest.TestCase):
def setUp(self):
if skip:
raise SkipTest
self.app = swift3.filter_factory({})(FakeApp())
def test_non_s3_request_passthrough(self):

View File

@@ -50,7 +50,8 @@ class TestRing(unittest.TestCase):
os.mkdir(self.testdir)
self.testgz = os.path.join(self.testdir, 'ring.gz')
self.intended_replica2part2dev_id = [[0, 2, 0, 2], [2, 0, 2, 0]]
self.intended_devs = [{'id': 0, 'zone': 0}, None, {'id': 2, 'zone': 2}]
self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0}, None,
{'id': 2, 'zone': 2, 'weight': 1.0}]
self.intended_part_shift = 30
self.intended_reload_time = 15
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
@@ -79,7 +80,7 @@ class TestRing(unittest.TestCase):
def test_has_changed(self):
self.assertEquals(self.ring.has_changed(), False)
os.utime(self.testgz, (time()+60, time()+60))
os.utime(self.testgz, (time() + 60, time() + 60))
self.assertEquals(self.ring.has_changed(), True)
def test_reload(self):
@@ -87,7 +88,7 @@ class TestRing(unittest.TestCase):
self.ring = ring.Ring(self.testgz, reload_time=0.001)
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 3)
self.intended_devs.append({'id': 3, 'zone': 3})
self.intended_devs.append({'id': 3, 'zone': 3, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@@ -100,7 +101,7 @@ class TestRing(unittest.TestCase):
self.ring = ring.Ring(self.testgz, reload_time=0.001)
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 4)
self.intended_devs.append({'id': 4, 'zone': 4})
self.intended_devs.append({'id': 4, 'zone': 4, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@@ -115,7 +116,7 @@ class TestRing(unittest.TestCase):
orig_mtime = self.ring._mtime
part, nodes = self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append({'id': 5, 'zone': 5})
self.intended_devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@@ -134,57 +135,71 @@ class TestRing(unittest.TestCase):
self.assertRaises(TypeError, self.ring.get_nodes)
part, nodes = self.ring.get_nodes('a')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a4')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('aa')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c0')
self.assertEquals(part, 3)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c3')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o1')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o5')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o0')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
def test_get_more_nodes(self):
# Yes, these tests are deliberately very fragile. We want to make sure
# that if someone changes the results the ring produces, they know it.
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
self.ring.devs.append({'id': 3, 'zone': 0})
self.ring.devs.append({'id': 3, 'zone': 0, 'weight': 1.0})
self.ring.zone2devs[0].append(self.ring.devs[3])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
@@ -193,18 +208,36 @@ class TestRing(unittest.TestCase):
self.ring.zone2devs[3] = [self.ring.devs[3]]
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3}])
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}])
self.ring.devs.append(None)
self.ring.devs.append({'id': 5, 'zone': 5})
self.ring.devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5] = [self.ring.devs[5]]
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3}, {'id': 5, 'zone': 5}])
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.ring.devs.append({'id': 6, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5].append(self.ring.devs[6])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.ring.devs[5]['weight'] = 0
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 6, 'zone': 5, 'weight': 1.0}])
self.ring.devs[3]['weight'] = 0
self.ring.devs.append({'id': 7, 'zone': 6, 'weight': 0.0})
self.ring.zone2devs[6] = [self.ring.devs[7]]
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 6, 'zone': 5, 'weight': 1.0}])
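The extra assertions added here describe how zero-weight devices behave as handoff candidates: a zero-weight device is skipped and another device in the same zone is used instead, while a zone whose devices all have zero weight contributes nothing. A toy filter expressing just that rule (not the real Ring, which also orders candidates by the partition)::

    def handoffs(zone2devs, used_zones):
        # yield at most one positive-weight device per unused zone
        for zone in sorted(zone2devs):
            if zone in used_zones:
                continue
            for dev in zone2devs[zone]:
                if dev.get('weight'):
                    yield dev
                    break

    zone2devs = {
        0: [{'id': 0, 'zone': 0, 'weight': 1.0}],
        3: [{'id': 3, 'zone': 3, 'weight': 0}],
        5: [{'id': 5, 'zone': 5, 'weight': 0},
            {'id': 6, 'zone': 5, 'weight': 1.0}],
    }
    assert [d['id'] for d in handoffs(zone2devs, set([0]))] == [6]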
if __name__ == '__main__':

View File

@@ -50,6 +50,7 @@ class MockMemcached(object):
self.cache = {}
self.down = False
self.exc_on_delete = False
self.read_return_none = False
def sendall(self, string):
if self.down:
@@ -110,6 +111,8 @@ class MockMemcached(object):
else:
self.outbuf += 'NOT_FOUND\r\n'
def readline(self):
if self.read_return_none:
return None
if self.down:
raise Exception('mock is down')
if '\n' in self.outbuf:
@@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase):
self.assertEquals(memcache_client.get('some_key'), '6')
memcache_client.incr('some_key', delta=-15)
self.assertEquals(memcache_client.get('some_key'), '0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.incr, 'some_key', delta=-15)
def test_decr(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
@@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase):
self.assertEquals(memcache_client.get('some_key'), '11')
memcache_client.decr('some_key', delta=15)
self.assertEquals(memcache_client.get('some_key'), '0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.decr, 'some_key', delta=15)
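These additions check that an empty read from the memcache socket (simulated by read_return_none) surfaces as MemcacheConnectionError from incr and decr rather than being treated as a value. The essence, as a toy (everything except the exception name is illustrative)::

    class MemcacheConnectionError(Exception):
        pass

    def read_reply(readline):
        line = readline()
        if not line:                 # peer went away mid-conversation
            raise MemcacheConnectionError('dropped connection')
        return line.strip()

    assert read_reply(lambda: '6\r\n') == '6'
    try:
        read_reply(lambda: None)
    except MemcacheConnectionError:
        pass
    else:
        raise AssertionError('expected MemcacheConnectionError')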
def test_retry(self):
logging.getLogger().addHandler(NullLoggingHandler())

View File

@@ -249,6 +249,9 @@ class FakeRing(object):
{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
return 1, devs
def get_part_nodes(self, part):
return self.get_nodes('blah')[1]
def get_more_nodes(self, nodes):
# 9 is the true cap
for x in xrange(3, min(3 + self.max_more_nodes, 9)):
@@ -2735,7 +2738,7 @@ class TestContainerController(unittest.TestCase):
self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 204)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 503), 503)
(200, 204, 204, 503), 204)
self.assert_status_map(controller.DELETE,
(200, 204, 503, 503), 503)
self.assert_status_map(controller.DELETE,

View File

@@ -15,9 +15,11 @@
import unittest
from test.unit import tmpfile
import Queue
from swift.common import internal_proxy
from swift.stats import log_processor
from swift.common.exceptions import ChunkReadTimeout
class FakeUploadApp(object):
@@ -33,6 +35,11 @@ class DumbLogger(object):
pass
class DumbInternalProxy(object):
def __init__(self, code=200, timeout=False, bad_compressed=False):
self.code = code
self.timeout = timeout
self.bad_compressed = bad_compressed
def get_container_list(self, account, container, marker=None,
end_marker=None):
n = '2010/03/14/13/obj1'
@@ -46,22 +53,28 @@ class DumbInternalProxy(object):
return []
def get_object(self, account, container, object_name):
code = 200
if object_name.endswith('.gz'):
# same data as below, compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
if self.bad_compressed:
# invalid compressed data
def data():
yield '\xff\xff\xff\xff\xff\xff\xff'
else:
# 'obj\ndata', compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
else:
def data():
yield 'obj\n'
if self.timeout:
raise ChunkReadTimeout
yield 'data'
return code, data()
return self.code, data()
class TestLogProcessor(unittest.TestCase):
@@ -159,6 +172,19 @@ use = egg:swift#proxy
'prefix_query': 0}}
self.assertEquals(result, expected)
def test_process_one_access_file_error(self):
access_proxy_config = self.proxy_config.copy()
access_proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
'access', 'a', 'c', 'o')
def test_get_container_listing(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy()
@@ -193,6 +219,18 @@ use = egg:swift#proxy
result = list(p.get_object_data('a', 'c', 'o.gz', True))
self.assertEquals(result, expected)
def test_get_object_data_errors(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(bad_compressed=True)
result = p.get_object_data('a', 'c', 'o.gz', True)
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(timeout=True)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
def test_get_stat_totals(self):
stats_proxy_config = self.proxy_config.copy()
stats_proxy_config.update({
@@ -262,3 +300,130 @@ use = egg:swift#proxy
# these only work for Py2.7+
#self.assertIsInstance(k, str)
self.assertTrue(isinstance(k, str), type(k))
def test_collate_worker(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(ret, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_collate_worker_error(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
type(ret))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}})]
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate_errors(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = []
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data