Как скрапить данные с YouTube при помощи Python

Комментарии: 0

Создателям контента на YouTube необходимо тщательно изучать метрики видео, анализировать комментарии, а также проводить сравнение своих работ с видео других авторов. Незаменимым инструментом для выполнения анализа необходимой выборки видео является скрипт для скрапинга YouTube. В этом руководстве мы разработаем скрипт, который позволит автоматизировать процесс сбора таких данных, как владелец, название, описание канала и комментарии под видео..

Создание скрапера для извлечения данных с YouTube

Для правильной работы скрипта необходимо установить несколько пакетов. Начнем с установки “selenium-wire”, который обеспечивает расширенные возможности для настройки прокси, а также сам Selenium, содержащий основные классы и модули. Для установки этих пакетов введите следующую команду в командной строке:

pip install selenium-wire selenium blinker==1.7.0

Теперь перейдем к импорту дополнительных компонентов.

Шаг 1: Импорт библиотек и пакетов

На данном этапе необходимо импортировать библиотеки и пакеты, которые будут использованы в нашем скрипте для работы с веб-элементами, а также дополнительные модули для обработки данных и управления временем выполнения.

from selenium.webdriver.chrome.options import Options
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import json
import time

Модуль “json” играет ключевую роль в преобразовании извлеченных данных в правильно отформатированные JSON-структуры, что обеспечивает их оптимальное отображение. Помимо использования маскировки IP-адреса, модуль “time” также необходим для внедрения случайных задержек между действиями скрипта, что помогает минимизировать риск обнаружения скриптов по поведенческим паттернам.

Кроме того, модуль time важен для обеспечения корректной загрузки элементов на странице, с которых предстоит извлекать данные. В следующих разделах мы более подробно рассмотрим другие используемые модули.

Шаг 2: Настройка драйвера Chrome для Selenium

Каждый раз, когда вы запускаете Selenium с использованием Python скрипта, он использует ваш IP-адрес для выполнения любых действий. Это может быть рискованно, особенно для сайтов с системами защиты от скрапинга, таких как YouTube. Возможными последствиями могут быть ограничения на доступ к контенту YouTube с вашего IP-адреса.

Для избежания таких ситуаций необходимо предпринять несколько действий. Сначала создайте три переменные для хранения данных о прокси, через которые будет осуществляться доступ к странице. Затем создайте переменную “chrome_options”, которую интегрируем в Chrome WebDriver. Это позволит Selenium определить, какой прокси использовать при скрапинге. Данные прокси необходимо направить в виде аргументов для “chrome_options”, таким образом завершая интеграцию их в скрипт.

# Укажите адрес прокси-сервера с логином и паролем
proxy_address = ""
proxy_username = ""
proxy_password = ""
# Настройте параметры Chrome для работы с прокси
chrome_options = Options()
chrome_options.add_argument(f'--proxy-server={proxy_address}')
chrome_options.add_argument(f'--proxy-auth={proxy_username}:{proxy_password}')
# Создайте экземпляр WebDriver с selenium-wire
driver = wiredriver.Chrome(options=chrome_options)

Шаг 3: Извлечение данных со страницы видео на YouTube

Создайте переменную “youtube_url_to_scrape”, которая будет хранить URL целевой страницы на YouTube. Эта переменная затем используется в методе “driver.get()”, чтобы указать Selenium открыть определенную страницу для скрапинга. Это действие приведет к открытию отдельного окна Chrome при выполнении скрипта.

youtube_url_to_scrape = ""
# Выполните автоматизацию Selenium используя selenium-wire.
driver.get(youtube_url_to_scrape)

Теперь определим функцию “extract_information()”, которая извлекает необходимую информацию со страницы.

Важно убедиться, что все элементы на странице загружены. Для этой задачи мы используем класс WebDriverWait для приостановки выполнения скрипта до момента, когда кнопка “more” станет доступна для взаимодействия, что достигается с помощью переменной element. Когда кнопка становится доступной, Selenium выполняет JavaScript-команду для ее нажатия, позволяя тем самым получить доступ к полному описанию видео.

Для решения проблемы динамически загружаемых комментариев мы применяем класс “Actions” в сочетании с модулем “time”. Это позволяет нам прокручивать страницу вниз дважды каждые 10 секунд, обеспечивая более эффективный сбор комментариев. Такой подход гарантирует, что мы сможем скрапить максимальное количество комментариев.

def extract_information() -> dict:
   try:
       element = WebDriverWait(driver, 15).until(
           EC.presence_of_element_located((By.XPATH, '//*[@id="expand"]'))
       )

       element.click()

       time.sleep(10)
       actions = ActionChains(driver)
       actions.send_keys(Keys.END).perform()
       time.sleep(10)
       actions.send_keys(Keys.END).perform()
       time.sleep(10)

Существует несколько способов поиска элементов с использованием Selenium WebDriver: по ID, CLASS_NAME, XPATH и т.д. В этом руководстве мы будем использовать их комбинацию.

XPATH — это более сложная, основанная на шаблонах система для поиска переменных во время скрапинга. Chrome позволит упростить процесс ее использования: при просмотре кода с помощью инструмента инспекции Chrome, щелкните правой кнопкой мыши, чтобы скопировать XPATH. После копирования вы можете использовать функцию “find_elements”, чтобы определить все элементы, содержащие нужную информацию, например, заголовок видео, описание и т.д.

Важно отметить, что некоторые элементы на странице могут иметь схожие атрибуты, что может привести к тому, что вызов “find_elements()” вернет список, а не строку. В таких случаях необходимо изучить список, чтобы определить индекс релевантной информации и извлечь текст.

