目录
一、创作背景:
二、实现逻辑:
三、代码分析【Deepseek分析】:
1) 主要功能
2)核心组件
2.1 GUI界面 (AutomationApp类)
2.2 浏览器自动化
2.3 平台特定处理
3) 关键技术
4)代码亮点
5)总结
四、运行截图:
五、程序代码:
特别声明:***本代码仅限编程学习交流,不得作为学习工具使用!***
一、创作背景:
公职人员每年有在线学习培训任务,作为年度考核依据之一。各视频学习培训网站均有各自的不同防作弊机制,即:不定时出现弹窗在线答题,作答正确后方能继续播放学习视频。最近学习Python,刚好可以拿来练手~
二、实现逻辑:
主界面基于Tcl/Tk图形库开发,采用ttk控件进行界面美化,核心功能通过Selenium实现浏览器自动化操作。
三、代码分析(Deepseek分析):
这是一个基于Python Tkinter和Selenium的浏览器自动化工具,主要用于在线学习平台的自动化操作。下面是对代码的详细分析:
1) 主要功能
多浏览器支持:支持Edge和Chrome浏览器
多平台支持:可处理”思想天下”、”开放大学”、”一维科技”等学习平台的弹窗和验证
自动化操作:自动答题、视频监控、弹窗处理等
用户友好界面:图形化操作界面,实时日志显示
2)核心组件
2.1 GUI界面 (AutomationApp类)
使用Tkinter和ttk构建现代化界面
包含状态指示灯、控制按钮、浏览器选择、学习平台选择和日志区域
实现了丰富的样式配置(ColorConfig和setup_modern_style方法)
支持日志重定向(TextRedirector类),高亮显示不同级别的信息
2.2 浏览器自动化
浏览器配置:BrowserConfig类负责检测浏览器版本、设置独立的用户数据目录和查找驱动
驱动初始化:init_driver函数根据配置初始化WebDriver
进程管理:kill_browser_processes函数确保精准清理自动化进程,不清理用户浏览器
2.3 平台特定处理
思想天下:math_popup_solver处理数学题弹窗
开放大学:kfdx_popup处理防作弊问答(使用OCR识别)、视频播放自动控制
一维科技:ywdc_popup处理答题卡弹窗、视频暂停后自动播放
3) 关键技术
多线程处理:使用Thread实现后台监控,避免阻塞GUI
线程安全:使用RLock保护共享资源
异常处理:完善的异常捕获和处理机制
OCR识别:使用pytesseract识别验证码
进程管理:使用psutil精准控制浏览器进程
日志系统:重定向标准输出到GUI界面
4)代码亮点
现代化UI设计:
自定义颜色配置系统
状态指示灯动画效果
响应式按钮和交互元素
健壮性设计:
完善的资源清理机制(cleanup方法)
浏览器状态检查(is_driver_alive)
版本兼容性检查
可扩展性:
通过site_config字典轻松添加新平台
模块化的弹窗处理函数
用户体验:
详细的错误提示
实时日志反馈
超链接识别和点击
5)总结
这是一个功能完善、设计良好的浏览器自动化工具,展示了以下技术能力:
复杂的GUI应用程序开发
浏览器自动化技术
多线程编程
健壮的错误处理
现代化的UI设计
代码结构清晰,注释完善,适合作为中大型Python GUI项目的参考。
四、运行截图:

