Browse Source

Hash Ring: Add support for groups

This patch adds support for groups in the Hash Ring mechanism.

Prior to this patch there was an assumption that all members of the ring
were able to handle the same events but, once we start expanding the use
of the hash ring on different parts of the code such as the Octavia
driver, that assumption is not true anymore because they have their own
events.

With the group mechanism, multiple groups can be created (resulting in a
separated hash ring for each group) that way we can group workers based
on the events they are able to handle.

This patch includes a new migration script that adds a new column called
"group_name" to the "ovn_hash_ring" table.

Change-Id: Ifc489afb764cddeec58597399d7d641c0d7ff279
Signed-off-by: Lucas Alvares Gomes <lucasagomes@gmail.com>
changes/07/670307/4
Lucas Alvares Gomes 3 years ago
parent
commit
04469819f2
  1. 1
      networking_ovn/common/constants.py
  2. 7
      networking_ovn/common/hash_ring_manager.py
  3. 8
      networking_ovn/common/maintenance.py
  4. 29
      networking_ovn/db/hash_ring.py
  5. 2
      networking_ovn/db/migration/alembic_migrations/versions/EXPAND_HEAD
  6. 53
      networking_ovn/db/migration/alembic_migrations/versions/stein/expand/e55d09277410_ovn_hash_ring_add_group_column.py
  7. 1
      networking_ovn/db/models.py
  8. 8
      networking_ovn/ml2/mech_driver.py
  9. 3
      networking_ovn/ovsdb/ovsdb_monitor.py
  10. 9
      networking_ovn/tests/functional/test_ovsdb_monitor.py
  11. 26
      networking_ovn/tests/unit/common/test_hash_ring_manager.py
  12. 79
      networking_ovn/tests/unit/db/test_hash_ring.py
  13. 1
      networking_ovn/tests/unit/ml2/test_mech_driver.py

1
networking_ovn/common/constants.py

@ -157,3 +157,4 @@ DEFAULT_ADDR_FOR_LSP_WITH_PEER = 'router'
HASH_RING_NODES_TIMEOUT = 60
HASH_RING_TOUCH_INTERVAL = 30
HASH_RING_CACHE_TIMEOUT = 30
HASH_RING_ML2_GROUP = 'mechanism_driver'

7
networking_ovn/common/hash_ring_manager.py

@ -29,10 +29,11 @@ LOG = log.getLogger(__name__)
class HashRingManager(object):
def __init__(self):
def __init__(self, group_name):
self._hash_ring = None
self._last_time_loaded = None
self._cache_startup_timeout = True
self._group = group_name
@property
def _wait_startup_before_caching(self):
@ -50,7 +51,7 @@ class HashRingManager(object):
return False
nodes = db_hash_ring.get_active_nodes(
constants.HASH_RING_CACHE_TIMEOUT, from_host=True)
constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
dont_cache = nodes and nodes[0].created_at == nodes[0].updated_at
if not dont_cache:
self._cache_startup_timeout = False
@ -72,7 +73,7 @@ class HashRingManager(object):
not self._hash_ring.nodes or
cache_timeout >= self._last_time_loaded):
nodes = db_hash_ring.get_active_nodes(
constants.HASH_RING_NODES_TIMEOUT)
constants.HASH_RING_NODES_TIMEOUT, self._group)
self._hash_ring = hashring.HashRing({node.node_uuid
for node in nodes})
self._last_time_loaded = timeutils.utcnow()

8
networking_ovn/common/maintenance.py

@ -386,9 +386,15 @@ class DBInconsistenciesPeriodics(object):
raise periodics.NeverAgain()
class HashRingHealthCheckPeriodics(object):
def __init__(self, group):
self._group = group
@periodics.periodic(spacing=ovn_const.HASH_RING_TOUCH_INTERVAL)
def touch_hash_ring_nodes(self):
# NOTE(lucasagomes): Note that we do not rely on the OVSDB lock
# here because we want the maintenance tasks from each instance to
# execute this task.
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(self._group)

