sonar critical for InterruptedException
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
36 import java.util.zip.GZIPOutputStream;
37
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.att.nsa.apiClient.http.HttpClient;
42 import com.att.nsa.apiClient.http.HttpException;
43 import com.att.nsa.mr.client.MRBatchingPublisher;
44 import com.att.nsa.mr.client.response.MRPublisherResponse;
45
46 /**
47  * This is a batching publisher class that allows the client to publish messages
48  * in batches that are limited in terms of size and/or hold time.
49  * 
50  * @author author
51  * @deprecated This class's tricky locking doesn't quite work
52  *
53  */
54 @Deprecated
55 public class MRBatchPublisher implements MRBatchingPublisher
56 {
57         public static final long kMinMaxAgeMs = 1;
58
59         /**
60          * Create a batch publisher.
61          * 
62          * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
63          * @param topic the topic to publish to
64          * @param maxBatchSize the maximum size of a batch
65          * @param maxAgeMs the maximum age of a batch
66          */
67         public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
68         {
69                 if ( maxAgeMs < kMinMaxAgeMs )
70                 {
71                         fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
72                         maxAgeMs = kMinMaxAgeMs;
73                 }
74
75                 try {
76                         fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
77                 } catch (MalformedURLException e) {
78                         throw new RuntimeException(e);
79                 }
80
81                 // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
82                 // the oldest msg to hit max age? (locking is complicated, but should be do-able)
83                 fExec = new ScheduledThreadPoolExecutor ( 1 );
84                 fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
85         }
86
87         @Override
88         public void setApiCredentials ( String apiKey, String apiSecret )
89         {
90                 fSender.setApiCredentials ( apiKey, apiSecret );
91         }
92
93         @Override
94         public void clearApiCredentials ()
95         {
96                 fSender.clearApiCredentials ();
97         }
98
99         /**
100          * Send the given message with the given partition
101          * @param partition
102          * @param msg
103          * @throws IOException
104          */
105         @Override
106         public int send ( String partition, String msg ) throws IOException
107         {
108                 return send ( new message ( partition, msg ) );
109         }
110         @Override
111         public int send ( String msg ) throws IOException
112         {
113                 return send ( new message ( "",msg ) );
114         }
115         /**
116          * Send the given message
117          * @param userMsg a message
118          * @throws IOException
119          */
120         @Override
121         public int send ( message userMsg ) throws IOException
122         {
123                 final LinkedList<message> list = new LinkedList<message> ();
124                 list.add ( userMsg );
125                 return send ( list );
126         }
127
128         /**
129          * Send the given set of messages
130          * @param msgs the set of messages, sent in order of iteration
131          * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
132          * @throws IOException
133          */
134         @Override
135         public int send ( Collection<message> msgs ) throws IOException
136         {
137                 if ( msgs.size() > 0 )
138                 {
139                         fSender.queue ( msgs );
140                 }
141                 return fSender.size ();
142         }
143
144         @Override
145         public int getPendingMessageCount ()
146         {
147                 return fSender.size ();
148         }
149
150         /**
151          * Send any pending messages and close this publisher.
152          * @throws IOException
153          * @throws InterruptedException 
154          */
155         @Override
156         public void close ()
157         {
158                 try
159                 {
160                         final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
161                         if ( remains.size() > 0 )
162                         {
163                                 fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
164                                         + "(Consider using the alternate close method to capture unsent messages in this case.)" );
165                         }
166                 }
167                 catch ( InterruptedException e )
168                 {
169                         fLog.warn ( "Possible message loss. " + e.getMessage(), e );
170                         Thread.currentThread().interrupt();
171                 }
172                 catch ( IOException e )
173                 {
174                         fLog.warn ( "Possible message loss. " + e.getMessage(), e );
175                 }
176         }
177
178         public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
179         {
180                 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
181                 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
182                 fExec.shutdown ();
183
184                 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
185                 final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
186                 while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
187                 {
188                         fSender.checkSend ( true );
189                         Thread.sleep ( 250 );
190                 }
191
192                 final LinkedList<message> result = new LinkedList<message> ();
193                 fSender.drainTo ( result );
194                 return result;
195         }
196
197         private final ScheduledThreadPoolExecutor fExec;
198         private final Sender fSender;
199         
200         private static class TimestampedMessage extends message
201         {
202                 public TimestampedMessage ( message m )
203                 {
204                         super ( m );
205                         timestamp = System.currentTimeMillis ();
206                 }
207                 public final long timestamp;
208         }
209
210         private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
211
212         private class Sender extends MRBaseClient implements Runnable
213         {
214                 public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
215                 {
216                         super ( baseUrls );
217
218                         fNextBatch = new LinkedList<TimestampedMessage> ();
219                         fSendingBatch = null;
220                         fTopic = topic;
221                         fMaxBatchSize = maxBatch;
222                         fMaxAgeMs = maxAgeMs;
223                         fCompress = compress;
224                         fLock = new ReentrantReadWriteLock ();
225                         fWriteLock = fLock.writeLock ();
226                         fReadLock = fLock.readLock ();
227                         fDontSendUntilMs = 0;
228                 }
229
230                 public void drainTo ( LinkedList<message> list )
231                 {
232                         fWriteLock.lock ();
233                         try
234                         {
235                                 if ( fSendingBatch != null )
236                                 {
237                                         list.addAll ( fSendingBatch );
238                                 }
239                                 list.addAll ( fNextBatch );
240
241                                 fSendingBatch = null;
242                                 fNextBatch.clear ();
243                         }
244                         finally
245                         {
246                                 fWriteLock.unlock ();
247                         }
248                 }
249
250                 /**
251                  * Called periodically by the background executor.
252                  */
253                 @Override
254                 public void run ()
255                 {
256                         try
257                         {
258                                 checkSend ( false );
259                         }
260                         catch ( IOException e )
261                         {
262                                 fLog.warn ( "MR background send: " + e.getMessage () );
263                         }
264                 }
265
266                 public int size ()
267                 {
268                         fReadLock.lock ();
269                         try
270                         {
271                                 return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
272                         }
273                         finally
274                         {
275                                 fReadLock.unlock ();
276                         }
277                 }
278                 
279                 /**
280                  * Called to queue a message.
281                  * @param m
282                  * @throws IOException 
283                  */
284                 public void queue ( Collection<message> msgs ) throws IOException
285                 {
286                         fWriteLock.lock ();
287                         try
288                         {
289                                 for ( message userMsg : msgs )
290                                 {
291                                         if ( userMsg != null )
292                                         {
293                                                 fNextBatch.add ( new TimestampedMessage ( userMsg ) );
294                                         }
295                                         else
296                                         {
297                                                 fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
298                                         }
299                                 }
300                         }
301                         finally
302                         {
303                                 fWriteLock.unlock();
304                         }
305                         checkSend ( false );
306                 }
307
308                 /**
309                  * Send a batch if the queue is long enough, or the first pending message is old enough.
310                  * @param force
311                  * @throws IOException
312                  */
313                 public void checkSend ( boolean force ) throws IOException
314                 {
315                         // hold a read lock just long enough to evaluate whether a batch
316                         // should be sent
317                         boolean shouldSend = false;
318                         fReadLock.lock ();
319                         try
320                         {
321                                 if ( fNextBatch.size () > 0 )
322                                 {
323                                         final long nowMs = System.currentTimeMillis ();
324                                         shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
325                                         if ( !shouldSend )
326                                         {
327                                                 final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
328                                                 shouldSend = sendAtMs <= nowMs;
329                                         }
330                                         
331                                         // however, unless forced, wait after an error
332                                         shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs ); 
333                                 }
334                                 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
335                         }
336                         finally
337                         {
338                                 fReadLock.unlock ();
339                         }
340
341                         // if a send is required, acquire a write lock, swap out the next batch,
342                         // swap in a fresh batch, and release the lock for the caller to start
343                         // filling a batch again. After releasing the lock, send the current
344                         // batch. (There could be more messages added between read unlock and
345                         // write lock, but that's fine.)
346                         if ( shouldSend )
347                         {
348                                 fSendingBatch = null;
349
350                                 fWriteLock.lock ();
351                                 try
352                                 {
353                                         fSendingBatch = fNextBatch;
354                                         fNextBatch = new LinkedList<TimestampedMessage> ();
355                                 }
356                                 finally
357                                 {
358                                         fWriteLock.unlock ();
359                                 }
360
361                                 if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
362                                 {
363                                         fLog.warn ( "Send failed, rebuilding send queue." );
364
365                                         // note the time for back-off
366                                         fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
367
368                                         // the send failed. reconstruct the pending queue
369                                         fWriteLock.lock ();
370                                         try
371                                         {
372                                                 final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
373                                                 fNextBatch = fSendingBatch;
374                                                 fNextBatch.addAll ( nextGroup );
375                                                 fSendingBatch = null;
376                                                 fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
377                                         }
378                                         finally
379                                         {
380                                                 fWriteLock.unlock ();
381                                         }
382                                 }
383                                 else
384                                 {
385                                         fWriteLock.lock ();
386                                         try
387                                         {
388                                                 fSendingBatch = null;
389                                         }
390                                         finally
391                                         {
392                                                 fWriteLock.unlock ();
393                                         }
394                                 }
395                         }
396                 }
397
398                 private LinkedList<TimestampedMessage> fNextBatch;
399                 private LinkedList<TimestampedMessage> fSendingBatch;
400                 private final String fTopic;
401                 private final int fMaxBatchSize;
402                 private final long fMaxAgeMs;
403                 private final boolean fCompress;
404                 private final ReentrantReadWriteLock fLock;
405                 private final WriteLock fWriteLock;
406                 private final ReadLock fReadLock;
407                 private long fDontSendUntilMs;
408                 private static final long sfWaitAfterError = 1000;
409         }
410
411         // this is static so that it's clearly not using any mutable member data outside of a lock
412         private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
413         {
414                 // it's possible for this call to be made with an empty list. in this case, just return.
415                 if ( toSend.size() < 1 )
416                 {
417                         return true;
418                 }
419
420                 final long nowMs = System.currentTimeMillis ();
421                 final String url = MRConstants.makeUrl ( topic );
422
423                 log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms"  );
424
425                 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
426                 try
427                 {
428                         OutputStream os = baseStream;
429                         if ( compress )
430                         {
431                                 os = new GZIPOutputStream ( baseStream );
432                         }
433                         for ( TimestampedMessage m : toSend )
434                         {
435                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
436                                 os.write ( '.' );
437                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
438                                 os.write ( '.' );
439                                 os.write ( m.fPartition.getBytes() );
440                                 os.write ( m.fMsg.getBytes() );
441                                 os.write ( '\n' );
442                         }
443                         os.close ();
444                 }
445                 catch ( IOException e )
446                 {
447                         log.warn ( "Problem writing stream to post: " + e.getMessage () );
448                         return false;
449                 }
450
451                 boolean result = false;
452                 final long startMs = System.currentTimeMillis ();
453                 try
454                 {
455                         client.post ( url, compress ?
456                                 MRFormat.CAMBRIA_ZIP.toString () :
457                                 MRFormat.CAMBRIA.toString (),
458                                 baseStream.toByteArray(), false );
459                         result = true;
460                 }
461                 catch ( HttpException e )
462                 {
463                         log.warn ( "Problem posting to MR: " + e.getMessage() );
464                 }
465                 catch ( IOException e )
466                 {
467                         log.warn ( "Problem posting to MR: " + e.getMessage() );
468                 }
469
470                 log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
471                 return result;
472         }
473
474         @Override
475         public void logTo ( Logger log )
476         {
477                 fLog = log;
478         }
479
480         @Override
481         public MRPublisherResponse sendBatchWithResponse() {
482                 // TODO Auto-generated method stub
483                 return null;
484         }
485         
486 }