用Spark-NLP建立文本分類模型


作者|GUEST
編譯|VK
來源|Analytics Vidhya

概述

  • 在AWS電子病歷上建立John Snow實驗室的Spark NLP,並使用該庫對BBC文章進行簡單的文本分類。

介紹

自然語言處理是全球數據科學團隊的重要過程之一。隨着數據的不斷增長,大多數組織已經轉移到大數據平台,如apachehadoop和AWS、Azure和GCP等雲產品。

這些平台不僅能夠處理大數據,使組織能夠對非結構化數據(如文本分類)進行大規模分析。但在機器學習方面,大數據系統和機器學習工具之間仍然存在差距。

流行的機器學習python庫,如scikit-learn和Gensim,經過高度優化,可以在單節點計算機上執行,而不是為分布式環境設計的。

Apache Spark MLlib是許多幫助彌合這一差距的工具之一,它提供了大多數機器學習模型,如線性回歸、Logistic回歸、支持向量機、隨機森林、K-means、LDA等,以執行最常見的機器學習任務。

除了機器學習算法,Spark MLlib還提供了大量的特征變換器,如Tokenizer、StopWordRemover、n-grams和countvector、TF-IDF和Word2Vec等。

雖然這些轉換器和提取器足以構建基本的NLP管道,但是要構建一個更全面和生產級的管道,我們需要更先進的技術,如詞干分析、詞法化、詞性標記和命名實體識別。

Spark NLP提供了各種注釋器來執行高級NLP任務。有關更多信息,請在網站上查看注釋器列表及其用法

https://nlp.johnsnowlabs.com/docs/en/annotators。

設置環境

讓我們繼續看看如何在AWS EMR上設置Spark NLP。

1.在啟動EMR集群之前,我們需要創建一個引導操作。引導操作用於設置其他軟件或自定義群集節點的配置。以下是可用於在EMR集群上設置Spark NLP的引導操作,

#!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip
#
sudo python36 -m pip install pandas
#
sudo python36 -m pip install boto3
#
sudo python36 -m pip install re
#
sudo python36 -m pip install spark-nlp==2.4.5

創建shell腳本之后,將該腳本復制到AWS S3中的一個位置。你還可以根據需要安裝其他python包。

2.我們可以使用AWS控制台、API或python中的boto3庫來啟動EMR集群。使用Python的好處是,無論何時需要實例化集群或將其添加到工作流中,都可以重用代碼。

下面是實例化EMR集群的python代碼。

import boto3region_name='region_name'def get_security_group_id(group_name, region_name):
    ec2 = boto3.client('ec2', region_name=region_name)
    response = ec2.describe_security_groups(GroupNames=[group_name])
    return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow(
        Name='cluster_name', # 更新值
        ReleaseLabel='emr-5.27.0',
        LogUri='s3_path_for_logs', # 更新值
        Instances={
            'InstanceGroups': [
                {
                    'Name': "Master nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.2xlarge', # 根據要求進行變更
                    'InstanceCount': 1 #對於主節點高可用性,設置計數大於1
                },
                {
                    'Name': "Slave nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.2xlarge', # 根據要求進行變更
                    'InstanceCount': 2
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'Ec2KeyName' : 'key_pair_name', # 更新值
            'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
            'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
        },
        BootstrapActions=[    {
                    'Name':'install_dependencies',
                    'ScriptBootstrapAction':{
                            'Args':[],
                            'Path':'path_to_bootstrapaction_on_s3' # 更新值
                            }
                }],
        Steps = [],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        Applications=[
            { 'Name': 'hadoop' },
            { 'Name': 'spark' },
            { 'Name': 'hive' },
            { 'Name': 'zeppelin' },
            { 'Name': 'presto' }
        ],
        Configurations=[
            # YARN
            {
                "Classification": "yarn-site", 
                "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",
                               "yarn.nodemanager.pmem-check-enabled": "false",
                               "yarn.nodemanager.vmem-check-enabled": "false"}
            },
            
            # HADOOP
            {
                "Classification": "hadoop-env", 
                "Configurations": [
                        {
                            "Classification": "export", 
                            "Configurations": [], 
                            "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
                        }
                    ], 
                "Properties": {}
            },
            
            # SPARK
            {
                "Classification": "spark-env", 
                "Configurations": [
                        {
                            "Classification": "export", 
                            "Configurations": [], 
                            "Properties": {"PYSPARK_PYTHON":"/usr/bin/python3",
                                           "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
                        }
                    ], 
                "Properties": {}
            },
            {
                "Classification": "spark",
                "Properties": {"maximizeResourceAllocation": "true"},
                "Configurations": []
             },
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.dynamicAllocation.enabled": "true" #default is also true
                }
            }
        ]
    )

注意:請確保你對用於日志記錄和存儲引導操作腳本的S3 bucket具有正確的訪問權限。

