Convert openstack driver to statemachine

This updates the OpenStack driver to use the statemachine framework.

The goal is to revise all remaining drivers to use the statemachine
framework for two reasons:

1) We can dramatically reduce the number of threads in Nodepool which
is our biggest scaling bottleneck.  The OpenStack driver already
includes some work in that direction, but in a way that is unique
to it and not easily shared by other drivers.  The statemachine
framework is an extension of that idea implemented so that every driver
can use it.  This change further reduces the number of threads needed
even for the openstack driver.

2) By unifying all the drivers with a simple interface, we can prepare
to move them into Zuul.

There are a few updates to the statemachine framework to accomodate some
features that only the OpenStack driver used to date.

A number of tests need slight alteration since the openstack driver is
the basis of the "fake" driver used for tests.

Change-Id: Ie59a4e9f09990622b192ad840d9c948db717cce2
This commit is contained in:
James E. Blair 2022-10-24 12:43:26 -07:00 committed by Clark Boylan
parent 3a5db84a33
commit be3edd3e17
30 changed files with 1115 additions and 1387 deletions

View File

@ -256,7 +256,7 @@ class NodePoolCmd(NodepoolApp):
# up in alien list since we can't do anything about them
# anyway.
provider_images = [
image for image in manager.listImages()
image for image in manager.adapter._listImages()
if 'nodepool_build_id' in image['properties']]
except Exception as e:
log.warning("Exception listing alien images for %s: %s"

View File

@ -894,6 +894,7 @@ class ConfigPool(ConfigValue, metaclass=abc.ABCMeta):
self.node_attributes = None
self.priority = None
self.ignore_provider_quota = False
self.azs = None
@classmethod
def getCommonSchemaDict(self):

View File

@ -289,7 +289,7 @@ class AwsAdapter(statemachine.Adapter):
self._running = False
def getCreateStateMachine(self, hostname, label, image_external_id,
metadata, retries, request, log):
metadata, retries, request, az, log):
return AwsCreateStateMachine(self, hostname, label, image_external_id,
metadata, retries, request, log)

View File

@ -176,6 +176,7 @@ class AwsLabel(ConfigValue):
self.iam_instance_profile = label.get('iam-instance-profile', None)
self.tags = label.get('tags', {})
self.dynamic_tags = label.get('dynamic-tags', {})
self.host_key_checking = self.pool.host_key_checking
@staticmethod
def getSchema():

View File

@ -347,7 +347,7 @@ class AzureAdapter(statemachine.Adapter):
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries,
request, log):
request, az, log):
return AzureCreateStateMachine(self, hostname, label,
image_external_id, metadata,
retries, request, log)

View File

@ -172,6 +172,7 @@ class AzureLabel(ConfigValue):
self.dynamic_tags = label.get('dynamic-tags', {})
self.user_data = self._encodeData(label.get('user-data', None))
self.custom_data = self._encodeData(label.get('custom-data', None))
self.host_key_checking = self.pool.host_key_checking
def _encodeData(self, s):
if not s:

View File

@ -54,6 +54,7 @@ class ProviderLabel(ConfigValue):
self.instance_type = None
# The ProviderPool object that owns this label.
self.pool = None
self.host_key_checking = None
def __eq__(self, other):
if isinstance(other, ProviderLabel):

View File

@ -1,34 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Copyright 2022 Acme Gating, LLC
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.config import loader
from nodepool.driver import Driver
from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.fake.config import FakeProviderConfig
from nodepool.driver.fake.provider import FakeProvider
from nodepool.driver.fake.adapter import FakeAdapter
class FakeDriver(Driver):
def __init__(self):
super().__init__()
self.reset()
class FakeDriver(StateMachineDriver):
def reset(self):
self.openstack_config = loader.OpenStackConfig()
def __init__(self):
self.reset()
super().__init__()
def getProviderConfig(self, provider):
return FakeProviderConfig(self, provider)
def getProvider(self, provider_config):
return FakeProvider(provider_config)
def getAdapter(self, provider_config):
return FakeAdapter(provider_config)

View File

@ -1,18 +1,17 @@
# Copyright (C) 2011-2013 OpenStack Foundation
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
@ -20,12 +19,11 @@ import time
import uuid
import openstack.exceptions
from nodepool import exceptions
from nodepool.driver.openstack.provider import OpenStackProvider
from nodepool.driver.fake.handler import FakeNodeRequestHandler
from openstack.cloud.exc import OpenStackCloudCreateException
from nodepool.driver.openstack.adapter import OpenStackAdapter
from nodepool import exceptions
class Dummy(object):
IMAGE = 'Image'
@ -56,6 +54,9 @@ class Dummy(object):
args = ' '.join(args)
return '<%s %s %s>' % (self.__kind, id(self), args)
def __contains__(self, key):
return key in self.__dict__
def __getitem__(self, key, default=None):
return getattr(self, key, default)
@ -90,10 +91,13 @@ class FakeOpenStackCloud(object):
]
if networks is None:
self.ipv6_network_uuid = uuid.uuid4().hex
self.no_auto_ip_network_uuid = uuid.uuid4().hex
networks = [dict(id=uuid.uuid4().hex,
name='fake-public-network-name'),
dict(id=uuid.uuid4().hex,
name='fake-private-network-name'),
dict(id=self.no_auto_ip_network_uuid,
name='no-auto-ip-network-name'),
dict(id=self.ipv6_network_uuid,
name='fake-ipv6-network-name')]
self.networks = networks
@ -112,6 +116,7 @@ class FakeOpenStackCloud(object):
Dummy(Dummy.PORT, id=uuid.uuid4().hex, status='DOWN',
device_owner=None),
]
self._floating_ip_list = []
def _update_quota(self):
self.max_cores, self.max_instances, self.max_ram = FakeOpenStackCloud.\
@ -141,7 +146,10 @@ class FakeOpenStackCloud(object):
addresses = None
# if keyword 'ipv6-uuid' is found in provider config,
# ipv6 address will be available in public addr dict.
auto_ip = True
for nic in nics:
if nic['net-id'] == self.no_auto_ip_network_uuid:
auto_ip = False
if nic['net-id'] != self.ipv6_network_uuid:
continue
addresses = dict(
@ -156,15 +164,20 @@ class FakeOpenStackCloud(object):
interface_ip = 'fake_v6'
break
if not addresses:
addresses = dict(
public=[dict(version=4, addr='fake')],
private=[dict(version=4, addr='fake')]
)
public_v6 = ''
public_v4 = 'fake'
private_v4 = 'fake'
host_id = 'fake'
interface_ip = 'fake'
private_v4 = 'fake'
if auto_ip:
addresses = dict(
public=[dict(version=4, addr='fake')],
private=[dict(version=4, addr='fake')]
)
public_v6 = ''
public_v4 = 'fake'
interface_ip = 'fake'
else:
public_v4 = ''
public_v6 = ''
interface_ip = private_v4
self._update_quota()
over_quota = False
if (instance_type == Dummy.INSTANCE and
@ -190,12 +203,13 @@ class FakeOpenStackCloud(object):
host_id=host_id,
interface_ip=interface_ip,
security_groups=security_groups,
location=Dummy(Dummy.LOCATION, zone=kw.get('az')),
location=Dummy(Dummy.LOCATION, zone=az),
metadata=kw.get('meta', {}),
manager=self,
key_name=kw.get('key_name', None),
should_fail=should_fail,
over_quota=over_quota,
flavor=kw.get('flavor'),
event=threading.Event(),
_kw=kw)
instance_list.append(s)
@ -273,19 +287,22 @@ class FakeOpenStackCloud(object):
def get_server_by_id(self, server_id):
return self.get_server(server_id)
def _clean_floating_ip(self, server):
server.public_v4 = ''
server.public_v6 = ''
server.interface_ip = server.private_v4
return server
def list_floating_ips(self):
return self._floating_ip_list
def wait_for_server(self, server, **kwargs):
while server.status == 'BUILD':
time.sleep(0.1)
auto_ip = kwargs.get('auto_ip')
if not auto_ip:
server = self._clean_floating_ip(server)
return server
def create_floating_ip(self, server):
fip = Dummy('floating_ips',
id=uuid.uuid4().hex,
floating_ip_address='fake',
status='ACTIVE')
self._floating_ip_list.append(fip)
return fip
def _needs_floating_ip(self, server, nat_destination):
return False
def _has_floating_ips(self):
return False
def list_servers(self, bare=False):
return self._server_list
@ -326,8 +343,8 @@ class FakeOpenStackCloud(object):
class FakeUploadFailCloud(FakeOpenStackCloud):
log = logging.getLogger("nodepool.FakeUploadFailCloud")
def __init__(self, times_to_fail=None):
super(FakeUploadFailCloud, self).__init__()
def __init__(self, *args, times_to_fail=None, **kw):
super(FakeUploadFailCloud, self).__init__(*args, **kw)
self.times_to_fail = times_to_fail
self.times_failed = 0
@ -344,14 +361,16 @@ class FakeUploadFailCloud(FakeOpenStackCloud):
class FakeLaunchAndGetFaultCloud(FakeOpenStackCloud):
log = logging.getLogger("nodepool.FakeLaunchAndGetFaultCloud")
def __init__(self):
super().__init__()
def wait_for_server(self, server, **kwargs):
def create_server(self, *args, **kwargs):
# OpenStack provider launch code specifically looks for 'quota' in
# the failure message.
server = super().create_server(
*args, **kwargs,
done_status='ERROR')
# Don't wait for the async update
server.status = 'ERROR'
server.fault = {'message': 'quota server fault'}
raise Exception("wait_for_server failure")
raise OpenStackCloudCreateException('server', server.id)
class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud):
@ -366,16 +385,18 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud):
self.launch_success = False
self.delete_success = False
def wait_for_server(self, **kwargs):
def create_server(self, *args, **kwargs):
if self.times_to_fail_launch is None:
raise Exception("Test fail server launch.")
if self.times_failed_launch < self.times_to_fail_launch:
self.times_failed_launch += 1
raise exceptions.ServerDeleteException("Test fail server launch.")
# Simulate a failure after the server record is created
ret = super().create_server(*args, **kwargs, done_status='ERROR')
ret.fault = {'message': 'expected error'}
return ret
else:
self.launch_success = True
return super(FakeLaunchAndDeleteFailCloud,
self).wait_for_server(**kwargs)
return super().create_server(*args, **kwargs)
def delete_server(self, *args, **kwargs):
if self.times_to_fail_delete is None:
@ -385,8 +406,7 @@ class FakeLaunchAndDeleteFailCloud(FakeOpenStackCloud):
raise exceptions.ServerDeleteException("Test fail server delete.")
else:
self.delete_success = True
return super(FakeLaunchAndDeleteFailCloud,
self).delete_server(*args, **kwargs)
return super().delete_server(*args, **kwargs)
class FakeDeleteImageFailCloud(FakeOpenStackCloud):
@ -404,31 +424,23 @@ class FakeDeleteImageFailCloud(FakeOpenStackCloud):
self).delete_image(*args, **kwargs)
class FakeProvider(OpenStackProvider):
class FakeAdapter(OpenStackAdapter):
fake_cloud = FakeOpenStackCloud
def __init__(self, provider):
def __init__(self, provider_config):
self.createServer_fails = 0
self.createServer_fails_with_external_id = 0
self.__client = FakeProvider.fake_cloud()
super(FakeProvider, self).__init__(provider)
self.__client = FakeAdapter.fake_cloud()
super().__init__(provider_config)
def _getClient(self):
return self.__client
def createServer(self, *args, **kwargs):
def _createServer(self, *args, **kwargs):
while self.createServer_fails:
self.createServer_fails -= 1
raise Exception("Expected createServer exception")
while self.createServer_fails_with_external_id:
self.createServer_fails_with_external_id -= 1
raise OpenStackCloudCreateException('server', 'fakeid')
return super(FakeProvider, self).createServer(*args, **kwargs)
def getRequestHandler(self, poolworker, request):
return FakeNodeRequestHandler(poolworker, request)
def start(self, zk_conn):
if self.provider.region_name == 'broken-region':
raise Exception("Broken cloud config")
super().start(zk_conn)
return super()._createServer(*args, **kwargs)

