python自动化提取网页数据(火狐浏览器版)

📅 2026/7/3 5:40:29
python自动化提取网页数据(火狐浏览器版)
"""Scrape records from a legacy web application through Firefox.

Selenium drives Firefox through geckodriver, pyautogui runs in a helper
thread to copy the address of the pop-up "details" window while Selenium
performs the click that opens it, and pandas/openpyxl persist the data.

NOTE(review): this listing was reconstructed from a copy whose quotes,
operators (=, +, <, >, @) and line breaks had been stripped. Placeholder
values (login URL, credentials, field names, file paths, screen
coordinates) are kept as published and must be adapted before running.
"""
import csv
import os
import re
import threading
import time

import pandas as pd
import pyautogui
import pygetwindow as gw
import pyperclip
from bs4 import BeautifulSoup
from openpyxl.styles import Border, Side, Font, Alignment, PatternFill
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import FirefoxProfile
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.firefox.firefox_binary import FirefoxBinary
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# JavaScript that clicks the node addressed by the XPath in arguments[0].
# The XPath is passed as an argument rather than spliced into the script,
# so quotes inside the expression cannot break the JavaScript.
_JS_CLICK_BY_XPATH = """
var e = document.evaluate(arguments[0], document, null,
    XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
if (e) { e.click(); e.focus(); console.log('执行成功'); return true; }
else { console.log('执行失败未找到元素'); return false; }
"""

# JavaScript that returns every row of the page-global ``DZ`` structure whose
# first cell equals arguments[0].
_JS_COLLECT_ROWS = """
let result = [];
for (let idx = 0; idx < DZ.length; idx++) {
    let rowArr = DZ[idx][2];
    if (Array.isArray(rowArr)) {
        for (let j = 0; j < rowArr.length; j++) {
            if (rowArr[j][0] == arguments[0]) {
                result.push(rowArr[j]);
            }
        }
    }
}
return result;
"""

_CSV_COLUMNS = ['新增字段1', '新增字段2', '旧字段', '旧字段值']


def get_xq_url(url_bar_pos=(324, 47), close_btn_pos=(1882, 69)):
    """Copy the address of the Firefox "details" window via the clipboard.

    Runs in a helper thread while Selenium clicks the link that opens the
    window. It focuses the first window whose title contains
    ``MOzilla Firefox``, clicks the address bar, copies its content, clicks
    the close button and sends Alt+Tab.

    Args:
        url_bar_pos: screen coordinates (x, y) of the address bar.
        close_btn_pos: screen coordinates (x, y) of the close button.

    Returns:
        The copied text, or ``None`` when no Firefox window was found.
    """
    firefox_wins = gw.getWindowsWithTitle('MOzilla Firefox')
    if not firefox_wins:
        print('未检测到打开的火狐浏览器')
        # Returning None (instead of hitting an unbound local) lets the
        # caller fall back to manual input.
        return None

    ff_win = firefox_wins[0]
    ff_win.activate()  # bring Firefox to the foreground
    time.sleep(1)
    pyautogui.click(x=url_bar_pos[0], y=url_bar_pos[1])
    time.sleep(2)
    pyautogui.hotkey('ctrl', 'a')
    time.sleep(0.1)
    pyautogui.hotkey('ctrl', 'c')
    xq_url = pyperclip.paste()
    print(f'详情网址为{xq_url}')
    time.sleep(2)
    # Close the details window once its address has been captured.
    pyautogui.click(x=close_btn_pos[0], y=close_btn_pos[1])
    time.sleep(0.5)
    # Alt+Tab back to the previous window.
    pyautogui.keyDown('alt')
    pyautogui.press('tab')
    pyautogui.keyUp('alt')
    return xq_url


class ThreadWithResult(threading.Thread):
    """Thread that keeps the return value of its target in ``result``."""

    def __init__(self, target, args=()):
        super().__init__(target=target, args=args)
        self.result = None

    def run(self):
        """Run the target and store its return value."""
        self.result = self._target(*self._args)

    def get_result(self):
        """Return the stored value (``None`` until the target has returned)."""
        return self.result


