Browse Source

Distributed OVSDB lock: HashRing common methods and DB migration

This patch is responsible for creating the "ovn_hash_ring" database
table and the common methods/classes to access it.

Partial-Bug: #1823715
Change-Id: I052791cda6264baf4497e1be2bf7d3d53c49fa60
Signed-off-by: Lucas Alvares Gomes <lucasagomes@gmail.com>
changes/07/655407/7
Lucas Alvares Gomes 3 years ago
parent
commit
3af381003e
  1. 1
      lower-constraints.txt
  2. 4
      networking_ovn/common/constants.py
  3. 6
      networking_ovn/common/exceptions.py
  4. 95
      networking_ovn/common/hash_ring_manager.py
  5. 61
      networking_ovn/db/hash_ring.py
  6. 2
      networking_ovn/db/migration/alembic_migrations/versions/EXPAND_HEAD
  7. 42
      networking_ovn/db/migration/alembic_migrations/versions/stein/expand/4a478c5c1e16_add_ovn_hash_ring_table.py
  8. 10
      networking_ovn/db/models.py
  9. 121
      networking_ovn/tests/unit/common/test_hash_ring_manager.py
  10. 133
      networking_ovn/tests/unit/db/test_hash_ring.py
  11. 1
      requirements.txt

1
lower-constraints.txt

@ -142,6 +142,7 @@ testrepository==0.0.18
testresources==2.0.0
testscenarios==0.4
testtools==2.2.0
tooz==1.58.0
tinyrpc==0.6
traceback2==1.4.0
unittest2==1.1.0

4
networking_ovn/common/constants.py

@ -150,3 +150,7 @@ MAINTENANCE_DELETE_TYPE_ORDER = {
# The addresses field to set in the logical switch port which has a
# peer router port (connecting to the logical router).
DEFAULT_ADDR_FOR_LSP_WITH_PEER = 'router'
# Hash Ring constants
HASH_RING_NODES_TIMEOUT = 60
HASH_RING_CACHE_TIMEOUT = 30

6
networking_ovn/common/exceptions.py

@ -34,3 +34,9 @@ class StandardAttributeIDNotFound(n_exc.NeutronException):
class AgentStatsNotFound(n_exc.NeutronException):
_message = _('The stats for agent %(agent_id)s could not be found')
class HashRingIsEmpty(n_exc.NeutronException):
_message = _('Hash Ring returned empty when hashing "%(key)s". '
'This should never happen in a normal situation, please '
'check the status of your cluster')

95
networking_ovn/common/hash_ring_manager.py

@ -0,0 +1,95 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_log import log
from oslo_utils import timeutils
import six
from tooz import hashring
from networking_ovn.common import constants
from networking_ovn.common import exceptions
from networking_ovn.db import hash_ring as db_hash_ring
LOG = log.getLogger(__name__)
class HashRingManager(object):
def __init__(self):
self._hash_ring = None
self._last_time_loaded = None
self._cache_startup_timeout = True
@property
def _wait_startup_before_caching(self):
# NOTE(lucasagomes): Some events are processed at the service's
# startup time and since many services may be started concurrently
# we do not want to use a cached hash ring at that point. This
# method checks if the created_at and updated_at columns from the
# nodes in the ring from this host is equal, and if so it means
# that the service just started.
# If the startup timeout already expired, there's no reason to
# keep reading from the DB. At this point this will always
# return False
if not self._cache_startup_timeout:
return False
nodes = db_hash_ring.get_active_nodes(
constants.HASH_RING_CACHE_TIMEOUT, from_host=True)
dont_cache = nodes and nodes[0].created_at == nodes[0].updated_at
if not dont_cache:
self._cache_startup_timeout = False
return dont_cache
def _load_hash_ring(self, refresh=False):
cache_timeout = timeutils.utcnow() - datetime.timedelta(
seconds=constants.HASH_RING_CACHE_TIMEOUT)
# Refresh the cache if:
# - Refreshed is forced (refresh=True)
# - Service just started (_wait_startup_before_caching)
# - Hash Ring is not yet instantiated
# - Cache has timed out
if (refresh or
self._wait_startup_before_caching or
self._hash_ring is None or
not self._hash_ring.nodes or
cache_timeout >= self._last_time_loaded):
nodes = db_hash_ring.get_active_nodes(
constants.HASH_RING_NODES_TIMEOUT)
self._hash_ring = hashring.HashRing({node.node_uuid
for node in nodes})
self._last_time_loaded = timeutils.utcnow()
def refresh(self):
self._load_hash_ring(refresh=True)
def get_node(self, key):
self._load_hash_ring()
# tooz expects a byte string for the hash
if isinstance(key, six.string_types):
key = key.encode('utf-8')
try:
# We need to pop the value from the set. If empty,
# KeyError is raised
return self._hash_ring[key].pop()
except KeyError:
raise exceptions.HashRingIsEmpty(key=key)

61
networking_ovn/db/hash_ring.py

@ -0,0 +1,61 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from neutron_lib.db import api as db_api
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils
from networking_ovn.db import models
CONF = cfg.CONF
def add_node(node_uuid=None):
if node_uuid is None:
node_uuid = uuidutils.generate_uuid()
session = db_api.get_writer_session()
with session.begin():
row = models.OVNHashRing(node_uuid=node_uuid, hostname=CONF.host)
session.add(row)
return node_uuid
def remove_nodes_from_host():
session = db_api.get_writer_session()
with session.begin():
session.query(models.OVNHashRing).filter_by(
hostname=CONF.host).delete()
def touch_nodes_from_host():
session = db_api.get_writer_session()
with session.begin():
session.query(models.OVNHashRing).filter_by(
hostname=CONF.host).update({'updated_at': timeutils.utcnow()})
def get_active_nodes(interval, from_host=False):
session = db_api.get_reader_session()
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
with session.begin():
query = session.query(models.OVNHashRing).filter(
models.OVNHashRing.updated_at >= limit)
if from_host:
query = query.filter_by(hostname=CONF.host)
return query.all()

2
networking_ovn/db/migration/alembic_migrations/versions/EXPAND_HEAD

@ -1 +1 @@
5c198d2723b6
4a478c5c1e16

42
networking_ovn/db/migration/alembic_migrations/versions/stein/expand/4a478c5c1e16_add_ovn_hash_ring_table.py

@ -0,0 +1,42 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from alembic import op
import sqlalchemy as sa
"""add ovn_hash_ring table
Revision ID: 4a478c5c1e16
Revises: 5c198d2723b6
Create Date: 2019-04-09 10:43:48.960899
"""
# revision identifiers, used by Alembic.
revision = '4a478c5c1e16'
down_revision = '5c198d2723b6'
def upgrade():
op.create_table(
'ovn_hash_ring',
sa.Column('node_uuid', sa.String(36), nullable=False,
primary_key=True),
sa.Column('hostname', sa.String(length=256), nullable=False),
sa.Column('created_at', sa.DateTime, nullable=False,
default=sa.func.now()),
sa.Column('updated_at', sa.DateTime, nullable=False,
default=sa.func.now()),
)

10
networking_ovn/db/models.py

@ -38,3 +38,13 @@ class OVNRevisionNumbers(model_base.BASEV2):
default=sa.func.now(), nullable=False)
updated_at = sa.Column(sa.TIMESTAMP, default=sa.func.now(),
onupdate=sa.func.now(), nullable=True)
class OVNHashRing(model_base.BASEV2):
__tablename__ = 'ovn_hash_ring'
node_uuid = sa.Column(sa.String(36), nullable=False, primary_key=True)
hostname = sa.Column(sa.String(256), nullable=False)
created_at = sa.Column(sa.DateTime(), default=sa.func.now(),
nullable=False)
updated_at = sa.Column(sa.DateTime(), default=sa.func.now(),
nullable=False)

121
networking_ovn/tests/unit/common/test_hash_ring_manager.py

@ -0,0 +1,121 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo_utils import timeutils
from networking_ovn.common import constants
from networking_ovn.common import exceptions
from networking_ovn.common import hash_ring_manager
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.tests.unit.db import base as db_base
class TestHashRingManager(db_base.DBTestCase):
def setUp(self):
super(TestHashRingManager, self).setUp()
self.hash_ring_manager = hash_ring_manager.HashRingManager()
def _verify_hashes(self, hash_dict):
for target_node, uuid_ in hash_dict.items():
self.assertEqual(target_node,
self.hash_ring_manager.get_node(uuid_))
def test_get_node(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node('node-1')
node_2_uuid = db_hash_ring.add_node('node-2')
hash_dict_before = {node_1_uuid: 'fake-uuid',
node_2_uuid: 'fake-uuid-0'}
self._verify_hashes(hash_dict_before)
def test_get_node_no_active_nodes(self):
self.assertRaises(
exceptions.HashRingIsEmpty, self.hash_ring_manager.get_node,
'fake-uuid')
def test_ring_rebalance(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node('node-1')
node_2_uuid = db_hash_ring.add_node('node-2')
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = db_hash_ring.add_node('another-host')
# Assert all nodes are alive in the ring
self.hash_ring_manager.refresh()
self.assertEqual(3, len(self.hash_ring_manager._hash_ring.nodes))
# Hash certain values against the nodes
hash_dict_before = {node_1_uuid: 'fake-uuid',
node_2_uuid: 'fake-uuid-0',
another_host_node: 'fake-uuid-ABCDE'}
self._verify_hashes(hash_dict_before)
# Mock utcnow() as the HASH_RING_NODES_TIMEOUT have expired
# already and touch the nodes from our host
fake_utcnow = timeutils.utcnow() - datetime.timedelta(
seconds=constants.HASH_RING_NODES_TIMEOUT)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
db_hash_ring.touch_nodes_from_host()
# Now assert that the ring was re-balanced and only the node from
# another host is marked as alive
self.hash_ring_manager.refresh()
self.assertEqual([another_host_node],
list(self.hash_ring_manager._hash_ring.nodes.keys()))
# Now only "another_host_node" is alive, all values should hash to it
hash_dict_after_rebalance = {another_host_node: 'fake-uuid',
another_host_node: 'fake-uuid-0',
another_host_node: 'fake-uuid-ABCDE'}
self._verify_hashes(hash_dict_after_rebalance)
# Now touch the nodes so they appear active again
db_hash_ring.touch_nodes_from_host()
self.hash_ring_manager.refresh()
# The ring should re-balance and as it was before
self._verify_hashes(hash_dict_before)
def test__wait_startup_before_caching(self):
db_hash_ring.add_node('node-1')
db_hash_ring.add_node('node-2')
# Assert it will return True until created_at != updated_at
self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
# Touch the nodes (== update the updated_at column)
db_hash_ring.touch_nodes_from_host()
# Assert it's now False. Waiting is not needed anymore
self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
# Now assert that since the _cache_startup_timeout has been
# flipped, we no longer will read from the database
with mock.patch.object(hash_ring_manager.db_hash_ring,
'get_active_nodes') as get_nodes_mock:
self.assertFalse(
self.hash_ring_manager._wait_startup_before_caching)
self.assertFalse(get_nodes_mock.called)

133
networking_ovn/tests/unit/db/test_hash_ring.py

@ -0,0 +1,133 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from neutron_lib.db import api as db_api
from oslo_utils import timeutils
from sqlalchemy.orm import exc
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.db import models
from networking_ovn.tests.unit.db import base as db_base
class TestHashRing(db_base.DBTestCase):
def _get_node_row(self, node_uuid):
try:
session = db_api.get_reader_session()
with session.begin():
return session.query(models.OVNHashRing).filter_by(
node_uuid=node_uuid).one()
except exc.NoResultFound:
pass
def _add_nodes_and_assert_exists(self, count=1):
nodes = []
for i in range(count):
node_uuid = db_hash_ring.add_node()
self.assertIsNotNone(self._get_node_row(node_uuid))
nodes.append(node_uuid)
return nodes
def test_add_node(self):
self._add_nodes_and_assert_exists()
def test_remove_nodes_from_host(self):
nodes = self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = self._add_nodes_and_assert_exists()[0]
db_hash_ring.remove_nodes_from_host()
# Assert that all nodes from that host have been removed
for n in nodes:
self.assertIsNone(self._get_node_row(n))
# Assert that the node from another host wasn't removed
self.assertIsNotNone(self._get_node_row(another_host_node))
def test_touch_nodes_from_host(self):
nodes = self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = self._add_nodes_and_assert_exists()[0]
# Assert that updated_at isn't updated yet
for node in nodes:
node_db = self._get_node_row(node)
self.assertEqual(node_db.created_at, node_db.updated_at)
# Assert the same for the node from another host
node_db = self._get_node_row(another_host_node)
self.assertEqual(node_db.created_at, node_db.updated_at)
# Touch the nodes from our host
db_hash_ring.touch_nodes_from_host()
# Assert that updated_at is now updated
for node in nodes:
node_db = self._get_node_row(node)
self.assertGreater(node_db.updated_at, node_db.created_at)
# Assert that the node from another host hasn't been touched
# (updated_at is not updated)
node_db = self._get_node_row(another_host_node)
self.assertEqual(node_db.created_at, node_db.updated_at)
def test_active_nodes(self):
self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = self._add_nodes_and_assert_exists()[0]
# Assert all nodes are active (within 60 seconds)
self.assertEqual(4, len(db_hash_ring.get_active_nodes(interval=60)))
# Substract 60 seconds from utcnow() and touch the nodes from
# our host
fake_utcnow = timeutils.utcnow() - datetime.timedelta(seconds=60)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
db_hash_ring.touch_nodes_from_host()
# Now assert that all nodes from our host are seeing as offline.
# Only the node from another host should be active
active_nodes = db_hash_ring.get_active_nodes(interval=60)
self.assertEqual(1, len(active_nodes))
self.assertEqual(another_host_node, active_nodes[0].node_uuid)
def test_active_nodes_from_host(self):
self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
another_host_id = 'another-host-52359446-c366'
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = another_host_id
self._add_nodes_and_assert_exists()
# Assert only the 3 nodes from this host is returned
active_nodes = db_hash_ring.get_active_nodes(interval=60,
from_host=True)
self.assertEqual(3, len(active_nodes))
self.assertNotIn(another_host_id, active_nodes)

1
requirements.txt

@ -16,6 +16,7 @@ Babel!=2.4.0,>=2.3.4 # BSD
six>=1.10.0 # MIT
neutron>=13.0.0.0b2 # Apache-2.0
octavia-lib>=1.1.1 # Apache-2.0
tooz>=1.58.0 # Apache-2.0
# The comment below indicates this project repo is current with neutron-lib
# and should receive neutron-lib consumption patches as they are released

Loading…
Cancel
Save