View File

@ -1,19 +0,0 @@
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler
class FakeNodeRequestHandler(OpenStackNodeRequestHandler):
launcher_id = "Fake"

View File

@ -162,7 +162,7 @@ class GceAdapter(statemachine.Adapter):
self.provider.rate)
def getCreateStateMachine(self, hostname, label, image_external_id,
metadata, retries, request, log):
metadata, retries, request, az, log):
return GceCreateStateMachine(self, hostname, label, image_external_id,
metadata, retries, request, log)

View File

@ -61,6 +61,7 @@ class GceLabel(ConfigValue):
self.volume_type = label.get('volume-type', 'pd-standard')
self.volume_size = label.get('volume-size', '10')
self.diskimage = None
self.host_key_checking = self.pool.host_key_checking
class GcePool(ConfigPool):

View File

@ -381,7 +381,7 @@ class IBMVPCAdapter(statemachine.Adapter):
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries,
request, log):
request, az, log):
return IBMVPCCreateStateMachine(self, hostname, label,
image_external_id, metadata,
retries)

View File

@ -162,6 +162,7 @@ class IBMVPCLabel(ConfigValue):
self.profile = label['profile']
self.user_data = label.get('user-data', None)
self.host_key_checking = self.pool.host_key_checking
@staticmethod
def getSchema():

View File

@ -277,7 +277,7 @@ class MetastaticAdapter(statemachine.Adapter):
def getCreateStateMachine(self, hostname, label,
image_external_id, metadata, retries,
request, log):
request, az, log):
return MetastaticCreateStateMachine(self, hostname, label,
image_external_id, metadata,
retries)

View File

@ -45,6 +45,7 @@ class MetastaticLabel(ConfigValue):
self.cloud_image = MetastaticCloudImage()
self.max_parallel_jobs = label.get('max-parallel-jobs', 1)
self.grace_time = label.get('grace-time', 60)
self.host_key_checking = self.pool.host_key_checking
@staticmethod
def getSchema():

View File

@ -1,34 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Copyright 2022 Acme Gating, LLC
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.config import loader
from nodepool.driver import Driver
from nodepool.driver.statemachine import StateMachineDriver
from nodepool.driver.openstack.config import OpenStackProviderConfig
from nodepool.driver.openstack.provider import OpenStackProvider
from nodepool.driver.openstack.adapter import OpenStackAdapter
class OpenStackDriver(Driver):
def __init__(self):
super().__init__()
self.reset()
class OpenStackDriver(StateMachineDriver):
def reset(self):
self.openstack_config = loader.OpenStackConfig()
def __init__(self):
self.reset()
super().__init__()
def getProviderConfig(self, provider):
return OpenStackProviderConfig(self, provider)
def getProvider(self, provider_config):
return OpenStackProvider(provider_config)
def getAdapter(self, provider_config):
return OpenStackAdapter(provider_config)

View File

