基於樹莓派+傳感器+阿里雲IoT的智能家居管理(代碼實現)


視頻教程已經放在B站
請大家狠狠地三連我
雖然我沒有稚暉君那么強

[video(video-1y2sFBXw-1623142522346)(type-bilibili)(url-https://player.bilibili.com/player.html?aid=716086471)(image-https://ss.csdn.net/p?http://i0.hdslb.com/bfs/archive/c292aeda52dff181453ea0f9da809fd5d38fece8.jpg)(title-教程!基於樹莓派+傳感器+阿里雲IoT的智能家居管理(2))]

主文件


#!/usr/bin/python3

import aliLink,mqttd,rpi
import time,json
import Adafruit_DHT
import time
import LCD1602
import flame_sensor
import buzzer_1
import rain_detector
import gas_sensor
import relay
from threading import Thread

pin = 19  # DHT11 溫濕度傳感器管腳定義
Buzzer = 20    # 有源蜂鳴器管腳定義

# GPIO口定義
sensor = Adafruit_DHT.DHT11




# 三元素(iot后台獲取)
ProductKey = 'a11lzCDSgZP'
DeviceName = 'IU6aSETyiImFPSkpcywm'
DeviceSecret = "2551eb5f630c372743c538e9b87bfe6d"
# topic (iot后台獲取)
POST = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post'  # 上報消息到雲
POST_REPLY = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post_reply'
SET = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/service/property/set'  # 訂閱雲端指令


#窗戶開關
window = 0
window_status = 0
Thread(target=relay.close).start()

# 消息回調(雲端下發消息的回調函數)
def on_message(client, userdata, msg):
    #print(msg.payload)
    Msg = json.loads(msg.payload)

    global window,window_status
    window = Msg['params']['window']
    print(msg.payload)  # 開關值
    if window_status != window:
        window_status = window
        if window == 1:
            Thread(target=relay.open).start()
        else:
            Thread(target=relay.close).start()



#連接回調(與阿里雲建立鏈接后的回調函數)
def on_connect(client, userdata, flags, rc):
    pass



# 鏈接信息
Server,ClientId,userNmae,Password = aliLink.linkiot(DeviceName,ProductKey,DeviceSecret)

# mqtt鏈接
mqtt = mqttd.MQTT(Server,ClientId,userNmae,Password)
mqtt.subscribe(SET) # 訂閱服務器下發消息topic
mqtt.begin(on_message,on_connect)




# 信息獲取上報,每2秒鍾上報一次系統參數
while True:
    #獲取指示燈狀態
    power_stats=int(rpi.getLed())
    if(power_stats == 0):
        power_LED = 0
    else:
        power_LED = 1

    # CPU 信息
    CPU_temp = float(rpi.getCPUtemperature())  # 溫度   ℃
    CPU_usage = float(rpi.getCPUuse())         # 占用率 %

    # RAM 信息
    RAM_stats =rpi.getRAMinfo()
    RAM_total =round(int(RAM_stats[0]) /1000,1)    #
    RAM_used =round(int(RAM_stats[1]) /1000,1)
    RAM_free =round(int(RAM_stats[2]) /1000,1)

    # Disk 信息
    DISK_stats =rpi.getDiskSpace()
    DISK_total = float(DISK_stats[0][:-1])
    DISK_used = float(DISK_stats[1][:-1])
    DISK_perc = float(DISK_stats[3][:-1])

    #溫度,濕度
    humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)

    # LCD顯示
    LCD = 0
    try:
        LCD1602.init(0x27, 1)          # 初始化顯示屏
        LCD1602.write(0, 0, 'humidity:    ' + str(int(humidity)) + '%')
        LCD1602.write(0, 1, 'temperature: ' + str(int(temperature)) + '\'')  # 在第二行顯示world!
        LCD = 1
    except:
        print("顯示屏連接不穩定,請檢查")
        LCD = 0

    # 蜂鳴器
    buzzer = 0

    # 火焰傳感器
    flame_sensor.setup()
    if flame_sensor.fire() == 0:
        flame = 1
        buzzer_1.buzzer_on() #讓鈴聲叫
        buzzer = 1
    else:
        flame = 0
        buzzer_1.buzzer_off() #鈴聲不叫
        buzzer = 0

    # 煙霧傳感器
    gas = gas_sensor.gas()

    # 雨滴傳感器
    rain_detector.setup()
    if rain_detector.rain() == 0:
        rain = 1
    else:
        rain = 0

    # 構建與雲端模型一致的消息結構
    updateMsn = {
        'cpu_temperature':CPU_temp,
        'cpu_usage':CPU_usage,
        'RAM_total':RAM_total,
        'RAM_used':RAM_used,
        'RAM_free':RAM_free,
        'DISK_total':DISK_total,
        'DISK_used_space':DISK_used,
        'DISK_used_percentage':DISK_perc,
        'PowerLed':power_LED,
        'temperature':temperature,
        'humidity':humidity,
        'window':window,
        'LCD':LCD,
        'buzzer':buzzer,
        'flame':flame,
        'rain':rain,
        'gas':gas
    }
    JsonUpdataMsn = aliLink.Alink(updateMsn)
    print(JsonUpdataMsn)

    mqtt.push(POST,JsonUpdataMsn) # 定時向阿里雲IOT推送我們構建好的Alink協議數據

    time.sleep(3)

rpi

# 樹莓派數據與控制

import os
# Return CPU temperature as a character string                                     

def getCPUtemperature():
    res =os.popen('vcgencmd measure_temp').readline()
    return(res.replace("temp=","").replace("'C\n",""))
 
# Return RAM information (unit=kb) in a list                                      
# Index 0: total RAM                                                              
# Index 1: used RAM                                                                
# Index 2: free RAM                                                                
def getRAMinfo():
    p =os.popen('free')
    i =0
    while 1:
        i =i +1
        line =p.readline()
        if i==2:
            return(line.split()[1:4])
 
# Return % of CPU used by user as a character string                               
def getCPUuse():
    data = os.popen("top -n1 | awk '/Cpu\(s\):/ {print $2}'").readline().strip()
    return(data)
 
# Return information about disk space as a list (unit included)                    
# Index 0: total disk space                                                        
# Index 1: used disk space                                                        
# Index 2: remaining disk space                                                    
# Index 3: percentage of disk used                                                 
def getDiskSpace():
    p =os.popen("df -h /")
    i =0
    while True:
        i =i +1
        line =p.readline()
        if i==2:
            return(line.split()[1:5])
def  powerLed(swatch):
    led = open('/sys/class/leds/led1/brightness', 'w', 1)
    led.write(str(swatch))
    led.close()

# LED燈狀態檢測
def getLed():
	led = open('/sys/class/leds/led1/brightness', 'r', 1)
	state=led.read()
	led.close()
	return state
    
if __name__ == "__main__":
 
    # CPU informatiom
    CPU_temp =getCPUtemperature()
    CPU_usage =getCPUuse()
    print(CPU_usage)
    # RAM information
    # Output is in kb, here I convert it in Mb for readability
    RAM_stats =getRAMinfo()

    RAM_total = round(int(RAM_stats[0]) /1000,1)
    RAM_used = round(int(RAM_stats[1]) /1000,1)
    RAM_free = round(int(RAM_stats[2]) /1000,1)
    print(RAM_total,RAM_used,RAM_free)
    # Disk information
    DISK_stats =getDiskSpace()

    DISK_total = DISK_stats[0][:-1]
    DISK_used = DISK_stats[1][:-1]
    DISK_perc = DISK_stats[3][:-1]
    print(DISK_total,DISK_used,DISK_perc)

繼電器

import RPi.GPIO as GPIO
import time
import asyncio

GPIO.setmode(GPIO.BCM)              # 管腳映射,采用BCM編碼
GPIO.setwarnings(False)             # 忽略GPIO 警告

CH1 = 17
CH2 = 16 #繼電器輸入信號管腳


GPIO.setup(CH1,GPIO.OUT)
GPIO.setup(CH2,GPIO.OUT)

def open():
    GPIO.output(CH1, GPIO.LOW)
    time.sleep(10)
    GPIO.output(CH1, GPIO.HIGH)

def close():
    GPIO.output(CH2, GPIO.LOW)
    time.sleep(10)
    GPIO.output(CH2, GPIO.HIGH)

阿里雲連接

import time,json,random
import hmac,hashlib

def linkiot(DeviceName,ProductKey,DeviceSecret,server = 'iot-as-mqtt.cn-shanghai.aliyuncs.com'):
    serverUrl = server
    ClientIdSuffix = "|securemode=3,signmethod=hmacsha256,timestamp="

    # 拼合
    Times = str(int(time.time()))  # 獲取登錄時間戳
    Server = ProductKey+'.'+serverUrl    # 服務器地址
    ClientId = DeviceName + ClientIdSuffix + Times +'|'  # ClientId
    userNmae = DeviceName + "&" + ProductKey
    PasswdClear = "clientId" + DeviceName + "deviceName" + DeviceName +"productKey"+ProductKey + "timestamp" + Times  # 明文密碼

    # 加密
    h = hmac.new(bytes(DeviceSecret,encoding= 'UTF-8'),digestmod=hashlib.sha256)  # 使用密鑰
    h.update(bytes(PasswdClear,encoding = 'UTF-8'))
    Passwd = h.hexdigest()
    return Server,ClientId,userNmae,Passwd

# 阿里Alink協議實現(字典傳入,json str返回)
def Alink(params):
    AlinkJson = {}
    AlinkJson["id"] = random.randint(0,999999)
    AlinkJson["version"] = "1.0"
    AlinkJson["params"] = params
    AlinkJson["method"] = "thing.event.property.post"
    return json.dumps(AlinkJson)

if __name__ == "__main__":
    pass

蜂鳴器

import RPi.GPIO as GPIO
import time

BuzzerPin = 20    # 有源蜂鳴器管腳定義

# GPIO設置函數
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)                       # 關閉GPIO警告提示
GPIO.setup(BuzzerPin, GPIO.OUT)     # 設置有源蜂鳴器管腳為輸出模式
GPIO.output(BuzzerPin, GPIO.HIGH)   # 蜂鳴器設置為高電平,關閉蜂鳥器

