【作業內容】
模擬sql執行過程,完成對特定表的增、刪、改、查。
【作業分析】
隨着現在信息量的增加,以及數據庫提供的強大數據管理能力,數據庫編程編程了IT開發者必備的技能之一。數據庫開發者會預留數據庫對外接口,以便開發人員來使用,通過這些接口,我們可以執行我們的查詢、刪除、更行、插入等sql語句,而我們的作業內容則是為了更好的了解sql語句執行的過程,調高自己的sql語句編寫能力,也是想通過這個例子,對函數有進一步的了解,話不多少,代碼如下:
keyword_list = ['from','into','update'] spr = ' ' def SearchAll(strlist,table_name): '查詢' if 'where' in strlist: Condition_query(strlist,table_name) else: Query(strlist,table_name) def Delete(strlist,table_name): '刪除數據項' temp_tablename = '' index = 0 if 'where' not in strlist: index,colName = GetTableItemNum(table_name) f = open(table_name,'w') f.write(colName) f.close() else: temp_tablename = table_name + '_1' with open(temp_tablename,'a',encoding='utf8') as f1: with open(table_name,'r',encoding='utf8') as f: for line in f: if not Judegfilter(strlist,line): f1.write(line) else: index += 1 os.remove(table_name) os.rename(temp_tablename,table_name) print("Delete %d item data." %index) def Judegfilter(strlist,line): flag = False pos = strlist.index('where') colNameList = GetTableColName(strlist[pos - 1]) line_list = list(str(line).strip('\n').split(spr)) index = GetIndexInList(colNameList, strlist[pos + 1]) colValue = strlist[pos + 3].strip('"').strip('\'') if '=' in strlist: if colValue == line_list[index + 1]: flag = True elif 'like' in strlist: if colValue in line_list[index + 1]: flag = True elif colValue.isdigit(): if strlist[pos + 2] == '<': if int(colValue) > int(line_list[index + 1]): flag = True elif strlist[pos + 2] == '>': if int(colValue) < int(line_list[index + 1]): flag = True return flag def GetTableItemNum(table_name): index = 0 colName = '' with open(table_name,'r',encoding='utf8') as f: for line in f: if index == 0: colName = line.strip('\n') index += 1 if index > 0: index -= 1 return index,colName def Update(sqlList,table_name): '更新數據項' pass def Insert(sqlList,table_name): '插入數據項' index = sqlList.index('into') ''' 因為sql語句中的方式為table(colname1,cloname2),所以先取相應的值,然后將括號去掉,轉換成列表 ''' temp = sqlList[index +1].lower().strip(')') pos = str(temp).index('(') insertColName = temp[pos+1:len(temp)].split(',') if 'phone' not in insertColName: print('插入唯一鍵不存在,請檢查您的插入語句') else: temp = sqlList[index + 2].strip(')') insertValues = GetValueFromStr(temp) if len(insertValues) == 0: print('插入值不能為空,請檢查您的sql語句') else: InsertDataToDB(table_name, insertColName, insertValues) def GetValueFromStr(temp): pos = str(temp).index('(') insertValues = temp[pos + 1:len(temp)].split(',') insertValues1 = [] for line in insertValues: temp1 = line.strip('\'') insertValues1.append(temp1) return insertValues1 def InsertDataToDB(table_name, insertColName, insertValues): '插入數據到文件中' pos = insertColName.index('phone') value = insertColName[pos] flag = False pos = insertColName.index('phone') if pos > len(insertValues): print('插入值過少,請檢查您的sql語句') elif len(insertValues[pos]) == 0: print('插入值不能為空,請檢查您的sql語句') else: flag,tableColName = IsValueInDB(insertValues[pos], table_name) if flag: print("插入值已存在,請您檢查sql語句") else: WriteDataToDB(table_name, insertColName, insertValues,tableColName) def IsValueInDB(value, table_name): '判斷插入值是否已經存在' index = 0 templist = [] flag = False with open(table_name,'r',encoding='utf8') as f: index = 0 for line in f: if index == 0: templist = line.lower().strip(spr).split(spr) elif value in line: flag = True break index += 1 return flag,templist def WriteDataToDB(table_name, insertColName, insertValues,tableColName): '將數據組成合理的格式寫入到文件中' indexlist = [] index = GetOrderIndex() temp = spr + str(index) # valuedir = GetValue ''' with open(table_name, 'a',encoding='utf8') as f: for line in insertColName: pos = tableColName.index(line) indexlist.append(pos) lenNum = len(indexlist) temp = spr + str(index) if lenNum == 1: temp += pos * spr else: pos1 = indexlist[lenNum -1] - indexlist[lenNum -2] temp += pos1 * spr pos = insertColName.index(line) temp += insertValues[0] f.write(temp) ''' PutOrderIndex(index +1) print("插入數據成功") def GetOrderIndex(): '獲取自增量最后值,該值存放在文件中' index = 0 with open('index', 'r', encoding='utf8') as f: index = int(f.readline()) return index def PutOrderIndex(orderIndex): '將自增量寫入到文件中做記錄' with open('index', 'w', encoding='utf8') as f: temp = str(orderIndex) f.write(temp) def Condition_query(strlist,table_name): '條件查詢:strlist(將sql語句拆分之后的列表)、strlist(表名)' colList = list(strlist[1].split(',')) #查詢項 pos = GetIndexInList(strlist,'where') #pos 條件判斷關鍵字位置 index = 0 total = -1 with open(table_name, 'r', encoding='utf8') as f: for line in f: #line文件中讀取信息 temp = GetSearchvalue(line,strlist,index,colList,pos) index += 1 if len(temp) != 0: print(temp) total += 1 pass print('Total:', total) def GetSearchvalue(line,strlist,index,colList,pos): 'sql語句中的查詢條件中,條件項為數字類型' temp = '' if index == 0: temp = Print_colname(strlist, colList,line) else: temp = CheckCondition(strlist,line,pos) pass return temp def Print_colname(strlist,colList,line): temp = '' line = line.strip('\n') if '*' in strlist: temp = line else: for colIndex in colList: temp += colIndex temp += spr return temp def CheckCondition(strlist, line, pos): '判斷當前值是否符合查詢條件過濾值' temp = '' colNameList = GetTableColName(strlist[pos - 1]) line_list = list(str(line).strip('\n').split(spr)) index = GetIndexInList(colNameList,strlist[pos+1]) colValue = strlist[pos + 3].strip('"').strip('\'') if '=' in strlist: if colValue == line_list[index + 1]: temp = GetDataFromLine(line_list, line, strlist,pos) elif 'like' in strlist: if colValue in line_list[index + 1]: temp = GetDataFromLine(line_list, line, strlist, pos) elif colValue.isdigit(): if strlist[pos + 2] == '<': if int(colValue) > int(line_list[index + 1]): temp = GetDataFromLine(line_list, line, strlist,pos) elif strlist[pos + 2] == '>': if int(colValue) < int(line_list[index + 1]): temp = GetDataFromLine(line_list, line, strlist,pos) return temp def GetDataFromLine(line_list, line, strlist,pos): '將符合條件的數據進行分揀,依據查詢的字段名來取相應的值,如果是全字段,則直接返回值' temp = '' if '*' in strlist: temp = line.strip('\n') else: selColIndex = list(strlist[1].split(',')) colNameList = GetTableColName(strlist[pos-1]) selColIndexlist = GetIndexWithColName(selColIndex,colNameList) for data in selColIndexlist: temp += line_list[data+1].strip('\n') temp += spr return temp def Query(strlist,table_name): '非條件查詢' index = 0 if '*' in strlist: with open(table_name, 'r', encoding='utf8') as f: for line in f: print(line.strip('\n')) index += 1 else: index = Query_someone(strlist,table_name) print("Total:%d" %(index-1)) def Query_someone(strlist,table_name): '查詢任一一項或者多項數據' colList = list(strlist[1].split(',')) colnameList = GetTableColName(table_name) colNameIndex = GetIndexWithColName(colList,colnameList) ''' 1、讀取文件中第一行數據,獲得字段信息,判斷要查詢的字段是否為文件字段項,如果不是, 提示查詢出錯; 2、獲取查詢字段在文件字段項中的位置。獲取該位置,在之后讀取文件中的數據時使用到 ''' if len(colNameIndex) == 0: print('【Error】查詢字段不存在') return 0 else: colNameIndex.sort() index = 0 with open(table_name, 'r', encoding='utf8') as f: for line1 in f: strInfo = GetColValue(line1, colNameIndex, index) index += 1 print(strInfo) return index def GetTableColName(table_name): '獲取表中字段項列表' colnameList = [] with open(table_name, 'r', encoding='utf8') as f: colnameList = list(f.readline().lower().strip('\n').split(spr)) return colnameList def GetTableName(strSql): '通過sql語句轉化之后的列表,獲取到表名' name = '' list = strSql.split(' ') for line in keyword_list: if line in list: name_pos = list.index(line) if len(list) == name_pos+1: break elif line == 'into': temp = list[name_pos+1] name = SperString(temp,'(') else: name = list[name_pos+1] break return name def SperString(strData, sper): temp = '' if sper in strData: pos = strData.index(sper) temp = strData[0:pos] else: temp = strData return temp def GetIndexInList(list, name): '通過列表中的數據項,獲取數據項在列表中的位置信息' if name in list: return list.index(name) else: return -1 def GetIndexWithColName(colList, colnameList = []): '查詢時會遇到查詢多項數據,此處是將每個數據項在表中對應的位置信息獲取到' colNameIndex = [] for line in colList: if line.lower() in colnameList: index = colnameList.index(line.lower()) colNameIndex.append(index) return colNameIndex def GetColValue(colValue,list,index): '通過表中數據項所在位置,取出相應的值' strInfo = '' for line2 in list: templist = colValue.split(spr) if index == 0: strInfo += templist[line2].strip('\n') else: strInfo += templist[line2 + 1].strip('\n') strInfo += spr return strInfo def RunSql(strSql): '執行sql語句' sqlList = strSql.strip('\n').split(' ') table_name = GetTableName(strSql) oper = sqlList[0].lower() flag,errorInfo = JudgeSqlIsTrue(sqlList,table_name) if flag: First_Keyword_list[oper](sqlList,table_name) else: print('執行的sql語句有語法錯誤,錯誤提示:【%s】。請檢查您的執行語句' %errorInfo) def JudgeSqlIsTrue(strSqlList,table_Name): '判斷執行語句是否符合語法負責' flag = False temp = '' if len(strSqlList) == 0: print('空語句無法執行') else: if strSqlList[0].lower() not in First_Keyword_list: temp = '執行語句不符合執行規則,沒有找到執行關鍵字' elif not JudgeTableName(table_Name): temp = '查詢的表不存在' elif not JudgeColName(strSqlList, table_Name): temp = '查詢的表字段不存在' else: flag = True return flag,temp def JudgeTableName(table_Name): isExists = os.path.exists(table_Name) return isExists def JudgeColName(strSqlList,table_Name): '判斷操作字段是否在表中' if 'delete'in strSqlList or '*' in strSqlList: return True else: searchColNamelist = GetSearchNameFromSql(strSqlList) tablecolNamelist = GetTableColName(table_Name) if len(searchColNamelist) == 0: return False for line in searchColNamelist: if line not in tablecolNamelist: return False return True def GetSearchNameFromSql(strSqlList): '從sql語句中提取字段名' tempList = [] if 'select' in strSqlList: tempList = strSqlList[1].lower().split(',') elif 'insert' in strSqlList: temp = strSqlList[2].lower().strip(')') if '(' in temp: pos = temp.index('(') tempList = (temp[pos+1:len(temp)]).split(',') print(tempList) elif 'update' in strSqlList: pass return tempList #strSql = 'select * from staff_table where age > 22' #select * from staff_table where name like '龍龍' #insert into staff_table(name,dept) values('龍龍','教師') import os time = 0 First_Keyword_list = {'select':SearchAll,'insert':Insert,'update':Update,'delete':Delete} while time < 3: strSql = input('sql>') RunSql(strSql) time += 1