用手機寫代碼:基於 Serverless 的在線編程能力探索


簡介:Serverless 架構的按量付費模式,可以在保證在線編程功能性能的前提下,進一步降低成本。本文將會以阿里雲函數計算為例,通過 Serverless 架構實現一個 Python 語言的在線編程功能,並對該功能進一步的優化,使其更加貼近本地本地代碼執行體驗。

隨着計算機科學與技術的發展,越來越多的人開始接觸編程,也有越來越多的在線編程平台誕生。以 Python 語言的在線編程平台為例,大致可以分為兩類:

  • 一類是 OJ 類型的,即在線評測的編程平台,這類的平台特點是阻塞類型的執行,即用戶需要一次性將代碼和標准輸入內容提交,當程序執行完成會一次性將結果返回;
  • 另一類則是學習、工具類的在線編程平台,例如 Anycodes 在線編程等網站,這一類平台的特點是非阻塞類型的執行,即用戶可以實時看到代碼執行的結果,以及可以實時內容進行內容的輸入。

但是,無論是那種類型的在線編程平台,其背后的核心模塊( “代碼執行器”或“判題機”)都是極具有研究價值,一方面,這類網站通常情況下都需要比要嚴格的“安全機制”,例如程序會不會有惡意代碼,出現死循環、破壞計算機系統等,程序是否需要隔離運行,運行時是否會獲取到其他人提交的代碼等;

另一方面,這類平台通常情況下都會對資源消耗比較大,尤其是比賽來臨時,更是需要突然間對相關機器進行擴容,必要時需要大規模集群來進行應對。同時這類網站通常情況下也都有一個比較大的特點,那就是觸發式,即每個代碼執行前后實際上並沒有非常緊密的前后文關系等。

隨着 Serverless 架構的不斷發展,很多人發現 Serverless 架構的請求級隔離和極致彈性等特性可以解決傳統在線編程平台所遇到的安全問題和資源消耗問題,Serverless 架構的按量付費模式,可以在保證在線編程功能性能的前提下,進一步降低成本。所以,通過 Serverless 架構實現在線編程功能的開發就逐漸的被更多人所關注和研究。本文將會以阿里雲函數計算為例,通過 Serverless 架構實現一個 Python 語言的在線編程功能,並對該功能進一步的優化,使其更加貼近本地本地代碼執行體驗。

在線編程功能開發

一個比較簡單的、典型的在線編程功能,在線執行模塊通常情況下是需要以下幾個能力:

  • 在線執行代碼
  • 用戶可以輸入內容
  • 可以返回結果(標准輸出、標准錯誤等)

除了在線編程所需要實現的功能之外,在線編程在 Serverless 架構下,所需要實現的業務邏輯,也僅僅被收斂到關注代碼執行模塊即可:獲取客戶端發送的程序信息(包括代碼、標准輸入等),將代碼緩存到本地,執行代碼,獲取結果,但會給客戶端,整個架構的流程簡圖為:

關於執行代碼部分,可以通過 Python 語言的 subprocess 依賴中的 Popen() 方法實現,在使用 Popen() 方法時,有幾個比較重要的概念,需要明確:
  • subprocess.PIPE:一個可以被用於 Popen 的stdin 、stdout 和 stderr 3 個參數的特殊值,表示需要創建一個新的管道;
  • subprocess.STDOUT:一個可以被用於 Popen 的 stderr 參數的輸出值,表示子程序的標准錯誤匯合到標准輸出;

所以,當我們想要實現可以:

進行標准輸入(stdin),獲取標准輸出(stdout)以及標准錯誤(stderr)的功能

可以簡化代碼實現為:

除代碼執行部分之外,在 Serverless 架構下,獲取到用戶代碼並將其存儲過程中,需要額外注意函數實例中目錄的讀寫權限。通常情況下,在函數計算中,如果不進行硬盤掛載,只有/tmp/目錄是有可寫入權限的。所以在該項目中,我們將用戶傳遞到服務端的代碼進行臨時存儲時,需要將其寫入臨時目錄/tmp/,在臨時存儲代碼的時候,還需要額外考慮實例復用的情況,所以此時,可以為臨時代碼提供臨時的文件名,例如:

# -*- coding: utf-8 -*-

import randomrandom

Str = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

path = "/tmp/%s"% randomStr(5)

完整的代碼實現為:

# -*- coding: utf-8 -*-

import json

import uuid

import random

import subprocess

# 隨機字符串

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

# Response

class Response:

   def __init__(self, start_response, response, errorCode=None):

       self.start = start_response

       responseBody = {

           'Error': {"Code": errorCode, "Message": response},

       } if errorCode else {

           'Response': response

       }

       # 默認增加uuid,便於后期定位

       responseBody['ResponseId'] = str(uuid.uuid1())

       self.response = json.dumps(responseBody)

 

   def __iter__(self):

       status = '200'

       response_headers = [('Content-type', 'application/json; charset=UTF-8')]

       self.start(status, response_headers)

       yield self.response.encode("utf-8")

 

def WriteCode(code, fileName):

   try:

       with open(fileName, "w") as f:

           f.write(code)

       return True

   except Exception as e:

       print(e)

       return False

 

def RunCode(fileName, input_data=""):

   child = subprocess.Popen("python %s" % (fileName),

                            stdin=subprocess.PIPE,

                            stdout=subprocess.PIPE,

                            stderr=subprocess.STDOUT,

                            shell=True)

   output = child.communicate(input=input_data.encode("utf-8"))

   return output[0].decode("utf-8")

 

def handler(environ, start_response):

   try:

       request_body_size = int(environ.get('CONTENT_LENGTH', 0))

   except (ValueError):

       request_body_size = 0

   requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

 

   code = requestBody.get("code", None)

   inputData = requestBody.get("input", "")

   fileName = "/tmp/" + randomStr(5)

   responseData = RunCode(fileName, inputData) if code and WriteCode(code, fileName) else "Error"

   return Response(start_response, {"result": responseData})

完成核心的業務邏輯編寫之后,我們可以將代碼部署到阿里雲函數計算中。部署完成之后,我們可以獲得到接口的臨時測試地址。通過 PostMan 對該接口進行測試,以 Python 語言的輸出語句為例:

print('HELLO WORLD')

可以看到,當我們通過 POST 方法,攜帶代碼等作為參數,發起請求后,獲得到的響應為:

我們通過響應結果,可以看到,系統是可以正常輸出我們的預期結果:“HELLO WORLD” 至此我們完成了標准輸出功能的測試,接下來我們對標准錯誤等功能進行測試,此時我們將剛剛的輸出代碼進行破壞:

