2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018-2019 Huawei. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.datalake.feeder.service;
 
  23 import com.couchbase.client.java.Cluster;
 
  24 import com.couchbase.client.java.CouchbaseCluster;
 
  25 import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
 
  26 import com.couchbase.mock.Bucket;
 
  27 import com.couchbase.mock.BucketConfiguration;
 
  28 import com.couchbase.mock.CouchbaseMock;
 
  29 import com.couchbase.mock.client.MockClient;
 
  30 import org.jetbrains.annotations.NotNull;
 
  31 import org.json.JSONObject;
 
  32 import org.junit.After;
 
  33 import org.junit.Before;
 
  34 import org.junit.Test;
 
  35 import org.junit.runner.RunWith;
 
  36 import org.mockito.junit.MockitoJUnitRunner;
 
  37 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 
  38 import org.onap.datalake.feeder.domain.Topic;
 
  40 import static org.mockito.Mockito.when;
 
  42 import java.util.ArrayList;
 
  43 import java.util.List;
 
  44 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
  46 @RunWith(MockitoJUnitRunner.class)
 
  47 public class CouchbaseServiceTest {
 
  48     protected final BucketConfiguration bucketConfiguration = new BucketConfiguration();
 
  49     protected MockClient mockClient;
 
  50     protected CouchbaseMock couchbaseMock;
 
  51     protected Cluster cluster;
 
  52     protected com.couchbase.client.java.Bucket bucket;
 
  53     protected int carrierPort;
 
  54     protected int httpPort;
 
  56     protected void getPortInfo(String bucket) throws Exception {
 
  57         httpPort = couchbaseMock.getHttpPort();
 
  58         carrierPort = couchbaseMock.getCarrierPort(bucket);
 
  61     protected void createMock(@NotNull String name, @NotNull String password) throws Exception {
 
  62         bucketConfiguration.numNodes = 1;
 
  63         bucketConfiguration.numReplicas = 1;
 
  64         bucketConfiguration.numVBuckets = 1024;
 
  65         bucketConfiguration.name = name;
 
  66         bucketConfiguration.type = Bucket.BucketType.COUCHBASE;
 
  67         bucketConfiguration.password = password;
 
  68         ArrayList<BucketConfiguration> configList = new ArrayList<BucketConfiguration>();
 
  69         configList.add(bucketConfiguration);
 
  70         couchbaseMock = new CouchbaseMock(0, configList);
 
  71         couchbaseMock.start();
 
  72         couchbaseMock.waitForStartup();
 
  75     protected void createClient() {
 
  76         cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder()
 
  77                                                   .bootstrapCarrierDirectPort(carrierPort)
 
  78                                                   .bootstrapHttpDirectPort(httpPort)
 
  79                                                   .build(), "couchbase://127.0.0.1");
 
  80         bucket = cluster.openBucket("default");
 
  84     public void setUp() throws Exception {
 
  85         createMock("default", "");
 
  86         getPortInfo("default");
 
  91     public void tearDown() {
 
  92         if (cluster != null) {
 
  95         if (couchbaseMock != null) {
 
  98         if (mockClient != null) {
 
  99             mockClient.shutdown();
 
 104     public void testSaveJsonsWithTopicId() {
 
 105         ApplicationConfiguration appConfig = new ApplicationConfiguration();
 
 106         appConfig.setTimestampLabel("datalake_ts_");
 
 108         String text = "{ data: { data2 : { value : 'hello'}}}";
 
 110         JSONObject json = new JSONObject(text);
 
 112         Topic topic = new Topic("test getMessageId");
 
 113         topic.setMessageIdPath("/data/data2/value");
 
 114         List<JSONObject> jsons = new ArrayList<>();
 
 115         json.put(appConfig.getTimestampLabel(), 1234);
 
 117         CouchbaseService couchbaseService = new CouchbaseService();
 
 118         couchbaseService.bucket = bucket;
 
 119         couchbaseService.config = appConfig;
 
 120         couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
 
 125     public void testSaveJsonsWithOutTopicId() {
 
 126         ApplicationConfiguration appConfig = new ApplicationConfiguration();
 
 127         appConfig.setTimestampLabel("datalake_ts_");
 
 129         String text = "{ data: { data2 : { value : 'hello'}}}";
 
 131         JSONObject json = new JSONObject(text);
 
 133         Topic topic = new Topic("test getMessageId");
 
 134         List<JSONObject> jsons = new ArrayList<>();
 
 135         json.put(appConfig.getTimestampLabel(), 1234);
 
 137         CouchbaseService couchbaseService = new CouchbaseService();
 
 138         couchbaseService.bucket = bucket;
 
 139         couchbaseService.config = appConfig;
 
 140         couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
 
 144     public void testCleanupBucket() {
 
 145         CouchbaseService couchbaseService = new CouchbaseService();
 
 146         couchbaseService.bucket = bucket;
 
 147         ApplicationConfiguration appConfig = new ApplicationConfiguration();
 
 148         couchbaseService.config = appConfig;
 
 149         couchbaseService.cleanUp();