diff --git a/ovsdbapp/backend/ovs_idl/common/__init__.py b/ovsdbapp/backend/ovs_idl/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py b/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py new file mode 100644 index 00000000..59ffe44e --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py @@ -0,0 +1,32 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class WaitQueue(object): + def __init__(self, max_queue_size): + self.max_queue_size = max_queue_size + self.init_alert_notification() + + def init_alert_notification(self): + raise NotImplementedError() + + def alert_notification_consume(self): + raise NotImplementedError() + + def alert_notify(self): + raise NotImplementedError() + + @property + def alert_fileno(self): + raise NotImplementedError() diff --git a/ovsdbapp/backend/ovs_idl/connection.py b/ovsdbapp/backend/ovs_idl/connection.py index 4097f764..5bdd81bb 100644 --- a/ovsdbapp/backend/ovs_idl/connection.py +++ b/ovsdbapp/backend/ovs_idl/connection.py @@ -18,37 +18,37 @@ import traceback from ovs.db import idl from ovs import poller -import six from six.moves import queue as Queue from ovsdbapp.backend.ovs_idl import idlutils +if os.name == 'nt': + from ovsdbapp.backend.ovs_idl.windows import connection_utils +else: + from ovsdbapp.backend.ovs_idl.linux import connection_utils + class TransactionQueue(Queue.Queue, object): def __init__(self, *args, **kwargs): super(TransactionQueue, self).__init__(*args, **kwargs) - alertpipe = os.pipe() - # NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. Will get - # around this constraint by using binary mode. - self.alertin = os.fdopen(alertpipe[0], 'rb', 0) - self.alertout = os.fdopen(alertpipe[1], 'wb', 0) + self._wait_queue = connection_utils.WaitQueue( + max_queue_size=self.maxsize) def get_nowait(self, *args, **kwargs): try: result = super(TransactionQueue, self).get_nowait(*args, **kwargs) except Queue.Empty: return None - self.alertin.read(1) + self._wait_queue.alert_notification_consume() return result def put(self, *args, **kwargs): super(TransactionQueue, self).put(*args, **kwargs) - self.alertout.write(six.b('X')) - self.alertout.flush() + self._wait_queue.alert_notify() @property def alert_fileno(self): - return self.alertin.fileno() + return self._wait_queue.alert_fileno class Connection(object): diff --git a/ovsdbapp/backend/ovs_idl/linux/__init__.py b/ovsdbapp/backend/ovs_idl/linux/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/linux/connection_utils.py b/ovsdbapp/backend/ovs_idl/linux/connection_utils.py new file mode 100644 index 00000000..fa8f480f --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/linux/connection_utils.py @@ -0,0 +1,39 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import six + +from ovsdbapp.backend.ovs_idl.common import base_connection_utils + + +class WaitQueue(base_connection_utils.WaitQueue): + def init_alert_notification(self): + alertpipe = os.pipe() + # NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. + # Will get around this constraint by using binary mode. + self.alertin = os.fdopen(alertpipe[0], 'rb', 0) + self.alertout = os.fdopen(alertpipe[1], 'wb', 0) + + def alert_notification_consume(self): + self.alertin.read(1) + + def alert_notify(self): + self.alertout.write(six.b('X')) + self.alertout.flush() + + @property + def alert_fileno(self): + return self.alertin.fileno() diff --git a/ovsdbapp/backend/ovs_idl/vlog.py b/ovsdbapp/backend/ovs_idl/vlog.py index 41547763..35df619b 100644 --- a/ovsdbapp/backend/ovs_idl/vlog.py +++ b/ovsdbapp/backend/ovs_idl/vlog.py @@ -14,9 +14,18 @@ import collections import logging +import sys from ovs import vlog +try: + from eventlet import patcher + # If eventlet is installed and the 'thread' module is patched, we will + # skip setting up the python logger on Windows. + EVENTLET_NONBLOCKING_MODE_ENABLED = patcher.is_monkey_patched('thread') +except ImportError: + EVENTLET_NONBLOCKING_MODE_ENABLED = False + _LOG = logging.getLogger(__name__) # Map local log LEVELS to local LOG functions @@ -54,6 +63,16 @@ def use_python_logger(levels=ALL_LEVELS, max_level=None): :param: max_level: the maximum level to log :type: max_level: vlog level, CRITICAL, ERROR, WARN, INFO, or DEBUG """ + if sys.platform == 'win32' and EVENTLET_NONBLOCKING_MODE_ENABLED: + # NOTE(abalutoiu) When using oslo logging we need to keep in mind that + # it does not work well with native threads. We need to be careful when + # we call eventlet.tpool.execute, and make sure that it will not use + # the oslo logging, since it might cause unexpected hangs if + # greenthreads are used. On Windows we have to use + # eventlet.tpool.execute for a call to the ovs lib which will use + # vlog to log messages. We will skip replacing the OVS IDL logger + # functions on Windows to avoid unexpected hangs with oslo logging + return if max_level: levels = levels[:levels.index(max_level) + 1] diff --git a/ovsdbapp/backend/ovs_idl/windows/__init__.py b/ovsdbapp/backend/ovs_idl/windows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/windows/connection_utils.py b/ovsdbapp/backend/ovs_idl/windows/connection_utils.py new file mode 100644 index 00000000..a57c7c94 --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/windows/connection_utils.py @@ -0,0 +1,58 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ovs import poller +from ovs import winutils + +from ovsdbapp.backend.ovs_idl.common import base_connection_utils +from ovsdbapp.backend.ovs_idl.windows import utils + + +class WaitQueue(base_connection_utils.WaitQueue): + def init_alert_notification(self): + # We will use an event to get signaled when there is something in + # the queue. The OVS poller can wait on events on Windows. + # NOTE(abalutoiu) The assumption made is that the queue has + # length 1, otherwise we will need to have a list of events with + # the size of the queue. + self.alert_event = winutils.get_new_event(bManualReset=True, + bInitialState=False) + + def alert_notification_consume(self): + # Set the event object to the nonsignaled state to indicate that + # the queue is empty. + winutils.win32event.ResetEvent(self.alert_event) + + def alert_notify(self): + # Set the event object to the signaled state to indicate that + # we have something in the queue. + winutils.win32event.SetEvent(self.alert_event) + + @property + def alert_fileno(self): + return self.alert_event + + +def monkey_patch_poller_support(): + # Ensure that WaitForMultipleObjects will not block other greenthreads. + # poller.block uses WaitForMultipleObjects on Windows + old_block = poller.Poller.block + + def new_block(self): + return utils.avoid_blocking_call(old_block, self) + + poller.Poller.block = new_block + + +monkey_patch_poller_support() diff --git a/ovsdbapp/backend/ovs_idl/windows/utils.py b/ovsdbapp/backend/ovs_idl/windows/utils.py new file mode 100644 index 00000000..7611c44e --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/windows/utils.py @@ -0,0 +1,43 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +try: + import eventlet + from eventlet import tpool +except ImportError: + eventlet = None + + +def avoid_blocking_call(f, *args, **kwargs): + """Ensure that the method "f" will not block other greenthreads. + + Performs the call to the function "f" received as parameter in a + different thread using tpool.execute when called from a greenthread. + This will ensure that the function "f" will not block other greenthreads. + If not called from a greenthread, it will invoke the function "f" directly. + The function "f" will receive as parameters the arguments "args" and + keyword arguments "kwargs". If eventlet is not installed on the system + then this will call directly the function "f". + """ + if eventlet is None: + return f(*args, **kwargs) + + # Note that eventlet.getcurrent will always return a greenlet object. + # In case of a greenthread, the parent greenlet will always be the hub + # loop greenlet. + if eventlet.getcurrent().parent: + return tpool.execute(f, *args, **kwargs) + else: + return f(*args, **kwargs)