2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.aggregation.sync;
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
25 import java.io.IOException;
26 import java.sql.Timestamp;
27 import java.text.SimpleDateFormat;
28 import java.util.Collection;
29 import java.util.EnumSet;
31 import java.util.Map.Entry;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
37 import javax.json.Json;
38 import javax.ws.rs.core.MediaType;
40 import org.onap.aai.cl.api.Logger;
41 import org.onap.aai.cl.eelf.LoggerFactory;
42 import org.onap.aai.cl.mdc.MdcContext;
43 import org.onap.aai.restclient.client.OperationResult;
44 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
45 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.logging.AaiUiMsgs;
48 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
49 import org.onap.aai.sparky.sync.IndexSynchronizer;
50 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
51 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
52 import org.onap.aai.sparky.sync.enumeration.OperationState;
53 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
54 import org.onap.aai.sparky.util.NodeUtils;
57 import com.fasterxml.jackson.databind.JsonNode;
58 import com.fasterxml.jackson.databind.node.ArrayNode;
61 * The Class HistoricalEntitySummarizer.
63 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
64 implements IndexSynchronizer {
66 private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
67 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
69 private boolean allWorkEnumerated;
70 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
71 private boolean syncInProgress;
72 private Map<String, String> contextMap;
73 private ElasticSearchSchemaConfig schemaConfig;
74 private SearchableEntityLookup searchableEntityLookup;
77 * Instantiates a new historical entity summarizer.
79 * @param indexName the index name
80 * @throws Exception the exception
82 public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
83 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
84 NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
86 super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
88 this.schemaConfig = schemaConfig;
89 this.allWorkEnumerated = false;
90 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
91 this.synchronizerName = "Historical Entity Summarizer";
92 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
93 this.syncInProgress = false;
94 this.contextMap = MDC.getCopyOfContextMap();
95 this.syncDurationInMs = -1;
96 this.searchableEntityLookup = searchableEntityLookup;
100 * Collect all the work.
102 * @return the operation state
104 private OperationState collectAllTheWork() {
106 Map<String, SearchableOxmEntityDescriptor> descriptorMap =
107 searchableEntityLookup.getSearchableEntityDescriptors();
109 if (descriptorMap.isEmpty()) {
110 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
112 return OperationState.ERROR;
115 Collection<String> entityTypes = descriptorMap.keySet();
117 AtomicInteger asyncWoH = new AtomicInteger(0);
119 asyncWoH.set(entityTypes.size());
122 for (String entityType : entityTypes) {
124 supplyAsync(new Supplier<Void>() {
128 MDC.setContextMap(contextMap);
130 OperationResult typeLinksResult =
131 aaiAdapter.getSelfLinksByEntityType(entityType);
132 updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
133 processEntityTypeSelfLinks(entityType, typeLinksResult);
134 } catch (Exception exc) {
135 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
142 }, aaiExecutor).whenComplete((result, error) -> {
144 asyncWoH.decrementAndGet();
147 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
155 while (asyncWoH.get() > 0) {
157 if (LOG.isDebugEnabled()) {
158 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
164 esWorkOnHand.set(entityCounters.size());
166 // start doing the real work
167 allWorkEnumerated = true;
169 insertEntityTypeCounters();
171 if (LOG.isDebugEnabled()) {
173 StringBuilder sb = new StringBuilder(128);
175 sb.append("\n\nHistorical Entity Counters:");
177 for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
178 sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
181 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
185 } catch (Exception exc) {
186 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
190 allWorkEnumerated = true;
192 return OperationState.ERROR;
195 return OperationState.OK;
200 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
203 public OperationState doSync() {
204 this.syncDurationInMs = -1;
205 String txnID = NodeUtils.getRandomTxnId();
206 MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
208 if (syncInProgress) {
209 LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
210 return OperationState.PENDING;
215 syncInProgress = true;
216 this.syncStartedTimeStampInMs = System.currentTimeMillis();
217 allWorkEnumerated = false;
219 return collectAllTheWork();
223 * Process entity type self links.
225 * @param entityType the entity type
226 * @param operationResult the operation result
228 private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
230 JsonNode rootNode = null;
232 final String jsonResult = operationResult.getResult();
234 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
237 rootNode = mapper.readTree(jsonResult);
238 } catch (IOException exc) {
239 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
243 JsonNode resultData = rootNode.get("result-data");
244 ArrayNode resultDataArrayNode = null;
246 if (resultData != null && resultData.isArray()) {
247 resultDataArrayNode = (ArrayNode) resultData;
248 entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
255 * Insert entity type counters.
257 private void insertEntityTypeCounters() {
259 if (esWorkOnHand.get() <= 0) {
263 SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
264 Timestamp timestamp = new Timestamp(System.currentTimeMillis());
265 String currentFormattedTimeStamp = dateFormat.format(timestamp);
267 Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
269 for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
271 supplyAsync(new Supplier<Void>() {
275 MDC.setContextMap(contextMap);
276 String jsonString = Json.createObjectBuilder().add(
277 "count", entityCounterEntry.getValue().get())
278 .add("entityType", entityCounterEntry.getKey())
279 .add("timestamp", currentFormattedTimeStamp).build().toString();
283 link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
284 OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
285 updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
286 } catch (Exception exc) {
287 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
293 }, esExecutor).whenComplete((result, error) -> {
295 esWorkOnHand.decrementAndGet();
301 while (esWorkOnHand.get() > 0) {
305 } catch (InterruptedException exc) {
306 LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
313 public SynchronizerState getState() {
316 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
319 return SynchronizerState.IDLE;
324 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
327 public String getStatReport(boolean showFinalReport) {
328 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
329 return this.getStatReport(syncDurationInMs, showFinalReport);
333 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
336 public void shutdown() {
337 this.shutdownExecutors();
341 protected boolean isSyncDone() {
343 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
345 if (LOG.isDebugEnabled()) {
346 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
347 + " all work enumerated = " + allWorkEnumerated);
350 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
354 this.syncInProgress = false;
360 * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
363 public void clearCache() {
365 if (syncInProgress) {
366 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
371 this.resetCounters();
372 if (entityCounters != null) {
373 entityCounters.clear();
376 allWorkEnumerated = false;