直接上干货:爬取抖音小姐姐的主页视频列表,并下载无水印视频(仅限学习,不用于商业用途,如有侵权,请联系作者删除);18年初,python基础篇更新了。
比如我想获取抖音网红“惠子”小姐姐的主页列表视频,第一步在抖音上打开惠子的主页,右上角点击一下,可以看到一个分享按钮,点击分享,找到复制链接-> 。把链接放到浏览器中,短链接会被自动解析,变成长链接: ;sec_uid=MS4wLjABAAAAHmQ4DqHKN8IdfWWd52sYaGS6zaZaOTghOZ4ysZ0z_YM&timestamp=1571884619&utm_source=copy&utm_campaign=client_share&utm_medium=android&share_app_name=douyin ,在长链接中就可以看到一些用户信息,不管有没有用,我们先列出来!
key | value |
---|---|
user | 73838190950 |
u_code | 128dfi636 |
sec_uid | MS4wLjABAAAAHmQ4DqHKN8IdfWWd52sYaGS6zaZaOTghOZ4ysZ0z_YM |
timestamp | 1571884619 |
utm_source | copy |
utm_campaign | client_share |
utm_medium | android |
share_app_name | douyin |
打开浏览器开发者工具,找到对应的视频列表请求接口,一个一个排查终于找到这个链接:;count=21&max_cursor=0&aid=1128&_signature=QOtJJBARHVwzHUNLqlT-mEDrST&dytk=593d265a74e3384e06112b423ef268da
key | value |
---|---|
sec_uid | MS4wLjABAAAAHmQ4DqHKN8IdfWWd52sYaGS6zaZaOTghOZ4ysZ0z_YM |
count | 21 |
max_cursor | 00 |
aid | 1128 |
_signature | F1OCixATSudkpYjkPsX5FRdTgp |
dytk | 593d265a74e3384e06112b423ef268da |
返回的数据:
Json:
{
"max_cursor": 00,
"min_cursor": 00,
"has_more": true,
-"extra": {
"now": 00,
"logid": "2019102411481201001404709304158BDD"
},
"status_code": 0,
-"aweme_list": [
-{
-"statistics": {
"aweme_id": "6750893105127378180",
"comment_count": 1240,
"digg_count": 30000,
"play_count": 675000,
"share_count": 79,
"forward_count": 17
},
"image_infos": null,
"uniqid_position": null,
"long_video": null,
"aweme_id": "6750893105127378180",
+"text_extra": [ … ],
"position": null,
"geofencing": null,
"promotions": null,
"desc": "#看啥啥都缺 ,爱买女孩绝不认输。",
"aweme_type": 4,
"comment_list": null,
"video_text": null,
"cha_list": null,
-"video": {
+"cover": { … },
"width": 720,
-"origin_cover": {
-"url_list": [
";,
";,
";
],
"uri": "large/tos-cn-p-0015/6e83730009fe4fc2a3eeddbf06b0dbbf_1571815007"
},
"has_watermark": false,
-"play_addr_lowbr": {
"uri": "v0200ff80000bmnvs5ignbh26fqqufbg",
-"url_list": [
";line=0&ratio=540p&media_type=4&vr_type=0&improve_bitrate=0&is_play_url=1",
";line=1&ratio=540p&media_type=4&vr_type=0&improve_bitrate=0&is_play_url=1"
]
},
"bit_rate": null,
"vid": "v0200ff80000bmnvs5ignbh26fqqufbg",
-"play_addr": {
"uri": "v0200ff80000bmnvs5ignbh26fqqufbg",
-"url_list": [
";line=0&ratio=540p&media_type=4&vr_type=0&improve_bitrate=0&is_play_url=1",
";line=1&ratio=540p&media_type=4&vr_type=0&improve_bitrate=0&is_play_url=1"
]
},
"height": 1280,
-"dynamic_cover": {
-"url_list": [
";,
";,
";
],
"uri": "tos-cn-p-0015/f4f71ff403574d768a87e7ef3501a7cc_1571815009"
},
"ratio": "540p",
-"download_addr": {
"uri": "v0200ff80000bmnvs5ignbh26fqqufbg",
-"url_list": [
";line=0&ratio=540p&watermark=0&media_type=4&vr_type=0&improve_bitrate=0&logo_name=aweme_self",
";line=1&ratio=540p&watermark=0&media_type=4&vr_type=0&improve_bitrate=0&logo_name=aweme_self"
]
},
"duration": 61824
},
"video_labels": null,
"label_top_text": null
}
]
}
通过返回的参数可以看到我们需要的数据都在这里,在这里不着急解析数据,通过对比请求参数,别的参数都是现成的在主页代码中都可以找到,基本可以确定_signature
参数是加密字符串,接下来我们就跟踪这个参数的形成过程。通过搜索,确定了它在index_10ae3b3.js
中生成的 ,截图如下:
通过截图我们知道,signature
是通过_bytedAcrawler
对象获取的,顺着我们查看它的生成过程:截图如下:
它是在ba
生成的,截图如下:
通过分析,_signature
获取比较复杂,js代码已经被混淆压缩,直接分析算法过程比较难,但是我们可以直接执行签名的算法代码,并返回对应的签名结果。把压缩过的js保存下来,用node执行其中_bytedAcrawler对象的签名方法,
获取参数签名。
分析完成后,开始python模拟手机数据请求:
1.读取主页链接:支持同时爬取多个小姐姐的主页视频列表,在链接文件中输入各个URL,URL之间通过逗号/空格/tab(制表键)/回车符分割,支持多行;也可以使用命令行参数直接指定链接:
python amemv-video-ri url1,url2...
,解析文本数据/命令行数据;
# Command-line entry: collect the share URLs to crawl, then start the scheduler.
# URLs come either from the first positional argument (comma separated) or,
# when no argument is given, from a links file parsed by parse_sites().
import getopt
import sys

