2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.sync;
23 import java.util.EnumSet;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.atomic.AtomicInteger;
27 import org.onap.aai.cl.api.Logger;
28 import org.onap.aai.cl.mdc.MdcContext;
29 import org.onap.aai.restclient.client.OperationResult;
30 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
31 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
34 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
35 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
36 import org.onap.aai.sparky.dal.rest.HttpMethod;
37 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
40 import org.onap.aai.sparky.util.NodeUtils;
42 import com.fasterxml.jackson.databind.ObjectMapper;
45 * The Class AbstractEntitySynchronizer.
49 public abstract class AbstractEntitySynchronizer {
51 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
52 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
54 protected final Logger logger;
55 protected ObjectMapper mapper;
56 protected long syncDurationInMs;
61 protected enum StatFlag {
62 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
63 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
66 protected EnumSet<StatFlag> enabledStatFlags;
68 protected ElasticSearchAdapter elasticSearchAdapter;
69 protected ActiveInventoryAdapter aaiAdapter;
71 protected ExecutorService synchronizerExecutor;
72 protected ExecutorService aaiExecutor;
73 protected ExecutorService esExecutor;
75 private RestOperationalStatistics esRestStats;
76 protected ElasticSearchEntityStatistics esEntityStats;
78 private RestOperationalStatistics aaiRestStats;
79 protected ActiveInventoryEntityStatistics aaiEntityStats;
80 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
82 private TaskProcessingStats aaiTaskProcessingStats;
83 private TaskProcessingStats esTaskProcessingStats;
85 private TransactionRateMonitor aaiTransactionRateController;
86 private TransactionRateMonitor esTransactionRateController;
88 protected AtomicInteger aaiWorkOnHand;
89 protected AtomicInteger esWorkOnHand;
90 protected String synchronizerName;
92 protected abstract boolean isSyncDone();
93 protected boolean shouldSkipSync;
95 public String getActiveInventoryStatisticsReport() {
97 StringBuilder sb = new StringBuilder(128);
99 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
100 sb.append("\n\n ").append("REST Operational Stats:");
101 sb.append(aaiRestStats.getStatisticsReport());
104 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
105 sb.append("\n\n ").append("Entity Stats:");
106 sb.append(aaiEntityStats.getStatisticsReport());
109 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
110 sb.append("\n\n ").append("Processing Exception Stats:");
111 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
114 return sb.toString();
118 public String getElasticSearchStatisticsReport() {
120 StringBuilder sb = new StringBuilder(128);
122 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
123 sb.append("\n\n ").append("REST Operational Stats:");
124 sb.append(esRestStats.getStatisticsReport());
127 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
128 sb.append("\n\n ").append("Entity Stats:");
129 sb.append(esEntityStats.getStatisticsReport());
132 return sb.toString();
137 * Adds the active inventory stat report.
141 private void addActiveInventoryStatReport(StringBuilder sb) {
147 sb.append("\n\n AAI");
148 sb.append(getActiveInventoryStatisticsReport());
150 double currentTps = 0;
151 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
152 sb.append("\n\n ").append("Task Processor Stats:");
153 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
155 currentTps = aaiTransactionRateController.getCurrentTps();
157 sb.append("\n ").append("Current TPS: ").append(currentTps);
160 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
162 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
163 if (currentTps > 0) {
164 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
165 sb.append("\n ").append("SyncDurationRemaining=")
166 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
173 * Adds the elastic stat report.
177 private void addElasticStatReport(StringBuilder sb) {
183 sb.append("\n\n ELASTIC");
184 sb.append(getElasticSearchStatisticsReport());
186 double currentTps = 0;
188 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
189 sb.append("\n\n ").append("Task Processor Stats:");
190 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
192 currentTps = esTransactionRateController.getCurrentTps();
194 sb.append("\n ").append("Current TPS: ").append(currentTps);
197 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
199 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
200 if (currentTps > 0) {
201 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
202 sb.append("\n ").append("SyncDurationRemaining=")
203 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
211 * Gets the stat report.
213 * @param syncOpTimeInMs the sync op time in ms
214 * @param showFinalReport the show final report
215 * @return the stat report
217 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
219 StringBuilder sb = new StringBuilder(128);
221 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
222 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
224 addActiveInventoryStatReport(sb);
225 addElasticStatReport(sb);
227 if (showFinalReport) {
228 sb.append("\n\n ").append("Sync Completed!\n");
230 sb.append("\n\n ").append("Sync in Progress...\n");
233 return sb.toString();
237 protected String indexName;
238 protected long syncStartedTimeStampInMs;
/**
 * Creates the shared infrastructure of a synchronizer: three named executors, the JSON
 * mapper, the statistics collectors, the transaction rate monitors and the work-on-hand
 * counters. All statistics flags are enabled, and the MDC context is initialised with a
 * random transaction id.
 *
 * @param logger the logger used by this synchronizer and handed to the executor factory
 * @param syncName prefix for the executor names ("-INTERNAL", "-AAI" and "-ES" are appended)
 * @param numSyncWorkers worker count for the internal executor
 * @param numActiveInventoryWorkers worker count for the AAI executor and AAI rate monitor
 * @param numElasticsearchWorkers worker count for the ES executor and ES rate monitor
 * @param indexName the index name
 * @param aaiStatConfig configuration for the AAI task statistics and rate monitor
 * @param esStatConfig configuration for the ES task statistics and rate monitor
 * @throws Exception declared by the constructor; no explicit throw is visible in this body
 */
protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
    int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
    NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
    throws Exception {

  this.logger = logger;

  // Executors
  this.synchronizerExecutor =
      NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
  this.aaiExecutor =
      NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
  this.esExecutor =
      NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);

  this.mapper = new ObjectMapper();
  this.indexName = indexName;

  // Statistics collectors
  this.esRestStats = new RestOperationalStatistics();
  this.esEntityStats = new ElasticSearchEntityStatistics();
  this.aaiRestStats = new RestOperationalStatistics();
  this.aaiEntityStats = new ActiveInventoryEntityStatistics();
  this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
  this.aaiTaskProcessingStats = new TaskProcessingStats(aaiStatConfig);
  this.esTaskProcessingStats = new TaskProcessingStats(esStatConfig);

  // Transaction rate monitors
  this.aaiTransactionRateController =
      new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
  this.esTransactionRateController =
      new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);

  // Work-on-hand counters
  this.aaiWorkOnHand = new AtomicInteger(0);
  this.esWorkOnHand = new AtomicInteger(0);

  this.enabledStatFlags = EnumSet.allOf(StatFlag.class);

  // Default display name; the spelling is kept exactly because it is emitted at runtime.
  this.synchronizerName = "Abstact Entity Synchronizer";

  MdcContext.initialize(NodeUtils.getRandomTxnId(), "AbstractEntitySynchronizer", "", "Sync",
      "");

  this.shouldSkipSync = false;
  this.syncStartedTimeStampInMs = System.currentTimeMillis();
  this.syncDurationInMs = -1;
}
294 public boolean shouldSkipSync() {
295 return shouldSkipSync;
298 public void setShouldSkipSync(boolean shouldSkipSync) {
299 this.shouldSkipSync = shouldSkipSync;
303 * Inc active inventory work on hand counter.
305 protected void incActiveInventoryWorkOnHandCounter() {
306 aaiWorkOnHand.incrementAndGet();
310 * Dec active inventory work on hand counter.
312 protected void decActiveInventoryWorkOnHandCounter() {
313 aaiWorkOnHand.decrementAndGet();
317 * Inc elastic search work on hand counter.
319 protected void incElasticSearchWorkOnHandCounter() {
320 esWorkOnHand.incrementAndGet();
324 * Dec elastic search work on hand counter.
326 protected void decElasticSearchWorkOnHandCounter() {
327 esWorkOnHand.decrementAndGet();
331 * Shutdown executors.
333 protected void shutdownExecutors() {
336 if (synchronizerExecutor != null) {
337 synchronizerExecutor.shutdown();
340 if (aaiExecutor != null) {
341 aaiExecutor.shutdown();
344 if (esExecutor != null) {
345 esExecutor.shutdown();
348 } catch (Exception exc) {
349 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
356 public void clearCache() {}
358 public ElasticSearchAdapter getElasticSearchAdapter() {
359 return elasticSearchAdapter;
362 public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
363 this.elasticSearchAdapter = elasticSearchAdapter;
366 public ActiveInventoryAdapter getAaiAdapter() {
370 public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
371 this.aaiAdapter = aaiAdapter;
374 public String getIndexName() {
378 public void setIndexName(String indexName) {
379 this.indexName = indexName;
384 * Gets the response length.
387 * @return the response length
389 private long getResponseLength(NetworkTransaction txn) {
395 OperationResult result = txn.getOperationResult();
397 if (result == null) {
401 if (result.getResult() != null) {
402 return result.getResult().length();
409 * Update elastic search counters.
411 * @param method the method
412 * @param entityType the entity type
415 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
416 OperationResult or) {
417 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
421 * Update elastic search counters.
425 protected void updateElasticSearchCounters(NetworkTransaction txn) {
427 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
428 esRestStats.updateCounters(txn);
431 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
432 esEntityStats.updateCounters(txn);
435 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
437 esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
439 esTaskProcessingStats
440 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
441 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
443 // don't know the cost of the lengh calc, we'll see if it causes a
446 long responsePayloadSizeInBytes = getResponseLength(txn);
447 if (responsePayloadSizeInBytes >= 0) {
448 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
451 esTaskProcessingStats
452 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
457 * Update active inventory counters.
459 * @param method the method
460 * @param entityType the entity type
463 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
464 OperationResult or) {
465 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
469 * Update active inventory counters.
473 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
475 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
476 aaiRestStats.updateCounters(txn);
479 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
480 aaiEntityStats.updateCounters(txn);
483 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
484 aaiProcessingExceptionStats.updateCounters(txn);
487 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
488 aaiTransactionRateController
489 .trackResponseTime(txn.getOpTimeInMs());
491 aaiTaskProcessingStats
492 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
493 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
495 // don't know the cost of the lengh calc, we'll see if it causes a
498 long responsePayloadSizeInBytes = getResponseLength(txn);
499 if (responsePayloadSizeInBytes >= 0) {
500 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
503 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
504 (long) aaiTransactionRateController.getCurrentTps());
511 protected void resetCounters() {
512 aaiRestStats.reset();
513 aaiEntityStats.reset();
514 aaiProcessingExceptionStats.reset();
517 esEntityStats.reset();