Standalone TCA with EELF Logger
[dcaegen2/analytics/tca-gen2.git] / dcae-analytics / dcae-analytics-web / src / main / java / org / onap / dcae / analytics / web / dmaap / MrSubscriberPollingAdvice.java
1 /*
2  * ================================================================================
3  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  *
18  */
19
20 package org.onap.dcae.analytics.web.dmaap;
21
22 import java.util.Map;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.onap.dcae.analytics.model.AnalyticsHttpConstants;
26 import org.onap.dcae.analytics.model.DmaapMrConstants;
27 import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.http.HttpStatus;
31 import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
32 import org.springframework.integration.support.MessageBuilder;
33 import org.springframework.integration.util.DynamicPeriodicTrigger;
34 import org.springframework.messaging.Message;
35
36 /**
37  * A polling advice which can auto adjust polling intervals depending on DMaaP MR message availability.
38  * Can be configured to slow down polling when messages are not available and increase polling when messages are
39  * indeed available.
40  * <p>
41  * The next polling interval is <b>increased</b> by given step up delta if message is <b>not found</b> up to maximum
42  * Polling Interval
43  * <br>
44  * The next polling interval is <b>decreased</b> by step down delta if message <b>is found</b> up to minimum
45  * polling interval
46  *
47  * @author Rajiv Singla
48  */
49 public class MrSubscriberPollingAdvice extends AbstractRequestHandlerAdvice {
50
51     private static final Logger log = LoggerFactory.getLogger(MrSubscriberPollingAdvice.class);
52
53     private final DynamicPeriodicTrigger trigger;
54     private final int minPollingInterval;
55     private final int stepUpPollingDelta;
56     private final int maxPollingInterval;
57     private final int stepDownPollingDelta;
58
59     private final AtomicInteger nextPollingInterval;
60
61     /**
62      * Creates variable polling intervals based on message availability.
63      *
64      * @param trigger Dynamic Trigger instance
65      * @param minPollingInterval Minimum polling interval
66      * @param stepUpPollingDelta Delta by which next polling interval will be increased when message is not found
67      * @param maxPollingInterval Maximum polling interval
68      * @param stepDownPollingDelta Delta by which next polling interval will be decreased when message is found
69      */
70     public MrSubscriberPollingAdvice(final DynamicPeriodicTrigger trigger,
71                                      final int minPollingInterval,
72                                      final int stepUpPollingDelta,
73                                      final int maxPollingInterval,
74                                      final int stepDownPollingDelta) {
75         this.trigger = trigger;
76         this.minPollingInterval = minPollingInterval;
77         this.stepUpPollingDelta = stepUpPollingDelta;
78         this.maxPollingInterval = maxPollingInterval;
79         this.stepDownPollingDelta = stepDownPollingDelta;
80         nextPollingInterval = new AtomicInteger(minPollingInterval);
81     }
82
83     @Override
84     @SuppressWarnings("unchecked")
85     protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
86             throws Exception {
87
88         // execute call back
89         Object result = callback.execute();
90
91         // if result is not of type message builder just return
92         if (!(result instanceof MessageBuilder)) {
93             return result;
94         }
95
96         final MessageBuilder<String> resultMessageBuilder = (MessageBuilder<String>) result;
97         final String payload = resultMessageBuilder.getPayload();
98         final Map<String, Object> headers = resultMessageBuilder.getHeaders();
99         final Object httpStatusCode = headers.get(AnalyticsHttpConstants.HTTP_STATUS_CODE_HEADER_KEY);
100
101         // get http status code
102         if (httpStatusCode == null) {
103             return result;
104         }
105         final HttpStatus httpStatus = HttpStatus.resolve(Integer.parseInt(httpStatusCode.toString()));
106
107
108         // if status code is present and successful apply polling adjustments
109         if (httpStatus != null && httpStatus.is2xxSuccessful()) {
110             final boolean areMessagesPresent = areMessagesPresent(payload);
111             updateNextPollingInterval(areMessagesPresent);
112
113             final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
114             final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
115
116             log.debug("Request Id: {}, Transaction Id: {}, Messages Present: {}, " +
117                             "Next Polling Interval will be: {}", requestId, transactionId,
118                     areMessagesPresent, nextPollingInterval);
119
120             trigger.setPeriod(nextPollingInterval.get());
121
122             // if no messages were found in dmaap poll - terminate further processing
123             if (!areMessagesPresent) {
124                 log.info("Request Id: {}, Transaction Id: {}, No new messages found in DMaaP MR Response. " +
125                         "No further processing required", requestId, transactionId);
126                 return null;
127             }
128
129         }
130
131         return result;
132     }
133
134     private boolean areMessagesPresent(final String payload) {
135
136         return !(payload.isEmpty() || payload.equals(DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING));
137     }
138
139     private void updateNextPollingInterval(final boolean areMessagesPresent) {
140         if (areMessagesPresent) {
141             nextPollingInterval.getAndUpdate(interval -> interval - stepDownPollingDelta <= minPollingInterval ?
142                     minPollingInterval : interval - stepDownPollingDelta);
143         } else {
144             nextPollingInterval.getAndUpdate(interval -> interval + stepUpPollingDelta >= maxPollingInterval ?
145                     maxPollingInterval : interval + stepUpPollingDelta);
146         }
147     }
148 }