如何自定義一個Calcite對Tablesaw查詢的適配器


Tablesaw是Java領域中一個比較好用的數據分析工具,可以對數據集進行清洗、轉換、統計等。如果能使用Sql對數據集進行查詢,那就更完美了。使用Calcite這個開源項目完全可以做到,按照官網所說,Calcite適配了Csv、Elasticsearch、Druid等數據源,適配Tablesaw更不在話下。但是官網中沒有提供Tablesaw的適配器,我們可以參考Csv適配器來簡單適配Tablesaw。

1、Calcite如何查詢Sql

使用Sql查詢數據集,想必很多人都會想到Jdbc查詢的那一套,其實Calcite就是基於Jdbc來實現的,也是那幾個步驟:

/**
 * 1. 加載驅動類
 * 2. 創建Connection
 * 3. 創建Statement
 * 4. 執行查詢獲取數據集
 * 5. 關閉資源
*/
public static void exec(String sql) throws SQLException {
    Properties info = new Properties();
    Class.forName("org.apache.calcite.jdbc.Driver");    
    try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info);  
        Statement stat = conn.createStatement()) {      
        final ResultSet resultSet = stat.executeQuery(sql);     
        ....
    }
}

Jdbc查詢在查詢時候會指定連接地址,但是如果對Tablesaw查詢要怎么指定連接地址呢?我們可以通過設置查詢屬性,實現Calcite預留的接口,達到查詢Tablesaw的目的。

2、Calcite的連接屬性

Calcite的驅動由Avatica提供,可以設置的屬性可以查看官網,主要有:

屬性 描述
caseSensitive 大小寫敏感
lex 語法,默認Oracle語法
quoting 標識符的引用語法:對特殊字段的引用,Oracle:"form"
quotedCasing 使用了quoting的標識符,如何排序
unquotedCasing 沒有使用quoting的標識符,如何排序
timeZone 默認JVM時區,不建議特別指定
model 模型文件,指定如何處理數據
schema schema名稱
schemaFactory 創建schema的工廠,model存在則不起效
schemaType schema類型,model存在則不起效
typeSystem 字段類型系統,model存在則不起效

model文件的結構:

{
  "version": "1.0",
  "defaultSchema": "foodmart",
  "schemas": [
    {
      type: 'custom',
      name: 'twissandra',
      factory: 'org.apache.calcite.adapter.cassandra.CassandraSchemaFactory',
      operand: {
        host: 'localhost',
        keyspace: 'twissandra'
      }
    }
  ]
}

schema.json的屬性:

屬性 描述
version 版本
defaultSchema 默認schema
schemas schema數組,可有多個schema組成
schemas.name schema的名稱
schemas.type schema的類型,使用自定義類型需要指定factory
schemas.factory 構造schema的工廠
schemas.operand 構造schema的自定義參數Map
schemas.tables schema里面可以有多個表
schemas.tables.name table的名稱
schemas.tables.type table的類型,使用自定義類型需要指定factory
schemas.tables.factory 構造table的工廠
schemas.tables.operand 構造table的自定義參數Map

可以理解為一個schema.json文件里面有多個schema,一個schema里面有多個table。
我們也可以通過url的方式傳入參數,效果和上面一樣:

jdbc:calcite:schemaFactory=org.apache.calcite.adapter.cassandra.CassandraSchemaFactory; schema.host=localhost; schema.keyspace=twissandra

3、定義Table的結構

我們需要定義Table的結構,來達到結構化查詢的目的。

public abstract class DataFrameTable extends AbstractTable {

    protected tech.tablesaw.api.Table table;

    private RelDataType rowType;

