Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / autosuggestion / sync / VnfAliasSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.autosuggestion.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.util.Map;
30 import java.util.concurrent.ExecutorService;
31
32 import org.onap.aai.cl.api.Logger;
33 import org.onap.aai.cl.eelf.LoggerFactory;
34 import org.onap.aai.cl.mdc.MdcContext;
35 import org.onap.aai.restclient.client.OperationResult;
36 import org.onap.aai.sparky.dal.NetworkTransaction;
37 import org.onap.aai.sparky.dal.rest.HttpMethod;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
40 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
41 import org.onap.aai.sparky.sync.IndexSynchronizer;
42 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
43 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
44 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
45 import org.onap.aai.sparky.sync.enumeration.OperationState;
46 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
48 import org.onap.aai.sparky.util.NodeUtils;
49 import org.slf4j.MDC;
50
51
52 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
53     implements IndexSynchronizer {
54
55   private static final Logger LOG =
56       LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
57
58   private boolean isSyncInProgress;
59   private boolean shouldPerformRetry;
60   private Map<String, String> contextMap;
61   protected ExecutorService esPutExecutor;
62   private FiltersConfig filtersConfig;
63
64   public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
65       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
66       NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig) throws Exception {
67     super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
68         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
69
70     this.isSyncInProgress = false;
71     this.shouldPerformRetry = false;
72     this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
73     this.contextMap = MDC.getCopyOfContextMap();
74     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
75     this.filtersConfig = filtersConfig;
76   }
77
78   @Override
79   protected boolean isSyncDone() {
80     int totalWorkOnHand = esWorkOnHand.get();
81
82     if (LOG.isDebugEnabled()) {
83       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
84           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
85     }
86
87     if (totalWorkOnHand > 0 || !isSyncInProgress) {
88       return false;
89     }
90
91     return true;
92   }
93
94   @Override
95   public OperationState doSync() {
96     isSyncInProgress = true;
97     this.syncDurationInMs = -1;
98     syncStartedTimeStampInMs = System.currentTimeMillis();
99
100     syncEntity();
101
102     while (!isSyncDone()) {
103       try {
104         if (shouldPerformRetry) {
105           syncEntity();
106         }
107         Thread.sleep(1000);
108       } catch (Exception exc) {
109         // We don't care about this exception
110       }
111     }
112
113     return OperationState.OK;
114   }
115
116   private void syncEntity() {
117     String txnId = NodeUtils.getRandomTxnId();
118     MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
119     
120     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(filtersConfig);
121     syncEntity.deriveFields();
122     syncEntity.initializeFilters();
123
124     String link = null;
125     try {
126       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), syncEntity.getId());
127     } catch (Exception exc) {
128       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
129     }
130     
131     try {
132       String jsonPayload = null;
133       jsonPayload = syncEntity.getAsJson();
134       if (link != null && jsonPayload != null) {
135
136         NetworkTransaction elasticPutTxn = new NetworkTransaction();
137         elasticPutTxn.setLink(link);
138         elasticPutTxn.setOperationType(HttpMethod.PUT);
139
140         esWorkOnHand.incrementAndGet();
141         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
142         supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
143             elasticSearchAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
144
145               esWorkOnHand.decrementAndGet();
146
147               if (error != null) {
148                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
149                     + error.getLocalizedMessage();
150                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
151               } else {
152                 updateElasticSearchCounters(result);
153                 wasEsOperationSuccessful(result);
154               }
155             });
156       }
157     } catch (Exception exc) {
158       String message =
159           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
160               + exc.getLocalizedMessage();
161       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
162     }
163   }
164
165   private void wasEsOperationSuccessful(NetworkTransaction result) {
166     if (result != null) {
167       OperationResult opResult = result.getOperationResult();
168
169       if (!opResult.wasSuccessful()) {
170         shouldPerformRetry = true;
171       } else {
172         isSyncInProgress = false;
173         shouldPerformRetry = false;
174       }
175     }
176   }
177
178   @Override
179   public SynchronizerState getState() {
180     if (!isSyncDone()) {
181       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
182     }
183
184     return SynchronizerState.IDLE;
185   }
186
187   @Override
188   public String getStatReport(boolean shouldDisplayFinalReport) {
189     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
190     return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
191   }
192
193   @Override
194   public void shutdown() {
195     this.shutdownExecutors();
196   }
197 }