Handle duplicate id 58/86758/2
authorGuobiao Mo <guobiaomo@chinamobile.com>
Thu, 2 May 2019 00:17:38 +0000 (17:17 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Thu, 2 May 2019 23:48:20 +0000 (16:48 -0700)
Issue-ID: DCAEGEN2-1411
Change-Id: Ib39b39ff276d7d5c3dbfaa281df6104926fa354d
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java

index c618f57..e957c0f 100644 (file)
@@ -107,10 +107,6 @@ public class Topic {
                this.name = name;
        }
 
-       public boolean isDefault() {
-               return "_DL_DEFAULT_".equals(name);
-       }
-
        public boolean isEnabled() {
                return is(enabled);
        }
index 12d03ee..f5ee5b7 100644 (file)
@@ -106,7 +106,11 @@ public class CouchbaseService {
                        JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
                        documents.add(doc);
                }
-               saveDocuments(documents);               
+               try {
+                       saveDocuments(documents);
+               }catch(Exception e) {
+                       log.error("error saving to Couchbase.", e);
+               }
                log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); 
        }
 
index 4090e7e..c354f17 100644 (file)
@@ -35,7 +35,7 @@ import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetIndexRequest; 
+import org.elasticsearch.client.indices.GetIndexRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -67,21 +67,20 @@ public class ElasticsearchService {
 
        @Autowired
        private ApplicationConfiguration config;
-       
+
        @Autowired
        private DbService dbService;
 
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
 
-
-//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
-//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
+       //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
+       //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
        @PostConstruct
        private void init() {
                Db elasticsearch = dbService.getElasticsearch();
                String elasticsearchHost = elasticsearch.getHost();
-               
+
                // Initialize the Connection
                client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
@@ -104,47 +103,47 @@ public class ElasticsearchService {
        public void cleanUp() throws IOException {
                client.close();
        }
-       
+
        public void ensureTableExist(String topic) throws IOException {
                String topicLower = topic.toLowerCase();
-               
-               GetIndexRequest request = new GetIndexRequest(topicLower); 
-               
+
+               GetIndexRequest request = new GetIndexRequest(topicLower);
+
                boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
-               if(!exists){
+               if (!exists) {
                        //TODO submit mapping template
-                       CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
-                       CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
+                       CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
+                       CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
                        log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
                }
        }
-       
+
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
                BulkRequest request = new BulkRequest();
 
                for (JSONObject json : jsons) {
-                       if(topic.isCorrelateClearedMessage()) {
+                       if (topic.isCorrelateClearedMessage()) {
                                boolean found = correlateClearedMessage(topic, json);
-                               if(found) {
+                               if (found) {
                                        continue;
                                }
                        }
-                       
+
                        String id = topic.getMessageId(json); //id can be null
 
                        request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
                }
 
                log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
-               
-               if(config.isAsync()) {
-                       client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
-               }else {
+
+               if (config.isAsync()) {
+                       client.bulkAsync(request, RequestOptions.DEFAULT, listener);
+               } else {
                        try {
                                client.bulk(request, RequestOptions.DEFAULT);
-                       } catch (IOException e) { 
-                               log.error( topic.getName() , e);
+                       } catch (IOException e) {
+                               log.error(topic.getName(), e);
                        }
                }
        }
@@ -155,9 +154,10 @@ public class ElasticsearchService {
         * @param json
         * @return boolean
         *
-        * Because of query by id, The search API cannot be used for query.
-        * The search API can only query all data or based on the fields in the source.
-        * So use the get API, three parameters: index, type, document id
+        *         Because of query by id, The search API cannot be used for query. The
+        *         search API can only query all data or based on the fields in the
+        *         source. So use the get API, three parameters: index, type, document
+        *         id
         */
        private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
                boolean found = false;
@@ -166,75 +166,67 @@ public class ElasticsearchService {
                try {
                        eName = json.query("/event/commonEventHeader/eventName").toString();
 
-                       if (StringUtils.isNotBlank(eName)) {
-
-                               if (eName.endsWith("Cleared")) {
-
-                                       String name = eName.substring(0, eName.length() - 7);
-                                       String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
-                                       String specificProblem = json.query("/event/faultFields/specificProblem").toString();
-
-                                       String id = null;
-                                       StringBuilder stringBuilder = new StringBuilder();
-                                       stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
-
-                                       id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
-                                       String index = topic.getName().toLowerCase();
-
-                                       //get
-                                       GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
-
-                                       GetResponse getResponse = null;
-                                       try {
-                                               getResponse = client.get(getRequest, RequestOptions.DEFAULT);
-                                               if (getResponse != null) {
-
-                                                       if (getResponse.isExists()) {
-                                                               String sourceAsString = getResponse.getSourceAsString();
-                                                               JSONObject jsonObject = new JSONObject(sourceAsString);
-                                                               jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
-                                                               String jsonString = jsonObject.toString();
-
-                                                               //update
-                                                               IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
-                                                               request.source(jsonString, XContentType.JSON);
-                                                               IndexResponse indexResponse = null;
-                                                               try {
-                                                                       indexResponse = client.index(request, RequestOptions.DEFAULT);
-                                                                       found = true;
-                                                               } catch (IOException e) {
-                                                                       log.error("save failure");
-                                                               }
-                                                       } else {
-                                                               log.error("The getResponse was not exists" );
+                       if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) {
+
+                               String name = eName.substring(0, eName.length() - 7);
+                               String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+                               String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+                               String id = null;
+                               StringBuilder stringBuilder = new StringBuilder();
+                               stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+                               id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+                               String index = topic.getName().toLowerCase();
+
+                               //get
+                               GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+                               GetResponse getResponse = null;
+                               try {
+                                       getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+                                       if (getResponse != null) {
+
+                                               if (getResponse.isExists()) {
+                                                       String sourceAsString = getResponse.getSourceAsString();
+                                                       JSONObject jsonObject = new JSONObject(sourceAsString);
+                                                       jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+                                                       String jsonString = jsonObject.toString();
+
+                                                       //update
+                                                       IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+                                                       request.source(jsonString, XContentType.JSON);
+                                                       IndexResponse indexResponse = null;
+                                                       try {
+                                                               indexResponse = client.index(request, RequestOptions.DEFAULT);
+                                                               found = true;
+                                                       } catch (IOException e) {
+                                                               log.error("save failure");
                                                        }
-
                                                } else {
-                                                       log.error("The document for this id was not found" );
+                                                       log.error("The getResponse was not exists");
                                                }
 
-                                       } catch (ElasticsearchException e) {
-                                               if (e.status() == RestStatus.NOT_FOUND) {
-                                                       log.error("The document for this id was not found" );
-                                               }
-                                               if (e.status() == RestStatus.CONFLICT) {
-                                                       log.error("Version conflict" );
-                                               }
-                                               log.error("Get document exception", e);
-                                       }catch (IOException e) {
-                                               log.error(topic.getName() , e);
+                                       } else {
+                                               log.error("The document for this id was not found");
                                        }
 
-                               } else {
-                                       log.info("The data is normal");
+                               } catch (ElasticsearchException e) {
+                                       if (e.status() == RestStatus.NOT_FOUND) {
+                                               log.error("The document for this id was not found");
+                                       }
+                                       if (e.status() == RestStatus.CONFLICT) {
+                                               log.error("Version conflict");
+                                       }
+                                       log.error("Get document exception", e);
+                               } catch (IOException e) {
+                                       log.error(topic.getName(), e);
                                }
 
-                       } else {
-                               log.debug("event id null");
                        }
 
                } catch (Exception e) {
-                       log.error("error",e);
+                       log.error("error", e);
                }
 
                return found;
index 02c80a4..c540895 100644 (file)
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.MongoBulkWriteException;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientOptions.Builder;
@@ -48,6 +50,7 @@ import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
 
 /**
  * Service for using MongoDB
@@ -62,7 +65,7 @@ public class MongodbService {
 
        @Autowired
        private ApplicationConfiguration config;
-        private boolean dbReady = false;
+       private boolean dbReady = false;
 
        @Autowired
        private DbService dbService;
@@ -70,6 +73,7 @@ public class MongodbService {
        private MongoDatabase database;
        private MongoClient mongoClient;
        private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+       private InsertManyOptions insertManyOptions;
 
        @PostConstruct
        private void init() {
@@ -103,26 +107,26 @@ public class MongodbService {
 
                addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address
 
-
                try {
-                       if(StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password))
-                       {
+                       if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) {
                                credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray());
                                List<MongoCredential> credentialList = new ArrayList<MongoCredential>();
                                credentialList.add(credential);
                                mongoClient = new MongoClient(addrs, credentialList, options);
-                       }else
-                       {
+                       } else {
                                mongoClient = new MongoClient(addrs, options);
                        }
-               }catch(Exception ex){
+               } catch (Exception ex) {
                        dbReady = false;
                        log.error("Fail to initiate MongoDB" + mongodb.getHost());
                        return;
                }
                database = mongoClient.getDatabase(mongodb.getDatabase());
-               dbReady = true;
 
+               insertManyOptions = new InsertManyOptions();
+               insertManyOptions.ordered(false);
+
+               dbReady = true;
        }
 
        @PreDestroy
@@ -131,7 +135,7 @@ public class MongodbService {
        }
 
        public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
-               if(dbReady == false)
+               if (dbReady == false)
                        return;
                List<Document> documents = new ArrayList<>(jsons.size());
                for (JSONObject json : jsons) {
@@ -147,7 +151,15 @@ public class MongodbService {
 
                String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
                MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
-               collection.insertMany(documents);
+
+               try {
+                       collection.insertMany(documents, insertManyOptions);
+               } catch (MongoBulkWriteException e) {
+                       List<BulkWriteError> bulkWriteErrors = e.getWriteErrors();
+                       for (BulkWriteError bulkWriteError : bulkWriteErrors) {
+                               log.error("Failed record: {}", bulkWriteError);
+                       }
+               }
 
                log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
        }
index 74f0884..4397e91 100644 (file)
@@ -68,6 +68,7 @@ public class TopicTest {
 
         assertTrue(testTopic.equals(new Topic("test")));
         assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+        assertEquals(testTopic.toString(), "test");
 
         defaultTopic.setDbs(new HashSet<>());
         defaultTopic.getDbs().add(new Db("Elasticsearch"));
index c65e920..dc9feed 100644 (file)
@@ -28,6 +28,7 @@ import java.util.HashSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -79,13 +80,20 @@ public class TopicConfigTest {
     public void testIs() {
         Topic testTopic = new Topic("test");
 
-        assertTrue(testTopic.equals(new Topic("test")));
-        assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
-
+        TopicConfig testTopicConfig = testTopic.getTopicConfig();
+        testTopicConfig.setSinkdbs(null);
+        assertFalse(testTopicConfig.supportElasticsearch());
+        assertNull(testTopicConfig.getDataFormat2());
+                
         testTopic.setDbs(new HashSet<>());
         testTopic.getDbs().add(new Db("Elasticsearch"));
         
-        TopicConfig testTopicConfig = testTopic.getTopicConfig();
+        testTopicConfig = testTopic.getTopicConfig();
+
+        assertEquals(testTopicConfig, new Topic("test").getTopicConfig());
+        assertNotEquals(testTopicConfig, testTopic);
+        assertNotEquals(testTopicConfig, null);
+        assertEquals(testTopicConfig.hashCode(), (new Topic("test").getTopicConfig()).hashCode());
         
         assertTrue(testTopicConfig.supportElasticsearch());
         assertFalse(testTopicConfig.supportCouchbase());