2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import java.net.InetAddress;
29 import java.net.UnknownHostException;
30 import java.util.EnumSet;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.openecomp.cl.api.Logger;
35 import org.openecomp.sparky.config.oxm.OxmModelLoader;
36 import org.openecomp.sparky.dal.NetworkTransaction;
37 import org.openecomp.sparky.dal.aai.ActiveInventoryDataProvider;
38 import org.openecomp.sparky.dal.aai.ActiveInventoryEntityStatistics;
39 import org.openecomp.sparky.dal.aai.ActiveInventoryProcessingExceptionStatistics;
40 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
41 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchDataProvider;
42 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchEntityStatistics;
43 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
44 import org.openecomp.sparky.dal.rest.HttpMethod;
45 import org.openecomp.sparky.dal.rest.OperationResult;
46 import org.openecomp.sparky.dal.rest.RestOperationalStatistics;
47 import org.openecomp.sparky.logging.AaiUiMsgs;
48 import org.openecomp.sparky.util.NodeUtils;
50 import org.openecomp.cl.mdc.MdcContext;
51 import com.fasterxml.jackson.databind.ObjectMapper;
54 * The Class AbstractEntitySynchronizer.
58 public abstract class AbstractEntitySynchronizer {
60 protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
61 protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
63 protected final Logger logger;
64 protected ObjectMapper mapper;
65 protected OxmModelLoader oxmModelLoader;
70 protected enum StatFlag {
71 AAI_REST_STATS, AAI_ENTITY_STATS, AAI_PROCESSING_EXCEPTION_STATS,
72 AAI_TASK_PROCESSING_STATS, ES_REST_STATS, ES_ENTITY_STATS, ES_TASK_PROCESSING_STATS
75 protected EnumSet<StatFlag> enabledStatFlags;
77 protected ActiveInventoryDataProvider aaiDataProvider;
78 protected ElasticSearchDataProvider esDataProvider;
80 protected ExecutorService synchronizerExecutor;
81 protected ExecutorService aaiExecutor;
82 protected ExecutorService esExecutor;
84 private RestOperationalStatistics esRestStats;
85 protected ElasticSearchEntityStatistics esEntityStats;
87 private RestOperationalStatistics aaiRestStats;
88 protected ActiveInventoryEntityStatistics aaiEntityStats;
89 private ActiveInventoryProcessingExceptionStatistics aaiProcessingExceptionStats;
91 private TaskProcessingStats aaiTaskProcessingStats;
92 private TaskProcessingStats esTaskProcessingStats;
94 private TransactionRateController aaiTransactionRateController;
95 private TransactionRateController esTransactionRateController;
97 protected AtomicInteger aaiWorkOnHand;
98 protected AtomicInteger esWorkOnHand;
99 protected String synchronizerName;
101 protected abstract boolean isSyncDone();
103 public String getActiveInventoryStatisticsReport() {
105 StringBuilder sb = new StringBuilder(128);
107 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
108 sb.append("\n\n ").append("REST Operational Stats:");
109 sb.append(aaiRestStats.getStatisticsReport());
112 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
113 sb.append("\n\n ").append("Entity Stats:");
114 sb.append(aaiEntityStats.getStatisticsReport());
117 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
118 sb.append("\n\n ").append("Processing Exception Stats:");
119 sb.append(aaiProcessingExceptionStats.getStatisticsReport());
122 return sb.toString();
126 public String getElasticSearchStatisticsReport() {
128 StringBuilder sb = new StringBuilder(128);
130 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
131 sb.append("\n\n ").append("REST Operational Stats:");
132 sb.append(esRestStats.getStatisticsReport());
135 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
136 sb.append("\n\n ").append("Entity Stats:");
137 sb.append(esEntityStats.getStatisticsReport());
140 return sb.toString();
145 * Adds the active inventory stat report.
149 private void addActiveInventoryStatReport(StringBuilder sb) {
155 sb.append("\n\n AAI");
156 sb.append(getActiveInventoryStatisticsReport());
158 double currentTps = 0;
159 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
160 sb.append("\n\n ").append("Task Processor Stats:");
161 sb.append(aaiTaskProcessingStats.getStatisticsReport(false, " "));
163 currentTps = aaiTransactionRateController.getCurrentTps();
165 sb.append("\n ").append("Current TPS: ").append(currentTps);
168 sb.append("\n ").append("Current WOH: ").append(aaiWorkOnHand.get());
170 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
171 if (currentTps > 0) {
172 double numMillisecondsToCompletion = (aaiWorkOnHand.get() / currentTps) * 1000;
173 sb.append("\n ").append("SyncDurationRemaining=")
174 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
181 * Adds the elastic stat report.
185 private void addElasticStatReport(StringBuilder sb) {
191 sb.append("\n\n ELASTIC");
192 sb.append(getElasticSearchStatisticsReport());
194 double currentTps = 0;
196 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
197 sb.append("\n\n ").append("Task Processor Stats:");
198 sb.append(esTaskProcessingStats.getStatisticsReport(false, " "));
200 currentTps = esTransactionRateController.getCurrentTps();
202 sb.append("\n ").append("Current TPS: ").append(currentTps);
205 sb.append("\n ").append("Current WOH: ").append(esWorkOnHand.get());
207 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
208 if (currentTps > 0) {
209 double numMillisecondsToCompletion = (esWorkOnHand.get() / currentTps) * 1000;
210 sb.append("\n ").append("SyncDurationRemaining=")
211 .append(NodeUtils.getDurationBreakdown((long) numMillisecondsToCompletion));
219 * Gets the stat report.
221 * @param syncOpTimeInMs the sync op time in ms
222 * @param showFinalReport the show final report
223 * @return the stat report
225 protected String getStatReport(long syncOpTimeInMs, boolean showFinalReport) {
227 StringBuilder sb = new StringBuilder(128);
229 sb.append("\n").append(synchronizerName + " Statistics: ( Sync Operation Duration = "
230 + NodeUtils.getDurationBreakdown(syncOpTimeInMs) + " )");
232 addActiveInventoryStatReport(sb);
233 addElasticStatReport(sb);
235 if (showFinalReport) {
236 sb.append("\n\n ").append("Sync Completed!\n");
238 sb.append("\n\n ").append("Sync in Progress...\n");
241 return sb.toString();
245 protected String indexName;
246 protected long syncStartedTimeStampInMs;
249 * Instantiates a new abstract entity synchronizer.
251 * @param logger the logger
252 * @param syncName the sync name
253 * @param numSyncWorkers the num sync workers
254 * @param numActiveInventoryWorkers the num active inventory workers
255 * @param numElasticsearchWorkers the num elasticsearch workers
256 * @param indexName the index name
257 * @throws Exception the exception
259 protected AbstractEntitySynchronizer(Logger logger, String syncName, int numSyncWorkers,
260 int numActiveInventoryWorkers, int numElasticsearchWorkers, String indexName)
262 this.logger = logger;
263 this.synchronizerExecutor =
264 NodeUtils.createNamedExecutor(syncName + "-INTERNAL", numSyncWorkers, logger);
266 NodeUtils.createNamedExecutor(syncName + "-AAI", numActiveInventoryWorkers, logger);
268 NodeUtils.createNamedExecutor(syncName + "-ES", numElasticsearchWorkers, logger);
269 this.mapper = new ObjectMapper();
270 this.oxmModelLoader = OxmModelLoader.getInstance();
271 this.indexName = indexName;
272 this.esRestStats = new RestOperationalStatistics();
273 this.esEntityStats = new ElasticSearchEntityStatistics(oxmModelLoader);
274 this.aaiRestStats = new RestOperationalStatistics();
275 this.aaiEntityStats = new ActiveInventoryEntityStatistics(oxmModelLoader);
276 this.aaiProcessingExceptionStats = new ActiveInventoryProcessingExceptionStatistics();
277 this.aaiTaskProcessingStats =
278 new TaskProcessingStats(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
279 this.esTaskProcessingStats =
280 new TaskProcessingStats(ElasticSearchConfig.getConfig().getProcessorConfig());
282 this.aaiTransactionRateController =
283 new TransactionRateController(ActiveInventoryConfig.getConfig().getTaskProcessorConfig());
284 this.esTransactionRateController =
285 new TransactionRateController(ElasticSearchConfig.getConfig().getProcessorConfig());
287 this.aaiWorkOnHand = new AtomicInteger(0);
288 this.esWorkOnHand = new AtomicInteger(0);
290 enabledStatFlags = EnumSet.allOf(StatFlag.class);
292 this.synchronizerName = "Abstact Entity Synchronizer";
294 String txnID = NodeUtils.getRandomTxnId();
295 MdcContext.initialize(txnID, "AbstractEntitySynchronizer", "", "Sync", "");
300 * Inc active inventory work on hand counter.
302 protected void incActiveInventoryWorkOnHandCounter() {
303 aaiWorkOnHand.incrementAndGet();
307 * Dec active inventory work on hand counter.
309 protected void decActiveInventoryWorkOnHandCounter() {
310 aaiWorkOnHand.decrementAndGet();
314 * Inc elastic search work on hand counter.
316 protected void incElasticSearchWorkOnHandCounter() {
317 esWorkOnHand.incrementAndGet();
321 * Dec elastic search work on hand counter.
323 protected void decElasticSearchWorkOnHandCounter() {
324 esWorkOnHand.decrementAndGet();
328 * Shutdown executors.
330 protected void shutdownExecutors() {
332 synchronizerExecutor.shutdown();
333 aaiExecutor.shutdown();
334 esExecutor.shutdown();
335 aaiDataProvider.shutdown();
336 esDataProvider.shutdown();
337 } catch (Exception exc) {
338 logger.error(AaiUiMsgs.ERROR_SHUTDOWN_EXECUTORS, exc );
345 public void clearCache() {
346 if (aaiDataProvider != null) {
347 aaiDataProvider.clearCache();
351 protected ActiveInventoryDataProvider getAaiDataProvider() {
352 return aaiDataProvider;
355 public void setAaiDataProvider(ActiveInventoryDataProvider aaiDataProvider) {
356 this.aaiDataProvider = aaiDataProvider;
359 protected ElasticSearchDataProvider getEsDataProvider() {
360 return esDataProvider;
363 public void setEsDataProvider(ElasticSearchDataProvider provider) {
364 this.esDataProvider = provider;
368 * Gets the elastic full url.
370 * @param resourceUrl the resource url
371 * @param indexName the index name
372 * @param indexType the index type
373 * @return the elastic full url
374 * @throws Exception the exception
376 protected String getElasticFullUrl(String resourceUrl, String indexName, String indexType)
378 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName, indexType);
382 * Gets the elastic full url.
384 * @param resourceUrl the resource url
385 * @param indexName the index name
386 * @return the elastic full url
387 * @throws Exception the exception
389 protected String getElasticFullUrl(String resourceUrl, String indexName) throws Exception {
390 return ElasticSearchConfig.getConfig().getElasticFullUrl(resourceUrl, indexName);
393 public String getIndexName() {
397 public void setIndexName(String indexName) {
398 this.indexName = indexName;
403 * Gets the response length.
406 * @return the response length
408 private long getResponseLength(NetworkTransaction txn) {
414 OperationResult result = txn.getOperationResult();
416 if (result == null) {
420 if (result.getResult() != null) {
421 return result.getResult().length();
428 * Update elastic search counters.
430 * @param method the method
433 protected void updateElasticSearchCounters(HttpMethod method, OperationResult or) {
434 updateElasticSearchCounters(new NetworkTransaction(method, null, or));
438 * Update elastic search counters.
440 * @param method the method
441 * @param entityType the entity type
444 protected void updateElasticSearchCounters(HttpMethod method, String entityType,
445 OperationResult or) {
446 updateElasticSearchCounters(new NetworkTransaction(method, entityType, or));
450 * Update elastic search counters.
454 protected void updateElasticSearchCounters(NetworkTransaction txn) {
456 if (enabledStatFlags.contains(StatFlag.ES_REST_STATS)) {
457 esRestStats.updateCounters(txn);
460 if (enabledStatFlags.contains(StatFlag.ES_ENTITY_STATS)) {
461 esEntityStats.updateCounters(txn);
464 if (enabledStatFlags.contains(StatFlag.ES_TASK_PROCESSING_STATS)) {
466 esTransactionRateController.trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
468 esTaskProcessingStats
469 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
470 esTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
472 // don't know the cost of the lengh calc, we'll see if it causes a
475 long responsePayloadSizeInBytes = getResponseLength(txn);
476 if (responsePayloadSizeInBytes >= 0) {
477 esTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
480 esTaskProcessingStats
481 .updateTransactionsPerSecondHistogram((long) esTransactionRateController.getCurrentTps());
486 * Update active inventory counters.
488 * @param method the method
491 protected void updateActiveInventoryCounters(HttpMethod method, OperationResult or) {
492 updateActiveInventoryCounters(new NetworkTransaction(method, null, or));
496 * Update active inventory counters.
498 * @param method the method
499 * @param entityType the entity type
502 protected void updateActiveInventoryCounters(HttpMethod method, String entityType,
503 OperationResult or) {
504 updateActiveInventoryCounters(new NetworkTransaction(method, entityType, or));
508 * Update active inventory counters.
512 protected void updateActiveInventoryCounters(NetworkTransaction txn) {
514 if (enabledStatFlags.contains(StatFlag.AAI_REST_STATS)) {
515 aaiRestStats.updateCounters(txn);
518 if (enabledStatFlags.contains(StatFlag.AAI_ENTITY_STATS)) {
519 aaiEntityStats.updateCounters(txn);
522 if (enabledStatFlags.contains(StatFlag.AAI_PROCESSING_EXCEPTION_STATS)) {
523 aaiProcessingExceptionStats.updateCounters(txn);
526 if (enabledStatFlags.contains(StatFlag.AAI_TASK_PROCESSING_STATS)) {
527 aaiTransactionRateController
528 .trackResponseTime(txn.getOperationResult().getResponseTimeInMs());
530 aaiTaskProcessingStats
531 .updateTaskResponseStatsHistogram(txn.getOperationResult().getResponseTimeInMs());
532 aaiTaskProcessingStats.updateTaskAgeStatsHistogram(txn.getTaskAgeInMs());
534 // don't know the cost of the lengh calc, we'll see if it causes a
537 long responsePayloadSizeInBytes = getResponseLength(txn);
538 if (responsePayloadSizeInBytes >= 0) {
539 aaiTaskProcessingStats.updateResponseSizeInBytesHistogram(responsePayloadSizeInBytes);
542 aaiTaskProcessingStats.updateTransactionsPerSecondHistogram(
543 (long) aaiTransactionRateController.getCurrentTps());
550 protected void resetCounters() {
551 aaiRestStats.reset();
552 aaiEntityStats.reset();
553 aaiProcessingExceptionStats.reset();
556 esEntityStats.reset();