|
| 1 | +import csv |
| 2 | + |
| 3 | +from typing import List, Dict, Optional, Self |
| 4 | + |
| 5 | +from youtool import YouTube |
| 6 | + |
| 7 | +from .base import Command |
| 8 | + |
| 9 | + |
class VideoSearch(Command):
    """Search video info from a list of IDs or URLs and generate CSV output.

    IDs/URLs can be given directly or read from a CSV file. The output uses a
    simplified column set by default, or the full column set when requested.
    """

    name = "video-search"
    arguments = [
        {"name": "--ids", "type": str, "help": "Video IDs", "nargs": "*"},
        {"name": "--urls", "type": str, "help": "Video URLs", "nargs": "*"},
        {"name": "--input-file-path", "type": str, "help": "Input CSV file path with URLs/IDs"},
        {"name": "--output-file-path", "type": str, "help": "Output CSV file path"},
        # NOTE(review): if these dicts are forwarded to argparse, ``type=bool``
        # converts any non-empty string (including "False") to True -- confirm
        # against the base class before relying on this flag from the CLI.
        {"name": "--full-info", "type": bool, "help": "Option to get full video info", "default": False}
    ]

    # Header names looked up in the input CSV file.
    ID_COLUMN_NAME: str = "video_id"
    URL_COLUMN_NAME: str = "video_url"

    # Fields kept in the output when full_info is falsy.
    INFO_COLUMNS: List[str] = [
        "id", "title", "published_at", "view_count"
    ]
    # Fields kept in the output when full_info is truthy.
    FULL_INFO_COLUMNS: List[str] = [
        "id", "title", "description", "published_at", "view_count", "like_count", "comment_count"
    ]

    @staticmethod
    def filter_fields(video_info: Dict, info_columns: Optional[List] = None) -> Dict:
        """Return only the entries of ``video_info`` whose key is in ``info_columns``.

        Args:
            video_info: A dictionary containing video information.
            info_columns: Names of the fields to keep. When None (or empty),
                ``video_info`` itself is returned unfiltered.

        Returns:
            A new dictionary restricted to ``info_columns``, or the original
            ``video_info`` object when no columns were given.
        """
        if not info_columns:
            return video_info
        return {
            field: value for field, value in video_info.items() if field in info_columns
        }

    @classmethod
    def execute(cls, **kwargs) -> str:
        """Fetch YouTube video information for the given IDs and export it as CSV.

        Args:
            ids (list[str], optional): YouTube video IDs.
            urls (list[str], optional): YouTube video URLs (not supported yet).
            input_file_path (str, optional): Path to a CSV file with a
                ``video_id`` and/or ``video_url`` column; its values are added
                to ``ids`` / ``urls``.
            output_file_path (str, optional): Forwarded to ``cls.data_to_csv``.
            api_key (str): The API key handed to ``YouTube``.
            full_info (bool, optional): Use ``FULL_INFO_COLUMNS`` instead of
                ``INFO_COLUMNS`` for the output. Defaults to False.

        Returns:
            Whatever ``cls.data_to_csv`` returns for the filtered video rows.

        Raises:
            ValueError: If no ID or URL was provided, directly or via the file.
            NotImplementedError: If any URL was provided.
        """
        # Copy the inputs: they may be None (argument omitted), and appending
        # to the caller's own list objects would leak state between calls.
        ids = list(kwargs.get("ids") or [])
        urls = list(kwargs.get("urls") or [])
        input_file_path = kwargs.get("input_file_path")
        output_file_path = kwargs.get("output_file_path")
        api_key = kwargs.get("api_key")
        full_info = kwargs.get("full_info", False)

        info_columns = cls.FULL_INFO_COLUMNS if full_info else cls.INFO_COLUMNS

        if input_file_path:
            # newline="" lets the csv module handle embedded line breaks itself.
            with open(input_file_path, mode="r", newline="") as infile:
                for row in csv.DictReader(infile):
                    video_id = row.get(cls.ID_COLUMN_NAME)
                    video_url = row.get(cls.URL_COLUMN_NAME)
                    # A row contributes its ID when present, otherwise its URL.
                    if video_id:
                        ids.append(video_id)
                    elif video_url:
                        urls.append(video_url)

        if not ids and not urls:
            raise ValueError("Either 'ids' or 'urls' must be provided for the video-search command")

        if urls:
            # Fail before any API request is made, since the result would be
            # discarded anyway.
            # TODO: add get videos_infos using urls to youtool
            raise NotImplementedError("videos_infos by url not implemented yet")

        youtube = YouTube([api_key], disable_ipv6=True)
        videos_infos = list(youtube.videos_infos(ids))

        return cls.data_to_csv(
            data=[
                cls.filter_fields(video_info, info_columns)
                for video_info in videos_infos
            ],
            output_file_path=output_file_path
        )
0 commit comments