Add Windows support when using eventlet with monkey_patch

Currently when eventlet monkey patch is used and the os module
is monkey patched, os.fdopen can not be used on Windows.
This happens because Windows pipes don't support non-blocking I/O.

This patch addreses this issue by using an event for
the alert instead of a pipe. The OVS poller on Windows
supports waiting on events.

The code from connection.py was split in platform independent
and platform dependent code. There are three new folders added
which contain the code from connection.py that was splitted.

Change-Id: I088de72ff05742e11372dec86f71d94f667cda35
This commit is contained in:
Alin Balutoiu 2017-04-20 12:28:32 +03:00
parent 03ede4ec34
commit df0546f621
9 changed files with 201 additions and 10 deletions

View File

@ -0,0 +1,32 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class WaitQueue(object):
def __init__(self, max_queue_size):
self.max_queue_size = max_queue_size
self.init_alert_notification()
def init_alert_notification(self):
raise NotImplementedError()
def alert_notification_consume(self):
raise NotImplementedError()
def alert_notify(self):
raise NotImplementedError()
@property
def alert_fileno(self):
raise NotImplementedError()

View File

@ -18,37 +18,37 @@ import traceback
from ovs.db import idl
from ovs import poller
import six
from six.moves import queue as Queue
from ovsdbapp.backend.ovs_idl import idlutils
if os.name == 'nt':
from ovsdbapp.backend.ovs_idl.windows import connection_utils
else:
from ovsdbapp.backend.ovs_idl.linux import connection_utils
class TransactionQueue(Queue.Queue, object):
def __init__(self, *args, **kwargs):
super(TransactionQueue, self).__init__(*args, **kwargs)
alertpipe = os.pipe()
# NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O. Will get
# around this constraint by using binary mode.
self.alertin = os.fdopen(alertpipe[0], 'rb', 0)
self.alertout = os.fdopen(alertpipe[1], 'wb', 0)
self._wait_queue = connection_utils.WaitQueue(
max_queue_size=self.maxsize)
def get_nowait(self, *args, **kwargs):
try:
result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
except Queue.Empty:
return None
self.alertin.read(1)
self._wait_queue.alert_notification_consume()
return result
def put(self, *args, **kwargs):
super(TransactionQueue, self).put(*args, **kwargs)
self.alertout.write(six.b('X'))
self.alertout.flush()
self._wait_queue.alert_notify()
@property
def alert_fileno(self):
return self.alertin.fileno()
return self._wait_queue.alert_fileno
class Connection(object):

View File

@ -0,0 +1,39 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import six
from ovsdbapp.backend.ovs_idl.common import base_connection_utils
class WaitQueue(base_connection_utils.WaitQueue):
def init_alert_notification(self):
alertpipe = os.pipe()
# NOTE(ivasilevskaya) python 3 doesn't allow unbuffered I/O.
# Will get around this constraint by using binary mode.
self.alertin = os.fdopen(alertpipe[0], 'rb', 0)
self.alertout = os.fdopen(alertpipe[1], 'wb', 0)
def alert_notification_consume(self):
self.alertin.read(1)
def alert_notify(self):
self.alertout.write(six.b('X'))
self.alertout.flush()
@property
def alert_fileno(self):
return self.alertin.fileno()

View File

@ -14,9 +14,18 @@
import collections
import logging
import sys
from ovs import vlog
try:
from eventlet import patcher
# If eventlet is installed and the 'thread' module is patched, we will
# skip setting up the python logger on Windows.
EVENTLET_NONBLOCKING_MODE_ENABLED = patcher.is_monkey_patched('thread')
except ImportError:
EVENTLET_NONBLOCKING_MODE_ENABLED = False
_LOG = logging.getLogger(__name__)
# Map local log LEVELS to local LOG functions
@ -54,6 +63,16 @@ def use_python_logger(levels=ALL_LEVELS, max_level=None):
:param: max_level: the maximum level to log
:type: max_level: vlog level, CRITICAL, ERROR, WARN, INFO, or DEBUG
"""
if sys.platform == 'win32' and EVENTLET_NONBLOCKING_MODE_ENABLED:
# NOTE(abalutoiu) When using oslo logging we need to keep in mind that
# it does not work well with native threads. We need to be careful when
# we call eventlet.tpool.execute, and make sure that it will not use
# the oslo logging, since it might cause unexpected hangs if
# greenthreads are used. On Windows we have to use
# eventlet.tpool.execute for a call to the ovs lib which will use
# vlog to log messages. We will skip replacing the OVS IDL logger
# functions on Windows to avoid unexpected hangs with oslo logging
return
if max_level:
levels = levels[:levels.index(max_level) + 1]

View File

@ -0,0 +1,58 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ovs import poller
from ovs import winutils
from ovsdbapp.backend.ovs_idl.common import base_connection_utils
from ovsdbapp.backend.ovs_idl.windows import utils
class WaitQueue(base_connection_utils.WaitQueue):
def init_alert_notification(self):
# We will use an event to get signaled when there is something in
# the queue. The OVS poller can wait on events on Windows.
# NOTE(abalutoiu) The assumption made is that the queue has
# length 1, otherwise we will need to have a list of events with
# the size of the queue.
self.alert_event = winutils.get_new_event(bManualReset=True,
bInitialState=False)
def alert_notification_consume(self):
# Set the event object to the nonsignaled state to indicate that
# the queue is empty.
winutils.win32event.ResetEvent(self.alert_event)
def alert_notify(self):
# Set the event object to the signaled state to indicate that
# we have something in the queue.
winutils.win32event.SetEvent(self.alert_event)
@property
def alert_fileno(self):
return self.alert_event
def monkey_patch_poller_support():
# Ensure that WaitForMultipleObjects will not block other greenthreads.
# poller.block uses WaitForMultipleObjects on Windows
old_block = poller.Poller.block
def new_block(self):
return utils.avoid_blocking_call(old_block, self)
poller.Poller.block = new_block
monkey_patch_poller_support()

View File

@ -0,0 +1,43 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
import eventlet
from eventlet import tpool
except ImportError:
eventlet = None
def avoid_blocking_call(f, *args, **kwargs):
"""Ensure that the method "f" will not block other greenthreads.
Performs the call to the function "f" received as parameter in a
different thread using tpool.execute when called from a greenthread.
This will ensure that the function "f" will not block other greenthreads.
If not called from a greenthread, it will invoke the function "f" directly.
The function "f" will receive as parameters the arguments "args" and
keyword arguments "kwargs". If eventlet is not installed on the system
then this will call directly the function "f".
"""
if eventlet is None:
return f(*args, **kwargs)
# Note that eventlet.getcurrent will always return a greenlet object.
# In case of a greenthread, the parent greenlet will always be the hub
# loop greenlet.
if eventlet.getcurrent().parent:
return tpool.execute(f, *args, **kwargs)
else:
return f(*args, **kwargs)