Fix calls in sysinv to non-existent methods and constants

This review enables the E1101 no-member pylint error check and
provides fixes for all the places that were broken.
There have already been several bugs raised and fixed for code that
would have been caught if this error check had been enabled.

Filtered files:
 - The sqlalchemy 'Table' class confuses pylint and has been
added to the ignored-classes list along with any
sqlalchemy Enums declared in the migration scripts.

Removed:
 - testpci was an old test script and has been removed.

 - sysinv_deploy_helper was an old helper script that was unused
and would have raised runtime errors if invoked.
 - test_sysinv_deploy_helper.py mocked everything in
sysinv_deploy_helper so was removed with it.

Fixed:
 - tox should have been pointing at python-cephclient in stx-integ
rather than a completely different codebase on pypi.

 - impl_zmq / zmq_receiver / matchmaker_redis were removed since
only impl_kombu is used by sysinv, and those files are unused
pre-oslo code.

 - configure_osd_pools was never being called and had been partially
removed. Now the methods calling the non-existent method are removed
as well. (conductor/ceph, conductor/manager, conductor/rpcapi

 - v1/base.py is an abstract class that requires all
of its subclasses to initialize a 'fields' attribute.

 - v1/certificate v1/storage_ceph_external.py and v1/license were
referencing a non-existent 'value' attribute when trying to get
a string representation of the exception.

 - v1/collection has an accessor for _type which is a required field
on all subclasses.

 - v1/lldp_tlv had an invalid check.  The class is never instantiated
with a from_ihosts field, so that check would always raise an exception.

 - v1/service_parameter was raising an exception that did not exist.

 - v1/storage.py was not handling a patch with a for_tierid

 - v1/user.py; exception.KeyError does not exist.
(it is exceptions.KeyError) however exceptions is a default
imported module so does not need to be explicitly declared.

 - v1/utils.py needed to accomodate the renaming of swift to radosgw
constants as part of changeid: Id8d5c6b1159881d44810fc3622990456f1e54e75

  - objects/* Just about every DB object declaration defines its fields
using a dynamic constructor in the base class, so any use of one of
those fields to access the id/uuid during the save routine needed to
have this no-member check suppressed.

 - helm/base  LABEL_COMPUTE does not exist.
It needs to be LABEL_COMPUTE_LABEL

 - helm/openstack and helm/elastic require all of their subclasses to
declare a CHART property however did not enforce themselves as
abstract.

 - puppet/ceph was calling an internal method with the wrong spelling.
it is unlikely those methods are being called and can likely be removed.

 - puppet/ovs was referencing a stale constant. The method that had
this typo does not look to be used and can likely be removed.

 - conductor/ceph: constructing a wrong exception

 - conductor/manager: Fixed spelling of method:
report_external_config_failure.
Removed a call to a non-existent ceph method for a non-existent ceph
service param.

 - conductor/openstack: fixed typo in name of get keystone client method

 - common/exceptions There were exceptions being created that had never
been declared in this file so they have been added.
This includes: AddressPoolNotFoundByID, NTPNotFound

 - common/periodictask had a metaclass defined in an unusual way that
the subclass was unable to tell that it inherited some attributes

 - common/policy uses a metaclass to define a variable that confuses
pylint

 - common/setup needed to reference the right module for email
MessageError

 - service_parameter - _rpm_pkg_is_installed was no longer needed since
the checking for lefthandclient and 3parclient code was removed from
sysinv.

 - sqlalchemy/api had several typos for classes defined in the db model.
There were places that would check isinstance against an internal
tablename value, rather than the python class.

 - sqlalchemy/session uses a attribute system that confuses pylint.

Change-Id: If075893067b28b4fb4252ffd12ecef018e890b95
Signed-off-by: Al Bailey <Al.Bailey@windriver.com>
This commit is contained in:
Al Bailey 2019-08-13 13:25:26 -05:00
parent 6123f0fa9c
commit 356854a133
104 changed files with 252 additions and 1942 deletions

View File

@ -2,10 +2,11 @@
# Specify a configuration file.
rcfile=pylint.rc
# Python code to execute, usually for sys.path manipulation such as pygtk.require().
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Add files or directories to the blacklist. They should be base names, not paths.
# Add files or directories to the blacklist. Should be base names, not paths.
ignore=tests
# Pickle collected data for later comparisons.
@ -15,6 +16,19 @@ persistent=yes
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint.
jobs=4
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code
extension-pkg-whitelist=lxml.etree,greenlet
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
@ -26,7 +40,8 @@ load-plugins=
# can either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once).
# https://pylint.readthedocs.io/en/latest/user_guide/output.html#source-code-analysis-section
# See "Messages Control" section of
# https://pylint.readthedocs.io/en/latest/user_guide
# We are disabling (C)onvention
# We are disabling (R)efactor
# We are selectively disabling (W)arning
@ -72,17 +87,16 @@ load-plugins=
# E0611: no-name-in-module
# E0633: unpacking-non-sequence
# E0701: bad-except-order
# E1101: no-member
# E1102: not-callable
# E1120: no-value-for-parameter
# E1121: too-many-function-args
# E1124: redundant-keyword-arg
disable=C, R, fixme, W0101, W0105, W0106, W0107, W0108, W0110, W0123, W0150,
W0201, W0211, W0212, W0221, W0223, W0231, W0235, W0311, W0402, W0403, W0404,
W0603, W0612, W0613, W0621, W0622, W0631, W0632, W0701, W0703,
W0201, W0211, W0212, W0221, W0223, W0231, W0235, W0311, W0402, W0403,
W0404, W0603, W0612, W0613, W0621, W0622, W0631, W0632, W0701, W0703,
W1113, W1201, W1401, W1505,
E0213, E0401, E0604, E0611, E0633, E0701,
E1101, E1102, E1120, E1121, E1124
E1102, E1120, E1121, E1124
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
@ -123,7 +137,7 @@ max-line-length=85
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 tab).
# String used as indentation unit. This is usually 4 spaces or "\t" (1 tab).
indent-string=' '
@ -132,9 +146,18 @@ indent-string=' '
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis
ignored-modules=distutils,eventlet.green.subprocess,six,six.moves
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=SQLObject
# pylint is confused by sqlalchemy Table, as well as sqlalchemy Enum types
# ie: (unprovisioned, identity)
# LookupDict in requests library confuses pylint
ignored-classes=SQLObject, optparse.Values, thread._local, _thread._local,
Table, unprovisioned, identity, LookupDict
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E0201 when accessed. Python regular

View File

@ -1,97 +0,0 @@
#
# Copyright (c) 2013-2014 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import subprocess
import shlex
pciaddr = 0
iclass = 1
vendor = 2
device = 3
revision = 4
svendor = 5
sdevice = 6
class Ipci(object):
'''Class to encapsulate PCI data for System Inventory'''
def __init__(self, pciaddr, iclass, vendor, device, revision,
svendor, sdevice, description=""):
'''Construct a Ipci object with the given values.'''
self.pciaddr = pciaddr
self.iclass = iclass
self.vendor = vendor
self.device = device
self.revision = revision
self.svendor = svendor
self.sdevice = sdevice
def __eq__(self, rhs):
return (self.vendorId == rhs.vendorId and
self.deviceId == rhs.deviceId)
def __ne__(self, rhs):
return (self.vendorId != rhs.vendorId or
self.deviceId != rhs.deviceId)
def __str__(self):
return "%s [%s] [%s]" % (
self.description, self.vendorId, self.deviceId)
def __repr__(self):
return "<PciInfo '%s'>" % str(self)
class IpciOperator(object):
'''Class to encapsulate PCI operations for System Inventory'''
def pci_inics_get(self):
p = subprocess.Popen(["lspci", "-Dm"], stdout=subprocess.PIPE)
pci_inics = []
for line in p.stdout:
if 'Ethernet' in line:
inic = shlex.split(line.strip())
if inic[iclass].startswith('Ethernet controller'):
pci_inics.append(Ipci(inic[pciaddr], inic[iclass],
inic[vendor], inic[device], inic[revision],
inic[svendor], inic[sdevice]))
p.wait()
return pci_inics
def pci_bus_scan_get_attributes(self, pciaddr):
''' For this pciaddr, build a list of dictattributes per port '''
pciaddrs = os.listdir('/sys/bus/pci/devices/')
for a in pciaddrs:
if ((a == pciaddr) or ("0000:" + a == pciaddr)):
# directory with match, so look inside net directory
# expect to find address,speed,mtu etc. info
p = subprocess.Popen(["cat", "a"], stdout=subprocess.PIPE)
p.wait()
my_pci_inics = IpciOperator()
pci_inics = []
pci_inics = my_pci_inics.pci_inics_get()
# post these to database by host, pciaddr
for i in pci_inics:
print("JKUNG pciaddr=%s, iclass=%s, vendor=%s, device=%s, rev=%s, svendor=%s, sdevice=%s" % (i.pciaddr, i.iclass, i.vendor, i.device, i.revision, i.svendor, i.sdevice))
# try:
# rpc.db_post_by_host_and_mac()
# except:
# try patch if that doesnt work, then continue

View File

@ -26,7 +26,7 @@ class APIBase(wtypes.Base):
def as_dict(self):
"""Render this object as a dict of its fields."""
return dict((k, getattr(self, k))
for k in self.fields
for k in self.fields # pylint: disable=no-member
if hasattr(self, k) and
getattr(self, k) != wsme.Unset)

View File

@ -369,9 +369,9 @@ class CertificateController(rest.RestController):
config_dict)
except Exception as e:
msg = "Exception occured e={}".format(e)
msg = "Exception occurred e={}".format(e)
LOG.info(msg)
return dict(success="", error=e.value, body="", certificates={})
return dict(success="", error=str(e), body="", certificates={})
# Update with installed certificate information
values = {

View File

@ -34,7 +34,7 @@ class Collection(base.APIBase):
@property
def collection(self):
return getattr(self, self._type)
return getattr(self, self._type) # pylint: disable=no-member
def has_next(self, limit):
"""Return whether collection has more items."""
@ -45,7 +45,7 @@ class Collection(base.APIBase):
if not self.has_next(limit):
return wtypes.Unset
resource_url = url or self._type
resource_url = url or self._type # pylint: disable=no-member
q_args = ''.join(['%s=%s&' % (key, kwargs[key]) for key in kwargs])
next_args = '?%(args)slimit=%(limit)d&marker=%(marker)s' % {
'args': q_args, 'limit': limit,

View File

@ -802,8 +802,8 @@ class Host(base.APIBase):
uhost.vsc_controllers = wtypes.Unset
uhost.peers = None
if uhost.peer_id:
ipeers = pecan.request.dbapi.peer_get(uhost.peer_id)
if uhost.peer_id: # pylint: disable=no-member
ipeers = pecan.request.dbapi.peer_get(uhost.peer_id) # pylint: disable=no-member
uhost.peers = {'name': ipeers.name, 'hosts': ipeers.hosts}
return uhost

View File

@ -133,6 +133,6 @@ class LicenseController(rest.RestController):
try:
pecan.request.rpcapi.install_license_file(pecan.request.context, contents)
except Exception as e:
return dict(success="", error=e.value)
return dict(success="", error=str(e))
return dict(success="Success: new license installed", error="")

View File

@ -200,9 +200,6 @@ class LLDPTLVController(rest.RestController):
@wsme_pecan.wsexpose(LLDPTLV, int)
def get_one(self, id):
"""Retrieve information about the given lldp tlv."""
if self._from_ihosts:
raise exception.OperationNotPermitted
rpc_lldp_tlv = objects.lldp_tlv.get_by_id(
pecan.request.context, id)
return LLDPTLV.convert_with_links(rpc_lldp_tlv)

View File

@ -334,7 +334,7 @@ class ServiceParameterController(rest.RestController):
if len(parameters) > 1:
msg = _("Cannot specify multiple parameters with custom resource.")
raise wsme.exc.CommandError(msg)
raise wsme.exc.ClientSideError(msg)
for name, value in parameters.items():
new_record = {

View File

@ -370,8 +370,8 @@ class StorageController(rest.RestController):
p['value'] = ihost.id
elif p['path'] == '/tier_uuid':
p['path'] = '/fortierid'
tier = objects.tier.get_by_uuid(pecan.request.context,
p['value'])
tier = objects.storage_tier.get_by_uuid(pecan.request.context,
p['value'])
p['value'] = tier.id
try:

View File

@ -42,11 +42,11 @@ from sysinv.openstack.common import log
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import uuidutils
# pylint: disable=unused-import
from sysinv.api.controllers.v1 import storage_ceph # noqa
from sysinv.api.controllers.v1 import storage_lvm # noqa
from sysinv.api.controllers.v1 import storage_file # noqa
from sysinv.api.controllers.v1 import storage_ceph_external # noqa
# The following four imports implicitly pull in constants and functionality
from sysinv.api.controllers.v1 import storage_ceph # noqa: F401 pylint: disable=unused-import
from sysinv.api.controllers.v1 import storage_lvm # noqa: F401 pylint: disable=unused-import
from sysinv.api.controllers.v1 import storage_file # noqa: F401 pylint: disable=unused-import
from sysinv.api.controllers.v1 import storage_ceph_external # noqa: F401 pylint: disable=unused-import
LOG = log.getLogger(__name__)

View File

@ -289,9 +289,7 @@ class StorageCephExternalController(rest.RestController):
pecan.request.context, file_content, ceph_conf_fn)
except Exception as e:
LOG.exception(e)
return dict(
success="",
error=e.value)
return dict(success="", error=str(e))
return dict(success="Success: ceph config file is uploaded", error="")

View File

@ -303,7 +303,7 @@ class UserController(rest.RestController):
msg = _("User sysadmin update failed: system %s user %s : patch %s"
% (isystem['systemname'], user, patch))
raise wsme.exc.ClientSideError(msg)
except exception.KeyError:
except KeyError:
msg = _("Cannot retrieve shadow entry for sysadmin: system %s : patch %s"
% (isystem['systemname'], patch))
raise wsme.exc.ClientSideError(msg)

View File

@ -722,9 +722,9 @@ class SBApiHelper(object):
def check_swift_enabled():
try:
swift_enabled = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_SWIFT,
section=constants.SERVICE_PARAM_SECTION_SWIFT_CONFIG,
name=constants.SERVICE_PARAM_NAME_SWIFT_SERVICE_ENABLED)
service=constants.SERVICE_TYPE_RADOSGW,
section=constants.SERVICE_PARAM_SECTION_RADOSGW_CONFIG,
name=constants.SERVICE_PARAM_NAME_RADOSGW_SERVICE_ENABLED)
if swift_enabled and swift_enabled.value.lower() == 'true':
raise wsme.exc.ClientSideError(
"Swift is already enabled through service parameter.")

View File

@ -1,340 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2012 NTT DOCOMO, INC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2016 Wind River Systems, Inc.
#
"""Starter script for Bare-Metal Deployment Service."""
import os
import sys
import threading
import time
import cgi
import re
import six.moves.queue as Queue
import socket
import stat
from wsgiref import simple_server
from sysinv.common import config
from sysinv.common import exception
from sysinv.common import states
from sysinv.common import utils
from sysinv import db
from sysinv.openstack.common import context as sysinv_context
from sysinv.openstack.common import excutils
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import log as logging
QUEUE = Queue.Queue()
LOG = logging.getLogger(__name__)
# All functions are called from deploy() directly or indirectly.
# They are split for stub-out.
def discovery(portal_address, portal_port):
"""Do iSCSI discovery on portal."""
utils.execute('iscsiadm',
'-m', 'discovery',
'-t', 'st',
'-p', '%s:%s' % (portal_address, portal_port),
run_as_root=True,
check_exit_code=[0])
def login_iscsi(portal_address, portal_port, target_iqn):
"""Login to an iSCSI target."""
utils.execute('iscsiadm',
'-m', 'node',
'-p', '%s:%s' % (portal_address, portal_port),
'-T', target_iqn,
'--login',
run_as_root=True,
check_exit_code=[0])
# Ensure the login complete
time.sleep(3)
def logout_iscsi(portal_address, portal_port, target_iqn):
"""Logout from an iSCSI target."""
utils.execute('iscsiadm',
'-m', 'node',
'-p', '%s:%s' % (portal_address, portal_port),
'-T', target_iqn,
'--logout',
run_as_root=True,
check_exit_code=[0])
def make_partitions(dev, root_mb, swap_mb):
"""Create partitions for root and swap on a disk device."""
# Lead in with 1MB to allow room for the partition table itself, otherwise
# the way sfdisk adjusts doesn't shift the partition up to compensate, and
# we lose the space.
# http://bazaar.launchpad.net/~ubuntu-branches/ubuntu/raring/util-linux/
# raring/view/head:/fdisk/sfdisk.c#L1940
stdin_command = ('1,%d,83;\n,%d,82;\n0,0;\n0,0;\n' % (root_mb, swap_mb))
utils.execute('sfdisk', '-uM', dev, process_input=stdin_command,
run_as_root=True,
attempts=3,
check_exit_code=[0])
# avoid "device is busy"
time.sleep(3)
def is_block_device(dev):
"""Check whether a device is block or not."""
s = os.stat(dev)
return stat.S_ISBLK(s.st_mode)
def dd(src, dst):
"""Execute dd from src to dst."""
utils.execute('dd',
'if=%s' % src,
'of=%s' % dst,
'bs=1M',
'oflag=direct',
run_as_root=True,
check_exit_code=[0])
def mkswap(dev, label='swap1'):
"""Execute mkswap on a device."""
utils.execute('mkswap',
'-L', label,
dev,
run_as_root=True,
check_exit_code=[0])
def block_uuid(dev):
"""Get UUID of a block device."""
out, _ = utils.execute('blkid', '-s', 'UUID', '-o', 'value', dev,
run_as_root=True,
check_exit_code=[0])
return out.strip()
def switch_pxe_config(path, root_uuid):
"""Switch a pxe config from deployment mode to service mode."""
with open(path) as f:
lines = f.readlines()
root = 'UUID=%s' % root_uuid
rre = re.compile(r'\$\{ROOT\}')
dre = re.compile('^default .*$')
with open(path, 'w') as f:
for line in lines:
line = rre.sub(root, line)
line = dre.sub('default boot', line)
f.write(line)
def notify(address, port):
"""Notify a node that it becomes ready to reboot."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((address, port))
s.send('done')
finally:
s.close()
def get_dev(address, port, iqn, lun):
"""Returns a device path for given parameters."""
dev = "/dev/disk/by-path/ip-%s:%s-iscsi-%s-lun-%s" \
% (address, port, iqn, lun)
return dev
def get_image_mb(image_path):
"""Get size of an image in Megabyte."""
mb = 1024 * 1024
image_byte = os.path.getsize(image_path)
# round up size to MB
image_mb = int((image_byte + mb - 1) / mb)
return image_mb
def work_on_disk(dev, root_mb, swap_mb, image_path):
"""Creates partitions and write an image to the root partition."""
root_part = "%s-part1" % dev
swap_part = "%s-part2" % dev
if not is_block_device(dev):
LOG.warn(_("parent device '%s' not found") % dev)
return
make_partitions(dev, root_mb, swap_mb)
if not is_block_device(root_part):
LOG.warn(_("root device '%s' not found") % root_part)
return
if not is_block_device(swap_part):
LOG.warn(_("swap device '%s' not found") % swap_part)
return
dd(image_path, root_part)
mkswap(swap_part)
try:
root_uuid = block_uuid(root_part)
except exception.ProcessExecutionError:
with excutils.save_and_reraise_exception():
LOG.error("Failed to detect root device UUID.")
return root_uuid
def deploy(address, port, iqn, lun, image_path, pxe_config_path,
root_mb, swap_mb):
"""All-in-one function to deploy a node."""
dev = get_dev(address, port, iqn, lun)
image_mb = get_image_mb(image_path)
if image_mb > root_mb:
root_mb = image_mb
discovery(address, port)
login_iscsi(address, port, iqn)
try:
root_uuid = work_on_disk(dev, root_mb, swap_mb, image_path)
except exception.ProcessExecutionError as err:
with excutils.save_and_reraise_exception():
# Log output if there was a error
LOG.error("Cmd : %s" % err.cmd)
LOG.error("StdOut : %s" % err.stdout)
LOG.error("StdErr : %s" % err.stderr)
finally:
logout_iscsi(address, port, iqn)
switch_pxe_config(pxe_config_path, root_uuid)
# Ensure the node started netcat on the port after POST the request.
time.sleep(3)
notify(address, 10000)
class Worker(threading.Thread):
"""Thread that handles requests in queue."""
def __init__(self):
super(Worker, self).__init__()
self.setDaemon(True)
self.stop = False
self.queue_timeout = 1
def run(self):
while not self.stop:
try:
# Set timeout to check self.stop periodically
(node_id, params) = QUEUE.get(block=True,
timeout=self.queue_timeout)
except Queue.Empty:
pass
else:
# Requests comes here from BareMetalDeploy.post()
LOG.info(_('start deployment for node %(node_id)s, '
'params %(params)s') %
{'node_id': node_id, 'params': params})
context = sysinv_context.get_admin_context()
try:
db.bm_node_update(context, node_id,
{'task_state': states.DEPLOYING})
deploy(**params)
except Exception:
LOG.error(_('deployment to node %s failed') % node_id)
db.bm_node_update(context, node_id,
{'task_state': states.DEPLOYFAIL})
else:
LOG.info(_('deployment to node %s done') % node_id)
db.bm_node_update(context, node_id,
{'task_state': states.DEPLOYDONE})
class BareMetalDeploy(object):
"""WSGI server for bare-metal deployment."""
def __init__(self):
self.worker = Worker()
self.worker.start()
def __call__(self, environ, start_response):
method = environ['REQUEST_METHOD']
if method == 'POST':
return self.post(environ, start_response)
else:
start_response('501 Not Implemented',
[('Content-type', 'text/plain')])
return 'Not Implemented'
def post(self, environ, start_response):
LOG.info(_("post: environ=%s") % environ)
inpt = environ['wsgi.input']
length = int(environ.get('CONTENT_LENGTH', 0))
x = inpt.read(length)
q = dict(cgi.parse_qsl(x))
try:
node_id = q['i']
deploy_key = q['k']
address = q['a']
port = q.get('p', '3260')
iqn = q['n']
lun = q.get('l', '1')
err_msg = q.get('e')
except KeyError as e:
start_response('400 Bad Request', [('Content-type', 'text/plain')])
return "parameter '%s' is not defined" % e
if err_msg:
LOG.error(_('Deploy agent error message: %s'), err_msg)
context = sysinv_context.get_admin_context()
d = db.bm_node_get(context, node_id)
if d['deploy_key'] != deploy_key:
start_response('400 Bad Request', [('Content-type', 'text/plain')])
return 'key is not match'
params = {'address': address,
'port': port,
'iqn': iqn,
'lun': lun,
'image_path': d['image_path'],
'pxe_config_path': d['pxe_config_path'],
'root_mb': int(d['root_mb']),
'swap_mb': int(d['swap_mb']),
}
# Restart worker, if needed
if not self.worker.isAlive():
self.worker = Worker()
self.worker.start()
LOG.info(_("request is queued: node %(node_id)s, params %(params)s") %
{'node_id': node_id, 'params': params})
QUEUE.put((node_id, params))
# Requests go to Worker.run()
start_response('200 OK', [('Content-type', 'text/plain')])
return ''
def main():
config.parse_args(sys.argv)
logging.setup("nova")
global LOG
LOG = logging.getLogger('nova.virt.baremetal.deploy_helper')
app = BareMetalDeploy()
srv = simple_server.make_server('', 10000, app)
srv.serve_forever()

View File

@ -606,6 +606,10 @@ class MultipleResults(SysinvException):
message = _("More than one result found.")
class NTPNotFound(NotFound):
message = _("No NTP with id %(intp_id)s found.")
class PTPNotFound(NotFound):
message = _("No PTP with id %(uuid)s found.")
@ -812,6 +816,10 @@ class AddressPoolNotFound(NotFound):
message = _("Address pool %(address_pool_uuid)s not found")
class AddressPoolNotFoundByID(NotFound):
message = _("Address pool %(address_pool_id)s not found")
class AddressPoolNotFoundByName(NotFound):
message = _("Address pool %(name)s not found")

View File

@ -9,7 +9,6 @@
import netaddr
import pecan
import rpm
import wsme
from sysinv.common import constants
@ -250,16 +249,6 @@ def _validate_mac_address(name, value):
pass # allow any OUI value regardless of registration
def _rpm_pkg_is_installed(pkg_name):
ts = rpm.TransactionSet()
mi = ts.dbMatch()
mi.pattern('name', rpm.RPMMIRE_GLOB, pkg_name)
sum = 0
for h in mi:
sum += 1
return (sum > 0)
def _validate_radosgw_enabled(name, value):
if not cutils.is_valid_boolstr(value):
raise wsme.exc.ClientSideError(_(

View File

@ -287,6 +287,7 @@ class CephOperator(object):
reason=response.reason)
return response, body
# TODO(rchurch): remove this method and just use get_osd_pool_quota
def osd_get_pool_quota(self, pool_name):
"""Get the quota for an OSD pool
:param pool_name:
@ -299,8 +300,8 @@ class CephOperator(object):
else:
LOG.error("Getting the quota for %(name)s pool failed:%(reason)s)"
% {"name": pool_name, "reason": resp.reason})
raise exception.CephPoolGetFailure(pool=pool_name,
reason=resp.reason)
raise exception.CephPoolGetQuotaFailure(pool=pool_name,
reason=resp.reason)
def osd_create(self, stor_uuid, **kwargs):
""" Create osd via ceph api
@ -522,8 +523,8 @@ class CephOperator(object):
resp, quota = self._ceph_api.osd_get_pool_quota(pool_name, body='json')
if not resp.ok:
e = exception.CephPoolGetQuotaFailure(
pool=pool_name, reason=resp.reason)
e = exception.CephPoolGetQuotaFailure(pool=pool_name,
reason=resp.reason)
LOG.error(e)
raise e
else:
@ -1444,9 +1445,8 @@ class CephOperator(object):
if t.uuid == self.primary_tier_uuid:
# Query ceph to get rgw object pool name.
# To be safe let configure_osd_pools() be the only place that can
# update the object pool name in CEPH_POOLS, so we make a local
# copy of CEPH_POOLS here.
# a local copy of CEPH_POOLS here is wasteful.
# Nothing changes it anymore.
pools_snapshot = copy.deepcopy(CEPH_POOLS)
for pool in pools_snapshot:
if pool['pool_name'] == constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL:

View File

@ -1547,18 +1547,6 @@ class ConductorManager(service.PeriodicService):
if host['hostname'] == constants.STORAGE_0_HOSTNAME:
self._ceph_mon_create(host)
# TODO(CephPoolsDecouple): remove
def configure_osd_pools(self, context, ceph_backend=None, new_pool_size=None, new_pool_min_size=None):
"""Configure or update configuration of the OSD pools.
If none of the optionals are provided then all pools are updated based on DB configuration.
:param context: an admin context.
:param ceph_backend: Optional ceph backend object of a tier
:param new_pool_size: Optional override for replication number.
:param new_pool_min_size: Optional override for minimum replication number.
"""
self._ceph.configure_osd_pools(ceph_backend, new_pool_size, new_pool_min_size)
def remove_host_config(self, context, host_uuid):
"""Remove configuration files for a host.
@ -6498,7 +6486,7 @@ class ConductorManager(service.PeriodicService):
# self._update_storage_backend_alarm(fm_constants.FM_ALARM_STATE_CLEAR,
# constants.SB_TYPE_EXTERNAL)
def report_exernal_config_failure(self, host_uuid, error):
def report_external_config_failure(self, host_uuid, error):
"""
Callback for Sysinv Agent
@ -7110,9 +7098,6 @@ class ConductorManager(service.PeriodicService):
LOG.info("Updating parameters configuration for service: %s" % service)
if service == constants.SERVICE_TYPE_CEPH:
return self._ceph.update_service_config(do_apply)
# On service parameter add just update the host profile
# for personalities pertinent to that service
if service == constants.SERVICE_TYPE_HTTP:

View File

@ -315,7 +315,7 @@ class OpenStackOperator(object):
nhosts = []
if hasattr(metadata, 'hosts'):
nhosts = metadata.hosts or []
nhosts = metadata.hosts or [] # pylint: disable=no-member
if ihost.hostname in nhosts:
LOG.warn("host=%s in already in aggregate id=%s" %
@ -485,10 +485,10 @@ class OpenStackOperator(object):
region1_name = get_region_name('region_1_name')
if region1_name is None:
region1_name = 'RegionOne'
service_list = self._get_keystoneclient(OPENSTACK_CONFIG).services.list()
service_list = self._get_keystone_client(OPENSTACK_CONFIG).services.list()
for s in service_list:
if s.name.find(constants.SERVICE_TYPE_CINDER) != -1:
endpoint_list += self._get_keystoneclient(OPENSTACK_CONFIG).endpoints.list(
endpoint_list += self._get_keystone_client(OPENSTACK_CONFIG).endpoints.list(
service=s, region=region1_name)
except Exception:
LOG.error("Failed to get keystone endpoints for cinder.")
@ -610,7 +610,7 @@ class OpenStackOperator(object):
region1_name = get_region_name('region_1_name')
if region1_name is None:
region1_name = 'RegionOne'
auth_ref = self._get_keystoneclient(PLATFORM_CONFIG).auth_ref
auth_ref = self._get_keystone_client(PLATFORM_CONFIG).auth_ref
if auth_ref is None:
raise exception.SysinvException(_("Unable to get auth ref "
"from keystone client"))

View File

@ -116,22 +116,6 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
host=host,
do_worker_apply=do_worker_apply))
# TODO(CephPoolsDecouple): remove
def configure_osd_pools(self, context, ceph_backend=None, new_pool_size=None, new_pool_min_size=None):
"""Configure or update configuration of the OSD pools.
If none of the optionals are provided then all pools are updated based on DB configuration.
:param context: an admin context.
:param ceph_backend: Optional ceph backend object of a tier
:param new_pool_size: Optional override for replication number.
:param new_pool_min_size: Optional override for minimum replication number.
"""
return self.call(context,
self.make_msg('configure_osd_pools',
ceph_backend=ceph_backend,
new_pool_size=new_pool_size,
new_pool_min_size=new_pool_min_size))
def remove_host_config(self, context, host_uuid):
"""Synchronously, have a conductor remove configuration for a host.

View File

@ -438,8 +438,8 @@ def add_port_filter_by_numa_node(query, nodeid):
if utils.is_int_like(nodeid):
#
# Should not need join due to polymorphic ports table
# query = query.join(models.ports,
# models.EthernetPorts.id == models.ports.id)
# query = query.join(models.Ports,
# models.EthernetPorts.id == models.Ports.id)
#
# Query of ethernet_ports table should return data from
# corresponding ports table entry so should be able to
@ -475,8 +475,8 @@ def add_port_filter_by_host(query, hostid):
if utils.is_int_like(hostid):
#
# Should not need join due to polymorphic ports table
# query = query.join(models.ports,
# models.EthernetPorts.id == models.ports.id)
# query = query.join(models.Ports,
# models.EthernetPorts.id == models.Ports.id)
#
# Query of ethernet_ports table should return data from
# corresponding ports table entry so should be able to
@ -989,9 +989,9 @@ def add_sensor_filter_by_ihost_sensorgroup(query, hostid, sensorgroupid):
elif utils.is_uuid_like(hostid) and utils.is_uuid_like(sensorgroupid):
query = query.join(models.ihost,
models.isensorgroup)
models.SensorGroups)
return query.filter(models.ihost.uuid == hostid,
models.isensorgroup.uuid == sensorgroupid)
models.SensorGroups.uuid == sensorgroupid)
LOG.debug("sensor_filter_by_host_isensorgroup: "
"No match for supplied filter ids (%s, %s)"
@ -2514,7 +2514,7 @@ class Connection(api.Connection):
values)
def virtual_interface_destroy(self, interface_id):
return self._interface_destroy(models.VirtuaInterfaces, interface_id)
return self._interface_destroy(models.VirtualInterfaces, interface_id)
def _disk_get(self, disk_id, forihostid=None):
query = model_query(models.idisk)
@ -6738,7 +6738,7 @@ class Connection(api.Connection):
port = self.port_get(int(portid))
elif utils.is_uuid_like(portid):
port = self.port_get(portid.strip())
elif isinstance(portid, models.port):
elif isinstance(portid, models.Ports):
port = portid
else:
raise exception.PortNotFound(port=portid)
@ -6884,7 +6884,7 @@ class Connection(api.Connection):
agent = self.lldp_agent_get(int(agentid))
elif utils.is_uuid_like(agentid):
agent = self.lldp_agent_get(agentid.strip())
elif isinstance(agentid, models.lldp_agents):
elif isinstance(agentid, models.LldpAgents):
agent = agentid
else:
raise exception.LldpAgentNotFound(agent=agentid)
@ -6894,7 +6894,7 @@ class Connection(api.Connection):
neighbour = self.lldp_neighbour_get(int(neighbourid))
elif utils.is_uuid_like(neighbourid):
neighbour = self.lldp_neighbour_get(neighbourid.strip())
elif isinstance(neighbourid, models.lldp_neighbours):
elif isinstance(neighbourid, models.LldpNeighbours):
neighbour = neighbourid
else:
raise exception.LldpNeighbourNotFound(neighbour=neighbourid)
@ -6930,7 +6930,7 @@ class Connection(api.Connection):
agent = self.lldp_agent_get(int(agentid))
elif utils.is_uuid_like(agentid):
agent = self.lldp_agent_get(agentid.strip())
elif isinstance(agentid, models.lldp_agents):
elif isinstance(agentid, models.LldpAgents):
agent = agentid
else:
raise exception.LldpAgentNotFound(agent=agentid)
@ -6940,7 +6940,7 @@ class Connection(api.Connection):
neighbour = self.lldp_neighbour_get(int(neighbourid))
elif utils.is_uuid_like(neighbourid):
neighbour = self.lldp_neighbour_get(neighbourid.strip())
elif isinstance(neighbourid, models.lldp_neighbours):
elif isinstance(neighbourid, models.LldpNeighbours):
neighbour = neighbourid
else:
raise exception.LldpNeighbourNotFound(neighbour=neighbourid)

View File

@ -130,7 +130,7 @@ class BaseHelm(object):
return self._count_hosts_by_label(common.LABEL_CONTROLLER)
def _num_computes(self):
return self._count_hosts_by_label(common.LABEL_COMPUTE)
return self._count_hosts_by_label(common.LABEL_COMPUTE_LABEL)
def _num_controllers_by_personality(self):
return int(self.dbapi.count_hosts_by_personality(

View File

@ -21,6 +21,12 @@ class ElasticBaseHelm(base.BaseHelm):
base.BaseHelm.SUPPORTED_NAMESPACES + [common.HELM_NS_MONITOR]
}
@property
def CHART(self):
# subclasses must define the property: CHART='name of chart'
# if an author of a new chart forgets this, NotImplementedError is raised
raise NotImplementedError
def get_namespaces(self):
return self.SUPPORTED_NAMESPACES

View File

@ -34,6 +34,12 @@ class OpenstackBaseHelm(base.BaseHelm):
base.BaseHelm.SUPPORTED_NAMESPACES + [common.HELM_NS_OPENSTACK]
}
@property
def CHART(self):
# subclasses must define the property: CHART='name of chart'
# if an author of a new chart forgets this, NotImplementedError is raised
raise NotImplementedError
def _get_service_config(self, service):
configs = self.context.setdefault('_service_configs', {})
if service not in configs:

View File

@ -42,4 +42,5 @@ class Address(base.SysinvObject):
return cls.dbapi.address_get(uuid)
def save_changes(self, context, updates):
self.dbapi.address_update(self.uuid, updates)
self.dbapi.address_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -39,4 +39,5 @@ class AddressMode(base.SysinvObject):
return cls.dbapi.address_mode_get(uuid)
def save_changes(self, context, updates):
self.dbapi.address_mode_update(self.uuid, updates)
self.dbapi.address_mode_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -58,4 +58,5 @@ class AddressPool(base.SysinvObject):
return cls.dbapi.address_pool_get(uuid)
def save_changes(self, context, updates):
self.dbapi.address_pool_update(self.uuid, updates)
self.dbapi.address_pool_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -377,7 +377,10 @@ class SysinvObject(object):
@remotable
def refresh(self, context):
"""Refresh the object fields from the persistent store"""
current = self.__class__.get_by_uuid(context, uuid=self.uuid)
# todo(jkung) uuid is not declared in this baseclass
# which means every subclass MUST declare it unless this is refactored
current = self.__class__.get_by_uuid(context,
uuid=self.uuid) # pylint: disable=no-member
for field in self.fields:
if (hasattr(self, get_attrname(field)) and
self[field] != current[field]):
@ -508,7 +511,7 @@ class ObjectListBase(object):
new_obj = self.__class__()
new_obj.objects = self.objects[index]
# NOTE(danms): We must be mixed in with an SysinvObject!
new_obj.obj_reset_changes()
new_obj.obj_reset_changes() # pylint: disable=no-member
new_obj._context = self._context
return new_obj
return self.objects[index]

View File

@ -41,4 +41,5 @@ class CephMon(base.SysinvObject):
return cls.dbapi.ceph_mon_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ceph_mon_update(self.uuid, updates)
self.dbapi.ceph_mon_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -31,4 +31,5 @@ class Certificate(base.SysinvObject):
return cls.dbapi.certificate_get(uuid)
def save_changes(self, context, updates):
self.dbapi.certificate_update(self.uuid, updates)
self.dbapi.certificate_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -75,4 +75,5 @@ class Cluster(base.SysinvObject):
return cls.dbapi.cluster_get(uuid)
def save_changes(self, context, updates):
self.dbapi.cluster_update(self.uuid, updates)
self.dbapi.cluster_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -34,4 +34,5 @@ class Community(base.SysinvObject):
return cls.dbapi.icommunity_get_by_name(name)
def save_changes(self, context, updates):
self.dbapi.icommunity_update(self.uuid, updates)
self.dbapi.icommunity_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -41,4 +41,5 @@ class ControllerFS(base.SysinvObject):
return cls.dbapi.controller_fs_get(uuid)
def save_changes(self, context, updates):
self.dbapi.controller_fs_update(self.uuid, updates)
self.dbapi.controller_fs_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -44,4 +44,5 @@ class CPU(base.SysinvObject):
return cls.dbapi.icpu_get(uuid)
def save_changes(self, context, updates):
self.dbapi.icpu_update(self.uuid, updates)
self.dbapi.icpu_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -39,4 +39,5 @@ class DataNetwork(base.SysinvObject):
return cls.dbapi.datanetwork_get(uuid)
def save_changes(self, context, updates):
self.dbapi.datanetwork_update(self.uuid, updates)
self.dbapi.datanetwork_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -51,4 +51,5 @@ class Disk(base.SysinvObject):
return cls.dbapi.idisk_get(uuid)
def save_changes(self, context, updates):
self.dbapi.idisk_update(self.uuid, updates)
self.dbapi.idisk_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -36,4 +36,5 @@ class DNS(base.SysinvObject):
return cls.dbapi.idns_get(uuid)
def save_changes(self, context, updates):
self.dbapi.idns_update(self.uuid, updates)
self.dbapi.idns_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -49,4 +49,5 @@ class DRBDConfig(base.SysinvObject):
return cls.dbapi.drbdconfig_get(uuid)
def save_changes(self, context, updates):
self.dbapi.drbdconfig_update(self.uuid, updates)
self.dbapi.drbdconfig_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -29,5 +29,7 @@ class HelmOverrides(base.SysinvObject):
return cls.dbapi.helm_override_get(app_id, name, namespace)
def save_changes(self, context, updates):
self.dbapi.helm_override_update(self.app_id, self.name,
self.namespace, updates)
self.dbapi.helm_override_update(self.app_id, # pylint: disable=no-member
self.name, # pylint: disable=no-member
self.namespace, # pylint: disable=no-member
updates)

View File

@ -101,7 +101,8 @@ class Host(base.SysinvObject):
return cls.dbapi.ihost_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ihost_update(self.uuid, updates)
self.dbapi.ihost_update(self.uuid, # pylint: disable=no-member
updates)
class ihost(Host):

View File

@ -35,4 +35,5 @@ class HostFS(base.SysinvObject):
return cls.dbapi.host_fs_get(uuid)
def save_changes(self, context, updates):
self.dbapi.host_fs_update(self.uuid, updates)
self.dbapi.host_fs_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -34,4 +34,5 @@ class HostUpgrade(base.SysinvObject):
return cls.dbapi.host_upgrade_get_by_host(host_id)
def save_changes(self, context, updates):
self.dbapi.host_upgrade_update(self.id, updates)
self.dbapi.host_upgrade_update(self.id, # pylint: disable=no-member
updates)

View File

@ -166,4 +166,5 @@ class Interface(base.SysinvObject):
return cls.dbapi.iinterface_get(uuid)
def save_changes(self, context, updates):
self.dbapi.iinterface_update(self.uuid, updates)
self.dbapi.iinterface_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -27,4 +27,5 @@ class AEInterface(interface_ethernet.EthernetInterface):
return cls.dbapi.ae_interface_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ae_interface_update(self.uuid, updates)
self.dbapi.ae_interface_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -25,4 +25,5 @@ class EthernetInterface(interface_base.InterfaceBase):
return cls.dbapi.ethernet_interface_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ethernet_interface_update(self.uuid, updates)
self.dbapi.ethernet_interface_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -21,4 +21,5 @@ class VirtualInterface(interface_ethernet.EthernetInterface):
return cls.dbapi.virtual_interface_get(uuid)
def save_changes(self, context, updates):
self.dbapi.virtual_interface_update(self.uuid, updates)
self.dbapi.virtual_interface_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -25,4 +25,5 @@ class VLANInterface(interface_ethernet.EthernetInterface):
return cls.dbapi.vlan_interface_get(uuid)
def save_changes(self, context, updates):
self.dbapi.vlan_interface_update(self.uuid, updates)
self.dbapi.vlan_interface_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -30,4 +30,5 @@ class Journal(base.SysinvObject):
return cls.dbapi.journal_get(uuid)
def save_changes(self, context, updates):
self.dbapi.journal_update(self.uuid, updates)
self.dbapi.journal_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -37,4 +37,5 @@ class KubeApp(base.SysinvObject):
return cls.dbapi.kube_app_get_inactive_by_name_version(name, version)
def save_changes(self, context, updates):
self.dbapi.kube_app_update(self.id, updates)
self.dbapi.kube_app_update(self.id, # pylint: disable=no-member
updates)

View File

@ -32,5 +32,7 @@ class KubeAppReleases(base.SysinvObject):
return cls.dbapi.kube_app_chart_release_get(app_id, release, namespace)
def save_changes(self, context, updates):
self.dbapi.kube_app_chart_release_update(self.app_id, self.release,
self.namespace, updates)
self.dbapi.kube_app_chart_release_update(self.app_id, # pylint: disable=no-member
self.release, # pylint: disable=no-member
self.namespace, # pylint: disable=no-member
updates)

View File

@ -36,4 +36,5 @@ class Label(base.SysinvObject):
return cls.dbapi.label_get_by_host(host_id)
def save_changes(self, context, updates):
self.dbapi.label_update(self.uuid, updates)
self.dbapi.label_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -59,4 +59,5 @@ class LLDPAgent(base.SysinvObject):
return cls.dbapi.lldp_agent_get(uuid)
def save_changes(self, context, updates):
self.dbapi.lldp_agent_update(self.uuid, updates)
self.dbapi.lldp_agent_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -58,4 +58,5 @@ class LLDPNeighbour(base.SysinvObject):
return cls.dbapi.lldp_neighbour_get(uuid)
def save_changes(self, context, updates):
self.dbapi.lldp_neighbour_update(self.uuid, updates)
self.dbapi.lldp_neighbour_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -35,4 +35,5 @@ class LLDPTLV(base.SysinvObject):
return cls.dbapi.lldp_tlv_get_by_id(id)
def save_changes(self, context, updates):
self.dbapi.lldp_tlv_update(self.id, updates)
self.dbapi.lldp_tlv_update(self.id, # pylint: disable=no-member
updates)

View File

@ -33,4 +33,5 @@ class Load(base.SysinvObject):
return self.dbapi.load_get(uuid)
def save_changes(self, context, updates):
self.dbapi.load_update(self.uuid, updates)
self.dbapi.load_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -46,7 +46,8 @@ class LVG(base.SysinvObject):
return cls.dbapi.ilvg_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ilvg_update(self.uuid, updates)
self.dbapi.ilvg_update(self.uuid, # pylint: disable=no-member
updates)
class ilvg(LVG):

View File

@ -63,4 +63,5 @@ class Memory(base.SysinvObject):
return cls.dbapi.imemory_get(uuid)
def save_changes(self, context, updates):
self.dbapi.imemory_update(self.uuid, updates)
self.dbapi.imemory_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -34,4 +34,5 @@ class Network(base.SysinvObject):
return cls.dbapi.network_get(uuid)
def save_changes(self, context, updates):
self.dbapi.network_update(self.uuid, updates)
self.dbapi.network_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -98,7 +98,7 @@ class OAMNetwork(base.SysinvObject):
:param context: Security context
"""
network = self.dbapi._network_get(self.uuid)
network = self.dbapi._network_get(self.uuid) # pylint: disable=no-member
address_pool = network.address_pool
addresses = OAMNetwork._get_pool_addresses(address_pool)

View File

@ -31,4 +31,5 @@ class Node(base.SysinvObject):
return cls.dbapi.inode_get(uuid)
def save_changes(self, context, updates):
self.dbapi.inode_update(self.uuid, updates)
self.dbapi.inode_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -37,4 +37,5 @@ class NTP(base.SysinvObject):
return cls.dbapi.intp_get(uuid)
def save_changes(self, context, updates):
self.dbapi.intp_update(self.uuid, updates)
self.dbapi.intp_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -48,4 +48,5 @@ class Partition(base.SysinvObject):
return cls.dbapi.partition_get(uuid)
def save_changes(self, context, updates):
self.dbapi.partition_update(self.uuid, updates)
self.dbapi.partition_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -50,4 +50,5 @@ class PCIDevice(base.SysinvObject):
return cls.dbapi.pci_device_get(uuid)
def save_changes(self, context, updates):
self.dbapi.pci_device_update(self.uuid, updates)
self.dbapi.pci_device_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -48,4 +48,5 @@ class Peer(base.SysinvObject):
return cls.dbapi.peer_get(uuid)
def save_changes(self, context, updates):
self.dbapi.peer_update(self.uuid, updates)
self.dbapi.peer_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -30,4 +30,5 @@ class EthernetPort(port.Port):
return cls.dbapi.ethernet_port_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ethernet_port_update(self.uuid, updates)
self.dbapi.ethernet_port_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -63,4 +63,5 @@ class Profile(base.SysinvObject):
return cls.dbapi.ihost_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ihost_update(self.uuid, updates)
self.dbapi.ihost_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -37,4 +37,5 @@ class PTP(base.SysinvObject):
return cls.dbapi.ptp_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ptp_update(self.uuid, updates)
self.dbapi.ptp_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -50,4 +50,5 @@ class PV(base.SysinvObject):
return cls.dbapi.ipv_get(uuid)
def save_changes(self, context, updates):
self.dbapi.ipv_update(self.uuid, updates)
self.dbapi.ipv_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -42,4 +42,5 @@ class RemoteLogging(base.SysinvObject):
return cls.dbapi.remotelogging_get(uuid)
def save_changes(self, context, updates):
self.dbapi.remotelogging_update(self.uuid, updates)
self.dbapi.remotelogging_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -42,4 +42,5 @@ class Route(base.SysinvObject):
return cls.dbapi.route_get(uuid)
def save_changes(self, context, updates):
self.dbapi.route_update(self.uuid, updates)
self.dbapi.route_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -31,4 +31,5 @@ class SDNController(base.SysinvObject):
return cls.dbapi.sdn_controller_get(uuid)
def save_changes(self, context, updates):
self.dbapi.sdn_controller_update(self.uuid, updates)
self.dbapi.sdn_controller_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -80,4 +80,5 @@ class Sensor(base.SysinvObject):
return cls.dbapi.isensor_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensor_update(self.uuid, updates)
self.dbapi.isensor_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -64,4 +64,5 @@ class SensorAnalog(base.SysinvObject):
return cls.dbapi.isensor_analog_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensor_analog_update(self.uuid, updates)
self.dbapi.isensor_analog_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -53,4 +53,5 @@ class SensorDiscrete(base.SysinvObject):
return cls.dbapi.isensor_discrete_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensor_discrete_update(self.uuid, updates)
self.dbapi.isensor_discrete_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -80,4 +80,5 @@ class SensorGroup(base.SysinvObject):
return cls.dbapi.isensorgroup_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensorgroup_update(self.uuid, updates)
self.dbapi.isensorgroup_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -62,4 +62,5 @@ class SensorGroupAnalog(base.SysinvObject):
return cls.dbapi.isensorgroup_analog_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensorgroup_analog_update(self.uuid, updates)
self.dbapi.isensorgroup_analog_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -51,4 +51,5 @@ class SensorGroupDiscrete(base.SysinvObject):
return cls.dbapi.isensorgroup_discrete_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isensorgroup_discrete_update(self.uuid, updates)
self.dbapi.isensorgroup_discrete_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -30,4 +30,5 @@ class Service(base.SysinvObject):
return cls.dbapi.service_get(name)
def save_changes(self, context, updates):
self.dbapi.service_update(self.name, updates)
self.dbapi.service_update(self.name, # pylint: disable=no-member
updates)

View File

@ -32,4 +32,5 @@ class ServiceParameter(base.SysinvObject):
return cls.dbapi.service_parameter_get(uuid)
def save_changes(self, context, updates):
self.dbapi.service_parameter_update(self.uuid, updates)
self.dbapi.service_parameter_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -37,4 +37,5 @@ class SoftwareUpgrade(base.SysinvObject):
return cls.dbapi.software_upgrade_get(uuid)
def save_changes(self, context, updates):
self.dbapi.software_upgrade_update(self.uuid, updates)
self.dbapi.software_upgrade_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -115,4 +115,5 @@ class Storage(base.SysinvObject):
return cls.dbapi.istor_get(uuid)
def save_changes(self, context, updates):
self.dbapi.istor_update(self.uuid, updates)
self.dbapi.istor_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -40,4 +40,5 @@ class StorageCeph(storage_backend.StorageBackend):
return cls.dbapi.storage_ceph_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_ceph_update(self.uuid, updates)
self.dbapi.storage_ceph_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -25,4 +25,5 @@ class StorageCephExternal(storage_backend.StorageBackend):
return cls.dbapi.storage_ceph_external_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_ceph_external_update(self.uuid, updates)
self.dbapi.storage_ceph_external_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -24,4 +24,5 @@ class StorageExternal(storage_backend.StorageBackend):
return cls.dbapi.storage_external_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_external_update(self.uuid, updates)
self.dbapi.storage_external_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -24,4 +24,5 @@ class StorageFile(storage_backend.StorageBackend):
return cls.dbapi.storage_file_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_file_update(self.uuid, updates)
self.dbapi.storage_file_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -24,4 +24,5 @@ class StorageLVM(storage_backend.StorageBackend):
return cls.dbapi.storage_lvm_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_lvm_update(self.uuid, updates)
self.dbapi.storage_lvm_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -105,4 +105,5 @@ class StorageTier(base.SysinvObject):
return cls.dbapi.storage_tier_get(uuid)
def save_changes(self, context, updates):
self.dbapi.storage_tier_update(self.uuid, updates)
self.dbapi.storage_tier_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -42,4 +42,5 @@ class System(base.SysinvObject):
return cls.dbapi.isystem_get(uuid)
def save_changes(self, context, updates):
self.dbapi.isystem_update(self.uuid, updates)
self.dbapi.isystem_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -27,4 +27,5 @@ class TPMConfig(base.SysinvObject):
return cls.dbapi.tpmconfig_get(uuid)
def save_changes(self, context, updates):
self.dbapi.tpmconfig_update(self.uuid, updates)
self.dbapi.tpmconfig_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -34,4 +34,5 @@ class TPMDevice(base.SysinvObject):
return cls.dbapi.tpmdevice_get(uuid)
def save_changes(self, context, updates):
self.dbapi.tpmdevice_update(self.uuid, updates)
self.dbapi.tpmdevice_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -36,4 +36,5 @@ class TrapDest(base.SysinvObject):
return cls.dbapi.itrapdest_get_by_ip(ip)
def save_changes(self, context, updates):
self.dbapi.itrapdest_update(self.uuid, updates)
self.dbapi.itrapdest_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -39,4 +39,5 @@ class User(base.SysinvObject):
return cls.dbapi.iuser_get(uuid)
def save_changes(self, context, updates):
self.dbapi.iuser_update(self.uuid, updates)
self.dbapi.iuser_update(self.uuid, # pylint: disable=no-member
updates)

View File

@ -354,7 +354,7 @@ def cleanup():
global _ENGINE, _MAKER
if _MAKER:
_MAKER.close_all()
_MAKER.close_all() # pylint: disable=no-member
_MAKER = None
if _ENGINE:
_ENGINE.dispose()

View File

@ -157,12 +157,12 @@ class PeriodicTasks(object):
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
for task_name, task in self._periodic_tasks: # pylint: disable=no-member
full_task_name = '.'.join([self.__class__.__name__, task_name])
now = timeutils.utcnow()
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
spacing = self._periodic_spacing[task_name] # pylint: disable=no-member
last_run = self._periodic_last_run[task_name] # pylint: disable=no-member
# If a periodic task is _nearly_ due, then we'll run it early
if spacing is not None and last_run is not None:
@ -176,7 +176,7 @@ class PeriodicTasks(object):
LOG.debug(_("Running periodic task %(full_task_name)s"),
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = timeutils.utcnow()
self._periodic_last_run[task_name] = timeutils.utcnow() # pylint: disable=no-member
try:
task(self, context)

View File

@ -570,7 +570,7 @@ class ParseState(object):
reductions.
"""
for reduction, methname in self.reducers:
for reduction, methname in self.reducers: # pylint: disable=no-member
if (len(self.tokens) >= len(reduction) and
self.tokens[-len(reduction):] == reduction):
# Get the reduction method

View File

@ -1,857 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pprint
import re
import socket
import sys
import types
import uuid
import eventlet
import greenlet
from oslo_config import cfg
from sysinv.openstack.common import excutils
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import importutils
from sysinv.openstack.common import jsonutils
from sysinv.openstack.common import processutils as utils
from sysinv.openstack.common.rpc import common as rpc_common
from functools import reduce
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
Timeout = eventlet.timeout.Timeout
LOG = rpc_common.LOG
RemoteError = rpc_common.RemoteError
RPCException = rpc_common.RPCException
zmq_opts = [
cfg.StrOpt('rpc_zmq_bind_address', default='*',
help='ZeroMQ bind address. Should be a wildcard (*), '
'an ethernet interface, or IP. '
'The "host" option should point or resolve to this '
'address.'),
# The module.Class to use for matchmaking.
cfg.StrOpt(
'rpc_zmq_matchmaker',
default=('sysinv.openstack.common.rpc.'
'matchmaker.MatchMakerLocalhost'),
help='MatchMaker driver',
),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or '
'IP address. Must match "host" option, if running Nova.')
]
CONF = cfg.CONF
CONF.register_opts(zmq_opts)
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
def _serialize(data):
"""
Serialization wrapper
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
try:
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
def _deserialize(data):
"""
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
and connection management.
Can be used as a Context (supports the 'with' statement).
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
# Support failures on sending/receiving on wrong socket type.
self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
self.can_sub = zmq_type in (zmq.SUB, )
# Support list, str, & None for subscribe arg (cast to list)
do_sub = {
list: subscribe,
str: [subscribe],
type(None): []
}[type(subscribe)]
for f in do_sub:
self.subscribe(f)
str_data = {'addr': addr, 'type': self.socket_s(),
'subscribe': subscribe, 'bind': bind}
LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
LOG.debug(_("-> bind: %(bind)s"), str_data)
try:
if bind:
self.sock.bind(addr)
else:
self.sock.connect(addr)
except Exception:
raise RPCException(_("Could not open socket."))
def socket_s(self):
"""Get socket type as string."""
t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
'DEALER')
return dict([(getattr(zmq, t), t) for t in t_enum])[self.type]
def subscribe(self, msg_filter):
"""Subscribe."""
if not self.can_sub:
raise RPCException("Cannot subscribe on this socket.")
LOG.debug(_("Subscribing to %s"), msg_filter)
try:
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
except Exception:
return
self.subscriptions.append(msg_filter)
def unsubscribe(self, msg_filter):
"""Unsubscribe."""
if msg_filter not in self.subscriptions:
return
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
self.subscriptions.remove(msg_filter)
def close(self):
if self.sock is None or self.sock.closed:
return
# We must unsubscribe, or we'll leak descriptors.
if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
except Exception:
pass
self.subscriptions = []
try:
# Default is to linger
self.sock.close()
except Exception:
# While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
return self.sock.recv_multipart()
def send(self, data):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data)
class ZmqClient(object):
"""Client for ZMQ sockets."""
def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if not envelope:
self.outq.send([bytes(x) for x in
(msg_id, topic, 'cast', _serialize(data))])
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send([bytes(x) for x in
((msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)])
def close(self):
self.outq.close()
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.replies = []
super(RpcContext, self).__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
values['replies'] = self.replies
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False):
if ending:
return
self.replies.append(reply)
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
return _serialize(ctx_data)
@classmethod
def unmarshal(self, data):
return RpcContext.from_dict(_deserialize(data))
class InternalContext(object):
"""Used by ConsumerBase as a private context for - methods."""
def __init__(self, proxy):
self.proxy = proxy
self.msg_waiter = None
def _get_response(self, ctx, proxy, topic, data):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', {})
try:
result = proxy.dispatch(
ctx, data['version'], data['method'],
data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
# this may be able to be removed earlier than
# 'I' if ConsumerBase.process were refactored.
if type(msg) is list:
payload = msg[-1]
else:
payload = msg
response = ConsumerBase.normalize_reply(
self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
}, _msg_id=msg_id)
class ConsumerBase(object):
"""Base Consumer."""
def __init__(self):
self.private_ctx = InternalContext(None)
@classmethod
def normalize_reply(self, result, replies):
# TODO(ewindisch): re-evaluate and document this method.
if isinstance(result, types.GeneratorType):
return list(result)
elif replies:
return replies
else:
return [result]
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
# Method starting with - are
# processed internally. (non-valid method name)
method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
return
# Internal method
# uses internal context for safety.
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
proxy.dispatch(ctx, data['version'],
data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
"""
A consumer class implementing a
centralized casting broker (PULL-PUSH)
for RoundRobin requests.
"""
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.subscribe = {}
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
zmq_type_out=None, in_bind=True, out_bind=True,
subscribe=None):
LOG.info(_("Registering reactor"))
if zmq_type_in not in (zmq.PULL, zmq.SUB):
raise RPCException("Bad input socktype")
# Items push in.
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
subscribe=subscribe)
self.proxies[inq] = proxy
self.sockets.append(inq)
LOG.info(_("In reactor registered"))
if not out_addr:
return
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")
# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)
LOG.info(_("Out reactor registered"))
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
for k in self.proxies.keys():
self.threads.append(
self.pool.spawn(_consume, k)
)
def wait(self):
for t in self.threads:
t.wait()
def close(self):
for s in self.sockets:
s.close()
for t in self.threads:
t.kill()
class ZmqProxy(ZmqBaseReactor):
"""
A consumer class implementing a
topic-based proxy, forwarding to
IPC sockets.
"""
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
if topic.startswith('fanout~'):
sock_type = zmq.PUB
topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
# don't trust this input.
if self.badchars.search(topic) is not None:
emsg = _("Topic contained dangerous characters.")
LOG.warn(emsg)
raise RPCException(emsg)
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
self.topic_proxy[topic] = eventlet.queue.LightQueue(
CONF.rpc_zmq_topic_backlog)
self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = next(i)
h[k] = next(i)
except StopIteration:
return h
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
consumer for messages. Can also be
used as a 1:1 proxy
"""
def __init__(self, conf):
super(ZmqReactor, self).__init__(conf)
def consume(self, sock):
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return
proxy = self.proxies[sock]
if data[2] == 'cast': # Legacy protocol
packenv = data[3]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Register with matchmaker.
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
sock_type = zmq.SUB
subscribe = ('', fanout)[type(fanout) == str]
topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
self.topics.append(topic)
def close(self):
_get_matchmaker().stop_heartbeat()
for topic in self.topics:
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
self.reactor.close()
self.topics = []
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
_get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
with Timeout(timeout_cast, exception=rpc_common.Timeout):
try:
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
if 'conn' in vars():
conn.close()
def _call(addr, context, topic, msg, timeout=None,
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies.
msg_id = uuid.uuid4().hex
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
LOG.debug(_("Creating payload"))
# Curry the original request into a reply method.
mcontext = RpcContext.marshal(context)
payload = {
'method': '-reply',
'args': {
'msg_id': msg_id,
'topic': reply_topic,
# TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
LOG.debug(_("Creating queue socket for reply waiter"))
# Messages arriving async.
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies.%s" %
(CONF.rpc_zmq_ipc_dir,
CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))
responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
# It seems we don't need to do all of the following,
# but perhaps it would be useful for multicall?
# One effect of this is that we're checking all
# responses for Exceptions.
for resp in responses:
if isinstance(resp, dict) and 'exc' in resp:
raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
(_topic, ip_addr) = queue
_addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, msg, timeout,
envelope)
def create_connection(conf, new=True):
return Connection(conf)
def multicall(conf, *args, **kwargs):
"""Multiple calls."""
return _multi_send(_call, *args, **kwargs)
def call(conf, *args, **kwargs):
"""Send a message, expect a response."""
data = _multi_send(_call, *args, **kwargs)
return data[-1]
def cast(conf, *args, **kwargs):
"""Send a message expecting no reply."""
_multi_send(_cast, *args, **kwargs)
def fanout_cast(conf, context, topic, msg, **kwargs):
"""Send a message to all listening and expect no reply."""
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic = topic.replace('.', '-')
cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
if ZMQ_CTX:
ZMQ_CTX.term()
ZMQ_CTX = None
global matchmaker
matchmaker = None
def _get_ctxt():
if not zmq:
raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
mm = CONF.rpc_zmq_matchmaker
if mm.endswith('matchmaker.MatchMakerRing'):
mm.replace('matchmaker', 'matchmaker_ring')
LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
' %(new)s instead') % dict(
orig=CONF.rpc_zmq_matchmaker, new=mm))
matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker

View File

@ -1,148 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo_config import cfg
from sysinv.openstack.common import importutils
from sysinv.openstack.common import log as logging
from sysinv.openstack.common.rpc import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default=None,
help='Password for Redis server. (optional)'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
LOG = logging.getLogger(__name__)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""
Return a list of all hosts.
"""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = [host for host in hosts if self.matchmaker.is_alive(topic, host)]
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""
MatchMaker registering and looking-up hosts with a Redis server.
"""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(self.topic_host[host], host)
def is_alive(self, topic, host):
if self.redis.ttl(host) == -1:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.set(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -1,41 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import sys
from oslo_config import cfg
from sysinv.openstack.common import log as logging
from sysinv.openstack.common import rpc
from sysinv.openstack.common.rpc import impl_zmq
CONF = cfg.CONF
CONF.register_opts(rpc.rpc_opts)
CONF.register_opts(impl_zmq.zmq_opts)
def main():
CONF(sys.argv[1:], project='oslo')
logging.setup("oslo")
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()

View File

@ -23,6 +23,7 @@ Utilities with minimum-depends for use in setup.py
from __future__ import print_function
import email
import email.errors
import os
import re
import subprocess
@ -336,7 +337,7 @@ def _get_version_from_pkg_info(package_name):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
except email.errors.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:

Some files were not shown because too many files have changed in this diff Show More