StreakingJerry

  • 2024-10-06
  • 发表了主题帖: 【2024 DigiKey 创意大赛】姿态传感器

    姿态捕获单元 作者:StreakingJerry   一、作品简介 BME280传感器集成了温湿度,气压于一身,而且还是使用常用的I2C来进行通讯,可以支持多传感器一起使用。我打算以它为中心,在常规的姿态追踪器中再添加一个维度,实现在空间姿态的基础上增加高度获取,从而实现更广泛的应用。这里面的主要难度是高度获取通过气压和温度计算得出,数据扰动会较大,因此需要多传感器进行数据融合,校准,滤波,才能最终实现有效的高数数据。另外一点就是上位机可视化展示的实现。 二、系统框图 三、各部分功能说明 第一部分是四元数数据的获取,通过LSM6DSV16X传感器得到。这款传感器的性能十分强大。我使用的是板载在XSNUCLEO姿态传感器拓展板上的传感器,这样使用起来非常简单,直接板对板插上就可以。   而BME280这边,我自己绘制了电路板。电路上包含了稳压电路和电平转换电路,这样5V和3.3V都可以适配。   四个引脚的顺序刚好与Nucleo开发板左上角的I2C接口相同,因为模块功率并不高,实测直接使用AVDD进行供电也完全没有问题。这样就不需要任何额外的接线,依然可以板对板连接。     四、作品源码 项目代码分为两部分,第一部分是需要上传到Nucleo开发板中的源码,使用Arduino编写;第二部分是需要放到树莓派上面的上位机源码,使用python编写。 五、作品功能演示视频 [localvideo]7c68d1cb108367ffc28d96ec95185e2c[/localvideo]   六、项目总结 通过这次活动,尝试了之前没有试过的玩法,这是一次非常有意思的尝试过程。

  • 2024-08-12
  • 发表了主题帖: 【2024 DigiKey 创意大赛】体感追踪器 - 物料开箱

    收到了德捷寄来的包裹,现在的快递果然是相较过去快了很多。   我计划用这些东西做一个体感姿态追踪器,相比常规的欧拉角和四元数,我计划再多输出高度数据。   陀螺仪模块我刚好有个现成的支持arduino的运动传感器模块,上面有多个运动传感器可以让我选择。     两个板子可以直接拼接在一起。   接下来的项目中,我会自己设计一块BME280的板子,焊接好芯片后接入这套系统,进行项目开发。

  • 2024-08-08
  • 加入了学习《【Follow me第二季第1期】使用circuitpython开发并完成全部任务》,观看 【Follow me第二季第1期】使用circuitpython开发并完成全部任务

  • 2024-08-07
  • 上传了资料: 【Follow me第二季第1期】使用circuitpython开发并完成全部任务

  • 发表了主题帖: 【Follow me第二季第1期】使用circuitpython开发并完成全部任务

    本帖最后由 StreakingJerry 于 2024-8-8 11:58 编辑 项目介绍 本项目主控为Adafruit Circuit Playground Express,我另外搭配了一块树莓派zero 2w,用来辅助完成章鱼哥的拓展任务。开箱的器件如下:     视频讲解:   入门任务(必做):开发环境搭建,板载LED点亮 circuitpython烧写非常简单,在下面的页面下载最新版的官方uf2格式固件: https://circuitpython.org/board/circuitplayground_express/ 然后插上开发板,点击一下reset按钮,将会出现一个叫做CPLAYBOOT的磁盘,将UF2固件复制进去就可以完成烧写。   烧写完成后,circuitpython会变成另外一个磁盘,代码文件code.py就在其中,我们直接修改这个文件就可以改变代码,因此可以直接用vscode,将工作目录设置为这个磁盘就可以。并且可以使用串口插件来读取到串口数据。     接下来可以正式开始开发。   这块板子由于是adafruit官方出品,因此在cpy固件内已经freeze了所有所需的库,可以从源代码中看到:   因此我们直接写代码就可以。这块板子有一个专门的库去对应他上面的所有外设,叫做adafruit_circuitplayground.express。只需要导入他,就可以直接点灯并blink,闪烁的灯是USB接口旁边的红灯。由于项目有多个任务需要完成,因此我们加一个判断来方便任务切换。 from adafruit_circuitplayground.express import cpx # 任务选择 task1 = 0 task2 = 0 task3 = 0 task4 = 0 task5 = 0 task6 = 0 task7 = 1 # 入门任务(必做):开发环境搭建,板载LED点亮 if task1: while True: cpx.red_led = True # Turns the little LED next to USB on time.sleep(0.5) cpx.red_led = False time.sleep(0.5)   基础任务一(必做):控制板载炫彩LED,跑马灯点亮和颜色变换 为了实现跑马灯效果,需要先定义一个函数来计算灯的RGB值,然后用该函数分别设置每个灯的RGB,再循环增加index就可以。在程序开头我们还可以定义一下常用的颜色,灯珠数量,以及并设置一下亮度,方便后面使用。 color = { "black": 0x000000, "white": 0xFFFFFF, "red": 0xFF0000, "green": 0x00FF00, "blue": 0x0000FF, "cyan": 0x00FFFF, "magenta": 0xFF00FF, "yellow": 0xFFFF00, } # Not too bright! cpx.pixels.brightness = 0.3 # Number of Pixel on board pix_num = 10 def wheel(pos): # Input a value 0 to 255 to get a color value. # The colours are a transition r - g - b - back to r. if (pos < 0) or (pos > 255): return color["black"] if pos < 85: return (int(255 - pos * 3), int(pos * 3), 0) elif pos < 170: pos -= 85 return (0, int(255 - (pos * 3)), int(pos * 3)) else: pos -= 170 return (int(pos * 3), 0, int(255 - pos * 3)) # 基础任务一(必做):控制板载炫彩LED,跑马灯点亮和颜色变换 if task2: pixeln = 0 while True: for p in range(pix_num): rainbow = wheel(255 // pix_num * ((pixeln + p) % pix_num)) cpx.pixels[p] = tuple( int(c * ((pix_num - (pixeln + p) % pix_num)) / pix_num) for c in rainbow ) # Each time 'round we tick off one pixel at a time if cpx.switch: # depending on the switch we'll go clockwise pixeln += 1 if pixeln > pix_num - 1: pixeln = 0 else: # or counter clockwise pixeln -= 1 if pixeln < 0: pixeln = pix_num - 1       基础任务二(必做):监测环境温度和光线,通过板载LED展示舒适程度 通过cpx读取到光线和温度数据后,可以根据当前数值和舒适数值的差异,计算出一个”难受指数“,用这个数字的大小来表示舒适度。数字越大时,代表环境越不好,板子上的RGB灯会亮的越多。在这之前,我们还需要写一个范围映射函数,来把一个范围内的变量映射到另一个范围。 def map_range(input_value, input_min, input_max, output_min, output_max): std_value = (input_value - input_min) / (input_max - input_min) return std_value * (output_max - output_min) + output_min # 基础任务二(必做):监测环境温度和光线,通过板载LED展示舒适程度 if task3: while True: temperature_value = cpx.temperature light_value = cpx.light discomfort_value = abs(light_value - 20) + abs(temperature_value - 25) print( "Temperature: %0.1f *C; Light Level: %d; Discomfort Level: %d" % (temperature_value, light_value, discomfort_value) ) max_value = 100 min_value = 20 discomfort_value = min(discomfort_value, max_value) discomfort_value = max(discomfort_value, min_value) discomfort_index = int(map_range(discomfort_value,min_value,max_value,0,10)) for p in range(10): if p <= discomfort_index: cpx.pixels[p] = tuple(int(c * ((10 - p % 10)) / 10.0) for c in wheel(25 * (p % 10))) else: cpx.pixels[p] = color["black"]       基础任务三(必做):接近检测——设定安全距离并通过板载LED展示,检测到入侵时,发起声音报警   这个任务使用了板子上的红外发射和接收。如果板子面前有障碍物,就会反射更多的红外发射光,接收管会有更大的读数,通过这种方式来检测侵入。当侵入超过设定阈值,我这里设置为5时,则播放一个声音来报警。   这里最为有趣的是,需要让红外LED闪一下,关闭LED后再去读取传感器的数值。不能直接让红外LED亮着去读数。具体什么原理我还没想明白,希望大神赐教。 # 基础任务三(必做):接近检测——设定安全距离并通过板载LED展示,检测到入侵时,发起声音报警 if task4: ir_tx = DigitalInOut(board.IR_TX) ir_tx.direction = Direction.OUTPUT proximity = AnalogIn(board.IR_PROXIMITY) while True: ir_tx.value = True time.sleep(0.001) ir_tx.value = False proximity_value = proximity.value print("proximity Level: %d" % proximity_value) max_value = 42500 min_value = 31500 interval_value = (max_value - min_value) / 11 proximity_index = (proximity_value - min_value) // interval_value for p in range(10): if p <= proximity_index: cpx.pixels[p] = tuple(int(c * ((10 - p % 10)) / 10.0) for c in wheel(25 * (p % 10))) else: cpx.pixels[p] = color["black"] if proximity_index > 5: cpx.play_file("Fanfare.wav")     进阶任务(必做):制作不倒翁——展示不倒翁运动过程中的不同灯光效果 这个任务中我们先读出xyz加速度的值,这里我们只使用x和y的数据,也就是板子平面上的。通过笛卡尔坐标转换极坐标的方式,用反三角函数和勾股定理计算出不倒翁倾斜的角度和倾斜的程度,并在对应方向亮起指示灯。 # 进阶任务(必做):制作不倒翁——展示不倒翁运动过程中的不同灯光效果 if task5: while True: x, y, z = cpx.acceleration angle = (2 - (math.atan2(x, y) / math.pi + 1)) * 180 magnitude = math.sqrt((x * x) + (y * y)) print( "Accelerometer: (%0.1f, %0.1f, %0.1f) m/s^2; Angle: %0.1f, Magnitude: %0.1f" % (x, y, z, angle, magnitude) ) if magnitude > 2: magnitude = min(magnitude,9.8) for p in range(10): if p == (angle * 10 // 360): cpx.pixels[p] = tuple( int(c * ((10 - p % 10)) / 10.0) for c in wheel(25 * (p % 10)) ) else: cpx.pixels[p] = color["black"]       创意任务三:水果钢琴——通过触摸水果弹奏音乐,并配合灯光效果 板子上除了DAC A0外,其余所有引脚均支持电容触摸功能。程序检测到某一个引脚被触摸后,会亮起离它最近的LED,并播放对应音高的音。由于水果是导电的,也具备一定的电容,因此我们把引脚引出来连接到水果上也同样有效。只是记得,这个传感器的原理是测量电容值的变化量,因此会在初始化的时候记录当前电容测量值作为参考。因此,接上水果后,需要重新运行代码,改变一下基准参考。 # 创意任务三:水果钢琴——通过触摸水果弹奏音乐,并配合灯光效果 if task7: while True: if cpx.touch_A4: cpx.start_tone(524) cpx.pixels[0] = color["magenta"] elif cpx.touch_A5: cpx.start_tone(588) cpx.pixels[1] = color["magenta"] elif cpx.touch_A6: cpx.start_tone(660) cpx.pixels[3] = color["magenta"] elif cpx.touch_A7: cpx.start_tone(698) cpx.pixels[4] = color["magenta"] elif cpx.touch_A1: cpx.start_tone(784) cpx.pixels[6] = color["magenta"] elif cpx.touch_A2: cpx.start_tone(880) cpx.pixels[8] = color["magenta"] elif cpx.touch_A3: cpx.start_tone(988) cpx.pixels[9] = color["magenta"] else: cpx.stop_tone() for p in range(10): cpx.pixels[p] = color["black"]       创意任务二:章鱼哥——章鱼哥的触角根据环境声音的大小,章鱼哥的触角可舒展或者收缩 这个任务我使用了两种方式实现。 第一种是直接驱动舵机,当检测到声音后,计算声音升压的平均值,然后将这个值转到舵机可接受的pwm脉宽范围,并将PWM信号送出去驱动舵机,同时根据平均声压大小来亮起对应数量的指示灯。PWM输出使用的引脚是A1。 # 创意任务二:章鱼哥——章鱼哥的触角根据环境声音的大小,章鱼哥的触角可舒展或者收缩 if task6: def servo_duty_cycle(pulse_us, frequency=50): period_us = 1.0 / frequency * 1000000.0 duty_cycle = int(pulse_us / (period_us / 65535.0)) return duty_cycle def mean(values): return sum(values) / len(values) def normalized_rms(values): minbuf = int(mean(values)) sum_of_samples = sum( float(sample - minbuf) * (sample - minbuf) for sample in values ) return math.sqrt(sum_of_samples / len(values)) mic = audiobusio.PDMIn( board.MICROPHONE_CLOCK, board.MICROPHONE_DATA, sample_rate=16000, bit_depth=16 ) samples = array.array("H", [0] * 320) mic.record(samples, len(samples)) pwm = pwmio.PWMOut(board.A1, frequency=50, duty_cycle=0) while True: mic.record(samples, len(samples)) sound_value = normalized_rms(samples) max_value = 1500 min_value = 150 sound_value = min(sound_value, max_value) sound_value = max(sound_value, min_value) print("Sound Level: %d" % sound_value) pwm.duty_cycle = servo_duty_cycle(int(map_range(sound_value, min_value, max_value, 500, 2500))) sound_index = int(map_range(sound_value, min_value, max_value, 0, 10)) for p in range(10): if p <= sound_index: cpx.pixels[p] = tuple( int(c * ((10 - p % 10)) / 10.0) for c in wheel(25 * (p % 10)) ) else: cpx.pixels[p] = color["black"]     第二种方式是开发板仅作为信号输出,并不直接驱动舵机,舵机用树莓派zero 2w来驱动。这就需要树莓派先读取PWM信号,再输出pwm信号。读取PWM信号需要用到精确的中断计时,对树莓派来说有一定难度,使用pigpio库来实现。 import time import pigpio import read_PWM pi = pigpio.pi() p = read_PWM.reader(pi, 23) while True: f = p.frequency() us = p.pulse_width() dc = p.duty_cycle() print("f={:.1f} pw={} dc={:.2f}".format(f, int(us + 0.5), dc)) if 500 <= us <= 2500: pi.set_servo_pulsewidth(24, us) time.sleep(0.01) p.cancel() pi.stop()   我们把单片机的PWM街道23号脚,把舵机接到24号脚上。   然后运行代码: #!/bin/bash sudo pigpiod python main.py     心得体会 第一次使用如此方便的开发板,官方甚至为这块板子专门写了一个cpx库,用起来简直就是一键调用,甚至连初始化代码等都可以直接省略。可以说,这块开发板是我迄今为止见到过最适合新手小白入门的板子。   完整的项目代码放在下面。大家测试的时候需要注意,由于CPY是脚本语言,因此即使我们没有运行某一部分的代码,这部分代码所需的内存依旧会被计算在内。所有7个任务占用的内存超过了板子的内存限制。由于章鱼哥部分代码占用内存较大,因此在测试其他任务时,需要将章鱼哥的全部代码都注释掉才可以。 https://download.eeworld.com.cn/detail/eew_AG2DvH/633949  

  • 2024-02-12
  • 加入了学习《【得捷电子Follow me第4期】使用micropython完成全部任务》,观看 【得捷电子Follow me第4期】使用micropython完成全部任务

  • 上传了资料: 【得捷电子Follow me第4期】使用micropython完成全部任务

  • 发表了主题帖: 【得捷电子Follow me第4期】使用micropython完成全部任务

      这次活动使用的开发板w5500-pico,相对来说比较小众。虽然w5500和pico都有非常多的应用,但将他们组合起来,资源就显得捉襟见肘。官方库对micropython有一定支持,但并不完善,比如基本的ping方法都没有。由于年前在讨论群里吹水说年后整个活,那现在就兑现一下吧,全部任务都使用micropython来完成,如果需要造轮子,那就造个轮子。   任务使用树莓派5作为上位机,全部代码编写等操作都在树莓派5中进行完成。搭配官方散热器后,整体运行相当给力。     入门任务:开发环境搭建,BLINK,驱动液晶显示器进行显示(没有则串口HelloWorld)   首先先到micropython官网下载最新版的uf2固件:   下载好后,插入我们的开发板,应该会直接弹出一个u盘,把刚刚下载的uf2复制进去后,u盘消失,这就完成了烧写工作。   接下来我们打开terminal,输入thonny,就会自动打开thonny。点击右下角就可以看到我们的开发板。      这两个选择任意一个,都可以成功连接。   下面开始代码。 # 入门任务:开发环境搭建,BLINK,驱动液晶显示器进行显示(没有则串口HelloWorld) import machine import time led = machine.Pin(25, machine.Pin.OUT) for i in range(5): led.high() time.sleep(0.5) led.low() time.sleep(0.5) print("") print("Hello EEWorld!") print("") print("你好,电子工程世界!")   运行后就可以看到板子上的绿灯闪了5次,闪完后shell输出相应字符。         基础任务一:完成主控板W5500初始化(静态IP配置),并能使用局域网电脑ping通,同时W5500可以ping通互联网站点;通过抓包软件(Wireshark、Sniffer等)抓取本地PC的ping报文,展示并分析 我们把连接网络的功能放进单独的w5500文件中: from machine import Pin,SPI import network import time def w5500(config): # could be "dhcp" or ('192.168.x.xxx','255.255.255.0','192.168.x.1','8.8.8.8') #spi init spi=SPI(0,2_000_000, mosi=Pin(19),miso=Pin(16),sck=Pin(18)) nic = network.WIZNET5K(spi,Pin(17),Pin(20)) #spi,cs,reset pin nic.active(True)#nicwork active nic.ifconfig(config) while not nic.isconnected(): time.sleep(1) # print(nic.regs())#Print register information #Print nicwork address information print("IP Address:",nic.ifconfig()[0]) print("Subnic Mask:",nic.ifconfig()[1]) print("Gateway:",nic.ifconfig()[2]) print("DNS:",nic.ifconfig()[3]) return nic   下面先讲讲ping的实现,ping是基于icmp协议发送的报文,虽然也是通过socket方法实现的,但要注意的是这里我们的socket使用的是SOCK_RAW和ICMP,它的index是1。这里和我们使用常规tcp方法不一样,icmp更为底层。   好在开源代码有不少使用python实现ping方法的案例,让我不需要真的完全从头造轮子。随便找了一个,修改了一下socket的使用,使用uctypes来封装结构体,并且修改了一下原始代码中ping逻辑有问题的地方,就可以完成一个仅依赖于micropython中socket方法的ping。 # μPing (MicroPing) for MicroPython # copyright (c) 2018 Shawwwn <shawwwn1@gmail.com> # License: MIT # Internet Checksum Algorithm # Author: Olav Morken # https://github.com/olavmrk/python-ping/blob/master/ping.py # @data: bytes def checksum(data): if len(data) & 0x1: # Odd number of bytes data += b'\0' cs = 0 for pos in range(0, len(data), 2): b1 = data[pos] b2 = data[pos + 1] cs += (b1 << 8) + b2 while cs >= 0x10000: cs = (cs & 0xffff) + (cs >> 16) cs = ~cs & 0xffff return cs def ping(host, count=4, timeout=30, interval=1, quiet=False, size=64): import time import select import uctypes import socket import struct import random # prepare packet assert size >= 16, "pkt size too small" pkt = b'Q'*size pkt_desc = { "type": uctypes.UINT8 | 0, "code": uctypes.UINT8 | 1, "checksum": uctypes.UINT16 | 2, "id": uctypes.UINT16 | 4, "seq": uctypes.INT16 | 6, "timestamp": uctypes.UINT64 | 8, } # packet header descriptor h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN) h.type = 8 # ICMP_ECHO_REQUEST h.code = 0 h.checksum = 0 h.id = random.getrandbits(16) h.seq = 1 # init socket sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1) sock.setblocking(0) sock.settimeout(timeout/1000) c = 0 addr = None while c < 10: try: c += 1 time.sleep_ms(100) addr = socket.getaddrinfo(host, 1)[0][-1][0] # ip address break except: pass if not addr: print("DNS lookup failed") return sock.connect((addr, 1)) not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt))) seqs = list(range(1, count+1)) # [1,2,...,count] c = 1 t = time.time() tstart = time.time() n_trans = 0 n_recv = 0 finish = False while time.time() - tstart < timeout: if time.time() - t >= interval and c<=count: # send packet h.checksum = 0 h.seq = c h.timestamp = time.ticks_us() h.checksum = checksum(pkt) if sock.send(pkt) == size: n_trans += 1 t = time.time() # reset timeout else: seqs.remove(c) # recv packet socks, _, _ = select.select([sock], [], [], 0) if socks: resp = socks[0].recv(4096) resp_mv = memoryview(resp) h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN) # TODO: validate checksum (optional) seq = h2.seq if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY t_elasped = (time.ticks_us()-h2.timestamp) / 1000 ttl = struct.unpack('!B', resp_mv[8:9])[0] # time-to-live n_recv += 1 c += 1 not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped)) seqs.remove(seq) if len(seqs) == 0: finish = True if finish: break # close sock.close() ret = (n_trans, n_recv) not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv)) return (n_trans, n_recv) 接着在主程序里写入以下代码运行,就可以看到打印。 这里我是用ping网关方法来判断是否成功连接上网络。因为我发现虽然在联网中已经有了connected判断,但实际上即使判断已联网,但并未连接,还需要一段时间。这应该是mpy中驱动的bug。 # 基础任务一:完成主控板W5500初始化(静态IP配置),并能使用局域网电脑ping通,同时W5500可以ping通互联网站点;通过抓包软件(Wireshark、Sniffer等)抓取本地PC的ping报文,展示并分析 from w5500 import w5500 from ping import ping ip = "192.168.8.213" gate = ip.split(".") gate[-1] = "1" gate = ".".join(gate) nic = w5500((ip,"255.255.255.0",gate,"8.8.8.8")) ping(gate, quiet=True) ping("eeworld.com.cn") 运行后,可以看到以下输出:     这时候我们尝试一下用主机去ping w5500:     抓包之前,我们先用sudo apt install wireshark 来安装一下。为了确保我们使用非root用户在桌面环境下也可以抓包,当安装过程中弹出dumpcap功能时,我们要选择yes启动。如果不小心错过,也可以通过sudo dpkg-reconfigure wireshark-common来开启。     接着我们输入以下命令,配置当前用户权限,然后注销重新登陆,就可以正常使用了。 sudo chmod +x /usr/bin/dumpcap sudo usermod -a -G wireshark pi   用wireshark抓包看一下,可以看到抓到对应的包:     基础任务二:主控板建立TCPIP或UDP服务器,局域网PC使用TCPIP或UDP客户端进行连接并发送数据,主控板接收到数据后,送液晶屏显示(没有则通过串口打印显示);通过抓包软件抓取交互报文,展示并分析。(TCP和UDP二选一,或者全都操作) 这个任务我们TCP与UDP都尝试一下。 TCP实验我们在W5500上搭建一个HTTP服务来进行,而UDP实验我们通过MQTT来实现。使用一个树莓派zero w作为MQTT broker,用另一个pico WH作为MQTT客户端,与我们的W5500 pico实现UDP通讯。   首先看HTTP部分,我们直接用socket方法发送HTTP头和内容就可以实现通信:   import socket addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] s = socket.socket() s.bind(addr) s.listen(1) print('listening on', addr) def callback(content): start_index = content.find("comment=") + len("comment=") end_index = content.find("HTTP/1.1") if start_index != -1 and end_index != -1: comment = content[start_index:end_index].strip() print() print(comment) html = """ <!DOCTYPE html> <html> <head> <title>Micropython</title> </head> <body>%s</body> </html> """ for i in range(3): cl, addr = s.accept() # print('client connected from', addr) content = b"" while True: msg=cl.recv(1024) content += msg if content[-4:] == b"\r\n\r\n": break callback(content.decode()) response = html % """ <form action="/action_page.php" id="usrform"> <textarea rows="4" cols="50" name="comment"></textarea><br> <input type="submit"> </form> """ cl.send('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n') cl.send(response) cl.close() s.close()   这里我们选择接受三次连接后结束。打开网页,输入W5500的ip,就可以看到对应页面。输入内容后,shell就可以打印出对应内容。   由于这是主机和W5500之间的通信,因此wireshark中可以抓到相应的包:   当三次访问完成后,就进入MQTT部分。   先在树莓派zero w上搭建我们的MQTT服务器。用ssh登陆上后,使用 sudo apt install mosquitto来安装服务。安装完成后,我们首先需要关闭原本的服务,因为这里我们想手动启动,并且观察broker的输出。 sudo systemctl stop mosquitto sudo systemctl disable mosquitto sudo systemctl status mosquitto 看到如下内容,说明已经成功关闭服务。   接着我们用nano mosquitto.conf新建一个配置文件,写入以下内容 allow_anonymous true listener 1883 保存后,就可以使用mosquitto -c mosquitto.conf来启动broker,启动后显示以下内容:     下面开始写W5500的代码。我使用的是官方的umqttsimple库,这个库有个小问题,默认的keepalive参数会导致无法运行,我们需要手动指定为10。这里我们连接上broker,并订阅主题w5500,收到内容后讲内容打印出来。完整代码如下: print("test MQTT") from umqttsimple import MQTTClient def callback(topic, msg): print((topic, msg)) if topic == b'w5500': print(msg) client = MQTTClient( client_id = "w5500", server = '192.168.8.174', keepalive = 10, ) client.set_callback(callback) client.connect() client.subscribe("w5500") print('Connected') while True: try: client.check_msg() except: client.disconnect() client.connect() client.subscribe("w5500")   下面我们写pico WH上的代码。pico WH我们使用circuitpython来完成,尽量多做一些不同的实验,避免重复。circuitpython刷机过程和micropython一致,就不重复说明了。刷好后,在settings.toml中写入以下内容,修改ssid和密码,然后断电重启后,就可以自动联网。切记一定要断电重启,soft reboot不行。 CIRCUITPY_WIFI_SSID="ssid" CIRCUITPY_WIFI_PASSWORD="password"   主程序实现每隔两秒钟向w5500主题发送一次消息。 import time import wifi import socketpool import adafruit_minimqtt.adafruit_minimqtt as MQTT pool = socketpool.SocketPool(wifi.radio) mqtt_client = MQTT.MQTT( broker='192.168.8.174', port=1883, socket_pool=pool, client_id='pico_wh' ) mqtt_client.connect() print('Start') while True: mqtt_client.publish('w5500', "Hello EEWorld!!!") time.sleep(2)   把上面代码写入code.py,就可以实现上电自动运行。如果一切顺例,我们就可以在w5500的shell中看到打印的内容:     进阶任务:从NTP服务器(注意数据交互格式的解析)同步时间,获取时间送显示屏(串口)显示 这里我们直接使用micropython自带的ntptime就可以实现。但这个包也有点小问题,网上很多教程都是使用包自带的变量NTP_DELTA来设置UTC+8时区,但实测并不能用。因此我们对时后手动增加对应的秒数,再转回struct time就可。 # 进阶任务:从NTP服务器(注意数据交互格式的解析)同步时间,获取时间送显示屏(串口)显示 import ntptime import time for i in range(10): try: ntptime.settime() t = time.localtime(time.time() + 8*3600) print("ntp time(BeiJing): %s-%s-%s %s:%s:%s" % (t[0],t[1],t[2],t[3],t[4],t[5])) break except: print("Can not get time!") time.sleep_ms(1000) 如果板子之前没有硬重启过,还保持着网络连接,运行后就可以看到时间已经校对:     终极任务二:使用外部存储器,组建简易FTP文件服务器,并能正常上传下载文件。 这里和之前的ping一样,使用socket方法来发送接收ftp命令,来实现ftp。这里的ftp代码是基于网上的开源例程修改的,原例程为esp8266编写,少了几个ftp命令,这里已经补上。 先在主机上使用sudo apt install ftp来安装ftp功能。完成后我们把下面代码保存为ftpserver.py文件。 # # Small ftp server for ESP8266 ans ESP32 Micropython # # Based on the work of chrisgp - Christopher Popp and pfalcon - Paul Sokolovsky # # The server accepts passive mode only. # It runs in foreground and quits, when it receives a quit command # Start the server with: # # import ftp # # Copyright (c) 2016 Christopher Popp (initial ftp server framework) # Copyright (c) 2016 Robert Hammelrath (putting the pieces together # and a few extensions) # Distributed under MIT License # import socket import network import uos import gc def send_list_data(path, dataclient, full): try: # whether path is a directory name for fname in sorted(uos.listdir(path), key=str.lower): dataclient.sendall(make_description(path, fname, full)) except: # path may be a file name or pattern pattern = path.split("/")[-1] path = path[:-(len(pattern) + 1)] if path == "": path = "/" for fname in sorted(uos.listdir(path), key=str.lower): if fncmp(fname, pattern): dataclient.sendall(make_description(path, fname, full)) def make_description(path, fname, full): if full: stat = uos.stat(get_absolute_path(path, fname)) file_permissions = ("drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--") file_size = stat[6] description = "{} 1 owner group {:>10} Jan 1 2000 {}\r\n".format( file_permissions, file_size, fname) else: description = fname + "\r\n" return description def send_file_data(path, dataclient): with open(path, "rb") as file: chunk = file.read(512) while len(chunk) > 0: dataclient.sendall(chunk) chunk = file.read(512) def save_file_data(path, dataclient): with open(path, "wb") as file: chunk = dataclient.recv(512) while len(chunk) > 0: file.write(chunk) chunk = dataclient.recv(512) def get_absolute_path(cwd, payload): # Just a few special cases "..", "." and "" # If payload start's with /, set cwd to / # and consider the remainder a relative path if payload.startswith('/'): cwd = "/" for token in payload.split("/"): if token == '..': if cwd != '/': cwd = '/'.join(cwd.split('/')[:-1]) if cwd == '': cwd = '/' elif token != '.' and token != '': if cwd == '/': cwd += token else: cwd = cwd + '/' + token return cwd # compare fname against pattern. Pattern may contain # wildcards ? and *. def fncmp(fname, pattern): pi = 0 si = 0 while pi < len(pattern) and si < len(fname): if (fname[si] == pattern[pi]) or (pattern[pi] == '?'): si += 1 pi += 1 else: if pattern[pi] == '*': # recurse if (pi + 1) == len(pattern): return True while si < len(fname): if fncmp(fname[si:], pattern[pi+1:]): return True else: si += 1 return False else: return False if pi == len(pattern.rstrip("*")) and si == len(fname): return True else: return False def ftpserver(net, port=21, timeout=None): DATA_PORT = 13333 ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ftpsocket.bind(socket.getaddrinfo("0.0.0.0", port)[0][4]) datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4]) ftpsocket.listen(1) ftpsocket.settimeout(timeout) datasocket.listen(1) datasocket.settimeout(None) msg_250_OK = '250 OK\r\n' msg_550_fail = '550 Failed\r\n' addr = net.ifconfig()[0] print("FTP Server started on ", addr) try: dataclient = None fromname = None do_run = True while do_run: cl, remote_addr = ftpsocket.accept() cl.settimeout(300) cwd = '/' try: # print("FTP connection from:", remote_addr) cl.sendall("220 Hello, this is Micropython.\r\n") while True: gc.collect() data = cl.readline().decode("utf-8").rstrip("\r\n") if len(data) <= 0: print("Client disappeared") do_run = False break command = data.split(" ")[0].upper() payload = data[len(command):].lstrip() path = get_absolute_path(cwd, payload) print("Command={}, Payload={}".format(command, payload)) if command == "USER": cl.sendall("230 Logged in.\r\n") elif command == "SYST": cl.sendall("215 UNIX Type: L8\r\n") elif command == "NOOP": cl.sendall("200 OK\r\n") elif command == "FEAT": cl.sendall("211 no-features\r\n") elif command == "PWD" or command == "XPWD": cl.sendall('257 "{}"\r\n'.format(cwd)) elif command == "CWD" or command == "XCWD": try: files = uos.listdir(path) cwd = path cl.sendall(msg_250_OK) except: cl.sendall(msg_550_fail) elif command == "CDUP": cwd = get_absolute_path(cwd, "..") cl.sendall(msg_250_OK) elif command == "TYPE": # probably should switch between binary and not cl.sendall('200 Transfer mode set\r\n') elif command == "SIZE": try: size = uos.stat(path)[6] cl.sendall('213 {}\r\n'.format(size)) except: cl.sendall(msg_550_fail) elif command == "QUIT": cl.sendall('221 Bye.\r\n') do_run = False break elif command == "PASV": cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'. format(addr.replace('.', ','), DATA_PORT >> 8, DATA_PORT % 256)) dataclient, data_addr = datasocket.accept() print("FTP Data connection from:", data_addr) DATA_PORT = 13333 active = False elif command == "PORT": items = payload.split(",") if len(items) >= 6: data_addr = '.'.join(items[:4]) # replace by command session addr if data_addr == "127.0.1.1": data_addr = remote_addr DATA_PORT = int(items[4]) * 256 + int(items[5]) dataclient = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dataclient.settimeout(10) dataclient.connect((data_addr, DATA_PORT)) print("FTP Data connection with:", data_addr) cl.sendall('200 OK\r\n') active = True else: cl.sendall('504 Fail\r\n') elif command == "LIST" or command == "NLST": if not payload.startswith("-"): place = path else: place = cwd try: cl.sendall("150 Here comes the directory listing.\r\n") send_list_data(place, dataclient, command == "LIST" or payload == "-l") cl.sendall("226 Listed.\r\n") except: cl.sendall(msg_550_fail) if dataclient is not None: dataclient.close() dataclient = None elif command == "RETR": try: cl.sendall("150 Opening data connection.\r\n") send_file_data(path, dataclient) cl.sendall("226 Transfer complete.\r\n") except: cl.sendall(msg_550_fail) if dataclient is not None: dataclient.close() dataclient = None elif command == "STOR": try: cl.sendall("150 Ok to send data.\r\n") save_file_data(path, dataclient) cl.sendall("226 Transfer complete.\r\n") except: cl.sendall(msg_550_fail) if dataclient is not None: dataclient.close() dataclient = None elif command == "DELE": try: uos.remove(path) cl.sendall(msg_250_OK) except: cl.sendall(msg_550_fail) elif command == "RMD" or command == "XRMD": try: uos.rmdir(path) cl.sendall(msg_250_OK) except: cl.sendall(msg_550_fail) elif command == "MKD" or command == "XMKD": try: uos.mkdir(path) cl.sendall(msg_250_OK) except: cl.sendall(msg_550_fail) elif command == "RNFR": fromname = path cl.sendall("350 Rename from\r\n") elif command == "RNTO": if fromname is not None: try: uos.rename(fromname, path) cl.sendall(msg_250_OK) except: cl.sendall(msg_550_fail) else: cl.sendall(msg_550_fail) fromname = None elif command == "MDTM": try: tm=localtime(uos.stat(path)[8]) cl.sendall('213 {:04d}{:02d}{:02d}{:02d}{:02d}{:02d}\r\n'.format(*tm[0:6])) except: cl.sendall('550 Fail\r\n') elif command == "STAT": if payload == "": cl.sendall("211-Connected to ({})\r\n" " Data address ({})\r\n" "211 TYPE: Binary STRU: File MODE:" " Stream\r\n".format( remote_addr[0], addr)) else: cl.sendall("213-Directory listing:\r\n") send_list_data(path, cl, True) cl.sendall("213 Done.\r\n") else: cl.sendall("502 Unsupported command.\r\n") print("Unsupported command {} with payload {}".format( command, payload)) except Exception as err: print(err) finally: cl.close() cl = None except Exception as e: print(e) finally: datasocket.close() ftpsocket.close() if dataclient is not None: dataclient.close() # ftpserver()   接着在主程序中我们重新连接一下网络,这里的目的不是为了联网,因为如果没有重置的话,网本来就连着。目的是为了获取到一个网络object,传入ftp方法用来获取ip。   # 终极任务二:使用外部存储器,组建简易FTP文件服务器,并能正常上传下载文件。 from ftpserver import ftpserver from w5500 import w5500 from ping import ping ip = "192.168.8.213" gate = ip.split(".") gate[-1] = "1" gate = ".".join(gate) nic = w5500((ip,"255.255.255.0",gate,"8.8.8.8")) ftpserver(nic) 运行后,我们在terminal中输入ftp 192.168.8.213来连接到w5500,使用ls方法来显示文件列表,通过get和put方法来下载和上传文件,验证一下,一切正常。     源码:https://download.eeworld.com.cn/detail/eew_AG2DvH/631127   体验心得 以往的开发都是在轮子齐全的情况下进行。这次活动为了全在micopython上完成,尝试做了一些简单的移植,我觉得还是非常有意思的。这是followme的最后一期,希望这样有意思的活动还可以一直持续下去。    

最近访客

< 1/1 >

统计信息

已有7人来访过

  • 芯积分:71
  • 好友:--
  • 主题:6
  • 回复:5

留言

你需要登录后才可以留言 登录 | 注册


现在还没有留言