五、程序代码(部分):
程序只做了3个学习网站的弹窗,可根据实际不同网站进行字典配置更新,并处理相应模块的弹窗处理逻辑,进行匹配扩展。因代码有1668行,请移步下载:https://download.csdn.net/download/Ricky_One/90936258。包括完整源代码、资源文件及编译后的单个EXE文件(建议Win10以上操作系统,直接可使用。未将浏览器驱动文件打包进EXE,可根据程序提示,点击超链接可直接打开对应的浏览器驱动下载网址)。
def get_resource_path(relative_path):
"""获取资源的绝对路径(兼容开发模式和 PyInstaller 打包模式)"""
if hasattr(sys, '_MEIPASS'):
# PyInstaller 打包后的临时目录
base_path = sys._MEIPASS
else:
# 开发时的当前目录
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class AutomationApp(tk.Tk):
# 初始化及GUI构造部分省略...
# ==================== GUI交互方法 ====================
def show_error_dialog(self, message):
"""显示错误弹窗"""
self.update_indicator("error")
self.schedule_after(0, lambda: messagebox.showerror(
"系统错误",
message,
detail="请根据提示解决问题后重试"
))
def log(self, message):
"""线程安全的日志记录"""
self.schedule_after(0, lambda: print(message)) # 确保所有日志流经 TextRedirector
def update_status(self, message):
"""更新状态栏"""
self.status_var.set(f"状态: {message}")
self.update_idletasks()
def start_automation(self):
"""启动自动化任务(线程安全版)"""
if not self.is_running:
try:
self.update_indicator("wait")
self.exit_flag.clear()
self.driver = None
browser_config = BrowserConfig(self)
self.user_data_dir = browser_config.user_data_dir # 保存路径
browser_type = self.mode_var.get()
# 终止可能存在的旧线程
if hasattr(self, '_monitor_thread'):
try:
if self._monitor_thread.is_alive():
self._monitor_thread.join(timeout=0.5)
except:
pass
# 精确清理当前实例的浏览器进程
kill_browser_processes(browser_type=browser_type, user_data_dir=self.user_data_dir)
# 将 browser_config 传递给线程,创建并启动线程
self._monitor_thread = Thread(
target=self._run_automation_wrapper,
daemon=True,
args=(browser_config,),
name=f"MonitorThread-{time.time()}"
)
self._monitor_thread.start()
self.is_running = True
self.btn_start.config(state=tk.DISABLED)
self.btn_stop.config(state=tk.NORMAL)
self.update_status("初始化...")
except Exception as e:
# 异常处理:记录错误并恢复UI状态
self.log(f"🚨 浏览器初始化失败: {str(e)}")
self.is_running = False
self.btn_start.config(state=tk.NORMAL)
self.btn_stop.config(state=tk.DISABLED)
self.update_status("初始化失败")
self.update_indicator("error")
def stop_automation(self):
"""用户点击停止按钮时的操作"""
if not self.is_running:
return
self.exit_flag.set()
self.is_running = False
self.schedule_after(100, lambda:self.cleanup(close_browser=True,
message="🛑 用户主动停止监控"))
def _monitor_video_progress(self):
"""监控视频进度"""
Tips = True
current_item = None
lists = 0
while not self.exit_flag.is_set() and is_driver_alive(self.driver):
try:
# 获取所有视频项
video_items = self.driver.find_elements(By.CSS_SELECTOR, "ul.playlist>li")
lists=len(video_items)
# 方法:检查 active 类
for index, item in enumerate(video_items):
if "active" in item.get_attribute("class") and current_item != index+1:
current_item = index + 1 # 从 1 开始编号
Tips=True
break
if current_item :
if Tips:
self.log(f"▶ 当前播放本课程第 {current_item}个视频")
Tips=False
video = self.driver.find_element(By.CSS_SELECTOR, "video")
# 使用JavaScript检查视频状态
is_ended = self.driver.execute_script("""
return arguments[0].ended;""", video)
if is_ended:
if current_item<lists:
video_items[current_item].click()
self.log("▶ 当前视频播放完毕,自动切换至下个视频...")
else:
self.driver.find_element(By.CSS_SELECTOR, "div#tab-second").click()
mycourse_list = self.driver.find_elements(By.CSS_SELECTOR, "div.mycourse_list")
if len(mycourse_list) > 1:
mycourse_list[1].click()
self.log("✅ 本课程播放完毕,切换至下一课程成功")
if self.exit_flag.is_set():
break
time.sleep(1)
except Exception :
pass
def _run_automation_wrapper(self,browser_config):
"""包装方法用于异常捕获和线程清理"""
try:
self.run_automation(browser_config)
except Exception:
self.log("🔥 监控线程崩溃:浏览器启动异常或关闭")
finally:
if hasattr(self, '_monitor_thread'):
del self._monitor_thread
def run_automation(self, browser_config): # 修改签名
"""自动化主逻辑(完整改进版)"""
try:
# 获取当前选择的网站
current_strategy = self.strategy_var.get()
config = self.site_config[current_strategy]
# 初始化浏览器
self.driver = init_driver(browser_config)
# 显示启动信息
browser_name = "Edge" if self.mode_var.get() == "edge" else "Chrome"
self.log(f"
{'='*60}")
self.log(f"浏览器启动正常:{browser_name}浏览器 | 用户目录: {browser_config.user_data_dir}")
self.log(f"驱动程序路径: {browser_config.driver_path}")
self.log(f"
🌐 正在访问:【{self.strategy_var.get()}】学习平台")
self.log(f"{'='*60}")
self.log("🚨 程序初始化完成,请等待浏览器完成加载...")
# 导航到目标网站,并记录初始窗口句柄
self.driver.get(config["url"])
self.log(f"⏩ 页面切换至: {unquote(self.driver.current_url)}")
self.update_status("就绪")
original_window = self.driver.current_window_handle
last_url = ""
retry_count = 0
MAX_RETRY = 3
login_flag = True # 登陆提示标志
leave_flag = False # 离开学习网址标志
if "hnsydwpx.cn" in config["url"]: # 开放大学视频监测
self._video_thread = Thread(target=self._monitor_video_progress, daemon=True)
self._video_thread.start()
#self.log("视频监测开始")
while not self.exit_flag.is_set():
try:
# 增强的状态检查
if self.exit_flag.is_set():
self.log("ℹ️ 检测到退出标志,停止监控")
break
if not (self.is_running and is_driver_alive(self.driver)):
self.log("⚠️ 检测到停止信号或浏览器异常")
break
# 处理页面逻辑
try:
current_url = self.driver.current_url
if "msn.cn" in current_url or "edge://" in current_url or "Chrome://" in current_url:
current_url = last_url
if current_url is None:
self.log("⚠️ 浏览器处于恢复状态,等待页面加载...")
time.sleep(2)
continue
if current_url != last_url:
if "play" in current_url or "Play" in current_url:
leave_flag = True
self.log("👉 已开始学习,监控启动")
self.log(f"⏩ 学习页面切换至: {unquote(current_url)}")
self.update_status("监控运行中...")
self.schedule_after(0, lambda: self.update_indicator("running"))
elif leave_flag:
leave_flag = False
self.log("👈 已离开学习,停止监控")
self.update_status("已停止监控")
self.schedule_after(0, lambda: self.update_indicator("idle"))
last_url = current_url
# 检测登陆界面并自动切换窗口
if "ggfw.rst.hunan.gov.cn" in current_url:
if login_flag: # 只显示一次提示
self.log("ⓘ 检测到登陆窗口,请按要求登陆...")
login_flag = False
self.update_status("等待登陆...")
handles = self.driver.window_handles
flag = True
if len(handles) > 1 and flag:
for handle in handles:
if handle != original_window:
self.driver.switch_to.window(handle)
sub_url = self.driver.current_url
if "msn.cn" in current_url or "edge://" in current_url or "Chrome://" in current_url:
self.driver.switch_to.window(original_window)
if re.search(r'://([^/]+)',config["url"]).group(1) in sub_url:
self.log("ⓘ 已完成登陆,请开始学习!")
self.update_status("监控运行中...")
self.schedule_after(0, lambda: self.update_indicator("running"))
flag = False
break
# 使用安全调用处理弹窗
result = self._safe_popup_handler(
config['handler'],
self.driver,
self.exit_flag,
self.counter
)
if result is True:
if self.counter.count > 0:
self.update_status(f"已处理{self.counter.count}次")
retry_count = 0
elif result is False:
retry_count += 1
if retry_count < 4:
self.log(f"⚠️ 弹窗处理失败(重试次数:{retry_count}/{MAX_RETRY})")
else:
retry_count = 0
time.sleep(0.5)
if retry_count > MAX_RETRY:
self.log("ℹ️ 达到最大重试次数,刷新页面...")
self.driver.refresh()
retry_count = 0
time.sleep(5)
self.exit_flag.wait(1) #主循环等待时间
except TimeoutException as e:
self.log(f"⚠️ 操作超时: {str(e)}")
continue
except NoSuchWindowException:
self.log("🚨 浏览器窗口已被关闭")
break
except WebDriverException as e:
if "invalid session" in str(e).lower():
self.log("🚨 浏览器会话已终止,停止处理")
break
self.log(f"⚠️ 浏览器异常: {type(e).__name__}")
continue
except Exception as e:
self.log(f"🚨 未处理的异常: {type(e).__name__}")
break
except Exception as e:
self.log(f"
🚨 监控循环异常: {type(e).__name__}")
break
finally:
self.schedule_after(0, lambda:self.cleanup(close_browser=True,
message=f"
🛑【{current_strategy}】平台监控已停止!"))
def _safe_popup_handler(self, handler_func, driver, exit_flag, counter):
"""带异常保护的弹窗处理器"""
try:
if exit_flag.is_set() or not is_driver_alive(driver):
return None
return handler_func(driver, exit_flag, counter, self)
except Exception as e:
self.log(f"⚠️ {handler_func.__name__} 处理失败: {str(e)}")
return False # 视为处理失败
def cleanup(self, close_browser=True, message=None):
"""线程安全的资源清理方法(完整改进版)
参数:
close_browser: 是否关闭浏览器(默认True)
message: 要记录的退出消息(可选)
"""
if not self.winfo_exists(): # 优先检查窗口是否已销毁
return
def _cleanup():
try:
# *. 显式停止视频线程
if hasattr(self, '_video_thread') and self._video_thread.is_alive():
self._video_thread.join(timeout=1.5) # 最多等待1.5秒
# 1. 确保状态标记重置(避免竞态条件)
with self._running_lock:
self._is_running = False
# 2. 安全获取并清理driver实例
driver = None
with self._driver_lock:
driver = self._driver
self._driver = None # 显式置空防止重复清理
# 3. 关闭浏览器(如果要求且存在)
if close_browser and driver is not None:
try:
# 尝试正常关闭
if hasattr(driver, 'quit'):
driver.quit()
self.log("🚪 浏览器已安全关闭")
except Exception as e:
self.log(f"⚠️ 关闭浏览器时出错: {str(e)}")
# 强制终止残留进程
kill_browser_processes(browser_type=self.mode_var.get() , user_data_dir=self.user_data_dir)
self.log(" 已强制终止浏览器进程")
# 4. 清理服务进程(如果存在)
if hasattr(self, '_driver_service_pid'):
try:
if psutil.pid_exists(self._driver_service_pid):
psutil.Process(self._driver_service_pid).terminate()
self.log(f"🛑 已终止驱动服务进程(PID: {self._driver_service_pid})")
except Exception as e:
self.log(f"⚠️ 终止驱动服务进程失败: {str(e)}")
finally:
if hasattr(self, '_driver_service_pid'):
del self._driver_service_pid
# 5. 更新UI状态(带存在性检查)
if hasattr(self, 'btn_start') and self.winfo_exists():
self.btn_start.config(state=tk.NORMAL)
self.btn_stop.config(state=tk.DISABLED)
self.update_status("已停止监控")
self.update_indicator("idle") # 确保状态灯重置
# 记录退出消息(如果有)
if message:
self.log(message)
except Exception as e:
# 记录清理过程中的任何异常(不影响程序退出)
error_msg = f"⚠️ 清理过程中出错: {type(e).__name__} - {str(e)}"
if hasattr(self, 'log') and self.winfo_exists():
self.log(error_msg)
else:
print(error_msg) # 最后兜底输出
# 确保在主线程执行UI操作
if self.winfo_exists():
self.schedule_after(0, _cleanup)
def on_window_close(self):
"""安全处理窗口关闭事件"""
if self.is_running: # 如果正在运行,先提示用户
confirm = messagebox.askokcancel(
"退出程序",
"监控正在运行,建议先停止监控再退出。
确定要强制退出吗?",
icon=messagebox.WARNING
)
if not confirm: # 用户取消退出
return
# 1. 停止所有定时器
self.cancel_all_after()
# 2. 设置退出标志
self.exit_flag.set()
# 3. 异步清理浏览器资源(不依赖Tkinter事件循环)
browser_type = self.mode_var.get()
Thread(target=self._async_cleanup, args=(browser_type, self.user_data_dir), daemon=True).start()
def _async_cleanup(self, browser_type, user_data_dir):
"""异步执行浏览器资源清理"""
try:
# 安全获取并清理driver实例
driver = None
with self._driver_lock:
driver = self._driver
self._driver = None
if driver is not None:
try:
driver.quit() # 同步立即关闭
except Exception as e:
self.log(f"⚠️ 关闭浏览器时出错: {str(e)}")
# 强制终止残留进程
kill_browser_processes(browser_type=browser_type, user_data_dir=user_data_dir)
self.destroy()
except Exception as e:
self.log(f"⚠️ 清理过程中出错: {str(e)}")
def cancel_all_after(self):
"""安全取消所有定时器"""
for after_id in self.after_ids.copy():
try:
self.after_cancel(after_id)
if after_id in self.after_ids:
self.after_ids.remove(after_id)
except tk.TclError:
pass # 忽略无效ID
def schedule_after(self, ms, func):
"""统一调度定时器"""
after_id = self.after(ms, func)
self.after_ids.append(after_id)
return after_id
# ============线程锁优化 =================
@property
def driver(self):
"""线程安全的driver属性访问(增加窗口状态检查)"""
if not self.winfo_exists():
return None
if not hasattr(self, '_driver_lock'):
return None
with self._driver_lock:
return self._driver
@driver.setter
def driver(self, value):
if not self.winfo_exists():
return
if hasattr(self, '_driver_lock'):
with self._driver_lock:
self._driver = value
@property
def is_running(self):
with self._running_lock:
return self._is_running
@is_running.setter
def is_running(self, value):
with self._running_lock:
self._is_running = value
def kill_browser_processes(browser_type=None, user_data_dir=None):
"""只精准清理自动化进程,不清理用户浏览器进程"""
if not user_data_dir:
return
# 浏览器进程名称映射
targets = {
"edge": ["msedge.exe", "msedgedriver.exe"],
"chrome": ["chrome.exe", "chromedriver.exe"]
}.get(browser_type, [])
# 预处理用户目录路径
normalized_dir = os.path.normcase(os.path.abspath(user_data_dir))
target_names = {name.lower() for name in targets}
try:
# 一次性获取所有进程信息
procs = {p.pid: p for p in psutil.process_iter(['pid', 'name', 'cmdline'])}
# 筛选目标进程
matched_procs = []
for pid, proc in procs.items():
try:
# 快速排除非目标进程
if proc.info['name'].lower() not in target_names:
continue
# 获取命令行参数
cmdline = ' '.join(proc.info['cmdline']).lower()
# 精准匹配用户数据目录和自动化标志
if (f'--user-data-dir="{normalized_dir}"' in cmdline or
f'--user-data-dir={normalized_dir}' in cmdline) and
'--remote-allow-origins=*' in cmdline:
matched_procs.append(proc)
except (psutil.NoSuchProcess, IndexError):
continue
# 直接强制终止匹配进程(避免等待)
for proc in matched_procs:
try:
proc.kill() # 直接发送SIGKILL
except psutil.NoSuchProcess:
pass
except Exception as e:
print(f"清理进程时出错: {str(e)}")
# 辅助组件及学习平台弹窗处理部分略...


















暂无评论内容