Use database name validation only on listing and loading of databases
* We don't need to validate the database names in the response of a 'list_databases' call, since they may have been created via the root_enabled backdoor fixes bug 1178089 Change-Id: I2f74e63cfd8b78feec9c38b5fec75138245a7f64
This commit is contained in:
parent
2ab0a5d488
commit
000744d3d6
@ -17,7 +17,7 @@ from reddwarf.guestagent.db import models as guest_models
|
||||
from urllib import unquote
|
||||
|
||||
|
||||
def populate_databases(dbs):
|
||||
def populate_validated_databases(dbs):
|
||||
"""
|
||||
Create a serializable request with user provided data
|
||||
for creating new databases.
|
||||
@ -25,7 +25,7 @@ def populate_databases(dbs):
|
||||
try:
|
||||
databases = []
|
||||
for database in dbs:
|
||||
mydb = guest_models.MySQLDatabase()
|
||||
mydb = guest_models.ValidatedMySQLDatabase()
|
||||
mydb.name = database.get('name', '')
|
||||
mydb.character_set = database.get('character_set', '')
|
||||
mydb.collate = database.get('collate', '')
|
||||
|
@ -20,7 +20,7 @@ import webob.exc
|
||||
from reddwarf.common import exception
|
||||
from reddwarf.common import pagination
|
||||
from reddwarf.common import wsgi
|
||||
from reddwarf.extensions.mysql.common import populate_databases
|
||||
from reddwarf.extensions.mysql.common import populate_validated_databases
|
||||
from reddwarf.extensions.mysql.common import populate_users
|
||||
from reddwarf.extensions.mysql.common import unquote_user_host
|
||||
from reddwarf.extensions.mysql import models
|
||||
@ -261,7 +261,7 @@ class SchemaController(wsgi.Controller):
|
||||
context = req.environ[wsgi.CONTEXT_KEY]
|
||||
self.validate(body)
|
||||
schemas = body['databases']
|
||||
model_schemas = populate_databases(schemas)
|
||||
model_schemas = populate_validated_databases(schemas)
|
||||
models.Schema.create(context, instance_id, model_schemas)
|
||||
return wsgi.Result(None, 202)
|
||||
|
||||
@ -270,7 +270,7 @@ class SchemaController(wsgi.Controller):
|
||||
LOG.info(_("req : '%s'\n\n") % req)
|
||||
context = req.environ[wsgi.CONTEXT_KEY]
|
||||
try:
|
||||
schema = guest_models.MySQLDatabase()
|
||||
schema = guest_models.ValidatedMySQLDatabase()
|
||||
schema.name = id
|
||||
models.Schema.delete(context, instance_id, schema.serialize())
|
||||
except (ValueError, AttributeError) as e:
|
||||
|
@ -285,16 +285,7 @@ class MySQLDatabase(Base):
|
||||
|
||||
@name.setter
|
||||
def name(self, value):
|
||||
if any([not value,
|
||||
not self._is_valid(value),
|
||||
not self.dbname.match(value),
|
||||
string.find("%r" % value, "\\") != -1]):
|
||||
raise ValueError("'%s' is not a valid database name" % value)
|
||||
elif len(value) > 64:
|
||||
msg = "Database name '%s' is too long. Max length = 64"
|
||||
raise ValueError(msg % value)
|
||||
else:
|
||||
self._name = value
|
||||
self._name = value
|
||||
|
||||
@property
|
||||
def collate(self):
|
||||
@ -341,6 +332,21 @@ class MySQLDatabase(Base):
|
||||
self._character_set = value
|
||||
|
||||
|
||||
class ValidatedMySQLDatabase(MySQLDatabase):
|
||||
@MySQLDatabase.name.setter
|
||||
def name(self, value):
|
||||
if any([not value,
|
||||
not self._is_valid(value),
|
||||
not self.dbname.match(value),
|
||||
string.find("%r" % value, "\\") != -1]):
|
||||
raise ValueError("'%s' is not a valid database name" % value)
|
||||
elif len(value) > 64:
|
||||
msg = "Database name '%s' is too long. Max length = 64"
|
||||
raise ValueError(msg % value)
|
||||
else:
|
||||
self._name = value
|
||||
|
||||
|
||||
class MySQLUser(Base):
|
||||
"""Represents a MySQL User and its associated properties"""
|
||||
|
||||
@ -424,7 +430,7 @@ class MySQLUser(Base):
|
||||
|
||||
@databases.setter
|
||||
def databases(self, value):
|
||||
mydb = MySQLDatabase()
|
||||
mydb = ValidatedMySQLDatabase()
|
||||
mydb.name = value
|
||||
self._databases.append(mydb.serialize())
|
||||
|
||||
|
@ -321,7 +321,7 @@ class MySqlAdmin(object):
|
||||
"""Create the list of specified databases"""
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
for item in databases:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb = models.ValidatedMySQLDatabase()
|
||||
mydb.deserialize(item)
|
||||
cd = query.CreateDatabase(mydb.name,
|
||||
mydb.character_set,
|
||||
@ -343,7 +343,7 @@ class MySqlAdmin(object):
|
||||
t = text(str(g))
|
||||
client.execute(t)
|
||||
for database in user.databases:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb = models.ValidatedMySQLDatabase()
|
||||
mydb.deserialize(database)
|
||||
g = query.Grant(permissions='ALL', database=mydb.name,
|
||||
user=user.name, host=user.host,
|
||||
@ -354,7 +354,7 @@ class MySqlAdmin(object):
|
||||
def delete_database(self, database):
|
||||
"""Delete the specified database"""
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb = models.ValidatedMySQLDatabase()
|
||||
mydb.deserialize(database)
|
||||
dd = query.DropDatabase(mydb.name)
|
||||
t = text(str(dd))
|
||||
|
@ -22,7 +22,7 @@ from reddwarf.common import exception
|
||||
from reddwarf.common import pagination
|
||||
from reddwarf.common import utils
|
||||
from reddwarf.common import wsgi
|
||||
from reddwarf.extensions.mysql.common import populate_databases
|
||||
from reddwarf.extensions.mysql.common import populate_validated_databases
|
||||
from reddwarf.extensions.mysql.common import populate_users
|
||||
from reddwarf.instance import models, views
|
||||
from reddwarf.backup.models import Backup as backup_model
|
||||
@ -190,7 +190,8 @@ class InstanceController(wsgi.Controller):
|
||||
name = body['instance']['name']
|
||||
flavor_ref = body['instance']['flavorRef']
|
||||
flavor_id = utils.get_id_from_href(flavor_ref)
|
||||
databases = populate_databases(body['instance'].get('databases', []))
|
||||
databases = populate_validated_databases(
|
||||
body['instance'].get('databases', []))
|
||||
users = None
|
||||
try:
|
||||
users = populate_users(body['instance'].get('users', []))
|
||||
|
@ -21,7 +21,7 @@ class MySQLDatabaseTest(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(MySQLDatabaseTest, self).setUp()
|
||||
|
||||
self.mysqlDb = dbmodels.MySQLDatabase()
|
||||
self.mysqlDb = dbmodels.ValidatedMySQLDatabase()
|
||||
self.origin_ignore_db = self.mysqlDb._ignore_dbs
|
||||
self.mysqlDb._ignore_dbs = ['mysql']
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user