-
Notifications
You must be signed in to change notification settings - Fork 9
/
Autovisor.py
267 lines (245 loc) · 10.7 KB
/
Autovisor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# encoding=utf-8
import asyncio
import traceback
import time
from typing import Tuple
from res.configs import Config
from res.progress import get_progress, show_progress
from playwright.async_api import async_playwright, Playwright, Page, Browser
from playwright._impl._errors import TargetClosedError, TimeoutError
from res.support import show_donate
from res.utils import optimize_page, get_lesson_name, video_optimize, get_filtered_class
# Module-level asyncio events shared between the background watcher tasks and
# the course loops. In entrance(), event_loop_verify is handed to
# wait_for_verify and event_loop_answer to skip_questions, which call set() on
# them; learning_loop / reviewing_loop await them after a timeout.
# NOTE(review): nothing visible in this file ever calls .clear() on either
# event, so after the first set() every later wait() returns immediately —
# confirm that this is intended.
event_loop_verify = asyncio.Event()
event_loop_answer = asyncio.Event()
async def auto_login(config: Config, page: Page):
    """Open the login page, submit the configured credentials and wait.

    Navigates to ``config.login_url``, fills the ``#lUsername`` and
    ``#lPassword`` inputs from ``config``, runs the ``config.login_js``
    snippet in the page and then blocks until the ``.wall-main`` element is
    hidden.
    """
    await page.goto(config.login_url)
    credentials = (
        ('#lUsername', config.username),
        ('#lPassword', config.password),
    )
    for selector, value in credentials:
        await page.locator(selector).fill(value)
    # Brief pause between filling the form and running the login script.
    await page.wait_for_timeout(500)
    await page.evaluate(config.login_js)
    await page.wait_for_selector(".wall-main", state="hidden")
async def init_page(p: Playwright, config: Config) -> Tuple[Page, Browser]:
    """Launch a visible Chromium-channel browser and return a sized page.

    ``config.driver`` selects the channel ("edge" is mapped to "msedge");
    when ``config.exe_path`` is set it is passed as ``executable_path`` too.
    The new page gets a 24-hour default timeout and a viewport matching the
    available screen size minus 50 pixels of height.

    Returns:
        The ``(page, browser)`` pair.
    """
    channel = config.driver
    if channel == "edge":
        channel = "msedge"
    launch_options = {"channel": channel, "headless": False}
    if config.exe_path:
        launch_options["executable_path"] = config.exe_path
    print(f"正在启动{config.driver}浏览器...")
    browser = await p.chromium.launch(**launch_options)

    context = await browser.new_context()
    page = await context.new_page()
    one_day_ms = 24 * 3600 * 1000
    page.set_default_timeout(one_day_ms)

    # Ask the page for the usable screen area and size the viewport from it.
    screen_size = await page.evaluate(
        '''() => {
return {width: window.screen.availWidth, height: window.screen.availHeight};
}'''
    )
    screen_size["height"] = screen_size["height"] - 50
    await page.set_viewport_size(screen_size)
    return page, browser
async def tail_work(page: Page, start_time, all_class, title) -> bool:
    """Report the outcome of the lesson that just ended.

    Restores the 24-hour default timeout on ``page`` and prints either the
    time-limit notice or a completion message for ``title``.

    Reads the module-level ``config`` (bound in the ``__main__`` guard of this
    file) for ``limitMaxTime``; it is not passed in as a parameter.

    Args:
        page: Page whose default timeout is reset.
        start_time: ``time.time()`` value taken when the course started.
        all_class: Lesson elements of the course; only the last one is read.
        title: Name of the lesson that was just played.

    Returns:
        True when the elapsed minutes reached ``config.limitMaxTime``
        (a non-positive limit never triggers), otherwise False.
    """
    page.set_default_timeout(24 * 3600 * 1000)
    elapsed_min = (time.time() - start_time) / 60

    # Guard clause: the time limit takes precedence over any completion report.
    if 0 < config.limitMaxTime <= elapsed_min:
        print(f"\n当前课程已达时限:{config.limitMaxTime}min\n即将进入下门课程!")
        return True

    last_lesson_class = await all_class[-1].get_attribute('class')
    if "current_play" in last_lesson_class:
        # The last lesson in the list is the one being played.
        print("\n已学完本课程全部内容!")
        print("==" * 10)
    else:
        print(f"\n\"{title}\" Done !")
        print(f"本次课程已学习:{elapsed_min:.1f} min")
    return False