#  打開蜂鳴器
def buzzer_on():
	GPIO.output(BuzzerPin, GPIO.LOW)  # 蜂鳴器為低電平觸發,所以使能蜂鳴器讓其發聲
# 關閉蜂鳴器
def buzzer_off():
	GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鳴器設置為高電平,關閉蜂鳥器

# 控制蜂鳴器鳴叫
def beep(x):
	buzzer_on()     # 打開蜂鳴器控制
	time.sleep(x)            # 延時時間
	buzzer_off()    # 關閉蜂鳴器控制
	time.sleep(x)            # 延時時間

# 循環函數
def loop():
	while True:
		beep(1) # 控制蜂鳴器鳴叫,延時時間為500mm

def destroy():
	GPIO.output(BuzzerPin, GPIO.HIGH) # 關閉蜂鳴器鳴叫
	GPIO.cleanup()                     # 釋放資源

# 程序入口
if __name__ == '__main__':
	try:                            # 檢測異常
		loop()                      # 調用循環函數
	except KeyboardInterrupt:  # 當按下Ctrl+C時,將執行destroy()子程序。
		destroy()              # 釋放資源

火焰傳感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 26  # 火焰傳感器數字IO口
GPIO.setmode(GPIO.BCM) # 管腳映射,采用BCM編碼

