DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / memory / MemoryConsumerFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.memory;
23
24 import org.onap.dmaap.dmf.mr.CambriaApiException;
25 import org.onap.dmaap.dmf.mr.backends.Consumer;
26 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
27
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.HashMap;
31
32 /**
33  * 
34  * @author anowarul.islam
35  *
36  */
37 public class MemoryConsumerFactory implements ConsumerFactory
38 {
39
40         private final MemoryQueue fQueue;
41         
42         /**
43          * 
44          * Initializing constructor
45          * @param q
46          */
47         public MemoryConsumerFactory ( MemoryQueue q )
48         {
49                 fQueue = q;
50         }
51
52         /**
53          * 
54          * @param topic
55          * @param consumerGroupId
56          * @param clientId
57          * @param timeoutMs
58          * @return Consumer
59          */
60         @Override
61         public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost )
62         {
63                 return new MemoryConsumer ( topic, consumerGroupId );
64         }
65
66         /**
67          * 
68          * Define nested inner class
69          *
70          */
71         private class MemoryConsumer implements Consumer
72         {
73
74                 private final String fTopic;
75                 private final String fConsumer;
76                 private final long fCreateMs;
77                 private long fLastAccessMs;
78                 
79                 /**
80                  * 
81                  * Initializing MemoryConsumer constructor 
82                  * @param topic
83                  * @param consumer
84                  * 
85                  */
86                 public MemoryConsumer ( String topic, String consumer )
87                 {
88                         fTopic = topic;
89                         fConsumer = consumer;
90                         fCreateMs = System.currentTimeMillis ();
91                         fLastAccessMs = fCreateMs;
92                 }
93
94                 @Override
95                 /**
96                  * 
97                  * return consumer details  
98                  */
99                 public Message nextMessage ()
100                 {
101                         return fQueue.get ( fTopic, fConsumer );
102                 }
103
104                 @Override
105                 public boolean close() {
106                         //Nothing to close/clean up.
107                         return true;
108                 }
109                 /**
110                  * 
111                  */
112                 public void commitOffsets()
113                 {
114                         // ignoring this aspect
115                 }
116                 /**
117                  * get offset
118                  */
119                 public long getOffset()
120                 {
121                         return 0;
122                 }
123
124                 @Override
125                 /**
126                  * get consumer topic name
127                  */
128                 public String getName ()
129                 {
130                         return fTopic + "/" + fConsumer;
131                 }
132
133                 @Override
134                 public long getCreateTimeMs ()
135                 {
136                         return fCreateMs;
137                 }
138
139                 @Override
140                 public long getLastAccessMs ()
141                 {
142                         return fLastAccessMs;
143                 }
144
145                 
146
147                 @Override
148                 public void setOffset(long offset) {
149                         // TODO Auto-generated method stub
150                         
151                 }
152
153                 
154         }
155
156         @Override
157         public void destroyConsumer(String topic, String consumerGroupId,
158                         String clientId) {
159                 //No cache for memory consumers, so NOOP
160         }
161
162         @Override
163         public void dropCache ()
164         {
165                 // nothing to do - there's no cache here
166         }
167
168         @Override
169         /**
170          * @return ArrayList<MemoryConsumer>
171          */
172         public Collection<? extends Consumer> getConsumers ()
173         {
174                 return new ArrayList<> ();
175         }
176
177         @Override
178         public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
179                         String remotehost) throws UnavailableException, CambriaApiException {
180                 // TODO Auto-generated method stub
181                 return null;
182         }
183
184         
185 }