#!/usr/bin/env python
"""Basic CPS test case."""
-from onaptests.scenario.scenario_base import ScenarioBase
-from onaptests.steps.onboard.cps import CreateCpsAnchorNodeStep
+from onaptests.scenario.scenario_base import BaseStep, ScenarioBase
+from onaptests.steps.onboard.cps import (CheckPostgressDataBaseConnectionStep,
+ QueryCpsAnchorNodeStep)
+
+
+class BasicCpsStep(BaseStep):
+ def __init__(self):
+ """Initialize step.
+
+ Substeps:
+ - QueryCpsAnchorNodeStep
+ - CheckPostgressDataBaseConnectionStep.
+ """
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+ self.add_step(QueryCpsAnchorNodeStep())
+ self.add_step(CheckPostgressDataBaseConnectionStep())
+
+ @property
+ def description(self) -> str:
+ """Step description.
+
+ Used for reports
+
+ Returns:
+ str: Step description
+
+ """
+ return "Basic CPS scenario step"
+
+ @property
+ def component(self) -> str:
+ """Component name.
+
+ Name of component which step is related with.
+ Most is the name of ONAP component.
+
+ Returns:
+ str: Component name
+
+ """
+ return "TEST"
class BasicCps(ScenarioBase):
- dataspace
- schemaset
- anchor
- And create a node. Use bookstore YANG model (available on CPS repository
+ - node
+ Update and Query on node
+ And check PostgreSQL connection. Use bookstore YANG model (available on CPS repository
https://github.com/onap/cps/blob/master/cps-service/src/test/resources/bookstore.yang).
At the end delete what's available to be deleted.
def __init__(self, **kwargs):
"""Init Basic CPS."""
super().__init__('basic_cps', **kwargs)
- self.test = CreateCpsAnchorNodeStep()
+ self.test = BasicCpsStep()
# http://www.apache.org/licenses/LICENSE-2.0
"""CPS onboard module."""
+import base64
from abc import ABC
+import pg8000
+from kubernetes import client, config
from onapsdk.configuration import settings
from onapsdk.cps import Anchor, Dataspace, SchemaSet
+from onaptests.utils.exceptions import EnvironmentPreparationException
+
from ..base import BaseStep
class CreateCpsAnchorNodeStep(CpsBaseStep):
- """Step to check node on anchor creation."""
+ """Step to create anchor node."""
def __init__(self) -> None:
"""Initialize step.
anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
anchor.delete_nodes("/")
super().cleanup()
+
+
+class UpdateCpsAnchorNodeStep(CpsBaseStep):
+ """Step to update node on anchor creation."""
+
+ def __init__(self) -> None:
+ """Initialize step.
+
+ Substeps:
+ - CreateCpsAnchorNodeStep.
+ """
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+ self.add_step(CreateCpsAnchorNodeStep())
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Update CPS anchor node"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Update a node on an anchor created on substep.
+
+ Use settings values:
+ - DATASPACE_NAME,
+ - ANCHOR_NAME,
+ - ANCHOR_DATA_2.
+
+ """
+ super().execute()
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
+ anchor.update_node("/", settings.ANCHOR_DATA_2)
+
+
+class QueryCpsAnchorNodeStep(CpsBaseStep):
+ """Step to check query on node."""
+
+ def __init__(self) -> None:
+ """Initialize step.
+
+ Substeps:
+ - UpdateCpsAnchorNodeStep.
+
+ """
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+ self.add_step(UpdateCpsAnchorNodeStep())
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Query node"
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Query on node on an anchor created on substep.
+
+ Use settings values:
+ - DATASPACE_NAME,
+ - ANCHOR_NAME,
+ - QUERY_1,
+ - QUERY_2,
+ - QUERY_3.
+
+ """
+ super().execute()
+ dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
+ anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
+ anchor.query_node(settings.QUERY_1)
+ anchor.query_node(settings.QUERY_2)
+ anchor.query_node(settings.QUERY_3)
+
+
+class CheckPostgressDataBaseConnectionStep(CpsBaseStep):
+ """Step to test connection with postgress."""
+
+ def __init__(self) -> None:
+ """Initialize step."""
+ super().__init__(cleanup=BaseStep.HAS_NO_CLEANUP)
+
+ @property
+ def description(self) -> str:
+ """Step description."""
+ return "Establish connection with Postgress and execute the query"
+
+ def get_database_credentials(self):
+ config.load_kube_config()
+ api_instance = client.CoreV1Api()
+ try:
+ secret = api_instance.read_namespaced_secret(
+ settings.SECRET_NAME, settings.K8S_ONAP_NAMESPACE)
+ if secret.data:
+ if settings.DB_LOGIN in secret.data and settings.DB_PASSWORD in secret.data:
+ login_base64 = secret.data[settings.DB_LOGIN]
+ self.login = base64.b64decode(login_base64).decode("utf-8")
+ password_base64 = secret.data[settings.DB_PASSWORD]
+ self.password = base64.b64decode(password_base64).decode("utf-8")
+ else:
+ raise EnvironmentPreparationException(
+ "Login key or password key not found in secret")
+ else:
+ raise EnvironmentPreparationException("Secret data not found in secret")
+ except client.rest.ApiException as e:
+ self.login = None
+ self.password = None
+ raise EnvironmentPreparationException("Error accessing secret") from e
+
+ def connect_to_postgress(self):
+ self.get_database_credentials()
+ if self.login and self.password:
+ db_params = {
+ "user": self.login,
+ "password": self.password,
+ "host": settings.DB_PRIMARY_HOST,
+ "database": settings.DATABASE,
+ "port": settings.DB_PORT
+ }
+ try:
+ connection = pg8000.connect(**db_params)
+ cursor = connection.cursor()
+ select_query = "SELECT * FROM yang_resource LIMIT 1;"
+ cursor.execute(select_query)
+ if cursor:
+ cursor.close()
+ if connection:
+ connection.close()
+ except pg8000.Error as e:
+ self._logger.exception(f"Error while connecting to PostgreSQL: {str(e)}")
+ raise
+
+ @BaseStep.store_state
+ def execute(self) -> None:
+ """Establish connection with Postgress and execute the query.
+
+ Use settings values:
+ - DB_PRIMARY_HOST,
+ - DATABASE,
+ - DB_PORT,
+ - K8S_ONAP_NAMESPACE,
+ - SECRET_NAME,
+ - DB_LOGIN,
+ - DB_PASSWORD.
+
+ """
+ super().execute()
+ self.connect_to_postgress()