DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPMetricsSet.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
24 import com.att.nsa.drumlin.till.nv.rrNvReadable;
25 import com.att.nsa.metrics.impl.*;
26 import org.onap.dmaap.dmf.mr.CambriaApiVersionInfo;
27 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
28 import org.onap.dmaap.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
29
30 import java.text.SimpleDateFormat;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.TimeUnit;
36
37 /*@Component("dMaaPMetricsSet")*/
38
39 /**
40  * Metrics related information
41  * 
42  * @author anowarul.islam
43  *
44  */
45 public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
46
47         private final CdmStringConstant fVersion;
48         private final CdmConstant fStartTime;
49         private final CdmTimeSince fUpTime;
50
51         private final CdmCounter fRecvTotal;
52         private final CdmRateTicker fRecvEpsInstant;
53         private final CdmRateTicker fRecvEpsShort;
54         private final CdmRateTicker fRecvEpsLong;
55
56         private final CdmCounter fSendTotal;
57         private final CdmRateTicker fSendEpsInstant;
58         private final CdmRateTicker fSendEpsShort;
59         private final CdmRateTicker fSendEpsLong;
60
61         private final CdmCounter fKafkaConsumerCacheMiss;
62         private final CdmCounter fKafkaConsumerCacheHit;
63
64         private final CdmCounter fKafkaConsumerClaimed;
65         private final CdmCounter fKafkaConsumerTimeout;
66
67         private final CdmSimpleMetric fFanOutRatio;
68
69         private final HashMap<String, CdmRateTicker> fPathUseRates;
70         private final HashMap<String, CdmMovingAverage> fPathAvgs;
71
72         private rrNvReadable fSettings;
73
74         private final ScheduledExecutorService fScheduler;
75
76         /**
77          * Constructor initialization
78          * 
79          * @param cs
80          */
81         
82                 public DMaaPMetricsSet(rrNvReadable cs) {
83                 
84                 fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
85                 super.putItem("version", fVersion);
86
87                 final long startTime = System.currentTimeMillis();
88                 final Date d = new Date(startTime);
89                 final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
90                 fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
91                 super.putItem("startTime", fStartTime);
92
93                 fUpTime = new CdmTimeSince("seconds since start");
94                 super.putItem("upTime", fUpTime);
95
96                 fRecvTotal = new CdmCounter("Total events received since start");
97                 super.putItem("recvTotalEvents", fRecvTotal);
98
99                 fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
100                 super.putItem("recvEpsInstant", fRecvEpsInstant);
101
102                 fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
103                 super.putItem("recvEpsShort", fRecvEpsShort);
104
105                 fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
106                 super.putItem("recvEpsLong", fRecvEpsLong);
107
108                 fSendTotal = new CdmCounter("Total events sent since start");
109                 super.putItem("sendTotalEvents", fSendTotal);
110
111                 fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
112                 super.putItem("sendEpsInstant", fSendEpsInstant);
113
114                 fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
115                 super.putItem("sendEpsShort", fSendEpsShort);
116
117                 fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
118                 super.putItem("sendEpsLong", fSendEpsLong);
119
120                 fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
121                 super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
122
123                 fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
124                 super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
125
126                 fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
127                 super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
128
129                 fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
130                 super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
131
132                 // FIXME: CdmLevel is not exactly a great choice
133                 fFanOutRatio = new CdmSimpleMetric() {
134                         @Override
135                         public String getRawValueString() {
136                                 return getRawValue().toString();
137                         }
138
139                         @Override
140                         public Number getRawValue() {
141                                 final double s = fSendTotal.getValue();
142                                 final double r = fRecvTotal.getValue();
143                                 return r == 0.0 ? 0.0 : s / r;
144                         }
145
146                         @Override
147                         public String summarize() {
148                                 return getRawValueString() + " sends per recv";
149                         }
150
151                 };
152                 super.putItem("fanOut", fFanOutRatio);
153
154                 // these are added to the metrics catalog as they're discovered
155                 fPathUseRates = new HashMap<String, CdmRateTicker>();
156                 fPathAvgs = new HashMap<String, CdmMovingAverage>();
157
158                 fScheduler = Executors.newScheduledThreadPool(1);
159         }
160
161         @Override
162         public void setupCambriaSender() {
163                 DMaaPMetricsSender.sendPeriodically(fScheduler, this,  "cambria.apinode.metrics.dmaap");
164         }
165
166         @Override
167         public void onRouteComplete(String name, long durationMs) {
168                 CdmRateTicker ticker = fPathUseRates.get(name);
169                 if (ticker == null) {
170                         ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
171                         fPathUseRates.put(name, ticker);
172                         super.putItem("pathUse_" + name, ticker);
173                 }
174                 ticker.tick();
175
176                 CdmMovingAverage durs = fPathAvgs.get(name);
177                 if (durs == null) {
178                         durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
179                         fPathAvgs.put(name, durs);
180                         super.putItem("pathDurationMs_" + name, durs);
181                 }
182                 durs.tick(durationMs);
183         }
184
185         @Override
186         public void publishTick(int amount) {
187                 if (amount > 0) {
188                         fRecvTotal.bumpBy(amount);
189                         fRecvEpsInstant.tick(amount);
190                         fRecvEpsShort.tick(amount);
191                         fRecvEpsLong.tick(amount);
192                 }
193         }
194
195         @Override
196         public void consumeTick(int amount) {
197                 if (amount > 0) {
198                         fSendTotal.bumpBy(amount);
199                         fSendEpsInstant.tick(amount);
200                         fSendEpsShort.tick(amount);
201                         fSendEpsLong.tick(amount);
202                 }
203         }
204
205         @Override
206         public void onKafkaConsumerCacheMiss() {
207                 fKafkaConsumerCacheMiss.bump();
208         }
209
210         @Override
211         public void onKafkaConsumerCacheHit() {
212                 fKafkaConsumerCacheHit.bump();
213         }
214
215         @Override
216         public void onKafkaConsumerClaimed() {
217                 fKafkaConsumerClaimed.bump();
218         }
219
220         @Override
221         public void onKafkaConsumerTimeout() {
222                 fKafkaConsumerTimeout.bump();
223         }
224
225 }