Add OVSDB manager protocol application
Allows listening on a socket for OVSDB clients, reacting to their events and modifying their database. Co-Authored-By: Chris Hansen <chris.hansen.career@gmail.com> Co-Authored-By: Ravi Kamachi <ravi.kamachi@rackspace.com> Signed-off-by: Jason Kölker <jason@koelker.net> Signed-off-by: Chris Hansen <chris.hansen.career@gmail.com> Signed-off-by: Ravi Kamachi <ravi.kamachi@rackspace.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
0de43f7b60
commit
aa198d6900
@ -12,3 +12,4 @@ Ryu provides some useful library for your network applications.
|
||||
library_of_config.rst
|
||||
library_bgp_speaker.rst
|
||||
library_bgp_speaker_ref.rst
|
||||
library_ovsdb_manager.rst
|
||||
|
61
doc/source/library_ovsdb_manager.rst
Normal file
61
doc/source/library_ovsdb_manager.rst
Normal file
@ -0,0 +1,61 @@
|
||||
*********************
|
||||
OVSDB Manager library
|
||||
*********************
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
Ryu OVSDB Manager library allows your code to interact with devices
|
||||
speaking the OVSDB protocol. This enables your code to perform remote
|
||||
management of the devices and react to topology changes on them.
|
||||
|
||||
Example
|
||||
=======
|
||||
|
||||
The following logs all new OVSDB connections and allows creating a port
|
||||
on a bridge.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import uuid
|
||||
|
||||
from ryu.base import app_manager
|
||||
from ryu.services.protocols.ovsdb import api as ovsdb
|
||||
from ryu.services.protocols.ovsdb import event as ovsdb_event
|
||||
|
||||
|
||||
class MyApp(app_manager.RyuApp):
|
||||
@set_ev_cls(ovsdb_event.EventNewOVSDBConnection)
|
||||
def handle_new_ovsdb_connection(self, ev):
|
||||
system_id = ev.system_id
|
||||
self.logger.info('New OVSDB connection from system id %s',
|
||||
systemd_id)
|
||||
|
||||
def create_port(self, systemd_id, bridge_name, name):
|
||||
new_iface_uuid = uuid.uuid4()
|
||||
new_port_uuid = uuid.uuid4()
|
||||
|
||||
def _create_port(tables, insert):
|
||||
bridge = ovsdb.row_by_name(self, system_id, bridge_name)
|
||||
|
||||
iface = insert(tables['Interface'], new_iface_uuid)
|
||||
iface.name = name
|
||||
iface.type = 'internal'
|
||||
|
||||
port = insert(tables['Port'], new_port_uuid)
|
||||
port.name = name
|
||||
port.interfaces = [iface]
|
||||
|
||||
brdige.ports = bridfe.ports + [port]
|
||||
|
||||
return (new_port_uuid, new_iface_uuid)
|
||||
|
||||
req = ovsdb_event.EventModifyRequest(system_id, _create_port)
|
||||
rep = self.send_request(req)
|
||||
|
||||
if rep.status != 'success':
|
||||
self.logger.error('Error creating port %s on bridge %s: %s',
|
||||
name, bridge, rep.status)
|
||||
return None
|
||||
|
||||
return reply.insert_uuid[new_port_uuid]
|
14
ryu/services/protocols/ovsdb/__init__.py
Normal file
14
ryu/services/protocols/ovsdb/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
137
ryu/services/protocols/ovsdb/api.py
Normal file
137
ryu/services/protocols/ovsdb/api.py
Normal file
@ -0,0 +1,137 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from ryu.lib import dpid as dpidlib
|
||||
from ryu.services.protocols.ovsdb import event as ovsdb_event
|
||||
|
||||
|
||||
def match_row(manager, system_id, table, fn):
|
||||
def _match_row(tables):
|
||||
return next((r for r in tables[table].rows.values()
|
||||
if fn(r)), None)
|
||||
|
||||
request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
|
||||
_match_row)
|
||||
reply_to_get_tables = manager.send_request(request_to_get_tables)
|
||||
return reply_to_get_tables.result
|
||||
|
||||
|
||||
def match_rows(manager, system_id, table, fn):
|
||||
def _match_rows(tables):
|
||||
return (r for r in tables[table].rows.values() if fn(r))
|
||||
|
||||
request = ovsdb_event.EventReadRequest(system_id, _match_rows)
|
||||
reply = manager.send_request(request)
|
||||
return reply.result
|
||||
|
||||
|
||||
def row_by_name(manager, system_id, name, table='Bridge', fn=None):
|
||||
matched_row = match_row(manager, system_id, table,
|
||||
lambda row: row.name == name)
|
||||
|
||||
if fn is not None:
|
||||
return fn(matched_row)
|
||||
|
||||
return matched_row
|
||||
|
||||
|
||||
def get_column_value(manager, table, record, column):
|
||||
"""
|
||||
Example : To get datapath_id from Bridge table
|
||||
get_column_value('Bridge', <bridge name>, 'datapath_id').strip('"')
|
||||
"""
|
||||
row = row_by_name(manager, record, table)
|
||||
value = getattr(row, column, "")
|
||||
|
||||
if isinstance(value, list) and len(value) == 1:
|
||||
value = value[0]
|
||||
|
||||
return str(value)
|
||||
|
||||
|
||||
def get_iface_by_name(manager, system_id, name, fn=None):
|
||||
iface = row_by_name(manager, system_id, name, 'Interface')
|
||||
|
||||
if fn is not None:
|
||||
return fn(iface)
|
||||
|
||||
return iface
|
||||
|
||||
|
||||
def get_bridge_for_iface_name(manager, system_id, iface_name, fn=None):
|
||||
iface = row_by_name(manager, system_id, iface_name, 'Interface')
|
||||
port = match_row(manager, system_id, 'Port',
|
||||
lambda x: iface in x.interfaces)
|
||||
bridge = match_row(manager, system_id, 'Bridge',
|
||||
lambda x: port in x.ports)
|
||||
|
||||
if fn is not None:
|
||||
return fn(bridge)
|
||||
|
||||
return bridge
|
||||
|
||||
|
||||
def get_table(manager, system_id, name):
|
||||
def _get_table(tables):
|
||||
return tables[name]
|
||||
|
||||
request_to_get_tables = ovsdb_event.EventReadRequest(system_id,
|
||||
_get_table)
|
||||
reply_to_get_tables = manager.send_request(request_to_get_tables)
|
||||
return reply_to_get_tables.result
|
||||
|
||||
|
||||
def get_bridge_by_datapath_id(manager, system_id, datapath_id, fn=None):
|
||||
def _match_fn(row):
|
||||
row_dpid = dpidlib.str_to_dpid(str(row.datapath_id[0]))
|
||||
return row_dpid == datapath_id
|
||||
|
||||
bridge = match_row(manager, system_id, 'Bridge', _match_fn)
|
||||
|
||||
if fn is not None:
|
||||
return fn(bridge)
|
||||
|
||||
return bridge
|
||||
|
||||
|
||||
def get_datapath_ids_for_systemd_id(manager, system_id):
|
||||
def _get_dp_ids(tables):
|
||||
dp_ids = []
|
||||
|
||||
bridges = tables.get('Bridge')
|
||||
|
||||
if not bridges:
|
||||
return dp_ids
|
||||
|
||||
for bridge in bridges.rows.values():
|
||||
datapath_ids = bridge.datapath_id
|
||||
dp_ids.extend(dpidlib.str_to_dpid(dp_id) for dp_id in datapath_ids)
|
||||
|
||||
return dp_ids
|
||||
|
||||
request = ovsdb_event.EventReadRequest(system_id, _get_dp_ids)
|
||||
reply = manager.send_request(request)
|
||||
return reply.result
|
||||
|
||||
|
||||
def get_bridges_by_system_id(manager, system_id):
|
||||
return get_table(manager, system_id, 'Bridge').rows.values()
|
||||
|
||||
|
||||
def bridge_exists(manager, system_id, bridge_name):
|
||||
return bool(row_by_name(manager, system_id, bridge_name))
|
||||
|
||||
|
||||
def port_exists(manager, system_id, port_name):
|
||||
return bool(row_by_name(manager, system_id, port_name, 'Port'))
|
336
ryu/services/protocols/ovsdb/client.py
Normal file
336
ryu/services/protocols/ovsdb/client.py
Normal file
@ -0,0 +1,336 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
# NOTE(jkoelker) Patch Vlog so that is uses standard logging
|
||||
from ovs import vlog
|
||||
|
||||
|
||||
class Vlog(vlog.Vlog):
|
||||
def __init__(self, name):
|
||||
self.log = logging.getLogger('ovs.%s' % name)
|
||||
|
||||
def __log(self, level, message, **kwargs):
|
||||
level = vlog.LEVELS.get(level, logging.DEBUG)
|
||||
self.log.log(level, message, **kwargs)
|
||||
|
||||
vlog.Vlog = Vlog
|
||||
|
||||
|
||||
from ovs import jsonrpc
|
||||
from ovs import reconnect
|
||||
from ovs import stream
|
||||
from ovs import timeval
|
||||
from ovs.db import idl
|
||||
|
||||
from ryu.base import app_manager
|
||||
from ryu.lib import hub
|
||||
from ryu.services.protocols.ovsdb import event
|
||||
from ryu.services.protocols.ovsdb import model
|
||||
|
||||
|
||||
now = timeval.msec
|
||||
|
||||
|
||||
def _uuid_to_row(atom, base):
|
||||
if base.ref_table:
|
||||
value = base.ref_table.rows.get(atom)
|
||||
else:
|
||||
value = atom
|
||||
|
||||
if isinstance(value, idl.Row):
|
||||
value = str(value.uuid)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def dictify(row):
|
||||
if row is None:
|
||||
return
|
||||
|
||||
return dict([(k, v.to_python(_uuid_to_row))
|
||||
for k, v in row._data.items()])
|
||||
|
||||
|
||||
def discover_schemas(connection):
|
||||
# NOTE(jkoelker) currently only the Open_vSwitch schema
|
||||
# is supported.
|
||||
# TODO(jkoelker) support arbitrary schemas
|
||||
req = jsonrpc.Message.create_request('list_dbs', [])
|
||||
error, reply = connection.transact_block(req)
|
||||
|
||||
if error or reply.error:
|
||||
return
|
||||
|
||||
schemas = []
|
||||
for db in reply.result:
|
||||
if db != 'Open_vSwitch':
|
||||
continue
|
||||
|
||||
req = jsonrpc.Message.create_request('get_schema', [db])
|
||||
error, reply = connection.transact_block(req)
|
||||
|
||||
if error or reply.error:
|
||||
# TODO(jkoelker) Error handling
|
||||
continue
|
||||
|
||||
schemas.append(reply.result)
|
||||
|
||||
return schemas
|
||||
|
||||
|
||||
def discover_system_id(idl):
|
||||
system_id = None
|
||||
|
||||
while system_id is None:
|
||||
idl.run()
|
||||
openvswitch = idl.tables['Open_vSwitch'].rows
|
||||
|
||||
if openvswitch:
|
||||
row = openvswitch.get(list(openvswitch.keys())[0])
|
||||
system_id = row.external_ids.get('system-id')
|
||||
|
||||
return system_id
|
||||
|
||||
|
||||
# NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and
|
||||
# trigger callbacks on changes
|
||||
class Idl(idl.Idl):
|
||||
def __init__(self, session, schema):
|
||||
if not isinstance(schema, idl.SchemaHelper):
|
||||
schema = idl.SchemaHelper(schema_json=schema)
|
||||
schema.register_all()
|
||||
|
||||
schema = schema.get_idl_schema()
|
||||
|
||||
# NOTE(jkoelker) event buffer
|
||||
self._events = []
|
||||
|
||||
self.tables = schema.tables
|
||||
self._db = schema
|
||||
self._session = session
|
||||
self._monitor_request_id = None
|
||||
self._last_seqno = None
|
||||
self.change_seqno = 0
|
||||
|
||||
# Database locking.
|
||||
self.lock_name = None # Name of lock we need, None if none.
|
||||
self.has_lock = False # Has db server said we have the lock?
|
||||
self.is_lock_contended = False # Has db server said we can't get lock?
|
||||
self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
|
||||
|
||||
# Transaction support.
|
||||
self.txn = None
|
||||
self._outstanding_txns = {}
|
||||
|
||||
for table in schema.tables.values():
|
||||
for column in table.columns.values():
|
||||
if not hasattr(column, 'alert'):
|
||||
column.alert = True
|
||||
table.need_table = False
|
||||
table.rows = {}
|
||||
table.idl = self
|
||||
|
||||
@property
|
||||
def events(self):
|
||||
events = self._events
|
||||
self._events = []
|
||||
return events
|
||||
|
||||
def __process_update(self, table, uuid, old, new):
|
||||
old_row = table.rows.get(uuid)
|
||||
if old_row is not None:
|
||||
old_row = model.Row(dictify(old_row))
|
||||
old_row['_uuid'] = uuid
|
||||
|
||||
changed = idl.Idl.__process_update(self, table, uuid, old, new)
|
||||
|
||||
if changed:
|
||||
if not new:
|
||||
ev = (event.EventRowDelete, (table.name, old_row))
|
||||
|
||||
elif not old:
|
||||
new_row = model.Row(dictify(table.rows.get(uuid)))
|
||||
new_row['_uuid'] = uuid
|
||||
ev = (event.EventRowInsert, (table.name, new_row))
|
||||
|
||||
else:
|
||||
new_row = model.Row(dictify(table.rows.get(uuid)))
|
||||
new_row['_uuid'] = uuid
|
||||
|
||||
ev = (event.EventRowUpdate, (table.name, old_row, new_row))
|
||||
|
||||
self._events.append(ev)
|
||||
|
||||
return changed
|
||||
|
||||
|
||||
class RemoteOvsdb(app_manager.RyuApp):
|
||||
_EVENTS = [event.EventRowUpdate,
|
||||
event.EventRowDelete,
|
||||
event.EventRowInsert,
|
||||
event.EventInterfaceDeleted,
|
||||
event.EventInterfaceInserted,
|
||||
event.EventInterfaceUpdated,
|
||||
event.EventPortDeleted,
|
||||
event.EventPortInserted,
|
||||
event.EventPortUpdated]
|
||||
|
||||
@classmethod
|
||||
def factory(cls, sock, address, *args, **kwargs):
|
||||
ovs_stream = stream.Stream(sock, None, None)
|
||||
connection = jsonrpc.Connection(ovs_stream)
|
||||
schemas = discover_schemas(connection)
|
||||
|
||||
if not schemas:
|
||||
return
|
||||
|
||||
fsm = reconnect.Reconnect(now())
|
||||
fsm.set_name('%s:%s' % address)
|
||||
fsm.enable(now())
|
||||
fsm.set_passive(True, now())
|
||||
fsm.set_max_tries(-1)
|
||||
fsm.connected(now())
|
||||
|
||||
session = jsonrpc.Session(fsm, connection)
|
||||
idl = Idl(session, schemas[0])
|
||||
|
||||
system_id = discover_system_id(idl)
|
||||
name = cls.instance_name(system_id)
|
||||
ovs_stream.name = name
|
||||
connection.name = name
|
||||
fsm.set_name(name)
|
||||
|
||||
kwargs = kwargs.copy()
|
||||
kwargs['address'] = address
|
||||
kwargs['idl'] = idl
|
||||
kwargs['name'] = name
|
||||
kwargs['system_id'] = system_id
|
||||
|
||||
app_mgr = app_manager.AppManager.get_instance()
|
||||
return app_mgr.instantiate(cls, *args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def instance_name(cls, system_id):
|
||||
return '%s-%s' % (cls.__name__, system_id)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RemoteOvsdb, self).__init__(*args, **kwargs)
|
||||
self.address = kwargs['address']
|
||||
self._idl = kwargs['idl']
|
||||
self.system_id = kwargs['system_id']
|
||||
self.name = kwargs['name']
|
||||
self._txn_q = collections.deque()
|
||||
|
||||
def _event_proxy_loop(self):
|
||||
while self.is_active:
|
||||
events = self._idl.events
|
||||
|
||||
if not events:
|
||||
hub.sleep(0.1)
|
||||
continue
|
||||
|
||||
for event in events:
|
||||
ev = event[0]
|
||||
args = event[1]
|
||||
self._submit_event(ev(self.system_id, *args))
|
||||
|
||||
hub.sleep(0)
|
||||
|
||||
def _submit_event(self, ev):
|
||||
self.send_event_to_observers(ev)
|
||||
try:
|
||||
ev_cls_name = 'Event' + ev.table + ev.event_type
|
||||
proxy_ev_cls = getattr(event, ev_cls_name, None)
|
||||
if proxy_ev_cls:
|
||||
self.send_event_to_observers(proxy_ev_cls(ev))
|
||||
except Exception:
|
||||
self.logger.exception('Error submitting specific event for OVSDB',
|
||||
self.system_id)
|
||||
|
||||
def _idl_loop(self):
|
||||
while self.is_active:
|
||||
try:
|
||||
self._idl.run()
|
||||
self._transactions()
|
||||
except Exception:
|
||||
self.logger.exception('Error running IDL for system_id %s' %
|
||||
self.system_id)
|
||||
break
|
||||
|
||||
hub.sleep(0)
|
||||
|
||||
def _run_thread(self, func, *args, **kwargs):
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
|
||||
finally:
|
||||
self.stop()
|
||||
|
||||
def _transactions(self):
|
||||
if not self._txn_q:
|
||||
return
|
||||
|
||||
# NOTE(jkoelker) possibly run multiple transactions per loop?
|
||||
self._transaction()
|
||||
|
||||
def _transaction(self):
|
||||
req = self._txn_q.popleft()
|
||||
txn = idl.Transaction(self._idl)
|
||||
|
||||
uuids = req.func(self._idl.tables, txn.insert)
|
||||
status = txn.commit_block()
|
||||
|
||||
insert_uuids = {}
|
||||
err_msg = None
|
||||
|
||||
if status in (idl.Transaction.SUCCESS,
|
||||
idl.Transaction.UNCHANGED):
|
||||
if uuids:
|
||||
if isinstance(uuids, uuid.UUID):
|
||||
insert_uuids[uuids] = txn.get_insert_uuid(uuids)
|
||||
|
||||
else:
|
||||
insert_uuids = dict((uuid, txn.get_insert_uuid(uuid))
|
||||
for uuid in uuids)
|
||||
else:
|
||||
err_msg = txn.get_error()
|
||||
|
||||
rep = event.EventModifyReply(self.system_id, status, insert_uuids,
|
||||
err_msg)
|
||||
self.reply_to_request(req, rep)
|
||||
|
||||
def modify_request_handler(self, ev):
|
||||
self._txn_q.append(ev)
|
||||
|
||||
def read_request_handler(self, ev):
|
||||
result = ev.func(self._idl.tables)
|
||||
rep = event.EventReadReply(self.system_id, result)
|
||||
self.reply_to_request(ev, rep)
|
||||
|
||||
def start(self):
|
||||
super(RemoteOvsdb, self).start()
|
||||
t = hub.spawn(self._run_thread, self._idl_loop)
|
||||
self.threads.append(t)
|
||||
|
||||
t = hub.spawn(self._run_thread, self._event_proxy_loop)
|
||||
self.threads.append(t)
|
||||
|
||||
def stop(self):
|
||||
super(RemoteOvsdb, self).stop()
|
||||
self._idl.close()
|
180
ryu/services/protocols/ovsdb/event.py
Normal file
180
ryu/services/protocols/ovsdb/event.py
Normal file
@ -0,0 +1,180 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ryu.controller import event as ryu_event
|
||||
from ryu.controller import handler
|
||||
|
||||
|
||||
class EventRowBase(ryu_event.EventBase):
|
||||
def __init__(self, system_id, table, row, event_type):
|
||||
super(EventRowBase, self).__init__()
|
||||
self.system_id = system_id
|
||||
self.table = table
|
||||
self.row = row
|
||||
self.event_type = event_type
|
||||
|
||||
def __str__(self):
|
||||
return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
|
||||
self.system_id,
|
||||
self.table,
|
||||
self.row['_uuid'])
|
||||
|
||||
|
||||
class EventRowDelete(EventRowBase):
|
||||
def __init__(self, system_id, table, row):
|
||||
super(EventRowDelete, self).__init__(system_id, table, row, 'Deleted')
|
||||
|
||||
|
||||
class EventRowInsert(EventRowBase):
|
||||
def __init__(self, system_id, table, row):
|
||||
super(EventRowInsert, self).__init__(system_id, table, row, 'Inserted')
|
||||
|
||||
|
||||
class EventRowUpdate(ryu_event.EventBase):
|
||||
def __init__(self, system_id, table, old, new):
|
||||
super(EventRowUpdate, self).__init__()
|
||||
self.system_id = system_id
|
||||
self.table = table
|
||||
self.old = old
|
||||
self.new = new
|
||||
self.event_type = 'Updated'
|
||||
|
||||
def __str__(self):
|
||||
return '%s<system_id=%s table=%s, uuid=%s>' % (self.__class__.__name__,
|
||||
self.system_id,
|
||||
self.table,
|
||||
self.old['_uuid'])
|
||||
|
||||
|
||||
class EventModifyRequest(ryu_event.EventRequestBase):
|
||||
""" Dispatch a modify function to OVSDB
|
||||
|
||||
`func` must be a callable that accepts an insert fucntion and the
|
||||
IDL.tables object. It can then modify the tables as needed. For inserts,
|
||||
specify a UUID for each insert, and return a tuple of the temporary
|
||||
UUID's. The execution of `func` will be wrapped in a single transaction
|
||||
and the reply will include a dict of temporary UUID to real UUID mappings.
|
||||
|
||||
e.g.
|
||||
|
||||
new_port_uuid = uuid.uuid4()
|
||||
|
||||
def modify(tables, insert):
|
||||
bridges = tables['Bridge'].rows
|
||||
bridge = None
|
||||
for b in bridges:
|
||||
if b.name == 'my-bridge':
|
||||
bridge = b
|
||||
|
||||
if not bridge:
|
||||
return
|
||||
|
||||
port = insert('Port', new_port_uuid)
|
||||
|
||||
bridge.ports = bridge.ports + [port]
|
||||
|
||||
return (new_port_uuid, )
|
||||
|
||||
request = EventModifyRequest(system_id, modify)
|
||||
reply = send_request(request)
|
||||
|
||||
port_uuid = reply.insert_uuids[new_port_uuid]
|
||||
"""
|
||||
def __init__(self, system_id, func):
|
||||
super(EventModifyRequest, self).__init__()
|
||||
self.dst = 'OVSDB'
|
||||
self.system_id = system_id
|
||||
self.func = func
|
||||
|
||||
|
||||
class EventModifyReply(ryu_event.EventReplyBase):
|
||||
def __init__(self, system_id, status, insert_uuids, err_msg):
|
||||
self.system_id = system_id
|
||||
self.status = status
|
||||
self.insert_uuids = insert_uuids
|
||||
self.err_msg = err_msg
|
||||
|
||||
|
||||
class EventNewOVSDBConnection(ryu_event.EventBase):
|
||||
def __init__(self, system_id):
|
||||
super(EventNewOVSDBConnection, self).__init__()
|
||||
self.system_id = system_id
|
||||
|
||||
def __str__(self):
|
||||
return '%s<system_id=%s>' % (self.__class__.__name__,
|
||||
self.system_id)
|
||||
|
||||
|
||||
class EventReadRequest(ryu_event.EventRequestBase):
|
||||
def __init__(self, system_id, func):
|
||||
self.system_id = system_id
|
||||
self.func = func
|
||||
self.dst = 'OVSDB'
|
||||
|
||||
|
||||
class EventReadReply(ryu_event.EventReplyBase):
|
||||
def __init__(self, system_id, result, err_msg=''):
|
||||
self.system_id = system_id
|
||||
self.result = result
|
||||
self.err_msg = err_msg
|
||||
|
||||
|
||||
class EventRowInsertedBase(EventRowInsert):
|
||||
def __init__(self, ev):
|
||||
super(EventRowInsertedBase, self).__init__(ev.system_id,
|
||||
ev.table,
|
||||
ev.row)
|
||||
|
||||
|
||||
class EventRowDeletedBase(EventRowDelete):
|
||||
def __init__(self, ev):
|
||||
super(EventRowDeletedBase, self).__init__(ev.system_id,
|
||||
ev.table,
|
||||
ev.row)
|
||||
|
||||
|
||||
class EventRowUpdatedBase(EventRowUpdate):
|
||||
def __init__(self, ev):
|
||||
super(EventRowUpdatedBase, self).__init__(ev.system_id,
|
||||
ev.table,
|
||||
ev.old,
|
||||
ev.new)
|
||||
|
||||
|
||||
class EventPortInserted(EventRowInsertedBase):
|
||||
pass
|
||||
|
||||
|
||||
class EventPortDeleted(EventRowDeletedBase):
|
||||
pass
|
||||
|
||||
|
||||
class EventPortUpdated(EventRowUpdatedBase):
|
||||
pass
|
||||
|
||||
|
||||
class EventInterfaceInserted(EventRowInsertedBase):
|
||||
pass
|
||||
|
||||
|
||||
class EventInterfaceDeleted(EventRowDeletedBase):
|
||||
pass
|
||||
|
||||
|
||||
class EventInterfaceUpdated(EventRowUpdatedBase):
|
||||
pass
|
||||
|
||||
|
||||
handler.register_service('ryu.services.protocols.ovsdb.manager')
|
149
ryu/services/protocols/ovsdb/manager.py
Normal file
149
ryu/services/protocols/ovsdb/manager.py
Normal file
@ -0,0 +1,149 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import ssl
|
||||
import socket
|
||||
|
||||
from ryu import cfg
|
||||
from ryu.base import app_manager
|
||||
from ryu.lib import hub
|
||||
from ryu.services.protocols.ovsdb import client
|
||||
from ryu.services.protocols.ovsdb import event
|
||||
from ryu.controller import handler
|
||||
|
||||
|
||||
opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'),
|
||||
cfg.IntOpt('port', default=6640, help='OVSDB port'),
|
||||
cfg.StrOpt('mngr-privkey', default=None, help='manager private key'),
|
||||
cfg.StrOpt('mngr-cert', default=None, help='manager certificate'),
|
||||
cfg.ListOpt('whitelist', default=[],
|
||||
help='Whitelist of address to allow to connect'))
|
||||
|
||||
cfg.CONF.register_opts(opts, 'ovsdb')
|
||||
|
||||
|
||||
class OVSDB(app_manager.RyuApp):
|
||||
_EVENTS = [event.EventNewOVSDBConnection,
|
||||
event.EventModifyRequest,
|
||||
event.EventReadRequest]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(OVSDB, self).__init__(*args, **kwargs)
|
||||
self._address = self.CONF.ovsdb.address
|
||||
self._port = self.CONF.ovsdb.port
|
||||
self._clients = {}
|
||||
|
||||
def _accept(self, server):
|
||||
if self.CONF.ovsdb.whitelist:
|
||||
def check(address):
|
||||
if address in self.CONF.ovsdb.whitelist:
|
||||
return True
|
||||
|
||||
self.logger.debug('Connection from non-whitelist client '
|
||||
'(%s:%s)' % address)
|
||||
return False
|
||||
|
||||
else:
|
||||
def check(address):
|
||||
return True
|
||||
|
||||
while True:
|
||||
# TODO(jkoelker) SSL Certificate Fingerprint check
|
||||
sock, client_address = server.accept()
|
||||
|
||||
if not check(client_address[0]):
|
||||
sock.shutdown(socket.SHUT_RDWR)
|
||||
sock.close()
|
||||
continue
|
||||
|
||||
self.logger.debug('New connection from %s:%s' % client_address)
|
||||
t = hub.spawn(self._start_remote, sock, client_address)
|
||||
self.threads.append(t)
|
||||
|
||||
def _proxy_event(self, ev):
|
||||
system_id = ev.system_id
|
||||
client_name = client.RemoteOvsdb.instance_name(system_id)
|
||||
|
||||
if client_name not in self._clients:
|
||||
self.logger.info('Unknown remote system_id %s' % system_id)
|
||||
return
|
||||
|
||||
return self.send_event(client_name, ev)
|
||||
|
||||
def _start_remote(self, sock, client_address):
|
||||
app = client.RemoteOvsdb.factory(sock, client_address)
|
||||
|
||||
if app:
|
||||
self._clients[app.name] = app
|
||||
app.start()
|
||||
ev = event.EventNewOVSDBConnection(app.system_id)
|
||||
self.send_event_to_observers(ev)
|
||||
|
||||
def start(self):
|
||||
server = hub.listen((self._address, self._port))
|
||||
key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
|
||||
cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert
|
||||
|
||||
if key is not None and cert is not None:
|
||||
ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True)
|
||||
|
||||
if self.CONF.ca_certs is not None:
|
||||
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
ssl_kwargs['ca_certs'] = self.CONF.ca_certs
|
||||
|
||||
server = ssl.wrap_socket(server, **ssl_kwargs)
|
||||
|
||||
self._server = server
|
||||
|
||||
self.logger.info('Listening on %s:%s for clients' % (self._address,
|
||||
self._port))
|
||||
t = hub.spawn(self._accept, self._server)
|
||||
super(OVSDB, self).start()
|
||||
return t
|
||||
|
||||
def stop(self):
|
||||
for client in self._clients.values():
|
||||
client.stop()
|
||||
|
||||
super(OVSDB, self).stop()
|
||||
|
||||
@handler.set_ev_cls(event.EventModifyRequest)
|
||||
def modify_request_handler(self, ev):
|
||||
|
||||
system_id = ev.system_id
|
||||
client_name = client.RemoteOvsdb.instance_name(system_id)
|
||||
remote = self._clients.get(client_name)
|
||||
|
||||
if not remote:
|
||||
msg = 'Unknown remote system_id %s' % system_id
|
||||
self.logger.info(msg)
|
||||
rep = event.EventModifyReply(system_id, None, None, msg)
|
||||
return self.reply_to_request(ev, rep)
|
||||
|
||||
return remote.modify_request_handler(ev)
|
||||
|
||||
@handler.set_ev_cls(event.EventReadRequest)
|
||||
def read_request_handler(self, ev):
|
||||
system_id = ev.system_id
|
||||
client_name = client.RemoteOvsdb.instance_name(system_id)
|
||||
remote = self._clients.get(client_name)
|
||||
|
||||
if not remote:
|
||||
msg = 'Unknown remote system_id %s' % system_id
|
||||
self.logger.info(msg)
|
||||
rep = event.EventReadReply(self.system_id, None, msg)
|
||||
return self.reply_to_request(ev, rep)
|
||||
|
||||
return remote.read_request_handler(ev)
|
44
ryu/services/protocols/ovsdb/model.py
Normal file
44
ryu/services/protocols/ovsdb/model.py
Normal file
@ -0,0 +1,44 @@
|
||||
# Copyright (c) 2014 Rackspace Hosting
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
|
||||
class _UUIDDict(dict):
|
||||
def _uuidize(self):
|
||||
if '_uuid' not in self or self['_uuid'] is None:
|
||||
self['_uuid'] = uuid.uuid4()
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
self._uuidize()
|
||||
return self['_uuid']
|
||||
|
||||
@uuid.setter
|
||||
def uuid(self, value):
|
||||
self['_uuid'] = value
|
||||
|
||||
|
||||
class Row(_UUIDDict):
|
||||
@property
|
||||
def delete(self):
|
||||
if '_delete' in self and self['_delete']:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@delete.setter
|
||||
def delete(self, value):
|
||||
self['_delete'] = value
|
Loading…
Reference in New Issue
Block a user