import java.util.UUID;
import lombok.Getter;
+import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
this.participantType = source.participantType;
this.participantId = source.participantId;
}
+
+ /**
+ * Determines if this message applies to this participant type.
+ *
+ * @param participantType type of the participant to match against
+ * @param participantId id of the participant to match against
+ * @return {@code true} if this message applies to this participant, {@code false} otherwise
+ */
+ public boolean appliesTo(@NonNull final ToscaConceptIdentifier participantType,
+ @NonNull final ToscaConceptIdentifier participantId) {
+ // Broadcast message to all participants
+ if (this.participantType == null) {
+ return true;
+ }
+
+ if (!participantType.equals(this.participantType)) {
+ return false;
+ }
+
+ // Broadcast message to all control loop elements on this participant
+ if (this.participantId == null) {
+ return true;
+ }
+
+ // Targeted message at this specific participant
+ return participantId.equals(this.participantId);
+ }
}
return true;
}
- // Broadcast message to all control loop elements on this participant
- if (participantType.equals(this.participantType) && this.participantId == null) {
- return true;
+ if (!participantType.equals(this.participantType)) {
+ return false;
}
- // Targeted message at this specific participant
- if (participantType.equals(this.participantType) && participantId.equals(this.participantId)) {
+ // Broadcast message to all control loop elements on this participant
+ if (this.participantId == null) {
return true;
}
- // Message is not for this participant
- return false;
+ // Targeted message at this specific participant
+ return participantId.equals(this.participantId);
}
}
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageUtils.assertSerializable;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
class ParticipantAckMessageTest {
private ParticipantAckMessage message;
+ private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+ private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+ private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+ private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
@Test
void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ParticipantAckMessage((ParticipantAckMessage) null))
assertSerializable(message, ParticipantAckMessage.class);
}
+ @Test
+ void testAppliesTo_NullParticipantId() {
+ message = makeMessage();
+
+ assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ void testAppliesTo_ParticipantIdMatches() {
+ message = makeMessage();
+
+ // ParticipantId matches
+ assertTrue(message.appliesTo(PTYPE_456, ID_123));
+ assertFalse(message.appliesTo(PTYPE_456, ID_124));
+ assertFalse(message.appliesTo(PTYPE_457, ID_123));
+ }
+
+ @Test
+ void testAppliesTo_ParticipantIdNoMatch() {
+ message = makeMessage();
+
+ // ParticipantId does not match
+ ToscaConceptIdentifier id = new ToscaConceptIdentifier();
+ id.setName("id1111");
+ id.setVersion("3.2.1");
+ assertFalse(message.appliesTo(id, id));
+ message.setParticipantType(null);
+ assertTrue(message.appliesTo(id, id));
+ }
+
private ParticipantAckMessage makeMessage() {
ParticipantAckMessage msg = new ParticipantAckMessage(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK);
+ msg.setParticipantType(PTYPE_456);
+ msg.setParticipantId(ID_123);
msg.setMessage("Successfull Ack");
msg.setResult(true);
msg.setResponseTo(UUID.randomUUID());
class ParticipantMessageTest {
private ParticipantMessage message;
+ private static final ToscaConceptIdentifier PTYPE_456 = new ToscaConceptIdentifier("PType", "4.5.6");
+ private static final ToscaConceptIdentifier PTYPE_457 = new ToscaConceptIdentifier("PType", "4.5.7");
+ private static final ToscaConceptIdentifier ID_123 = new ToscaConceptIdentifier("id", "1.2.3");
+ private static final ToscaConceptIdentifier ID_124 = new ToscaConceptIdentifier("id", "1.2.4");
+
@Test
void testCopyConstructor() throws CoderException {
assertThatThrownBy(() -> new ParticipantMessage((ParticipantMessage) null))
message = makeMessage();
assertThatThrownBy(() -> message.appliesTo(null, null)).isInstanceOf(NullPointerException.class);
- assertThatThrownBy(() -> message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"), null))
- .isInstanceOf(NullPointerException.class);
- assertThatThrownBy(() -> message.appliesTo(null, new ToscaConceptIdentifier("id", "1.2.3")))
- .isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(PTYPE_456, null)).isInstanceOf(NullPointerException.class);
+ assertThatThrownBy(() -> message.appliesTo(null, ID_123)).isInstanceOf(NullPointerException.class);
}
@Test
message = makeMessage();
// ParticipantId matches
- assertTrue(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
- new ToscaConceptIdentifier("id", "1.2.3")));
- assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.6"),
- new ToscaConceptIdentifier("id", "1.2.4")));
- assertFalse(message.appliesTo(new ToscaConceptIdentifier("PType", "4.5.7"),
- new ToscaConceptIdentifier("id", "1.2.3")));
+ assertTrue(message.appliesTo(PTYPE_456, ID_123));
+ assertFalse(message.appliesTo(PTYPE_456, ID_124));
+ assertFalse(message.appliesTo(PTYPE_457, ID_123));
}
@Test
private ParticipantMessage makeMessage() {
ParticipantMessage msg = new ParticipantMessage(ParticipantMessageType.PARTICIPANT_STATE_CHANGE);
- msg.setParticipantType(new ToscaConceptIdentifier("PType", "4.5.6"));
- msg.setParticipantId(new ToscaConceptIdentifier("id", "1.2.3"));
+ msg.setParticipantType(PTYPE_456);
+ msg.setParticipantId(ID_123);
msg.setMessageId(UUID.randomUUID());
msg.setTimestamp(Instant.ofEpochMilli(3000));
synchronized (lockit) {
ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantRegister(participantRegisterMsg);
}
}
synchronized (lockit) {
ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantDeregister(participantDeregisterMsg);
}
}
participantUpdateAckMsg.setResult(true);
synchronized (lockit) {
- ParticipantMessagePublisher participantMessagePublisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ ParticipantMessagePublisher participantMessagePublisher = new ParticipantMessagePublisher();
+ participantMessagePublisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
participantMessagePublisher.sendParticipantUpdateAck(participantUpdateAckMsg);
}
}
void testParticipantStatusHeartbeat() throws Exception {
final ParticipantStatus heartbeat = participantHandler.makeHeartbeat(true);
synchronized (lockit) {
- ParticipantMessagePublisher publisher =
- new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class)));
+ ParticipantMessagePublisher publisher = new ParticipantMessagePublisher();
+ publisher.active(Collections.singletonList(Mockito.mock(TopicSink.class)));
assertThatCode(() -> publisher.sendHeartbeat(heartbeat)).doesNotThrowAnyException();
}
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
super(ControlLoopStateChange.class, participantHandler,
participantHandler::handleControlLoopStateChange);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name();
+ }
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
super(ControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.CONTROL_LOOP_UPDATE.name();
+ }
}
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
/**
* Abstract Listener for Participant Ack messages sent by runtime.
*/
-public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T> {
+public abstract class ParticipantAckListener<T extends ParticipantAckMessage> extends ScoListener<T>
+ implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
- consumer.accept(message);
+ if (participantHandler.appliesTo(message)) {
+ consumer.accept(message);
+ }
+ }
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
}
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name();
+ }
}
import java.util.function.Consumer;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Listener;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
/**
* Abstract Listener for Participant messages sent by CLAMP.
*/
-public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> {
+public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> implements Listener {
private final ParticipantHandler participantHandler;
private final Consumer<T> consumer;
consumer.accept(message);
}
}
+
+ @Override
+ public ScoListener<T> getScoListener() {
+ return this;
+ }
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
import java.util.List;
+import javax.ws.rs.core.Response.Status;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.Publisher;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* This class is used to send Participant Status messages to clamp using TopicSinkClient.
*
*/
-public class ParticipantMessagePublisher {
+@Component
+public class ParticipantMessagePublisher implements Publisher {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class);
- private final TopicSinkClient topicSinkClient;
+ private boolean active = false;
+ private TopicSinkClient topicSinkClient;
/**
* Constructor for instantiating ParticipantMessagePublisher.
*
* @param topicSinks the topic sinks
*/
- public ParticipantMessagePublisher(List<TopicSink> topicSinks) {
+ @Override
+ public void active(List<TopicSink> topicSinks) {
if (topicSinks.size() != 1) {
throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
}
this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ active = true;
}
/**
* @param participantStatus the Participant Status
*/
public void sendParticipantStatus(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
}
* @param participantRegister the Participant Status
*/
public void sendParticipantRegister(final ParticipantRegister participantRegister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantRegister);
LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister);
}
* @param participantDeregister the Participant Status
*/
public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantDeregister);
LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister);
}
* @param participantUpdateAck the Participant Update Ack
*/
public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantUpdateAck);
LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck);
}
* @param controlLoopAck ControlLoop Update/StateChange Ack
*/
public void sendControlLoopAck(final ControlLoopAck controlLoopAck) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(controlLoopAck);
LOGGER.debug("Sent ControlLoop Update/StateChange Ack to runtime - {}", controlLoopAck);
}
* @param participantStatus the Participant Status
*/
public void sendHeartbeat(final ParticipantStatus participantStatus) {
+ if (!active) {
+ throw new ControlLoopRuntimeException(Status.NOT_ACCEPTABLE, "Not Active!");
+ }
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant heartbeat to CLAMP - {}", participantStatus);
}
+
+ @Override
+ public void stop() {
+ active = false;
+ }
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) {
super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name();
+ }
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatusReq;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
public ParticipantStatusReqListener(final ParticipantHandler participantHandler) {
super(ParticipantStatusReq.class, participantHandler, participantHandler::handleParticipantStatusReq);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_STATUS_REQ.name();
+ }
}
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.springframework.stereotype.Component;
public ParticipantUpdateListener(final ParticipantHandler participantHandler) {
super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate);
}
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_UPDATE.name();
+ }
}
+++ /dev/null
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.controlloop.participant.intermediary.config;
-
-import java.util.List;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class BeanFactory {
-
- // Name of the message type for messages on topics
- private static final String[] MSG_TYPE_NAMES = {"messageType"};
-
- /**
- * create ParticipantMessagePublisher.
- *
- * @param parameters the ParticipantParameters
- * @return ParticipantMessagePublisher
- */
- @Bean
- public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) {
- List<TopicSink> topicSinks = TopicEndpointManager.getManager()
- .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
- return new ParticipantMessagePublisher(topicSinks);
- }
-
- @Bean
- public MessageTypeDispatcher msgDispatcher() {
- return new MessageTypeDispatcher(MSG_TYPE_NAMES);
- }
-}
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusReqListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
- private final ApplicationContext applicationContext;
+ private static final String[] MSG_TYPE_NAMES = {"messageType"};
// Topics from which the participant receives and to which the participant sends messages
+ private List<TopicSink> topicSinks;
private List<TopicSource> topicSources;
ParticipantIntermediaryApi participantIntermediaryApi;
+ private final MessageTypeDispatcher msgDispatcher;
+
/**
* Instantiate the activator for participant.
*
- * @param applicationContext ApplicationContext
* @param parameters the ParticipantParameters
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
*/
- public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters,
- ParticipantIntermediaryApi participantIntermediaryApi) {
- this.applicationContext = applicationContext;
+ public IntermediaryActivator(final ParticipantParameters parameters,
+ ParticipantIntermediaryApi participantIntermediaryApi, List<Publisher> publishers,
+ List<Listener> listeners) {
this.participantIntermediaryApi = participantIntermediaryApi;
+ topicSinks = TopicEndpointManager.getManager()
+ .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
+
topicSources = TopicEndpointManager.getManager()
.addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources());
- // @formatter:off
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ // @formatter:off
addAction("Topic endpoint management",
- () -> TopicEndpointManager.getManager().start(),
- () -> TopicEndpointManager.getManager().shutdown());
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
+ publishers.forEach(publisher ->
+ addAction("Publisher " + publisher.getClass().getSimpleName(),
+ () -> publisher.active(topicSinks),
+ publisher::stop));
+
+ listeners.forEach(listener ->
+ addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> msgDispatcher.unregister(listener.getType())));
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATUS_REQ.name(),
- applicationContext.getBean(ParticipantStatusReqListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_STATE_CHANGE.name(),
- applicationContext.getBean(ControlLoopStateChangeListener.class));
-
- msgDispatcher.register(ParticipantMessageType.CONTROL_LOOP_UPDATE.name(),
- applicationContext.getBean(ControlLoopUpdateListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(),
- applicationContext.getBean(ParticipantRegisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(),
- applicationContext.getBean(ParticipantDeregisterAckListener.class));
-
- msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(),
- applicationContext.getBean(ParticipantUpdateListener.class));
-
for (final TopicSource source : topicSources) {
source.register(msgDispatcher);
}
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
-
for (final TopicSource source : topicSources) {
source.unregister(msgDispatcher);
}
@Override
public void close() throws IOException {
- super.shutdown();
+ if (isAlive()) {
+ super.shutdown();
+ }
}
}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+
+public interface Listener {
+
+ /**
+ * Get the type of message of interest to the listener.
+ *
+ * @return type of message of interest to the listener
+ */
+ String getType();
+
+ /**
+ * Get listener to register.
+ *
+ * @return listener to register
+ */
+ <T> ScoListener<T> getScoListener();
+}
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopStateChange;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
return participantMsg.appliesTo(participantType, participantId);
}
+ /**
+ * Check if a participant message applies to this participant handler.
+ *
+ * @param participantMsg the message to check
+ * @return true if it applies, false otherwise
+ */
+ public boolean appliesTo(ParticipantAckMessage participantMsg) {
+ return participantMsg.appliesTo(participantType, participantId);
+ }
+
/**
* Method to send ParticipantRegister message to controlloop runtime.
*/
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+
+/**
+ * Publisher.
+ */
+public interface Publisher {
+
+ void active(List<TopicSink> topicSinks);
+
+ void stop();
+}
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Stream;
-import javax.ws.rs.core.Response.Status;
import lombok.Getter;
import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
* Constructor.
*
* @param clRuntimeParameterGroup the parameters for the control loop runtime service
- * @param publishers array of Publishers
- * @param listeners array of Listeners
+ * @param publishers list of Publishers
+ * @param listeners list of Listeners
* @throws ControlLoopRuntimeException if the activator does not start
*/
- public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, Publisher[] publishers,
- Listener[] listeners) {
+ public MessageDispatcherActivator(final ClRuntimeParameterGroup clRuntimeParameterGroup, List<Publisher> publishers,
+ List<Listener> listeners) {
topicSinks = TopicEndpointManager.getManager()
.addTopicSinks(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSinks());
topicSources = TopicEndpointManager.getManager()
.addTopicSources(clRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
- try {
- msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- } catch (final RuntimeException e) {
- throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
- "topic message dispatcher failed to start", e);
- }
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- Stream.of(publishers).forEach(publisher ->
+ publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
- () -> publisher.stop()));
+ publisher::stop));
- Stream.of(listeners).forEach(listener ->
+ listeners.forEach(listener ->
addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
}
}
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ if (isAlive()) {
+ stop();
+ }
+ }
+
@Override
public void close() throws IOException {
if (isAlive()) {
- stop();
+ super.shutdown();
}
}
}
public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) {
LOGGER.debug("Participant Register received {}", participantRegisterMessage);
- participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId());
+ participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(),
+ participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType());
participantUpdatePublisher.send(participantRegisterMessage.getParticipantId(),
participantRegisterMessage.getParticipantType(), true);
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getControlLoopInfoList() != null) {
for (ControlLoopInfo clEntry : participantStatusMessage.getControlLoopInfoList()) {
- var dbControlLoop = controlLoopProvider.getControlLoop(
- new ToscaConceptIdentifier(clEntry.getControlLoopId()));
+ var dbControlLoop =
+ controlLoopProvider.getControlLoop(new ToscaConceptIdentifier(clEntry.getControlLoopId()));
if (dbControlLoop == null) {
exceptionOccured(Response.Status.NOT_FOUND,
"PARTICIPANT_STATUS control loop not found in database: " + clEntry.getControlLoopId());
}
dbControlLoop.setState(clEntry.getState());
- monitoringProvider.createClElementStatistics(clEntry.getControlLoopStatistics()
- .getClElementStatisticsList().getClElementStatistics());
+ monitoringProvider.createClElementStatistics(
+ clEntry.getControlLoopStatistics().getClElementStatisticsList().getClElementStatistics());
}
}
}
if (participantUpdateCounter.count(id)) {
LOGGER.debug("retry message ParticipantUpdate");
- participantUpdatePublisher.send(id.getLeft(), id.getRight());
+ participantUpdatePublisher.send(id.getLeft(), id.getRight(), true);
} else {
LOGGER.debug("report Participant Update fault");
participantUpdateCounter.setFault(id);
import java.util.UUID;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.springframework.stereotype.Component;
/**
* Send ParticipantRegisterAck to Participant.
*
* @param responseTo the original request id in the request.
+ * @param participantId the participant Id
+ * @param participantType the participant Type
*/
- public void send(UUID responseTo) {
+ public void send(UUID responseTo, ToscaConceptIdentifier participantId, ToscaConceptIdentifier participantType) {
var message = new ParticipantRegisterAck();
+ message.setParticipantId(participantId);
+ message.setParticipantType(participantType);
message.setResponseTo(responseTo);
message.setMessage("Participant Register Ack");
message.setResult(true);
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.controlloop.runtime.main.parameters.ClRuntimeParameterGroup;
import org.onap.policy.clamp.controlloop.runtime.supervision.comm.ParticipantStatusListener;
var publisherFirst = spy(mock(Publisher.class));
var publisherSecond = spy(mock(Publisher.class));
- var publishers = new Publisher[] {publisherFirst, publisherSecond};
+ var publishers = List.of(publisherFirst, publisherSecond);
var listenerFirst = spy(mock(ParticipantStatusListener.class));
when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
- var listeners = new Listener[] {listenerFirst, listenerSecond};
+ List<Listener> listeners = List.of(listenerFirst, listenerSecond);
try (var activator = new MessageDispatcherActivator(parameterGroup, publishers, listeners)) {