Merge "CSIT tests update for Synchronous VES collector"
[integration/csit.git] / scripts / dmaap-datarouter / robot_ssl / update_ca.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18
19 import certifi
20 import os
21
22
23 def add_onap_ca_cert():
24     cafile = certifi.where()
25     dir_path = os.path.dirname(os.path.realpath(__file__))
26     datarouter_ca = dir_path + '/onap_ca_cert.pem'
27     with open(datarouter_ca, 'rb') as infile:
28         customca = infile.read()
29
30     with open(cafile, 'ab') as outfile:
31         outfile.write(customca)
32
33     print("Added DR Cert to CA")
34
35
36 def remove_onap_ca_cert():
37     cafile = certifi.where()
38     number_of_lines_to_delete = 40
39     count = 0
40     dr_cert_exists = False
41
42     with open(cafile, 'r+b', buffering=0) as outfile:
43         for line in outfile.readlines()[-36:-35]:
44             if '# Serial: 0x9EAEEDC0A7CEB59D'.encode() in line:
45                 dr_cert_exists = True
46         if dr_cert_exists:
47             outfile.seek(0, os.SEEK_END)
48             end = outfile.tell()
49             while outfile.tell() > 0:
50                 outfile.seek(-1, os.SEEK_CUR)
51                 char = outfile.read(1)
52                 if char == b'\n':
53                     count += 1
54                 if count == number_of_lines_to_delete:
55                     outfile.truncate()
56                     print(
57                         "Removed " + str(number_of_lines_to_delete) + " lines from end of CA File")
58                     exit(0)
59                 outfile.seek(-1, os.SEEK_CUR)
60         else:
61             print("No DR cert in CA File to remove")
62
63     if count < number_of_lines_to_delete + 1:
64         print("Number of lines in file less than number of lines to delete. Exiting...")
65         exit(1)