2b9bad49d7bd6c3ca6231e94f7498b1a4601e704
[dmaap/messagerouter/msgrtr.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metrics.publisher.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.zip.GZIPOutputStream;
35
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.Entity;
39 import javax.ws.rs.client.WebTarget;
40 import javax.ws.rs.core.Response;
41
42 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
43 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metrics.publisher.CambriaPublisherUtility;
44
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
46
47 /**
48  * 
49  * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
50  * in batch
51  * 
52  * @author author
53  *
54  */
55 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
56                 implements org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metrics.publisher.CambriaBatchingPublisher {
57         /**
58          * 
59          * static inner class initializes with urls, topic,batchSize
60          * 
61          * @author author
62          *
63          */
64         public static class Builder {
65                 public Builder() {
66                 }
67
68                 /**
69                  * constructor initialize with url
70                  * 
71                  * @param baseUrls
72                  * @return
73                  * 
74                  */
75                 public Builder againstUrls(Collection<String> baseUrls) {
76                         fUrls = baseUrls;
77                         return this;
78                 }
79
80                 /**
81                  * constructor initializes with topics
82                  * 
83                  * @param topic
84                  * @return
85                  * 
86                  */
87                 public Builder onTopic(String topic) {
88                         fTopic = topic;
89                         return this;
90                 }
91
92                 /**
93                  * constructor initilazes with batch size and batch time
94                  * 
95                  * @param maxBatchSize
96                  * @param maxBatchAgeMs
97                  * @return
98                  * 
99                  */
100                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
101                         fMaxBatchSize = maxBatchSize;
102                         fMaxBatchAgeMs = maxBatchAgeMs;
103                         return this;
104                 }
105
106                 /**
107                  * constructor initializes with compress
108                  * 
109                  * @param compress
110                  * @return
111                  */
112                 public Builder compress(boolean compress) {
113                         fCompress = compress;
114                         return this;
115                 }
116
117                 /**
118                  * method returns DMaaPCambriaSimplerBatchPublisher object
119                  * 
120                  * @return
121                  */
122                 public DMaaPCambriaSimplerBatchPublisher build() {
123                         try {
124                                 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
125                         } catch (MalformedURLException e) {
126                                 throw new RuntimeException(e);
127                         }
128                 }
129
130                 private Collection<String> fUrls;
131                 private String fTopic;
132                 private int fMaxBatchSize = 100;
133                 private long fMaxBatchAgeMs = 1000;
134                 private boolean fCompress = false;
135         };
136
137         /**
138          * 
139          * @param partition
140          * @param msg
141          */
142         @Override
143         public int send(String partition, String msg) {
144                 return send(new message(partition, msg));
145         }
146
147         /**
148          * @param msg
149          */
150         @Override
151         public int send(message msg) {
152                 final LinkedList<message> list = new LinkedList<message>();
153                 list.add(msg);
154                 return send(list);
155         }
156
157         /**
158          * @param msgs
159          */
160         @Override
161         public synchronized int send(Collection<message> msgs) {
162                 if (fClosed) {
163                         throw new IllegalStateException("The publisher was closed.");
164                 }
165
166                 for (message userMsg : msgs) {
167                         fPending.add(new TimestampedMessage(userMsg));
168                 }
169                 return getPendingMessageCount();
170         }
171
172         /**
173          * getPending message count
174          */
175         @Override
176         public synchronized int getPendingMessageCount() {
177                 return fPending.size();
178         }
179
180         /**
181          * 
182          * @exception InterruptedException
183          * @exception IOException
184          */
185         @Override
186         public void close() {
187                 try {
188                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
189                         if (remains.size() > 0) {
190                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
191                                                 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
192                         }
193                 } catch (InterruptedException e) {
194                         getLog().warn("Possible message loss. " + e.getMessage(), e);
195                 } catch (IOException e) {
196                         getLog().warn("Possible message loss. " + e.getMessage(), e);
197                 }
198         }
199
200         /**
201          * @param time
202          * @param unit
203          */
204         @Override
205         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
206                 synchronized (this) {
207                         fClosed = true;
208
209                         // stop the background sender
210                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
211                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
212                         fExec.shutdown();
213                 }
214
215                 final long now = Clock.now();
216                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
217                 final long timeoutAtMs = now + waitInMs;
218
219                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
220                         send(true);
221                         Thread.sleep(250);
222                 }
223                 // synchronizing the current object
224                 synchronized (this) {
225                         final LinkedList<message> result = new LinkedList<message>();
226                         fPending.drainTo(result);
227                         return result;
228                 }
229         }
230
231         /**
232          * Possibly send a batch to the cambria server. This is called by the
233          * background thread and the close() method
234          * 
235          * @param force
236          */
237         private synchronized void send(boolean force) {
238                 if (force || shouldSendNow()) {
239                         if (!sendBatch()) {
240                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
241
242                                 // note the time for back-off
243                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
244                         }
245                 }
246         }
247
248         /**
249          * 
250          * @return
251          */
252         private synchronized boolean shouldSendNow() {
253                 boolean shouldSend = false;
254                 if (fPending.size() > 0) {
255                         final long nowMs = Clock.now();
256
257                         shouldSend = (fPending.size() >= fMaxBatchSize);
258                         if (!shouldSend) {
259                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
260                                 shouldSend = sendAtMs <= nowMs;
261                         }
262
263                         // however, wait after an error
264                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
265                 }
266                 return shouldSend;
267         }
268
269         /**
270          * 
271          * @return
272          */
273         private synchronized boolean sendBatch() {
274                 // it's possible for this call to be made with an empty list. in this
275                 // case, just return.
276                 if (fPending.size() < 1) {
277                         return true;
278                 }
279
280                 final long nowMs = Clock.now();
281                 final String url = CambriaPublisherUtility.makeUrl(fTopic);
282
283                 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
284                                 + (nowMs - fPending.peek().timestamp) + " ms");
285
286                 try {
287
288                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
289                         OutputStream os = baseStream;
290                         if (fCompress) {
291                                 os = new GZIPOutputStream(baseStream);
292                         }
293                         for (TimestampedMessage m : fPending) {
294                                 os.write(("" + m.fPartition.length()).getBytes());
295                                 os.write('.');
296                                 os.write(("" + m.fMsg.length()).getBytes());
297                                 os.write('.');
298                                 os.write(m.fPartition.getBytes());
299                                 os.write(m.fMsg.getBytes());
300                                 os.write('\n');
301                         }
302                         os.close();
303
304                         final long startMs = Clock.now();
305
306                         // code from REST Client Starts
307
308                         // final String serverCalculatedSignature = sha1HmacSigner.sign
309                         // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
310
311                         Client client = ClientBuilder.newClient();
312                         String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
313                          if (null==metricTopicname) {
314                                  
315                          metricTopicname="msgrtr.apinode.metrics.dmaap";
316                          }
317                         WebTarget target = client
318                                         .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
319                         target = target.path("/events/" + fTopic);
320                         getLog().info("url : " + target.getUri().toString());
321                         // API Key
322
323                         Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
324
325                         Response response = target.request().post(data);
326                         // header("X-CambriaAuth",
327                         // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
328                         // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
329                         // post(Entity.json(baseStream.toByteArray()));
330
331                         getLog().info("Response received :: " + response.getStatus());
332                         getLog().info("Response received :: " + response.toString());
333
334                         // code from REST Client Ends
335
336                         /*
337                          * final JSONObject result = post ( url, contentType,
338                          * baseStream.toByteArray(), true ); final String logLine =
339                          * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
340                          * result.toString (); getLog().info ( logLine );
341                          */
342                         fPending.clear();
343                         return true;
344                 } catch (IllegalArgumentException x) {
345                         getLog().warn(x.getMessage(), x);
346                 }
347                 /*
348                  * catch ( HttpObjectNotFoundException x ) { getLog().warn (
349                  * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
350                  * x.getMessage(), x ); }
351                  */
352                 catch (IOException x) {
353                         getLog().warn(x.getMessage(), x);
354                 }
355                 return false;
356         }
357
358         private final String fTopic;
359         private final int fMaxBatchSize;
360         private final long fMaxBatchAgeMs;
361         private final boolean fCompress;
362         private boolean fClosed;
363
364         private final LinkedBlockingQueue<TimestampedMessage> fPending;
365         private long fDontSendUntilMs;
366         private final ScheduledThreadPoolExecutor fExec;
367
368         private static final long sfWaitAfterError = 1000;
369
370         /**
371          * 
372          * @param hosts
373          * @param topic
374          * @param maxBatchSize
375          * @param maxBatchAgeMs
376          * @param compress
377          * @throws MalformedURLException 
378          */
379         private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
380                         long maxBatchAgeMs, boolean compress) throws MalformedURLException {
381
382                 super(hosts);
383
384                 if (topic == null || topic.length() < 1) {
385                         throw new IllegalArgumentException("A topic must be provided.");
386                 }
387
388                 fClosed = false;
389                 fTopic = topic;
390                 fMaxBatchSize = maxBatchSize;
391                 fMaxBatchAgeMs = maxBatchAgeMs;
392                 fCompress = compress;
393
394                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
395                 fDontSendUntilMs = 0;
396
397                 fExec = new ScheduledThreadPoolExecutor(1);
398                 fExec.scheduleAtFixedRate(new Runnable() {
399                         @Override
400                         public void run() {
401                                 send(false);
402                         }
403                 }, 100, 50, TimeUnit.MILLISECONDS);
404         }
405
406         /**
407          * 
408          * 
409          * @author author
410          *
411          */
412         private static class TimestampedMessage extends message {
413                 /**
414                  * to store timestamp value
415                  */
416                 public final long timestamp;
417
418                 /**
419                  * constructor initialize with message
420                  * 
421                  * @param m
422                  * 
423                  */
424                 public TimestampedMessage(message m) {
425                         super(m);
426                         timestamp = Clock.now();
427                 }
428         }
429
430 }