Pylog test suite
[logging-analytics.git] / pylog / tests / test_marker.py
1 # Copyright (c) 2020 Deutsche Telekom.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #       http://www.apache.org/licenses/LICENSE-2.0
6 #
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS,
9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
11 import sys
12 import unittest
13 from collections import namedtuple
14
15 if sys.version_info[0] < 3:
16     from mock import patch
17 if sys.version_info[0] >= 3:
18     from unittest.mock import patch
19
20 import pytest
21
22 from onaplogging.marker import BaseMarker, matchMarkerHelp, MarkerFactory, MarkerFilter, MarkerNotifyHandler
23
24
class TestRecordMixin(object):
    """Mixin providing a minimal stand-in for a log record.

    ``Record`` is a namedtuple with a single ``marker`` field; the tests in
    this module build records by passing only that one value.
    """

    Record = namedtuple("Record", ["marker"])
28
29
class TestNameMixin(object):
    """Mixin supplying the marker name shared by the test cases in this module."""

    # Name given to the markers that the tests create.
    TEST_NAME = "test_base"
33
34
class TestBaseMarker(unittest.TestCase, TestNameMixin):
    """Tests for BaseMarker: naming, containment, comparison and children."""

    def setUp(self):
        """Build a fresh marker named ``TEST_NAME`` for every test."""
        super(TestBaseMarker, self).setUp()
        self.base_marker = BaseMarker(name=self.TEST_NAME)

    def _child_list(self):
        """Return what iterating ``self.base_marker`` yields, as a list."""
        return list(iter(self.base_marker))

    def test_base_marker_name(self):
        """Invalid names are rejected; a valid name is returned by getName()."""
        # (expected exception, positional args, keyword args) per rejected call.
        rejected_calls = (
            (TypeError, (123,), {}),
            (ValueError, (), {"name": ""}),
        )
        for expected_error, args, kwargs in rejected_calls:
            with pytest.raises(expected_error):
                BaseMarker(*args, **kwargs)

        self.assertEqual(self.base_marker.getName(), self.TEST_NAME)

    def test_base_marker_contains(self):
        """A marker contains itself, both as an object and by its name."""
        for candidate in (self.base_marker, self.TEST_NAME):
            self.assertTrue(self.base_marker.contains(candidate))

    def test_base_marker_compare(self):
        """Markers compare equal when names match and unequal otherwise."""
        self.assertNotEqual(self.base_marker, 3)
        self.assertEqual(self.base_marker, self.base_marker)

        differently_named = BaseMarker("Other")
        self.assertNotEqual(self.base_marker, differently_named)

        same_named = BaseMarker(self.TEST_NAME)
        self.assertEqual(self.base_marker, same_named)

    def test_base_marker_child(self):
        """Walk through adding, re-adding and removing child markers."""
        marker = self.base_marker

        # A new marker has no children and does not contain a non-marker.
        self.assertListEqual(self._child_list(), [])
        self.assertFalse(marker.contains(3))

        # Non-marker arguments are expected to raise TypeError.
        rejected_calls = (
            (marker.addChild, 3),
            (marker.addChild, "str"),
            (marker.removeChild, 3),
        )
        for method, bad_argument in rejected_calls:
            with pytest.raises(TypeError):
                method(bad_argument)

        # Adding the marker to itself must leave the child list empty.
        marker.addChild(marker)
        self.assertListEqual(self._child_list(), [])

        first_child = BaseMarker(name="child1")
        self.assertFalse(marker.contains(first_child))
        marker.addChild(first_child)
        self.assertListEqual(self._child_list(), [first_child])
        self.assertTrue(marker.contains(first_child))

        # Adding the same child a second time must not duplicate it.
        marker.addChild(first_child)
        self.assertListEqual(self._child_list(), [first_child])

        marker.removeChild(first_child)
        self.assertListEqual(self._child_list(), [])
        self.assertFalse(marker.contains(first_child))

        second_child = BaseMarker(name="child2")
        self.assertFalse(marker.contains(second_child))

        # Bulk add: None is rejected, a tuple of markers is accepted.
        with pytest.raises(TypeError):
            marker.addChilds(None)
        marker.addChilds((first_child, second_child,))
        self.assertTrue(marker.contains(first_child))
        self.assertTrue(marker.contains(second_child))

        # Removing one child leaves the other; lookups by name agree.
        marker.removeChild(first_child)
        self.assertFalse(marker.contains(first_child))
        self.assertTrue(marker.contains(second_child))
        self.assertFalse(marker.contains("child1"))
        self.assertTrue(marker.contains("child2"))