# 初始化工作
def setup():
	ADC.setup(0x48)    # 設置PCF8591模塊地址
	GPIO.setup(DO, GPIO.IN) # 設置火焰傳感器數字IO口為輸入模式

# 打印信息,打印出火焰傳感器的狀態值
def Print(x):
	if x == 1:      # 安全
		print ('')
		print ('   *******************')
		print ('   *  Makerobo Safe~ *')
		print ('   *******************')
		print ('')
	if x == 0:     # 有火焰
		print ('')
		print ('   ******************')
		print ('   * Makerobo Fire! *')
		print ('   ******************')
		print ('')

# 功能函數
def fire():
    status = 1      # 狀態值
    # 讀取火焰傳感器數字IO口
    return GPIO.input(DO)

# 程序入口
if __name__ == '__main__':
	try:
		setup() # 初始化
		fire()
	except KeyboardInterrupt:
		pass	

煙霧傳感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 18                    # 煙霧傳感器數字IO口
GPIO.setmode(GPIO.BCM)              # 管腳映射,采用BCM編碼
GPIO.setwarnings(False)             # 忽略GPIO 警告

# 初始化工作
def setup():
	ADC.setup(0x48)                      # 設置PCF8591模塊地址
	GPIO.setup	(DO,GPIO.IN)    # 煙霧傳感器數字IO口,設置為輸入模式

# 打印信息,打印出是否檢測到煙霧信息
def Print(x):
	if x == 1:     # 安全
		print ('')
		print ('   ******************')
		print ('   * Makerobo Safe~ *')
		print ('   ******************')
		print ('')
	if x == 0:    # 檢測到煙霧
		print ('')
		print ('   ************************')
		print ('   * Makerobo Danger Gas! *')
		print ('   ************************')
		print ('')

