Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.sync;
26
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.onap.aai.cl.api.Logger;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.restclient.client.OperationResult;
34 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
35 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
36 import org.onap.aai.sparky.dal.NetworkTransaction;
37 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
38 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
39 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
42 import org.onap.aai.sparky.logging.AaiUiMsgs;
43 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
44 import org.onap.aai.sparky.util.NodeUtils;
45
46 import com.fasterxml.jackson.databind.ObjectMapper;
47
48 /**
49  * The Class AbstractEntitySynchronizer.
50  *
51  * @author davea.
52  */
53 public abstract class AbstractEntitySynchronizer {
54
55   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
56   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
57
58   protected final Logger logger;
59   protected ObjectMapper mapper;
60   protected long syncDurationInMs;
61
62   /**
63    * The Enum StatFlag.
64    */
65   protected enum StatFlag {
66     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
67     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
68   }
69
70   protected EnumSet<StatFlag> enabledStatFlags;
71
72   protected ElasticSearchAdapter elasticSearchAdapter;
73   protected ActiveInventoryAdapter aaiAdapter;
74
75   protected ExecutorService synchronizerExecutor;
76   protected ExecutorService aaiExecutor;
77   protected ExecutorService esExecutor;
78
79   private RestOperationalStatistics esRestStats;
80   protected ElasticSearchEntityStatistics esEntityStats;
81
82   private RestOperationalStatistics aaiRestStats;
83   protected ActiveInventoryEntityStatistics aaiEntityStats;
84   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
85
86   private TaskProcessingStats aaiTaskProcessingStats;
87   private TaskProcessingStats esTaskProcessingStats;
88
89   private TransactionRateMonitor aaiTransactionRateController;
90   private TransactionRateMonitor esTransactionRateController;
91
92   protected AtomicInteger aaiWorkOnHand;
93   protected AtomicInteger esWorkOnHand;
94   protected String synchronizerName;
95
96   protected abstract boolean isSyncDone();
97   protected boolean shouldSkipSync;
98
99   public String getActiveInventoryStatisticsReport() {
100
101     StringBuilder sb = new StringBuilder(128);
102
103     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
104       sb.append("\n\n        ").append("REST Operational Stats:");
105       sb.append(aaiRestStats.getStatisticsReport());
106     }
107
108     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
109       sb.append("\n\n        ").append("Entity Stats:");
110       sb.append(aaiEntityStats.getStatisticsReport());
111     }
112
113     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
114       sb.append("\n\n        ").append("Processing Exception Stats:");
115       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
116     }
117
118     return sb.toString();
119
120   }
121
122   public String getElasticSearchStatisticsReport() {
123
124     StringBuilder sb = new StringBuilder(128);
125
126     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
127       sb.append("\n\n        ").append("REST Operational Stats:");
128       sb.append(esRestStats.getStatisticsReport());
129     }
130
131     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
132       sb.append("\n\n        ").append("Entity Stats:");
133       sb.append(esEntityStats.getStatisticsReport());
134     }
135
136     return sb.toString();
137
138   }
139
140   /**
141    * Adds the active inventory stat report.
142    *
143    * @param sb the sb
144    */
145   private void addActiveInventoryStatReport(StringBuilder sb) {
146
147     if (sb == null) {
148       return;
149     }
150
151     sb.append("\n\n    AAI");
152     sb.append(getActiveInventoryStatisticsReport());
153
154     double currentTps = 0;
155     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
156       sb.append("\n\n        ").append("Task Processor Stats:");
157       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
158
159       currentTps = aaiTransactionRateController.getCurrentTps();
160
161       sb.append("\n          ").append("Current TPS: ").append(currentTps);
162     }
163
164     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
165
166     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
167       if (currentTps > 0) {
168         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
169         sb.append("\n            ").append("SyncDurationRemaining=")
170             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
171       }
172     }
173
174   }
175
176   /**
177    * Adds the elastic stat report.
178    *
179    * @param sb the sb
180    */
181   private void addElasticStatReport(StringBuilder sb) {
182
183     if (sb == null) {
184       return;
185     }
186
187     sb.append("\n\n    ELASTIC");
188     sb.append(getElasticSearchStatisticsReport());
189
190     double currentTps = 0;
191
192     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
193       sb.append("\n\n        ").append("Task Processor Stats:");
194       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
195
196       currentTps = esTransactionRateController.getCurrentTps();
197
198       sb.append("\n        ").append("Current TPS: ").append(currentTps);
199     }
200
201     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
202
203     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
204       if (currentTps > 0) {
205         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
206         sb.append("\n            ").append("SyncDurationRemaining=")
207             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
208       }
209     }
210
211
212   }
213
214   /**
215    * Gets the stat report.
216    *
217    * @param syncOpTimeInMs the sync op time in ms
218    * @param showFinalReport the show final report
219    * @return the stat report
220    */
221   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
222
223     StringBuilder sb = new StringBuilder(128);
224
225     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
226         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
227
228     addActiveInventoryStatReport(sb);
229     addElasticStatReport(sb);
230
231     if (showFinalReport) {
232       sb.append("\n\n        ").append("Sync Completed!\n");
233     } else {
234       sb.append("\n\n        ").append("Sync in Progress...\n");
235     }
236
237     return sb.toString();
238
239   }
240
241   protected String indexName;
242   protected long syncStartedTimeStampInMs;
243
244   /**
245    * Instantiates a new abstract entity synchronizer.
246    *
247    * @param logger the logger
248    * @param syncName the sync name
249    * @param numSyncWorkers the num sync workers
250    * @param numActiveInventoryWorkers the num active inventory workers
251    * @param numElasticsearchWorkers the num elasticsearch workers
252    * @param indexName the index name
253    * @throws Exception the exception
254    */
255   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
256       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
257       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
258           throws Exception {
259     this.logger = logger;
260     this.synchronizerExecutor =
261         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
262     this.aaiExecutor =
263         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
264     this.esExecutor =
265         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
266     this.mapper = new ObjectMapper();
267     this.indexName = indexName; 
268     this.esRestStats = new RestOperationalStatistics();
269     this.esEntityStats = new ElasticSearchEntityStatistics();
270     this.aaiRestStats = new RestOperationalStatistics();
271     this.aaiEntityStats = new ActiveInventoryEntityStatistics();
272     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
273     this.aaiTaskProcessingStats =
274         new TaskProcessingStats(aaiStatConfig);
275     this.esTaskProcessingStats =
276         new TaskProcessingStats(esStatConfig);
277
278     this.aaiTransactionRateController =
279         new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
280     this.esTransactionRateController =
281         new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
282
283     this.aaiWorkOnHand = new AtomicInteger(0);
284     this.esWorkOnHand = new AtomicInteger(0);
285
286     enabledStatFlags = EnumSet.allOf(StatFlag.class);
287
288     this.synchronizerName = "Abstact Entity Synchronizer";
289     
290     String txnID = NodeUtils.getRandomTxnId();
291         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
292
293     this.shouldSkipSync = false;
294     this.syncStartedTimeStampInMs = System.currentTimeMillis();
295     this.syncDurationInMs = -1;
296   }
297
298   public boolean shouldSkipSync() {
299     return shouldSkipSync;
300   }
301
302   public void setShouldSkipSync(boolean shouldSkipSync) {
303     this.shouldSkipSync = shouldSkipSync;
304   }
305
306   /**
307    * Inc active inventory work on hand counter.
308    */
309   protected void incActiveInventoryWorkOnHandCounter() {
310     aaiWorkOnHand.incrementAndGet();
311   }
312
313   /**
314    * Dec active inventory work on hand counter.
315    */
316   protected void decActiveInventoryWorkOnHandCounter() {
317     aaiWorkOnHand.decrementAndGet();
318   }
319
320   /**
321    * Inc elastic search work on hand counter.
322    */
323   protected void incElasticSearchWorkOnHandCounter() {
324     esWorkOnHand.incrementAndGet();
325   }
326
327   /**
328    * Dec elastic search work on hand counter.
329    */
330   protected void decElasticSearchWorkOnHandCounter() {
331     esWorkOnHand.decrementAndGet();
332   }
333
334   /**
335    * Shutdown executors.
336    */
337   protected void shutdownExecutors() {
338     try {
339
340       if (synchronizerExecutor != null) {
341         synchronizerExecutor.shutdown();
342       }
343
344       if (aaiExecutor != null) {
345         aaiExecutor.shutdown();
346       }
347
348       if (esExecutor != null) {
349         esExecutor.shutdown();
350       }
351     
352     } catch (Exception exc) {
353       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
354     }
355   }
356
357   /**
358    * Clear cache.
359    */
360   public void clearCache() {}
361   
362   public ElasticSearchAdapter getElasticSearchAdapter() {
363     return elasticSearchAdapter;
364   }
365
366   public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
367     this.elasticSearchAdapter = elasticSearchAdapter;
368   }
369
370   public ActiveInventoryAdapter getAaiAdapter() {
371     return aaiAdapter;
372   }
373
374   public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
375     this.aaiAdapter = aaiAdapter;
376   }
377
378   public String getIndexName() {
379     return indexName;
380   }
381
382   public void setIndexName(String indexName) {
383     this.indexName = indexName;
384   }
385
386
387   /**
388    * Gets the response length.
389    *
390    * @param txn the txn
391    * @return the response length
392    */
393   private long getResponseLength(NetworkTransaction txn) {
394
395     if (txn == null) {
396       return -1;
397     }
398
399     OperationResult result = txn.getOperationResult();
400
401     if (result == null) {
402       return -1;
403     }
404
405     if (result.getResult() != null) {
406       return result.getResult().length();
407     }
408
409     return -1;
410   }
411
412   /**
413    * Update elastic search counters.
414    *
415    * @param method the method
416    * @param entityType the entity type
417    * @param or the or
418    */
419   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
420       OperationResult or) {
421     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
422   }
423
424   /**
425    * Update elastic search counters.
426    *
427    * @param txn the txn
428    */
429   protected void updateElasticSearchCounters(NetworkTransaction txn) {
430
431     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
432       esRestStats.updateCounters(txn);
433     }
434
435     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
436       esEntityStats.updateCounters(txn);
437     }
438
439     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
440
441       esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
442
443       esTaskProcessingStats
444           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
445       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
446
447       // don't know the cost of the lengh calc, we'll see if it causes a
448       // problem
449
450       long responsePayloadSizeInBytes = getResponseLength(txn);
451       if (responsePayloadSizeInBytes >= 0) {
452         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
453       }
454
455       esTaskProcessingStats
456           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
457     }
458   }
459
460   /**
461    * Update active inventory counters.
462    *
463    * @param method the method
464    * @param entityType the entity type
465    * @param or the or
466    */
467   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
468       OperationResult or) {
469     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
470   }
471
472   /**
473    * Update active inventory counters.
474    *
475    * @param txn the txn
476    */
477   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
478
479     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
480       aaiRestStats.updateCounters(txn);
481     }
482
483     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
484       aaiEntityStats.updateCounters(txn);
485     }
486
487     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
488       aaiProcessingExceptionStats.updateCounters(txn);
489     }
490
491     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
492       aaiTransactionRateController
493           .trackResponseTime(txn.getOpTimeInMs());
494
495       aaiTaskProcessingStats
496           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
497       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
498
499       // don't know the cost of the lengh calc, we'll see if it causes a
500       // problem
501
502       long responsePayloadSizeInBytes = getResponseLength(txn);
503       if (responsePayloadSizeInBytes >= 0) {
504         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
505       }
506
507       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
508           (long) aaiTransactionRateController.getCurrentTps());
509     }
510   }
511
512   /**
513    * Reset counters.
514    */
515   protected void resetCounters() {
516     aaiRestStats.reset();
517     aaiEntityStats.reset();
518     aaiProcessingExceptionStats.reset();
519
520     esRestStats.reset();
521     esEntityStats.reset();
522   }
523
524 }