Data Collection for Big Data

Data collection is the foundation of big data. Data scattered across many places only becomes usable for downstream processing once it has been collected and brought together. Since big data technologies began to develop, many collection frameworks have appeared; this article walks through several popular data collection solutions.

Evaluating whether a framework suits a given business scenario usually involves several considerations:

- Most basic: whether the interfaces match, i.e. whether you are collecting socket data or log data, and where the output needs to go;
- Performance: whether the framework's throughput meets the needs of the business;
- Flexibility: how easy it is to add filtering or custom development when needed;
- Impact on the host: data collection must not disturb the business system itself or consume too many of its resources;
- Operability: some solutions have many dependencies and complex configuration, which makes them error-prone;
- Reliability: whether the framework is highly reliable and will not lose data.

Judging candidates along these lines should make it possible to find a suitable framework.
1. Flume

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming event data.

Source interfaces
| Source Type | Comments |
| --- | --- |
| Avro Source | Listens on Avro port and receives events from external Avro client streams. |
| Thrift Source | Listens on Thrift port and receives events from external Thrift client streams. |
| Exec Source | Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). |
| JMS Source | JMS Source reads messages from a JMS destination such as a queue or topic. |
| SSL and JMS Source | JMS client implementations typically support configuring SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). |
| Spooling Directory Source | This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. |
| Taildir Source | Watches the specified files and tails them in near real time once new lines appended to each file are detected. |
| Kafka Source | Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. |
| NetCat TCP Source | A netcat-like source that listens on a given port and turns each line of text into an event. |
| NetCat UDP Source | Like the original NetCat (TCP) source, this source listens on a given port and turns each line of text into an event, which is then sent via the connected channel. Acts like nc -u -k -l [host] [port]. |
| Sequence Generator Source | A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1, and stops at totalEvents. |
| Syslog Sources | Reads syslog data and generates Flume events. |
| Syslog TCP Source | The original, tried-and-true syslog TCP source. |
| Multiport Syslog TCP Source | This is a newer, faster, multi-port capable version of the Syslog TCP source. |
| Syslog UDP Source | |
| HTTP Source | A source which accepts Flume Events by HTTP POST and GET. |
| Stress Source | StressSource is an internal load-generating source implementation which is very useful for stress tests. |
| Custom Source | A custom source is your own implementation of the Source interface. |
| Scribe Source | Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift with a compatible transfer protocol. |
Sink interfaces
| Sink Type | Comments |
| --- | --- |
| HDFS Sink | This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. |
| Hive Sink | This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. |
| Logger Sink | Logs events at INFO level. Typically useful for testing/debugging purposes. |
| Avro Sink | This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. |
| Thrift Sink | This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname / port pair. |
| IRC Sink | The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. |
| File Roll Sink | Stores events on the local filesystem. |
| Null Sink | Discards all events it receives from the channel. |
| HBaseSink | This sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. |
| HBase2Sink | HBase2Sink is the equivalent of HBaseSink for HBase version 2. |
| AsyncHBaseSink | This sink writes data to HBase using an asynchronous model. |
| MorphlineSolrSink | This sink extracts data from Flume events, transforms it, and loads it in near real time into Apache Solr servers, which in turn serve queries to end users or search applications. |
| ElasticSearchSink | This sink writes data to an Elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them, just as if Logstash wrote them. |
| Kafka Sink | This is a Flume Sink implementation that can publish data to a Kafka topic. |
| HTTP Sink | This sink takes events from the channel and sends them to a remote service using an HTTP POST request. The event content is sent as the POST body. |
| Custom Sink | A custom sink is your own implementation of the Sink interface. A custom sink’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. |
As the tables above show, Flume supports a fairly rich set of interfaces; the most common combination in practice is a file-based log collection source paired with a sink that forwards to Kafka (a configuration sketch follows below).
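As a reference point, the sketch below shows what that common pairing could look like as an agent configuration: a Taildir source feeding a file channel, which in turn feeds a Kafka sink. The agent name, file paths, broker addresses, and topic are placeholders, not values from any real deployment.

```properties
# Hypothetical agent "a1": tail application log files and forward events to Kafka.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Taildir source: follow new lines appended to matching files.
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*log
a1.sources.r1.channels = c1

# File channel: buffers events on disk so they survive an agent restart.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDirs = /var/flume/data

# Kafka sink: publish each event to a Kafka topic.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = kafka1:9092,kafka2:9092
a1.sinks.k1.kafka.topic = app-logs
a1.sinks.k1.channel = c1
```

The agent would then be started with something like `bin/flume-ng agent --conf conf --conf-file kafka-agent.conf --name a1`.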
Throughput: throughput depends on both machine performance and the data itself, so when evaluating it consider both events per second and megabytes per second. Typically, with a file channel Flume can process tens of megabytes per second; with a memory channel it can process hundreds of megabytes per second.
Flexibility: Flume is quite flexible. Hooks are exposed at every stage of the data path for custom development, and since Flume itself is written in Java, extending it is that much friendlier.
Resource consumption: Flume itself consumes a fair amount of resources; if that matters for your deployment, parameter tuning can bring usage down considerably.
Maintainability: Flume has few dependencies and is usable as soon as the jar packages are downloaded. Some find the configuration cumbersome, but complexity is the price of flexibility; personally I find it acceptable.
Reliability: Flume's reliability shows up in several places. First, choosing a suitable channel guarantees reliable message handling; second, Flume itself also ships with load-balancing and failover capabilities for sinks (an independent sink group sketch follows below).
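To make that load-balancing/failover point concrete, the fragment below sketches a failover sink group. It is independent of the Kafka example above, and all component names, hosts, and ports are hypothetical: events drain to the higher-priority Avro sink k1 and fall back to k2 only when k1 fails.

```properties
# Hypothetical failover sink group for agent "a1": k1 preferred, k2 as backup.
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Two tiered-collection Avro sinks draining the same channel.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector1.example.com
a1.sinks.k1.port = 4545
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = collector2.example.com
a1.sinks.k2.port = 4545
a1.sinks.k2.channel = c1
```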
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
https://blog.csdn.net/lijinqi1987/article/details/77506034
2. Logstash

Logstash started out as the data ingestion member of the ELK stack; over time it has gradually added support for many more data sources and destinations.

Input interfaces
| Input Plugin | Description |
| --- | --- |
| azure_event_hubs | Receives events from Azure Event Hubs |
| beats | Receives events from the Elastic Beats framework |
| cloudwatch | Pulls events from the Amazon Web Services CloudWatch API |
| couchdb_changes | Streams events from CouchDB’s _changes URI |
| dead_letter_queue | Reads events from Logstash’s dead letter queue |
| elasticsearch | Reads query results from an Elasticsearch cluster |
| exec | Captures the output of a shell command as an event |
| file | Streams events from files |
| ganglia | Reads Ganglia packets over UDP |
| gelf | Reads GELF-format messages from Graylog2 as events |
| generator | Generates random log events for test purposes |
| github | Reads events from a GitHub webhook |
| google_cloud_storage | Extract events from files in a Google Cloud Storage bucket |
| google_pubsub | Consume events from a Google Cloud PubSub service |
| graphite | Reads metrics from the graphite tool |
| heartbeat | Generates heartbeat events for testing |
| http | Receives events over HTTP or HTTPS |
| http_poller | Decodes the output of an HTTP API into events |
| imap | Reads mail from an IMAP server |
| irc | Reads events from an IRC server |
| java_generator | Generates synthetic log events |
| java_stdin | Reads events from standard input |
| jdbc | Creates events from JDBC data |
| jms | Reads events from a Jms Broker |
| jmx | Retrieves metrics from remote Java applications over JMX |
| kafka | Reads events from a Kafka topic |
| kinesis | Receives events through an AWS Kinesis stream |
| log4j | Reads events over a TCP socket from a Log4j SocketAppender object |
| lumberjack | Receives events using the Lumberjack protocol |
| meetup | Captures the output of command line tools as an event |
| pipe | Streams events from a long-running command pipe |
| puppet_facter | Receives facts from a Puppet server |
| rabbitmq | Pulls events from a RabbitMQ exchange |
| redis | Reads events from a Redis instance |
| relp | Receives RELP events over a TCP socket |
| rss | Captures the output of command line tools as an event |
| s3 | Streams events from files in a S3 bucket |
| s3_sns_sqs | Reads logs from AWS S3 buckets using sqs |
| salesforce | Creates events based on a Salesforce SOQL query |
| snmp | Polls network devices using Simple Network Management Protocol (SNMP) |
| snmptrap | Creates events based on SNMP trap messages |
| sqlite | Creates events based on rows in an SQLite database |
| sqs | Pulls events from an Amazon Web Services Simple Queue Service queue |
| stdin | Reads events from standard input |
| stomp | Creates events received with the STOMP protocol |
| syslog | Reads syslog messages as events |
| tcp | Reads events from a TCP socket |
| twitter | Reads events from the Twitter Streaming API |
| udp | Reads events over UDP |
| unix | Reads events over a UNIX socket |
| varnishlog | Reads from the varnish cache shared memory log |
| websocket | Reads events from a websocket |
| wmi | Creates events based on the results of a WMI query |
| xmpp | Receives events over the XMPP/Jabber protocol |
Besides the usual inputs such as file, stdin, and kafka, Logstash supports a long grab bag of more niche inputs. That makes it well qualified as the ingestion layer of the ELK stack, but whether those inputs are what we actually need requires careful evaluation.

Output interfaces
| Output Plugin | Description |
| --- | --- |
| boundary | Sends annotations to Boundary based on Logstash events |
| circonus | Sends annotations to Circonus based on Logstash events |
| cloudwatch | Aggregates and sends metric data to AWS CloudWatch |
| csv | Writes events to disk in a delimited format |
| datadog | Sends events to DataDogHQ based on Logstash events |
| datadog_metrics | Sends metrics to DataDogHQ based on Logstash events |
| elastic_app_search | Sends events to the Elastic App Search solution |
| elasticsearch | Stores logs in Elasticsearch |
| email | Sends email to a specified address when output is received |
| exec | Runs a command for a matching event |
| file | Writes events to files on disk |
| ganglia | Writes metrics to Ganglia’s gmond |
| gelf | Generates GELF formatted output for Graylog2 |
| google_bigquery | Writes events to Google BigQuery |
| google_cloud_storage | Uploads log events to Google Cloud Storage |
| google_pubsub | Uploads log events to Google Cloud Pubsub |
| graphite | Writes metrics to Graphite |
| graphtastic | Sends metric data on Windows |
| http | Sends events to a generic HTTP or HTTPS endpoint |
| influxdb | Writes metrics to InfluxDB |
| irc | Writes events to IRC |
| java_sink | Discards any events received |
| java_stdout | Prints events to the STDOUT of the shell |
| juggernaut | Pushes messages to the Juggernaut websockets server |
| kafka | Writes events to a Kafka topic |
| librato | Sends metrics, annotations, and alerts to Librato based on Logstash events |
| loggly | Ships logs to Loggly |
| lumberjack | Sends events using the lumberjack protocol |
| metriccatcher | Writes metrics to MetricCatcher |
| mongodb | Writes events to MongoDB |
| nagios | Sends passive check results to Nagios |
| nagios_nsca | Sends passive check results to Nagios using the NSCA protocol |
| opentsdb | Writes metrics to OpenTSDB |
| pagerduty | Sends notifications based on preconfigured services and escalation policies |
| pipe | Pipes events to another program’s standard input |
| rabbitmq | Pushes events to a RabbitMQ exchange |
| redis | Sends events to a Redis queue using the RPUSH command |
| redmine | Creates tickets using the Redmine API |
| riak | Writes events to the Riak distributed key/value store |
| riemann | Sends metrics to Riemann |
| s3 | Sends Logstash events to the Amazon Simple Storage Service |
| sns | Sends events to Amazon’s Simple Notification Service |
| solr_http | Stores and indexes logs in Solr |
| sqs | Pushes events to an Amazon Web Services Simple Queue Service queue |
| statsd | Sends metrics using the statsd network daemon |
| stdout | Prints events to the standard output |
| stomp | Writes events using the STOMP protocol |
| syslog | Sends events to a syslog server |
| tcp | Writes events over a TCP socket |
| timber | Sends events to the Timber.io logging service |
| udp | Sends events over UDP |
| webhdfs | Sends Logstash events to HDFS using the webhdfs REST API |
| websocket | Publishes messages to a websocket |
| xmpp | Posts events over XMPP |
| zabbix | Sends events to a Zabbix server |
As with the inputs, the outputs start with a large batch of Elastic's own components. Support for the Hadoop ecosystem is limited, but support for NoSQL stores such as Redis and MongoDB is relatively broad.
Throughput: Logstash processes on the order of a few thousand events per second.
Flexibility: Logstash provides powerful data filtering and preprocessing (see the pipeline sketch after these notes).
Resource consumption: Logstash is relatively demanding and needs a fair amount of memory.
Operations: Logstash is written in JRuby, and its dependencies and configuration are comparatively complex.
Reliability: Logstash runs as a single node, so in extreme cases data loss is possible.
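As a concrete illustration of that input/filter/output structure, a minimal pipeline configuration could look like the sketch below; the log path, grok pattern, index name, brokers, and topic are placeholder assumptions rather than values from the article.

```conf
# Hypothetical pipeline: tail access logs, parse each line, and ship the
# structured events to both Elasticsearch and Kafka.
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  }
}

filter {
  # Parse the raw line into structured fields.
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Use the timestamp from the log line as the event timestamp.
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "access-%{+YYYY.MM.dd}"
  }
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "access-logs"
  }
}
```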
https://www.elastic.co/guide/en/logstash/current/index.html
https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
3. FileBeat

FileBeat is another data collection framework from Elastic. Compared with Logstash, its data processing capabilities are weaker, but that is precisely the point: it is lightweight, so its resource consumption is very low.

Input interfaces
| Input type | Comments |
| --- | --- |
| Log | Reads lines from log files. |
| Stdin | Reads events from standard in. |
| Container | Reads container log files. |
| Kafka | Reads from topics in a Kafka cluster. |
| Redis | Reads entries from Redis slowlogs. |
| UDP | Reads events over UDP. |
| Docker | Reads logs from Docker containers. |
| TCP | Reads events over TCP. |
| Syslog | Reads events over TCP or UDP; this input parses BSD (RFC 3164) events and some variants. |
| s3 | Retrieves logs from S3 objects that are pointed to by messages from specific SQS queues. |
| NetFlow | Reads NetFlow and IPFIX exported flows and options records over UDP. |
| Google Pub/Sub | Reads messages from a Google Cloud Pub/Sub topic subscription. |
| Azure eventhub | Reads messages from an Azure event hub. |
FileBeat supports noticeably fewer input types, but the usual ones (files, standard input, Kafka, and so on) are covered, and there is some support for specific products as well.

Output interfaces
| Output type | Comments |
| --- | --- |
| Elasticsearch | Sends the transactions directly to Elasticsearch by using the Elasticsearch HTTP API. |
| Logstash | Sends events directly to Logstash by using the Lumberjack protocol, which runs over TCP. Logstash allows for additional processing and routing of generated events. |
| Kafka | Sends the events to Apache Kafka. |
| Redis | Inserts the events into a Redis list or a Redis channel. |
| File | Dumps the transactions into a file where each transaction is in a JSON format. |
| Console | Writes events in JSON format to stdout. |
As the output table shows, FileBeat's persistence options are rather limited; having been incubated at Elastic, most of its outputs are Elastic-stack components, but the Kafka and Redis outputs can still cover a fair number of needs (a minimal configuration sketch follows).
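A minimal filebeat.yml along those lines might look like the sketch below; the paths, broker addresses, and topic name are placeholders. It tails log files and publishes the events to Kafka.

```yaml
# Hypothetical filebeat.yml: tail application log files and publish events to Kafka.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log

# Kafka output instead of the default Elasticsearch/Logstash outputs.
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]
  topic: "app-logs"
  required_acks: 1
  compression: gzip
```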
Throughput: FileBeat's throughput is moderate; it can meet basic needs, on the order of a few thousand events per second, but higher volumes require special handling.
Flexibility: FileBeat offers a degree of flexibility, though Go itself has a learning curve, and FileBeat is not as flexible or as capable as Flume and Logstash.
Resource consumption: being written in Go, it normally uses few resources, but when individual events are large, the default configuration is prone to OOM.
Operations: since FileBeat is written in Go there are no extra dependencies, but the configuration file uses a YAML-like format and there is quite a lot that can be configured.
Reliability: I have not seen FileBeat adopt any strategy to prevent loss during the data transfer itself, so in extreme cases data loss is possible.
https://www.elastic.co/guide/en/beats/filebeat/current/index.html