Revert package name changes
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / metrics / publisher / impl / DMaaPCambriaSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.metrics.publisher.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.zip.GZIPOutputStream;
35
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.Entity;
39 import javax.ws.rs.client.WebTarget;
40 import javax.ws.rs.core.Response;
41
42 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.metrics.publisher.CambriaPublisherUtility;
45
46 /**
47  * 
48  * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
49  * in batch
50  * 
51  * @author author
52  *
53  */
54 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
55                 implements com.att.nsa.cambria.metrics.publisher.CambriaBatchingPublisher {
56         /**
57          * 
58          * static inner class initializes with urls, topic,batchSize
59          * 
60          * @author author
61          *
62          */
63         public static class Builder {
64                 public Builder() {
65                 }
66
67                 /**
68                  * constructor initialize with url
69                  * 
70                  * @param baseUrls
71                  * @return
72                  * 
73                  */
74                 public Builder againstUrls(Collection<String> baseUrls) {
75                         fUrls = baseUrls;
76                         return this;
77                 }
78
79                 /**
80                  * constructor initializes with topics
81                  * 
82                  * @param topic
83                  * @return
84                  * 
85                  */
86                 public Builder onTopic(String topic) {
87                         fTopic = topic;
88                         return this;
89                 }
90
91                 /**
92                  * constructor initilazes with batch size and batch time
93                  * 
94                  * @param maxBatchSize
95                  * @param maxBatchAgeMs
96                  * @return
97                  * 
98                  */
99                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
100                         fMaxBatchSize = maxBatchSize;
101                         fMaxBatchAgeMs = maxBatchAgeMs;
102                         return this;
103                 }
104
105                 /**
106                  * constructor initializes with compress
107                  * 
108                  * @param compress
109                  * @return
110                  */
111                 public Builder compress(boolean compress) {
112                         fCompress = compress;
113                         return this;
114                 }
115
116                 /**
117                  * method returns DMaaPCambriaSimplerBatchPublisher object
118                  * 
119                  * @return
120                  */
121                 public DMaaPCambriaSimplerBatchPublisher build() {
122                         try {
123                                 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
124                         } catch (MalformedURLException e) {
125                                 throw new RuntimeException(e);
126                         }
127                 }
128
129                 private Collection<String> fUrls;
130                 private String fTopic;
131                 private int fMaxBatchSize = 100;
132                 private long fMaxBatchAgeMs = 1000;
133                 private boolean fCompress = false;
134         };
135
136         /**
137          * 
138          * @param partition
139          * @param msg
140          */
141         @Override
142         public int send(String partition, String msg) {
143                 return send(new message(partition, msg));
144         }
145
146         /**
147          * @param msg
148          */
149         @Override
150         public int send(message msg) {
151                 final LinkedList<message> list = new LinkedList<message>();
152                 list.add(msg);
153                 return send(list);
154         }
155
156         /**
157          * @param msgs
158          */
159         @Override
160         public synchronized int send(Collection<message> msgs) {
161                 if (fClosed) {
162                         throw new IllegalStateException("The publisher was closed.");
163                 }
164
165                 for (message userMsg : msgs) {
166                         fPending.add(new TimestampedMessage(userMsg));
167                 }
168                 return getPendingMessageCount();
169         }
170
171         /**
172          * getPending message count
173          */
174         @Override
175         public synchronized int getPendingMessageCount() {
176                 return fPending.size();
177         }
178
179         /**
180          * 
181          * @exception InterruptedException
182          * @exception IOException
183          */
184         @Override
185         public void close() {
186                 try {
187                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
188                         if (remains.size() > 0) {
189                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
190                                                 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
191                         }
192                 } catch (InterruptedException e) {
193                         getLog().warn("Possible message loss. " + e.getMessage(), e);
194                 } catch (IOException e) {
195                         getLog().warn("Possible message loss. " + e.getMessage(), e);
196                 }
197         }
198
199         /**
200          * @param time
201          * @param unit
202          */
203         @Override
204         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
205                 synchronized (this) {
206                         fClosed = true;
207
208                         // stop the background sender
209                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
210                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
211                         fExec.shutdown();
212                 }
213
214                 final long now = Clock.now();
215                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
216                 final long timeoutAtMs = now + waitInMs;
217
218                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
219                         send(true);
220                         Thread.sleep(250);
221                 }
222                 // synchronizing the current object
223                 synchronized (this) {
224                         final LinkedList<message> result = new LinkedList<message>();
225                         fPending.drainTo(result);
226                         return result;
227                 }
228         }
229
230         /**
231          * Possibly send a batch to the cambria server. This is called by the
232          * background thread and the close() method
233          * 
234          * @param force
235          */
236         private synchronized void send(boolean force) {
237                 if (force || shouldSendNow()) {
238                         if (!sendBatch()) {
239                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
240
241                                 // note the time for back-off
242                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
243                         }
244                 }
245         }
246
247         /**
248          * 
249          * @return
250          */
251         private synchronized boolean shouldSendNow() {
252                 boolean shouldSend = false;
253                 if (fPending.size() > 0) {
254                         final long nowMs = Clock.now();
255
256                         shouldSend = (fPending.size() >= fMaxBatchSize);
257                         if (!shouldSend) {
258                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
259                                 shouldSend = sendAtMs <= nowMs;
260                         }
261
262                         // however, wait after an error
263                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
264                 }
265                 return shouldSend;
266         }
267
268         /**
269          * 
270          * @return
271          */
272         private synchronized boolean sendBatch() {
273                 // it's possible for this call to be made with an empty list. in this
274                 // case, just return.
275                 if (fPending.size() < 1) {
276                         return true;
277                 }
278
279                 final long nowMs = Clock.now();
280                 final String url = CambriaPublisherUtility.makeUrl(fTopic);
281
282                 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
283                                 + (nowMs - fPending.peek().timestamp) + " ms");
284
285                 try {
286
287                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
288                         OutputStream os = baseStream;
289                         if (fCompress) {
290                                 os = new GZIPOutputStream(baseStream);
291                         }
292                         for (TimestampedMessage m : fPending) {
293                                 os.write(("" + m.fPartition.length()).getBytes());
294                                 os.write('.');
295                                 os.write(("" + m.fMsg.length()).getBytes());
296                                 os.write('.');
297                                 os.write(m.fPartition.getBytes());
298                                 os.write(m.fMsg.getBytes());
299                                 os.write('\n');
300                         }
301                         os.close();
302
303                         final long startMs = Clock.now();
304
305                         // code from REST Client Starts
306
307                         // final String serverCalculatedSignature = sha1HmacSigner.sign
308                         // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
309
310                         Client client = ClientBuilder.newClient();
311                         String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
312                          if (null==metricTopicname) {
313                                  
314                          metricTopicname="msgrtr.apinode.metrics.dmaap";
315                          }
316                         WebTarget target = client
317                                         .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
318                         target = target.path("/events/" + fTopic);
319                         getLog().info("url : " + target.getUri().toString());
320                         // API Key
321
322                         Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
323
324                         Response response = target.request().post(data);
325                         // header("X-CambriaAuth",
326                         // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
327                         // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
328                         // post(Entity.json(baseStream.toByteArray()));
329
330                         getLog().info("Response received :: " + response.getStatus());
331                         getLog().info("Response received :: " + response.toString());
332
333                         // code from REST Client Ends
334
335                         /*
336                          * final JSONObject result = post ( url, contentType,
337                          * baseStream.toByteArray(), true ); final String logLine =
338                          * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
339                          * result.toString (); getLog().info ( logLine );
340                          */
341                         fPending.clear();
342                         return true;
343                 } catch (IllegalArgumentException x) {
344                         getLog().warn(x.getMessage(), x);
345                 }
346                 /*
347                  * catch ( HttpObjectNotFoundException x ) { getLog().warn (
348                  * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
349                  * x.getMessage(), x ); }
350                  */
351                 catch (IOException x) {
352                         getLog().warn(x.getMessage(), x);
353                 }
354                 return false;
355         }
356
357         private final String fTopic;
358         private final int fMaxBatchSize;
359         private final long fMaxBatchAgeMs;
360         private final boolean fCompress;
361         private boolean fClosed;
362
363         private final LinkedBlockingQueue<TimestampedMessage> fPending;
364         private long fDontSendUntilMs;
365         private final ScheduledThreadPoolExecutor fExec;
366
367         private static final long sfWaitAfterError = 1000;
368
369         /**
370          * 
371          * @param hosts
372          * @param topic
373          * @param maxBatchSize
374          * @param maxBatchAgeMs
375          * @param compress
376          * @throws MalformedURLException 
377          */
378         private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
379                         long maxBatchAgeMs, boolean compress) throws MalformedURLException {
380
381                 super(hosts);
382
383                 if (topic == null || topic.length() < 1) {
384                         throw new IllegalArgumentException("A topic must be provided.");
385                 }
386
387                 fClosed = false;
388                 fTopic = topic;
389                 fMaxBatchSize = maxBatchSize;
390                 fMaxBatchAgeMs = maxBatchAgeMs;
391                 fCompress = compress;
392
393                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
394                 fDontSendUntilMs = 0;
395
396                 fExec = new ScheduledThreadPoolExecutor(1);
397                 fExec.scheduleAtFixedRate(new Runnable() {
398                         @Override
399                         public void run() {
400                                 send(false);
401                         }
402                 }, 100, 50, TimeUnit.MILLISECONDS);
403         }
404
405         /**
406          * 
407          * 
408          * @author author
409          *
410          */
411         private static class TimestampedMessage extends message {
412                 /**
413                  * to store timestamp value
414                  */
415                 public final long timestamp;
416
417                 /**
418                  * constructor initialize with message
419                  * 
420                  * @param m
421                  * 
422                  */
423                 public TimestampedMessage(message m) {
424                         super(m);
425                         timestamp = Clock.now();
426                 }
427         }
428
429 }