cae5cdd1fa
This patch adds a serial console proxy. This will be used in order to expose VM serial ports via TCP sockets. The named pipe IO workers run in native threads and the serial proxy uses queues to interact with those workers. For this reason, the serial proxy will use native threads as well. This implies that logging cannot be used. Just as the serial console specification states, only one client can be connected at a time. This connection will not be used directly, but via the WebSockets proxy that may run on the controller, which is also responsible for performing authentication. Partially-implements: blueprint hyperv-serial-ports Change-Id: Ie61994a8af7be26a72388d44844e5c482f960891
130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
# Copyright 2016 Cloudbase Solutions Srl
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import functools
|
|
import socket
|
|
|
|
from eventlet import patcher
|
|
|
|
from nova import exception
|
|
from nova.i18n import _
|
|
from nova.virt.hyperv import constants
|
|
|
|
# Note(lpetrut): Eventlet greenpipes are not supported on Windows. The named
|
|
# pipe handlers implemented in os-win use Windows API calls which can block
|
|
# the whole thread. In order to avoid this, those workers run in separate
|
|
# 'native' threads.
|
|
#
|
|
# As this proxy communicates with those workers via queues, the serial console
|
|
# proxy workers have to run in 'native' threads as well.
|
|
# Bind the original (non-monkey-patched) threading module, so that the
# Thread and Event objects created in this module are native ones.
threading = patcher.original('threading')
|
|
|
|
|
|
def handle_socket_errors(func):
    """Decorate a method so that socket errors mark the client as gone.

    The decorated callable must be a method of an object exposing a
    ``_client_connected`` event. If the call raises ``socket.error``, the
    exception is swallowed, the event is cleared and ``None`` is returned.
    Any other exception propagates unchanged.

    :param func: the method to wrap.
    :returns: the wrapping function, carrying the metadata of ``func``.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            # Treat any socket failure as a client disconnect; the loops
            # polling this event will then wind down on their own.
            self._client_connected.clear()
    return wrapper
|
|
|
|
|
|
class SerialProxy(threading.Thread):
    """Proxy a serial console over a TCP socket, one client at a time.

    The thread listens on ``addr:port``. While a client is connected, data
    received from it is put on ``input_queue`` and data fetched from
    ``output_queue`` is sent back to it. ``client_connected`` is an event
    that is set once a client is accepted and cleared when the client
    disconnects, a socket error occurs or the proxy is stopped.

    NOTE: socket errors are handled silently; nothing in this class logs.
    """

    def __init__(self, instance_name, addr, port, input_queue,
                 output_queue, client_connected):
        """Initialize the proxy without opening any socket yet.

        :param instance_name: instance name, used only in error messages.
        :param addr: address the listening socket is bound to.
        :param port: TCP port the listening socket is bound to.
        :param input_queue: queue receiving the data sent by the client.
        :param output_queue: queue providing the data to send to the
                             client, fetched through ``get_burst()``.
        :param client_connected: event tracking whether a client is
                                 currently connected.
        """
        super(SerialProxy, self).__init__()
        self.daemon = True

        self._instance_name = instance_name
        self._addr = addr
        self._port = port
        # Both sockets are created lazily; initialising them here lets
        # stop() and the error path of _setup_socket() run safely even if
        # the listening socket was never created.
        self._sock = None
        self._conn = None

        self._input_queue = input_queue
        self._output_queue = output_queue
        self._client_connected = client_connected
        self._stopped = threading.Event()

    def _setup_socket(self):
        """Create, bind and start listening on the proxy socket.

        :raises: exception.NovaException if the socket cannot be set up.
        """
        try:
            self._sock = socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)
            self._sock.setsockopt(socket.SOL_SOCKET,
                                  socket.SO_REUSEADDR,
                                  1)
            self._sock.bind((self._addr, self._port))
            # Only one client may be connected at a time.
            self._sock.listen(1)
        except socket.error as err:
            # The socket may not exist if socket.socket() itself failed.
            if self._sock is not None:
                self._sock.close()
            msg = (_('Failed to initialize serial proxy on '
                     '%(addr)s:%(port)s, handling connections '
                     'to instance %(instance_name)s. Error: %(error)s') %
                   {'addr': self._addr,
                    'port': self._port,
                    'instance_name': self._instance_name,
                    'error': err})
            raise exception.NovaException(msg)

    def stop(self):
        """Stop the proxy, closing the client and listening sockets."""
        self._stopped.set()
        self._client_connected.clear()

        # Use a local reference: _accept_conn() may reset self._conn to
        # None concurrently once its workers have finished.
        conn = self._conn
        if conn:
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already be gone; this must not prevent the
                # sockets below from being closed.
                pass
            conn.close()

        if self._sock is not None:
            self._sock.close()

    def run(self):
        """Set up the listening socket and serve clients until stopped."""
        self._setup_socket()
        while not self._stopped.is_set():
            self._accept_conn()

    @handle_socket_errors
    def _accept_conn(self):
        """Accept one client and block until its session is over.

        Two worker threads are spawned, one per direction. This method
        returns after both have finished, closing the connection.
        """
        self._conn, _client_addr = self._sock.accept()
        self._client_connected.set()

        workers = []
        for job in (self._get_data, self._send_data):
            worker = threading.Thread(target=job)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            worker.join()

        self._conn.close()
        self._conn = None

    @handle_socket_errors
    def _get_data(self):
        """Forward data received from the client to the input queue."""
        while self._client_connected.is_set():
            data = self._conn.recv(constants.SERIAL_CONSOLE_BUFFER_SIZE)
            if not data:
                # An empty read means that the client has disconnected.
                self._client_connected.clear()
                return
            self._input_queue.put(data)

    @handle_socket_errors
    def _send_data(self):
        """Forward data from the output queue to the client."""
        while self._client_connected.is_set():
            data = self._output_queue.get_burst()
            if data:
                self._conn.sendall(data)