@ -0,0 +1,801 @@
# Copyright (C) 2011-2013 OpenStack Foundation
# Copyright 2017 Red Hat
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
import functools
import logging
import time
import operator
import cachetools.func
import openstack
from nodepool.driver.utils import QuotaInformation
from nodepool.driver import statemachine
from nodepool import exceptions
from nodepool import stats
from nodepool import version
CACHE_TTL = 10
class OpenStackInstance(statemachine.Instance):
def __init__(self, provider, server, quota):
super().__init__()
self.external_id = server.id
self.metadata = server.metadata
self.private_ipv4 = server.private_v4
self.private_ipv6 = None
self.public_ipv4 = server.public_v4
self.public_ipv6 = server.public_v6
self.host_id = server.host_id
self.cloud = provider.cloud_config.name
self.region = provider.region_name
self.az = server.location.zone
self.interface_ip = server.interface_ip
self.quota = quota
def getQuotaInformation(self):
return self.quota
class OpenStackResource(statemachine.Resource):
def __init__(self, metadata, type, id):
super().__init__(metadata)
self.type = type
self.id = id
class OpenStackDeleteStateMachine(statemachine.StateMachine):
FLOATING_IP_DELETING = 'deleting floating ip'
SERVER_DELETE = 'delete server'
SERVER_DELETING = 'deleting server'
COMPLETE = 'complete'
def __init__(self, adapter, external_id, log):
self.log = log
super().__init__()
self.adapter = adapter
self.external_id = external_id
self.floating_ips = None
def advance(self):
if self.state == self.START:
self.server = self.adapter._getServer(self.external_id)
if (self.server and
self.adapter._hasFloatingIps() and
self.server.addresses):
self.floating_ips = self.adapter._getFloatingIps(self.server)
for fip in self.floating_ips:
self.adapter._deleteFloatingIp(fip)
self.state = self.FLOATING_IP_DELETING
if not self.floating_ips:
self.state = self.SERVER_DELETE
if self.state == self.FLOATING_IP_DELETING:
fips = []
for fip in self.floating_ips:
fip = self.adapter._refreshFloatingIpDelete(fip)
if not fip or fip['status'] == 'DOWN':
fip = None
if fip:
fips.append(fip)
self.floating_ips = fips
if self.floating_ips:
return
else:
self.state = self.SERVER_DELETE
if self.state == self.SERVER_DELETE:
self.adapter._deleteServer(self.external_id)
self.state = self.SERVER_DELETING
if self.state == self.SERVER_DELETING:
self.server = self.adapter._refreshServerDelete(self.server)
if self.server:
return
else:
self.state = self.COMPLETE
if self.state == self.COMPLETE:
self.complete = True
class OpenStackCreateStateMachine(statemachine.StateMachine):
SERVER_CREATING_SUBMIT = 'submit creating server'
SERVER_CREATING = 'creating server'
SERVER_RETRY = 'retrying server creation'
SERVER_RETRY_DELETING = 'deleting server for retry'
FLOATING_IP_CREATING = 'creating floating ip'
FLOATING_IP_ATTACHING = 'attaching floating ip'
COMPLETE = 'complete'
def __init__(self, adapter, hostname, label, image_external_id,
metadata, retries, request, az, log):
self.log = log
super().__init__()
self.adapter = adapter
self.provider = adapter.provider
self.retries = retries
self.attempts = 0
self.label = label
self.server = None
self.hostname = hostname
self.request = request
self.az = az
if image_external_id:
self.image_external = image_external_id
diskimage = self.provider.diskimages[label.diskimage.name]
self.config_drive = diskimage.config_drive
image_name = diskimage.name
else:
# launch using unmanaged cloud image
self.config_drive = label.cloud_image.config_drive
if label.cloud_image.image_id:
# Using a dict with the ID bypasses an image search during
# server creation.
self.image_external = dict(id=label.cloud_image.image_id)
else:
self.image_external = label.cloud_image.external_name
image_name = label.cloud_image.name
props = label.instance_properties.copy()
for k, v in label.dynamic_instance_properties.items():
try:
props[k] = v.format(request=self.request.getSafeAttributes())
except Exception:
self.log.exception(
"Error formatting dynamic instance property %s", k)
if not props:
props = None
# Put provider.name and image_name in as groups so that ansible
# inventory can auto-create groups for us based on each of those
# qualities
# Also list each of those values directly so that non-ansible
# consumption programs don't need to play a game of knowing that
# groups[0] is the image name or anything silly like that.
groups_list = [self.provider.name]
groups_list.append(image_name)
groups_list.append(label.name)
meta = dict(
groups=",".join(groups_list),
)
# merge in any instance properties provided from config
if props:
meta.update(props)
# merge nodepool-internal metadata
meta.update(metadata)
self.metadata = meta
self.flavor = self.adapter._findFlavor(
flavor_name=self.label.flavor_name,
min_ram=self.label.min_ram)
self.quota = QuotaInformation.construct_from_flavor(self.flavor)
self.external_id = None
def _handleServerFault(self):
if not self.external_id:
return
try:
server = self.adapter._getServerByIdNow(self.external_id)
if not server:
return
fault = server.get('fault', {}).get('message')
if fault:
self.log.error('Detailed node error: %s', fault)
if 'quota' in fault:
self.quota_exceeded = True
except Exception:
self.log.exception(
'Failed to retrieve node error information:')
def advance(self):
if self.state == self.START:
self.external_id = None
self.quota_exceeded = False
self.create_future = self.adapter._submitApi(
self.adapter._createServer,
self.hostname,
image=self.image_external,
flavor=self.flavor,
key_name=self.label.key_name,
az=self.az,
config_drive=self.config_drive,
networks=self.label.networks,
security_groups=self.label.pool.security_groups,
boot_from_volume=self.label.boot_from_volume,
volume_size=self.label.volume_size,
instance_properties=self.metadata,
userdata=self.label.userdata)
self.state = self.SERVER_CREATING_SUBMIT
if self.state == self.SERVER_CREATING_SUBMIT:
try:
try:
self.server = self.adapter._completeApi(self.create_future)
if self.server is None:
return
self.external_id = self.server.id
self.state = self.SERVER_CREATING
except openstack.cloud.exc.OpenStackCloudCreateException as e:
if e.resource_id:
self.external_id = e.resource_id
self._handleServerFault()
raise
except Exception as e:
self.log.exception("Launch attempt %d/%d failed:",
self.attempts, self.retries)
if 'quota exceeded' in str(e).lower():
self.quota_exceeded = True
if 'number of ports exceeded' in str(e).lower():
self.quota_exceeded = True
self.state = self.SERVER_RETRY
if self.state == self.SERVER_CREATING:
self.server = self.adapter._refreshServer(self.server)
if self.server.status == 'ACTIVE':
if (self.label.pool.auto_floating_ip and
self.adapter._needsFloatingIp(self.server)):
self.floating_ip = self.adapter._createFloatingIp(
self.server)
self.state = self.FLOATING_IP_CREATING
else:
self.state = self.COMPLETE
elif self.server.status == 'ERROR':
if ('fault' in self.server and self.server['fault'] is not None
and 'message' in self.server['fault']):
self.log.error(
"Error in creating the server."
" Compute service reports fault: {reason}".format(
reason=self.server['fault']['message']))
if self.external_id:
try:
self.server = self.adapter._deleteServer(
self.external_id)
except Exception:
self.log.exception("Error deleting server:")
self.server = None
else:
self.server = None
self.state = self.SERVER_RETRY
else:
return
if self.state == self.SERVER_RETRY:
if self.external_id:
try:
self.server = self.adapter._deleteServer(self.external_id)
except Exception:
self.log.exception("Error deleting server:")
# We must keep trying the delete until timeout in
# order to avoid having two servers for the same
# node id.
return
else:
self.server = None
self.state = self.SERVER_RETRY_DELETING
if self.state == self.SERVER_RETRY_DELETING:
self.server = self.adapter._refreshServerDelete(self.server)
if self.server:
return
self.attempts += 1
if self.attempts >= self.retries:
raise Exception("Too many retries")
if self.quota_exceeded:
# A quota exception is not directly recoverable so bail
# out immediately with a specific exception.
self.log.info("Quota exceeded, invalidating quota cache")
raise exceptions.QuotaException("Quota exceeded")
self.state = self.START
return
if self.state == self.FLOATING_IP_CREATING:
self.floating_ip = self.adapter._refreshFloatingIp(
self.floating_ip)
if self.floating_ip.get('port_id', None):
if self.floating_ip['status'] == 'ACTIVE':
self.state = self.FLOATING_IP_ATTACHING
else:
return
else:
self.adapter._attachIpToServer(self.server, self.floating_ip)
self.state = self.FLOATING_IP_ATTACHING
if self.state == self.FLOATING_IP_ATTACHING:
self.server = self.adapter._refreshServer(self.server)
ext_ip = openstack.cloud.meta.get_server_ip(
self.server, ext_tag='floating', public=True)
if ext_ip == self.floating_ip['floating_ip_address']:
self.state = self.COMPLETE
else:
return
if self.state == self.COMPLETE:
self.complete = True
return OpenStackInstance(
self.adapter.provider, self.server, self.quota)
class OpenStackAdapter(statemachine.Adapter):
# If we fail to find an image specified by the config, invalidate
# the image cache after this interval:
IMAGE_CHECK_TIMEOUT = 300
def __init__(self, provider_config):
# Wrap these instance methods with a per-instance LRU cache so
# that we don't leak memory over time when the adapter is
# occasionally replaced.
self._findImage = functools.lru_cache(maxsize=None)(
self._findImage)
self._listFlavors = functools.lru_cache(maxsize=None)(
self._listFlavors)
self._findNetwork = functools.lru_cache(maxsize=None)(
self._findNetwork)
self._listAZs = functools.lru_cache(maxsize=None)(
self._listAZs)
self.log = logging.getLogger(
f"nodepool.OpenStackAdapter.{provider_config.name}")
self.provider = provider_config
workers = 8
self.log.info("Create executor with max workers=%s", workers)
self.api_executor = ThreadPoolExecutor(
thread_name_prefix=f'openstack-api-{provider_config.name}',
max_workers=workers)
self._last_image_check_failure = time.time()
self._last_port_cleanup = None
self._statsd = stats.get_client()
self._client = self._getClient()
def stop(self):
self.api_executor.shutdown()
def getCreateStateMachine(self, hostname, label, image_external_id,
metadata, retries, request, az, log):
return OpenStackCreateStateMachine(
self, hostname, label, image_external_id,
metadata, retries, request, az, log)
def getDeleteStateMachine(self, external_id, log):
return OpenStackDeleteStateMachine(self, external_id, log)
def listResources(self):
for server in self._listServers():
if server.status.lower() == 'deleted':
continue
yield OpenStackResource(server.metadata,
'server', server.id)
# Floating IP and port leakage can't be handled by the
# automatic resource cleanup in cleanupLeakedResources because
# openstack doesn't store metadata on those objects, so we
# call internal cleanup methods here.
if self.provider.port_cleanup_interval:
self._cleanupLeakedPorts()
if self.provider.clean_floating_ips:
self._cleanupFloatingIps()
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == 'server':
self._deleteServer(resource.id)
def listInstances(self):
for server in self._listServers():
if server.status.lower() == 'deleted':
continue
flavor = self._getFlavorFromServer(server)
quota = QuotaInformation.construct_from_flavor(flavor)
yield OpenStackInstance(self.provider, server, quota)
def getQuotaLimits(self):
limits = self._client.get_compute_limits()
return QuotaInformation.construct_from_limits(limits)
def getQuotaForLabel(self, label):
flavor = self._findFlavor(label.flavor_name, label.min_ram)
return QuotaInformation.construct_from_flavor(flavor)
def getAZs(self):
azs = self._listAZs()
if not azs:
# If there are no zones, return a list containing None so that
# random.choice can pick None and pass that to Nova. If this
# feels dirty, please direct your ire to policy.json and the
# ability to turn off random portions of the OpenStack API.
return [None]
return azs
def labelReady(self, label):
if not label.cloud_image:
return False
# If an image ID was supplied, we'll assume it is ready since
# we don't currently have a way of validating that (except during
# server creation).
if label.cloud_image.image_id:
return True
image = self._findImage(label.cloud_image.external_name)
if not image:
self.log.warning(
"Provider %s is configured to use %s as the"
" cloud-image for label %s and that"
" cloud-image could not be found in the"
" cloud." % (self.provider.name,
label.cloud_image.external_name,
label.name))
# If the user insists there should be an image but it
# isn't in our cache, invalidate the cache periodically so
# that we can see new cloud image uploads.
if (time.time() - self._last_image_check_failure >
self.IMAGE_CHECK_TIMEOUT):
self._findImage.cache_clear()
self._last_image_check_failure = time.time()
return False
return True
def uploadImage(self, provider_image, image_name, filename,
image_format, metadata, md5, sha256):
# configure glance and upload image. Note the meta flags
# are provided as custom glance properties
# NOTE: we have wait=True set here. This is not how we normally
# do things in nodepool, preferring to poll ourselves thankyouverymuch.
# However - two things to note:
# - PUT has no aysnc mechanism, so we have to handle it anyway
# - v2 w/task waiting is very strange and complex - but we have to
# block for our v1 clouds anyway, so we might as well
# have the interface be the same and treat faking-out
# a openstacksdk-level fake-async interface later
if not metadata:
metadata = {}
if image_format:
metadata['disk_format'] = image_format
image = self._client.create_image(
name=image_name,
filename=filename,
is_public=False,
wait=True,
md5=md5,
sha256=sha256,
**metadata)
return image.id
def deleteImage(self, external_id):
self.log.debug(f"Deleting image {external_id}")
return self._client.delete_image(external_id)
# Local implementation
def _getClient(self):
rate_limit = None
# nodepool tracks rate limit in time between requests.
# openstacksdk tracks rate limit in requests per second.
# 1/time = requests-per-second.
if self.provider.rate:
rate_limit = 1 / self.provider.rate
return openstack.connection.Connection(
config=self.provider.cloud_config,
use_direct_get=False,
rate_limit=rate_limit,
app_name='nodepool',
app_version=version.version_info.version_string()
)
def _submitApi(self, api, *args, **kw):
return self.api_executor.submit(
api, *args, **kw)
def _completeApi(self, future):
if not future.done():
return None
return future.result()
def _createServer(self, name, image, flavor,
az=None, key_name=None, config_drive=True,
networks=None, security_groups=None,
boot_from_volume=False, volume_size=50,
instance_properties=None, userdata=None):
if not networks:
networks = []
if not isinstance(image, dict):
# if it's a dict, we already have the cloud id. If it's not,
# we don't know if it's name or ID so need to look it up
image = self._findImage(image)
create_args = dict(name=name,
image=image,
flavor=flavor,
config_drive=config_drive)
if boot_from_volume:
create_args['boot_from_volume'] = boot_from_volume
create_args['volume_size'] = volume_size
# NOTE(pabelanger): Always cleanup volumes when we delete a server.
create_args['terminate_volume'] = True
if key_name:
create_args['key_name'] = key_name
if az:
create_args['availability_zone'] = az
if security_groups:
create_args['security_groups'] = security_groups
if userdata:
create_args['userdata'] = userdata
nics = []
for network in networks:
net_id = self._findNetwork(network)['id']
nics.append({'net-id': net_id})
if nics:
create_args['nics'] = nics
if instance_properties:
create_args['meta'] = instance_properties
try:
return self._client.create_server(wait=False, **create_args)
except openstack.exceptions.BadRequestException:
# We've gotten a 400 error from nova - which means the request
# was malformed. The most likely cause of that, unless something
# became functionally and systemically broken, is stale az, image
# or flavor cache. Log a message, invalidate the caches so that
# next time we get new caches.
self.log.info(
"Clearing az, flavor and image caches due to 400 error "
"from nova")
self._findImage.cache_clear()
self._listFlavors.cache_clear()
self._findNetwork.cache_clear()
self._listAZs.cache_clear()
raise
# This method is wrapped with an LRU cache in the constructor.
def _listAZs(self):
return self._client.list_availability_zone_names()
# This method is wrapped with an LRU cache in the constructor.
def _findImage(self, name):
return self._client.get_image(name, filters={'status': 'active'})
# This method is wrapped with an LRU cache in the constructor.
def _listFlavors(self):
return self._client.list_flavors(get_extra=False)
# This method is only used by the nodepool alien-image-list
# command and only works with the openstack driver.
def _listImages(self):
return self._client.list_images()
def _getFlavors(self):
flavors = self._listFlavors()
flavors.sort(key=operator.itemgetter('ram'))
return flavors
def _findFlavorByName(self, flavor_name):
for f in self._getFlavors():
if flavor_name in (f['name'], f['id']):
return f
raise Exception("Unable to find flavor: %s" % flavor_name)
def _findFlavorByRam(self, min_ram, flavor_name):
for f in self._getFlavors():
if (f['ram'] >= min_ram
and (not flavor_name or flavor_name in f['name'])):
return f
raise Exception("Unable to find flavor with min ram: %s" % min_ram)
def _findFlavorById(self, flavor_id):
for f in self._getFlavors():
if f['id'] == flavor_id:
return f
raise Exception("Unable to find flavor with id: %s" % flavor_id)
def _findFlavor(self, flavor_name, min_ram):
if min_ram:
return self._findFlavorByRam(min_ram, flavor_name)
else:
return self._findFlavorByName(flavor_name)
# This method is wrapped with an LRU cache in the constructor.
def _findNetwork(self, name):
network = self._client.get_network(name)
if not network:
raise Exception("Unable to find network %s in provider %s" % (
name, self.provider.name))
return network
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listServers(self):
return self._client.list_servers()
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listFloatingIps(self):
return self._client.list_floating_ips()
def _refreshServer(self, obj):
for server in self._listServers():
if server.id == obj.id:
return server
return obj
def _getServer(self, external_id):
for server in self._listServers():
if server.id == external_id:
return server
return None
def _getServerByIdNow(self, server_id):
# A synchronous get server by id. Only to be used in error
# handling where we can't wait for the list to update.
return self._client.get_server_by_id(server_id)
def _refreshServerDelete(self, obj):
if obj is None:
return obj
for server in self._listServers():
if server.id == obj.id:
if server.status.lower() == 'deleted':
return None
return server
return None
def _refreshFloatingIp(self, obj):
for fip in self._listFloatingIps():
if fip.id == obj.id:
return fip
return obj
def _refreshFloatingIpDelete(self, obj):
if obj is None:
return obj
for fip in self._listFloatingIps():
if fip.id == obj.id:
if fip.status == 'DOWN':
return None
return fip
return obj
def _needsFloatingIp(self, server):
return self._client._needs_floating_ip(
server=server, nat_destination=None)
def _createFloatingIp(self, server):
return self._client.create_floating_ip(server=server, wait=True)
def _attachIpToServer(self, server, fip):
# skip_attach is ignored for nova, which is the only time we
# should actually call this method.
return self._client._attach_ip_to_server(
server=server, floating_ip=fip,
skip_attach=True)
def _hasFloatingIps(self):
return self._client._has_floating_ips()
def _getFloatingIps(self, server):
fips = openstack.cloud.meta.find_nova_interfaces(
server['addresses'], ext_tag='floating')
fips = [self._client.get_floating_ip(
id=None, filters={'floating_ip_address': fip['addr']})
for fip in fips]
return fips
def _deleteFloatingIp(self, fip):
self._client.delete_floating_ip(fip['id'], retry=0)
def _deleteServer(self, external_id):
self._client.delete_server(external_id)
def _getFlavorFromServer(self, server):
# In earlier versions of nova or the sdk, flavor has just an id.
# In later versions it returns the information we're looking for.
# If we get the information we want, we do not need to try to
# lookup the flavor in our list.
if hasattr(server.flavor, 'vcpus'):
return server.flavor
else:
return self._findFlavorById(server.flavor.id)
# The port cleanup logic. We don't get tags or metadata, so we
# have to figure this out on our own.
# This method is not cached
def _listPorts(self, status=None):
'''
List known ports.
:param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'.
'''
if status:
ports = self._client.list_ports(filters={'status': status})
else:
ports = self._client.list_ports()
return ports
def _filterComputePorts(self, ports):
'''
Return a list of compute ports (or no device owner).
We are not interested in ports for routers or DHCP.
'''
ret = []
for p in ports:
if (p.device_owner is None or p.device_owner == '' or
p.device_owner.startswith("compute:")):
ret.append(p)
return ret
def _cleanupLeakedPorts(self):
if not self._last_port_cleanup:
self._last_port_cleanup = time.monotonic()
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
return
# Return if not enough time has passed between cleanup
last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
if last_check_in_secs <= self.provider.port_cleanup_interval:
return
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
current_set = set([p.id for p in ports])
remove_set = current_set & self._down_ports
removed_count = 0
for port_id in remove_set:
try:
self._deletePort(port_id)
except Exception:
self.log.exception("Exception deleting port %s in %s:",
port_id, self.provider.name)
else:
removed_count += 1
self.log.debug("Removed DOWN port %s in %s",
port_id, self.provider.name)
if self._statsd and removed_count:
key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
self._statsd.incr(key, removed_count)
self._last_port_cleanup = time.monotonic()
# Rely on OpenStack to tell us the down ports rather than doing our
# own set adjustment.
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
def _deletePort(self, port_id):
self._client.delete_port(port_id)
def _cleanupFloatingIps(self):
did_clean = self._client.delete_unattached_floating_ips()
if did_clean:
# some openstacksdk's return True if any port was
# cleaned, rather than the count. Just set it to 1 to
# indicate something happened.
if type(did_clean) == bool:
did_clean = 1
if self._statsd:
key = ('nodepool.provider.%s.leaked.floatingips'
% self.provider.name)
self._statsd.incr(key, did_clean)

