python spark example


This is a job I wrote at the beginning of the year to flag abusive API access (large numbers of 403s) and feed other follow-up measures. It is written in a fairly plain, easy-to-follow way, so I'm keeping it here as a sample.

The data source is a Kafka stream and the computation happens in real time. The rules are configured in MySQL; put simply, if a client exceeds a configured number of 403s within one minute, the request is recorded.
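For illustration, here is what a rule row might look like once it has been read into Python by dictfetchall (the column names are inferred from how filterPolicy and youAreUnderArrest use them below; the values are made up, and 'all' acts as a wildcard):

    # hypothetical contents of filter_option, loaded from security_anti_hacker_policy;
    # `limit` is the per-minute threshold for the given response code
    filter_option = [
        {'domain': 'all', 'api': 'all', 'code': 403, 'limit': 100},
    ]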

  1 import json
  2 import logging
  3 from datetime import datetime
  4 
  5 import MySQLdb
  6 from pyspark import SparkContext, SparkConf
  7 from pyspark.streaming import StreamingContext
  8 from pyspark.streaming.kafka import KafkaUtils
  9 
 10 logger = logging.getLogger()
 11 hdlr = logging.FileHandler('nginx_log_stats.log')
 12 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 13 hdlr.setFormatter(formatter)
 14 logger.addHandler(hdlr)
 15 logger.setLevel(logging.ERROR)
 16 
 17 
 18 def is_in_one_minute(nginx_timestamp):
 19     """
 20     :param nginx_timestamp: e.g. "16/Feb/2017:08:23:59 +0000"
 21     :return:
 22     """
 23     now = datetime.utcnow()  # log timestamps carry +0000, so compare in UTC
 24     nginx_datetime = datetime.strptime(nginx_timestamp.split('+')[0].strip(),
 25                                        '%d/%b/%Y:%H:%M:%S')
 26     return abs((now - nginx_datetime).total_seconds()) <= 60
 27 
 28 
 29 
 30 # save to mysql
 31 def saveToMysql(partition):
 32     host = "..."
 33     user = "..."
 34     password = "..."
 35     db_name = "..._monitor"
 36     db = MySQLdb.connect(host, user, password, db_name, charset='utf8')
 37     db.autocommit(True)
 38     cursor = db.cursor()
 39     for d1ct in partition:
 40         # parameterized query: let the driver escape values taken from the logs
 41         sql = """INSERT INTO `security_suspect_request` (`domain`, `api`, `code`, `ip`, `access_count`) VALUES (%s, %s, %s, %s, %s)"""
 42         cursor.execute(sql, (d1ct['domain'], d1ct['path'], d1ct['response'],
 43                              d1ct['ip'], d1ct['count']))
 44     db.close()
 45 
 46 
 47 def dictfetchall(cursor):
 48     "Return all rows from a cursor as a list of dicts"
 49     columns = [col[0] for col in cursor.description]
 50     return [
 51         dict(zip(columns, row))
 52         for row in cursor.fetchall()
 53         ]
 54 
 55 
 56 def filterPolicy(log):
 57     '''
 58 {
 59   "path": "/var/log/nginx/webapi..../access-log",
 60   "host": "ip-10-...",
 61   "clientip": "10....",
 62   "timestamp": "16/Feb/2017:08:23:59 +0000",
 63   "domain": "...com",
 64   "verb": "POST",
 65   "request_path": "/video/upload",
 66   "request_param": "sig=b400fdce...&userId=...",
 67   "httpversion": "1.1",
 68   "response": "403",
 69   "bytes": "0",
 70   "agent": "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-T561 Build/KTU84P)",
 71   "response_time": "0.110",
 72   "topic": "nginx"
 73 }
 74     '''
 75     # return True to keep the record, False to drop it
 76     true_flag = 0
 77     this = json.loads(log[1])
 78     # filter time
 79     if not is_in_one_minute(this['timestamp']):
 80         return False
 81     # filter condition
 82     for policy in filterVar.value:
 83         if policy['domain'] == 'all' or ('domain' in this.keys() and this['domain'] == policy['domain']):
 84             if policy['api'] == 'all' or ('request_path' in this.keys() and this['request_path'] == policy['api']):
 85                 if 'response' in this.keys() and this['response'] == str(policy['code']):
 86                     true_flag += 1
 87 
 88     return bool(true_flag)
 89 
 90 
 91 def countMap(log):
 92     import json, re
 93     this = json.loads(log[1])
 94     # strip a trailing numeric id segment (e.g. /video/123 -> /video) before keying
 95     path = re.sub(r'/\d+$', '', this.get('request_path', ""))
 96     key = "--".join([this.get('domain', ""), path, this.get('clientip', ""), this.get('response', "")])
 97     value = {'count': 1}
 98     return key, value
 99 
100 
101 def countReduce(prev, cur):
102     cur['count'] = cur['count'] + prev['count']
103     return cur
104 
105 
106 def output(tup1e):
107     """
108     split the key of a (key, value) tuple back into fields and merge them into the value dict
109     """
110     tup1e[1]['domain'], tup1e[1]['path'], tup1e[1]['ip'], tup1e[1]['response'] = tup1e[0].split('--')
111     return tup1e[1]
112 
113 
114 def youAreUnderArrest(d1ct):
115     mylimit = None
116     for row in filterVar.value:
117         if row['domain'] == 'all' or row['domain'] == d1ct['domain']:
118             if row['api'] == 'all' or row['api'] == d1ct['path']:
119                 if row['code'] == int(d1ct['response']):
120                     mylimit = row['limit']
121 
122     return False if mylimit is None else d1ct['count'] >= mylimit
123 
124 
125 if __name__ == "__main__":
126     host = "..."
127     user = "..."
128     password = "..."
129     db_name = "..._monitor"
130     db = MySQLdb.connect(host, user, password, db_name, charset='utf8')
131     db.autocommit(True)
132     cur = db.cursor()
133     try:
134         # for now only support 1 row
135         cur.execute(r"""SELECT * FROM security_anti_hacker_policy""")
136         filter_option = dictfetchall(cur)
137     finally:
138         db.close()
139 
140     topic = 'nginx.log'
141     zkQuorum = '...:2181,...:2181,...:2181'
142     conf = (SparkConf()
143             .setMaster("spark://...:7077")
144             .setAppName("anti_hacker_stats")
145             .set("spark.driver.memory", "1g")
146             .set("spark.executor.memory", "1g")
147             .set("spark.cores.max", 2))
148     sc = SparkContext(conf=conf)
149     # broadcast variable for share
150     filterVar = sc.broadcast(filter_option)
151     ssc = StreamingContext(sc, 60)
152     kvs = KafkaUtils.createStream(ssc, zkQuorum, "anti-hacker", {topic: 1},
153                                   {"auto.offset.reset": 'largest'})
154     lines = kvs.filter(filterPolicy).map(countMap).reduceByKey(countReduce).map(output).filter(youAreUnderArrest)
155     lines.foreachRDD(lambda rdd: rdd.foreachPartition(saveToMysql))
156     # lines.saveAsTextFiles('test')
157     # lines = kvs.filter(filterPolicy)
158     # lines.pprint()
159     ssc.start()
160     ssc.awaitTermination()

Writing Spark jobs in Python means you have to run them with pyspark on the Spark servers, which makes debugging quite inconvenient; Scala is the better option, and there is a separate example for that.
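One thing that helps with that: the per-record functions (countMap, countReduce, output) are plain Python, so they can be sanity-checked locally without a cluster (filterPolicy and youAreUnderArrest are skipped here because they need the filterVar broadcast). A minimal sketch with a made-up log record:

    import json

    # hypothetical record in the same (key, json-string) shape the Kafka stream delivers
    sample = (None, json.dumps({
        "domain": "example.com", "request_path": "/video/123",
        "clientip": "10.0.0.1", "response": "403",
        "timestamp": "16/Feb/2017:08:23:59 +0000",
    }))

    k, v1 = countMap(sample)
    _, v2 = countMap(sample)
    merged = countReduce(v1, v2)   # -> {'count': 2}
    print(output((k, merged)))     # adds domain/path/ip/response back into the dict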

A few key points:

  1. Because Spark is inherently distributed, each RDD partition can be assumed to live on a different machine, so a database connection cannot be shared between them; each partition has to open its own connection and do its own writes (which is why saveToMysql connects inside the function).
  2. For the same reason, what if you do need to share data? The straightforward answer is the sc.broadcast call at line 150, which broadcasts the shared data to every RDD partition; a stripped-down sketch of the pattern follows this list.
  3. The data format matters a lot: you must understand the exact format of the records in your data source (see the sample log record in the filterPolicy docstring).
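A stripped-down sketch of the broadcast pattern from point 2, assuming an existing SparkContext named sc; the names and values here are illustrative, not part of the job above:

    # broadcast ships `rules` to each executor once, instead of serializing it
    # into every task closure
    rules = sc.broadcast([{'domain': 'all', 'api': 'all', 'code': 403, 'limit': 100}])

    def over_limit(record):
        # runs on the executors; the payload is read through .value
        return record['count'] >= rules.value[0]['limit']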

