3cb167f38e9da296a57cd70c44e690e76c579cff
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distribution.reception.handling.file;
23
24 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.nio.file.FileSystems;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.WatchEvent;
32 import java.nio.file.WatchKey;
33 import java.nio.file.WatchService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.zip.ZipFile;
36
37 import org.onap.policy.common.logging.flexlogger.FlexLogger;
38 import org.onap.policy.common.logging.flexlogger.Logger;
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.onap.policy.distribution.model.Csar;
41 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
42 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
43 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
44
45 /**
46  * Handles reception of inputs from File System which can be used to decode policies.
47  */
48 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
49
50     private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class);
51     private boolean running = false;
52
53     /**
54      * {@inheritDoc}.
55      */
56     @Override
57     protected void initializeReception(final String parameterGroupName) {
58         LOGGER.debug("FileSystemReceptionHandler init...");
59         try {
60             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
61                     ParameterService.get(parameterGroupName);
62             final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
63             final Thread fileWatcherThread = new Thread(fileClientHandler);
64             fileWatcherThread.start();
65         } catch (final Exception ex) {
66             LOGGER.error(ex);
67         }
68     }
69
70     /**
71      * {@inheritDoc}.
72      */
73     @Override
74     public void destroy() {
75         running = false;
76     }
77
78     /**
79      * Method to check the running status of file watcher thread.
80      *
81      * @return the running status
82      */
83     public boolean isRunning() {
84         return running;
85     }
86
87     /**
88      * Initialize the file watcher thread.
89      *
90      * @param watchPath Path to watch
91      */
92     public void initFileWatcher(final String watchPath) throws IOException {
93         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
94             final Path dir = Paths.get(watchPath);
95             dir.register(watcher, ENTRY_CREATE);
96             LOGGER.debug("Watch Service registered for dir: " + dir.getFileName());
97             startWatchService(watcher, dir);
98         } catch (final InterruptedException ex) {
99             LOGGER.debug(ex);
100             Thread.currentThread().interrupt();
101         }
102     }
103
104     /**
105      * Method to keep watching the given path for any new file created.
106      *
107      * @param watcher the watcher
108      * @param dir the watch directory
109      * @throws InterruptedException if it occurs
110      */
111     @SuppressWarnings("unchecked")
112     protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
113         WatchKey key;
114         running = true;
115         while (running) {
116             key = watcher.take();
117
118             for (final WatchEvent<?> event : key.pollEvents()) {
119                 final WatchEvent<Path> ev = (WatchEvent<Path>) event;
120                 final Path fileName = ev.context();
121                 LOGGER.debug("new CSAR found: " + fileName);
122                 DistributionStatisticsManager.updateTotalDistributionCount();
123                 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
124                 waitForFileToBeReady(fullFilePath);
125                 createPolicyInputAndCallHandler(fullFilePath);
126                 LOGGER.debug("CSAR complete: " + fileName);
127             }
128             final boolean valid = key.reset();
129             if (!valid) {
130                 LOGGER.error("Watch key no longer valid!");
131                 break;
132             }
133         }
134     }
135
136     /**
137      * Method to create policy input & call policy handlers.
138      *
139      * @param fileName the filename
140      */
141     protected void createPolicyInputAndCallHandler(final String fileName) {
142         try {
143             final Csar csarObject = new Csar(fileName);
144             DistributionStatisticsManager.updateTotalDownloadCount();
145             inputReceived(csarObject);
146             DistributionStatisticsManager.updateDownloadSuccessCount();
147             DistributionStatisticsManager.updateDistributionSuccessCount();
148         } catch (final PolicyDecodingException ex) {
149             DistributionStatisticsManager.updateDownloadFailureCount();
150             DistributionStatisticsManager.updateDistributionFailureCount();
151             LOGGER.error(ex);
152         }
153     }
154
155     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
156         boolean flag = true;
157         while (flag) {
158             TimeUnit.MILLISECONDS.sleep(100);
159             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
160                 flag = false;
161             } catch (final IOException exp) {
162                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
163             }
164         }
165     }
166 }