2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.aai.sparky.sync;
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.onap.aai.cl.api.Logger;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.restclient.client.OperationResult;
34 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
35 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
36 import org.onap.aai.sparky.dal.NetworkTransaction;
37 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
38 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
39 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
42 import org.onap.aai.sparky.logging.AaiUiMsgs;
43 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
44 import org.onap.aai.sparky.util.NodeUtils;
46 import com.fasterxml.jackson.databind.ObjectMapper;
49 * The Class AbstractEntitySynchronizer.
53 public abstract class AbstractEntitySynchronizer {
55 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
56 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
58 protected final Logger logger;
59 protected ObjectMapper mapper;
60 protected long syncDurationInMs;
65 protected enum StatFlag {
66 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
67 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
70 protected EnumSet<StatFlag> enabledStatFlags;
72 protected ElasticSearchAdapter elasticSearchAdapter;
73 protected ActiveInventoryAdapter aaiAdapter;
75 protected ExecutorService synchronizerExecutor;
76 protected ExecutorService aaiExecutor;
77 protected ExecutorService esExecutor;
79 private RestOperationalStatistics esRestStats;
80 protected ElasticSearchEntityStatistics esEntityStats;
82 private RestOperationalStatistics aaiRestStats;
83 protected ActiveInventoryEntityStatistics aaiEntityStats;
84 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
86 private TaskProcessingStats aaiTaskProcessingStats;
87 private TaskProcessingStats esTaskProcessingStats;
89 private TransactionRateMonitor aaiTransactionRateController;
90 private TransactionRateMonitor esTransactionRateController;
92 protected AtomicInteger aaiWorkOnHand;
93 protected AtomicInteger esWorkOnHand;
94 protected String synchronizerName;
96 protected abstract boolean isSyncDone();
97 protected boolean shouldSkipSync;
99 public String getActiveInventoryStatisticsReport() {
101 StringBuilder sb = new StringBuilder(128);
103 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
104 sb.append("\n\n ").append("REST Operational Stats:");
105 sb.append(aaiRestStats.getStatisticsReport());
108 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
109 sb.append("\n\n ").append("Entity Stats:");
110 sb.append(aaiEntityStats.getStatisticsReport());
113 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
114 sb.append("\n\n ").append("Processing Exception Stats:");
115 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
118 return sb.toString();
122 public String getElasticSearchStatisticsReport() {
124 StringBuilder sb = new StringBuilder(128);
126 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
127 sb.append("\n\n ").append("REST Operational Stats:");
128 sb.append(esRestStats.getStatisticsReport());
131 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
132 sb.append("\n\n ").append("Entity Stats:");
133 sb.append(esEntityStats.getStatisticsReport());
136 return sb.toString();
141 * Adds the active inventory stat report.
145 private void addActiveInventoryStatReport(StringBuilder sb) {
151 sb.append("\n\n AAI");
152 sb.append(getActiveInventoryStatisticsReport());
154 double currentTps = 0;
155 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
156 sb.append("\n\n ").append("Task Processor Stats:");
157 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
159 currentTps = aaiTransactionRateController.getCurrentTps();
161 sb.append("\n ").append("Current TPS: ").append(currentTps);
164 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
166 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
167 if (currentTps > 0) {
168 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
169 sb.append("\n ").append("SyncDurationRemaining=")
170 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
177 * Adds the elastic stat report.
181 private void addElasticStatReport(StringBuilder sb) {
187 sb.append("\n\n ELASTIC");
188 sb.append(getElasticSearchStatisticsReport());
190 double currentTps = 0;
192 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
193 sb.append("\n\n ").append("Task Processor Stats:");
194 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
196 currentTps = esTransactionRateController.getCurrentTps();
198 sb.append("\n ").append("Current TPS: ").append(currentTps);
201 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
203 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
204 if (currentTps > 0) {
205 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
206 sb.append("\n ").append("SyncDurationRemaining=")
207 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
215 * Gets the stat report.
217 * @param syncOpTimeInMs the sync op time in ms
218 * @param showFinalReport the show final report
219 * @return the stat report
221 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
223 StringBuilder sb = new StringBuilder(128);
225 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
226 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
228 addActiveInventoryStatReport(sb);
229 addElasticStatReport(sb);
231 if (showFinalReport) {
232 sb.append("\n\n ").append("Sync Completed!\n");
234 sb.append("\n\n ").append("Sync in Progress...\n");
237 return sb.toString();
241 protected String indexName;
242 protected long syncStartedTimeStampInMs;
245 * Instantiates a new abstract entity synchronizer.
247 * @param logger the logger
248 * @param syncName the sync name
249 * @param numSyncWorkers the num sync workers
250 * @param numActiveInventoryWorkers the num active inventory workers
251 * @param numElasticsearchWorkers the num elasticsearch workers
252 * @param indexName the index name
253 * @throws Exception the exception
255 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
256 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName,
257 NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
259 this.logger = logger;
260 this.synchronizerExecutor =
261 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
263 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
265 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
266 this.mapper = new ObjectMapper();
267 this.indexName = indexName;
268 this.esRestStats = new RestOperationalStatistics();
269 this.esEntityStats = new ElasticSearchEntityStatistics();
270 this.aaiRestStats = new RestOperationalStatistics();
271 this.aaiEntityStats = new ActiveInventoryEntityStatistics();
272 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
273 this.aaiTaskProcessingStats =
274 new TaskProcessingStats(aaiStatConfig);
275 this.esTaskProcessingStats =
276 new TaskProcessingStats(esStatConfig);
278 this.aaiTransactionRateController =
279 new TransactionRateMonitor(numActiveInventoryWorkers, aaiStatConfig);
280 this.esTransactionRateController =
281 new TransactionRateMonitor(numElasticsearchWorkers, esStatConfig);
283 this.aaiWorkOnHand = new AtomicInteger(0);
284 this.esWorkOnHand = new AtomicInteger(0);
286 enabledStatFlags = EnumSet.allOf(StatFlag.class);
288 this.synchronizerName = "Abstact Entity Synchronizer";
290 String txnID = NodeUtils.getRandomTxnId();
291 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
293 this.shouldSkipSync = false;
294 this.syncStartedTimeStampInMs = System.currentTimeMillis();
295 this.syncDurationInMs = -1;
298 public boolean shouldSkipSync() {
299 return shouldSkipSync;
302 public void setShouldSkipSync(boolean shouldSkipSync) {
303 this.shouldSkipSync = shouldSkipSync;
307 * Inc active inventory work on hand counter.
309 protected void incActiveInventoryWorkOnHandCounter() {
310 aaiWorkOnHand.incrementAndGet();
314 * Dec active inventory work on hand counter.
316 protected void decActiveInventoryWorkOnHandCounter() {
317 aaiWorkOnHand.decrementAndGet();
321 * Inc elastic search work on hand counter.
323 protected void incElasticSearchWorkOnHandCounter() {
324 esWorkOnHand.incrementAndGet();
328 * Dec elastic search work on hand counter.
330 protected void decElasticSearchWorkOnHandCounter() {
331 esWorkOnHand.decrementAndGet();
335 * Shutdown executors.
337 protected void shutdownExecutors() {
340 if (synchronizerExecutor != null) {
341 synchronizerExecutor.shutdown();
344 if (aaiExecutor != null) {
345 aaiExecutor.shutdown();
348 if (esExecutor != null) {
349 esExecutor.shutdown();
352 } catch (Exception exc) {
353 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
360 public void clearCache() {}
362 public ElasticSearchAdapter getElasticSearchAdapter() {
363 return elasticSearchAdapter;
366 public void setElasticSearchAdapter(ElasticSearchAdapter elasticSearchAdapter) {
367 this.elasticSearchAdapter = elasticSearchAdapter;
370 public ActiveInventoryAdapter getAaiAdapter() {
374 public void setAaiAdapter(ActiveInventoryAdapter aaiAdapter) {
375 this.aaiAdapter = aaiAdapter;
378 public String getIndexName() {
382 public void setIndexName(String indexName) {
383 this.indexName = indexName;
388 * Gets the response length.
391 * @return the response length
393 private long getResponseLength(NetworkTransaction txn) {
399 OperationResult result = txn.getOperationResult();
401 if (result == null) {
405 if (result.getResult() != null) {
406 return result.getResult().length();
413 * Update elastic search counters.
415 * @param method the method
416 * @param entityType the entity type
419 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
420 OperationResult or) {
421 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
425 * Update elastic search counters.
429 protected void updateElasticSearchCounters(NetworkTransaction txn) {
431 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
432 esRestStats.updateCounters(txn);
435 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
436 esEntityStats.updateCounters(txn);
439 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
441 esTransactionRateController.trackResponseTime(txn.getOpTimeInMs());
443 esTaskProcessingStats
444 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
445 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
447 // don't know the cost of the lengh calc, we'll see if it causes a
450 long responsePayloadSizeInBytes = getResponseLength(txn);
451 if (responsePayloadSizeInBytes >= 0) {
452 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
455 esTaskProcessingStats
456 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
461 * Update active inventory counters.
463 * @param method the method
464 * @param entityType the entity type
467 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
468 OperationResult or) {
469 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
473 * Update active inventory counters.
477 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
479 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
480 aaiRestStats.updateCounters(txn);
483 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
484 aaiEntityStats.updateCounters(txn);
487 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
488 aaiProcessingExceptionStats.updateCounters(txn);
491 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
492 aaiTransactionRateController
493 .trackResponseTime(txn.getOpTimeInMs());
495 aaiTaskProcessingStats
496 .updateTaskResponseStatsHistogram(txn.getOpTimeInMs());
497 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
499 // don't know the cost of the lengh calc, we'll see if it causes a
502 long responsePayloadSizeInBytes = getResponseLength(txn);
503 if (responsePayloadSizeInBytes >= 0) {
504 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
507 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
508 (long) aaiTransactionRateController.getCurrentTps());
515 protected void resetCounters() {
516 aaiRestStats.reset();
517 aaiEntityStats.reset();
518 aaiProcessingExceptionStats.reset();
521 esEntityStats.reset();