LLM
January 19

Как я настроил саммаризацию постов с помощью YandexGPT

В этой статье я расскажу, как с помощью продуктов Яндекса настроить хранение, автоматическое создание краткого содержания (саммаризацию) и визуализацию постов телеграм-канала. Для меня, как автора одного из таких каналов и довольно педантичного человека, это эффективный способ систематизировать работу с контентом. Кроме того, это интересная возможность на практике познакомиться с LLM от компании Яндекс — YandexGPT.

Lego HR-аналитики используют YandexGPT

Получение данных из Telegram

Начнем с того, что научимся парсить телеграм-посты. Это единственная часть повествования, где я буду использовать стороннее решение, не относящееся к продуктам Яндекса.

Я использую tdl ("Telegram Downloader, but more than a downloader") [1]. Обратитесь к документации [2] для самостоятельной настройки этого инструмента на своей машине.

После настройки переходим в Python и читаем сообщения канала за всё время.

# Подключаем библиотеку subprocess
import subprocess

# Создаем функцию для выполнения команд tdl из Python
def run_tdl_export():
    command = [
        "tdl",
        "chat",
        "export",
        "-n",
        "quickstart",
        "-c",
        "<YOUR TELEGRAM CHANNEL ID>", #id вашего телеграм канала
        "--all",
        "--with-content",
        "-o",
        "C:/db/tg.json" #директория и файл, в который записан результат
    ]

    subprocess.run(command)

# Запускаем исполнение
if __name__ == "__main__":
    run_tdl_export())

Данные записываются в файл формата JSON. Теперь прочитаем их с помощью Python.

# Подключаем библиотеку json
import json

