87cf9fef4dfbce266f75a984853913a9e3d4d417
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * DCAEGEN2-SERVICES-SDK
4  * ================================================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
21
22 import io.netty.buffer.ByteBuf;
23 import io.vavr.collection.HashSet;
24 import io.vavr.control.Try;
25
26 import java.net.InetSocketAddress;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.time.Duration;
30
31 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
32 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
33 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
34 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
35 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
36 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
37 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder;
38 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
39 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
40 import org.onap.ves.VesEventOuterClass.VesEvent;
41 import reactor.core.publisher.Flux;
42
43 /**
44  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
45  */
46 public class SystemUnderTestWrapper {
47
48     private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
49     private final DummyCollector collector = new DummyCollector();
50     private HvVesProducer cut;
51     private final Duration timeout;
52
53     public SystemUnderTestWrapper(Duration timeout) {
54         this.timeout = timeout;
55     }
56
57     public SystemUnderTestWrapper() {
58         this(DEFAULT_TIMEOUT);
59     }
60
61     public void startSecure() {
62         start(ImmutableProducerOptions.builder()
63                 .securityKeys(ImmutableSecurityKeys.builder()
64                         .keyStore(ImmutableSecurityKeysStore.of(resource("/client.p12").get()))
65                         .keyStorePassword(Passwords.fromResource("/client.pass").get())
66                         .trustStore(ImmutableSecurityKeysStore.of(resource("/trust.p12").get()))
67                         .trustStorePassword(Passwords.fromResource("/trust.pass").get())
68                         .build()));
69     }
70
71     public void start() {
72         start(createDefaultOptions());
73     }
74
75     public void start(ImmutableProducerOptions.Builder optionsBuilder) {
76         InetSocketAddress collectorAddress = collector.start();
77         WireFrameVersion WTPVersion = ImmutableWireFrameVersion.builder().build();
78         cut = HvVesProducerFactory.create(
79                 optionsBuilder.collectorAddresses(HashSet.of(collectorAddress))
80                         .wireFrameVersion(WTPVersion).build());
81     }
82
83     public void stop() {
84         collector.stop();
85     }
86
87     public ByteBuf blockingSend(Flux<VesEvent> events) {
88         events.transform(cut::send).subscribe();
89         collector.blockUntilFirstClientIsHandled(timeout);
90         return collector.dataFromFirstClient();
91     }
92
93     private Builder createDefaultOptions() {
94         return ImmutableProducerOptions.builder();
95     }
96
97     private Try<Path> resource(String resource) {
98         return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI()));
99     }
100
101 }