2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pap.main.comm.msgdata;
23 import lombok.AccessLevel;
25 import lombok.NonNull;
27 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
28 import org.onap.policy.common.utils.services.ServiceManager;
29 import org.onap.policy.models.pdp.concepts.PdpMessage;
30 import org.onap.policy.models.pdp.concepts.PdpStatus;
31 import org.onap.policy.pap.main.comm.QueueToken;
32 import org.onap.policy.pap.main.comm.TimerManager;
33 import org.onap.policy.pap.main.notification.PolicyNotifier;
34 import org.onap.policy.pap.main.parameters.RequestParams;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * Request data implementation.
41 public abstract class RequestImpl implements Request {
42 private static final Logger logger = LoggerFactory.getLogger(RequestImpl.class);
45 * Name with which this data is associated, used for logging purposes.
48 private final String name;
51 * The configuration parameters.
53 @Getter(AccessLevel.PROTECTED)
54 private final RequestParams params;
57 * Used to register/unregister the listener and the timer.
59 private final ServiceManager svcmgr;
62 * Handles events associated with the request.
65 private RequestListener listener;
68 * Notifier for policy update completions.
72 private PolicyNotifier notifier;
75 * Current retry count.
78 private int retryCount = 0;
81 * The current message.
84 private PdpMessage message;
87 * The currently running timer.
89 private TimerManager.Timer timer;
92 * Token that has been placed on the queue.
94 private QueueToken<PdpMessage> token = null;
98 * Constructs the object, and validates the parameters.
100 * @param params configuration parameters
101 * @param name the request name, used for logging purposes
102 * @param message the initial message
104 * @throws IllegalArgumentException if a required parameter is not set
106 public RequestImpl(@NonNull RequestParams params, @NonNull String name, @NonNull PdpMessage message) {
110 this.params = params;
111 this.message = message;
114 this.svcmgr = new ServiceManager(name)
115 .addAction("listener",
116 () -> params.getResponseDispatcher()
117 .register(this.message.getRequestId(), this::processResponse),
118 () -> params.getResponseDispatcher().unregister(this.message.getRequestId()))
120 () -> timer = params.getTimers().register(this.message.getRequestId(), this::handleTimeout),
121 () -> timer.cancel())
122 .addAction("enqueue",
125 // do not remove from the queue - token may be re-used
131 public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2) {
132 if (newMessage.getClass() != message.getClass()) {
133 throw new IllegalArgumentException("expecting " + message.getClass().getSimpleName() + " instead of "
134 + newMessage.getClass().getSimpleName());
137 logger.info("reconfiguring {} with new message", getName());
139 if (svcmgr.isAlive()) {
140 token = stopPublishing(false);
141 message = newMessage;
142 startPublishing(token2);
145 message = newMessage;
150 public boolean isPublishing() {
151 return svcmgr.isAlive();
155 public void startPublishing() {
156 startPublishing(null);
160 public void startPublishing(QueueToken<PdpMessage> token2) {
161 if (listener == null) {
162 throw new IllegalStateException("listener has not been set");
165 synchronized (params.getModifyLock()) {
166 replaceToken(token2);
168 if (svcmgr.isAlive()) {
169 logger.info("{} is already publishing", getName());
179 * Replaces the current token with a new token.
180 * @param newToken the new token
182 private void replaceToken(QueueToken<PdpMessage> newToken) {
183 if (newToken != null) {
187 } else if (token != newToken) {
188 // already have a token - discard the new token
189 newToken.replaceItem(null);
195 public void stopPublishing() {
196 stopPublishing(true);
200 public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue) {
201 if (svcmgr.isAlive()) {
204 if (removeFromQueue) {
205 token.replaceItem(null);
210 QueueToken<PdpMessage> tok = token;
217 * Enqueues the current message with the publisher, putting it into the queue token,
218 * if possible. Otherwise, it adds a new token to the queue.
220 private void enqueue() {
221 if (token != null && token.replaceItem(message) != null) {
222 // took the other's place in the queue - continue using the token
226 // couldn't take the other's place - add our own token to the queue
227 token = new QueueToken<>(message);
228 params.getPdpPublisher().enqueue(token);
232 * Resets the retry count.
234 public void resetRetryCount() {
239 * Bumps the retry count.
241 * @return {@code true} if successful, {@code false} if the limit has been reached
243 public boolean bumpRetryCount() {
244 if (retryCount >= params.getMaxRetryCount()) {
253 * Processes a response received from the PDP.
255 * @param infra infrastructure on which the response was received
256 * @param topic topic on which the response was received
257 * @param response the response
259 private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) {
261 synchronized (params.getModifyLock()) {
262 String pdpName = response.getName();
264 if (!svcmgr.isAlive()) {
265 // this particular request must have been discarded
271 String reason = checkResponse(response);
272 if (reason != null) {
273 logger.info("{} PDP data mismatch via {} {}: {}", getName(), infra, topic, reason);
274 listener.failure(pdpName, reason);
278 logger.info("{} successful", getName());
279 listener.success(pdpName);
286 * @param timerName the timer name
288 private void handleTimeout(String timerName) {
290 synchronized (params.getModifyLock()) {
291 if (!svcmgr.isAlive()) {
292 // this particular request must have been discarded
298 if (!bumpRetryCount()) {
299 logger.info("{} timeout {} - retry count {} exhausted", getName(), timerName, retryCount);
300 listener.retryCountExhausted();
305 logger.info("{} timeout - re-publish count {}", getName(), retryCount);
307 // startPublishing() resets the count, so save & restore it here
308 int count = retryCount;
315 * Verifies that the PDP name in the response is not null. Also verifies that it matches
316 * the name in the message, if the message has a name.
319 public String checkResponse(PdpStatus response) {
320 if (response.getName() == null) {
321 return "null PDP name";
324 if (message.getName() != null && !message.getName().equals(response.getName())) {
325 return "PDP name does not match";