В завершение, создается переменная-словарь “data”, которая будет возвращать всю собранную информацию для последующего использования.

 video_title = driver.find_elements(By.XPATH, '//*[@id="title"]/h1')[0].text

   owner = driver.find_elements(By.XPATH, '//*[@id="text"]/a')[0].text

   total_number_of_subscribers = \
       driver.find_elements(By.XPATH, "//div[@id='upload-info']//yt-formatted-string[@id='owner-sub-count']")[
           0].text

   video_description = driver.find_elements(By.XPATH,                                  '//*[@id="description-inline-expander"]/yt-attributed-string/span/span')
   result = []
   for i in video_description:
       result.append(i.text)
   description = ''.join(result)

   publish_date = driver.find_elements(By.XPATH, '//*[@id="info"]/span')[2].text
   total_views = driver.find_elements(By.XPATH, '//*[@id="info"]/span')[0].text

   number_of_likes = driver.find_elements(By.XPATH,                                   '//*[@id="top-level-buttons-computed"]/segmented-like-dislike-button-view-model/yt-smartimation/div/div/like-button-view-model/toggle-button-view-model/button-view-model/button/div')[
       1].text

   comment_names = driver.find_elements(By.XPATH, '//*[@id="author-text"]/span')
   comment_content = driver.find_elements(By.XPATH, '//*[@id="content-text"]/span')
   comment_library = []

   for each in range(len(comment_names)):
       name = comment_names[each].text
       content = comment_content[each].text
       indie_comment = {
           'name': name,
           'comment': content
       }
       comment_library.append(indie_comment)

   data = {
       'owner': owner,
       'subscribers': total_number_of_subscribers,
       'video_title': video_title,
       'description': description,
       'date': publish_date,
       'views': total_views,
       'likes': number_of_likes,
       'comments': comment_library
   }

   return data

except Exception as err:
   print(f"Error: {err}")

Шаг 4: Запись собранных данных в JSON файл

def organize_write_data(data:dict):
    output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
    try:
        with open("output.json", 'w', encoding='utf-8') as file:
            file.write(output)
    except Exception as err:
        print(f"Error encountered: {err}")

Используем функцию “organize_write_data()”, которая принимает данные в виде словаря data и организует их в структурированный формат JSON. После этого функция записывает эти данные в файл с названием “output.json”, при этом учитывая и обрабатывая возможные ошибки, которые могут возникнуть в процессе записи файла.

Полный код

Ниже представлена полная версия скрипта для извлечения данных:

from selenium.webdriver.chrome.options import Options
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import json
import time

# Укажите адрес прокси-сервера с логином  и паролем
proxy_address = ""
proxy_username = ""
proxy_password = ""

# Настройте параметры Chrome для работы с прокси
chrome_options = Options()
chrome_options.add_argument(f'--proxy-server={proxy_address}')
chrome_options.add_argument(f'--proxy-auth={proxy_username}:{proxy_password}')

# Создайте экземпляр WebDriver с selenium-wire
driver = wiredriver.Chrome(options=chrome_options)

youtube_url_to_scrape = ""

# Выполняйте автоматизацию в Selenium с расширенными возможностями selenium-wire
driver.get(youtube_url_to_scrape)


def extract_information() -> dict:
   try:
       element = WebDriverWait(driver, 15).until(
           EC.presence_of_element_located((By.XPATH, '//*[@id="expand"]'))
       )
       element.click()

       time.sleep(10)
       actions = ActionChains(driver)
       actions.send_keys(Keys.END).perform()
       time.sleep(10)
       actions.send_keys(Keys.END).perform()
       time.sleep(10)

       video_title = driver.find_elements(By.XPATH, '//*[@id="title"]/h1')[0].text

       owner = driver.find_elements(By.XPATH, '//*[@id="text"]/a')[0].text
       total_number_of_subscribers = \
           driver.find_elements(By.XPATH, "//div[@id='upload-info']//yt-formatted-string[@id='owner-sub-count']")[
               0].text

       video_description = driver.find_elements(By.XPATH,
                                                '//*[@id="description-inline-expander"]/yt-attributed-string/span/span')
       result = []
       for i in video_description:
           result.append(i.text)
       description = ''.join(result)

       publish_date = driver.find_elements(By.XPATH, '//*[@id="info"]/span')[2].text
       total_views = driver.find_elements(By.XPATH, '//*[@id="info"]/span')[0].text

       number_of_likes = driver.find_elements(By.XPATH,
                                              '//*[@id="top-level-buttons-computed"]/segmented-like-dislike-button-view-model/yt-smartimation/div/div/like-button-view-model/toggle-button-view-model/button-view-model/button/div')[
           1].text

       comment_names = driver.find_elements(By.XPATH, '//*[@id="author-text"]/span')
       comment_content = driver.find_elements(By.XPATH,
                                              '//*[@id="content-text"]/span')
       comment_library = []

       for each in range(len(comment_names)):
           name = comment_names[each].text
           content = comment_content[each].text
           indie_comment = {
               'name': name,
               'comment': content
           }
           comment_library.append(indie_comment)

       data = {
           'owner': owner,
           'subscribers': total_number_of_subscribers,
           'video_title': video_title,
           'description': description,
           'date': publish_date,
           'views': total_views,
           'likes': number_of_likes,
           'comments': comment_library
       }

       return data

   except Exception as err:
       print(f"Error: {err}")


# Запись данных в JSON файл
def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("output.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


organize_write_data(extract_information())
driver.quit()

Результаты

Выходные данные выглядят следующим образом:

Screenshot_1.png

Безопасный сбор информации с YouTube становится особенно эффективным, когда используются хорошо разработанные скрипты с применением прокси для гарантии соблюдения политик и правил платформы. Рассмотренный выше подход способствует ответственному извлечению данных и помогает снизить риск возможных ограничений, налагаемых платформой.

Комментарии:

0 комментариев