Apache Calcite Tutorial
Introduction to Apache Calcite
Apache Calcite is a Java-based framework that provides a unified SQL query engine for processing queries across a variety of storage engines. Calcite is designed to be embeddable, flexible, and extensible, and is intended for use at the core of next-generation big-data products.
Calcite contains its own SQL parser layer and optimizers, and its JDBC layer is provided by Calcite’s sister project, Apache Avatica. Together, these let Calcite perform dynamic SQL generation, SQL parsing and validation, query optimization and execution, and cross-database SQL query conversion.
Apache Calcite is database agnostic and supports a wide variety of systems such as MySQL, PostgreSQL, Spark, MongoDB, Elasticsearch, CSV, Parquet, and many more.
Relational Algebra in Calcite Simplified
Relational algebra, a cornerstone of database theory, defines the core concepts and operations found in modern database products. Calcite converts SQL into relational expressions built from operations such as selections, unions, and Cartesian products, rewrites those expressions using rule matching and planners, and estimates a cost for each candidate plan. This allows sophisticated queries to be executed more efficiently, while the query result remains unchanged.
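To make the idea of "same result, cheaper plan" concrete, here is a minimal plain-Java sketch of two relational operators (selection and Cartesian product) over in-memory rows. It is not Calcite code: the class RelAlgebraSketch, its methods, and the sample cats and owners data are all made up for illustration. It shows that filtering before the product gives the same rows as filtering after it, while building a smaller intermediate result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy relational operators over in-memory rows; illustrative only, not Calcite code. */
final class RelAlgebraSketch {

    /** Selection: keep only the rows that satisfy the predicate. */
    static List<Map<String, Object>> select(List<Map<String, Object>> rows,
                                            Predicate<Map<String, Object>> predicate) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            if (predicate.test(row)) {
                out.add(row);
            }
        }
        return out;
    }

    /** Cartesian product: pair every left row with every right row. */
    static List<Map<String, Object>> product(List<Map<String, Object>> left,
                                             List<Map<String, Object>> right) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            for (Map<String, Object> r : right) {
                Map<String, Object> combined = new LinkedHashMap<>(l);
                combined.putAll(r);
                out.add(combined);
            }
        }
        return out;
    }

    /** Helper that builds a two-column row. */
    static Map<String, Object> row(String k1, Object v1, String k2, Object v2) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }

    // Hypothetical sample data.
    static List<Map<String, Object>> cats() {
        return Arrays.asList(row("catId", 1, "catName", "Tom"),
                             row("catId", 2, "catName", "Felix"));
    }

    static List<Map<String, Object>> owners() {
        return Arrays.asList(row("ownerId", 10, "city", "Oslo"),
                             row("ownerId", 11, "city", "Rome"));
    }

    /** Plan A: build the full product (4 rows), then filter. */
    static List<Map<String, Object>> filterAfterProduct() {
        return select(product(cats(), owners()), r -> "Tom".equals(r.get("catName")));
    }

    /** Plan B: filter the cats first, then build a smaller product (2 rows). */
    static List<Map<String, Object>> filterBeforeProduct() {
        return product(select(cats(), r -> "Tom".equals(r.get("catName"))), owners());
    }

    public static void main(String[] args) {
        // Both plans return the same two rows.
        System.out.println(filterAfterProduct().equals(filterBeforeProduct()));
    }
}
```

Rewriting Plan A into Plan B is the kind of transformation an optimizer rule performs: the answer is unchanged, but less intermediate data is produced.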
Calcite Planners and Optimizers
There are two optimizers provided within Calcite:
- HepPlanner - This heuristic planner is a rule-based optimizer: it matches a set of rules against the query and applies the ones that fire, without comparing the cost of the alternatives. This is similar in function to the rule-based optimizer (RBO) provided within Oracle.
- VolcanoPlanner - This is a cost-based optimizer that applies rules to the query in different combinations, searching for the plan with the lowest estimated cost. Because it cannot always enumerate every possible plan, the optimizer stops after a certain number of iterations or once the cost stops improving.
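The difference between the two styles can be sketched with a toy join-ordering problem in plain Java. This is a conceptual illustration only and does not reflect how HepPlanner or VolcanoPlanner are implemented: the class PlannerSketch, the Table type, the row counts, and the cost formula (every join keeps one tenth of the row pairs) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Toy join-order planners; a conceptual sketch, not Calcite's algorithms. */
final class PlannerSketch {

    /** Hypothetical table with an estimated row count. */
    static final class Table {
        final String name;
        final long rows;
        Table(String name, long rows) { this.name = name; this.rows = rows; }
    }

    /** Toy cost: total size of the intermediate results of a left-deep join,
     *  assuming every join keeps one tenth of the row pairs. */
    static long cost(List<Table> order) {
        long intermediate = order.get(0).rows;
        long total = 0;
        for (int i = 1; i < order.size(); i++) {
            intermediate = intermediate * order.get(i).rows / 10;
            total += intermediate;
        }
        return total;
    }

    /** Heuristic style: apply one fixed rule ("join the smallest tables first")
     *  unconditionally, without ever computing a cost. */
    static List<Table> heuristic(List<Table> tables) {
        List<Table> order = new ArrayList<>(tables);
        order.sort(Comparator.comparingLong((Table t) -> t.rows));
        return order;
    }

    /** Cost-based style: cost candidate orders until the iteration budget
     *  runs out, and keep the cheapest one seen so far. */
    static List<Table> costBased(List<Table> tables, int maxIterations) {
        List<List<Table>> candidates = new ArrayList<>();
        permute(new ArrayList<>(tables), new ArrayList<>(), candidates);
        List<Table> best = null;
        long bestCost = Long.MAX_VALUE;
        for (int i = 0; i < candidates.size() && i < maxIterations; i++) {
            long c = cost(candidates.get(i));
            if (c < bestCost) {
                bestCost = c;
                best = candidates.get(i);
            }
        }
        return best;
    }

    /** Enumerates every ordering of the remaining tables. */
    static void permute(List<Table> remaining, List<Table> prefix, List<List<Table>> out) {
        if (remaining.isEmpty()) {
            out.add(new ArrayList<>(prefix));
            return;
        }
        for (int i = 0; i < remaining.size(); i++) {
            List<Table> rest = new ArrayList<>(remaining);
            prefix.add(rest.remove(i));
            permute(rest, prefix, out);
            prefix.remove(prefix.size() - 1);
        }
    }

    /** Hypothetical sample input. */
    static List<Table> sample() {
        return Arrays.asList(new Table("A", 1000), new Table("B", 10), new Table("C", 100));
    }
}
```

With a generous budget, costBased(sample(), 100) settles on an order costing 10100 in this toy model, while a budget of only 2 iterations stops early at a plan costing 11000. That trade-off between search time and plan quality is the reason a cost-based planner needs a stopping condition.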
Getting Started with Calcite
For the following examples, you will need to include these dependencies in your project’s Maven pom.xml file:
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.26.0</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.17.0</version>
</dependency>
Additionally, you will need to include the appropriate database driver dependencies. In this example, I am going to use PostgreSQL and MySQL to perform a single query across two different JDBC data sources:
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.19</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
Calcite RelBuilder
Apache Calcite’s RelBuilder class is used to construct sophisticated relational expressions, which can then be translated into SQL.
RelBuilder Field
The relBuilder.field method creates a RexInputRef object, which is a reference to a field within a relational expression. The method can be called in several ways depending on the context. Let’s take a look at a few of them.
The following code takes two tables, cats and dogs, within the database petDB and performs a left join on them using the column called petID:
relBuilder.scan("petDB", "cats")
    .scan("petDB", "dogs").as("dogTableAlias")
    .join(JoinRelType.LEFT,
        relBuilder.equals(
            relBuilder.field(2, 0, "petID"),
            relBuilder.field(2, "dogTableAlias", "petID")
        )
    );
The SQL equivalent of this would be:
SELECT * FROM "cats"
LEFT JOIN "dogs" ON "cats"."petID" = "dogs"."petID";
Looking at the first field call, you will notice that three parameters are passed:
relBuilder.field(2, 0, "petID")
- The first parameter, 2, is the inputCount. This is the number of inputs on the relBuilder stack that the next operation (here, the join) will consume. In our case there are 2, as we called relBuilder.scan() twice in succession.
- The second parameter is the inputOrdinal. This is a 0-based index identifying the scan (or table) you want to refer to. Since we want to refer to the cats table, this is 0. To refer to the dogs table, we would set it to 1.
- The third parameter is the column. This can be the String name of the column (in this instance, petID) or, alternatively, a fieldOrdinal, which is the 0-based position of the field within the table.
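One way to picture how these three parameters fit together is as an offset into the combined row that the join will see: the fields of input 0 come first, followed by the fields of input 1. The plain-Java sketch below models that idea. It is a simplified model rather than Calcite’s own code: the class FieldOrdinalSketch, the resolve method, and the column lists for cats and dogs are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of resolving (inputCount, inputOrdinal, fieldName); not Calcite code. */
final class FieldOrdinalSketch {

    /**
     * Returns the position of the field in the combined row of the top
     * inputCount relations on the stack.
     *
     * @param stack        field names of every relation on the stack, oldest first
     * @param inputCount   how many relations the next operator consumes
     * @param inputOrdinal 0-based index of the relation among those inputs
     * @param fieldName    name of the field inside that relation
     */
    static int resolve(List<List<String>> stack, int inputCount, int inputOrdinal, String fieldName) {
        // Only the top inputCount entries of the stack take part in the operation.
        List<List<String>> inputs = stack.subList(stack.size() - inputCount, stack.size());
        // Skip over all the fields that belong to the earlier inputs.
        int offset = 0;
        for (int i = 0; i < inputOrdinal; i++) {
            offset += inputs.get(i).size();
        }
        int index = inputs.get(inputOrdinal).indexOf(fieldName);
        if (index < 0) {
            throw new IllegalArgumentException("Field not found: " + fieldName);
        }
        return offset + index;
    }

    /** Hypothetical column lists for the cats and dogs tables. */
    static List<List<String>> sampleStack() {
        return Arrays.asList(
            Arrays.asList("petID", "name", "age"),  // cats, inputOrdinal 0
            Arrays.asList("petID", "breed"));       // dogs, inputOrdinal 1
    }
}
```

Under these assumed column lists, resolve(sampleStack(), 2, 0, "petID") yields 0 and resolve(sampleStack(), 2, 1, "petID") yields 3, which is why the same column name can be referenced unambiguously on both sides of the join.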
RelBuilder Table Aliases using AS
Note how we scanned the dogs table in the above example:
.scan("petDB", "dogs").as("dogTableAlias")
By chaining the as method onto the scan method, we create a table alias (dogTableAlias) that we can reference later in our relational expression.
We can then reference this alias in our field call in lieu of the inputOrdinal:
relBuilder.field(2, "dogTableAlias", "petID")
The following call, which uses the inputOrdinal instead of the alias, references the same field:
relBuilder.field(2, 1, "petID")
Apache Calcite Examples
The following are a number of different applications that use Apache Calcite to interact with the database layer. These are intended as real-world examples of how to integrate Calcite into a production-level application.
Parse and Validate SQL using Calcite
The following code parses a provided SQL string, in this case SELECT * FROM..., and makes sure that it is syntactically correct. Because the planner validates the statement against the schema of the attached data source, it will also throw an error if the referenced table does not exist or if the query violates the structure of the underlying schema.
public void parseAndValidateSQL() throws SQLException, SqlParseException, ValidationException {
    // Build our connection
    Connection connection = DriverManager.getConnection("jdbc:calcite:");
    // Unwrap our connection using the CalciteConnection
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    // Define our parser configuration
    SqlParser.Config parserConfig = SqlParser.config();
    // Get a pointer to our root schema for our Calcite Connection
    SchemaPlus rootSchema = calciteConnection.getRootSchema();
    // Attach our JDBC data source to our root schema. mockDataSource is a javax.sql.DataSource
    // defined elsewhere, such as the HSQLDB one built in the unit test example later in this article
    rootSchema.add("EXAMPLESCHEMA", JdbcSchema.create(rootSchema, "EXAMPLESCHEMA", mockDataSource, null, null));
    Frameworks.ConfigBuilder config = Frameworks.newConfigBuilder()
        .defaultSchema(rootSchema)
        .parserConfig(parserConfig)
        .context(Contexts.of(calciteConnection.config()));
    Planner planner = Frameworks.getPlanner(config.build());
    // Parse the SQL string; a syntax error surfaces as a SqlParseException
    SqlNode node = planner.parse("SELECT * FROM EXAMPLESCHEMA.MYTEMPTABLE");
    // Validate against the schema; an unknown table surfaces as a ValidationException
    SqlNode validateNode = planner.validate(node);
    SqlWriter writer = new SqlPrettyWriter();
    validateNode.unparse(writer, 0, 0);
    // Print out our formatted SQL to the console
    System.out.println(writer.toSqlString().getSql());
}
Apache Calcite example for MySQL and PostgreSQL
Apache Calcite’s speciality lies in its ability to query multiple database sources using a single query. For example, you can easily interact with data that is stored in MySQL and Elasticsearch using a single query. This data can be joined, aggregated, and returned as a single result set, all in an optimized manner using Calcite’s Volcano and Hep planners. The example below joins a table in PostgreSQL with a table in MySQL.
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.verdictdb.commons.DBTablePrinter;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class CalciteJdbcExample {
    private static final String POSTGRESQL_SCHEMA = "PUBLIC";
    private static final String MYSQL_SCHEMA = "mysql";
    public static void main(String[] args) throws Exception {
        // Build our connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:");
        // Unwrap our connection using the CalciteConnection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        // Get a pointer to our root schema for our Calcite Connection
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // Instantiate a data source, this can be autowired in using Spring as well
        DataSource postgresDataSource = JdbcSchema.dataSource(
            "jdbc:postgresql://localhost/db",
            "org.postgresql.Driver", // Change this if you want to use something like Oracle, etc.
            "postgres", // username
            "example" // password
        );
        // Instantiate a data source, this can be autowired in using Spring as well
        DataSource mysqlDataSource = JdbcSchema.dataSource(
            "jdbc:mysql://localhost/db",
            "com.mysql.cj.jdbc.Driver", // Driver class name used by MySQL Connector/J 8.x
            "Username", // username
            "Password" // password
        );
        // Attach our Postgres Jdbc Datasource to our Root Schema
        rootSchema.add(POSTGRESQL_SCHEMA, JdbcSchema.create(rootSchema, POSTGRESQL_SCHEMA, postgresDataSource, null, null));
        // Attach our MySQL Jdbc Datasource to our Root Schema
        rootSchema.add(MYSQL_SCHEMA, JdbcSchema.create(rootSchema, MYSQL_SCHEMA, mysqlDataSource, null, null));
        // Build a framework config to attach to our Calcite Planners and Optimizers
        FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema)
            .build();
        RelBuilder rb = RelBuilder.create(config);
        RelNode node = rb
            // First parameter is the schema, the second is the table name
            .scan(POSTGRESQL_SCHEMA, "TABLE_NAME_IN_POSTGRES")
            .scan(MYSQL_SCHEMA, "TABLE_NAME_IN_MYSQL")
            // Join the two scans; without a join only the last scan would be used.
            // This assumes both placeholder tables have a column named "id"
            .join(JoinRelType.INNER, "id")
            .filter(
                rb.equals(rb.field("fieldname"), rb.literal("literal"))
            )
            // These are the fields you want to return from your query
            .project(
                rb.field("id"),
                rb.field("col1"),
                rb.field("colb")
            )
            .build();
        // An empty HepProgram applies no rules; add rules to the builder to optimize the plan
        HepProgram program = HepProgram.builder().build();
        HepPlanner planner = new HepPlanner(program);
        planner.setRoot(node);
        RelNode optimizedNode = planner.findBestExp();
        // Swap logical table scans for bindable ones where the table supports it
        final RelShuttle shuttle = new RelHomogeneousShuttle() {
            @Override public RelNode visit(TableScan scan) {
                final RelOptTable table = scan.getTable();
                if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
                    return Bindables.BindableTableScan.create(scan.getCluster(), table);
                }
                return super.visit(scan);
            }
        };
        optimizedNode = optimizedNode.accept(shuttle);
        // Execute the relational expression and print the rows
        final RelRunner runner = connection.unwrap(RelRunner.class);
        PreparedStatement ps = runner.prepare(optimizedNode);
        ps.execute();
        ResultSet resultSet = ps.getResultSet();
        DBTablePrinter.printResultSet(resultSet);
    }
}
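To picture what a join across two data sources produces, the following plain-Java sketch joins rows from two in-memory lists that stand in for the PostgreSQL and MySQL tables. It says nothing about how Calcite actually executes the query; the class FederatedJoinSketch, the hashJoin method, and the sample rows are hypothetical and exist only to show the shape of the combined result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy inner join of rows coming from two separate sources; not Calcite code. */
final class FederatedJoinSketch {

    /** Joins two row lists on their first column using a simple hash join. */
    static List<String> hashJoin(List<String[]> left, List<String[]> right) {
        // Build phase: index the right-hand rows by their key (column 0).
        Map<String, List<String[]>> index = new HashMap<>();
        for (String[] r : right) {
            index.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r);
        }
        // Probe phase: look up each left-hand row and emit the combined row.
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            List<String[]> matches = index.getOrDefault(l[0], Collections.<String[]>emptyList());
            for (String[] r : matches) {
                out.add(l[0] + "," + l[1] + "," + r[1]);
            }
        }
        return out;
    }

    /** Runs the join over hypothetical sample rows. */
    static List<String> demo() {
        // Stand-in for the rows of the PostgreSQL table: id, name.
        List<String[]> postgresRows = Arrays.asList(
            new String[] {"1", "Tom"},
            new String[] {"2", "Felix"},
            new String[] {"3", "Rex"});
        // Stand-in for the rows of the MySQL table: id, city.
        List<String[]> mysqlRows = Arrays.asList(
            new String[] {"1", "Berlin"},
            new String[] {"3", "Oslo"},
            new String[] {"4", "Rome"});
        return hashJoin(postgresRows, mysqlRows);
    }
}
```

Only ids 1 and 3 exist on both sides, so demo() returns two combined rows. The caller sees a single result set and does not need to know that the columns came from different systems.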
Unit Test Example for Calcite
The easiest way to obtain good unit test coverage over your application when using Calcite is to mock the database using HSQLDB, which can run entirely in memory (you will need the HSQLDB driver on your test classpath). In the example below, I define a mock HSQLDB data source and instantiate it using Calcite’s JdbcSchema.dataSource call. Additionally, I use a RelBuilder to access the data from my temporary database and then execute a prepared statement using Calcite’s relational expression engine.
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.junit.Before;
import org.junit.Test;
import org.verdictdb.commons.DBTablePrinter;
import javax.sql.DataSource;
import java.sql.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class CalciteTests {
    private String jdbcUrl;
    private Connection connection;
    private DataSource mockDataSource;
    @Before
    public void setupTests() throws SQLException {
        // Each test gets its own in-memory HSQLDB database
        jdbcUrl = MockDb.INSTANCE.getUrl();
        connection = DriverManager.getConnection(jdbcUrl, "", "");
        // You can populate test data into the database like so:
        try (Statement s = connection.createStatement()) {
            s.execute("create table mytemptable(" +
                "id integer not null primary key," +
                "exampleFoo varchar(25)," +
                "exampleBar varchar(25))");
            s.execute("insert into mytemptable values(1, 'test', '1234')");
            s.execute("insert into mytemptable values(2, 'test2', 'xyz')");
        }
        mockDataSource = JdbcSchema.dataSource(jdbcUrl, "org.hsqldb.jdbcDriver", "", "");
    }
    @Test
    public void calciteUnitTestExample() throws SQLException {
        // Build our Calcite connection (this local variable shadows the HSQLDB connection field)
        Connection connection = DriverManager.getConnection("jdbc:calcite:");
        // Unwrap our connection using the CalciteConnection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        // Get a pointer to our root schema for our Calcite Connection
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // Attach our mock HSQLDB Jdbc Datasource to our Root Schema
        rootSchema.add("exampleSchema", JdbcSchema.create(rootSchema, "exampleSchema", mockDataSource, null, null));
        FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema)
            .build();
        RelBuilder r = RelBuilder.create(config);
        RelNode node = r
            // First parameter is the schema, the second is the table name.
            // HSQLDB stores unquoted identifiers in upper case, hence MYTEMPTABLE
            .scan("exampleSchema", "MYTEMPTABLE")
            .filter(
                r.equals(r.field("ID"), r.literal(1))
            )
            // These are the fields you want to return from your query
            .project(
                r.field("ID"),
                r.field("EXAMPLEFOO"),
                r.field("EXAMPLEBAR")
            )
            .build();
        RelRunner runner = connection.unwrap(RelRunner.class);
        PreparedStatement ps = runner.prepare(node);
        ps.execute();
        ResultSet resultSet = ps.getResultSet();
        DBTablePrinter.printResultSet(resultSet);
    }
    static class MockDb {
        MockDb() {}
        static final MockDb INSTANCE = new MockDb();
        private final AtomicInteger id = new AtomicInteger(1);
        public String getUrl() {
            return "jdbc:hsqldb:mem:db" + id.getAndIncrement();
        }
    }
}
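The MockDb helper above hands out a different URL on every call, so each test run by @Before gets a fresh in-memory database and the create table statement never collides with a table left behind by an earlier test. The standalone sketch below isolates just that counter logic so it can be checked without HSQLDB; the class name MockDbUrlSketch and the method nextUrl are hypothetical stand-ins for MockDb and getUrl.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone copy of the URL counter used by MockDb, for illustration only. */
final class MockDbUrlSketch {

    // Starts at 1 and increases by one for every URL handed out.
    private final AtomicInteger id = new AtomicInteger(1);

    /** Returns a JDBC URL that names a new in-memory database on each call. */
    String nextUrl() {
        return "jdbc:hsqldb:mem:db" + id.getAndIncrement();
    }

    public static void main(String[] args) {
        MockDbUrlSketch urls = new MockDbUrlSketch();
        System.out.println(urls.nextUrl()); // jdbc:hsqldb:mem:db1
        System.out.println(urls.nextUrl()); // jdbc:hsqldb:mem:db2
    }
}
```

Using an AtomicInteger rather than a plain int keeps the numbering unique even if test classes are run in parallel.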