- 2025-02-21
-
发表了主题帖:
《大规模语言模型:从理论到实践》第八章 大语言模型评估
1、模型评估概念
模型评估是指通过特定的评估方法和标准,评估机器学习模型在未见过数据上的泛化能力和预测准确性。模型评估的目标是帮助开发者了解模型在实际应用场景中的表现,从而决定模型的改进方向和实际应用的适用性。大语言模型(如GPT系列、BERT等)的评估更加复杂,因其能够处理多种任务,评估不仅仅局限于分类精度,还涉及生成文本的质量、语义理解、推理能力等多个层面。
1.1 模型评估的重要性
性能验证:确保模型在真实世界应用中的有效性。
泛化能力:评估模型是否能在未见过的实际数据上表现良好。
持续改进:提供反馈以指导模型的优化和微调。
1.2 常见评估方法
人工评估:通过人类评审员对模型生成内容的质量进行评分,适用于内容创作类任务(如文本生成、对话系统等)。
自动评估:通过算法自动计算一些标准指标(如准确率、召回率、BLEU、ROUGE等)来衡量模型的性能。
2、大语言模型评估体系
大语言模型的评估体系主要由两个核心组成部分:知识与能力评估和伦理与安全评估。由于大语言模型能够处理多个复杂任务,评估体系需要覆盖这些任务的广泛能力。
2.1 知识与能力评估
大语言模型具备处理各种语言任务的能力,如文本分类、情感分析、机器翻译、代码生成、推理等。针对这些任务,可以通过具体的评估场景(如HELM评估)对模型进行全面考察。
HELM评估:HELM框架构建了42个评估场景,涵盖任务类型、领域和语言等多维度,评估大语言模型的任务能力。
任务分类:例如,MMLU(Massive Multi-task Language Understanding)用于测试多任务语言理解能力,AGIEval则关注人类水平的认知任务,适合用于衡量大语言模型的通用能力。
2.2 伦理与安全评估
随着大语言模型的普及,伦理与安全问题也越来越重要。例如,模型可能生成带有种族偏见、性别偏见、暴力内容或不准确的医学建议。因此,评估模型是否符合伦理要求成为必要的环节。
伦理安全场景:包括侮辱性内容、不公平与歧视性内容、犯罪行为等。
红队测试:通过生成可能引导模型产生有害输出的测试用例,评估模型的伦理安全性。
3、大语言模型评估方法
评估方法涵盖了从分类任务到回归任务、文本生成等不同类型的任务,具体的评估指标根据任务类型而定。以下是几种主要的评估方法:
3.1 分类任务评估指标
分类任务通常使用精确率、召回率、F1值等常见评估指标来衡量模型对标签的预测效果。这些指标有助于了解模型在多类别问题上的表现。
混淆矩阵:帮助可视化分类结果,进一步计算精确率、召回率等指标。
F1值:精确率和召回率的调和平均,用于平衡分类性能。
3.2 回归任务评估指标
回归任务的评估侧重于模型对连续值的预测能力。常用的评估指标包括平均绝对误差(MAE)、均方误差(MSE)、均方根误差(RMSE)等。
MAE、MSE、RMSE:评估模型预测值与真实值之间的误差大小,越小表示模型越准确。
3.3 文本生成任务评估
文本生成任务包括机器翻译、摘要生成、故事生成等。这类任务的评估指标比较特殊,常见的有BLEU、ROUGE等。
BLEU(机器翻译评估):通过计算n-gram的精确率,评估翻译结果与参考翻译的相似度。
ROUGE(文本摘要评估):主要用于评估自动生成的摘要与参考摘要之间的相似度。
3.4 语言模型评估
语言模型通常通过交叉熵(Cross-entropy)和困惑度(Perplexity)来评估其在文本生成任务中的表现。交叉熵衡量模型输出与真实文本之间的差异,而困惑度则是交叉熵的指数函数,表示模型的预测能力。
交叉熵和困惑度:衡量模型的预测效果,困惑度越低表示模型性能越好。
4、大语言模型评估实践
评估实践是模型评估方法在实际应用中的体现,涉及如何通过具体任务和数据集对大语言模型进行全面评估。
4.1 人工评估
人工评估是大语言模型评估的重要组成部分,尤其在文本生成任务中,人工评估更能准确捕捉生成文本的质量、流畅度和语义连贯性。人工评估通常采用李克特量表等工具,评估者根据模型生成内容的各个维度进行打分。
评分维度:语法正确性、逻辑连贯性、风格一致性等。
4.2 自动评估
自动评估方法更加高效,能够快速计算模型生成结果的质量。BLEU、ROUGE、精确率、召回率等都是常用的自动评估指标。自动评估适用于生成类任务和分类任务,尤其在处理大规模数据集时,能够节省大量的人工成本。
4.3 对比评估
对比评估通过直接比较不同系统或算法的性能来确定优劣。例如,使用麦克尼马尔检验等统计方法,基于2×2混淆矩阵分析不同模型在分类任务上的差异,帮助开发者选择最佳模型。
4.4 伦理与安全评估
随着伦理和安全问题的关注,如何评估模型的伦理性和安全性也成为重要实践。通过红队测试等方法,模拟攻击和评估潜在的伦理风险,确保模型在实际应用中的可靠性。
大语言模型的评估体系和方法是一个多层次、全方位的复杂问题。随着模型的能力不断增强,评估方法的多样化也使得评估工作更加精细化。通过人工评估、自动评估以及对比评估等方法,可以更全面地衡量大语言模型在多种任务中的表现,确保其在实际应用中能够高效、安全地工作。
-
发表了主题帖:
《大规模语言模型:从理论到实践》第七章 大语言模型应用
1、推理规划
推理规划是指通过合理的规划和步骤化的推理过程来解决复杂问题。大语言模型(如ChatGPT)通常依赖于自回归生成过程,即每次只生成下一个词或符号。在处理复杂问题时,模型的推理能力通常受限,直接生成最终结果往往忽略了中间的推理步骤。
1.1 思维链提示(Chain-of-Thought Prompting)
思维链提示(CoT)是一种通过提供中间推理步骤来提升模型推理能力的方法。与传统的直接生成答案不同,思维链提示要求模型在输出最终答案之前,先输出详细的推理过程。
思维链的影响:通过示例或明确指导,模型可以逐步生成解题步骤,从而提高推理准确性。研究表明,思维链提示在推理任务中能够显著提升模型的表现,尤其在需要常识推理、逻辑推理和数学推理时。
零样本思维链(Zero-shot CoT):研究发现,即使没有具体的示例,通过简单的指令“让我们一步一步思考(Let’s think step by step)”,模型也能自动生成推理步骤,从而优化其推理过程。
1.2 由少至多提示(Least-to-Most Prompting)
由少至多提示是一种任务分解策略,适用于需要将复杂问题分解为多个简单子问题的情境。模型首先理解问题的结构,然后将其分解成一系列子任务逐一解决。通过这种方法,复杂的推理任务被拆解为更容易处理的小任务。
问题分解:在推理过程中,模型会首先识别并分解问题,生成子问题的列表,并逐一解决每个子问题。
逐步解决:通过逐个解决每个子问题,最终合成完整的答案,提升了大语言模型在复杂任务中的解决能力。
2、综合应用框架
随着大语言模型(如ChatGPT、LLaMA)的普及,如何将这些模型与外部资源和应用进行有效结合,成为了开发智能应用的核心问题。综合应用框架通过将多个模型和外部工具连接在一起,实现了高效的任务处理。
LangChain框架:
LangChain是一个开源框架,旨在简化基于大语言模型的应用开发。它提供了一些模块化的组件,使开发者能够将大语言模型与其他资源(如数据库、API)进行无缝集成。
模块化设计:LangChain将大语言模型应用拆解为多个组件,开发者可以根据需求选择并组合这些组件,轻松构建复杂的应用。
链式操作:通过将多个组件串联起来,开发者可以构建端到端的智能应用,例如自然语言问答系统、自动化文档处理等。
集成外部数据源:LangChain支持与外部数据源(如Google、Wikipedia、Notion等)进行集成,进一步增强大语言模型的应用能力。
3、智能代理
智能代理(或智能体)是指能够自主接收并处理外部信息,执行任务并做出响应的自主系统。通过结合大语言模型,智能代理能够进行推理、规划和学习,逐步实现更高层次的智能行为。
3.1 智能代理的组成
智能代理通常由以下几个核心模块组成:
思考模块:负责处理输入信息并进行推理与决策。这包括自然语言理解、任务分解、推理与规划等能力。
记忆模块:类似于人类的记忆系统,智能代理需要存储信息以便后续使用。大语言模型的内置记忆可以通过外部存储(如数据库)进一步增强。
工具调用模块:智能代理可以调用外部工具(如API或执行命令),扩展其能力并完成更复杂的任务。
3.2 智能代理的应用
多智能体系统:多个智能代理可以协同工作,分工合作,以解决更复杂的任务。例如,在辩论任务中,多个智能代理可以就某一问题展开讨论,互相反馈并改进自己的观点,从而实现问题的有效解决。
角色扮演:智能代理还可以扮演特定角色(如客户服务代表、专家等),根据任务要求执行特定的操作。例如,在角色扮演任务中,智能代理可以与用户进行互动并解决实际问题。
4、多模态大模型
多模态大模型是指能够处理多种模态(如文本、图像、音频等)的人工智能模型。多模态大模型能够理解和生成不同类型的信息,突破了传统大语言模型仅限于文本输入的局限。
MiniGPT-4:
MiniGPT-4是一个多模态大语言模型,结合了视觉和语言处理能力。该模型由三个主要组件组成:预训练的大语言模型(如Vicuna)、视觉编码器和线性投影层。通过这些组件,MiniGPT-4能够处理图像输入并生成自然语言描述,展现出强大的图像理解和生成能力。
模型架构:MiniGPT-4使用了Vision Transformer(ViT)作为视觉编码器,结合Q-Former模块将图像信息与语言信息进行对齐,从而实现图像与文本的无缝融合。
训练策略:MiniGPT-4采用了分阶段的训练策略,首先在大量图像-文本对数据集上进行预训练,之后进行微调,确保模型能够生成高质量的图像描述和理解图像中的细节。
5、大语言模型推理优化
随着大语言模型的推理时间不可预测和计算资源需求增加,推理优化成为提升模型效率的重要方向。以下是一些关键的推理优化技术:
5.1 键值缓存优化
在自回归生成过程中,大语言模型每次生成新的词元时都需要计算查询、键和值。为了避免每次迭代重新计算所有的键值,可以使用键值缓存技术,保存之前计算过的键值,并在后续迭代中重复使用。
键值缓存的应用:通过缓存查询、键和值,模型在生成新词元时只需计算新生成的词元的查询、键和值,大大减少了计算量和延迟。
5.2 迭代级调度
针对大语言模型推理任务的可变执行时间,Orca提出了迭代级调度策略,在每个批次中单独运行每个迭代,减少了作业的等待时间。
5.3 FastServe框架
FastServe是一个针对大语言模型推理优化的框架,专注于低作业完成时间和高效GPU显存管理。它通过采用迭代级抢占策略和最小完成优先策略,减少了推理过程中作业的延迟。
随着大语言模型应用的不断扩展,推理规划、智能代理、多模态处理和推理优化等方面的研究与实践成为关键领域。通过思维链提示、由少至多提示、综合应用框架和智能代理等技术,研究人员正致力于提升模型的推理能力和多任务处理能力。同时,优化推理过程的速度和效率将直接影响到模型的应用体验,推动了如FastServe和vLLM等框架的发展。
-
发表了主题帖:
《大规模语言模型:从理论到实践》第六章 强化学习
1、基于人类反馈的强化学习
基于人类反馈的强化学习(Reinforcement Learning from Human Feedback, RLHF)是一种结合人类评价和强化学习优化的技术,广泛应用于训练大语言模型,使其能够遵循人类的指令并生成符合偏好的输出。RLHF在训练中利用人类反馈信号作为奖励,指导模型生成更符合期望的结果。
RLHF的工作原理:
状态与动作:在RLHF中,模型的生成过程视为一种“动作”,模型基于输入的指令生成输出(即回答),这一输出被视为“动作”,而“状态”则是输入的指令或问题。
奖励机制:人类通过评估模型输出的质量(如是否有帮助、有害或正确)来提供奖励信号,反馈给模型。与传统的基于交叉熵损失的监督学习不同,RLHF是针对整个输出文本进行优化,而非单个词的预测。
应用优势:RLHF能够通过人类的奖励信号引导模型生成更自然、连贯的文本,能够适应自然语言的多样性,并有效解决微小变化的敏感性问题,尤其适用于生成式任务,如对话生成和文本创作。
2、奖励模型
奖励模型在RLHF中起着关键作用,它通过学习来自人类的偏好数据,模拟人类的评价标准,为模型生成的文本分配奖励,从而指导模型的优化过程。
2.1 奖励模型的训练
奖励模型通常通过使用基于对比的数据集进行训练。在这些数据集中,每一对样本都包括一个“优先”样本和一个“非优先”样本,模型根据这些对比数据学习如何判断一个输出是否符合人类的偏好。
有用性与无害性:奖励模型的构建通常基于有用性(模型的帮助程度)和无害性(模型的输出是否会造成负面影响)。例如,在某些任务中,模型需要在给出帮助的同时避免有害输出。为了平衡这两者,奖励模型会综合考虑这两个因素。
数据来源:奖励模型的数据集通常来自人类偏好数据,使用标注者对不同输出的评价来训练模型。这些数据集可以通过众包平台(如Amazon Mechanical Turk)收集,确保标注多样性和高质量的反馈。
2.2 奖励模型的优化
奖励模型的训练涉及优化一个损失函数,该函数通过比较不同模型输出的质量来优化模型。例如,使用交叉熵损失来调整模型预测的奖励值,使得模型生成更高质量的文本。
3、近端策略优化
近端策略优化(Proximal Policy Optimization, PPO)是一种强化学习算法,广泛应用于优化大语言模型。PPO算法的核心思想是通过限制策略更新的幅度来保证学习过程的稳定性,从而避免在训练中出现剧烈的策略波动。
3.1 PPO的工作原理
策略优化:PPO算法通过不断优化策略模型,使其在给定状态下采取的动作(即模型生成的输出)能够获得更高的奖励。它通过对策略进行局部优化,确保每一步的策略更新不会偏离当前策略太远。
优势估计:PPO结合了优势估计(Advantage Estimation)来评估动作的好坏,通过计算每个动作的优势值(即相对于平均动作的改进值)来优化策略。
重要性采样:PPO使用重要性采样(Importance Sampling)来利用不同策略生成的样本,提高数据的利用效率。通过权重修正,保证样本的有效性。
3.2 PPO的优势与变种
PPO的优势:PPO相较于其他强化学习算法(如传统的策略梯度方法)具有更高的效率和稳定性。它能够有效减少样本浪费,避免训练过程中产生不稳定的策略。
PPO的变种:在PPO算法的基础上,研究者提出了多个变种来进一步提升算法的计算效率和稳定性,如PPO-Penalty和PPO-Clip,这些变种通过修改KL散度和剪枝策略来控制策略的更新范围。
4、MOSS-RLHF实践
MOSS-RLHF是基于PPO算法的强化学习框架,旨在提高大语言模型(如LLaMA)在生成任务中的表现,通过人类反馈对模型进行强化学习。
4.1 奖励模型的训练
MOSS-RLHF框架中的奖励模型基于LLaMA模型,通过训练奖励模型来模拟人类的偏好。这些模型通过大量的反馈数据进行训练,以评估模型生成的文本是否符合人类的期望(如有用性和无害性)。
4.2 PPO微调
PPO微调过程包括以下步骤:
模型加载:加载策略模型、评论模型、奖励模型和参考模型。策略模型用于生成文本,评论模型用于预测文本的质量,奖励模型提供奖励评分,参考模型帮助防止过度更新。
经验采样:从策略模型生成回复,奖励模型对回复进行评分,然后将这些数据记录到经验缓冲区。这个过程确保了模型在不断迭代中能够生成符合人类偏好的输出。
优势估计与训练:使用广义优势估计(GAE)算法计算优势函数和回报函数,基于这些估计值进行策略模型和评论模型的训练,以最大化最终奖励。
4.3 实践中的挑战
标注一致性:人类反馈的标注一致性较低(约60%~70%),因此需要严格控制标注者的标准,确保数据的质量。
奖励模型的噪声与修正:由于人类标注可能存在噪声,需要进行数据去噪处理。为了保证奖励模型的泛化能力,选择较大的底座模型进行训练能够提高其性能。
奖励劫持(Reward Hacking):在PPO训练中,模型可能会迅速找到某些方式来“操控”奖励函数,生成无意义或重复的回复。通过增加KL惩罚,可以避免这种情况,确保训练过程的稳定性。
基于人类反馈的强化学习(RLHF)为大语言模型的训练提供了有效的优化路径。通过奖励模型和PPO算法的结合,RLHF能够提高模型的生成能力,确保生成的内容更符合人类的偏好,并能解决生成式任务中出现的多样性和敏感性问题。MOSS-RLHF框架通过精细化的训练流程和对PPO的优化,进一步提高了强化学习的稳定性和效率,是大规模语言模型微调的重要工具。
-
发表了主题帖:
《大规模语言模型:从理论到实践》第五章 有监督微调
1、提示学习和语境学习
1.1 提示示学习(Prompt-based Learning)
提示学习是一种利用预训练语言模型(PLM)适应下游任务的方式,通常不需要对模型的参数进行更新。提示学习的基本思路是通过构建一个模板,将任务转换为模型能够理解的输入格式。提示学习可以支持小样本学习(Few-shot Learning)甚至零样本学习(Zero-shot Learning)。具体来说,提示学习的过程包含以下几个步骤:
提示添加:通过设计特定的模板将原始输入转换为语言模型能够处理的形式。例如,对于情感分析任务,可以设计模板:“[X] 我感到 [Z]”,其中[X]为待分析句子,[Z]为需要预测的情感标签。
答案搜索:将构建好的提示输入模型,模型输出对每个候选答案的概率。根据这些概率来选择最可能的答案。
答案映射:在分类任务中,模型的输出可能是一个词或短语,需要将其映射为最终标签,如“好”映射为“正面”。
提示学习方法简便且易于理解,但在实际应用中也面临着如何高效构建和优化提示的问题。软提示(Soft Prompt)作为一种扩展方法,通过在模型嵌入空间中直接优化提示的参数,而非依赖于自然语言文本,使得模型能够更灵活地应对不同的任务。
1.2 语境学习(In-context Learning)
语境学习是基于提示学习发展而来的一种新兴方法,尤其在GPT-3及后续大模型中得到了广泛应用。语境学习允许模型通过上下文中的几个例子来学习任务,并无需对模型进行任何参数更新。与提示学习不同,语境学习强调从示例中进行类比,模型会根据给定的示例生成答案。
示例驱动:通过输入示例和待预测任务,模型能够根据示例内容自动完成推理。例如,提供几个带有情感标签的句子作为示例,模型就能通过学习这些示例来预测新输入的情感标签。
优点:语境学习可以通过自然语言提供清晰的任务指令,不需要修改模型的权重,显著降低了任务适应成本。研究表明,语境学习能够有效激活大模型已有的知识,使其在没有额外训练的情况下表现出色。
挑战:语境学习的效果对示例的选择、顺序、标签等方面十分敏感。研究者认为,语境学习的成功更多依赖于输入示例的格式和上下文分布,而非传统的标签对应关系。
2、高效模型微调
随着大语言模型(如GPT、BERT等)规模的不断扩大,传统的全量微调(Full Fine-tuning)方法需要大量的计算资源和存储空间,推动了高效模型微调方法的发展。高效微调方法的目标是在最小化参数调整的同时,保持或提升模型在下游任务中的性能。
2.1 LoRA(Low-Rank Adaptation)
LoRA是一种高效微调方法,它通过在预训练模型的权重旁边添加低秩矩阵来模拟参数变化,从而避免了对整个模型的参数进行更新。这种方法不仅节省了内存和计算资源,还能在不影响模型性能的前提下,减少可训练的参数数量。
原理:LoRA通过添加低秩矩阵AAA和BBB到原模型的权重矩阵上,实现模型参数的高效调整。具体来说,预训练权重通过低秩矩阵旁路进行调整,而在微调过程中,只需要优化低秩矩阵,而不是整个模型的参数。
优势:LoRA能够显著降低训练的计算开销和显存占用。在训练大语言模型时,LoRA大大提高了训练效率,并且其微调后的模型与全量微调的模型在性能上相当。
2.2 LoRA的变体
LoRA的成功促使研究者提出了一些变体,如AdaLoRA和QLoRA,它们在传统LoRA的基础上做出了改进:
AdaLoRA:动态调整低秩矩阵的秩,以适应不同层的微调需求,进而进一步减少训练参数。
QLoRA:基于量化技术,使用4-bit的量化方法来优化内存使用,从而降低显存占用并提高计算效率。
3、模型上下文窗口扩展
随着大语言模型的应用日益广泛,许多任务需要处理超长文本,如多轮对话和长文档摘要等。当前的大部分大语言模型(如LLaMA)对输入文本的上下文窗口存在长度限制。例如,LLaMA模型的最大上下文窗口为2048个token,这对长文本的理解和生成造成了很大的限制。
3.1 位置编码的扩展
为了能够处理更长的文本,需要扩展模型的上下文窗口。位置编码在这方面起到了关键作用。通过使用位置插值法和外推能力强的编码方法,如ALiBi(相对位置编码)和RoPE(旋转位置编码),可以实现对超长文本的有效处理。
ALiBi:通过对注意力得分添加一个静态不可学习的偏置,能够有效地提高模型在长文本中的表现。
位置插值法:通过对现有位置编码进行线性插值,可以扩展上下文窗口,使得模型能够处理更长的文本。
3.2 插值法与实验
插值法通过调整位置编码的索引,使得模型能够在不改变架构的前提下,处理更长的上下文窗口。通过微调模型和插值位置编码,LLaMA模型的上下文窗口可以扩展至32768个token,极大提升了模型处理长文本的能力。
4、指令数据的构建
指令数据是有监督微调(SFT)过程中至关重要的一部分。指令数据的质量直接影响模型的微调效果和性能。指令数据的构建可以分为手动构建和自动生成两种方法。
4.1 手动构建指令
手动构建指令方法通常通过人工收集并筛选现有的数据,如网络问答和人工编写的任务指令,来保证数据的高质量和多样性。例如,LIMA方法通过从高质量的问答社区和指令集收集数据,确保指令的多样性和代表性。尽管这种方法耗费大量人力,但能够较好地控制数据质量。
4.2 自动构建指令
自动生成指令数据的方法通过大语言模型的生成能力来实现。Self-Instruct是一个典型的自动生成指令数据的方法,它通过一个迭代的过程来生成指令,并通过反馈机制不断优化。
步骤1:生成初步的指令数据集,通过模型自举生成新任务的指令。
步骤2:自动识别任务类型并生成适合该任务的输入输出对。
步骤3:过滤低质量数据,确保指令数据集的质量和多样性。
5、DeepSpeed-Chat SFT实践
DeepSpeed-Chat是微软推出的基于DeepSpeed框架的工具,专门用于训练类ChatGPT模型。DeepSpeed-Chat通过结合有监督微调(SFT)、奖励模型微调和强化学习(RLHF)三个步骤,能够有效训练大规模对话模型。
5.1 训练流程
DeepSpeed-Chat的训练流程包含以下步骤:
有监督微调(SFT):使用标注数据对模型进行微调。
奖励模型微调:对模型进行奖励优化,以增强其生成能力。
强化学习(RLHF):通过人类反馈进一步调整模型生成的质量和一致性。
5.2 数据预处理和自定义模型
在使用DeepSpeed-Chat训练模型时,数据预处理和自定义模型的支持非常重要。训练数据需要以JSON格式进行处理,并确保模型能够适应特定的任务和数据。DeepSpeed-Chat支持自定义模型的修改,使其能够适应不同的任务需求。
5.3 模型训练与推理
在完成数据预处理和自定义模型的配置后,训练模型并通过DeepSpeed-Chat提供的推理功能进行评估。训练过程中会定期评估模型的困惑度(PPL),并调整训练策略以优化模型性能。
随着大语言模型在各种任务中的广泛应用,如何高效地进行微调、扩展模型上下文窗口、构建优质指令数据和实现强化学习等方面的研究和实践显得尤为重要。DeepSpeed-Chat作为一种强大的训练工具,通过优化训练流程和支持多种微调方法,显著提升了大规模对话模型的训练效率和质量。
- 2025-02-20
-
发表了主题帖:
《大规模语言模型:从理论到实践》第四章 分布式训练
1、分布式训练概述
随着深度学习尤其是大语言模型的飞速发展,单个计算设备的计算和存储能力已无法满足训练需求。例如,GPT-3拥有1750亿个参数,它的训练需要极为庞大的计算资源和存储空间。为了应对这种挑战,分布式训练系统应运而生。分布式训练的核心目标是将一个大规模模型训练任务分解为多个较小的子任务,并将这些子任务分配到多个计算设备上进行并行处理,从而加速训练过程。
1.1 计算墙、显存墙和通信墙
计算墙:计算能力的不足使得单个计算设备难以承载庞大的训练任务。以NVIDIA H100 GPU为例,其算力为2000 TFLOPS,而GPT-3的训练需要314 ZFLOPS的计算量,相差8个数量级。这使得训练一个大模型变得极其困难,迫使人们使用多个计算设备来进行分布式训练。
显存墙:大语言模型的参数量通常远超单个计算设备的显存容量。例如,GPT-3模型需要约700GB的显存来存储其模型参数,而当前的高端GPU(如NVIDIA H100)仅有80GB显存。因此,模型参数需要被切分,并分布在多个GPU上,通过模型并行来解决显存问题。
通信墙:分布式训练中的每个设备需要进行参数更新时,会产生大量的梯度数据传输。随着设备数量的增加,通信效率成为瓶颈。例如,在GPT-3的训练过程中,每轮迭代可能需要传输89.6TB的梯度数据,这会对网络带宽造成极大的压力。因此,如何优化通信效率是提升分布式训练效率的关键。
1.2 分布式训练的目标和挑战
分布式训练的最终目标是提升整体训练速度,缩短训练周期。理论上,总训练速度是由单设备计算速度、计算设备总量以及多设备加速比共同决定的。加速比受限于计算能力和通信效率,通信效率的优化成为关键。因此,分布式训练不仅仅是增加设备数量,还需要设计高效的并行策略和网络架构,以提高整体的计算和通信性能。
2、分布式训练的并行策略
2.1 数据并行(Data Parallelism)
数据并行是最常见的分布式训练策略。在数据并行中,每个计算设备(如GPU)都持有模型的副本,任务是将训练数据拆分成多个小批次,每个计算设备计算其数据批次的梯度并将结果传回主设备(或所有设备)。随后,所有计算设备会根据平均梯度来更新模型参数。
优点:数据并行能够通过增加计算设备数量,显著提高训练速度。每个设备仅负责处理一部分数据,不需要持有整个模型,因此显存占用较少。
缺点:在每次迭代时,所有设备之间需要同步梯度,这会带来较高的通信开销。尤其在大规模集群中,随着设备数量的增多,通信延迟和带宽限制可能成为训练效率的瓶颈。
2.2 模型并行(Model Parallelism)
模型并行通过将一个模型拆分成多个部分,分配到不同设备上进行计算。对于内存需求过高的模型(如GPT-3),无法将整个模型加载到单个设备时,模型并行就显得尤为重要。
层间并行(Pipeline Parallelism):将模型的不同层划分到不同设备上,分批次进行计算。每个设备只负责其负责的模型层的计算。这种方法可以有效利用设备的内存,但由于每个设备需要等待前一个阶段完成后才能开始计算,会导致设备的平均利用率降低,形成所谓的“流水线气泡”。
张量并行(Tensor Parallelism):将模型中的参数切分为较小的张量,并将它们分布在不同的设备上进行并行计算。每个计算设备处理部分张量的计算,通过网络交换张量的部分数据来完成整个计算。这种方法解决了单个设备内存不足的问题,适用于大规模模型。
2.3 混合并行(Hybrid Parallelism)
混合并行结合了数据并行、模型并行和流水线并行的优点,通常用于处理更大规模的模型。例如,在BLOOM的训练中,结合了张量并行、数据并行和流水线并行,通过多级并行优化计算和通信效率。混合并行能够最大化计算资源的利用率,并减少通信瓶颈,适用于训练超大规模的语言模型。
3、分布式训练的集群架构
为了支持大规模的分布式训练,通常需要使用高性能计算集群。集群的设计需要兼顾计算能力、网络带宽、设备间通信效率等多个方面。
3.1 参数服务器架构(Parameter Server)
参数服务器架构是分布式训练的经典架构。在这种架构中,模型的参数被分布在一个或多个服务器上,训练服务器负责计算梯度并将结果推送到参数服务器,后者进行梯度聚合和模型参数的更新。这种架构具有良好的扩展性,但在大规模集群中可能会受到通信瓶颈的限制。
同步训练模式:所有训练服务器必须等待其他设备完成梯度计算后,才能进行参数更新。这种模式确保了训练的一致性,但也带来了较高的通信开销。
异步训练模式:训练服务器在计算完成后立即将梯度更新到参数服务器,而不等待所有设备的计算结果。虽然可以提高训练速度,但会导致参数更新的冲突,从而影响模型的收敛性和稳定性。
3.2 去中心化架构(Decentralized Architecture)
去中心化架构不依赖于单个中央服务器,而是通过节点之间直接通信来进行训练。这种架构能够减少通信瓶颈,增强可扩展性,特别适用于需要频繁进行参数同步的大型分布式训练。
去中心化架构通常使用集合通信(Collective Communication)方法,如All-Reduce,将各个节点的计算结果合并并传递给所有计算节点。通过高效的通信协议,可以最大化集群的利用效率,减少通信开销。
4、DeepSpeed实践
DeepSpeed是微软开发的一种深度学习优化库,旨在为大规模神经网络训练提供高效的分布式支持。它通过一系列技术手段提升了大规模模型训练的效率,特别是在内存管理和计算优化方面表现突出。
4.1 ZeRO优化器(Zero Redundancy Optimizer)
ZeRO是DeepSpeed的核心优化策略之一,它通过将模型参数、梯度和优化器状态进行分片存储,减少了每个计算节点的内存占用。ZeRO有多个版本:
ZeRO-1:对优化器状态进行分片,内存占用为原来的1/4。
ZeRO-2:对优化器状态和梯度进行分片,内存占用为原来的1/8。
ZeRO-3:对优化器状态、梯度和模型参数进行分片,极大降低内存占用。
4.2 3D并行
DeepSpeed支持3D并行,即结合数据并行、张量并行和流水线并行。3D并行的核心在于对硬件架构的充分利用,通过优化显存效率和计算效率来提高训练速度。
4.3 混合精度训练(Mixed Precision Training)
DeepSpeed支持混合精度训练,利用FP16和BF16格式来减少显存占用并加速训练过程。通过将模型计算和存储分配到不同的数据格式,可以在不牺牲精度的情况下提高训练效率。
4.4 1-bit Adam优化器
DeepSpeed还实现了1-bit Adam优化器,它通过减少每次训练中的通信量,显著降低了分布式训练中的通信开销。1-bit Adam在保证收敛性的同时,提高了分布式训练的效率。
4.5 DeepSpeed与LLaMA
DeepSpeed被广泛应用于LLaMA等大规模语言模型的训练,提供了高效的内存优化、并行策略和计算能力。DeepSpeed的集成使得LLaMA能够在有限的硬件资源上完成训练,而不会受到显存和计算瓶颈的限制。
随着语言模型规模和数据量的激增,分布式训练已经成为大规模神经网络训练不可或缺的一部分。通过数据并行、模型并行和混合并行策略,结合高效的硬件架构和优化算法,如DeepSpeed,分布式训练能够大幅提升训练速度并降低资源消耗。DeepSpeed通过ZeRO优化器、混合精度训练、1-bit Adam等技术,使得训练超大规模模型变得更加高效和可扩展。
-
发表了主题帖:
《大规模语言模型:从理论到实践》第三章 大语言模型预训练数据
1、数据来源
大语言模型的训练需要大量多样化的数据,数据来源可以分为通用数据和专业数据两大类。
通用数据:这些数据包括网页、书籍、新闻、对话文本等,具有规模大、易获取和多样性强的特点。通用数据能为大语言模型提供丰富的语言建模能力和泛化能力,帮助模型理解和生成多种类型的文本。常见的数据来源包括:
网页数据:如CommonCrawl、ClueWeb、SogouT等,爬取了大量网站上的内容。尽管这些数据的质量参差不齐,但通过过滤和清洗,可以提取出高质量的文本。
对话数据:包含了社交媒体、论坛、电子邮件等多方参与者的对话内容。对话数据可以增强大语言模型的对话能力和多轮问答能力,常见数据集如Reddit、Ubuntu对话语料等。
书籍数据:书籍是对各种领域知识和语言风格的全面呈现,通过使用书籍数据,大语言模型可以提高在不同领域的理解能力。常见数据集如BooksCorpus、Pile中的Books3。
专业数据:这些数据特定于某一领域,如科学文献、代码数据、多语言数据等。引入专业数据可以提升模型在特定任务上的能力:
多语言数据:包括来自不同语言的文本,有助于增强大语言模型的跨语言理解能力。BLOOM、PaLM等模型使用了多语言数据进行训练,涵盖多达46种语言。
科学文本数据:如学术论文、技术文档等,能提高模型在理解专业领域内容上的能力。常见数据来源包括arXiv、PubMed等。
代码数据:程序代码数据对代码生成任务至关重要,主要来源于编程问答社区(如Stack Exchange)和开源代码仓库(如GitHub)。
2、数据处理
大语言模型的训练不仅依赖于数据的规模和多样性,数据的质量也至关重要。由于互联网数据的噪声较多,处理数据的步骤包括以下几个方面:
质量过滤:由于网络数据中包含许多低质量的内容,需要通过分类器或启发式方法过滤掉这些低质量数据。基于分类器的方法通过训练模型来识别低质量文本,而基于启发式的方法则通过规则来去除噪声数据,如语言过滤、关键词过滤、困惑度过滤等。
冗余去除:去除重复内容是数据处理的重要步骤,因为重复数据会导致模型训练不稳定或泛化能力差。冗余去除可分为句子级、段落级和文档级的去重,常用的方法包括哈希检测、n-gram重叠检测等。
隐私消除:由于部分数据中可能包含个人隐私信息(PII),需要通过命名实体识别(NER)等技术识别并去除敏感信息。
词元切分:词元切分是将文本拆分为较小的单位(如子词或字符)的过程,常见方法包括字节对编码(BPE)、WordPiece和Unigram模型。这些方法通过子词级别的处理,有效解决了未登录词(OOV)问题,提高了词汇覆盖率。
3、数据影响分析
数据的规模、质量和多样性对大语言模型的性能有显著影响:
数据规模:研究表明,随着模型参数量的增加,训练数据的规模也应适当扩大。例如,LLaMA模型训练时使用了2万亿词元,数据量的增加显著提升了模型的性能。实验表明,训练数据和模型大小需要等比例扩展,以达到计算最优。
数据质量:数据的质量是决定模型效果的关键因素。训练时使用清洗过的高质量数据相比于原始数据能显著提高模型性能。尤其是对于自然语言生成任务,高质量数据能有效提高文本的流畅度和逻辑一致性。
数据多样性:不同来源的数据包含不同的知识和语言特征,增加数据的多样性有助于提高模型的泛化能力和适应性。多领域、多语言的训练数据能够使模型在多种任务上表现更好,特别是在跨语言或跨领域任务上。
4、开源数据集
为了促进大语言模型的研究和应用,许多学术界和工业界的团队开源了大规模的数据集,这些数据集涵盖了多个领域,并经过精细化处理,确保数据质量:
Pile:由EleutherAI团队构建,包含22个子集,涵盖了多种领域的数据,如书籍、论文、网页等。Pile的数据量达到825GB,广泛应用于大语言模型的训练。
ROOTS:由BigScience项目构建,包含了46种自然语言和13种编程语言,约1.6TB。ROOTS数据集通过广泛收集公开语料、网页抓取、GitHub代码等,增强了数据的多样性和语言覆盖性。
RefinedWeb:由TII团队发布,基于CommonCrawl数据集进行过滤和处理,提供了高质量的网页数据,主要用于训练Falcon模型。该数据集的特点是严格的过滤和冗余去除过程。
SlimPajama:由CerebrasAI团队构建,基于RedPajama数据集进行清洗和去重,提供了精细处理的数据,适合用于大语言模型训练。
在大语言模型的训练中,数据的来源、处理、质量和多样性直接影响模型的训练效果和泛化能力。通过合理选择和处理数据,可以大大提高模型的性能。开源数据集的出现为大语言模型的研究提供了丰富的资源,推动了NLP领域的进步。
-
发表了主题帖:
《大规模语言模型:从理论到实践》第二章大语言模型基础
1、Transformer结构
Transformer结构是由Google在2017年提出并首先应用于机器翻译的神经网络模型架构。机器翻译的目标是从源语言(Source Language)转换到目标语言(Target Language)。Transformer 结构完全通过注意力机制完成对源语言序列和目标语言序列全局依赖的建模,如今几乎全部大语言模型都是基于 Transformer 结构。
Transformer架构首次应用于神经机器翻译任务,标志着自然语言处理(NLP)领域的一次革命。它摒弃了传统的循环神经网络(RNN)和卷积神经网络(CNN),通过完全依赖注意力机制(Attention)来实现对序列数据的建模,解决了长期以来RNN模型面临的梯度消失和长程依赖问题。Transformer的成功为GPT、BERT等一系列预训练语言模型奠定了基础。
1.1 Transformer的编码器和解码器
Transformer由两部分构成:编码器(Encoder)和解码器(Decoder)。每一部分由多个相同结构的Transformer块(Block)堆叠而成。编码器的任务是对输入序列进行编码,提取其中的语义信息;解码器则负责生成目标序列,逐步输出生成的词汇。
编码器部分:每个Transformer块包含两个主要的子层:多头自注意力(Multi-Head Self-Attention)和位置感知前馈神经网络(Position-wise Feed-Forward Network)。编码器的输入首先通过嵌入层(Embedding)转换为向量表示,然后通过加上位置编码(Positional Encoding)来引入位置信息,因Transformer没有像RNN那样的序列性结构,位置编码使模型能够理解输入序列中词的顺序。
解码器部分:解码器的结构与编码器类似,但解码器的每个Transformer块除了自注意力层外,还包含了一个交叉注意力层(Cross-Attention),它接收来自编码器的输出与自身的历史信息,以生成新的词汇。同时,解码器的第一个自注意力子层使用了掩码(Mask)机制,确保模型在生成时仅能看到前面的上下文,防止“泄露”未来的词汇信息。
1.2 主意力机制与位置编码
自注意力机制(Self-Attention):自注意力机制是Transformer架构的核心,它通过计算输入序列中每个单词与其他单词之间的相似度来生成当前单词的上下文表示。在计算时,输入序列通过映射到三个不同的向量空间:查询(Query)、键(Key)和值(Value),通过计算查询与键的点积来获取注意力分数,再根据这些分数加权平均值向量,从而得到每个词的上下文表示。
多头注意力(Multi-Head Attention):为了捕获不同子空间的上下文信息,Transformer引入了多头注意力机制。每个头独立地计算注意力,然后将结果拼接在一起,形成最终的输出。这样可以使得模型能够同时关注输入序列的不同部分,从而提高了模型的表达能力。
位置编码(Positional Encoding):由于Transformer不具备RNN中的隐状态传递,因此需要一种方法来表示词汇在序列中的相对位置。位置编码通过给每个词向量添加一个位置编码,使得模型可以利用这些信息捕捉词与词之间的相对顺序。位置编码通常使用正余弦函数生成,其中每个位置的编码具有唯一性,可以帮助模型区分不同位置的词汇。
1.3 优化技巧:残差连接与层归一化
Transformer中每个子层都有残差连接和层归一化技术,这两个技巧显著提高了训练的稳定性。残差连接通过直接将输入与输出相加,避免了深层网络中的梯度消失问题。层归一化则帮助稳定模型的训练过程,通过对每一层的输出进行标准化,使得每一层的输出在相同的范围内,有助于加速收敛并提高模型性能。
Transformer架构为大语言模型提供了强大的基础,大语言模型(如GPT、LLaMA等)通过预训练-微调的范式在多种自然语言处理任务中取得了显著的成绩。随着模型规模的不断扩大和计算资源的增强,未来大语言模型将继续推动NLP领域的进步,但同时也面临着计算效率和资源消耗的挑战。
2、生成式预训练语言模型GPT
生成式预训练语言模型(GPT)是OpenAI于2018年提出的一种基于Transformer的自回归语言模型。GPT采用生成式预训练(Generative Pre-Training,GPT)的方法,通过海量文本数据进行预训练,学习通用的语言表示能力,之后通过微调(fine-tuning)适应特定任务。
2.1 GPT的无监督预训练
GPT的训练过程包括两个阶段:无监督预训练和有监督微调。在无监督预训练阶段,GPT模型通过大量无标注文本数据学习语言的基础知识。具体来说,GPT是一个单向的自回归模型,它通过最大化每个词出现的条件概率来训练模型,即每次预测一个词,条件是当前词之前的所有词。GPT的目标是通过无监督学习,捕捉语言的统计规律,并学习如何生成连贯的文本。
GPT的输入文本首先通过词嵌入层(Word Embedding)转换为向量表示,然后通过多个Transformer层进行处理,每一层通过自注意力机制和前馈网络对输入的序列进行建模。
2.2 有监督下游任务微调
预训练完成后,GPT会对下游任务进行微调。微调阶段,GPT根据特定的任务需求(如文本分类、情感分析等),通过标注数据进行训练。微调的核心思想是利用预训练模型学习到的通用语言表示,再通过针对性的数据对其进行调整,使其能够更好地解决具体任务。
在微调过程中,GPT通过将任务相关数据输入到预训练的Transformer模型中,预测每个位置的词,并通过损失函数(如交叉熵)对模型进行优化。通过这种方法,GPT能够执行各种NLP任务,而不需要针对每个任务从头开始训练模型。
2.3 GPT-3与其创新性
GPT-3是OpenAI发布的第三代语言模型,具有1750亿个参数,相比于前代的GPT-2,它的参数量和计算能力大幅提升。GPT-3能够执行多种NLP任务,如文本生成、翻译、问答、摘要等,且无需显式微调,展示了预训练语言模型的强大零-shot(zero-shot)能力。
GPT-3的成功标志着生成式预训练语言模型的强大能力,但也暴露了其训练资源消耗巨大、可解释性差等问题。
3、大语言模型的结构
大语言模型,如GPT-3和LLaMA等,通常基于Transformer架构,但在某些方面进行了改进。例如,LLaMA采用了不同的归一化和激活函数,以及旋转位置嵌入(RoPE),这些改进使得LLaMA在某些任务上性能优越。
3.1 LLaMA与GPT-3的不同
LLaMA是Meta(前Facebook)推出的一个大规模语言模型,类似于GPT-3,但在架构上做了以下几个改进:
RMSNorm归一化:LLaMA采用了RMSNorm归一化方法,而不是GPT-3使用的LayerNorm。RMSNorm计算简便,且在某些场景下比LayerNorm效果更好。
SwiGLU激活函数:LLaMA使用SwiGLU激活函数,这种激活函数比传统的ReLU在很多任务中表现得更好,尤其是在大规模模型的训练中。
RoPE位置嵌入:LLaMA采用旋转位置嵌入(RoPE),通过复数的数学形式来表示位置编码。这种方法相比传统的位置编码能够更好地处理长序列。
3.2 注意力机制优化
由于Transformer架构中的自注意力机制计算复杂度为O(n²),随着序列长度的增加,计算和存储需求呈二次增长,这导致大语言模型在处理长文本时面临显著的计算瓶颈。为了解决这一问题,研究人员提出了几种优化方法:
稀疏注意力(Sparse Attention):通过减少注意力计算的范围,只关注输入序列中相关的部分,从而减小计算复杂度。
FlashAttention:通过优化内存访问和计算,减少了对全局内存的依赖,提高了计算效率。
多查询注意力(Multi-Query Attention):通过共享键和值的集合,使得每个注意力头只保留一个查询,从而减少显存的使用,提高计算效率。
- 2025-02-19
-
发表了主题帖:
【树莓派Pico测评】Pico简介与点灯测试
1、Pico简介
作为树莓派的新款微控制器系列产品,树莓派Pico售价仅需4美金,基于树莓派定制的RP2040微处理器,双核Arm Cortex M0 Plus架构,最高主频可达133MHz,板载264K的SRAM内存和2Mflash闪存空间,26个多功能GPIO,3.3V信号电平,通孔旁边有邮票孔,便于集成到其他电路板,2组SPI,2组I2C,2组UART,3个12bit精度的模数转换接口,16个可控PWM通道,USB1.1,支持主从设备。
树莓派Pico仍然使用Micro USB接口而不是当下更流行的Type-C接口,虽然说是为了节省成本,但是还是不得不吐槽一下,还有一个槽点就是引脚丝印只印在了背面,而正面没有,然后排针只能反着焊接喽,哈哈哈。
2、点灯测试
树莓派官方提供非常详细的资料,具体网址(Raspberry Pi Documentation - Microcontrollers),基于官网提供的资料可以看到,树莓派Pico支持MicroPython和C++两种语言,Pico切换编程语言通过刷入不同的固件来实现,按住Pico上的BOOTSET键不放,将Pico插上USB线,电脑上会出现一个RPI-RP2的盘符,优盘里提供了Pico的官网链接,使用MicroPython烧录固件,将该语言版本的UF2文件下载下来,直接拖进盘符,等待几秒就完成了了固件的刷新,Pico会自动断开刷固件的模式,重新以运行代码的串口模式连接到电脑,即可对Pico进行编程。
使用官方推荐的IDE:Thonny进行编程,同时Thonny也支持刷入固件,Thonny集成了Shell命令行,代码编辑和文件上传下载为一体,看到Shell命令行出现了>>>提示符,说明Thonny成功连接到了Pico,命令行也显示出了板子上安装的固件版本,在命令行输入print("hello")按下回车键后,Pico会成功返回hello。
接着进行闪烁led灯,代码如下,将代码保存为main.py文件到Pico上面,按照MicroPython的规则,上电之后自动开始运行main.py文件。
from machine import Pin
import time
led = Pin(25,Pin.OUT)
while True:
led.toggle()
time.sleep(1)
可以看到LED灯持续闪烁,结果如下所示:
- 2025-02-10
-
发表了主题帖:
《大规模语言模型从理论到实践》书籍概览
1、大模型的发展由来
大规模语言模型(Large Language Models,LLM),也称大语言模型或大型语言模型,是一种由包含数百亿以上参数的深度神经网络构建的语言模型,通常使用自监督学习方法通过大量无标注文本进行训练。
自2018年以来,Google、OpenAI、Meta、百度、华为等公司和研究机构都相继发布了包括BERT,GPT等在内多种模型,并在几乎所有自然语言处理任务中都表现出色。2019年大模型呈现爆发式的增长,特别是2022年11月ChatGPT(Chat Generative Pre-trained Transformer)发布后,更是引起了全世界的广泛关注。用户可以使用自然语言与系统交互,从而实现包括问答、分类、摘要、翻译、聊天等从理解到生成的各种任务。大规模语言模型展现出了强大的对世界知识掌握和对语言的理解能力。大语言模型的发展历程虽然只有短短不到五年,但是发展速度相当惊人,截止2023 年6 月,国内外有超过百种大模型相继发布,发展时间历程如下图所示。
大语言模型的发展可以粗略地分为如下三个阶段:基础模型阶段、能力探索阶段和突破发展阶段。基础模型阶段主要集中于2018 年至2021 年,此阶段的研究主要集中在语言模型本身;能力探索阶段集中于2019 年至2022 年,在直接利用大语言模型进行零样本和少样本学习的基础上,逐渐扩展到利用生成式框架针对大量任务进行有监督微调的方法,有效提升了模型的性能;突破发展阶段以2022 年11 月ChatGPT 的发布为起点,各大公司和研究机构相继发布了此类系统,包括Google 推出的Bard、百度的文心一言、科大讯飞的星火大模型、智谱ChatGLM、复旦大学MOSS 等。
2、书籍目录部分介绍
全书总共分为4个部分、8个章节,具体如下图。
第一部分(第二章)详细介绍大规模语言模型的基础理论知识,包括语言模型的定义、Transformer 结构,以及大规模语言模型框架等内容,并以 LLaMA 所采用的模型结构为例,提供代码实例的介绍。
第二部分(第三章和第四章)主要介绍预训练的相关内容,包括在模型分布式训练中需要掌握的数据并行、流水线并行和模型并行等技术。同时,介绍ZeRO系列优化方法。此外,详细介绍预训练所需的数据分布和数据预处理方法,并以DeepSpeed为例,演示如何进行大规模语言模型的预训练。
第三部分(第五章和第六章)聚焦于大规模语言模型在指令理解阶段的主要研究内容。着重阐述如何在基础模型的基础上利用有监督微调和强化学习方法,使模型能够理解指令并给出类人回答。具体介绍了高效微调方法、有监督微调数据构造方法、强化学习基础和近端策略优化方法,并以 DeepSpeed-Chat和 MOSS-RLHF为例,说明如何训练类ChatGPT系统。
第四部分(第七章和第八章)重点介绍大规模语言模型的扩展应用和评价。围绕大规模语言模型的应用和评估展开讨论。主要包括与外部工具和知识源连接的LangChain 技术,能够利用大规模语言模型进行自动规划执行复杂任务的应用,以及传统的语言模型评估方式和针对大规模语言模型使用的各类评估方法。
章节内容如下:
第一章为绪论部分,包括大语言模型的基本概念、发展历程、构建流程和本书的内容安排。
第二章为大语言模型基础,包括Transformer结构,生成式预训练语言模型GPT和大语言模型的结构。
第三章为大语言模型预训练数据,包括数据来源、数据处理,数据影响分析与开源数据集。
第四章为分布式训练,包括分布式训练概述,分布式训练的并行策略,分布式训练的集群架构和DeepSpeed实践。
第五章为有监督微调,包括提示学习和语境学习,高效模型微调,模型上下文窗口扩展,指令数据的构建,DeepSpeed-Chat SFT实践。
第六章为强化学习,包括基于人类反馈的强化学习,奖励模型,近端策略优化,MOSS-RLHF实践。
第七章为大语言模型应用,包括推理规划,综合应用框架,智能代理,多模态大模型,大语言模型推理优化。
第八章为大语言模型评估,包括模型评估概述,大语言模型评估体系,大语言模型评估方法和大语言模型评估实践。
3、相关资料
该书籍在开篇部分精心设计,为读者提供了基础数学符号的说明,这一举措极大地方便了读者对后续内容的学习和理解。此外,书中还细致地列出了参考文献,这不仅体现了学术的严谨性,也为有兴趣深入了解的读者提供了进一步探索的途径。通过这些贴心的安排,书籍旨在为读者打造一个全面、易于理解的学习环境。
同时本书提供相关课件PPT(百度网盘链接: https://pan.baidu.com/s/1n4ae-3ByYkoo0fFJ9R0N9A?pwd=ha7e提取码: ha7e)与代码仓库(https://github.com/intro-llm/intro-llm.github.io/tree/main/code/ch5),
4、书籍整体的概括
书籍采用了高质量的印刷标准,字体清晰、纸张质量良好,排版设计专业,为读者提供舒适的阅读体验。
书籍的目录安排显示出一个清晰的结构,从基础理论到实践应用,再到评估方法,每个章节都有明确的主题和目标。这种结构化的布局有助于读者逐步深入理解大语言模型的各个方面。
同时每个章节都配有相关的公式、图片以及相应的代码,这种设计不仅增强了理论的可理解性,也为读者提供了实践操作的指导。这种实用性强的编写方式适合那些希望将理论知识应用于实际问题的读者。
书籍中包含的图片和图表有助于直观展示复杂的概念和流程,这对于理解和记忆信息非常有帮助。
图文并茂的呈现方式使得阅读更加生动有趣。书籍鼓励读者通过实践来加深理解,例如通过代码实践来构建和训练自己的大语言模型。这种互动性可以提高读者的参与度和学习效率。
书籍内容覆盖了从基础到高级的多个层面,适合不同背景的读者。无论是初学者还是有一定基础的专业人士,都能从中找到有价值的信息。
- 2025-02-07
-
回复了主题帖:
测评入围名单: PolarFire SoC FPGA Discovery 套件
确认可完成测评计划
- 2025-01-19
-
回复了主题帖:
【测评入围名单(最后1批)】年终回炉:FPGA、AI、高性能MCU、书籍等65个测品邀你来~
个人信息无误,确认可以完成测评计划
- 2024-12-26
-
回复了主题帖:
【第二轮入围名单】《大规模语言模型:从理论到实践》
个人信息无误,确认可以完成测评分享计划。
- 2024-11-16
-
发表了主题帖:
嘉楠科K230AI开发板测评8--音频采集、播放、编码与解码、视频采集、播放与编码
本帖最后由 dfjs 于 2024-11-16 22:32 编辑
嘉楠科K230AI开发板测评8
1、音频采集与播放
K230自带采集与播放接口,接口位置如下左图所示,音频输入为:麦克风咪头,音频输出:3.5mm音频口(双声道),查看音频采集与播放的原理图,可能是考虑到功率问题,没有功率放大电路,而采用了3.5mm的音频口输出而不是直接喇叭外放。
具体代码思路如下:
初始化模块:
导入必要的模块,包括 os、media、pyaudio 和 wave。
初始化媒体管理和音频对象。
异常处理:
定义 exit_check 函数,用于捕获键盘中断(Ctrl+C)并优雅地退出程序。
音频录制:
定义 record_audio 函数,用于录制音频并保存为 WAV 文件。
设置音频参数,如采样率、采样精度、声道数和 chunk 大小。
打开音频输入流,读取音频数据并存储到列表中。
将列表中的音频数据保存到 WAV 文件中。
音频播放:
定义 play_audio 函数,用于播放 WAV 文件。
打开 WAV 文件,读取音频参数。
打开音频输出流,读取 WAV 文件中的音频数据并写入输出流。
实时回放:
定义 loop_audio 函数,用于实时采集音频并立即播放。
设置音频参数,打开音频输入流和输出流。
从输入流中读取音频数据并立即写入输出流。
主程序:
在 __main__ 块中,启用退出点,启动音频示例。
调用 play_audio、record_audio 或 loop_audio 函数来执行相应的音频操作。
参考代码如下:
# audio input and output example
#
# Note: You will need an SD card to run this example.
#
# You can play wav files or capture audio to save as wav
import os
from media.media import * #导入media模块,用于初始化vb buffer
from media.pyaudio import * #导入pyaudio模块,用于采集和播放音频
import media.wave as wave #导入wav模块,用于保存和加载wav音频文件
def exit_check():
try:
os.exitpoint()
except KeyboardInterrupt as e:
print("user stop: ", e)
return True
return False
def record_audio(filename, duration):
CHUNK = int(44100/25) #设置音频chunk值
FORMAT = paInt16 #设置采样精度
CHANNELS = 2 #设置声道数
RATE = 44100 #设置采样率
try:
p = PyAudio()
p.initialize(CHUNK) #初始化PyAudio对象
MediaManager.init() #vb buffer初始化
#创建音频输入流
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
frames = []
#采集音频数据并存入列表
for i in range(0, int(RATE / CHUNK * duration)):
data = stream.read()
frames.append(data)
if exit_check():
break
#将列表中的数据保存到wav文件中
wf = wave.open(filename, 'wb') #创建wav 文件
wf.set_channels(CHANNELS) #设置wav 声道数
wf.set_sampwidth(p.get_sample_size(FORMAT)) #设置wav 采样精度
wf.set_framerate(RATE) #设置wav 采样率
wf.write_frames(b''.join(frames)) #存储wav音频数据
wf.close() #关闭wav文件
except BaseException as e:
print(f"Exception {e}")
finally:
stream.stop_stream() #停止采集音频数据
stream.close()#关闭音频输入流
p.terminate()#释放音频对象
MediaManager.deinit() #释放vb buffer
def play_audio(filename):
try:
wf = wave.open(filename, 'rb')#打开wav文件
CHUNK = int(wf.get_framerate()/25)#设置音频chunk值
p = PyAudio()
p.initialize(CHUNK) #初始化PyAudio对象
MediaManager.init() #vb buffer初始化
#创建音频输出流,设置的音频参数均为wave中获取到的参数
stream = p.open(format=p.get_format_from_width(wf.get_sampwidth()),
channels=wf.get_channels(),
rate=wf.get_framerate(),
output=True,frames_per_buffer=CHUNK)
data = wf.read_frames(CHUNK)#从wav文件中读取数一帧数据
while data:
stream.write(data) #将帧数据写入到音频输出流中
data = wf.read_frames(CHUNK) #从wav文件中读取数一帧数据
if exit_check():
break
except BaseException as e:
print(f"Exception {e}")
finally:
stream.stop_stream() #停止音频输出流
stream.close()#关闭音频输出流
p.terminate()#释放音频对象
wf.close()#关闭wav文件
MediaManager.deinit() #释放vb buffer
def loop_audio(duration):
CHUNK = int(44100/25)#设置音频chunck
FORMAT = paInt16 #设置音频采样精度
CHANNELS = 2 #设置音频声道数
RATE = 44100 #设置音频采样率
try:
p = PyAudio()
p.initialize(CHUNK)#初始化PyAudio对象
MediaManager.init() #vb buffer初始化
#创建音频输入流
input_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
#创建音频输出流
output_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
output=True,frames_per_buffer=CHUNK)
#从音频输入流中获取数据写入到音频输出流中
for i in range(0, int(RATE / CHUNK * duration)):
output_stream.write(input_stream.read())
if exit_check():
break
except BaseException as e:
print(f"Exception {e}")
finally:
input_stream.stop_stream()#停止音频输入流
output_stream.stop_stream()#停止音频输出流
input_stream.close() #关闭音频输入流
output_stream.close() #关闭音频输出流
p.terminate() #释放音频对象
MediaManager.deinit() #释放vb buffer
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
print("audio sample start")
play_audio('/sdcard/app/output.wav') #播放wav文件
#record_audio('/sdcard/app/output.wav', 15) #录制wav文件
loop_audio(15) #采集音频并输出
print("audio sample done")
实验结果:
注意:需要用一个有线耳机插入到K230的3.5mm音频口,才可听到采集到的音频。
下图1为执行音频采集代码之前“CanMV\sdcard\app”目录的文件情况,图2为执行代码之后的目录文件,可以看到成功采集到“output.wav”的音频文件。
接着注释掉音频录制代码,执行音频播放代码,如下图,可以从有线耳机听到刚刚采集到的音频。
2、音频编码与解码
为了高效存储和传输音频数据,通过压缩技术减少文件大小和带宽需求,同时保持或优化音频质量,确保在不同设备和平台上的兼容性和一致性,因此需要音频编码与解码。G.711 是一种常用的音频编码标准,主要用于电话通信系统中。它通过脉冲编码调制(PCM)技术将模拟音频信号转换为数字信号,并进行量化和编码。G.711 编码的目的是在保持较高音频质量的同时,减少数据传输所需的带宽。K230的音频编码与解码采用G.711标准。
具体代码思路如下:
初始化模块:
导入必要的模块,包括 os、media、pyaudio 和 g711。
初始化媒体管理和音频对象。
异常处理:
定义 exit_check 函数,用于捕获键盘中断(Ctrl+C)并优雅地退出程序。
音频采集与编码:
定义 encode_audio 函数,用于采集音频数据并将其编码为 G.711 格式,然后保存到文件中。
设置音频参数,如采样率、采样精度、声道数和 chunk 大小。
打开音频输入流,读取音频数据并进行 G.711 编码,将编码后的数据保存到文件中。
音频解码与播放:
定义 decode_audio 函数,用于从文件中读取 G.711 编码的音频数据,解码为原始音频数据并播放。
打开 G.711 文件,读取音频参数。
打开音频输出流,读取 G.711 文件中的数据并解码为原始音频数据,然后写入输出流。
实时编码与解码回放:
定义 loop_codec 函数,用于实时采集音频数据,进行 G.711 编码和解码,然后立即播放。
设置音频参数,打开音频输入流和输出流。
从输入流中读取音频数据,进行 G.711 编码和解码,然后写入输出流。
主程序:
在 __main__ 块中,启用退出点,启动音频示例。
调用 encode_audio、decode_audio 或 loop_codec 函数来执行相应的音频操作。
参考代码如下:
# g711 encode/decode example
#
# Note: You will need an SD card to run this example.
#
# You can collect raw data and encode it into g711 or decode it into raw data output.
import os
from mpp.payload_struct import * #导入payload模块,用于获取音视频编解码类型
from media.media import * #导入media模块,用于初始化vb buffer
from media.pyaudio import * #导入pyaudio模块,用于采集和播放音频
import media.g711 as g711 #导入g711模块,用于g711编解码
def exit_check():
try:
os.exitpoint()
except KeyboardInterrupt as e:
print("user stop: ", e)
return True
return False
def encode_audio(filename, duration):
CHUNK = int(44100/25) #设置音频chunk值
FORMAT = paInt16 #设置采样精度
CHANNELS = 2 #设置声道数
RATE = 44100 #设置采样率
try:
p = PyAudio()
p.initialize(CHUNK) #初始化PyAudio对象
enc = g711.Encoder(K_PT_G711A,CHUNK) #创建g711编码器对象
MediaManager.init() #vb buffer初始化
enc.create() #创建编码器
#创建音频输入流
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
frames = []
#采集音频数据编码并存入列表
for i in range(0, int(RATE / CHUNK * duration)):
frame_data = stream.read() #从音频输入流中读取音频数据
data = enc.encode(frame_data) #编码音频数据为g711
frames.append(data) #将g711编码数据保存到列表中
if exit_check():
break
#将g711编码数据存入文件中
with open(filename,mode='w') as wf:
wf.write(b''.join(frames))
stream.stop_stream() #停止音频输入流
stream.close() #关闭音频输入流
p.terminate() #释放音频对象
enc.destroy() #销毁g711音频编码器
except BaseException as e:
print(f"Exception {e}")
finally:
MediaManager.deinit() #释放vb buffer
def decode_audio(filename):
FORMAT = paInt16 #设置音频chunk值
CHANNELS = 2 #设置声道数
RATE = 44100 #设置采样率
CHUNK = int(RATE/25) #设置音频chunk值
try:
wf = open(filename,mode='rb') #打开g711文件
p = PyAudio()
p.initialize(CHUNK) #初始化PyAudio对象
dec = g711.Decoder(K_PT_G711A,CHUNK) #创建g711解码器对象
MediaManager.init() #vb buffer初始化
dec.create() #创建解码器
#创建音频输出流
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
output=True,
frames_per_buffer=CHUNK)
stream_len = CHUNK*CHANNELS*2//2 #设置每次读取的g711数据流长度
stream_data = wf.read(stream_len) #从g711文件中读取数据
#解码g711文件并播放
while stream_data:
frame_data = dec.decode(stream_data) #解码g711文件
stream.write(frame_data) #播放raw数据
stream_data = wf.read(stream_len) #从g711文件中读取数据
if exit_check():
break
stream.stop_stream() #停止音频输入流
stream.close() #关闭音频输入流
p.terminate() #释放音频对象
dec.destroy() #销毁解码器
wf.close() #关闭g711文件
except BaseException as e:
print(f"Exception {e}")
finally:
MediaManager.deinit() #释放vb buffer
def loop_codec(duration):
CHUNK = int(44100/25) #设置音频chunk值
FORMAT = paInt16 #设置采样精度
CHANNELS = 2 #设置声道数
RATE = 44100 #设置采样率
try:
p = PyAudio()
p.initialize(CHUNK) #初始化PyAudio对象
dec = g711.Decoder(K_PT_G711A,CHUNK) #创建g711解码器对象
enc = g711.Encoder(K_PT_G711A,CHUNK) #创建g711编码器对象
MediaManager.init() #vb buffer初始化
dec.create() #创建g711解码器
enc.create() #创建g711编码器
#创建音频输入流
input_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
#创建音频输出流
output_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
output=True,
frames_per_buffer=CHUNK)
#从音频输入流中获取数据->编码->解码->写入到音频输出流中
for i in range(0, int(RATE / CHUNK * duration)):
frame_data = input_stream.read() #从音频输入流中获取raw音频数据
stream_data = enc.encode(frame_data) #编码音频数据为g711
frame_data = dec.decode(stream_data) #解码g711数据为raw数据
output_stream.write(frame_data) #播放raw数据
if exit_check():
break
input_stream.stop_stream() #停止音频输入流
output_stream.stop_stream() #停止音频输出流
input_stream.close() #关闭音频输入流
output_stream.close() #关闭音频输出流
p.terminate() #释放音频对象
dec.destroy() #销毁g711解码器
enc.destroy() #销毁g711编码器
except BaseException as e:
print(f"Exception {e}")
finally:
MediaManager.deinit() #释放vb buffer
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
print("audio codec sample start")
#encode_audio('/sdcard/app/test.g711a', 5) #采集并编码g711文件
#decode_audio('/sdcard/app/test.g711a') #解码g711文件并输出
loop_codec(15) #采集音频数据->编码g711->解码g711->播放音频
print("audio codec sample done")
实验结果:
音频采集编码结果如下,生成“test.g711a”编码文件。
解码并播放代码如下,可以从耳机听到解码后的音频。
3、视频采集
K230自带三个摄像头接口CSI0、CSI1、CSI2,如下图,可以用来图像识别,拍照,录像等功能,下面用K230录取一段视频保存在内存卡中。
具体代码思路如下:
初始化模块:
导入必要的模块,包括 media.mp4format 和 os。
初始化 MP4 容器和配置对象。
配置 MP4 容器:
设置 MP4 文件的路径、视频编码格式、分辨率和音频编码格式。
创建和启动 MP4 复用器(muxer)。
处理音视频数据:
在一个循环中,调用 MP4 复用器的 Process 方法,将音视频数据写入 MP4 文件。
控制循环次数,达到一定帧数后停止。
停止和销毁 MP4 复用器:
停止 MP4 复用器。
销毁 MP4 复用器,释放资源。
异常处理:
捕获并处理可能发生的异常。
参考代码如下:
# Save MP4 file example
#
# Note: You will need an SD card to run this example.
#
# You can capture audio and video and save them as MP4.The current version only supports MP4 format, video supports 264/265, and audio supports g711a/g711u.
from media.mp4format import *
import os
def mp4_muxer_test():
print("mp4_muxer_test start")
width = 1280
height = 720
# 实例化mp4 container
mp4_muxer = Mp4Container()
mp4_cfg = Mp4CfgStr(mp4_muxer.MP4_CONFIG_TYPE_MUXER)
if mp4_cfg.type == mp4_muxer.MP4_CONFIG_TYPE_MUXER:
file_name = "/sdcard/app/tests/test.mp4"
mp4_cfg.SetMuxerCfg(file_name, mp4_muxer.MP4_CODEC_ID_H265, width, height, mp4_muxer.MP4_CODEC_ID_G711U)
# 创建mp4 muxer
mp4_muxer.Create(mp4_cfg)
# 启动mp4 muxer
mp4_muxer.Start()
frame_count = 0
try:
while True:
os.exitpoint()
# 处理音视频数据,按MP4格式写入文件
mp4_muxer.Process()
frame_count += 1
print("frame_count = ", frame_count)
if frame_count >= 200:
break
except BaseException as e:
print(e)
# 停止mp4 muxer
mp4_muxer.Stop()
# 销毁mp4 muxer
mp4_muxer.Destroy()
print("mp4_muxer_test stop")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
mp4_muxer_test()
实验结果:
可以看到在“\CanMV\sdcard\app\tests”目录下成功生成一个名为“test.mp4”的文件,可以用电脑带的播放器点开播放,视频带有声音。
4、视频播放
视频播放的思路如下:
初始化模块:
导入必要的模块,包括 media.player 和 os。
定义全局变量 start_play 用于控制播放状态。
定义播放器事件回调函数:
定义 player_event 函数,用于处理播放器事件,特别是播放结束事件。
加载和播放 MP4 文件:
创建播放器对象。
加载 MP4 文件。
设置播放器事件回调函数。
开始播放 MP4 文件。
等待播放结束。
停止播放:
捕获并处理可能发生的异常。
停止播放器,释放资源。
参考代码如下:
# play mp4 file example
#
# Note: You will need an SD card to run this example.
#
# You can load local files to play. The current version only supports MP4 format, video supports 264/265, and audio supports g711a/g711u.
from media.player import * #导入播放器模块,用于播放mp4文件
import os
start_play = False #播放结束flag
def player_event(event,data):
global start_play
if(event == K_PLAYER_EVENT_EOF): #播放结束标识
start_play = False #设置播放结束标识
def play_mp4_test(filename):
global start_play
player=Player() #创建播放器对象
player.load(filename) #加载mp4文件
player.set_event_callback(player_event) #设置播放器事件回调
player.start() #开始播放
start_play = True
#等待播放结束
try:
while(start_play):
time.sleep(0.1)
os.exitpoint()
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
sys.print_exception(e)
player.stop() #停止播放
print("play over")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
play_mp4_test("/sdcard/app/tests/test.mp4")#播放mp4文件
实验结果:
5、视频编码
视频编码与解码是数字视频处理中不可或缺的步骤,通过压缩技术减少视频文件的大小和传输带宽需求,同时保持或优化视频质量,确保高效存储、快速传输和跨平台兼容性。编码将原始视频数据转换为压缩格式,解码则将压缩数据还原为可播放的视频,二者共同保证了视频内容的高效分发和流畅播放。
嘉楠K230内置多个高清视频图像输入处理和智能硬件处理单元,兼顾高性能、低功耗( 采用大小核设计兼顾性能与功耗,提供百毫秒级快速启动软件SDK支持,适合电池类产品开发)和高安全性特点。
使用 Python 和 media 模块来捕获视频数据并将其编码为 H.264 或 H.265 格式的文件。
视频编码的思路如下:
初始化模块:
导入必要的模块,包括 media.vencoder、media.sensor、media.media 和 os。
初始化传感器(相机)和视频编码器。
配置传感器:
重置传感器。
设置传感器的输出分辨率和格式。
实例化和配置视频编码器:
创建视频编码器对象。
设置视频编码器的输出缓冲区。
绑定传感器和视频编码器。
启动传感器和编码器:
初始化媒体管理器。
创建和启动视频编码器。
启动传感器。
处理视频数据:
在一个循环中,从视频编码器获取编码后的码流数据,并将其写入文件。
控制循环次数,达到一定帧数后停止。
停止和销毁编码器:
停止传感器和编码器。
销毁传感器和编码器的绑定。
停止和销毁编码器,释放资源。
异常处理:
捕获并处理可能发生的异常。
参考代码如下:
# Video encode example
#
# Note: You will need an SD card to run this example.
#
# You can capture videos and encode them into 264 files
from media.vencoder import *
from media.sensor import *
from media.media import *
import time, os
# NOT WORK NOW!!!
def venc_test():
print("venc_test start")
width = 1280
height = 720
venc_chn = VENC_CHN_ID_0
width = ALIGN_UP(width, 16)
# 初始化sensor
sensor = Sensor()
sensor.reset()
# 设置camera 输出buffer
# set chn0 output size
sensor.set_framesize(width = width, height = height, alignment=12)
# set chn0 output format
sensor.set_pixformat(Sensor.YUV420SP)
# 实例化video encoder
encoder = Encoder()
# 设置video encoder 输出buffer
encoder.SetOutBufs(venc_chn, 15, width, height)
# 绑定camera和venc
link = MediaManager.link(sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn))
# init media manager
MediaManager.init()
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H265, encoder.H265_PROFILE_MAIN, width, height)
streamData = StreamData()
# 创建编码器
encoder.Create(venc_chn, chnAttr)
# 开始编码
encoder.Start(venc_chn)
# 启动camera
sensor.run()
frame_count = 0
if chnAttr.payload_type == encoder.PAYLOAD_TYPE_H265:
suffix = "265"
elif chnAttr.payload_type == encoder.PAYLOAD_TYPE_H264:
suffix = "264"
else:
suffix = "unkown"
print("cam_venc_test, venc payload_type unsupport")
out_file = f"/sdcard/app/tests/venc_chn_{venc_chn:02d}.{suffix}"
print("save stream to file: ", out_file)
with open(out_file, "wb") as fo:
try:
while True:
os.exitpoint()
encoder.GetStream(venc_chn, streamData) # 获取一帧码流
for pack_idx in range(0, streamData.pack_cnt):
stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
fo.write(stream_data) # 码流写文件
print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])
encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流
frame_count += 1
if frame_count >= 100:
break
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
sys.print_exception(e)
# 停止camera
sensor.stop()
# 销毁camera和venc的绑定
del link
# 停止编码
encoder.Stop(venc_chn)
# 销毁编码器
encoder.Destroy(venc_chn)
# 清理buffer
MediaManager.deinit()
print("venc_test stop")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
venc_test()
实验结果:
在”This PC\CanMV\sdcard\app\tests”目录下生成名为“venc_chnn_00.265”的编码文件。
- 2024-11-15
-
发表了主题帖:
嘉楠K230AI开发板测评8--人脸3D网络、人体关键点、车牌识别、字符识别、物体识别
本帖最后由 dfjs 于 2024-11-15 22:43 编辑
嘉楠科K230AI开发板测评8--AI视觉篇
观察K230文件系统,AI视觉开发框架主要API接口代码位于“\CanMV\sdcard\app\libs”目录下,如下图:
PineLine :用于采集图像、画图以及结果图片显示的API接口。
Ai2d : 预处理(Preprocess)相关接口。
AIBase : 模型推理主要接口。
同时可以看到官方训练好的模型(后缀为.kmodel)位于“\CanMV\sdcard\app\tests\kmodel”目录下,包括人脸相关模型、人体相关模型、手部相关模型、车牌相关模型、字符识别模型、物体检测模型等,当然,也可以将自己训练的模型放在该目录下调用,如下图:
同时为了下面更好的学习,在这里我搜索了目标检测算法常用的三个重要的概念:置信度阈值(Confidence Threshold)、非极大值抑制阈值(NMS)和锚点数据(Anchors),置信度阈值用于过滤检测结果,NMS阈值用于去除重叠的检测框,而锚点数据则是模型预测目标位置的基础。这三个参数共同作用,使得目标检测算法能够准确地识别和定位目标。它们在检测过程中起到关键作用,在后面的推理过程经常用到。
置信度阈值(Confidence Threshold):
含义:置信度阈值用于过滤模型输出的检测结果。在目标检测模型中,模型会为每个目标输出一个置信度分数,表示模型认为检测到的目标属于某个类别的确定程度。
作用:置信度阈值用于确定一个检测结果是否足够可靠。只有当检测结果的置信度分数高于这个阈值时,该结果才会被认为是有效的检测。低于阈值的结果将被忽略。这有助于减少误检和提高检测的准确性。
非极大值抑制阈值(NMS):
含义:非极大值抑制是一种常用的技术,用于在目标检测中去除重叠的检测框。在实际场景中,同一个目标可能被模型多次检测到,产生多个边界框。NMS通过合并重叠的边界框来解决这个问题。
作用:NMS阈值决定了两个检测框需要有多大的重叠(通常是通过交并比IoU来衡量)才会被认为是同一个目标。如果两个检测框的IoU高于NMS阈值,那么置信度较低的检测框将被抑制(即删除),只保留置信度最高的那个。这有助于减少冗余的检测结果,提高检测的精确度。
锚点数据(Anchors):
含义:锚点(也称为先验框)是目标检测算法中用于预测目标位置的一种技术。锚点是一组预定义的边界框,它们有不同的尺寸和比例,用于覆盖目标可能出现的各种尺寸。
作用:在基于锚点的目标检测算法(如Faster R-CNN、SSD等)中,模型会预测每个锚点的偏移量,以调整锚点的位置和尺寸,使其更准确地匹配目标。锚点数据包含了这些预定义边界框的坐标和尺寸,它们是模型预测的基础。
1、人脸相关
人脸3D网格,在检测到人脸后用多个点描绘整个脸,从而把人脸轮廓像网格一样描绘出来,支持单个和多个人脸。
通过CanMV K230 AI视觉框架开发,用到的模型已经存放在CanMV K230的文件系统,具体编程思路如下:
自定义人脸检测人物类、人脸网络任务类、人脸网络后处理任务类、3D人脸网络类。
人脸检测任务类(FaceDetApp):
作用:
负责执行人脸检测任务。
功能:
加载人脸检测模型。
配置图像预处理操作,如填充(pad)和缩放(resize)。
执行模型推理。
进行后处理,包括置信度过滤和非极大值抑制(NMS)。
人脸网格任务类(FaceMeshApp)
作用:
负责执行人脸网格估计任务。
功能:
加载人脸网格模型。
配置图像预处理操作,如裁剪(crop)和缩放(resize)。
执行模型推理。
后处理,将模型输出的参数映射回人脸网格的参数。
人脸网格后处理任务类(FaceMeshPostApp)
作用:
负责对人脸网格估计的结果进行后处理。
功能:
接收人脸网格模型的输出参数。
将参数转换为最终的人脸网格。
调用aidemo库的接口进行进一步的后处理,如人脸网格的绘制。
3D人脸网格类(FaceMesh)
作用:
整合人脸检测、人脸网格估计和后处理流程。
功能:
初始化人脸检测、人脸网格和人脸网格后处理的实例。
运行整个3D人脸网格流程,包括人脸检测、人脸网格估计和后处理。
绘制最终的3D人脸网格结果。
主函数思路:
初始化显示模式和分辨率:根据平台选择HDMI或LCD模式,并设置相应的分辨率。
设置模型路径和其他参数:指定人脸检测、人脸网格和人脸网格后处理模型的路径,以及其他参数如锚点数据、输入分辨率等。
初始化PipeLine:创建一个图像处理流程的实例,设置传给AI的图像分辨率和显示分辨率。
创建3D人脸网格实例:初始化FaceMesh类,传入所需的模型路径和参数。
主循环:
获取当前帧图像。
调用FaceMesh类的run方法进行人脸检测、人脸网格估计和后处理。
打印检测和网格估计的结果。
调用FaceMesh类的draw_result方法绘制3D人脸网格结果。
显示推理效果。
进行垃圾回收。
打印帧率信息。
异常处理:捕获并打印异常信息。
资源清理:在finally块中释放资源,包括反初始化人脸检测、人脸网格和人脸网格后处理实例,以及销毁PipeLine实例。
参考代码如下:
'''
实验名称:人脸3D网格
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import image
import aidemo
import random
import gc
import sys
# 自定义人脸检测任务类
class FaceDetApp(AIBase):
def __init__(self,kmodel_path,model_input_size,anchors,confidence_threshold=0.25,nms_threshold=0.3,rgb888p_size=[1280,720],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 检测模型输入分辨率
self.model_input_size=model_input_size
# 置信度阈值
self.confidence_threshold=confidence_threshold
# nms阈值
self.nms_threshold=nms_threshold
# 检测任务锚框
self.anchors=anchors
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
# 设置padding预处理
self.ai2d.pad(self.get_pad_param(), 0, [104,117,123])
# 设置resize预处理
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程,参数为预处理输入tensor的shape和预处理输出的tensor的shape
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义后处理,results是模型推理输出的array列表,这里使用了aidemo库的face_det_post_process接口
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
res = aidemo.face_det_post_process(self.confidence_threshold,self.nms_threshold,self.model_input_size[0],self.anchors,self.rgb888p_size,results)
if len(res)==0:
return res
else:
return res[0]
# padding参数计算
def get_pad_param(self):
dst_w = self.model_input_size[0]
dst_h = self.model_input_size[1]
# 计算最小的缩放比例,等比例缩放
ratio_w = dst_w / self.rgb888p_size[0]
ratio_h = dst_h / self.rgb888p_size[1]
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * self.rgb888p_size[0])
new_h = (int)(ratio * self.rgb888p_size[1])
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = (int)(round(0))
bottom = (int)(round(dh * 2 + 0.1))
left = (int)(round(0))
right = (int)(round(dw * 2 - 0.1))
return [0,0,0,0,top, bottom, left, right]
# 自定义人脸网格任务类
class FaceMeshApp(AIBase):
def __init__(self,kmodel_path,model_input_size,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 人脸网格模型输入分辨率
self.model_input_size=model_input_size
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
# 人脸mesh参数均值
self.param_mean = np.array([0.0003492636315058917,2.52790130161884e-07,-6.875197868794203e-07,60.1679573059082,-6.295513230725192e-07,0.0005757200415246189,-5.085391239845194e-05,74.2781982421875,5.400917189035681e-07,6.574138387804851e-05,0.0003442012530285865,-66.67157745361328,-346603.6875,-67468.234375,46822.265625,-15262.046875,4350.5888671875,-54261.453125,-18328.033203125,-1584.328857421875,-84566.34375,3835.960693359375,-20811.361328125,38094.9296875,-19967.85546875,-9241.3701171875,-19600.71484375,13168.08984375,-5259.14404296875,1848.6478271484375,-13030.662109375,-2435.55615234375,-2254.20654296875,-14396.5615234375,-6176.3291015625,-25621.919921875,226.39447021484375,-6326.12353515625,-10867.2509765625,868.465087890625,-5831.14794921875,2705.123779296875,-3629.417724609375,2043.9901123046875,-2446.6162109375,3658.697021484375,-7645.98974609375,-6674.45263671875,116.38838958740234,7185.59716796875,-1429.48681640625,2617.366455078125,-1.2070955038070679,0.6690792441368103,-0.17760828137397766,0.056725528091192245,0.03967815637588501,-0.13586315512657166,-0.09223993122577667,-0.1726071834564209,-0.015804484486579895,-0.1416848599910736],dtype=np.float)
# 人脸mesh参数方差
self.param_std = np.array([0.00017632152594160289,6.737943476764485e-05,0.00044708489440381527,26.55023193359375,0.0001231376954820007,4.493021697271615e-05,7.923670636955649e-05,6.982563018798828,0.0004350444069132209,0.00012314890045672655,0.00017400001524947584,20.80303955078125,575421.125,277649.0625,258336.84375,255163.125,150994.375,160086.109375,111277.3046875,97311.78125,117198.453125,89317.3671875,88493.5546875,72229.9296875,71080.2109375,50013.953125,55968.58203125,47525.50390625,49515.06640625,38161.48046875,44872.05859375,46273.23828125,38116.76953125,28191.162109375,32191.4375,36006.171875,32559.892578125,25551.1171875,24267.509765625,27521.3984375,23166.53125,21101.576171875,19412.32421875,19452.203125,17454.984375,22537.623046875,16174.28125,14671.640625,15115.6884765625,13870.0732421875,13746.3125,12663.1337890625,1.5870834589004517,1.5077009201049805,0.5881357789039612,0.5889744758605957,0.21327851712703705,0.2630201280117035,0.2796429395675659,0.38030216097831726,0.16162841022014618,0.2559692859649658],dtype=np.float)
# 实例化Ai2d,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了crop和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,det,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
# 计算crop参数,并设置crop预处理
roi = self.parse_roi_box_from_bbox(det)
self.ai2d.crop(int(roi[0]),int(roi[1]),int(roi[2]),int(roi[3]))
# 设置resize预处理
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程,参数为预处理输入tensor的shape和预处理输出的tensor的shape
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
return roi
# 自定义后处理,results是模型输出的array列表
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
param = results[0] * self.param_std + self.param_mean
return param
def parse_roi_box_from_bbox(self,bbox):
# 获取人脸roi
x1, y1, w, h = map(lambda x: int(round(x, 0)), bbox[:4])
old_size = (w + h) / 2
center_x = x1 + w / 2
center_y = y1 + h / 2 + old_size * 0.14
size = int(old_size * 1.58)
x0 = center_x - float(size) / 2
y0 = center_y - float(size) / 2
x1 = x0 + size
y1 = y0 + size
x0 = max(0, min(x0, self.rgb888p_size[0]))
y0 = max(0, min(y0, self.rgb888p_size[1]))
x1 = max(0, min(x1, self.rgb888p_size[0]))
y1 = max(0, min(y1, self.rgb888p_size[1]))
roi = (x0, y0, x1 - x0, y1 - y0)
return roi
# 自定义人脸网格后处理任务类
class FaceMeshPostApp(AIBase):
def __init__(self,kmodel_path,model_input_size,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 人脸网格模型输入分辨率
self.model_input_size=model_input_size
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 重写预处理函数preprocess,因为该模型的预处理不是单纯调用一个ai2d能实现的,返回模型输入的tensor列表
def preprocess(self,param):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# face mesh post模型预处理,param解析
param = param[0]
trans_dim, shape_dim, exp_dim = 12, 40, 10
R_ = param[:trans_dim].copy().reshape((3, -1))
R = R_[:, :3].copy()
offset = R_[:, 3].copy()
offset = offset.reshape((3, 1))
alpha_shp = param[trans_dim:trans_dim + shape_dim].copy().reshape((-1, 1))
alpha_exp = param[trans_dim + shape_dim:].copy().reshape((-1, 1))
R_tensor = nn.from_numpy(R)
offset_tensor = nn.from_numpy(offset)
alpha_shp_tensor = nn.from_numpy(alpha_shp)
alpha_exp_tensor = nn.from_numpy(alpha_exp)
return [R_tensor,offset_tensor,alpha_shp_tensor,alpha_exp_tensor]
# 自定义模型后处理,这里调用了aidemo的face_mesh_post_process接口
def postprocess(self,results,roi):
with ScopedTiming("postprocess",self.debug_mode > 0):
x, y, w, h = map(lambda x: int(round(x, 0)), roi[:4])
x = x * self.display_size[0] // self.rgb888p_size[0]
y = y * self.display_size[1] // self.rgb888p_size[1]
w = w * self.display_size[0] // self.rgb888p_size[0]
h = h * self.display_size[1] // self.rgb888p_size[1]
roi_array = np.array([x,y,w,h],dtype=np.float)
aidemo.face_mesh_post_process(roi_array,results[0])
return results[0]
# 3D人脸网格
class FaceMesh:
def __init__(self,face_det_kmodel,face_mesh_kmodel,mesh_post_kmodel,det_input_size,mesh_input_size,anchors,confidence_threshold=0.25,nms_threshold=0.3,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
# 人脸检测模型路径
self.face_det_kmodel=face_det_kmodel
# 人脸3D网格模型路径
self.face_mesh_kmodel=face_mesh_kmodel
# 人脸3D网格后处理模型路径
self.mesh_post_kmodel=mesh_post_kmodel
# 人脸检测模型输入分辨率
self.det_input_size=det_input_size
# 人脸3D网格模型输入分辨率
self.mesh_input_size=mesh_input_size
# anchors
self.anchors=anchors
# 置信度阈值
self.confidence_threshold=confidence_threshold
# nms阈值
self.nms_threshold=nms_threshold
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug_mode模式
self.debug_mode=debug_mode
# 人脸检测实例
self.face_det=FaceDetApp(self.face_det_kmodel,model_input_size=self.det_input_size,anchors=self.anchors,confidence_threshold=self.confidence_threshold,nms_threshold=self.nms_threshold,rgb888p_size=self.rgb888p_size,display_size=self.display_size,debug_mode=0)
# 人脸网格实例
self.face_mesh=FaceMeshApp(self.face_mesh_kmodel,model_input_size=self.mesh_input_size,rgb888p_size=self.rgb888p_size,display_size=self.display_size)
# 人脸网格后处理实例
self.face_mesh_post=FaceMeshPostApp(self.mesh_post_kmodel,model_input_size=self.mesh_input_size,rgb888p_size=self.rgb888p_size,display_size=self.display_size)
# 人脸检测预处理配置
self.face_det.config_preprocess()
# run函数
def run(self,input_np):
# 执行人脸检测
det_boxes=self.face_det.run(input_np)
mesh_res=[]
for det_box in det_boxes:
# 对检测到的每一个人脸配置预处理,执行人脸网格和人脸网格后处理
roi=self.face_mesh.config_preprocess(det_box)
param=self.face_mesh.run(input_np)
tensors=self.face_mesh_post.preprocess(param)
results=self.face_mesh_post.inference(tensors)
res=self.face_mesh_post.postprocess(results,roi)
mesh_res.append(res)
return det_boxes,mesh_res
# 绘制人脸解析效果
def draw_result(self,pl,dets,mesh_res):
pl.osd_img.clear()
if dets:
draw_img_np = np.zeros((self.display_size[1],self.display_size[0],4),dtype=np.uint8)
draw_img = image.Image(self.display_size[0], self.display_size[1], image.ARGB8888, alloc=image.ALLOC_REF,data = draw_img_np)
for vertices in mesh_res:
aidemo.face_draw_mesh(draw_img_np, vertices)
pl.osd_img.copy_from(draw_img)
if __name__=="__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 人脸检测模型路径
face_det_kmodel_path="/sdcard/app/tests/kmodel/face_detection_320.kmodel"
# 人脸网格模型路径
face_mesh_kmodel_path="/sdcard/app/tests/kmodel/face_alignment.kmodel"
# 人脸网格后处理模型路径
face_mesh_post_kmodel_path="/sdcard/app/tests/kmodel/face_alignment_post.kmodel"
# 其他参数
anchors_path="/sdcard/app/tests/utils/prior_data_320.bin"
rgb888p_size=[1920,1080]
face_det_input_size=[320,320]
face_mesh_input_size=[120,120]
confidence_threshold=0.5
nms_threshold=0.2
anchor_len=4200
det_dim=4
anchors = np.fromfile(anchors_path, dtype=np.float)
anchors = anchors.reshape((anchor_len,det_dim))
# 初始化PipeLine,只关注传给AI的图像分辨率,显示的分辨率
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
fm=FaceMesh(face_det_kmodel_path,face_mesh_kmodel_path,face_mesh_post_kmodel_path,det_input_size=face_det_input_size,mesh_input_size=face_mesh_input_size,anchors=anchors,confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,rgb888p_size=rgb888p_size,display_size=display_size)
clock = time.clock()
try:
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧
det_boxes,mesh_res=fm.run(img) # 推理当前帧
print(det_boxes,mesh_res) # 打印结果
fm.draw_result(pl,det_boxes,mesh_res) # 绘制推理结果
pl.show_image() # 显示推理效果
gc.collect()
print(clock.fps()) #打印帧率
except Exception as e:
sys.print_exception(e)
finally:
fm.face_det.deinit()
fm.face_mesh.deinit()
fm.face_mesh_post.deinit()
pl.destroy()
可以看到使用默认配置后只使用了4行代码便实现了获取当前帧图像、AI推理、绘制结果、显示结果 的识别流程。代码中det_boxes变量为人脸检测结果, mesh_res为网格点数据。
...
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧
det_boxes,mesh_res=fm.run(img) # 推理当前帧
print(det_boxes,mesh_res) # 打印结果
fm.draw_result(pl,det_boxes,mesh_res) # 绘制推理结果
pl.show_image() # 显示推理效果
gc.collect()
print(clock.fps()) #打印帧率
...
实验结果如下:
2、人体相关
人体关键点检测是指标注出人体关节等关键信息,分析人体姿态、运动轨迹、动作角度等。检测摄像头拍摄到的画面中的人体关键点并通过画图提示。
本实验通过CanMV K230 AI视觉框架开发,用到的模型已经存放在CanMV K230的文件系统,无需额外拷贝。
具体编程思路如下:
自定义人体关键点检测类。
人体关键点检测类(PersonKeyPointApp):
初始化(__init__):
设置模型路径、模型输入尺寸、置信度阈值、非极大值抑制(NMS)阈值、RGB图像尺寸、显示尺寸和调试模式。
初始化骨骼信息和关键点颜色,用于后续绘制关键点和骨骼。
预处理配置(config_preprocess):
配置Ai2d实例,用于实现模型预处理,包括填充(pad)和缩放(resize)操作。
后处理(postprocess):
对模型的推理结果进行后处理,使用aidemo库的person_kp_postprocess接口,根据置信度阈值和NMS阈值过滤和处理关键点。
绘制结果(draw_result):
将处理后的关键点和骨骼信息绘制到图像上,使用不同的颜色区分不同的骨骼和关键点。
计算填充参数(get_padding_param):
计算为了将输入图像调整到模型输入尺寸所需的填充参数。
主函数的思路:
设置显示模式和尺寸:
根据选择的显示模式(HDMI或LCD),设置显示尺寸。
初始化模型和参数:
设置模型路径、置信度阈值、NMS阈值和RGB图像尺寸。
创建PipeLine:
初始化PipeLine实例,用于管理图像的获取和显示。
初始化人体关键点检测实例:
创建PersonKeyPointApp类的实例,并配置预处理操作。
主循环:
在一个无限循环中,不断获取当前帧图像,进行推理,绘制结果,并显示。
使用clock对象来计算和打印帧率。
异常处理:
捕获异常,打印异常信息,并在退出前释放相关资源。
参考代码如下:
'''
实验名称:人体关键点检测
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import utime
import image
import random
import gc
import sys
import aidemo
# 自定义人体关键点检测类
class PersonKeyPointApp(AIBase):
def __init__(self,kmodel_path,model_input_size,confidence_threshold=0.2,nms_threshold=0.5,rgb888p_size=[1280,720],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
# 模型输入分辨率
self.model_input_size=model_input_size
# 置信度阈值设置
self.confidence_threshold=confidence_threshold
# nms阈值设置
self.nms_threshold=nms_threshold
# sensor给到AI的图像分辨率
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 显示分辨率
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
#骨骼信息
self.SKELETON = [(16, 14),(14, 12),(17, 15),(15, 13),(12, 13),(6, 12),(7, 13),(6, 7),(6, 8),(7, 9),(8, 10),(9, 11),(2, 3),(1, 2),(1, 3),(2, 4),(3, 5),(4, 6),(5, 7)]
#肢体颜色
self.LIMB_COLORS = [(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255),(255, 255, 51, 255),(255, 255, 51, 255),(255, 255, 51, 255),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0)]
#关键点颜色,共17个
self.KPS_COLORS = [(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 0, 255, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 255, 128, 0),(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255),(255, 51, 153, 255)]
# Ai2d实例,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,您可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param()
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
# 这里使用了aidemo库的person_kp_postprocess接口
results = aidemo.person_kp_postprocess(results[0],[self.rgb888p_size[1],self.rgb888p_size[0]],self.model_input_size,self.confidence_threshold,self.nms_threshold)
return results
#绘制结果,绘制人体关键点
def draw_result(self,pl,res):
with ScopedTiming("display_draw",self.debug_mode >0):
if res[0]:
pl.osd_img.clear()
kpses = res[1]
for i in range(len(res[0])):
for k in range(17+2):
if (k < 17):
kps_x,kps_y,kps_s = round(kpses[i][k][0]),round(kpses[i][k][1]),kpses[i][k][2]
kps_x1 = int(float(kps_x) * self.display_size[0] // self.rgb888p_size[0])
kps_y1 = int(float(kps_y) * self.display_size[1] // self.rgb888p_size[1])
if (kps_s > 0):
pl.osd_img.draw_circle(kps_x1,kps_y1,5,self.KPS_COLORS[k],4)
ske = self.SKELETON[k]
pos1_x,pos1_y= round(kpses[i][ske[0]-1][0]),round(kpses[i][ske[0]-1][1])
pos1_x_ = int(float(pos1_x) * self.display_size[0] // self.rgb888p_size[0])
pos1_y_ = int(float(pos1_y) * self.display_size[1] // self.rgb888p_size[1])
pos2_x,pos2_y = round(kpses[i][(ske[1] -1)][0]),round(kpses[i][(ske[1] -1)][1])
pos2_x_ = int(float(pos2_x) * self.display_size[0] // self.rgb888p_size[0])
pos2_y_ = int(float(pos2_y) * self.display_size[1] // self.rgb888p_size[1])
pos1_s,pos2_s = kpses[i][(ske[0] -1)][2],kpses[i][(ske[1] -1)][2]
if (pos1_s > 0.0 and pos2_s >0.0):
pl.osd_img.draw_line(pos1_x_,pos1_y_,pos2_x_,pos2_y_,self.LIMB_COLORS[k],4)
gc.collect()
else:
pl.osd_img.clear()
# 计算padding参数
def get_padding_param(self):
dst_w = self.model_input_size[0]
dst_h = self.model_input_size[1]
input_width = self.rgb888p_size[0]
input_high = self.rgb888p_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = int(round(dh - 0.1))
bottom = int(round(dh + 0.1))
left = int(round(dw - 0.1))
right = int(round(dw - 0.1))
return top, bottom, left, right
if __name__=="__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 模型路径
kmodel_path="/sdcard/app/tests/kmodel/yolov8n-pose.kmodel"
# 其它参数设置
confidence_threshold = 0.2
nms_threshold = 0.5
rgb888p_size=[1920,1080]
# 初始化PipeLine
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
# 初始化自定义人体关键点检测实例
person_kp=PersonKeyPointApp(kmodel_path,model_input_size=[320,320],confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,rgb888p_size=rgb888p_size,display_size=display_size,debug_mode=0)
person_kp.config_preprocess()
clock = time.clock()
try:
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧数据
res=person_kp.run(img) # 推理当前帧
person_kp.draw_result(pl,res) # 绘制结果到PipeLine的osd图像
print(res) #打印结果
pl.show_image() # 显示当前的绘制结果
gc.collect()
print(clock.fps()) #打印帧率
#IDE中断释放相关资源
except Exception as e:
sys.print_exception(e)
finally:
person_kp.deinit()
pl.destroy()
实验结果:
3、车牌相关
车牌识别对找出的车牌进行车牌内容识别。识别摄像头拍摄到的画面中的车牌内容并通过写字符和画图指示。
通过CanMV K230 AI视觉框架开发,用到的模型已经存放在CanMV K230的文件系统,无需额外拷贝。
具体的编程思路如下:
自定义车牌检测类,车牌识别任务类(继承于AIBase类),车牌识别任务类。
LicenceDetectionApp类(车牌检测类):
这个类负责车牌的检测工作。
初始化函数中设置了车牌检测应用的参数,如模型路径、模型输入尺寸、置信度阈值、NMS阈值、RGB图像尺寸和显示尺寸。
config_preprocess方法配置了预处理操作,这里使用了resize操作来调整输入图像的尺寸以匹配模型的输入要求。
postprocess方法对检测结果进行后处理,使用aidemo.licence_det_postprocess接口,根据置信度阈值和NMS阈值过滤和处理检测结果。
LicenceRecognitionApp 类(车牌识别类):
这个类负责对检测到的车牌进行识别。
初始化函数中设置了车牌识别应用的参数,包括模型路径、模型输入尺寸、RGB图像尺寸和显示尺寸。
config_preprocess 方法配置了预处理操作,同样使用了resize操作。
postprocess 方法对识别结果进行后处理,将模型输出的数组转换为车牌上的字符序列。
LicenceRec 类(车牌识别任务类):
这个类整合了车牌检测和识别的功能。
初始化函数中接收车牌检测和识别模型的路径、输入尺寸等参数,并初始化了LicenceDetectionApp和LicenceRecognitionApp两个实例。
run 方法执行车牌检测,并将检测到的车牌区域抠出来,然后对每个车牌区域进行识别。
draw_result 方法将检测和识别的结果绘制到图像上,显示车牌的边界框和识别出的车牌号码。
主函数的思路:
设置显示模式和尺寸:
根据选择的显示模式(HDMI或LCD),设置显示尺寸。
初始化模型和参数:
设置车牌检测和识别模型的路径、输入尺寸、置信度阈值和NMS阈值。
创建PipeLine实例:
初始化PipeLine,管理图像的获取和显示。
初始化车牌识别任务实例:
创建LicenceRec类的实例,整合车牌检测和识别的功能,并配置预处理操作。
主循环:
在一个无限循环中,不断获取当前帧图像,进行车牌检测和识别,绘制结果,并显示。
使用clock对象来计算和打印帧率。
异常处理:
捕获异常,打印异常信息,并在退出前释放相关资源。
参考代码如下:
# 车牌字符字典
self.dict_rec = ["挂", "使", "领", "澳", "港", "皖", "沪", "津", "渝", "冀", "晋", "蒙", "辽", "吉", "黑", "苏", "浙", "京", "闽", "赣", "鲁", "豫", "鄂", "湘", "粤", "桂", "琼", "川", "贵", "云", "藏", "陕", "甘", "青", "宁", "新", "警", "学", "0", 1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F", "G", "H", "J", "K", "L", "M", "N", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "_", "-"]列出了所有的车牌可能出现的结果。
'''
实验名称:车牌识别
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import image
import aidemo
import random
import gc
import sys
# 自定义车牌检测类
class LicenceDetectionApp(AIBase):
# 初始化函数,设置车牌检测应用的参数
def __init__(self, kmodel_path, model_input_size, confidence_threshold=0.5, nms_threshold=0.2, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode) # 调用基类的初始化函数
self.kmodel_path = kmodel_path # 模型路径
# 模型输入分辨率
self.model_input_size = model_input_size
# 分类阈值
self.confidence_threshold = confidence_threshold
self.nms_threshold = nms_threshold
# sensor给到AI的图像分辨率
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
self.debug_mode = debug_mode
# Ai2d实例,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
# 对检测结果进行后处理
det_res = aidemo.licence_det_postprocess(results, [self.rgb888p_size[1], self.rgb888p_size[0]], self.model_input_size, self.confidence_threshold, self.nms_threshold)
return det_res
# 自定义车牌识别任务类
class LicenceRecognitionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 检测模型输入分辨率
self.model_input_size=model_input_size
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
# 车牌字符字典
self.dict_rec = ["挂", "使", "领", "澳", "港", "皖", "沪", "津", "渝", "冀", "晋", "蒙", "辽", "吉", "黑", "苏", "浙", "京", "闽", "赣", "鲁", "豫", "鄂", "湘", "粤", "桂", "琼", "川", "贵", "云", "藏", "陕", "甘", "青", "宁", "新", "警", "学", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F", "G", "H", "J", "K", "L", "M", "N", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "_", "-"]
self.dict_size = len(self.dict_rec)
self.ai2d=Ai2d(debug_mode)
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义后处理,results是模型输出的array列表
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
output_data=results[0].reshape((-1,self.dict_size))
max_indices = np.argmax(output_data, axis=1)
result_str = ""
for i in range(max_indices.shape[0]):
index = max_indices[i]
if index > 0 and (i == 0 or index != max_indices[i - 1]):
result_str += self.dict_rec[index - 1]
return result_str
# 车牌识别任务类
class LicenceRec:
def __init__(self,licence_det_kmodel,licence_rec_kmodel,det_input_size,rec_input_size,confidence_threshold=0.25,nms_threshold=0.3,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
# 车牌检测模型路径
self.licence_det_kmodel=licence_det_kmodel
# 车牌识别模型路径
self.licence_rec_kmodel=licence_rec_kmodel
# 人脸检测模型输入分辨率
self.det_input_size=det_input_size
# 人脸姿态模型输入分辨率
self.rec_input_size=rec_input_size
# 置信度阈值
self.confidence_threshold=confidence_threshold
# nms阈值
self.nms_threshold=nms_threshold
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug_mode模式
self.debug_mode=debug_mode
self.licence_det=LicenceDetectionApp(self.licence_det_kmodel,model_input_size=self.det_input_size,confidence_threshold=self.confidence_threshold,nms_threshold=self.nms_threshold,rgb888p_size=self.rgb888p_size,display_size=self.display_size,debug_mode=0)
self.licence_rec=LicenceRecognitionApp(self.licence_rec_kmodel,model_input_size=self.rec_input_size,rgb888p_size=self.rgb888p_size)
self.licence_det.config_preprocess()
# run函数
def run(self,input_np):
# 执行车牌检测
det_boxes=self.licence_det.run(input_np)
# 将车牌部分抠出来
imgs_array_boxes = aidemo.ocr_rec_preprocess(input_np,[self.rgb888p_size[1],self.rgb888p_size[0]],det_boxes)
imgs_array = imgs_array_boxes[0]
boxes = imgs_array_boxes[1]
rec_res = []
for img_array in imgs_array:
# 对每一个检测到的车牌进行识别
self.licence_rec.config_preprocess(input_image_size=[img_array.shape[3],img_array.shape[2]])
licence_str=self.licence_rec.run(img_array)
rec_res.append(licence_str)
gc.collect()
return det_boxes,rec_res
# 绘制车牌检测识别效果
def draw_result(self,pl,det_res,rec_res):
pl.osd_img.clear()
if det_res:
point_8 = np.zeros((8),dtype=np.int16)
for det_index in range(len(det_res)):
for i in range(4):
x = det_res[det_index][i * 2 + 0]/self.rgb888p_size[0]*self.display_size[0]
y = det_res[det_index][i * 2 + 1]/self.rgb888p_size[1]*self.display_size[1]
point_8[i * 2 + 0] = int(x)
point_8[i * 2 + 1] = int(y)
for i in range(4):
pl.osd_img.draw_line(point_8[i * 2 + 0],point_8[i * 2 + 1],point_8[(i+1) % 4 * 2 + 0],point_8[(i+1) % 4 * 2 + 1],color=(255, 0, 255, 0),thickness=4)
pl.osd_img.draw_string_advanced( point_8[6], point_8[7] + 20, 40,rec_res[det_index] , color=(255,255,153,18))
if __name__=="__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 车牌检测模型路径
licence_det_kmodel_path="/sdcard/app/tests/kmodel/LPD_640.kmodel"
# 车牌识别模型路径
licence_rec_kmodel_path="/sdcard/app/tests/kmodel/licence_reco.kmodel"
# 其它参数
rgb888p_size=[640,360]
licence_det_input_size=[640,640]
licence_rec_input_size=[220,32]
confidence_threshold=0.2
nms_threshold=0.2
# 初始化PipeLine,只关注传给AI的图像分辨率,显示的分辨率
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
lr=LicenceRec(licence_det_kmodel_path,licence_rec_kmodel_path,det_input_size=licence_det_input_size,rec_input_size=licence_rec_input_size,confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,rgb888p_size=rgb888p_size,display_size=display_size)
clock = time.clock()
try:
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧
det_res,rec_res=lr.run(img) # 推理当前帧
lr.draw_result(pl,det_res,rec_res) # 绘制当前帧推理结果
print(det_res,rec_res) #打印结果
pl.show_image() # 展示推理结果
gc.collect()
print(clock.fps()) #打印帧率
except Exception as e:
sys.print_exception(e)
finally:
lr.licence_det.deinit()
lr.licence_rec.deinit()
pl.destroy()
实验结果如下:
某次结果[array([456.35, 298.8773, 148.475, 301.1344, 146.225, 207.4359, 453.65, 205.5375], dtype=float32)] ['\u82cfE991N6'],['\u82cfE991N6']为识别结果,”u”为汉字的Unicode编码,82cf为汉字“苏”的编码编码结果,所以车牌为“苏E991N6”。
4、字符识别
OCR (Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程。关键是从图像中提取有助于区分不同字符的特征,这些特征可能包括形状、边缘、纹理、笔画结构等。然后将提取的特征与已知的字符模板或模型进行匹配,以识别图像中的字符。
编程实现图片中的字符识别(支持中文和英文),通过CanMV K230 AI视觉框架开发,用到的模型已经存放在CanMV K230的文件系统,无需额外拷贝。
具体编程思路如下:
自定义OCR检测类,OCR识别类,OCR检测识别类。
OCRDetectionApp 类(OCR检测类):
这个类负责检测图像中的文本区域。
初始化函数中设置了检测模型的路径、模型输入尺寸、掩码阈值、文本框阈值、RGB图像尺寸和显示尺寸。
config_preprocess 方法配置了预处理操作,包括填充(pad)和缩放(resize)。
postprocess 方法对检测结果进行后处理,使用 aicube.ocr_post_process 接口,返回检测到的文本区域的坐标。
get_padding_param 方法计算填充参数,以确保输入图像的尺寸与模型输入尺寸匹配。
chw2hwc 方法将通道在前的图像数据(CHW)转换为通道在后的图像数据(HWC)。
OCRRecognitionApp 类(OCR识别类):
这个类负责识别检测到的文本区域中的字符。
初始化函数中设置了识别模型的路径、模型输入尺寸、字典路径、RGB图像尺寸和显示尺寸。
config_preprocess 方法配置了预处理操作,包括填充(pad)和缩放(resize)。
postprocess 方法对识别结果进行后处理,将模型输出的数组转换为文本字符串。
get_padding_param 方法计算填充参数,以确保输入图像的尺寸与模型输入尺寸匹配。
read_dict 方法读取OCR字典,该字典用于将识别结果的数字索引转换为对应的字符。
OCRDetRec 类(OCR检测识别类):
这个类整合了OCR检测和识别的功能。
初始化函数中接收OCR检测和识别模型的路径、输入尺寸、字典路径等参数,并初始化了 OCRDetectionApp 和 OCRRecognitionApp 两个实例。
run 方法执行OCR检测,并将检测到的文本区域传递给识别类进行识别。
draw_result 方法将检测和识别的结果绘制到图像上,显示文本区域的边界框和识别出的文本内容。
主函数的思路:
设置显示模式和尺寸:
根据选择的显示模式(HDMI或LCD),设置显示尺寸。
初始化模型和参数:
设置OCR检测和识别模型的路径、字典路径、输入尺寸、掩码阈值、文本框阈值。
创建 PipeLine 实例:
初始化 PipeLine,管理图像的获取和显示。
初始化OCR检测识别实例:
创建 OCRDetRec 类的实例,整合OCR检测和识别的功能,并配置预处理操作。
主循环:
在一个无限循环中,不断获取当前帧图像,进行OCR检测和识别,绘制结果,并显示。
使用 clock 对象来计算和打印帧率。
异常处理:
捕获异常,打印异常信息,并在退出前释放相关资源。
参考代码如下:
'''
实验名称:字符识别(OCR)
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import image
import aicube
import random
import gc
import sys
# 自定义OCR检测类
class OCRDetectionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,mask_threshold=0.5,box_threshold=0.2,rgb888p_size=[224,224],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
# 模型输入分辨率
self.model_input_size=model_input_size
# 分类阈值
self.mask_threshold=mask_threshold
self.box_threshold=box_threshold
# sensor给到AI的图像分辨率
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 显示分辨率
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
# Ai2d实例,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,您可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param()
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
# chw2hwc
hwc_array=self.chw2hwc(self.cur_img)
# 这里使用了aicube封装的接口ocr_post_process做后处理,返回的det_boxes结构为[[crop_array_nhwc,[p1_x,p1_y,p2_x,p2_y,p3_x,p3_y,p4_x,p4_y]],...]
det_boxes = aicube.ocr_post_process(results[0][:,:,:,0].reshape(-1), hwc_array.reshape(-1),self.model_input_size,self.rgb888p_size, self.mask_threshold, self.box_threshold)
return det_boxes
# 计算padding参数
def get_padding_param(self):
# 右padding或下padding
dst_w = self.model_input_size[0]
dst_h = self.model_input_size[1]
input_width = self.rgb888p_size[0]
input_high = self.rgb888p_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = (int)(round(0))
bottom = (int)(round(dh * 2 + 0.1))
left = (int)(round(0))
right = (int)(round(dw * 2 - 0.1))
return top, bottom, left, right
# chw2hwc
def chw2hwc(self,features):
ori_shape = (features.shape[0], features.shape[1], features.shape[2])
c_hw_ = features.reshape((ori_shape[0], ori_shape[1] * ori_shape[2]))
hw_c_ = c_hw_.transpose()
new_array = hw_c_.copy()
hwc_array = new_array.reshape((ori_shape[1], ori_shape[2], ori_shape[0]))
del c_hw_
del hw_c_
del new_array
return hwc_array
# 自定义OCR识别任务类
class OCRRecognitionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,dict_path,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
# kmodel路径
self.kmodel_path=kmodel_path
# 识别模型输入分辨率
self.model_input_size=model_input_size
self.dict_path=dict_path
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug模式
self.debug_mode=debug_mode
self.dict_word=None
# 读取OCR的字典
self.read_dict()
self.ai2d=Ai2d(debug_mode)
self.ai2d.set_ai2d_dtype(nn.ai2d_format.RGB_packed,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None,input_np=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param(ai2d_input_size,self.model_input_size)
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 如果传入input_np,输入shape为input_np的shape,如果不传入,输入shape为[1,3,ai2d_input_size[1],ai2d_input_size[0]]
self.ai2d.build([input_np.shape[0],input_np.shape[1],input_np.shape[2],input_np.shape[3]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义后处理,results是模型输出的array列表
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
preds = np.argmax(results[0], axis=2).reshape((-1))
output_txt = ""
for i in range(len(preds)):
# 当前识别字符不是字典的最后一个字符并且和前一个字符不重复(去重),加入识别结果字符串
if preds[i] != (len(self.dict_word) - 1) and (not (i > 0 and preds[i - 1] == preds[i])):
output_txt = output_txt + self.dict_word[preds[i]]
return output_txt
# 计算padding参数
def get_padding_param(self,src_size,dst_size):
# 右padding或下padding
dst_w = dst_size[0]
dst_h = dst_size[1]
input_width = src_size[0]
input_high = src_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = (int)(round(0))
bottom = (int)(round(dh * 2 + 0.1))
left = (int)(round(0))
right = (int)(round(dw * 2 - 0.1))
return top, bottom, left, right
def read_dict(self):
if self.dict_path!="":
with open(dict_path, 'r') as file:
line_one = file.read(100000)
line_list = line_one.split("\r\n")
self.dict_word = {num: char.replace("\r", "").replace("\n", "") for num, char in enumerate(line_list)}
class OCRDetRec:
def __init__(self,ocr_det_kmodel,ocr_rec_kmodel,det_input_size,rec_input_size,dict_path,mask_threshold=0.25,box_threshold=0.3,rgb888p_size=[1920,1080],display_size=[1920,1080],debug_mode=0):
# OCR检测模型路径
self.ocr_det_kmodel=ocr_det_kmodel
# OCR识别模型路径
self.ocr_rec_kmodel=ocr_rec_kmodel
# OCR检测模型输入分辨率
self.det_input_size=det_input_size
# OCR识别模型输入分辨率
self.rec_input_size=rec_input_size
# 字典路径
self.dict_path=dict_path
# 置信度阈值
self.mask_threshold=mask_threshold
# nms阈值
self.box_threshold=box_threshold
# sensor给到AI的图像分辨率,宽16字节对齐
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 视频输出VO分辨率,宽16字节对齐
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
# debug_mode模式
self.debug_mode=debug_mode
self.ocr_det=OCRDetectionApp(self.ocr_det_kmodel,model_input_size=self.det_input_size,mask_threshold=self.mask_threshold,box_threshold=self.box_threshold,rgb888p_size=self.rgb888p_size,display_size=self.display_size,debug_mode=0)
self.ocr_rec=OCRRecognitionApp(self.ocr_rec_kmodel,model_input_size=self.rec_input_size,dict_path=self.dict_path,rgb888p_size=self.rgb888p_size,display_size=self.display_size)
self.ocr_det.config_preprocess()
# run函数
def run(self,input_np):
# 先进行OCR检测
det_res=self.ocr_det.run(input_np)
boxes=[]
ocr_res=[]
for det in det_res:
# 对得到的每个检测框执行OCR识别
self.ocr_rec.config_preprocess(input_image_size=[det[0].shape[2],det[0].shape[1]],input_np=det[0])
ocr_str=self.ocr_rec.run(det[0])
ocr_res.append(ocr_str)
boxes.append(det[1])
gc.collect()
return boxes,ocr_res
# 绘制OCR检测识别效果
def draw_result(self,pl,det_res,rec_res):
pl.osd_img.clear()
if det_res:
# 循环绘制所有检测到的框
for j in range(len(det_res)):
# 将原图的坐标点转换成显示的坐标点,循环绘制四条直线,得到一个矩形框
for i in range(4):
x1 = det_res[j][(i * 2)] / self.rgb888p_size[0] * self.display_size[0]
y1 = det_res[j][(i * 2 + 1)] / self.rgb888p_size[1] * self.display_size[1]
x2 = det_res[j][((i + 1) * 2) % 8] / self.rgb888p_size[0] * self.display_size[0]
y2 = det_res[j][((i + 1) * 2 + 1) % 8] / self.rgb888p_size[1] * self.display_size[1]
pl.osd_img.draw_line((int(x1), int(y1), int(x2), int(y2)), color=(255, 0, 0, 255),thickness=5)
pl.osd_img.draw_string_advanced(int(x1),int(y1),32,rec_res[j],color=(0,0,255))
if __name__=="__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# OCR检测模型路径
ocr_det_kmodel_path="/sdcard/app/tests/kmodel/ocr_det_int16.kmodel"
# OCR识别模型路径
ocr_rec_kmodel_path="/sdcard/app/tests/kmodel/ocr_rec_int16.kmodel"
# 其他参数
dict_path="/sdcard/app/tests/utils/dict.txt"
rgb888p_size=[640,360]
ocr_det_input_size=[640,640]
ocr_rec_input_size=[512,32]
mask_threshold=0.25
box_threshold=0.3
# 初始化PipeLine,只关注传给AI的图像分辨率,显示的分辨率
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
ocr=OCRDetRec(ocr_det_kmodel_path,ocr_rec_kmodel_path,det_input_size=ocr_det_input_size,rec_input_size=ocr_rec_input_size,dict_path=dict_path,mask_threshold=mask_threshold,box_threshold=box_threshold,rgb888p_size=rgb888p_size,display_size=display_size)
clock = time.clock()
try:
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧
det_res,rec_res=ocr.run(img) # 推理当前帧
ocr.draw_result(pl,det_res,rec_res) # 绘制当前帧推理结果
print(det_res,rec_res) # 打印结果
pl.show_image() # 展示当前帧推理结果
gc.collect()
print(clock.fps()) #打印帧率
except Exception as e:
sys.print_exception(e)
finally:
ocr.ocr_det.deinit()
ocr.ocr_rec.deinit()
pl.destroy()
实验结果:
某次结果[array([62.24493, 141.8473, 413.3598, 143.8497, 412.755, 275.1527, 61.64014, 273.1503], dtype=float32)] ['01\u79d1\u6280'],['01\u79d1\u6280']为识别结果,“\u79d1\u6280”为“科技”的Unicode编码,识别正确。
5、物体检测
物体检测,是机器视觉里面非常典型的应用。要实现的就是将一幅图片里面的各种物体检测出来,然后跟已知模型做比较从而判断物体是什么。
例程基于YOLOv8n, 支持识别80种物体。
["person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush"]
[“人”、“自行车”、“汽车”、“摩托车”、“飞机”、“公共汽车”、“火车”、“卡车”、“船”、“交通灯”、“消防栓”、“停车标志”、“停车收费表”、“长凳”、“鸟”、“猫”、“狗”、“马”、“羊”、“牛”、“大象”、“熊”、“斑马”、“长颈鹿”、“背包”、“雨伞”、“手提包”、“领带”、“手提箱”、“飞盘”、“滑雪板”、“滑雪板”、“运动球”、“风筝”、“棒球棒”、“棒球手套”、“滑板”、“冲浪板”、“网球拍”、“瓶子”、“酒杯”、“杯子”、“叉子”、“刀子”、“勺子”、“碗”、“香蕉”、“苹果”、“三明治”、“橙子”、“西兰花”、“胡萝卜”、“热狗”、“披萨”、“甜甜圈”、“蛋糕”、“椅子”、“沙发“盆栽”、“床”、“餐桌”、“马桶”、“电视”、“笔记本电脑”、“鼠标”、“遥控器”、“键盘”、“手机”、“微波炉”、“烤箱”、“烤面包机”、“水槽”、“冰箱”、“书”、“钟表”、“花瓶”、“剪刀”、“泰迪熊”、“吹风机”、“牙刷”]
具体编程思路如下:
自定义YOLOv8检测类
初始化(__init__):
设置模型路径、类别标签、模型输入尺寸、最大检测框数量、置信度阈值、NMS(非极大值抑制)阈值、RGB图像尺寸和显示尺寸。
初始化Ai2d实例,用于实现模型预处理。
预处理配置(config_preprocess):
配置Ai2d的预处理操作,这里使用了缩放(resize)操作来调整输入图像的尺寸以匹配模型的输入要求。
后处理(postprocess):
对模型的推理结果进行后处理,包括转换输出格式、应用置信度阈值、执行NMS以及限制最大检测框数量。
绘制结果(draw_result):
将检测结果绘制到图像上,包括绘制检测框和类别标签。
非极大值抑制(nms):
实现NMS算法,用于去除重叠的检测框,提高检测的准确性。
获取颜色(get_color):
根据检测到的物体类别索引,返回预设的颜色值,用于绘制不同类别的检测框。
主函数的思路:
设置显示模式和尺寸:
根据选择的显示模式(HDMI或LCD),设置显示尺寸。
初始化模型和参数:
设置YOLOv8模型的路径、类别标签、置信度阈值、NMS阈值和最大检测框数量。
创建PipeLine实例:
初始化PipeLine,管理图像的获取和显示。
初始化YOLOv8检测实例:
创建ObjectDetectionApp类的实例,并配置预处理操作。
主循环:
在一个无限循环中,不断获取当前帧图像,进行物体检测,绘制结果,并显示。
使用clock对象来计算和打印帧率。
异常处理:
捕获异常,打印异常信息,并在退出前释放相关资源。
参考代码如下:
'''
实验名称:物体检测(基于yolov8n)
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import utime
import image
import random
import gc
import sys
import aidemo
# 自定义YOLOv8检测类
class ObjectDetectionApp(AIBase):
def __init__(self,kmodel_path,labels,model_input_size,max_boxes_num,confidence_threshold=0.5,nms_threshold=0.2,rgb888p_size=[224,224],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
self.labels=labels
# 模型输入分辨率
self.model_input_size=model_input_size
# 阈值设置
self.confidence_threshold=confidence_threshold
self.nms_threshold=nms_threshold
self.max_boxes_num=max_boxes_num
# sensor给到AI的图像分辨率
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 显示分辨率
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
# 检测框预置颜色值
self.color_four=[(255, 220, 20, 60), (255, 119, 11, 32), (255, 0, 0, 142), (255, 0, 0, 230),
(255, 106, 0, 228), (255, 0, 60, 100), (255, 0, 80, 100), (255, 0, 0, 70),
(255, 0, 0, 192), (255, 250, 170, 30), (255, 100, 170, 30), (255, 220, 220, 0),
(255, 175, 116, 175), (255, 250, 0, 30), (255, 165, 42, 42), (255, 255, 77, 255),
(255, 0, 226, 252), (255, 182, 182, 255), (255, 0, 82, 0), (255, 120, 166, 157)]
# 宽高缩放比例
self.x_factor = float(self.rgb888p_size[0])/self.model_input_size[0]
self.y_factor = float(self.rgb888p_size[1])/self.model_input_size[1]
# Ai2d实例,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,您可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
result=results[0]
result = result.reshape((result.shape[0] * result.shape[1], result.shape[2]))
output_data = result.transpose()
boxes_ori = output_data[:,0:4]
scores_ori = output_data[:,4:]
confs_ori = np.max(scores_ori,axis=-1)
inds_ori = np.argmax(scores_ori,axis=-1)
boxes,scores,inds = [],[],[]
for i in range(len(boxes_ori)):
if confs_ori[i] > confidence_threshold:
scores.append(confs_ori[i])
inds.append(inds_ori[i])
x = boxes_ori[i,0]
y = boxes_ori[i,1]
w = boxes_ori[i,2]
h = boxes_ori[i,3]
left = int((x - 0.5 * w) * self.x_factor)
top = int((y - 0.5 * h) * self.y_factor)
right = int((x + 0.5 * w) * self.x_factor)
bottom = int((y + 0.5 * h) * self.y_factor)
boxes.append([left,top,right,bottom])
if len(boxes)==0:
return []
boxes = np.array(boxes)
scores = np.array(scores)
inds = np.array(inds)
# NMS过程
keep = self.nms(boxes,scores,nms_threshold)
dets = np.concatenate((boxes, scores.reshape((len(boxes),1)), inds.reshape((len(boxes),1))), axis=1)
dets_out = []
for keep_i in keep:
dets_out.append(dets[keep_i])
dets_out = np.array(dets_out)
dets_out = dets_out[:self.max_boxes_num, :]
return dets_out
# 绘制结果
def draw_result(self,pl,dets):
with ScopedTiming("display_draw",self.debug_mode >0):
if dets:
pl.osd_img.clear()
for det in dets:
x1, y1, x2, y2 = map(lambda x: int(round(x, 0)), det[:4])
x= x1*self.display_size[0] // self.rgb888p_size[0]
y= y1*self.display_size[1] // self.rgb888p_size[1]
w = (x2 - x1) * self.display_size[0] // self.rgb888p_size[0]
h = (y2 - y1) * self.display_size[1] // self.rgb888p_size[1]
pl.osd_img.draw_rectangle(x,y, w, h, color=self.get_color(int(det[5])),thickness=4)
pl.osd_img.draw_string_advanced( x , y-50,32," " + self.labels[int(det[5])] + " " + str(round(det[4],2)) , color=self.get_color(int(det[5])))
else:
pl.osd_img.clear()
# 多目标检测 非最大值抑制方法实现
def nms(self,boxes,scores,thresh):
"""Pure Python NMS baseline."""
x1,y1,x2,y2 = boxes[:, 0],boxes[:, 1],boxes[:, 2],boxes[:, 3]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = np.argsort(scores,axis = 0)[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
new_x1,new_y1,new_x2,new_y2,new_areas = [],[],[],[],[]
for order_i in order:
new_x1.append(x1[order_i])
new_x2.append(x2[order_i])
new_y1.append(y1[order_i])
new_y2.append(y2[order_i])
new_areas.append(areas[order_i])
new_x1 = np.array(new_x1)
new_x2 = np.array(new_x2)
new_y1 = np.array(new_y1)
new_y2 = np.array(new_y2)
xx1 = np.maximum(x1[i], new_x1)
yy1 = np.maximum(y1[i], new_y1)
xx2 = np.minimum(x2[i], new_x2)
yy2 = np.minimum(y2[i], new_y2)
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
new_areas = np.array(new_areas)
ovr = inter / (areas[i] + new_areas - inter)
new_order = []
for ovr_i,ind in enumerate(ovr):
if ind < thresh:
new_order.append(order[ovr_i])
order = np.array(new_order,dtype=np.uint8)
return keep
# 根据当前类别索引获取框的颜色
def get_color(self, x):
idx=x%len(self.color_four)
return self.color_four[idx]
if __name__=="__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 模型路径
kmodel_path="/sdcard/app/tests/kmodel/yolov8n_320.kmodel"
labels = ["person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush"]
# 其它参数设置
confidence_threshold = 0.2
nms_threshold = 0.2
max_boxes_num = 50
rgb888p_size=[320,320]
# 初始化PipeLine
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
# 初始化自定义目标检测实例
ob_det=ObjectDetectionApp(kmodel_path,labels=labels,model_input_size=[320,320],max_boxes_num=max_boxes_num,confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,rgb888p_size=rgb888p_size,display_size=display_size,debug_mode=0)
ob_det.config_preprocess()
clock = time.clock()
try:
while True:
os.exitpoint()
clock.tick()
img=pl.get_frame() # 获取当前帧数据
res=ob_det.run(img) # 推理当前帧
ob_det.draw_result(pl,res) # 绘制结果到PipeLine的osd图像
print(res) # 打印当前结果
pl.show_image() # 显示当前的绘制结果
gc.collect()
print(clock.fps()) #打印帧率
except Exception as e:
sys.print_exception(e)
finally:
ob_det.deinit()
pl.destroy()
实验结果:
缓冲区显示各个物体的名称和置信度(可信度),可以看到准确率还是挺高的。
- 2024-11-06
-
发表了主题帖:
嘉楠K230AI开发板测评7--AI Demo开发框架
本帖最后由 dfjs 于 2024-11-6 21:01 编辑
嘉楠科K230AI开发板测评7--AI视觉篇
1、AI视觉开发框架
更高级的机器视觉(AI视觉)需要使用KPU。可以简单类别比计算机的GPU(显卡),本质是实现高速的图像数据运算。
KPU是K230内部一个神经网络处理器,它可以在低功耗的情况下实现卷积神经网络计算,实时获取被检测目标的大小、坐标和种类,对人脸或者物体进行检测和分类。K230 KPU支持INT8和INT16, 典型网络下实测推理能力可达K210的13.7倍,MAC利用率超70%。
CanMV官方基于K230专门搭建了配套的AI视觉开发框架,框架结构如下图所示:
这个框架简单来说就是Sensor(摄像头)默认输出两路图像,一路格式为YUV420,直接给到Display显示;另一路格式为RGB888,给到AI部分进行处理。AI主要实现任务的前处理、推理和后处理流程,得到后处理结果后将其绘制在osd image实例上,并送给Display叠加,最后在HDMI、LCD或IDE缓冲区显示识别结果。
这套框架的优势是用户可以直接基于处理结果编程实现自己的功能,同时AI主要实现任务的前处理、推理和后处理流程也是通过Python代码实现,方便用户深入二次开发。充分满足不同用户和开发者的需求。
AI视觉开发框架主要API接口有:
PineLine : 将sensor、display封装成固定接口,用于采集图像、画图以及结果图片显示。
Ai2d : 预处理(Preprocess)相关接口。
AIBase : 模型推理主要接口。
2、相关接口
可在该网址查看:https://developer.canaan-creative.com/k230_canmv/main/zh/example/ai/AI_Demo%E8%AF%B4%E6%98%8E%E6%96%87%E6%A1%A3.html
2.1 PipeLine
将Media部分的代码封装在PipeLine类型中,通过固定的接口实现整个流程操作。
PipeLine类提供的接口包括:
初始化参数
rgb888p_size:list类型,预设给到AI部分的图像分辨率;如rgb888p_size=[1920,1080]。
display_size:list类型,显示部分Display的分辨率;如display_size=[1920,1080]。
display_mode:str类型,显示模式,包括”hdmi“和”lcd“;如display_mode=”hdmi“。
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
creat(sensor=None,hmirror=None,vfilp=None)
sensor:参数为可选参数,类型为Sensor对象,可自主配置现有CanMV、01Studio和k230d zero开发板实现了自动探测,可以默认使用create()实现。
hmirror:默认为None,当主动设置时为bool类型(True/False),表示是否实现水平方向镜像显示。
vflip: 默认为None,当主动设置时为bool类型(True/False),表示是否实现垂直方向翻转。
get_frame
返回一帧ulab.numpy.ndarray类型图像数据,分辨率为rgb888p_size,排布为CHW。
show_image
PipeLine实例中预设一帧OSD图像,该接口将成员变量osd_img显示在屏幕上。
destroy
销毁PipeLine实例。
示例代码:
from libs.PipeLine import PipeLine, ScopedTiming
from media.media import *
import gc
import sys,os
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=[1920,1080], display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
print(img.shape)
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
pl.destroy() # 销毁PipeLine实例
通过pl.get_frame()接口获取一帧分辨率为rgb888p_size的图像,类型为ulab.numpy.ndarray,排布为CHW。基于上面的代码得到了一帧图像给AI处理,只关注AI推理部分的操作即可。
图像AI开发过程包括:图像预处理、模型推理、输出后处理的过程,整个过程封装在Ai2d类和AIBase类中。
2.2 Ai2d
对于Ai2d类,我们给出了常见的几种预处理方法,包括crop/shift/pad/resize/affine。该类别提供的接口包括:
初始化参数
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
Set_ai2d_dtype(input_format,output_format,input_type,output_type)
设置ai2d计算过程中的输入输出数据类型,输入输出数据格式。
Crop(start_x,start_y,width,height)预处理crop函数:
start_x:宽度方向的起始像素,int类型
start_y: 高度方向的起始像素,int类型
width: 宽度方向的crop长度,int类型
height: 高度方向的crop长度,int类型
Shift(shift_va预处理shift函数:
shift_val:右移的比特数,int类型
Pad(paddings,pad_mode,pad_val)预处理padding函数:
paddings:各个维度的padding, size=8,分别表示dim0到dim4的前后padding的个数,其中dim0/dim1固定配置{0, 0},list类型
pad_mode:只支持pad constant,配置0即可,int类型
pad_val:每个channel的padding value,list类型
Resize(interp_method,interp_mode)预处理resize函数:
interp_method:resize插值方法,ai2d_interp_method类型
interp_mode:resize模式,ai2d_interp_mode类型
Affine(interp_method,crop_round,bound_ind,bound_val,bound_smooth,M)预处理affine函数:
interp_method:Affine采用的插值方法,ai2d_interp_method类型
cord_round:整数边界0或者1,uint32_t类型
bound_ind:边界像素模式0或者1,uint32_t类型
bound_val:边界填充值,uint32_t类型
bound_smooth:边界平滑0或者1,uint32_t类型
M:仿射变换矩阵对应的vector,仿射变换为Y=[a_0, a_1; a_2, a_3] \cdot X + [b_0, b_1] $, 则 M=[a_0,a_1,b_0,a_2,a_3,b_1 ],list类型
Build(ai2d_input_shape,ai2d_output_shape):ai2d构造函数,前面配置的预处理方法起作用。
Run使用ai2d完成预处理
注意:
(1) Affine和Resize功能是互斥的,不能同时开启; (2) Shift功能的输入格式只能是Raw16; (3) Pad value是按通道配置的,对应的list元素个数要与channel数相等; (4) 当配置了多个功能时,执行顺序是Crop->Shift->Resize/Affine->Pad, 配置参数时注意要匹配;如果不符合该顺序,需要初始化多个Ai2d实例实现预处理过程;
示例代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AI2D import Ai2d
from media.media import *
import nncase_runtime as nn
import gc
import sys,os
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=[512,512], display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
my_ai2d=Ai2d(debug_mode=0) #初始化Ai2d实例
# 配置resize预处理方法
my_ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理过程
my_ai2d.build([1,3,512,512],[1,3,640,640])
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
print(img.shape) # 原图shape为[1,3,512,512]
ai2d_output_tensor=my_ai2d.run(img) # 执行resize预处理
ai2d_output_np=ai2d_output_tensor.to_numpy() # 类型转换
print(ai2d_output_np.shape) # 预处理后的shape为[1,3,640,640]
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
pl.destroy() # 销毁PipeLine实例
2.3 AIBase
AIBase部分封装了实现模型推理的主要接口,也是进行AI开发主要关注的部分。用户需要按照自己demo的要求实现前处理和后处理部分。
AIBase提供的接口包括:
初始化参数
kmodel_path:str类型,kmodel路径,用于初始化kpu对象并加载kmodel;
model_input_size:list类型,可选,模型输入分辨率,在单输入时起作用,格式为[width,height],如:model_input_size=[512,512];
rgb888p_size:list类型,可选,AI得到的图像的分辨率,在单输入时起作用,格式为[width,height],如:rgb888p_size=[640,640];
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
get_kmodel_inputs_num():返回当前模型的输入个数
get_kmodel_outputs_num():返回当前模型的输出个数
preprocess(input_np):使用ai2d对input_np做预处理,,如果不使用单个ai2d实例做预处理,需要在子类重写该函数。
inference(tensors):对预处理后得到的kmodel的输入(类型为tensor)进行推理,得到多个输出(类型为ulab.numpy.ndarray)
postprocess(results):模型输出后处理函数,该函数需要用户在任务子类重写,因为不同AI任务的后处理是不同的
run(input_np):模型的前处理、推理、后处理流程,适用于单ai2d实例能解决的前处理的AI任务,其他任务需要用户在子类重写。
deinit():AIBase销毁函数。
2.4 ScopedTiming
ScopedTiming 类在PipeLine.py模块内,是一个用来测量代码块执行时间的上下文管理器。上下文管理器通过定义包含 __enter__ 和 __exit__ 方法的类来创建。当在 with 语句中使用该类的实例时,__enter__ 在进入 with 块时被调用,__exit__ 在离开时被调用。
示例代码:
from libs.PipeLine import ScopedTiming
def test_time():
with ScopedTiming("test",1):
#####代码#####
# ...
##############
3、应用方法和示例
用户可根据具体的AI场景自写任务类继承AIBase,可以将任务分为如下四类:单模型任务、多模型任务,自定义预处理任务、无预处理任务。不同任务需要编写不同的代码实现,具体如下图所示:
3.1 单模型任务
该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理使用Ai2d实现,可能使用一个Ai2d实例,也可能使用多个Ai2d实例,后处理基于场景自定义。
编写自定义任务类,主要关注任务类的config_preprocess、postprocess、以及该任务需要的其他方法如:draw_result等。
如果该任务包含多个Ai2d实例,则需要重写preprocess,按照预处理的顺序设置预处理阶段的计算过程。
单模型任务的伪代码结构如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
多个Ai2d实例时的伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d_resize = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 实例化Ai2d,用于实现模型预处理
self.ai2d_resize = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 实例化Ai2d,用于实现模型预处理
self.ai2d_crop = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_crop.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize和crop,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d_resize.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d_resize.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,640,640])
# 配置crop预处理方法
self.ai2d_crop.crop(0,0,320,320)
# 构建预处理流程
self.ai2d_crop.build([1,3,640,640],[1,3,320,320])
# 假设该任务需要crop和resize预处理,顺序是先resize再crop,该顺序不符合ai2d的处理顺序,因此需要设置两个Ai2d实例分别处理
def preprocess(self,input_np):
resize_tensor=self.ai2d_resize.run(input_np)
resize_np=resize_tensor.to_numpy()
crop_tensor=self.ai2d_crop.run(resize_np)
return [crop_tensor]
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
# 重写deinit,释放多个ai2d资源
def deinit(self):
with ScopedTiming("deinit",self.debug_mode > 0):
del self.ai2d_resize
del self.ai2d_crop
super().deinit()
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.2 自定义预处理任务
该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理不使用Ai2d实现,可以使用ulab.numpy自定义,后处理基于场景自定义。
编写自定义任务类,主要关注任务类的preprocess、postprocess、以及该任务需要的其他方法如:draw_result等
对于需要重写前处理(不使用提供的ai2d类,自己手动写预处理)的AI任务伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 对于不使用ai2d完成预处理的AI任务,使用封装的接口或者ulab.numpy实现预处理,需要在子类中重写该函数
def preprocess(self,input_np):
#############
#注意自定义预处理过程
#############
return [tensor]
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.3 无预处理任务
该任务只有一个模型且不需要预处理,只需要关注该模型的推理和后处理过程,此类任务一般作为多模型任务的一部分,直接对前一个模型的输出做为输入推理,后处理基于需求自定义。
编写自定义任务类,主要关注任务类的run(模型推理的整个过程,包括preprocess、inference、postprocess中的全部或某一些步骤)、postprocess、以及该任务需要的其他方法如:draw_results等
对于不需要预处理(直接输入推理)的AI任务伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 对于用预处理的AI任务,需要在子类中重写该函数
def run(self,inputs_np):
# 先将ulab.numpy.ndarray列表转换成tensor列表
tensors=[]
for input_np in inputs_np:
tensors.append(nn.from_numpy(input_np))
# 调用AIBase内的inference函数进行模型推理
results=self.inference(tensors)
# 调用当前子类的postprocess方法进行自定义后处理
outputs=self.postprocess(results)
return outputs
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.4 多模型任务
该任务包含多个模型,可能是串联,也可能是其他组合方式。对于每个模型基本上属于前三种模型中的一种,最后通过一个完整的任务类将上述模型子任务统一起来。
编写多个子模型任务类,不同子模型任务参照前三种任务定义。不同任务关注不同的方法。
编写多模型任务类,将子模型任务类统一起来实现整个场景。
以双模型串联推理为例,给出的伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp_1(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 自定义AI任务类,继承自AIBase基类
class MyAIApp_2(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
class MyApp:
def __init__(kmodel1_path,kmodel2_path,kmodel1_input_size,kmodel2_input_size,rgb888p_size,display_size,debug_mode):
# 创建两个模型推理的实例
self.app_1=MyApp_1(kmodel1_path,kmodel1_input_size,rgb888p_size,display_size,debug_mode)
self.app_2=MyApp_2(kmodel2_path,kmodel2_input_size,rgb888p_size,display_size,debug_mode)
self.app_1.config_preprocess()
# 编写run函数,具体代码根据AI任务的需求编写,此处只是给出一个示例
def run(self,input_np):
outputs_1=self.app_1.run(input_np)
outputs_2=[]
for out in outputs_1:
self.app_2.config_preprocess(out)
out_2=self.app_2.run(input_np)
outputs_2.append(out_2)
return outputs_1,outputs_2
# 绘制
def draw_result(self,pl,outputs_1,outputs_2):
pass
######其他函数########
# 省略
####################
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
rgb888p_size = [1920, 1080]
# 设置模型路径,这里要替换成当前任务模型
kmodel1_path = "test_kmodel1.kmodel"
kmdoel1_input_size=[320,320]
kmodel2_path = "test_kmodel2.kmodel"
kmodel2_input_size=[48,48]
###### 其它参数########
# 省略
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyApp(kmodel1_path,kmodel2_path, kmodel1_input_size,kmodel2_input_size,rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
outputs_1,outputs_2 = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, outputs_1,outputs_2) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.app_1.deinit() # 反初始化
my_ai.app_2.deinit()
pl.destroy() # 销毁PipeLine实例
- 2024-11-04
-
发表了主题帖:
嘉楠K230AI开发板测评6--条形码、二维码与AprilTag标签识别
嘉楠科K230AI开发板测评6--机器视觉篇
1、条形码识别
条形码(barcode)是将宽度不等的多个黑条和空白,按照一定的编码规则排列,用以表达一组信息的图形标识符。常见的条形码是由反射率相差很大的黑条(简称条)和白条(简称空)排成的平行线图案。条形码可以标出物品的生产国、制造厂家、商品名称、生产日期、图书分类号、邮件起止地点、类别、日期等许多信息,因而在商品流通、图书管理、邮政管理、银行系统等许多领域都得到广泛的应用。
编程实现条形码识别,并将识别到的信息通过串口终端打印出来。
对于CanMV K230而言,直接使用MicroPython中的find_barcodes()即可获取摄像头采集图像中条形码的相关信息。
该函数支持所有一维条形码:image.EAN2 image.EAN5 image.EAN8 image.UPCE image.ISBN10 image.UPCA image.EAN13 image.ISBN13 image.I25 image.DATABAR (RSS-14) image.DATABAR_EXP (RSS-Expanded) image.CODABAR image.CODE39 image.PDF417 image.CODE93 image.CODE128
条形码对象是由 image.find_barcodes 返回的。
barcode.corners()返回一个由该对象的四个角组成的四个元组(x,y)的列表。四个角通常是按照从左上角开始沿顺时针顺序返回的。
barcode.rect()返回一个矩形元组(x, y, w, h),用于如数据矩阵的边界框的 image.draw_rectangle 等其他的 image 方法。
barcode.payload()返回条形码的有效载荷的字符串。例:数量。
barcode.type()返回条形码的列举类型 (int)。
barcode.rotation()返回以弧度计的条形码的旋度(浮点数)。
barcode.quality()返回条形码在图像中被检测到的次数(int)。
调用find_barcodes()函数,对得到的结果再进行处理即可,代码编写流程如下:
参考代码如下:
'''
实验名称:条形码识别
实验平台:01Studio CanMV K230
说明:编程实现摄像头识别各类条形码
'''
import time, math, os, gc
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
#定义条形码类型
def barcode_name(code):
if(code.type() == image.EAN2):
return "EAN2"
if(code.type() == image.EAN5):
return "EAN5"
if(code.type() == image.EAN8):
return "EAN8"
if(code.type() == image.UPCE):
return "UPCE"
if(code.type() == image.ISBN10):
return "ISBN10"
if(code.type() == image.UPCA):
return "UPCA"
if(code.type() == image.EAN13):
return "EAN13"
if(code.type() == image.ISBN13):
return "ISBN13"
if(code.type() == image.I25):
return "I25"
if(code.type() == image.DATABAR):
return "DATABAR"
if(code.type() == image.DATABAR_EXP):
return "DATABAR_EXP"
if(code.type() == image.CODABAR):
return "CODABAR"
if(code.type() == image.CODE39):
return "CODE39"
if(code.type() == image.PDF417):
return "PDF417"
if(code.type() == image.CODE93):
return "CODE93"
if(code.type() == image.CODE128):
return "CODE128"
try:
sensor = Sensor() #构建摄像头对象
sensor.reset() #复位和初始化摄像头
#sensor.set_framesize(Sensor.FHD) #设置帧大小FHD(1920x1080),默认通道0
sensor.set_framesize(width=800, height=480) #设置帧大小VGA,默认通道0
sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0
Display.init(Display.ST7701, to_ide=True) #同时使用3.5寸mipi屏和IDE缓冲区显示图像,800x480分辨率
#Display.init(Display.VIRT, sensor.width(), sensor.height()) #只使用IDE缓冲区显示图像
MediaManager.init() #初始化media资源管理器
sensor.run() #启动sensor
clock = time.clock()
while True:
os.exitpoint() #检测IDE中断
clock.tick()
img = sensor.snapshot() #拍摄图片
codes = img.find_barcodes() #查找图像中所有条形码
for code in codes:
#对条码画矩形表示
img.draw_rectangle(code.rect(),thickness=2)
#打印相关信息
print_args = (barcode_name(code), code.payload(), (180 * code.rotation()) / math.pi, code.quality())
print("Barcode %s, Payload \"%s\", rotation %f (degrees), quality %d" % print_args)
img.draw_string_advanced(0, 0, 30, code.payload(), color = (255, 255, 255)) #图像显示条码信息
Display.show_image(img) #显示图片
print(clock.fps()) #打印帧率
###################
# IDE中断释放资源代码
###################
except KeyboardInterrupt as e:
print(f"user stop")
except BaseException as e:
print(f"Exception '{e}'")
finally:
# sensor stop run
if isinstance(sensor, Sensor):
sensor.stop()
# deinit display
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# release media buffer
MediaManager.deinit()
实验结果如下,为了更好地识别,图像上条形码需比较平展,不能太小;在线生成一个条形码值位“ABC-abc-1234”的Code 128型条形码,运行程序,打开条形码图片。摄像头正对条形码,识别成功后可以看到图片出现方框以及在串口终端打印出条形码信息。
2、二维码识别
二维码又称二维条码,常见的二维码为QR Code,QR全称Quick Response,是一个近几年来移动设备上超流行的一种编码方式,它比传统的Bar Code条形码能存更多的信息,也能表示更多的数据类型。
二维条码/二维码(2-dimensional bar code)是用某种特定的几何图形按一定规律在平面(二维方向上)分布的、黑白相间的、记录数据符号信息的图形;在代码编制上巧妙地利用构成计算机内部逻辑基础的“0”、“1”比特流的概念,使用若干个与二进制相对应的几何形体来表示文字数值信息,通过图象输入设备或光电扫描设备自动识读以实现信息自动处理:它具有条码技术的一些共性:每种码制有其特定的字符集;每个字符占有一定的宽度;具有一定的校验功能等。同时还具有对不同行的信息自动识别功能、及处理图形旋转变化点。
MicroPython中的find_qrcodes()即可获取摄像头采集图像中二维码的相关信息。
二维码对象是由 image.find_qrcodes 返回的。
qrcode.corners()返回一个由该对象的四个角组成的四个元组(x,y)的列表。四个角通常是按照从左上角开始沿顺时针顺序返回的。
qrcode.rect()返回一个矩形元组(x, y, w, h),用于如二维码的边界框的 image.draw_rectangle 等其他的 image 方法。
qrcode.payload()返回二维码有效载荷的字符串,例如URL 。
qrcode.version()返回二维码的版本号(int)。
qrcode.ecc_level()返回二维码的ECC水平(int)。
qrcode.mask()返回二维码的掩码(int)。
qrcode.data_type()返回二维码的数据类型。
qrcode.eci()返回二维码的ECI。ECI储存了QR码中存储数据字节的编码。若想要处理包含超过标准ASCII文本的二维码,您需要查看这一数值。
MicroPython编程我们只需要简单地调用find_qrcodes()函数,对得到的结果再进行处理即可,代码编写流程如下:
核心代码如下:
''''''''''''
img = sensor.snapshot() #拍摄图片
res = img.find_qrcodes() #寻找所有二维码,返回列表,每个值为一个二维码
if len(res) > 0: #在图片和终端显示二维码信息
img.draw_rectangle(res[0].rect(), thickness=2)#列表不能属性,列表的值可以属性
img.draw_string_advanced(0, 0, 30, res[0].payload(), color = (255, 255, 255))
print(res[0].payload()) #串口终端打印
Display.show_image(img) #显示图片
print(clock.fps()) #打印帧率
''''''''''''
实验结果,为了更好地识别,图像上二维码需比较平展,不能太小,在线生成一个值为“https://www.bing.com”的QR码,运行程序,打开二维码图片;摄像头正对二维码,识别成功后可以看到图片出现方框以及在串口终端打印出二维码信息。
3、AprilTag标签识别
AprilTag是一种视觉基准系统,可用于多种任务,包括增强现实、机器人和相机校准。可以通过普通打印机创建目标,AprilTag 检测软件可以计算标签相对于相机的精确3D位置、方向和标识。
AprilTag通过特定的标志(与二维码相似,但是降低了复杂度以满足实时性要求),可以快速地检测标志,并计算相对位置。
AprilTag内容主要包含三个步骤:
第一步是如何根据梯度检测出图像中的各种边缘。
第二步即如何在边缘图像中找出需要的四边形图案并进行筛选,AprilTag尽可能的对检测出的边缘检测,首先剔除非直线边缘,在直线边缘进行邻接边缘查找,最终若形成闭环则为检测到一个四边形。
最后一个步便是如何进行二维码编码和二维码解码,编码方式分为三种,其黑边色块长度分别为8,7,6三个色块长度,对于解码内容,要在检测到的四边形内生成点阵列用于计算每色块的值,再根据局部二值模式(Local Binary Patterns)构造简单分类器对四边形内的色块进行分类,将正例色块编码为1将负例色块编码为0,就可以得到该二维码的编码。得到编码以后再与已知库内的编码进行匹配,确定解码出的二维码是否为正确。
可以将AprilTag简单地理解为一个特定信息的二维码,有family和ID两个概念:
TAG16H5 → 0 to 29
TAG25H7 → 0 to 241
TAG25H9 → 0 to 34
TAG36H10 → 0 to 2319
TAG36H11 → 0 to 586 (CanMV K230推荐使用)
ARTOOLKIT → 0 to 511
以【TAG36H11 → 0 to 586】为例,family信息就是:TAG36H11 , ID可以是“0 到 586” ,也就是一共有587种标记码。
不同家族区别:TAG16H5的有效区域是 4x4 的方块,那么它比TAG36H11看的更远(因为他有 6x6 个方块)。 但是内容少,所以TAG16H5的错误率比TAG36H11 高很多,因为TAG36H11的校验信息多。CanMV K210推荐使用TAG36H11家族的标记码。
可以在CanMV IDE生成AprilTag。点击工具--机器视觉--AprilTag生成器--TAG36H11家族:最小输入0 ,最大输入9 ,制作id从0-9共10张标签。点击OK后选择要生成的位置文件夹即可,如下图:
识别apriltag使用find_apriltags对象函数,返回一个 image.apriltag 对象的列表。
与二维码相比,AprilTags可在更远距离、较差光线和更扭曲的图像环境下被检测到。 AprilTags可应对所有种类的图像失真问题,而二维码并不能。也就是说,AprilTags只能将数字ID编码作为其有效载荷。
AprilTags也可用于本地化。每个 image.apriltag 对象都从摄像机返回其三维位置信息和旋转角度。 位置信息由 fx 、 fy 、 cx 和 cy 决定,分别为X和Y方向上图像的焦距和中心点。
tag.rect()返回一个矩形元组(x,y,w,h),二维码的边界。可以通过索引[0-3]来获得单个值。
tag.family()家族信息。
tag.id()ID信息。
tag.rotation()方向。
代码编写流程如下:
参考代码如下:
'''
实验名称:AprilTags标签识别
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
说明:推荐使用QVGA(320x240)分辨率,分辨率太高帧率会下降。
'''
import time, math, os, gc
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
# apriltag代码最多支持可以同时处理6种tag家族。
# 返回的tag标记对象,将有其tag标记家族及其在tag标记家族内的id。
tag_families = 0
tag_families |= image.TAG16H5 # 注释掉,禁用这个家族
tag_families |= image.TAG25H7 # 注释掉,禁用这个家族
tag_families |= image.TAG25H9 # 注释掉,禁用这个家族
tag_families |= image.TAG36H10 # 注释掉,禁用这个家族
tag_families |= image.TAG36H11 # 注释掉以禁用这个家族(默认家族)
tag_families |= image.ARTOOLKIT # 注释掉,禁用这个家族
#标签系列有什么区别? 那么,例如,TAG16H5家族实际上是一个4x4的方形标签。
#所以,这意味着可以看到比6x6的TAG36H11标签更长的距离。
#然而,较低的H值(H5对H11),意味着4x4标签的假阳性率远高于6x6标签。
#所以,除非你有理由使用其他标签系列,否则使用默认族TAG36H11。
def family_name(tag):
if(tag.family() == image.TAG16H5):
return "TAG16H5"
if(tag.family() == image.TAG25H7):
return "TAG25H7"
if(tag.family() == image.TAG25H9):
return "TAG25H9"
if(tag.family() == image.TAG36H10):
return "TAG36H10"
if(tag.family() == image.TAG36H11):
return "TAG36H11"
if(tag.family() == image.ARTOOLKIT):
return "ARTOOLKIT"
try:
sensor = Sensor(width=1280, height=960) #构建摄像头对象,将摄像头长宽设置为4:3
sensor.reset() #复位和初始化摄像头
sensor.set_framesize(width=320, height=240) #设置帧大小为LCD分辨率(800x480),默认通道0
sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0
Display.init(Display.ST7701, to_ide=True) #同时使用3.5寸mipi屏和IDE缓冲区显示图像,800x480分辨率
#Display.init(Display.VIRT, sensor.width(), sensor.height()) #只使用IDE缓冲区显示图像
MediaManager.init() #初始化media资源管理器
sensor.run() #启动sensor
clock = time.clock()
while True:
os.exitpoint() #检测IDE中断
clock.tick()
img = sensor.snapshot() #拍摄图片
for tag in img.find_apriltags(families=tag_families): # 如果没有给出家族,默认TAG36H11。
img.draw_rectangle(tag.rect(), color = (255, 0, 0), thickness=4)
img.draw_cross(tag.cx(), tag.cy(), color = (0, 255, 0), thickness=2)
print_args = (family_name(tag), tag.id(), (180 * tag.rotation()) / math.pi) #打印标签信息
print("Tag Family %s, Tag ID %d, rotation %f (degrees)" % print_args)
#img.draw_string_advanced(0, 0, 30, code.payload(), color = (255, 255, 255)) #图像显示条码信息
#Display.show_image(img) #显示图片
#显示图片,LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2)) #显示图片
print(clock.fps()) #打印帧率
###################
# IDE中断释放资源代码
###################
except KeyboardInterrupt as e:
print(f"user stop")
except BaseException as e:
print(f"Exception '{e}'")
finally:
# sensor stop run
if isinstance(sensor, Sensor):
sensor.stop()
# deinit display
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# release media buffer
MediaManager.deinit()
实验结果如下,打开family: TAG36H11 , id: 0的标签图片测试:
- 2024-11-03
-
回复了主题帖:
嘉楠K230AI开发板测评4--图像显示、画图、边缘/线段/圆形/矩形检测、线性回归
秦天qintian0303 发表于 2024-11-2 23:09
这个学习怎么喂图啊?拍摄好的?还是喂视频?
直接电脑截图,哈哈
-
回复了主题帖:
嘉楠K230AI开发板测评4--图像显示、画图、边缘/线段/圆形/矩形检测、线性回归
littleshrimp 发表于 2024-11-2 21:58
这个开发板这么强大吗?如果不用micropython能实现相同的功能吗?或者micropython在商用产品上使用适不 ...
micropython都是封装好的函数,比较容易上手,也可以用linux开发,不过比较麻烦,没有micropython的生态好,个人感觉拿来商用的话成本还是太高了
-
发表了主题帖:
嘉楠K230AI开发板测评5---颜色识别、摄像头物体计数与巡线
本帖最后由 dfjs 于 2024-11-3 21:29 编辑
嘉楠科K230AI开发板测评5--机器视觉篇
1.单一颜色识别
预先设定颜色阈值,如红、绿、蓝,这样K230摄像头采集图像后就能自动识别了。
CanMV集成了RGB565颜色块识别find_blobs函数(其位于image模块下),主要是基于LAB(L:亮度,取值0-100,表示从纯黑到纯白的变化;A代表从绿色到红色的范围,取值是-128--127;B代表从蓝色到黄色的范围,取值是-128--127)颜色模型,每个颜色都是用一组LAB阈值表示。
image.find_blobs(thresholds[, invert=False[, roi[, x_stride=2[, y_stride=1[, area_threshold=10 [, pixels_threshold=10[, merge=False[, margin=0[, threshold_cb=None[, merge_cb=None]]]]]]]]]])函数查找图像中指定的色块,返回image.blog对象列表。
thresholds: 必须是元组列表。 [(lo, hi), (lo, hi), ..., (lo, hi)] 定义你想追踪的颜色范围。 对于灰度图像,每个元组需要包含两个值 - 最小灰度值和最大灰度值。 仅考虑落在这些阈值之间的像素区域。 对于RGB565图像,每个元组需要有六个值(l_lo,l_hi,a_lo,a_hi,b_lo,b_hi) - 分别是LAB L,A和B通道的最小值和最大值;
area_threshold: 若色块的边界框区域小于此参数值,则会被过滤掉;
pixels_threshold: 若色块的像素数量小于此参数值,则会被过滤掉;
merge: 若为True,则合并所有没有被过滤的色块;
margin: 调整合并色块的边缘。
blob.rect()函数返回一个矩形元组(x,y,w,h),如色块边界。可以通过索引[0-3]来获得这些值。
blob.cx()返回色块(int)的中心x位置。可以通过索引[5]来获得这个值。
blob.cy()返回色块(int)的中心y位置。可以通过索引[6]来获得这个值。
代码编写流程如下:
参考代码:
'''
实验名称:单一颜色识别
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
'''
import time, os, sys
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
# 颜色识别阈值 (L Min, L Max, A Min, A Max, B Min, B Max) LAB模型
# 下面的阈值元组是用来识别 红、绿、蓝三种颜色,当然你也可以调整让识别变得更好。
thresholds = [(30, 100, 15, 127, 15, 127), # 红色阈值
(30, 100, -64, -8, 50, 70), # 绿色阈值
(0, 40, 0, 90, -128, -20)] # 蓝色阈值
try:
sensor = Sensor() #构建摄像头对象
sensor.reset() #复位和初始化摄像头
sensor.set_framesize(width=800, height=480) #设置帧大小为LCD分辨率(800x480),默认通道0
sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0
Display.init(Display.ST7701, to_ide=True) #同时使用3.5寸mipi屏和IDE缓冲区显示图像,800x480分辨率
#Display.init(Display.VIRT, sensor.width(), sensor.height()) #只使用IDE缓冲区显示图像
MediaManager.init() #初始化media资源管理器
sensor.run() #启动sensor
clock = time.clock()
while True:
os.exitpoint() #检测IDE中断
################
## 这里编写代码 ##
################
clock.tick()
img = sensor.snapshot() #拍摄一张图片
blobs = img.find_blobs([thresholds[0]]) # 0,1,2分别表示红,绿,蓝色。
if blobs:
for b in blobs: #画矩形和箭头表示
tmp=img.draw_rectangle(b[0:4], thickness = 2)
tmp=img.draw_cross(b[5], b[6], thickness = 2)
img.draw_string_advanced(0, 0, 30, 'FPS: '+str("%.3f"%(clock.fps())), color = (255, 255, 255))
Display.show_image(img) #显示图片
print(clock.fps()) #打印FPS
###################
# IDE中断释放资源代码
###################
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
print(f"Exception {e}")
finally:
# sensor stop run
if isinstance(sensor, Sensor):
sensor.stop()
# deinit display
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# release media buffer
MediaManager.deinit()
实验结果,如下图1,也可通过阈值编辑器来手动调节LAB的阈值范围,如下图2。
2.多种颜色识别
基于单一颜色识别,加以修改,即可实现多种颜色识别。
代码编写流程如下:
核心代码如下,与单一颜色识别例程相比,修改的代码如下,在颜色识别前中加入了for循环,识别预设的3种颜色:
# 颜色识别阈值 (L Min, L Max, A Min, A Max, B Min, B Max) LAB模型
# 下面的阈值元组是用来识别 红、绿、蓝三种颜色,当然你也可以调整让识别变得更好。
thresholds = [(30, 100, 15, 127, 15, 127), # 红色阈值
(30, 100, -64, -8, 50, 70), # 绿色阈值
(0, 40, 0, 90, -128, -20)] # 蓝色阈值
colors1 = [(255,0,0), (0,255,0), (0,0,255)]
colors2 = ['RED', 'GREEN', 'BLUE']
..............
img = sensor.snapshot() #拍摄一张图片
for i in range(3):
blobs = img.find_blobs([thresholds[i]]) # 0,1,2分别表示红,绿,蓝色。
if blobs:
for b in blobs: #画矩形、箭头和字符表示
tmp=img.draw_rectangle(b[0:4], thickness = 4, color = colors1[i])
tmp=img.draw_cross(b[5], b[6], thickness = 2)
tmp=img.draw_string_advanced(b[0], b[1]-35, 30, colors2[i],color = colors1[i])
img.draw_string_advanced(0, 0, 30, 'FPS: '+str("%.3f"%(clock.fps())), color = (255, 255, 255))
Display.show_image(img) #显示图片
print(clock.fps()) #打印FPS
实验结果如下,将每个颜色的圆形用矩形画出并表明颜色,与单一颜色识别例程相比,修改的代码如下,在颜色识别前中加入了for循环,识别预设的3种颜色:
3.物体计数(相同颜色)
基于上一节颜色识别我们看到可以识别出色块的数量,来学习如何识别指定颜色的物体,计算其数量。
针对不同颜色的物体我们如何获取它的阈值呢?
先使用 摄像头代码采集物体图像,在IDE右上角缓冲区点击“禁用”将要识别的物体确认下来;点击 工具—机器视觉—阈值编辑器 。在弹出的对话框选择“帧缓冲区”。通过调整下方6个LAB值,使得物体颜色在右边为白色,其余背景为黑色。记录颜色的LAB值,在后面代码中使用,如下图。
代码编写流程如下图:
核心代码如下:
.............
thresholds = [(18, 72, -13, 31, 18, 83)] #黄色跳线帽阈值
.............
img = sensor.snapshot()
blobs = img.find_blobs([thresholds[0]])
if blobs: #画框显示
for b in blobs:
tmp=img.draw_rectangle(b[0:4])
tmp=img.draw_cross(b[5], b[6])
#显示计算信息
img.draw_string_advanced(0, 0, 30, 'FPS: '+str("%.3f"%(clock.fps()))+' Num: '
+str(len(blobs)), color = (255, 255, 255))
Display.show_image(img)
print(clock.fps()) #打印FPS
.............
实验结果如下图,在阈值准确的情况下,统计出跳线帽的数量:
4.机器人巡线(实线)
机器人巡线依然基于颜色识别,根据摄像头采集到的图像直线与中心偏离的位置计算出偏离角度。
对画面是有一定要求的,也就是摄像头采集图像一定要出现唯一1条连续的黑色直线。程序通过对画面切割成三部分,计算每个部分黑色线的中心点X坐标,然后采用加权平均算法估算出直线的偏离位置。通常情况下越靠近底部的地方离摄像头越近,顶部表示远方线段。因此底部的图形权重高。
假设摄像头当前画面的像素是例程的QQVGA分辨率:160(宽)X120(高),左上角坐标为(0,0),然后当前出现直线坐标为(80,120)至(160,0)偏右的直线。上中下三个部分的权重分别为0.1、0.3、0.7(底部图像靠近机器人,权重大,权重总和可以不是1),我们来计算一下其中心值,如下图:
上图中Y轴的中点坐标就是60,X坐标加权平均值计算如下:
X=(80*0.7+120*0.3+160*0.1)/(0.7+0.3+0.1)=98
那么直线偏离坐标可以认为是(98,60),图中绿色“+”位置。那么利用反正切函数可以求出偏离角度:a = atan((98-80)/60)=16.7°,机器人相当于实线的位置往左偏了,所以加一个负号,即 -16.7°;偏离角度就是这么计算出来的。得到偏离角度后就可以自己编程去调整小车或者机器人的运动状态,直到0°为没有偏离。
代码编写思路如下:
参考代码如下:
'''
实验名称:机器人巡线(实线)
实验平台:01Studio CanMV K230
教程:wiki.01studio.cc
# 黑色灰度线巡线跟踪示例
#
#做一个跟随机器人的机器人需要很多的努力。这个示例脚本
#演示了如何做机器视觉部分的线跟随机器人。你
#可以使用该脚本的输出来驱动一个差分驱动机器人
#跟着一条线走。这个脚本只生成一个表示的旋转值(偏离角度)
#你的机器人向左或向右。
#
# 为了让本示例正常工作,你应该将摄像头对准一条直线(实线)
#并将摄像头调整到水平面45度位置。请保证画面内只有1条直线。
'''
import time, os, sys, math
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
# 追踪黑线。使用 [(128, 255)] 追踪白线.
GRAYSCALE_THRESHOLD = [(0, 64)]
# 下面是一个roi【区域】元组列表。每个 roi 用 (x, y, w, h)表示的矩形。
'''
#采样图像QQVGA 160*120,列表把roi把图像分成3个矩形,越靠近的摄像头视野(通常为图像下方)的矩形权重越大。
ROIS = [ # [ROI, weight]
(0, 100, 160, 20, 0.7), # 可以根据不同机器人情况进行调整。
(0, 50, 160, 20, 0.3),
(0, 0, 160, 20, 0.1)
]
'''
#采样图像为QVGA 320*240,列表把roi把图像分成3个矩形,越靠近的摄像头视野(通常为图像下方)的矩形权重越大。
ROIS = [ # [ROI, weight]
(0, 200, 320, 40, 0.7), # 可以根据不同机器人情况进行调整。
(0, 100, 320, 40, 0.3),
(0, 0, 320, 40, 0.1)
]
# 计算以上3个矩形的权值【weight】的和,和不需要一定为1.
weight_sum = 0
for r in ROIS: weight_sum += r[4] # r[4] 为矩形权重值.
try:
sensor = Sensor(width=1280, height=960) #构建摄像头对象,将摄像头长宽设置为4:3
sensor.reset() #复位和初始化摄像头
sensor.set_framesize(width=320, height=240) #设置帧大小,默认通道0
sensor.set_pixformat(Sensor.GRAYSCALE) #设置输出图像格式,默认通道0
Display.init(Display.ST7701, to_ide=True) #同时使用3.5寸mipi屏和IDE缓冲区显示图像,800x480分辨率
#Display.init(Display.VIRT, sensor.width(), sensor.height()) #只使用IDE缓冲区显示图像
MediaManager.init() #初始化media资源管理器
sensor.run() #启动sensor
clock = time.clock()
while True:
os.exitpoint() #检测IDE中断
################
## 这里编写代码 ##
################
clock.tick()
img = sensor.snapshot() #拍摄一张图片
centroid_sum = 0
for r in ROIS:
blobs = img.find_blobs(GRAYSCALE_THRESHOLD, roi=r[0:4], merge=True) # r[0:4] 是上面定义的roi元组.
if blobs:
# Find the blob with the most pixels.
largest_blob = max(blobs, key=lambda b: b.pixels())
# Draw a rect around the blob.
img.draw_rectangle(largest_blob.rect())
img.draw_cross(largest_blob.cx(),
largest_blob.cy())
centroid_sum += largest_blob.cx() * r[4] # r[4] 是每个roi的权重值.
center_pos = (centroid_sum / weight_sum) # 确定直线的中心.
# 将直线中心位置转换成角度,便于机器人处理.
deflection_angle = 0
# 使用反正切函数计算直线中心偏离角度。可以自行画图理解
#权重X坐标落在图像左半部分记作正偏,落在右边部分记为负偏,所以计算结果加负号。
#deflection_angle = -math.atan((center_pos-80)/60) #采用图像为QQVGA 160*120时候使用
deflection_angle = -math.atan((center_pos-160)/120) #采用图像为QVGA 320*240时候使用
# 将偏离值转换成偏离角度.
deflection_angle = math.degrees(deflection_angle)
# 计算偏离角度后可以控制机器人进行调整.
print("Turn Angle: %f" % deflection_angle)
# LCD显示偏移角度,scale参数可以改变字体大小
img.draw_string_advanced(2,2,20, str('%.1f' % deflection_angle), color=(255,255,255))
#Display.show_image(img) #显示图片
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
print(clock.fps()) #打印FPS
###################
# IDE中断释放资源代码
###################
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
print(f"Exception {e}")
finally:
# sensor stop run
if isinstance(sensor, Sensor):
sensor.stop()
# deinit display
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# release media buffer
MediaManager.deinit()
实验结果如下,以手机做黑色直线用,分别观察摄像头采集到没偏移、左偏和右偏各个直线的实验结果,可以看出效果良好。手机偏移角度分别为接近0°,负数,正数。获取到的偏移角度可以通过串口发送给其他外设或者主控。
- 2024-11-02
-
发表了主题帖:
嘉楠K230AI开发板测评4--图像显示、画图、边缘/线段/圆形/矩形检测、线性回归
本帖最后由 dfjs 于 2024-11-2 18:13 编辑
嘉楠科K230AI开发板测评4--机器视觉篇
摄像头
摄像头是整个机器视觉应用的基础,K230的引出了3路摄像头,接口如下图:
CanMV K230使用camera模块实现摄像头采集图像功能,K230硬件支持3路sensor输入(CSI接口),每个sensor设备均可独立完成图像数据采集捕获处理,并可以同时输出3路图像数据,sensor 0,sensor 1,sensor 2表示三个图像传感器;Camera Device 0,Camera Device 1,Camera Device 2表示三个sensor设备;output channel 0,output channel 1,output channel 2表示sensor设备的三个输出通道。三个图像传感器可以通过软件配置映射到不同的sensor 设备,示意图如下图。
摄像头(sensor)位于media模块下,通过from media.sensor import * #导入sensor模块,使用摄像头相关接口,sensor = Sensor(id,[width, height, fps])构建摄像头对象,id为CSI输入号,默认值为CSI2即开发板上的摄像头,width、height和fps为可选参数,分别表示sensor采集图像宽度,高度和帧率。
sensor.reset()复位和初始化摄像头。sensor.set_framesize(framesize = FRAME_SIZE_INVAILD, [width, height],chn = CAM_CHN_ID_0, alignment=0, **kwargs)设置每个通道的图像输出尺寸,framesize: 通道图像输出尺寸。chn: 通道编号,每个摄像头设备有3个通道。
sensor.set_pixformat(pixformat, chn = CAM_CHN_ID_0)设置图像像素格式。pixformat: 格式。chn: 通道编号,每个摄像头设备有3个通道。
sensor.set_hmirror(enable)设置摄像头画面水平镜像。sensor.set_vflip(enable)设置摄像头画面垂直翻转。
sensor.run()启动摄像头。
sensor.snapshot()使用相机拍摄一张照片,并返回 image 对象。
然后使用计算FPS(每秒帧数)的clock模块。clock=time.clock()构建一个时钟对象。clock.tick()开始追踪运行时间。clock.fps()停止追踪运行时间,并返回当前FPS(每秒帧数)。在调用该函数前始终首先调用 clock.tick(),完整代码编写流程如下。
参考代码如下,摄像头实时拍摄并显示在IDE缓冲区,由于CanMV K230 MicroPython底层基于Linux + RTOS实现,因此可以看到代码中出现一些辅助中断等代码,这些代码相对固定。
'''
实验名称:摄像头使用
实验平台:01Studio CanMV K230
说明:实现摄像头图像采集显示
'''
import time, os, sys
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
try:
sensor = Sensor() #构建摄像头对象
sensor.reset() #复位和初始化摄像头
sensor.set_framesize(Sensor.FHD) #设置帧大小FHD(1920x1080),默认通道0
sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0
#使用IDE缓冲区输出图像,显示尺寸和sensor配置一致。
Display.init(Display.VIRT, sensor.width(), sensor.height())
MediaManager.init() #初始化media资源管理器
sensor.run() #启动sensor
clock = time.clock()
while True:
os.exitpoint() #检测IDE中断
################
## 这里编写代码 ##
################
clock.tick()
img = sensor.snapshot() #拍摄一张图
Display.show_image(img) #显示图片
print(clock.fps()) #打印FPS
###################
# IDE中断释放资源代码
###################
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
print(f"Exception {e}")
finally:
# sensor stop run
if isinstance(sensor, Sensor):
sensor.stop()
# deinit display
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# release media buffer
MediaManager.deinit()
实验结果,点击运行代码,右边显示摄像头实时拍摄情况,下方则显示RGB颜色直方图。
图像的3种显示方式
在摄像头拍摄图像后我们需要观察图像,这就涉及如何显示的问题,目前CanMV K230支持3种显示方式。分别是:IDE缓冲区显示、外接HDMI显示器或MIPI显示屏,3种图像显示方式,各有特点:
IDE缓冲区显示:性价比最高,图像质量有一定下降,但能满足大部分场合调试使用。最大支持1920x1080分辨率。
HDMI:外接HDMI显示屏,清晰度最高。最大支持1920x1080分辨率。
MIPI显示屏:外接01Studio 3.5寸MiPi显示屏,可以一体化组装,适合离线部署调试使用。最大支持800x480分辨率。
首先导入Display模块,Display.init(type = None, width = None, height = None, osd_num = 1, to_ide = False, fps = None)初始化Display模块,type: 显示设备类型,VIRT : IDE缓冲区显示;LT9611 : HDMI显示;ST7701 : mipi显示屏。width: 可选参数,显示图像宽度;height: 可选参数,显示图像高度;to_ide: 同时在IDE显示,仅用于设置为HDMI或MIPI屏显示时使用。
Display.show_image(img, x = 0, y = 0, layer = None, alpha = 255, flag = 0),img为显示图像对象,x: 起始横坐标;y: 起始纵坐标。
Display.deinit(),注销Display模块,必须在MediaManager.deinit()之前, 在sensor.stop()之后调用。代码编写流程图如下图:
参考代码如下,只展示与摄像头节不同的地方(核心代码):
#################################
## 图像3种不同显示方式(修改注释实现)
#################################
Display.init(Display.VIRT, sensor.width(), sensor.height()) #通过IDE缓冲区显示图像
#Display.init(Display.LT9611, to_ide=True) #通过HDMI显示图像
#Display.init(Display.ST7701, to_ide=True) #通过01Studio 3.5寸mipi显示屏显示图像
实验结果,分别为IDE缓冲区,HDMI显示器和MIPI屏幕
画图
通过摄像头采集到照片后,我们会进行一些处理,而这时候往往需要一些图形来指示,比如在图CanMV已经将图片处理(包含画图)封装成各类模块,我们 只需要熟悉其构造函数和使用方法即可片某个位置标记箭头、人脸识别后用矩形框提示等。
img=sensor.snapshot()通过摄像头拍摄方式返回image对象。
image.draw_line()对图像进行画线段,参数为起始坐标,终点坐标,颜色与线条粗细。
image.draw_rectangle()画矩形,参数为起始坐标,宽度,高度,颜色,边框粗细,是否填充。
image.draw_circle()画圆,参数为圆心,宽度,高度,颜色,线条粗细,是否填充。
image.draw_arrow()画箭头,参数为起始坐标,终点坐标,颜色,箭头位置大小,线条粗细。
image.draw_cross()画十字交叉,参数为交叉中点坐标,颜色,大小,线条粗细。
image.draw_string()写字符,参数为起始坐标,字符内容,颜色,字体大小,强制间隔。
image.draw_string_advanced()写字符,支持中文,参数为起始坐标,字体大小,字符内容,颜色,字体类型。
代码编写思路如下:
核心代码如下:
img = sensor.snapshot()
# 画线段:从 x0, y0 到 x1, y1 坐标的线段,颜色红色,线宽度 2。
img.draw_line(20, 20, 100, 20, color = (255, 0, 0), thickness = 2)
#画矩形:绿色不填充。
img.draw_rectangle(150, 20, 100, 30, color = (0, 255, 0), thickness = 2, fill = False)
#画圆:蓝色不填充。
img.draw_circle(60, 120, 30, color = (0, 0, 255), thickness = 2, fill = False)
#画箭头:白色。
img.draw_arrow(150, 120, 250, 120, color = (255, 255, 255), size = 20, thickness = 2)
#画十字交叉。
img.draw_cross(60, 200, color = (255, 255, 255), size = 20, thickness = 2)
#写字符。
#img.draw_string(150, 200, "Hello 01Studio!", color = (255, 255, 255), scale = 4, mono_space = False)
#写字符,支持中文。
img.draw_string_advanced(150, 180, 30, "Hello 01Studio", color = (255, 255, 255))
img.draw_string_advanced(40, 300, 30, "人生苦短, 我用Python", color = (255, 255, 255))
Display.show_image(img)
实验结果如下,在合适位置依次画出线段、矩形、圆形、箭头、十字交叉和字符:
图像检测
边缘检测
生活中每个物体都有一个边缘, 简单来说就是轮廓,使用MicroPython 结合 CanMV K230 自带的库来做图像轮廓检测。
CanMV集成了RGB565颜色块识别find_edges函数,位于 image 模块下,因此直接将拍摄到的图片进行处理即可。
直接通过image.find_edges(edge_type[, threshold])即可对图像进行边缘检测,参数edge_type为处理方式,image.EDGE_SIMPLE : 简单的阈值高通滤波算法(其基本原理是设置一个频率阈值,将高于该阈值的频率成分保留或增强,而将低于该阈值的频率成分抑制或去除,从而实现图像的锐化或边缘检测。); image.EDGE_CANNY: Canny 边缘检测算法(核心思想是找寻图像中灰度强度变化最强的位置,这些位置即边缘);threshold: 包含高、低阈值的二元组,默认是(100,200),仅支持灰度图像。
代码编写思路如下:
核心代码如下,对图片对象进行边缘:
img = sensor.snapshot() #拍摄一张图片
#使用 Canny 边缘检测器
img.find_edges(image.EDGE_CANNY, threshold=(50, 80))
# 也可以使用简单快速边缘检测,效果一般,配置如下
#img.find_edges(image.EDGE_SIMPLE, threshold=(100, 255))
#Display.show_image(img) #显示图片
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
print(clock.fps()) #打印FPS
实验结果如下,对任务画像进行了边缘化:
线段检测
CanMV集成了线段识别 find_line_segments 函数,位于 image 模块下,因此我们直接将拍摄到的图片进行处理即可。
image.find_line_segments([roi[,merge_distance=0[,max_theta_difference=15]]])线段识别函数,返回image.line线段对象列表。参数roi: 识别区域(x,y,w,h),未指定则默认整张图片。参数merge_distance: 两条线段间可以相互分开而不被合并的最大像素。参数max_theta_difference: 将少于这个角度值的线段合并。大部分参数使用默认即可,不支持压缩图像和bayer图像。
代码编写流程如下:
核心代码如下:
img = sensor.snapshot() #拍摄一张图片
if enable_lens_corr: img.lens_corr(1.8) # for 2.8mm lens...
# `merge_distance` 控制相近的线段是否合并. 数值 0 (默认值)表示不合并。数值
#为1时候表示相近1像素的线段被合并。因此你可以通过改变这个参数来控制检测到线
#段的数量。
# `max_theta_diff` 控制相差一定角度的线段合并,默认是15度,表示15度内的线
# 段都会合并
for l in img.find_line_segments(merge_distance = 0, max_theta_diff = 5):
img.draw_line(l.line(), color = (255, 0, 0), thickness=2)
print(l)
#Display.show_image(img) #显示图片
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
print(clock.fps()) #打印FPS
实验结果如下,IDE缓冲区标出图像中的横线。
圆形检测
CanMV集成了圆形识别find_circles函数,位于image模块下,因此我们直接将拍摄到的图片进行处理即可。
image.find_circles([roi[, x_stride=2[, y_stride=1[, threshold=2000[, x_margin=10[, y_margin=10[, r_margin=10[, r_min=2[, r_max[, r_step=2]]]]]]]]]])找圆函数。返回一个image.circle圆形对象,该圆形对象有4个值: x, y(圆心), r (半径)和magnitude(量级);量级越大说明识别到的圆可信度越高。
roi: 识别区域(x,y,w,h),未指定则默认整张图片;
threshold: 阈值。返回大于或等于threshold的圆,调整识别可信度;
x_stride y_stride : 霍夫变换时跳过x,y像素的量;
x_margin y_margin r_margin : 控制所检测圆的合并;
r_min r_max: 控制识别圆形的半径范围‘
r_step:控制识别步骤。
代码编写思路如下:
核心代码如下:
img = sensor.snapshot() #拍摄一张图片
# 圆形类有 4 个参数值: 圆心(x, y), r (半径)和 magnitude(量级);
# 量级越大说明识别到的圆可信度越高。
# `threshold` 参数控制找到圆的数量,数值的提升会降低识别圆形的总数。
# `x_margin`, `y_margin`, and `r_margin`控制检测到接近圆的合并调节.
# r_min, r_max, and r_step 用于指定测试圆的半径范围。
for c in img.find_circles(threshold = 2000, x_margin = 10, y_margin= 10,
r_margin = 10,r_min = 2, r_max = 100, r_step = 2):
#画红色圆做指示
img.draw_circle(c.x(), c.y(), c.r(), color = (255, 0, 0),thickness=2)
print(c) #打印圆形的信息
#Display.show_image(img) #显示图片
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
实验结果如下,图片检测识别结果如图。
矩形检测
CanMV集成了矩形识别find_rects函数,位于image模块下,因此我们直接将拍摄到的图片进行处理即可。
image.find_rects([roi=Auto, threshold=10000])矩形识别函数。返回一个image.rect矩形对象列表。
roi: 识别区域(x,y,w,h),未指定则默认整张图片;
threshold: 阈值。返回大于或等于threshold的矩形,调整识别可信度。
代码编写思路如下:
核心代码如下:
img = sensor.snapshot() #拍摄一张图片
# `threshold` 需要设置一个比价大的值来过滤掉噪声。
#这样在图像中检测到边缘亮度较低的矩形。矩形
#边缘量级越大,对比越强…
for r in img.find_rects(threshold = 10000):
img.draw_rectangle(r.rect(), color = (255, 0, 0),thickness=2) #画矩形显示
for p in r.corners(): img.draw_circle(p[0], p[1], 5, color = (0, 255, 0))#四角画小圆形
print(r)
#Display.show_image(img) #显示图片
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
print(clock.fps()) #打印FPS
实验结果,左边矩形识别结果如图。
快速线性回归(巡线)
快速线性回归的用途非常广泛,如比赛经常用到的小车、机器人巡线,可以通过线性回归的方式判断虚线和实线的轨迹,从而做出判断和响应。
CanMV集成了快速线性回归get_regression函数,位于image模块下。
mage.get_regression(thresholds[, invert=False[, roi[, x_stride=2[, y_stride=1[, area_threshold=10[, pixels_threshold=10[, robust=False]]]]]]])对图像所有阈值像素进行线性回
归计算。这一计算通过最小二乘法进行,通常速度较快,但不能处理任何异常值。若 robust 为True,则将使用泰尔指数。泰尔指数计算图像中所有阈值像素间的所有斜率的中值。
若在阈值转换后设定太多像素,即使在80x60的图像上,这一N^2操作也可能将您的FPS降到5以下。 但是,只要阈值转换后的进行设置的像素数量较少,即使在超过30%的阈值像素
为异常值的情况下,线性回归也依然有效。
threshold: 必须是元组列表。 (lo, hi) 定义你想追踪的颜色范围。对于灰度图像,每个元组需要包含两个值:最小灰度值和最大灰度值。
代码编写流程如下:
核心代码如下:
#image.binary([THRESHOLD])将灰度值在THRESHOLD范围变成了白色
img = sensor.snapshot().binary([THRESHOLD]) if BINARY_VISIBLE else sensor.snapshot()
# 返回一个类似 find_lines() 和find_line_segments()的对象.
# 有以下函数使用方法: x1(), y1(), x2(), y2(), length(),
# theta() (rotation in degrees), rho(), and magnitude().
#
# magnitude() 代表线性回归的指令,其值为(0, INF]。
# 0表示一个圆,INF数值越大,表示线性拟合的效果越好。
line = img.get_regression([(255,255) if BINARY_VISIBLE else THRESHOLD])
if (line):
img.draw_line(line.line(), color = 127,thickness=4)
print(line) #打印结果
#显示图片,仅用于LCD居中方式显示
Display.show_image(img, x=round((800-sensor.width())/2),y=round((480-sensor.height())/2))
print("FPS %f, mag = %s" % (clock.fps(), str(line.magnitude()) if (line) else "N/A"))
实验结果,为了标明线性的变化趋势,取多组实验结果如下,串口结果包含拟合线段的两个点坐标,长度,以及非常重要的theta角度信息: