OpenStreetMap數據清洗(SQL&MonogoDB版本)


目標:通過網上下載的OpenStreetMap.xml數據格式,將該文件的格式進行統計,清洗,並導出成CSV格式的文件,最后倒入到SQLite中

本案例中所需的包

import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET
from collections import defaultdict
import cerberus
import schema

1.統計文件中每一個標簽出現的次數

思路:將xml文件使用sax解析,將每一個節點的的標簽值設為字典的key,次數為value,初始化為0,

   循環文件,如果可以找到key,那么value的值+1,否則不變

def count_tags(filename):
#1.讀文件 osm
= ET.ElementTree(file=filename)
#2.獲取根節點 root
= osm.getroot()
#3.獲取根節點的標簽,創建一個字典來存放標簽名和次數 tags_count_dic
= {root.tag:0}
#4.循環文件
for _,ele in ET.iterparse(filename,events=('start',)):
#5.如果有元素的tag在字典中,則value的值+1,否則表示該標簽只出現一次
if ele.tag in tags_count_dic: tags_count_dic[ele.tag] += 1 else: tags_count_dic[ele.tag] = 1 return tags_count_dic
def test():
    #測試函數的斷言不出錯,表示結果正確
    tags = count_tags('example.osm')
    pprint.pprint(tags)
    assert tags == {'bounds': 1,
                     'member': 3,
                     'nd': 4,
                     'node': 20,
                     'osm': 1,
                     'relation': 1,
                     'tag': 7,
                     'way': 1}

if __name__ == "__main__":
    test()

2.根據正則表達式,確定各種標簽類型的數量

思路:獲取根據傳入的element,來獲取tag,獲取到tag即可獲取到k的值,在根據正則表達式進行匹配,將匹配成功的值放入到不同的字典中

lower = re.compile(r'^([a-z]|_)*$') #僅包含小寫字母且有效的標記
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$') #名稱中有冒號的其他有效標記
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]') #字符存在問題的標記

def key_type(element,keys):
#1.找到需要處理的標簽進行處理
if element.tag == 'tag':
#2.獲取帶匹配的字符串 key
= element.attrib['k']
#逐次匹配,並將匹配成功的結果放到keys中,並返回
if lower.search(key): keys['lower'] += 1 elif lower_colon.search(key): keys['lower_colon'] += 1 elif problemchars.search(key): keys['problemchars'] += 1 else: keys['other'] += 1 return keys def process_map(filename): keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0} for _, element in ET.iterparse(filename): keys = key_type(element, keys) return keys
def test():
    #測試函數的斷言不報錯,代碼正確
    keys = process_map('example.osm')
    pprint.pprint(keys)
    assert keys == {'lower': 5, 'lower_colon': 0, 'other': 1, 'problemchars': 1}

if __name__ == "__main__":
    test()

3.搜索用戶,返回一組唯一的用戶ID

思路:找到uid所對應的tag,循環xml文件,如果標簽存在uid,就加入到set中返回

def get_user(element):
#如果標簽中包含'uid'這一屬性,則返回該屬性的值
if 'uid' in element.attrib: return element.attrib['uid'] def process_map(filename): users = set()
#循環xml文件,如果每行的元素中有'uid'這一標簽,則其值取出加入到set中,返回
for _, element in ET.iterparse(filename): if get_user(element): users.add(get_user(element)) return users
def test():
    #斷言不出錯,程序正確
    users = process_map('example.osm')
    pprint.pprint(users)
    assert len(users) == 6

if __name__ == "__main__":
    test()

4.完善街道名,將街道中的一些不合法的值去除

思路:循環街道的字典,名稱在mapping,則進行替換,返回替換后的字符串

OSMFILE = "example1.osm"
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons"]

