Add openstack driver

This adds the openstack driver copied from nodepool and minimally
updated to make some tests pass.

A new (minimal) fake openstack implementation is included.

Many features and configuration options are not yet supported.
This is probably enough implementation to upload images.

Change-Id: Iacb7b11335020d6ff8fcf3d7643d6d62e3efd384
James E. Blair 2024-09-30 15:57:58 -07:00
parent b4666fecc0
commit 7c802b4e88
14 changed files with 2021 additions and 48 deletions


@@ -86,11 +86,12 @@ from zuul.driver.gitlab import GitlabDriver
from zuul.driver.gerrit import GerritDriver
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.driver.aws import AwsDriver
from zuul.driver.openstack import OpenstackDriver
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import zkobject, ZooKeeperClient
from zuul.zk.components import SchedulerComponent, COMPONENT_REGISTRY
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.zk.event_queues import ConnectionEventQueue, PipelineResultEventQueue
from zuul.zk.executor import ExecutorApi
from zuul.zk.locks import tenant_read_lock, pipeline_lock, SessionAwareLock
from zuul.zk.merger import MergerApi
@@ -456,6 +457,7 @@ class TestConnectionRegistry(ConnectionRegistry):
self, test_config, config, upstream_root, additional_event_queues))
self.registerDriver(ElasticsearchDriver())
self.registerDriver(AwsDriver())
self.registerDriver(OpenstackDriver())
class FakeAnsibleManager(zuul.lib.ansible.AnsibleManager):
@@ -3829,6 +3831,45 @@ class ZuulTestCase(BaseTestCase):
path = os.path.join(self.test_root, "changes.data")
self.test_config.changes.load(path)
def requestNodes(self, labels, tenant="tenant-one", pipeline="check"):
result_queue = PipelineResultEventQueue(
self.zk_client, tenant, pipeline)
with self.createZKContext(None) as ctx:
# Lock the pipeline, so we can grab the result event
with (self.scheds.first.sched.run_handler_lock,
pipeline_lock(self.zk_client, tenant, pipeline)):
request = model.NodesetRequest.new(
ctx,
tenant_name="tenant-one",
pipeline_name="check",
buildset_uuid=uuid.uuid4().hex,
job_uuid=uuid.uuid4().hex,
job_name="foobar",
labels=labels,
priority=100,
request_time=time.time(),
zuul_event_id=uuid.uuid4().hex,
span_info=None,
)
for _ in iterate_timeout(
10, "nodeset request to be fulfilled"):
result_events = list(result_queue)
if result_events:
for event in result_events:
# Remove event(s) from queue
result_queue.ack(event)
break
self.assertEqual(len(result_events), 1)
for event in result_queue:
self.assertIsInstance(event, model.NodesProvisionedEvent)
self.assertEqual(event.request_id, request.uuid)
self.assertEqual(event.build_set_uuid, request.buildset_uuid)
request.refresh(ctx)
return request
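As a usage sketch (illustrative only, not part of this change), a driver test built on this helper would look roughly like the following, mirroring the call sites updated in test_launcher.py later in this commit:

    request = self.requestNodes(["debian-normal"])
    self.assertEqual(request.state, model.NodesetRequest.State.FULFILLED)
    self.assertEqual(len(request.nodes), 1)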
class AnsibleZuulTestCase(ZuulTestCase):
"""ZuulTestCase but with an actual ansible executor running"""

tests/fake_openstack.py (new file, 181 lines)

@@ -0,0 +1,181 @@
# Copyright (C) 2011-2013 OpenStack Foundation
# Copyright 2022, 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from zuul.driver.openstack.openstackendpoint import OpenstackProviderEndpoint
class FakeOpenstackObject:
def __init__(self, **kw):
self.__dict__.update(kw)
self.__kw = list(kw.keys())
def _get_dict(self):
data = {}
for k in self.__kw:
data[k] = getattr(self, k)
return data
def __contains__(self, key):
return key in self.__dict__
def __getitem__(self, key, default=None):
return getattr(self, key, default)
def __setitem__(self, key, value):
setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
def set(self, key, value):
setattr(self, key, value)
class FakeOpenstackFlavor(FakeOpenstackObject):
pass
class FakeOpenstackServer(FakeOpenstackObject):
pass
class FakeOpenstackLocation(FakeOpenstackObject):
pass
class FakeOpenstackImage(FakeOpenstackObject):
pass
class FakeOpenstackCloud:
log = logging.getLogger("zuul.FakeOpenstackCloud")
def __init__(self):
self.servers = []
self.volumes = []
self.images = []
self.flavors = [
FakeOpenstackFlavor(
id='425e3203150e43d6b22792f86752533d',
name='Fake Flavor',
ram=8192,
vcpus=4,
)
]
def _getConnection(self):
return FakeOpenstackConnection(self)
class FakeOpenstackResponse:
def __init__(self, data):
self._data = data
self.links = []
def json(self):
return self._data
class FakeOpenstackSession:
def __init__(self, cloud):
self.cloud = cloud
def get(self, uri, headers, params):
if uri == '/servers/detail':
server_list = []
for server in self.cloud.servers:
data = server._get_dict()
data['hostId'] = data.pop('host_id')
data['OS-EXT-AZ:availability_zone'] = data.pop('location').zone
data['os-extended-volumes:volumes_attached'] =\
data.pop('volumes')
server_list.append(data)
return FakeOpenstackResponse({'servers': server_list})
class FakeOpenstackConnection:
log = logging.getLogger("zuul.FakeOpenstackConnection")
def __init__(self, cloud):
self.cloud = cloud
self.compute = FakeOpenstackSession(cloud)
def list_flavors(self, get_extra=False):
return self.cloud.flavors
def list_volumes(self):
return self.cloud.volumes
def list_servers(self):
return self.cloud.servers
def create_server(self, wait=None, name=None, image=None,
flavor=None, config_drive=None, key_name=None,
meta=None):
location = FakeOpenstackLocation(zone=None)
server = FakeOpenstackServer(
id=uuid.uuid4().hex,
name=name,
host_id='fake_host_id',
location=location,
volumes=[],
status='ACTIVE',
addresses=dict(
public=[dict(version=4, addr='fake'),
dict(version=6, addr='fake_v6')],
private=[dict(version=4, addr='fake')]
)
)
self.cloud.servers.append(server)
return server
def delete_server(self, name_or_id):
for x in self.cloud.servers:
if x.id == name_or_id:
self.cloud.servers.remove(x)
return
def create_image(self, wait=None, name=None, filename=None,
is_public=None, md5=None, sha256=None,
timeout=None, **meta):
image = FakeOpenstackImage(
id=uuid.uuid4().hex,
name=name,
filename=filename,
is_public=is_public,
md5=md5,
sha256=sha256,
status='ACTIVE',
)
self.cloud.images.append(image)
return image
def delete_image(self, name_or_id):
for x in self.cloud.images:
    if x.id == name_or_id:
        self.cloud.images.remove(x)
return
class FakeOpenstackProviderEndpoint(OpenstackProviderEndpoint):
def _getClient(self):
return self._fake_cloud._getConnection()
def _expandServer(self, server):
return server
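As a quick sanity sketch (not part of the committed tests; the md5/sha256 values are arbitrary placeholders), the fake cloud above can be exercised directly, mirroring the create_image/delete_image round trip that the endpoint's uploadImage/deleteImage methods perform:

    from tests.fake_openstack import FakeOpenstackCloud

    cloud = FakeOpenstackCloud()
    client = cloud._getConnection()
    image = client.create_image(name='debian-local', filename='/tmp/img.raw',
                                is_public=False, wait=True,
                                md5='...', sha256='...', timeout=300,
                                disk_format='raw')
    assert image.status == 'ACTIVE' and len(cloud.images) == 1
    client.delete_image(image.id)
    assert not cloud.images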

tests/fixtures/clouds.yaml (new vendored file, 31 lines)

@@ -0,0 +1,31 @@
clouds:
fake:
auth:
username: 'fake'
password: 'fake'
project_id: 'fake'
auth_url: 'fake'
fake-vhd:
auth:
username: 'fake'
password: 'fake'
project_id: 'fake'
auth_url: 'fake'
image_format: 'vhd'
fake-qcow2:
auth:
username: 'fake'
password: 'fake'
project_id: 'fake'
auth_url: 'fake'
image_format: 'qcow2'
fake-raw:
auth:
username: 'fake'
password: 'fake'
project_id: 'fake'
auth_url: 'fake'
image_format: 'raw'


@@ -0,0 +1,80 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: image
manager: independent
trigger:
zuul:
- event: image-build
success:
zuul:
image-built: true
image-validated: true
- job:
name: base
parent: null
run: playbooks/base.yaml
nodeset:
nodes:
- label: ubuntu-xenial
name: controller
- job:
name: build-debian-local-image
image-build-name: debian-local
- project:
name: org/common-config
image:
jobs:
- build-debian-local-image
- image:
name: debian-local
type: zuul
- flavor:
name: normal
- label:
name: debian-local-normal
image: debian-local
flavor: normal
- section:
name: openstack-base
abstract: true
connection: openstack
boot-timeout: 120
launch-timeout: 600
launch-attempts: 2
# TODO
# key-name: zuul
flavors:
- name: normal
flavor-name: Fake Flavor
images:
- name: debian-local
- section:
name: openstack
parent: openstack-base
- provider:
name: openstack-main
section: openstack
labels:
- name: debian-local-normal
key-name: zuul


@@ -0,0 +1,124 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- pipeline:
name: post
manager: independent
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- pipeline:
name: tag
manager: independent
trigger:
gerrit:
- event: ref-updated
ref: ^refs/tags/.*$
- job:
name: base
parent: null
run: playbooks/base.yaml
nodeset:
nodes:
- label: debian-normal
name: controller
- job:
name: check-job
run: playbooks/check.yaml
- job:
name: post-job
run: playbooks/post.yaml
- job:
name: tag-job
run: playbooks/tag.yaml
- project:
name: org/project
check:
jobs:
- check-job
gate:
jobs:
- check-job
post:
jobs:
- post-job
tag:
jobs:
- tag-job
- image:
name: debian
type: cloud
- flavor:
name: normal
- label:
name: debian-normal
image: debian
flavor: normal
- section:
name: openstack-base
abstract: true
connection: openstack
boot-timeout: 120
launch-timeout: 600
launch-attempts: 2
# TODO
# key-name: zuul
flavors:
- name: normal
flavor-name: Fake Flavor
images:
- name: debian
image-id: fake-image
- section:
name: openstack
parent: openstack-base
- provider:
name: openstack-main
section: openstack
labels:
- name: debian-normal
key-name: zuul


@@ -40,3 +40,6 @@ dburi=$MYSQL_FIXTURE_DBURI$
driver=aws
access_key_id=fake
secret_access_key=fake
[connection openstack]
driver=openstack


@@ -13,14 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import uuid
from unittest import mock
from zuul import model
from zuul.launcher.client import LauncherClient
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.locks import pipeline_lock
import responses
import testtools
@@ -267,44 +263,6 @@ class TestLauncher(ZuulTestCase):
self.assertEqual("test_external_id", uploads[0].external_id)
self.assertTrue(uploads[0].validated)
def _requestNodes(self, labels):
result_queue = PipelineResultEventQueue(
self.zk_client, "tenant-one", "check")
with self.createZKContext(None) as ctx:
# Lock the pipeline, so we can grab the result event
with pipeline_lock(self.zk_client, "tenant-one", "check"):
request = model.NodesetRequest.new(
ctx,
tenant_name="tenant-one",
pipeline_name="check",
buildset_uuid=uuid.uuid4().hex,
job_uuid=uuid.uuid4().hex,
job_name="foobar",
labels=labels,
priority=100,
request_time=time.time(),
zuul_event_id=uuid.uuid4().hex,
span_info=None,
)
for _ in iterate_timeout(
10, "nodeset request to be fulfilled"):
result_events = list(result_queue)
if result_events:
for event in result_events:
# Remove event(s) from queue
result_queue.ack(event)
break
self.assertEqual(len(result_events), 1)
for event in result_queue:
self.assertIsInstance(event, model.NodesProvisionedEvent)
self.assertEqual(event.request_id, request.uuid)
self.assertEqual(event.build_set_uuid, request.buildset_uuid)
request.refresh(ctx)
return request
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_jobs_executed(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -357,7 +315,7 @@
def test_launcher_missing_label(self):
ctx = self.createZKContext(None)
labels = ["debian-normal", "debian-unavailable"]
request = self._requestNodes(labels)
request = self.requestNodes(labels)
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
self.assertEqual(len(request.nodes), 0)
@@ -370,7 +328,7 @@
nodeset.addNode(model.Node("node", "debian-normal"))
ctx = self.createZKContext(None)
request = self._requestNodes([n.label for n in nodeset.getNodes()])
request = self.requestNodes([n.label for n in nodeset.getNodes()])
client = LauncherClient(self.zk_client, None)
request = client.getRequest(request.uuid)
@@ -415,7 +373,7 @@
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_lost_nodeset_request(self):
ctx = self.createZKContext(None)
request = self._requestNodes(["debian-normal"])
request = self.requestNodes(["debian-normal"])
provider_nodes = []
for node_id in request.nodes:
@@ -442,7 +400,7 @@
@okay_tracebacks('_getQuotaForInstanceType')
def test_failed_node(self):
ctx = self.createZKContext(None)
request = self._requestNodes(["debian-invalid"])
request = self.requestNodes(["debian-invalid"])
self.assertEqual(request.state, model.NodesetRequest.State.FAILED)
provider_nodes = request.provider_nodes[0]
self.assertEqual(len(provider_nodes), 2)
@@ -499,7 +457,7 @@
nodeset.addNode(model.Node("node", "debian-local-normal"))
ctx = self.createZKContext(None)
request = self._requestNodes([n.label for n in nodeset.getNodes()])
request = self.requestNodes([n.label for n in nodeset.getNodes()])
client = LauncherClient(self.zk_client, None)
request = client.getRequest(request.uuid)


@@ -0,0 +1,156 @@
# Copyright 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import fixtures
import testtools
from kazoo.exceptions import NoNodeError
from zuul import model
from zuul.launcher.client import LauncherClient
from zuul.driver.openstack import OpenstackDriver
from tests.fake_openstack import (
FakeOpenstackCloud,
FakeOpenstackProviderEndpoint,
)
from tests.base import (
FIXTURE_DIR,
ZuulTestCase,
iterate_timeout,
simple_layout,
return_data,
)
class TestOpenstackDriver(ZuulTestCase):
config_file = 'zuul-connections-nodepool.conf'
debian_return_data = {
'zuul': {
'artifacts': [
{
'name': 'raw image',
'url': 'http://example.com/image.raw',
'metadata': {
'type': 'zuul_image',
'image_name': 'debian-local',
'format': 'raw',
'sha256': ('59984dd82f51edb3777b969739a92780'
'a520bb314b8d64b294d5de976bd8efb9'),
'md5sum': '262278e1632567a907e4604e9edd2e83',
}
},
]
}
}
def setUp(self):
self.initTestConfig()
clouds_yaml = os.path.join(FIXTURE_DIR, 'clouds.yaml')
self.useFixture(
fixtures.EnvironmentVariable('OS_CLIENT_CONFIG_FILE', clouds_yaml))
self.fake_cloud = FakeOpenstackCloud()
self.patch(OpenstackDriver, '_endpoint_class',
FakeOpenstackProviderEndpoint)
self.patch(FakeOpenstackProviderEndpoint,
'_fake_cloud', self.fake_cloud)
super().setUp()
def tearDown(self):
super().tearDown()
# TODO: make this a generic driver test
@simple_layout('layouts/openstack/nodepool.yaml', enable_nodepool=True)
def test_openstack_config(self):
layout = self.scheds.first.sched.abide.tenants.get('tenant-one').layout
provider = layout.providers['openstack-main']
endpoint = provider.getEndpoint()
self.assertEqual([], list(endpoint.listInstances()))
# TODO: make this a generic driver test
@simple_layout('layouts/openstack/nodepool.yaml', enable_nodepool=True)
def test_openstack_node_lifecycle(self):
nodeset = model.NodeSet()
nodeset.addNode(model.Node("node", "debian-normal"))
ctx = self.createZKContext(None)
request = self.requestNodes([n.label for n in nodeset.getNodes()])
client = LauncherClient(self.zk_client, None)
request = client.getRequest(request.uuid)
self.assertEqual(request.state, model.NodesetRequest.State.FULFILLED)
self.assertEqual(len(request.nodes), 1)
client.acceptNodeset(request, nodeset)
self.waitUntilSettled()
with testtools.ExpectedException(NoNodeError):
# Request should be gone
request.refresh(ctx)
for node in nodeset.getNodes():
pnode = node._provider_node
self.assertIsNotNone(pnode)
self.assertTrue(pnode.hasLock())
client.useNodeset(nodeset)
self.waitUntilSettled()
for node in nodeset.getNodes():
pnode = node._provider_node
self.assertTrue(pnode.hasLock())
self.assertTrue(pnode.state, pnode.State.IN_USE)
client.returnNodeset(nodeset)
self.waitUntilSettled()
for node in nodeset.getNodes():
pnode = node._provider_node
self.assertFalse(pnode.hasLock())
self.assertTrue(pnode.state, pnode.State.USED)
for _ in iterate_timeout(60, "node to be deleted"):
try:
pnode.refresh(ctx)
except NoNodeError:
break
# TODO: make this a generic driver test
@simple_layout('layouts/openstack/nodepool-image.yaml',
enable_nodepool=True)
@return_data(
'build-debian-local-image',
'refs/heads/master',
debian_return_data,
)
def test_openstack_diskimage(self):
self.waitUntilSettled()
self.assertHistory([
dict(name='build-debian-local-image', result='SUCCESS'),
], ordered=False)
name = 'review.example.com%2Forg%2Fcommon-config/debian-local'
artifacts = self.launcher.image_build_registry.\
getArtifactsForImage(name)
self.assertEqual(1, len(artifacts))
self.assertEqual('raw', artifacts[0].format)
self.assertTrue(artifacts[0].validated)
uploads = self.launcher.image_upload_registry.getUploadsForImage(
name)
self.assertEqual(1, len(uploads))
self.assertEqual(artifacts[0].uuid, uploads[0].artifact_uuid)
self.assertIsNotNone(uploads[0].external_id)
self.assertTrue(uploads[0].validated)


@@ -0,0 +1,68 @@
# Copyright 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib
from zuul.driver import Driver, ConnectionInterface, ProviderInterface
from zuul.driver.openstack import (
openstackconnection,
openstackmodel,
openstackprovider,
openstackendpoint,
)
class OpenstackDriver(Driver, ConnectionInterface, ProviderInterface):
name = 'openstack'
_endpoint_class = openstackendpoint.OpenstackProviderEndpoint
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.endpoints = {}
def getConnection(self, name, config):
return openstackconnection.OpenstackConnection(self, name, config)
def getProvider(self, connection, tenant_name, canonical_name,
provider_config):
return openstackprovider.OpenstackProvider(
self, connection, tenant_name, canonical_name, provider_config)
def getProviderClass(self):
return openstackprovider.OpenstackProvider
def getProviderSchema(self):
return openstackprovider.OpenstackProviderSchema().getProviderSchema()
def getProviderNodeClass(self):
return openstackmodel.OpenstackProviderNode
def getEndpoint(self, provider):
region = provider.region or ''
endpoint_id = '/'.join([
urllib.parse.quote_plus(provider.connection.connection_name),
urllib.parse.quote_plus(region),
])
try:
return self.endpoints[endpoint_id]
except KeyError:
pass
endpoint = self._endpoint_class(
self, provider.connection, provider.region)
self.endpoints[endpoint_id] = endpoint
return endpoint
def stop(self):
for endpoint in self.endpoints.values():
endpoint.stop()
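A small standalone sketch (illustrative, not code in this change) of how getEndpoint() keys its endpoint cache, so that providers sharing a connection and region reuse a single endpoint:

    import urllib.parse

    def endpoint_cache_key(connection_name, region):
        # Same scheme as getEndpoint() above: quoted connection name + region.
        return '/'.join([
            urllib.parse.quote_plus(connection_name),
            urllib.parse.quote_plus(region or ''),
        ])

    assert endpoint_cache_key('openstack', None) == 'openstack/'
    assert endpoint_cache_key('openstack', 'regionOne') == 'openstack/regionOne'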


@@ -0,0 +1,39 @@
# Copyright 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from zuul.connection import BaseConnection
class OpenstackConnection(BaseConnection):
driver_name = 'openstack'
log = logging.getLogger("zuul.OpenstackConnection")
def __init__(self, driver, connection_name, connection_config):
super().__init__(driver, connection_name, connection_config)
# Allow the user to specify the clouds.yaml via the config
# setting or the environment variable. Otherwise we will use
# the client lib default paths.
self.config_files = self.connection_config.get(
'client_config_file',
os.getenv('OS_CLIENT_CONFIG_FILE', None))
self.cloud_name = self.connection_config.get('cloud')
self.region_name = self.connection_config.get('region')
# Rate limit: requests/second
self.rate = self.connection_config.get('rate', 2)
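For illustration (a hedged sketch, assuming a clouds.yaml containing a cloud named "fake" as in tests/fixtures/clouds.yaml), this is roughly how the connection settings above are consumed when the endpoint builds its client (see _getClient() in openstackendpoint.py below):

    import os
    import openstack

    os.environ.setdefault('OS_CLIENT_CONFIG_FILE', '/path/to/clouds.yaml')
    config = openstack.config.OpenStackConfig(
        config_files=os.environ.get('OS_CLIENT_CONFIG_FILE'),
        load_envvars=False,
        app_name='zuul',
    )
    region = config.get_one(cloud='fake', region_name=None)
    conn = openstack.connection.Connection(config=region)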


@@ -0,0 +1,993 @@
# Copyright (C) 2011-2013 OpenStack Foundation
# Copyright 2017 Red Hat
# Copyright 2022 Acme Gating, LLC
# Copyright 2022-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import math
import operator
import time
import urllib.parse
from collections import UserDict
from concurrent.futures import ThreadPoolExecutor
import openstack
from keystoneauth1.exceptions.catalog import EndpointNotFound
from zuul import exceptions
from zuul.driver.openstack.openstackmodel import (
OpenstackResource,
OpenstackInstance,
)
from zuul.driver.util import (
LazyExecutorTTLCache,
QuotaInformation,
RateLimiter,
)
from zuul.provider import (
BaseProviderEndpoint,
statemachine
)
CACHE_TTL = 10
def quota_from_flavor(flavor, label=None, volumes=None):
args = dict(instances=1,
cores=flavor.vcpus,
ram=flavor.ram)
if label and label.boot_from_volume:
args['volumes'] = 1
args['volume-gb'] = label.volume_size
elif volumes:
args['volumes'] = len(volumes)
args['volume-gb'] = sum([v.size for v in volumes])
return QuotaInformation(**args)
def quota_from_limits(compute, volume):
def bound_value(value):
if value == -1:
return math.inf
return value
args = dict(
instances=bound_value(compute.max_total_instances),
cores=bound_value(compute.max_total_cores),
ram=bound_value(compute.max_total_ram_size))
if volume is not None:
args['volumes'] = bound_value(volume['absolute']['maxTotalVolumes'])
args['volume-gb'] = bound_value(
volume['absolute']['maxTotalVolumeGigabytes'])
return QuotaInformation(**args)
class ZuulOpenstackServer(UserDict):
# Most of OpenStackSDK is designed around a dictionary interface,
# but due to the historic use of the Munch object, some code
# (add_server_interfaces) accesses values by attribute instead of
# key. For just those values, we provide getters and setters.
@property
def access_ipv4(self):
return self.data.get('access_ipv4')
@access_ipv4.setter
def access_ipv4(self, value):
self.data['access_ipv4'] = value
@property
def public_v4(self):
return self.data.get('public_v4')
@public_v4.setter
def public_v4(self, value):
self.data['public_v4'] = value
@property
def private_v4(self):
return self.data.get('private_v4')
@private_v4.setter
def private_v4(self, value):
self.data['private_v4'] = value
@property
def access_ipv6(self):
return self.data.get('access_ipv6')
@access_ipv6.setter
def access_ipv6(self, value):
self.data['access_ipv6'] = value
@property
def public_v6(self):
return self.data.get('public_v6')
@public_v6.setter
def public_v6(self, value):
self.data['public_v6'] = value
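A tiny illustration (not part of the change) of the dual access pattern the comment above describes: values live in the underlying dict, while the listed properties also work via attribute access, as add_server_interfaces expects:

    server = ZuulOpenstackServer({'id': 'abc123'})
    server.public_v4 = '203.0.113.10'      # attribute write lands in .data
    assert server['public_v4'] == '203.0.113.10'
    assert server.data['id'] == 'abc123'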
class OpenstackDeleteStateMachine(statemachine.StateMachine):
FLOATING_IP_DELETING = 'deleting floating ip'
SERVER_DELETE_SUBMIT = 'submit delete server'
SERVER_DELETE = 'delete server'
SERVER_DELETING = 'deleting server'
COMPLETE = 'complete'
def __init__(self, endpoint, node, log):
self.log = log
self.endpoint = endpoint
super().__init__(node.delete_state)
self.external_id = node.openstack_server_id
self.floating_ips = None
def advance(self):
if self.state == self.START:
self.server = self.endpoint._getServer(self.external_id)
if (self.server and
self.endpoint._hasFloatingIps() and
self.server.get('addresses')):
self.floating_ips = self.endpoint._getFloatingIps(self.server)
for fip in self.floating_ips:
self.endpoint._deleteFloatingIp(fip)
self.state = self.FLOATING_IP_DELETING
if not self.floating_ips:
self.state = self.SERVER_DELETE_SUBMIT
if self.state == self.FLOATING_IP_DELETING:
fips = []
for fip in self.floating_ips:
fip = self.endpoint._refreshFloatingIpDelete(fip)
if not fip or fip['status'] == 'DOWN':
fip = None
if fip:
fips.append(fip)
self.floating_ips = fips
if self.floating_ips:
return
else:
self.state = self.SERVER_DELETE_SUBMIT
if self.state == self.SERVER_DELETE_SUBMIT:
self.delete_future = self.endpoint._submitApi(
self.endpoint._deleteServer,
self.external_id)
self.state = self.SERVER_DELETE
if self.state == self.SERVER_DELETE:
if self.endpoint._completeApi(self.delete_future):
self.state = self.SERVER_DELETING
if self.state == self.SERVER_DELETING:
self.server = self.endpoint._refreshServerDelete(self.server)
if self.server:
return
else:
self.state = self.COMPLETE
if self.state == self.COMPLETE:
self.complete = True
class OpenstackCreateStateMachine(statemachine.StateMachine):
SERVER_CREATING_SUBMIT = 'submit creating server'
SERVER_CREATING = 'creating server'
FLOATING_IP_CREATING = 'creating floating ip'
FLOATING_IP_ATTACHING = 'attaching floating ip'
COMPLETE = 'complete'
def __init__(self, endpoint, node, hostname, label, flavor, image,
image_external_id, tags, log):
self.log = log
self.endpoint = endpoint
self.label = label
self.flavor = flavor
self.image = image
self.server = None
self.hostname = hostname
# TODO: implement az
self.az = None
super().__init__(node.create_state)
self.attempts = node.create_state.get("attempts", 0)
self.image_external_id = node.create_state.get(
"image_external_id", image_external_id)
self.config_drive = image.config_drive
if image_external_id:
self.image_external = image_external_id
else:
# launch using unmanaged cloud image
if image.image_id:
# Using a dict with the ID bypasses an image search during
# server creation.
self.image_external = dict(id=image.image_id)
else:
self.image_external = image.external_name
# props = label.instance_properties.copy()
# for k, v in label.dynamic_instance_properties.items():
# try:
# props[k] = v.format(request=self.request.getSafeAttributes())
# except Exception:
# self.log.exception(
# "Error formatting dynamic instance property %s", k)
# if not props:
# props = None
# Put provider.name and image_name in as groups so that ansible
# inventory can auto-create groups for us based on each of those
# qualities
# Also list each of those values directly so that non-ansible
# consumption programs don't need to play a game of knowing that
# groups[0] is the image name or anything silly like that.
# groups_list = [self.provider.name]
# groups_list.append(image_name)
# groups_list.append(label.name)
# meta = dict(
# groups=",".join(groups_list),
# )
# merge in any instance properties provided from config
# if props:
# meta.update(props)
# merge nodepool-internal metadata
meta = {}
meta.update(tags)
self.metadata = meta
self.os_flavor = self.endpoint._findFlavor(
flavor_name=flavor.flavor_name,
# min_ram=self.label.min_ram,
)
self.quota = quota_from_flavor(self.os_flavor, label=self.label)
self.external_id = None
def _handleServerFault(self):
# Return True if this is a quota fault
if not self.external_id:
return
try:
server = self.endpoint._getServerByIdNow(self.external_id)
if not server:
return
fault = server.get('fault', {}).get('message')
if fault:
self.log.error('Detailed node error: %s', fault)
if 'quota' in fault:
return True
except Exception:
self.log.exception(
'Failed to retrieve node error information:')
def advance(self):
if self.state == self.START:
self.external_id = None
self.create_future = self.endpoint._submitApi(
self.endpoint._createServer,
self.hostname,
image=self.image_external,
flavor=self.os_flavor,
key_name=self.label.key_name,
az=self.az,
config_drive=self.config_drive,
# networks=self.label.networks,
# security_groups=self.label.pool.security_groups,
boot_from_volume=self.label.boot_from_volume,
# volume_size=self.label.volume_size,
instance_properties=self.metadata,
# userdata=self.label.userdata,
)
self.state = self.SERVER_CREATING_SUBMIT
if self.state == self.SERVER_CREATING_SUBMIT:
try:
try:
self.server = self.endpoint._completeApi(
self.create_future)
if self.server is None:
return
self.external_id = self.server['id']
self.state = self.SERVER_CREATING
except openstack.cloud.exc.OpenStackCloudCreateException as e:
if e.resource_id:
self.external_id = e.resource_id
if self._handleServerFault():
self.log.exception("Launch attempt failed:")
raise exceptions.QuotaException("Quota exceeded")
raise
except Exception as e:
if 'quota exceeded' in str(e).lower():
self.log.exception("Launch attempt failed:")
raise exceptions.QuotaException("Quota exceeded")
if 'number of ports exceeded' in str(e).lower():
self.log.exception("Launch attempt failed:")
raise exceptions.QuotaException("Quota exceeded")
raise
if self.state == self.SERVER_CREATING:
self.server = self.endpoint._refreshServer(self.server)
if self.server['status'] == 'ACTIVE':
# if (self.label.pool.auto_floating_ip and
# self.endpoint._needsFloatingIp(self.server)):
# self.floating_ip = self.endpoint._createFloatingIp(
# self.server)
# self.state = self.FLOATING_IP_CREATING
# else:
self.state = self.COMPLETE
elif self.server['status'] == 'ERROR':
if ('fault' in self.server and self.server['fault'] is not None
and 'message' in self.server['fault']):
self.log.error(
"Error in creating the server."
" Compute service reports fault: {reason}".format(
reason=self.server['fault']['message']))
error_message = self.server['fault']['message'].lower()
if all(s in error_message for s in ('exceeds', 'quota')):
raise exceptions.QuotaException("Quota exceeded")
raise exceptions.LaunchStatusException("Server in error state")
else:
return
if self.state == self.FLOATING_IP_CREATING:
self.floating_ip = self.endpoint._refreshFloatingIp(
self.floating_ip)
if self.floating_ip.get('port_id', None):
if self.floating_ip['status'] == 'ACTIVE':
self.state = self.FLOATING_IP_ATTACHING
else:
return
else:
self.endpoint._attachIpToServer(self.server, self.floating_ip)
self.state = self.FLOATING_IP_ATTACHING
if self.state == self.FLOATING_IP_ATTACHING:
self.server = self.endpoint._refreshServer(self.server)
ext_ip = openstack.cloud.meta.get_server_ip(
self.server, ext_tag='floating', public=True)
if ext_ip == self.floating_ip['floating_ip_address']:
self.state = self.COMPLETE
else:
return
if self.state == self.COMPLETE:
self.complete = True
return self.endpoint._getInstance(self.server, self.quota)
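The two state machines above are not driven here; as a hedged sketch of the assumed contract with the launcher framework (zuul.provider.statemachine), advance() is polled until complete is set, with each call performing at most one non-blocking step:

    import time

    def drive(sm, interval=1.0):
        # Poll the state machine; the create machine returns an
        # OpenstackInstance from its final advance() call.
        while True:
            result = sm.advance()
            if sm.complete:
                return result
            time.sleep(interval)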
class OpenstackProviderEndpoint(BaseProviderEndpoint):
"""An OPENSTACK Endpoint corresponds to a single OPENSTACK region,
and can include multiple availability zones.
"""
IMAGE_UPLOAD_SLEEP = 30
def __init__(self, driver, connection, region):
name = f'{connection.connection_name}-{region}'
super().__init__(driver, connection, name)
self.region = region
# Wrap these instance methods with a per-instance LRU cache so
# that we don't leak memory over time when the endpoint is
# occasionally replaced.
self._findImage = functools.lru_cache(maxsize=None)(
self._findImage)
self._listFlavors = functools.lru_cache(maxsize=None)(
self._listFlavors)
self._findNetwork = functools.lru_cache(maxsize=None)(
self._findNetwork)
self._listAZs = functools.lru_cache(maxsize=None)(
self._listAZs)
self.log = logging.getLogger(f"zuul.openstack.{self.name}")
self._running = True
# The default http connection pool size is 10; match it for
# efficiency.
workers = 10
self.log.info("Create executor with max workers=%s", workers)
self.api_executor = ThreadPoolExecutor(
thread_name_prefix=f'openstack-api-{self.name}',
max_workers=workers)
# Use a lazy TTL cache for these. This uses the TPE to
# asynchronously update the cached values, meanwhile returning
# the previous cached data if available. This means every
# call after the first one is instantaneous.
self._listServers = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listServers)
self._listVolumes = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listVolumes)
self._listFloatingIps = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listFloatingIps)
self.rate_limiter = RateLimiter(self.name,
connection.rate)
self._last_image_check_failure = time.time()
self._last_port_cleanup = None
self._client = self._getClient()
def stop(self):
self.api_executor.shutdown()
self._running = False
def listResources(self):
for server in self._listServers():
if server['status'].lower() == 'deleted':
continue
yield OpenstackResource(server.get('metadata', {}),
OpenstackResource.TYPE_INSTANCE,
server['id'])
# Floating IP and port leakage can't be handled by the
# automatic resource cleanup in cleanupLeakedResources because
# openstack doesn't store metadata on those objects, so we
# call internal cleanup methods here.
if self.provider.port_cleanup_interval:
self._cleanupLeakedPorts()
if self.provider.clean_floating_ips:
self._cleanupFloatingIps()
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == OpenstackResource.TYPE_INSTANCE:
self._deleteServer(resource.id)
def listInstances(self):
volumes = {}
for volume in self._listVolumes():
volumes[volume.id] = volume
for server in self._listServers():
if server['status'].lower() == 'deleted':
continue
flavor = self._getFlavorFromServer(server)
server_volumes = []
for vattach in server.get(
'os-extended-volumes:volumes_attached', []):
volume = volumes.get(vattach['id'])
if volume:
server_volumes.append(volume)
quota = quota_from_flavor(flavor, volumes=server_volumes)
yield self._getInstance(server, quota)
def getQuotaLimits(self):
# with Timer(self.log, 'API call get_compute_limits'):
compute = self._client.get_compute_limits()
try:
# with Timer(self.log, 'API call get_volume_limits'):
volume = self._client.get_volume_limits()
except EndpointNotFound:
volume = None
return quota_from_limits(compute, volume)
def getQuotaForLabel(self, label, flavor):
    os_flavor = self._findFlavor(flavor_name=flavor.flavor_name)
    return quota_from_flavor(os_flavor, label=label)
def getAZs(self):
azs = self._listAZs()
if not azs:
# If there are no zones, return a list containing None so that
# random.choice can pick None and pass that to Nova. If this
# feels dirty, please direct your ire to policy.json and the
# ability to turn off random portions of the OpenStack API.
return [None]
return azs
def labelReady(self, label):
if not label.cloud_image:
return False
# If an image ID was supplied, we'll assume it is ready since
# we don't currently have a way of validating that (except during
# server creation).
if label.cloud_image.image_id:
return True
image = self._findImage(label.cloud_image.external_name)
if not image:
self.log.warning(
"Provider %s is configured to use %s as the"
" cloud-image for label %s and that"
" cloud-image could not be found in the"
" cloud." % (self.provider.name,
label.cloud_image.external_name,
label.name))
# If the user insists there should be an image but it
# isn't in our cache, invalidate the cache periodically so
# that we can see new cloud image uploads.
if (time.time() - self._last_image_check_failure >
self.IMAGE_CHECK_TIMEOUT):
self._findImage.cache_clear()
self._last_image_check_failure = time.time()
return False
return True
def uploadImage(self, provider_image, filename,
image_format, metadata, md5, sha256, timeout):
# configure glance and upload image. Note the meta flags
# are provided as custom glance properties
# NOTE: we have wait=True set here. This is not how we normally
# do things in nodepool, preferring to poll ourselves thankyouverymuch.
# However - two things to note:
# - PUT has no async mechanism, so we have to handle it anyway
# - v2 w/task waiting is very strange and complex - but we have to
# block for our v1 clouds anyway, so we might as well
# have the interface be the same and treat faking-out
# an openstacksdk-level fake-async interface later
if not metadata:
metadata = {}
if image_format:
metadata['disk_format'] = image_format
# with Timer(self.log, 'API call create_image'):
image = self._client.create_image(
name=provider_image.name,
filename=filename,
is_public=False,
wait=True,
md5=md5,
sha256=sha256,
timeout=timeout,
**metadata)
return image.id
def deleteImage(self, external_id):
self.log.debug(f"Deleting image {external_id}")
# with Timer(self.log, 'API call delete_image'):
return self._client.delete_image(external_id)
# Local implementation
def _getInstance(self, server, quota):
return OpenstackInstance(
self.connection.cloud_name,
self.connection.region_name,
server, quota)
def _getClient(self):
config = openstack.config.OpenStackConfig(
config_files=self.connection.config_files,
load_envvars=False,
app_name='zuul',
)
region = config.get_one(cloud=self.connection.cloud_name,
region_name=self.connection.region_name)
return openstack.connection.Connection(
config=region,
use_direct_get=False,
rate_limit=self.connection.rate,
)
def _submitApi(self, api, *args, **kw):
return self.api_executor.submit(
api, *args, **kw)
def _completeApi(self, future):
if not future.done():
return None
return future.result()
def _createServer(self, name, image, flavor,
az=None, key_name=None, config_drive=True,
networks=None, security_groups=None,
boot_from_volume=False, volume_size=50,
instance_properties=None, userdata=None):
if not networks:
networks = []
if not isinstance(image, dict):
# if it's a dict, we already have the cloud id. If it's not,
# we don't know if it's name or ID so need to look it up
image = self._findImage(image)
create_args = dict(name=name,
image=image,
flavor=flavor,
config_drive=config_drive)
if boot_from_volume:
create_args['boot_from_volume'] = boot_from_volume
create_args['volume_size'] = volume_size
# NOTE(pabelanger): Always cleanup volumes when we delete a server.
create_args['terminate_volume'] = True
if key_name:
create_args['key_name'] = key_name
if az:
create_args['availability_zone'] = az
if security_groups:
create_args['security_groups'] = security_groups
if userdata:
create_args['userdata'] = userdata
nics = []
for network in networks:
net_id = self._findNetwork(network)['id']
nics.append({'net-id': net_id})
if nics:
create_args['nics'] = nics
if instance_properties:
create_args['meta'] = instance_properties
try:
# with Timer(self.log, 'API call create_server'):
return self._client.create_server(wait=False, **create_args)
except openstack.exceptions.BadRequestException:
# We've gotten a 400 error from nova - which means the request
# was malformed. The most likely cause of that, unless something
# became functionally and systemically broken, is stale az, image
# or flavor cache. Log a message, invalidate the caches so that
# next time we get new caches.
self.log.info(
"Clearing az, flavor and image caches due to 400 error "
"from nova")
self._findImage.cache_clear()
self._listFlavors.cache_clear()
self._findNetwork.cache_clear()
self._listAZs.cache_clear()
raise
# This method is wrapped with an LRU cache in the constructor.
def _listAZs(self):
# with Timer(self.log, 'API call list_availability_zone_names'):
return self._client.list_availability_zone_names()
# This method is wrapped with an LRU cache in the constructor.
def _findImage(self, name):
# with Timer(self.log, 'API call get_image'):
return self._client.get_image(name, filters={'status': 'active'})
# This method is wrapped with an LRU cache in the constructor.
def _listFlavors(self):
# with Timer(self.log, 'API call list_flavors'):
flavors = self._client.list_flavors(get_extra=False)
flavors.sort(key=operator.itemgetter('ram', 'name'))
return flavors
# This method is only used by the nodepool alien-image-list
# command and only works with the openstack driver.
def _listImages(self):
# with Timer(self.log, 'API call list_images'):
return self._client.list_images()
def _findFlavorByName(self, flavor_name):
for f in self._listFlavors():
if flavor_name in (f['name'], f['id']):
return f
raise Exception("Unable to find flavor: %s" % flavor_name)
def _findFlavorByRam(self, min_ram, flavor_name):
for f in self._listFlavors():
if (f['ram'] >= min_ram
and (not flavor_name or flavor_name in f['name'])):
return f
raise Exception("Unable to find flavor with min ram: %s" % min_ram)
def _findFlavorById(self, flavor_id):
for f in self._listFlavors():
if f['id'] == flavor_id:
return f
raise Exception("Unable to find flavor with id: %s" % flavor_id)
def _findFlavor(self, flavor_name, min_ram=None):
if min_ram:
return self._findFlavorByRam(min_ram, flavor_name)
else:
return self._findFlavorByName(flavor_name)
# This method is wrapped with an LRU cache in the constructor.
def _findNetwork(self, name):
# with Timer(self.log, 'API call get_network'):
network = self._client.get_network(name)
if not network:
raise Exception("Unable to find network %s in provider %s" % (
name, self.provider.name))
return network
# This method is based on code from OpenStackSDK, licensed
# under ASL2.
def _simpleServerList(self):
session = self._client.compute
limit = None
query_params = {}
uri = '/servers/detail'
ret = []
while uri:
response = session.get(
uri,
headers={"Accept": "application/json"},
params=query_params,
)
data = response.json()
last_marker = query_params.pop('marker', None)
query_params.pop('limit', None)
resources = data['servers']
if not isinstance(resources, list):
resources = [resources]
ret += [ZuulOpenstackServer(x) for x in resources]
if resources:
marker = resources[-1]['id']
uri, next_params = self._getNextLink(
uri, response, data, marker, limit
)
try:
if next_params['marker'] == last_marker:
raise Exception(
'Endless pagination loop detected, aborting'
)
except KeyError:
pass
query_params.update(next_params)
else:
break
return ret
# This method is based on code from OpenStackSDK, licensed
# under ASL2.
def _getNextLink(self, uri, response, data, marker, limit):
pagination_key = 'servers_links'
next_link = None
params = {}
if isinstance(data, dict):
links = data.get(pagination_key, {})
for item in links:
if item.get('rel') == 'next' and 'href' in item:
next_link = item['href']
break
if next_link and next_link.startswith('/v'):
next_link = next_link[next_link.find('/', 1):]
if not next_link and 'next' in response.links:
# RFC5988 specifies Link headers and requests parses them if they
# are there. We prefer link dicts in resource body, but if those
# aren't there and Link headers are, use them.
next_link = response.links['next']['uri']
# Parse params from Link (next page URL) into params.
# This prevents duplication of query parameters that, with a large
# number of pages, would eventually result in an HTTP 414 error.
if next_link:
parts = urllib.parse.urlparse(next_link)
query_params = urllib.parse.parse_qs(parts.query)
params.update(query_params)
next_link = urllib.parse.urljoin(next_link, parts.path)
# If we still have no link, and limit was given and is non-zero,
# and the number of records yielded equals the limit, then the user
# is playing pagination ball so we should go ahead and try once more.
if not next_link and limit:
next_link = uri
params['marker'] = marker
params['limit'] = limit
return next_link, params
def _listServers(self):
# with Timer(self.log, 'API call detailed server list'):
return self._simpleServerList()
def _listVolumes(self):
try:
# with Timer(self.log, 'API call list_volumes'):
return self._client.list_volumes()
except EndpointNotFound:
return []
def _listFloatingIps(self):
# with Timer(self.log, 'API call list_floating_ips'):
return self._client.list_floating_ips()
def _refreshServer(self, obj):
ret = self._getServer(obj['id'])
if ret:
return ret
return obj
def _expandServer(self, server):
return openstack.cloud.meta.add_server_interfaces(
self._client, server)
def _getServer(self, external_id):
for server in self._listServers():
if server['id'] == external_id:
if server['status'] in ['ACTIVE', 'ERROR']:
return self._expandServer(server)
return server
return None
def _getServerByIdNow(self, server_id):
# A synchronous get server by id. Only to be used in error
# handling where we can't wait for the list to update.
# with Timer(self.log, 'API call get_server_by_id'):
return self._client.get_server_by_id(server_id)
def _refreshServerDelete(self, obj):
if obj is None:
return obj
for server in self._listServers():
if server['id'] == obj['id']:
if server['status'].lower() == 'deleted':
return None
return server
return None
def _refreshFloatingIp(self, obj):
for fip in self._listFloatingIps():
if fip['id'] == obj['id']:
return fip
return obj
def _refreshFloatingIpDelete(self, obj):
if obj is None:
return obj
for fip in self._listFloatingIps():
if fip['id'] == obj['id']:
if fip.status == 'DOWN':
return None
return fip
return None
def _needsFloatingIp(self, server):
# with Timer(self.log, 'API call _needs_floating_ip'):
return self._client._needs_floating_ip(
server=server, nat_destination=None)
def _createFloatingIp(self, server):
# with Timer(self.log, 'API call create_floating_ip'):
return self._client.create_floating_ip(server=server, wait=True)
def _attachIpToServer(self, server, fip):
# skip_attach is ignored for nova, which is the only time we
# should actually call this method.
# with Timer(self.log, 'API call _attach_ip_to_server'):
return self._client._attach_ip_to_server(
server=server, floating_ip=fip,
skip_attach=True)
def _hasFloatingIps(self):
# Not a network call
return self._client._has_floating_ips()
def _getFloatingIps(self, server):
fips = openstack.cloud.meta.find_nova_interfaces(
server['addresses'], ext_tag='floating')
ret = []
for fip in fips:
# with Timer(self.log, 'API call get_floating_ip'):
ret.append(self._client.get_floating_ip(
id=None, filters={'floating_ip_address': fip['addr']}))
return ret
def _deleteFloatingIp(self, fip):
# with Timer(self.log, 'API call delete_floating_ip'):
self._client.delete_floating_ip(fip['id'], retry=0)
def _deleteServer(self, external_id):
# with Timer(self.log, 'API call delete_server'):
self._client.delete_server(external_id)
return True
def _getFlavorFromServer(self, server):
# In earlier versions of nova or the sdk, flavor has just an id.
# In later versions it returns the information we're looking for.
# If we get the information we want, we do not need to try to
# lookup the flavor in our list.
if hasattr(server['flavor'], 'vcpus'):
return server['flavor']
else:
return self._findFlavorById(server['flavor']['id'])
# The port cleanup logic. We don't get tags or metadata, so we
# have to figure this out on our own.
# This method is not cached
def _listPorts(self, status=None):
'''
List known ports.
:param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'.
'''
if status:
ports = self._client.list_ports(filters={'status': status})
else:
ports = self._client.list_ports()
return ports
def _filterComputePorts(self, ports):
'''
Return a list of compute ports (or no device owner).
We are not interested in ports for routers or DHCP.
'''
ret = []
for p in ports:
if (p.device_owner is None or p.device_owner == '' or
p.device_owner.startswith("compute:")):
ret.append(p)
return ret
def _cleanupLeakedPorts(self):
if not self._last_port_cleanup:
self._last_port_cleanup = time.monotonic()
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
return
# Return if not enough time has passed between cleanup
last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
if last_check_in_secs <= self.provider.port_cleanup_interval:
return
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
current_set = set([p.id for p in ports])
remove_set = current_set & self._down_ports
removed_count = 0
for port_id in remove_set:
try:
self._deletePort(port_id)
except Exception:
self.log.exception("Exception deleting port %s in %s:",
port_id, self.provider.name)
else:
removed_count += 1
self.log.debug("Removed DOWN port %s in %s",
port_id, self.provider.name)
if self._statsd and removed_count:
key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
self._statsd.incr(key, removed_count)
self._last_port_cleanup = time.monotonic()
# Rely on OpenStack to tell us the down ports rather than doing our
# own set adjustment.
ports = self._listPorts(status='DOWN')
ports = self._filterComputePorts(ports)
self._down_ports = set([p.id for p in ports])
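As a minimal sketch of the two-pass idea used above (illustrative only): a port is treated as leaked when it is seen DOWN on two consecutive sweeps at least port_cleanup_interval apart:

    def leaked_ports(previous_down, current_down):
        # Only ports DOWN in both sweeps are candidates for deletion.
        return previous_down & current_down

    assert leaked_ports({'p1', 'p2'}, {'p2', 'p3'}) == {'p2'}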
def _deletePort(self, port_id):
self._client.delete_port(port_id)
def _cleanupFloatingIps(self):
did_clean = self._client.delete_unattached_floating_ips()
if did_clean:
# some openstacksdk's return True if any port was
# cleaned, rather than the count. Just set it to 1 to
# indicate something happened.
if type(did_clean) is bool:
did_clean = 1
if self._statsd:
key = ('nodepool.provider.%s.leaked.floatingips'
% self.provider.name)
self._statsd.incr(key, did_clean)
def getConsoleLog(self, label, external_id):
if not label.console_log:
return None
try:
return self._client.get_server_console(external_id)
except openstack.exceptions.OpenStackCloudException:
return None


@@ -0,0 +1,63 @@
# Copyright 2024 BMW Group
# Copyright 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul import model
from zuul.provider import statemachine
class OpenstackProviderNode(model.ProviderNode, subclass_id="openstack"):
def __init__(self):
super().__init__()
self._set(
openstack_server_id=None,
)
def getDriverData(self):
return dict(
openstack_server_id=self.openstack_server_id,
)
class OpenstackInstance(statemachine.Instance):
def __init__(self, cloud, region, server, quota):
super().__init__()
self.openstack_server_id = server['id']
self.metadata = server.get('metadata', {})
self.private_ipv4 = server.get('private_v4')
self.private_ipv6 = None
self.public_ipv4 = server.get('public_v4')
self.public_ipv6 = server.get('public_v6')
self.host_id = server['hostId']
self.cloud = cloud
self.region = region
self.az = server.get('OS-EXT-AZ:availability_zone')
self.interface_ip = server.get('interface_ip')
self.quota = quota
def getQuotaInformation(self):
return self.quota
class OpenstackResource(statemachine.Resource):
TYPE_HOST = 'host'
TYPE_INSTANCE = 'instance'
TYPE_AMI = 'ami'
TYPE_SNAPSHOT = 'snapshot'
TYPE_VOLUME = 'volume'
TYPE_OBJECT = 'object'
def __init__(self, metadata, type, id):
super().__init__(metadata, type)
self.id = id


@@ -0,0 +1,234 @@
# Copyright 2022-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import math
import zuul.provider.schema as provider_schema
from zuul.lib.voluputil import (
Required, Optional, Nullable, discriminate, assemble, RequiredExclusive,
)
import voluptuous as vs
from zuul.driver.openstack.openstackendpoint import (
OpenstackCreateStateMachine,
OpenstackDeleteStateMachine,
)
from zuul.driver.util import QuotaInformation
from zuul.provider import (
BaseProvider,
BaseProviderFlavor,
BaseProviderImage,
BaseProviderLabel,
BaseProviderSchema,
)
class OpenstackProviderImage(BaseProviderImage):
openstack_image_filters = {
'name': str,
'values': [str],
}
openstack_cloud_schema = vs.Schema({
vs.Exclusive(Required('image-id'), 'spec'): str,
vs.Exclusive(Required('image-filters'), 'spec'): [
openstack_image_filters],
Optional('config-drive', default=True): bool,
Required('type'): 'cloud',
})
cloud_schema = vs.All(
assemble(
BaseProviderImage.schema,
openstack_cloud_schema,
),
RequiredExclusive('image_id', 'image_filters',
msg=('Provide either '
'"image-filters", or "image-id" keys'))
)
inheritable_openstack_zuul_schema = vs.Schema({
    Optional('config-drive', default=True): bool,
})
openstack_zuul_schema = vs.Schema({
Required('type'): 'zuul',
Optional('tags', default=dict): {str: str},
})
zuul_schema = assemble(
BaseProviderImage.schema,
openstack_zuul_schema,
inheritable_openstack_zuul_schema,
)
inheritable_schema = assemble(
BaseProviderImage.inheritable_schema,
inheritable_openstack_zuul_schema,
)
schema = vs.Union(
cloud_schema, zuul_schema,
discriminant=discriminate(
lambda val, alt: val['type'] == alt['type']))
def __init__(self, image_config, provider_config):
self.image_id = None
self.image_filters = None
super().__init__(image_config, provider_config)
class OpenstackProviderFlavor(BaseProviderFlavor):
openstack_flavor_schema = vs.Schema({
Required('flavor-name'): str,
})
inheritable_schema = assemble(
BaseProviderFlavor.inheritable_schema,
provider_schema.cloud_flavor,
)
schema = assemble(
BaseProviderFlavor.schema,
provider_schema.cloud_flavor,
openstack_flavor_schema,
)
class OpenstackProviderLabel(BaseProviderLabel):
openstack_label_schema = vs.Schema({
Optional('boot-from-volume', default=False): bool,
})
inheritable_schema = assemble(
BaseProviderLabel.inheritable_schema,
provider_schema.ssh_label,
)
schema = assemble(
BaseProviderLabel.schema,
provider_schema.ssh_label,
openstack_label_schema,
)
class OpenstackProviderSchema(BaseProviderSchema):
def getLabelSchema(self):
return OpenstackProviderLabel.schema
def getImageSchema(self):
return OpenstackProviderImage.schema
def getFlavorSchema(self):
return OpenstackProviderFlavor.schema
def getProviderSchema(self):
schema = super().getProviderSchema()
openstack_provider_schema = vs.Schema({
Optional('region'): Nullable(str),
})
return assemble(
schema,
openstack_provider_schema,
OpenstackProviderImage.inheritable_schema,
OpenstackProviderFlavor.inheritable_schema,
OpenstackProviderLabel.inheritable_schema,
)
class OpenstackProvider(BaseProvider, subclass_id='openstack'):
log = logging.getLogger("zuul.OpenstackProvider")
schema = OpenstackProviderSchema().getProviderSchema()
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
# In listResources, we reconcile images which appear to be
# imports but have no Zuul metadata; however, it's possible
# that these aren't Zuul's images. If we determine that's
# the case, we'll add their ids here so we don't waste our
# time on that again. We do not need to serialize these;
# these are ephemeral caches.
self._set(
not_our_images=set(),
not_our_snapshots=set(),
)
@property
def endpoint(self):
ep = getattr(self, '_endpoint', None)
if ep:
return ep
self._set(_endpoint=self.getEndpoint())
return self._endpoint
def parseImage(self, image_config, provider_config):
return OpenstackProviderImage(image_config, provider_config)
def parseFlavor(self, flavor_config, provider_config):
return OpenstackProviderFlavor(flavor_config, provider_config)
def parseLabel(self, label_config, provider_config):
return OpenstackProviderLabel(label_config, provider_config)
def getEndpoint(self):
return self.driver.getEndpoint(self)
def getCreateStateMachine(self, node, image_external_id, log):
# TODO: decide on a method of producing a hostname
# that is max 15 chars.
hostname = f"np{node.uuid[:13]}"
label = self.labels[node.label]
flavor = self.flavors[label.flavor]
image = self.images[label.image]
return OpenstackCreateStateMachine(
self.endpoint,
node,
hostname,
label,
flavor,
image,
image_external_id,
node.tags,
log)
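A quick note on the hostname TODO above (illustrative arithmetic only): the scheme currently used keeps names within the 15-character limit:

    import uuid

    node_uuid = uuid.uuid4().hex
    hostname = f"np{node_uuid[:13]}"  # "np" plus 13 hex characters = 15 total
    assert len(hostname) == 15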
def getDeleteStateMachine(self, node, log):
return OpenstackDeleteStateMachine(self.endpoint, node, log)
def listInstances(self):
return self.endpoint.listInstances()
def listResources(self):
# TODO: implement
return []
def deleteResource(self, resource):
# TODO: implement
pass
def getQuotaLimits(self):
# TODO: implement
args = dict(default=math.inf)
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
flavor = self.flavors[label.flavor]
return self.endpoint.getQuotaForLabel(label, flavor)
def uploadImage(self, provider_image,
filename, image_format, metadata, md5, sha256):
# TODO make this configurable
# timeout = self.image_import_timeout
timeout = 300
return self.endpoint.uploadImage(
provider_image,
filename, image_format, metadata, md5, sha256,
timeout)
def deleteImage(self, external_id):
self.endpoint.deleteImage(external_id)


@@ -32,6 +32,7 @@ import zuul.driver.pagure
import zuul.driver.gitlab
import zuul.driver.elasticsearch
import zuul.driver.aws
import zuul.driver.openstack
from zuul.connection import BaseConnection
from zuul.driver import (
ProviderInterface,
@@ -69,6 +70,7 @@ class ConnectionRegistry(object):
self.registerDriver(zuul.driver.gitlab.GitlabDriver())
self.registerDriver(zuul.driver.elasticsearch.ElasticsearchDriver())
self.registerDriver(zuul.driver.aws.AwsDriver())
self.registerDriver(zuul.driver.openstack.OpenstackDriver())
def registerDriver(self, driver):
if driver.name in self.drivers: