社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

记两件「会门脚本语言真香」的小事 | Python 主题月

coder-pig • 4 年前 • 394 次点击  
阅读 17

记两件「会门脚本语言真香」的小事 | Python 主题月

本文正在参加「Python主题月」,详情查看 活动链接

这两件小事发生在几周前了,一直想记录下,却又一直搁置,今天抽点时间写一下~

小事一

① 起因

最近两周公司项目暂时没啥大的新需求,有时间整下技术规划,跟组长商议后决定:对APP做下 启动优化,就是:应用启动、页面加载提速

说到页面加载提速,那么问题来了 → 页面那么多,怎么知道有哪些页面需要优化呢?

公司APP的设计是单Activity多Fragment,那有多少Fragment呢,这时会门脚本语言的好处就来了,直接写个脚本递归遍历文件夹统计下就知道了:

import os


def search_all_fragment(path):
    os.chdir(path)
    items = os.listdir(os.curdir)
    for item in items:
        path = os.path.join(item)
        # 获取路径分割后的最后部分,即文件名
        path_split_last = path.split(os.path.sep)[-1]
        # 判断是否为目录,是往下递归
        if os.path.isdir(path):
            print("[-]", path)
            search_all_fragment(path)
            os.chdir(os.pardir)
        # 因为项目里用到了ARouter会生成对应的Fragment,不在统计范畴中,要过滤掉
        elif 'Fragment' in path_split_last and '$$' not in path_split_last:
            print("[!]", path)
            # 只保存.java和.kt的文件
            if path_split_last.endswith('.java') or path_split_last.endswith('.kt'):
                result_list.append(path)
        else:
            print('[+]', path)


if __name__ == '__main__':
    result_list = []
    search_all_fragment(r'项目文件路径')
    print(result_list)
    print('共有Fragment:', len(result_list), "个")
复制代码

脚本一跑就知有没有:

行吧,一共412个Fragment,用脚本秀了一波,回归正题,怎么知道哪些页面需要优化。

巧了,刚好自家全埋点上有做 渲染时间埋点 的日志上报,且不关心具体的实现方案是否靠谱,开打Kibana,输入下筛选条件(仅查看这类Event),部分日志如下:

② 是页面的路径,取最后的Fragment就好,③ renderCost,这里就是渲染时间了,拿到这两个数据就行。

接着,怎么分析这些日志得出有哪些待优化页面?拍拍脑门跟组长定了个不怎么靠谱的方案:

统计所有Fragment的渲染时间,计算平均数,然后从长到短排序,优先优化渲染时间长的页面。

早上讨论完,看了下过去一年渲染类型的日志共有 53068377 条记录,下午去看病的路上就开始构思该怎么搞了,三条思路:

  • 1、Kibana支持保存查询结果导出成csv的,直接导出csv,调下csv库或pandas读取解析一波;
  • 2、有数据库权限的话,直接查询出所有日志,导出json或scv;
  • 3、抓包或模拟访问抓取数据保存到本地,然后再做批处理;

事实证明留多点后手是没错的,前两个思路+第三个思路的前半段都GG了,听我娓娓道来~

② 思路一 ×

第二天一早到公司,准备导出csv,看别人发的教程很简单,就三步:

输入查询条件查询 → 得出查询结果后Save → 生成CSV

但是,我死活都找不到Save,故生成CSV的按钮一直是灰的(点不了):

em...难道是权限的问题?换上了组长给的有查数据库权限的账号,一样不醒,难道是要 开启这样的配置 ,点击Kibana的设置,各种没权限,于是去找后台大佬,得到的回复是:

不能开这个,导几百条几千条还好,导个几百万条服务器顶不住直接就挂了,有风险,所以把这个功能禁用了。

思路一惨遭滑铁卢...


③ 思路二 ×

上思路二,有数据库权限的账号,执行查询语句后走脚本导出,写两句简单SQL条件查询语句还不是手到擒来!

现实跟我的认知出了点 偏差,原来这个数据库权限只是:可以用Kibana的Dev Tools,在上面拼接json字符串查询而已:

em...也不是不能用,抓包发现bool块的数据于筛选日志请求提交的数据相同,复制粘贴一波,然后把size改成1000000,执行下康康:

em...返回了错误信息,大概意思是说一次最多只能查10000条,如果一次查更多只能去改配置。

卧槽,直接裂开,我还想着复制粘贴保存下Json还好,5306w条数据啊,手动复制粘贴,来算算要多久:

  • 修改查询条件(起始日期时间和结束日期时间) → 10s
  • 点击查询等待查询结果显示 → 20s以内
  • 新建文件,复制粘贴,保存输入文件名 → 30s

每存1w条数据,我要花至少1分钟,换算成时,获取完这些数据要多少小时:5306/60≈88.5h,换算成标准工作日(8h),需要11天多一点,这还没算休息时间呢,要 花两个多星期 重复做这样重复的事,任谁也顶不住啊!!!

方案二也跪了...


④ 方案三 × √

em...抓下包?分析参数,然后写爬虫抓下,抓了几次请求后我就放弃了,Cookie里有个sid每请求一次变一次:

而且跟响应头Set-Cookie返回的不一样,短时间内捣鼓怎么构造的显然不可能,唉只能上看上去最low的模拟用户访问浏览器了。

Tips:后面闲下来发现,Set-Cookie返回的只是Cookies里的一部分,登录后拿到cookies,然后自己每次请求后替换这部分就好~

把模拟步骤拆分下:

  • 1、打开登录页 → 等待加载完 → 填充账号密码 → 点击登录
  • 2、等待页面跳转主页加载完 → 点击左侧Dev Tools图标
  • 3、等待页面加载完 → 清空左侧查询Json → 填充新的查询Json
  • 4、点击发送请求 → 等待右侧查询结果 → 选中查询结果 → 保存到本地文件

怎么方便怎么来,笔者直接把查询结果存txt里了,模拟访问用Selenium,直接开搞:

import time

from selenium import webdriver

base_url = 'http://kibana.xxx.xxx'
login_url = base_url + '/login'
login_data = {
    'password': 'xxx',
    'username': 'xxx',
}


# 初始化浏览器
def init_browser():
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument(r'--start-maximized')    # 开始就全屏
    return webdriver.Chrome(options=chrome_options)


# 模拟登录
def login():
    browser.get(login_url)
    time.sleep(10)
    inputs = browser.find_elements_by_class_name('euiFieldText')
    inputs[0].send_keys(login_data['username'])
    inputs[1].send_keys(login_data['password'])
    submit = browser.find_element_by_xpath('//button[1]')
    submit.click()


if __name__ == '__main__':
    browser = init_browser()
    login()
复制代码

用过selenium的朋友可能或说:添加下述配置设置下用户数据目录,下次打开浏览器访问处于登录态,就不用重新登录了:

chrome_options.add_argument(r'--user-data-dir=D:\ChromeUserData')   
复制代码

但实际的情况是设置了没用,还是跳转了登录页,我也不知道为什么,索性每次跑脚本都登录下吧...

登录成功后,稍待片刻会跳转到主页,等待加载完毕,点击左侧这个图标,这没有采用显式或隐式等待的方式,而是笨方法休眠死等~

login()函数最后调下下面这个方法:

# 访问主页点击Tab
def click_tab():
    time.sleep(8)   # 假死等待页面加载完毕
    browser.find_element_by_xpath('//ul[3]/li[2]').click()
复制代码

接着到输入区域写入文字:

Elements定位到目标位置:

卧槽,好像有点难搞啊,不是普通的文本输入框,获取外层ace_content的div,尝试send_keys:

def set_left_text():
    inputs = browser.find_element_by_xpath('//div[@class="ace_content"]')
    inputs.send_keys('测试文本')
复制代码

