信息发布→ 登录 注册 退出

pyppeteer执行js绕过webdriver监测方法上

发布时间:2026-01-11

点击量:
目录
  • Pyppeteer简介
  • 下载
  • 打开网页并截图
  • 评估页面上的脚本
  • 关键字参数的选项
  • 选择器
  • 基础用法
  • 模拟输入
  • 使用 tkinter 获取屏幕高度 宽度
  • 爬取京东商城
  • 爬取淘宝网
    • taobao.py
    • exe_js.py
    • alifunc.py
  • 利用获取到的cookie 爬取搜索内容
    • 针对iframe 的操作

      Pyppeteer简介

      Puppeteer 是 Google 基于 Node.js 开发的一个工具,有了它我们可以通过 JavaScript 来控制 Chrome 浏览器的一些操作,当然也可以用于网络爬虫,其 API 极其完善,功能非常强大。而 Pyppeteer 又是什么呢?它实际上是 Puppeteer 的 Python 版本的实现,但它不是 Google 开发的,而是一位来自日本的工程师依据 Puppeteer 的一些功能开发出来的非官方版本。

      官方文档: https://miyakogi.github.io/pyppeteer/reference.html

      下载

      pip install pyppeteer
      

      打开网页并截图

      import asyncio
      from pyppeteer import launch
      async def main():
          """Open http://example.com in a browser and save a screenshot.

          The screenshot is written to ``example.png``.
          """
          browser = await launch()
          try:
              page = await browser.newPage()
              await page.goto('http://example.com')
              await page.screenshot({'path': 'example.png'})
          finally:
              # Close the browser even when navigation or the screenshot raises,
              # so the Chromium process is not left running.
              await browser.close()
      asyncio.get_event_loop().run_until_complete(main())
      

      评估页面上的脚本

      import asyncio
      from pyppeteer import launch
      async def main():
          """Load http://example.com, save a screenshot and print viewport metrics.

          The metrics (width, height, device pixel ratio) are computed inside the
          page by the JavaScript function passed to ``page.evaluate``.
          """
          browser = await launch()
          try:
              page = await browser.newPage()
              await page.goto('http://example.com')
              await page.screenshot({'path': 'example.png'})

              dimensions = await page.evaluate('''() => {
              return {
                  width: document.documentElement.clientWidth,
                  height: document.documentElement.clientHeight,
                  deviceScaleFactor: window.devicePixelRatio,
              }
          }''')
              print(dimensions)
              # >>> {'width': 800, 'height': 600, 'deviceScaleFactor': 1}
          finally:
              # Close the browser even if any step above raises.
              await browser.close()
      asyncio.get_event_loop().run_until_complete(main())
      

      关键字参数的选项

      {'headless': True} # the default is True: headless, no browser window
      {'headless': False} # set to False to run with a visible browser window
      browser = await launch({'headless': False})
      

      选择器

      Page.querySelector()  # CSS selector
      Page.querySelectorAll()  # CSS selector, selects all matches
      Page.xpath()  # XPath selector
      

      Page.evaluate() 和 Page.querySelectorEval() 的参数

      添加force_expr=True选项,这会强制pyppeteer将字符串视为表达式。

      获取页面内容的示例:

      # force_expr=True: the string is evaluated as an expression, not as a function.
      content = await page.evaluate('document.body.textContent', force_expr=True)
      import asyncio
      from pyppeteer import launch
      async def main():
          """Open a cnblogs article in a visible browser and print its body text."""
          browser = await launch({'headless': False})
          try:
              page = await browser.newPage()
              await page.goto('https://www.cnblogs.com/guyouyin123/p/12669430.html#selenium%E9%80%89%E6%8B%A9%E5%99%A8%E9%80%89%E6%8B%A9')
              # force_expr=True makes pyppeteer treat the string as an expression.
              content = await page.evaluate('document.body.textContent', force_expr=True)
              print(content)
          finally:
              # Close the browser even if navigation or evaluation raises.
              await browser.close()
      asyncio.get_event_loop().run_until_complete(main())
      

      获取元素内部文本的示例:

      # Select the first element matching 'h1', then pass the handle to a JS
      # function that returns its textContent.
      element = await page.querySelector('h1')
      title = await page.evaluate('(element) => element.textContent', element)
      

      基础用法

      import asyncio
      from pyppeteer import launch
      async def main():
          """Tour the basic Pyppeteer API against https://www.toutiao.com/.

          Launches a visible browser, loads the page, scrolls to the bottom, saves
          ``toutiao.png``, then prints the cookies, the HTML, the results of two
          ``page.evaluate`` calls, the page title, and the text and href of every
          element matching ``.title-box a``.
          """
          # Setting headless to False shows the browser window.
          # Pyppeteer accepts options as a dict or as keyword arguments;
          # Puppeteer only accepts a dict.
          # To point at a specific browser binary:
          # exepath = r'C:\Users\Administrator\AppData\Local\pyppeteer\pyppeteer\local-chromium\575458\chrome-win32/chrome.exe'
          # browser = await launch({'executablePath': exepath, 'headless': False, 'slowMo': 30})
          browser = await launch(
              # headless=False,
              {'headless': False}
          )
          try:
              page = await browser.newPage()
              # Set the page viewport size.
              await page.setViewport(viewport={'width': 1280, 'height': 800})
              # Enable JavaScript; with enabled=False the page is not rendered by scripts.
              await page.setJavaScriptEnabled(enabled=True)
              # Navigation timeout is 1000 milliseconds.
              res = await page.goto('https://www.toutiao.com/', options={'timeout': 1000})
              resp_headers = res.headers  # response headers
              resp_status = res.status  # response status
              # First way to wait: a fixed pause.
              await asyncio.sleep(2)
              # Second way to wait: block until the element exists. waitForSelector
              # gives up with a timeout error instead of polling forever.
              await page.waitForSelector('.t')
              # Scroll to the bottom of the page.
              await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
              await asyncio.sleep(2)
              # Save a screenshot.
              await page.screenshot({'path': 'toutiao.png'})
              # Print the page cookies.
              print(await page.cookies())
              # Print the full HTML of the page.
              print(await page.content())
              # Run a JavaScript function in the page.
              dimensions = await page.evaluate(pageFunction='''() => {
                  return {
                      width: document.documentElement.clientWidth,  // 页面宽度
                      height: document.documentElement.clientHeight,  // 页面高度
                      deviceScaleFactor: window.devicePixelRatio,  // 像素比 1.0000000149011612
                  }
              }''', force_expr=False)  # force_expr=False: the string is a function
              print(dimensions)
              # Text only: with force_expr=True the string is evaluated as an expression.
              content = await page.evaluate(pageFunction='document.body.textContent', force_expr=True)
              print(content)
              # Print the title of the current page.
              print(await page.title())
              # Pyppeteer offers three ways to locate elements:
              #   Page.querySelector()     CSS selector
              #   Page.querySelectorAll()
              #   Page.xpath()             XPath expression
              # with the shorthands Page.J(), Page.JJ() and Page.Jx().
              element = await page.querySelector(".feed-infinite-wrapper > ul>li")  # first match only
              print(element)
              # Read the element's text by running JS against the handle.
              content = await page.evaluate('(element) => element.textContent', element)
              print(content)
              # elements = await page.xpath('//div[@class="title-box"]/a')
              elements = await page.querySelectorAll(".title-box a")
              for item in elements:
                  print(await item.getProperty('textContent'))
                  # <pyppeteer.execution_context.JSHandle object at 0x000002220E7FE518>
                  # Resolve the handle to a plain value to get the text.
                  title_str = await (await item.getProperty('textContent')).jsonValue()
                  # Same for the link target.
                  title_link = await (await item.getProperty('href')).jsonValue()
                  print(title_str)
                  print(title_link)
          finally:
              # Close the browser even if any step above raises.
              await browser.close()
      asyncio.get_event_loop().run_until_complete(main())
      import asyncio
      import pyppeteer
      from collections import namedtuple
      Response = namedtuple("rs", "title url html cookies headers history status")
      async def get_html(url):
          """Load *url* in headless Chromium, print page details and return the title.

          Prints the page HTML, the title, the response headers and the response
          status. The browser launched for this call is always closed before the
          function returns or raises.

          Args:
              url: address to navigate to (navigation timeout is 3000 ms).

          Returns:
              The page title.
          """
          browser = await pyppeteer.launch(headless=True, args=['--no-sandbox'])
          try:
              page = await browser.newPage()
              res = await page.goto(url, options={'timeout': 3000})
              data = await page.content()
              title = await page.title()
              resp_cookies = await page.cookies()  # cookies
              resp_headers = res.headers  # response headers
              resp_status = res.status  # response status
              print(data)
              print(title)
              print(resp_headers)
              print(resp_status)
              return title
          finally:
              # Without this every call leaves a Chromium process running.
              await browser.close()
      if __name__ == '__main__':
          # Run get_html for three sites on one event loop and print each result.
          url_list = [
              "https://www.toutiao.com/",
              "http://jandan.net/ooxx/page-8#comments",
              "https://www.12306.cn/index/",
          ]
          task = list(map(get_html, url_list))
          loop = asyncio.get_event_loop()
          gathered = asyncio.gather(*task)
          results = loop.run_until_complete(gathered)
          for res in results:
              print(res)
      # Example response-headers dict.
      # NOTE(review): presumably pasted from the `resp_headers` output of one of
      # the requests above -- the source does not say which; confirm.
      headers = {'date': 'Sun, 28 Apr 2019 06:50:20 GMT',
                 'server': 'Cmcc',
                 'x-frame-options': 'SAMEORIGIN\nSAMEORIGIN',
                 'last-modified': 'Fri, 26 Apr 2019 09:58:09 GMT',
                 'accept-ranges': 'bytes',
                 'cache-control': 'max-age=43200',
                 'expires': 'Sun, 28 Apr 2019 18:50:20 GMT',
                 'vary': 'Accept-Encoding,User-Agent',
                 'content-encoding': 'gzip',
                 'content-length': '19823',
                 'content-type': 'text/html',
                 'connection': 'Keep-alive',
                 'via': '1.1 ID-0314217270751344 uproxy-17'}
      

      模拟输入

          # Simulate typing the account name and password; passing
          # {'delay': rand_int()} sets the typing delay.
          await page.type('#TPL_username_1', "sadfasdfasdf")
          await page.type('#TPL_password_1', "123456789", )
          await page.waitFor(1000)
          await page.click("#J_SubmitStatic")
      

      使用 tkinter 获取屏幕高度 宽度

      def screen_size():
          """Return the screen size as a ``(width, height)`` tuple, read via tkinter."""
          import tkinter
          tk = tkinter.Tk()
          try:
              width = tk.winfo_screenwidth()
              height = tk.winfo_screenheight()
              return width, height
          finally:
              # quit() only stops a running mainloop, and none is running here;
              # destroy() is what actually tears down the root window.
              tk.destroy()
      

      爬取京东商城

      import requests
      from bs4 import BeautifulSoup
      from pyppeteer import launch
      import asyncio
      def screen_size():
          """Return the screen size as a ``(width, height)`` tuple, read via tkinter."""
          import tkinter
          tk = tkinter.Tk()
          try:
              width = tk.winfo_screenwidth()
              height = tk.winfo_screenheight()
              return width, height
          finally:
              # quit() only stops a running mainloop, and none is running here;
              # destroy() is what actually tears down the root window.
              tk.destroy()
      async def _first_property(node, expression, prop):
          """Return *prop* of the first element matching *expression* under *node*, or None."""
          matches = await node.xpath(expression)
          if not matches:
              return None
          return await (await matches[0].getProperty(prop)).jsonValue()
      async def main(url):
          """Scrape one JD search-result page and return its items.

          Args:
              url: the search-result URL to load.

          Returns:
              A list with one dict per ``li`` under ``#J_goodsList``, with the keys
              ``title``, ``detail_url``, ``promo_words``, ``p_commit`` and
              ``price``. A field is None when its element is missing from that
              list item, instead of raising IndexError for the whole page.
          """
          # browser = await launch({'headless': False, 'args': ['--no-sandbox'], })
          browser = await launch({'args': ['--no-sandbox'], })
          try:
              page = await browser.newPage()
              width, height = screen_size()
              await page.setViewport(viewport={"width": width, "height": height})
              await page.setJavaScriptEnabled(enabled=True)
              await page.setUserAgent(
                  'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
              await page.goto(url)
              # await asyncio.sleep(2)
              # Scroll to the bottom, then pause one second before reading the list.
              await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
              await asyncio.sleep(1)
              li_list = await page.xpath('//*[@id="J_goodsList"]/ul/li')
              item_list = []
              for li in li_list:
                  item_list.append({
                      "title": await _first_property(li, './div/div[4]/a/em', "textContent"),
                      "detail_url": await _first_property(li, './/div[@class="p-img"]/a', "href"),
                      "promo_words": await _first_property(li, './/div[@class="p-img"]/a', "title"),
                      'p_commit': await _first_property(li, './/div[@class="p-commit"]/strong/a', "textContent"),
                      'price': await _first_property(li, './div/div[3]/strong/i', "textContent"),
                  })
              return item_list
          finally:
              # Close every page and the browser even when scraping fails.
              await page_close(browser)
      async def page_close(browser):
          """Close each page the browser reports, then close the browser itself."""
          open_pages = await browser.pages()
          for tab in open_pages:
              await tab.close()
          await browser.close()
      # Scrape five JD result pages for the keyword and print each page's items.
      msg = "手机"
      url = "https://search.jd.com/Search?keyword={}&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq={}&cid2=653&cid3=655&page={}"
      task_list = []
      for i in range(1, 6):
          # The page parameter takes the odd numbers 1, 3, 5, 7, 9.
          page = i * 2 - 1
          # Format into a new name: assigning back to `url` would replace the
          # template with a finished URL, and every later iteration would then
          # request page 1 again.
          page_url = url.format(msg, msg, page)
          task_list.append(main(page_url))
      loop = asyncio.get_event_loop()
      results = loop.run_until_complete(asyncio.gather(*task_list))
      # print(results, len(results))
      for i in results:
          print(i, len(i))
      # soup = BeautifulSoup(content, 'lxml')
      # div = soup.find('div', id='J_goodsList')
      # for i, li in enumerate(div.find_all('li', class_='gl-item')):
      #     if li.select('.p-img a'):
      #         print(li.select('.p-img a')[0]['href'], i)
      #         print(li.select('.p-price i')[0].get_text(), i)
      #         print(li.select('.p-name em')[0].text, i)
      #     else:
      #         print("#" * 200)
      #         print(li)
      

      爬取淘宝网

      taobao.py

      import asyncio
      import time
      from pyppeteer.launcher import launch
      from alifunc import mouse_slide, input_time_random
      from exe_js import js1, js3, js4, js5
      def screen_size():
          """Return the screen size as a ``(width, height)`` tuple, read via tkinter."""
          import tkinter
          tk = tkinter.Tk()
          try:
              width = tk.winfo_screenwidth()
              height = tk.winfo_screenheight()
              return width, height
          finally:
              # quit() only stops a running mainloop, and none is running here;
              # destroy() is what actually tears down the root window.
              tk.destroy()
      async def main(username, pwd, url):
          """Log in to Taobao with a visible browser and print the session cookies.

          Loads *url*, runs the js1/js3/js4/js5 snippets in the page, switches to
          the password form, types the credentials, then either handles the slider
          captcha or submits with Enter. On success the cookies are printed via
          ``get_cookie``. All pages and the browser are closed on every exit path.

          Args:
              username: account name typed into ``#TPL_username_1``.
              pwd: password typed into ``#TPL_password_1``.
              url: login page URL.
          """
          browser = await launch({
              'headless': False,
              'userDataDir': './userdata',
              # Both flags live in ONE list. Passing a second `args=` keyword next
              # to the options dict replaces the dict's 'args' entry, which would
              # silently drop '--no-sandbox'.
              'args': ['--no-sandbox', '--window-size=1366,768'],
          })
          try:
              page = await browser.newPage()
              width, height = screen_size()
              await page.setViewport(viewport={"width": width, "height": height})
              await page.setUserAgent(
                  'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
              await page.goto(url)
              # Override navigator properties in the loaded page.
              await page.evaluate(js1)
              await page.evaluate(js3)
              await page.evaluate(js4)
              await page.evaluate(js5)
              pwd_login = await page.querySelector('.J_Quick2Static')
              # print(await (await pwd_login.getProperty('textContent')).jsonValue())
              await pwd_login.click()
              await page.type('#TPL_username_1', username, {'delay': input_time_random() - 50})
              await page.type('#TPL_password_1', pwd, {'delay': input_time_random()})
              await page.screenshot({'path': './headless-test-result.png'})
              # asyncio.sleep, not time.sleep: a blocking sleep would stall the
              # event loop that pyppeteer needs to talk to the browser.
              await asyncio.sleep(2)
              slider = await page.Jeval('#nocaptcha', 'node => node.style')  # is there a slider?
              if slider:
                  print('出现滑块情况判定')
                  await page.screenshot({'path': './headless-login-slide.png'})
                  flag = await mouse_slide(page=page)
                  if flag:
                      print(page.url)
                      await page.keyboard.press('Enter')
                      await get_cookie(page)
              else:
                  await page.keyboard.press('Enter')
                  await page.waitFor(20)
                  # NOTE(review): if the navigation finishes before this call
                  # starts, it waits until its timeout -- confirm this ordering.
                  await page.waitForNavigation()
                  try:
                      error = await page.Jeval('.error', 'node => node.textContent')
                  except Exception as e:
                      error = None
                      print(e, "错啦")
                  if error:
                      print('确保账户安全重新入输入')
                  else:
                      print(page.url)
                      # Navigation can continue from here; the cookies are already set.
                      # await get_search(page)
                      await get_cookie(page)
          finally:
              await page_close(browser)
      async def page_close(browser):
          """Close each page the browser reports, then close the browser itself."""
          tabs = await browser.pages()
          for tab in tabs:
              await tab.close()
          await browser.close()
      async def get_search(page):
          """Navigate *page* to a fixed Taobao search URL and pause for five seconds."""
          # URL pattern:
          # https://s.taobao.com/search?q={search term}&p4ppushleft=1%2C48&s={offset: 44 items per page, 0 for page 1, 44 for page 2}&sort=sale-desc
          search_url = "https://s.taobao.com/search?q=气球"
          await page.goto(search_url)
          await asyncio.sleep(5)
          # print(await page.content())
      # 获取登录后cookie  
      async def get_cookie(page):
          """Print and return the page's cookies as one ``name=value;`` string.

          Args:
              page: object whose ``cookies()`` coroutine returns a list of dicts
                  with ``name`` and ``value`` keys.

          Returns:
              The cookies joined as ``name=value;`` pairs in the order returned by
              ``page.cookies()``; an empty string when there are no cookies.
          """
          cookies_list = await page.cookies()
          cookies = ''.join(
              '{0}={1};'.format(cookie.get('name'), cookie.get('value'))
              for cookie in cookies_list
          )
          print(cookies)
          # The string can go into a cookie pool and be reused for later search requests.
          return cookies
      if __name__ == '__main__':
          # Entry point: run the login flow once with the credentials below.
          username, pwd = 'username', 'password'
          url = "https://login.taobao.com/member/login.jhtml?spm=a21bo.2017.754894437.1.5af911d9qqVAb1&f=top&redirectURL=https%3A%2F%2Fwww.taobao.com%2F"
          loop = asyncio.get_event_loop()
          login_flow = main(username, pwd, url)
          loop.run_until_complete(login_flow)

      exe_js.py

      # JavaScript snippets, stored as strings, meant to be passed to page.evaluate().

      # js1: defines a getter for navigator.webdriver that returns false.
      js1 = '''() =>{
                 Object.defineProperties(navigator,{
                   webdriver:{
                     get: () => false
                   }
                 })
              }'''
      
      # js2: shows the current value of window.navigator.webdriver in an alert.
      js2 = '''() => {
              alert (
                  window.navigator.webdriver
              )
          }'''
      # js3: assigns window.navigator.chrome an object with an empty `runtime`.
      js3 = '''() => {
              window.navigator.chrome = {
          runtime: {},
          // etc.
        };
          }'''
      # js4: defines a getter for navigator.languages returning ['en-US', 'en'].
      js4 = '''() =>{
      Object.defineProperty(navigator, 'languages', {
            get: () => ['en-US', 'en']
          });
              }'''
      # js5: defines a getter for navigator.plugins returning a six-element array.
      js5 = '''() =>{
      Object.defineProperty(navigator, 'plugins', {
          get: () => [1, 2, 3, 4, 5,6],
        });
              }'''
      

      alifunc.py

      from retrying import retry  # 错误自动重试
      import time, asyncio, random
      def retry_if_result_none(result):
          """Return True exactly when *result* is None (predicate for ``retry_on_result``)."""
          if result is None:
              return True
          return False
      async def mouse_slide(page=None, max_attempts=3):
          """Drag the slider-captcha handle and report whether verification passed.

          The ``retrying.retry`` decorator is not used: applied to an ``async def``
          it only sees the coroutine object the call returns, which is never None,
          so ``retry_on_result`` could never trigger. The retry is therefore an
          explicit, bounded loop.

          Args:
              page: the pyppeteer page showing the slider.
              max_attempts: number of drags to try before giving up.

          Returns:
              1 once the ``.nc-lang-cnt`` text equals '验证通过'; None when every
              attempt fails.
          """
          for _ in range(max_attempts):
              await asyncio.sleep(3)
              try:
                  await page.hover('#nc_1_n1z')
                  await page.mouse.down()
                  # NOTE(review): pyppeteer documents a `steps` option for
                  # mouse.move; `delay` may be ignored here -- confirm.
                  await page.mouse.move(2000, 0, {'delay': random.randint(1000, 2000)})
                  await page.mouse.up()
              except Exception as e:
                  print(e, '     :slide login False')
                  continue
              await asyncio.sleep(3)
              slider_again = await page.Jeval('.nc-lang-cnt', 'node => node.textContent')
              if slider_again == '验证通过':
                  await page.screenshot({'path': './headless-slide-result.png'})
                  print('验证通过')
                  return 1
          return None
      def input_time_random():
          """Return a random integer between 100 and 151 inclusive (used as a typing delay)."""
          lowest, highest = 100, 151
          return random.randrange(lowest, highest + 1)
      

      利用获取到的cookie 爬取搜索内容

      import json
      import requests
      import re
      # 设置 cookie 池 随机发送请求 通过 pyppeteer 获取 cookie
      cookie = '_tb_token_=edd7e354dee53;t=fed8f4ca1946ca1e73223cfae04bc589;sg=20f;cna=2uJSFdQGmDMCAbfFWXWAC4Jv;cookie2=1db6cd63ad358170ea13319f7a862c33;_l_g_=Ug%3D%3D;v=0;unb=3150916610;skt=49cbfd5e01d1b550;cookie1=BxVRmD3sh19TaAU6lH88bHw5oq%2BgcAGcRe229Hj5DTA%3D;csg=cf45a9e2;uc3=vt3=F8dByEazRMnQZDe%2F9qI%3D&id2=UNGTqfZ61Z3rsA%3D%3D&nk2=oicxO%2BHX4Pg%3D&lg2=U%2BGCWk%2F75gdr5Q%3D%3D;existShop=MTU1Njg3MDM3MA%3D%3D;tracknick=%5Cu7433150322;lgc=%5Cu7433150322;_cc_=V32FPkk%2Fhw%3D%3D;mt=ci=86_1;dnk=%5Cu7433150322;_nk_=%5Cu7433150322;cookie17=UNGTqfZ61Z3rsA%3D%3D;tg=0;enc=tThHs6Sn3BAl8v1fu3J4tMpgzA1n%2BLzxjib0vDAtGsXJCb4hqQZ7Z9fHIzsN0WghdcKEsoeKz6mBwPUpyzLOZw%3D%3D;JSESSIONID=B3F383B3467EC60F8CA425935232D395;l=bBMspAhrveV5732DBOCanurza77OSIRYYuPzaNbMi_5pm6T_G4QOlC03xF96VjfRswYBqh6Mygv9-etuZ;hng=CN%7Czh-CN%7CCNY%7C156;isg=BLi41Q8PENDal3xUVsA-aPbfiWaKiRzB6vcTu_IpBPOmDVj3mjHsO86vxUQYW9SD;uc1=cookie16=W5iHLLyFPlMGbLDwA%2BdvAGZqLg%3D%3D&cookie21=W5iHLLyFeYZ1WM9hVnmS&cookie15=UIHiLt3xD8xYTw%3D%3D&existShop=false&pas=0&cookie14=UoTZ4ttqLhxJww%3D%3D&tag=8&lng=zh_CN;thw=cn;x=e%3D1%26p%3D*%26s%3D0%26c%3D0%26f%3D0%26g%3D0%26t%3D0;swfstore=34617;'
      # Request a Taobao search page with the saved login cookie and print one
      # entry from the item list embedded in the page.
      headers = {
          'cookie': cookie,
          "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"
      }
      # The URL must not end in whitespace: a trailing space would be sent as
      # part of the `sort` value.
      rep = requests.get('https://s.taobao.com/search?q=手机&p4ppushleft=1%2C48&s=0&sort=sale-desc', headers=headers)
      rep.encoding = 'utf-8'
      res = rep.text
      print(res)
      # The data sits in the page source between these two markers.
      r = re.compile(r'g_page_config = (.*?)g_srp_loadCss', re.S)
      matches = r.findall(res)
      if not matches:
          # Fail with a clear message instead of an IndexError on matches[0].
          raise RuntimeError('g_page_config not found in the response; the cookie may have expired')
      data = matches[0].strip().rstrip(';')
      dic_data = json.loads(data)
      auctions = dic_data.get('mods')['itemlist']['data']['auctions']
      # print(auctions,len(auctions))
      # Print only the second auction entry as a sample, if there is one.
      for item in auctions[1:]:
          print(item)
          break

      针对iframe 的操作

      page.frames 返回页面中所有 frame(包括 iframe)的列表,需要先判断要操作的是哪一个 frame;拿到目标 frame 后,对它的操作方式与操作 page 基本相同。

      from pyppeteer import launch
      import asyncio
      async def main(url):
          """Open *url*, then click ``#switcher_plogin`` inside the page's second frame.

          Prints the frame list, the second frame's title and the element handle.
          Errors while working with the frame are printed, not raised. All pages
          and the browser are closed on every exit path, including a failed
          ``goto``.

          Args:
              url: page to load.
          """
          w = await launch({'headless': False, 'args': ['--no-sandbox'], })
          try:
              page = await w.newPage()
              await page.setViewport({"width": 1366, 'height': 800})
              await page.goto(url)
              try:
                  await asyncio.sleep(1)
                  frame = page.frames
                  print(frame)  # inspect this list to find the frame to operate on
                  if len(frame) < 2:
                      # Say what is wrong instead of an IndexError on frame[1].
                      raise RuntimeError('expected at least two frames, got %d' % len(frame))
                  title = await frame[1].title()
                  print(title)
                  await asyncio.sleep(1)
                  login = await frame[1].querySelector('#switcher_plogin')
                  print(login)
                  await login.click()
                  await asyncio.sleep(20)
              except Exception as e:
                  print(e, "EEEEEEEEE")
          finally:
              for _page in await w.pages():
                  await _page.close()
              await w.close()
      asyncio.get_event_loop().run_until_complete(main("https://i.qq.com/?rd=1"))
      # asyncio.get_event_loop().run_until_complete(main("https://www.gushici.com/"))

      未完传送门:pyppeteer执行js绕过webdriver监测方法下

      以上就是pyppeteer执行js绕过webdriver监测方法上的详细内容,更多关于pyppeteer执行js绕过webdriver监测的资料请关注其它相关文章!

      在线客服
      服务热线

      服务热线

      4008888355

      微信咨询
      二维码
      返回顶部
      ×二维码

      截屏,微信识别二维码

      打开微信

      微信号已复制,请打开微信添加咨询详情!