storm java.io.NotSerializableException


估計有2年沒搞storm了,今天由於工作需要重新搞一次,

不過需要使用最新版本的storm了。使用的是1.2.3版本,

寫了程序,報了一個錯誤,如下

 

 

13886 [main] INFO  o.a.s.d.s.Supervisor - Starting supervisor with id 87f0e450-46bf-4545-b86f-2a9f961ad24d at host vm1.
Exception in thread "main" java.lang.IllegalStateException: Spout 'ThingSpout' contains a non-serializable field of type rexel.topo.ThingSpout$$Lambda$1/109961541, which was instantiated prior to topology creation. rexel.topo.ThingSpout$$Lambda$1/109961541 should be instantiated within the prepare method of 'ThingSpout at the earliest.
	at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:143)
	at rexel.topo.MainTopology.main(MainTopology.java:34)
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541
	at org.apache.storm.utils.Utils.javaSerialize(Utils.java:240)
	at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:138)
	... 1 more
Caused by: java.io.NotSerializableException: rexel.topo.ThingSpout$$Lambda$1/109961541
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.storm.utils.Utils.javaSerialize(Utils.java:236)
	... 2 more

 

工程結構如下:

 

MainTopology代碼如下:

package rexel.topo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import rexel.bean.PropertiesBean;
import rexel.utils.PropertiesUtils;

public class MainTopology {
    private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance();

    public static void main(String[] args) throws Exception {
        PropertiesBean propertiesBean = propertiesUtils.readProperties();
        if (propertiesBean == null) {
            return;
        }

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("ThingSpout", new ThingSpout(), 1);
        builder.setBolt("ThingBolt", new ThingBolt(), 1).shuffleGrouping("ThingSpout");

        Config config = new Config();
        config.setNumWorkers(1);
        config.setMessageTimeoutSecs(60);
        config.setMaxSpoutPending(100);
        config.setNumAckers(1);if (Boolean.valueOf(args[0])) {
            StormSubmitter.submitTopology("MainTopology", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("MainTopology", config, builder.createTopology());
        }
    }
}

 

 

ThingSpout代碼如下:

package rexel.topo;

import java.net.URI;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import rexel.bean.PropertiesBean;
import rexel.utils.CommonUtils;
import rexel.utils.PropertiesUtils;

public class ThingSpout extends BaseRichSpout {
    private static PropertiesUtils propertiesUtils = PropertiesUtils.getInstance();
    private static final long serialVersionUID = 1L;
    private static LinkedBlockingQueue<byte[]> queue;
    private static SpoutOutputCollector collector = null;

    @Override
    public void open(Map map, TopologyContext topologyContext,
        SpoutOutputCollector spoutOutputCollector) {
        PropertiesBean propertiesBean = propertiesUtils.readProperties();
        if (propertiesBean == null) {
            return;
        }

        collector = spoutOutputCollector;
        queue = new LinkedBlockingQueue<>(1000);

        //參數說明,請參見:AMQP客戶端接入說明。
        String accessKey = propertiesBean.getAccessKey();
        String accessSecret = propertiesBean.getAccessSecret();
        String uid = propertiesBean.getUid();
        String regionId = propertiesBean.getRegionId();

        String consumerGroupId = "AsgdwvkMT3ygC2IwT9GD000100";
        long timeStamp = System.currentTimeMillis();

        //簽名方法:支持hmacmd5,hmacsha1和hmacsha256
        String signMethod = "hmacsha1";
        //控制台服務端訂閱中消費組狀態頁客戶端ID一欄將顯示clientId參數。
        //建議使用機器UUID、MAC地址、IP等唯一標識等作為clientId。便於您區分識別不同的客戶端。
        String clientId = CommonUtils.getDeviceUnique();

        //UserName組裝方法,請參見文檔:AMQP客戶端接入說明。
        String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        //password組裝方法,請參見文檔:AMQP客戶端接入說明。
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        //按照qpid-jms的規范,組裝連接URL。
        String connectionUrl = "failover:(amqps://" + uid + ".iot-amqp." + regionId + ".aliyuncs.com:5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

        try {
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // Create Connection
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // Create Session
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手動調用message.acknowledge()
            // Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // Create Receiver Link
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        try {
            byte[] body = queue.take();
            String uuid = UUID.randomUUID().toString();
            collector.emit(new Values((Object) body), uuid);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("body"));
    }

    private MessageListener messageListener = message -> {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            System.out.println("receive message"
                + ", topic = " + topic + ", msgId = " + messageId + ", content = " + content);
            queue.put(body);
        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    private JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * 連接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            System.out.println("onConnectionEstablished, remoteUri:{}" + remoteURI + "}");
        }

        /**
         * 嘗試過最大重試次數之后,最終連接失敗。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            System.out.println("onConnectionFailure, {" + error.getMessage() + "}");
        }

        /**
         * 連接中斷。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            System.out.println("onConnectionInterrupted, remoteUri:{" + remoteURI + "}");
        }

        /**
         * 連接中斷后又自動重連上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            System.out.println("onConnectionRestored, remoteUri:{" + remoteURI + "}");
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

    private String doSign(String toSignString, String secret, String signMethod) {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        byte[] rawHmac = null;
        try {
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            rawHmac = mac.doFinal(toSignString.getBytes());
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            e.printStackTrace();
        }
        return Base64.encodeBase64String(rawHmac);
    }
}

 

ThingBolt代碼如下:

package rexel.topo;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ThingBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple tuple) {
        String content = new String(tuple.getBinaryByField("body"));
        System.out.println("receive message: {" + content + "}");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

 

 

錯誤的原因是在Spout中有沒有序列化的代碼,將以下兩個對象前面加上static之后,問題解決。

private MessageListener messageListener → private static MessageListener messageListener

private JmsConnectionListener myJmsConnectionListener → private static JmsConnectionListener myJmsConnectionListener

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM