neutron/neutron/agent/ovsdb/native/connection.py

111 lines
3.8 KiB
Python

# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import threading
import traceback
from ovs.db import idl
from ovs import poller
import retrying
from six.moves import queue as Queue
from neutron.agent.ovsdb.native import helpers
from neutron.agent.ovsdb.native import idlutils
class TransactionQueue(Queue.Queue, object):
def __init__(self, *args, **kwargs):
super(TransactionQueue, self).__init__(*args, **kwargs)
alertpipe = os.pipe()
self.alertin = os.fdopen(alertpipe[0], 'r', 0)
self.alertout = os.fdopen(alertpipe[1], 'w', 0)
def get_nowait(self, *args, **kwargs):
try:
result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
except Queue.Empty:
return None
self.alertin.read(1)
return result
def put(self, *args, **kwargs):
super(TransactionQueue, self).put(*args, **kwargs)
self.alertout.write('X')
self.alertout.flush()
@property
def alert_fileno(self):
return self.alertin.fileno()
class Connection(object):
def __init__(self, connection, timeout, schema_name):
self.idl = None
self.connection = connection
self.timeout = timeout
self.txns = TransactionQueue(1)
self.lock = threading.Lock()
self.schema_name = schema_name
def start(self):
with self.lock:
if self.idl is not None:
return
try:
helper = idlutils.get_schema_helper(self.connection,
self.schema_name)
except Exception:
# We may have failed do to set-manager not being called
helpers.enable_connection_uri(self.connection)
# There is a small window for a race, so retry up to a second
@retrying.retry(wait_exponential_multiplier=10,
stop_max_delay=1000)
def do_get_schema_helper():
return idlutils.get_schema_helper(self.connection,
self.schema_name)
helper = do_get_schema_helper()
helper.register_all()
self.idl = idl.Idl(self.connection, helper)
idlutils.wait_for_change(self.idl, self.timeout)
self.poller = poller.Poller()
self.thread = threading.Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start()
def run(self):
while True:
self.idl.wait(self.poller)
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
#TODO(jlibosva): Remove next line once losing connection to ovsdb
# is solved.
self.poller.timer_wait(self.timeout)
self.poller.block()
self.idl.run()
txn = self.txns.get_nowait()
if txn is not None:
try:
txn.results.put(txn.do_commit())
except Exception as ex:
er = idlutils.ExceptionResult(ex=ex,
tb=traceback.format_exc())
txn.results.put(er)
self.txns.task_done()
def queue_txn(self, txn):
self.txns.put(txn)