Add RPC callbacks

This commit is contained in:
Ofer Ben-Yacov 2016-12-29 09:57:55 +02:00
parent 5e8d0cae6c
commit e095cb81c7
17 changed files with 378 additions and 144 deletions

View File

@ -21,7 +21,7 @@ classifier =
[files]
packages =
wan-qos
wan_qos
data_files =
etc/neutron =
etc/wan_qos_agent.ini
@ -33,4 +33,4 @@ console_scripts =
neutron.db.alembic_migrations =
wan-qos = wan_qos.db.migration:alembic_migrations
neutronclient.extension =
wan_qos =wan_qos.wanqos_client._wan_qos
wan_qos = wan_qos.wanqos_client._wanqos

29
setup.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
# In python < 2.7.4, a lazy loading of package `pbr` will break
# setuptools if some other modules registered functions in `atexit`.
# solution from: http://bugs.python.org/issue15881#msg170215
try:
import multiprocessing # noqa
except ImportError:
pass
setuptools.setup(
setup_requires=['pbr>=1.8'],
pbr=True)

View File

@ -1,63 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.ovsdb import api
from neutron.agent.ovsdb import impl_idl
from neutron.agent.ovsdb.native import commands as cmd
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from neutron.agent.ovsdb.native import vlog
def _get_queue_id_list(api, port_name):
port_row = idlutils.row_by_value(api.idl, 'Port', 'name', port_name)
if port_row and port_row.qos:
qos_row = api._tables['QoS'].rows[port_row.qos[0].uuid]
if qos_row:
queues = idlutils.get_column_value(qos_row, 'queues')
return queues.keys()
class GetQueueIdList(cmd.BaseCommand):
def __init__(self, api, port_name):
super(GetQueueIdList, self).__init__(api)
self.port_name = port_name
def run_idl(self, txn):
self.result = _get_queue_id_list(self.api, self.port_name)
class AddQueue(cmd.BaseCommand):
def __init__(self, api, port_name, queue_id, min_rate, max_rate):
super(AddQueue, self).__init__(api)
self.port_name = port_name
self.queue_id = queue_id
self.min_rate = min_rate
self.max_rate = max_rate
def run_idl(self, txn):
port_row = idlutils.row_by_value(self.api.idl, 'Port', 'name',
self.port_name)
qos_row = self.api._tables['QoS'].rows[port_row.qos[0].uuid]
queues = getattr(qos_row, 'queues', [])
if self.queue_id in queues.keys():
raise Exception
queue_row = txn.insert(self.api._tables['Queue'])
queue_row.other_config = {'min-rate': self.min_rate,
'max-rate': self.max_rate}
queues[self.queue_id] = queue_row
qos_row.verify('queues')
qos_row.queues = queues
self.result = 'Done'

View File

@ -1,29 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.ovsdb import api
from neutron.agent.ovsdb import impl_idl
from neutron.agent.ovsdb.native import commands as cmd
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from neutron.agent.ovsdb.native import vlog
class OvsdbQosIdl(impl_idl.OvsdbIdl):
def __init__(self, context, conn, timeout):
super(OvsdbQosIdl, self).__init__(context)
self.ovsdb_connection = connection.Connection(conn,
timeout,
'Open_vSwitch')

View File

@ -45,7 +45,6 @@ def main():
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
binary='neutron-wan-qos-agent',
topic=topics.TC_AGENT,
report_interval=10,
manager='wan_qos.agent.tc_manager.TcAgentManager')

View File

@ -16,7 +16,6 @@
from subprocess import call
from subprocess import check_call
from oslo_config import cfg
from oslo_log import log as logging
import agent_api

View File

@ -15,22 +15,33 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from neutron import context as ctx
from wan_qos.agent import tc_driver
from wan_qos.common import api
from wan_qos.common import topics
LOG = logging.getLogger(__name__)
class TcAgentManager:
def __init__(self, host, conf=None):
target = messaging.Target(version='1.0')
def __init__(self, host=None, conf=None):
self.agent = tc_driver.TcDriver()
if not conf:
self.conf = cfg.CONF
else:
self.conf = conf
if not host:
host = self.conf.host
lan_port = self.conf.WANQOS.lan_port_name
wan_port = self.conf.WANQOS.wan_port_name
self.agent.set_ports(lan_port, wan_port)
self.plugin_rpc = api.TcPluginApi(host, topics.TC_PLUGIN)
def init_host(self):
self.agent.clear_all()
@ -45,9 +56,9 @@ class TcAgentManager:
}
self.agent.set_root_queue(tc_dict)
def after_start(self):
LOG.info("WAN QoS agent started")
def periodic_tasks(self, context, raise_on_error=False):
pass
LOG.info(
self.plugin_rpc.agent_up_notification(ctx.get_admin_context()))

