2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.io.IOException;
28 import java.sql.Timestamp;
29 import java.text.SimpleDateFormat;
30 import java.util.Collection;
31 import java.util.EnumSet;
33 import java.util.Map.Entry;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
39 import javax.json.Json;
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.dal.rest.HttpMethod;
43 import org.onap.aai.sparky.dal.rest.OperationResult;
44 import org.onap.aai.sparky.logging.AaiUiMsgs;
45 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
46 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.util.NodeUtils;
48 import org.onap.aai.cl.api.Logger;
49 import org.onap.aai.cl.eelf.LoggerFactory;
50 import org.onap.aai.cl.mdc.MdcContext;
53 import com.fasterxml.jackson.databind.JsonNode;
54 import com.fasterxml.jackson.databind.node.ArrayNode;
57 * The Class HistoricalEntitySummarizer.
59 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
60 implements IndexSynchronizer {
62 private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
63 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
65 private boolean allWorkEnumerated;
66 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
67 private boolean syncInProgress;
68 private Map<String, String> contextMap;
71 * Instantiates a new historical entity summarizer.
73 * @param indexName the index name
74 * @throws Exception the exception
76 public HistoricalEntitySummarizer(String indexName) throws Exception {
77 super(LOG, "HES", 2, 5, 5, indexName);
79 this.allWorkEnumerated = false;
80 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
81 this.synchronizerName = "Historical Entity Summarizer";
82 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
83 this.syncInProgress = false;
84 this.contextMap = MDC.getCopyOfContextMap();
85 this.syncDurationInMs = -1;
89 * Collect all the work.
91 * @return the operation state
93 private OperationState collectAllTheWork() {
95 Map<String, OxmEntityDescriptor> descriptorMap =
96 oxmModelLoader.getSearchableEntityDescriptors();
98 if (descriptorMap.isEmpty()) {
99 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
101 return OperationState.ERROR;
104 Collection<String> entityTypes = descriptorMap.keySet();
106 AtomicInteger asyncWoH = new AtomicInteger(0);
108 asyncWoH.set(entityTypes.size());
111 for (String entityType : entityTypes) {
113 supplyAsync(new Supplier<Void>() {
117 MDC.setContextMap(contextMap);
119 OperationResult typeLinksResult =
120 aaiDataProvider.getSelfLinksByEntityType(entityType);
121 updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
122 processEntityTypeSelfLinks(entityType, typeLinksResult);
123 } catch (Exception exc) {
124 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
131 }, aaiExecutor).whenComplete((result, error) -> {
133 asyncWoH.decrementAndGet();
136 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
144 while (asyncWoH.get() > 0) {
146 if (LOG.isDebugEnabled()) {
147 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
153 esWorkOnHand.set(entityCounters.size());
155 // start doing the real work
156 allWorkEnumerated = true;
158 insertEntityTypeCounters();
160 if (LOG.isDebugEnabled()) {
162 StringBuilder sb = new StringBuilder(128);
164 sb.append("\n\nHistorical Entity Counters:");
166 for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
167 sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
170 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
174 } catch (Exception exc) {
175 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
179 allWorkEnumerated = true;
181 return OperationState.ERROR;
184 return OperationState.OK;
189 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
192 public OperationState doSync() {
193 String txnID = NodeUtils.getRandomTxnId();
194 MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
196 if (syncInProgress) {
197 LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
198 return OperationState.PENDING;
203 syncInProgress = true;
204 this.syncStartedTimeStampInMs = System.currentTimeMillis();
205 allWorkEnumerated = false;
207 return collectAllTheWork();
211 * Process entity type self links.
213 * @param entityType the entity type
214 * @param operationResult the operation result
216 private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
218 JsonNode rootNode = null;
220 final String jsonResult = operationResult.getResult();
222 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
225 rootNode = mapper.readTree(jsonResult);
226 } catch (IOException exc) {
227 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
231 JsonNode resultData = rootNode.get("result-data");
232 ArrayNode resultDataArrayNode = null;
234 if (resultData != null && resultData.isArray()) {
235 resultDataArrayNode = (ArrayNode) resultData;
236 entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
243 * Insert entity type counters.
245 private void insertEntityTypeCounters() {
247 if (esWorkOnHand.get() <= 0) {
251 SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
252 Timestamp timestamp = new Timestamp(System.currentTimeMillis());
253 String currentFormattedTimeStamp = dateFormat.format(timestamp);
255 Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
257 for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
259 supplyAsync(new Supplier<Void>() {
263 MDC.setContextMap(contextMap);
264 String jsonString = Json.createObjectBuilder().add(
265 "count", entityCounterEntry.getValue().get())
266 .add("entityType", entityCounterEntry.getKey())
267 .add("timestamp", currentFormattedTimeStamp).build().toString();
271 link = getElasticFullUrl("", indexName);
272 OperationResult or = esDataProvider.doPost(link, jsonString, "application/json");
273 updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
274 } catch (Exception exc) {
275 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
281 }, esExecutor).whenComplete((result, error) -> {
283 esWorkOnHand.decrementAndGet();
289 while (esWorkOnHand.get() > 0) {
293 } catch (InterruptedException exc) {
294 LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
301 public SynchronizerState getState() {
304 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
307 return SynchronizerState.IDLE;
312 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
315 public String getStatReport(boolean showFinalReport) {
316 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
317 return this.getStatReport(syncDurationInMs, showFinalReport);
321 * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
324 public void shutdown() {
325 this.shutdownExecutors();
329 protected boolean isSyncDone() {
331 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
333 if (LOG.isDebugEnabled()) {
334 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
335 + " all work enumerated = " + allWorkEnumerated);
338 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
342 this.syncInProgress = false;
348 * @see org.onap.aai.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
351 public void clearCache() {
353 if (syncInProgress) {
354 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
359 this.resetCounters();
360 if (entityCounters != null) {
361 entityCounters.clear();
364 allWorkEnumerated = false;