Класс TagGroupReaderConnector
Класс является наследником класса BaseConnector.
Класс создан для реализации коннекторов, самостоятельно инициирующих чтение данных из источника. Такие коннекторы обычно читают несколько тегов, причём теги могут читаться с разной частотой.
Данный класс группирует теги по частоте чтения и для каждой группы запускает отдельную бесконечную задачу чтения, выполняемую с определённой частотой.
Требование к конфигурации привязанного к коннектору тега
Коннектор вводит дополнительное правило: атрибут
sourceв конфигурации, хранимой вprsJsonConfigStringпривязанного тега, должен содержать ключfrequency, значение которого - частота чтения тега из источника в секундах. Тип значения - вещественное.В случае, если ключ
frequencyотсутствует, то принимается значение по умолчанию в пять секунд.
Атрибуты
_tag_groups - группы тегов. Атрибут имеет вид:
{
<frequency_1>: {
"tags": ["tag_id_1", "tag_id_2"]
},
<frequency_2>: {
"tags": ["tag_id_3", "tag_id_4"]
}
}
Если кэш группы должен содержать ещё какие-либо атрибуты, кроме tags,
то необходимо переопределить метод default_tag_group.
Методы
def default_tag_group(self) - метод создаёт группу тегов по умолчанию в виде объекта:
{
"tags": []
}
Если необходимо расширить группу дополнительными ключами, то метод необходимо переопределить:
def default_tag_group(self):
res = super().default_tag_group()
res["additional_key"] = "some value"
return res
async def _read_group(self, frequency: float) - метод чтения тегов, запускаемый для каждой группы.
Абстрактный метод, обязательный для переопределения в классах-наследниках.
В отличие от класса BaseConnector, основная работа по чтению данных
должна быть реализована именно в этом методе, а не в _read_tags.
Периодичность вызова обеспечивается классом.
async def _read_tags(self) - метод работает как задача и запускает, в свою очередь, для каждой группы
тегов отдельную задачу чтения. Когда коннектор получает от платформы новую конфигурацию,
этот метод получает сигнал прекращения работы и останавливает все задачи чтения данных.
В случае, если при старте работы коннектора необходимо выполнить дополнительную работу, к примеру, для каждой группы создать свой экземпляр клиента чтения данных, то метод должен быть переопределён:
async def _read_tags(self):
for frequncy in self._tag_groups.keys():
self._tag_groups[frequency]["client"] = SomeProtocolClient()
super()._read_tags()
async def _remove_tag_cache(self, tag_id: str) - метод удаляет кэш тега.
Если тег - последний в группе, то удаляется и сама группа.
Соответственно, если при удалении группы необходимо выполнить дополнительную работу, то переопределяем этот метод:
async def _remove_tag_cache(self, tag_id: str):
frequency = self._config_from_platfrom.tags[tag_id].prsJsonConfigString.source.get("frequency", 5)
tag_group = self._tag_groups[frequency]
"""
Некоторая дополнительная работа с tag_group...
"""
super()._remove_tag_cache(tag_id)