49
wan_qos/common/api.py Normal file
View File

@ -0,0 +1,49 @@
# Copyright 2016 Huawei corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
from neutron.common import rpc as n_rpc
from wan_qos.common import topics
class TcPluginApi(object):
def __init__(self, host, topic=topics.TC_PLUGIN):
self.host = host
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def agent_up_notification(self, context):
cctxt = self.client.prepare()
return cctxt.cast(context, 'agent_up_notification', host=self.host)
def get_configuration_from_db(self, context):
cctxt = self.client.prepare()
return cctxt.call(context, 'get_configuration_from_db', host=self.host)
class TcAgentApi(object):
def __init__(self, host, topic=topics.TC_AGENT):
self.host = host
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def create_wan_qos(self, context, wan_qos_dict):
cctxt = self.client.prepare()
return cctxt.call(context,
'create_wan_qos',
wan_qos_dict)

View File

@ -13,3 +13,5 @@
# License for the specific language governing permissions and limitations
# under the License.
WAN_QOS = 'WAN_QOS'
WANQOS = 'wanqos'

View File

@ -13,5 +13,5 @@
# License for the specific language governing permissions and limitations
# under the License.
TC_AGENT = 'wan_tc_agent'
TC_PLUGIN = 'wan_tc_plugin'
TC_AGENT = 'wan_qos_agent'
TC_PLUGIN = 'wan_qos_plugin'

View File

@ -0,0 +1,105 @@
# Copyright 2016 Huawei corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from neutron_lib.api import extensions
from neutron.api.v2 import resource_helper
from wan_qos.common import constants
RESOURCE_ATTRIBUTE_MAP = {
constants.WAN_QOS: {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'max_rate': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'min_rate': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'network_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'is_visible': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
'is_visible': True}
},
}
class WanQos(extensions.ExtensionDescriptor):
@classmethod
def get_name(cls):
return "WAN QoS"
@classmethod
def get_alias(cls):
return "wan_qos"
@classmethod
def get_description(cls):
return "Limit traffic on WAN links"
@classmethod
def get_updated(cls):
return "2016-12-01T00:00:00-00:00"
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
mem_actions = {}
plural_mappings = resource_helper.build_plural_mappings(
{}, RESOURCE_ATTRIBUTE_MAP)
resources = resource_helper.build_resource_info(plural_mappings,
RESOURCE_ATTRIBUTE_MAP,
constants.WANQOS,
action_map=mem_actions,
register_quota=True,
translate_name=True)
return resources
def get_extended_resources(self, version):
if version == "2.0":
return RESOURCE_ATTRIBUTE_MAP
else:
return {}
class WanQosPluginBase(object):
@abc.abstractmethod
def create_wan_qos(self, context, wan_qos):
pass
@abc.abstractmethod
def get_wan_qos(self, id):
pass
@abc.abstractmethod
def get_wan_qoss(self):
pass
@abc.abstractmethod
def update_wan_qos(self, context, id, wan_qos):
pass
@abc.abstractmethod
def delete_wan_qos(self, context, id):
pass

View File