# UPDATE THIS VARIABLE
#題目這里的字符串需要更改,否則結果錯誤
mapping = { "Rd.": "Road", "St.": "Street", "Ave": "Avenue" } def audit_street_type(street_types,street_name): m = street_type_re.search(street_name) if m: street_type = m.group() if street_type not in expected: street_types[street_type].add(street_name) def is_street_name(elem): return (elem.attrib['k'] == 'addr:street') def audit(osmfile): osm_file = open(osmfile,'r') street_types = defaultdict(set) for event,ele in ET.iterparse(osmfile,events=('start',)): if ele.tag == 'tag' or ele.tag == 'way': for tag in ele.iter('tag'): if is_street_name(tag): audit_street_type(street_types,tag.attrib['v']) osm_file.close() return street_types def update_name(name, mapping): #獲取需要修改的key changewords = mapping.keys()
#如果名稱相同,則替換字符,並返回
for word in changewords: if word in name: name = name.replace(word,mapping.get(word)) return name

def test():
#斷言不出錯,則結果正確 st_types
= audit(OSMFILE) assert len(st_types) == 3 pprint.pprint(dict(st_types)) for st_type, ways in st_types.iteritems(): for name in ways: better_name = update_name(name, mapping) print name, "=>", better_name if name == "West Lexington St.": assert better_name == "West Lexington Street" if name == "Baldwin Rd.": assert better_name == "Baldwin Road" if __name__ == "__main__": test()

5.數據清洗

  目標數據的結構

  node節點需要[id,user,uid,version,lat,lon,timestamp,changeset]

  node節點下的tags子節點需要[id,key,value,type]

{'node': {'id': 757860928,
          'user': 'uboot',
          'uid': 26299,
       'version': '2',
          'lat': 41.9747374,
          'lon': -87.6920102,
          'timestamp': '2010-07-22T16:16:51Z',
      'changeset': 5288876},
 'node_tags': [{'id': 757860928,
                'key': 'amenity',
                'value': 'fast_food',
                'type': 'regular'},
               {'id': 757860928,
                'key': 'cuisine',
                'value': 'sausage',
                'type': 'regular'},
               {'id': 757860928,
                'key': 'name',
                'value': "Shelly's Tasty Freeze",
                'type': 'regular'}]}

way節點需要[id,user,uid,version,timestamp,changeset]

way節點下的nodes子節點需要[id,node_id,position]

way節點下的tag子節點需要[id,key,value,type]

{'way': {'id': 209809850,
         'user': 'chicago-buildings',
         'uid': 674454,
         'version': '1',
         'timestamp': '2013-03-13T15:58:04Z',
         'changeset': 15353317},
 'way_nodes': [{'id': 209809850, 'node_id': 2199822281, 'position': 0},
               {'id': 209809850, 'node_id': 2199822390, 'position': 1},
               {'id': 209809850, 'node_id': 2199822392, 'position': 2},
               {'id': 209809850, 'node_id': 2199822369, 'position': 3},
               {'id': 209809850, 'node_id': 2199822370, 'position': 4},
               {'id': 209809850, 'node_id': 2199822284, 'position': 5},
               {'id': 209809850, 'node_id': 2199822281, 'position': 6}],
 'way_tags': [{'id': 209809850,
               'key': 'housenumber',
               'type': 'addr',
               'value': '1412'},
              {'id': 209809850,
               'key': 'street',
               'type': 'addr',
               'value': 'West Lexington St.'},
              {'id': 209809850,
               'key': 'street:name',
               'type': 'addr',
               'value': 'Lexington'},
              {'id': '209809850',
               'key': 'street:prefix',
               'type': 'addr',
               'value': 'West'},
              {'id': 209809850,
               'key': 'street:type',
               'type': 'addr',
               'value': 'Street'},
              {'id': 209809850,
               'key': 'building',
               'type': 'regular',
               'value': 'yes'},
              {'id': 209809850,
               'key': 'levels',
               'type': 'building',
               'value': '1'},
              {'id': 209809850,
               'key': 'building_id',
               'type': 'chicago',
               'value': '366409'}]}

思路: 1.使用iterparse便利xml中每一個頂層標簽

         2.使用自定義函數將每個元素變成多個數據結構

         3.利用架構和驗證庫保證數據格式的正確

         4.將每個數據結構寫入相應的csv文件

OSM_PATH = "example1.osm"

NODES_PATH = "nodes.csv"  #node標簽生成的文件名
NODE_TAGS_PATH = "nodes_tags.csv"  #node下的tag標簽生成的文件名
WAYS_PATH = "ways.csv"  #way標簽生成的文件名
WAY_NODES_PATH = "ways_nodes.csv" #way標簽下的node生成的文件名
WAY_TAGS_PATH = "ways_tags.csv" #way標簽下的tag生成的文件名

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+') #字符串中有冒號和小寫字母的標記
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')#字符存在問題的標記
SCHEMA = schema.schema #模板文件

