Разработка нового коннектора
Внимание
В документации используется термин «привязка»: тег «привязан» к коннектору. Это означает, что коннектор должен поставлять данные для этого тега.
Примечание
Предположим, что создан проект для нового коннектора и установлен пакет prs_connector_core.
Самый простой коннектор для протокола InduX
В качестве примера напишем самый простой коннектор для некоторого выдуманного протокола InduX.
Допустим, протокол не предполагает подписки на изменения значения параметров, и данные клиенту необходимо читать самому с определённой частотой.
Кроме того, для того, чтобы не возиться с низкоуровневым разбором байтов, предположим, что у нас есть
пакет py_indux для чтения данных с устройств, работающих по протоколу InduX.
Предварительная работа
Перед разработкой коннектора необходимо определить:
Способ связи с устройством, работающим по этому протоколу и, соответственно, какая информация необходима коннектору, чтобы он мог связаться с источником данных.
Как получать данные для определённого параметра (тега) из источника.
Пример для протокола Modbus RTU
Для связи с устройством необходимо знать:
порт, на котором «висит» устройство;
скорость передачи данных;
количество бит в байте;
проверка чётности;
количество стоп-бит;
таймаут
Таким образом, коннектору для связи с устройством необходимо передать всю эту информацию примерно в такой структуре:
{ "port": "/dev/ttyUSB0", "baudrate": 115200, "bytesize": 8, "parity": "N", "stopbits": 1, "timeout": 3 }Для чтения данных какого-либо параметра необходимо знать:
тип регистров
номер стартового регистра
количество регистров для чтения
тип хранимого значения
номер устройства
Таким образом, каждый тег, привязанный к коннектору, должен содержать примерно такую структуру:
{ "regType": "H", # H - holding, D - discrete, C - coils, I - input "address": 6, # адрес регистра для чтения "count": 2, # количество регистров, которые необходимо прочитать "slave": 1, # номер устройства "dataType": "FLOAT32", # тип хранимого значения "frequency": 5 # частота (в секундах), с которой необходимо читать значение тега }Предположим, что к коннектору привязан только один тег. Тогда, в конечном итоге, при старте коннектор должен получить полную конфигурацию от платформы примерно такого вида:
{ "action": "prsConnector.full_configuration", "data": { "prsActive": True, # коннектор активен "prsEntityTypeCode": None, # не обращаем внимания "prsJsonConfigString": { "source": { # как подключаться к источнику данных "port": "/dev/ttyUSB0", "baudrate": 115200, "bytesize": 8, "parity": "N", "stopbits": 1, "timeout": 3 }, "log": { # конфигурация журнала логов "level": "INFO", "fileName": "logs/prs_connector.log", "maxBytes": 10485760, "backupCount": 10 } }, "tags": { # список тегов, привязанных к коннектору "ac620c9e-8fd1-103f-9850-bf1c6e0c417e": { # идентификатор тега "prsActive": True, # флаг активности тега "prsJsonConfigString": { "source": { # способ получения данных тега из источника "regType": "H", "address": 6, "count": 2, "slave": 1, "dataType": "FLOAT32", "frequency": 5 }, "maxDev": 0.5, # минимальное значимое отклонение значения "JSONata": "$abs($)>0?1:0" # предположим, что в источнике данных # по указанному адресу хранится мгновенное значение # тока # а наш тег - это флаг того, работает устройство или нет # соответственно, если ток > 0, то устройство работает # записанное в этом ключе JSONata выражение # будет делать то, что нам надо: # возращать 1, если ток > 0 и 0 в противном случае }, "prsValueTypeCode": 0 # тип значений тега: # 0 = int } } } }
Допустим, протокол InduX очень прост:
Устройство подключается через порт USB и имя подключаемого порта -
/dev/ttyUSB0.Для чтения текущего значения какого-либо параметра из устройства достаточно указать его имя.
Внимание
Для упрощения кода примем, что теги читаются из источника с одной, общей для всех частотой, поэтому эта частота указывается в конфигурации всего коннектора, а не для каждого тега отдельно.
Таким образом, полная конфигурация, которая будет приходить коннектору, примет примерно такой вид:
{
"action": "prsConnector.full_configuration",
"data": {
"prsActive": True, # коннектор активен
"prsEntityTypeCode": None, # не обращаем внимания
"prsJsonConfigString": {
"source": { # как подключаться к источнику данных
"port": "/dev/ttyUSB0",
"baudrate": 115200,
"bytesize": 8,
"parity": "N",
"stopbits": 1,
"timeout": 3,
"frequency": 1 # частота чтения данных из источника
},
"log": { # конфигурация журнала логов
"level": "INFO",
"fileName": "logs/prs_connector.log",
"maxBytes": 10485760,
"backupCount": 10
}
},
"tags": { # Список тегов, привязанных к коннектору.
"ac620c9e-8fd1-103f-9850-bf1c6e0c417e": { # Идентификатор тега.
# Допустим, тег - это ток,
# причём источник отдаёт данные в виде
# целого числа, в миллиамперах.
# Модель же, реализованная в платформе, предполагает,
# что ток - вещественное числои и измеряется в амперах.
# Соответственно, прочитанное значение нам нужно
# разделить на 1000.
"prsActive": True, # флаг активности тега
"prsJsonConfigString": {
"source": { # способ получения данных тега из источника
"name": "current"
},
"maxDev": 0.1, # минимальное значимое отклонение - 0.1 А
"JSONata": "$/1000" # выражение преобразует миллиамперы в амперы
},
"prsValueTypeCode": 0 # тип значений тега:
# 1 = float
}
}
}
}
Код коннектора
1import sys
2import asyncio
3from time import time
4# включаем в проект пакет для низкоуровневой работы с протоколом InduX
5from py_indux import InduXClient, InduXException
6# из пакета с базовым классом коннектора
7# импортируем сам базовый класс и функцию,
8# возвращающую текущую метку времени
9from prs_connector_core import BaseConnector, ts, main
10
11class InduXConnector(BaseConnector):
12
13 # В общем случае, это единственный метод, который мы должны переопределить у базового класса.
14 # Метод запускается базовым классом в виде отдельной задачи
15 # каждый раз, когда от платформы приходит исправленная конфигурация
16 # (не считая изменений, касающихся тегов, для них - отдельные сообщения).
17 # Таким образом, метод _read_tags останавливается и после обновления конфигурации запускается вновь
18 async def _read_tags(self):
19
20 async def get_data():
21 # Для упрощения предположим, что коннектор читает все теги из источника
22 # с одной частотой.
23 # В противном случае придётся разбивать теги по группам в зависимости от частоты
24 # чтения и запускать для каждой группы свою задачу чтения данных из источника.
25 interval = self._config_from_platfrom.prsJsonConfigString.source["frequency"]
26 while True:
27
28 # Зафиксируем время начала выполнения чтения.
29 start_time = time()
30
31 # Заготовка для пакета данных.
32 data = {"data": []}
33 # Время в платформе хранится как количество микросекунд, прошедших, начиная
34 # с 01-01-1970 00:00:00 UTC.
35 # Функция ts() текущую метку времени в указанном формате.
36 # Предполагаем, что источник данных не возвращает метку времени вместе со значением,
37 # поэтому мы должны присвоить её сами.
38 current_time = ts()
39 # пробегаем циклом по всем тегам в конфигурации
40 for tag_id, tag_config in self._config_from_platfrom.tags:
41
42 # читаем значение каждого тега из конфигурации
43 value = await self._client.read(tag_config.prsJsonConfigString.source["name"])
44 # формируем пакет данных
45 data["data"].append(
46
47 {
48 "tagId": tag_id,
49 "data": [
50
51 [value, current_time]
52
53 ]
54
55 }
56
57 )
58
59 # Помещаем сформированный пакет данных в очередь сообщений.
60 self._data_queue.put_nowait(data)
61
62 self._logger.info("Цикл чтения данных завершён.")
63
64 # Считаем, сколько времени заняло чтение данных.
65 elapsed = time() - start_time
66 wait_time = max(0, interval - elapsed)
67 # Делаем задержку между циклами чтения.
68 await asyncio.sleep(wait_time)
69
70 while True:
71 try:
72 # Соединяемся с источником данных
73 source = self._config_from_platfrom.prsJsonConfigString.source
74 self._client = InduXClient(
75
76 port=source["port"],
77 baudrate=source["baudrate"],
78 bytesize=source["bytesize"],
79 parity=source["parity"],
80 stopbits=source["stopbits"],
81 timeout=source["timeout"]
82
83 )
84
85 await self._client.connect()
86
87 self._logger.info("Соединение с источником установлено.")
88
89 # Запускаем цикл чтения данных.
90 await get_data()
91 except InduXException as ex:
92 self._logger.exception(f"Ошибка чтения данных: {ex}")
93
94 # При ошибке чтения записываем в каждый тег значение None
95 # с кодом качества = 102, что обозначает разрыв связи коннектора
96 # с источником данных.
97 data = {"data": []}
98 current_time = ts()
99 for tag_id, tag_config in self._config_from_platfrom.tags:
100
101 data["data"].append(
102
103 {"tagId": tag_id, "data": [[None, current_time, 102]]}
104
105 )
106
107 self._data_queue.put_nowait(data)
108
109if __name__ == "__main__":
110 main(InduXConnector)