Lots of fixes to make the nova commands work properly and make datamodel work with mysql properly
This commit is contained in:
@@ -41,6 +41,7 @@ from nova.endpoint import images
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
|
||||
|
||||
|
||||
def _gen_key(user_id, key_name):
|
||||
@@ -262,11 +263,11 @@ class CloudController(object):
|
||||
volume['mountpoint'])
|
||||
if volume['attach_status'] == 'attached':
|
||||
v['attachmentSet'] = [{'attachTime': volume['attach_time'],
|
||||
'deleteOnTermination': volume['delete_on_termination'],
|
||||
'deleteOnTermination': False,
|
||||
'device': volume['mountpoint'],
|
||||
'instanceId': volume['instance_id'],
|
||||
'status': 'attached',
|
||||
'volume_id': volume['volume_id']}]
|
||||
'volume_id': volume['str_id']}]
|
||||
else:
|
||||
v['attachmentSet'] = [{}]
|
||||
return v
|
||||
@@ -293,7 +294,7 @@ class CloudController(object):
|
||||
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
|
||||
volume_ref = db.volume_get_by_str(context, volume_id)
|
||||
# TODO(vish): abstract status checking?
|
||||
if volume_ref['status'] == "attached":
|
||||
if volume_ref['attach_status'] == "attached":
|
||||
raise exception.ApiError("Volume is already attached")
|
||||
#volume.start_attach(instance_id, device)
|
||||
instance_ref = db.instance_get_by_str(context, instance_id)
|
||||
@@ -306,7 +307,7 @@ class CloudController(object):
|
||||
"mountpoint": device}})
|
||||
return defer.succeed({'attachTime': volume_ref['attach_time'],
|
||||
'device': volume_ref['mountpoint'],
|
||||
'instanceId': instance_ref['id_str'],
|
||||
'instanceId': instance_ref['id'],
|
||||
'requestId': context.request_id,
|
||||
'status': volume_ref['attach_status'],
|
||||
'volumeId': volume_ref['id']})
|
||||
@@ -334,7 +335,7 @@ class CloudController(object):
|
||||
db.volume_detached(context)
|
||||
return defer.succeed({'attachTime': volume_ref['attach_time'],
|
||||
'device': volume_ref['mountpoint'],
|
||||
'instanceId': instance_ref['id_str'],
|
||||
'instanceId': instance_ref['str_id'],
|
||||
'requestId': context.request_id,
|
||||
'status': volume_ref['attach_status'],
|
||||
'volumeId': volume_ref['id']})
|
||||
|
||||
@@ -18,9 +18,10 @@
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Process pool, still buggy right now.
|
||||
Process pool using twisted threading
|
||||
"""
|
||||
|
||||
import logging
|
||||
import StringIO
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -29,30 +30,14 @@ from twisted.internet import protocol
|
||||
from twisted.internet import reactor
|
||||
|
||||
from nova import flags
|
||||
from nova.utils import ProcessExecutionError
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('process_pool_size', 4,
|
||||
'Number of processes to use in the process pool')
|
||||
|
||||
|
||||
# NOTE(termie): this is copied from twisted.internet.utils but since
|
||||
# they don't export it I've copied and modified
|
||||
class UnexpectedErrorOutput(IOError):
|
||||
"""
|
||||
Standard error data was received where it was not expected. This is a
|
||||
subclass of L{IOError} to preserve backward compatibility with the previous
|
||||
error behavior of L{getProcessOutput}.
|
||||
|
||||
@ivar processEnded: A L{Deferred} which will fire when the process which
|
||||
produced the data on stderr has ended (exited and all file descriptors
|
||||
closed).
|
||||
"""
|
||||
def __init__(self, stdout=None, stderr=None):
|
||||
IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
|
||||
|
||||
|
||||
# This is based on _BackRelay from twister.internal.utils, but modified to
|
||||
# capture both stdout and stderr, without odd stderr handling, and also to
|
||||
# This is based on _BackRelay from twister.internal.utils, but modified to
|
||||
# capture both stdout and stderr, without odd stderr handling, and also to
|
||||
# handle stdin
|
||||
class BackRelayWithInput(protocol.ProcessProtocol):
|
||||
"""
|
||||
@@ -62,22 +47,23 @@ class BackRelayWithInput(protocol.ProcessProtocol):
|
||||
@ivar deferred: A L{Deferred} which will be called back with all of stdout
|
||||
and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
|
||||
and any bytes are received over stderr, this will fire with an
|
||||
L{_UnexpectedErrorOutput} instance and the attribute will be set to
|
||||
L{_ProcessExecutionError} instance and the attribute will be set to
|
||||
C{None}.
|
||||
|
||||
@ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
|
||||
received over stderr, this attribute will refer to a L{Deferred} which
|
||||
will be called back when the process ends. This C{Deferred} is also
|
||||
associated with the L{_UnexpectedErrorOutput} which C{deferred} fires
|
||||
with earlier in this case so that users can determine when the process
|
||||
@ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
|
||||
received over stderr, this attribute will refer to a L{Deferred} which
|
||||
will be called back when the process ends. This C{Deferred} is also
|
||||
associated with the L{_ProcessExecutionError} which C{deferred} fires
|
||||
with earlier in this case so that users can determine when the process
|
||||
has actually ended, in addition to knowing when bytes have been received
|
||||
via stderr.
|
||||
"""
|
||||
|
||||
def __init__(self, deferred, started_deferred=None,
|
||||
terminate_on_stderr=False, check_exit_code=True,
|
||||
process_input=None):
|
||||
def __init__(self, deferred, cmd, started_deferred=None,
|
||||
terminate_on_stderr=False, check_exit_code=True,
|
||||
process_input=None):
|
||||
self.deferred = deferred
|
||||
self.cmd = cmd
|
||||
self.stdout = StringIO.StringIO()
|
||||
self.stderr = StringIO.StringIO()
|
||||
self.started_deferred = started_deferred
|
||||
@@ -85,14 +71,18 @@ class BackRelayWithInput(protocol.ProcessProtocol):
|
||||
self.check_exit_code = check_exit_code
|
||||
self.process_input = process_input
|
||||
self.on_process_ended = None
|
||||
|
||||
|
||||
def _build_execution_error(self, exit_code=None):
|
||||
return ProcessExecutionError(cmd=self.cmd,
|
||||
exit_code=exit_code,
|
||||
stdout=self.stdout.getvalue(),
|
||||
stderr=self.stderr.getvalue())
|
||||
|
||||
def errReceived(self, text):
|
||||
self.stderr.write(text)
|
||||
if self.terminate_on_stderr and (self.deferred is not None):
|
||||
self.on_process_ended = defer.Deferred()
|
||||
self.deferred.errback(UnexpectedErrorOutput(
|
||||
stdout=self.stdout.getvalue(),
|
||||
stderr=self.stderr.getvalue()))
|
||||
self.deferred.errback(self._build_execution_error())
|
||||
self.deferred = None
|
||||
self.transport.loseConnection()
|
||||
|
||||
@@ -102,15 +92,19 @@ class BackRelayWithInput(protocol.ProcessProtocol):
|
||||
def processEnded(self, reason):
|
||||
if self.deferred is not None:
|
||||
stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
|
||||
try:
|
||||
if self.check_exit_code:
|
||||
reason.trap(error.ProcessDone)
|
||||
self.deferred.callback((stdout, stderr))
|
||||
except:
|
||||
# NOTE(justinsb): This logic is a little suspicious to me...
|
||||
# If the callback throws an exception, then errback will be
|
||||
# called also. However, this is what the unit tests test for...
|
||||
self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
|
||||
exit_code = reason.value.exitCode
|
||||
if self.check_exit_code and exit_code <> 0:
|
||||
self.deferred.errback(self._build_execution_error(exit_code))
|
||||
else:
|
||||
try:
|
||||
if self.check_exit_code:
|
||||
reason.trap(error.ProcessDone)
|
||||
self.deferred.callback((stdout, stderr))
|
||||
except:
|
||||
# NOTE(justinsb): This logic is a little suspicious to me...
|
||||
# If the callback throws an exception, then errback will be
|
||||
# called also. However, this is what the unit tests test for...
|
||||
self.deferred.errback(self._build_execution_error(exit_code))
|
||||
elif self.on_process_ended is not None:
|
||||
self.on_process_ended.errback(reason)
|
||||
|
||||
@@ -122,8 +116,8 @@ class BackRelayWithInput(protocol.ProcessProtocol):
|
||||
self.transport.write(self.process_input)
|
||||
self.transport.closeStdin()
|
||||
|
||||
def get_process_output(executable, args=None, env=None, path=None,
|
||||
process_reactor=None, check_exit_code=True,
|
||||
def get_process_output(executable, args=None, env=None, path=None,
|
||||
process_reactor=None, check_exit_code=True,
|
||||
process_input=None, started_deferred=None,
|
||||
terminate_on_stderr=False):
|
||||
if process_reactor is None:
|
||||
@@ -131,10 +125,15 @@ def get_process_output(executable, args=None, env=None, path=None,
|
||||
args = args and args or ()
|
||||
env = env and env and {}
|
||||
deferred = defer.Deferred()
|
||||
cmd = executable
|
||||
if args:
|
||||
cmd = cmd + " " + ' '.join(args)
|
||||
logging.debug("Running cmd: %s", cmd)
|
||||
process_handler = BackRelayWithInput(
|
||||
deferred,
|
||||
started_deferred=started_deferred,
|
||||
check_exit_code=check_exit_code,
|
||||
deferred,
|
||||
cmd,
|
||||
started_deferred=started_deferred,
|
||||
check_exit_code=check_exit_code,
|
||||
process_input=process_input,
|
||||
terminate_on_stderr=terminate_on_stderr)
|
||||
# NOTE(vish): commands come in as unicode, but self.executes needs
|
||||
@@ -142,7 +141,7 @@ def get_process_output(executable, args=None, env=None, path=None,
|
||||
executable = str(executable)
|
||||
if not args is None:
|
||||
args = [str(x) for x in args]
|
||||
process_reactor.spawnProcess( process_handler, executable,
|
||||
process_reactor.spawnProcess( process_handler, executable,
|
||||
(executable,)+tuple(args), env, path)
|
||||
return deferred
|
||||
|
||||
|
||||
Reference in New Issue
Block a user