如何手動實現Try Insert和Insert Or Update


在日常開發中,我們有時會需要對數據的插入操作進行定制。比如,如果表里已有某某記錄就不寫入新紀錄,或者表里沒該記錄就插入,否則就更新。前者我們稱為TryInsert,后者為InsertOrUpdate(也叫做upsert)。一般來說,很多orm框架都會附帶這樣的函數,但是如果你要批量插入數據,orm自帶的函數就不太夠用了。下面我們從手動拼SQL的角度來實現TryInsertInsertOrUpdate

考慮到現在流行的兩大開源RDBMS對SQL標准支持比較落后,而早期的標准並沒有這方面的標准語法,所以我們分成MySQL篇和Postgres篇來分別使用它們各自的方言解決上面提到的兩個問題。

MySQL篇

原理解析

insert ignore into

插入如果報錯(主鍵或者Unique鍵重復),會把錯誤轉成警告,此時返回的影響行數為0,可以用來實現TryInsert()

replace into

replaceinsert語法基本一致,是Mysql的擴展語法,官方的InsertOrUpdatereplace語句的基本邏輯如下:

ok:=Insert()
if !ok {
  if duplicate-key {  // key重復就刪掉重新插入
    Delete()
    Insert()
  }
}

從這里我們可以看出replace語句的影響行數,如果是插入,影響行數為1;如果是更新,刪除再插入,影響行數為2。

Insert into ... on duplicate key update

也是MySQL擴展語法。... on duplicate key update的邏輯與replace差不多,唯一的區別就是如果插入的新值與舊值一樣,默認返回的影響行數為0,所以這里的邏輯是如果新值和舊值相同就不作處理。

代碼示例

下面是以golang為例,給出示例:

type User struct {
  UserID   int64  `gorm:"user_id"`
  Username string	`gorm:"username"`
  Password string	`gorm:"password"`
  Address  string	`gorm:"address"`
}

func BulkTryInsert(data []*User) error{
  str:=make([]string, 0, len(data))
  param:=make([]interface{},0,len(data)*4)  // 4個屬性
  for _,d:=range data {
    str=append(str,"(?,?,?,?)")
    param=append(d.UserID)
    param=append(d.Username)
    param=append(d.Password)
    param=append(d.Address)
  }
  stmt:=fmt.Sprintf("INSERT IGNORE INTO table_name(user_id,username,password,address) VALUES %s",strings.Join(str,",") )
  return DB.Exec(stmt, param...).Error
}

func BulkUpsert(data []*User) error{
  str:=make([]string, 0, len(data))
  param:=make([]interface{},0,len(data)*4)  // 4個屬性
  for _,d:=range data {
    str=append(str,"(?,?,?,?)")
    param=append(d.UserID)
    param=append(d.Username)
    param=append(d.Password)
    param=append(d.Address)
  }
  stmt:=fmt.Sprintf("REPLACE INTO table_name(user_id,username,password,address) VALUES %s",strings.Join(str,",") )    // 與上面的區別僅在這行的SQL
  return DB.Exec(stmt, param...).Error
}

Postgres篇

原理解析

Insert into ... on conflict (...) do nothing

on conflict后面需要帶上沖突的鍵,比如主鍵或者Unique約束。這條SQL的意思就如字面所示,當某某鍵存在重復沖突的時候,什么也不做,即TryInsert

Insert into ... on conflict (...) do update set (...)

這條SQL就比較復雜了,Postgres這個語法表面上看比MySQL自由度更高,實際上非常繁瑣笨重,不如MySQL務實。set的意思是,沖突時需要指定更新哪些屬性,這是強制的,必須具體地說明每個字段,真是不友好啊。大概是要寫成這樣,其中EXCLUDED指代要插入的那條記錄:

INSERT INTO ... on conflict (user_id, address) do update set password=EXCLUDED.password and username=EXCLUDED.username

代碼示例

這次我們設想一種實用的場景,python經常被用作科學計算,pandas是大家偏愛的計算包,pandasio部分提供了傻瓜式的讀寫文件和數據庫里數據的函數,比如寫數據庫的to_sql,但是這個函數有局限性,它只能做到TryInsert和清空表數據再插入,對於upsert則無能為力。目前來說,我們只能手動實現它。

按照上面的解析,我們需要給每張表設置好UniqueConstraint才能使用這個語法。下面給出一個例子:

# 使用的是sqlalchemy
Base = declarative_base()

# 將一個list分割成m個大小為n的list
def chunks(a, n):
    return [a[i:i + n] for i in range(0, len(a), n)]

class DBUser(Base):
  __tablename__ = 'user' # UniqueConstraint和PrimaryKey至少要有一個
  __table_args__ = (UniqueConstraint('user_id', 'address'), 
                   {'schema': 'db'})
  user_id = Column(BigInteger)
  username = Column(String(200))
  password = Column(String(200))
  address = Column(String(200))
  
  def dtype(self): # pandas需要的dtype
    d = {c.name: c.type for c in self.__table__.c}
    if 'id' in d:
    	el d['id']   # 一般id都是自動生成的,提供給pandas的dtype應該剔除id
    return d
  
  def fullname(self):
    return self.__table_args__[-1]['schema'] + '.' + self.__tablename__
  
  # 只要DBUser再提供一個Unique Constraint的屬性列表,下面這兩個函數就可以寫成通用的函數
  # 這里只是給出例子,點到為止
  def bulk_try_insert(self, engine, data):
    col = self.dtype().keys()
    col_str = ','.join(col)
    col_str = '(' + col_str + ')'
    update_col = []
    for c in col:
      update_str = '{0}=EXCLUDED.{1}'.format(c, c)
      update_col.append(update_str)
    value_str = []
    value_args = []
    for d in data:
      tmp_str = '(' + col.__len__() * '%s,'
      tmp_str = tmp_str[:-1] + ')'
      value_str.append(tmp_str)
      for k in col:
        value_args.append(d[k])
    
    stmt= 'insert into ' + self.fullname() + col_str + 'values ' + ','.join(
      value_str) + 'on conflict (user_id, address) do update set ' + ",".join(update_col)
    engine.execute(stmt, value_args)
  
  def bulk_insert_chunk(self, engine, data, n=1000):
    d_list = chunks(data, n)
    for a in d_list:
      self.bulk_insert(engine, a)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM