# Copyright 2018 Intel Corporation, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import requests.exceptions

# urljoin lives in urllib.parse on Python 3 and in urlparse on Python 2.
try:
    from urllib.parse import urljoin
except ImportError:
    from urlparse import urljoin
# Identifier string exposed by this module.
name = "onapsmsclient"
class InvalidRequestException(Exception):
    """Error type for requests that the service rejects as invalid."""
class InternalServerError(Exception):
    """Error type for failures reported from inside the service."""
class UnexpectedError(Exception):
    """Error type for failures not covered by a more specific exception."""
class Client(object):
    """Python Client for Secret Management Service

    Wraps a ``requests`` session and exposes one method per SMS REST
    operation. Every call goes through ``_request``, which raises one of
    InvalidRequestException, InternalServerError or UnexpectedError when
    the server answers with a 4xx/5xx status code.
    """

    def __init__(self, url='http://localhost:10443', timeout=30, cacert=None):
        """Creates a new SMS client instance

        Args:
            url (str): Base URL with port pointing to the SMS service
            timeout (int): Number of seconds before aborting the connection
            cacert (str): Path to the cacert that will be used to verify
                the server. If this is None, verify will be False and the
                server cert is not verified by the client.

        Returns:
            A Client object which can be used to interact with SMS service
        """

        self.base_url = url
        self.timeout = timeout
        self.cacert = cacert
        self.session = requests.Session()

        # Path prefix shared by every SMS endpoint.
        self._base_api_url = '/v1/sms'

    def _urlJoin(self, *urls):
        """Joins given urls into a single url

        Args:
            urls (str): url fragments to be combined.

        Returns:
            str: The fragments joined with a single '/' between them.
        """

        return '/'.join(urls)

    def _raiseException(self, statuscode, errors=None):
        """ Handles Exception Raising based on statusCode

        Args:
            statuscode (int): status code returned by the server
            errors (str): error text returned by the server, if any

        Raises:
            InvalidRequestException: when statuscode is 400.
            InternalServerError: when statuscode is 500.
            UnexpectedError: for every other status code.
        """

        if statuscode == 400:
            raise InvalidRequestException(errors)
        if statuscode == 500:
            raise InternalServerError(errors)

        raise UnexpectedError(errors)

    def _request(self, method, url, headers=None, **kwargs):
        """Handles request for all the client methods

        Args:
            method (str): type of HTTP method (get, post or delete).
            url (str): path of the endpoint, resolved against the base URL.
            headers (dict): custom headers if any. JSON content-type and
                Accept headers are used when this is None.
            **kwargs: various args supported by requests library

        Returns:
            requests.Response: An object containing status_code and returned
                json data is returned here.

        Raises:
            InvalidRequestException, InternalServerError, UnexpectedError:
                when the response status code is in the 400-599 range.
        """

        if headers is None:
            headers = {
                'content-type': "application/json",
                'Accept': "application/json"
            }

        # Verify the server or not based on the cacert argument
        verify = False if self.cacert is None else self.cacert

        url = urljoin(self.base_url, url)
        # Redirects are not followed; the raw response is returned to the
        # caller.
        response = self.session.request(method, url, headers=headers,
                                        allow_redirects=False, verify=verify,
                                        timeout=self.timeout, **kwargs)

        if 400 <= response.status_code < 600:
            # Request Failed. Raise Exception.
            self._raiseException(response.status_code, response.text)

        return response

    def getStatus(self):
        """Returns Status of SMS Service

        Returns:
            bool: True or False depending on if SMS Service is Sealed or not.
                True - Sealed
                False - Unsealed and ready for operations
        """

        url = self._urlJoin(self._base_api_url, 'quorum', 'status')

        response = self._request('get', url)
        return response.json()['sealstatus']

    def createDomain(self, domainName):
        """Creates a Secret Domain

        Args:
            domainName (str): Name of the secret domain to create

        Returns:
            string: UUID of the created domain name
        """

        domainName = domainName.strip()
        data = {"name": domainName}
        url = self._urlJoin(self._base_api_url, 'domain')

        response = self._request('post', url, json=data)
        return response.json()['uuid']

    def deleteDomain(self, domainName):
        """Deletes a Secret Domain

        Args:
            domainName (str): Name of the secret domain to delete

        Returns:
            bool: True. An exception will be raised if delete failed.
        """

        domainName = domainName.strip()
        url = self._urlJoin(self._base_api_url, 'domain', domainName)

        self._request('delete', url)
        return True

    def getSecretNames(self, domainName):
        """Get all Secret Names in Domain

        Args:
            domainName (str): Name of the secret domain

        Returns:
            string[]: List of strings each corresponding to a
                Secret's Name in this Domain.
        """

        domainName = domainName.strip()
        url = self._urlJoin(self._base_api_url, 'domain', domainName,
                            'secret')

        response = self._request('get', url)
        return response.json()['secretnames']

    def storeSecret(self, domainName, secretName, values):
        """Store a Secret in given Domain

        Args:
            domainName (str): Name of the secret domain
            secretName (str): Name for the Secret
            values (dict): A dict containing name-value pairs which
                will form the secret

        Returns:
            bool: True. An exception will be raised if store failed.

        Raises:
            TypeError: when values is not a dict.
        """

        # Reject bad input before doing any other work.
        if not isinstance(values, dict):
            raise TypeError('Input values is not a dictionary')

        domainName = domainName.strip()
        secretName = secretName.strip()
        url = self._urlJoin(self._base_api_url, 'domain', domainName,
                            'secret')

        data = {"name": secretName, "values": values}
        self._request('post', url, json=data)
        return True

    def getSecret(self, domainName, secretName):
        """Get a particular Secret from Domain.

        Args:
            domainName (str): Name of the secret domain
            secretName (str): Name of the secret

        Returns:
            dict: dictionary containing the name-value pairs
                which form the secret
        """

        domainName = domainName.strip()
        secretName = secretName.strip()
        url = self._urlJoin(self._base_api_url, 'domain', domainName,
                            'secret', secretName)

        response = self._request('get', url)
        return response.json()['values']

    def deleteSecret(self, domainName, secretName):
        """Delete a particular Secret from Domain.

        Args:
            domainName (str): Name of the secret domain
            secretName (str): Name of the secret

        Returns:
            bool: True. An exception will be raised if delete failed.
        """

        domainName = domainName.strip()
        secretName = secretName.strip()
        url = self._urlJoin(self._base_api_url, 'domain', domainName,
                            'secret', secretName)

        self._request('delete', url)
        return True