2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.sync;
25 import java.util.EnumSet;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.atomic.AtomicInteger;
29 import org.onap.aai.cl.api.Logger;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.rest.HttpMethod;
39 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
40 import org.onap.aai.sparky.logging.AaiUiMsgs;
41 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
42 import org.onap.aai.sparky.util.NodeUtils;
44 import com.fasterxml.jackson.databind.ObjectMapper;
47 * The Class AbstractEntitySynchronizer.
51 public abstract class AbstractEntitySynchronizer {
53 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
54 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
56 protected final Logger logger;
57 protected ObjectMapper mapper;
58 protected long syncDurationInMs;
63 protected enum StatFlag {
64 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
65 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
68 protected EnumSet<StatFlag> enabledStatFlags;
70 protected ElasticSearchAdapter elasticSearchAdapter;
71 protected ActiveInventoryAdapter aaiAdapter;
73 protected ExecutorService synchronizerExecutor;
74 protected ExecutorService aaiExecutor;
75 protected ExecutorService esExecutor;
77 private RestOperationalStatistics esRestStats;
78 protected ElasticSearchEntityStatistics esEntityStats;
80 private RestOperationalStatistics aaiRestStats;
81 protected ActiveInventoryEntityStatistics aaiEntityStats;
82 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
84 private TaskProcessingStats aaiTaskProcessingStats;
85 private TaskProcessingStats esTaskProcessingStats;
87 private TransactionRateMonitor aaiTransactionRateController;
88 private TransactionRateMonitor esTransactionRateController;
90 protected AtomicInteger aaiWorkOnHand;
91 protected AtomicInteger esWorkOnHand;
92 protected String synchronizerName;
94 protected abstract boolean isSyncDone();
95 protected boolean shouldSkipSync;
97 public String getActiveInventoryStatisticsReport() {
99 StringBuilder sb = new StringBuilder(128);
101 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
102 sb.append("\n\n ").append("REST Operational Stats:");
103 sb.append(aaiRestStats.getStatisticsReport());
106 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
107 sb.append("\n\n ").append("Entity Stats:");
108 sb.append(aaiEntityStats.getStatisticsReport());
111 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
112 sb.append("\n\n ").append("Processing Exception Stats:");
113 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
116 return sb.toString();
120 public String getElasticSearchStatisticsReport() {
122 StringBuilder sb = new StringBuilder(128);
124 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
125 sb.append("\n\n ").append("REST Operational Stats:");
126 sb.append(esRestStats.getStatisticsReport());
129 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
130 sb.append("\n\n ").append("Entity Stats:");
131 sb.append(esEntityStats.getStatisticsReport());
134 return sb.toString();
139 * Adds the active inventory stat report.
143 private void addActiveInventoryStatReport(StringBuilder sb) {
149 sb.append("\n\n AAI");
150 sb.append(getActiveInventoryStatisticsReport());
152 double currentTps = 0;
153 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
154 sb.append("\n\n ").append("Task Processor Stats:");
155 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
157 currentTps = aaiTransactionRateController.getCurrentTps();
159 sb.append("\n ").append("Current TPS: ").append(currentTps);
162 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
164 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
165 if (currentTps > 0) {
166 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
167 sb.append("\n ").append("SyncDurationRemaining=")
168 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
175 * Adds the elastic stat report.
179 private void addElasticStatReport(StringBuilder sb) {
185 sb.append("\n\n ELASTIC");
186 sb.append(getElasticSearchStatisticsReport());
188 double currentTps = 0;
190 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
191 sb.append("\n\n ").append("Task Processor Stats:");
192 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
194 currentTps = esTransactionRateController.getCurrentTps();
196 sb.append("\n ").append("Current TPS: ").append(currentTps);
199 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
201 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
202 if (currentTps > 0) {
203 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
204 sb.append("\n ").append("SyncDurationRemaining=")
205 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
213 * Gets the stat report.
215 * @param syncOpTimeInMs the sync op time in ms
216 * @param showFinalReport the show final report
217 * @return the stat report
219 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
221 StringBuilder sb = new StringBuilder(128);
223 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
224 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
226 addActiveInventoryStatReport(sb);
227 addElasticStatReport(sb);
229 if (showFinalReport) {
230 sb.append("\n\n ").append("Sync Completed!\n");
232 sb.append("\n\n ").append("Sync in Progress...\n");
235 return sb.toString();
239 protected String indexName;
240 protected long syncStartedTimeStampInMs;
243 * Instantiates a new abstract entity synchronizer.
245 * @param logger the logger
246 * @param syncName the sync name
247 * @param numSyncWorkers the num sync workers
248 * @param numActiveInventoryWorkers the num active inventory workers
249 * @param numElasticsearchWorkers the num elasticsearch workers
250 * @param indexName the index name
251 * @throws Exception the exception
253 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
254 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
255 NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
257 this.logger = logger;
258 this.synchronizerExecutor =
259 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
261 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
263 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
264 this.mapper = new ObjectMapper();
265 this.indexName = indexName;
266 this.esRestStats = new RestOperationalStatistics();
267 this.esEntityStats = new ElasticSearchEntityStatistics();
268 this.aaiRestStats = new RestOperationalStatistics();
269 this.aaiEntityStats = new ActiveInventoryEntityStatistics();
270 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
271 this.aaiTaskProcessingStats =
272 new TaskProcessingStats(aaiStatConfig);
273 this.esTaskProcessingStats =
274 new TaskProcessingStats(esStatConfig);
276 this.aaiTransactionRateController =
277 new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
278 this.esTransactionRateController =
279 new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
281 this.aaiWorkOnHand = new AtomicInteger(0);
282 this.esWorkOnHand = new AtomicInteger(0);
284 enabledStatFlags = EnumSet.allOf(StatFlag.class);
286 this.synchronizerName = "Abstact Entity Synchronizer";
288 String txnID = NodeUtils.getRandomTxnId();
289 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
291 this.shouldSkipSync = false;
292 this.syncStartedTimeStampInMs = System.currentTimeMillis();
293 this.syncDurationInMs = -1;
296 public boolean shouldSkipSync() {
297 return shouldSkipSync;
300 public void setShouldSkipSync(boolean shouldSkipSync) {
301 this.shouldSkipSync = shouldSkipSync;
305 * Inc active inventory work on hand counter.
307 protected void incActiveInventoryWorkOnHandCounter() {
308 aaiWorkOnHand.incrementAndGet();
312 * Dec active inventory work on hand counter.
314 protected void decActiveInventoryWorkOnHandCounter() {
315 aaiWorkOnHand.decrementAndGet();
319 * Inc elastic search work on hand counter.
321 protected void incElasticSearchWorkOnHandCounter() {
322 esWorkOnHand.incrementAndGet();
326 * Dec elastic search work on hand counter.
328 protected void decElasticSearchWorkOnHandCounter() {
329 esWorkOnHand.decrementAndGet();
333 * Shutdown executors.
335 protected void shutdownExecutors() {
338 if (synchronizerExecutor != null) {
339 synchronizerExecutor.shutdown();
342 if (aaiExecutor != null) {
343 aaiExecutor.shutdown();
346 if (esExecutor != null) {
347 esExecutor.shutdown();
350 } catch (Exception exc) {
351 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
358 public void clearCache() {}
360 public ElasticSearchAdapter getElasticSearchAdapter() {
361 return elasticSearchAdapter;
364 public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
365 this.elasticSearchAdapter = elasticSearchAdapter;
368 public ActiveInventoryAdapter getAaiAdapter() {
372 public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
373 this.aaiAdapter = aaiAdapter;
376 public String getIndexName() {
380 public void setIndexName(String indexName) {
381 this.indexName = indexName;
386 * Gets the response length.
389 * @return the response length
391 private long getResponseLength(NetworkTransaction txn) {
397 OperationResult result = txn.getOperationResult();
399 if (result == null) {
403 if (result.getResult() != null) {
404 return result.getResult().length();
411 * Update elastic search counters.
413 * @param method the method
416 protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
417 updateElasticSearchCounters(new NetworkTransaction(method, null, or));
421 * Update elastic search counters.
423 * @param method the method
424 * @param entityType the entity type
427 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
428 OperationResult or) {
429 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
433 * Update elastic search counters.
437 protected void updateElasticSearchCounters(NetworkTransaction txn) {
439 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
440 esRestStats.updateCounters(txn);
443 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
444 esEntityStats.updateCounters(txn);
447 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
449 esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
451 esTaskProcessingStats
452 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
453 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
455 // don't know the cost of the lengh calc, we'll see if it causes a
458 long responsePayloadSizeInBytes = getResponseLength(txn);
459 if (responsePayloadSizeInBytes >= 0) {
460 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
463 esTaskProcessingStats
464 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
469 * Update active inventory counters.
471 * @param method the method
474 protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
475 updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
479 * Update active inventory counters.
481 * @param method the method
482 * @param entityType the entity type
485 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
486 OperationResult or) {
487 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
491 * Update active inventory counters.
495 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
497 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
498 aaiRestStats.updateCounters(txn);
501 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
502 aaiEntityStats.updateCounters(txn);
505 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
506 aaiProcessingExceptionStats.updateCounters(txn);
509 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
510 aaiTransactionRateController
511 .trackResponseTime(txn.getOpTimeInMs());
513 aaiTaskProcessingStats
514 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
515 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
517 // don't know the cost of the lengh calc, we'll see if it causes a
520 long responsePayloadSizeInBytes = getResponseLength(txn);
521 if (responsePayloadSizeInBytes >= 0) {
522 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
525 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
526 (long) aaiTransactionRateController.getCurrentTps());
533 protected void resetCounters() {
534 aaiRestStats.reset();
535 aaiEntityStats.reset();
536 aaiProcessingExceptionStats.reset();
539 esEntityStats.reset();