设备影子
设备影子(Device Shadow)是driver-box的核心数据存储组件,提供高效、可靠的设备状态管理和数据访问能力。
stateDiagram-v2
[*] --> Offline: 设备初始化
Offline --> Initializing: 首次数据到达
Initializing --> Online: setPointValue()
Online --> Online: 正常数据更新
Online --> Checking: TTL检查周期
Checking --> Online: 数据未过期
Checking --> MaybeOffline: disconnectTimes < 3
MaybeOffline --> Offline: 超过60秒且失败>=3次
MaybeOffline --> Online: 重试成功
Online --> Refreshing: refreshStatus()
Refreshing --> Online: TTL未到期
Refreshing --> Offline: TTL已到期
数据更新流程
Section titled “数据更新流程”sequenceDiagram
participant API as External API
participant Core as Core Layer
participant Shadow as Device Shadow
participant Device as Device Instance
API->>Core: ReadPoint(deviceId, pointName)
Core->>Shadow: GetPointValue(deviceId, pointName)
Shadow->>Shadow: 检查online状态
Shadow->>Shadow: 检查TTL有效性
alt online && 有效
Shadow-->>Core: value, true
else
Shadow-->>Core: nil, false
end
API->>Core: WritePoint(deviceId, pointData)
Core->>Shadow: SetWritePointValue(deviceId, name, value)
Shadow->>Device: setWritePointValue(name, value)
Note over Device: 存储待写值
Note over Shadow: WritePoint完成后<br/>立即触发读验证
Core->>Core: tryReadNewValues()
Core->>Device: 发送读指令
Device-->>Core: 实际值
Core->>Shadow: SetPointValue(deviceId, name, actualValue)
Note over Shadow: 更新为实际读取值
在线状态判定机制
Section titled “在线状态判定机制”flowchart TD
A[设备数据到达] --> B[更新updatedAt<br/>当前时间]
B --> C[重置disconnectTimes<br/>= 0]
C --> D[设置online = true]
D --> E{发送/读取操作}
E -->|成功| F[保持online=true<br/>disconnectTimes=0]
E -->|失败| G{disconnectTimes统计}
G -->|< 3次<br/>&&> 60秒| H[设置online=false<br/>设备离线]
G -->|>= 3次| H
G -->|< 3次| I[disconnectTimes++]
H --> J[触发ShadowOffline事件]
subgraph TTL检查周期
K[定时检查] --> L{时间差 > TTL?}
L -->|是| M[设置online=false]
L -->|否| N[保持online=true]
end
影子系统架构
Section titled “影子系统架构”设备影子采用内存+持久化的混合存储架构,确保数据的高性能访问和可靠性。
graph TB
subgraph "Memory Layer"
MemCache[实时影子缓存]
Index[状态索引]
Queue[变更队列]
end
subgraph "Persistence Layer"
SQLite[(SQLite数据库)]
Snapshot[配置快照]
History[历史记录]
end
subgraph "Interface Layer"
RestAPI[REST API]
WebSocket[WebSocket]
Internal[内部调用]
end
subgraph "Sync Mechanism"
Sync[数据同步器]
Listener[变更监听器]
Checker[一致性检查]
end
MemCache <--> SQLite
Index <--> Snapshot
Queue <--> History
MemCache --> RestAPI
MemCache --> WebSocket
MemCache --> Internal
Sync --> MemCache
Listener --> Queue
Checker --> SQLite
设备影子结构
Section titled “设备影子结构”基于实际代码实现,每个设备影子包含完整的设备状态信息:
Device结构 (driverbox/shadow/shadow.go)
type Device struct { ID string `json:"id"` ModelName string `json:"modelName"` Points map[string]DevicePoint `json:"points"` Online bool `json:"online"` TTL string `json:"ttl"` DisconnectTimes int `json:"disconnectTimes"` UpdatedAt time.Time `json:"updatedAt"`}DevicePoint结构 (driverbox/shadow/shadow.go)
type DevicePoint struct { Name string `json:"name"` Value interface{} `json:"value"` WriteValue interface{} `json:"writeValue"` UpdatedAt time.Time `json:"updatedAt"` WriteAt time.Time `json:"writeAt"`}数据结构说明
Section titled “数据结构说明”设备字段说明:
- ID: 设备唯一标识符
- ModelName: 设备模型名称
- Points: 设备点位映射,key为点位名称,value为点位数据
- Online: 设备在线状态
- TTL: 设备生存时间,超过该时长没有收到数据视为离线
- DisconnectTimes: 断开连接次数,60秒内超过3次判定离线
- UpdatedAt: 设备最后更新时间
点位字段说明:
- Name: 点位名称
- Value: 点位当前值(从设备读取的实际值)
- WriteValue: 点位写入值(下发的控制值)
- UpdatedAt: 点位值更新时间
- WriteAt: 点位写入时间
{ "id": "device-001", "modelName": "temp_sensor_v1", "online": true, "ttl": "24h0m0s", "disconnectTimes": 0, "updatedAt": "2024-01-22T07:00:00Z", "points": { "temperature": { "name": "temperature", "value": 25.5, "writeValue": null, "updatedAt": "2024-01-22T07:00:00Z", "writeAt": "0001-01-01T00:00:00Z" }, "humidity": { "name": "humidity", "value": 65.2, "writeValue": null, "updatedAt": "2024-01-22T07:00:00Z", "writeAt": "0001-01-01T00:00:00Z" } }}数据生命周期
Section titled “数据生命周期”设备影子数据从创建到销毁经历完整的生命周期管理:
stateDiagram-v2
[*] --> 创建: 设备首次接入
创建 --> 同步: 数据更新
同步 --> 缓存: 内存存储
缓存 --> 持久化: 定时保存
缓存 --> 过期: TTL超时
过期 --> 离线: 标记离线
离线 --> 重连: 设备重新接入
重连 --> 同步
缓存 --> 销毁: 设备删除
持久化 --> 销毁
销毁 --> [*]
运行 --> 错误: 异常处理
错误 --> 恢复: 重试机制
恢复 --> 运行
数据更新流程
Section titled “数据更新流程”- 数据接收:协议插件接收到设备数据
- 格式转换:转换为driver-box内部标准格式
- 影子更新:更新内存中的设备影子状态
- 事件通知:触发数据变更事件给导出插件
- 持久化:定时将数据保存到SQLite数据库
内存缓存设计
Section titled “内存缓存设计”内存层提供高性能的数据访问:
- 实时性:毫秒级数据访问延迟
- 并发安全:支持高并发读写操作
- 内存效率:优化的数据结构和索引机制
持久层确保数据可靠性:
- SQLite数据库:轻量级本地存储,无需额外依赖
- 自动清理:TTL过期数据自动删除,控制存储空间
- 数据压缩:历史数据压缩存储,节省磁盘空间
- 备份恢复:支持数据备份和恢复功能
采用多级缓存策略优化性能:
- LRU算法:最近最少使用淘汰,确保热点数据常驻
- 预加载:系统启动时预加载频繁访问的数据
- 写缓冲:批量写入减少磁盘I/O操作
- 读缓存:热点数据缓存,进一步提升访问速度
数据一致性保证
Section titled “数据一致性保证”并发控制机制
Section titled “并发控制机制”使用乐观锁机制保证数据一致性:
// 并发控制示例func UpdatePoint(deviceId, pointName string, value interface{}) error { shadow := getDeviceShadow(deviceId) originalVersion := shadow.Version
// 更新数据 shadow.Points[pointName].Value = value shadow.Points[pointName].Timestamp = time.Now() shadow.Version++
// 原子性检查和提交 if !atomicCompareAndSet(shadow, originalVersion) { return errors.New("concurrent modification detected") }
return nil}提供完整的事务支持:
- 批量更新:支持多点位原子更新操作
- 回滚机制:失败时自动回滚,保证数据完整性
- 一致性保证:强一致性数据更新,不丢失任何变更
设备影子API接口
Section titled “设备影子API接口”基于实际代码实现,设备影子提供以下核心API接口:
DeviceShadow接口 (driverbox/shadow/shadow.go)
type DeviceShadow interface { // AddDevice 新增设备 AddDevice(id string, modelName string, ttl ...time.Duration)
// GetDevice 获取设备 GetDevice(id string) (device Device, ok bool)
// HasDevice 是否存在设备 HasDevice(id string) bool
// DeleteDevice 删除设备 DeleteDevice(id ...string) (err error)
// SetDevicePoint 设置设备点位值 SetDevicePoint(id, pointName string, value interface{}) (err error)
// GetDevicePoint 获取设备点位值 GetDevicePoint(id, pointName string) (value interface{}, err error)
// GetDevicePoints 获取设备所有点位 GetDevicePoints(id string) (points map[string]DevicePoint, err error)
// GetDevicePointDetails 获取设备点位详情 GetDevicePointDetails(id, pointName string) (point DevicePoint, err error)
// GetDeviceUpdateAt 获取设备最后更新时间 GetDeviceUpdateAt(id string) (time.Time, error)
// GetDeviceStatus 获取设备在离线状态 GetDeviceStatus(id string) (online bool, err error)
// SetOnline 设置设备为在线状态 SetOnline(id string) (err error)
// SetOffline 设置设备为离线状态 SetOffline(id string) (err error)
// MayBeOffline 可能离线事件(60秒内超过3次判定离线) MayBeOffline(id string) (err error)
// GetDevices 获取所有设备 GetDevices() []Device
// SetWritePointValue 存储下发控制点位值 SetWritePointValue(id string, pointName string, value interface{}) (err error)
// GetWritePointValue 获取下发控制点位值 GetWritePointValue(id string, pointName string) (value interface{}, err error)}API使用示例
Section titled “API使用示例”获取设备影子:
// 获取设备信息device, ok := driverbox.Shadow().GetDevice("device-001")if ok { fmt.Printf("设备ID: %s, 在线状态: %v\n", device.ID, device.Online) for pointName, point := range device.Points { fmt.Printf("点位 %s: 值=%v, 更新时间=%v\n", pointName, point.Value, point.UpdatedAt) }}设置点位值:
// 设置设备点位值err := driverbox.Shadow().SetDevicePoint("device-001", "temperature", 25.5)if err != nil { fmt.Printf("设置点位值失败: %v\n", err)}获取点位详情:
// 获取点位详细信息point, err := driverbox.Shadow().GetDevicePointDetails("device-001", "temperature")if err == nil { fmt.Printf("点位名称: %s\n", point.Name) fmt.Printf("当前值: %v\n", point.Value) fmt.Printf("写入值: %v\n", point.WriteValue) fmt.Printf("更新时间: %v\n", point.UpdatedAt) fmt.Printf("写入时间: %v\n", point.WriteAt)}设备状态管理:
// 设置设备在线err := driverbox.Shadow().SetOnline("device-001")
// 设置设备离线err := driverbox.Shadow().SetOffline("device-001")
// 可能离线(用于网络异常场景)err := driverbox.Shadow().MayBeOffline("device-001")
// 获取设备状态online, err := driverbox.Shadow().GetDeviceStatus("device-001")批量操作:
// 获取所有设备devices := driverbox.Shadow().GetDevices()for _, device := range devices { fmt.Printf("设备: %s, 模型: %s, 在线: %v\n", device.ID, device.ModelName, device.Online)}
// 删除设备err := driverbox.Shadow().DeleteDevice("device-001", "device-002")写操作记录:
// 存储下发控制值err := driverbox.Shadow().SetWritePointValue("device-001", "switch", true)
// 获取下发控制值writeValue, err := driverbox.Shadow().GetWritePointValue("device-001", "switch")系统提供全面的健康监控:
# 检查影子服务状态GET /api/v1/health/shadow
# 查看缓存统计信息GET /api/v1/shadow/stats响应示例:
{ "memoryUsage": "45MB", "cacheHitRate": "95.2%", "totalDevices": 150, "onlineDevices": 148, "dataPoints": 1200}关键性能指标监控:
- 响应时间:平均读写操作延迟
- 吞吐量:每秒处理的请求数量
- 缓存命中率:缓存有效性指标
- 内存使用率:内存资源使用情况
影子数据导出
Section titled “影子数据导出”支持将影子数据导出到外部系统:
{ "export": { "shadow": { "enable": true, "format": "json", "interval": "30s", "targets": ["mqtt", "http", "file"] } }}历史数据查询
Section titled “历史数据查询”提供丰富的历史数据查询功能:
# 查询指定时间范围的历史数据GET /api/v1/history?device=device-001&point=temperature&start=2024-01-01&end=2024-01-31
# 聚合查询(平均值、最大值等)GET /api/v1/history/aggregate?device=device-001&point=temperature&function=avg&interval=1h数据恢复机制
Section titled “数据恢复机制”完善的故障恢复策略:
- 内存丢失恢复:重启时从数据库自动恢复
- 数据不一致修复:自动检测和修复不一致数据
- 备份恢复:支持从备份文件恢复数据
高可用性保障:
- 主从复制:支持影子数据的主从同步
- 故障转移:主节点故障时自动切换
- 数据分片:大规模部署时的水平扩展