View File

@ -29,11 +29,11 @@ class ProviderDiskImage(ConfigValue):
self.config_drive = None
self.connection_type = None
self.connection_port = None
self.username = None
self.python_path = None
self.shell_type = None
self.meta = None
def __repr__(self):
return "<ProviderDiskImage %s>" % self.name
class ProviderCloudImage(ConfigValue):
def __init__(self):
@ -47,9 +47,6 @@ class ProviderCloudImage(ConfigValue):
self.connection_type = None
self.connection_port = None
def __repr__(self):
return "<ProviderCloudImage %s>" % self.name
@property
def external_name(self):
'''Human readable version of external.'''
@ -76,28 +73,6 @@ class ProviderLabel(ConfigValue):
# The ProviderPool object that owns this label.
self.pool = None
def __eq__(self, other):
if isinstance(other, ProviderLabel):
# NOTE(Shrews): We intentionally do not compare 'pool' here
# since this causes recursive checks with ProviderPool.
return (other.diskimage == self.diskimage and
other.cloud_image == self.cloud_image and
other.min_ram == self.min_ram and
other.flavor_name == self.flavor_name and
other.key_name == self.key_name and
other.name == self.name and
other.console_log == self.console_log and
other.boot_from_volume == self.boot_from_volume and
other.volume_size == self.volume_size and
other.instance_properties == self.instance_properties and
other.userdata == self.userdata and
other.networks == self.networks and
other.host_key_checking == self.host_key_checking)
return False
def __repr__(self):
return "<ProviderLabel %s>" % self.name
class ProviderPool(ConfigPool):
ignore_equality = ['provider']
@ -112,6 +87,7 @@ class ProviderPool(ConfigPool):
self.security_groups = None
self.auto_floating_ip = True
self.host_key_checking = True
self.use_internal_ip = False
self.labels = None
# The OpenStackProviderConfig object that owns this pool.
self.provider = None
@ -119,9 +95,6 @@ class ProviderPool(ConfigPool):
# Initialize base class attributes
super().__init__()
def __repr__(self):
return "<ProviderPool %s>" % self.name
def load(self, pool_config, full_config, provider):
'''
Load pool configuration options.
@ -259,6 +232,9 @@ class OpenStackProviderConfig(ProviderConfig):
diskimage.image_types.add(self.image_type)
i.pause = bool(image.get('pause', False))
i.config_drive = image.get('config-drive', None)
i.username = diskimage.username
i.python_path = diskimage.python_path
i.shell_type = diskimage.shell_type
i.connection_type = image.get('connection-type', 'ssh')
i.connection_port = image.get(
'connection-port',

View File

@ -1,477 +0,0 @@
# Copyright (C) 2011-2014 OpenStack Foundation
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import math
import pprint
import random
from kazoo import exceptions as kze
import openstack
from nodepool import exceptions
from nodepool import nodeutils as utils
from nodepool.zk import zookeeper as zk
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler
class OpenStackNodeLauncher(NodeLauncher):
def __init__(self, handler, node, provider_config, provider_label,
request):
'''
Initialize the launcher.
:param OpenStackNodeRequestHandler handler: The handler object.
:param Node node: A Node object describing the node to launch.
:param ProviderConfig provider_config: A ProviderConfig object
describing the provider launching this node.
:param ProviderLabel provider_label: A ProviderLabel object
describing the label to use for the node.
:param NodeRequest request: The NodeRequest that prompted the
launch.
'''
super().__init__(handler, node, provider_config)
# Number of times to retry failed launches.
self._retries = provider_config.launch_retries
self.label = provider_label
self.pool = provider_label.pool
self.request = request
def _logConsole(self, server_id, hostname):
if not self.label.console_log:
return
console = self.handler.manager.getServerConsole(server_id)
if console:
self.log.info('Console log from hostname %s:' % hostname)
for line in console.splitlines():
self.log.info(line.rstrip())
def _launchNode(self):
if self.label.diskimage:
diskimage = self.provider_config.diskimages[
self.label.diskimage.name]
else:
diskimage = None
if diskimage:
# launch using diskimage
cloud_image = self.handler.zk.getMostRecentImageUpload(
diskimage.name, self.provider_config.name)
if not cloud_image:
raise exceptions.LaunchNodepoolException(
"Unable to find current cloud image %s in %s" %
(diskimage.name, self.provider_config.name)
)
config_drive = diskimage.config_drive
# Using a dict with the ID bypasses an image search during
# server creation.
image_external = dict(id=cloud_image.external_id)
image_id = "{path}/{upload_id}".format(
path=self.handler.zk._imageUploadPath(
cloud_image.image_name,
cloud_image.build_id,
cloud_image.provider_name),
upload_id=cloud_image.id)
image_name = diskimage.name
username = cloud_image.username
python_path = cloud_image.python_path
shell_type = cloud_image.shell_type
connection_type = diskimage.connection_type
connection_port = diskimage.connection_port
else:
# launch using unmanaged cloud image
config_drive = self.label.cloud_image.config_drive
if self.label.cloud_image.image_id:
# Using a dict with the ID bypasses an image search during
# server creation.
image_external = dict(id=self.label.cloud_image.image_id)
else:
image_external = self.label.cloud_image.external_name
image_id = self.label.cloud_image.name
image_name = self.label.cloud_image.name
username = self.label.cloud_image.username
python_path = self.label.cloud_image.python_path
shell_type = self.label.cloud_image.shell_type
connection_type = self.label.cloud_image.connection_type
connection_port = self.label.cloud_image.connection_port
hostname = self.provider_config.hostname_format.format(
label=self.label, provider=self.provider_config, node=self.node
)
self.log.info(
"Creating server with hostname %s in %s from image %s" % (
hostname, self.provider_config.name, image_name))
# NOTE: We store the node ID in the server metadata to use for leaked
# instance detection. We cannot use the external server ID for this
# because that isn't available in ZooKeeper until after the server is
# active, which could cause a race in leak detection.
props = self.label.instance_properties.copy()
for k, v in self.label.dynamic_instance_properties.items():
try:
props[k] = v.format(request=self.request.getSafeAttributes())
except Exception:
self.log.exception(
"Error formatting dynamic instance property %s", k)
if not props:
props = None
try:
server = self.handler.manager.createServer(
hostname,
image=image_external,
min_ram=self.label.min_ram,
flavor_name=self.label.flavor_name,
key_name=self.label.key_name,
az=self.node.az,
config_drive=config_drive,
nodepool_node_id=self.node.id,
nodepool_node_label=self.node.type[0],
nodepool_image_name=image_name,
nodepool_pool_name=self.node.pool,
networks=self.label.networks,
security_groups=self.pool.security_groups,
boot_from_volume=self.label.boot_from_volume,
volume_size=self.label.volume_size,
instance_properties=props,
userdata=self.label.userdata)
except openstack.cloud.exc.OpenStackCloudCreateException as e:
if e.resource_id:
self.node.external_id = e.resource_id
# The outer exception handler will handle storing the
# node immediately after this.
raise
self.node.external_id = server.id
self.node.hostname = hostname
self.node.image_id = image_id
pool = self.handler.provider.pools.get(self.node.pool)
resources = self.handler.manager.quotaNeededByLabel(
self.node.type[0], pool)
self.node.resources = resources.get_resources()
if username:
self.node.username = username
self.node.python_path = python_path
self.node.shell_type = shell_type
self.node.connection_type = connection_type
self.node.connection_port = connection_port
# Checkpoint save the updated node info
self.zk.storeNode(self.node)
self.log.debug("Waiting for server %s" % server.id)
server = self.handler.manager.waitForServer(
server, self.provider_config.launch_timeout,
auto_ip=self.pool.auto_floating_ip)
if server.status != 'ACTIVE':
raise exceptions.LaunchStatusException("Server %s for node id: %s "
"status: %s" %
(server.id, self.node.id,
server.status))
# If we didn't specify an AZ, set it to the one chosen by Nova.
# Do this after we are done waiting since AZ may not be available
# immediately after the create request.
if not self.node.az:
self.node.az = server.location.zone
interface_ip = server.interface_ip
if not interface_ip:
self.log.debug(
"Server data for failed IP: %s" % pprint.pformat(
server))
raise exceptions.LaunchNetworkException(
"Unable to find public IP of server")
self.node.host_id = server.host_id
self.node.interface_ip = interface_ip
self.node.public_ipv4 = server.public_v4
self.node.public_ipv6 = server.public_v6
self.node.private_ipv4 = server.private_v4
# devstack-gate multi-node depends on private_v4 being populated
# with something. On clouds that don't have a private address, use
# the public.
if not self.node.private_ipv4:
self.node.private_ipv4 = server.public_v4
# Checkpoint save the updated node info
self.zk.storeNode(self.node)
self.log.debug(
"Node is running [region: %s, az: %s, ip: %s ipv4: %s, "
"ipv6: %s, hostid: %s]" %
(self.node.region, self.node.az,
self.node.interface_ip, self.node.public_ipv4,
self.node.public_ipv6, self.node.host_id))
# wait and scan the new node and record in ZooKeeper
host_keys = []
if self.label.host_key_checking:
try:
self.log.debug("Gathering host keys")
# only gather host keys if the connection type is ssh or
# network_cli
gather_host_keys = (
connection_type == 'ssh' or
connection_type == 'network_cli')
host_keys = utils.nodescan(
interface_ip,
timeout=self.provider_config.boot_timeout,
gather_hostkeys=gather_host_keys,
port=connection_port)
if gather_host_keys and not host_keys:
raise exceptions.LaunchKeyscanException(
"Unable to gather host keys")
except exceptions.ConnectionTimeoutException:
self._logConsole(self.node.external_id, self.node.hostname)
raise
self.node.host_keys = host_keys
self.zk.storeNode(self.node)
def launch(self):
attempts = 1
while attempts <= self._retries:
try:
self._launchNode()
break
except kze.SessionExpiredError:
# If we lost our ZooKeeper session, we've lost our node lock
# so there's no need to continue.
raise
except Exception as e:
if attempts <= self._retries:
self.log.exception("Launch attempt %d/%d failed:",
attempts, self._retries)
# If we got an external id we need to fetch the server info
# again in order to retrieve the fault reason as this is not
# included in the server object we already have.
quota_exceeded = False
if self.node.external_id:
try:
server = self.handler.manager.getServerById(
self.node.external_id) or {}
fault = server.get('fault', {}).get('message')
if fault:
self.log.error('Detailed node error: %s', fault)
if 'quota' in fault:
quota_exceeded = True
except Exception:
self.log.exception(
'Failed to retrieve node error information:')
# If we created an instance, delete it.
if self.node.external_id:
deleting_node = zk.Node()
deleting_node.provider = self.node.provider
deleting_node.pool = self.node.pool
deleting_node.type = self.node.type
deleting_node.external_id = self.node.external_id
deleting_node.state = zk.DELETING
self.zk.storeNode(deleting_node)
self.log.info("Node %s scheduled for cleanup",
deleting_node.external_id)
self.node.external_id = None
self.node.public_ipv4 = None
self.node.public_ipv6 = None
self.node.interface_ip = None
self.zk.storeNode(self.node)
if attempts == self._retries:
raise
if 'quota exceeded' in str(e).lower():
quota_exceeded = True
if 'number of ports exceeded' in str(e).lower():
quota_exceeded = True
if quota_exceeded:
# A quota exception is not directly recoverable so bail
# out immediately with a specific exception.
self.log.info("Quota exceeded, invalidating quota cache")
self.handler.manager.invalidateQuotaCache()
raise exceptions.QuotaException("Quota exceeded")
attempts += 1
self.node.state = zk.READY
self.zk.storeNode(self.node)
self.log.info("Node is ready")
class OpenStackNodeRequestHandler(NodeRequestHandler):
def __init__(self, pw, request):
super().__init__(pw, request)
self.chosen_az = None
self._threads = []
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.is_alive():
count += 1
return count
def imagesAvailable(self):
'''
Determines if the requested images are available for this provider.
ZooKeeper is queried for an image uploaded to the provider that is
in the READY state.
:returns: True if it is available, False otherwise.
'''
if self.provider.manage_images:
for label in self.request.node_types:
if self.pool.labels[label].cloud_image:
if not self.manager.labelReady(self.pool.labels[label]):
return False
else:
if not self.zk.getMostRecentImageUpload(
self.pool.labels[label].diskimage.name,
self.provider.name):
return False
return True
def hasRemainingQuota(self, ntype):
needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool)
if not self.pool.ignore_provider_quota:
# Calculate remaining quota which is calculated as:
# quota = <total nodepool quota> - <used quota> - <quota for node>
cloud_quota = self.manager.estimatedNodepoolQuota()
cloud_quota.subtract(
self.manager.estimatedNodepoolQuotaUsed())
cloud_quota.subtract(needed_quota)
self.log.debug("Predicted remaining provider quota: %s",
cloud_quota)
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
pool_quota = QuotaInformation(cores=self.pool.max_cores,
instances=self.pool.max_servers,
ram=self.pool.max_ram,
default=math.inf)
pool_quota.subtract(
self.manager.estimatedNodepoolQuotaUsed(self.pool))
self.log.debug("Current pool quota: %s" % pool_quota)
pool_quota.subtract(needed_quota)
self.log.debug("Predicted remaining pool quota: %s", pool_quota)
return pool_quota.non_negative()
def hasProviderQuota(self, node_types):
needed_quota = QuotaInformation()
for ntype in node_types:
needed_quota.add(
self.manager.quotaNeededByLabel(ntype, self.pool))
if not self.pool.ignore_provider_quota:
cloud_quota = self.manager.estimatedNodepoolQuota()
cloud_quota.subtract(needed_quota)
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
pool_quota = QuotaInformation(cores=self.pool.max_cores,
instances=self.pool.max_servers,
ram=self.pool.max_ram,
default=math.inf)
pool_quota.subtract(needed_quota)
return pool_quota.non_negative()
def checkReusableNode(self, node):
if self.chosen_az and node.az != self.chosen_az:
return False
return True
def nodeReusedNotification(self, node):
"""
We attempt to group the node set within the same provider availability
zone.
For this to work properly, the provider entry in the nodepool
config must list the availability zones. Otherwise, new nodes will be
put in random AZs at nova's whim. The exception being if there is an
existing node in the READY state that we can select for this node set.
Its AZ will then be used for new nodes, as well as any other READY
nodes.
"""
# If we haven't already chosen an AZ, select the
# AZ from this ready node. This will cause new nodes
# to share this AZ, as well.
if not self.chosen_az and node.az:
self.chosen_az = node.az
def setNodeMetadata(self, node):
"""
Select grouping AZ if we didn't set AZ from a selected,
pre-existing node
"""
if not self.chosen_az:
self.chosen_az = random.choice(
self.pool.azs or self.manager.getAZs())
node.az = self.chosen_az
node.cloud = self.provider.cloud_config.name
node.region = self.provider.region_name
def launchesComplete(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY, FAILED
or ABORTED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It's very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
for s in node_states):
return False
return True
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = OpenStackNodeLauncher(self, node, self.provider, label,
self.request)
thd.start()
self._threads.append(thd)

View File

@ -1,659 +0,0 @@
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import operator
import threading
import time
import openstack
from openstack.exceptions import ResourceTimeout
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool import stats
from nodepool import version
from nodepool.zk import zookeeper as zk
# Import entire module to avoid partial-loading, circular import
from nodepool.driver.openstack import handler
IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list
class OpenStackProvider(Provider, QuotaSupport):
log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider")
def __init__(self, provider):
super().__init__()
self.provider = provider
self._images = {}
self._networks = {}
self.__flavors = {} # TODO(gtema): caching
self.__azs = None
self._zk = None
self._down_ports = set()
self._last_port_cleanup = None
self._statsd = stats.get_client()
self.running = False
self._server_list_watcher = threading.Thread(
name='ServerListWatcher', target=self._watchServerList,
daemon=True)
self._server_list_watcher_stop_event = threading.Event()
self._cleanup_queue = {}
self._startup_queue = {}
def start(self, zk_conn):
self.resetClient()
self._zk = zk_conn
self.running = True
self._server_list_watcher.start()
def stop(self):
self.running = False
self._server_list_watcher_stop_event.set()
def idle(self):
pass
def join(self):
self._server_list_watcher.join()
def getRequestHandler(self, poolworker, request):
return handler.OpenStackNodeRequestHandler(poolworker, request)
# TODO(gtema): caching
@property
def _flavors(self):
if not self.__flavors:
self.__flavors = self._getFlavors()
return self.__flavors
def _getClient(self):
rate_limit = None
# nodepool tracks rate limit in time between requests.
# openstacksdk tracks rate limit in requests per second.
# 1/time = requests-per-second.
if self.provider.rate:
rate_limit = 1 / self.provider.rate
return openstack.connection.Connection(
config=self.provider.cloud_config,
use_direct_get=False,
rate_limit=rate_limit,
app_name='nodepool',
app_version=version.version_info.version_string()
)
def getProviderLimits(self):
limits = self._client.get_compute_limits()
return QuotaInformation.construct_from_limits(limits)
def quotaNeededByLabel(self, ntype, pool):
provider_label = pool.labels[ntype]
flavor = self.findFlavor(provider_label.flavor_name,
provider_label.min_ram)
return QuotaInformation.construct_from_flavor(flavor)
def unmanagedQuotaUsed(self):
'''
Sums up the quota used by servers unmanaged by nodepool.
:return: Calculated quota in use by unmanaged servers
'''
flavors = self.listFlavorsById()
used_quota = QuotaInformation()
node_ids = set([n.id for n in self._zk.nodeIterator()])
for server in self.listNodes():
meta = server.get('metadata', {})
nodepool_provider_name = meta.get('nodepool_provider_name')
if (nodepool_provider_name and
nodepool_provider_name == self.provider.name):
# This provider (regardless of the launcher) owns this
# server so it must not be accounted for unmanaged
# quota; unless it has leaked.
nodepool_node_id = meta.get('nodepool_node_id')
# FIXME(tobiash): Add a test case for this
if nodepool_node_id and nodepool_node_id in node_ids:
# It has not leaked.
continue
# In earlier versions of nova or the sdk, flavor has just an id.
# In later versions it returns the information we're looking for.
# If we get the information we want, we do not need to try to
# lookup the flavor in our list.
if hasattr(server.flavor, 'vcpus'):
flavor = server.flavor
else:
flavor = flavors.get(server.flavor.id)
# If we still haven't found the flavor, skip handling this
# server instead of failing completely
if not flavor:
continue
used_quota.add(QuotaInformation.construct_from_flavor(flavor))
return used_quota
def resetClient(self):
self._client = self._getClient()
def _getFlavors(self):
flavors = self.listFlavors()
flavors.sort(key=operator.itemgetter('ram'))
return flavors
# TODO(gtema): These next three methods duplicate logic that is in
# openstacksdk, caching is not enabled there by default
# Remove it when caching is default
def _findFlavorByName(self, flavor_name):
for f in self._flavors:
if flavor_name in (f['name'], f['id']):
return f
raise Exception("Unable to find flavor: %s" % flavor_name)
def _findFlavorByRam(self, min_ram, flavor_name):
for f in self._flavors:
if (f['ram'] >= min_ram
and (not flavor_name or flavor_name in f['name'])):
return f
raise Exception("Unable to find flavor with min ram: %s" % min_ram)
def findFlavor(self, flavor_name, min_ram):
# Note: this will throw an error if the provider is offline
# but all the callers are in threads (they call in via CreateServer) so
# the mainloop won't be affected.
# TODO(gtema): enable commented block when openstacksdk has caching
# enabled by default
# if min_ram:
# return self._client.get_flavor_by_ram(
# ram=min_ram,
# include=flavor_name,
# get_extra=False)
# else:
# return self._client.get_flavor(flavor_name, get_extra=False)
if min_ram:
return self._findFlavorByRam(min_ram, flavor_name)
else:
return self._findFlavorByName(flavor_name)
def findImage(self, name):
if name in self._images:
return self._images[name]
image = self._client.get_image(name, filters={'status': 'active'})
self._images[name] = image
return image
def findNetwork(self, name):
if name in self._networks:
return self._networks[name]
network = self._client.get_network(name)
if not network:
raise Exception("Unable to find network %s in provider %s" % (
name, self.provider.name))
self._networks[name] = network
return network
def deleteImage(self, name, id):
if name in self._images:
del self._images[name]
return self._client.delete_image(id)
def createServer(self, name, image,
flavor_name=None, min_ram=None,
az=None, key_name=None, config_drive=True,
nodepool_node_id=None, nodepool_node_label=None,
nodepool_image_name=None,
nodepool_pool_name=None,
networks=None, security_groups=None,
boot_from_volume=False, volume_size=50,
instance_properties=None, userdata=None):
if not networks:
networks = []
if not isinstance(image, dict):
# if it's a dict, we already have the cloud id. If it's not,
# we don't know if it's name or ID so need to look it up
image = self.findImage(image)
flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram)
create_args = dict(name=name,
image=image,
flavor=flavor,
config_drive=config_drive)
if boot_from_volume:
create_args['boot_from_volume'] = boot_from_volume
create_args['volume_size'] = volume_size
# NOTE(pabelanger): Always cleanup volumes when we delete a server.
create_args['terminate_volume'] = True
if key_name:
create_args['key_name'] = key_name
if az:
create_args['availability_zone'] = az
if security_groups:
create_args['security_groups'] = security_groups
if userdata:
create_args['userdata'] = userdata
nics = []
for network in networks:
net_id = self.findNetwork(network)['id']
nics.append({'net-id': net_id})
if nics:
create_args['nics'] = nics
# Put provider.name and image_name in as groups so that ansible
# inventory can auto-create groups for us based on each of those
# qualities
# Also list each of those values directly so that non-ansible
# consumption programs don't need to play a game of knowing that
# groups[0] is the image name or anything silly like that.
groups_list = [self.provider.name]
if nodepool_image_name:
groups_list.append(nodepool_image_name)
if nodepool_node_label:
groups_list.append(nodepool_node_label)
meta = dict(
groups=",".join(groups_list),
nodepool_provider_name=self.provider.name,
nodepool_pool_name=nodepool_pool_name,
)
# merge in any provided properties
if instance_properties:
meta = {**instance_properties, **meta}
if nodepool_node_id:
meta['nodepool_node_id'] = nodepool_node_id
if nodepool_image_name:
meta['nodepool_image_name'] = nodepool_image_name
if nodepool_node_label:
meta['nodepool_node_label'] = nodepool_node_label
create_args['meta'] = meta
try:
return self._client.create_server(wait=False, **create_args)
except openstack.exceptions.BadRequestException:
# We've gotten a 400 error from nova - which means the request
# was malformed. The most likely cause of that, unless something
# became functionally and systemically broken, is stale az, image
# or flavor cache. Log a message, invalidate the caches so that
# next time we get new caches.
self._images = {}
self.__azs = None
self.__flavors = {} # TODO(gtema): caching
self.log.info(
"Clearing az, flavor and image caches due to 400 error "
"from nova")
raise
def getServer(self, server_id):
return self._client.get_server(server_id)
def getServerById(self, server_id):
return self._client.get_server_by_id(server_id)
def getServerConsole(self, server_id):
try:
return self._client.get_server_console(server_id)
except openstack.exceptions.OpenStackCloudException:
return None
def waitForServer(self, server, timeout=3600, auto_ip=True):
# This method is called from a separate thread per server. In order to
# reduce thread contention we don't call wait_for_server right now
# but put this thread on sleep until the desired instance is either
# in ACTIVE or ERROR state. After that just continue with
# wait_for_server which will continue its magic.
# TODO: log annotation
self.log.debug('Wait for central server creation %s', server.id)
event = threading.Event()
start_time = time.monotonic()
self._startup_queue[server.id] = (event, start_time + timeout)
if not event.wait(timeout=timeout):
# On timeout emit the same exception as wait_for_server would to
timeout_message = "Timeout waiting for the server to come up."
raise ResourceTimeout(timeout_message)
# TODO: log annotation
self.log.debug('Finished wait for central server creation %s',
server.id)
# Re-calculate timeout to account for the duration so far
elapsed = time.monotonic() - start_time
timeout = max(0, timeout - elapsed)
return self._client.wait_for_server(
server=server, auto_ip=auto_ip,
reuse=False, timeout=timeout)
def waitForNodeCleanup(self, server_id, timeout=600):
event = threading.Event()
self._cleanup_queue[server_id] = (event, time.monotonic() + timeout)
if not event.wait(timeout=timeout):
raise exceptions.ServerDeleteException(
"server %s deletion" % server_id)
def createImage(self, server, image_name, meta):
return self._client.create_image_snapshot(
image_name, server, **meta)
def getImage(self, image_id):
return self._client.get_image(image_id, filters={'status': 'active'})
def labelReady(self, label):
if not label.cloud_image:
return False
# If an image ID was supplied, we'll assume it is ready since
# we don't currently have a way of validating that (except during
# server creation).
if label.cloud_image.image_id:
return True
image = self.getImage(label.cloud_image.external_name)
if not image:
self.log.warning(
"Provider %s is configured to use %s as the"
" cloud-image for label %s and that"
" cloud-image could not be found in the"
" cloud." % (self.provider.name,
label.cloud_image.external_name,
label.name))
return False
return True
def uploadImage(self, provider_image, image_name, filename,
image_type=None, meta=None, md5=None, sha256=None):
# configure glance and upload image. Note the meta flags
# are provided as custom glance properties
# NOTE: we have wait=True set here. This is not how we normally
# do things in nodepool, preferring to poll ourselves thankyouverymuch.
# However - two things to note:
# - PUT has no aysnc mechanism, so we have to handle it anyway
# - v2 w/task waiting is very strange and complex - but we have to
# block for our v1 clouds anyway, so we might as well
# have the interface be the same and treat faking-out
# a openstacksdk-level fake-async interface later
if not meta:
meta = {}
if image_type:
meta['disk_format'] = image_type
image = self._client.create_image(
name=image_name,
filename=filename,
is_public=False,
wait=True,
md5=md5,
sha256=sha256,
**meta)
return image.id
def listPorts(self, status=None):
'''
List known ports.
:param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'.
'''
if status:
ports = self._client.list_ports(filters={'status': status})
else:
ports = self._client.list_ports()
return ports
def deletePort(self, port_id):
self._client.delete_port(port_id)
def listImages(self):
return self._client.list_images()
def listFlavors(self):
return self._client.list_flavors(get_extra=False)
def listFlavorsById(self):
flavors = {}
for flavor in self._client.list_flavors(get_extra=False):
flavors[flavor.id] = flavor
return flavors
def listNodes(self):
# list_servers carries the nodepool server list caching logic
return self._client.list_servers()
def deleteServer(self, server_id):
return self._client.delete_server(server_id, delete_ips=True)
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
server = self.getServer(server_id)
if not server:
raise exceptions.NotFound()
self.log.debug('Deleting server %s' % server_id)
self.deleteServer(server_id)
def cleanupLeakedInstances(self):
'''
Delete any leaked server instances.
Remove any servers found in this provider that are not recorded in
the ZooKeeper data.
'''
deleting_nodes = {}
for node in self._zk.nodeIterator():
if node.state == zk.DELETING:
if node.provider != self.provider.name:
continue
if node.provider not in deleting_nodes:
deleting_nodes[node.provider] = []
deleting_nodes[node.provider].append(node.external_id)
for server in self._client.list_servers(bare=True):
meta = server.get('metadata', {})
if 'nodepool_provider_name' not in meta:
continue
if meta['nodepool_provider_name'] != self.provider.name:
# Another launcher, sharing this provider but configured
# with a different name, owns this.
continue
if (self.provider.name in deleting_nodes and
server.id in deleting_nodes[self.provider.name]):
# Already deleting this node
continue
if not self._zk.getNode(meta['nodepool_node_id']):
self.log.warning(
"Marking for delete leaked instance %s (%s) in %s "
"(unknown node id %s)",
server.name, server.id, self.provider.name,
meta['nodepool_node_id']
)
# Create an artifical node to use for deleting the server.
node = zk.Node()
node.external_id = server.id
node.provider = self.provider.name
node.pool = meta.get('nodepool_pool_name')
node.state = zk.DELETING
self._zk.storeNode(node)
if self._statsd:
key = ('nodepool.provider.%s.leaked.nodes'
% self.provider.name)
self._statsd.incr(key)
def filterComputePorts(self, ports):
'''
Return a list of compute ports (or no device owner).
We are not interested in ports for routers or DHCP.
'''
ret = []
for p in ports:
if (p.device_owner is None or p.device_owner == '' or
p.device_owner.startswith("compute:")):
ret.append(p)
return ret
def cleanupLeakedPorts(self):
if not self._last_port_cleanup:
self._last_port_cleanup = time.monotonic()
ports = self.listPorts(status='DOWN')
ports = self.filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
return
# Return if not enough time has passed between cleanup
last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
if last_check_in_secs <= self.provider.port_cleanup_interval:
return
ports = self.listPorts(status='DOWN')
ports = self.filterComputePorts(ports)
current_set = set([p.id for p in ports])
remove_set = current_set & self._down_ports
removed_count = 0
for port_id in remove_set:
try:
self.deletePort(port_id)
except Exception:
self.log.exception("Exception deleting port %s in %s:",
port_id, self.provider.name)
else:
removed_count += 1
self.log.debug("Removed DOWN port %s in %s",
port_id, self.provider.name)
if self._statsd and removed_count:
key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
self._statsd.incr(key, removed_count)
self._last_port_cleanup = time.monotonic()
# Rely on OpenStack to tell us the down ports rather than doing our
# own set adjustment.
ports = self.listPorts(status='DOWN')
ports = self.filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
def cleanupLeakedResources(self):
self.cleanupLeakedInstances()
if self.provider.port_cleanup_interval:
self.cleanupLeakedPorts()
if self.provider.clean_floating_ips:
did_clean = self._client.delete_unattached_floating_ips()
if did_clean:
# some openstacksdk's return True if any port was
# cleaned, rather than the count. Just set it to 1 to
# indicate something happened.
if type(did_clean) == bool:
did_clean = 1
if self._statsd:
key = ('nodepool.provider.%s.leaked.floatingips'
% self.provider.name)
self._statsd.incr(key, did_clean)
def getAZs(self):
if self.__azs is None:
self.__azs = self._client.list_availability_zone_names()
if not self.__azs:
# If there are no zones, return a list containing None so that
# random.choice can pick None and pass that to Nova. If this
# feels dirty, please direct your ire to policy.json and the
# ability to turn off random portions of the OpenStack API.
self.__azs = [None]
return self.__azs
def _watchServerList(self):
log = logging.getLogger(
"nodepool.driver.openstack.OpenStackProvider.watcher")
while self.running:
if self._server_list_watcher_stop_event.wait(5):
# We're stopping now so don't wait with any thread for node
# deletion.
for event, _ in self._cleanup_queue.values():
event.set()
for event, _ in self._startup_queue.values():
event.set()
break
if not self._cleanup_queue and not self._startup_queue:
# No server deletion to wait for so check can be skipped
continue
try:
log.debug('Get server list')
start = time.monotonic()
# List bare to avoid neutron calls
servers = self._client.list_servers(bare=True)
log.debug('Got server list in %.3fs', time.monotonic() - start)
except Exception:
log.exception('Failed to get server list')
continue
def process_timeouts(queue):
for server_id in list(queue.keys()):
# Remove entries that are beyond timeout
_, timeout = queue[server_id]
if time.monotonic() > timeout:
del queue[server_id]
# Process cleanup queue
existing_server_ids = {
server.id for server in servers
if server.status != 'DELETED'
}
for server_id in list(self._cleanup_queue.keys()):
# Notify waiting threads that don't have server ids
if server_id not in existing_server_ids:
# Notify the thread which is waiting for the delete
log.debug('Waking up cleanup thread for server %s',
server_id)
self._cleanup_queue[server_id][0].set()
del self._cleanup_queue[server_id]
# Process startup queue
finished_server_ids = {
server.id for server in servers
if server.status in ('ACTIVE', 'ERROR')
}
for server_id in list(self._startup_queue.keys()):
# Notify waiting threads that don't have server ids
if server_id in finished_server_ids:
# Notify the thread which is waiting for the delete
log.debug('Waking up startup thread for server %s',
server_id)
self._startup_queue[server_id][0].set()
del self._startup_queue[server_id]
# Process timeouts
process_timeouts(self._cleanup_queue)
process_timeouts(self._startup_queue)
log.debug('Done')

View File

@ -17,6 +17,7 @@
import time
import logging
import math
import random
import threading
from concurrent.futures.thread import ThreadPoolExecutor
@ -137,7 +138,7 @@ class StateMachineNodeLauncher(stats.StatsReporter):
'nodepool_provider_name': self.manager.provider.name}
self.state_machine = self.manager.adapter.getCreateStateMachine(
hostname, label, image_external_id, metadata, retries,
self.handler.request, self.log)
self.handler.request, self.handler.chosen_az, self.log)
def updateNodeFromInstance(self, instance):
if instance is None:
@ -157,6 +158,7 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.public_ipv4 = instance.public_ipv4
node.private_ipv4 = instance.private_ipv4
node.public_ipv6 = instance.public_ipv6
node.host_id = instance.host_id
node.cloud = instance.cloud
node.region = instance.region
node.az = instance.az
@ -205,20 +207,17 @@ class StateMachineNodeLauncher(stats.StatsReporter):
instance = state_machine.advance()
self.log.debug(f"State machine for {node.id} at "
f"{state_machine.state}")
if not node.external_id:
if not state_machine.external_id:
raise Exception("Driver implementation error: state "
"machine must produce external ID "
"after first advancement")
if not node.external_id and state_machine.external_id:
node.external_id = state_machine.external_id
self.zk.storeNode(node)
if state_machine.complete and not self.keyscan_future:
self.updateNodeFromInstance(instance)
self.log.debug("Submitting keyscan request for %s",
node.interface_ip)
label = self.handler.pool.labels[self.node.type[0]]
future = self.manager.keyscan_worker.submit(
keyscan,
self.handler.pool.host_key_checking,
label.host_key_checking,
node.id, node.interface_ip,
node.connection_type, node.connection_port,
self.manager.provider.boot_timeout)
@ -240,6 +239,7 @@ class StateMachineNodeLauncher(stats.StatsReporter):
node.external_id = state_machine.external_id
self.zk.storeNode(node)
statsd_key = 'error.quota'
self.manager.invalidateQuotaCache()
except Exception as e:
self.log.exception(
"Launch failed for node %s:", node.id)
@ -352,6 +352,7 @@ class StateMachineHandler(NodeRequestHandler):
def __init__(self, pw, request):
super().__init__(pw, request)
self.chosen_az = None
self.launchers = []
@property
@ -364,6 +365,15 @@ class StateMachineHandler(NodeRequestHandler):
:returns: True if it is available, False otherwise.
'''
for label in self.request.node_types:
if self.pool.labels[label].cloud_image:
if not self.manager.labelReady(self.pool.labels[label]):
return False
else:
if not self.zk.getMostRecentImageUpload(
self.pool.labels[label].diskimage.name,
self.provider.name):
return False
return True
def hasProviderQuota(self, node_types):
@ -414,17 +424,18 @@ class StateMachineHandler(NodeRequestHandler):
needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool)
self.log.debug("Needed quota: %s", needed_quota)
# Calculate remaining quota which is calculated as:
# quota = <total nodepool quota> - <used quota> - <quota for node>
cloud_quota = self.manager.estimatedNodepoolQuota()
cloud_quota.subtract(
self.manager.estimatedNodepoolQuotaUsed())
cloud_quota.subtract(needed_quota)
self.log.debug("Predicted remaining provider quota: %s",
cloud_quota)
if not self.pool.ignore_provider_quota:
# Calculate remaining quota which is calculated as:
# quota = <total nodepool quota> - <used quota> - <quota for node>
cloud_quota = self.manager.estimatedNodepoolQuota()
cloud_quota.subtract(
self.manager.estimatedNodepoolQuotaUsed())
cloud_quota.subtract(needed_quota)
self.log.debug("Predicted remaining provider quota: %s",
cloud_quota)
if not cloud_quota.non_negative():
return False
if not cloud_quota.non_negative():
return False
# Now calculate pool specific quota. Values indicating no quota default
# to math.inf representing infinity that can be calculated with.
@ -444,6 +455,37 @@ class StateMachineHandler(NodeRequestHandler):
return pool_quota.non_negative()
def checkReusableNode(self, node):
if self.chosen_az and node.az != self.chosen_az:
return False
return True
def nodeReusedNotification(self, node):
"""
We attempt to group the node set within the same provider availability
zone.
For this to work properly, the provider entry in the nodepool
config must list the availability zones. Otherwise, new node placement
will be determined by the cloud. The exception being if there is an
existing node in the READY state that we can select for this node set.
Its AZ will then be used for new nodes, as well as any other READY
nodes.
"""
# If we haven't already chosen an AZ, select the
# AZ from this ready node. This will cause new nodes
# to share this AZ, as well.
if not self.chosen_az and node.az:
self.chosen_az = node.az
def setNodeMetadata(self, node):
"""
Select grouping AZ if we didn't set AZ from a selected,
pre-existing node
"""
if not self.chosen_az:
self.chosen_az = random.choice(
self.pool.azs or self.manager.adapter.getAZs())
def launchesComplete(self):
'''
Check if all launch requests have completed.
@ -495,9 +537,11 @@ class StateMachineProvider(Provider, QuotaSupport):
super().start(zk_conn)
self.running = True
self._zk = zk_conn
self.keyscan_worker = ThreadPoolExecutor()
self.keyscan_worker = ThreadPoolExecutor(
thread_name_prefix=f'keyscan-{self.provider.name}')
self.state_machine_thread = threading.Thread(
target=self._runStateMachines)
target=self._runStateMachines,
daemon=True)
self.state_machine_thread.start()
def stop(self):
@ -555,7 +599,7 @@ class StateMachineProvider(Provider, QuotaSupport):
return StateMachineHandler(poolworker, request)
def labelReady(self, label):
return True
return self.adapter.labelReady(label)
def getProviderLimits(self):
try:
@ -745,6 +789,7 @@ class Instance:
* cloud: str
* az: str
* region: str
* host_id: str
* driver_data: any
* slot: int
@ -769,6 +814,7 @@ class Instance:
self.cloud = None
self.az = None
self.region = None
self.host_id = None
self.metadata = {}
self.driver_data = None
self.slot = None
@ -951,6 +997,36 @@ class Adapter:
"""
return QuotaInformation(instances=1)
def getAZs(self):
"""Return a list of availability zones for this provider
One of these will be selected at random and supplied to the
create state machine. If a request handler is building a node
set from an existing ready node, then the AZ from that node
will be used instead of the results of this method.
:returns: A list of availability zone names.
"""
return [None]
def labelReady(self, label):
"""Indicate whether a label is ready in the provided cloud
This is used by the launcher to determine whether it should
consider a label to be in-service for a provider. If this
returns False, the label will be ignored for this provider.
This does not need to consider whether a diskimage is ready;
the launcher handles that itself. Instead, this can be used
to determine whether a cloud-image is available.
:param ProviderLabel label: A config object describing a label
for an instance.
:returns: A bool indicating whether the label is ready.
"""
return True
# The following methods must be implemented only if image
# management is supported:

View File

@ -287,6 +287,10 @@ class BaseTestCase(testtools.TestCase):
continue
if t.name.startswith("ThreadPoolExecutor"):
continue
if t.name.startswith("openstack-api"):
continue
if t.name.startswith("keyscan"):
continue
if t.name not in whitelist:
done = False
if done:
@ -584,7 +588,11 @@ class DBTestCase(BaseTestCase):
for _ in iterate_timeout(ONE_MINUTE, Exception,
"Cloud instance deletion",
interval=1):
servers = manager.listNodes()
if hasattr(manager, 'adapter'):
servers = manager.adapter._listServers()
else:
# TODO: remove once all drivers use statemachine
servers = manager.listNodes()
if not (instance_id in [s.id for s in servers]):
break

View File

@ -34,7 +34,7 @@ providers:
- name: main
max-servers: 96
networks:
- 'some-name'
- 'no-auto-ip-network-name'
auto-floating-ip: False
labels:
- name: fake-label1

View File

@ -20,7 +20,7 @@ import mock
import time
from nodepool import builder, tests
from nodepool.driver.fake import provider as fakeprovider
from nodepool.driver.fake import adapter as fakeadapter
from nodepool.zk import zookeeper as zk
from nodepool.config import Config
from nodepool.nodeutils import iterate_timeout
@ -173,13 +173,13 @@ class TestNodePoolBuilder(tests.DBTestCase):
"""Test that image upload fails are handled properly."""
# Now swap out the upload fake so that the next uploads fail
fake_client = fakeprovider.FakeUploadFailCloud(times_to_fail=1)
fake_client = fakeadapter.FakeUploadFailCloud(times_to_fail=1)
def get_fake_client(*args, **kwargs):
return fake_client
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, '_getClient',
fakeadapter.FakeAdapter, '_getClient',
get_fake_client))
configfile = self.setup_config('node.yaml')
@ -264,13 +264,13 @@ class TestNodePoolBuilder(tests.DBTestCase):
def test_image_removal_dib_deletes_first(self):
# Break cloud image deleting
fake_client = fakeprovider.FakeDeleteImageFailCloud()
fake_client = fakeadapter.FakeDeleteImageFailCloud()
def get_fake_client(*args, **kwargs):
return fake_client
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, '_getClient',
fakeadapter.FakeAdapter, '_getClient',
get_fake_client))
configfile = self.setup_config('node_two_image.yaml')

View File

@ -121,7 +121,7 @@ class TestNodepoolCMD(tests.DBTestCase):
def fail_list(self):
raise RuntimeError('Fake list error')
self.useFixture(fixtures.MonkeyPatch(
'nodepool.driver.fake.provider.FakeOpenStackCloud.list_servers',
'nodepool.driver.fake.adapter.FakeOpenStackCloud.list_servers',
fail_list))
configfile = self.setup_config("node_cmd.yaml")

View File

@ -86,7 +86,7 @@ class TestDriverMetastatic(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
# Request a node, verify that there is a backing node, and it
# has the same connection info
@ -151,7 +151,7 @@ class TestDriverMetastatic(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
# Request a node, verify that there is a backing node, and it
# has the same connection info
@ -166,7 +166,7 @@ class TestDriverMetastatic(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
# Allocate a second node, should have same backing node
node2 = self._requestNode()
@ -188,7 +188,7 @@ class TestDriverMetastatic(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
# Request a node, verify that there is a backing node, and it
# has the same connection info
@ -209,7 +209,7 @@ class TestDriverMetastatic(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
# Delete the metastatic node and verify that backing is deleted
node1.state = zk.DELETING

View File

@ -498,7 +498,8 @@ class TestDriverStatic(tests.DBTestCase):
self.wait_for_config(pool)
manager = pool.getProviderManager('openstack-provider')
manager._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image")
manager.adapter.IMAGE_CHECK_TIMEOUT = 0
req = zk.NodeRequest()
req.state = zk.REQUESTED

View File

@ -24,7 +24,8 @@ import testtools
from nodepool import tests
from nodepool.zk import zookeeper as zk
from nodepool.zk.components import PoolComponent
from nodepool.driver.fake import provider as fakeprovider
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.driver.fake import adapter as fakeadapter
from nodepool.nodeutils import iterate_timeout
import nodepool.launcher
from nodepool.version import get_version_string
@ -35,6 +36,12 @@ from kazoo import exceptions as kze
class TestLauncher(tests.DBTestCase):
log = logging.getLogger("nodepool.TestLauncher")
def setUp(self):
super().setUp()
StateMachineProvider.MINIMUM_SLEEP = 0.1
StateMachineProvider.MAXIMUM_SLEEP = 1
def test_node_assignment(self):
'''
Successful node launch should have unlocked nodes in READY state
@ -89,7 +96,7 @@ class TestLauncher(tests.DBTestCase):
# We check the "cloud" side attributes are set from nodepool side
provider = pool.getProviderManager('fake-provider')
cloud_node = provider.getServer(node.hostname)
cloud_node = provider.adapter._getServer(node.external_id)
self.assertEqual(
cloud_node.metadata['nodepool_provider_name'],
'fake-provider')
@ -183,7 +190,7 @@ class TestLauncher(tests.DBTestCase):
def fake_get_quota():
return (max_cores, max_instances, max_ram)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -196,7 +203,7 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
client = pool.getProviderManager('fake-provider')._getClient()
client = pool.getProviderManager('fake-provider').adapter._getClient()
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
@ -412,7 +419,7 @@ class TestLauncher(tests.DBTestCase):
def fake_get_quota():
return (math.inf, 1, math.inf)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -472,7 +479,7 @@ class TestLauncher(tests.DBTestCase):
nonlocal max_cores, max_instances, max_ram
return (max_cores, max_instances, max_ram)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -485,7 +492,7 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
client = pool.getProviderManager('fake-provider')._getClient()
client = pool.getProviderManager('fake-provider').adapter._getClient()
# Wait for a single node to be created
req1 = zk.NodeRequest()
@ -545,7 +552,7 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager.createServer_fails = 2
manager.adapter.createServer_fails = 2
req = zk.NodeRequest()
req.state = zk.REQUESTED
@ -553,7 +560,7 @@ class TestLauncher(tests.DBTestCase):
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(0, manager.createServer_fails)
self.assertEqual(0, manager.adapter.createServer_fails)
self.assertEqual(req.state, zk.FAILED)
self.assertNotEqual(req.declined_by, [])
@ -578,7 +585,7 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(req.state, zk.FULFILLED)
# now change the azs in the cloud
cloud = pool.getProviderManager('fake-provider')._getClient()
cloud = pool.getProviderManager('fake-provider').adapter._getClient()
cloud._azs = ['new-az1', 'new-az2']
# Do a second request. This will fail because the cached azs are not
@ -661,7 +668,7 @@ class TestLauncher(tests.DBTestCase):
provider = (builder._upload_workers[0]._config.
provider_managers['fake-provider'])
cloud_image = provider.getImage(image.external_id)
cloud_image = provider.adapter._findImage(image.external_id)
self.assertEqual(
cloud_image._kw.get('diskimage_metadata'), 'diskimage')
self.assertEqual(
@ -690,7 +697,7 @@ class TestLauncher(tests.DBTestCase):
# We check the "cloud" side attributes are set from nodepool side
provider = pool.getProviderManager('fake-provider')
cloud_node = provider.getServer(nodes[0].hostname)
cloud_node = provider.adapter._getServer(nodes[0].external_id)
self.assertEqual(
cloud_node.metadata['nodepool_provider_name'],
'fake-provider')
@ -927,7 +934,7 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(nodes[0].provider, 'fake-provider')
self.assertEqual(len(nodes_def_sg), 1)
self.assertEqual(nodes_def_sg[0].provider, 'fake-provider')
client = pool.getProviderManager('fake-provider')._getClient()
client = pool.getProviderManager('fake-provider').adapter._getClient()
for server in client._server_list:
if server.id == nodes[0].external_id:
self.assertEqual(server.security_groups, ['fake-sg'])
@ -1066,7 +1073,7 @@ class TestLauncher(tests.DBTestCase):
# Get fake cloud record and set status to DELETING
manager = pool.getProviderManager('fake-provider')
for instance in manager.listNodes():
for instance in manager.adapter._listServers():
if instance.id == nodes[0].external_id:
instance.status = 'DELETED'
break
@ -1078,7 +1085,7 @@ class TestLauncher(tests.DBTestCase):
self.waitForNodeDeletion(nodes[0])
api_record_remains = False
for instance in manager.listNodes():
for instance in manager.adapter._listServers():
if instance.id == nodes[0].external_id:
api_record_remains = True
break
@ -1098,7 +1105,7 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager.createServer_fails = 2
manager.adapter.createServer_fails = 2
self.waitForImage('fake-provider', 'fake-image')
req = zk.NodeRequest()
@ -1110,7 +1117,7 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(req.state, zk.FAILED)
# retries in config is set to 2, so 2 attempts to create a server
self.assertEqual(0, manager.createServer_fails)
self.assertEqual(0, manager.adapter.createServer_fails)
def test_node_launch_with_broken_znodes(self):
"""Test that node launch still works if there are broken znodes"""
@ -1152,7 +1159,7 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager.createServer_fails_with_external_id = 2
manager.adapter.createServer_fails_with_external_id = 2
self.waitForImage('fake-provider', 'fake-image')
# Stop the DeletedNodeWorker so we can make sure the fake znode that
@ -1170,7 +1177,8 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(req.state, zk.FAILED)
# retries in config is set to 2, so 2 attempts to create a server
self.assertEqual(0, manager.createServer_fails_with_external_id)
self.assertEqual(
0, manager.adapter.createServer_fails_with_external_id)
# Request another node to check if nothing is wedged
req = zk.NodeRequest()
@ -1186,7 +1194,7 @@ class TestLauncher(tests.DBTestCase):
raise RuntimeError('Fake Error')
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, 'deleteServer', fail_delete))
fakeadapter.FakeAdapter, '_deleteServer', fail_delete))
configfile = self.setup_config('node.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
@ -1215,10 +1223,10 @@ class TestLauncher(tests.DBTestCase):
def test_node_delete_error(self):
def error_delete(self, name):
# Set ERROR status instead of deleting the node
self._getClient()._server_list[0].status = 'ERROR'
self._client._server_list[0].status = 'ERROR'
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, 'deleteServer', error_delete))
fakeadapter.FakeAdapter, '_deleteServer', error_delete))
configfile = self.setup_config('node_delete_error.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
@ -1244,14 +1252,10 @@ class TestLauncher(tests.DBTestCase):
# wait the cleanup thread to kick in
time.sleep(5)
zk_nodes = self.zk.getNodes()
self.assertEqual(len(zk_nodes), 1)
node = self.zk.getNode(zk_nodes[0])
self.assertEqual(node.state, zk.DELETING)
# remove error nodes
pool.getProviderManager(
'fake-provider')._getClient()._server_list.clear()
# Make sure it shows up as leaked
manager = pool.getProviderManager('fake-provider')
instances = list(manager.adapter.listInstances())
self.assertEqual(1, len(instances))
def test_leaked_node(self):
"""Test that a leaked node is deleted"""
@ -1267,7 +1271,7 @@ class TestLauncher(tests.DBTestCase):
# Make sure we have a node built and ready
self.assertEqual(len(nodes), 1)
manager = pool.getProviderManager('fake-provider')
servers = manager.listNodes()
servers = manager.adapter._listServers()
self.assertEqual(len(servers), 1)
# Delete the node from ZooKeeper, but leave the instance
@ -1286,7 +1290,7 @@ class TestLauncher(tests.DBTestCase):
self.waitForInstanceDeletion(manager, nodes[0].external_id)
# Make sure we end up with only one server (the replacement)
servers = manager.listNodes()
servers = manager.adapter._listServers()
self.assertEqual(len(servers), 1)
def test_max_ready_age(self):
@ -1599,9 +1603,10 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="fake-image")
manager._client.create_image(name="fake-image-windows")
manager._client.create_image(name="fake-image-windows-port")
manager.adapter.IMAGE_CHECK_TIMEOUT = 1
manager.adapter._client.create_image(name="fake-image")
manager.adapter._client.create_image(name="fake-image-windows")
manager.adapter._client.create_image(name="fake-image-windows-port")
nodes = self.waitForNodes('fake-label')
self.assertEqual(len(nodes), 1)
@ -1638,7 +1643,8 @@ class TestLauncher(tests.DBTestCase):
pool.start()
self.wait_for_config(pool)
manager = pool.getProviderManager('fake-provider')
manager._client.create_image(name="provider-named-image")
manager.adapter.IMAGE_CHECK_TIMEOUT = 1
manager.adapter._client.create_image(name="provider-named-image")
nodes = self.waitForNodes('fake-label')
self.assertEqual(len(nodes), 1)
@ -1936,7 +1942,7 @@ class TestLauncher(tests.DBTestCase):
self.wait_for_config(pool)
manager = pool.getProviderManager('good-provider')
manager._client.create_image(name="good-image")
manager.adapter._client.create_image(name="good-image")
good_req = zk.NodeRequest()
good_req.state = zk.REQUESTED
@ -2013,11 +2019,11 @@ class TestLauncher(tests.DBTestCase):
time.sleep(1)
launcher_pools = self.zk.getRegisteredPools()
@mock.patch('nodepool.driver.openstack.handler.'
'OpenStackNodeLauncher._launchNode')
@mock.patch('nodepool.driver.statemachine.'
'StateMachineNodeLauncher.launch')
def test_launchNode_session_expired(self, mock_launch):
'''
Test ZK session lost during _launchNode().
Test ZK session lost during launch().
'''
mock_launch.side_effect = kze.SessionExpiredError()
@ -2044,19 +2050,19 @@ class TestLauncher(tests.DBTestCase):
while self.zk.countPoolNodes('fake-provider', 'main'):
time.sleep(0)
@mock.patch('nodepool.driver.openstack.provider.'
'OpenStackProvider.invalidateQuotaCache')
@mock.patch('nodepool.driver.statemachine.'
'StateMachineProvider.invalidateQuotaCache')
def test_launchNode_node_fault_message(self, mock_invalidatequotacache):
'''
Test failed launch can get detailed node fault info if available.
'''
fake_client = fakeprovider.FakeLaunchAndGetFaultCloud()
fake_client = fakeadapter.FakeLaunchAndGetFaultCloud()
def get_fake_client(*args, **kwargs):
return fake_client
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, '_getClient',
fakeadapter.FakeAdapter, '_getClient',
get_fake_client))
configfile = self.setup_config('node_launch_retry.yaml')
@ -2089,20 +2095,19 @@ class TestLauncher(tests.DBTestCase):
Test that the launcher keeps trying to spawn a node in case of a
delete error
'''
fake_client = fakeprovider.FakeLaunchAndDeleteFailCloud(
fake_client = fakeadapter.FakeLaunchAndDeleteFailCloud(
times_to_fail=1)
def get_fake_client(*args, **kwargs):
return fake_client
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider, '_getClient',
fakeadapter.FakeAdapter, '_getClient',
get_fake_client))
configfile = self.setup_config('node_launch_retry.yaml')
self.useBuilder(configfile)
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.cleanup_interval = 60
pool.start()
self.waitForImage('fake-provider', 'fake-image')
@ -2248,7 +2253,7 @@ class TestLauncher(tests.DBTestCase):
def fake_get_quota():
return (0, 20, 1000000)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -2283,7 +2288,7 @@ class TestLauncher(tests.DBTestCase):
def fake_get_quota():
return (0, 20, 1000000)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -2380,7 +2385,7 @@ class TestLauncher(tests.DBTestCase):
return (100, max_instances, 1000000)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -2423,7 +2428,7 @@ class TestLauncher(tests.DBTestCase):
return (100, max_instances, 1000000)
self.useFixture(fixtures.MockPatchObject(
fakeprovider.FakeProvider.fake_cloud, '_get_quota',
fakeadapter.FakeAdapter.fake_cloud, '_get_quota',
fake_get_quota
))
@ -2565,7 +2570,7 @@ class TestLauncher(tests.DBTestCase):
self.waitForNodes('fake-label')
manager = pool.getProviderManager('fake-provider')
down_ports = manager.listPorts(status='DOWN')
down_ports = manager.adapter._listPorts(status='DOWN')
self.assertEqual(2, len(down_ports))
self.log.debug("Down ports: %s", down_ports)
@ -2580,13 +2585,10 @@ class TestLauncher(tests.DBTestCase):
except AssertionError:
# config still hasn't updated, retry
manager = pool.getProviderManager('fake-provider')
# Reset the client as a new fake client will have been
# created.
manager.resetClient()
for _ in iterate_timeout(4, Exception, 'assert ports are cleaned'):
try:
down_ports = manager.listPorts(status='DOWN')
down_ports = manager.adapter._listPorts(status='DOWN')
self.assertEqual(0, len(down_ports))
break
except AssertionError:

View File

@ -63,7 +63,7 @@ class TestShadeIntegration(tests.IntegrationTestCase):
# thread that causes wait_for_threads in subsequent tests to fail.
self.addCleanup(pm.stop)
pm.start(None)
self.assertEqual(pm._client.auth, auth_data)
self.assertEqual(pm.adapter._client.auth, auth_data)
def test_nodepool_occ_config_reload(self):
configfile = self.setup_config('integration_occ.yaml')
@ -76,8 +76,8 @@ class TestShadeIntegration(tests.IntegrationTestCase):
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.updateConfig()
provider_manager = pool.config.provider_managers['real-provider']
self.assertEqual(provider_manager._client.auth, auth_data)
pm = pool.config.provider_managers['real-provider']
self.assertEqual(pm.adapter._client.auth, auth_data)
# update the config
auth_data['password'] = 'os_new_real'
@ -86,5 +86,5 @@ class TestShadeIntegration(tests.IntegrationTestCase):
yaml.safe_dump(occ_config, h)
pool.updateConfig()
provider_manager = pool.config.provider_managers['real-provider']
self.assertEqual(provider_manager._client.auth, auth_data)
pm = pool.config.provider_managers['real-provider']
self.assertEqual(pm.adapter._client.auth, auth_data)