logstash6.8.3 導入 CSV 文件到 ElasticSearch


logstash6.8.3 導入 CSV 文件到 ElasticSearch

使用logstash 導入數據到ES時,由三個步驟組成:input、filter、output。整個導入過程可視為:unix 管道操作,而管道中的每一步操作都是由"插件"實現的。使用 ./bin/logstash-plugin list 查看 logstash 已安裝的插件。

每個插件的選項都可以在官網查詢,先明確是哪一步操作,然后去官方文檔看是否有相應的插件是否支持這種操作。比如 output 配置選項:plugins-outputs-elasticsearch-options),其中的doc_id選項就支持 指定docid寫入ES。在這里,簡要說明一些常用的插件,要想了解它們實現的功能可參考官方文檔。

  1. mutate 插件 用於字段文本內容處理,比如 字符替換
  2. csv 插件 用於csv格式文件導入ES
  3. convert 插件 用於字段類型轉換
  4. date 插件 用於日期類型的字段處理

使用logstash導入時,默認以 "message" 標識 每一行數據,並且會生成一些額外的字段,比如@version、host、@timestamp,如果用不着,這些字段可以去除掉 ,此外,要注意ES中的索引的格式(Mapping結構),最好是指定自定義的索引模板,保證索引最"精簡"。

另外這里記錄一些常用的參數及其作用,更具體的解釋可查看官方文檔。

  1. sincedb_path 告訴logstash記錄文件已經處理到哪一行了,從而當logstash發生故障重啟時,可從故障點處開始導入,避免從頭重新導入。
  2. remove_field 刪除某些字段

配置文件完成后,執行以下命令./bin/logstash -f csvfile_logstash.conf 即可啟動 logstash 執行導入操作。

以下是各種錯誤解決:
錯誤一:

ConfigurationError”, :message=>”Expected one of #, input, filter, output at line 1, column 1

如果 配置文件內容是正確的,用Notepad++檢查一下文件的編碼,確保是:UTF-8 無BOM格式編碼

解決 SOH 分隔符問題

由於csv插件的separator選項不支持轉義字符,因此無法用\u0001來代表SOH。如果 csv 文件以 SOH 分隔符(\u0001)分割,一種方案是使用mutate插件替換,將\u0001替換成逗號。如下所示:

    mutate{
		# 每一行內容默認是message, 將分隔符 \u0001 替換成 逗號
		gsub => [ "message","\u0001","," ]
		# @timestamp 字段是默認生成的, 名稱修改成 created
		rename => ["@timestamp", "created"]
    }

但是實際上logstash6.8.3是支持按 SOH 分割的。在Linux shell 下,先按 ctrl+v,再按ctrl+a,輸入的就是SOH。那么在vim中打開配置文件,在 vim的 insert 模式下,先按 ctrl+v,再按ctrl+a,將SOH作為 csv 插件的separator分割符。

    csv {
			# 每行按逗號分割, 生成2個字段: topsid 和 title, (如果分割超過2列了,第三列則以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
			# 刪除一些不需要索引到ES中去的字段(logstash默認生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]
        }	

一個將csv文件內容導入ES的示例配置模板如下:(csv文件中的每一行以SOH作為分割符)

  • logstash input 插件支持多種數據來源,比如kafka、beats、http、file 等。在這里我們的數據來源是文件,因此采用了logstash input file 插件。
  • 把數據從文件中讀到logstash后,可能需要對文件內容/格式 進行處理,比如分割、類型轉換、日期處理等,這由logstash filter插件實現。在這里我們進行了文件的切割和類型轉換,因此使用的是logstash filter csv插件和mutate插件。
  • 處理成我們想要的字段后,接下來就是導入到ES,那么就需要配置ES的地址、索引名稱、Mapping結構信息(使用指定模板寫入),這由 logstash output插件實現,在這里我們把處理后的數據導入ES,因此使用的是 logstash output elasticsearch插件。
input {
  file {
      path => "/data/psj/test/*.csv"
      start_position => "beginning"
	  sincedb_path => "/dev/null"
    }
}

filter {
    csv {
			# 每行按逗號分割, 生成2個字段: topsid 和 title, (如果分割超過2列了,第三列則以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
			# 刪除一些不需要索引到ES中去的字段(logstash默認生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]

        }			
	mutate {
    convert => {
		# 類型轉換
		"topsid" => "integer"
		"title" => "string"
    }
  }
}

output {
   elasticsearch {
        hosts => "http://http://127.0.0.1:9200"
        index => "chantitletest"
    	# 指定 文檔的 類型為 "_doc"
		document_type => "_doc"
		# 指定doc id 為topsid字段的值
		document_id => "%{topsid}"
		manage_template => true
		# 使用自定義的模板寫入,否則將會以logstash默認模板寫入
		template => "/data/services/logstash-6.8.3/config/chantitletpe.json"
		template_overwrite => true
		template_name => "chantitletpe"
       }
    stdout{
		codec => json_lines
	}
}

(也可以采用logstash filter插件的 mutate選項 將SOH轉換成逗號):

filter {
    mutate{
		# 每一行內容默認是message, 將分隔符 \u0001 替換成 逗號
		gsub => [ "message","\u0001","," ]
		# @timestamp 字段是默認生成的, 名稱修改成 created
		rename => ["@timestamp", "created"]
    }
    csv {
	    # 每行按逗號分割, 生成2個字段: topsid 和 title, (如果分割超過2列了,第三列則以 column3 命名)
            separator => ","
            columns => ["topsid", "title"]
			# 刪除一些不需要索引到ES中去的字段(logstash默認生成的一些字段)
			remove_field => ["host", "@timestamp", "@version", "message","path"]
        }			
	mutate {
    convert => {
		# 類型轉換
		"topsid" => "integer"
		"title" => "string"
    }
  }
}

使用的自定義模板如下:

{
  "index_patterns": [
    "chantitle_v1",
    "chantitletest"
  ],
  "settings": {
    "number_of_shards": 3,
    "analysis": {
      "analyzer": {
        "my_hanlp_analyzer": {
          "tokenizer": "my_hanlp"
        },
        "pinyin_analyzer": {
          "tokenizer": "my_pinyin"
        }
      },
      "tokenizer": {
        "my_hanlp": {
          "enable_normalization": "true",
          "type": "hanlp_standard"
        },
        "my_pinyin": {
          "keep_joined_full_pinyin": "true",
          "lowercase": "true",
          "keep_original": "true",
          "remove_duplicated_term": "true",
          "keep_first_letter": "false",
          "keep_separate_first_letter": "false",
          "type": "pinyin",
          "limit_first_letter_length": "16",
          "keep_full_pinyin": "true"
        }
      }
    }
  },
  "mappings": {
    "_doc": {
      "properties": {
        "created": {
          "type": "date",
          "doc_values": false,
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "title": {
          "type": "text",
          "fields": {
            "pinyin": {
              "type": "text",
              "boost": 10,
              "analyzer": "pinyin_analyzer"
            },
            "raw": {
              "type": "keyword",
              "doc_values": false
            }
          },
          "analyzer": "my_hanlp_analyzer"
        },
        "topsid": {
          "type": "long",
          "doc_values": false
        }
      }
    }
  }
}

上面給了一個csv文件導入ES,這里再給個txt文件導入ES吧。txt 以逗號分割,每列的內容都在冒號里面,只需要前4列內容,一行示例數據如下:

"12345","12345","研討區","12345","500","xxxx","2008-08-04 22:20:24","0","300","0","5","0","","0","0","","","0","0"

這里采用的是 logstash filter 的 dissect 插件。相比於grok插件,它的優點不是采用正規匹配的方式解析數據,速度較快,但不能解析復雜數據。只能夠對較為規律的數據進行導入。logstash 配置文件如下:

input {
  file {
      path => "/data/psj/test/*.txt"
      start_position => "beginning"
      # sincedb_path => "/dev/null"
    }
}

filter {
  dissect {
      mapping => {
		# 插件輸入的每一行數據默認名稱是message,由於每列數據在雙引號里面,因此解析前4列數據的寫法如下:
        "message" => '"%{topsid}","%{subsid}","%{subtitle}","%{pid}"'
      }
	  # 刪除自動生成的、用不着的一些字段
	  remove_field => ["host", "@timestamp", "@version", "message","path"]
	  convert_datatype => {
		# 類型轉換
		"topsid" => "int"
		"subsid" => "int"
		"pid" => "int"
    }
    }
}

output {
   elasticsearch {
        hosts => "http://127.0.0.1:9200"
        index => "chansubtitletest"
		document_type => "_doc"
		# 指定doc id 為topsid字段的值
		document_id => "%{subsid}"
		manage_template => true
		# 使用自定義的模板寫入,否則將會以logstash默認模板寫入
		template => "/data/services/logstash-6.8.3/config/chansubtitle.json"
		template_overwrite => true
		template_name => "chansubtitle"
       }
    stdout{
		codec => json_lines
	}
}

原文:https://www.cnblogs.com/hapjin/p/12410408.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM