1 # http://www.apache.org/licenses/LICENSE-2.0
2 """CPS onboard module."""
7 from kubernetes import client, config
8 from onapsdk.configuration import settings
9 from onapsdk.cps import Anchor, Dataspace, SchemaSet
11 from onaptests.utils.exceptions import EnvironmentPreparationException
13 from ..base import BaseStep
class CpsBaseStep(BaseStep, ABC):
    """Abstract CPS base step."""

    # NOTE(review): the excerpt skips source lines 18-19 above this def (likely
    # a blank line and a decorator) and lines 21-22 that hold its body, so only
    # the signature is reproduced here — restore the body from the full file.
    def component(self) -> str:
        """Component name (body not visible in the reviewed excerpt)."""
class CreateCpsDataspaceStep(CpsBaseStep):
    """Step to create a dataspace."""

    def __init__(self) -> None:
        """Initialize step."""
        super().__init__(cleanup=settings.CLEANUP_FLAG)

    # NOTE(review): the excerpt skips source line 32 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Create CPS dataspace."

    # Decorator restored for consistency with the execute() methods of the
    # sibling CPS steps; the excerpt skips source line 37 where it would sit —
    # confirm against the full file.
    @BaseStep.store_state
    def execute(self) -> None:
        """Create a dataspace.

        Use settings values:
         - DATASPACE_NAME.

        """
        # NOTE(review): source lines 40-45 are elided in the excerpt; any
        # statement that preceded the call below is not visible.
        Dataspace.create(settings.DATASPACE_NAME)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete created dataspace."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        # NOTE(review): the excerpt skips source lines 52-53 here, so the
        # statements that act on ``dataspace`` are not visible — restore them
        # from the full file.
class CreateCpsSchemaSetStep(CpsBaseStep):
    """Step to check schema-set creation."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - CreateCpsDataspaceStep.
        """
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.add_step(CreateCpsDataspaceStep())

    # NOTE(review): the excerpt skips source line 68 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Create CPS bookstore schema set"

    # Decorator restored for consistency with the execute() methods of the
    # sibling CPS steps; the excerpt skips source line 73 where it would sit —
    # confirm against the full file.
    @BaseStep.store_state
    def execute(self) -> None:
        """Get dataspace created on substep and create schema-set.

        Use settings values:
         - DATASPACE_NAME,
         - SCHEMA_SET_NAME,
         - SCHEMA_SET_FILE.

        """
        # NOTE(review): source lines 76-82 are elided in the excerpt; any
        # statement that preceded the lines below is not visible.
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        # Binary mode: the file object is handed to the SDK as-is.
        with settings.SCHEMA_SET_FILE.open("rb") as schema_set_file:
            dataspace.create_schema_set(settings.SCHEMA_SET_NAME, schema_set_file)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete created schema-set."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        schema_set = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
        # NOTE(review): the excerpt skips source lines 92-93 here, so the
        # statements that act on ``schema_set`` are not visible — restore them
        # from the full file.
class CreateCpsAnchorStep(CpsBaseStep):
    """Step to create an anchor."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - CreateCpsSchemaSetStep.
        """
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.add_step(CreateCpsSchemaSetStep())

    # NOTE(review): the excerpt skips source line 108 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Create CPS anchor"

    @BaseStep.store_state
    def execute(self) -> None:
        """Create anchor.

        Get dataspace and schema-set created substeps and create anchor.

        Use settings values:
         - DATASPACE_NAME,
         - SCHEMA_SET_NAME,
         - ANCHOR_NAME.

        """
        # NOTE(review): source lines 118-125 are elided in the excerpt; any
        # statement that preceded the lines below is not visible.
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        schema_set: SchemaSet = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
        dataspace.create_anchor(schema_set, settings.ANCHOR_NAME)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete an anchor."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        # NOTE(review): the excerpt skips source lines 135-136 here, so the
        # statements that act on ``anchor`` are not visible — restore them
        # from the full file.
class CreateCpsAnchorNodeStep(CpsBaseStep):
    """Step to create anchor node."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - CreateCpsAnchorStep.
        """
        super().__init__(cleanup=settings.CLEANUP_FLAG)
        self.add_step(CreateCpsAnchorStep())

    # NOTE(review): the excerpt skips source line 151 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Create CPS anchor node"

    @BaseStep.store_state
    def execute(self) -> None:
        """Create a node on an anchor created on substep.

        Use settings values:
         - DATASPACE_NAME,
         - ANCHOR_NAME,
         - ANCHOR_DATA.

        """
        # NOTE(review): source lines 159-166 are elided in the excerpt; any
        # statement that preceded the lines below is not visible.
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.create_node(settings.ANCHOR_DATA)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete the nodes under the root path of the anchor."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.delete_nodes("/")
        # NOTE(review): the excerpt skips source line 177 here; a trailing
        # statement may be missing — confirm against the full file.
