# monasca-agent/tests_to_fix/test_win32_event_log.py
#
# 138 lines
# 4.5 KiB
# Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import logging
import win32evtlog
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
from tests.common import get_check
logging.basicConfig()

# Check configuration handed to get_check() by the test below.
# It declares two instances watching the same log and event source:
#   1. Warning events, with a `notify` list;
#   2. Error and Information events, without `notify`.
# The nesting matters: the test unpacks exactly two instances and reads
# per-instance keys such as `notify`, so every key after `- host:` must be
# indented under its instance rather than sit at the top level.
# NOTE(review): presumably get_check() parses this text as YAML -- confirm
# against tests.common.
CONFIG = """
init_config:
instances:
  - host: .
    tags:
      - testtag1
      - testtag2
    log_file:
      - Application
    source_name:
      - EVENTLOGTEST
    type:
      - Warning
    notify:
      - pagerduty
      - "user1@somecompany.com"
  - host: .
    tags:
      - testtag1
      - testtag2
    log_file:
      - Application
    source_name:
      - EVENTLOGTEST
    type:
      - Error
      - Information
"""
class WinEventLogTest(unittest.TestCase):
    """Test the ``win32_event_log`` check against events written by the test.

    The test writes a known set of events under the ``EVENTLOGTEST`` source
    (plus one decoy under ``EVENTLOGTESTBAD``) and verifies that each
    configured instance only reports the event types it was configured for.
    """

    def setUp(self):
        """Skip the test; the fixture definition below is currently unreachable."""
        # NOTE(review): this unconditional skip disables the whole test case,
        # so LOG_EVENTS is never assigned. Remove the raise (or make it
        # conditional) when the test is re-enabled.
        raise SkipTest("Requires win32evtlog module")
        # (message, event type) pairs written to the log by the test.
        self.LOG_EVENTS = [
            ('Test 1', win32evtlog.EVENTLOG_WARNING_TYPE),
            ('Test 2', win32evtlog.EVENTLOG_ERROR_TYPE),
            ('Test 3', win32evtlog.EVENTLOG_INFORMATION_TYPE),
            ('Test 4', win32evtlog.EVENTLOG_WARNING_TYPE),
            ('Test 5', win32evtlog.EVENTLOG_WARNING_TYPE),
            ('Test 6', win32evtlog.EVENTLOG_ERROR_TYPE),
        ]

    def write_event(self, msg, ev_type, source_name='EVENTLOGTEST'):
        """Report one event to the Windows event log.

        :param msg: text used for both insertion strings of the event.
        :param ev_type: event type constant (the test passes
            ``win32evtlog.EVENTLOG_*_TYPE`` values).
        :param source_name: application/source name the event is reported
            under.
        """
        # Thanks to http://rosettacode.org/wiki/Write_to_Windows_event_log
        # The pywin32 helpers are imported here rather than at module level,
        # so they are only needed when an event is actually written.
        import win32api
        import win32con
        import win32security
        import win32evtlogutil

        # Take the first element of the current process token's TokenUser
        # information and attach it to the event as its SID.
        process = win32api.GetCurrentProcess()
        token = win32security.OpenProcessToken(process, win32con.TOKEN_READ)
        user_sid = win32security.GetTokenInformation(
            token, win32security.TokenUser)[0]

        win32evtlogutil.ReportEvent(
            source_name,
            1,  # event ID
            eventCategory=5,
            eventType=ev_type,
            strings=[msg, msg],
            data="Application\0Data".encode("ascii"),
            sid=user_sid,
        )

    def _count_log_events(self, *ev_types):
        """Return how many LOG_EVENTS entries have one of the given types."""
        return sum(1 for _, ev_type in self.LOG_EVENTS if ev_type in ev_types)

    @attr('windows')
    def test_windows_event_log(self):
        """Each instance reports only its own event types from our source."""
        check, instances = get_check('win32_event_log', CONFIG)

        # Run the check against all instances to set the last_ts
        for instance in instances:
            check.check(instance)

        # Run checks again and make sure there are no events
        # NOTE(review): this inspects get_metrics(), while the assertions
        # further down read get_events() -- confirm which one is intended.
        for instance in instances:
            check.check(instance)
            self.assertEqual(len(check.get_metrics()), 0)

        # Generate some events for the log
        for msg, ev_type in self.LOG_EVENTS:
            self.write_event(msg, ev_type)
        # Decoy written under a different source; no instance should report it.
        self.write_event('do not pick me',
                         win32evtlog.EVENTLOG_INFORMATION_TYPE,
                         source_name='EVENTLOGTESTBAD')

        # Run the checks again for them to pick up the new events
        inst1, inst2 = instances

        # NOTE(review): CONFIG declares `tags` for each instance, but the
        # assertions below read `dimensions` -- confirm which key the
        # instances returned by get_check() actually carry.
        check.check(inst1)
        ev1 = check.get_events()
        self.assertGreater(len(ev1), 0)
        self.assertEqual(
            len(ev1),
            self._count_log_events(win32evtlog.EVENTLOG_WARNING_TYPE))
        for ev in ev1:
            # Make sure we only picked up our source
            self.assertNotIn('EVENTLOGTESTBAD', ev['msg_title'])
            # Make sure the tags match up
            self.assertEqual(ev['dimensions'], inst1['dimensions'])
            # Check that the notifications are there.
            for notify in inst1['notify']:
                self.assertIn('@%s' % notify, ev['msg_text'])

        check.check(inst2)
        ev2 = check.get_events()
        self.assertGreater(len(ev2), 0)
        self.assertEqual(
            len(ev2),
            self._count_log_events(win32evtlog.EVENTLOG_ERROR_TYPE,
                                   win32evtlog.EVENTLOG_INFORMATION_TYPE))
        for ev in ev2:
            # Make sure we only picked up our source
            self.assertNotIn('EVENTLOGTESTBAD', ev['msg_title'])
            # Make sure the tags match up. These events came from the second
            # instance, so compare against inst2 (the original compared
            # against inst1 here).
            self.assertEqual(ev['dimensions'], inst2['dimensions'])
if __name__ == "__main__":
    # Running this file directly hands control to the unittest runner.
    unittest.main()