Fix sonar issues :
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / autosuggestion / sync / VnfAliasSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.autosuggestion.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.util.Map;
26 import java.util.concurrent.ExecutorService;
27
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.rest.HttpMethod;
34 import org.onap.aai.sparky.logging.AaiUiMsgs;
35 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
36 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
37 import org.onap.aai.sparky.sync.IndexSynchronizer;
38 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
39 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
40 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
41 import org.onap.aai.sparky.sync.enumeration.OperationState;
42 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
43 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
44 import org.onap.aai.sparky.util.NodeUtils;
45 import org.slf4j.MDC;
46
47
48 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
49     implements IndexSynchronizer {
50
51   private static final Logger LOG =
52       LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
53
54   private boolean isSyncInProgress;
55   private boolean shouldPerformRetry;
56   private Map<String, String> contextMap;
57   protected ExecutorService esPutExecutor;
58   private FiltersConfig filtersConfig;
59
60   public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
61       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
62       NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig) throws Exception {
63     super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
64         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
65
66     this.isSyncInProgress = false;
67     this.shouldPerformRetry = false;
68     this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
69     this.contextMap = MDC.getCopyOfContextMap();
70     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
71     this.filtersConfig = filtersConfig;
72   }
73
74   @Override
75   protected boolean isSyncDone() {
76     int totalWorkOnHand = esWorkOnHand.get();
77
78     if (LOG.isDebugEnabled()) {
79       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
80           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
81     }
82
83     if (totalWorkOnHand > 0 || !isSyncInProgress) {
84       return false;
85     }
86
87     return true;
88   }
89
90   @Override
91   public OperationState doSync() {
92     isSyncInProgress = true;
93     this.syncDurationInMs = -1;
94     syncStartedTimeStampInMs = System.currentTimeMillis();
95
96     syncEntity();
97
98     while (!isSyncDone()) {
99       try {
100         if (shouldPerformRetry) {
101           syncEntity();
102         }
103         Thread.sleep(1000);
104       } catch (InterruptedException e) {
105         // Restore interrupted state...
106         Thread.currentThread().interrupt();
107       } catch (Exception exc) {
108         // We don't care about this exception
109       }
110     }
111
112     return OperationState.OK;
113   }
114
115   private void syncEntity() {
116     String txnId = NodeUtils.getRandomTxnId();
117     MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
118     
119     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(filtersConfig);
120     syncEntity.deriveFields();
121     syncEntity.initializeFilters();
122
123     String link = null;
124     try {
125       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), syncEntity.getId());
126     } catch (Exception exc) {
127       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
128     }
129     
130     try {
131       String jsonPayload = null;
132       jsonPayload = syncEntity.getAsJson();
133       if (link != null && jsonPayload != null) {
134
135         NetworkTransaction elasticPutTxn = new NetworkTransaction();
136         elasticPutTxn.setLink(link);
137         elasticPutTxn.setOperationType(HttpMethod.PUT);
138
139         esWorkOnHand.incrementAndGet();
140         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
141         supplyAsync(new PerformSearchServicePut(jsonPayload, elasticPutTxn,
142                         searchServiceAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
143
144               esWorkOnHand.decrementAndGet();
145
146               if (error != null) {
147                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
148                     + error.getLocalizedMessage();
149                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
150               } else {
151                 updateElasticSearchCounters(result);
152                 wasEsOperationSuccessful(result);
153               }
154             });
155       }
156     } catch (Exception exc) {
157       String message =
158           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
159               + exc.getLocalizedMessage();
160       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
161     }
162   }
163
164   private void wasEsOperationSuccessful(NetworkTransaction result) {
165     if (result != null) {
166       OperationResult opResult = result.getOperationResult();
167
168       if (!opResult.wasSuccessful()) {
169         shouldPerformRetry = true;
170       } else {
171         isSyncInProgress = false;
172         shouldPerformRetry = false;
173       }
174     }
175   }
176
177   @Override
178   public SynchronizerState getState() {
179     if (!isSyncDone()) {
180       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
181     }
182
183     return SynchronizerState.IDLE;
184   }
185
186   @Override
187   public String getStatReport(boolean shouldDisplayFinalReport) {
188     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
189     return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
190   }
191
192   @Override
193   public void shutdown() {
194     this.shutdownExecutors();
195   }
196 }