2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.datarouter.policy;
23 import java.io.FileNotFoundException;
24 import java.util.List;
26 import org.apache.camel.Exchange;
27 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
28 import org.onap.aai.datarouter.entity.SpikeEventEntity;
29 import org.onap.aai.datarouter.entity.SpikeEventVertex;
30 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
33 public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcessor {
35 public static final String additionalInfo = "Response of SpikeEntityEventPolicy";
36 private static final String searchIndexSchema = "";
40 /** Agent for communicating with the Search Service. */
42 public SpikeEntitySearchProcessor(SpikeEventPolicyConfig config)
43 throws FileNotFoundException {
49 protected void startup() {
50 // Create the indexes in the search service if they do not already exist.
51 searchAgent.createSearchIndex(searchIndexName, searchIndexSchema);
52 logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
56 public void process(Exchange exchange) throws Exception {
58 long startTime = System.currentTimeMillis();
59 String uebPayload = getExchangeBody(exchange);
60 if (uebPayload == null) {
64 String action = getSpikeEventAction(exchange, uebPayload);
68 SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload);
69 if (eventVertex == null) {
72 String entityType = getEntityType(exchange, eventVertex, uebPayload);
73 if (entityType == null) {
76 String entityLink = getEntityLink(exchange, eventVertex, uebPayload);
77 if (entityLink == null) {
80 DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload);
81 if (oxmJaxbContext == null) {
84 String oxmEntityType = getOxmEntityType(entityType);
85 List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload,
87 if (searchableAttr == null) {
91 // log the fact that all data are in good shape
92 logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
93 logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
96 SpikeEventEntity spikeEventEntity = new SpikeEventEntity();
97 spikeEventEntity.setEntityType(entityType);
98 spikeEventEntity.setLink(entityLink);
99 spikeEventEntity = populateSpikeEventEntity(exchange, spikeEventEntity, oxmJaxbContext,
100 entityType, action, uebPayload, oxmEntityType,searchableAttr);
101 if (spikeEventEntity == null) {
105 handleSearchServiceOperation(spikeEventEntity, action, searchIndexName);
106 long stopTime = System.currentTimeMillis();
107 metricsLogger.info(EntityEventPolicyMsgs.OPERATION_RESULT_NO_ERRORS, PROCESS_SPIKE_EVENT,
108 String.valueOf(stopTime - startTime));
109 setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
114 * This is not for this Scope. We get back to it later. (updateCerInEntity) private void
115 * updateSearchEntityWithCrossEntityReference(SpikeEventEntity spikeEventEntity) { try {
116 * Map<String, List<String>> headers = new HashMap<>(); headers.put(Headers.FROM_APP_ID,
117 * Arrays.asList("Data Router")); headers.put(Headers.TRANSACTION_ID,
118 * Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
120 * String entityId = spikeEventEntity.getId(); String jsonPayload;
122 * // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity =
123 * searchAgent.getDocument(entitySearchIndex, entityId);
125 * if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { /* NOTES:
126 * aaiEventEntity (ie the nested entity) may contain a subset of properties of the pre-existing
127 * object, so all we want to do is update the CER on the pre-existing object (if needed).
130 * List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
132 * if (etag != null && !etag.isEmpty()) { headers.put(Headers.IF_MATCH, etag); } else {
133 * logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, entitySearchIndex, entityId); }
135 * ArrayList<JsonNode> sourceObject = new ArrayList<>();
136 * NodeUtils.extractObjectsByKey(NodeUtils.convertJsonStrToJsonNode(storedEntity.getResult()),
137 * "content", sourceObject);
139 * if (!sourceObject.isEmpty()) { JsonNode node = sourceObject.get(0); final String sourceCer =
140 * NodeUtils.extractFieldValueFromObject(node, "crossEntityReferenceValues"); String newCer =
141 * spikeEventEntity.getCrossReferenceEntityValues(); boolean hasNewCer = true; if (sourceCer !=
142 * null && sourceCer.length() > 0) { // already has CER if (!sourceCer.contains(newCer)) {// don't
143 * re-add newCer = sourceCer + ";" + newCer; } else { hasNewCer = false; } }
145 * if (hasNewCer) { // Do the PUT with new CER ((ObjectNode)
146 * node).put("crossEntityReferenceValues", newCer); jsonPayload =
147 * NodeUtils.convertObjectToJson(node, false); searchAgent.putDocument(entitySearchIndex,
148 * entityId, jsonPayload, headers); } } } else {
150 * if (storedEntity.getResultCode() == 404) { // entity not found, so attempt to do a PUT
151 * searchAgent.putDocument(entitySearchIndex, entityId, spikeEventEntity.getAsJson(), headers); }
152 * else { logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE,
153 * spikeEventEntity.getId(), "SYNC_ENTITY"); } } } catch (IOException e) {
154 * logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE,
155 * spikeEventEntity.getId(), "SYNC_ENTITY"); } }