    DataFrameTable(tech.tablesaw.api.Table table) {
        this.table = table;
    }

    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (rowType == null) {
            rowType = createRelDataType(typeFactory);
        }
        return rowType;
    }

    private RelDataType createRelDataType(RelDataTypeFactory typeFactory) {
        List<RelDataType> types = new ArrayList<>();
        List<String> names = new ArrayList<>();
        for (Column<?> column : table.columns()) {
            DataFrameFieldType type = DataFrameFieldType.of(column.type());
            RelDataType relDataType = type.toType((JavaTypeFactory) typeFactory);
            types.add(relDataType);
            names.add(column.name());
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}

定義Table,必不可少定義字段數據類型,需要指定Tablesaw的字段類型和Calcite的字段類型的對應關系

展開查看
enum DataFrameFieldType {
    STRING(String.class, ColumnType.STRING),
    TEXT(String.class, ColumnType.TEXT),
    BOOLEAN(Primitive.BOOLEAN, ColumnType.BOOLEAN),
    SHORT(Primitive.SHORT, ColumnType.SHORT),
    INT(Primitive.INT, ColumnType.INTEGER),
    LONG(Primitive.LONG, ColumnType.LONG),
    FLOAT(Primitive.FLOAT, ColumnType.FLOAT),
    DOUBLE(Primitive.DOUBLE, ColumnType.DOUBLE),
    DATE(java.sql.Date.class, ColumnType.LOCAL_DATE),
    TIME(java.sql.Time.class, ColumnType.LOCAL_TIME),
    TIMESTAMP(java.sql.Timestamp.class, ColumnType.LOCAL_DATE_TIME);

    private final Class<?> clazz;
    private final ColumnType columnType;

    private static final Map<ColumnType, DataFrameFieldType> MAP = new HashMap<>();

    static {
        for (DataFrameFieldType value : values()) {
            MAP.put(value.columnType, value);
        }
    }

    DataFrameFieldType(Primitive primitive, ColumnType columnType) {
        this(primitive.boxClass, columnType);
    }

    DataFrameFieldType(Class<?> clazz, ColumnType columnType) {
        this.clazz = clazz;
        this.columnType = columnType;
    }

    public RelDataType toType(JavaTypeFactory typeFactory) {
        RelDataType javaType = typeFactory.createJavaType(clazz);
        RelDataType sqlType = typeFactory.createSqlType(javaType.getSqlTypeName());
        return typeFactory.createTypeWithNullability(sqlType, true);
    }

    public static DataFrameFieldType of(ColumnType columnType) {
        return MAP.get(columnType);
    }
}

這樣我們就定義好一個Table的結構了,我們定義的Table中,有一個tech.tablesaw.api.Table,這個是用於提供數據和字段結構的,至於怎么來的,后面細說。

4、定義Table的數據枚舉器Enumerator

Calcite定義了3中查詢Table的方式,可以參考

  • ScannableTable:根據全部數據查詢
  • FilterableTable:查詢底層DB時進行一部分的數據過濾,再在內存中查詢
  • TranslatableTable:自定義優化規則

我們以最簡單的ScannableTable為例子查詢。

public class DataFrameScannableTable extends DataFrameTable implements ScannableTable {

    DataFrameScannableTable(tech.tablesaw.api.Table table) {
        super(table);
    }

    public Enumerable<Object[]> scan(DataContext root) {
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new DataFrameEnumerator(table);
            }
        };
    }
}

ScannableTable的scan方法定義了如何獲取Enumerator,而Enumerator是操作數據的關鍵

展開查看
class DataFrameEnumerator implements Enumerator<Object[]> {

    private Enumerator<Object[]> enumerator;

    DataFrameEnumerator(tech.tablesaw.api.Table table) {
        List<Object[]> objs = new ArrayList<>();
        for (int row = 0; row < table.rowCount(); row++) {
            Object[] rows = new Object[table.columnCount()];
            for (int col = 0; col < table.columnCount(); col++) {
                Column<?> column = table.column(col);
                rows[col] = convertToEnumeratorObject(column, row);
            }
            objs.add(rows);
        }
        this.enumerator = Linq4j.enumerator(objs);
    }

    public Object[] current() {
        return enumerator.current();
    }

