小阳心健康测量 Python SDK
安装 SDK
pip install xy-health-measurement-sdk
IMPORTANT
SDK 依赖 Protocal Buffers 3.19.6,用户需根据系统环境进行安装,或使用 Docker镜像。
初始化
初始化测量对象以进行后续健康测量。
IMPORTANT
每次测量均需创建一个独立 Measurement
对象,对象内部包含单次测量完整过程数据,不可复用。
方法
Measurement(app_id, sdk_key, *args, **kwargs)
参数说明
名称 | 类型 | 必需 | 示例值 | 描述 |
---|---|---|---|---|
app_id | str | 是 | - | APP_ID 从管理控制台获取 |
sdk_key | str | 是 | - | SDK_KEY 从管理控制台获取 |
args | tuple | 否 | (MeasurementCategory.BloodPressure, MeasurementCategory.Anxiety) | 测量类型 |
kwargs | dict | 是 | 参考下方详解 | 自定义配置 |
args
- 该参数指定本次测量关注的所有测量类型指标,缺省值为
MeasurementCategory.All
。 - 默认提供“生理测量”和“情绪测量”两种预设类型。
- 生理测量包括 心率报告(心率、心率变异率)、房颤报告(房颤)、血压报告(舒张压、收缩压)、血氧报告(血氧饱和度)、风险报告(心脏病风险、中风风险、心血管疾病风险、心脏压力、血管功能)、基础报告(性别、皮肤年龄、体重指数)、综合心健康风险报告(综合心健康风险)。
- 情绪测量包含 心率报告(心率)、攻击性报告(攻击性)、焦虑度报告(焦虑度)、活力度报告(活力度)、抑郁度报告(抑郁度)、疲劳度报告(疲劳度)、压力度报告(压力度)、情绪综合分报告(情绪综合分)。
- 如果预设类型不满足,您也可以根据
MeasurementCategory
类型传入关注的指标分类。
测量类型(SDK内部包含)定义如下:
pyclass MeasurementCategory(Enum): #心率房颤 HeartRate = 1, #血压 BloodPressure = 2, #血氧 BloodOxygen = 4, #健康风险 Risk = 8, #攻击性 Aggressivity = 16, #焦虑度 Anxiety = 32, #压抑度[已废弃] Depression = 64, #活力度 Vitality = 128, #积极性[已废弃] Positivity = 256, #抑郁度 Suppression = 512, #疲劳度 Fatigue = 1024, #生理测量 Physiology = 15, #情绪测量 Emotion = 1713, #全部指标 All = 1727 }
WARNING
测量类型参数将受限于您所购许可证绑定的测量类型,这意味着该参数应为许可证所授权测量类型的子集。
示例代码
pymeasurement = Measurement(app_id, sdk_key)
pymeasurement = Measurement(app_id, sdk_key, MeasurementCategory.Physiology)
pymeasurement = Measurement(app_id, sdk_key, MeasurementCategory.Emotion)
py# 仅关注 “血氧/焦虑” 指标 measurement = Measurement(app_id, sdk_key, MeasurementCategory.BloodPressure, MeasurementCategory.Anxiety)
- 该参数指定本次测量关注的所有测量类型指标,缺省值为
kwargs
该参数用于指定自定义配置,其可选配置结构示例如下:
py{ 'logging_config':{ 'level': logging.DEBUG, 'format': '%(asctime)s - %(levelname)s - %(filename)s[line:%(lineno)d]: %(message)s', 'filename': 'log.txt', 'filemode': 'w' }, 'auth_url': '', 'measurement_url': '', 'measurement_duration': 15000, 'min_chunk_timespan': 6000 }
名称 类型 必需 描述 logging_config dict 否 自定义日志配置 auth_url str 否 许可证认证地址,用于私有化部署服务配置 measurement_url str 否 测量服务地址,用于私有化部署服务配置 measurement_duration int 否 测量时长(毫秒) min_chunk_timespan int 否 阶段测量最小时长(毫秒)
启动测量
启动测量,成功会触发started
事件并返回measurement_id
(本次测量唯一标识),失败则会触发crashed
事件。
测量要求参阅 健康测量条件及要求。
方法
measurement.start(frame)
参数说明
名称 | 类型 | 必需 | 描述 |
---|---|---|---|
frame | ndarray | 是 | 初始视频帧 |
- 视频帧仅用于校验是否满足基本测量条件并匹配测量模型,测量服务不会保存用户数据。
- 视频帧中必须包含清晰人脸,人脸需要保持正脸,光线明亮均匀。
示例代码
#订阅启动测量事件
measurement.subscribe('started', started_handler)
def started_handler(sender, **kwargs):
print(kwargs)
#启动测量
measurement.start(frame)
采集数据
测量成功开始后,用户需要从摄像头、视频文件等采集视频帧并提供给 SDK 进行图像分析,视频帧需要保持连续,且帧率不能太低,建议15-30帧每秒。视频帧采集满足条件后会触发collected
事件,此时可以停止采集数据。
方法
measurement.enqueue(frame, timestamp)
参数说明
名称 | 类型 | 必需 | 描述 |
---|---|---|---|
frame | ndarray | 是 | 视频帧 |
timestamp | number | 是 | 视频帧时间戳 |
frame
视频帧中必须包含清晰人脸,人脸需要保持正脸,光线明亮均匀。视频帧务必保持连续,切勿跳帧。
timestamp
拿到视频帧瞬间的时间戳,务必在获得视频帧的第一时间记录时间戳,切勿在操作视频帧后再记录时间戳。格式为
Unix 时间戳(精确到毫秒)
,使用int(time.time() * 1000)
获得即可。
示例代码
#订阅视频帧采集完成事件
measurement.subscribe('collected', collected_handler)
def collected_handler(sender):
pass
#入队视频帧
measurement.enqueue(frame, 1709151805475)
订阅报告
测量成功开始后,SDK 会在接收到足够数据的情况下请求测量服务生成健康报告。测量过程中会产生阶段性报告并通过chunkReportGenerated
事件发布,测量结束后会产生完整健康报告并通过wholeReportGenerated
事件发布。
用户可以按需订阅chunkReportGenerated
和wholeReportGenerated
事件以获得健康报告。
示例代码
#订阅阶段性报告
measurement.subscribe('chunk_report_generated', chunk_report_handler)
def chunk_report_handler(sender, **kwargs):
print(kwargs)
#订阅完整报告
measurement.subscribe('whole_report_generated', whole_report_handler)
def whole_report_handler(sender, **kwargs):
print(kwargs)
报告解读
请参见 健康报告解读。
中断测量
在测量过程中可以随时终止测量。测量终端后会触发interrupted
事件。
方法
#订阅中断测量事件
measurement.subscribe("interrupted", interrupted_handler)
def interrupted_handler(sender):
pass
#中断测量
measurement.interrupt()
异常处理
启动测量后,任意环节出现异常均会通过crashed
事件发布,用户可以按需订阅此事件,以获得错误信息并处理。
示例代码
#订阅异常事件
measurement.subscribe('crashed', exception_handler)
def exception_handler(sender, **kwargs):
# exception_level = kwargs['level'] # 异常级别
print(kwargs['msg'])
crashed
事件处理函数中所暴露对象的level
属性反映了当前异常级别,如果异常级别为error
则 SDK 会自动终止本次测量,用户需根据错误原因做出相应调整后重新开始新的测量。
性能问题
- 测量过程中 SDK 需要进行实时图像处理,帧率设置过高会增加性能损耗,一般建议设置每秒15-30帧。
- 为提高测量响应速度,SDK 内部采用多进程机制处理计算密集型任务,使用者集成时需要注意兼容问题。
示例程序
下面我们以视频文件和网络视频流两种常见数据源来演示如何使用 SDK 进行测量。
from abc import ABC, abstractmethod
from xy_health_measurement_sdk.Measurement import Measurement
from xy_health_measurement_sdk.protos.Category_pb2 import BloodPressure, Anxiety
class Sample(ABC):
def __init__(self, *args, **kwargs):
# 创建测量对象
self._measurement = Measurement(*args, **kwargs)
# 订阅事件
self._measurement.subscribe('started', self._started_handler)
self._measurement.subscribe('collected', self._collected_handler)
self._measurement.subscribe('chunk_report_generated', self._chunk_report_handler)
self._measurement.subscribe('whole_report_generated', self._whole_report_handler)
self._measurement.subscribe('crashed', self._exception_handler)
self._collected = False # 数据采集完成标志
def _started_handler(self, sender, **kwargs):
print(kwargs)
def _collected_handler(self, sender):
self._collected = True
def _chunk_report_handler(self, sender, **kwargs):
print(kwargs)
def _whole_report_handler(self, sender, **kwargs):
print(kwargs)
def _exception_handler(self, sender, **kwargs):
if kwargs['level'] == 'error':
self._collected_handler(self)
self._measurement.stop()
print(kwargs['msg_cn'])
@abstractmethod
def start(self, *args):
pass
为简化冗余代码,我们先将 SDK 相关通用代码定义到Sample
抽象类中,用户可以按需订阅关注的事件并重写事件处理函数。
import cv2
from Sample import Sample
class VideoFileSample(Sample):
def start(self, *args):
video, cap = args[0], None
try:
cap = cv2.VideoCapture(video)
if not cap.isOpened():
print(f'Unable to open the {video}.')
return
success, frame = cap.read()
if not success:
print(f'Unable to read the {video}.')
return
self._measurement.start(frame)
while not self._collected:
success, frame = cap.read()
if not success:
break
timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)
self._measurement.enqueue(frame, timestamp)
except Exception as ex:
print(ex)
finally:
if cap:
cap.release()
app_id, sdk_key, video = '', '', 'video.mp4'
VideoFileSample(app_id, sdk_key).start(video)
input("测量中,请勿退出...")
import cv2
from time import time, sleep
from Sample import Sample
class RtspSample(Sample):
def start(self, *args):
addr, fps = args
cap, interval = None, 1 / fps
try:
cap = cv2.VideoCapture(addr)
if not cap.isOpened():
print(f'Unable to load the {addr}.')
return
success, frame = cap.read()
if not success:
print(f'Unable to read the {addr}.')
return
self._measurement.start(frame)
while not self._collected:
ret, frame = cap.read()
if not ret:
break
timestamp = int(time() * 1000) # 网络视频流中时间会被不断重置,不能取其中的时间戳
sleep(interval) # 根据帧率设置采集时间间隔,否则会密集获取重复视频帧
self._measurement.enqueue(frame, timestamp)
except Exception as ex:
print(ex)
finally:
if cap:
cap.release()
app_id, sdk_key = '', ''
rtsp = 'rtsp://xxx'
fps = 25 # 假定帧率为25(需根据实际推流帧率调整,设置帧率应小于等于实际推流帧率)
RtspSample(app_id, sdk_key, BloodPressure, Anxiety).start(rtsp, fps)
input("测量中,请勿退出...")
需要特别注意的是,RTSP
等网络视频流会定时重置视频时间戳,因此需要采用当前系统时间戳而非视频流时间戳。另外,使用网络视频流数据源测量时需要根据数据源实际推流参数设置合理的帧率。设置帧率应小于等于实际推流帧率。
以上代码已分享至GitHub,具体用法也可以参考下面的视频教程。