MIT 6.830 LAB2 DBOperator


2021/04/01-2021/04/05

Preface

Course schedule: http://db.lcs.mit.edu/6.830/sched.php

Code: https://github.com/MIT-DB-Class/simple-db-hw

Course info: https://github.com/MIT-DB-Class/course-info-2018/

I finished Lab 2 in bits of spare time over the Qingming holiday. Lab 2 is essentially about implementing SimpleDB's basic operators: the familiar select, join, aggregate, insert, delete, and so on.

LAB2

exercise1 Join & Filter

The xxxPredicate classes are helpers that test whether a Field satisfies a condition, and thus whether a Tuple should be kept;

Filter extends Operator: it filters Tuples through a Predicate, i.e. it keeps the Tuples that satisfy something like where xxx="xxx";

Join extends Operator and implements the join operation of relational algebra.

For examples of how to extend Operator, see Project.java or OrderBy.java.

Implement:

  • src/simpledb/Predicate.java
  • src/simpledb/JoinPredicate.java
  • src/simpledb/Filter.java
  • src/simpledb/Join.java
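
To make the Predicate/Filter division concrete: a predicate tests one field of a tuple against a constant, and the filter keeps only the tuples that pass. A minimal sketch of that idea in plain Java (SimpleFilter and the int[]-as-tuple representation are invented here for illustration, not SimpleDB's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical stand-in for SimpleDB's Predicate/Filter pair, illustration only.
class SimpleFilter {
    // Keep only the int[] "tuples" whose field at fieldIndex satisfies the
    // predicate, mirroring the skip-until-match loop in Filter's fetchNext.
    static List<int[]> filter(List<int[]> tuples, int fieldIndex, IntPredicate p) {
        List<int[]> out = new ArrayList<>();
        for (int[] t : tuples) {
            if (p.test(t[fieldIndex])) {
                out.add(t);
            }
        }
        return out;
    }
}
```

The real Filter does the same thing lazily: its fetchNext pulls from the child iterator until the predicate accepts a tuple, then returns it.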

The hard part of Join is fetchNext: since this is a nested-loop join, the Join class needs an extra member variable private Tuple t1; to remember the current outer tuple across calls.

    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        while (t1!=null||child1.hasNext()){
            if(t1==null){ // init t1 if null
                if(child1.hasNext()){
                    t1 = child1.next();
                }else{
                    return null;
                }
            }
            if(!child2.hasNext()){ // child2 exhausted: rewind it and advance t1
                if(child1.hasNext()){
                    child2.rewind();
                    t1 = child1.next();
                }else{
                    return null;
                }
            }
            while (child2.hasNext()){
                Tuple t2 = child2.next();
                if(joinPredicate.filter(t1,t2)){
                    Tuple res = new Tuple(getTupleDesc());
                    for (int i = 0; i < t1.getTupleDesc().numFields(); i++) {
                        res.setField(i,t1.getField(i));
                    }
                    for (int i = 0; i < t2.getTupleDesc().numFields(); i++) {
                        res.setField(t1.getTupleDesc().numFields()+i,t2.getField(i));
                    }
                    return res;
                }
            }
        }
        return null;
    }

The base class Operator here is itself an OpIterator. Its next/hasNext are implemented with the template-method design pattern, using a single Tuple as a one-slot cache, which is quite elegant.

Subclasses (the Join and Filter implemented here) only need to provide fetchNext.

public abstract class Operator implements OpIterator {

    private Tuple next = null;
    
    public boolean hasNext() throws DbException, TransactionAbortedException {
        if (!this.open)
            throw new IllegalStateException("Operator not yet open");
        
        if (next == null)
            next = fetchNext();
        return next != null;
    }

    public Tuple next() throws DbException, TransactionAbortedException,
            NoSuchElementException {
        if (next == null) {
            next = fetchNext();
            if (next == null)
                throw new NoSuchElementException();
        }

        Tuple result = next;
        next = null;
        return result;
    }

    /**
     * Returns the next Tuple in the iterator, or null if the iteration is
     * finished. Operator uses this method to implement both <code>next</code>
     * and <code>hasNext</code>.
     * 
     * @return the next Tuple in the iterator, or null if the iteration is
     *         finished.
     */
    protected abstract Tuple fetchNext() throws DbException,
            TransactionAbortedException;
    
    // ...
}

exercise2 Aggregate

Aggregate extends Operator and implements aggregation, e.g. select count(*) from xx group by xxx.

IntegerAggregator and StringAggregator are helper classes for Aggregate.

The difference between them is the type of the aggregated field: an Integer field supports count, sum, max, min, and avg; a String field supports only count.

Implement:

  • src/simpledb/IntegerAggregator.java
  • src/simpledb/StringAggregator.java
  • src/simpledb/Aggregate.java

Note that the group-by field may be absent, i.e. aggregate functions may be used without a group by. This matches MySQL syntax such as select count(*) from user;.

Overall the only real difficulty is IntegerAggregator: StringAggregator can be treated as a stripped-down version of it, and Aggregate is just a thin wrapper that delegates to the xxxAggregator.

My implementation differs from most code found online in that it leans heavily on the dependency-inversion principle.

Taking IntegerAggregator as the example: the five aggregate operations are hidden behind a private abstract class GBHandler, with one concrete handler per operation selected via the strategy pattern, which keeps the code readable and easy to extend. The aggregation logic lives in the handlers; the top-level IntegerAggregator only wraps the computed results in a TupleIterator and returns them.

/**
 * Knows how to compute some aggregate over a set of IntFields.
 *
 * finish in lab2 exercise2
 */
public class IntegerAggregator implements Aggregator {

    private static final long serialVersionUID = 1L;

    private static final String NO_GROUPING_KEY = "NO_GROUPING_KEY";
    /**
     * the 0-based index of the group-by field in the tuple,
     * or NO_GROUPING if there is no grouping
     */
    private int gbFieldIndex;
    /**
     * the type of the group by field (e.g., Type.INT_TYPE),
     * or null if there is no grouping
     */
    private Type gbFieldType;
    /**
     * the 0-based index of the aggregate field in the tuple
     */
    private int aggregateFieldIndex;
    /**
     * the aggregation operator
     */
    private Op aggregationOp;

    private GBHandler gbHandler;

    /**
     * Aggregate constructor
     * 
     * @param gbfield
     *            the 0-based index of the group-by field in the tuple, or
     *            NO_GROUPING if there is no grouping
     * @param gbfieldtype
     *            the type of the group by field (e.g., Type.INT_TYPE), or null
     *            if there is no grouping
     * @param afield
     *            the 0-based index of the aggregate field in the tuple
     * @param what
     *            the aggregation operator
     */

    public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
        this.gbFieldIndex = gbfield;
        this.gbFieldType = gbfieldtype;
        this.aggregateFieldIndex = afield;
        this.aggregationOp = what;
        switch (what){
            case MIN:
                gbHandler =new MinHandler();
                break;
            case MAX:
                gbHandler =new MaxHandler();
                break;
            case AVG:
                gbHandler =new AvgHandler();
                break;
            case SUM:
                gbHandler =new SumHandler();
                break;
            case COUNT:
                gbHandler =new CountHandler();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported aggregation operator ");
        }
    }

    /**
     * Merge a new tuple into the aggregate, grouping as indicated in the
     * constructor
     * 
     * @param tup
     *            the Tuple containing an aggregate field and a group-by field
     */
    public void mergeTupleIntoGroup(Tuple tup) {
        if(gbFieldType!=null&&(!tup.getField(gbFieldIndex).getType().equals(gbFieldType))){
            throw new IllegalArgumentException("Given tuple has wrong type");
        }
        String key;
        if (gbFieldIndex == NO_GROUPING) {
            key = NO_GROUPING_KEY;
        } else {
            key = tup.getField(gbFieldIndex).toString();
        }
        gbHandler.handle(key,tup.getField(aggregateFieldIndex));
    }
    /**
     * Create a OpIterator over group aggregate results.
     * 
     * @return a OpIterator whose tuples are the pair (groupVal, aggregateVal)
     *         if using group, or a single (aggregateVal) if no grouping. The
     *         aggregateVal is determined by the type of aggregate specified in
     *         the constructor.
     */
    public OpIterator iterator() {
        Map<String,Integer> results = gbHandler.getGbResult();
        Type[] types;
        String[] names;
        TupleDesc tupleDesc;
        List<Tuple> tuples = new ArrayList<>();
        if(gbFieldIndex==NO_GROUPING){
            types = new Type[]{Type.INT_TYPE};
            names = new String[]{"aggregateVal"};
            tupleDesc = new TupleDesc(types,names);
            for(Integer value:results.values()){
                Tuple tuple = new Tuple(tupleDesc);
                tuple.setField(0,new IntField(value));
                tuples.add(tuple);
            }
        }else{
            types = new Type[]{gbFieldType,Type.INT_TYPE};
            names = new String[]{"groupVal","aggregateVal"};
            tupleDesc = new TupleDesc(types,names);
            for(Map.Entry<String,Integer> entry:results.entrySet()){
                Tuple tuple = new Tuple(tupleDesc);
                if(gbFieldType==Type.INT_TYPE){
                    tuple.setField(0,new IntField(Integer.parseInt(entry.getKey())));
                }else{
                    tuple.setField(0,new StringField(entry.getKey(),entry.getKey().length()));
                }
                tuple.setField(1,new IntField(entry.getValue()));
                tuples.add(tuple);
            }
        }
        return new TupleIterator(tupleDesc,tuples);
    }

    private abstract class GBHandler{
        ConcurrentHashMap<String,Integer> gbResult;
        abstract void handle(String key,Field field);
        private GBHandler(){
            gbResult = new ConcurrentHashMap<>();
        }
        public Map<String,Integer> getGbResult(){
            return gbResult;
        }
    }
    private class CountHandler extends GBHandler {
        @Override
        public void handle(String key, Field field) {
            if(gbResult.containsKey(key)){
                gbResult.put(key,gbResult.get(key)+1);
            }else{
                gbResult.put(key,1);
            }
        }
    }
    private class SumHandler extends GBHandler{
        @Override
        public void handle(String key, Field field) {
            if(gbResult.containsKey(key)){
                gbResult.put(key,gbResult.get(key)+Integer.parseInt(field.toString()));
            }else{
                gbResult.put(key,Integer.parseInt(field.toString()));
            }
        }
    }
    private class MinHandler extends GBHandler{
        @Override
        void handle(String key, Field field) {
            int tmp = Integer.parseInt(field.toString());
            if(gbResult.containsKey(key)){
                int res = gbResult.get(key)<tmp?gbResult.get(key):tmp;
                gbResult.put(key, res);
            }else{
                gbResult.put(key,tmp);
            }
        }
    }
    private class MaxHandler extends GBHandler{
        @Override
        void handle(String key, Field field) {
            int tmp = Integer.parseInt(field.toString());
            if(gbResult.containsKey(key)){
                int res = gbResult.get(key)>tmp?gbResult.get(key):tmp;
                gbResult.put(key, res);
            }else{
                gbResult.put(key,tmp);
            }
        }
    }
    private class AvgHandler extends GBHandler{
        ConcurrentHashMap<String,Integer> sum;
        ConcurrentHashMap<String,Integer> count;
        private AvgHandler(){
            count = new ConcurrentHashMap<>();
            sum = new ConcurrentHashMap<>();
        }
        @Override
        public void handle(String key, Field field) {
            int tmp = Integer.parseInt(field.toString());
            if(gbResult.containsKey(key)){
                count.put(key,count.get(key)+1);
                sum.put(key,sum.get(key)+tmp);
            }else{
                count.put(key,1);
                sum.put(key,tmp);
            }
            gbResult.put(key,sum.get(key)/count.get(key));
        }
    }
}

The StringAggregator implementation is similar to the Integer one; only count needs to be supported.

The Aggregate class just adds a layer of wrapping: it repeatedly calls the interface the Aggregator provides.
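
Since count ignores the aggregated value entirely, the core of a string aggregator is just a per-group counter. A self-contained sketch of that counting logic (CountAggregator is an invented illustration, not SimpleDB's StringAggregator; plain strings stand in for Fields):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative count-only aggregator, not SimpleDB's actual StringAggregator.
class CountAggregator {
    private final Map<String, Integer> counts = new HashMap<>();

    // Merge one tuple into the aggregate: only the group key matters,
    // the string value being counted is ignored.
    void merge(String groupKey) {
        counts.merge(groupKey, 1, Integer::sum);
    }

    Map<String, Integer> results() {
        return counts;
    }
}
```

The real StringAggregator additionally wraps these per-group counts into (groupVal, aggregateVal) tuples in iterator(), just as IntegerAggregator does.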

exercise3 HeapFile Mutability

This part asks for insert and delete in HeapPage and HeapFile; the main work is updating the slot-usage bitmap in the page header.

Implement the remaining parts of:

  • src/simpledb/HeapPage.java

  • src/simpledb/HeapFile.java

    (Note that you do not necessarily need to implement writePage at this point).

  • src/simpledb/BufferPool.java, specifically:

    • insertTuple()
    • deleteTuple()

Notes:

  1. All of HeapFile's page reads should go through the BufferPool, for example when appending a new page.
  2. BufferPool's insertTuple needs to do two things:
    • call HeapFile's insertTuple
    • put the dirtied pages into the cache; I forgot this step at first and kept failing tests
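
The two steps can be sketched with toy classes (ToyPage, ToyFile, and ToyBufferPool are invented names; SimpleDB's real signatures differ): the file performs the insert and reports which pages it dirtied, and the pool must cache those dirty pages, or later reads would see a stale on-disk copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-ins for HeapPage/HeapFile/BufferPool, illustration only.
class ToyPage {
    final int id;
    boolean dirty = false;
    ToyPage(int id) { this.id = id; }
}

class ToyFile {
    // Pretend the insert touched page 0 and return it marked dirty.
    List<ToyPage> insertTuple() {
        ToyPage p = new ToyPage(0);
        p.dirty = true;
        List<ToyPage> touched = new ArrayList<>();
        touched.add(p);
        return touched;
    }
}

class ToyBufferPool {
    final Map<Integer, ToyPage> cache = new HashMap<>();

    // Step 1: delegate the insert to the file.
    // Step 2: put every dirtied page back into the cache.
    void insertTuple(ToyFile f) {
        for (ToyPage p : f.insertTuple()) {
            cache.put(p.id, p);
        }
    }
}
```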

exercise4 Insert & Delete

Just call the methods implemented in exercise3.

Implement:

  • src/simpledb/Insert.java
  • src/simpledb/Delete.java

Note that delete needs no tableId while insert does: when deleting, the tableId can be recovered from the Tuple's RecordId.
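
That design point can be illustrated with toy types (ToyRecordId and ToyTuple are invented names, not SimpleDB's exact classes): because a tuple's record id remembers which table it lives in, a delete operator can recover the tableId on its own.

```java
// Toy illustration of why Delete needs no tableId: the record id carries it.
class ToyRecordId {
    final int tableId;
    final int pageNo;
    final int slot;
    ToyRecordId(int tableId, int pageNo, int slot) {
        this.tableId = tableId;
        this.pageNo = pageNo;
        this.slot = slot;
    }
}

class ToyTuple {
    final ToyRecordId rid;
    ToyTuple(ToyRecordId rid) { this.rid = rid; }

    // A delete operator can ask the tuple where it came from.
    int tableIdForDelete() { return rid.tableId; }
}
```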

exercise5 page eviction

Implement a page-eviction policy in BufferPool. The main interfaces are:

  1. discardPage(PageId pid): removes a page from the cache
  2. flushPage(PageId pid): if the page is dirty, writes it back to disk and clears its dirty bit
  3. evictPage(): an eviction policy of your own choosing; I implement random eviction here
  4. flushAllPages(): only used by the tests

Then update the previously implemented BufferPool operations so that the cache never holds more than numPages pages.

Implement:

  • src/simpledb/BufferPool.java

At the core I use the simplest random-replacement policy, which leaves an obvious optimization point for later: it could be upgraded to LRU or LFU.

    /**
     * Discards a page from the buffer pool.
     * Flushes the page to disk to ensure dirty pages are updated on disk.
     */
    private synchronized  void evictPage() throws DbException {
        // simplest possible policy: evict a page chosen at random
        List<Integer> keys = new ArrayList<>(pages.keySet());
        int randomKey = keys.get(new Random().nextInt(keys.size()));
        PageId evictPid = pages.get(randomKey).getId();
        try {
            flushPage(evictPid);
        } catch (IOException e) {
            e.printStackTrace();
        }
        discardPage(evictPid);
    }

One point to watch: a dirty page must be flushed to disk before it is evicted from the BufferPool, otherwise there is a risk of data inconsistency.
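
As for the LRU upgrade mentioned earlier, it falls out almost for free from java.util.LinkedHashMap in access order. A minimal sketch (capacity and types are illustrative, not SimpleDB's BufferPool; a real implementation would flush a dirty page before dropping it):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative LRU cache; not SimpleDB's actual BufferPool.
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruCache(int capacity) {
        // accessOrder=true: get() moves an entry to the most-recent end.
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least-recently-used entry once over capacity.
        return size() > capacity;
    }
}
```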


