1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from ONAPLibrary.UUIDKeywords import UUIDKeywords
16 from RequestsLibrary import RequestsLibrary
17 from robot.api import logger
19 from ONAPLibrary.Base64Keywords import Base64Keywords
22 class RequestsHelper(object):
23 """ non keyword methods related to Requests library """
26 super(RequestsHelper, self).__init__()
27 self.uuid = UUIDKeywords()
28 self.application_id = "robot-ete"
29 self.requests = RequestsLibrary()
31 def create_headers(self, sdc_user_id=None, accept="application/json", content_type="application/json", md5=None):
32 """Create the headers that are used by onap"""
33 uuid = self.uuid.generate_uuid4()
36 "Content-Type": content_type,
37 "X-TransactionId": self.application_id + "-" + uuid,
38 "X-FromAppId": self.application_id
40 if sdc_user_id is not None:
41 headers["USER_ID"] = sdc_user_id
43 headers["Content-MD5"] = md5
46 def get_request(self, alias, endpoint, data_path, sdc_user=None, accept="application/json", auth=None):
47 """Runs a get request"""
48 logger.info("Creating session" + endpoint)
49 self.requests.create_session(alias, endpoint, auth=auth)
50 headers = self.create_headers(sdc_user_id=sdc_user, accept=accept)
51 resp = self.requests.get_request(alias, data_path, headers=headers)
52 logger.info("Received response from [" + alias + "]: " + resp.text)
55 def post_request(self, alias, endpoint, data_path, data, sdc_user=None, files=None, accept="application/json",
56 content_type="application/json", auth=None):
57 """Runs a post request"""
58 logger.info("Creating session" + endpoint)
62 md5checksum = Base64Keywords().base64_encode(md5.hexdigest())
65 self.requests.create_session(alias, endpoint, auth=auth)
66 headers = self.create_headers(sdc_user_id=sdc_user, accept=accept, content_type=content_type, md5=md5checksum)
67 resp = self.requests.post_request(alias, data_path, files=files, data=data, headers=headers)
68 logger.info("Received response from [" + alias + "]: " + resp.text)
71 def put_request(self, alias, endpoint, data_path, data, sdc_user=None, accept="application/json", auth=None):
72 """Runs a put request"""
73 logger.info("Creating session" + endpoint)
74 self.requests.create_session(alias, endpoint, auth=auth)
75 headers = self.create_headers(sdc_user_id=sdc_user, accept=accept)
76 resp = self.requests.put_request(alias, data_path, data=data, headers=headers)
77 logger.info("Received response from [" + alias + "]: " + resp.text)
80 def delete_request(self, alias, endpoint, data_path, data=None, sdc_user=None, accept="application/json", auth=None):
81 """Runs a delete request"""
82 logger.info("Creating session" + endpoint)
83 self.requests.create_session(alias, endpoint, auth=auth)
84 headers = self.create_headers(sdc_user_id=sdc_user, accept=accept)
85 resp = self.requests.delete_request(alias, data_path, data=data, headers=headers)
86 logger.info("Received response from [" + alias + "]: " + resp.text)