Merge remote-tracking branch 'origin/feature/qos' into merge-branch

Change-Id: I7f2342d62634f5b4af3a083cc1aaff46efe28519
This commit is contained in:
Ihar Hrachyshka 2015-07-07 15:37:58 +02:00
commit 2fed2617cd
48 changed files with 1915 additions and 6 deletions

View File

@ -2,3 +2,4 @@
host=review.openstack.org
port=29418
project=openstack/neutron.git
defaultbranch=feature/qos

View File

@ -47,8 +47,10 @@ Neutron Internals
plugin-api
db_layer
rpc_api
rpc_callbacks
layer3
l2_agents
quality_of_service
advanced_services
oslo-incubator
callbacks

View File

@ -0,0 +1,4 @@
Quality of Service
==================
TODO(QoS)

View File

@ -0,0 +1,229 @@
=================================
Neutron Messaging Callback System
=================================
Neutron already has a callback system [link-to: callbacks.rst] for
in-process resource callbacks where publishers and subscribers are able
to publish, subscribe and extend resources.
This system is different, and is intended to be used for inter-process
callbacks, via the messaging fanout mechanisms.
In Neutron, agents may need to subscribe to specific resource details which
may change over time. And the purpose of this messaging callback system
is to allow agent subscription to those resources without the need to extend
modify existing RPC calls, or creating new RPC messages.
A few resource which can benefit of this system:
* security groups members
* security group rules,
* QoS policies.
Using a remote publisher/subscriber pattern, the information about such
resources could be published using fanout queues to all interested nodes,
minimizing messaging requests from agents to server since the agents
get subscribed for their whole lifecycle (unless they unsubscribe).
Within an agent, there could be multiple subscriber callbacks to the same
resource events, the resources updates would be dispatched to the subscriber
callbacks from a single message. Any update would come in a single message,
doing only a single oslo versioned objects deserialization on each receiving
agent.
This publishing/subscription mechanism is highly dependent on the format
of the resources passed around. This is why the library only allows
versioned objects to be published and subscribed. Oslo versioned objects
allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_
For the VO's versioning schema look here: #[vo_versioning]_
versioned_objects serialization/deserialization with the
obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_
methods is used internally to convert/retrieve objects before/after messaging.
Considering rolling upgrades, there are several scenarios to look at:
* publisher (generally neutron-server or a service) and subscriber (agent)
know the same version of the objects, so they serialize, and deserialize
without issues.
* publisher knows (and sends) an older version of the object, subscriber
will get the object updated to latest version on arrival before any
callback is called.
* publisher sends a newer version of the object, subscriber won't be able
to deserialize the object, in this case (PLEASE DISCUSS), we can think of two
strategies:
a) During upgrades, we pin neutron-server to a compatible version for resource
fanout updates, and server sends both the old, and the newer version to
different topic, queues. Old agents receive the updates on the old version
topic, new agents receive updates on the new version topic.
When the whole system upgraded, we un-pin the compatible version fanout.
A variant of this could be using a single fanout queue, and sending the
pinned version of the object to all. Newer agents can deserialize to the
latest version and upgrade any fields internally. Again at the end, we
unpin the version and restart the service.
b) The subscriber will rpc call the publisher to start publishing also a downgraded
version of the object on every update on a separate queue. The complication
of this version, is the need to ignore new version objects as long as we keep
receiving the downgraded ones, and otherwise resend the request to send the
downgraded objects after a certain timeout (thinking of the case where the
request for downgraded queue is done, but the publisher restarted).
This approach is more complicated to implement, but more automated from the
administrator point of view. We may want to look into it as a second step
from a
c) The subscriber will send a registry.get_info for the latest specific version
he knows off. This can have scalability issues during upgrade as any outdated
agent will require a flow of two messages (request, and response). This is
indeed very bad at scale if you have hundreds or thousands of agents.
Option a seems like a reasonable strategy, similar to what nova does now with
versioned objects.
Serialized versioned objects look like::
{'versioned_object.version': '1.0',
'versioned_object.name': 'QoSProfile',
'versioned_object.data': {'rules': [
{'versioned_object.version': '1.0',
'versioned_object.name': 'QoSRule',
'versioned_object.data': {'name': u'a'},
'versioned_object.namespace': 'versionedobjects'}
],
'uuid': u'abcde',
'name': u'aaa'},
'versioned_object.namespace': 'versionedobjects'}
Topic names for the fanout queues
=================================
if we adopted option a:
neutron-<resouce_type>_<resource_id>-<vo_version>
[neutron-<resouce_type>_<resource_id>-<vo_version_compat>]
if we adopted option b for rolling upgrades:
neutron-<resource_type>-<resource_id>
neutron-<resource_type>-<resource_id>-<vo_version>
for option c, just:
neutron-<resource_type>-<resource_id>
Subscribing to resources
========================
Imagine that you have agent A, which just got to handle a new port, which
has an associated security group, and QoS policy.
The agent code processing port updates may look like::
from neutron.rpc_resources import events
from neutron.rpc_resources import resources
from neutron.rpc_resources import registry
def process_resource_updates(resource_type, resource_id, resource_list, action_type):
# send to the right handler which will update any control plane
# details related to the updated resource...
def port_update(...):
# here we extract sg_id and qos_policy_id from port..
registry.subscribe(resources.SG_RULES, sg_id,
callback=process_resource_updates)
sg_rules = registry.get_info(resources.SG_RULES, sg_id)
registry.subscribe(resources.SG_MEMBERS, sg_id,
callback=process_resource_updates)
sg_members = registry.get_info(resources.SG_MEMBERS, sg_id)
registry.subscribe(resources.QOS_RULES, qos_policy_id,
callback=process_resource_updates)
qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id,
callback=process_resource_updates)
cleanup_subscriptions()
def cleanup_subscriptions()
sg_ids = determine_unreferenced_sg_ids()
qos_policy_id = determine_unreferenced_qos_policy_ids()
registry.unsubscribe_info(resource.SG_RULES, sg_ids)
registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids)
registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id)
Another unsubscription strategy could be to lazily unsubscribe resources when
we receive updates for them, and we discover that they are not needed anymore.
Deleted resources are automatically unsubscribed as we receive the delete event.
NOTE(irenab): this could be extended to core resources like ports, making use
of the standard neutron in-process callbacks at server side and propagating
AFTER_UPDATE events, for example, but we may need to wait until those callbacks
are used with proper versioned objects.
Unsubscribing to resources
==========================
There are a few options to unsubscribe registered callbacks:
* unsubscribe_resource_id(): it selectively unsubscribes an specific
resource type + id.
* unsubscribe_resource_type(): it unsubscribes from an specific resource type,
any ID.
* unsubscribe_all(): it unsubscribes all subscribed resources and ids.
Sending resource updates
========================
On the server side, resource updates could come from anywhere, a service plugin,
an extension, anything that updates the resource and that it's of any interest
to the agents.
The server/publisher side may look like::
from neutron.rpc_resources import events
from neutron.rpc_resources import resources
from neutron.rpc_resources import registry as rpc_registry
def add_qos_x_rule(...):
update_the_db(...)
send_rpc_updates_on_qos_policy(qos_policy_id)
def del_qos_x_rule(...):
update_the_db(...)
send_rpc_deletion_of_qos_policy(qos_policy_id)
def send_rpc_updates_on_qos_policy(qos_policy_id):
rules = get_qos_policy_rules_versioned_object(qos_policy_id)
rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE)
def send_rpc_deletion_of_qos_policy(qos_policy_id):
rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE)
# This part is added for the registry mechanism, to be able to request
# older versions of the notified objects if any oudated agent requires
# them.
def retrieve_older_version_callback(qos_policy_id, version):
return get_qos_policy_rules_versioned_object(qos_policy_id, version)
rpc_registry.register_retrieve_callback(resource.QOS_RULES,
retrieve_older_version_callback)
References
==========
.. [#ov_serdes] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L621
.. [#vo_mkcompat] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L460
.. [#vo_mkcptests] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L111
.. [#vo_versioning] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L236

View File

@ -75,7 +75,7 @@
# of its entrypoint name.
#
# service_plugins =
# Example: service_plugins = router,firewall,lbaas,vpnaas,metering
# Example: service_plugins = router,firewall,lbaas,vpnaas,metering,qos
# Paste configuration file
# api_paste_config = api-paste.ini

View File

@ -39,12 +39,14 @@
"get_network:provider:physical_network": "rule:admin_only",
"get_network:provider:segmentation_id": "rule:admin_only",
"get_network:queue_id": "rule:admin_only",
"get_network:qos_policy_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:segments": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
"create_network:provider:physical_network": "rule:admin_only",
"create_network:provider:segmentation_id": "rule:admin_only",
"create_network:qos_policy_id": "rule:admin_only",
"update_network": "rule:admin_or_owner",
"update_network:segments": "rule:admin_only",
"update_network:shared": "rule:admin_only",
@ -52,6 +54,7 @@
"update_network:provider:physical_network": "rule:admin_only",
"update_network:provider:segmentation_id": "rule:admin_only",
"update_network:router:external": "rule:admin_only",
"update_network:qos_policy_id": "rule:admin_only",
"delete_network": "rule:admin_or_owner",
"create_port": "",
@ -62,12 +65,14 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:qos_policy_id": "rule:admin_only",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"get_port:qos_policy_id": "rule:admin_only",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
@ -76,6 +81,7 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:qos_policy_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",

View File

@ -488,6 +488,81 @@ class OVSBridge(BaseOVS):
txn.add(self.ovsdb.db_set('Controller',
controller_uuid, *attr))
def _create_qos_bw_limit_queue(self, port_name, max_bw_in_bits,
max_burst_in_bits):
external_ids = {'id': port_name}
queue_other_config = {'min-rate': max_bw_in_bits,
'max-rate': max_bw_in_bits,
'burst': max_burst_in_bits}
self.ovsdb.db_create(
'Queue', external_ids=external_ids,
other_config=queue_other_config).execute(check_error=True)
def _create_qos_bw_limit_profile(self, port_name, max_bw_in_bits):
external_ids = {'id': port_name}
queue = self.ovsdb.db_find(
'Queue',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(
check_error=True)
queues = {}
queues[0] = queue[0]['_uuid']
qos_other_config = {'max-rate': max_bw_in_bits}
self.ovsdb.db_create('QoS', external_ids=external_ids,
other_config=qos_other_config,
type='linux-htb',
queues=queues).execute(check_error=True)
def create_qos_bw_limit_for_port(self, port_name, max_kbps,
max_burst_kbps):
# TODO(QoS) implement this with transactions,
# or roll back on failure
max_bw_in_bits = str(max_kbps * 1000)
max_burst_in_bits = str(max_burst_kbps * 1000)
self._create_qos_bw_limit_queue(port_name, max_bw_in_bits,
max_burst_in_bits)
self._create_qos_bw_limit_profile(port_name, max_bw_in_bits)
qos = self.ovsdb.db_find('QoS',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
qos_profile = qos[0]['_uuid']
self.set_db_attribute('Port', port_name, 'qos', qos_profile,
check_error=True)
def get_qos_bw_limit_for_port(self, port_name):
res = self.ovsdb.db_find(
'Queue',
('external_ids', '=', {'id': port_name}),
columns=['other_config']).execute(check_error=True)
if res is None or len(res) == 0:
return None, None
other_config = res[0]['other_config']
max_kbps = int(other_config['max-rate']) / 1000
max_burst_kbps = int(other_config['burst']) / 1000
return max_kbps, max_burst_kbps
def del_qos_bw_limit_for_port(self, port_name):
qos = self.ovsdb.db_find('QoS',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
qos_row = qos[0]['_uuid']
queue = self.ovsdb.db_find('Queue',
('external_ids', '=', {'id': port_name}),
columns=['_uuid']).execute(check_error=True)
queue_row = queue[0]['_uuid']
with self.ovsdb.transaction(check_error=True) as txn:
txn.add(self.ovsdb.db_set('Port', port_name, ('qos', [])))
txn.add(self.ovsdb.db_destroy('QoS', qos_row))
txn.add(self.ovsdb.db_destroy('Queue', queue_row))
def __enter__(self):
self.create()
return self

View File

View File

@ -0,0 +1,61 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AgentCoreResourceExtension(object):
"""Define stable abstract interface for Agent extension.
An agent extension extends the agent core functionality.
"""
def initialize(self, resource_rpc):
"""Perform agent core resource extension initialization.
Called after all extensions have been loaded.
No abstract methods defined below will be
called prior to this method being called.
:param resource_rpc - the agent side rpc for getting
resource by type and id
"""
self.resource_rpc = resource_rpc
def handle_network(self, context, data):
"""handle agent extension for network.
:param context - rpc context
:param data - network data
"""
pass
def handle_subnet(self, context, data):
"""handle agent extension for subnet.
:param context - rpc context
:param data - subnet data
"""
pass
def handle_port(self, context, data):
"""handle agent extension for port.
:param context - rpc context
:param data - port data
"""
pass

View File

@ -0,0 +1,70 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import stevedore
from neutron.i18n import _LE, _LI
LOG = log.getLogger(__name__)
# TODO(QoS) add unit tests to Agent extensions mgr
class AgentExtensionsManager(stevedore.named.NamedExtensionManager):
"""Manage agent extensions."""
def __init__(self, agent_extensions):
# Ordered list of agent extensions, defining
# the order in which the agent extensions are called.
LOG.info(_LI("Configured agent extensions names: %s"),
agent_extensions)
super(AgentExtensionsManager, self).__init__(
'neutron.agent.l2.extensions', agent_extensions,
invoke_on_load=True, name_order=True)
LOG.info(_LI("Loaded agent extensions names: %s"), self.names())
def _call_on_agent_extensions(self, method_name, context, data):
"""Helper method for calling a method across all agent extensions."""
for extension in self:
try:
getattr(extension.obj, method_name)(context, data)
# TODO(QoS) add agent extensions exception and catch them here
except AttributeError:
LOG.exception(
_LE("Agent Extension '%(name)s' failed in %(method)s"),
{'name': extension.name, 'method': method_name}
)
def initialize(self, resource_rpc):
# Initialize each agent extension in the list.
for extension in self:
LOG.info(_LI("Initializing agent extension '%s'"), extension.name)
extension.obj.initialize(resource_rpc)
def handle_network(self, context, data):
"""Notify all agent extensions to handle network."""
self._call_on_agent_extensions("handle_network", context, data)
def handle_subnet(self, context, data):
"""Notify all agent extensions to handle subnet."""
self._call_on_agent_extensions("handle_subnet", context, data)
def handle_port(self, context, data):
"""Notify all agent extensions to handle port."""
self._call_on_agent_extensions("handle_port", context, data)
#TODO(Qos) we are missing how to handle delete. we can pass action
#type in all the handle methods or add handle_delete_resource methods

View File

@ -0,0 +1,55 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from neutron.agent.l2 import agent_extensions_manager
#TODO(QoS): add unit tests to L2 Agent
@six.add_metaclass(abc.ABCMeta)
class L2Agent(object):
"""Define stable abstract interface for L2 Agent
This class initialize the agent extension manager and
provides API for calling the extensions manager process
extensions methods.
"""
def __init__(self, polling_interval):
self.polling_interval = polling_interval
self.agent_extensions_mgr = None
self.resource_rpc = None
def initialize(self):
#TODO(QoS): get extensions from server ????
agent_extensions = ('qos', )
self.agent_extensions_mgr = (
agent_extensions_manager.AgentExtensionsManager(
agent_extensions))
self.agent_extensions_mgr.initialize(self.resource_rpc)
def process_network_extensions(self, context, network):
self.agent_extensions_mgr.handle_network(
context, network)
def process_subnet_extensions(self, context, subnet):
self.agent_extensions_mgr.handle_subnet(
context, subnet)
def process_port_extensions(self, context, port):
self.agent_extensions_mgr.handle_port(
context, port)

View File

@ -161,6 +161,29 @@ class API(object):
:returns: :class:`Command` with field value result
"""
@abc.abstractmethod
def db_create(self, table, **col_values):
"""Create a command to create new record
:param table: The OVS table containing the record to be created
:type table: string
:param col_values: The columns and their associated values
to be set after create
:type col_values: Dictionary of columns id's and values
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_destroy(self, table, record):
"""Create a command to destroy a record
:param table: The OVS table containing the record to be destroyed
:type table: string
:param record: The record id (name/uuid) to be destroyed
:type record: uuid/string
:returns: :class:`Command` with no result
"""
@abc.abstractmethod
def db_set(self, table, record, *col_values):
"""Create a command to set fields in a record

View File

@ -169,6 +169,12 @@ class OvsdbIdl(api.API):
def br_set_external_id(self, name, field, value):
return cmd.BrSetExternalIdCommand(self, name, field, value)
def db_create(self, table, **col_values):
return cmd.DbCreateCommand(self, table, **col_values)
def db_destroy(self, table, record):
return cmd.DbDestroyCommand(self, table, record)
def db_set(self, table, record, *col_values):
return cmd.DbSetCommand(self, table, record, *col_values)

View File

@ -184,6 +184,15 @@ class OvsdbVsctl(ovsdb.API):
return BaseCommand(self.context, 'br-get-external-id',
args=[name, field])
def db_create(self, table, **col_values):
args = [table]
args += _set_colval_args(*col_values.items())
return BaseCommand(self.context, 'create', args=args)
def db_destroy(self, table, record):
args = [table, record]
return BaseCommand(self.context, 'destroy', args=args)
def db_set(self, table, record, *col_values):
args = [table, record]
args += _set_colval_args(*col_values)
@ -256,8 +265,11 @@ def _set_colval_args(*col_values):
col, k, op, ovsdb.py_to_val(v)) for k, v in val.items()]
elif (isinstance(val, collections.Sequence)
and not isinstance(val, six.string_types)):
args.append(
"%s%s%s" % (col, op, ",".join(map(ovsdb.py_to_val, val))))
if len(val) == 0:
args.append("%s%s%s" % (col, op, "[]"))
else:
args.append(
"%s%s%s" % (col, op, ",".join(map(ovsdb.py_to_val, val))))
else:
args.append("%s%s%s" % (col, op, ovsdb.py_to_val(val)))
return args

View File

@ -148,6 +148,30 @@ class BrSetExternalIdCommand(BaseCommand):
br.external_ids = external_ids
class DbCreateCommand(BaseCommand):
def __init__(self, api, table, **columns):
super(DbCreateCommand, self).__init__(api)
self.table = table
self.columns = columns
def run_idl(self, txn):
row = txn.insert(self.api._tables[self.table])
for col, val in self.columns.items():
setattr(row, col, val)
self.result = row
class DbDestroyCommand(BaseCommand):
def __init__(self, api, table, record):
super(DbDestroyCommand, self).__init__(api)
self.table = table
self.record = record
def run_idl(self, txn):
record = idlutils.row_by_record(self.api.idl, self.table, self.record)
record.delete()
class DbSetCommand(BaseCommand):
def __init__(self, api, table, record, *col_values):
super(DbSetCommand, self).__init__(api)

View File

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
UPDATED = 'updated'
DELETED = 'deleted'
VALID = (
UPDATED,
DELETED
)

View File

@ -0,0 +1,68 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import resource_manager
# TODO(ajo): consider adding locking
CALLBACK_MANAGER = None
def _get_resources_callback_manager():
global CALLBACK_MANAGER
if CALLBACK_MANAGER is None:
CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
return CALLBACK_MANAGER
#resource implementation callback registration functions
def get_info(resource_type, resource_id, **kwargs):
"""Get information about resource type with resource id.
The function will check the providers for an specific remotable
resource and get the resource.
:returns: an oslo versioned object.
"""
callback = _get_resources_callback_manager().get_callback(resource_type)
if callback:
return callback(resource_type, resource_id, **kwargs)
def register_provider(callback, resource_type):
_get_resources_callback_manager().register(callback, resource_type)
# resource RPC callback for pub/sub
#Agent side
def subscribe(callback, resource_type, resource_id):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def unsubscribe(callback, resource_type, resource_id):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def unsubscribe_all():
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
#Server side
def notify(resource_type, event, obj):
#TODO(QoS): we have to finish the real update notifications
raise NotImplementedError("we should finish update notifications")
def clear():
_get_resources_callback_manager().clear()

View File

@ -0,0 +1,69 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo_log import log as logging
from neutron.api.rpc.callbacks import resources
from neutron.callbacks import exceptions
LOG = logging.getLogger(__name__)
class ResourcesCallbacksManager(object):
"""A callback system that allows information providers in a loose manner.
"""
def __init__(self):
self.clear()
def register(self, callback, resource):
"""register callback for a resource .
One callback can be register to a resource
:param callback: the callback. It must raise or return a dict.
:param resource: the resource. It must be a valid resource.
"""
LOG.debug("register: %(callback)s %(resource)s",
{'callback': callback, 'resource': resource})
if resource not in resources.VALID:
raise exceptions.Invalid(element='resource', value=resource)
self._callbacks[resource] = callback
def unregister(self, resource):
"""Unregister callback from the registry.
:param callback: the callback.
:param resource: the resource.
"""
LOG.debug("Unregister: %(resource)s",
{'resource': resource})
if resource not in resources.VALID:
raise exceptions.Invalid(element='resource', value=resource)
self._callbacks[resource] = None
def clear(self):
"""Brings the manager to a clean slate."""
self._callbacks = collections.defaultdict(dict)
def get_callback(self, resource):
"""Return the callback if found, None otherwise.
:param resource: the resource. It must be a valid resource.
"""
if resource not in resources.VALID:
raise exceptions.Invalid(element='resource', value=resource)
return self._callbacks[resource]

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
QOS_POLICY = 'qos-policy'
QOS_RULE = 'qos-rule'
VALID = (
QOS_POLICY,
QOS_RULE,
)

View File

@ -0,0 +1,71 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from neutron.api.rpc.callbacks import registry
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
LOG = logging.getLogger(__name__)
class ResourcesServerRpcApi(object):
"""Agent-side RPC (stub) for agent-to-plugin interaction.
This class implements the client side of an rpc interface. The server side
can be found below: ResourcesServerRpcCallback. For more information on
changing rpc interfaces, see doc/source/devref/rpc_api.rst.
"""
def __init__(self):
target = oslo_messaging.Target(
topic=topics.PLUGIN, version='1.0',
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)
@log_helpers.log_method_call
def get_info(self, context, resource_type, resource_id):
cctxt = self.client.prepare()
#TODO(Qos): add deserialize version object
return cctxt.call(context, 'get_info',
resource_type=resource_type, resource_id=resource_id)
class ResourcesServerRpcCallback(object):
"""Plugin-side RPC (implementation) for agent-to-plugin interaction.
This class implements the server side of an rpc interface. The client side
can be found above: ResourcesServerRpcApi. For more information on
changing rpc interfaces, see doc/source/devref/rpc_api.rst.
"""
# History
# 1.0 Initial version
target = oslo_messaging.Target(
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
def get_info(self, context, resource_type, resource_id):
kwargs = {'context': context}
#TODO(Qos): add serialize version object
return registry.get_info(
resource_type,
resource_id,
**kwargs)

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
NETWORK = 'network'
PORT = 'port'
ROUTER = 'router'
ROUTER_GATEWAY = 'router_gateway'
@ -19,6 +20,7 @@ SECURITY_GROUP_RULE = 'security_group_rule'
SUBNET = 'subnet'
VALID = (
NETWORK,
PORT,
ROUTER,
ROUTER_GATEWAY,

View File

@ -174,6 +174,8 @@ RPC_NAMESPACE_SECGROUP = None
RPC_NAMESPACE_DVR = None
# RPC interface for reporting state back to the plugin
RPC_NAMESPACE_STATE = None
# RPC interface for agent to plugin resources API
RPC_NAMESPACE_RESOURCES = None
# Default network MTU value when not configured
DEFAULT_NETWORK_MTU = 0

View File

@ -19,6 +19,7 @@ PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'
RESOURCES = 'resources'
CREATE = 'create'
DELETE = 'delete'

View File

@ -19,9 +19,12 @@ import six
from oslo_config import cfg
from oslo_db import exception as os_db_exception
from oslo_db.sqlalchemy import session
from oslo_utils import uuidutils
from sqlalchemy import exc
from sqlalchemy import orm
from neutron.db import common_db_mixin
_FACADE = None
@ -85,3 +88,48 @@ class convert_db_exception_to_retry(object):
except self.to_catch as e:
raise os_db_exception.RetryRequest(e)
return wrapper
# Common database operation implementations
# TODO(QoS): consider handling multiple objects found, or no objects at all
# TODO(QoS): consider changing the name and making it public, officially
def _find_object(context, model, *kwargs):
with context.session.begin(subtransactions=True):
return (common_db_mixin.model_query(context, model)
.filter_by(**kwargs)
.first())
def get_object(context, model, id):
with context.session.begin(subtransactions=True):
return (common_db_mixin.model_query(context, model)
.filter_by(id=id)
.one())
def get_objects(context, model):
with context.session.begin(subtransactions=True):
return common_db_mixin.model_query(context, model).all()
def create_object(context, model, values):
with context.session.begin(subtransactions=True):
if 'id' not in values:
values['id'] = uuidutils.generate_uuid()
db_obj = model(**values)
context.session.add(db_obj)
return db_obj.__dict__
def update_object(context, model, id, values):
with context.session.begin(subtransactions=True):
db_obj = get_object(context, model, id)
db_obj.update(values)
db_obj.save(session=context.session)
return db_obj.__dict__
def delete_object(context, model, id):
with context.session.begin(subtransactions=True):
db_obj = get_object(context, model, id)
db_obj.delete()

View File

@ -0,0 +1,79 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""qos db changes
Revision ID: 48153cb5f051
Revises: 599c6a226151
Create Date: 2015-06-24 17:03:34.965101
"""
# revision identifiers, used by Alembic.
revision = '48153cb5f051'
down_revision = '52c5312f6baf'
from alembic import op
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
def upgrade():
op.create_table(
'qos_policies',
sa.Column('id', sa.String(length=36), primary_key=True),
sa.Column('name', sa.String(length=attrs.NAME_MAX_LEN)),
sa.Column('description', sa.String(length=attrs.DESCRIPTION_MAX_LEN)),
sa.Column('shared', sa.Boolean()),
sa.Column('tenant_id', sa.String(length=attrs.TENANT_ID_MAX_LEN),
index=True))
op.create_table(
'qos_network_policy_bindings',
sa.Column('policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('network_id', sa.String(length=36),
sa.ForeignKey('networks.id', ondelete='CASCADE'),
nullable=False, unique=True))
op.create_table(
'qos_port_policy_bindings',
sa.Column('policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('port_id', sa.String(length=36),
sa.ForeignKey('ports.id', ondelete='CASCADE'),
nullable=False, unique=True))
op.create_table(
'qos_rules',
sa.Column('id', sa.String(length=36), primary_key=True),
sa.Column('qos_policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('type', sa.String(length=255)),
sa.Column('tenant_id', sa.String(length=attrs.TENANT_ID_MAX_LEN),
index=True))
op.create_table(
'qos_bandwidth_limit_rules',
sa.Column('qos_rule_id', sa.String(length=36),
sa.ForeignKey('qos_rules.id', ondelete='CASCADE'),
nullable=False,
primary_key=True),
sa.Column('max_kbps', sa.Integer()),
sa.Column('max_burst_kbps', sa.Integer()))

View File

@ -1 +1 @@
52c5312f6baf
48153cb5f051

View File

@ -39,6 +39,7 @@ from neutron.db import model_base
from neutron.db import models_v2 # noqa
from neutron.db import portbindings_db # noqa
from neutron.db import portsecurity_db # noqa
from neutron.db.qos import models as qos_models # noqa
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_db # noqa
from neutron.db import servicetype_db # noqa

View File

27
neutron/db/qos/api.py Normal file
View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db.qos import models
def create_policy_network_binding(context, policy_id, network_id):
with context.session.begin(subtransactions=True):
db_obj = models.QosNetworkPolicyBinding(policy_id=policy_id,
network_id=network_id)
context.session.add(db_obj)
def create_policy_port_binding(context, policy_id, port_id):
with context.session.begin(subtransactions=True):
db_obj = models.QosPortPolicyBinding(policy_id=policy_id,
port_id=port_id)
context.session.add(db_obj)

81
neutron/db/qos/models.py Executable file
View File

@ -0,0 +1,81 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import sqlalchemy as sa
from neutron.api.v2 import attributes as attrs
from neutron.db import model_base
from neutron.db import models_v2
LOG = logging.getLogger(__name__)
class QosPolicy(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
__tablename__ = 'qos_policies'
name = sa.Column(sa.String(attrs.NAME_MAX_LEN))
description = sa.Column(sa.String(attrs.DESCRIPTION_MAX_LEN))
shared = sa.Column(sa.Boolean)
class QosNetworkPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_network_policy_bindings'
policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)
network_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id',
ondelete='CASCADE'),
nullable=False,
unique=True,
primary_key=True)
class QosPortPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_port_policy_bindings'
policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)
port_id = sa.Column(sa.String(36),
sa.ForeignKey('ports.id',
ondelete='CASCADE'),
nullable=False,
unique=True,
primary_key=True)
class QosRule(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
__tablename__ = 'qos_rules'
type = sa.Column(sa.String(255))
qos_policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False)
class QosBandwidthLimitRule(QosRule):
__tablename__ = 'qos_bandwidth_limit_rules'
max_kbps = sa.Column(sa.Integer)
max_burst_kbps = sa.Column(sa.Integer)
qos_rule_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_rules.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)

235
neutron/extensions/qos.py Normal file
View File

@ -0,0 +1,235 @@
# Copyright (c) 2015 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
import six
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
from neutron import manager
from neutron.plugins.common import constants
from neutron.services import service_base
QOS_PREFIX = "/qos"
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth_limit'
VALID_RULE_TYPES = [RULE_TYPE_BANDWIDTH_LIMIT]
# Attribute Map
QOS_RULE_COMMON_FIELDS = {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True,
'primary_key': True},
'type': {'allow_post': True, 'allow_put': True, 'is_visible': True,
'default': '',
'validate': {'type:values': VALID_RULE_TYPES}},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
}
RESOURCE_ATTRIBUTE_MAP = {
'policies': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True, 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': None}},
'description': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': None}},
'shared': {'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': attr.convert_to_boolean},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True}
}
}
SUB_RESOURCE_ATTRIBUTE_MAP = {
'bandwidth_limit_rules': {
'parent': {'collection_name': 'policies',
'member_name': 'policy'},
'parameters': dict(QOS_RULE_COMMON_FIELDS,
**{'max_kbps': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:non_negative': None}},
'max_burst_kbps': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': 0,
'validate': {'type:non_negative': None}}})
}
}
QOS_POLICY_ID = "qos_policy_id"
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {QOS_POLICY_ID: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'validate': {'type:uuid_or_none': None}}},
'networks': {QOS_POLICY_ID: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'validate': {'type:uuid_or_none': None}}}}
class Qos(extensions.ExtensionDescriptor):
"""Quality of service API extension."""
@classmethod
def get_name(cls):
return "qos"
@classmethod
def get_alias(cls):
return "qos"
@classmethod
def get_namespace(cls):
#TODO(QoS): Remove, there's still a caller using it for log/debug
# which will crash otherwise
return None
@classmethod
def get_description(cls):
return "The Quality of Service extension."
@classmethod
def get_updated(cls):
return "2015-06-08T10:00:00-00:00"
@classmethod
def get_plugin_interface(cls):
return QoSPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
special_mappings = {'policies': 'policy'}
plural_mappings = resource_helper.build_plural_mappings(
special_mappings, itertools.chain(RESOURCE_ATTRIBUTE_MAP,
SUB_RESOURCE_ATTRIBUTE_MAP))
attr.PLURALS.update(plural_mappings)
resources = resource_helper.build_resource_info(
plural_mappings,
RESOURCE_ATTRIBUTE_MAP,
constants.QOS,
translate_name=True,
allow_bulk=True)
plugin = manager.NeutronManager.get_service_plugins()[constants.QOS]
for collection_name in SUB_RESOURCE_ATTRIBUTE_MAP:
resource_name = collection_name[:-1]
parent = SUB_RESOURCE_ATTRIBUTE_MAP[collection_name].get('parent')
params = SUB_RESOURCE_ATTRIBUTE_MAP[collection_name].get(
'parameters')
controller = base.create_resource(collection_name, resource_name,
plugin, params,
allow_bulk=True,
parent=parent,
allow_pagination=True,
allow_sorting=True)
resource = extensions.ResourceExtension(
collection_name,
controller, parent,
path_prefix=QOS_PREFIX,
attr_map=params)
resources.append(resource)
return resources
def update_attributes_map(self, attributes, extension_attrs_map=None):
super(Qos, self).update_attributes_map(
attributes, extension_attrs_map=RESOURCE_ATTRIBUTE_MAP)
def get_extended_resources(self, version):
if version == "2.0":
return dict(EXTENDED_ATTRIBUTES_2_0.items() +
RESOURCE_ATTRIBUTE_MAP.items())
else:
return {}
@six.add_metaclass(abc.ABCMeta)
class QoSPluginBase(service_base.ServicePluginBase):
def get_plugin_description(self):
"""returns string description of the plugin."""
return "QoS Service Plugin for ports and networks"
def get_plugin_type(self):
return constants.QOS
@abc.abstractmethod
def get_policy(self, context, policy_id, fields=None):
pass
@abc.abstractmethod
def get_policies(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
@abc.abstractmethod
def create_policy(self, context, policy):
pass
@abc.abstractmethod
def update_policy(self, context, policy_id, policy):
pass
@abc.abstractmethod
def delete_policy(self, context, policy_id):
pass
@abc.abstractmethod
def get_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, fields=None):
pass
@abc.abstractmethod
def get_policy_bandwidth_limit_rules(self, context, policy_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
pass
@abc.abstractmethod
def create_policy_bandwidth_limit_rule(self, context, policy_id,
bandwidth_limit_rule):
pass
@abc.abstractmethod
def update_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, bandwidth_limit_rule):
pass
@abc.abstractmethod
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
pass

