1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
24 import com.att.nsa.drumlin.till.nv.rrNvReadable;
25 import com.att.nsa.metrics.impl.*;
26 import org.onap.dmaap.dmf.mr.CambriaApiVersionInfo;
27 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
28 import org.onap.dmaap.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
30 import java.text.SimpleDateFormat;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.TimeUnit;
37 /*@Component("dMaaPMetricsSet")*/
40 * Metrics related information
42 * @author anowarul.islam
45 public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
47 private final CdmStringConstant fVersion;
48 private final CdmConstant fStartTime;
49 private final CdmTimeSince fUpTime;
51 private final CdmCounter fRecvTotal;
52 private final CdmRateTicker fRecvEpsInstant;
53 private final CdmRateTicker fRecvEpsShort;
54 private final CdmRateTicker fRecvEpsLong;
56 private final CdmCounter fSendTotal;
57 private final CdmRateTicker fSendEpsInstant;
58 private final CdmRateTicker fSendEpsShort;
59 private final CdmRateTicker fSendEpsLong;
61 private final CdmCounter fKafkaConsumerCacheMiss;
62 private final CdmCounter fKafkaConsumerCacheHit;
64 private final CdmCounter fKafkaConsumerClaimed;
65 private final CdmCounter fKafkaConsumerTimeout;
67 private final CdmSimpleMetric fFanOutRatio;
69 private final HashMap<String, CdmRateTicker> fPathUseRates;
70 private final HashMap<String, CdmMovingAverage> fPathAvgs;
72 private rrNvReadable fSettings;
74 private final ScheduledExecutorService fScheduler;
77 * Constructor initialization
82 public DMaaPMetricsSet(rrNvReadable cs) {
84 fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
85 super.putItem("version", fVersion);
87 final long startTime = System.currentTimeMillis();
88 final Date d = new Date(startTime);
89 final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
90 fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
91 super.putItem("startTime", fStartTime);
93 fUpTime = new CdmTimeSince("seconds since start");
94 super.putItem("upTime", fUpTime);
96 fRecvTotal = new CdmCounter("Total events received since start");
97 super.putItem("recvTotalEvents", fRecvTotal);
99 fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
100 super.putItem("recvEpsInstant", fRecvEpsInstant);
102 fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
103 super.putItem("recvEpsShort", fRecvEpsShort);
105 fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
106 super.putItem("recvEpsLong", fRecvEpsLong);
108 fSendTotal = new CdmCounter("Total events sent since start");
109 super.putItem("sendTotalEvents", fSendTotal);
111 fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
112 super.putItem("sendEpsInstant", fSendEpsInstant);
114 fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
115 super.putItem("sendEpsShort", fSendEpsShort);
117 fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
118 super.putItem("sendEpsLong", fSendEpsLong);
120 fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
121 super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
123 fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
124 super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
126 fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
127 super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
129 fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
130 super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
132 // FIXME: CdmLevel is not exactly a great choice
133 fFanOutRatio = new CdmSimpleMetric() {
135 public String getRawValueString() {
136 return getRawValue().toString();
140 public Number getRawValue() {
141 final double s = fSendTotal.getValue();
142 final double r = fRecvTotal.getValue();
143 return r == 0.0 ? 0.0 : s / r;
147 public String summarize() {
148 return getRawValueString() + " sends per recv";
152 super.putItem("fanOut", fFanOutRatio);
154 // these are added to the metrics catalog as they're discovered
155 fPathUseRates = new HashMap<String, CdmRateTicker>();
156 fPathAvgs = new HashMap<String, CdmMovingAverage>();
158 fScheduler = Executors.newScheduledThreadPool(1);
162 public void setupCambriaSender() {
163 DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
167 public void onRouteComplete(String name, long durationMs) {
168 CdmRateTicker ticker = fPathUseRates.get(name);
169 if (ticker == null) {
170 ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
171 fPathUseRates.put(name, ticker);
172 super.putItem("pathUse_" + name, ticker);
176 CdmMovingAverage durs = fPathAvgs.get(name);
178 durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
179 fPathAvgs.put(name, durs);
180 super.putItem("pathDurationMs_" + name, durs);
182 durs.tick(durationMs);
186 public void publishTick(int amount) {
188 fRecvTotal.bumpBy(amount);
189 fRecvEpsInstant.tick(amount);
190 fRecvEpsShort.tick(amount);
191 fRecvEpsLong.tick(amount);
196 public void consumeTick(int amount) {
198 fSendTotal.bumpBy(amount);
199 fSendEpsInstant.tick(amount);
200 fSendEpsShort.tick(amount);
201 fSendEpsLong.tick(amount);
206 public void onKafkaConsumerCacheMiss() {
207 fKafkaConsumerCacheMiss.bump();
211 public void onKafkaConsumerCacheHit() {
212 fKafkaConsumerCacheHit.bump();
216 public void onKafkaConsumerClaimed() {
217 fKafkaConsumerClaimed.bump();
221 public void onKafkaConsumerTimeout() {
222 fKafkaConsumerTimeout.bump();