2 # -------------------------------------------------------------------------
3 # Copyright (c) 2018 Intel Corporation Intellectual Property
4 # Copyright (C) 2020 Wipro Limited.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # -------------------------------------------------------------------------
21 '''Secret Management Service Integration'''
23 from onapsmsclient import Client
24 import osdf.config.base as cfg_base
25 import osdf.config.credentials as creds
26 import osdf.config.loader as config_loader
27 from osdf.config.base import osdf_config
28 from osdf.logging.osdf_logging import debug_log
29 from osdf.utils import cipherUtils
32 "preload_secrets": "config/preload_secrets.yaml"
36 def preload_secrets():
37 """ This is intended to load the secrets required for testing Application
38 Actual deployment will have a preload script. Make sure the config is
40 preload_config = config_loader.load_config_file(
41 config_spec.get("preload_secrets"))
42 domain = preload_config.get("domain")
43 config = osdf_config.deployment
44 sms_url = config["aaf_sms_url"]
45 timeout = config["aaf_sms_timeout"]
46 cacert = config["aaf_ca_certs"]
47 sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert)
48 domain_uuid = sms_client.createDomain(domain)
50 "Created domain {} with uuid {}".format(domain, domain_uuid))
51 secrets = preload_config.get("secrets")
52 for secret in secrets:
53 sms_client.storeSecret(domain, secret.get('name'),
55 debug_log.debug("Preload secrets complete")
58 def retrieve_secrets():
59 """Get all secrets under the domain name"""
61 config = osdf_config.deployment
62 sms_url = config["aaf_sms_url"]
63 timeout = config["aaf_sms_timeout"]
64 cacert = config["aaf_ca_certs"]
65 domain = config["secret_domain"]
66 sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert)
67 secrets = sms_client.getSecretNames(domain)
68 for secret in secrets:
69 values = sms_client.getSecret(domain, secret)
70 secret_dict[secret] = values
71 debug_log.debug("Secret Dictionary Retrieval Success")
75 config = osdf_config.deployment
76 secret_dict = retrieve_secrets()
77 config['soUsername'] = secret_dict['so']['UserName']
78 config['soPassword'] = decrypt_pass(secret_dict['so']['Password'])
79 config['conductorUsername'] = secret_dict['conductor']['UserName']
80 config['conductorPassword'] = decrypt_pass(secret_dict['conductor']['Password'])
81 config['policyPlatformUsername'] = secret_dict['policyPlatform']['UserName']
82 config['policyPlatformPassword'] = decrypt_pass(secret_dict['policyPlatform']['Password'])
83 config['policyClientUsername'] = secret_dict['policyPlatform']['UserName']
84 config['policyClientPassword'] = decrypt_pass(secret_dict['policyPlatform']['Password'])
85 config['messageReaderAafUserId'] = secret_dict['dmaap']['UserName']
86 config['messageReaderAafPassword'] = decrypt_pass(secret_dict['dmaap']['Password'])
87 config['sdcUsername'] = secret_dict['sdc']['UserName']
88 config['sdcPassword'] = decrypt_pass(secret_dict['sdc']['Password'])
89 config['osdfPlacementUsername'] = secret_dict['osdfPlacement']['UserName']
90 config['osdfPlacementPassword'] = decrypt_pass(secret_dict['osdfPlacement']['Password'])
91 config['osdfPlacementSOUsername'] = secret_dict['osdfPlacementSO']['UserName']
92 config['osdfPlacementSOPassword'] = decrypt_pass(secret_dict['osdfPlacementSO']['Password'])
93 config['osdfPlacementVFCUsername'] = secret_dict['osdfPlacementVFC']['UserName']
94 config['osdfPlacementVFCPassword'] = decrypt_pass(secret_dict['osdfPlacementVFC']['Password'])
95 config['osdfCMSchedulerUsername'] = secret_dict['osdfCMScheduler']['UserName']
96 config['osdfCMSchedulerPassword'] = decrypt_pass(secret_dict['osdfCMScheduler']['Password'])
97 config['configDbUserName'] = secret_dict['configDb']['UserName']
98 config['configDbPassword'] = decrypt_pass(secret_dict['configDb']['Password'])
99 config['pciHMSUsername'] = secret_dict['pciHMS']['UserName']
100 config['pciHMSPassword'] = decrypt_pass(secret_dict['pciHMS']['Password'])
101 config['osdfPCIOptUsername'] = secret_dict['osdfPCIOpt']['UserName']
102 config['osdfPCIOptPassword'] = decrypt_pass(secret_dict['osdfPCIOpt']['Password'])
103 cfg_base.http_basic_auth_credentials = creds.load_credentials(osdf_config)
104 cfg_base.dmaap_creds = creds.dmaap_creds()
107 def decrypt_pass(passwd):
108 if passwd == '' or passwd == 'NA':
111 return cipherUtils.AESCipher.get_instance().decrypt(passwd)
114 def delete_secrets():
115 """ This is intended to delete the secrets for a clean initialization for
116 testing Application. Actual deployment will have a preload script.
117 Make sure the config is in sync"""
118 config = osdf_config.deployment
119 sms_url = config["aaf_sms_url"]
120 timeout = config["aaf_sms_timeout"]
121 cacert = config["aaf_ca_certs"]
122 domain = config["secret_domain"]
123 sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert)
124 ret_val = sms_client.deleteDomain(domain)
125 debug_log.debug("Clean up complete")
129 if __name__ == "__main__":
130 # Initialize Secrets from SMS
133 # Retrieve Secrets from SMS and load to secret cache
134 # Use the secret_cache instead of config files
135 secret_cache = retrieve_secrets()
137 # Clean up Delete secrets and domain