Kafka - SQL 代碼實現


1.概述

  上次給大家分享了關於 Kafka SQL 的實現思路,這次給大家分享如何實現 Kafka SQL。要實現 Kafka SQL,在上一篇《Kafka - SQL 引擎分享》中分享了其實現的思路,核心包含數據源的加載,以及 SQL 樹的映射。今天筆者給大家分享相關實現的代碼。

2.內容

  這里,將數據映射成 SQL Tree 是使用了 Apache Calcite 來承接這部分工作。在實現代碼之前,我們首先來了解下 Apache Calcite 的相關內容,Apache Calcite 是一個面向 Hadoop 的查詢引擎,它提供了業界標准的 SQL 語言,以及多種查詢優化和連接各種存儲介質的適配器。另外,還能處理 OLAP 和流處理場景。因為存在這么多優秀和閃光的特性, Hadoop 生態圈中 Apache Calcite 越發引人注目,被諸多項目所集成,常見的有:

  • Apache Drill:基於大數據的實時查詢引擎
  • Apache Spark:繼 Hadoop 之后的新一代大數據分布式處理框架。
  • 更多詳情,這里就不一一列舉了,詳情查看地址:《Adapters

2.1 數據類型

  這里數據源的數據類型,我們分為兩種,一種是 SQL,另一種是基於編程語言的,這里我們所使用的是 Java,定義內容如下:

public static Map<String, SqlTypeName> SQLTYPE_MAPPING = new HashMap<String, SqlTypeName>();
public static Map<String, Class> JAVATYPE_MAPPING = new HashMap<String, Class>();

public static void initRowType() {
        SQLTYPE_MAPPING.put("char", SqlTypeName.CHAR);
        JAVATYPE_MAPPING.put("char", Character.class);
        SQLTYPE_MAPPING.put("varchar", SqlTypeName.VARCHAR);
        JAVATYPE_MAPPING.put("varchar", String.class);
        // ......     
}

2.2 表的相關描述

  另外,我們需要對表進行一個描述,在關系型數據庫中,一個正常的表由行列組成,定義內容如下:

    public static class Database {
        public List<Table> tables = new LinkedList<Table>();
    }

    public static class Table {
        public String tableName;
        public List<Column> columns = new LinkedList<Column>();
        public List<List<String>> data = new LinkedList<List<String>>();
    }

    public static class Column {
        public String name;
        public String type;
    }

  在每個集合中存儲數據庫相關名稱,每個數據庫存儲多個集合的表對象,每個表對象下面又有一系列的列以及綁定的數據源。在每個列對象中包含字段名和類型,層層遞進,依次關聯。在使用 Calcite 是,需要遵循其 JSON Model,上篇博客我們已經定義過其 JSON Model,這里我們直接拿來使用,內容如下:

{
    version: '1.0',
    defaultSchema: 'kafka',  
    schemas: [  
        {
            name: 'kafka',  
            type: 'custom',
            factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
            operand: {
                database: 'kafka_db'
            }  
        } 
    ]
}

   要實現其 Model ,這里需要我們去實現 org.apache.calcite.schema.SchemaFactory 的接口,內容如下所示:

public class KafkaMemorySchemaFactory implements SchemaFactory {
    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        return new KafkaMemorySchema(name);
    }
}

  而在 KafkaMemorySchema 類中,我們只需要實現它的 getTableMap 方法,內容如下所示:

 @Override
 protected Map<String, Table> getTableMap() {
   Map<String, Table> tables = new HashMap<String, Table>();
    Database database = KafkaMemoryData.MAP.get(this.dbName);
    if (database == null)
      return tables;
    for (KafkaMemoryData.Table table : database.tables) {
      tables.put(table.tableName, new KafkaMemoryTable(table));
    }
    return tables;
 }

  從上述代碼中,可以知道通過內存中的 Map 表查看對應的數據庫對象,然后根據數據庫對象中的表作為 Schema 中的表,而表的類型為 KafkaMemoryTable。

2.3 表類型

  這里筆者就直接使用全表掃描,使用 org.apache.calcite.schema.impl.AbstractTable 的默認方式,實現其 getRowType 方法和 scan 方法,內容如下所示:

public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  if(dataType == null) {
     RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
      for (KafkaMemoryData.Column column : this.sourceTable.columns) {
        RelDataType sqlType = typeFactory.createJavaType(
        KafkaMemoryData.JAVATYPE_MAPPING.get(column.type));
        sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory);
        fieldInfo.add(column.name, sqlType);
      }
      this.dataType = typeFactory.createStructType(fieldInfo);
   }
   return this.dataType;
}
public Enumerable<Object[]> scan(DataContext root) {
        final List<String> types = new ArrayList<String>(sourceTable.columns.size());
        for(KafkaMemoryData.Column column : sourceTable.columns) {
            types.add(column.type);
        }
        final int[] fields = identityList(this.dataType.getFieldCount());
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new KafkaMemoryEnumerator<Object[]>(fields, types, sourceTable.data);
            }
        };
    }

  代碼中,表中的字段名和類型是根據初始化時,每個表中的數據類型映射匹配的,在 KafkaMemoryData.SQLTYPE_MAPPING 和 KafkaMemoryData.JAVATYPE_MAPPING 中有描述相關自定義類型映射,這里就不多做贅述了。

  實現流程大致就是這個樣子,將每次的 SQL 查詢,通過 Calcite 解析成標准可執行的 SQL 計划,執行期間會根據定義的信息,初始化每一個 Schema,在通過調用 getTableMap 獲取字段名和類型,根據這些信息判斷查詢的表,字段名,類型以及 SQL 語法是否標准規范。然后在使用 Calcite 內部機制,生成物理執行計划。查詢計划是 Tree 形式的,底層是進行掃表操作(可看作為 FROM),獲取每個表的數據,之后在根據表數據進行上層的關聯操作,如 JOIN,GROUP BY,LIMIT 等操作。

3.測試

  完成上述流程后,進行代碼測試,測試代碼如下所示:

public static void main(String[] args) {
        try {
            Class.forName("org.apache.calcite.jdbc.Driver");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        Properties info = new Properties();
        try {
            Connection connection = DriverManager.getConnection("jdbc:calcite:model=/Users/dengjie/hadoop/workspace/kafka/kafka-visual/src/main/resources/plugins.json",info);    
            Statement st = connection.createStatement();
            // String sql = "select * from \"Kafka\" where \"_plat\"='1004' limit 1";
            String sql = "select * from \"Kafka\" limit 10";

            long start = System.currentTimeMillis();
            result = st.executeQuery(sql);
            ResultSetMetaData rsmd = result.getMetaData();
            List<Map<String, Object>> ret = new ArrayList<Map<String,Object>>();
            
            while (result.next()) {
                Map<String, Object> map = new HashMap<String, Object>();
                for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                    System.out.print(result.getString(rsmd.getColumnName(i)) + " ");
                    map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
                }
                ret.add(map);
                System.out.println();
            }
            System.out.println(new Gson().toJson(ret));       
            result.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

4.總結

  以上便是將 Kafka 中數據消費后,作為數據源加載和 SQL Tree 映射的實現代碼,實現不算太困難,在編寫 SQL 查詢的時候,需要遵循標准的 SQL 語法來操作數據源。

5.結束語

  這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM