# http://www.apache.org/licenses/LICENSE-2.0
"""CPS onboard module."""
from abc import ABC

from onapsdk.configuration import settings
from onapsdk.cps import Anchor, Dataspace, SchemaSet

from ..base import BaseStep
class CpsBaseStep(BaseStep, ABC):
    """Abstract CPS base step.

    Shared parent of the CPS onboarding steps defined in this module.
    """

    # NOTE(review): the listing this class was recovered from skips lines
    # directly above and below this ``def``. Confirm against BaseStep whether
    # ``component`` is meant to be a ``@property``.
    def component(self) -> str:
        """Component identifier for CPS steps (presumably the component name).

        NOTE(review): no body is visible for this method in the excerpt, so
        as written it returns ``None`` despite the ``str`` annotation.
        Restore the original return statement before relying on it.
        """
class CreateCpsDataspaceStep(CpsBaseStep):
    """Step to create a dataspace."""

    # NOTE(review): recovered from a listing with dropped lines. Still
    # unconfirmed: whether ``description`` carries ``@property`` and whether
    # ``execute`` / ``cleanup`` must also delegate to ``super()``.

    def description(self) -> str:
        """Step description."""
        return "Create CPS dataspace."

    # Decorated like the sibling ``execute`` in CreateCpsAnchorNodeStep so
    # that all steps record their state the same way.
    @BaseStep.store_state
    def execute(self) -> None:
        """Create the dataspace named by ``settings.DATASPACE_NAME``."""
        Dataspace.create(settings.DATASPACE_NAME)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete created dataspace."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        # The listed code built the object but never removed it.
        dataspace.delete()
class CreateCpsSchemaSetStep(CpsBaseStep):
    """Step to check schema-set creation."""

    # NOTE(review): recovered from a listing with dropped lines. Still
    # unconfirmed: whether ``description`` carries ``@property`` and whether
    # ``execute`` / ``cleanup`` must also delegate to ``super()`` (nothing in
    # the visible code runs the substep registered in ``__init__``).

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize step.

        Args:
            cleanup: Passed to the base initializer and to the substep.

        Substeps:
            - CreateCpsDataspaceStep.
        """
        super().__init__(cleanup)
        self.add_step(CreateCpsDataspaceStep(cleanup))

    def description(self) -> str:
        """Step description."""
        return "Create CPS bookstore schema set"

    # Decorated like the sibling ``execute`` in CreateCpsAnchorNodeStep so
    # that all steps record their state the same way.
    @BaseStep.store_state
    def execute(self) -> None:
        """Get dataspace created on substep and create schema-set.

        The schema set content is read, in binary mode, from
        ``settings.SCHEMA_SET_FILE`` and registered under
        ``settings.SCHEMA_SET_NAME``.
        """
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        with settings.SCHEMA_SET_FILE.open("rb") as schema_set_file:
            dataspace.create_schema_set(settings.SCHEMA_SET_NAME, schema_set_file)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete created schema-set."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        schema_set = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
        # The listed code fetched the schema set but never removed it.
        schema_set.delete()
class CreateCpsAnchorStep(CpsBaseStep):
    """Step to create an anchor."""

    # NOTE(review): recovered from a listing with dropped lines. Still
    # unconfirmed: whether ``description`` carries ``@property`` and whether
    # ``execute`` / ``cleanup`` must also delegate to ``super()`` (nothing in
    # the visible code runs the substep registered in ``__init__``).

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize step.

        Args:
            cleanup: Passed to the base initializer and to the substep.

        Substeps:
            - CreateCpsSchemaSetStep.
        """
        super().__init__(cleanup)
        self.add_step(CreateCpsSchemaSetStep(cleanup))

    def description(self) -> str:
        """Step description."""
        return "Create CPS anchor"

    # Decorated like the sibling ``execute`` in CreateCpsAnchorNodeStep so
    # that all steps record their state the same way.
    @BaseStep.store_state
    def execute(self) -> None:
        """Create anchor.

        Get dataspace and schema-set created substeps and create anchor.
        """
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        schema_set: SchemaSet = dataspace.get_schema_set(settings.SCHEMA_SET_NAME)
        dataspace.create_anchor(schema_set, settings.ANCHOR_NAME)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete an anchor."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        # The listed code fetched the anchor but never removed it.
        anchor.delete()
class CreateCpsAnchorNodeStep(CpsBaseStep):
    """Step to check node on anchor creation."""

    # NOTE(review): recovered from a listing with dropped lines. Still
    # unconfirmed: whether ``description`` carries ``@property`` and whether
    # ``execute`` / ``cleanup`` must also delegate to ``super()`` (nothing in
    # the visible code runs the substep registered in ``__init__``).

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize step.

        Args:
            cleanup: Passed to the base initializer and to the substep.

        Substeps:
            - CreateCpsAnchorStep.
        """
        super().__init__(cleanup)
        self.add_step(CreateCpsAnchorStep(cleanup))

    def description(self) -> str:
        """Step description."""
        return "Create CPS anchor node"

    @BaseStep.store_state
    def execute(self) -> None:
        """Create a node on an anchor created on substep.

        The node payload is taken from ``settings.ANCHOR_DATA``.
        """
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.create_node(settings.ANCHOR_DATA)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Delete the anchor's nodes at path ``"/"``."""
        dataspace: Dataspace = Dataspace(settings.DATASPACE_NAME)
        anchor: Anchor = dataspace.get_anchor(settings.ANCHOR_NAME)
        anchor.delete_nodes("/")