c630a1656fb80d13be154c3b3e28d29c055e8b70
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.Map;
25
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmePublisherWrapper;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This implementation publishes events for the associated DMAAP topic, inline with the calling
35  * thread.
36  */
37 public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
38
39     protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
40
41     protected final String userName;
42     protected final String password;
43
44     protected String environment = null;
45     protected String aftEnvironment = null;
46     protected String partner = null;
47     protected String latitude = null;
48     protected String longitude = null;
49
50     protected Map<String, String> additionalProps = null;
51
52     /**
53      * 
54      * @param servers DMaaP servers
55      * @param topic DMaaP Topic to be monitored
56      * @param apiKey DMaaP API Key (optional)
57      * @param apiSecret DMaaP API Secret (optional)
58      * @param consumerGroup DMaaP Reader Consumer Group
59      * @param consumerInstance DMaaP Reader Instance
60      * @param fetchTimeout DMaaP fetch timeout
61      * @param fetchLimit DMaaP fetch limit
62      * @param environment DME2 Environment
63      * @param aftEnvironment DME2 AFT Environment
64      * @param partner DME2 Partner
65      * @param latitude DME2 Latitude
66      * @param longitude DME2 Longitude
67      * @param additionalProps Additional properties to pass to DME2
68      * @param useHttps does connection use HTTPS?
69      * @param allowSelfSignedCerts are self-signed certificates allow
70      * 
71      * @throws IllegalArgumentException An invalid parameter passed in
72      */
73     public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
74             String password, String partitionKey, String environment, String aftEnvironment, String partner,
75             String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps,
76             boolean allowSelfSignedCerts) {
77
78         super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
79
80         this.userName = userName;
81         this.password = password;
82
83         this.environment = environment;
84         this.aftEnvironment = aftEnvironment;
85         this.partner = partner;
86
87         this.latitude = latitude;
88         this.longitude = longitude;
89
90         this.additionalProps = additionalProps;
91     }
92
93     public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
94             String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
95
96         super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
97
98         this.userName = userName;
99         this.password = password;
100     }
101
102
103     @Override
104     public void init() {
105         if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
106             this.publisher = new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
107                     this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
108         } else {
109             this.publisher = new DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, this.password,
110                     this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
111                     this.additionalProps, this.useHttps);
112         }
113
114         logger.info("{}: DMAAP SINK created", this);
115     }
116
117     /**
118      * {@inheritDoc}
119      */
120     @Override
121     public CommInfrastructure getTopicCommInfrastructure() {
122         return Topic.CommInfrastructure.DMAAP;
123     }
124
125
126     @Override
127     public String toString() {
128         return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
129                 + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()
130                 + "]";
131     }
132
133 }