merged orm branch

Vishvananda Ishaya
2010-09-02 14:21:41 -07:00
7 changed files with 115 additions and 95 deletions

View File

@@ -52,41 +52,46 @@ class NoMoreNetworks(exception.Error):
 ###################


-def daemon_get(context, daemon_id):
-    """Get a daemon or raise if it does not exist."""
-    return IMPL.daemon_get(context, daemon_id)
+def service_destroy(context, instance_id):
+    """Destroy the service or raise if it does not exist."""
+    return IMPL.service_destroy(context, instance_id)


-def daemon_get_by_args(context, host, binary):
-    """Get the state of a daemon by node name and binary."""
-    return IMPL.daemon_get_by_args(context, host, binary)
+def service_get(context, service_id):
+    """Get a service or raise if it does not exist."""
+    return IMPL.service_get(context, service_id)


-def daemon_get_all_by_topic(context, topic):
-    """Get all compute daemons for a given topic."""
-    return IMPL.daemon_get_all_by_topic(context, topic)
+def service_get_all_by_topic(context, topic):
+    """Get all compute services for a given topic."""
+    return IMPL.service_get_all_by_topic(context, topic)


-def daemon_get_all_compute_sorted(context):
-    """Get all compute daemons sorted by instance count.
-
-    Returns a list of (Daemon, instance_count) tuples.
-    """
-    return IMPL.daemon_get_all_compute_sorted(context)
+def service_get_all_compute_sorted(context):
+    """Get all compute services sorted by instance count.
+
+    Returns a list of (Service, instance_count) tuples.
+    """
+    return IMPL.service_get_all_compute_sorted(context)


-def daemon_create(context, values):
-    """Create a daemon from the values dictionary."""
-    return IMPL.daemon_create(context, values)
+def service_get_by_args(context, host, binary):
+    """Get the state of a service by node name and binary."""
+    return IMPL.service_get_by_args(context, host, binary)


-def daemon_update(context, daemon_id, values):
-    """Set the given properties on a daemon and update it.
-
-    Raises NotFound if daemon does not exist.
-    """
-    return IMPL.daemon_update(context, daemon_id, values)
+def service_create(context, values):
+    """Create a service from the values dictionary."""
+    return IMPL.service_create(context, values)
+
+
+def service_update(context, service_id, values):
+    """Set the given properties on a service and update it.
+
+    Raises NotFound if service does not exist.
+    """
+    return IMPL.service_update(context, service_id, values)


 ###################
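
As a reading aid, a rough sketch of how a caller might exercise the renamed API; register_service() is a hypothetical helper, not part of this commit, and it simply mirrors the lookup-or-create pattern the Service class uses further down.

from nova import db
from nova import exception


def register_service(context, host, binary, topic):
    """Find or create a service row, then bump its report_count."""
    try:
        service_ref = db.service_get_by_args(context, host, binary)
        service_id = service_ref['id']
    except exception.NotFound:
        service_id = db.service_create(context, {'host': host,
                                                 'binary': binary,
                                                 'topic': topic,
                                                 'report_count': 0})
    service_ref = db.service_get(context, service_id)
    db.service_update(context, service_id,
                      {'report_count': service_ref['report_count'] + 1})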

View File

@@ -36,27 +36,27 @@ FLAGS = flags.FLAGS
 ###################


-def daemon_get(_context, daemon_id):
-    return models.Daemon.find(daemon_id)
+def service_destroy(context, service_id):
+    service_ref = service_get(context, service_id)
+    service_ref.delete()


-def daemon_get_by_args(_context, host, binary):
-    return models.Daemon.find_by_args(host, binary)
+def service_get(_context, service_id):
+    return models.Service.find(service_id)


-def daemon_get_all_by_topic(context, topic):
+def service_get_all_by_topic(context, topic):
     with managed_session() as session:
-        return session.query(models.Daemon) \
+        return session.query(models.Service) \
                       .filter_by(deleted=False) \
                       .filter_by(topic=topic) \
                       .all()


-def daemon_get_all_compute_sorted(_context):
+def service_get_all_compute_sorted(_context):
     with managed_session() as session:
         # NOTE(vish): The intended query is below
-        #             SELECT daemons.*, inst_count.instance_count
-        #             FROM daemons LEFT OUTER JOIN
+        #             SELECT services.*, inst_count.instance_count
+        #             FROM services LEFT OUTER JOIN
         #             (SELECT host, count(*) AS instance_count
         #              FROM instances GROUP BY host) AS inst_count
         subq = session.query(models.Instance.host,
@@ -65,27 +65,31 @@ def daemon_get_all_compute_sorted(_context):
                      .group_by(models.Instance.host) \
                      .subquery()
         topic = 'compute'
-        return session.query(models.Daemon, subq.c.instance_count) \
+        return session.query(models.Service, subq.c.instance_count) \
                       .filter_by(topic=topic) \
                       .filter_by(deleted=False) \
-                      .outerjoin((subq, models.Daemon.host == subq.c.host)) \
+                      .outerjoin((subq, models.Service.host == subq.c.host)) \
                       .order_by(subq.c.instance_count) \
                       .all()


-def daemon_create(_context, values):
-    daemon_ref = models.Daemon()
-    for (key, value) in values.iteritems():
-        daemon_ref[key] = value
-    daemon_ref.save()
-    return daemon_ref.id
+def service_get_by_args(_context, host, binary):
+    return models.Service.find_by_args(host, binary)


-def daemon_update(context, daemon_id, values):
-    daemon_ref = daemon_get(context, daemon_id)
-    for (key, value) in values.iteritems():
-        daemon_ref[key] = value
-    daemon_ref.save()
+def service_create(_context, values):
+    service_ref = models.Service()
+    for (key, value) in values.iteritems():
+        service_ref[key] = value
+    service_ref.save()
+    return service_ref.id
+
+
+def service_update(context, service_id, values):
+    service_ref = service_get(context, service_id)
+    for (key, value) in values.iteritems():
+        service_ref[key] = value
+    service_ref.save()


 ###################
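
The compute-sorted query above is the usual SQLAlchemy subquery-plus-outer-join idiom. A generic sketch of the same shape, assuming ordinary mapped classes with host, topic and deleted columns rather than Nova's models:

from sqlalchemy import func


def services_by_instance_count(session, Service, Instance):
    # Count instances per host in a named subquery...
    subq = (session.query(Instance.host,
                          func.count(Instance.id).label('instance_count'))
                   .group_by(Instance.host)
                   .subquery())
    # ...then LEFT OUTER JOIN services against it and sort by the count.
    return (session.query(Service, subq.c.instance_count)
                   .filter(Service.topic == 'compute')
                   .filter(Service.deleted == False)
                   .outerjoin(subq, Service.host == subq.c.host)
                   .order_by(subq.c.instance_count)
                   .all())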

View File

@@ -160,9 +160,9 @@ class Host(BASE, NovaBase):
     id = Column(String(255), primary_key=True)


-class Daemon(BASE, NovaBase):
+class Service(BASE, NovaBase):
     """Represents a running service on a host"""
-    __tablename__ = 'daemons'
+    __tablename__ = 'services'
     id = Column(Integer, primary_key=True)
     host = Column(String(255), ForeignKey('hosts.id'))
     binary = Column(String(255))
@@ -392,7 +392,7 @@ class FloatingIp(BASE, NovaBase):
 def register_models():
     """Register Models and create metadata"""
     from sqlalchemy import create_engine
-    models = (Image, Host, Daemon, Instance, Volume, ExportDevice,
+    models = (Image, Host, Service, Instance, Volume, ExportDevice,
               FixedIp, FloatingIp, Network, NetworkIndex)
     engine = create_engine(FLAGS.sql_connection, echo=False)
     for model in models:
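
For comparison, a self-contained sketch of the declarative pattern behind the Service model and register_models(); a plain declarative Base and an in-memory SQLite engine stand in here for NovaBase and FLAGS.sql_connection.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Service(Base):
    """Represents a running service on a host (illustrative only)."""
    __tablename__ = 'services'
    id = Column(Integer, primary_key=True)
    host = Column(String(255))
    binary = Column(String(255))
    topic = Column(String(255))
    report_count = Column(Integer, nullable=False, default=0)


engine = create_engine('sqlite://', echo=False)
Base.metadata.create_all(engine)  # roughly what register_models() drives per model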

View File

@@ -18,7 +18,7 @@
 """
 Proxy AMI-related calls from the cloud controller, to the running
-objectstore daemon.
+objectstore service.
 """

 import json

View File

@@ -17,11 +17,11 @@
 # under the License.

 """
-Base functionality for nova daemons - gradually being replaced with twistd.py.
+Base functionality for nova services - gradually being replaced with twistd.py.
 """

 import daemon
 from daemon import pidlockfile
 import logging
 import logging.handlers
 import os
@@ -33,14 +33,14 @@ from nova import flags

 FLAGS = flags.FLAGS
 flags.DEFINE_bool('daemonize', False, 'daemonize this process')
 # NOTE(termie): right now I am defaulting to using syslog when we daemonize
 # it may be better to do something else -shrug-
 # NOTE(Devin): I think we should let each process have its own log file
 #              and put it in /var/logs/nova/(appname).log
 #              This makes debugging much easier and cuts down on sys log
 #              clutter.
 flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
 flags.DEFINE_string('logfile', None, 'log file to output to')
 flags.DEFINE_string('pidfile', None, 'pid file to output to')
 flags.DEFINE_string('working_directory', './', 'working directory...')
@@ -50,17 +50,17 @@ flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')

 def stop(pidfile):
     """
-    Stop the daemon
+    Stop the service
     """
     # Get the pid from the pidfile
     try:
         pid = int(open(pidfile,'r').read().strip())
     except IOError:
-        message = "pidfile %s does not exist. Daemon not running?\n"
+        message = "pidfile %s does not exist. Service not running?\n"
         sys.stderr.write(message % pidfile)
         return  # not an error in a restart

-    # Try killing the daemon process
+    # Try killing the service process
     try:
         while 1:
             os.kill(pid, signal.SIGTERM)
@@ -100,13 +100,13 @@ def serve(name, main):
     else:
         print 'usage: %s [options] [start|stop|restart]' % argv[0]
         sys.exit(1)

     daemonize(argv, name, main)


 def daemonize(args, name, main):
     """Does the work of daemonizing the process"""
     logging.getLogger('amqplib').setLevel(logging.WARN)
     if FLAGS.daemonize:
         logger = logging.getLogger()
         formatter = logging.Formatter(
                 name + '(%(name)s): %(levelname)s %(message)s')
@@ -129,8 +129,8 @@ def daemonize(args, name, main):
     else:
         logging.getLogger().setLevel(logging.WARNING)

     with daemon.DaemonContext(
             detach_process=FLAGS.daemonize,
             working_directory=FLAGS.working_directory,
             pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
                                                    acquire_timeout=1,
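
For reference, a minimal sketch of the python-daemon usage this module wraps; the pidlockfile import path varies between python-daemon releases (newer ones expose daemon.pidfile instead), so treat that import as an assumption.

import daemon
from daemon import pidlockfile


def run_daemonized(main, pidfile_path, workdir='./', detach=True):
    # Detach, switch to the working directory, and hold a timed PID lock
    # while the wrapped main() runs.
    context = daemon.DaemonContext(
            detach_process=detach,
            working_directory=workdir,
            pidfile=pidlockfile.TimeoutPIDLockFile(pidfile_path,
                                                   acquire_timeout=1))
    with context:
        main()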

View File

@@ -52,6 +52,16 @@ class Service(object, service.Service):
         self.manager = manager_class(host=host, *args, **kwargs)
         self.model_disconnected = False
         super(Service, self).__init__(*args, **kwargs)
+        try:
+            service_ref = db.service_get_by_args(None,
+                                                 self.host,
+                                                 self.binary)
+            self.service_id = service_ref['id']
+        except exception.NotFound:
+            self.service_id = db.service_create(None, {'host': self.host,
+                                                       'binary': self.binary,
+                                                       'topic': self.topic,
+                                                       'report_count': 0})

     def __getattr__(self, key):
         try:
@@ -110,24 +120,25 @@ class Service(object, service.Service):
         service_obj.setServiceParent(application)
         return application

-    @defer.inlineCallbacks
-    def report_state(self, context=None):
-        """Update the state of this daemon in the datastore."""
+    def kill(self, context=None):
+        """Destroy the service object in the datastore"""
         try:
-            try:
-                daemon_ref = db.daemon_get_by_args(context,
-                                                   self.host,
-                                                   self.binary)
-                daemon_id = daemon_ref['id']
-            except exception.NotFound:
-                daemon_id = db.daemon_create(context, {'host': self.host,
-                                                       'binary': self.binary,
-                                                       'topic': self.topic,
-                                                       'report_count': 0})
-            daemon_ref = db.daemon_get(context, daemon_id)
-            db.daemon_update(context,
-                             daemon_id,
-                             {'report_count': daemon_ref['report_count'] + 1})
+            service_ref = db.service_get_by_args(context,
+                                                 self.host,
+                                                 self.binary)
+            service_id = service_ref['id']
+            db.service_destroy(context, service_id)
+        except exception.NotFound:
+            logging.warn("Service killed that has no database entry")
+
+    @defer.inlineCallbacks
+    def report_state(self, context=None):
+        """Update the state of this service in the datastore."""
+        try:
+            service_ref = db.service_get(context, self.service_id)
+            db.service_update(context,
+                              self.service_id,
+                              {'report_count': service_ref['report_count'] + 1})

             # TODO(termie): make this pattern be more elegant.
             if getattr(self, "model_disconnected", False):
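
The net effect on a Service is a simple lifecycle: the row is resolved or created once in __init__, report_state() only increments report_count for the stored id, and kill() removes the row. A toy walk-through with a dict standing in for the db layer (none of this is Nova code):

class FakeServiceDB(object):
    """Stand-in for the db layer: keeps service rows in a dict."""

    def __init__(self):
        self.rows = {}
        self.next_id = 1

    def service_create(self, values):
        service_id = self.next_id
        self.next_id += 1
        self.rows[service_id] = dict(values, id=service_id)
        return service_id

    def service_get(self, service_id):
        return self.rows[service_id]

    def service_update(self, service_id, values):
        self.rows[service_id].update(values)

    def service_destroy(self, service_id):
        del self.rows[service_id]


db = FakeServiceDB()
# __init__: resolve or create the row once, remember its id.
service_id = db.service_create({'host': 'node1', 'binary': 'nova-compute',
                                'topic': 'compute', 'report_count': 0})
# report_state(): read the row, bump report_count.
ref = db.service_get(service_id)
db.service_update(service_id, {'report_count': ref['report_count'] + 1})
# kill(): remove the row.
db.service_destroy(service_id)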

View File

@@ -84,40 +84,40 @@ class ServiceTestCase(test.BaseTestCase):
     def test_report_state(self):
         host = 'foo'
         binary = 'bar'
-        daemon_ref = {'host': host,
-                      'binary': binary,
-                      'report_count': 0,
-                      'id': 1}
+        service_ref = {'host': host,
+                       'binary': binary,
+                       'report_count': 0,
+                       'id': 1}
         service.db.__getattr__('report_state')
-        service.db.daemon_get_by_args(None,
-                                      host,
-                                      binary).AndReturn(daemon_ref)
-        service.db.daemon_update(None, daemon_ref['id'],
-                                 mox.ContainsKeyValue('report_count', 1))
+        service.db.service_get_by_args(None,
+                                       host,
+                                       binary).AndReturn(service_ref)
+        service.db.service_update(None, service_ref['id'],
+                                  mox.ContainsKeyValue('report_count', 1))
         self.mox.ReplayAll()

         s = service.Service()
         rv = yield s.report_state(host, binary)

-    def test_report_state_no_daemon(self):
+    def test_report_state_no_service(self):
         host = 'foo'
         binary = 'bar'
-        daemon_create = {'host': host,
-                         'binary': binary,
-                         'report_count': 0}
-        daemon_ref = {'host': host,
-                      'binary': binary,
-                      'report_count': 0,
-                      'id': 1}
+        service_create = {'host': host,
+                          'binary': binary,
+                          'report_count': 0}
+        service_ref = {'host': host,
+                       'binary': binary,
+                       'report_count': 0,
+                       'id': 1}

         service.db.__getattr__('report_state')
-        service.db.daemon_get_by_args(None,
-                                      host,
-                                      binary).AndRaise(exception.NotFound())
-        service.db.daemon_create(None,
-                                 daemon_create).AndReturn(daemon_ref['id'])
-        service.db.daemon_get(None, daemon_ref['id']).AndReturn(daemon_ref)
-        service.db.daemon_update(None, daemon_ref['id'],
-                                 mox.ContainsKeyValue('report_count', 1))
+        service.db.service_get_by_args(None,
+                                       host,
+                                       binary).AndRaise(exception.NotFound())
+        service.db.service_create(None,
+                                  service_create).AndReturn(service_ref['id'])
+        service.db.service_get(None, service_ref['id']).AndReturn(service_ref)
+        service.db.service_update(None, service_ref['id'],
+                                  mox.ContainsKeyValue('report_count', 1))
         self.mox.ReplayAll()
@@ -127,13 +127,13 @@ class ServiceTestCase(test.BaseTestCase):
     def test_report_state_newly_disconnected(self):
         host = 'foo'
         binary = 'bar'
-        daemon_ref = {'host': host,
-                      'binary': binary,
-                      'report_count': 0,
-                      'id': 1}
+        service_ref = {'host': host,
+                       'binary': binary,
+                       'report_count': 0,
+                       'id': 1}

         service.db.__getattr__('report_state')
-        service.db.daemon_get_by_args(None,
-                                      host,
-                                      binary).AndRaise(Exception())
+        service.db.service_get_by_args(None,
+                                       host,
+                                       binary).AndRaise(Exception())
@@ -146,16 +146,16 @@ class ServiceTestCase(test.BaseTestCase):
     def test_report_state_newly_connected(self):
         host = 'foo'
         binary = 'bar'
-        daemon_ref = {'host': host,
-                      'binary': binary,
-                      'report_count': 0,
-                      'id': 1}
+        service_ref = {'host': host,
+                       'binary': binary,
+                       'report_count': 0,
+                       'id': 1}

         service.db.__getattr__('report_state')
-        service.db.daemon_get_by_args(None,
-                                      host,
-                                      binary).AndReturn(daemon_ref)
-        service.db.daemon_update(None, daemon_ref['id'],
-                                 mox.ContainsKeyValue('report_count', 1))
+        service.db.service_get_by_args(None,
+                                       host,
+                                       binary).AndReturn(service_ref)
+        service.db.service_update(None, service_ref['id'],
+                                  mox.ContainsKeyValue('report_count', 1))
         self.mox.ReplayAll()