async def play_video(page: Page):
    """Keep the video running by clicking ``.videoArea`` whenever needed.

    Clicks the video area once, then loops forever: every 0.5 s it checks for
    a ``.pauseButton`` element and, when none is found, clicks the video area
    again and waits for ``.pauseButton`` to be attached. Timeouts inside the
    loop are ignored. The coroutine never returns on its own; the caller is
    expected to cancel the task.
    """

    async def click_video_area():
        video_area = await page.wait_for_selector(".videoArea", state="attached")
        await video_area.click()

    await click_video_area()
    while True:
        try:
            await asyncio.sleep(0.5)
            if await page.query_selector(".pauseButton"):
                # Pause button present: nothing to do this round.
                continue
            await click_video_area()
            await page.wait_for_selector(".pauseButton", state="attached")
        except TimeoutError:
            continue
async def skip_questions(page: Page, event_loop):
    """Dismiss in-video question pop-ups for as long as the task runs.

    Every 0.5 s waits up to one second for a ``.topic-item`` element. When
    one shows up and no ``.answer`` element exists yet, each ``.topic-item``
    is clicked with a 200 ms pause after it. Afterwards the
    ``config.close_ques`` script is evaluated in the page and ``event_loop``
    is set. A timeout simply starts the next round.

    Reads the module-level ``config`` (bound in the ``__main__`` guard of this
    file); it is not passed in as a parameter.

    Args:
        page: Page being watched.
        event_loop: Object with a ``set()`` method (an ``asyncio.Event`` in
            ``entrance``) signalled after a question has been closed.
    """
    while True:
        try:
            await asyncio.sleep(0.5)
            await page.wait_for_selector(".topic-item", state="attached", timeout=1000)
            already_answered = await page.query_selector(".answer")
            if not already_answered:
                for option in await page.locator(".topic-item").all():
                    await option.click()
                    await page.wait_for_timeout(200)
            await page.evaluate(config.close_ques)
            event_loop.set()
        except TimeoutError:
            continue
async def wait_for_verify(page: Page, event_loop):
    """Watch for the security-verification dialog and signal when it is gone.

    Once per second waits up to one second for ``.yidun_modal__title`` to be
    attached. When it appears, a prompt asking the user to complete the
    verification by hand is printed, the coroutine waits (up to 24 hours) for
    the element to become hidden and then sets ``event_loop``. Timeouts
    restart the polling round.

    Args:
        page: Page being watched.
        event_loop: Object with a ``set()`` method (an ``asyncio.Event`` in
            ``entrance``) signalled after the dialog has disappeared.
    """
    dialog_title = ".yidun_modal__title"
    one_day_ms = 24 * 3600 * 1000
    while True:
        try:
            await asyncio.sleep(1)
            await page.wait_for_selector(dialog_title, state="attached", timeout=1000)
            print("\n检测到安全验证,请手动点击完成...")
            await page.wait_for_selector(dialog_title, state="hidden", timeout=one_day_ms)
            event_loop.set()
        except TimeoutError:
            continue
async def learning_loop(page: Page, config: Config):
    """Play the lessons of the currently open course one after another.

    Prints the course title, collects the lesson elements with
    ``get_filtered_class(page)`` and, for each of them, clicks it, runs
    ``video_optimize`` and polls ``get_progress`` until it reports ``"100%"``
    or the course-wide time limit ``config.limitMaxTime`` (minutes, ignored
    when not positive) is reached. After every lesson ``tail_work`` prints the
    summary; the function returns as soon as it reports the time limit.

    Args:
        page: Page showing the course player.
        config: Runtime configuration; ``limitMaxTime`` is read here and the
            object is forwarded to ``video_optimize``.
    """
    title_selector = await page.wait_for_selector(".source-name")
    course_title = await title_selector.text_content()
    print(f"当前课程:<<{course_title}>>")
    await page.wait_for_selector(".clearfix.video", state="attached")
    all_class = await get_filtered_class(page)
    start_time = time.time()
    cur_index = 0
    while cur_index < len(all_class):
        await all_class[cur_index].click()
        await page.wait_for_selector(".current_play", state="attached")
        await page.wait_for_timeout(1000)
        title = await get_lesson_name(page)
        print("正在学习:%s" % title)
        # Use a short default timeout while playing so that a blocking dialog
        # surfaces as a TimeoutError; tail_work restores the long timeout.
        page.set_default_timeout(10000)
        try:
            await video_optimize(page, config)
        except TimeoutError:
            # Only a pending security verification is waited for here.
            if await page.query_selector(".yidun_modal__title"):
                await event_loop_verify.wait()
        curtime, _ = await get_progress(page)
        timer = 0
        while curtime != "100%":
            try:
                time_period = (time.time() - start_time) / 60
                timer += 1
                if 0 < config.limitMaxTime <= time_period:
                    break
                elif timer % 5 == 0:
                    # Progress is re-read only on every fifth iteration.
                    curtime, _ = await get_progress(page)
                    show_progress(desc="完成进度:", cur_str=curtime)
            except TimeoutError as e:
                # Wait for whichever watcher task owns the blocking dialog.
                if await page.query_selector(".yidun_modal__title"):
                    await event_loop_verify.wait()
                elif await page.query_selector(".topic-title"):
                    await event_loop_answer.wait()
                else:
                    print(f"\n[Warn]{repr(e)}")
        # The class attribute is a space-separated token list, so look for the
        # "current_play" token instead of comparing the whole string: an exact
        # comparison can never match an element that also carries other
        # classes, and the loop would then never advance. A missing attribute
        # (None) is treated as "not current".
        lesson_class = await all_class[cur_index].get_attribute('class')
        if "current_play" in (lesson_class or "").split():
            cur_index += 1
        reachTimeLimit = await tail_work(page, start_time, all_class, title)
        if reachTimeLimit:
            return
async def reviewing_loop(page: Page, config: Config):
    """Replay the lessons of the currently open course in review mode.

    Works like the normal course loop but collects the lessons with
    ``get_filtered_class(page, enableRepeat=True)`` and leaves a lesson once
    the wall-clock time spent on it, multiplied by ``config.limitSpeed``,
    exceeds the ``total_time`` reported by ``get_progress`` (presumably the
    video length in seconds — confirm against ``get_progress``). The
    course-wide limit ``config.limitMaxTime`` (minutes, ignored when not
    positive) also ends a lesson. After every lesson ``tail_work`` prints the
    summary; the function returns as soon as it reports the time limit.

    Args:
        page: Page showing the course player.
        config: Runtime configuration; ``limitSpeed`` and ``limitMaxTime`` are
            read here and the object is forwarded to ``video_optimize``.
    """
    title_selector = await page.wait_for_selector(".source-name")
    course_title = await title_selector.text_content()
    print(f"当前课程:<<{course_title}>>")
    await page.wait_for_selector(".clearfix.video", state="attached")
    all_class = await get_filtered_class(page, enableRepeat=True)
    course_start_time = time.time()
    cur_index = 0
    while cur_index < len(all_class):
        await all_class[cur_index].click()
        await page.wait_for_selector(".current_play", state="attached")
        await page.wait_for_timeout(1000)
        title = await get_lesson_name(page)
        print("正在学习:%s" % title)
        # Use a short default timeout while playing so that a blocking dialog
        # surfaces as a TimeoutError; tail_work restores the long timeout.
        page.set_default_timeout(10000)
        try:
            await video_optimize(page, config)
        except TimeoutError:
            # Only a pending security verification is waited for here.
            if await page.query_selector(".yidun_modal__title"):
                await event_loop_verify.wait()
        curtime, total_time = await get_progress(page)
        # Per-lesson clock, separate from the course-wide one above.
        start_time = time.time()
        timer = 0
        while True:
            est_time = (time.time() - start_time) * config.limitSpeed
            if est_time > total_time:
                break
            try:
                time_period = (time.time() - course_start_time) / 60
                timer += 1
                if 0 < config.limitMaxTime <= time_period:
                    break
                elif timer % 5 == 0:
                    # Progress is re-read only on every fifth iteration.
                    curtime, total_time = await get_progress(page)
                    show_progress(desc="完成进度:", cur_str=curtime)
            except TimeoutError as e:
                # Wait for whichever watcher task owns the blocking dialog.
                if await page.query_selector(".yidun_modal__title"):
                    await event_loop_verify.wait()
                elif await page.query_selector(".topic-title"):
                    await event_loop_answer.wait()
                else:
                    print(f"\n[Warn]{repr(e)}")
        # The class attribute is a space-separated token list, so look for the
        # "current_play" token instead of comparing the whole string: an exact
        # comparison can never match an element that also carries other
        # classes, and the loop would then never advance. A missing attribute
        # (None) is treated as "not current".
        lesson_class = await all_class[cur_index].get_attribute('class')
        if "current_play" in (lesson_class or "").split():
            cur_index += 1
        reachTimeLimit = await tail_work(page, course_start_time, all_class, title)
        if reachTimeLimit:
            return