果然报错:

不能直接设置文本就只能另辟蹊径了,心生一计

点击最后的游标,然后一直按backspace键清空,接着模拟键盘输入一个个字母敲进去

改动后的代码

# 设置左侧文字
def set_left_text():
    time.sleep(5)
    cursor_div = browser.find_element_by_xpath('//div[@class="ace_cursor"]')
    cursor_div.click()
    action_chains = ActionChains(browser)
    for i in range(0, 500):
        action_chains.context_click(cursor_div).send_keys(Keys.BACKSPACE).perform()
    action_chains.context_click(cursor_div).send_keys('GET _search' + str(search_dict)).perform()
复制代码

清空后输入,有点鬼畜:

这当中其实做了很多无效操作,按回退键500下,实际上字符没那么多,还有得等它把字敲完,得办法改进下。

又心生一计粘贴复制,实现起来就是:

往剪切板写入本次要查询的字符串 → 点击游标或内容获得焦点 → Ctrl+A全选内容 → 回退 → Ctrl+V粘贴内容

代码实现一波:

def set_left_text():
    time.sleep(5)
    input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
    input_div.click()
    action = ActionChains(browser)
    action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
    action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
    action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
复制代码

看看效果:

可以的,模拟点击运行的小按钮了:

# 点击查询按钮
def click_submit():
    submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
    submit_button.click()
复制代码

接着到右侧查询结果,直接处理有些麻烦,获取内容结点,递归遍历所有子节点,提取文本去空格换行等,最后拼接输出。

又又心生一计

能不能拦截selenium浏览器接收的请求,对特定请求,直接拿响应结果写入

还真可以,通过中间人代理的方式,此处使用 browsermob-proxy,下载完把库拷贝到项目中:

# 开启代理
server = Server(os.path.join(os.getcwd(), r'browsermob-proxy-2.1.4\bin\browsermob-proxy'))
server.start()
proxy = server.create_proxy()

# chrome加下配置
chrome_options.add_argument('--proxy-server={0}'.format(proxy.proxy))

# 抓包前:
proxy.new_har(options={
    'captureContent': True,
    'captureHeaders': True
})

# 抓包后过滤特定请求,并把内容保存到本地文件中:
def save_log(date_str, index):
    for entry in proxy.har['log']['entries']:
        if entry['request']['url'].endswith('path=_search&method=GET'):
            log_file_path = os.path.join(out_dir, date_str + '_' + str(index) + '.txt')
            with open(log_file_path, "w+", encoding='utf-8') as f:
                f.write(str(entry['response']['content'])
                    .replace("\n", '').replace("\\n", "").replace(' ', ''))
            print("日期日志保存完毕:", log_file_path)
复制代码

呕吼,完美,接着补齐剪贴板写入,以及查询日期的构造了:

def set_copy_text(content):
    w.OpenClipboard()
    w.EmptyClipboard()
    w.SetClipboardData(win32con.CF_UNICODETEXT, content)
    w.CloseClipboard()

# 构造生成一个从20200709到今天的日期
def init_date_list(begin_date, end_date):
    date_list = []
    begin_date = datetime.datetime.strptime(begin_date, "%Y%m%d")
    end_date = datetime.datetime.strptime(end_date, "%Y%m%d")
    while begin_date <= end_date:
        date_str = begin_date.strftime("%Y-%m-%d")
        date_list.append(date_str)
        begin_date += datetime.timedelta(days=1)
    return date_list
复制代码

最后,就是每次请求时更新请求参数写入剪贴板,打开代理抓包:

def input_query_content():
    try:
        for pos, date in enumerate(str_date_list[]):
            for index in range(1, 3):
                input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
                input_div.click()
                action = ActionChains(browser)
                print(str(pos + 1) + "、请求日期:" + date + "-" + ("上半天" if (index == 1) else "下半天"))
                update_dict_and_proxy(date, index)
                action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
                set_copy_text('GET _search' + '\n' + str(search_dict).replace("'", '"'))
                time.sleep(1)
                action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
                action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
                submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
                submit_button.click()
                time.sleep(20)
                save_log(date, index)
    except Exception as e:
        print(e)
        proxy.close()
        browser.close()