def write_file(v_df, out_put_flie, md, sheent_name):
    """Write ``v_df`` to an Excel sheet with thin borders and sized columns.

    Args:
        v_df: DataFrame to write.
        out_put_flie: path of the workbook.
        md: ``pd.ExcelWriter`` mode (``'w'`` or ``'a'``).
        sheent_name: name of the target worksheet.
    """
    with pd.ExcelWriter(out_put_flie, engine='openpyxl', mode=md) as writer:
        v_df.to_excel(writer, sheet_name=sheent_name, index=False)
        ws = writer.sheets[sheent_name]

        thin = Side(style='thin', color='000000')
        border = Border(top=thin, right=thin, bottom=thin, left=thin)
        for row in ws.iter_rows(min_row=1, max_row=ws.max_row,
                                min_col=1, max_col=ws.max_column):
            for cell in row:
                cell.border = border

        # Column width = longest cell text + padding. Empty cells count as
        # zero characters rather than as the four characters of "None".
        for col in ws.columns:
            col_letter = col[0].column_letter
            max_len = max(
                len('' if cell.value is None else str(cell.value))
                for cell in col
            )
            ws.column_dimensions[col_letter].width = max_len + 6


def get_all_deep_south_iframe(driver, flag):
    """Descend into nested iframes whose ``name`` starts with ``flag``.

    Keeps switching into the next matching iframe until none is found within
    15 seconds (or any other WebDriver error occurs).

    Returns:
        The number of iframes that were entered.
    """
    switch_count = 0
    while True:
        try:
            wait = WebDriverWait(driver, 15)
            inner_frame = wait.until(EC.presence_of_element_located(
                (By.XPATH, f"//iframe[starts-with(@name,'{flag}')]")))
            driver.switch_to.frame(inner_frame)
            switch_count += 1
        except WebDriverException:
            # TimeoutException (a subclass) marks the deepest level; other
            # WebDriver errors also end the descent, as before, but
            # KeyboardInterrupt and programming errors are no longer hidden.
            break
    return switch_count


def get_all_deep_iframe(driver, flag):
    """Descend to the innermost iframe whose ``name`` starts with ``flag``."""
    get_all_deep_south_iframe(driver, flag)


def left_tree_get_data(driver, txt):
    """Parse the grid of the current frame into a transposed DataFrame.

    The locked (left) and standard (right) halves of each grid row are joined,
    loaded under the grid headers, transposed, and prefixed with two constant
    columns; ``txt`` is embedded in the second one.
    """
    soup = BeautifulSoup(driver.page_source, 'html.parser')

    header_column = [
        div.get_text(strip=True)
        for div in soup.select(
            '#Header_myiframe0_0,#TH_Right_myiframe0 .list_gridCell_standard')
    ]
    left_rows = [
        [div.get_text(strip=True) for div in tr.select('div.list_gridCell_lock')]
        for tr in soup.select('tr[id^="TR_Left_myiframe0_"]')
    ]
    right_rows = [
        [div.get_text(strip=True)
         for div in tr.select('div.list_gridCell_standard')]
        for tr in soup.select('tr[id^="TR_Right_myiframe0_"]')
    ]
    data_column = [lock + std for lock, std in zip(left_rows, right_rows)]

    df3 = pd.DataFrame(data_column, columns=header_column)
    new_df3_pivot = df3.T
    new_df3_pivot.reset_index(inplace=True)
    new_df3_pivot.rename(
        columns={'index': '旧字段', 0: '旧字段值', 1: '旧字段值2'}, inplace=True)
    new_df3_pivot.insert(0, '新增字段1', '字段1值')
    new_df3_pivot.insert(1, '新增字段2', f'字段2值-{txt}')
    return new_df3_pivot


def _extract_form_fields(soup):
    """Return ``{label: value}`` for the visible label/value cells of a form.

    For a ``<select>`` the text of the option matching its ``initvalue``
    attribute is used (the raw code when no option matches); for an
    ``<input>`` its ``value`` attribute; otherwise an empty string.
    """
    fields = {}
    for tr in soup.select('tr'):
        left_cells = tr.select('td[class^="info_td_left"]')
        right_cells = tr.select('td[class^="info_td_right"]')
        for left_td, right_td in zip(left_cells, right_cells):
            # Skip hidden fields.
            if ('display: none' in left_td.get('style', '')
                    or 'display: none' in right_td.get('style', '')):
                continue
            span = left_td.find('span')
            # Fall back to the cell text when the label has no <span>.
            name = (span or left_td).get_text(strip=True)
            sel = right_td.find('select')
            inp = right_td.find('input')
            if sel:
                code = sel.get('initvalue', '')
                val = code
                for opt in sel.find_all('option'):
                    if opt.get('value', '') == code:
                        val = opt.get_text(strip=True)
                        break
            elif inp:
                val = inp.get('value', '')
            else:
                val = ''
            fields[name] = val
    return fields


def get_xq_right_data(tmp_url2, driver, wait, idx):
    """Click tree node ``idx`` on the details page and dump its data to CSV.

    Node 0 is parsed as a label/value form and written with a header row
    (file mode ``'w'``). Every other node is parsed as a grid and appended
    without a header, followed by the grid of its nested ``south`` frame when
    one exists.

    Args:
        tmp_url2: path of the CSV file (GBK encoded).
        driver: the Selenium WebDriver.
        wait: a ``WebDriverWait`` bound to ``driver``.
        idx: zero-based index of the tree node to process.

    Returns:
        Number of ``south`` iframes entered (always 0 for node 0).
    """
    md = 'w' if idx == 0 else 'a'
    tree_xpath = "//span[contains(@class,'tree-title')]"

    # Switch back to the frame that holds the tree.
    driver.switch_to.default_content()
    driver.switch_to.frame('ObjectList')
    inner_frame2 = wait.until(EC.presence_of_element_located(
        (By.XPATH, "//iframe[starts-with(@name,'center')]")))
    driver.switch_to.frame(inner_frame2)
    time.sleep(1)

    current_spans = wait.until(
        EC.presence_of_all_elements_located((By.XPATH, tree_xpath)))
    span_elem = current_spans[idx]
    txt = span_elem.text.strip()

    if idx == 0:
        south_num_flag = 0
        span_elem.click()
        get_all_deep_iframe(driver, 'center')
        time.sleep(6)

        soup = BeautifulSoup(driver.page_source, 'html.parser')
        column_1 = '新增字段值3'
        column_2 = '新增字段值3-详情-基本信息'
        rows = [
            [column_1, column_2, name, val]
            for name, val in _extract_form_fields(soup).items()
        ]
        new_df2 = pd.DataFrame(rows, columns=_CSV_COLUMNS)
        new_df2['旧字段值2'] = ''
        new_df2.to_csv(tmp_url2, mode=md, header=True, index=False,
                       encoding='gbk')
        time.sleep(3)
    else:
        # Re-locate the node right before clicking it.
        current_spans = wait.until(
            EC.presence_of_all_elements_located((By.XPATH, tree_xpath)))
        span_elem = current_spans[idx]
        span_elem.click()
        get_all_deep_iframe(driver, 'center')
        time.sleep(6)

        new_df3_pivot = left_tree_get_data(driver, txt)
        new_df3_pivot.to_csv(tmp_url2, mode=md, header=False, index=False,
                             encoding='gbk')
        time.sleep(1)

        # Step up one level and look for a nested "south" frame.
        driver.switch_to.parent_frame()
        south_num_flag = get_all_deep_south_iframe(driver, 'south')
        if south_num_flag:
            new_df3_pivot_1 = left_tree_get_data(driver, txt)
            new_df3_pivot_1.to_csv(tmp_url2, mode=md, header=False,
                                   index=False, encoding='gbk')
    return south_num_flag


def _js_click_by_xpath(driver, xpath):
    """Click (and focus) the first node matching ``xpath`` via JavaScript."""
    return driver.execute_script(_JS_CLICK_BY_XPATH, xpath)