View File

62
neutron/objects/base.py Normal file
View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_versionedobjects import base as obj_base
import six
from neutron.db import api as db_api
# TODO(QoS): revisit dict compatibility and how we can isolate dict behavior
@six.add_metaclass(abc.ABCMeta)
class NeutronObject(obj_base.VersionedObject,
obj_base.VersionedObjectDictCompat):
# should be overridden for all persistent objects
db_model = None
def from_db_object(self, *objs):
for field in self.fields:
for db_obj in objs:
if field in db_obj:
setattr(self, field, db_obj[field])
break
self.obj_reset_changes()
@classmethod
def get_by_id(cls, context, id):
db_obj = db_api.get_object(context, cls.db_model, id)
return cls(context, **db_obj)
@classmethod
def get_objects(cls, context):
db_objs = db_api.get_objects(context, cls.db_model)
objs = [cls(context, **db_obj) for db_obj in db_objs]
return objs
def create(self):
fields = self.obj_get_changes()
db_obj = db_api.create_object(self._context, self.db_model, fields)
self.from_db_object(db_obj)
def update(self):
updates = self.obj_get_changes()
db_obj = db_api.update_object(self._context, self.db_model,
self.id, updates)
self.from_db_object(self, db_obj)
def delete(self):
db_api.delete_object(self._context, self.db_model, self.id)

