* Used by acm runtime to migrate from a composition to another one in participants, triggers a
* AUTOMATION_COMPOSITION_MIGRATION message with result of AUTOMATION_COMPOSITION_STATE_CHANGE operation.
*/
- AUTOMATION_COMPOSITION_MIGRATION
+ AUTOMATION_COMPOSITION_MIGRATION,
+
+ /**
+ * Used by runtime to send composition and instances to sync participant replicas.
+ */
+ PARTICIPANT_SYNC_MSG
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2023,2024 Nordix Foundation.
+ * Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
// element definition
private List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
- // automationcomposition instances list
+ // automation composition instances list
private List<ParticipantRestartAc> automationcompositionList = new ArrayList<>();
/**
super(ParticipantMessageType.PARTICIPANT_RESTART);
}
+ /**
+ * Constructor with message type.
+ * @param messageType messageType
+ */
+ public ParticipantRestart(ParticipantMessageType messageType) {
+ super(messageType);
+ }
+
/**
* Constructs the object, making a deep copy.
*
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString(callSuper = true)
+public class ParticipantSync extends ParticipantRestart {
+
+ /**
+ * Constructor.
+ */
+ public ParticipantSync() {
+ super(ParticipantMessageType.PARTICIPANT_SYNC_MSG);
+ }
+
+ /**
+ * Constructs the object, making a deep copy.
+ *
+ * @param source source from which to copy
+ */
+ public ParticipantSync(ParticipantSync source) {
+ super(source);
+ }
+}
package org.onap.policy.clamp.models.acm.messages.kafka.participant;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
@Test
void testCopyConstructor() throws CoderException {
- assertThatThrownBy(() -> new ParticipantRestart(null)).isInstanceOf(NullPointerException.class);
final var orig = new ParticipantRestart();
// verify with null values
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.utils.CommonTestData;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+
+
+
+public class ParticipantSyncTest {
+
+ @Test
+ void testCopyConstructor() throws CoderException {
+
+ final var orig = new ParticipantSync();
+ // verify with null values
+ assertEquals(removeVariableFields(orig.toString()),
+ removeVariableFields(new ParticipantSync(orig).toString()));
+
+ orig.setMessageId(UUID.randomUUID());
+ orig.setCompositionId(UUID.randomUUID());
+ orig.setTimestamp(Instant.ofEpochMilli(3000));
+ orig.setParticipantId(CommonTestData.getParticipantId());
+
+ var participantDefinitionUpdate = new ParticipantDefinition();
+ var type = new ToscaConceptIdentifier("id", "1.2.3");
+ var acDefinition = CommonTestData.getAcElementDefinition(type);
+ participantDefinitionUpdate.setAutomationCompositionElementDefinitionList(List.of(acDefinition));
+ orig.setParticipantDefinitionUpdates(List.of(participantDefinitionUpdate));
+
+ var acElement = new AcElementRestart();
+ acElement.setId(UUID.randomUUID());
+ var id = new ToscaConceptIdentifier("id", "1.2.3");
+ acElement.setDefinition(id);
+ acElement.setDeployState(DeployState.DEPLOYED);
+ acElement.setLockState(LockState.LOCKED);
+ acElement.setOperationalState("OperationalState");
+ acElement.setUseState("UseState");
+ acElement.setProperties(Map.of("key", "value"));
+ acElement.setOutProperties(Map.of("keyOut", "valueOut"));
+
+ var acRestart = new ParticipantRestartAc();
+ acRestart.setAcElementList(List.of(acElement));
+ acRestart.setAutomationCompositionId(UUID.randomUUID());
+
+ orig.setAutomationcompositionList(List.of(acRestart));
+
+ assertEquals(removeVariableFields(orig.toString()),
+ removeVariableFields(new ParticipantSync(orig).toString()));
+
+ assertSerializable(orig, ParticipantSync.class);
+ }
+}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import lombok.Getter;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
topicSources = TopicEndpointManager.getManager()
.addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
+ var topics = acRuntimeParameterGroup.getTopics();
+
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ var topicMap = topicSinks.stream()
+ .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity()));
+
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
- () -> publisher.active(topicSinks),
+ () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic())
+ : topicMap.get(topics.getSyncTopic())),
publisher::stop));
listeners.forEach(listener ->
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.register(msgDispatcher);
}
}
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.unregister(msgDispatcher);
}
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.util.List;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
/**
*/
public interface Publisher {
- void active(List<TopicSink> topicSinks);
+ void active(TopicSink topicSink);
void stop();
+
+ boolean isDefaultTopic();
}
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2023 Nordix Foundation.
+ * Copyright (C) 2021,2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@Valid
@NotNull
private AcmParameters acmParameters = new AcmParameters();
+
+ @Valid
+ @NotNull
+ private Topics topics;
}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.main.parameters;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.validation.annotation.Validated;
+
+@Getter
+@Setter
+@Validated
+@AllArgsConstructor
+public class Topics {
+
+ private String operationTopic;
+ private String syncTopic;
+}
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher {
private TopicSinkClient topicSinkClient;
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
super.send(message);
}
- private List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
+ protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
AutomationCompositionDefinition acmDefinition) {
var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
- // list of entry entry filtered by participantId
+ // list of entry filtered by participantId
List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
for (var elementEntry : acElements) {
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision.comm;
+
+import io.micrometer.core.annotation.Timed;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
+
+ private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+
+ public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
+ super(acRuntimeParameterGroup);
+ this.acRuntimeParameterGroup = acRuntimeParameterGroup;
+ }
+
+
+ /**
+ * Send sync msg to Participant.
+ *
+ * @param participantId the ParticipantId
+ * @param acmDefinition the AutomationComposition Definition
+ * @param automationCompositions the list of automationCompositions
+ */
+ @Override
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ List<AutomationComposition> automationCompositions) {
+
+ var message = new ParticipantSync();
+ message.setParticipantId(participantId);
+ message.setCompositionId(acmDefinition.getCompositionId());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ message.setState(acmDefinition.getState());
+ message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
+
+ for (var automationComposition : automationCompositions) {
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ for (var element : automationComposition.getElements().values()) {
+ if (participantId.equals(element.getParticipantId())) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+ }
+
+ LOGGER.debug("Participant Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+
+}
path: /error
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
heartBeatMs: 20000
maxStatusWaitMs: 200000
topicParameterGroup:
topicSources:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+
+ -
+ topic: ${runtime.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
// repeat start - should throw an exception
assertThatIllegalStateException().isThrownBy(activator::start);
assertTrue(activator.isAlive());
- verify(publisherFirst, times(1)).active(anyList());
- verify(publisherSecond, times(1)).active(anyList());
+ verify(publisherFirst, times(1)).active(any());
+ verify(publisherSecond, times(1)).active(any());
var sco = CODER.decode("{messageType:" + TOPIC_FIRST + "}", StandardCoderObject.class);
activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
void testSendParticipantRegisterAck() {
var acRegisterAckPublisher = new ParticipantRegisterAckPublisher();
var topicSink = mock(TopicSink.class);
- acRegisterAckPublisher.active(List.of(topicSink));
+ acRegisterAckPublisher.active(topicSink);
acRegisterAckPublisher.send(new ParticipantRegisterAck());
verify(topicSink).send(anyString());
acRegisterAckPublisher.stop();
void testSendParticipantDeregisterAck() {
var acDeregisterAckPublisher = new ParticipantDeregisterAckPublisher();
var topicSink = mock(TopicSink.class);
- acDeregisterAckPublisher.active(Collections.singletonList(topicSink));
+ acDeregisterAckPublisher.active(topicSink);
acDeregisterAckPublisher.send(new ParticipantDeregisterAck());
verify(topicSink).send(anyString());
acDeregisterAckPublisher.stop();
void testSendAutomationCompositionStateChangePublisher() {
var publisher = new AutomationCompositionStateChangePublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(getAutomationComposition(), 0, true);
verify(topicSink).send(anyString());
publisher.stop();
var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class),
mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class));
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.sendDepriming(UUID.randomUUID());
verify(topicSink).send(anyString());
}
var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class),
CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
serviceTemplate.setName("Name");
serviceTemplate.setVersion("1.0.0");
void testParticipantStatusReqPublisher() {
var publisher = new ParticipantStatusReqPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(CommonTestData.getParticipantId());
verify(topicSink).send(anyString());
}
void testParticipantRegisterAckPublisher() {
var publisher = new ParticipantRegisterAckPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(UUID.randomUUID(), CommonTestData.getParticipantId());
verify(topicSink).send(anyString());
}
void testParticipantDeregisterAckPublisher() {
var publisher = new ParticipantDeregisterAckPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(UUID.randomUUID());
verify(topicSink).send(anyString());
}
void testAcElementPropertiesPublisher() {
var publisher = new AcElementPropertiesPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
publisher.send(automationComposition);
void testAutomationCompositionMigrationPublisher() {
var publisher = new AutomationCompositionMigrationPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
publisher.send(automationComposition, UUID.randomUUID());
void testParticipantRestartPublisher() {
var publisher = new ParticipantRestartPublisher(CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
+
+ var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
+ var acmDefinition = new AutomationCompositionDefinition();
+ acmDefinition.setCompositionId(UUID.randomUUID());
+ acmDefinition.setServiceTemplate(serviceTemplate);
+ var acElements = AcmUtils
+ .extractAcElementsFromServiceTemplate(serviceTemplate, "");
+ acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
+
+ var automationComposition =
+ InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
+
+ var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+ acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId);
+
+ publisher.send(participantId, acmDefinition, List.of(automationComposition));
+ verify(topicSink).send(anyString());
+ }
+
+ @Test
+ void testParticipantSyncPublisher() {
+ var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+ var topicSink = mock(TopicSink.class);
+ publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acmDefinition = new AutomationCompositionDefinition();
context-path: /onap/policy/clamp/acm
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
updateParameters:
maxRetryCount: 3
topicParameterGroup:
topicSources:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: ${runtime.topics.operationTopic}
servers:
- localhost
topicCommInfrastructure: noop
topicCommInfrastructure: noop
servers:
- localhost
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: ${runtime.topics.operationTopic}
+
+ - topic: ${runtime.topics.syncTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: noop
tracing:
enabled: true
context-path: /onap/policy/clamp/acm
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
updateParameters:
maxRetryCount: 3
topicParameterGroup:
topicSources:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
servers:
- kafka:9092
topicCommInfrastructure: NOOP
topicCommInfrastructure: NOOP
servers:
- kafka:9092
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
+ -
+ topic: ${runtime.topics.syncTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
acmParameters:
acElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
acNodeType: org.onap.policy.clamp.acm.AutomationComposition
"databasePassword": "P01icY",
"persistenceUnit": "InstantiationTests"
},
+ "topics":{
+ "operationTopic": "policy-acruntime-participant",
+ "syncTopic": "acm-ppnt-sync"
+ },
"topicParameterGroup": {
+
"topicSources": [
{
- "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+ "topic": "${topics.operationTopic}",
"servers": [
"localhost"
],
],
"topicSinks": [
{
- "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+ "topic": "${topics.operationTopic}",
+ "servers": [
+ "localhost"
+ ],
+ "topicCommInfrastructure": "NOOP"
+ },
+ {
+ "topic": "${topics.syncTopic}",
"servers": [
"localhost"
],