1710 Rebase - Second Attempt
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / client / dmaap / DmaapConsumer.java
1 package org.openecomp.mso.client.dmaap;
2
3 import java.io.FileNotFoundException;
4 import java.io.IOException;
5
6 import com.att.nsa.mr.client.MRClientFactory;
7 import com.att.nsa.mr.client.MRConsumer;
8
9 public class DmaapConsumer {
10
11         private final MRConsumer mrConsumer;
12         public DmaapConsumer() {
13                 mrConsumer = null;
14         }
15         public DmaapConsumer (String filepath) throws FileNotFoundException, IOException {
16                 
17                 mrConsumer = MRClientFactory.createConsumer(filepath);
18         }
19         
20         
21         public MRConsumer getMRConsumer() {
22                 return mrConsumer;
23         }
24         public boolean consume(Consumer consumer) throws Exception {
25                 boolean accepted = false;
26                 while (consumer.continuePolling()) {
27                         for (String message : this.getMRConsumer().fetch()) {
28                                 if (!accepted && consumer.isAccepted(message)) {
29                                         accepted = true;
30                                 } 
31                                 if (accepted) {
32                                         consumer.processMessage(message);
33                                 }
34                         }
35                 }
36                 
37                 return true;
38         }
39         
40 }