Skip to content

设备影子

设备影子(Device Shadow)是driver-box的核心数据存储组件,提供高效、可靠的设备状态管理和数据访问能力。

stateDiagram-v2
    [*] --> Offline: 设备初始化
    Offline --> Initializing: 首次数据到达
    Initializing --> Online: setPointValue()
    
    Online --> Online: 正常数据更新
    Online --> Checking: TTL检查周期
    Checking --> Online: 数据未过期
    Checking --> MaybeOffline: disconnectTimes < 3
    
    MaybeOffline --> Offline: 超过60秒且失败>=3次
    MaybeOffline --> Online: 重试成功
    
    Online --> Refreshing: refreshStatus()
    Refreshing --> Online: TTL未到期
    Refreshing --> Offline: TTL已到期
sequenceDiagram
    participant API as External API
    participant Core as Core Layer
    participant Shadow as Device Shadow
    participant Device as Device Instance

    API->>Core: ReadPoint(deviceId, pointName)
    Core->>Shadow: GetPointValue(deviceId, pointName)
    Shadow->>Shadow: 检查online状态
    Shadow->>Shadow: 检查TTL有效性
    alt online && 有效
        Shadow-->>Core: value, true
    else
        Shadow-->>Core: nil, false
    end

    API->>Core: WritePoint(deviceId, pointData)
    Core->>Shadow: SetWritePointValue(deviceId, name, value)
    Shadow->>Device: setWritePointValue(name, value)
    Note over Device: 存储待写值

    Note over Shadow: WritePoint完成后<br/>立即触发读验证
    Core->>Core: tryReadNewValues()
    Core->>Device: 发送读指令
    Device-->>Core: 实际值
    Core->>Shadow: SetPointValue(deviceId, name, actualValue)
    Note over Shadow: 更新为实际读取值
flowchart TD
    A[设备数据到达] --> B[更新updatedAt<br/>当前时间]
    B --> C[重置disconnectTimes<br/>= 0]
    C --> D[设置online = true]
    D --> E{发送/读取操作}
    
    E -->|成功| F[保持online=true<br/>disconnectTimes=0]
    E -->|失败| G{disconnectTimes统计}
    
    G -->|< 3次<br/>&&> 60秒| H[设置online=false<br/>设备离线]
    G -->|>= 3次| H
    G -->|< 3次| I[disconnectTimes++]
    
    H --> J[触发ShadowOffline事件]
    
    subgraph TTL检查周期
        K[定时检查] --> L{时间差 > TTL?}
        L -->|是| M[设置online=false]
        L -->|否| N[保持online=true]
    end

设备影子采用内存+持久化的混合存储架构,确保数据的高性能访问和可靠性。

graph TB
    subgraph "Memory Layer"
        MemCache[实时影子缓存]
        Index[状态索引]
        Queue[变更队列]
    end

    subgraph "Persistence Layer"
        SQLite[(SQLite数据库)]
        Snapshot[配置快照]
        History[历史记录]
    end

    subgraph "Interface Layer"
        RestAPI[REST API]
        WebSocket[WebSocket]
        Internal[内部调用]
    end

    subgraph "Sync Mechanism"
        Sync[数据同步器]
        Listener[变更监听器]
        Checker[一致性检查]
    end

    MemCache <--> SQLite
    Index <--> Snapshot
    Queue <--> History

    MemCache --> RestAPI
    MemCache --> WebSocket
    MemCache --> Internal

    Sync --> MemCache
    Listener --> Queue
    Checker --> SQLite

基于实际代码实现,每个设备影子包含完整的设备状态信息:

Device结构 (driverbox/shadow/shadow.go)

type Device struct {
ID string `json:"id"`
ModelName string `json:"modelName"`
Points map[string]DevicePoint `json:"points"`
Online bool `json:"online"`
TTL string `json:"ttl"`
DisconnectTimes int `json:"disconnectTimes"`
UpdatedAt time.Time `json:"updatedAt"`
}

DevicePoint结构 (driverbox/shadow/shadow.go)

type DevicePoint struct {
Name string `json:"name"`
Value interface{} `json:"value"`
WriteValue interface{} `json:"writeValue"`
UpdatedAt time.Time `json:"updatedAt"`
WriteAt time.Time `json:"writeAt"`
}

