bcd44039c9c5842ed684747fd056f0a332756f0f
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
36 import java.util.zip.GZIPOutputStream;
37
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.att.nsa.apiClient.http.HttpClient;
42 import com.att.nsa.apiClient.http.HttpException;
43 import org.onap.dmaap.mr.client.MRBatchingPublisher;
44 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
45
46 /**
47  * This is a batching publisher class that allows the client to publish messages
48  * in batches that are limited in terms of size and/or hold time.
49  * 
50  * @author author
51  * @deprecated This class's tricky locking doesn't quite work
52  *
53  */
54 @Deprecated
55 public class MRBatchPublisher implements MRBatchingPublisher
56 {
57         public static final long kMinMaxAgeMs = 1;
58
59         /**
60          * Create a batch publisher.
61          * 
62          * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
63          * @param topic the topic to publish to
64          * @param maxBatchSize the maximum size of a batch
65          * @param maxAgeMs the maximum age of a batch
66          */
67         public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
68         {
69                 if ( maxAgeMs < kMinMaxAgeMs )
70                 {
71                         fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
72                         maxAgeMs = kMinMaxAgeMs;
73                 }
74
75                 try {
76                         fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
77                 } catch (MalformedURLException e) {
78                         throw new IllegalArgumentException (e);
79                 }
80
81                 // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
82                 // the oldest msg to hit max age? (locking is complicated, but should be do-able)
83                 fExec = new ScheduledThreadPoolExecutor ( 1 );
84                 fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
85         }
86
87         @Override
88         public void setApiCredentials ( String apiKey, String apiSecret )
89         {
90                 fSender.setApiCredentials ( apiKey, apiSecret );
91         }
92
93         @Override
94         public void clearApiCredentials ()
95         {
96                 fSender.clearApiCredentials ();
97         }
98
99         /**
100          * Send the given message with the given partition
101          * @param partition
102          * @param msg
103          * @throws IOException
104          */
105         @Override
106         public int send ( String partition, String msg ) throws IOException
107         {
108                 return send ( new message ( partition, msg ) );
109         }
110         @Override
111         public int send ( String msg ) throws IOException
112         {
113                 return send ( new message ( "",msg ) );
114         }
115         /**
116          * Send the given message
117          * @param userMsg a message
118          * @throws IOException
119          */
120         @Override
121         public int send ( message userMsg ) throws IOException
122         {
123                 final LinkedList<message> list = new LinkedList<message> ();
124                 list.add ( userMsg );
125                 return send ( list );
126         }
127
128         /**
129          * Send the given set of messages
130          * @param msgs the set of messages, sent in order of iteration
131          * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
132          * @throws IOException
133          */
134         @Override
135         public int send ( Collection<message> msgs ) throws IOException
136         {
137                 if ( msgs.isEmpty() )
138                 {
139                         fSender.queue ( msgs );
140                 }
141                 return fSender.size ();
142         }
143
144         @Override
145         public int getPendingMessageCount ()
146         {
147                 return fSender.size ();
148         }
149
150         /**
151          * Send any pending messages and close this publisher.
152          * @throws IOException
153          * @throws InterruptedException 
154          */
155         @Override
156         public void close ()
157         {
158                 try
159                 {
160                         final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
161                         if ( remains.isEmpty() )
162                         {
163                                 fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
164                                         + "(Consider using the alternate close method to capture unsent messages in this case.)" );
165                         }
166                 }
167                 catch ( InterruptedException e )
168                 {
169                         fLog.warn ( "Possible message loss. " + e.getMessage(), e );
170                         Thread.currentThread().interrupt();
171                 }
172                 catch ( IOException e )
173                 {
174                         fLog.warn ( "Possible message loss. " + e.getMessage(), e );
175                 }
176         }
177
178         public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
179         {
180                 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
181                 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
182                 fExec.shutdown ();
183
184                 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
185                 final long timeoutAtMs = System.currentTimeMillis () + waitInMs;
186                 while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 )
187                 {
188                         fSender.checkSend ( true );
189                         Thread.sleep ( 250 );
190                 }
191
192                 final LinkedList<message> result = new LinkedList<message> ();
193                 fSender.drainTo ( result );
194                 return result;
195         }
196
197         private final ScheduledThreadPoolExecutor fExec;
198         private final Sender fSender;
199         
200         private static class TimestampedMessage extends message
201         {
202                 public TimestampedMessage ( message m )
203                 {
204                         super ( m );
205                         timestamp = System.currentTimeMillis ();
206                 }
207                 public final long timestamp;
208         }
209
210         private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
211
212         private class Sender extends MRBaseClient implements Runnable
213         {
214                 public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
215                 {
216                         super ( baseUrls );
217
218                         fNextBatch = new LinkedList<TimestampedMessage> ();
219                         fSendingBatch = null;
220                         fTopic = topic;
221                         fMaxBatchSize = maxBatch;
222                         fMaxAgeMs = maxAgeMs;
223                         fCompress = compress;
224                         fLock = new ReentrantReadWriteLock ();
225                         fWriteLock = fLock.writeLock ();
226                         fReadLock = fLock.readLock ();
227                         fDontSendUntilMs = 0;
228                 }
229
230                 public void drainTo ( LinkedList<message> list )
231                 {
232                         fWriteLock.lock ();
233                         try
234                         {
235                                 if ( fSendingBatch != null )
236                                 {
237                                         list.addAll ( fSendingBatch );
238                                 }
239                                 list.addAll ( fNextBatch );
240
241                                 fSendingBatch = null;
242                                 fNextBatch.clear ();
243                         }
244                         finally
245                         {
246                                 fWriteLock.unlock ();
247                         }
248                 }
249
250                 /**
251                  * Called periodically by the background executor.
252                  */
253                 @Override
254                 public void run ()
255                 {
256                         try
257                         {
258                                 checkSend ( false );
259                         }
260                         catch ( IOException e )
261                         {
262                                 fLog.warn ( "MR background send: " + e.getMessage () );
263                                 fLog.error( "IOException " + e );
264                         }
265                 }
266
267                 public int size ()
268                 {
269                         fReadLock.lock ();
270                         try
271                         {
272                                 return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
273                         }
274                         finally
275                         {
276                                 fReadLock.unlock ();
277                         }
278                 }
279                 
280                 /**
281                  * Called to queue a message.
282                  * @param m
283                  * @throws IOException 
284                  */
285                 public void queue ( Collection<message> msgs ) throws IOException
286                 {
287                         fWriteLock.lock ();
288                         try
289                         {
290                                 for ( message userMsg : msgs )
291                                 {
292                                         if ( userMsg != null )
293                                         {
294                                                 fNextBatch.add ( new TimestampedMessage ( userMsg ) );
295                                         }
296                                         else
297                                         {
298                                                 fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
299                                         }
300                                 }
301                         }
302                         finally
303                         {
304                                 fWriteLock.unlock();
305                         }
306                         checkSend ( false );
307                 }
308
309                 /**
310                  * Send a batch if the queue is long enough, or the first pending message is old enough.
311                  * @param force
312                  * @throws IOException
313                  */
314                 public void checkSend ( boolean force ) throws IOException
315                 {
316                         // hold a read lock just long enough to evaluate whether a batch
317                         // should be sent
318                         boolean shouldSend = false;
319                         fReadLock.lock ();
320                         try
321                         {
322                                 if ( fNextBatch.isEmpty() )
323                                 {
324                                         final long nowMs = System.currentTimeMillis ();
325                                         shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
326                                         if ( !shouldSend )
327                                         {
328                                                 final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
329                                                 shouldSend = sendAtMs <= nowMs;
330                                         }
331                                         
332                                         // however, unless forced, wait after an error
333                                         shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs ); 
334                                 }
335                                 // else: even in 'force', there's nothing to send, so shouldSend=false is fine
336                         }
337                         finally
338                         {
339                                 fReadLock.unlock ();
340                         }
341
342                         // if a send is required, acquire a write lock, swap out the next batch,
343                         // swap in a fresh batch, and release the lock for the caller to start
344                         // filling a batch again. After releasing the lock, send the current
345                         // batch. (There could be more messages added between read unlock and
346                         // write lock, but that's fine.)
347                         if ( shouldSend )
348                         {
349                                 fSendingBatch = null;
350
351                                 fWriteLock.lock ();
352                                 try
353                                 {
354                                         fSendingBatch = fNextBatch;
355                                         fNextBatch = new LinkedList<TimestampedMessage> ();
356                                 }
357                                 finally
358                                 {
359                                         fWriteLock.unlock ();
360                                 }
361
362                                 if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) )
363                                 {
364                                         fLog.warn ( "Send failed, rebuilding send queue." );
365
366                                         // note the time for back-off
367                                         fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
368
369                                         // the send failed. reconstruct the pending queue
370                                         fWriteLock.lock ();
371                                         try
372                                         {
373                                                 final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
374                                                 fNextBatch = fSendingBatch;
375                                                 fNextBatch.addAll ( nextGroup );
376                                                 fSendingBatch = null;
377                                                 fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
378                                         }
379                                         finally
380                                         {
381                                                 fWriteLock.unlock ();
382                                         }
383                                 }
384                                 else
385                                 {
386                                         fWriteLock.lock ();
387                                         try
388                                         {
389                                                 fSendingBatch = null;
390                                         }
391                                         finally
392                                         {
393                                                 fWriteLock.unlock ();
394                                         }
395                                 }
396                         }
397                 }
398
399                 private LinkedList<TimestampedMessage> fNextBatch;
400                 private LinkedList<TimestampedMessage> fSendingBatch;
401                 private final String fTopic;
402                 private final int fMaxBatchSize;
403                 private final long fMaxAgeMs;
404                 private final boolean fCompress;
405                 private final ReentrantReadWriteLock fLock;
406                 private final WriteLock fWriteLock;
407                 private final ReadLock fReadLock;
408                 private long fDontSendUntilMs;
409                 private static final long sfWaitAfterError = 1000;
410         }
411
412         // this is static so that it's clearly not using any mutable member data outside of a lock
413         private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
414         {
415                 // it's possible for this call to be made with an empty list. in this case, just return.
416                 if ( toSend.isEmpty()  )
417                 {
418                         return true;
419                 }
420
421                 final long nowMs = System.currentTimeMillis ();
422                 final String url = MRConstants.makeUrl ( topic );
423
424                 log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms"  );
425
426                 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
427                 try
428                 {
429                         OutputStream os = baseStream;
430                         if ( compress )
431                         {
432                                 os = new GZIPOutputStream ( baseStream );
433                         }
434                         for ( TimestampedMessage m : toSend )
435                         {
436                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
437                                 os.write ( '.' );
438                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
439                                 os.write ( '.' );
440                                 os.write ( m.fPartition.getBytes() );
441                                 os.write ( m.fMsg.getBytes() );
442                                 os.write ( '\n' );
443                         }
444                         os.close ();
445                 }
446                 catch ( IOException e )
447                 {
448                         log.warn ( "Problem writing stream to post: " + e.getMessage (),e );
449                         return false;
450                 }
451
452                 boolean result = false;
453                 final long startMs = System.currentTimeMillis ();
454                 try
455                 {
456                         client.post ( url, compress ?
457                                 MRFormat.CAMBRIA_ZIP.toString () :
458                                 MRFormat.CAMBRIA.toString (),
459                                 baseStream.toByteArray(), false );
460                         result = true;
461                 }
462                 catch ( HttpException e )
463                 {
464                         log.warn ( "Problem posting to MR: " + e.getMessage(),e );
465                 }
466                 catch ( IOException e )
467                 {
468                         log.warn ( "Problem posting to MR: " + e.getMessage(),e );
469                 }
470
471                 log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): OK" );
472                 return result;
473         }
474
475         @Override
476         public void logTo ( Logger log )
477         {
478                 fLog = log;
479         }
480
481         @Override
482         public MRPublisherResponse sendBatchWithResponse() {
483                 // TODO Auto-generated method stub
484                 return null;
485         }
486         
487 }