Add Google Cloud provider

Also add a TaskManager and a SimpleTaskManagerDriver.

Change-Id: I5c44b24600838ae9afcc6a39c482c67933548bc0
This commit is contained in:
James E. Blair 2019-12-10 15:47:39 -08:00
parent c790ec4721
commit 13104ab0ff
8 changed files with 1386 additions and 13 deletions


@ -365,17 +365,17 @@ Options
The driver type.
.. value:: openstack
.. value:: aws
For details on the extra options required and provided by the
OpenStack driver, see the separate section
:attr:`providers.[openstack]`
AWS driver, see the separate section
:attr:`providers.[aws]`
.. value:: static
.. value:: gce
For details on the extra options required and provided by the
static driver, see the separate section
:attr:`providers.[static]`
GCE driver, see the separate section
:attr:`providers.[gce]`
.. value:: kubernetes
@ -389,18 +389,24 @@ Options
openshift driver, see the separate section
:attr:`providers.[openshift]`
.. value:: aws
For details on the extra options required and provided by the
AWS driver, see the separate section
:attr:`providers.[aws]`
.. value:: openshiftpods
For details on the extra options required and provided by the
openshiftpods driver, see the separate section
:attr:`providers.[openshiftpods]`
.. value:: openstack
For details on the extra options required and provided by the
OpenStack driver, see the separate section
:attr:`providers.[openstack]`
.. value:: static
For details on the extra options required and provided by the
static driver, see the separate section
:attr:`providers.[static]`
OpenStack Driver
----------------
@ -1631,7 +1637,7 @@ section of the configuration.
:type: list
Each entry in this section must refer to an entry in the
:attr:`providers.[aws].cloud-images` section.
:attr:`labels` section.
.. code-block:: yaml
@ -1811,3 +1817,246 @@ section of the configuration.
.. _`AWS region`: https://docs.aws.amazon.com/general/latest/gr/rande.html
.. _`Boto configuration`: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
.. _`Boto describe images`: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.describe_images
.. _gce_driver:
Google Cloud Compute Engine (GCE) Driver
----------------------------------------
Selecting the gce driver adds the following options to the :attr:`providers`
section of the configuration.
.. attr-overview::
:prefix: providers.[gce]
:maxdepth: 3
.. attr:: providers.[gce]
:type: list
A GCE provider's resources are partitioned into groups called `pools`
(see :attr:`providers.[gce].pools` for details), and within a pool,
the node types which are to be made available are listed
(see :attr:`providers.[gce].pools.labels` for details).
See `Application Default Credentials`_ for information on how to
configure credentials and other settings for GCE access in
Nodepool's runtime environment.
.. note:: For documentation purposes the option names are prefixed
``providers.[gce]`` to disambiguate from other
drivers, but ``[gce]`` is not required in the
configuration (e.g. below
``providers.[gce].pools`` refers to the ``pools``
key in the ``providers`` section when the ``gce``
driver is selected).
Example:
.. code-block:: yaml
- name: gce-uscentral1
driver: gce
project: nodepool-123456
region: us-central1
zone: us-central1-a
cloud-images:
- name: debian-stretch
image-project: debian-cloud
image-family: debian-9
username: zuul
key: ssh-rsa ...
pools:
- name: main
max-servers: 8
labels:
- name: debian-stretch
instance-type: f1-micro
cloud-image: debian-stretch
volume-type: standard
volume-size: 10
.. attr:: name
:required:
A unique name for this provider configuration.
.. attr:: region
:required:
Name of the region to use; see `GCE regions and zones`_.
.. attr:: zone
:required:
Name of the zone to use; see `GCE regions and zones`_.
.. attr:: boot-timeout
:type: int seconds
:default: 60
Once an instance is active, how long to try connecting to the
instance via SSH. If the timeout is exceeded, the node launch is
aborted and the instance deleted.
.. attr:: launch-retries
:default: 3
The number of times to retry launching a node before considering
the job failed.
.. attr:: cloud-images
:type: list
Each entry in this section must refer to an entry in the
:attr:`labels` section.
.. code-block:: yaml
cloud-images:
- name: debian-stretch
image-project: debian-cloud
image-family: debian-9
username: zuul
key: ssh-rsa ...
Each entry is a dictionary with the following keys:
.. attr:: name
:type: string
:required:
Identifier used to refer to this cloud-image from the
:attr:`providers.[gce].pools.labels` section.
.. attr:: image-id
:type: str
If this is provided, it is used to select the image from the cloud
provider by ID.
.. attr:: image-project
:type: str
If :attr:`providers.[gce].cloud-images.image-id` is not
provided, this is used along with
:attr:`providers.[gce].cloud-images.image-family` to find an
image.
.. attr:: image-family
:type: str
If :attr:`providers.[gce].cloud-images.image-id` is not
provided, this is used along with
:attr:`providers.[gce].cloud-images.image-project` to find an
image.
.. attr:: username
:type: str
The username that a consumer should use when connecting to the node.
.. attr:: key
:type: str
An SSH public key to add to the instance (project global keys
are added automatically).
.. attr:: python-path
:type: str
:default: auto
The path of the default python interpreter. Used by Zuul to set
``ansible_python_interpreter``. The special value ``auto`` will
direct Zuul to use inbuilt Ansible logic to select the
interpreter on Ansible >=2.8, and default to
``/usr/bin/python2`` for earlier versions.
.. attr:: connection-type
:type: str
The connection type that a consumer should use when connecting to the
node. For most images this is not necessary. However, when creating
Windows images this could be ``winrm`` to enable access via Ansible.
.. attr:: connection-port
:type: int
:default: 22 / 5986
The port that a consumer should use when connecting to the node. For
most diskimages this is not necessary. This defaults to 22 for ssh and
5986 for winrm.
.. attr:: pools
:type: list
A pool defines a group of resources from a GCE provider. Each pool has a
maximum number of nodes which can be launched from it, along with a number
of cloud-related attributes used when launching nodes.
.. attr:: name
:required:
A unique name within the provider for this pool of resources.
.. attr:: host-key-checking
:type: bool
:default: True
Whether to validate SSH host keys. When set to False,
nodepool-launcher will not ssh-keyscan nodes after they are
booted. This might be needed if nodepool-launcher and the nodes it
launches are on different networks.
.. attr:: labels
:type: list
Each entry in a pool's `labels` section indicates that the
corresponding label is available for use in this pool. When creating
nodes for a label, the flavor-related attributes in that label's
section will be used.
.. code-block:: yaml
labels:
- name: debian
instance-type: f1-micro
cloud-image: debian-stretch
Each entry is a dictionary with the following keys:
.. attr:: name
:type: str
:required:
Identifier used to refer to this label.
.. attr:: cloud-image
:type: str
:required:
Refers to the name of an externally managed image in the
cloud that already exists on the provider. The value of
``cloud-image`` should match the ``name`` of a previously
configured entry from the ``cloud-images`` section of the
provider. See :attr:`providers.[gce].cloud-images`.
.. attr:: instance-type
:type: str
:required:
Name of the flavor to use. See `GCE machine types`_.
.. attr:: volume-type
:type: string
If given, the root volume type (``standard`` or ``ssd``).
.. attr:: volume-size
:type: int
If given, the size of the root volume, in GiB.
.. _`Application Default Credentials`: https://cloud.google.com/docs/authentication/production
.. _`GCE regions and zones`: https://cloud.google.com/compute/docs/regions-zones/
.. _`GCE machine types`: https://cloud.google.com/compute/docs/machine-types


@ -82,6 +82,8 @@ Those objects are referenced from the Driver main interface that needs to
be implemented in the __init__.py file of the driver directory.
.. _provider_config:
ProviderConfig
~~~~~~~~~~~~~~
@ -147,3 +149,57 @@ The launch procedure usually consists of the following operations:
Once an external_id is obtained, it should be stored to the node.external_id.
- Once the resource is created, READY should be stored to the node.state.
Otherwise raise an exception to restart the launch attempt.
TaskManager
-----------
If you need to use a thread-unsafe client library, or you need to
manage rate limiting in your driver, you may want to use the
:py:class:`~nodepool.driver.taskmanager.TaskManager` class. Implement
any remote API calls as tasks and invoke them by submitting the tasks
to the TaskManager. It will run them sequentially from a single
thread, and assist in rate limiting.
The :py:class:`~nodepool.driver.taskmanager.BaseTaskManagerProvider`
class is a subclass of :py:class:`~nodepool.driver.Provider` which
starts and stops a TaskManager automatically. Inherit from it to
build a Provider as described above with a TaskManager.
.. autoclass:: nodepool.driver.taskmanager.Task
:members:
.. autoclass:: nodepool.driver.taskmanager.TaskManager
:members:
.. autoclass:: nodepool.driver.taskmanager.BaseTaskManagerProvider
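The pattern above can be modeled in a few lines. The sketch below is a standalone, simplified stand-in for the real classes in ``nodepool/driver/taskmanager.py`` (the ``MiniTaskManager`` and ``AddTask`` names, and the no-op ``rateLimit``, are invented for illustration); it shows the essential contract: tasks are submitted from any thread, executed sequentially by a single worker thread, and waited on for their result.

```python
import queue
import threading
from contextlib import contextmanager


class Task:
    """A unit of work; subclasses implement main()."""
    def __init__(self, **args):
        self.args = args
        self._done = threading.Event()
        self._result = None

    def run(self, manager):
        self._result = self.main(manager)
        self._done.set()

    def wait(self):
        # Block until the manager thread has executed this task.
        self._done.wait()
        return self._result

    def main(self, manager):
        raise NotImplementedError


class MiniTaskManager:
    """Executes submitted tasks sequentially from a single thread."""
    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            task = self._queue.get()
            if task is None:
                return
            task.run(self)

    @contextmanager
    def rateLimit(self):
        # The real TaskManager sleeps here to honor a rate limit;
        # simplified to a no-op in this sketch.
        yield

    def submitTask(self, task):
        self._queue.put(task)
        return task

    def stop(self):
        self._queue.put(None)


class AddTask(Task):
    name = 'add'

    def main(self, manager):
        with manager.rateLimit():
            return self.args['a'] + self.args['b']


tm = MiniTaskManager()
result = tm.submitTask(AddTask(a=2, b=3)).wait()
tm.stop()
print(result)  # prints 5
```

Because every task funnels through one queue and one thread, a thread-unsafe client library is only ever touched from that thread, which is the property the real TaskManager provides.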
Simple Drivers
--------------
If your system is simple enough, you may be able to use the
SimpleTaskManagerDriver class to implement support with just a few
methods. In order to use this class, your system must create and
delete instances as a unit (without requiring multiple resource
creation calls such as volumes or floating IPs).
.. note:: This system is still in development and lacks robust support
for quotas or image building.
To use this system, you will need to implement a few subclasses.
First, create a :ref:`provider_config` subclass as you would for any
driver. Then, subclass
:py:class:`~nodepool.driver.simple.SimpleTaskManagerInstance` to map
remote instance data into a format the simple driver can understand.
Next, subclass
:py:class:`~nodepool.driver.simple.SimpleTaskManagerAdapter` to
implement the main API methods of your provider. Finally, subclass
:py:class:`~nodepool.driver.simple.SimpleTaskManagerDriver` to tie them
all together.
See the ``gce`` provider for an example.
.. autoclass:: nodepool.driver.simple.SimpleTaskManagerInstance
:members:
.. autoclass:: nodepool.driver.simple.SimpleTaskManagerAdapter
:members:
.. autoclass:: nodepool.driver.simple.SimpleTaskManagerDriver
:members:
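As a rough illustration of the instance-mapping step described above, this standalone sketch mimics the ``SimpleTaskManagerInstance`` constructor/``load()`` contract without importing nodepool (the ``FakeBaseInstance`` and ``FakeCloudInstance`` names and the sample API response are invented for illustration).

```python
class FakeBaseInstance:
    """Simplified stand-in for SimpleTaskManagerInstance: the
    constructor initializes the standard fields, then hands the raw
    cloud data to load()."""
    def __init__(self, data):
        self.ready = False
        self.deleted = False
        self.external_id = None
        self.public_ipv4 = None
        self.private_ipv4 = None
        self.interface_ip = None
        self.metadata = {}
        self.load(data)

    def load(self, data):
        raise NotImplementedError


class FakeCloudInstance(FakeBaseInstance):
    """Maps a made-up cloud API response onto the standard fields."""
    def load(self, data):
        self.ready = data['state'] == 'up'
        self.deleted = data['state'] == 'gone'
        self.external_id = data['id']
        self.public_ipv4 = data.get('public_ip')
        self.private_ipv4 = data.get('private_ip')
        # Prefer the public address when one exists.
        self.interface_ip = self.public_ipv4 or self.private_ipv4
        self.metadata = data.get('tags', {})


inst = FakeCloudInstance({'id': 'vm-1', 'state': 'up',
                          'private_ip': '10.0.0.5',
                          'tags': {'nodepool_node_id': '42'}})
print(inst.interface_ip)  # prints 10.0.0.5
```

The driver framework then only ever inspects the standard fields, so each cloud's quirks stay contained in its ``load()`` implementation.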


@ -0,0 +1,27 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from nodepool.driver.simple import SimpleTaskManagerDriver
from nodepool.driver.gce.config import GCEProviderConfig
from nodepool.driver.gce.adapter import GCEAdapter
class GCEDriver(SimpleTaskManagerDriver):
def getProviderConfig(self, provider):
return GCEProviderConfig(self, provider)
def getAdapter(self, provider_config):
return GCEAdapter(provider_config)


@ -0,0 +1,115 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nodepool.driver.simple import SimpleTaskManagerAdapter
from nodepool.driver.simple import SimpleTaskManagerInstance
import googleapiclient.discovery
class GCEInstance(SimpleTaskManagerInstance):
def load(self, data):
if data['status'] == 'TERMINATED':
self.deleted = True
elif data['status'] == 'RUNNING':
self.ready = True
self.external_id = data['name']
self.az = data['zone']
iface = data.get('networkInterfaces', [])
if len(iface):
self.private_ipv4 = iface[0].get('networkIP')
access = iface[0].get('accessConfigs', [])
if len(access):
self.public_ipv4 = access[0].get('natIP')
self.interface_ip = self.public_ipv4 or self.private_ipv4
if data.get('metadata'):
for item in data['metadata'].get('items', []):
self.metadata[item['key']] = item['value']
class GCEAdapter(SimpleTaskManagerAdapter):
log = logging.getLogger("nodepool.driver.gce.GCEAdapter")
def __init__(self, provider):
self.provider = provider
self.compute = googleapiclient.discovery.build('compute', 'v1')
def listInstances(self, task_manager):
servers = []
q = self.compute.instances().list(project=self.provider.project,
zone=self.provider.zone)
with task_manager.rateLimit():
result = q.execute()
for instance in result.get('items', []):
servers.append(GCEInstance(instance))
return servers
def deleteInstance(self, task_manager, server_id):
q = self.compute.instances().delete(project=self.provider.project,
zone=self.provider.zone,
instance=server_id)
with task_manager.rateLimit():
q.execute()
def _getImageId(self, task_manager, cloud_image):
image_id = cloud_image.image_id
if image_id:
return image_id
if cloud_image.image_family:
q = self.compute.images().getFromFamily(
project=cloud_image.image_project,
family=cloud_image.image_family)
with task_manager.rateLimit():
result = q.execute()
image_id = result['selfLink']
return image_id
def createInstance(self, task_manager, hostname, metadata, label):
image_id = self._getImageId(task_manager, label.cloud_image)
disk = dict(boot=True,
autoDelete=True,
initializeParams=dict(sourceImage=image_id))
machine_type = 'zones/{}/machineTypes/{}'.format(
self.provider.zone, label.instance_type)
network = dict(network='global/networks/default',
accessConfigs=[dict(
type='ONE_TO_ONE_NAT',
name='External NAT')])
metadata_items = []
for (k, v) in metadata.items():
metadata_items.append(dict(key=k, value=v))
meta = dict(items=metadata_items)
args = dict(
name=hostname,
machineType=machine_type,
disks=[disk],
networkInterfaces=[network],
serviceAccounts=[],
metadata=meta)
q = self.compute.instances().insert(
project=self.provider.project,
zone=self.provider.zone,
body=args)
with task_manager.rateLimit():
q.execute()
return hostname


@ -0,0 +1,250 @@
# Copyright 2018-2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import voluptuous as v
from nodepool.driver import ConfigPool
from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
class ProviderCloudImage(ConfigValue):
def __init__(self):
self.name = None
self.image_id = None
self.image_project = None
self.image_family = None
self.username = None
self.key = None
self.python_path = None
self.connection_type = None
self.connection_port = None
def __eq__(self, other):
if isinstance(other, ProviderCloudImage):
return (self.name == other.name
and self.image_id == other.image_id
and self.image_project == other.image_project
and self.image_family == other.image_family
and self.username == other.username
and self.key == other.key
and self.python_path == other.python_path
and self.connection_type == other.connection_type
and self.connection_port == other.connection_port)
return False
def __repr__(self):
return "<ProviderCloudImage %s>" % self.name
@property
def external_name(self):
'''Human readable version of external.'''
return self.image_id or self.name
class ProviderLabel(ConfigValue):
def __init__(self):
self.name = None
self.cloud_image = None
self.instance_type = None
self.volume_size = None
self.volume_type = None
# The ProviderPool object that owns this label.
self.pool = None
def __eq__(self, other):
if isinstance(other, ProviderLabel):
# NOTE(Shrews): We intentionally do not compare 'pool' here
# since this causes recursive checks with ProviderPool.
return (other.name == self.name
and other.cloud_image == self.cloud_image
and other.instance_type == self.instance_type
and other.volume_size == self.volume_size
and other.volume_type == self.volume_type)
return False
def __repr__(self):
return "<ProviderLabel %s>" % self.name
class ProviderPool(ConfigPool):
def __init__(self):
self.name = None
self.host_key_checking = True
self.labels = None
# The ProviderConfig object that owns this pool.
self.provider = None
# Initialize base class attributes
super().__init__()
def load(self, pool_config, full_config, provider):
super().load(pool_config)
self.name = pool_config['name']
self.provider = provider
self.host_key_checking = bool(
pool_config.get('host-key-checking', True))
for label in pool_config.get('labels', []):
pl = ProviderLabel()
pl.name = label['name']
pl.pool = self
self.labels[pl.name] = pl
cloud_image_name = label.get('cloud-image', None)
if cloud_image_name:
cloud_image = self.provider.cloud_images.get(
cloud_image_name, None)
if not cloud_image:
raise ValueError(
"cloud-image %s does not exist in provider %s"
" but is referenced in label %s" %
(cloud_image_name, self.provider.name, pl.name))
else:
cloud_image = None
pl.cloud_image = cloud_image
pl.instance_type = label['instance-type']
pl.volume_type = label.get('volume-type', 'standard')
pl.volume_size = label.get('volume-size', 10)
full_config.labels[label['name']].pools.append(self)
def __eq__(self, other):
if isinstance(other, ProviderPool):
# NOTE(Shrews): We intentionally do not compare 'provider' here
# since this causes recursive checks with OpenStackProviderConfig.
return (super().__eq__(other)
and other.name == self.name
and other.host_key_checking == self.host_key_checking
and other.labels == self.labels)
return False
def __repr__(self):
return "<ProviderPool %s>" % self.name
class GCEProviderConfig(ProviderConfig):
def __init__(self, driver, provider):
self.driver_object = driver
self.__pools = {}
self.region = None
self.boot_timeout = None
self.launch_retries = None
self.project = None
self.zone = None
self.cloud_images = {}
self.rate_limit = None
super().__init__(provider)
def __eq__(self, other):
if isinstance(other, GCEProviderConfig):
return (super().__eq__(other)
and other.region == self.region
and other.pools == self.pools
and other.boot_timeout == self.boot_timeout
and other.launch_retries == self.launch_retries
and other.cloud_images == self.cloud_images
and other.project == self.project
and other.rate_limit == self.rate_limit
and other.zone == self.zone)
return False
@property
def pools(self):
return self.__pools
@property
def manage_images(self):
# Currently we have no image management for google. This should
# be updated if that changes.
return False
@staticmethod
def reset():
pass
def load(self, config):
self.region = self.provider.get('region')
self.boot_timeout = self.provider.get('boot-timeout', 60)
self.launch_retries = self.provider.get('launch-retries', 3)
self.project = self.provider.get('project')
self.zone = self.provider.get('zone')
self.rate_limit = self.provider.get('rate-limit', 1)
default_port_mapping = {
'ssh': 22,
'winrm': 5986,
}
# TODO: diskimages
for image in self.provider.get('cloud-images', []):
i = ProviderCloudImage()
i.name = image['name']
i.image_id = image.get('image-id', None)
i.image_project = image.get('image-project', None)
i.image_family = image.get('image-family', None)
i.username = image.get('username', None)
i.key = image.get('key', None)
i.python_path = image.get('python-path', 'auto')
i.connection_type = image.get('connection-type', 'ssh')
i.connection_port = image.get(
'connection-port',
default_port_mapping.get(i.connection_type, 22))
self.cloud_images[i.name] = i
for pool in self.provider.get('pools', []):
pp = ProviderPool()
pp.load(pool, config, self)
self.pools[pp.name] = pp
def getSchema(self):
pool_label = {
v.Required('name'): str,
v.Required('cloud-image'): str,
v.Required('instance-type'): str,
'volume-type': str,
'volume-size': int
}
pool = ConfigPool.getCommonSchemaDict()
pool.update({
v.Required('name'): str,
v.Required('labels'): [pool_label],
})
provider_cloud_images = {
'name': str,
'connection-type': str,
'connection-port': int,
'image-id': str,
'image-project': str,
'image-family': str,
'username': str,
'key': str,
'python-path': str,
}
provider = ProviderConfig.getCommonSchemaDict()
provider.update({
v.Required('pools'): [pool],
v.Required('region'): str,
v.Required('project'): str,
v.Required('zone'): str,
'cloud-images': [provider_cloud_images],
'boot-timeout': int,
'launch-retries': int,
})
return v.Schema(provider)
def getSupportedLabels(self, pool_name=None):
labels = set()
for pool in self.pools.values():
if not pool_name or (pool.name == pool_name):
labels.update(pool.labels.keys())
return labels

nodepool/driver/simple.py

@ -0,0 +1,488 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import logging
import math
from nodepool.driver.taskmanager import BaseTaskManagerProvider, Task
from nodepool.driver import Driver, NodeRequestHandler
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool import exceptions
from nodepool import zk
# Private support classes
class CreateInstanceTask(Task):
name = 'create_instance'
def main(self, manager):
return self.args['adapter'].createInstance(
manager, self.args['hostname'], self.args['metadata'],
self.args['label_config'])
class DeleteInstanceTask(Task):
name = 'delete_instance'
def main(self, manager):
return self.args['adapter'].deleteInstance(
manager, self.args['external_id'])
class ListInstancesTask(Task):
name = 'list_instances'
def main(self, manager):
return self.args['adapter'].listInstances(manager)
class SimpleTaskManagerLauncher(NodeLauncher):
"""The NodeLauncher implementation for the SimpleTaskManager driver
framework"""
def __init__(self, handler, node, provider_config, provider_label):
super().__init__(handler.zk, node, provider_config)
self.provider_name = provider_config.name
self.retries = provider_config.launch_retries
self.pool = provider_config.pools[provider_label.pool.name]
self.handler = handler
self.zk = handler.zk
self.boot_timeout = provider_config.boot_timeout
self.label = provider_label
def launch(self):
self.log.debug("Starting %s instance" % self.node.type)
attempts = 1
hostname = 'nodepool-' + self.node.id
tm = self.handler.manager.task_manager
adapter = self.handler.manager.adapter
metadata = {'nodepool_node_id': self.node.id,
'nodepool_pool_name': self.pool.name,
'nodepool_provider_name': self.provider_name}
if self.label.cloud_image.key:
metadata['ssh-keys'] = '{}:{}'.format(
self.label.cloud_image.username,
self.label.cloud_image.key)
while attempts <= self.retries:
try:
t = tm.submitTask(CreateInstanceTask(
adapter=adapter, hostname=hostname,
metadata=metadata,
label_config=self.label))
external_id = t.wait()
break
except Exception:
if attempts <= self.retries:
self.log.exception(
"Launch attempt %d/%d failed for node %s:",
attempts, self.retries, self.node.id)
if attempts == self.retries:
raise
attempts += 1
time.sleep(1)
self.node.external_id = external_id
self.zk.storeNode(self.node)
for count in iterate_timeout(
self.boot_timeout, exceptions.LaunchStatusException,
"server %s creation" % external_id):
instance = self.handler.manager.getInstance(external_id)
if instance and instance.ready:
break
self.log.debug("Created instance %s", repr(instance))
server_ip = instance.interface_ip
self.node.connection_port = self.label.cloud_image.connection_port
self.node.connection_type = self.label.cloud_image.connection_type
keys = []
if self.pool.host_key_checking:
try:
if (self.node.connection_type == 'ssh' or
self.node.connection_type == 'network_cli'):
gather_hostkeys = True
else:
gather_hostkeys = False
keys = nodescan(server_ip, port=self.node.connection_port,
timeout=180, gather_hostkeys=gather_hostkeys)
except Exception:
raise exceptions.LaunchKeyscanException(
"Can't scan instance %s key" % hostname)
self.log.info("Instance %s ready" % hostname)
self.node.state = zk.READY
self.node.external_id = hostname
self.node.hostname = hostname
self.node.interface_ip = server_ip
self.node.public_ipv4 = instance.public_ipv4
self.node.private_ipv4 = instance.private_ipv4
self.node.public_ipv6 = instance.public_ipv6
self.node.region = instance.region
self.node.az = instance.az
self.node.host_keys = keys
self.node.username = self.label.cloud_image.username
self.node.python_path = self.label.cloud_image.python_path
self.zk.storeNode(self.node)
self.log.info("Instance %s is ready", hostname)
class SimpleTaskManagerHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.simple."
"SimpleTaskManagerHandler")
def __init__(self, pw, request):
super().__init__(pw, request)
self._threads = []
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.is_alive():
count += 1
return count
def imagesAvailable(self):
'''
Determines if the requested images are available for this provider.
:returns: True if it is available, False otherwise.
'''
return True
def hasProviderQuota(self, node_types):
'''
Checks if a provider has enough quota to handle a list of nodes.
This does not take our currently existing nodes into account.
:param node_types: list of node types to check
:return: True if the node list fits into the provider, False otherwise
'''
# TODO: Add support for real quota handling; this only handles
# max_servers.
needed_quota = QuotaInformation(
cores=1,
instances=len(node_types),
ram=1,
default=1)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasProviderQuota({},{}) = {}".format(
self.pool, node_types, pool_quota))
return pool_quota.non_negative()
def hasRemainingQuota(self, ntype):
'''
Checks if the predicted quota is enough for an additional node of type
ntype.
:param ntype: node type for the quota check
:return: True if there is enough quota, False otherwise
'''
# TODO: Add support for real quota handling; this only handles
# max_servers.
needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1)
n_running = self.manager.countNodes(self.provider.name, self.pool.name)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers - n_running,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasRemainingQuota({},{}) = {}".format(
self.pool, ntype, pool_quota))
return pool_quota.non_negative()
def launchesComplete(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It is very important that NodeLauncher always sets one
# of these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False
return True
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = SimpleTaskManagerLauncher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)
class SimpleTaskManagerProvider(BaseTaskManagerProvider):
"""The Provider implementation for the SimpleTaskManager driver
framework"""
def __init__(self, adapter, provider):
super().__init__(provider)
self.adapter = adapter
self.node_cache_time = 0
self.node_cache = []
self._zk = None
def start(self, zk_conn):
super().start(zk_conn)
self._zk = zk_conn
def getRequestHandler(self, poolworker, request):
return SimpleTaskManagerHandler(poolworker, request)
def labelReady(self, label):
return True
def cleanupNode(self, external_id):
instance = self.getInstance(external_id)
if (not instance) or instance.deleted:
raise exceptions.NotFound()
t = self.task_manager.submitTask(DeleteInstanceTask(
adapter=self.adapter, external_id=external_id))
t.wait()
def waitForNodeCleanup(self, external_id, timeout=600):
for count in iterate_timeout(
timeout, exceptions.ServerDeleteException,
"server %s deletion" % external_id):
instance = self.getInstance(external_id)
if (not instance) or instance.deleted:
return
def cleanupLeakedResources(self):
deleting_nodes = {}
for node in self._zk.nodeIterator():
if node.state == zk.DELETING:
if node.provider != self.provider.name:
continue
if node.provider not in deleting_nodes:
deleting_nodes[node.provider] = []
deleting_nodes[node.provider].append(node.external_id)
for server in self.listNodes():
meta = server.metadata
if meta.get('nodepool_provider_name') != self.provider.name:
# Not our responsibility
continue
if (server.external_id in
deleting_nodes.get(self.provider.name, [])):
# Already deleting this node
continue
if not self._zk.getNode(meta['nodepool_node_id']):
self.log.warning(
"Marking for delete leaked instance %s in %s "
"(unknown node id %s)",
server.external_id, self.provider.name,
meta['nodepool_node_id']
)
# Create an artificial node to use for deleting the server.
node = zk.Node()
node.external_id = server.external_id
node.provider = self.provider.name
node.state = zk.DELETING
self._zk.storeNode(node)
def listNodes(self):
now = time.monotonic()
if now - self.node_cache_time > 5:
t = self.task_manager.submitTask(ListInstancesTask(
adapter=self.adapter))
nodes = t.wait()
self.node_cache = nodes
self.node_cache_time = time.monotonic()
return self.node_cache
def countNodes(self, provider_name, pool_name):
return len(
[n for n in self.listNodes() if
n.metadata.get('nodepool_provider_name') == provider_name and
n.metadata.get('nodepool_pool_name') == pool_name])
def getInstance(self, external_id):
for candidate in self.listNodes():
if (candidate.external_id == external_id):
return candidate
return None
# Public interface below
class SimpleTaskManagerInstance:
"""Represents a cloud instance
This class is used by the Simple Task Manager Driver classes to
represent a standardized version of a remote cloud instance.
Implement this class in your driver, override the :py:meth:`load`
method, and supply as many of the fields as possible.
:param data: An opaque data object to be passed to the load method.
"""
def __init__(self, data):
self.ready = False
self.deleted = False
self.external_id = None
self.public_ipv4 = None
self.public_ipv6 = None
self.private_ipv4 = None
self.interface_ip = None
self.az = None
self.region = None
self.metadata = {}
self.load(data)
def __repr__(self):
state = []
if self.ready:
state.append('ready')
if self.deleted:
state.append('deleted')
state = ' '.join(state)
return '<{klass} {external_id} {state}>'.format(
klass=self.__class__.__name__,
external_id=self.external_id,
state=state)
def load(self, data):
"""Parse data and update this object's attributes
:param data: An opaque data object which was passed to the
constructor.
Override this method and extract data from the `data`
parameter.
The following attributes are required:
* ready: bool (whether the instance is ready)
* deleted: bool (whether the instance is in a deleted state)
* external_id: str (the unique id of the instance)
* interface_ip: str
* metadata: dict
The following are optional:
* public_ipv4: str
* public_ipv6: str
* private_ipv4: str
* az: str
* region: str
"""
raise NotImplementedError()
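To write a driver against this interface, subclass `SimpleTaskManagerInstance` and override `load()`. A minimal sketch follows; `ExampleCloudInstance` and the response-dict shape are hypothetical, and an abridged inline copy of the base class is included so the sketch runs standalone (a real driver would import the class from this module):

```python
class SimpleTaskManagerInstance:
    # Abridged inline copy of the base class above, so this sketch
    # runs standalone; only the fields the example uses are kept.
    def __init__(self, data):
        self.ready = False
        self.deleted = False
        self.external_id = None
        self.interface_ip = None
        self.metadata = {}
        self.load(data)

    def load(self, data):
        raise NotImplementedError()


class ExampleCloudInstance(SimpleTaskManagerInstance):
    """Parse a hypothetical cloud API response dict."""

    def load(self, data):
        # Map the cloud's status field onto the required booleans.
        self.external_id = data['id']
        self.ready = data['status'] == 'RUNNING'
        self.deleted = data['status'] == 'TERMINATED'
        self.interface_ip = data.get('public_ip') or data.get('private_ip')
        self.metadata = data.get('tags', {})


inst = ExampleCloudInstance({'id': 'i-123', 'status': 'RUNNING',
                             'public_ip': '198.51.100.7', 'tags': {}})
print(inst.ready, inst.external_id)  # -> True i-123
```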
class SimpleTaskManagerAdapter:
"""Public interface for the simple TaskManager Provider
Implement these methods as simple synchronous calls, and pass this
class to the SimpleTaskManagerDriver class.
You can establish a single long-lived connection in the
initializer. The provider will call methods on this object from a
single thread.
All methods accept a task_manager argument. Use this to control
rate limiting:
.. code:: python
with task_manager.rateLimit():
<execute API call>
"""
def __init__(self, provider):
pass
def createInstance(self, task_manager, hostname, metadata, label_config):
"""Create an instance
:param TaskManager task_manager: An instance of
            :py:class:`~nodepool.driver.taskmanager.TaskManager`.
:param str hostname: The intended hostname for the instance.
:param dict metadata: A dictionary of key/value pairs that
must be stored on the instance.
        :param ProviderLabel label_config: A ProviderLabel object describing
            the instance which should be created.
"""
raise NotImplementedError()
def deleteInstance(self, task_manager, external_id):
"""Delete an instance
:param TaskManager task_manager: An instance of
            :py:class:`~nodepool.driver.taskmanager.TaskManager`.
:param str external_id: The id of the cloud instance.
"""
raise NotImplementedError()
def listInstances(self, task_manager):
"""Return a list of instances
:param TaskManager task_manager: An instance of
            :py:class:`~nodepool.driver.taskmanager.TaskManager`.
:returns: A list of :py:class:`SimpleTaskManagerInstance` objects.
"""
raise NotImplementedError()
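An adapter only needs to implement these three methods as plain synchronous calls. A toy sketch backed by an in-memory dict; `FakeCloudAdapter` is hypothetical, and a real adapter would issue its API calls inside `with task_manager.rateLimit():` blocks rather than mutating local state:

```python
class FakeCloudAdapter:
    """A toy SimpleTaskManagerAdapter backed by an in-memory dict."""

    def __init__(self, provider):
        # A real adapter would establish its long-lived cloud
        # connection here, using credentials from the provider config.
        self.provider = provider
        self.instances = {}

    def createInstance(self, task_manager, hostname, metadata, label_config):
        # Store the metadata with the "instance" so listInstances can
        # report it back, as the real contract requires.
        self.instances[hostname] = {'metadata': metadata}

    def deleteInstance(self, task_manager, external_id):
        self.instances.pop(external_id, None)

    def listInstances(self, task_manager):
        # The real method returns SimpleTaskManagerInstance objects;
        # names alone are enough for this sketch.
        return sorted(self.instances)


adapter = FakeCloudAdapter(provider=None)
adapter.createInstance(None, 'node-1', {'nodepool_node_id': '42'}, None)
print(adapter.listInstances(None))  # -> ['node-1']
```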
class SimpleTaskManagerDriver(Driver):
"""Subclass this to make a simple driver"""
def getProvider(self, provider_config):
"""Return a provider.
Usually this method does not need to be overridden.
"""
adapter = self.getAdapter(provider_config)
return SimpleTaskManagerProvider(adapter, provider_config)
# Public interface
def getProviderConfig(self, provider):
"""Instantiate a config object
:param dict provider: A dictionary of YAML config describing
the provider.
:returns: A ProviderConfig instance with the parsed data.
"""
raise NotImplementedError()
def getAdapter(self, provider_config):
"""Instantiate an adapter
:param ProviderConfig provider_config: An instance of
ProviderConfig previously returned by :py:meth:`getProviderConfig`.
:returns: An instance of :py:class:`SimpleTaskManagerAdapter`
"""
raise NotImplementedError()

# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import logging
import queue
import threading
from nodepool.driver import Provider
class Task:
"""Base task class for use with :py:class:`TaskManager`
Subclass this to implement your own tasks.
Set the `name` field to the name of your task and override the
:py:meth:`main` method.
Keyword arguments to the constructor are stored on `self.args` for
use by the :py:meth:`main` method.
"""
name = "task_name"
def __init__(self, **kw):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
self._result = None
self.args = kw
def done(self, result):
self._result = result
self._wait_event.set()
def exception(self, e):
self._exception = e
self._wait_event.set()
def wait(self):
"""Call this method after submitting the task to the TaskManager to
receieve the results."""
self._wait_event.wait()
if self._exception:
raise self._exception
return self._result
def run(self, manager):
try:
self.done(self.main(manager))
except Exception as e:
self.exception(e)
def main(self, manager):
"""Implement the work of the task
:param TaskManager manager: The instance of
:py:class:`TaskManager` running this task.
Arguments passed to the constructor are available as `self.args`.
"""
pass
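The Task contract above can be exercised without a full TaskManager: the manager thread calls `run()`, while the submitting thread blocks in `wait()` for the result (or re-raised exception). A minimal sketch with an abridged inline copy of `Task`; `AddTask` is a hypothetical example task:

```python
import threading


class Task:
    # Abridged inline copy of the Task base class above, so this
    # sketch runs standalone.
    name = "task_name"

    def __init__(self, **kw):
        self._wait_event = threading.Event()
        self._exception = None
        self._result = None
        self.args = kw

    def done(self, result):
        self._result = result
        self._wait_event.set()

    def exception(self, e):
        self._exception = e
        self._wait_event.set()

    def wait(self):
        self._wait_event.wait()
        if self._exception:
            raise self._exception
        return self._result

    def run(self, manager):
        try:
            self.done(self.main(manager))
        except Exception as e:
            self.exception(e)


class AddTask(Task):
    """A toy task: add the two keyword arguments it was built with."""
    name = "add"

    def main(self, manager):
        return self.args['a'] + self.args['b']


task = AddTask(a=2, b=3)
task.run(manager=None)  # normally the TaskManager thread calls run()
print(task.wait())      # -> 5
```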
class StopTask(Task):
name = "stop_taskmanager"
def main(self, manager):
manager._running = False
class RateLimitContextManager:
def __init__(self, task_manager):
self.task_manager = task_manager
def __enter__(self):
if self.task_manager.last_ts is None:
return
while True:
delta = time.monotonic() - self.task_manager.last_ts
if delta >= self.task_manager.delta:
break
time.sleep(self.task_manager.delta - delta)
def __exit__(self, etype, value, tb):
self.task_manager.last_ts = time.monotonic()
class TaskManager:
"""A single-threaded task dispatcher
This class is meant to be instantiated by a Provider in order to
execute remote API calls from a single thread with rate limiting.
:param str name: The name of the TaskManager (usually the provider name)
used in logging.
:param float rate_limit: The rate limit of the task manager expressed in
requests per second.
"""
log = logging.getLogger("nodepool.driver.taskmanager.TaskManager")
def __init__(self, name, rate_limit):
self._running = True
self.name = name
self.queue = queue.Queue()
self.delta = 1.0 / rate_limit
self.last_ts = None
def rateLimit(self):
"""Return a context manager to perform rate limiting. Use as follows:
        .. code:: python
with task_manager.rateLimit():
<execute API call>
"""
return RateLimitContextManager(self)
def submitTask(self, task):
"""Submit a task to the task manager.
:param Task task: An instance of a subclass of :py:class:`Task`.
        :returns: The submitted task for use in function chaining.
"""
self.queue.put(task)
return task
def stop(self):
"""Stop the task manager."""
self.submitTask(StopTask())
    def run(self):
        try:
            while True:
                task = self.queue.get()
                if not task:
                    if not self._running:
                        break
                    continue
                self.log.debug("Manager %s running task %s (queue %s)",
                               self.name, task.name, self.queue.qsize())
                task.run(self)
                self.queue.task_done()
                # A StopTask clears the running flag; break here so the
                # loop does not block forever on the next queue.get().
                if not self._running:
                    break
except Exception:
self.log.exception("Task manager died")
raise
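The rate-limiting scheme above spaces the start of successive critical sections at least `1.0 / rate_limit` seconds apart, measured from the exit of the previous one. A standalone sketch of the same enter/exit logic; `RateLimiter` is a hypothetical stand-in that folds `RateLimitContextManager` together with the `delta`/`last_ts` fields it reads from the TaskManager:

```python
import time


class RateLimiter:
    """Inline stand-in for RateLimitContextManager plus the
    TaskManager fields it uses."""

    def __init__(self, rate_limit):
        self.delta = 1.0 / rate_limit
        self.last_ts = None

    def __enter__(self):
        # First call proceeds immediately; later calls sleep until
        # at least `delta` seconds have passed since the last exit.
        if self.last_ts is None:
            return
        while True:
            elapsed = time.monotonic() - self.last_ts
            if elapsed >= self.delta:
                break
            time.sleep(self.delta - elapsed)

    def __exit__(self, etype, value, tb):
        self.last_ts = time.monotonic()


limiter = RateLimiter(rate_limit=10)  # at most 10 calls per second
start = time.monotonic()
for _ in range(3):
    with limiter:
        pass  # the API call would go here
total = time.monotonic() - start
print(total >= 0.19)  # two enforced gaps of ~0.1 s each
```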
class BaseTaskManagerProvider(Provider):
"""Subclass this to build a Provider with an included taskmanager"""
log = logging.getLogger("nodepool.driver.taskmanager.TaskManagerProvider")
def __init__(self, provider):
self.provider = provider
self.thread = None
self.task_manager = TaskManager(provider.name, provider.rate_limit)
def start(self, zk_conn):
self.log.debug("Starting")
if self.thread is None:
self.log.debug("Starting thread")
self.thread = threading.Thread(target=self.task_manager.run)
self.thread.start()
def stop(self):
self.log.debug("Stopping")
if self.thread is not None:
self.log.debug("Stopping thread")
self.task_manager.stop()
def join(self):
self.log.debug("Joining")
if self.thread is not None:
self.thread.join()
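The provider lifecycle above (start a dispatch thread, submit tasks from other threads, stop, join) can be sketched standalone. `MiniTask` and `MiniTaskManager` are hypothetical abridged stand-ins; this version has `stop()` enqueue a `None` sentinel so the dispatch loop wakes up and exits:

```python
import queue
import threading


class MiniTask:
    def __init__(self):
        self._done = threading.Event()
        self._result = None

    def run(self, manager):
        # Record which thread actually executed the task.
        self._result = 'ran in ' + threading.current_thread().name
        self._done.set()

    def wait(self):
        self._done.wait()
        return self._result


class MiniTaskManager:
    def __init__(self):
        self._running = True
        self.queue = queue.Queue()

    def submitTask(self, task):
        self.queue.put(task)
        return task

    def stop(self):
        # Clear the flag, then wake the dispatch loop with a sentinel.
        self._running = False
        self.queue.put(None)

    def run(self):
        while True:
            task = self.queue.get()
            if task is None:
                if not self._running:
                    break
                continue
            task.run(self)


manager = MiniTaskManager()
thread = threading.Thread(target=manager.run, name='manager-thread')
thread.start()
result = manager.submitTask(MiniTask()).wait()
manager.stop()
thread.join()
print(result)  # -> ran in manager-thread
```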

WebOb>=1.8.1
openshift<=0.8.9
boto3
google-api-python-client