com.almworks.sqlite4java
Class SQLiteQueue

java.lang.Object
  extended by com.almworks.sqlite4java.SQLiteQueue

public class SQLiteQueue
extends java.lang.Object

SQLiteQueue is a basic implementation of job queue for an SQLite connection. It provides multi-threaded or GUI application with asynchronous execution of database tasks in a single separate thread with a single SQLiteConnection.

The queue is started and stopped using start() and stop(boolean) methods correspondingly. Each database task is represented by a subclass of SQLiteJob. A task is scheduled for execution by execute(J) method. Tasks are served on first-come, first-serve basis.

Public methods of SQLiteQueue are thread-safe, unless noted otherwise.

When writing tasks, it's a good practice to keep transaction boundaries within single task. That is, if you BEGIN TRANSACTION in the task, make sure you COMMIT or ROLLBACK in the end. Otherwise, your transaction will remain unfinished, locks held, and you possible wouldn't know which job will execute next in the context of this unfinished transaction.

SQLiteQueue may be subclassed in order to change certain behavior. If you need some things to be done differently, look for a protected method to override. For example, you can implement a priority queue instead of FIFO queue.

SQLiteQueue and SQLiteJob are written to handle exceptions and errors in a controlled way. In particular, if the queue thread terminates abnormally, SQLiteQueue will try to "reincarnate" by starting another thread and opening another connection to the database. All queued tasks (except for the one that caused the problem) should survive the reincarnation and execute in the new thread.

Reincarnation is not possible for in-memory database, since the database is lost after connection closes.

Some examples:

 void start() {
   myQueue = new SQLiteQueue(myDatabaseFile);
   myQueue.start();
 }
 

int getTableRowCount(final String tableName) { return myQueue.execute(new SQLiteJob<Integer>() { protected Integer job(SQLiteConnection connection) throws SQLiteException { SQLiteStatement st = connection.prepare("SELECT COUNT(*) FROM " + tableName); try { st.step(); return st.columnInt(0); } finally { st.dispose(); } } }).complete(); }

Author:
Igor Sereda
See Also:
SQLiteJob

Field Summary
static long DEFAULT_REINCARNATE_TIMEOUT
          Default timeout for reincarnating database thread.
protected  java.util.Collection<SQLiteJob> myJobs
          Stores queued jobs.
 
Constructor Summary
SQLiteQueue()
          Constructs the queue, which will use an in-memory database.
SQLiteQueue(java.io.File databaseFile)
          Constructs the queue.
SQLiteQueue(java.io.File databaseFile, java.util.concurrent.ThreadFactory threadFactory)
          Constructs the queue and allows to specify a factory for the queue thread.
 
Method Summary
protected  void addJob(SQLiteJob job)
          Adds a job to the job collection.
protected  void afterExecute(SQLiteJob job)
          Do some work after job.execute() finished.
protected  java.util.Collection<SQLiteJob> createJobCollection()
          Creates a new collection for storing pending jobs.
protected  void disposeConnection(SQLiteConnection connection)
          Disposes the connection.
<T,J extends SQLiteJob<T>>
J
execute(J job)
          Places a job in the queue for asynchronous execution in database thread.
protected  void executeJob(SQLiteJob job)
          Runs the job with the current connection.
 SQLiteQueue flush()
          Waits until all jobs in the queue are executed.
 java.io.File getDatabaseFile()
          Get the underlying database file.
protected  long getReincarnationTimeout()
          Provides reincarnation timeout (the period to wait before reincarnating abnormally stopped queue thread).
protected  void handleJobException(SQLiteJob job, java.lang.Throwable e)
          Do some work if job threw an exception.
protected  void initConnection(SQLiteConnection connection)
          Initialize a new connection.
 boolean isDatabaseThread()
          Checks if the current thread is the thread that runs the queue's database connection.
protected  boolean isJobQueueEmpty()
          Checks if there are no more pending jobs.
protected  boolean isReincarnationPossible()
          Checks if reincarnation should be attempted after queue thread terminates abnormally.
 boolean isStopped()
          Checks if the queue is stopped.
 SQLiteQueue join()
          Waits for the queue to stop.
protected  SQLiteConnection openConnection()
          Creates and opens a connection to the database.
protected  void reincarnate(long reincarnateTimeout)
          Reincarnates the queue.
protected  java.util.List<SQLiteJob> removeJobsClearQueue()
          Clears the queue and returned removed jobs.
