2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2018 Intel Corp. All rights reserved.
4 * Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.distribution.reception.handling.file;
24 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
27 import java.io.IOException;
28 import java.nio.file.FileSystems;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.WatchEvent;
32 import java.nio.file.WatchKey;
33 import java.nio.file.WatchService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.zip.ZipFile;
37 import org.onap.policy.common.parameters.ParameterService;
38 import org.onap.policy.distribution.model.Csar;
39 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
40 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
41 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Handles reception of inputs from File System which can be used to decode policies.
48 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
50 private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
51 private boolean running = false;
57 protected void initializeReception(final String parameterGroupName) {
58 LOGGER.debug("FileSystemReceptionHandler init...");
60 final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
61 ParameterService.get(parameterGroupName);
62 final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
63 final Thread fileWatcherThread = new Thread(fileClientHandler);
64 fileWatcherThread.start();
65 } catch (final Exception ex) {
66 LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
74 public void destroy() {
79 * Method to check the running status of file watcher thread.
81 * @return the running status
83 public boolean isRunning() {
88 * Initialize the file watcher thread.
90 * @param watchPath Path to watch
92 public void initFileWatcher(final String watchPath) throws IOException {
93 try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
94 final Path dir = Paths.get(watchPath);
95 dir.register(watcher, ENTRY_CREATE);
96 LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
97 startWatchService(watcher, dir);
98 } catch (final InterruptedException ex) {
99 LOGGER.error("FileWatcher initialization failed", ex);
100 Thread.currentThread().interrupt();
105 * Method to keep watching the given path for any new file created.
107 * @param watcher the watcher
108 * @param dir the watch directory
109 * @throws InterruptedException if it occurs
111 @SuppressWarnings("unchecked")
112 protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
116 key = watcher.take();
118 for (final WatchEvent<?> event : key.pollEvents()) {
119 final WatchEvent<Path> ev = (WatchEvent<Path>) event;
120 final Path fileName = ev.context();
121 LOGGER.debug("new CSAR found: {}", fileName);
122 DistributionStatisticsManager.updateTotalDistributionCount();
123 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
124 waitForFileToBeReady(fullFilePath);
125 createPolicyInputAndCallHandler(fullFilePath);
126 LOGGER.debug("CSAR complete: {}", fileName);
128 final boolean valid = key.reset();
130 LOGGER.error("Watch key no longer valid!");
137 * Method to create policy input & call policy handlers.
139 * @param fileName the filename
141 protected void createPolicyInputAndCallHandler(final String fileName) {
143 final Csar csarObject = new Csar(fileName);
144 DistributionStatisticsManager.updateTotalDownloadCount();
145 inputReceived(csarObject);
146 DistributionStatisticsManager.updateDownloadSuccessCount();
147 DistributionStatisticsManager.updateDistributionSuccessCount();
148 } catch (final PolicyDecodingException ex) {
149 DistributionStatisticsManager.updateDownloadFailureCount();
150 DistributionStatisticsManager.updateDistributionFailureCount();
151 LOGGER.error("Policy creation failed", ex);
155 private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
158 TimeUnit.MILLISECONDS.sleep(100);
159 try (ZipFile zipFile = new ZipFile(fullFilePath)) {
161 } catch (final IOException exp) {
162 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);