Update docker client
- use the default docker sock path in case no one is found - override docker client class to perform direct connection to remote host via Paramiko (without any SSH tunnel). - test client APIs both on localhost and any remote SSH server Change-Id: Id508c4dc99436a155ec9c1bd71d0c332c97d67f1
This commit is contained in:
parent
e6f8872aca
commit
e3d3f48ed3
@ -15,7 +15,12 @@
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import socket
|
||||
import typing
|
||||
|
||||
import docker
|
||||
from docker.transport import unixconn
|
||||
from docker import constants
|
||||
|
||||
import tobiko
|
||||
from tobiko.docker import _exception
|
||||
@ -23,9 +28,12 @@ from tobiko.docker import _shell
|
||||
from tobiko.shell import ssh
|
||||
|
||||
|
||||
def get_docker_client(base_urls=None, ssh_client=None):
|
||||
def get_docker_client(base_urls: typing.List[str] = None,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
return DockerClientFixture(base_urls=base_urls,
|
||||
ssh_client=ssh_client)
|
||||
ssh_client=ssh_client,
|
||||
sudo=sudo)
|
||||
|
||||
|
||||
def list_docker_containers(client=None, **kwargs):
|
||||
@ -49,16 +57,22 @@ def docker_client(obj=None):
|
||||
|
||||
class DockerClientFixture(tobiko.SharedFixture):
|
||||
|
||||
base_urls = None
|
||||
client = None
|
||||
ssh_client = None
|
||||
base_urls: typing.Optional[typing.List[str]] = None
|
||||
ssh_client = ssh.SSHClientType = None
|
||||
sudo = False
|
||||
client: typing.Optional['DockerClient'] = None
|
||||
|
||||
def __init__(self, base_urls=None, ssh_client=None):
|
||||
def __init__(self,
|
||||
base_urls: typing.Iterable[str] = None,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo: bool = None):
|
||||
super(DockerClientFixture, self).__init__()
|
||||
if base_urls:
|
||||
self.base_urls = list(base_urls)
|
||||
if ssh_client:
|
||||
if ssh_client is not None:
|
||||
self.ssh_client = ssh_client
|
||||
if sudo is not None:
|
||||
self.sudo = sudo
|
||||
|
||||
def setup_fixture(self):
|
||||
self.setup_ssh_client()
|
||||
@ -86,21 +100,21 @@ class DockerClientFixture(tobiko.SharedFixture):
|
||||
return client
|
||||
|
||||
def create_client(self):
|
||||
exc_info = None
|
||||
error: typing.Optional[Exception] = None
|
||||
for base_url in self.base_urls:
|
||||
if self.ssh_client:
|
||||
base_url = ssh.get_port_forward_url(ssh_client=self.ssh_client,
|
||||
url=base_url)
|
||||
client = docker.DockerClient(base_url=base_url)
|
||||
client = DockerClient(base_url=base_url,
|
||||
ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
try:
|
||||
client.ping()
|
||||
except Exception:
|
||||
exc_info = exc_info or tobiko.exc_info()
|
||||
except Exception as ex:
|
||||
ex.__cause__ = error
|
||||
error = ex
|
||||
else:
|
||||
return client
|
||||
|
||||
if exc_info:
|
||||
exc_info.reraise()
|
||||
if isinstance(error, Exception):
|
||||
raise error
|
||||
else:
|
||||
raise _exception.DockerError('Unable to create docker client')
|
||||
|
||||
@ -108,4 +122,131 @@ class DockerClientFixture(tobiko.SharedFixture):
|
||||
return tobiko.setup_fixture(self).client
|
||||
|
||||
def discover_docker_urls(self):
|
||||
return _shell.discover_docker_urls(ssh_client=self.ssh_client)
|
||||
return _shell.discover_docker_urls(ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
|
||||
|
||||
class DockerClient(docker.DockerClient):
|
||||
|
||||
def __init__(self,
|
||||
base_url: str,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
# pylint: disable=super-init-not-called
|
||||
self.api = APIClient(base_url=base_url,
|
||||
ssh_client=ssh_client,
|
||||
sudo=sudo)
|
||||
|
||||
|
||||
class APIClient(docker.APIClient):
|
||||
|
||||
def __init__(self,
|
||||
base_url: str,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
self.ssh_client = ssh_client
|
||||
self.sudo = sudo
|
||||
super(APIClient, self).__init__(base_url=base_url)
|
||||
|
||||
def get_adapter(self, url):
|
||||
adapter = super(APIClient, self).get_adapter(url)
|
||||
if isinstance(adapter, unixconn.UnixHTTPAdapter):
|
||||
new_adapter = UnixHTTPAdapter(
|
||||
socket_url=f"http+unix://{adapter.socket_path}",
|
||||
timeout=adapter.timeout,
|
||||
max_pool_size=adapter.max_pool_size,
|
||||
ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
self._custom_adapter = new_adapter
|
||||
for prefix, other_adapter in self.adapters.items():
|
||||
if adapter is other_adapter:
|
||||
self.adapters[prefix] = new_adapter
|
||||
return new_adapter
|
||||
else:
|
||||
return adapter
|
||||
|
||||
|
||||
class UnixHTTPAdapter(unixconn.UnixHTTPAdapter):
|
||||
|
||||
def __init__(self,
|
||||
socket_url: str,
|
||||
timeout=60,
|
||||
pool_connections=constants.DEFAULT_NUM_POOLS,
|
||||
max_pool_size=constants.DEFAULT_MAX_POOL_SIZE,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
self.ssh_client = ssh_client
|
||||
self.sudo = sudo
|
||||
super().__init__(socket_url=socket_url,
|
||||
timeout=timeout,
|
||||
pool_connections=pool_connections,
|
||||
max_pool_size=max_pool_size)
|
||||
|
||||
def get_connection(self, url, proxies=None):
|
||||
with self.pools.lock:
|
||||
pool = self.pools.get(url)
|
||||
if pool:
|
||||
return pool
|
||||
|
||||
pool = UnixHTTPConnectionPool(base_url=url,
|
||||
socket_path=self.socket_path,
|
||||
timeout=self.timeout,
|
||||
maxsize=self.max_pool_size,
|
||||
ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
self.pools[url] = pool
|
||||
|
||||
return pool
|
||||
|
||||
|
||||
class UnixHTTPConnectionPool(unixconn.UnixHTTPConnectionPool):
|
||||
|
||||
def __init__(self,
|
||||
base_url: str,
|
||||
socket_path: str,
|
||||
timeout=60,
|
||||
maxsize=10,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
self.ssh_client = ssh_client
|
||||
self.sudo = sudo
|
||||
super().__init__(base_url=base_url, socket_path=socket_path,
|
||||
timeout=timeout, maxsize=maxsize)
|
||||
|
||||
def _new_conn(self):
|
||||
return UnixHTTPConnection(base_url=self.base_url,
|
||||
unix_socket=self.socket_path,
|
||||
timeout=self.timeout,
|
||||
ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
|
||||
|
||||
class UnixHTTPConnection(unixconn.UnixHTTPConnection):
|
||||
|
||||
def __init__(self,
|
||||
base_url: str,
|
||||
unix_socket: str,
|
||||
timeout=60,
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
sudo=False):
|
||||
self.ssh_client = ssh_client
|
||||
self.sudo = sudo
|
||||
super(UnixHTTPConnection, self).__init__(base_url=base_url,
|
||||
unix_socket=unix_socket,
|
||||
timeout=timeout)
|
||||
|
||||
def connect(self):
|
||||
ssh_client = ssh.ssh_client_fixture(self.ssh_client)
|
||||
if ssh_client is None:
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.settimeout(self.timeout)
|
||||
sock.connect(self.unix_socket)
|
||||
elif self.sudo:
|
||||
client = self.ssh_client.connect()
|
||||
sock = client.get_transport().open_session(timeout=self.timeout)
|
||||
command = f"sudo nc -U '{self.unix_socket}'"
|
||||
sock.exec_command(command)
|
||||
else:
|
||||
sock = self.ssh_client.open_unix_socket(
|
||||
socket_path=self.unix_socket)
|
||||
self.sock = sock
|
||||
|
@ -15,40 +15,97 @@
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import collections
|
||||
from urllib import parse
|
||||
import typing
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from tobiko.docker import _exception
|
||||
from tobiko.shell import sh
|
||||
from tobiko.shell import ssh
|
||||
|
||||
|
||||
def discover_docker_urls(**execute_params):
|
||||
result = sh.execute('ps aux | grep dockerd', stdin=False, stdout=True,
|
||||
stderr=True, expect_exit_status=None, **execute_params)
|
||||
if result.exit_status or not result.stdout:
|
||||
raise _exception.DockerUrlNotFoundError(details=result.stderr)
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def discover_docker_urls(
|
||||
ssh_client: ssh.SSHClientType = None,
|
||||
default_url='unix:/var/run/docker.sock',
|
||||
check_urls=True,
|
||||
sudo=False):
|
||||
urls = []
|
||||
for line in result.stdout.splitlines():
|
||||
fields = line.strip().split()
|
||||
if fields:
|
||||
offset = 0
|
||||
while True:
|
||||
processes = sh.list_processes(command='^dockerd',
|
||||
ssh_client=ssh_client,
|
||||
sudo=sudo)
|
||||
|
||||
for process in processes:
|
||||
if process.command_line:
|
||||
urls += urls_from_command_line(process.command_line)
|
||||
|
||||
urls.append(default_url)
|
||||
urls = list(collections.OrderedDict.fromkeys(urls))
|
||||
|
||||
error: typing.Optional[Exception] = None
|
||||
if check_urls:
|
||||
valid_urls = []
|
||||
for url in urls:
|
||||
parsed_url = parse.urlparse(url)
|
||||
if (parsed_url.scheme == 'unix' and
|
||||
parsed_url.path.startswith('/')):
|
||||
try:
|
||||
offset = fields.index('-H', offset)
|
||||
url = fields[offset + 1]
|
||||
except (ValueError, IndexError):
|
||||
break
|
||||
sh.execute(f"test -r '{parsed_url.path}'",
|
||||
ssh_client=ssh_client,
|
||||
sudo=sudo)
|
||||
except sh.ShellCommandFailed as ex:
|
||||
LOG.exception(
|
||||
f"Can't read from socket: {parsed_url.path}")
|
||||
ex.__cause__ = error
|
||||
error = ex
|
||||
else:
|
||||
urls.append(url)
|
||||
offset += 2
|
||||
|
||||
if not urls:
|
||||
raise _exception.DockerUrlNotFoundError(details='\n' + result.stdout)
|
||||
valid_urls.append(url)
|
||||
|
||||
if not valid_urls:
|
||||
raise _exception.DockerUrlNotFoundError(
|
||||
"Docker is not running") from error
|
||||
urls = valid_urls
|
||||
return urls
|
||||
|
||||
|
||||
def is_docker_running(ssh_client=None, **execute_params):
|
||||
def urls_from_command_line(command_line: sh.ShellCommand,
|
||||
default_url='unix:/var/run/docker.sock') \
|
||||
-> typing.List[str]:
|
||||
urls = []
|
||||
arg_is_url = False
|
||||
for arg in command_line[1:]:
|
||||
if arg == '-H':
|
||||
arg_is_url = True
|
||||
continue
|
||||
if arg_is_url:
|
||||
arg_is_url = False
|
||||
try:
|
||||
url = parse.urlparse(arg)
|
||||
except Exception:
|
||||
LOG.debug(f'Invalid URL: {arg}', exc_info=1)
|
||||
continue
|
||||
if url.scheme == 'fd':
|
||||
urls.append(default_url)
|
||||
continue
|
||||
if url.scheme != 'unix':
|
||||
LOG.debug(f'Unsupported URL scheme: {arg}')
|
||||
continue
|
||||
if not url.path.startswith('/'):
|
||||
LOG.debug(f'Unsupported URL path: {arg}')
|
||||
continue
|
||||
urls.append(arg)
|
||||
return urls
|
||||
|
||||
|
||||
def is_docker_running(ssh_client=None, sudo=False, **execute_params):
|
||||
try:
|
||||
discover_docker_urls(ssh_client=ssh_client, **execute_params)
|
||||
discover_docker_urls(ssh_client=ssh_client,
|
||||
sudo=sudo,
|
||||
**execute_params)
|
||||
except _exception.DockerUrlNotFoundError:
|
||||
return False
|
||||
else:
|
||||
|
@ -21,47 +21,56 @@ import testtools
|
||||
|
||||
import tobiko
|
||||
from tobiko import docker
|
||||
from tobiko.openstack import keystone
|
||||
from tobiko.openstack import topology
|
||||
from tobiko.shell import ssh
|
||||
|
||||
|
||||
class DockerNodeFixture(tobiko.SharedFixture):
|
||||
class LocalDockerClientTest(testtools.TestCase):
|
||||
|
||||
node = None
|
||||
|
||||
def setup_fixture(self):
|
||||
nodes = topology.list_openstack_nodes()
|
||||
for node in nodes:
|
||||
assert node.ssh_client is not None
|
||||
if docker.is_docker_running(ssh_client=node.ssh_client):
|
||||
self.node = node
|
||||
break
|
||||
|
||||
if self.node is None:
|
||||
nodes_text = ' '.join(node.name for node in nodes)
|
||||
tobiko.skip_test("Docker server is not running in any of nodes "
|
||||
f"{nodes_text}")
|
||||
|
||||
|
||||
@keystone.skip_unless_has_keystone_credentials()
|
||||
class DockerClientTest(testtools.TestCase):
|
||||
|
||||
node = tobiko.required_setup_fixture(DockerNodeFixture)
|
||||
sudo = False
|
||||
|
||||
@property
|
||||
def ssh_client(self):
|
||||
return self.node.node.ssh_client
|
||||
def ssh_client(self) -> ssh.SSHClientType:
|
||||
for ssh_client in self.iter_ssh_clients():
|
||||
if docker.is_docker_running(ssh_client=ssh_client,
|
||||
sudo=self.sudo):
|
||||
return ssh_client
|
||||
tobiko.skip_test('Docker not installed')
|
||||
|
||||
@staticmethod
|
||||
def iter_ssh_clients():
|
||||
yield False
|
||||
|
||||
def test_get_docker_client(self):
|
||||
client = docker.get_docker_client(ssh_client=self.ssh_client)
|
||||
client = docker.get_docker_client(ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
self.assertIsInstance(client, docker.DockerClientFixture)
|
||||
|
||||
def test_connect_docker_client(self):
|
||||
client = docker.get_docker_client(ssh_client=self.ssh_client).connect()
|
||||
client = docker.get_docker_client(ssh_client=self.ssh_client,
|
||||
sudo=self.sudo).connect()
|
||||
self.assertIsInstance(client, docker_client.DockerClient)
|
||||
client.ping()
|
||||
|
||||
def test_list_docker_containers(self):
|
||||
client = docker.get_docker_client(ssh_client=self.ssh_client,
|
||||
sudo=self.sudo)
|
||||
for container in docker.list_docker_containers(
|
||||
ssh_client=self.ssh_client):
|
||||
client=client):
|
||||
self.assertIsInstance(container, containers.Container)
|
||||
|
||||
|
||||
class SShDockerClientTest(LocalDockerClientTest):
|
||||
|
||||
sudo = True
|
||||
|
||||
@staticmethod
|
||||
def iter_ssh_clients():
|
||||
ssh_client = ssh.ssh_proxy_client()
|
||||
if isinstance(ssh_client, ssh.SSHClientFixture):
|
||||
yield ssh_client
|
||||
|
||||
nodes = topology.list_openstack_nodes()
|
||||
for node in nodes:
|
||||
if isinstance(node.ssh_client, ssh.SSHClientFixture):
|
||||
yield ssh_client
|
||||
|
@ -1,137 +0,0 @@
|
||||
# Copyright 2018 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import mock
|
||||
|
||||
from tobiko import docker
|
||||
from tobiko.tests import unit
|
||||
|
||||
|
||||
# This class help us to emulate tobiko.shell.sh.execute
|
||||
# to simulate errors or not and test the tobiko.docker._shell module
|
||||
class FakeProcess:
|
||||
exit_status = 0
|
||||
stdout = ''
|
||||
stderr = ''
|
||||
|
||||
|
||||
class TestShell(unit.TobikoUnitTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestShell, self).setUp()
|
||||
self.fproc = FakeProcess()
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_discover_docker_urls_single(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H fd:// ' \
|
||||
'--containerd=/run/containerd/containerd.sock'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.discover_docker_urls(), ['fd://'])
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_discover_docker_urls_multiple(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H fd:// ' \
|
||||
'-H fd:// ' \
|
||||
'--containerd=/run/containerd/containerd.sock'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.discover_docker_urls(), ['fd://', 'fd://'])
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_discover_docker_urls_separated(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H fd:// ' \
|
||||
'--containerd=/run/containerd/containerd.sock ' \
|
||||
'-H boom'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.discover_docker_urls(), ['fd://', 'boom'])
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_discover_docker_urls_many_daemons(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd ' \
|
||||
'-H fd:// -H boom ' \
|
||||
'--containerd=/run/containerd/containerd.sock\n' \
|
||||
'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H foo ' \
|
||||
'--containerd=/run/containerd/containerd.sock ' \
|
||||
'-H bar'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.discover_docker_urls(),
|
||||
['fd://', 'boom', 'foo', 'bar'])
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_discover_docker_urls_no_daemons(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = ''
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertRaises(docker.DockerUrlNotFoundError,
|
||||
docker.discover_docker_urls)
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_is_docker_running(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd ' \
|
||||
'-H fd:// -H boom ' \
|
||||
'--containerd=/run/containerd/containerd.sock\n' \
|
||||
'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H foo ' \
|
||||
'--containerd=/run/containerd/containerd.sock ' \
|
||||
'-H bar'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.is_docker_running(), True)
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_is_docker_running_without_results(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = ''
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.is_docker_running(), False)
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_is_docker_running_exit_1(self, mock_execute):
|
||||
self.fproc.exit_status = 1
|
||||
self.fproc.stdout = 'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd ' \
|
||||
'-H fd:// -H boom ' \
|
||||
'--containerd=/run/containerd/containerd.sock\n' \
|
||||
'root 1999 0.0 0.2 1456736 85272 ?' \
|
||||
'Ssl 15:16 0:01 /usr/bin/dockerd -H foo ' \
|
||||
'--containerd=/run/containerd/containerd.sock ' \
|
||||
'-H bar'
|
||||
self.fproc.stderr = ''
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.is_docker_running(), False)
|
||||
|
||||
@mock.patch('tobiko.shell.sh.execute')
|
||||
def test_is_docker_running_error_exist_but_exit_0(self, mock_execute):
|
||||
self.fproc.exit_status = 0
|
||||
self.fproc.stdout = ''
|
||||
self.fproc.stderr = 'boom'
|
||||
mock_execute.return_value = self.fproc
|
||||
self.assertEqual(docker.is_docker_running(), False)
|
Loading…
x
Reference in New Issue
Block a user