566da0f381fea89250bb14e4ba0e132483a43406
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThreadService.java
1 /*
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
21 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
22 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
23
24 import java.util.concurrent.*;
25
26
27 public class TaskThreadService extends Thread {
28     private final ExecutorService pool;
29     private static final Log log = LogFactory.getLog(TaskThreadService.class);
30     private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<>();
31     private boolean startFlag = true;
32     private long timeStamp = System.currentTimeMillis();
33
34     private TaskThreadService(int poolSize) {
35         pool = Executors.newFixedThreadPool(poolSize);
36     }
37
38     public static TaskThreadService getInstance(int poolSize) {
39         return new TaskThreadService(poolSize);
40     }
41     
42     @Override
43     public void run() { // run the service
44         try {
45             while (startFlag) {
46                 try {
47                     if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) {
48                         timeStamp = System.currentTimeMillis();
49                         log.debug("task queue size " + queue.size());
50                     }
51                     CollectMsg data = receive();
52                     if (data == null) {
53                         continue;
54                     }
55
56                     pool.execute(new TaskThread(data));
57
58
59                 } catch (Exception e) {
60                     log.error("collect task process fail!" + StringUtil.getStackTrace(e));
61                 }
62
63             }
64
65         } catch (Exception ex) {
66             log.error("task ThreadService error " + StringUtil.getStackTrace(ex));
67             pool.shutdown();
68         }
69         log.error("Task ThreadService exit");
70     }
71
72
73     public CollectMsg receive() {
74         try {
75             return queue.poll(100, TimeUnit.MILLISECONDS);
76         } catch (Exception e) {
77             log.error("queue.poll is error" + StringUtil.getStackTrace(e));
78         }
79         return null;
80     }
81
82
83     public void add(CollectMsg data) {
84         try {
85             queue.put(data);
86         } catch (Exception e) {
87             log.error("queue.put is error" + StringUtil.getStackTrace(e));
88         }
89     }
90
91     public int size() {
92         return queue.size();
93     }
94
95     public void stopTask() {
96         startFlag = false;
97     }
98 }