【实例】事业单位学习平台自动化操作

目录

一、创作背景:

二、实现逻辑:

三、代码分析【Deepseek分析】:

1) 主要功能

2)核心组件

    2.1 GUI界面 (AutomationApp类)

    2.2 浏览器自动化

    2.3 平台特定处理

3) 关键技术

4)代码亮点

5)总结

四、运行截图:

五、程序代码:


特别声明:***本代码仅限编程学习交流,不得作为学习工具使用!***

一、创作背景:

        公职人员每年有在线学习培训任务,作为年度考核依据之一。各视频学习培训网站均有各自的不同防作弊机制,即:不定时出现弹窗在线答题,作答正确后方能继续播放学习视频。最近学习Python,刚好可以拿来练手~

二、实现逻辑:

        主界面基于Tcl/Tk图形库开发,采用ttk控件进行界面美化,核心功能通过Selenium实现浏览器自动化操作。

三、代码分析(Deepseek分析):

        这是一个基于Python Tkinter和Selenium的浏览器自动化工具,主要用于在线学习平台的自动化操作。下面是对代码的详细分析: 

1) 主要功能

多浏览器支持:支持Edge和Chrome浏览器

多平台支持:可处理”思想天下”、”开放大学”、”一维科技”等学习平台的弹窗和验证

自动化操作:自动答题、视频监控、弹窗处理等

用户友好界面:图形化操作界面,实时日志显示

2)核心组件
    2.1 GUI界面 (AutomationApp类)

使用Tkinter和ttk构建现代化界面

包含状态指示灯、控制按钮、浏览器选择、学习平台选择和日志区域

实现了丰富的样式配置(ColorConfigsetup_modern_style方法)

支持日志重定向(TextRedirector类),高亮显示不同级别的信息

    2.2 浏览器自动化

浏览器配置BrowserConfig类负责检测浏览器版本、设置独立的用户数据目录和查找驱动

驱动初始化init_driver函数根据配置初始化WebDriver

进程管理kill_browser_processes函数确保精准清理自动化进程,不清理用户浏览器

    2.3 平台特定处理

思想天下math_popup_solver处理数学题弹窗

开放大学kfdx_popup处理防作弊问答(使用OCR识别)、视频播放自动控制

一维科技ywdc_popup处理答题卡弹窗、视频暂停后自动播放

3) 关键技术

多线程处理:使用Thread实现后台监控,避免阻塞GUI

线程安全:使用RLock保护共享资源

异常处理:完善的异常捕获和处理机制

OCR识别:使用pytesseract识别验证码

进程管理:使用psutil精准控制浏览器进程

日志系统:重定向标准输出到GUI界面

4)代码亮点

现代化UI设计

自定义颜色配置系统

状态指示灯动画效果

响应式按钮和交互元素

健壮性设计

完善的资源清理机制(cleanup方法)

浏览器状态检查(is_driver_alive)

版本兼容性检查

可扩展性

通过site_config字典轻松添加新平台

模块化的弹窗处理函数

用户体验

详细的错误提示

实时日志反馈

超链接识别和点击

5)总结

这是一个功能完善、设计良好的浏览器自动化工具,展示了以下技术能力:

复杂的GUI应用程序开发

浏览器自动化技术

多线程编程

健壮的错误处理

现代化的UI设计

代码结构清晰,注释完善,适合作为中大型Python GUI项目的参考。

四、运行截图:

 

五、程序代码(部分):

    程序只做了3个学习网站的弹窗,可根据实际不同网站进行字典配置更新,并处理相应模块的弹窗处理逻辑,进行匹配扩展。因代码有1668行,请移步下载:https://download.csdn.net/download/Ricky_One/90936258。包括完整源代码、资源文件及编译后的单个EXE文件(建议Win10以上操作系统,直接可使用。未将浏览器驱动文件打包进EXE,可根据程序提示,点击超链接可直接打开对应的浏览器驱动下载网址)。

def get_resource_path(relative_path):
    """获取资源的绝对路径(兼容开发模式和 PyInstaller 打包模式)"""
    if hasattr(sys, '_MEIPASS'):
        # PyInstaller 打包后的临时目录
        base_path = sys._MEIPASS
    else:
        # 开发时的当前目录
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)

