自定義經緯度索引(非RTree、Morton Code[z order curve]、Geohash的方式)
Custom Indexing for Latitude-Longitude Data
網絡上有一些經緯度索引的文章,討論的方法是用Geohash、RTree、Z-Order Curve(morton Code)等算法來將經緯度由二維變為一維的索引。
這篇文章介紹一種使用二維數據索引的方式來創建經緯度索引。前半部分討論一種自定義的、使用二維數組的方式將經緯度變為一維數據,來加快索引。
文章最后實現了查找附近幾公里的方法。常見的一些應用是根據某個經緯度點,計算出一定范圍內的其他坐標點。這些坐標點是景區、商場等。中文網絡里,有講GeoHash、RTree的一些文章,但都不夠全面,只是介紹了如何實現Geohash或Rtree,但沒有介紹到最后一個終極目標,即給定一個經緯度 latitude\longitude和距離radius,如何實現查找一定范圍內的所有的點。
本文來源於MSDN雜志的一篇文章,請參考:
https://msdn.microsoft.com/en-us/magazine/jj133823.aspx
它有提供源代碼下載,但我去下載的時候,提供的是錯誤的代碼。
通常,我們在數據庫里會有一張表保存如下的關系:
Id latitude - longitude
0001 47.610 - 122.330
0002 36.175 - 115.136
0003 47.613 - 122.332
Id代表某個主鍵,可以是一個小吃店、商場等。Latitude – longitude 表示該ID所對應的坐標。
一些應用場景包括:
1) 要找出在一定經緯范圍內的所有ID:
SELECT Id
FROM IdInfoTable
WHERE latitude >= 47.0 AND latitude < 48.0
AND longitude >= -123.0 AND longitude < -122.0
2)還有一些應用是用戶會根據自己的坐標點來查找,比如我在 47.003 – 122.311附近,我要找附近5公里范圍的所有小吃店。一種最笨的方法是逐一遍歷表里每一條數據,如果2個坐標點的距離小於5就把這個ID留下作為搜索結果輸出。根據2個經緯度坐標點來計算2個經緯度之間距離的方法,可以參考:
http://www.cnblogs.com/softfair/p/distance_of_two_latitude_and_longitude_points.html
另外一種簡單的想法是:
我們可以考慮把一對經緯映射到一個區域ID(sectorID)。
latitude – longitude Sector ID
47.610 - 122.330 49377
36.175 - 115.136 45424
47.613 - 122.332 49377
Id Sector ID
0001 49377
0002 45424
0003 49377
如果某些經緯度都映射到統一個SectorID,那這個ID,我都可以返回。設想有如下一個應用。我根據地圖上紅色坐標點,得到一個SectorID,假設為49377,我本地保存的信息表IdInfoTable里,也把經緯度信息映射為SectorID,那我做查詢:
SELECT Id
FROM
IdInfoTable
WHERE sector = 49377
就能立刻找到我需要的哪些
ID
是與圖上紅色點經緯度相同的一個區域的。
經緯度一種簡單的划分,就是二維數組:
首先第一步,就是考慮如何將全球的經緯度划分開,比如我這里是每0.5度划分一小格。如下圖,fraction = 0.5 表示間隔數。即每一度間隔0.5。
緯度從-90 到 90度。可以划分為:
[-90.0, -89.5)
[-89.5, -80.0)
[-80.0, -79.5)
….
[89.5, 90.0]
最后一個是閉合區間,讓它包含90度。如下表格的第一列所示,這樣緯度間隔總共有360度。
90-(-90)/0.5=360
經度從-180 到 180度,按照0.5間隔划分,可以表示為:
[-180.0, -179.5)
[-179.5, -170.0)
…..
[179.5, 180.0]
最后一個是閉合區間,讓它包含180度。如下表格第一行所示。這樣緯度間隔總共有720度。
180-(-180)/0.5=720
表格的行代表緯度,一行代表0.5度的跨度。
表格的列代表經度,一列代表0.5度的跨度。
表格總共有360 * 720 = 259,200個,序號從 0 到259,199。
fraction = 0.5 |
|
[-180.0, -179.5) 0 |
[-179.5, -170.0) 1 |
[-170.0, -169.5) 2 |
|
|
[179.5, 180.0] 719 |
[-90.0, -89.5) -> 0 |
0 |
1 |
2 |
... |
|
719 |
|
[-89.5, -89.0) -> 1 |
720 |
721 |
722 |
... |
|
1439 |
|
[-80.0, -79.5) -> 2 |
1440 |
1441 |
1442 |
... |
|
|
|
|
... |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
[89.5, 90.0] -> 359 |
258,480 |
|
|
... |
|
259,199 |
表1
上面使用了fraction=0.5來划分全球的經緯度。你也可以使用更小的間隔來划分全球的經緯度。這個會得到更多的sectorID。更甚者 緯度行使用0.5來划分,經度列用0.2來划分。(最好不要使用0.1這類計算機無法精確表示的浮點數。)這里演示只用0.5。
如果有一個緯度latitude的row索引 ri,有一個經度longitude的列col索引ci,則sectorID是:
sid = (ri * 360 * 1/fraction) + ci
例如:上表格里,Sector 1441的行索引 ri 是2,列索引 ci 是1。
(2 * 360 * 1/0.5) + 1 = (2 * 720) + 1 = 1441.
360 * 1/fraction 這個因子決定了每一行有多少個列(經度360度分為多少份)。這個因子乘以ri就是這一行第一個sectorID。加上ci 就是往該行偏移ci個列,就是最終的SectorID。
下一個難題是如何找到經緯度的索引ri 和ci。笨的辦法是遍歷所有經緯度找到合適的索引,例如遍歷一遍緯度,如上表格從0 到359遍歷找到給定緯度的ri,再從經度的0到719遍歷,找到給定經度的ci。遍歷的多少取決於切分的粒度,切的越細,遍歷區間越大。另外一種就是二分查找。
代碼如下,緯度索引返回的是行ri值,經度索引返回的是列ci值。它們2個函數是相似的,區別只是在LonIndex函數中把常量180需要替換為360;-90 替換為-180;90替換為180。
static int LatIndex(double latitude, double fraction) { latitude = Math.Round(latitude, 8); int lastIndex = (int)(180 * (1.0 / fraction) - 1); double firstLat = -90.0; if (latitude == -90.0) return 0; if (latitude == 90.0) return lastIndex; int lo = 0; int hi = lastIndex; int mid = (lo + hi) / 2; int ct = 0; // To prevent infinite loop while (ct < 10000) { ++ct; double left = firstLat + (fraction * mid); // Left part interval left = Math.Round(left, 8); double right = left + fraction; // Right part interval right = Math.Round(right, 8); if (latitude >= left && latitude < right) return mid; else if (latitude < left) { hi = mid - 1; mid = (lo + hi) / 2; } else { lo = mid + 1; mid = (lo + hi) / 2; } } throw new Exception("LatIndex no return for input = " + latitude); }
上面方法里經緯度用的double。Double、float在計算機里有精度問題,某些數據是表示不正確的,一般也不能用 = 來進行相等比較,例如 double a=-1, b=-1, 這時候計算機判斷 a==b就是false。還有一些除不盡的數也是不能直接=比較的。但可以截取一個有效位數,然后進行比較。為避免equality-of-two-type-double 錯誤,2個double或float類型數據是不能比較大小的。所以這里取一個8位有效數字,然后進行比較。上面的函數的主循環里檢查[a,b) 這個區間。所以需要顯示的判斷 90這個最后一個值。
有了上面的方法介紹,下面就可以把經緯度映射到sectorID了。
static int LatLonToSector(double latitude, double longitude, double fraction) { int latIndex = LatIndex(latitude, fraction); // row int lonIndex = LonIndex(longitude, fraction); // col return (latIndex * 360 * (int)(1.0 / fraction)) + lonIndex; }
Sector 面積的問題。
因為根據上面fraction分割經緯度的方法是將地球平面平均的分割為若干個小塊。但這些小塊的面積是不一樣大小的。如下圖:緯線之間是平行的,每一緯度之間的距離,例如A的距離、B的距離大約是111KM。經線之間是在赤道上距離遠,靠近極點附近的距離近,例如C的距離是比D的距離要小的。所以一個小塊的面積在不同緯度上是不一樣的。
圖1
2個坐標點之間的距離計算方法如下:參考:經緯度距離計算(給定2個經緯度,計算距離) http://www.cnblogs.com/softfair/p/distance_of_two_latitude_and_longitude_points.html
public static double Distance(double lat1, double lon1, double lat2, double lon2) { double r = 6371.0; // approx. radius of earth in km double lat1Radians = (lat1 * Math.PI) / 180.0; double lon1Radians = (lon1 * Math.PI) / 180.0; double lat2Radians = (lat2 * Math.PI) / 180.0; double lon2Radians = (lon2 * Math.PI) / 180.0; double d = r * Math.Acos((Math.Cos(lat1Radians) * Math.Cos(lat2Radians) * Math.Cos(lon2Radians - lon1Radians)) + Math.Sin(lat1Radians) * Math.Sin(lat2Radians))); return d; }
所以,如果知道一個Sector的起始經緯度(latitude, longitude)是能夠計算這個sector的長、寬和面積的。
以下是把sectorID轉化為起始緯度的函數:
static double SectorToLat(int sector, double fraction) { int divisor = 360 * (int)(1.0 / fraction); int row = sector / divisor; return -90.0 + (fraction * row); }
這個函數是LatLonToSector的一個逆向操作。例如給定sectorID=1441,fraction=0.5如上表格,
divisor = 360 * 1.0 / 0.5 = 720 表示有多少個列,是經度的間隔。1441 / 720 = 2 就是行row索引ri。-90.0 + 0.5 * 2 = -90.0 + 1.0 = -89.0 就是[-89.0, -79.5)這個區間的起始緯度。
獲得起始經度的方法是類似的:
static double SectorToLon(int sector, double fraction) { int divisor = 360 * (int)(1.0 / fraction); int col = sector % divisor; return -180.0 + (fraction * col); }
sector % divisor 取模表示所在的列ci。
根據以上2個方法,可以計算面積:
static double Area(int sector, double fraction) { double lat1 = SectorToLat(sector, fraction); double lon1 = SectorToLon(sector, fraction); double lat2 = lat1 + fraction; double lon2 = lon1 + fraction; double width = Distance(lat1, lon1, lat1, lon2); double height = Distance(lat1, lon1, lat2, lon1); return width * height; }
臨近區塊(Adjacent Sectors)
有時候,找到當前Sector是不夠的,還需要找到臨近的Sector。例如在721塊的時候,臨近Sector是0, 1, 2, 720, 722, 1440, 1441 和 1442共八個塊。左右Sector就是SectorId減或加1得到。上下的Sector就是SectorId減加一行所具有的列數總數即360 * (1 / fraction)。另外需要注意的幾個特殊Sector,他們可能是四個角上的、或者是第一行的sector、或者是最后一行的sector,它們臨近的sector是不全的。需要額外判斷。以下是判斷臨近塊的方法:
static bool IsLeftEdge(int sector, double fraction) { int numColumns = (int)(1.0 / fraction) * 360; if (sector % numColumns == 0) return true; else return false; } static bool IsRightEdge(int sector, double fraction) { if (IsLeftEdge(sector + 1, fraction) == true) return true; else return false; } static bool IsTopRow(int sector, double fraction) { int numColumns = (int)(1.0 / fraction) * 360; if (sector >= 0 && sector <= numColumns - 1) return true; else return false; } static bool IsBottomRow(int sector, double fraction) { int numColumns = (int)(1.0 / fraction) * 360; int numRows = (int)(1.0 / fraction) * 180; int firstValueInLastRow = numColumns * (numRows - 1); int lastValueInLastRow = numColumns * numRows - 1; if (sector >= firstValueInLastRow && sector <= lastValueInLastRow) return true; else return false; }
下面的函數是返回臨近點的函數。返回數組里,
result[0]是左上角的SectorId,
result[1]是正上方的SectorId,
result[2]是右上角的SectorId,
result[3]是正左方的SectorId,
result[4]是正右方的SectorId,
result[5]是左下角的SectorId,
result[6]是正下方的SectorId,
result[7]是右下角的SectorId,
static int[] AdjacentSectors(int sector, double fraction) { int[] result = new int[8]; // Clockwise from upper-left int numCols = (int)(1.0 / fraction) * 360; int numRows = (int)(1.0 / fraction) * 180; int firstValueInLastRow = numCols * (numRows - 1); int lastValueInLastRow = numCols * numRows - 1; // General case if (IsLeftEdge(sector, fraction) == false && IsRightEdge(sector, fraction) == false && IsTopRow(sector, fraction) == false && IsBottomRow(sector, fraction) == false) { result[0] = sector - numCols - 1; result[1] = sector - numCols; result[2] = sector - numCols + 1; result[3] = sector - 1; result[4] = sector + 1; result[5] = sector + numCols - 1; result[6] = sector + numCols; result[7] = sector + numCols + 1; return result; } // Deal with special cases here. See code download. throw new Exception("Unexpected logic path in AdjacentSectors"); }
有時候,僅僅查找臨近的8個節點是不夠的。例如:在高緯度地區。如下圖所示,中心區域是紅色的,它四周的8塊Sector是綠色的。如果Fraction很小,導致紅色區域的寬和高都比較小,例如有可能紅色Secotor的寬高是2*10,則當需要查找附近10公里的時候,寬度需要向外延生,比如說左邊,需要向外延生5個臨近快。因為如果當前的點是紅色Sector的起始點,即紅色塊左下角點時,左邊就需要向外延生5*2公里。這樣才能完全覆蓋住半徑10公里的范圍。也就是下圖中黑色線條標出的左邊方向的5個Sector,每個Secotr的寬是2。黑色線條就一個10公里半徑(以紅色Sector左下角作為圓心)。
這樣擴展后,會找到更多的點,包括哪些可能不是10公里范圍內的點。那就需要再用距離公式對這些范圍內的點再做精確的距離計算,來剔除掉一些額外點,保留下盡在范圍的點。
/// <summary> /// 根據經緯度、半徑查找臨近節點,會遞歸查找,找到該半徑區域大小的所有Sector。 /// </summary> /// <param name="latitude">緯度</param> /// <param name="longitude">經度</param> /// <param name="radius">半徑</param> /// <param name="fraction"></param> /// <returns>SectorID</returns> public HashSet<int> FindClosedAdjacentSectors(double latitude, double longitude, double radius) { if (radius > 200.0) { throw new Exception("半徑太大"); //超過200公里,國內查找實際意義不太大。 } //這里使用List來保存所有的sector,再最后輸出的時候再去重復。 //因為下面的查找用的是深度優先遞歸查找,實際測試中發現,如果用HashSet會遺漏掉幾個點。 var allFoundSectorIdSet = new List<int>(); var centralSectorId = MapLonLatCalc.LatLonToSector(latitude, longitude, Fraction); allFoundSectorIdSet.Add(centralSectorId); //計算當前sector的寬和高 var w = 0.0; var h = 0.0; MapLonLatCalc.SectorArea(centralSectorId, Fraction, out w, out h); GetAdjacentSectorsRecursively(centralSectorId, allFoundSectorIdSet,radius+w,radius+h, 200);//給寬度+w,高度+h,冗余,不同經緯度上多找一些。以免遺漏 return new HashSet<int>(allFoundSectorIdSet); }
在這里,我做遞歸查找時,將寬和高都額外給了一個距離,這個距離是中心塊的寬和高。上面介紹的SectorToLat和SectorToLon得到的lat和lon都是Sector左上角的坐標位置(如下圖藍色點位置),一個Sector是一個矩形。如果我們輸入的源點是藍色點,那遞歸查找時,無需額外的增加寬和高。但如果我們輸入的源點是紅色的點的位置,在做半徑10公里這類查找時,我們就需要額外的把紅色點所在的Sector的寬和高給補足進去。代碼里計算的距離的參考點都是以藍色的點為基准的。
以上代碼使用List<int> 來保存所有找到的SectorId,會存在重復的Sector,原來使用的HashSet保存,直接在查找時就去重復,但測試后發現一些問題。例如上圖,中心Sector是0,首先找到周邊8個Sector並保存,然后開始深度搜索遞歸,從Sector1首先開始,而Sector1它會優先查找到天藍色線條所在的Sector,當找到Sector7的時候,Sector7又深度優先,會直接把Sector2找到保存下來,當Sector1周邊所有都找遍完了,輪到Sector2來遞歸查找時,發現Sector2已經找過了,就跳過了。導致Sector8沒有被包含進來。這就出現了錯誤。所以現在使用List保存所有Sector,最后再去重復。
遞歸查找:
/// <summary> /// 遞歸查找臨近區域的SectorId /// </summary> /// <param name="sectorId">當前SectorID</param> /// <param name="allSectors">已經存在的SectorId,每找到一個就存入</param> /// <param name="accumulatedWidth">累計的寬度,這里其實是剩余的寬度</param> /// <param name="accumulatedHeight">累計的高度,這里其實剩余的寬度</param> /// <param name="recursiveTime">遞歸次數,避免死死循環</param> /// <param name="fraction"></param> private void GetAdjacentSectorsRecursively(int centorSectorId, List<int> allSectors,double remainWidth,double remainHeight, int recursiveTime) { if (recursiveTime <= 0) { return; } var result = MapLonLatCalc.AdjacentSectors(centorSectorId, Fraction); for (int i = 0; i < result.Length; i++) { var sId = result[i]; if (sId > -1) { allSectors.Add(sId); //計算當前sector的寬和高 var w = 0.0; var h = 0.0; MapLonLatCalc.SectorArea(sId, Fraction, out w, out h); var leftWidth = remainWidth - w;//還剩余多少沒有找到?如果>0,就需要遞歸查找,低緯度與高緯度值不一樣。 var leftHeight = remainHeight - h; //看看剩余的高,高是同一經線上的2個維度距離。所以高是平均相等的。 //只要有剩余的寬和高,就遞歸去找。可多找,不可少找 // 這里有個問題,在極點附近lat接近90 的時候,Sector就是幾個,無論怎么遞歸查找,leftWidth 和leftHeight有可能永遠>0. //所以在數據源上要做限制,不允許有極點附近的坐標。還好,目前國內或國外的查找點在極點附近基本沒有什么。 if (leftWidth > 0 || leftHeight > 0) { GetAdjacentSectorsRecursively(sId, allSectors,leftWidth,leftHeight, recursiveTime - 1); } } } }
在極點附近的時候,這里有個問題,在極點附近lat接近90 的時候,Sector就是幾個,無論怎么遞歸查找,leftWidth 和leftHeight有可能永遠>0.
所以在數據源上要做限制,不允許有極點附近的坐標。還好,目前國內或國外的查找點在極點附近基本沒有什么。
測試程序:
隨機生成一批坐標點,緯度范圍控制在 -85 到85之間。目前地球上85度緯度以上的地區基本無有效信息可查。微軟Bing地圖也在這個范圍內。
static void InitLatLon() { Random r = new Random(0); //initialDataFile="..\\..\\UserIDLatLon.txt"; FileStream ofs = new FileStream(initialDataFile, FileMode.Create); StreamWriter sw = new StreamWriter(ofs); for (int i = 0; i < 1000000; ++i) { double lat = (85.0 - (-85.0)) * r.NextDouble() + (-85.0); double lon = (180.0 - (-180.0)) * r.NextDouble() + (-180.0); sw.WriteLine(i.ToString().PadLeft(6, '0') + "," + lat.ToString("F8") + "," + lon.ToString("F8")); } sw.Close(); ofs.Close(); }
MyCusterInfo 是模擬現實中的一個實際業務中可能需要的一些地理信息。它除了ID、經緯度坐標外,可能還包含其他信息,這里用ExtraInfo表示。
public class MyCusterInfo { public int Id { get; set; } public double LAT { get; set; } public double LON { get; set; } /// <summary> /// 額外信息 /// </summary> public object ExtraInfo { get; set; } }
從文件中把經緯度信息讀取出來放在內存中:
Dictionary<int, MyCusterInfo> myLatLonCusterInfo = new Dictionary<int, MyCusterInfo>(1000000); //initialDataFile = "..\\..\\UserIDLatLon.txt"; FileStream ifs = new FileStream(initialDataFile, FileMode.Open); StreamReader sr = new StreamReader(ifs); string line = ""; string[] tokens = null; while ((line = sr.ReadLine()) != null) { tokens = line.Split(','); int id = int.Parse(tokens[0]); double lat = double.Parse(tokens[1]); double lon = double.Parse(tokens[2]); bool vld = false; double vlat = -1.0, vlon = -1.0; vld = findObjects.ConvertToValidLatLon(lat, lon, out vlat, out vlon); if (vld) { var obj=new MyCusterInfo { Id = id, LAT = vlat, LON = vlon }; myLatLonCusterInfo[id]=obj; } }
完整代碼,請從這里獲取: http://download.csdn.net/detail/soft_fair/8680673
經過測試程序,發現使用笨辦法 和使用這個自定義經緯度索引的方法,返回的結果數是一致的。自定義索引的方法更快、更高效。目前已經應用於互聯網生產環境。