import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaDataType;
import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
import org.onap.policy.clamp.acm.participant.kserve.k8s.KserveClient;
import org.onap.policy.clamp.acm.participant.kserve.utils.CommonTestData;
import org.onap.policy.clamp.acm.participant.kserve.utils.ToscaUtils;
-import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
class AcElementHandlerTest {
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV1;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV2;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
-import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package org.onap.policy.clamp.acm.participant.intermediary.api;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.models.base.PfModelException;
/**
void deprime(CompositionDto composition) throws PfModelException;
- void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException;
-
- void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
- DeployState deployState, LockState lockState) throws PfModelException;
-
/**
* Handle an update on a automation composition element.
*
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
* Wrapper of AutomationCompositionElementListener.
* Valid since 7.1.0 release.
*/
-public abstract class AcElementListenerV1 implements AutomationCompositionElementListener {
+public abstract class AcElementListenerV1
+ implements AutomationCompositionElementListener, AutomationCompositionElementListenerV1 {
protected final ParticipantIntermediaryApi intermediaryApi;
+ private static final String NOT_SUPPORTED = "not supported!";
+
protected AcElementListenerV1(ParticipantIntermediaryApi intermediaryApi) {
this.intermediaryApi = intermediaryApi;
}
deploy(instanceElement.instanceId(), element, properties);
}
- public abstract void deploy(UUID instanceId, AcElementDeploy element, Map<String, Object> properties)
- throws PfModelException;
-
@Override
public void undeploy(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
throws PfModelException {
undeploy(instanceElement.instanceId(), instanceElement.elementId());
}
- public abstract void undeploy(UUID instanceId, UUID elementId) throws PfModelException;
-
@Override
public void lock(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
throws PfModelException {
lock(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void lock(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
StateChangeResult.NO_ERROR, "Locked");
unlock(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void unlock(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
StateChangeResult.NO_ERROR, "Unlocked");
delete(instanceElement.instanceId(), instanceElement.elementId());
}
+ @Override
public void delete(UUID instanceId, UUID elementId) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DELETED, null,
StateChangeResult.NO_ERROR, "Deleted");
prime(composition.compositionId(), createAcElementDefinitionList(composition));
}
+ @Override
public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
throws PfModelException {
intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
deprime(composition.compositionId());
}
+ @Override
public void deprime(UUID compositionId) throws PfModelException {
intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR,
"Deprimed");
}
- @Override
public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
- handleRestartComposition(composition.compositionId(), createAcElementDefinitionList(composition), state);
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
/**
*/
public void handleRestartComposition(UUID compositionId,
List<AutomationCompositionElementDefinition> elementDefinitionList, AcTypeState state) throws PfModelException {
- switch (state) {
- case PRIMING -> prime(compositionId, elementDefinitionList);
- case DEPRIMING -> deprime(compositionId);
- default ->
- intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.NO_ERROR, "Restarted");
- }
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
- @Override
public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
DeployState deployState, LockState lockState) throws PfModelException {
- var element = new AcElementDeploy();
- element.setId(instanceElement.elementId());
- element.setDefinition(compositionElement.elementDefinitionId());
- element.setProperties(instanceElement.inProperties());
- Map<String, Object> properties = new HashMap<>(instanceElement.inProperties());
- properties.putAll(compositionElement.inProperties());
- handleRestartInstance(instanceElement.instanceId(), element, properties, deployState, lockState);
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
/**
*/
public void handleRestartInstance(UUID instanceId, AcElementDeploy element,
Map<String, Object> properties, DeployState deployState, LockState lockState) throws PfModelException {
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
- if (DeployState.DEPLOYING.equals(deployState)) {
- deploy(instanceId, element, properties);
- return;
- }
- if (DeployState.UNDEPLOYING.equals(deployState)) {
- undeploy(instanceId, element.getId());
- return;
- }
- if (DeployState.UPDATING.equals(deployState)) {
- update(instanceId, element, properties);
- return;
- }
- if (DeployState.DELETING.equals(deployState)) {
- delete(instanceId, element.getId());
- return;
- }
- if (LockState.LOCKING.equals(lockState)) {
- lock(instanceId, element.getId());
- return;
- }
- if (LockState.UNLOCKING.equals(lockState)) {
- unlock(instanceId, element.getId());
- return;
- }
- intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
- deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
}
@Override
element.getProperties());
}
+ @Override
public void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
Map<String, Object> properties) throws PfModelException {
intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+import jakarta.ws.rs.core.Response;
import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
public abstract class AcElementListenerV2 implements AutomationCompositionElementListener {
protected final ParticipantIntermediaryApi intermediaryApi;
+ private static final String NOT_SUPPORTED = "not supported!";
+
protected AcElementListenerV2(ParticipantIntermediaryApi intermediaryApi) {
this.intermediaryApi = intermediaryApi;
}
StateChangeResult.NO_ERROR, "Deprimed");
}
- @Override
public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
- switch (state) {
- case PRIMING -> prime(composition);
- case DEPRIMING -> deprime(composition);
- default -> intermediaryApi
- .updateCompositionState(composition.compositionId(), state, StateChangeResult.NO_ERROR, "Restarted");
- }
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
- @Override
public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
DeployState deployState, LockState lockState) throws PfModelException {
-
- if (DeployState.DEPLOYING.equals(deployState)) {
- deploy(compositionElement, instanceElement);
- return;
- }
- if (DeployState.UNDEPLOYING.equals(deployState)) {
- undeploy(compositionElement, instanceElement);
- return;
- }
- if (DeployState.UPDATING.equals(deployState)) {
- update(compositionElement, instanceElement, instanceElement);
- return;
- }
- if (DeployState.DELETING.equals(deployState)) {
- delete(compositionElement, instanceElement);
- return;
- }
- if (LockState.LOCKING.equals(lockState)) {
- lock(compositionElement, instanceElement);
- return;
- }
- if (LockState.UNLOCKING.equals(lockState)) {
- unlock(compositionElement, instanceElement);
- return;
- }
- intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
+ throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
}
@Override
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.models.base.PfModelException;
+
+public interface AutomationCompositionElementListenerV1 {
+
+ void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ throws PfModelException;
+
+ void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+ void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ throws PfModelException;
+
+ void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+ throws PfModelException;
+
+ void deprime(UUID compositionId) throws PfModelException;
+
+ void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
+ Map<String, Object> properties) throws PfModelException;
+}
+++ /dev/null
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.acm.participant.intermediary.comm;
-
-import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ParticipantRestartListener extends ParticipantListener<ParticipantRestart> {
-
- /**
- * Constructs the object.
- *
- * @param participantHandler the handler for managing the state of the participant
- */
- public ParticipantRestartListener(ParticipantHandler participantHandler) {
- super(ParticipantRestart.class, participantHandler, participantHandler::handleParticipantRestart);
- }
-
- @Override
- public String getType() {
- return ParticipantMessageType.PARTICIPANT_RESTART.name();
- }
-}
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.springframework.stereotype.Component;
@Component
}
/**
- * Handle a ParticipantRestart message.
+ * Handle a Participant Sync message.
*
- * @param participantRestartMsg the participantRestart message
+ * @param participantSyncMsg the participantRestart message
*/
- public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
- List<AutomationCompositionElementDefinition> list = new ArrayList<>();
- for (var participantDefinition : participantRestartMsg.getParticipantDefinitionUpdates()) {
- list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+ public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+
+ if (participantSyncMsg.isDelete()) {
+ if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) {
+ cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId());
+ }
+ for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
+ cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId());
+ }
+ return;
}
- if (!AcTypeState.COMMISSIONED.equals(participantRestartMsg.getState())) {
- cacheProvider.addElementDefinition(participantRestartMsg.getCompositionId(), list);
+
+ if (!participantSyncMsg.getParticipantDefinitionUpdates().isEmpty()) {
+ List<AutomationCompositionElementDefinition> list = new ArrayList<>();
+ for (var participantDefinition : participantSyncMsg.getParticipantDefinitionUpdates()) {
+ list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+ }
+ cacheProvider.addElementDefinition(participantSyncMsg.getCompositionId(), list);
}
- for (var automationcomposition : participantRestartMsg.getAutomationcompositionList()) {
+ for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
cacheProvider
- .initializeAutomationComposition(participantRestartMsg.getCompositionId(), automationcomposition);
+ .initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition);
}
- var inPropertiesMap = list.stream().collect(Collectors.toMap(
- AutomationCompositionElementDefinition::getAcElementDefinitionId,
- el -> el.getAutomationCompositionElementToscaNodeTemplate().getProperties()));
- var outPropertiesMap = list.stream().collect(Collectors.toMap(
- AutomationCompositionElementDefinition::getAcElementDefinitionId,
- AutomationCompositionElementDefinition::getOutProperties));
- var composition =
- new CompositionDto(participantRestartMsg.getCompositionId(), inPropertiesMap, outPropertiesMap);
- listener.restarted(participantRestartMsg.getMessageId(), composition, participantRestartMsg.getState(),
- participantRestartMsg.getAutomationcompositionList());
}
}
ParticipantRestartAc participantRestartAc) {
Map<UUID, AutomationCompositionElement> acElementMap = new LinkedHashMap<>();
for (var element : participantRestartAc.getAcElementList()) {
+ if (!getParticipantId().equals(element.getParticipantId())) {
+ continue;
+ }
var acElement = new AutomationCompositionElement();
acElement.setId(element.getId());
acElement.setParticipantId(getParticipantId());
acElement.setUseState(element.getUseState());
acElement.setProperties(element.getProperties());
acElement.setOutProperties(element.getOutProperties());
- acElement.setRestarting(true);
acElementMap.put(element.getId(), acElement);
}
var automationComposition = new AutomationComposition();
automationComposition.setCompositionId(compositionId);
+ automationComposition.setDeployState(participantRestartAc.getDeployState());
+ automationComposition.setLockState(participantRestartAc.getLockState());
automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId());
automationComposition.setElements(acElementMap);
automationCompositions.put(automationComposition.getInstanceId(), automationComposition);
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
acDefinitionHandler.handlePrime(participantPrimeMsg);
}
- /**
- * Handle a ParticipantRestart message.
- *
- * @param participantRestartMsg the participantRestart message
- */
- @Timed(value = "listener.participant_restart", description = "PARTICIPANT_RESTART messages received")
- public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
- LOGGER.debug("ParticipantRestart message received for participantId {}",
- participantRestartMsg.getParticipantId());
- acDefinitionHandler.handleParticipantRestart(participantRestartMsg);
- }
-
/**
* Handle a ParticipantSync message.
*
*/
@Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
public void handleParticipantSync(ParticipantSync participantSyncMsg) {
- LOGGER.debug("ParticipantSync message received for participantId {}",
- participantSyncMsg.getParticipantId());
+ if (participantSyncMsg.getExcludeReplicas().contains(cacheProvider.getReplicaId())) {
+ LOGGER.debug("Ignore ParticipantSync message {}", participantSyncMsg.getMessageId());
+ return;
+ }
+ LOGGER.debug("ParticipantSync message received for participantId {}", participantSyncMsg.getParticipantId());
+ acDefinitionHandler.handleParticipantSync(participantSyncMsg);
}
/**
import io.opentelemetry.context.Context;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
}
}
- /**
- * Handles restarted scenario.
- *
- * @param messageId the messageId
- * @param composition the composition
- * @param state the state of the composition
- * @param automationCompositionList list of ParticipantRestartAc
- */
- public void restarted(UUID messageId, CompositionDto composition,
- AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
- try {
- listener.handleRestartComposition(composition, state);
- } catch (PfModelException e) {
- LOGGER.error("Composition Defintion restarted failed {} {}", composition.compositionId(), e.getMessage());
- intermediaryApi.updateCompositionState(composition.compositionId(), state, StateChangeResult.FAILED,
- "Composition Defintion restarted failed");
- }
-
- for (var automationComposition : automationCompositionList) {
- for (var element : automationComposition.getAcElementList()) {
- var compositionElement = new CompositionElementDto(composition.compositionId(),
- element.getDefinition(), composition.inPropertiesMap().get(element.getDefinition()),
- composition.outPropertiesMap().get(element.getDefinition()));
- var instanceElementDto = new InstanceElementDto(automationComposition.getAutomationCompositionId(),
- element.getId(), element.getToscaServiceTemplateFragment(),
- element.getProperties(), element.getOutProperties());
- cleanExecution(element.getId(), messageId);
- var result = executor.submit(() ->
- this.restartedInstanceProcess(compositionElement, instanceElementDto,
- element.getDeployState(), element.getLockState()));
- executionMap.put(element.getId(), result);
- }
- }
- }
-
- private void restartedInstanceProcess(CompositionElementDto compositionElement,
- InstanceElementDto instanceElementDto, DeployState deployState, LockState lockState) {
- try {
- listener.handleRestartInstance(compositionElement, instanceElementDto, deployState, lockState);
- executionMap.remove(instanceElementDto.elementId());
- } catch (PfModelException e) {
- LOGGER.error("Automation composition element deploy failed {} {}",
- instanceElementDto.elementId(), e.getMessage());
- intermediaryApi.updateAutomationCompositionElementState(instanceElementDto.instanceId(),
- instanceElementDto.elementId(), deployState, lockState, StateChangeResult.FAILED,
- "Automation composition element restart failed");
- }
- }
-
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
acElementListenerV1.deploy(compositionElement, instanceElement);
verify(acElementListenerV1).deploy(any(), any(), any());
-
- clearInvocations(acElementListenerV1);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYING, LockState.NONE);
- verify(acElementListenerV1).deploy(any(), any(), any());
}
@Test
var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
acElementListenerV1.undeploy(compositionElement, instanceElement);
verify(acElementListenerV1).undeploy(instanceElement.instanceId(), instanceElement.elementId());
-
- clearInvocations(acElementListenerV1);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.UNDEPLOYING, LockState.NONE);
- verify(acElementListenerV1).undeploy(instanceElement.instanceId(), instanceElement.elementId());
}
@Test
}
@Test
- void handleRestartComposition() throws PfModelException {
- var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- var acElementListenerV1 = createAcElementListenerV1(intermediaryApi);
- var compositionId = UUID.randomUUID();
- var toscaConceptIdentifier = new ToscaConceptIdentifier();
- var composition = new CompositionDto(compositionId, Map.of(toscaConceptIdentifier, Map.of()), Map.of());
-
- acElementListenerV1.handleRestartComposition(composition, AcTypeState.PRIMED);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Restarted");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartComposition(composition, AcTypeState.PRIMING);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartComposition(composition, AcTypeState.DEPRIMING);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR, "Deprimed");
+ void handleRestartComposition() {
+ var acElementListenerV1 = createAcElementListenerV1(mock(ParticipantIntermediaryApi.class));
+ assertThatThrownBy(() -> acElementListenerV1.handleRestartComposition(null, null))
+ .isInstanceOf(PfModelException.class);
}
@Test
- void handleRestartInstance() throws PfModelException {
- var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- var acElementListenerV1 = createAcElementListenerV1(intermediaryApi);
- var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
- Map.of(), Map.of());
- var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.LOCKED);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DEPLOYED, LockState.LOCKED,
- StateChangeResult.NO_ERROR, "Restarted");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.LOCKING);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.NO_ERROR, "Locked");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.UNLOCKING);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.NO_ERROR, "Unlocked");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.UPDATING, LockState.LOCKED);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DEPLOYED, null,
- StateChangeResult.NO_ERROR, "Update not supported");
-
- clearInvocations(intermediaryApi);
- acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DELETING, LockState.NONE);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DELETED, null, StateChangeResult.NO_ERROR, "Deleted");
+ void handleRestartInstance() {
+ var acElementListenerV1 = createAcElementListenerV1(mock(ParticipantIntermediaryApi.class));
+ assertThatThrownBy(() -> acElementListenerV1.handleRestartInstance(null, null,
+ null, null)).isInstanceOf(PfModelException.class);
}
@Test
package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
-import static org.mockito.Mockito.clearInvocations;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
-import org.mockito.Answers;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
class AcElementListenerV2Test {
- @Test
- void deployTest() throws PfModelException {
- var acElementListenerV2 = mock(AcElementListenerV2.class, Answers.CALLS_REAL_METHODS);
- var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
- Map.of(), Map.of());
- var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYING, LockState.NONE);
- verify(acElementListenerV2).deploy(compositionElement, instanceElement);
- }
-
- @Test
- void undeployTest() throws PfModelException {
- var acElementListenerV2 = mock(AcElementListenerV2.class, Answers.CALLS_REAL_METHODS);
- var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
- Map.of(), Map.of());
- var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.UNDEPLOYING, LockState.NONE);
- verify(acElementListenerV2).undeploy(compositionElement, instanceElement);
- }
-
@Test
void lockTest() throws PfModelException {
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
}
@Test
- void handleRestartComposition() throws PfModelException {
- var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- var acElementListenerV2 = createAcElementListenerV2(intermediaryApi);
- var compositionId = UUID.randomUUID();
- var toscaConceptIdentifier = new ToscaConceptIdentifier();
- var composition = new CompositionDto(compositionId, Map.of(toscaConceptIdentifier, Map.of()), Map.of());
-
- acElementListenerV2.handleRestartComposition(composition, AcTypeState.PRIMED);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Restarted");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartComposition(composition, AcTypeState.PRIMING);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartComposition(composition, AcTypeState.DEPRIMING);
- verify(intermediaryApi)
- .updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR, "Deprimed");
+ void handleRestartComposition() {
+ var acElementListenerV2 = createAcElementListenerV2(mock(ParticipantIntermediaryApi.class));
+ assertThatThrownBy(() -> acElementListenerV2.handleRestartComposition(null, null))
+ .isInstanceOf(PfModelException.class);
}
@Test
- void handleRestartInstance() throws PfModelException {
- var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- var acElementListenerV2 = createAcElementListenerV2(intermediaryApi);
- var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
- Map.of(), Map.of());
- var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.LOCKED);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DEPLOYED, LockState.LOCKED,
- StateChangeResult.NO_ERROR, "Restarted");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.LOCKING);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.NO_ERROR, "Locked");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DEPLOYED, LockState.UNLOCKING);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.NO_ERROR, "Unlocked");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.UPDATING, LockState.LOCKED);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DEPLOYED, null,
- StateChangeResult.NO_ERROR, "Update not supported");
-
- clearInvocations(intermediaryApi);
- acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
- DeployState.DELETING, LockState.NONE);
- verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
- instanceElement.elementId(), DeployState.DELETED, null, StateChangeResult.NO_ERROR, "Deleted");
+ void handleRestartInstance() {
+ var acElementListenerV2 = createAcElementListenerV2(mock(ParticipantIntermediaryApi.class));
+ assertThatThrownBy(() -> acElementListenerV2.handleRestartInstance(null, null,
+ null, null)).isInstanceOf(PfModelException.class);
}
@Test
assertEquals(ParticipantMessageType.AUTOMATION_COMPOSITION_STATE_CHANGE.name(),
automationCompositionStateChangeListener.getType());
- var participantRestartListener = new ParticipantRestartListener(participantHandler);
- assertEquals(ParticipantMessageType.PARTICIPANT_RESTART.name(),
- participantRestartListener.getType());
-
var participantSyncListener = new ParticipantSyncListener(participantHandler);
assertEquals(ParticipantMessageType.PARTICIPANT_SYNC_MSG.name(),
participantSyncListener.getType());
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
class AcDefinitionHandlerTest {
}
@Test
- void restartedTest() {
- var participantRestartMsg = new ParticipantRestart();
- participantRestartMsg.setState(AcTypeState.PRIMED);
- participantRestartMsg.setCompositionId(UUID.randomUUID());
- participantRestartMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
- participantRestartMsg.setAutomationcompositionList(List.of(CommonTestData.createParticipantRestartAc()));
+ void syncTest() {
+ var participantSyncMsg = new ParticipantSync();
+ participantSyncMsg.setState(AcTypeState.PRIMED);
+ participantSyncMsg.setCompositionId(UUID.randomUUID());
+ participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+ participantSyncMsg.setAutomationcompositionList(List.of(CommonTestData.createParticipantRestartAc()));
var cacheProvider = mock(CacheProvider.class);
var listener = mock(ThreadHandler.class);
var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
- ach.handleParticipantRestart(participantRestartMsg);
+ ach.handleParticipantSync(participantSyncMsg);
verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any());
verify(cacheProvider).addElementDefinition(any(), any());
}
+
+ @Test
+ void syncDeleteTest() {
+ var participantSyncMsg = new ParticipantSync();
+ participantSyncMsg.setState(AcTypeState.COMMISSIONED);
+ participantSyncMsg.setDelete(true);
+ participantSyncMsg.setCompositionId(UUID.randomUUID());
+ participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+ var restartAc = CommonTestData.createParticipantRestartAc();
+ participantSyncMsg.setAutomationcompositionList(List.of(restartAc));
+
+ var cacheProvider = mock(CacheProvider.class);
+ var listener = mock(ThreadHandler.class);
+ var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
+ ach.handleParticipantSync(participantSyncMsg);
+ verify(cacheProvider).removeElementDefinition(participantSyncMsg.getCompositionId());
+ verify(cacheProvider).removeAutomationComposition(restartAc.getAutomationCompositionId());
+ }
}
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.models.base.PfModelException;
public class DummyAcElementListener implements AutomationCompositionElementListener {
public void deprime(CompositionDto composition) throws PfModelException {
}
- @Override
- public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
- }
-
- @Override
- public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
- DeployState deployState, LockState lockState) throws PfModelException {
- }
-
@Override
public void migrate(CompositionElementDto compositionElement, CompositionElementDto compositionElementTarget,
InstanceElementDto instanceElement, InstanceElementDto instanceElementMigrate)
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
@Test
void handleParticipantRestartTest() {
- var participantRestartMsg = new ParticipantRestart();
- participantRestartMsg.setState(AcTypeState.PRIMED);
- participantRestartMsg.setCompositionId(UUID.randomUUID());
+ var participantSyncMsg = new ParticipantSync();
+ participantSyncMsg.setState(AcTypeState.PRIMED);
+ participantSyncMsg.setCompositionId(UUID.randomUUID());
+ participantSyncMsg.setReplicaId(CommonTestData.getReplicaId());
var cacheProvider = mock(CacheProvider.class);
+ when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId());
var publisher = mock(ParticipantMessagePublisher.class);
var acHandler = mock(AcDefinitionHandler.class);
var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
mock(AcLockHandler.class), acHandler, publisher, cacheProvider);
- participantHandler.handleParticipantRestart(participantRestartMsg);
- verify(acHandler).handleParticipantRestart(participantRestartMsg);
+ participantHandler.handleParticipantSync(participantSyncMsg);
+ verify(acHandler).handleParticipantSync(participantSyncMsg);
}
@Test
package org.onap.policy.clamp.acm.participant.intermediary.handler;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import jakarta.ws.rs.core.Response.Status;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
-import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
threadHandler.deprime(messageId, composition);
verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId, AcTypeState.PRIMED,
StateChangeResult.FAILED, "Composition Defintion deprime failed");
-
- clearInvocations(listener);
- doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener)
- .handleRestartComposition(composition, AcTypeState.PRIMING);
- threadHandler.restarted(messageId, composition, AcTypeState.PRIMING, List.of());
- verify(intermediaryApi).updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
- "Composition Defintion deprime failed");
- }
- }
-
- @Test
- void testRestarted() throws IOException, PfModelException {
- var listener = mock(AutomationCompositionElementListener.class);
- var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- var cacheProvider = mock(CacheProvider.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, cacheProvider)) {
- var messageId = UUID.randomUUID();
- var compositionId = UUID.randomUUID();
- var participantRestartAc = new ParticipantRestartAc();
- participantRestartAc.setAutomationCompositionId(UUID.randomUUID());
- participantRestartAc.getAcElementList().add(new AcElementRestart());
- var composition = new CompositionDto(compositionId, Map.of(), Map.of());
- threadHandler.restarted(messageId, composition, AcTypeState.PRIMED, List.of(participantRestartAc));
- verify(listener, timeout(TIMEOUT)).handleRestartInstance(any(), any(), any(), any());
}
}
}
return REPLICA_ID;
}
- public static UUID getRndParticipantId() {
- return UUID.randomUUID();
- }
-
public static ToscaConceptIdentifier getDefinition() {
return new ToscaConceptIdentifier("org.onap.domain.pmsh.PMSH_DCAEMicroservice", "1.2.3");
}
* Returns a Map of ToscaConceptIdentifier and AutomationComposition for test cases.
*
* @return automationCompositionMap
- *
- * @throws CoderException if there is an error with .json file.
*/
public static Map<UUID, AutomationComposition> getTestAutomationCompositionMap() {
var automationCompositions = getTestAutomationCompositions();
* Returns List of AutomationComposition for test cases.
*
* @return AutomationCompositions
- *
- * @throws CoderException if there is an error with .json file.
*/
public static AutomationCompositions getTestAutomationCompositions() {
try {
public static ParticipantRestartAc createParticipantRestartAc() {
var participantRestartAc = new ParticipantRestartAc();
participantRestartAc.setAutomationCompositionId(AC_ID_0);
+ participantRestartAc.setDeployState(DeployState.DEPLOYED);
+ participantRestartAc.setLockState(LockState.LOCKED);
var acElementRestart = new AcElementRestart();
acElementRestart.setDefinition(getDefinition());
+ acElementRestart.setParticipantId(PARTCICIPANT_ID);
acElementRestart.setDeployState(DeployState.DEPLOYED);
acElementRestart.setLockState(LockState.LOCKED);
acElementRestart.setOperationalState("OperationalState");
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;