2  * ========================LICENSE_START=================================
 
   4  * ======================================================================
 
   5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
 
   6  * ======================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ========================LICENSE_END===================================
 
  21 package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
 
  23 import java.lang.invoke.MethodHandles;
 
  25 import java.util.concurrent.CompletableFuture;
 
  27 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
 
  28 import org.slf4j.Logger;
 
  29 import org.slf4j.LoggerFactory;
 
  31 import reactor.core.publisher.Flux;
 
  32 import reactor.core.publisher.Mono;
 
  33 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 
  34 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 
  35 import software.amazon.awssdk.core.BytesWrapper;
 
  36 import software.amazon.awssdk.core.ResponseBytes;
 
  37 import software.amazon.awssdk.core.async.AsyncRequestBody;
 
  38 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 
  39 import software.amazon.awssdk.regions.Region;
 
  40 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
  41 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 
  42 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 
  43 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
 
  44 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 
  45 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
 
  46 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 
  47 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
 
  48 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 
  49 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 
  50 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 
  51 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 
  52 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
  53 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 
  54 import software.amazon.awssdk.services.s3.model.S3Object;
 
  56 class S3ObjectStore implements DataStore {
 
  57     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
  58     private final ApplicationConfig applicationConfig;
 
  60     private static S3AsyncClient s3AsynchClient;
 
  61     private final String location;
 
  /**
   * Creates a store whose object keys are placed under the given location.
   *
   * @param applicationConfig configuration holding the S3 settings
   * @param location          key prefix used for every object handled by this store
   */
  public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
      this.location = location;
      this.applicationConfig = applicationConfig;

      // Invoked for its side effect only: it sets up the shared client on first use.
      getS3AsynchClient(applicationConfig);
  }
 
  70     private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
 
  71         if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
 
  72             s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
 
  74         return s3AsynchClient;
 
  77     private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
 
  78         URI uri = URI.create(applicationConfig.getS3EndpointOverride());
 
  79         return S3AsyncClient.builder() //
 
  80                 .region(Region.US_EAST_1) //
 
  81                 .endpointOverride(uri) //
 
  82                 .credentialsProvider(StaticCredentialsProvider.create( //
 
  83                         AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
 
  84                                 applicationConfig.getS3SecretAccessKey())));
 
  88     public Flux<String> listObjects(String prefix) {
 
  89         return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
 
  90                 .map(this::externalName);
 
  // Deletes a single object through the shared S3 client and emits true once the
  // delete call has completed. No error handler is attached in the visible lines.
  //
  // NOTE(review): the embedded source numbering jumps from 95 to 100, so the rest of
  // the request builder chain is not visible here. Presumably it sets the bucket and
  // the key for `name` before build() - confirm against the full file.
  94     public Mono<Boolean> deleteObject(String name) {
 
  95         DeleteObjectRequest request = DeleteObjectRequest.builder() //
 
 100         CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
 
 // The response content is ignored; any successful response is mapped to true.
 102         return Mono.fromFuture(future).map(resp -> true);
 
 106     public Mono<byte[]> readObject(String name) {
 
 107         return getDataFromS3Object(bucket(), name);
 
 // Uploads fileData through the shared S3 client and, on success, emits the same
 // fileData array that was passed in (not a copy and not the S3 response).
 //
 // NOTE(review): the embedded source numbering jumps from 113 to 118, so the rest of
 // the request builder chain is not visible here. Presumably it sets the bucket and
 // key(name) before build() - confirm against the full file.
 111     public Mono<byte[]> writeObject(String name, byte[] fileData) {
 
 113         PutObjectRequest request = PutObjectRequest.builder() //
 
 118         AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
 
 120         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
 
 122         return Mono.fromFuture(future) //
 
 123                 .map(putObjectResponse -> fileData) //
 
 // Only logs the failure (key and exception message); no fallback value is supplied here.
 124                 .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage()));
 
 128     public Mono<String> createDataStore() {
 
 129         return createS3Bucket(bucket());
 
 // Asks S3 to create the bucket and emits the bucket name. A failure is logged at
 // debug level and then replaced by the same bucket name, so the returned Mono emits
 // s3Bucket whether or not the create call succeeded.
 //
 // NOTE(review): the embedded source numbering jumps from 134 to 138, so the rest of
 // the request builder chain is not visible here. Presumably it sets the bucket name
 // before build() - confirm against the full file.
 132     private Mono<String> createS3Bucket(String s3Bucket) {
 
 134         CreateBucketRequest request = CreateBucketRequest.builder() //
 
 138         CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
 
 140         return Mono.fromFuture(future) //
 
 141                 .map(f -> s3Bucket) //
 
 142                 .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
 
 // Best effort: every error is swallowed here and answered with the bucket name.
 143                 .onErrorResume(t -> Mono.just(s3Bucket));
 
 // Lists every object under this store's location (empty prefix) and deletes each
 // one. The outcome is reported as the string "OK"; any error in the chain is
 // replaced by the string "NOK" instead of being propagated.
 //
 // NOTE(review): the embedded source numbering jumps from 149 to 151, so one line of
 // the chain is not visible here. As shown, flatMap on the Flux from listObjects
 // still yields a Flux, which does not match the declared Mono<String>; presumably
 // the hidden line collapses the Flux into a single value - confirm against the
 // full file.
 147     public Mono<String> deleteAllObjects() {
 
 148         return listObjects("") //
 
 149                 .flatMap(this::deleteObject) //
 
 151                 .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK"));
 
 // Asks S3 to delete the configured bucket. On success the bucket name is emitted
 // and logged at debug level; on failure a warning is logged and the string "NOK"
 // is emitted instead of an error signal.
 //
 // NOTE(review): the embedded source numbering jumps from 155 to 159, so the rest of
 // the request builder chain is not visible here. Presumably it sets the bucket name
 // before build() - confirm against the full file.
 154     public Mono<String> deleteBucket() {
 
 155         DeleteBucketRequest request = DeleteBucketRequest.builder() //
 
 159         CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
 
 161         return Mono.fromFuture(future) //
 
 162                 .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
 
 163                 .map(resp -> bucket()) //
 
 164                 .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK"));
 
 167     private String bucket() {
 
 168         return applicationConfig.getS3Bucket();
 
 171     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
 
 173         return listObjectsRequest(bucket, prefix, null) //
 
 174                 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
 
 175                 .map(ListObjectsResponse::contents) //
 
 176                 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
 
 177                 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
 
 178                 .flatMap(Flux::fromIterable) //
 
 179                 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
 
 // Issues one S3 list request. prevResponse is null for the first request (see
 // listObjectsInBucket) and otherwise the response of the preceding request.
 //
 // NOTE(review): the embedded source numbering jumps from 184 to 189 and from 191 to
 // 197, so parts of this method are not visible here. Presumably the first gap
 // finishes the builder chain (bucket, prefix, ...) and the second gap handles a
 // previous response that is not truncated - confirm against the full file.
 182     private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
 
 183             ListObjectsResponse prevResponse) {
 
 184         ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
 
 189         if (prevResponse != null) {
 
 190             if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
 
 // Continue the listing after the point where the previous response stopped.
 // NOTE(review): the S3 ListObjects API documents NextMarker as returned only when a
 // delimiter is set on the request. Unless a hidden builder line sets a delimiter,
 // nextMarker() may be null here and the listing would start over; the documented
 // fallback is the key of the last object in the previous response - verify.
 191                 builder.marker(prevResponse.nextMarker());
 
 197         ListObjectsRequest listObjectsRequest = builder.build();
 
 198         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
 
 199         return Mono.fromFuture(future);
 
 // Downloads an object fully into memory and emits its bytes. Any failure is logged
 // at error level and then turned into an empty Mono, so from the returned Mono alone
 // a caller cannot tell a failed read from no data.
 //
 // NOTE(review): the embedded source numbering jumps from 204 to 209 and from 214 to
 // 216, so the rest of the request builder chain and the tail of the doOnError call
 // are not visible here. Presumably the request is built from bucket and key(name) -
 // confirm against the full file.
 202     private Mono<byte[]> getDataFromS3Object(String bucket, String name) {
 
 204         GetObjectRequest request = GetObjectRequest.builder() //
 
 // toBytes() buffers the whole response body before the future completes.
 209         CompletableFuture<ResponseBytes<GetObjectResponse>> future =
 
 210                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
 212         return Mono.fromFuture(future) //
 
 213                 .map(BytesWrapper::asByteArray) //
 
 214                 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket,
 
 // NOTE(review): doOnEach is invoked for every signal (value, completion and error),
 // so this "Read file" debug line is also written when the read failed; doOnNext
 // would restrict it to successful reads - confirm the intent.
 216                 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) //
 
 217                 .onErrorResume(t -> Mono.empty());
 
 220     private String key(String name) {
 
 221         return location + "/" + name;
 
 224     private String externalName(String internalName) {
 
 225         return internalName.substring(key("").length());