Refactor futurist calls

After futurist calls were introduced, the code became difficult to
understand. For example, local variables are used something like global.
To keep the code easier to understand, the usage of local variables
should be more scoped.

This commit introduces a wrapper function for futurist.ThreadPoolExecutor
and converts inline functions into normal methods.
I believe it improves the code readability a lot.

Change-Id: Id5b7a06c50e397c8c27447322d7f64f2d65c06b6
This commit is contained in:
Akihiro Motoki 2017-11-23 13:35:46 +00:00 committed by Ivan Kolodyazhny
parent 73a0bbd43e
commit 56ae087995
5 changed files with 305 additions and 198 deletions

View File

@ -45,10 +45,10 @@ class InstanceViewTest(test.BaseAdminViewTests):
.MultipleTimes().AndReturn(True) .MultipleTimes().AndReturn(True)
api.keystone.tenant_list(IsA(http.HttpRequest)).\ api.keystone.tenant_list(IsA(http.HttpRequest)).\
AndReturn([tenants, False]) AndReturn([tenants, False])
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.glance.image_list_detailed(IsA(http.HttpRequest))\ api.glance.image_list_detailed(IsA(http.HttpRequest))\
.AndReturn(images) .AndReturn((images, False, False))
api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors) api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.server_list(IsA(http.HttpRequest), api.nova.server_list(IsA(http.HttpRequest),
search_opts=search_opts) \ search_opts=search_opts) \
.AndReturn([servers, False]) .AndReturn([servers, False])
@ -69,6 +69,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
servers = self.servers.list() servers = self.servers.list()
tenants = self.tenants.list() tenants = self.tenants.list()
flavors = self.flavors.list() flavors = self.flavors.list()
images = self.images.list()
full_flavors = OrderedDict([(f.id, f) for f in flavors]) full_flavors = OrderedDict([(f.id, f) for f in flavors])
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True} search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.server_list(IsA(http.HttpRequest), api.nova.server_list(IsA(http.HttpRequest),
@ -78,6 +79,8 @@ class InstanceViewTest(test.BaseAdminViewTests):
.MultipleTimes().AndReturn(True) .MultipleTimes().AndReturn(True)
api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \ api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True) .MultipleTimes().AndReturn(True)
api.glance.image_list_detailed(IsA(http.HttpRequest))\
.AndReturn((images, False, False))
api.nova.flavor_list(IsA(http.HttpRequest)). \ api.nova.flavor_list(IsA(http.HttpRequest)). \
AndRaise(self.exceptions.nova) AndRaise(self.exceptions.nova)
api.keystone.tenant_list(IsA(http.HttpRequest)).\ api.keystone.tenant_list(IsA(http.HttpRequest)).\
@ -143,13 +146,18 @@ class InstanceViewTest(test.BaseAdminViewTests):
}) })
def test_index_server_list_exception(self): def test_index_server_list_exception(self):
tenants = self.tenants.list() tenants = self.tenants.list()
images = self.images.list()
flavors = self.flavors.list()
api.keystone.tenant_list(IsA(http.HttpRequest)).\
AndReturn([tenants, False])
api.glance.image_list_detailed(IsA(http.HttpRequest))\
.AndReturn((images, False, False))
api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True} search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.server_list(IsA(http.HttpRequest), api.nova.server_list(IsA(http.HttpRequest),
search_opts=search_opts) \ search_opts=search_opts) \
.AndRaise(self.exceptions.nova) .AndRaise(self.exceptions.nova)
api.keystone.tenant_list(IsA(http.HttpRequest)).\
AndReturn([tenants, False])
self.mox.ReplayAll() self.mox.ReplayAll()
@ -207,7 +215,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
api.keystone.tenant_list(IsA(http.HttpRequest)).\ api.keystone.tenant_list(IsA(http.HttpRequest)).\
AndReturn([self.tenants.list(), False]) AndReturn([self.tenants.list(), False])
api.glance.image_list_detailed(IsA(http.HttpRequest)) \ api.glance.image_list_detailed(IsA(http.HttpRequest)) \
.AndReturn(images) .AndReturn((images, False, False))
api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors) api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True} search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.server_list(IsA(http.HttpRequest), api.nova.server_list(IsA(http.HttpRequest),
@ -240,13 +248,13 @@ class InstanceViewTest(test.BaseAdminViewTests):
api.keystone.tenant_list(IsA(http.HttpRequest)) \ api.keystone.tenant_list(IsA(http.HttpRequest)) \
.AndReturn([self.tenants.list(), False]) .AndReturn([self.tenants.list(), False])
api.glance.image_list_detailed(IsA(http.HttpRequest)) \ api.glance.image_list_detailed(IsA(http.HttpRequest)) \
.AndReturn(images) .AndReturn((images, False, False))
api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors) api.nova.flavor_list(IsA(http.HttpRequest)).AndReturn(flavors)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.extension_supported('AdminActions', IsA(http.HttpRequest)) \ api.nova.extension_supported('AdminActions', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True) .MultipleTimes().AndReturn(True)
api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \ api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True) .MultipleTimes().AndReturn(True)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
api.nova.server_list(IsA(http.HttpRequest), api.nova.server_list(IsA(http.HttpRequest),
search_opts=search_opts) \ search_opts=search_opts) \
.AndReturn([servers, False]) .AndReturn([servers, False])

