ltpop

    1. 【得捷电子Follow me第2期】番茄日历钟 5/820 DigiKey得捷技术专区 2023-12-28
      本帖最后由 ltpop 于 2023-12-28 16:46 编辑 一、介绍视频   二、项目总结 笔者是一个智能家居爱好者,玩过HomeAssistant、ESPHome、Tasmota等平台,并使用过ESP8266、ESP32自已DIY设备。通常我只使用现有平台进行配置,很少直接编写硬件代码。非常感谢DigiKey和EEWORLD举办的Follow me活动,让我有机会玩转智能设备的底层代码。 和大部分同学一样,我选择了CircuitPython作为开发平台和语言。社区资源非常丰富,开发门槛较低,调试方便,非常适合像我这样的新手。为了完成活动的要求并实现一个完整的功能,我选择了制作一个日历+天气+番茄钟功能。这个功能可以在日常的工作学习中,运用番茄工作法提高效率。 这是我第一次接触CircuitPython,完成项目的同时也学习了CircuitPython。我的方法是先确定项目目标,即需要实现的最小功能。然后针对每个功能参考官方示例并对相关库用法进行学习,并同时进行代码调试。各功能代码测试通过后,将其合并为最终完整的代码。 项目中的功能点较多,除了活动要求的WIFI配置、网络请求、中文显示、Neopixel LED控制外,还包含了屏幕控制、协程、触摸传感器等的使用。其中的难点是屏幕控制和协程。作为一个CircuitPython新手,目前完成的项目代码仅仅是实现了功能,仍有很大的优化空间。后面还会继续探索实践,欢迎大家一起交流。 项目目标: 完成活动要求 网络数据请求 中文显示 Neopixel控制 实现番茄日历功能: 日期时间显示,并显示农历 天气显示 番茄钟控制功能 可设置番茄时长 显示番茄倒计时 完成后LED提醒并确认 番茄个数统计 实现方法 WIFI连接至互联网 从NTP服务获取日期和时间 从开放日历API获取农历日期 从开放天气API获取当地天气 使用协程函数和共享变量的方法进行数据更新和屏幕显示控制 使用ESP32S3的触摸传感器进行输入控制,执行番茄钟的启动和确认 使用Neopixel LED的不同颜色和状态表示设备的状态,如联网状态、番茄钟运行、确认阶段等 项目成品展示     完成帖子:【得捷电子Follow me第2期】番茄日历钟 - DigiKey得捷技术专区 - 电子工程世界-论坛 (eeworld.com.cn) 活动任务 Follow me 第2期!与得捷电子一起解锁开发板超能力! (eeworld.com.cn) 任务1:控制屏幕显示中文 完成屏幕的控制,并且能显示中文 任务2:网络功能使用 完成网络功能的使用,能够创建热点和连接到WiFi 任务3:控制WS2812B 使用按键控制板载Neopixel LED的显示和颜色切换 任务4:分任务1:日历&时钟 完成一个可通过互联网更新的万年历时钟,并显示当地的天气信息 项目材料 ESP32S3 TFT Overview | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System 环境准备 安装CircuitPython 双击reset键,将.uf2文件复制到FTHRS2BOOT磁盘中,会自动重启并完成安装 使用Mu editor或VSCode+插件连接开发板,进入代码开发,两者都很方便进入REPL.模式进行调试。Mu连接更稳定,VSCode代码编写更强大。 开发流程一般是找相似的官方示例,在REPL模式中进行代码片断调试,测试成功后再写入code.py文件中,进行完整测试。 正常模型下修改了code.py文件内容后,开发板会自动重启载入代码。但如果在REPL模式下,需要手动重置开发板,除了按板子上的RST按钮外,还可以使用Ctrl+D或执行以下代码来重置。 import microcontroller; microcontroller.reset();   安装Lib 代码中使用的库,一部分已经内置,可以直接import。否则就需要手动安装,官方提供的Lib链接为Libraries (circuitpython.org) 或Adafruit_CircuitPython_Bundle/libraries/drivers at main · adafruit/Adafruit_CircuitPython_Bundle (github.com),可进入下载与系统版本对应的包进行安装,如 adafruit-circuitpython-bundle-8.x-mpy-20231010.zip 。 安装方法很简单,解压后在lib目录中找到相应的.mpy文件,并拷贝至 CIRCUITPY 磁盘中lib目录下即可。如果需要源码,也可下载原始py文件。 功能点实现方法 中文字体显示(任务1) 官方示例:Introduction — Adafruit Bitmap_Font Library 1.0 documentation (circuitpython.org) 需要额外Lib:adafruit_display_text、adafruit_bitmap_font 显示中文的关键点在于找到中文字体,参考网友的代码使用了这里的开源字体:carrothu-cn/chinese-bitmap-fonts (github.com)。因为开发板空间限制,选择了大小合适的wenquanyi_13px.pcf,保存在 CIRCUITPY 磁盘的font目录下。 核心代码 import board import displayio import terminalio # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_bitmap_font库 from adafruit_bitmap_font import bitmap_font display = board.DISPLAY # 中文字体文件放在font目录下 font_file = "font/wenquanyi_13px.pcf" font = bitmap_font.load_font(font_file) group = displayio.Group() # 设置农历显示(上中部) lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1) lunar_label.anchor_point = (0.5, 0.0) lunar_label.anchored_position = (display.width // 2, 5) lunar_label.text = "十月初一" group.append(lunar_label) display.root_group = group   在项目中的农历和天气信息均可正常显示中文内容,如下图(上中部和左下角位置) 连接WIF(任务2) 这个比较简单,无需额外的lib。但最好需要多次重试连接,避免上电时连接失败。 核心代码 import os import wifi import time # 连接wifi # 事先在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码> state = wifi.radio.connected failure_count = 0 while not wifi.radio.connected: try: wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")) except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 time.sleep(5)   项目中在不同的联网状态下可进行不同的展示,如未联网时显示如下(左下角提醒未连WIFI) 控制WS2812B(任务3) 官方示例:Adafruit CircuitPython NeoPixel — Adafruit CircuitPython NeoPixel Library 1.0 documentation 需要额外Lib:neopixel、adafruit_led_animation NeoPixel LED的使用方法非常灵活,官方有很多特效库,可完成各种效果。项目中使用不同的颜色指示当前开发板状态,如番茄钟计时显示为红色常量,番茄钟倒计时确认中显示为青蓝色闪烁等等。 核心代码 import board # 需要导入neopixel库 import neopixel # 需要导入adafruit_led_animation库 from adafruit_led_animation.animation.blink import Blink pixel_pin = board.NEOPIXEL pixel_num = 1 pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False) # 红色常亮 pixels.fill((255, 0, 0)) pixels.show() # 显示为青蓝色闪烁 blink = Blink(pixels, 0.5, color.CYAN) blink.animate()   项目中在番茄确认时,LED灯为表蓝色闪烁,以便提醒用户进行番茄确认。 获取日期时间(任务4) 官方示例:Adafruit_Learning_System_Guides/Raspberry_Pi_Azure_IoT_Hub_Dashboard/qtPyEsp32S2_co2/code.py at 6e9b42bf65613e926cbcd5d2e3ddf19f7c4cbb54 · adafruit/Adafruit_Learning_System_Guides (github.com) 需要额外Lib:adafruit_ntp、adafruit_requests 因为开发板没有连接时钟模块,需要从NTP服务器获取时间,官方有现成的库可调用,为了稳定可使用国内的NTP服务器。NTP返回的数据中有日期和时间,为了方便获取农历日期,采用API的方式来获取,与天气API类似,需要注册账号使用API KEY来访问。 核心代码 import os import wifi import time import ssl import socketpool # 需要导入adafruit_ntp库 import adafruit_ntp # 需要导入adafruit_requests库 import adafruit_requests as requests tz_offset=os.getenv("TIME_ZONE") ntp = None while True: # 使用前需要先联网 # 获取当前ntp时间, if not ntp: pool = socketpool.SocketPool(wifi.radio) try: ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset) except OSError as error: print("NTP failed, retrying\n", error) pass # 获取阴历 pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}') lunar_json = lunar_response.json() if lunar_json['code'] == 200: lunar_data = lunar_json['data'] week_day = lunar_data['week_no'] week_name = lunar_data['week_name'] lunar_year_ganzhi = lunar_data['week_name'] lunar_month_chinese = lunar_data['lunar_month_chinese'] lunar_day_chinese = lunar_data['lunar_day_chinese'] print(f"当前农历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}") print(f"当前星期:{week_name}") # 仅在第二天时再更新 dt = ntp.datetime print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}") wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec) time.sleep(wait_seconds) 项目中屏幕顶部依次为日期、农历、时间显示,如下图   天气信息获取(任务4)   官方示例:EEWORLDLINKTK4,需要先注册账号,获得API KEY才能使用。 密码类信息可统一保存在根目录下的settings.toml文件中,格式为<变量名> = <值>,然后在代码中使用os.getenv("<变量名>")来获取,安全方便。 为了方便修改城市信息,也将天气城市设置在toml文件,或者通过当前网络的外网IP获取到当地城市,可访问相关API获取到,坐标可参考代码。 核心代码: import os import wifi import time import ssl import socketpool # 需要导入adafruit_requests库 import adafruit_requests as requests # 获取天气信息 # 获取天气API KEY weather_api_key = os.getenv("WEATHER_API_KEY") # 获取天气城市:从配置文件中读取城市设置 weather_city = os.getenv("WEATHER_CITY") while True: # 创建https pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) # 如果读取不到配置的城市,则获取当前IP城市 if not weather_city: # 获取当前外网IP和城市 ip_city_response = https.get("https://myip.ipip.net/json") ip_city_json = ip_city_response.json() if ip_city_json["ret"] == "ok": weather_city = ip_city_json['data']['location'][2] print(f"当前IP城市:{weather_city}") # 当前天气 weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" weather_now_response = https.get(weather_now_url) weather_json = weather_now_response.json() if weather_json["results"]: now_weather = weather_json["results"][0]["now"]["text"] now_temperature = weather_json["results"][0]["now"]["temperature"] print(f"当前天气:{now_weather},气温:{now_temperature}℃") time.sleep(300) #5分钟更新一次   项目中天气和气温信息展示在屏幕底部,如下图 屏幕显示控制 官网示例链接:Introduction | CircuitPython Display Support Using displayio | Adafruit Learning System 需要额外Lib:board、displayio、terminalio 屏幕控制分为两个部分,第一部分为画为屏幕的展示布局,可使用官方提供的displayio库来实现;第二部分为控制显示内容,基本思路就是在每次循环中更新各元素的展示内容,如文字、数字、图片等。 该部分涉及到项目中的wifi连接、网络时间、天气等处理,而且需要同步控制屏幕显示。代码行数较多,因为番倒计时每秒更新屏幕内容,目前的代码执行时屏幕显示不是很顺畅,偶尔会卡住1-2秒,代码仍需要优化。 屏幕布局中各组件的从属关系如下图所示,简单说就是Display包含Group,Group中包含Bitmap。   屏幕坐标从左上角开始   bitmap_label中anchor_point为元素的参照位置点,值含义如下,如放置在屏幕左上角的元素可设置为(0,0),右下角的可设置为(1,1),具体可参考核心代码:   核心代码: import board import displayio import terminalio # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_bitmap_font库 from adafruit_bitmap_font import bitmap_font display = board.DISPLAY # 中文字体文件放在font目录下 font_file = "font/wenquanyi_13px.pcf" font = bitmap_font.load_font(font_file) group = displayio.Group() # 设置日期显示(左上角) date_label = bitmap_label.Label(terminalio.FONT, scale=1) date_label.anchor_point = (0.0, 0.0) date_label.anchored_position = (5, 5) date_label.text = "2023-11-11" group.append(date_label) # 设置时间显示(右上角) time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2) time_label.anchor_point = (1.0, 0.0) time_label.anchored_position = (display.width - 2, 2) time_label.text = "11:30" group.append(time_label) # 设置农历显示(上中部) lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1) lunar_label.anchor_point = (0.5, 0.0) lunar_label.anchored_position = (display.width // 2, 5) lunar_label.text = "十月初一" group.append(lunar_label) # 设置天气显示(左下角) weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1) weather_label.anchor_point = (0.0, 1.0) weather_label.anchored_position = (2, display.height - 5) weather_label.text = "晴" group.append(weather_label) # 设置气温显示(下中部) temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1) temperature_label.anchor_point = (0.5, 1.0) temperature_label.anchored_position = (display.width // 2, display.height - 5) temperature_label.text = "0℃" group.append(temperature_label) # 设置番茄钟倒计时显示(中间) pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7) # 显示位置 pomodoro_label.anchor_point = (0.5, 0.5) pomodoro_label.anchored_position = (display.width // 2, display.height // 2) pomodoro_label.text = "15:00" group.append(pomodoro_label) # 设置倒番茄钟统计显示(右下角) count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2) # 显示位置 count_label.anchor_point = (1, 1) count_label.anchored_position = (display.width - 2, display.height - 2) count_label.text = "0" group.append(count_label) os.getenv("POMODORO_TIMEOUT") # 定义main_group main_group = displayio.Group() main_group.append(group) # 只能有一个main_group display.root_group = main_group # 更新屏幕内容,以下为示例代码,需要环境变量,无法直接执行 while True: timeout = 0.1 if internet.state and ntp_datetime.ntp: dt = ntp_datetime.ntp.datetime text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃' # 设置日期文本 date_label.text = f"{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday}" # 设置时间文本 time_label.text = f"{dt.tm_hour}:{dt.tm_min}" # 设置农历文本 lunar_label.text = f"{lunar_month_chinese}月{lunar_day_chinese}" if weather.text: # 设置天气文本 weather_label.text = f"{weather.text}" # 设置气温文本 temperature_label.text = f"{weather.temperature}℃" else: text = f'请先连接WIFI' weather_label.text = text await asyncio.sleep(timeout)   效果图 多任务并发控制 官方示例:Concurrent Tasks | Cooperative Multitasking in CircuitPython with asyncio | Adafruit Learning System 需要额外Lib:asyncio、adafruit_ticks 项目中需要控制不同的网络请求,并需要在屏幕展示不同的内容,在同一个循环中实现会非常复杂。需要使用并发控制将各事件进行解耦,方便代码编写。即Python中协程异步IO(asyncio),参考官方的文档,该实现可总结为以下几个步骤 在普通函数前添加async关键字,将其转为协程函数 使用await asyncio.sleep代替time.sleep进行延迟,释放对控制器的独占 定义好各协程函数后,使用 asyncio.create_task(some_coroutine(arg1, arg2, ...))对其进行调用 使用 await asyncio.gather(task1, task2, ...)等待各协程任务完成 再次强调不要忘记使用 await 核心代码:这里是官方示例,实际代码可参考下面完整的项目源码 # SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries # # SPDX-License-Identifier: MIT import asyncio import board import digitalio import keypad class Interval: """Simple class to hold an interval value. Use .value to to read or write.""" def __init__(self, initial_interval): self.value = initial_interval async def monitor_interval_buttons(pin_slower, pin_faster, interval): """Monitor two buttons: one lengthens the interval, the other shortens it. Change interval.value as appropriate. """ # Assume buttons are active low. with keypad.Keys( (pin_slower, pin_faster), value_when_pressed=False, pull=True ) as keys: while True: key_event = keys.events.get() if key_event and key_event.pressed: if key_event.key_number == 0: # Lengthen the interval. interval.value += 0.1 else: # Shorten the interval. interval.value = max(0.1, interval.value - 0.1) print("interval is now", interval.value) # Let another task run. await asyncio.sleep(0) async def blink(pin, interval): """Blink the given pin forever. The blinking rate is controlled by the supplied Interval object. """ with digitalio.DigitalInOut(pin) as led: led.switch_to_output() while True: led.value = not led.value await asyncio.sleep(interval.value) async def main(): interval1 = Interval(0.5) interval2 = Interval(1.0) led1_task = asyncio.create_task(blink(board.D1, interval1)) led2_task = asyncio.create_task(blink(board.D2, interval2)) interval1_task = asyncio.create_task( monitor_interval_buttons(board.D3, board.D4, interval1) ) interval2_task = asyncio.create_task( monitor_interval_buttons(board.D5, board.D6, interval2) ) await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task) asyncio.run(main())   ESP32S3触摸传感器使用 官方示例:CircuitPython Cap Touch | CircuitPython Essentials | Adafruit Learning System ESP32S3中内置了多个触摸传感器,可以无需外接设备即可实现输入控制,本项目中用于番茄钟的开始触发和确认功能。使用时可将支持触摸传感器的GPIO接一根导线或直接触摸PCB板。具体哪个GPIO支持触摸,可通过执行以下官方代码给出清单。 Capacitive Touch | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System 执行结果为: # 只显示支持触摸的GPIO Touch on: A4 Touch on: A5 Touch on: TX Touch on: D10 Touch on: D11 Touch on: D12 Touch on: LED Touch on: RX Touch on: D5 Touch on: D6 Touch on: D9 测试结果为11个,但经测试并不是每个都能使用,如其中的A4、A5一直都是触发状态,原因不明,有了解的小伙伴欢迎评论。项目中使用了D10,位于屏幕上侧中部,经使用测试,触摸反应很灵敏。 核心代码 import board import touchio import time touch = touchio.TouchIn(board.D10) while True: if touch.value: # 按钮开始番茄计时 print("start") time.sleep(0.1) 多设备MQTT通信 本次活动一起下单的还有其他两块ESP32的单片机,用来模拟多设备通信。 本实验使用MQTT来实现多设备间通信,具体设备清单如下: 设备1:本次项目的主控,Adafruit ESP32-S3 TFT Feather,用来模拟中控面板,用来显示灯的开关状态。 设备2:另一款ESP32S3的开发板,AtomS3 Lite ESP32S3,用来模拟智能灯 设备3:ESP32C3模块,esp32-c3-wroom-02,经过简单的外围电路焊接,可用来模拟智能开关 效果展示: 设备1:显示灯的状态,右下角on和off     设备2:使用其背面的LED来模拟智能灯,如下图     设备3:模拟智能开关,模组添加了轻触按键,用来模拟开关。 简单说下模组的接线。因为这款是ESP32C3模组,无法直接使用,这里焊接了必要的外围和开关,通过USB2TTL模块接入,如下图。除了3V3、GND、TXD、RXD直接接入USB2TTL外,EN、IO2、IO8需要接高电平,这里各串联3个10K电阻接到3V3,按键开关两端接在IO9和GND,加电时按下可进行固件刷写,否则正常启动。(条件有限,仅用来做实验)     环境搭建 在本地搭建MQTT服务器,用的是mosquitto,流程比较简单,教程也比较多,不再赘述。 MQTT的设计如下: 定义了两个mqtt的topic:cmd/d2/led1和state/d2/led1,其中cmd为发送开灯命令,state为灯的状态更新。d2为设备,本实验中只涉及到设备2,定义为d2,led1为设备中的具体模块,本例中为led1。 其中设备1订阅state/d2/led1主题,当接收的消息为1时表示灯已开,为0时表示灯已关,对应屏幕上显示on和off。 设备2订阅cmd/d2/led1主题,当接收的消息为1时表示开灯操作(0为关机操作,2为切换操作),此时将本机的led1打开,并同时发送消息到主题state/d2/led1,消息内容为灯的具体状态0或1。同时也可通过设备本机的按键来进行开关灯操作,同样也发送相应的mqtt消息。 设备3监听本机按键操作,当按键按下时发送消息到主题cmd/d2/led1,消息内容为2,表示切换开灯操作。 核心代码 设备1和设备2都是ESP32S3芯片,直接使用CircuitPython环境;设备3为ESP32C3,使用了microPython环境。 设备1(中控端,负责显示灯状态) # 此处为核心代码,完整代码见项目源码 import displayio # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_minimqtt库 import adafruit_minimqtt.adafruit_minimqtt as MQTT # MQTT class MqttClient: def __init__(self): self.mqtt = None self.is_connected = False self.host = os.getenv("MQTT_HOST") self.port = os.getenv("MQTT_PORT") self.user = os.getenv("MQTT_USER") self.password = os.getenv("MQTT_PASSWORD") self.client_id = os.getenv("MQTT_CLIENT_ID") self.topic = os.getenv("MQTT_TOPIC") self.led = "led1" self.cmd_topic = f"cmd/{self.topic}/{self.led}" self.state_topic = f"stat/{self.topic}/{self.led}" self.led = None self.wait = 0.1 def connect(self, mqtt_cli, userdata, flags, rc): print(f"Connected to MQTT {self.host}") self.is_connected = True mqtt_cli.subscribe(self.state_topic) def disconnect(self, mqtt_cli, userdata, rc): print(f"Disconnected from MQTT {self.host}") self.is_connected = False def message(self, client, topic, message): print(f"New message on topic {topic}: {message}") # 0-关,1-关,2-切换 if topic == self.state_topic: if message == '0': self.led = False elif message == '1': self.led = True async def mqtt_connect(mqtt_client): while True: if not mqtt_client.mqtt: print("Set up a MiniMQTT Client") pool = socketpool.SocketPool(wifi.radio) mqtt = MQTT.MQTT( broker=mqtt_client.host, username=mqtt_client.user, password=mqtt_client.password, socket_pool=pool, client_id=mqtt_client.client_id ) mqtt.on_connect = mqtt_client.connect mqtt.on_disconnect = mqtt_client.disconnect mqtt.on_message = mqtt_client.message mqtt_client.mqtt = mqtt mqtt.connect() try: mqtt_client.mqtt.loop() except (ValueError, RuntimeError) as e: print("Failed to get data, retrying\n", e) mqtt_client.mqtt.reconnect() continue await asyncio.sleep(mqtt_client.wait) # 屏幕显示 async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro): display = board.DISPLAY group = displayio.Group() # 设置MQTT状态(下中右部) mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1) mqtt_label.anchor_point = (0.5, 1.0) mqtt_label.anchored_position = (display.width // 1.3, display.height - 5) mqtt_label.text = "-" group.append(mqtt_label) # 其他代码省略,完整代码见项目源码 # 创建根group main_group = displayio.Group() main_group.append(group) # 展示 display.root_group = main_group while True: # MQTT LED状态 if mqtt_client.mqtt.is_connected: if mqtt_client.led: # 亮灯显示为红色 led_text = "on" mqtt_label.color=0xFF0000 else: led_text = "off" mqtt_label.color=0xFFFFFF if mqtt_label.text != led_text: mqtt_label.text = led_text # 其他代码省略,完整代码见项目源码 await asyncio.sleep(pomodoro.wait) 设备2(智能灯) import board import os import wifi import socketpool from digitalio import DigitalInOut, Direction, Pull # 需要导入adafruit_minimqtt库 import adafruit_minimqtt.adafruit_minimqtt as MQTT # 需要导入neopixel库 import neopixel # 需要导入asyncio、adafruit_ticks库 import asyncio # MQTT class MqttClient: def __init__(self): self.mqtt = None self.is_connected = False self.host = os.getenv("MQTT_HOST") self.port = os.getenv("MQTT_PORT") self.user = os.getenv("MQTT_USER") self.password = os.getenv("MQTT_PASSWORD") self.client_id = os.getenv("MQTT_CLIENT_ID") self.topic = os.getenv("MQTT_TOPIC") self.led = "led1" self.cmd_topic = f"cmd/{self.topic}/{self.led}" self.state_topic = f"stat/{self.topic}/{self.led}" self.led = None self.wait = 0.1 def connect(self, mqtt_cli, userdata, flags, rc): print(f"Connected to MQTT {self.host}") self.is_connected = True mqtt_cli.subscribe(self.cmd_topic) def disconnect(self, mqtt_cli, userdata, rc): print(f"Disconnected from MQTT {self.host}") self.is_connected = False def message(self, client, topic, message): print(f"New message on topic {topic}: {message}") # 0-关,1-关,2-切换 print(f"topic: {topic}") if topic == self.cmd_topic: print(f"message: {message}") if self.led: if message == "0": self.led.power = False elif message == "1": self.led.power = True elif message == "2": self.led.power = not self.led.power # LED class LED: def __init__(self): self.pixels = None self.power = False self.wait = 0.05 # 连接wifi async def wifi_connect(internet): # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码> failure_count = 0 while True: internet.state = wifi.radio.connected if not wifi.radio.connected: try: wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv( "CIRCUITPY_WIFI_PASSWORD")) except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 await asyncio.sleep(internet.wait) # 如果wifi没有正常连接,则切换为ap模式 # if not wifi_state: # wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234') async def mqtt_connect(mqtt_client): while True: if not mqtt_client.mqtt: print("Set up a MiniMQTT Client") pool = socketpool.SocketPool(wifi.radio) mqtt = MQTT.MQTT( broker=mqtt_client.host, username=mqtt_client.user, password=mqtt_client.password, socket_pool=pool, client_id=mqtt_client.client_id ) mqtt.on_connect = mqtt_client.connect mqtt.on_disconnect = mqtt_client.disconnect mqtt.on_message = mqtt_client.message mqtt_client.mqtt = mqtt mqtt.connect() try: mqtt_client.mqtt.loop() except (ValueError, RuntimeError) as e: print("Failed to get data, retrying\n", e) mqtt_client.mqtt.reconnect() continue await asyncio.sleep(mqtt_client.wait) # 按钮点击检测 async def monitor_buttons(led): button = DigitalInOut(board.BTN) while True: if not button.value: # 按钮按下,防抖 # await asyncio.sleep(led.wait) # if not button.value: print("button pressed") led.power = not led.power await asyncio.sleep(led.wait) # led控制 async def pixels_led(led, mqtt_client): if not led.pixels: # Neopixel LED定义 pixel_pin = board.NEOPIXEL pixel_num = 1 pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0, auto_write=True) pixels.fill((255, 255, 255)) led.pixels = pixels last_power = None mqtt_client.led = led while True: if led.power: pixels.brightness = 1 else: pixels.brightness = 0 if last_power != led.power: print("led toggle") last_power = led.power print(mqtt_client) if mqtt_client.is_connected: print(mqtt_client.state_topic) mqtt_client.mqtt.publish(mqtt_client.state_topic, 1 if led.power else 0) await asyncio.sleep(led.wait) async def main(): # 共享变量设置 mqtt_client = MqttClient() led = LED() # 协程函数定义 mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client)) monitor_buttons_task = asyncio.create_task(monitor_buttons(led)) pixels_led_task = asyncio.create_task(pixels_led(led,mqtt_client)) # 启动协程 await asyncio.gather(mqtt_task, monitor_buttons_task, pixels_led_task) asyncio.run(main()) 设备3(智能开关) import ujson import network from umqtt.simple import MQTTClient from machine import Pin from time import sleep_ms # 保存 Wi-Fi MQTT 配置到配置文件 def save_wifi_config(ssid, password, mqtt_host, mqtt_port, mqtt_user, mqtt_password): config = { 'wifi_ssid': ssid, 'wifi_password': password, 'mqtt_host': mqtt_host, 'mqtt_port': mqtt_port, 'mqtt_user': mqtt_user, 'mqtt_password': mqtt_password } with open('config.json', 'w') as f: ujson.dump(config, f) # 从配置文件中读取 Wi-Fi 配置 def load_wifi_config(): try: with open('config.json', 'r') as f: config = ujson.load(f) return config['wifi_ssid'], config['wifi_password'] except OSError: return None, None # 从配置文件中读取 MQTT 配置 def load_mqtt_config(): try: with open('config.json', 'r') as f: config = ujson.load(f) return config['mqtt_host'], config['mqtt_port'], config['mqtt_user'], config['mqtt_password'] except OSError: return None, None, None, None # 示例用法:保存 Wi-Fi 配置到配置文件 # wifi_ssid = 'your_wifi_ssid' # wifi_password = 'your_wifi_password' # mqtt_host = '' # mqtt_port = '' # mqtt_user = '' # mqtt_password = '' # save_wifi_config(wifi_ssid, wifi_password, mqtt_host, mqtt_port, mqtt_user, mqtt_password) # 示例用法:从配置文件中读取 Wi-Fi 配置 # saved_wifi_ssid, saved_wifi_password = load_wifi_config() # 设置 Wi-Fi 连接 wifi_ssid, wifi_password = load_wifi_config() station = network.WLAN(network.STA_IF) station.active(True) station.connect(wifi_ssid, wifi_password) # 设置 MQTT 客户端 mqtt_broker, mqtt_port, mqtt_username, mqtt_password = load_mqtt_config() mqtt_client_id = 'd2' mqtt_topic = 'cmd/d1/led1' mqtt_client = MQTTClient(mqtt_client_id, mqtt_broker, port=mqtt_port, user=mqtt_username, password=mqtt_password) mqtt_client.connect() # 设置按键引脚 button_pin = Pin(9, Pin.IN, Pin.PULL_UP) # 按键防抖延迟时间(毫秒) debounce_delay = 100 # 检测按键状态并处理按下事件 def handle_button_press(): if not button_pin.value(): sleep_ms(debounce_delay) if not button_pin.value(): # 当按键按下时,发送 MQTT 消息 mqtt_client.publish(mqtt_topic, '2') while True: handle_button_press() 项目最终完整代码 终于完成了🧗‍♂️,完整代码如下 """ 番茄日历钟 ltpop@163.com 20231126 """ # TOML配置文件读取 import os import time import rtc import board import displayio import terminalio import touchio import wifi import ssl import socketpool # 需要导入asyncio、adafruit_ticks库 import asyncio # 需要导入neopixel库 import neopixel # 需要导入adafruit_ntp库 import adafruit_ntp # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_bitmap_font库 from adafruit_bitmap_font import bitmap_font # 需要导入adafruit_imageload库 import adafruit_imageload # 需要导入adafruit_requests库 import adafruit_requests as requests # 需要导入adafruit_led_animation库 from adafruit_led_animation.animation.blink import Blink from adafruit_led_animation.animation.rainbow import Rainbow from adafruit_led_animation.sequence import AnimationSequence import adafruit_led_animation.color as color # 需要导入adafruit_minimqtt库 import adafruit_minimqtt.adafruit_minimqtt as MQTT # 当前设备联网状态 class Internet: def __init__(self): self.state = False self.wait = 30.0 # MQTT class MqttClient: def __init__(self): self.mqtt = None self.is_connected = False self.host = os.getenv("MQTT_HOST") self.port = os.getenv("MQTT_PORT") self.user = os.getenv("MQTT_USER") self.password = os.getenv("MQTT_PASSWORD") self.client_id = os.getenv("MQTT_CLIENT_ID") self.topic = os.getenv("MQTT_TOPIC") self.led = "led1" self.cmd_topic = f"cmd/{self.topic}/{self.led}" self.state_topic = f"stat/{self.topic}/{self.led}" self.led = None self.wait = 0.1 def connect(self, mqtt_cli, userdata, flags, rc): print(f"Connected to MQTT {self.host}") self.is_connected = True mqtt_cli.subscribe(self.state_topic) def disconnect(self, mqtt_cli, userdata, rc): print(f"Disconnected from MQTT {self.host}") self.is_connected = False def message(self, client, topic, message): print(f"New message on topic {topic}: {message}") # 0-关,1-关,2-切换 if topic == self.state_topic: if message == '0': self.led = False elif message == '1': self.led = True # 当前天气状态 class Weather: def __init__(self): self.city = '' self.text = None self.temperature = 20.0 self.wait = 3600.0 # 当前日期时间状态 class NTPDatetime: def __init__(self): self.datetime = None self.ntp = None self.ntp_sync = None self.next_lunar_request_time = None self.weekday = None self.week_name = None self.lunar_year_ganzhi = None self.lunar_month_chinese = None self.lunar_day_chinese = None self.wait = 3600.0 self.retry = 10 # 番茄统计 class Pomodoro: def __init__(self): self.count = 0 self.run = False self.end = 0 self.confirming = False self.confirmed = False self.time = os.getenv("POMODORO_TIME") self.timeout = os.getenv("POMODORO_TIMEOUT") self.wait = 0.2 # 连接wifi async def wifi_connect(internet): # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码> failure_count = 0 while True: internet.state = wifi.radio.connected if not wifi.radio.connected: try: wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv( "CIRCUITPY_WIFI_PASSWORD")) except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 await asyncio.sleep(internet.wait) # 如果wifi没有正常连接,则切换为ap模式 # if not wifi_state: # wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234') async def mqtt_connect(mqtt_client): while True: if not mqtt_client.mqtt: print("Set up a MiniMQTT Client") pool = socketpool.SocketPool(wifi.radio) mqtt = MQTT.MQTT( broker=mqtt_client.host, username=mqtt_client.user, password=mqtt_client.password, socket_pool=pool, client_id=mqtt_client.client_id ) mqtt.on_connect = mqtt_client.connect mqtt.on_disconnect = mqtt_client.disconnect mqtt.on_message = mqtt_client.message mqtt_client.mqtt = mqtt mqtt.connect() try: mqtt_client.mqtt.loop() except (ValueError, RuntimeError) as e: print("Failed to get data, retrying\n", e) mqtt_client.mqtt.reconnect() continue await asyncio.sleep(mqtt_client.wait) # 获取时间 async def fetch_time(internet, ntp_datetime): tz_offset = os.getenv("TIME_ZONE") the_rtc = rtc.RTC() ntp = None while True: if internet.state: ntp_ok = False lunar_ok = False pool = socketpool.SocketPool(wifi.radio) # 获取当前ntp时间, if not ntp: try: ntp = adafruit_ntp.NTP( pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset) # 更新系统时间 the_rtc.datetime = ntp.datetime ntp_datetime.ntp = ntp ntp_ok = True ntp_datetime.ntp_sync = True except OSError as error: print("NTP failed, retrying\n", error) ntp = None lunar_need_update = False if not ntp_datetime.next_lunar_request_time: lunar_need_update = True elif time.time() > ntp_datetime.next_lunar_request_time: # 超过下一次请求时间 lunar_need_update = True if lunar_need_update: # 获取阴历 try: https = requests.Session(pool, ssl.create_default_context()) lunar_response = https.get( f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}') except RuntimeError as error: print("Lunar request failed, retrying\n", error) lunar_response = None if lunar_response: lunar_json = lunar_response.json() if lunar_json['code'] == 200: lunar_data = lunar_json['data'] # print(lunar_data) week_day = lunar_data['week_no'] if week_day: ntp_datetime.week_day = week_day week_name = lunar_data['week_name'] if week_name: ntp_datetime.week_name = week_name lunar_year_ganzhi = lunar_data['ganzhi_year'] if lunar_year_ganzhi: ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi lunar_month_chinese = lunar_data['lunar_month_chinese'] if lunar_month_chinese: ntp_datetime.lunar_month_chinese = lunar_month_chinese lunar_day_chinese = lunar_data['lunar_day_chinese'] if lunar_day_chinese: ntp_datetime.lunar_day_chinese = lunar_day_chinese lunar_ok = True print( f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}") print(f"当前星期:{week_name}") # 设置下一次更新时间为第二天0点 current_time = time.localtime() ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0)) # 仅在第二天时再更新 if not ntp_ok or not lunar_ok: await asyncio.sleep(ntp_datetime.retry) else: # dt = time.localtime() # print( # f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}") # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec) await asyncio.sleep(ntp_datetime.wait) else: await asyncio.sleep(internet.wait) # 获取天气信息 async def fetch_weather(internet, weather): # 获取天气API KEY weather_api_key = os.getenv("WEATHER_API_KEY") # 获取天气城市:从配置文件中读取城市设置 weather_city = os.getenv("WEATHER_CITY") while True: if internet.state: # 天气信息 pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) # 如果读取不到配置的城市,则获取当前IP城市 if not weather_city: # 获取当前外网IP和城市 try: ip_city_response = https.get("https://myip.ipip.net/json") except RuntimeError as error: print("IP city request failed, retrying\n", error) if ip_city_response: ip_city_json = ip_city_response.json() if ip_city_json["ret"] == "ok": weather_city = ip_city_json['data']['location'][2] print(f"当前IP城市:{weather_city}") weather.city = weather_city # 当前天气 weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" try: weather_now_response = https.get(weather_now_url) except RuntimeError as error: print("Weather request failed, retrying\n", error) weather_now_response = None if weather_now_response: weather_json = weather_now_response.json() if weather_json["results"]: now_weather = weather_json["results"][0]["now"]["text"] now_temperature = weather_json["results"][0]["now"]["temperature"] weather.text = now_weather weather.temperature = now_temperature print(f"当前天气:{now_weather},气温:{now_temperature}℃") # 未来天气预报 # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" # weather_response = https.get(weather_daily_url) # weather_json = weather_response.json() # if weather_json["results"]: # today_weather = weather_json["results"][0]["daily"][0]["text_day"] # today_temprature_low = weather_json["results"][0]["daily"][0]["low"] # today_temprature_high = weather_json["results"][0]["daily"][0]["high"] # today_humidity = weather_json["results"][0]["daily"][0]["humidity"] # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%") await asyncio.sleep(weather.wait) else: await asyncio.sleep(internet.wait) # led显示 async def pixels_led(internet, pomodoro): # Neopixel LED控制 pixel_pin = board.NEOPIXEL pixel_num = 1 pixels = neopixel.NeoPixel( pixel_pin, pixel_num, brightness=0.05, auto_write=False) rainbow = Rainbow(pixels, speed=0.1, period=2) blink = Blink(pixels, 0.5, color.GREEN) animations = AnimationSequence( rainbow, advance_interval=5, auto_clear=True, ) while True: # 番茄进行中,显示为红色常亮 if pomodoro.run: # 番茄等待确认中,显示为绿色闪烁 if pomodoro.confirming: blink.speed = 0.5 blink.color = color.GREEN blink.animate() else: pixels.fill((255, 0, 0)) pixels.show() elif not internet.state: blink.speed = 2 blink.color = color.RED blink.animate() else: # 否则显示为彩虹色 animations.animate() await asyncio.sleep(pomodoro.wait) # 屏幕显示 async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro): display = board.DISPLAY # 中文字体文件放在font目录下 font_file = "font/wenquanyi_13px.pcf" font = bitmap_font.load_font(font_file) group = displayio.Group() # 设置日期显示(左上角) date_label = bitmap_label.Label(terminalio.FONT, scale=1) date_label.anchor_point = (0.0, 0.0) date_label.anchored_position = (5, 5) date_label.text = "2023-11-11" group.append(date_label) # 设置时间显示(右上角) time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2) time_label.anchor_point = (1.0, 0.0) time_label.anchored_position = (display.width - 2, 2) time_label.text = "11:30" group.append(time_label) # 设置农历显示(上中部) lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1) lunar_label.anchor_point = (0.5, 0.0) lunar_label.anchored_position = (display.width // 2, 5) lunar_label.text = "九月廿八" group.append(lunar_label) # 设置天气显示(左下角) weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1) weather_label.anchor_point = (0.0, 1.0) weather_label.anchored_position = (2, display.height - 5) weather_label.text = "晴" group.append(weather_label) # 设置气温显示(下中部) temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1) temperature_label.anchor_point = (0.5, 1.0) temperature_label.anchored_position = ( display.width // 2, display.height - 5) temperature_label.text = "5℃" group.append(temperature_label) # 设置MQTT状态(下中右部) mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1) mqtt_label.anchor_point = (0.5, 1.0) mqtt_label.anchored_position = (display.width // 1.3, display.height - 5) mqtt_label.text = "-" group.append(mqtt_label) # 设置番茄钟倒计时显示(中间) pomodoro_label = bitmap_label.Label( terminalio.FONT, color=0xFF00FF, scale=7) # 显示位置 pomodoro_label.anchor_point = (0.5, 0.5) pomodoro_label.anchored_position = ( display.width // 2, display.height // 2) pomodoro_label.text = "15:00" group.append(pomodoro_label) # 设置倒番茄钟统计显示(右下角) # with open("img/tomato.bmp", "rb") as f: # image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette) # sprite = displayio.TileGrid(image, pixel_shader=palette) # group.append(sprite) count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2) # 显示位置 count_label.anchor_point = (1, 1) count_label.anchored_position = (display.width - 2, display.height - 2) count_label.text = "0" group.append(count_label) # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计 os.getenv("POMODORO_TIMEOUT") # 创建根group main_group = displayio.Group() main_group.append(group) # 展示 display.root_group = main_group while True: if ntp_datetime.ntp_sync: dt = time.localtime() # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃' # 设置日期文本 date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}" if date_label.text != date_text: date_label.text = date_text # 设置时间文本 time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}" if time_label.text != time_text: time_label.text = time_text # 设置农历文本 if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese: lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}" if lunar_label.text != lunar_text: lunar_label.text = lunar_text if weather.text: # 设置天气文本 if weather_label.text != weather.text: weather_label.text = weather.text # 设置气温文本 temperature_text = f"{weather.temperature}℃" if temperature_label.text != temperature_text: temperature_label.text = temperature_text else: weather_text = f'请先连接WIFI' if weather_label.text != weather_text: weather_label.text = weather_text # MQTT LED状态 if mqtt_client.mqtt.is_connected: if mqtt_client.led: # 亮灯显示为红色 led_text = "on" mqtt_label.color=0xFF0000 else: led_text = "off" mqtt_label.color=0xFFFFFF if mqtt_label.text != led_text: mqtt_label.text = led_text # 更新番茄钟 pomodoro_label.color = 0x00FFFF count_text = f"{pomodoro.count}" if count_label.text != count_text: count_label.text = count_text if pomodoro.run: left_seconds = pomodoro.end - time.monotonic() if left_seconds >= 0: minute = int(left_seconds / 60) second = int(left_seconds % 60) # 倒计时每秒更新一次 pomodoro_text = f"{minute:02d}:{second:02d}" if pomodoro_label.text != pomodoro_text: pomodoro_label.text = pomodoro_text else: # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数 timeout_seconds = abs(left_seconds) if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout: pomodoro.confirming = True weather_label.text = f'番茄等待确认' # 超时时显示为红色 pomodoro_label.color = 0xFF0000 pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}" else: pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}' pomodoro.confirming = False pomodoro.confirmed = False pomodoro.run = False await asyncio.sleep(pomodoro.wait) async def monitor_touch_buttons(pomodoro): touch = touchio.TouchIn(board.D10) while True: if touch.value: await asyncio.sleep(0.1) if touch.value: # 按钮开始番茄计时 if not pomodoro.run: pomodoro.run = True pomodoro.end = time.monotonic() + pomodoro.time # 番茄确认状态时,将番茄数加1 elif pomodoro.confirming: pomodoro.confirmed = True pomodoro.count += 1 await asyncio.sleep(0.1) async def main(): # 共享变量设置 internet = Internet() mqtt_client = MqttClient() ntp_datetime = NTPDatetime() weather = Weather() pomodoro = Pomodoro() # 协程函数定义 internet_task = asyncio.create_task(wifi_connect(internet)) mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client)) fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime)) fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather)) pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro)) lcd_display_task = asyncio.create_task( lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro)) monitor_touch_buttons_task = asyncio.create_task( monitor_touch_buttons(pomodoro)) # 启动协程 await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task) asyncio.run(main()) 附settings.toml内容 CIRCUITPY_WIFI_SSID = "<根据实际情况修改>" CIRCUITPY_WIFI_PASSWORD = "<根据实际情况修改>" CIRCUITPY_WEB_API_PASSWORD = "passw0rd" #未使用 CIRCUITPY_WEB_API_PORT = 80 #未使用 TIME_ZONE = 8 # https://www.alapi.cn/api/view/54 LUNAR_API_KEY = "<根据实际情况修改>" WEATHER_API_KEY = "<根据实际情况修改>" WEATHER_CITY = "<根据实际情况修改,可以设置为城市中文名或拼音>" # 一个番茄钟时间秒数 POMODORO_TIME = 900 # 番茄钟完成时,手动确认超时秒数 POMODORO_TIMEOUT = 100 MQTT_HOST = "<根据实际情况修改>" MQTT_PORT = 1883 MQTT_USER = "<根据实际情况修改>" MQTT_PASSWORD = "<根据实际情况修改>" MQTT_TOPIC = "d1" MQTT_CLIENT_ID = "<根据实际情况修改>" 三、源码下载 源码打包:(非最新版本,更新后的代码版本可以参考帖子的内容) ESP32S3-TFT-CircuitPython-番茄日历钟源码-嵌入式开发相关资料下载-EEWORLD下载中心 ⛳️END
    2. 【得捷电子Follow me第2期】番茄日历钟 5/820 DigiKey得捷技术专区 2023-11-26
      本帖最后由 ltpop 于 2023-11-27 12:02 编辑 最终版本代码(有正确缩进) """ 番茄日历钟 ltpop@163.com 202311 v26 """ # TOML配置文件读取 import os import time import rtc import board import displayio import terminalio import touchio import wifi import ssl import socketpool # 需要导入asyncio、adafruit_ticks库 import asyncio # 需要导入neopixel库 import neopixel # 需要导入adafruit_ntp库 import adafruit_ntp # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_bitmap_font库 from adafruit_bitmap_font import bitmap_font # 需要导入adafruit_imageload库 import adafruit_imageload # 需要导入adafruit_requests库 import adafruit_requests as requests # 需要导入adafruit_led_animation库 from adafruit_led_animation.animation.blink import Blink from adafruit_led_animation.animation.rainbow import Rainbow from adafruit_led_animation.sequence import AnimationSequence import adafruit_led_animation.color as color # 需要导入adafruit_minimqtt库 import adafruit_minimqtt.adafruit_minimqtt as MQTT # 当前设备联网状态 class Internet: def __init__(self): self.state = False self.wait = 30.0 # MQTT class MqttClient: def __init__(self): self.mqtt = None self.is_connected = False self.host = os.getenv("MQTT_HOST") self.port = os.getenv("MQTT_PORT") self.user = os.getenv("MQTT_USER") self.password = os.getenv("MQTT_PASSWORD") self.client_id = os.getenv("MQTT_CLIENT_ID") self.topic = os.getenv("MQTT_TOPIC") self.led = "led1" self.cmd_topic = f"cmd/{self.topic}/{self.led}" self.state_topic = f"stat/{self.topic}/{self.led}" self.led = None self.wait = 0.1 def connect(self, mqtt_cli, userdata, flags, rc): print(f"Connected to MQTT {self.host}") self.is_connected = True mqtt_cli.subscribe(self.state_topic) def disconnect(self, mqtt_cli, userdata, rc): print(f"Disconnected from MQTT {self.host}") self.is_connected = False def message(self, client, topic, message): print(f"New message on topic {topic}: {message}") # 0-关,1-关,2-切换 if topic == self.state_topic: if message == '0': self.led = False elif message == '1': self.led = True # 当前天气状态 class Weather: def __init__(self): self.city = '' self.text = None self.temperature = 20.0 self.wait = 3600.0 # 当前日期时间状态 class NTPDatetime: def __init__(self): self.datetime = None self.ntp = None self.ntp_sync = None self.next_lunar_request_time = None self.weekday = None self.week_name = None self.lunar_year_ganzhi = None self.lunar_month_chinese = None self.lunar_day_chinese = None self.wait = 3600.0 self.retry = 10 # 番茄统计 class Pomodoro: def __init__(self): self.count = 0 self.run = False self.end = 0 self.confirming = False self.confirmed = False self.time = os.getenv("POMODORO_TIME") self.timeout = os.getenv("POMODORO_TIMEOUT") self.wait = 0.2 # 连接wifi async def wifi_connect(internet): # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码> failure_count = 0 while True: internet.state = wifi.radio.connected if not wifi.radio.connected: try: wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv( "CIRCUITPY_WIFI_PASSWORD")) except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 await asyncio.sleep(internet.wait) # 如果wifi没有正常连接,则切换为ap模式 # if not wifi_state: # wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234') async def mqtt_connect(mqtt_client): while True: if not mqtt_client.mqtt: print("Set up a MiniMQTT Client") pool = socketpool.SocketPool(wifi.radio) mqtt = MQTT.MQTT( broker=mqtt_client.host, username=mqtt_client.user, password=mqtt_client.password, socket_pool=pool, client_id=mqtt_client.client_id ) mqtt.on_connect = mqtt_client.connect mqtt.on_disconnect = mqtt_client.disconnect mqtt.on_message = mqtt_client.message mqtt_client.mqtt = mqtt mqtt.connect() try: mqtt_client.mqtt.loop() except (ValueError, RuntimeError) as e: print("Failed to get data, retrying\n", e) mqtt_client.mqtt.reconnect() continue await asyncio.sleep(mqtt_client.wait) # 获取时间 async def fetch_time(internet, ntp_datetime): tz_offset = os.getenv("TIME_ZONE") the_rtc = rtc.RTC() ntp = None while True: if internet.state: ntp_ok = False lunar_ok = False pool = socketpool.SocketPool(wifi.radio) # 获取当前ntp时间, if not ntp: try: ntp = adafruit_ntp.NTP( pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset) # 更新系统时间 the_rtc.datetime = ntp.datetime ntp_datetime.ntp = ntp ntp_ok = True ntp_datetime.ntp_sync = True except OSError as error: print("NTP failed, retrying\n", error) ntp = None lunar_need_update = False if not ntp_datetime.next_lunar_request_time: lunar_need_update = True elif time.time() > ntp_datetime.next_lunar_request_time: # 超过下一次请求时间 lunar_need_update = True if lunar_need_update: # 获取阴历 try: https = requests.Session(pool, ssl.create_default_context()) lunar_response = https.get( f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}') except RuntimeError as error: print("Lunar request failed, retrying\n", error) lunar_response = None if lunar_response: lunar_json = lunar_response.json() if lunar_json['code'] == 200: lunar_data = lunar_json['data'] # print(lunar_data) week_day = lunar_data['week_no'] if week_day: ntp_datetime.week_day = week_day week_name = lunar_data['week_name'] if week_name: ntp_datetime.week_name = week_name lunar_year_ganzhi = lunar_data['ganzhi_year'] if lunar_year_ganzhi: ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi lunar_month_chinese = lunar_data['lunar_month_chinese'] if lunar_month_chinese: ntp_datetime.lunar_month_chinese = lunar_month_chinese lunar_day_chinese = lunar_data['lunar_day_chinese'] if lunar_day_chinese: ntp_datetime.lunar_day_chinese = lunar_day_chinese lunar_ok = True print( f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}") print(f"当前星期:{week_name}") # 设置下一次更新时间为第二天0点 current_time = time.localtime() ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0)) # 仅在第二天时再更新 if not ntp_ok or not lunar_ok: await asyncio.sleep(ntp_datetime.retry) else: # dt = time.localtime() # print( # f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}") # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec) await asyncio.sleep(ntp_datetime.wait) else: await asyncio.sleep(internet.wait) # 获取天气信息 async def fetch_weather(internet, weather): # 获取天气API KEY weather_api_key = os.getenv("WEATHER_API_KEY") # 获取天气城市:从配置文件中读取城市设置 weather_city = os.getenv("WEATHER_CITY") while True: if internet.state: # 天气信息 pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) # 如果读取不到配置的城市,则获取当前IP城市 if not weather_city: # 获取当前外网IP和城市 try: ip_city_response = https.get("https://myip.ipip.net/json") except RuntimeError as error: print("IP city request failed, retrying\n", error) if ip_city_response: ip_city_json = ip_city_response.json() if ip_city_json["ret"] == "ok": weather_city = ip_city_json['data']['location'][2] print(f"当前IP城市:{weather_city}") weather.city = weather_city # 当前天气 weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" try: weather_now_response = https.get(weather_now_url) except RuntimeError as error: print("Weather request failed, retrying\n", error) weather_now_response = None if weather_now_response: weather_json = weather_now_response.json() if weather_json["results"]: now_weather = weather_json["results"][0]["now"]["text"] now_temperature = weather_json["results"][0]["now"]["temperature"] weather.text = now_weather weather.temperature = now_temperature print(f"当前天气:{now_weather},气温:{now_temperature}℃") # 未来天气预报 # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" # weather_response = https.get(weather_daily_url) # weather_json = weather_response.json() # if weather_json["results"]: # today_weather = weather_json["results"][0]["daily"][0]["text_day"] # today_temprature_low = weather_json["results"][0]["daily"][0]["low"] # today_temprature_high = weather_json["results"][0]["daily"][0]["high"] # today_humidity = weather_json["results"][0]["daily"][0]["humidity"] # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%") await asyncio.sleep(weather.wait) else: await asyncio.sleep(internet.wait) # led显示 async def pixels_led(internet, pomodoro): # Neopixel LED控制 pixel_pin = board.NEOPIXEL pixel_num = 1 pixels = neopixel.NeoPixel( pixel_pin, pixel_num, brightness=0.05, auto_write=False) rainbow = Rainbow(pixels, speed=0.1, period=2) blink = Blink(pixels, 0.5, color.GREEN) animations = AnimationSequence( rainbow, advance_interval=5, auto_clear=True, ) while True: # 番茄进行中,显示为红色常亮 if pomodoro.run: # 番茄等待确认中,显示为绿色闪烁 if pomodoro.confirming: blink.speed = 0.5 blink.color = color.GREEN blink.animate() else: pixels.fill((255, 0, 0)) pixels.show() elif not internet.state: blink.speed = 2 blink.color = color.RED blink.animate() else: # 否则显示为彩虹色 animations.animate() await asyncio.sleep(pomodoro.wait) # 屏幕显示 async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro): display = board.DISPLAY # 中文字体文件放在font目录下 font_file = "font/wenquanyi_13px.pcf" font = bitmap_font.load_font(font_file) group = displayio.Group() # 设置日期显示(左上角) date_label = bitmap_label.Label(terminalio.FONT, scale=1) date_label.anchor_point = (0.0, 0.0) date_label.anchored_position = (5, 5) date_label.text = "2023-11-11" group.append(date_label) # 设置时间显示(右上角) time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2) time_label.anchor_point = (1.0, 0.0) time_label.anchored_position = (display.width - 2, 2) time_label.text = "11:30" group.append(time_label) # 设置农历显示(上中部) lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1) lunar_label.anchor_point = (0.5, 0.0) lunar_label.anchored_position = (display.width // 2, 5) lunar_label.text = "九月廿八" group.append(lunar_label) # 设置天气显示(左下角) weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1) weather_label.anchor_point = (0.0, 1.0) weather_label.anchored_position = (2, display.height - 5) weather_label.text = "晴" group.append(weather_label) # 设置气温显示(下中部) temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1) temperature_label.anchor_point = (0.5, 1.0) temperature_label.anchored_position = ( display.width // 2, display.height - 5) temperature_label.text = "5℃" group.append(temperature_label) # 设置MQTT状态(下中右部) mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1) mqtt_label.anchor_point = (0.5, 1.0) mqtt_label.anchored_position = (display.width // 1.3, display.height - 5) mqtt_label.text = "-" group.append(mqtt_label) # 设置番茄钟倒计时显示(中间) pomodoro_label = bitmap_label.Label( terminalio.FONT, color=0xFF00FF, scale=7) # 显示位置 pomodoro_label.anchor_point = (0.5, 0.5) pomodoro_label.anchored_position = ( display.width // 2, display.height // 2) pomodoro_label.text = "15:00" group.append(pomodoro_label) # 设置倒番茄钟统计显示(右下角) # with open("img/tomato.bmp", "rb") as f: # image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette) # sprite = displayio.TileGrid(image, pixel_shader=palette) # group.append(sprite) count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2) # 显示位置 count_label.anchor_point = (1, 1) count_label.anchored_position = (display.width - 2, display.height - 2) count_label.text = "0" group.append(count_label) # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计 os.getenv("POMODORO_TIMEOUT") # 创建根group main_group = displayio.Group() main_group.append(group) # 展示 display.root_group = main_group while True: if ntp_datetime.ntp_sync: dt = time.localtime() # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃' # 设置日期文本 date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}" if date_label.text != date_text: date_label.text = date_text # 设置时间文本 time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}" if time_label.text != time_text: time_label.text = time_text # 设置农历文本 if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese: lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}" if lunar_label.text != lunar_text: lunar_label.text = lunar_text if weather.text: # 设置天气文本 if weather_label.text != weather.text: weather_label.text = weather.text # 设置气温文本 temperature_text = f"{weather.temperature}℃" if temperature_label.text != temperature_text: temperature_label.text = temperature_text else: weather_text = f'请先连接WIFI' if weather_label.text != weather_text: weather_label.text = weather_text # MQTT LED状态 if mqtt_client.mqtt.is_connected: if mqtt_client.led: # 亮灯显示为红色 led_text = "on" mqtt_label.color=0xFF0000 else: led_text = "off" mqtt_label.color=0xFFFFFF if mqtt_label.text != led_text: mqtt_label.text = led_text # 更新番茄钟 pomodoro_label.color = 0x00FFFF count_text = f"{pomodoro.count}" if count_label.text != count_text: count_label.text = count_text if pomodoro.run: left_seconds = pomodoro.end - time.monotonic() if left_seconds >= 0: minute = int(left_seconds / 60) second = int(left_seconds % 60) # 倒计时每秒更新一次 pomodoro_text = f"{minute:02d}:{second:02d}" if pomodoro_label.text != pomodoro_text: pomodoro_label.text = pomodoro_text else: # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数 timeout_seconds = abs(left_seconds) if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout: pomodoro.confirming = True weather_label.text = f'番茄等待确认' # 超时时显示为红色 pomodoro_label.color = 0xFF0000 pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}" else: pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}' pomodoro.confirming = False pomodoro.confirmed = False pomodoro.run = False await asyncio.sleep(pomodoro.wait) async def monitor_touch_buttons(pomodoro): touch = touchio.TouchIn(board.D10) while True: if touch.value: await asyncio.sleep(0.1) if touch.value: # 按钮开始番茄计时 if not pomodoro.run: pomodoro.run = True pomodoro.end = time.monotonic() + pomodoro.time # 番茄确认状态时,将番茄数加1 elif pomodoro.confirming: pomodoro.confirmed = True pomodoro.count += 1 await asyncio.sleep(0.1) async def main(): # 共享变量设置 internet = Internet() mqtt_client = MqttClient() ntp_datetime = NTPDatetime() weather = Weather() pomodoro = Pomodoro() # 协程函数定义 internet_task = asyncio.create_task(wifi_connect(internet)) mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client)) fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime)) fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather)) pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro)) lcd_display_task = asyncio.create_task( lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro)) monitor_touch_buttons_task = asyncio.create_task( monitor_touch_buttons(pomodoro)) # 启动协程 await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task) asyncio.run(main()) 欢迎大家一起学习交流    
    3. 【得捷电子Follow me第2期】番茄日历钟 5/820 DigiKey得捷技术专区 2023-11-17
      帖子使用了markdown格式,不知道什么原因,代码缩进没有了,但在编辑模式下是正常的。
    4. 【得捷电子Follow me第2期】番茄日历钟 5/820 DigiKey得捷技术专区 2023-11-15
      本帖最后由 ltpop 于 2023-11-17 22:49 编辑 演示视频   项目源码 """ 番茄日历钟 ltpop@163.com 202311 """ # TOML配置文件读取 import os import time import rtc import board import displayio import terminalio import touchio import wifi import ssl import socketpool # 需要导入asyncio、adafruit_ticks库 import asyncio # 需要导入neopixel库 import neopixel # 需要导入adafruit_ntp库 import adafruit_ntp # 需要导入adafruit_display_text库 from adafruit_display_text import bitmap_label # 需要导入adafruit_bitmap_font库 from adafruit_bitmap_font import bitmap_font # 需要导入adafruit_requests库 import adafruit_requests as requests # 需要导入adafruit_led_animation库 from adafruit_led_animation.animation.blink import Blink from adafruit_led_animation.animation.rainbow import Rainbow from adafruit_led_animation.sequence import AnimationSequence import adafruit_led_animation.color as color # 当前设备联网状态 class Internet: def __init__(self): self.state = False self.wait = 30.0 # 当前天气状态 class Weather: def __init__(self): self.city = '' self.text = None self.temperature = 20.0 self.wait = 300.0 # 当前日期时间状态 class NTPDatetime: def __init__(self): self.datetime = None self.ntp = None self.weekday = None self.week_name = None self.lunar_year_ganzhi = None self.lunar_month_chinese = None self.lunar_day_chinese = None self.wait = 3600.0 self.retry = 10 # 番茄统计 class Pomodoro: def __init__(self): self.count = 0 self.run = False self.end = 0 self.confirming = False self.confirmed = False self.time = os.getenv("POMODORO_TIME") self.timeout = os.getenv("POMODORO_TIMEOUT") self.wait = 0.1 # 连接wifi async def wifi_connect(internet): # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码> failure_count = 0 while True: internet.state = wifi.radio.connected if not wifi.radio.connected: try: wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")) except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 await asyncio.sleep(internet.wait) # 如果wifi没有正常连接,则切换为ap模式 # if not wifi_state: # wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234') # 获取时间 async def fetch_time(internet, ntp_datetime): tz_offset=os.getenv("TIME_ZONE") the_rtc = rtc.RTC() ntp = None while True: if internet.state: ntp_ok = False lunar_ok = False # 获取当前ntp时间, if not ntp: pool = socketpool.SocketPool(wifi.radio) try: ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset) # 更新系统时间 the_rtc.datetime = ntp.datetime ntp_datetime.ntp = ntp ntp_ok = True except OSError as error: print("NTP failed, retrying\n", error) ntp = None else: # 获取阴历 pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}') if lunar_response: lunar_json = lunar_response.json() if lunar_json['code'] == 200: lunar_data = lunar_json['data'] # print(lunar_data) week_day = lunar_data['week_no'] if week_day: ntp_datetime.week_day = week_day week_name = lunar_data['week_name'] if week_name: ntp_datetime.week_name = week_name lunar_year_ganzhi = lunar_data['ganzhi_year'] if lunar_year_ganzhi: ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi lunar_month_chinese = lunar_data['lunar_month_chinese'] if lunar_month_chinese: ntp_datetime.lunar_month_chinese = lunar_month_chinese lunar_day_chinese = lunar_data['lunar_day_chinese'] if lunar_day_chinese: ntp_datetime.lunar_day_chinese = lunar_day_chinese lunar_ok = True print(f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}") print(f"当前星期:{week_name}") # 仅在第二天时再更新 if not ntp_ok or not lunar_ok: await asyncio.sleep(ntp_datetime.retry) else: dt = time.localtime() print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}") wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec) await asyncio.sleep(wait_seconds) else: await asyncio.sleep(internet.wait) # 获取天气信息 async def fetch_weather(internet, weather): # 获取天气API KEY weather_api_key = os.getenv("WEATHER_API_KEY") # 获取天气城市:从配置文件中读取城市设置 weather_city = os.getenv("WEATHER_CITY") while True: if internet.state: # 天气信息 pool = socketpool.SocketPool(wifi.radio) https = requests.Session(pool, ssl.create_default_context()) # 如果读取不到配置的城市,则获取当前IP城市 if not weather_city: # 获取当前外网IP和城市 ip_city_response = https.get("https://myip.ipip.net/json") ip_city_json = ip_city_response.json() if ip_city_json["ret"] == "ok": weather_city = ip_city_json['data']['location'][2] print(f"当前IP城市:{weather_city}") weather.city = weather_city # 当前天气 weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" weather_now_response = https.get(weather_now_url) weather_json = weather_now_response.json() if weather_json["results"]: now_weather = weather_json["results"][0]["now"]["text"] now_temperature = weather_json["results"][0]["now"]["temperature"] weather.text = now_weather weather.temperature = now_temperature print(f"当前天气:{now_weather},气温:{now_temperature}℃") # 未来天气预报 # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5" # weather_response = https.get(weather_daily_url) # weather_json = weather_response.json() # if weather_json["results"]: # today_weather = weather_json["results"][0]["daily"][0]["text_day"] # today_temprature_low = weather_json["results"][0]["daily"][0]["low"] # today_temprature_high = weather_json["results"][0]["daily"][0]["high"] # today_humidity = weather_json["results"][0]["daily"][0]["humidity"] # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%") await asyncio.sleep(weather.wait) else: await asyncio.sleep(internet.wait) # led显示 async def pixels_led(internet, pomodoro): # Neopixel LED控制 pixel_pin = board.NEOPIXEL pixel_num = 1 pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False) rainbow = Rainbow(pixels, speed=0.1, period=2) # 番茄等待确认中,显示为青蓝色闪烁 blink = Blink(pixels, 0.5, color.CYAN) animations = AnimationSequence( rainbow, advance_interval=5, auto_clear=True, ) while True: # 番茄进行中,显示为红色常亮 if pomodoro.run: # 番茄等待确认中,显示为 if pomodoro.confirming: blink.animate() else: pixels.fill((255, 0, 0)) pixels.show() else: # 否则显示为彩虹色 animations.animate() await asyncio.sleep(pomodoro.wait) # 屏幕显示 async def lcd_display(internet, ntp_datetime, weather, pomodoro): display = board.DISPLAY # 中文字体文件放在font目录下 font_file = "font/wenquanyi_13px.pcf" font = bitmap_font.load_font(font_file) group = displayio.Group() # 设置日期显示(左上角) date_label = bitmap_label.Label(terminalio.FONT, scale=1) date_label.anchor_point = (0.0, 0.0) date_label.anchored_position = (5, 5) date_label.text = "2023-11-11" group.append(date_label) # 设置时间显示(右上角) time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2) time_label.anchor_point = (1.0, 0.0) time_label.anchored_position = (display.width - 2, 2) time_label.text = "11:30" group.append(time_label) # 设置农历显示(上中部) lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1) lunar_label.anchor_point = (0.5, 0.0) lunar_label.anchored_position = (display.width // 2, 5) lunar_label.text = "九月廿八" group.append(lunar_label) # 设置天气显示(左下角) weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1) weather_label.anchor_point = (0.0, 1.0) weather_label.anchored_position = (2, display.height - 5) weather_label.text = "晴" group.append(weather_label) # 设置气温显示(下中部) temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1) temperature_label.anchor_point = (0.5, 1.0) temperature_label.anchored_position = (display.width // 2, display.height - 5) temperature_label.text = "5℃" group.append(temperature_label) # 设置番茄钟倒计时显示(中间) pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7) # 显示位置 pomodoro_label.anchor_point = (0.5, 0.5) pomodoro_label.anchored_position = (display.width // 2, display.height // 2) pomodoro_label.text = "15:00" group.append(pomodoro_label) # 设置倒番茄钟统计显示(右下角) count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2) # 显示位置 count_label.anchor_point = (1, 1) count_label.anchored_position = (display.width - 2, display.height - 2) count_label.text = "0" group.append(count_label) # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计 os.getenv("POMODORO_TIMEOUT") # 创建根group main_group = displayio.Group() main_group.append(group) # 展示 display.root_group = main_group while True: if internet.state: dt = time.localtime() # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃' # 设置日期文本 date_label.text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}" # 设置时间文本 time_label.text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}" # 设置农历文本 if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese: lunar_label.text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}" if weather.text: # 设置天气文本 weather_label.text = f"{weather.text}" # 设置气温文本 temperature_label.text = f"{weather.temperature}℃" else: weather_label.text = f'请先连接WIFI' timeout = pomodoro.wait # 更新番茄钟 pomodoro_label.color = 0x00FFFF count_label.text = f"{pomodoro.count}" if pomodoro.run: left_seconds = pomodoro.end - time.monotonic() if left_seconds >= 0: minute = int(left_seconds / 60) second = int(left_seconds % 60) # 倒计时每秒更新一次 sec = left_seconds % 1 timeout = 1.0 - sec pomodoro_label.text = f"{minute:02d}:{second:02d}" else: # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数 timeout_seconds = abs(left_seconds) if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout: pomodoro.confirming = True weather_label.text = f'番茄等待确认' # 超时时显示为红色 pomodoro_label.color = 0xFF0000 pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}" else: pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}' pomodoro.confirming = False pomodoro.confirmed = False pomodoro.run = False else: timeout = pomodoro.wait await asyncio.sleep(timeout) async def monitor_touch_buttons(pomodoro): touch = touchio.TouchIn(board.D10) while True: if touch.value: # 按钮开始番茄计时 if not pomodoro.run: pomodoro.run = True pomodoro.end = time.monotonic() + pomodoro.time # 番茄确认状态时,将番茄数加1 elif pomodoro.confirming: pomodoro.confirmed = True pomodoro.count += 1 await asyncio.sleep(pomodoro.wait) async def main(): # 共享变量设置 internet = Internet() ntp_datetime = NTPDatetime() weather = Weather() pomodoro = Pomodoro() # 协程函数定义 internet_task = asyncio.create_task(wifi_connect(internet)) fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime)) fetch_weather_task = asyncio.create_task(fetch_weather(internet,weather)) pixels_led_task = asyncio.create_task(pixels_led(internet,pomodoro)) lcd_display_task = asyncio.create_task(lcd_display(internet, ntp_datetime, weather, pomodoro)) monitor_touch_buttons_task = asyncio.create_task(monitor_touch_buttons(pomodoro)) # 启动协程 await asyncio.gather(internet_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task) asyncio.run(main())  

最近访客

< 1/1 >

统计信息

已有3人来访过

  • 芯积分:49
  • 好友:--
  • 主题:2
  • 回复:4

留言

你需要登录后才可以留言 登录 | 注册


现在还没有留言