def acess_old_website(tmp_url1, tmp_url2, filer_code_yp, idx):
    """Log in, query one record, export it, then scrape one details node.

    Args:
        tmp_url1: Excel file receiving the transposed list-page record.
        tmp_url2: CSV file receiving the details-page data.
        filer_code_yp: code typed into the search box and matched against the
            first cell of each row of the page's ``DZ`` data.
        idx: index of the details-page tree node to scrape on this run.

    Returns:
        ``{index: title}`` for every node of the details-page tree.
    """
    login_url = '登录网址'
    USERNAME = 'USERNAME'
    PASSWORD = 'PASSWORD'

    fire_options = Options()
    fire_options.set_preference('dom.webdriver.enabled', False)
    fire_options.set_preference('useAutomationExtension', False)
    # Ignore self-signed certificate errors.
    fire_options.accept_insecure_certs = True
    # NOTE(review): the stripped listing shows a bare 1.0; the string form is
    # used here on the assumption that Firefox stores this preference as a
    # string and that geckodriver takes only bool/int/string values — confirm.
    fire_options.set_preference('layout.css.devPixelsPerPx', '1.0')
    fire_options.set_preference('dom.disable_open_during_load', True)

    fire_path = FirefoxBinary(r'C:\firefox.exe')  # Firefox executable
    gecko_path = r'E:\geckodriver.exe'  # geckodriver executable
    # Profile directory; find it by opening about:profiles in Firefox.
    config_path = r'C:\rcml7n57.default'
    profiles = FirefoxProfile(config_path)
    # NOTE(review): these keyword arguments are kept exactly as in the
    # original call. They may not be accepted by recent Selenium 4 releases
    # (Options.binary_location / Options.profile / Service look like the
    # replacements) — confirm against the installed version.
    driver = webdriver.Firefox(options=fire_options,
                               firefox_binary=fire_path,
                               executable_path=gecko_path,
                               firefox_profile=profiles)
    driver.execute_script("""
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        document.querySelectorAll('*').forEach(
            item => { item.style.pointerEvents = 'auto'; });
    """)
    wait = WebDriverWait(driver, 15)
    try:
        driver.get(login_url)
        time.sleep(1)
        # Replace the page's confirm()/alert() with functions returning true.
        driver.execute_script(
            "window.confirm = function(){return true;}; "
            "window.alert = function(){return true;}")
        driver.execute_script("window.alert = function(){return true;}")

        user_input = wait.until(
            EC.presence_of_element_located((By.ID, 'UserID')))
        user_input.clear()
        user_input.send_keys(USERNAME)
        time.sleep(1)

        pwd_input = wait.until(EC.presence_of_element_located(
            (By.XPATH, "//input[@class='Dpwd' and @type='text']")))
        pwd_input.click()
        time.sleep(0.5)
        pwd_hide = driver.find_element(By.ID, 'Password')
        # Pass the password as a script argument so quotes or backslashes in
        # it cannot break (or inject into) the JavaScript.
        driver.execute_script(
            "arguments[0].value = arguments[1];", pwd_hide, PASSWORD)
        time.sleep(0.3)

        login_btn = driver.find_element(By.XPATH, "//a[@class='login_btn']")
        login_btn.click()
        # Accept the dialog shown after login.
        alert = WebDriverWait(driver, 10).until(EC.alert_is_present())
        alert.accept()
        time.sleep(3)

        # First-, second- and third-level menu entries.
        _js_click_by_xpath(
            driver,
            '/html/body/div[2]/div[1]/div[1]/div[2]/div[4]/div/ul/li[1]/a')
        time.sleep(1)
        _js_click_by_xpath(
            driver,
            '/html/body/table/tbody/tr[1]/td/table/tbody/tr/td[2]'
            '/ul/li[1]/a/span[2]')
        time.sleep(1)
        _js_click_by_xpath(
            driver,
            '/html/body/table/tbody/tr[1]/td/table/tbody/tr/td[2]'
            '/ul/li[1]/ul/li[1]/a/span[2]')
        time.sleep(6)

        # Enter MainCenter, then the tab iframe inside it.
        driver.switch_to.frame('MainCenter')
        time.sleep(1)
        inner_frame = driver.find_element(
            By.XPATH, "//iframe[contains(@id,'tab_T01_iframe')]")
        driver.switch_to.frame(inner_frame)
        time.sleep(2)
        # Expand the "query conditions" fieldset.
        _js_click_by_xpath(
            driver,
            '//html/body/table/tbody/tr[3]/td/form/hidden/div[1]/div[1]'
            '/fieldset/legend')
        time.sleep(2)

        # Fill the search box and submit with Enter.
        input_box = driver.find_element(By.NAME, '输入框网页端名称')
        input_box.clear()
        input_box.send_keys(filer_code_yp)
        input_box.send_keys(Keys.ENTER)
        time.sleep(8)

        # Pull the matching rows out of the page's JavaScript data.
        data_xxgl_js = driver.execute_script(_JS_COLLECT_ROWS, filer_code_yp)
        xxgl_cols = ['匹配字段名称']
        df = pd.DataFrame(data_xxgl_js, columns=xxgl_cols)

        # Translate code values into their labels.
        zt_dict = {
            '01': '临时',
            '02': '生效',
            '03': '落实担保',
            '04': '解除担保',
            '05': '终结',
            '06': '处置',
            '07': '删除',
        }
        df['状态'] = df['状态'].map(zt_dict).fillna('')
        sfzlw_dict = {
            '01': '是',
            '02': '否',
        }
        df['码值字段相关'] = df['码值字段相关'].map(sfzlw_dict).fillna('')

        # The original transposed an undefined ``new_df`` (NameError); ``df``
        # is the only frame in scope.
        # NOTE(review): confirm no column-selection step was dropped here.
        new_df_pivot = df.T
        new_df_pivot.reset_index(inplace=True)
        new_df_pivot.rename(
            columns={'index': '旧字段', 0: '旧字段值'}, inplace=True)
        new_df_pivot.insert(0, '新增字段1', '字段1值')
        new_df_pivot.insert(1, '新增字段2', '字段2值')
        new_df_pivot['旧字段值2'] = ''
        new_df_pivot.to_excel(tmp_url1, index=False)

        # The helper thread grabs the pop-up's address while Selenium clicks
        # the "details" link that opens it.
        t = ThreadWithResult(target=get_xq_url, args=())
        menu3 = wait.until(EC.element_to_be_clickable(
            (By.XPATH, "//*[contains(text(),'详情')]")))
        time.sleep(1.5)
        ActionChains(driver).move_to_element(menu3).pause(0.3).perform()
        print('执行并行线程前')
        time.sleep(1)
        t.start()
        ActionChains(driver).click(menu3).perform()
        print('执行并行线程后')
        t.join()
        xq_url = t.get_result()
        # Fall back to manual input when the clipboard capture failed.
        if not xq_url or 'https:' not in xq_url:
            xq_url = input('需要手工操作请输入详情网址')

        driver.get(xq_url)
        time.sleep(2)
        driver.switch_to.default_content()
        driver.switch_to.frame('ObjectList')
        inner_frame2 = wait.until(EC.presence_of_element_located(
            (By.XPATH, "//iframe[starts-with(@name,'center')]")))
        driver.switch_to.frame(inner_frame2)
        left_tree = wait.until(EC.presence_of_all_elements_located(
            (By.XPATH, "//span[contains(@class,'tree-title')]")))
        tree_node_dict = {
            num: ele.text.strip() for num, ele in enumerate(left_tree)
        }

        layer_south = get_xq_right_data(tmp_url2, driver, wait, idx)
        time.sleep(5)
        if layer_south:
            print('存在副页面')
    finally:
        driver.quit()
    return tree_node_dict