async def entrance(config: Config):
    """Run the whole session: start the browser, log in and play each course.

    For every URL in ``config.course_urls`` the page is loaded and optimized,
    the three watcher tasks (question skipping, video playing, verification
    watching) are started, the review or the normal course loop is awaited
    depending on ``config.enableRepeat``, and the watcher tasks are cancelled
    again. Any exception raised along the way is caught here and its traceback
    printed, so it does not propagate to the caller.
    """
    try:
        async with async_playwright() as p:
            page, browser = await init_page(p, config)
            # Log in; without stored credentials the user types them by hand.
            if not (config.username and config.password):
                print("请手动输入账号密码...")
            print("等待登录完成...")
            await auto_login(config, page)
            # Walk through all courses, loading each player page in turn.
            for course_url in config.course_urls:
                print("开始加载播放页...")
                await page.goto(course_url)
                await page.wait_for_selector(".studytime-div")
                # Close pop-ups and tidy up the page.
                await optimize_page(page, config)
                # Start the background watcher tasks.
                watchers = [
                    asyncio.create_task(skip_questions(page, event_loop_answer)),
                    asyncio.create_task(play_video(page)),
                    asyncio.create_task(wait_for_verify(page, event_loop_verify)),
                ]
                # Run the main loop for this course.
                course_loop = reviewing_loop if config.enableRepeat else learning_loop
                await course_loop(page, config)
                # Stop the background watcher tasks.
                for watcher in watchers:
                    watcher.cancel()
            await browser.close()
        print("==" * 10)
        print("所有课程学习完毕!")
        show_donate("res/QRcode.jpg")
        time.sleep(5)
    except Exception:
        traceback.print_exc()
if __name__ == "__main__":
    # Script entry point: load the configuration, run the session and turn
    # known failure types into a prompt that keeps the console window open.
    print("Github:CXRunfree All Rights Reserved.")
    print("===== Runtime Log =====")
    try:
        print("正在载入数据...")
        # ``config`` must stay a module-level name: tail_work and
        # skip_questions read it as a global.
        config = Config()
        asyncio.run(entrance(config))
    except KeyError:
        input("[Error]可能是account文件的配置出错!")
    except UserWarning:
        input("[Error]是不是忘记填账号密码了?")
    except FileNotFoundError as e:
        print(f"文件缺失: {e.filename}")
        input("[Error]程序缺失依赖文件,请重新安装程序!")
    except TargetClosedError:
        input("[Error]糟糕,网页关闭了!")
    except UnicodeDecodeError as e:
        print("configs配置文件编码有误,保存时请选择utf-8或gbk!")
        input(f"[Error]{e}")
    except Exception as e:
        # Anything unexpected: show the message and keep the traceback on disk.
        print(f"[Error]{e}")
        with open("log.txt", "w", encoding="utf-8") as log:
            log.write(traceback.format_exc())
        print("错误日志已保存至:log.txt")
        input("系统出错,请检查后重新启动!")