- 2024-10-27
-
Replied to topic:
[2024 DigiKey Creative Contest] AI Image Recognition-Based Pet Monitoring and Alarm Device
Quoting 秦天qintian0303, posted 2024-10-22 09:10:
Can the AI image recognition tell which cat it is?
Of course it can.
- 2024-10-20
-
Posted topic:
[2024 DigiKey Creative Contest] AI Image Recognition-Based Pet Monitoring and Alarm Device
AI Image Recognition-Based Pet Monitoring and Alarm Device
Author: 土豆12
I. Project Overview
One problem I have always had while raising my dog is that it needs exercise and cannot be kept in a cage all day, yet there are several areas of the house it should not enter — and the moment I look away, it slips in. So I decided to build an alarm device that can be deployed in areas we want to keep pets out of, such as the kitchen or balcony. When a pet enters such an area and is caught by the camera, an ESP32-S3 module placed elsewhere plays a sound while an ESP32-C6 module flashes a light, luring the pet out of the area.
II. System Block Diagram
The technical implementation breaks down into the following parts:
1. IoT hub: an MQTT broker deployed on a Raspberry Pi Zero 2 W.
2. The ESP32-S3 and ESP32-C6 nodes, plus an ESP32-CAM as the camera module; the code can be written with Arduino or CircuitPython.
3. AI image recognition, implemented with YOLOv8, also running on the Raspberry Pi Zero 2 W.
4. The recognition result is sent to the MQTT broker, which then broadcasts it to all other nodes.
5. When a pet is detected, the ESP32-S3 module placed in another area plays a sound while the ESP32-C6 module flashes a light, luring the pet away.
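Since the broker simply broadcasts the detection state to every node, each node's reaction boils down to a tiny topic-to-action dispatch. A minimal sketch of that pattern, runnable on a PC; the action names are hypothetical placeholders (the real nodes drive the speaker and LED instead):

```python
# Sketch of the broadcast-and-react pattern used by the alarm nodes.
# The topic and the "on"/"off" payloads match the binary sensor used below;
# the action strings are placeholders for this illustration only.
DOG_TOPIC = "hmd/binary_sensor/dog/state"

def react(topic, payload):
    """Map an incoming MQTT message to the actions an alarm node should take."""
    if topic != DOG_TOPIC:
        return []            # ignore unrelated topics
    if payload == "on":      # dog detected: every alarm node reacts
        return ["play_sound", "flash_led"]
    return []                # "off" or anything else: do nothing

print(react(DOG_TOPIC, "on"))
print(react(DOG_TOPIC, "off"))
```

Because every node subscribes to the same topic, adding another alarm node later requires no change on the detection side.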
III. Functional Description
First install the OS on the Raspberry Pi; just follow the official Raspberry Pi tutorial — it is very straightforward, so I will not repeat it here. Then install Docker:
# Add Docker's official GPG key:
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
# Add the repository to Apt sources:
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
Next, deploy the broker with Docker Compose. The Compose file is as follows:
version: '3'
services:
  mosquitto:
    image: eclipse-mosquitto
    container_name: mosquitto
    ports:
      - 1883:1883
      - 9001:9001
    restart: unless-stopped
That completes the broker deployment. Now for the code on each node, starting with the signal input. I am using an ESP32-CAM module, and the official example sketch works as-is.
Below is the image-processing code, written in Python. When a pet appears in the frame, it reports a message to the MQTT broker:
from ultralytics import YOLO
from ha_mqtt_discoverable import Settings
from ha_mqtt_discoverable.sensors import BinarySensor, BinarySensorInfo

CAM_URL = "http://192.168.1.102:81/stream"
MQTT_USERNAME = ""
MQTT_PASSWORD = ""

mqtt_settings = Settings.MQTT(
    host="localhost", username=MQTT_USERNAME, password=MQTT_PASSWORD
)
sensor_info = BinarySensorInfo(name="dog", off_delay=3)
mysensor = BinarySensor(Settings(mqtt=mqtt_settings, entity=sensor_info))

model = YOLO("yolov8n.pt")
results = model.predict(CAM_URL, stream=True, show=False, conf=0.5)

# loop over the streamed detection results
for result in results:
    for box in result.boxes:
        class_id = result.names[box.cls[0].item()]
        cords = box.xyxy[0].tolist()
        cords = [round(x) for x in cords]
        confi = round(box.conf[0].item(), 2)
        print("Object type:", class_id)
        print("Coordinates:", cords)
        print("Probability:", confi)
        print("---")
        if class_id == "dog":
            mysensor.on()
This way, as soon as the camera spots the dog, a message is sent to the MQTT broker immediately.
Now for the ESP32-S3 and ESP32-C6 nodes. I mounted both on a single breadboard for easy wiring. The ESP32-S3 connects to the SPK2 module as follows:
IO4 - BCLK
IO6 - WS
IO5 - DATA
On the ESP32-C6 side, an LED is connected to IO9.
First the ESP32-S3 code, written in CircuitPython. When the board receives the dog-detected message published by the MQTT broker, it plays a sound to get the dog's attention.
Before writing the code, fill in the settings.toml file so the board can join the network and connect to the MQTT broker:
CIRCUITPY_WIFI_SSID=" "
CIRCUITPY_WIFI_PASSWORD=" "
MQTT_BROKER = ""
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASSWORD = ""
The program code is as follows:
import audiocore
import board
import audiobusio
import os
import wifi
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT

mqtt_topic = "hmd/binary_sensor/dog/state"
state = 0

# Define callback methods which are called when events occur
def connect(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))

def disconnect(client, userdata, rc):
    # This method is called when the client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")

def subscribe(client, userdata, topic, granted_qos):
    # This method is called when the client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def unsubscribe(client, userdata, topic, pid):
    # This method is called when the client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))

def publish(client, userdata, topic, pid):
    # This method is called when the client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))

def message(client, topic, message):
    global state
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")
    if message == "on":
        state = 1

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    port=os.getenv("MQTT_PORT"),
    username=os.getenv("MQTT_USER"),
    password=os.getenv("MQTT_PASSWORD"),
    is_ssl=False,
    socket_pool=socketpool.SocketPool(wifi.radio),
    ssl_context=ssl.create_default_context(),
)

# Connect callback handlers to client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message

print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()
print("Subscribing to %s" % mqtt_topic)
mqtt_client.subscribe(mqtt_topic)

wave = audiocore.WaveFile("music3.wav")
audio = audiobusio.I2SOut(board.IO4, board.IO6, board.IO5)

while True:
    mqtt_client.loop(1)
    if state:
        audio.play(wave, loop=False)
        state = 0
Last is the ESP32-C6 code, also written in CircuitPython: when it receives the dog-detected message published by the MQTT broker, it flashes the light to get the dog's attention.
import os
import wifi
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import digitalio
import board

mqtt_topic = "hmd/binary_sensor/dog/state"

led = digitalio.DigitalInOut(board.IO9)
led.direction = digitalio.Direction.OUTPUT

# Define callback methods which are called when events occur
def connect(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))

def disconnect(client, userdata, rc):
    # This method is called when the client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")

def subscribe(client, userdata, topic, granted_qos):
    # This method is called when the client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def unsubscribe(client, userdata, topic, pid):
    # This method is called when the client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))

def publish(client, userdata, topic, pid):
    # This method is called when the client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))

def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")
    if message == "on":
        led.value = True

mqtt_client = MQTT.MQTT(
    broker=os.getenv("MQTT_BROKER"),
    port=os.getenv("MQTT_PORT"),
    username=os.getenv("MQTT_USER"),
    password=os.getenv("MQTT_PASSWORD"),
    is_ssl=False,
    socket_pool=socketpool.SocketPool(wifi.radio),
    ssl_context=ssl.create_default_context(),
)

# Connect callback handlers to client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message

print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()
print("Subscribing to %s" % mqtt_topic)
mqtt_client.subscribe(mqtt_topic)

# print("Publishing to %s" % mqtt_topic)
# mqtt_client.publish(mqtt_topic, "Hello Broker!")
# print("Unsubscribing from %s" % mqtt_topic)
# mqtt_client.unsubscribe(mqtt_topic)
# print("Disconnecting from %s" % mqtt_client.broker)
# mqtt_client.disconnect()

while True:
    led.value = False
    mqtt_client.loop(1)
IV. Source Code
V. Demo Video
[localvideo]3c89db2c1fd3a914fbd389fe79c9b76d[/localvideo]
VI. Summary
This project let me try coordinating multiple devices over MQTT and experience first-hand how powerful MQTT is in IoT. The Word version of the report is attached below:
- 2024-10-06
-
Posted topic:
[2024 DigiKey Creative Contest] Unboxing
Thanks to the contest for the waitlist slot that let me take part. I bought quite a few parts for this event; let's go through them one by one.
1. Raspberry Pi Zero 2 W, used as the cloud server.
2. ESP32-S3-DEVKITC-1-N8R8, used as the video upload node.
3. ESP32-C6-DEVKITC-1-N8, used as the alarm node MCU.
4. U055-B, the M5Stack speaker module, used to play the alarm.
- 2024-08-08
-
Posted topic:
[Follow me Season 2 Episode 1] All Tasks Submitted
Last edited by 土豆12 on 2024-10-13 00:14
The main board for this Follow me event is the Adafruit Circuit Playground Express, which is very convenient to develop with CircuitPython. Along with the event I also received a Raspberry Pi, which I plan to use as the host machine. Without further ado, let's unbox.
In this event I will use the Raspberry Pi as the host for programming and flashing the Circuit Playground.
Now for the entry task: setting up the development environment and lighting the on-board LED.
First prepare an SD card and flash the Raspberry Pi OS 64-bit Lite image. After booting, log in over SSH, then install the required software:
sudo apt update
sudo apt upgrade
sudo apt install xserver-xorg
sudo apt install ttf-wqy-zenhei
sudo apt autoremove
pip install thonny
Besides Thonny and a Chinese font, the commands above also install X server, which is used to display Thonny's GUI on our PC over SSH via X11 forwarding.
Before using it, check the SSHD configuration:
sudo nano /etc/ssh/sshd_config
Make sure X11Forwarding is set to yes, then restart the sshd service:
sudo systemctl restart sshd
Finally, in MobaXterm, tick the X11-Forwarding option in the session settings:
Reconnect over SSH; once the X11-Forwarding indicator shows a check mark as below, forwarding is enabled.
Type thonny at the command line, and Thonny running on the Raspberry Pi appears on our PC.
Before plugging in the Circuit Playground, we first need to flash the CircuitPython firmware. Download the CircuitPython UF2 from the official site, connect the board to the computer, press reset to enter the bootloader, and drag the UF2 onto the bootloader drive.
Then plug the board into the Raspberry Pi's USB port (an OTG cable is needed); once connected, Thonny on the Pi can talk to the board.
First, let's get all the libraries this project needs and the helper function for the LED marquee ready:
import time
import board
from digitalio import DigitalInOut, Direction, Pull
from analogio import AnalogIn
from adafruit_circuitplayground import cp
from adafruit_circuitplayground.express import cpx

def wheel(pos):
    if (pos < 0) or (pos > 255):
        return (0, 0, 0)
    if pos < 85:
        return (int(255 - pos * 3), int(pos * 3), 0)
    elif pos < 170:
        pos -= 85
        return (0, int(255 - (pos * 3)), int(pos * 3))
    else:
        pos -= 170
        return (int(pos * 3), 0, int(255 - pos * 3))

pixeln = 0
t = 10
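The wheel function above is the classic color-wheel helper: positions 0, 85, and 170 land exactly on pure red, green, and blue, and out-of-range inputs go dark. A quick desk check (same function, runnable on any PC Python):

```python
def wheel(pos):
    # 0..255 position on the color wheel -> (r, g, b)
    if (pos < 0) or (pos > 255):
        return (0, 0, 0)
    if pos < 85:
        return (int(255 - pos * 3), int(pos * 3), 0)
    elif pos < 170:
        pos -= 85
        return (0, int(255 - (pos * 3)), int(pos * 3))
    else:
        pos -= 170
        return (int(pos * 3), 0, int(255 - pos * 3))

print(wheel(0))    # -> (255, 0, 0), pure red
print(wheel(85))   # -> (0, 255, 0), pure green
print(wheel(170))  # -> (0, 0, 255), pure blue
```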
Next, the entry task: lighting the LED only needs the built-in cpx object.
# Entry task
print("Entry task")
cpx.red_led = 1
time.sleep(t)
Next is basic task 1: using the wheel function above to build a marquee effect.
# Basic task 1
print("Basic task 1")
start_time = time.time()
while time.time() - start_time < t:
    for p in range(10):
        color = wheel(25 * ((pixeln + p) % 10))
        cpx.pixels[p] = tuple(int(c * ((10 - (pixeln + p) % 10)) / 10.0) for c in color)
    pixeln += 1
    if pixeln > 9:
        pixeln = 0
Basic task 2 reads temperature and light level. If both are low — temperature no higher than 25 °C and light level no higher than 40 — conditions are comfortable and all LEDs show green; if temperature is no higher than 32 °C and light no higher than 80, it is so-so and the LEDs show yellow; otherwise comfort is poor and the LEDs show red. The serial output below shows it is currently quite hot, so the board displays red.
# Basic task 2
print("Basic task 2")
start_time = time.time()
while time.time() - start_time < t:
    print("Temperature: %0.1f *C" % cpx.temperature)
    print("Light Level: %d" % cpx.light)
    if 18 <= cpx.temperature <= 25 and cpx.light <= 40:
        for p in range(10):
            cpx.pixels[p] = (0, 15, 0)
    elif 10 <= cpx.temperature <= 32 and cpx.light <= 80:
        for p in range(10):
            cpx.pixels[p] = (15, 15, 0)
    else:
        for p in range(10):
            cpx.pixels[p] = (15, 0, 0)
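Chained threshold logic like this is easy to get subtly wrong, so it can help to desk-test the same decision table as a pure function before flashing the board (the function name and color strings here are mine, not from the board code):

```python
def comfort_color(temp_c, light):
    """Classify comfort, mirroring the basic-task-2 thresholds."""
    if 18 <= temp_c <= 25 and light <= 40:
        return "green"   # comfortable
    elif 10 <= temp_c <= 32 and light <= 80:
        return "yellow"  # so-so
    else:
        return "red"     # poor

print(comfort_color(22, 30))  # mild and dim -> green
print(comfort_color(30, 60))  # warm but tolerable -> yellow
print(comfort_color(35, 90))  # hot and bright -> red
```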
Basic task 3: following the official video tutorial, to measure proximity with the IR transmitter and receiver, flash the IR transmitter briefly and immediately read the analog value from the IR receiver. I set the threshold to 31000: if an obstacle gets too close and the reading exceeds 31000, sound the alarm and light the LEDs red; otherwise stop the alarm and light them green.
# Basic task 3
print("Basic task 3")
LED_IR = DigitalInOut(board.IR_TX)
LED_IR.switch_to_output()
SENSOR_IR = AnalogIn(board.IR_PROXIMITY)
start_time = time.time()
while time.time() - start_time < t:
    LED_IR.value = 1
    time.sleep(0.01)
    LED_IR.value = 0
    sensor = SENSOR_IR.value
    print("proximity Level: %d" % sensor)
    if sensor > 31000:
        for p in range(10):
            cpx.pixels[p] = (15, 0, 0)
        cp.start_tone(440)
    else:
        for p in range(10):
            cpx.pixels[p] = (0, 15, 0)
        cp.stop_tone()
The advanced task reads the on-board accelerometer. The sum of squares of the two horizontal components measures how far the board is tilted: if the tilt is large, the LEDs show red and the alarm sounds; otherwise the alarm stops and the LEDs show green.
# Advanced task
print("Advanced task")
start_time = time.time()
while time.time() - start_time < t:
    x, y, z = cpx.acceleration
    print("Accelerometer: (%0.1f, %0.1f, %0.1f) m/s^2" % (x, y, z))
    skew = (x * x) + (y * y)
    if skew > 5:
        for p in range(10):
            cpx.pixels[p] = (15, 0, 0)
        cp.start_tone(440)
    else:
        for p in range(10):
            cpx.pixels[p] = (0, 15, 0)
        cp.stop_tone()
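The tilt test only uses the two horizontal acceleration components, so it does not depend on which way the board is rotated about the vertical axis. The check in isolation (threshold 5 as in the task code; the helper name is mine):

```python
def is_tilted(ax, ay, threshold=5):
    """True when the squared horizontal acceleration exceeds the threshold."""
    return (ax * ax + ay * ay) > threshold

print(is_tilted(0.0, 0.0))  # flat board: 0 > 5 is False
print(is_tilted(3.0, 0.0))  # clearly tilted: 9 > 5 is True
```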
The last task is the fruit piano. The on-board GPIOs support capacitive touch and can be triggered with a finger; wiring the pins out to fruit gives the same effect. Since fruit would corrode the pins, I demonstrate by touching the board's pins directly: touching different pins makes the on-board LEDs show different colors and play different notes, giving a piano effect.
# Creative task 3: fruit piano
print("Creative task 3: fruit piano")
while 1:
    if cp.touch_A4:
        print("Touched A4!")
        cp.pixels.fill((15, 0, 0))
        cp.start_tone(262)
    elif cp.touch_A5:
        print("Touched A5!")
        cp.pixels.fill((15, 5, 0))
        cp.start_tone(294)
    elif cp.touch_A6:
        print("Touched A6!")
        cp.pixels.fill((15, 15, 0))
        cp.start_tone(330)
    elif cp.touch_A7:
        print("Touched A7!")
        cp.pixels.fill((0, 15, 0))
        cp.start_tone(349)
    elif cp.touch_A1:
        print("Touched A1!")
        cp.pixels.fill((0, 15, 15))
        cp.start_tone(392)
    elif cp.touch_A2 and not cp.touch_A3:
        print("Touched A2!")
        cp.pixels.fill((0, 0, 15))
        cp.start_tone(440)
    elif cp.touch_A3 and not cp.touch_A2:
        print("Touched A3!")
        cp.pixels.fill((5, 0, 15))
        cp.start_tone(494)
    elif cp.touch_A2 and cp.touch_A3:
        print('Touched "8"!')
        cp.pixels.fill((15, 0, 15))
        cp.start_tone(523)
    else:
        cp.pixels.fill((0, 0, 0))
        cp.stop_tone()
Takeaways: this event let me try two boards at once and make them cooperate, which was a lot of fun. My suggestion for future rounds: keep picking boards that are easy to get started with and fun to play with, and leave participants enough remaining budget to buy extra boards for new ideas.
Source code: https://download.eeworld.com.cn/detail/eew_JtDqA2/633951
Demo video: https://training.eeworld.com.cn/video/40781
-
Uploaded a resource:
Follow me Season 2 Episode 1
- 2024-07-28
-
Posted topic:
[Sipeed MAix BiT AIoT Kit] 4. Auto-Aiming Tracking Camera System
Before we start, here is the final result:
[localvideo]b65976bd4f7ae6dd09f538b9af516218[/localvideo]
The previous three installments implemented each functional module independently; in this final installment we integrate them and finish the project.
First, based on the PWM servo-driving method from the last installment, write a simple servo class to make control easier:
class Servo:
    def __init__(self, pwm, dir=50, duty_min=2.5, duty_max=12.5):
        self.value = dir
        self.pwm = pwm
        self.duty_min = duty_min
        self.duty_max = duty_max
        self.duty_range = duty_max - duty_min
        self.enable(True)
        self.pwm.duty(self.value/100*self.duty_range+self.duty_min)

    def enable(self, en):
        if en:
            self.pwm.enable()
        else:
            self.pwm.disable()

    def dir(self, percentage):
        if percentage > 100:
            percentage = 100
        elif percentage < 0:
            percentage = 0
        self.pwm.duty(percentage/100*self.duty_range+self.duty_min)

    def drive(self, inc):
        self.value += inc
        if self.value > 100:
            self.value = 100
        elif self.value < 0:
            self.value = 0
        self.pwm.duty(self.value/100*self.duty_range+self.duty_min)
To make the error converge smoothly when controlling the servos, we need a PID algorithm. The class below hand-implements one. It takes four inputs: besides the P, I, and D gains, there is an extra I-max parameter that caps the correction contributed by the integral term.
class PID:
    _kp = _ki = _kd = _integrator = _imax = 0
    _last_error = _last_t = 0
    _RC = 1/(2 * pi * 20)

    def __init__(self, p=0, i=0, d=0, imax=0):
        self._kp = float(p)
        self._ki = float(i)
        self._kd = float(d)
        self._imax = abs(imax)
        self._last_derivative = None

    def get_pid(self, error, scaler):
        tnow = time.ticks_ms()
        dt = tnow - self._last_t
        output = 0
        if self._last_t == 0 or dt > 1000:
            dt = 0
            self.reset_I()
        self._last_t = tnow
        delta_time = float(dt) / float(1000)
        output += error * self._kp
        if abs(self._kd) > 0 and dt > 0:
            if self._last_derivative == None:
                derivative = 0
                self._last_derivative = 0
            else:
                derivative = (error - self._last_error) / delta_time
                derivative = self._last_derivative + \
                    ((delta_time / (self._RC + delta_time)) * \
                    (derivative - self._last_derivative))
            self._last_error = error
            self._last_derivative = derivative
            output += self._kd * derivative
        output *= scaler
        if abs(self._ki) > 0 and dt > 0:
            self._integrator += (error * self._ki) * scaler * delta_time
            if self._integrator < -self._imax:
                self._integrator = -self._imax
            elif self._integrator > self._imax:
                self._integrator = self._imax
            output += self._integrator
        return output

    def reset_I(self):
        self._integrator = 0
        self._last_derivative = None
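The key safety feature of this class is the imax clamp on the integrator, which prevents integral wind-up when a large error persists. The clamping step in isolation, as a simplified desktop-runnable sketch (the function name is mine):

```python
def step_integrator(integrator, error, ki, dt_s, imax):
    """One integral-term update with the same symmetric clamp as PID.get_pid."""
    integrator += error * ki * dt_s
    if integrator < -imax:
        integrator = -imax
    elif integrator > imax:
        integrator = imax
    return integrator

# A constant error of 10 for 10 one-second steps would integrate to 10,
# but the clamp holds the term at imax = 5.
i = 0.0
for _ in range(10):
    i = step_integrator(i, 10, 0.1, 1.0, 5.0)
print(i)  # -> 5.0, clamped
```

Without the clamp, a servo that physically cannot reach its target would accumulate an ever-growing integral and overshoot badly once the error finally closes.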
Since we are using a 2-axis gimbal, several servos run at once: feeding each PID result into its servo completes one gimbal adjustment. So we can write one more class, a gimbal class, that ties the two classes above together:
class Gimbal:
    def __init__(self, pitch, pid_pitch, roll=None, pid_roll=None, yaw=None, pid_yaw=None):
        self._pitch = pitch
        self._roll = roll
        self._yaw = yaw
        self._pid_pitch = pid_pitch
        self._pid_roll = pid_roll
        self._pid_yaw = pid_yaw

    def run(self, pitch_err, roll_err=50, yaw_err=50, pitch_reverse=False, roll_reverse=False, yaw_reverse=False):
        out = self._pid_pitch.get_pid(pitch_err, 1)
        # print("err: {}, out: {}".format(pitch_err, out))
        if pitch_reverse:
            out = -out
        self._pitch.drive(out)
        if self._roll:
            out = self._pid_roll.get_pid(roll_err, 1)
            if roll_reverse:
                out = -out
            self._roll.drive(out)
        if self._yaw:
            out = self._pid_yaw.get_pid(yaw_err, 1)
            if yaw_reverse:
                out = -out
            self._yaw.drive(out)
With that, all the groundwork is done and the main routine is straightforward: initialize the camera and LCD; grab a frame; run face detection; when a face is found, compute its x/y offset from the image center; feed that error to the PID class; pass the computed correction to the gimbal class, which uses the servo class to adjust both servos; finally display the frame on the LCD. That is one full loop iteration.
if __name__ == "__main__":
    '''
    servo:
        freq: 50 (Hz)
        T: 1/50 = 0.02s = 20ms
        duty: [0.5ms, 2.5ms] -> [0.025, 0.125] -> [2.5%, 12.5%]
    pin:
        IO24 <--> pitch
        IO25 <--> roll
    '''
    init_pitch = 20      # init position, value: [0, 100], minimum to maximum servo angle
    init_roll = 50       # 50 means middle
    sensor_hmirror = False
    sensor_vflip = False
    lcd_rotation = 2
    lcd_mirror = False
    pitch_pid = [0.25, 0, 0.02, 0]   # P I D I_max
    roll_pid = [0.25, 0, 0.02, 0]    # P I D I_max
    target_err_range = 10            # target error output range, default [0, 10]
    target_ignore_limit = 0.02       # when target error < target_err_range*target_ignore_limit, set it to 0
    pitch_reverse = False            # reverse output direction
    roll_reverse = False             # ..

    import sensor, image, lcd
    import KPU as kpu

    def lcd_show_except(e):
        import uio
        err_str = uio.StringIO()
        sys.print_exception(e, err_str)
        err_str = err_str.getvalue()
        img = image.Image(size=(224, 224))
        img.draw_string(0, 10, err_str, scale=1, color=(0xff, 0x00, 0x00))
        lcd.display(img)

    class Target():
        def __init__(self, out_range=10, ignore_limit=0.02, hmirror=False, vflip=False, lcd_rotation=2, lcd_mirror=False):
            self.pitch = 0
            self.roll = 0
            self.out_range = out_range
            self.ignore = ignore_limit
            self.task = kpu.load(0x300000)   # face model addr in flash
            self.clock = time.clock()        # Create a clock object to track the FPS.
            anchor = (1.889, 2.5245, 2.9465, 3.94056, 3.99987, 5.3658, 5.155437, 6.92275, 6.718375, 9.01025)
            kpu.init_yolo2(self.task, 0.5, 0.3, 5, anchor)
            lcd.init(type=1)
            lcd.rotation(lcd_rotation)
            lcd.mirror(lcd_mirror)
            try:
                sensor.reset()
            except Exception as e:
                raise Exception("sensor reset fail, please check hardware connection, or hardware damaged! err: {}".format(e))
            sensor.set_pixformat(sensor.RGB565)
            sensor.set_framesize(sensor.QVGA)
            sensor.set_hmirror(hmirror)
            sensor.set_vflip(vflip)

        def get_target_err(self):
            self.clock.tick()
            img = sensor.snapshot()
            objects = kpu.run_yolo2(self.task, img)
            if objects:
                max_area = 0
                max_i = 0
                for i, j in enumerate(objects):
                    a = j.w()*j.h()
                    if a > max_area:
                        max_i = i
                        max_area = a
                img.draw_rectangle(objects[max_i].rect())
                self.pitch = (objects[max_i].y() + objects[max_i].h() / 2)/240*self.out_range*2 - self.out_range
                self.roll = (objects[max_i].x() + objects[max_i].w() / 2)/320*self.out_range*2 - self.out_range
                # limit
                if abs(self.pitch) < self.out_range*self.ignore:
                    self.pitch = 0
                if abs(self.roll) < self.out_range*self.ignore:
                    self.roll = 0
                img.draw_cross(160, 120)
                img.draw_string(0, 200, "FPS: %s" % self.clock.fps(), scale=2)
                img.draw_string(0, 220, "Error: %s" % str((self.roll, self.pitch)), scale=2)
                lcd.display(img)
                return (self.pitch, self.roll)
            else:
                img.draw_cross(160, 120)
                img.draw_string(0, 200, "FPS: %s" % self.clock.fps(), scale=2)
                img.draw_string(0, 220, "Error: %s" % "N/A", scale=2)
                lcd.display(img)
                return (0, 0)

    target = Target(target_err_range, target_ignore_limit, sensor_hmirror, sensor_vflip, lcd_rotation, lcd_mirror)
    tim0 = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
    tim1 = Timer(Timer.TIMER0, Timer.CHANNEL1, mode=Timer.MODE_PWM)
    pitch_pwm = PWM(tim0, freq=50, duty=0, pin=24)
    roll_pwm = PWM(tim1, freq=50, duty=0, pin=25)
    pitch = Servo(pitch_pwm, dir=init_pitch)
    roll = Servo(roll_pwm, dir=init_roll)
    pid_pitch = PID(p=pitch_pid[0], i=pitch_pid[1], d=pitch_pid[2], imax=pitch_pid[3])
    pid_roll = PID(p=roll_pid[0], i=roll_pid[1], d=roll_pid[2], imax=roll_pid[3])
    gimbal = Gimbal(pitch, pid_pitch, roll, pid_roll)
    target_pitch = init_pitch
    target_roll = init_roll
    t = time.ticks_ms()
    _dir = 0
    t0 = time.ticks_ms()
    while True:
        try:
            # get target error
            err_pitch, err_roll = target.get_target_err()
            # run
            gimbal.run(-err_pitch, err_roll, pitch_reverse=pitch_reverse, roll_reverse=roll_reverse)
            # interval limit to > 5ms to wait for servo move
            if time.ticks_ms() - t0 < 5:
                continue
            t0 = time.ticks_ms()
        except Exception as e:
            sys.print_exception(e)
            lcd_show_except(e)
        finally:
            gc.collect()
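The error mapping inside get_target_err converts the detected face center from pixels into the symmetric [-out_range, +out_range] range the PID expects. As a standalone helper (my naming; the on-board code does this inline with 240 for the frame height and 320 for the width):

```python
def norm_err(center_px, frame_px, out_range=10):
    """Map a pixel coordinate to [-out_range, +out_range], with 0 at frame center."""
    return center_px / frame_px * out_range * 2 - out_range

print(norm_err(120, 240))  # face at vertical center -> 0.0
print(norm_err(0, 240))    # top edge -> -10.0
print(norm_err(320, 320))  # right edge -> +10.0
```

A signed, centered error like this means the PID needs no extra offset handling: zero error means the face is already centered.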
Finally, the complete program is simply the Servo, PID, and Gimbal classes plus the main block above combined into a single file, topped with these imports:
import time, sys, gc
from machine import Timer, PWM
from math import pi
Finally, fix the board, camera, and screen onto a breadboard, fix the breadboard onto the gimbal, and wire everything up to complete the hardware assembly. The Y-axis servo connects to PIN_24 and the X-axis servo to PIN_25.
The full project source code can be downloaded here:
-
Posted topic:
[Sipeed MAix BiT AIoT Kit] 3. Servo Driving
The K210 has hardware PWM, so naturally we use it to drive the servos. Compared with bit-banging a software PWM waveform with timed delays, a hardware PWM signal is far more consistent, stable, and precise, which effectively avoids the servo constantly chattering.
Driving a servo here is similar to the usual MicroPython approach, just slightly more involved: we specify the timer and channel manually. A servo instance can be created with the following code:
from machine import Timer,PWM
tim = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
S1 = PWM(tim, freq=50, duty=0, pin=25)
At this point we are already producing a fixed 50 Hz PWM signal. A servo expects pulses between 500 µs and 2500 µs; to generate them we only need to adjust the duty cycle. With a little arithmetic we can build a function that converts an angle into the corresponding duty cycle and outputs the matching PWM:
def move(servo, angle):
    servo.duty((angle+90)/180*10+2.5)
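To check the arithmetic: at 50 Hz the period is 20 ms, so a 0.5 ms pulse is 2.5% duty and a 2.5 ms pulse is 12.5%. The angle-to-duty mapping used in move can be verified on a PC (pure function, extracted for the check):

```python
def angle_to_duty(angle):
    """Map a servo angle in [-90, 90] degrees to a duty cycle in [2.5, 12.5] percent."""
    return (angle + 90) / 180 * 10 + 2.5

print(angle_to_duty(-90))  # 0.5 ms pulse -> 2.5
print(angle_to_duty(0))    # 1.5 ms pulse -> 7.5
print(angle_to_duty(90))   # 2.5 ms pulse -> 12.5
```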
Then the following loop makes the servo sweep back and forth:
import time
j = 1
while True:
    j = -j
    for i in range(-90, 90):
        move(S1, i * j)
        time.sleep(0.001)
The result looks like this:
[localvideo]37a43666666b07e0321f7763d5ba1bc4[/localvideo]
-
Posted topic:
[Sipeed MAix BiT AIoT Kit] 2. YOLOv2 Face Detection
The first step toward face tracking is face detection.
Face detection is a very standard machine-learning application. In most cases there is no need to train a model ourselves; we can use the one Sipeed provides. Download the face_model_at_0x300000.kfpkg model file from:
https://dl.sipeed.com/MAIX/MaixPy/model
After downloading, flash the model to the board with kflash_gui, the same tool used to update the firmware in the previous installment:
There is no need to worry about this overwriting the bin firmware we flashed earlier: this time we are flashing a kfpkg file, which records where in flash each file should be written.
Storage in MaixPy consists mainly of the flash and the SD card, divided into three regions: the MaixPy.bin firmware region, the xxx.kmodel model region, and the file-system region (SPIFFS on flash, FatFs on the SD card). The layout can be seen in the figure below, so the regions do not interfere with each other.
Loading the model then just means passing its flash address to the load function:
import KPU as kpu
task = kpu.load(0x300000)
anchor = (1.889, 2.5245, 2.9465, 3.94056, 3.99987, 5.3658, 5.155437, 6.92275, 6.718375, 9.01025)
kpu.init_yolo2(task, 0.5, 0.3, 5, anchor)
The anchor tuple holds the anchor-box parameters, which correspond one-to-one to the model; in other words, we must not modify this tuple in any way — it has to stay exactly as given.
init_yolo2 initializes the model. Note the two float parameters in the middle: the first is the probability threshold — a detection is only output if its confidence exceeds this value, range [0, 1]; the second is the box IoU threshold, which keeps one object from being framed multiple times — when two boxes on the same object overlap with an intersection-over-union above this value, only the box with the highest probability is kept.
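That second parameter is the standard intersection-over-union measure from non-maximum suppression. For reference, IoU for two boxes given as (x, y, w, h) — a helper of my own for illustration, not part of the KPU API:

```python
def iou(a, b):
    """Intersection-over-union of two (x, y, w, h) boxes."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    # width/height of the intersection rectangle (0 if the boxes are disjoint)
    ix = max(0, min(ax + aw, bx + bw) - max(ax, bx))
    iy = max(0, min(ay + ah, by + bh) - max(ay, by))
    inter = ix * iy
    union = aw * ah + bw * bh - inter
    return inter / union if union else 0.0

print(iou((0, 0, 10, 10), (0, 0, 10, 10)))  # identical boxes -> 1.0
print(iou((0, 0, 10, 10), (20, 20, 5, 5)))  # disjoint boxes -> 0.0
print(iou((0, 0, 10, 10), (5, 0, 10, 10)))  # half-shifted boxes -> 50/150
```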
Next, prepare the image to run detection on, which comes from the camera. First initialize the camera:
import sensor
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
Then this call grabs a frame and stores it in the img variable:
img = sensor.snapshot()
With the frame in hand, feed it to the model. The result is an iterable, and each element is one detected face. Since the image may contain more than one face but we can only track one at a time, we iterate over the detections, pick the largest one, and draw its bounding box on the image:
objects = kpu.run_yolo2(task, img)
if objects:
    max_area = 0
    max_i = 0
    for i, j in enumerate(objects):
        a = j.w()*j.h()
        if a > max_area:
            max_i = i
            max_area = a
    img.draw_rectangle(objects[max_i].rect())
Finally, display the result on the LCD. As before, initialize the LCD first — one line is enough — then show the image:
import lcd
lcd.init(type=1)
lcd.display(img)
If everything goes well, run the code above line by line; when a face is in view, the LCD shows the corresponding detection result:
The complete code is as follows:
import sensor, image, lcd, time
import KPU as kpu
import gc, sys

def lcd_show_except(e):
    import uio
    err_str = uio.StringIO()
    sys.print_exception(e, err_str)
    err_str = err_str.getvalue()
    img = image.Image(size=(224, 224))
    img.draw_string(0, 10, err_str, scale=1, color=(0xFF, 0x00, 0x00))
    lcd.display(img)

def main(model_addr=0x300000, lcd_rotation=2, sensor_hmirror=False, sensor_vflip=False):
    try:
        sensor.reset()
    except Exception as e:
        raise Exception(
            "sensor reset fail, please check hardware connection, or hardware damaged! err: {}".format(e)
        )
    sensor.set_pixformat(sensor.RGB565)
    sensor.set_framesize(sensor.QVGA)
    sensor.set_hmirror(sensor_hmirror)
    sensor.set_vflip(sensor_vflip)
    lcd.init(type=1)
    lcd.rotation(lcd_rotation)
    clock = time.clock()
    anchors = (1.889, 2.5245, 2.9465, 3.94056, 3.99987,
               5.3658, 5.155437, 6.92275, 6.718375, 9.01025)
    task = None  # so the finally block is safe even if kpu.load fails
    try:
        task = kpu.load(model_addr)
        kpu.init_yolo2(task, 0.5, 0.3, 5, anchors)  # threshold: [0,1], nms_value: [0,1]
        while True:
            clock.tick()
            img = sensor.snapshot()
            objects = kpu.run_yolo2(task, img)
            if objects:
                for obj in objects:
                    img.draw_rectangle(obj.rect())
                    center = (
                        round(obj.x() + obj.w() / 2, 1),
                        round(obj.y() + obj.h() / 2, 1),
                    )
            else:
                center = "N/A"
            img.draw_string(0, 200, "FPS: %f" % (clock.fps()), scale=2)
            img.draw_string(0, 220, "Center: %s" % str(center), scale=2)
            lcd.display(img)
    except Exception as e:
        raise e
    finally:
        if not task is None:
            kpu.deinit(task)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        sys.print_exception(e)
        lcd_show_except(e)
    finally:
        gc.collect()
-
Posted topic:
[Sipeed MAix BiT AIoT Kit] 1. Firmware Flashing and Development Environment Setup
Thanks to EEWorld for the review opportunity; after a long wait the board finally arrived. The packaging is excellent: all accessories come organized in a small box.
Opening it up, it contains the following items:
Sipeed provides very thorough documentation on its official wiki. This board uses the MaixPy development environment; note that MaixPy has multiple mutually incompatible versions. The one matching our K210 board is v1, so the relevant page is:
https://wiki.sipeed.com/soft/maixpy/zh/index.html
Before development, connect the camera and screen and power up the board; you will see a red screen:
The red screen clearly shows the current firmware version. The same information is also available over the serial port. The on-board USB-serial chip is a CH552, which the latest Windows 11 supports without extra drivers. The baud rate is 115200; connect with any serial tool, press the on-board RESET button to reboot, and after startup press Ctrl + C to stop the running program and see the firmware info:
To update the firmware, download the latest build (currently v0.6.3_2) from https://dl.sipeed.com/shareURL/MAIX/MaixPy/release/master. The downloaded firmware is a bin file.
You also need the flashing tool, kflash_gui: https://github.com/sipeed/kflash_gui/releases
Open it, load the firmware, keep the default settings (you can compare against my screenshot below), then click Download to finish the update.
Next, set up the development environment. The IDE is ported from OpenMV, so anyone with OpenMV experience will find it familiar. Download it from the link below; for the portable version, grab the 7z archive: http://dl.sipeed.com/MAIX/MaixPy/ide/
After opening it, select the board, click the button at the bottom left to connect, and run the default hello-world script to see the familiar view:
As the screenshot shows, the camera and screen are already working. Even without the MaixPy IDE there is a simple hardware test: hold the BOOT button right after powering the board; once the screen indicates test mode, the camera feed is shown on the screen, which confirms the hardware works.
If you are curious how these features work, it is actually simple: the MaixPy firmware is ported from MicroPython, and both the default red screen and the test mode are implemented by the factory main.py that ships on the board. Using uPyLoader-win we can download that program. It reads as follows:
try:
    import gc, lcd, image, sys, os
    from Maix import GPIO
    from fpioa_manager import fm
    test_pin = 16
    fm.fpioa.set_function(test_pin, fm.fpioa.GPIO7)
    test_gpio = GPIO(GPIO.GPIO7, GPIO.IN, GPIO.PULL_UP)
    fm.fpioa.set_function(17, fm.fpioa.GPIO0)
    lcd_en = GPIO(GPIO.GPIO0, GPIO.OUT)
    lcd_en.value(0)
    lcd.init()
    lcd.clear(color=(255, 0, 0))
    lcd.draw_string(lcd.width()//2-68, lcd.height()//2-4, "Welcome to ", lcd.WHITE, lcd.RED)
    if test_gpio.value() == 0:
        print('PIN 16 pulled down, enter test mode')
        lcd.clear(lcd.PINK)
        lcd.draw_string(lcd.width()//2-68, lcd.height()//2-4, "Test Mode, wait ...", lcd.WHITE, lcd.PINK)
        import sensor
        import image
        sensor.reset()
        sensor.set_pixformat(sensor.RGB565)
        sensor.set_framesize(sensor.QVGA)
        sensor.run(1)
        lcd.freq(16000000)
        while True:
            img = sensor.snapshot()
            lcd.display(img)
    loading = image.Image(size=(lcd.width(), lcd.height()))
    loading.draw_rectangle((0, 0, lcd.width(), lcd.height()), fill=True, color=(255, 0, 0))
    info = "Welcome to MaixPy"
    loading.draw_string(int(lcd.width()//2 - len(info) * 5), (lcd.height())//4, info, color=(255, 255, 255), scale=2, mono_space=0)
    v = sys.implementation.version
    vers = 'V{}.{}.{} : maixpy.sipeed.com'.format(v[0], v[1], v[2])
    loading.draw_string(int(lcd.width()//2 - len(info) * 6), (lcd.height())//3 + 20, vers, color=(255, 255, 255), scale=1, mono_space=1)
    lcd.display(loading)
    tf = None
    try:
        os.listdir("/sd/.")
    except Exception as e:
        tf = "SDcard not mount,use flash!"
        loading.draw_string(int(lcd.width()//2 - len(info) * 7), (lcd.height())//2 + 10, tf, color=(255, 255, 255), scale=1, mono_space=1)
    if not tf:
        tf = "SDcard is mount,use SD!"
        loading.draw_string(int(lcd.width()//2 - len(info) * 6), (lcd.height())//2 + 10, tf, color=(255, 255, 255), scale=1, mono_space=1)
    lcd.display(loading)
    del loading, v, info, vers
    gc.collect()
finally:
    gc.collect()
Reading the code makes the behavior clear: if BOOT is detected pressed at startup, the board enters test mode, grabbing camera frames and showing them on the LCD; otherwise it shows the red screen.
With the development environment set up and the hardware tested, all preparation is done. In the next installment we start the actual project development.
- 2024-07-02
-
Replied to topic:
Review Shortlist: Sipeed MAix BiT AIoT Kit
Private messages can no longer be sent, so I am replying here. @eew_JtDqA2 — that is me; I simply set a username, and the list shows the default name from before I set it.
- 2024-06-30
-
Replied to topic:
Review Shortlist: Sipeed MAix BiT AIoT Kit
Thanks for the review approval; I have not yet received the site PM.