# 循環函數
def gas():
    setup()
    return GPIO.input(DO)  # 讀取GAS煙霧傳感器數字IO口值



# 程序入口
if __name__ == '__main__':
	setup()   # 初始化函數
	loop()    # 循環函數

LCD1602

import time
import smbus

BUS = smbus.SMBus(1)

# IIC LCD1602 液晶模塊寫入字
def write_word(addr, data):
	global BLEN
	temp = data
	if BLEN == 1:
		temp |= 0x08
	else:
		temp &= 0xF7
	BUS.write_byte(addr ,temp) # 設置IIC LCD1602 液晶模塊地址

# IIC LCD1602 發送命令
def  send_command(comm):
	# 首先發送 bit7-4 位
	lcd_buf = comm & 0xF0
	lcd_buf |= 0x04               # RS = 0, RW = 0, EN = 1
	write_word(LCD_ADDR ,lcd_buf)
	time.sleep(0.002)
	lcd_buf &= 0xFB               # Make EN = 0
	write_word(LCD_ADDR ,lcd_buf)

	# 其次發送 bit3-0 位
	lcd_buf = (comm & 0x0F) << 4
	lcd_buf |= 0x04               # RS = 0, RW = 0, EN = 1
	write_word(LCD_ADDR ,lcd_buf)
	time.sleep(0.002)
	lcd_buf &= 0xFB               # Make EN = 0
	write_word(LCD_ADDR ,lcd_buf)

def send_data(data):
	# 首先發送 bit7-4 位
	lcd_buf = data & 0xF0
	lcd_buf |= 0x05               # RS = 1, RW = 0, EN = 1
	write_word(LCD_ADDR ,lcd_buf)
	time.sleep(0.002)
	lcd_buf &= 0xFB               # Make EN = 0
	write_word(LCD_ADDR ,lcd_buf)

	# 其次發送 bit3-0 位
	lcd_buf = (data & 0x0F) << 4
	lcd_buf |= 0x05               # RS = 1, RW = 0, EN = 1
	write_word(LCD_ADDR ,lcd_buf)
	time.sleep(0.002)
	lcd_buf &= 0xFB               # Make EN = 0
	write_word(LCD_ADDR ,lcd_buf)

# IIC LCD1602 初始化
def init(addr, bl):
	global LCD_ADDR
	global BLEN
	LCD_ADDR = addr
	BLEN = bl
	try:
		send_command(0x33) # 必須先初始化到8線模式
		time.sleep(0.005)
		send_command(0x32) # 然后初始化為4行模式
		time.sleep(0.005)
		send_command(0x28) # 2 行 & 5*7 點位
		time.sleep(0.005)
		send_command(0x0C) # 啟用無光標顯示
		time.sleep(0.005)
		send_command(0x01) # 清除顯示
		BUS.write_byte(LCD_ADDR, 0x08)
	except:
		return False
	else:
		return True

# LCD 1602 清空顯示函數
def clear():
	send_command(0x01)  # 清除顯示

# LCD 1602 使能背光顯示
def openlight():
	BUS.write_byte(0x27,0x08)  # 使能背光顯示命令
	BUS.close()                # 關閉總線

# LCD 1602 顯示函數
def write(lcd_x, lcd_y, lcd_str):
	# 選擇行與列
	if lcd_x < 0:
		lcd_x = 0
	if lcd_x > 15:
		lcd_x = 15
	if lcd_y <0:
		lcd_y = 0
	if lcd_y > 1:
		lcd_y = 1

	# 移動光標
	lcd_addr = 0x80 + 0x40 * lcd_y + lcd_x
	send_command(lcd_addr)    # 發送地址

	for chr in lcd_str:                  # 獲取字符串長度
		send_data(ord(chr)) # 發送顯示

# 程序入口
if __name__ == '__main__':
	init(0x27, 1)          # 初始化顯示屏
	write(0, 0, 'Hello')   # 在第一行顯示Hello
	write(0, 1, 'world!')  # 在第二行顯示world!

mqtt

#!/usr/bin/python3

# pip install paho-mqtt
import paho.mqtt.client

