2 * ================================================================================
3 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
20 package org.onap.dcae.analytics.web.dmaap;
23 import java.util.concurrent.atomic.AtomicInteger;
25 import org.onap.dcae.analytics.model.AnalyticsHttpConstants;
26 import org.onap.dcae.analytics.model.DmaapMrConstants;
27 import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.http.HttpStatus;
31 import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
32 import org.springframework.integration.support.MessageBuilder;
33 import org.springframework.integration.util.DynamicPeriodicTrigger;
34 import org.springframework.messaging.Message;
37 * A polling advice which can auto adjust polling intervals depending on DMaaP MR message availability.
38 * Can be configured to slow down polling when messages are not available and increase polling when messages are
41 * The next polling interval is <b>increased</b> by given step up delta if message is <b>not found</b> up to maximum
44 * The next polling interval is <b>decreased</b> by step down delta if message <b>is found</b> up to minimum
47 * @author Rajiv Singla
49 public class MrSubscriberPollingAdvice extends AbstractRequestHandlerAdvice {
51 private static final Logger log = LoggerFactory.getLogger(MrSubscriberPollingAdvice.class);
53 private final DynamicPeriodicTrigger trigger;
54 private final int minPollingInterval;
55 private final int stepUpPollingDelta;
56 private final int maxPollingInterval;
57 private final int stepDownPollingDelta;
59 private final AtomicInteger nextPollingInterval;
62 * Creates variable polling intervals based on message availability.
64 * @param trigger Dynamic Trigger instance
65 * @param minPollingInterval Minimum polling interval
66 * @param stepUpPollingDelta Delta by which next polling interval will be increased when message is not found
67 * @param maxPollingInterval Maximum polling interval
68 * @param stepDownPollingDelta Delta by which next polling interval will be decreased when message is found
70 public MrSubscriberPollingAdvice(final DynamicPeriodicTrigger trigger,
71 final int minPollingInterval,
72 final int stepUpPollingDelta,
73 final int maxPollingInterval,
74 final int stepDownPollingDelta) {
75 this.trigger = trigger;
76 this.minPollingInterval = minPollingInterval;
77 this.stepUpPollingDelta = stepUpPollingDelta;
78 this.maxPollingInterval = maxPollingInterval;
79 this.stepDownPollingDelta = stepDownPollingDelta;
80 nextPollingInterval = new AtomicInteger(minPollingInterval);
84 @SuppressWarnings("unchecked")
85 protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
89 Object result = callback.execute();
91 // if result is not of type message builder just return
92 if (!(result instanceof MessageBuilder)) {
96 final MessageBuilder<String> resultMessageBuilder = (MessageBuilder<String>) result;
97 final String payload = resultMessageBuilder.getPayload();
98 final Map<String, Object> headers = resultMessageBuilder.getHeaders();
99 final Object httpStatusCode = headers.get(AnalyticsHttpConstants.HTTP_STATUS_CODE_HEADER_KEY);
101 // get http status code
102 if (httpStatusCode == null) {
105 final HttpStatus httpStatus = HttpStatus.resolve(Integer.parseInt(httpStatusCode.toString()));
108 // if status code is present and successful apply polling adjustments
109 if (httpStatus != null && httpStatus.is2xxSuccessful()) {
110 final boolean areMessagesPresent = areMessagesPresent(payload);
111 updateNextPollingInterval(areMessagesPresent);
113 final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
114 final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
116 log.debug("Request Id: {}, Transaction Id: {}, Messages Present: {}, " +
117 "Next Polling Interval will be: {}", requestId, transactionId,
118 areMessagesPresent, nextPollingInterval);
120 trigger.setPeriod(nextPollingInterval.get());
122 // if no messages were found in dmaap poll - terminate further processing
123 if (!areMessagesPresent) {
124 log.info("Request Id: {}, Transaction Id: {}, No new messages found in DMaaP MR Response. " +
125 "No further processing required", requestId, transactionId);
134 private boolean areMessagesPresent(final String payload) {
136 return !(payload.isEmpty() || payload.equals(DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING));
139 private void updateNextPollingInterval(final boolean areMessagesPresent) {
140 if (areMessagesPresent) {
141 nextPollingInterval.getAndUpdate(interval -> interval - stepDownPollingDelta <= minPollingInterval ?
142 minPollingInterval : interval - stepDownPollingDelta);
144 nextPollingInterval.getAndUpdate(interval -> interval + stepUpPollingDelta >= maxPollingInterval ?
145 maxPollingInterval : interval + stepUpPollingDelta);