Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AggregationSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import org.openecomp.cl.mdc.MdcContext;
31
32 import java.net.InetAddress;
33 import java.net.UnknownHostException;
34 import java.util.Map;
35 import java.util.concurrent.ExecutorService;
36
37 import org.openecomp.cl.api.Logger;
38 import org.openecomp.cl.eelf.LoggerFactory;
39 import org.openecomp.sparky.dal.NetworkTransaction;
40 import org.openecomp.sparky.dal.rest.HttpMethod;
41 import org.openecomp.sparky.dal.rest.OperationResult;
42 import org.openecomp.sparky.logging.AaiUiMsgs;
43 import org.openecomp.sparky.synchronizer.entity.AggregationSuggestionEntity;
44 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
45 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
46 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
47 import org.openecomp.sparky.util.NodeUtils;
48 import org.slf4j.MDC;
49
50 public class AggregationSuggestionSynchronizer extends AbstractEntitySynchronizer
51     implements IndexSynchronizer {
52
53   private static final Logger LOG =
54       LoggerFactory.getInstance().getLogger(AggregationSuggestionSynchronizer.class);
55
56   private boolean isSyncInProgress;
57   private boolean shouldPerformRetry;
58   private Map<String, String> contextMap;
59   protected ExecutorService esPutExecutor;
60
61   public AggregationSuggestionSynchronizer(String indexName) throws Exception {
62     super(LOG, "ASS-" + indexName.toUpperCase(), 2, 5, 5, indexName);
63
64     this.isSyncInProgress = false;
65     this.shouldPerformRetry = false;
66     this.synchronizerName = "Aggregation Suggestion Synchronizer";
67     this.contextMap = MDC.getCopyOfContextMap();
68     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
69   }
70
71   @Override
72   protected boolean isSyncDone() {
73     int totalWorkOnHand = esWorkOnHand.get();
74
75     if (LOG.isDebugEnabled()) {
76       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
77           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
78     }
79
80     if (totalWorkOnHand > 0 || !isSyncInProgress) {
81       return false;
82     }
83
84     return true;
85   }
86
87   @Override
88   public OperationState doSync() {
89     isSyncInProgress = true;
90
91     syncEntity();
92
93     while (!isSyncDone()) {
94       try {
95         if (shouldPerformRetry) {
96           syncEntity();
97         }
98         Thread.sleep(1000);
99       } catch (Exception exc) {
100         // We don't care about this exception
101       }
102     }
103
104     return OperationState.OK;
105   }
106
107   private void syncEntity() {
108     String txnId = NodeUtils.getRandomTxnId();
109     MdcContext.initialize(txnId, "AggregationSuggestionSynchronizer", "", "Sync", "");
110     
111     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity();
112     syncEntity.deriveFields();
113
114     String link = null;
115     try {
116       link = getElasticFullUrl("/" + syncEntity.getId(), getIndexName());
117     } catch (Exception exc) {
118       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
119     }
120     
121     try {
122       String jsonPayload = null;
123       jsonPayload = syncEntity.getIndexDocumentJson();
124       if (link != null && jsonPayload != null) {
125
126         NetworkTransaction elasticPutTxn = new NetworkTransaction();
127         elasticPutTxn.setLink(link);
128         elasticPutTxn.setOperationType(HttpMethod.PUT);
129
130         esWorkOnHand.incrementAndGet();
131         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
132         supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
133             esDataProvider, contextMap), esPutExecutor).whenComplete((result, error) -> {
134
135               esWorkOnHand.decrementAndGet();
136
137               if (error != null) {
138                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
139                     + error.getLocalizedMessage();
140                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
141               } else {
142                 updateElasticSearchCounters(result);
143                 wasEsOperationSuccessful(result);
144               }
145             });
146       }
147     } catch (Exception exc) {
148       String message =
149           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
150               + exc.getLocalizedMessage();
151       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
152     }
153   }
154
155   private void wasEsOperationSuccessful(NetworkTransaction result) {
156     if (result != null) {
157       OperationResult opResult = result.getOperationResult();
158
159       if (!opResult.wasSuccessful()) {
160         shouldPerformRetry = true;
161       } else {
162         isSyncInProgress = false;
163         shouldPerformRetry = false;
164       }
165     }
166   }
167
168   @Override
169   public SynchronizerState getState() {
170     if (!isSyncDone()) {
171       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
172     }
173
174     return SynchronizerState.IDLE;
175   }
176
177   @Override
178   public String getStatReport(boolean shouldDisplayFinalReport) {
179     return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs,
180         shouldDisplayFinalReport);
181   }
182
183   @Override
184   public void shutdown() {
185     this.shutdownExecutors();
186   }
187 }