【Python]】基于Python3实现的m3u8批量下载器 解密&合并&多线程 (开车新姿势~)

【Python]】基于Python3实现的m3u8批量下载器 解密&合并&多线程 (开车新姿势~)

kain
2020-10-10 / 0 评论 / 167 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2020年10月10日,已超过1317天没有更新,文章所提及的内容可能已过时失效,所以请自行测试验证。

一、前言

闲来无聊写的m3u8批量下载器,实现了多线程下载、AES常规解密、合并、批量下载四大功能。
img

二、m3u8概述

M3U8 是 Unicode 版本的 M3U,用 UTF-8 编码。"M3U" 和 "M3U8" 文件都是苹果公司使用的 HTTP Live Streaming(HLS) 协议格式的基础,HLS 的工作原理是把整个流分成一个个小的基于 HTTP 的文件来下载,每次只下载一些。当媒体流正在播放时,客户端可以选择从许多不同的备用源中以不同的速率下载同样的资源,允许流媒体会话适应不同的网络速率,所以广泛用于在线视频的播放、传输。

一个m3u8文件主要由信息头(记录版本、是否加密、key的位置)、ts流列表两个部分构成。下面是常见的两类m3u8文件。

m3u8表现形式1(假设m3u8文件的url=https://www.xxx.com/yyy/zzz/index.m3u8

这种m3u8链接对应的视频可能有多种分辨率,比如下面这个例子只有720x480这个分辨率,对应的相对url为1000kb/hls/index.m3u8,是一个相对路径,720x480分辨率的视频绝对路径就是https://www.xxx.com/yyy/zzz/1000kb/hls/index.m3u8,需要再次访问这个链接下载这个分辨率对应的m3u8文件。

#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=20x480
000kb/hls/index.m3u8

m3u8表现形式2(假设m3u8文件的url=https://www.xxx.com/yyy/zzz/index.m3u8

这种m3u8已经把ts列表直接放进来了,所以下载所有的ts流组合即可得到视频的全部内容。
注意第5行,注明了该视频进行了加密,加密方式为AES-128(默认是CBC模式),加密key的路径为key.key,是一个相对路径,套上基路径就是https://www.xxx.com/yyy/zzz/key.key,(注:key也可能这个key的url是一个http开头的绝对路径)。有的m3u8可能还在这一行加一个IV,也就是AES-CBC加密、解密中的IV。
下面的例子中ts也是相对路径,同样需要加上基路径,比如第1个ts的绝对路径就是https://www.xxx.com/yyy/zzz/QxiMvI3688000.ts

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="key.key"
#EXTINF:0.834167,
QxiMvI3688000.ts
#EXTINF:0.834167,
QxiMvI3688001.ts
#EXTINF:0.834167,
QxiMvI3688002.ts
#EXTINF:0.834167,
QxiMvI3688003.ts
#EXTINF:0.834167,
QxiMvI3688004.ts
#EXTINF:0.834167,
QxiMvI3688005.ts
...

三、基于Python3实现的m3u8批量下载器(解密&多线程&合并)

1、下载思路

经过简单的分析m3u8协议及其文件格式,现在只要把他们串起来就好了。

①、下载m3u8文件,如果其内容的表示形式是第1种,则还需要再次访问对应的分辨率的url,重新下载m3u8
②、解析m3u8,判断是否加密了(需要提取加密方式、加密key、IV),提取ts列表
③、多线程下载所有ts(注意别打乱顺序,在m3u8文件中的顺序就是在完整视频中的顺序,所以需要记录原来的顺序,或者按照顺序进行ts重命名)
④、合并(如果加密了,则对每个ts解密)
⑤、调用FFmpeg,将合并好的视频信息放入一个mp4容器中(直接放在mp4文件也行)
⑥、回到①,开始下载下一个m3u8

2、Python源码实现

受博主编码能力影响,加上博主又很懒,怎么简单怎么来,代码冗余度比较高。。。下面的代码已经过了4-5千个m3u8的下载测试,但是不能保证没有bug,如有问题欢迎斧正哈~

# UTF-8
# author hestyle
# desc: 必须在终端直接执行,不能在pycharm等IDE中直接执行,否则看不到动态进度条效果

import os
import sys
import m3u8
import requests
import traceback
import threadpool
import shutil
from Crypto.Cipher import AES

headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36"
}
######################配置信息##########################
# m3u8链接批量输入文件
m3u8InputFilePath = "/Users/kain/Code/GIT/m3u8downloader/input/m3u8s_input.txt"
# 视频保存路径
saveRootDirPath = "/Users/kain/Code/GIT/m3u8downloader/output"
# 下载出错的m3u8保存文件
errorM3u8InfoDirPath = "/Users/kain/Code/GIT/m3u8downloader/output/error.txt"
# m3u8文件、key文件下载尝试次数,ts流默认无限次尝试下载,直到成功
m3u8TryCountConf = 10
# 线程数(同时下载的分片数)
processCountConf = 50
#######################################################

# 全局变量
# 全局线程池
taskThreadPool = None
# 当前下载的m3u8 url
m3u8Url = None
# url前缀
rootUrlPath = None
# title
title = None
# ts count
sumCount = None
# 已处理的ts
doneCount = None
# cache path
cachePath = saveRootDirPath + "/cache"
# log path
logPath = cachePath + "/log.log"
# log file
logFile = None

# 1、下载m3u8文件
def getM3u8Info():
    global m3u8Url
    global logFile
    global rootUrlPath
    tryCount = m3u8TryCountConf
    while True:
        if tryCount < 0:
            print("\t{0}下载失败!".format(m3u8Url))
            logFile.write("\t{0}下载失败!".format(m3u8Url))
            return None
        tryCount = tryCount - 1
        try:
            response = requests.get(m3u8Url, headers=headers, timeout=20, allow_redirects=True)
            if response.status_code == 301:
                nowM3u8Url = response.headers["location"]
                print("\t{0}重定向至{1}!".format(m3u8Url, nowM3u8Url))
                logFile.write("\t{0}重定向至{1}!\n".format(m3u8Url, nowM3u8Url))
                m3u8Url = nowM3u8Url
                continue
            expected_length = int(response.headers.get('Content-Length'))
            actual_length = len(response.content)
            if expected_length > actual_length:
                raise Exception("m3u8下载不完整")
            print("\t{0}下载成功!".format(m3u8Url))
            logFile.write("\t{0}下载成功!".format(m3u8Url))
            rootUrlPath = m3u8Url[0:m3u8Url.rindex('/')]
            break
        except TimeoutError:
            print("\t{0}下载失败!正在重试".format(m3u8Url))
            logFile.write("\t{0}下载失败!正在重试".format(m3u8Url))
            traceback.print_exc()
    # 解析m3u8中的内容
    m3u8Info = m3u8.loads(response.text)
    # 有可能m3u8Url是一个多级码流
    if m3u8Info.is_variant:
        print("\t{0}为多级码流!".format(m3u8Url))
        logFile.write("\t{0}为多级码流!".format(m3u8Url))
        for rowData in response.text.split('\n'):
            # 寻找响应内容的中的m3u8
            if rowData.endswith(".m3u8"):
                m3u8Url = m3u8Url.replace("index.m3u8", rowData)
                rootUrlPath = m3u8Url[0:m3u8Url.rindex('/')]
                return getM3u8Info()
        # 遍历未找到就返回None
        print("\t{0}响应未寻找到m3u8!".format(response.text))
        logFile.write("\t{0}响应未寻找到m3u8!".format(response.text))
        return None
    else:
        return m3u8Info

# 2、下载key文件
def getKey(keyUrl):
    global logFile
    tryCount = m3u8TryCountConf
    while True:
        if tryCount < 0:
            print("\t{0}下载失败!".format(keyUrl))
            logFile.write("\t{0}下载失败!".format(keyUrl))
            return None
        tryCount = tryCount - 1
        try:
            response = requests.get(keyUrl, headers=headers, timeout=20, allow_redirects=True)
            if response.status_code == 301:
                nowKeyUrl = response.headers["location"]
                print("\t{0}重定向至{1}!".format(keyUrl, nowKeyUrl))
                logFile.write("\t{0}重定向至{1}!\n".format(keyUrl, nowKeyUrl))
                keyUrl = nowKeyUrl
                continue
            expected_length = int(response.headers.get('Content-Length'))
            actual_length = len(response.content)
            if expected_length > actual_length:
                raise Exception("key下载不完整")
            print("\t{0}下载成功!key = {1}".format(keyUrl, response.content.decode("utf-8")))
            logFile.write("\t{0}下载成功! key = {1}".format(keyUrl, response.content.decode("utf-8")))
            break
        except :
            print("\t{0}下载失败!".format(keyUrl))
            logFile.write("\t{0}下载失败!".format(keyUrl))
    return response.text

# 3、多线程下载ts流
def mutliDownloadTs(playlist):
    global logFile
    global sumCount
    global doneCount
    global taskThreadPool
    taskList = []
    # 每个ts单独作为一个task
    for index in range(len(playlist)):
        dict = {"playlist": playlist, "index": index}
        taskList.append((None, dict))
    # 重新设置ts数量,已下载的ts数量
    doneCount = 0
    sumCount = len(taskList)
    printProcessBar(sumCount, doneCount, 50)
    # 构造thread pool
    requests = threadpool.makeRequests(downloadTs, taskList)
    [taskThreadPool.putRequest(req) for req in requests]
    # 等待所有任务处理完成
    taskThreadPool.wait()
    print("")
    return True

# 4、下载单个ts playlists[index]
def downloadTs(playlist, index):
    global logFile
    global sumCount
    global doneCount
    global cachePath
    global rootUrlPath
    succeed = False
    while not succeed:
        # 文件名格式为 "00000001.ts",index不足8位补充0
        outputPath = cachePath + "/" + "{0:0>8}.ts".format(index)
        outputFp = open(outputPath, "wb+")
        if playlist[index].startswith("http"):
            tsUrl = playlist[index]
        else:
            tsUrl = rootUrlPath + "/" + playlist[index]
        try:
            response = requests.get(tsUrl, timeout=10, headers=headers, stream=True)
            if response.status_code == 200:
                expected_length = int(response.headers.get('Content-Length'))
                actual_length = len(response.content)
                if expected_length > actual_length:
                    raise Exception("分片下载不完整")
                outputFp.write(response.content)
                doneCount += 1
                printProcessBar(sumCount, doneCount, 50)
                logFile.write("\t分片{0:0>8} url = {1} 下载成功!".format(index, tsUrl))
                succeed = True
        except Exception as exception:
            logFile.write("\t分片{0:0>8} url = {1} 下载失败!正在重试...msg = {2}".format(index, tsUrl, exception))
        outputFp.close()

# 5、合并ts
def mergeTs(tsFileDir, outputFilePath, cryptor, count):
    global logFile
    outputFp = open(outputFilePath, "wb+")
    for index in range(count):
        printProcessBar(count, index + 1, 50)
        logFile.write("\t{0}\n".format(index))
        inputFilePath = tsFileDir + "/" + "{0:0>8}.ts".format(index)
        if not os.path.exists(outputFilePath):
            print("\n分片{0:0>8}.ts, 不存在,已跳过!".format(index))
            logFile.write("分片{0:0>8}.ts, 不存在,已跳过!\n".format(index))
            continue
        inputFp = open(inputFilePath, "rb")
        fileData = inputFp.read()
        try:
            if cryptor is None:
                outputFp.write(fileData)
            else:
                outputFp.write(cryptor.decrypt(fileData))
        except Exception as exception:
            inputFp.close()
            outputFp.close()
            print(exception)
            return False
        inputFp.close()
    print("")
    outputFp.close()
    return True

# 6、删除ts文件
def removeTsDir(tsFileDir):
    # 先清空文件夹
    for root, dirs, files in os.walk(tsFileDir, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(tsFileDir)
    return True

# 7、convert to mp4(调用了FFmpeg,将合并好的视频内容放置到一个mp4容器中)
def ffmpegConvertToMp4(inputFilePath, ouputFilePath):
    global logFile
    if not os.path.exists(inputFilePath):
        print(inputFilePath + " 路径不存在!")
        logFile.write(inputFilePath + " 路径不存在!\n")
        return False
    cmd = r'./ffmpeg -i "{0}" -vcodec copy -acodec copy "{1}"'.format(inputFilePath, ouputFilePath)
    if os.system(cmd) == 0:
        print(inputFilePath + "转换成功!")
        logFile.write(inputFilePath + "转换成功!\n")
        return True
    else:
        print(inputFilePath + "转换失败!")
        logFile.write(inputFilePath + "转换失败!\n")
        return False

# 8、模拟输出进度条
def printProcessBar(sumCount, doneCount, width):
    precent = doneCount / sumCount
    useCount = int(precent * width)
    spaceCount = int(width - useCount)
    precent = precent*100
    print('\t{0}/{1} {2}{3} {4:.2f}%'.format(sumCount, doneCount, useCount*'■', spaceCount*'□', precent), file=sys.stdout, flush=True, end='\r')

# m3u8下载器
def m3uVideo8Downloader():
    global title
    global logFile
    global m3u8Url
    global cachePath
    # 1、下载m3u8
    print("\t1、开始下载m3u8...")
    logFile.write("\t1、开始下载m3u8...\n")
    m3u8Info = getM3u8Info()
    if m3u8Info is None:
        return False
    tsList = []
    for playlist in m3u8Info.segments:
        tsList.append(playlist.uri)
    # 2、获取key
    keyText = ""
    cryptor = None
    # 判断是否加密
    if (len(m3u8Info.keys) != 0) and (m3u8Info.keys[0] is not None):
        # 默认选择第一个key,且AES-128算法
        key = m3u8Info.keys[0]
        if key.method != "AES-128":
            print("\t{0}不支持的解密方式!".format(key.method))
            logFile.write("\t{0}不支持的解密方式!\n".format(key.method))
            return False
        # 如果key的url是相对路径,加上m3u8Url的路径
        keyUrl = key.uri
        if not keyUrl.startswith("http"):
            keyUrl = m3u8Url.replace("index.m3u8", keyUrl)
        print("\t2、开始下载key...")
        logFile.write("\t2、开始下载key...\n")
        keyText = getKey(keyUrl)
        if keyText is None:
            return False
        # 判断是否有偏移量
        if key.iv is not None:
            cryptor = AES.new(bytes(keyText, encoding='utf8'), AES.MODE_CBC, bytes(key.iv, encoding='utf8'))
        else:
            cryptor = AES.new(bytes(keyText, encoding='utf8'), AES.MODE_CBC, bytes(keyText, encoding='utf8'))
    # 3、下载ts
    print("\t3、开始下载ts...")
    logFile.write("\t3、开始下载ts...\n")
    if mutliDownloadTs(tsList):
        print("\tts下载完成---------------------")
        logFile.write("\tts下载完成---------------------\n")
    # 4、合并ts
    print("\t4、开始合并ts...")
    logFile.write("\t4、开始合并ts...\n")
    if mergeTs(cachePath, cachePath + "/cache.flv", cryptor, len(tsList)):
        print("\tts合并完成---------------------")
        logFile.write("\tts合并完成---------------------\n")
    else:
        print(keyText)
        print("\tts合并失败!")
        logFile.write("\tts合并失败!\n")
        return False
    # 5、开始转换成mp4
    print("\t5、开始mp4转换...")
    logFile.write("\t5、开始mp4转换...\n")
    if not ffmpegConvertToMp4(cachePath + "/cache.flv", saveRootDirPath + "/" + title + ".mp4"):
        return False
    return True

if __name__ == '__main__':
    # 判断m3u8文件是否存在
    if not (os.path.exists(m3u8InputFilePath)):
        print("{0}文件不存在!".format(m3u8InputFilePath))
        exit(0)
    m3u8InputFp = open(m3u8InputFilePath, "r", encoding="utf-8")
    # 设置error的m3u8 url输出
    errorM3u8InfoFp = open(errorM3u8InfoDirPath, "a+", encoding="utf-8")
    # 设置log file
    if not os.path.exists(cachePath):
        os.makedirs(cachePath)
    logFile = open(logPath, "w+", encoding="utf-8")
    # 初始化线程池
    taskThreadPool = threadpool.ThreadPool(processCountConf)
    while True:
        rowData = m3u8InputFp.readline()
        rowData = rowData.strip('\n')
        if rowData == "":
            break
        m3u8Info = rowData.split(',')
        title = m3u8Info[0]
        m3u8Url = m3u8Info[1]
        try:
            print("{0} 开始下载:".format(m3u8Info[0]))
            logFile.write("{0} 开始下载:\n".format(m3u8Info[0]))
            if m3uVideo8Downloader():
                # 成功下载完一个m3u8则清空logFile
                logFile.truncate()
                print("{0} 下载成功!".format(m3u8Info[0]))
            else:
                errorM3u8InfoFp.write(title + "," + m3u8Url + '\n')
                errorM3u8InfoFp.flush()
                print("{0} 下载失败!".format(m3u8Info[0]))
                logFile.write("{0} 下载失败!\n".format(m3u8Info[0]))
        except Exception as exception:
            print(exception)
            traceback.print_exc()
    # 关闭文件
    logFile.close()
    m3u8InputFp.close()
    errorM3u8InfoFp.close()
    shutil.rmtree(cachePath)
    print("----------------下载结束------------------")

四、开车姿势与车速展示

1、开车姿势

①、导入源码用到库m3u8、traceback、threadpool、pycryptodome、beautifulsoup4(用pip3安装就行)

②、将 ffmpeg 放到源码文件所在目录(源码调用了cmd命令进而调用了 ffmpeg),创建 inputoutput 文件夹

Windows / macOS 使用 FFMPEG : https://www.lanzoux.com/b015tjr7e

Linux 请到以下网址根据系统自行下载:https://ffmpeg.org/download.html#build-linux

修改代码 237 行,win 系统修改为:.\ffmpeg.exe
tGicTH

8jX70Q

程序目录

③、将准备好的 m3u8s_input.txt 文件(必须是utf-8编码,格式如下)保存在 input 文件夹

m3u8s_input.txt 示例
title_1,m3u8_url_1
title_2,m3u8_url_2
title_3,m3u8_url_3
...这是省略号,小白别瞎搞...
title_n,m3u8_url_n

④、配置好源码中的 m3u8 url 文件路径、视频保存的地址、线程数

⑤、控制台/终端使用 Python 执行脚本

下面将 python 程序保存名为 run.py

python3 run.py
注:为了尽量减少频繁的删除、创建 ts 流文件,源码运行时在 output 目录中创建一个cache 缓存目录,里面暂存下载好的 ts 流。下载过程中不可删除!!!全部下载完成会自动删除 cache 缓存目录

2、车速展示(系好安全带)

0CsWIP

2Uc45H

五、后记

以上源码仅作为Python技术学习、交流之用,切勿用于任何可能造成违法犯罪的场景,否则后果自负!

0

评论 (0)

取消