# http://www.apache.org/licenses/LICENSE-2.0
"""CPS onboard module."""
import base64
+import ssl
from abc import ABC
import pg8000
from onapsdk.configuration import settings
from onapsdk.cps import Anchor, Dataspace, SchemaSet
-from onaptests.utils.exceptions import EnvironmentPreparationException
+from onaptests.utils.exceptions import (EnvironmentPreparationException,
+ OnapTestException)
from ..base import BaseStep
self.get_database_credentials()
if self.login and self.password:
+ ctx = ssl.create_default_context()
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
db_params = {
"user": self.login,
"password": self.password,
"host": settings.DB_PRIMARY_HOST,
"database": settings.DATABASE,
- "port": settings.DB_PORT
+ "port": settings.DB_PORT,
+ "ssl_context": ctx
}
try:
connection = pg8000.connect(**db_params)
cursor.close()
if connection:
connection.close()
- except pg8000.Error as e:
+ except Exception as e:
self._logger.exception(f"Error while connecting to PostgreSQL: {str(e)}")
- raise
+ raise OnapTestException(e) from e
@BaseStep.store_state
def execute(self) -> None: