3ea7185e6364841db4b7573ac175c27c14888fe6
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.util.List;
24 import java.util.Map;
25
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This implementation publishes events for the associated DMAAP topic, inline with the calling
33  * thread.
34  */
35 public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
36
37     protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
38
39     protected final String userName;
40     protected final String password;
41
42     protected String environment = null;
43     protected String aftEnvironment = null;
44     protected String partner = null;
45     protected String latitude = null;
46     protected String longitude = null;
47
48     protected Map<String, String> additionalProps = null;
49
50     /**
51      * 
52      * @param servers DMaaP servers
53      * @param topic DMaaP Topic to be monitored
54      * @param apiKey DMaaP API Key (optional)
55      * @param apiSecret DMaaP API Secret (optional)
56      * @param consumerGroup DMaaP Reader Consumer Group
57      * @param consumerInstance DMaaP Reader Instance
58      * @param fetchTimeout DMaaP fetch timeout
59      * @param fetchLimit DMaaP fetch limit
60      * @param environment DME2 Environment
61      * @param aftEnvironment DME2 AFT Environment
62      * @param partner DME2 Partner
63      * @param latitude DME2 Latitude
64      * @param longitude DME2 Longitude
65      * @param additionalProps Additional properties to pass to DME2
66      * @param useHttps does connection use HTTPS?
67      * @param allowSelfSignedCerts are self-signed certificates allow
68      * 
69      * @throws IllegalArgumentException An invalid parameter passed in
70      */
71     public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
72             String password, String partitionKey, String environment, String aftEnvironment, String partner,
73             String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps,
74             boolean allowSelfSignedCerts) {
75
76         super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
77
78         this.userName = userName;
79         this.password = password;
80
81         this.environment = environment;
82         this.aftEnvironment = aftEnvironment;
83         this.partner = partner;
84
85         this.latitude = latitude;
86         this.longitude = longitude;
87
88         this.additionalProps = additionalProps;
89     }
90
91     public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
92             String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
93
94         super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
95
96         this.userName = userName;
97         this.password = password;
98     }
99
100
101     @Override
102     public void init() {
103         if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
104             this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey,
105                     this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
106         } else {
107             this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName,
108                     this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
109                     this.additionalProps, this.useHttps);
110         }
111
112         logger.info("{}: DMAAP SINK created", this);
113     }
114
115     /**
116      * {@inheritDoc}
117      */
118     @Override
119     public CommInfrastructure getTopicCommInfrastructure() {
120         return Topic.CommInfrastructure.DMAAP;
121     }
122
123
124     @Override
125     public String toString() {
126         return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
127                 + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()
128                 + "]";
129     }
130
131 }