From aa198d6900e2044ad940f4a1e89b3e7e0174c8c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jason=20K=C3=B6lker?= Date: Fri, 24 Jul 2015 17:58:02 +0000 Subject: [PATCH] Add OVSDB manager protocol application MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allows listening on a socket for OVSDB clients, reacting to their events and modifying their database. Co-Authored-By: Chris Hansen Co-Authored-By: Ravi Kamachi Signed-off-by: Jason Kölker Signed-off-by: Chris Hansen Signed-off-by: Ravi Kamachi Signed-off-by: FUJITA Tomonori --- doc/source/library.rst | 1 + doc/source/library_ovsdb_manager.rst | 61 ++++ ryu/services/protocols/ovsdb/__init__.py | 14 + ryu/services/protocols/ovsdb/api.py | 137 +++++++++ ryu/services/protocols/ovsdb/client.py | 336 +++++++++++++++++++++++ ryu/services/protocols/ovsdb/event.py | 180 ++++++++++++ ryu/services/protocols/ovsdb/manager.py | 149 ++++++++++ ryu/services/protocols/ovsdb/model.py | 44 +++ 8 files changed, 922 insertions(+) create mode 100644 doc/source/library_ovsdb_manager.rst create mode 100644 ryu/services/protocols/ovsdb/__init__.py create mode 100644 ryu/services/protocols/ovsdb/api.py create mode 100644 ryu/services/protocols/ovsdb/client.py create mode 100644 ryu/services/protocols/ovsdb/event.py create mode 100644 ryu/services/protocols/ovsdb/manager.py create mode 100644 ryu/services/protocols/ovsdb/model.py diff --git a/doc/source/library.rst b/doc/source/library.rst index 38cc3872..bc8ff67f 100644 --- a/doc/source/library.rst +++ b/doc/source/library.rst @@ -12,3 +12,4 @@ Ryu provides some useful library for your network applications. library_of_config.rst library_bgp_speaker.rst library_bgp_speaker_ref.rst + library_ovsdb_manager.rst diff --git a/doc/source/library_ovsdb_manager.rst b/doc/source/library_ovsdb_manager.rst new file mode 100644 index 00000000..b23ae81d --- /dev/null +++ b/doc/source/library_ovsdb_manager.rst @@ -0,0 +1,61 @@ +********************* +OVSDB Manager library +********************* + +Introduction +============ + +Ryu OVSDB Manager library allows your code to interact with devices +speaking the OVSDB protocol. This enables your code to perform remote +management of the devices and react to topology changes on them. + +Example +======= + +The following logs all new OVSDB connections and allows creating a port +on a bridge. + +.. code-block:: python + + import uuid + + from ryu.base import app_manager + from ryu.services.protocols.ovsdb import api as ovsdb + from ryu.services.protocols.ovsdb import event as ovsdb_event + + + class MyApp(app_manager.RyuApp): + @set_ev_cls(ovsdb_event.EventNewOVSDBConnection) + def handle_new_ovsdb_connection(self, ev): + system_id = ev.system_id + self.logger.info('New OVSDB connection from system id %s', + systemd_id) + + def create_port(self, systemd_id, bridge_name, name): + new_iface_uuid = uuid.uuid4() + new_port_uuid = uuid.uuid4() + + def _create_port(tables, insert): + bridge = ovsdb.row_by_name(self, system_id, bridge_name) + + iface = insert(tables['Interface'], new_iface_uuid) + iface.name = name + iface.type = 'internal' + + port = insert(tables['Port'], new_port_uuid) + port.name = name + port.interfaces = [iface] + + brdige.ports = bridfe.ports + [port] + + return (new_port_uuid, new_iface_uuid) + + req = ovsdb_event.EventModifyRequest(system_id, _create_port) + rep = self.send_request(req) + + if rep.status != 'success': + self.logger.error('Error creating port %s on bridge %s: %s', + name, bridge, rep.status) + return None + + return reply.insert_uuid[new_port_uuid] diff --git a/ryu/services/protocols/ovsdb/__init__.py b/ryu/services/protocols/ovsdb/__init__.py new file mode 100644 index 00000000..fb3d4547 --- /dev/null +++ b/ryu/services/protocols/ovsdb/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/ryu/services/protocols/ovsdb/api.py b/ryu/services/protocols/ovsdb/api.py new file mode 100644 index 00000000..ea73cbf2 --- /dev/null +++ b/ryu/services/protocols/ovsdb/api.py @@ -0,0 +1,137 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ryu.lib import dpid as dpidlib +from ryu.services.protocols.ovsdb import event as ovsdb_event + + +def match_row(manager, system_id, table, fn): + def _match_row(tables): + return next((r for r in tables[table].rows.values() + if fn(r)), None) + + request_to_get_tables = ovsdb_event.EventReadRequest(system_id, + _match_row) + reply_to_get_tables = manager.send_request(request_to_get_tables) + return reply_to_get_tables.result + + +def match_rows(manager, system_id, table, fn): + def _match_rows(tables): + return (r for r in tables[table].rows.values() if fn(r)) + + request = ovsdb_event.EventReadRequest(system_id, _match_rows) + reply = manager.send_request(request) + return reply.result + + +def row_by_name(manager, system_id, name, table='Bridge', fn=None): + matched_row = match_row(manager, system_id, table, + lambda row: row.name == name) + + if fn is not None: + return fn(matched_row) + + return matched_row + + +def get_column_value(manager, table, record, column): + """ + Example : To get datapath_id from Bridge table + get_column_value('Bridge', , 'datapath_id').strip('"') + """ + row = row_by_name(manager, record, table) + value = getattr(row, column, "") + + if isinstance(value, list) and len(value) == 1: + value = value[0] + + return str(value) + + +def get_iface_by_name(manager, system_id, name, fn=None): + iface = row_by_name(manager, system_id, name, 'Interface') + + if fn is not None: + return fn(iface) + + return iface + + +def get_bridge_for_iface_name(manager, system_id, iface_name, fn=None): + iface = row_by_name(manager, system_id, iface_name, 'Interface') + port = match_row(manager, system_id, 'Port', + lambda x: iface in x.interfaces) + bridge = match_row(manager, system_id, 'Bridge', + lambda x: port in x.ports) + + if fn is not None: + return fn(bridge) + + return bridge + + +def get_table(manager, system_id, name): + def _get_table(tables): + return tables[name] + + request_to_get_tables = ovsdb_event.EventReadRequest(system_id, + _get_table) + reply_to_get_tables = manager.send_request(request_to_get_tables) + return reply_to_get_tables.result + + +def get_bridge_by_datapath_id(manager, system_id, datapath_id, fn=None): + def _match_fn(row): + row_dpid = dpidlib.str_to_dpid(str(row.datapath_id[0])) + return row_dpid == datapath_id + + bridge = match_row(manager, system_id, 'Bridge', _match_fn) + + if fn is not None: + return fn(bridge) + + return bridge + + +def get_datapath_ids_for_systemd_id(manager, system_id): + def _get_dp_ids(tables): + dp_ids = [] + + bridges = tables.get('Bridge') + + if not bridges: + return dp_ids + + for bridge in bridges.rows.values(): + datapath_ids = bridge.datapath_id + dp_ids.extend(dpidlib.str_to_dpid(dp_id) for dp_id in datapath_ids) + + return dp_ids + + request = ovsdb_event.EventReadRequest(system_id, _get_dp_ids) + reply = manager.send_request(request) + return reply.result + + +def get_bridges_by_system_id(manager, system_id): + return get_table(manager, system_id, 'Bridge').rows.values() + + +def bridge_exists(manager, system_id, bridge_name): + return bool(row_by_name(manager, system_id, bridge_name)) + + +def port_exists(manager, system_id, port_name): + return bool(row_by_name(manager, system_id, port_name, 'Port')) diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py new file mode 100644 index 00000000..175936ca --- /dev/null +++ b/ryu/services/protocols/ovsdb/client.py @@ -0,0 +1,336 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import uuid + +# NOTE(jkoelker) Patch Vlog so that is uses standard logging +from ovs import vlog + + +class Vlog(vlog.Vlog): + def __init__(self, name): + self.log = logging.getLogger('ovs.%s' % name) + + def __log(self, level, message, **kwargs): + level = vlog.LEVELS.get(level, logging.DEBUG) + self.log.log(level, message, **kwargs) + +vlog.Vlog = Vlog + + +from ovs import jsonrpc +from ovs import reconnect +from ovs import stream +from ovs import timeval +from ovs.db import idl + +from ryu.base import app_manager +from ryu.lib import hub +from ryu.services.protocols.ovsdb import event +from ryu.services.protocols.ovsdb import model + + +now = timeval.msec + + +def _uuid_to_row(atom, base): + if base.ref_table: + value = base.ref_table.rows.get(atom) + else: + value = atom + + if isinstance(value, idl.Row): + value = str(value.uuid) + + return value + + +def dictify(row): + if row is None: + return + + return dict([(k, v.to_python(_uuid_to_row)) + for k, v in row._data.items()]) + + +def discover_schemas(connection): + # NOTE(jkoelker) currently only the Open_vSwitch schema + # is supported. + # TODO(jkoelker) support arbitrary schemas + req = jsonrpc.Message.create_request('list_dbs', []) + error, reply = connection.transact_block(req) + + if error or reply.error: + return + + schemas = [] + for db in reply.result: + if db != 'Open_vSwitch': + continue + + req = jsonrpc.Message.create_request('get_schema', [db]) + error, reply = connection.transact_block(req) + + if error or reply.error: + # TODO(jkoelker) Error handling + continue + + schemas.append(reply.result) + + return schemas + + +def discover_system_id(idl): + system_id = None + + while system_id is None: + idl.run() + openvswitch = idl.tables['Open_vSwitch'].rows + + if openvswitch: + row = openvswitch.get(list(openvswitch.keys())[0]) + system_id = row.external_ids.get('system-id') + + return system_id + + +# NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and +# trigger callbacks on changes +class Idl(idl.Idl): + def __init__(self, session, schema): + if not isinstance(schema, idl.SchemaHelper): + schema = idl.SchemaHelper(schema_json=schema) + schema.register_all() + + schema = schema.get_idl_schema() + + # NOTE(jkoelker) event buffer + self._events = [] + + self.tables = schema.tables + self._db = schema + self._session = session + self._monitor_request_id = None + self._last_seqno = None + self.change_seqno = 0 + + # Database locking. + self.lock_name = None # Name of lock we need, None if none. + self.has_lock = False # Has db server said we have the lock? + self.is_lock_contended = False # Has db server said we can't get lock? + self._lock_request_id = None # JSON-RPC ID of in-flight lock request. + + # Transaction support. + self.txn = None + self._outstanding_txns = {} + + for table in schema.tables.values(): + for column in table.columns.values(): + if not hasattr(column, 'alert'): + column.alert = True + table.need_table = False + table.rows = {} + table.idl = self + + @property + def events(self): + events = self._events + self._events = [] + return events + + def __process_update(self, table, uuid, old, new): + old_row = table.rows.get(uuid) + if old_row is not None: + old_row = model.Row(dictify(old_row)) + old_row['_uuid'] = uuid + + changed = idl.Idl.__process_update(self, table, uuid, old, new) + + if changed: + if not new: + ev = (event.EventRowDelete, (table.name, old_row)) + + elif not old: + new_row = model.Row(dictify(table.rows.get(uuid))) + new_row['_uuid'] = uuid + ev = (event.EventRowInsert, (table.name, new_row)) + + else: + new_row = model.Row(dictify(table.rows.get(uuid))) + new_row['_uuid'] = uuid + + ev = (event.EventRowUpdate, (table.name, old_row, new_row)) + + self._events.append(ev) + + return changed + + +class RemoteOvsdb(app_manager.RyuApp): + _EVENTS = [event.EventRowUpdate, + event.EventRowDelete, + event.EventRowInsert, + event.EventInterfaceDeleted, + event.EventInterfaceInserted, + event.EventInterfaceUpdated, + event.EventPortDeleted, + event.EventPortInserted, + event.EventPortUpdated] + + @classmethod + def factory(cls, sock, address, *args, **kwargs): + ovs_stream = stream.Stream(sock, None, None) + connection = jsonrpc.Connection(ovs_stream) + schemas = discover_schemas(connection) + + if not schemas: + return + + fsm = reconnect.Reconnect(now()) + fsm.set_name('%s:%s' % address) + fsm.enable(now()) + fsm.set_passive(True, now()) + fsm.set_max_tries(-1) + fsm.connected(now()) + + session = jsonrpc.Session(fsm, connection) + idl = Idl(session, schemas[0]) + + system_id = discover_system_id(idl) + name = cls.instance_name(system_id) + ovs_stream.name = name + connection.name = name + fsm.set_name(name) + + kwargs = kwargs.copy() + kwargs['address'] = address + kwargs['idl'] = idl + kwargs['name'] = name + kwargs['system_id'] = system_id + + app_mgr = app_manager.AppManager.get_instance() + return app_mgr.instantiate(cls, *args, **kwargs) + + @classmethod + def instance_name(cls, system_id): + return '%s-%s' % (cls.__name__, system_id) + + def __init__(self, *args, **kwargs): + super(RemoteOvsdb, self).__init__(*args, **kwargs) + self.address = kwargs['address'] + self._idl = kwargs['idl'] + self.system_id = kwargs['system_id'] + self.name = kwargs['name'] + self._txn_q = collections.deque() + + def _event_proxy_loop(self): + while self.is_active: + events = self._idl.events + + if not events: + hub.sleep(0.1) + continue + + for event in events: + ev = event[0] + args = event[1] + self._submit_event(ev(self.system_id, *args)) + + hub.sleep(0) + + def _submit_event(self, ev): + self.send_event_to_observers(ev) + try: + ev_cls_name = 'Event' + ev.table + ev.event_type + proxy_ev_cls = getattr(event, ev_cls_name, None) + if proxy_ev_cls: + self.send_event_to_observers(proxy_ev_cls(ev)) + except Exception: + self.logger.exception('Error submitting specific event for OVSDB', + self.system_id) + + def _idl_loop(self): + while self.is_active: + try: + self._idl.run() + self._transactions() + except Exception: + self.logger.exception('Error running IDL for system_id %s' % + self.system_id) + break + + hub.sleep(0) + + def _run_thread(self, func, *args, **kwargs): + try: + func(*args, **kwargs) + + finally: + self.stop() + + def _transactions(self): + if not self._txn_q: + return + + # NOTE(jkoelker) possibly run multiple transactions per loop? + self._transaction() + + def _transaction(self): + req = self._txn_q.popleft() + txn = idl.Transaction(self._idl) + + uuids = req.func(self._idl.tables, txn.insert) + status = txn.commit_block() + + insert_uuids = {} + err_msg = None + + if status in (idl.Transaction.SUCCESS, + idl.Transaction.UNCHANGED): + if uuids: + if isinstance(uuids, uuid.UUID): + insert_uuids[uuids] = txn.get_insert_uuid(uuids) + + else: + insert_uuids = dict((uuid, txn.get_insert_uuid(uuid)) + for uuid in uuids) + else: + err_msg = txn.get_error() + + rep = event.EventModifyReply(self.system_id, status, insert_uuids, + err_msg) + self.reply_to_request(req, rep) + + def modify_request_handler(self, ev): + self._txn_q.append(ev) + + def read_request_handler(self, ev): + result = ev.func(self._idl.tables) + rep = event.EventReadReply(self.system_id, result) + self.reply_to_request(ev, rep) + + def start(self): + super(RemoteOvsdb, self).start() + t = hub.spawn(self._run_thread, self._idl_loop) + self.threads.append(t) + + t = hub.spawn(self._run_thread, self._event_proxy_loop) + self.threads.append(t) + + def stop(self): + super(RemoteOvsdb, self).stop() + self._idl.close() diff --git a/ryu/services/protocols/ovsdb/event.py b/ryu/services/protocols/ovsdb/event.py new file mode 100644 index 00000000..2353a4ff --- /dev/null +++ b/ryu/services/protocols/ovsdb/event.py @@ -0,0 +1,180 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ryu.controller import event as ryu_event +from ryu.controller import handler + + +class EventRowBase(ryu_event.EventBase): + def __init__(self, system_id, table, row, event_type): + super(EventRowBase, self).__init__() + self.system_id = system_id + self.table = table + self.row = row + self.event_type = event_type + + def __str__(self): + return '%s' % (self.__class__.__name__, + self.system_id, + self.table, + self.row['_uuid']) + + +class EventRowDelete(EventRowBase): + def __init__(self, system_id, table, row): + super(EventRowDelete, self).__init__(system_id, table, row, 'Deleted') + + +class EventRowInsert(EventRowBase): + def __init__(self, system_id, table, row): + super(EventRowInsert, self).__init__(system_id, table, row, 'Inserted') + + +class EventRowUpdate(ryu_event.EventBase): + def __init__(self, system_id, table, old, new): + super(EventRowUpdate, self).__init__() + self.system_id = system_id + self.table = table + self.old = old + self.new = new + self.event_type = 'Updated' + + def __str__(self): + return '%s' % (self.__class__.__name__, + self.system_id, + self.table, + self.old['_uuid']) + + +class EventModifyRequest(ryu_event.EventRequestBase): + """ Dispatch a modify function to OVSDB + + `func` must be a callable that accepts an insert fucntion and the + IDL.tables object. It can then modify the tables as needed. For inserts, + specify a UUID for each insert, and return a tuple of the temporary + UUID's. The execution of `func` will be wrapped in a single transaction + and the reply will include a dict of temporary UUID to real UUID mappings. + + e.g. + + new_port_uuid = uuid.uuid4() + + def modify(tables, insert): + bridges = tables['Bridge'].rows + bridge = None + for b in bridges: + if b.name == 'my-bridge': + bridge = b + + if not bridge: + return + + port = insert('Port', new_port_uuid) + + bridge.ports = bridge.ports + [port] + + return (new_port_uuid, ) + + request = EventModifyRequest(system_id, modify) + reply = send_request(request) + + port_uuid = reply.insert_uuids[new_port_uuid] + """ + def __init__(self, system_id, func): + super(EventModifyRequest, self).__init__() + self.dst = 'OVSDB' + self.system_id = system_id + self.func = func + + +class EventModifyReply(ryu_event.EventReplyBase): + def __init__(self, system_id, status, insert_uuids, err_msg): + self.system_id = system_id + self.status = status + self.insert_uuids = insert_uuids + self.err_msg = err_msg + + +class EventNewOVSDBConnection(ryu_event.EventBase): + def __init__(self, system_id): + super(EventNewOVSDBConnection, self).__init__() + self.system_id = system_id + + def __str__(self): + return '%s' % (self.__class__.__name__, + self.system_id) + + +class EventReadRequest(ryu_event.EventRequestBase): + def __init__(self, system_id, func): + self.system_id = system_id + self.func = func + self.dst = 'OVSDB' + + +class EventReadReply(ryu_event.EventReplyBase): + def __init__(self, system_id, result, err_msg=''): + self.system_id = system_id + self.result = result + self.err_msg = err_msg + + +class EventRowInsertedBase(EventRowInsert): + def __init__(self, ev): + super(EventRowInsertedBase, self).__init__(ev.system_id, + ev.table, + ev.row) + + +class EventRowDeletedBase(EventRowDelete): + def __init__(self, ev): + super(EventRowDeletedBase, self).__init__(ev.system_id, + ev.table, + ev.row) + + +class EventRowUpdatedBase(EventRowUpdate): + def __init__(self, ev): + super(EventRowUpdatedBase, self).__init__(ev.system_id, + ev.table, + ev.old, + ev.new) + + +class EventPortInserted(EventRowInsertedBase): + pass + + +class EventPortDeleted(EventRowDeletedBase): + pass + + +class EventPortUpdated(EventRowUpdatedBase): + pass + + +class EventInterfaceInserted(EventRowInsertedBase): + pass + + +class EventInterfaceDeleted(EventRowDeletedBase): + pass + + +class EventInterfaceUpdated(EventRowUpdatedBase): + pass + + +handler.register_service('ryu.services.protocols.ovsdb.manager') diff --git a/ryu/services/protocols/ovsdb/manager.py b/ryu/services/protocols/ovsdb/manager.py new file mode 100644 index 00000000..b34fb7d3 --- /dev/null +++ b/ryu/services/protocols/ovsdb/manager.py @@ -0,0 +1,149 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ssl +import socket + +from ryu import cfg +from ryu.base import app_manager +from ryu.lib import hub +from ryu.services.protocols.ovsdb import client +from ryu.services.protocols.ovsdb import event +from ryu.controller import handler + + +opts = (cfg.StrOpt('address', default='0.0.0.0', help='OVSDB address'), + cfg.IntOpt('port', default=6640, help='OVSDB port'), + cfg.StrOpt('mngr-privkey', default=None, help='manager private key'), + cfg.StrOpt('mngr-cert', default=None, help='manager certificate'), + cfg.ListOpt('whitelist', default=[], + help='Whitelist of address to allow to connect')) + +cfg.CONF.register_opts(opts, 'ovsdb') + + +class OVSDB(app_manager.RyuApp): + _EVENTS = [event.EventNewOVSDBConnection, + event.EventModifyRequest, + event.EventReadRequest] + + def __init__(self, *args, **kwargs): + super(OVSDB, self).__init__(*args, **kwargs) + self._address = self.CONF.ovsdb.address + self._port = self.CONF.ovsdb.port + self._clients = {} + + def _accept(self, server): + if self.CONF.ovsdb.whitelist: + def check(address): + if address in self.CONF.ovsdb.whitelist: + return True + + self.logger.debug('Connection from non-whitelist client ' + '(%s:%s)' % address) + return False + + else: + def check(address): + return True + + while True: + # TODO(jkoelker) SSL Certificate Fingerprint check + sock, client_address = server.accept() + + if not check(client_address[0]): + sock.shutdown(socket.SHUT_RDWR) + sock.close() + continue + + self.logger.debug('New connection from %s:%s' % client_address) + t = hub.spawn(self._start_remote, sock, client_address) + self.threads.append(t) + + def _proxy_event(self, ev): + system_id = ev.system_id + client_name = client.RemoteOvsdb.instance_name(system_id) + + if client_name not in self._clients: + self.logger.info('Unknown remote system_id %s' % system_id) + return + + return self.send_event(client_name, ev) + + def _start_remote(self, sock, client_address): + app = client.RemoteOvsdb.factory(sock, client_address) + + if app: + self._clients[app.name] = app + app.start() + ev = event.EventNewOVSDBConnection(app.system_id) + self.send_event_to_observers(ev) + + def start(self): + server = hub.listen((self._address, self._port)) + key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey + cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert + + if key is not None and cert is not None: + ssl_kwargs = dict(keyfile=key, certfile=cert, server_side=True) + + if self.CONF.ca_certs is not None: + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + ssl_kwargs['ca_certs'] = self.CONF.ca_certs + + server = ssl.wrap_socket(server, **ssl_kwargs) + + self._server = server + + self.logger.info('Listening on %s:%s for clients' % (self._address, + self._port)) + t = hub.spawn(self._accept, self._server) + super(OVSDB, self).start() + return t + + def stop(self): + for client in self._clients.values(): + client.stop() + + super(OVSDB, self).stop() + + @handler.set_ev_cls(event.EventModifyRequest) + def modify_request_handler(self, ev): + + system_id = ev.system_id + client_name = client.RemoteOvsdb.instance_name(system_id) + remote = self._clients.get(client_name) + + if not remote: + msg = 'Unknown remote system_id %s' % system_id + self.logger.info(msg) + rep = event.EventModifyReply(system_id, None, None, msg) + return self.reply_to_request(ev, rep) + + return remote.modify_request_handler(ev) + + @handler.set_ev_cls(event.EventReadRequest) + def read_request_handler(self, ev): + system_id = ev.system_id + client_name = client.RemoteOvsdb.instance_name(system_id) + remote = self._clients.get(client_name) + + if not remote: + msg = 'Unknown remote system_id %s' % system_id + self.logger.info(msg) + rep = event.EventReadReply(self.system_id, None, msg) + return self.reply_to_request(ev, rep) + + return remote.read_request_handler(ev) diff --git a/ryu/services/protocols/ovsdb/model.py b/ryu/services/protocols/ovsdb/model.py new file mode 100644 index 00000000..992c785f --- /dev/null +++ b/ryu/services/protocols/ovsdb/model.py @@ -0,0 +1,44 @@ +# Copyright (c) 2014 Rackspace Hosting +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + + +class _UUIDDict(dict): + def _uuidize(self): + if '_uuid' not in self or self['_uuid'] is None: + self['_uuid'] = uuid.uuid4() + + @property + def uuid(self): + self._uuidize() + return self['_uuid'] + + @uuid.setter + def uuid(self, value): + self['_uuid'] = value + + +class Row(_UUIDDict): + @property + def delete(self): + if '_delete' in self and self['_delete']: + return True + + return False + + @delete.setter + def delete(self, value): + self['_delete'] = value