Create a thread pool in java without using executor framework.
We will use blocking queue to implements a thread pool.
1. What is thread pool in java?
Thread pool is collection of threads, which are created to perform certain tasks.
Thread creation is costly IO operation.
It’s not advisable to create & destroy thread(s) every now and then.
It’s recommended to use pool of threads as per application’s need.
We will implementcustom thread pool using following classes.
BlockingQueue: BlockingQueue class will be used to store tasks.
TaskExecutor: TaskExecutor class is capable of executing the task.
ThreadPool: ThreadPool class is responsible for en-queuing task to blocking queue,
TestTask: The task or operation, which we want to execute.
TestThreadPool: TestThreadPool class creates the tasks and submit tasks to thread pool.
2. Execution flow of thread pool in java
Task Producer will generate the task.
Task submitted to Blocking queue (our custom implementation)
Available threads (Task Executor) in the thread pool get the tasks from blocking queue
Thread(s) executes & finishes the task
Thread become available to pick another task from queue
3. Create custom thread pool in java (without executor framework/example)
We have used the custom blocking queue implementation to demonstrate the thread pool in java.
The ThreadPool encapsulates the custom BlockingQueue class and TaskExecutor class.
3.1. ThreadPool class
Threadpool class creates numbers of TaskExecutor instances.
TaskExecutor class will be responsible for executing the tasks
ThreadPool class exposes one method submitTask.
submitTask method will be called by task generating program, to submit a task to threadPool.
package org.learn.Pool;
public class ThreadPool {
BlockingQueue <Runnable> queue;
public ThreadPool(int queueSize, int nThread) {
queue = new BlockingQueue<>(queueSize);
String threadName = null;
TaskExecutor task = null;
for (int count = 0; count < nThread; count++) {
threadName = "Thread-"+count;
task = new TaskExecutor(queue);
Thread thread = new Thread(task, threadName);
thread.start();
}
}
public void submitTask(Runnable task) throws InterruptedException {
queue.enqueue(task);
}
}
3.2. TaskExecutor class:
TaskExecutor class implements Runnable interface.
The method of TaskExecutor class dequeue the task from the queue (BlockingQueue)
TaskExecutor class executes the task.
package org.learn.Pool;
public class TaskExecutor implements Runnable {
BlockingQueue<Runnable> queue;
public TaskExecutor(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String name = Thread.currentThread().getName();
Runnable task = queue.dequeue();
System.out.println("Task Started by Thread :" + name);
task.run();
System.out.println("Task Finished by Thread :" + name);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.3. BlockingQueue class to implement thread pool
BlockingQueue simulates the blocking queue implementation.
We have used LinkedList as underlying data structure.
BlockingQueue contains couple of synchronized methods
enqueue : It enqueue (push) Task to the queue
dequeue : This method takes (pop) the task from the queue.
package org.learn.Pool;
import java.util.LinkedList;
import java.util.Queue;
public class BlockingQueue<Type> {
private Queue<Type> queue = new LinkedList<Type>();
private int EMPTY = 0;
private int MAX_TASK_IN_QUEUE = -1;
public BlockingQueue(int size){
this.MAX_TASK_IN_QUEUE = size;
}
public synchronized void enqueue(Type task)
throws InterruptedException {
while(this.queue.size() == this.MAX_TASK_IN_QUEUE) {
wait();
}
if(this.queue.size() == EMPTY) {
notifyAll();
}
this.queue.offer(task);
}
public synchronized Type dequeue()
throws InterruptedException{
while(this.queue.size() == EMPTY){
wait();
}
if(this.queue.size() == this.MAX_TASK_IN_QUEUE){
notifyAll();
}
return this.queue.poll();
}
}
3.4. TestTask Class (To test thread pool)
TestTask simulates the task to be submitted to thread pool.
package org.learn.App;
public class TestTask implements Runnable {
private int number;
public TestTask(int number) {
this.number = number;
}
@Override
public void run() {
System.out.println("Start executing of task number :"+ number);
try {
//Simulating processing time
//perform tasks
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End executing of task number :"+ number);
}
}
3.5. TestThreadPool class to validate thread pool
TestThreadPool class containing main function to test the thread pool.
package org.learn.App;
import org.learn.Pool.ThreadPool;
public class TestThreadPool {
public static void main(String[] args) throws InterruptedException {
//create queue size - 3
//Number of threads - 4
ThreadPool threadPool = new ThreadPool(3,4);
//Created 15 Tasks and submit to pool
for(int taskNumber = 1 ; taskNumber <= 7; taskNumber++) {
TestTask task = new TestTask(taskNumber);
threadPool.submitTask(task);
}
}
}
5. Output – thread pool example without executor framework in java
Task Started by Thread :Thread-2
Start executing of task number :2
Task Started by Thread :Thread-0
Start executing of task number :4
Task Started by Thread :Thread-1
Start executing of task number :3
Task Started by Thread :Thread-3
Start executing of task number :1
End executing of task number :2
End executing of task number :3
End executing of task number :4
Task Finished by Thread :Thread-0
Task Started by Thread :Thread-0
Task Finished by Thread :Thread-1
Task Finished by Thread :Thread-2
Task Started by Thread :Thread-1
Start executing of task number :6
End executing of task number :1
Start executing of task number :5
Task Finished by Thread :Thread-3
Task Started by Thread :Thread-2
Start executing of task number :7
End executing of task number :6
Task Finished by Thread :Thread-1
End executing of task number :7
Task Finished by Thread :Thread-2
End executing of task number :5
Task Finished by Thread :Thread-0