* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
/**
* class performing all topic operations
- *
+ *
* @author anowarul.islam
*
*/
fkafkaBrokers = "localhost:9092";
}
-
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
-
- fKafkaAdminClient=AdminClient.create ( props );
-
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ fKafkaAdminClient=AdminClient.create ( props );
+
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
private final AdminClient fKafkaAdminClient;
-
-
+
+
/**
* DMaaPKafkaMetaBroker constructor initializing
- *
+ *
* @param settings
* @param zk
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fkafkaBrokers = "localhost:9092";
}
-
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
- fKafkaAdminClient=AdminClient.create ( props );
-
-
-
+
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
+ fKafkaAdminClient=AdminClient.create ( props );
+
+
+
}
-
+
public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ ZkClient zk, ConfigDb configDb,AdminClient client) {
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
- fKafkaAdminClient= client;
-
-
-
+ fKafkaAdminClient= client;
+
+
+
}
@Override
/**
* static method get KafkaTopic object
- *
+ *
* @param db
* @param base
* @param topic
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
log.info("Creating topic: " + topic);
try {
log.info("Check if topic [" + topic + "] exist.");
// create via kafka
- try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
- final KafkaFuture<Void> ctrResult = ctr.all();
- ctrResult.get();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
- } catch (InterruptedException e) {
- log.warn("Execution of describeTopics timed out.");
- throw new ConfigDbException(e);
- } catch (ExecutionException e) {
- log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
- throw new ConfigDbException(e.getCause());
- }
-
+ try {
+ final NewTopic topicRequest =
+ new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr =
+ fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final KafkaFuture<Void> ctrResult = ctr.all();
+ ctrResult.get();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (InterruptedException e) {
+ log.warn("Execution of describeTopics timed out.");
+ throw new ConfigDbException(e);
+ } catch (ExecutionException e) {
+ log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+ throw new ConfigDbException(e.getCause());
+ }
+
}
@Override
log.info("Deleting topic: " + topic);
try {
log.info("Loading zookeeper client for topic deletion.");
- // topic creation. (Otherwise, the topic is only partially created
+ // topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
+
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
- *
+ *
* @param name
* @param desc
* @param owner
/**
* static method giving kafka topic object
- *
+ *
* @param db
* @param basePath
* @param name
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
/**
* class performing all user opearation like user is eligible to read,
* write. permitting a user to write and read,
- *
+ *
* @author anowarul.islam
*
*/
public static class KafkaTopic implements Topic {
/**
* constructor initializes
- *
+ *
* @param name
* @param configdb
* @param baseTopic
fOwner = o.optString("owner", "");
fDesc = o.optString("description", "");
fTransactionEnabled = o.optBoolean("txenabled", false);// default
- // value is
- // false
+ // value is
+ // false
// if this topic has an owner, it needs both read/write ACLs. If there's no
- // owner (or it's empty), null is okay -- this is for existing or implicitly
- // created topics.
- JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 )
- {
- readers = kEmptyAcl;
- }
- fReaders = fromJson ( readers );
-
- JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 )
- {
- writers = kEmptyAcl;
- }
- fWriters = fromJson ( writers );
+ // owner (or it's empty), null is okay -- this is for existing or implicitly
+ // created topics.
+ JSONObject readers = o.optJSONObject ( "readers" );
+ if ( readers == null && fOwner.length () > 0 )
+ {
+ readers = kEmptyAcl;
+ }
+ fReaders = fromJson ( readers );
+
+ JSONObject writers = o.optJSONObject ( "writers" );
+ if ( writers == null && fOwner.length () > 0 )
+ {
+ writers = kEmptyAcl;
+ }
+ fWriters = fromJson ( writers );
}
-
+
private NsaAcl fromJson(JSONObject o) {
NsaAcl acl = new NsaAcl();
if (o != null) {
try
{
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
+
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-
+
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
+
}
catch ( ConfigDbException | AccessDeniedException x )
{
throw x;
}
-
+
}
private JSONObject safeSerialize(NsaAcl acl) {
private final NsaAcl fReaders;
private final NsaAcl fWriters;
private boolean fTransactionEnabled;
-
+
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}