1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.nio.channels.NotYetConnectedException;
29 import java.util.Collection;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.zip.GZIPOutputStream;
37 import javax.ws.rs.client.Client;
38 import javax.ws.rs.client.ClientBuilder;
39 import javax.ws.rs.client.Entity;
40 import javax.ws.rs.client.WebTarget;
41 import javax.ws.rs.core.Response;
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.metrics.publisher.CambriaPublisherUtility;
 * Class DMaaPCambriaSimplerBatchPublisher is used to publish messages in batches.
52 * @author anowarul.islam
55 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
56 implements org.onap.dmaap.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
59 * static inner class initializes with urls, topic,batchSize
61 * @author anowarul.islam
64 public static class Builder {
69 * constructor initialize with url
75 public Builder againstUrls(Collection<String> baseUrls) {
81 * constructor initializes with topics
87 public Builder onTopic(String topic) {
 * Builder method that sets the maximum batch size and the maximum batch age.
96 * @param maxBatchAgeMs
100 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
101 fMaxBatchSize = maxBatchSize;
102 fMaxBatchAgeMs = maxBatchAgeMs;
107 * constructor initializes with compress
112 public Builder compress(boolean compress) {
113 fCompress = compress;
118 * method returns DMaaPCambriaSimplerBatchPublisher object
122 public DMaaPCambriaSimplerBatchPublisher build() {
125 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
126 } catch (MalformedURLException e) {
128 NotYetConnectedException exception=new NotYetConnectedException();
129 exception.setStackTrace(e.getStackTrace());
136 private Collection<String> fUrls;
137 private String fTopic;
138 private int fMaxBatchSize = 100;
139 private long fMaxBatchAgeMs = 1000;
140 private boolean fCompress = false;
149 public int send(String partition, String msg) {
150 return send(new message(partition, msg));
157 public int send(message msg) {
158 final LinkedList<message> list = new LinkedList<message>();
167 public synchronized int send(Collection<message> msgs) {
169 throw new IllegalStateException("The publisher was closed.");
172 for (message userMsg : msgs) {
173 fPending.add(new TimestampedMessage(userMsg));
175 return getPendingMessageCount();
179 * getPending message count
182 public synchronized int getPendingMessageCount() {
183 return fPending.size();
188 * @exception InterruptedException
189 * @exception IOException
192 public void close() {
194 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
195 if (remains.isEmpty()) {
196 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
197 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
199 } catch (InterruptedException e) {
200 getLog().warn("Possible message loss. " + e.getMessage(), e);
201 } catch (IOException e) {
202 getLog().warn("Possible message loss. " + e.getMessage(), e);
211 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
212 synchronized (this) {
215 // stop the background sender
216 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
217 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
221 final long now = Clock.now();
222 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
223 final long timeoutAtMs = now + waitInMs;
225 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
229 // synchronizing the current object
230 synchronized (this) {
231 final LinkedList<message> result = new LinkedList<message>();
232 fPending.drainTo(result);
238 * Possibly send a batch to the cambria server. This is called by the
239 * background thread and the close() method
243 private synchronized void send(boolean force) {
244 if (force || shouldSendNow()) {
246 getLog().warn("Send failed, " + fPending.size() + " message to send.");
248 // note the time for back-off
249 fDontSendUntilMs = sfWaitAfterError + Clock.now();
258 private synchronized boolean shouldSendNow() {
259 boolean shouldSend = false;
260 if (fPending.isEmpty()) {
261 final long nowMs = Clock.now();
263 shouldSend = (fPending.size() >= fMaxBatchSize);
265 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
266 shouldSend = sendAtMs <= nowMs;
269 // however, wait after an error
270 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
279 private synchronized boolean sendBatch() {
280 // it's possible for this call to be made with an empty list. in this
281 // case, just return.
282 if (fPending.isEmpty()) {
286 final long nowMs = Clock.now();
287 final String url = CambriaPublisherUtility.makeUrl(fTopic);
289 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
290 + (nowMs - fPending.peek().timestamp) + " ms");
294 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
295 OutputStream os = baseStream;
297 os = new GZIPOutputStream(baseStream);
299 for (TimestampedMessage m : fPending) {
300 os.write(("" + m.fPartition.length()).getBytes());
302 os.write(("" + m.fMsg.length()).getBytes());
304 os.write(m.fPartition.getBytes());
305 os.write(m.fMsg.getBytes());
310 final long startMs = Clock.now();
312 // code from REST Client Starts
317 Client client = ClientBuilder.newClient();
318 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
319 if (null==metricTopicname) {
321 metricTopicname="msgrtr.apinode.metrics.dmaap";
323 WebTarget target = client
324 .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
325 target = target.path("/events/" + fTopic);
326 getLog().info("url : " + target.getUri().toString());
329 Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
331 Response response = target.request().post(data);
333 getLog().info("Response received :: " + response.getStatus());
334 getLog().info("Response received :: " + response.toString());
336 // code from REST Client Ends
341 } catch (IllegalArgumentException x) {
342 getLog().warn(x.getMessage(), x);
345 catch (IOException x) {
346 getLog().warn(x.getMessage(), x);
351 private final String fTopic;
352 private final int fMaxBatchSize;
353 private final long fMaxBatchAgeMs;
354 private final boolean fCompress;
355 private boolean fClosed;
357 private final LinkedBlockingQueue<TimestampedMessage> fPending;
358 private long fDontSendUntilMs;
359 private final ScheduledThreadPoolExecutor fExec;
361 private static final long sfWaitAfterError = 1000;
367 * @param maxBatchSize
368 * @param maxBatchAgeMs
371 private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
372 long maxBatchAgeMs, boolean compress) throws MalformedURLException {
376 if (topic == null || topic.length() < 1) {
377 throw new IllegalArgumentException("A topic must be provided.");
382 fMaxBatchSize = maxBatchSize;
383 fMaxBatchAgeMs = maxBatchAgeMs;
384 fCompress = compress;
386 fPending = new LinkedBlockingQueue<TimestampedMessage>();
387 fDontSendUntilMs = 0;
389 fExec = new ScheduledThreadPoolExecutor(1);
390 fExec.scheduleAtFixedRate(new Runnable() {
395 }, 100, 50, TimeUnit.MILLISECONDS);
401 * @author anowarul.islam
404 private static class TimestampedMessage extends message {
406 * to store timestamp value
408 public final long timestamp;
411 * constructor initialize with message
416 public TimestampedMessage(message m) {
418 timestamp = Clock.now();