Update INFO.yaml with new PTL
[testsuite/python-testing-utils.git] / robotframework-onap / ONAPLibrary / RequestsHelper.py
1 # Copyright 2019 AT&T Intellectual Property. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from ONAPLibrary.UUIDKeywords import UUIDKeywords
16 from RequestsLibrary import RequestsLibrary
17 import hashlib
18 from ONAPLibrary.Base64Keywords import Base64Keywords
19 from ONAPLibrary.HTTPKeywords import HTTPKeywords
20 from ONAPLibrary.RequestsDecorators import log_wrapped
21 from ONAPLibrary.RequestsDecorators import default_keywords
22
23
class RequestsHelper(object):
    """Non-keyword helper methods built around the Requests library.

    Each public method prepares a session and the ONAP request headers
    through ``_perform_setup`` and then delegates to the matching
    ``RequestsLibrary`` call.
    """

    def __init__(self):
        super(RequestsHelper, self).__init__()
        # Collaborators used by the request methods below.
        self.uuid = UUIDKeywords()
        self.application_id = "robot-ete"
        self.requests = RequestsLibrary()
        self.http = HTTPKeywords()

    @default_keywords
    @log_wrapped
    def get_request(self, **kwargs):
        """Send a GET to ``kwargs['data_path']`` on the session ``kwargs['alias']``.

        NOTE(review): the remaining keys read by ``_perform_setup`` are
        presumably filled in by ``default_keywords`` - confirm in
        RequestsDecorators.
        """
        sender = self.requests.get_request
        alias = kwargs['alias']
        uri = kwargs['data_path']
        # Session/header setup runs only after the required keys were read.
        headers = self._perform_setup(**kwargs)
        return sender(alias=alias, uri=uri, headers=headers)

    @default_keywords
    @log_wrapped
    def post_request(self, **kwargs):
        """Send a POST carrying ``kwargs['data']`` and ``kwargs['files']``.

        The MD5 value derived from ``kwargs['data']`` is stored under
        ``kwargs['md5']`` so that it ends up in the ``Content-MD5`` header.
        """
        kwargs['md5'] = self._format_md5(kwargs['data'])
        sender = self.requests.post_request
        alias = kwargs['alias']
        uri = kwargs['data_path']
        files = kwargs['files']
        data = kwargs['data']
        headers = self._perform_setup(**kwargs)
        return sender(alias=alias, uri=uri, files=files, data=data, headers=headers)

    @default_keywords
    @log_wrapped
    def put_request(self, **kwargs):
        """Send a PUT carrying ``kwargs['data']`` to ``kwargs['data_path']``."""
        sender = self.requests.put_request
        alias = kwargs['alias']
        uri = kwargs['data_path']
        data = kwargs['data']
        headers = self._perform_setup(**kwargs)
        return sender(alias=alias, uri=uri, data=data, headers=headers)

    @default_keywords
    @log_wrapped
    def delete_request(self, **kwargs):
        """Send a DELETE carrying ``kwargs['data']`` to ``kwargs['data_path']``."""
        sender = self.requests.delete_request
        alias = kwargs['alias']
        uri = kwargs['data_path']
        data = kwargs['data']
        headers = self._perform_setup(**kwargs)
        return sender(alias=alias, uri=uri, data=data, headers=headers)

    def _perform_setup(self, **kwargs):
        """Disable HTTP warnings, create the session and return the headers.

        Reads ``alias``, ``endpoint``, ``auth``, ``client_certs``,
        ``sdc_user``, ``accept`` and ``content_type`` from ``kwargs``
        (all required) plus the optional ``md5``.
        """
        self.http.disable_warnings()
        session_keys = ('alias', 'endpoint', 'auth', 'client_certs')
        session_args = {key: kwargs[key] for key in session_keys}
        self._create_session(**session_args)
        return self._create_headers(
            sdc_user_id=kwargs['sdc_user'],
            accept=kwargs['accept'],
            content_type=kwargs['content_type'],
            md5=kwargs.get("md5"),
        )

    def _create_session(self, alias, endpoint, auth=None, client_certs=None):
        """Create a session for ``alias``; client certificates win over ``auth``."""
        if client_certs is None:
            self.requests.create_session(alias, endpoint, auth=auth)
            return
        # With client certificates the ``auth`` argument is not forwarded.
        self.requests.create_client_cert_session(alias, endpoint, client_certs=client_certs)

    def _create_headers(self, sdc_user_id=None, accept="application/json", content_type="application/json", md5=None):
        """Create the headers that are used by onap.

        ``USER_ID`` and ``Content-MD5`` are only included when the
        corresponding argument is not ``None``.
        """
        headers = dict()
        headers["Accept"] = accept
        headers["Content-Type"] = content_type
        headers["X-TransactionId"] = self.uuid.generate_uuid4()
        headers["X-FromAppId"] = self.application_id
        optional_headers = (("USER_ID", sdc_user_id), ("Content-MD5", md5))
        for header_name, header_value in optional_headers:
            if header_value is not None:
                headers[header_name] = header_value
        return headers

    @staticmethod
    def _format_md5(md5_input):
        """Return the encoded MD5 hex digest of ``md5_input``, or ``None``.

        ``str`` input is encoded as UTF-8 first; any other non-``None``
        value is handed to ``hashlib`` unchanged. The hex digest (not the
        raw digest) is what gets passed to ``Base64Keywords.base64_encode``.
        """
        if md5_input is None:
            return None
        payload = md5_input.encode('utf-8') if isinstance(md5_input, str) else md5_input
        # Hash before building the encoder so a bad payload fails first.
        hex_digest = hashlib.md5(payload).hexdigest()
        return Base64Keywords().base64_encode(hex_digest)