1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26 import java.util.HashMap;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
31 import org.onap.dmaap.dmf.mr.CambriaApiVersionInfo;
32 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
33 import org.onap.dmaap.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
34 import com.att.nsa.drumlin.till.nv.rrNvReadable;
35 import com.att.nsa.metrics.impl.CdmConstant;
36 import com.att.nsa.metrics.impl.CdmCounter;
37 import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
38 import com.att.nsa.metrics.impl.CdmMovingAverage;
39 import com.att.nsa.metrics.impl.CdmRateTicker;
40 import com.att.nsa.metrics.impl.CdmSimpleMetric;
41 import com.att.nsa.metrics.impl.CdmStringConstant;
42 import com.att.nsa.metrics.impl.CdmTimeSince;
44 /*@Component("dMaaPMetricsSet")*/
46 * Metrics related information
48 * @author anowarul.islam
51 public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
53 private final CdmStringConstant fVersion;
54 private final CdmConstant fStartTime;
55 private final CdmTimeSince fUpTime;
57 private final CdmCounter fRecvTotal;
58 private final CdmRateTicker fRecvEpsInstant;
59 private final CdmRateTicker fRecvEpsShort;
60 private final CdmRateTicker fRecvEpsLong;
62 private final CdmCounter fSendTotal;
63 private final CdmRateTicker fSendEpsInstant;
64 private final CdmRateTicker fSendEpsShort;
65 private final CdmRateTicker fSendEpsLong;
67 private final CdmCounter fKafkaConsumerCacheMiss;
68 private final CdmCounter fKafkaConsumerCacheHit;
70 private final CdmCounter fKafkaConsumerClaimed;
71 private final CdmCounter fKafkaConsumerTimeout;
73 private final CdmSimpleMetric fFanOutRatio;
75 private final HashMap<String, CdmRateTicker> fPathUseRates;
76 private final HashMap<String, CdmMovingAverage> fPathAvgs;
78 private rrNvReadable fSettings;
80 private final ScheduledExecutorService fScheduler;
83 * Constructor initialization
88 public DMaaPMetricsSet(rrNvReadable cs) {
90 fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
91 super.putItem("version", fVersion);
93 final long startTime = System.currentTimeMillis();
94 final Date d = new Date(startTime);
95 final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
96 fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
97 super.putItem("startTime", fStartTime);
99 fUpTime = new CdmTimeSince("seconds since start");
100 super.putItem("upTime", fUpTime);
102 fRecvTotal = new CdmCounter("Total events received since start");
103 super.putItem("recvTotalEvents", fRecvTotal);
105 fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
106 super.putItem("recvEpsInstant", fRecvEpsInstant);
108 fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
109 super.putItem("recvEpsShort", fRecvEpsShort);
111 fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
112 super.putItem("recvEpsLong", fRecvEpsLong);
114 fSendTotal = new CdmCounter("Total events sent since start");
115 super.putItem("sendTotalEvents", fSendTotal);
117 fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
118 super.putItem("sendEpsInstant", fSendEpsInstant);
120 fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
121 super.putItem("sendEpsShort", fSendEpsShort);
123 fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
124 super.putItem("sendEpsLong", fSendEpsLong);
126 fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
127 super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
129 fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
130 super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
132 fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
133 super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
135 fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
136 super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
138 // FIXME: CdmLevel is not exactly a great choice
139 fFanOutRatio = new CdmSimpleMetric() {
141 public String getRawValueString() {
142 return getRawValue().toString();
146 public Number getRawValue() {
147 final double s = fSendTotal.getValue();
148 final double r = fRecvTotal.getValue();
149 return r == 0.0 ? 0.0 : s / r;
153 public String summarize() {
154 return getRawValueString() + " sends per recv";
158 super.putItem("fanOut", fFanOutRatio);
160 // these are added to the metrics catalog as they're discovered
161 fPathUseRates = new HashMap<String, CdmRateTicker>();
162 fPathAvgs = new HashMap<String, CdmMovingAverage>();
164 fScheduler = Executors.newScheduledThreadPool(1);
168 public void setupCambriaSender() {
169 DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
173 public void onRouteComplete(String name, long durationMs) {
174 CdmRateTicker ticker = fPathUseRates.get(name);
175 if (ticker == null) {
176 ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
177 fPathUseRates.put(name, ticker);
178 super.putItem("pathUse_" + name, ticker);
182 CdmMovingAverage durs = fPathAvgs.get(name);
184 durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
185 fPathAvgs.put(name, durs);
186 super.putItem("pathDurationMs_" + name, durs);
188 durs.tick(durationMs);
192 public void publishTick(int amount) {
194 fRecvTotal.bumpBy(amount);
195 fRecvEpsInstant.tick(amount);
196 fRecvEpsShort.tick(amount);
197 fRecvEpsLong.tick(amount);
202 public void consumeTick(int amount) {
204 fSendTotal.bumpBy(amount);
205 fSendEpsInstant.tick(amount);
206 fSendEpsShort.tick(amount);
207 fSendEpsLong.tick(amount);
212 public void onKafkaConsumerCacheMiss() {
213 fKafkaConsumerCacheMiss.bump();
217 public void onKafkaConsumerCacheHit() {
218 fKafkaConsumerCacheHit.bump();
222 public void onKafkaConsumerClaimed() {
223 fKafkaConsumerClaimed.bump();
227 public void onKafkaConsumerTimeout() {
228 fKafkaConsumerTimeout.bump();