1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
26 import org.onap.dmaap.dmf.mr.metrics.publisher.CambriaPublisherUtility;
28 import javax.ws.rs.client.Client;
29 import javax.ws.rs.client.ClientBuilder;
30 import javax.ws.rs.client.Entity;
31 import javax.ws.rs.client.WebTarget;
32 import javax.ws.rs.core.Response;
33 import java.io.ByteArrayOutputStream;
34 import java.io.IOException;
35 import java.io.OutputStream;
36 import java.net.MalformedURLException;
37 import java.nio.channels.NotYetConnectedException;
38 import java.util.Collection;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import java.util.concurrent.TimeUnit;
44 import java.util.zip.GZIPOutputStream;
48 * class DMaaPCambriaSimplerBatchPublisher is used to publish the messages
51 * @author anowarul.islam
54 public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
55 implements org.onap.dmaap.dmf.mr.metrics.publisher.CambriaBatchingPublisher {
58 * static inner builder class that holds the urls, topic, batch size, batch age and compress flag
60 * @author anowarul.islam
63 public static class Builder {
68 * builder method that takes the base urls
74 public Builder againstUrls(Collection<String> baseUrls) {
80 * builder method that takes the topic
86 public Builder onTopic(String topic) {
92 * builder method that sets the maximum batch size and the maximum batch age
95 * @param maxBatchAgeMs
99 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
100 fMaxBatchSize = maxBatchSize;
101 fMaxBatchAgeMs = maxBatchAgeMs;
106 * builder method that sets the compress flag
111 public Builder compress(boolean compress) {
112 fCompress = compress;
117 * method returns DMaaPCambriaSimplerBatchPublisher object
// Creates the publisher from the values collected by this builder.
121 public DMaaPCambriaSimplerBatchPublisher build() {
124 return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
125 } catch (MalformedURLException e) {
// NOTE(review): only the stack trace of the MalformedURLException is copied onto the
// NotYetConnectedException; its message is not carried over and it is not attached as
// the cause in the lines visible here - confirm this is intended.
127 NotYetConnectedException exception=new NotYetConnectedException();
128 exception.setStackTrace(e.getStackTrace());
135 private Collection<String> fUrls;
136 private String fTopic;
137 private int fMaxBatchSize = 100;
138 private long fMaxBatchAgeMs = 1000;
139 private boolean fCompress = false;
148 public int send(String partition, String msg) {
149 return send(new message(partition, msg));
156 public int send(message msg) {
157 final LinkedList<message> list = new LinkedList<message>();
166 public synchronized int send(Collection<message> msgs) {
168 throw new IllegalStateException("The publisher was closed.");
171 for (message userMsg : msgs) {
172 fPending.add(new TimestampedMessage(userMsg));
174 return getPendingMessageCount();
178 * gets the pending message count
181 public synchronized int getPendingMessageCount() {
182 return fPending.size();
187 * @exception InterruptedException
188 * @exception IOException
191 public void close() {
193 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
194 if (remains.isEmpty()) {
195 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
196 + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
198 } catch (InterruptedException e) {
199 getLog().warn("Possible message loss. " + e.getMessage(), e);
200 Thread.currentThread().interrupt();
201 } catch (IOException e) {
202 getLog().warn("Possible message loss. " + e.getMessage(), e);
211 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
212 synchronized (this) {
215 // stop the background sender
216 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
217 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
221 final long now = Clock.now();
222 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
223 final long timeoutAtMs = now + waitInMs;
225 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
229 // synchronizing the current object
230 synchronized (this) {
231 final LinkedList<message> result = new LinkedList<message>();
232 fPending.drainTo(result);
238 * Possibly send a batch to the cambria server. This is called by the
239 * background thread and the close() method
243 private synchronized void send(boolean force) {
244 if (force || shouldSendNow()) {
246 getLog().warn("Send failed, " + fPending.size() + " message to send.");
248 // note the time for back-off
249 fDontSendUntilMs = sfWaitAfterError + Clock.now();
258 private synchronized boolean shouldSendNow() {
259 boolean shouldSend = false;
260 if (fPending.isEmpty()) {
261 final long nowMs = Clock.now();
263 shouldSend = (fPending.size() >= fMaxBatchSize);
265 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
266 shouldSend = sendAtMs <= nowMs;
269 // however, wait after an error
270 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
// Writes the pending messages into one request body and POSTs it as "application/cambria"
// to the "/events/" + fTopic path on localhost; IllegalArgumentException and IOException
// are caught and logged as warnings.
279 private synchronized boolean sendBatch() {
280 // it's possible for this call to be made with an empty list. in this
281 // case, just return.
282 if (fPending.isEmpty()) {
286 final long nowMs = Clock.now();
// NOTE(review): in the lines visible here this url is only used in the log message below;
// the request itself goes to the localhost target built further down.
287 final String url = CambriaPublisherUtility.makeUrl(fTopic);
289 getLog().info("sending " + fPending.size() + " msgs to " + url + ". Oldest: "
290 + (nowMs - fPending.peek().timestamp) + " ms");
// the body is collected in memory; the stream is wrapped in a GZIPOutputStream when compressing
294 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
295 OutputStream os = baseStream;
297 os = new GZIPOutputStream(baseStream);
// per message: the partition length and message length first, then the partition and message bytes
// NOTE(review): length() counts chars while getBytes() encodes with the platform default
// charset, so for non-ASCII text the written lengths can differ from the number of bytes
// that follow - confirm what the receiver expects and consider an explicit charset.
299 for (TimestampedMessage m : fPending) {
300 os.write(("" + m.fPartition.length()).getBytes());
302 os.write(("" + m.fMsg.length()).getBytes());
304 os.write(m.fPartition.getBytes());
305 os.write(m.fMsg.getBytes());
// NOTE(review): startMs is not read anywhere in the lines visible here.
310 final long startMs = Clock.now();
312 // code from REST Client Starts
// NOTE(review): a new Client is created on every call and no close() of the Client or of
// the Response is visible here - confirm they are released.
317 Client client = ClientBuilder.newClient();
// NOTE(review): metricTopicname is resolved (with a default when the property is missing)
// but the request path below is built from fTopic - confirm which topic is intended.
318 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
319 if (null==metricTopicname) {
321 metricTopicname="msgrtr.apinode.metrics.dmaap";
323 WebTarget target = client
324 .target("http://localhost:" + CambriaConstants.kStdCambriaServicePort);
325 target = target.path("/events/" + fTopic);
326 getLog().info("url : " + target.getUri().toString());
329 Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
331 Response response = target.request().post(data);
// the response status is only logged in the lines visible here
333 getLog().info("Response received :: " + response.getStatus());
334 getLog().info("Response received :: " + response.toString());
336 // code from REST Client Ends
341 } catch (IllegalArgumentException x) {
342 getLog().warn(x.getMessage(), x);
345 catch (IOException x) {
346 getLog().warn(x.getMessage(), x);
351 private final String fTopic;
352 private final int fMaxBatchSize;
353 private final long fMaxBatchAgeMs;
354 private final boolean fCompress;
355 private boolean fClosed;
357 private final LinkedBlockingQueue<TimestampedMessage> fPending;
358 private long fDontSendUntilMs;
359 private final ScheduledThreadPoolExecutor fExec;
361 private static final long sfWaitAfterError = 1000;
367 * @param maxBatchSize
368 * @param maxBatchAgeMs
// Private constructor (instances are created through Builder.build()): rejects a null or
// empty topic, stores the batch settings, creates the pending queue and schedules the
// periodic background task.
371 private DMaaPCambriaSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize,
372 long maxBatchAgeMs, boolean compress) throws MalformedURLException {
376 if (topic == null || topic.length() < 1) {
377 throw new IllegalArgumentException("A topic must be provided.");
382 fMaxBatchSize = maxBatchSize;
383 fMaxBatchAgeMs = maxBatchAgeMs;
384 fCompress = compress;
386 fPending = new LinkedBlockingQueue<TimestampedMessage>();
// 0 means no back-off is in effect initially
387 fDontSendUntilMs = 0;
// single-threaded scheduler; the task first runs after 100 ms and then every 50 ms
389 fExec = new ScheduledThreadPoolExecutor(1);
390 fExec.scheduleAtFixedRate(new Runnable() {
395 }, 100, 50, TimeUnit.MILLISECONDS);
401 * @author anowarul.islam
404 private static class TimestampedMessage extends message {
406 * to store timestamp value
408 public final long timestamp;
411 * constructor initialize with message
416 public TimestampedMessage(message m) {
418 timestamp = Clock.now();