Renaming openecomp to onap
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.onap.aai.sparky.config.oxm.OxmModelLoader;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.aai.ActiveInventoryDataProvider;
34 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
36 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchDataProvider;
38 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
39 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.OperationResult;
42 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
43 import org.onap.aai.sparky.logging.AaiUiMsgs;
44 import org.onap.aai.sparky.util.NodeUtils;
45 import org.onap.aai.cl.api.Logger;
46 import org.onap.aai.cl.mdc.MdcContext;
47 import com.fasterxml.jackson.databind.ObjectMapper;
48
49 /**
50  * The Class AbstractEntitySynchronizer.
51  *
52  * @author davea.
53  */
54 public abstract class AbstractEntitySynchronizer {
55
56   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
57   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
58
59   protected final Logger logger;
60   protected ObjectMapper mapper;
61   protected OxmModelLoader oxmModelLoader;
62   protected long syncDurationInMs;
63   /**
64    * The Enum StatFlag.
65    */
66   protected enum StatFlag {
67     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
68     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
69   }
70
71   protected EnumSet<StatFlag> enabledStatFlags;
72
73   protected ActiveInventoryDataProvider aaiDataProvider;
74   protected ElasticSearchDataProvider esDataProvider;
75
76   protected ExecutorService synchronizerExecutor;
77   protected ExecutorService aaiExecutor;
78   protected ExecutorService esExecutor;
79
80   private RestOperationalStatistics esRestStats;
81   protected ElasticSearchEntityStatistics esEntityStats;
82
83   private RestOperationalStatistics aaiRestStats;
84   protected ActiveInventoryEntityStatistics aaiEntityStats;
85   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
86
87   private TaskProcessingStats aaiTaskProcessingStats;
88   private TaskProcessingStats esTaskProcessingStats;
89
90   private TransactionRateController aaiTransactionRateController;
91   private TransactionRateController esTransactionRateController;
92
93   protected AtomicInteger aaiWorkOnHand;
94   protected AtomicInteger esWorkOnHand;
95   protected String synchronizerName;
96
97   protected abstract boolean isSyncDone();
98   protected boolean shouldSkipSync;
99   
100   public String getActiveInventoryStatisticsReport() {
101
102     StringBuilder sb = new StringBuilder(128);
103
104     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
105       sb.append("\n\n        ").append("REST Operational Stats:");
106       sb.append(aaiRestStats.getStatisticsReport());
107     }
108
109     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
110       sb.append("\n\n        ").append("Entity Stats:");
111       sb.append(aaiEntityStats.getStatisticsReport());
112     }
113
114     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
115       sb.append("\n\n        ").append("Processing Exception Stats:");
116       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
117     }
118
119     return sb.toString();
120
121   }
122
123   public String getElasticSearchStatisticsReport() {
124
125     StringBuilder sb = new StringBuilder(128);
126
127     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
128       sb.append("\n\n        ").append("REST Operational Stats:");
129       sb.append(esRestStats.getStatisticsReport());
130     }
131
132     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
133       sb.append("\n\n        ").append("Entity Stats:");
134       sb.append(esEntityStats.getStatisticsReport());
135     }
136
137     return sb.toString();
138
139   }
140
141   /**
142    * Adds the active inventory stat report.
143    *
144    * @param sb the sb
145    */
146   private void addActiveInventoryStatReport(StringBuilder sb) {
147
148     if (sb == null) {
149       return;
150     }
151
152     sb.append("\n\n    AAI");
153     sb.append(getActiveInventoryStatisticsReport());
154
155     double currentTps = 0;
156     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
157       sb.append("\n\n        ").append("Task Processor Stats:");
158       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
159
160       currentTps = aaiTransactionRateController.getCurrentTps();
161
162       sb.append("\n          ").append("Current TPS: ").append(currentTps);
163     }
164
165     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
166
167     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
168       if (currentTps > 0) {
169         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
170         sb.append("\n            ").append("SyncDurationRemaining=")
171             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
172       }
173     }
174
175   }
176
177   /**
178    * Adds the elastic stat report.
179    *
180    * @param sb the sb
181    */
182   private void addElasticStatReport(StringBuilder sb) {
183
184     if (sb == null) {
185       return;
186     }
187
188     sb.append("\n\n    ELASTIC");
189     sb.append(getElasticSearchStatisticsReport());
190
191     double currentTps = 0;
192
193     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
194       sb.append("\n\n        ").append("Task Processor Stats:");
195       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
196
197       currentTps = esTransactionRateController.getCurrentTps();
198
199       sb.append("\n        ").append("Current TPS: ").append(currentTps);
200     }
201
202     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
203
204     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
205       if (currentTps > 0) {
206         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
207         sb.append("\n            ").append("SyncDurationRemaining=")
208             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
209       }
210     }
211
212
213   }
214
215   /**
216    * Gets the stat report.
217    *
218    * @param syncOpTimeInMs the sync op time in ms
219    * @param showFinalReport the show final report
220    * @return the stat report
221    */
222   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
223
224     StringBuilder sb = new StringBuilder(128);
225
226     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
227         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
228
229     addActiveInventoryStatReport(sb);
230     addElasticStatReport(sb);
231
232     if (showFinalReport) {
233       sb.append("\n\n        ").append("Sync Completed!\n");
234     } else {
235       sb.append("\n\n        ").append("Sync in Progress...\n");
236     }
237
238     return sb.toString();
239
240   }
241
242   protected String indexName;
243   protected long syncStartedTimeStampInMs;
244
245   /**
246    * Instantiates a new abstract entity synchronizer.
247    *
248    * @param logger the logger
249    * @param syncName the sync name
250    * @param numSyncWorkers the num sync workers
251    * @param numActiveInventoryWorkers the num active inventory workers
252    * @param numElasticsearchWorkers the num elasticsearch workers
253    * @param indexName the index name
254    * @throws Exception the exception
255    */
256   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
257       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
258           throws Exception {
259     this.logger = logger;
260     this.synchronizerExecutor =
261         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
262     this.aaiExecutor =
263         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
264     this.esExecutor =
265         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
266     this.mapper = new ObjectMapper();
267     this.oxmModelLoader = OxmModelLoader.getInstance();
268     this.indexName = indexName;
269     this.esRestStats = new RestOperationalStatistics();
270     this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
271     this.aaiRestStats = new RestOperationalStatistics();
272     this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
273     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
274     this.aaiTaskProcessingStats =
275         new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
276     this.esTaskProcessingStats =
277         new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
278
279     this.aaiTransactionRateController =
280         new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
281     this.esTransactionRateController =
282         new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
283
284     this.aaiWorkOnHand = new AtomicInteger(0);
285     this.esWorkOnHand = new AtomicInteger(0);
286
287     enabledStatFlags = EnumSet.allOf(StatFlag.class);
288
289     this.synchronizerName = "Abstact Entity Synchronizer";
290     
291     String txnID = NodeUtils.getRandomTxnId();
292         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
293         
294         this.shouldSkipSync = false;
295     this.syncStartedTimeStampInMs = System.currentTimeMillis();
296     this.syncDurationInMs = -1;
297   }
298
299   public boolean shouldSkipSync() {
300     return shouldSkipSync;
301   }
302
303   public void setShouldSkipSync(boolean shouldSkipSync) {
304     this.shouldSkipSync = shouldSkipSync;
305   }
306
307   /**
308    * Inc active inventory work on hand counter.
309    */
310   protected void incActiveInventoryWorkOnHandCounter() {
311     aaiWorkOnHand.incrementAndGet();
312   }
313
314   /**
315    * Dec active inventory work on hand counter.
316    */
317   protected void decActiveInventoryWorkOnHandCounter() {
318     aaiWorkOnHand.decrementAndGet();
319   }
320
321   /**
322    * Inc elastic search work on hand counter.
323    */
324   protected void incElasticSearchWorkOnHandCounter() {
325     esWorkOnHand.incrementAndGet();
326   }
327
328   /**
329    * Dec elastic search work on hand counter.
330    */
331   protected void decElasticSearchWorkOnHandCounter() {
332     esWorkOnHand.decrementAndGet();
333   }
334
335   /**
336    * Shutdown executors.
337    */
338   protected void shutdownExecutors() {
339     try {
340       synchronizerExecutor.shutdown();
341       aaiExecutor.shutdown();
342       esExecutor.shutdown();
343       aaiDataProvider.shutdown();
344       esDataProvider.shutdown();
345     } catch (Exception exc) {
346       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
347     }
348   }
349
350   /**
351    * Clear cache.
352    */
353   public void clearCache() {
354     if (aaiDataProvider != null) {
355       aaiDataProvider.clearCache();
356     }
357   }
358
359   protected ActiveInventoryDataProvider getAaiDataProvider() {
360     return aaiDataProvider;
361   }
362
363   public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
364     this.aaiDataProvider = aaiDataProvider;
365   }
366
367   protected ElasticSearchDataProvider getEsDataProvider() {
368     return esDataProvider;
369   }
370
371   public void setEsDataProvider(ElasticSearchDataProvider provider) {
372     this.esDataProvider = provider;
373   }
374
375   /**
376    * Gets the elastic full url.
377    *
378    * @param resourceUrl the resource url
379    * @param indexName the index name
380    * @param indexType the index type
381    * @return the elastic full url
382    * @throws Exception the exception
383    */
384   protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
385       throws Exception {
386     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
387   }
388
389   /**
390    * Gets the elastic full url.
391    *
392    * @param resourceUrl the resource url
393    * @param indexName the index name
394    * @return the elastic full url
395    * @throws Exception the exception
396    */
397   protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
398     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
399   }
400
401   public String getIndexName() {
402     return indexName;
403   }
404
405   public void setIndexName(String indexName) {
406     this.indexName = indexName;
407   }
408
409
410   /**
411    * Gets the response length.
412    *
413    * @param txn the txn
414    * @return the response length
415    */
416   private long getResponseLength(NetworkTransaction txn) {
417
418     if (txn == null) {
419       return -1;
420     }
421
422     OperationResult result = txn.getOperationResult();
423
424     if (result == null) {
425       return -1;
426     }
427
428     if (result.getResult() != null) {
429       return result.getResult().length();
430     }
431
432     return -1;
433   }
434
435   /**
436    * Update elastic search counters.
437    *
438    * @param method the method
439    * @param or the or
440    */
441   protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
442     updateElasticSearchCounters(new NetworkTransaction(method, null, or));
443   }
444
445   /**
446    * Update elastic search counters.
447    *
448    * @param method the method
449    * @param entityType the entity type
450    * @param or the or
451    */
452   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
453       OperationResult or) {
454     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
455   }
456
457   /**
458    * Update elastic search counters.
459    *
460    * @param txn the txn
461    */
462   protected void updateElasticSearchCounters(NetworkTransaction txn) {
463
464     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
465       esRestStats.updateCounters(txn);
466     }
467
468     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
469       esEntityStats.updateCounters(txn);
470     }
471
472     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
473
474       esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
475
476       esTaskProcessingStats
477           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
478       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
479
480       // don't know the cost of the lengh calc, we'll see if it causes a
481       // problem
482
483       long responsePayloadSizeInBytes = getResponseLength(txn);
484       if (responsePayloadSizeInBytes >= 0) {
485         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
486       }
487
488       esTaskProcessingStats
489           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
490     }
491   }
492
493   /**
494    * Update active inventory counters.
495    *
496    * @param method the method
497    * @param or the or
498    */
499   protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
500     updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
501   }
502
503   /**
504    * Update active inventory counters.
505    *
506    * @param method the method
507    * @param entityType the entity type
508    * @param or the or
509    */
510   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
511       OperationResult or) {
512     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
513   }
514
515   /**
516    * Update active inventory counters.
517    *
518    * @param txn the txn
519    */
520   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
521
522     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
523       aaiRestStats.updateCounters(txn);
524     }
525
526     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
527       aaiEntityStats.updateCounters(txn);
528     }
529
530     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
531       aaiProcessingExceptionStats.updateCounters(txn);
532     }
533
534     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
535       aaiTransactionRateController
536           .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
537
538       aaiTaskProcessingStats
539           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
540       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
541
542       // don't know the cost of the lengh calc, we'll see if it causes a
543       // problem
544
545       long responsePayloadSizeInBytes = getResponseLength(txn);
546       if (responsePayloadSizeInBytes >= 0) {
547         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
548       }
549
550       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
551           (long) aaiTransactionRateController.getCurrentTps());
552     }
553   }
554
555   /**
556    * Reset counters.
557    */
558   protected void resetCounters() {
559     aaiRestStats.reset();
560     aaiEntityStats.reset();
561     aaiProcessingExceptionStats.reset();
562
563     esRestStats.reset();
564     esEntityStats.reset();
565   }
566
567 }