设备字段说明:

  • ID: 设备唯一标识符
  • ModelName: 设备模型名称
  • Points: 设备点位映射,key为点位名称,value为点位数据
  • Online: 设备在线状态
  • TTL: 设备生存时间,超过该时长没有收到数据视为离线
  • DisconnectTimes: 断开连接次数,60秒内超过3次判定离线
  • UpdatedAt: 设备最后更新时间

点位字段说明:

  • Name: 点位名称
  • Value: 点位当前值(从设备读取的实际值)
  • WriteValue: 点位写入值(下发的控制值)
  • UpdatedAt: 点位值更新时间
  • WriteAt: 点位写入时间
{
"id": "device-001",
"modelName": "temp_sensor_v1",
"online": true,
"ttl": "24h0m0s",
"disconnectTimes": 0,
"updatedAt": "2024-01-22T07:00:00Z",
"points": {
"temperature": {
"name": "temperature",
"value": 25.5,
"writeValue": null,
"updatedAt": "2024-01-22T07:00:00Z",
"writeAt": "0001-01-01T00:00:00Z"
},
"humidity": {
"name": "humidity",
"value": 65.2,
"writeValue": null,
"updatedAt": "2024-01-22T07:00:00Z",
"writeAt": "0001-01-01T00:00:00Z"
}
}
}

设备影子数据从创建到销毁经历完整的生命周期管理:

stateDiagram-v2
    [*] --> 创建: 设备首次接入
    创建 --> 同步: 数据更新
    同步 --> 缓存: 内存存储
    缓存 --> 持久化: 定时保存

    缓存 --> 过期: TTL超时
    过期 --> 离线: 标记离线
    离线 --> 重连: 设备重新接入
    重连 --> 同步

    缓存 --> 销毁: 设备删除
    持久化 --> 销毁
    销毁 --> [*]

    运行 --> 错误: 异常处理
    错误 --> 恢复: 重试机制
    恢复 --> 运行
  1. 数据接收:协议插件接收到设备数据
  2. 格式转换:转换为driver-box内部标准格式
  3. 影子更新:更新内存中的设备影子状态
  4. 事件通知:触发数据变更事件给导出插件
  5. 持久化:定时将数据保存到SQLite数据库

内存层提供高性能的数据访问:

  • 实时性:毫秒级数据访问延迟
  • 并发安全:支持高并发读写操作
  • 内存效率:优化的数据结构和索引机制

持久层确保数据可靠性:

  • SQLite数据库:轻量级本地存储,无需额外依赖
  • 自动清理:TTL过期数据自动删除,控制存储空间
  • 数据压缩:历史数据压缩存储,节省磁盘空间
  • 备份恢复:支持数据备份和恢复功能

采用多级缓存策略优化性能:

  • LRU算法:最近最少使用淘汰,确保热点数据常驻
  • 预加载:系统启动时预加载频繁访问的数据
  • 写缓冲:批量写入减少磁盘I/O操作
  • 读缓存:热点数据缓存,进一步提升访问速度

使用乐观锁机制保证数据一致性:

// 并发控制示例
func UpdatePoint(deviceId, pointName string, value interface{}) error {
shadow := getDeviceShadow(deviceId)
originalVersion := shadow.Version
// 更新数据
shadow.Points[pointName].Value = value
shadow.Points[pointName].Timestamp = time.Now()
shadow.Version++
// 原子性检查和提交
if !atomicCompareAndSet(shadow, originalVersion) {
return errors.New("concurrent modification detected")
}
return nil
}

提供完整的事务支持:

  • 批量更新:支持多点位原子更新操作
  • 回滚机制:失败时自动回滚,保证数据完整性
  • 一致性保证:强一致性数据更新,不丢失任何变更

基于实际代码实现,设备影子提供以下核心API接口:

DeviceShadow接口 (driverbox/shadow/shadow.go)

