Apache Calcite Tutorial

Published
Updated

Introduction to Apache Calcite

Apache Calcite is a Java-based framework that provides a unified SQL query engine for processing queries in various storage engines. Calcite is intended to be embeddable, flexible, and extensible; and is intended for use at the core of next-generation, big-data products.

Calcite contains its own SQL Parser Layer, optimizers, and JDBC layer - Calcite’s sister / sub project Apache Avatica. Using this, Calcite can perform dynamic SQL generation, SQL parsing and validity checking, query optimization and execution, and cross-database SQL query conversion.

Apache Calcite is database agnostic and has support for a wide variety of systems such as MySQL, PostgreSQL, SparkQL, MongoDB, ElasticSearch, CSV, Parquet, and much more.

Relational Algebra in Calcite Simplified

Part of databases and database theory, relational algebra defines the core concepts and operations found in modern database products. By utilizing relational algebraic operations such as, table selections, unions, Cartesian products, etc; Calcite is able to convert SQL into these relational expressions, perform optimizations using rule matching and planners, and then generate cost models for each. This allows sophisticated queries to become more optimally executed - all while the query result remains unchanged.

Calcite Planners and Optimizers

There are two optimizers provided within Calcite:

  • HepPlanner - This heuristic planner is a rules based optimizer which attempts to match a number of rules to the query that can improve the performance through a number of different methods. This is similar in function to the RBO provided within Oracle.
  • VolcanoPlanner - This is a cost based optimizer which iterates through different rules and applies them to the query in different combinations until it can find a plan with the most efficient cost. Not all different plan permutations can be established, the optimizer will stop after a certain number of iterations or if the cost ceases to improve during the runs.

Getting Started with Calcite

For the following examples, you will need the provided dependencies included in your project’s Maven Pom.xml file:

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.26.0</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.17.0</version>
</dependency>

Additionally, you will need to include any appropriate Database driver dependencies. In this example, I am going to use Postgres and MySQL to perform a single database query across two different JDBC data sources:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.19</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>

Calcite RelBuilder

Apache Calcite’s RelBuilder class is used to construct sophisticated relational expressions which can then be translated into SQL.

RelBuilder Field

The relBuilder.field method is used to create a RexInputRef object is used as a reference to a field within a relational expression. This method can be used in a number of different ways depending on the context. Let’s take a look at a few different ways this can be used.

The following code takes two tables: cats and dogs within the database petDB and attempts to perform a left join on them using the column called petID:

relBuilder.scan("petDB", "cats")
    .scan("petDB", "dogs").as("dogTableAlias")
    .join(JoinRelType.LEFT,
        relBuilder.equals(
            relBuilder.field(2, 0, "petID"),
            relBuilder.field(2, "dogTableAlias", "petID")
        )
);

The SQL equivalent of this would be:

SELECT * FROM "cats" 
    LEFT JOIN "dogs" ON "cats"."petID" = "dogs"."petID";

Looking at the first field method, you will notice there are 3 parameters being passed.

relBuilder.field(2, 0, "petID")
  • The first parameter 2 is the inputCount. This number should align with the number of scans which are presently on the stack of the relBuilder. In our situation, there are 2 as we called relBuilder.scan() twice in succession.
  • The second parameter is what is referred to as the inputOrdinal. This is a 0-based index that aligns to the scan (or table) which you want to refer to. Since we want to refer to the cats table, this would be 0. Should we want to refer to the dogs table, we could set it to 1.
  • The third parameter is the column name. This can either be a String name of the column, (in this instance, petID). Alternatively, it can also be a fieldOrdinal which is a numerical reference to the order of the fields in which they exist in the table.

RelBuilder Table Aliases using AS

Note the above example in which we scanned the dogs table:

.scan("petDB", "dogs").as("dogTableAlias")

By chaining the as method onto the scan method, we are effectively creating a table alias (dogTableAlias) for us to reference later on in our relational expression.

We can then reference this alias in our field call in lieu of the inputOrdinal:

relBuilder.field(2, "dogTableAlias", "petID")

