远程调用Spark平台中的程序


用scala语言,开发好了在spark平台上可以一直运行的机器学习模型 
现在有个需求: 
要远程调用该模型的一些方法并获取结果 
那么可以使用jetty在服务器端主节点占用一个端口然后对外提供http服务

import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import com.xxx.rec.basic.ccam.CanonicalCorrelationAnalysisModel import org.mortbay.jetty.{HttpStatus, Request, Server} import org.mortbay.jetty.handler._ object CannonicalCorrelationAnalysisModelJerseyServer extends AbstractHandler{ var model: CanonicalCorrelationAnalysisModel = null

  /** * 处理请求 返回响应 * @param target * @param request * @param response * @param dispatch */
  override def handle(target: String, request: HttpServletRequest, response: HttpServletResponse, dispatch: Int): Unit = { val url=request.getRequestURI url.substring(url.lastIndexOf("/")+1,url.length) match { case "recommend" => { //request中的target 用,号分割
        val target: Seq[String] = request.getParameter("target").split(",").toSeq val topNum: Int = request.getParameter("topNum").toInt val result = model.recommend(target, topNum) response.setStatus(HttpStatus.ORDINAL_200_OK); response.getWriter().println(result.mkString(",")) request.asInstanceOf[Request].setHandled(true) response.getWriter.close() } case _ => { response.setStatus(HttpStatus.ORDINAL_404_Not_Found); request.asInstanceOf[Request].setHandled(true) } } } def main(args: Array[String]): Unit = { import org.apache.spark.{SparkConf, SparkContext} val sparkConf = new SparkConf().setAppName("CanonicalCorrelationAnalysisModelDemo") val textFilePath = "file:///home/xxx/xxx.txt" val sc = new SparkContext(sparkConf) val data = sc.textFile(textFilePath).map { line => line.split(' ') }.cache() model = CanonicalCorrelationAnalysisModel.createModel(data, 0.3, 5) val server=new Server(9998) server.setHandler(this) server.start() } }

该程序运行后占用了服务器端主节点的9998端口,通过http访问即可


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM