HyperV: Add serial console proxy

This patch adds a serial console proxy. This will be used in order
to expose VM serial ports via TCP sockets.

The named pipe IO workers run in native threads and the serial
proxy uses queues to interact with those workers. For this reason,
the serial proxy will use native threads as well.

This implies that logging cannot be used.

Just as the serial console specification states, only one client
can be connected at a time.

This connection will not be used directly, but via the WebSockets
proxy that may run on the controller, which is also responsible of
performing authentication.

Partially-implements: blueprint hyperv-serial-ports

Change-Id: Ie61994a8af7be26a72388d44844e5c482f960891
This commit is contained in:
Lucian Petrut
2015-05-16 05:19:54 +03:00
parent 18169d572a
commit cae5cdd1fa
3 changed files with 262 additions and 0 deletions

View File

@@ -0,0 +1,130 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import mock
from nova import exception
from nova.tests.unit.virt.hyperv import test_base
from nova.virt.hyperv import serialproxy
class SerialProxyTestCase(test_base.HyperVBaseTestCase):
@mock.patch.object(socket, 'socket')
def setUp(self, mock_socket):
super(SerialProxyTestCase, self).setUp()
self._mock_socket = mock_socket
self._mock_input_queue = mock.Mock()
self._mock_output_queue = mock.Mock()
self._mock_client_connected = mock.Mock()
threading_patcher = mock.patch.object(serialproxy, 'threading')
threading_patcher.start()
self.addCleanup(threading_patcher.stop)
self._proxy = serialproxy.SerialProxy(
mock.sentinel.instance_nane,
mock.sentinel.host,
mock.sentinel.port,
self._mock_input_queue,
self._mock_output_queue,
self._mock_client_connected)
@mock.patch.object(socket, 'socket')
def test_setup_socket_exception(self, mock_socket):
fake_socket = mock_socket.return_value
fake_socket.listen.side_effect = socket.error
self.assertRaises(exception.NovaException,
self._proxy._setup_socket)
fake_socket.setsockopt.assert_called_once_with(socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1)
fake_socket.bind.assert_called_once_with((mock.sentinel.host,
mock.sentinel.port))
def test_stop_serial_proxy(self):
self._proxy._conn = mock.Mock()
self._proxy._sock = mock.Mock()
self._proxy.stop()
self._proxy._stopped.set.assert_called_once_with()
self._proxy._client_connected.clear.assert_called_once_with()
self._proxy._conn.shutdown.assert_called_once_with(socket.SHUT_RDWR)
self._proxy._conn.close.assert_called_once_with()
self._proxy._sock.close.assert_called_once_with()
@mock.patch.object(serialproxy.SerialProxy, '_accept_conn')
@mock.patch.object(serialproxy.SerialProxy, '_setup_socket')
def test_run(self, mock_setup_socket, mock_accept_con):
self._proxy._stopped = mock.MagicMock()
self._proxy._stopped.isSet.side_effect = [False, True]
self._proxy.run()
mock_setup_socket.assert_called_once_with()
mock_accept_con.assert_called_once_with()
def test_accept_connection(self):
mock_conn = mock.Mock()
self._proxy._sock = mock.Mock()
self._proxy._sock.accept.return_value = [
mock_conn, (mock.sentinel.client_addr, mock.sentinel.client_port)]
self._proxy._accept_conn()
self._proxy._client_connected.set.assert_called_once_with()
mock_conn.close.assert_called_once_with()
self.assertIsNone(self._proxy._conn)
thread = serialproxy.threading.Thread
for job in [self._proxy._get_data,
self._proxy._send_data]:
thread.assert_any_call(target=job)
def test_get_data(self):
self._mock_client_connected.isSet.return_value = True
self._proxy._conn = mock.Mock()
self._proxy._conn.recv.side_effect = [mock.sentinel.data, None]
self._proxy._get_data()
self._mock_client_connected.clear.assert_called_once_with()
self._mock_input_queue.put.assert_called_once_with(mock.sentinel.data)
def _test_send_data(self, exception=None):
self._mock_client_connected.isSet.side_effect = [True, False]
self._mock_output_queue.get_burst.return_value = mock.sentinel.data
self._proxy._conn = mock.Mock()
self._proxy._conn.sendall.side_effect = exception
self._proxy._send_data()
self._proxy._conn.sendall.assert_called_once_with(
mock.sentinel.data)
if exception:
self._proxy._client_connected.clear.assert_called_once_with()
def test_send_data(self):
self._test_send_data()
def test_send_data_exception(self):
self._test_send_data(exception=socket.error)

View File

@@ -18,6 +18,7 @@ Constants used in ops classes
"""
from os_win import constants
from oslo_utils import units
from nova.compute import arch
from nova.compute import power_state
@@ -66,3 +67,5 @@ IMAGE_PROP_VM_GEN_2 = "hyperv-gen2"
VM_GEN_1 = 1
VM_GEN_2 = 2
SERIAL_CONSOLE_BUFFER_SIZE = 4 * units.Ki

View File

@@ -0,0 +1,129 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import socket
from eventlet import patcher
from nova import exception
from nova.i18n import _
from nova.virt.hyperv import constants
# Note(lpetrut): Eventlet greenpipes are not supported on Windows. The named
# pipe handlers implemented in os-win use Windows API calls which can block
# the whole thread. In order to avoid this, those workers run in separate
# 'native' threads.
#
# As this proxy communicates with those workers via queues, the serial console
# proxy workers have to run in 'native' threads as well.
threading = patcher.original('threading')
def handle_socket_errors(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except socket.error:
self._client_connected.clear()
return wrapper
class SerialProxy(threading.Thread):
def __init__(self, instance_name, addr, port, input_queue,
output_queue, client_connected):
super(SerialProxy, self).__init__()
self.setDaemon(True)
self._instance_name = instance_name
self._addr = addr
self._port = port
self._conn = None
self._input_queue = input_queue
self._output_queue = output_queue
self._client_connected = client_connected
self._stopped = threading.Event()
def _setup_socket(self):
try:
self._sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
self._sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1)
self._sock.bind((self._addr, self._port))
self._sock.listen(1)
except socket.error as err:
self._sock.close()
msg = (_('Failed to initialize serial proxy on'
'%(addr)s:%(port)s, handling connections '
'to instance %(instance_name)s. Error: %(error)s') %
{'addr': self._addr,
'port': self._port,
'instance_name': self._instance_name,
'error': err})
raise exception.NovaException(msg)
def stop(self):
self._stopped.set()
self._client_connected.clear()
if self._conn:
self._conn.shutdown(socket.SHUT_RDWR)
self._conn.close()
self._sock.close()
def run(self):
self._setup_socket()
while not self._stopped.isSet():
self._accept_conn()
@handle_socket_errors
def _accept_conn(self):
self._conn, client_addr = self._sock.accept()
self._client_connected.set()
workers = []
for job in [self._get_data, self._send_data]:
worker = threading.Thread(target=job)
worker.setDaemon(True)
worker.start()
workers.append(worker)
for worker in workers:
worker_running = (worker.is_alive() and
worker is not threading.current_thread())
if worker_running:
worker.join()
self._conn.close()
self._conn = None
@handle_socket_errors
def _get_data(self):
while self._client_connected.isSet():
data = self._conn.recv(constants.SERIAL_CONSOLE_BUFFER_SIZE)
if not data:
self._client_connected.clear()
return
self._input_queue.put(data)
@handle_socket_errors
def _send_data(self):
while self._client_connected.isSet():
data = self._output_queue.get_burst()
if data:
self._conn.sendall(data)