bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / metrics / publisher / impl / DMaaPCambriaSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.nio.channels.NotYetConnectedException;
29 import java.util.Collection;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.zip.GZIPOutputStream;
36
37 import javax.ws.rs.client.Client;
38 import javax.ws.rs.client.ClientBuilder;
39 import javax.ws.rs.client.Entity;
40 import javax.ws.rs.client.WebTarget;
41 import javax.ws.rs.core.Response;
42
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import com.att.dmf.mr.constants.CambriaConstants;
45 import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
46
47 /**
48  * 
49  * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
50  * in batch
51  * 
52  * @author anowarul.islam
53  *
54  */
55 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
56                 implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
57         /**
58          * 
59          * static inner class initializes with urls, topic,batchSize
60          * 
61          * @author anowarul.islam
62          *
63          */
64         public static class Builder {
65                 public Builder() {
66                 }
67
68                 /**
69                  * constructor initialize with url
70                  * 
71                  * @param baseUrls
72                  * @return
73                  * 
74                  */
75                 public Builder againstUrls(Collection<String> baseUrls) {
76                         fUrls = baseUrls;
77                         return this;
78                 }
79
80                 /**
81                  * constructor initializes with topics
82                  * 
83                  * @param topic
84                  * @return
85                  * 
86                  */
87                 public Builder onTopic(String topic) {
88                         fTopic = topic;
89                         return this;
90                 }
91
92                 /**
93                  * constructor initilazes with batch size and batch time
94                  * 
95                  * @param maxBatchSize
96                  * @param maxBatchAgeMs
97                  * @return
98                  * 
99                  */
100                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
101                         fMaxBatchSize = maxBatchSize;
102                         fMaxBatchAgeMs = maxBatchAgeMs;
103                         return this;
104                 }
105
106                 /**
107                  * constructor initializes with compress
108                  * 
109                  * @param compress
110                  * @return
111                  */
112                 public Builder compress(boolean compress) {
113                         fCompress = compress;
114                         return this;
115                 }
116
117                 /**
118                  * method returns DMaaPCambriaSimplerBatchPublisher object
119                  * 
120                  * @return
121                  */
122                 public DMaaPCambriaSimplerBatchPublisher build()  {
123                         
124                         try {
125                         return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
126                 } catch (MalformedURLException e) {
127                         
128                         NotYetConnectedException exception=new NotYetConnectedException();
129                         exception.setStackTrace(e.getStackTrace());
130                         
131                         throw exception ;
132                 
133                 }
134                 }
135
136                 private Collection<String> fUrls;
137                 private String fTopic;
138                 private int fMaxBatchSize = 100;
139                 private long fMaxBatchAgeMs = 1000;
140                 private boolean fCompress = false;
141         };
142
143         /**
144          * 
145          * @param partition
146          * @param msg
147          */
148         @Override
149         public int send(String partition, String msg) {
150                 return send(new message(partition, msg));
151         }
152
153         /**
154          * @param msg
155          */
156         @Override
157         public int send(message msg) {
158                 final LinkedList<message> list = new LinkedList<message>();
159                 list.add(msg);
160                 return send(list);
161         }
162
163         /**
164          * @param msgs
165          */
166         @Override
167         public synchronized int send(Collection<message> msgs) {
168                 if (fClosed) {
169                         throw new IllegalStateException("The publisher was closed.");
170                 }
171
172                 for (message userMsg : msgs) {
173                         fPending.add(new TimestampedMessage(userMsg));
174                 }
175                 return getPendingMessageCount();
176         }
177
178         /**
179          * getPending message count
180          */
181         @Override
182         public synchronized int getPendingMessageCount() {
183                 return fPending.size();
184         }
185
186         /**
187          * 
188          * @exception InterruptedException
189          * @exception IOException
190          */
191         @Override
192         public void close() {
193                 try {
194                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
195                         if (remains.isEmpty()) {
196                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
197                                                 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
198                         }
199                 } catch (InterruptedException e) {
200                         getLog().warn("Possible message loss. " + e.getMessage(), e);
201                 } catch (IOException e) {
202                         getLog().warn("Possible message loss. " + e.getMessage(), e);
203                 }
204         }
205
206         /**
207          * @param time
208          * @param unit
209          */
210         @Override
211         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
212                 synchronized (this) {
213                         fClosed = true;
214
215                         // stop the background sender
216                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
217                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
218                         fExec.shutdown();
219                 }
220
221                 final long now = Clock.now();
222                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
223                 final long timeoutAtMs = now + waitInMs;
224
225                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
226                         send(true);
227                         Thread.sleep(250);
228                 }
229                 // synchronizing the current object
230                 synchronized (this) {
231                         final LinkedList<message> result = new LinkedList<message>();
232                         fPending.drainTo(result);
233                         return result;
234                 }
235         }
236
237         /**
238          * Possibly send a batch to the cambria server. This is called by the
239          * background thread and the close() method
240          * 
241          * @param force
242          */
243         private synchronized void send(boolean force) {
244                 if (force || shouldSendNow()) {
245                         if (!sendBatch()) {
246                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
247
248                                 // note the time for back-off
249                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
250                         }
251                 }
252         }
253
254         /**
255          * 
256          * @return
257          */
258         private synchronized boolean shouldSendNow() {
259                 boolean shouldSend = false;
260                 if (fPending.isEmpty()) {
261                         final long nowMs = Clock.now();
262
263                         shouldSend = (fPending.size() >= fMaxBatchSize);
264                         if (!shouldSend) {
265                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
266                                 shouldSend = sendAtMs <= nowMs;
267                         }
268
269                         // however, wait after an error
270                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
271                 }
272                 return shouldSend;
273         }
274
275         /**
276          * 
277          * @return
278          */
279         private synchronized boolean sendBatch() {
280                 // it's possible for this call to be made with an empty list. in this
281                 // case, just return.
282                 if (fPending.isEmpty()) {
283                         return true;
284                 }
285
286                 final long nowMs = Clock.now();
287                 final String url = CambriaPublisherUtility.makeUrl(fTopic);
288
289                 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
290                                 + (nowMs - fPending.peek().timestamp) + " ms");
291
292                 try {
293
294                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
295                         OutputStream os = baseStream;
296                         if (fCompress) {
297                                 os = new GZIPOutputStream(baseStream);
298                         }
299                         for (TimestampedMessage m : fPending) {
300                                 os.write(("" + m.fPartition.length()).getBytes());
301                                 os.write('.');
302                                 os.write(("" + m.fMsg.length()).getBytes());
303                                 os.write('.');
304                                 os.write(m.fPartition.getBytes());
305                                 os.write(m.fMsg.getBytes());
306                                 os.write('\n');
307                         }
308                         os.close();
309
310                         final long startMs = Clock.now();
311
312                         // code from REST Client Starts
313
314                         
315                         
316
317                         Client client = ClientBuilder.newClient();
318                         String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
319                          if (null==metricTopicname) {
320                                  
321                          metricTopicname="msgrtr.apinode.metrics.dmaap";
322                          }
323                         WebTarget target = client
324                                         .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
325                         target = target.path("/events/" + fTopic);
326                         getLog().info("url : " + target.getUri().toString());
327                         // API Key
328
329                         Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
330
331                         Response response = target.request().post(data);
332                         
333                         getLog().info("Response received :: " + response.getStatus());
334                         getLog().info("Response received :: " + response.toString());
335
336                         // code from REST Client Ends
337
338                         
339                         fPending.clear();
340                         return true;
341                 } catch (IllegalArgumentException x) {
342                         getLog().warn(x.getMessage(), x);
343                 }
344                 
345                 catch (IOException x) {
346                         getLog().warn(x.getMessage(), x);
347                 }
348                 return false;
349         }
350
351         private final String fTopic;
352         private final int fMaxBatchSize;
353         private final long fMaxBatchAgeMs;
354         private final boolean fCompress;
355         private boolean fClosed;
356
357         private final LinkedBlockingQueue<TimestampedMessage> fPending;
358         private long fDontSendUntilMs;
359         private final ScheduledThreadPoolExecutor fExec;
360
361         private static final long sfWaitAfterError = 1000;
362
363         /**
364          * 
365          * @param hosts
366          * @param topic
367          * @param maxBatchSize
368          * @param maxBatchAgeMs
369          * @param compress
370          */
371         private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
372                         long maxBatchAgeMs, boolean compress) throws MalformedURLException {
373
374                 super(hosts);
375
376                 if (topic == null || topic.length() < 1) {
377                         throw new IllegalArgumentException("A topic must be provided.");
378                 }
379
380                 fClosed = false;
381                 fTopic = topic;
382                 fMaxBatchSize = maxBatchSize;
383                 fMaxBatchAgeMs = maxBatchAgeMs;
384                 fCompress = compress;
385
386                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
387                 fDontSendUntilMs = 0;
388
389                 fExec = new ScheduledThreadPoolExecutor(1);
390                 fExec.scheduleAtFixedRate(new Runnable() {
391                         @Override
392                         public void run() {
393                                 send(false);
394                         }
395                 }, 100, 50, TimeUnit.MILLISECONDS);
396         }
397
398         /**
399          * 
400          * 
401          * @author anowarul.islam
402          *
403          */
404         private static class TimestampedMessage extends message {
405                 /**
406                  * to store timestamp value
407                  */
408                 public final long timestamp;
409
410                 /**
411                  * constructor initialize with message
412                  * 
413                  * @param m
414                  * 
415                  */
416                 public TimestampedMessage(message m) {
417                         super(m);
418                         timestamp = Clock.now();
419                 }
420         }
421
422 }