Multi-threaded file processing and database batch insertions


I have an Java main application which will read a file, line-by-line. Each line represents subscriber data.

name, email, mobile, ...

An subscriber object is created for each line being processed and then this object is persisted in database using JDBC.

PS: Input file has around 15 million subscriber data and application takes around 10-12 hours to process. I need to reduce this to around 2-3 hours as this task is an migration activity and down-time that we get is around 4-5 hours.

I know I need to use multiple thread / thread pool may be Java's native ExecuterService. But I am asked to do a batch update as well. Say taking a thread pool of 50 or 100 worker threads and batch update of 500-1000 subscribers.

I am familiar using ExecuterService but not getting an approach where I can have batch update logic too in it.

My overall application code looks like:

while (null != (line = getNextLine())) {
    Subscriber sub = getSub(line); // creates subscriber object by parsing the line
    persistSub(sub); // JDBC - PreparedStatement insert query executed

Need to know an approach where I can process it faster with multiple threads and using batch update or any existing frameworks or Java API's which can be used for such cases.


persistSub(sub) should not immediately access the database. Instead, it should store sub in an array of length 500-1000 and only when the array is full, or the input file terminated, wrap it in a Runnable and submit to a thread pool. The Runnable then accesses database via jdbc like it is described in JDBC Batching with PrepareStatement Object.


If writing into database is slow and input file reading is fast, many arrays with data can be created waiting to be written in the database, and system can run out of memory. So persistSub(sub) should keep track of the number of allocated arrays. The easiest way is to use a Semaphore inbitialized with allowed number of arrays. Before a new array is allocated, persistSub(sub) makes Semaphore.aquire(). Each Runnable task, before its end, makes Semaphore.release().

This video can help you solving your question :)
By: admin