class UpdateCpsAnchorNodeStep(CpsBaseStep):
    """Step to update node on anchor creation."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - CreateCpsAnchorNodeStep.
        """
        super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
        self.add_step(CreateCpsAnchorNodeStep())

    # NOTE(review): the excerpt skips source line 192 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Update CPS anchor node"

    @BaseStep.store_state
    def execute(self) -> None:
        """Update a node on an anchor created on substep.

        Use settings values:
         - DATASPACE_NAME,
         - ANCHOR_NAME,
         - ANCHOR_DATA_2.

        """
        # NOTE(review): source lines 200-207 are elided in the excerpt; any
        # statement that preceded the lines below is not visible.
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.update_node("/", settings.ANCHOR_DATA_2)
class QueryCpsAnchorNodeStep(CpsBaseStep):
    """Step to check query on node."""

    def __init__(self) -> None:
        """Initialize step.

        Substeps:
            - UpdateCpsAnchorNodeStep.
        """
        super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
        self.add_step(UpdateCpsAnchorNodeStep())

    # NOTE(review): the excerpt skips source line 226 directly above this def
    # (likely a decorator such as @property) and line 229, which presumably
    # holds the return statement — restore both from the full file.
    def description(self) -> str:
        """Step description."""

    @BaseStep.store_state
    def execute(self) -> None:
        """Query on node on an anchor created on substep.

        Use settings values:
         - DATASPACE_NAME,
         - ANCHOR_NAME,
         - QUERY_1,
         - QUERY_2,
         - QUERY_3.

        """
        # NOTE(review): source lines 234-243 are elided in the excerpt; any
        # statement that preceded the lines below is not visible.
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.query_node(settings.QUERY_1)
        anchor.query_node(settings.QUERY_2)
        anchor.query_node(settings.QUERY_3)
class CheckPostgressDataBaseConnectionStep(CpsBaseStep):
    """Step to test connection with postgress."""

    def __init__(self) -> None:
        """Initialize step."""
        super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
        # Declared here so the attributes always exist; they are filled in by
        # get_database_credentials().
        self.login = None
        self.password = None

    # NOTE(review): the excerpt skips source line 258 directly above this def
    # (likely a decorator such as @property) — confirm against the full file.
    def description(self) -> str:
        """Step description."""
        return "Establish connection with Postgress and execute the query"

    def get_database_credentials(self) -> None:
        """Read the database login and password from a Kubernetes secret.

        Loads the kube config, reads the secret ``settings.SECRET_NAME`` from
        the namespace ``settings.K8S_ONAP_NAMESPACE`` and stores the
        base64-decoded values found under the ``settings.DB_LOGIN`` and
        ``settings.DB_PASSWORD`` keys in ``self.login`` and ``self.password``.

        Raises:
            EnvironmentPreparationException: if the secret cannot be read,
                holds no data, or lacks the login or the password key.
        """
        config.load_kube_config()
        api_instance = client.CoreV1Api()
        try:
            secret = api_instance.read_namespaced_secret(
                settings.SECRET_NAME, settings.K8S_ONAP_NAMESPACE)
        except client.rest.ApiException as e:
            # NOTE(review): the excerpt skips source lines 281-282 inside this
            # handler; whatever they did before the raise is not visible.
            raise EnvironmentPreparationException("Error accessing secret") from e

        # NOTE(review): the outer condition (source line 269) is elided in the
        # excerpt; "no data" is inferred from the error message — confirm.
        if not secret.data:
            raise EnvironmentPreparationException("Secret data not found in secret")
        if settings.DB_LOGIN not in secret.data or settings.DB_PASSWORD not in secret.data:
            raise EnvironmentPreparationException(
                "Login key or password key not found in secret")

        self.login = base64.b64decode(secret.data[settings.DB_LOGIN]).decode("utf-8")
        self.password = base64.b64decode(secret.data[settings.DB_PASSWORD]).decode("utf-8")

    def connect_to_postgress(self) -> None:
        """Connect to the database and run a one-row probe query.

        Fetches the credentials first. When either credential is empty the
        check is skipped, as in the original code. The cursor and the
        connection are always closed, also when the query fails.

        Raises:
            EnvironmentPreparationException: propagated from
                get_database_credentials().
            pg8000.Error: if connecting or executing the query fails; the
                error is logged and re-raised.
        """
        self.get_database_credentials()
        if not (self.login and self.password):
            return

        db_params = {
            # The "user" entry sits on a source line the excerpt skips; it is
            # restored from self.login because pg8000.connect() requires it.
            "user": self.login,
            "password": self.password,
            "host": settings.DB_PRIMARY_HOST,
            "database": settings.DATABASE,
            "port": settings.DB_PORT,
        }
        connection = None
        try:
            connection = pg8000.connect(**db_params)
            cursor = connection.cursor()
            try:
                cursor.execute("SELECT * FROM yang_resource LIMIT 1;")
            finally:
                cursor.close()
        except pg8000.Error as e:
            self._logger.exception("Error while connecting to PostgreSQL: %s", e)
            # Re-raise so a failed database check cannot be reported as a
            # successful step.
            raise
        finally:
            if connection is not None:
                connection.close()

    @BaseStep.store_state
    def execute(self) -> None:
        """Establish connection with Postgress and execute the query.

        Use settings values:
         - SECRET_NAME,
         - K8S_ONAP_NAMESPACE,
         - DB_LOGIN,
         - DB_PASSWORD,
         - DB_PRIMARY_HOST,
         - DATABASE,
         - DB_PORT.

        """
        # NOTE(review): source lines 311-322 are mostly elided in the excerpt;
        # any statement that preceded the call below is not visible.
        self.connect_to_postgress()