# 更新请求字典及新建抓包
def update_dict_and_proxy(date_str, index):
    gte_str = date_str + 'T00:00:00.000Z' if (index == 1) else date_str + 'T12:00:00.000Z'
    lte_str = date_str + 'T12:00:01.000Z' if (index == 1) else date_str + 'T23:59:59.000Z'
    search_dict['query']['bool']['filter'][20]['range']['time']['gte'] = gte_str
    search_dict['query']['bool']['filter'][21]['range']['time']['lte'] = lte_str
    proxy.new_har(options={
        'captureContent': True,
        'captureHeaders': True
    })
复制代码

脚本一跑,就可以开始挂机了,建议找一台空闲的电脑挂着,因为脚本会 占用剪切板,会影响正常工作哦!另外,这里把查询时间划分成上跟下,是想尽可能多的查询到所需数据。

会门脚本语言真香啊!把原本的工作委托给了自动化脚本,效率也高一倍,2w条数据只要1分钟,采集完所有数据的耗时骤降至少44个小时,机器还能24小时跑,所以其实只需要两天,还不影响我做 工(mo)作(yu),当然还可以再优化,将脚本部署到多台机子上同时执行,一减再减少,本来两个多星期的活,一天不到干完。还不香?

更幸运的是,实际上有效数据只有600w条,原来渲染事件的埋点是前年10月份才加的,所以单机跑了5个钟就把数据爬完了。

早上写脚本,下午就跑完,中途把统计脚本也写下,这一Part就很Easy了,正则表达式 yyds

data_pattern = re.compile('pagePath":"(.*?)".*?"renderCost","value":(.*?)}', re.S)
复制代码

读取文件内容,全文匹配,遍历匹配结果,依次对两个分组做处理,然后把渲染时间写入页面.txt文件中:

for log in log_list:
    print("解析文件:", log)
    with 


    
open(log, 'r', encoding='utf8') as f:
        content = f.read()
    data_result = data_pattern.findall(content)
    if data_result is not None:
        for data in data_result:
            page_name = ''
            page_render_time = 0
            page_split = data[0].split('-')
            if page_split is not None and len(page_split) > 0:
                other_page_split = page_split[-1].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = page_split[-1]
            else:
                other_page_split = data[0].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = data[0]
            page_render_time = data[1].replace('"', '')
            if page_name == 'i':
                print(data)
                break
            cp_utils.write_str_data(page_render_time, os.path.join(page_dir, page_name + ".txt"))
复制代码

写入示例如下:

再接着又是遍历文件夹子,字典存储数据 (页面:平均值),保存统计数据:

def average_time(file_path):
    page_dir_split = file_path.split(os.path.sep)
    if page_dir_split is not None and len(page_dir_split) > 0:
        result_average = 0
        render_time_list = cp_utils.load_list_from_file(file_path)
        for render_time in render_time_list:
            if render_time != '0':
                result_average = int((result_average + int(render_time)) / 2)
        print(page_dir_split[-1] + "结果计算完毕...")
        cp_utils.write_str_data(page_dir_split[-1] + "-" + str(result_average), result_file)
    else:
        print("异常退出")

复制代码

最后按照倒序输出到文件中:

def order_list(file_path):
    time_info_list = cp_utils.load_list_from_file(file_path)
    order_list_result = sorted(time_info_list, key=lambda x: int(x.split('-')[-1]), reverse=True)
    cp_utils.write_list_data(order_list_result, result_order_file)
复制代码

这样就可以得到页面的平均渲染时间了~当然,后面觉得看这个平均数不靠谱,因为影响变量太多了:

设备硬件不同,加载速度肯定是有差距的,有些用户活跃,有些不活跃,还有版本等等...

