diff --git a/etc/namos.conf b/etc/namos.conf index 4e24ccd..e416c87 100644 --- a/etc/namos.conf +++ b/etc/namos.conf @@ -10,4 +10,4 @@ rabbit_hosts = 172.241.0.101 connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8 [conductor] -workers=10 \ No newline at end of file +workers=1 \ No newline at end of file diff --git a/namos/cmd/conductor.py b/namos/cmd/conductor.py index 9cffa51..32ab4a4 100644 --- a/namos/cmd/conductor.py +++ b/namos/cmd/conductor.py @@ -50,7 +50,8 @@ def main(): launcher = os_service.launch(CONF, mgr, CONF.conductor.workers) # TODO(mrkanag) Namos is not registering the RPC backend, fix it ! - # namos.register_myself() + import os_namos + os_namos.register_myself() launcher.wait() diff --git a/namos/common/exception.py b/namos/common/exception.py index d9da06f..403bd0e 100644 --- a/namos/common/exception.py +++ b/namos/common/exception.py @@ -63,6 +63,12 @@ class NotFound(NamosException): http_status_code = 404 +class AlreadyExist(NamosException): + msg_fmt = ("%(model)s %(name)s already exists") + error_code = 0x01002 + http_status_code = 403 + + class RegionNotFound(NotFound): msg_fmt = ("Region %(region_id)s does not found") error_code = 0x01001 diff --git a/namos/conductor/manager.py b/namos/conductor/manager.py index 7e929ea..559a0bb 100644 --- a/namos/conductor/manager.py +++ b/namos/conductor/manager.py @@ -129,12 +129,6 @@ class ServiceProcessor(object): def process_service(self, context): # Service Node try: - # TODO(mrkanag) is this to be region specifc search - node = db_api.service_node_get_by_name( - context, - self.registration_info.get('fqdn')) - LOG.info('Service node %s is existing' % node) - except exception.ServiceNodeNotFound: # TODO(mrkanag) region_id is hard-coded, fix it ! # user proper node name instead of fqdn node = db_api.service_node_create( @@ -144,14 +138,15 @@ class ServiceProcessor(object): region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0')) LOG.info('Service node %s is created' % node) + except exception.AlreadyExist: + # TODO(mrkanag) is this to be region specifc search + node = db_api.service_node_get_by_name( + context, + self.registration_info.get('fqdn')) + LOG.info('Service node %s is existing' % node) # Service try: - service = db_api.service_get_by_name( - context, - self.registration_info.get('project_name')) - LOG.info('Service %s is existing' % service) - except exception.ServiceNotFound: s_id = 'b9c2549f-f685-4bc2-92e9-ba8af9c18591' service = db_api.service_create( context, @@ -161,53 +156,36 @@ class ServiceProcessor(object): keystone_service_id=s_id)) LOG.info('Service %s is created' % service) + except exception.AlreadyExist: + service = db_api.service_get_by_name( + context, + self.registration_info.get('project_name')) + LOG.info('Service %s is existing' % service) # Service Component - service_components = \ - db_api.service_component_get_all_by_node_for_service( - context, - node_id=node.id, - service_id=service.id, - name=self.registration_info['prog_name'] - ) - if len(service_components) == 1: - service_component = service_components[0] - LOG.info('Service Component %s is existing' % service_component) - # TODO(mrkanag) what to do when service_components size is > 1 - else: + try: service_component = db_api.service_component_create( context, dict(name=self.registration_info['prog_name'], node_id=node.id, service_id=service.id)) LOG.info('Service Component %s is created' % service_component) + except exception.AlreadyExist: + service_components = \ + db_api.service_component_get_all_by_node_for_service( + context, + node_id=node.id, + service_id=service.id, + name=self.registration_info['prog_name'] + ) + if len(service_components) == 1: + service_component = service_components[0] + LOG.info('Service Component %s is existing' % + service_component) + # TODO(mrkanag) what to do when service_components size is > 1 # Service Worker - # TODO(mrkanag) Find a way to purge the dead service worker - # Once each service is enabled with heart beating namos - # purging can be done once heart beat stopped. this can be - # done from openstack.common.service.py - service_workers = \ - db_api.service_worker_get_by_host_for_service_component( - context, - service_component_id=service_component.id, - host=self.registration_info['host'] - ) - if len(service_workers) == 1: - service_worker = \ - db_api.service_worker_update( - context, - service_workers[0].id, - dict( - pid=self.registration_info['pid'], - name='%s@%s' % (self.registration_info['pid'], - service_component.name) - )) - LOG.info('Service Worker %s is existing and is updated' - % service_worker) - - # TODO(mrkanag) what to do when service_workers size is > 1 - else: + try: service_worker = db_api.service_worker_create( context, # TODO(mrkanag) Fix the name, device driver proper ! @@ -217,6 +195,31 @@ class ServiceProcessor(object): host=self.registration_info['host'], service_component_id=service_component.id)) LOG.info('Service Worker %s is created' % service_worker) + except exception.AlreadyExist: + # TODO(mrkanag) Find a way to purge the dead service worker + # Once each service is enabled with heart beating namos + # purging can be done once heart beat stopped. this can be + # done from openstack.common.service.py + service_workers = \ + db_api.service_worker_get_by_host_for_service_component( + context, + service_component_id=service_component.id, + host=self.registration_info['host'] + ) + if len(service_workers) == 1: + service_worker = \ + db_api.service_worker_update( + context, + service_workers[0].id, + dict( + pid=self.registration_info['pid'], + name='%s@%s' % (self.registration_info['pid'], + service_component.name) + )) + LOG.info('Service Worker %s is existing and is updated' + % service_worker) + + # TODO(mrkanag) what to do when service_workers size is > 1 # Config # TODO(mrkanag) Optimize the config like per service_component @@ -224,18 +227,20 @@ class ServiceProcessor(object): for cfg_name, cfg_obj in self.registration_info[ 'config_dict'].iteritems(): cfg_obj['service_worker_id'] = service_worker.id - configs = db_api.config_get_by_name_for_service_worker( - context, - service_worker_id=cfg_obj['service_worker_id'], - name=cfg_obj['name']) - if len(configs) == 1: - config = db_api.config_update(context, - configs[0].id, - cfg_obj) - LOG.info("Config %s is existing and is updated" % config) - else: + + try: config = db_api.config_create(context, cfg_obj) LOG.info("Config %s is created" % config) + except exception.AlreadyExist: + configs = db_api.config_get_by_name_for_service_worker( + context, + service_worker_id=cfg_obj['service_worker_id'], + name=cfg_obj['name']) + if len(configs) == 1: + config = db_api.config_update(context, + configs[0].id, + cfg_obj) + LOG.info("Config %s is existing and is updated" % config) return service_worker.id @@ -367,11 +372,6 @@ class DriverProcessor(object): # Device device_name = self._get_value(device_cfg['name']) try: - device = db_api.device_get_by_name( - context, - device_name) - LOG.info('Device %s is existing' % device) - except exception.DeviceNotFound: # TODO(mrkanag) region_id is hard-coded, fix it ! # Set the right status as well device = db_api.device_create( @@ -381,8 +381,13 @@ class DriverProcessor(object): region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0')) LOG.info('Device %s is created' % device) + except exception.AlreadyExist: + device = db_api.device_get_by_name( + context, + device_name) + LOG.info('Device %s is existing' % device) - # Handle child devices + # TODO(mrkanag) Poperly Handle child devices if child_device_cfg is not None: for d_name in self._get_value(child_device_cfg['key']): base_name = self._get_value(child_device_cfg['base_name']) @@ -406,16 +411,7 @@ class DriverProcessor(object): LOG.info('Device %s is created' % device) # Device Endpoint - device_endpoints = db_api.device_endpoint_get_by_device_type( - context, - device_id=device.id, - type=endpoint_type, - name=device_endpoint_name) - if len(device_endpoints) >= 1: - device_endpoint = device_endpoints[0] - LOG.info('Device Endpoint %s is existing' % - device_endpoints[0]) - else: + try: for k, v in connection_cfg.iteritems(): connection[k] = self._get_value(k) @@ -426,15 +422,19 @@ class DriverProcessor(object): type=endpoint_type, device_id=device.id)) LOG.info('Device Endpoint %s is created' % device_endpoint) + except exception.AlreadyExist: + device_endpoints = db_api.device_endpoint_get_by_device_type( + context, + device_id=device.id, + type=endpoint_type, + name=device_endpoint_name) + if len(device_endpoints) >= 1: + device_endpoint = device_endpoints[0] + LOG.info('Device Endpoint %s is existing' % + device_endpoints[0]) # Device Driver Class try: - device_driver_class = db_api.device_driver_class_get_by_name( - context, - driver_name) - LOG.info('Device Driver Class %s is existing' % - device_driver_class) - except exception.DeviceDriverClassNotFound: device_driver_class = db_api.device_driver_class_create( context, dict(name=driver_name, @@ -446,21 +446,15 @@ class DriverProcessor(object): extra=driver_def.get('extra'))) LOG.info('Device Driver Class %s is created' % device_driver_class) + except exception.AlreadyExist: + device_driver_class = db_api.device_driver_class_get_by_name( + context, + driver_name) + LOG.info('Device Driver Class %s is existing' % + device_driver_class) # Device Driver - device_drivers = \ - db_api.device_driver_get_by_device_endpoint_service_worker( - context, - device_id=device.id, - endpoint_id=device_endpoint.id, - device_driver_class_id=device_driver_class.id, - service_worker_id=self.service_worker_id - ) - if len(device_drivers) >= 1: - device_driver = device_drivers[0] - LOG.info('Device Driver %s is existing' % - device_driver) - else: + try: device_driver = db_api.device_driver_create( context, dict(device_id=device.id, @@ -471,6 +465,19 @@ class DriverProcessor(object): ) LOG.info('Device Driver %s is created' % device_driver) + except exception.AlreadyExist: + device_drivers = \ + db_api.device_driver_get_by_device_endpoint_service_worker( + context, + device_id=device.id, + endpoint_id=device_endpoint.id, + device_driver_class_id=device_driver_class.id, + service_worker_id=self.service_worker_id + ) + if len(device_drivers) >= 1: + device_driver = device_drivers[0] + LOG.info('Device Driver %s is existing' % + device_driver) if __name__ == '__main__': diff --git a/namos/db/sqlalchemy/alembic/versions/48ebec3cd6f6_initial_version.py b/namos/db/sqlalchemy/alembic/versions/48ebec3cd6f6_initial_version.py index 83679a5..a9e5d39 100644 --- a/namos/db/sqlalchemy/alembic/versions/48ebec3cd6f6_initial_version.py +++ b/namos/db/sqlalchemy/alembic/versions/48ebec3cd6f6_initial_version.py @@ -34,7 +34,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('keystone_service_id', sa.Uuid(length=36), nullable=False), @@ -47,7 +47,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('python_class', sa.String(length=256), nullable=False), @@ -62,7 +62,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('keystone_region_id', sa.String(length=255), nullable=False), @@ -75,7 +75,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('status', sa.String(length=64), nullable=False), sa.Column('description', sa.Text(), nullable=True), @@ -94,7 +94,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('description', sa.Text(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), @@ -110,7 +110,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('device_id', sa.Uuid(length=36), nullable=True), sa.Column('connection', sa.Json(), nullable=False), @@ -124,7 +124,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('description', sa.Text(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), @@ -141,7 +141,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('endpoint_id', sa.Uuid(length=36), nullable=True), @@ -160,7 +160,7 @@ def upgrade(): sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('id', sa.Uuid(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, unique=True), sa.Column('deleted_at', sa.DateTime(), nullable=True), sa.Column('extra', sa.Json(), nullable=True), sa.Column('pid', sa.String(length=32), nullable=False), @@ -173,6 +173,7 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), mysql_engine='InnoDB' ) +# TODO(mrkanag) add oslo_config schema here def downgrade(): diff --git a/namos/db/sqlalchemy/api.py b/namos/db/sqlalchemy/api.py index a889576..d9a5ad8 100644 --- a/namos/db/sqlalchemy/api.py +++ b/namos/db/sqlalchemy/api.py @@ -16,6 +16,7 @@ import sys from oslo_config import cfg +from oslo_db import exception as db_exception from oslo_db.sqlalchemy import session as db_session from namos.common import exception @@ -57,7 +58,11 @@ def _session(context): def _create(context, resource_ref, values): resource_ref.update(values) - resource_ref.save(_session(context)) + try: + resource_ref.save(_session(context)) + except db_exception.DBDuplicateEntry: + raise exception.AlreadyExist(model=resource_ref.__class__.__name__, + name=resource_ref.name) return resource_ref diff --git a/namos/db/sqlalchemy/models.py b/namos/db/sqlalchemy/models.py index 0cae56b..7483dfe 100644 --- a/namos/db/sqlalchemy/models.py +++ b/namos/db/sqlalchemy/models.py @@ -18,6 +18,7 @@ SQLAlchemy models for namos database import sqlalchemy from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import UniqueConstraint import uuid from namos.db.sqlalchemy.types import Json @@ -37,7 +38,7 @@ class NamosBase(models.ModelBase, id = sqlalchemy.Column(Uuid, primary_key=True, default=lambda: str(uuid.uuid4())) name = sqlalchemy.Column(sqlalchemy.String(255), - # unique=True, + unique=True, nullable=False, default=lambda: str(uuid.uuid4())) @@ -128,6 +129,9 @@ class DeviceEndpoint(BASE, Extra): __tablename__ = 'device_endpoint' + __table_args__ = ( + UniqueConstraint("device_id", "type"), + ) device_id = sqlalchemy.Column( Uuid, sqlalchemy.ForeignKey('device.id'), @@ -145,6 +149,12 @@ class DeviceDriver(BASE, SoftDelete, Extra): __tablename__ = 'device_driver' + __table_args__ = ( + UniqueConstraint("device_id", + "endpoint_id", + "device_driver_class_id", + "service_worker_id"), + ) endpoint_id = sqlalchemy.Column( Uuid, @@ -179,7 +189,8 @@ class DeviceDriverClass(BASE, # TODO(kanagaraj-manickam) Correct the max python class path here python_class = sqlalchemy.Column( sqlalchemy.String(256), - nullable=False + nullable=False, + unique=True ) # service type like compute, network, volume, etc type = sqlalchemy.Column( @@ -225,6 +236,15 @@ class ServiceComponent(BASE, Extra): __tablename__ = 'service_component' + __table_args__ = ( + UniqueConstraint("name", "node_id", "service_id"), + ) + + name = sqlalchemy.Column(sqlalchemy.String(255), + # unique=True, + nullable=False, + default=lambda: str(uuid.uuid4())) + node_id = sqlalchemy.Column( Uuid, sqlalchemy.ForeignKey('service_node.id'), @@ -241,6 +261,15 @@ class ServiceWorker(BASE, Extra): __tablename__ = 'service_worker' + __table_args__ = ( + UniqueConstraint("host", "service_component_id"), + ) + + name = sqlalchemy.Column(sqlalchemy.String(255), + # unique=True, + nullable=False, + default=lambda: str(uuid.uuid4())) + pid = sqlalchemy.Column( sqlalchemy.String(32), nullable=False @@ -261,9 +290,18 @@ class OsloConfig(BASE, Extra): __tablename__ = 'oslo_config' + __table_args__ = ( + UniqueConstraint("name", "service_worker_id"), + ) + default_value = sqlalchemy.Column( sqlalchemy.Text ) + name = sqlalchemy.Column(sqlalchemy.String(255), + # unique=True, + nullable=False, + default=lambda: str(uuid.uuid4())) + help = sqlalchemy.Column( sqlalchemy.Text, nullable=False,