Example of how to use targets plugins

This should demonstrate the way this plugin interface can be used.

Change-Id: I39f4a618e616701a8eecd18d74dae1ae842ba9e4
This commit is contained in:
Clint Byrum 2015-11-10 15:22:20 -08:00
parent 35b10a9de7
commit 92dc9d3d56
4 changed files with 126 additions and 0 deletions

View File

@ -32,6 +32,7 @@ console_scripts =
oslo.config.opts = oslo.config.opts =
subunit2sql.shell = subunit2sql.shell:list_opts subunit2sql.shell = subunit2sql.shell:list_opts
subunit2sql.write_subunit = subunit2sql.write_subunit:list_opts subunit2sql.write_subunit = subunit2sql.write_subunit:list_opts
subunit2sql.targets:attach_local = subunit2sql.targets.localdir.AttachmentsDir
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source

View File

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import os.path
import tempfile
from oslo_config import cfg
import testtools
OPTIONS = [
cfg.StrOpt('attachments_storage_dir', default=None,
help='Any file attachments will be stored here')
]
cfg.CONF.register_cli_opts(OPTIONS)
cfg.CONF.register_opts(OPTIONS)
class AttachmentsDir(testtools.StreamResult):
@classmethod
def enabled(self):
if not cfg.CONF.attachments_storage_dir:
return False
if not os.path.isdir(cfg.CONF.attachments_storage_dir):
return False
return True
def __init__(self):
self.pending_files = {}
def status(self, test_id=None, test_status=None, test_tags=None,
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
if not file_name or not file_bytes:
return
target_dirs = []
if route_code:
target_dirs.append(route_code)
if test_id:
target_dirs.append(test_id)
target_dir = os.path.join(cfg.CONF.attachments_storage_dir,
*target_dirs)
os.makedirs(target_dir)
target_file_path = os.path.join(target_dir, file_name)
if target_file_path not in self.pending_files:
self.pending_files[target_file_path] = tempfile.NamedTemporaryFile(
prefix='.{}'.format(file_name), dir=target_dir, delete=False)
self.pending_files[target_file_path].write(file_bytes)
def stopTestRun(self):
for target_path, f in self.pending_files.items():
f.close()
os.rename(f.name, target_path)

View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import os.path
import tempfile
from oslo_config import cfg
from oslo_config import fixture
from subunit2sql.targets import localdir
from subunit2sql.tests import base
class TestLocaldir(base.TestCase):
def setUp(self):
super(TestLocaldir, self).setUp()
self.useFixture(fixture.Config(cfg.CONF))
self.tdir = tempfile.mkdtemp()
cfg.CONF.set_override(name='attachments_storage_dir',
override=self.tdir)
self.ad = localdir.AttachmentsDir()
def test_localdir_enabled_when_configured(self):
self.assertTrue(self.ad.enabled())
def test_localdir_disabled_when_no_conf(self):
cfg.CONF.clear_override(name='attachments_storage_dir')
self.assertFalse(self.ad.enabled())
def test_localdir_status_ignores_non_attachments(self):
self.ad.status(test_id='foo.test',
test_status='melancholy')
self.ad.stopTestRun()
self.assertEqual(0, len(os.listdir(self.tdir)))
def test_localdir_saves_testless_attachments(self):
self.ad.status(file_name='super.txt',
file_bytes=b'the quick brown fox',
route_code='routecode1')
self.ad.status(file_name='super.txt',
file_bytes=b'jumped over the lazy brown dog',
route_code='routecode2')
self.ad.stopTestRun()
expected_path = os.path.join(self.tdir, 'routecode1', 'super.txt')
self.assertTrue(os.path.exists(expected_path))
with open(expected_path, 'rb') as f:
self.assertEqual(b'the quick brown fox', f.read())
expected_path = os.path.join(self.tdir, 'routecode2', 'super.txt')
self.assertTrue(os.path.exists(expected_path))
with open(expected_path, 'rb') as f:
self.assertEqual(b'jumped over the lazy brown dog', f.read())