不靠谱但又想依赖这个全埋点的数据做点什么,想了想又定了另外一个方案:

按照页面使用频度排序,优先针对用户常用的页面进行优化,比如这个版本优化2个常用页面,挑几个典型特定设备进行跟踪,发版一段时间后那这两个页面的新数据跟旧数据做对比,就可以对优化的收益做量化了~

当然,这些是后话了,不敢想象,如果我不是Python玩得还可以的话,该怎么解决这些问题...


小事二

第二次小事比起第一件来说就小巫见大巫了,基友小A,让我帮忙给他搞点行业报告,一下子给了好几个网址,开头几个还好,就是模拟请求,解析页面,拿个ID啥的拼接,得出真实的PDF下载链接,然后下载。

后面的几个网站,就很鸡贼了,直接把PDF的每一页作为图片贴出来,如:

想把图片转成PDF,如果不会脚本语言,需要一张张图片右键保存到本地,最后用合成工具把图片合成成PDF。

不过巧了,我刚好 会点Python,所以这件事就变成了爬图片,找个图片转PDF的库了,找到个 img2pdf库,API简单,用着还行,不过如果图片有Alpha通道,会直接报错,所以需要自己去下,简单,用 pillow库 就可以做:

from PIL import Image, ImageFont, ImageDraw

# 批量对RGBA图片进行转换,同时删除无效文件
def remove_pic_alpha(pic):
    try:
        img = Image.open(pic)
        if img.mode == "RGBA":
            img = img.convert("RGB")
            img.save(pic)
            print("转换图片:", pic)
    except Exception as e:
        print("文件异常,移除:" + pic)
        os.remove(pic)
复制代码

简单转换代码如下:

import img2pdf

try:
    with open(pdf_path, "wb+") as f:
        f.write(img2pdf.convert(pic_path_list))
    print("输出PDF文件:", pdf_path)
except Exception as e:
    print("发生异常:", pdf_path)
复制代码

后面发现了一个大块头,851页,总共13950个有效的报告,有一些报告的页面结构不是纯图片,而是类似于:文字-图片-文字-图片这样,不想把文字漏掉,可以把它转换成图片,就是利用 pillow库,按照一定的规则,把文字绘制到一个白色的背景上。

def font2pic(content, pic_path):
    # 先转换为列表
    content_list = list(content)
    i = 30
    while i < len(content_list):
        content_list.insert(i, "\n")
        i += 30
    content_after = ''.join(content_list)
    im = Image.new("RGB", (960, 720), (255, 255, 255))
    dr = ImageDraw.Draw(im)
    font = ImageFont.truetype(os.path.join("fonts"


    
, "msyh.ttf"), 24)
    dr.text((50, 50), content_after, font=font, fill="#000000")
    im.save(pic_path)
复制代码

爬取处理页面是,还得记录顺序,并且把它作为图片名,一个爬取的临时文件示例如下:

然后就是遍历文件每一行,文字生成图片,图片链接执行下载(也可以批量下载后替换url),得出的PDF样例如下:

文本的渲染比较无脑,不是很美观,具体的渲染规则还得从长计议下,不过这些也是后话了,数据到手,你想怎么处理,都可以~


小结

当我们需要做一些重复性任务,且量比较大的时候,脚本的优势就出来了:只要程序足够稳健24小时不间断跑还不会累,把脚本部署到多台机子上,还可以缩短完成时间。当然,脚本是死的,人是活的,有些问题没考虑到,脚本跑到中途就挂了,所以大型任务还需要引入 告警及日志系统,以便及时跟进及对错误进行排查可以快速定位到问题。

脚本语言除了Python还有很多:Windows的.batLinux的.shC ShellJavaScriptLua 等。而笔者偏爱Python的原因主要还是因为它的 类库丰富,你能想到的基本都能找到对应的第三方库。

人生苦短,我用Python~

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/117390