type DeviceShadow interface {
// AddDevice 新增设备
AddDevice(id string, modelName string, ttl ...time.Duration)
// GetDevice 获取设备
GetDevice(id string) (device Device, ok bool)
// HasDevice 是否存在设备
HasDevice(id string) bool
// DeleteDevice 删除设备
DeleteDevice(id ...string) (err error)
// SetDevicePoint 设置设备点位值
SetDevicePoint(id, pointName string, value interface{}) (err error)
// GetDevicePoint 获取设备点位值
GetDevicePoint(id, pointName string) (value interface{}, err error)
// GetDevicePoints 获取设备所有点位
GetDevicePoints(id string) (points map[string]DevicePoint, err error)
// GetDevicePointDetails 获取设备点位详情
GetDevicePointDetails(id, pointName string) (point DevicePoint, err error)
// GetDeviceUpdateAt 获取设备最后更新时间
GetDeviceUpdateAt(id string) (time.Time, error)
// GetDeviceStatus 获取设备在离线状态
GetDeviceStatus(id string) (online bool, err error)
// SetOnline 设置设备为在线状态
SetOnline(id string) (err error)
// SetOffline 设置设备为离线状态
SetOffline(id string) (err error)
// MayBeOffline 可能离线事件(60秒内超过3次判定离线)
MayBeOffline(id string) (err error)
// GetDevices 获取所有设备
GetDevices() []Device
// SetWritePointValue 存储下发控制点位值
SetWritePointValue(id string, pointName string, value interface{}) (err error)
// GetWritePointValue 获取下发控制点位值
GetWritePointValue(id string, pointName string) (value interface{}, err error)
}

获取设备影子:

// 获取设备信息
device, ok := driverbox.Shadow().GetDevice("device-001")
if ok {
fmt.Printf("设备ID: %s, 在线状态: %v\n", device.ID, device.Online)
for pointName, point := range device.Points {
fmt.Printf("点位 %s: 值=%v, 更新时间=%v\n",
pointName, point.Value, point.UpdatedAt)
}
}

设置点位值:

// 设置设备点位值
err := driverbox.Shadow().SetDevicePoint("device-001", "temperature", 25.5)
if err != nil {
fmt.Printf("设置点位值失败: %v\n", err)
}

获取点位详情:

// 获取点位详细信息
point, err := driverbox.Shadow().GetDevicePointDetails("device-001", "temperature")
if err == nil {
fmt.Printf("点位名称: %s\n", point.Name)
fmt.Printf("当前值: %v\n", point.Value)
fmt.Printf("写入值: %v\n", point.WriteValue)
fmt.Printf("更新时间: %v\n", point.UpdatedAt)
fmt.Printf("写入时间: %v\n", point.WriteAt)
}

设备状态管理:

// 设置设备在线
err := driverbox.Shadow().SetOnline("device-001")
// 设置设备离线
err := driverbox.Shadow().SetOffline("device-001")
// 可能离线(用于网络异常场景)
err := driverbox.Shadow().MayBeOffline("device-001")
// 获取设备状态
online, err := driverbox.Shadow().GetDeviceStatus("device-001")

批量操作:

// 获取所有设备
devices := driverbox.Shadow().GetDevices()
for _, device := range devices {
fmt.Printf("设备: %s, 模型: %s, 在线: %v\n",
device.ID, device.ModelName, device.Online)
}
// 删除设备
err := driverbox.Shadow().DeleteDevice("device-001", "device-002")

写操作记录:

// 存储下发控制值
err := driverbox.Shadow().SetWritePointValue("device-001", "switch", true)
// 获取下发控制值
writeValue, err := driverbox.Shadow().GetWritePointValue("device-001", "switch")

系统提供全面的健康监控:

Terminal window
# 检查影子服务状态
GET /api/v1/health/shadow
# 查看缓存统计信息
GET /api/v1/shadow/stats

响应示例:

{
"memoryUsage": "45MB",
"cacheHitRate": "95.2%",
"totalDevices": 150,
"onlineDevices": 148,
"dataPoints": 1200
}

关键性能指标监控:

  • 响应时间:平均读写操作延迟
  • 吞吐量:每秒处理的请求数量
  • 缓存命中率:缓存有效性指标
  • 内存使用率:内存资源使用情况

支持将影子数据导出到外部系统:

{
"export": {
"shadow": {
"enable": true,
"format": "json",
"interval": "30s",
"targets": ["mqtt", "http", "file"]
}
}
}

提供丰富的历史数据查询功能:

Terminal window
# 查询指定时间范围的历史数据
GET /api/v1/history?device=device-001&point=temperature&start=2024-01-01&end=2024-01-31
# 聚合查询(平均值、最大值等)
GET /api/v1/history/aggregate?device=device-001&point=temperature&function=avg&interval=1h

完善的故障恢复策略:

  1. 内存丢失恢复:重启时从数据库自动恢复
  2. 数据不一致修复:自动检测和修复不一致数据
  3. 备份恢复:支持从备份文件恢复数据

高可用性保障:

  • 主从复制:支持影子数据的主从同步
  • 故障转移:主节点故障时自动切换
  • 数据分片:大规模部署时的水平扩展