100         
101
class TestMatchMarkerHelp(unittest.TestCase, TestRecordMixin, TestNameMixin):
    """Tests for matchMarkerHelp against records with various markers."""

    # Name of the child marker attached to the record's marker.
    CHILD_NAME = "child"

    def test_match_marker_help(self):
        """Check matching by marker object, by name, and by list of either."""
        # A record whose marker is None matches nothing.
        empty_record = self.Record(None)
        self.assertFalse(matchMarkerHelp(empty_record, "anything"))

        # A record whose marker is a plain string is not matched, even when
        # the criterion is that same string.
        string_record = self.Record("not_marker_instance")
        self.assertFalse(matchMarkerHelp(string_record, "not_marker_instance"))

        marker = BaseMarker(self.TEST_NAME)
        record = self.Record(marker)
        self.assertFalse(matchMarkerHelp(record, "invalid_name"))
        self.assertTrue(matchMarkerHelp(record, marker))
        self.assertTrue(matchMarkerHelp(record, self.TEST_NAME))

        child = BaseMarker(self.CHILD_NAME)
        marker.addChild(child)

        # Every list below is expected to match; the last one includes an
        # entry ("invalid") that names no marker on the record.
        matching_criteria = (
            [self.TEST_NAME, self.CHILD_NAME],
            [marker, self.CHILD_NAME],
            [marker, child],
            [marker, "invalid"],
        )
        for criteria in matching_criteria:
            self.assertTrue(matchMarkerHelp(record, criteria))
124
125
class TestMarkerFactory(unittest.TestCase, TestNameMixin):
    """Tests for MarkerFactory: creating, looking up and deleting markers."""

    def setUp(self):
        """Instantiate the factory used by the test."""
        super(TestMarkerFactory, self).setUp()
        self.marker_factory = MarkerFactory()

    def _marker_count(self):
        """Return the number of entries in the factory's ``_marker_map``."""
        return len(self.marker_factory._marker_map)

    def test_get_marker(self):
        """Fetch a marker twice, then delete it, checking the map each time."""
        factory = self.marker_factory

        # Calling getMarker() without a name is expected to raise ValueError.
        with pytest.raises(ValueError):
            factory.getMarker()

        # NOTE(review): assumes the marker map is empty when this test
        # starts - confirm no other test leaves markers registered.
        self.assertEqual(self._marker_count(), 0)

        # The first fetch registers the marker; the second must not add
        # another entry.
        for _ in range(2):
            marker = factory.getMarker(self.TEST_NAME)
            self.assertEqual(marker.getName(), self.TEST_NAME)
            self.assertEqual(self._marker_count(), 1)

        self.assertTrue(factory.exist(marker.getName()))

        self.assertTrue(factory.deleteMarker(marker.getName()))
        self.assertFalse(factory.exist(marker.getName()))
        self.assertEqual(self._marker_count(), 0)

        # Deleting a marker that is no longer registered reports failure.
        self.assertFalse(factory.deleteMarker(marker.getName()))
150
151
class TestMarkerFilter(unittest.TestCase, TestRecordMixin, TestNameMixin):
    """Tests for MarkerFilter with and without configured markers."""

    def test_marker_filter(self):
        """A filter without markers rejects the record; a matching one accepts."""
        unconfigured_filter = MarkerFilter()
        marked_record = self.Record(BaseMarker(self.TEST_NAME))
        self.assertFalse(unconfigured_filter.filter(marked_record))

        # Same record, but the filter is now given a marker of the same name.
        configured_filter = MarkerFilter(markers=BaseMarker(self.TEST_NAME))
        self.assertTrue(configured_filter.filter(marked_record))
162
163
class TestMarkerNotifyHandler(unittest.TestCase, TestRecordMixin, TestNameMixin):
    """Tests for MarkerNotifyHandler's marker-based handling decision."""

    def test_marker_notify_handler(self):
        """Check handle() without markers, with a match, and with a mismatch."""
        matching_record = self.Record(BaseMarker(self.TEST_NAME))

        # Without a markers argument the handler has none and handles nothing.
        bare_handler = MarkerNotifyHandler("test_host", "fromaddr", "toaddrs", "subject")
        self.assertIsNone(bare_handler.markers)
        self.assertFalse(bare_handler.handle(matching_record))

        watched_marker = BaseMarker(self.TEST_NAME)
        marker_handler = MarkerNotifyHandler(
            "test_host", "fromaddr", "toaddrs", "subject", markers=[watched_marker]
        )

        # SMTPHandler.handle is replaced by a mock returning True for the
        # matching case, so the real method is not called here.
        with patch("onaplogging.marker.markerHandler.SMTPHandler.handle", return_value=True):
            self.assertTrue(marker_handler.handle(matching_record))

        # A record carrying a differently named marker is not handled.
        other_record = self.Record(BaseMarker("other"))
        self.assertFalse(marker_handler.handle(other_record))