Разработка нового коннектора

Внимание

В документации используется термин «привязка»: тег «привязан» к коннектору. Это означает, что коннектор должен поставлять данные для этого тега.

Примечание

Предположим, что создан проект для нового коннектора и установлен пакет prs_connector_core.

Самый простой коннектор для протокола InduX

В качестве примера напишем самый простой коннектор для некоторого выдуманного протокола InduX.

Допустим, протокол не предполагает подписки на изменения значения параметров, и данные клиенту необходимо читать самому с определённой частотой.

Кроме того, для того, чтобы не возиться с низкоуровневым разбором байтов, предположим, что у нас есть пакет py_indux для чтения данных с устройств, работающих по протоколу InduX.

Предварительная работа

Перед разработкой коннектора необходимо определить:

  1. Способ связи с устройством, работающим по этому протоколу и, соответственно, какая информация необходима коннектору, чтобы он мог связаться с источником данных.

  2. Как получать данные для определённого параметра (тега) из источника.

Допустим, протокол InduX очень прост:

  1. Устройство подключается через порт USB и имя подключаемого порта - /dev/ttyUSB0.

  2. Для чтения текущего значения какого-либо параметра из устройства достаточно указать его имя.

Внимание

Для упрощения кода примем, что теги читаются из источника с одной, общей для всех частотой, поэтому эта частота указывается в конфигурации всего коннектора, а не для каждого тега отдельно.

Таким образом, полная конфигурация, которая будет приходить коннектору, примет примерно такой вид:

{
    "action": "prsConnector.full_configuration",
    "data": {
        "prsActive": True,              # коннектор активен
        "prsEntityTypeCode": None,      # не обращаем внимания
        "prsJsonConfigString": {
            "source": {                 # как подключаться к источнику данных
                "port": "/dev/ttyUSB0",
                "baudrate": 115200,
                "bytesize": 8,
                "parity": "N",
                "stopbits": 1,
                "timeout": 3,
                "frequency": 1         # частота чтения данных из источника
            },
            "log": {                # конфигурация журнала логов
                "level": "INFO",
                "fileName": "logs/prs_connector.log",
                "maxBytes": 10485760,
                "backupCount": 10
            }
        },
        "tags": {                                           # Список тегов, привязанных к коннектору.
            "ac620c9e-8fd1-103f-9850-bf1c6e0c417e": {       # Идентификатор тега.
                                                            # Допустим, тег - это ток,
                                                            # причём источник отдаёт данные в виде
                                                            # целого числа, в миллиамперах.
                                                            # Модель же, реализованная в платформе, предполагает,
                                                            # что ток - вещественное числои и измеряется в амперах.
                                                            # Соответственно, прочитанное значение нам нужно
                                                            # разделить на 1000.
                "prsActive": True,                          # флаг активности тега
                "prsJsonConfigString": {
                    "source": {                             # способ получения данных тега из источника
                        "name": "current"
                    },
                    "maxDev": 0.1,                          # минимальное значимое отклонение - 0.1 А
                    "JSONata": "$/1000"                     # выражение преобразует миллиамперы в амперы
                },
                "prsValueTypeCode": 0                       # тип значений тега:
                                                            # 1 = float
            }
        }
    }
}

Код коннектора

  1import sys
  2import asyncio
  3from time import time
  4# включаем в проект пакет для низкоуровневой работы с протоколом InduX
  5from py_indux import InduXClient, InduXException
  6# из пакета с базовым классом коннектора
  7# импортируем сам базовый класс и функцию,
  8# возвращающую текущую метку времени
  9from prs_connector_core import BaseConnector, ts, main
 10
 11class InduXConnector(BaseConnector):
 12
 13    # В общем случае, это единственный метод, который мы должны переопределить у базового класса.
 14    # Метод запускается базовым классом в виде отдельной задачи
 15    # каждый раз, когда от платформы приходит исправленная конфигурация
 16    # (не считая изменений, касающихся тегов, для них - отдельные сообщения).
 17    # Таким образом, метод _read_tags останавливается и после обновления конфигурации запускается вновь
 18    async def _read_tags(self):
 19
 20        async def get_data():
 21            # Для упрощения предположим, что коннектор читает все теги из источника
 22            # с одной частотой.
 23            # В противном случае придётся разбивать теги по группам в зависимости от частоты
 24            # чтения и запускать для каждой группы свою задачу чтения данных из источника.
 25            interval = self._config_from_platfrom.prsJsonConfigString.source["frequency"]
 26            while True:
 27
 28                # Зафиксируем время начала выполнения чтения.
 29                start_time = time()
 30
 31                # Заготовка для пакета данных.
 32                data = {"data": []}
 33                # Время в платформе хранится как количество микросекунд, прошедших, начиная
 34                # с 01-01-1970 00:00:00 UTC.
 35                # Функция ts() текущую метку времени в указанном формате.
 36                # Предполагаем, что источник данных не возвращает метку времени вместе со значением,
 37                # поэтому мы должны присвоить её сами.
 38                current_time = ts()
 39                # пробегаем циклом по всем тегам в конфигурации
 40                for tag_id, tag_config in self._config_from_platfrom.tags:
 41
 42                    # читаем значение каждого тега из конфигурации
 43                    value = await self._client.read(tag_config.prsJsonConfigString.source["name"])
 44                    # формируем пакет данных
 45                    data["data"].append(
 46
 47                        {
 48                            "tagId": tag_id,
 49                            "data": [
 50
 51                                [value, current_time]
 52
 53                            ]
 54
 55                        }
 56
 57                    )
 58
 59                # Помещаем сформированный пакет данных в очередь сообщений.
 60                self._data_queue.put_nowait(data)
 61
 62                self._logger.info("Цикл чтения данных завершён.")
 63
 64                # Считаем, сколько времени заняло чтение данных.
 65                elapsed = time() - start_time
 66                wait_time = max(0, interval - elapsed)
 67                # Делаем задержку между циклами чтения.
 68                await asyncio.sleep(wait_time)
 69
 70        while True:
 71            try:
 72                # Соединяемся с источником данных
 73                source = self._config_from_platfrom.prsJsonConfigString.source
 74                self._client = InduXClient(
 75
 76                    port=source["port"],
 77                    baudrate=source["baudrate"],
 78                    bytesize=source["bytesize"],
 79                    parity=source["parity"],
 80                    stopbits=source["stopbits"],
 81                    timeout=source["timeout"]
 82
 83                )
 84
 85                await self._client.connect()
 86
 87                self._logger.info("Соединение с источником установлено.")
 88
 89                # Запускаем цикл чтения данных.
 90                await get_data()
 91            except InduXException as ex:
 92                self._logger.exception(f"Ошибка чтения данных: {ex}")
 93
 94                # При ошибке чтения записываем в каждый тег значение None
 95                # с кодом качества = 102, что обозначает разрыв связи коннектора
 96                # с источником данных.
 97                data = {"data": []}
 98                current_time = ts()
 99                for tag_id, tag_config in self._config_from_platfrom.tags:
100
101                    data["data"].append(
102
103                        {"tagId": tag_id, "data": [[None, current_time, 102]]}
104
105                    )
106
107                self._data_queue.put_nowait(data)
108
109if __name__ == "__main__":
110    main(InduXConnector)