# Make sure the fields order in the csvs matches the column order in the sql table schema
#每一個生成的文件的表頭
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp'] NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type'] WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp'] WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type'] WAY_NODES_FIELDS = ['id', 'node_id', 'position'] def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS, problem_chars=PROBLEMCHARS, default_tag_type='regular'): """Clean and shape node or way XML element to Python dict""" node_attribs = {} #存放生成node的key和value,key作表頭,value作內容 way_attribs = {} #存放生成way的key和value,key作表頭,value作內容 way_nodes = [] #存放生成way標簽下的nd子標簽的值,[{...},{...}] tags = [] #存放node和way下的tag子標簽的值 ,[{...},{...},]# Handle secondary tags the same way for both node and way elements # YOUR CODE HERE
#先提取node字段
if element.tag == 'node':
#1.循環node_field表頭,如果element中有key所對應的屬性,則放入到node_attribs字典中
for key in NODE_FIELDS: node_attribs[key] = element.attrib[key]
#2.循環子節點,獲取tags元素的值
for child in element: Node_Tags = {}
#匹配字母和冒號 colon
= re.match(LOWER_COLON,child.attrib['k'])
#匹配異常字符 problem
= re.match(PROBLEMCHARS,child.attrib['k'])
#異常字符直接跳過,進行下一次查找
if problem: continue
#如果tag是包含字母和冒號<tag k="addr:housenumber" v="1412"/>需要解析成{'id': 12345, 'key': 'housenumber', 'value': '1412', 'type': 'addr'}
elif colon:
#從父節點獲取id屬性的值 Node_Tags[
'id'] = element.attrib['id']
#獲取k="addr:housenumber"的值,以:拆分,第一個值為type的值 type_value
= child.attrib['k'].split(':',1)[0] Node_Tags['type'] = type_value
#獲取k="addr:housenumber"的值,以:拆分,第二個值為key的值 Node_Tags[
'key'] = child.attrib['k'].split(':',1)[1]
#獲取v=1412的值,為value的值 Node_Tags[
'value'] = child.attrib['v']
#將處理后的數據加入到字典中 tags.append(Node_Tags)
#tag不包含冒號<tag k="building" v="yes"/>
else:
#從父節點獲取id屬性的值 Node_Tags[
'id'] = element.attrib['id']
#type的值是 regular Node_Tags[
'type'] = 'regular'
#獲取k=building的值,為key的值
Node_Tags['key'] = child.attrib['k']
#獲取v=yes的值,為value的值 Node_Tags[
'value'] = child.attrib['v']
#將處理后的數據加入到字典中 tags.append(Node_Tags)
#返回node處理之后的結果
return {'node': node_attribs, 'node_tags': tags}
#在提取way字段
elif element.tag == 'way':
1.循環way_field表頭,如果element中有key所對應的屬性,則放入到way_attribs字典中
for key in WAY_FIELDS: way_attribs[key] = element.attrib[key] counter = 0 #計數,用於填充way下面nd子標簽的position的值
#循環父節點下的子節點
for child in element: Way_Nodes = {} #存放nd子標簽 Way_Tags = {} #存放tag子標簽
#處理nd子標簽
if child.tag == 'nd':
#從父節點獲取id屬性的值 Way_Nodes[
'id'] = element.attrib['id']
#從自身的ref,來獲取該屬性的值 Way_Nodes[
'node_id'] = child.attrib['ref']
#獲取position的值,每循環一次nd,counter + 1 Way_Nodes[
'position'] = counter counter += 1
將處理后的nd子節點數據加入到字典中 way_nodes.append(Way_Nodes)
#處理tag子標簽
elif child.tag == 'tag':
#同處理node下的tag子節點 colon
= re.match(LOWER_COLON,child.attrib['k']) problem = re.match(PROBLEMCHARS,child.attrib['k']) if problem: continue elif colon: Way_Tags['id'] = element.attrib['id'] type_value = child.attrib['k'].split(':',1)[0] Way_Tags['key'] = child.attrib['k'].split(':',1)[1] Way_Tags['type'] = type_value Way_Tags['value'] = child.attrib['v'] tags.append(Way_Tags) else: Way_Tags['id'] = element.attrib['id'] Way_Tags['key'] = child.attrib['k'] Way_Tags['type'] = 'regular' Way_Tags['value'] = child.attrib['v'] tags.append(Way_Tags) return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags} def get_element(osm_file, tags=('node', 'way', 'relation')): """Yield element if it is the right type of tag""" """如果是正確的類型時,返回標簽中的tag""" context = ET.iterparse(osm_file, events=('start', 'end')) _, root = next(context) for event, elem in context: if event == 'end' and elem.tag in tags: yield elem root.clear() def validate_element(element, validator, schema=SCHEMA): """Raise ValidationError if element does not match schema"""
"""當和schema的數據格式不匹配時,拋出異常"""
if validator.validate(element, schema) is not True: field, errors = next(validator.errors.iteritems()) message_string = "\nElement of type '{0}' has the following errors:\n{1}" error_string = pprint.pformat(errors) raise Exception(message_string.format(field, error_string)) class UnicodeDictWriter(csv.DictWriter, object): """Extend csv.DictWriter to handle Unicode input""" """擴展csv下的DictWriter方法的去支持Unicode輸入""" def writerow(self, row): super(UnicodeDictWriter, self).writerow({ k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems() }) def writerows(self, rows): for row in rows: self.writerow(row) def process_map(file_in, validate): """Iteratively process each XML element and write to csv(s)""" """將處理好的xml文件寫入到csv中""" with codecs.open(NODES_PATH, 'w') as nodes_file, \ codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \ codecs.open(WAYS_PATH, 'w') as ways_file, \ codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \ codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file: nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS) node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS) ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS) way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS) way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS) nodes_writer.writeheader() node_tags_writer.writeheader() ways_writer.writeheader() way_nodes_writer.writeheader() way_tags_writer.writeheader() validator = cerberus.Validator() for element in get_element(file_in, tags=('node', 'way')): el = shape_element(element) if el: if validate is True: validate_element(el, validator) if element.tag == 'node': nodes_writer.writerow(el['node']) node_tags_writer.writerows(el['node_tags']) elif element.tag == 'way': ways_writer.writerow(el['way']) way_nodes_writer.writerows(el['way_nodes']) way_tags_writer.writerows(el['way_tags'])

