LOGGER.debug("Sent AutomationComposition Update/StateChange Ack to runtime - {}", automationCompositionAck);
}
- /**
- * Method to send Participant heartbeat to clamp on demand.
- *
- * @param participantStatus the Participant Status
- */
- @Timed(value = "publisher.participant_status", description = "PARTICIPANT_STATUS messages published")
- public void sendHeartbeat(final ParticipantStatus participantStatus) {
- validate();
- topicSinkClient.send(participantStatus);
- LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
- }
-
private void validate() {
if (!active) {
throw new AutomationCompositionRuntimeException(Status.NOT_ACCEPTABLE, NOT_ACTIVE_TEXT);
participantDefinition.setParticipantId(cacheProvider.getParticipantId());
participantDefinition.setAutomationCompositionElementDefinitionList(List.of(acElementDefinition));
statusMsg.setParticipantDefinitionUpdates(List.of(participantDefinition));
- publisher.sendHeartbeat(statusMsg);
+ publisher.sendParticipantStatus(statusMsg);
}
private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition(
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.NonNull;
+import lombok.Setter;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@Getter
private final UUID participantId;
+ @Getter
+ @Setter
+ private boolean registered = false;
+
private final List<ParticipantSupportedElementType> supportedAcElementTypes;
@Getter
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
- () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
- () -> msgDispatcher.unregister(listener.getType())));
-
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
}
public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
if (!isAlive()) {
start();
- sendParticipantRegister();
+ var task = new TimerTask() {
+ @Override
+ public void run() {
+ new Thread(participantHandler::sendParticipantRegister).start();
+ }
+ };
+ new Timer().schedule(task, 5000);
}
}
}
}
- private void sendParticipantRegister() {
- participantHandler.sendParticipantRegister();
- }
-
private void sendParticipantDeregister() {
participantHandler.sendParticipantDeregister();
}
*/
@Timed(value = "listener.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages received")
public void handleParticipantStatusReq(final ParticipantStatusReq participantStatusReqMsg) {
- publisher.sendParticipantStatus(makeHeartbeat());
+ sendHeartbeat();
}
/**
public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) {
LOGGER.debug("ParticipantRegisterAck message received as responseTo {}",
participantRegisterAckMsg.getResponseTo());
+ cacheProvider.setRegistered(true);
publisher.sendParticipantStatus(makeHeartbeat());
}
*/
public void sendHeartbeat() {
if (publisher.isActive()) {
- publisher.sendHeartbeat(makeHeartbeat());
+ if (!cacheProvider.isRegistered()) {
+ sendParticipantRegister();
+ } else {
+ publisher.sendParticipantStatus(makeHeartbeat());
+ }
}
}
var participantStatus = new ParticipantStatus();
assertDoesNotThrow(() -> publisher.sendParticipantStatus(participantStatus));
- assertDoesNotThrow(() -> publisher.sendHeartbeat(participantStatus));
-
var participantRegister = new ParticipantRegister();
assertDoesNotThrow(() -> publisher.sendParticipantRegister(participantRegister));
var participantStatus = new ParticipantStatus();
assertThrows(AutomationCompositionRuntimeException.class,
() -> publisher.sendParticipantStatus(participantStatus));
- assertThrows(AutomationCompositionRuntimeException.class, () -> publisher.sendHeartbeat(participantStatus));
var participantRegister = new ParticipantRegister();
assertThrows(AutomationCompositionRuntimeException.class,
var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
acOutHandler.sendAcDefinitionInfo(null, null, Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(UUID.randomUUID(), null, Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(compositionId, new ToscaConceptIdentifier("wrong", "1.0.0"), Map.of());
- verify(publisher, times(0)).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
acOutHandler.sendAcDefinitionInfo(compositionId, elementId, Map.of());
- verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
+ verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
}
@Test
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Test
void handleParticipantStatusReqTest() {
var publisher = mock(ParticipantMessagePublisher.class);
+ when(publisher.isActive()).thenReturn(true);
var cacheProvider = mock(CacheProvider.class);
var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
mock(AcLockHandler.class), mock(AcDefinitionHandler.class), publisher, cacheProvider);
participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
+ verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
+
+ when(cacheProvider.isRegistered()).thenReturn(true);
+ clearInvocations(publisher);
+ participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
}
void sendHeartbeatTest() {
var cacheProvider = mock(CacheProvider.class);
when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+ when(cacheProvider.isRegistered()).thenReturn(false);
when(cacheProvider.getAutomationCompositions()).thenReturn(CommonTestData.getTestAutomationCompositionMap());
var publisher = mock(ParticipantMessagePublisher.class);
when(publisher.isActive()).thenReturn(true);
var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
mock(AcLockHandler.class), acHandler, publisher, cacheProvider);
participantHandler.sendHeartbeat();
- verify(publisher).sendHeartbeat(any(ParticipantStatus.class));
- }
+ verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
+ when(cacheProvider.isRegistered()).thenReturn(true);
+ clearInvocations(publisher);
+ participantHandler.sendHeartbeat();
+ verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
+ }
}