1 # ============LICENSE_START===================================================
2 # Copyright (C) 2021-2022 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
19 """This module represents PMSH application configuration
20 Singleton instance of configuration is created and stored,
21 Enum representation is used for Message Router topics.
24 from enum import Enum, unique
27 from onap_dcae_cbs_docker_client.client import get_all
28 from requests.auth import HTTPBasicAuth
29 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
31 from mod import logger, mdc_handler
36 """ Enum used to represent Message Router Topic"""
37 AAI_SUBSCRIBER = 'aai_subscriber'
38 POLICY_PM_PUBLISHER = 'policy_pm_publisher'
39 POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
42 class MetaSingleton(type):
43 """ Metaclass used to create singleton object by overriding __call__() method """
46 def __call__(cls, *args, **kwargs):
47 if cls not in cls._instances:
48 cls._instances[cls] = super().__call__(*args, **kwargs)
49 return cls._instances[cls]
52 def get_cls_instance(mcs, cls_name):
53 return mcs._instances[cls_name]
56 class AppConfig(metaclass=MetaSingleton):
57 """ Object representation of the PMSH Application config. """
60 app_config = self._get_config()
61 self.key_path = app_config['config'].get('key_path')
62 self.cert_path = app_config['config'].get('cert_path')
63 self.ca_cert_path = app_config['config'].get('ca_cert_path')
64 self.enable_tls = app_config['config'].get('enable_tls')
65 self.aaf_id = app_config['config'].get('aaf_identity')
66 self.aaf_pass = app_config['config'].get('aaf_password')
67 self.streams_publishes = app_config['config'].get('streams_publishes')
68 self.streams_subscribes = app_config['config'].get('streams_subscribes')
72 return AppConfig.get_cls_instance(AppConfig)
74 @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
75 retry=retry_if_exception_type(ValueError))
76 def _get_config(self):
78 """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
79 is received, it retries after 2 seconds for 5 times before raising an exception.
82 dict: Dictionary representation of the the service configuration
85 Exception: If any error occurred pulling configuration from Config binding service.
88 logger.info('Attempting to fetch PMSH Configuration from CBS.')
90 logger.info(f'Successfully fetched PMSH config from CBS: {config}')
92 except Exception as e:
93 logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
94 raise ValueError(e) from e
97 def publish_to_topic(self, mr_topic, event_json, **kwargs):
99 Publish the event to the DMaaP Message Router topic.
102 mr_topic (enum) : Message Router topic to publish.
103 event_json (dict): the json data to be published.
105 session = requests.Session()
106 topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
107 headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
108 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
109 logger.info(f'Publishing event to MR topic: {topic_url}')
110 response = session.post(topic_url, headers=headers,
111 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
112 verify=(self.ca_cert_path if self.enable_tls else False))
113 response.raise_for_status()
116 def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
119 Returns the json data from the MrTopic.
122 mr_topic (enum) : Message Router topic to subscribe.
123 consumer_id (str): Within your subscribers group, a name that uniquely
124 identifies your subscribers process.
125 consumer_group (str): A name that uniquely identifies your subscribers.
126 timeout (int): The request timeout value in mSec.
129 list[str]: the json response from DMaaP Message Router topic.
132 session = requests.Session()
133 topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
134 headers = {'accept': 'application/json', 'content-type': 'application/json',
135 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
136 logger.info(f'Fetching messages from MR topic: {topic_url}')
137 response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
138 f'?timeout={timeout}',
139 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
140 verify=(self.ca_cert_path if self.enable_tls else False))
141 if response.status_code == 503:
142 logger.error(f'MR Service is unavailable at present: {response.content}')
143 response.raise_for_status()
145 return response.json()
146 except Exception as e:
147 logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)