Merge "Fix Blocker/Critical sonar issues"
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sparky.synchronizer;
24
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.openecomp.cl.api.Logger;
32 import org.openecomp.sparky.config.oxm.OxmModelLoader;
33 import org.openecomp.sparky.dal.NetworkTransaction;
34 import org.openecomp.sparky.dal.aai.ActiveInventoryDataProvider;
35 import org.openecomp.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.openecomp.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
38 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchDataProvider;
39 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
40 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
41 import org.openecomp.sparky.dal.rest.HttpMethod;
42 import org.openecomp.sparky.dal.rest.OperationResult;
43 import org.openecomp.sparky.dal.rest.RestOperationalStatistics;
44 import org.openecomp.sparky.logging.AaiUiMsgs;
45 import org.openecomp.sparky.util.NodeUtils;
46
47 import org.openecomp.cl.mdc.MdcContext;
48 import com.fasterxml.jackson.databind.ObjectMapper;
49
50 /**
51  * The Class AbstractEntitySynchronizer.
52  *
53  * @author davea.
54  */
55 public abstract class AbstractEntitySynchronizer {
56
57   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
58   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
59
60   protected final Logger logger;
61   protected ObjectMapper mapper;
62   protected OxmModelLoader oxmModelLoader;
63   protected long syncDurationInMs;
64   /**
65    * The Enum StatFlag.
66    */
67   protected enum StatFlag {
68     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
69     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
70   }
71
72   protected EnumSet<StatFlag> enabledStatFlags;
73
74   protected ActiveInventoryDataProvider aaiDataProvider;
75   protected ElasticSearchDataProvider esDataProvider;
76
77   protected ExecutorService synchronizerExecutor;
78   protected ExecutorService aaiExecutor;
79   protected ExecutorService esExecutor;
80
81   private RestOperationalStatistics esRestStats;
82   protected ElasticSearchEntityStatistics esEntityStats;
83
84   private RestOperationalStatistics aaiRestStats;
85   protected ActiveInventoryEntityStatistics aaiEntityStats;
86   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
87
88   private TaskProcessingStats aaiTaskProcessingStats;
89   private TaskProcessingStats esTaskProcessingStats;
90
91   private TransactionRateController aaiTransactionRateController;
92   private TransactionRateController esTransactionRateController;
93
94   protected AtomicInteger aaiWorkOnHand;
95   protected AtomicInteger esWorkOnHand;
96   protected String synchronizerName;
97
98   protected abstract boolean isSyncDone();
99   protected boolean shouldSkipSync;
100   
101   public String getActiveInventoryStatisticsReport() {
102
103     StringBuilder sb = new StringBuilder(128);
104
105     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
106       sb.append("\n\n        ").append("REST Operational Stats:");
107       sb.append(aaiRestStats.getStatisticsReport());
108     }
109
110     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
111       sb.append("\n\n        ").append("Entity Stats:");
112       sb.append(aaiEntityStats.getStatisticsReport());
113     }
114
115     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
116       sb.append("\n\n        ").append("Processing Exception Stats:");
117       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
118     }
119
120     return sb.toString();
121
122   }
123
124   public String getElasticSearchStatisticsReport() {
125
126     StringBuilder sb = new StringBuilder(128);
127
128     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
129       sb.append("\n\n        ").append("REST Operational Stats:");
130       sb.append(esRestStats.getStatisticsReport());
131     }
132
133     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
134       sb.append("\n\n        ").append("Entity Stats:");
135       sb.append(esEntityStats.getStatisticsReport());
136     }
137
138     return sb.toString();
139
140   }
141
142   /**
143    * Adds the active inventory stat report.
144    *
145    * @param sb the sb
146    */
147   private void addActiveInventoryStatReport(StringBuilder sb) {
148
149     if (sb == null) {
150       return;
151     }
152
153     sb.append("\n\n    AAI");
154     sb.append(getActiveInventoryStatisticsReport());
155
156     double currentTps = 0;
157     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
158       sb.append("\n\n        ").append("Task Processor Stats:");
159       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
160
161       currentTps = aaiTransactionRateController.getCurrentTps();
162
163       sb.append("\n          ").append("Current TPS: ").append(currentTps);
164     }
165
166     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
167
168     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
169       if (currentTps > 0) {
170         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
171         sb.append("\n            ").append("SyncDurationRemaining=")
172             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
173       }
174     }
175
176   }
177
178   /**
179    * Adds the elastic stat report.
180    *
181    * @param sb the sb
182    */
183   private void addElasticStatReport(StringBuilder sb) {
184
185     if (sb == null) {
186       return;
187     }
188
189     sb.append("\n\n    ELASTIC");
190     sb.append(getElasticSearchStatisticsReport());
191
192     double currentTps = 0;
193
194     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
195       sb.append("\n\n        ").append("Task Processor Stats:");
196       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
197
198       currentTps = esTransactionRateController.getCurrentTps();
199
200       sb.append("\n        ").append("Current TPS: ").append(currentTps);
201     }
202
203     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
204
205     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
206       if (currentTps > 0) {
207         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
208         sb.append("\n            ").append("SyncDurationRemaining=")
209             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
210       }
211     }
212
213
214   }
215
216   /**
217    * Gets the stat report.
218    *
219    * @param syncOpTimeInMs the sync op time in ms
220    * @param showFinalReport the show final report
221    * @return the stat report
222    */
223   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
224
225     StringBuilder sb = new StringBuilder(128);
226
227     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
228         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
229
230     addActiveInventoryStatReport(sb);
231     addElasticStatReport(sb);
232
233     if (showFinalReport) {
234       sb.append("\n\n        ").append("Sync Completed!\n");
235     } else {
236       sb.append("\n\n        ").append("Sync in Progress...\n");
237     }
238
239     return sb.toString();
240
241   }
242
243   protected String indexName;
244   protected long syncStartedTimeStampInMs;
245
246   /**
247    * Instantiates a new abstract entity synchronizer.
248    *
249    * @param logger the logger
250    * @param syncName the sync name
251    * @param numSyncWorkers the num sync workers
252    * @param numActiveInventoryWorkers the num active inventory workers
253    * @param numElasticsearchWorkers the num elasticsearch workers
254    * @param indexName the index name
255    * @throws Exception the exception
256    */
257   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
258       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
259           throws Exception {
260     this.logger = logger;
261     this.synchronizerExecutor =
262         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
263     this.aaiExecutor =
264         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
265     this.esExecutor =
266         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
267     this.mapper = new ObjectMapper();
268     this.oxmModelLoader = OxmModelLoader.getInstance();
269     this.indexName = indexName;
270     this.esRestStats = new RestOperationalStatistics();
271     this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
272     this.aaiRestStats = new RestOperationalStatistics();
273     this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
274     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
275     this.aaiTaskProcessingStats =
276         new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
277     this.esTaskProcessingStats =
278         new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
279
280     this.aaiTransactionRateController =
281         new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
282     this.esTransactionRateController =
283         new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
284
285     this.aaiWorkOnHand = new AtomicInteger(0);
286     this.esWorkOnHand = new AtomicInteger(0);
287
288     enabledStatFlags = EnumSet.allOf(StatFlag.class);
289
290     this.synchronizerName = "Abstact Entity Synchronizer";
291     
292     String txnID = NodeUtils.getRandomTxnId();
293         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
294         
295         this.shouldSkipSync = false;
296     this.syncStartedTimeStampInMs = System.currentTimeMillis();
297     this.syncDurationInMs = -1;
298   }
299
300   public boolean shouldSkipSync() {
301     return shouldSkipSync;
302   }
303
304   public void setShouldSkipSync(boolean shouldSkipSync) {
305     this.shouldSkipSync = shouldSkipSync;
306   }
307
308   /**
309    * Inc active inventory work on hand counter.
310    */
311   protected void incActiveInventoryWorkOnHandCounter() {
312     aaiWorkOnHand.incrementAndGet();
313   }
314
315   /**
316    * Dec active inventory work on hand counter.
317    */
318   protected void decActiveInventoryWorkOnHandCounter() {
319     aaiWorkOnHand.decrementAndGet();
320   }
321
322   /**
323    * Inc elastic search work on hand counter.
324    */
325   protected void incElasticSearchWorkOnHandCounter() {
326     esWorkOnHand.incrementAndGet();
327   }
328
329   /**
330    * Dec elastic search work on hand counter.
331    */
332   protected void decElasticSearchWorkOnHandCounter() {
333     esWorkOnHand.decrementAndGet();
334   }
335
336   /**
337    * Shutdown executors.
338    */
339   protected void shutdownExecutors() {
340     try {
341       synchronizerExecutor.shutdown();
342       aaiExecutor.shutdown();
343       esExecutor.shutdown();
344       aaiDataProvider.shutdown();
345       esDataProvider.shutdown();
346     } catch (Exception exc) {
347       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
348     }
349   }
350
351   /**
352    * Clear cache.
353    */
354   public void clearCache() {
355     if (aaiDataProvider != null) {
356       aaiDataProvider.clearCache();
357     }
358   }
359
360   protected ActiveInventoryDataProvider getAaiDataProvider() {
361     return aaiDataProvider;
362   }
363
364   public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
365     this.aaiDataProvider = aaiDataProvider;
366   }
367
368   protected ElasticSearchDataProvider getEsDataProvider() {
369     return esDataProvider;
370   }
371
372   public void setEsDataProvider(ElasticSearchDataProvider provider) {
373     this.esDataProvider = provider;
374   }
375
376   /**
377    * Gets the elastic full url.
378    *
379    * @param resourceUrl the resource url
380    * @param indexName the index name
381    * @param indexType the index type
382    * @return the elastic full url
383    * @throws Exception the exception
384    */
385   protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
386       throws Exception {
387     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
388   }
389
390   /**
391    * Gets the elastic full url.
392    *
393    * @param resourceUrl the resource url
394    * @param indexName the index name
395    * @return the elastic full url
396    * @throws Exception the exception
397    */
398   protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
399     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
400   }
401
402   public String getIndexName() {
403     return indexName;
404   }
405
406   public void setIndexName(String indexName) {
407     this.indexName = indexName;
408   }
409
410
411   /**
412    * Gets the response length.
413    *
414    * @param txn the txn
415    * @return the response length
416    */
417   private long getResponseLength(NetworkTransaction txn) {
418
419     if (txn == null) {
420       return -1;
421     }
422
423     OperationResult result = txn.getOperationResult();
424
425     if (result == null) {
426       return -1;
427     }
428
429     if (result.getResult() != null) {
430       return result.getResult().length();
431     }
432
433     return -1;
434   }
435
436   /**
437    * Update elastic search counters.
438    *
439    * @param method the method
440    * @param or the or
441    */
442   protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
443     updateElasticSearchCounters(new NetworkTransaction(method, null, or));
444   }
445
446   /**
447    * Update elastic search counters.
448    *
449    * @param method the method
450    * @param entityType the entity type
451    * @param or the or
452    */
453   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
454       OperationResult or) {
455     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
456   }
457
458   /**
459    * Update elastic search counters.
460    *
461    * @param txn the txn
462    */
463   protected void updateElasticSearchCounters(NetworkTransaction txn) {
464
465     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
466       esRestStats.updateCounters(txn);
467     }
468
469     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
470       esEntityStats.updateCounters(txn);
471     }
472
473     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
474
475       esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
476
477       esTaskProcessingStats
478           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
479       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
480
481       // don't know the cost of the lengh calc, we'll see if it causes a
482       // problem
483
484       long responsePayloadSizeInBytes = getResponseLength(txn);
485       if (responsePayloadSizeInBytes >= 0) {
486         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
487       }
488
489       esTaskProcessingStats
490           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
491     }
492   }
493
494   /**
495    * Update active inventory counters.
496    *
497    * @param method the method
498    * @param or the or
499    */
500   protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
501     updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
502   }
503
504   /**
505    * Update active inventory counters.
506    *
507    * @param method the method
508    * @param entityType the entity type
509    * @param or the or
510    */
511   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
512       OperationResult or) {
513     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
514   }
515
516   /**
517    * Update active inventory counters.
518    *
519    * @param txn the txn
520    */
521   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
522
523     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
524       aaiRestStats.updateCounters(txn);
525     }
526
527     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
528       aaiEntityStats.updateCounters(txn);
529     }
530
531     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
532       aaiProcessingExceptionStats.updateCounters(txn);
533     }
534
535     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
536       aaiTransactionRateController
537           .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
538
539       aaiTaskProcessingStats
540           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
541       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
542
543       // don't know the cost of the lengh calc, we'll see if it causes a
544       // problem
545
546       long responsePayloadSizeInBytes = getResponseLength(txn);
547       if (responsePayloadSizeInBytes >= 0) {
548         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
549       }
550
551       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
552           (long) aaiTransactionRateController.getCurrentTps());
553     }
554   }
555
556   /**
557    * Reset counters.
558    */
559   protected void resetCounters() {
560     aaiRestStats.reset();
561     aaiEntityStats.reset();
562     aaiProcessingExceptionStats.reset();
563
564     esRestStats.reset();
565     esEntityStats.reset();
566   }
567
568 }