class AutomationApp(tk.Tk):
# 初始化及GUI构造部分省略...
    # ==================== GUI交互方法 ====================
    def show_error_dialog(self, message):
        """显示错误弹窗"""
        self.update_indicator("error")
        self.schedule_after(0, lambda: messagebox.showerror(
            "系统错误",
            message,
            detail="请根据提示解决问题后重试"
        ))

    def log(self, message):
        """线程安全的日志记录"""
        self.schedule_after(0, lambda: print(message))  # 确保所有日志流经 TextRedirector

    def update_status(self, message):
        """更新状态栏"""
        self.status_var.set(f"状态: {message}")
        self.update_idletasks()

    def start_automation(self):
        """启动自动化任务(线程安全版)"""
        if not self.is_running:
            try:
                self.update_indicator("wait")
                self.exit_flag.clear()
                self.driver = None
                browser_config = BrowserConfig(self)
                self.user_data_dir = browser_config.user_data_dir  # 保存路径
                browser_type = self.mode_var.get()
                
                # 终止可能存在的旧线程
                if hasattr(self, '_monitor_thread'):
                    try:
                        if self._monitor_thread.is_alive():
                            self._monitor_thread.join(timeout=0.5)
                    except:
                        pass
                    
                # 精确清理当前实例的浏览器进程
                kill_browser_processes(browser_type=browser_type, user_data_dir=self.user_data_dir)

                # 将 browser_config 传递给线程,创建并启动线程
                self._monitor_thread = Thread(
                    target=self._run_automation_wrapper,
                    daemon=True,
                    args=(browser_config,),
                    name=f"MonitorThread-{time.time()}"
                )
                self._monitor_thread.start()

                self.is_running = True
                self.btn_start.config(state=tk.DISABLED)
                self.btn_stop.config(state=tk.NORMAL)
                self.update_status("初始化...")
            except Exception as e:
                # 异常处理:记录错误并恢复UI状态
                self.log(f"🚨 浏览器初始化失败: {str(e)}")
                self.is_running = False
                self.btn_start.config(state=tk.NORMAL)
                self.btn_stop.config(state=tk.DISABLED)
                self.update_status("初始化失败")
                self.update_indicator("error")
                        
    def stop_automation(self):
        """用户点击停止按钮时的操作"""
        if not self.is_running:
            return
        
        self.exit_flag.set()        
        self.is_running = False
        self.schedule_after(100, lambda:self.cleanup(close_browser=True,
                        message="🛑 用户主动停止监控"))

    def _monitor_video_progress(self):
        """监控视频进度"""
        Tips = True
        current_item = None
        lists = 0
        while not self.exit_flag.is_set() and is_driver_alive(self.driver):
            try:
                # 获取所有视频项
                video_items = self.driver.find_elements(By.CSS_SELECTOR, "ul.playlist>li")
                lists=len(video_items)

                # 方法:检查 active 类               
                for index, item in enumerate(video_items):
                    if "active" in item.get_attribute("class") and current_item != index+1:
                        current_item = index + 1  # 从 1 开始编号
                        Tips=True
                        break
                            
                if current_item :
                    if Tips:
                        self.log(f"▶ 当前播放本课程第 {current_item}个视频")
                        Tips=False
                    video = self.driver.find_element(By.CSS_SELECTOR, "video")

                    # 使用JavaScript检查视频状态
                    is_ended = self.driver.execute_script("""
                        return arguments[0].ended;""", video)
                    
                    if is_ended:
                        if current_item<lists:
                            video_items[current_item].click()
                            self.log("▶ 当前视频播放完毕,自动切换至下个视频...")
                        else:
                            self.driver.find_element(By.CSS_SELECTOR, "div#tab-second").click()
                            mycourse_list = self.driver.find_elements(By.CSS_SELECTOR, "div.mycourse_list")
                            if len(mycourse_list) > 1:
                                mycourse_list[1].click()
                                self.log("✅ 本课程播放完毕,切换至下一课程成功")
                 
                if self.exit_flag.is_set():
                    break
                time.sleep(1)
            except Exception :
                pass
    def _run_automation_wrapper(self,browser_config):
        """包装方法用于异常捕获和线程清理"""
        try:
            self.run_automation(browser_config)
        except Exception:
            self.log("🔥 监控线程崩溃:浏览器启动异常或关闭")
        finally:
            if hasattr(self, '_monitor_thread'):
                del self._monitor_thread
        
    def run_automation(self, browser_config):  # 修改签名
        """自动化主逻辑(完整改进版)"""
        try:
            
            # 获取当前选择的网站
            current_strategy = self.strategy_var.get()
            config = self.site_config[current_strategy]

            # 初始化浏览器
            self.driver = init_driver(browser_config)
            
            # 显示启动信息
            browser_name = "Edge" if self.mode_var.get() == "edge" else "Chrome"
            self.log(f"
{'='*60}")
            self.log(f"浏览器启动正常:{browser_name}浏览器 | 用户目录: {browser_config.user_data_dir}")
            self.log(f"驱动程序路径: {browser_config.driver_path}")
            self.log(f"
🌐 正在访问:【{self.strategy_var.get()}】学习平台")
            self.log(f"{'='*60}")
            self.log("🚨 程序初始化完成,请等待浏览器完成加载...")
        
            # 导航到目标网站,并记录初始窗口句柄
            self.driver.get(config["url"])
            self.log(f"⏩ 页面切换至: {unquote(self.driver.current_url)}")
            self.update_status("就绪")
            original_window = self.driver.current_window_handle  
            last_url = ""
            retry_count = 0
            MAX_RETRY = 3
            login_flag = True # 登陆提示标志
            leave_flag = False # 离开学习网址标志
            
            if "hnsydwpx.cn" in config["url"]: # 开放大学视频监测
                self._video_thread = Thread(target=self._monitor_video_progress, daemon=True)
                self._video_thread.start()
                #self.log("视频监测开始")

            while not self.exit_flag.is_set():
                try:
                    # 增强的状态检查
                    if self.exit_flag.is_set():
                        self.log("ℹ️ 检测到退出标志,停止监控")
                        break
                    if not (self.is_running and is_driver_alive(self.driver)):
                        self.log("⚠️ 检测到停止信号或浏览器异常")
                        break
                    
                   # 处理页面逻辑
                    try:
                        current_url = self.driver.current_url
                        if "msn.cn" in current_url or "edge://" in current_url or "Chrome://" in current_url:
                            current_url = last_url
                            
                        if current_url is None:
                            self.log("⚠️ 浏览器处于恢复状态,等待页面加载...")
                            time.sleep(2)
                            continue                                                
                        if current_url != last_url:                     
                            if "play" in current_url or "Play" in current_url:
                                leave_flag = True
                                self.log("👉 已开始学习,监控启动")
                                self.log(f"⏩ 学习页面切换至: {unquote(current_url)}")
                                self.update_status("监控运行中...")
                                self.schedule_after(0, lambda: self.update_indicator("running"))
                            elif leave_flag:
                                leave_flag = False
                                self.log("👈 已离开学习,停止监控")
                                self.update_status("已停止监控")
                                self.schedule_after(0, lambda: self.update_indicator("idle"))
                            last_url = current_url        
                            
                        # 检测登陆界面并自动切换窗口
                        if "ggfw.rst.hunan.gov.cn" in current_url:
                            if login_flag: # 只显示一次提示
                                self.log("ⓘ 检测到登陆窗口,请按要求登陆...")
                                login_flag = False
                                self.update_status("等待登陆...")
                                
                            handles = self.driver.window_handles
                            flag = True
                            if len(handles) > 1 and flag:
                                for handle in handles:
                                    if handle != original_window:
                                        self.driver.switch_to.window(handle)
                                        sub_url = self.driver.current_url
                                        if "msn.cn" in current_url or "edge://" in current_url or "Chrome://" in current_url:
                                            self.driver.switch_to.window(original_window)
                                        if re.search(r'://([^/]+)',config["url"]).group(1) in sub_url:
                                            self.log("ⓘ 已完成登陆,请开始学习!")
                                            self.update_status("监控运行中...")
                                            self.schedule_after(0, lambda: self.update_indicator("running"))
                                            flag = False
                                            break
                        
                        # 使用安全调用处理弹窗
                        result = self._safe_popup_handler(
                            config['handler'], 
                            self.driver, 
                            self.exit_flag,
                            self.counter
                        )
                        
                        if result is True:
                            if self.counter.count > 0:
                                self.update_status(f"已处理{self.counter.count}次")
                            retry_count = 0
                        elif result is False:
                            retry_count += 1
                            if retry_count < 4: 
                                self.log(f"⚠️ 弹窗处理失败(重试次数:{retry_count}/{MAX_RETRY})")
                        else:
                            retry_count = 0
                            time.sleep(0.5) 
                            
                        if retry_count > MAX_RETRY:
                            self.log("ℹ️ 达到最大重试次数,刷新页面...")
                            self.driver.refresh()
                            retry_count = 0
                            time.sleep(5) 
                            
                        self.exit_flag.wait(1) #主循环等待时间
                        
                    except TimeoutException as e:
                        self.log(f"⚠️ 操作超时: {str(e)}")
                        continue
                    except NoSuchWindowException:
                        self.log("🚨 浏览器窗口已被关闭")
                        break
                    except WebDriverException as e:
                        if "invalid session" in str(e).lower():
                            self.log("🚨 浏览器会话已终止,停止处理")
                            break
                        self.log(f"⚠️ 浏览器异常: {type(e).__name__}")
                        continue  
                    except Exception as e:
                        self.log(f"🚨 未处理的异常: {type(e).__name__}")
                        break
                except Exception as e:
                    self.log(f"
🚨 监控循环异常: {type(e).__name__}")
                    break
                
        finally:
            self.schedule_after(0, lambda:self.cleanup(close_browser=True,
                message=f"
🛑【{current_strategy}】平台监控已停止!"))

    def _safe_popup_handler(self, handler_func, driver, exit_flag, counter):
        """带异常保护的弹窗处理器"""
        try:
            if exit_flag.is_set() or not is_driver_alive(driver):
                return None
            return handler_func(driver, exit_flag, counter, self)
        except Exception as e:
            self.log(f"⚠️ {handler_func.__name__} 处理失败: {str(e)}")
            return False  # 视为处理失败
    
    def cleanup(self, close_browser=True, message=None):
        """线程安全的资源清理方法(完整改进版)
        参数:
            close_browser: 是否关闭浏览器(默认True)
            message: 要记录的退出消息(可选)
        """
        if not self.winfo_exists():  # 优先检查窗口是否已销毁
            return

        def _cleanup():
            try:
                # *. 显式停止视频线程
                if hasattr(self, '_video_thread') and self._video_thread.is_alive():
                    self._video_thread.join(timeout=1.5)  # 最多等待1.5秒
                
                # 1. 确保状态标记重置(避免竞态条件)
                with self._running_lock:
                    self._is_running = False

                # 2. 安全获取并清理driver实例
                driver = None
                with self._driver_lock:
                    driver = self._driver
                    self._driver = None  # 显式置空防止重复清理

                # 3. 关闭浏览器(如果要求且存在)
                if close_browser and driver is not None:
                    try:
                        # 尝试正常关闭
                        if hasattr(driver, 'quit'):
                            driver.quit()
                            self.log("🚪 浏览器已安全关闭")
                    except Exception as e:
                        self.log(f"⚠️ 关闭浏览器时出错: {str(e)}")
                        # 强制终止残留进程
                        kill_browser_processes(browser_type=self.mode_var.get() , user_data_dir=self.user_data_dir)
                        self.log(" 已强制终止浏览器进程")

                # 4. 清理服务进程(如果存在)
                if hasattr(self, '_driver_service_pid'):
                    try:
                        if psutil.pid_exists(self._driver_service_pid):
                            psutil.Process(self._driver_service_pid).terminate()
                            self.log(f"🛑 已终止驱动服务进程(PID: {self._driver_service_pid})")
                    except Exception as e:
                        self.log(f"⚠️ 终止驱动服务进程失败: {str(e)}")
                    finally:
                        if hasattr(self, '_driver_service_pid'):
                            del self._driver_service_pid

                # 5. 更新UI状态(带存在性检查)
                if hasattr(self, 'btn_start') and self.winfo_exists():
                    self.btn_start.config(state=tk.NORMAL)
                    self.btn_stop.config(state=tk.DISABLED)
                    self.update_status("已停止监控")
                    self.update_indicator("idle")  # 确保状态灯重置
                    # 记录退出消息(如果有)
                    if message:
                        self.log(message)

            except Exception as e:
                # 记录清理过程中的任何异常(不影响程序退出)
                error_msg = f"⚠️ 清理过程中出错: {type(e).__name__} - {str(e)}"
                if hasattr(self, 'log') and self.winfo_exists():
                    self.log(error_msg)
                else:
                    print(error_msg)  # 最后兜底输出

        # 确保在主线程执行UI操作
        if self.winfo_exists():
            self.schedule_after(0, _cleanup)
            
    def on_window_close(self):
        """安全处理窗口关闭事件"""
        if self.is_running:  # 如果正在运行,先提示用户
            confirm = messagebox.askokcancel(
                "退出程序",
                "监控正在运行,建议先停止监控再退出。
确定要强制退出吗?",
                icon=messagebox.WARNING
            )
            if not confirm:  # 用户取消退出
                return

        # 1. 停止所有定时器
        self.cancel_all_after()   

        # 2. 设置退出标志
        self.exit_flag.set()
 
        # 3. 异步清理浏览器资源(不依赖Tkinter事件循环)
        browser_type = self.mode_var.get()       
        Thread(target=self._async_cleanup, args=(browser_type, self.user_data_dir), daemon=True).start()

    def _async_cleanup(self, browser_type, user_data_dir):
        """异步执行浏览器资源清理"""
        try:
            # 安全获取并清理driver实例
            driver = None
            with self._driver_lock:
                driver = self._driver
                self._driver = None

            if driver is not None:
                try:
                    driver.quit()  # 同步立即关闭 
                except Exception as e:
                    self.log(f"⚠️ 关闭浏览器时出错: {str(e)}")

            # 强制终止残留进程
            kill_browser_processes(browser_type=browser_type, user_data_dir=user_data_dir)

            self.destroy()

        except Exception as e:
            self.log(f"⚠️ 清理过程中出错: {str(e)}")
            
    def cancel_all_after(self):
        """安全取消所有定时器"""
        for after_id in self.after_ids.copy():
            try:
                self.after_cancel(after_id)
                if after_id in self.after_ids:
                    self.after_ids.remove(after_id)
            except tk.TclError:
                pass  # 忽略无效ID

    def schedule_after(self, ms, func):
        """统一调度定时器"""
        after_id = self.after(ms, func)
        self.after_ids.append(after_id)
        return after_id

    # ============线程锁优化 =================
    @property
    def driver(self):
        """线程安全的driver属性访问(增加窗口状态检查)"""
        if not self.winfo_exists():
            return None
        if not hasattr(self, '_driver_lock'):
            return None
        with self._driver_lock:
            return self._driver

    @driver.setter
    def driver(self, value):
        if not self.winfo_exists():
            return
        if hasattr(self, '_driver_lock'):
            with self._driver_lock:
                self._driver = value

    @property
    def is_running(self):
        with self._running_lock:
            return self._is_running

    @is_running.setter
    def is_running(self, value):
        with self._running_lock:
            self._is_running = value


def kill_browser_processes(browser_type=None, user_data_dir=None):
    """只精准清理自动化进程,不清理用户浏览器进程"""
    if not user_data_dir:
        return
    
    # 浏览器进程名称映射
    targets = {
        "edge": ["msedge.exe", "msedgedriver.exe"],
        "chrome": ["chrome.exe", "chromedriver.exe"]
    }.get(browser_type, [])
    
    # 预处理用户目录路径
    normalized_dir = os.path.normcase(os.path.abspath(user_data_dir))
    target_names = {name.lower() for name in targets}
    
    try:
        # 一次性获取所有进程信息
        procs = {p.pid: p for p in psutil.process_iter(['pid', 'name', 'cmdline'])}
        
        # 筛选目标进程
        matched_procs = []
        for pid, proc in procs.items():
            try:
                # 快速排除非目标进程
                if proc.info['name'].lower() not in target_names:
                    continue
                
                # 获取命令行参数
                cmdline = ' '.join(proc.info['cmdline']).lower()
                
                # 精准匹配用户数据目录和自动化标志
                if (f'--user-data-dir="{normalized_dir}"' in cmdline or 
                    f'--user-data-dir={normalized_dir}' in cmdline) and 
                   '--remote-allow-origins=*' in cmdline:
                    matched_procs.append(proc)
            except (psutil.NoSuchProcess, IndexError):
                continue
        
        # 直接强制终止匹配进程(避免等待)
        for proc in matched_procs:
            try:
                proc.kill()  # 直接发送SIGKILL
            except psutil.NoSuchProcess:
                pass
    
    except Exception as e:
        print(f"清理进程时出错: {str(e)}")


# 辅助组件及学习平台弹窗处理部分略...
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容