Support kafka attributes
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / resources / TopicResource.java
index cab48ca..dbeea63 100644 (file)
@@ -35,6 +35,7 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.GenericEntity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -57,11 +58,16 @@ import org.onap.dmaap.dbcapi.util.DmaapConfig;
 @Authorization
 public class TopicResource extends BaseLoggingClass {
        private static FqtnType defaultTopicStyle;
+       private static String defaultPartitionCount;
+       private static String defaultReplicationCount;
        TopicService mr_topicService = new TopicService();
        
        public TopicResource() {
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-               defaultTopicStyle = FqtnType.Validator( p.getProperty("MR.topicStyle", "FQTN_LEGACY_FORMAT"));
+               defaultTopicStyle = FqtnType.Validator( p.getProperty("MR.topicStyle", "FQTN_LEGACY_FORMAT"));
+               defaultPartitionCount = p.getProperty( "MR.partitionCount", "2");
+               defaultReplicationCount = p.getProperty( "MR.replicationCount", "1");
+               
                logger.info( "Setting defaultTopicStyle=" + defaultTopicStyle );
        }
                
@@ -94,9 +100,10 @@ public class TopicResource extends BaseLoggingClass {
            @ApiResponse( code = 400, message = "Error", response = ApiError.class )
        })
        public Response  addTopic( 
-                       Topic topic
+                       Topic topic,
+                       @QueryParam("useExisting") String useExisting
                        ) {
-               logger.info( "addTopic request: " + topic );
+               logger.info( "addTopic request: " + topic  + " useExisting=" + useExisting );
                ApiService check = new ApiService();
 
                try {
@@ -117,9 +124,21 @@ public class TopicResource extends BaseLoggingClass {
                        logger.info( "setting defaultTopicStyle=" + defaultTopicStyle + " for topic " + topic.getTopicName() );
                        topic.setFqtnStyle( defaultTopicStyle );
                }
+               String pc = topic.getPartitionCount();
+               if ( pc == null ) {
+                       topic.setPartitionCount(defaultPartitionCount);
+               }
+               String rc = topic.getReplicationCount();
+               if ( rc == null ) {
+                       topic.setReplicationCount(defaultReplicationCount);
+               }
                topic.setLastMod();
+               Boolean flag = false;
+               if (useExisting != null) {
+                       flag = "true".compareToIgnoreCase( useExisting ) == 0;
+               }
                
-               Topic mrc =  mr_topicService.addTopic(topic, check.getErr());
+               Topic mrc =  mr_topicService.addTopic(topic, check.getErr(), flag);
                if ( mrc != null && check.getErr().is2xx() ) {
                        return check.success(Status.CREATED.getStatusCode(), mrc);
                }