print('HELLO WORLD)

使用同樣的方法,再次進行代碼執行,可以看到結果:

結果中,我們可以看到 Python 的報錯信息,是符合我們的預期的,至此完成了在線編程功能的標准錯誤功能的測試,接下來,我們進行標准輸入功能的測試,由於我們使用的 subprocess.Popen() 方法,是一種阻塞方法,所以此時我們需要將代碼和標准輸入內容一同放到服務端。測試的代碼為:

tempInput = input('please input: ')

print('Output: ', tempInput)

測試的標准輸入內容為:“serverless devs”。

當我們使用同樣的方法,發起請求之后,我們可以看到:

系統是正常輸出預期的結果。至此我們完成了一個非常簡單的在線編程服務的接口。該接口目前只是初級版本,僅用於學習使用,其具有極大的優化空間:
  • 超時時間的處理
  • 代碼執行完成,可以進行清理

當然,通過這個接口也可以看到這樣一個問題:那就是代碼執行過程中是阻塞的,我們沒辦法進行持續性的輸入,也沒有辦法實時輸出,即使需要輸入內容也是需要將代碼和輸入內容一並發送到服務端。這種模式和目前市面上常見的 OJ 模式很類似,但是就單純的在線編程而言,還需要進一步對項目優化,使其可以通過非阻塞方法,實現代碼的執行,並且可以持續性的進行輸入操作,持續性的進行內容輸出。

更貼近“本地”的代碼執行器

我們以一段代碼為例:

import time

print("hello world")

time.sleep(10)

tempInput = input("please: ")

print("Input data: ", tempInput)

當我們在本地的執行這段 Python 代碼時,整體的用戶側的實際表現是:

  • 系統輸出 hello world
  • 系統等待 10 秒
  • 系統提醒我們 please,我們此時可以輸入一個字符串
  • 系統輸出 Input data 以及我們剛剛輸入的字符串

但是,這段代碼如果應用於傳統 OJ 或者剛剛我們所實現的在線編程系統中,表現則大不相同:

  • 代碼與我們要輸入內容一同傳給系統
  • 系統等待 10 秒
  • 輸出 hello world、please,以及最后輸 Input data 和我們輸入的內容

可以看到,OJ 模式上的在線編程功能和本地是有非常大的差距的,至少在體驗層面,這個差距是比較大的。為了減少這種體驗不統一的問題,我們可以將上上述的架構進一步升級,通過函數的異步觸發,以及 Python 語言的 pexpect.spawn() 方法實現一款更貼近本地體驗的在線編程功能:

在整個項目中,包括了兩個函數,兩個存儲桶:
  • 業務邏輯函數:該函數的主要操作是業務邏輯,包括創建代碼執行的任務(通過對象存儲觸發器進行異步函數執行),以及獲取函數輸出結果以及對任務函數的標准輸入進行相關操作等;
  • 執行器函數:該函數的主要作用是執行用戶的函數代碼,這部分是通過對象存儲觸發,通過下載代碼、執行代碼、獲取輸入、輸出結果等;代碼獲取從代碼存儲桶,輸出結果和獲取輸入從業務存儲桶;
  • 代碼存儲桶:該存儲桶的作用是存儲代碼,當用戶發起運行代碼的請求, 業務邏輯函數收到用戶代碼后,會將代碼存儲到該存儲桶,再由該存儲桶處罰異步任務;
  • 業務存儲桶:該存儲桶的作用是中間量的輸出,主要包括輸出內容的緩存、輸入內容的緩存;該部分數據可以通過對象存儲的本身特性進行生命周期的制定;

為了讓代碼在線執行起來,更加貼近本地體驗,該方案的代碼分為兩個函數,分別進行業務邏輯處理和在線編程核心功能。

其中業務邏輯處理函數,主要是:

  • 獲取用戶的代碼信息,生成代碼執行 ID,並將代碼存到對象存儲,異步觸發在線編程函數的執行,返回生成代碼執行 ID;
  • 獲取用戶的輸入信息和代碼執行 ID,並將內容存儲到對應的對象存儲中;
  • 獲取代碼的輸出結果,根據用戶指定的代碼執行 ID,將執行結果從對象存儲中讀取出來,並返回給用戶;

整體的業務邏輯為:

實現的代碼為:

# -*- coding: utf-8 -*-

 

import os

import oss2

import json

import uuid

import random

 

# 基本配置信息

AccessKey = {

   "id": os.environ.get('AccessKeyId'),

   "secret": os.environ.get('AccessKeySecret')

}

 

OSSCodeConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketCodeName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

 

OSSTargetConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketTargetName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

 

# 獲取獲取/上傳文件到OSS的臨時地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

 

# 隨機字符串

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

 

# Response

class Response:

   def __init__(self, start_response, response, errorCode=None):

       self.start = start_response

       responseBody = {

           'Error': {"Code": errorCode, "Message": response},

       } if errorCode else {

           'Response': response

       }

       # 默認增加uuid,便於后期定位

       responseBody['ResponseId'] = str(uuid.uuid1())

       self.response = json.dumps(responseBody)

 

   def __iter__(self):

       status = '200'

       response_headers = [('Content-type', 'application/json; charset=UTF-8')]

       self.start(status, response_headers)

       yield self.response.encode("utf-8")

 

def handler(environ, start_response):

   try:

       request_body_size = int(environ.get('CONTENT_LENGTH', 0))

   except (ValueError):

       request_body_size = 0

   requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

 

   reqType = requestBody.get("type", None)

 

   if reqType == "run":

       # 運行代碼

       code = requestBody.get("code", None)

       runId = randomStr(10)

       codeBucket.put_object(runId, code.encode("utf-8"))

       responseData = runId

   elif reqType == "input":

       # 輸入內容

       inputData = requestBody.get("input", None)

       runId = requestBody.get("id", None)

       targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))

       responseData = 'ok'

   elif reqType == "output":

       # 獲取結果

       runId = requestBody.get("id", None)

       targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)

       with open('/tmp/' + runId) as f:

           responseData = f.read()

   else:

       responseData = "Error"

 

   return Response(start_response, {"result": responseData})

執行器函數,主要是通過代碼存儲桶觸發,從而進行代碼執行的模塊,這一部分主要包括:

  • 從存儲桶獲取代碼,並通過 pexpect.spawn() 進行代碼執行;
  • 通過 pexpect.spawn().read_nonblocking() 非阻塞的獲取間斷性的執行結果,並寫入到對象存儲;
  • 通過 pexpect.spawn().sendline() 進行內容輸入;

整體流程為:

代碼實現為:

# -*- coding: utf-8 -*-

 

import os

import re

import oss2

import json

import time

import pexpect

 

# 基本配置信息

AccessKey = {

   "id": os.environ.get('AccessKeyId'),

   "secret": os.environ.get('AccessKeySecret')

}

 

OSSCodeConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketCodeName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

 

OSSTargetConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketTargetName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

 

# 獲取獲取/上傳文件到OSS的臨時地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

 

 

def handler(event, context):

   event = json.loads(event.decode("utf-8"))

 

   for eveEvent in event["events"]:

 

       # 獲取object

       code = eveEvent["oss"]["object"]["key"]

       localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]

 

       # 下載代碼

       codeBucket.get_object_to_file(code, localFileName)

 

       # 執行代碼

       foo = pexpect.spawn('python %s' % localFileName)

 

       outputData = ""

 

       startTime = time.time()

 

       # timeout可以通過文件名來進行識別

       try:

           timeout = int(re.findall("timeout(.*?)s", code)[0])

       except:

           timeout = 60

 

       while (time.time() - startTime) / 1000 <= timeout:

           try:

               tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)

               tempOutput = tempOutput.decode("utf-8", "ignore")

 

               if len(str(tempOutput)) > 0:

                   outputData = outputData + tempOutput

 

               # 輸出數據存入oss

               targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

 

           except Exception as e:

 

               print("Error: ", e)

 

               # 有輸入請求被阻塞

               if str(e) == "Timeout exceeded.":

 

                   try:

                       # 從oss讀取數據

                       targetBucket.get_object_to_file(code + "-input", localFileName + "-input")

                       targetBucket.delete_object(code + "-input")

                       with open(localFileName + "-input") as f:

                           inputData = f.read()

                       if inputData:

                           foo.sendline(inputData)

                   except:

                       pass

 

               # 程序執行完成輸出

               elif "End Of File (EOF)" in str(e):

                   targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

                   return True

 

               # 程序拋出異常

               else:

 

                   outputData = outputData + "\n\nException: %s" % str(e)

                   targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

 

                   return False

當我們完成核心的業務邏輯編寫之后,我們可以將項目部署到線上。

項目部署完成之后,和上文的測試方法一樣,在這里也通過 PostMan 對接口進行測試。此時,我們需要設定一個覆蓋能較全的測試代碼,包括輸出打印、輸入、一些 sleep() 等方法:

當我們通過 PostMan 發起請求執行這段代碼之后,我們可以看到系統為我們返回了預期的代碼執行 ID:

我們可以看到系統會返回給我們一個代碼執行 ID,該執行 ID 將會作為我們整個請求任務的 ID,此時,我們可以通過獲取輸出結果的接口,來獲取結果:

由於代碼中有:

time.sleep(10)

所以,迅速獲得結果的時候是看不到后半部分的輸出結果,我們可以設置一個輪訓任務,不斷通過該 ID 對接口進行刷新:

可以看到,10 秒鍾后,代碼執行到了輸入部分:

tempInput = input('please: ')

此時,我們再通過輸入接口,進行輸入操作:

完成之后,我們可以看到輸入成功(result: ok)的結果,此時我們繼續刷新之前獲取結果部分的請求:

可以看到,我們已經獲得到了所有結果的輸出。

相對於上文的在線編程功能,這種“更貼近本地的代碼執行器“變得復雜了很多,但是在實際使用的過程中,卻可以更好的模擬出本地執行代碼時的一些現象,例如代碼的休眠、阻塞、內容的輸出等。

總結

無論是簡單的在線代碼執行器部分,還是更貼近“本地”的代碼執行器部分,這篇文章在所應用的內容是相對廣泛的。通過這篇文章你可以看到:

  • HTTP 觸發器的基本使用方法;對象存儲觸發器的基本使用方;
  • 函數計算組件、對象存儲組件的基本使用方法,組件間依賴的實現方法;

同時,通過這篇文章,也可以從一個側面看到這樣一個常見問題的簡單解答:我有一個項目,我是每個接口一個函數,還是多個接口復用一個函數?

針對這個問題,其實最主要的是看業務本身的訴求,如果多個接口表達的含義是一致的,或者是同類的,類似的,並且多個接口的資源消耗是類似的,那么放在一個函數中來通過不同的路徑進行區分是完全可以的;如果出現資源消耗差距較大,或者函數類型、規模、類別區別過大的時候,將多個接口放在多個函數下也是沒有問題的。

本文實際上是拋磚引玉,無論是 OJ 系統的“判題機”部分,還是在線編程工具的“執行器部分”,都可以很好的和 Serverless 架構有着比較有趣的結合點。這種結合點不僅僅可以解決傳統在線編程所頭疼的事情(安全問題,資源消耗問題,並發問題,流量不穩定問題),更可以將 Serverless 的價值在一個新的領域發揮出來。

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM