From df0546f6215d4004f0b8e35d28a0b13fdaf982b7 Mon Sep 17 00:00:00 2001 From: Alin Balutoiu Date: Thu, 20 Apr 2017 12:28:32 +0300 Subject: [PATCH] Add Windows support when using eventlet with monkey_patch Currently when eventlet monkey patch is used and the os module is monkey patched, os.fdopen can not be used on Windows. This happens because Windows pipes don't support non-blocking I/O. This patch addreses this issue by using an event for the alert instead of a pipe. The OVS poller on Windows supports waiting on events. The code from connection.py was split in platform independent and platform dependent code. There are three new folders added which contain the code from connection.py that was splitted. Change-Id: I088de72ff05742e11372dec86f71d94f667cda35 --- ovsdbapp/backend/ovs_idl/common/__init__.py | 0 .../ovs_idl/common/base_connection_utils.py | 32 ++++++++++ ovsdbapp/backend/ovs_idl/connection.py | 20 +++---- ovsdbapp/backend/ovs_idl/linux/__init__.py | 0 .../backend/ovs_idl/linux/connection_utils.py | 39 +++++++++++++ ovsdbapp/backend/ovs_idl/vlog.py | 19 ++++++ ovsdbapp/backend/ovs_idl/windows/__init__.py | 0 .../ovs_idl/windows/connection_utils.py | 58 +++++++++++++++++++ ovsdbapp/backend/ovs_idl/windows/utils.py | 43 ++++++++++++++ 9 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 ovsdbapp/backend/ovs_idl/common/__init__.py create mode 100644 ovsdbapp/backend/ovs_idl/common/base_connection_utils.py create mode 100644 ovsdbapp/backend/ovs_idl/linux/__init__.py create mode 100644 ovsdbapp/backend/ovs_idl/linux/connection_utils.py create mode 100644 ovsdbapp/backend/ovs_idl/windows/__init__.py create mode 100644 ovsdbapp/backend/ovs_idl/windows/connection_utils.py create mode 100644 ovsdbapp/backend/ovs_idl/windows/utils.py diff --git a/ovsdbapp/backend/ovs_idl/common/__init__.py b/ovsdbapp/backend/ovs_idl/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py b/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py new file mode 100644 index 00000000..59ffe44e --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/common/base_connection_utils.py @@ -0,0 +1,32 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class WaitQueue(object): + def __init__(self, max_queue_size): + self.max_queue_size = max_queue_size + self.init_alert_notification() + + def init_alert_notification(self): + raise NotImplementedError() + + def alert_notification_consume(self): + raise NotImplementedError() + + def alert_notify(self): + raise NotImplementedError() + + @property + def alert_fileno(self): + raise NotImplementedError() diff --git a/ovsdbapp/backend/ovs_idl/connection.py b/ovsdbapp/backend/ovs_idl/connection.py index 4097f764..5bdd81bb 100644 --- a/ovsdbapp/backend/ovs_idl/connection.py +++ b/ovsdbapp/backend/ovs_idl/connection.py @@ -18,37 +18,37 @@ import traceback from ovs.db import idl from ovs import poller -import six from six.moves import queue as Queue from ovsdbapp.backend.ovs_idl import idlutils +if os.name == 'nt': + from ovsdbapp.backend.ovs_idl.windows import connection_utils +else: + from ovsdbapp.backend.ovs_idl.linux import connection_utils + class TransactionQueue(Queue.Queue, object): def __init__(self, *args, **kwargs): super(TransactionQueue, self).__init__(*args, **kwargs) - alertpipe = os.pipe() - # NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. Will get - # around this constraint by using binary mode. - self.alertin = os.fdopen(alertpipe[0], 'rb', 0) - self.alertout = os.fdopen(alertpipe[1], 'wb', 0) + self._wait_queue = connection_utils.WaitQueue( + max_queue_size=self.maxsize) def get_nowait(self, *args, **kwargs): try: result = super(TransactionQueue, self).get_nowait(*args, **kwargs) except Queue.Empty: return None - self.alertin.read(1) + self._wait_queue.alert_notification_consume() return result def put(self, *args, **kwargs): super(TransactionQueue, self).put(*args, **kwargs) - self.alertout.write(six.b('X')) - self.alertout.flush() + self._wait_queue.alert_notify() @property def alert_fileno(self): - return self.alertin.fileno() + return self._wait_queue.alert_fileno class Connection(object): diff --git a/ovsdbapp/backend/ovs_idl/linux/__init__.py b/ovsdbapp/backend/ovs_idl/linux/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/linux/connection_utils.py b/ovsdbapp/backend/ovs_idl/linux/connection_utils.py new file mode 100644 index 00000000..fa8f480f --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/linux/connection_utils.py @@ -0,0 +1,39 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import six + +from ovsdbapp.backend.ovs_idl.common import base_connection_utils + + +class WaitQueue(base_connection_utils.WaitQueue): + def init_alert_notification(self): + alertpipe = os.pipe() + # NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. + # Will get around this constraint by using binary mode. + self.alertin = os.fdopen(alertpipe[0], 'rb', 0) + self.alertout = os.fdopen(alertpipe[1], 'wb', 0) + + def alert_notification_consume(self): + self.alertin.read(1) + + def alert_notify(self): + self.alertout.write(six.b('X')) + self.alertout.flush() + + @property + def alert_fileno(self): + return self.alertin.fileno() diff --git a/ovsdbapp/backend/ovs_idl/vlog.py b/ovsdbapp/backend/ovs_idl/vlog.py index 41547763..35df619b 100644 --- a/ovsdbapp/backend/ovs_idl/vlog.py +++ b/ovsdbapp/backend/ovs_idl/vlog.py @@ -14,9 +14,18 @@ import collections import logging +import sys from ovs import vlog +try: + from eventlet import patcher + # If eventlet is installed and the 'thread' module is patched, we will + # skip setting up the python logger on Windows. + EVENTLET_NONBLOCKING_MODE_ENABLED = patcher.is_monkey_patched('thread') +except ImportError: + EVENTLET_NONBLOCKING_MODE_ENABLED = False + _LOG = logging.getLogger(__name__) # Map local log LEVELS to local LOG functions @@ -54,6 +63,16 @@ def use_python_logger(levels=ALL_LEVELS, max_level=None): :param: max_level: the maximum level to log :type: max_level: vlog level, CRITICAL, ERROR, WARN, INFO, or DEBUG """ + if sys.platform == 'win32' and EVENTLET_NONBLOCKING_MODE_ENABLED: + # NOTE(abalutoiu) When using oslo logging we need to keep in mind that + # it does not work well with native threads. We need to be careful when + # we call eventlet.tpool.execute, and make sure that it will not use + # the oslo logging, since it might cause unexpected hangs if + # greenthreads are used. On Windows we have to use + # eventlet.tpool.execute for a call to the ovs lib which will use + # vlog to log messages. We will skip replacing the OVS IDL logger + # functions on Windows to avoid unexpected hangs with oslo logging + return if max_level: levels = levels[:levels.index(max_level) + 1] diff --git a/ovsdbapp/backend/ovs_idl/windows/__init__.py b/ovsdbapp/backend/ovs_idl/windows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ovsdbapp/backend/ovs_idl/windows/connection_utils.py b/ovsdbapp/backend/ovs_idl/windows/connection_utils.py new file mode 100644 index 00000000..a57c7c94 --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/windows/connection_utils.py @@ -0,0 +1,58 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ovs import poller +from ovs import winutils + +from ovsdbapp.backend.ovs_idl.common import base_connection_utils +from ovsdbapp.backend.ovs_idl.windows import utils + + +class WaitQueue(base_connection_utils.WaitQueue): + def init_alert_notification(self): + # We will use an event to get signaled when there is something in + # the queue. The OVS poller can wait on events on Windows. + # NOTE(abalutoiu) The assumption made is that the queue has + # length 1, otherwise we will need to have a list of events with + # the size of the queue. + self.alert_event = winutils.get_new_event(bManualReset=True, + bInitialState=False) + + def alert_notification_consume(self): + # Set the event object to the nonsignaled state to indicate that + # the queue is empty. + winutils.win32event.ResetEvent(self.alert_event) + + def alert_notify(self): + # Set the event object to the signaled state to indicate that + # we have something in the queue. + winutils.win32event.SetEvent(self.alert_event) + + @property + def alert_fileno(self): + return self.alert_event + + +def monkey_patch_poller_support(): + # Ensure that WaitForMultipleObjects will not block other greenthreads. + # poller.block uses WaitForMultipleObjects on Windows + old_block = poller.Poller.block + + def new_block(self): + return utils.avoid_blocking_call(old_block, self) + + poller.Poller.block = new_block + + +monkey_patch_poller_support() diff --git a/ovsdbapp/backend/ovs_idl/windows/utils.py b/ovsdbapp/backend/ovs_idl/windows/utils.py new file mode 100644 index 00000000..7611c44e --- /dev/null +++ b/ovsdbapp/backend/ovs_idl/windows/utils.py @@ -0,0 +1,43 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +try: + import eventlet + from eventlet import tpool +except ImportError: + eventlet = None + + +def avoid_blocking_call(f, *args, **kwargs): + """Ensure that the method "f" will not block other greenthreads. + + Performs the call to the function "f" received as parameter in a + different thread using tpool.execute when called from a greenthread. + This will ensure that the function "f" will not block other greenthreads. + If not called from a greenthread, it will invoke the function "f" directly. + The function "f" will receive as parameters the arguments "args" and + keyword arguments "kwargs". If eventlet is not installed on the system + then this will call directly the function "f". + """ + if eventlet is None: + return f(*args, **kwargs) + + # Note that eventlet.getcurrent will always return a greenlet object. + # In case of a greenthread, the parent greenlet will always be the hub + # loop greenlet. + if eventlet.getcurrent().parent: + return tpool.execute(f, *args, **kwargs) + else: + return f(*args, **kwargs)