[DCAEGEN2] Release dcaegen2-services-kpi-computation-ms container
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / pmsh_config.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2021-2022 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18
19 """This module represents PMSH application configuration
20    Singleton instance of configuration is created and stored,
21    Enum representation is used for Message Router topics.
22 """
23
24 from enum import Enum, unique
25
26 import requests
27 from onap_dcae_cbs_docker_client.client import get_all
28 from requests.auth import HTTPBasicAuth
29 from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
30
31 from mod import logger, mdc_handler
32
33
34 @unique
35 class MRTopic(Enum):
36     """ Enum used to represent Message Router Topic"""
37     AAI_SUBSCRIBER = 'aai_subscriber'
38     POLICY_PM_PUBLISHER = 'policy_pm_publisher'
39     POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
40
41
42 class MetaSingleton(type):
43     """ Metaclass used to create singleton object by overriding __call__() method """
44     _instances = {}
45
46     def __call__(cls, *args, **kwargs):
47         if cls not in cls._instances:
48             cls._instances[cls] = super().__call__(*args, **kwargs)
49         return cls._instances[cls]
50
51     @classmethod
52     def get_cls_instance(mcs, cls_name):
53         return mcs._instances[cls_name]
54
55
56 class AppConfig(metaclass=MetaSingleton):
57     """ Object representation of the PMSH Application config. """
58
59     def __init__(self):
60         app_config = self._get_config()
61         self.key_path = app_config['config'].get('key_path')
62         self.cert_path = app_config['config'].get('cert_path')
63         self.ca_cert_path = app_config['config'].get('ca_cert_path')
64         self.enable_tls = app_config['config'].get('enable_tls')
65         self.aaf_id = app_config['config'].get('aaf_identity')
66         self.aaf_pass = app_config['config'].get('aaf_password')
67         self.streams_publishes = app_config['config'].get('streams_publishes')
68         self.streams_subscribes = app_config['config'].get('streams_subscribes')
69
70     @staticmethod
71     def get_instance():
72         return AppConfig.get_cls_instance(AppConfig)
73
74     @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
75            retry=retry_if_exception_type(ValueError))
76     def _get_config(self):
77
78         """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
79         is received, it retries after 2 seconds for 5 times before raising an exception.
80
81         Returns:
82             dict: Dictionary representation of the the service configuration
83
84         Raises:
85             Exception: If any error occurred pulling configuration from Config binding service.
86         """
87         try:
88             logger.info('Attempting to fetch PMSH Configuration from CBS.')
89             config = get_all()
90             logger.info(f'Successfully fetched PMSH config from CBS: {config}')
91             return config
92         except Exception as e:
93             logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
94             raise ValueError(e) from e
95
96     @mdc_handler
97     def publish_to_topic(self, mr_topic, event_json, **kwargs):
98         """
99         Publish the event to the DMaaP Message Router topic.
100
101         Args:
102             mr_topic (enum) : Message Router topic to publish.
103             event_json (dict): the json data to be published.
104         """
105         session = requests.Session()
106         topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
107         headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
108                    'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
109         logger.info(f'Publishing event to MR topic: {topic_url}')
110         response = session.post(topic_url, headers=headers,
111                                 auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
112                                 verify=(self.ca_cert_path if self.enable_tls else False))
113         response.raise_for_status()
114
115     @mdc_handler
116     def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
117                        **kwargs):
118         """
119         Returns the json data from the MrTopic.
120
121         Args:
122             mr_topic (enum) : Message Router topic to subscribe.
123             consumer_id (str): Within your subscribers group, a name that uniquely
124             identifies your subscribers process.
125             consumer_group (str): A name that uniquely identifies your subscribers.
126             timeout (int): The request timeout value in mSec.
127
128         Returns:
129             list[str]: the json response from DMaaP Message Router topic.
130         """
131         try:
132             session = requests.Session()
133             topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
134             headers = {'accept': 'application/json', 'content-type': 'application/json',
135                        'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
136             logger.info(f'Fetching messages from MR topic: {topic_url}')
137             response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
138                                    f'?timeout={timeout}',
139                                    auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
140                                    verify=(self.ca_cert_path if self.enable_tls else False))
141             if response.status_code == 503:
142                 logger.error(f'MR Service is unavailable at present: {response.content}')
143             response.raise_for_status()
144             if response.ok:
145                 return response.json()
146         except Exception as e:
147             logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
148             raise