29
networking_ovn/db/hash_ring.py

@ -25,51 +25,48 @@ from networking_ovn.db import models
CONF = cfg.CONF
def add_node(node_uuid=None):
def add_node(group_name, node_uuid=None):
if node_uuid is None:
node_uuid = uuidutils.generate_uuid()
session = db_api.get_writer_session()
with session.begin():
row = models.OVNHashRing(node_uuid=node_uuid, hostname=CONF.host)
row = models.OVNHashRing(node_uuid=node_uuid, hostname=CONF.host,
group_name=group_name)
session.add(row)
return node_uuid
def remove_nodes_from_host():
def remove_nodes_from_host(group_name):
session = db_api.get_writer_session()
with session.begin():
session.query(models.OVNHashRing).filter_by(
hostname=CONF.host).delete()
session.query(models.OVNHashRing).filter(
models.OVNHashRing.hostname == CONF.host,
models.OVNHashRing.group_name == group_name).delete()
def _touch(hostname=None, node_uuid=None):
filter_args = {}
if hostname:
filter_args['hostname'] = hostname
if node_uuid:
filter_args['node_uuid'] = node_uuid
def _touch(**filter_args):
session = db_api.get_writer_session()
with session.begin():
session.query(models.OVNHashRing).filter_by(
**filter_args).update({'updated_at': timeutils.utcnow()})
def touch_nodes_from_host():
_touch(hostname=CONF.host)
def touch_nodes_from_host(group_name):
_touch(hostname=CONF.host, group_name=group_name)
def touch_node(node_uuid):
_touch(node_uuid=node_uuid)
def get_active_nodes(interval, from_host=False):
def get_active_nodes(interval, group_name, from_host=False):
session = db_api.get_reader_session()
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
with session.begin():
query = session.query(models.OVNHashRing).filter(
models.OVNHashRing.updated_at >= limit)
models.OVNHashRing.updated_at >= limit,
models.OVNHashRing.group_name == group_name)
if from_host:
query = query.filter_by(hostname=CONF.host)
return query.all()

2
networking_ovn/db/migration/alembic_migrations/versions/EXPAND_HEAD

@ -1 +1 @@
4a478c5c1e16
e55d09277410

53
networking_ovn/db/migration/alembic_migrations/versions/stein/expand/e55d09277410_ovn_hash_ring_add_group_column.py

@ -0,0 +1,53 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine.reflection import Inspector as insp
"""ovn_hash_ring_add_group_column
Revision ID: e55d09277410
Revises: 4a478c5c1e16
Create Date: 2019-07-09 13:26:31.356414
"""
# revision identifiers, used by Alembic.
revision = 'e55d09277410'
down_revision = '4a478c5c1e16'
MYSQL_ENGINE = 'mysql'
def upgrade():
op.add_column(
'ovn_hash_ring',
sa.Column('group_name', sa.String(length=256), nullable=False))
# Make node_uuid and group_name a composite PK
bind = op.get_bind()
engine = bind.engine
if (engine.name == MYSQL_ENGINE):
op.execute("ALTER TABLE ovn_hash_ring DROP PRIMARY KEY,"
"ADD PRIMARY KEY (node_uuid, group_name);")
else:
inspector = insp.from_engine(bind)
pk_constraint = inspector.get_pk_constraint('ovn_hash_ring')
op.drop_constraint(pk_constraint.get('name'), 'ovn_hash_ring',
type_='primary')
op.create_primary_key(op.f('pk_ovn_hash_ring'),
'ovn_hash_ring', ['node_uuid', 'group_name'])

1
networking_ovn/db/models.py

@ -43,6 +43,7 @@ class OVNRevisionNumbers(model_base.BASEV2):
class OVNHashRing(model_base.BASEV2):
__tablename__ = 'ovn_hash_ring'
node_uuid = sa.Column(sa.String(36), nullable=False, primary_key=True)
group_name = sa.Column(sa.String(256), nullable=False, primary_key=True)
hostname = sa.Column(sa.String(256), nullable=False)
created_at = sa.Column(sa.DateTime(), default=sa.func.now(),
nullable=False)

8
networking_ovn/ml2/mech_driver.py

@ -107,6 +107,7 @@ class OVNMechanismDriver(api.MechanismDriver):
self._ovn_client_inst = None
self._maintenance_thread = None
self.node_uuid = None
self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
self.sg_enabled = ovn_acl.is_sg_enabled()
# NOTE(lucasagomes): _clean_hash_ring() must be called before
# self.subscribe() to avoid processes racing when adding or
@ -188,7 +189,7 @@ class OVNMechanismDriver(api.MechanismDriver):
events.BEFORE_DELETE)
def _clean_hash_ring(self, *args, **kwargs):
db_hash_ring.remove_nodes_from_host()
db_hash_ring.remove_nodes_from_host(self.hash_ring_group)
def pre_fork_initialize(self, resource, event, trigger, payload=None):
"""Pre-initialize the ML2/OVN driver."""
@ -204,7 +205,7 @@ class OVNMechanismDriver(api.MechanismDriver):
is_maintenance = (utils.get_method_class(trigger) ==
worker.MaintenanceWorker)
if not is_maintenance:
self.node_uuid = db_hash_ring.add_node()
self.node_uuid = db_hash_ring.add_node(self.hash_ring_group)
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(
self, trigger, binding_events=not is_maintenance)
@ -241,6 +242,9 @@ class OVNMechanismDriver(api.MechanismDriver):
self._maintenance_thread = maintenance.MaintenanceThread()
self._maintenance_thread.add_periodics(
maintenance.DBInconsistenciesPeriodics(self._ovn_client))
self._maintenance_thread.add_periodics(
maintenance.HashRingHealthCheckPeriodics(
self.hash_ring_group))
self._maintenance_thread.start()
def _create_security_group_precommit(self, resource, event, trigger,

3
networking_ovn/ovsdb/ovsdb_monitor.py

@ -351,8 +351,9 @@ class OvnIdlDistributedLock(BaseOvnIdl):
super(OvnIdlDistributedLock, self).__init__(remote, schema)
self.driver = driver
self.notify_handler = OvnDbNotifyHandler(driver)
self._hash_ring = hash_ring_manager.HashRingManager()
self._node_uuid = self.driver.node_uuid
self._hash_ring = hash_ring_manager.HashRingManager(
self.driver.hash_ring_group)
self._last_touch = None
def notify(self, event, row, updates=None):

9
networking_ovn/tests/functional/test_ovsdb_monitor.py

@ -187,8 +187,10 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
# Create 10 fake workers
for _ in range(10):
node_uuid = uuidutils.generate_uuid()
db_hash_ring.add_node(node_uuid)
fake_driver = mock.MagicMock(node_uuid=node_uuid)
db_hash_ring.add_node(ovn_const.HASH_RING_ML2_GROUP, node_uuid)
fake_driver = mock.MagicMock(
node_uuid=node_uuid,
hash_ring_group=ovn_const.HASH_RING_ML2_GROUP)
_idl = ovsdb_monitor.OvnNbIdl.from_server(
self.ovsdb_server_mgr.get_ovsdb_connection_path(),
'OVN_Northbound', fake_driver)
@ -204,7 +206,8 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
# Assert we have 11 active workers in the ring
self.assertEqual(
11, len(db_hash_ring.get_active_nodes(
interval=ovn_const.HASH_RING_NODES_TIMEOUT)))
interval=ovn_const.HASH_RING_NODES_TIMEOUT,
group_name=ovn_const.HASH_RING_ML2_GROUP)))
# Trigger the event
self.create_port()

26
networking_ovn/tests/unit/common/test_hash_ring_manager.py

