2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.sync;
23 import java.util.EnumSet;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.function.Consumer;
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
33 import org.onap.aai.sparky.search.SearchServiceAdapter;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
36 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
38 import org.onap.aai.sparky.dal.rest.HttpMethod;
39 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
40 import org.onap.aai.sparky.logging.AaiUiMsgs;
41 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
42 import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
43 import org.onap.aai.sparky.util.NodeUtils;
45 import com.fasterxml.jackson.databind.ObjectMapper;
47 import static java.util.concurrent.CompletableFuture.supplyAsync;
50 * The Class AbstractEntitySynchronizer.
54 public abstract class AbstractEntitySynchronizer {
56 protected static final Logger LOG =
57 LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
58 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
59 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
61 protected final Logger logger;
62 protected ObjectMapper mapper;
63 protected long syncDurationInMs;
68 protected enum StatFlag {
69 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
70 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
73 protected EnumSet<StatFlag> enabledStatFlags;
75 protected SearchServiceAdapter searchServiceAdapter;
76 protected ActiveInventoryAdapter aaiAdapter;
78 protected ExecutorService synchronizerExecutor;
79 protected ExecutorService aaiExecutor;
80 protected ExecutorService esExecutor;
82 private RestOperationalStatistics esRestStats;
83 protected ElasticSearchEntityStatistics esEntityStats;
85 private RestOperationalStatistics aaiRestStats;
86 protected ActiveInventoryEntityStatistics aaiEntityStats;
87 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
89 private TaskProcessingStats aaiTaskProcessingStats;
90 private TaskProcessingStats esTaskProcessingStats;
92 private TransactionRateMonitor aaiTransactionRateController;
93 private TransactionRateMonitor esTransactionRateController;
95 protected AtomicInteger aaiWorkOnHand;
96 protected AtomicInteger esWorkOnHand;
97 protected String synchronizerName;
99 protected abstract boolean isSyncDone();
100 protected boolean shouldSkipSync;
102 public String getActiveInventoryStatisticsReport() {
104 StringBuilder sb = new StringBuilder(128);
106 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
107 sb.append("\n\n ").append("REST Operational Stats:");
108 sb.append(aaiRestStats.getStatisticsReport());
111 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
112 sb.append("\n\n ").append("Entity Stats:");
113 sb.append(aaiEntityStats.getStatisticsReport());
116 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
117 sb.append("\n\n ").append("Processing Exception Stats:");
118 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
121 return sb.toString();
125 public String getElasticSearchStatisticsReport() {
127 StringBuilder sb = new StringBuilder(128);
129 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
130 sb.append("\n\n ").append("REST Operational Stats:");
131 sb.append(esRestStats.getStatisticsReport());
134 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
135 sb.append("\n\n ").append("Entity Stats:");
136 sb.append(esEntityStats.getStatisticsReport());
139 return sb.toString();
144 * Adds the active inventory stat report.
148 private void addActiveInventoryStatReport(StringBuilder sb) {
154 sb.append("\n\n AAI");
155 sb.append(getActiveInventoryStatisticsReport());
157 double currentTps = 0;
158 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
159 sb.append("\n\n ").append("Task Processor Stats:");
160 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
162 currentTps = aaiTransactionRateController.getCurrentTps();
164 sb.append("\n ").append("Current TPS: ").append(currentTps);
167 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
169 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
170 if (currentTps > 0) {
171 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
172 sb.append("\n ").append("SyncDurationRemaining=")
173 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
180 * Adds the elastic stat report.
184 private void addElasticStatReport(StringBuilder sb) {
190 sb.append("\n\n ELASTIC");
191 sb.append(getElasticSearchStatisticsReport());
193 double currentTps = 0;
195 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
196 sb.append("\n\n ").append("Task Processor Stats:");
197 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
199 currentTps = esTransactionRateController.getCurrentTps();
201 sb.append("\n ").append("Current TPS: ").append(currentTps);
204 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
206 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
207 if (currentTps > 0) {
208 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
209 sb.append("\n ").append("SyncDurationRemaining=")
210 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
218 * Gets the stat report.
220 * @param syncOpTimeInMs the sync op time in ms
221 * @param showFinalReport the show final report
222 * @return the stat report
224 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
226 StringBuilder sb = new StringBuilder(128);
228 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
229 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
231 addActiveInventoryStatReport(sb);
232 addElasticStatReport(sb);
234 if (showFinalReport) {
235 sb.append("\n\n ").append("Sync Completed!\n");
237 sb.append("\n\n ").append("Sync in Progress...\n");
240 return sb.toString();
244 protected String indexName;
245 protected long syncStartedTimeStampInMs;
248 * Instantiates a new abstract entity synchronizer.
250 * @param logger the logger
251 * @param syncName the sync name
252 * @param numSyncWorkers the num sync workers
253 * @param numActiveInventoryWorkers the num active inventory workers
254 * @param numElasticsearchWorkers the num elasticsearch workers
255 * @param indexName the index name
256 * @throws Exception the exception
258 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
259 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
260 NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
262 this.logger = logger;
263 this.synchronizerExecutor =
264 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
266 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
268 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
269 this.mapper = new ObjectMapper();
270 this.indexName = indexName;
271 this.esRestStats = new RestOperationalStatistics();
272 this.esEntityStats = new ElasticSearchEntityStatistics();
273 this.aaiRestStats = new RestOperationalStatistics();
274 this.aaiEntityStats = new ActiveInventoryEntityStatistics();
275 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
276 this.aaiTaskProcessingStats =
277 new TaskProcessingStats(aaiStatConfig);
278 this.esTaskProcessingStats =
279 new TaskProcessingStats(esStatConfig);
281 this.aaiTransactionRateController =
282 new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
283 this.esTransactionRateController =
284 new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
286 this.aaiWorkOnHand = new AtomicInteger(0);
287 this.esWorkOnHand = new AtomicInteger(0);
289 enabledStatFlags = EnumSet.allOf(StatFlag.class);
291 this.synchronizerName = "Abstact Entity Synchronizer";
293 String txnID = NodeUtils.getRandomTxnId();
294 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
296 this.shouldSkipSync = false;
297 this.syncStartedTimeStampInMs = System.currentTimeMillis();
298 this.syncDurationInMs = -1;
301 public boolean shouldSkipSync() {
302 return shouldSkipSync;
305 public void setShouldSkipSync(boolean shouldSkipSync) {
306 this.shouldSkipSync = shouldSkipSync;
310 * Inc active inventory work on hand counter.
312 protected void incActiveInventoryWorkOnHandCounter() {
313 aaiWorkOnHand.incrementAndGet();
317 * Dec active inventory work on hand counter.
319 protected void decActiveInventoryWorkOnHandCounter() {
320 aaiWorkOnHand.decrementAndGet();
324 * Inc elastic search work on hand counter.
326 protected void incElasticSearchWorkOnHandCounter() {
327 esWorkOnHand.incrementAndGet();
331 * Dec elastic search work on hand counter.
333 protected void decElasticSearchWorkOnHandCounter() {
334 esWorkOnHand.decrementAndGet();
338 * Shutdown executors.
340 protected void shutdownExecutors() {
343 if (synchronizerExecutor != null) {
344 synchronizerExecutor.shutdown();
347 if (aaiExecutor != null) {
348 aaiExecutor.shutdown();
351 if (esExecutor != null) {
352 esExecutor.shutdown();
355 } catch (Exception exc) {
356 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
363 public void clearCache() {}
365 public SearchServiceAdapter getSearchServiceAdapter() {
366 return searchServiceAdapter;
369 public void setSearchServiceAdapter(SearchServiceAdapter searchServiceAdapter) {
370 this.searchServiceAdapter = searchServiceAdapter;
373 public ActiveInventoryAdapter getAaiAdapter() {
377 public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
378 this.aaiAdapter = aaiAdapter;
381 public String getIndexName() {
385 public void setIndexName(String indexName) {
386 this.indexName = indexName;
391 * Gets the response length.
394 * @return the response length
396 private long getResponseLength(NetworkTransaction txn) {
402 OperationResult result = txn.getOperationResult();
404 if (result == null) {
408 if (result.getResult() != null) {
409 return result.getResult().length();
416 * Update elastic search counters.
418 * @param method the method
419 * @param entityType the entity type
422 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
423 OperationResult or) {
424 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
428 * Update elastic search counters.
432 protected void updateElasticSearchCounters(NetworkTransaction txn) {
434 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
435 esRestStats.updateCounters(txn);
438 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
439 esEntityStats.updateCounters(txn);
442 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
444 esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
446 esTaskProcessingStats
447 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
448 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
450 // don't know the cost of the lengh calc, we'll see if it causes a
453 long responsePayloadSizeInBytes = getResponseLength(txn);
454 if (responsePayloadSizeInBytes >= 0) {
455 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
458 esTaskProcessingStats
459 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
464 * Update active inventory counters.
466 * @param method the method
467 * @param entityType the entity type
470 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
471 OperationResult or) {
472 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
476 * Update active inventory counters.
480 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
482 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
483 aaiRestStats.updateCounters(txn);
486 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
487 aaiEntityStats.updateCounters(txn);
490 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
491 aaiProcessingExceptionStats.updateCounters(txn);
494 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
495 aaiTransactionRateController
496 .trackResponseTime(txn.getOpTimeInMs());
498 aaiTaskProcessingStats
499 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
500 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
502 // don't know the cost of the lengh calc, we'll see if it causes a
505 long responsePayloadSizeInBytes = getResponseLength(txn);
506 if (responsePayloadSizeInBytes >= 0) {
507 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
510 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
511 (long) aaiTransactionRateController.getCurrentTps());
518 protected void resetCounters() {
519 aaiRestStats.reset();
520 aaiEntityStats.reset();
521 aaiProcessingExceptionStats.reset();
524 esEntityStats.reset();
528 protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
532 * In this retry flow the se object has already derived its fields
534 link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), id);
535 } catch (Exception exc) {
536 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
540 NetworkTransaction retryTransaction = new NetworkTransaction();
541 retryTransaction.setLink(link);
542 retryTransaction.setEntityType(txn.getEntityType());
543 retryTransaction.setDescriptor(txn.getDescriptor());
544 retryTransaction.setOperationType(HttpMethod.GET);
547 * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
548 * called incrementAndGet when queuing the failed PUT!
551 supplyAsync(new PerformSearchServiceRetrieval(retryTransaction, searchServiceAdapter),
552 esExecutor).whenComplete((result, error) -> {
554 esWorkOnHand.decrementAndGet();
557 LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
559 updateElasticSearchCounters(result);
560 networkTransactionConsumer.accept(result);