Skip to content

Theta Join usage & implementation

Aleksandar Vitorovic edited this page May 30, 2016 · 56 revisions

Prerequisite : the content of this page is a continuation of Imperative Squall interface and is based on the concepts detailed in it. Theta-Join concepts are presented in the publication from A. Okcan and M. Riedewald Processing Theta-Join using MapReduce [1] and briefly described in our Squall overview.

This section is divided into two parts. The first part describes how query plans using Theta-Join can be written, more specifically how join conditions are specified, as it is the main difference compared to the standard hash-join. The second part provides explanation on how Theta-Join is implemented in Squall.

##Theta-Join query plans

Currently, query plans containing Theta-Joins cannot be automatically generated from SQL. Indeed, these query plans have to be specified manually, using Imperative Squall interface. There is however a important difference when specifying the join component: instead of EquiJoinComponent, we use ThetaJoinComponent.

Let's take an example query, based on the TPC-H schema :

    SELECT SUM(LINEITEM.EXTENDEDPRICE)
    FROM LINEITEM, ORDERS
    WHERE 10*LINEITEM.EXTENDEDPRICE < ORDERS.TOTALPRICE

The join condition is an inequality predicate which is cannot be evaluated by traditional key partitioning as in equi-joins. This is typically a condition that can be handled by Theta-Join efficiently.

The configurations files from before can be used the same way but need a small addition. An estimated cardinality must be specified for each table that is joined using Theta-Join in the query plan. If the query plan contains a Theta-Join from previously Theta-Joined tables, their estimated cardinality must be specified as well. This cardinality is used to generate the mapping described in [1] and achieve a good load balancing (details provided in the next section). Nevertheless, wrong values do not have significant impact on performance. If no estimate is available, it is better to underestimate the cardinality of a relation.

Cardinality are specified with the following format :

    TABLENAME_CARD <cardinality>

In our example, we have to specify the estimated cardinality for LINEITEM and ORDERS :

    ORDERS_CARD 10000
    LINEITEM_CARD 10000

If we wanted to use the result of this join in a second Theta-Join, we would specify the cardinality the following way :

    LINEITEM_ORDERS_CARD 100

We now present the corresponding Java query plan using ThetaJoinComponent :

	ProjectOperator projectionLineitem = new ProjectOperator(0, 5);
	DataSourceComponent relationLineitem = new DataSourceComponent("LINEITEM", conf)
				.add(projectionLineitem);
	
	ProjectOperator projectionOrders = new ProjectOperator(0, 3);	
	DataSourceComponent relationOrders = new DataSourceComponent("ORDERS", conf)
				.add(projectionOrders);
		
	// 10*LINEITEM.EXTENDEDPRICE < ORDERS.TOTALPRICE
	//-----------------------------------------------
	
	ValueSpecification value10 = new ValueSpecification(_doubleType, 10);
	ColumnReference colLineitemsInequality = new ColumnReference(_doubleType, 1);
	Multiplication mult = new Multiplication(value10, colLineitemsInequality);
	ColumnReference colOrdersInequality = new ColumnReference(_doubleType, 1);
	ComparisonPredicate inequalityPredicate = new ComparisonPredicate(
                                                      ComparisonPredicate.LESS_OP,
                                                      mult, colOrdersInequality);
		
	// SUM(LINEITEM.EXTENDEDPRICE)
	//-----------------------------------------------
	
	AggregateOperator sumAgg =
			new AggregateSumOperator(new ColumnReference(_doubleType, 0), conf);
	
	// JOIN
	//-----------------------------------------------
	
	ThetaJoinComponent LINEITEMS_ORDERSjoin = new ThetaJoinComponent(
				relationLineitem,
				relationOrders,
				_queryPlan).setJoinPredicate(inequalityPredicate)
				           .addOperator(new ProjectionOperator(1))
				           .addOperator(sumAgg);

This example introduces the use of classes of packages expressions and predicates. The expressions package provides multiple classes to represent the standard operations (addition, subtraction, multiplication) as well as ValueSpecification that is used to represent a constant.

##Theta-Join implementation

The implementation of Theta-Join uses matrix mapping [1] as its partitioning scheme. This mapping is computed at the beginning of the join and determines which tuples are sent to which nodes (at least one). The mapping, represented as a matrix, is split into regions, each of which is assigned to one node. The number of regions corresponds to the number of nodes (parallelism) used for the Theta-Join. Rows correspond to tuples of the first relation and columns to tuples of the second relation. Finding a good mapping is important in terms of performance. It directly influences the resulting level of load balancing of the system as well as the proportion of network and processing power usage. When a tuple from the first relation arrives to the join component, a row of the matrix is selected at random. Then, the tuple is sent to all the nodes responsible for the regions in the row. A good partitioning of the matrix is a partitioning in which: a) all regions have almost the same area - since the area correspond to the amount of data to be processed, and b) the regions' share is as close as possible to squares - to keep the replication and network transfers reasonably low.

References

[1] Processing Theta-Join using MapReduce