Fix job handler

Add unit test as well.

Change-Id: I053f02ae1ad74eb83427b2c4496c5a5d744418d2
This commit is contained in:
Lingxian Kong 2017-12-20 16:32:27 +13:00
parent 789bed3c85
commit 76f87d59ec
11 changed files with 74 additions and 14 deletions

View File

@ -68,7 +68,7 @@ class JobsController(rest.RestController):
'next_execution_time': next_time,
'count': count,
'function_id': params['function_id'],
'function_input': params.get('function_input') or {},
'function_input': params.get('function_input'),
'status': status.RUNNING
}
db_job = db_api.create_job(values)

View File

@ -342,7 +342,7 @@ class Job(Resource):
id = types.uuid
name = wtypes.text
function_id = types.uuid
function_input = types.jsontype
function_input = wtypes.text
status = wtypes.text
pattern = wtypes.text
count = int

View File

@ -66,8 +66,12 @@ def conditional_update(model, values, expected_values, **kwargs):
return IMPL.conditional_update(model, values, expected_values, **kwargs)
def get_function(id):
return IMPL.get_function(id)
def get_function(id, insecure=None):
"""Get function from db.
'insecure' param is needed for job handler.
"""
return IMPL.get_function(id, insecure=insecure)
def get_functions(limit=None, marker=None, sort_keys=None,

View File

@ -219,7 +219,7 @@ def conditional_update(model, values, expected_values, insecure=False,
@db_base.insecure_aware()
@db_base.session_aware()
def get_function(id, insecure=False, session=None):
def get_function(id, insecure=None, session=None):
function = _get_db_object_by_id(models.Function, id, insecure=insecure)
if not function:

View File

@ -103,7 +103,7 @@ def upgrade():
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('function_input', st.JsonLongDictType(), nullable=True),
sa.Column('function_input', sa.String(length=255), nullable=True),
sa.Column('status', sa.String(length=32), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('pattern', sa.String(length=32), nullable=True),

View File

@ -76,7 +76,7 @@ class Job(model_base.QinlingSecureModelBase):
sa.ForeignKey(Function.id)
)
function = relationship('Function', back_populates="jobs")
function_input = sa.Column(st.JsonDictType())
function_input = sa.Column(sa.String(255), nullable=True)
def to_dict(self):
d = super(Job, self).to_dict()

View File

@ -148,6 +148,10 @@ def handle_job(engine_client):
def start_function_mapping_handler(engine):
"""Start function mapping handler thread.
Function mapping handler is supposed to be running with engine service.
"""
tg = threadgroup.ThreadGroup(1)
tg.add_timer(
300,
@ -161,6 +165,10 @@ def start_function_mapping_handler(engine):
def start_job_handler():
"""Start job handler thread.
Job handler is supposed to be running with api service.
"""
tg = threadgroup.ThreadGroup(1)
engine_client = rpc.get_engine_client()

View File

@ -36,10 +36,7 @@ class APITest(base.DbTestCase):
self.addCleanup(shutil.rmtree, package_dir, True)
# Disable authentication by default for API tests.
CONF.set_default('auth_enable', False, group='pecan')
self.addCleanup(
CONF.set_default, 'auth_enable', False, group='pecan'
)
self.override_config('auth_enable', False, group='pecan')
pecan_opts = CONF.pecan
self.app = pecan.testing.load_test_app({

View File

@ -11,14 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta
import time
import mock
from oslo_config import cfg
from qinling import context
from qinling.db import api as db_api
from qinling.engine import default_engine
from qinling.services import periodics
from qinling import status
from qinling.tests.unit import base
CONF = cfg.CONF
@ -27,10 +31,14 @@ CONF = cfg.CONF
class TestPeriodics(base.DbTestCase):
TEST_CASE_NAME = 'TestPeriodics'
def setUp(self):
super(TestPeriodics, self).setUp()
self.override_config('auth_enable', False, group='pecan')
@mock.patch('qinling.utils.etcd_util.delete_function')
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_function_service_expiration(self, mock_etcd_url,
mock_etcd_delete):
def test_function_service_expiration_handler(self, mock_etcd_url,
mock_etcd_delete):
db_func = self.create_function(
runtime_id=None, prefix=self.TEST_CASE_NAME
)
@ -49,3 +57,45 @@ class TestPeriodics(base.DbTestCase):
args, kwargs = mock_k8s.delete_function.call_args
self.assertIn(function_id, args)
mock_etcd_delete.assert_called_once_with(function_id)
@mock.patch('qinling.utils.jobs.get_next_execution_time')
def test_job_handler(self, mock_get_next):
db_func = self.create_function(
runtime_id=None, prefix=self.TEST_CASE_NAME
)
function_id = db_func.id
self.assertEqual(0, db_func.count)
now = datetime.utcnow()
db_job = self.create_job(
function_id,
self.TEST_CASE_NAME,
status=status.RUNNING,
next_execution_time=now,
count=2
)
job_id = db_job.id
e_client = mock.Mock()
mock_get_next.return_value = now + timedelta(seconds=1)
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
db_job = db_api.get_job(job_id)
self.assertEqual(1, db_job.count)
db_func = db_api.get_function(function_id)
self.assertEqual(1, db_func.count)
db_execs = db_api.get_executions(function_id=function_id)
self.assertEqual(1, len(db_execs))
periodics.handle_job(e_client)
context.set_ctx(self.ctx)
db_job = db_api.get_job(job_id)
self.assertEqual(0, db_job.count)
self.assertEqual(status.DONE, db_job.status)
db_func = db_api.get_function(function_id)
self.assertEqual(2, db_func.count)
db_execs = db_api.get_executions(function_id=function_id)
self.assertEqual(2, len(db_execs))

View File

@ -62,6 +62,8 @@ def create_execution(engine_client, params):
function_id = params['function_id']
is_sync = params.get('sync', True)
input = params.get('input')
# input in params should be a string.
if input:
try:
params['input'] = json.loads(input)

View File

@ -270,7 +270,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab')
def test_python_execution_process_number(self):
self._create_function(name='test_python_process_limit.py')
resp, body = self.client.create_execution(self.function_id)
self.assertEqual(201, resp.status)