本帖最后由 ltpop 于 2023-12-28 16:46 编辑
一、介绍视频
二、项目总结
笔者是一个智能家居爱好者,玩过HomeAssistant、ESPHome、Tasmota等平台,并使用过ESP8266、ESP32自已DIY设备。通常我只使用现有平台进行配置,很少直接编写硬件代码。非常感谢DigiKey和EEWORLD举办的Follow me活动,让我有机会玩转智能设备的底层代码。
和大部分同学一样,我选择了CircuitPython作为开发平台和语言。社区资源非常丰富,开发门槛较低,调试方便,非常适合像我这样的新手。为了完成活动的要求并实现一个完整的功能,我选择了制作一个日历+天气+番茄钟功能。这个功能可以在日常的工作学习中,运用番茄工作法提高效率。
这是我第一次接触CircuitPython,完成项目的同时也学习了CircuitPython。我的方法是先确定项目目标,即需要实现的最小功能。然后针对每个功能参考官方示例并对相关库用法进行学习,并同时进行代码调试。各功能代码测试通过后,将其合并为最终完整的代码。
项目中的功能点较多,除了活动要求的WIFI配置、网络请求、中文显示、Neopixel LED控制外,还包含了屏幕控制、协程、触摸传感器等的使用。其中的难点是屏幕控制和协程。作为一个CircuitPython新手,目前完成的项目代码仅仅是实现了功能,仍有很大的优化空间。后面还会继续探索实践,欢迎大家一起交流。
项目目标:
完成活动要求
网络数据请求
中文显示
Neopixel控制
实现番茄日历功能:
日期时间显示,并显示农历
天气显示
番茄钟控制功能
可设置番茄时长
显示番茄倒计时
完成后LED提醒并确认
番茄个数统计
实现方法
WIFI连接至互联网
从NTP服务获取日期和时间
从开放日历API获取农历日期
从开放天气API获取当地天气
使用协程函数和共享变量的方法进行数据更新和屏幕显示控制
使用ESP32S3的触摸传感器进行输入控制,执行番茄钟的启动和确认
使用Neopixel LED的不同颜色和状态表示设备的状态,如联网状态、番茄钟运行、确认阶段等
项目成品展示
完成帖子:【得捷电子Follow me第2期】番茄日历钟 - DigiKey得捷技术专区 - 电子工程世界-论坛 (eeworld.com.cn)
活动任务
Follow me 第2期!与得捷电子一起解锁开发板超能力! (eeworld.com.cn)
任务1:控制屏幕显示中文
完成屏幕的控制,并且能显示中文
任务2:网络功能使用
完成网络功能的使用,能够创建热点和连接到WiFi
任务3:控制WS2812B
使用按键控制板载Neopixel LED的显示和颜色切换
任务4:分任务1:日历&时钟
完成一个可通过互联网更新的万年历时钟,并显示当地的天气信息
项目材料
ESP32S3 TFT Overview | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System
环境准备
安装CircuitPython
双击reset键,将.uf2文件复制到FTHRS2BOOT磁盘中,会自动重启并完成安装
使用Mu editor或VSCode+插件连接开发板,进入代码开发,两者都很方便进入REPL.模式进行调试。Mu连接更稳定,VSCode代码编写更强大。
开发流程一般是找相似的官方示例,在REPL模式中进行代码片断调试,测试成功后再写入code.py文件中,进行完整测试。
正常模型下修改了code.py文件内容后,开发板会自动重启载入代码。但如果在REPL模式下,需要手动重置开发板,除了按板子上的RST按钮外,还可以使用Ctrl+D或执行以下代码来重置。
import microcontroller;
microcontroller.reset();
安装Lib
代码中使用的库,一部分已经内置,可以直接import。否则就需要手动安装,官方提供的Lib链接为Libraries (circuitpython.org) 或Adafruit_CircuitPython_Bundle/libraries/drivers at main · adafruit/Adafruit_CircuitPython_Bundle (github.com),可进入下载与系统版本对应的包进行安装,如 adafruit-circuitpython-bundle-8.x-mpy-20231010.zip 。
安装方法很简单,解压后在lib目录中找到相应的.mpy文件,并拷贝至 CIRCUITPY 磁盘中lib目录下即可。如果需要源码,也可下载原始py文件。
功能点实现方法
中文字体显示(任务1)
官方示例:Introduction — Adafruit Bitmap_Font Library 1.0 documentation (circuitpython.org)
需要额外Lib:adafruit_display_text、adafruit_bitmap_font
显示中文的关键点在于找到中文字体,参考网友的代码使用了这里的开源字体:carrothu-cn/chinese-bitmap-fonts (github.com)。因为开发板空间限制,选择了大小合适的wenquanyi_13px.pcf,保存在 CIRCUITPY 磁盘的font目录下。
核心代码
import board
import displayio
import terminalio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
group = displayio.Group()
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)
display.root_group = group
在项目中的农历和天气信息均可正常显示中文内容,如下图(上中部和左下角位置)
连接WIF(任务2)
这个比较简单,无需额外的lib。但最好需要多次重试连接,避免上电时连接失败。
核心代码
import os
import wifi
import time
# 连接wifi
# 事先在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
state = wifi.radio.connected
failure_count = 0
while not wifi.radio.connected:
try:
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
except OSError as error:
print("Failed to connect, retrying\n", error)
failure_count += 1
time.sleep(5)
项目中在不同的联网状态下可进行不同的展示,如未联网时显示如下(左下角提醒未连WIFI)
控制WS2812B(任务3)
官方示例:Adafruit CircuitPython NeoPixel — Adafruit CircuitPython NeoPixel Library 1.0 documentation
需要额外Lib:neopixel、adafruit_led_animation
NeoPixel LED的使用方法非常灵活,官方有很多特效库,可完成各种效果。项目中使用不同的颜色指示当前开发板状态,如番茄钟计时显示为红色常量,番茄钟倒计时确认中显示为青蓝色闪烁等等。
核心代码
import board
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)
# 红色常亮
pixels.fill((255, 0, 0))
pixels.show()
# 显示为青蓝色闪烁
blink = Blink(pixels, 0.5, color.CYAN)
blink.animate()
项目中在番茄确认时,LED灯为表蓝色闪烁,以便提醒用户进行番茄确认。
获取日期时间(任务4)
官方示例:Adafruit_Learning_System_Guides/Raspberry_Pi_Azure_IoT_Hub_Dashboard/qtPyEsp32S2_co2/code.py at 6e9b42bf65613e926cbcd5d2e3ddf19f7c4cbb54 · adafruit/Adafruit_Learning_System_Guides (github.com)
需要额外Lib:adafruit_ntp、adafruit_requests
因为开发板没有连接时钟模块,需要从NTP服务器获取时间,官方有现成的库可调用,为了稳定可使用国内的NTP服务器。NTP返回的数据中有日期和时间,为了方便获取农历日期,采用API的方式来获取,与天气API类似,需要注册账号使用API KEY来访问。
核心代码
import os
import wifi
import time
import ssl
import socketpool
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_requests库
import adafruit_requests as requests
tz_offset=os.getenv("TIME_ZONE")
ntp = None
while True:
# 使用前需要先联网
# 获取当前ntp时间,
if not ntp:
pool = socketpool.SocketPool(wifi.radio)
try:
ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset)
except OSError as error:
print("NTP failed, retrying\n", error)
pass
# 获取阴历
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
lunar_json = lunar_response.json()
if lunar_json['code'] == 200:
lunar_data = lunar_json['data']
week_day = lunar_data['week_no']
week_name = lunar_data['week_name']
lunar_year_ganzhi = lunar_data['week_name']
lunar_month_chinese = lunar_data['lunar_month_chinese']
lunar_day_chinese = lunar_data['lunar_day_chinese']
print(f"当前农历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
print(f"当前星期:{week_name}")
# 仅在第二天时再更新
dt = ntp.datetime
print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
time.sleep(wait_seconds)
项目中屏幕顶部依次为日期、农历、时间显示,如下图
天气信息获取(任务4)
官方示例:EEWORLDLINKTK4,需要先注册账号,获得API KEY才能使用。
密码类信息可统一保存在根目录下的settings.toml文件中,格式为<变量名> = <值>,然后在代码中使用os.getenv("<变量名>")来获取,安全方便。
为了方便修改城市信息,也将天气城市设置在toml文件,或者通过当前网络的外网IP获取到当地城市,可访问相关API获取到,坐标可参考代码。
核心代码:
import os
import wifi
import time
import ssl
import socketpool
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 获取天气信息
# 获取天气API KEY
weather_api_key = os.getenv("WEATHER_API_KEY")
# 获取天气城市:从配置文件中读取城市设置
weather_city = os.getenv("WEATHER_CITY")
while True:
# 创建https
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
# 如果读取不到配置的城市,则获取当前IP城市
if not weather_city:
# 获取当前外网IP和城市
ip_city_response = https.get("https://myip.ipip.net/json")
ip_city_json = ip_city_response.json()
if ip_city_json["ret"] == "ok":
weather_city = ip_city_json['data']['location'][2]
print(f"当前IP城市:{weather_city}")
# 当前天气
weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
weather_now_response = https.get(weather_now_url)
weather_json = weather_now_response.json()
if weather_json["results"]:
now_weather = weather_json["results"][0]["now"]["text"]
now_temperature = weather_json["results"][0]["now"]["temperature"]
print(f"当前天气:{now_weather},气温:{now_temperature}℃")
time.sleep(300) #5分钟更新一次
项目中天气和气温信息展示在屏幕底部,如下图
屏幕显示控制
官网示例链接:Introduction | CircuitPython Display Support Using displayio | Adafruit Learning System
需要额外Lib:board、displayio、terminalio
屏幕控制分为两个部分,第一部分为画为屏幕的展示布局,可使用官方提供的displayio库来实现;第二部分为控制显示内容,基本思路就是在每次循环中更新各元素的展示内容,如文字、数字、图片等。
该部分涉及到项目中的wifi连接、网络时间、天气等处理,而且需要同步控制屏幕显示。代码行数较多,因为番倒计时每秒更新屏幕内容,目前的代码执行时屏幕显示不是很顺畅,偶尔会卡住1-2秒,代码仍需要优化。
屏幕布局中各组件的从属关系如下图所示,简单说就是Display包含Group,Group中包含Bitmap。
屏幕坐标从左上角开始
bitmap_label中anchor_point为元素的参照位置点,值含义如下,如放置在屏幕左上角的元素可设置为(0,0),右下角的可设置为(1,1),具体可参考核心代码:
核心代码:
import board
import displayio
import terminalio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
group = displayio.Group()
# 设置日期显示(左上角)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)
# 设置时间显示(右上角)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)
# 设置天气显示(左下角)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)
# 设置气温显示(下中部)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (display.width // 2, display.height - 5)
temperature_label.text = "0℃"
group.append(temperature_label)
# 设置番茄钟倒计时显示(中间)
pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
# 显示位置
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)
# 设置倒番茄钟统计显示(右下角)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# 显示位置
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)
os.getenv("POMODORO_TIMEOUT")
# 定义main_group
main_group = displayio.Group()
main_group.append(group)
# 只能有一个main_group
display.root_group = main_group
# 更新屏幕内容,以下为示例代码,需要环境变量,无法直接执行
while True:
timeout = 0.1
if internet.state and ntp_datetime.ntp:
dt = ntp_datetime.ntp.datetime
text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
# 设置日期文本
date_label.text = f"{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday}"
# 设置时间文本
time_label.text = f"{dt.tm_hour}:{dt.tm_min}"
# 设置农历文本
lunar_label.text = f"{lunar_month_chinese}月{lunar_day_chinese}"
if weather.text:
# 设置天气文本
weather_label.text = f"{weather.text}"
# 设置气温文本
temperature_label.text = f"{weather.temperature}℃"
else:
text = f'请先连接WIFI'
weather_label.text = text
await asyncio.sleep(timeout)
效果图
多任务并发控制
官方示例:Concurrent Tasks | Cooperative Multitasking in CircuitPython with asyncio | Adafruit Learning System
需要额外Lib:asyncio、adafruit_ticks
项目中需要控制不同的网络请求,并需要在屏幕展示不同的内容,在同一个循环中实现会非常复杂。需要使用并发控制将各事件进行解耦,方便代码编写。即Python中协程异步IO(asyncio),参考官方的文档,该实现可总结为以下几个步骤
在普通函数前添加async关键字,将其转为协程函数
使用await asyncio.sleep代替time.sleep进行延迟,释放对控制器的独占
定义好各协程函数后,使用 asyncio.create_task(some_coroutine(arg1, arg2, ...))对其进行调用
使用 await asyncio.gather(task1, task2, ...)等待各协程任务完成
再次强调不要忘记使用 await
核心代码:这里是官方示例,实际代码可参考下面完整的项目源码
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import asyncio
import board
import digitalio
import keypad
class Interval:
"""Simple class to hold an interval value. Use .value to to read or write."""
def __init__(self, initial_interval):
self.value = initial_interval
async def monitor_interval_buttons(pin_slower, pin_faster, interval):
"""Monitor two buttons: one lengthens the interval, the other shortens it.
Change interval.value as appropriate.
"""
# Assume buttons are active low.
with keypad.Keys(
(pin_slower, pin_faster), value_when_pressed=False, pull=True
) as keys:
while True:
key_event = keys.events.get()
if key_event and key_event.pressed:
if key_event.key_number == 0:
# Lengthen the interval.
interval.value += 0.1
else:
# Shorten the interval.
interval.value = max(0.1, interval.value - 0.1)
print("interval is now", interval.value)
# Let another task run.
await asyncio.sleep(0)
async def blink(pin, interval):
"""Blink the given pin forever.
The blinking rate is controlled by the supplied Interval object.
"""
with digitalio.DigitalInOut(pin) as led:
led.switch_to_output()
while True:
led.value = not led.value
await asyncio.sleep(interval.value)
async def main():
interval1 = Interval(0.5)
interval2 = Interval(1.0)
led1_task = asyncio.create_task(blink(board.D1, interval1))
led2_task = asyncio.create_task(blink(board.D2, interval2))
interval1_task = asyncio.create_task(
monitor_interval_buttons(board.D3, board.D4, interval1)
)
interval2_task = asyncio.create_task(
monitor_interval_buttons(board.D5, board.D6, interval2)
)
await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)
asyncio.run(main())
ESP32S3触摸传感器使用
官方示例:CircuitPython Cap Touch | CircuitPython Essentials | Adafruit Learning System
ESP32S3中内置了多个触摸传感器,可以无需外接设备即可实现输入控制,本项目中用于番茄钟的开始触发和确认功能。使用时可将支持触摸传感器的GPIO接一根导线或直接触摸PCB板。具体哪个GPIO支持触摸,可通过执行以下官方代码给出清单。
Capacitive Touch | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System
执行结果为:
# 只显示支持触摸的GPIO
Touch on: A4
Touch on: A5
Touch on: TX
Touch on: D10
Touch on: D11
Touch on: D12
Touch on: LED
Touch on: RX
Touch on: D5
Touch on: D6
Touch on: D9
测试结果为11个,但经测试并不是每个都能使用,如其中的A4、A5一直都是触发状态,原因不明,有了解的小伙伴欢迎评论。项目中使用了D10,位于屏幕上侧中部,经使用测试,触摸反应很灵敏。
核心代码
import board
import touchio
import time
touch = touchio.TouchIn(board.D10)
while True:
if touch.value:
# 按钮开始番茄计时
print("start")
time.sleep(0.1)
多设备MQTT通信
本次活动一起下单的还有其他两块ESP32的单片机,用来模拟多设备通信。
本实验使用MQTT来实现多设备间通信,具体设备清单如下:
设备1:本次项目的主控,Adafruit ESP32-S3 TFT Feather,用来模拟中控面板,用来显示灯的开关状态。
设备2:另一款ESP32S3的开发板,AtomS3 Lite ESP32S3,用来模拟智能灯
设备3:ESP32C3模块,esp32-c3-wroom-02,经过简单的外围电路焊接,可用来模拟智能开关
效果展示:
设备1:显示灯的状态,右下角on和off
设备2:使用其背面的LED来模拟智能灯,如下图
设备3:模拟智能开关,模组添加了轻触按键,用来模拟开关。
简单说下模组的接线。因为这款是ESP32C3模组,无法直接使用,这里焊接了必要的外围和开关,通过USB2TTL模块接入,如下图。除了3V3、GND、TXD、RXD直接接入USB2TTL外,EN、IO2、IO8需要接高电平,这里各串联3个10K电阻接到3V3,按键开关两端接在IO9和GND,加电时按下可进行固件刷写,否则正常启动。(条件有限,仅用来做实验)
环境搭建
在本地搭建MQTT服务器,用的是mosquitto,流程比较简单,教程也比较多,不再赘述。
MQTT的设计如下:
定义了两个mqtt的topic:cmd/d2/led1和state/d2/led1,其中cmd为发送开灯命令,state为灯的状态更新。d2为设备,本实验中只涉及到设备2,定义为d2,led1为设备中的具体模块,本例中为led1。
其中设备1订阅state/d2/led1主题,当接收的消息为1时表示灯已开,为0时表示灯已关,对应屏幕上显示on和off。
设备2订阅cmd/d2/led1主题,当接收的消息为1时表示开灯操作(0为关机操作,2为切换操作),此时将本机的led1打开,并同时发送消息到主题state/d2/led1,消息内容为灯的具体状态0或1。同时也可通过设备本机的按键来进行开关灯操作,同样也发送相应的mqtt消息。
设备3监听本机按键操作,当按键按下时发送消息到主题cmd/d2/led1,消息内容为2,表示切换开灯操作。
核心代码
设备1和设备2都是ESP32S3芯片,直接使用CircuitPython环境;设备3为ESP32C3,使用了microPython环境。
设备1(中控端,负责显示灯状态)
# 此处为核心代码,完整代码见项目源码
import displayio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# MQTT
class MqttClient:
def __init__(self):
self.mqtt = None
self.is_connected = False
self.host = os.getenv("MQTT_HOST")
self.port = os.getenv("MQTT_PORT")
self.user = os.getenv("MQTT_USER")
self.password = os.getenv("MQTT_PASSWORD")
self.client_id = os.getenv("MQTT_CLIENT_ID")
self.topic = os.getenv("MQTT_TOPIC")
self.led = "led1"
self.cmd_topic = f"cmd/{self.topic}/{self.led}"
self.state_topic = f"stat/{self.topic}/{self.led}"
self.led = None
self.wait = 0.1
def connect(self, mqtt_cli, userdata, flags, rc):
print(f"Connected to MQTT {self.host}")
self.is_connected = True
mqtt_cli.subscribe(self.state_topic)
def disconnect(self, mqtt_cli, userdata, rc):
print(f"Disconnected from MQTT {self.host}")
self.is_connected = False
def message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 0-关,1-关,2-切换
if topic == self.state_topic:
if message == '0':
self.led = False
elif message == '1':
self.led = True
async def mqtt_connect(mqtt_client):
while True:
if not mqtt_client.mqtt:
print("Set up a MiniMQTT Client")
pool = socketpool.SocketPool(wifi.radio)
mqtt = MQTT.MQTT(
broker=mqtt_client.host,
username=mqtt_client.user,
password=mqtt_client.password,
socket_pool=pool,
client_id=mqtt_client.client_id
)
mqtt.on_connect = mqtt_client.connect
mqtt.on_disconnect = mqtt_client.disconnect
mqtt.on_message = mqtt_client.message
mqtt_client.mqtt = mqtt
mqtt.connect()
try:
mqtt_client.mqtt.loop()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
mqtt_client.mqtt.reconnect()
continue
await asyncio.sleep(mqtt_client.wait)
# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):
display = board.DISPLAY
group = displayio.Group()
# 设置MQTT状态(下中右部)
mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
mqtt_label.anchor_point = (0.5, 1.0)
mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
mqtt_label.text = "-"
group.append(mqtt_label)
# 其他代码省略,完整代码见项目源码
# 创建根group
main_group = displayio.Group()
main_group.append(group)
# 展示
display.root_group = main_group
while True:
# MQTT LED状态
if mqtt_client.mqtt.is_connected:
if mqtt_client.led:
# 亮灯显示为红色
led_text = "on"
mqtt_label.color=0xFF0000
else:
led_text = "off"
mqtt_label.color=0xFFFFFF
if mqtt_label.text != led_text:
mqtt_label.text = led_text
# 其他代码省略,完整代码见项目源码
await asyncio.sleep(pomodoro.wait)
设备2(智能灯)
import board
import os
import wifi
import socketpool
from digitalio import DigitalInOut, Direction, Pull
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# 需要导入neopixel库
import neopixel
# 需要导入asyncio、adafruit_ticks库
import asyncio
# MQTT
class MqttClient:
def __init__(self):
self.mqtt = None
self.is_connected = False
self.host = os.getenv("MQTT_HOST")
self.port = os.getenv("MQTT_PORT")
self.user = os.getenv("MQTT_USER")
self.password = os.getenv("MQTT_PASSWORD")
self.client_id = os.getenv("MQTT_CLIENT_ID")
self.topic = os.getenv("MQTT_TOPIC")
self.led = "led1"
self.cmd_topic = f"cmd/{self.topic}/{self.led}"
self.state_topic = f"stat/{self.topic}/{self.led}"
self.led = None
self.wait = 0.1
def connect(self, mqtt_cli, userdata, flags, rc):
print(f"Connected to MQTT {self.host}")
self.is_connected = True
mqtt_cli.subscribe(self.cmd_topic)
def disconnect(self, mqtt_cli, userdata, rc):
print(f"Disconnected from MQTT {self.host}")
self.is_connected = False
def message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 0-关,1-关,2-切换
print(f"topic: {topic}")
if topic == self.cmd_topic:
print(f"message: {message}")
if self.led:
if message == "0":
self.led.power = False
elif message == "1":
self.led.power = True
elif message == "2":
self.led.power = not self.led.power
# LED
class LED:
def __init__(self):
self.pixels = None
self.power = False
self.wait = 0.05
# 连接wifi
async def wifi_connect(internet):
# 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
failure_count = 0
while True:
internet.state = wifi.radio.connected
if not wifi.radio.connected:
try:
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
"CIRCUITPY_WIFI_PASSWORD"))
except OSError as error:
print("Failed to connect, retrying\n", error)
failure_count += 1
await asyncio.sleep(internet.wait)
# 如果wifi没有正常连接,则切换为ap模式
# if not wifi_state:
# wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')
async def mqtt_connect(mqtt_client):
while True:
if not mqtt_client.mqtt:
print("Set up a MiniMQTT Client")
pool = socketpool.SocketPool(wifi.radio)
mqtt = MQTT.MQTT(
broker=mqtt_client.host,
username=mqtt_client.user,
password=mqtt_client.password,
socket_pool=pool,
client_id=mqtt_client.client_id
)
mqtt.on_connect = mqtt_client.connect
mqtt.on_disconnect = mqtt_client.disconnect
mqtt.on_message = mqtt_client.message
mqtt_client.mqtt = mqtt
mqtt.connect()
try:
mqtt_client.mqtt.loop()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
mqtt_client.mqtt.reconnect()
continue
await asyncio.sleep(mqtt_client.wait)
# 按钮点击检测
async def monitor_buttons(led):
button = DigitalInOut(board.BTN)
while True:
if not button.value:
# 按钮按下,防抖
# await asyncio.sleep(led.wait)
# if not button.value:
print("button pressed")
led.power = not led.power
await asyncio.sleep(led.wait)
# led控制
async def pixels_led(led, mqtt_client):
if not led.pixels:
# Neopixel LED定义
pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0, auto_write=True)
pixels.fill((255, 255, 255))
led.pixels = pixels
last_power = None
mqtt_client.led = led
while True:
if led.power:
pixels.brightness = 1
else:
pixels.brightness = 0
if last_power != led.power:
print("led toggle")
last_power = led.power
print(mqtt_client)
if mqtt_client.is_connected:
print(mqtt_client.state_topic)
mqtt_client.mqtt.publish(mqtt_client.state_topic, 1 if led.power else 0)
await asyncio.sleep(led.wait)
async def main():
# 共享变量设置
mqtt_client = MqttClient()
led = LED()
# 协程函数定义
mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
monitor_buttons_task = asyncio.create_task(monitor_buttons(led))
pixels_led_task = asyncio.create_task(pixels_led(led,mqtt_client))
# 启动协程
await asyncio.gather(mqtt_task, monitor_buttons_task, pixels_led_task)
asyncio.run(main())
设备3(智能开关)
import ujson
import network
from umqtt.simple import MQTTClient
from machine import Pin
from time import sleep_ms
# 保存 Wi-Fi MQTT 配置到配置文件
def save_wifi_config(ssid, password, mqtt_host, mqtt_port, mqtt_user, mqtt_password):
config = {
'wifi_ssid': ssid,
'wifi_password': password,
'mqtt_host': mqtt_host,
'mqtt_port': mqtt_port,
'mqtt_user': mqtt_user,
'mqtt_password': mqtt_password
}
with open('config.json', 'w') as f:
ujson.dump(config, f)
# 从配置文件中读取 Wi-Fi 配置
def load_wifi_config():
try:
with open('config.json', 'r') as f:
config = ujson.load(f)
return config['wifi_ssid'], config['wifi_password']
except OSError:
return None, None
# 从配置文件中读取 MQTT 配置
def load_mqtt_config():
try:
with open('config.json', 'r') as f:
config = ujson.load(f)
return config['mqtt_host'], config['mqtt_port'], config['mqtt_user'], config['mqtt_password']
except OSError:
return None, None, None, None
# 示例用法:保存 Wi-Fi 配置到配置文件
# wifi_ssid = 'your_wifi_ssid'
# wifi_password = 'your_wifi_password'
# mqtt_host = ''
# mqtt_port = ''
# mqtt_user = ''
# mqtt_password = ''
# save_wifi_config(wifi_ssid, wifi_password, mqtt_host, mqtt_port, mqtt_user, mqtt_password)
# 示例用法:从配置文件中读取 Wi-Fi 配置
# saved_wifi_ssid, saved_wifi_password = load_wifi_config()
# 设置 Wi-Fi 连接
wifi_ssid, wifi_password = load_wifi_config()
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(wifi_ssid, wifi_password)
# 设置 MQTT 客户端
mqtt_broker, mqtt_port, mqtt_username, mqtt_password = load_mqtt_config()
mqtt_client_id = 'd2'
mqtt_topic = 'cmd/d1/led1'
mqtt_client = MQTTClient(mqtt_client_id, mqtt_broker, port=mqtt_port, user=mqtt_username, password=mqtt_password)
mqtt_client.connect()
# 设置按键引脚
button_pin = Pin(9, Pin.IN, Pin.PULL_UP)
# 按键防抖延迟时间(毫秒)
debounce_delay = 100
# 检测按键状态并处理按下事件
def handle_button_press():
if not button_pin.value():
sleep_ms(debounce_delay)
if not button_pin.value():
# 当按键按下时,发送 MQTT 消息
mqtt_client.publish(mqtt_topic, '2')
while True:
handle_button_press()
项目最终完整代码
终于完成了🧗♂️,完整代码如下
"""
番茄日历钟
ltpop@163.com
20231126
"""
# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool
# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_imageload库
import adafruit_imageload
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# 当前设备联网状态
class Internet:
def __init__(self):
self.state = False
self.wait = 30.0
# MQTT
class MqttClient:
def __init__(self):
self.mqtt = None
self.is_connected = False
self.host = os.getenv("MQTT_HOST")
self.port = os.getenv("MQTT_PORT")
self.user = os.getenv("MQTT_USER")
self.password = os.getenv("MQTT_PASSWORD")
self.client_id = os.getenv("MQTT_CLIENT_ID")
self.topic = os.getenv("MQTT_TOPIC")
self.led = "led1"
self.cmd_topic = f"cmd/{self.topic}/{self.led}"
self.state_topic = f"stat/{self.topic}/{self.led}"
self.led = None
self.wait = 0.1
def connect(self, mqtt_cli, userdata, flags, rc):
print(f"Connected to MQTT {self.host}")
self.is_connected = True
mqtt_cli.subscribe(self.state_topic)
def disconnect(self, mqtt_cli, userdata, rc):
print(f"Disconnected from MQTT {self.host}")
self.is_connected = False
def message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 0-关,1-关,2-切换
if topic == self.state_topic:
if message == '0':
self.led = False
elif message == '1':
self.led = True
# 当前天气状态
class Weather:
def __init__(self):
self.city = ''
self.text = None
self.temperature = 20.0
self.wait = 3600.0
# 当前日期时间状态
class NTPDatetime:
def __init__(self):
self.datetime = None
self.ntp = None
self.ntp_sync = None
self.next_lunar_request_time = None
self.weekday = None
self.week_name = None
self.lunar_year_ganzhi = None
self.lunar_month_chinese = None
self.lunar_day_chinese = None
self.wait = 3600.0
self.retry = 10
# 番茄统计
class Pomodoro:
def __init__(self):
self.count = 0
self.run = False
self.end = 0
self.confirming = False
self.confirmed = False
self.time = os.getenv("POMODORO_TIME")
self.timeout = os.getenv("POMODORO_TIMEOUT")
self.wait = 0.2
# 连接wifi
async def wifi_connect(internet):
# 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
failure_count = 0
while True:
internet.state = wifi.radio.connected
if not wifi.radio.connected:
try:
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
"CIRCUITPY_WIFI_PASSWORD"))
except OSError as error:
print("Failed to connect, retrying\n", error)
failure_count += 1
await asyncio.sleep(internet.wait)
# 如果wifi没有正常连接,则切换为ap模式
# if not wifi_state:
# wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')
async def mqtt_connect(mqtt_client):
while True:
if not mqtt_client.mqtt:
print("Set up a MiniMQTT Client")
pool = socketpool.SocketPool(wifi.radio)
mqtt = MQTT.MQTT(
broker=mqtt_client.host,
username=mqtt_client.user,
password=mqtt_client.password,
socket_pool=pool,
client_id=mqtt_client.client_id
)
mqtt.on_connect = mqtt_client.connect
mqtt.on_disconnect = mqtt_client.disconnect
mqtt.on_message = mqtt_client.message
mqtt_client.mqtt = mqtt
mqtt.connect()
try:
mqtt_client.mqtt.loop()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
mqtt_client.mqtt.reconnect()
continue
await asyncio.sleep(mqtt_client.wait)
# 获取时间
async def fetch_time(internet, ntp_datetime):
tz_offset = os.getenv("TIME_ZONE")
the_rtc = rtc.RTC()
ntp = None
while True:
if internet.state:
ntp_ok = False
lunar_ok = False
pool = socketpool.SocketPool(wifi.radio)
# 获取当前ntp时间,
if not ntp:
try:
ntp = adafruit_ntp.NTP(
pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
# 更新系统时间
the_rtc.datetime = ntp.datetime
ntp_datetime.ntp = ntp
ntp_ok = True
ntp_datetime.ntp_sync = True
except OSError as error:
print("NTP failed, retrying\n", error)
ntp = None
lunar_need_update = False
if not ntp_datetime.next_lunar_request_time:
lunar_need_update = True
elif time.time() > ntp_datetime.next_lunar_request_time:
# 超过下一次请求时间
lunar_need_update = True
if lunar_need_update:
# 获取阴历
try:
https = requests.Session(pool, ssl.create_default_context())
lunar_response = https.get(
f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
except RuntimeError as error:
print("Lunar request failed, retrying\n", error)
lunar_response = None
if lunar_response:
lunar_json = lunar_response.json()
if lunar_json['code'] == 200:
lunar_data = lunar_json['data']
# print(lunar_data)
week_day = lunar_data['week_no']
if week_day:
ntp_datetime.week_day = week_day
week_name = lunar_data['week_name']
if week_name:
ntp_datetime.week_name = week_name
lunar_year_ganzhi = lunar_data['ganzhi_year']
if lunar_year_ganzhi:
ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi
lunar_month_chinese = lunar_data['lunar_month_chinese']
if lunar_month_chinese:
ntp_datetime.lunar_month_chinese = lunar_month_chinese
lunar_day_chinese = lunar_data['lunar_day_chinese']
if lunar_day_chinese:
ntp_datetime.lunar_day_chinese = lunar_day_chinese
lunar_ok = True
print(
f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
print(f"当前星期:{week_name}")
# 设置下一次更新时间为第二天0点
current_time = time.localtime()
ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
# 仅在第二天时再更新
if not ntp_ok or not lunar_ok:
await asyncio.sleep(ntp_datetime.retry)
else:
# dt = time.localtime()
# print(
# f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
# wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
await asyncio.sleep(ntp_datetime.wait)
else:
await asyncio.sleep(internet.wait)
# 获取天气信息
async def fetch_weather(internet, weather):
# 获取天气API KEY
weather_api_key = os.getenv("WEATHER_API_KEY")
# 获取天气城市:从配置文件中读取城市设置
weather_city = os.getenv("WEATHER_CITY")
while True:
if internet.state:
# 天气信息
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
# 如果读取不到配置的城市,则获取当前IP城市
if not weather_city:
# 获取当前外网IP和城市
try:
ip_city_response = https.get("https://myip.ipip.net/json")
except RuntimeError as error:
print("IP city request failed, retrying\n", error)
if ip_city_response:
ip_city_json = ip_city_response.json()
if ip_city_json["ret"] == "ok":
weather_city = ip_city_json['data']['location'][2]
print(f"当前IP城市:{weather_city}")
weather.city = weather_city
# 当前天气
weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
try:
weather_now_response = https.get(weather_now_url)
except RuntimeError as error:
print("Weather request failed, retrying\n", error)
weather_now_response = None
if weather_now_response:
weather_json = weather_now_response.json()
if weather_json["results"]:
now_weather = weather_json["results"][0]["now"]["text"]
now_temperature = weather_json["results"][0]["now"]["temperature"]
weather.text = now_weather
weather.temperature = now_temperature
print(f"当前天气:{now_weather},气温:{now_temperature}℃")
# 未来天气预报
# weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
# weather_response = https.get(weather_daily_url)
# weather_json = weather_response.json()
# if weather_json["results"]:
# today_weather = weather_json["results"][0]["daily"][0]["text_day"]
# today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
# today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
# today_humidity = weather_json["results"][0]["daily"][0]["humidity"]
# print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")
await asyncio.sleep(weather.wait)
else:
await asyncio.sleep(internet.wait)
# led显示
async def pixels_led(internet, pomodoro):
# Neopixel LED控制
pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(
pixel_pin, pixel_num, brightness=0.05, auto_write=False)
rainbow = Rainbow(pixels, speed=0.1, period=2)
blink = Blink(pixels, 0.5, color.GREEN)
animations = AnimationSequence(
rainbow,
advance_interval=5,
auto_clear=True,
)
while True:
# 番茄进行中,显示为红色常亮
if pomodoro.run:
# 番茄等待确认中,显示为绿色闪烁
if pomodoro.confirming:
blink.speed = 0.5
blink.color = color.GREEN
blink.animate()
else:
pixels.fill((255, 0, 0))
pixels.show()
elif not internet.state:
blink.speed = 2
blink.color = color.RED
blink.animate()
else:
# 否则显示为彩虹色
animations.animate()
await asyncio.sleep(pomodoro.wait)
# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
group = displayio.Group()
# 设置日期显示(左上角)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)
# 设置时间显示(右上角)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "九月廿八"
group.append(lunar_label)
# 设置天气显示(左下角)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)
# 设置气温显示(下中部)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (
display.width // 2, display.height - 5)
temperature_label.text = "5℃"
group.append(temperature_label)
# 设置MQTT状态(下中右部)
mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
mqtt_label.anchor_point = (0.5, 1.0)
mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
mqtt_label.text = "-"
group.append(mqtt_label)
# 设置番茄钟倒计时显示(中间)
pomodoro_label = bitmap_label.Label(
terminalio.FONT, color=0xFF00FF, scale=7)
# 显示位置
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (
display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)
# 设置倒番茄钟统计显示(右下角)
# with open("img/tomato.bmp", "rb") as f:
# image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
# sprite = displayio.TileGrid(image, pixel_shader=palette)
# group.append(sprite)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# 显示位置
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)
# 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
os.getenv("POMODORO_TIMEOUT")
# 创建根group
main_group = displayio.Group()
main_group.append(group)
# 展示
display.root_group = main_group
while True:
if ntp_datetime.ntp_sync:
dt = time.localtime()
# text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
# 设置日期文本
date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
if date_label.text != date_text:
date_label.text = date_text
# 设置时间文本
time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
if time_label.text != time_text:
time_label.text = time_text
# 设置农历文本
if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
if lunar_label.text != lunar_text:
lunar_label.text = lunar_text
if weather.text:
# 设置天气文本
if weather_label.text != weather.text:
weather_label.text = weather.text
# 设置气温文本
temperature_text = f"{weather.temperature}℃"
if temperature_label.text != temperature_text:
temperature_label.text = temperature_text
else:
weather_text = f'请先连接WIFI'
if weather_label.text != weather_text:
weather_label.text = weather_text
# MQTT LED状态
if mqtt_client.mqtt.is_connected:
if mqtt_client.led:
# 亮灯显示为红色
led_text = "on"
mqtt_label.color=0xFF0000
else:
led_text = "off"
mqtt_label.color=0xFFFFFF
if mqtt_label.text != led_text:
mqtt_label.text = led_text
# 更新番茄钟
pomodoro_label.color = 0x00FFFF
count_text = f"{pomodoro.count}"
if count_label.text != count_text:
count_label.text = count_text
if pomodoro.run:
left_seconds = pomodoro.end - time.monotonic()
if left_seconds >= 0:
minute = int(left_seconds / 60)
second = int(left_seconds % 60)
# 倒计时每秒更新一次
pomodoro_text = f"{minute:02d}:{second:02d}"
if pomodoro_label.text != pomodoro_text:
pomodoro_label.text = pomodoro_text
else:
# 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
timeout_seconds = abs(left_seconds)
if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
pomodoro.confirming = True
weather_label.text = f'番茄等待确认'
# 超时时显示为红色
pomodoro_label.color = 0xFF0000
pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
else:
pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
pomodoro.confirming = False
pomodoro.confirmed = False
pomodoro.run = False
await asyncio.sleep(pomodoro.wait)
async def monitor_touch_buttons(pomodoro):
touch = touchio.TouchIn(board.D10)
while True:
if touch.value:
await asyncio.sleep(0.1)
if touch.value:
# 按钮开始番茄计时
if not pomodoro.run:
pomodoro.run = True
pomodoro.end = time.monotonic() + pomodoro.time
# 番茄确认状态时,将番茄数加1
elif pomodoro.confirming:
pomodoro.confirmed = True
pomodoro.count += 1
await asyncio.sleep(0.1)
async def main():
# 共享变量设置
internet = Internet()
mqtt_client = MqttClient()
ntp_datetime = NTPDatetime()
weather = Weather()
pomodoro = Pomodoro()
# 协程函数定义
internet_task = asyncio.create_task(wifi_connect(internet))
mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
lcd_display_task = asyncio.create_task(
lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
monitor_touch_buttons_task = asyncio.create_task(
monitor_touch_buttons(pomodoro))
# 启动协程
await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)
asyncio.run(main())
附settings.toml内容
CIRCUITPY_WIFI_SSID = "<根据实际情况修改>"
CIRCUITPY_WIFI_PASSWORD = "<根据实际情况修改>"
CIRCUITPY_WEB_API_PASSWORD = "passw0rd" #未使用
CIRCUITPY_WEB_API_PORT = 80 #未使用
TIME_ZONE = 8
# https://www.alapi.cn/api/view/54
LUNAR_API_KEY = "<根据实际情况修改>"
WEATHER_API_KEY = "<根据实际情况修改>"
WEATHER_CITY = "<根据实际情况修改,可以设置为城市中文名或拼音>"
# 一个番茄钟时间秒数
POMODORO_TIME = 900
# 番茄钟完成时,手动确认超时秒数
POMODORO_TIMEOUT = 100
MQTT_HOST = "<根据实际情况修改>"
MQTT_PORT = 1883
MQTT_USER = "<根据实际情况修改>"
MQTT_PASSWORD = "<根据实际情况修改>"
MQTT_TOPIC = "d1"
MQTT_CLIENT_ID = "<根据实际情况修改>"
三、源码下载
源码打包:(非最新版本,更新后的代码版本可以参考帖子的内容)
ESP32S3-TFT-CircuitPython-番茄日历钟源码-嵌入式开发相关资料下载-EEWORLD下载中心
⛳️END
本帖最后由 ltpop 于 2023-11-27 12:02 编辑
最终版本代码(有正确缩进)
"""
番茄日历钟
ltpop@163.com
202311 v26
"""
# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool
# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_imageload库
import adafruit_imageload
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# 当前设备联网状态
class Internet:
def __init__(self):
self.state = False
self.wait = 30.0
# MQTT
class MqttClient:
def __init__(self):
self.mqtt = None
self.is_connected = False
self.host = os.getenv("MQTT_HOST")
self.port = os.getenv("MQTT_PORT")
self.user = os.getenv("MQTT_USER")
self.password = os.getenv("MQTT_PASSWORD")
self.client_id = os.getenv("MQTT_CLIENT_ID")
self.topic = os.getenv("MQTT_TOPIC")
self.led = "led1"
self.cmd_topic = f"cmd/{self.topic}/{self.led}"
self.state_topic = f"stat/{self.topic}/{self.led}"
self.led = None
self.wait = 0.1
def connect(self, mqtt_cli, userdata, flags, rc):
print(f"Connected to MQTT {self.host}")
self.is_connected = True
mqtt_cli.subscribe(self.state_topic)
def disconnect(self, mqtt_cli, userdata, rc):
print(f"Disconnected from MQTT {self.host}")
self.is_connected = False
def message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 0-关,1-关,2-切换
if topic == self.state_topic:
if message == '0':
self.led = False
elif message == '1':
self.led = True
# 当前天气状态
class Weather:
def __init__(self):
self.city = ''
self.text = None
self.temperature = 20.0
self.wait = 3600.0
# 当前日期时间状态
class NTPDatetime:
def __init__(self):
self.datetime = None
self.ntp = None
self.ntp_sync = None
self.next_lunar_request_time = None
self.weekday = None
self.week_name = None
self.lunar_year_ganzhi = None
self.lunar_month_chinese = None
self.lunar_day_chinese = None
self.wait = 3600.0
self.retry = 10
# 番茄统计
class Pomodoro:
def __init__(self):
self.count = 0
self.run = False
self.end = 0
self.confirming = False
self.confirmed = False
self.time = os.getenv("POMODORO_TIME")
self.timeout = os.getenv("POMODORO_TIMEOUT")
self.wait = 0.2
# 连接wifi
async def wifi_connect(internet):
# 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
failure_count = 0
while True:
internet.state = wifi.radio.connected
if not wifi.radio.connected:
try:
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
"CIRCUITPY_WIFI_PASSWORD"))
except OSError as error:
print("Failed to connect, retrying\n", error)
failure_count += 1
await asyncio.sleep(internet.wait)
# 如果wifi没有正常连接,则切换为ap模式
# if not wifi_state:
# wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')
async def mqtt_connect(mqtt_client):
while True:
if not mqtt_client.mqtt:
print("Set up a MiniMQTT Client")
pool = socketpool.SocketPool(wifi.radio)
mqtt = MQTT.MQTT(
broker=mqtt_client.host,
username=mqtt_client.user,
password=mqtt_client.password,
socket_pool=pool,
client_id=mqtt_client.client_id
)
mqtt.on_connect = mqtt_client.connect
mqtt.on_disconnect = mqtt_client.disconnect
mqtt.on_message = mqtt_client.message
mqtt_client.mqtt = mqtt
mqtt.connect()
try:
mqtt_client.mqtt.loop()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
mqtt_client.mqtt.reconnect()
continue
await asyncio.sleep(mqtt_client.wait)
# 获取时间
async def fetch_time(internet, ntp_datetime):
tz_offset = os.getenv("TIME_ZONE")
the_rtc = rtc.RTC()
ntp = None
while True:
if internet.state:
ntp_ok = False
lunar_ok = False
pool = socketpool.SocketPool(wifi.radio)
# 获取当前ntp时间,
if not ntp:
try:
ntp = adafruit_ntp.NTP(
pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
# 更新系统时间
the_rtc.datetime = ntp.datetime
ntp_datetime.ntp = ntp
ntp_ok = True
ntp_datetime.ntp_sync = True
except OSError as error:
print("NTP failed, retrying\n", error)
ntp = None
lunar_need_update = False
if not ntp_datetime.next_lunar_request_time:
lunar_need_update = True
elif time.time() > ntp_datetime.next_lunar_request_time:
# 超过下一次请求时间
lunar_need_update = True
if lunar_need_update:
# 获取阴历
try:
https = requests.Session(pool, ssl.create_default_context())
lunar_response = https.get(
f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
except RuntimeError as error:
print("Lunar request failed, retrying\n", error)
lunar_response = None
if lunar_response:
lunar_json = lunar_response.json()
if lunar_json['code'] == 200:
lunar_data = lunar_json['data']
# print(lunar_data)
week_day = lunar_data['week_no']
if week_day:
ntp_datetime.week_day = week_day
week_name = lunar_data['week_name']
if week_name:
ntp_datetime.week_name = week_name
lunar_year_ganzhi = lunar_data['ganzhi_year']
if lunar_year_ganzhi:
ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi
lunar_month_chinese = lunar_data['lunar_month_chinese']
if lunar_month_chinese:
ntp_datetime.lunar_month_chinese = lunar_month_chinese
lunar_day_chinese = lunar_data['lunar_day_chinese']
if lunar_day_chinese:
ntp_datetime.lunar_day_chinese = lunar_day_chinese
lunar_ok = True
print(
f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
print(f"当前星期:{week_name}")
# 设置下一次更新时间为第二天0点
current_time = time.localtime()
ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
# 仅在第二天时再更新
if not ntp_ok or not lunar_ok:
await asyncio.sleep(ntp_datetime.retry)
else:
# dt = time.localtime()
# print(
# f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
# wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
await asyncio.sleep(ntp_datetime.wait)
else:
await asyncio.sleep(internet.wait)
# 获取天气信息
async def fetch_weather(internet, weather):
# 获取天气API KEY
weather_api_key = os.getenv("WEATHER_API_KEY")
# 获取天气城市:从配置文件中读取城市设置
weather_city = os.getenv("WEATHER_CITY")
while True:
if internet.state:
# 天气信息
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
# 如果读取不到配置的城市,则获取当前IP城市
if not weather_city:
# 获取当前外网IP和城市
try:
ip_city_response = https.get("https://myip.ipip.net/json")
except RuntimeError as error:
print("IP city request failed, retrying\n", error)
if ip_city_response:
ip_city_json = ip_city_response.json()
if ip_city_json["ret"] == "ok":
weather_city = ip_city_json['data']['location'][2]
print(f"当前IP城市:{weather_city}")
weather.city = weather_city
# 当前天气
weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
try:
weather_now_response = https.get(weather_now_url)
except RuntimeError as error:
print("Weather request failed, retrying\n", error)
weather_now_response = None
if weather_now_response:
weather_json = weather_now_response.json()
if weather_json["results"]:
now_weather = weather_json["results"][0]["now"]["text"]
now_temperature = weather_json["results"][0]["now"]["temperature"]
weather.text = now_weather
weather.temperature = now_temperature
print(f"当前天气:{now_weather},气温:{now_temperature}℃")
# 未来天气预报
# weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
# weather_response = https.get(weather_daily_url)
# weather_json = weather_response.json()
# if weather_json["results"]:
# today_weather = weather_json["results"][0]["daily"][0]["text_day"]
# today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
# today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
# today_humidity = weather_json["results"][0]["daily"][0]["humidity"]
# print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")
await asyncio.sleep(weather.wait)
else:
await asyncio.sleep(internet.wait)
# led显示
async def pixels_led(internet, pomodoro):
# Neopixel LED控制
pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(
pixel_pin, pixel_num, brightness=0.05, auto_write=False)
rainbow = Rainbow(pixels, speed=0.1, period=2)
blink = Blink(pixels, 0.5, color.GREEN)
animations = AnimationSequence(
rainbow,
advance_interval=5,
auto_clear=True,
)
while True:
# 番茄进行中,显示为红色常亮
if pomodoro.run:
# 番茄等待确认中,显示为绿色闪烁
if pomodoro.confirming:
blink.speed = 0.5
blink.color = color.GREEN
blink.animate()
else:
pixels.fill((255, 0, 0))
pixels.show()
elif not internet.state:
blink.speed = 2
blink.color = color.RED
blink.animate()
else:
# 否则显示为彩虹色
animations.animate()
await asyncio.sleep(pomodoro.wait)
# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
group = displayio.Group()
# 设置日期显示(左上角)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)
# 设置时间显示(右上角)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "九月廿八"
group.append(lunar_label)
# 设置天气显示(左下角)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)
# 设置气温显示(下中部)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (
display.width // 2, display.height - 5)
temperature_label.text = "5℃"
group.append(temperature_label)
# 设置MQTT状态(下中右部)
mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
mqtt_label.anchor_point = (0.5, 1.0)
mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
mqtt_label.text = "-"
group.append(mqtt_label)
# 设置番茄钟倒计时显示(中间)
pomodoro_label = bitmap_label.Label(
terminalio.FONT, color=0xFF00FF, scale=7)
# 显示位置
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (
display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)
# 设置倒番茄钟统计显示(右下角)
# with open("img/tomato.bmp", "rb") as f:
# image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
# sprite = displayio.TileGrid(image, pixel_shader=palette)
# group.append(sprite)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# 显示位置
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)
# 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
os.getenv("POMODORO_TIMEOUT")
# 创建根group
main_group = displayio.Group()
main_group.append(group)
# 展示
display.root_group = main_group
while True:
if ntp_datetime.ntp_sync:
dt = time.localtime()
# text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
# 设置日期文本
date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
if date_label.text != date_text:
date_label.text = date_text
# 设置时间文本
time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
if time_label.text != time_text:
time_label.text = time_text
# 设置农历文本
if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
if lunar_label.text != lunar_text:
lunar_label.text = lunar_text
if weather.text:
# 设置天气文本
if weather_label.text != weather.text:
weather_label.text = weather.text
# 设置气温文本
temperature_text = f"{weather.temperature}℃"
if temperature_label.text != temperature_text:
temperature_label.text = temperature_text
else:
weather_text = f'请先连接WIFI'
if weather_label.text != weather_text:
weather_label.text = weather_text
# MQTT LED状态
if mqtt_client.mqtt.is_connected:
if mqtt_client.led:
# 亮灯显示为红色
led_text = "on"
mqtt_label.color=0xFF0000
else:
led_text = "off"
mqtt_label.color=0xFFFFFF
if mqtt_label.text != led_text:
mqtt_label.text = led_text
# 更新番茄钟
pomodoro_label.color = 0x00FFFF
count_text = f"{pomodoro.count}"
if count_label.text != count_text:
count_label.text = count_text
if pomodoro.run:
left_seconds = pomodoro.end - time.monotonic()
if left_seconds >= 0:
minute = int(left_seconds / 60)
second = int(left_seconds % 60)
# 倒计时每秒更新一次
pomodoro_text = f"{minute:02d}:{second:02d}"
if pomodoro_label.text != pomodoro_text:
pomodoro_label.text = pomodoro_text
else:
# 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
timeout_seconds = abs(left_seconds)
if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
pomodoro.confirming = True
weather_label.text = f'番茄等待确认'
# 超时时显示为红色
pomodoro_label.color = 0xFF0000
pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
else:
pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
pomodoro.confirming = False
pomodoro.confirmed = False
pomodoro.run = False
await asyncio.sleep(pomodoro.wait)
async def monitor_touch_buttons(pomodoro):
touch = touchio.TouchIn(board.D10)
while True:
if touch.value:
await asyncio.sleep(0.1)
if touch.value:
# 按钮开始番茄计时
if not pomodoro.run:
pomodoro.run = True
pomodoro.end = time.monotonic() + pomodoro.time
# 番茄确认状态时,将番茄数加1
elif pomodoro.confirming:
pomodoro.confirmed = True
pomodoro.count += 1
await asyncio.sleep(0.1)
async def main():
# 共享变量设置
internet = Internet()
mqtt_client = MqttClient()
ntp_datetime = NTPDatetime()
weather = Weather()
pomodoro = Pomodoro()
# 协程函数定义
internet_task = asyncio.create_task(wifi_connect(internet))
mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
lcd_display_task = asyncio.create_task(
lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
monitor_touch_buttons_task = asyncio.create_task(
monitor_touch_buttons(pomodoro))
# 启动协程
await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)
asyncio.run(main())
欢迎大家一起学习交流
本帖最后由 ltpop 于 2023-11-17 22:49 编辑
演示视频
项目源码
"""
番茄日历钟
ltpop@163.com
202311
"""
# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool
# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# 当前设备联网状态
class Internet:
def __init__(self):
self.state = False
self.wait = 30.0
# 当前天气状态
class Weather:
def __init__(self):
self.city = ''
self.text = None
self.temperature = 20.0
self.wait = 300.0
# 当前日期时间状态
class NTPDatetime:
def __init__(self):
self.datetime = None
self.ntp = None
self.weekday = None
self.week_name = None
self.lunar_year_ganzhi = None
self.lunar_month_chinese = None
self.lunar_day_chinese = None
self.wait = 3600.0
self.retry = 10
# 番茄统计
class Pomodoro:
def __init__(self):
self.count = 0
self.run = False
self.end = 0
self.confirming = False
self.confirmed = False
self.time = os.getenv("POMODORO_TIME")
self.timeout = os.getenv("POMODORO_TIMEOUT")
self.wait = 0.1
# 连接wifi
async def wifi_connect(internet):
# 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
failure_count = 0
while True:
internet.state = wifi.radio.connected
if not wifi.radio.connected:
try:
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
except OSError as error:
print("Failed to connect, retrying\n", error)
failure_count += 1
await asyncio.sleep(internet.wait)
# 如果wifi没有正常连接,则切换为ap模式
# if not wifi_state:
# wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')
# 获取时间
async def fetch_time(internet, ntp_datetime):
tz_offset=os.getenv("TIME_ZONE")
the_rtc = rtc.RTC()
ntp = None
while True:
if internet.state:
ntp_ok = False
lunar_ok = False
# 获取当前ntp时间,
if not ntp:
pool = socketpool.SocketPool(wifi.radio)
try:
ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset)
# 更新系统时间
the_rtc.datetime = ntp.datetime
ntp_datetime.ntp = ntp
ntp_ok = True
except OSError as error:
print("NTP failed, retrying\n", error)
ntp = None
else:
# 获取阴历
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
if lunar_response:
lunar_json = lunar_response.json()
if lunar_json['code'] == 200:
lunar_data = lunar_json['data']
# print(lunar_data)
week_day = lunar_data['week_no']
if week_day:
ntp_datetime.week_day = week_day
week_name = lunar_data['week_name']
if week_name:
ntp_datetime.week_name = week_name
lunar_year_ganzhi = lunar_data['ganzhi_year']
if lunar_year_ganzhi:
ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi
lunar_month_chinese = lunar_data['lunar_month_chinese']
if lunar_month_chinese:
ntp_datetime.lunar_month_chinese = lunar_month_chinese
lunar_day_chinese = lunar_data['lunar_day_chinese']
if lunar_day_chinese:
ntp_datetime.lunar_day_chinese = lunar_day_chinese
lunar_ok = True
print(f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
print(f"当前星期:{week_name}")
# 仅在第二天时再更新
if not ntp_ok or not lunar_ok:
await asyncio.sleep(ntp_datetime.retry)
else:
dt = time.localtime()
print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
await asyncio.sleep(wait_seconds)
else:
await asyncio.sleep(internet.wait)
# 获取天气信息
async def fetch_weather(internet, weather):
# 获取天气API KEY
weather_api_key = os.getenv("WEATHER_API_KEY")
# 获取天气城市:从配置文件中读取城市设置
weather_city = os.getenv("WEATHER_CITY")
while True:
if internet.state:
# 天气信息
pool = socketpool.SocketPool(wifi.radio)
https = requests.Session(pool, ssl.create_default_context())
# 如果读取不到配置的城市,则获取当前IP城市
if not weather_city:
# 获取当前外网IP和城市
ip_city_response = https.get("https://myip.ipip.net/json")
ip_city_json = ip_city_response.json()
if ip_city_json["ret"] == "ok":
weather_city = ip_city_json['data']['location'][2]
print(f"当前IP城市:{weather_city}")
weather.city = weather_city
# 当前天气
weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
weather_now_response = https.get(weather_now_url)
weather_json = weather_now_response.json()
if weather_json["results"]:
now_weather = weather_json["results"][0]["now"]["text"]
now_temperature = weather_json["results"][0]["now"]["temperature"]
weather.text = now_weather
weather.temperature = now_temperature
print(f"当前天气:{now_weather},气温:{now_temperature}℃")
# 未来天气预报
# weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
# weather_response = https.get(weather_daily_url)
# weather_json = weather_response.json()
# if weather_json["results"]:
# today_weather = weather_json["results"][0]["daily"][0]["text_day"]
# today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
# today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
# today_humidity = weather_json["results"][0]["daily"][0]["humidity"]
# print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")
await asyncio.sleep(weather.wait)
else:
await asyncio.sleep(internet.wait)
# led显示
async def pixels_led(internet, pomodoro):
# Neopixel LED控制
pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)
rainbow = Rainbow(pixels, speed=0.1, period=2)
# 番茄等待确认中,显示为青蓝色闪烁
blink = Blink(pixels, 0.5, color.CYAN)
animations = AnimationSequence(
rainbow,
advance_interval=5,
auto_clear=True,
)
while True:
# 番茄进行中,显示为红色常亮
if pomodoro.run:
# 番茄等待确认中,显示为
if pomodoro.confirming:
blink.animate()
else:
pixels.fill((255, 0, 0))
pixels.show()
else:
# 否则显示为彩虹色
animations.animate()
await asyncio.sleep(pomodoro.wait)
# 屏幕显示
async def lcd_display(internet, ntp_datetime, weather, pomodoro):
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
group = displayio.Group()
# 设置日期显示(左上角)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)
# 设置时间显示(右上角)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "九月廿八"
group.append(lunar_label)
# 设置天气显示(左下角)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)
# 设置气温显示(下中部)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (display.width // 2, display.height - 5)
temperature_label.text = "5℃"
group.append(temperature_label)
# 设置番茄钟倒计时显示(中间)
pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
# 显示位置
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)
# 设置倒番茄钟统计显示(右下角)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# 显示位置
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)
# 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
os.getenv("POMODORO_TIMEOUT")
# 创建根group
main_group = displayio.Group()
main_group.append(group)
# 展示
display.root_group = main_group
while True:
if internet.state:
dt = time.localtime()
# text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
# 设置日期文本
date_label.text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
# 设置时间文本
time_label.text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
# 设置农历文本
if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
lunar_label.text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
if weather.text:
# 设置天气文本
weather_label.text = f"{weather.text}"
# 设置气温文本
temperature_label.text = f"{weather.temperature}℃"
else:
weather_label.text = f'请先连接WIFI'
timeout = pomodoro.wait
# 更新番茄钟
pomodoro_label.color = 0x00FFFF
count_label.text = f"{pomodoro.count}"
if pomodoro.run:
left_seconds = pomodoro.end - time.monotonic()
if left_seconds >= 0:
minute = int(left_seconds / 60)
second = int(left_seconds % 60)
# 倒计时每秒更新一次
sec = left_seconds % 1
timeout = 1.0 - sec
pomodoro_label.text = f"{minute:02d}:{second:02d}"
else:
# 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
timeout_seconds = abs(left_seconds)
if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
pomodoro.confirming = True
weather_label.text = f'番茄等待确认'
# 超时时显示为红色
pomodoro_label.color = 0xFF0000
pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
else:
pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
pomodoro.confirming = False
pomodoro.confirmed = False
pomodoro.run = False
else:
timeout = pomodoro.wait
await asyncio.sleep(timeout)
async def monitor_touch_buttons(pomodoro):
touch = touchio.TouchIn(board.D10)
while True:
if touch.value:
# 按钮开始番茄计时
if not pomodoro.run:
pomodoro.run = True
pomodoro.end = time.monotonic() + pomodoro.time
# 番茄确认状态时,将番茄数加1
elif pomodoro.confirming:
pomodoro.confirmed = True
pomodoro.count += 1
await asyncio.sleep(pomodoro.wait)
async def main():
# 共享变量设置
internet = Internet()
ntp_datetime = NTPDatetime()
weather = Weather()
pomodoro = Pomodoro()
# 协程函数定义
internet_task = asyncio.create_task(wifi_connect(internet))
fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
fetch_weather_task = asyncio.create_task(fetch_weather(internet,weather))
pixels_led_task = asyncio.create_task(pixels_led(internet,pomodoro))
lcd_display_task = asyncio.create_task(lcd_display(internet, ntp_datetime, weather, pomodoro))
monitor_touch_buttons_task = asyncio.create_task(monitor_touch_buttons(pomodoro))
# 启动协程
await asyncio.gather(internet_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)
asyncio.run(main())