def tran_data_seed_one(is_merge_flag, tmp_url1, tmp_url2, filer_code_yp):
    """Run one processing stage for a single seed record.

    Args:
        is_merge_flag: ``0`` scrapes every tree node into the temporary
            files, one browser session per node; ``1`` merges the temporary
            files into the final workbook.
        tmp_url1: temporary Excel file (list-page data).
        tmp_url2: temporary CSV file (details-page data, GBK).
        filer_code_yp: code of the record to process.

    The merge stage writes to the module-level ``out_put_flie``, which the
    driver loop at the bottom of the file assigns.
    """
    if is_merge_flag == 0:
        idx = 0
        # Pre-bound so the error message below cannot itself fail when the
        # very first attempt raises before anything was returned.
        dict_new = {}
        # NOTE: a failing node is retried indefinitely, as in the original.
        while True:
            try:
                dict_new = acess_old_website(
                    tmp_url1, tmp_url2, filer_code_yp, idx)
                dict_len = len(dict_new)
                print(f'{idx + 1}:{dict_new.get(idx, "")} 项执行成功需要执行总数为{dict_len}还剩余{dict_len - idx - 1}项')
                # ``>=`` also ends the loop when the tree is empty.
                if idx + 1 >= dict_len:
                    print('执行完毕退出循环')
                    break
                idx += 1  # move on to the next node
            except Exception as e:
                print(f'{idx + 1}:{dict_new.get(idx, "")}项报错错误信息{str(e)},3秒后请重试')
                time.sleep(3)
    elif is_merge_flag == 1:
        df = pd.read_excel(tmp_url1)
        # newline='' lets the csv module handle embedded line breaks itself.
        with open(tmp_url2, 'r', encoding='gbk', newline='') as f:
            data = list(csv.reader(f))
        # Rows may be wider than the header: pad the header with generated
        # names and short rows with empty strings.
        max_len = max(map(len, data))
        header_row = data[0]
        header = header_row + [
            f'字段值{i + 2}'
            for i in range(1, (max_len - len(header_row)) + 1)
        ]
        data_row = [row + [''] * (max_len - len(row)) for row in data[1:]]
        df2 = pd.DataFrame(data_row, columns=header)
        total_df = pd.concat([df, df2], ignore_index=True)
        write_file(total_df, out_put_flie, 'w', 'Sheet1')


seed_list = [
    ['分类', '分类值'],
]
tmp_url1 = r'\result.xlsx'  # first temporary file: list-page data
tmp_url2 = r'\result2.csv'  # second temporary file: details-page data
for yp_zl, yp_bh in seed_list:
    out_put_flie = r'\模块-' + yp_zl + '.xlsx'
    try:
        # Stage 0: scrape into the temporary files.
        tran_data_seed_one(0, tmp_url1, tmp_url2, yp_bh)
        print(f'生成result {yp_zl} 临时文件成功')
        # Stage 1: merge the temporary files into the final workbook.
        tran_data_seed_one(1, tmp_url1, tmp_url2, yp_bh)
        print(f'生成 {yp_zl} 文件成功')
    except Exception as e:
        print(f'执行失败失败信息{e}')