Storm 中的 tuple可以包含任何類型的對象。由於Storm 是一個分布式系統,所以在不同的任務之間傳遞消息時Storm必須知道怎樣序列化、反序列化消息對象。
Storm 使用 Kryo庫對對象進行序列化。Kryo 是一個靈活、快速的序列化庫。Storm 默認支持基礎類型、string、byte arrays、ArrayList、HashMap、HashSet 以及 Clojure 的集合類型的序列化。如果需要在tuple中使用其他的對象類型,就需要注冊一個自定義的序列化器。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/7044042.html
微信:intsmaze
避免微信回復重復咨詢問題,技術咨詢請博客留言。
自定義序列化
TORM使用Kryo來序列化。要實現自定義序列化器,我們需要使用Kryo注冊新的序列化器。添加自定義序列化器是通過拓撲配置的topology.kryo.register屬性完成的。它需要一個注冊的列表,每個注冊項可以采取兩種形式:
1:類名注冊,在這種情況下,Storm將使用Kryo的FieldsSerializer來序列化該類。這可能是也可能不是該類最好的選擇,更多的細節可以查看Kryo文檔。
2:實現了com.esotericsoftware.kryo.Serializer接口的類名注冊的映射。
Storm為拓撲配置里的注冊序列化提供了幫助。Config類中有一個名為registerSerialization的方法,可以把注冊添加到配置中。
public void registerSerialization(Class klass); public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass);
java序列化
一個拓撲中不同的任務傳遞消息時Storm發現了一個沒有注冊序列化器的類型,它會使用 Java 序列化器來代替,如果這個對象無法被Java序列化器序列化,Storm 就會拋出異常。
注意,Java 自身的序列化機制非常耗費資源,而且不管在 CPU 的性能上還是在序列化對象的大小上都沒有優勢。強烈建議讀者在生產環境中運行topology 的時候注冊一個自定義的序列化器。
可以通過將 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION 配置為 false 的方式來將禁止序列化器回退到Java的序列化機制。
Config.setFallBackOnJavaSerialization(conf,false);這個時候如果storm使用java序列化就會拋出異常告訴開發人員去注冊一個kryo序列化。
實現storm序列化
創建傳輸的對象。
package cn.intsmaze.serializable.bean; public class Person { private int age; private Studnet studnet; private ArrayList arrayList=new ArrayList(); private LinkedList linkedList=new LinkedList(); public Person() { } public Person(int age,Studnet s) { this.age = age; this.studnet=s; arrayList.add("ArrayList中的"+s.getName()); linkedList.add("linkedList中的"+s.getName()); } @Override public String toString() { return "Person [age=" + age + ", studnet=" + studnet + ", arrayList=" + arrayList + ", linkedList=" + linkedList + "]"; } get(),set()...... } package cn.intsmaze.serializable.bean; public class Studnet { private String name; public Studnet() { } public Studnet(String name) { this.name = name; } @Override public String toString() { return "Studnet [name=" + name + "]"; } get(),set()...... }
spout和bolt的實現,spout每次會創建一個person對象將該對象發送到bolt,bolt類接收到該對象將該對象打印出來。
package cn.intsmaze.serializable; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.Studnet; public class SpoutBean extends BaseRichSpout { SpoutOutputCollector collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { Studnet s=new Studnet("xiaoxi"); collector.emit(new Values(new Person(100,s))); Utils.sleep(500); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("person")); } } package cn.intsmaze.serializable; import cn.intsmaze.serializable.bean.Person; public class BoltBean extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { super.prepare(stormConf, context); } public void execute(Tuple input, BasicOutputCollector collector) { Person person = (Person)input.getValueByField("person"); System.out.println("接收到spout節點傳來的數據:"+person); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
場景一:
使用public void registerSerialization(Class klass);
package cn.intsmaze.serializable; import java.util.LinkedList; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.Studnet; public class TopologyBean { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SpoutBean(), 1); builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.registerSerialization(Person.class); conf.registerSerialization(Studnet.class); //注釋掉后,但Studnet沒實現java序列化,則會報錯。有兩種方法,一種注冊該類,一種實現java序列化。 conf.registerSerialization(LinkedList.class); //這里如果注釋掉,則會使用java序列化方式,如果我們取消掉禁止使用java序列化方法,則會提示注冊LinkedList類報錯。 conf.setNumWorkers(2); // Config.setFallBackOnJavaSerialization(conf, false);//禁止使用java語言自己的序列化 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } }
第11行,我們注冊person類使用Kryo序列化,person對象除了有基本類型int字段外,還有arraylist,linkedlist類型以及自定義的student類型。arraylist類型是storm默認已經提供了支持。
這里如果我們不對linkedlist類型和自定義類型student進行注冊則該拓撲在運行時則會報無法序列化student類型異常。
這個時候有兩種辦法解決:
一種就是使student實現java的public class Studnet implements Serializable接口,則該拓撲會成功運行。因為storm如果發現傳輸的對象如果沒有注冊為Kryo,則就會使用java的序列化對象,而linkedlist默認已經實現了該接口,所以才會出現前面報student對象無法序列化,然后使得student實現java的序列化接口即可。
第二種方案就是,我們對student類進行注冊conf.registerSerialization(Studnet.class);。
雖然linkedlist不注冊,會默認使用java的序列化,但是出於效率的考慮,我們將其注冊為Kryo。
提示:因為有些集合類型,storm沒有提供序列化支持,但是實現了java序列化接口,所以如果我們不加以控制,會使用java序列化而拖累整個系統。所以推薦使用
Config.setFallBackOnJavaSerialization(conf, false);禁止使用java語言自己的序列化來可以在本地模式時及時發現報錯信息,將問題盡早解決。
場景二:
我們使用kryo序列化,但是有時候我們並不希望傳輸對象的所有字段,而只是傳輸對象的某些字段,從而進一步提高消息的傳遞速率,這個時候我們可以使用kryo的自定義序列化機制來指定傳輸的值。
package cn.intsmaze.serializable.bean; import java.util.ArrayList; import java.util.LinkedList; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; public class PersonSerializable extends Serializer<Person>{ @Override public Person read(Kryo kryo, Input input, Class<Person> arg2) { System.out.println("序列化"); Person person=new Person(); person.setAge(input.readInt()); person.setArrayList(kryo.readObject(input, ArrayList.class)); person.setLinkedList(kryo.readObject(input, LinkedList.class));//該類型,storm默認不支持,所以要在topology中注冊該類型,如果不注冊,則會使用java序列化。 person.setStudnet(kryo.readObject(input, Studnet.class));//該類型,storm默認不支持,所以要在topology中注冊該類型,如果不注冊,且java序列化沒有實現,則會報錯。 return person; } @Override public void write(Kryo kryo, Output output, Person person) { System.out.println("反序列化"); output.writeInt(person.getAge()); kryo.writeObject(output, person.getArrayList()); kryo.writeObject(output, person.getLinkedList()); kryo.writeObject(output, person.getStudnet()); } }
package cn.intsmaze.serializable; import java.util.LinkedList; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.PersonSerializable; import cn.intsmaze.serializable.bean.Studnet; public class TopologyBean { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SpoutBean(), 1); builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.registerSerialization(Studnet.class); conf.registerSerialization(LinkedList.class); conf.registerSerialization(Person.class, PersonSerializable.class); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } }
因為PersonSerializable類中指定了要傳輸person對象的int,studne,ArrayList,LinkedList 類型。
如果我們注釋掉第12行 conf.registerSerialization(Studnet.class);且Studnet類沒有實現java的序列化,則拓撲的任務間傳遞消息進行序列化時就會報無法序列化該類的錯誤,感興趣的同學可以試試注釋掉該行,看看storm會報什么異常。
第13行,我們必須注冊對LinkedList序列化,storm默認支持了對ArrayList類的序列化,但沒有提供對LinkedList序列化,需要我們手動注冊,如果不注冊,因為LinkedList實現了java的序列化接口,所以會使用java序列化,則不會報錯。
強烈建議,在開發中就算注冊了kyro序列化方式,也要設置該conf.setFallBackOnJavaSerialization(false)方法來禁止使用java序列化方式,因為實際開發中,核心架構搭建好了,會讓團隊成員直接在現成架構上編寫,他們不需要了解storm的一些機制,但是這也帶來問題,一種場景就是,開發人員對傳輸對象增加了一個LinkedList字段,但是他沒有注冊序列化類,storm就會對LinkedList使用java序列化,就會拖累系統的性能,所以在架構的時候,通過設置禁止java序列化方法,就可以在測試中及時發現問題所在。
補充:上面的所有一切,在本地運行以及部署到集群時,work數量設置為1時,都不會生效的。因為同一個對象公有一個內存,不會涉及網絡傳輸的,也就不需要序列化和反序列化。
生產場景回顧:
本人intsmaze生產上遇見的問題:storm工程中對傳輸對象使用了conf.registerSerialization(Person.class, PersonSerializable.class);方式來指定序列化該對象的某些字段。初級程序員在storm工程上開發時,因為業務需要對傳輸對象增加了一個字段,但是沒有在PersonSerializable中序列化和反序列化該對象。恰巧的時,初級工程師本地模式和准生產測試時,topology的work的數量都為1,導致對象在bolt和bolt節點傳輸時並沒有走序列化方式,結果測試一切正常,但是上生產后,因為work數量是10個,立馬在后一個bolt中報大量的空指針異常,造成很嚴重的生產問題。