4c9532b1f50b864984d0a5028844238342b63d4a
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPMetricsSet.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.beans;
23
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26 import java.util.HashMap;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30
31 import com.att.dmf.mr.CambriaApiVersionInfo;
32 import com.att.dmf.mr.backends.MetricsSet;
33 import com.att.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
34 import com.att.nsa.drumlin.till.nv.rrNvReadable;
35 import com.att.nsa.metrics.impl.CdmConstant;
36 import com.att.nsa.metrics.impl.CdmCounter;
37 import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
38 import com.att.nsa.metrics.impl.CdmMovingAverage;
39 import com.att.nsa.metrics.impl.CdmRateTicker;
40 import com.att.nsa.metrics.impl.CdmSimpleMetric;
41 import com.att.nsa.metrics.impl.CdmStringConstant;
42 import com.att.nsa.metrics.impl.CdmTimeSince;
43
44 /*@Component("dMaaPMetricsSet")*/
45 /**
46  * Metrics related information
47  * 
48  * @author anowarul.islam
49  *
50  */
51 public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
52
53         private final CdmStringConstant fVersion;
54         private final CdmConstant fStartTime;
55         private final CdmTimeSince fUpTime;
56
57         private final CdmCounter fRecvTotal;
58         private final CdmRateTicker fRecvEpsInstant;
59         private final CdmRateTicker fRecvEpsShort;
60         private final CdmRateTicker fRecvEpsLong;
61
62         private final CdmCounter fSendTotal;
63         private final CdmRateTicker fSendEpsInstant;
64         private final CdmRateTicker fSendEpsShort;
65         private final CdmRateTicker fSendEpsLong;
66
67         private final CdmCounter fKafkaConsumerCacheMiss;
68         private final CdmCounter fKafkaConsumerCacheHit;
69
70         private final CdmCounter fKafkaConsumerClaimed;
71         private final CdmCounter fKafkaConsumerTimeout;
72
73         private final CdmSimpleMetric fFanOutRatio;
74
75         private final HashMap<String, CdmRateTicker> fPathUseRates;
76         private final HashMap<String, CdmMovingAverage> fPathAvgs;
77
78         private rrNvReadable fSettings;
79
80         private final ScheduledExecutorService fScheduler;
81
82         /**
83          * Constructor initialization
84          * 
85          * @param cs
86          */
87         
88                 public DMaaPMetricsSet(rrNvReadable cs) {
89                 
90                 fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
91                 super.putItem("version", fVersion);
92
93                 final long startTime = System.currentTimeMillis();
94                 final Date d = new Date(startTime);
95                 final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
96                 fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
97                 super.putItem("startTime", fStartTime);
98
99                 fUpTime = new CdmTimeSince("seconds since start");
100                 super.putItem("upTime", fUpTime);
101
102                 fRecvTotal = new CdmCounter("Total events received since start");
103                 super.putItem("recvTotalEvents", fRecvTotal);
104
105                 fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
106                 super.putItem("recvEpsInstant", fRecvEpsInstant);
107
108                 fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
109                 super.putItem("recvEpsShort", fRecvEpsShort);
110
111                 fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
112                 super.putItem("recvEpsLong", fRecvEpsLong);
113
114                 fSendTotal = new CdmCounter("Total events sent since start");
115                 super.putItem("sendTotalEvents", fSendTotal);
116
117                 fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
118                 super.putItem("sendEpsInstant", fSendEpsInstant);
119
120                 fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
121                 super.putItem("sendEpsShort", fSendEpsShort);
122
123                 fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
124                 super.putItem("sendEpsLong", fSendEpsLong);
125
126                 fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
127                 super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
128
129                 fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
130                 super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
131
132                 fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
133                 super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
134
135                 fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
136                 super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
137
138                 // FIXME: CdmLevel is not exactly a great choice
139                 fFanOutRatio = new CdmSimpleMetric() {
140                         @Override
141                         public String getRawValueString() {
142                                 return getRawValue().toString();
143                         }
144
145                         @Override
146                         public Number getRawValue() {
147                                 final double s = fSendTotal.getValue();
148                                 final double r = fRecvTotal.getValue();
149                                 return r == 0.0 ? 0.0 : s / r;
150                         }
151
152                         @Override
153                         public String summarize() {
154                                 return getRawValueString() + " sends per recv";
155                         }
156
157                 };
158                 super.putItem("fanOut", fFanOutRatio);
159
160                 // these are added to the metrics catalog as they're discovered
161                 fPathUseRates = new HashMap<String, CdmRateTicker>();
162                 fPathAvgs = new HashMap<String, CdmMovingAverage>();
163
164                 fScheduler = Executors.newScheduledThreadPool(1);
165         }
166
167         @Override
168         public void setupCambriaSender() {
169                 DMaaPMetricsSender.sendPeriodically(fScheduler, this,  "cambria.apinode.metrics.dmaap");
170         }
171
172         @Override
173         public void onRouteComplete(String name, long durationMs) {
174                 CdmRateTicker ticker = fPathUseRates.get(name);
175                 if (ticker == null) {
176                         ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
177                         fPathUseRates.put(name, ticker);
178                         super.putItem("pathUse_" + name, ticker);
179                 }
180                 ticker.tick();
181
182                 CdmMovingAverage durs = fPathAvgs.get(name);
183                 if (durs == null) {
184                         durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
185                         fPathAvgs.put(name, durs);
186                         super.putItem("pathDurationMs_" + name, durs);
187                 }
188                 durs.tick(durationMs);
189         }
190
191         @Override
192         public void publishTick(int amount) {
193                 if (amount > 0) {
194                         fRecvTotal.bumpBy(amount);
195                         fRecvEpsInstant.tick(amount);
196                         fRecvEpsShort.tick(amount);
197                         fRecvEpsLong.tick(amount);
198                 }
199         }
200
201         @Override
202         public void consumeTick(int amount) {
203                 if (amount > 0) {
204                         fSendTotal.bumpBy(amount);
205                         fSendEpsInstant.tick(amount);
206                         fSendEpsShort.tick(amount);
207                         fSendEpsLong.tick(amount);
208                 }
209         }
210
211         @Override
212         public void onKafkaConsumerCacheMiss() {
213                 fKafkaConsumerCacheMiss.bump();
214         }
215
216         @Override
217         public void onKafkaConsumerCacheHit() {
218                 fKafkaConsumerCacheHit.bump();
219         }
220
221         @Override
222         public void onKafkaConsumerClaimed() {
223                 fKafkaConsumerClaimed.bump();
224         }
225
226         @Override
227         public void onKafkaConsumerTimeout() {
228                 fKafkaConsumerTimeout.bump();
229         }
230
231 }