protected  void rollback()
          Rolls back current transaction.
protected  SQLiteJob selectJob()
          Selects the next job from pending jobs to be executed.
 SQLiteQueue start()
          Starts the queue by creating a new thread, opening connection in that thread and executing all jobs there.
 SQLiteQueue stop(boolean gracefully)
          Stops the queue.
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

DEFAULT_REINCARNATE_TIMEOUT

public static final long DEFAULT_REINCARNATE_TIMEOUT
Default timeout for reincarnating database thread.

See Also:
Constant Field Values

myJobs

protected java.util.Collection<SQLiteJob> myJobs
Stores queued jobs.

protected by myLock

Constructor Detail

SQLiteQueue

public SQLiteQueue()
Constructs the queue, which will use an in-memory database.

The queue must be started in order for jobs to be executed.

See Also:
start()

SQLiteQueue

public SQLiteQueue(java.io.File databaseFile)
Constructs the queue. SQLiteQueue will use SQLiteConnection.open(boolean) method to create a connection within queue thread.

The queue must be started in order for jobs to be executed.

Parameters:
databaseFile - database file to connect to, or null to open an in-memory database
See Also:
start()

SQLiteQueue

public SQLiteQueue(java.io.File databaseFile,
                   java.util.concurrent.ThreadFactory threadFactory)
Constructs the queue and allows to specify a factory for the queue thread.

Parameters:
databaseFile - database file to connect to, or null to open an in-memory database
threadFactory - the factory for thread(s), cannot be null
Method Detail

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

getDatabaseFile

public java.io.File getDatabaseFile()
Get the underlying database file.

Returns:
database file or null if queue is working on an in-memory database

start

public SQLiteQueue start()
Starts the queue by creating a new thread, opening connection in that thread and executing all jobs there.

The queue will remain active until stop(boolean) method is called, or until it is terminated by non-recoverable error.

Calling this method second time does not have any effect. A queue cannot be started after it has stopped.

Any jobs added to the queue prior to start() will be carried out.

This method is thread-safe: it may be called from any thread.

Returns:
this queue
Throws:
java.lang.IllegalStateException - if threadFactory failed to produce a new thread

stop

public SQLiteQueue stop(boolean gracefully)
Stops the queue. After this method is called, no more jobs are accepted in execute(J) method. The thread and connection are finished and disposed.

If gracefully parameter is true, the currently queued jobs will be executed before queue stops. Otherwise, any pending jobs are cancelled, and the currently running job may be cancelled to. (If the currently running job is ignorant of job.isCancelled() status and does not run a long SQL statement, it still may finish normally.)

After call to stop(true) you can call stop(false) to force non-gracefull shutdown. Other than that, calling stop() second time has no effect.

If the queue hasn't been started, it will not be able to start later.

This method is thread-safe: it may be called from any thread. It finishes immediately, while actual stopping of the queue happening asynchronously. If you need to wait until queue is fully stopped, use join() method after you called stop().

Parameters:
gracefully - if true, jobs already queued will be executed, then the queue will stop
Returns:
this queue

join

public SQLiteQueue join()
                 throws java.lang.InterruptedException
Waits for the queue to stop. The method uses Thread.join(long) method to join with the queue thread.

Note that this method does not stop the queue. You need to call stop(boolean) explicitly.

If queue has not been started, the method returns immediately.

Returns:
this queue
Throws:
java.lang.InterruptedException - if the current thread is interrupted
java.lang.IllegalStateException - if called from the queue thread

execute

public <T,J extends SQLiteJob<T>> J execute(J job)
Places a job in the queue for asynchronous execution in database thread.

The added job's SQLiteJob.job(com.almworks.sqlite4java.SQLiteConnection) method will be called from the database thread with an instance of SQLiteConnection. Job may provide a return value, which will be treated as the job result.

The queue must be started in order for jobs to start executing. Jobs may be added to the queue before or after the queue is started. However, if the queue is already stopped, the job will be immediately cancelled. (It will receive SQLiteJob.jobCancelled() and SQLiteJob.jobFinished(T) callbacks before this method finishes.)

Because this method returns the argument, you can chain this method with other methods in SQLiteJob or in its subclass:

   MyResult r = myQueue.execute(new SQLiteJob<MyResult>() { ... }).complete();
 