    public boolean moveNext() {
        return enumerator.moveNext();
    }

    public void reset() {
        enumerator.reset();
    }

    public void close() {
        enumerator.close();
    }

    private Object convertToEnumeratorObject(Column<?> column, int row) {
        final TimeZone gmt = TimeZone.getTimeZone("GMT");
        if (column instanceof DateColumn) {
            return ((DateColumn) column).get(row).toEpochDay();
        } else if (column instanceof TimeColumn) {
            return Date.from(
                    ((TimeColumn) column).get(row)
                            .atDate(LocalDate.ofEpochDay(0))
                            .atZone(gmt.toZoneId())
                            .toInstant()
            ).getTime();
        } else if (column instanceof DateTimeColumn) {
            return Date.from(
                    ((DateTimeColumn) column).get(row)
                            .atZone(gmt.toZoneId())
                            .toInstant()
            ).getTime();
        } else {
            return column.get(row);
        }
    }
}

5、使用其他方法代替model.json傳入參數

至此,我們只需讓Calcite使用我們定義的Table來查詢數據就行了,前面說到可以通過傳入model.json來實現。我們通過自定義的TableFactory來創建Table:

public class DataFrameTableFactory implements TableFactory<DataFrameTable> {

    private tech.tablesaw.api.Table table;

    public DataFrameTableFactory(tech.tablesaw.api.Table table) {
        this.table = table;
    }

    @Override
    public DataFrameTable create(SchemaPlus schema,
                                 String name,
                                 Map<String, Object> operand,
                                 RelDataType rowType) {
        return new DataFrameScannableTable(table);
    }
}

但是我們看到tech.tablesaw.api.Table參數,這個該怎么傳入?如果在有數據的前提下,我們很難通過model.json的operand參數來傳入,model.json適合根據operand參數到數據源獲取數據后再操作。對此,我們使用另外一個方法:

    private static void setTableModel(Connection connection, tech.tablesaw.api.Table table) {
        SchemaPlus rootSchema = ((CalciteConnection) connection).getRootSchema();
        TableFactory<?> tableFactory = new DataFrameTableFactory(table);
        org.apache.calcite.schema.Table t = tableFactory
                .create(rootSchema, table.name(), ImmutableMap.of(), null);
        rootSchema.add(table.name(), t);
    }

該方法主要是獲取CalciteConnection的SchemaPlus,來傳入我們定義的Table。

6、測試

到此,我們應該能用Sql查詢Tablesaw了。我們來測試一下:

展開查看
    @Test
    public void test() throws SQLException {
        tech.tablesaw.api.Table table = tech.tablesaw.api.Table.create("test");

        StringColumn stringColumn = StringColumn.create("A");
        stringColumn.append("bbbbb");
        table.addColumns(stringColumn);

        DateColumn dateColumn = DateColumn.create("B");
        dateColumn.append(LocalDate.now());
        table.addColumns(dateColumn);

        DateTimeColumn dateTimeColumn = DateTimeColumn.create("C");
        dateTimeColumn.append(LocalDateTime.now());
        table.addColumns(dateTimeColumn);

        TimeColumn timeColumn = TimeColumn.create("D");
        timeColumn.append(LocalTime.now());
        table.addColumns(timeColumn);

        Table t = DataFrameQueryUtils.exec(table, "SELECT * FROM \"test\"");
        System.out.println(t);
    }
使用各種Calcite內置的函數也是沒問題的,但是如果使用了額外的函數,可能需要額外定義函數的計算方式。

7、總結

這個例子只用了Table,並沒有使用Schema,其實Schema的原理也差不多,就是定義Table的集合,有需要可以參考官方來自己實現。Calcite做Tablesaw的適配器也不在話下,用二維數組代替Tablesaw也是可以的。雖然如此,但是還是要注意關於時間類型的一些坑,是關於時間類型轉換和時區的一些問題,這個有空再總結。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM