# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
from contextlib import nested
from textwrap import dedent

import mock
from test.unit import debug_logger
from swift.container import sync
from swift.common import utils
from swift.common.wsgi import ConfigString
from swift.common.exceptions import ClientException
from swift.common.storage_policy import StoragePolicy
import test
from test.unit import patch_policies, with_tempdir

utils.HASH_PATH_SUFFIX = 'endcap'
utils.HASH_PATH_PREFIX = 'endcap'


class FakeRing(object):

    def __init__(self):
        self.devs = [{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
                     for x in range(3)]

    def get_nodes(self, account, container=None, obj=None):
        return 1, list(self.devs)


class FakeContainerBroker(object):

    def __init__(self, path, metadata=None, info=None, deleted=False,
                 items_since=None):
        self.db_file = path
        self.metadata = metadata if metadata else {}
        self.info = info if info else {}
        self.deleted = deleted
        self.items_since = items_since if items_since else []
        self.sync_point1 = -1
        self.sync_point2 = -1

    def get_info(self):
        return self.info

    def is_deleted(self):
        return self.deleted

    def get_items_since(self, sync_point, limit):
        if sync_point < 0:
            sync_point = 0
        return self.items_since[sync_point:sync_point + limit]

    def set_x_container_sync_points(self, sync_point1, sync_point2):
        self.sync_point1 = sync_point1
        self.sync_point2 = sync_point2


@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestContainerSync(unittest.TestCase):

    def setUp(self):
        self.logger = debug_logger('test-container-sync')

    def test_FileLikeIter(self):
        # Retained test to show new FileLikeIter acts just like the removed
        # _Iter2FileLikeObject did.
        flo = sync.FileLikeIter(iter(['123', '4567', '89', '0']))
        expect = '1234567890'

        got = flo.read(2)
        self.assertTrue(len(got) <= 2)
        self.assertEquals(got, expect[:len(got)])
        expect = expect[len(got):]

        got = flo.read(5)
        self.assertTrue(len(got) <= 5)
        self.assertEquals(got, expect[:len(got)])
        expect = expect[len(got):]

        self.assertEquals(flo.read(), expect)
        self.assertEquals(flo.read(), '')
        self.assertEquals(flo.read(2), '')

        flo = sync.FileLikeIter(iter(['123', '4567', '89', '0']))
        self.assertEquals(flo.read(), '1234567890')
        self.assertEquals(flo.read(), '')
        self.assertEquals(flo.read(2), '')

    def assertLogMessage(self, msg_level, expected, skip=0):
        for line in self.logger.get_lines_for_level(msg_level)[skip:]:
            msg = 'expected %r not in %r' % (expected, line)
            self.assertTrue(expected in line, msg)

    @with_tempdir
    def test_init(self, tempdir):
        ic_conf_path = os.path.join(tempdir, 'internal-client.conf')
        cring = FakeRing()

        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        self.assertTrue(cs.container_ring is cring)

        # specified but not exists will not start
        conf = {'internal_client_conf_path': ic_conf_path}
        self.assertRaises(SystemExit, sync.ContainerSync, conf,
                          container_ring=cring, logger=self.logger)

        # not specified will use default conf
        with mock.patch('swift.container.sync.InternalClient') as mock_ic:
            cs = sync.ContainerSync({}, container_ring=cring,
                                    logger=self.logger)
        self.assertTrue(cs.container_ring is cring)
        self.assertTrue(mock_ic.called)
        conf_path, name, retry = mock_ic.call_args[0]
        self.assertTrue(isinstance(conf_path, ConfigString))
        self.assertEquals(conf_path.contents.getvalue(),
                          dedent(sync.ic_conf_body))
        self.assertLogMessage('warning', 'internal_client_conf_path')
        self.assertLogMessage('warning', 'internal-client.conf-sample')

        # correct
        contents = dedent(sync.ic_conf_body)
        with open(ic_conf_path, 'w') as f:
            f.write(contents)
        with mock.patch('swift.container.sync.InternalClient') as mock_ic:
            cs = sync.ContainerSync(conf, container_ring=cring)
        self.assertTrue(cs.container_ring is cring)
        self.assertTrue(mock_ic.called)
        conf_path, name, retry = mock_ic.call_args[0]
        self.assertEquals(conf_path, ic_conf_path)

        sample_conf_filename = os.path.join(
            os.path.dirname(test.__file__),
            '../etc/internal-client.conf-sample')
        with open(sample_conf_filename) as sample_conf_file:
            sample_conf = sample_conf_file.read()
        self.assertEqual(contents, sample_conf)

    def test_run_forever(self):
        # This runs runs_forever with fakes to succeed for two loops, the first
        # causing a report but no interval sleep, the second no report but an
        # interval sleep.
        time_calls = [0]
        sleep_calls = []
        audit_location_generator_calls = [0]

        def fake_time():
            time_calls[0] += 1
            returns = [1,     # Initialized reported time
                       1,     # Start time
                       3602,  # Is it report time (yes)
                       3602,  # Report time
                       3602,  # Elapsed time for "under interval" (no)
                       3602,  # Start time
                       3603,  # Is it report time (no)
                       3603]  # Elapsed time for "under interval" (yes)
            if time_calls[0] == len(returns) + 1:
                raise Exception('we are now done')
            return returns[time_calls[0] - 1]

        def fake_sleep(amount):
            sleep_calls.append(amount)

        def fake_audit_location_generator(*args, **kwargs):
            audit_location_generator_calls[0] += 1
            # Makes .container_sync() short-circuit
            yield 'container.db', 'device', 'partition'
            return

        orig_time = sync.time
        orig_sleep = sync.sleep
        orig_ContainerBroker = sync.ContainerBroker
        orig_audit_location_generator = sync.audit_location_generator
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0})
            sync.time = fake_time
            sync.sleep = fake_sleep

            with mock.patch('swift.container.sync.InternalClient'):
                cs = sync.ContainerSync({}, container_ring=FakeRing())
            sync.audit_location_generator = fake_audit_location_generator
            cs.run_forever(1, 2, a=3, b=4, verbose=True)
        except Exception as err:
            if str(err) != 'we are now done':
                raise
        finally:
            sync.time = orig_time
            sync.sleep = orig_sleep
            sync.audit_location_generator = orig_audit_location_generator
            sync.ContainerBroker = orig_ContainerBroker

        self.assertEquals(time_calls, [9])
        self.assertEquals(len(sleep_calls), 2)
        self.assertTrue(sleep_calls[0] <= cs.interval)
        self.assertTrue(sleep_calls[1] == cs.interval - 1)
        self.assertEquals(audit_location_generator_calls, [2])
        self.assertEquals(cs.reported, 3602)

    def test_run_once(self):
        # This runs runs_once with fakes twice, the first causing an interim
        # report, the second with no interim report.
        time_calls = [0]
        audit_location_generator_calls = [0]

        def fake_time():
            time_calls[0] += 1
            returns = [1,     # Initialized reported time
                       1,     # Start time
                       3602,  # Is it report time (yes)
                       3602,  # Report time
                       3602,  # End report time
                       3602,  # For elapsed
                       3602,  # Start time
                       3603,  # Is it report time (no)
                       3604,  # End report time
                       3605]  # For elapsed
            if time_calls[0] == len(returns) + 1:
                raise Exception('we are now done')
            return returns[time_calls[0] - 1]

        def fake_audit_location_generator(*args, **kwargs):
            audit_location_generator_calls[0] += 1
            # Makes .container_sync() short-circuit
            yield 'container.db', 'device', 'partition'
            return

        orig_time = sync.time
        orig_audit_location_generator = sync.audit_location_generator
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0})
            sync.time = fake_time

            with mock.patch('swift.container.sync.InternalClient'):
                cs = sync.ContainerSync({}, container_ring=FakeRing())
            sync.audit_location_generator = fake_audit_location_generator
            cs.run_once(1, 2, a=3, b=4, verbose=True)
            self.assertEquals(time_calls, [6])
            self.assertEquals(audit_location_generator_calls, [1])
            self.assertEquals(cs.reported, 3602)
            cs.run_once()
        except Exception as err:
            if str(err) != 'we are now done':
                raise
        finally:
            sync.time = orig_time
            sync.audit_location_generator = orig_audit_location_generator
            sync.ContainerBroker = orig_ContainerBroker

        self.assertEquals(time_calls, [10])
        self.assertEquals(audit_location_generator_calls, [2])
        self.assertEquals(cs.reported, 3604)

    def test_container_sync_not_db(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        self.assertEquals(cs.container_failures, 0)

    def test_container_sync_missing_db(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        cs.container_sync('isa.db')
        self.assertEquals(cs.container_failures, 1)

    def test_container_sync_not_my_db(self):
        # Db could be there due to handoff replication so test that we ignore
        # those.
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({
                'bind_ip': '10.0.0.0',
            }, container_ring=cring)
            # Plumbing test for bind_ip and whataremyips()
            self.assertEqual(['10.0.0.0'], cs._myips)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0})
            cs._myips = ['127.0.0.1']   # No match
            cs._myport = 1              # No match
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)

            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1              # No match
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)

            cs._myips = ['127.0.0.1']   # No match
            cs._myport = 1000           # Match
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)

            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            # This complete match will cause the 1 container failure since the
            # broker's info doesn't contain sync point keys
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
        finally:
            sync.ContainerBroker = orig_ContainerBroker

    def test_container_sync_deleted(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0}, deleted=False)
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            # This complete match will cause the 1 container failure since the
            # broker's info doesn't contain sync point keys
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)

            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0}, deleted=True)
            # This complete match will not cause any more container failures
            # since the broker indicates deletion
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
        finally:
            sync.ContainerBroker = orig_ContainerBroker

    def test_container_sync_no_to_or_key(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1})
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            # This complete match will be skipped since the broker's metadata
            # has no x-container-sync-to or x-container-sync-key
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)
            self.assertEquals(cs.container_skips, 1)

            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)})
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            # This complete match will be skipped since the broker's metadata
            # has no x-container-sync-key
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)
            self.assertEquals(cs.container_skips, 2)

            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1},
                metadata={'x-container-sync-key': ('key', 1)})
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            # This complete match will be skipped since the broker's metadata
            # has no x-container-sync-to
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 0)
            self.assertEquals(cs.container_skips, 3)

            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)})
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = []
            # This complete match will cause a container failure since the
            # sync-to won't validate as allowed.
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 3)

            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)})
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            # This complete match will succeed completely since the broker
            # get_items_since will return no new rows.
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 3)
        finally:
            sync.ContainerBroker = orig_ContainerBroker

    def test_container_stop_at(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)
        orig_ContainerBroker = sync.ContainerBroker
        orig_time = sync.time
        try:
            sync.ContainerBroker = lambda p: FakeContainerBroker(
                p, info={'account': 'a', 'container': 'c',
                         'storage_policy_index': 0,
                         'x_container_sync_point1': -1,
                         'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)},
                items_since=['erroneous data'])
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            # This sync will fail since the items_since data is bad.
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)

            # Set up fake times to make the sync short-circuit as having taken
            # too long
            fake_times = [
                1.0,        # Compute the time to move on
                100000.0,   # Compute if it's time to move on from first loop
                100000.0]   # Compute if it's time to move on from second loop

            def fake_time():
                return fake_times.pop(0)

            sync.time = fake_time
            # This same sync won't fail since it will look like it took so long
            # as to be time to move on (before it ever actually tries to do
            # anything).
            cs.container_sync('isa.db')
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)
        finally:
            sync.ContainerBroker = orig_ContainerBroker
            sync.time = orig_time

    def test_container_first_loop(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring)

        def fake_hash_path(account, container, obj, raw_digest=False):
            # Ensures that no rows match for full syncing, ordinal is 0 and
            # all hashes are 0
            return '\x00' * 16
        fcb = FakeContainerBroker(
            'path',
            info={'account': 'a', 'container': 'c',
                  'storage_policy_index': 0,
                  'x_container_sync_point1': 2,
                  'x_container_sync_point2': -1},
            metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                      'x-container-sync-key': ('key', 1)},
            items_since=[{'ROWID': 1, 'name': 'o'}])
        with nested(
                mock.patch('swift.container.sync.ContainerBroker',
                           lambda p: fcb),
                mock.patch('swift.container.sync.hash_path', fake_hash_path)):
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Succeeds because no rows match
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, None)
            self.assertEquals(fcb.sync_point2, -1)

        def fake_hash_path(account, container, obj, raw_digest=False):
            # Ensures that all rows match for full syncing, ordinal is 0
            # and all hashes are 1
            return '\x01' * 16
        fcb = FakeContainerBroker('path', info={'account': 'a',
                                                'container': 'c',
                                                'storage_policy_index': 0,
                                                'x_container_sync_point1': 1,
                                                'x_container_sync_point2': 1},
                                  metadata={'x-container-sync-to':
                                            ('http://127.0.0.1/a/c', 1),
                                            'x-container-sync-key':
                                            ('key', 1)},
                                  items_since=[{'ROWID': 1, 'name': 'o'}])
        with nested(
                mock.patch('swift.container.sync.ContainerBroker',
                           lambda p: fcb),
                mock.patch('swift.container.sync.hash_path', fake_hash_path)):
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Succeeds because the two sync points haven't deviated yet
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, -1)
            self.assertEquals(fcb.sync_point2, -1)

        fcb = FakeContainerBroker(
            'path',
            info={'account': 'a', 'container': 'c',
                  'storage_policy_index': 0,
                  'x_container_sync_point1': 2,
                  'x_container_sync_point2': -1},
            metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                      'x-container-sync-key': ('key', 1)},
            items_since=[{'ROWID': 1, 'name': 'o'}])
        with mock.patch('swift.container.sync.ContainerBroker', lambda p: fcb):
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Fails because container_sync_row will fail since the row has no
            # 'deleted' key
            self.assertEquals(cs.container_failures, 2)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, None)
            self.assertEquals(fcb.sync_point2, -1)

        def fake_delete_object(*args, **kwargs):
            raise ClientException
        fcb = FakeContainerBroker(
            'path',
            info={'account': 'a', 'container': 'c',
                  'storage_policy_index': 0,
                  'x_container_sync_point1': 2,
                  'x_container_sync_point2': -1},
            metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                      'x-container-sync-key': ('key', 1)},
            items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
                          'deleted': True}])
        with nested(
                mock.patch('swift.container.sync.ContainerBroker',
                           lambda p: fcb),
                mock.patch('swift.container.sync.delete_object',
                           fake_delete_object)):
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Fails because delete_object fails
            self.assertEquals(cs.container_failures, 3)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, None)
            self.assertEquals(fcb.sync_point2, -1)

        fcb = FakeContainerBroker(
            'path',
            info={'account': 'a', 'container': 'c',
                  'storage_policy_index': 0,
                  'x_container_sync_point1': 2,
                  'x_container_sync_point2': -1},
            metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                      'x-container-sync-key': ('key', 1)},
            items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
                          'deleted': True}])
        with nested(
                mock.patch('swift.container.sync.ContainerBroker',
                           lambda p: fcb),
                mock.patch('swift.container.sync.delete_object',
                           lambda *x, **y: None)):
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Succeeds because delete_object succeeds
            self.assertEquals(cs.container_failures, 3)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, None)
            self.assertEquals(fcb.sync_point2, 1)

    def test_container_second_loop(self):
        cring = FakeRing()
        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync({}, container_ring=cring,
                                    logger=self.logger)
        orig_ContainerBroker = sync.ContainerBroker
        orig_hash_path = sync.hash_path
        orig_delete_object = sync.delete_object
        try:
            # We'll ensure the first loop is always skipped by keeping the two
            # sync points equal

            def fake_hash_path(account, container, obj, raw_digest=False):
                # Ensures that no rows match for second loop, ordinal is 0 and
                # all hashes are 1
                return '\x01' * 16

            sync.hash_path = fake_hash_path
            fcb = FakeContainerBroker(
                'path',
                info={'account': 'a', 'container': 'c',
                      'storage_policy_index': 0,
                      'x_container_sync_point1': -1,
                      'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)},
                items_since=[{'ROWID': 1, 'name': 'o'}])
            sync.ContainerBroker = lambda p: fcb
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Succeeds because no rows match
            self.assertEquals(cs.container_failures, 0)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, 1)
            self.assertEquals(fcb.sync_point2, None)

            def fake_hash_path(account, container, obj, raw_digest=False):
                # Ensures that all rows match for second loop, ordinal is 0 and
                # all hashes are 0
                return '\x00' * 16

            def fake_delete_object(*args, **kwargs):
                pass

            sync.hash_path = fake_hash_path
            sync.delete_object = fake_delete_object
            fcb = FakeContainerBroker(
                'path',
                info={'account': 'a', 'container': 'c',
                      'storage_policy_index': 0,
                      'x_container_sync_point1': -1,
                      'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)},
                items_since=[{'ROWID': 1, 'name': 'o'}])
            sync.ContainerBroker = lambda p: fcb
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Fails because row is missing 'deleted' key
            # Nevertheless the fault is skipped
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, 1)
            self.assertEquals(fcb.sync_point2, None)

            fcb = FakeContainerBroker(
                'path',
                info={'account': 'a', 'container': 'c',
                      'storage_policy_index': 0,
                      'x_container_sync_point1': -1,
                      'x_container_sync_point2': -1},
                metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
                          'x-container-sync-key': ('key', 1)},
                items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
                              'deleted': True}])
            sync.ContainerBroker = lambda p: fcb
            cs._myips = ['10.0.0.0']    # Match
            cs._myport = 1000           # Match
            cs.allowed_sync_hosts = ['127.0.0.1']
            cs.container_sync('isa.db')
            # Succeeds because row now has 'deleted' key and delete_object
            # succeeds
            self.assertEquals(cs.container_failures, 1)
            self.assertEquals(cs.container_skips, 0)
            self.assertEquals(fcb.sync_point1, 1)
            self.assertEquals(fcb.sync_point2, None)
        finally:
            sync.ContainerBroker = orig_ContainerBroker
            sync.hash_path = orig_hash_path
            sync.delete_object = orig_delete_object

    def test_container_sync_row_delete(self):
        self._test_container_sync_row_delete(None, None)

    def test_container_sync_row_delete_using_realms(self):
        self._test_container_sync_row_delete('US', 'realm_key')

    def _test_container_sync_row_delete(self, realm, realm_key):
        orig_uuid = sync.uuid
        orig_delete_object = sync.delete_object
        try:
            class FakeUUID(object):
                class uuid4(object):
                    hex = 'abcdef'

            sync.uuid = FakeUUID

            def fake_delete_object(path, name=None, headers=None, proxy=None,
                                   logger=None, timeout=None):
                self.assertEquals(path, 'http://sync/to/path')
                self.assertEquals(name, 'object')
                if realm:
                    self.assertEquals(headers, {
                        'x-container-sync-auth':
                        'US abcdef 90e95aabb45a6cdc0892a3db5535e7f918428c90',
                        'x-timestamp': '1.2'})
                else:
                    self.assertEquals(
                        headers,
                        {'x-container-sync-key': 'key', 'x-timestamp': '1.2'})
                self.assertEquals(proxy, 'http://proxy')
                self.assertEqual(timeout, 5.0)
                self.assertEqual(logger, self.logger)

            sync.delete_object = fake_delete_object

            with mock.patch('swift.container.sync.InternalClient'):
                cs = sync.ContainerSync({}, container_ring=FakeRing(),
                                        logger=self.logger)
            cs.http_proxies = ['http://proxy']
            # Success
            self.assertTrue(cs.container_sync_row(
                {'deleted': True,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_deletes, 1)

            exc = []

            def fake_delete_object(*args, **kwargs):
                exc.append(Exception('test exception'))
                raise exc[-1]

            sync.delete_object = fake_delete_object
            # Failure because of delete_object exception
            self.assertFalse(cs.container_sync_row(
                {'deleted': True,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_deletes, 1)
            self.assertEquals(len(exc), 1)
            self.assertEquals(str(exc[-1]), 'test exception')

            def fake_delete_object(*args, **kwargs):
                exc.append(ClientException('test client exception'))
                raise exc[-1]

            sync.delete_object = fake_delete_object
            # Failure because of delete_object exception
            self.assertFalse(cs.container_sync_row(
                {'deleted': True,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_deletes, 1)
            self.assertEquals(len(exc), 2)
            self.assertEquals(str(exc[-1]), 'test client exception')

            def fake_delete_object(*args, **kwargs):
                exc.append(ClientException('test client exception',
                                           http_status=404))
                raise exc[-1]

            sync.delete_object = fake_delete_object
            # Success because the object wasn't even found
            self.assertTrue(cs.container_sync_row(
                {'deleted': True,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_deletes, 2)
            self.assertEquals(len(exc), 3)
            self.assertEquals(str(exc[-1]), 'test client exception: 404')
        finally:
            sync.uuid = orig_uuid
            sync.delete_object = orig_delete_object

    def test_container_sync_row_put(self):
        self._test_container_sync_row_put(None, None)

    def test_container_sync_row_put_using_realms(self):
        self._test_container_sync_row_put('US', 'realm_key')

    def _test_container_sync_row_put(self, realm, realm_key):
        orig_uuid = sync.uuid
        orig_shuffle = sync.shuffle
        orig_put_object = sync.put_object
        try:
            class FakeUUID(object):
                class uuid4(object):
                    hex = 'abcdef'

            sync.uuid = FakeUUID
            sync.shuffle = lambda x: x

            def fake_put_object(sync_to, name=None, headers=None,
                                contents=None, proxy=None, logger=None,
                                timeout=None):
                self.assertEquals(sync_to, 'http://sync/to/path')
                self.assertEquals(name, 'object')
                if realm:
                    self.assertEqual(headers, {
                        'x-container-sync-auth':
                        'US abcdef ef62c64bb88a33fa00722daa23d5d43253164962',
                        'x-timestamp': '1.2',
                        'etag': 'etagvalue',
                        'other-header': 'other header value',
                        'content-type': 'text/plain'})
                else:
                    self.assertEquals(headers, {
                        'x-container-sync-key': 'key',
                        'x-timestamp': '1.2',
                        'other-header': 'other header value',
                        'etag': 'etagvalue',
                        'content-type': 'text/plain'})
                self.assertEquals(contents.read(), 'contents')
                self.assertEquals(proxy, 'http://proxy')
                self.assertEqual(timeout, 5.0)
                self.assertEqual(logger, self.logger)

            sync.put_object = fake_put_object

            with mock.patch('swift.container.sync.InternalClient'):
                cs = sync.ContainerSync({}, container_ring=FakeRing(),
                                        logger=self.logger)
            cs.http_proxies = ['http://proxy']

            def fake_get_object(acct, con, obj, headers, acceptable_statuses):
                self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
                                 '0')
                return (200, {'other-header': 'other header value',
                        'etag': '"etagvalue"', 'x-timestamp': '1.2',
                        'content-type': 'text/plain; swift_bytes=123'},
                        iter('contents'))

            cs.swift.get_object = fake_get_object
            # Success as everything says it worked
            self.assertTrue(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 1)

            def fake_get_object(acct, con, obj, headers, acceptable_statuses):
                self.assertEquals(headers['X-Newest'], True)
                self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                  '0')
                return (200, {'date': 'date value',
                        'last-modified': 'last modified value',
                        'x-timestamp': '1.2',
                        'other-header': 'other header value',
                        'etag': '"etagvalue"',
                        'content-type': 'text/plain; swift_bytes=123'},
                        iter('contents'))

            cs.swift.get_object = fake_get_object
            # Success as everything says it worked, also checks 'date' and
            # 'last-modified' headers are removed and that 'etag' header is
            # stripped of double quotes.
            self.assertTrue(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)

            exc = []

            def fake_get_object(acct, con, obj, headers, acceptable_statuses):
                self.assertEquals(headers['X-Newest'], True)
                self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                  '0')
                exc.append(Exception('test exception'))
                raise exc[-1]

            cs.swift.get_object = fake_get_object
            # Fail due to completely unexpected exception
            self.assertFalse(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)
            self.assertEquals(len(exc), 1)
            self.assertEquals(str(exc[-1]), 'test exception')

            exc = []

            def fake_get_object(acct, con, obj, headers, acceptable_statuses):
                self.assertEquals(headers['X-Newest'], True)
                self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                  '0')

                exc.append(ClientException('test client exception'))
                raise exc[-1]

            cs.swift.get_object = fake_get_object
            # Fail due to all direct_get_object calls failing
            self.assertFalse(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)
            self.assertEquals(len(exc), 1)
            self.assertEquals(str(exc[-1]), 'test client exception')

            def fake_get_object(acct, con, obj, headers, acceptable_statuses):
                self.assertEquals(headers['X-Newest'], True)
                self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
                                  '0')
                return (200, {'other-header': 'other header value',
                        'x-timestamp': '1.2', 'etag': '"etagvalue"'},
                        iter('contents'))

            def fake_put_object(*args, **kwargs):
                raise ClientException('test client exception', http_status=401)

            cs.swift.get_object = fake_get_object
            sync.put_object = fake_put_object
            # Fail due to 401
            self.assertFalse(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)
            self.assertLogMessage('info', 'Unauth')

            def fake_put_object(*args, **kwargs):
                raise ClientException('test client exception', http_status=404)

            sync.put_object = fake_put_object
            # Fail due to 404
            self.assertFalse(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)
            self.assertLogMessage('info', 'Not found', 1)

            def fake_put_object(*args, **kwargs):
                raise ClientException('test client exception', http_status=503)

            sync.put_object = fake_put_object
            # Fail due to 503
            self.assertFalse(cs.container_sync_row(
                {'deleted': False,
                 'name': 'object',
                 'created_at': '1.2'}, 'http://sync/to/path',
                'key', FakeContainerBroker('broker'),
                {'account': 'a', 'container': 'c', 'storage_policy_index': 0},
                realm, realm_key))
            self.assertEquals(cs.container_puts, 2)
            self.assertLogMessage('error', 'ERROR Syncing')
        finally:
            sync.uuid = orig_uuid
            sync.shuffle = orig_shuffle
            sync.put_object = orig_put_object

    def test_select_http_proxy_None(self):

        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync(
                {'sync_proxy': ''}, container_ring=FakeRing())
        self.assertEqual(cs.select_http_proxy(), None)

    def test_select_http_proxy_one(self):

        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync(
                {'sync_proxy': 'http://one'}, container_ring=FakeRing())
        self.assertEqual(cs.select_http_proxy(), 'http://one')

    def test_select_http_proxy_multiple(self):

        with mock.patch('swift.container.sync.InternalClient'):
            cs = sync.ContainerSync(
                {'sync_proxy': 'http://one,http://two,http://three'},
                container_ring=FakeRing())
        self.assertEqual(
            set(cs.http_proxies),
            set(['http://one', 'http://two', 'http://three']))


if __name__ == '__main__':
    unittest.main()