1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.zip.GZIPOutputStream;
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.Entity;
39 import javax.ws.rs.client.WebTarget;
40 import javax.ws.rs.core.Response;
42 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
43 import com.att.dmf.mr.constants.CambriaConstants;
44 import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
48 * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
51 * @author anowarul.islam
54 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
55 implements com.att.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
58 * static inner class initializes with urls, topic,batchSize
60 * @author anowarul.islam
63 public static class Builder {
68 * constructor initialize with url
74 public Builder againstUrls(Collection<String> baseUrls) {
80 * constructor initializes with topics
86 public Builder onTopic(String topic) {
92 * constructor initilazes with batch size and batch time
95 * @param maxBatchAgeMs
99 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
100 fMaxBatchSize = maxBatchSize;
101 fMaxBatchAgeMs = maxBatchAgeMs;
106 * constructor initializes with compress
111 public Builder compress(boolean compress) {
112 fCompress = compress;
117 * method returns DMaaPCambriaSimplerBatchPublisher object
121 public DMaaPCambriaSimplerBatchPublisher build() {
124 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
125 } catch (MalformedURLException e) {
126 throw new RuntimeException(e);
130 private Collection<String> fUrls;
131 private String fTopic;
132 private int fMaxBatchSize = 100;
133 private long fMaxBatchAgeMs = 1000;
134 private boolean fCompress = false;
143 public int send(String partition, String msg) {
144 return send(new message(partition, msg));
151 public int send(message msg) {
152 final LinkedList<message> list = new LinkedList<message>();
161 public synchronized int send(Collection<message> msgs) {
163 throw new IllegalStateException("The publisher was closed.");
166 for (message userMsg : msgs) {
167 fPending.add(new TimestampedMessage(userMsg));
169 return getPendingMessageCount();
173 * getPending message count
176 public synchronized int getPendingMessageCount() {
177 return fPending.size();
182 * @exception InterruptedException
183 * @exception IOException
186 public void close() {
188 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
189 if (remains.isEmpty()) {
190 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
191 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
193 } catch (InterruptedException e) {
194 getLog().warn("Possible message loss. " + e.getMessage(), e);
195 } catch (IOException e) {
196 getLog().warn("Possible message loss. " + e.getMessage(), e);
205 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
206 synchronized (this) {
209 // stop the background sender
210 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
211 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
215 final long now = Clock.now();
216 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
217 final long timeoutAtMs = now + waitInMs;
219 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
223 // synchronizing the current object
224 synchronized (this) {
225 final LinkedList<message> result = new LinkedList<message>();
226 fPending.drainTo(result);
232 * Possibly send a batch to the cambria server. This is called by the
233 * background thread and the close() method
237 private synchronized void send(boolean force) {
238 if (force || shouldSendNow()) {
240 getLog().warn("Send failed, " + fPending.size() + " message to send.");
242 // note the time for back-off
243 fDontSendUntilMs = sfWaitAfterError + Clock.now();
252 private synchronized boolean shouldSendNow() {
253 boolean shouldSend = false;
254 if (fPending.isEmpty()) {
255 final long nowMs = Clock.now();
257 shouldSend = (fPending.size() >= fMaxBatchSize);
259 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
260 shouldSend = sendAtMs <= nowMs;
263 // however, wait after an error
264 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
273 private synchronized boolean sendBatch() {
274 // it's possible for this call to be made with an empty list. in this
275 // case, just return.
276 if (fPending.isEmpty()) {
280 final long nowMs = Clock.now();
281 final String url = CambriaPublisherUtility.makeUrl(fTopic);
283 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
284 + (nowMs - fPending.peek().timestamp) + " ms");
288 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
289 OutputStream os = baseStream;
291 os = new GZIPOutputStream(baseStream);
293 for (TimestampedMessage m : fPending) {
294 os.write(("" + m.fPartition.length()).getBytes());
296 os.write(("" + m.fMsg.length()).getBytes());
298 os.write(m.fPartition.getBytes());
299 os.write(m.fMsg.getBytes());
304 final long startMs = Clock.now();
306 // code from REST Client Starts
311 Client client = ClientBuilder.newClient();
312 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
313 if (null==metricTopicname) {
315 metricTopicname="msgrtr.apinode.metrics.dmaap";
317 WebTarget target = client
318 .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
319 target = target.path("/events/" + fTopic);
320 getLog().info("url : " + target.getUri().toString());
323 Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
325 Response response = target.request().post(data);
327 getLog().info("Response received :: " + response.getStatus());
328 getLog().info("Response received :: " + response.toString());
330 // code from REST Client Ends
335 } catch (IllegalArgumentException x) {
336 getLog().warn(x.getMessage(), x);
339 catch (IOException x) {
340 getLog().warn(x.getMessage(), x);
345 private final String fTopic;
346 private final int fMaxBatchSize;
347 private final long fMaxBatchAgeMs;
348 private final boolean fCompress;
349 private boolean fClosed;
351 private final LinkedBlockingQueue<TimestampedMessage> fPending;
352 private long fDontSendUntilMs;
353 private final ScheduledThreadPoolExecutor fExec;
355 private static final long sfWaitAfterError = 1000;
361 * @param maxBatchSize
362 * @param maxBatchAgeMs
365 private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
366 long maxBatchAgeMs, boolean compress) throws MalformedURLException {
370 if (topic == null || topic.length() < 1) {
371 throw new IllegalArgumentException("A topic must be provided.");
376 fMaxBatchSize = maxBatchSize;
377 fMaxBatchAgeMs = maxBatchAgeMs;
378 fCompress = compress;
380 fPending = new LinkedBlockingQueue<TimestampedMessage>();
381 fDontSendUntilMs = 0;
383 fExec = new ScheduledThreadPoolExecutor(1);
384 fExec.scheduleAtFixedRate(new Runnable() {
389 }, 100, 50, TimeUnit.MILLISECONDS);
395 * @author anowarul.islam
398 private static class TimestampedMessage extends message {
400 * to store timestamp value
402 public final long timestamp;
405 * constructor initialize with message
410 public TimestampedMessage(message m) {
412 timestamp = Clock.now();