2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.event.comm.bus.internal;
23 import java.util.List;
26 import org.slf4j.LoggerFactory;
27 import org.slf4j.Logger;
28 import org.openecomp.policy.drools.event.comm.Topic;
29 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
32 * This implementation publishes events for the associated DMAAP topic,
33 * inline with the calling thread.
35 public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
37 protected static Logger logger =
38 LoggerFactory.getLogger(InlineDmaapTopicSink.class);
40 protected final String userName;
41 protected final String password;
43 protected String environment = null;
44 protected String aftEnvironment = null;
45 protected String partner = null;
46 protected String latitude = null;
47 protected String longitude = null;
49 protected Map<String,String> additionalProps = null;
53 * @param servers DMaaP servers
54 * @param topic DMaaP Topic to be monitored
55 * @param apiKey DMaaP API Key (optional)
56 * @param apiSecret DMaaP API Secret (optional)
57 * @param consumerGroup DMaaP Reader Consumer Group
58 * @param consumerInstance DMaaP Reader Instance
59 * @param fetchTimeout DMaaP fetch timeout
60 * @param fetchLimit DMaaP fetch limit
61 * @param environment DME2 Environment
62 * @param aftEnvironment DME2 AFT Environment
63 * @param partner DME2 Partner
64 * @param latitude DME2 Latitude
65 * @param longitude DME2 Longitude
66 * @param additionalProps Additional properties to pass to DME2
67 * @param useHttps does connection use HTTPS?
68 * @param allowSelfSignedCerts are self-signed certificates allow
70 * @throws IllegalArgumentException An invalid parameter passed in
72 public InlineDmaapTopicSink(List<String> servers, String topic,
73 String apiKey, String apiSecret,
74 String userName, String password,
76 String environment, String aftEnvironment, String partner,
77 String latitude, String longitude, Map<String,String> additionalProps,
78 boolean useHttps, boolean allowSelfSignedCerts)
79 throws IllegalArgumentException {
81 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
83 this.userName = userName;
84 this.password = password;
86 this.environment = environment;
87 this.aftEnvironment = aftEnvironment;
88 this.partner = partner;
90 this.latitude = latitude;
91 this.longitude = longitude;
93 this.additionalProps = additionalProps;
96 public InlineDmaapTopicSink(List<String> servers, String topic,
97 String apiKey, String apiSecret,
98 String userName, String password,
99 String partitionKey, boolean useHttps, boolean allowSelfSignedCerts)
100 throws IllegalArgumentException {
102 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
104 this.userName = userName;
105 this.password = password;
111 if ((this.environment == null || this.environment.isEmpty()) &&
112 (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
113 (this.latitude == null || this.latitude.isEmpty()) &&
114 (this.longitude == null || this.longitude.isEmpty()) &&
115 (this.partner == null || this.partner.isEmpty())) {
117 new BusPublisher.DmaapAafPublisherWrapper(this.servers,
120 this.password, this.useHttps);
123 new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
124 this.userName, this.password,
125 this.environment, this.aftEnvironment,
126 this.partner, this.latitude, this.longitude,
127 this.additionalProps, this.useHttps);
130 logger.info("{}: DMAAP SINK created", this);
137 public CommInfrastructure getTopicCommInfrastructure() {
138 return Topic.CommInfrastructure.DMAAP;
143 public String toString() {
144 StringBuilder builder = new StringBuilder();
145 builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password)
146 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
147 .append(super.toString()).append("]");
148 return builder.toString();