content, opts, args = None, None, []
try:
    if len(sys.argv) >= 2:
        opts, args = getopt.getopt(sys.argv[1:], "hi:o:", ["favorite"])
except getopt.GetoptError:
    usage()
    sys.exit(2)

if args:
    content = args[0].split(",")
else:
    # check the sites file
    # NOTE(review): the links-file name is empty in this copy of the code
    # (it looks like it was lost when the article was extracted); with an
    # empty name this branch always ends in usage(). Confirm the real name.
    filename = ""
    if os.path.exists(filename):
        content = parse_sites(filename)
    else:
        usage()
        sys.exit(1)

# `not content` also covers a None result from parse_sites().
if not content or content[0] == "":
    usage()
    sys.exit(1)

for o, val in opts or []:
    # Compare for equality: `o in ("--favorite")` tested whether `o` is a
    # substring of the string "--favorite", not membership in a tuple.
    if o == "--favorite":
        download_favorite = True
        break

CrawlerScheduler(content)
2.获取列表视频:
class CrawlerScheduler(object):
    """Sort share URLs by kind and queue every media item they list for download.

    Constructing an instance does all the work: the URLs are resolved and
    grouped (user / challenge / music), the download worker threads are
    started, and each group is crawled page by page.

    NOTE(review): attribute and method receivers were missing in the copy of
    the code this was restored from; the names used here (``numbers``,
    ``challenges``, ``musics``, ``queue``) are chosen from how the values are
    used and should be confirmed against the original project.
    """

    # NOTE(review): only the prefix "t." of this host name survived in the
    # source text; "t.tiktok.com" is inferred from the 'trill' app name and
    # aid 1180 used on that branch. Confirm.
    _TIKTOK_HOST = 't.tiktok.com'

    def __init__(self, items):
        """Group ``items`` (share URLs) by kind and immediately start crawling."""
        self.numbers = []
        self.challenges = []
        self.musics = []
        for item in items:
            url = get_real_address(item)
            if not url:
                continue
            if re.search('share/user', url):
                self.numbers.append(url)
            if re.search('share/challenge', url):
                self.challenges.append(url)
            if re.search('share/music', url):
                self.musics.append(url)
        self.queue = Queue.Queue()
        self.scheduling()

    # Obtain the request signature by running the signing code with node.
    @staticmethod
    def generateSignature(value):
        """Return the first line printed by ``node`` for ``value``, stripped.

        Raises:
            RuntimeError: if the command printed nothing.
        """
        # NOTE(review): the command contains no script path in this copy of
        # the code; presumably the signing script name belongs before %s.
        with os.popen('node %s' % value) as pipe:
            lines = pipe.readlines()
        if not lines:
            raise RuntimeError("no signature produced for %r" % (value,))
        # Drop the trailing newline so it does not end up in the query string.
        return lines[0].strip()

    @staticmethod
    def calculateFileMd5(filename):
        """Return the hexadecimal MD5 digest of the file at ``filename``."""
        import hashlib

        digest = hashlib.md5()
        with open(filename, "rb") as fp:
            # Read in chunks so large videos are not loaded into memory at once.
            for chunk in iter(lambda: fp.read(65536), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def _hostname(url):
        """Return the host name part of ``url``."""
        import urllib.parse

        return urllib.parse.urlparse(url).hostname

    @staticmethod
    def _ensure_folder(relative_path):
        """Create ``relative_path`` under the working directory; return its path."""
        target_folder = os.path.join(os.getcwd(), relative_path)
        if not os.path.isdir(target_folder):
            # makedirs also creates the parent 'download' directory if missing.
            os.makedirs(target_folder)
        return target_folder

    @staticmethod
    def _fetch_json(url, params=None):
        """GET ``url`` and return the decoded JSON body.

        Raises:
            ValueError: if the body is not valid UTF-8 JSON.
        """
        import json

        res = requests.get(url, headers=HEADERS, params=params, timeout=TIMEOUT)
        return json.loads(res.content.decode('utf-8'))

    def scheduling(self):
        """Start the worker threads, then crawl every collected URL in turn."""
        for _ in range(THREADS):
            worker = DownloadWorker(self.queue)
            worker.daemon = True
            worker.start()
        for url in self.numbers:
            self.download_user_videos(url)
        for url in self.challenges:
            self.download_challenge_videos(url)
        for url in self.musics:
            self.download_music_videos(url)

    def download_user_videos(self, url):
        """Queue all videos of the user in ``url`` and wait for the downloads."""
        number = re.findall(r'share/user/(\d+)', url)
        if not number:
            return
        dytk = get_dytk(url)
        # Only the TikTok host works without a dytk token.
        if self._hostname(url) != self._TIKTOK_HOST and not dytk:
            return
        user_id = number[0]
        video_count = self._download_user_media(user_id, dytk, url)
        self.queue.join()
        print("\nAweme number %s, video number %s\n\n" %
              (user_id, str(video_count)))
        print("\nFinish Downloading All the videos from %s\n\n" % user_id)

    def download_challenge_videos(self, url):
        """Queue all videos of the challenge in ``url`` and wait for the downloads."""
        challenge = re.findall(r'share/challenge/(\d+)', url)
        if not challenge:
            return
        challenges_id = challenge[0]
        video_count = self._download_challenge_media(challenges_id, url)
        self.queue.join()
        print("\nAweme challenge #%s, video number %d\n\n" %
              (challenges_id, video_count))
        print("\nFinish Downloading All the videos from #%s\n\n" % challenges_id)

    def download_music_videos(self, url):
        """Queue all videos using the music in ``url`` and wait for the downloads."""
        music = re.findall(r'share/music/(\d+)', url)
        if not music:
            return
        musics_id = music[0]
        video_count = self._download_music_media(musics_id, url)
        self.queue.join()
        print("\nAweme music @%s, video number %d\n\n" %
              (musics_id, video_count))
        print("\nFinish Downloading All the videos from @%s\n\n" % musics_id)

    def _join_download_queue(self, aweme, target_folder):
        """Put one list entry on the download queue.

        A video entry is queued as ``('video', name, url, target_folder)``;
        an entry without video but with ``image_infos`` is queued as
        ``('image', uri, first_url, target_folder)``. Entries missing the
        expected keys are skipped silently.
        """
        try:
            if aweme.get('video', None):
                uri = aweme['video']['play_addr']['uri']
                # NOTE(review): the play endpoint in front of "{0}" is missing
                # in this copy of the code (URLs were stripped from the
                # article); as written the result is only the query string.
                download_url = "{0}"
                download_params = {
                    'video_id': uri,
                    'line': '0',
                    'ratio': '720p',
                    'media_type': '4',
                    'vr_type': '0',
                    'test_cdn': 'None',
                    'improve_bitrate': '0',
                    'iid': '35628056608',
                    'device_id': '46166618999',
                    'os_api': '18',
                    'app_name': 'aweme',
                    'channel': 'App%20Store',
                    'idfa': '00000000-0000-0000-0000-000000000000',
                    'device_platform': 'iphone',
                    'build_number': '27014',
                    'vid': '2ED380A7-F09C-6C9E-90F5-862D58F3129C',
                    'openudid': '21dae85eeac1da35a69e2a0ffeaeef61c78a2e98',
                    'device_type': 'iPhone8%2C2',
                    'app_version': '2.7.0',
                    'version_code': '2.7.0',
                    'os_version': '12.0',
                    'screen_width': '1242',
                    'aid': '1128',
                    'ac': 'WIFI'
                }
                if aweme.get('hostname') == self._TIKTOK_HOST:
                    # NOTE(review): endpoint missing here as well, see above.
                    download_url = '{0}'
                    download_params = {
                        'video_id': uri,
                        'line': '0',
                        'ratio': '720p',
                        'media_type': '4',
                        'vr_type': '0',
                        'test_cdn': 'None',
                        'improve_bitrate': '0',
                        'version_code': '1.7.2',
                        'language': 'en',
                        'app_name': 'trill',
                        'vid': 'D7B3981F-DD46-45A1-A97E-428B90096C3E',
                        'app_version': '1.7.2',
                        'device_id': '6619780206485964289',
                        'channel': 'App Store',
                        'mcc_mnc': '',
                        'tz_offset': '28800'
                    }
                # share_info may be present but null in the response.
                share_info = aweme.get('share_info') or {}
                description = share_info.get('share_desc', uri)
                if description is None:
                    description = uri
                url = download_url.format(
                    '&'.join([key + '=' + download_params[key] for key in download_params]))
                self.queue.put(('video',
                                uri + "-" + description,
                                url, target_folder))
            elif aweme.get('image_infos', None):
                image = aweme['image_infos']['label_large']
                self.queue.put(
                    ('image', image['uri'], image['url_list'][0], target_folder))
        except (KeyError, IndexError):
            # Entry does not have the expected shape (or an empty url_list).
            return
        except UnicodeDecodeError:
            print("Cannot decode response data from DESC %s" % aweme.get('desc'))
            return

    def _download_user_media(self, user_id, dytk, url):
        """Page through a user's post list and queue every entry.

        Returns:
            The number of list entries seen (0 when ``user_id`` is empty).
        """
        if not user_id:
            print("Number %s does not exist" % user_id)
            return 0
        target_folder = self._ensure_folder('download/%s' % user_id)
        hostname = self._hostname(url)
        signature = self.generateSignature(str(user_id))
        user_video_url = "https://%s/aweme/v1/aweme/post/" % hostname
        user_video_params = {
            'user_id': str(user_id),
            'count': '21',
            'max_cursor': '0',
            'aid': '1128',
            '_signature': signature,
            'dytk': dytk
        }
        if hostname == self._TIKTOK_HOST:
            user_video_params.pop('dytk')
            user_video_params['aid'] = '1180'
        max_cursor, video_count = None, 0
        while True:
            if max_cursor:
                user_video_params['max_cursor'] = str(max_cursor)
            try:
                contentJson = self._fetch_json(user_video_url, user_video_params)
            except ValueError:
                print("Cannot decode response data from number %s" % user_id)
                break
            # aweme_list can be null in the response, hence the `or []`.
            for aweme in contentJson.get('aweme_list') or []:
                video_count += 1
                aweme['hostname'] = hostname
                self._join_download_queue(aweme, target_folder)
            if contentJson.get('has_more'):
                max_cursor = contentJson.get('max_cursor')
            else:
                break
        # Downloading the user's favorite list is disabled in this version.
        if video_count == 0:
            print("There's no video in number %s." % user_id)
        return video_count

    def _download_challenge_media(self, challenge_id, url):
        """Page through a challenge's video list and queue every entry.

        Returns:
            The number of list entries seen (0 when ``challenge_id`` is empty).
        """
        if not challenge_id:
            print("Challenge #%s does not exist" % challenge_id)
            return 0
        target_folder = self._ensure_folder('download/#%s' % challenge_id)
        hostname = self._hostname(url)
        # The signature input is id + page size + cursor.
        signature = self.generateSignature(str(challenge_id) + '9' + '0')
        challenge_video_url = "https://%s/aweme/v1/challenge/aweme/" % hostname
        challenge_video_params = {
            'ch_id': str(challenge_id),
            'count': '9',
            'cursor': '0',
            'aid': '1128',
            'screen_limit': '3',
            'download_click_limit': '0',
            '_signature': signature
        }
        cursor, video_count = None, 0
        while True:
            if cursor:
                challenge_video_params['cursor'] = str(cursor)
                challenge_video_params['_signature'] = self.generateSignature(
                    str(challenge_id) + '9' + str(cursor))
            try:
                contentJson = self._fetch_json(
                    challenge_video_url, challenge_video_params)
            except ValueError:
                # Stop paging instead of continuing with an undefined or
                # stale page, which the bare `except` used to allow.
                print("Cannot decode response data from challenge %s" % challenge_id)
                break
            aweme_list = contentJson.get('aweme_list') or []
            if not aweme_list:
                break
            for aweme in aweme_list:
                aweme['hostname'] = hostname
                video_count += 1
                self._join_download_queue(aweme, target_folder)
            print("number: ", video_count)
            if contentJson.get('has_more'):
                cursor = contentJson.get('cursor')
            else:
                break
        if video_count == 0:
            print("There's no video in challenge %s." % challenge_id)
        return video_count

    def _download_music_media(self, music_id, url):
        """Page through a music's video list and queue every entry.

        Returns:
            The number of list entries seen (0 when ``music_id`` is empty).
        """
        if not music_id:
            print("Music @%s does not exist" % music_id)
            return 0
        target_folder = self._ensure_folder('download/@%s' % music_id)
        hostname = self._hostname(url)
        signature = self.generateSignature(str(music_id))
        music_video_url = "https://%s/aweme/v1/music/aweme/?{0}" % hostname
        music_video_params = {
            'music_id': str(music_id),
            'count': '9',
            'cursor': '0',
            'aid': '1128',
            'screen_limit': '3',
            'download_click_limit': '0',
            '_signature': signature
        }
        if hostname == self._TIKTOK_HOST:
            for key in ['screen_limit', 'download_click_limit', '_signature']:
                music_video_params.pop(key)
            music_video_params['aid'] = '1180'
        cursor, video_count = None, 0
        while True:
            if cursor:
                music_video_params['cursor'] = str(cursor)
                music_video_params['_signature'] = self.generateSignature(
                    str(music_id) + '9' + str(cursor))
            page_url = music_video_url.format(
                '&'.join([key + '=' + music_video_params[key] for key in music_video_params]))
            try:
                contentJson = self._fetch_json(page_url)
            except ValueError:
                print("Cannot decode response data from music %s" % music_id)
                break
            aweme_list = contentJson.get('aweme_list') or []
            if not aweme_list:
                break
            for aweme in aweme_list:
                aweme['hostname'] = hostname
                video_count += 1
                self._join_download_queue(aweme, target_folder)
            if contentJson.get('has_more'):
                cursor = contentJson.get('cursor')
            else:
                break
        if video_count == 0:
            print("There's no video in music %s." % music_id)
        return video_count
3.下载视频:
# Download logic for a single queued media item.
def download(medium_type, uri, medium_url, target_folder):
    """Download one video or image into ``target_folder``.

    Args:
        medium_type: ``'video'`` (saved as .mp4) or ``'image'`` (saved as
            .jpg); any other value makes the call a no-op.
        uri: base name for the file; "/" is replaced by "-".
        medium_url: URL to fetch.
        target_folder: directory the file is written to.

    The request is tried up to RETRY times. A 403 response gives up at once.
    If every attempt fails, any partial file is removed. Files that already
    exist are not downloaded again.
    """
    import copy
    import time

    if medium_type == 'video':
        file_name = uri + '.mp4'
    elif medium_type == 'image':
        file_name = uri + '.jpg'
    else:
        return
    # Video names embed a free-text description, so they need the same "/"
    # sanitising that was previously applied to images only.
    file_name = file_name.replace("/", "-")

    file_path = os.path.join(target_folder, file_name)
    if os.path.isfile(file_path):
        print(file_name + " 已经爬取过了,文件保存在 " + file_path + " 放弃爬取")
        return
    print("Downloading %s from %s.\n" % (file_name, medium_url))

    headers = copy.copy(HEADERS)
    if medium_type == 'video':
        # NOTE(review): this user-agent string was truncated in the copy of
        # the code this was restored from; version numbers may be missing.
        headers['user-agent'] = 'Aweme/27014 CFNetwork Darwin'

    succeeded = False
    for _ in range(RETRY):
        try:
            resp = requests.get(medium_url, headers=headers,
                                stream=True, timeout=TIMEOUT)
            if resp.status_code == 403:
                # Retrying will not help when access is denied.
                print("Access Denied when retrieve %s.\n" % medium_url)
                break
            with open(file_path, 'wb') as fh:
                for chunk in resp.iter_content(chunk_size=1024):
                    fh.write(chunk)
            succeeded = True
            break
        except Exception:
            # Best effort: network and file errors just trigger the next try.
            continue

    if not succeeded:
        try:
            os.remove(file_path)
        except OSError:
            pass
        print("Failed to retrieve %s from %s.\n" % (uri, medium_url))
    # NOTE(review): the receiver of this trailing call was lost in the source
    # text; it is presumably a one-second pause between downloads.
    time.sleep(1)
4.其他:
# Resolve a short share link to the long URL it redirects to.
def get_real_address(url):
    """Return the long URL behind a ``v.douyin.com`` short link.

    URLs that do not contain ``v.douyin.com`` are returned unchanged. For a
    short link the redirect target is read from the ``Location`` header
    without following it; ``None`` is returned when the response is not a
    redirect.
    """
    if 'v.douyin.com' not in url:
        return url
    res = requests.get(url, headers=HEADERS, allow_redirects=False,
                       timeout=TIMEOUT)
    # Accept any redirect status, not only 302, as long as a target is given.
    if res.status_code in (301, 302, 303, 307, 308):
        return res.headers.get('Location')
    return None
# Obtain the dytk parameter.
def get_dytk(url):
    """Fetch ``url`` and return the ``dytk`` value embedded in the page.

    Returns ``None`` when the request fails or the page has no
    ``dytk: '...'`` entry.
    """
    res = requests.get(url, headers=HEADERS, timeout=TIMEOUT)
    if not res:
        return None
    page = res.content.decode('utf-8', errors='replace')
    # Stop at the first closing quote; a greedy `.*` would swallow any later
    # quoted value on the same line.
    match = re.search(r"dytk: '([^']*)'", page)
    return match.group(1) if match else None
# Download manager: a worker thread that consumes the download queue.
class DownloadWorker(Thread):
    """Thread that takes ``(medium_type, uri, url, folder)`` items off a queue
    and passes each one to ``download``."""

    def __init__(self, queue):
        """Remember the queue to consume; the thread is not started here."""
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Process queue items forever."""
        while True:
            medium_type, uri, download_url, target_folder = self.queue.get()
            try:
                download(medium_type, uri, download_url, target_folder)
            except Exception as err:
                # Keep the worker alive; one bad item must not shrink the pool.
                print("Failed to download %s: %s" % (uri, err))
            finally:
                # Always acknowledge the item, otherwise queue.join() in the
                # scheduler would wait forever after a failed download.
                self.queue.task_done()
5.执行截图:
6.源码获取:欢迎star