第二種方法

高階解法

def shape_tag(el, tag): 
#tag標簽返回的格式(el是父節點標簽指的是node標簽,tag是子節點指的是tag標簽) tag
= { 'id' : el.attrib['id'], 'key' : tag.attrib['k'], 'value': tag.attrib['v'], 'type' : 'regular' } if LOWER_COLON.match(tag['key']):
#如果tag的key中出現冒號<tag k="addr:housenumber" v="1412"/>,則根據:進行拆分,獲取type和key tag[
'type'], _, tag['key'] = tag['key'].partition(':') return tag def shape_way_node(el, i, nd):
#way下的nd標簽返回的格式(el是父節點標簽指的是way標簽,nd是子節點指的是nd標簽)
return { 'id' : el.attrib['id'], 'node_id' : nd.attrib['ref'], 'position' : i } def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS, problem_chars=PROBLEMCHARS, default_tag_type='regular'): """Clean and shape node or way XML element to Python dict""" node_attribs = {} way_attribs = {} way_nodes = []
#直接獲取所有的tag子標簽 tags
= [shape_tag(element, t) for t in element.iter('tag')] # Handle secondary tags the same way for both node and way elements # YOUR CODE HERE if element.tag == 'node': node_attribs = {f: element.attrib[f] for f in node_attr_fields} return {'node': node_attribs, 'node_tags': tags} elif element.tag == 'way': way_attribs = {f: element.attrib[f] for f in way_attr_fields} #獲取way標簽下nd標簽的各個值 way_nodes = [shape_way_node(element, i, nd) for i, nd in enumerate(element.iter('nd'))] return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

 6.數據清洗(准備數據庫 MonogoDB版本)

    處理數據並將數據形狀變成我們之前提到的模型。輸出應該是字典列表,如下所示