View File

View File

@ -0,0 +1,77 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.db.qos import api as qos_db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
# TODO(QoS): add rule lists to object fields
# TODO(QoS): implement something for binding networks and ports with policies
@obj_base.VersionedObjectRegistry.register
class QosPolicy(base.NeutronObject):
db_model = qos_db_model.QosPolicy
port_binding_model = qos_db_model.QosPortPolicyBinding
network_binding_model = qos_db_model.QosNetworkPolicyBinding
fields = {
'id': obj_fields.UUIDField(),
'tenant_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'description': obj_fields.StringField(),
'shared': obj_fields.BooleanField()
}
@classmethod
def _get_object_policy(cls, context, model, **kwargs):
# TODO(QoS): we should make sure we use public functions
binding_db_obj = db_api._find_object(context, model, **kwargs)
# TODO(QoS): rethink handling missing binding case
if binding_db_obj:
return cls.get_by_id(context, binding_db_obj['policy_id'])
@classmethod
def get_network_policy(cls, context, network_id):
return cls._get_object_policy(context, cls.network_binding_model,
network_id=network_id)
@classmethod
def get_port_policy(cls, context, port_id):
return cls._get_object_policy(context, cls.port_binding_model,
port_id=port_id)
def attach_network(self, network_id):
qos_db_api.create_policy_network_binding(policy_id=self.id,
network_id=network_id)
def attach_port(self, port_id):
qos_db_api.create_policy_port_binding(policy_id=self.id,
port_id=port_id)
def detach_network(self, network_id):
# TODO(QoS): implement it, in the next life maybe
pass
def detach_port(self, port_id):
# TODO(QoS): implement it, in the next life maybe
pass

110
neutron/objects/qos/rule.py Normal file
View File

@ -0,0 +1,110 @@
# Copyright 2015 Huawei Technologies India Pvt Ltd, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
import six
from neutron.db import api as db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
@six.add_metaclass(abc.ABCMeta)
class QosRule(base.NeutronObject):
base_db_model = qos_db_model.QosRule
fields = {
'id': obj_fields.UUIDField(),
'tenant_id': obj_fields.UUIDField(),
'type': obj_fields.StringField(),
'qos_policy_id': obj_fields.UUIDField()
}
_core_fields = list(fields.keys())
@classmethod
def _is_core_field(cls, field):
return field in cls._core_fields
@staticmethod
def _filter_fields(fields, func):
return {
key: val for key, val in fields.items()
if func(key)
}
# TODO(QoS): reimplement get_by_id to merge both core and addn fields
def _get_changed_core_fields(self):
fields = self.obj_get_changes()
return self._filter_fields(fields, self._is_core_field)
def _get_changed_addn_fields(self):
fields = self.obj_get_changes()
return self._filter_fields(
fields, lambda key: not self._is_core_field(key))
# TODO(QoS): create and update are not transactional safe
def create(self):
# create base qos_rule
core_fields = self._get_changed_core_fields()
base_db_obj = db_api.create_object(
self._context, self.base_db_model, core_fields)
# create type specific qos_..._rule
addn_fields = self._get_changed_addn_fields()
addn_fields['qos_rule_id'] = base_db_obj.id
addn_db_obj = db_api.create_object(
self._context, self.db_model, addn_fields)
# merge two db objects into single neutron one
self.from_db_object(self._context, self, base_db_obj, addn_db_obj)
def update(self):
updated_db_objs = []
# update base qos_rule, if needed
core_fields = self._get_changed_core_fields()
if core_fields:
base_db_obj = db_api.create_object(
self._context, self.base_db_model, core_fields)
updated_db_objs.append(base_db_obj)
addn_fields = self._get_changed_addn_fields()
if addn_fields:
addn_db_obj = db_api.update_object(
self._context, self.base_db_model, self.id, addn_fields)
updated_db_objs.append(addn_db_obj)
# update neutron object with values from both database objects
self.from_db_object(self._context, self, *updated_db_objs)
# delete is the same, additional rule object cleanup is done thru cascading
@obj_base.VersionedObjectRegistry.register
class QosBandwidthLimitRule(QosRule):
db_model = qos_db_model.QosBandwidthLimitRule
fields = {
'max_kbps': obj_fields.IntegerField(),
'max_burst_kbps': obj_fields.IntegerField()
}

