[DMAAP-CLIENT] First sonar issues review part2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.client.impl;
26
27 import com.att.nsa.apiClient.http.HttpClient;
28 import com.att.nsa.apiClient.http.HttpException;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
33 import java.util.Collection;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.concurrent.ScheduledThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.locks.ReentrantReadWriteLock;
39 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
40 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
41 import java.util.zip.GZIPOutputStream;
42 import org.onap.dmaap.mr.client.MRBatchingPublisher;
43 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47
48 /**
49  * This is a batching publisher class that allows the client to publish messages
50  * in batches that are limited in terms of size and/or hold time.
51  *
52  * @author author
53  * @deprecated This class's tricky locking doesn't quite work
54  */
55 @Deprecated
56 public class MRBatchPublisher implements MRBatchingPublisher {
57     public static final long MIN_MAX_AGE_MS = 1;
58
59     /**
60      * Create a batch publisher.
61      *
62      * @param baseUrls     the base URLs, like "localhost:8080". This class adds the correct application path.
63      * @param topic        the topic to publish to
64      * @param maxBatchSize the maximum size of a batch
65      * @param maxAgeMs     the maximum age of a batch
66      */
67     public MRBatchPublisher(Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress) {
68         if (maxAgeMs < MIN_MAX_AGE_MS) {
69             logger.warn("Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS);
70             maxAgeMs = MIN_MAX_AGE_MS;
71         }
72
73         try {
74             fSender = new Sender(baseUrls, topic, maxBatchSize, maxAgeMs, compress);
75         } catch (MalformedURLException e) {
76             throw new IllegalArgumentException(e);
77         }
78
79         // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
80         // the oldest msg to hit max age? (locking is complicated, but should be do-able)
81         fExec = new ScheduledThreadPoolExecutor(1);
82         fExec.scheduleAtFixedRate(fSender, 100, 50, TimeUnit.MILLISECONDS);
83     }
84
85     @Override
86     public void setApiCredentials(String apiKey, String apiSecret) {
87         fSender.setApiCredentials(apiKey, apiSecret);
88     }
89
90     @Override
91     public void clearApiCredentials() {
92         fSender.clearApiCredentials();
93     }
94
95     /**
96      * Send the given message with the given partition.
97      *
98      * @param partition
99      * @param msg
100      * @throws IOException
101      */
102     @Override
103     public int send(String partition, String msg) throws IOException {
104         return send(new Message(partition, msg));
105     }
106
107     @Override
108     public int send(String msg) throws IOException {
109         return send(new Message("", msg));
110     }
111
112     /**
113      * Send the given message.
114      *
115      * @param userMsg a message
116      * @throws IOException
117      */
118     @Override
119     public int send(Message userMsg) throws IOException {
120         final LinkedList<Message> list = new LinkedList<>();
121         list.add(userMsg);
122         return send(list);
123     }
124
125     /**
126      * Send the given set of messages.
127      *
128      * @param msgs the set of messages, sent in order of iteration
129      * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
130      * @throws IOException
131      */
132     @Override
133     public int send(Collection<Message> msgs) throws IOException {
134         if (msgs.isEmpty()) {
135             fSender.queue(msgs);
136         }
137         return fSender.size();
138     }
139
140     @Override
141     public int getPendingMessageCount() {
142         return fSender.size();
143     }
144
145     /**
146      * Send any pending messages and close this publisher.
147      */
148     @Override
149     public void close() {
150         try {
151             final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
152             if (remains.isEmpty()) {
153                 logger.warn("Closing publisher with {} messages unsent. (Consider using the alternate close method to capture unsent messages in this case.)", remains.size());
154             }
155         } catch (InterruptedException e) {
156             logger.warn("Possible message loss. " + e.getMessage(), e);
157             Thread.currentThread().interrupt();
158         } catch (IOException e) {
159             logger.warn("Possible message loss. " + e.getMessage(), e);
160         }
161     }
162
163     public List<Message> close(long time, TimeUnit unit) throws InterruptedException, IOException {
164         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
165         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
166         fExec.shutdown();
167
168         final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
169         final long timeoutAtMs = System.currentTimeMillis() + waitInMs;
170         while (System.currentTimeMillis() < timeoutAtMs && getPendingMessageCount() > 0) {
171             fSender.checkSend(true);
172             Thread.sleep(250);
173         }
174
175         final LinkedList<Message> result = new LinkedList<>();
176         fSender.drainTo(result);
177         return result;
178     }
179
180     private final ScheduledThreadPoolExecutor fExec;
181     private final Sender fSender;
182
183     private static class TimestampedMessage extends Message {
184         public TimestampedMessage(Message m) {
185             super(m);
186             timestamp = System.currentTimeMillis();
187         }
188
189         public final long timestamp;
190     }
191
192     private Logger logger = LoggerFactory.getLogger(MRBatchPublisher.class);
193
194     private class Sender extends MRBaseClient implements Runnable {
195         public Sender(Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress) throws MalformedURLException {
196             super(baseUrls);
197
198             fNextBatch = new LinkedList<>();
199             fSendingBatch = null;
200             fTopic = topic;
201             fMaxBatchSize = maxBatch;
202             fMaxAgeMs = maxAgeMs;
203             fCompress = compress;
204             fLock = new ReentrantReadWriteLock();
205             fWriteLock = fLock.writeLock();
206             fReadLock = fLock.readLock();
207             fDontSendUntilMs = 0;
208         }
209
210         public void drainTo(List<Message> list) {
211             fWriteLock.lock();
212             try {
213                 if (fSendingBatch != null) {
214                     list.addAll(fSendingBatch);
215                 }
216                 list.addAll(fNextBatch);
217
218                 fSendingBatch = null;
219                 fNextBatch.clear();
220             } finally {
221                 fWriteLock.unlock();
222             }
223         }
224
225         /**
226          * Called periodically by the background executor.
227          */
228         @Override
229         public void run() {
230             try {
231                 checkSend(false);
232             } catch (Exception e) {
233                 logger.warn("MR background send: {}", e.getMessage());
234                 logger.error("IOException {}", e.getMessage());
235             }
236         }
237
238         public int size() {
239             fReadLock.lock();
240             try {
241                 return fNextBatch.size() + (fSendingBatch == null ? 0 : fSendingBatch.size());
242             } finally {
243                 fReadLock.unlock();
244             }
245         }
246
247         /**
248          * Called to queue a message.
249          *
250          * @param msgs
251          * @throws IOException
252          */
253         public void queue(Collection<Message> msgs) {
254             fWriteLock.lock();
255             try {
256                 for (Message userMsg : msgs) {
257                     if (userMsg != null) {
258                         fNextBatch.add(new TimestampedMessage(userMsg));
259                     } else {
260                         logger.warn("MRBatchPublisher::Sender::queue received a null message.");
261                     }
262                 }
263             } finally {
264                 fWriteLock.unlock();
265             }
266             checkSend(false);
267         }
268
269         /**
270          * Send a batch if the queue is long enough, or the first pending message is old enough.
271          *
272          * @param force
273          */
274         public void checkSend(boolean force) {
275             // hold a read lock just long enough to evaluate whether a batch
276             // should be sent
277             boolean shouldSend = false;
278             fReadLock.lock();
279             try {
280                 if (fNextBatch.isEmpty()) {
281                     final long nowMs = System.currentTimeMillis();
282                     shouldSend = (force || fNextBatch.size() >= fMaxBatchSize);
283                     if (!shouldSend) {
284                         final long sendAtMs = fNextBatch.getFirst().timestamp + fMaxAgeMs;
285                         shouldSend = sendAtMs <= nowMs;
286                     }
287
288                     // however, unless forced, wait after an error
289                     shouldSend = force || (shouldSend && nowMs >= fDontSendUntilMs);
290                 }
291                 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
292             } finally {
293                 fReadLock.unlock();
294             }
295
296             // if a send is required, acquire a write lock, swap out the next batch,
297             // swap in a fresh batch, and release the lock for the caller to start
298             // filling a batch again. After releasing the lock, send the current
299             // batch. (There could be more messages added between read unlock and
300             // write lock, but that's fine.)
301             if (shouldSend) {
302                 fSendingBatch = null;
303
304                 fWriteLock.lock();
305                 try {
306                     fSendingBatch = fNextBatch;
307                     fNextBatch = new LinkedList<>();
308                 } finally {
309                     fWriteLock.unlock();
310                 }
311
312                 if (!doSend(fSendingBatch, this, fTopic, fCompress, logger)) {
313                     logger.warn("Send failed, rebuilding send queue.");
314
315                     // note the time for back-off
316                     fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis();
317
318                     // the send failed. reconstruct the pending queue
319                     fWriteLock.lock();
320                     try {
321                         final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
322                         fNextBatch = fSendingBatch;
323                         fNextBatch.addAll(nextGroup);
324                         fSendingBatch = null;
325                         logger.info("Send queue rebuilt; {} messages to send.", fNextBatch.size());
326                     } finally {
327                         fWriteLock.unlock();
328                     }
329                 } else {
330                     fWriteLock.lock();
331                     try {
332                         fSendingBatch = null;
333                     } finally {
334                         fWriteLock.unlock();
335                     }
336                 }
337             }
338         }
339
340         private LinkedList<TimestampedMessage> fNextBatch;
341         private LinkedList<TimestampedMessage> fSendingBatch;
342         private final String fTopic;
343         private final int fMaxBatchSize;
344         private final long fMaxAgeMs;
345         private final boolean fCompress;
346         private final ReentrantReadWriteLock fLock;
347         private final WriteLock fWriteLock;
348         private final ReadLock fReadLock;
349         private long fDontSendUntilMs;
350         private static final long SF_WAIT_AFTER_ERROR = 1000;
351     }
352
353     // this is static so that it's clearly not using any mutable member data outside of a lock
354     private static boolean doSend(LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log) {
355         // it's possible for this call to be made with an empty list. in this case, just return.
356         if (toSend.isEmpty()) {
357             return true;
358         }
359
360         final long nowMs = System.currentTimeMillis();
361         final String url = MRConstants.makeUrl(topic);
362
363         log.info("sending {} msgs to {}. Oldest: {} ms", toSend.size(), url, (nowMs - toSend.getFirst().timestamp));
364
365         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
366         try {
367             OutputStream os = baseStream;
368             if (compress) {
369                 os = new GZIPOutputStream(baseStream);
370             }
371             for (TimestampedMessage m : toSend) {
372                 os.write(("" + m.fPartition.length()).getBytes());
373                 os.write('.');
374                 os.write(("" + m.fMsg.length()).getBytes());
375                 os.write('.');
376                 os.write(m.fPartition.getBytes());
377                 os.write(m.fMsg.getBytes());
378                 os.write('\n');
379             }
380             os.close();
381         } catch (IOException e) {
382             log.warn("Problem writing stream to post: " + e.getMessage(), e);
383             return false;
384         }
385
386         boolean result = false;
387         final long startMs = System.currentTimeMillis();
388         try {
389             client.post(url,
390                     compress ? MRFormat.CAMBRIA_ZIP.toString() : MRFormat.CAMBRIA.toString(),
391                     baseStream.toByteArray(), false);
392             result = true;
393         } catch (HttpException | IOException e) {
394             log.warn("Problem posting to MR: " + e.getMessage(), e);
395         }
396
397         log.info("MR response ({} ms): OK", (System.currentTimeMillis() - startMs));
398         return result;
399     }
400
401     @Override
402     public void logTo(Logger log) {
403         logger = log;
404     }
405
406     @Override
407     public MRPublisherResponse sendBatchWithResponse() {
408         // Auto-generated method stub
409         return null;
410     }
411
412 }