准備
我的環境是python3.6,sc2包0.11.1
機器學習包下載鏈接:pysc2
地圖下載鏈接:maps
游戲下載鏈接:國際服 國服
pysc2是DeepMind開發的星際爭霸Ⅱ學習環境。 它是封裝星際爭霸Ⅱ機器學習API,同時也提供Python增強學習環境。
以神族為例編寫代碼,神族建築科技圖如下:
教程
采礦
# -*- encoding: utf-8 -*-
'''
@File : __init__.py.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/11/3 12:32 Jonas None
'''
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
class SentdeBot(sc2.BotAI):
async def on_step(self, iteration: int):
await self.distribute_workers()
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
],realtime = True)
注意
game_data.py的assert self.id != 0
注釋掉
pixel_map.py的assert self.bits_per_pixel % 8 == 0, "Unsupported pixel density"
注釋掉
否則會報錯
運行結果如下,農民開始采礦
可以正常采礦
建造農民和水晶塔
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
class SentdeBot(sc2.BotAI):
async def on_step(self, iteration: int):
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
# 建造農民
async def build_workers(self):
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且水晶不是正在建造
if self.supply_left<5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON,near=nexuses.first)
### 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
],realtime = True)
運行結果如下,基地造農民,農民造水晶
收集氣體和開礦
代碼如下
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
class SentdeBot(sc2.BotAI):
async def on_step(self, iteration: int):
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
# 建造農民
async def build_workers(self):
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
## 開礦
async def expand(self):
if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
], realtime=False)
run_game的realtime設置成False,可以在加速模式下運行游戲。
運行效果如下:
可以建造吸收廠和開礦
建造軍隊
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
class SentdeBot(sc2.BotAI):
async def on_step(self, iteration: int):
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
# 建造農民
async def build_workers(self):
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
## 開礦
async def expand(self):
if self.units(UnitTypeId.NEXUS).amount<2 and self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.units(UnitTypeId.PYLON).ready.exists:
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists:
if not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
# 否則建造折躍門
else:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY,near=pylon)
# 造兵
async def build_offensive_force(self):
# 無隊列化建造
for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
await self.do(gw.train(UnitTypeId.STALKER))
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
], realtime=False)
運行結果如下:
可以看到,我們建造了折躍門和控制核心並訓練了追獵者
控制部隊進攻
代碼如下
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random
class SentdeBot(sc2.BotAI):
async def on_step(self, iteration: int):
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.attack()
# 建造農民
async def build_workers(self):
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
## 開礦
async def expand(self):
if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
# 否則建造折躍門
elif len(self.units(UnitTypeId.GATEWAY))<=3:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY,near=pylon)
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
await self.do(gw.train(UnitTypeId.STALKER))
## 尋找目標
def find_target(self,state):
if len(self.known_enemy_units)>0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units)>0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
# 追獵者數量超過15個開始進攻
if self.units(UnitTypeId.STALKER).amount>15:
for s in self.units(UnitTypeId.STALKER).idle:
await self.do(s.attack(self.find_target(self.state)))
# 防衛模式:視野范圍內存在敵人,開始攻擊
if self.units(UnitTypeId.STALKER).amount>5:
if len(self.known_enemy_units)>0:
for s in self.units(UnitTypeId.STALKER).idle:
await self.do(s.attack(random.choice(self.known_enemy_units)))
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
運行結果如下
可以看到,4個折躍門訓練追獵者並發動進攻。
擊敗困難電腦
我們目前的代碼只能擊敗中等和簡單電腦,那么如何擊敗困難電腦呢?
代碼如下
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random
class SentdeBot(sc2.BotAI):
def __init__(self):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 65
async def on_step(self, iteration: int):
self.iteration = iteration
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.attack()
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS))*16>len(self.units(UnitTypeId.PROBE)) and len(self.units(UnitTypeId.PROBE))<self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount<self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
await self.do(gw.train(UnitTypeId.STALKER))
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self,state):
if len(self.known_enemy_units)>0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units)>0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
# {UNIT: [n to fight, n to defend]}
aggressive_units = {UnitTypeId.STALKER: [15, 5],
UnitTypeId.VOIDRAY: [8, 3]}
for UNIT in aggressive_units:
# 攻擊模式
if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
1]:
for s in self.units(UNIT).idle:
await self.do(s.attack(self.find_target(self.state)))
# 防衛模式
elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
if len(self.known_enemy_units) > 0:
for s in self.units(UNIT).idle:
await self.do(s.attack(random.choice(self.known_enemy_units)))
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)
運行結果如下
可以看到,擊敗了困難人族電腦,但是電腦選擇了rush戰術,我們寫得AI腳本會輸掉游戲。顯然,這不是最佳方案。
“只有AI才能拯救我的勝率”,請看下文。
采集地圖數據
這次我們只造一個折躍門,全力通過星門造虛空光輝艦
修改offensive_force_buildings(self)方法的判斷
elif len(self.units(GATEWAY)) < 1:
if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
await self.build(GATEWAY, near=pylon)
注釋或者刪除build_offensive_force(self)的建造追獵者的代碼
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
# for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
# if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
#
# if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
# await self.do(gw.train(UnitTypeId.STALKER))
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
attack(self)中的aggressive_units注釋掉Stalker
導入numpy和cv2庫
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
建立以地圖Heigt為行,Width為列的三維矩陣
for nexus in self.units(NEXUS):
nex_pos = nexus.position
print(nex_pos)
cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1) # BGR
遍歷星靈樞紐,獲取下一個位置,畫圓,circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
接下來我們要垂直翻轉三維矩陣,因為我們建立的矩陣左上角是原點(0,0),縱坐標向下延申,橫坐標向右延申。翻轉之后就成了正常的坐標系。
flipped = cv2.flip(game_data, 0)
圖像縮放,達到可視化最佳。
resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
cv2.imshow('Intel', resized)
cv2.waitKey(1)
至此,完整代碼如下
import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
class SentdeBot(sc2.BotAI):
def __init__(self):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 65
async def on_step(self, iteration: int):
self.iteration = iteration
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
async def intel(self):
# 根據地圖建立的三維矩陣
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
for nexus in self.units(UnitTypeId.NEXUS):
nex_pos = nexus.position
# circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
# 記錄星靈樞紐的位置
cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1)
# 圖像翻轉垂直鏡像
flipped = cv2.flip(game_data, 0)
# 圖像縮放
# cv2.resize(原圖像,輸出圖像的大小,width方向的縮放比例,height方向縮放的比例)
resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
cv2.imshow('Intel', resized)
# cv2.waitKey(每Xms刷新圖像)
cv2.waitKey(1)
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
# {UNIT: [n to fight, n to defend]}
aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}
for UNIT in aggressive_units:
# 攻擊模式
if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][1]:
for s in self.units(UNIT).idle:
await self.do(s.attack(self.find_target(self.state)))
# 防衛模式
elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
if len(self.known_enemy_units) > 0:
for s in self.units(UNIT).idle:
await self.do(s.attack(random.choice(self.known_enemy_units)))
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)
運行結果如下
采集到了地圖位置。
偵察
在intel(self)里創建一個字典draw_dict,UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)]
}
迭代同上
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
存儲三族的主基地名稱(星靈樞紐,指揮中心,孵化場),刻畫敵方建築。
# 主基地名稱
main_base_names = ["nexus", "supplydepot", "hatchery"]
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
刻畫敵方單位,如果是農民畫得小些,其他單位則畫大些。
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
在offensive_force_buildings(self)方法中添加建造機械台
if self.units(CYBERNETICSCORE).ready.exists:
if len(self.units(ROBOTICSFACILITY)) < 1:
if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY):
await self.build(ROBOTICSFACILITY, near=pylon)
創建scout(),訓練Observer
async def scout(self):
if len(self.units(OBSERVER)) > 0:
scout = self.units(OBSERVER)[0]
if scout.is_idle:
enemy_location = self.enemy_start_locations[0]
move_to = self.random_location_variance(enemy_location)
print(move_to)
await self.do(scout.move(move_to))
else:
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))
生成隨機位置,很簡單。意思是橫坐標累計遞增-0.2和0.2倍的橫坐標,限制條件為如果x超過橫坐標,那么就是橫坐標最大值。
縱坐標同理。
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += ((random.randrange(-20, 20))/100) * enemy_start_location[0]
y += ((random.randrange(-20, 20))/100) * enemy_start_location[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x,y)))
return go_to
完整代碼如下
# -*- encoding: utf-8 -*-
'''
@File : demo.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/11/3 12:32 Jonas None
'''
import sc2
from sc2 import run_game, maps, Race, Difficulty, position
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
class SentdeBot(sc2.BotAI):
def __init__(self):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
async def on_step(self, iteration: int):
self.iteration = iteration
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
## 偵察
async def scout(self):
if len(self.units(UnitTypeId.OBSERVER)) > 0:
scout = self.units(UnitTypeId.OBSERVER)[0]
if scout.is_idle:
enemy_location = self.enemy_start_locations[0]
move_to = self.random_location_variance(enemy_location)
print(move_to)
await self.do(scout.move(move_to))
else:
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
# UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
# OBSERVER: [3, (255, 255, 255)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# 主基地名稱
main_base_names = ["nexus", "supplydepot", "hatchery"]
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
# 不是主基地建築,畫小一些
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
# flip horizontally to make our final fix in visual representation:
flipped = cv2.flip(game_data, 0)
resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
cv2.imshow('Intel', resized)
cv2.waitKey(1)
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x, y)))
return go_to
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造機械台
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
# for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
# if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
#
# if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
# await self.do(gw.train(UnitTypeId.STALKER))
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
# {UNIT: [n to fight, n to defend]}
aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}
for UNIT in aggressive_units:
# 攻擊模式
if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
1]:
for s in self.units(UNIT).idle:
await self.do(s.attack(self.find_target(self.state)))
# 防衛模式
elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
if len(self.known_enemy_units) > 0:
for s in self.units(UNIT).idle:
await self.do(s.attack(random.choice(self.known_enemy_units)))
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)
運行結果如下,紅色和粉紅色是敵方單位。
創建訓練數據
統計資源、人口和軍隊人口比,在intel方法添加如下代碼
# 追蹤資源、人口和軍隊人口比
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
if military_weight > 1.0:
military_weight = 1.0
# 農民/人口 worker/supply ratio
cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
# 人口/200 plausible supply (supply/200.0)
cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
# (人口-現有人口)/人口 population ratio (supply_left/supply)
cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
# 氣體/1500 gas/1500
cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
# 晶體礦/1500 minerals minerals/1500
cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
運行結果如下,左下角自上而下依次是“農民/人口”,“人口/200”,“(人口-現有人口)/人口”,“氣體/1500”,“晶體礦/1500”
采集進攻行為數據,在attack方法中加入如下代碼
if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
choice = random.randrange(0, 4)
target = False
if self.iteration > self.do_something_after:
if choice == 0:
# 什么都不做
wait = random.randrange(20, 165)
self.do_something_after = self.iteration + wait
elif choice == 1:
# 攻擊離星靈樞紐最近的單位
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
elif choice == 2:
# 攻擊敵方建築
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
elif choice == 3:
# 攻擊敵方出生位置
target = self.enemy_start_locations[0]
if target:
for vr in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(vr.attack(target))
y = np.zeros(4)
y[choice] = 1
print(y)
self.train_data.append([y, self.flipped])
輸出如下結果
···
[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 0. 0. 1.]
[0. 0. 1. 0.]
[1. 0. 0. 0.]
···
為了使用self.flipped = cv2.flip(game_data, 0),修改
flipped = cv2.flip(game_data, 0)
resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
為
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
init 方法添加do_something_after和train_data
def __init__(self):
self.ITERATIONS_PER_MINUTE = 165
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
采集攻擊數據的時候不需要畫圖,我們在類前加HEADLESS = False
,intel方法代碼修改如下
self.flipped = cv2.flip(game_data, 0)
if not HEADLESS:
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
cv2.imshow('Intel', resized)
cv2.waitKey(1)
加入on_end方法,只存儲勝利的數據,在和代碼同級目錄新建train_data文件夾
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result)
if game_result == Result.Victory:
np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))
完整代碼如下
import os
import time
import sc2
from sc2 import run_game, maps, Race, Difficulty, position, Result
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
HEADLESS = True
# os.environ["SC2PATH"] = 'F:\StarCraft II'
class SentdeBot(sc2.BotAI):
def __init__(self):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result)
if game_result == Result.Victory:
np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))
async def on_step(self, iteration: int):
self.iteration = iteration
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
## 偵察
async def scout(self):
if len(self.units(UnitTypeId.OBSERVER)) > 0:
scout = self.units(UnitTypeId.OBSERVER)[0]
if scout.is_idle:
enemy_location = self.enemy_start_locations[0]
move_to = self.random_location_variance(enemy_location)
print(move_to)
await self.do(scout.move(move_to))
else:
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
# UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
# OBSERVER: [3, (255, 255, 255)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# 主基地名稱
main_base_names = ["nexus", "supplydepot", "hatchery"]
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
# 不是主基地建築,畫小一些
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
# 追蹤資源、人口和軍隊人口比
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
if military_weight > 1.0:
military_weight = 1.0
# 農民/人口 worker/supply ratio
cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
# 人口/200 plausible supply (supply/200.0)
cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
# (人口-現有人口)/人口 population ratio (supply_left/supply)
cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
# 氣體/1500 gas/1500
cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
# 晶體礦/1500 minerals minerals/1500
cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
if HEADLESS:
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
cv2.imshow('Intel', resized)
cv2.waitKey(1)
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x, y)))
return go_to
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
# print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造機械台
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
# 無隊列化建造
# for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
# if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
#
# if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
# await self.do(gw.train(UnitTypeId.STALKER))
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
choice = random.randrange(0, 4)
target = False
if self.iteration > self.do_something_after:
if choice == 0:
# 什么都不做
wait = random.randrange(20, 165)
self.do_something_after = self.iteration + wait
elif choice == 1:
# 攻擊離星靈樞紐最近的單位
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
elif choice == 2:
# 攻擊敵方建築
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
elif choice == 3:
# 攻擊敵方出生位置
target = self.enemy_start_locations[0]
if target:
for vr in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(vr.attack(target))
y = np.zeros(4)
y[choice] = 1
print(y)
self.train_data.append([y, self.flipped])
## 啟動游戲
run_game(maps.get("AcidPlantLE"), [
Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
可以看到train_data文件夾下存儲了勝利數據
創建神經網絡模型
sequential model如下
選擇Sequential模型,dense層,dropout和flatten,將數據flatten之后再進入dense層。最終,我們使用卷積神經網絡,所以我們需要Conv2D和MaxPooling2D。我們想可視化輸出模型訓練結果,再加上TensorBoard。
存儲數據用numpy,os操作IO,random用來進行隨機操作。
import keras
from keras.models import Sequential
from keras.layers import Dense,Dropout,Flatten,Conv2D,MaxPooling2D
from keras.callbacks import TensorBoard
import numpy as np
import os
import random
首先,創建模型
model = Sequential()
名詞解釋:
Sequential:序貫模型,序貫模型是函數式模型的簡略版,為最簡單的線性、從頭到尾的結構順序,不分叉
Conv2D:圖像的空域卷積
MaxPooling2D:空域信號施加到最大值池化
Dropout:用於防止過擬合
Flatten:輸入"壓平",把多維的輸入一維化,常用在卷積層到全連接層的過渡
Dense:全連接層
接下來,創建hidden卷積層。
model.add(Conv2D(32, (3, 3), padding='same',
input_shape=(176, 200, 3),
activation='relu'))
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(64, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(128, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
創建全連接層
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
輸出層
model.add(Dense(4, activation='softmax'))
為神經網絡設置compile屬性
learning_rate = 0.0001
opt = keras.optimizers.adam(lr=learning_rate, decay=1e-6)
model.compile(loss='categorical_crossentropy',
optimizer=opt,
metrics=['accuracy'])
compile方法解釋:
compile(self, optimizer, loss, metrics=None, sample_weight_mode=None)
optimizer: 字符串(預定義優化器名)或優化器對象,參考優化器
loss: 字符串(預定義損失函數名)或目標函數,參考損失函數
metrics: 列表,包含評估模型在訓練和測試時的網絡性能的指標,典型用法是metrics=[‘accuracy’]
sample_weight_mode:如果你需要按時間步為樣本賦權(2D權矩陣),將該值設為“temporal”。
默認為“None”,代表按樣本賦權(1D權)。在下面fit函數的解釋中有相關的參考內容。
kwargs: 使用TensorFlow作為后端請忽略該參數,若使用Theano作為后端,kwargs的值將會傳遞給 K.function
通過TensorBoard記錄數據
tensorboard = TensorBoard(log_dir="logs/stage1")
完整代碼如下:
# -*- encoding: utf-8 -*-
'''
@File : building-model.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/11/26 22:37 Jonas None
'''
import keras
from keras.models import Sequential
from keras.layers import Dense,Dropout,Flatten,Conv2D,MaxPooling2D
from keras.callbacks import TensorBoard
import numpy as np
import os
import random
model = Sequential()
model.add(Conv2D(32, (3, 3), padding='same',
input_shape=(176, 200, 3),
activation='relu'))
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(64, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(128, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(4, activation='softmax'))
learning_rate = 0.0001
opt = keras.optimizers.adam(lr=learning_rate, decay=1e-6)
model.compile(loss='categorical_crossentropy',
optimizer=opt,
metrics=['accuracy'])
tensorboard = TensorBoard(log_dir="logs/stage1")
tran_data_dir = "train_data"
訓練神經網絡模型
train_data現成的訓練文件鏈接,當然,你也可以自己訓練。
鏈接:https://pan.baidu.com/s/1X0eRXakbmqmL_It9QbLYfQ
提取碼:trow
一次處理200個文件
for i in range(hm_epochs):
print(i)
current = 0
increment = 200
not_maximum = True
獲取文件數組首元素,即choice
while not_maximum:
print("現在正在做 {}:{}".format(current,current+increment))
no_attacks = []
attack_closest_to_nexus = []
attack_enemy_structures = []
attack_enemy_start = []
for file in all_files[current:current+increment]:
full_path = os.path.join(train_data_dir,file)
data = np.load(full_path)
data = list(data)
for d in data:
# 返回最大值的索引
choice = np.argmax(d[0])
if choice == 0:
no_attacks.append(d)
elif choice == 1:
attack_closest_to_nexus.append(d)
elif choice == 2:
attack_enemy_structures.append(d)
elif choice == 3:
attack_enemy_start.append(d)
統計四種進攻策略的數量
# 對數據進行統計
def check_data():
choices = {"no_attacks": no_attacks,
"attack_closest_to_nexus": attack_closest_to_nexus,
"attack_enemy_structures": attack_enemy_structures,
"attack_enemy_start": attack_enemy_start}
total_data = 0
lengths = []
for choice in choices:
print("Length of {} is: {}".format(choice, len(choices[choice])))
total_data += len(choices[choice])
lengths.append(len(choices[choice]))
print("Total data length now is:",total_data)
return lengths
將四種進攻策略的數據規范到同一長度
lowest_data = min(lengths)
random.shuffle(no_attacks)
random.shuffle(attack_closest_to_nexus)
random.shuffle(attack_enemy_structures)
random.shuffle(attack_enemy_start)
# 縮小數據集
no_attacks = no_attacks[:lowest_data]
attack_closest_to_nexus = attack_closest_to_nexus[:lowest_data]
attack_enemy_structures = attack_enemy_structures[:lowest_data]
attack_enemy_start = attack_enemy_start[:lowest_data]
check_data()
把四種進攻策略合並成一個大list
train_data = no_attacks + attack_closest_to_nexus + attack_enemy_structures + attack_enemy_start
100個驗證集,len(train_data-100)個訓練集
x_train = np.array([i[1] for i in train_data[:-test_size]]).reshape(-1, 176, 200, 3)
y_train = np.array([i[0] for i in train_data[:-test_size]])
x_test = np.array([i[1] for i in train_data[-test_size:]]).reshape(-1, 176, 200, 3)
y_test = np.array([i[0] for i in train_data[-test_size:]])
擬合數據
fit方法參數解釋:
fit(x=None, y=None, batch_size=None, epochs=1, verbose=1, callbacks=None, validation_split=0.0, validation_data=None, shuffle=True, class_weight=None, sample_weight=None, initial_epoch=0, steps_per_epoch=None, >validation_steps=None)
x: 訓練數據的 Numpy 數組(如果模型只有一個輸入), 或者是 Numpy 數組的列表(如果模型有多個輸入)。 如果模型中的輸入層被命名,你也可以傳遞一個字典,將輸入層名稱映射到 Numpy 數組。 如果從本地框架張量饋送(例如 >TensorFlow 數據張量)數據,x 可以是 None(默認)。
y: 目標(標簽)數據的 Numpy 數組(如果模型只有一個輸出), 或者是 Numpy 數組的列表(如果模型有多個輸出)。 如果模型中的輸出層被命名,你也可以傳遞一個字典,將輸出層名稱映射到 Numpy 數組。 如果從本地框架張量饋送(例>如 TensorFlow 數據張量)數據,y 可以是 None(默認)。
batch_size: 整數或 None。每次梯度更新的樣本數。如果未指定,默認為 32。
verbose: 0, 1 或 2。日志顯示模式。 0 = 安靜模式, 1 = 進度條, 2 = 每輪一行。
validation_data: 元組 (x_val,y_val) 或元組 (x_val,y_val,val_sample_weights), 用來評估損失,以及在每輪結束時的任何模型度量指標。 模型將不會在這個數據上進行訓練。這個參數會覆蓋 validation_split。
shuffle: 布爾值(是否在每輪迭代之前混洗數據)或者 字符串 (batch)。 batch 是處理 HDF5 數據限制的特殊選項,它對一個 batch 內部的數據進行混洗。 當 steps_per_epoch 非 None 時,這個參數無效。
callbacks: 一系列的 keras.callbacks.Callback 實例。一系列可以在訓練時使用的回調函數。
model.fit(x_train,y_train,batch_size=batch_size,validation_data=(x_test,y_test),shuffle=True,verbose=1,
callbacks=[tensorboard])
保存模型
model.save("BasicCNN-{}-epochs-{}-LR-STAGE1".format(hm_epochs, learning_rate))
current += increment
if current > maximum:
not_maximum = False
至此,完整代碼如下
import keras
from keras.models import Sequential
from keras.layers import Dense,Dropout,Flatten,Conv2D,MaxPooling2D
from keras.callbacks import TensorBoard
import numpy as np
import os
import random
# 創建序貫模型
model = Sequential()
# 創建hidden卷積層
model.add(Conv2D(32, (3, 3), padding='same',
input_shape=(176, 200, 3),
activation='relu'))
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(64, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(128, (3, 3), padding='same',
activation='relu'))
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.2))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(4, activation='softmax'))
learning_rate = 0.0001
opt = keras.optimizers.adam(lr=learning_rate, decay=1e-6)
model.compile(loss='categorical_crossentropy',
optimizer=opt,
metrics=['accuracy'])
tensorboard = TensorBoard(log_dir="logs/stage1")
train_data_dir = "train_data"
# 對數據進行統計
def check_data():
choices = {"no_attacks": no_attacks,
"attack_closest_to_nexus": attack_closest_to_nexus,
"attack_enemy_structures": attack_enemy_structures,
"attack_enemy_start": attack_enemy_start}
total_data = 0
lengths = []
for choice in choices:
print("Length of {} is: {}".format(choice, len(choices[choice])))
total_data += len(choices[choice])
lengths.append(len(choices[choice]))
print("Total data length now is:",total_data)
return lengths
# 循環次數
hm_epochs = 10
for i in range(hm_epochs):
print(i)
current = 0
increment = 200
not_maximum = True
# 遍歷所有文件
all_files = os.listdir(train_data_dir)
maximum = len(all_files)
random.shuffle(all_files)
while not_maximum:
print("現在正在做 {}:{}".format(current,current+increment))
no_attacks = []
attack_closest_to_nexus = []
attack_enemy_structures = []
attack_enemy_start = []
for file in all_files[current:current+increment]:
full_path = os.path.join(train_data_dir,file)
data = np.load(full_path)
data = list(data)
for d in data:
# 返回最大值的索引
choice = np.argmax(d[0])
if choice == 0:
no_attacks.append(d)
elif choice == 1:
attack_closest_to_nexus.append(d)
elif choice == 2:
attack_enemy_structures.append(d)
elif choice == 3:
attack_enemy_start.append(d)
lengths = check_data()
lowest_data = min(lengths)
random.shuffle(no_attacks)
random.shuffle(attack_closest_to_nexus)
random.shuffle(attack_enemy_structures)
random.shuffle(attack_enemy_start)
# 縮小數據集
no_attacks = no_attacks[:lowest_data]
attack_closest_to_nexus = attack_closest_to_nexus[:lowest_data]
attack_enemy_structures = attack_enemy_structures[:lowest_data]
attack_enemy_start = attack_enemy_start[:lowest_data]
check_data()
train_data = no_attacks + attack_enemy_start + attack_enemy_structures + attack_closest_to_nexus
random.shuffle(train_data)
test_size = 100
batch_size = 128
# 重置為四維數組,行未知,留給numpy計算
x_train = np.array([i[1] for i in train_data[:-test_size]]).reshape(-1, 176, 200, 3)
y_train = np.array([i[0] for i in train_data[:-test_size]])
x_test = np.array([i[1] for i in train_data[-test_size:]]).reshape(-1, 176, 200, 3)
y_test = np.array([i[0] for i in train_data[-test_size:]])
model.fit(x_train,y_train,batch_size=batch_size,validation_data=(x_test,y_test),shuffle=True,verbose=1,
callbacks=[tensorboard])
model.save("BasicCNN-{}-epochs-{}-LR-STAGE1".format(hm_epochs, learning_rate))
current += increment
if current > maximum:
not_maximum = False
調用神經網絡模型
我們自己決定是否使用模型,加上use_model
def __init__(self,use_model = False):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
self.use_model = use_model
if self.use_model:
print("use model")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
記錄游戲結果,是否使用模型
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("log.txt","a") as f:
if self.use_model:
f.write("Model {}\n".format(game_result))
else:
f.write("Random {}\n".format(game_result))
既使用模型也使用隨機
async def attack(self):
if len(self.units(VOIDRAY).idle) > 0:
target = False
if self.iteration > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
#print('prediction: ',choice)
choice_dict = {0: "No Attack!",
1: "Attack close to our nexus!",
2: "Attack Enemy Structure!",
3: "Attack Eneemy Start!"}
print("Choice #{}:{}".format(choice, choice_dict[choice]))
else:
choice = random.randrange(0, 4)
運行游戲,別忘了加入use_model參數
## 啟動游戲
for i in range(10):
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=True)),
Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
測試,查看log.txt記錄的游戲結果
至此,demo.py完整代碼如下:
import sc2
from sc2 import run_game, maps, Race, Difficulty, position, Result
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
import keras
HEADLESS = False
# os.environ["SC2PATH"] = 'F:\StarCraft II'
class SentdeBot(sc2.BotAI):
def __init__(self,use_model = False):
# 經過計算,每分鍾大約165迭代次數
self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
self.use_model = use_model
if self.use_model:
print("use model")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("log.txt","a") as f:
if self.use_model:
f.write("Model {}\n".format(game_result))
else:
f.write("Random {}\n".format(game_result))
async def on_step(self, iteration: int):
self.iteration = iteration
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
## 偵察
async def scout(self):
if len(self.units(UnitTypeId.OBSERVER)) > 0:
scout = self.units(UnitTypeId.OBSERVER)[0]
if scout.is_idle:
enemy_location = self.enemy_start_locations[0]
move_to = self.random_location_variance(enemy_location)
print(move_to)
await self.do(scout.move(move_to))
else:
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
# UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
# OBSERVER: [3, (255, 255, 255)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# 主基地名稱
main_base_names = ["nexus", "supplydepot", "hatchery"]
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
# 不是主基地建築,畫小一些
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
# 追蹤資源、人口和軍隊人口比
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
if military_weight > 1.0:
military_weight = 1.0
# 農民/人口 worker/supply ratio
cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
# 人口/200 plausible supply (supply/200.0)
cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
# (人口-現有人口)/人口 population ratio (supply_left/supply)
cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
# 氣體/1500 gas/1500
cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
# 晶體礦/1500 minerals minerals/1500
cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
if self.use_model:
cv2.imshow('Model Intel', resized)
cv2.waitKey(1)
else:
cv2.imshow('Random Intel', resized)
cv2.waitKey(1)
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x, y)))
return go_to
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
# print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造機械台
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape(-1,176,200,3)])
choice = np.argmax(prediction[0])
choice_dict = {0: "No Attack!",
1: "Attack close to our nexus!",
2: "Attack Enemy Structure!",
3: "Attack Enemy Start!"}
print("Choice #{}:{}".format(choice, choice_dict[choice]))
else:
choice = random.randrange(0, 4)
target = False
if self.iteration > self.do_something_after:
if choice == 0:
# 什么都不做
wait = random.randrange(20, 165)
self.do_something_after = self.iteration + wait
elif choice == 1:
# 攻擊離星靈樞紐最近的單位
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
elif choice == 2:
# 攻擊敵方建築
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
elif choice == 3:
# 攻擊敵方出生位置(換家)
target = self.enemy_start_locations[0]
if target:
for vr in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(vr.attack(target))
y = np.zeros(4)
y[choice] = 1
print(y)
self.train_data.append([y, self.flipped])
## 啟動游戲
for i in range(10):
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=True)),
Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
log.txt輸出結果如下
···
Model Result.Defeat
Model Result.Victory
Model Result.Defeat
Model Result.Victory
Model Result.Victory
Model Result.Defeat
Model Result.Victory
Model Result.Victory
Model Result.Victory
Model Result.Victory
Model Result.Victoryd
Model Result.Defeat
Model Result.Victory
Model Result.Victory
Model Result.Defeat
Model Result.Victory
Model Result.Defeat
Model Result.Victory
Model Result.Victory
Model Result.Defeat
Model Result.Defeat
Model Result.Victory
···
打敗中等難度電腦的勝率大約59%,還需要改進
小優化
獲取游戲時間,game loop, 22.4 per second on faster game speed
async def on_step(self, iteration):
#self.iteration = iteration
self.timeMinutes = (self.state.game_loop/22.4) / 60
在random_location_variance方法里,修改enemy_start_location
為self.game_info.map_size
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
# 修改
x += ((random.randrange(-20, 20))/100) * self.game_info.map_size[0]
y += ((random.randrange(-20, 20))/100) * self.game_info.map_size[1]
expand方法修改,將self.iteration / self.ITERATIONS_PER_MINUTE
修改成self.timeMinutes/2
async def expand(self):
try:
if self.units(NEXUS).amount < self.timeMinutes/2 and self.can_afford(NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))
offensive_force_buildings方法修改,將self.iteration / self.ITERATIONS_PER_MINUTE) / 2
修改成self.timeMinutes
if self.units(CYBERNETICSCORE).ready.exists:
if len(self.units(STARGATE)) < self.timeMinutes: # 在這里修改
if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
await self.build(STARGATE, near=pylon)
attack方法修改,將self.iteration
修改成self.timeMinutes
,“什么都不做“的等待時間修復
async def attack(self):
if len(self.units(VOIDRAY).idle) > 0:
target = False
if self.timeMinutes > self.do_something_after: # 這里修改
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
choice = random.randrange(0, 4)
if choice == 0:
# no attack
wait = random.randrange(7,100)/100 # 這里修改
self.do_something_after = self.timeMinutes+ wait
完整代碼如下
# -*- encoding: utf-8 -*-
'''
@File : demo.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/11/3 12:32 Jonas None
'''
import os
import time
import sc2
from sc2 import run_game, maps, Race, Difficulty, position, Result
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
import keras
HEADLESS = False
# os.environ["SC2PATH"] = 'F:\StarCraft II'
class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False):
# 經過計算,每分鍾大約165迭代次數
# self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
self.use_model = use_model
self.scouts_and_spots = {}
if self.use_model:
print("use model")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("log.txt", "a") as f:
if self.use_model:
f.write("Model {}\n".format(game_result))
else:
f.write("Random {}\n".format(game_result))
async def on_step(self, iteration):
# self.iteration = iteration
self.timeMinutes = ((self.state.game_loop / 22.4) / 60)
print('Time:', self.timeMinutes)
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
## 偵察
async def scout(self):
if len(self.units(UnitTypeId.OBSERVER)) > 0:
scout = self.units(UnitTypeId.OBSERVER)[0]
if scout.is_idle:
enemy_location = self.enemy_start_locations[0]
move_to = self.random_location_variance(enemy_location)
print(move_to)
await self.do(scout.move(move_to))
else:
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
# UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
# OBSERVER: [3, (255, 255, 255)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# 主基地名稱
main_base_names = ["nexus", "supplydepot", "hatchery"]
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
# 不是主基地建築,畫小一些
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
# 追蹤資源、人口和軍隊人口比
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
if military_weight > 1.0:
military_weight = 1.0
# 農民/人口 worker/supply ratio
cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
# 人口/200 plausible supply (supply/200.0)
cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
# (人口-現有人口)/人口 population ratio (supply_left/supply)
cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
# 氣體/1500 gas/1500
cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
# 晶體礦/1500 minerals minerals/1500
cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
if self.use_model:
cv2.imshow('Model Intel', resized)
cv2.waitKey(1)
else:
cv2.imshow('Random Intel', resized)
cv2.waitKey(1)
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += ((random.randrange(-20, 20)) / 100) * self.game_info.map_size[0]
y += ((random.randrange(-20, 20)) / 100) * self.game_info.map_size[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x, y)))
return go_to
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.timeMinutes / 2 and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
# print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造機械台
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < self.timeMinutes:
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape(-1, 176, 200, 3)])
choice = np.argmax(prediction[0])
choice_dict = {0: "No Attack!",
1: "Attack close to our nexus!",
2: "Attack Enemy Structure!",
3: "Attack Enemy Start!"}
print("Choice #{}:{}".format(choice, choice_dict[choice]))
else:
choice = random.randrange(0, 4)
target = False
if self.timeMinutes > self.do_something_after:
if choice == 0:
# 什么都不做
wait = random.randrange(7, 100) / 100
self.do_something_after = self.timeMinutes + wait
elif choice == 1:
# 攻擊離星靈樞紐最近的單位
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
elif choice == 2:
# 攻擊敵方建築
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
elif choice == 3:
# 攻擊敵方出生位置(換家)
target = self.enemy_start_locations[0]
if target:
for vr in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(vr.attack(target))
y = np.zeros(4)
y[choice] = 1
print(y)
self.train_data.append([y, self.flipped])
## 啟動游戲
# for i in range(50):
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=True)),
Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
偵察優化
偵察需要優化的三個點:
1.在ob建造前,使用農民去偵察
2.使用多個偵察單位
3.在待開礦的區域放置偵察單位
保存當前偵察位置
def __init__(self, use_model=False):
...
self.scouts_and_spots = {}
偵察敵方出生點位附近位置,即敵方開礦、擴張的位置
async def scout(self):
# {DISTANCE_TO_ENEMY_START:EXPANSIONLOC}
self.expand_dis_dir = {}
expand_dis_dir字典中key是到出生點位的距離,value是實際位置
for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el
對expand_dis_dir的key排序
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
我們的偵察單位會經常被殺,所以我們要時刻檢查偵察單位是否存在
existing_ids = [unit.tag for unit in self.units]
# 刪除已經死亡的偵察單位
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout in existing_ids:
to_be_removed.append(noted_scout)
for scout in to_be_removed:
del self.scouts_and_spots[scout]
緊隨其后,加上如下判斷。如果沒有ob,使用農民偵察
if len(self.units(ROBOTICSFACILITY).ready) == 0:
unit_type = PROBE
unit_limit = 1
else:
unit_type = OBSERVER
unit_limit = 15
只需要一個農民偵察,其余采礦、采氣
assign_scout = True
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False
分配偵察任務
至少有一個單位處在空閑狀態,然后根據unit_limit迭代該單位類型,再檢查unit's tag是否在我們self.scouts_and_spots字典中,最終遍歷我們已經排序好的ordered_exp_distances字典。
對於每一個距離,我們查詢location是否我們正在偵察的位置,如果不是,我們分配偵察單位偵察任務。如果偵察任務已指派,continue,未指派則break
if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]
if location not in active_locations:
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
continue
await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass
防止去偵察的農民去采礦
for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(UnitTypeId.PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
random_location_variance方法修改
def random_location_variance(self, location):
x = location[0]
y = location[1]
# FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)
if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size[0]:
print("x above")
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
print("y above")
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x,y)))
return go_to
添加build_scout方法
async def build_scout(self):
if len(self.units(OBSERVER)) < math.floor(self.time/3):
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(OBSERVER)), self.time/3)
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))
在on_step方法加上await self.build_scout()
完整代碼如下
# -*- encoding: utf-8 -*-
'''
@File : demo.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/11/3 12:32 Jonas None
'''
import math
import os
import time
import sc2
from sc2 import run_game, maps, Race, Difficulty, position, Result
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2
import keras
HEADLESS = False
# os.environ["SC2PATH"] = 'F:\StarCraft II'
class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False):
# 經過計算,每分鍾大約165迭代次數
# self.ITERATIONS_PER_MINUTE = 165
# 最大農民數量
self.MAX_WORKERS = 50
self.do_something_after = 0
self.train_data = []
self.use_model = use_model
self.scouts_and_spots = {}
if self.use_model:
print("use model")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
# 存儲.npy訓練數據
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("log.txt", "a") as f:
if self.use_model:
f.write("Model {}\n".format(game_result))
else:
f.write("Random {}\n".format(game_result))
async def on_step(self, iteration):
# self.iteration = iteration
self.timeMinutes = ((self.state.game_loop / 22.4) / 60)
# print('Time:', self.timeMinutes)
await self.build_scout()
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()
async def build_scout(self):
if len(self.units(UnitTypeId.OBSERVER)) < math.floor(self.time / 3):
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(UnitTypeId.OBSERVER)), self.time / 3)
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
## 偵察
async def scout(self):
# {DISTANCE_TO_ENEMY_START:EXPANSIONLOC}
self.expand_dis_dir = {}
for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
self.expand_dis_dir[distance_to_enemy_start] = el
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
existing_ids = [unit.tag for unit in self.units]
# 刪除已經死亡的偵察單位
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout in existing_ids:
to_be_removed.append(noted_scout)
for scout in to_be_removed:
del self.scouts_and_spots[scout]
if len(self.units(UnitTypeId.ROBOTICSFACILITY).ready) == 0:
unit_type = UnitTypeId.PROBE
unit_limit = 1
else:
unit_type = UnitTypeId.OBSERVER
unit_limit = 15
assign_scout = True
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False
# 分配偵察任務
if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spouts[k] for k in self.scouts_and_spots]
if location not in active_locations:
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spouts:
continue
await self.do(obs.move(location))
self.scouts_and_spouts[obs.tag] = location
break
except Exception as e:
pass
# 防止去偵察的農民采礦
for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(UnitTypeId.PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
# UnitTypeId作為key,半徑和顏色是value
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
# OBSERVER: [3, (255, 255, 255)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# 主基地名稱
main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
# 記錄敵方基地位置
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
# 不是主基地建築,畫小一些
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe", "scv", "drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
# 追蹤資源、人口和軍隊人口比
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
if military_weight > 1.0:
military_weight = 1.0
# 農民/人口 worker/supply ratio
cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
# 人口/200 plausible supply (supply/200.0)
cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
# (人口-現有人口)/人口 population ratio (supply_left/supply)
cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
# 氣體/1500 gas/1500
cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
# 晶體礦/1500 minerals minerals/1500
cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
if self.use_model:
cv2.imshow('Model Intel', resized)
cv2.waitKey(1)
else:
cv2.imshow('Random Intel', resized)
cv2.waitKey(1)
def random_location_variance(self, enemy_start_location):
x = enemy_start_location[0]
y = enemy_start_location[1]
x += random.randrange(-5, 5)
y += random.randrange(-5, 5)
if x < 0:
x = 0
if y < 0:
y = 0
if x > self.game_info.map_size[0]:
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x, y)))
return go_to
# 建造農民
async def build_workers(self):
# 星靈樞鈕*16(一個基地配備16個農民)大於農民數量並且現有農民數量小於MAX_WORKERS
if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
# 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於占用資源
for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
# 是否有50晶體礦建造農民
if self.can_afford(UnitTypeId.PROBE):
await self.do(nexus.train(UnitTypeId.PROBE))
## 建造水晶
async def build_pylons(self):
## 供應人口和現有人口之差小於5且建築不是正在建造
if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=nexuses.first)
## 建造吸收廠
async def build_assimilators(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
# 在瓦斯泉上建造吸收廠
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
## 開礦
async def expand(self):
# (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
if self.units(UnitTypeId.NEXUS).amount < self.timeMinutes / 2 and self.can_afford(
UnitTypeId.NEXUS):
await self.expand_now()
## 建造進攻性建築
async def offensive_force_buildings(self):
# print(self.iteration / self.ITERATIONS_PER_MINUTE)
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
# 根據神族建築科技圖,折躍門建造過后才可以建造控制核心
if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
# 否則建造折躍門
# (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
# elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
elif len(self.units(UnitTypeId.GATEWAY)) < 1:
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
# 控制核心存在的情況下建造機械台
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
# 控制核心存在的情況下建造星門
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if len(self.units(UnitTypeId.STARGATE)) < self.timeMinutes:
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
## 造兵
async def build_offensive_force(self):
for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(UnitTypeId.VOIDRAY))
## 尋找目標
def find_target(self, state):
if len(self.known_enemy_units) > 0:
# 隨機選取敵方單位
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_units) > 0:
# 隨機選取敵方建築
return random.choice(self.known_enemy_structures)
else:
# 返回敵方出生點位
return self.enemy_start_locations[0]
## 進攻
async def attack(self):
if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape(-1, 176, 200, 3)])
choice = np.argmax(prediction[0])
choice_dict = {0: "No Attack!",
1: "Attack close to our nexus!",
2: "Attack Enemy Structure!",
3: "Attack Enemy Start!"}
print("Choice #{}:{}".format(choice, choice_dict[choice]))
else:
choice = random.randrange(0, 4)
target = False
if self.timeMinutes > self.do_something_after:
if choice == 0:
# 什么都不做
wait = random.randrange(7, 100) / 100
self.do_something_after = self.timeMinutes + wait
elif choice == 1:
# 攻擊離星靈樞紐最近的單位
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
elif choice == 2:
# 攻擊敵方建築
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
elif choice == 3:
# 攻擊敵方出生位置(換家)
target = self.enemy_start_locations[0]
if target:
for vr in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(vr.attack(target))
y = np.zeros(4)
y[choice] = 1
print(y)
self.train_data.append([y, self.flipped])
## 啟動游戲
# for i in range(50):
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=True)),
Computer(Race.Terran, Difficulty.Medium)
], realtime=False)
運行結果如下,可以看到,農民到敵方基地偵察了。
增加選擇
choice的字典如下:
self.choices = {0: self.build_scout,
1: self.build_zealot,
2: self.build_gateway,
3: self.build_voidray,
4: self.build_stalker,
5: self.build_worker,
6: self.build_assimilator,
7: self.build_stargate,
8: self.build_pylon,
9: self.defend_nexus,
10: self.attack_known_enemy_unit,
11: self.attack_known_enemy_structure,
12: self.expand,
13: self.do_nothing,
}
on_step方法中刪除多余的調用方法
async def on_step(self, iteration):
self.timeMinutes = (self.state.game_loop/22.4) / 60
#print('Time:',self.time)
await self.distribute_workers()
await self.scout()
await self.intel()
await self.do_something()
創建do_something方法
async def do_something(self):
if self.time > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
choice = random.randrange(0, 14)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
實現on_step中的方法
訓練偵察單位
async def build_scout(self):
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(UnitTypeId.OBSERVER)), self.timeMinutes/3)
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
break
訓練狂熱者
async def build_zealot(self):
gateways = self.units(UnitTypeId.GATEWAY).ready
if gateways.exists:
if self.can_afford(UnitTypeId.ZEALOT):
await self.do(random.choice(gateways).train(UnitTypeId.ZEALOT))
建造折躍門
async def build_gateway(self):
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
訓練虛空戰艦
async def build_voidray(self):
stargates = self.units(UnitTypeId.STARGATE).ready
if stargates.exists:
if self.can_afford(UnitTypeId.VOIDRAY):
await self.do(random.choice(stargates).train(UnitTypeId.VOIDRAY))
訓練追獵者
async def build_stalker(self):
pylon = self.units(UnitTypeId.PYLON).ready.random
gateways = self.units(UnitTypeId.GATEWAY).ready
cybernetics_cores = self.units(UnitTypeId.CYBERNETICSCORE).ready
if gateways.exists and cybernetics_cores.exists:
if self.can_afford(UnitTypeId.STALKER):
await self.do(random.choice(gateways).train(UnitTypeId.STALKER))
if not cybernetics_cores.exists:
if self.units(UnitTypeId.GATEWAY).ready.exists:
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
訓練農民
async def build_worker(self):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PROBE):
await self.do(random.choice(nexuses).train(UnitTypeId.PROBE))
建造瓦斯氣泉
async def build_assimilator(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
建造星門
async def build_stargate(self):
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
建造水晶塔
async def build_pylon(self):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=self.units(UnitTypeId.NEXUS).first.position.towards(self.game_info.map_center, 5))
擴張、開礦
async def expand(self):
try:
if self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))
四種進攻選擇
async def do_nothing(self):
wait = random.randrange(7, 100)/100
self.do_something_after = self.timeMinutes + wait
async def defend_nexus(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_structure(self):
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_unit(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
完整代碼如下
# -*- encoding: utf-8 -*-
'''
@File : demo2.py
@Modify Time @Author @Desciption
------------ ------- -----------
2019/12/13 10:58 Jonas None
'''
import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import *
import random
import cv2
import numpy as np
import os
import time
import math
import keras
HEADLESS = False
class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False, title=1):
self.MAX_WORKERS = 50
self.do_something_after = 0
self.use_model = use_model
self.title = title
self.scouts_and_spots = {}
# ADDED THE CHOICES #
self.choices = {0: self.build_scout,
1: self.build_zealot,
2: self.build_gateway,
3: self.build_voidray,
4: self.build_stalker,
5: self.build_worker,
6: self.build_assimilator,
7: self.build_stargate,
8: self.build_pylon,
9: self.defend_nexus,
10: self.attack_known_enemy_unit,
11: self.attack_known_enemy_structure,
12: self.expand,
13: self.do_nothing,
}
self.train_data = []
if self.use_model:
print("USING MODEL!")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("gameout-random-vs-medium.txt","a") as f:
if self.use_model:
f.write("Model {} - {}\n".format(game_result, int(time.time())))
else:
f.write("Random {} - {}\n".format(game_result, int(time.time())))
async def on_step(self, iteration):
self.timeMinutes = (self.state.game_loop/22.4) / 60
#print('Time:',self.time)
await self.distribute_workers()
await self.scout()
await self.intel()
await self.do_something()
def random_location_variance(self, location):
x = location[0]
y = location[1]
# FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)
if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size[0]:
print("x above")
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
print("y above")
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x,y)))
return go_to
async def scout(self):
self.expand_dis_dir = {}
for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
existing_ids = [unit.tag for unit in self.units]
# removing of scouts that are actually dead now.
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout not in existing_ids:
to_be_removed.append(noted_scout)
for scout in to_be_removed:
del self.scouts_and_spots[scout]
if len(self.units(UnitTypeId.ROBOTICSFACILITY).ready) == 0:
unit_type = UnitTypeId.PROBE
unit_limit = 1
else:
unit_type = UnitTypeId.OBSERVER
unit_limit = 15
assign_scout = True
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False
if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]
if location not in active_locations:
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spots:
continue
await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass
for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(UnitTypeId.PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
draw_dict = {
UnitTypeId.NEXUS: [15, (0, 255, 0)],
UnitTypeId.PYLON: [3, (20, 235, 0)],
UnitTypeId.PROBE: [1, (55, 200, 0)],
UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
UnitTypeId.GATEWAY: [3, (200, 100, 0)],
UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
UnitTypeId.STARGATE: [5, (255, 0, 0)],
UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],
#VOIDRAY: [3, (255, 100, 0)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# from Александр Тимофеев via YT
main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe",
"scv",
"drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(UnitTypeId.OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
for vr in self.units(UnitTypeId.VOIDRAY).ready:
pos = vr.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (255, 100, 0), -1)
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
worker_weight = len(self.units(UnitTypeId.PROBE)) / (self.supply_cap-self.supply_left)
if worker_weight > 1.0:
worker_weight = 1.0
cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3) # worker/supply ratio
cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3) # plausible supply (supply/200.0)
cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3) # population ratio (supply_left/supply)
cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3) # gas / 1500
cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3) # minerals minerals/1500
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
def find_target(self, state):
if len(self.known_enemy_units) > 0:
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_structures) > 0:
return random.choice(self.known_enemy_structures)
else:
return self.enemy_start_locations[0]
async def build_scout(self):
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(UnitTypeId.OBSERVER)), self.timeMinutes/3)
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
break
async def build_worker(self):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PROBE):
await self.do(random.choice(nexuses).train(UnitTypeId.PROBE))
async def build_zealot(self):
gateways = self.units(UnitTypeId.GATEWAY).ready
if gateways.exists:
if self.can_afford(UnitTypeId.ZEALOT):
await self.do(random.choice(gateways).train(UnitTypeId.ZEALOT))
async def build_gateway(self):
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon)
async def build_voidray(self):
stargates = self.units(UnitTypeId.STARGATE).ready
if stargates.exists:
if self.can_afford(UnitTypeId.VOIDRAY):
await self.do(random.choice(stargates).train(UnitTypeId.VOIDRAY))
async def build_stalker(self):
pylon = self.units(UnitTypeId.PYLON).ready.random
gateways = self.units(UnitTypeId.GATEWAY).ready
cybernetics_cores = self.units(UnitTypeId.CYBERNETICSCORE).ready
if gateways.exists and cybernetics_cores.exists:
if self.can_afford(UnitTypeId.STALKER):
await self.do(random.choice(gateways).train(UnitTypeId.STALKER))
if not cybernetics_cores.exists:
if self.units(UnitTypeId.GATEWAY).ready.exists:
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
async def build_assimilator(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
async def build_stargate(self):
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon)
async def build_pylon(self):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=self.units(UnitTypeId.NEXUS).first.position.towards(self.game_info.map_center, 5))
async def expand(self):
try:
if self.can_afford(UnitTypeId.NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))
async def do_nothing(self):
wait = random.randrange(7, 100)/100
self.do_something_after = self.timeMinutes + wait
async def defend_nexus(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_structure(self):
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_unit(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def do_something(self):
if self.timeMinutes > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
choice = random.randrange(0, 14)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
###### NEW CHOICE HANDLING HERE #########
###### NEW CHOICE HANDLING HERE #########
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
if True:
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=False, title=1)),
Computer(Race.Terran, Difficulty.Medium),
], realtime=False)
運行結果如下
統計圖優化
將彩色改成黑白,突出顯示單位
建立三階矩陣
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
迭代當前單位,繪制圓圈,我方單位白色
for unit in self.units().ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (255, 255, 255), math.ceil(int(unit.radius*0.5)))
迭代敵人,繪制圓圈,敵方單位灰色,
for unit in self.known_enemy_units:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (125, 125, 125), math.ceil(int(unit.radius*0.5)))
繪制統計資源的直線
try:
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
worker_weight = len(self.units(PROBE)) / (self.supply_cap-self.supply_left)
if worker_weight > 1.0:
worker_weight = 1.0
cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3) # worker/supply ratio
cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3) # plausible supply (supply/200.0)
cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3) # population ratio (supply_left/supply)
cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3) # gas / 1500
cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3) # minerals minerals/1500
except Exception as e:
print(str(e))
resize和flipp
# flip horizontally to make our final fix in visual representation:
grayed = cv2.cvtColor(game_data, cv2.COLOR_BGR2GRAY)
self.flipped = cv2.flip(grayed, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
在do_something方法可以手動設置單位建造的權重
async def do_something(self):
if self.time > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
worker_weight = 8
zealot_weight = 3
voidray_weight = 20
stalker_weight = 8
pylon_weight = 5
stargate_weight = 5
gateway_weight = 3
choice_weights = 1*[0]+zealot_weight*[1]+gateway_weight*[2]+voidray_weight*[3]+stalker_weight*[4]+worker_weight*[5]+1*[6]+stargate_weight*[7]+pylon_weight*[8]+1*[9]+1*[10]+1*[11]+1*[12]+1*[13]
choice = random.choice(choice_weights)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
至此,完整代碼如下:
import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import *
import random
import cv2
import numpy as np
import os
import time
import math
import keras
# os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/'
HEADLESS = True
class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False, title=1):
self.MAX_WORKERS = 50
self.do_something_after = 0
self.use_model = use_model
self.title = title
# DICT {UNIT_ID:LOCATION}
# every iteration, make sure that unit id still exists!
self.scouts_and_spots = {}
# ADDED THE CHOICES #
self.choices = {0: self.build_scout,
1: self.build_zealot,
2: self.build_gateway,
3: self.build_voidray,
4: self.build_stalker,
5: self.build_worker,
6: self.build_assimilator,
7: self.build_stargate,
8: self.build_pylon,
9: self.defend_nexus,
10: self.attack_known_enemy_unit,
11: self.attack_known_enemy_structure,
12: self.expand, # might just be self.expand_now() lol
13: self.do_nothing,
}
self.train_data = []
if self.use_model:
print("USING MODEL!")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
#if self.timeMinutes < 17:
if game_result == Result.Victory:
np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))
async def on_step(self, iteration):
self.timeMinutes = (self.state.game_loop/22.4) / 60
print('Time:',self.timeMinutes)
if iteration % 5 == 0:
await self.distribute_workers()
await self.scout()
await self.intel()
await self.do_something()
def random_location_variance(self, location):
x = location[0]
y = location[1]
# FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)
if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size[0]:
print("x above")
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
print("y above")
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x,y)))
return go_to
async def scout(self):
self.expand_dis_dir = {}
for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
existing_ids = [unit.tag for unit in self.units]
# removing of scouts that are actually dead now.
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout not in existing_ids:
to_be_removed.append(noted_scout)
for scout in to_be_removed:
del self.scouts_and_spots[scout]
if len(self.units(UnitTypeId.ROBOTICSFACILITY).ready) == 0:
unit_type = UnitTypeId.PROBE
unit_limit = 1
else:
unit_type = UnitTypeId.OBSERVER
unit_limit = 15
assign_scout = True
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False
if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]
if location not in active_locations:
if unit_type == UnitTypeId.PROBE:
for unit in self.units(UnitTypeId.PROBE):
if unit.tag in self.scouts_and_spots:
continue
await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass
for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(UnitTypeId.PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
for unit in self.units().ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (255, 255, 255), math.ceil(int(unit.radius*0.5)))
for unit in self.known_enemy_units:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (125, 125, 125), math.ceil(int(unit.radius*0.5)))
try:
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
worker_weight = len(self.units(UnitTypeId.PROBE)) / (self.supply_cap-self.supply_left)
if worker_weight > 1.0:
worker_weight = 1.0
cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3) # worker/supply ratio
cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3) # plausible supply (supply/200.0)
cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3) # population ratio (supply_left/supply)
cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3) # gas / 1500
cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3) # minerals minerals/1500
except Exception as e:
print(str(e))
# flip horizontally to make our final fix in visual representation:
grayed = cv2.cvtColor(game_data, cv2.COLOR_BGR2GRAY)
self.flipped = cv2.flip(grayed, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
if self.use_model:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
else:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
def find_target(self, state):
if len(self.known_enemy_units) > 0:
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_structures) > 0:
return random.choice(self.known_enemy_structures)
else:
return self.enemy_start_locations[0]
async def build_scout(self):
for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(UnitTypeId.OBSERVER)), self.timeMinutes/3)
if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
await self.do(rf.train(UnitTypeId.OBSERVER))
break
if len(self.units(UnitTypeId.ROBOTICSFACILITY)) == 0:
pylon = self.units(UnitTypeId.PYLON).ready.noqueue.random
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(UnitTypeId.ROBOTICSFACILITY):
await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)
async def build_worker(self):
nexuses = self.units(UnitTypeId.NEXUS).ready.noqueue
if nexuses.exists:
if self.can_afford(UnitTypeId.PROBE):
await self.do(random.choice(nexuses).train(UnitTypeId.PROBE))
async def build_zealot(self):
#if len(self.units(ZEALOT)) < (8 - self.timeMinutes): # how we can phase out zealots over time?
gateways = self.units(UnitTypeId.GATEWAY).ready.noqueue
if gateways.exists:
if self.can_afford(UnitTypeId.ZEALOT):
await self.do(random.choice(gateways).train(UnitTypeId.ZEALOT))
async def build_gateway(self):
#if len(self.units(GATEWAY)) < 5:
pylon = self.units(UnitTypeId.PYLON).ready.noqueue.random
if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
await self.build(UnitTypeId.GATEWAY, near=pylon.position.towards(self.game_info.map_center, 5))
async def build_voidray(self):
stargates = self.units(UnitTypeId.STARGATE).ready.noqueue
if stargates.exists:
if self.can_afford(UnitTypeId.VOIDRAY):
await self.do(random.choice(stargates).train(UnitTypeId.VOIDRAY))
async def build_stalker(self):
pylon = self.units(UnitTypeId.PYLON).ready.noqueue.random
gateways = self.units(UnitTypeId.GATEWAY).ready
cybernetics_cores = self.units(UnitTypeId.CYBERNETICSCORE).ready
if gateways.exists and cybernetics_cores.exists:
if self.can_afford(UnitTypeId.STALKER):
await self.do(random.choice(gateways).train(UnitTypeId.STALKER))
if not cybernetics_cores.exists:
if self.units(UnitTypeId.GATEWAY).ready.exists:
if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon.position.towards(self.game_info.map_center, 5))
async def build_assimilator(self):
for nexus in self.units(UnitTypeId.NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(UnitTypeId.ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))
async def build_stargate(self):
if self.units(UnitTypeId.PYLON).ready.exists:
pylon = self.units(UnitTypeId.PYLON).ready.random
if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
await self.build(UnitTypeId.STARGATE, near=pylon.position.towards(self.game_info.map_center, 5))
async def build_pylon(self):
nexuses = self.units(UnitTypeId.NEXUS).ready
if nexuses.exists:
if self.can_afford(UnitTypeId.PYLON) and not self.already_pending(UnitTypeId.PYLON):
await self.build(UnitTypeId.PYLON, near=self.units(UnitTypeId.NEXUS).first.position.towards(self.game_info.map_center, 5))
async def expand(self):
try:
if self.can_afford(UnitTypeId.NEXUS) and len(self.units(UnitTypeId.NEXUS)) < 3:
await self.expand_now()
except Exception as e:
print(str(e))
async def do_nothing(self):
wait = random.randrange(7, 100)/100
self.do_something_after = self.timeMinutes + wait
async def defend_nexus(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_structure(self):
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_unit(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))
for u in self.units(UnitTypeId.VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.STALKER).idle:
await self.do(u.attack(target))
for u in self.units(UnitTypeId.ZEALOT).idle:
await self.do(u.attack(target))
async def do_something(self):
if self.timeMinutes > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
worker_weight = 8
zealot_weight = 3
voidray_weight = 20
stalker_weight = 8
pylon_weight = 5
stargate_weight = 5
gateway_weight = 3
choice_weights = 1*[0]+zealot_weight*[1]+gateway_weight*[2]+voidray_weight*[3]+stalker_weight*[4]+worker_weight*[5]+1*[6]+stargate_weight*[7]+pylon_weight*[8]+1*[9]+1*[10]+1*[11]+1*[12]+1*[13]
choice = random.choice(choice_weights)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
while True:
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot()),
#Bot(Race.Protoss, SentdeBot()),
Computer(Race.Protoss, Difficulty.Easy)
], realtime=False)
運行結果如下