基於Spark-NLP的BBC文章文本分類

現在我們已經准備好集群了,讓我們使用Spark NLP和Spark MLlib在BBC數據上構建一個簡單的文本分類示例。

1.初始化Spark

我們將導入所需的庫並使用不同的配置參數初始化spark會話。配置值取決於我的本地環境。相應地調整參數。

# 導入Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline# 使用Spark NLP啟動Spark會話
#spark = sparknlp.start()spark = SparkSession.builder \
    .appName("BBC Text Categorization")\
    .config("spark.driver.memory","8G")\ change accordingly
    .config("spark.memory.offHeap.enabled",True)\
    .config("spark.memory.offHeap.size","8G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
    .config("spark.kryoserializer.buffer.max", "1000M")\
    .config("spark.network.timeout","3600s")\
    .getOrCreate()

2.加載文本數據

我們將使用BBC的數據。你可以從這個鏈接下載數據。下載以下數據后,使用spark代碼加載;

https://www.kaggle.com/yufengdev/bbc-text-categorization?#Get-the-data

# 文件位置和類型
file_location = r'path\to\bbc-text.csv'
file_type = "csv"# CSV
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)df.count()

3.將數據集拆分為訓練集和測試集

與python使用scikit learn分割數據不同,Spark Dataframe有一個內置函數randomSplit()來執行相同的操作。

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)

randomSplit()函數需要兩個參數viz。權重數組和seed。在我們的例子中,我們將使用70/30分割,其中70%是訓練數據,30%是測試數據。

4.使用Spark NLP的NLP管道

讓我們繼續使用Spark NLP構建NLP管道。Spark NLP最大的優點之一是它與Spark MLLib模塊本機集成,有助於構建由transformers和estimators組成的綜合ML管道。

這個管道可以包括諸如CountVectorizer或HashingTF和IDF之類的特征提取模塊。我們還可以在這個管道中包含一個機器學習模型。

下面是由具有特征提取和機器學習模型的NLP管道組成的示例;

from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator# 轉換text列為nlp文件
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")#將文檔轉換為標識數組
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")
 
# 清理標識
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")# 刪除停用詞
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")# 將自定義文檔結構轉換為標識數組。
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["token_features"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)# 生成頻率
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)# 生成逆文檔頻率
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)# 將標簽(字符串)轉換為整數。
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")# 定義一個簡單的多項式邏輯回歸模型。嘗試不同的超參數組合,看看哪個更適合你的數據。你也可以嘗試不同的算法來比較分數。
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)# 將索引(整數)轉換為相應的類標簽
label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")# 定義nlp管道
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf,
            label_stringIdx,
            lr,
            label_to_stringIdx])

5.訓練模型

現在我們的NLP管道已經准備好了,讓我們根據訓練數據訓練我們的模型。

# 在訓練數據上擬合管道
pipeline_model = nlp_pipeline.fit(trainingData)

6.執行預測

一旦訓練完成,我們就可以預測測試數據上的類標簽。

# 對測試數據進行預測
predictions =  pipeline_model.transform(testData)

7. 評估模型

對訓練后的模型進行評估對於理解模型如何在看不見的數據上運行是非常重要的。我們將看到3個流行的評估指標,准確度、精確度和召回率。

  1. 准確度
# 導入evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

  1. 精確度
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

  1. 召回率
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

根據業務用例,你可以決定使用哪個度量來評估模型。

例如.如果一個機器學習模型被設計用來根據某些參數來檢測癌症,那么最好使用召回率,因為公司無法承受假負例(一個患有癌症但模型沒有檢測到癌症的人),而如果機器學習模型旨在生成用戶推薦,公司可以負擔得起誤報(10條建議中有8條符合用戶配置文件),因此可以使用精確度作為評估指標。

8. 保存管道模型

在成功地訓練、測試和評估模型之后,你可以將模型保存到磁盤,並在不同的Spark應用程序中使用它。要將模型保存到光盤,請使用以下代碼;

pipeline_model.save('/path/to/storage_location')

結論

Spark NLP提供了大量的注釋器和轉換器來構建數據預處理管道。Sparl NLP與Spark MLLib無縫集成,使我們能夠在分布式環境中構建端到端的自然語言處理項目。

在本文中,我們研究了如何在AWS EMR上安裝Spark NLP並實現了BBC數據的文本分類。我們還研究了Spark MLlib中的不同評估指標,並了解了如何存儲模型以供進一步使用。

希望你喜歡這篇文章。

原文鏈接:https://www.analyticsvidhya.com/blog/2020/07/build-text-categorization-model-with-spark-nlp/

歡迎關注磐創AI博客站:
http://panchuang.net/

sklearn機器學習中文官方文檔:
http://sklearn123.com/

歡迎關注磐創博客資源匯總站:
http://docs.panchuang.net/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM