Merge "Support etcd clustering configuration"
This commit is contained in:
commit
ba0ad9a121
@ -20,6 +20,7 @@ import urllib3
|
||||
from urllib3 import connection
|
||||
from urllib3 import exceptions
|
||||
|
||||
from dragonflow._i18n import _LE
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow.db import db_api
|
||||
|
||||
@ -87,6 +88,21 @@ def _error_catcher(self):
|
||||
urllib3.HTTPResponse._error_catcher = _error_catcher
|
||||
|
||||
|
||||
def _check_valid_host(host_str):
|
||||
return ':' in host_str and host_str[-1] != ':'
|
||||
|
||||
|
||||
def _parse_hosts(hosts):
|
||||
host_ports = []
|
||||
for host_str in hosts:
|
||||
if _check_valid_host(host_str):
|
||||
host_port = host_str.strip().split(':')
|
||||
host_ports.append((host_port[0], int(host_port[1])))
|
||||
else:
|
||||
LOG.error(_LE("The host string %s is invalid."), host_str)
|
||||
return tuple(host_ports)
|
||||
|
||||
|
||||
class EtcdDbDriver(db_api.DbApi):
|
||||
|
||||
def __init__(self):
|
||||
@ -97,7 +113,11 @@ class EtcdDbDriver(db_api.DbApi):
|
||||
self.pool = eventlet.GreenPool(size=1)
|
||||
|
||||
def initialize(self, db_ip, db_port, **args):
|
||||
self.client = etcd.Client(host=db_ip, port=db_port)
|
||||
hosts = _parse_hosts(args['config'].remote_db_hosts)
|
||||
if hosts:
|
||||
self.client = etcd.Client(host=hosts, allow_reconnect=True)
|
||||
else:
|
||||
self.client = etcd.Client(host=db_ip, port=db_port)
|
||||
|
||||
def support_publish_subscribe(self):
|
||||
return True
|
||||
|
52
dragonflow/tests/unit/test_etcd_driver.py
Normal file
52
dragonflow/tests/unit/test_etcd_driver.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
|
||||
from dragonflow.db.drivers import etcd_db_driver
|
||||
from dragonflow.db.drivers.etcd_db_driver import _parse_hosts
|
||||
from dragonflow.tests import base as tests_base
|
||||
|
||||
|
||||
class TestEtcdDB(tests_base.BaseTestCase):
|
||||
|
||||
def test_parse_none(self):
|
||||
fake_host = []
|
||||
expected = ()
|
||||
output = _parse_hosts(fake_host)
|
||||
self.assertEqual(expected, output)
|
||||
|
||||
def test_parse_empty(self):
|
||||
fake_host = [""]
|
||||
expected = ()
|
||||
output = _parse_hosts(fake_host)
|
||||
self.assertEqual(expected, output)
|
||||
|
||||
def test_parse_one_host(self):
|
||||
fake_host = ['127.0.0.1:80']
|
||||
expected = (('127.0.0.1', 80),)
|
||||
output = _parse_hosts(fake_host)
|
||||
self.assertEqual(expected, output)
|
||||
|
||||
def test_parse_multiple_hosts(self):
|
||||
fake_host = ['127.0.0.1:80', '192.168.0.1:8080']
|
||||
expected = (('127.0.0.1', 80), ('192.168.0.1', 8080))
|
||||
output = _parse_hosts(fake_host)
|
||||
self.assertEqual(expected, output)
|
||||
|
||||
def test_parse_multiple_hosts_invalid(self):
|
||||
fake_host = ['127.0.0.1:80', '192.168.0.1']
|
||||
expected = (('127.0.0.1', 80),)
|
||||
with mock.patch.object(etcd_db_driver.LOG, 'error') as log_err:
|
||||
output = _parse_hosts(fake_host)
|
||||
self.assertEqual(expected, output)
|
||||
log_err.assert_called_once_with(
|
||||
u'The host string %s is invalid.', '192.168.0.1')
|
@ -42,10 +42,12 @@ class DFL3RouterPluginBase(test_plugin.Ml2PluginV2TestCase):
|
||||
def setUp(self):
|
||||
lock_db = mock.patch('dragonflow.db.neutron.lockedobjects_db').start()
|
||||
lock_db.wrap_db_lock = empty_wrapper
|
||||
nbapi_instance = mock.patch('dragonflow.db.api_nb.NbApi').start()
|
||||
nbapi_instance.get_instance.return_value = mock.MagicMock()
|
||||
super(DFL3RouterPluginBase, self).setUp()
|
||||
self.l3p = (manager.NeutronManager.
|
||||
get_service_plugins()['L3_ROUTER_NAT'])
|
||||
self.nb_api = self.l3p.nb_api = mock.MagicMock()
|
||||
self.nb_api = self.l3p.nb_api
|
||||
self.ctx = nctx.get_admin_context()
|
||||
|
||||
|
||||
|
@ -44,7 +44,6 @@ class TestDFMechDriver(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestDFMechDriver, self).setUp()
|
||||
self.driver = mech_driver.DFMechDriver()
|
||||
self.driver.initialize()
|
||||
self.driver.nb_api = mock.Mock()
|
||||
self.dbversion = 0
|
||||
version_db._create_db_version_row = mock.Mock(
|
||||
@ -325,10 +324,12 @@ class TestDFMechDriverRevision(test_plugin.Ml2PluginV2TestCase):
|
||||
return p
|
||||
|
||||
def setUp(self):
|
||||
nbapi_instance = mock.patch('dragonflow.db.api_nb.NbApi').start()
|
||||
nbapi_instance.get_instance.return_value = mock.MagicMock()
|
||||
super(TestDFMechDriverRevision, self).setUp()
|
||||
mm = self.driver.mechanism_manager
|
||||
self.mech_driver = mm.mech_drivers['df'].obj
|
||||
self.nb_api = self.mech_driver.nb_api = mock.MagicMock()
|
||||
self.nb_api = self.mech_driver.nb_api
|
||||
|
||||
def _test_create_security_group_revision(self):
|
||||
s = {'security_group': {'tenant_id': 'some_tenant', 'name': '',
|
||||
|
Loading…
Reference in New Issue
Block a user