# Читаем файл из нашей директории
with open('C:\\db\\tg.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

Пример загруженных данных

Для удобства представления я хочу трансформировать JSON в пандас датафрейм и добавить несколько полезных колонок.

#Подключаем необходимые библиотеки
import pandas as pd
import re
import datetime as dt


# Создаем ряд вспомогательных функций

# Функция для конвертации Unix-времени в ISO-формат
def convert_unix_to_date(timestamp):
    dt_object = dt.datetime.utcfromtimestamp(timestamp)
    formatted_date = dt_object.strftime('%Y-%m-%d')
    return formatted_date
    
# Функция для извлечения хештегов в виде списка
pattern = re.compile(r'#(\w+)')

def find_hashtags(text):
    return [hashtag.lower() for hashtag in pattern.findall(str(text))]
    
# Функция для извлечения хештегов в виде строки
def find_hashtags_string(text):
    return ', '.join([hashtag.lower() for hashtag in pattern.findall(str(text))])
    
# Ручной классификатор тем на основе ключевых фраз
def text_category(text):   
    phrases_categories = {
        'регрессиада': 'Регрессиада',
        'вы возглавляете hr-аналитику. что теперь?': 'Вы возглавляете HR-аналитику. Что теперь?',
        'анализ hr вакансий': 'Анализ HR вакансий',
        'hr дашборд в excel': 'HR дашборд в Excel',
        'байесовский фреймворк и hr задачи': 'Байесовский фреймворк и HR задачи',
        'нормальное распределение': 'Обзорная статистическая серия',
        'логарифмирование и преобразование переменных': 'Обзорная статистическая серия',
        'доверительные интервалы': 'Обзорная статистическая серия',
        'корреляция. теория': 'Обзорная статистическая серия',
        'корреляция. практика': 'Обзорная статистическая серия',
        'статистическая значимость и размер эффекта. теория': 'Обзорная статистическая серия',
        'статистическая значимость и размер эффекта. практика': 'Обзорная статистическая серия',
        'как перестать бояться и полюбить r': 'Как перестать бояться и полюбить R'
    }
    text_lower = text.lower()
    
    for phrase, category in phrases_categories.items():
        if phrase in text_lower:
            return category
        
    return None 
    
# # Трансформируем данные в Pandas DataFrame
df = pd.json_normalize(data['messages'])

df['date'] = df['date'].apply(convert_unix_to_date) #Конвертация даты
df['title'] = df['text'].apply(lambda x: str(x).split('\n')[0]) #Первая строка текста как заголовок
df['link'] = 'https://t.me/h0h1_hr_analytics/' + df['id'].astype(str) #Генерация ссылки
df['hashtags'] = df['text'].apply(find_hashtags) #Хештеги как список
df['hashtags_string'] = df['text'].apply(find_hashtags_string) #Хештеги как строка
df['refreshed_date'] =  dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S') #Дата обновления
df['series'] = df['title'].apply(text_category) #Категория текста

Пример датафрейма

Генерация саммари с YandexGPT API

Для дальнейшей работы потребуется создать Яндекс ID и выполнить ряд настроек в сервисе Yandex Cloud. Вы можете обратиться к официальной документации, однако более быстрым и понятным вариантом я считаю первую часть статьи [3], где автор подробно описал все шаги в удобном формате.

К слову, во второй части этой статьи показано, как работать с API YandexGPT через HTTP-запросы. Мы же пойдем другим путем и будем использовать SDK для упрощения интеграции.

# Подключаем необходимые библиотеки
from __future__ import annotations
from yandex_cloud_ml_sdk import YCloudM

# Настраиваем модель YandexGPT через SDK
sdk = YCloudML(
    folder_id="<YOUR FOLDER ID>", # Замените на ваш ID каталога в Yandex Cloud
    auth="<YOUR TOKEN>"           # Укажите ваш токен аутентификации
)

model = sdk.models.completions("yandexgpt") # Инициализируем модель YandexGPT
model = model.configure(temperature=0.5) # Регулируем "креативность" модели

Я хочу написать такую фукнцию, которая будет делать короткое саммари не более 200 символов на основе текста моих постов (YandexGPT не всегда будет удерживать требование к длине, но нарушения не критичны).

# Подключаем библиотеку time
import time

# Генерация саммари для всех постов
def get_summary(text, delay=1):
    
    if not text:
        return ""
    try:
        time.sleep(delay) # Небольшая задержка для распределения запросов
        messages = [
            {"role": "system", "text": "Сделай очень краткий пересказ текста. Длина не более 200 знаков."},
            {"role": "user", "text": text},
        ]
        result = model.run(messages)
        for alternative in result:
            return alternative.text.strip() # Возвращаем первое подходящее саммари
    except Exception as e:
        print(f"Error processing text: {e}")
        return "" # Возвращаем пустую строку в случае ошибки

Прежде, чем вызвать функцию помните, что использование API YandexGPT производится на платной основе! С тарифами можно ознакомиться на странице документации [4], а на странице биллинга вашего сервиса вы можете отслеживать текущие затраты.

Чтобы минимизировать расходы, я напишу вторую функцию. Она будет генерировать саммари только для новых постов, избегая повторной обработки старых записей. Такой подход позволит сохранить бюджет и оптимизировать использование API.

# Генерация саммари только для новых постов
def conditional_summary(row, delay=1):
    if row['is_new']: # Проверяем, является ли пост новым
        time.sleep(delay)
        return get_summary(row['text'], delay=0)
    else:
        # Возвращаем существующее саммари для старого поста
        return current_table.loc[current_table['id'] == row['id'], 'summary'].values[0]

В первый раз применим нашу функцию для всех данных и запишем результат в новую колонку. В моем примере 205 телеграм постов, время работы функции заняло шесть минут, а стоимость составила 56 рублей.

df['summary'] = df['text'].apply(get_summary)

Преьвю саммаризацит постов в колонке summary

Хранение в ClickHouse

Поскольку большую часть проекта я решил реализовать с помощью продуктов Яндекса, результаты будем хранить в базе данных ClickHouse — одном из моих любимых инструментов. Однако вы, конечно, можете выбрать любое другое решение, которое вам больше по душе.

Разворачивается кластер также в среде Yandex Cloud, для чего вы можете обратиться к официальной документации или описанию с картинками от другого автора [5]. Отдельная благодарность сотруднику моей команды - Диме Майорову, который помог мне настроить ClickHouse еще в 2023 году.

После выполнения шагов по настройке базы, возвращаемся в Python и записываем результат в таблицу. Я выбрал путь полной перезаписи каждый раз, так как он для меня более удобен, но вы можете предусмотреть иную реализацию.

# Подключаем библиотеку clickhouse_connect
import clickhouse_connect

# Аутентификация и подключение к базе данных
client = clickhouse_connect.get_client(
    host='<YOUR HOST>',          # Хост ClickHouse-сервера
    port=<YOUR PORT>,            # Порт ClickHouse-сервера
    database='<YOUR DATABASE>',  # Имя базы данных
    username='<YOUR USERNAME>',  # Имя пользователя
    password='<YOUR PASSWORD>',  # Пароль пользователя
    client_cert_key='<YOUR CERTIFICATE ROOT>'  # Корневой сертификат для безопасного соединения (если требуется)
)

# Создаем таблицу, если она не существует
create_table_query = """
    CREATE TABLE IF NOT EXISTS tg_topics (
        id Int32,
        type String,
        file String,
        date Date,
        text String,
        title String,
        link String,
        hashtags Array(String),
        hashtags_string String,
        summary String,
        refreshed_date DateTime,
        series String
    ) ENGINE = MergeTree()
    ORDER BY id
"""

client.command(create_table_query)

# Очищаем таблицу
truncate_query = "TRUNCATE TABLE tg_topics"
client.command(truncate_query)

# Подготовка данных и запись в базу
csv_data = df.to_csv(index=False)
insert_query = f"INSERT INTO tg_topics FORMAT CSV {csv_data}"
client.command(insert_query)

Теперь мы можем добавить условия для генерации саммари только для новых постов. Идея заключается в том, чтобы для уже обработанных постов извлекать результаты из базы данных по их id.

# Считываем текущие записи из базы данных
read_query = "SELECT * FROM tg_topics"
current_table = client.query_df(read_query)

# Получаем список уже существующих ID
existing_ids = set(current_table['id'].tolist())

# Определяем, какие посты новые
df['is_new'] = ~df['id'].isin(existing_ids)

# Генерируем саммари
df['summary'] = df.apply(conditional_summary, axis=1)

Эти шаги необходимо вынести в начало выполнения скрипта, чтобы обеспечить корректное определение новых постов и исключить повторную обработку старых данных. Для получения полного финального кода вы можете обратиться к готовому скрипту в моем GitHub репозитории [6].

Визуализация в DataLens

Для визуализации данных мы воспользуемся еще одним продуктом Яндекса, а именно DataLens. У меня есть небольшой видеоурок, где я даю краткий курс по работе с этим инструментом [7]. Кроме того, вы можете обратиться к официальным справочным материалам и видео.

Вначале создадим подключение ClickHouse к нашей базе данных заполнив все необходимые плейсхолдеры.

В моей базе данных всего две таблицы, одна с постами телеграм-канала и вторая с HR-вакансиями HH. Настроим датасет на основе подходящей таблицы, все посты, которые не содержат тегов - я исключаю.

Переименуем часть полей на русский язык для более удобного отображения в будущем дашборде.

Создаю два дополнительны поля:

  1. Тема: используя выражение URL([link], LEFT([title], 100)), мы создаем кликабельное название поста. Таким образом, при клике на название пользователь будет перенаправлен на сам пост.
  2. hashtag_filter: поле, созданное с помощью функции UNNEST([Хештеги для фильтров]), позволяет настраивать фильтрацию по хештегам. Именно поэтому мы храним хештеги как в виде списка, так и в виде строки.

Далее я воспользуюсь обычной таблицей для визуализации.

Осталось только вынести таблицу на дашборд [8] и добавить необходимые фильтры.

Если вас интересует вопрос, как сделать весь процесс по сбору и хранению данных автоматическим, то вы можете воспользоваться моей статьей из серии про парсинг вакансий HH, где я разбираю этот аспект [9].

Ссылки

  1. https://github.com/iyear/tdl
  2. https://docs.iyear.me/tdl/getting-started/installation/
  3. Как подключить Yandex GPT к своему проекту на Python
  4. Правила тарификации для Yandex Foundation Models
  5. Инженер на минималках: установка и настройка ClickHouse
  6. https://github.com/alexander-botvin/h0h1_about_hr_analytics/blob/main/How%20I%20set%20up%20post%20summarization%20using%20YandexGPT/tg_topics.ipynb
  7. HR вакансии в DataLens
  8. https://datalens.yandex/kzgpuwj4mj169
  9. Ботвин А.Ю. Анализ HR вакансий. Часть 4. Автоматизация процесса