1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
36 import java.util.zip.GZIPOutputStream;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import com.att.nsa.apiClient.http.HttpClient;
42 import com.att.nsa.apiClient.http.HttpException;
43 import com.att.nsa.mr.client.MRBatchingPublisher;
44 import com.att.nsa.mr.client.response.MRPublisherResponse;
47 * This is a batching publisher class that allows the client to publish messages
48 * in batches that are limited in terms of size and/or hold time.
51 * @deprecated This class's tricky locking doesn't quite work
55 public class MRBatchPublisher implements MRBatchingPublisher
// Lower bound, in milliseconds, that the constructor enforces on its maxAgeMs argument.
57 public static final long kMinMaxAgeMs = 1;
/**
 * Create a batch publisher and schedule its background sender.
 *
 * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
 * @param topic the topic to publish to
 * @param maxBatchSize the maximum number of messages in a batch
 * @param maxAgeMs the maximum age of a batch, in milliseconds; values below
 *        {@link #kMinMaxAgeMs} are raised to that minimum (with a warning)
 * @param compress whether batches are gzip-compressed before being posted
 * @throws IllegalArgumentException if the sender rejects the base URLs as malformed
 */
public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
{
	if ( maxAgeMs < kMinMaxAgeMs )
	{
		fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
		maxAgeMs = kMinMaxAgeMs;
	}

	try
	{
		fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
	}
	catch ( MalformedURLException e )
	{
		// unchecked so the constructor signature stays as it was; the cause is preserved
		throw new IllegalArgumentException ( "Malformed base URL in " + baseUrls, e );
	}

	// FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
	// the oldest msg to hit max age? (locking is complicated, but should be do-able)
	// The sender is polled every 50 ms after an initial 100 ms delay.
	fExec = new ScheduledThreadPoolExecutor ( 1 );
	fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
}
88 public void setApiCredentials ( String apiKey, String apiSecret )
90 fSender.setApiCredentials ( apiKey, apiSecret );
94 public void clearApiCredentials ()
96 fSender.clearApiCredentials ();
100 * Send the given message with the given partition
103 * @throws IOException
106 public int send ( String partition, String msg ) throws IOException
108 return send ( new message ( partition, msg ) );
111 public int send ( String msg ) throws IOException
113 return send ( new message ( "",msg ) );
116 * Send the given message
117 * @param userMsg a message
118 * @throws IOException
121 public int send ( message userMsg ) throws IOException
123 final LinkedList<message> list = new LinkedList<message> ();
124 list.add ( userMsg );
125 return send ( list );
129 * Send the given set of messages
130 * @param msgs the set of messages, sent in order of iteration
131 * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
132 * @throws IOException
135 public int send ( Collection<message> msgs ) throws IOException
137 if ( msgs.size() > 0 )
139 fSender.queue ( msgs );
141 return fSender.size ();
145 public int getPendingMessageCount ()
147 return fSender.size ();
// NOTE(review): the signature, try and brace lines of this method are absent from this listing;
// only the comment text and statements below are visible -- confirm against the original file.
151 * Send any pending messages and close this publisher.
152 * @throws IOException
153 * @throws InterruptedException
// NOTE(review): both exception types named above are caught and logged by the handlers visible
// below, so the @throws tags look stale -- verify against the real signature.
// Delegates to close(long, TimeUnit) with a wait of Long.MAX_VALUE milliseconds.
160 final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
161 if ( remains.size() > 0 )
// Leftover messages are only reported here; the visible lines do nothing further with the list.
163 fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
164 + "(Consider using the alternate close method to capture unsent messages in this case.)" );
167 catch ( InterruptedException e )
169 fLog.warn ( "Possible message loss. " + e.getMessage(), e );
// Re-assert the interrupt flag, since the exception is handled here.
170 Thread.currentThread().interrupt();
172 catch ( IOException e )
174 fLog.warn ( "Possible message loss. " + e.getMessage(), e );
178 public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
180 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
181 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
184 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
185 final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
186 while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
188 fSender.checkSend ( true );
189 Thread.sleep ( 250 );
192 final LinkedList<message> result = new LinkedList<message> ();
193 fSender.drainTo ( result );
// Executor created in the constructor; it runs fSender on a fixed schedule.
197 private final ScheduledThreadPoolExecutor fExec;
// Holds the pending messages and performs the sends.
198 private final Sender fSender;
// A message stamped with the wall-clock time at which this wrapper was constructed.
// NOTE(review): a constructor line is missing from this listing (presumably the call that
// copies m's content into the superclass) -- confirm against the original file.
200 private static class TimestampedMessage extends message
202 public TimestampedMessage ( message m )
205 timestamp = System.currentTimeMillis ();
// Construction time in ms since the epoch; compared against the max batch age and used when
// logging the age of the oldest message in a batch.
207 public final long timestamp;
// Logger used by this publisher and handed to the static send helper.
210 private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
// Sender is the Runnable handed to the executor; it owns the pending-message lists and the
// read/write lock used around them.
212 private class Sender extends MRBaseClient implements Runnable
// Starts with an empty next-batch list, no batch in flight, and no error back-off.
// NOTE(review): lines are missing from this listing -- nothing visible assigns the final field
// fTopic or invokes the MRBaseClient constructor. Confirm against the original file.
214 public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
218 fNextBatch = new LinkedList<TimestampedMessage> ();
219 fSendingBatch = null;
221 fMaxBatchSize = maxBatch;
222 fMaxAgeMs = maxAgeMs;
223 fCompress = compress;
224 fLock = new ReentrantReadWriteLock ();
225 fWriteLock = fLock.writeLock ();
226 fReadLock = fLock.readLock ();
// 0 means no back-off is in effect.
227 fDontSendUntilMs = 0;
// Appends every message still held to the given list -- the in-flight batch first (if any),
// then the next batch -- and drops the in-flight reference.
// NOTE(review): the lock acquisition, try/finally lines and possibly other statements are
// missing from this listing; only the unlock is visible. Confirm against the original file.
230 public void drainTo ( LinkedList<message> list )
235 if ( fSendingBatch != null )
237 list.addAll ( fSendingBatch );
239 list.addAll ( fNextBatch );
241 fSendingBatch = null;
246 fWriteLock.unlock ();
// NOTE(review): these are fragments of two methods whose signatures and remaining lines are
// missing from this listing (presumably run() and size()) -- confirm against the original file.
251 * Called periodically by the background executor.
// An IOException is logged as a warning (message text only) instead of propagating.
260 catch ( IOException e )
262 fLog.warn ( "MR background send: " + e.getMessage () );
// Total held messages: the next batch plus the in-flight batch, if there is one.
271 return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
// NOTE(review): brace, else and possibly locking or follow-up lines are missing from this
// listing; the comments below describe only the visible statements.
280 * Called to queue a message.
282 * @throws IOException
284 public void queue ( Collection<message> msgs ) throws IOException
289 for ( message userMsg : msgs )
291 if ( userMsg != null )
// Each non-null message is wrapped so it carries the time at which it was queued.
293 fNextBatch.add ( new TimestampedMessage ( userMsg ) );
// NOTE(review): presumably the else branch -- null entries are reported rather than queued.
297 fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
// NOTE(review): lock acquisitions, try/finally, if/else and brace lines are missing from this
// listing, so the control flow below cannot be read literally -- confirm against the original
// file before changing anything here.
309 * Send a batch if the queue is long enough, or the first pending message is old enough.
311 * @throws IOException
313 public void checkSend ( boolean force ) throws IOException
315 // hold a read lock just long enough to evaluate whether a batch
317 boolean shouldSend = false;
321 if ( fNextBatch.size () > 0 )
323 final long nowMs = System.currentTimeMillis ();
// First criterion: a forced send, or the batch has reached the configured size.
324 shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
// Second criterion: the oldest queued message has reached the maximum age.
// NOTE(review): as shown, this assignment would overwrite the result above; a guard line
// appears to be missing from the listing -- confirm.
327 final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
328 shouldSend = sendAtMs <= nowMs;
331 // however, unless forced, wait after an error
332 shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
334 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
341 // if a send is required, acquire a write lock, swap out the next batch,
342 // swap in a fresh batch, and release the lock for the caller to start
343 // filling a batch again. After releasing the lock, send the current
344 // batch. (There could be more messages added between read unlock and
345 // write lock, but that's fine.)
348 fSendingBatch = null;
353 fSendingBatch = fNextBatch;
354 fNextBatch = new LinkedList<TimestampedMessage> ();
358 fWriteLock.unlock ();
// The HTTP post itself happens after the unlock above.
361 if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
363 fLog.warn ( "Send failed, rebuilding send queue." );
365 // note the time for back-off
366 fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
368 // the send failed. reconstruct the pending queue
// The failed batch goes back to the front, followed by anything queued during the attempt,
// so the original ordering is kept.
372 final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
373 fNextBatch = fSendingBatch;
374 fNextBatch.addAll ( nextGroup );
375 fSendingBatch = null;
376 fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
380 fWriteLock.unlock ();
// NOTE(review): presumably the success branch -- the in-flight batch reference is dropped.
388 fSendingBatch = null;
392 fWriteLock.unlock ();
// Messages queued and waiting to be sent; replaced with a fresh list when a send starts.
398 private LinkedList<TimestampedMessage> fNextBatch;
// The batch currently handed to doSend, or null when none is in flight.
399 private LinkedList<TimestampedMessage> fSendingBatch;
// Topic passed to doSend.
// NOTE(review): no assignment to this final field is visible in this listing.
400 private final String fTopic;
// Queue length (in messages) at which a send is triggered.
401 private final int fMaxBatchSize;
// Age in ms of the oldest queued message at which a send is triggered.
402 private final long fMaxAgeMs;
// Passed to doSend, where it selects the posted content type.
403 private final boolean fCompress;
404 private final ReentrantReadWriteLock fLock;
405 private final WriteLock fWriteLock;
406 private final ReadLock fReadLock;
// Earliest time (ms since epoch) at which a non-forced send may run again after a failure.
407 private long fDontSendUntilMs;
// Back-off added to the current time after a failed send, in milliseconds.
408 private static final long sfWaitAfterError = 1000;
// Serializes the batch into an in-memory buffer and posts it to the topic URL with the
// content type chosen by the compress flag.
// NOTE(review): many lines of this method are missing from this listing (return statements,
// try/if lines, and whatever is written between the fields of each message) -- confirm against
// the original file before editing.
411 // this is static so that it's clearly not using any mutable member data outside of a lock
412 private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
414 // it's possible for this call to be made with an empty list. in this case, just return.
415 if ( toSend.size() < 1 )
420 final long nowMs = System.currentTimeMillis ();
421 final String url = MRConstants.makeUrl ( topic );
423 log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
425 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
428 OutputStream os = baseStream;
// NOTE(review): presumably applied only when compress is true; the condition line is not visible.
431 os = new GZIPOutputStream ( baseStream );
433 for ( TimestampedMessage m : toSend )
// NOTE(review): the lengths written here are String.length() values (UTF-16 code units), while
// the payload below is written with getBytes() in the platform default charset; for non-ASCII
// text the two can disagree -- confirm what the receiver expects.
435 os.write ( ( "" + m.fPartition.length () ).getBytes() );
437 os.write ( ( "" + m.fMsg.length () ).getBytes() );
439 os.write ( m.fPartition.getBytes() );
440 os.write ( m.fMsg.getBytes() );
445 catch ( IOException e )
447 log.warn ( "Problem writing stream to post: " + e.getMessage () );
451 boolean result = false;
452 final long startMs = System.currentTimeMillis ();
// The content type follows the compress flag: CAMBRIA_ZIP when compressing, CAMBRIA otherwise.
455 client.post ( url, compress ?
456 MRFormat.CAMBRIA_ZIP.toString () :
457 MRFormat.CAMBRIA.toString (),
458 baseStream.toByteArray(), false );
// Post failures are logged as warnings (message text only).
461 catch ( HttpException e )
463 log.warn ( "Problem posting to MR: " + e.getMessage() );
465 catch ( IOException e )
467 log.warn ( "Problem posting to MR: " + e.getMessage() );
470 log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
// NOTE(review): only the signature of logTo is visible in this listing; its body is missing.
// Presumably it replaces fLog (which is declared non-final) with the given logger -- confirm.
475 public void logTo ( Logger log )
481 public MRPublisherResponse sendBatchWithResponse() {
482 // TODO Auto-generated method stub