diff --git a/xenapi/README b/xenapi/README new file mode 100644 index 00000000..1fc67aa7 --- /dev/null +++ b/xenapi/README @@ -0,0 +1,2 @@ +This directory contains files that are required for the XenAPI support. They +should be installed in the XenServer / Xen Cloud Platform domain 0. diff --git a/xenapi/etc/xapi.d/plugins/objectstore b/xenapi/etc/xapi.d/plugins/objectstore new file mode 100644 index 00000000..271e7337 --- /dev/null +++ b/xenapi/etc/xapi.d/plugins/objectstore @@ -0,0 +1,231 @@ +#!/usr/bin/env python + +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# +# XenAPI plugin for fetching images from nova-objectstore. +# + +import base64 +import errno +import hmac +import os +import os.path +import sha +import time +import urlparse + +import XenAPIPlugin + +from pluginlib_nova import * +configure_logging('objectstore') + + +KERNEL_DIR = '/boot/guest' + +DOWNLOAD_CHUNK_SIZE = 2 * 1024 * 1024 +SECTOR_SIZE = 512 +MBR_SIZE_SECTORS = 63 +MBR_SIZE_BYTES = MBR_SIZE_SECTORS * SECTOR_SIZE + + +def get_vdi(session, args): + src_url = exists(args, 'src_url') + username = exists(args, 'username') + password = exists(args, 'password') + add_partition = validate_bool(args, 'add_partition', 'false') + + (proto, netloc, url_path, _, _, _) = urlparse.urlparse(src_url) + + sr = find_sr(session) + if sr is None: + raise Exception('Cannot find SR to write VDI to') + + virtual_size = \ + get_content_length(proto, netloc, url_path, username, password) + if virtual_size < 0: + raise Exception('Cannot get VDI size') + + vdi_size = virtual_size + if add_partition: + # Make room for MBR. + vdi_size += MBR_SIZE_BYTES + + vdi = create_vdi(session, sr, src_url, vdi_size, False) + with_vdi_in_dom0(session, vdi, False, + lambda dev: get_vdi_(proto, netloc, url_path, + username, password, add_partition, + virtual_size, '/dev/%s' % dev)) + return session.xenapi.VDI.get_uuid(vdi) + + +def get_vdi_(proto, netloc, url_path, username, password, add_partition, + virtual_size, dest): + + if add_partition: + write_partition(virtual_size, dest) + + offset = add_partition and MBR_SIZE_BYTES or 0 + get(proto, netloc, url_path, username, password, dest, offset) + + +def write_partition(virtual_size, dest): + mbr_last = MBR_SIZE_SECTORS - 1 + primary_first = MBR_SIZE_SECTORS + primary_last = MBR_SIZE_SECTORS + (virtual_size / SECTOR_SIZE) - 1 + + logging.debug('Writing partition table %d %d to %s...', + primary_first, primary_last, dest) + + result = os.system('parted --script %s mklabel msdos' % dest) + if result != 0: + raise Exception('Failed to mklabel') + result = os.system('parted --script %s mkpart primary %ds %ds' % + (dest, primary_first, primary_last)) + if result != 0: + raise Exception('Failed to mkpart') + + logging.debug('Writing partition table %s done.', dest) + + +def find_sr(session): + host = get_this_host(session) + srs = session.xenapi.SR.get_all() + for sr in srs: + sr_rec = session.xenapi.SR.get_record(sr) + if not ('i18n-key' in sr_rec['other_config'] and + sr_rec['other_config']['i18n-key'] == 'local-storage'): + continue + for pbd in sr_rec['PBDs']: + pbd_rec = session.xenapi.PBD.get_record(pbd) + if pbd_rec['host'] == host: + return sr + return None + + +def get_kernel(session, args): + src_url = exists(args, 'src_url') + username = exists(args, 'username') + password = exists(args, 'password') + + (proto, netloc, url_path, _, _, _) = urlparse.urlparse(src_url) + + dest = os.path.join(KERNEL_DIR, url_path[1:]) + + # Paranoid check against people using ../ to do rude things. + if os.path.commonprefix([KERNEL_DIR, dest]) != KERNEL_DIR: + raise Exception('Illegal destination %s %s', (url_path, dest)) + + dirname = os.path.dirname(dest) + try: + os.makedirs(dirname) + except os.error, e: + if e.errno != errno.EEXIST: + raise + if not os.path.isdir(dirname): + raise Exception('Cannot make directory %s', dirname) + + try: + os.remove(dest) + except: + pass + + get(proto, netloc, url_path, username, password, dest, 0) + + return dest + + +def get_content_length(proto, netloc, url_path, username, password): + headers = make_headers('HEAD', url_path, username, password) + return with_http_connection( + proto, netloc, + lambda conn: get_content_length_(url_path, headers, conn)) + + +def get_content_length_(url_path, headers, conn): + conn.request('HEAD', url_path, None, headers) + response = conn.getresponse() + if response.status != 200: + raise Exception('%d %s' % (response.status, response.reason)) + + return long(response.getheader('Content-Length', -1)) + + +def get(proto, netloc, url_path, username, password, dest, offset): + headers = make_headers('GET', url_path, username, password) + download(proto, netloc, url_path, headers, dest, offset) + + +def make_headers(verb, url_path, username, password): + headers = {} + headers['Date'] = \ + time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) + headers['Authorization'] = \ + 'AWS %s:%s' % (username, + s3_authorization(verb, url_path, password, headers)) + return headers + + +def s3_authorization(verb, path, password, headers): + sha1 = hmac.new(password, digestmod=sha) + sha1.update(plaintext(verb, path, headers)) + return base64.encodestring(sha1.digest()).strip() + + +def plaintext(verb, path, headers): + return '%s\n\n\n%s\n%s' % (verb, + "\n".join([headers[h] for h in headers]), + path) + + +def download(proto, netloc, url_path, headers, dest, offset): + with_http_connection( + proto, netloc, + lambda conn: download_(url_path, dest, offset, headers, conn)) + + +def download_(url_path, dest, offset, headers, conn): + conn.request('GET', url_path, None, headers) + response = conn.getresponse() + if response.status != 200: + raise Exception('%d %s' % (response.status, response.reason)) + + length = response.getheader('Content-Length', -1) + + with_file( + dest, 'a', + lambda dest_file: download_all(response, length, dest_file, offset)) + + +def download_all(response, length, dest_file, offset): + dest_file.seek(offset) + i = 0 + while True: + buf = response.read(DOWNLOAD_CHUNK_SIZE) + if buf: + dest_file.write(buf) + else: + return + i += len(buf) + if length != -1 and i >= length: + return + + +if __name__ == '__main__': + XenAPIPlugin.dispatch({'get_vdi': get_vdi, + 'get_kernel': get_kernel}) diff --git a/xenapi/etc/xapi.d/plugins/pluginlib_nova.py b/xenapi/etc/xapi.d/plugins/pluginlib_nova.py new file mode 100755 index 00000000..2d323a01 --- /dev/null +++ b/xenapi/etc/xapi.d/plugins/pluginlib_nova.py @@ -0,0 +1,216 @@ +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# +# Helper functions for the Nova xapi plugins. In time, this will merge +# with the pluginlib.py shipped with xapi, but for now, that file is not +# very stable, so it's easiest just to have a copy of all the functions +# that we need. +# + +import httplib +import logging +import logging.handlers +import re +import time + + +##### Logging setup + +def configure_logging(name): + log = logging.getLogger() + log.setLevel(logging.DEBUG) + sysh = logging.handlers.SysLogHandler('/dev/log') + sysh.setLevel(logging.DEBUG) + formatter = logging.Formatter('%s: %%(levelname)-8s %%(message)s' % name) + sysh.setFormatter(formatter) + log.addHandler(sysh) + + +##### Exceptions + +class PluginError(Exception): + """Base Exception class for all plugin errors.""" + def __init__(self, *args): + Exception.__init__(self, *args) + +class ArgumentError(PluginError): + """Raised when required arguments are missing, argument values are invalid, + or incompatible arguments are given. + """ + def __init__(self, *args): + PluginError.__init__(self, *args) + + +##### Helpers + +def ignore_failure(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except XenAPI.Failure, e: + logging.error('Ignoring XenAPI.Failure %s', e) + return None + + +##### Argument validation + +ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$') + +def validate_exists(args, key, default=None): + """Validates that a string argument to a RPC method call is given, and + matches the shell-safe regex, with an optional default value in case it + does not exist. + + Returns the string. + """ + if key in args: + if len(args[key]) == 0: + raise ArgumentError('Argument %r value %r is too short.' % (key, args[key])) + if not ARGUMENT_PATTERN.match(args[key]): + raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key])) + if args[key][0] == '-': + raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key])) + return args[key] + elif default is not None: + return default + else: + raise ArgumentError('Argument %s is required.' % key) + +def validate_bool(args, key, default=None): + """Validates that a string argument to a RPC method call is a boolean string, + with an optional default value in case it does not exist. + + Returns the python boolean value. + """ + value = validate_exists(args, key, default) + if value.lower() == 'true': + return True + elif value.lower() == 'false': + return False + else: + raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value)) + +def exists(args, key): + """Validates that a freeform string argument to a RPC method call is given. + Returns the string. + """ + if key in args: + return args[key] + else: + raise ArgumentError('Argument %s is required.' % key) + +def optional(args, key): + """If the given key is in args, return the corresponding value, otherwise + return None""" + return key in args and args[key] or None + + +def get_this_host(session): + return session.xenapi.session.get_this_host(session.handle) + + +def get_domain_0(session): + this_host_ref = get_this_host(session) + expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref + return session.xenapi.VM.get_all_records_where(expr).keys()[0] + + +def create_vdi(session, sr_ref, name_label, virtual_size, read_only): + vdi_ref = session.xenapi.VDI.create( + { 'name_label': name_label, + 'name_description': '', + 'SR': sr_ref, + 'virtual_size': str(virtual_size), + 'type': 'User', + 'sharable': False, + 'read_only': read_only, + 'xenstore_data': {}, + 'other_config': {}, + 'sm_config': {}, + 'tags': [] }) + logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label, + virtual_size, read_only, sr_ref) + return vdi_ref + + +def with_vdi_in_dom0(session, vdi, read_only, f): + dom0 = get_domain_0(session) + vbd_rec = {} + vbd_rec['VM'] = dom0 + vbd_rec['VDI'] = vdi + vbd_rec['userdevice'] = 'autodetect' + vbd_rec['bootable'] = False + vbd_rec['mode'] = read_only and 'RO' or 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + logging.debug('Creating VBD for VDI %s ... ', vdi) + vbd = session.xenapi.VBD.create(vbd_rec) + logging.debug('Creating VBD for VDI %s done.', vdi) + try: + logging.debug('Plugging VBD %s ... ', vbd) + session.xenapi.VBD.plug(vbd) + logging.debug('Plugging VBD %s done.', vbd) + return f(session.xenapi.VBD.get_device(vbd)) + finally: + logging.debug('Destroying VBD for VDI %s ... ', vdi) + vbd_unplug_with_retry(session, vbd) + ignore_failure(session.xenapi.VBD.destroy, vbd) + logging.debug('Destroying VBD for VDI %s done.', vdi) + + +def vbd_unplug_with_retry(session, vbd): + """Call VBD.unplug on the given VBD, with a retry if we get + DEVICE_DETACH_REJECTED. For reasons which I don't understand, we're + seeing the device still in use, even when all processes using the device + should be dead.""" + while True: + try: + session.xenapi.VBD.unplug(vbd) + logging.debug('VBD.unplug successful first time.') + return + except XenAPI.Failure, e: + if (len(e.details) > 0 and + e.details[0] == 'DEVICE_DETACH_REJECTED'): + logging.debug('VBD.unplug rejected: retrying...') + time.sleep(1) + elif (len(e.details) > 0 and + e.details[0] == 'DEVICE_ALREADY_DETACHED'): + logging.debug('VBD.unplug successful eventually.') + return + else: + logging.error('Ignoring XenAPI.Failure in VBD.unplug: %s', e) + return + + +def with_http_connection(proto, netloc, f): + conn = (proto == 'https' and + httplib.HTTPSConnection(netloc) or + httplib.HTTPConnection(netloc)) + try: + return f(conn) + finally: + conn.close() + + +def with_file(dest_path, mode, f): + dest = open(dest_path, mode) + try: + return f(dest) + finally: + dest.close()