Migrate ActivePort to the new model framework

Change-Id: I3cb89d7eb7c0c2f71ce2a3cf958e4282b66ce8e0
Partially-Implements: bp refactor-nb-api
This commit is contained in:
Omer Anson 2017-06-06 17:10:04 +03:00 committed by Dima Kuznetsov
parent 4f860a5df4
commit 01a295fed7
11 changed files with 82 additions and 273 deletions

View File

@ -25,6 +25,7 @@ from ryu.ofproto import ether
from dragonflow import conf as cfg
from dragonflow.controller.common import constants as controller_const
from dragonflow.controller import df_base_app
from dragonflow.db.models import active_port
from dragonflow.db.models import l2
LOG = log.getLogger(__name__)
@ -64,10 +65,16 @@ class ActivePortDetectionApp(df_base_app.DFlowApp):
arp_pkt.src_ip == arp_pkt.dst_ip) or \
arp_pkt.opcode == arp.ARP_REPLY:
match = msg.match
in_port = match.get('in_port', None)
if in_port:
self._update_active_port_in_db(
arp_pkt.src_ip, arp_pkt.src_mac, in_port)
unique_key = match.get('reg6')
if not unique_key:
return
lport = self.db_store2.get_one(
l2.LogicalPort(unique_key=unique_key),
index=l2.LogicalPort.get_index('unique_key'))
if not lport:
return
self._update_active_port_in_db(
arp_pkt.src_ip, arp_pkt.src_mac, lport)
def _get_ips_in_allowed_address_pairs(self, lport):
ips = set()
@ -163,59 +170,42 @@ class ActivePortDetectionApp(df_base_app.DFlowApp):
match=gratuitous_arp_match,
command=ofproto.OFPFC_DELETE)
def _if_old_active_port_need_update(self, old_port, ip, mac, found_lport):
if (old_port.get_network_id() == found_lport.lswitch.id and
old_port.get_ip() == ip and
old_port.get_detected_mac() == mac and
old_port.get_topic() == found_lport.topic and
old_port.get_detected_lport_id() == found_lport.id):
return False
def _get_active_port_id(self, lswitch, ip_str):
return lswitch.id + ip_str
return True
def _update_active_port_in_db(self, ip, mac, ofport):
lports = self.db_store2.get_all(l2.LogicalPort)
found_lport = None
for lport in lports:
if ofport == lport.ofport:
found_lport = lport
break
if found_lport is None:
LOG.info("There is no logical port matched this "
"ofport(%s).", ofport)
return
lswitch = found_lport.lswitch
topic = found_lport.topic
found_lport_id = found_lport.id
key = lswitch.id + ip
old_active_port = self.db_store.get_active_port(key)
if (not old_active_port or self._if_old_active_port_need_update(
old_active_port, ip, mac, found_lport)):
LOG.info("Detected new active node. ip=%(ip)s, "
def _update_active_port_in_db(self, ip_str, mac, lport):
lswitch = lport.lswitch
topic = lport.topic
found_lport_id = lport.id
key = self._get_active_port_id(lswitch, ip_str)
old_active_port = self.db_store2.get_one(
active_port.AllowedAddressPairsActivePort(id=key))
new_active_port = active_port.AllowedAddressPairsActivePort(
id=key,
topic=topic,
network=lswitch.id,
ip=ip_str,
detected_mac=mac,
detected_lport=found_lport_id
)
if not old_active_port:
LOG.info("Detected new active node. ip=%(ip_str)s, "
"mac=%(mac)s, lport_id=%(lport_id)s",
{'ip': ip, 'mac': mac, 'lport_id': found_lport_id})
if old_active_port:
self.nb_api.update_active_port(
id=key,
topic=topic,
detected_mac=mac,
detected_lport_id=found_lport_id)
else:
self.nb_api.create_active_port(
id=key,
topic=topic,
network_id=lswitch.id,
ip=ip,
detected_mac=mac,
detected_lport_id=found_lport_id)
{'ip': ip_str, 'mac': mac, 'lport_id': found_lport_id})
self.nb_api.create(new_active_port)
elif old_active_port != new_active_port:
LOG.info("Detected update in active node. ip=%(ip_str)s, "
"mac=%(mac)s, lport_id=%(lport_id)s",
{'ip': ip_str, 'mac': mac, 'lport_id': found_lport_id})
self.nb_api.update(new_active_port)
def _remove_active_port_from_db_by_lport(self, network_id, ip, lport):
key = network_id + ip
old_active_port = self.db_store.get_active_port(key)
def _remove_active_port_from_db_by_lport(self, lswitch, ip_str, lport):
key = self._get_active_port_id(lswitch, ip_str)
old_active_port = self.db_store2.get_one(
active_port.AllowedAddressPairsActivePort(id=key))
if (old_active_port and
old_active_port.get_detected_lport_id() == lport.id):
self.nb_api.delete_active_port(key, lport.topic)
old_active_port.detected_lport.id == lport.id):
self.nb_api.delete(old_active_port)
def _add_target_ip(self, ip, lport):
# install flows which send the arp reply or gratuitous arp to
@ -245,7 +235,7 @@ class ActivePortDetectionApp(df_base_app.DFlowApp):
# Try to remove the active node detected from this lport and used
# this ip from dragonflow DB
self._remove_active_port_from_db_by_lport(lport.lswitch.id,
self._remove_active_port_from_db_by_lport(lport.lswitch,
str(ip), lport)
def _get_detect_items(self):

View File

@ -111,20 +111,6 @@ class DfLocalController(object):
self._register_models()
self.db_sync_loop()
def _register_legacy_model_refreshers(self):
refreshers = [
df_db_objects_refresh.DfObjectRefresher(
'Active Ports',
self.db_store.get_active_port_keys,
self.nb_api.get_active_ports,
self.update_activeport,
self.delete_activeport,
),
]
for refresher in refreshers:
df_db_objects_refresh.add_refresher(refresher)
def _register_models(self):
for model in model_framework.iter_models_by_dependency_order():
# FIXME (dimak) do not register topicless models for now
@ -150,8 +136,6 @@ class DfLocalController(object):
),
)
self._register_legacy_model_refreshers()
def db_sync_loop(self):
while True:
time.sleep(1)
@ -335,35 +319,6 @@ class DfLocalController(object):
def ovs_sync_started(self):
self.open_flow_app.notify_ovs_sync_started()
def update_activeport(self, active_port):
old_active_port = self.db_store.get_active_port(active_port.get_id())
lean_lport = l2.LogicalPort(id=active_port.get_detected_lport_id(),
topic=active_port.get_topic())
lport = self.db_store2.get_one(lean_lport)
LOG.info("Active port updated. Active port = %(new)s, "
"old active port = %(old)s",
{'new': active_port, 'old': old_active_port})
self.db_store.update_active_port(active_port.get_id(),
active_port)
if lport:
self.open_flow_app.notify_update_active_port(active_port,
old_active_port)
else:
LOG.info("The logical port is not ready for the "
"active node: %s", active_port)
def delete_activeport(self, active_port_key):
active_port = self.db_store.get_active_port(active_port_key)
if active_port is not None:
self.db_store.delete_active_port(active_port_key)
LOG.info("Active node was removed. Active node = %s",
active_port)
lean_lport = l2.LogicalPort(id=active_port.get_detected_lport_id(),
topic=active_port.get_topic())
lport = self.db_store2.get_one(lean_lport)
if lport is not None:
self.open_flow_app.notify_remove_active_port(active_port)
def _is_newer(self, obj, cached_obj):
'''Check wether obj is newer than cached_on.

View File

@ -83,14 +83,6 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
def notify_ovs_sync_started(self):
self.dispatcher.dispatch('ovs_sync_started')
def notify_update_active_port(self, active_port, old_active_port):
self.dispatcher.dispatch('update_active_port',
active_port=active_port,
old_active_port=old_active_port)
def notify_remove_active_port(self, active_port):
self.dispatcher.dispatch('remove_active_port', active_port)
@handler.set_ev_handler(ofp_event.EventOFPSwitchFeatures,
handler.CONFIG_DISPATCHER)
def switch_features_handler(self, ev):

View File

@ -18,7 +18,6 @@ import eventlet
from jsonmodels import errors
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils
import dragonflow.common.exceptions as df_exceptions
@ -26,7 +25,6 @@ from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db import model_framework as mf
from dragonflow.db import model_proxy as mproxy
from dragonflow.db import models as db_models
from dragonflow.db.models import core
@ -267,47 +265,6 @@ class NbApi(object):
obj = model_class.from_json(value)
self.controller.update(obj)
def create_active_port(self, id, topic, **columns):
active_port = {'topic': topic}
for col, val in columns.items():
active_port[col] = val
active_port_json = jsonutils.dumps(active_port)
self.driver.create_key(
db_models.AllowedAddressPairsActivePort.table_name, id,
active_port_json, topic)
self._send_db_change_event(
db_models.AllowedAddressPairsActivePort.table_name, id, 'create',
active_port_json, topic)
def update_active_port(self, id, topic, **columns):
active_port_json = self.driver.get_key(
db_models.AllowedAddressPairsActivePort.table_name, id, topic)
active_port = jsonutils.loads(active_port_json)
active_port['topic'] = topic
for col, val in columns.items():
active_port[col] = val
active_port_json = jsonutils.dumps(active_port)
self.driver.set_key(db_models.AllowedAddressPairsActivePort.table_name,
id, active_port_json, topic)
self._send_db_change_event(
db_models.AllowedAddressPairsActivePort.table_name, id, 'set',
active_port_json, topic)
def delete_active_port(self, id, topic):
self.driver.delete_key(
db_models.AllowedAddressPairsActivePort.table_name, id, topic)
self._send_db_change_event(
db_models.AllowedAddressPairsActivePort.table_name, id, 'delete',
id, topic)
def get_active_ports(self, topic=None):
res = []
for active_port_json in self.driver.get_all_entries(
db_models.AllowedAddressPairsActivePort.table_name, topic):
res.append(db_models.AllowedAddressPairsActivePort(
active_port_json))
return res
def create(self, obj, skip_send_event=False):
"""Create the provided object in the database and publish an event
about its creation.

View File

@ -16,8 +16,6 @@
import collections
import threading
from dragonflow.db import models
class TenantDbStore(object):
@ -25,7 +23,6 @@ class TenantDbStore(object):
self.activeports = {}
self.lock = threading.Lock()
self._table_name_mapping = {
models.AllowedAddressPairsActivePort.table_name: self.activeports
}
def _get_table_by_name(self, table_name):
@ -109,32 +106,6 @@ class DbStore(object):
if table_item:
return table_item.get_unique_key()
def get_active_port(self, active_port_key, topic=None):
return self.get(models.AllowedAddressPairsActivePort.table_name,
active_port_key, topic)
def update_active_port(self, active_port_key, active_port, topic=None):
self.set(models.AllowedAddressPairsActivePort.table_name,
active_port_key, active_port, topic)
def delete_active_port(self, active_port_key, topic=None):
self.delete(models.AllowedAddressPairsActivePort.table_name,
active_port_key, topic)
def get_active_ports(self, topic=None):
return self.values(models.AllowedAddressPairsActivePort.table_name,
topic)
def get_active_port_keys(self, topic=None):
return self.keys(models.AllowedAddressPairsActivePort.table_name,
topic)
def get_active_ports_by_network_id(self, network_id, topic=None):
activeports = self.values(
models.AllowedAddressPairsActivePort.table_name, topic)
return [activeport for activeport in activeports
if activeport.get_network_id() == network_id]
def clear(self, topic=None):
if not topic:
for tenant_db in self.tenant_dbs.values():

View File

@ -15,7 +15,6 @@ from dragonflow.db.models import legacy
NbObject = legacy.NbObject
NbDbObject = legacy.NbDbObject
UniqueKeyMixin = legacy.UniqueKeyMixin
AllowedAddressPairsActivePort = legacy.AllowedAddressPairsActivePort
UNIQUE_KEY = legacy.UNIQUE_KEY

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dragonflow.db.field_types as df_fields
import dragonflow.db.model_framework as mf
from dragonflow.db.models import l2
from dragonflow.db.models import mixins
@mf.register_model
@mf.construct_nb_db_model
class AllowedAddressPairsActivePort(mf.ModelBase, mixins.Topic,
mixins.BasicEvents):
table_name = "activeport"
ip = df_fields.IpAddressField()
network = df_fields.ReferenceField(l2.LogicalSwitch)
detected_mac = df_fields.MacAddressField()
detected_lport = df_fields.ReferenceField(l2.LogicalPort)

View File

@ -9,6 +9,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from dragonflow.db.models import active_port # noqa
from dragonflow.db.models import bgp # noqa
from dragonflow.db.models import core # noqa
from dragonflow.db.models import l2 # noqa

View File

@ -84,41 +84,3 @@ class UniqueKeyMixin(object):
def get_unique_key(self):
return self.inner_obj.get(UNIQUE_KEY)
@register_model_class
class AllowedAddressPairsActivePort(NbDbObject):
table_name = "activeport"
def get_id(self):
id = self.inner_obj.get('network_id') + self.inner_obj.get('ip')
return id
def get_ip(self):
return self.inner_obj.get('ip')
def get_network_id(self):
return self.inner_obj.get('network_id')
def get_detected_mac(self):
return self.inner_obj.get('detected_mac')
def get_detected_lport_id(self):
return self.inner_obj.get('detected_lport_id')
def __eq__(self, other):
if isinstance(other, self.__class__):
if (self.get_network_id() == other.get_network_id() and
self.get_ip() == other.get_ip() and
self.get_detected_mac() == other.get_detected_mac() and
self.get_topic() == other.get_topic() and
(self.get_detected_lport_id() ==
other.get_detected_lport_id())):
return True
return False
def __ne__(self, other):
if self == other:
return False
return True

View File

@ -23,6 +23,7 @@ from ryu.ofproto import inet
from dragonflow import conf as cfg
from dragonflow.controller.common import constants
from dragonflow.db.models import active_port
from dragonflow.db.models import l2
from dragonflow.db.models import l3
from dragonflow.tests.common import app_testing_objects
@ -2022,23 +2023,23 @@ class TestAllowedAddressPairsDetectActive(test_base.DFTestBase):
def _is_expected_active_port(self, active_port):
lport = self.port.port.get_logical_port()
if lport.topic != active_port.get_topic():
if lport.topic != active_port.topic:
return False
if lport.id != active_port.get_detected_lport_id():
if lport.id != active_port.detected_lport.id:
return False
if lport.lswitch.id != active_port.get_network_id():
if lport.lswitch.id != active_port.network.id:
return False
if str(self.allowed_address_pair_ip_address) != active_port.get_ip():
if self.allowed_address_pair_ip_address != active_port.ip:
return False
if str(self.allowed_address_pair_mac_address) != \
active_port.get_detected_mac():
if self.allowed_address_pair_mac_address != active_port.detected_mac:
return False
return True
def _if_the_expected_active_port_exists(self):
active_ports = self.nb_api.get_active_ports()
for active_port in active_ports:
if self._is_expected_active_port(active_port):
active_ports = self.nb_api.get_all(
active_port.AllowedAddressPairsActivePort)
for port in active_ports:
if self._is_expected_active_port(port):
return True
return False

View File

@ -16,7 +16,6 @@ from oslo_config import cfg
from dragonflow.common import constants
from dragonflow.controller import df_local_controller
from dragonflow.controller import ryu_base_app
from dragonflow.db import db_store
from dragonflow.db import db_store2
from dragonflow.db import model_proxy
from dragonflow.db.models import core
@ -53,51 +52,6 @@ class DfLocalControllerTestCase(test_app_base.DFAppTestBase):
mock_delete_lport.assert_called_once_with(lport)
mock_db_store2_delete.assert_called_once_with(chassis)
@mock.patch.object(ryu_base_app.RyuDFAdapter,
'notify_update_active_port')
@mock.patch.object(db_store.DbStore, 'update_active_port')
@mock.patch.object(db_store2.DbStore2, 'get_one')
@mock.patch.object(db_store.DbStore, 'get_active_port')
def test_update_activeport(self, mock_get_active, mock_get_one,
mock_update, mock_notify):
active_port = mock.Mock()
active_port.get_id.return_value = 'fake_id'
active_port.get_topic.return_value = 'fake_topic'
active_port.get_detected_lport_id.return_value = 'fake_lport_id'
mock_get_active.return_value = None
mock_update.return_value = None
mock_get_one.return_value = None
self.assertIsNone(self.controller.update_activeport(active_port))
mock_notify.assert_not_called()
lport = mock.Mock()
mock_get_one.return_value = lport
self.assertIsNone(self.controller.update_activeport(active_port))
mock_notify.assert_called_once_with(active_port, None)
@mock.patch.object(ryu_base_app.RyuDFAdapter,
'notify_remove_active_port')
@mock.patch.object(db_store.DbStore, 'delete_active_port')
@mock.patch.object(db_store2.DbStore2, 'get_one')
@mock.patch.object(db_store.DbStore, 'get_active_port')
def test_delete_activeport(self, mock_get_active, mock_get_one,
mock_delete, mock_notify):
active_port = mock.Mock()
active_port.get_topic.return_value = 'fake_topic'
active_port.get_detected_lport_id.return_value = 'fake_lport_id'
mock_get_active.return_value = None
self.assertIsNone(self.controller.delete_activeport('fake_id'))
mock_notify.assert_not_called()
mock_get_active.return_value = active_port
mock_delete.return_value = None
lport = mock.Mock()
mock_get_one.return_value = lport
self.assertIsNone(self.controller.delete_activeport('fake_id'))
mock_notify.assert_called_once_with(active_port)
def test_register_chassis(self):
cfg.CONF.set_override('external_host_ip',
'172.24.4.100',