In contrast, the following code would have referenced the same thing:

relBuilder.field(2, 1, "petID")

Apache Calcite Examples

The following are a number of different applications using Apache Calcite to interact with the database layer. These are intended as real-world usages and examples of how to implement Calcite into a production level application.

Parse and Validate SQL using Calcite

The following code will parse a provided SQL string, in this case SELECT * FROM... and make sure that it is syntactically correct. Since we are passing it to the underlying data source, it will throw an error if the provided table does not exist, or if the query violates the structure of the underlying schema.

public void parseAndValidateSQL() throws SQLException, SqlParseException, ValidationException {
    // Build our connection
    Connection connection = DriverManager.getConnection("jdbc:calcite:");

    // Unwrap our connection using the CalciteConnection
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

    // Define our parser Configuration
    SqlParser.Config parserConfig = SqlParser.config();

    // Get a pointer to our root schema for our Calcite Connection
    SchemaPlus rootSchema = calciteConnection.getRootSchema();


    // Attach our Postgres Jdbc Datasource to our Root Schema
    rootSchema.add("EXAMPLESCHEMA", JdbcSchema.create(rootSchema, "EXAMPLESCHEMA", mockDataSource, null, null));

    Frameworks.ConfigBuilder config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema)
            .parserConfig(parserConfig)
            .context(Contexts.of(calciteConnection.config()));

    Planner planner = Frameworks.getPlanner(config.build());
    SqlNode node = planner.parse("SELECT * FROM EXAMPLESCHEMA.MYTEMPTABLE");

    SqlNode validateNode = planner.validate(node);
    SqlWriter writer = new SqlPrettyWriter();
    validateNode.unparse(writer, 0,0);

    // Print out our formatted SQL to the console
    System.out.println(ImmutableList.of(writer.toSqlString().getSql()));
}

Apache Calcite example for MySQL and PostgreSQL

Apache Calcite’s speciality lies in its ability to query from multiple database sources using a single query. For example, you can easily interact with data that is stored in MySQL and ElasticSearch, using a single query. This data can be joined, aggregated, and returned as a single result set all in an optimized manner using Calcite’s robust Volcano and Hep Planners.

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.verdictdb.commons.DBTablePrinter;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CalciteJdbcExample {

    private static final String POSTGRESQL_SCHEMA = "PUBLIC";
    private static final String MYSQL_SCHEMA = "mysql";

    public static void main(String[] args) throws Exception {

        // Build our connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:");

        // Unwrap our connection using the CalciteConnection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // Get a pointer to our root schema for our Calcite Connection
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // Instantiate a data source, this can be autowired in using Spring as well
        DataSource postgresDataSource = JdbcSchema.dataSource(
                "jdbc:postgresql://localhost/db",
                "org.postgresql.Driver", // Change this if you want to use something like MySQL, Oracle, etc.
                "postgres", // username
                "example"   // password
        );

        // Instantiate a data source, this can be autowired in using Spring as well
        DataSource mysqlDataSource = JdbcSchema.dataSource(
                "jdbc:mysql://localhost/db",
                "com.mysql.jdbc.Driver", // Change this if you want to use something like MySQL, Oracle, etc.
                "Username", // username
                "Password"   // password
        );

        // Attach our Postgres Jdbc Datasource to our Root Schema
        rootSchema.add(POSTGRESQL_SCHEMA, JdbcSchema.create(rootSchema, POSTGRESQL_SCHEMA, postgresDataSource, null, null));

        // Attach our MySQL Jdbc Datasource to our Root Schema
        rootSchema.add(MYSQL_SCHEMA, JdbcSchema.create(rootSchema, MYSQL_SCHEMA, mysqlDataSource, null, null));


        // Build a framework config to attach to our Calcite Planners and  Optimizers
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();

        RelBuilder rb = RelBuilder.create(config);

        RelNode node = rb
                // First parameter is the Schema, the second is the table name
                .scan("PUBLIC", "TABLE_NAME_IN_POSTGRES")
                .scan("mysql", "TABLE_NAME_IN_MYSQL")
                // If you want to select from more than one table, you can do so by adding a second scan parameter
                .filter(
                        rb.equals(rb.field("fieldname"), rb.literal("literal"))
                )
                // These are the fields you want to return from your query
                .project(
                        rb.field("id"),
                        rb.field("col1"),
                        rb.field("colb")
                )
                .build();


        HepProgram program = HepProgram.builder().build();
        HepPlanner planner = new HepPlanner(program);

        planner.setRoot(node);

        RelNode optimizedNode = planner.findBestExp();

        final RelShuttle shuttle = new RelHomogeneousShuttle() {
            @Override public RelNode visit(TableScan scan) {
                final RelOptTable table = scan.getTable();
                if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
                    return Bindables.BindableTableScan.create(scan.getCluster(), table);
                }
                return super.visit(scan);
            }
        };

        optimizedNode = optimizedNode.accept(shuttle);

        final RelRunner runner = connection.unwrap(RelRunner.class);
        PreparedStatement ps = runner.prepare(optimizedNode);

        ps.execute();

        ResultSet resultSet = ps.getResultSet();
        DBTablePrinter.printResultSet(resultSet);
    }
}

