1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
36 import java.util.zip.GZIPOutputStream;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import com.att.nsa.apiClient.http.HttpClient;
42 import com.att.nsa.apiClient.http.HttpException;
43 import com.att.nsa.mr.client.MRBatchingPublisher;
44 import com.att.nsa.mr.client.response.MRPublisherResponse;
47 * This is a batching publisher class that allows the client to publish messages
48 * in batches that are limited in terms of size and/or hold time.
51 * @deprecated This class's tricky locking doesn't quite work
55 public class MRBatchPublisher implements MRBatchingPublisher
57 public static final long kMinMaxAgeMs = 1;
60 * Create a batch publisher.
62 * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
63 * @param topic the topic to publish to
64 * @param maxBatchSize the maximum size of a batch
65 * @param maxAgeMs the maximum age of a batch
67 public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
69 if ( maxAgeMs < kMinMaxAgeMs )
71 fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
72 maxAgeMs = kMinMaxAgeMs;
76 fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
77 } catch (MalformedURLException e) {
78 throw new IllegalArgumentException (e);
81 // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
82 // the oldest msg to hit max age? (locking is complicated, but should be do-able)
83 fExec = new ScheduledThreadPoolExecutor ( 1 );
84 fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
88 public void setApiCredentials ( String apiKey, String apiSecret )
90 fSender.setApiCredentials ( apiKey, apiSecret );
94 public void clearApiCredentials ()
96 fSender.clearApiCredentials ();
100 * Send the given message with the given partition
103 * @throws IOException
106 public int send ( String partition, String msg ) throws IOException
108 return send ( new message ( partition, msg ) );
111 public int send ( String msg ) throws IOException
113 return send ( new message ( "",msg ) );
116 * Send the given message
117 * @param userMsg a message
118 * @throws IOException
121 public int send ( message userMsg ) throws IOException
123 final LinkedList<message> list = new LinkedList<message> ();
124 list.add ( userMsg );
125 return send ( list );
129 * Send the given set of messages
130 * @param msgs the set of messages, sent in order of iteration
131 * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
132 * @throws IOException
135 public int send ( Collection<message> msgs ) throws IOException
137 if ( msgs.isEmpty() )
139 fSender.queue ( msgs );
141 return fSender.size ();
145 public int getPendingMessageCount ()
147 return fSender.size ();
151 * Send any pending messages and close this publisher.
152 * @throws IOException
153 * @throws InterruptedException
160 final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
161 if ( remains.isEmpty() )
163 fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
164 + "(Consider using the alternate close method to capture unsent messages in this case.)" );
167 catch ( InterruptedException e )
169 fLog.warn ( "Possible message loss. " + e.getMessage(), e );
170 Thread.currentThread().interrupt();
172 catch ( IOException e )
174 fLog.warn ( "Possible message loss. " + e.getMessage(), e );
178 public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
180 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
181 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
184 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
185 final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
186 while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
188 fSender.checkSend ( true );
189 Thread.sleep ( 250 );
192 final LinkedList<message> result = new LinkedList<message> ();
193 fSender.drainTo ( result );
197 private final ScheduledThreadPoolExecutor fExec;
198 private final Sender fSender;
200 private static class TimestampedMessage extends message
202 public TimestampedMessage ( message m )
205 timestamp = System.currentTimeMillis ();
207 public final long timestamp;
210 private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
212 private class Sender extends MRBaseClient implements Runnable
214 public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
218 fNextBatch = new LinkedList<TimestampedMessage> ();
219 fSendingBatch = null;
221 fMaxBatchSize = maxBatch;
222 fMaxAgeMs = maxAgeMs;
223 fCompress = compress;
224 fLock = new ReentrantReadWriteLock ();
225 fWriteLock = fLock.writeLock ();
226 fReadLock = fLock.readLock ();
227 fDontSendUntilMs = 0;
230 public void drainTo ( LinkedList<message> list )
235 if ( fSendingBatch != null )
237 list.addAll ( fSendingBatch );
239 list.addAll ( fNextBatch );
241 fSendingBatch = null;
246 fWriteLock.unlock ();
251 * Called periodically by the background executor.
260 catch ( IOException e )
262 fLog.warn ( "MR background send: " + e.getMessage () );
263 fLog.error( "IOException " + e );
272 return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
281 * Called to queue a message.
283 * @throws IOException
285 public void queue ( Collection<message> msgs ) throws IOException
290 for ( message userMsg : msgs )
292 if ( userMsg != null )
294 fNextBatch.add ( new TimestampedMessage ( userMsg ) );
298 fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
310 * Send a batch if the queue is long enough, or the first pending message is old enough.
312 * @throws IOException
314 public void checkSend ( boolean force ) throws IOException
316 // hold a read lock just long enough to evaluate whether a batch
318 boolean shouldSend = false;
322 if ( fNextBatch.isEmpty() )
324 final long nowMs = System.currentTimeMillis ();
325 shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
328 final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
329 shouldSend = sendAtMs <= nowMs;
332 // however, unless forced, wait after an error
333 shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
335 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
342 // if a send is required, acquire a write lock, swap out the next batch,
343 // swap in a fresh batch, and release the lock for the caller to start
344 // filling a batch again. After releasing the lock, send the current
345 // batch. (There could be more messages added between read unlock and
346 // write lock, but that's fine.)
349 fSendingBatch = null;
354 fSendingBatch = fNextBatch;
355 fNextBatch = new LinkedList<TimestampedMessage> ();
359 fWriteLock.unlock ();
362 if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
364 fLog.warn ( "Send failed, rebuilding send queue." );
366 // note the time for back-off
367 fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
369 // the send failed. reconstruct the pending queue
373 final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
374 fNextBatch = fSendingBatch;
375 fNextBatch.addAll ( nextGroup );
376 fSendingBatch = null;
377 fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
381 fWriteLock.unlock ();
389 fSendingBatch = null;
393 fWriteLock.unlock ();
399 private LinkedList<TimestampedMessage> fNextBatch;
400 private LinkedList<TimestampedMessage> fSendingBatch;
401 private final String fTopic;
402 private final int fMaxBatchSize;
403 private final long fMaxAgeMs;
404 private final boolean fCompress;
405 private final ReentrantReadWriteLock fLock;
406 private final WriteLock fWriteLock;
407 private final ReadLock fReadLock;
408 private long fDontSendUntilMs;
409 private static final long sfWaitAfterError = 1000;
412 // this is static so that it's clearly not using any mutable member data outside of a lock
413 private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
415 // it's possible for this call to be made with an empty list. in this case, just return.
416 if ( toSend.isEmpty() )
421 final long nowMs = System.currentTimeMillis ();
422 final String url = MRConstants.makeUrl ( topic );
424 log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
426 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
429 OutputStream os = baseStream;
432 os = new GZIPOutputStream ( baseStream );
434 for ( TimestampedMessage m : toSend )
436 os.write ( ( "" + m.fPartition.length () ).getBytes() );
438 os.write ( ( "" + m.fMsg.length () ).getBytes() );
440 os.write ( m.fPartition.getBytes() );
441 os.write ( m.fMsg.getBytes() );
446 catch ( IOException e )
448 log.warn ( "Problem writing stream to post: " + e.getMessage (),e );
452 boolean result = false;
453 final long startMs = System.currentTimeMillis ();
456 client.post ( url, compress ?
457 MRFormat.CAMBRIA_ZIP.toString () :
458 MRFormat.CAMBRIA.toString (),
459 baseStream.toByteArray(), false );
462 catch ( HttpException e )
464 log.warn ( "Problem posting to MR: " + e.getMessage(),e );
466 catch ( IOException e )
468 log.warn ( "Problem posting to MR: " + e.getMessage(),e );
471 log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
476 public void logTo ( Logger log )
482 public MRPublisherResponse sendBatchWithResponse() {
483 // TODO Auto-generated method stub