2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.EnumSet;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.onap.aai.sparky.config.oxm.OxmModelLoader;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.aai.ActiveInventoryDataProvider;
34 import org.onap.aai.sparky.dal.aai.ActiveInventoryEntityStatistics;
35 import org.onap.aai.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
36 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
37 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchDataProvider;
38 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
39 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
40 import org.onap.aai.sparky.dal.rest.HttpMethod;
41 import org.onap.aai.sparky.dal.rest.OperationResult;
42 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
43 import org.onap.aai.sparky.logging.AaiUiMsgs;
44 import org.onap.aai.sparky.util.NodeUtils;
45 import org.onap.aai.cl.api.Logger;
46 import org.onap.aai.cl.mdc.MdcContext;
47 import com.fasterxml.jackson.databind.ObjectMapper;
50 * The Class AbstractEntitySynchronizer.
54 public abstract class AbstractEntitySynchronizer {
56 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
57 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
59 protected final Logger logger;
60 protected ObjectMapper mapper;
61 protected OxmModelLoader oxmModelLoader;
62 protected long syncDurationInMs;
66 protected enum StatFlag {
67 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
68 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
/** Statistic groups that are enabled; checked before a group is updated or reported. */
protected EnumSet<StatFlag> enabledStatFlags;

/** Active Inventory data provider; may be unset (see the null check in clearCache()). */
protected ActiveInventoryDataProvider aaiDataProvider;
/** Elasticsearch data provider, assigned through setEsDataProvider(). */
protected ElasticSearchDataProvider esDataProvider;

/** Executor named "&lt;syncName&gt;-INTERNAL"; shut down by shutdownExecutors(). */
protected ExecutorService synchronizerExecutor;
/** Executor for Active Inventory work; shut down by shutdownExecutors(). */
protected ExecutorService aaiExecutor;
/** Executor for Elasticsearch work; shut down by shutdownExecutors(). */
protected ExecutorService esExecutor;

/** REST statistics for Elasticsearch transactions. */
private RestOperationalStatistics esRestStats;
/** Per-entity statistics for Elasticsearch transactions. */
protected ElasticSearchEntityStatistics esEntityStats;

/** REST statistics for Active Inventory transactions. */
private RestOperationalStatistics aaiRestStats;
/** Per-entity statistics for Active Inventory transactions. */
protected ActiveInventoryEntityStatistics aaiEntityStats;
/** Processing-exception statistics for Active Inventory transactions. */
private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;

/** Task histograms (response time, task age, payload size, TPS) for Active Inventory. */
private TaskProcessingStats aaiTaskProcessingStats;
/** Task histograms (response time, task age, payload size, TPS) for Elasticsearch. */
private TaskProcessingStats esTaskProcessingStats;

/** Tracks response times and supplies the current TPS for Active Inventory. */
private TransactionRateController aaiTransactionRateController;
/** Tracks response times and supplies the current TPS for Elasticsearch. */
private TransactionRateController esTransactionRateController;

/** Count of outstanding Active Inventory work items ("WOH" in the report). */
protected AtomicInteger aaiWorkOnHand;
/** Count of outstanding Elasticsearch work items ("WOH" in the report). */
protected AtomicInteger esWorkOnHand;
/** Name printed in the heading of the statistics report. */
protected String synchronizerName;

/**
 * Reports whether the synchronizer has finished.
 *
 * @return NOTE(review): presumably {@code true} once the subclass considers its sync complete -
 *         confirm against the implementations
 */
protected abstract boolean isSyncDone();
/** Skip flag exposed through shouldSkipSync() / setShouldSkipSync(); starts out false. */
protected boolean shouldSkipSync;
100 public String getActiveInventoryStatisticsReport() {
102 StringBuilder sb = new StringBuilder(128);
104 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
105 sb.append("\n\n ").append("REST Operational Stats:");
106 sb.append(aaiRestStats.getStatisticsReport());
109 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
110 sb.append("\n\n ").append("Entity Stats:");
111 sb.append(aaiEntityStats.getStatisticsReport());
114 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
115 sb.append("\n\n ").append("Processing Exception Stats:");
116 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
119 return sb.toString();
123 public String getElasticSearchStatisticsReport() {
125 StringBuilder sb = new StringBuilder(128);
127 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
128 sb.append("\n\n ").append("REST Operational Stats:");
129 sb.append(esRestStats.getStatisticsReport());
132 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
133 sb.append("\n\n ").append("Entity Stats:");
134 sb.append(esEntityStats.getStatisticsReport());
137 return sb.toString();
142 * Adds the active inventory stat report.
146 private void addActiveInventoryStatReport(StringBuilder sb) {
152 sb.append("\n\n AAI");
153 sb.append(getActiveInventoryStatisticsReport());
155 double currentTps = 0;
156 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
157 sb.append("\n\n ").append("Task Processor Stats:");
158 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
160 currentTps = aaiTransactionRateController.getCurrentTps();
162 sb.append("\n ").append("Current TPS: ").append(currentTps);
165 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
167 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
168 if (currentTps > 0) {
169 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
170 sb.append("\n ").append("SyncDurationRemaining=")
171 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
178 * Adds the elastic stat report.
182 private void addElasticStatReport(StringBuilder sb) {
188 sb.append("\n\n ELASTIC");
189 sb.append(getElasticSearchStatisticsReport());
191 double currentTps = 0;
193 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
194 sb.append("\n\n ").append("Task Processor Stats:");
195 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
197 currentTps = esTransactionRateController.getCurrentTps();
199 sb.append("\n ").append("Current TPS: ").append(currentTps);
202 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
204 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
205 if (currentTps > 0) {
206 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
207 sb.append("\n ").append("SyncDurationRemaining=")
208 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
216 * Gets the stat report.
218 * @param syncOpTimeInMs the sync op time in ms
219 * @param showFinalReport the show final report
220 * @return the stat report
222 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
224 StringBuilder sb = new StringBuilder(128);
226 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
227 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
229 addActiveInventoryStatReport(sb);
230 addElasticStatReport(sb);
232 if (showFinalReport) {
233 sb.append("\n\n ").append("Sync Completed!\n");
235 sb.append("\n\n ").append("Sync in Progress...\n");
238 return sb.toString();
/** Index name passed to the constructor; exposed through getIndexName() / setIndexName(). */
protected String indexName;
/** Wall-clock time in milliseconds captured by the constructor. */
protected long syncStartedTimeStampInMs;
246 * Instantiates a new abstract entity synchronizer.
248 * @param logger the logger
249 * @param syncName the sync name
250 * @param numSyncWorkers the num sync workers
251 * @param numActiveInventoryWorkers the num active inventory workers
252 * @param numElasticsearchWorkers the num elasticsearch workers
253 * @param indexName the index name
254 * @throws Exception the exception
256 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
257 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
259 this.logger = logger;
260 this.synchronizerExecutor =
261 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
263 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
265 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
266 this.mapper = new ObjectMapper();
267 this.oxmModelLoader = OxmModelLoader.getInstance();
268 this.indexName = indexName;
269 this.esRestStats = new RestOperationalStatistics();
270 this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
271 this.aaiRestStats = new RestOperationalStatistics();
272 this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
273 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
274 this.aaiTaskProcessingStats =
275 new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
276 this.esTaskProcessingStats =
277 new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
279 this.aaiTransactionRateController =
280 new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
281 this.esTransactionRateController =
282 new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
284 this.aaiWorkOnHand = new AtomicInteger(0);
285 this.esWorkOnHand = new AtomicInteger(0);
287 enabledStatFlags = EnumSet.allOf(StatFlag.class);
289 this.synchronizerName = "Abstact Entity Synchronizer";
291 String txnID = NodeUtils.getRandomTxnId();
292 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
294 this.shouldSkipSync = false;
295 this.syncStartedTimeStampInMs = System.currentTimeMillis();
296 this.syncDurationInMs = -1;
299 public boolean shouldSkipSync() {
300 return shouldSkipSync;
303 public void setShouldSkipSync(boolean shouldSkipSync) {
304 this.shouldSkipSync = shouldSkipSync;
308 * Inc active inventory work on hand counter.
310 protected void incActiveInventoryWorkOnHandCounter() {
311 aaiWorkOnHand.incrementAndGet();
315 * Dec active inventory work on hand counter.
317 protected void decActiveInventoryWorkOnHandCounter() {
318 aaiWorkOnHand.decrementAndGet();
322 * Inc elastic search work on hand counter.
324 protected void incElasticSearchWorkOnHandCounter() {
325 esWorkOnHand.incrementAndGet();
329 * Dec elastic search work on hand counter.
331 protected void decElasticSearchWorkOnHandCounter() {
332 esWorkOnHand.decrementAndGet();
336 * Shutdown executors.
338 protected void shutdownExecutors() {
340 synchronizerExecutor.shutdown();
341 aaiExecutor.shutdown();
342 esExecutor.shutdown();
343 aaiDataProvider.shutdown();
344 esDataProvider.shutdown();
345 } catch (Exception exc) {
346 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
353 public void clearCache() {
354 if (aaiDataProvider != null) {
355 aaiDataProvider.clearCache();
359 protected ActiveInventoryDataProvider getAaiDataProvider() {
360 return aaiDataProvider;
363 public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
364 this.aaiDataProvider = aaiDataProvider;
367 protected ElasticSearchDataProvider getEsDataProvider() {
368 return esDataProvider;
371 public void setEsDataProvider(ElasticSearchDataProvider provider) {
372 this.esDataProvider = provider;
376 * Gets the elastic full url.
378 * @param resourceUrl the resource url
379 * @param indexName the index name
380 * @param indexType the index type
381 * @return the elastic full url
382 * @throws Exception the exception
384 protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
386 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
390 * Gets the elastic full url.
392 * @param resourceUrl the resource url
393 * @param indexName the index name
394 * @return the elastic full url
395 * @throws Exception the exception
397 protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
398 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
401 public String getIndexName() {
405 public void setIndexName(String indexName) {
406 this.indexName = indexName;
411 * Gets the response length.
414 * @return the response length
416 private long getResponseLength(NetworkTransaction txn) {
422 OperationResult result = txn.getOperationResult();
424 if (result == null) {
428 if (result.getResult() != null) {
429 return result.getResult().length();
436 * Update elastic search counters.
438 * @param method the method
441 protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
442 updateElasticSearchCounters(new NetworkTransaction(method, null, or));
446 * Update elastic search counters.
448 * @param method the method
449 * @param entityType the entity type
452 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
453 OperationResult or) {
454 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
458 * Update elastic search counters.
462 protected void updateElasticSearchCounters(NetworkTransaction txn) {
464 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
465 esRestStats.updateCounters(txn);
468 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
469 esEntityStats.updateCounters(txn);
472 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
474 esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
476 esTaskProcessingStats
477 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
478 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
480 // don't know the cost of the lengh calc, we'll see if it causes a
483 long responsePayloadSizeInBytes = getResponseLength(txn);
484 if (responsePayloadSizeInBytes >= 0) {
485 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
488 esTaskProcessingStats
489 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
494 * Update active inventory counters.
496 * @param method the method
499 protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
500 updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
504 * Update active inventory counters.
506 * @param method the method
507 * @param entityType the entity type
510 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
511 OperationResult or) {
512 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
516 * Update active inventory counters.
520 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
522 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
523 aaiRestStats.updateCounters(txn);
526 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
527 aaiEntityStats.updateCounters(txn);
530 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
531 aaiProcessingExceptionStats.updateCounters(txn);
534 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
535 aaiTransactionRateController
536 .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
538 aaiTaskProcessingStats
539 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
540 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
542 // don't know the cost of the lengh calc, we'll see if it causes a
545 long responsePayloadSizeInBytes = getResponseLength(txn);
546 if (responsePayloadSizeInBytes >= 0) {
547 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
550 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
551 (long) aaiTransactionRateController.getCurrentTps());
558 protected void resetCounters() {
559 aaiRestStats.reset();
560 aaiEntityStats.reset();
561 aaiProcessingExceptionStats.reset();
564 esEntityStats.reset();