View File

@ -22,6 +22,7 @@ FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
L3_ROUTER_NAT = "L3_ROUTER_NAT"
QOS = "QOS"
# Maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
@ -31,7 +32,8 @@ EXT_TO_SERVICE_MAPPING = {
'fwaas': FIREWALL,
'vpnaas': VPN,
'metering': METERING,
'router': L3_ROUTER_NAT
'router': L3_ROUTER_NAT,
'qos': QOS,
}
# Service operation status constants

View File

@ -31,6 +31,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.callbacks import events
@ -153,7 +154,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback(),
metadata_rpc.MetadataRpcCallback()
metadata_rpc.MetadataRpcCallback(),
resources_rpc.ResourcesServerRpcCallback()
]
def _setup_dhcp(self):
@ -621,6 +623,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_network(self, context, network):
result, mech_context = self._create_network_with_retries(context,
network)
self._notify_registry(
resources.NETWORK, events.AFTER_CREATE, context, result)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -633,6 +637,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_network_bulk(self, context, networks):
objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
for obj in objects:
self._notify_registry(resources.NETWORK,
events.AFTER_CREATE,
context,
obj)
return [obj['result'] for obj in objects]
def update_network(self, context, id, network):
@ -655,6 +665,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_network=original_network)
self.mechanism_manager.update_network_precommit(mech_context)
# Notifications must be sent after the above transaction is complete
self._notify_registry(
resources.NETWORK, events.AFTER_UPDATE, context, updated_network)
# TODO(apech) - handle errors raised by update_network, potentially
# by re-calling update_network with the previous attributes. For
# now the error is propogated to the caller, which is expected to
@ -1499,3 +1513,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if port:
return port.id
return device
def _notify_registry(self, resource_type, event_type, context, resource):
kwargs = {
'context': context,
resource_type: resource,
}
registry.notify(resource_type, event_type, self, **kwargs)

