Generate notifications when policies change
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / msgdata / RequestImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pap.main.comm.msgdata;
22
23 import lombok.AccessLevel;
24 import lombok.Getter;
25 import lombok.NonNull;
26 import lombok.Setter;
27 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
28 import org.onap.policy.common.utils.services.ServiceManager;
29 import org.onap.policy.models.pdp.concepts.PdpMessage;
30 import org.onap.policy.models.pdp.concepts.PdpStatus;
31 import org.onap.policy.pap.main.comm.QueueToken;
32 import org.onap.policy.pap.main.comm.TimerManager;
33 import org.onap.policy.pap.main.notification.PolicyNotifier;
34 import org.onap.policy.pap.main.parameters.RequestParams;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Request data implementation.
40  */
41 public abstract class RequestImpl implements Request {
42     private static final Logger logger = LoggerFactory.getLogger(RequestImpl.class);
43
44     /**
45      * Name with which this data is associated, used for logging purposes.
46      */
47     @Getter
48     private final String name;
49
50     /**
51      * The configuration parameters.
52      */
53     @Getter(AccessLevel.PROTECTED)
54     private final RequestParams params;
55
56     /**
57      * Used to register/unregister the listener and the timer.
58      */
59     private final ServiceManager svcmgr;
60
61     /**
62      * Handles events associated with the request.
63      */
64     @Setter
65     private RequestListener listener;
66
67     /**
68      * Notifier for policy update completions.
69      */
70     @Getter
71     @Setter
72     private PolicyNotifier notifier;
73
74     /**
75      * Current retry count.
76      */
77     @Getter
78     private int retryCount = 0;
79
80     /**
81      * The current message.
82      */
83     @Getter
84     private PdpMessage message;
85
86     /**
87      * The currently running timer.
88      */
89     private TimerManager.Timer timer;
90
91     /**
92      * Token that has been placed on the queue.
93      */
94     private QueueToken<PdpMessage> token = null;
95
96
97     /**
98      * Constructs the object, and validates the parameters.
99      *
100      * @param params configuration parameters
101      * @param name the request name, used for logging purposes
102      * @param message the initial message
103      *
104      * @throws IllegalArgumentException if a required parameter is not set
105      */
106     public RequestImpl(@NonNull RequestParams params, @NonNull String name, @NonNull PdpMessage message) {
107         params.validate();
108
109         this.name = name;
110         this.params = params;
111         this.message = message;
112
113         // @formatter:off
114         this.svcmgr = new ServiceManager(name)
115                         .addAction("listener",
116                             () -> params.getResponseDispatcher()
117                                             .register(this.message.getRequestId(), this::processResponse),
118                             () -> params.getResponseDispatcher().unregister(this.message.getRequestId()))
119                         .addAction("timer",
120                             () -> timer = params.getTimers().register(this.message.getRequestId(), this::handleTimeout),
121                             () -> timer.cancel())
122                         .addAction("enqueue",
123                             this::enqueue,
124                             () -> {
125                                 // do not remove from the queue - token may be re-used
126                             });
127         // @formatter:on
128     }
129
130     @Override
131     public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2) {
132         if (newMessage.getClass() != message.getClass()) {
133             throw new IllegalArgumentException("expecting " + message.getClass().getSimpleName() + " instead of "
134                             + newMessage.getClass().getSimpleName());
135         }
136
137         logger.info("reconfiguring {} with new message", getName());
138
139         if (svcmgr.isAlive()) {
140             token = stopPublishing(false);
141             message = newMessage;
142             startPublishing(token2);
143
144         } else {
145             message = newMessage;
146         }
147     }
148
149     @Override
150     public boolean isPublishing() {
151         return svcmgr.isAlive();
152     }
153
154     @Override
155     public void startPublishing() {
156         startPublishing(null);
157     }
158
159     @Override
160     public void startPublishing(QueueToken<PdpMessage> token2) {
161         if (listener == null) {
162             throw new IllegalStateException("listener has not been set");
163         }
164
165         synchronized (params.getModifyLock()) {
166             replaceToken(token2);
167
168             if (svcmgr.isAlive()) {
169                 logger.info("{} is already publishing", getName());
170
171             } else {
172                 resetRetryCount();
173                 svcmgr.start();
174             }
175         }
176     }
177
178     /**
179      * Replaces the current token with a new token.
180      * @param newToken the new token
181      */
182     private void replaceToken(QueueToken<PdpMessage> newToken) {
183         if (newToken != null) {
184             if (token == null) {
185                 token = newToken;
186
187             } else if (token != newToken) {
188                 // already have a token - discard the new token
189                 newToken.replaceItem(null);
190             }
191         }
192     }
193
194     @Override
195     public void stopPublishing() {
196         stopPublishing(true);
197     }
198
199     @Override
200     public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue) {
201         if (svcmgr.isAlive()) {
202             svcmgr.stop();
203
204             if (removeFromQueue) {
205                 token.replaceItem(null);
206                 token = null;
207             }
208         }
209
210         QueueToken<PdpMessage> tok = token;
211         token = null;
212
213         return tok;
214     }
215
216     /**
217      * Enqueues the current message with the publisher, putting it into the queue token,
218      * if possible. Otherwise, it adds a new token to the queue.
219      */
220     private void enqueue() {
221         if (token != null && token.replaceItem(message) != null) {
222             // took the other's place in the queue - continue using the token
223             return;
224         }
225
226         // couldn't take the other's place - add our own token to the queue
227         token = new QueueToken<>(message);
228         params.getPdpPublisher().enqueue(token);
229     }
230
231     /**
232      * Resets the retry count.
233      */
234     public void resetRetryCount() {
235         retryCount = 0;
236     }
237
238     /**
239      * Bumps the retry count.
240      *
241      * @return {@code true} if successful, {@code false} if the limit has been reached
242      */
243     public boolean bumpRetryCount() {
244         if (retryCount >= params.getMaxRetryCount()) {
245             return false;
246         }
247
248         retryCount++;
249         return true;
250     }
251
252     /**
253      * Processes a response received from the PDP.
254      *
255      * @param infra infrastructure on which the response was received
256      * @param topic topic on which the response was received
257      * @param response the response
258      */
259     private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) {
260
261         synchronized (params.getModifyLock()) {
262             String pdpName = response.getName();
263
264             if (!svcmgr.isAlive()) {
265                 // this particular request must have been discarded
266                 return;
267             }
268
269             svcmgr.stop();
270
271             String reason = checkResponse(response);
272             if (reason != null) {
273                 logger.info("{} PDP data mismatch via {} {}: {}", getName(), infra, topic, reason);
274                 listener.failure(pdpName, reason);
275                 return;
276             }
277
278             logger.info("{} successful", getName());
279             listener.success(pdpName);
280         }
281     }
282
283     /**
284      * Handles a timeout.
285      *
286      * @param timerName the timer timer
287      */
288     private void handleTimeout(String timerName) {
289
290         synchronized (params.getModifyLock()) {
291             if (!svcmgr.isAlive()) {
292                 // this particular request must have been discarded
293                 return;
294             }
295
296             stopPublishing();
297
298             if (!bumpRetryCount()) {
299                 logger.info("{} timeout {} - retry count {} exhausted", getName(), timerName, retryCount);
300                 listener.retryCountExhausted();
301                 return;
302             }
303
304             // re-publish
305             logger.info("{} timeout - re-publish count {}", getName(), retryCount);
306
307             // startPublishing() resets the count, so save & restore it here
308             int count = retryCount;
309             startPublishing();
310             retryCount = count;
311         }
312     }
313
314     /**
315      * Verifies that the name is not null. Also verifies that it matches the name in the
316      * message, if the message has a name.
317      */
318     @Override
319     public String checkResponse(PdpStatus response) {
320         if (response.getName() == null) {
321             return "null PDP name";
322         }
323
324         if (message.getName() != null && !message.getName().equals(response.getName())) {
325             return "PDP name does not match";
326         }
327
328         return null;
329     }
330 }