{
"id": "2406124091",
"type: "node",
"visible":"true",
"created": {
          "version":"2",
          "changeset":"17206049",
          "timestamp":"2013-08-03T16:43:42Z",
          "user":"linuxUser16",
          "uid":"1219059"
        },
"pos": [41.9757030, -87.6921867],
"address": {
          "housenumber": "5157",
          "postcode": "60625",
          "street": "North Lincoln Ave"
        },
"amenity": "restaurant",
"cuisine": "mexican",
"name": "La Cabana De Don Luis",
"phone": "1 (773)-271-5176"
}

要求:

1.node下的tag子標簽處理如下

<tag k="addr:housenumber" v="5158"/>
<tag k="addr:street" v="North Lincoln Avenue"/>
<tag k="addr:street:name" v="Lincoln"/>
<tag k="addr:street:prefix" v="North"/>
<tag k="addr:street:type" v="Avenue"/>
<tag k="amenity" v="pharmacy"/>


應該改寫為:

{...
"address": {
    "housenumber": 5158,
    "street": "North Lincoln Avenue"
}
"amenity": "pharmacy",
...
}

2.way標簽下的子標簽的處理應該改為

<nd ref="305896090"/>
<nd ref="1719825889"/>
應該改為
"node_refs": ["305896090", "1719825889"]

 

lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

CREATED = [ "version", "changeset", "timestamp", "user", "uid"]


def shape_element(element):
    node = {}
    if element.tag == "node" or element.tag == "way" :
        # YOUR CODE HERE

#1.獲取非父子節點的key和value
node = {tag.attrib['k']:tag.attrib['v'] for tag in element.iter('tag') if not tag.attrib['k'].startswith('addr:') and not problemchars.search(tag.attrib['k'])}
#2.獲取address節點的key和value
#例:<tag k="addr:street:name" v="Lincoln"/>
#1)將addr:后面的字符作為key,v的值作為value
#2)如果標簽中的key值以addr:開頭,且:的數量等於1
#3)且沒有特殊字符的存在
node[
'address'] = {tag.attrib['k'][5:]: tag.attrib['v'] for tag in element.iter('tag') if tag.attrib['k'].startswith('addr:') and tag.attrib['k'].count(':') == 1 and not problemchars.search(tag.attrib['k'])} #3.獲取element節點的屬性
     attr
= element.attrib node['id'] = attr['id'] #獲取id node['type'] = element.tag #獲取類型type node['visible'] = attr.get('visible') #獲取visible

     #4.獲取created節點中的key和value
node[
'created'] = {c:attr[c] for c in CREATED}
#5.如果標簽的類型是way,則從該標簽獲取ref的值封裝到node_refs中,否則直接從node中獲取lat,lon的值填充到pos中
if element.tag == 'way': node['node_refs'] = [nd.attrib['ref'] for nd in element.iter('nd')] else: node['pos'] = [float(attr['lat']),float(attr['lon'])] return node else: return None def process_map(file_in, pretty = False): # You do not need to change this file file_out = "{0}.json".format(file_in) data = [] with codecs.open(file_out, "w") as fo: for _, element in ET.iterparse(file_in): el = shape_element(element) if el: data.append(el) if pretty: fo.write(json.dumps(el, indent=2)+"\n") else: fo.write(json.dumps(el) + "\n") return data

 

def test():
    #如果測試代碼不出錯,則結果正確
    correct_first_elem = {
        "id": "261114295", 
        "visible": "true", 
        "type": "node", 
        "pos": [41.9730791, -87.6866303], 
        "created": {
            "changeset": "11129782", 
            "user": "bbmiller", 
            "version": "7", 
            "uid": "451048", 
            "timestamp": "2012-03-28T18:31:23Z"
        }
    }
    assert data[0] == correct_first_elem
    assert data[-1]["address"] == {
                                    "street": "West Lexington St.", 
                                    "housenumber": "1412"
                                      }
    assert data[-1]["node_refs"] == [ "2199822281", "2199822390",  "2199822392", "2199822369", 
                                    "2199822370", "2199822284", "2199822281"]

if __name__ == "__main__":
    test()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM