monasca-agent/tests/detection/test_process_check.py
Joseph Davis 623a4db21f Enable unit tests for py36
Even though there was a py36 test enabled in the gate, the tox.ini
configuration was not actually invoking the unit tests.  This
change sets up the environment to allow tests to run.

As a result, a number of Python3 errors are uncovered and fixed.

Notably:
  Python 3 does not have contextlib.nested, so reformatting using ,
  file() is not in Python 3, so use io.open() instead
  Use six.assertCountEqual(self, in tests

safe_decode:
  subprocess.check_output returns in byte encoding, while default text
type str. safe_decode does the right thing by making sure string are not
bytes in python2 and python3

No ascci encoding:
 python3 defaults to UTF-8 encoding, which is merely an extension to
ascii (default for python2).

test_json_plugin.py:
 the file being opened in binary(wb) mode so python is expecting the
string in bytes.

Some of the refactoring should be revisited after we drop Python 2
support.

Change-Id: I62b46a2509c39201ca015ca7c269b2ea70c376c8
Story: 2005047
Task: 29547
2019-07-18 16:08:09 +02:00

178 lines
8.0 KiB
Python

# (C) Copyright 2016 Hewlett Packard Enterprise Development LP
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import psutil
import tempfile
import unittest
from mock import patch
from monasca_setup.detection.plugins.process import ProcessCheck
LOG = logging.getLogger('monasca_setup.detection.plugins.process')
class PSUtilGetProc(object):
cmdLine = ['monasca-api']
def as_dict(self, attrs=None):
return {'name': 'monasca-api',
'cmdline': PSUtilGetProc.cmdLine}
def cmdline(self):
return self.cmdLine
class TestProcessCheck(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
with patch.object(ProcessCheck, '_detect') as mock_detect:
self.proc_plugin = ProcessCheck('temp_dir')
self.assertTrue(mock_detect.called)
def _detect(self,
proc_plugin,
config_is_file=False,
by_process_name=True):
proc_plugin.available = False
psutil_mock = PSUtilGetProc()
process_iter_patch = patch.object(psutil, 'process_iter',
return_value=[psutil_mock])
isfile_patch = patch.object(os.path, 'isfile',
return_value=config_is_file)
with process_iter_patch as mock_process_iter, \
isfile_patch as mock_isfile:
proc_plugin._detect()
if by_process_name:
self.assertTrue(mock_process_iter.called)
self.assertFalse(mock_isfile.called)
def test_detect_process_not_found(self):
PSUtilGetProc.cmdLine = []
self.proc_plugin.process_config = [{'process_names': ['monasca-api'], 'dimensions': {'service': 'monitoring'}}]
self._detect(self.proc_plugin)
self.assertFalse(self.proc_plugin.available)
def test_detect_process_found(self):
self.proc_plugin.process_config = [{'process_names': ['monasca-api'], 'dimensions': {'service': 'monitoring'}}]
self._detect(self.proc_plugin)
self.assertTrue(self.proc_plugin.available)
def test_missing_arg(self):
# monitor by process_username requires component
self.proc_plugin.process_config = [{'process_username': 'dbadmin', 'dimensions': {'service': 'monitoring'}}]
self._detect(self.proc_plugin, by_process_name=False)
self.assertFalse(self.proc_plugin.available)
def test_detect_build_config_process_name(self):
self.proc_plugin.process_config = [{'process_names': ['monasca-api'], 'dimensions': {'service': 'monitoring'}}]
self._detect(self.proc_plugin)
result = self.proc_plugin.build_config()
self.assertEqual(result['process']['instances'][0]['name'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['detailed'],
True)
self.assertEqual(result['process']['instances'][0]['exact_match'],
False)
self.assertEqual(result['process']['instances'][0]['dimensions']['service'],
'monitoring')
self.assertEqual(result['process']['instances'][0]['dimensions']['component'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['search_string'][0],
'monasca-api')
def test_detect_build_config_process_name_exact_match_true(self):
self.proc_plugin.process_config = [
{'process_names': ['monasca-api'], 'dimensions': {'service': 'monitoring'}, 'exact_match': True}]
self._detect(self.proc_plugin)
result = self.proc_plugin.build_config()
self.assertEqual(result['process']['instances'][0]['name'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['detailed'],
True)
self.assertEqual(result['process']['instances'][0]['exact_match'],
True)
self.assertEqual(result['process']['instances'][0]['dimensions']['service'],
'monitoring')
self.assertEqual(result['process']['instances'][0]['dimensions']['component'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['search_string'][0],
'monasca-api')
def test_build_config_process_names(self):
self.proc_plugin.valid_process_names = [
{'process_names': ['monasca-api'],
'dimensions': {'service': 'monitoring'},
'found_process_names': ['monasca-api'],
'exact_match': False},
{'process_names': ['monasca-thresh'],
'dimensions': {'service': 'monitoring'},
'found_process_names': ['monasca-thresh'],
'exact_match': False}]
result = self.proc_plugin.build_config()
self.assertEqual(result['process']['instances'][0]['name'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['detailed'],
True)
self.assertEqual(result['process']['instances'][0]['exact_match'],
False)
self.assertEqual(result['process']['instances'][0]['dimensions']['service'],
'monitoring')
self.assertEqual(result['process']['instances'][0]['dimensions']['component'],
'monasca-api')
self.assertEqual(result['process']['instances'][0]['search_string'][0],
'monasca-api')
self.assertEqual(result['process']['instances'][1]['name'],
'monasca-thresh')
self.assertEqual(result['process']['instances'][1]['dimensions']['component'],
'monasca-thresh')
def test_detect_build_config_process_username(self):
self.proc_plugin.process_config = \
[{'process_username': 'dbadmin', 'dimensions': {'service': 'monitoring', 'component': 'vertica'}}]
self.proc_plugin._detect()
result = self.proc_plugin.build_config()
self.assertEqual(result['process']['instances'][0]['name'],
'vertica')
self.assertEqual(result['process']['instances'][0]['detailed'],
True)
self.assertEqual(result['process']['instances'][0]['dimensions']['service'],
'monitoring')
self.assertEqual(result['process']['instances'][0]['dimensions']['component'],
'vertica')
def test_input_yaml_file(self):
# note: The previous tests will cover all yaml data variations, since
# the data is translated into a single dictionary.
fd, temp_path = tempfile.mkstemp(suffix='.yaml', text=True)
os.write(fd, b'---\nprocess_config:\n- process_username: dbadmin\n dimensions:\n '
b'service: monitoring\n component: vertica\n')
self.proc_plugin.args = {'conf_file_path': temp_path}
self.proc_plugin._detect()
result = self.proc_plugin.build_config()
self.assertEqual(result['process']['instances'][0]['name'],
'vertica')
self.assertEqual(result['process']['instances'][0]['detailed'],
True)
self.assertEqual(result['process']['instances'][0]['dimensions']['service'],
'monitoring')
self.assertEqual(result['process']['instances'][0]['dimensions']['component'],
'vertica')
os.close(fd)
os.remove(temp_path)