2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.onap.aai.sparky.config.oxm.OxmModelLoader;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.aai.ActiveInventoryDataProvider;
34 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
36 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchDataProvider;
38 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
39 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.OperationResult;
42 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
43 import org.onap.aai.sparky.logging.AaiUiMsgs;
44 import org.onap.aai.sparky.util.NodeUtils;
45 import org.onap.aai.cl.api.Logger;
46 import org.onap.aai.cl.mdc.MdcContext;
47 import com.fasterxml.jackson.databind.ObjectMapper;
50 * The Class AbstractEntitySynchronizer.
54 public abstract class AbstractEntitySynchronizer {
56 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
57 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
59 protected final Logger logger;
60 protected ObjectMapper mapper;
61 protected OxmModelLoader oxmModelLoader;
62 protected long syncDurationInMs;
67 protected enum StatFlag {
68 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS, AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
71 protected EnumSet<StatFlag> enabledStatFlags;
73 protected ActiveInventoryDataProvider aaiDataProvider;
74 protected ElasticSearchDataProvider esDataProvider;
76 protected ExecutorService synchronizerExecutor;
77 protected ExecutorService aaiExecutor;
78 protected ExecutorService esExecutor;
80 private RestOperationalStatistics esRestStats;
81 protected ElasticSearchEntityStatistics esEntityStats;
83 private RestOperationalStatistics aaiRestStats;
84 protected ActiveInventoryEntityStatistics aaiEntityStats;
85 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
87 private TaskProcessingStats aaiTaskProcessingStats;
88 private TaskProcessingStats esTaskProcessingStats;
90 private TransactionRateController aaiTransactionRateController;
91 private TransactionRateController esTransactionRateController;
93 protected AtomicInteger aaiWorkOnHand;
94 protected AtomicInteger esWorkOnHand;
95 protected String synchronizerName;
97 protected abstract boolean isSyncDone();
99 protected boolean shouldSkipSync;
101 public String getActiveInventoryStatisticsReport() {
103 StringBuilder sb = new StringBuilder(128);
105 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
106 sb.append("\n\n ").append("REST Operational Stats:");
107 sb.append(aaiRestStats.getStatisticsReport());
110 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
111 sb.append("\n\n ").append("Entity Stats:");
112 sb.append(aaiEntityStats.getStatisticsReport());
115 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
116 sb.append("\n\n ").append("Processing Exception Stats:");
117 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
120 return sb.toString();
124 public String getElasticSearchStatisticsReport() {
126 StringBuilder sb = new StringBuilder(128);
128 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
129 sb.append("\n\n ").append("REST Operational Stats:");
130 sb.append(esRestStats.getStatisticsReport());
133 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
134 sb.append("\n\n ").append("Entity Stats:");
135 sb.append(esEntityStats.getStatisticsReport());
138 return sb.toString();
143 * Adds the active inventory stat report.
147 private void addActiveInventoryStatReport(StringBuilder sb) {
153 sb.append("\n\n AAI");
154 sb.append(getActiveInventoryStatisticsReport());
156 double currentTps = 0;
157 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
158 sb.append("\n\n ").append("Task Processor Stats:");
159 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
161 currentTps = aaiTransactionRateController.getCurrentTps();
163 sb.append("\n ").append("Current TPS: ").append(currentTps);
166 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
168 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
169 if (currentTps > 0) {
170 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
171 sb.append("\n ").append("SyncDurationRemaining=")
172 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
179 * Adds the elastic stat report.
183 private void addElasticStatReport(StringBuilder sb) {
189 sb.append("\n\n ELASTIC");
190 sb.append(getElasticSearchStatisticsReport());
192 double currentTps = 0;
194 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
195 sb.append("\n\n ").append("Task Processor Stats:");
196 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
198 currentTps = esTransactionRateController.getCurrentTps();
200 sb.append("\n ").append("Current TPS: ").append(currentTps);
203 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
205 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
206 if (currentTps > 0) {
207 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
208 sb.append("\n ").append("SyncDurationRemaining=")
209 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
217 * Gets the stat report.
219 * @param syncOpTimeInMs the sync op time in ms
220 * @param showFinalReport the show final report
221 * @return the stat report
223 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
225 StringBuilder sb = new StringBuilder(128);
227 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
228 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
230 addActiveInventoryStatReport(sb);
231 addElasticStatReport(sb);
233 if (showFinalReport) {
234 sb.append("\n\n ").append("Sync Completed!\n");
236 sb.append("\n\n ").append("Sync in Progress...\n");
239 return sb.toString();
243 protected String indexName;
244 protected long syncStartedTimeStampInMs;
247 * Instantiates a new abstract entity synchronizer.
249 * @param logger the logger
250 * @param syncName the sync name
251 * @param numSyncWorkers the num sync workers
252 * @param numActiveInventoryWorkers the num active inventory workers
253 * @param numElasticsearchWorkers the num elasticsearch workers
254 * @param indexName the index name
255 * @throws Exception the exception
257 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
258 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
260 this.logger = logger;
261 this.synchronizerExecutor =
262 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
264 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
266 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
267 this.mapper = new ObjectMapper();
268 this.oxmModelLoader = OxmModelLoader.getInstance();
269 this.indexName = indexName;
270 this.esRestStats = new RestOperationalStatistics();
271 this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
272 this.aaiRestStats = new RestOperationalStatistics();
273 this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
274 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
275 this.aaiTaskProcessingStats =
276 new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
277 this.esTaskProcessingStats =
278 new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
280 this.aaiTransactionRateController =
281 new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
282 this.esTransactionRateController =
283 new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
285 this.aaiWorkOnHand = new AtomicInteger(0);
286 this.esWorkOnHand = new AtomicInteger(0);
288 enabledStatFlags = EnumSet.allOf(StatFlag.class);
290 this.synchronizerName = "Abstact Entity Synchronizer";
292 String txnID = NodeUtils.getRandomTxnId();
293 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
295 this.shouldSkipSync = false;
296 this.syncStartedTimeStampInMs = System.currentTimeMillis();
297 this.syncDurationInMs = -1;
300 public boolean shouldSkipSync() {
301 return shouldSkipSync;
304 public void setShouldSkipSync(boolean shouldSkipSync) {
305 this.shouldSkipSync = shouldSkipSync;
309 * Inc active inventory work on hand counter.
311 protected void incActiveInventoryWorkOnHandCounter() {
312 aaiWorkOnHand.incrementAndGet();
316 * Dec active inventory work on hand counter.
318 protected void decActiveInventoryWorkOnHandCounter() {
319 aaiWorkOnHand.decrementAndGet();
323 * Inc elastic search work on hand counter.
325 protected void incElasticSearchWorkOnHandCounter() {
326 esWorkOnHand.incrementAndGet();
330 * Dec elastic search work on hand counter.
332 protected void decElasticSearchWorkOnHandCounter() {
333 esWorkOnHand.decrementAndGet();
337 * Shutdown executors.
339 protected void shutdownExecutors() {
341 synchronizerExecutor.shutdown();
342 aaiExecutor.shutdown();
343 esExecutor.shutdown();
344 aaiDataProvider.shutdown();
345 esDataProvider.shutdown();
346 } catch (Exception exc) {
347 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc);
354 public void clearCache() {
355 if (aaiDataProvider != null) {
356 aaiDataProvider.clearCache();
360 protected ActiveInventoryDataProvider getAaiDataProvider() {
361 return aaiDataProvider;
364 public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
365 this.aaiDataProvider = aaiDataProvider;
368 protected ElasticSearchDataProvider getEsDataProvider() {
369 return esDataProvider;
372 public void setEsDataProvider(ElasticSearchDataProvider provider) {
373 this.esDataProvider = provider;
377 * Gets the elastic full url.
379 * @param resourceUrl the resource url
380 * @param indexName the index name
381 * @param indexType the index type
382 * @return the elastic full url
383 * @throws Exception the exception
385 protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
387 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
391 * Gets the elastic full url.
393 * @param resourceUrl the resource url
394 * @param indexName the index name
395 * @return the elastic full url
396 * @throws Exception the exception
398 protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
399 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
402 public String getIndexName() {
406 public void setIndexName(String indexName) {
407 this.indexName = indexName;
412 * Gets the response length.
415 * @return the response length
417 private long getResponseLength(NetworkTransaction txn) {
423 OperationResult result = txn.getOperationResult();
425 if (result == null) {
429 if (result.getResult() != null) {
430 return result.getResult().length();
437 * Update elastic search counters.
439 * @param method the method
442 protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
443 updateElasticSearchCounters(new NetworkTransaction(method, null, or));
447 * Update elastic search counters.
449 * @param method the method
450 * @param entityType the entity type
453 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
454 OperationResult or) {
455 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
459 * Update elastic search counters.
463 protected void updateElasticSearchCounters(NetworkTransaction txn) {
465 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
466 esRestStats.updateCounters(txn);
469 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
470 esEntityStats.updateCounters(txn);
473 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
475 esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
477 esTaskProcessingStats
478 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
479 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
481 // don't know the cost of the lengh calc, we'll see if it causes a
484 long responsePayloadSizeInBytes = getResponseLength(txn);
485 if (responsePayloadSizeInBytes >= 0) {
486 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
489 esTaskProcessingStats
490 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
495 * Update active inventory counters.
497 * @param method the method
500 protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
501 updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
505 * Update active inventory counters.
507 * @param method the method
508 * @param entityType the entity type
511 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
512 OperationResult or) {
513 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
517 * Update active inventory counters.
521 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
523 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
524 aaiRestStats.updateCounters(txn);
527 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
528 aaiEntityStats.updateCounters(txn);
531 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
532 aaiProcessingExceptionStats.updateCounters(txn);
535 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
536 aaiTransactionRateController
537 .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
539 aaiTaskProcessingStats
540 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
541 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
543 // don't know the cost of the lengh calc, we'll see if it causes a
546 long responsePayloadSizeInBytes = getResponseLength(txn);
547 if (responsePayloadSizeInBytes >= 0) {
548 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
551 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
552 (long) aaiTransactionRateController.getCurrentTps());
559 protected void resetCounters() {
560 aaiRestStats.reset();
561 aaiEntityStats.reset();
562 aaiProcessingExceptionStats.reset();
565 esEntityStats.reset();