社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

python:从文件中读取最后的'n'行[重复]

nac001 • 5 年前 • 2983 次点击  

我正在为一个web应用程序编写一个日志文件查看器,为此我想对日志文件的行进行分页。文件中的项是基于最新项的行。

所以我需要一个 tail() 可读取的方法 n 从底部开始并支持偏移的线。我想到的是这样的:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

这样做合理吗?使用偏移量跟踪日志文件的建议方法是什么?

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/43580
 
2983 次点击  
文章 [ 30 ]  |  最新文章 5 年前
Emilio
Reply   •   1 楼
Emilio    7 年前

将@papercrane解决方案更新为python3。 用打开文件 open(filename, 'rb') 及:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]
Marko
Reply   •   2 楼
Marko    13 年前

我发现上面的popen是最好的解决方案。它又快又脏而且有效 对于Unix机器上的Python2.6,我使用了以下命令

    def GetLastNLines(self, n, fileName):
    """
    Name:           Get LastNLines
    Description:        Gets last n lines using Unix tail
    Output:         returns last n lines of a file
    Keyword argument:
    n -- number of last lines to return
    filename -- Name of the file you need to tail into
    """
    p=subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
    soutput,sinput=p.communicate()
    return soutput

soutput将包含最后n行代码。要逐行遍历soutput,请执行以下操作:

for line in GetLastNLines(50,'myfile.log').split('\n'):
    print line
Hauke Rehfeld
Reply   •   3 楼
Hauke Rehfeld    7 年前

一个更干净的python3兼容版本,不插入,但追加和反转:

def tail(f, window=1):
    """
    Returns the last `window` lines of file `f` as a list of bytes.
    """
    if window == 0:
        return b''
    BUFSIZE = 1024
    f.seek(0, 2)
    end = f.tell()
    nlines = window + 1
    data = []
    while nlines > 0 and end > 0:
        i = max(0, end - BUFSIZE)
        nread = min(end, BUFSIZE)

        f.seek(i)
        chunk = f.read(nread)
        data.append(chunk)
        nlines -= chunk.count(b'\n')
        end -= nread
    return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])

像这样使用:

with open(path, 'rb') as f:
    last_lines = tail(f, 3).decode('utf-8')
dimitri
Reply   •   4 楼
dimitri    13 年前

简单快速的MMAP解决方案:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()
Armin Ronacher
Reply   •   5 楼
Armin Ronacher    16 年前

我最后使用的代码。我认为这是目前为止最好的:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
borgr papercrane
Reply   •   6 楼
borgr papercrane    6 年前

上面S.lott的回答几乎对我有效,但最后给了我一部分线索。结果发现,它破坏块边界上的数据,因为数据以相反的顺序保存读取的块。调用“”联接(数据)时,块的顺序错误。这就解决了这个问题。

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]
glenbot
Reply   •   7 楼
glenbot    7 年前

这是我的答案。纯蟒蛇。使用时间看起来很快。在一个有100000行的日志文件中拖尾100行:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165

代码如下:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]
A. Coady
Reply   •   8 楼
A. Coady    16 年前

如果可以读取整个文件,则使用deque。

from collections import deque
deque(f, maxlen=n)

在2.6之前,deques没有maxlen选项,但是它很容易实现。

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q

如果需要从末尾读取文件,则使用gallop(也称为指数)搜索。

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]
Ivan Castellanos Mark
Reply   •   9 楼
Ivan Castellanos Mark    7 年前

在Python 2上假设一个UNIX类系统,您可以这样做:

import os
def tail(f, n, offset=0):
  stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
  stdin.close()
  lines = stdout.readlines(); stdout.close()
  return lines[:,-offset]

对于python 3,您可以执行以下操作:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:, -offset]
Mark Amery Allende
Reply   •   10 楼
Mark Amery Allende    10 年前

这可能比你的快。对线的长度没有任何假设。一次返回一个块,直到找到正确数量的'\n'字符。

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

我不喜欢关于线长度的棘手假设——作为一件实际的事情,你永远不可能知道这样的事情。

通常,这将在第一次或第二次通过循环时定位最后20行。如果你的74个字符的东西是准确的,你使块大小2048,你将尾随20行几乎立即。

而且,我也不会消耗大量的大脑卡路里来巧妙地调整物理操作系统块。使用这些高级i/o包,我怀疑您会看到试图在os块边界上对齐的任何性能结果。如果使用较低级别的I/O,则可能会看到加速。