rename various files of neutron/*.py to tacker/*.py

Change-Id: Iaad5851d1a6dc68bfa68dcf6ee6da2b226fad469
This commit is contained in:
Isaku Yamahata 2014-06-26 17:11:37 +09:00
parent 70be577f35
commit 54d23acf3e
12 changed files with 228 additions and 351 deletions

View File

@ -1,225 +0,0 @@
# Copyright 2011 VMware, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import weakref
from oslo.config import cfg
from neutron.common import rpc_compat
from neutron.common import utils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants
from stevedore import driver
LOG = logging.getLogger(__name__)
class Manager(rpc_compat.RpcCallback, periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, host=None):
if not host:
host = cfg.CONF.host
self.host = host
super(Manager, self).__init__()
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Handle initialization if this is a standalone service.
Child classes should override this method.
"""
pass
def after_start(self):
"""Handler post initialization stuff.
Child classes can override this method.
"""
pass
def validate_post_plugin_load():
"""Checks if the configuration variables are valid.
If the configuration is invalid then the method will return an error
message. If all is OK then it will return None.
"""
if ('dhcp_agents_per_network' in cfg.CONF and
cfg.CONF.dhcp_agents_per_network <= 0):
msg = _("dhcp_agents_per_network must be >= 1. '%s' "
"is invalid.") % cfg.CONF.dhcp_agents_per_network
return msg
def validate_pre_plugin_load():
"""Checks if the configuration variables are valid.
If the configuration is invalid then the method will return an error
message. If all is OK then it will return None.
"""
if cfg.CONF.core_plugin is None:
msg = _('Neutron core_plugin not configured!')
return msg
class NeutronManager(object):
"""Neutron's Manager class.
Neutron's Manager class is responsible for parsing a config file and
instantiating the correct plugin that concretely implements
neutron_plugin_base class.
The caller should make sure that NeutronManager is a singleton.
"""
_instance = None
def __init__(self, options=None, config_file=None):
# If no options have been provided, create an empty dict
if not options:
options = {}
msg = validate_pre_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
# NOTE(jkoelker) Testing for the subclass with the __subclasshook__
# breaks tach monitoring. It has been removed
# intentionally to allow v2 plugins to be monitored
# for performance metrics.
plugin_provider = cfg.CONF.core_plugin
LOG.info(_("Loading core plugin: %s"), plugin_provider)
self.plugin = self._get_plugin_instance('neutron.core_plugins',
plugin_provider)
msg = validate_post_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
# core plugin as a part of plugin collection simplifies
# checking extensions
# TODO(enikanorov): make core plugin the same as
# the rest of service plugins
self.service_plugins = {constants.CORE: self.plugin}
self._load_service_plugins()
def _get_plugin_instance(self, namespace, plugin_provider):
try:
# Try to resolve plugin by name
mgr = driver.DriverManager(namespace, plugin_provider)
plugin_class = mgr.driver
except RuntimeError as e1:
# fallback to class name
try:
plugin_class = importutils.import_class(plugin_provider)
except ImportError as e2:
LOG.exception(_("Error loading plugin by name, %s"), e1)
LOG.exception(_("Error loading plugin by class, %s"), e2)
raise ImportError(_("Plugin not found."))
return plugin_class()
def _load_services_from_core_plugin(self):
"""Puts core plugin in service_plugins for supported services."""
LOG.debug(_("Loading services supported by the core plugin"))
# supported service types are derived from supported extensions
for ext_alias in getattr(self.plugin,
"supported_extension_aliases", []):
if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
self.service_plugins[service_type] = self.plugin
LOG.info(_("Service %s is supported by the core plugin"),
service_type)
def _load_service_plugins(self):
"""Loads service plugins.
Starts from the core plugin and checks if it supports
advanced services then loads classes provided in configuration.
"""
# load services from the core plugin first
self._load_services_from_core_plugin()
plugin_providers = cfg.CONF.service_plugins
LOG.debug(_("Loading service plugins: %s"), plugin_providers)
for provider in plugin_providers:
if provider == '':
continue
LOG.info(_("Loading Plugin: %s"), provider)
plugin_inst = self._get_plugin_instance('neutron.service_plugins',
provider)
# only one implementation of svc_type allowed
# specifying more than one plugin
# for the same type is a fatal exception
if plugin_inst.get_plugin_type() in self.service_plugins:
raise ValueError(_("Multiple plugins for service "
"%s were configured"),
plugin_inst.get_plugin_type())
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
# search for possible agent notifiers declared in service plugin
# (needed by agent management extension)
if (hasattr(self.plugin, 'agent_notifiers') and
hasattr(plugin_inst, 'agent_notifiers')):
self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
LOG.debug(_("Successfully loaded %(type)s plugin. "
"Description: %(desc)s"),
{"type": plugin_inst.get_plugin_type(),
"desc": plugin_inst.get_plugin_description()})
@classmethod
@utils.synchronized("manager")
def _create_instance(cls):
if not cls.has_instance():
cls._instance = cls()
@classmethod
def has_instance(cls):
return cls._instance is not None
@classmethod
def clear_instance(cls):
cls._instance = None
@classmethod
def get_instance(cls):
# double checked locking
if not cls.has_instance():
cls._create_instance()
return cls._instance
@classmethod
def get_plugin(cls):
# Return a weakref to minimize gc-preventing references.
return weakref.proxy(cls.get_instance().plugin)
@classmethod
def get_service_plugins(cls):
# Return weakrefs to minimize gc-preventing references.
return dict((x, weakref.proxy(y))
for x, y in cls.get_instance().service_plugins.iteritems())

View File

@ -18,4 +18,4 @@
import gettext import gettext
gettext.install('neutron', unicode=1) gettext.install('tacker', unicode=1)

View File

@ -18,15 +18,15 @@ from oslo.config import cfg
import webob.dec import webob.dec
import webob.exc import webob.exc
from neutron import context from tacker import context
from neutron.openstack.common import log as logging from tacker.openstack.common import log as logging
from neutron.openstack.common.middleware import request_id from tacker.openstack.common.middleware import request_id
from neutron import wsgi from tacker import wsgi
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class NeutronKeystoneContext(wsgi.Middleware): class TackerKeystoneContext(wsgi.Middleware):
"""Make a request context from keystone headers.""" """Make a request context from keystone headers."""
@webob.dec.wsgify @webob.dec.wsgify
@ -56,7 +56,7 @@ class NeutronKeystoneContext(wsgi.Middleware):
request_id=req_id) request_id=req_id)
# Inject the context... # Inject the context...
req.environ['neutron.context'] = ctx req.environ['tacker.context'] = ctx
return self.application return self.application

View File

@ -21,11 +21,11 @@ import copy
import datetime import datetime
from neutron.db import api as db_api from tacker.db import api as db_api
from neutron.openstack.common import context as common_context from tacker.openstack.common import context as common_context
from neutron.openstack.common import local from tacker.openstack.common import local
from neutron.openstack.common import log as logging from tacker.openstack.common import log as logging
from neutron import policy from tacker import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -19,7 +19,7 @@ import pep8
""" """
Guidelines for writing new hacking checks Guidelines for writing new hacking checks
- Use only for Neutron specific tests. OpenStack general tests - Use only for Tacker specific tests. OpenStack general tests
should be submitted to the common 'hacking' module. should be submitted to the common 'hacking' module.
- Pick numbers in the range N3xx. Find the current test with - Pick numbers in the range N3xx. Find the current test with
the highest allocated number and then pick the next value. the highest allocated number and then pick the next value.
@ -27,7 +27,7 @@ Guidelines for writing new hacking checks
on the N3xx value. on the N3xx value.
- List the new rule in the top level HACKING.rst file - List the new rule in the top level HACKING.rst file
- Add test cases for each new rule to - Add test cases for each new rule to
neutron/tests/unit/test_hacking.py tacker/tests/unit/test_hacking.py
""" """
@ -37,7 +37,7 @@ log_translation = re.compile(
def validate_log_translations(logical_line, physical_line, filename): def validate_log_translations(logical_line, physical_line, filename):
# Translations are not required in the test directory # Translations are not required in the test directory
if "neutron/tests" in filename: if "tacker/tests" in filename:
return return
if pep8.noqa(physical_line): if pep8.noqa(physical_line):
return return

165
tacker/manager.py Normal file
View File

@ -0,0 +1,165 @@
# Copyright 2011 VMware, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from tacker.common import rpc_compat
from tacker.common import utils
from tacker.openstack.common import importutils
from tacker.openstack.common import log as logging
from tacker.openstack.common import periodic_task
from tacker.plugins.common import constants
LOG = logging.getLogger(__name__)
class Manager(rpc_compat.RpcCallback, periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, host=None):
if not host:
host = cfg.CONF.host
self.host = host
super(Manager, self).__init__()
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""Handle initialization if this is a standalone service.
Child classes should override this method.
"""
pass
def after_start(self):
"""Handler post initialization stuff.
Child classes can override this method.
"""
pass
def validate_post_plugin_load():
"""Checks if the configuration variables are valid.
If the configuration is invalid then the method will return an error
message. If all is OK then it will return None.
"""
pass
def validate_pre_plugin_load():
"""Checks if the configuration variables are valid.
If the configuration is invalid then the method will return an error
message. If all is OK then it will return None.
"""
pass
class TackerManager(object):
"""Tacker's Manager class.
Tacker's Manager class is responsible for parsing a config file and
instantiating the correct plugin that concretely implement
tacker_plugin_base class.
The caller should make sure that TackerManager is a singleton.
"""
_instance = None
def __init__(self, options=None, config_file=None):
# If no options have been provided, create an empty dict
if not options:
options = {}
msg = validate_pre_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
msg = validate_post_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
self.service_plugins = {}
self._load_service_plugins()
def _load_service_plugins(self):
"""Loads service plugins.
Starts from the core plugin and checks if it supports
advanced services then loads classes provided in configuration.
"""
# plugin_providers = cfg.CONF.service_plugins
plugin_providers = ['tacker.vm.plugin.ServiceVMPlugin']
LOG.debug(_("Loading service plugins: %s"), plugin_providers)
for provider in plugin_providers:
if provider == '':
continue
try:
LOG.info(_("Loading Plugin: %s"), provider)
plugin_class = importutils.import_class(provider)
except ImportError:
LOG.exception(_("Error loading plugin"))
raise ImportError(_("Plugin not found."))
plugin_inst = plugin_class()
# only one implementation of svc_type allowed
# specifying more than one plugin
# for the same type is a fatal exception
if plugin_inst.get_plugin_type() in self.service_plugins:
raise ValueError(_("Multiple plugins for service "
"%s were configured"),
plugin_inst.get_plugin_type())
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
# # search for possible agent notifiers declared in service plugin
# # (needed by agent management extension)
# if (hasattr(self.plugin, 'agent_notifiers') and
# hasattr(plugin_inst, 'agent_notifiers')):
# self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
LOG.debug(_("Successfully loaded %(type)s plugin. "
"Description: %(desc)s"),
{"type": plugin_inst.get_plugin_type(),
"desc": plugin_inst.get_plugin_description()})
@classmethod
@utils.synchronized("manager")
def _create_instance(cls):
if cls._instance is None:
cls._instance = cls()
@classmethod
def get_instance(cls):
# double checked locking
if cls._instance is None:
cls._create_instance()
return cls._instance
@classmethod
def get_plugin(cls):
return cls.get_instance().plugin
@classmethod
def get_service_plugins(cls):
return cls.get_instance().service_plugins

View File

@ -16,20 +16,20 @@
# under the License. # under the License.
""" """
Policy engine for neutron. Largely copied from nova. Policy engine for tacker. Largely copied from nova.
""" """
import itertools import itertools
import re import re
from oslo.config import cfg from oslo.config import cfg
from neutron.api.v2 import attributes from tacker.api.v1 import attributes
from neutron.common import exceptions from tacker.common import exceptions
import neutron.common.utils as utils import tacker.common.utils as utils
from neutron.openstack.common import excutils from tacker.openstack.common import excutils
from neutron.openstack.common import importutils from tacker.openstack.common import importutils
from neutron.openstack.common import log as logging from tacker.openstack.common import log as logging
from neutron.openstack.common import policy from tacker.openstack.common import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -53,7 +53,7 @@ DEPRECATED_ACTION_MAP = {
'set': ['create', 'update'] 'set': ['create', 'update']
} }
cfg.CONF.import_opt('policy_file', 'neutron.common.config') cfg.CONF.import_opt('policy_file', 'tacker.common.config')
def reset(): def reset():
@ -264,12 +264,12 @@ class OwnerCheck(policy.Check):
# check more general # check more general
# FIXME(ihrachys): if import is put in global, circular # FIXME(ihrachys): if import is put in global, circular
# import failure occurs # import failure occurs
from neutron import manager from tacker import manager
f = getattr(manager.NeutronManager.get_instance().plugin, f = getattr(manager.TackerManager.get_instance().plugin,
'get_%s' % parent_res) 'get_%s' % parent_res)
# f *must* exist, if not found it is better to let neutron # f *must* exist, if not found it is better to let tacker
# explode. Check will be performed with admin context # explode. Check will be performed with admin context
context = importutils.import_module('neutron.context') context = importutils.import_module('tacker.context')
try: try:
data = f(context.get_admin_context(), data = f(context.get_admin_context(),
target[parent_foreign_key], target[parent_foreign_key],
@ -329,7 +329,7 @@ def _prepare_check(context, action, target):
def check(context, action, target, plugin=None, might_not_exist=False): def check(context, action, target, plugin=None, might_not_exist=False):
"""Verifies that the action is valid on the target in this context. """Verifies that the action is valid on the target in this context.
:param context: neutron context :param context: tacker context
:param action: string representing the action to be checked :param action: string representing the action to be checked
this should be colon separated for clarity. this should be colon separated for clarity.
:param target: dictionary representing the object of the action :param target: dictionary representing the object of the action
@ -351,7 +351,7 @@ def check(context, action, target, plugin=None, might_not_exist=False):
def enforce(context, action, target, plugin=None): def enforce(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context. """Verifies that the action is valid on the target in this context.
:param context: neutron context :param context: tacker context
:param action: string representing the action to be checked :param action: string representing the action to be checked
this should be colon separated for clarity. this should be colon separated for clarity.
:param target: dictionary representing the object of the action :param target: dictionary representing the object of the action
@ -360,7 +360,7 @@ def enforce(context, action, target, plugin=None):
:param plugin: currently unused and deprecated. :param plugin: currently unused and deprecated.
Kept for backward compatibility. Kept for backward compatibility.
:raises neutron.exceptions.PolicyNotAuthorized: if verification fails. :raises tacker.exceptions.PolicyNotAuthorized: if verification fails.
""" """
rule, target, credentials = _prepare_check(context, action, target) rule, target, credentials = _prepare_check(context, action, target)

View File

@ -19,19 +19,15 @@ import os
import random import random
from oslo.config import cfg from oslo.config import cfg
from oslo.messaging import server as rpc_server
from neutron.common import config from tacker.common import config
from neutron.common import rpc_compat from tacker.common import rpc_compat
from neutron import context from tacker import context
from neutron.db import api as session from tacker.openstack.common import excutils
from neutron import manager from tacker.openstack.common import importutils
from neutron.openstack.common import excutils from tacker.openstack.common import log as logging
from neutron.openstack.common import importutils from tacker.openstack.common import loopingcall
from neutron.openstack.common import log as logging from tacker import wsgi
from neutron.openstack.common import loopingcall
from neutron.openstack.common import service as common_service
from neutron import wsgi
service_opts = [ service_opts = [
@ -41,9 +37,6 @@ service_opts = [
cfg.IntOpt('api_workers', cfg.IntOpt('api_workers',
default=0, default=0,
help=_('Number of separate worker processes for service')), help=_('Number of separate worker processes for service')),
cfg.IntOpt('rpc_workers',
default=0,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay', cfg.IntOpt('periodic_fuzzy_delay',
default=5, default=5,
help=_('Range of seconds to randomly delay when starting the ' help=_('Range of seconds to randomly delay when starting the '
@ -76,11 +69,11 @@ class WsgiService(object):
self.wsgi_app.wait() self.wsgi_app.wait()
class NeutronApiService(WsgiService): class TackerApiService(WsgiService):
"""Class for neutron-api service.""" """Class for tacker-api service."""
@classmethod @classmethod
def create(cls, app_name='neutron'): def create(cls, app_name='tacker'):
# Setup logging early, supplying both the CLI options and the # Setup logging early, supplying both the CLI options and the
# configuration mapping from the config file # configuration mapping from the config file
@ -108,73 +101,17 @@ def serve_wsgi(cls):
return service return service
class RpcWorker(object):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
self._servers = []
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are
# discovered to be broken.
session.get_engine().pool.dispose()
self._servers = self._plugin.start_rpc_listeners()
def wait(self):
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.wait()
def stop(self):
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.kill()
self._servers = []
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
# If 0 < rpc_workers then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin supports
# multiple RPC workers.
if not plugin.rpc_workers_supported():
LOG.debug(_("Active plugin doesn't implement start_rpc_listeners"))
if 0 < cfg.CONF.rpc_workers:
msg = _("'rpc_workers = %d' ignored because start_rpc_listeners "
"is not implemented.")
LOG.error(msg, cfg.CONF.rpc_workers)
raise NotImplementedError
try:
rpc = RpcWorker(plugin)
if cfg.CONF.rpc_workers < 1:
rpc.start()
return rpc
else:
launcher = common_service.ProcessLauncher(wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_('Unrecoverable error: please check log '
'for details.'))
def _run_wsgi(app_name): def _run_wsgi(app_name):
app = config.load_paste_app(app_name) app = config.load_paste_app(app_name)
if not app: if not app:
LOG.error(_('No known API applications configured.')) LOG.error(_('No known API applications configured.'))
return return
server = wsgi.Server("Neutron") server = wsgi.Server("Tacker")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host, server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=cfg.CONF.api_workers) workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed # Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"), LOG.info(_("Tacker service started, listening on %(host)s:%(port)s"),
{'host': cfg.CONF.bind_host, {'host': cfg.CONF.bind_host,
'port': cfg.CONF.bind_port}) 'port': cfg.CONF.bind_port})
return server return server
@ -248,7 +185,7 @@ class Service(rpc_compat.Service):
if not binary: if not binary:
binary = os.path.basename(inspect.stack()[-1][1]) binary = os.path.basename(inspect.stack()[-1][1])
if not topic: if not topic:
topic = binary.rpartition('neutron-')[2] topic = binary.rpartition('tacker-')[2]
topic = topic.replace("-", "_") topic = topic.replace("-", "_")
if not manager: if not manager:
manager = CONF.get('%s_manager' % topic, None) manager = CONF.get('%s_manager' % topic, None)
@ -295,5 +232,5 @@ class Service(rpc_compat.Service):
def report_state(self): def report_state(self):
"""Update the state of this service.""" """Update the state of this service."""
# Todo(gongysh) report state to neutron server # Todo(gongysh) report state to tacker server
pass pass

View File

@ -16,4 +16,4 @@
import pbr.version import pbr.version
version_info = pbr.version.VersionInfo('neutron') version_info = pbr.version.VersionInfo('tacker')

View File

@ -30,22 +30,22 @@ from xml.etree import ElementTree as etree
from xml.parsers import expat from xml.parsers import expat
import eventlet.wsgi import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True, thread=True) #eventlet.patcher.monkey_patch(all=False, socket=True, thread=True)
from oslo.config import cfg from oslo.config import cfg
import routes.middleware import routes.middleware
import webob.dec import webob.dec
import webob.exc import webob.exc
from neutron.common import constants from tacker.common import constants
from neutron.common import exceptions as exception from tacker.common import exceptions as exception
from neutron import context from tacker import context
from neutron.db import api from tacker.db import api
from neutron.openstack.common import excutils from tacker.openstack.common import excutils
from neutron.openstack.common import gettextutils from tacker.openstack.common import gettextutils
from neutron.openstack.common import jsonutils from tacker.openstack.common import jsonutils
from neutron.openstack.common import log as logging from tacker.openstack.common import log as logging
from neutron.openstack.common import service as common_service from tacker.openstack.common import service as common_service
from neutron.openstack.common import systemd from tacker.openstack.common import systemd
socket_opts = [ socket_opts = [
cfg.IntOpt('backlog', cfg.IntOpt('backlog',
@ -356,14 +356,14 @@ class Request(webob.Request):
""" """
if not self.accept_language: if not self.accept_language:
return None return None
all_languages = gettextutils.get_available_languages('neutron') all_languages = gettextutils.get_available_languages('tacker')
return self.accept_language.best_match(all_languages) return self.accept_language.best_match(all_languages)
@property @property
def context(self): def context(self):
if 'neutron.context' not in self.environ: if 'tacker.context' not in self.environ:
self.environ['neutron.context'] = context.get_admin_context() self.environ['tacker.context'] = context.get_admin_context()
return self.environ['neutron.context'] return self.environ['tacker.context']
class ActionDispatcher(object): class ActionDispatcher(object):
@ -887,7 +887,7 @@ class Application(object):
which would result in a call to the `Wadl` class as which would result in a call to the `Wadl` class as
import neutron.api.fancy_api import tacker.api.fancy_api
fancy_api.Wadl(latest_version='1.3') fancy_api.Wadl(latest_version='1.3')
You could of course re-implement the `factory` method in subclasses, You could of course re-implement the `factory` method in subclasses,