Type Parameters:
T - class of the job's result; use Object or Void if no result is needed
J - job class
Parameters:
job - the job to be executed on this queue's database connection, must not be null
Returns:
job
See Also:
SQLiteJob

flush

public SQLiteQueue flush()
                  throws java.lang.InterruptedException
Waits until all jobs in the queue are executed.

Returns:
this instance
Throws:
java.lang.InterruptedException - if the current thread is interrupted

isStopped

public boolean isStopped()
Checks if the queue is stopped.

Returns:
true if the queue was requested to stop or has stopped

isDatabaseThread

public boolean isDatabaseThread()
Checks if the current thread is the thread that runs the queue's database connection.

Returns:
true if the current thread is the database thread

addJob

protected void addJob(SQLiteJob job)
Adds a job to the job collection. Override to change the logic or order of jobs.

This method is called under synchronized lock and must not call any listeners or alien code.

Parameters:
job - the job to be added to myJobs, the latter possible being null

createJobCollection

protected java.util.Collection<SQLiteJob> createJobCollection()
Creates a new collection for storing pending jobs. Override to change the queue logic.

This method is called under synchronized lock and must not call any listeners or alien code.

Returns:
an instance of collection for jobs

isJobQueueEmpty

protected boolean isJobQueueEmpty()
Checks if there are no more pending jobs. Override to change the queue logic.

This method is called under synchronized lock and must not call any listeners or alien code.

Returns:
true if there are no pending jobs

removeJobsClearQueue

protected java.util.List<SQLiteJob> removeJobsClearQueue()
Clears the queue and returned removed jobs. Override to change the queue logic.

After this method is called, isJobQueueEmpty() must return true.

This method is called under synchronized lock and must not call any listeners or alien code.

Returns:
non-null list of removed jobs

selectJob

protected SQLiteJob selectJob()
Selects the next job from pending jobs to be executed. Override to change the queue logic.

This method is called under synchronized lock and must not call any listeners or alien code.

Returns:
null if there are no pending jobs, or the job for execution

openConnection

protected SQLiteConnection openConnection()
                                   throws SQLiteException
Creates and opens a connection to the database. Override to change how database connection is opened.

If this method throws an exception, the queue thread will terminate and possible reincarnate to try again.

Returns:
a new connection, not null, that can be used in the current thread
Throws:
SQLiteException - if connection cannot be created
See Also:
initConnection(com.almworks.sqlite4java.SQLiteConnection)

initConnection

protected void initConnection(SQLiteConnection connection)
                       throws SQLiteException
Initialize a new connection. Override to provide additional initialization code, for example executing initializing SQL.

If this method throws an exception, the queue thread will terminate and possible reincarnate to try again.

Parameters:
connection - freshly opened database connection
Throws:
SQLiteException - if any initialization code fails

disposeConnection

protected void disposeConnection(SQLiteConnection connection)
Disposes the connection. Override to change how connection is disposed.

Parameters:
connection - database connection no longer in use by the queue

rollback

protected void rollback()
Rolls back current transaction. This method is called after exception is caught from a job, or after job is cancelled. Override to change how to handle these two situations.


executeJob

protected void executeJob(SQLiteJob job)
                   throws java.lang.Throwable
Runs the job with the current connection.

Parameters:
job - next job from the queue
Throws:
java.lang.Throwable - any kind of problem

afterExecute

protected void afterExecute(SQLiteJob job)
                     throws java.lang.Throwable
Do some work after job.execute() finished. By default, performs rollback after a cancelled job.

Parameters:
job - finished job
Throws:
java.lang.Throwable - any kind of problem

handleJobException

protected void handleJobException(SQLiteJob job,
                                  java.lang.Throwable e)
                           throws java.lang.Throwable
Do some work if job threw an exception. By default, rolls back and ignores the exception.

Parameters:
job - erred job
e - exception thrown by the job
Throws:
java.lang.Throwable - any kind of problem

getReincarnationTimeout

protected long getReincarnationTimeout()
Provides reincarnation timeout (the period to wait before reincarnating abnormally stopped queue thread).

Returns:
reincarnation timeout

isReincarnationPossible

protected boolean isReincarnationPossible()
Checks if reincarnation should be attempted after queue thread terminates abnormally.

Returns:
true if reincarnation should be attempted

reincarnate

protected void reincarnate(long reincarnateTimeout)
Reincarnates the queue. This implementation starts a new thread, which waits for some time and then restarts database thread.

Parameters:
reincarnateTimeout - time to wait