Unit Test Example for Calcite

The easiest way to obtain good unit test coverage over your application when using Calcite is to perform mocks using HSQLDB. In the below example, I define a mock HSQL data source and instantiate it using Calcite’s JdbcSchema datasource call. Additionally, I use a RelBuilder to access the data from my temporary database and then actually execute a prepared statement using Calcite’s Relational Expression engine.

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.junit.Before;
import org.junit.Test;
import org.verdictdb.commons.DBTablePrinter;

import javax.sql.DataSource;
import java.sql.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.*;

public class CalciteTests {

    private String jdbcUrl;
    private Connection connection;
    private DataSource mockDataSource;

    @Before
    public void setupTests() throws SQLException {
        jdbcUrl = MockDb.INSTANCE.getUrl();
        connection = DriverManager.getConnection(jdbcUrl, "", "");

        // You can populate test data into the database like so:
        try (Statement s = connection.createStatement()) {
            s.execute("create table mytemptable(" +
                    "id integer not null primary key," +
                    "exampleFoo varchar(25)," +
                    "exampleBar varchar(25))");

            s.execute("insert into mytemptable values(1, 'test', '1234')");
            s.execute("insert into mytemptable values(2, 'test2', 'xyz')");
        }

        mockDataSource = JdbcSchema.dataSource(jdbcUrl, "org.hsqldb.jdbcDriver", "", "");
    }

    @Test
    public void calciteUnitTestExample() throws SQLException {

        // Build our connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:");

        // Unwrap our connection using the CalciteConnection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // Get a pointer to our root schema for our Calcite Connection
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // Attach our Postgres Jdbc Datasource to our Root Schema
        rootSchema.add("exampleSchema", JdbcSchema.create(rootSchema, "exampleSchema", mockDataSource, null, null));

        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();

        RelBuilder r = RelBuilder.create(config);

        RelNode node = r
                // First parameter is the Schema, the second is the table name
                .scan("exampleSchema", "MYTEMPTABLE")
                // If you want to select from more than one table, you can do so by adding a second scan parameter
                .filter(
                        r.equals(r.field("ID"), r.literal(1))
                )
                // These are the fields you want to return from your query
                .project(
                        r.field("ID"),
                        r.field("EXAMPLEFOO"),
                        r.field("EXAMPLEBAR")
                )
                .build();

        RelRunner runner = connection.unwrap(RelRunner.class);
        PreparedStatement ps = runner.prepare(node);

        ps.execute();

        ResultSet resultSet = ps.getResultSet();
        DBTablePrinter.printResultSet(resultSet);
    }

    static class MockDb {
        MockDb() {}
        static final MockDb INSTANCE = new MockDb();
        private final AtomicInteger id = new AtomicInteger(1);

        public String getUrl() {
            return "jdbc:hsqldb:mem:db" + id.getAndIncrement();
        }
    }
}