2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.service;
23 import java.io.IOException;
24 import java.util.List;
26 import javax.annotation.PostConstruct;
27 import javax.annotation.PreDestroy;
29 import org.apache.http.HttpHost;
30 import org.elasticsearch.ElasticsearchStatusException;
31 import org.elasticsearch.action.ActionListener;
32 import org.elasticsearch.client.indices.CreateIndexRequest;
33 import org.elasticsearch.client.indices.CreateIndexResponse;
34 import org.elasticsearch.client.indices.GetIndexRequest;
35 import org.elasticsearch.action.bulk.BulkRequest;
36 import org.elasticsearch.action.bulk.BulkResponse;
37 import org.elasticsearch.action.index.IndexRequest;
38 import org.elasticsearch.client.RequestOptions;
39 import org.elasticsearch.client.RestClient;
40 import org.elasticsearch.client.RestHighLevelClient;
41 import org.elasticsearch.common.xcontent.XContentType;
42 import org.json.JSONObject;
43 import org.onap.datalake.feeder.config.ApplicationConfiguration;
44 import org.onap.datalake.feeder.domain.Db;
45 import org.onap.datalake.feeder.domain.Topic;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import org.springframework.beans.factory.annotation.Autowired;
50 import org.springframework.stereotype.Service;
53 * Service to use Elasticsearch
59 public class ElasticsearchService {
// SLF4J logger named after the runtime class of this instance (this.getClass()).
61 private final Logger log = LoggerFactory.getLogger(this.getClass());
// Application settings; saveJsons reads config.isAsync() to choose async vs. blocking bulk writes.
// NOTE(review): no initializer is visible here - presumably injected by Spring (Autowired is imported); confirm.
64 private ApplicationConfiguration config;
// Supplies the Elasticsearch Db record (dbService.getElasticsearch()) from which the host is read.
// NOTE(review): no initializer is visible here - presumably injected by Spring; confirm.
67 private DbService dbService;
// High-level REST client; assigned during initialization and used for index and bulk calls.
69 private RestHighLevelClient client;
// Callback handed to client.bulkAsync(...) for asynchronous bulk writes. Package-private (no modifier).
70 ActionListener<BulkResponse> listener;
// Look up the Elasticsearch Db entry through DbService and read its host name.
74 Db elasticsearch = dbService.getElasticsearch();
75 String elasticsearchHost = elasticsearch.getHost();
77 // Initialize the Connection
// The client is built with two plain-HTTP endpoints on the same host; ports 9200 and 9201 are hard-coded.
// NOTE(review): consider taking port/scheme from configuration instead of literals.
78 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
80 log.info("Connect to Elasticsearch Host " + elasticsearchHost);
// Listener passed to client.bulkAsync(...) by saveJsons when asynchronous mode is enabled.
82 listener = new ActionListener<BulkResponse>() {
84 public void onResponse(BulkResponse bulkResponse) {
// Failure callback for asynchronous bulk requests.
// NOTE(review): only e.getMessage() is logged, so the stack trace and cause are lost;
// log.error(String, Throwable) would preserve them.
89 public void onFailure(Exception e) {
90 log.error(e.getMessage());
// Cleanup hook for this service; declared to propagate IOException to the caller.
// NOTE(review): presumably closes the REST client on shutdown (javax.annotation.PreDestroy is imported) - confirm.
96 public void cleanUp() throws IOException {
100 public void ensureTableExist(String topic) throws IOException {
101 String topicLower = topic.toLowerCase();
103 GetIndexRequest request = new GetIndexRequest(topicLower);
105 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
107 CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
108 CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
109 log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
113 //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
114 public void saveJsons(Topic topic, List<JSONObject> jsons) {
115 BulkRequest request = new BulkRequest();
117 for (JSONObject json : jsons) {
118 if(topic.isCorrelateClearedMessage()) {
119 boolean found = correlateClearedMessage(json);
125 String id = topic.getMessageId(json); //id can be null
127 request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
129 if(config.isAsync()) {
130 client.bulkAsync(request, RequestOptions.DEFAULT, listener);
133 client.bulk(request, RequestOptions.DEFAULT);
134 } catch (IOException e) {
135 log.error( topic.getName() , e);
140 private boolean correlateClearedMessage(JSONObject json) {
141 boolean found = true;
144 * 1. check if this is an alarm cleared message
145 * 2. search previous alarm message
146 * 3. update previous message, if success, set found=true
148 //for Sonar test, remove the following
149 if(json.isNull("kkkkk")) {