2ebb38d1524638241b43c9e65387c1870adade71
[dcaegen2/analytics/tca-gen2.git] / dcae-analytics / dcae-analytics-web / src / main / java / org / onap / dcae / analytics / web / dmaap / MrSubscriberPollingAdvice.java
1 /*
2  * ================================================================================
3  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  *
18  */
19
20 package org.onap.dcae.analytics.web.dmaap;
21
22 import java.util.Map;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.onap.dcae.analytics.model.AnalyticsHttpConstants;
26 import org.onap.dcae.analytics.model.DmaapMrConstants;
27 import org.onap.dcae.analytics.tca.core.util.LogSpec;
28 import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
29 import org.onap.dcae.utils.eelf.logger.api.log.EELFLogFactory;
30 import org.onap.dcae.utils.eelf.logger.api.log.EELFLogger;
31 import org.onap.dcae.utils.eelf.logger.api.spec.DebugLogSpec;
32 import org.springframework.http.HttpStatus;
33 import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
34 import org.springframework.integration.support.MessageBuilder;
35 import org.springframework.integration.util.DynamicPeriodicTrigger;
36 import org.springframework.messaging.Message;
37
38 /**
39  * A polling advice which can auto adjust polling intervals depending on DMaaP MR message availability.
40  * Can be configured to slow down polling when messages are not available and increase polling when messages are
41  * indeed available.
42  * <p>
43  * The next polling interval is <b>increased</b> by given step up delta if message is <b>not found</b> up to maximum
44  * Polling Interval
45  * <br>
46  * The next polling interval is <b>decreased</b> by step down delta if message <b>is found</b> up to minimum
47  * polling interval
48  *
49  * @author Rajiv Singla
50  */
51 public class MrSubscriberPollingAdvice extends AbstractRequestHandlerAdvice {
52
53     private static final EELFLogger eelfLogger = EELFLogFactory.getLogger(MrSubscriberPollingAdvice.class);
54
55     private final DynamicPeriodicTrigger trigger;
56     private final int minPollingInterval;
57     private final int stepUpPollingDelta;
58     private final int maxPollingInterval;
59     private final int stepDownPollingDelta;
60
61     private final AtomicInteger nextPollingInterval;
62
63     /**
64      * Creates variable polling intervals based on message availability.
65      *
66      * @param trigger Dynamic Trigger instance
67      * @param minPollingInterval Minimum polling interval
68      * @param stepUpPollingDelta Delta by which next polling interval will be increased when message is not found
69      * @param maxPollingInterval Maximum polling interval
70      * @param stepDownPollingDelta Delta by which next polling interval will be decreased when message is found
71      */
72     public MrSubscriberPollingAdvice(final DynamicPeriodicTrigger trigger,
73                                      final int minPollingInterval,
74                                      final int stepUpPollingDelta,
75                                      final int maxPollingInterval,
76                                      final int stepDownPollingDelta) {
77         this.trigger = trigger;
78         this.minPollingInterval = minPollingInterval;
79         this.stepUpPollingDelta = stepUpPollingDelta;
80         this.maxPollingInterval = maxPollingInterval;
81         this.stepDownPollingDelta = stepDownPollingDelta;
82         nextPollingInterval = new AtomicInteger(minPollingInterval);
83     }
84
85     @Override
86     @SuppressWarnings("unchecked")
87     protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
88             throws Exception {
89
90         // execute call back
91         Object result = callback.execute();
92
93         // if result is not of type message builder just return
94         if (!(result instanceof MessageBuilder)) {
95             return result;
96         }
97
98         final MessageBuilder<String> resultMessageBuilder = (MessageBuilder<String>) result;
99         final String payload = resultMessageBuilder.getPayload();
100         final Map<String, Object> headers = resultMessageBuilder.getHeaders();
101         final Object httpStatusCode = headers.get(AnalyticsHttpConstants.HTTP_STATUS_CODE_HEADER_KEY);
102
103         // get http status code
104         if (httpStatusCode == null) {
105             return result;
106         }
107         final HttpStatus httpStatus = HttpStatus.resolve(Integer.parseInt(httpStatusCode.toString()));
108
109
110         // if status code is present and successful apply polling adjustments
111         if (httpStatus != null && httpStatus.is2xxSuccessful()) {
112             final boolean areMessagesPresent = areMessagesPresent(payload);
113             updateNextPollingInterval(areMessagesPresent);
114
115             final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
116             final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
117             final DebugLogSpec debugLogSpec = LogSpec.createDebugLogSpec(requestId);
118             eelfLogger.debugLog().debug("Request Id: {}, Transaction Id: {}, Messages Present: {}, " +
119                             "Next Polling Interval will be: {}", debugLogSpec, requestId, transactionId,
120                             String.valueOf(areMessagesPresent), nextPollingInterval.toString());
121
122             trigger.setPeriod(nextPollingInterval.get());
123
124             // if no messages were found in dmaap poll - terminate further processing
125             if (!areMessagesPresent) {
126                 eelfLogger.debugLog().debug("Request Id: {}, Transaction Id: {}, No new messages found in DMaaP MR Response. " +
127                         "No further processing required", debugLogSpec, requestId, transactionId);
128                 return null;
129             }
130
131         }
132
133         return result;
134     }
135
136     private boolean areMessagesPresent(final String payload) {
137
138         return !(payload.isEmpty() || payload.equals(DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING));
139     }
140
141     private void updateNextPollingInterval(final boolean areMessagesPresent) {
142         if (areMessagesPresent) {
143             nextPollingInterval.getAndUpdate(interval -> interval - stepDownPollingDelta <= minPollingInterval ?
144                     minPollingInterval : interval - stepDownPollingDelta);
145         } else {
146             nextPollingInterval.getAndUpdate(interval -> interval + stepUpPollingDelta >= maxPollingInterval ?
147                     maxPollingInterval : interval + stepUpPollingDelta);
148         }
149     }
150 }