Оповещение о новых видео на YouTube. Python3, SQLite, BS4
Начнём как обычно с импрота зависимостей.
import requests from bs4 import BeautifulSoup import sqlite3 from time import sleep from aiogram import Bot, types from aiogram.dispatcher import Dispatcher from aiogram.utils import executor import asyncio
Кстати говоря... Aiogram поддерживает Python 3.7+. Еще - хотелось бы сказать спасибо комьюнити сообщества Aiogram за помощь, если вам тоже нужна помощь с каким либо своим проектом - можете задавать им вопросы в их чате:
https://t.me/aiogram_ru . Ну и хотелось бы сказать спасибо своему другу который помог мне с написанием конкретного элемента который я и объяснить то теперь не могу ;D. Но ничего .там всего 2 строки. Приступим к коду
Давайте сразу обозначим бота и всё что ему нужно.
TOKEN = "222222222:фффффффффф2-2ффффффффффs_ффффф" bot = Bot(token=TOKEN) dp = Dispatcher(bot)
Далее у меня идёт функция для проверки последнего видео на канале:
def video(link): r = requests.get(link) soup = BeautifulSoup(r.text, 'html.parser') last_video = soup.find_all("h3", {"class": "yt-lockup-title "})[0].find("a")["href"] return last_video
Если вы читали предыдущие статьи по парсингу страниц то для вас тут нет ничего сложно, а всем остальным предлагаю ознакомиться, т.к. там всё очень доступно описано. Данная функция возвращает нам короткую ссылку на видео вида /watch=v_askasjd28
Далее будем связываться с базой данных. Если говорить на чистоту то я писал это нереально много времени. >6 часов. Не хватает мне практики в бд. Ну ничего. Продолжим.
def bd(nubmer): con = sqlite3.connect("yt.db") cur = con.cursor() cur.execute(f'CREATE TABLE IF NOT EXISTS "{str(nubmer)}" ("1", "2" TEXT)') con.commit() cur.close() con.close()
Тут всё довольно таки просто (ведь это только начало)
Итак, сначала мы устанавливаем соединения с базой данных с помощью
con = sqlite3.connect("yt.db")
Затем мы создаём курсор. Курсор нужен для записи данных если говорить грубо.
Далее идёт что то более интересное.
cur.execute(f'CREATE TABLE IF NOT EXISTS "{str(nubmer)}" ("1", "2" TEXT)')
Тут мы делаем запрос к базе данных и создаем таблицу если она не создана. Если переводить то будет что то вроде этого: "Создать таблицу если её еще не существует {Название таблцы}" ("Название первого столбца" , "Название второго столбца", Тип данных обоих столбцов) Можно было бы спросить ,почему разные ковычки. Где то одинарные, где то двойные. Дело в том что допустим текст в двойных ковычках будет обозначаться в одинарных. это нужно было во время теста да и сейчас нужно. У меня очень костыльная база. Название столбцов это цифры, то есть с помощью цикла for можно пройтись по ним всем. Мне показалось это удобным в данном случае. Каждая таблица отвечает за определенный канал.
Идём далее.
con.commit() cur.close() con.close()
commit() нужен для того что бы изменения или добавления в базе данных сохранились. Затем нам необходимо закрыть соединения. Это обязательный пунк т.к. SQLite не поддерживает более одного активного подключения. Но мы ведь тут не игры пишем, нам и одного хватит С: главное не забывать закрывать соединения которые вы открыли.
Теперь уже можно приступить к написанию функции которая будет подсчитывать сколько у нас всего таблиц.
def check(): con = sqlite3.connect('yt.db') cur = con.cursor() meta = cur.execute("SELECT * FROM sqlite_master WHERE type='table';") disallowed_prefix = "sqlite_stat" last_table = [table[1] for table in meta if isinstance(table, tuple) and not table[1].startswith(disallowed_prefix)] con.commit() meta.close() cur.close() con.close() return last_table[-1]
Коннектимся к базе, создаём курсор. Всё как обычно С:, затем нам надо с помощью sqlite_master выбрать все что является таблицой. Что же такое sqlite_master? Понятия не имею ,но на хабре есть интересная статья по этому поводу, хочешь вникнуть? Прочти! https://habr.com/ru/post/223451/ . Далее идут 2 строки кода которые мне написали...Дя, не так хорош я в этих ваших базах данных ,но если говорить в кратце то страка last_table возвращает нам все названия таблиц в массиве. Что нам и нужно) Удивительно. Причем человек написал мне это с телефона. за минуту если не меньше. Затем мы опять же закрываем все соединения.
Далее была написана функция
def link_writer(link, nubmer): con = sqlite3.connect("yt.db") cur = con.cursor() mass = [] mass.append(link) mass.append(video(link)) cur.execute(f"INSERT INTO '{str(nubmer)}' VALUES(?,?)", (mass[0], mass[1])) con.commit() cur.close() con.close()
Тут идёт стандартный набор, законектился, получил курсор, создал пустой массив, добавил в массив то что передал в функцию, затем кинул ту же ссылку в функцию video которая была выше что бы вытащить короткую ссылку. и затем добавил это в базу данных. НО, сюда надо передать еще и число, а число это название таблицы. Вы поймете почему так - увидев следующею функцию.
Далее была сделана функция добавления канала в базу данных из бота. Поэтому она Async.
async def add_chanel(chanel_link): bd(int(check()) + 1) link_writer(chanel_link, check())
Ну и тут вроде уже всё встаёт на свои места, сначала мы создаём новую таблицу, что бы узнать какой нам нужен номер для создания таблицы - мы вызываем функцию check, она возвращает число, мы прибавляем к нему единицу, ведь мы получили последний элемент, нам надо добавить единицу что бы он стал уникальным. Так вот, вызвали функцию bd() и в неё передали число которое получили от check + 1, теперь нам надо записать новый канал, мы вызываем функцию link_writer( ), в неё передаём ссылку который пользователь должен будет прислать в бота. Ну и вместо числа подставляем функцию check(), которая вернёт последнею таблицу, но пустую, ведь мы её ток создали С:.
Далее для моего удобства была написана функция для отправки сообщений. Можете её заменить на стандартную функцию либы aiogram.
URL = "https://api.telegram.org/bot" + TOKEN + "/" def send_message(chat_id, text="Wait a second please..."): url = URL + "sendmessage?chat_id={}&text={}&parse_mode=HTML".format(chat_id, text) requests.get(url)
Передаём в функцию айди и текст, она отправляет сообщение через бота. Просто решил выдрать её из своего прошлого кода. Функция гипер простая, простой get запрос. Далее идёт самое сложное. Главная функция на которую было убито много времени. Изначально она была распилена на 4 функции, но потом было принято решение совместить все функции, ибо возникали проблемы с закрытием соединения с базой. За совет всё так же спасибо тому другу который написал для меня 2 важных строки кода.
def check_links(): con = sqlite3.connect('yt.db') cur = con.cursor() meta = cur.execute("SELECT * FROM sqlite_master WHERE type='table';") disallowed_prefix = "sqlite_stat" last_table = [table[1] for table in meta if isinstance(table, tuple) and not table[1].startswith(disallowed_prefix)] for i in range(1, int(last_table[-1]) + 1): channel_link = con.execute(f'SELECT "1" FROM "{str(i)}"') last_video = con.execute(f'SELECT "2" FROM "{str(i)}"') alla = channel_link.fetchall()[0][0] r = requests.get(alla) soup = BeautifulSoup(r.text, 'html.parser') last_video2 = soup.find_all("h3", {"class": "yt-lockup-title "})[0].find("a")["href"] if last_video.fetchall()[-1][0] != last_video2: print("Опа, новое видео") mass2 = [] cl = alla print(cl) mass2.append(cl) short_link = video(cl) mass2.append(short_link) cur.execute(f"INSERT INTO '{str(i)}' VALUES(?,?)", (mass2[0], mass2[1])) con.commit() send_message(771844687, "https://www.youtube.com" + short_link) else: print("Видео старое...")
Создаём коннект с базой, вроде уже дефолт, да? Далее все тем же методом который был в функции check() мы вытаскиваем последнею таблицу. Почему бы нам просто не вызвать функцию check? Да всё просто, не стоит открывать дополнительные соединения с базой, они могут не закрыться xD
Далее идёт обычный цикл, от одного до последней таблицы + 1. Т.К. начинаем с одного а не с нуля, т.к. у нас база с еденицы начинается (ага, идиот блеан, надо было с нуля начинать и не париться). В общем мы идём по каждой таблицы. из неё вытаскиваем ссылку на канал и короткую ссылку на видео. Идём ниже
alla = channel_link.fetchall()[0][0]
Прекрасная вещь, не знаю как она пришла ко мне в голову но с ней работает, а если передавать передавать channel_link.fetchall()[0][0] то оно так работать не будет. Связано с тем что мы уже один раз получили всех, и наш курсор остался в конце. еще раз всё без "перевхода" в базу данных мы получить не можем. Далее мы делаем запрос на полную ссылку :
r = requests.get(alla)
Затем мы опять же парсим по этой ссылке видео и получаем короткую ссылку для сравнения её с той что лежит у нас в БД. Далее сравниваем:
if last_video.fetchall()[-1][0] != last_video2: print("Опа, новое видео") mass2 = [] cl = alla mass2.append(cl) short_link = video(cl) mass2.append(short_link) cur.execute(f"INSERT INTO '{str(i)}' VALUES(?,?)", (mass2[0], mass2[1])) con.commit() send_message(123123, "https://www.youtube.com" + short_link) else: print("Видео старое...")
То есть что тут у нас происходит, мы сравниваем то что нам пришло с запроса и то что лежало в БД, итак, если оно не совпадает - значит вышло видео. Мы это печатаем в консоль, создаем пустой массив, обозначаем переменную через переменную, ибо по другому оно не хотело работать, далее добавляем её в массив, это полный линк на видео, затем мы достаём короткий линк на видео с помощью нашей функции video(), затем мы этот массив - добавляем в нужную нам таблицу. Как узнать что она нужная? та всё просто, мы ведь все еще в цикле for, где i и есть номер нашей таблицы. Далее сохраняем всё это, и отправляем сообщение пользователю с айди который вы укажите. Ну а если ссылка не новая то просто печатаем что видео старое. На это еще не всё.
@dp.message_handler(commands=['start']) async def process_start_command(message: types.Message): await message.reply("TEST TEST", reply=False) @dp.message_handler(commands=['add']) async def process_start_command(message: types.Message): chek_word = message.text.replace("/add ", "") await add_chanel(chek_word) await message.reply("TEST2 TEST2", reply=False)
Тут у нас два обработчика собщений которые могут поступать боту. Один на старт и он ответит "TEST TEST" а второй на добавление ссылки в базу данных. Проверял на купленном канале, отстук идёт, на счет других каналов хз, времени особо небыло, но думаю что ничего не измениться и он будет работать. Что же, идём дальше.
async def all(): while True: check_links() await asyncio.sleep(300)
Функция которая будет проверять каждые 5 минут (советую ставить каждые 20 - 12000 секунд) вышло ли новое видео. Почему я советую ставить каждые 20 минут? потому что когда пользователь выкладывает видео - оно весит 10 минут "на главной" на его канале и только потом появляется во вкладке "Видео".
Ну и последняя фунция, очень маленькая но жутко нужная :D .
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.create_task(all()) executor.start_polling(dp, loop=loop)
Сначала мы создаём loop, затем создаем ему задачу, задача - наша функция, которая будет выполнятся каждые 5 минут. Ну и запускаем бота. Можете тестить подняв у себя на пк / сервере. Ниже я оставлю полный листинг программы.
Ну и самое главное забыл сказать, для корректной работы программы - лучше всего создать отдельный python файл и через него записать первую таблицу базы данных, а точнее сделать таблицу "1" и задать ей соответсвенно линк на канал и короткую ссылку на видео. Это обязательно.
А , ну да, еще кое что) не кидайте просто ссылку на канал, переходите в раздел видео и только потом копируйте, по итогу у вас должна получится ссылка вида:
https://www.youtube.com/channel/qqqqqqzqqqqqqqqqqqqqqqqqqqqqqqq/videos
Обязательно /videos в конце.ну а что бы заполнить первую таблицу как я и просил выше предлагаю вам 2 пути. на основе моего кода написать свой или списать со скриншота)
Ну и хочется сказать - мой код не идеален, вообще это костыли - никому не советую так писать, лучше всего читайте больше ,пробуйте, развивайтесь. А на мой код смотрите как на плохой пример который хоть и работает но его лучше не повторять.
import requests from bs4 import BeautifulSoup import sqlite3 from time import sleep from aiogram import Bot, types from aiogram.dispatcher import Dispatcher from aiogram.utils import executor import asyncio TOKEN = "йййййй:йййййййййй-йййййййй_йййййййййй" bot = Bot(token=TOKEN) dp = Dispatcher(bot) def video(link): r = requests.get(link) soup = BeautifulSoup(r.text, 'html.parser') last_video = soup.find_all("h3", {"class": "yt-lockup-title "})[0].find("a")["href"] return last_video def bd(nubmer): con = sqlite3.connect("yt.db") cur = con.cursor() cur.execute(f'CREATE TABLE IF NOT EXISTS "{str(nubmer)}" ("1", "2" TEXT)') con.commit() cur.close() con.close() def check(): con = sqlite3.connect('yt.db') cur = con.cursor() meta = cur.execute("SELECT * FROM sqlite_master WHERE type='table';") disallowed_prefix = "sqlite_stat" last_table = [table[1] for table in meta if isinstance(table, tuple) and not table[1].startswith(disallowed_prefix)] con.commit() meta.close() cur.close() con.close() return last_table[-1] def link_writer(link, nubmer): con = sqlite3.connect("yt.db") cur = con.cursor() mass = [] mass.append(link) mass.append(video(link)) cur.execute(f"INSERT INTO '{str(nubmer)}' VALUES(?,?)", (mass[0], mass[1])) con.commit() cur.close() con.close() async def add_chanel(chanel_link): bd(int(check()) + 1) link_writer(chanel_link, check()) URL = "https://api.telegram.org/bot" + TOKEN + "/" def send_message(chat_id, text="Wait a second please..."): url = URL + "sendmessage?chat_id={}&text={}&parse_mode=HTML".format(chat_id, text) requests.get(url) def check_links(): con = sqlite3.connect('yt.db') cur = con.cursor() meta = cur.execute("SELECT * FROM sqlite_master WHERE type='table';") disallowed_prefix = "sqlite_stat" last_table = [table[1] for table in meta if isinstance(table, tuple) and not table[1].startswith(disallowed_prefix)] for i in range(1, int(last_table[-1]) + 1): channel_link = con.execute(f'SELECT "1" FROM "{str(i)}"') last_video = con.execute(f'SELECT "2" FROM "{str(i)}"') alla = channel_link.fetchall()[0][0] r = requests.get(alla) soup = BeautifulSoup(r.text, 'html.parser') last_video2 = soup.find_all("h3", {"class": "yt-lockup-title "})[0].find("a")["href"] if last_video.fetchall()[-1][0] != last_video2: print("Опа, новое видео") mass2 = [] cl = alla print(cl) mass2.append(cl) short_link = video(cl) mass2.append(short_link) cur.execute(f"INSERT INTO '{str(i)}' VALUES(?,?)", (mass2[0], mass2[1])) con.commit() send_message(771844687, "https://www.youtube.com" + short_link) else: print("Видео старое...") @dp.message_handler(commands=['start']) async def process_start_command(message: types.Message): await message.reply("TEST TEST", reply=False) @dp.message_handler(commands=['add']) async def process_start_command(message: types.Message): chek_word = message.text.replace("/add ", "") await add_chanel(chek_word) await message.reply("TEST2 TEST2", reply=False) async def all(): while True: check_links() await asyncio.sleep(300) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.create_task(all()) executor.start_polling(dp, loop=loop)
Всем удачи.