Join Sample
The Pervasive DataRush release includes several sample applications, located in the samples directory. Each sample includes source code and sample input data. This tutorial uses the join sample, which reads two pre-sorted text files, joins them, and writes the result of the join to a third file. The example is very simple, but it is useful for demonstrating several Pervasive DataRush concepts.
Source code:
package example.join;
import com.pervasive.datarush.graphs.ApplicationGraph;
import com.pervasive.datarush.operators.CompositionContext;
import com.pervasive.datarush.operators.DataflowOperator;
import com.pervasive.datarush.operators.GraphFactory;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedTextProperties;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedText;
import com.pervasive.datarush.operators.io.textfile.WriteDelimitedTextProperties;
import com.pervasive.datarush.operators.join.JoinMode;
import com.pervasive.datarush.operators.join.JoinSortedRows;
public class JoinSample extends DataflowOperator {
    String leftInputFile;
    ReadDelimitedTextProperties leftInputProperties;
    String rightInputFile;
    ReadDelimitedTextProperties rightInputProperties;
    JoinMode joinMode;
    String[] keyNames = { "firstName", "lastName" };
    String outputFile;
    WriteDelimitedTextProperties outputProperties;

    public static void main(String[] args) {
        ApplicationGraph app = GraphFactory.newApplicationGraph("Join Sample");
        app.add(new JoinSample());
        app.run();
    }

    public JoinSample() {
        this.leftInputFile = "leftInput.txt";
        this.leftInputProperties = new ReadDelimitedTextProperties();
        this.leftInputProperties.setHeader(true);
        this.rightInputFile = "rightInput.txt";
        this.rightInputProperties = new ReadDelimitedTextProperties();
        this.rightInputProperties.setHeader(true);
        this.joinMode = JoinMode.FULL_OUTER;
        this.outputFile = "outputFile.txt";
        this.outputProperties = new WriteDelimitedTextProperties();
    }

    @Override
    protected void compose(CompositionContext ctx) {
        // Add reader for left hand side of join
        ReadDelimitedText readLeft = ctx.add(new ReadDelimitedText(leftInputFile, leftInputProperties));
        // Add reader for right hand side of join
        ReadDelimitedText readRight = ctx.add(new ReadDelimitedText(rightInputFile, rightInputProperties));
        // Add join; both inputs use the same key field names
        JoinSortedRows join = ctx.add(new JoinSortedRows(readLeft.getOutput(),
                readRight.getOutput(),
                joinMode,
                JoinKey.makeJoinKeys(keyNames, keyNames)));
        // Add writer for output of join
        ctx.add(new WriteDelimitedText(join.getOutput(), outputFile,
                outputProperties), "write");
    }
}
As an alternative to writing a Pervasive DataRush Java application, the developer can use the JavaScript interface, which offers a simplified way to work with Pervasive DataRush.
A comparable join written in JavaScript:
// Read the unit price data
leftInput = readDelim('leftInput.txt', {header:true})
// Read the sales data
rightInput = readDelim('rightInput.txt', {header:true})
// Define the keys to use for the join. In this example, both sources
// use the same field names.
joinKeys=["PRODUCT_ID", "CHANNEL_NAME"]
// Join the two data sources. Note that the order of using the data sources
// is important. With the join operator, the flow used to invoke join is
// the left hand side flow and the flow that is the first parameter is the
// right hand side flow. This is important when using joins like left
// outer or right outer.
output = leftInput.join(rightInput, joinKeys, joinKeys, 'FULL_OUTER')
// Write the output of the join operation to a delimited text file.
output.writeDelim('target/scratch/JoinSampleOutput.txt', {mode:'overwrite', header:true})
A DataflowOperator that exposes a main method can be invoked directly using the Pervasive DataRush dr script (found in the bin directory). The dr script builds the command line needed to invoke the JVM and run the desired application. For example, to run the join application, use the following command line:
dr example.join.JoinSample
The JoinSample constructor only sets up configuration: the file names, the reader and writer properties, and the join mode. The composition of the join application happens in the compose method, which JoinSample must override because it extends DataflowOperator. The compose method takes a CompositionContext, whose add method allows the developer to build the graph. The add method takes either a DataflowOperator or a DataflowNode and adds it to the composition of JoinSample. Looking at the compose method, we can see that it first creates and then adds a ReadDelimitedText operator.
This operator reads a text file in delimited form and outputs a dataflow containing the data from the file. Another ReadDelimitedText operator is constructed and added to the graph. This one reads the second file to be joined. Next, the JoinSortedRows operator is created and added to the graph. This operator performs the join of the data. Note that we are assuming the two input data sources are already sorted on the key fields. The first two parameters of the join operator constructor are the two dataflows to join together. The join operator is passed the output of the first text reader and the output of the second text reader. In this way, the operators are linked together. Next, a WriteDelimitedText operator is created and added to the graph. This operator reads from a dataflow and writes to a delimited text file. Note that the writer takes the output dataflow of the join as its input. The writer does not produce a dataflow output.
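To make the role of pre-sorted input concrete, here is a minimal sketch of a full outer merge join in plain Java. It is not the JoinSortedRows implementation and does not use the DataRush API; the class SortedMergeJoinSketch, the Row type, and the comma-separated output format are all hypothetical names chosen for illustration. It assumes both inputs are sorted ascending by a single key and that keys are unique within each input.

```java
import java.util.ArrayList;
import java.util.List;

public class SortedMergeJoinSketch {

    /** Hypothetical row type: one join key plus one payload value. */
    public static final class Row {
        final String key;
        final String value;

        public Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Full outer join of two lists that are already sorted ascending by key.
     * Assumes keys are unique within each input to keep the sketch short.
     * Each output line is "key,leftValue,rightValue"; a missing side is empty.
     */
    public static List<String> fullOuterJoin(List<Row> left, List<Row> right) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        // Advance through both inputs in step; sorting makes one pass sufficient.
        while (i < left.size() && j < right.size()) {
            Row l = left.get(i);
            Row r = right.get(j);
            int cmp = l.key.compareTo(r.key);
            if (cmp == 0) {          // key present on both sides
                out.add(l.key + "," + l.value + "," + r.value);
                i++;
                j++;
            } else if (cmp < 0) {    // key only on the left side
                out.add(l.key + "," + l.value + ",");
                i++;
            } else {                 // key only on the right side
                out.add(r.key + ",," + r.value);
                j++;
            }
        }
        // Drain whichever input still has rows; they have no match.
        for (; i < left.size(); i++) {
            out.add(left.get(i).key + "," + left.get(i).value + ",");
        }
        for (; j < right.size(); j++) {
            out.add(right.get(j).key + ",," + right.get(j).value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> left = List.of(new Row("a", "1"), new Row("c", "3"));
        List<Row> right = List.of(new Row("b", "20"), new Row("c", "30"));
        fullOuterJoin(left, right).forEach(System.out::println);
    }
}
```

Because both inputs are sorted, the join never needs to hold more than the current row from each side, which is why a sorted join can work directly on streaming dataflows.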
DataflowOperator implementations, including ApplicationGraph, are composite operators. They can contain composite operators, which may in turn contain other composite operators, and so on. We can see this behavior in the above example: an ApplicationGraph is created and the JoinSample is added to it. JoinSample contains several operators, which may themselves be composites built from other operators.
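The nesting described here follows the general composite pattern. The sketch below shows that pattern in plain Java, without the DataRush API; Node, Leaf, Composite, and flatten are hypothetical names, and the add method returning its argument is only meant to mirror the way ctx.add is used in the sample.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeSketch {

    /** Hypothetical graph node: either a leaf or a composite of other nodes. */
    public interface Node {
        /** Appends the names of all leaf nodes reachable from this node. */
        void collectLeaves(List<String> out);
    }

    /** A node that does work itself and has no children. */
    public static final class Leaf implements Node {
        private final String name;

        public Leaf(String name) {
            this.name = name;
        }

        @Override
        public void collectLeaves(List<String> out) {
            out.add(name);
        }
    }

    /** A node built from other nodes, which may themselves be composites. */
    public static final class Composite implements Node {
        private final List<Node> children = new ArrayList<>();

        /** Adds a child and returns it so the caller can keep wiring it up. */
        public <T extends Node> T add(T child) {
            children.add(child);
            return child;
        }

        @Override
        public void collectLeaves(List<String> out) {
            for (Node child : children) {
                child.collectLeaves(out);
            }
        }
    }

    /** Expands a possibly nested graph into its leaf names, in insertion order. */
    public static List<String> flatten(Node root) {
        List<String> out = new ArrayList<>();
        root.collectLeaves(out);
        return out;
    }

    public static void main(String[] args) {
        Composite joinSample = new Composite();
        joinSample.add(new Leaf("readLeft"));
        joinSample.add(new Leaf("readRight"));
        joinSample.add(new Leaf("join"));
        joinSample.add(new Leaf("write"));

        Composite app = new Composite();
        app.add(joinSample);
        System.out.println(flatten(app));
    }
}
```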
When the JoinSample graph is executed, the readers begin reading and parsing their input files and placing data onto their output queues. In parallel, the join operator reads its two input data queues, joins the incoming data on the specified key fields, and writes the data to its output queue. The writer reads data from the join operator and writes it to an output text file. All of the components can run in independent threads in pipelined fashion, taking advantage of multiple CPUs in parallel.
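The pipelined execution described above can be approximated in plain Java with one thread per stage and a bounded queue between stages. This is only a sketch of the general idea, not how DataRush schedules operators; PipelineSketch, the EOF sentinel, the queue capacity of 4, and the upper-casing stage that stands in for the join are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineSketch {

    // Hypothetical end-of-data marker; assumed never to occur in real input.
    private static final String EOF = "\u0000EOF";

    /** Runs a three-stage pipeline: reader -> transformer -> writer. */
    public static List<String> run(List<String> input) {
        BlockingQueue<String> readerOut = new ArrayBlockingQueue<>(4);
        BlockingQueue<String> transformOut = new ArrayBlockingQueue<>(4);
        List<String> written = new ArrayList<>();

        // Stage 1: pushes each input line onto its output queue.
        Thread reader = new Thread(() -> {
            try {
                for (String line : input) {
                    readerOut.put(line);
                }
                readerOut.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 2: consumes rows as they arrive and emits transformed rows.
        Thread transformer = new Thread(() -> {
            try {
                for (String s = readerOut.take(); !s.equals(EOF); s = readerOut.take()) {
                    transformOut.put(s.toUpperCase(Locale.ROOT));
                }
                transformOut.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 3: collects results; only this thread touches the list while running.
        Thread writer = new Thread(() -> {
            try {
                for (String s = transformOut.take(); !s.equals(EOF); s = transformOut.take()) {
                    written.add(s);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        transformer.start();
        writer.start();
        try {
            // join() also makes the writer's list contents visible to the caller.
            reader.join();
            transformer.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("pipeline interrupted", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The bounded queues provide back-pressure: a fast reader blocks once its queue is full, so no stage can run arbitrarily far ahead of the one that consumes its output.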