Initial work on async worker for quark
JIRA:NCP-1172 Created basic async service and test client - this is basically scaffolding and more will be added later - currently doesn't have tests but will on the next PR - intention is to add 'plugins' to support different types of async operations Created AsyncTransactions model and migration Created jobs endpoint and resource Created plugin_module for jobs Made job plugin_module admin-only for writes Allowed query by completed Created functional test for job plugin_module Fixed flake8 stuff Undid the neutron requirements Implements: blueprint quark-async-worker Change-Id: I88417f094e90b5410bce8ea5fed01043ed47aedd
This commit is contained in:
126
quark/api/extensions/jobs.py
Normal file
126
quark/api/extensions/jobs.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
# Copyright (c) 2016 Rackspace Hosting Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from neutron.api import extensions
|
||||||
|
from neutron import manager
|
||||||
|
from neutron import wsgi
|
||||||
|
from neutron_lib import exceptions as n_exc
|
||||||
|
from oslo_log import log as logging
|
||||||
|
import webob
|
||||||
|
|
||||||
|
RESOURCE_NAME = 'job'
|
||||||
|
RESOURCE_COLLECTION = RESOURCE_NAME + "s"
|
||||||
|
EXTENDED_ATTRIBUTES_2_0 = {
|
||||||
|
RESOURCE_COLLECTION: {
|
||||||
|
"completed": {"allow_post": False, "is_visible": True,
|
||||||
|
"default": False}}
|
||||||
|
}
|
||||||
|
|
||||||
|
attr_dict = EXTENDED_ATTRIBUTES_2_0[RESOURCE_COLLECTION]
|
||||||
|
attr_dict[RESOURCE_NAME] = {'allow_post': True,
|
||||||
|
'allow_put': True,
|
||||||
|
'is_visible': True}
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class JobsController(wsgi.Controller):
|
||||||
|
|
||||||
|
def __init__(self, plugin):
|
||||||
|
self._resource_name = RESOURCE_NAME
|
||||||
|
self._plugin = plugin
|
||||||
|
|
||||||
|
def index(self, request):
|
||||||
|
context = request.context
|
||||||
|
return {"jobs": self._plugin.get_jobs(context, **request.GET)}
|
||||||
|
|
||||||
|
def show(self, request, id):
|
||||||
|
context = request.context
|
||||||
|
try:
|
||||||
|
return {"job": self._plugin.get_job(context, id)}
|
||||||
|
except n_exc.NotFound as e:
|
||||||
|
raise webob.exc.HTTPNotFound(e)
|
||||||
|
|
||||||
|
def create(self, request, body=None):
|
||||||
|
context = request.context
|
||||||
|
body = self._deserialize(request.body, request.get_content_type())
|
||||||
|
try:
|
||||||
|
return {"job": self._plugin.create_job(context, body)}
|
||||||
|
except n_exc.NotFound as e:
|
||||||
|
raise webob.exc.HTTPNotFound(e)
|
||||||
|
except n_exc.Conflict as e:
|
||||||
|
raise webob.exc.HTTPConflict(e)
|
||||||
|
except n_exc.BadRequest as e:
|
||||||
|
raise webob.exc.HTTPBadRequest(e)
|
||||||
|
|
||||||
|
def update(self, request, id, body=None):
|
||||||
|
context = request.context
|
||||||
|
body = self._deserialize(request.body, request.get_content_type())
|
||||||
|
try:
|
||||||
|
return {"job": self._plugin.update_job(context, id, body)}
|
||||||
|
except n_exc.NotFound as e:
|
||||||
|
raise webob.exc.HTTPNotFound(e)
|
||||||
|
except n_exc.BadRequest as e:
|
||||||
|
raise webob.exc.HTTPBadRequest(e)
|
||||||
|
|
||||||
|
def delete(self, request, id):
|
||||||
|
context = request.context
|
||||||
|
try:
|
||||||
|
return self._plugin.delete_job(context, id)
|
||||||
|
except n_exc.NotFound as e:
|
||||||
|
raise webob.exc.HTTPNotFound(e)
|
||||||
|
except n_exc.BadRequest as e:
|
||||||
|
raise webob.exc.HTTPBadRequest(e)
|
||||||
|
|
||||||
|
|
||||||
|
class Jobs(extensions.ExtensionDescriptor):
|
||||||
|
"""Jobs support."""
|
||||||
|
@classmethod
|
||||||
|
def get_name(cls):
|
||||||
|
return "Asyncronous jobs for a tenant"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_alias(cls):
|
||||||
|
return RESOURCE_COLLECTION
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_description(cls):
|
||||||
|
return "Provide a way to track asyncronous jobs"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_namespace(cls):
|
||||||
|
return ("http://docs.openstack.org/network/ext/"
|
||||||
|
"ip_addresses/api/v2.0")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_updated(cls):
|
||||||
|
return "2016-05-15T10:00:00-00:00"
|
||||||
|
|
||||||
|
def get_extended_resources(self, version):
|
||||||
|
if version == "2.0":
|
||||||
|
return EXTENDED_ATTRIBUTES_2_0
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_resources(cls):
|
||||||
|
"""Returns Ext Resources."""
|
||||||
|
job_controller = JobsController(
|
||||||
|
manager.NeutronManager.get_plugin())
|
||||||
|
resources = []
|
||||||
|
resources.append(extensions.ResourceExtension(
|
||||||
|
Jobs.get_alias(),
|
||||||
|
job_controller))
|
||||||
|
return resources
|
||||||
@@ -92,7 +92,7 @@ def _model_query(context, model, filters, fields=None):
|
|||||||
model_filters = []
|
model_filters = []
|
||||||
eq_filters = ["address", "cidr", "deallocated", "ip_version", "service",
|
eq_filters = ["address", "cidr", "deallocated", "ip_version", "service",
|
||||||
"mac_address_range_id", "transaction_id", "lock_id",
|
"mac_address_range_id", "transaction_id", "lock_id",
|
||||||
"address_type"]
|
"address_type", "completed"]
|
||||||
in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address",
|
in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address",
|
||||||
"name", "network_id", "segment_id", "subnet_id",
|
"name", "network_id", "segment_id", "subnet_id",
|
||||||
"used_by_tenant_id", "version"]
|
"used_by_tenant_id", "version"]
|
||||||
@@ -1148,3 +1148,33 @@ def segment_allocation_range_create(context, **sa_range_dict):
|
|||||||
|
|
||||||
def segment_allocation_range_delete(context, sa_range):
|
def segment_allocation_range_delete(context, sa_range):
|
||||||
context.session.delete(sa_range)
|
context.session.delete(sa_range)
|
||||||
|
|
||||||
|
|
||||||
|
@scoped
|
||||||
|
def async_transaction_find(context, lock_mode=False, **filters):
|
||||||
|
query = context.session.query(models.AsyncTransactions)
|
||||||
|
if lock_mode:
|
||||||
|
query = query.with_lockmode("update")
|
||||||
|
|
||||||
|
model_filters = _model_query(
|
||||||
|
context, models.AsyncTransactions, filters)
|
||||||
|
|
||||||
|
query = query.filter(*model_filters)
|
||||||
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
def async_transaction_create(context, **transaction_dict):
|
||||||
|
tx = models.AsyncTransactions()
|
||||||
|
tx.update(transaction_dict)
|
||||||
|
context.session.add(tx)
|
||||||
|
return tx
|
||||||
|
|
||||||
|
|
||||||
|
def async_transaction_update(context, transaction, **kwargs):
|
||||||
|
transaction.update(kwargs)
|
||||||
|
context.session.add(transaction)
|
||||||
|
return transaction
|
||||||
|
|
||||||
|
|
||||||
|
def async_transaction_delete(context, transaction):
|
||||||
|
context.session.delete(transaction)
|
||||||
|
|||||||
@@ -0,0 +1,32 @@
|
|||||||
|
"""Add async transactions table
|
||||||
|
|
||||||
|
Revision ID: 271cce54e15b
|
||||||
|
Revises: 2a116b962c95
|
||||||
|
Create Date: 2016-06-15 09:24:29.941684
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '271cce54e15b'
|
||||||
|
down_revision = '2a116b962c95'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.create_table('quark_async_transactions',
|
||||||
|
sa.Column('created_at', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('id', sa.String(36), nullable=False),
|
||||||
|
sa.Column('tenant_id', sa.String(255), nullable=False),
|
||||||
|
sa.Column('action', sa.String(255), nullable=False),
|
||||||
|
sa.Column('completed', sa.Boolean()),
|
||||||
|
sa.PrimaryKeyConstraint('id'),
|
||||||
|
mysql_engine='InnoDB')
|
||||||
|
op.create_index(op.f('ix_quark_async_transactions_tenant_id'),
|
||||||
|
'quark_async_transactions', ['tenant_id'],
|
||||||
|
unique=False)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.drop_table('quark_async_transactions')
|
||||||
@@ -589,3 +589,10 @@ class SegmentAllocationRange(BASEV2, models.HasId):
|
|||||||
last_id = sa.Column(sa.BigInteger(), nullable=False)
|
last_id = sa.Column(sa.BigInteger(), nullable=False)
|
||||||
|
|
||||||
do_not_use = sa.Column(sa.Boolean(), default=False, nullable=False)
|
do_not_use = sa.Column(sa.Boolean(), default=False, nullable=False)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncTransactions(BASEV2, models.HasId):
|
||||||
|
__tablename__ = "quark_async_transactions"
|
||||||
|
tenant_id = sa.Column(sa.String(255), index=True)
|
||||||
|
action = sa.Column(sa.String(255))
|
||||||
|
completed = sa.Column(sa.Boolean(), default=False)
|
||||||
|
|||||||
@@ -244,3 +244,7 @@ class CannotAddMoreIPsToPort(n_exc.OverQuota):
|
|||||||
|
|
||||||
class CannotCreateMoreSharedIPs(n_exc.OverQuota):
|
class CannotCreateMoreSharedIPs(n_exc.OverQuota):
|
||||||
message = _("Cannot create more shared IPs on selected network")
|
message = _("Cannot create more shared IPs on selected network")
|
||||||
|
|
||||||
|
|
||||||
|
class JobNotFound(n_exc.NotFound):
|
||||||
|
message = _("Job %(job_id)s not found")
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ from quark import ip_availability
|
|||||||
from quark.plugin_modules import floating_ips
|
from quark.plugin_modules import floating_ips
|
||||||
from quark.plugin_modules import ip_addresses
|
from quark.plugin_modules import ip_addresses
|
||||||
from quark.plugin_modules import ip_policies
|
from quark.plugin_modules import ip_policies
|
||||||
|
from quark.plugin_modules import jobs
|
||||||
from quark.plugin_modules import mac_address_ranges
|
from quark.plugin_modules import mac_address_ranges
|
||||||
from quark.plugin_modules import networks
|
from quark.plugin_modules import networks
|
||||||
from quark.plugin_modules import ports
|
from quark.plugin_modules import ports
|
||||||
@@ -131,7 +132,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
|||||||
"networks_quark", "router",
|
"networks_quark", "router",
|
||||||
"ip_availabilities", "ports_quark",
|
"ip_availabilities", "ports_quark",
|
||||||
"floatingip", "segment_allocation_ranges",
|
"floatingip", "segment_allocation_ranges",
|
||||||
"scalingip"]
|
"scalingip", "jobs"]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
LOG.info("Starting quark plugin")
|
LOG.info("Starting quark plugin")
|
||||||
@@ -499,3 +500,25 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
|
|||||||
fields=fields, sorts=sorts,
|
fields=fields, sorts=sorts,
|
||||||
limit=limit, marker=marker,
|
limit=limit, marker=marker,
|
||||||
page_reverse=page_reverse)
|
page_reverse=page_reverse)
|
||||||
|
|
||||||
|
@sessioned
|
||||||
|
def get_jobs(self, context, **filters):
|
||||||
|
return jobs.get_jobs(context, **filters)
|
||||||
|
|
||||||
|
@sessioned
|
||||||
|
def get_job(self, context, id):
|
||||||
|
return jobs.get_job(context, id)
|
||||||
|
|
||||||
|
@sessioned
|
||||||
|
def create_job(self, context, job):
|
||||||
|
self._fix_missing_tenant_id(context, job['job'])
|
||||||
|
return jobs.create_job(context, job)
|
||||||
|
|
||||||
|
@sessioned
|
||||||
|
def update_job(self, context, id, job):
|
||||||
|
self._fix_missing_tenant_id(context, job['job'])
|
||||||
|
return jobs.update_job(context, id, job)
|
||||||
|
|
||||||
|
@sessioned
|
||||||
|
def delete_job(self, context, id):
|
||||||
|
return jobs.delete_job(context, id)
|
||||||
|
|||||||
89
quark/plugin_modules/jobs.py
Normal file
89
quark/plugin_modules/jobs.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
# Copyright 2016 Rackspace Hosting Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from neutron_lib import exceptions as n_exc
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from quark.db import api as db_api
|
||||||
|
from quark import exceptions as q_exc
|
||||||
|
from quark import plugin_views as v
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_jobs(context, **filters):
|
||||||
|
LOG.info("get_jobs for tenant %s" % context.tenant_id)
|
||||||
|
if not filters:
|
||||||
|
filters = {}
|
||||||
|
jobs = db_api.async_transaction_find(context, scope=db_api.ALL, **filters)
|
||||||
|
return [v._make_job_dict(ip) for ip in jobs]
|
||||||
|
|
||||||
|
|
||||||
|
def get_job(context, id):
|
||||||
|
LOG.info("get_job %s for tenant %s" % (id, context.tenant_id))
|
||||||
|
filters = {}
|
||||||
|
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE,
|
||||||
|
**filters)
|
||||||
|
if not job:
|
||||||
|
raise q_exc.JobNotFound(job_id=id)
|
||||||
|
return v._make_job_dict(job)
|
||||||
|
|
||||||
|
|
||||||
|
def create_job(context, body):
|
||||||
|
LOG.info("create_job for tenant %s" % context.tenant_id)
|
||||||
|
|
||||||
|
if not context.is_admin:
|
||||||
|
raise n_exc.NotAuthorized()
|
||||||
|
job = body.get('job')
|
||||||
|
if not job:
|
||||||
|
raise n_exc.BadRequest(resource="job", msg="Invalid request body.")
|
||||||
|
with context.session.begin():
|
||||||
|
new_job = db_api.async_transaction_create(context, **job)
|
||||||
|
return v._make_job_dict(new_job)
|
||||||
|
|
||||||
|
|
||||||
|
def update_job(context, id, body):
|
||||||
|
LOG.info("update_job %s for tenant %s" % (id, context.tenant_id))
|
||||||
|
|
||||||
|
if not context.is_admin:
|
||||||
|
raise n_exc.NotAuthorized()
|
||||||
|
job_update = body.get('job')
|
||||||
|
if not job_update:
|
||||||
|
raise n_exc.BadRequest(resource="job", msg="Invalid request body.")
|
||||||
|
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE)
|
||||||
|
if not job:
|
||||||
|
raise q_exc.JobNotFound(job_id=id)
|
||||||
|
job_mod = db_api.async_transaction_update(context, job, **job_update)
|
||||||
|
return v._make_job_dict(job_mod)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_job(context, id, **filters):
|
||||||
|
"""Delete an ip address.
|
||||||
|
|
||||||
|
: param context: neutron api request context
|
||||||
|
: param id: UUID representing the ip address to delete.
|
||||||
|
"""
|
||||||
|
LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id))
|
||||||
|
|
||||||
|
if not context.is_admin:
|
||||||
|
raise n_exc.NotAuthorized()
|
||||||
|
with context.session.begin():
|
||||||
|
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE,
|
||||||
|
**filters)
|
||||||
|
if not job:
|
||||||
|
raise q_exc.JobNotFound(job_id=id)
|
||||||
|
db_api.async_transaction_delete(context, job)
|
||||||
@@ -355,3 +355,11 @@ def _make_scaling_ip_dict(flip):
|
|||||||
"tenant_id": flip.get("used_by_tenant_id"),
|
"tenant_id": flip.get("used_by_tenant_id"),
|
||||||
"status": flip.get("status"),
|
"status": flip.get("status"),
|
||||||
"ports": ports}
|
"ports": ports}
|
||||||
|
|
||||||
|
|
||||||
|
def _make_job_dict(job):
|
||||||
|
return {"id": job.get('id'),
|
||||||
|
"action": job.get('action'),
|
||||||
|
"completed": job.get('completed'),
|
||||||
|
"tenant_id": job.get('tenant_id'),
|
||||||
|
"created_at": job.get('created_at')}
|
||||||
|
|||||||
207
quark/tests/functional/plugin_modules/test_jobs.py
Normal file
207
quark/tests/functional/plugin_modules/test_jobs.py
Normal file
@@ -0,0 +1,207 @@
|
|||||||
|
# Copyright 2016 Rackspace Hosting Inc
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for# the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from neutron_lib import exceptions as n_exc
|
||||||
|
|
||||||
|
from quark import exceptions as q_exc
|
||||||
|
from quark.plugin_modules import jobs as job_api
|
||||||
|
from quark.tests.functional.base import BaseFunctionalTest
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class QuarkJobs(BaseFunctionalTest):
|
||||||
|
def setUp(self):
|
||||||
|
super(QuarkJobs, self).setUp()
|
||||||
|
self.action = "test action"
|
||||||
|
self.tenant_id = "test_tenant"
|
||||||
|
self.tenant_id2 = "test_tenant2"
|
||||||
|
|
||||||
|
def test_create_job(self):
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job)
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
self.assertEqual(self.tenant_id, job['tenant_id'])
|
||||||
|
self.assertEqual(self.action, job['action'])
|
||||||
|
|
||||||
|
def test_create_job_fail_non_admin(self):
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
with self.assertRaises(n_exc.NotAuthorized):
|
||||||
|
job_api.create_job(self.context, job_body)
|
||||||
|
|
||||||
|
def test_get_jobs(self):
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id2, action=self.action,
|
||||||
|
completed=True)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job2 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job2)
|
||||||
|
|
||||||
|
jobs = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertFalse(type(jobs) in [list, tuple])
|
||||||
|
job = jobs
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
self.assertEqual(self.tenant_id, job['tenant_id'])
|
||||||
|
self.assertEqual(self.action, job['action'])
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job2['id'])
|
||||||
|
self.assertTrue(job['completed'])
|
||||||
|
self.assertEqual(self.tenant_id2, job['tenant_id'])
|
||||||
|
self.assertEqual(self.action, job['action'])
|
||||||
|
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.get_job(self.admin_context, 'derp')
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context)
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(2, len(jobs))
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context, completed=True)
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(1, len(jobs))
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context, completed=False)
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(1, len(jobs))
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context, completed='hello')
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(0, len(jobs))
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context, tenant_id=self.tenant_id)
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(1, len(jobs))
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.admin_context, tenant_id='derp')
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
self.assertEqual(0, len(jobs))
|
||||||
|
|
||||||
|
def test_get_job_different_non_admin(self):
|
||||||
|
job_body = dict(tenant_id=self.context.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id2, action=self.action,
|
||||||
|
completed=True)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job2 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job2)
|
||||||
|
|
||||||
|
jobs = job_api.get_jobs(self.context)
|
||||||
|
self.assertTrue(type(jobs) in [list, tuple])
|
||||||
|
|
||||||
|
self.assertEqual(1, len(jobs))
|
||||||
|
self.assertEqual(self.context.tenant_id, jobs[0]['tenant_id'])
|
||||||
|
|
||||||
|
def test_update_jobs(self):
|
||||||
|
update_body = dict(completed=True)
|
||||||
|
update_body = dict(job=update_body)
|
||||||
|
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.update_job(self.admin_context, 'derp', update_body)
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
|
||||||
|
updated_job = job_api.update_job(self.admin_context, job1['id'],
|
||||||
|
update_body)
|
||||||
|
self.assertTrue(updated_job['completed'])
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertTrue(job['completed'])
|
||||||
|
|
||||||
|
def test_update_job_fail_non_admin(self):
|
||||||
|
update_body = dict(completed=True)
|
||||||
|
update_body = dict(job=update_body)
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
|
||||||
|
with self.assertRaises(n_exc.NotAuthorized):
|
||||||
|
job_api.update_job(self.context, job1['id'], update_body)
|
||||||
|
|
||||||
|
updated_job = job_api.update_job(self.admin_context, job1['id'],
|
||||||
|
update_body)
|
||||||
|
self.assertTrue(updated_job['completed'])
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertTrue(job['completed'])
|
||||||
|
|
||||||
|
def test_delete_jobs(self):
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.delete_job(self.admin_context, 'derp')
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
|
||||||
|
job_api.delete_job(self.admin_context, job1['id'])
|
||||||
|
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.delete_job(self.admin_context, job1['id'])
|
||||||
|
|
||||||
|
def test_delete_job_fail_non_admin(self):
|
||||||
|
with self.assertRaises(n_exc.NotAuthorized):
|
||||||
|
job_api.delete_job(self.context, 'derp')
|
||||||
|
|
||||||
|
job_body = dict(tenant_id=self.tenant_id, action=self.action,
|
||||||
|
completed=False)
|
||||||
|
job_body = dict(job=job_body)
|
||||||
|
job1 = job_api.create_job(self.admin_context, job_body)
|
||||||
|
self.assertIsNotNone(job1)
|
||||||
|
|
||||||
|
job = job_api.get_job(self.admin_context, job1['id'])
|
||||||
|
self.assertFalse(job['completed'])
|
||||||
|
|
||||||
|
with self.assertRaises(n_exc.NotAuthorized):
|
||||||
|
job_api.delete_job(self.context, job1['id'])
|
||||||
|
|
||||||
|
job_api.delete_job(self.admin_context, job1['id'])
|
||||||
|
|
||||||
|
with self.assertRaises(q_exc.JobNotFound):
|
||||||
|
job_api.get_job(self.context, job1['id'])
|
||||||
149
quark/tools/async_worker.py
Normal file
149
quark/tools/async_worker.py
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
import eventlet
|
||||||
|
eventlet.monkey_patch(socket=True, select=True, time=True)
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
from oslo_service import service as common_service
|
||||||
|
from oslo_utils import excutils
|
||||||
|
|
||||||
|
from neutron._i18n import _
|
||||||
|
from neutron._i18n import _LE
|
||||||
|
from neutron.common import config
|
||||||
|
from neutron.common import rpc as n_rpc
|
||||||
|
from neutron.db import api as session
|
||||||
|
from neutron import service
|
||||||
|
|
||||||
|
service_opts = [
|
||||||
|
cfg.StrOpt('topic',
|
||||||
|
help=_('Topic for messaging to pub/sub to')),
|
||||||
|
cfg.StrOpt('transport_url',
|
||||||
|
help=_('Connection string for transport service')),
|
||||||
|
cfg.IntOpt('periodic_interval',
|
||||||
|
default=40,
|
||||||
|
help=_('Seconds between running periodic tasks')),
|
||||||
|
cfg.IntOpt('rpc_workers',
|
||||||
|
default=1,
|
||||||
|
help=_('Number of RPC worker processes for service')),
|
||||||
|
cfg.IntOpt('periodic_fuzzy_delay',
|
||||||
|
default=5,
|
||||||
|
help=_('Range of seconds to randomly delay when starting the '
|
||||||
|
'periodic task scheduler to reduce stampeding. '
|
||||||
|
'(Disable by setting to 0)')),
|
||||||
|
]
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(service_opts, "QUARK_ASYNC")
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class QuarkRpcTestCallback(object):
|
||||||
|
target = messaging.Target(version='1.0', namespace=None)
|
||||||
|
|
||||||
|
def stuff(self, context, **kwargs):
|
||||||
|
return {"status": "okay"}
|
||||||
|
|
||||||
|
|
||||||
|
class QuarkAsyncPlugin(object):
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _setup_rpc(self):
|
||||||
|
self.endpoints = [QuarkRpcTestCallback()]
|
||||||
|
|
||||||
|
def start_rpc_listeners(self):
|
||||||
|
"""Configure all listeners here"""
|
||||||
|
self._setup_rpc()
|
||||||
|
self.conn = n_rpc.create_connection()
|
||||||
|
self.conn.create_consumer(CONF.QUARK_ASYNC.topic, self.endpoints,
|
||||||
|
fanout=False)
|
||||||
|
return self.conn.consume_in_threads()
|
||||||
|
|
||||||
|
|
||||||
|
def serve_rpc():
|
||||||
|
|
||||||
|
if cfg.CONF.QUARK_ASYNC.rpc_workers < 1:
|
||||||
|
cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC")
|
||||||
|
|
||||||
|
try:
|
||||||
|
plugins = [QuarkAsyncPlugin()]
|
||||||
|
rpc = service.RpcWorker(plugins)
|
||||||
|
session.dispose() # probaby not needed, but maybe
|
||||||
|
launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0)
|
||||||
|
launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers)
|
||||||
|
|
||||||
|
return launcher
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
LOG.exception(_LE('Unrecoverable error: please check log for '
|
||||||
|
'details.'))
|
||||||
|
|
||||||
|
|
||||||
|
def start_api_and_rpc_workers():
|
||||||
|
pool = eventlet.GreenPool()
|
||||||
|
|
||||||
|
quark_rpc = serve_rpc()
|
||||||
|
pool.spawn(quark_rpc.wait)
|
||||||
|
|
||||||
|
pool.waitall()
|
||||||
|
|
||||||
|
|
||||||
|
def boot_server(server_func):
|
||||||
|
# the configuration will be read into the cfg.CONF global data structure
|
||||||
|
config.init(sys.argv[1:])
|
||||||
|
config.setup_logging()
|
||||||
|
config.set_config_defaults()
|
||||||
|
if not cfg.CONF.config_file:
|
||||||
|
sys.exit(_("ERROR: Unable to find configuration file via the default"
|
||||||
|
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
|
||||||
|
" the '--config-file' option!"))
|
||||||
|
try:
|
||||||
|
server_func()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
except RuntimeError as e:
|
||||||
|
sys.exit(_("ERROR: %s") % e)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
boot_server(start_api_and_rpc_workers)
|
||||||
|
|
||||||
|
|
||||||
|
class QuarkRpcTestApi(object):
|
||||||
|
"""This class is used for testing QuarkRpcTestCallback."""
|
||||||
|
def __init__(self):
|
||||||
|
target = messaging.Target(topic=CONF.QUARK_ASYNC.topic)
|
||||||
|
self.client = n_rpc.get_client(target)
|
||||||
|
|
||||||
|
def stuff(self, context):
|
||||||
|
cctxt = self.client.prepare(version='1.0')
|
||||||
|
return cctxt.call(context, 'stuff')
|
||||||
|
|
||||||
|
|
||||||
|
class QuarkAsyncTestContext(object):
|
||||||
|
"""This class is used for testing QuarkRpcTestCallback."""
|
||||||
|
def __init__(self):
|
||||||
|
self.time = time.ctime()
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
return {"application": "rpc-client", "time": time.ctime()}
|
||||||
|
|
||||||
|
|
||||||
|
def test_main():
|
||||||
|
config.init(sys.argv[1:])
|
||||||
|
config.setup_logging()
|
||||||
|
config.set_config_defaults()
|
||||||
|
if not cfg.CONF.config_file:
|
||||||
|
sys.exit(_("ERROR: Unable to find configuration file via the default"
|
||||||
|
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
|
||||||
|
" the '--config-file' option!"))
|
||||||
|
context = QuarkAsyncTestContext() # typically context is neutron context
|
||||||
|
client = QuarkRpcTestApi()
|
||||||
|
LOG.info(client.stuff(context))
|
||||||
|
time.sleep(0) # necessary for preventing Timeout exceptions
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -21,6 +21,8 @@ console_scripts =
|
|||||||
quark-db-manage = quark.db.migration.alembic.cli:main
|
quark-db-manage = quark.db.migration.alembic.cli:main
|
||||||
gunicorn-neutron-server = quark.gunicorn_server:main
|
gunicorn-neutron-server = quark.gunicorn_server:main
|
||||||
quark-agent = quark.agent.agent:main
|
quark-agent = quark.agent.agent:main
|
||||||
|
quark-async-worker = quark.tools.async_worker:main
|
||||||
|
quark-async-tester = quark.tools.async_worker:test_main
|
||||||
ip_availability = quark.ip_availability:main
|
ip_availability = quark.ip_availability:main
|
||||||
redis_sg_tool = quark.tools.redis_sg_tool:main
|
redis_sg_tool = quark.tools.redis_sg_tool:main
|
||||||
null_routes = quark.tools.null_routes:main
|
null_routes = quark.tools.null_routes:main
|
||||||
|
|||||||
Reference in New Issue
Block a user