re base code
[sdc.git] / utils / DmaapPublisher / src / main / java / org / openecomp / sdc / dmaap / DmaapPublisher.java
1 package org.openecomp.sdc.dmaap;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.apache.commons.lang3.math.NumberUtils;
5 import org.kohsuke.args4j.CmdLineException;
6 import org.kohsuke.args4j.CmdLineParser;
7 import org.kohsuke.args4j.OptionHandlerFilter;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.io.IOException;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.List;
15 import java.util.concurrent.ConcurrentLinkedDeque;
16 import java.util.concurrent.CopyOnWriteArrayList;
17 import java.util.function.Consumer;
18 import java.util.stream.IntStream;
19
20 import static org.openecomp.sdc.dmaap.Util.*;
21
22 public class DmaapPublisher {
23     private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class);    
24     private static RequestManager requestManager ;
25     private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque();
26
27
28     private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>();
29     private DmaapPublisher() {}
30
31     public static void add(String notification){
32         notificationBuffer.add( notification );
33     }
34     public static void addAll(List<String> notifications){
35         notificationBuffer.addAll( notifications );
36     }
37     public static void main(String[] args) {
38         doPublish(args);
39     }
40
41     private static void doPublish( String[] args ) {
42         CliArgs cliArgs = new CliArgs();
43         CmdLineParser parser = new CmdLineParser(cliArgs);
44
45         try {
46             // parse the arguments.
47             parser.parseArgument( args );
48             doPublish( cliArgs );
49         }
50         catch(CmdLineException e) {
51             logger.error("#doPublish - failed to parse arguments.", e);
52             printUsage(parser, e);
53             return;
54         }
55     }
56
57     public static void doPublish( CliArgs cliArgs ){
58         try {
59             // parse the arguments.
60             DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData()  );
61             Collection<String> notifications = new ArrayList<String>( notificationBuffer );
62             tool.addNotifications( notifications );
63             notificationBuffer.removeAll(notifications);
64             Integer concurrentRequestCount = 1;
65             if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) )
66                 concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() );
67             requestManager = new RequestManager( concurrentRequestCount );
68
69             IntStream.range(0,concurrentRequestCount).forEach( it -> {
70                                         //region -  report upon finish mechanishem
71                                         long ticket = System.nanoTime();
72                                         registeredTasks.add( ticket );
73                                         Consumer callback = ( uniqueTicket ) -> {
74                                             synchronized ( registeredTasks ){
75                                                 registeredTasks.remove( (long)uniqueTicket );
76                                                 registeredTasks.notifyAll();
77                                             }};
78
79                                         RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback );
80                                         requestManager.getExecutor().execute( task ) ;
81             });
82         }
83         catch(NumberFormatException e) {
84             logger.error("#doPublish - failed to parse argument CR.", e);
85             return;
86         }
87         catch(Exception e) {
88             logger.error("#doPublish - failed to publish.", e);
89         }
90     }
91
92     public static class RunnableReporter implements Runnable{
93
94             final private long ticket ;
95             final private DmaapPublishTool tool;
96             final private CliArgs cliArgs;
97             final Consumer reporter;
98
99             public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args ,  Consumer reporter){
100                 this.ticket = ticket ;
101                 this.tool = tool ;
102                 this.cliArgs = args ;
103                 this.reporter = reporter;
104             }
105             @Override
106             public void run() {
107                 try {
108                     tool.publish( cliArgs.getYamlPath() );
109                     reporter.accept(ticket);
110                 }catch(IOException e){
111                     logger.error("#doPublish - failed to publish.", e);
112                 }catch(InterruptedException e){
113                     logger.error("#doPublish - cannot complete publish, thread interuppted.", e);
114                     Thread.currentThread().interrupt();
115                 }
116             }
117     }
118
119
120     public static List<Long> getRegisteredTasks() {
121         return registeredTasks;
122     }
123
124     public static void preparePublish( String path,  String filename , String concurrentRequests ){
125
126             CliArgs cliArgs = new CliArgs();
127             if ( StringUtils.isNotBlank( filename ) )
128                 cliArgs.setYamlFilename( filename );
129             if ( StringUtils.isNotBlank( path ) )
130                 cliArgs.setYamlPath( path );
131             if ( NumberUtils.isCreatable( concurrentRequests ) )
132                 cliArgs.setConcurrentRequests(  concurrentRequests );
133
134             doPublish( cliArgs );
135
136     }
137
138
139     private static void printUsage(CmdLineParser parser, CmdLineException e) {
140         System.err.println( e.getMessage() );
141         System.err.println("java DmaapPublisher [options...] arguments...");
142         // print the list of available options
143         parser.printUsage(System.err);
144         System.err.println();
145         // print option sample. This is useful some time
146         System.err.println("  Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL));
147         
148     }
149 }