+import time
from typing import Any, Dict
from yaml import load
from onapsdk.configuration import settings
+from onapsdk.exceptions import APIError, ResourceNotFound
from onapsdk.sdc.component import Component
from onapsdk.sdc.pnf import Pnf
from onapsdk.sdc.properties import ComponentProperty
if settings.PNF_NAME != "":
pnf: Pnf = Pnf(name=settings.PNF_NAME)
service.add_resource(pnf)
- service.checkin()
- service.onboard()
+ # If the service is already distributed, do not try to checkin/onboard (replay of tests)
+ # checkin is done if needed
+ # If service is replayed, no need to try to re-onboard the model
+ if not service.distributed:
+ try:
+ service.checkin()
+ except (APIError, ResourceNotFound):
+ # Retry as checkin may be a bit long
+ # Temp workaround to avoid internal race in SDC
+ time.sleep(5)
+ service.checkin()
+ service.onboard()
class YamlTemplateServiceOnboardStep(YamlTemplateBaseStep):
service.create()
self.declare_resources(service)
self.assign_properties(service)
- service.checkin()
- service.onboard()
+ # If the service is already distributed, do not try to checkin/onboard (replay of tests)
+ # checkin is done if needed
+ # If service is replayed, no need to try to re-onboard the model
+ if not service.distributed:
+ try:
+ service.checkin()
+ except (APIError, ResourceNotFound):
+ # Retry as checkin may be a bit long
+ # Temp workaround to avoid internal race in SDC
+ time.sleep(5)
+ service.checkin()
+ service.onboard()
def declare_resources(self, service: Service) -> None:
"""Declare resources.