Adding the guestagent.

* Added the manager code to service
* Updated conf so it works on a guest
* Added exceptions for the guest
* Added all the guest code from legacy reddwarf
This commit is contained in:
Michael Basnight 2012-03-18 15:40:35 -05:00
parent dea04d64f9
commit 2bdaa9d8fc
13 changed files with 1449 additions and 19 deletions

View File

@ -22,8 +22,10 @@ eventlet.monkey_patch()
import gettext
import optparse
import os
import socket
import sys
gettext.install('reddwarf', unicode=1)
# If ../reddwarf/__init__.py exists, add ../ to Python search path, so that
@ -37,6 +39,9 @@ if os.path.exists(os.path.join(possible_topdir, 'reddwarf', '__init__.py')):
from reddwarf import version
from reddwarf.common import config
from reddwarf.common import service
# TODO(hub-cap): find out why the db api isint being imported properly
#from reddwarf.db import db_api
if __name__ == '__main__':
parser = optparse.OptionParser(version="%%prog %s"
@ -48,7 +53,9 @@ if __name__ == '__main__':
try:
conf, app = config.Config.load_paste_app('reddwarf-guestagent',
options, args)
server = service.Service.create(binary='reddwarf-guestagent')
# db_api.configure_db(conf)
server = service.Service.create(binary='reddwarf-guestagent',
host=socket.gethostname())
service.serve(server)
service.wait()
except RuntimeError as error:

View File

@ -46,17 +46,11 @@ reddwarf_proxy_admin_tenant_name = admin
reddwarf_auth_url = http://0.0.0.0:5000/v2.0
# Manager impl for the taskmanager
guestagent_manager=reddwarf.guestagent.manager.GuestAgent
guestagent_manager=reddwarf.guestagent.manager.GuestManager
# ============ notifer queue kombu connection options ========================
# ============ kombu connection options ========================
notifier_queue_hostname = localhost
notifier_queue_userid = guest
notifier_queue_password = guest
notifier_queue_ssl = False
notifier_queue_port = 5672
notifier_queue_virtual_host = /
notifier_queue_transport = memory
rabbit_host=10.0.0.1
[composite:reddwarf-guestagent]
use = call:reddwarf.common.wsgi:versioned_urlmap

View File

@ -47,3 +47,9 @@ class InvalidRPCConnectionReuse(ReddwarfError):
class NotFound(ReddwarfError):
message = _("Resource %(uuid)s cannot be found")
class GuestError(ReddwarfError):
message = _("An error occurred communicating with the guest: "
"%(original_message).")

View File

@ -162,6 +162,27 @@ class Service(object):
return service_obj
class Manager(object):
def __init__(self, host=None):
if not host:
#TODO(hub-cap): We need to fix this
host = "ubuntu"
self.host = host
super(Manager, self).__init__()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
pass
def init_host(self):
"""Handle initialization if this is a standalone service.
Child classes should override this method.
"""
pass
_launcher = None

View File

@ -18,7 +18,9 @@
import datetime
import inspect
import logging
import re
import sys
import uuid
from eventlet import event
@ -29,6 +31,7 @@ from eventlet.green import subprocess
from reddwarf.openstack.common import utils as openstack_utils
LOG = logging.getLogger(__name__)
import_class = openstack_utils.import_class
import_object = openstack_utils.import_object
bool_from_string = openstack_utils.bool_from_string

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,3 +14,5 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from api import API

169
reddwarf/guestagent/api.py Normal file
View File

@ -0,0 +1,169 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all request to the Platform or Guest VM
"""
import logging
from reddwarf import rpc
from reddwarf.common import config
from reddwarf.common import exception
from reddwarf.common import utils
# from nova.db import api as dbapi
# from nova.db import base
LOG = logging.getLogger(__name__)
class API(object):
"""API for interacting with the guest manager."""
def __init__(self, **kwargs):
super(API, self).__init__(**kwargs)
def _get_routing_key(self, context, id):
"""Create the routing key based on the container id"""
# TODO(hub-cap): make this work in a real environment with
# more than one guest
return "guestagent.ubuntu"
def create_user(self, context, id, users):
"""Make an asynchronous call to create a new database user"""
LOG.debug("Creating Users for Instance %s", id)
rpc.cast(context, self._get_routing_key(context, id),
{"method": "create_user",
"args": {"users": users}
})
def list_users(self, context, id):
"""Make an asynchronous call to list database users"""
LOG.debug("Listing Users for Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "list_users"})
def delete_user(self, context, id, user):
"""Make an asynchronous call to delete an existing database user"""
LOG.debug("Deleting user %s for Instance %s",
user, id)
rpc.cast(context, self._get_routing_key(context, id),
{"method": "delete_user",
"args": {"user": user}
})
def create_database(self, context, id, databases):
"""Make an asynchronous call to create a new database
within the specified container"""
LOG.debug("Creating databases for Instance %s", id)
rpc.cast(context, self._get_routing_key(context, id),
{"method": "create_database",
"args": {"databases": databases}
})
def list_databases(self, context, id):
"""Make an asynchronous call to list database users"""
LOG.debug("Listing Users for Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "list_databases"})
def delete_database(self, context, id, database):
"""Make an asynchronous call to delete an existing database
within the specified container"""
LOG.debug("Deleting database %s for Instance %s",
database, id)
rpc.cast(context, self._get_routing_key(context, id),
{"method": "delete_database",
"args": {"database": database}
})
def enable_root(self, context, id):
"""Make a synchronous call to enable the root user for
access from anywhere"""
LOG.debug("Enable root user for Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "enable_root"})
def disable_root(self, context, id):
"""Make a synchronous call to disable the root user for
access from anywhere"""
LOG.debug("Disable root user for Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "disable_root"})
def is_root_enabled(self, context, id):
"""Make a synchronous call to check if root access is
available for the container"""
LOG.debug("Check root access for Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "is_root_enabled"})
def get_diagnostics(self, context, id):
"""Make a synchronous call to get diagnostics for the container"""
LOG.debug("Check diagnostics on Instance %s", id)
return rpc.call(context, self._get_routing_key(context, id),
{"method": "get_diagnostics"})
def prepare(self, context, id, memory_mb, databases):
"""Make an asynchronous call to prepare the guest
as a database container"""
LOG.debug(_("Sending the call to prepare the Guest"))
#TODO(hub-cap): add this to the kombu api
rpc.cast_with_consumer(context, self._get_routing_key(context, id),
{"method": "prepare",
"args": {"databases": databases,
"memory_mb": memory_mb}
})
def restart(self, context, id):
"""Restart the MySQL server."""
LOG.debug(_("Sending the call to restart MySQL on the Guest."))
rpc.call(context, self._get_routing_key(context, id),
{"method": "restart",
"args": {}
})
def start_mysql_with_conf_changes(self, context, id, updated_memory_size):
"""Start the MySQL server."""
LOG.debug(_("Sending the call to start MySQL on the Guest."))
try:
rpc.call(context, self._get_routing_key(context, id),
{"method": "start_mysql_with_conf_changes",
"args": {'updated_memory_size': updated_memory_size}
})
except Exception as e:
LOG.error(e)
raise exception.GuestError(original_message=str(e))
def stop_mysql(self, context, id):
"""Stop the MySQL server."""
LOG.debug(_("Sending the call to stop MySQL on the Guest."))
try:
rpc.call(context, self._get_routing_key(context, id),
{"method": "stop_mysql",
"args": {}
})
except Exception as e:
LOG.error(e)
raise exception.GuestError(original_message=str(e))
def upgrade(self, context, id):
"""Make an asynchronous call to self upgrade the guest agent"""
topic = self._get_routing_key(context, id)
LOG.debug("Sending an upgrade call to nova-guest %s", topic)
rpc.cast_with_consumer(context, topic, {"method": "upgrade"})

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,383 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import string
class Base(object):
def serialize(self):
return self.__dict__
def deserialize(self, o):
self.__dict__ = o
class MySQLDatabase(Base):
"""Represents a Database and its properties"""
# Defaults
__charset__ = "utf8"
__collation__ = "utf8_general_ci"
dbname = re.compile("^[A-Za-z0-9_-]+[\s\?\#\@]*[A-Za-z0-9_-]+$")
# Complete list of acceptable values
charset = {"big5": ["big5_chinese_ci", "big5_bin"],
"dec8": ["dec8_swedish_ci", "dec8_bin"],
"cp850": ["cp850_general_ci", "cp850_bin"],
"hp8": ["hp8_english_ci", "hp8_bin"],
"koi8r": ["koi8r_general_ci", "koi8r_bin"],
"latin1": ["latin1_swedish_ci",
"latin1_german1_ci",
"latin1_danish_ci",
"latin1_german2_ci",
"latin1_bin",
"latin1_general_ci",
"latin1_general_cs",
"latin1_spanish_ci"],
"latin2": ["latin2_general_ci",
"latin2_czech_cs",
"latin2_hungarian_ci",
"latin2_croatian_ci",
"latin2_bin"],
"swe7": ["swe7_swedish_ci", "swe7_bin"],
"ascii": ["ascii_general_ci", "ascii_bin"],
"ujis": ["ujis_japanese_ci", "ujis_bin"],
"sjis": ["sjis_japanese_ci", "sjis_bin"],
"hebrew": ["hebrew_general_ci", "hebrew_bin"],
"tis620": ["tis620_thai_ci", "tis620_bin"],
"euckr": ["euckr_korean_ci", "euckr_bin"],
"koi8u": ["koi8u_general_ci", "koi8u_bin"],
"gb2312": ["gb2312_chinese_ci", "gb2312_bin"],
"greek": ["greek_general_ci", "greek_bin"],
"cp1250": ["cp1250_general_ci",
"cp1250_czech_cs",
"cp1250_croatian_ci",
"cp1250_bin",
"cp1250_polish_ci"],
"gbk": ["gbk_chinese_ci", "gbk_bin"],
"latin5": ["latin5_turkish_ci", "latin5_bin"],
"armscii8": ["armscii8_general_ci", "armscii8_bin"],
"utf8": ["utf8_general_ci",
"utf8_bin",
"utf8_unicode_ci",
"utf8_icelandic_ci",
"utf8_latvian_ci",
"utf8_romanian_ci",
"utf8_slovenian_ci",
"utf8_polish_ci",
"utf8_estonian_ci",
"utf8_spanish_ci",
"utf8_swedish_ci",
"utf8_turkish_ci",
"utf8_czech_ci",
"utf8_danish_ci",
"utf8_lithuanian_ci",
"utf8_slovak_ci",
"utf8_spanish2_ci",
"utf8_roman_ci",
"utf8_persian_ci",
"utf8_esperanto_ci",
"utf8_hungarian_ci"],
"ucs2": ["ucs2_general_ci",
"ucs2_bin",
"ucs2_unicode_ci",
"ucs2_icelandic_ci",
"ucs2_latvian_ci",
"ucs2_romanian_ci",
"ucs2_slovenian_ci",
"ucs2_polish_ci",
"ucs2_estonian_ci",
"ucs2_spanish_ci",
"ucs2_swedish_ci",
"ucs2_turkish_ci",
"ucs2_czech_ci",
"ucs2_danish_ci",
"ucs2_lithuanian_ci",
"ucs2_slovak_ci",
"ucs2_spanish2_ci",
"ucs2_roman_ci",
"ucs2_persian_ci",
"ucs2_esperanto_ci",
"ucs2_hungarian_ci"],
"cp866": ["cp866_general_ci", "cp866_bin"],
"keybcs2": ["keybcs2_general_ci", "keybcs2_bin"],
"macce": ["macce_general_ci", "macce_bin"],
"macroman": ["macroman_general_ci", "macroman_bin"],
"cp852": ["cp852_general_ci", "cp852_bin"],
"latin7": ["latin7_general_ci",
"latin7_estonian_cs",
"latin7_general_cs",
"latin7_bin"],
"cp1251": ["cp1251_general_ci",
"cp1251_bulgarian_ci",
"cp1251_ukrainian_ci",
"cp1251_bin",
"cp1251_general_cs"],
"cp1256": ["cp1256_general_ci", "cp1256_bin"],
"cp1257": ["cp1257_general_ci",
"cp1257_lithuanian_ci",
"cp1257_bin"],
"binary": ["binary"],
"geostd8": ["geostd8_general_ci", "geostd8_bin"],
"cp932": ["cp932_japanese_ci", "cp932_bin"],
"eucjpms": ["eucjpms_japanese_ci", "eucjpms_bin"]}
collation = {"big5_chinese_ci": "big5",
"big5_bin": "big5",
"dec8_swedish_ci": "dec8",
"dec8_bin": "dec8",
"cp850_general_ci": "cp850",
"cp850_bin": "cp850",
"hp8_english_ci": "hp8",
"hp8_bin": "hp8",
"koi8r_general_ci": "koi8r",
"koi8r_bin": "koi8r",
"latin1_german1_ci": "latin1",
"latin1_swedish_ci": "latin1",
"latin1_danish_ci": "latin1",
"latin1_german2_ci": "latin1",
"latin1_bin": "latin1",
"latin1_general_ci": "latin1",
"latin1_general_cs": "latin1",
"latin1_spanish_ci": "latin1",
"latin2_czech_cs": "latin2",
"latin2_general_ci": "latin2",
"latin2_hungarian_ci": "latin2",
"latin2_croatian_ci": "latin2",
"latin2_bin": "latin2",
"swe7_swedish_ci": "swe7",
"swe7_bin": "swe7",
"ascii_general_ci": "ascii",
"ascii_bin": "ascii",
"ujis_japanese_ci": "ujis",
"ujis_bin": "ujis",
"sjis_japanese_ci": "sjis",
"sjis_bin": "sjis",
"hebrew_general_ci": "hebrew",
"hebrew_bin": "hebrew",
"tis620_thai_ci": "tis620",
"tis620_bin": "tis620",
"euckr_korean_ci": "euckr",
"euckr_bin": "euckr",
"koi8u_general_ci": "koi8u",
"koi8u_bin": "koi8u",
"gb2312_chinese_ci": "gb2312",
"gb2312_bin": "gb2312",
"greek_general_ci": "greek",
"greek_bin": "greek",
"cp1250_general_ci": "cp1250",
"cp1250_czech_cs": "cp1250",
"cp1250_croatian_ci": "cp1250",
"cp1250_bin": "cp1250",
"cp1250_polish_ci": "cp1250",
"gbk_chinese_ci": "gbk",
"gbk_bin": "gbk",
"latin5_turkish_ci": "latin5",
"latin5_bin": "latin5",
"armscii8_general_ci": "armscii8",
"armscii8_bin": "armscii8",
"utf8_general_ci": "utf8",
"utf8_bin": "utf8",
"utf8_unicode_ci": "utf8",
"utf8_icelandic_ci": "utf8",
"utf8_latvian_ci": "utf8",
"utf8_romanian_ci": "utf8",
"utf8_slovenian_ci": "utf8",
"utf8_polish_ci": "utf8",
"utf8_estonian_ci": "utf8",
"utf8_spanish_ci": "utf8",
"utf8_swedish_ci": "utf8",
"utf8_turkish_ci": "utf8",
"utf8_czech_ci": "utf8",
"utf8_danish_ci": "utf8",
"utf8_lithuanian_ci": "utf8",
"utf8_slovak_ci": "utf8",
"utf8_spanish2_ci": "utf8",
"utf8_roman_ci": "utf8",
"utf8_persian_ci": "utf8",
"utf8_esperanto_ci": "utf8",
"utf8_hungarian_ci": "utf8",
"ucs2_general_ci": "ucs2",
"ucs2_bin": "ucs2",
"ucs2_unicode_ci": "ucs2",
"ucs2_icelandic_ci": "ucs2",
"ucs2_latvian_ci": "ucs2",
"ucs2_romanian_ci": "ucs2",
"ucs2_slovenian_ci": "ucs2",
"ucs2_polish_ci": "ucs2",
"ucs2_estonian_ci": "ucs2",
"ucs2_spanish_ci": "ucs2",
"ucs2_swedish_ci": "ucs2",
"ucs2_turkish_ci": "ucs2",
"ucs2_czech_ci": "ucs2",
"ucs2_danish_ci": "ucs2",
"ucs2_lithuanian_ci": "ucs2",
"ucs2_slovak_ci": "ucs2",
"ucs2_spanish2_ci": "ucs2",
"ucs2_roman_ci": "ucs2",
"ucs2_persian_ci": "ucs2",
"ucs2_esperanto_ci": "ucs2",
"ucs2_hungarian_ci": "ucs2",
"cp866_general_ci": "cp866",
"cp866_bin": "cp866",
"keybcs2_general_ci": "keybcs2",
"keybcs2_bin": "keybcs2",
"macce_general_ci": "macce",
"macce_bin": "macce",
"macroman_general_ci": "macroman",
"macroman_bin": "macroman",
"cp852_general_ci": "cp852",
"cp852_bin": "cp852",
"latin7_estonian_cs": "latin7",
"latin7_general_ci": "latin7",
"latin7_general_cs": "latin7",
"latin7_bin": "latin7",
"cp1251_bulgarian_ci": "cp1251",
"cp1251_ukrainian_ci": "cp1251",
"cp1251_bin": "cp1251",
"cp1251_general_ci": "cp1251",
"cp1251_general_cs": "cp1251",
"cp1256_general_ci": "cp1256",
"cp1256_bin": "cp1256",
"cp1257_lithuanian_ci": "cp1257",
"cp1257_bin": "cp1257",
"cp1257_general_ci": "cp1257",
"binary": "binary",
"geostd8_general_ci": "geostd8",
"geostd8_bin": "geostd8",
"cp932_japanese_ci": "cp932",
"cp932_bin": "cp932",
"eucjpms_japanese_ci": "eucjpms",
"eucjpms_bin": "eucjpms"}
def __init__(self):
self._name = None
self._collate = None
self._character_set = None
@property
def name(self):
return self._name
@name.setter
def name(self, value):
if not value or not self.dbname.match(value) or \
string.find("%r" % value, "\\") != -1:
raise ValueError("'%s' is not a valid database name" % value)
elif len(value) > 64:
raise ValueError("Database name '%s' is too long. Max length = 64"
% value)
else:
self._name = value
@property
def collate(self):
"""Get the appropriate collate value"""
if not self._collate and not self._character_set:
return self.__collation__
elif not self._collate:
return self.charset[self._character_set][0]
else:
return self._collate
@collate.setter
def collate(self, value):
"""Validate the collation and set it"""
if not value:
pass
elif self._character_set:
if not value in self.charset[self._character_set]:
raise ValueError("'%s' not a valid collation for charset '%s'"
% (value, self._character_set))
self._collate = value
else:
if not value in self.collation:
raise ValueError("'%s' not a valid collation" % value)
self._collate = value
self._character_set = self.collation[value]
@property
def character_set(self):
"""Get the appropriate character set value"""
if not self._character_set:
return self.__charset__
else:
return self._character_set
@character_set.setter
def character_set(self, value):
"""Validate the character set and set it"""
if not value:
pass
elif not value in self.charset:
raise ValueError("'%s' not a valid character set" % value)
else:
self._character_set = value
class MySQLUser(Base):
"""Represents a MySQL User and its associated properties"""
not_supported_chars = re.compile("^\s|\s$|'|\"|;|`|,|/|\\\\")
def __init__(self):
self._name = None
self._password = None
self._databases = []
def _check_valid(self, value):
if not value or self.not_supported_chars.search(value) or \
string.find("%r" % value, "\\") != -1:
return False
else:
return True
@property
def name(self):
return self._name
@name.setter
def name(self, value):
if not self._check_valid(value):
raise ValueError("'%s' is not a valid user name" % value)
elif len(value) > 16:
raise ValueError("User name '%s' is too long. Max length = 16"
% value)
else:
self._name = value
@property
def password(self):
return self._password
@password.setter
def password(self, value):
if not self._check_valid(value):
raise ValueError("'%s' is not a valid password" % value)
else:
self._password = value
@property
def databases(self):
return self._databases
@databases.setter
def databases(self, value):
mydb = MySQLDatabase()
mydb.name = value
self._databases.append(mydb.serialize())

View File

@ -0,0 +1,495 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all processes within the Guest VM, considering it as a Platform
The :py:class:`GuestManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to Platform specific operations.
**Related Flags**
"""
import logging
import os
import re
import sys
import uuid
from datetime import date
from sqlalchemy import create_engine
from sqlalchemy import exc
from sqlalchemy import interfaces
from sqlalchemy.sql.expression import text
# from nova.compute import power_state
# from nova.exception import ProcessExecutionError
# from reddwarf.db import api as dbapi
from reddwarf.common.exception import ProcessExecutionError
from reddwarf.common import utils
# from reddwarf.guestagent import utils as guest_utils
from reddwarf.guestagent.db import models
ADMIN_USER_NAME = "os_admin"
LOG = logging.getLogger(__name__)
FLUSH = text("""FLUSH PRIVILEGES;""")
ENGINE = None
MYSQLD_ARGS = None
PREPARING = False
def generate_random_password():
return str(uuid.uuid4())
def get_engine():
"""Create the default engine with the updated admin user"""
#TODO(rnirmal):Based on permissions issues being resolved we may revert
#url = URL(drivername='mysql', host='localhost',
# query={'read_default_file': '/etc/mysql/my.cnf'})
global ENGINE
if ENGINE:
return ENGINE
#ENGINE = create_engine(name_or_url=url)
pwd, err = utils.execute("sudo", "awk", "/password\\t=/{print $3}",
"/etc/mysql/my.cnf")
if not err:
ENGINE = create_engine("mysql://%s:%s@localhost:3306" %
(ADMIN_USER_NAME, pwd.strip()),
pool_recycle=7200, echo=True,
listeners=[KeepAliveConnection()])
else:
LOG.error(_(err))
return ENGINE
def load_mysqld_options():
try:
out, err = utils.execute("/usr/sbin/mysqld", "--print-defaults",
run_as_root=True)
arglist = re.split("\n", out)[1].split()
args = {}
for item in arglist:
if "=" in item:
key, value = item.split("=")
args[key.lstrip("--")] = value
else:
args[item.lstrip("--")] = None
return args
except ProcessExecutionError as e:
return None
class DBaaSAgent(object):
""" Database as a Service Agent Controller """
def create_user(self, users):
"""Create users and grant them privileges for the
specified databases"""
host = "%"
client = LocalSqlClient(get_engine())
with client:
for item in users:
user = models.MySQLUser()
user.deserialize(item)
# TODO(cp16net):Should users be allowed to create users
# 'os_admin' or 'debian-sys-maint'
t = text("""CREATE USER `%s`@:host IDENTIFIED BY '%s';"""
% (user.name, user.password))
client.execute(t, host=host)
for database in user.databases:
mydb = models.MySQLDatabase()
mydb.deserialize(database)
t = text("""
GRANT ALL PRIVILEGES ON `%s`.* TO `%s`@:host;"""
% (mydb.name, user.name))
client.execute(t, host=host)
def list_users(self):
"""List users that have access to the database"""
LOG.debug("---Listing Users---")
users = []
client = LocalSqlClient(get_engine())
with client:
mysql_user = models.MySQLUser()
t = text("""select User from mysql.user where host !=
'localhost';""")
result = client.execute(t)
LOG.debug("result = " + str(result))
for row in result:
LOG.debug("user = " + str(row))
mysql_user = models.MySQLUser()
mysql_user.name = row['User']
users.append(mysql_user.serialize())
LOG.debug("users = " + str(users))
return users
def delete_user(self, user):
"""Delete the specified users"""
client = LocalSqlClient(get_engine())
with client:
mysql_user = models.MySQLUser()
mysql_user.deserialize(user)
t = text("""DROP USER `%s`""" % mysql_user.name)
client.execute(t)
def create_database(self, databases):
"""Create the list of specified databases"""
client = LocalSqlClient(get_engine())
with client:
for item in databases:
mydb = models.MySQLDatabase()
mydb.deserialize(item)
t = text("""CREATE DATABASE IF NOT EXISTS
`%s` CHARACTER SET = %s COLLATE = %s;"""
% (mydb.name, mydb.character_set, mydb.collate))
client.execute(t)
def list_databases(self):
"""List databases the user created on this mysql instance"""
LOG.debug("---Listing Databases---")
databases = []
client = LocalSqlClient(get_engine())
with client:
# If you have an external volume mounted at /var/lib/mysql
# the lost+found directory will show up in mysql as a database
# which will create errors if you try to do any database ops
# on it. So we remove it here if it exists.
t = text('''
SELECT
schema_name as name,
default_character_set_name as charset,
default_collation_name as collation
FROM
information_schema.schemata
WHERE
schema_name not in
('mysql', 'information_schema', 'lost+found')
ORDER BY
schema_name ASC;
''')
database_names = client.execute(t)
LOG.debug("database_names = %r" % database_names)
for database in database_names:
LOG.debug("database = %s " % str(database))
mysql_db = models.MySQLDatabase()
mysql_db.name = database[0]
mysql_db.character_set = database[1]
mysql_db.collate = database[2]
databases.append(mysql_db.serialize())
LOG.debug("databases = " + str(databases))
return databases
def delete_database(self, database):
"""Delete the specified database"""
client = LocalSqlClient(get_engine())
with client:
mydb = models.MySQLDatabase()
mydb.deserialize(database)
t = text("""DROP DATABASE `%s`;""" % mydb.name)
client.execute(t)
def enable_root(self):
"""Enable the root user global access and/or reset the root password"""
host = "%"
user = models.MySQLUser()
user.name = "root"
user.password = generate_random_password()
client = LocalSqlClient(get_engine())
with client:
try:
t = text("""CREATE USER :user@:host;""")
client.execute(t, user=user.name, host=host, pwd=user.password)
except exc.OperationalError as err:
# Ignore, user is already created, just reset the password
# TODO(rnirmal): More fine grained error checking later on
LOG.debug(err)
with client:
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
WHERE User=:user;""")
client.execute(t, user=user.name, pwd=user.password)
t = text("""GRANT ALL PRIVILEGES ON *.* TO :user@:host
WITH GRANT OPTION;""")
client.execute(t, user=user.name, host=host)
return user.serialize()
def disable_root(self):
"""Disable root access apart from localhost"""
host = "localhost"
pwd = generate_random_password()
user = "root"
client = LocalSqlClient(get_engine())
with client:
t = text("""DELETE FROM mysql.user where User=:user
and Host!=:host""")
client.execute(t, user=user, host=host)
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
WHERE User=:user;""")
client.execute(t, pwd=pwd, user=user)
return True
def is_root_enabled(self):
"""Return True if root access is enabled; False otherwise."""
client = LocalSqlClient(get_engine())
with client:
mysql_user = models.MySQLUser()
t = text("""SELECT User FROM mysql.user where User = 'root'
and host != 'localhost';""")
result = client.execute(t)
LOG.debug("result = " + str(result))
return result.rowcount != 0
def prepare(self, databases):
"""Makes ready DBAAS on a Guest container."""
global PREPARING
PREPARING = True
from reddwarf.guest.pkg import PkgAgent
if not isinstance(self, PkgAgent):
raise TypeError("This must also be an instance of Pkg agent.")
preparer = DBaaSPreparer(self)
preparer.prepare()
self.create_database(databases)
PREPARING = False
def update_status(self):
"""Update the status of the MySQL service"""
global MYSQLD_ARGS
global PREPARING
# instance_id = guest_utils.get_instance_id()
if PREPARING:
#TODO(hub-cap): Fix the guest_status_update
# dbapi.guest_status_update(instance_id, power_state.BUILDING)
return
try:
out, err = utils.execute("/usr/bin/mysqladmin", "ping",
run_as_root=True)
#TODO(hub-cap): Fix the guest_status_update
# dbapi.guest_status_update(instance_id, power_state.RUNNING)
except ProcessExecutionError as e:
try:
out, err = utils.execute("ps", "-C", "mysqld", "h")
pid = out.split()[0]
# TODO(rnirmal): Need to create new statuses for instances
# where the mysql service is up, but unresponsive
#TODO(hub-cap): Fix the guest_status_update
# dbapi.guest_status_update(instance_id, power_state.BLOCKED)
except ProcessExecutionError as e:
if not MYSQLD_ARGS:
MYSQLD_ARGS = load_mysqld_options()
pid_file = MYSQLD_ARGS.get('pid-file',
'/var/run/mysqld/mysqld.pid')
if os.path.exists(pid_file):
pass
#TODO(hub-cap): Fix the guest_status_update
# dbapi.guest_status_update(instance_id,
# power_state.CRASHED)
else:
pass
#TODO(hub-cap): Fix the guest_status_update
# dbapi.guest_status_update(instance_id,
# power_state.SHUTDOWN)
class LocalSqlClient(object):
"""A sqlalchemy wrapper to manage transactions"""
def __init__(self, engine, use_flush=True):
self.engine = engine
self.use_flush = use_flush
def __enter__(self):
self.conn = self.engine.connect()
self.trans = self.conn.begin()
return self.conn
def __exit__(self, type, value, traceback):
if self.trans:
if type is not None: # An error occurred
self.trans.rollback()
else:
if self.use_flush:
self.conn.execute(FLUSH)
self.trans.commit()
self.conn.close()
def execute(self, t, **kwargs):
try:
return self.conn.execute(t, kwargs)
except:
self.trans.rollback()
self.trans = None
raise
class KeepAliveConnection(interfaces.PoolListener):
"""
A connection pool listener that ensures live connections are returned
from the connecction pool at checkout. This alleviates the problem of
MySQL connections timeing out.
"""
def checkout(self, dbapi_con, con_record, con_proxy):
"""Event triggered when a connection is checked out from the pool"""
try:
try:
dbapi_con.ping(False)
except TypeError:
dbapi_con.ping()
except dbapi_con.OperationalError, ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
raise exc.DisconnectionError()
else:
raise
class DBaaSPreparer(object):
"""Prepares DBaaS on a Guest container."""
TIME_OUT = 1000
def __init__(self, pkg_agent):
""" By default login with root no password for initial setup. """
self.engine = create_engine("mysql://root:@localhost:3306", echo=True)
self.pkg = pkg_agent
def _generate_root_password(self, client):
""" Generate and set a random root password and forget about it. """
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
WHERE User='root';""")
client.execute(t, pwd=generate_random_password())
def _init_mycnf(self, password):
"""
Install the set of mysql my.cnf templates from dbaas-mycnf package.
The package generates a template suited for the current
container flavor. Update the os_admin user and password
to the my.cnf file for direct login from localhost
"""
orig_mycnf = "/etc/mysql/my.cnf"
final_mycnf = "/var/lib/mysql/my.cnf"
tmp_mycnf = "/tmp/my.cnf.tmp"
dbaas_mycnf = "/etc/dbaas/my.cnf/my.cnf.default"
LOG.debug(_("Installing my.cnf templates"))
self.pkg.pkg_install("dbaas-mycnf", self.TIME_OUT)
if os.path.isfile(dbaas_mycnf):
utils.execute("sudo", "mv", orig_mycnf,
"%(name)s.%(date)s"
% {'name': orig_mycnf,
'date': date.today().isoformat()})
utils.execute("sudo", "cp", dbaas_mycnf, orig_mycnf)
mycnf_file = open(orig_mycnf, 'r')
tmp_file = open(tmp_mycnf, 'w')
for line in mycnf_file:
tmp_file.write(line)
if "[client]" in line:
tmp_file.write("user\t\t= %s\n" % ADMIN_USER_NAME)
tmp_file.write("password\t= %s\n" % password)
mycnf_file.close()
tmp_file.close()
utils.execute("sudo", "mv", tmp_mycnf, final_mycnf)
utils.execute("sudo", "rm", orig_mycnf)
utils.execute("sudo", "ln", "-s", final_mycnf, orig_mycnf)
def _remove_anonymous_user(self, client):
t = text("""DELETE FROM mysql.user WHERE User='';""")
client.execute(t)
def _remove_remote_root_access(self, client):
t = text("""DELETE FROM mysql.user
WHERE User='root'
AND Host!='localhost';""")
client.execute(t)
def _create_admin_user(self, client, password):
"""
Create a os_admin user with a random password
with all privileges similar to the root user
"""
t = text("CREATE USER :user@'localhost';")
client.execute(t, user=ADMIN_USER_NAME)
t = text("""
UPDATE mysql.user SET Password=PASSWORD(:pwd)
WHERE User=:user;
""")
client.execute(t, pwd=password, user=ADMIN_USER_NAME)
t = text("""
GRANT ALL PRIVILEGES ON *.* TO :user@'localhost'
WITH GRANT OPTION;
""")
client.execute(t, user=ADMIN_USER_NAME)
def _install_mysql(self):
"""Install mysql server. The current version is 5.1"""
LOG.debug(_("Installing mysql server"))
self.pkg.pkg_install("mysql-server-5.1", self.TIME_OUT)
#TODO(rnirmal): Add checks to make sure the package got installed
def _restart_mysql(self):
"""
Restart mysql after all the modifications are completed.
List of modifications:
- Remove existing ib_logfile*
"""
# TODO(rnirmal): To be replaced by the mounted volume location
# FIXME once we have volumes in place, use default till then
mysql_base_dir = "/var/lib/mysql"
try:
LOG.debug(_("Restarting mysql..."))
utils.execute("sudo", "service", "mysql", "stop")
# Remove the ib_logfile, if not mysql won't start.
# For some reason wildcards don't seem to work, so
# deleting both the files separately
utils.execute("sudo", "rm", "%s/ib_logfile0" % mysql_base_dir)
utils.execute("sudo", "rm", "%s/ib_logfile1" % mysql_base_dir)
utils.execute("sudo", "service", "mysql", "start")
except ProcessExecutionError:
LOG.error(_("Unable to restart mysql server."))
def prepare(self):
"""Prepare the guest machine with a secure mysql server installation"""
LOG.info(_("Preparing Guest as MySQL Server"))
try:
utils.execute("apt-get", "update", run_as_root=True)
except ProcessExecutionError as e:
LOG.error(_("Error updating the apt sources"))
self._install_mysql()
admin_password = generate_random_password()
client = LocalSqlClient(self.engine)
with client:
self._generate_root_password(client)
self._remove_anonymous_user(client)
self._remove_remote_root_access(client)
self._create_admin_user(client, admin_password)
self._init_mycnf(admin_password)
self._restart_mysql()
LOG.info(_("Dbaas preparation complete."))

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,19 +15,80 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all processes within the Guest VM, considering it as a Platform
The :py:class:`GuestManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to Platform specific operations.
"""
import functools
import logging
from reddwarf.common import exception
from reddwarf.common import utils
from reddwarf.common import service
LOG = logging.getLogger(__name__)
class GuestAgent(object):
"""Task manager impl"""
class GuestManager(service.Manager):
def __init__(self, *args, **kwargs):
LOG.info("GuestAgent init %s %s" % (args, kwargs))
"""Manages the tasks within a Guest VM."""
def __init__(self, guest_drivers=None, *args, **kwargs):
if not guest_drivers:
#TODO(hub-cap): fix this, maybe make it a flag
guest_drivers = ['reddwarf.guestagent.dbaas.DBaaSAgent',
'reddwarf.guestagent.pkg.PkgAgent']
classes = []
for guest_driver in guest_drivers:
LOG.info(guest_driver)
driver = utils.import_class(guest_driver)
classes.append(driver)
try:
cls = type("GuestDriver", tuple(set(classes)), {})
self.driver = cls()
except TypeError as te:
msg = "An issue occurred instantiating the GuestDriver as the " \
"following classes: " + str(classes) + \
" Exception=" + str(te)
raise TypeError(msg)
super(GuestManager, self).__init__(*args, **kwargs)
def init_host(self):
"""Method for any service initialization"""
pass
def periodic_tasks(self, raise_on_error=False):
LOG.info("Launching a periodic task")
"""Method for running any periodic tasks.
def test_method(self, context):
LOG.info("test_method called with context %s" % context)
Right now does the status updates"""
status_method = "update_status"
try:
getattr(self.driver, status_method)()
except AttributeError as ae:
LOG.error("Method %s not found for driver %s", status_method,
self.driver)
if raise_on_error:
raise ae
def upgrade(self, context):
"""Upgrade the guest agent and restart the agent"""
LOG.debug(_("Self upgrade of guest agent issued"))
def __getattr__(self, key):
"""Converts all method calls and direct it at the driver"""
return functools.partial(self._mapper, key)
def _mapper(self, method, context, *args, **kwargs):
""" Tries to call the respective driver method """
try:
return getattr(self.driver, method)(*args, **kwargs)
except AttributeError:
LOG.error("Method %s not found for driver %s", method, self.driver)
raise exception.NotFound("Method not available for the "
"chosen driver")

210
reddwarf/guestagent/pkg.py Normal file
View File

@ -0,0 +1,210 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Manages packages on the Guest VM.
"""
import logging
import pexpect
from reddwarf.common import exception
LOG = logging.getLogger(__name__)
# FLAGS = flags.FLAGS
class PkgAdminLockError(exception.ReddwarfError):
pass
class PkgPermissionError(exception.ReddwarfError):
pass
class PkgPackageStateError(exception.ReddwarfError):
pass
class PkgNotFoundError(exception.NotFound):
pass
class PkgTimeout(exception.ReddwarfError):
pass
OK = 0
RUN_DPKG_FIRST = 1
REINSTALL_FIRST = 2
def kill_proc(child):
child.delayafterclose = 1
child.delayafterterminate = 1
child.close(force=True)
def wait_and_close_proc(child, time_out=-1):
child.expect(pexpect.EOF, timeout=time_out)
child.close()
class PkgAgent(object):
""" Agent Controller which can maintain package installs on a guest."""
def _fix(self, time_out):
"""Sometimes you have to run this command before a pkg will install."""
#sudo dpkg --configure -a
child = pexpect.spawn("sudo -E dpkg --configure -a")
wait_and_close_proc(child, time_out)
def _install(self, package_name, time_out):
"""Attempts to install a package.
Returns OK if the package installs fine or a result code if a
recoverable-error occurred.
Raises an exception if a non-recoverable error or time out occurs.
"""
child = pexpect.spawn("sudo -E DEBIAN_FRONTEND=noninteractive "
"apt-get -y --allow-unauthenticated install %s"
% package_name)
try:
i = child.expect(['.*password*',
'E: Unable to locate package %s' % package_name,
"Couldn't find package % s" % package_name,
"dpkg was interrupted, you must manually run "
"'sudo dpkg --configure -a'",
"Unable to lock the administration directory",
"Setting up %s*" % package_name,
"is already the newest version"],
timeout=time_out)
if i == 0:
raise PkgPermissionError("Invalid permissions.")
elif i == 1 or i == 2:
raise PkgNotFoundError("Could not find apt %s" % package_name)
elif i == 3:
return RUN_DPKG_FIRST
elif i == 4:
raise PkgAdminLockError()
wait_and_close_proc(child)
except pexpect.TIMEOUT:
kill_proc(child)
raise PkgTimeout("Process timeout after %i seconds." % time_out)
return OK
def _remove(self, package_name, time_out):
"""Removes a package.
Returns OK if the package is removed successfully or a result code if a
recoverable-error occurs.
Raises an exception if a non-recoverable error or time out occurs.
"""
child = pexpect.spawn("sudo -E apt-get -y --allow-unauthenticated "
"remove %s" % package_name)
try:
i = child.expect(['.*password*',
'E: Unable to locate package %s' % package_name,
'Package is in a very bad inconsistent state',
"Sub-process /usr/bin/dpkg returned an "
"error code",
"dpkg was interrupted, you must manually run "
"'sudo dpkg --configure -a'",
"Unable to lock the administration directory",
#'The following packages will be REMOVED',
"Removing %s*" % package_name],
timeout=time_out)
if i == 0:
raise PkgPermissionError("Invalid permissions.")
elif i == 1:
raise PkgNotFoundError("Could not find pkg %s" % package_name)
elif i == 2 or i == 3:
return REINSTALL_FIRST
elif i == 4:
return RUN_DPKG_FIRST
elif i == 5:
raise PkgAdminLockError()
wait_and_close_proc(child)
except pexpect.TIMEOUT:
kill_proc(child)
raise PkgTimeout("Process timeout after %i seconds." % time_out)
return OK
def pkg_install(self, package_name, time_out):
"""Installs a package."""
result = self._install(package_name, time_out)
if result != OK:
if result == RUN_DPKG_FIRST:
self._fix(time_out)
result = self._install(package_name, time_out)
if result != OK:
raise PkgPackageStateError("Package %s is in a bad state."
% package_name)
def pkg_version(self, package_name):
"""Returns the installed version of the given package.
It is sometimes impossible to know if a package is completely
unavailable before you attempt to install. Some packages may return
no information from the dpkg command but then install fine with apt-get
install.
"""
child = pexpect.spawn("dpkg -l %s" % package_name)
i = child.expect([".*No packages found matching*", "\+\+\+\-"])
if i == 0:
#raise PkgNotFoundError()
return None
# Need to capture the version string
child.expect("\n")
i = child.expect(["<none>", ".*"])
if i == 0:
return None
line = child.match.group()
parts = line.split()
# Should be something like:
# ['un', 'cowsay', '<none>', '(no', 'description', 'available)']
try:
wait_and_close_proc(child)
except pexpect.TIMEOUT:
kill_proc(child)
raise PkgTimeout("Remove process took too long.")
if len(parts) <= 2:
raise Error("Unexpected output.")
if parts[1] != package_name:
raise Error("Unexpected output:[1] == " + str(parts[1]))
if parts[0] == 'un' or parts[2] == '<none>':
return None
return parts[2]
def pkg_remove(self, package_name, time_out):
"""Removes a package."""
if self.pkg_version(package_name) == None:
return
result = self._remove(package_name, time_out)
if result != OK:
if result == REINSTALL_FIRST:
self._install(package_name, time_out)
elif result == RUN_DPKG_FIRST:
self._fix(time_out)
result = self._remove(package_name, time_out)
if result != OK:
raise PkgPackageStateError("Package %s is in a bad state."
% package_name)

View File

@ -0,0 +1,63 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Set of utilities for the Guest Manager
"""
import fcntl
import socket
import struct
from nova import context
from nova import flags
from reddwarf.db import db_api as dbapi
# from nova.db import api as dbapi
flags.DEFINE_string('guest_ethernet_device', "eth0",
'Default Ethernet device for the guest agent')
FLAGS = flags.FLAGS
instance_id = None
def get_instance_id():
"""Return the instance id for this guest"""
global instance_id
if not instance_id:
# TODO(rnirmal): Better way to get the instance id
address = get_ipv4_address()
instance = dbapi.instance_get_by_fixed_ip(context.get_admin_context(),
address)
instance_id = instance.id
return instance_id
def get_ipv4_address():
""" Get the ip address provided an ethernet device"""
# Create an IPV4 (AF_INET) datagram socket
soc = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# fcntl is a system fuction that takes in the socket file descriptor,
# 0x8915 = SIOCGIFADDR which is an os call passed to ioctl which returns
# the list of interface addresses.
# struct.pack, packs the ethernet device string into a binary buffer
return socket.inet_ntoa(fcntl.ioctl(soc.fileno(), 0x8915,
struct.pack('256s',
FLAGS.guest_ethernet_device[:15])
)[20:24])