increase pytest coverage
[dcaegen2/collectors/snmptrap.git] / tests / test_snmptrapd.py
1 import os
2 import pytest
3 import unittest
4 import snmptrapd
5 import datetime
6
7 import trapd_settings as tds
8 import trapd_http_session
9 import trapd_runtime_pid
10 import trapd_io
11 import trapd_get_cbs_config
12
13 from pysnmp.hlapi import *
14 from pysnmp import debug
15
class test_snmptrapd(unittest.TestCase):
    """
    Tests for the snmptrapd module: usage error handling, config loading,
    logging of arriving traps, trap transmission and varbind-to-JSON
    conversion.

    NOTE: the simulated CBS config file is written while this class body is
    executed, i.e. at import/collection time, not per test.
    """

    # Simulated config payload, kept as one JSON document.  It is split into
    # adjacent string literals per section purely for readability; the
    # concatenated value is what gets written to pytest_json_config below.
    pytest_json_data = (
        '{ "snmptrapd": { "version": "1.4.0", "title": "ONAP SNMP Trap Receiver" }, '
        '"protocols": { "transport": "udp", "ipv4_interface": "0.0.0.0", "ipv4_port": 6162, '
        '"ipv6_interface": "::1", "ipv6_port": 6162 }, '
        '"cache": { "dns_cache_ttl_seconds": 60 }, '
        '"publisher": { "http_timeout_milliseconds": 1500, "http_retries": 3, '
        '"http_milliseconds_between_retries": 750, "http_primary_publisher": "true", '
        '"http_peer_publisher": "unavailable", "max_traps_between_publishes": 10, '
        '"max_milliseconds_between_publishes": 10000 }, '
        '"streams_publishes": { "sec_fault_unsecure": { "type": "message_router", '
        '"aaf_password": null, "dmaap_info": { "location": "mtl5", "client_id": null, '
        '"client_role": null, '
        '"topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
        '"aaf_username": null } }, '
        '"files": { "runtime_base_dir": "/tmp/opt/app/snmptrap", "log_dir": "logs", '
        '"data_dir": "data", "pid_dir": "tmp", '
        '"arriving_traps_log": "snmptrapd_arriving_traps.log", '
        '"snmptrapd_diag": "snmptrapd_prog_diag.log", '
        '"traps_stats_log": "snmptrapd_stats.csv", '
        '"perm_status_file": "snmptrapd_status.log", '
        '"eelf_base_dir": "/tmp/opt/app/snmptrap/logs", "eelf_error": "error.log", '
        '"eelf_debug": "debug.log", "eelf_audit": "audit.log", '
        '"eelf_metrics": "metrics.log", "roll_frequency": "day", '
        '"minimum_severity_to_log": 2 }, '
        '"trap_config": { "sw_interval_in_seconds": 60, "notify_oids": { '
        '".1.3.6.1.4.1.9.0.1": { "sw_high_water_in_interval": 102, '
        '"sw_low_water_in_interval": 7, "category": "logonly" }, '
        '".1.3.6.1.4.1.9.0.2": { "sw_high_water_in_interval": 101, '
        '"sw_low_water_in_interval": 7, "category": "logonly" }, '
        '".1.3.6.1.4.1.9.0.3": { "sw_high_water_in_interval": 102, '
        '"sw_low_water_in_interval": 7, "category": "logonly" }, '
        '".1.3.6.1.4.1.9.0.4": { "sw_high_water_in_interval": 10, '
        '"sw_low_water_in_interval": 3, "category": "logonly" } } }, '
        '"snmpv3_config": { "usm_users": [ '
        '{ "user": "usr-sha-aes256", "engineId": "8000000001020304", '
        '"usmHMACSHAAuth": "authkey1", "usmAesCfb256": "privkey1" }, '
        '{ "user": "user1", "engineId": "8000000000000001", '
        '"usmHMACMD5Auth": "authkey1", "usmDESPriv": "privkey1" }, '
        '{ "user": "user2", "engineId": "8000000000000002", '
        '"usmHMACSHAAuth": "authkey2", "usmAesCfb128": "privkey2" }, '
        '{ "user": "user3", "engineId": "8000000000000003", '
        '"usmHMACSHAAuth": "authkey3", "usmAesCfb256": "privkey3" } '
        '] } }'
    )

    # create copy of snmptrapd.json for pytest; make sure the target
    # directory exists first so a clean machine does not fail at import time
    pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json"
    os.makedirs(os.path.dirname(pytest_json_config), exist_ok=True)
    with open(pytest_json_config, 'w') as outfile:
        outfile.write(pytest_json_data)

    def _init_and_load_cbs_config(self):
        """
        Shared preamble of most tests: re-initialise the trapd_settings
        variables, point CBS_SIM_JSON at the simulated config file and
        request the CBS config load.

        Returns the value returned by trapd_get_cbs_config.get_cbs_config().
        """
        # init vars
        tds.init()

        # request load of CBS data
        os.environ.update(CBS_SIM_JSON=self.pytest_json_config)
        return trapd_get_cbs_config.get_cbs_config()

    def test_usage_err(self):
        """
        Test usage error: usage_err() must terminate via SystemExit with
        exit code 1.
        """

        with pytest.raises(SystemExit) as pytest_wrapped_sys_exit:
            snmptrapd.usage_err()

        # These checks must live AFTER the with-block: anything placed after
        # the raising call inside the block is never executed.
        self.assertEqual(pytest_wrapped_sys_exit.type, SystemExit)
        self.assertEqual(pytest_wrapped_sys_exit.value.code, 1)

    def test_load_all_configs(self):
        """
        Test load of all configs
        """

        self.assertEqual(self._init_and_load_cbs_config(), True)

        # request load of all configs (first argument 0; compare with the
        # "signal" variant below, which passes 1)
        result = snmptrapd.load_all_configs(0, 1)
        self.assertEqual(result, True)

    def test_load_all_configs_signal(self):
        """
        Test load of all configs via runtime signal
        """

        self.assertEqual(self._init_and_load_cbs_config(), True)

        # presumably the arguments mirror a signal handler's (signum, frame);
        # verify against snmptrapd.load_all_configs
        result = snmptrapd.load_all_configs(1, 1)
        self.assertEqual(result, True)

    def test_log_all_arriving_traps(self):
        """
        Test logging of traps: with a hand-built trap dictionary and the log
        files opened, log_all_arriving_traps() is expected to raise.
        """

        self._init_and_load_cbs_config()

        # set last day to current
        tds.last_day = datetime.datetime.now().day

        # trap dict for logging
        tds.trap_dict = {
            'uuid': '06f6e91c-3236-11e8-9953-005056865aac',
            'agent address': '1.2.3.4',
            'agent name': 'test-agent.nodomain.com',
            'cambria.partition': 'test-agent.nodomain.com',
            'community': '',
            'community len': 0,
            'epoch_serno': 15222068260000,
            'protocol version': 'v2c',
            'time received': 1522206826.2938566,
            'trap category': 'ONAP-COLLECTOR-SNMPTRAP',
            'sysUptime': '218567736',
            'notify OID': '1.3.6.1.4.1.9999.9.9.999',
            'notify OID len': 10,
        }

        # open eelf logs
        trapd_io.open_eelf_logs()

        files_cfg = tds.c_config['files']

        # open trap logs
        tds.arriving_traps_filename = "/".join((
            files_cfg['runtime_base_dir'],
            files_cfg['log_dir'],
            files_cfg['arriving_traps_log'],
        ))
        tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename)

        # name and open json trap log; the file is named after the last path
        # component of the publish topic URL
        topic_name = tds.c_config['streams_publishes']['sec_fault_unsecure'][
            'dmaap_info']['topic_url'].split('/')[-1]
        tds.json_traps_filename = "/".join((
            files_cfg['runtime_base_dir'],
            files_cfg['log_dir'],
            "DMAAP_" + topic_name + ".json",
        ))
        tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename)
        msg = ("published traps logged to: %s" % tds.json_traps_filename)
        trapd_io.stdout_logger(msg)
        trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg)

        # try to log - should raise exception.
        # NOTE(review): an earlier revision also asserted AttributeError, but
        # did so inside this with-block after the raising call, where it could
        # never run.  The concrete exception type is therefore unverified and
        # only "some Exception" is required here - tighten once confirmed.
        # The old comment also claimed the files are not opened, yet they are
        # opened above.
        with pytest.raises(Exception):
            snmptrapd.log_all_arriving_traps()

    def test_log_all_incorrect_log_type(self):
        """
        Smoke test: load the config and open the eelf logs.

        NOTE(review): despite its name this test makes no assertion and does
        not pass an incorrect log type anywhere; it only checks that the
        calls below complete without raising.
        """

        self._init_and_load_cbs_config()

        # open eelf logs
        trapd_io.open_eelf_logs()

    def test_v1_trap_receipt(self):
        """
        Test receiving traps: send one notification with two OctetString
        varbinds to localhost:6162 and require that no error indication is
        reported for the send.
        """

        self._init_and_load_cbs_config()

        # NOTE(review): CommunityData is used with its default mpModel, which
        # in pysnmp means SNMPv2c; pass mpModel=0 if a genuine v1 PDU is
        # intended by this test's name - confirm.
        error_indication, _error_status, _error_index, _varbinds = next(
            sendNotification(
                SnmpEngine(),
                CommunityData('not_public'),
                UdpTransportTarget(('localhost', 6162)),
                ContextData(),
                'trap',
                [ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.1'), OctetString('test trap - ignore')),
                 ObjectType(ObjectIdentity('.1.3.6.1.4.1.999.2'), OctetString('ONAP pytest trap'))],
            )
        )

        self.assertIsNone(error_indication)

    def test_add_varbind_to_json(self):
        """
        Test add_varbind_to_json return codes by varbind index.

        For a v2c trap, indexes 0 and 1 must return 0 and indexes 2 and 3
        must return 1; for a v1 trap, index 0 must return 0 and index 5 must
        return 1.  (Presumably 0 means the varbind was skipped and 1 that it
        was added - verify against snmptrapd.add_varbind_to_json.)
        """

        def add_varbind(index):
            # fresh OID/value objects per call, exactly one varbind each
            return snmptrapd.add_varbind_to_json(
                index,
                ObjectIdentifier('.1.2.3.4'),
                'OctetString',
                OctetString(b'Thu Mar 21 15:46:58 2019'),
            )

        expectations = (
            ("v2c", ((0, 0), (1, 0), (2, 1), (3, 1))),
            ("v1", ((0, 0), (5, 1))),
        )

        for protocol_version, index_results in expectations:
            # init vars
            tds.init()
            tds.trap_dict["notify OID"] = ".1.2.3.4.5.6.7.8"
            tds.trap_dict["protocol version"] = protocol_version

            for index, expected in index_results:
                self.assertEqual(add_varbind(index), expected)
176
# Allow this test module to be executed directly with the Python interpreter
# in addition to being collected by a test runner.
if __name__ == "__main__":
    unittest.main()