@ -24,12 +24,15 @@ from networking_ovn.common import hash_ring_manager
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.tests.unit.db import base as db_base
HASH_RING_TEST_GROUP = 'test_group'
class TestHashRingManager(db_base.DBTestCase):
def setUp(self):
super(TestHashRingManager, self).setUp()
self.hash_ring_manager = hash_ring_manager.HashRingManager()
self.hash_ring_manager = hash_ring_manager.HashRingManager(
HASH_RING_TEST_GROUP)
def _verify_hashes(self, hash_dict):
for target_node, uuid_ in hash_dict.items():
@ -38,8 +41,8 @@ class TestHashRingManager(db_base.DBTestCase):
def test_get_node(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node('node-1')
node_2_uuid = db_hash_ring.add_node('node-2')
node_1_uuid = db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-1')
node_2_uuid = db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-2')
hash_dict_before = {node_1_uuid: 'fake-uuid',
node_2_uuid: 'fake-uuid-0'}
@ -52,13 +55,14 @@ class TestHashRingManager(db_base.DBTestCase):
def test_ring_rebalance(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node('node-1')
node_2_uuid = db_hash_ring.add_node('node-2')
node_1_uuid = db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-1')
node_2_uuid = db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-2')
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = db_hash_ring.add_node('another-host')
another_host_node = db_hash_ring.add_node(
HASH_RING_TEST_GROUP, 'another-host')
# Assert all nodes are alive in the ring
self.hash_ring_manager.refresh()
@ -76,7 +80,7 @@ class TestHashRingManager(db_base.DBTestCase):
seconds=constants.HASH_RING_NODES_TIMEOUT)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
# Now assert that the ring was re-balanced and only the node from
# another host is marked as alive
@ -91,22 +95,22 @@ class TestHashRingManager(db_base.DBTestCase):
self._verify_hashes(hash_dict_after_rebalance)
# Now touch the nodes so they appear active again
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
self.hash_ring_manager.refresh()
# The ring should re-balance and as it was before
self._verify_hashes(hash_dict_before)
def test__wait_startup_before_caching(self):
db_hash_ring.add_node('node-1')
db_hash_ring.add_node('node-2')
db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-1')
db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-2')
# Assert it will return True until created_at != updated_at
self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
# Touch the nodes (== update the updated_at column)
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
# Assert it's now False. Waiting is not needed anymore
self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)

79
networking_ovn/tests/unit/db/test_hash_ring.py

