23d3edca9fbb2fa221d9fcc2cfa1b3721e2a49e0
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import org.onap.policy.common.endpoints.event.comm.Topic;
25 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
26
27 /**
28  * This topic source implementation specializes in reading messages over an UEB Bus topic source and
29  * notifying its listeners.
30  */
31 public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
32
33     /**
34      * Constructor.
35      * 
36      * @param busTopicParams Parameters object containing all the required inputs
37      * @throws IllegalArgumentException An invalid parameter passed in
38      */
39
40
41     public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) {
42
43         super(busTopicParams);
44
45         this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
46
47         this.init();
48     }
49
50     /**
51      * Initialize the Cambria client.
52      */
53     @Override
54     public void init() {
55         this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
56                 .servers(this.servers)
57                 .topic(this.topic)
58                 .apiKey(this.apiKey)
59                 .apiSecret(this.apiSecret)
60                 .consumerGroup(this.consumerGroup)
61                 .consumerInstance(this.consumerInstance)
62                 .fetchTimeout(this.fetchTimeout)
63                 .fetchLimit(this.fetchLimit)
64                 .useHttps(this.useHttps)
65                 .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
66     }
67
68     /**
69      * {@inheritDoc}
70      */
71     @Override
72     public CommInfrastructure getTopicCommInfrastructure() {
73         return Topic.CommInfrastructure.UEB;
74     }
75
76
77     @Override
78     public String toString() {
79         StringBuilder builder = new StringBuilder();
80         builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=")
81                 .append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]");
82         return builder.toString();
83     }
84
85 }