import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
for (JSONObject json : jsons) {
if(topic.isCorrelateClearedMessage()) {
- boolean found = correlateClearedMessage(json);
+ boolean found = correlateClearedMessage(topic, json);
if(found) {
continue;
}
}
String id = topic.getMessageId(json); //id can be null
-
- request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
}
}
}
-
- private boolean correlateClearedMessage(JSONObject json) {
- boolean found = true;
-
- /*TODO
- * 1. check if this is a alarm cleared message
- * 2. search previous alarm message
- * 3. update previous message, if success, set found=true
- */
- //for Sonar test, remove the following
- if(json.isNull("kkkkk")) {
- found = false;
+
+ /**
+ *
+ * @param topic
+ * @param json
+ * @return boolean
+ *
+ * Because of query by id, The search API cannot be used for query.
+ * The search API can only query all data or based on the fields in the source.
+ * So use the get API, three parameters: index, type, document id
+ */
+ private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+ boolean found = false;
+ String eName = null;
+
+ try {
+ eName = json.query("/event/commonEventHeader/eventName").toString();
+
+ if (StringUtils.isNotBlank(eName)) {
+
+ if (eName.endsWith("Cleared")) {
+
+ String name = eName.substring(0, eName.length() - 7);
+ String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+ String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+ String id = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+ id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+ String index = topic.getName().toLowerCase();
+
+ //get
+ GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+ GetResponse getResponse = null;
+ try {
+ getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse != null) {
+
+ if (getResponse.isExists()) {
+ String sourceAsString = getResponse.getSourceAsString();
+ JSONObject jsonObject = new JSONObject(sourceAsString);
+ jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+ String jsonString = jsonObject.toString();
+
+ //update
+ IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+ request.source(jsonString, XContentType.JSON);
+ IndexResponse indexResponse = null;
+ try {
+ indexResponse = client.index(request, RequestOptions.DEFAULT);
+ found = true;
+ } catch (IOException e) {
+ log.error("save failure");
+ }
+ } else {
+ log.error("The getResponse was not exists" );
+ }
+
+ } else {
+ log.error("The document for this id was not found" );
+ }
+
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ log.error("The document for this id was not found" );
+ }
+ if (e.status() == RestStatus.CONFLICT) {
+ log.error("Version conflict" );
+ }
+ log.error("Get document exception", e);
+ }catch (IOException e) {
+ log.error(topic.getName() , e);
+ }
+
+ } else {
+ log.info("The data is normal");
+ }
+
+ } else {
+ log.debug("event id null");
+ }
+
+ } catch (Exception e) {
+ log.error("error",e);
}
-
- return found;
+
+ return found;
}
}