Skip to content

Plugin开发

本文档详细介绍如何为driver-box开发自定义协议插件,实现对各种设备通信协议的支持。

driver-box采用插件化架构,允许开发者轻松扩展新的协议支持。每个协议对应一个插件,插件需要实现标准的Plugin接口。

插件是driver-box的核心扩展机制,负责:

  • 协议解析:将通用设备操作转换为特定协议格式
  • 通信管理:管理与设备的连接和通信
  • 数据处理:处理设备数据的编码、解码和转换
  • 生命周期管理:处理插件的初始化、运行和销毁
flowchart TD
    A[设备配置] --> B[Plugin.Initialize]
    B --> C[创建连接池]
    C --> D[设备通信]
    D --> E[Plugin.Connector]
    E --> F[Connector.Encode]
    F --> G[Connector.Send]
    H[系统关闭] --> I[Plugin.Destroy]
  • Go 1.18+
  • 熟悉Go语言和面向对象编程
  • 了解目标设备的通信协议
  • 掌握基本的并发编程概念
plugins/
├── your_protocol/ # 协议名称目录
│ ├── plugin.go # 插件入口文件
│ └── internal/ # 内部实现
│ ├── plugin.go # Plugin接口实现
│ ├── connector.go # Connector接口实现
│ ├── model.go # 数据模型定义
│ └── xxx.go # 其他辅助文件
package your_protocol
import (
"github.com/ibuilding-x/driver-box/driverbox"
"github.com/ibuilding-x/driver-box/plugins/your_protocol/internal"
)
// EnablePlugin 启用插件(必须导出此函数)
func EnablePlugin() {
driverbox.EnablePlugin(internal.ProtocolName, new(internal.Plugin))
}

Connector负责管理单个设备的通信连接:

package internal
import (
"errors"
"time"
"github.com/ibuilding-x/driver-box/driverbox/plugin"
"go.uber.org/zap"
)
// connector 连接器实现
type connector struct {
plugin *Plugin
config *ConnectionConfig
client interface{} // 协议客户端实例
lastActive time.Time
mutex sync.RWMutex
closed bool
deviceId string
}
// Encode 编码设备操作
func (c *connector) Encode(deviceId string, mode plugin.EncodeMode, values ...plugin.PointData) (interface{}, error) {
if c.closed {
return nil, errors.New("connector is closed")
}
c.mutex.Lock()
defer c.mutex.Unlock()
switch mode {
case plugin.ReadMode:
return c.encodeRead(deviceId, values)
case plugin.WriteMode:
return c.encodeWrite(deviceId, values)
default:
return nil, errors.New("unsupported encode mode")
}
}
// Send 发送数据到设备
func (c *connector) Send(data interface{}) error {
if c.closed {
return errors.New("connector is closed")
}
c.mutex.Lock()
defer c.mutex.Unlock()
// 实现具体的发送逻辑
// 例如:序列化数据并通过网络发送
if err := c.sendData(data); err != nil {
c.lastActive = time.Now() // 更新最后活跃时间
return err
}
c.lastActive = time.Now()
return nil
}
// Release 释放资源
func (c *connector) Release() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.closed {
return nil
}
c.closed = true
// 关闭底层连接
if c.client != nil {
// 根据具体协议关闭客户端
// 例如:c.client.Close()
}
driverbox.Log().Info("connector released",
zap.String("device", c.deviceId))
return nil
}
// 编码读取操作
func (c *connector) encodeRead(deviceId string, values []plugin.PointData) (interface{}, error) {
// 根据协议规范构建读取请求
// 这里需要根据具体的协议格式来实现
request := map[string]interface{}{
"device_id": deviceId,
"operation": "read",
"points": values,
}
return request, nil
}
// 编码写入操作
func (c *connector) encodeWrite(deviceId string, values []plugin.PointData) (interface{}, error) {
// 根据协议规范构建写入请求
request := map[string]interface{}{
"device_id": deviceId,
"operation": "write",
"points": values,
}
return request, nil
}
// 发送数据的具体实现
func (c *connector) sendData(data interface{}) error {
// 实现具体的网络发送逻辑
// 例如使用TCP/UDP/HTTP等协议发送数据
// 这里只是示例,需要根据具体协议实现
driverbox.Log().Debug("sending data",
zap.String("device", c.deviceId),
zap.Any("data", data))
// TODO: 实现实际的发送逻辑
// 例如:
// if tcpConn, ok := c.client.(*net.TCPConn); ok {
// _, err := tcpConn.Write(serializedData)
// return err
// }
return nil
}
package internal
import "encoding/json"
// ConnectionConfig 连接配置
type ConnectionConfig struct {
Host string `json:"host"` // 主机地址
Port int `json:"port"` // 端口
Timeout time.Duration `json:"timeout"` // 超时时间
ConnectionKey string `json:"connectionKey"` // 连接标识
Enable bool `json:"enable"` // 是否启用
// 协议特定的配置项
CustomParam1 string `json:"customParam1"` // 自定义参数1
CustomParam2 int `json:"customParam2"` // 自定义参数2
}
// UnmarshalJSON 自定义JSON反序列化
func (c *ConnectionConfig) UnmarshalJSON(data []byte) error {
// 可以在这里添加配置验证逻辑
return json.Unmarshal(data, c)
}
var (
// ErrConnectionFailed 连接失败
ErrConnectionFailed = errors.New("connection failed")
// ErrDeviceNotFound 设备未找到
ErrDeviceNotFound = errors.New("device not found")
// ErrInvalidResponse 无效的响应
ErrInvalidResponse = errors.New("invalid response")
// ErrTimeout 操作超时
ErrTimeout = errors.New("operation timeout")
)
// 使用driverbox的统一日志系统
driverbox.Log().Info("operation successful",
zap.String("protocol", ProtocolName),
zap.String("device", deviceId),
zap.Duration("duration", duration))
driverbox.Log().Error("operation failed",
zap.String("protocol", ProtocolName),
zap.String("device", deviceId),
zap.Error(err))
package internal_test
import (
"testing"
"time"
"github.com/ibuilding-x/driver-box/driverbox/plugin"
"github.com/ibuilding-x/driver-box/plugins/your_protocol/internal"
"github.com/stretchr/testify/assert"
)
func TestPlugin_Initialize(t *testing.T) {
plugin := &internal.Plugin{}
config := createTestConfig() // 创建测试配置
plugin.Initialize(config)
assert.NotNil(t, plugin.connPool)
}
func TestConnector_Encode(t *testing.T) {
conn := &internal.connector{}
values := []plugin.PointData{
{PointName: "temperature", Value: "25.5"},
}
result, err := conn.Encode("test_device", plugin.ReadMode, values...)
assert.NoError(t, err)
assert.NotNil(t, result)
}

创建测试配置文件 test_config.json

{
"deviceModels": [
{
"name": "测试设备",
"devices": [
{
"id": "test-device-001",
"connectionKey": "test-connection"
}
]
}
],
"connections": {
"test-connection": {
"host": "localhost",
"port": 8080,
"timeout": 5000,
"enable": true
}
},
"protocolName": "your_protocol"
}