Merge "Enable per policy proxy config options"
This commit is contained in:
commit
263dc8a3f3
@ -1751,7 +1751,9 @@ sorting_method shuffle Storage nodes can be chosen at
|
||||
control. In both the timing and
|
||||
affinity cases, equally-sorting nodes
|
||||
are still randomly chosen to spread
|
||||
load.
|
||||
load. This option may be overridden
|
||||
in a per-policy configuration
|
||||
section.
|
||||
timing_expiry 300 If the "timing" sorting_method is
|
||||
used, the timings will only be valid
|
||||
for the number of seconds configured
|
||||
@ -1809,14 +1811,18 @@ read_affinity None Specifies which backend servers t
|
||||
be given to the selection; lower
|
||||
numbers are higher priority.
|
||||
Default is empty, meaning no
|
||||
preference.
|
||||
preference. This option may be
|
||||
overridden in a per-policy
|
||||
configuration section.
|
||||
write_affinity None Specifies which backend servers to
|
||||
prefer on writes. Format is a comma
|
||||
separated list of affinity
|
||||
descriptors of the form r<N> for
|
||||
region N or r<N>z<M> for region N,
|
||||
zone M. Default is empty, meaning no
|
||||
preference.
|
||||
preference. This option may be
|
||||
overridden in a per-policy
|
||||
configuration section.
|
||||
write_affinity_node_count 2 * replicas The number of local (as governed by
|
||||
the write_affinity setting) nodes to
|
||||
attempt to contact first on writes,
|
||||
@ -1825,9 +1831,99 @@ write_affinity_node_count 2 * replicas The number of local (as governed
|
||||
'* replicas' at the end to have it
|
||||
use the number given times the number
|
||||
of replicas for the ring being used
|
||||
for the request.
|
||||
for the request. This option may be
|
||||
overridden in a per-policy
|
||||
configuration section.
|
||||
============================ =============== =====================================
|
||||
|
||||
Per policy configuration
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Some proxy-server configuration options may be overridden on a per-policy
|
||||
basis by including per-policy config section(s). These options are:
|
||||
|
||||
- sorting_method
|
||||
- read_affinity
|
||||
- write_affinity
|
||||
- write_affinity_node_count
|
||||
|
||||
The per-policy config section name must be of the form::
|
||||
|
||||
[proxy-server:policy:<policy index>]
|
||||
|
||||
.. note::
|
||||
|
||||
The per-policy config section name should refer to the policy index, not
|
||||
the policy name.
|
||||
|
||||
.. note::
|
||||
|
||||
The first part of proxy-server config section name must match the name of
|
||||
the proxy-server config section. This is typically ``proxy-server`` as
|
||||
shown above, but if different then the names of any per-policy config
|
||||
sections must be changed accordingly.
|
||||
|
||||
The value of an option specified in a per-policy section will override any
|
||||
value given in the proxy-server section for that policy only. Otherwise the
|
||||
value of these options will be that specified in the proxy-server section.
|
||||
|
||||
For example, the following section provides policy-specific options for a
|
||||
policy with index 3::
|
||||
|
||||
[proxy-server:policy:3]
|
||||
sorting_method = affinity
|
||||
read_affinity = r2=1
|
||||
write_affinity = r2
|
||||
write_affinity_node_count = 1 * replicas
|
||||
|
||||
.. note::
|
||||
|
||||
It is recommended that per-policy config options are *not* included in the
|
||||
``[DEFAULT]`` section. If they are then the following behavior applies.
|
||||
|
||||
Per-policy config sections will inherit options in the DEFAULT section of
|
||||
the config file, and any such inheritance will take precedence over
|
||||
inheriting options from the proxy-server config section.
|
||||
|
||||
Per-policy config section options will override options in the
|
||||
``[DEFAULT]`` section. Unlike the behavior described under `General Server
|
||||
Configuration`_ for paste-deploy ``filter`` and ``app`` sections, the
|
||||
``set`` keyword is not required for options to override in per-policy
|
||||
config sections.
|
||||
|
||||
For example, given the following settings in a config file::
|
||||
|
||||
[DEFAULT]
|
||||
sorting_method = affinity
|
||||
read_affinity = r0=100
|
||||
write_affinity = r0
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
# use of set keyword here overrides [DEFAULT] option
|
||||
set read_affinity = r1=100
|
||||
# without set keyword, [DEFAULT] option overrides in a paste-deploy section
|
||||
write_affinity = r1
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
# set keyword not required here to override [DEFAULT] option
|
||||
write_affinity = r1
|
||||
|
||||
would result in policy with index ``0`` having settings:
|
||||
|
||||
* ``read_affinity = r0=100`` (inherited from the ``[DEFAULT]`` section)
|
||||
* ``write_affinity = r1`` (specified in the policy 0 section)
|
||||
|
||||
and any other policy would have the default settings of:
|
||||
|
||||
* ``read_affinity = r1=100`` (set in the proxy-server section)
|
||||
* ``write_affinity = r0`` (inherited from the ``[DEFAULT]`` section)
|
||||
|
||||
|
||||
Tempauth
|
||||
^^^^^^^^
|
||||
|
||||
[tempauth]
|
||||
|
||||
===================== =============================== =======================
|
||||
|
@ -174,6 +174,7 @@ use = egg:swift#proxy
|
||||
# affinity cases, equally-sorting nodes are still randomly chosen to
|
||||
# spread load.
|
||||
# The valid values for sorting_method are "affinity", "shuffle", or "timing".
|
||||
# This option may be overridden in a per-policy configuration section.
|
||||
# sorting_method = shuffle
|
||||
#
|
||||
# If the "timing" sorting_method is used, the timings will only be valid for
|
||||
@ -211,6 +212,7 @@ use = egg:swift#proxy
|
||||
# anything in region 2, then everything else:
|
||||
# read_affinity = r1z1=100, r1z2=200, r2=300
|
||||
# Default is empty, meaning no preference.
|
||||
# This option may be overridden in a per-policy configuration section.
|
||||
# read_affinity =
|
||||
#
|
||||
# Specifies which backend servers to prefer on writes. Format is a comma
|
||||
@ -223,6 +225,7 @@ use = egg:swift#proxy
|
||||
# nodes:
|
||||
# write_affinity = r1, r2
|
||||
# Default is empty, meaning no preference.
|
||||
# This option may be overridden in a per-policy configuration section.
|
||||
# write_affinity =
|
||||
#
|
||||
# The number of local (as governed by the write_affinity setting) nodes to
|
||||
@ -230,6 +233,7 @@ use = egg:swift#proxy
|
||||
# should be an integer number, or use '* replicas' at the end to have it use
|
||||
# the number given times the number of replicas for the ring being used for the
|
||||
# request.
|
||||
# This option may be overridden in a per-policy configuration section.
|
||||
# write_affinity_node_count = 2 * replicas
|
||||
#
|
||||
# These are the headers whose values will only be shown to swift_owners. The
|
||||
@ -249,6 +253,18 @@ use = egg:swift#proxy
|
||||
# ionice_class =
|
||||
# ionice_priority =
|
||||
|
||||
# Some proxy-server configuration options may be overridden on a per-policy
|
||||
# basis by including per-policy config section(s). The value of any option
|
||||
# specified a per-policy section will override any value given in the
|
||||
# proxy-server section for that policy only. Otherwise the value of these
|
||||
# options will be that specified in the proxy-server section.
|
||||
# The section name should refer to the policy index, not the policy name.
|
||||
# [proxy-server:policy:<policy index>]
|
||||
# sorting_method =
|
||||
# read_affinity =
|
||||
# write_affinity =
|
||||
# write_affinity_node_count =
|
||||
|
||||
[filter:tempauth]
|
||||
use = egg:swift#tempauth
|
||||
# You can override the default log routing for this filter here:
|
||||
|
@ -2443,6 +2443,8 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
|
||||
else:
|
||||
c = ConfigParser(defaults)
|
||||
if hasattr(conf_path, 'readline'):
|
||||
if hasattr(conf_path, 'seek'):
|
||||
conf_path.seek(0)
|
||||
c.readfp(conf_path)
|
||||
else:
|
||||
if os.path.isdir(conf_path):
|
||||
|
@ -65,6 +65,7 @@ class NamedConfigLoader(loadwsgi.ConfigLoader):
|
||||
context = super(NamedConfigLoader, self).get_context(
|
||||
object_type, name=name, global_conf=global_conf)
|
||||
context.name = name
|
||||
context.local_conf['__name__'] = name
|
||||
return context
|
||||
|
||||
|
||||
@ -114,7 +115,7 @@ class ConfigString(NamedConfigLoader):
|
||||
self.filename = "string"
|
||||
defaults = {
|
||||
'here': "string",
|
||||
'__file__': "string",
|
||||
'__file__': StringIO(dedent(config_string)),
|
||||
}
|
||||
self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults)
|
||||
self.parser.optionxform = str # Don't lower-case keys
|
||||
|
@ -1306,9 +1306,11 @@ class NodeIter(object):
|
||||
:param partition: ring partition to yield nodes for
|
||||
:param node_iter: optional iterable of nodes to try. Useful if you
|
||||
want to filter or reorder the nodes.
|
||||
:param policy: an instance of :class:`BaseStoragePolicy`. This should be
|
||||
None for an account or container ring.
|
||||
"""
|
||||
|
||||
def __init__(self, app, ring, partition, node_iter=None):
|
||||
def __init__(self, app, ring, partition, node_iter=None, policy=None):
|
||||
self.app = app
|
||||
self.ring = ring
|
||||
self.partition = partition
|
||||
@ -1324,7 +1326,8 @@ class NodeIter(object):
|
||||
# Use of list() here forcibly yanks the first N nodes (the primary
|
||||
# nodes) from node_iter, so the rest of its values are handoffs.
|
||||
self.primary_nodes = self.app.sort_nodes(
|
||||
list(itertools.islice(node_iter, num_primary_nodes)))
|
||||
list(itertools.islice(node_iter, num_primary_nodes)),
|
||||
policy=policy)
|
||||
self.handoff_iter = node_iter
|
||||
self._node_provider = None
|
||||
|
||||
|
@ -129,7 +129,7 @@ class BaseObjectController(Controller):
|
||||
self.container_name = unquote(container_name)
|
||||
self.object_name = unquote(object_name)
|
||||
|
||||
def iter_nodes_local_first(self, ring, partition):
|
||||
def iter_nodes_local_first(self, ring, partition, policy=None):
|
||||
"""
|
||||
Yields nodes for a ring partition.
|
||||
|
||||
@ -143,13 +143,13 @@ class BaseObjectController(Controller):
|
||||
:param ring: ring to get nodes from
|
||||
:param partition: ring partition to yield nodes for
|
||||
"""
|
||||
|
||||
is_local = self.app.write_affinity_is_local_fn
|
||||
policy_conf = self.app.get_policy_options(policy)
|
||||
is_local = policy_conf.write_affinity_is_local_fn
|
||||
if is_local is None:
|
||||
return self.app.iter_nodes(ring, partition)
|
||||
return self.app.iter_nodes(ring, partition, policy=policy)
|
||||
|
||||
primary_nodes = ring.get_part_nodes(partition)
|
||||
num_locals = self.app.write_affinity_node_count(len(primary_nodes))
|
||||
num_locals = policy_conf.write_affinity_node_count(len(primary_nodes))
|
||||
|
||||
all_nodes = itertools.chain(primary_nodes,
|
||||
ring.get_more_nodes(partition))
|
||||
@ -165,7 +165,7 @@ class BaseObjectController(Controller):
|
||||
all_nodes))
|
||||
|
||||
return self.app.iter_nodes(
|
||||
ring, partition, node_iter=local_first_node_iter)
|
||||
ring, partition, node_iter=local_first_node_iter, policy=policy)
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
"""Handle HTTP GET or HEAD requests."""
|
||||
@ -184,7 +184,7 @@ class BaseObjectController(Controller):
|
||||
return aresp
|
||||
partition = obj_ring.get_part(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition)
|
||||
node_iter = self.app.iter_nodes(obj_ring, partition, policy=policy)
|
||||
|
||||
resp = self._get_or_head_response(req, node_iter, partition, policy)
|
||||
|
||||
@ -541,7 +541,7 @@ class BaseObjectController(Controller):
|
||||
"""
|
||||
obj_ring = policy.object_ring
|
||||
node_iter = GreenthreadSafeIterator(
|
||||
self.iter_nodes_local_first(obj_ring, partition))
|
||||
self.iter_nodes_local_first(obj_ring, partition, policy=policy))
|
||||
pile = GreenPile(len(nodes))
|
||||
|
||||
for nheaders in outgoing_headers:
|
||||
|
@ -16,6 +16,9 @@
|
||||
import mimetypes
|
||||
import os
|
||||
import socket
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from swift import gettext_ as _
|
||||
from random import shuffle
|
||||
from time import time
|
||||
@ -32,7 +35,7 @@ from swift.common.ring import Ring
|
||||
from swift.common.utils import cache_from_env, get_logger, \
|
||||
get_remote_client, split_path, config_true_value, generate_trans_id, \
|
||||
affinity_key_function, affinity_locality_predicate, list_from_csv, \
|
||||
register_swift_info
|
||||
register_swift_info, readconf
|
||||
from swift.common.constraints import check_utf8, valid_api_version
|
||||
from swift.proxy.controllers import AccountController, ContainerController, \
|
||||
ObjectControllerRouter, InfoController
|
||||
@ -76,6 +79,67 @@ required_filters = [
|
||||
'catch_errors', 'gatekeeper', 'proxy_logging']}]
|
||||
|
||||
|
||||
def _label_for_policy(policy):
|
||||
if policy is not None:
|
||||
return 'policy %s (%s)' % (policy.idx, policy.name)
|
||||
return '(default)'
|
||||
|
||||
|
||||
class OverrideConf(object):
|
||||
"""
|
||||
Encapsulates proxy server properties that may be overridden e.g. for
|
||||
policy specific configurations.
|
||||
|
||||
:param conf: the proxy-server config dict.
|
||||
:param override_conf: a dict of overriding configuration options.
|
||||
"""
|
||||
def __init__(self, base_conf, override_conf):
|
||||
self.conf = base_conf
|
||||
self.override_conf = override_conf
|
||||
|
||||
self.sorting_method = self._get('sorting_method', 'shuffle').lower()
|
||||
self.read_affinity = self._get('read_affinity', '')
|
||||
try:
|
||||
self.read_affinity_sort_key = affinity_key_function(
|
||||
self.read_affinity)
|
||||
except ValueError as err:
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid read_affinity value: %r (%s)" %
|
||||
(self.read_affinity, err.message))
|
||||
|
||||
self.write_affinity = self._get('write_affinity', '')
|
||||
try:
|
||||
self.write_affinity_is_local_fn \
|
||||
= affinity_locality_predicate(self.write_affinity)
|
||||
except ValueError as err:
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid write_affinity value: %r (%s)" %
|
||||
(self.write_affinity, err.message))
|
||||
self.write_affinity_node_value = self._get(
|
||||
'write_affinity_node_count', '2 * replicas').lower()
|
||||
value = self.write_affinity_node_value.split()
|
||||
if len(value) == 1:
|
||||
wanc_value = int(value[0])
|
||||
self.write_affinity_node_count = lambda replicas: wanc_value
|
||||
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
|
||||
wanc_value = int(value[0])
|
||||
self.write_affinity_node_count = \
|
||||
lambda replicas: wanc_value * replicas
|
||||
else:
|
||||
raise ValueError(
|
||||
'Invalid write_affinity_node_count value: %r' %
|
||||
(' '.join(value)))
|
||||
|
||||
def __repr__(self):
|
||||
return ('sorting_method: %s, read_affinity: %s, write_affinity: %s, '
|
||||
'write_affinity_node_count: %s' %
|
||||
(self.sorting_method, self.read_affinity, self.write_affinity,
|
||||
self.write_affinity_node_value))
|
||||
|
||||
def _get(self, key, default):
|
||||
return self.override_conf.get(key, self.conf.get(key, default))
|
||||
|
||||
|
||||
class Application(object):
|
||||
"""WSGI application for the proxy server."""
|
||||
|
||||
@ -87,6 +151,9 @@ class Application(object):
|
||||
self.logger = get_logger(conf, log_route='proxy-server')
|
||||
else:
|
||||
self.logger = logger
|
||||
self._override_confs = self._load_per_policy_config(conf)
|
||||
self.sorts_by_timing = any(pc.sorting_method == 'timing'
|
||||
for pc in self._override_confs.values())
|
||||
|
||||
self._error_limiting = {}
|
||||
|
||||
@ -155,7 +222,6 @@ class Application(object):
|
||||
conf.get('strict_cors_mode', 't'))
|
||||
self.node_timings = {}
|
||||
self.timing_expiry = int(conf.get('timing_expiry', 300))
|
||||
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
|
||||
self.concurrent_gets = \
|
||||
config_true_value(conf.get('concurrent_gets'))
|
||||
self.concurrency_timeout = float(conf.get('concurrency_timeout',
|
||||
@ -170,33 +236,6 @@ class Application(object):
|
||||
else:
|
||||
raise ValueError(
|
||||
'Invalid request_node_count value: %r' % ''.join(value))
|
||||
try:
|
||||
self._read_affinity = read_affinity = conf.get('read_affinity', '')
|
||||
self.read_affinity_sort_key = affinity_key_function(read_affinity)
|
||||
except ValueError as err:
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid read_affinity value: %r (%s)" %
|
||||
(read_affinity, err.message))
|
||||
try:
|
||||
write_affinity = conf.get('write_affinity', '')
|
||||
self.write_affinity_is_local_fn \
|
||||
= affinity_locality_predicate(write_affinity)
|
||||
except ValueError as err:
|
||||
# make the message a little more useful
|
||||
raise ValueError("Invalid write_affinity value: %r (%s)" %
|
||||
(write_affinity, err.message))
|
||||
value = conf.get('write_affinity_node_count',
|
||||
'2 * replicas').lower().split()
|
||||
if len(value) == 1:
|
||||
wanc_value = int(value[0])
|
||||
self.write_affinity_node_count = lambda replicas: wanc_value
|
||||
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
|
||||
wanc_value = int(value[0])
|
||||
self.write_affinity_node_count = \
|
||||
lambda replicas: wanc_value * replicas
|
||||
else:
|
||||
raise ValueError(
|
||||
'Invalid write_affinity_node_count value: %r' % ''.join(value))
|
||||
# swift_owner_headers are stripped by the account and container
|
||||
# controllers; we should extend header stripping to object controller
|
||||
# when a privileged object header is implemented.
|
||||
@ -235,15 +274,68 @@ class Application(object):
|
||||
account_autocreate=self.account_autocreate,
|
||||
**constraints.EFFECTIVE_CONSTRAINTS)
|
||||
|
||||
def _make_policy_override(self, policy, conf, override_conf):
|
||||
label_for_policy = _label_for_policy(policy)
|
||||
try:
|
||||
override = OverrideConf(conf, override_conf)
|
||||
self.logger.debug("Loaded override config for %s: %r" %
|
||||
(label_for_policy, override))
|
||||
return override
|
||||
except ValueError as err:
|
||||
raise ValueError(err.message + ' for %s' % label_for_policy)
|
||||
|
||||
def _load_per_policy_config(self, conf):
|
||||
"""
|
||||
Loads per-policy config override values from proxy server conf file.
|
||||
|
||||
:param conf: the proxy server local conf dict
|
||||
:return: a dict mapping :class:`BaseStoragePolicy` to an instance of
|
||||
:class:`OverrideConf` that has policy specific config attributes
|
||||
"""
|
||||
# the default conf will be used when looking up a policy that had no
|
||||
# override conf
|
||||
default_conf = self._make_policy_override(None, conf, {})
|
||||
override_confs = defaultdict(lambda: default_conf)
|
||||
# force None key to be set in the defaultdict so that it is found when
|
||||
# iterating over items in check_config
|
||||
override_confs[None] = default_conf
|
||||
for index, override_conf in conf.get('policy_config', {}).items():
|
||||
try:
|
||||
index = int(index)
|
||||
except ValueError:
|
||||
# require policies to be referenced by index; using index *or*
|
||||
# name isn't possible because names such as "3" are allowed
|
||||
raise ValueError(
|
||||
'Override config must refer to policy index: %r' % index)
|
||||
try:
|
||||
policy = POLICIES[index]
|
||||
except KeyError:
|
||||
raise ValueError(
|
||||
"No policy found for override config, index: %s" % index)
|
||||
override = self._make_policy_override(policy, conf, override_conf)
|
||||
override_confs[policy] = override
|
||||
return override_confs
|
||||
|
||||
def get_policy_options(self, policy):
|
||||
"""
|
||||
Return policy specific options.
|
||||
|
||||
:param policy: an instance of :class:`BaseStoragePolicy`
|
||||
:return: an instance of :class:`OverrideConf`
|
||||
"""
|
||||
return self._override_confs[policy]
|
||||
|
||||
def check_config(self):
|
||||
"""
|
||||
Check the configuration for possible errors
|
||||
"""
|
||||
if self._read_affinity and self.sorting_method != 'affinity':
|
||||
for policy, conf in self._override_confs.items():
|
||||
if conf.read_affinity and conf.sorting_method != 'affinity':
|
||||
self.logger.warning(
|
||||
_("sorting_method is set to '%s', not 'affinity'; "
|
||||
"read_affinity setting will have no effect."),
|
||||
self.sorting_method)
|
||||
_("sorting_method is set to '%(method)s', not 'affinity'; "
|
||||
"%(label)s read_affinity setting will have no effect."),
|
||||
{'label': _label_for_policy(policy),
|
||||
'method': conf.sorting_method})
|
||||
|
||||
def get_object_ring(self, policy_idx):
|
||||
"""
|
||||
@ -425,30 +517,34 @@ class Application(object):
|
||||
self.logger.exception(_('ERROR Unhandled exception in request'))
|
||||
return HTTPServerError(request=req)
|
||||
|
||||
def sort_nodes(self, nodes):
|
||||
'''
|
||||
def sort_nodes(self, nodes, policy=None):
|
||||
"""
|
||||
Sorts nodes in-place (and returns the sorted list) according to
|
||||
the configured strategy. The default "sorting" is to randomly
|
||||
shuffle the nodes. If the "timing" strategy is chosen, the nodes
|
||||
are sorted according to the stored timing data.
|
||||
'''
|
||||
|
||||
:param nodes: a list of nodes
|
||||
:param policy: an instance of :class:`BaseStoragePolicy`
|
||||
"""
|
||||
# In the case of timing sorting, shuffling ensures that close timings
|
||||
# (ie within the rounding resolution) won't prefer one over another.
|
||||
# Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
|
||||
shuffle(nodes)
|
||||
if self.sorting_method == 'timing':
|
||||
policy_conf = self.get_policy_options(policy)
|
||||
if policy_conf.sorting_method == 'timing':
|
||||
now = time()
|
||||
|
||||
def key_func(node):
|
||||
timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
|
||||
return timing if expires > now else -1.0
|
||||
nodes.sort(key=key_func)
|
||||
elif self.sorting_method == 'affinity':
|
||||
nodes.sort(key=self.read_affinity_sort_key)
|
||||
elif policy_conf.sorting_method == 'affinity':
|
||||
nodes.sort(key=policy_conf.read_affinity_sort_key)
|
||||
return nodes
|
||||
|
||||
def set_node_timing(self, node, timing):
|
||||
if self.sorting_method != 'timing':
|
||||
if not self.sorts_by_timing:
|
||||
return
|
||||
now = time()
|
||||
timing = round(timing, 3) # sort timings to the millisecond
|
||||
@ -516,8 +612,9 @@ class Application(object):
|
||||
{'msg': msg.decode('utf-8'), 'ip': node['ip'],
|
||||
'port': node['port'], 'device': node['device']})
|
||||
|
||||
def iter_nodes(self, ring, partition, node_iter=None):
|
||||
return NodeIter(self, ring, partition, node_iter=node_iter)
|
||||
def iter_nodes(self, ring, partition, node_iter=None, policy=None):
|
||||
return NodeIter(self, ring, partition, node_iter=node_iter,
|
||||
policy=policy)
|
||||
|
||||
def exception_occurred(self, node, typ, additional_info,
|
||||
**kwargs):
|
||||
@ -575,10 +672,42 @@ class Application(object):
|
||||
self.logger.debug(_("Pipeline is \"%s\""), pipe)
|
||||
|
||||
|
||||
def parse_per_policy_config(conf):
|
||||
"""
|
||||
Search the config file for any per-policy config sections and load those
|
||||
sections to a dict mapping policy reference (name or index) to policy
|
||||
options.
|
||||
|
||||
:param conf: the proxy server conf dict
|
||||
:return: a dict mapping policy reference -> dict of policy options
|
||||
:raises ValueError: if a policy config section has an invalid name
|
||||
"""
|
||||
policy_config = {}
|
||||
try:
|
||||
all_conf = readconf(conf['__file__'])
|
||||
except KeyError:
|
||||
get_logger(conf).warning(
|
||||
"Unable to load policy specific configuration options: "
|
||||
"cannot access proxy server conf file")
|
||||
return policy_config
|
||||
|
||||
policy_section_prefix = conf['__name__'] + ':policy:'
|
||||
for section, options in all_conf.items():
|
||||
if not section.startswith(policy_section_prefix):
|
||||
continue
|
||||
policy_ref = section[len(policy_section_prefix):]
|
||||
policy_config[policy_ref] = options
|
||||
return policy_config
|
||||
|
||||
|
||||
def app_factory(global_conf, **local_conf):
|
||||
"""paste.deploy app factory for creating WSGI proxy apps."""
|
||||
conf = global_conf.copy()
|
||||
conf.update(local_conf)
|
||||
# Do this here so that the use of conf['__file__'] and conf['__name__'] is
|
||||
# isolated from the Application. This also simplifies tests that construct
|
||||
# an Application instance directly.
|
||||
conf['policy_config'] = parse_per_policy_config(conf)
|
||||
app = Application(conf)
|
||||
app.check_config()
|
||||
return app
|
||||
|
@ -169,6 +169,7 @@ class TestWSGI(unittest.TestCase):
|
||||
'here': os.path.dirname(conf_file),
|
||||
'conn_timeout': '0.2',
|
||||
'swift_dir': t,
|
||||
'__name__': 'proxy-server'
|
||||
}
|
||||
self.assertEqual(expected, conf)
|
||||
# logger works
|
||||
@ -234,6 +235,7 @@ class TestWSGI(unittest.TestCase):
|
||||
'here': conf_dir,
|
||||
'conn_timeout': '0.2',
|
||||
'swift_dir': conf_root,
|
||||
'__name__': 'proxy-server'
|
||||
}
|
||||
self.assertEqual(expected, conf)
|
||||
# logger works
|
||||
@ -571,7 +573,7 @@ class TestWSGI(unittest.TestCase):
|
||||
expected = {
|
||||
'__file__': os.path.join(path, 'server.conf.d'),
|
||||
'here': os.path.join(path, 'server.conf.d'),
|
||||
'port': '8080',
|
||||
'port': '8080', '__name__': 'main'
|
||||
}
|
||||
self.assertEqual(conf, expected)
|
||||
|
||||
|
@ -180,7 +180,7 @@ class TestContainerController(TestRingBase):
|
||||
self.assertNotEqual(context['headers']['x-timestamp'], '1.0')
|
||||
|
||||
def test_node_errors(self):
|
||||
self.app.sort_nodes = lambda n: n
|
||||
self.app.sort_nodes = lambda n, *args, **kwargs: n
|
||||
|
||||
for method in ('PUT', 'DELETE', 'POST'):
|
||||
def test_status_map(statuses, expected):
|
||||
|
@ -195,11 +195,12 @@ class BaseObjectControllerMixin(object):
|
||||
|
||||
def test_iter_nodes_local_first_noops_when_no_affinity(self):
|
||||
# this test needs a stable node order - most don't
|
||||
self.app.sort_nodes = lambda l: l
|
||||
self.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
self.app.write_affinity_is_local_fn = None
|
||||
object_ring = self.policy.object_ring
|
||||
policy = self.policy
|
||||
self.app.get_policy_options(policy).write_affinity_is_local_fn = None
|
||||
object_ring = policy.object_ring
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
|
||||
@ -213,10 +214,11 @@ class BaseObjectControllerMixin(object):
|
||||
def test_iter_nodes_local_first_moves_locals_first(self):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
self.app.write_affinity_is_local_fn = (
|
||||
policy_conf = self.app.get_policy_options(self.policy)
|
||||
policy_conf.write_affinity_is_local_fn = (
|
||||
lambda node: node['region'] == 1)
|
||||
# we'll write to one more than replica count local nodes
|
||||
self.app.write_affinity_node_count = lambda r: r + 1
|
||||
policy_conf.write_affinity_node_count = lambda r: r + 1
|
||||
|
||||
object_ring = self.policy.object_ring
|
||||
# make our fake ring have plenty of nodes, and not get limited
|
||||
@ -234,7 +236,7 @@ class BaseObjectControllerMixin(object):
|
||||
|
||||
# make sure we have enough local nodes (sanity)
|
||||
all_local_nodes = [n for n in all_nodes if
|
||||
self.app.write_affinity_is_local_fn(n)]
|
||||
policy_conf.write_affinity_is_local_fn(n)]
|
||||
self.assertGreaterEqual(len(all_local_nodes), self.replicas() + 1)
|
||||
|
||||
# finally, create the local_first_nodes iter and flatten it out
|
||||
@ -252,7 +254,8 @@ class BaseObjectControllerMixin(object):
|
||||
def test_iter_nodes_local_first_best_effort(self):
|
||||
controller = self.controller_cls(
|
||||
self.app, 'a', 'c', 'o')
|
||||
self.app.write_affinity_is_local_fn = (
|
||||
policy_conf = self.app.get_policy_options(self.policy)
|
||||
policy_conf.write_affinity_is_local_fn = (
|
||||
lambda node: node['region'] == 1)
|
||||
|
||||
object_ring = self.policy.object_ring
|
||||
@ -266,7 +269,7 @@ class BaseObjectControllerMixin(object):
|
||||
self.assertEqual(len(all_nodes), self.replicas() +
|
||||
POLICIES.default.object_ring.max_more_nodes)
|
||||
all_local_nodes = [n for n in all_nodes if
|
||||
self.app.write_affinity_is_local_fn(n)]
|
||||
policy_conf.write_affinity_is_local_fn(n)]
|
||||
self.assertEqual(len(all_local_nodes), self.replicas())
|
||||
# but the local nodes we do have are at the front of the local iter
|
||||
first_n_local_first_nodes = local_first_nodes[:len(all_local_nodes)]
|
||||
@ -575,6 +578,80 @@ class BaseObjectControllerMixin(object):
|
||||
|
||||
self.assertEqual(container_updates, expected)
|
||||
|
||||
def _check_write_affinity(
|
||||
self, conf, policy_conf, policy, affinity_regions, affinity_count):
|
||||
conf['policy_config'] = policy_conf
|
||||
app = PatchedObjControllerApp(
|
||||
conf, FakeMemcache(), account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), logger=self.logger)
|
||||
|
||||
controller = self.controller_cls(app, 'a', 'c', 'o')
|
||||
|
||||
object_ring = app.get_object_ring(int(policy))
|
||||
# make our fake ring have plenty of nodes, and not get limited
|
||||
# artificially by the proxy max request node count
|
||||
object_ring.max_more_nodes = 100
|
||||
|
||||
all_nodes = object_ring.get_part_nodes(1)
|
||||
all_nodes.extend(object_ring.get_more_nodes(1))
|
||||
|
||||
# make sure we have enough local nodes (sanity)
|
||||
all_local_nodes = [n for n in all_nodes if
|
||||
n['region'] in affinity_regions]
|
||||
self.assertGreaterEqual(len(all_local_nodes), affinity_count)
|
||||
|
||||
# finally, create the local_first_nodes iter and flatten it out
|
||||
local_first_nodes = list(controller.iter_nodes_local_first(
|
||||
object_ring, 1, policy))
|
||||
|
||||
# check that the required number of local nodes were moved up the order
|
||||
node_regions = [node['region'] for node in local_first_nodes]
|
||||
self.assertTrue(
|
||||
all(r in affinity_regions for r in node_regions[:affinity_count]),
|
||||
'Unexpected region found in local nodes, expected %s but got %s' %
|
||||
(affinity_regions, node_regions))
|
||||
return app
|
||||
|
||||
def test_write_affinity_not_configured(self):
|
||||
# default is no write affinity so expect both regions 0 and 1
|
||||
self._check_write_affinity({}, {}, POLICIES[0], [0, 1],
|
||||
2 * self.replicas(POLICIES[0]))
|
||||
self._check_write_affinity({}, {}, POLICIES[1], [0, 1],
|
||||
2 * self.replicas(POLICIES[1]))
|
||||
|
||||
def test_write_affinity_proxy_server_config(self):
|
||||
# without overrides policies use proxy-server config section options
|
||||
conf = {'write_affinity_node_count': '1 * replicas',
|
||||
'write_affinity': 'r0'}
|
||||
self._check_write_affinity(conf, {}, POLICIES[0], [0],
|
||||
self.replicas(POLICIES[0]))
|
||||
self._check_write_affinity(conf, {}, POLICIES[1], [0],
|
||||
self.replicas(POLICIES[1]))
|
||||
|
||||
def test_write_affinity_per_policy_config(self):
|
||||
# check only per-policy configuration is sufficient
|
||||
conf = {}
|
||||
policy_conf = {'0': {'write_affinity_node_count': '1 * replicas',
|
||||
'write_affinity': 'r1'},
|
||||
'1': {'write_affinity_node_count': '5',
|
||||
'write_affinity': 'r0'}}
|
||||
self._check_write_affinity(conf, policy_conf, POLICIES[0], [1],
|
||||
self.replicas(POLICIES[0]))
|
||||
self._check_write_affinity(conf, policy_conf, POLICIES[1], [0], 5)
|
||||
|
||||
def test_write_affinity_per_policy_config_overrides_and_inherits(self):
|
||||
# check per-policy config is preferred over proxy-server section config
|
||||
conf = {'write_affinity_node_count': '1 * replicas',
|
||||
'write_affinity': 'r0'}
|
||||
policy_conf = {'0': {'write_affinity': 'r1'},
|
||||
'1': {'write_affinity_node_count': '3 * replicas'}}
|
||||
# policy 0 inherits default node count, override affinity to r1
|
||||
self._check_write_affinity(conf, policy_conf, POLICIES[0], [1],
|
||||
self.replicas(POLICIES[0]))
|
||||
# policy 1 inherits default affinity to r0, overrides node count
|
||||
self._check_write_affinity(conf, policy_conf, POLICIES[1], [0],
|
||||
3 * self.replicas(POLICIES[1]))
|
||||
|
||||
# end of BaseObjectControllerMixin
|
||||
|
||||
|
||||
@ -843,7 +920,7 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
|
||||
|
||||
def test_PUT_connect_exceptions(self):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
self.app.sort_nodes = lambda n: n # disable shuffle
|
||||
self.app.sort_nodes = lambda n, *args, **kwargs: n # disable shuffle
|
||||
|
||||
def test_status_map(statuses, expected):
|
||||
self.app._error_limiting = {}
|
||||
|
@ -40,6 +40,7 @@ import re
|
||||
import random
|
||||
from collections import defaultdict
|
||||
import uuid
|
||||
from copy import deepcopy
|
||||
|
||||
import mock
|
||||
from eventlet import sleep, spawn, wsgi, Timeout, debug
|
||||
@ -67,7 +68,7 @@ from swift.common import utils, constraints
|
||||
from swift.common.utils import hash_path, storage_directory, \
|
||||
parse_content_type, parse_mime_headers, \
|
||||
iter_multipart_mime_documents, public, mkdirs, NullLogger
|
||||
from swift.common.wsgi import monkey_patch_mimetools, loadapp
|
||||
from swift.common.wsgi import monkey_patch_mimetools, loadapp, ConfigString
|
||||
from swift.proxy.controllers import base as proxy_base
|
||||
from swift.proxy.controllers.base import get_cache_key, cors_validation, \
|
||||
get_account_info, get_container_info
|
||||
@ -748,20 +749,156 @@ class TestProxyServer(unittest.TestCase):
|
||||
{'ip': '127.0.0.1'}]
|
||||
self.assertEqual(res, exp_sorting)
|
||||
|
||||
def test_node_affinity(self):
|
||||
baseapp = proxy_server.Application({'sorting_method': 'affinity',
|
||||
'read_affinity': 'r1=1'},
|
||||
def _do_sort_nodes(self, conf, policy_conf, nodes, policy,
|
||||
node_timings=None):
|
||||
# Note with shuffling mocked out, sort_nodes will by default return
|
||||
# nodes in the order they are given
|
||||
nodes = deepcopy(nodes)
|
||||
conf = deepcopy(conf)
|
||||
conf['policy_config'] = deepcopy(policy_conf)
|
||||
baseapp = proxy_server.Application(conf,
|
||||
FakeMemcache(),
|
||||
logger=FakeLogger(),
|
||||
container_ring=FakeRing(),
|
||||
account_ring=FakeRing())
|
||||
|
||||
nodes = [{'region': 2, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 1, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
if node_timings:
|
||||
for i, n in enumerate(nodes):
|
||||
baseapp.set_node_timing(n, node_timings[i])
|
||||
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
|
||||
app_sorted = baseapp.sort_nodes(nodes)
|
||||
exp_sorted = [{'region': 1, 'zone': 2, 'ip': '127.0.0.2'},
|
||||
{'region': 2, 'zone': 1, 'ip': '127.0.0.1'}]
|
||||
self.assertEqual(exp_sorted, app_sorted)
|
||||
app_sorted = baseapp.sort_nodes(nodes, policy)
|
||||
self.assertFalse(baseapp.logger.get_lines_for_level('warning'))
|
||||
return baseapp, app_sorted
|
||||
|
||||
def test_sort_nodes_default(self):
|
||||
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
|
||||
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
|
||||
# sanity check - no affinity conf results in node order unchanged
|
||||
app, actual = self._do_sort_nodes({}, {}, nodes, None)
|
||||
self.assertEqual(nodes, actual)
|
||||
|
||||
def test_sort_nodes_by_affinity_proxy_server_config(self):
|
||||
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
|
||||
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
|
||||
# proxy-server affinity conf is to prefer r2
|
||||
conf = {'sorting_method': 'affinity', 'read_affinity': 'r2=1'}
|
||||
app, actual = self._do_sort_nodes(conf, {}, nodes, None)
|
||||
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
|
||||
app, actual = self._do_sort_nodes(conf, {}, nodes, POLICIES[0])
|
||||
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
|
||||
# check that node timings are not collected if sorting_method != timing
|
||||
self.assertFalse(app.sorts_by_timing) # sanity check
|
||||
self.assertFalse(app.node_timings) # sanity check
|
||||
|
||||
# proxy-server affinity conf is to prefer region 1
|
||||
conf = {'sorting_method': 'affinity', 'read_affinity': 'r1=1'}
|
||||
app, actual = self._do_sort_nodes(conf, {}, nodes, None)
|
||||
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
|
||||
app, actual = self._do_sort_nodes(conf, {}, nodes, POLICIES[0])
|
||||
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
|
||||
def test_sort_nodes_by_affinity_per_policy(self):
|
||||
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.4'},
|
||||
{'region': 1, 'zone': 0, 'ip': '127.0.0.3'},
|
||||
{'region': 2, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 3, 'zone': 0, 'ip': '127.0.0.2'}]
|
||||
conf = {'sorting_method': 'affinity', 'read_affinity': 'r3=1'}
|
||||
per_policy = {'0': {'sorting_method': 'affinity',
|
||||
'read_affinity': 'r1=1'},
|
||||
'1': {'sorting_method': 'affinity',
|
||||
'read_affinity': 'r2=1'}}
|
||||
# policy 0 affinity prefers r1
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
|
||||
self.assertEqual([nodes[1], nodes[0], nodes[2], nodes[3]], actual)
|
||||
# policy 1 affinity prefers r2
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[1])
|
||||
self.assertEqual([nodes[2], nodes[0], nodes[1], nodes[3]], actual)
|
||||
# default affinity prefers r3
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None)
|
||||
self.assertEqual([nodes[3], nodes[0], nodes[1], nodes[2]], actual)
|
||||
|
||||
def test_sort_nodes_by_affinity_per_policy_with_no_default(self):
|
||||
# no proxy-server setting but policy 0 prefers r0
|
||||
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 0, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
conf = {}
|
||||
per_policy = {'0': {'sorting_method': 'affinity',
|
||||
'read_affinity': 'r0=0'}}
|
||||
# policy 0 uses affinity sorting
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
|
||||
self.assertEqual([nodes[1], nodes[0]], actual)
|
||||
# any other policy will use default sorting
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None)
|
||||
self.assertEqual(nodes, actual)
|
||||
|
||||
def test_sort_nodes_by_affinity_per_policy_inherits(self):
|
||||
# policy 0 has read_affinity but no sorting_method override,
|
||||
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 0, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
conf = {}
|
||||
per_policy = {'0': {'read_affinity': 'r0=0'}}
|
||||
# policy 0 uses the default sorting method instead of affinity sorting
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
|
||||
self.assertEqual(nodes, actual)
|
||||
# but if proxy-server sorting_method is affinity then policy 0 inherits
|
||||
conf = {'sorting_method': 'affinity'}
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
|
||||
self.assertEqual([nodes[1], nodes[0]], actual)
|
||||
|
||||
def test_sort_nodes_by_affinity_per_policy_overrides(self):
|
||||
# default setting is to sort by timing but policy 0 uses read affinity
|
||||
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
|
||||
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
node_timings = [10, 1, 100]
|
||||
conf = {'sorting_method': 'timing'}
|
||||
per_policy = {'0': {'sorting_method': 'affinity',
|
||||
'read_affinity': 'r1=1,r2=2'}}
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0],
|
||||
node_timings=node_timings)
|
||||
self.assertEqual([nodes[1], nodes[2], nodes[0]], actual)
|
||||
# check that timings are collected despite one policy using affinity
|
||||
self.assertTrue(app.sorts_by_timing)
|
||||
self.assertEqual(3, len(app.node_timings))
|
||||
# check app defaults to sorting by timing when no policy specified
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None,
|
||||
node_timings=node_timings)
|
||||
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
|
||||
def test_sort_nodes_by_timing_per_policy(self):
|
||||
# default setting is to sort by affinity but policy 0 uses timing
|
||||
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
|
||||
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
|
||||
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
|
||||
node_timings = [10, 1, 100]
|
||||
|
||||
conf = {'sorting_method': 'affinity', 'read_affinity': 'r1=1,r2=2'}
|
||||
per_policy = {'0': {'sorting_method': 'timing',
|
||||
'read_affinity': 'r1=1,r2=2'}, # should be ignored
|
||||
'1': {'read_affinity': 'r2=1'}}
|
||||
# policy 0 uses timing
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0],
|
||||
node_timings=node_timings)
|
||||
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
|
||||
self.assertTrue(app.sorts_by_timing)
|
||||
self.assertEqual(3, len(app.node_timings))
|
||||
|
||||
# policy 1 uses policy specific read affinity
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[1],
|
||||
node_timings=node_timings)
|
||||
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
|
||||
|
||||
# check that with no policy specified the default read affinity is used
|
||||
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None,
|
||||
node_timings=node_timings)
|
||||
self.assertEqual([nodes[1], nodes[2], nodes[0]], actual)
|
||||
|
||||
def test_node_concurrency(self):
|
||||
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1', 'port': 6010,
|
||||
@ -1141,6 +1278,468 @@ class TestProxyServerLoading(unittest.TestCase):
|
||||
self.assertTrue(policy.object_ring)
|
||||
|
||||
|
||||
@patch_policies()
|
||||
class TestProxyServerConfigLoading(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tempdir = mkdtemp()
|
||||
account_ring_path = os.path.join(self.tempdir, 'account.ring.gz')
|
||||
write_fake_ring(account_ring_path)
|
||||
container_ring_path = os.path.join(self.tempdir, 'container.ring.gz')
|
||||
write_fake_ring(container_ring_path)
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.tempdir)
|
||||
|
||||
def _write_conf(self, conf_body):
|
||||
# this is broken out to a method so that subclasses can override
|
||||
conf_path = os.path.join(self.tempdir, 'proxy-server.conf')
|
||||
with open(conf_path, 'w') as f:
|
||||
f.write(dedent(conf_body))
|
||||
return conf_path
|
||||
|
||||
def _write_conf_and_load_app(self, conf_sections):
|
||||
# write proxy-server.conf file, load app
|
||||
conf_body = """
|
||||
[DEFAULT]
|
||||
swift_dir = %s
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = proxy-server
|
||||
|
||||
%s
|
||||
""" % (self.tempdir, conf_sections)
|
||||
|
||||
conf_path = self._write_conf(conf_body)
|
||||
with mock.patch('swift.proxy.server.get_logger',
|
||||
return_value=FakeLogger()):
|
||||
app = loadapp(conf_path, allow_modify_pipeline=False)
|
||||
return app
|
||||
|
||||
def _check_policy_conf(self, app, exp_conf, exp_is_local):
|
||||
# verify expected config
|
||||
for policy, options in exp_conf.items():
|
||||
for k, v in options.items():
|
||||
actual = getattr(app.get_policy_options(policy), k)
|
||||
if k == "write_affinity_node_count":
|
||||
if policy: # this check only applies when using a policy
|
||||
actual = actual(policy.object_ring.replica_count)
|
||||
self.assertEqual(v, actual)
|
||||
continue
|
||||
self.assertEqual(v, actual,
|
||||
"Expected %s=%s but got %s=%s for policy %s" %
|
||||
(k, v, k, actual, policy))
|
||||
|
||||
for policy, nodes in exp_is_local.items():
|
||||
fn = app.get_policy_options(policy).write_affinity_is_local_fn
|
||||
if nodes is None:
|
||||
self.assertIsNone(fn)
|
||||
continue
|
||||
for node, expected_result in nodes:
|
||||
actual = fn(node)
|
||||
self.assertIs(expected_result, actual,
|
||||
"Expected %s but got %s for %s, policy %s" %
|
||||
(expected_result, actual, node, policy))
|
||||
return app
|
||||
|
||||
def test_per_policy_conf_none_configured(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
"""
|
||||
expected_default = {"read_affinity": "",
|
||||
"sorting_method": "shuffle",
|
||||
"write_affinity_node_count": 6}
|
||||
exp_conf = {None: expected_default,
|
||||
POLICIES[0]: expected_default,
|
||||
POLICIES[1]: expected_default}
|
||||
exp_is_local = {POLICIES[0]: None,
|
||||
POLICIES[1]: None}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, exp_is_local)
|
||||
|
||||
def test_per_policy_conf_one_configured(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=100
|
||||
write_affinity = r1
|
||||
write_affinity_node_count = 1 * replicas
|
||||
"""
|
||||
expected_default = {"read_affinity": "",
|
||||
"sorting_method": "shuffle",
|
||||
"write_affinity_node_count": 6}
|
||||
exp_conf = {None: expected_default,
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 3},
|
||||
POLICIES[1]: expected_default}
|
||||
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
|
||||
({'region': 2, 'zone': 1}, False)],
|
||||
POLICIES[1]: None}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, exp_is_local)
|
||||
|
||||
default_conf = app.get_policy_options(None)
|
||||
self.assertEqual(
|
||||
('sorting_method: shuffle, read_affinity: , write_affinity: , '
|
||||
'write_affinity_node_count: 2 * replicas'),
|
||||
repr(default_conf))
|
||||
policy_0_conf = app.get_policy_options(POLICIES[0])
|
||||
self.assertEqual(
|
||||
('sorting_method: affinity, read_affinity: r1=100, '
|
||||
'write_affinity: r1, write_affinity_node_count: 1 * replicas'),
|
||||
repr(policy_0_conf))
|
||||
policy_1_conf = app.get_policy_options(POLICIES[1])
|
||||
self.assertIs(default_conf, policy_1_conf)
|
||||
|
||||
def test_per_policy_conf_inherits_defaults(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = affinity
|
||||
write_affinity_node_count = 1 * replicas
|
||||
|
||||
[proxy-server:policy:0]
|
||||
read_affinity = r1=100
|
||||
write_affinity = r1
|
||||
"""
|
||||
expected_default = {"read_affinity": "",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 3}
|
||||
exp_conf = {None: expected_default,
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 3},
|
||||
POLICIES[1]: expected_default}
|
||||
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
|
||||
({'region': 2, 'zone': 1}, False)],
|
||||
POLICIES[1]: None}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, exp_is_local)
|
||||
|
||||
def test_per_policy_conf_overrides_default_affinity(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = affinity
|
||||
read_affinity = r2=10
|
||||
write_affinity_node_count = 1 * replicas
|
||||
write_affinity = r2
|
||||
|
||||
[proxy-server:policy:0]
|
||||
read_affinity = r1=100
|
||||
write_affinity = r1
|
||||
write_affinity_node_count = 5
|
||||
|
||||
[proxy-server:policy:1]
|
||||
read_affinity = r1=1
|
||||
write_affinity = r3
|
||||
write_affinity_node_count = 4
|
||||
"""
|
||||
exp_conf = {None: {"read_affinity": "r2=10",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 3},
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 5},
|
||||
POLICIES[1]: {"read_affinity": "r1=1",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 4}}
|
||||
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
|
||||
({'region': 2, 'zone': 1}, False)],
|
||||
POLICIES[1]: [({'region': 3, 'zone': 2}, True),
|
||||
({'region': 1, 'zone': 1}, False),
|
||||
({'region': 2, 'zone': 1}, False)]}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, exp_is_local)
|
||||
|
||||
def test_per_policy_conf_overrides_default_sorting_method(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = timing
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=100
|
||||
|
||||
[proxy-server:policy:1]
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=1
|
||||
"""
|
||||
exp_conf = {None: {"read_affinity": "",
|
||||
"sorting_method": "timing"},
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "affinity"},
|
||||
POLICIES[1]: {"read_affinity": "r1=1",
|
||||
"sorting_method": "affinity"}}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, {})
|
||||
|
||||
def test_per_policy_conf_with_DEFAULT_options(self):
|
||||
conf_body = """
|
||||
[DEFAULT]
|
||||
write_affinity = r0
|
||||
read_affinity = r0=100
|
||||
swift_dir = %s
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = proxy-server
|
||||
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
# in a paste-deploy section, DEFAULT section value overrides
|
||||
write_affinity = r2
|
||||
# ...but the use of 'set' overrides the DEFAULT section value
|
||||
set read_affinity = r1=100
|
||||
|
||||
[proxy-server:policy:0]
|
||||
# not a paste-deploy section so any value here overrides DEFAULT
|
||||
sorting_method = affinity
|
||||
write_affinity = r2
|
||||
read_affinity = r2=100
|
||||
|
||||
[proxy-server:policy:1]
|
||||
sorting_method = affinity
|
||||
""" % self.tempdir
|
||||
|
||||
conf_path = self._write_conf(conf_body)
|
||||
with mock.patch('swift.proxy.server.get_logger',
|
||||
return_value=FakeLogger()):
|
||||
app = loadapp(conf_path, allow_modify_pipeline=False)
|
||||
|
||||
exp_conf = {
|
||||
# default read_affinity is r1, set in proxy-server section
|
||||
None: {"read_affinity": "r1=100",
|
||||
"sorting_method": "shuffle",
|
||||
"write_affinity_node_count": 6},
|
||||
# policy 0 read affinity is r2, dictated by policy 0 section
|
||||
POLICIES[0]: {"read_affinity": "r2=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 6},
|
||||
# policy 1 read_affinity is r0, dictated by DEFAULT section,
|
||||
# overrides proxy server section
|
||||
POLICIES[1]: {"read_affinity": "r0=100",
|
||||
"sorting_method": "affinity",
|
||||
"write_affinity_node_count": 6}}
|
||||
exp_is_local = {
|
||||
# default write_affinity is r0, dictated by DEFAULT section
|
||||
None: [({'region': 0, 'zone': 2}, True),
|
||||
({'region': 1, 'zone': 1}, False)],
|
||||
# policy 0 write_affinity is r2, dictated by policy 0 section
|
||||
POLICIES[0]: [({'region': 0, 'zone': 2}, False),
|
||||
({'region': 2, 'zone': 1}, True)],
|
||||
# policy 1 write_affinity is r0, inherited from default
|
||||
POLICIES[1]: [({'region': 0, 'zone': 2}, True),
|
||||
({'region': 1, 'zone': 1}, False)]}
|
||||
self._check_policy_conf(app, exp_conf, exp_is_local)
|
||||
|
||||
def test_per_policy_conf_warns_about_sorting_method_mismatch(self):
|
||||
# verify that policy specific warnings are emitted when read_affinity
|
||||
# is set but sorting_method is not affinity
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
read_affinity = r2=10
|
||||
sorting_method = timing
|
||||
|
||||
[proxy-server:policy:0]
|
||||
read_affinity = r1=100
|
||||
|
||||
[proxy-server:policy:1]
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=1
|
||||
"""
|
||||
exp_conf = {None: {"read_affinity": "r2=10",
|
||||
"sorting_method": "timing"},
|
||||
POLICIES[0]: {"read_affinity": "r1=100",
|
||||
"sorting_method": "timing"},
|
||||
POLICIES[1]: {"read_affinity": "r1=1",
|
||||
"sorting_method": "affinity"}}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, {})
|
||||
lines = app.logger.get_lines_for_level('warning')
|
||||
scopes = {'default', 'policy 0 (nulo)'}
|
||||
for line in lines[:2]:
|
||||
self.assertIn(
|
||||
"sorting_method is set to 'timing', not 'affinity'", line)
|
||||
for scope in scopes:
|
||||
if scope in line:
|
||||
scopes.remove(scope)
|
||||
break
|
||||
else:
|
||||
self.fail("None of %s found in warning: %r" % (scopes, line))
|
||||
self.assertFalse(scopes)
|
||||
|
||||
def test_per_policy_conf_with_unknown_policy(self):
|
||||
# verify that unknown policy section is warned about but doesn't break
|
||||
# other policy configs
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
read_affinity = r2=10
|
||||
sorting_method = affinity
|
||||
|
||||
[proxy-server:policy:999]
|
||||
read_affinity = r2z1=1
|
||||
"""
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn('No policy found for override config, index: 999',
|
||||
cm.exception.message)
|
||||
|
||||
def test_per_policy_conf_sets_timing_sorting_method(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = affinity
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = timing
|
||||
|
||||
[proxy-server:policy:1]
|
||||
read_affinity = r1=1
|
||||
"""
|
||||
exp_conf = {None: {"read_affinity": "",
|
||||
"sorting_method": "affinity"},
|
||||
POLICIES[0]: {"read_affinity": "",
|
||||
"sorting_method": "timing"},
|
||||
POLICIES[1]: {"read_affinity": "r1=1",
|
||||
"sorting_method": "affinity"}}
|
||||
app = self._write_conf_and_load_app(conf_sections)
|
||||
self._check_policy_conf(app, exp_conf, {})
|
||||
|
||||
def test_per_policy_conf_invalid_read_affinity_value(self):
|
||||
def do_test(conf_sections, scope):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn('broken', cm.exception.message)
|
||||
self.assertIn(
|
||||
'Invalid read_affinity value:', cm.exception.message)
|
||||
self.assertIn(scope, cm.exception.message)
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=1
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
read_affinity = broken
|
||||
"""
|
||||
do_test(conf_sections, 'policy 0 (nulo)')
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
sorting_method = affinity
|
||||
read_affinity = broken
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
read_affinity = r1=1
|
||||
"""
|
||||
do_test(conf_sections, '(default)')
|
||||
|
||||
def test_per_policy_conf_invalid_write_affinity_value(self):
|
||||
def do_test(conf_sections, scope):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn('broken', cm.exception.message)
|
||||
self.assertIn(
|
||||
'Invalid write_affinity value:', cm.exception.message)
|
||||
self.assertIn(scope, cm.exception.message)
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
write_affinity = r1
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
write_affinity = broken
|
||||
"""
|
||||
do_test(conf_sections, 'policy 0 (nulo)')
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
write_affinity = broken
|
||||
|
||||
[proxy-server:policy:0]
|
||||
write_affinity = r1
|
||||
"""
|
||||
do_test(conf_sections, '(default)')
|
||||
|
||||
def test_per_policy_conf_invalid_write_affinity_node_count_value(self):
|
||||
def do_test(conf_sections, scope):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn('2* replicas', cm.exception.message)
|
||||
self.assertIn('Invalid write_affinity_node_count value:',
|
||||
cm.exception.message)
|
||||
self.assertIn(scope, cm.exception.message)
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
write_affinity_node_count = 2 * replicas
|
||||
|
||||
[proxy-server:policy:0]
|
||||
sorting_method = affinity
|
||||
write_affinity_node_count = 2* replicas
|
||||
"""
|
||||
do_test(conf_sections, 'policy 0 (nulo)')
|
||||
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
write_affinity_node_count = 2* replicas
|
||||
|
||||
[proxy-server:policy:0]
|
||||
write_affinity_node_count = 2 * replicas
|
||||
"""
|
||||
do_test(conf_sections, '(default)')
|
||||
|
||||
def test_per_policy_conf_bad_section_name(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
|
||||
[proxy-server:policy:]
|
||||
"""
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn("Override config must refer to policy index: ''",
|
||||
cm.exception.message)
|
||||
|
||||
def test_per_policy_conf_section_name_not_index(self):
|
||||
conf_sections = """
|
||||
[app:proxy-server]
|
||||
use = egg:swift#proxy
|
||||
|
||||
[proxy-server:policy:uno]
|
||||
"""
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self._write_conf_and_load_app(conf_sections)
|
||||
self.assertIn("Override config must refer to policy index: 'uno'",
|
||||
cm.exception.message)
|
||||
|
||||
|
||||
class TestProxyServerConfigStringLoading(TestProxyServerConfigLoading):
|
||||
# The proxy may be loaded from a conf string rather than a conf file, for
|
||||
# example when ContainerSync creates an InternalClient from a default
|
||||
# config string. So repeat super-class tests using a string loader.
|
||||
def _write_conf(self, conf_body):
|
||||
# this is broken out to a method so that subclasses can override
|
||||
return ConfigString(conf_body)
|
||||
|
||||
|
||||
class BaseTestObjectController(object):
|
||||
"""
|
||||
A root of TestObjController that implements helper methods for child
|
||||
@ -1953,7 +2552,8 @@ class TestReplicatedObjectController(
|
||||
self.assertEqual(test_errors, [])
|
||||
self.assertTrue(res.status.startswith('201 '))
|
||||
|
||||
def test_PUT_respects_write_affinity(self):
|
||||
def _check_PUT_respects_write_affinity(self, conf, policy,
|
||||
expected_region):
|
||||
written_to = []
|
||||
|
||||
def test_connect(ipaddr, port, device, partition, method, path,
|
||||
@ -1961,33 +2561,65 @@ class TestReplicatedObjectController(
|
||||
if path == '/a/c/o.jpg':
|
||||
written_to.append((ipaddr, port, device))
|
||||
|
||||
# mock shuffle to be a no-op to ensure that the only way nodes would
|
||||
# not be used in ring order is if affinity is respected.
|
||||
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
|
||||
app = proxy_server.Application(
|
||||
conf, FakeMemcache(),
|
||||
logger=debug_logger('proxy-ut'),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
with save_globals():
|
||||
def is_r0(node):
|
||||
return node['region'] == 0
|
||||
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
object_ring = app.get_object_ring(policy)
|
||||
object_ring.max_more_nodes = 100
|
||||
self.app.write_affinity_is_local_fn = is_r0
|
||||
self.app.write_affinity_node_count = lambda r: 3
|
||||
|
||||
controller = \
|
||||
ReplicatedObjectController(
|
||||
self.app, 'a', 'c', 'o.jpg')
|
||||
app, 'a', 'c', 'o.jpg')
|
||||
# requests go to acc, con, obj, obj, obj
|
||||
set_http_connect(200, 200, 201, 201, 201,
|
||||
give_connect=test_connect)
|
||||
req = Request.blank('/v1/a/c/o.jpg', {})
|
||||
req.content_length = 1
|
||||
req.body = 'a'
|
||||
self.app.memcache.store = {}
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o.jpg', method='PUT', body='a',
|
||||
headers={'X-Backend-Storage-Policy-Index': str(policy)})
|
||||
app.memcache.store = {}
|
||||
res = controller.PUT(req)
|
||||
self.assertTrue(res.status.startswith('201 '))
|
||||
|
||||
self.assertEqual(3, len(written_to))
|
||||
for ip, port, device in written_to:
|
||||
# this is kind of a hokey test, but in FakeRing, the port is even
|
||||
# when the region is 0, and odd when the region is 1, so this test
|
||||
# asserts that we only wrote to nodes in region 0.
|
||||
self.assertEqual(0, port % 2)
|
||||
self.assertEqual(expected_region, port % 2)
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
|
||||
def test_PUT_respects_write_affinity(self):
|
||||
# nodes in fake ring order have r0z0, r1z1, r0z2
|
||||
# Check default conf via proxy server conf
|
||||
conf = {'write_affinity': 'r0'}
|
||||
self._check_PUT_respects_write_affinity(conf, 0, 0)
|
||||
|
||||
# policy 0 and policy 1 have conf via per policy conf section
|
||||
conf = {
|
||||
'write_affinity': '',
|
||||
'policy_config': {
|
||||
'0': {'write_affinity': 'r0'},
|
||||
'1': {'write_affinity': 'r1'}
|
||||
}
|
||||
}
|
||||
self._check_PUT_respects_write_affinity(conf, 0, 0)
|
||||
self._check_PUT_respects_write_affinity(conf, 1, 1)
|
||||
|
||||
# policy 0 conf via per policy conf section override proxy server conf,
|
||||
# policy 1 uses default
|
||||
conf = {
|
||||
'write_affinity': 'r0',
|
||||
'policy_config': {
|
||||
'0': {'write_affinity': 'r1'}
|
||||
}
|
||||
}
|
||||
self._check_PUT_respects_write_affinity(conf, 0, 1)
|
||||
self._check_PUT_respects_write_affinity(conf, 1, 0)
|
||||
|
||||
def test_PUT_respects_write_affinity_with_507s(self):
|
||||
written_to = []
|
||||
@ -2001,10 +2633,11 @@ class TestReplicatedObjectController(
|
||||
def is_r0(node):
|
||||
return node['region'] == 0
|
||||
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
object_ring = self.app.get_object_ring(0)
|
||||
object_ring.max_more_nodes = 100
|
||||
self.app.write_affinity_is_local_fn = is_r0
|
||||
self.app.write_affinity_node_count = lambda r: 3
|
||||
policy_conf = self.app.get_policy_options(POLICIES[0])
|
||||
policy_conf.write_affinity_is_local_fn = is_r0
|
||||
policy_conf.write_affinity_node_count = lambda r: 3
|
||||
|
||||
controller = \
|
||||
ReplicatedObjectController(
|
||||
@ -2500,7 +3133,7 @@ class TestReplicatedObjectController(
|
||||
# reset the router post patch_policies
|
||||
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
|
||||
self.app.object_post_as_copy = False
|
||||
self.app.sort_nodes = lambda nodes: nodes
|
||||
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
|
||||
backend_requests = []
|
||||
|
||||
def capture_requests(ip, port, method, path, headers, *args,
|
||||
@ -3194,10 +3827,11 @@ class TestReplicatedObjectController(
|
||||
for node in self.app.iter_nodes(object_ring, 0):
|
||||
pass
|
||||
sort_nodes.assert_called_once_with(
|
||||
object_ring.get_part_nodes(0))
|
||||
object_ring.get_part_nodes(0), policy=None)
|
||||
|
||||
def test_iter_nodes_skips_error_limited(self):
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
first_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
second_nodes = list(self.app.iter_nodes(object_ring, 0))
|
||||
@ -3209,7 +3843,8 @@ class TestReplicatedObjectController(
|
||||
|
||||
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
|
||||
with mock.patch.object(self.app, 'sort_nodes',
|
||||
lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 6), \
|
||||
mock.patch.object(object_ring, 'max_more_nodes', 99):
|
||||
@ -3226,14 +3861,14 @@ class TestReplicatedObjectController(
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D')
|
||||
for n in range(10)]
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 3):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0,
|
||||
node_iter=iter(node_list)))
|
||||
self.assertEqual(node_list[:3], got_nodes)
|
||||
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
|
||||
with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
lambda r: 1000000):
|
||||
got_nodes = list(self.app.iter_nodes(object_ring, 0,
|
||||
@ -3300,7 +3935,7 @@ class TestReplicatedObjectController(
|
||||
with save_globals():
|
||||
controller = ReplicatedObjectController(
|
||||
self.app, 'account', 'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
controller.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
|
||||
200)
|
||||
@ -3339,7 +3974,7 @@ class TestReplicatedObjectController(
|
||||
with save_globals():
|
||||
controller = ReplicatedObjectController(
|
||||
self.app, 'account', 'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
controller.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
|
||||
200)
|
||||
@ -3368,7 +4003,7 @@ class TestReplicatedObjectController(
|
||||
with save_globals():
|
||||
controller = ReplicatedObjectController(
|
||||
self.app, 'account', 'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
controller.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
# acc con obj obj obj
|
||||
self.assert_status_map(controller.PUT, (200, 200, 503, 200, 200),
|
||||
@ -3388,7 +4023,7 @@ class TestReplicatedObjectController(
|
||||
with save_globals():
|
||||
controller = ReplicatedObjectController(
|
||||
self.app, 'account', 'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
controller.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
# acc con obj obj obj
|
||||
self.assert_status_map(controller.PUT, (200, 200, 200, 200, 503),
|
||||
@ -4021,6 +4656,78 @@ class TestReplicatedObjectController(
|
||||
controller.GET(req)
|
||||
self.assertTrue(called[0])
|
||||
|
||||
def _check_GET_respects_read_affinity(self, conf, policy, expected_nodes):
|
||||
actual_nodes = []
|
||||
|
||||
def test_connect(ipaddr, port, device, partition, method, path,
|
||||
headers=None, query_string=None):
|
||||
if path == '/a/c/o.jpg':
|
||||
actual_nodes.append(ipaddr)
|
||||
|
||||
# mock shuffle to be a no-op to ensure that the only way nodes would
|
||||
# not be used in ring order is if affinity is respected.
|
||||
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
|
||||
app = proxy_server.Application(
|
||||
conf, FakeMemcache(),
|
||||
logger=debug_logger('proxy-ut'),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
with save_globals():
|
||||
object_ring = app.get_object_ring(policy)
|
||||
object_ring.max_more_nodes = 100
|
||||
controller = \
|
||||
ReplicatedObjectController(
|
||||
app, 'a', 'c', 'o.jpg')
|
||||
# requests go to acc, con, obj, obj, obj
|
||||
set_http_connect(200, 200, 404, 404, 200,
|
||||
give_connect=test_connect)
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o.jpg',
|
||||
headers={'X-Backend-Storage-Policy-Index': str(policy)})
|
||||
app.memcache.store = {}
|
||||
res = controller.GET(req)
|
||||
self.assertTrue(res.status.startswith('200 '))
|
||||
self.assertEqual(3, len(actual_nodes))
|
||||
self.assertEqual(expected_nodes, actual_nodes)
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
|
||||
def test_GET_respects_read_affinity(self):
|
||||
# nodes in fake ring order have r0z0, r1z1, r0z2
|
||||
# Check default conf via proxy server conf
|
||||
conf = {'read_affinity': 'r0z2=1, r1=2',
|
||||
'sorting_method': 'affinity'}
|
||||
expected_nodes = ['10.0.0.2', '10.0.0.1', '10.0.0.0']
|
||||
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
|
||||
|
||||
# policy 0 and policy 1 have conf via per policy conf section
|
||||
conf = {
|
||||
'read_affinity': '',
|
||||
'sorting_method': 'shuffle',
|
||||
'policy_config': {
|
||||
'0': {'read_affinity': 'r1z1=1, r0z2=2',
|
||||
'sorting_method': 'affinity'},
|
||||
'1': {'read_affinity': 'r0z2=1, r0z0=2',
|
||||
'sorting_method': 'affinity'}
|
||||
}
|
||||
}
|
||||
expected_nodes = ['10.0.0.1', '10.0.0.2', '10.0.0.0']
|
||||
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
|
||||
expected_nodes = ['10.0.0.2', '10.0.0.0', '10.0.0.1']
|
||||
self._check_GET_respects_read_affinity(conf, 1, expected_nodes)
|
||||
|
||||
# policy 0 conf via per policy conf section overrides proxy server conf
|
||||
conf = {
|
||||
'read_affinity': 'r1z1=1, r0z2=2',
|
||||
'sorting_method': 'affinity',
|
||||
'policy_config': {
|
||||
'0': {'read_affinity': 'r0z2=1, r0=2',
|
||||
'sorting_method': 'affinity'}
|
||||
}
|
||||
}
|
||||
expected_nodes = ['10.0.0.2', '10.0.0.0', '10.0.0.1']
|
||||
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
|
||||
|
||||
def test_HEAD_calls_authorize(self):
|
||||
called = [False]
|
||||
|
||||
@ -7182,7 +7889,7 @@ class TestContainerController(unittest.TestCase):
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
container_ring = controller.app.container_ring
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
controller.app.sort_nodes = lambda l, *args, **kwargs: l
|
||||
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
|
||||
missing_container=False)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user