return null;
}
+ /**
+ * Finds startPhase from a map of properties.
+ *
+ * @param properties Map of properties
+ * @return startPhase
+ */
+ public static int findStartPhase(Map<String, Object> properties) {
+ var objParticipantType = properties.get("startPhase");
+ if (objParticipantType != null) {
+ return Integer.valueOf(objParticipantType.toString());
+ }
+ return 0;
+ }
+
/**
* Checks If NodeTemplate Is ControlLoopElement.
*
// A list of ParticipantUpdates instances which carries details of an updated participant.
private List<ParticipantUpdates> participantUpdatesList = new ArrayList<>();
+ private Integer startPhase = 0;
/**
* Constructor for instantiating ControlLoopUpdate class with message name.
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantUpdates;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantUtils;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
return;
}
+ if (0 == updateMsg.getStartPhase()) {
+ handleClUpdatePhase0(updateMsg, clElementDefinitions);
+ } else {
+ handleClUpdatePhaseN(updateMsg, clElementDefinitions);
+ }
+ }
+
+ private void handleClUpdatePhase0(ControlLoopUpdate updateMsg,
+ List<ControlLoopElementDefinition> clElementDefinitions) {
var controlLoop = controlLoopMap.get(updateMsg.getControlLoopId());
// TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop
var clElements = storeElementsOnThisParticipant(updateMsg.getParticipantUpdatesList());
+ var clElementMap = prepareClElementMap(clElements);
+ controlLoop = new ControlLoop();
+ controlLoop.setDefinition(updateMsg.getControlLoopId());
+ controlLoop.setElements(clElementMap);
+ controlLoopMap.put(updateMsg.getControlLoopId(), controlLoop);
+
+ handleControlLoopElementUpdate(clElements, clElementDefinitions, updateMsg.getStartPhase(),
+ updateMsg.getControlLoopId());
+ }
+
+ private void handleClUpdatePhaseN(ControlLoopUpdate updateMsg,
+ List<ControlLoopElementDefinition> clElementDefinitions) {
+
+ var clElementList = updateMsg.getParticipantUpdatesList().stream()
+ .flatMap(participantUpdate -> participantUpdate.getControlLoopElementList().stream())
+ .filter(element -> participantType.equals(element.getParticipantType())).collect(Collectors.toList());
+
+ handleControlLoopElementUpdate(clElementList, clElementDefinitions, updateMsg.getStartPhase(),
+ updateMsg.getControlLoopId());
+ }
+
+ private void handleControlLoopElementUpdate(List<ControlLoopElement> clElements,
+ List<ControlLoopElementDefinition> clElementDefinitions, Integer startPhaseMsg,
+ ToscaConceptIdentifier controlLoopId) {
try {
for (var element : clElements) {
- var clElementNodeTemplate =
- getClElementNodeTemplate(clElementDefinitions, element.getDefinition());
- for (var clElementListener : listeners) {
- clElementListener.controlLoopElementUpdate(updateMsg.getControlLoopId(), element,
- clElementNodeTemplate);
+ var clElementNodeTemplate = getClElementNodeTemplate(clElementDefinitions, element.getDefinition());
+ if (clElementNodeTemplate != null) {
+ int startPhase = ParticipantUtils.findStartPhase(clElementNodeTemplate.getProperties());
+ if (startPhaseMsg.equals(startPhase)) {
+ for (var clElementListener : listeners) {
+ clElementListener.controlLoopElementUpdate(controlLoopId, element, clElementNodeTemplate);
+ }
+ }
}
}
} catch (PfModelException e) {
- LOGGER.debug("Control loop element update failed {}", updateMsg.getControlLoopId());
+ LOGGER.debug("Control loop element update failed {}", controlLoopId);
}
- var clElementMap = prepareClElementMap(clElements);
- controlLoop = new ControlLoop();
- controlLoop.setDefinition(updateMsg.getControlLoopId());
- controlLoop.setElements(clElementMap);
- controlLoopMap.put(updateMsg.getControlLoopId(), controlLoop);
}
private ToscaNodeTemplate getClElementNodeTemplate(List<ControlLoopElementDefinition> clElementDefinitions,
ToscaConceptIdentifier clElementDefId) {
+
for (var clElementDefinition : clElementDefinitions) {
if (clElementDefinition.getClElementDefinitionId().equals(clElementDefId)) {
return clElementDefinition.getControlLoopElementToscaNodeTemplate();
}
private List<ControlLoopElement> storeElementsOnThisParticipant(List<ParticipantUpdates> participantUpdates) {
- var clElementMap = participantUpdates.stream()
+ var clElementList = participantUpdates.stream()
.flatMap(participantUpdate -> participantUpdate.getControlLoopElementList().stream())
.filter(element -> participantType.equals(element.getParticipantType())).collect(Collectors.toList());
- for (var element : clElementMap) {
+ for (var element : clElementList) {
elementsOnThisParticipant.put(element.getId(), element);
}
- return clElementMap;
+ return clElementList;
}
private Map<UUID, ControlLoopElement> prepareClElementMap(List<ControlLoopElement> clElements) {
controlLoop.getElements().values().stream().forEach(clElement -> {
for (var clElementListener : listeners) {
try {
- clElementListener.controlLoopElementStateChange(controlLoop.getDefinition(),
- clElement.getId(), clElement.getState(), orderedState);
+ clElementListener.controlLoopElementStateChange(controlLoop.getDefinition(), clElement.getId(),
+ clElement.getState(), orderedState);
} catch (PfModelException e) {
LOGGER.debug("Control loop element update failed {}", controlLoop.getDefinition());
}
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantUtils;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ControlLoopProvider;
import org.onap.policy.clamp.controlloop.models.controlloop.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusReqPublisher;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.provider.PolicyModelsProvider;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
new HandleCounter<>();
private final ControlLoopProvider controlLoopProvider;
+ private final PolicyModelsProvider modelsProvider;
private final ControlLoopStateChangePublisher controlLoopStateChangePublisher;
private final ControlLoopUpdatePublisher controlLoopUpdatePublisher;
private final ParticipantProvider participantProvider;
* Constructor for instantiating SupervisionScanner.
*
* @param controlLoopProvider the provider to use to read control loops from the database
+ * @param modelsProvider the Policy Models Provider
* @param controlLoopStateChangePublisher the ControlLoop StateChange Publisher
* @param controlLoopUpdatePublisher the ControlLoopUpdate Publisher
* @param participantProvider the Participant Provider
* @param participantUpdatePublisher the Participant Update Publisher
* @param clRuntimeParameterGroup the parameters for the control loop runtime
*/
- public SupervisionScanner(final ControlLoopProvider controlLoopProvider,
+ public SupervisionScanner(final ControlLoopProvider controlLoopProvider, PolicyModelsProvider modelsProvider,
final ControlLoopStateChangePublisher controlLoopStateChangePublisher,
ControlLoopUpdatePublisher controlLoopUpdatePublisher, ParticipantProvider participantProvider,
ParticipantStatusReqPublisher participantStatusReqPublisher,
ParticipantUpdatePublisher participantUpdatePublisher,
final ClRuntimeParameterGroup clRuntimeParameterGroup) {
this.controlLoopProvider = controlLoopProvider;
+ this.modelsProvider = modelsProvider;
this.controlLoopStateChangePublisher = controlLoopStateChangePublisher;
this.controlLoopUpdatePublisher = controlLoopUpdatePublisher;
this.participantProvider = participantProvider;
}
try {
- for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
- scanControlLoop(controlLoop, counterCheck);
+ var list = modelsProvider.getServiceTemplateList(null, null);
+ if (list != null && !list.isEmpty()) {
+ ToscaServiceTemplate toscaServiceTemplate = list.get(0);
+
+ for (ControlLoop controlLoop : controlLoopProvider.getControlLoops(null, null)) {
+ scanControlLoop(controlLoop, toscaServiceTemplate, counterCheck);
+ }
}
} catch (PfModelException pfme) {
LOGGER.warn("error reading control loops from database", pfme);
participantUpdateCounter.remove(id);
}
- private void scanControlLoop(final ControlLoop controlLoop, boolean counterCheck) throws PfModelException {
+ private void scanControlLoop(final ControlLoop controlLoop, ToscaServiceTemplate toscaServiceTemplate,
+ boolean counterCheck) throws PfModelException {
LOGGER.debug("scanning control loop {} . . .", controlLoop.getKey().asIdentifier());
if (controlLoop.getState().equals(controlLoop.getOrderedState().asState())) {
}
var completed = true;
+ var minSpNotCompleted = 1000; // min startPhase not completed
for (ControlLoopElement element : controlLoop.getElements().values()) {
if (!element.getState().equals(element.getOrderedState().asState())) {
completed = false;
- break;
+ ToscaNodeTemplate toscaNodeTemplate = toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates()
+ .get(element.getDefinition().getName());
+ int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties());
+ minSpNotCompleted = Math.min(minSpNotCompleted, startPhase);
}
+
}
if (completed) {
LOGGER.debug("control loop scan: transition from state {} to {} not completed", controlLoop.getState(),
controlLoop.getOrderedState());
if (counterCheck) {
- handleCounter(controlLoop);
+ handleCounter(controlLoop, minSpNotCompleted);
}
}
}
controlLoopCounter.clear(controlLoop.getKey().asIdentifier());
}
- private void handleCounter(ControlLoop controlLoop) {
+ private void handleCounter(ControlLoop controlLoop, int startPhase) {
ToscaConceptIdentifier id = controlLoop.getKey().asIdentifier();
if (controlLoopCounter.isFault(id)) {
LOGGER.debug("report ControlLoop fault");
if (controlLoopCounter.count(id)) {
if (ControlLoopState.UNINITIALISED2PASSIVE.equals(controlLoop.getState())) {
LOGGER.debug("retry message ControlLoopUpdate");
- controlLoopUpdatePublisher.send(controlLoop);
+ controlLoopUpdatePublisher.send(controlLoop, startPhase);
} else {
LOGGER.debug("retry message ControlLoopStateChange");
controlLoopStateChangePublisher.send(controlLoop);
* @param controlLoop the ControlLoop
*/
public void send(ControlLoop controlLoop) {
+ send(controlLoop, 0);
+ }
+
+ /**
+ * Send ControlLoopUpdate to Participant.
+ *
+ * @param controlLoop the ControlLoop
+ * @param startPhase the Start Phase
+ */
+ public void send(ControlLoop controlLoop, int startPhase) {
var controlLoopUpdateMsg = new ControlLoopUpdate();
+ controlLoopUpdateMsg.setStartPhase(startPhase);
controlLoopUpdateMsg.setControlLoopId(controlLoop.getKey().asIdentifier());
controlLoopUpdateMsg.setMessageId(UUID.randomUUID());
controlLoopUpdateMsg.setTimestamp(Instant.now());
heartBeatMs: 20000
maxStatusWaitMs: 100000
updateParameters:
- maxRetryCount: 3
+ maxRetryCount: 4
maxWaitMs: 20000
databaseProviderParameters:
name: PolicyProviderParameterGroup
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantUpdatePublisher;
import org.onap.policy.clamp.controlloop.runtime.util.CommonTestData;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.provider.PolicyModelsProvider;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
class SupervisionScannerTest {
@Test
void testScannerOrderedStateEqualsToState() throws PfModelException {
var controlLoopProvider = mock(ControlLoopProvider.class);
+ var modelsProvider = mock(PolicyModelsProvider.class);
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantProvider = mock(ParticipantProvider.class);
var controlLoop = new ControlLoop();
when(controlLoopProvider.getControlLoops(null, null)).thenReturn(List.of(controlLoop));
- var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
- controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
- participantUpdatePublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, modelsProvider,
+ controlLoopStateChangePublisher, controlLoopUpdatePublisher, participantProvider,
+ participantStatusReqPublisher, participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.run(false);
verify(controlLoopProvider, times(0)).updateControlLoop(any(ControlLoop.class));
var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
- var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
- controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
- participantUpdatePublisher, clRuntimeParameterGroup);
+ var modelsProvider = mock(PolicyModelsProvider.class);
+ when(modelsProvider.getServiceTemplateList(null, null)).thenReturn(List.of(new ToscaServiceTemplate()));
+
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, modelsProvider,
+ controlLoopStateChangePublisher, controlLoopUpdatePublisher, participantProvider,
+ participantStatusReqPublisher, participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.run(false);
verify(controlLoopProvider, times(1)).updateControlLoop(any(ControlLoop.class));
participant.setVersion("1.0.0");
when(participantProvider.getParticipants(null, null)).thenReturn(List.of(participant));
+ var modelsProvider = mock(PolicyModelsProvider.class);
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
var clRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
- var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
- controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
- participantUpdatePublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, modelsProvider,
+ controlLoopStateChangePublisher, controlLoopUpdatePublisher, participantProvider,
+ participantStatusReqPublisher, participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
supervisionScanner.run(true);
var participantProvider = new ParticipantProvider(clRuntimeParameterGroup.getDatabaseProviderParameters());
participantProvider.updateParticipants(List.of(participant));
+ var modelsProvider = mock(PolicyModelsProvider.class);
var controlLoopUpdatePublisher = mock(ControlLoopUpdatePublisher.class);
var participantStatusReqPublisher = mock(ParticipantStatusReqPublisher.class);
var controlLoopStateChangePublisher = mock(ControlLoopStateChangePublisher.class);
var participantUpdatePublisher = mock(ParticipantUpdatePublisher.class);
- var supervisionScanner = new SupervisionScanner(controlLoopProvider, controlLoopStateChangePublisher,
- controlLoopUpdatePublisher, participantProvider, participantStatusReqPublisher,
- participantUpdatePublisher, clRuntimeParameterGroup);
+ var supervisionScanner = new SupervisionScanner(controlLoopProvider, modelsProvider,
+ controlLoopStateChangePublisher, controlLoopUpdatePublisher, participantProvider,
+ participantStatusReqPublisher, participantUpdatePublisher, clRuntimeParameterGroup);
supervisionScanner.handleParticipantStatus(participant.getKey().asIdentifier());
supervisionScanner.run(true);