Wait for pdp-pap topic in xacml-pdp
[policy/xacml-pdp.git] / main / src / main / java / org / onap / policy / pdpx / main / comm / XacmlPdpHearbeatPublisher.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pdpx.main.comm;
22
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import lombok.Getter;
28 import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
29 import org.onap.policy.common.utils.coder.Coder;
30 import org.onap.policy.common.utils.coder.CoderException;
31 import org.onap.policy.common.utils.coder.StandardCoder;
32 import org.onap.policy.models.pdp.concepts.PdpStatus;
33 import org.onap.policy.models.pdp.concepts.PdpTopicCheck;
34 import org.onap.policy.pdpx.main.XacmlState;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public class XacmlPdpHearbeatPublisher implements Runnable {
39     public static final int DEFAULT_HB_INTERVAL_MS = 60000;
40
41     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class);
42     private static final Coder CODER = new StandardCoder();
43
44     private final BidirectionalTopicClient topicChecker;
45     private final long probeHeartbeatTopicMs;
46
47     /**
48      * Tracks the state of this PDP.
49      */
50     private final XacmlState currentState;
51
52     /**
53      * Current timer interval, in milliseconds.
54      */
55     @Getter
56     private long intervalMs = DEFAULT_HB_INTERVAL_MS;
57
58     private ScheduledExecutorService timerThread;
59
60     private ScheduledFuture<?> timer;
61
62
63     /**
64      * Constructor for instantiating XacmlPdpPublisher.
65      *
66      * @param topicChecker used to check the topic before sending heart beat message
67      * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the
68      *        heartbeat topic before sending the first heartbeat. Zero disables probing
69      * @param state tracks the state of this PDP
70      */
71     public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs,
72                     XacmlState state) {
73         LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs);
74         this.topicChecker = topicChecker;
75         this.probeHeartbeatTopicMs = probeHeartbeatTopicMs;
76         this.currentState = state;
77     }
78
79     @Override
80     public void run() {
81         try {
82             if (!isTopicReady()) {
83                 return;
84             }
85
86             PdpStatus message = currentState.genHeartbeat();
87             LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
88
89             String json = CODER.encode(message);
90             topicChecker.send(json);
91
92         } catch (RuntimeException | CoderException e) {
93             LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e);
94         }
95     }
96
97     private boolean isTopicReady() throws CoderException {
98         if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) {
99             return true;
100         }
101
102         var check = new PdpTopicCheck();
103         check.setName(XacmlState.PDP_NAME);
104         return topicChecker.awaitReady(check, probeHeartbeatTopicMs);
105     }
106
107     /**
108      * Method to terminate the heart beat.
109      */
110     public synchronized void terminate() {
111         topicChecker.stopWaiting();
112
113         if (timerThread != null) {
114             timerThread.shutdownNow();
115             timerThread = null;
116             timer = null;
117         }
118     }
119
120     /**
121      * Restarts the timer if the interval has changed. If the timer is not currently
122      * running, then it updates the interval, but does not start the timer.
123      *
124      * @param intervalMs desired interval, or {@code null} to leave it unchanged
125      */
126     public synchronized void restart(Long intervalMs) {
127         if (intervalMs != null && intervalMs > 0 && intervalMs != this.intervalMs) {
128             this.intervalMs = intervalMs;
129
130             if (timerThread != null) {
131                 timer.cancel(false);
132                 timer = timerThread.scheduleWithFixedDelay(this, 0, this.intervalMs, TimeUnit.MILLISECONDS);
133             }
134         }
135     }
136
137     /**
138      * Starts the timer.
139      */
140     public synchronized void start() {
141         if (timerThread == null) {
142             timerThread = makeTimerThread();
143             timer = timerThread.scheduleWithFixedDelay(this, 0, this.intervalMs, TimeUnit.MILLISECONDS);
144         }
145     }
146
147     // these may be overridden by junit tests
148
149     protected ScheduledExecutorService makeTimerThread() {
150         return Executors.newScheduledThreadPool(1);
151     }
152 }