298a4939c2c5999afcc0cd393e4876464b591f0d
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.sync;
24
25 import java.util.EnumSet;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.onap.aai.cl.api.Logger;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.rest.HttpMethod;
39 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
40 import org.onap.aai.sparky.logging.AaiUiMsgs;
41 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
42 import org.onap.aai.sparky.util.NodeUtils;
43
44 import com.fasterxml.jackson.databind.ObjectMapper;
45
46 /**
47  * The Class AbstractEntitySynchronizer.
48  *
49  * @author davea.
50  */
51 public abstract class AbstractEntitySynchronizer {
52
53   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
54   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
55
56   protected final Logger logger;
57   protected ObjectMapper mapper;
58   protected long syncDurationInMs;
59
60   /**
61    * The Enum StatFlag.
62    */
63   protected enum StatFlag {
64     AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
65     AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
66   }
67
68   protected EnumSet<StatFlag> enabledStatFlags;
69
70   protected ElasticSearchAdapter elasticSearchAdapter;
71   protected ActiveInventoryAdapter aaiAdapter;
72
73   protected ExecutorService synchronizerExecutor;
74   protected ExecutorService aaiExecutor;
75   protected ExecutorService esExecutor;
76
77   private RestOperationalStatistics esRestStats;
78   protected ElasticSearchEntityStatistics esEntityStats;
79
80   private RestOperationalStatistics aaiRestStats;
81   protected ActiveInventoryEntityStatistics aaiEntityStats;
82   private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
83
84   private TaskProcessingStats aaiTaskProcessingStats;
85   private TaskProcessingStats esTaskProcessingStats;
86
87   private TransactionRateMonitor aaiTransactionRateController;
88   private TransactionRateMonitor esTransactionRateController;
89
90   protected AtomicInteger aaiWorkOnHand;
91   protected AtomicInteger esWorkOnHand;
92   protected String synchronizerName;
93
94   protected abstract boolean isSyncDone();
95   protected boolean shouldSkipSync;
96
97   public String getActiveInventoryStatisticsReport() {
98
99     StringBuilder sb = new StringBuilder(128);
100
101     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
102       sb.append("\n\n        ").append("REST Operational Stats:");
103       sb.append(aaiRestStats.getStatisticsReport());
104     }
105
106     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
107       sb.append("\n\n        ").append("Entity Stats:");
108       sb.append(aaiEntityStats.getStatisticsReport());
109     }
110
111     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
112       sb.append("\n\n        ").append("Processing Exception Stats:");
113       sb.append(aaiProcessingExceptionStats.getStatisticsReport());
114     }
115
116     return sb.toString();
117
118   }
119
120   public String getElasticSearchStatisticsReport() {
121
122     StringBuilder sb = new StringBuilder(128);
123
124     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
125       sb.append("\n\n        ").append("REST Operational Stats:");
126       sb.append(esRestStats.getStatisticsReport());
127     }
128
129     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
130       sb.append("\n\n        ").append("Entity Stats:");
131       sb.append(esEntityStats.getStatisticsReport());
132     }
133
134     return sb.toString();
135
136   }
137
138   /**
139    * Adds the active inventory stat report.
140    *
141    * @param sb the sb
142    */
143   private void addActiveInventoryStatReport(StringBuilder sb) {
144
145     if (sb == null) {
146       return;
147     }
148
149     sb.append("\n\n    AAI");
150     sb.append(getActiveInventoryStatisticsReport());
151
152     double currentTps = 0;
153     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
154       sb.append("\n\n        ").append("Task Processor Stats:");
155       sb.append(aaiTaskProcessingStats.getStatisticsReport(false, "        "));
156
157       currentTps = aaiTransactionRateController.getCurrentTps();
158
159       sb.append("\n          ").append("Current TPS: ").append(currentTps);
160     }
161
162     sb.append("\n          ").append("Current WOH: ").append(aaiWorkOnHand.get());
163
164     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
165       if (currentTps > 0) {
166         double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
167         sb.append("\n            ").append("SyncDurationRemaining=")
168             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
169       }
170     }
171
172   }
173
174   /**
175    * Adds the elastic stat report.
176    *
177    * @param sb the sb
178    */
179   private void addElasticStatReport(StringBuilder sb) {
180
181     if (sb == null) {
182       return;
183     }
184
185     sb.append("\n\n    ELASTIC");
186     sb.append(getElasticSearchStatisticsReport());
187
188     double currentTps = 0;
189
190     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
191       sb.append("\n\n        ").append("Task Processor Stats:");
192       sb.append(esTaskProcessingStats.getStatisticsReport(false, "           "));
193
194       currentTps = esTransactionRateController.getCurrentTps();
195
196       sb.append("\n        ").append("Current TPS: ").append(currentTps);
197     }
198
199     sb.append("\n        ").append("Current WOH: ").append(esWorkOnHand.get());
200
201     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
202       if (currentTps > 0) {
203         double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
204         sb.append("\n            ").append("SyncDurationRemaining=")
205             .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
206       }
207     }
208
209
210   }
211
212   /**
213    * Gets the stat report.
214    *
215    * @param syncOpTimeInMs the sync op time in ms
216    * @param showFinalReport the show final report
217    * @return the stat report
218    */
219   protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
220
221     StringBuilder sb = new StringBuilder(128);
222
223     sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
224         + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
225
226     addActiveInventoryStatReport(sb);
227     addElasticStatReport(sb);
228
229     if (showFinalReport) {
230       sb.append("\n\n        ").append("Sync Completed!\n");
231     } else {
232       sb.append("\n\n        ").append("Sync in Progress...\n");
233     }
234
235     return sb.toString();
236
237   }
238
239   protected String indexName;
240   protected long syncStartedTimeStampInMs;
241
242   /**
243    * Instantiates a new abstract entity synchronizer.
244    *
245    * @param logger the logger
246    * @param syncName the sync name
247    * @param numSyncWorkers the num sync workers
248    * @param numActiveInventoryWorkers the num active inventory workers
249    * @param numElasticsearchWorkers the num elasticsearch workers
250    * @param indexName the index name
251    * @throws Exception the exception
252    */
253   protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
254       int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
255       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
256           throws Exception {
257     this.logger = logger;
258     this.synchronizerExecutor =
259         NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
260     this.aaiExecutor =
261         NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
262     this.esExecutor =
263         NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
264     this.mapper = new ObjectMapper();
265     this.indexName = indexName; 
266     this.esRestStats = new RestOperationalStatistics();
267     this.esEntityStats = new ElasticSearchEntityStatistics();
268     this.aaiRestStats = new RestOperationalStatistics();
269     this.aaiEntityStats = new ActiveInventoryEntityStatistics();
270     this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
271     this.aaiTaskProcessingStats =
272         new TaskProcessingStats(aaiStatConfig);
273     this.esTaskProcessingStats =
274         new TaskProcessingStats(esStatConfig);
275
276     this.aaiTransactionRateController =
277         new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
278     this.esTransactionRateController =
279         new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
280
281     this.aaiWorkOnHand = new AtomicInteger(0);
282     this.esWorkOnHand = new AtomicInteger(0);
283
284     enabledStatFlags = EnumSet.allOf(StatFlag.class);
285
286     this.synchronizerName = "Abstact Entity Synchronizer";
287     
288     String txnID = NodeUtils.getRandomTxnId();
289         MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
290
291     this.shouldSkipSync = false;
292     this.syncStartedTimeStampInMs = System.currentTimeMillis();
293     this.syncDurationInMs = -1;
294   }
295
296   public boolean shouldSkipSync() {
297     return shouldSkipSync;
298   }
299
300   public void setShouldSkipSync(boolean shouldSkipSync) {
301     this.shouldSkipSync = shouldSkipSync;
302   }
303
304   /**
305    * Inc active inventory work on hand counter.
306    */
307   protected void incActiveInventoryWorkOnHandCounter() {
308     aaiWorkOnHand.incrementAndGet();
309   }
310
311   /**
312    * Dec active inventory work on hand counter.
313    */
314   protected void decActiveInventoryWorkOnHandCounter() {
315     aaiWorkOnHand.decrementAndGet();
316   }
317
318   /**
319    * Inc elastic search work on hand counter.
320    */
321   protected void incElasticSearchWorkOnHandCounter() {
322     esWorkOnHand.incrementAndGet();
323   }
324
325   /**
326    * Dec elastic search work on hand counter.
327    */
328   protected void decElasticSearchWorkOnHandCounter() {
329     esWorkOnHand.decrementAndGet();
330   }
331
332   /**
333    * Shutdown executors.
334    */
335   protected void shutdownExecutors() {
336     try {
337
338       if (synchronizerExecutor != null) {
339         synchronizerExecutor.shutdown();
340       }
341
342       if (aaiExecutor != null) {
343         aaiExecutor.shutdown();
344       }
345
346       if (esExecutor != null) {
347         esExecutor.shutdown();
348       }
349     
350     } catch (Exception exc) {
351       logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
352     }
353   }
354
355   /**
356    * Clear cache.
357    */
358   public void clearCache() {}
359   
360   public ElasticSearchAdapter getElasticSearchAdapter() {
361     return elasticSearchAdapter;
362   }
363
364   public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
365     this.elasticSearchAdapter = elasticSearchAdapter;
366   }
367
368   public ActiveInventoryAdapter getAaiAdapter() {
369     return aaiAdapter;
370   }
371
372   public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
373     this.aaiAdapter = aaiAdapter;
374   }
375
376   public String getIndexName() {
377     return indexName;
378   }
379
380   public void setIndexName(String indexName) {
381     this.indexName = indexName;
382   }
383
384
385   /**
386    * Gets the response length.
387    *
388    * @param txn the txn
389    * @return the response length
390    */
391   private long getResponseLength(NetworkTransaction txn) {
392
393     if (txn == null) {
394       return -1;
395     }
396
397     OperationResult result = txn.getOperationResult();
398
399     if (result == null) {
400       return -1;
401     }
402
403     if (result.getResult() != null) {
404       return result.getResult().length();
405     }
406
407     return -1;
408   }
409
410   /**
411    * Update elastic search counters.
412    *
413    * @param method the method
414    * @param or the or
415    */
416   protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
417     updateElasticSearchCounters(new NetworkTransaction(method, null, or));
418   }
419
420   /**
421    * Update elastic search counters.
422    *
423    * @param method the method
424    * @param entityType the entity type
425    * @param or the or
426    */
427   protected void updateElasticSearchCounters(HttpMethod method, String entityType,
428       OperationResult or) {
429     updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
430   }
431
432   /**
433    * Update elastic search counters.
434    *
435    * @param txn the txn
436    */
437   protected void updateElasticSearchCounters(NetworkTransaction txn) {
438
439     if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
440       esRestStats.updateCounters(txn);
441     }
442
443     if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
444       esEntityStats.updateCounters(txn);
445     }
446
447     if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
448
449       esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
450
451       esTaskProcessingStats
452           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
453       esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
454
455       // don't know the cost of the lengh calc, we'll see if it causes a
456       // problem
457
458       long responsePayloadSizeInBytes = getResponseLength(txn);
459       if (responsePayloadSizeInBytes >= 0) {
460         esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
461       }
462
463       esTaskProcessingStats
464           .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
465     }
466   }
467
468   /**
469    * Update active inventory counters.
470    *
471    * @param method the method
472    * @param or the or
473    */
474   protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
475     updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
476   }
477
478   /**
479    * Update active inventory counters.
480    *
481    * @param method the method
482    * @param entityType the entity type
483    * @param or the or
484    */
485   protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
486       OperationResult or) {
487     updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
488   }
489
490   /**
491    * Update active inventory counters.
492    *
493    * @param txn the txn
494    */
495   protected void updateActiveInventoryCounters(NetworkTransaction txn) {
496
497     if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
498       aaiRestStats.updateCounters(txn);
499     }
500
501     if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
502       aaiEntityStats.updateCounters(txn);
503     }
504
505     if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
506       aaiProcessingExceptionStats.updateCounters(txn);
507     }
508
509     if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
510       aaiTransactionRateController
511           .trackResponseTime(txn.getOpTimeInMs());
512
513       aaiTaskProcessingStats
514           .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
515       aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
516
517       // don't know the cost of the lengh calc, we'll see if it causes a
518       // problem
519
520       long responsePayloadSizeInBytes = getResponseLength(txn);
521       if (responsePayloadSizeInBytes >= 0) {
522         aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
523       }
524
525       aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
526           (long) aaiTransactionRateController.getCurrentTps());
527     }
528   }
529
530   /**
531    * Reset counters.
532    */
533   protected void resetCounters() {
534     aaiRestStats.reset();
535     aaiEntityStats.reset();
536     aaiProcessingExceptionStats.reset();
537
538     esRestStats.reset();
539     esEntityStats.reset();
540   }
541
542 }