@ -24,6 +24,8 @@ from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.db import models
from networking_ovn.tests.unit.db import base as db_base
HASH_RING_TEST_GROUP = 'test_group'
class TestHashRing(db_base.DBTestCase):
@ -36,10 +38,11 @@ class TestHashRing(db_base.DBTestCase):
except exc.NoResultFound:
pass
def _add_nodes_and_assert_exists(self, count=1):
def _add_nodes_and_assert_exists(self, count=1,
group_name=HASH_RING_TEST_GROUP):
nodes = []
for i in range(count):
node_uuid = db_hash_ring.add_node()
node_uuid = db_hash_ring.add_node(group_name)
self.assertIsNotNone(self._get_node_row(node_uuid))
nodes.append(node_uuid)
return nodes
@ -55,7 +58,7 @@ class TestHashRing(db_base.DBTestCase):
mock_conf.host = 'another-host-52359446-c366'
another_host_node = self._add_nodes_and_assert_exists()[0]
db_hash_ring.remove_nodes_from_host()
db_hash_ring.remove_nodes_from_host(HASH_RING_TEST_GROUP)
# Assert that all nodes from that host have been removed
for n in nodes:
self.assertIsNone(self._get_node_row(n))
@ -81,7 +84,7 @@ class TestHashRing(db_base.DBTestCase):
self.assertEqual(node_db.created_at, node_db.updated_at)
# Touch the nodes from our host
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
# Assert that updated_at is now updated
for node in nodes:
@ -102,18 +105,20 @@ class TestHashRing(db_base.DBTestCase):
another_host_node = self._add_nodes_and_assert_exists()[0]
# Assert all nodes are active (within 60 seconds)
self.assertEqual(4, len(db_hash_ring.get_active_nodes(interval=60)))
self.assertEqual(4, len(db_hash_ring.get_active_nodes(
interval=60, group_name=HASH_RING_TEST_GROUP)))
# Substract 60 seconds from utcnow() and touch the nodes from
# our host
fake_utcnow = timeutils.utcnow() - datetime.timedelta(seconds=60)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
db_hash_ring.touch_nodes_from_host()
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
# Now assert that all nodes from our host are seeing as offline.
# Only the node from another host should be active
active_nodes = db_hash_ring.get_active_nodes(interval=60)
active_nodes = db_hash_ring.get_active_nodes(
interval=60, group_name=HASH_RING_TEST_GROUP)
self.assertEqual(1, len(active_nodes))
self.assertEqual(another_host_node, active_nodes[0].node_uuid)
@ -127,8 +132,8 @@ class TestHashRing(db_base.DBTestCase):
self._add_nodes_and_assert_exists()
# Assert only the 3 nodes from this host is returned
active_nodes = db_hash_ring.get_active_nodes(interval=60,
from_host=True)
active_nodes = db_hash_ring.get_active_nodes(
interval=60, group_name=HASH_RING_TEST_GROUP, from_host=True)
self.assertEqual(3, len(active_nodes))
self.assertNotIn(another_host_id, active_nodes)
@ -151,3 +156,59 @@ class TestHashRing(db_base.DBTestCase):
for node in nodes[1:]:
node_db = self._get_node_row(node)
self.assertEqual(node_db.created_at, node_db.updated_at)
def test_active_nodes_different_groups(self):
another_group = 'another_test_group'
self._add_nodes_and_assert_exists(count=3)
self._add_nodes_and_assert_exists(count=2, group_name=another_group)
active_nodes = db_hash_ring.get_active_nodes(
interval=60, group_name=HASH_RING_TEST_GROUP)
self.assertEqual(3, len(active_nodes))
for node in active_nodes:
self.assertEqual(HASH_RING_TEST_GROUP, node.group_name)
active_nodes = db_hash_ring.get_active_nodes(
interval=60, group_name=another_group)
self.assertEqual(2, len(active_nodes))
for node in active_nodes:
self.assertEqual(another_group, node.group_name)
def test_remove_nodes_from_host_different_groups(self):
another_group = 'another_test_group'
group1 = self._add_nodes_and_assert_exists(count=3)
group2 = self._add_nodes_and_assert_exists(
count=2, group_name=another_group)
db_hash_ring.remove_nodes_from_host(HASH_RING_TEST_GROUP)
# Assert that all nodes from that group have been removed
for node in group1:
self.assertIsNone(self._get_node_row(node))
# Assert that all nodes from a different group are intact
for node in group2:
self.assertIsNotNone(self._get_node_row(node))
def test_touch_nodes_from_host_different_groups(self):
another_group = 'another_test_group'
group1 = self._add_nodes_and_assert_exists(count=3)
group2 = self._add_nodes_and_assert_exists(
count=2, group_name=another_group)
# Assert that updated_at isn't updated yet
for node in group1 + group2:
node_db = self._get_node_row(node)
self.assertEqual(node_db.created_at, node_db.updated_at)
# Touch the nodes from group1
db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
# Assert that updated_at was updated for group1
for node in group1:
node_db = self._get_node_row(node)
self.assertGreater(node_db.updated_at, node_db.created_at)
# Assert that updated_at wasn't updated for group2
for node in group2:
node_db = self._get_node_row(node)
self.assertEqual(node_db.created_at, node_db.updated_at)

1
networking_ovn/tests/unit/ml2/test_mech_driver.py

@ -1617,6 +1617,7 @@ class OVNMechanismDriverTestCase(test_plugin.Ml2PluginV2TestCase):
p.start()
self.addCleanup(p.stop)
self.driver.node_uuid = node_uuid
self.driver.hash_ring_group = 'fake_hash_ring_group'
mm = directory.get_plugin().mechanism_manager
self.mech_driver = mm.mech_drivers['ovn'].obj

Loading…
Cancel
Save