Standalone TCA with EELF Logger
[dcaegen2/analytics/tca-gen2.git] / dcae-analytics / dcae-analytics-web / src / main / java / org / onap / dcae / analytics / web / dmaap / MrMessageSplitter.java
1 /*
2  * ================================================================================
3  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  *
18  */
19
20 package org.onap.dcae.analytics.web.dmaap;
21
22 import static org.apache.commons.text.StringEscapeUtils.unescapeJava;
23 import static org.apache.commons.text.StringEscapeUtils.unescapeJson;
24 import static org.onap.dcae.analytics.model.AnalyticsHttpConstants.REQUEST_ID_HEADER_KEY;
25 import static org.onap.dcae.analytics.model.AnalyticsModelConstants.ANALYTICS_REQUEST_ID_DELIMITER;
26
27 import com.fasterxml.jackson.databind.JsonNode;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.stream.IntStream;
35
36 import javax.annotation.Nonnull;
37 import javax.annotation.Nullable;
38
39 import org.onap.dcae.analytics.model.DmaapMrConstants;
40 import org.onap.dcae.analytics.web.exception.AnalyticsParsingException;
41 import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.integration.splitter.AbstractMessageSplitter;
45 import org.springframework.integration.support.MessageBuilder;
46 import org.springframework.messaging.Message;
47
48 /**
49  * DMaaP MR message splitter split the incoming messages into batch of given batch size
50  *
51  * @author Rajiv Singla
52  */
53 public class MrMessageSplitter extends AbstractMessageSplitter {
54
55     private static final Logger logger = LoggerFactory.getLogger(MrMessageSplitter.class);
56
57     private final ObjectMapper objectMapper;
58     private final Integer batchSize;
59
60     public MrMessageSplitter(@Nonnull final ObjectMapper objectMapper,
61                              @Nonnull final Integer batchSize) {
62         this.objectMapper = objectMapper;
63         this.batchSize = batchSize;
64     }
65
66     @Override
67     protected Object splitMessage(final Message<?> message) {
68
69         final List<String> dmaapMessages = convertJsonToStringMessages(String.class.cast(message.getPayload()).trim());
70
71         final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
72         final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
73
74         logger.info("Request Id: {}, Transaction Id: {}, Received new messages from DMaaP MR. Count: {}",
75                 requestId, transactionId, dmaapMessages.size());
76
77         final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize);
78
79         logger.debug("Request Id: {}, Transaction Id: {},  Max allowed messages per batch: {}. " +
80                 "No of batches created: {}", requestId, transactionId, batchSize, messagePartitions.size());
81
82         // append batch id to request id header
83         return messagePartitions.isEmpty() ? null : IntStream.range(0, messagePartitions.size())
84                 .mapToObj(batchIndex ->
85                         MessageBuilder
86                                 .withPayload(messagePartitions.get(0))
87                                 .copyHeaders(message.getHeaders())
88                                 .setHeader(REQUEST_ID_HEADER_KEY,
89                                         requestId + ANALYTICS_REQUEST_ID_DELIMITER + batchIndex)
90                                 .build()
91
92                 );
93     }
94
95     /**
96      * Converts DMaaP MR subscriber messages json string to List of messages. If message Json String is empty
97      * or null
98      *
99      * @param messagesJsonString json messages String
100      *
101      * @return List containing DMaaP MR Messages
102      */
103     private List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
104
105         final LinkedList<String> messages = new LinkedList<>();
106
107         // If message string is not null or not empty parse json message array to List of string messages
108         if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
109                 && !DmaapMrConstants.SUBSCRIBER_EMPTY_MESSAGE_RESPONSE_STRING.equals(messagesJsonString.trim())) {
110
111             try {
112                 // get root node
113                 final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
114                 // iterate over root node and parse arrays messages
115                 for (JsonNode jsonNode : rootNode) {
116                     // if array parse it is array of messages
117                     final String incomingMessageString = jsonNode.toString();
118                     if (jsonNode.isArray()) {
119                         final List messageList = objectMapper.readValue(incomingMessageString, List.class);
120                         for (Object message : messageList) {
121                             final String jsonMessageString = objectMapper.writeValueAsString(message);
122                             addUnescapedJsonToMessage(messages, jsonMessageString);
123                         }
124                     } else {
125                         // parse it as object
126                         addUnescapedJsonToMessage(messages, incomingMessageString);
127                     }
128                 }
129
130             } catch (IOException e) {
131                 final String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " +
132                         "Subscriber Response String: %s, Json Error: %s", messagesJsonString, e);
133                 logger.error(errorMessage, e);
134                 throw new AnalyticsParsingException(errorMessage, e);
135             }
136
137         }
138         return messages;
139     }
140
141     /**
142      * Adds unescaped Json messages to given messages list
143      *
144      * @param messages message list in which unescaped messages will be added
145      * @param incomingMessageString incoming message string that may need to be escaped
146      */
147     private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
148         if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
149             messages.add(unescapeJava(unescapeJson(
150                     incomingMessageString.substring(1, incomingMessageString.length() - 1))));
151         } else {
152             messages.add(unescapeJava(unescapeJson(incomingMessageString)));
153         }
154     }
155
156     /**
157      * Partition list into multiple lists
158      *
159      * @param list input list that needs to be broken into chunks
160      * @param batchSize batch size for each list
161      * @param <E> element type of the list
162      *
163      * @return List containing list of entries of specified batch size
164      */
165     private static <E> List<List<E>> partition(List<E> list, final Integer batchSize) {
166
167         if (list == null || batchSize == null || batchSize <= 0 || list.size() < batchSize) {
168             return Collections.singletonList(list);
169         }
170
171         final List<List<E>> result = new LinkedList<>();
172
173         for (int i = 0; i < list.size(); i++) {
174
175             if (i == 0 || i % batchSize == 0) {
176                 List<E> sublist = new LinkedList<>();
177                 result.add(sublist);
178             }
179
180             final List<E> lastSubList = result.get(result.size() - 1);
181             lastSubList.add(list.get(i));
182
183         }
184         return result;
185     }
186
187
188 }