Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import java.net.InetAddress;
29 import java.net.UnknownHostException;
30 import java.util.EnumSet;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.openecomp.cl.api.Logger;
35 import org.openecomp.sparky.config.oxm.OxmModelLoader;
36 import org.openecomp.sparky.dal.NetworkTransaction;
37 import org.openecomp.sparky.dal.aai.ActiveInventoryDataProvider;
38 import org.openecomp.sparky.dal.aai.ActiveInventoryEntityStatistics;
39 import org.openecomp.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
40 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
41 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchDataProvider;
42 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
43 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
44 import org.openecomp.sparky.dal.rest.HttpMethod;
45 import org.openecomp.sparky.dal.rest.OperationResult;
46 import org.openecomp.sparky.dal.rest.RestOperationalStatistics;
47 import org.openecomp.sparky.logging.AaiUiMsgs;
48 import org.openecomp.sparky.util.NodeUtils;
49
50 import org.openecomp.cl.mdc.MdcContext;
51 import com.fasterxml.jackson.databind.ObjectMapper;
52
53 /**
54  * The Class AbstractEntitySynchronizer.
55  *
56  * @author davea.
57  */
58 public abstract class AbstractEntitySynchronizer {
59
60   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
61   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
62
63   protected final Logger logger;
64   protected ObjectMapper mapper;
65   protected OxmModelLoader oxmModelLoader;
66
67   /**
68    * The Enum StatFlag.
69    */
70   protected enum StatFlag {
71     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
72     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
73   }
74
75   protected EnumSet<StatFlag> enabledStatFlags;
76
77   protected ActiveInventoryDataProvider aaiDataProvider;
78   protected ElasticSearchDataProvider esDataProvider;
79
80   protected ExecutorService synchronizerExecutor;
81   protected ExecutorService aaiExecutor;
82   protected ExecutorService esExecutor;
83
84   private RestOperationalStatistics esRestStats;
85   protected ElasticSearchEntityStatistics esEntityStats;
86
87   private RestOperationalStatistics aaiRestStats;
88   protected ActiveInventoryEntityStatistics aaiEntityStats;
89   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
90
91   private TaskProcessingStats aaiTaskProcessingStats;
92   private TaskProcessingStats esTaskProcessingStats;
93
94   private TransactionRateController aaiTransactionRateController;
95   private TransactionRateController esTransactionRateController;
96
97   protected AtomicInteger aaiWorkOnHand;
98   protected AtomicInteger esWorkOnHand;
99   protected String synchronizerName;
100
101   protected abstract boolean isSyncDone();
102
103   public String getActiveInventoryStatisticsReport() {
104
105     StringBuilder sb = new StringBuilder(128);
106
107     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
108       sb.append("\n\n        ").append("REST Operational Stats:");
109       sb.append(aaiRestStats.getStatisticsReport());
110     }
111
112     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
113       sb.append("\n\n        ").append("Entity Stats:");
114       sb.append(aaiEntityStats.getStatisticsReport());
115     }
116
117     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
118       sb.append("\n\n        ").append("Processing Exception Stats:");
119       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
120     }
121
122     return sb.toString();
123
124   }
125
126   public String getElasticSearchStatisticsReport() {
127
128     StringBuilder sb = new StringBuilder(128);
129
130     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
131       sb.append("\n\n        ").append("REST Operational Stats:");
132       sb.append(esRestStats.getStatisticsReport());
133     }
134
135     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
136       sb.append("\n\n        ").append("Entity Stats:");
137       sb.append(esEntityStats.getStatisticsReport());
138     }
139
140     return sb.toString();
141
142   }
143
144   /**
145    * Adds the active inventory stat report.
146    *
147    * @param sb the sb
148    */
149   private void addActiveInventoryStatReport(StringBuilder sb) {
150
151     if (sb == null) {
152       return;
153     }
154
155     sb.append("\n\n    AAI");
156     sb.append(getActiveInventoryStatisticsReport());
157
158     double currentTps = 0;
159     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
160       sb.append("\n\n        ").append("Task Processor Stats:");
161       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
162
163       currentTps = aaiTransactionRateController.getCurrentTps();
164
165       sb.append("\n          ").append("Current TPS: ").append(currentTps);
166     }
167
168     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
169
170     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
171       if (currentTps > 0) {
172         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
173         sb.append("\n            ").append("SyncDurationRemaining=")
174             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
175       }
176     }
177
178   }
179
180   /**
181    * Adds the elastic stat report.
182    *
183    * @param sb the sb
184    */
185   private void addElasticStatReport(StringBuilder sb) {
186
187     if (sb == null) {
188       return;
189     }
190
191     sb.append("\n\n    ELASTIC");
192     sb.append(getElasticSearchStatisticsReport());
193
194     double currentTps = 0;
195
196     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
197       sb.append("\n\n        ").append("Task Processor Stats:");
198       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
199
200       currentTps = esTransactionRateController.getCurrentTps();
201
202       sb.append("\n        ").append("Current TPS: ").append(currentTps);
203     }
204
205     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
206
207     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
208       if (currentTps > 0) {
209         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
210         sb.append("\n            ").append("SyncDurationRemaining=")
211             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
212       }
213     }
214
215
216   }
217
218   /**
219    * Gets the stat report.
220    *
221    * @param syncOpTimeInMs the sync op time in ms
222    * @param showFinalReport the show final report
223    * @return the stat report
224    */
225   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
226
227     StringBuilder sb = new StringBuilder(128);
228
229     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
230         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
231
232     addActiveInventoryStatReport(sb);
233     addElasticStatReport(sb);
234
235     if (showFinalReport) {
236       sb.append("\n\n        ").append("Sync Completed!\n");
237     } else {
238       sb.append("\n\n        ").append("Sync in Progress...\n");
239     }
240
241     return sb.toString();
242
243   }
244
245   protected String indexName;
246   protected long syncStartedTimeStampInMs;
247
248   /**
249    * Instantiates a new abstract entity synchronizer.
250    *
251    * @param logger the logger
252    * @param syncName the sync name
253    * @param numSyncWorkers the num sync workers
254    * @param numActiveInventoryWorkers the num active inventory workers
255    * @param numElasticsearchWorkers the num elasticsearch workers
256    * @param indexName the index name
257    * @throws Exception the exception
258    */
259   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
260       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
261           throws Exception {
262     this.logger = logger;
263     this.synchronizerExecutor =
264         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
265     this.aaiExecutor =
266         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
267     this.esExecutor =
268         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
269     this.mapper = new ObjectMapper();
270     this.oxmModelLoader = OxmModelLoader.getInstance();
271     this.indexName = indexName;
272     this.esRestStats = new RestOperationalStatistics();
273     this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
274     this.aaiRestStats = new RestOperationalStatistics();
275     this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
276     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
277     this.aaiTaskProcessingStats =
278         new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
279     this.esTaskProcessingStats =
280         new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
281
282     this.aaiTransactionRateController =
283         new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
284     this.esTransactionRateController =
285         new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
286
287     this.aaiWorkOnHand = new AtomicInteger(0);
288     this.esWorkOnHand = new AtomicInteger(0);
289
290     enabledStatFlags = EnumSet.allOf(StatFlag.class);
291
292     this.synchronizerName = "Abstact Entity Synchronizer";
293     
294     String txnID = NodeUtils.getRandomTxnId();
295         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
296
297   }
298
299   /**
300    * Inc active inventory work on hand counter.
301    */
302   protected void incActiveInventoryWorkOnHandCounter() {
303     aaiWorkOnHand.incrementAndGet();
304   }
305
306   /**
307    * Dec active inventory work on hand counter.
308    */
309   protected void decActiveInventoryWorkOnHandCounter() {
310     aaiWorkOnHand.decrementAndGet();
311   }
312
313   /**
314    * Inc elastic search work on hand counter.
315    */
316   protected void incElasticSearchWorkOnHandCounter() {
317     esWorkOnHand.incrementAndGet();
318   }
319
320   /**
321    * Dec elastic search work on hand counter.
322    */
323   protected void decElasticSearchWorkOnHandCounter() {
324     esWorkOnHand.decrementAndGet();
325   }
326
327   /**
328    * Shutdown executors.
329    */
330   protected void shutdownExecutors() {
331     try {
332       synchronizerExecutor.shutdown();
333       aaiExecutor.shutdown();
334       esExecutor.shutdown();
335       aaiDataProvider.shutdown();
336       esDataProvider.shutdown();
337     } catch (Exception exc) {
338       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
339     }
340   }
341
342   /**
343    * Clear cache.
344    */
345   public void clearCache() {
346     if (aaiDataProvider != null) {
347       aaiDataProvider.clearCache();
348     }
349   }
350
351   protected ActiveInventoryDataProvider getAaiDataProvider() {
352     return aaiDataProvider;
353   }
354
355   public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
356     this.aaiDataProvider = aaiDataProvider;
357   }
358
359   protected ElasticSearchDataProvider getEsDataProvider() {
360     return esDataProvider;
361   }
362
363   public void setEsDataProvider(ElasticSearchDataProvider provider) {
364     this.esDataProvider = provider;
365   }
366
367   /**
368    * Gets the elastic full url.
369    *
370    * @param resourceUrl the resource url
371    * @param indexName the index name
372    * @param indexType the index type
373    * @return the elastic full url
374    * @throws Exception the exception
375    */
376   protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
377       throws Exception {
378     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
379   }
380
381   /**
382    * Gets the elastic full url.
383    *
384    * @param resourceUrl the resource url
385    * @param indexName the index name
386    * @return the elastic full url
387    * @throws Exception the exception
388    */
389   protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
390     return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
391   }
392
393   public String getIndexName() {
394     return indexName;
395   }
396
397   public void setIndexName(String indexName) {
398     this.indexName = indexName;
399   }
400
401
402   /**
403    * Gets the response length.
404    *
405    * @param txn the txn
406    * @return the response length
407    */
408   private long getResponseLength(NetworkTransaction txn) {
409
410     if (txn == null) {
411       return -1;
412     }
413
414     OperationResult result = txn.getOperationResult();
415
416     if (result == null) {
417       return -1;
418     }
419
420     if (result.getResult() != null) {
421       return result.getResult().length();
422     }
423
424     return -1;
425   }
426
427   /**
428    * Update elastic search counters.
429    *
430    * @param method the method
431    * @param or the or
432    */
433   protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
434     updateElasticSearchCounters(new NetworkTransaction(method, null, or));
435   }
436
437   /**
438    * Update elastic search counters.
439    *
440    * @param method the method
441    * @param entityType the entity type
442    * @param or the or
443    */
444   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
445       OperationResult or) {
446     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
447   }
448
449   /**
450    * Update elastic search counters.
451    *
452    * @param txn the txn
453    */
454   protected void updateElasticSearchCounters(NetworkTransaction txn) {
455
456     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
457       esRestStats.updateCounters(txn);
458     }
459
460     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
461       esEntityStats.updateCounters(txn);
462     }
463
464     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
465
466       esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
467
468       esTaskProcessingStats
469           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
470       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
471
472       // don't know the cost of the lengh calc, we'll see if it causes a
473       // problem
474
475       long responsePayloadSizeInBytes = getResponseLength(txn);
476       if (responsePayloadSizeInBytes >= 0) {
477         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
478       }
479
480       esTaskProcessingStats
481           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
482     }
483   }
484
485   /**
486    * Update active inventory counters.
487    *
488    * @param method the method
489    * @param or the or
490    */
491   protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
492     updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
493   }
494
495   /**
496    * Update active inventory counters.
497    *
498    * @param method the method
499    * @param entityType the entity type
500    * @param or the or
501    */
502   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
503       OperationResult or) {
504     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
505   }
506
507   /**
508    * Update active inventory counters.
509    *
510    * @param txn the txn
511    */
512   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
513
514     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
515       aaiRestStats.updateCounters(txn);
516     }
517
518     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
519       aaiEntityStats.updateCounters(txn);
520     }
521
522     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
523       aaiProcessingExceptionStats.updateCounters(txn);
524     }
525
526     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
527       aaiTransactionRateController
528           .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
529
530       aaiTaskProcessingStats
531           .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
532       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
533
534       // don't know the cost of the lengh calc, we'll see if it causes a
535       // problem
536
537       long responsePayloadSizeInBytes = getResponseLength(txn);
538       if (responsePayloadSizeInBytes >= 0) {
539         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
540       }
541
542       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
543           (long) aaiTransactionRateController.getCurrentTps());
544     }
545   }
546
547   /**
548    * Reset counters.
549    */
550   protected void resetCounters() {
551     aaiRestStats.reset();
552     aaiEntityStats.reset();
553     aaiProcessingExceptionStats.reset();
554
555     esRestStats.reset();
556     esEntityStats.reset();
557   }
558
559 }