update link to upper-constraints.txt
[optf/osdf.git] / test / adapters / test_message_router.py
1 # -------------------------------------------------------------------------
2 #   Copyright (c) 2017-2018 AT&T Intellectual Property
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 #
16 # -------------------------------------------------------------------------
17 #
18 import osdf.adapters.dcae.message_router as MR
19 import unittest
20
21 from osdf.operation.exceptions import MessageBusConfigurationException
22 from unittest.mock import patch
23
24
25 class TestMessageRouter(unittest.TestCase):
26
27     def test_valid_MR(self):
28         mr = MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")
29
30     def test_valid_MR_with_base_urls(self):
31         base_urls = ["https://MYHOST1:3905/events/MY-TOPIC","https://MYHOST2:3905/events/MY-TOPIC"]
32         mr = MR.MessageRouterClient(dmaap_url=base_urls)
33
34     def test_invalid_valid_MR_with_base_urls(self):
35         """No dmaap_url"""
36         try:
37             mr = MR.MessageRouterClient()
38         except MessageBusConfigurationException:
39             return
40
41         raise Exception("Allows invalid MR configuration") # if it failed to error out
42
43     @patch('osdf.adapters.dcae.message_router.MessageRouterClient.http_request', return_value={})
44     def test_mr_http_request_mocked(self, http_request):
45         mr = MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")
46         mr.http_request = http_request
47         assert mr.get() == {} 
48         assert mr.post("Hello") == {} 
49
50     def test_mr_http_request_non_existent_host(self):
51         mr = MR.MessageRouterClient(dmaap_url="https://MYHOST:3905")
52         try:
53             mr.get()
54         except:
55             return
56
57         raise Exception("Allows invalid host") # if it failed to error out
58 if __name__ == "__main__":
59     unittest.main()
60