revert few sonar fixes
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / metrics / publisher / impl / DMaaPCambriaSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.nio.channels.NotYetConnectedException;
29 import java.util.Collection;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.zip.GZIPOutputStream;
36
37 import javax.ws.rs.client.Client;
38 import javax.ws.rs.client.ClientBuilder;
39 import javax.ws.rs.client.Entity;
40 import javax.ws.rs.client.WebTarget;
41 import javax.ws.rs.core.Response;
42
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.metrics.publisher.CambriaPublisherUtility;
46
47 /**
48  * 
49  * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
50  * in batch
51  * 
52  * @author anowarul.islam
53  *
54  */
55 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
56                 implements org.onap.dmaap.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
57         /**
58          * 
59          * static inner class initializes with urls, topic,batchSize
60          * 
61          * @author anowarul.islam
62          *
63          */
64         public static class Builder {
65                 public Builder() {
66                 }
67
68                 /**
69                  * constructor initialize with url
70                  * 
71                  * @param baseUrls
72                  * @return
73                  * 
74                  */
75                 public Builder againstUrls(Collection<String> baseUrls) {
76                         fUrls = baseUrls;
77                         return this;
78                 }
79
80                 /**
81                  * constructor initializes with topics
82                  * 
83                  * @param topic
84                  * @return
85                  * 
86                  */
87                 public Builder onTopic(String topic) {
88                         fTopic = topic;
89                         return this;
90                 }
91
92                 /**
93                  * constructor initilazes with batch size and batch time
94                  * 
95                  * @param maxBatchSize
96                  * @param maxBatchAgeMs
97                  * @return
98                  * 
99                  */
100                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
101                         fMaxBatchSize = maxBatchSize;
102                         fMaxBatchAgeMs = maxBatchAgeMs;
103                         return this;
104                 }
105
106                 /**
107                  * constructor initializes with compress
108                  * 
109                  * @param compress
110                  * @return
111                  */
112                 public Builder compress(boolean compress) {
113                         fCompress = compress;
114                         return this;
115                 }
116
117                 /**
118                  * method returns DMaaPCambriaSimplerBatchPublisher object
119                  * 
120                  * @return
121                  */
122                 public DMaaPCambriaSimplerBatchPublisher build()  {
123                         
124                         try {
125                         return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
126                 } catch (MalformedURLException e) {
127                         
128                         NotYetConnectedException exception=new NotYetConnectedException();
129                         exception.setStackTrace(e.getStackTrace());
130                         
131                         throw exception ;
132                 
133                 }
134                 }
135
136                 private Collection<String> fUrls;
137                 private String fTopic;
138                 private int fMaxBatchSize = 100;
139                 private long fMaxBatchAgeMs = 1000;
140                 private boolean fCompress = false;
141         };
142
143         /**
144          * 
145          * @param partition
146          * @param msg
147          */
148         @Override
149         public int send(String partition, String msg) {
150                 return send(new message(partition, msg));
151         }
152
153         /**
154          * @param msg
155          */
156         @Override
157         public int send(message msg) {
158                 final LinkedList<message> list = new LinkedList<message>();
159                 list.add(msg);
160                 return send(list);
161         }
162
163         /**
164          * @param msgs
165          */
166         @Override
167         public synchronized int send(Collection<message> msgs) {
168                 if (fClosed) {
169                         throw new IllegalStateException("The publisher was closed.");
170                 }
171
172                 for (message userMsg : msgs) {
173                         fPending.add(new TimestampedMessage(userMsg));
174                 }
175                 return getPendingMessageCount();
176         }
177
178         /**
179          * getPending message count
180          */
181         @Override
182         public synchronized int getPendingMessageCount() {
183                 return fPending.size();
184         }
185
186         /**
187          * 
188          * @exception InterruptedException
189          * @exception IOException
190          */
191         @Override
192         public void close() {
193                 try {
194                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
195                         if (remains.isEmpty()) {
196                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
197                                                 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
198                         }
199                 } catch (InterruptedException e) {
200                         getLog().warn("Possible message loss. " + e.getMessage(), e);
201                         //Thread.currentThread().interrupt();
202                 } catch (IOException e) {
203                         getLog().warn("Possible message loss. " + e.getMessage(), e);
204                 }
205         }
206
207         /**
208          * @param time
209          * @param unit
210          */
211         @Override
212         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
213                 synchronized (this) {
214                         fClosed = true;
215
216                         // stop the background sender
217                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
218                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
219                         fExec.shutdown();
220                 }
221
222                 final long now = Clock.now();
223                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
224                 final long timeoutAtMs = now + waitInMs;
225
226                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
227                         send(true);
228                         Thread.sleep(250);
229                 }
230                 // synchronizing the current object
231                 synchronized (this) {
232                         final LinkedList<message> result = new LinkedList<message>();
233                         fPending.drainTo(result);
234                         return result;
235                 }
236         }
237
238         /**
239          * Possibly send a batch to the cambria server. This is called by the
240          * background thread and the close() method
241          * 
242          * @param force
243          */
244         private synchronized void send(boolean force) {
245                 if (force || shouldSendNow()) {
246                         if (!sendBatch()) {
247                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
248
249                                 // note the time for back-off
250                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
251                         }
252                 }
253         }
254
255         /**
256          * 
257          * @return
258          */
259         private synchronized boolean shouldSendNow() {
260                 boolean shouldSend = false;
261                 if (fPending.isEmpty()) {
262                         final long nowMs = Clock.now();
263
264                         shouldSend = (fPending.size() >= fMaxBatchSize);
265                         if (!shouldSend) {
266                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
267                                 shouldSend = sendAtMs <= nowMs;
268                         }
269
270                         // however, wait after an error
271                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
272                 }
273                 return shouldSend;
274         }
275
276         /**
277          * 
278          * @return
279          */
280         private synchronized boolean sendBatch() {
281                 // it's possible for this call to be made with an empty list. in this
282                 // case, just return.
283                 if (fPending.isEmpty()) {
284                         return true;
285                 }
286
287                 final long nowMs = Clock.now();
288                 final String url = CambriaPublisherUtility.makeUrl(fTopic);
289
290                 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
291                                 + (nowMs - fPending.peek().timestamp) + " ms");
292
293                 try {
294
295                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
296                         OutputStream os = baseStream;
297                         if (fCompress) {
298                                 os = new GZIPOutputStream(baseStream);
299                         }
300                         for (TimestampedMessage m : fPending) {
301                                 os.write(("" + m.fPartition.length()).getBytes());
302                                 os.write('.');
303                                 os.write(("" + m.fMsg.length()).getBytes());
304                                 os.write('.');
305                                 os.write(m.fPartition.getBytes());
306                                 os.write(m.fMsg.getBytes());
307                                 os.write('\n');
308                         }
309                         os.close();
310
311                         final long startMs = Clock.now();
312
313                         // code from REST Client Starts
314
315                         
316                         
317
318                         Client client = ClientBuilder.newClient();
319                         String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
320                          if (null==metricTopicname) {
321                                  
322                          metricTopicname="msgrtr.apinode.metrics.dmaap";
323                          }
324                         WebTarget target = client
325                                         .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
326                         target = target.path("/events/" + fTopic);
327                         getLog().info("url : " + target.getUri().toString());
328                         // API Key
329
330                         Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
331
332                         Response response = target.request().post(data);
333                         
334                         getLog().info("Response received :: " + response.getStatus());
335                         getLog().info("Response received :: " + response.toString());
336
337                         // code from REST Client Ends
338
339                         
340                         fPending.clear();
341                         return true;
342                 } catch (IllegalArgumentException x) {
343                         getLog().warn(x.getMessage(), x);
344                 }
345                 
346                 catch (IOException x) {
347                         getLog().warn(x.getMessage(), x);
348                 }
349                 return false;
350         }
351
352         private final String fTopic;
353         private final int fMaxBatchSize;
354         private final long fMaxBatchAgeMs;
355         private final boolean fCompress;
356         private boolean fClosed;
357
358         private final LinkedBlockingQueue<TimestampedMessage> fPending;
359         private long fDontSendUntilMs;
360         private final ScheduledThreadPoolExecutor fExec;
361
362         private static final long sfWaitAfterError = 1000;
363
364         /**
365          * 
366          * @param hosts
367          * @param topic
368          * @param maxBatchSize
369          * @param maxBatchAgeMs
370          * @param compress
371          */
372         private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
373                         long maxBatchAgeMs, boolean compress) throws MalformedURLException {
374
375                 super(hosts);
376
377                 if (topic == null || topic.length() < 1) {
378                         throw new IllegalArgumentException("A topic must be provided.");
379                 }
380
381                 fClosed = false;
382                 fTopic = topic;
383                 fMaxBatchSize = maxBatchSize;
384                 fMaxBatchAgeMs = maxBatchAgeMs;
385                 fCompress = compress;
386
387                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
388                 fDontSendUntilMs = 0;
389
390                 fExec = new ScheduledThreadPoolExecutor(1);
391                 fExec.scheduleAtFixedRate(new Runnable() {
392                         @Override
393                         public void run() {
394                                 send(false);
395                         }
396                 }, 100, 50, TimeUnit.MILLISECONDS);
397         }
398
399         /**
400          * 
401          * 
402          * @author anowarul.islam
403          *
404          */
405         private static class TimestampedMessage extends message {
406                 /**
407                  * to store timestamp value
408                  */
409                 public final long timestamp;
410
411                 /**
412                  * constructor initialize with message
413                  * 
414                  * @param m
415                  * 
416                  */
417                 public TimestampedMessage(message m) {
418                         super(m);
419                         timestamp = Clock.now();
420                 }
421         }
422
423 }