2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.sync;
25 import java.util.EnumSet;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.atomic.AtomicInteger;
29 import org.onap.aai.cl.api.Logger;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sparky.dal.rest.HttpMethod;
40 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
41 import org.onap.aai.sparky.logging.AaiUiMsgs;
42 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
43 import org.onap.aai.sparky.util.NodeUtils;
45 import com.fasterxml.jackson.databind.ObjectMapper;
48 * The Class AbstractEntitySynchronizer.
52 public abstract class AbstractEntitySynchronizer {
54 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
55 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
57 protected final Logger logger;
58 protected ObjectMapper mapper;
59 protected long syncDurationInMs;
64 protected enum StatFlag {
65 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS, AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
68 protected EnumSet<StatFlag> enabledStatFlags;
70 protected ElasticSearchAdapter elasticSearchAdapter;
71 protected ActiveInventoryAdapter aaiAdapter;
73 protected ExecutorService synchronizerExecutor;
74 protected ExecutorService aaiExecutor;
75 protected ExecutorService esExecutor;
77 private RestOperationalStatistics esRestStats;
78 protected ElasticSearchEntityStatistics esEntityStats;
80 private RestOperationalStatistics aaiRestStats;
81 protected ActiveInventoryEntityStatistics aaiEntityStats;
82 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
84 private TaskProcessingStats aaiTaskProcessingStats;
85 private TaskProcessingStats esTaskProcessingStats;
87 private TransactionRateMonitor aaiTransactionRateController;
88 private TransactionRateMonitor esTransactionRateController;
90 protected AtomicInteger aaiWorkOnHand;
91 protected AtomicInteger esWorkOnHand;
92 protected String synchronizerName;
94 protected abstract boolean isSyncDone();
96 protected boolean shouldSkipSync;
98 public String getActiveInventoryStatisticsReport() {
100 StringBuilder sb = new StringBuilder(128);
102 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
103 sb.append("\n\n ").append("REST Operational Stats:");
104 sb.append(aaiRestStats.getStatisticsReport());
107 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
108 sb.append("\n\n ").append("Entity Stats:");
109 sb.append(aaiEntityStats.getStatisticsReport());
112 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
113 sb.append("\n\n ").append("Processing Exception Stats:");
114 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
117 return sb.toString();
121 public String getElasticSearchStatisticsReport() {
123 StringBuilder sb = new StringBuilder(128);
125 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
126 sb.append("\n\n ").append("REST Operational Stats:");
127 sb.append(esRestStats.getStatisticsReport());
130 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
131 sb.append("\n\n ").append("Entity Stats:");
132 sb.append(esEntityStats.getStatisticsReport());
135 return sb.toString();
140 * Adds the active inventory stat report.
144 private void addActiveInventoryStatReport(StringBuilder sb) {
150 sb.append("\n\n AAI");
151 sb.append(getActiveInventoryStatisticsReport());
153 double currentTps = 0;
154 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
155 sb.append("\n\n ").append("Task Processor Stats:");
156 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
158 currentTps = aaiTransactionRateController.getCurrentTps();
160 sb.append("\n ").append("Current TPS: ").append(currentTps);
163 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
165 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
166 if (currentTps > 0) {
167 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
168 sb.append("\n ").append("SyncDurationRemaining=")
169 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
176 * Adds the elastic stat report.
180 private void addElasticStatReport(StringBuilder sb) {
186 sb.append("\n\n ELASTIC");
187 sb.append(getElasticSearchStatisticsReport());
189 double currentTps = 0;
191 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
192 sb.append("\n\n ").append("Task Processor Stats:");
193 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
195 currentTps = esTransactionRateController.getCurrentTps();
197 sb.append("\n ").append("Current TPS: ").append(currentTps);
200 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
202 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
203 if (currentTps > 0) {
204 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
205 sb.append("\n ").append("SyncDurationRemaining=")
206 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
214 * Gets the stat report.
216 * @param syncOpTimeInMs the sync op time in ms
217 * @param showFinalReport the show final report
218 * @return the stat report
220 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
222 StringBuilder sb = new StringBuilder(128);
224 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
225 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
227 addActiveInventoryStatReport(sb);
228 addElasticStatReport(sb);
230 if (showFinalReport) {
231 sb.append("\n\n ").append("Sync Completed!\n");
233 sb.append("\n\n ").append("Sync in Progress...\n");
236 return sb.toString();
240 protected String indexName;
241 protected long syncStartedTimeStampInMs;
244 * Instantiates a new abstract entity synchronizer.
246 * @param logger the logger
247 * @param syncName the sync name
248 * @param numSyncWorkers the num sync workers
249 * @param numActiveInventoryWorkers the num active inventory workers
250 * @param numElasticsearchWorkers the num elasticsearch workers
251 * @param indexName the index name
252 * @throws Exception the exception
254 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
255 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
256 NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
258 this.logger = logger;
259 this.synchronizerExecutor =
260 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
262 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
264 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
265 this.mapper = new ObjectMapper();
266 this.indexName = indexName;
267 this.esRestStats = new RestOperationalStatistics();
268 this.esEntityStats = new ElasticSearchEntityStatistics();
269 this.aaiRestStats = new RestOperationalStatistics();
270 this.aaiEntityStats = new ActiveInventoryEntityStatistics();
271 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
272 this.aaiTaskProcessingStats = new TaskProcessingStats(aaiStatConfig);
273 this.esTaskProcessingStats = new TaskProcessingStats(esStatConfig);
275 this.aaiTransactionRateController =
276 new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
277 this.esTransactionRateController =
278 new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
280 this.aaiWorkOnHand = new AtomicInteger(0);
281 this.esWorkOnHand = new AtomicInteger(0);
283 enabledStatFlags = EnumSet.allOf(StatFlag.class);
285 this.synchronizerName = "Abstact Entity Synchronizer";
287 String txnID = NodeUtils.getRandomTxnId();
288 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
290 this.shouldSkipSync = false;
291 this.syncStartedTimeStampInMs = System.currentTimeMillis();
292 this.syncDurationInMs = -1;
295 public boolean shouldSkipSync() {
296 return shouldSkipSync;
299 public void setShouldSkipSync(boolean shouldSkipSync) {
300 this.shouldSkipSync = shouldSkipSync;
304 * Inc active inventory work on hand counter.
306 protected void incActiveInventoryWorkOnHandCounter() {
307 aaiWorkOnHand.incrementAndGet();
311 * Dec active inventory work on hand counter.
313 protected void decActiveInventoryWorkOnHandCounter() {
314 aaiWorkOnHand.decrementAndGet();
318 * Inc elastic search work on hand counter.
320 protected void incElasticSearchWorkOnHandCounter() {
321 esWorkOnHand.incrementAndGet();
325 * Dec elastic search work on hand counter.
327 protected void decElasticSearchWorkOnHandCounter() {
328 esWorkOnHand.decrementAndGet();
332 * Shutdown executors.
334 protected void shutdownExecutors() {
337 if (synchronizerExecutor != null) {
338 synchronizerExecutor.shutdown();
341 if (aaiExecutor != null) {
342 aaiExecutor.shutdown();
345 if (esExecutor != null) {
346 esExecutor.shutdown();
349 } catch (Exception exc) {
350 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc);
357 public void clearCache() {}
359 public ElasticSearchAdapter getElasticSearchAdapter() {
360 return elasticSearchAdapter;
363 public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
364 this.elasticSearchAdapter = elasticSearchAdapter;
367 public ActiveInventoryAdapter getAaiAdapter() {
371 public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
372 this.aaiAdapter = aaiAdapter;
376 * Gets the elastic full url.
378 * @param resourceUrl the resource url
379 * @param indexName the index name
380 * @param indexType the index type
381 * @return the elastic full url
382 * @throws Exception the exception
384 protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
386 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
390 * Gets the elastic full url.
392 * @param resourceUrl the resource url
393 * @param indexName the index name
394 * @return the elastic full url
395 * @throws Exception the exception
397 protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
398 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
401 public String getIndexName() {
405 public void setIndexName(String indexName) {
406 this.indexName = indexName;
411 * Gets the response length.
414 * @return the response length
416 private long getResponseLength(NetworkTransaction txn) {
422 OperationResult result = txn.getOperationResult();
424 if (result == null) {
428 if (result.getResult() != null) {
429 return result.getResult().length();
436 * Update elastic search counters.
438 * @param method the method
441 protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
442 updateElasticSearchCounters(new NetworkTransaction(method, null, or));
446 * Update elastic search counters.
448 * @param method the method
449 * @param entityType the entity type
452 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
453 OperationResult or) {
454 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
458 * Update elastic search counters.
462 protected void updateElasticSearchCounters(NetworkTransaction txn) {
464 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
465 esRestStats.updateCounters(txn);
468 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
469 esEntityStats.updateCounters(txn);
472 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
474 esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
476 esTaskProcessingStats.updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
477 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
479 // don't know the cost of the lengh calc, we'll see if it causes a
482 long responsePayloadSizeInBytes = getResponseLength(txn);
483 if (responsePayloadSizeInBytes >= 0) {
484 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
487 esTaskProcessingStats
488 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
493 * Update active inventory counters.
495 * @param method the method
498 protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
499 updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
503 * Update active inventory counters.
505 * @param method the method
506 * @param entityType the entity type
509 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
510 OperationResult or) {
511 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
515 * Update active inventory counters.
519 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
521 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
522 aaiRestStats.updateCounters(txn);
525 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
526 aaiEntityStats.updateCounters(txn);
529 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
530 aaiProcessingExceptionStats.updateCounters(txn);
533 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
534 aaiTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
536 aaiTaskProcessingStats.updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
537 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
539 // don't know the cost of the lengh calc, we'll see if it causes a
542 long responsePayloadSizeInBytes = getResponseLength(txn);
543 if (responsePayloadSizeInBytes >= 0) {
544 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
547 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
548 (long) aaiTransactionRateController.getCurrentTps());
555 protected void resetCounters() {
556 aaiRestStats.reset();
557 aaiEntityStats.reset();
558 aaiProcessingExceptionStats.reset();
561 esEntityStats.reset();