View File

@ -17,8 +17,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import futurist
from django.conf import settings from django.conf import settings
from django.urls import reverse from django.urls import reverse
from django.urls import reverse_lazy from django.urls import reverse_lazy
@ -39,6 +37,7 @@ from openstack_dashboard.dashboards.admin.instances import tabs
from openstack_dashboard.dashboards.project.instances import views from openstack_dashboard.dashboards.project.instances import views
from openstack_dashboard.dashboards.project.instances.workflows \ from openstack_dashboard.dashboards.project.instances.workflows \
import update_instance import update_instance
from openstack_dashboard.utils import futurist_utils
# re-use console from project.instances.views to make reflection work # re-use console from project.instances.views to make reflection work
@ -81,14 +80,50 @@ class AdminIndexView(tables.DataTableView):
def needs_filter_first(self, table): def needs_filter_first(self, table):
return self._needs_filter_first return self._needs_filter_first
def _get_tenants(self):
# Gather our tenants to correlate against IDs
try:
tenants, __ = api.keystone.tenant_list(self.request)
return dict([(t.id, t) for t in tenants])
except Exception:
msg = _('Unable to retrieve instance project information.')
exceptions.handle(self.request, msg)
return {}
def _get_images(self):
# Gather our images to correlate againts IDs
try:
images, __, __ = api.glance.image_list_detailed(self.request)
return dict([(image.id, image) for image in images])
except Exception:
msg = _("Unable to retrieve image list.")
exceptions.handle(self.request, msg)
return {}
def _get_flavors(self):
# Gather our flavors to correlate against IDs
try:
flavors = api.nova.flavor_list(self.request)
return dict([(str(flavor.id), flavor) for flavor in flavors])
except Exception:
msg = _("Unable to retrieve flavor list.")
exceptions.handle(self.request, msg)
return {}
def _get_instances(self, search_opts):
try:
instances, self._more = api.nova.server_list(
self.request,
search_opts=search_opts)
except Exception:
self._more = False
instances = []
exceptions.handle(self.request,
_('Unable to retrieve instance list.'))
return instances
def get_data(self): def get_data(self):
instances = [] instances = []
tenants = []
tenant_dict = {}
images = []
image_map = {}
flavors = []
full_flavors = {}
marker = self.request.GET.get( marker = self.request.GET.get(
project_tables.AdminInstancesTable._meta.pagination_param, None) project_tables.AdminInstancesTable._meta.pagination_param, None)
@ -102,80 +137,35 @@ class AdminIndexView(tables.DataTableView):
# selected, then search criteria must be provided and return an empty # selected, then search criteria must be provided and return an empty
# list # list
filter_first = getattr(settings, 'FILTER_DATA_FIRST', {}) filter_first = getattr(settings, 'FILTER_DATA_FIRST', {})
if filter_first.get('admin.instances', False) and \ if (filter_first.get('admin.instances', False) and
len(search_opts) == len(default_search_opts): len(search_opts) == len(default_search_opts)):
self._needs_filter_first = True self._needs_filter_first = True
self._more = False self._more = False
return instances return []
self._needs_filter_first = False self._needs_filter_first = False
def _task_get_tenants(): results = futurist_utils.call_functions_parallel(
# Gather our tenants to correlate against IDs self._get_images, self._get_flavors, self._get_tenants)
try: image_dict, flavor_dict, tenant_dict = results
tmp_tenants, __ = api.keystone.tenant_list(self.request)
tenants.extend(tmp_tenants)
tenant_dict.update([(t.id, t) for t in tenants])
except Exception:
msg = _('Unable to retrieve instance project information.')
exceptions.handle(self.request, msg)
def _task_get_images():
# Gather our images to correlate againts IDs
try:
tmp_images = api.glance.image_list_detailed(self.request)[0]
images.extend(tmp_images)
image_map.update([(image.id, image) for image in images])
except Exception:
msg = _("Unable to retrieve image list.")
exceptions.handle(self.request, msg)
def _task_get_flavors():
# Gather our flavors to correlate against IDs
try:
tmp_flavors = api.nova.flavor_list(self.request)
flavors.extend(tmp_flavors)
full_flavors.update([(str(flavor.id), flavor)
for flavor in flavors])
except Exception:
msg = _("Unable to retrieve flavor list.")
exceptions.handle(self.request, msg)
def _task_get_instances():
try:
tmp_instances, self._more = api.nova.server_list(
self.request,
search_opts=search_opts)
instances.extend(tmp_instances)
except Exception:
self._more = False
exceptions.handle(self.request,
_('Unable to retrieve instance list.'))
# In case of exception when calling nova.server_list
# don't call api.network
return
with futurist.ThreadPoolExecutor(max_workers=3) as e:
e.submit(fn=_task_get_tenants)
e.submit(fn=_task_get_images)
e.submit(fn=_task_get_flavors)
non_api_filter_info = ( non_api_filter_info = (
('project', 'tenant_id', tenants), ('project', 'tenant_id', tenant_dict.values()),
('image_name', 'image', images), ('image_name', 'image', image_dict.values()),
('flavor_name', 'flavor', flavors), ('flavor_name', 'flavor', flavor_dict.values()),
) )
if not views.process_non_api_filters(search_opts, non_api_filter_info): if not views.process_non_api_filters(search_opts, non_api_filter_info):
self._more = False self._more = False
return [] return []
_task_get_instances() instances = self._get_instances(search_opts)
# Loop through instances to get image, flavor and tenant info. # Loop through instances to get image, flavor and tenant info.
for inst in instances: for inst in instances:
if hasattr(inst, 'image') and isinstance(inst.image, dict): if hasattr(inst, 'image') and isinstance(inst.image, dict):
if inst.image.get('id') in image_map: image_id = inst.image.get('id')
inst.image = image_map[inst.image.get('id')] if image_id in image_dict:
inst.image = image_dict[image_id]
# In case image not found in image_map, set name to empty # In case image not found in image_map, set name to empty
# to avoid fallback API call to Glance in api/nova.py # to avoid fallback API call to Glance in api/nova.py
# until the call is deprecated in api itself # until the call is deprecated in api itself
@ -184,10 +174,10 @@ class AdminIndexView(tables.DataTableView):
flavor_id = inst.flavor["id"] flavor_id = inst.flavor["id"]
try: try:
if flavor_id in full_flavors: if flavor_id in flavor_dict:
inst.full_flavor = full_flavors[flavor_id] inst.full_flavor = flavor_dict[flavor_id]
else: else:
# If the flavor_id is not in full_flavors list, # If the flavor_id is not in flavor_dict list,
# gets it via nova api. # gets it via nova api.
inst.full_flavor = api.nova.flavor_get( inst.full_flavor = api.nova.flavor_get(
self.request, flavor_id) self.request, flavor_id)

View File

@ -22,8 +22,6 @@ Views for managing instances.
from collections import OrderedDict from collections import OrderedDict
import logging import logging
import futurist
from django.conf import settings from django.conf import settings
from django import http from django import http
from django import shortcuts from django import shortcuts
@ -53,6 +51,7 @@ from openstack_dashboard.dashboards.project.instances \
import tabs as project_tabs import tabs as project_tabs
from openstack_dashboard.dashboards.project.instances \ from openstack_dashboard.dashboards.project.instances \
import workflows as project_workflows import workflows as project_workflows
from openstack_dashboard.utils import futurist_utils
from openstack_dashboard.views import get_url_with_pagination from openstack_dashboard.views import get_url_with_pagination
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -65,106 +64,101 @@ class IndexView(tables.DataTableView):
def has_more_data(self, table): def has_more_data(self, table):
return self._more return self._more
def _get_flavors(self):
# Gather our flavors to correlate our instances to them
try:
flavors = api.nova.flavor_list(self.request)
return dict([(str(flavor.id), flavor) for flavor in flavors])
except Exception:
exceptions.handle(self.request, ignore=True)
return {}
def _get_images(self):
# Gather our images to correlate our instances to them
try:
# TODO(gabriel): Handle pagination.
images = api.glance.image_list_detailed(self.request)[0]
return dict([(str(image.id), image) for image in images])
except Exception:
exceptions.handle(self.request, ignore=True)
return {}
def _get_instances(self, search_opts):
try:
instances, self._more = api.nova.server_list(
self.request,
search_opts=search_opts)
except Exception:
self._more = False
instances = []
exceptions.handle(self.request,
_('Unable to retrieve instances.'))
# In case of exception when calling nova.server_list
# don't call api.network
if not instances:
return []
# TODO(future): Explore more efficient logic to sync IP address
# and drop the setting OPENSTACK_INSTANCE_RETRIEVE_IP_ADDRESSES.
# The situation servers_update_addresses() is needed is only
# when IP address of a server is updated via neutron API and
# nova network info cache is not synced. Precisely there is no
# need to check IP addresses of all servers. It is sufficient to
# fetch IP address information for servers recently updated.
if not getattr(settings,
'OPENSTACK_INSTANCE_RETRIEVE_IP_ADDRESSES', True):
return instances
try:
api.network.servers_update_addresses(self.request, instances)
except Exception:
exceptions.handle(
self.request,
message=_('Unable to retrieve IP addresses from Neutron.'),
ignore=True)
return instances
def get_data(self): def get_data(self):
marker = self.request.GET.get( marker = self.request.GET.get(
project_tables.InstancesTable._meta.pagination_param, None) project_tables.InstancesTable._meta.pagination_param, None)
search_opts = self.get_filters({'marker': marker, 'paginate': True}) search_opts = self.get_filters({'marker': marker, 'paginate': True})
instances = [] image_dict, flavor_dict = futurist_utils.call_functions_parallel(
flavors = [] self._get_images, self._get_flavors)
full_flavors = {}
images = []
image_map = {}
def _task_get_instances():
# Gather our instances
try:
tmp_instances, self._more = api.nova.server_list(
self.request,
search_opts=search_opts)
instances.extend(tmp_instances)
except Exception:
self._more = False
exceptions.handle(self.request,
_('Unable to retrieve instances.'))
# In case of exception when calling nova.server_list
# don't call api.network
return
# TODO(future): Explore more efficient logic to sync IP address
# and drop the setting OPENSTACK_INSTANCE_RETRIEVE_IP_ADDRESSES.
# The situation servers_update_addresses() is needed # is only
# when IP address of a server is updated via neutron API and
# nova network info cache is not synced. Precisely there is no
# need to check IP addresses of all serves. It is sufficient to
# fetch IP address information for servers recently updated.
if not getattr(settings,
'OPENSTACK_INSTANCE_RETRIEVE_IP_ADDRESSES', True):
return
try:
api.network.servers_update_addresses(self.request, instances)
except Exception:
exceptions.handle(
self.request,
message=_('Unable to retrieve IP addresses from Neutron.'),
ignore=True)
def _task_get_flavors():
# Gather our flavors to correlate our instances to them
try:
tmp_flavors = api.nova.flavor_list(self.request)
flavors.extend(tmp_flavors)
full_flavors.update([(str(flavor.id), flavor)
for flavor in flavors])
except Exception:
exceptions.handle(self.request, ignore=True)
def _task_get_images():
# Gather our images to correlate our instances to them
try:
# TODO(gabriel): Handle pagination.
tmp_images = api.glance.image_list_detailed(self.request)[0]
images.extend(tmp_images)
image_map.update([(str(image.id), image) for image in images])
except Exception:
exceptions.handle(self.request, ignore=True)
with futurist.ThreadPoolExecutor(max_workers=3) as e:
e.submit(fn=_task_get_flavors)
e.submit(fn=_task_get_images)
non_api_filter_info = ( non_api_filter_info = (
('image_name', 'image', images), ('image_name', 'image', image_dict.values()),
('flavor_name', 'flavor', flavors), ('flavor_name', 'flavor', flavor_dict.values()),
) )
if not process_non_api_filters(search_opts, non_api_filter_info): if not process_non_api_filters(search_opts, non_api_filter_info):
self._more = False self._more = False
return [] return []
_task_get_instances() instances = self._get_instances(search_opts)
# Loop through instances to get flavor info. # Loop through instances to get flavor info.
for instance in instances: for instance in instances:
if hasattr(instance, 'image'): if hasattr(instance, 'image'):
# Instance from image returns dict # Instance from image returns dict
if isinstance(instance.image, dict): if isinstance(instance.image, dict):
if instance.image.get('id') in image_map: image_id = instance.image.get('id')
instance.image = image_map[instance.image.get('id')] if image_id in image_dict:
# In case image not found in image_map, set name to empty instance.image = image_dict[image_id]
# In case image not found in image_dict, set name to empty
# to avoid fallback API call to Glance in api/nova.py # to avoid fallback API call to Glance in api/nova.py
# until the call is deprecated in api itself # until the call is deprecated in api itself
else: else:
instance.image['name'] = _("-") instance.image['name'] = _("-")
flavor_id = instance.flavor["id"] flavor_id = instance.flavor["id"]
if flavor_id in full_flavors: if flavor_id in flavor_dict:
instance.full_flavor = full_flavors[flavor_id] instance.full_flavor = flavor_dict[flavor_id]
else: else:
# If the flavor_id is not in full_flavors list, # If the flavor_id is not in flavor_dict,
# put info in the log file. # put info in the log file.
msg = ('Unable to retrieve flavor "%s" for instance "%s".' LOG.info('Unable to retrieve flavor "%s" for instance "%s".',
% (flavor_id, instance.id)) flavor_id, instance.id)
LOG.info(msg)
return instances return instances
@ -408,6 +402,51 @@ class DetailView(tabs.TabView):
table = project_tables.InstancesTable(self.request) table = project_tables.InstancesTable(self.request)
return table.render_row_actions(instance) return table.render_row_actions(instance)
def _get_volumes(self, instance):
instance_id = instance.id
try:
instance.volumes = api.nova.instance_volumes_list(self.request,
instance_id)
# Sort by device name
instance.volumes.sort(key=lambda vol: vol.device)
except Exception:
msg = _('Unable to retrieve volume list for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _get_flavor(self, instance):
instance_id = instance.id
try:
instance.full_flavor = api.nova.flavor_get(
self.request, instance.flavor["id"])
except Exception:
msg = _('Unable to retrieve flavor information for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _get_security_groups(self, instance):
instance_id = instance.id
try:
instance.security_groups = api.neutron.server_security_groups(
self.request, instance_id)
except Exception:
msg = _('Unable to retrieve security groups for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _update_addresses(self, instance):
instance_id = instance.id
try:
api.network.servers_update_addresses(self.request, [instance])
except Exception:
msg = _('Unable to retrieve IP addresses from Neutron for '
'instance "%(name)s" (%(id)s).') \
% {'name': instance.name, 'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
@memoized.memoized_method @memoized.memoized_method
def get_data(self): def get_data(self):
instance_id = self.kwargs['instance_id'] instance_id = self.kwargs['instance_id']
@ -428,52 +467,12 @@ class DetailView(tabs.TabView):
instance.status_label = ( instance.status_label = (
filters.get_display_label(choices, instance.status)) filters.get_display_label(choices, instance.status))
def _task_get_volumes(): futurist_utils.call_functions_parallel(
try: (self._get_volumes, [instance]),
instance.volumes = api.nova.instance_volumes_list(self.request, (self._get_flavor, [instance]),
instance_id) (self._get_security_groups, [instance]),
# Sort by device name (self._update_addresses, [instance]),
instance.volumes.sort(key=lambda vol: vol.device) )
except Exception:
msg = _('Unable to retrieve volume list for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _task_get_flavor():
try:
instance.full_flavor = api.nova.flavor_get(
self.request, instance.flavor["id"])
except Exception:
msg = _('Unable to retrieve flavor information for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _task_get_security_groups():
try:
instance.security_groups = api.neutron.server_security_groups(
self.request, instance_id)
except Exception:
msg = _('Unable to retrieve security groups for instance '
'"%(name)s" (%(id)s).') % {'name': instance.name,
'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
def _task_update_addresses():
try:
api.network.servers_update_addresses(self.request, [instance])
except Exception:
msg = _('Unable to retrieve IP addresses from Neutron for '
'instance "%(name)s" (%(id)s).') \
% {'name': instance.name, 'id': instance_id}
exceptions.handle(self.request, msg, ignore=True)
with futurist.ThreadPoolExecutor(max_workers=4) as e:
e.submit(fn=_task_get_volumes)
e.submit(fn=_task_get_flavor)
e.submit(fn=_task_get_security_groups)
e.submit(fn=_task_update_addresses)
return instance return instance

View File

@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack_dashboard.utils import futurist_utils
class FuturistUtilsTests(unittest.TestCase):
def test_call_functions_parallel(self):
def func1():
return 10
def func2():
return 20
ret = futurist_utils.call_functions_parallel(func1, func2)
self.assertEqual(ret, (10, 20))
def test_call_functions_parallel_with_args(self):
def func1(a):
return a
def func2(a, b):
return a + b
def func3():
return 3
ret = futurist_utils.call_functions_parallel(
(func1, [5]),
(func2, [10, 20]),
func3)
self.assertEqual(ret, (5, 30, 3))
def test_call_functions_parallel_with_kwargs(self):
def func1(a):
return a
def func2(a, b):
return a + b
def func3():
return 3
ret = futurist_utils.call_functions_parallel(
(func1, [], {'a': 5}),
(func2, [], {'a': 10, 'b': 20}),
func3)
self.assertEqual(ret, (5, 30, 3))

View File

@ -0,0 +1,50 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import futurist
def call_functions_parallel(*worker_defs):
"""Call specified functions in parallel.
:param *worker_defs: Each positional argument can be either of
a function to be called or a tuple which consists of a function,
a list of positional arguments) and keyword arguments (optional).
If you need to pass arguments, you need to pass a tuple.
Example usages are like:
call_functions_parallel(func1, func2, func3)
call_functions_parallel(func1, (func2, [1, 2]))
call_functions_parallel((func1, [], {'a': 1}),
(func2, [], {'a': 2, 'b': 10}))
:returns: a tuple of values returned from individual functions.
None is returned if a corresponding function does not return.
It is better to return values other than None from individual
functions.
"""
# TODO(amotoki): Needs to figure out what max_workers can be specified.
# According to e0ne, the apache default configuration in devstack allows
# only 10 threads. What happens if max_worker=11 is specified?
max_workers = len(worker_defs)
# Prepare a list with enough length.
futures = [None] * len(worker_defs)
with futurist.ThreadPoolExecutor(max_workers=max_workers) as e:
for index, func_def in enumerate(worker_defs):
if callable(func_def):
func_def = [func_def]
args = func_def[1] if len(func_def) > 1 else []
kwargs = func_def[2] if len(func_def) > 2 else {}
func = functools.partial(func_def[0], *args, **kwargs)
futures[index] = e.submit(fn=func)
return tuple(f.result() for f in futures)