MIT 6.830 LAB2 DBOperator
2021/04/01-2021/04/05
Preface
Course page: http://db.lcs.mit.edu/6.830/sched.php
Code: https://github.com/MIT-DB-Class/simple-db-hw
Handouts: https://github.com/MIT-DB-Class/course-info-2018/
I finished Lab 2 in spare moments over the Qingming holiday. Lab 2 is essentially about implementing the basic operators for simpledb: the familiar select, join, aggregate, insert, delete, and so on.
LAB2
exercise1 Join&Filter
The xxxPredicate classes are helpers, generally used to decide whether a Field satisfies a condition when filtering Tuples.
Filter extends the Operator base class; it filters Tuples according to a Predicate, roughly like selecting the Tuples that satisfy where xxx="xxx".
Join extends the Operator base class and implements the join operation from relational algebra.
For how to implement an Operator subclass, Project.java and OrderBy.java are good references.
Implementation
- src/simpledb/Predicate.java
- src/simpledb/JoinPredicate.java
- src/simpledb/Filter.java
- src/simpledb/Join.java
The tricky part of Join is fetchNext: because this is a nested-loop join, the Join class needs an extra member variable private Tuple t1; to remember the current outer tuple across calls.
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    while (t1 != null || child1.hasNext()) {
        if (t1 == null) { // init t1 if null
            if (child1.hasNext()) {
                t1 = child1.next();
            } else {
                return null;
            }
        }
        if (!child2.hasNext()) { // child2 exhausted: rewind it and advance t1
            if (child1.hasNext()) {
                child2.rewind();
                t1 = child1.next();
            } else {
                return null;
            }
        }
        while (child2.hasNext()) {
            Tuple t2 = child2.next();
            if (joinPredicate.filter(t1, t2)) {
                Tuple res = new Tuple(getTupleDesc());
                for (int i = 0; i < t1.getTupleDesc().numFields(); i++) {
                    res.setField(i, t1.getField(i));
                }
                for (int i = 0; i < t2.getTupleDesc().numFields(); i++) {
                    res.setField(t1.getTupleDesc().numFields() + i, t2.getField(i));
                }
                return res;
            }
        }
    }
    return null;
}
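As a sanity check on the semantics, the same nested-loop join can be sketched in a self-contained, eager form, with int arrays standing in for the two children and equality as the predicate (all names here are illustrative, not SimpleDB's API):

```java
import java.util.ArrayList;
import java.util.List;

// Eager version of the nested-loop join above, with int arrays standing in
// for child1/child2 and equality as the join predicate.
// All names are illustrative; this is not SimpleDB's API.
class NestedLoopJoinSketch {
    static List<int[]> join(int[] left, int[] right) {
        List<int[]> out = new ArrayList<>();
        for (int l : left) {          // outer loop: each t1 in turn
            for (int r : right) {     // inner loop: full scan of child2 ("rewind")
                if (l == r) {         // joinPredicate.filter(t1, t2)
                    out.add(new int[]{l, r}); // concatenated result tuple
                }
            }
        }
        return out;
    }
}
```

The real fetchNext produces the same pairs, but one at a time, which is why the current t1 has to survive between calls.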
The base class Operator here is itself an OpIterator. Its next/hasNext follow the template method pattern, using a single Tuple as a one-element cache, which is quite elegant:
Subclasses (the Join and Filter we implement here) only need to implement fetchNext.
public abstract class Operator implements OpIterator {
    private Tuple next = null;

    public boolean hasNext() throws DbException, TransactionAbortedException {
        if (!this.open)
            throw new IllegalStateException("Operator not yet open");
        if (next == null)
            next = fetchNext();
        return next != null;
    }

    public Tuple next() throws DbException, TransactionAbortedException,
            NoSuchElementException {
        if (next == null) {
            next = fetchNext();
            if (next == null)
                throw new NoSuchElementException();
        }
        Tuple result = next;
        next = null;
        return result;
    }

    /**
     * Returns the next Tuple in the iterator, or null if the iteration is
     * finished. Operator uses this method to implement both <code>next</code>
     * and <code>hasNext</code>.
     *
     * @return the next Tuple in the iterator, or null if the iteration is
     *         finished.
     */
    protected abstract Tuple fetchNext() throws DbException,
            TransactionAbortedException;

    // ...
}
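The caching trick can be isolated into a tiny self-contained sketch (illustrative names, stripped of SimpleDB's open/close state and exceptions):

```java
import java.util.NoSuchElementException;

// Standalone sketch of Operator's template-method trick: hasNext() prefetches
// into a one-element cache, next() hands the cached element out and clears it.
// Illustrative names; simplified from SimpleDB's Operator.
abstract class CachingIterator<T> {
    private T next = null;

    protected abstract T fetchNext(); // returns null when exhausted

    public boolean hasNext() {
        if (next == null) next = fetchNext(); // prefetch and cache
        return next != null;
    }

    public T next() {
        if (next == null) {
            next = fetchNext();
            if (next == null) throw new NoSuchElementException();
        }
        T result = next;
        next = null; // each element is handed out exactly once
        return result;
    }
}

// A subclass only implements fetchNext(), e.g. counting down from n:
class CountDown extends CachingIterator<Integer> {
    private int n;
    CountDown(int n) { this.n = n; }
    @Override protected Integer fetchNext() { return n > 0 ? n-- : null; }
}
```

Repeated hasNext() calls only prefetch once, and next() works correctly even without a prior hasNext().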
exercise2 Aggregate
Aggregate extends Operator and implements aggregation, e.g. select count(*) from xx group by xxx.
IntegerAggregator and StringAggregator are both helper classes for Aggregate.
The difference lies in the type of the aggregated field: an Integer field supports count, sum, max, min, and avg, while a String field supports only count.
Implementation
- src/simpledb/IntegerAggregator.java
- src/simpledb/StringAggregator.java
- src/simpledb/Aggregate.java
Note that the group-by field may be absent, i.e. aggregation without group by must be allowed, which matches MySQL syntax such as select count(*) from user;
Overall, the only real difficulty is IntegerAggregator: StringAggregator can be seen as essentially a special case of it, and Aggregate is just a wrapper that delegates to the xxxAggregator.
My implementation differs from most code online in that it leans on the dependency inversion principle.
Taking IntegerAggregator as an example, the five aggregation operations are encapsulated in a private abstract class GBHandler and implemented via the strategy pattern, which keeps the code readable and easy to modify. The aggregation responsibility lives inside GBHandler; the top-level IntegerAggregator only has to wrap the computed results in a TupleIterator and return them.
/**
* Knows how to compute some aggregate over a set of IntFields.
*
* finish in lab2 exercise2
*/
public class IntegerAggregator implements Aggregator {
private static final long serialVersionUID = 1L;
private static final String NO_GROUPING_KEY = "NO_GROUPING_KEY";
/**
* the 0-based index of the group-by field in the tuple,
* or NO_GROUPING if there is no grouping
*/
private int gbFieldIndex;
/**
* the type of the group by field (e.g., Type.INT_TYPE),
* or null if there is no grouping
*/
private Type gbFieldType;
/**
* the 0-based index of the aggregate field in the tuple
*/
private int aggregateFieldIndex;
/**
* the aggregation operator
*/
private Op aggregationOp;
private GBHandler gbHandler;
/**
* Aggregate constructor
*
* @param gbfield
* the 0-based index of the group-by field in the tuple, or
* NO_GROUPING if there is no grouping
* @param gbfieldtype
* the type of the group by field (e.g., Type.INT_TYPE), or null
* if there is no grouping
* @param afield
* the 0-based index of the aggregate field in the tuple
* @param what
* the aggregation operator
*/
public IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what) {
this.gbFieldIndex = gbfield;
this.gbFieldType = gbfieldtype;
this.aggregateFieldIndex = afield;
this.aggregationOp = what;
switch (what){
case MIN:
gbHandler =new MinHandler();
break;
case MAX:
gbHandler =new MaxHandler();
break;
case AVG:
gbHandler =new AvgHandler();
break;
case SUM:
gbHandler =new SumHandler();
break;
case COUNT:
gbHandler =new CountHandler();
break;
default:
throw new UnsupportedOperationException("Unsupported aggregation operator ");
}
}
/**
* Merge a new tuple into the aggregate, grouping as indicated in the
* constructor
*
* @param tup
* the Tuple containing an aggregate field and a group-by field
*/
public void mergeTupleIntoGroup(Tuple tup) {
if(gbFieldType!=null&&(!tup.getField(gbFieldIndex).getType().equals(gbFieldType))){
throw new IllegalArgumentException("Given tuple has wrong type");
}
String key;
if (gbFieldIndex == NO_GROUPING) {
key = NO_GROUPING_KEY;
} else {
key = tup.getField(gbFieldIndex).toString();
}
gbHandler.handle(key,tup.getField(aggregateFieldIndex));
}
/**
* Create a OpIterator over group aggregate results.
*
* @return a OpIterator whose tuples are the pair (groupVal, aggregateVal)
* if using group, or a single (aggregateVal) if no grouping. The
* aggregateVal is determined by the type of aggregate specified in
* the constructor.
*/
public OpIterator iterator() {
Map<String,Integer> results = gbHandler.getGbResult();
Type[] types;
String[] names;
TupleDesc tupleDesc;
List<Tuple> tuples = new ArrayList<>();
if(gbFieldIndex==NO_GROUPING){
types = new Type[]{Type.INT_TYPE};
names = new String[]{"aggregateVal"};
tupleDesc = new TupleDesc(types,names);
for(Integer value:results.values()){
Tuple tuple = new Tuple(tupleDesc);
tuple.setField(0,new IntField(value));
tuples.add(tuple);
}
}else{
types = new Type[]{gbFieldType,Type.INT_TYPE};
names = new String[]{"groupVal","aggregateVal"};
tupleDesc = new TupleDesc(types,names);
for(Map.Entry<String,Integer> entry:results.entrySet()){
Tuple tuple = new Tuple(tupleDesc);
if(gbFieldType==Type.INT_TYPE){
tuple.setField(0,new IntField(Integer.parseInt(entry.getKey())));
}else{
tuple.setField(0,new StringField(entry.getKey(),entry.getKey().length()));
}
tuple.setField(1,new IntField(entry.getValue()));
tuples.add(tuple);
}
}
return new TupleIterator(tupleDesc,tuples);
}
private abstract class GBHandler{
ConcurrentHashMap<String,Integer> gbResult;
abstract void handle(String key,Field field);
private GBHandler(){
gbResult = new ConcurrentHashMap<>();
}
public Map<String,Integer> getGbResult(){
return gbResult;
}
}
private class CountHandler extends GBHandler {
@Override
public void handle(String key, Field field) {
if(gbResult.containsKey(key)){
gbResult.put(key,gbResult.get(key)+1);
}else{
gbResult.put(key,1);
}
}
}
private class SumHandler extends GBHandler{
@Override
public void handle(String key, Field field) {
if(gbResult.containsKey(key)){
gbResult.put(key,gbResult.get(key)+Integer.parseInt(field.toString()));
}else{
gbResult.put(key,Integer.parseInt(field.toString()));
}
}
}
private class MinHandler extends GBHandler{
@Override
void handle(String key, Field field) {
int tmp = Integer.parseInt(field.toString());
if(gbResult.containsKey(key)){
int res = gbResult.get(key)<tmp?gbResult.get(key):tmp;
gbResult.put(key, res);
}else{
gbResult.put(key,tmp);
}
}
}
private class MaxHandler extends GBHandler{
@Override
void handle(String key, Field field) {
int tmp = Integer.parseInt(field.toString());
if(gbResult.containsKey(key)){
int res = gbResult.get(key)>tmp?gbResult.get(key):tmp;
gbResult.put(key, res);
}else{
gbResult.put(key,tmp);
}
}
}
private class AvgHandler extends GBHandler{
ConcurrentHashMap<String,Integer> sum;
ConcurrentHashMap<String,Integer> count;
private AvgHandler(){
count = new ConcurrentHashMap<>();
sum = new ConcurrentHashMap<>();
}
@Override
public void handle(String key, Field field) {
int tmp = Integer.parseInt(field.toString());
if(gbResult.containsKey(key)){
count.put(key,count.get(key)+1);
sum.put(key,sum.get(key)+tmp);
}else{
count.put(key,1);
sum.put(key,tmp);
}
gbResult.put(key,sum.get(key)/count.get(key));
}
}
}
StringAggregator is implemented just like the Integer one; only count needs to be supported.
The Aggregate class just keeps calling the interface the Aggregator provides, adding one layer of encapsulation.
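The grouping that StringAggregator needs boils down to a map from group key to a running count; a minimal self-contained sketch (names are illustrative, not SimpleDB's API):

```java
import java.util.HashMap;
import java.util.Map;

// Count-only grouping, the core of StringAggregator: a map from the
// group-by key to a running COUNT. Illustrative names only.
class CountOnlyAggregator {
    private final Map<String, Integer> counts = new HashMap<>();

    void merge(String groupKey) {
        // COUNT is the only valid aggregate over string fields
        counts.merge(groupKey, 1, Integer::sum);
    }

    Map<String, Integer> results() { return counts; }
}
```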
exercise3 HeapFile Mutability
This part asks for the insert and delete routines in HeapPage and HeapFile; the work is mainly updating the slot bitmap in the page header.
Implement the remaining parts of:
- src/simpledb/HeapPage.java
- src/simpledb/HeapFile.java (note that you do not necessarily need to implement writePage at this point)
- src/simpledb/BufferPool.java: insertTuple() and deleteTuple()
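The bitmap update at the heart of HeapPage looks roughly like this; isSlotUsed/markSlotUsed mirror the helper names in HeapPage, but the standalone byte[] wrapper below is only an illustration:

```java
// Sketch of the header-bitmap update behind HeapPage insert/delete:
// slot i's used/free state lives in bit (i % 8) of header byte (i / 8).
// Standalone illustration; in SimpleDB the byte[] is the page header itself.
class SlotBitmap {
    private final byte[] header;

    SlotBitmap(int numSlots) { header = new byte[(numSlots + 7) / 8]; }

    boolean isSlotUsed(int i) {
        return (header[i / 8] & (1 << (i % 8))) != 0;
    }

    void markSlotUsed(int i, boolean used) {
        if (used) header[i / 8] |=  (1 << (i % 8)); // insert: set the bit
        else      header[i / 8] &= ~(1 << (i % 8)); // delete: clear the bit
    }
}
```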
Notes:
- HeapFile should always go through the BufferPool for page reads, e.g. when fetching a page to insert into.
- BufferPool.insertTuple needs to do two things:
  - call HeapFile.insertTuple;
  - put the resulting dirty pages into the cache. I forgot this at first and the tests kept failing.
exercise4 Insert&Delete
Just call the methods implemented in exercise3.
Implementation
- src/simpledb/Insert.java
- src/simpledb/Delete.java
Note that delete does not need a tableId while insert does: when deleting, the tableId can be recovered from the Tuple's RecordId.
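Why delete needs no tableId can be shown with a toy sketch of the RecordId -> PageId -> tableId chain (the Toy* classes below are purely illustrative stand-ins for SimpleDB's RecordId and PageId):

```java
// Toy sketch: a tuple's RecordId carries a PageId, and the PageId knows
// which table it belongs to, so delete can recover the tableId on its own.
// Illustrative classes, not SimpleDB's actual RecordId/PageId.
class ToyPageId {
    final int tableId, pageNo;
    ToyPageId(int tableId, int pageNo) { this.tableId = tableId; this.pageNo = pageNo; }
}

class ToyRecordId {
    final ToyPageId pid;
    final int tupleNo;
    ToyRecordId(ToyPageId pid, int tupleNo) { this.pid = pid; this.tupleNo = tupleNo; }
    int tableId() { return pid.tableId; } // delete recovers the table from here
}
```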
exercise5 page eviction
Implement a page-eviction policy in BufferPool. The main interfaces are:
- discardPage(PageId pid): removes a page from the cache
- flushPage(PageId pid): if the page is dirty, writes it to disk and clears the dirty bit
- evictPage(): the eviction policy itself; I implemented random eviction
- flushAllPages(): only used by the tests
Then update the previously implemented BufferPool operations so the cache never holds more than numPages pages.
Implementation
- src/simpledb/BufferPool.java
At the core I use the simplest random-replacement policy. This leaves an obvious optimization point: it could later be upgraded to LRU or LFU.
/**
 * Discards a page from the buffer pool.
 * Flushes the page to disk to ensure dirty pages are updated on disk.
 */
private synchronized void evictPage() throws DbException {
    // use the simplest policy: evict a randomly chosen page
    List<Integer> keys = new ArrayList<>(pages.keySet());
    int randomKey = keys.get(new Random().nextInt(keys.size()));
    PageId evictPid = pages.get(randomKey).getId();
    try {
        flushPage(evictPid);
    } catch (IOException e) {
        e.printStackTrace();
    }
    discardPage(evictPid);
}
One thing to watch out for: a dirty page must be flushed to disk before it is evicted from the BufferPool, otherwise there is a risk of data inconsistency.
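The flush-before-discard rule can be demonstrated with a self-contained toy buffer pool (a map from page id to a dirty flag; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Toy model of random eviction with flush-before-discard: a map from page id
// to a dirty flag stands in for the real BufferPool. Illustrative names only.
class EvictionSketch {
    final Map<Integer, Boolean> cache = new HashMap<>(); // pid -> isDirty
    final List<Integer> flushed = new ArrayList<>();     // records "disk writes"

    void evictRandomPage(Random rng) {
        List<Integer> pids = new ArrayList<>(cache.keySet());
        int victim = pids.get(rng.nextInt(pids.size()));
        if (cache.get(victim)) {   // dirty: must reach disk before discard,
            flushed.add(victim);   // or the modification is silently lost
        }
        cache.remove(victim);
    }
}
```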
reference
6.830 Lab 2: SimpleDB Operators: https://blog.csdn.net/hjw199666/article/details/103590963
MIT 6.830 Database System Lab 2 report: https://zhuanlan.zhihu.com/p/159186187
