Implementation of replication servers

Support separate replication ip address:
- Added new function in utils. This function provides ability
  to select separate IP address for replication service.
- Db_replicator and object replicators were changed.
  Replication process uses new function now.

Replication network parameters:
- Replication network fields (replication_ip, replication_port)
  support was added to device dictionary in swift-ring-builder script.
- Changes were made to support new fields in search, show and set_info
  functions.

Implementation of replication servers:
- Separate replication servers use the same code as normal replication
  servers, but with replication_server parameter = True.  When using a
  separate replication network, the non-replication servers set
  replication_server = False.  When there is no separate replication
  network (the default case), replication_server is not included in the config.

DocImpact
Change-Id: Ie9af5bdcdf9241c355e36053ca4adfe49dc35bd0
Implements: blueprint dedicated-replication-network
This commit is contained in:
Sergey Kraynev 2012-12-17 06:39:25 -05:00
parent 884b7bb8f8
commit ea7858176b
29 changed files with 1640 additions and 99 deletions

View File

@ -42,9 +42,11 @@ def format_device(dev):
Format a device for display.
"""
copy_dev = dev.copy()
if ':' in copy_dev['ip']:
copy_dev['ip'] = '[' + copy_dev['ip'] + ']'
return ('d%(id)sr%(region)sz%(zone)s-%(ip)s:%(port)s/%(device)s_'
for key in ('ip', 'replication_ip'):
if ':' in copy_dev[key]:
copy_dev[key] = '[' + copy_dev[key] + ']'
return ('d%(id)sr%(region)sz%(zone)s-%(ip)s:%(port)sR'
'%(replication_ip)s:%(replication_port)s/%(device)s_'
'"%(meta)s"' % copy_dev)
@ -102,8 +104,9 @@ swift-ring-builder <builder_file>
print 'The minimum number of hours before a partition can be ' \
'reassigned is %s' % builder.min_part_hours
if builder.devs:
print 'Devices: id region zone ip address port' \
' name weight partitions balance meta'
print 'Devices: id region zone ip address port ' \
'replication ip replication port name ' \
'weight partitions balance meta'
weighted_parts = builder.parts * builder.replicas / \
sum(d['weight'] for d in builder.devs if d is not None)
for dev in builder.devs:
@ -117,11 +120,12 @@ swift-ring-builder <builder_file>
else:
balance = 100.0 * dev['parts'] / \
(dev['weight'] * weighted_parts) - 100.0
print(' %5d %7d %5d %15s %5d %9s %6.02f %10s'
'%7.02f %s' %
print(' %5d %7d %5d %15s %5d %15s %17d %9s %6.02f '
'%10s %7.02f %s' %
(dev['id'], dev['region'], dev['zone'], dev['ip'],
dev['port'], dev['device'], dev['weight'], dev['parts'],
balance, dev['meta']))
dev['port'], dev['replication_ip'],
dev['replication_port'], dev['device'], dev['weight'],
dev['parts'], balance, dev['meta']))
exit(EXIT_SUCCESS)
def search():
@ -138,8 +142,9 @@ swift-ring-builder <builder_file> search <search-value>
if not devs:
print 'No matching devices found'
exit(EXIT_ERROR)
print 'Devices: id region zone ip address port name ' \
'weight partitions balance meta'
print 'Devices: id region zone ip address port ' \
'replication ip replication port name weight partitions ' \
'balance meta'
weighted_parts = builder.parts * builder.replicas / \
sum(d['weight'] for d in builder.devs if d is not None)
for dev in devs:
@ -151,10 +156,12 @@ swift-ring-builder <builder_file> search <search-value>
else:
balance = 100.0 * dev['parts'] / \
(dev['weight'] * weighted_parts) - 100.0
print(' %5d %7d %5d %15s %5d %9s %6.02f %10s %7.02f %s' %
print(' %5d %7d %5d %15s %5d %15s %17d %9s %6.02f %10s '
'%7.02f %s' %
(dev['id'], dev['region'], dev['zone'], dev['ip'],
dev['port'], dev['device'], dev['weight'], dev['parts'],
balance, dev['meta']))
dev['port'], dev['replication_ip'], dev['replication_port'],
dev['device'], dev['weight'], dev['parts'], balance,
dev['meta']))
exit(EXIT_SUCCESS)
def list_parts():
@ -195,8 +202,12 @@ swift-ring-builder <builder_file> list_parts <search-value> [<search-value>] ..
def add():
"""
swift-ring-builder <builder_file> add
[r<region>]z<zone>-<ip>:<port>/<device_name>_<meta> <weight>
[[r<region>]z<zone>-<ip>:<port>/<device_name>_<meta> <weight>] ...
[r<region>]z<zone>-<ip>:<port>[R<r_ip>:<r_port>]/<device_name>_<meta>
<weight>
[[r<region>]z<zone>-<ip>:<port>[R<r_ip>:<r_port>]/<device_name>_<meta>
<weight>] ...
Where <r_ip> and <r_port> are replication ip and port.
Adds devices to the ring with the given information. No partitions will be
assigned to the new device until after running 'rebalance'. This is so you
@ -258,6 +269,33 @@ swift-ring-builder <builder_file> add
port = int(rest[1:i])
rest = rest[i:]
replication_ip = ip
replication_port = port
if rest.startswith('R'):
i = 1
if rest[i] == '[':
i += 1
while i < len(rest) and rest[i] != ']':
i += 1
i += 1
replication_ip = rest[1:i].lstrip('[').rstrip(']')
rest = rest[i:]
else:
while i < len(rest) and rest[i] in '0123456789.':
i += 1
replication_ip = rest[1:i]
rest = rest[i:]
if not rest.startswith(':'):
print 'Invalid add value: %s' % devstr
print "The on-disk ring builder is unchanged.\n"
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i].isdigit():
i += 1
replication_port = int(rest[1:i])
rest = rest[i:]
if not rest.startswith('/'):
print 'Invalid add value: %s' % devstr
print "The on-disk ring builder is unchanged.\n"
@ -295,8 +333,10 @@ swift-ring-builder <builder_file> add
exit(EXIT_ERROR)
builder.add_dev({'region': region, 'zone': zone, 'ip': ip,
'port': port, 'device': device_name,
'weight': weight, 'meta': meta})
'port': port, 'replication_ip': replication_ip,
'replication_port': replication_port,
'device': device_name, 'weight': weight,
'meta': meta})
new_dev = builder.search_devs(
'r%dz%d-%s:%s/%s' %
(region, zone, ip, port, device_name))[0]
@ -348,8 +388,10 @@ swift-ring-builder <builder_file> set_weight <search-value> <weight>
def set_info():
"""
swift-ring-builder <builder_file> set_info
<search-value> <ip>:<port>/<device_name>_<meta>
[<search-value> <ip>:<port>/<device_name>_<meta>] ...
<search-value> <ip>:<port>[R<r_ip>:<r_port>]/<device_name>_<meta>
[<search-value> <ip>:<port>[R<r_ip>:<r_port>]/<device_name>_<meta>] ...
Where <r_ip> and <r_port> are replication ip and port.
For each search-value, resets the matched device's information.
This information isn't used to assign partitions, so you can use
@ -391,6 +433,29 @@ swift-ring-builder <builder_file> set_info
i += 1
change.append(('port', int(change_value[1:i])))
change_value = change_value[i:]
if change_value.startswith('R'):
change_value = change_value[1:]
if len(change_value) and change_value[0].isdigit():
i = 1
while (i < len(change_value) and
change_value[i] in '0123456789.'):
i += 1
change.append(('replication_ip', change_value[:i]))
change_value = change_value[i:]
elif len(change_value) and change_value[0] == '[':
i = 1
while i < len(change_value) and change_value[i] != ']':
i += 1
i += 1
change.append(('replication_ip',
change_value[:i].lstrip('[').rstrip(']')))
change_value = change_value[i:]
if change_value.startswith(':'):
i = 1
while i < len(change_value) and change_value[i].isdigit():
i += 1
change.append(('replication_port', int(change_value[1:i])))
change_value = change_value[i:]
if change_value.startswith('/'):
i = 1
while i < len(change_value) and change_value[i] != '_':

View File

@ -76,6 +76,7 @@ Administrator Documentation
howto_installmultinode
deployment_guide
admin_guide
replication_network
Source Documentation
====================

View File

@ -2,39 +2,102 @@
Replication
===========
Since each replica in swift functions independently, and clients generally require only a simple majority of nodes responding to consider an operation successful, transient failures like network partitions can quickly cause replicas to diverge. These differences are eventually reconciled by asynchronous, peer-to-peer replicator processes. The replicator processes traverse their local filesystems, concurrently performing operations in a manner that balances load across physical disks.
Because each replica in swift functions independently, and clients generally
require only a simple majority of nodes responding to consider an operation
successful, transient failures like network partitions can quickly cause
replicas to diverge. These differences are eventually reconciled by
asynchronous, peer-to-peer replicator processes. The replicator processes
traverse their local filesystems, concurrently performing operations in a
manner that balances load across physical disks.
Replication uses a push model, with records and files generally only being copied from local to remote replicas. This is important because data on the node may not belong there (as in the case of handoffs and ring changes), and a replicator can't know what data exists elsewhere in the cluster that it should pull in. It's the duty of any node that contains data to ensure that data gets to where it belongs. Replica placement is handled by the ring.
Replication uses a push model, with records and files generally only being
copied from local to remote replicas. This is important because data on the
node may not belong there (as in the case of handoffs and ring changes), and a
replicator can't know what data exists elsewhere in the cluster that it should
pull in. It's the duty of any node that contains data to ensure that data gets
to where it belongs. Replica placement is handled by the ring.
Every deleted record or file in the system is marked by a tombstone, so that deletions can be replicated alongside creations. These tombstones are cleaned up by the replication process after a period of time referred to as the consistency window, which is related to replication duration and how long transient failures can remove a node from the cluster. Tombstone cleanup must be tied to replication to reach replica convergence.
Every deleted record or file in the system is marked by a tombstone, so that
deletions can be replicated alongside creations. The replication process cleans
up tombstones after a time period known as the consistency window.
The consistency window encompasses replication duration and how long transient
failure can remove a node from the cluster. Tombstone cleanup must
be tied to replication to reach replica convergence.
If a replicator detects that a remote drive has failed, it will use the ring's "get_more_nodes" interface to choose an alternate node to synchronize with. The replicator can maintain desired levels of replication in the face of disk failures, though some replicas may not be in an immediately usable location. Note that the replicator doesn't maintain desired levels of replication in the case of other failures (e.g. entire node failures) because the most of such failures are transient.
If a replicator detects that a remote drive has failed, the replicator uses
the get_more_nodes interface for the ring to choose an alternate node with
which to synchronize. The replicator can maintain desired levels of replication
in the face of disk failures, though some replicas may not be in an immediately
usable location. Note that the replicator doesn't maintain desired levels of
replication when other failures, such as entire node failures, occur because
most failure are transient.
Replication is an area of active development, and likely rife with potential improvements to speed and correctness.
There are two major classes of replicator - the db replicator, which replicates accounts and containers, and the object replicator, which replicates object data.
Replication is an area of active development, and likely rife with potential
improvements to speed and correctness.
There are two major classes of replicator - the db replicator, which
replicates accounts and containers, and the object replicator, which
replicates object data.
--------------
DB Replication
--------------
The first step performed by db replication is a low-cost hash comparison to find out whether or not two replicas already match. Under normal operation, this check is able to verify that most databases in the system are already synchronized very quickly. If the hashes differ, the replicator brings the databases in sync by sharing records added since the last sync point.
The first step performed by db replication is a low-cost hash comparison to
determine whether two replicas already match. Under normal operation,
this check is able to verify that most databases in the system are already
synchronized very quickly. If the hashes differ, the replicator brings the
databases in sync by sharing records added since the last sync point.
This sync point is a high water mark noting the last record at which two databases were known to be in sync, and is stored in each database as a tuple of the remote database id and record id. Database ids are unique amongst all replicas of the database, and record ids are monotonically increasing integers. After all new records have been pushed to the remote database, the entire sync table of the local database is pushed, so the remote database knows it's now in sync with everyone the local database has previously synchronized with.
This sync point is a high water mark noting the last record at which two
databases were known to be in sync, and is stored in each database as a tuple
of the remote database id and record id. Database ids are unique amongst all
replicas of the database, and record ids are monotonically increasing
integers. After all new records have been pushed to the remote database, the
entire sync table of the local database is pushed, so the remote database
can guarantee that it is in sync with everything with which the local database
has previously synchronized.
If a replica is found to be missing entirely, the whole local database file is transmitted to the peer using rsync(1) and vested with a new unique id.
If a replica is found to be missing entirely, the whole local database file is
transmitted to the peer using rsync(1) and vested with a new unique id.
In practice, DB replication can process hundreds of databases per concurrency setting per second (up to the number of available CPUs or disks) and is bound by the number of DB transactions that must be performed.
In practice, DB replication can process hundreds of databases per concurrency
setting per second (up to the number of available CPUs or disks) and is bound
by the number of DB transactions that must be performed.
------------------
Object Replication
------------------
The initial implementation of object replication simply performed an rsync to push data from a local partition to all remote servers it was expected to exist on. While this performed adequately at small scale, replication times skyrocketed once directory structures could no longer be held in RAM. We now use a modification of this scheme in which a hash of the contents for each suffix directory is saved to a per-partition hashes file. The hash for a suffix directory is invalidated when the contents of that suffix directory are modified.
The initial implementation of object replication simply performed an rsync to
push data from a local partition to all remote servers it was expected to
exist on. While this performed adequately at small scale, replication times
skyrocketed once directory structures could no longer be held in RAM. We now
use a modification of this scheme in which a hash of the contents for each
suffix directory is saved to a per-partition hashes file. The hash for a
suffix directory is invalidated when the contents of that suffix directory are
modified.
The object replication process reads in these hash files, calculating any invalidated hashes. It then transmits the hashes to each remote server that should hold the partition, and only suffix directories with differing hashes on the remote server are rsynced. After pushing files to the remote server, the replication process notifies it to recalculate hashes for the rsynced suffix directories.
The object replication process reads in these hash files, calculating any
invalidated hashes. It then transmits the hashes to each remote server that
should hold the partition, and only suffix directories with differing hashes
on the remote server are rsynced. After pushing files to the remote server,
the replication process notifies it to recalculate hashes for the rsynced
suffix directories.
Performance of object replication is generally bound by the number of uncached directories it has to traverse, usually as a result of invalidated suffix directory hashes. Using write volume and partition counts from our running systems, it was designed so that around 2% of the hash space on a normal node will be invalidated per day, which has experimentally given us acceptable replication speeds.
Performance of object replication is generally bound by the number of uncached
directories it has to traverse, usually as a result of invalidated suffix
directory hashes. Using write volume and partition counts from our running
systems, it was designed so that around 2% of the hash space on a normal node
will be invalidated per day, which has experimentally given us acceptable
replication speeds.
-----------------------------
Dedicated replication network
-----------------------------
Swift has support for using dedicated network for replication traffic.
For more information see :ref:`Overview of dedicated replication network
<Dedicated-replication-network>`.

View File

@ -0,0 +1,508 @@
.. _Dedicated-replication-network:
=============================
Dedicated replication network
=============================
-------
Summary
-------
Swift's replication process is essential for consistency and availability of
data. By default, replication activity will use the same network interface as
other cluster operations. However, if a replication interface is set in the
ring for a node, that node will send replication traffic on its designated
separate replication network interface. Replication traffic includes REPLICATE
requests and rsync traffic.
To separate the cluster-internal replication traffic from client traffic,
separate replication servers can be used. These replication servers are based
on the standard storage servers, but they listen on the replication IP and
only respond to REPLICATE requests. Storage servers can serve REPLICATE
requests, so an operator can transition to using a separate replication
network with no cluster downtime.
Replication IP and port information is stored in the ring on a per-node basis.
These parameters will be used if they are present, but they are not required.
If this information does not exist or is empty for a particular node, the
node's standard IP and port will be used for replication.
--------------------
For SAIO replication
--------------------
#. Create new script in ~/bin/ (for example: remakerings_new)::
#!/bin/bash
cd /etc/swift
rm -f *.builder *.ring.gz backups/*.builder backups/*.ring.gz
swift-ring-builder object.builder create 18 3 1
swift-ring-builder object.builder add z1-127.0.0.1:6010R127.0.0.1:6050/sdb1 1
swift-ring-builder object.builder add z2-127.0.0.1:6020R127.0.0.1:6060/sdb2 1
swift-ring-builder object.builder add z3-127.0.0.1:6030R127.0.0.1:6070/sdb3 1
swift-ring-builder object.builder add z4-127.0.0.1:6040R127.0.0.1:6080/sdb4 1
swift-ring-builder object.builder rebalance
swift-ring-builder container.builder create 18 3 1
swift-ring-builder container.builder add z1-127.0.0.1:6011R127.0.0.1:6051/sdb1 1
swift-ring-builder container.builder add z2-127.0.0.1:6021R127.0.0.1:6061/sdb2 1
swift-ring-builder container.builder add z3-127.0.0.1:6031R127.0.0.1:6071/sdb3 1
swift-ring-builder container.builder add z4-127.0.0.1:6041R127.0.0.1:6081/sdb4 1
swift-ring-builder container.builder rebalance
swift-ring-builder account.builder create 18 3 1
swift-ring-builder account.builder add z1-127.0.0.1:6012R127.0.0.1:6052/sdb1 1
swift-ring-builder account.builder add z2-127.0.0.1:6022R127.0.0.1:6062/sdb2 1
swift-ring-builder account.builder add z3-127.0.0.1:6032R127.0.0.1:6072/sdb3 1
swift-ring-builder account.builder add z4-127.0.0.1:6042R127.0.0.1:6082/sdb4 1
swift-ring-builder account.builder rebalance
.. note::
Syntax of adding device has been changed: R<ip_replication>:<port_replication> was added between z<zone>-<ip>:<port> and /<device_name>_<meta> <weight>. Added devices will use <ip_replication> and <port_replication> for replication activities.
#. Add next rows in /etc/rsyncd.conf::
[account6052]
max connections = 25
path = /srv/1/node/
read only = false
lock file = /var/lock/account6052.lock
[account6062]
max connections = 25
path = /srv/2/node/
read only = false
lock file = /var/lock/account6062.lock
[account6072]
max connections = 25
path = /srv/3/node/
read only = false
lock file = /var/lock/account6072.lock
[account6082]
max connections = 25
path = /srv/4/node/
read only = false
lock file = /var/lock/account6082.lock
[container6051]
max connections = 25
path = /srv/1/node/
read only = false
lock file = /var/lock/container6051.lock
[container6061]
max connections = 25
path = /srv/2/node/
read only = false
lock file = /var/lock/container6061.lock
[container6071]
max connections = 25
path = /srv/3/node/
read only = false
lock file = /var/lock/container6071.lock
[container6081]
max connections = 25
path = /srv/4/node/
read only = false
lock file = /var/lock/container6081.lock
[object6050]
max connections = 25
path = /srv/1/node/
read only = false
lock file = /var/lock/object6050.lock
[object6060]
max connections = 25
path = /srv/2/node/
read only = false
lock file = /var/lock/object6060.lock
[object6070]
max connections = 25
path = /srv/3/node/
read only = false
lock file = /var/lock/object6070.lock
[object6080]
max connections = 25
path = /srv/4/node/
read only = false
lock file = /var/lock/object6080.lock
#. Restart rsync deamon::
service rsync restart
#. Add changes in configuration files in directories:
* /etc/swift/object-server(files: 1.conf, 2.conf, 3.conf, 4.conf)
* /etc/swift/container-server(files: 1.conf, 2.conf, 3.conf, 4.conf)
* /etc/swift/account-server(files: 1.conf, 2.conf, 3.conf, 4.conf)
delete all configuration options in section [<*>-replicator]
#. Add configuration files for object-server, in /etc/swift/objec-server/
* 5.conf::
[DEFAULT]
devices = /srv/1/node
mount_check = false
disable_fallocate = true
bind_port = 6050
user = swift
log_facility = LOG_LOCAL2
recon_cache_path = /var/cache/swift
[pipeline:main]
pipeline = recon object-server
[app:object-server]
use = egg:swift#object
replication_server = True
[filter:recon]
use = egg:swift#recon
[object-replicator]
vm_test_mode = yes
* 6.conf::
[DEFAULT]
devices = /srv/2/node
mount_check = false
disable_fallocate = true
bind_port = 6060
user = swift
log_facility = LOG_LOCAL3
recon_cache_path = /var/cache/swift2
[pipeline:main]
pipeline = recon object-server
[app:object-server]
use = egg:swift#object
replication_server = True
[filter:recon]
use = egg:swift#recon
[object-replicator]
vm_test_mode = yes
* 7.conf::
[DEFAULT]
devices = /srv/3/node
mount_check = false
disable_fallocate = true
bind_port = 6070
user = swift
log_facility = LOG_LOCAL4
recon_cache_path = /var/cache/swift3
[pipeline:main]
pipeline = recon object-server
[app:object-server]
use = egg:swift#object
replication_server = True
[filter:recon]
use = egg:swift#recon
[object-replicator]
vm_test_mode = yes
* 8.conf::
[DEFAULT]
devices = /srv/4/node
mount_check = false
disable_fallocate = true
bind_port = 6080
user = swift
log_facility = LOG_LOCAL5
recon_cache_path = /var/cache/swift4
[pipeline:main]
pipeline = recon object-server
[app:object-server]
use = egg:swift#object
replication_server = True
[filter:recon]
use = egg:swift#recon
[object-replicator]
vm_test_mode = yes
#. Add configuration files for container-server, in /etc/swift/container-server/
* 5.conf::
[DEFAULT]
devices = /srv/1/node
mount_check = false
disable_fallocate = true
bind_port = 6051
user = swift
log_facility = LOG_LOCAL2
recon_cache_path = /var/cache/swift
[pipeline:main]
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
replication_server = True
[filter:recon]
use = egg:swift#recon
[container-replicator]
vm_test_mode = yes
* 6.conf::
[DEFAULT]
devices = /srv/2/node
mount_check = false
disable_fallocate = true
bind_port = 6061
user = swift
log_facility = LOG_LOCAL3
recon_cache_path = /var/cache/swift2
[pipeline:main]
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
replication_server = True
[filter:recon]
use = egg:swift#recon
[container-replicator]
vm_test_mode = yes
* 7.conf::
[DEFAULT]
devices = /srv/3/node
mount_check = false
disable_fallocate = true
bind_port = 6071
user = swift
log_facility = LOG_LOCAL4
recon_cache_path = /var/cache/swift3
[pipeline:main]
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
replication_server = True
[filter:recon]
use = egg:swift#recon
[container-replicator]
vm_test_mode = yes
* 8.conf::
[DEFAULT]
devices = /srv/4/node
mount_check = false
disable_fallocate = true
bind_port = 6081
user = swift
log_facility = LOG_LOCAL5
recon_cache_path = /var/cache/swift4
[pipeline:main]
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
replication_server = True
[filter:recon]
use = egg:swift#recon
[container-replicator]
vm_test_mode = yes
#. Add configuration files for account-server, in /etc/swift/account-server/
* 5.conf::
[DEFAULT]
devices = /srv/1/node
mount_check = false
disable_fallocate = true
bind_port = 6052
user = swift
log_facility = LOG_LOCAL2
recon_cache_path = /var/cache/swift
[pipeline:main]
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
replication_server = True
[filter:recon]
use = egg:swift#recon
[account-replicator]
vm_test_mode = yes
* 6.conf::
[DEFAULT]
devices = /srv/2/node
mount_check = false
disable_fallocate = true
bind_port = 6062
user = swift
log_facility = LOG_LOCAL3
recon_cache_path = /var/cache/swift2
[pipeline:main]
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
replication_server = True
[filter:recon]
use = egg:swift#recon
[account-replicator]
vm_test_mode = yes
* 7.conf::
[DEFAULT]
devices = /srv/3/node
mount_check = false
disable_fallocate = true
bind_port = 6072
user = swift
log_facility = LOG_LOCAL4
recon_cache_path = /var/cache/swift3
[pipeline:main]
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
replication_server = True
[filter:recon]
use = egg:swift#recon
[account-replicator]
vm_test_mode = yes
* 8.conf::
[DEFAULT]
devices = /srv/4/node
mount_check = false
disable_fallocate = true
bind_port = 6082
user = swift
log_facility = LOG_LOCAL5
recon_cache_path = /var/cache/swift4
[pipeline:main]
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
replication_server = True
[filter:recon]
use = egg:swift#recon
[account-replicator]
vm_test_mode = yes
---------------------------------
For a Multiple Server replication
---------------------------------
#. Move configuration file.
* Configuration file for object-server from /etc/swift/object-server.conf to /etc/swift/object-server/1.conf
* Configuration file for container-server from /etc/swift/container-server.conf to /etc/swift/container-server/1.conf
* Configuration file for account-server from /etc/swift/account-server.conf to /etc/swift/account-server/1.conf
#. Add changes in configuration files in directories:
* /etc/swift/object-server(files: 1.conf)
* /etc/swift/container-server(files: 1.conf)
* /etc/swift/account-server(files: 1.conf)
delete all configuration options in section [<*>-replicator]
#. Add configuration files for object-server, in /etc/swift/object-server/2.conf::
[DEFAULT]
bind_ip = $STORAGE_LOCAL_NET_IP
workers = 2
[pipeline:main]
pipeline = object-server
[app:object-server]
use = egg:swift#object
replication_server = True
[object-replicator]
#. Add configuration files for container-server, in /etc/swift/container-server/2.conf::
[DEFAULT]
bind_ip = $STORAGE_LOCAL_NET_IP
workers = 2
[pipeline:main]
pipeline = container-server
[app:container-server]
use = egg:swift#container
replication_server = True
[container-replicator]
#. Add configuration files for account-server, in /etc/swift/account-server/2.conf::
[DEFAULT]
bind_ip = $STORAGE_LOCAL_NET_IP
workers = 2
[pipeline:main]
pipeline = account-server
[app:account-server]
use = egg:swift#account
replication_server = True
[account-replicator]

View File

@ -48,6 +48,13 @@ use = egg:swift#account
# set log_address = /dev/log
# auto_create_account_prefix = .
# max_clients = 1024
# Configure parameter for creating specific server
# To handle all verbs, including replication verbs, do not specify
# "replication_server" (this is the default). To only handle replication,
# set to a True value (e.g. "True" or "1"). To handle only non-replication
# verbs, set to "False". Unless you have a separate replication network, you
# should not specify any value for "replication_server".
# replication_server = False
[filter:healthcheck]
use = egg:swift#healthcheck

View File

@ -54,6 +54,13 @@ use = egg:swift#container
# allow_versions = False
# auto_create_account_prefix = .
# max_clients = 1024
# Configure parameter for creating specific server
# To handle all verbs, including replication verbs, do not specify
# "replication_server" (this is the default). To only handle replication,
# set to a True value (e.g. "True" or "1"). To handle only non-replication
# verbs, set to "False". Unless you have a separate replication network, you
# should not specify any value for "replication_server".
# replication_server = False
[filter:healthcheck]
use = egg:swift#healthcheck

View File

@ -63,6 +63,13 @@ use = egg:swift#object
# allowed_headers = Content-Disposition, Content-Encoding, X-Delete-At, X-Object-Manifest, X-Static-Large-Object
# auto_create_account_prefix = .
# max_clients = 1024
# Configure parameter for creating specific server
# To handle all verbs, including replication verbs, do not specify
# "replication_server" (this is the default). To only handle replication,
# set to a True value (e.g. "True" or "1"). To handle only non-replication
# verbs, set to "False". Unless you have a separate replication network, you
# should not specify any value for "replication_server".
# replication_server = False
[filter:healthcheck]
use = egg:swift#healthcheck

View File

@ -47,6 +47,18 @@ class AccountController(object):
self.logger = get_logger(conf, log_route='account-server')
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
self.mount_check,
logger=self.logger)
@ -327,6 +339,8 @@ class AccountController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:

View File

@ -115,7 +115,8 @@ class ReplConnection(BufferedHTTPConnection):
""
self.logger = logger
self.node = node
BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node)
host = "%s:%s" % (node['replication_ip'], node['replication_port'])
BufferedHTTPConnection.__init__(self, host)
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
def replicate(self, *args):
@ -237,11 +238,11 @@ class Replicator(Daemon):
:param replicate_method: remote operation to perform after rsync
:param replicate_timeout: timeout to wait in seconds
"""
device_ip = rsync_ip(device['ip'])
device_ip = rsync_ip(device['replication_ip'])
if self.vm_test_mode:
remote_file = '%s::%s%s/%s/tmp/%s' % (
device_ip, self.server_type, device['port'], device['device'],
local_id)
device_ip, self.server_type, device['replication_port'],
device['device'], local_id)
else:
remote_file = '%s::%s/%s/tmp/%s' % (
device_ip, self.server_type, device['device'], local_id)
@ -509,7 +510,8 @@ class Replicator(Daemon):
self.logger.error(_('ERROR Failed to get my own IPs?'))
return
for node in self.ring.devs:
if node and node['ip'] in ips and node['port'] == self.port:
if (node and node['replication_ip'] in ips and
node['replication_port'] == self.port):
if self.mount_check and not os.path.ismount(
os.path.join(self.root, node['device'])):
self.logger.warn(

View File

@ -991,13 +991,23 @@ class RingBuilder(object):
#really old rings didn't have meta keys
if dev and 'meta' not in dev:
dev['meta'] = ''
# NOTE(akscram): An old ring builder file don't contain
# replication parameters.
if dev:
if 'ip' in dev:
dev.setdefault('replication_ip', dev['ip'])
if 'port' in dev:
dev.setdefault('replication_port', dev['port'])
return builder
def search_devs(self, search_value):
"""
The <search-value> can be of the form::
d<device_id>r<region>z<zone>-<ip>:<port>/<device_name>_<meta>
d<device_id>r<region>z<zone>-<ip>:<port>[R<r_ip>:<r_port>]/
<device_name>_<meta>
Where <r_ip> and <r_port> are replication ip and port.
Any part is optional, but you must include at least one part.
@ -1010,6 +1020,10 @@ class RingBuilder(object):
1.2.3.4 Matches devices in any zone with the ip 1.2.3.4
z1:5678 Matches devices in zone 1 using port 5678
:5678 Matches devices that use port 5678
R5.6.7.8 Matches devices that use replication ip 5.6.7.8
R:5678 Matches devices that use replication port 5678
1.2.3.4R5.6.7.8 Matches devices that use ip 1.2.3.4 and replication ip
5.6.7.8
/sdb1 Matches devices with the device name sdb1
_shiny Matches devices with shiny in the meta data
_"snet: 5.6.7.8" Matches devices with snet: 5.6.7.8 in the meta data
@ -1066,6 +1080,30 @@ class RingBuilder(object):
i += 1
match.append(('port', int(search_value[1:i])))
search_value = search_value[i:]
# replication parameters
if search_value.startswith('R'):
search_value = search_value[1:]
if len(search_value) and search_value[0].isdigit():
i = 1
while (i < len(search_value) and
search_value[i] in '0123456789.'):
i += 1
match.append(('replication_ip', search_value[:i]))
search_value = search_value[i:]
elif len(search_value) and search_value[0] == '[':
i = 1
while i < len(search_value) and search_value[i] != ']':
i += 1
i += 1
match.append(('replication_ip',
search_value[:i].lstrip('[').rstrip(']')))
search_value = search_value[i:]
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
i += 1
match.append(('replication_port', int(search_value[1:i])))
search_value = search_value[i:]
if search_value.startswith('/'):
i = 1
while i < len(search_value) and search_value[i] != '_':

View File

@ -146,6 +146,17 @@ class Ring(object):
ring_data = RingData.load(self.serialized_path)
self._mtime = getmtime(self.serialized_path)
self._devs = ring_data.devs
# NOTE(akscram): Replication parameters like replication_ip
# and replication_port are required for
# replication process. An old replication
# ring doesn't contain this parameters into
# device.
for dev in self._devs:
if dev:
if 'ip' in dev:
dev.setdefault('replication_ip', dev['ip'])
if 'port' in dev:
dev.setdefault('replication_port', dev['port'])
self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift

View File

@ -55,6 +55,18 @@ class ContainerController(object):
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
self.allowed_sync_hosts = [
h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
@ -515,6 +527,8 @@ class ContainerController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:

View File

@ -341,9 +341,9 @@ class ObjectReplicator(Daemon):
'--timeout=%s' % self.rsync_io_timeout,
'--contimeout=%s' % self.rsync_io_timeout,
]
node_ip = rsync_ip(node['ip'])
node_ip = rsync_ip(node['replication_ip'])
if self.vm_test_mode:
rsync_module = '%s::object%s' % (node_ip, node['port'])
rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
else:
rsync_module = '%s::object' % node_ip
had_any = False
@ -392,11 +392,11 @@ class ObjectReplicator(Daemon):
success = self.rsync(node, job, suffixes)
if success:
with Timeout(self.http_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'],
job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes),
headers=self.headers)
conn = http_connect(
node['replication_ip'],
node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), headers=self.headers)
conn.getresponse().read()
responses.append(success)
if not suffixes or (len(responses) ==
@ -436,7 +436,7 @@ class ObjectReplicator(Daemon):
try:
with Timeout(self.http_timeout):
resp = http_connect(
node['ip'], node['port'],
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'', headers=self.headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
@ -448,7 +448,7 @@ class ObjectReplicator(Daemon):
self.logger.error(_("Invalid response %(resp)s "
"from %(ip)s"),
{'resp': resp.status,
'ip': node['ip']})
'ip': node['replication_ip']})
continue
remote_hash = pickle.loads(resp.read())
del resp
@ -469,7 +469,7 @@ class ObjectReplicator(Daemon):
self.rsync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
node['ip'], node['port'],
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes),
headers=self.headers)
@ -561,8 +561,8 @@ class ObjectReplicator(Daemon):
jobs = []
ips = whataremyips()
for local_dev in [dev for dev in self.object_ring.devs
if dev and dev['ip'] in ips and
dev['port'] == self.port]:
if dev and dev['replication_ip'] in ips and
dev['replication_port'] == self.port]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, 'objects')
tmp_path = join(dev_path, 'tmp')

View File

@ -442,6 +442,18 @@ class ObjectController(object):
self.max_upload_time = int(conf.get('max_upload_time', 86400))
self.slow = int(conf.get('slow', 0))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
replication_server = conf.get('replication_server', None)
if replication_server is None:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST']
else:
replication_server = config_true_value(replication_server)
if replication_server:
allowed_methods = ['REPLICATE']
else:
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
self.replication_server = replication_server
self.allowed_methods = allowed_methods
default_allowed_headers = '''
content-disposition,
content-encoding,
@ -964,6 +976,8 @@ class ObjectController(object):
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:

View File

@ -14,7 +14,7 @@
# limitations under the License.
from httplib import HTTPConnection
from os import kill
from os import kill, path
from signal import SIGTERM
from subprocess import Popen, PIPE
from time import sleep, time
@ -29,6 +29,9 @@ from test.probe import CHECK_SERVER_TIMEOUT
def start_server(port, port2server, pids, check=True):
server = port2server[port]
if server[:-1] in ('account', 'container', 'object'):
if not path.exists('/etc/swift/%s-server/%s.conf' %
(server[:-1], server[-1])):
return None
pids[server] = Popen([
'swift-%s-server' % server[:-1],
'/etc/swift/%s-server/%s.conf' % (server[:-1], server[-1])]).pid
@ -45,6 +48,8 @@ def start_server(port, port2server, pids, check=True):
def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
server = port2server[port]
if server[:-1] in ('account', 'container', 'object'):
if int(server[-1]) > 4:
return None
path = '/connect/1/2'
if server[:-1] == 'container':
path += '/3'
@ -132,9 +137,10 @@ def reset_environment():
pids = {}
try:
port2server = {}
config_dict = {}
for server, port in [('account', 6002), ('container', 6001),
('object', 6000)]:
for number in xrange(1, 5):
for number in xrange(1, 9):
port2server[port + (number * 10)] = '%s%d' % (server, number)
for port in port2server:
start_server(port, port2server, pids, check=False)
@ -145,6 +151,9 @@ def reset_environment():
account_ring = Ring('/etc/swift/account.ring.gz')
container_ring = Ring('/etc/swift/container.ring.gz')
object_ring = Ring('/etc/swift/object.ring.gz')
for name in ('account', 'container', 'object'):
for server in (name, '%s-replicator' % name):
config_dict[server] = '/etc/swift/%s-server/%%d.conf' % name
except BaseException:
try:
raise
@ -154,14 +163,17 @@ def reset_environment():
except Exception:
pass
return pids, port2server, account_ring, container_ring, object_ring, url, \
token, account
token, account, config_dict
def get_to_final_state():
processes = []
for job in ('account-replicator', 'container-replicator',
'object-replicator'):
for number in xrange(1, 5):
for number in xrange(1, 9):
if not path.exists('/etc/swift/%s-server/%d.conf' %
(job.split('-')[0], number)):
continue
processes.append(Popen([
'swift-%s' % job,
'/etc/swift/%s-server/%d.conf' % (job.split('-')[0], number),
@ -180,7 +192,10 @@ def get_to_final_state():
processes = []
for job in ('account-replicator', 'container-replicator',
'object-replicator'):
for number in xrange(1, 5):
for number in xrange(1, 9):
if not path.exists('/etc/swift/%s-server/%d.conf' %
(job.split('-')[0], number)):
continue
processes.append(Popen([
'swift-%s' % job,
'/etc/swift/%s-server/%d.conf' % (job.split('-')[0], number),

View File

@ -29,7 +29,7 @@ class TestAccountFailures(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -140,7 +140,7 @@ class TestAccountFailures(TestCase):
for node in xrange(1, 5):
processes.append(Popen([
'swift-container-updater',
'/etc/swift/container-server/%d.conf' % node,
self.configs['container'] % node,
'once']))
for process in processes:
process.wait()

View File

@ -41,7 +41,7 @@ class TestContainerFailures(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -120,8 +120,7 @@ class TestContainerFailures(TestCase):
node_id = (onode['port'] - 6000) / 10
device = onode['device']
hash_str = hash_path(self.account, container)
server_conf = readconf('/etc/swift/container-server/%s.conf' %
node_id)
server_conf = readconf(self.configs['container'] % node_id)
devices = server_conf['app:container-server']['devices']
obj_dir = '%s/%s/containers/%s/%s/%s/' % (devices,
device, opart,

View File

@ -34,7 +34,7 @@ class TestEmptyDevice(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -42,8 +42,7 @@ class TestEmptyDevice(TestCase):
def _get_objects_dir(self, onode):
device = onode['device']
node_id = (onode['port'] - 6000) / 10
obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
node_id)
obj_server_conf = readconf(self.configs['object'] % node_id)
devices = obj_server_conf['app:object-server']['devices']
obj_dir = '%s/%s' % (devices, device)
return obj_dir
@ -124,12 +123,20 @@ class TestEmptyDevice(TestCase):
self.assertEquals(exc.http_status, 404)
self.assertFalse(os.path.exists(obj_dir))
try:
port_num = onode['replication_port']
except KeyError:
port_num = onode['port']
try:
another_port_num = another_onode['replication_port']
except KeyError:
another_port_num = another_onode['port']
call(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((onode['port'] - 6000) / 10), 'once'])
self.configs['object-replicator'] %
((port_num - 6000) / 10), 'once'])
call(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((another_onode['port'] - 6000) / 10), 'once'])
self.configs['object-replicator'] %
((another_port_num - 6000) / 10), 'once'])
odata = direct_client.direct_get_object(onode, opart, self.account,
container, obj)[-1]

View File

@ -30,7 +30,7 @@ class TestObjectAsyncUpdate(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -57,7 +57,7 @@ class TestObjectAsyncUpdate(TestCase):
processes = []
for node in xrange(1, 5):
processes.append(Popen(['swift-object-updater',
'/etc/swift/object-server/%d.conf' % node,
self.configs['object'] % node,
'once']))
for process in processes:
process.wait()

View File

@ -38,7 +38,7 @@ class TestObjectFailures(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -54,8 +54,7 @@ class TestObjectFailures(TestCase):
node_id = (onode['port'] - 6000) / 10
device = onode['device']
hash_str = hash_path(self.account, container, obj)
obj_server_conf = readconf('/etc/swift/object-server/%s.conf' %
node_id)
obj_server_conf = readconf(self.configs['object'] % node_id)
devices = obj_server_conf['app:object-server']['devices']
obj_dir = '%s/%s/objects/%s/%s/%s/' % (devices,
device, opart,

View File

@ -30,7 +30,7 @@ class TestObjectHandoff(TestCase):
def setUp(self):
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account) = reset_environment()
self.account, self.configs) = reset_environment()
def tearDown(self):
kill_servers(self.port2server, self.pids)
@ -116,14 +116,23 @@ class TestObjectHandoff(TestCase):
# Run the extra server last so it'll remove its extra partition
processes = []
for node in onodes:
try:
port_num = node['replication_port']
except KeyError:
port_num = node['port']
processes.append(Popen(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((node['port'] - 6000) / 10), 'once']))
self.configs['object-replicator'] %
((port_num - 6000) / 10),
'once']))
for process in processes:
process.wait()
try:
another_port_num = another_onode['replication_port']
except KeyError:
another_port_num = another_onode['port']
call(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((another_onode['port'] - 6000) / 10), 'once'])
self.configs['object-replicator'] %
((another_port_num - 6000) / 10), 'once'])
odata = direct_client.direct_get_object(onode, opart, self.account,
container, obj)[-1]
if odata != 'VERIFY':
@ -163,14 +172,19 @@ class TestObjectHandoff(TestCase):
# Run the extra server last so it'll remove its extra partition
processes = []
for node in onodes:
try:
port_num = node['replication_port']
except KeyError:
port_num = node['port']
processes.append(Popen(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((node['port'] - 6000) / 10), 'once']))
self.configs['object-replicator'] %
((port_num - 6000) / 10),
'once']))
for process in processes:
process.wait()
call(['swift-object-replicator',
'/etc/swift/object-server/%d.conf' %
((another_onode['port'] - 6000) / 10), 'once'])
self.configs['object-replicator'] %
((another_port_num - 6000) / 10), 'once'])
exc = None
try:
direct_client.direct_get_object(another_onode, opart, self.account,

View File

@ -0,0 +1,207 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from subprocess import call, Popen
from unittest import main, TestCase
from uuid import uuid4
import os
import time
import shutil
from swiftclient import client
#from swift.common import direct_client
from test.probe.common import kill_server, kill_servers, reset_environment, \
start_server
def collect_info(path_list):
"""
Recursive collect dirs and files in path_list directory.
:param path_list: start directory for collecting
:return files_list, dir_list: tuple of included
directories and files
"""
files_list = []
dir_list = []
for path in path_list:
temp_files_list = []
temp_dir_list = []
for root, dirs, files in os.walk(path):
temp_files_list += files
temp_dir_list += dirs
files_list.append(temp_files_list)
dir_list.append(temp_dir_list)
return files_list, dir_list
def find_max_occupancy_node(dir_list):
"""
Find node with maximum occupancy.
:param list_dir: list of directories for each node.
:return number: number node in list_dir
"""
count = 0
number = 0
lenght = 0
for dirs in dir_list:
if lenght < len(dirs):
lenght = len(dirs)
number = count
count += 1
return number
class TestReplicatorFunctions(TestCase):
"""
Class for testing replicators and replication servers.
By default configuration - replication servers not used.
For testing separete replication servers servers need to change
ring's files using set_info command or new ring's files with
different port values.
"""
def setUp(self):
"""
Reset all environment and start all servers.
"""
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.url, self.token,
self.account, self.configs) = reset_environment()
def tearDown(self):
"""
Stop all servers.
"""
kill_servers(self.port2server, self.pids)
def test_main(self):
# Create one account, container and object file.
# Find node with account, container and object replicas.
# Delete all directories and files from this node (device).
# Wait 60 seconds and check replication results.
# Delete directories and files in objects storage without
# deleting file "hashes.pkl".
# Check, that files not replicated.
# Delete file "hashes.pkl".
# Check, that all files were replicated.
path_list = ['/srv/1/node/sdb1/', '/srv/2/node/sdb2/',
'/srv/3/node/sdb3/', '/srv/4/node/sdb4/']
# Put data to storage nodes
container = 'container-%s' % uuid4()
client.put_container(self.url, self.token, container)
obj = 'object-%s' % uuid4()
client.put_object(self.url, self.token, container, obj, 'VERIFY')
# Get all data file information
(files_list, dirs_list) = collect_info(path_list)
num = find_max_occupancy_node(dirs_list)
test_node = path_list[num]
test_node_files_list = []
for files in files_list[num]:
if not files.endswith('.pending'):
test_node_files_list.append(files)
test_node_dirs_list = dirs_list[num]
# Run all replicators
processes = []
for num in xrange(1, 9):
for server in ['object-replicator',
'container-replicator',
'account-replicator']:
if not os.path.exists(self.configs[server] % (num)):
continue
processes.append(Popen(['swift-%s' % (server),
self.configs[server] % (num),
'forever']))
# Delete some files
for dirs in os.listdir(test_node):
shutil.rmtree(test_node+dirs)
self.assertFalse(os.listdir(test_node))
# We will keep trying these tests until they pass for up to 60s
begin = time.time()
while True:
(new_files_list, new_dirs_list) = collect_info([test_node])
try:
# Check replicate files and dirs
for files in test_node_files_list:
self.assertTrue(files in new_files_list[0])
for dirs in test_node_dirs_list:
self.assertTrue(dirs in new_dirs_list[0])
break
except Exception:
if time.time() - begin > 60:
raise
time.sleep(1)
# Check behavior by deleting hashes.pkl file
for dirs in os.listdir(test_node + 'objects/'):
for input_dirs in os.listdir(test_node + 'objects/' + dirs):
eval_dirs = '/' + input_dirs
if os.path.isdir(test_node + 'objects/' + dirs + eval_dirs):
shutil.rmtree(test_node + 'objects/' + dirs + eval_dirs)
# We will keep trying these tests until they pass for up to 60s
begin = time.time()
while True:
try:
for dirs in os.listdir(test_node + 'objects/'):
for input_dirs in os.listdir(
test_node + 'objects/' + dirs):
self.assertFalse(os.path.isdir(test_node + 'objects/' +
dirs + '/' + input_dirs))
break
except Exception:
if time.time() - begin > 60:
raise
time.sleep(1)
for dirs in os.listdir(test_node + 'objects/'):
os.remove(test_node + 'objects/' + dirs + '/hashes.pkl')
# We will keep trying these tests until they pass for up to 60s
begin = time.time()
while True:
try:
(new_files_list, new_dirs_list) = collect_info([test_node])
# Check replicate files and dirs
for files in test_node_files_list:
self.assertTrue(files in new_files_list[0])
for dirs in test_node_dirs_list:
self.assertTrue(dirs in new_dirs_list[0])
break
except Exception:
if time.time() - begin > 60:
raise
time.sleep(1)
for process in processes:
process.kill()
if __name__ == '__main__':
main()

View File

@ -15,6 +15,7 @@
import errno
import os
import mock
import unittest
from shutil import rmtree
from StringIO import StringIO
@ -1197,6 +1198,120 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(resp.content_type, 'application/xml')
self.assertEquals(resp.charset, 'utf-8')
def test_serv_reserv(self):
"""
Test replication_server flag
was set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(AccountController(conf).replication_server, None)
for val in [True, '1', 'True', 'true']:
conf['replication_server'] = val
self.assertTrue(AccountController(conf).replication_server)
for val in [False, 0, '0', 'False', 'false', 'test_string']:
conf['replication_server'] = val
self.assertFalse(AccountController(conf).replication_server)
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(AccountController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(AccountController(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(AccountController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
def test_correct_allowed_method(self):
"""
Test correct work for allowed method using
swift.account_server.AccountController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
response = self.controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
def test_not_allowed_method(self):
"""
Test correct work for NOT allowed method using
swift.account_server.AccountController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
self.controller.allowed_methods.remove(method)
response = self.controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.controller.allowed_methods.append(method)
if __name__ == '__main__':
unittest.main()

View File

@ -710,8 +710,20 @@ class TestRingBuilder(unittest.TestCase):
'ip': '127.0.0.2', 'port': 10002, 'device': 'sdc1',
'meta': 'meta2'},
{'id': 3, 'region': 1, 'zone': 3, 'weight': 2,
'ip': '127.0.0.3', 'port': 10003, 'device': 'sdffd1',
'meta': 'meta3'}]
'ip': '127.0.0.3', 'port': 10003, 'device': 'sdd1',
'meta': 'meta3'},
{'id': 4, 'region': 2, 'zone': 4, 'weight': 1,
'ip': '127.0.0.4', 'port': 10004, 'device': 'sde1',
'meta': 'meta4', 'replication_ip': '127.0.0.10',
'replication_port': 20000},
{'id': 5, 'region': 2, 'zone': 5, 'weight': 2,
'ip': '127.0.0.5', 'port': 10005, 'device': 'sdf1',
'meta': 'meta5', 'replication_ip': '127.0.0.11',
'replication_port': 20001},
{'id': 6, 'region': 2, 'zone': 6, 'weight': 2,
'ip': '127.0.0.6', 'port': 10006, 'device': 'sdg1',
'meta': 'meta6', 'replication_ip': '127.0.0.12',
'replication_port': 20002}]
for d in devs:
rb.add_dev(d)
rb.rebalance()
@ -731,6 +743,12 @@ class TestRingBuilder(unittest.TestCase):
self.assertEquals(res, [devs[1]])
res = rb.search_devs(':10001')
self.assertEquals(res, [devs[1]])
res = rb.search_devs('R127.0.0.10')
self.assertEquals(res, [devs[4]])
res = rb.search_devs('R[127.0.0.10]:20000')
self.assertEquals(res, [devs[4]])
res = rb.search_devs('R:20000')
self.assertEquals(res, [devs[4]])
res = rb.search_devs('/sdb1')
self.assertEquals(res, [devs[1]])
res = rb.search_devs('_meta1')

View File

@ -107,14 +107,22 @@ class TestRing(unittest.TestCase):
array.array('H', [0, 1, 0, 1]),
array.array('H', [3, 4, 3, 4])]
self.intended_devs = [{'id': 0, 'region': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000},
'ip': '10.1.1.1', 'port': 6000,
'replication_ip': '10.1.0.1',
'replication_port': 6066},
{'id': 1, 'region': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000},
'ip': '10.1.1.1', 'port': 6000,
'replication_ip': '10.1.0.2',
'replication_port': 6066},
None,
{'id': 3, 'region': 0, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.1', 'port': 6000},
'ip': '10.1.2.1', 'port': 6000,
'replication_ip': '10.2.0.1',
'replication_port': 6066},
{'id': 4, 'region': 0, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.2', 'port': 6000}]
'ip': '10.1.2.2', 'port': 6000,
'replication_ip': '10.2.0.1',
'replication_port': 6066}]
self.intended_part_shift = 30
self.intended_reload_time = 15
ring.RingData(self.intended_replica2part2dev_id,
@ -201,6 +209,45 @@ class TestRing(unittest.TestCase):
self.assertEquals(len(self.ring.devs), 9)
self.assertNotEquals(self.ring._mtime, orig_mtime)
def test_reload_without_replication(self):
replication_less_devs = [{'id': 0, 'region': 0, 'zone': 0,
'weight': 1.0, 'ip': '10.1.1.1',
'port': 6000},
{'id': 1, 'region': 0, 'zone': 0,
'weight': 1.0, 'ip': '10.1.1.1',
'port': 6000},
None,
{'id': 3, 'region': 0, 'zone': 2,
'weight': 1.0, 'ip': '10.1.2.1',
'port': 6000},
{'id': 4, 'region': 0, 'zone': 2,
'weight': 1.0, 'ip': '10.1.2.2',
'port': 6000}]
intended_devs = [{'id': 0, 'region': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000,
'replication_ip': '10.1.1.1',
'replication_port': 6000},
{'id': 1, 'region': 0, 'zone': 0, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 6000,
'replication_ip': '10.1.1.1',
'replication_port': 6000},
None,
{'id': 3, 'region': 0, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.1', 'port': 6000,
'replication_ip': '10.1.2.1',
'replication_port': 6000},
{'id': 4, 'region': 0, 'zone': 2, 'weight': 1.0,
'ip': '10.1.2.2', 'port': 6000,
'replication_ip': '10.1.2.2',
'replication_port': 6000}]
testgz = os.path.join(self.testdir, 'without_replication.ring.gz')
ring.RingData(self.intended_replica2part2dev_id,
replication_less_devs, self.intended_part_shift).save(testgz)
self.ring = ring.Ring(self.testdir,
reload_time=self.intended_reload_time,
ring_name='without_replication')
self.assertEquals(self.ring.devs, intended_devs)
def test_get_part(self):
part1 = self.ring.get_part('a')
nodes1 = self.ring.get_part_nodes(part1)

View File

@ -198,7 +198,8 @@ class TestDBReplicator(unittest.TestCase):
self.delete_db_calls.append(object_file)
def test_repl_connection(self):
node = {'ip': '127.0.0.1', 'port': 80, 'device': 'sdb1'}
node = {'replication_ip': '127.0.0.1', 'replication_port': 80,
'device': 'sdb1'}
conn = db_replicator.ReplConnection(node, '1234567890', 'abcdefg',
logging.getLogger())
@ -253,11 +254,13 @@ class TestDBReplicator(unittest.TestCase):
def test_rsync_db(self):
replicator = TestReplicator({})
replicator._rsync_file = lambda *args: True
fake_device = {'ip': '127.0.0.1', 'device': 'sda1'}
fake_device = {'replication_ip': '127.0.0.1', 'device': 'sda1'}
replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd')
def test_rsync_db_rsync_file_call(self):
fake_device = {'ip': '127.0.0.1', 'port': '0', 'device': 'sda1'}
fake_device = {'ip': '127.0.0.1', 'port': '0',
'replication_ip': '127.0.0.1', 'replication_port': '0',
'device': 'sda1'}
def mock_rsync_ip(ip):
self.assertEquals(fake_device['ip'], ip)
@ -305,7 +308,8 @@ class TestDBReplicator(unittest.TestCase):
with patch('os.path.exists', lambda *args: True):
replicator = MyTestReplicator()
fake_device = {'ip': '127.0.0.1', 'device': 'sda1'}
fake_device = {'ip': '127.0.0.1', 'replication_ip': '127.0.0.1',
'device': 'sda1'}
replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd')
self.assertEqual(True, replicator._rsync_file_called)
@ -332,7 +336,8 @@ class TestDBReplicator(unittest.TestCase):
with patch('os.path.exists', lambda *args: True):
broker = FakeBroker()
replicator = MyTestReplicator(broker)
fake_device = {'ip': '127.0.0.1', 'device': 'sda1'}
fake_device = {'ip': '127.0.0.1', 'replication_ip': '127.0.0.1',
'device': 'sda1'}
replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
self.assertEquals(2, replicator._rsync_file_call_count)
@ -341,7 +346,8 @@ class TestDBReplicator(unittest.TestCase):
with patch('os.path.getmtime', ChangingMtimesOs()):
broker = FakeBroker()
replicator = MyTestReplicator(broker)
fake_device = {'ip': '127.0.0.1', 'device': 'sda1'}
fake_device = {'ip': '127.0.0.1', 'replication_ip': '127.0.0.1',
'device': 'sda1'}
replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
self.assertEquals(2, replicator._rsync_file_call_count)

View File

@ -15,6 +15,7 @@
import operator
import os
import mock
import unittest
from contextlib import contextmanager
from shutil import rmtree
@ -1377,6 +1378,123 @@ class TestContainerController(unittest.TestCase):
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
def test_serv_reserv(self):
"""
Test replication_server flag
was set from configuration file.
"""
container_controller = container_server.ContainerController
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(container_controller(conf).replication_server, None)
for val in [True, '1', 'True', 'true']:
conf['replication_server'] = val
self.assertTrue(container_controller(conf).replication_server)
for val in [False, 0, '0', 'False', 'false', 'test_string']:
conf['replication_server'] = val
self.assertFalse(container_controller(conf).replication_server)
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
container_controller = container_server.ContainerController
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(container_controller(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(container_controller(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(container_controller(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
def test_correct_allowed_method(self):
"""
Test correct work for allowed method using
swift.container_server.ContainerController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
response = self.controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
def test_not_allowed_method(self):
"""
Test correct work for NOT allowed method using
swift.container_server.ContainerController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.controller, method,
return_value=mock.MagicMock()) as mock_method:
self.controller.allowed_methods.remove(method)
response = self.controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.controller.allowed_methods.append(method)
if __name__ == '__main__':
unittest.main()

View File

@ -17,6 +17,7 @@ from __future__ import with_statement
import unittest
import os
import mock
from gzip import GzipFile
from shutil import rmtree
import cPickle as pickle
@ -25,7 +26,7 @@ import tempfile
from contextlib import contextmanager
from eventlet.green import subprocess
from eventlet import Timeout, tpool
from test.unit import FakeLogger, mock
from test.unit import FakeLogger, mock as unit_mock
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
@ -232,7 +233,7 @@ class TestObjectReplicator(unittest.TestCase):
def getmtime(filename):
i[0] += 1
return 1
with mock({'os.path.getmtime': getmtime}):
with unit_mock({'os.path.getmtime': getmtime}):
hashed, hashes = object_replicator.get_hashes(
part, recalculate=['a83'])
self.assertEquals(i[0], 2)
@ -247,7 +248,7 @@ class TestObjectReplicator(unittest.TestCase):
def getmtime(filename):
i[0] += 1
return 1
with mock({'os.path.getmtime': getmtime}):
with unit_mock({'os.path.getmtime': getmtime}):
hashed, hashes = object_replicator.get_hashes(
part, recalculate=[])
# getmtime will actually not get called. Initially, the pickle.load
@ -270,7 +271,7 @@ class TestObjectReplicator(unittest.TestCase):
if i[0] < 3:
i[0] += 1
return i[0]
with mock({'os.path.getmtime': getmtime}):
with unit_mock({'os.path.getmtime': getmtime}):
hashed, hashes = object_replicator.get_hashes(
part, recalculate=['a83'])
self.assertEquals(i[0], 3)
@ -612,5 +613,129 @@ class TestObjectReplicator(unittest.TestCase):
with _mock_process([(0, "stuff in log")] * 100):
self.replicator.replicate()
@mock.patch('swift.obj.replicator.tpool_reraise', autospec=True)
@mock.patch('swift.obj.replicator.http_connect', autospec=True)
def test_update(self, mock_http, mock_tpool_reraise):
def set_default(self):
self.replicator.suffix_count = 0
self.replicator.suffix_sync = 0
self.replicator.suffix_hash = 0
self.replicator.replication_count = 0
self.replicator.partition_times = []
self.headers = {'Content-Length': '0',
'user-agent': 'obj-replicator %s' % os.getpid()}
self.replicator.logger = mock_logger = mock.MagicMock()
mock_tpool_reraise.return_value = (0, {})
all_jobs = self.replicator.collect_jobs()
jobs = [job for job in all_jobs if not job['delete']]
mock_http.return_value = answer = mock.MagicMock()
answer.getresponse.return_value = resp = mock.MagicMock()
# Check uncorrect http_connect with status 507 and
# count of attempts and call args
resp.status = 507
error = '%(ip)s/%(device)s responded as unmounted'
expect = 'Error syncing partition'
for job in jobs:
set_default(self)
self.replicator.update(job)
self.assertTrue(error in mock_logger.error.call_args[0][0])
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
self.assertEquals(len(self.replicator.partition_times), 1)
self.assertEquals(mock_http.call_count, len(self.ring._devs) - 1)
reqs = []
for node in job['nodes']:
reqs.append(mock.call(node['ip'], node['port'], node['device'],
job['partition'], 'REPLICATE', '',
headers=self.headers))
if job['partition'] == '0':
self.assertEquals(self.replicator.suffix_hash, 0)
mock_http.assert_has_calls(reqs, any_order=True)
mock_http.reset_mock()
mock_logger.reset_mock()
# Check uncorrect http_connect with status 400 != HTTP_OK
resp.status = 400
error = 'Invalid response %(resp)s from %(ip)s'
for job in jobs:
set_default(self)
self.replicator.update(job)
self.assertTrue(error in mock_logger.error.call_args[0][0])
self.assertEquals(len(self.replicator.partition_times), 1)
mock_logger.reset_mock()
# Check successful http_connection and exception with
# uncorrect pickle.loads(resp.read())
resp.status = 200
expect = 'Error syncing with node:'
for job in jobs:
set_default(self)
self.replicator.update(job)
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
self.assertEquals(len(self.replicator.partition_times), 1)
mock_logger.reset_mock()
# Check successful http_connection and correct
# pickle.loads(resp.read()) for non local node
resp.status = 200
local_job = None
resp.read.return_value = pickle.dumps({})
for job in jobs:
set_default(self)
if job['partition'] == '0':
local_job = job.copy()
continue
self.replicator.update(job)
self.assertEquals(mock_logger.exception.call_count, 0)
self.assertEquals(mock_logger.error.call_count, 0)
self.assertEquals(len(self.replicator.partition_times), 1)
self.assertEquals(self.replicator.suffix_hash, 0)
self.assertEquals(self.replicator.suffix_sync, 0)
self.assertEquals(self.replicator.suffix_count, 0)
mock_logger.reset_mock()
# Check seccesfull http_connect and rsync for local node
mock_tpool_reraise.return_value = (1, {'a83': 'ba47fd314242ec8c'
'7efb91f5d57336e4'})
resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
'ada0f4eee69494ff'})
set_default(self)
self.replicator.rsync = fake_func = mock.MagicMock()
self.replicator.update(local_job)
reqs = []
for node in local_job['nodes']:
reqs.append(mock.call(node, local_job, ['a83']))
fake_func.assert_has_calls(reqs, any_order=True)
self.assertEquals(fake_func.call_count, 2)
self.assertEquals(self.replicator.replication_count, 1)
self.assertEquals(self.replicator.suffix_sync, 2)
self.assertEquals(self.replicator.suffix_hash, 1)
self.assertEquals(self.replicator.suffix_count, 1)
mock_http.reset_mock()
mock_logger.reset_mock()
# test for replication params
repl_job = local_job.copy()
for node in repl_job['nodes']:
node['replication_ip'] = '127.0.0.11'
node['replication_port'] = '6011'
set_default(self)
self.replicator.update(repl_job)
reqs = []
for node in repl_job['nodes']:
reqs.append(mock.call(node['replication_ip'],
node['replication_port'], node['device'],
repl_job['partition'], 'REPLICATE',
'', headers=self.headers))
reqs.append(mock.call(node['replication_ip'],
node['replication_port'], node['device'],
repl_job['partition'], 'REPLICATE',
'/a83', headers=self.headers))
mock_http.assert_has_calls(reqs, any_order=True)
if __name__ == '__main__':
unittest.main()

View File

@ -18,6 +18,7 @@
import cPickle as pickle
import operator
import os
import mock
import unittest
import email
from shutil import rmtree
@ -2608,5 +2609,124 @@ class TestObjectController(unittest.TestCase):
finally:
object_server.fallocate = orig_fallocate
def test_serv_reserv(self):
"""
Test replication_server flag
was set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(
object_server.ObjectController(conf).replication_server, None)
for val in [True, '1', 'True', 'true']:
conf['replication_server'] = val
self.assertTrue(
object_server.ObjectController(conf).replication_server)
for val in [False, 0, '0', 'False', 'false', 'test_string']:
conf['replication_server'] = val
self.assertFalse(
object_server.ObjectController(conf).replication_server)
def test_list_allowed_methods(self):
""" Test list of allowed_methods """
methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE', 'POST']
self.assertEquals(self.object_controller.allowed_methods, methods)
def test_allowed_methods_from_configuration_file(self):
"""
Test list of allowed_methods which
were set from configuration file.
"""
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
'POST'])
conf['replication_server'] = 'True'
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['REPLICATE'])
conf['replication_server'] = 'False'
self.assertEquals(object_server.ObjectController(conf).allowed_methods,
['DELETE', 'PUT', 'HEAD', 'GET', 'POST'])
def test_correct_allowed_method(self):
"""
Test correct work for allowed method using
swift.object_server.ObjectController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.object_controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.object_controller, method,
return_value=mock.MagicMock()) as mock_method:
response = self.object_controller.__call__(env, start_response)
self.assertNotEqual(response, answer)
self.assertEqual(mock_method.call_count, 1)
def test_not_allowed_method(self):
"""
Test correct work for NOT allowed method using
swift.object_server.ObjectController.__call__
"""
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
def start_response(*args):
""" Sends args to outbuf """
outbuf.writelines(args)
method = self.object_controller.allowed_methods[0]
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
answer = ['<html><h1>Method Not Allowed</h1><p>The method is not '
'allowed for this resource.</p></html>']
with mock.patch.object(self.object_controller, method,
return_value=mock.MagicMock()) as mock_method:
self.object_controller.allowed_methods.remove(method)
response = self.object_controller.__call__(env, start_response)
self.assertEqual(mock_method.call_count, 0)
self.assertEqual(response, answer)
self.object_controller.allowed_methods.append(method)
if __name__ == '__main__':
unittest.main()