From 76f87d59ec7f3f11227f74ee19153a8e103a7325 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Wed, 20 Dec 2017 16:32:27 +1300 Subject: [PATCH] Fix job handler Add unit test as well. Change-Id: I053f02ae1ad74eb83427b2c4496c5a5d744418d2 --- qinling/api/controllers/v1/job.py | 2 +- qinling/api/controllers/v1/resources.py | 2 +- qinling/db/api.py | 8 ++- qinling/db/sqlalchemy/api.py | 2 +- .../alembic_migrations/versions/001_pike.py | 2 +- qinling/db/sqlalchemy/models.py | 2 +- qinling/services/periodics.py | 8 +++ qinling/tests/unit/api/base.py | 5 +- qinling/tests/unit/services/test_periodics.py | 54 ++++++++++++++++++- qinling/utils/executions.py | 2 + .../tests/api/test_executions.py | 1 - 11 files changed, 74 insertions(+), 14 deletions(-) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index 1d1695b3..4fe1ebd0 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -68,7 +68,7 @@ class JobsController(rest.RestController): 'next_execution_time': next_time, 'count': count, 'function_id': params['function_id'], - 'function_input': params.get('function_input') or {}, + 'function_input': params.get('function_input'), 'status': status.RUNNING } db_job = db_api.create_job(values) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 9d5368ed..2eec1025 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -342,7 +342,7 @@ class Job(Resource): id = types.uuid name = wtypes.text function_id = types.uuid - function_input = types.jsontype + function_input = wtypes.text status = wtypes.text pattern = wtypes.text count = int diff --git a/qinling/db/api.py b/qinling/db/api.py index 7516972d..6c78dffd 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -66,8 +66,12 @@ def conditional_update(model, values, expected_values, **kwargs): return IMPL.conditional_update(model, values, expected_values, **kwargs) -def get_function(id): - return IMPL.get_function(id) +def get_function(id, insecure=None): + """Get function from db. + + 'insecure' param is needed for job handler. + """ + return IMPL.get_function(id, insecure=insecure) def get_functions(limit=None, marker=None, sort_keys=None, diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index c3ae8b54..efd602fe 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -219,7 +219,7 @@ def conditional_update(model, values, expected_values, insecure=False, @db_base.insecure_aware() @db_base.session_aware() -def get_function(id, insecure=False, session=None): +def get_function(id, insecure=None, session=None): function = _get_db_object_by_id(models.Function, id, insecure=insecure) if not function: diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 3642c58d..8b8f73ad 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -103,7 +103,7 @@ def upgrade(): sa.Column('project_id', sa.String(length=80), nullable=False), sa.Column('id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False), - sa.Column('function_input', st.JsonLongDictType(), nullable=True), + sa.Column('function_input', sa.String(length=255), nullable=True), sa.Column('status', sa.String(length=32), nullable=False), sa.Column('name', sa.String(length=255), nullable=True), sa.Column('pattern', sa.String(length=32), nullable=True), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 9c536c32..caf4e089 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -76,7 +76,7 @@ class Job(model_base.QinlingSecureModelBase): sa.ForeignKey(Function.id) ) function = relationship('Function', back_populates="jobs") - function_input = sa.Column(st.JsonDictType()) + function_input = sa.Column(sa.String(255), nullable=True) def to_dict(self): d = super(Job, self).to_dict() diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 28e1b493..52de95d5 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -148,6 +148,10 @@ def handle_job(engine_client): def start_function_mapping_handler(engine): + """Start function mapping handler thread. + + Function mapping handler is supposed to be running with engine service. + """ tg = threadgroup.ThreadGroup(1) tg.add_timer( 300, @@ -161,6 +165,10 @@ def start_function_mapping_handler(engine): def start_job_handler(): + """Start job handler thread. + + Job handler is supposed to be running with api service. + """ tg = threadgroup.ThreadGroup(1) engine_client = rpc.get_engine_client() diff --git a/qinling/tests/unit/api/base.py b/qinling/tests/unit/api/base.py index 1b18fc51..077cc737 100644 --- a/qinling/tests/unit/api/base.py +++ b/qinling/tests/unit/api/base.py @@ -36,10 +36,7 @@ class APITest(base.DbTestCase): self.addCleanup(shutil.rmtree, package_dir, True) # Disable authentication by default for API tests. - CONF.set_default('auth_enable', False, group='pecan') - self.addCleanup( - CONF.set_default, 'auth_enable', False, group='pecan' - ) + self.override_config('auth_enable', False, group='pecan') pecan_opts = CONF.pecan self.app = pecan.testing.load_test_app({ diff --git a/qinling/tests/unit/services/test_periodics.py b/qinling/tests/unit/services/test_periodics.py index 7b896c54..cdb909aa 100644 --- a/qinling/tests/unit/services/test_periodics.py +++ b/qinling/tests/unit/services/test_periodics.py @@ -11,14 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime +from datetime import timedelta import time import mock from oslo_config import cfg +from qinling import context from qinling.db import api as db_api from qinling.engine import default_engine from qinling.services import periodics +from qinling import status from qinling.tests.unit import base CONF = cfg.CONF @@ -27,10 +31,14 @@ CONF = cfg.CONF class TestPeriodics(base.DbTestCase): TEST_CASE_NAME = 'TestPeriodics' + def setUp(self): + super(TestPeriodics, self).setUp() + self.override_config('auth_enable', False, group='pecan') + @mock.patch('qinling.utils.etcd_util.delete_function') @mock.patch('qinling.utils.etcd_util.get_service_url') - def test_function_service_expiration(self, mock_etcd_url, - mock_etcd_delete): + def test_function_service_expiration_handler(self, mock_etcd_url, + mock_etcd_delete): db_func = self.create_function( runtime_id=None, prefix=self.TEST_CASE_NAME ) @@ -49,3 +57,45 @@ class TestPeriodics(base.DbTestCase): args, kwargs = mock_k8s.delete_function.call_args self.assertIn(function_id, args) mock_etcd_delete.assert_called_once_with(function_id) + + @mock.patch('qinling.utils.jobs.get_next_execution_time') + def test_job_handler(self, mock_get_next): + db_func = self.create_function( + runtime_id=None, prefix=self.TEST_CASE_NAME + ) + function_id = db_func.id + + self.assertEqual(0, db_func.count) + + now = datetime.utcnow() + db_job = self.create_job( + function_id, + self.TEST_CASE_NAME, + status=status.RUNNING, + next_execution_time=now, + count=2 + ) + job_id = db_job.id + + e_client = mock.Mock() + mock_get_next.return_value = now + timedelta(seconds=1) + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + db_job = db_api.get_job(job_id) + self.assertEqual(1, db_job.count) + db_func = db_api.get_function(function_id) + self.assertEqual(1, db_func.count) + db_execs = db_api.get_executions(function_id=function_id) + self.assertEqual(1, len(db_execs)) + + periodics.handle_job(e_client) + context.set_ctx(self.ctx) + + db_job = db_api.get_job(job_id) + self.assertEqual(0, db_job.count) + self.assertEqual(status.DONE, db_job.status) + db_func = db_api.get_function(function_id) + self.assertEqual(2, db_func.count) + db_execs = db_api.get_executions(function_id=function_id) + self.assertEqual(2, len(db_execs)) diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py index 72b76e15..01d30252 100644 --- a/qinling/utils/executions.py +++ b/qinling/utils/executions.py @@ -62,6 +62,8 @@ def create_execution(engine_client, params): function_id = params['function_id'] is_sync = params.get('sync', True) input = params.get('input') + + # input in params should be a string. if input: try: params['input'] = json.loads(input) diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index cf55323b..aaa6d10c 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -270,7 +270,6 @@ class ExecutionsTest(base.BaseQinlingTest): @decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab') def test_python_execution_process_number(self): self._create_function(name='test_python_process_limit.py') - resp, body = self.client.create_execution(self.function_id) self.assertEqual(201, resp.status)