2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
21 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
22 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
24 import java.util.concurrent.*;
27 public class TaskThreadService extends Thread {
28 private final ExecutorService pool;
29 private static final Log log = LogFactory.getLog(TaskThreadService.class);
30 private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<>();
31 private boolean startFlag = true;
32 private long timeStamp = System.currentTimeMillis();
34 private TaskThreadService(int poolSize) {
35 pool = Executors.newFixedThreadPool(poolSize);
38 public static TaskThreadService getInstance(int poolSize) {
39 return new TaskThreadService(poolSize);
43 public void run() { // run the service
47 if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) {
48 timeStamp = System.currentTimeMillis();
49 log.debug("task queue size " + queue.size());
51 CollectMsg data = receive();
56 pool.execute(new TaskThread(data));
59 } catch (Exception e) {
60 log.error("collect task process fail!" + StringUtil.getStackTrace(e));
65 } catch (Exception ex) {
66 log.error("task ThreadService error " + StringUtil.getStackTrace(ex));
69 log.error("Task ThreadService exit");
73 public CollectMsg receive() {
75 return queue.poll(100, TimeUnit.MILLISECONDS);
76 } catch (Exception e) {
77 log.error("queue.poll is error" + StringUtil.getStackTrace(e));
83 public void add(CollectMsg data) {
86 } catch (Exception e) {
87 log.error("queue.put is error" + StringUtil.getStackTrace(e));
95 public void stopTask() {