Remove duplicated code
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import java.util.EnumSet;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.function.Consumer;
27
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.rest.HttpMethod;
39 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
40 import org.onap.aai.sparky.logging.AaiUiMsgs;
41 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
42 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
43 import org.onap.aai.sparky.util.NodeUtils;
44
45 import com.fasterxml.jackson.databind.ObjectMapper;
46
47 import static java.util.concurrent.CompletableFuture.supplyAsync;
48
49 /**
50  * The Class AbstractEntitySynchronizer.
51  *
52  * @author davea.
53  */
54 public abstract class AbstractEntitySynchronizer {
55
56   protected static final Logger LOG =
57           LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
58   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
59   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
60
61   protected final Logger logger;
62   protected ObjectMapper mapper;
63   protected long syncDurationInMs;
64
65   /**
66    * The Enum StatFlag.
67    */
68   protected enum StatFlag {
69     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
70     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
71   }
72
73   protected EnumSet<StatFlag> enabledStatFlags;
74
75   protected ElasticSearchAdapter elasticSearchAdapter;
76   protected ActiveInventoryAdapter aaiAdapter;
77
78   protected ExecutorService synchronizerExecutor;
79   protected ExecutorService aaiExecutor;
80   protected ExecutorService esExecutor;
81
82   private RestOperationalStatistics esRestStats;
83   protected ElasticSearchEntityStatistics esEntityStats;
84
85   private RestOperationalStatistics aaiRestStats;
86   protected ActiveInventoryEntityStatistics aaiEntityStats;
87   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
88
89   private TaskProcessingStats aaiTaskProcessingStats;
90   private TaskProcessingStats esTaskProcessingStats;
91
92   private TransactionRateMonitor aaiTransactionRateController;
93   private TransactionRateMonitor esTransactionRateController;
94
95   protected AtomicInteger aaiWorkOnHand;
96   protected AtomicInteger esWorkOnHand;
97   protected String synchronizerName;
98
99   protected abstract boolean isSyncDone();
100   protected boolean shouldSkipSync;
101
102   public String getActiveInventoryStatisticsReport() {
103
104     StringBuilder sb = new StringBuilder(128);
105
106     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
107       sb.append("\n\n        ").append("REST Operational Stats:");
108       sb.append(aaiRestStats.getStatisticsReport());
109     }
110
111     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
112       sb.append("\n\n        ").append("Entity Stats:");
113       sb.append(aaiEntityStats.getStatisticsReport());
114     }
115
116     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
117       sb.append("\n\n        ").append("Processing Exception Stats:");
118       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
119     }
120
121     return sb.toString();
122
123   }
124
125   public String getElasticSearchStatisticsReport() {
126
127     StringBuilder sb = new StringBuilder(128);
128
129     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
130       sb.append("\n\n        ").append("REST Operational Stats:");
131       sb.append(esRestStats.getStatisticsReport());
132     }
133
134     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
135       sb.append("\n\n        ").append("Entity Stats:");
136       sb.append(esEntityStats.getStatisticsReport());
137     }
138
139     return sb.toString();
140
141   }
142
143   /**
144    * Adds the active inventory stat report.
145    *
146    * @param sb the sb
147    */
148   private void addActiveInventoryStatReport(StringBuilder sb) {
149
150     if (sb == null) {
151       return;
152     }
153
154     sb.append("\n\n    AAI");
155     sb.append(getActiveInventoryStatisticsReport());
156
157     double currentTps = 0;
158     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
159       sb.append("\n\n        ").append("Task Processor Stats:");
160       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
161
162       currentTps = aaiTransactionRateController.getCurrentTps();
163
164       sb.append("\n          ").append("Current TPS: ").append(currentTps);
165     }
166
167     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
168
169     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
170       if (currentTps > 0) {
171         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
172         sb.append("\n            ").append("SyncDurationRemaining=")
173             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
174       }
175     }
176
177   }
178
179   /**
180    * Adds the elastic stat report.
181    *
182    * @param sb the sb
183    */
184   private void addElasticStatReport(StringBuilder sb) {
185
186     if (sb == null) {
187       return;
188     }
189
190     sb.append("\n\n    ELASTIC");
191     sb.append(getElasticSearchStatisticsReport());
192
193     double currentTps = 0;
194
195     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
196       sb.append("\n\n        ").append("Task Processor Stats:");
197       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
198
199       currentTps = esTransactionRateController.getCurrentTps();
200
201       sb.append("\n        ").append("Current TPS: ").append(currentTps);
202     }
203
204     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
205
206     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
207       if (currentTps > 0) {
208         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
209         sb.append("\n            ").append("SyncDurationRemaining=")
210             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
211       }
212     }
213
214
215   }
216
217   /**
218    * Gets the stat report.
219    *
220    * @param syncOpTimeInMs the sync op time in ms
221    * @param showFinalReport the show final report
222    * @return the stat report
223    */
224   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
225
226     StringBuilder sb = new StringBuilder(128);
227
228     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
229         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
230
231     addActiveInventoryStatReport(sb);
232     addElasticStatReport(sb);
233
234     if (showFinalReport) {
235       sb.append("\n\n        ").append("Sync Completed!\n");
236     } else {
237       sb.append("\n\n        ").append("Sync in Progress...\n");
238     }
239
240     return sb.toString();
241
242   }
243
244   protected String indexName;
245   protected long syncStartedTimeStampInMs;
246
247   /**
248    * Instantiates a new abstract entity synchronizer.
249    *
250    * @param logger the logger
251    * @param syncName the sync name
252    * @param numSyncWorkers the num sync workers
253    * @param numActiveInventoryWorkers the num active inventory workers
254    * @param numElasticsearchWorkers the num elasticsearch workers
255    * @param indexName the index name
256    * @throws Exception the exception
257    */
258   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
259       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
260       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
261           throws Exception {
262     this.logger = logger;
263     this.synchronizerExecutor =
264         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
265     this.aaiExecutor =
266         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
267     this.esExecutor =
268         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
269     this.mapper = new ObjectMapper();
270     this.indexName = indexName; 
271     this.esRestStats = new RestOperationalStatistics();
272     this.esEntityStats = new ElasticSearchEntityStatistics();
273     this.aaiRestStats = new RestOperationalStatistics();
274     this.aaiEntityStats = new ActiveInventoryEntityStatistics();
275     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
276     this.aaiTaskProcessingStats =
277         new TaskProcessingStats(aaiStatConfig);
278     this.esTaskProcessingStats =
279         new TaskProcessingStats(esStatConfig);
280
281     this.aaiTransactionRateController =
282         new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
283     this.esTransactionRateController =
284         new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
285
286     this.aaiWorkOnHand = new AtomicInteger(0);
287     this.esWorkOnHand = new AtomicInteger(0);
288
289     enabledStatFlags = EnumSet.allOf(StatFlag.class);
290
291     this.synchronizerName = "Abstact Entity Synchronizer";
292     
293     String txnID = NodeUtils.getRandomTxnId();
294         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
295
296     this.shouldSkipSync = false;
297     this.syncStartedTimeStampInMs = System.currentTimeMillis();
298     this.syncDurationInMs = -1;
299   }
300
301   public boolean shouldSkipSync() {
302     return shouldSkipSync;
303   }
304
305   public void setShouldSkipSync(boolean shouldSkipSync) {
306     this.shouldSkipSync = shouldSkipSync;
307   }
308
309   /**
310    * Inc active inventory work on hand counter.
311    */
312   protected void incActiveInventoryWorkOnHandCounter() {
313     aaiWorkOnHand.incrementAndGet();
314   }
315
316   /**
317    * Dec active inventory work on hand counter.
318    */
319   protected void decActiveInventoryWorkOnHandCounter() {
320     aaiWorkOnHand.decrementAndGet();
321   }
322
323   /**
324    * Inc elastic search work on hand counter.
325    */
326   protected void incElasticSearchWorkOnHandCounter() {
327     esWorkOnHand.incrementAndGet();
328   }
329
330   /**
331    * Dec elastic search work on hand counter.
332    */
333   protected void decElasticSearchWorkOnHandCounter() {
334     esWorkOnHand.decrementAndGet();
335   }
336
337   /**
338    * Shutdown executors.
339    */
340   protected void shutdownExecutors() {
341     try {
342
343       if (synchronizerExecutor != null) {
344         synchronizerExecutor.shutdown();
345       }
346
347       if (aaiExecutor != null) {
348         aaiExecutor.shutdown();
349       }
350
351       if (esExecutor != null) {
352         esExecutor.shutdown();
353       }
354     
355     } catch (Exception exc) {
356       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
357     }
358   }
359
360   /**
361    * Clear cache.
362    */
363   public void clearCache() {}
364   
365   public ElasticSearchAdapter getElasticSearchAdapter() {
366     return elasticSearchAdapter;
367   }
368
369   public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
370     this.elasticSearchAdapter = elasticSearchAdapter;
371   }
372
373   public ActiveInventoryAdapter getAaiAdapter() {
374     return aaiAdapter;
375   }
376
377   public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
378     this.aaiAdapter = aaiAdapter;
379   }
380
381   public String getIndexName() {
382     return indexName;
383   }
384
385   public void setIndexName(String indexName) {
386     this.indexName = indexName;
387   }
388
389
390   /**
391    * Gets the response length.
392    *
393    * @param txn the txn
394    * @return the response length
395    */
396   private long getResponseLength(NetworkTransaction txn) {
397
398     if (txn == null) {
399       return -1;
400     }
401
402     OperationResult result = txn.getOperationResult();
403
404     if (result == null) {
405       return -1;
406     }
407
408     if (result.getResult() != null) {
409       return result.getResult().length();
410     }
411
412     return -1;
413   }
414
415   /**
416    * Update elastic search counters.
417    *
418    * @param method the method
419    * @param entityType the entity type
420    * @param or the or
421    */
422   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
423       OperationResult or) {
424     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
425   }
426
427   /**
428    * Update elastic search counters.
429    *
430    * @param txn the txn
431    */
432   protected void updateElasticSearchCounters(NetworkTransaction txn) {
433
434     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
435       esRestStats.updateCounters(txn);
436     }
437
438     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
439       esEntityStats.updateCounters(txn);
440     }
441
442     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
443
444       esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
445
446       esTaskProcessingStats
447           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
448       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
449
450       // don't know the cost of the lengh calc, we'll see if it causes a
451       // problem
452
453       long responsePayloadSizeInBytes = getResponseLength(txn);
454       if (responsePayloadSizeInBytes >= 0) {
455         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
456       }
457
458       esTaskProcessingStats
459           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
460     }
461   }
462
463   /**
464    * Update active inventory counters.
465    *
466    * @param method the method
467    * @param entityType the entity type
468    * @param or the or
469    */
470   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
471       OperationResult or) {
472     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
473   }
474
475   /**
476    * Update active inventory counters.
477    *
478    * @param txn the txn
479    */
480   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
481
482     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
483       aaiRestStats.updateCounters(txn);
484     }
485
486     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
487       aaiEntityStats.updateCounters(txn);
488     }
489
490     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
491       aaiProcessingExceptionStats.updateCounters(txn);
492     }
493
494     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
495       aaiTransactionRateController
496           .trackResponseTime(txn.getOpTimeInMs());
497
498       aaiTaskProcessingStats
499           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
500       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
501
502       // don't know the cost of the lengh calc, we'll see if it causes a
503       // problem
504
505       long responsePayloadSizeInBytes = getResponseLength(txn);
506       if (responsePayloadSizeInBytes >= 0) {
507         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
508       }
509
510       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
511           (long) aaiTransactionRateController.getCurrentTps());
512     }
513   }
514
515   /**
516    * Reset counters.
517    */
518   protected void resetCounters() {
519     aaiRestStats.reset();
520     aaiEntityStats.reset();
521     aaiProcessingExceptionStats.reset();
522
523     esRestStats.reset();
524     esEntityStats.reset();
525   }
526
527
528   protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
529     String link = null;
530     try {
531       /*
532        * In this retry flow the se object has already derived its fields
533        */
534       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
535     } catch (Exception exc) {
536       LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
537     }
538
539     if (link != null) {
540       NetworkTransaction retryTransaction = new NetworkTransaction();
541       retryTransaction.setLink(link);
542       retryTransaction.setEntityType(txn.getEntityType());
543       retryTransaction.setDescriptor(txn.getDescriptor());
544       retryTransaction.setOperationType(HttpMethod.GET);
545
546       /*
547        * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
548        * called incrementAndGet when queuing the failed PUT!
549        */
550
551       supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
552               esExecutor).whenComplete((result, error) -> {
553
554         esWorkOnHand.decrementAndGet();
555
556         if (error != null) {
557           LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
558         } else {
559           updateElasticSearchCounters(result);
560           networkTransactionConsumer.accept(result);
561         }
562       });
563     }
564   }
565
566 }