Tablesaw是Java領域中一個比較好用的數據分析工具,可以對數據集進行清洗、轉換、統計等。如果能使用Sql對數據集進行查詢,那就更完美了。使用Calcite這個開源項目完全可以做到,按照官網所說,Calcite適配了Csv、Elasticsearch、Druid等數據源,適配Tablesaw更不在話下。但是官網中沒有提供Tablesaw的適配器,我們可以參考Csv適配器來簡單適配Tablesaw。
1、Calcite如何查詢Sql
使用Sql查詢數據集,想必很多人都會想到Jdbc查詢的那一套,其實Calcite就是基於Jdbc來實現的,也是那幾個步驟:
/**
* 1. 加載驅動類
* 2. 創建Connection
* 3. 創建Statement
* 4. 執行查詢獲取數據集
* 5. 關閉資源
*/
public static void exec(String sql) throws SQLException {
Properties info = new Properties();
Class.forName("org.apache.calcite.jdbc.Driver");
try (Connection conn = DriverManager.getConnection("jdbc:calcite:", info);
Statement stat = conn.createStatement()) {
final ResultSet resultSet = stat.executeQuery(sql);
....
}
}
Jdbc查詢在查詢時候會指定連接地址,但是如果對Tablesaw查詢要怎么指定連接地址呢?我們可以通過設置查詢屬性,實現Calcite預留的接口,達到查詢Tablesaw的目的。
2、Calcite的連接屬性
Calcite的驅動由Avatica提供,可以設置的屬性可以查看官網,主要有:
| 屬性 | 描述 |
|---|---|
| caseSensitive | 大小寫敏感 |
| lex | 語法,默認Oracle語法 |
| quoting | 標識符的引用語法:對特殊字段的引用,Oracle:"form" |
| quotedCasing | 使用了quoting的標識符,如何排序 |
| unquotedCasing | 沒有使用quoting的標識符,如何排序 |
| timeZone | 默認JVM時區,不建議特別指定 |
| model | 模型文件,指定如何處理數據 |
| schema | schema名稱 |
| schemaFactory | 創建schema的工廠,model存在則不起效 |
| schemaType | schema類型,model存在則不起效 |
| typeSystem | 字段類型系統,model存在則不起效 |
model文件的結構:
{
"version": "1.0",
"defaultSchema": "foodmart",
"schemas": [
{
type: 'custom',
name: 'twissandra',
factory: 'org.apache.calcite.adapter.cassandra.CassandraSchemaFactory',
operand: {
host: 'localhost',
keyspace: 'twissandra'
}
}
]
}
schema.json的屬性:
| 屬性 | 描述 |
|---|---|
| version | 版本 |
| defaultSchema | 默認schema |
| schemas | schema數組,可有多個schema組成 |
| schemas.name | schema的名稱 |
| schemas.type | schema的類型,使用自定義類型需要指定factory |
| schemas.factory | 構造schema的工廠 |
| schemas.operand | 構造schema的自定義參數Map |
| schemas.tables | schema里面可以有多個表 |
| schemas.tables.name | table的名稱 |
| schemas.tables.type | table的類型,使用自定義類型需要指定factory |
| schemas.tables.factory | 構造table的工廠 |
| schemas.tables.operand | 構造table的自定義參數Map |
可以理解為一個schema.json文件里面有多個schema,一個schema里面有多個table。
我們也可以通過url的方式傳入參數,效果和上面一樣:
jdbc:calcite:schemaFactory=org.apache.calcite.adapter.cassandra.CassandraSchemaFactory; schema.host=localhost; schema.keyspace=twissandra
3、定義Table的結構
我們需要定義Table的結構,來達到結構化查詢的目的。
public abstract class DataFrameTable extends AbstractTable {
protected tech.tablesaw.api.Table table;
private RelDataType rowType;
DataFrameTable(tech.tablesaw.api.Table table) {
this.table = table;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (rowType == null) {
rowType = createRelDataType(typeFactory);
}
return rowType;
}
private RelDataType createRelDataType(RelDataTypeFactory typeFactory) {
List<RelDataType> types = new ArrayList<>();
List<String> names = new ArrayList<>();
for (Column<?> column : table.columns()) {
DataFrameFieldType type = DataFrameFieldType.of(column.type());
RelDataType relDataType = type.toType((JavaTypeFactory) typeFactory);
types.add(relDataType);
names.add(column.name());
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
定義Table,必不可少定義字段數據類型,需要指定Tablesaw的字段類型和Calcite的字段類型的對應關系
展開查看
enum DataFrameFieldType {
STRING(String.class, ColumnType.STRING),
TEXT(String.class, ColumnType.TEXT),
BOOLEAN(Primitive.BOOLEAN, ColumnType.BOOLEAN),
SHORT(Primitive.SHORT, ColumnType.SHORT),
INT(Primitive.INT, ColumnType.INTEGER),
LONG(Primitive.LONG, ColumnType.LONG),
FLOAT(Primitive.FLOAT, ColumnType.FLOAT),
DOUBLE(Primitive.DOUBLE, ColumnType.DOUBLE),
DATE(java.sql.Date.class, ColumnType.LOCAL_DATE),
TIME(java.sql.Time.class, ColumnType.LOCAL_TIME),
TIMESTAMP(java.sql.Timestamp.class, ColumnType.LOCAL_DATE_TIME);
private final Class<?> clazz;
private final ColumnType columnType;
private static final Map<ColumnType, DataFrameFieldType> MAP = new HashMap<>();
static {
for (DataFrameFieldType value : values()) {
MAP.put(value.columnType, value);
}
}
DataFrameFieldType(Primitive primitive, ColumnType columnType) {
this(primitive.boxClass, columnType);
}
DataFrameFieldType(Class<?> clazz, ColumnType columnType) {
this.clazz = clazz;
this.columnType = columnType;
}
public RelDataType toType(JavaTypeFactory typeFactory) {
RelDataType javaType = typeFactory.createJavaType(clazz);
RelDataType sqlType = typeFactory.createSqlType(javaType.getSqlTypeName());
return typeFactory.createTypeWithNullability(sqlType, true);
}
public static DataFrameFieldType of(ColumnType columnType) {
return MAP.get(columnType);
}
}
這樣我們就定義好一個Table的結構了,我們定義的Table中,有一個tech.tablesaw.api.Table,這個是用於提供數據和字段結構的,至於怎么來的,后面細說。
4、定義Table的數據枚舉器Enumerator
Calcite定義了3中查詢Table的方式,可以參考
- ScannableTable:根據全部數據查詢
- FilterableTable:查詢底層DB時進行一部分的數據過濾,再在內存中查詢
- TranslatableTable:自定義優化規則
我們以最簡單的ScannableTable為例子查詢。
public class DataFrameScannableTable extends DataFrameTable implements ScannableTable {
DataFrameScannableTable(tech.tablesaw.api.Table table) {
super(table);
}
public Enumerable<Object[]> scan(DataContext root) {
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new DataFrameEnumerator(table);
}
};
}
}
ScannableTable的scan方法定義了如何獲取Enumerator,而Enumerator是操作數據的關鍵
展開查看
class DataFrameEnumerator implements Enumerator<Object[]> {
private Enumerator<Object[]> enumerator;
DataFrameEnumerator(tech.tablesaw.api.Table table) {
List<Object[]> objs = new ArrayList<>();
for (int row = 0; row < table.rowCount(); row++) {
Object[] rows = new Object[table.columnCount()];
for (int col = 0; col < table.columnCount(); col++) {
Column<?> column = table.column(col);
rows[col] = convertToEnumeratorObject(column, row);
}
objs.add(rows);
}
this.enumerator = Linq4j.enumerator(objs);
}
public Object[] current() {
return enumerator.current();
}
public boolean moveNext() {
return enumerator.moveNext();
}
public void reset() {
enumerator.reset();
}
public void close() {
enumerator.close();
}
private Object convertToEnumeratorObject(Column<?> column, int row) {
final TimeZone gmt = TimeZone.getTimeZone("GMT");
if (column instanceof DateColumn) {
return ((DateColumn) column).get(row).toEpochDay();
} else if (column instanceof TimeColumn) {
return Date.from(
((TimeColumn) column).get(row)
.atDate(LocalDate.ofEpochDay(0))
.atZone(gmt.toZoneId())
.toInstant()
).getTime();
} else if (column instanceof DateTimeColumn) {
return Date.from(
((DateTimeColumn) column).get(row)
.atZone(gmt.toZoneId())
.toInstant()
).getTime();
} else {
return column.get(row);
}
}
}
5、使用其他方法代替model.json傳入參數
至此,我們只需讓Calcite使用我們定義的Table來查詢數據就行了,前面說到可以通過傳入model.json來實現。我們通過自定義的TableFactory來創建Table:
public class DataFrameTableFactory implements TableFactory<DataFrameTable> {
private tech.tablesaw.api.Table table;
public DataFrameTableFactory(tech.tablesaw.api.Table table) {
this.table = table;
}
@Override
public DataFrameTable create(SchemaPlus schema,
String name,
Map<String, Object> operand,
RelDataType rowType) {
return new DataFrameScannableTable(table);
}
}
但是我們看到tech.tablesaw.api.Table參數,這個該怎么傳入?如果在有數據的前提下,我們很難通過model.json的operand參數來傳入,model.json適合根據operand參數到數據源獲取數據后再操作。對此,我們使用另外一個方法:
private static void setTableModel(Connection connection, tech.tablesaw.api.Table table) {
SchemaPlus rootSchema = ((CalciteConnection) connection).getRootSchema();
TableFactory<?> tableFactory = new DataFrameTableFactory(table);
org.apache.calcite.schema.Table t = tableFactory
.create(rootSchema, table.name(), ImmutableMap.of(), null);
rootSchema.add(table.name(), t);
}
該方法主要是獲取CalciteConnection的SchemaPlus,來傳入我們定義的Table。
6、測試
到此,我們應該能用Sql查詢Tablesaw了。我們來測試一下:
展開查看
@Test
public void test() throws SQLException {
tech.tablesaw.api.Table table = tech.tablesaw.api.Table.create("test");
StringColumn stringColumn = StringColumn.create("A");
stringColumn.append("bbbbb");
table.addColumns(stringColumn);
DateColumn dateColumn = DateColumn.create("B");
dateColumn.append(LocalDate.now());
table.addColumns(dateColumn);
DateTimeColumn dateTimeColumn = DateTimeColumn.create("C");
dateTimeColumn.append(LocalDateTime.now());
table.addColumns(dateTimeColumn);
TimeColumn timeColumn = TimeColumn.create("D");
timeColumn.append(LocalTime.now());
table.addColumns(timeColumn);
Table t = DataFrameQueryUtils.exec(table, "SELECT * FROM \"test\"");
System.out.println(t);
}
7、總結
這個例子只用了Table,並沒有使用Schema,其實Schema的原理也差不多,就是定義Table的集合,有需要可以參考官方來自己實現。Calcite做Tablesaw的適配器也不在話下,用二維數組代替Tablesaw也是可以的。雖然如此,但是還是要注意關於時間類型的一些坑,是關於時間類型轉換和時區的一些問題,這個有空再總結。
