26960379e326a8ff56cb39766fd506463084c697
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.Map;
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
33  * its listeners.
34  */
35 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
36
37     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
38
39
40     protected final String userName;
41     protected final String password;
42
43     protected String environment = null;
44     protected String aftEnvironment = null;
45     protected String partner = null;
46     protected String latitude = null;
47     protected String longitude = null;
48
49     protected Map<String, String> additionalProps = null;
50
51
52     /**
53      * Constructor.
54      *
55      * @param busTopicParams Parameters object containing all the required inputs
56      *
57      * @throws IllegalArgumentException An invalid parameter passed in
58      */
59     public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
60
61         super(busTopicParams);
62
63         this.userName = busTopicParams.getUserName();
64         this.password = busTopicParams.getPassword();
65
66         this.environment = busTopicParams.getEnvironment();
67         this.aftEnvironment = busTopicParams.getAftEnvironment();
68         this.partner = busTopicParams.getPartner();
69
70         this.latitude = busTopicParams.getLatitude();
71         this.longitude = busTopicParams.getLongitude();
72
73         this.additionalProps = busTopicParams.getAdditionalProps();
74         try {
75             this.init();
76         } catch (Exception e) {
77             throw new IllegalArgumentException("ERROR during init in dmaap-source: cannot create topic " + topic, e);
78         }
79     }
80
81
82     /**
83      * Initialize the Cambria or MR Client.
84      */
85     @Override
86     public void init() throws MalformedURLException {
87         BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
88             .servers(this.servers)
89             .topic(this.effectiveTopic)
90             .apiKey(this.apiKey)
91             .apiSecret(this.apiSecret)
92             .consumerGroup(this.consumerGroup)
93             .consumerInstance(this.consumerInstance)
94             .fetchTimeout(this.fetchTimeout)
95             .fetchLimit(this.fetchLimit)
96             .useHttps(this.useHttps)
97             .allowTracing(this.allowTracing);
98
99         if (anyNullOrEmpty(this.userName, this.password)) {
100             this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
101                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
102                     .build());
103         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
104             this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
105                     .userName(this.userName)
106                     .password(this.password)
107                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
108                     .build());
109         } else {
110             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
111                     .userName(this.userName)
112                     .password(this.password)
113                     .environment(this.environment)
114                     .aftEnvironment(this.aftEnvironment)
115                     .partner(this.partner)
116                     .latitude(this.latitude)
117                     .longitude(this.longitude)
118                     .additionalProps(this.additionalProps)
119                     .build());
120         }
121
122         logger.info("{}: INITTED", this);
123     }
124
125     @Override
126     public CommInfrastructure getTopicCommInfrastructure() {
127         return Topic.CommInfrastructure.DMAAP;
128     }
129
130     @Override
131     public String toString() {
132         return "SingleThreadedDmaapTopicSource [userName=" + userName
133             + ", password=" + (password == null || password.isEmpty() ? "-" : password.length())
134             + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
135             + ", toString()=" + super.toString() + "]";
136     }
137
138
139 }