2 * ================================================================================
3 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
20 package org.onap.dcae.analytics.web.dmaap;
23 import java.util.concurrent.atomic.AtomicInteger;
25 import org.onap.dcae.analytics.model.AnalyticsHttpConstants;
26 import org.onap.dcae.analytics.model.DmaapMrConstants;
27 import org.onap.dcae.analytics.tca.core.util.LogSpec;
28 import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
29 import org.onap.dcae.utils.eelf.logger.api.log.EELFLogFactory;
30 import org.onap.dcae.utils.eelf.logger.api.log.EELFLogger;
31 import org.onap.dcae.utils.eelf.logger.api.spec.DebugLogSpec;
32 import org.springframework.http.HttpStatus;
33 import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
34 import org.springframework.integration.support.MessageBuilder;
35 import org.springframework.integration.util.DynamicPeriodicTrigger;
36 import org.springframework.messaging.Message;
39 * A polling advice which can auto adjust polling intervals depending on DMaaP MR message availability.
40 * Can be configured to slow down polling when messages are not available and increase polling when messages are
43 * The next polling interval is <b>increased</b> by given step up delta if message is <b>not found</b> up to maximum
46 * The next polling interval is <b>decreased</b> by step down delta if message <b>is found</b> up to minimum
49 * @author Rajiv Singla
51 public class MrSubscriberPollingAdvice extends AbstractRequestHandlerAdvice {
53 private static final EELFLogger eelfLogger = EELFLogFactory.getLogger(MrSubscriberPollingAdvice.class);
55 private final DynamicPeriodicTrigger trigger;
56 private final int minPollingInterval;
57 private final int stepUpPollingDelta;
58 private final int maxPollingInterval;
59 private final int stepDownPollingDelta;
61 private final AtomicInteger nextPollingInterval;
64 * Creates variable polling intervals based on message availability.
66 * @param trigger Dynamic Trigger instance
67 * @param minPollingInterval Minimum polling interval
68 * @param stepUpPollingDelta Delta by which next polling interval will be increased when message is not found
69 * @param maxPollingInterval Maximum polling interval
70 * @param stepDownPollingDelta Delta by which next polling interval will be decreased when message is found
72 public MrSubscriberPollingAdvice(final DynamicPeriodicTrigger trigger,
73 final int minPollingInterval,
74 final int stepUpPollingDelta,
75 final int maxPollingInterval,
76 final int stepDownPollingDelta) {
77 this.trigger = trigger;
78 this.minPollingInterval = minPollingInterval;
79 this.stepUpPollingDelta = stepUpPollingDelta;
80 this.maxPollingInterval = maxPollingInterval;
81 this.stepDownPollingDelta = stepDownPollingDelta;
82 nextPollingInterval = new AtomicInteger(minPollingInterval);
86 @SuppressWarnings("unchecked")
87 protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
91 Object result = callback.execute();
93 // if result is not of type message builder just return
94 if (!(result instanceof MessageBuilder)) {
98 final MessageBuilder<String> resultMessageBuilder = (MessageBuilder<String>) result;
99 final String payload = resultMessageBuilder.getPayload();
100 final Map<String, Object> headers = resultMessageBuilder.getHeaders();
101 final Object httpStatusCode = headers.get(AnalyticsHttpConstants.HTTP_STATUS_CODE_HEADER_KEY);
103 // get http status code
104 if (httpStatusCode == null) {
107 final HttpStatus httpStatus = HttpStatus.resolve(Integer.parseInt(httpStatusCode.toString()));
110 // if status code is present and successful apply polling adjustments
111 if (httpStatus != null && httpStatus.is2xxSuccessful()) {
112 final boolean areMessagesPresent = areMessagesPresent(payload);
113 updateNextPollingInterval(areMessagesPresent);
115 final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
116 final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
117 final DebugLogSpec debugLogSpec = LogSpec.createDebugLogSpec(requestId);
118 eelfLogger.debugLog().debug("Request Id: {}, Transaction Id: {}, Messages Present: {}, " +
119 "Next Polling Interval will be: {}", debugLogSpec, requestId, transactionId,
120 String.valueOf(areMessagesPresent), nextPollingInterval.toString());
122 trigger.setPeriod(nextPollingInterval.get());
124 // if no messages were found in dmaap poll - terminate further processing
125 if (!areMessagesPresent) {
126 eelfLogger.debugLog().debug("Request Id: {}, Transaction Id: {}, No new messages found in DMaaP MR Response. " +
127 "No further processing required", debugLogSpec, requestId, transactionId);
136 private boolean areMessagesPresent(final String payload) {
138 return !(payload.isEmpty() || payload.equals(DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING));
141 private void updateNextPollingInterval(final boolean areMessagesPresent) {
142 if (areMessagesPresent) {
143 nextPollingInterval.getAndUpdate(interval -> interval - stepDownPollingDelta <= minPollingInterval ?
144 minPollingInterval : interval - stepDownPollingDelta);
146 nextPollingInterval.getAndUpdate(interval -> interval + stepUpPollingDelta >= maxPollingInterval ?
147 maxPollingInterval : interval + stepUpPollingDelta);