Класс TagGroupReaderConnector

Класс является наследником класса BaseConnector.

Класс создан для реализации коннекторов, самостоятельно инициирующих чтение данных из источника. Такие коннекторы обычно читают несколько тегов, причём теги могут читаться с разной частотой.

Данный класс группирует теги по частоте чтения и для каждой группы запускает отдельную бесконечную задачу чтения, выполняемую с определённой частотой.

Атрибуты

_tag_groups - группы тегов. Атрибут имеет вид:

{
    <frequency_1>: {
        "tags": ["tag_id_1", "tag_id_2"]
    },
    <frequency_2>: {
        "tags": ["tag_id_3", "tag_id_4"]
    }
}

Если кэш группы должен содержать ещё какие-либо атрибуты, кроме tags, то необходимо переопределить метод default_tag_group.

Методы

def default_tag_group(self) - метод создаёт группу тегов по умолчанию в виде объекта:

{
    "tags": []
}

Если необходимо расширить группу дополнительными ключами, то метод необходимо переопределить:

def default_tag_group(self):
    res = super().default_tag_group()
    res["additional_key"] = "some value"
    return res

async def _read_group(self, frequency: float) - метод чтения тегов, запускаемый для каждой группы.

Абстрактный метод, обязательный для переопределения в классах-наследниках.

В отличие от класса BaseConnector, основная работа по чтению данных должна быть реализована именно в этом методе, а не в _read_tags.

Периодичность вызова обеспечивается классом.

async def _read_tags(self) - метод работает как задача и запускает, в свою очередь, для каждой группы тегов отдельную задачу чтения. Когда коннектор получает от платформы новую конфигурацию, этот метод получает сигнал прекращения работы и останавливает все задачи чтения данных.

В случае, если при старте работы коннектора необходимо выполнить дополнительную работу, к примеру, для каждой группы создать свой экземпляр клиента чтения данных, то метод должен быть переопределён:

async def _read_tags(self):

    for frequncy in self._tag_groups.keys():
        self._tag_groups[frequency]["client"] = SomeProtocolClient()

    super()._read_tags()

async def _remove_tag_cache(self, tag_id: str) - метод удаляет кэш тега. Если тег - последний в группе, то удаляется и сама группа.

Соответственно, если при удалении группы необходимо выполнить дополнительную работу, то переопределяем этот метод:

async def _remove_tag_cache(self, tag_id: str):
    frequency = self._config_from_platfrom.tags[tag_id].prsJsonConfigString.source.get("frequency", 5)

    tag_group = self._tag_groups[frequency]
    """
    Некоторая дополнительная работа с tag_group...
    """

    super()._remove_tag_cache(tag_id)