Merge "static: enable using a single host with different user or port"
This commit is contained in:
commit
a7dc2770b7
|
@ -1039,8 +1039,8 @@ Selecting the static driver adds the following options to the
|
|||
.. note::
|
||||
|
||||
Although you may define more than one pool, it is essentially
|
||||
useless to do so since a node's ``name`` must be unique
|
||||
across all pools.
|
||||
useless to do so since a node's ``name``, ``username`` and ``port``
|
||||
must be unique across all pools.
|
||||
|
||||
Each entry is a dictionary with entries as follows
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
import logging
|
||||
|
||||
from collections import Counter
|
||||
from collections import namedtuple
|
||||
|
||||
from nodepool import exceptions
|
||||
from nodepool import nodeutils
|
||||
|
@ -27,6 +28,17 @@ class StaticNodeError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
Node = namedtuple("Node", ["hostname", "username", "port"])
|
||||
|
||||
|
||||
def nodeTuple(node):
|
||||
"""Return an unique identifier tuple for a static node"""
|
||||
if isinstance(node, dict):
|
||||
return Node(node["name"], node["username"], node["connection-port"])
|
||||
else:
|
||||
return Node(node.hostname, node.username, node.connection_port)
|
||||
|
||||
|
||||
class StaticNodeProvider(Provider):
|
||||
log = logging.getLogger("nodepool.driver.static."
|
||||
"StaticNodeProvider")
|
||||
|
@ -63,18 +75,18 @@ class StaticNodeProvider(Provider):
|
|||
raise StaticNodeError("%s: host key mismatches (%s)" %
|
||||
(node["name"], keys))
|
||||
|
||||
def getRegisteredReadyNodes(self, hostname):
|
||||
def getRegisteredReadyNodes(self, node_tuple):
|
||||
'''
|
||||
Get all registered nodes with the given hostname that are READY.
|
||||
Get all registered nodes with the given identifier that are READY.
|
||||
|
||||
:param str hostname: Hostname of the node (maps to static node name).
|
||||
:param Node node_tuple: the namedtuple Node.
|
||||
:returns: A list of matching Node objects.
|
||||
'''
|
||||
nodes = []
|
||||
for node in self.zk.nodeIterator():
|
||||
if (node.provider != self.provider.name or
|
||||
node.state != zk.READY or
|
||||
node.hostname != hostname
|
||||
nodeTuple(node) != node_tuple
|
||||
):
|
||||
continue
|
||||
nodes.append(node)
|
||||
|
@ -95,25 +107,26 @@ class StaticNodeProvider(Provider):
|
|||
self.log.exception("Failed to connect to node %s:",
|
||||
static_node["name"])
|
||||
try:
|
||||
self.deregisterNode(count=1, node_name=static_node["name"])
|
||||
self.deregisterNode(count=1, node_tuple=nodeTuple(static_node))
|
||||
except Exception:
|
||||
self.log.exception("Couldn't deregister static node:")
|
||||
|
||||
return False
|
||||
|
||||
def getRegisteredNodeHostnames(self):
|
||||
def getRegisteredNodes(self):
|
||||
'''
|
||||
Get hostnames for all registered static nodes.
|
||||
|
||||
:note: We assume hostnames are unique across pools.
|
||||
:note: We assume hostnames, username and port are unique across pools.
|
||||
|
||||
:returns: A set of registered hostnames for the static driver.
|
||||
:returns: A set of registered (hostnames, usernames, ports) tuple for
|
||||
the static driver.
|
||||
'''
|
||||
registered = Counter()
|
||||
for node in self.zk.nodeIterator():
|
||||
if node.provider != self.provider.name:
|
||||
continue
|
||||
registered.update([node.hostname])
|
||||
registered.update([nodeTuple(node)])
|
||||
return registered
|
||||
|
||||
def registerNodeFromConfig(self, count, provider_name, pool_name,
|
||||
|
@ -160,7 +173,7 @@ class StaticNodeProvider(Provider):
|
|||
:param dict static_node: The node definition from the config file.
|
||||
'''
|
||||
host_keys = self.checkHost(static_node)
|
||||
nodes = self.getRegisteredReadyNodes(static_node["name"])
|
||||
nodes = self.getRegisteredReadyNodes(nodeTuple(static_node))
|
||||
new_attrs = (
|
||||
static_node["labels"],
|
||||
static_node["username"],
|
||||
|
@ -197,7 +210,7 @@ class StaticNodeProvider(Provider):
|
|||
finally:
|
||||
self.zk.unlockNode(node)
|
||||
|
||||
def deregisterNode(self, count, node_name):
|
||||
def deregisterNode(self, count, node_tuple):
|
||||
'''
|
||||
Attempt to delete READY nodes.
|
||||
|
||||
|
@ -208,9 +221,9 @@ class StaticNodeProvider(Provider):
|
|||
:param str node_name: The static node name/hostname.
|
||||
'''
|
||||
self.log.debug("Deregistering %s nodes with hostname %s",
|
||||
count, node_name)
|
||||
count, node_tuple.hostname)
|
||||
|
||||
nodes = self.getRegisteredReadyNodes(node_name)
|
||||
nodes = self.getRegisteredReadyNodes(node_tuple)
|
||||
|
||||
for node in nodes:
|
||||
if count <= 0:
|
||||
|
@ -243,7 +256,7 @@ class StaticNodeProvider(Provider):
|
|||
self.zk.unlockNode(node)
|
||||
|
||||
def syncNodeCount(self, registered, node, pool):
|
||||
current_count = registered[node["name"]]
|
||||
current_count = registered[nodeTuple(node)]
|
||||
|
||||
# Register nodes to synchronize with our configuration.
|
||||
if current_count < node["max-parallel-jobs"]:
|
||||
|
@ -257,13 +270,13 @@ class StaticNodeProvider(Provider):
|
|||
elif current_count > node["max-parallel-jobs"]:
|
||||
deregister_cnt = current_count - node["max-parallel-jobs"]
|
||||
try:
|
||||
self.deregisterNode(deregister_cnt, node["name"])
|
||||
self.deregisterNode(deregister_cnt, nodeTuple(node))
|
||||
except Exception:
|
||||
self.log.exception("Couldn't deregister static node:")
|
||||
|
||||
def _start(self, zk_conn):
|
||||
self.zk = zk_conn
|
||||
registered = self.getRegisteredNodeHostnames()
|
||||
registered = self.getRegisteredNodes()
|
||||
|
||||
static_nodes = {}
|
||||
for pool in self.provider.pools.values():
|
||||
|
@ -280,15 +293,15 @@ class StaticNodeProvider(Provider):
|
|||
self.log.exception("Couldn't update static node:")
|
||||
continue
|
||||
|
||||
static_nodes[node["name"]] = node
|
||||
static_nodes[nodeTuple(node)] = node
|
||||
|
||||
# De-register nodes to synchronize with our configuration.
|
||||
# This case covers any registered nodes that no longer appear in
|
||||
# the config.
|
||||
for hostname in list(registered):
|
||||
if hostname not in static_nodes:
|
||||
for node in list(registered):
|
||||
if node not in static_nodes:
|
||||
try:
|
||||
self.deregisterNode(registered[hostname], hostname)
|
||||
self.deregisterNode(registered[node], node)
|
||||
except Exception:
|
||||
self.log.exception("Couldn't deregister static node:")
|
||||
continue
|
||||
|
@ -303,11 +316,11 @@ class StaticNodeProvider(Provider):
|
|||
self.log.debug("Stopping")
|
||||
|
||||
def listNodes(self):
|
||||
registered = self.getRegisteredNodeHostnames()
|
||||
registered = self.getRegisteredNodes()
|
||||
servers = []
|
||||
for pool in self.provider.pools.values():
|
||||
for node in pool.nodes:
|
||||
if node["name"] in registered:
|
||||
if nodeTuple(node) in registered:
|
||||
servers.append(node)
|
||||
return servers
|
||||
|
||||
|
@ -330,7 +343,7 @@ class StaticNodeProvider(Provider):
|
|||
return True
|
||||
|
||||
def cleanupLeakedResources(self):
|
||||
registered = self.getRegisteredNodeHostnames()
|
||||
registered = self.getRegisteredNodes()
|
||||
for pool in self.provider.pools.values():
|
||||
for node in pool.nodes:
|
||||
try:
|
||||
|
@ -353,12 +366,12 @@ class StaticNodeProvider(Provider):
|
|||
return
|
||||
|
||||
try:
|
||||
registered = self.getRegisteredNodeHostnames()
|
||||
registered = self.getRegisteredNodes()
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Cannot get registered hostnames for node re-registration:")
|
||||
return
|
||||
current_count = registered[node.hostname]
|
||||
current_count = registered[nodeTuple(node)]
|
||||
|
||||
# It's possible we were not able to de-register nodes due to a config
|
||||
# change (because they were in use). In that case, don't bother to
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
zookeeper-servers:
|
||||
- host: {zookeeper_host}
|
||||
port: {zookeeper_port}
|
||||
chroot: {zookeeper_chroot}
|
||||
|
||||
labels:
|
||||
- name: fake-label
|
||||
|
||||
providers:
|
||||
- name: static-provider
|
||||
driver: static
|
||||
pools:
|
||||
- name: main
|
||||
nodes:
|
||||
- name: fake-host-1
|
||||
labels: fake-label
|
||||
host-key: ssh-rsa FAKEKEY
|
||||
timeout: 13
|
||||
connection-port: 22022
|
||||
username: zuul
|
||||
- name: fake-host-1
|
||||
labels: fake-label
|
||||
host-key: ssh-rsa FAKEKEY
|
||||
timeout: 13
|
||||
connection-port: 22022
|
||||
username: zuul-2
|
|
@ -82,6 +82,23 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
nodes = self.waitForNodes('fake-label')
|
||||
self.assertEqual(nodes[0].python_path, "/usr/bin/python3")
|
||||
|
||||
def test_static_multiname(self):
|
||||
'''
|
||||
Test that multi name node registration works.
|
||||
'''
|
||||
configfile = self.setup_config('static-multiname.yaml')
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
pool.start()
|
||||
|
||||
self.log.debug("Waiting for node pre-registration")
|
||||
nodes = self.waitForNodes('fake-label', 2)
|
||||
self.assertEqual(len(nodes), 2)
|
||||
|
||||
self.assertEqual(nodes[0].state, zk.READY)
|
||||
self.assertEqual(nodes[0].username, 'zuul-2')
|
||||
self.assertEqual(nodes[1].state, zk.READY)
|
||||
self.assertEqual(nodes[1].username, 'zuul')
|
||||
|
||||
def test_static_unresolvable(self):
|
||||
'''
|
||||
Test that basic node registration works.
|
||||
|
|
Loading…
Reference in New Issue