1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.metrics.publisher.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.zip.GZIPOutputStream;
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.Entity;
39 import javax.ws.rs.client.WebTarget;
40 import javax.ws.rs.core.Response;
42 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.metrics.publisher.CambriaPublisherUtility;
48 * class DMaaPCambriaSimplerBatchPublisher used to send the publish the messages
54 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
55 implements com.att.nsa.cambria.metrics.publisher.CambriaBatchingPublisher {
58 * static inner class initializes with urls, topic,batchSize
// Fluent builder for DMaaPCambriaSimplerBatchPublisher.
// NOTE(review): this listing is elided — several method bodies, return
// statements and closing braces are not present, and each line carries a
// stray numeric prefix from an earlier numbered listing. Comments below
// describe only what the visible fragments show.
63 public static class Builder {
68 * constructor initialize with url
// Records the base service URLs the publisher will target (visible body elided;
// presumably stores into fUrls and returns this — TODO confirm against full source).
74 public Builder againstUrls(Collection<String> baseUrls) {
80 * constructor initializes with topics
// Records the topic to publish to (visible body elided).
86 public Builder onTopic(String topic) {
92 * constructor initializes with batch size and batch time
95 * @param maxBatchAgeMs
// Sets the batching thresholds: flush when fMaxBatchSize messages are
// pending, or when the oldest pending message is fMaxBatchAgeMs old.
99 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
100 fMaxBatchSize = maxBatchSize;
101 fMaxBatchAgeMs = maxBatchAgeMs;
106 * constructor initializes with compress
// Enables/disables gzip compression of the outgoing batch payload.
111 public Builder compress(boolean compress) {
112 fCompress = compress;
117 * method returns DMaaPCambriaSimplerBatchPublisher object
// Builds the publisher; a MalformedURLException from the private
// constructor is wrapped in an unchecked RuntimeException.
// NOTE(review): the enclosing try { line is missing from this listing.
121 public DMaaPCambriaSimplerBatchPublisher build() {
123 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
124 } catch (MalformedURLException e) {
125 throw new RuntimeException(e);
// Builder state with defaults: 100-message batches, 1 s max batch age,
// no compression.
129 private Collection<String> fUrls;
130 private String fTopic;
131 private int fMaxBatchSize = 100;
132 private long fMaxBatchAgeMs = 1000;
133 private boolean fCompress = false;
// Convenience overload: wraps the partition/payload pair in a message
// and delegates to send(message). Returns the pending-message count
// reported by the delegate.
142 public int send(String partition, String msg) {
143 return send(new message(partition, msg));
// Single-message overload. The visible fragment builds a one-element
// list; presumably it adds msg and delegates to send(Collection) —
// the remainder of the body is elided from this listing, TODO confirm.
150 public int send(message msg) {
151 final LinkedList<message> list = new LinkedList<message>();
// Queues a batch of messages for asynchronous delivery. Rejects use
// after close() (the guarding if-condition line is elided here, but the
// IllegalStateException branch is visible). Each message is wrapped in a
// TimestampedMessage so batch age can be computed later.
// Returns the number of messages currently pending.
160 public synchronized int send(Collection<message> msgs) {
162 throw new IllegalStateException("The publisher was closed.");
165 for (message userMsg : msgs) {
166 fPending.add(new TimestampedMessage(userMsg));
168 return getPendingMessageCount();
172 * getPending message count
// Returns the number of messages queued but not yet delivered.
// synchronized to pair with the synchronized send()/close() paths.
175 public synchronized int getPendingMessageCount() {
176 return fPending.size();
181 * @exception InterruptedException
182 * @exception IOException
// No-arg close: delegates to close(long, TimeUnit) with an effectively
// unbounded wait (Long.MAX_VALUE ms), then warns about any messages that
// were still unsent. NOTE(review): the opening try { line is elided in
// this listing.
185 public void close() {
187 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
188 if (remains.size() > 0) {
189 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
190 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
// NOTE(review): InterruptedException is swallowed without restoring the
// interrupt status — Thread.currentThread().interrupt() should be called
// here so callers can observe the interruption.
192 } catch (InterruptedException e) {
193 getLog().warn("Possible message loss. " + e.getMessage(), e);
194 } catch (IOException e) {
195 getLog().warn("Possible message loss. " + e.getMessage(), e);
// Timed close: stops the background sender, then polls until either the
// timeout expires or the pending queue drains, and finally returns any
// messages that could not be sent (drained from fPending).
// NOTE(review): interior lines (fClosed flag set, the sleep/flush inside
// the wait loop, and the final return of `result`) are elided in this
// listing — behavior described here is inferred from the visible frame.
204 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
205 synchronized (this) {
208 // stop the background sender
209 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
210 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
// Compute the absolute deadline from the caller-supplied timeout.
214 final long now = Clock.now();
215 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
216 final long timeoutAtMs = now + waitInMs;
// Busy-wait (with elided body) until drained or deadline passes.
218 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
222 // synchronizing the current object
223 synchronized (this) {
// Whatever is still pending at the deadline is handed back to the caller.
224 final LinkedList<message> result = new LinkedList<message>();
225 fPending.drainTo(result);
231 * Possibly send a batch to the cambria server. This is called by the
232 * background thread and the close() method
// Flushes when forced (close path) or when shouldSendNow() says the
// batch-size/age threshold is met. On a failed sendBatch() it logs and
// backs off for sfWaitAfterError ms before retrying.
// NOTE(review): the `if (!sendBatch()) {` guard line appears elided here.
236 private synchronized void send(boolean force) {
237 if (force || shouldSendNow()) {
239 getLog().warn("Send failed, " + fPending.size() + " message to send.");
241 // note the time for back-off
242 fDontSendUntilMs = sfWaitAfterError + Clock.now();
// Decides whether the pending batch should be flushed now: either the
// batch is full (>= fMaxBatchSize) or the oldest pending message has
// exceeded fMaxBatchAgeMs. Either way, sending is suppressed until the
// error back-off deadline (fDontSendUntilMs) has passed.
// NOTE(review): the if/else framing between the two `shouldSend`
// assignments and the final `return shouldSend;` are elided here.
251 private synchronized boolean shouldSendNow() {
252 boolean shouldSend = false;
253 if (fPending.size() > 0) {
254 final long nowMs = Clock.now();
256 shouldSend = (fPending.size() >= fMaxBatchSize);
// Age check: flush once the head-of-queue message is older than the max batch age.
258 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
259 shouldSend = sendAtMs <= nowMs;
262 // however, wait after an error
263 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
// Serializes all pending messages into one Cambria batch payload and
// POSTs it to the message-router /events/{topic} endpoint via JAX-RS.
// Payload format (per visible write calls): for each message, the
// partition length and message length (as decimal text), then the
// partition and message bytes. Returns true/false for success/failure
// (return statements elided in this listing).
272 private synchronized boolean sendBatch() {
273 // it's possible for this call to be made with an empty list. in this
274 // case, just return.
275 if (fPending.size() < 1) {
279 final long nowMs = Clock.now();
280 final String url = CambriaPublisherUtility.makeUrl(fTopic);
282 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
283 + (nowMs - fPending.peek().timestamp) + " ms");
// Build the batch body in memory, optionally gzip-wrapped.
// NOTE(review): the `if (fCompress)` guard around the GZIP wrap appears
// elided; also the stream is never visibly closed/finished, which for
// GZIPOutputStream would truncate the compressed payload — confirm
// against the full source.
287 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
288 OutputStream os = baseStream;
290 os = new GZIPOutputStream(baseStream);
292 for (TimestampedMessage m : fPending) {
// NOTE(review): String.length() counts chars, but getBytes() below uses
// the platform charset — the length prefix is wrong for non-ASCII text.
293 os.write(("" + m.fPartition.length()).getBytes());
295 os.write(("" + m.fMsg.length()).getBytes());
297 os.write(m.fPartition.getBytes());
298 os.write(m.fMsg.getBytes());
303 final long startMs = Clock.now();
305 // code from REST Client Starts
307 // final String serverCalculatedSignature = sha1HmacSigner.sign
308 // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
// NOTE(review): a new Client is built per batch and never closed (leak);
// metricTopicname is read (with a fallback default) but never used below.
310 Client client = ClientBuilder.newClient();
311 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
312 if (null==metricTopicname) {
314 metricTopicname="msgrtr.apinode.metrics.dmaap";
// NOTE(review): target is hard-coded to localhost instead of using the
// builder-supplied fUrls / the `url` computed above — confirm intent.
316 WebTarget target = client
317 .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
318 target = target.path("/events/" + fTopic);
319 getLog().info("url : " + target.getUri().toString());
// POST the raw batch bytes with the Cambria media type.
322 Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
324 Response response = target.request().post(data);
325 // header("X-CambriaAuth",
326 // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
327 // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
328 // post(Entity.json(baseStream.toByteArray()));
// NOTE(review): the Response is never closed and its status is logged
// but not visibly checked before treating the batch as sent.
330 getLog().info("Response received :: " + response.getStatus());
331 getLog().info("Response received :: " + response.toString());
333 // code from REST Client Ends
336 * final JSONObject result = post ( url, contentType,
337 * baseStream.toByteArray(), true ); final String logLine =
338 * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
339 * result.toString (); getLog().info ( logLine );
// Failure paths: log and (per the elided return) report the batch unsent.
343 } catch (IllegalArgumentException x) {
344 getLog().warn(x.getMessage(), x);
347 * catch ( HttpObjectNotFoundException x ) { getLog().warn (
348 * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
349 * x.getMessage(), x ); }
351 catch (IOException x) {
352 getLog().warn(x.getMessage(), x);
// Publisher state (set once in the constructor unless noted):
// fTopic          — destination topic for every batch.
// fMaxBatchSize   — flush threshold by count.
// fMaxBatchAgeMs  — flush threshold by age of oldest pending message.
// fCompress       — gzip the batch payload when true.
// fClosed         — set on close(); guards further send() calls.
357 private final String fTopic;
358 private final int fMaxBatchSize;
359 private final long fMaxBatchAgeMs;
360 private final boolean fCompress;
361 private boolean fClosed;
// Queue of timestamped messages awaiting delivery (thread-safe).
363 private final LinkedBlockingQueue<TimestampedMessage> fPending;
// Earliest time sending may resume after a failed batch (back-off).
364 private long fDontSendUntilMs;
// Single-threaded scheduler driving the periodic background flush.
365 private final ScheduledThreadPoolExecutor fExec;
// Back-off applied after a send failure, in milliseconds.
367 private static final long sfWaitAfterError = 1000;
373 * @param maxBatchSize
374 * @param maxBatchAgeMs
376 * @throws MalformedURLException
// Private constructor (instances come from Builder.build()). Validates
// the topic, captures the batching configuration, and starts a
// single-threaded scheduler that attempts a flush every 50 ms after an
// initial 100 ms delay. NOTE(review): the super(hosts...) call, the
// fTopic assignment, and the Runnable's run() body (presumably
// send(false)) are elided from this listing — confirm against full source.
378 private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
379 long maxBatchAgeMs, boolean compress) throws MalformedURLException {
383 if (topic == null || topic.length() < 1) {
384 throw new IllegalArgumentException("A topic must be provided.");
389 fMaxBatchSize = maxBatchSize;
390 fMaxBatchAgeMs = maxBatchAgeMs;
391 fCompress = compress;
393 fPending = new LinkedBlockingQueue<TimestampedMessage>();
394 fDontSendUntilMs = 0;
396 fExec = new ScheduledThreadPoolExecutor(1);
397 fExec.scheduleAtFixedRate(new Runnable() {
402 }, 100, 50, TimeUnit.MILLISECONDS);
// A message decorated with its enqueue time, so shouldSendNow() can
// compute the age of the oldest pending entry.
// NOTE(review): the super(m) delegation in the constructor is elided
// from this listing — confirm against full source.
411 private static class TimestampedMessage extends message {
413 * to store timestamp value
// Enqueue time in ms, captured once at construction (immutable).
415 public final long timestamp;
418 * constructor initialize with message
423 public TimestampedMessage(message m) {
425 timestamp = Clock.now();