Merge "update sync queries to use searh data service"
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / autosuggestion / sync / VnfAliasSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.autosuggestion.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.util.Map;
26 import java.util.concurrent.ExecutorService;
27
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.cl.mdc.MdcContext;
31 import org.onap.aai.restclient.client.OperationResult;
32 import org.onap.aai.sparky.dal.NetworkTransaction;
33 import org.onap.aai.sparky.dal.rest.HttpMethod;
34 import org.onap.aai.sparky.logging.AaiUiMsgs;
35 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
36 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
37 import org.onap.aai.sparky.sync.IndexSynchronizer;
38 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
39 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
40 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
41 import org.onap.aai.sparky.sync.enumeration.OperationState;
42 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
43 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
44 import org.onap.aai.sparky.util.NodeUtils;
45 import org.slf4j.MDC;
46
47
48 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
49     implements IndexSynchronizer {
50
51   private static final Logger LOG =
52       LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
53
54   private boolean isSyncInProgress;
55   private boolean shouldPerformRetry;
56   private Map<String, String> contextMap;
57   protected ExecutorService esPutExecutor;
58   private FiltersConfig filtersConfig;
59
60   public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
61       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
62       NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig) throws Exception {
63     super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
64         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
65
66     this.isSyncInProgress = false;
67     this.shouldPerformRetry = false;
68     this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
69     this.contextMap = MDC.getCopyOfContextMap();
70     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
71     this.filtersConfig = filtersConfig;
72   }
73
74   @Override
75   protected boolean isSyncDone() {
76     int totalWorkOnHand = esWorkOnHand.get();
77
78     if (LOG.isDebugEnabled()) {
79       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
80           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
81     }
82
83     if (totalWorkOnHand > 0 || !isSyncInProgress) {
84       return false;
85     }
86
87     return true;
88   }
89
90   @Override
91   public OperationState doSync() {
92     isSyncInProgress = true;
93     this.syncDurationInMs = -1;
94     syncStartedTimeStampInMs = System.currentTimeMillis();
95
96     syncEntity();
97
98     while (!isSyncDone()) {
99       try {
100         if (shouldPerformRetry) {
101           syncEntity();
102         }
103         Thread.sleep(1000);
104       } catch (Exception exc) {
105         // We don't care about this exception
106       }
107     }
108
109     return OperationState.OK;
110   }
111
112   private void syncEntity() {
113     String txnId = NodeUtils.getRandomTxnId();
114     MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
115     
116     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(filtersConfig);
117     syncEntity.deriveFields();
118     syncEntity.initializeFilters();
119
120     String link = null;
121     try {
122       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), syncEntity.getId());
123     } catch (Exception exc) {
124       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
125     }
126     
127     try {
128       String jsonPayload = null;
129       jsonPayload = syncEntity.getAsJson();
130       if (link != null && jsonPayload != null) {
131
132         NetworkTransaction elasticPutTxn = new NetworkTransaction();
133         elasticPutTxn.setLink(link);
134         elasticPutTxn.setOperationType(HttpMethod.PUT);
135
136         esWorkOnHand.incrementAndGet();
137         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
138         supplyAsync(new PerformSearchServicePut(jsonPayload, elasticPutTxn,
139                         searchServiceAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
140
141               esWorkOnHand.decrementAndGet();
142
143               if (error != null) {
144                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
145                     + error.getLocalizedMessage();
146                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
147               } else {
148                 updateElasticSearchCounters(result);
149                 wasEsOperationSuccessful(result);
150               }
151             });
152       }
153     } catch (Exception exc) {
154       String message =
155           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
156               + exc.getLocalizedMessage();
157       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
158     }
159   }
160
161   private void wasEsOperationSuccessful(NetworkTransaction result) {
162     if (result != null) {
163       OperationResult opResult = result.getOperationResult();
164
165       if (!opResult.wasSuccessful()) {
166         shouldPerformRetry = true;
167       } else {
168         isSyncInProgress = false;
169         shouldPerformRetry = false;
170       }
171     }
172   }
173
174   @Override
175   public SynchronizerState getState() {
176     if (!isSyncDone()) {
177       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
178     }
179
180     return SynchronizerState.IDLE;
181   }
182
183   @Override
184   public String getStatReport(boolean shouldDisplayFinalReport) {
185     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
186     return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
187   }
188
189   @Override
190   public void shutdown() {
191     this.shutdownExecutors();
192   }
193 }