@ -0,0 +1,98 @@
# Copyright 2016 Huawei corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import rpc as n_rpc
from neutron.db import agents_db
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
import oslo_messaging as messaging
from neutron.services import service_base
from wan_qos.common import api
from wan_qos.common import constants
from wan_qos.common import topics
from wan_qos.extensions import wanqos
LOG = logging.getLogger(__name__)
class PluginRpcCallback(object):
target = messaging.Target(version='1.0')
def __init__(self, plugin):
super(PluginRpcCallback, self).__init__()
self.plugin = plugin
LOG.debug('rpc callback started.')
def agent_up_notification(self, context, host):
LOG.debug('got up notification from %s' % host)
self.plugin.agent_up_notification(host)
class WanQosDriver(service_base.ServicePluginBase):
def get_plugin_description(self):
pass
def get_plugin_type(self):
pass
@property
def service_type(self):
return 'wan_qos'
class WanQosPlugin(wanqos.WanQosPluginBase):
def __init__(self):
rpc_callback = importutils.import_object(
'wan_qos.services.plugin.PluginRpcCallback', self)
endpoints = (
[rpc_callback, agents_db.AgentExtRpcCallback()])
self.agent_rpc = api.TcAgentApi(cfg.CONF.host)
self.conn = n_rpc.create_connection()
self.conn.create_consumer(topics.TC_PLUGIN,
endpoints,
fanout=False)
self.conn.consume_in_threads()
def get_plugin_type(self):
"""Get type of the plugin."""
return constants.WANQOS
def get_plugin_description(self):
"""Get description of the plugin."""
return 'Plugin for rate limiting on WAN links.'
def get_wan_qos(self, id):
pass
def get_wan_qoss(self):
pass
def delete_wan_qos(self, context, id):
pass
def update_wan_qos(self, context, id, wan_qos):
pass
def create_wan_qos(self, context, wan_qos):
pass
# self.agent_rpc.create_wan_qos(context, wan_qos)
def agent_up_notification(self, host):
LOG.debug('agent %s is up' % host)
return 'OK'

View File

@ -0,0 +1,39 @@
# Copyright 2016 Huawei corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import sys
from oslo_config import cfg
from oslo_service import service
from neutron.agent.common import config
from neutron.common import config as common_config
from neutron import service as neutron_service
from wan_qos.common import topics
from wan_qos.services import plugin
def main():
common_config.init(sys.argv[1:])
config.setup_logging()
wanqos_plugin = plugin.WanQosPlugin()
while True:
time.sleep(3)
if __name__ == '__main__':
main()

View File

@ -1,42 +0,0 @@
# Copyright (c) 2016 Huawei, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from neutron.tests import base
import wan_qos.agent.ovsdb.impl_idl as impl_idl
import wan_qos.agent.ovsdb.commands as cmd
class OvsdbIdlTestCase(base.BaseTestCase):
def setUp(self):
super(OvsdbIdlTestCase, self).setUp()
self.vsctl_timeout = 10
self.ovsdb_idl = impl_idl.OvsdbQosIdl(self, 'tcp:127.0.0.1:6640', 30)
self.ovsdb_idl.ovsdb_connection.start()
self.idl = self.ovsdb_idl.ovsdb_connection.idl
def test1(self):
assert self.ovsdb_idl.br_exists('tc-br').execute()==True
def get_queue_list(self):
print (cmd.GetQueueIdList(self.ovsdb_idl, 'enp1s0f1').execute())
def add_queue(self):
print (cmd.AddQueue(self.ovsdb_idl, 'enp1s0f1', 1, '1000000', '1000000')
.execute())

View File

@ -12,14 +12,40 @@
# License for the specific language governing permissions and limitations
# under the License.
from wan_qos.agent import tc_driver
import time
from neutron.tests import base
from oslo_config import cfg
from wan_qos.agent import tc_driver
from wan_qos.agent import tc_manager
from wan_qos.services import plugin
wanqos_group = cfg.OptGroup(name='WANQOS',
title='WAN QoS options')
opts = [
cfg.StrOpt('lan_port_name',
default='enp1s0f0',
help='LAN side port name'),
cfg.StrOpt('lan_max_rate',
default='100mbit',
help='LAN side port rate'),
cfg.StrOpt('wan_port_name',
default='enp1s0f1',
help='WAN side port name'),
cfg.StrOpt('wan_max_rate',
default='100mbit',
help='WAN side port rate')
]
class TestTcDriver(base.BaseTestCase):
def setUp(self):
super(TestTcDriver, self).setUp()
cfg.CONF.register_group(wanqos_group)
cfg.CONF.register_opts(opts, group='WANQOS')
self.tc_agent = tc_driver.TcDriver()
self.tc_agent.set_ports('enp1s0f0', 'enp1s0f1')
@ -112,3 +138,14 @@ class TestTcDriver(base.BaseTestCase):
'child': '10'
}
self.tc_agent.remove_traffic_limiter(tc_dict)
class TestApiMessages(base.BaseTestCase):
def setUp(self):
super(TestApiMessages, self).setUp()
cfg.CONF.register_group(wanqos_group)
cfg.CONF.register_opts(opts, group='WANQOS')
self.plugin = plugin.WanQosPlugin()