Using ExecutorService with Java8

Executor - Star Wars
Executor - Star Wars
In this post we will move all files from one directory to another using the Executor service in Java 8.

Objective

  • Move files from one directory to another.
  • Use Executor Service with Java 8.

Logic

To move files from one directory to another:
  • List all files in the source directory
  • Create a main class which will call the thread (ProcessFiles.java)
  • The ProcessFiles.java is a Runnable thread, which is having the logic to move files from one directory to another.

Limitation

  • While using the above logic, there are chances that one thread is accessing one file while other thread has already moved it. So, we will get NoSuchFileException. 
  • While moving the files, there could be a case where one thread accessed one file from the directory, but another thread moved the same file in the target directory. In these cases, we can encounter NullPointerException. 

Resolutions

  • For NoSuchFileException, we can check if the same file exists in the target directory, and if not then throw the error.
  • For NullPointerException, we will keep a Map (in this case a ConcurrentHashMap) to keep the records of the files existing in the directory. And as the threads move the files to target directory, we will remove the entry from the Map.

Main class

Lets look into the main() of the MainClass.java
private static final Integer THREADS = 10;
public static ConcurrentHashMap<String, Integer> filesMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
    File workDir = new File("src\\main\\resources\\source");
    if(workDir.exists() && workDir.isDirectory()) {
try {
Stream<Path> paths = Files.walk(Paths.get("src\\main\\resources\\source"), 1);
paths.forEach(path -> {
if(!path.toFile().getName().equalsIgnoreCase("source"))
filesMap.put(path.toFile().getName(), 0);
});
} catch (IOException e) {
e.printStackTrace();
}
File collectDir = new File("src\\main\\resources\\target");
if (collectDir.exists() && collectDir.isDirectory()) {
MainClass.moveUsingMultiThread();
}
}
} 
Here, the main method:
  • Checks if the source and target directory exists
  • Get the stream of Path using Files.walk() on the source directory.
  • For each file in the Stream of paths, it puts an entry into the ConcurrentHashMap declared globally.
  • Calls the method which will in-turn call the thread using Executor Service.
public static void moveUsingMultiThread(){
    System.out.println("Running MultiThreaded Application to Move files from source to target");
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);

for(int i=0; i<5; i++) {
Runnable worker = new ProcessFiles();
executorService.execute(worker);
}
executorService.shutdown();

while (!executorService.isTerminated()) {
// Wait until all threads are finished
}
System.out.println("Finished all threads");
}
Here, the moveUsingMultiThread() method:
  • Set a fixed thread pool using newFixedThreadPool() method of the Executors class.
  • It creates five ProcessFiles threads and executes it using ExecutorService object.
  • It calls shutdown() of ExecutorService once all the thread execution is done. It will wait using the while loop, until all the threads are completed and then print the message "Finished all threads".

ProcessFiles Thread

This is a Runnable thread which 
  • Iterate over the ConcurrentHashMap, 
  • Move the files from source to target using Files.move(), and 
  • Remove the entry from ConcurrentHashMap.
    if(null!= MainClass.filesMap && MainClass.filesMap.size()>0){
        for (Map.Entry<String, Integer> entry : MainClass.filesMap.entrySet()) {
synchronized (this) {
System.out.println(Thread.currentThread().getName()+"---"+entry.getKey() +
"---" + MainClass.filesMap.get(entry.getKey()));
try {
//Multiple checks for multi threads
if (null != MainClass.filesMap &&
null!=MainClass.filesMap.get(entry.getKey()) &&
MainClass.filesMap.get(entry.getKey()) == 0) {
Files.move(Paths.get("src\\main\\resources\\source" +
File.separator + entry.getKey()),
Paths.get("src\\main\\resources\\target" +
File.separator + entry.getKey()));
MainClass.filesMap.remove(entry.getKey());
}
The NoSuchFileExeception mentioned above is handled in the catch block below:

} catch (NoSuchFileException nsfe) {
    if (!new File("src\\main\\resources\\target" +
File.separator + entry.getKey()).exists()) {
nsfe.printStackTrace();
}
} catch (NullPointerException npe){
//There might be a case when some threads will try to access get on fileMap,
// which will be empty at that time. Hence, on NPE if the map is not empty, then
// throw error, else skip
if(null!=MainClass.filesMap
&& MainClass.filesMap.size()>0){
System.out.println("FileName:"+entry.getKey()+" Value:"+entry.getValue());
npe.printStackTrace();
}
}

This will move all the files from source to target directory using 5 ProcessFiles threads and a thread pool size of 10 threads.

While trying this out, we came to a interesting aspect, where the entire list of files can be divided into multiple list (equal to the number of threads) and then passing the sub-list to each thread. By using this logic, you can avoid all the collision because each thread will work on different set of files. This is implemented in this post Using List Partition.

You can find the whole code base here. ExecutorServiceExample

Let me know if you have any suggestion.


Comments