# =====初始化======
class MQTT():
    def __init__(self,host,CcientID,username=None,password=None,port=1883,timeOut=60):
        self.Host = host
        self.Port = port
        self.timeOut = timeOut
        self.username =username
        self.password = password
        self.CcientID = CcientID

        self.mqttc = paho.mqtt.client.Client(self.CcientID)    #配置ID
        if self.username is not None:    #判斷用戶名密碼是否為空
            self.mqttc.username_pw_set(self.username, self.password)    #不為空則配置賬號密碼

        self.mqttc.connect(self.Host, self.Port, self.timeOut) #初始化服務器  IP  端口  超時時間


    # 初始化
    def begin(self,message,connect):
        self.mqttc.on_connect = connect
        self.mqttc.on_message = message
        self.mqttc.loop_start()  # 后台新進程循環監聽

# =====發送消息==========
    def push(self,tag,date,_Qos = 0):
        self.mqttc.publish(tag,date,_Qos)
        #print('OK',date)

# =======訂閱tips=====
    def subscribe(self,_tag):
        self.mqttc.subscribe(_tag)   #監聽標簽

PCF8591數模轉化模塊

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 說明:這是一個PCF8591模塊的程序。
#      警告:模擬輸入不能超過3.3V!
# 在這個程序中,我們使用電位計進行模擬輸入和控制一個模擬電壓
# 的LED燈,你可以導入這個程序到另一個程序中使用:
# import PCF8591 as ADC
# ADC.Setup(Address)  # 通過 sudo i2cdetect -y -1 可以獲取到IIC的地址
# ADC.read(channal)	# 通道選擇范圍為0-3
# ADC.write(Value)	# 值的范圍為:0-255
#####################################################
import smbus
import time

# 對應比較舊的版本如RPI V1 版本,則 "bus = smbus.SMBus(0)"
bus = smbus.SMBus(1)

#通過 sudo i2cdetect -y -1 可以獲取到IIC的地址
def setup(Addr):
	global address
	address = Addr

# 讀取模擬量信息
def read(chn): #通道選擇,范圍是0-3之間
	try:
		if chn == 0:
			bus.write_byte(address,0x40)
		if chn == 1:
			bus.write_byte(address,0x41)
		if chn == 2:
			bus.write_byte(address,0x42)
		if chn == 3:
			bus.write_byte(address,0x43)
		bus.read_byte(address) # 開始進行讀取轉換
	except Exception as e:
		print ("Address: %s" % address)
		print (e)
	return bus.read_byte(address)

# 模塊輸出模擬量控制,范圍為0-255
def write(val):
	try:
		temp = val # 將數值賦給temmp 變量
		temp = int(temp) # 將字符串轉換為整型
		# 在終端上打印temp以查看,否則將注釋掉
		bus.write_byte_data(address, 0x40, temp)
	except Exception as e:
		print ("Error: Device address: 0x%2X" % address)
		print (e)

if __name__ == "__main__":
	setup(0x48)
	while True:
		print ('AIN0 = ', read(0))
		print ('AIN1 = ', read(1))
		tmp = read(0)
		tmp = tmp*(255-125)/255+125 # 低於125時LED不會亮,所以請將“0-255”轉換為“125-255”
		write(tmp)
#		time.sleep(0.3)

雨滴傳感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 22       # 雨滴傳感器數字管腳

GPIO.setmode(GPIO.BCM) # 采用BCM管腳給GPIO口

# GPIO口定義
def setup():
	ADC.setup(0x48)      # 設置PCF8591模塊地址
	GPIO.setup(DO, GPIO.IN)  # 設置雨滴傳感器管腳為輸入模式

# 打印出雨滴傳感器提示信息
def Print(x):
	if x == 1:          # 沒有雨滴
		print ('')
		print ('   ************************')
		print ('   * Not raining *')
		print ('   ************************')
		print ('')
	if x == 0:          # 有雨滴
		print ('')
		print ('   **********************')
		print ('   * Raining!! *')
		print ('   **********************')
		print ('')
# 循環函數
def loop():
	status = 1      # 雨滴傳感器狀態
	while True:
		print (ADC.read(2))  # 打印出AIN3的模擬量數值
		
		tmp = GPIO.input(DO)      # 讀取數字IO口電平,讀取數字雨滴傳感器DO端口
		if tmp != status:         # 狀態發生改變
			Print(tmp)   # 打印出雨滴傳感器檢測信息
			status = tmp # 狀態值重新賦值		
		time.sleep(0.2)                    # 延時200ms

# 功能函數
def rain():
    status = 1      # 雨滴傳感器狀態
    # 讀取數字IO口電平,讀取數字雨滴傳感器DO端口
    return GPIO.input(DO)


# 程序入口
if __name__ == '__main__':
	try:
		setup()   # GPIO定義
		loop()    # 調用循環函數
	except KeyboardInterrupt:
		pass	


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM