1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2021 Orange.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
25 package org.onap.dmaap.mr.client.impl;
27 import com.att.nsa.apiClient.http.HttpClient;
28 import com.att.nsa.apiClient.http.HttpException;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
33 import java.util.Collection;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.concurrent.ScheduledThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.locks.ReentrantReadWriteLock;
39 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
40 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
41 import java.util.zip.GZIPOutputStream;
42 import org.onap.dmaap.mr.client.MRBatchingPublisher;
43 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
49 * This is a batching publisher class that allows the client to publish messages
50 * in batches that are limited in terms of size and/or hold time.
53 * @deprecated This class's tricky locking doesn't quite work
56 public class MRBatchPublisher implements MRBatchingPublisher {
57 public static final long MIN_MAX_AGE_MS = 1;
60 * Create a batch publisher.
62 * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
63 * @param topic the topic to publish to
64 * @param maxBatchSize the maximum size of a batch
65 * @param maxAgeMs the maximum age of a batch
67 public MRBatchPublisher(Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress) {
68 if (maxAgeMs < MIN_MAX_AGE_MS) {
69 logger.warn("Max age in ms is less than the minimum. Overriding to " + MIN_MAX_AGE_MS);
70 maxAgeMs = MIN_MAX_AGE_MS;
74 fSender = new Sender(baseUrls, topic, maxBatchSize, maxAgeMs, compress);
75 } catch (MalformedURLException e) {
76 throw new IllegalArgumentException(e);
79 // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
80 // the oldest msg to hit max age? (locking is complicated, but should be do-able)
81 fExec = new ScheduledThreadPoolExecutor(1);
82 fExec.scheduleAtFixedRate(fSender, 100, 50, TimeUnit.MILLISECONDS);
86 public void setApiCredentials(String apiKey, String apiSecret) {
87 fSender.setApiCredentials(apiKey, apiSecret);
91 public void clearApiCredentials() {
92 fSender.clearApiCredentials();
96 * Send the given message with the given partition.
100 * @throws IOException
103 public int send(String partition, String msg) throws IOException {
104 return send(new Message(partition, msg));
108 public int send(String msg) throws IOException {
109 return send(new Message("", msg));
113 * Send the given message.
115 * @param userMsg a message
116 * @throws IOException
119 public int send(Message userMsg) throws IOException {
120 final LinkedList<Message> list = new LinkedList<>();
126 * Send the given set of messages.
128 * @param msgs the set of messages, sent in order of iteration
129 * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
130 * @throws IOException
133 public int send(Collection<Message> msgs) throws IOException {
134 if (msgs.isEmpty()) {
137 return fSender.size();
141 public int getPendingMessageCount() {
142 return fSender.size();
146 * Send any pending messages and close this publisher.
149 public void close() {
151 final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
152 if (remains.isEmpty()) {
153 logger.warn("Closing publisher with {} messages unsent. (Consider using the alternate close method to capture unsent messages in this case.)", remains.size());
155 } catch (InterruptedException e) {
156 logger.warn("Possible message loss. " + e.getMessage(), e);
157 Thread.currentThread().interrupt();
158 } catch (IOException e) {
159 logger.warn("Possible message loss. " + e.getMessage(), e);
163 public List<Message> close(long time, TimeUnit unit) throws InterruptedException, IOException {
164 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
165 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
168 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
169 final long timeoutAtMs = System.currentTimeMillis() + waitInMs;
170 while (System.currentTimeMillis() < timeoutAtMs && getPendingMessageCount() > 0) {
171 fSender.checkSend(true);
175 final LinkedList<Message> result = new LinkedList<>();
176 fSender.drainTo(result);
180 private final ScheduledThreadPoolExecutor fExec;
181 private final Sender fSender;
183 private static class TimestampedMessage extends Message {
184 public TimestampedMessage(Message m) {
186 timestamp = System.currentTimeMillis();
189 public final long timestamp;
192 private Logger logger = LoggerFactory.getLogger(MRBatchPublisher.class);
194 private class Sender extends MRBaseClient implements Runnable {
/**
 * Creates a sender with an empty pending queue and no batch in flight.
 *
 * @param baseUrls the base URLs passed to the base client
 * @param topic the topic to publish to
 * @param maxBatch the queue length at which a batch is sent
 * @param maxAgeMs the age, in milliseconds, at which the oldest pending message triggers a send
 * @param compress whether batches are gzip-compressed before being posted
 * @throws MalformedURLException if the base client rejects the URLs
 */
public Sender(Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress) throws MalformedURLException {
    super(baseUrls);

    // immutable configuration
    this.fTopic = topic;
    this.fMaxBatchSize = maxBatch;
    this.fMaxAgeMs = maxAgeMs;
    this.fCompress = compress;

    // locking: one read/write lock, with both views cached
    this.fLock = new ReentrantReadWriteLock();
    this.fReadLock = this.fLock.readLock();
    this.fWriteLock = this.fLock.writeLock();

    // mutable queue state
    this.fNextBatch = new LinkedList<>();
    this.fSendingBatch = null;
    this.fDontSendUntilMs = 0;
}
210 public void drainTo(List<Message> list) {
213 if (fSendingBatch != null) {
214 list.addAll(fSendingBatch);
216 list.addAll(fNextBatch);
218 fSendingBatch = null;
226 * Called periodically by the background executor.
232 } catch (Exception e) {
233 logger.warn("MR background send: {}", e.getMessage());
234 logger.error("IOException {}", e.getMessage());
241 return fNextBatch.size() + (fSendingBatch == null ? 0 : fSendingBatch.size());
248 * Called to queue a message.
251 * @throws IOException
253 public void queue(Collection<Message> msgs) {
256 for (Message userMsg : msgs) {
257 if (userMsg != null) {
258 fNextBatch.add(new TimestampedMessage(userMsg));
260 logger.warn("MRBatchPublisher::Sender::queue received a null message.");
270 * Send a batch if the queue is long enough, or the first pending message is old enough.
274 public void checkSend(boolean force) {
275 // hold a read lock just long enough to evaluate whether a batch
277 boolean shouldSend = false;
280 if (fNextBatch.isEmpty()) {
281 final long nowMs = System.currentTimeMillis();
282 shouldSend = (force || fNextBatch.size() >= fMaxBatchSize);
284 final long sendAtMs = fNextBatch.getFirst().timestamp + fMaxAgeMs;
285 shouldSend = sendAtMs <= nowMs;
288 // however, unless forced, wait after an error
289 shouldSend = force || (shouldSend && nowMs >= fDontSendUntilMs);
291 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
296 // if a send is required, acquire a write lock, swap out the next batch,
297 // swap in a fresh batch, and release the lock for the caller to start
298 // filling a batch again. After releasing the lock, send the current
299 // batch. (There could be more messages added between read unlock and
300 // write lock, but that's fine.)
302 fSendingBatch = null;
306 fSendingBatch = fNextBatch;
307 fNextBatch = new LinkedList<>();
312 if (!doSend(fSendingBatch, this, fTopic, fCompress, logger)) {
313 logger.warn("Send failed, rebuilding send queue.");
315 // note the time for back-off
316 fDontSendUntilMs = SF_WAIT_AFTER_ERROR + System.currentTimeMillis();
318 // the send failed. reconstruct the pending queue
321 final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
322 fNextBatch = fSendingBatch;
323 fNextBatch.addAll(nextGroup);
324 fSendingBatch = null;
325 logger.info("Send queue rebuilt; {} messages to send.", fNextBatch.size());
332 fSendingBatch = null;
340 private LinkedList<TimestampedMessage> fNextBatch;
341 private LinkedList<TimestampedMessage> fSendingBatch;
342 private final String fTopic;
343 private final int fMaxBatchSize;
344 private final long fMaxAgeMs;
345 private final boolean fCompress;
346 private final ReentrantReadWriteLock fLock;
347 private final WriteLock fWriteLock;
348 private final ReadLock fReadLock;
349 private long fDontSendUntilMs;
350 private static final long SF_WAIT_AFTER_ERROR = 1000;
353 // this is static so that it's clearly not using any mutable member data outside of a lock
354 private static boolean doSend(LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log) {
355 // it's possible for this call to be made with an empty list. in this case, just return.
356 if (toSend.isEmpty()) {
360 final long nowMs = System.currentTimeMillis();
361 final String url = MRConstants.makeUrl(topic);
363 log.info("sending {} msgs to {}. Oldest: {} ms", toSend.size(), url, (nowMs - toSend.getFirst().timestamp));
365 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
367 OutputStream os = baseStream;
369 os = new GZIPOutputStream(baseStream);
371 for (TimestampedMessage m : toSend) {
372 os.write(("" + m.fPartition.length()).getBytes());
374 os.write(("" + m.fMsg.length()).getBytes());
376 os.write(m.fPartition.getBytes());
377 os.write(m.fMsg.getBytes());
381 } catch (IOException e) {
382 log.warn("Problem writing stream to post: " + e.getMessage(), e);
386 boolean result = false;
387 final long startMs = System.currentTimeMillis();
390 compress ? MRFormat.CAMBRIA_ZIP.toString() : MRFormat.CAMBRIA.toString(),
391 baseStream.toByteArray(), false);
393 } catch (HttpException | IOException e) {
394 log.warn("Problem posting to MR: " + e.getMessage(), e);
397 log.info("MR response ({} ms): OK", (System.currentTimeMillis() - startMs));
402 public void logTo(Logger log) {
407 public MRPublisherResponse sendBatchWithResponse() {
408 // Auto-generated method stub