flink異常:Could not forward element to next operator


異常描述

當flink數據流中的元素存在值為null的字段時,會報以下異常信息:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)

解決這個問題的方法就是將進入流的每一個元素中值為null的字段全部賦予默認值,具體操作如下。

null填充類

import java.lang.reflect.{Field, Method}
import java.lang.{Double, Float}

import cn.hutool.core.util.StrUtil
import org.slf4j.LoggerFactory

import scala.collection.mutable.ListBuffer

/**
  * Reflection helpers that replace null-valued bean properties with defaults.
  *
  * A property is handled when its declared field type is `java.lang.String`,
  * `java.lang.Double`, `java.lang.Float` or `java.lang.Integer`, and the bean
  * exposes public JavaBean-style `getXxx` / `setXxx` accessors for it.
  */
object BeanUtil {
    val logger: org.slf4j.Logger = LoggerFactory.getLogger(BeanUtil.getClass)

    /**
      * Fills every supported null property of `t` with a default value.
      *
      * The bean is mutated in place and then returned. A failure on one field
      * (e.g. a missing accessor) is logged and does not stop the other fields
      * from being processed.
      *
      * @param t     the bean to fix up; a null bean is returned unchanged
      * @param clazz the class whose fields (including inherited ones) are scanned
      * @return the same instance that was passed in
      */
    def fillNull[T: Manifest](t: T, clazz: Class[T]): T = {
        // Nothing to fill on a null bean; skipping avoids one error log per field.
        if (t == null) {
            return t
        }
        for (field <- getAllFields(clazz)) {
            try {
                fillNumber(field, t, clazz)
                fillString(field, t, clazz)
            } catch {
                // Pass the exception along so the log shows why the fill failed.
                case ex: Exception => logger.error("字段 %s 填充失敗".format(field.getName), ex)
            }
        }
        t
    }

    /**
      * Sets a `String` property to `""` when its current value is null or empty.
      * Fields of any other type are ignored.
      *
      * @param field the field describing the property
      * @param bean  the instance to update
      * @param clazz the class used to look up the accessors
      */
    def fillString[T: Manifest](field: Field, bean: T, clazz: Class[T]): Unit = {
        fillDefault(field, bean, clazz, classOf[String], "")(s => StrUtil.isEmpty(s))
    }

    /**
      * Sets a boxed `Double`, `Float` or `Integer` property to zero when its
      * current value is null. Fields of any other type are ignored.
      *
      * @param field the field describing the property
      * @param bean  the instance to update
      * @param clazz the class used to look up the accessors
      */
    def fillNumber[T: Manifest](field: Field, bean: T, clazz: Class[T]): Unit = {
        fillDefault(field, bean, clazz, classOf[java.lang.Double], java.lang.Double.valueOf(0.0))(_ == null)
        // The setter must be looked up with the Float parameter type; looking it
        // up with Double never matches a Float property.
        fillDefault(field, bean, clazz, classOf[java.lang.Float], java.lang.Float.valueOf(0.0f))(_ == null)
        fillDefault(field, bean, clazz, classOf[Integer], Integer.valueOf(0))(_ == null)
    }

    /**
      * Shared reflection routine: when `field` is declared as `fieldType`, reads
      * the property through its getter and, if `needsFill` accepts the current
      * value, writes `default` through its setter.
      */
    private def fillDefault[T, V <: AnyRef](field: Field, bean: T, clazz: Class[T],
                                            fieldType: Class[V], default: V)
                                           (needsFill: V => Boolean): Unit = {
        if (field.getType == fieldType) {
            val suffix = getMethodName(field.getName)
            val getter = clazz.getMethod("get" + suffix)
            val setter = clazz.getMethod("set" + suffix, fieldType)
            val current = getter.invoke(bean).asInstanceOf[V]
            if (needsFill(current)) {
                setter.invoke(bean, default)
            }
        }
    }

    /**
      * Collects the declared fields of `t` and of every superclass, starting
      * with `t` itself and walking up the hierarchy.
      *
      * @param t the class to inspect
      * @return the fields in subclass-first order; empty when `t` is null
      */
    def getAllFields[T](t: Class[T]): List[Field] = {
        Iterator.iterate[Class[_]](t)(_.getSuperclass)
            .takeWhile(_ != null)
            .flatMap(_.getDeclaredFields.toList)
            .toList
    }

    /**
      * Upper-cases the first character of a field name to build the suffix of
      * its `get` / `set` accessor.
      *
      * @param fildeName the field name; must be non-empty
      * @return the name with its first character upper-cased
      */
    def getMethodName(fildeName: String): String = {
        // Locale.ROOT keeps the result stable regardless of the JVM default
        // locale (e.g. Turkish maps "i" to a dotted capital I).
        fildeName.substring(0, 1).toUpperCase(java.util.Locale.ROOT) + fildeName.substring(1)
    }
}

調用方法

steamSummary.map(e => BeanUtil.fillNull(e, classOf[MyBean]))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM