Move replication allow method to decorators

Remove logic of allowed methods list from object, container and account
servers. Instead of it add replicator decorator to utils and use new
decorator for REPLICATE methods in object/account/container servers.
This decorator mark method as special for usfor use only by the
replication.

If the option replication_server is not used, then this mechanism is not
enabled. If the replicaton_server option is set (not None) then the
respective server is a replicator (option value is True) and should use
ONLY the methods marked for replication server using the decorator, or
it is a normal server type and should NOT use methods marked for the
replication server.

Change-Id: I1041b31413cd0c39000317cc57a8c27816e1dfe8
This commit is contained in:
Vladimir Vechkanov 2013-05-23 20:16:21 +04:00
parent bc35717a61
commit bc08215f83
7 changed files with 111 additions and 135 deletions

View File

@ -27,7 +27,7 @@ from swift.account.utils import account_listing_response, \
from swift.common.db import AccountBroker, DatabaseConnectionError
from swift.common.utils import get_logger, get_param, hash_path, public, \
normalize_timestamp, storage_directory, config_true_value, \
validate_device_partition, json, timing_stats
validate_device_partition, json, timing_stats, replication
from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
check_mount, check_float, check_utf8, FORMAT2CONTENT_TYPE
from swift.common.db_replicator import ReplicatorRpc
@ -49,17 +49,9 @@ class AccountController(object):
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
if replication_server is not None:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
self.mount_check,
logger=self.logger)
@ -259,6 +251,7 @@ class AccountController(object):
delimiter)
@public
@replication
@timing_stats()
def REPLICATE(self, req):
"""
@ -323,7 +316,9 @@ class AccountController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()

View File

@ -1841,6 +1841,24 @@ def streq_const_time(s1, s2):
return result == 0
def replication(func):
"""
Decorator to declare which methods are accessible for different
type of servers:
* If option replication_server is None then this decorator
doesn't matter.
* If option replication_server is True then ONLY decorated with
this decorator methods will be started.
* If option replication_server is False then decorated with this
decorator methods will NOT be started.
:param func: function to mark accessible for replication
"""
func.replication = True
return func
def public(func):
"""
Decorator to declare which methods are publicly accessible as HTTP

View File

@ -27,7 +27,8 @@ import swift.common.db
from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, get_param, hash_path, public, \
normalize_timestamp, storage_directory, validate_sync_to, \
config_true_value, validate_device_partition, json, timing_stats
config_true_value, validate_device_partition, json, timing_stats, \
replication
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
check_mount, check_float, check_utf8, FORMAT2CONTENT_TYPE
from swift.common.bufferedhttp import http_connect
@ -56,17 +57,9 @@ class ContainerController(object):
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
if replication_server is not None:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.allowed_sync_hosts = [
h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
@ -469,6 +462,7 @@ class ContainerController(object):
return ret
@public
@replication
@timing_stats(sample_rate=0.01)
def REPLICATE(self, req):
"""
@ -542,7 +536,9 @@ class ContainerController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()

View File

@ -35,7 +35,7 @@ from swift.common.utils import mkdirs, normalize_timestamp, public, \
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
split_path, drop_buffer_cache, get_logger, write_pickle, \
config_true_value, validate_device_partition, timing_stats, \
ThreadPool
ThreadPool, replication
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
@ -496,17 +496,9 @@ class ObjectController(object):
self.slow = int(conf.get('slow', 0))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
if replication_server is not None:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
self.threadpools = defaultdict(
lambda: ThreadPool(nthreads=self.threads_per_disk))
@ -1006,6 +998,7 @@ class ObjectController(object):
return resp
@public
@replication
@timing_stats(sample_rate=0.1)
def REPLICATE(self, request):
"""
@ -1043,7 +1036,9 @@ class ObjectController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()

View File

@ -25,7 +25,7 @@ import xml.dom.minidom
from swift.common.swob import Request
from swift.account.server import AccountController, ACCOUNT_LISTING_LIMIT
from swift.common.utils import normalize_timestamp
from swift.common.utils import normalize_timestamp, replication, public
class TestAccountController(unittest.TestCase):
@ -1347,24 +1347,14 @@ class TestAccountController(unittest.TestCase):
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(AccountController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(AccountController(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(AccountController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
repl_methods = ['REPLICATE']
for method_name in obj_methods:
method = getattr(self.controller, method_name)
self.assertFalse(hasattr(method, 'replication'))
for method_name in repl_methods:
method = getattr(self.controller, method_name)
self.assertEquals(method.replication, True)
def test_correct_allowed_method(self):
"""
@ -1374,13 +1364,15 @@ class TestAccountController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = AccountController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
@ -1396,14 +1388,13 @@ class TestAccountController(unittest.TestCase):
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
method_res = mock.MagicMock()
mock_method = public(lambda x: mock.MagicMock(return_value=method_res))
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
new=mock_method):
mock_method.replication = False
response = self.controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
"""
@ -1413,13 +1404,15 @@ class TestAccountController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = AccountController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
@ -1437,14 +1430,12 @@ class TestAccountController(unittest.TestCase):
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
mock_method = replication(public(lambda x: mock.MagicMock()))
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
self.controller.allowed_methods.remove(method)
new=mock_method):
mock_method.replication = True
response = self.controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.controller.allowed_methods.append(method)
if __name__ == '__main__':
unittest.main()

View File

@ -28,7 +28,7 @@ import simplejson
from swift.common.swob import Request, HeaderKeyDict
import swift.container
from swift.container import server as container_server
from swift.common.utils import normalize_timestamp, mkdirs
from swift.common.utils import normalize_timestamp, mkdirs, public, replication
from test.unit import fake_http_connect
@ -1435,25 +1435,14 @@ class TestContainerController(unittest.TestCase):
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
container_controller = container_server.ContainerController
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(container_controller(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(container_controller(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(container_controller(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
repl_methods = ['REPLICATE']
for method_name in obj_methods:
method = getattr(self.controller, method_name)
self.assertFalse(hasattr(method, 'replication'))
for method_name in repl_methods:
method = getattr(self.controller, method_name)
self.assertEquals(method.replication, True)
def test_correct_allowed_method(self):
"""
@ -1463,12 +1452,15 @@ class TestContainerController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
@ -1485,14 +1477,11 @@ class TestContainerController(unittest.TestCase):
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
method_res = mock.MagicMock()
mock_method = public(lambda x: mock.MagicMock(return_value=method_res))
with mock.patch.object(self.controller, method, new=mock_method):
response = self.controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
"""
@ -1502,12 +1491,15 @@ class TestContainerController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
@ -1526,14 +1518,10 @@ class TestContainerController(unittest.TestCase):
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
self.controller.allowed_methods.remove(method)
mock_method = replication(public(lambda x: mock.MagicMock()))
with mock.patch.object(self.controller, method, new=mock_method):
response = self.controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.controller.allowed_methods.append(method)
if __name__ == '__main__':

View File

@ -35,7 +35,8 @@ from test.unit import connect_tcp, readuntil2crlfs
from swift.obj import server as object_server
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory
NullLogger, storage_directory, public, \
replication
from swift.common.exceptions import DiskFileNotExist
from swift.common import constraints
from eventlet import tpool
@ -2790,24 +2791,14 @@ class TestObjectController(unittest.TestCase):
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.object_controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
repl_methods = ['REPLICATE']
for method_name in obj_methods:
method = getattr(self.object_controller, method_name)
self.assertFalse(hasattr(method, 'replication'))
for method_name in repl_methods:
method = getattr(self.object_controller, method_name)
self.assertEquals(method.replication, True)
def test_correct_allowed_method(self):
"""
@ -2817,13 +2808,16 @@ class TestObjectController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
method_called = False
self.object_controller = object_server.ObjectController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.object_controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
@ -2839,14 +2833,12 @@ class TestObjectController(unittest.TestCase):
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
method_res = mock.MagicMock()
mock_method = public(lambda x: mock.MagicMock(return_value=method_res))
with mock.patch.object(self.object_controller, method,
return_value=mock.MagicMock()) as mock_method:
new=mock_method):
response = self.object_controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
"""
@ -2856,12 +2848,15 @@ class TestObjectController(unittest.TestCase):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.ObjectController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'})
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.object_controller.allowed_methods[0]
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
@ -2880,14 +2875,12 @@ class TestObjectController(unittest.TestCase):
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
mock_method = replication(public(lambda x: mock.MagicMock()))
with mock.patch.object(self.object_controller, method,
return_value=mock.MagicMock()) as mock_method:
self.object_controller.allowed_methods.remove(method)
new=mock_method):
mock_method.replication = True
response = self.object_controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.object_controller.allowed_methods.append(method)
if __name__ == '__main__':