1 # ============LICENSE_START===================================================
2 # Copyright (C) 2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
19 """This module represents PMSH application configuration
20 Singleton instance of configuration is created and stored,
21 Enum representation is used for Message Router topics.
24 from enum import Enum, unique
27 from onap_dcae_cbs_docker_client.client import get_all
28 from requests.auth import HTTPBasicAuth
29 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
31 from mod import logger
32 from mod.pmsh_utils import mdc_handler
37 """ Enum used to represent Message Router Topic"""
38 AAI_SUBSCRIBER = 'aai_subscriber'
39 POLICY_PM_PUBLISHER = 'policy_pm_publisher'
40 POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
43 class MetaSingleton(type):
44 """ Metaclass used to create singleton object by overriding __call__() method """
47 def __call__(cls, *args, **kwargs):
48 if cls not in cls._instances:
49 cls._instances[cls] = super().__call__(*args, **kwargs)
50 return cls._instances[cls]
53 def get_cls_instance(mcs, cls_name):
54 return mcs._instances[cls_name]
57 class AppConfig(metaclass=MetaSingleton):
58 """ Object representation of the PMSH Application config. """
61 app_config = self._get_config()
62 self.key_path = app_config['config'].get('key_path')
63 self.cert_path = app_config['config'].get('cert_path')
64 self.ca_cert_path = app_config['config'].get('ca_cert_path')
65 self.enable_tls = app_config['config'].get('enable_tls')
66 self.aaf_id = app_config['config'].get('aaf_identity')
67 self.aaf_pass = app_config['config'].get('aaf_password')
68 self.streams_publishes = app_config['config'].get('streams_publishes')
69 self.streams_subscribes = app_config['config'].get('streams_subscribes')
70 # TODO: aaf_creds variable should be removed on code cleanup
71 self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}
75 return AppConfig.get_cls_instance(AppConfig)
77 @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
78 retry=retry_if_exception_type(ValueError))
79 def _get_config(self):
81 """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
82 is received, it retries after 2 seconds for 5 times before raising an exception.
85 dict: Dictionary representation of the the service configuration
88 Exception: If any error occurred pulling configuration from Config binding service.
91 logger.info('Attempting to fetch PMSH Configuration from CBS.')
93 logger.info(f'Successfully fetched PMSH config from CBS: {config}')
95 except Exception as e:
96 logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
100 def publish_to_topic(self, mr_topic, event_json, **kwargs):
102 Publish the event to the DMaaP Message Router topic.
105 mr_topic (enum) : Message Router topic to publish.
106 event_json (dict): the json data to be published.
109 Exception: if post request fails.
112 session = requests.Session()
113 topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
114 headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
115 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
116 logger.info(f'Publishing event to MR topic: {topic_url}')
117 response = session.post(topic_url, headers=headers,
118 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
119 verify=(self.ca_cert_path if self.enable_tls else False))
120 response.raise_for_status()
121 except Exception as e:
125 def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
128 Returns the json data from the MrTopic.
131 mr_topic (enum) : Message Router topic to subscribe.
132 consumer_id (str): Within your subscribers group, a name that uniquely
133 identifies your subscribers process.
134 consumer_group (str): A name that uniquely identifies your subscribers.
135 timeout (int): The request timeout value in mSec.
138 list[str]: the json response from DMaaP Message Router topic.
141 session = requests.Session()
142 topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
143 headers = {'accept': 'application/json', 'content-type': 'application/json',
144 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
145 logger.info(f'Fetching messages from MR topic: {topic_url}')
146 response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
147 f'?timeout={timeout}',
148 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
149 verify=(self.ca_cert_path if self.enable_tls else False))
150 if response.status_code == 503:
151 logger.error(f'MR Service is unavailable at present: {response.content}')
153 response.raise_for_status()
155 return response.json()
156 except Exception as e:
157 logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)