package org.onap.policy.clamp.controlloop.runtime.main.parameters;
-import java.util.List;
import javax.validation.constraints.NotBlank;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.parameters.RestServerParameters;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.parameters.ParameterGroupImpl;
private ParticipantParameters participantParameters;
private TopicParameterGroup topicParameterGroup;
+ private long supervisionScannerIntervalSec;
+ private long participantStateChangeIntervalSec;
+ private long participantClUpdateIntervalSec;
+ private long participantClStateChangeIntervalSec;
+
/**
* Create the Control Loop parameter group.
*
private ParticipantControlLoopUpdatePublisher controlLoopUpdatePublisher;
private ParticipantControlLoopStateChangePublisher controlLoopStateChangePublisher;
+ private long supervisionScannerIntervalSec;
+ private long participantStateChangeIntervalSec;
+ private long participantClUpdateIntervalSec;
+ private long participantClStateChangeIntervalSec;
+
// Database scanner
private SupervisionScanner scanner;
() -> participantProvider = new ParticipantProvider(getDatabaseProviderParameters()),
() -> participantProvider = null);
// @formatter:on
+
+ supervisionScannerIntervalSec = clRuntimeParameterGroup.getSupervisionScannerIntervalSec();
+ participantStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec();
+ participantClUpdateIntervalSec = clRuntimeParameterGroup.getParticipantClUpdateIntervalSec();
+ participantClStateChangeIntervalSec = clRuntimeParameterGroup.getParticipantClStateChangeIntervalSec();
+
}
/**
if (CollectionUtils.isEmpty(controlLoopIdentifierList)) {
// This is just to force throwing of the exception in certain circumstances.
- exceptionOccured(Response.Status.NOT_ACCEPTABLE,
- "The list of control loops for supervision is empty");
+ exceptionOccured(Response.Status.NOT_ACCEPTABLE, "The list of control loops for supervision is empty");
}
for (ToscaConceptIdentifier controlLoopId : controlLoopIdentifierList) {
@Override
public void startAndRegisterPublishers(List<TopicSink> topicSinks) {
- // TODO: Use a parameter for the timeout
// @formatter:off
this.publisherManager = new ServiceManager()
.addAction("Supervision scanner",
- () -> scanner = new SupervisionScanner(controlLoopProvider, 10000),
- () -> scanner = null)
+ () -> scanner =
+ new SupervisionScanner(controlLoopProvider, supervisionScannerIntervalSec),
+ () -> scanner.close())
.addAction("ControlLoopUpdate publisher",
- () -> controlLoopUpdatePublisher = new ParticipantControlLoopUpdatePublisher(topicSinks, -1),
+ () -> controlLoopUpdatePublisher =
+ new ParticipantControlLoopUpdatePublisher(topicSinks, participantClUpdateIntervalSec),
() -> controlLoopUpdatePublisher.terminate())
.addAction("StateChange Publisher",
- () -> stateChangePublisher = new ParticipantStateChangePublisher(topicSinks, 10000),
+ () -> stateChangePublisher =
+ new ParticipantStateChangePublisher(topicSinks, participantStateChangeIntervalSec),
() -> stateChangePublisher.terminate())
.addAction("ControlLoopStateChange Publisher",
() -> controlLoopStateChangePublisher =
- new ParticipantControlLoopStateChangePublisher(topicSinks, -1),
+ new ParticipantControlLoopStateChangePublisher(topicSinks, participantClStateChangeIntervalSec),
() -> controlLoopStateChangePublisher.terminate());
// @formatter:on
try {
* @throws PfModelException on accessing models in the database
* @throws ControlLoopException on supervision errors
*/
- private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException {
+ private void superviseControlLoop(ControlLoop controlLoop) throws ControlLoopException, PfModelException {
switch (controlLoop.getOrderedState()) {
case UNINITIALISED:
superviseControlLoopUninitialization(controlLoop);
private void superviseParticipant(ParticipantStatus participantStatusMessage)
throws PfModelException, ControlLoopException {
if (participantStatusMessage.getParticipantId() == null) {
- exceptionOccured(Response.Status.NOT_FOUND,
- "Participant ID on PARTICIPANT_STATUS message is null");
+ exceptionOccured(Response.Status.NOT_FOUND, "Participant ID on PARTICIPANT_STATUS message is null");
}
List<Participant> participantList =
}
monitoringProvider = MonitoringHandler.getInstance().getMonitoringProvider();
- monitoringProvider.createParticipantStatistics(
- List.of(participantStatusMessage.getParticipantStatistics()));
+ monitoringProvider.createParticipantStatistics(List.of(participantStatusMessage.getParticipantStatistics()));
}
private void superviseControlLoops(ParticipantStatus participantStatusMessage)
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.runtime.supervision.comm;
+
+import java.util.List;
+import lombok.Getter;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
+public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> {
+
+ private final TopicSinkClient topicSinkClient;
+
+ @Getter
+ private final long intervalSec;
+
+ /**
+ * Constructor.
+ *
+ * @param topicSinks the topic sinks
+ * @param intervalSec time interval to send ParticipantStateChange messages
+ */
+ protected AbstractParticipantPublisher(final List<TopicSink> topicSinks, long intervalSec) {
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ this.intervalSec = intervalSec;
+ }
+
+ /**
+ * Terminates the current timer.
+ */
+ public void terminate() {
+ // Nothing to terminate, this publisher does not have a timer
+ }
+
+ /**
+ * Method to send Participant message to participants on demand.
+ *
+ * @param participantMessage the Participant message
+ */
+ public void send(final E participantMessage) {
+ topicSinkClient.send(participantMessage);
+ }
+}
import java.util.List;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is used to send ParticipantControlLoopStateChangePublisher messages to participants on DMaaP.
*/
-public class ParticipantControlLoopStateChangePublisher {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopStateChangePublisher.class);
-
- private TopicSinkClient topicSinkClient;
+public class ParticipantControlLoopStateChangePublisher
+ extends AbstractParticipantPublisher<ParticipantControlLoopStateChange> {
/**
* Constructor for instantiating ParticipantControlLoopStateChangePublisherPublisher.
* @param interval time interval to send ParticipantControlLoopStateChangePublisher messages
*/
public ParticipantControlLoopStateChangePublisher(final List<TopicSink> topicSinks, final long interval) {
- // TODO: Should not be dependent on the order of topic sinks in the config
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
- }
-
- /**
- * Terminates the current timer.
- */
- public void terminate() {
- // This is a user initiated message and doesn't need a timer.
- }
-
- /**
- * Get the current time interval used by the timer task.
- *
- * @return interval the current time interval
- */
- public long getInterval() {
- // This is a user initiated message and doesn't need a timer.
- return -1;
- }
-
- /**
- * Method to send ParticipantControlLoopStateChangePublisher status message to participants on demand.
- *
- * @param controlLoopStateChange the ParticipantControlLoopStateChangePublisher message
- */
- public void send(final ParticipantControlLoopStateChange controlLoopStateChange) {
- topicSinkClient.send(controlLoopStateChange);
- LOGGER.debug("Sent ParticipantControlLoopStateChange to Participants - {}", controlLoopStateChange);
+ super(topicSinks, interval);
}
}
import java.util.List;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is used to send ParticipantControlLoopUpdate messages to participants on DMaaP.
*/
-public class ParticipantControlLoopUpdatePublisher {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantControlLoopUpdatePublisher.class);
-
- private TopicSinkClient topicSinkClient;
+public class ParticipantControlLoopUpdatePublisher extends AbstractParticipantPublisher<ParticipantControlLoopUpdate> {
/**
* Constructor for instantiating ParticipantUpdatePublisher.
* @param interval time interval to send ParticipantControlLoopUpdate messages
*/
public ParticipantControlLoopUpdatePublisher(final List<TopicSink> topicSinks, final long interval) {
- // TODO: Should not be dependent on the order of topic sinks in the config
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
- }
-
- /**
- * Terminates the current timer.
- */
- public void terminate() {
- // This is a user initiated message and doesn't need a timer.
- }
-
- /**
- * Get the current time interval used by the timer task.
- *
- * @return interval the current time interval
- */
- public long getInterval() {
- // This is a user initiated message and doesn't need a timer.
- return -1;
- }
-
- /**
- * Method to send ParticipantControlLoopUpdate status message to participants on demand.
- *
- * @param participantControlLoopUpdate the ParticipantControlLoopUpdate message
- */
- public void send(final ParticipantControlLoopUpdate participantControlLoopUpdate) {
- topicSinkClient.send(participantControlLoopUpdate);
- LOGGER.debug("Sent ParticipantControlLoopUpdate to Participants - {}", participantControlLoopUpdate);
+ super(topicSinks, interval);
}
}
import java.util.List;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is used to send ParticipantStateChange messages to participants on DMaaP.
*/
-public class ParticipantStateChangePublisher {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangePublisher.class);
-
- private TopicSinkClient topicSinkClient;
+public class ParticipantStateChangePublisher extends AbstractParticipantPublisher<ParticipantStateChange> {
/**
* Constructor for instantiating ParticipantStateChangePublisher.
* @param topicSinks the topic sinks
* @param interval time interval to send ParticipantStateChange messages
*/
- public ParticipantStateChangePublisher(final List<TopicSink> topicSinks, final long interval) {
- // TODO: Should not be dependent on the order of topic sinks in the config
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
- }
-
- /**
- * Terminates the current timer.
- */
- public void terminate() {
- // Nothing to terminate, this publisher does not have a timer
- }
-
- /**
- * Get the current time interval used by the timer task.
- *
- * @return interval the current time interval
- */
- public long getInterval() {
- return -1;
- }
-
- /**
- * Method to send ParticipantStateChange status message to participants on demand.
- *
- * @param participantStateChange the ParticipantStateChange message
- */
- public void send(final ParticipantStateChange participantStateChange) {
- topicSinkClient.send(participantStateChange);
- LOGGER.debug("Sent ParticipantStateChange to Participants - {}", participantStateChange);
+ public ParticipantStateChangePublisher(List<TopicSink> topicSinks, long interval) {
+ super(topicSinks, interval);
}
}
{
"name": "CommissioningGroup",
+ "supervisionScannerIntervalSec" : 1000,
+ "participantStateChangeIntervalSec" : 1000,
+ "participantClUpdateIntervalSec" : 1000,
+ "participantClStateChangeIntervalSec" : 1000,
"restServerParameters": {
"host": "127.0.0.1",
"port": 6969,
{
"name": "ControlLoopRuntimeGroup",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
"restServerParameters": {
"host": "0.0.0.0",
- "port": ${port},
+ "port": ${port
+ },
"userName": "healthcheck",
"password": "zb!XztG34",
"https": false,
{
- "name":" ",
- "restServerParameters": {
- "host": "127.0.0.1",
- "port": 6969,
- "userName": "admin",
- "password": "password",
- "https": false,
- "aaf": false
- },
+ "name": " ",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
+ "restServerParameters": {
+ "host": "127.0.0.1",
+ "port": 6969,
+ "userName": "admin",
+ "password": "password",
+ "https": false,
+ "aaf": false
+ },
"pdpParameters": {
"heartBeatMs": 1,
"updateParameters": {
{
"name": "Instantiation",
- "restServerParameters": {
- "host": "127.0.0.1",
- "port": 6969,
- "userName": "admin",
- "password": "password",
- "https": false,
- "aaf": false
- },
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
+ "restServerParameters": {
+ "host": "127.0.0.1",
+ "port": 6969,
+ "userName": "admin",
+ "password": "password",
+ "https": false,
+ "aaf": false
+ },
"pdpParameters": {
"heartBeatMs": 10,
"updateParameters": {
"persistenceUnit": "PolicyMariaDb"
},
"topicParameterGroup": {
- "topicSources" : [{
- "topic" : "INSTANTIATION",
- "servers" : [ "localhost:6845" ],
- "topicCommInfrastructure" : "dmaap"
- }],
- "topicSinks" : [{
- "topic" : "INSTANTIATION",
- "servers" : [ "localhost:6845" ],
- "topicCommInfrastructure" : "dmaap"
- }]
+ "topicSources": [
+ {
+ "topic": "INSTANTIATION",
+ "servers": [
+ "localhost:6845"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ],
+ "topicSinks": [
+ {
+ "topic": "INSTANTIATION",
+ "servers": [
+ "localhost:6845"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ]
}
}
{
- "name":"PapGroup",
- "restServerParameters":{
- "host":"0.0.0.0",
- "port":6969,
- "userName":"healthcheck",
- "password":"zb!XztG34"
+ "name": "PapGroup",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
+ "restServerParameters": {
+ "host": "0.0.0.0",
+ "port": 6969,
+ "userName": "healthcheck",
+ "password": "zb!XztG34"
},
"pdpParameters": {
"heartBeatMs": 1,
"persistenceUnit": "PdpGroupTest"
},
"topicParameterGroup": {
- "topicSources" : [{
- "topic" : "POLICY-PDP-PAP",
- "servers" : [ "message-router" ],
- "topicCommInfrastructure" : "dmaap"
- }],
- "topicSinks" : [{
- "topic" : "POLICY-PDP-PAP",
- "servers" : [ "message-router" ],
- "topicCommInfrastructure" : "dmaap"
- }]
+ "topicSources": [
+ {
+ "topic": "POLICY-PDP-PAP",
+ "servers": [
+ "message-router"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ],
+ "topicSinks": [
+ {
+ "topic": "POLICY-PDP-PAP",
+ "servers": [
+ "message-router"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ]
}
}
{
"name": "ControlLoopRuntimeGroup",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
"restServerParameters": {
"host": "0.0.0.0",
"port": 6969,
{
"name": "ControlLoopRuntimeGroup",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
"restServerParameters": {
"host": "0.0.0.0",
"port": 6969,
{
"name": "ControlLoopRuntimeGroup",
+ "supervisionScannerIntervalSec": 1000,
+ "participantStateChangeIntervalSec": 1000,
+ "participantClUpdateIntervalSec": 1000,
+ "participantClStateChangeIntervalSec": 1000,
"restServerParameters": {
"host": "0.0.0.0",
"port": 6969,