ICS

  • 2024-12-10
  • 回复了主题帖: 【Follow me第二季第4期】任务二:IMU传感器的使用——串口输出和姿态监控

    秦天qintian0303 发表于 2024-12-10 12:13 这个串口工具不错,求分享   附件有,几年前用的,最近很难找

  • 2024-12-09
  • 发表了主题帖: 【Follow me第二季第4期】任务二:IMU传感器的使用——串口输出和姿态监控

    本帖最后由 ICS 于 2024-12-9 16:38 编辑 同样,先上演示视频,再看文章: [localvideo]9b2686762a8cb2dabe6e2ad7b5916e49[/localvideo] --- 参考: [在 Nano RP2040 Connect 上访问 IMU 数据 |Arduino 文档](https://docs.arduino.cc/tutorials/nano-rp2040-connect/rp2040-imu-basics/) Arduino® Nano RP2040 Connect板载了一个`LSM6DSOX` 的六轴`IMU`,提供了三轴加速度计和三轴陀螺仪。 ### 加速度计 加速度计是一种用于测量加速度的机电设备。这些力可能是静态的,例如重力的连续力,或者像许多移动设备一样,动态的,用于感应运动或振动。 在此示例中,我们将使用加速度计作为 “level” ,以提供有关板位置的信息。使用此应用程序,我们将能够读取板的相对位置,以及通过向上、向下、向左或向右倾斜板来读取度数。 ### 陀螺仪 陀螺仪传感器是一种可以测量和保持物体的方向和角速度的设备。陀螺仪比加速度计更先进,因为它们可以测量物体的倾斜和横向,而加速度计只能测量其线性运动。 陀螺仪传感器也称为“角速率传感器”或“角速度传感器”。角速度以度/秒为单位,是物体每单位时间内旋转角度的变化。 ## 通过串口打印六轴原始数据 如果烧录的是`Micropython`,则我们可以很简单的通过串口打印六轴原始数据: ```python import time from machine import Pin, I2C from lsm6dsox import LSM6DSOX i2c = I2C(0, sda=Pin(12), scl=Pin(13)) lsm = LSM6DSOX(i2c) while True:     # 陀螺仪     gyro_x, gyro_y, gyro_z = lsm.gyro()     # 加速度计     accel_x, accel_y, accel_z = lsm.accel()     print(f"{gyro_x:.3f} {gyro_y:.3f} {gyro_z:.3f} {accel_x:.3f} {accel_y:.3f} {accel_z:.3f}")     time.sleep_ms(50) ``` 使用串口绘图器绘图如下: [localvideo]fe482e0505c7ccd7bfa71e3b878c8b49[/localvideo] ## 通过串口读取重力加速度获取大致姿态 首先我们需要安装`vispy`、`numpy`、`pyserial`和`PyQT5`: `pip install PyQt5 pyserial vispy numpy  ` ```python import numpy as np from vispy import app, scene from vispy.scene import transforms import serial import sys # 创建Canvas和View canvas = scene.SceneCanvas(keys='interactive', show=True) view = canvas.central_widget.add_view() view.camera = 'turntable' view.camera.fov = 60 view.camera.distance = 300 # 创建3D长方体的顶点和面 vertices = np.array([     [50, 15, 2.5], [50, -15, 2.5], [-50, -15, 2.5], [-50, 15, 2.5],     [50, 15, -2.5], [50, -15, -2.5], [-50, -15, -2.5], [-50, 15, -2.5] ]) faces = np.array([     [0, 1, 5], [5, 4, 0],  # Front face     [2, 3, 7], [7, 6, 2],  # Back face     [0, 4, 7], [7, 3, 0],  # Top face     [1, 5, 6], [6, 2, 1],  # Bottom face     [4, 5, 6], [6, 7, 4],  # Right face     [0, 1, 2], [2, 3, 0]   # Left face ]) # 为每个面指定不同颜色(使用RGB值) colors = [     [1, 0, 0],  # Red     [0, 1, 0],  # Green     [0, 0, 1],  # Blue     [1, 1, 0],  # Yellow     [0, 1, 1],  # Cyan     [1, 0, 1],  # Magenta     [0.5, 0.25, 0],  # Brown     [0.75, 0.25, 0.25],  # Pink     [0.5, 0.5, 0.5],  # Gray     [0.6, 1, 0],  # Lime     [0.25, 0.5, 0.75],  # Light Blue     [0.25, 0.75, 0.25]   # Light Green ] # 更新Mesh对象的颜色 mesh = scene.visuals.Mesh(vertices=vertices, faces=faces, face_colors=colors) view.add(mesh) # 串口设置 try:     ser = serial.Serial('COM6', 115200) except Exception as e:     print("Error opening serial port: {}".format(e))     sys.exit(1) def update_mesh(event):     try:         line = ser.readline().decode('utf-8').strip()         if line:             g_y, g_x, g_z = map(float, line.split())             angle_x = np.arcsin(g_x) * 180 / np.pi  # 镜像x轴             angle_y = -np.arcsin(g_y) * 180 / np.pi  # 镜像y轴             angle_z = np.arcsin(g_z) * 180 / np.pi  # 镜像z轴             mesh.transform = transforms.MatrixTransform()             mesh.transform.rotate(angle_x, (1, 0, 0))             mesh.transform.rotate(angle_y, (0, 1, 0))             mesh.transform.rotate(angle_z, (0, 0, 1))             canvas.update()     except ValueError:         pass  # Ignore conversion errors timer = app.Timer('auto', connect=update_mesh, start=True) if __name__ == '__main__':     app.run() ``` 这段代码通过 Vispy 库显示一个 3D 长方体,定义每个面的颜色,并通过串口接收数据调整其倾斜角度。串口数据表示 x、y、z 方向的倾斜比例,转换为旋转角度后实时更新图形的旋转状态。 ```python import time from machine import Pin, I2C from lsm6dsox import LSM6DSOX i2c = I2C(0, sda=Pin(12), scl=Pin(13)) lsm = LSM6DSOX(i2c) while True:     # 这里我们只需要加速度     accel_x, accel_y, accel_z = lsm.accel()     print(f"{accel_x:.3f} {accel_y:.3f} {accel_z:.3f}")     time.sleep_ms(50) ``` ## 附件 1. 代码打包         2. 串口绘图软件         先前文章: [【Follow me第二季第4期】任务一:搭建环境并点亮RGBLED&串口输出](https://bbs.eeworld.com.cn/thread-1300301-1-1.html) [【Follow me第二季第4期】非任务:制作一个黑神话碎玉池银行提款机](https://bbs.eeworld.com.cn/thread-1300596-1-1.html)

  • 2024-12-02
  • 回复了主题帖: 【Follow me第二季第4期】非任务:制作一个黑神话碎玉池银行提款机

    wangerxian 发表于 2024-12-2 13:12 是不是搞个修改器会更快一些 更专业且高效的方法通常是通过专门的软件来实现自动化刷取,但本文旨在提供一种有趣的尝试和娱乐思路

  • 2024-12-01
  • 发表了主题帖: 【Follow me第二季第4期】非任务:制作一个黑神话碎玉池银行提款机

    众所周知,黑神话第四章盘丝洞的碎玉池银行是那些灵韵、灵光点不足的玩家最爱去的地方。然而,刷取灵韵的过程往往枯燥且乏味,笔者就曾亲身经历,刷了一整晚才勉强将等级提升至200级。鉴于此,笔者突发奇想,是否能利用`Arduino® Nano RP2040 Connect`的`USB-HID`功能,将其模拟成一个键盘,从而自动执行刷灵韵的操作,以加速这一过程。(注:当然,更专业且高效的方法通常是通过专门的软件来实现自动化刷取,但本文旨在提供一种有趣的尝试和娱乐思路。) ## 首先先上演示视频 [localvideo]36fcf66e44de691aaee5e55e1b507a7a[/localvideo] ## 准备 #### 软件准备: 1. 黑神话悟空 2. `Arduino® Nano RP2040 Connect`需要刷入`CircuitPython`,且导入`adafruit_hid`库 #### 属性准备: 1. 攻击力需大于90 2. 装备双头鼠,且点满变身攻击和双头鼠 3. q键必须切换到缩地 4. 最好带上仙禄和幽灯鬼 #### 安装CircuitPython 前往[Nano RP2040 Connect Download](https://circuitpython.org/board/arduino_nano_rp2040_connect/)下载固件,在短接`REC`和`GND`的情况下双击两下按钮,拖入下载好的`.uf2`文件即可。 #### 安装adafruit_hid库 前往[Release 6.1.3 - Fix For Sphinx RTD Theme · adafruit/Adafruit_CircuitPython_HID · GitHub](https://github.com/adafruit/Adafruit_CircuitPython_HID/releases/tag/6.1.3)下载[adafruit-circuitpython-hid-py-6.1.3.zip](https://github.com/adafruit/Adafruit_CircuitPython_HID/releases/download/6.1.3/adafruit-circuitpython-hid-py-6.1.3.zip)解压后将lib目录下面的adafruit_hid移动到板载目录下即可 ## 代码实现 ```python import json import time import board import digitalio import usb_hid from adafruit_hid.keyboard import Keyboard from adafruit_hid.keycode import Keycode from adafruit_hid.keyboard_layout_us import KeyboardLayoutUS keyboard_map = {     "a": 0x04, "A": 0x04,     "b": 0x05, "B": 0x05,     "c": 0x06, "C": 0x06,     "d": 0x07, "D": 0x07,     "e": 0x08, "E": 0x08,     "f": 0x09, "F": 0x09,     "g": 0x0A, "G": 0x0A,     "h": 0x0B, "H": 0x0B,     "i": 0x0C, "I": 0x0C,     "j": 0x0D, "J": 0x0D,     "k": 0x0E, "K": 0x0E,     "l": 0x0F, "L": 0x0F,     "m": 0x10, "M": 0x10,     "n": 0x11, "N": 0x11,     "o": 0x12, "O": 0x12,     "p": 0x13, "P": 0x13,     "q": 0x14, "Q": 0x14,     "r": 0x15, "R": 0x15,     "s": 0x16, "S": 0x16,     "t": 0x17, "T": 0x17,     "u": 0x18, "U": 0x18,     "v": 0x19, "V": 0x19,     "w": 0x1A, "W": 0x1A,     "x": 0x1B, "X": 0x1B,     "y": 0x1C, "Y": 0x1C,     "z": 0x1D, "Z": 0x1D,     "1": 0x1E, "!": 0x1E,     "2": 0x1F, "@": 0x1F,     "3": 0x20, "#": 0x20,     "4": 0x21, "$": 0x21,     "5": 0x22, "%": 0x22,     "6": 0x23, "^": 0x23,     "7": 0x24, "&": 0x24,     "8": 0x25, "*": 0x25,     "9": 0x26, "(": 0x26,     "0": 0x27, ")": 0x27,     "enter": 0x28,     "esc": 0x29,     "ctrl": 0xE0,     "alt": 0xE2 } # 初始化键盘和布局 kbd = Keyboard(usb_hid.devices) layout = KeyboardLayoutUS(kbd) # 定义一个按键(例如,板载的一个按钮) usrkey = digitalio.DigitalInOut(board.D2)  # 假设GP24是板载按钮的引脚 usrkey.direction = digitalio.Direction.INPUT usrkey.pull = digitalio.Pull.UP def launch_notepad():     # 打开记事本(Windows系统)     kbd.send(Keycode.WINDOWS, Keycode.R)     time.sleep(0.3)     layout.write('notepad\n')     time.sleep(0.5) def type_text(text):     # 输入文本     layout.write(text) while True:     # 检查按钮是否被按下     if not usrkey.value:         while True:             with open("sycbank.txt", "r", encoding="utf-8") as f:                 keys = f.readlines()                 start_time = 0                 last_time = 0                 for key in keys:                     try:                         key = key.replace("\n", "")                         key_json = json.loads(key)                     except:                         print(f"error: {key}")                         continue                     print(key_json, f"{key_json['time']:.2f}")                     event_type = key_json["event_type"]                     name = key_json["name"]                     key_time = key_json["time"]                     if last_time == 0:                         last_time = key_time                     if event_type == "start":                         start_time = key_json["time"]                     if event_type == "stop":                         pass                     else:                         time.sleep(key_time - last_time)                         print(key_time - last_time)                         if event_type == "down":                             kbd.press(keyboard_map[name.lower()])                         elif event_type == "up":                             kbd.release(keyboard_map[name.lower()])                         last_time = key_time         time.sleep(2)     # 添加一个小的延时以避免过度占用CPU资源     time.sleep(0.1) ``` ## 启用方法 1. 首先前往盘丝洞碎玉池 2. 在土地庙前按`E`后,点击鼠标右键返回后不要移动鼠标和控制键盘 3. 短接`D2`和`GND`即可 ## 附件 板卡文件,可直接复制替换

  • 2024-11-27
  • 发表了主题帖: 【Follow me第二季第4期】任务一:搭建环境并点亮RGBLED&串口输出

    [Follow me第二季第4期!与得捷一起解锁开发板超能力!](https://www.eeworld.com.cn/huodong/digikey_follow_me_2024_04/?sid=102) --- ### 任务准备 1. 硬件准备    1. Arduino® Nano RP2040 Connect开发板    2. USB-Micro B 数据线 2. 软件准备    1. 安装Arduino IDE(确保版本支持RP2040芯片)或者 PlatformIO    2. 安装RP2040的Board支持包 ### 从零搭建环境(以PlatformIO为例) 1. 安装 VS Code    访问[Download Visual Studio Code - Mac, Linux, Windows](https://code.visualstudio.com/Download)下载**System Installer**的安装包,根据步骤安装即可。 2. 安装 PlatformIO IDE    1. 在VS Code中点击左边栏的拓展(Ctrl+Shift+X)    2. 点击搜索框,搜索`PlatformIO IDE`    3. 点击安装 3. 安装RP2040支持包    1. 在VS Code中点击左边栏的PlatformIO    2. 点击Open->New Project    3. 在Board中选择Arduino Nano RP2040 Connect    4. 点击Finsh,等待非常长一段时间(示网络情况而定) ### Blink三色LED / 串口打印Hello DigiKey & EEWorld! 一开始以为,串口打印和Blink应该是很简单的事情,结果事实上并不简单。从上面的框图和介绍中可知,RGB LED连接的并不是 RP2040,而是一个叫NINA-W102的东西,一看介绍:240MHz 32位双核Xtensa LX6处理器!!!这不是ESP32嘛!!!并没有直接可以通过RP2040操作RGB LED的引脚,必须先通过SPI与ESP32进行通讯,然后再由ESP 32对RGB LED进行操作。不过官方文档给出了如何操作NINA的库和例程:[Control Built-in RGB LED over Wi-Fi with Nano RP2040 Connect | Arduino Documentation](https://docs.arduino.cc/tutorials/nano-rp2040-connect/rp2040-web-server-rgb/),例程如下: ```C++ #include #include   # 需要在Platform IO 中安装 WiFi NINA void fadeLED(NinaPin ledPin) {   for (int brightness = 255; brightness >= 0; brightness--)   {     analogWrite(ledPin, brightness);     delay(10);   }   for (int brightness = 0; brightness

  • 2024-05-20
  • 发表了主题帖: 【2023 DigiKey大赛参与奖】Raspberry Pi 5 4G 开箱帖

    本帖最后由 ICS 于 2024-5-20 21:19 编辑 介绍 大家好!我最近有幸参加了2023年的DigiKey“智造万物,快乐不停”创意大赛,并且荣幸地获得了参与奖——我使用参与奖的报销金来购买 Raspberry Pi 5 4G! 外包装     内部包装       初步印象 拿起Raspberry Pi 5 4G,我被它的轻巧和紧凑的设计所吸引。但是真的是很热!!!!而且是个毛坯房,除了板子什么都没有。必要的有TF卡,5V5A的电源,散热器,可能还需要microHDMI的线。 结语 总的来说,我对这个参与奖感到非常满意和兴奋。Raspberry Pi 5 4G是一个功能强大且多用途的开发板,我迫不及待地想要开始我的树莓派之旅了!感谢EEWORLD 和 DigiKey为我提供了这个机会,我期待着在未来的项目中充分发挥它的潜力。 新的一期的2024 DigiKey“感知万物,乐享生活”创意大赛开始报名啦!:【万元大奖,600元物料】2024 DigiKey“感知万物,乐享生活”创意大赛开始报名啦!。快来参加吧! 这就是我对Raspberry Pi 5 4G的开箱体验和初步印象!如果你对这个开发板感兴趣,也可以去DigiKey网站上了解更多详情。期待未来能与大家分享我的树莓派项目!

  • 2024-03-24
  • 回复了主题帖: 【瓜分2500元红包】票选DigiKey"智造万物,快乐不停"创意大赛人气作品TOP3!

    今天才看见,话说自动化专业的算电子行业相关人员嘛

  • 2024-03-13
  • 加入了学习《【DigiKey“智造万物,快乐不停”创意大赛】全胸腔体外振荡排痰系统_作品提交》,观看 【DigiKey“智造万物,快乐不停”创意大赛】全胸腔体外振荡排痰系统_作品提交

  • 2024-03-03
  • 发表了主题帖: 【得捷Follow me第4期】W5500-EVB-Pico的使用 - 作品提交

    感谢EEWorld和得捷举办的此次活动:Follow me 第4期!与得捷一起解锁开发板超能力! (eeworld.com.cn)。 这次活动是一次非常有意义和有趣的学习和实践的机会,让我能够接触到最新的开发板和技术,跟随技术大咖的指导,完成了有挑战性的任务,收获了很多的知识和经验。 第一部分:任务视频介绍 [localvideo]b29348e09a283d3ae4fee90e1534bc22[/localvideo] 视频详情请查看【得捷Follow me第4期】提交视频-EEWORLD大学堂 第二部分:任务/项目总结报告 ps:不知道为什么,现在的md编辑器 TOC/图片缩放/代码块 都不行了:下文为EEWorld自动转码后的内容为方便查看:请下载WORD/PDF查看     入门任务 开发环境搭建,BLINK,驱动液晶显示器进行显示(没有则串口HelloWorld) 烧录 Micropython: 前往 Micropython 下载Wiznet W5500-EVB-Pico的专属固件:MicroPython - Python for microcontrollers 下载好W5500_EVB_PICO-20240222-v1.22.2.uf2后,进入烧录模式,可以直接拖入 uf2 文件进行烧录 烧录完成重启 BLINK: from machine import Pin import time led_pin = Pin(25, Pin.OUT) while True: led_pin.value(1) time.sleep(1) led_pin.value(0) time.sleep(1) 驱动液晶显示器: 我使用的是合宙的 1.8' 128x160 RGB TFT_LCD,连接如下: GPIO6 -------------- SCL GPIO7 -------------- SDA GPIO8 -------------- RST GPIO9 -------------- CS GPIO10 -------------- DC 前往AntonVanke/MicroPython-uFont: MicroPython 的中文字库,使 MicroPython 能够显示中文 (github.com)下载st77xx.py、ufont.py和unifont-14-12917-16.v3.bmf,并上传到根目录,运行: from machine import SPI, Pin import ufont from st77xx import ST7735 spi = SPI(0, 30000000, sck=Pin(6), mosi=Pin(7)) display = ST7735(spi=spi, cs=9, dc=10, rst=8, bl=None, width=160, height=128, rotate=1) font = ufont.BMFont("unifont-14-12917-16.v3.bmf") font.text(display, "EEWorld &\nDigiKey Follow Me 4\nICS \n你好", 0, 0, show=True) 基础任务 ■ 完成主控板W5500初始化(静态IP配置),并能使用局域网电脑ping通,同时W5500可以ping通互联网站点;通过抓包软件(Wireshark、Sniffer等)抓取本地PC的ping报文,展示并分析。 我们可以通过DCHP直接获取 IP 地址 import network d = network.WIZNET5K() d.active(True) d.ifconfig("dhcp") # 验证 if d.isconnected(): print(d.ifconfig()) # ('192.168.199.188', '255.255.255.0', '192.168.199.1', '192.168.199.1') 当然,也可以指定IP import network d = network.WIZNET5K() d.active(True) d.ifconfig(('192.168.199.20','255.255.255.0','192.168.199.1','223.5.5.5')) # 验证 if d.isconnected(): print(d.ifconfig()) 这里我们以第一次PING进行解析,我们来对比一下: 第一个数据包: 0000 02 d7 27 7c 62 9a a0 29 42 99 88 60 08 00 45 00 0010 00 3c b7 0f 00 00 80 01 00 00 c0 a8 c7 d0 c0 a8 0020 c7 14 08 00 4c 64 00 01 00 f7 61 62 63 64 65 66 0030 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 0040 77 61 62 63 64 65 66 67 68 69 以太网头部(Ethernet Header): 目的MAC地址:02:d7:27:7c:62:9a 源MAC地址:a0:29:42:99:88:60 以太网类型:0x0800(表示IP数据包) IP头部(IP Header): 版本:4 (IPv4) 头部长度:20 bytes 生存时间(TTL):128 协议:ICMP (0x01) 源IP地址:192.168.199.208 目的IP地址:192.168.199.20 ICMP数据部分: 类型:8 (Echo (ping) request) 代码:0 标识符(Identifier):0x0001 序列号(Sequence Number):0x00f7 数据:abcdefghijklmnopqrstuvwxyz 第二个数据包: 0000 a0 29 42 99 88 60 02 d7 27 7c 62 9a 08 00 45 00 0010 00 3c b7 0f 00 00 ff 01 f4 7a c0 a8 c7 14 c0 a8 0020 c7 d0 00 00 54 64 00 01 00 f7 61 62 63 64 65 66 0030 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 0040 77 61 62 63 64 65 66 67 68 69 以太网头部(Ethernet Header): 源MAC地址:a0:29:42:99:88:60 目的MAC地址:02:d7:27:7c:62:9a 以太网类型:0x0800(表示IP数据包) IP头部(IP Header): 生存时间(TTL):255 源IP地址:192.168.199.20 目的IP地址:192.168.199.208 ICMP数据部分: 类型:8 (Echo (ping) request) 代码:0 标识符(Identifier):0x0001 序列号(Sequence Number):0x00f7 数据:abcdefghijklmnopqrstuvwxyz ■ 主控板建立TCPIP或UDP服务器,局域网PC使用TCPIP或UDP客户端进行连接并发送数据,主控板接收到数据后,送液晶屏显示(没有则通过串口打印显示);通过抓包软件抓取交互报文,展示并分析。(TCP和UDP二选一,或者全都操作) 使用socket模块可以进行socket通讯,下面就创建一个TCP的服务端: import socket # 设置服务器的IP地址和端口号 SERVER_IP = '0.0.0.0' # 监听所有网络接口 SERVER_PORT = 12345 # 创建一个TCP套接字对象 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 绑定IP地址和端口号 server_sock.bind((SERVER_IP, SERVER_PORT)) # 开始监听连接 server_sock.listen(1) print("Server listening on", SERVER_IP, "port", SERVER_PORT) # 接受连接并处理数据 client_sock, client_addr = server_sock.accept() print("Client connected from", client_addr) while True: # 接收客户端的数据 data = client_sock.recv(1024) if data: print("Received:", data.decode()) # 发送响应数据 response = "Hello ICS" client_sock.send(response.encode())   使用LLCOM可以进行网络调试(工具链接:LLCOM | 能跑Lua代码的串口调试工具! (papapoi.com)) ■ 从NTP服务器(注意数据交互格式的解析)同步时间,获取时间送显示屏(串口)显示。 在开发板上新建一个文件,命名为ntpics.py,内容如下: from machine import SPI, Pin,Timer import network import ufont from st77xx import ST7735 import ntplc import time spi = SPI(0, 30000000, sck=Pin(6), mosi=Pin(7)) display = ST7735(spi=spi, cs=9, dc=10, rst=8, bl=None, width=160, height=128, rotate=1) font = ufont.BMFont("unifont-14-12917-16.v3.bmf") font.text(display, "等待授时完成", 0, 0, show=True) d = network.WIZNET5K() d.active(True) d.ifconfig("dhcp") # 验证 if d.isconnected(): print(d.ifconfig()) ntplc.settime() def timer_callback(timer): # 获取当前时间戳(自1970年1月1日以来的秒数) current_time = time.time() # 将时间戳转换为本地时间 local_time = time.localtime(current_time + 28800) # 打印本地时间的年、月、日、时、分、秒 font.text(display, "{}-{}-{}\n{}:{}:{}".format(local_time[0], local_time[1], local_time[2], local_time[3], local_time[4], local_time[5]), 48, 32, font_size=16,clear=True) # 创建一个定时器对象 tim = Timer(-1) # 每隔1秒触发一次定时器回调函数 tim.init(period=1000, mode=Timer.PERIODIC, callback=timer_callback)   终极任务 ■ 使用外部存储器,组建简易FTP文件服务器,并能正常上传下载文件。 # 参考:https://github.com/hosseinghaheri/MicroPython-FTP-Server import socket import network import uos import gc import sys import errno from time import sleep_ms, localtime from micropython import alloc_emergency_exception_buf # constant definitions _CHUNK_SIZE = const(1024) _SO_REGISTER_HANDLER = const(20) _COMMAND_TIMEOUT = const(300) _DATA_TIMEOUT = const(100) _DATA_PORT = const(13333) # Global variables ftpsockets = [] datasocket = None client_list = [] verbose_l = 0 client_busy = False # Interfaces: (IP-Address (string), IP-Address (integer), Netmask (integer)) _month_name = ("", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec") class FTP_client: def __init__(self, ftpsocket, local_addr): self.command_client, self.remote_addr = ftpsocket.accept() self.remote_addr = self.remote_addr[0] self.command_client.settimeout(_COMMAND_TIMEOUT) log_msg(1, "FTP Command connection from:", self.remote_addr) self.command_client.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, self.exec_ftp_command) self.command_client.sendall("220 Hello, this is the {}.\r\n".format(sys.platform)) self.cwd = '/' self.fromname = None # self.logged_in = False self.act_data_addr = self.remote_addr self.DATA_PORT = 20 self.active = True self.pasv_data_addr = local_addr def send_list_data(self, path, data_client, full): try: for fname in uos.listdir(path): data_client.sendall(self.make_description(path, fname, full)) except Exception as e: # path may be a file name or pattern path, pattern = self.split_path(path) try: for fname in uos.listdir(path): if self.fncmp(fname, pattern): data_client.sendall( self.make_description(path, fname, full)) except: pass def make_description(self, path, fname, full): global _month_name if full: stat = uos.stat(self.get_absolute_path(path, fname)) file_permissions = ("drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--") file_size = stat[6] tm = stat[7] & 0xffffffff tm = localtime(tm if tm < 0x80000000 else tm - 0x100000000) if tm[0] != localtime()[0]: description = "{} 1 owner group {:>10} {} {:2} {:>5} {}\r\n".\ format(file_permissions, file_size, _month_name[tm[1]], tm[2], tm[0], fname) else: description = "{} 1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".\ format(file_permissions, file_size, _month_name[tm[1]], tm[2], tm[3], tm[4], fname) else: description = fname + "\r\n" return description def send_file_data(self, path, data_client): buffer = bytearray(_CHUNK_SIZE) mv = memoryview(buffer) with open(path, "rb") as file: bytes_read = file.readinto(buffer) while bytes_read > 0: data_client.write(mv[0:bytes_read]) bytes_read = file.readinto(buffer) data_client.close() def save_file_data(self, path, data_client, mode): buffer = bytearray(_CHUNK_SIZE) mv = memoryview(buffer) with open(path, mode) as file: bytes_read = data_client.readinto(buffer) while bytes_read > 0: file.write(mv[0:bytes_read]) bytes_read = data_client.readinto(buffer) data_client.close() def get_absolute_path(self, cwd, payload): # Just a few special cases "..", "." and "" # If payload start's with /, set cwd to / # and consider the remainder a relative path if payload.startswith('/'): cwd = "/" for token in payload.split("/"): if token == '..': cwd = self.split_path(cwd)[0] elif token != '.' and token != '': if cwd == '/': cwd += token else: cwd = cwd + '/' + token return cwd def split_path(self, path): # instead of path.rpartition('/') tail = path.split('/')[-1] head = path[:-(len(tail) + 1)] return ('/' if head == '' else head, tail) # compare fname against pattern. Pattern may contain # the wildcards ? and *. def fncmp(self, fname, pattern): pi = 0 si = 0 while pi < len(pattern) and si < len(fname): if (fname[si] == pattern[pi]) or (pattern[pi] == '?'): si += 1 pi += 1 else: if pattern[pi] == '*': # recurse if pi == len(pattern.rstrip("*?")): # only wildcards left return True while si < len(fname): if self.fncmp(fname[si:], pattern[pi + 1:]): return True else: si += 1 return False else: return False if pi == len(pattern.rstrip("*")) and si == len(fname): return True else: return False def open_dataclient(self): if self.active: # active mode data_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) data_client.settimeout(_DATA_TIMEOUT) data_client.connect((self.act_data_addr, self.DATA_PORT)) log_msg(1, "FTP Data connection with:", self.act_data_addr) else: # passive mode data_client, data_addr = datasocket.accept() log_msg(1, "FTP Data connection with:", data_addr[0]) return data_client def exec_ftp_command(self, cl): global datasocket global client_busy global my_ip_addr try: gc.collect() data = cl.readline().decode("utf-8").rstrip("\r\n") if len(data) <= 0: # No data, close # This part is NOT CLEAN; there is still a chance that a # closing data connection will be signalled as closing # command connection log_msg(1, "*** No data, assume QUIT") close_client(cl) return if client_busy: # check if another client is busy cl.sendall("400 Device busy.\r\n") # tell so the remote client return # and quit client_busy = True # now it's my turn # check for log-in state may done here, like # if self.logged_in == False and not command in\ # ("USER", "PASS", "QUIT"): # cl.sendall("530 Not logged in.\r\n") # return command = data.split()[0].upper() payload = data[len(command):].lstrip() # partition is missing path = self.get_absolute_path(self.cwd, payload) log_msg(1, "Command={}, Payload={}".format(command, payload)) if command == "USER": # self.logged_in = True cl.sendall("230 Logged in.\r\n") # If you want to see a password,return # "331 Need password.\r\n" instead # If you want to reject an user, return # "530 Not logged in.\r\n" elif command == "PASS": # you may check here for a valid password and return # "530 Not logged in.\r\n" in case it's wrong # self.logged_in = True cl.sendall("230 Logged in.\r\n") elif command == "SYST": cl.sendall("215 UNIX Type: L8\r\n") elif command in ("TYPE", "NOOP", "ABOR"): # just accept & ignore cl.sendall('200 OK\r\n') elif command == "QUIT": cl.sendall('221 Bye.\r\n') close_client(cl) elif command == "PWD" or command == "XPWD": cl.sendall('257 "{}"\r\n'.format(self.cwd)) elif command == "CWD" or command == "XCWD": try: if (uos.stat(path)[0] & 0o170000) == 0o040000: self.cwd = path cl.sendall('250 OK\r\n') else: cl.sendall('550 Fail\r\n') except: cl.sendall('550 Fail\r\n') elif command == "PASV": cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'.format( self.pasv_data_addr.replace('.', ','), _DATA_PORT >> 8, _DATA_PORT % 256)) self.active = False elif command == "PORT": items = payload.split(",") if len(items) >= 6: self.act_data_addr = '.'.join(items[:4]) if self.act_data_addr == "127.0.1.1": # replace by command session addr self.act_data_addr = self.remote_addr self.DATA_PORT = int(items[4]) * 256 + int(items[5]) cl.sendall('200 OK\r\n') self.active = True else: cl.sendall('504 Fail\r\n') elif command == "LIST" or command == "NLST": if payload.startswith("-"): option = payload.split()[0].lower() path = self.get_absolute_path( self.cwd, payload[len(option):].lstrip()) else: option = "" try: data_client = self.open_dataclient() cl.sendall("150 Directory listing:\r\n") self.send_list_data(path, data_client, command == "LIST" or 'l' in option) cl.sendall("226 Done.\r\n") data_client.close() except: cl.sendall('550 Fail\r\n') if data_client is not None: data_client.close() elif command == "RETR": try: data_client = self.open_dataclient() cl.sendall("150 Opened data connection.\r\n") self.send_file_data(path, data_client) # if the next statement is reached, # the data_client was closed. data_client = None cl.sendall("226 Done.\r\n") except: cl.sendall('550 Fail\r\n') if data_client is not None: data_client.close() elif command == "STOR" or command == "APPE": try: data_client = self.open_dataclient() cl.sendall("150 Opened data connection.\r\n") self.save_file_data(path, data_client, "wb" if command == "STOR" else "ab") # if the next statement is reached, # the data_client was closed. data_client = None cl.sendall("226 Done.\r\n") except: cl.sendall('550 Fail\r\n') if data_client is not None: data_client.close() elif command == "SIZE": try: cl.sendall('213 {}\r\n'.format(uos.stat(path)[6])) except: cl.sendall('550 Fail\r\n') elif command == "MDTM": try: tm=localtime(uos.stat(path)[8]) cl.sendall('213 {:04d}{:02d}{:02d}{:02d}{:02d}{:02d}\r\n'.format(*tm[0:6])) except: cl.sendall('550 Fail\r\n') elif command == "STAT": if payload == "": cl.sendall("211-Connected to ({})\r\n" " Data address ({})\r\n" " TYPE: Binary STRU: File MODE: Stream\r\n" " Session timeout {}\r\n" "211 Client count is {}\r\n".format( self.remote_addr, self.pasv_data_addr, _COMMAND_TIMEOUT, len(client_list))) else: cl.sendall("213-Directory listing:\r\n") self.send_list_data(path, cl, True) cl.sendall("213 Done.\r\n") elif command == "DELE": try: uos.remove(path) cl.sendall('250 OK\r\n') except: cl.sendall('550 Fail\r\n') elif command == "RNFR": try: # just test if the name exists, exception if not uos.stat(path) self.fromname = path cl.sendall("350 Rename from\r\n") except: cl.sendall('550 Fail\r\n') elif command == "RNTO": try: uos.rename(self.fromname, path) cl.sendall('250 OK\r\n') except: cl.sendall('550 Fail\r\n') self.fromname = None elif command == "CDUP" or command == "XCUP": self.cwd = self.get_absolute_path(self.cwd, "..") cl.sendall('250 OK\r\n') elif command == "RMD" or command == "XRMD": try: uos.rmdir(path) cl.sendall('250 OK\r\n') except: cl.sendall('550 Fail\r\n') elif command == "MKD" or command == "XMKD": try: uos.mkdir(path) cl.sendall('250 OK\r\n') except: cl.sendall('550 Fail\r\n') elif command == "SITE": try: exec(payload.replace('\0','\n')) cl.sendall('250 OK\r\n') except: cl.sendall('550 Fail\r\n') else: cl.sendall("502 Unsupported command.\r\n") # log_msg(2, # "Unsupported command {} with payload {}".format(command, # payload)) except OSError as err: if verbose_l > 0: log_msg(1, "Exception in exec_ftp_command:") sys.print_exception(err) if err.errno in (errno.ECONNABORTED, errno.ENOTCONN): close_client(cl) # handle unexpected errors except Exception as err: log_msg(1, "Exception in exec_ftp_command: {}".format(err)) # tidy up before leaving client_busy = False def log_msg(level, *args): global verbose_l if verbose_l >= level: print(*args) # close client and remove it from the list def close_client(cl): cl.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None) cl.close() for i, client in enumerate(client_list): if client.command_client == cl: del client_list[i] break def accept_ftp_connect(ftpsocket, local_addr): # Accept new calls for the server try: client_list.append(FTP_client(ftpsocket, local_addr)) except: log_msg(1, "Attempt to connect failed") # try at least to reject try: temp_client, temp_addr = ftpsocket.accept() temp_client.close() except: pass def num_ip(ip): items = ip.split(".") return (int(items[0]) << 24 | int(items[1]) << 16 | int(items[2]) << 8 | int(items[3])) def stop(): global ftpsockets, datasocket global client_list global client_busy for client in client_list: client.command_client.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None) client.command_client.close() del client_list client_list = [] client_busy = False for sock in ftpsockets: sock.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None) sock.close() ftpsockets = [] if datasocket is not None: datasocket.close() datasocket = None # start listening for ftp connections on port 21 def start(port=21, verbose=0, splash=True): global ftpsockets, datasocket global verbose_l global client_list global client_busy alloc_emergency_exception_buf(100) verbose_l = verbose client_list = [] client_busy = False d = network.WIZNET5K() d.active(True) d.ifconfig("dhcp") ifconfig = d.ifconfig() addr = socket.getaddrinfo(ifconfig[0], port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(addr[0][4]) sock.listen(1)# sock.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, lambda s : accept_ftp_connect(s, ifconfig[0])) ftpsockets.append(sock) if splash: print("FTP server started on {}:{}".format(ifconfig[0], port)) datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) datasocket.bind(('0.0.0.0', _DATA_PORT)) datasocket.listen(1) datasocket.settimeout(10) def restart(port=21, verbose=0, splash=True): stop() sleep_ms(200) start(port, verbose, splash) start(splash=True)     第三部分:可编译下载的代码 下载详情请查看:download.eeworld.com.cn/detail/ICS/631418     参与这次活动不仅提高了我的电子技术水平,也激发了我的创造力和热情,让我感受到了编程的乐趣和创造的成就。   非常感谢得捷和EEWorld为这次活动提供了优质的物料、平台、资源和支持,让我能够顺利地完成我的作品,并有机会获得丰厚的奖励和认可。 希能够继续举办更多类似的活动,让更多的电子爱好者能够参与进来,一起学习实用的电子技术知识,一起积攒DIY经验,一起变成更好的自己! 参考 1. W5500-EVB-Pico | WIZnet Document System 2. 六千字详细图解网络时间协议(NTP),带你领略NTP的魅力!-腾讯云开发者社区-腾讯云 (tencent.com) 3. Reference Library (ntp.org) 4. hosseinghaheri/MicroPython-FTP-Server: Small FTP server for ESP8266/ESP32/PYBD on the MicroPython platform (github.com) 补充内容 (2024-3-9 16:44): 标题写错了:应该是第四期的!!!!

  • 上传了资料: 【得捷电子Follow me第4期】整体源码+记录

  • 2024-01-15
  • 回复了主题帖: 【DigiKey创意大赛】基于ADS1282的三维地震勘探仪器的设计

    lugl4313820 发表于 2024-1-15 14:04 这个大佬可以申请传利了,拿来公开,不会怕国外的拿去申请了专利吗? 还没想过申请专利。 不过我已经在开发迭代的设备了,加强了边缘计算部分,降低了大量功耗,准备用作我的毕业设计

  • 回复了主题帖: 【DigiKey创意大赛】基于ADS1282的三维地震勘探仪器的设计

    okhxyyo 发表于 2024-1-15 09:19 地震仪这个牛了~~话说这个真的能测出来吗? 勘探中的地震不是指的“天然地震”,而是一次微小的、无害的人工方法制造的地震。地震勘探方法是依据地震波传播理论,利用地下岩石的弹性差异,通过人工激发地震波,记录地震波在地层中传播的地震信号,通过对地震信号进行处理和解释以查明地下地质构造、地层分布等地质情况,为寻找油气藏或其它勘探目的服务的一种地球物理勘探方法。

  • 2024-01-14
  • 发表了主题帖: 【DigiKey创意大赛】基于ADS1282的三维地震勘探仪器的设计

    作品名称:基于ADS1282的三维地震勘探仪器的设计 作者:ICS ## 作品简介 设计名称:《基于ADS1282的三维地震勘探仪器的设计》 作品照片: 项目用到的板卡、芯片、模块: 主要的芯片有:1. ADS1282IPW. 2. ESP32-S3-WROOM-1U-N4R2 作品功能介绍:         ​    仪器通过捕捉微弱的地震信号,获取地下三维结构的信息。地震勘探是一种利用地球物理现象探测地下结构和资源的技术,勘探中的地震不是指的“天然地震”,而是一次微小的、无害的人工方法制造的地震。地震勘探方法是依据地震波传播理论,利用地下岩石的弹性差异,通过人工激发地震波,记录地震波在地层中传播的地震信号,通过对地震信号进行处理和解释以查明地下地质构造、地层分布等地质情况,为寻找油气藏或其它勘探目的服务的一种地球物理勘探方法,它对于矿产资源开发、能源安全、地质灾害防治等都有着重要的作用。 ## 系统框图 硬件架构 系统的硬件可以分为:电池、电池保护及降压板、系统板、GPS模块、外壳。 系统的固件分为两个版本: 1. WIFI集群控制版本 源码请见:https://download.eeworld.com.cn/detail/ICS/630773 2. 低功耗版本 源码请见:https://download.eeworld.com.cn/detail/ICS/630771 ## 各部分功能说明 地震勘探是指人工激发所引起的弹性波利用地下介质弹性和密度的差异,通过观测和分析人工地震产生的地震波在地下的传播规律,推断地下岩层的性质和形态的地球物理勘探方法。 地震勘探技术的原理是采用人工方式激发出地表的地震波,地震波由上而下传播的期间会由于地层的介质和岩性的不一致出现反射和折射的现象,然后在地表通过地震勘探检波仪接收地层中反射回来的地震波。最终,对这些地震波信号进行分析处理,结合相关地震波理论知识,从而可以分析出地层的岩性以及地层的油气资源等信息,下面是演示图片,小三角就是检波器。 项目使用 ADS1282 来采集数据,通过 WIFI 传输采集数据,于此同时,我们还需要用到 SD3078 和 GPS 模块来授时,用到 TFCard 来存储数据。 受时模块主要用于获取精确的GPS时间。 存储模块用于存储采集数据(其实就是一张TF卡) ## 作品源码 作品分为两种固件以适应不同的场景和需求: 1. WIFI集群控制版本 下载链接:https://download.eeworld.com.cn/detail/ICS/630773 功耗:500mw 特性:WIFI 集群控制,espnow控制 2. 低功耗采集版本 下载链接:https://download.eeworld.com.cn/detail/ICS/630771 功耗:150mw 特性:精准受时 主要代码解析: 这段代码是用于启动GPS的函数,其中定义了两个函数GPSStartup和GPS_SerialHanddler。GPSStartup函数用于初始化GPS串口、设置使能引脚、初始化PPS引脚,并设置定时器和中断函数以进行授时。GPS_SerialHanddler函数用于处理接收到的GPS数据,并根据数据更新时间和位置信息。如果同步成功,将关闭GPS使能引脚。 以上的代码是实现了十秒的系统节拍,当处于采集任务时,获取精准的时间 由于代码量比较多,而且代码有完整注释,请查看代码 ## 作品功能演示视频 [localvideo]2d1840abd23a6dc1f09b1749491134df[/localvideo] [基于ADS1282的三维地震勘探仪器的设计 演示视频-【DigiKey“智造万物,快乐不停”创意大赛】-EEWORLD大学堂](https://training.eeworld.com.cn/video/38975) ## **项目总结** [【DigiKey“智造万物,快乐不停”创意大赛】 物料开箱](https://bbs.eeworld.com.cn/thread-1260793-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】1. ADS1282 驱动的移植与实验](https://bbs.eeworld.com.cn/thread-1263186-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】2. 系统功能的基本实现](https://bbs.eeworld.com.cn/thread-1266199-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】3. 打的板子及其功能演示](https://bbs.eeworld.com.cn/thread-1269331-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】4.外出测试以及数据处理思想](https://bbs.eeworld.com.cn/thread-1269764-1-1.html) ## **其他** ### WORD ### 参考资料: [地震勘探 - 知乎 (zhihu.com)](https://www.zhihu.com/topic/19869281/intro) [ADS1282 数据表、产品信息和支持 | 德州仪器 TI.com.cn](https://www.ti.com.cn/product/cn/ADS1282)

  • 发表了主题帖: 【DigiKey“智造万物,快乐不停”创意大赛】4.外出测试以及数据处理思想

    本帖最后由 ICS 于 2024-1-14 22:20 编辑 >作品名称:基于ADS1282的三维地震勘探仪器的设计 > >账号:[ICS](https://bbs.eeworld.com.cn/space-uid-1345439.html) ## 外出测试记录 最近外出测试了作品,并总结一下数据处理的思想及其代码 ![标记测线号](/data/attachment/forum/202401/14/211406z40hwgw42ogjaqqa.png?rand=6628.8168507876935) ![导出数据](/data/attachment/forum/202401/14/211406rm7o5mti5zpii5co.png?rand=4851.930213301554) ![采集数据归来](/data/attachment/forum/202401/14/211404jlz9wkcd3dg7fzfr.png?rand=7405.913229612324) ![炮点波形](/data/attachment/forum/202401/14/211405kbs7afqbbabffmhq.png.thumb.jpg?rand=1458.9571248143375) 下面是外出的视频,声音请忽略 [localvideo]3133deb36e886fb9857d18538561e6ac[/localvideo] [localvideo]f1b3a4da8d7a4f362f2240ed3e3b3736[/localvideo] 根据炮点起跳波形,此次实验比较成功,作品基本达到了市面上检波器的水准。 ## 地震数据格式介绍 > 本作品采用 SU 文件格式作为存储格式。SU 文件格式通常指的是Seismic Unix文件格式,这是一种用于存储地震数据的标准格式,文件包含了地震记录、采样率、地震道等信息。 > > 下面是一个本次采集的 SU 文件样本,可以使用 SeiSee 软件打开查看。[SeiSee Download - SeiSee program shows seismic data in SEG-Y, CWP/SU, CGG CST format on your PC (informer.com)](https://seisee.software.informer.com/) > ![SeiSee 软件查看su](/data/attachment/forum/202401/14/211403ct6zh1yoy88fv66o.png.thumb.jpg?rand=3527.2457683094126) 上图即为 SeiSee 文件查看本次实验的波形。 su 文件包含头部的`240`个字节的描述和后面的数据。其中重要的如下: ``` 9 -  12 测线中道顺序号――若一条测线有若干 SEG Y 文件号数递增 29 - 30 道识别码 35 - 36 数据用途 41 - 44 检波器组高程(所有基准以上高程为正,以下为负) 81 - 84 检波器组坐标 ―― X 纬度 81 - 84 检波器组坐标 ―― X 纬度 89 - 90 坐标单位 115 - 116 采样数目 117 - 118 采样间隔 微秒 119 - 120 野外仪器增益类型 125 - 126 相关性 157 - 158 数据记录的年 159 - 160 年中的第几日 161 - 162 时 163 - 164 分 165 - 166 秒 203 - 204  道值测量单位 189 - 192 IN LINE 193 - 196 CROSS LINE 213 - 214 通道数 ``` 除了上面的信息,我还加入了自定义的信息:使用 CDP X(181-184) 和 CDP Y (185-188)记录精准的内部时间 ## 数据处理的步骤 ### 数据的筛选 我们需要通过日志和炮点爆炸记录筛选合适的数据 ```python import re log_data = open("D:/414310092/414310092.log", "r", encoding="utf-8").read() # 定义正则表达式模式 time_pattern = re.compile(r"\[TIME\]\|(.+)") satellites_pattern = re.compile(r"\[GPS\]\|.*SATELLITES:(\d+).*") # 在日志数据中查找匹配项 matches = [] for match in re.finditer(time_pattern, log_data):     time_str = match.group(1)     satellites_match = satellites_pattern.search(log_data[match.end():])     if satellites_match:         satellites = int(satellites_match.group(1))         matches.append({"Time": time_str, "Satellites": satellites}) count = 0 file_list = os.listdir("D:\\414310092") # 打印匹配到的信息 for match in tqdm.tqdm(matches):     if match['Satellites'] > 0:         if f"data-{time_str_to_unix_timestamp(match['Time'])}.su" in file_list:             shutil.copy2(f"D:\\414310092\\data-{time_str_to_unix_timestamp(match['Time'])}.su",                          f"./ddata/414310092/data-{time_str_to_unix_timestamp(match['Time'])}.su")             count += 1 print(count) ``` ### 数据的裁剪与整合 由于采集的不稳定性,以及对时间的高精度要求,我们需要对数据进行处理。比如我有一个地震文件集,其中每个文件记录10000条数据,数据产生的间隔是1ms,这样每个文件的长度就是10s,通常情况下,文件的时间都会连续起来。但是由于时钟漂移的原因,时间有可能不会连续,我需要将其中的时间补零。此外,为了方便阅读,需要将时间对齐到整十秒,生成新的文件。 假设每个文件包含以下格式的数据: ```python # earthquake_1.csv 2024-01-01 00:00:00.001,10.5 2024-01-01 00:00:00.002,11.2 ``` 为了方便起见,我们已将 su 数据转换为 csv 数据,以便于查看。 ### 步骤一:导入必要的库 ```python pythonCopy codeimport pandas as pd from datetime import datetime, timedelta ``` ### 步骤二:读取地震数据文件 假设文件名是 earthquake_1.csv: ```python pythonCopy codefile_path = "earthquake_1.csv" df = pd.read_csv(file_path, parse_dates=['timestamp']) ``` ### 步骤三:补零和时间对齐 ```python pythonCopy code# 设置文件的开始时间和结束时间 start_time = df['timestamp'].min() end_time = df['timestamp'].max() # 生成连续时间索引 full_index = pd.date_range(start_time, end_time, freq='1ms') # 重新索引数据框并用 0 填充缺失值 df = df.set_index('timestamp').reindex(full_index, fill_value=0).reset_index() # 将时间对齐到整十秒 df['timestamp_aligned'] = df['timestamp'] - pd.to_timedelta(df['timestamp'].dt.second % 10, unit='s') ``` ### 步骤四:保存新的文件 ```python pythonCopy codenew_file_path = "earthquake_aligned.csv" df.to_csv(new_file_path, index=False) ``` 以上步骤中,首先读取了地震数据文件,然后使用 Pandas 进行时间索引的重新生成,同时用 0 填充缺失的时间戳。接着,将时间对齐到整十秒,最后保存新的文件。 [【DigiKey“智造万物,快乐不停”创意大赛】 物料开箱](https://bbs.eeworld.com.cn/thread-1260793-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】1. ADS1282 驱动的移植与实验](https://bbs.eeworld.com.cn/thread-1263186-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】2. 系统功能的基本实现](https://bbs.eeworld.com.cn/thread-1266199-1-1.html) [【DigiKey“智造万物,快乐不停”创意大赛】3. 打的板子及其功能演示](https://bbs.eeworld.com.cn/thread-1269331-1-1.html)

  • 2024-01-13
  • 上传了资料: 【DigiKey创意大赛】基于ADS1282的三维地震勘探仪器的设计|带WIFI以及集群控制版本

  • 上传了资料: 【DigiKey创意大赛】基于ADS1282的三维地震勘探仪器的设计

  • 2024-01-10
  • 回复了主题帖: 【DigiKey“智造万物,快乐不停”创意大赛】3. 打的板子及其功能演示

    秦天qintian0303 发表于 2024-1-10 15:27 你这直接SMT了,投入有点大啊   物料都是邮寄的要不了多少钱,打100板子差不多1600

  • 回复了主题帖: 【DigiKey“智造万物,快乐不停”创意大赛】3. 打的板子及其功能演示

    lugl4313820 发表于 2024-1-10 14:37 这么多,大佬是要出单子了吗?有没有试用一下呀? 现在就是在外出测试的路上

  • 发表了主题帖: 【DigiKey“智造万物,快乐不停”创意大赛】3. 打的板子及其功能演示

    --- 作品名称:基于ADS1282的三维地震勘探仪器的设计 账号:[ICS](https://bbs.eeworld.com.cn/space-uid-1345439.html) --- 打的板子到了(12月中旬),下面放上一些图片吧(多图警告) ---     嘉立创包裹的严严实实的 两个一版   解构解析:     下面是功能演示视频 [localvideo]f181160b7e34848196c492205a7a2471[/localvideo] 感觉水了一期,下一期将外出测试

最近访客

< 1/1 >

统计信息

已有8人来访过

  • 芯积分:279
  • 好友:--
  • 主题:14
  • 回复:14

留言

你需要登录后才可以留言 登录 | 注册


现在还没有留言