View File

View File

@ -0,0 +1,177 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron import manager
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.extensions import qos
from neutron.i18n import _LW
from neutron.plugins.common import constants
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
#TODO(QoS): remove this stub when db is ready
def _get_qos_policy_cb_stub(resource, policy_id, **kwargs):
"""Hardcoded stub for testing until we get the db working."""
qos_policy = {
"tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
"id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"name": "10Mbit",
"description": "This policy limits the ports to 10Mbit max.",
"shared": False,
"rules": [{
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
"max_kbps": "10000",
"max_burst_kbps": "0",
"type": "bandwidth_limit"
}]
}
return qos_policy
def _get_qos_policy_cb(resource, policy_id, **kwargs):
qos_plugin = manager.NeutronManager.get_service_plugins().get(
constants.QOS)
context = kwargs.get('context')
if context is None:
LOG.warning(_LW(
'Received %(resource)s %(policy_id)s without context'),
{'resource': resource, 'policy_id': policy_id}
)
return
qos_policy = qos_plugin.get_qos_policy(context, policy_id)
return qos_policy
#TODO(QoS): remove this stub when db is ready
def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs):
"""Hardcoded for testing until we get the db working."""
bandwidth_limit = {
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
"qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"max_kbps": "10000",
"max_burst_kbps": "0",
}
return bandwidth_limit
def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
qos_plugin = manager.NeutronManager.get_service_plugins().get(
constants.QOS)
context = kwargs.get('context')
if context is None:
LOG.warning(_LW(
'Received %(resource)s %(rule_id,)s without context '),
{'resource': resource, 'rule_id,': rule_id}
)
return
bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule(
context,
rule_id)
return bandwidth_limit
class QoSPlugin(qos.QoSPluginBase):
"""Implementation of the Neutron QoS Service Plugin.
This class implements a Quality of Service plugin that
provides quality of service parameters over ports and
networks.
"""
supported_extension_aliases = ['qos']
def __init__(self):
super(QoSPlugin, self).__init__()
self.register_resource_providers()
#self.register_port_callbacks()
#self.register_net_callbacks()
self._inline_test()
def _inline_test(self):
#TODO(gampel) remove inline unitesting
self.ctx = None
kwargs = {'context': self.ctx}
qos_policy = rpc_registry.get_info(
resources.QOS_POLICY,
"46ebaec0-0570-43ac-82f6-60d2b03168c4",
**kwargs)
LOG.debug("qos_policy test : %s)",
qos_policy)
def register_resource_providers(self):
rpc_registry.register_provider(
_get_qos_bandwidth_limit_rule_cb_stub,
resources.QOS_RULE)
rpc_registry.register_provider(
_get_qos_policy_cb_stub,
resources.QOS_POLICY)
def register_port_callbacks(self):
# TODO(qos): Register the callbacks to properly manage
# extension of resources
pass
def register_net_callbacks(self):
# TODO(qos): Register the callbacks to properly manage
# extension of resources
pass
def create_policy(self, context, policy):
pass
def update_policy(self, context, policy_id, policy):
pass
def delete_policy(self, context, policy_id):
pass
def get_policy(self, context, policy_id, fields=None):
pass
def get_policies(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
def create_policy_bandwidth_limit_rule(self, context, policy_id,
bandwidth_limit_rule):
pass
def update_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, bandwidth_limit_rule):
pass
def get_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, fields=None):
pass
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
pass
def get_policy_bandwidth_limit_rules(self, context, policy_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
pass

View File

@ -39,12 +39,14 @@
"get_network:provider:physical_network": "rule:admin_only",
"get_network:provider:segmentation_id": "rule:admin_only",
"get_network:queue_id": "rule:admin_only",
"get_network:qos_policy_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:segments": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
"create_network:provider:physical_network": "rule:admin_only",
"create_network:provider:segmentation_id": "rule:admin_only",
"create_network:qos_policy_id": "rule:admin_only",
"update_network": "rule:admin_or_owner",
"update_network:segments": "rule:admin_only",
"update_network:shared": "rule:admin_only",
@ -52,6 +54,7 @@
"update_network:provider:physical_network": "rule:admin_only",
"update_network:provider:segmentation_id": "rule:admin_only",
"update_network:router:external": "rule:admin_only",
"update_network:qos_policy_id": "rule:admin_only",
"delete_network": "rule:admin_or_owner",
"create_port": "",
@ -62,12 +65,14 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:qos_policy_id": "rule:admin_only",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"get_port:qos_policy_id": "rule:admin_only",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
@ -76,6 +81,7 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:qos_policy_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",

View File

@ -284,6 +284,17 @@ class OVSBridgeTestCase(OVSBridgeTestBase):
controller,
'connection_mode'))
def test_qos_bw_limit(self):
port_name, _ = self.create_ovs_port()
self.br.create_qos_bw_limit_for_port(port_name, 700, 70)
max_rate, burst = self.br.get_qos_bw_limit_for_port(port_name)
self.assertEqual(700, max_rate)
self.assertEqual(70, burst)
self.br.del_qos_bw_limit_for_port(port_name)
max_rate, burst = self.br.get_qos_bw_limit_for_port(port_name)
self.assertIsNone(max_rate)
self.assertIsNone(burst)
class OVSLibTestCase(base.BaseOVSLinuxTestCase):

View File

@ -0,0 +1,78 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
from neutron.tests import base
class ResourcesCallbackRequestTestCase(base.BaseTestCase):
def setUp(self):
super(ResourcesCallbackRequestTestCase, self).setUp()
self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
def test_resource_callback_request(self):
#TODO(QoS) convert it to the version object format
def _get_qos_policy_cb(resource, policy_id, **kwargs):
qos_policy = {
"tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
"id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"name": "10Mbit",
"description": "This policy limits the ports to 10Mbit max.",
"shared": False,
"rules": [{
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
"max_kbps": "10000",
"max_burst_kbps": "0",
"type": "bnadwidth_limit"
}]
}
return qos_policy
#TODO(QoS) convert it to the version object format
def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
bandwidth_limit = {
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
"qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"max_kbps": "10000",
"max_burst_kbps": "0",
}
return bandwidth_limit
rpc_registry.register_provider(
_get_qos_bandwidth_limit_rule_cb,
resources.QOS_RULE)
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
self.ctx = None
kwargs = {'context': self.ctx}
qos_policy = rpc_registry.get_info(
resources.QOS_POLICY,
self.resource_id,
**kwargs)
self.assertEqual(self.resource_id, qos_policy['id'])
qos_rule = rpc_registry.get_info(
resources.QOS_RULE,
self.qos_rule_id,
**kwargs)
self.assertEqual(self.qos_rule_id, qos_rule['id'])

View File

@ -1581,3 +1581,75 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
# run the transaction balancing function defined in this test
plugin.delete_port(self.context, 'fake_id')
self.assertTrue(self.notify.call_count)
class TestMl2PluginCreateUpdateNetwork(base.BaseTestCase):
def setUp(self):
super(TestMl2PluginCreateUpdateNetwork, self).setUp()
self.context = mock.MagicMock()
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
self.notify = self.notify_p.start()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
enter = transaction.__enter__.call_count
exit = transaction.__exit__.call_count
self.assertEqual(enter, exit)
def _create_plugin_for_create_update_network(self):
plugin = ml2_plugin.Ml2Plugin()
plugin.extension_manager = mock.Mock()
plugin.type_manager = mock.Mock()
plugin.mechanism_manager = mock.Mock()
plugin.notifier = mock.Mock()
mock.patch('neutron.extensions.providernet.'
'_raise_if_updates_provider_attributes').start()
self.notify.side_effect = (
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
return plugin
def test_create_network_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'create_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.create_network(self.context, mock.MagicMock())
kwargs = {'context': self.context, 'network': mock.ANY}
self.notify.assert_called_once_with('network', 'after_create',
plugin, **kwargs)
def test_create_network_bulk_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'create_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.create_network_bulk(self.context,
{'networks':
[mock.MagicMock(), mock.MagicMock()]})
self.assertEqual(2, self.notify.call_count)
def test_update_network_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'update_network'):
init.return_value = None
plugin = self._create_plugin_for_create_update_network()
plugin.update_network(self.context, 'fake_id', mock.MagicMock())
kwargs = {
'context': self.context,
'network': mock.ANY,
}
self.notify.assert_called_once_with('network', 'after_update',
plugin, **kwargs)

View File

@ -35,6 +35,7 @@ oslo.rootwrap>=2.0.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
oslo.service>=0.1.0 # Apache-2.0
oslo.utils>=1.6.0 # Apache-2.0
oslo.versionedobjects>=0.3.0,!=0.5.0
python-novaclient>=2.22.0

View File

@ -143,6 +143,7 @@ neutron.service_plugins =
neutron.services.loadbalancer.plugin.LoadBalancerPlugin = neutron_lbaas.services.loadbalancer.plugin:LoadBalancerPlugin
neutron.services.vpn.plugin.VPNDriverPlugin = neutron_vpnaas.services.vpn.plugin:VPNDriverPlugin
ibm_l3 = neutron.services.l3_router.l3_sdnve:SdnveL3ServicePlugin
qos = neutron.services.qos.qos_plugin:QoSPlugin
neutron.service_providers =
# These are for backwards compat with Juno firewall service provider configuration values
neutron.services.firewall.drivers.linux.iptables_fwaas.IptablesFwaasDriver = neutron_fwaas.services.firewall.drivers.linux.iptables_fwaas:IptablesFwaasDriver
@ -201,6 +202,7 @@ neutron.openstack.common.cache.backends =
neutron.ipam_drivers =
fake = neutron.tests.unit.ipam.fake_driver:FakeDriver
internal = neutron.ipam.drivers.neutrondb_ipam.driver:NeutronDbPool
neutron.agent.l2.extensions =
# These are for backwards compat with Icehouse notification_driver configuration values
oslo.messaging.notify.drivers =
neutron.openstack.common.notifier.log_notifier = oslo_messaging.notify._impl_log:LogDriver