Few JUnit additions for PAP-REST
[policy/engine.git] / ONAP-PAP-REST / src / main / java / org / onap / policy / pap / xacml / rest / Heartbeat.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PAP-REST
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pap.xacml.rest;
22
23 import com.att.research.xacml.api.pap.PAPException;
24 import com.att.research.xacml.api.pap.PDPStatus;
25 import com.att.research.xacml.util.XACMLProperties;
26 import com.google.common.annotations.VisibleForTesting;
27 import java.net.ConnectException;
28 import java.net.HttpURLConnection;
29 import java.net.MalformedURLException;
30 import java.net.SocketTimeoutException;
31 import java.net.URL;
32 import java.net.UnknownHostException;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Set;
36 import org.onap.policy.common.logging.eelf.MessageCodes;
37 import org.onap.policy.common.logging.eelf.PolicyLogger;
38 import org.onap.policy.common.logging.flexlogger.FlexLogger;
39 import org.onap.policy.common.logging.flexlogger.Logger;
40 import org.onap.policy.pap.xacml.restAuth.CheckPDP;
41 import org.onap.policy.rest.XacmlRestProperties;
42 import org.onap.policy.xacml.api.pap.OnapPDP;
43 import org.onap.policy.xacml.api.pap.OnapPDPGroup;
44 import org.onap.policy.xacml.api.pap.PAPPolicyEngine;
45
46 /**
47  * Heartbeat thread - periodically check on PDPs' status.
48  * Heartbeat with all known PDPs.
49  * Implementation note:
50  * The PDPs are contacted Sequentially, not in Parallel.
51  * If we did this in parallel using multiple threads we would simultaneously use - 1 thread and - 1
52  * connection for EACH PDP. This could become a resource problem since we already use multiple
53  * threads and connections for updating the PDPs when user changes occur. Using separate threads can
54  * also make it tricky dealing with timeouts on PDPs that are non-responsive.
55  * The Sequential operation does a heartbeat request to each PDP one at a time. This has the flaw
56  * that any PDPs that do not respond will hold up the entire heartbeat sequence until they timeout.
57  * If there are a lot of non-responsive PDPs and the timeout is large-ish (the default is 20
58  * seconds) it could take a long time to cycle through all of the PDPs. That means that this may not
59  * notice a PDP being down in a predictable time.
60  */
61 public class Heartbeat implements Runnable {
62
63     private static final Logger LOGGER = FlexLogger.getLogger(Heartbeat.class);
64
65     private PAPPolicyEngine papEngine;
66     private Set<OnapPDP> pdps = new HashSet<>();
67     private int heartbeatInterval;
68     private int heartbeatTimeout;
69     private static final String HEARTBEATSTRING = " Heartbeat '";
70     private static final String XACMLPAPSERVLET = "XacmlPapServlet";
71     private static final String ISRUNNINGFALSE = "isRunning is false, getting out of loop.";
72     private volatile boolean isRunning = false;
73
74     public synchronized boolean isHeartBeatRunning() {
75         return this.isRunning;
76     }
77
78     public synchronized void terminate() {
79         this.isRunning = false;
80     }
81
82     /**
83      * Instantiates a new heartbeat.
84      *
85      * @param papEngine2 the pap engine 2
86      */
87     public Heartbeat(PAPPolicyEngine papEngine2) {
88         papEngine = papEngine2;
89         this.heartbeatInterval =
90             Integer.parseInt(XACMLProperties.getProperty(XacmlRestProperties.PROP_PAP_HEARTBEAT_INTERVAL, "10000"));
91         this.heartbeatTimeout =
92             Integer.parseInt(XACMLProperties.getProperty(XacmlRestProperties.PROP_PAP_HEARTBEAT_TIMEOUT, "10000"));
93     }
94
95     @Override
96     public void run() {
97         // Set ourselves as running
98         synchronized (this) {
99             this.isRunning = true;
100         }
101         try {
102             while (this.isHeartBeatRunning()) {
103                 // Wait the given time
104                 Thread.sleep(heartbeatInterval);
105                 // get the list of PDPs (may have changed since last time)
106                 pdps.clear();
107                 synchronized (papEngine) {
108                     getPdpsFromGroup();
109                 }
110                 // Check for shutdown
111                 if (!this.isHeartBeatRunning()) {
112                     LOGGER.info(ISRUNNINGFALSE);
113                     break;
114                 }
115                 notifyEachPdp();
116                 // Check for shutdown
117                 if (!this.isHeartBeatRunning()) {
118                     LOGGER.info(ISRUNNINGFALSE);
119                     break;
120                 }
121             }
122         } catch (InterruptedException e) {
123             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR + " Heartbeat interrupted.  Shutting down");
124             this.terminate();
125             Thread.currentThread().interrupt();
126         }
127     }
128
129     @VisibleForTesting
130     protected void getPdpsFromGroup() {
131         try {
132             for (OnapPDPGroup g : papEngine.getOnapPDPGroups()) {
133                 for (OnapPDP p : g.getOnapPdps()) {
134                     pdps.add(p);
135                 }
136             }
137         } catch (PAPException e) {
138             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR, e, XACMLPAPSERVLET,
139                 "Heartbeat unable to read PDPs from PAPEngine");
140         }
141     }
142
143     @VisibleForTesting
144     protected void notifyEachPdp() {
145         HashMap<String, URL> idToUrlMap = new HashMap<>();
146         for (OnapPDP pdp : pdps) {
147             // Check for shutdown
148             if (!this.isHeartBeatRunning()) {
149                 LOGGER.info(ISRUNNINGFALSE);
150                 break;
151             }
152             // the id of the PDP is its url (though we add a query
153             // parameter)
154             URL pdpUrl = idToUrlMap.get(pdp.getId());
155             if (pdpUrl == null) {
156                 // haven't seen this PDP before
157                 String fullUrlString = null;
158                 try {
159                     // Check PDP ID
160                     if (CheckPDP.validateID(pdp.getId())) {
161                         fullUrlString = pdp.getId() + "?type=hb";
162                         pdpUrl = new URL(fullUrlString);
163                         idToUrlMap.put(pdp.getId(), pdpUrl);
164                     }
165                 } catch (MalformedURLException e) {
166                     PolicyLogger.error(MessageCodes.ERROR_DATA_ISSUE, e, XACMLPAPSERVLET,
167                         " PDP id '" + fullUrlString + "' is not a valid URL");
168                 }
169             }
170             updatePdpStatus(pdp, openPdpConnection(pdpUrl, pdp));
171         }
172     }
173
174     @VisibleForTesting
175     protected String openPdpConnection(URL pdpUrl, OnapPDP pdp) {
176         // Do a GET with type HeartBeat
177         String newStatus = "";
178         HttpURLConnection connection = null;
179         try {
180             // Open up the connection
181             if (pdpUrl != null) {
182                 connection = (HttpURLConnection) pdpUrl.openConnection();
183                 // Setup our method and headers
184                 connection.setRequestMethod("GET");
185                 connection.setConnectTimeout(heartbeatTimeout);
186                 // Authentication
187                 String encoding = CheckPDP.getEncoding(pdp.getId());
188                 if (encoding != null) {
189                     connection.setRequestProperty("Authorization", "Basic " + encoding);
190                 }
191                 // Do the connect
192                 connection.connect();
193                 if (connection.getResponseCode() == 204) {
194                     newStatus = connection.getHeaderField(XacmlRestProperties.PROP_PDP_HTTP_HEADER_HB);
195                     if (LOGGER.isDebugEnabled()) {
196                         LOGGER.debug("Heartbeat '" + pdp.getId() + "' status='" + newStatus + "'");
197                     }
198                 } else {
199                     // anything else is an unexpected result
200                     newStatus = PDPStatus.Status.UNKNOWN.toString();
201                     PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR + " Heartbeat connect response code "
202                         + connection.getResponseCode() + ": " + pdp.getId());
203                 }
204             }
205         } catch (UnknownHostException e) {
206             newStatus = PDPStatus.Status.NO_SUCH_HOST.toString();
207             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR, e, XACMLPAPSERVLET,
208                 HEARTBEATSTRING + pdp.getId() + "' NO_SUCH_HOST");
209         } catch (SocketTimeoutException e) {
210             newStatus = PDPStatus.Status.CANNOT_CONNECT.toString();
211             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR, e, XACMLPAPSERVLET,
212                 HEARTBEATSTRING + pdp.getId() + "' connection timeout");
213         } catch (ConnectException e) {
214             newStatus = PDPStatus.Status.CANNOT_CONNECT.toString();
215             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR, e, XACMLPAPSERVLET,
216                 HEARTBEATSTRING + pdp.getId() + "' cannot connect");
217         } catch (Exception e) {
218             newStatus = PDPStatus.Status.UNKNOWN.toString();
219             PolicyLogger.error(MessageCodes.ERROR_SYSTEM_ERROR, e, XACMLPAPSERVLET,
220                 HEARTBEATSTRING + pdp.getId() + "' connect exception");
221         } finally {
222             // cleanup the connection
223             if (connection != null)
224                 connection.disconnect();
225         }
226         return newStatus;
227     }
228
229     @VisibleForTesting
230     protected void updatePdpStatus(OnapPDP pdp, String newStatus) {
231         if (!pdp.getStatus().getStatus().toString().equals(newStatus)) {
232             if (LOGGER.isDebugEnabled()) {
233                 LOGGER.debug("previous status='" + pdp.getStatus().getStatus() + "'  new Status='" + newStatus + "'");
234             }
235             try {
236                 getPAPInstance().setPDPSummaryStatus(pdp, newStatus);
237             } catch (PAPException e) {
238                 PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e, XACMLPAPSERVLET,
239                     "Unable to set state for PDP '" + pdp.getId());
240             }
241         }
242     }
243
244     private XACMLPapServlet getPAPInstance() {
245         return new XACMLPapServlet();
246     }
247
248 }