Update license and poms
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import java.util.EnumSet;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.onap.aai.cl.api.Logger;
28 import org.onap.aai.cl.mdc.MdcContext;
29 import org.onap.aai.restclient.client.OperationResult;
30 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
31 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
34 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
35 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
36 import org.onap.aai.sparky.dal.rest.HttpMethod;
37 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
40 import org.onap.aai.sparky.util.NodeUtils;
41
42 import com.fasterxml.jackson.databind.ObjectMapper;
43
44 /**
45  * The Class AbstractEntitySynchronizer.
46  *
47  * @author davea.
48  */
49 public abstract class AbstractEntitySynchronizer {
50
51   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
52   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
53
54   protected final Logger logger;
55   protected ObjectMapper mapper;
56   protected long syncDurationInMs;
57
58   /**
59    * The Enum StatFlag.
60    */
61   protected enum StatFlag {
62     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
63     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
64   }
65
66   protected EnumSet<StatFlag> enabledStatFlags;
67
68   protected ElasticSearchAdapter elasticSearchAdapter;
69   protected ActiveInventoryAdapter aaiAdapter;
70
71   protected ExecutorService synchronizerExecutor;
72   protected ExecutorService aaiExecutor;
73   protected ExecutorService esExecutor;
74
75   private RestOperationalStatistics esRestStats;
76   protected ElasticSearchEntityStatistics esEntityStats;
77
78   private RestOperationalStatistics aaiRestStats;
79   protected ActiveInventoryEntityStatistics aaiEntityStats;
80   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
81
82   private TaskProcessingStats aaiTaskProcessingStats;
83   private TaskProcessingStats esTaskProcessingStats;
84
85   private TransactionRateMonitor aaiTransactionRateController;
86   private TransactionRateMonitor esTransactionRateController;
87
88   protected AtomicInteger aaiWorkOnHand;
89   protected AtomicInteger esWorkOnHand;
90   protected String synchronizerName;
91
92   protected abstract boolean isSyncDone();
93   protected boolean shouldSkipSync;
94
95   public String getActiveInventoryStatisticsReport() {
96
97     StringBuilder sb = new StringBuilder(128);
98
99     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
100       sb.append("\n\n        ").append("REST Operational Stats:");
101       sb.append(aaiRestStats.getStatisticsReport());
102     }
103
104     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
105       sb.append("\n\n        ").append("Entity Stats:");
106       sb.append(aaiEntityStats.getStatisticsReport());
107     }
108
109     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
110       sb.append("\n\n        ").append("Processing Exception Stats:");
111       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
112     }
113
114     return sb.toString();
115
116   }
117
118   public String getElasticSearchStatisticsReport() {
119
120     StringBuilder sb = new StringBuilder(128);
121
122     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
123       sb.append("\n\n        ").append("REST Operational Stats:");
124       sb.append(esRestStats.getStatisticsReport());
125     }
126
127     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
128       sb.append("\n\n        ").append("Entity Stats:");
129       sb.append(esEntityStats.getStatisticsReport());
130     }
131
132     return sb.toString();
133
134   }
135
136   /**
137    * Adds the active inventory stat report.
138    *
139    * @param sb the sb
140    */
141   private void addActiveInventoryStatReport(StringBuilder sb) {
142
143     if (sb == null) {
144       return;
145     }
146
147     sb.append("\n\n    AAI");
148     sb.append(getActiveInventoryStatisticsReport());
149
150     double currentTps = 0;
151     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
152       sb.append("\n\n        ").append("Task Processor Stats:");
153       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
154
155       currentTps = aaiTransactionRateController.getCurrentTps();
156
157       sb.append("\n          ").append("Current TPS: ").append(currentTps);
158     }
159
160     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
161
162     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
163       if (currentTps > 0) {
164         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
165         sb.append("\n            ").append("SyncDurationRemaining=")
166             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
167       }
168     }
169
170   }
171
172   /**
173    * Adds the elastic stat report.
174    *
175    * @param sb the sb
176    */
177   private void addElasticStatReport(StringBuilder sb) {
178
179     if (sb == null) {
180       return;
181     }
182
183     sb.append("\n\n    ELASTIC");
184     sb.append(getElasticSearchStatisticsReport());
185
186     double currentTps = 0;
187
188     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
189       sb.append("\n\n        ").append("Task Processor Stats:");
190       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
191
192       currentTps = esTransactionRateController.getCurrentTps();
193
194       sb.append("\n        ").append("Current TPS: ").append(currentTps);
195     }
196
197     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
198
199     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
200       if (currentTps > 0) {
201         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
202         sb.append("\n            ").append("SyncDurationRemaining=")
203             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
204       }
205     }
206
207
208   }
209
210   /**
211    * Gets the stat report.
212    *
213    * @param syncOpTimeInMs the sync op time in ms
214    * @param showFinalReport the show final report
215    * @return the stat report
216    */
217   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
218
219     StringBuilder sb = new StringBuilder(128);
220
221     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
222         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
223
224     addActiveInventoryStatReport(sb);
225     addElasticStatReport(sb);
226
227     if (showFinalReport) {
228       sb.append("\n\n        ").append("Sync Completed!\n");
229     } else {
230       sb.append("\n\n        ").append("Sync in Progress...\n");
231     }
232
233     return sb.toString();
234
235   }
236
237   protected String indexName;
238   protected long syncStartedTimeStampInMs;
239
240   /**
241    * Instantiates a new abstract entity synchronizer.
242    *
243    * @param logger the logger
244    * @param syncName the sync name
245    * @param numSyncWorkers the num sync workers
246    * @param numActiveInventoryWorkers the num active inventory workers
247    * @param numElasticsearchWorkers the num elasticsearch workers
248    * @param indexName the index name
249    * @throws Exception the exception
250    */
251   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
252       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
253       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
254           throws Exception {
255     this.logger = logger;
256     this.synchronizerExecutor =
257         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
258     this.aaiExecutor =
259         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
260     this.esExecutor =
261         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
262     this.mapper = new ObjectMapper();
263     this.indexName = indexName; 
264     this.esRestStats = new RestOperationalStatistics();
265     this.esEntityStats = new ElasticSearchEntityStatistics();
266     this.aaiRestStats = new RestOperationalStatistics();
267     this.aaiEntityStats = new ActiveInventoryEntityStatistics();
268     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
269     this.aaiTaskProcessingStats =
270         new TaskProcessingStats(aaiStatConfig);
271     this.esTaskProcessingStats =
272         new TaskProcessingStats(esStatConfig);
273
274     this.aaiTransactionRateController =
275         new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
276     this.esTransactionRateController =
277         new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
278
279     this.aaiWorkOnHand = new AtomicInteger(0);
280     this.esWorkOnHand = new AtomicInteger(0);
281
282     enabledStatFlags = EnumSet.allOf(StatFlag.class);
283
284     this.synchronizerName = "Abstact Entity Synchronizer";
285     
286     String txnID = NodeUtils.getRandomTxnId();
287         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
288
289     this.shouldSkipSync = false;
290     this.syncStartedTimeStampInMs = System.currentTimeMillis();
291     this.syncDurationInMs = -1;
292   }
293
294   public boolean shouldSkipSync() {
295     return shouldSkipSync;
296   }
297
298   public void setShouldSkipSync(boolean shouldSkipSync) {
299     this.shouldSkipSync = shouldSkipSync;
300   }
301
302   /**
303    * Inc active inventory work on hand counter.
304    */
305   protected void incActiveInventoryWorkOnHandCounter() {
306     aaiWorkOnHand.incrementAndGet();
307   }
308
309   /**
310    * Dec active inventory work on hand counter.
311    */
312   protected void decActiveInventoryWorkOnHandCounter() {
313     aaiWorkOnHand.decrementAndGet();
314   }
315
316   /**
317    * Inc elastic search work on hand counter.
318    */
319   protected void incElasticSearchWorkOnHandCounter() {
320     esWorkOnHand.incrementAndGet();
321   }
322
323   /**
324    * Dec elastic search work on hand counter.
325    */
326   protected void decElasticSearchWorkOnHandCounter() {
327     esWorkOnHand.decrementAndGet();
328   }
329
330   /**
331    * Shutdown executors.
332    */
333   protected void shutdownExecutors() {
334     try {
335
336       if (synchronizerExecutor != null) {
337         synchronizerExecutor.shutdown();
338       }
339
340       if (aaiExecutor != null) {
341         aaiExecutor.shutdown();
342       }
343
344       if (esExecutor != null) {
345         esExecutor.shutdown();
346       }
347     
348     } catch (Exception exc) {
349       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
350     }
351   }
352
353   /**
354    * Clear cache.
355    */
356   public void clearCache() {}
357   
358   public ElasticSearchAdapter getElasticSearchAdapter() {
359     return elasticSearchAdapter;
360   }
361
362   public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
363     this.elasticSearchAdapter = elasticSearchAdapter;
364   }
365
366   public ActiveInventoryAdapter getAaiAdapter() {
367     return aaiAdapter;
368   }
369
370   public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
371     this.aaiAdapter = aaiAdapter;
372   }
373
374   public String getIndexName() {
375     return indexName;
376   }
377
378   public void setIndexName(String indexName) {
379     this.indexName = indexName;
380   }
381
382
383   /**
384    * Gets the response length.
385    *
386    * @param txn the txn
387    * @return the response length
388    */
389   private long getResponseLength(NetworkTransaction txn) {
390
391     if (txn == null) {
392       return -1;
393     }
394
395     OperationResult result = txn.getOperationResult();
396
397     if (result == null) {
398       return -1;
399     }
400
401     if (result.getResult() != null) {
402       return result.getResult().length();
403     }
404
405     return -1;
406   }
407
408   /**
409    * Update elastic search counters.
410    *
411    * @param method the method
412    * @param entityType the entity type
413    * @param or the or
414    */
415   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
416       OperationResult or) {
417     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
418   }
419
420   /**
421    * Update elastic search counters.
422    *
423    * @param txn the txn
424    */
425   protected void updateElasticSearchCounters(NetworkTransaction txn) {
426
427     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
428       esRestStats.updateCounters(txn);
429     }
430
431     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
432       esEntityStats.updateCounters(txn);
433     }
434
435     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
436
437       esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
438
439       esTaskProcessingStats
440           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
441       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
442
443       // don't know the cost of the lengh calc, we'll see if it causes a
444       // problem
445
446       long responsePayloadSizeInBytes = getResponseLength(txn);
447       if (responsePayloadSizeInBytes >= 0) {
448         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
449       }
450
451       esTaskProcessingStats
452           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
453     }
454   }
455
456   /**
457    * Update active inventory counters.
458    *
459    * @param method the method
460    * @param entityType the entity type
461    * @param or the or
462    */
463   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
464       OperationResult or) {
465     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
466   }
467
468   /**
469    * Update active inventory counters.
470    *
471    * @param txn the txn
472    */
473   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
474
475     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
476       aaiRestStats.updateCounters(txn);
477     }
478
479     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
480       aaiEntityStats.updateCounters(txn);
481     }
482
483     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
484       aaiProcessingExceptionStats.updateCounters(txn);
485     }
486
487     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
488       aaiTransactionRateController
489           .trackResponseTime(txn.getOpTimeInMs());
490
491       aaiTaskProcessingStats
492           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
493       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
494
495       // don't know the cost of the lengh calc, we'll see if it causes a
496       // problem
497
498       long responsePayloadSizeInBytes = getResponseLength(txn);
499       if (responsePayloadSizeInBytes >= 0) {
500         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
501       }
502
503       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
504           (long) aaiTransactionRateController.getCurrentTps());
505     }
506   }
507
508   /**
509    * Reset counters.
510    */
511   protected void resetCounters() {
512     aaiRestStats.reset();
513     aaiEntityStats.reset();
514     aaiProcessingExceptionStats.reset();
515
516     esRestStats.reset();
517     esEntityStats.reset();
518   }
519
520 }