If I had taken a sequential processing approach to performing this task I would not have fully utilized the full power of even a single core processor let alone the multiple cores/processors I had available.
At university my final year project was to implement the Communicating Sequential Processes paradigm of parallel programming used in the OCCAM programming language in Java. The result of this was the Java Communicating Sequential Processes library.
I decided that this problem was a perfect task to use a CSP style library. As the JCSP library was developed in Java 1.1 times I created another library using generics so I didn't have to cast all the time, this library is available in my Subversion repository under the rs-parallel project.
The basic principals behind CSP is that instead of using thread synchronization (monitors) and shared variables to communicate between threads you use channels to communicate between threads. The Java thread synchronization and shared variables are then abstracted by the channel construct. A channel has two basic methods read and write. When you write to a channel you wait until someone has read from the channel (except in the case of a buffered channel, in which case you only block if the buffer is full). Likewise when you read from a channel you wait until data is available to be read from the channel.
If we looked at a simple process to read objects from a file and write them out to a Channel the code would be as shown below. As you can see this is a simple loop which loops through all objects from the reader and writes them to the channel.
public void run(ChannelThe corresponding writer would look something like the followingout) {
for (DataObject object : reader) {
out.write(object);
}
}
public void run(ChannelEach step in the transformation pipeline would then be implemented as a process using the following pattern.in) {
while (true) {
DataObject object = in.read();
writer.write(object);
}
}
public void run(ChannelEach one of the above processes would be run in a separate thread and the channel of one process would be used as the input channel for the next process. In the package com.revolsys.parallel.process the Process interface is used instead of Runnable and the ProcessNetwork class is used to manage creating the Threads.in, Channel out) {
while (true) {
DataObject object = in.read();
// Do something with object
out.write(object);
}
}
To use the ProcessNetwork you would use the following code to create a network and start all the processes, waiting until they are finished.
ProcessNetwork network = new ProcessNetwork();
readProcess = new ReaderProcess();
transformProcess = new TransformProcess();
writeProcess = new WriterProcess();
transformProcess.setIn(readProcess.getOut());
writeProcess.setIn(transformProcess.getOut());
network.add(readProcess);
network.add(transformProcess);
network.add(writeProcess);
network.startAndWait();
In the above examples it shows the basic principals behind the library, there are some additional steps to connect and disconnect from reading and writing to a channel. This is required so that the network can shut itself down when no more data is required to be processed. If you look in the com.revolsys.parallel.process package at the AbstractOutProcess, AbstractInProcess or AbstractInOutProcess they show these extra steps and can be used as base classes for your processes so you don't have to deal with many of these issues.
Hopefully at some point I'll get to documenting the library itself but for now hopefully this is a good introduction.
Paul