1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from RequestsLibrary import RequestsLibrary
15 from robot.api import logger
16 from robot.api.deco import keyword
17 from robot.libraries.BuiltIn import BuiltIn
21 from ONAPLibrary.Base64Keywords import Base64Keywords
22 from ONAPLibrary.UUIDKeywords import UUIDKeywords
class BaseSDCKeywords(object):
    """The main interface for interacting with SDC.

    It handles low level stuff like managing the http request library and the
    required header fields. Every request is sent through a RequestsLibrary
    session registered under the alias "sdc".
    """

    def __init__(self):
        super(BaseSDCKeywords, self).__init__()
        # Used as X-FromAppId and as the prefix of every X-TransactionId.
        self.application_id = "robot-ete"
        self.uuid = UUIDKeywords()
        self.builtin = BuiltIn()

    def run_get_request(self, endpoint, data_path, user, accept="application/json", auth=None):
        """Runs an SDC get request and requires the response status to be 200.

        Returns the response object so callers can inspect the body.
        """
        resp = self.get_request(endpoint, data_path, user, accept, auth)
        self.builtin.should_be_equal_as_strings(resp.status_code, "200")
        return resp

    def run_post_request(self, endpoint, data_path, data, user, accept="application/json", auth=None):
        """Runs an SDC post request and returns the response."""
        return self.post_request(endpoint, data_path, data, user, files=None, accept=accept, auth=auth)

    def run_post_files_request(self, endpoint, data_path, files, user, accept="application/json", auth=None):
        """Runs an SDC post files request and returns the response.

        The payload is sent with a "multipart/form-data" content type.
        """
        # NOTE(review): ``files`` is forwarded as the request *data* and the
        # ``files`` argument of post_request stays None - confirm with the
        # callers that this is the intended way to upload the payload.
        return self.post_request(endpoint, data_path, files, user, files=None, accept=accept,
                                 content_type="multipart/form-data", auth=auth)

    def run_put_request(self, endpoint, data_path, data, user, accept="application/json", auth=None):
        """Runs an SDC put request and returns the response."""
        return self.put_request(endpoint, data_path, data, user, accept, auth)

    def get_request(self, endpoint, data_path, user, accept="application/json", auth=None):
        """Runs an SDC get request and returns the response without checking its status."""
        session = self._open_session(endpoint, auth)
        resp = session.get_request("sdc", data_path, headers=self.create_headers(user, accept))
        logger.info("Received response from sdc " + resp.text)
        return resp

    def create_headers(self, user=None, accept="application/json", content_type="application/json", md5=None):
        """Create the headers that are used by sdc.

        A fresh uuid4 is generated on every call and appended to the
        application id to form X-TransactionId. USER_ID and Content-MD5 are
        only added when ``user`` / ``md5`` are given.
        """
        uuid = self.uuid.generate_uuid4()
        headers = {
            "Accept": accept,
            "Content-Type": content_type,
            "X-TransactionId": self.application_id + "-" + uuid,
            "X-FromAppId": self.application_id
        }
        if user:
            headers["USER_ID"] = user
        if md5:
            headers["Content-MD5"] = md5
        return headers

    def post_request(self, endpoint, data_path, data, user, files=None, accept="application/json",
                     content_type="application/json", auth=None):
        """Runs an SDC post request and returns the response.

        When ``data`` is given, the Content-MD5 header carries the base64
        encoding of the hexadecimal md5 digest of ``data``; with ``data`` set
        to None the header is left out.
        """
        session = self._open_session(endpoint, auth)
        md5_hex = self._md5_hex(data)
        md5checksum = Base64Keywords().base64_encode(md5_hex) if md5_hex is not None else None
        headers = self.create_headers(user, accept=accept, content_type=content_type, md5=md5checksum)
        resp = session.post_request("sdc", data_path, files=files, data=data, headers=headers)
        logger.info("Received response from sdc " + resp.text)
        return resp

    def put_request(self, endpoint, data_path, data, user, accept="application/json", auth=None):
        """Runs an SDC put request and returns the response."""
        session = self._open_session(endpoint, auth)
        resp = session.put_request("sdc", data_path, data=data, headers=self.create_headers(user, accept))
        logger.info("Received response from sdc " + resp.text)
        return resp

    def delete_request(self, endpoint, data_path, data, user, accept="application/json", auth=None):
        """Runs an SDC delete request and returns the response."""
        session = self._open_session(endpoint, auth)
        resp = session.delete_request("sdc", data_path, data=data, headers=self.create_headers(user, accept))
        logger.info("Received response from sdc " + resp.text)
        return resp

    @staticmethod
    def _open_session(endpoint, auth):
        """Register the "sdc" session for ``endpoint`` and return the library instance holding it."""
        logger.info("Creating session" + endpoint)
        # The request must go through the same RequestsLibrary instance that
        # registered the alias, so hand that instance back to the caller.
        session = RequestsLibrary()
        session.create_session("sdc", endpoint, auth=auth)
        return session

    @staticmethod
    def _md5_hex(data):
        """Return the hexadecimal md5 digest of ``data``, or None when ``data`` is None."""
        import hashlib

        if data is None:
            return None
        # hashlib only accepts bytes-like input; text is hashed as UTF-8.
        # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3.
        if isinstance(data, type(u"")):
            data = data.encode("utf-8")
        return hashlib.md5(data).hexdigest()