In the overall big-data processing pipeline, how the data flows is an important question. This post records how data gets from HTTP into Kafka.
Technologies used:
1. The Vert.x framework for Java
(for Vert.x usage examples see: http://www.programcreek.com/java-api-examples/index.php?api=io.vertx.core.Vertx)
2. KafkaProducer
(usage examples: http://www.programcreek.com/java-api-examples/index.php?api=org.apache.kafka.clients.producer.KafkaProducer; a minimal sketch of how the two pieces fit together follows this list)
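Before the full code, here is a minimal, self-contained sketch of a Vert.x HTTP endpoint that forwards each request body to Kafka. The class name HttpToKafkaSketch, the broker address localhost:9092, the topic demo, and port 8080 are placeholders for illustration, not values from the production service.

import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Minimal sketch: an HTTP endpoint that forwards each request body to Kafka.
// "localhost:9092" and the topic "demo" are placeholders, not the production values.
public class HttpToKafkaSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        Vertx vertx = Vertx.vertx();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());           // buffer request bodies
        router.post("/demo").handler(ctx -> {
            // fire-and-forget: the callback only logs failures
            producer.send(new ProducerRecord<>("demo", ctx.getBodyAsString()), (meta, e) -> {
                if (e != null) e.printStackTrace();
            });
            ctx.response().setStatusCode(200).end("ok");
        });
        vertx.createHttpServer().requestHandler(router::accept).listen(8080);
    }
}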
Features that need to be implemented while importing the data:
1. Path parsing: take the last segment of the request path as the appkey (see the sketch after this list);
2. Data caching: when Kafka cannot be reached, cache the data in a local cache directory;
3. Security check: validate that the appkey in the request is legal;
4. Automatic appkey refresh: fetch the latest appkey list at a fixed interval;
5. Add an ip field to every record;
6. Logging: record basic statistics as well as exceptions and errors.
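As a concrete illustration of points 1 and 3, here is a minimal sketch of taking the last path segment as the appkey and checking it against the configured set. The class name AppkeyCheckSketch and the appkey value are hypothetical; in the real service the set is loaded from MySQL and refreshed every 10 minutes.

import java.util.HashSet;
import java.util.Set;

// Sketch of appkey extraction (point 1) and validation (point 3).
// The appkey set here is a stand-in; the real service refreshes it from MySQL.
public class AppkeyCheckSketch {
    private static final Set<String> appkeys = new HashSet<>();

    static boolean isValid(String path) {
        // "/mininfo/v1/logs/myAppkey" -> "myAppkey"
        String appkey = path.substring(path.lastIndexOf("/") + 1);
        return appkeys.contains(appkey);
    }

    public static void main(String[] args) {
        appkeys.add("myAppkey"); // hypothetical appkey
        System.out.println(isValid("/mininfo/v1/logs/myAppkey")); // true
        System.out.println(isValid("/mininfo/v1/logs/unknown"));  // false
    }
}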
The framework structure diagram is shown below:
The overall data computation diagram is shown below:
The computation flow diagram is shown below:
The code structure in IDEA:
The main code is as follows:
Configuration.java

package com.donews.data;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * Created by reynold on 16-6-23.
 */
public class Configuration {
    public static final Config conf = ConfigFactory.load();
}
Counter.java
package com.donews.data;

import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by reynold on 16-6-22.
 */
public class Counter {
    private Logger LOG = LoggerFactory.getLogger(Counter.class);
    AtomicLong messages = new AtomicLong(0L);
    AtomicLong bytes = new AtomicLong(0L);
    private long start = System.currentTimeMillis();

    private void reset() {
        messages.set(0L);
        bytes.set(0L);
        start = System.currentTimeMillis();
    }

    public void start(Vertx vertx) {
        LOG.info("start Counter");
        long delay = Configuration.conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS);
        vertx.setPeriodic(delay, h -> {
            long time = System.currentTimeMillis() - start;
            double rps = messages.get() * 1000.0 / time;
            double mbps = (bytes.get() * 1000.0 / 1024.0 / 1024.0) / time;
            Runtime runtime = Runtime.getRuntime();
            double totalMem = runtime.totalMemory() * 1.0 / 1024 / 1024;
            double maxMem = runtime.maxMemory() * 1.0 / 1024 / 1024;
            double freeMem = runtime.freeMemory() * 1.0 / 1024 / 1024;
            LOG.info("{0}:Message/S, {1}:MBytes/S", rps, mbps);
            LOG.info("totalMem:{0}MB maxMem:{1}MB freeMem:{2}MB", totalMem, maxMem, freeMem);
            reset();
        });
    }
}
KafkaHttpServer.java
package com.donews.data;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

import java.io.*;
import java.sql.*;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by reynold on 16-6-22.
 */
public class KafkaHttpServer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaHttpServer.class);
    private final Counter statistic = new Counter();
    private static final String DBDRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = Configuration.conf.getString("mysql.url");
    private static final String USER = Configuration.conf.getString("mysql.user");
    private static final String PASSWORD = Configuration.conf.getString("mysql.password");
    private static HashSet<String> appkeys = new HashSet<>();
    private static boolean deleteFile = true;

    private void error(HttpServerResponse response, String message) {
        response.setStatusCode(500).end(new JsonObject()
                .put("code", 3)
                .put("msg", message)
                .encode());
    }

    private void ok(HttpServerResponse response, String message) {
        response.putHeader("Access-Control-Allow-Origin", "*");
        response.setStatusCode(200).end(new JsonObject()
                .put("code", 0)
                .put("msg", message)
                .encode());
    }

    private void startService(int port) {
        KafkaProducerWrapper sender = new KafkaProducerWrapper();
        Vertx vertx = Vertx.vertx();
        HttpServer server = vertx.createHttpServer();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.route("/mininfo/logs").handler(ctx -> {
            try {
                JsonArray array = ctx.getBodyAsJsonArray();
                String[] messages = new String[array.size()];
                for (int i = 0; i < array.size(); i++) {
                    JsonObject message = array.getJsonObject(i);
                    message.put("ip", ctx.request().remoteAddress().host());
                    if (!message.containsKey("timestamp")) {
                        message.put("timestamp", Instant.now().toString());
                    }
                    messages[i] = array.getJsonObject(i).encode();
                }
                sendMessages(sender, ctx, "appstatistic_production", messages);
            } catch (Exception e) {
                error(ctx.response(), e.getMessage());
            }
        });
        router.routeWithRegex("/mininfo/v1/logs/[^/]+").handler(routingContext -> {
            String path = routingContext.request().path();
            String topic = path.substring(path.lastIndexOf("/") + 1);
            LOG.info("current topic (appkey): " + topic);
            if (appkeys.contains(topic)) {
                LOG.info("the topic (appkey) passed validation");
                String[] messages = routingContext.getBodyAsString().split("\n");
                // executeBlocking runs blocking tasks; by default submitted tasks are executed in order
                vertx.executeBlocking(future -> {
                    sendMessages(sender, routingContext, topic, messages);
                    future.complete();
                }, result -> {
                });
            } else {
                LOG.info("the topic (appkey) is not configured yet, please configure it in MySQL first");
                error(routingContext.response(), "please configure " + topic
                        + "(appkey) in MySQL first! It will take effect within 10 minutes");
            }
        });
        router.route("/mininfo/v1/ip").handler(ctx -> {
            LOG.info("x-real-for: " + ctx.request().getHeader("x-real-for"));
            LOG.info("x-forwarded-for: " + ctx.request().getHeader("x-forwarded-for"));
            ok(ctx.response(), ctx.request().getHeader("x-forwarded-for"));
        });
        router.route("/*").handler(ctx -> error(ctx.response(), "wrong! check your path..."));
        server.requestHandler(router::accept).listen(port, result -> {
            if (result.succeeded()) {
                LOG.info("listen on port:{0}", String.valueOf(port));
                this.statistic.start(vertx);
            } else {
                LOG.error(result.cause());
                vertx.close();
            }
        });
        // A shutdown hook is useful when something must run before the program exits (similar to finally);
        // here it makes sure the Kafka producer is closed.
        Runtime.getRuntime().addShutdownHook(new Thread(sender::close));
    }

    private void sendMessages(KafkaProducerWrapper sender, RoutingContext ctx, String topic, String[] messages) {
        AtomicInteger counter = new AtomicInteger(0);
        for (String message : messages) {
            if (message == null || "".equals(message)) {
                ok(ctx.response(), "Success");
                continue;
            }
            // add the client ip to the record's ip field
            JSONObject jsonObject = JSON.parseObject(message);
            if (jsonObject.get("ip") == null) {
                LOG.info("adding ip field");
                String ip;
                String header = ctx.request().getHeader("x-forwarded-for");
                if (!(header == null || header.trim().length() == 0 || header.trim().equals("null"))) {
                    ip = header.split(",")[0];
                } else {
                    ip = ctx.request().remoteAddress().host();
                }
                jsonObject.put("ip", ip);
                LOG.info("ip field added");
            }
            // topic, message, callback; the onCompletion callback is implemented as a lambda
            sender.send(topic, jsonObject.toString(), (metadata, exception) -> {
                if (exception != null) {
                    LOG.warn(exception);
                    String msg = new JsonObject()
                            .put("error", exception.getMessage())
                            .put("commit", counter.get())
                            .encode();
                    error(ctx.response(), msg);
                    cacheLocal(jsonObject.toString(), "/home/yuhui/httpkafka/data_bak/" + topic + ".txt");
                    LOG.info("failed to reach kafka, wrote the record to the local cache directory as a backup");
                } else {
                    statistic.messages.incrementAndGet(); // Counter
                    statistic.bytes.addAndGet(message.length());
                    if (counter.incrementAndGet() == messages.length) {
                        ok(ctx.response(), "Success");
                    }
                }
            });
        }
    }

    /**
     * Cache messages that could not be sent to Kafka in a local file.
     *
     * @param message   message
     * @param cachePath cachePath
     */
    private void cacheLocal(String message, String cachePath) {
        try {
            FileWriter fileWriter = new FileWriter(cachePath, true);
            BufferedWriter bw = new BufferedWriter(fileWriter);
            bw.write(message);
            bw.newLine();
            bw.flush();
            bw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Send the cached data to Kafka. On success the cache file is deleted;
     * on failure it is retried 10 minutes later.
     *
     * @param path directory that holds the cached data
     */
    private static void sendToKafka(String path) {
        String message;
        KafkaProducerWrapper sender = new KafkaProducerWrapper();
        File file = new File(path);
        if (file.isDirectory()) {
            String[] fileList = file.list();
            if (fileList != null && fileList.length != 0) {
                LOG.info("sending the backed-up data in the cache directory to kafka...");
                for (String str : fileList) {
                    String topic = str.split("\\.")[0];
                    try {
                        BufferedReader reader = new BufferedReader(new FileReader(path + str));
                        while ((message = reader.readLine()) != null) {
                            sender.send(topic, message, (metadata, exception) -> {
                                if (metadata != null) {
                                    LOG.info("cached backup records are being inserted into kafka one by one");
                                } else {
                                    // on error, the next scheduled run retries
                                    // exception.printStackTrace();
                                    LOG.error("kafka connection error ===> will retry automatically in 10 minutes, " + exception.getMessage(), exception);
                                    deleteFile = false;
                                }
                            });
                        }
                        if (deleteFile) {
                            LOG.info("deleting the cached backup data that has been inserted into kafka");
                            deleteFile(path, topic);
                            LOG.info("deletion finished!");
                        }
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                LOG.info("no backup files in the cache directory");
            }
        }
    }

    private static void deleteFile(String path, String appkey) {
        String appkeyPath = path + "/" + appkey + ".txt";
        File file = new File(appkeyPath);
        file.delete();
        LOG.info("deleted the cached data for appkey " + appkey);
    }

    private static Set<String> getAppkeys() {
        Set<String> appkeys = new HashSet<>();
        String sql = "select appkey from config_table";
        try {
            Class.forName(DBDRIVER);
            Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
            PreparedStatement ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                appkeys.add(rs.getString(1));
            }
            rs.close();
            conn.close();
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return appkeys;
    }

    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                appkeys.addAll(getAppkeys());
                LOG.info("synchronized the appkey list from the database (runs every 10 minutes)");
                sendToKafka("/home/leixingzhi7/httpkafka/data_bak/");
                // sendToKafka("C:\\Dell\\UpdatePackage\\log");
            }
        }, 0L, 10 * 60 * 1000L);
        try {
            int port = Configuration.conf.getInt("server.port");
            KafkaHttpServer front = new KafkaHttpServer();
            front.startService(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
KafkaProducerWrapper.java
package com.donews.data;

import com.typesafe.config.Config;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by reynold on 16-6-22.
 */
public class KafkaProducerWrapper {
    private Logger LOG = LoggerFactory.getLogger(KafkaProducerWrapper.class);
    private KafkaProducer<String, String> producer = init();

    private KafkaProducer<String, String> init() {
        Config conf = Configuration.conf.getConfig("kafka");
        Properties props = new Properties();
        props.put("bootstrap.servers", conf.getString("bootstrap.servers"));
        props.put("acks", conf.getString("acks"));
        props.put("retries", conf.getInt("retries"));
        props.put("batch.size", conf.getInt("batch.size"));
        props.put("linger.ms", conf.getInt("linger.ms"));
        props.put("buffer.memory", conf.getLong("buffer.memory"));
        props.put("key.serializer", conf.getString("key.serializer"));
        props.put("value.serializer", conf.getString("value.serializer"));
        LOG.info("KafkaProducer Properties: {0}", props.toString());
        return new KafkaProducer<>(props);
    }

    public void send(String topic, String message, Callback callback) {
        producer.send(new ProducerRecord<>(topic, message), callback);
    }

    public void close() {
        producer.close();
        LOG.info("Kafka Producer Closed");
    }

    public static void main(String[] args) {
        // KafkaProducerWrapper sender = new KafkaProducerWrapper();
        // sender.producer.partitionsFor("xxxxx").forEach(System.out::println);
    }
}
application.conf
server {
  port = 20000
  counter.delay = 30s
}

kafka {
  bootstrap.servers = "XXX"
  acks = all
  retries = 1
  batch.size = 1048576
  linger.ms = 1
  buffer.memory = 33554432
  key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
  value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
}

mysql {
  url = "jdbc:mysql://XXX/user_privileges"
  user = "XXX"
  password = "XXX"
}
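These values are read through the Typesafe Config library: ConfigFactory.load() picks up application.conf from the classpath, which is what Configuration.java above relies on. A small sketch of reading individual keys follows; the class name ConfigReadSketch is only for illustration.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.concurrent.TimeUnit;

// Sketch: reading keys from application.conf via Typesafe Config.
// The key paths match the config shown above.
public class ConfigReadSketch {
    public static void main(String[] args) {
        Config conf = ConfigFactory.load();
        int port = conf.getInt("server.port");                                          // 20000
        long delayMs = conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS); // 30s -> 30000
        String brokers = conf.getString("kafka.bootstrap.servers");
        System.out.println(port + " " + delayMs + " " + brokers);
    }
}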
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.donews.data</groupId>
    <artifactId>kafkahttp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.typesafe/config -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.11</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <!-- put your configurations here -->
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.donews.data.KafkaHttpServer</mainClass>
                                </transformer>
                            </transformers>
                            <outputFile>${project.build.directory}/${project.artifactId}-fat.jar</outputFile>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
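With the shade plugin configured as above, running mvn package should produce target/kafkahttp-fat.jar with com.donews.data.KafkaHttpServer as the entry point, so the service can be started with java -jar target/kafkahttp-fat.jar (exact paths depend on your build environment).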