1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from ONAPLibrary.UUIDKeywords import UUIDKeywords
16 from RequestsLibrary import RequestsLibrary
17 from robot.api import logger
19 from ONAPLibrary.Base64Keywords import Base64Keywords
20 from ONAPLibrary.HTTPKeywords import HTTPKeywords
class RequestsHelper(object):
    """Non-keyword helper methods wrapping the Requests library.

    Each request method creates a fresh session under ``alias`` for
    ``endpoint``, builds the ONAP headers, issues the call through
    ``RequestsLibrary``, logs the response text and returns the response.
    """

    def __init__(self):
        super(RequestsHelper, self).__init__()
        self.uuid = UUIDKeywords()
        # Sent as X-FromAppId and used as the X-TransactionId prefix.
        self.application_id = "robot-ete"
        self.requests = RequestsLibrary()
        self.http = HTTPKeywords()

    def get_request(self, alias, endpoint, data_path, sdc_user=None, accept="application/json", auth=None,
                    client_certs=None):
        """Runs a get request

        :param alias: session alias registered with RequestsLibrary
        :param endpoint: base URL the session is created for
        :param data_path: path passed to the get call
        :param sdc_user: optional value for the ``USER_ID`` header
        :param accept: value for the ``Accept`` header
        :param auth: credentials forwarded to the plain session
        :param client_certs: when not None, a client-cert session is used
        :return: the response object from RequestsLibrary
        """
        # NOTE(review): ``client_certs=None`` is reconstructed from the
        # sibling request methods; the body already forwards it.
        self.http.disable_warnings()
        logger.info("Creating session" + endpoint)
        self._create_session(alias, endpoint, auth=auth, client_certs=client_certs)
        headers = self._create_headers(sdc_user_id=sdc_user, accept=accept)
        resp = self.requests.get_request(alias, data_path, headers=headers)
        logger.info("Received response from [" + alias + "]: " + resp.text)
        return resp

    def post_request(self, alias, endpoint, data_path, data, sdc_user=None, files=None, accept="application/json",
                     content_type="application/json", auth=None, client_certs=None):
        """Runs a post request

        When ``data`` is not None, an MD5 checksum of it is sent in the
        ``Content-MD5`` header.

        :param alias: session alias registered with RequestsLibrary
        :param endpoint: base URL the session is created for
        :param data_path: path passed to the post call
        :param data: request body; text is UTF-8 encoded before hashing
        :param sdc_user: optional value for the ``USER_ID`` header
        :param files: forwarded unchanged to the post call
        :param accept: value for the ``Accept`` header
        :param content_type: value for the ``Content-Type`` header
        :param auth: credentials forwarded to the plain session
        :param client_certs: when not None, a client-cert session is used
        :return: the response object from RequestsLibrary
        """
        # Local import keeps this method self-contained regardless of the
        # module's import block.
        import hashlib

        self.http.disable_warnings()
        logger.info("Creating session" + endpoint)
        if data is not None:
            # hashlib only accepts bytes-like input, so encode text first.
            # NOTE(review): the digest construction is reconstructed;
            # confirm the UTF-8 choice against the consuming service.
            payload = data if isinstance(data, (bytes, bytearray)) else data.encode("utf-8")
            md5 = hashlib.md5(payload)
            # The *hex* digest (not the raw digest bytes) is base64-encoded.
            md5checksum = Base64Keywords().base64_encode(md5.hexdigest())
        else:
            md5checksum = None
        self._create_session(alias, endpoint, auth=auth, client_certs=client_certs)
        headers = self._create_headers(sdc_user_id=sdc_user, accept=accept, content_type=content_type, md5=md5checksum)
        resp = self.requests.post_request(alias, data_path, files=files, data=data, headers=headers)
        logger.info("Received response from [" + alias + "]: " + resp.text)
        return resp

    def put_request(self, alias, endpoint, data_path, data, sdc_user=None, accept="application/json",
                    auth=None, client_certs=None):
        """Runs a put request

        :param alias: session alias registered with RequestsLibrary
        :param endpoint: base URL the session is created for
        :param data_path: path passed to the put call
        :param data: request body forwarded to the put call
        :param sdc_user: optional value for the ``USER_ID`` header
        :param accept: value for the ``Accept`` header
        :param auth: credentials forwarded to the plain session
        :param client_certs: when not None, a client-cert session is used
        :return: the response object from RequestsLibrary
        """
        self.http.disable_warnings()
        logger.info("Creating session" + endpoint)
        self._create_session(alias, endpoint, auth=auth, client_certs=client_certs)
        headers = self._create_headers(sdc_user_id=sdc_user, accept=accept)
        resp = self.requests.put_request(alias, data_path, data=data, headers=headers)
        logger.info("Received response from [" + alias + "]: " + resp.text)
        return resp

    def delete_request(self, alias, endpoint, data_path, data=None, sdc_user=None, accept="application/json",
                       auth=None, client_certs=None):
        """Runs a delete request

        :param alias: session alias registered with RequestsLibrary
        :param endpoint: base URL the session is created for
        :param data_path: path passed to the delete call
        :param data: optional request body forwarded to the delete call
        :param sdc_user: optional value for the ``USER_ID`` header
        :param accept: value for the ``Accept`` header
        :param auth: credentials forwarded to the plain session
        :param client_certs: when not None, a client-cert session is used
        :return: the response object from RequestsLibrary
        """
        self.http.disable_warnings()
        logger.info("Creating session" + endpoint)
        self._create_session(alias, endpoint, auth=auth, client_certs=client_certs)
        headers = self._create_headers(sdc_user_id=sdc_user, accept=accept)
        resp = self.requests.delete_request(alias, data_path, data=data, headers=headers)
        logger.info("Received response from [" + alias + "]: " + resp.text)
        return resp

    def _create_session(self, alias, endpoint, auth=None, client_certs=None):
        """Register a session for ``alias``: client-cert based when
        ``client_certs`` is given, otherwise a plain session using ``auth``.
        """
        if client_certs is not None:
            self.requests.create_client_cert_session(alias, endpoint, client_certs=client_certs)
        else:
            # Only one session per call; creating the plain session as well
            # would re-register the same alias.
            self.requests.create_session(alias, endpoint, auth=auth)

    def _create_headers(self, sdc_user_id=None, accept="application/json", content_type="application/json", md5=None):
        """Create the headers that are used by onap

        :param sdc_user_id: when not None, sent as ``USER_ID``
        :param accept: value for the ``Accept`` header
        :param content_type: value for the ``Content-Type`` header
        :param md5: when not None, sent as ``Content-MD5``
        :return: dict of header name to value
        """
        uuid = self.uuid.generate_uuid4()
        headers = {
            # NOTE(review): the "Accept" entry is reconstructed; ``accept``
            # has no other use in this method.
            "Accept": accept,
            "Content-Type": content_type,
            "X-TransactionId": self.application_id + "-" + uuid,
            "X-FromAppId": self.application_id
        }
        if sdc_user_id is not None:
            headers["USER_ID"] = sdc_user_id
        # Skip the checksum header entirely rather than sending a None value.
        if md5 is not None:
            headers["Content-MD5"] = md5
        return headers