[DCAEGEN2] PMSH AAI changes with new subscription format
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / pmsh_config.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18
19 """This module represents PMSH application configuration
20    Singleton instance of configuration is created and stored,
21    Enum representation is used for Message Router topics.
22 """
23
24 from enum import Enum, unique
25
26 import requests
27 from onap_dcae_cbs_docker_client.client import get_all
28 from requests.auth import HTTPBasicAuth
29 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
30
31 from mod import logger
32 from mod.pmsh_utils import mdc_handler
33
34
35 @unique
36 class MRTopic(Enum):
37     """ Enum used to represent Message Router Topic"""
38     AAI_SUBSCRIBER = 'aai_subscriber'
39     POLICY_PM_PUBLISHER = 'policy_pm_publisher'
40     POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
41
42
43 class MetaSingleton(type):
44     """ Metaclass used to create singleton object by overriding __call__() method """
45     _instances = {}
46
47     def __call__(cls, *args, **kwargs):
48         if cls not in cls._instances:
49             cls._instances[cls] = super().__call__(*args, **kwargs)
50         return cls._instances[cls]
51
52     @classmethod
53     def get_cls_instance(mcs, cls_name):
54         return mcs._instances[cls_name]
55
56
57 class AppConfig(metaclass=MetaSingleton):
58     """ Object representation of the PMSH Application config. """
59
60     def __init__(self):
61         app_config = self._get_config()
62         self.key_path = app_config['config'].get('key_path')
63         self.cert_path = app_config['config'].get('cert_path')
64         self.ca_cert_path = app_config['config'].get('ca_cert_path')
65         self.enable_tls = app_config['config'].get('enable_tls')
66         self.aaf_id = app_config['config'].get('aaf_identity')
67         self.aaf_pass = app_config['config'].get('aaf_password')
68         self.streams_publishes = app_config['config'].get('streams_publishes')
69         self.streams_subscribes = app_config['config'].get('streams_subscribes')
70         # TODO: aaf_creds variable should be removed on code cleanup
71         self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}
72
73     @staticmethod
74     def get_instance():
75         return AppConfig.get_cls_instance(AppConfig)
76
77     @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
78            retry=retry_if_exception_type(ValueError))
79     def _get_config(self):
80
81         """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
82         is received, it retries after 2 seconds for 5 times before raising an exception.
83
84         Returns:
85             dict: Dictionary representation of the the service configuration
86
87         Raises:
88             Exception: If any error occurred pulling configuration from Config binding service.
89         """
90         try:
91             logger.info('Attempting to fetch PMSH Configuration from CBS.')
92             config = get_all()
93             logger.info(f'Successfully fetched PMSH config from CBS: {config}')
94             return config
95         except Exception as e:
96             logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
97             raise ValueError(e)
98
99     @mdc_handler
100     def publish_to_topic(self, mr_topic, event_json, **kwargs):
101         """
102         Publish the event to the DMaaP Message Router topic.
103
104         Args:
105             mr_topic (enum) : Message Router topic to publish.
106             event_json (dict): the json data to be published.
107
108         Raises:
109             Exception: if post request fails.
110         """
111         try:
112             session = requests.Session()
113             topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
114             headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
115                        'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
116             logger.info(f'Publishing event to MR topic: {topic_url}')
117             response = session.post(topic_url, headers=headers,
118                                     auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
119                                     verify=(self.ca_cert_path if self.enable_tls else False))
120             response.raise_for_status()
121         except Exception as e:
122             raise e
123
124     @mdc_handler
125     def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
126                        **kwargs):
127         """
128         Returns the json data from the MrTopic.
129
130         Args:
131             mr_topic (enum) : Message Router topic to subscribe.
132             consumer_id (str): Within your subscribers group, a name that uniquely
133             identifies your subscribers process.
134             consumer_group (str): A name that uniquely identifies your subscribers.
135             timeout (int): The request timeout value in mSec.
136
137         Returns:
138             list[str]: the json response from DMaaP Message Router topic.
139         """
140         try:
141             session = requests.Session()
142             topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
143             headers = {'accept': 'application/json', 'content-type': 'application/json',
144                        'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
145             logger.info(f'Fetching messages from MR topic: {topic_url}')
146             response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
147                                    f'?timeout={timeout}',
148                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
149                                    verify=(self.ca_cert_path if self.enable_tls else False))
150             if response.status_code == 503:
151                 logger.error(f'MR Service is unavailable at present: {response.content}')
152                 pass
153             response.raise_for_status()
154             if response.ok:
155                 return response.json()
156         except Exception as e:
157             logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
158             raise