ETSI SOL001 v2.5.1 model types not deployed in upgrade
[sdc.git] / catalog-be / src / main / resources / scripts / sdcBePy / tosca / models / model_import_manager.py
1 # ============LICENSE_START=======================================================
2 #  Copyright (C) 2021 Nordix Foundation
3 #  ===============================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #        http://www.apache.org/licenses/LICENSE-2.0
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14 #
15 #  SPDX-License-Identifier: Apache-2.0
16 #  ============LICENSE_END========================================================
17
18 import json
19 import os
20 import zipfile
21 from sdcBePy.tosca.models.normativeTypeCandidate import NormativeTypeCandidate
22 from pathlib import Path
23
24
25 class ModelImportManager:
26     """Manages the model imports directory"""
27
28     INIT_FOLDER_NAME = 'init'
29     UPGRADE_FOLDER_NAME = 'upgrade'
30     IMPORTS_FOLDER_NAME = 'imports'
31     ACTION_UPGRADE = 'upgrade'
32     ACTION_INIT = 'init'
33     TYPES_FOLDER = 'tosca'
34     NODE_FOLDER = 'node-types'
35
36     def __init__(self, model_imports_path, model_client):
37         self.__model_base_path = model_imports_path
38         self.__model_init_path = self.__model_base_path / self.INIT_FOLDER_NAME
39         self.__model_upgrade_path = self.__model_base_path / self.UPGRADE_FOLDER_NAME
40         self.__model_client = model_client
41
42     def deploy_models(self):
43         existing_models = self.__model_client.get_model_list()
44         for model_folder_name in self.__get_model_init_list():
45             model_payload_dict = self.__read_model_payload(model_folder_name, self.ACTION_INIT)
46             if (not existing_models or not any(m for m in existing_models if model_payload_dict['name'] == m['name'])):
47                 self.__create_models(model_folder_name, model_payload_dict)
48
49         for model_folder_name in self.__get_model_upgrade_list():
50             model_payload_dict = self.__read_model_payload(model_folder_name, self.ACTION_UPGRADE)
51             if (existing_models and any(m for m in existing_models if model_payload_dict['name'] == m['name'])):
52                 self.__update_models(model_folder_name, model_payload_dict)
53
54     def __create_models(self, model_folder_name, model_payload_dict):
55         model_imports_zip_path = self.__zip_model_imports(model_folder_name, self.ACTION_INIT)
56         self.__model_client.create_model(model_payload_dict, model_imports_zip_path)
57         self.__init_model_non_node_types(model_folder_name, model_payload_dict)
58         self.__init_model_node_types(model_folder_name, model_payload_dict)
59         self.__init_model_non_node_types(model_folder_name, model_payload_dict, True);
60
61     def __update_models(self, model_folder_name, model_payload_dict):
62         model_imports_zip_path = self.__zip_model_imports(model_folder_name, self.ACTION_UPGRADE)
63         self.__model_client.update_model_imports(model_payload_dict, model_imports_zip_path)
64         self.__upgrade_model_non_node_types(model_folder_name, model_payload_dict)
65         self.__upgrade_model_node_types(model_folder_name, model_payload_dict)
66         self.__upgrade_model_non_node_types(model_folder_name, model_payload_dict, True)
67
68     def __get_model_init_list(self):
69         return self.__get_model_list(self.__model_init_path)
70
71     def __get_model_upgrade_list(self):
72         return self.__get_model_list(self.__model_upgrade_path)
73
74     @staticmethod
75     def __get_model_list(path):
76         model_list = []
77         for (dirpath, dirnames, filenames) in os.walk(path):
78             model_list.extend(dirnames)
79             break
80         return model_list
81
82     def __get_node_type_list(self, path):
83         return [NormativeTypeCandidate(str(os.path.join(path, '')), self.__read_model_type_json(path))]
84
85     def __zip_model_imports(self, model, action_type) -> Path:
86         base_path = self.__get_base_action_path(action_type)
87         model_path = base_path / model
88         model_imports_path = base_path / model / self.IMPORTS_FOLDER_NAME
89         zip_file_path = model_path / "{}.zip".format(model)
90         zip_file = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
91         for root, dirs, files in os.walk(model_imports_path):
92             for file in files:
93                 zip_file.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), model_imports_path))
94         zip_file.close()
95         return zip_file_path
96
97     def __read_model_payload_as_string(self, model, action_type) -> str:
98         base_path = self.__get_base_action_path(action_type)
99         model_payload_path = base_path / model / "payload.json"
100         json_file = open(model_payload_path)
101         json_data = json.load(json_file, strict=False)
102         return json.dumps(json_data)
103
104     def __read_model_type_json(self, tosca_path):
105         path = tosca_path / "types.json"
106         if not os.path.isfile(path):
107             return []
108         json_file = open(path)
109         return json.load(json_file)
110
111     def __read_model_payload(self, model, action_type) -> dict:
112         base_path = self.__get_base_action_path(action_type)
113         model_payload_path = base_path / model / "payload.json"
114         json_file = open(model_payload_path)
115         return json.load(json_file, strict=False)
116
117     def __get_base_action_path(self, action_type) -> Path:
118         return self.__model_init_path if action_type == self.INIT_FOLDER_NAME else self.__model_upgrade_path
119
120     def __get_tosca_path(self, action, model):
121         return self.__get_base_action_path(action) / model / self.TYPES_FOLDER
122
123     def __init_model_non_node_types(self, model, model_payload_dict, with_metadata=False):
124         path = self.__get_tosca_path(self.ACTION_INIT, model)
125         if os.path.isdir(path):
126             self.__model_client.import_model_elements(model_payload_dict, str(os.path.join(path, '')) , with_metadata)
127
128     def __upgrade_model_non_node_types(self, model, model_payload_dict, with_metadata=False):
129         path = self.__get_tosca_path(self.ACTION_UPGRADE, model)
130         if os.path.isdir(path):
131             self.__model_client.import_model_elements(model_payload_dict, str(os.path.join(path, '')), with_metadata)
132
133     def __init_model_node_types(self, model, model_payload_dict, upgrade=False):
134         path = self.__get_tosca_path(self.ACTION_INIT, model) / self.NODE_FOLDER
135         if os.path.isdir(path):
136             self.__model_client.import_model_types(model_payload_dict, self.__get_node_type_list(path), upgrade)
137
138     def __upgrade_model_node_types(self, model, model_payload_dict, upgrade=True):
139         path = self.__get_tosca_path(self.ACTION_UPGRADE, model) / self.NODE_FOLDER
140         if os.path.isdir(path):
141             self.__model_client.import_model_types(model_payload_dict, self.__get_node_type_list(path), upgrade)