Original link: http://www.sjsjw.com/kf_cloud/article/020376ABA013802.asp
Purpose
Tail the log files under a directory in real time: when a new file appears, switch to it, and write each line into Kafka. The current file name and line position are also recorded, so that if the process exits abnormally it can resume reading from the last recorded position. (For efficiency, the position is checkpointed once every 100 lines; this interval is adjustable.)
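For reference, the position file written by the code below is a single line of the form <absolute log path>,<line number>; a sample (the path here is purely illustrative):

/var/apache/logs/access.log,1234

On restart, the producer parses this line, reopens the recorded file, and skips any line whose number is at or below the recorded position.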
Source:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/*
 * A producer that runs on the source server and pushes data into Kafka.
 * Note: the file "producer.properties" must sit in the same directory as this jar on Linux.
 * Watches the files under a directory and writes their data into Kafka:
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
 */
public class PortalLogTail_Line {

    private Producer<String, String> inner;
    java.util.Random ran = new Random();

    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        // Use a random key so messages are hashed across the partitions
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(
                topicName, String.valueOf(ran.nextInt(9)), message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    // Return the most recently modified "access" file in the directory
    // (assumes the directory exists and contains at least one matching file)
    public String getNewFile(File file) {
        File[] fs = file.listFiles();
        long maxtime = 0;
        String newfilename = "";
        for (int i = 0; i < fs.length; i++) {
            if (fs[i].lastModified() > maxtime && fs[i].getName().contains("access")) {
                maxtime = fs[i].lastModified();
                newfilename = fs[i].getAbsolutePath();
            }
        }
        return newfilename;
    }

    // Record the current file name and line number
    public void writePosition(String path, int rn, String positionpath) {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter(positionpath));
            out.write(path + "," + rn);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    LineNumberReader randomFile = null;
    String thisfile = null;
    String prefile = null;
    int ln = 0;
    int beginln = 0;

    public void realtimeShowLog(final File file, final String topicname, final String positionpath) throws IOException {
        // Start a thread that polls every 100 ms for newly appended log lines
        new Thread(new Runnable() {
            public void run() {
                thisfile = getNewFile(file);
                prefile = thisfile;
                // Read the position file: if a file path and line number were recorded,
                // resume from there; otherwise start with the newest file
                try {
                    BufferedReader br = new BufferedReader(new FileReader(positionpath));
                    String line = br.readLine();
                    if (line != null && line.contains(",")) {
                        thisfile = line.split(",")[0];
                        prefile = thisfile;
                        beginln = Integer.parseInt(line.split(",")[1]);
                    }
                    br.close();
                } catch (FileNotFoundException e2) {
                    e2.printStackTrace();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }

                try {
                    randomFile = new LineNumberReader(new FileReader(thisfile));
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }

                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        // Read whatever has been appended since the last pass
                        String tmp = "";
                        while ((tmp = randomFile.readLine()) != null) {
                            int currln = randomFile.getLineNumber();
                            // beginln defaults to 0; skip lines already sent before a restart
                            if (currln > beginln)
                                send(topicname, tmp);
                            ln++;
                            // Writing the position after every line would hurt throughput,
                            // so checkpoint only after every 100 sends
                            if (ln > 100) {
                                writePosition(thisfile, currln, positionpath);
                                ln = 0;
                            }
                        }
                        // If a newer log file has appeared, switch over to it
                        thisfile = getNewFile(file);
                        if (!thisfile.equals(prefile)) {
                            randomFile.close();
                            randomFile = new LineNumberReader(new FileReader(thisfile));
                            prefile = thisfile;
                            beginln = 0;
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).start();
    }

    /**
     * @param args topicname pathname positionpath
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage: topicname pathname positionpath");
            System.exit(1);
        }
        PortalLogTail_Line producer = new PortalLogTail_Line();
        String topicname = args[0];
        String pathname = args[1];
        String positionpath = args[2];
        final File tmpLogFile = new File(pathname);
        producer.realtimeShowLog(tmpLogFile, topicname, positionpath);
    }
}
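The batch overload of send() is never called by the tailer itself, but it can be handy for pushing a backlog of lines in one call. A minimal usage sketch (topic name "portallog" follows the run command below; producer.properties must be in the working directory, and the constructor's exceptions propagate to the caller):

PortalLogTail_Line producer = new PortalLogTail_Line();
java.util.List<String> backlog = java.util.Arrays.asList("line one", "line two");
producer.send("portallog", backlog); // sent as one batch of KeyedMessages
producer.close();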
Put the producer.properties file in the same directory:

metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none, gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder
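If throughput matters more than per-message delivery guarantees, the same old-producer config also supports asynchronous batching. A sketch of the relevant keys (the values shown are the Kafka 0.8 defaults; adjust to taste):

producer.type=async
# flush a batch once it reaches this many messages...
batch.num.messages=200
# ...or once this much time has passed, whichever comes first
queue.buffering.max.ms=5000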
Test
Finally, run:
nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
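To check that lines are actually arriving, one quick way is the console consumer that ships with Kafka (assuming a 0.8-era broker registered in ZooKeeper at zk-host:2181; substitute your own host):

bin/kafka-console-consumer.sh --zookeeper zk-host:2181 --topic portallog --from-beginning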