Improve data access method of ryu-agent
fixes bug #1110174 This patch implement rpc in ryu-agent, instead of accessing a database directly. Because it was not necessary to transmit information via database, therefore the table is eliminated. Also, I remove openflow controller stuff from a configuration file of the Ryu plugin because it was not used anymore. Change-Id: I5e261297c3f92c6a1ac5df229084176e84694e87
This commit is contained in:
parent
4ab676fc4f
commit
40e76356dd
@ -16,9 +16,7 @@ sql_connection = sqlite://
|
||||
[OVS]
|
||||
integration_bridge = br-int
|
||||
|
||||
# openflow_controller = <host IP address of ofp controller>:<port: 6633>
|
||||
# openflow_rest_api = <host IP address of ofp rest api service>:<port: 8080>
|
||||
openflow_controller = 127.0.0.1:6633
|
||||
openflow_rest_api = 127.0.0.1:8080
|
||||
|
||||
# tunnel key range: 0 < tunnel_key_min < tunnel_key_max
|
||||
|
@ -0,0 +1,59 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""ryu plugin update
|
||||
|
||||
Revision ID: 49332180ca96
|
||||
Revises: 1149d7de0cfa
|
||||
Create Date: 2013-01-30 07:52:58.472885
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '49332180ca96'
|
||||
down_revision = '1149d7de0cfa'
|
||||
|
||||
# Change to ['*'] if this migration applies to all plugins
|
||||
|
||||
migration_for_plugins = [
|
||||
'quantum.plugins.ryu.ryu_quantum_plugin.RyuQuantumPluginV2'
|
||||
]
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from quantum.db import migration
|
||||
|
||||
|
||||
def upgrade(active_plugin=None, options=None):
|
||||
if not migration.should_run(active_plugin, migration_for_plugins):
|
||||
return
|
||||
|
||||
op.drop_table('ofp_server')
|
||||
|
||||
|
||||
def downgrade(active_plugin=None, options=None):
|
||||
if not migration.should_run(active_plugin, migration_for_plugins):
|
||||
return
|
||||
|
||||
op.create_table(
|
||||
'ofp_server',
|
||||
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column('address', sa.String(length=255)),
|
||||
sa.Column('host_type', sa.String(length=255)),
|
||||
sa.PrimaryKeyConstraint(u'id')
|
||||
)
|
@ -28,11 +28,14 @@ import netifaces
|
||||
from ryu.app import client
|
||||
from ryu.app import conf_switch_key
|
||||
from ryu.app import rest_nw_id
|
||||
from sqlalchemy.ext.sqlsoup import SqlSoup
|
||||
|
||||
from quantum.agent.linux import ovs_lib
|
||||
from quantum.agent.linux.ovs_lib import VifPort
|
||||
from quantum.agent import rpc as agent_rpc
|
||||
from quantum.common import config as logging_config
|
||||
from quantum.common import exceptions as q_exc
|
||||
from quantum.common import topics
|
||||
from quantum import q_context
|
||||
from quantum.openstack.common import cfg
|
||||
from quantum.openstack.common.cfg import NoSuchGroupError
|
||||
from quantum.openstack.common.cfg import NoSuchOptError
|
||||
@ -40,7 +43,6 @@ from quantum.openstack.common import log
|
||||
from quantum.plugins.ryu.common import config
|
||||
|
||||
|
||||
cfg.CONF.import_opt('sql_connection', 'quantum.db.api', 'DATABASE')
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@ -148,29 +150,42 @@ class VifPortSet(object):
|
||||
port.switch.datapath_id, port.ofport)
|
||||
|
||||
|
||||
class RyuPluginApi(agent_rpc.PluginApi):
|
||||
def get_ofp_rest_api_addr(self, context):
|
||||
LOG.debug(_("Get Ryu rest API address"))
|
||||
return self.call(context,
|
||||
self.make_msg('get_ofp_rest_api'),
|
||||
topic=self.topic)
|
||||
|
||||
|
||||
class OVSQuantumOFPRyuAgent(object):
|
||||
def __init__(self, integ_br, ofp_rest_api_addr,
|
||||
tunnel_ip, ovsdb_ip, ovsdb_port,
|
||||
def __init__(self, integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
|
||||
root_helper):
|
||||
super(OVSQuantumOFPRyuAgent, self).__init__()
|
||||
self.int_br = None
|
||||
self.vif_ports = None
|
||||
self._setup_integration_br(root_helper, integ_br,
|
||||
ofp_rest_api_addr,
|
||||
tunnel_ip, ovsdb_port, ovsdb_ip)
|
||||
self._setup_rpc()
|
||||
self._setup_integration_br(root_helper, integ_br, tunnel_ip,
|
||||
ovsdb_port, ovsdb_ip)
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
|
||||
self.context = q_context.get_admin_context_without_session()
|
||||
|
||||
def _setup_integration_br(self, root_helper, integ_br,
|
||||
ofp_rest_api_addr,
|
||||
tunnel_ip, ovsdb_port, ovsdb_ip):
|
||||
self.int_br = OVSBridge(integ_br, root_helper)
|
||||
self.int_br.find_datapath_id()
|
||||
|
||||
ryu_rest_client = client.OFPClient(ofp_rest_api_addr)
|
||||
rest_api_addr = self.plugin_rpc.get_ofp_rest_api_addr(self.context)
|
||||
if not rest_api_addr:
|
||||
raise q_exc.Invalid(_("Ryu rest API port isn't specified"))
|
||||
LOG.debug(_("Going to ofp controller mode %s"), rest_api_addr)
|
||||
|
||||
ryu_rest_client = client.OFPClient(rest_api_addr)
|
||||
|
||||
self.vif_ports = VifPortSet(self.int_br, ryu_rest_client)
|
||||
self.vif_ports.setup()
|
||||
|
||||
sc_client = client.SwitchConfClient(ofp_rest_api_addr)
|
||||
sc_client = client.SwitchConfClient(rest_api_addr)
|
||||
sc_client.set_key(self.int_br.datapath_id,
|
||||
conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)
|
||||
|
||||
@ -180,31 +195,6 @@ class OVSQuantumOFPRyuAgent(object):
|
||||
'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
|
||||
|
||||
|
||||
def check_ofp_rest_api_addr(db):
|
||||
LOG.debug(_("Checking db"))
|
||||
|
||||
servers = db.ofp_server.all()
|
||||
|
||||
ofp_controller_addr = None
|
||||
ofp_rest_api_addr = None
|
||||
for serv in servers:
|
||||
if serv.host_type == "REST_API":
|
||||
ofp_rest_api_addr = serv.address
|
||||
elif serv.host_type == "controller":
|
||||
ofp_controller_addr = serv.address
|
||||
else:
|
||||
LOG.warn(_("Ignoring unknown server type %s"), serv)
|
||||
|
||||
LOG.debug(_("API %s"), ofp_rest_api_addr)
|
||||
if ofp_controller_addr:
|
||||
LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
|
||||
if not ofp_rest_api_addr:
|
||||
raise RuntimeError(_("Ryu rest API port isn't specified"))
|
||||
|
||||
LOG.debug(_("Going to ofp controller mode %s"), ofp_rest_api_addr)
|
||||
return ofp_rest_api_addr
|
||||
|
||||
|
||||
def main():
|
||||
cfg.CONF(project='quantum')
|
||||
|
||||
@ -212,13 +202,6 @@ def main():
|
||||
|
||||
integ_br = cfg.CONF.OVS.integration_bridge
|
||||
root_helper = cfg.CONF.AGENT.root_helper
|
||||
options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
|
||||
db = SqlSoup(options["sql_connection"])
|
||||
|
||||
LOG.info(_("Connecting to database \"%(database)s\" on %(host)s"),
|
||||
{"database": db.engine.url.database,
|
||||
"host": db.engine.url.host})
|
||||
ofp_rest_api_addr = check_ofp_rest_api_addr(db)
|
||||
|
||||
tunnel_ip = _get_tunnel_ip()
|
||||
LOG.debug(_('tunnel_ip %s'), tunnel_ip)
|
||||
@ -227,8 +210,8 @@ def main():
|
||||
ovsdb_ip = _get_ovsdb_ip()
|
||||
LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
|
||||
try:
|
||||
OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr,
|
||||
tunnel_ip, ovsdb_ip, ovsdb_port, root_helper)
|
||||
OVSQuantumOFPRyuAgent(integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
|
||||
root_helper)
|
||||
except httplib.HTTPException, e:
|
||||
LOG.error(_("Initialization failed: %s"), e)
|
||||
sys.exit(1)
|
||||
|
@ -20,8 +20,6 @@ from quantum.openstack.common import cfg
|
||||
ovs_opts = [
|
||||
cfg.StrOpt('integration_bridge', default='br-int',
|
||||
help=_("Integration bridge to use")),
|
||||
cfg.StrOpt('openflow_controller', default='127.0.0.1:6633',
|
||||
help=_("OpenFlow controller to connect to")),
|
||||
cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080',
|
||||
help=_("OpenFlow REST API location")),
|
||||
cfg.IntOpt('tunnel_key_min', default=1,
|
||||
|
@ -28,16 +28,6 @@ from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def set_ofp_servers(hosts):
|
||||
session = db.get_session()
|
||||
session.query(ryu_models_v2.OFPServer).delete()
|
||||
for (host_address, host_type) in hosts:
|
||||
host = ryu_models_v2.OFPServer(address=host_address,
|
||||
host_type=host_type)
|
||||
session.add(host)
|
||||
session.flush()
|
||||
|
||||
|
||||
def network_all_tenant_list():
|
||||
session = db.get_session()
|
||||
return session.query(models_v2.Network).all()
|
||||
|
@ -19,20 +19,6 @@ import sqlalchemy as sa
|
||||
from quantum.db import model_base
|
||||
|
||||
|
||||
class OFPServer(model_base.BASEV2):
|
||||
"""Openflow Server/API address."""
|
||||
__tablename__ = 'ofp_server'
|
||||
|
||||
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
|
||||
address = sa.Column(sa.String(64)) # netloc <host ip address>:<port>
|
||||
host_type = sa.Column(sa.String(255)) # server type
|
||||
# Controller, REST_API
|
||||
|
||||
def __repr__(self):
|
||||
return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
|
||||
self.host_type)
|
||||
|
||||
|
||||
class TunnelKeyLast(model_base.BASEV2):
|
||||
"""Lastly allocated Tunnel key. The next key allocation will be started
|
||||
from this value + 1
|
||||
|
@ -1,19 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
# Copyright 2012 Isaku Yamahata <yamahata at valinux co jp>
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# @author: Isaku Yamahata
|
||||
|
||||
CONTROLLER = 'controller'
|
||||
REST_API = 'REST_API'
|
@ -34,7 +34,6 @@ from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import rpc
|
||||
from quantum.plugins.ryu.common import config
|
||||
from quantum.plugins.ryu.db import api_v2 as db_api_v2
|
||||
from quantum.plugins.ryu import ofp_service_type
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -45,9 +44,16 @@ class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, ofp_rest_api_addr):
|
||||
self.ofp_rest_api_addr = ofp_rest_api_addr
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return q_rpc.PluginRpcDispatcher([self])
|
||||
|
||||
def get_ofp_rest_api(self, context, **kwargs):
|
||||
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
|
||||
return self.ofp_rest_api_addr
|
||||
|
||||
|
||||
class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
l3_db.L3_NAT_db_mixin):
|
||||
@ -59,19 +65,13 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
|
||||
self.tunnel_key = db_api_v2.TunnelKey(
|
||||
cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
|
||||
ofp_con_host = cfg.CONF.OVS.openflow_controller
|
||||
ofp_api_host = cfg.CONF.OVS.openflow_rest_api
|
||||
|
||||
if ofp_con_host is None or ofp_api_host is None:
|
||||
self.ofp_api_host = cfg.CONF.OVS.openflow_rest_api
|
||||
if not self.ofp_api_host:
|
||||
raise q_exc.Invalid(_('Invalid configuration. check ryu.ini'))
|
||||
|
||||
hosts = [(ofp_con_host, ofp_service_type.CONTROLLER),
|
||||
(ofp_api_host, ofp_service_type.REST_API)]
|
||||
db_api_v2.set_ofp_servers(hosts)
|
||||
|
||||
self.client = client.OFPClient(ofp_api_host)
|
||||
self.tun_client = client.TunnelClient(ofp_api_host)
|
||||
self.iface_client = client.QuantumIfaceClient(ofp_api_host)
|
||||
self.client = client.OFPClient(self.ofp_api_host)
|
||||
self.tun_client = client.TunnelClient(self.ofp_api_host)
|
||||
self.iface_client = client.QuantumIfaceClient(self.ofp_api_host)
|
||||
for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
|
||||
if nw_id != rest_nw_id.NW_ID_UNKNOWN:
|
||||
self.client.update_network(nw_id)
|
||||
@ -82,7 +82,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
self.callbacks = RyuRpcCallbacks()
|
||||
self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
|
@ -31,5 +31,4 @@ class ConfigurationTest(unittest2.TestCase):
|
||||
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
|
||||
self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
|
||||
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
|
||||
self.assertEqual('127.0.0.1:6633', cfg.CONF.OVS.openflow_controller)
|
||||
self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api)
|
||||
|
@ -24,27 +24,10 @@ from quantum.openstack.common import cfg
|
||||
from quantum.plugins.ryu.common import config
|
||||
from quantum.plugins.ryu.db import api_v2 as db_api_v2
|
||||
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
|
||||
from quantum.plugins.ryu import ofp_service_type
|
||||
from quantum.tests.unit import test_db_plugin as test_plugin
|
||||
|
||||
|
||||
class RyuDBTest(test_plugin.QuantumDbPluginV2TestCase):
|
||||
def setUp(self):
|
||||
super(RyuDBTest, self).setUp()
|
||||
self.hosts = [(cfg.CONF.OVS.openflow_controller,
|
||||
ofp_service_type.CONTROLLER),
|
||||
(cfg.CONF.OVS.openflow_rest_api,
|
||||
ofp_service_type.REST_API)]
|
||||
db_api_v2.set_ofp_servers(self.hosts)
|
||||
|
||||
def test_ofp_server(self):
|
||||
session = db.get_session()
|
||||
servers = session.query(ryu_models_v2.OFPServer).all()
|
||||
print servers
|
||||
self.assertEqual(len(servers), 2)
|
||||
for s in servers:
|
||||
self.assertTrue((s.address, s.host_type) in self.hosts)
|
||||
|
||||
@staticmethod
|
||||
def _tunnel_key_sort(key_list):
|
||||
key_list.sort(key=operator.attrgetter('tunnel_key'))
|
||||
|
Loading…
Reference in New Issue
Block a user