bf1a7ee1fe832f0f8a1bfc20bc9c343d385e0796
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.sync;
24
25 import java.util.EnumSet;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.onap.aai.cl.api.Logger;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sparky.dal.rest.HttpMethod;
40 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
41 import org.onap.aai.sparky.logging.AaiUiMsgs;
42 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
43 import org.onap.aai.sparky.util.NodeUtils;
44
45 import com.fasterxml.jackson.databind.ObjectMapper;
46
47 /**
48  * The Class AbstractEntitySynchronizer.
49  *
50  * @author davea.
51  */
52 public abstract class AbstractEntitySynchronizer {
53
54   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
55   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
56
57   protected final Logger logger;
58   protected ObjectMapper mapper;
59   protected long syncDurationInMs;
60
61   /**
62    * The Enum StatFlag.
63    */
64   protected enum StatFlag {
65     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS, AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
66   }
67
68   protected EnumSet<StatFlag> enabledStatFlags;
69
70   protected ElasticSearchAdapter elasticSearchAdapter;
71   protected ActiveInventoryAdapter aaiAdapter;
72
73   protected ExecutorService synchronizerExecutor;
74   protected ExecutorService aaiExecutor;
75   protected ExecutorService esExecutor;
76
77   private RestOperationalStatistics esRestStats;
78   protected ElasticSearchEntityStatistics esEntityStats;
79
80   private RestOperationalStatistics aaiRestStats;
81   protected ActiveInventoryEntityStatistics aaiEntityStats;
82   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
83
84   private TaskProcessingStats aaiTaskProcessingStats;
85   private TaskProcessingStats esTaskProcessingStats;
86
87   private TransactionRateMonitor aaiTransactionRateController;
88   private TransactionRateMonitor esTransactionRateController;
89
90   protected AtomicInteger aaiWorkOnHand;
91   protected AtomicInteger esWorkOnHand;
92   protected String synchronizerName;
93
94   protected abstract boolean isSyncDone();
95
96   protected boolean shouldSkipSync;
97
98   public String getActiveInventoryStatisticsReport() {
99
100     StringBuilder sb = new StringBuilder(128);
101
102     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
103       sb.append("\n\n        ").append("REST Operational Stats:");
104       sb.append(aaiRestStats.getStatisticsReport());
105     }
106
107     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
108       sb.append("\n\n        ").append("Entity Stats:");
109       sb.append(aaiEntityStats.getStatisticsReport());
110     }
111
112     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
113       sb.append("\n\n        ").append("Processing Exception Stats:");
114       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
115     }
116
117     return sb.toString();
118
119   }
120
121   public String getElasticSearchStatisticsReport() {
122
123     StringBuilder sb = new StringBuilder(128);
124
125     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
126       sb.append("\n\n        ").append("REST Operational Stats:");
127       sb.append(esRestStats.getStatisticsReport());
128     }
129
130     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
131       sb.append("\n\n        ").append("Entity Stats:");
132       sb.append(esEntityStats.getStatisticsReport());
133     }
134
135     return sb.toString();
136
137   }
138
139   /**
140    * Adds the active inventory stat report.
141    *
142    * @param sb the sb
143    */
144   private void addActiveInventoryStatReport(StringBuilder sb) {
145
146     if (sb == null) {
147       return;
148     }
149
150     sb.append("\n\n    AAI");
151     sb.append(getActiveInventoryStatisticsReport());
152
153     double currentTps = 0;
154     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
155       sb.append("\n\n        ").append("Task Processor Stats:");
156       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
157
158       currentTps = aaiTransactionRateController.getCurrentTps();
159
160       sb.append("\n          ").append("Current TPS: ").append(currentTps);
161     }
162
163     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
164
165     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
166       if (currentTps > 0) {
167         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
168         sb.append("\n            ").append("SyncDurationRemaining=")
169             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
170       }
171     }
172
173   }
174
175   /**
176    * Adds the elastic stat report.
177    *
178    * @param sb the sb
179    */
180   private void addElasticStatReport(StringBuilder sb) {
181
182     if (sb == null) {
183       return;
184     }
185
186     sb.append("\n\n    ELASTIC");
187     sb.append(getElasticSearchStatisticsReport());
188
189     double currentTps = 0;
190
191     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
192       sb.append("\n\n        ").append("Task Processor Stats:");
193       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
194
195       currentTps = esTransactionRateController.getCurrentTps();
196
197       sb.append("\n        ").append("Current TPS: ").append(currentTps);
198     }
199
200     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
201
202     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
203       if (currentTps > 0) {
204         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
205         sb.append("\n            ").append("SyncDurationRemaining=")
206             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
207       }
208     }
209
210
211   }
212
213   /**
214    * Gets the stat report.
215    *
216    * @param syncOpTimeInMs the sync op time in ms
217    * @param showFinalReport the show final report
218    * @return the stat report
219    */
220   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
221
222     StringBuilder sb = new StringBuilder(128);
223
224     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
225         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
226
227     addActiveInventoryStatReport(sb);
228     addElasticStatReport(sb);
229
230     if (showFinalReport) {
231       sb.append("\n\n        ").append("Sync Completed!\n");
232     } else {
233       sb.append("\n\n        ").append("Sync in Progress...\n");
234     }
235
236     return sb.toString();
237
238   }
239
240   protected String indexName;
241   protected long syncStartedTimeStampInMs;
242
243   /**
244    * Instantiates a new abstract entity synchronizer.
245    *
246    * @param logger the logger
247    * @param syncName the sync name
248    * @param numSyncWorkers the num sync workers
249    * @param numActiveInventoryWorkers the num active inventory workers
250    * @param numElasticsearchWorkers the num elasticsearch workers
251    * @param indexName the index name
252    * @throws Exception the exception
253    */
254   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
255       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
256       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
257       throws Exception {
258     this.logger = logger;
259     this.synchronizerExecutor =
260         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
261     this.aaiExecutor =
262         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
263     this.esExecutor =
264         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
265     this.mapper = new ObjectMapper();
266     this.indexName = indexName;
267     this.esRestStats = new RestOperationalStatistics();
268     this.esEntityStats = new ElasticSearchEntityStatistics();
269     this.aaiRestStats = new RestOperationalStatistics();
270     this.aaiEntityStats = new ActiveInventoryEntityStatistics();
271     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
272     this.aaiTaskProcessingStats = new TaskProcessingStats(aaiStatConfig);
273     this.esTaskProcessingStats = new TaskProcessingStats(esStatConfig);
274
275     this.aaiTransactionRateController =
276         new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
277     this.esTransactionRateController =
278         new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
279
280     this.aaiWorkOnHand = new AtomicInteger(0);
281     this.esWorkOnHand = new AtomicInteger(0);
282
283     enabledStatFlags = EnumSet.allOf(StatFlag.class);
284
285     this.synchronizerName = "Abstact Entity Synchronizer";
286
287     String txnID = NodeUtils.getRandomTxnId();
288     MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
289
290     this.shouldSkipSync = false;
291     this.syncStartedTimeStampInMs = System.currentTimeMillis();
292     this.syncDurationInMs = -1;
293   }
294
295   public boolean shouldSkipSync() {
296     return shouldSkipSync;
297   }
298
299   public void setShouldSkipSync(boolean shouldSkipSync) {
300     this.shouldSkipSync = shouldSkipSync;
301   }
302
303   /**
304    * Inc active inventory work on hand counter.
305    */
306   protected void incActiveInventoryWorkOnHandCounter() {
307     aaiWorkOnHand.incrementAndGet();
308   }
309
310   /**
311    * Dec active inventory work on hand counter.
312    */
313   protected void decActiveInventoryWorkOnHandCounter() {
314     aaiWorkOnHand.decrementAndGet();
315   }
316
317   /**
318    * Inc elastic search work on hand counter.
319    */
320   protected void incElasticSearchWorkOnHandCounter() {
321     esWorkOnHand.incrementAndGet();
322   }
323
324   /**
325    * Dec elastic search work on hand counter.
326    */
327   protected void decElasticSearchWorkOnHandCounter() {
328     esWorkOnHand.decrementAndGet();
329   }
330
331   /**
332    * Shutdown executors.
333    */
334   protected void shutdownExecutors() {
335     try {
336
337       if (synchronizerExecutor != null) {
338         synchronizerExecutor.shutdown();
339       }
340
341       if (aaiExecutor != null) {
342         aaiExecutor.shutdown();
343       }
344
345       if (esExecutor != null) {
346         esExecutor.shutdown();
347       }
348
349     } catch (Exception exc) {
350       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc);
351     }
352   }
353
354   /**
355    * Clear cache.
356    */
357   public void clearCache() {}
358
359   public ElasticSearchAdapter getElasticSearchAdapter() {
360     return elasticSearchAdapter;
361   }
362
363   public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
364     this.elasticSearchAdapter = elasticSearchAdapter;
365   }
366
367   public ActiveInventoryAdapter getAaiAdapter() {
368     return aaiAdapter;
369   }
370
371   public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
372     this.aaiAdapter = aaiAdapter;
373   }
374
375   /**
376    * Gets the elastic full url.
377    *
378    * @param resourceUrl the resource url
379    * @param indexName the index name
380    * @param indexType the index type
381    * @return the elastic full url
382    * @throws Exception the exception
383    */
384   protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
385       throws Exception {
386     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
387   }
388
389   /**
390    * Gets the elastic full url.
391    *
392    * @param resourceUrl the resource url
393    * @param indexName the index name
394    * @return the elastic full url
395    * @throws Exception the exception
396    */
397   protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
398     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
399   }
400
401   public String getIndexName() {
402     return indexName;
403   }
404
405   public void setIndexName(String indexName) {
406     this.indexName = indexName;
407   }
408
409
410   /**
411    * Gets the response length.
412    *
413    * @param txn the txn
414    * @return the response length
415    */
416   private long getResponseLength(NetworkTransaction txn) {
417
418     if (txn == null) {
419       return -1;
420     }
421
422     OperationResult result = txn.getOperationResult();
423
424     if (result == null) {
425       return -1;
426     }
427
428     if (result.getResult() != null) {
429       return result.getResult().length();
430     }
431
432     return -1;
433   }
434
435   /**
436    * Update elastic search counters.
437    *
438    * @param method the method
439    * @param or the or
440    */
441   protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
442     updateElasticSearchCounters(new NetworkTransaction(method, null, or));
443   }
444
445   /**
446    * Update elastic search counters.
447    *
448    * @param method the method
449    * @param entityType the entity type
450    * @param or the or
451    */
452   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
453       OperationResult or) {
454     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
455   }
456
457   /**
458    * Update elastic search counters.
459    *
460    * @param txn the txn
461    */
462   protected void updateElasticSearchCounters(NetworkTransaction txn) {
463
464     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
465       esRestStats.updateCounters(txn);
466     }
467
468     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
469       esEntityStats.updateCounters(txn);
470     }
471
472     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
473
474       esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
475
476       esTaskProcessingStats.updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
477       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
478
479       // don't know the cost of the lengh calc, we'll see if it causes a
480       // problem
481
482       long responsePayloadSizeInBytes = getResponseLength(txn);
483       if (responsePayloadSizeInBytes >= 0) {
484         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
485       }
486
487       esTaskProcessingStats
488           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
489     }
490   }
491
492   /**
493    * Update active inventory counters.
494    *
495    * @param method the method
496    * @param or the or
497    */
498   protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
499     updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
500   }
501
502   /**
503    * Update active inventory counters.
504    *
505    * @param method the method
506    * @param entityType the entity type
507    * @param or the or
508    */
509   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
510       OperationResult or) {
511     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
512   }
513
514   /**
515    * Update active inventory counters.
516    *
517    * @param txn the txn
518    */
519   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
520
521     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
522       aaiRestStats.updateCounters(txn);
523     }
524
525     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
526       aaiEntityStats.updateCounters(txn);
527     }
528
529     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
530       aaiProcessingExceptionStats.updateCounters(txn);
531     }
532
533     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
534       aaiTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
535
536       aaiTaskProcessingStats.updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
537       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
538
539       // don't know the cost of the lengh calc, we'll see if it causes a
540       // problem
541
542       long responsePayloadSizeInBytes = getResponseLength(txn);
543       if (responsePayloadSizeInBytes >= 0) {
544         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
545       }
546
547       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
548           (long) aaiTransactionRateController.getCurrentTps());
549     }
550   }
551
552   /**
553    * Reset counters.
554    */
555   protected void resetCounters() {
556     aaiRestStats.reset();
557     aaiEntityStats.reset();
558     aaiProcessingExceptionStats.reset();
559
560     esRestStats.reset();
561     esEntityStats.reset();
562   }
563
564 }