
Python: read the last 'n' lines from a file [duplicate]

nac001 • 4 years ago • 1709 views

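I'm writing a log file viewer for a web application, and for that I want to paginate through the lines of the log file. The items in the file are line-based, with the newest item at the bottom.

So I need a tail() method that can read n lines from the bottom and supports an offset. What I came up with looks like this: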

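def tail(f, n, offset=0):
    """Read n lines from the end of f, skipping the last `offset` lines.

    In Python 3, f must be opened in binary mode ('rb'), because text-mode
    files do not support nonzero seeks relative to the end.
    """
    avg_line_length = 74
    to_read = n + offset
    while True:
        try:
            # Jump back roughly far enough to cover to_read lines
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # The file is smaller than the distance we want to step
            # back, so start from the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        # Require one extra line unless we started at the beginning:
        # the first line after a mid-file seek may be partial
        if len(lines) > to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3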
        avg_line_length *= 1.3

Is this a reasonable approach? What is the recommended way to tail log files with offsets?

Source: http://www.python88.com/topic/43580
 
Answer #1 • ProgramFast • 8 years ago

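Although this is not an efficient approach for large files, the code is very straightforward:

  1. It reads the whole file object, f.
  2. It splits the returned string into lines with splitlines() (splitting on "\n" directly would leave an empty last item when the file ends with a newline).
  3. It takes the last n items of that list with a negative slice, [-n:], and joins them back together.

    def tail(f, n):
        return "\n".join(f.read().splitlines()[-n:])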
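A quick way to see why the choice of splitting matters: most log files end with a newline, so split("\n") produces an empty final element that takes up one of the n slots. A small check, using io.StringIO as a stand-in for a real file (tail_split and tail_splitlines are hypothetical names for illustration):

```python
import io

def tail_split(f, n):
    # Splitting on "\n" leaves an empty string after a trailing newline
    return "\n".join(f.read().split("\n")[-n:])

def tail_splitlines(f, n):
    # splitlines() does not produce that empty trailing element
    return "\n".join(f.read().splitlines()[-n:])

text = "first\nsecond\nthird\n"
print(repr(tail_split(io.StringIO(text), 2)))       # 'third\n'
print(repr(tail_splitlines(io.StringIO(text), 2)))  # 'second\nthird'
```

With the split("\n") version, asking for two lines returns only one.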
    
Answer #2 • Med sadek • 5 years ago

Very simple:

def tail(fname, nl):
    with open(fname) as f:
        data = f.readlines()  # readlines returns a list of lines
        print(''.join(data[-nl:]))
Answer #3 • user9956608 • 6 years ago

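I found what is probably the simplest way to get the first or last N lines of a file.

Last N lines of a file (e.g. N=10):

N = 10
with open("xyz.txt", "r") as file:
    liner = file.readlines()
# max() keeps the start from going negative on short files
for ran in range(max(0, len(liner) - N), len(liner)):
    print(liner[ran], end="")

First N lines of a file (e.g. N=10):

N = 10
with open("xyz.txt", "r") as file:
    liner = file.readlines()
for ran in range(min(N, len(liner))):
    print(liner[ran], end="")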
Answer #4 • S.Lott • 15 years ago

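Come to think of it, this is probably as fast as anything here.

def tail(f, window=20):
    lines = [''] * window
    count = 0
    for l in f:
        lines[count % window] = l  # overwrite the oldest slot (ring buffer)
        count += 1
    # Rotate so the oldest kept line comes first
    start = count % window
    result = lines[start:] + lines[:start]
    # With fewer than `window` lines, drop the unused empty slots
    return result if count >= window else result[window - count:]

Much simpler, and it seems to run fast.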
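The same ring-buffer idea can also cover the question's offset parameter: keep the last n + offset lines in a fixed-size buffer, then drop the newest offset of them. A minimal sketch (tail_with_offset is a hypothetical name); it still reads the whole file once, so it suits moderately sized logs rather than huge ones:

```python
def tail_with_offset(f, n, offset=0):
    """Return the n lines that precede the last `offset` lines of f."""
    window = n + offset
    if window <= 0:
        return []
    ring = [None] * window  # fixed-size ring buffer
    count = 0
    for line in f:
        ring[count % window] = line
        count += 1
    # Rotate so the oldest kept line comes first, then drop unused slots
    start = count % window
    kept = (ring[start:] + ring[:start])[max(0, window - count):]
    # Drop the newest `offset` lines
    return kept[:max(0, len(kept) - offset)]
```

For the pagination use case, page p (0-based, newest first) of size n would be tail_with_offset(f, n, offset=p * n).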

Answer #5 • Quinten Cabo • 5 years ago

There is a very useful module, file_read_backwards, that can do this:

from file_read_backwards import FileReadBackwards

with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:
    # Iterate line by line, starting from the last line and moving up
    for l in frb:
        print(l)
Answer #6 • Kant Manapure • 6 years ago
abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
lastline = None  # stays None if the string never appears
with open(filename) as myFile:
    for num, line in enumerate(myFile, 1):
        if abc in line:
            lastline = num
print("Last occurrence of the string in the file is on line " + str(lastline))
Answer #7 • Y Kal • 6 years ago
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
    lines = list(f)
# Walk backwards over the n lines that precede the last `offset` lines,
# then reverse the result back into file order
n_last_lines = list(reversed(lines[-(offset + 1):-(offset + n + 1):-1]))
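Once the lines are in a list, the reversed slice above is equivalent to an ordinary forward slice with explicit bounds. A sketch (last_lines is a hypothetical name) that clamps both bounds at zero, so an offset larger than the file cannot make a negative index wrap around to the front of the list:

```python
def last_lines(lines, n, offset=0):
    """Return the n items that precede the last `offset` items of lines."""
    end = max(0, len(lines) - offset)  # index just past the wanted lines
    start = max(0, end - n)            # never below the first line
    return lines[start:end]
```

For 20 lines, n=10 and offset=5 this selects indices 5 through 14, the same lines the one-liner produces.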
Answer #8 • moylop260 • 9 years ago
import time

attempts = 600
wait_sec = 5
fname = "YOUR_PATH"

with open(fname, "r") as f:
    for i in range(attempts):
        where = f.tell()  # remember the position before each read
        line = f.readline()
        if not line:
            # No new data yet: wait, then rewind to the saved position
            time.sleep(wait_sec)
            f.seek(where)
        else:
            print(line, end="")  # line already has a newline
Answer #9 • Raj • 10 years ago
This is my version of tailf

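import os
import time

filename = 'path to file'

try:
    # Binary mode: Python 3 text files can't seek relative to the end
    with open(filename, 'rb') as f:
        size = os.path.getsize(filename)
        if size < 1024:
            s = size
        else:
            s = 999
        f.seek(-s, 2)
        l = f.read()
        print(l.decode(errors='replace'), end='')
        # Then keep printing lines as they are appended (like tail -f)
        while True:
            line = f.readline()
            if not line:
                time.sleep(1)
                continue
            print(line.decode(errors='replace'), end='')
except IOError:
    pass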
Answer #10 • Hal Canary • 11 years ago

Not the first example to use a deque, but a simpler one. This one is generic: it works on any iterable, not just files.

#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
    deq = collections.deque()
    for thing in iterable:
        if len(deq) >= N:
            deq.popleft()
        deq.append(thing)
    for thing in deq:
        yield thing
if __name__ == '__main__':
    for line in tail(sys.stdin,10):
        sys.stdout.write(line)
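collections.deque can also enforce the size limit by itself: when created with maxlen=N, appending to a full deque discards an item from the opposite end, so the manual popleft() is not needed. A minimal sketch of the same generic tail (tail_deque is a hypothetical name):

```python
import collections

def tail_deque(iterable, n):
    """Return the last n items of any iterable as a list."""
    # A deque with maxlen keeps only the n most recently appended items
    return list(collections.deque(iterable, maxlen=n))
```

Passing an open file object gives its last n lines, still at the cost of one full pass over the file.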
Answer #11 • Leifbk • 11 years ago

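I had to read a specific value from the last line of a file and stumbled upon this thread. Rather than reinventing the wheel in Python, I used a small shell script, saved as /usr/local/bin/get_last_netp:

#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk '{print $14}'

In the Python program: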

from subprocess import check_output

last_netp = int(check_output("/usr/local/bin/get_last_netp"))
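For comparison, the same value can be read without a shell script. A sketch assuming, as in the script above, that the wanted value is the 14th whitespace-separated field of the last line (awk's $14, which is index 13 in Python); last_field is a hypothetical name. Unlike awk, which prints an empty string for a missing field, this raises IndexError:

```python
import collections

def last_field(lines, field=14):
    """Return the 1-based whitespace-separated `field` of the last line."""
    # deque(maxlen=1) keeps only the most recent line while iterating
    last = collections.deque(lines, maxlen=1)
    if not last:
        raise ValueError("no lines")
    return last[0].split()[field - 1]

# Usage, with the log path from the shell script above:
# with open("/home/leif/projects/transfer/export.log") as f:
#     last_netp = int(last_field(f))
```

This reads the whole file once; for very large logs, one of the backward-seeking answers would avoid that.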
Answer #12 • Samba Siva Reddy • 5 years ago

Simple:

with open("test.txt") as f:
    data = f.readlines()
tail = data[-2:]
print(''.join(tail))
Answer #13 • David Rogers • 13 years ago

Several of these solutions have problems if the file doesn't end in \n, or if you can't be sure the complete first line is read.

def tail(file, n=1, bs=1024):
    # Binary mode: Python 3 text files can't seek relative to the end.
    # Returned lines are bytes.
    with open(file, 'rb') as f:
        f.seek(0, 2)
        B = f.tell()
        if n <= 0 or B == 0:
            return []  # nothing requested, or empty file
        f.seek(-1, 2)
        l = 1 - f.read(1).count(b'\n')  # If file doesn't end in \n, count it anyway.
        while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count(b'\n')
        f.seek(B, 0)
        l = min(l, n)  # discard first (incomplete) line if l > n
        return f.readlines()[-l:]
Answer #14 • fdb • 13 years ago

Based on EyeCue's answer (Jun 10, 2010 at 21:28): this class adds head() and tail() methods to the file object.

import io
import itertools

class File(io.FileIO):
    # Python 3 port: the Python 2 `file` builtin no longer exists, so this
    # subclasses io.FileIO (binary); lines are returned as bytes
    def head(self, lines_2find=1):
        self.seek(0)                            # Rewind file
        return list(itertools.islice(self, lines_2find))

    def tail(self, lines_2find=1):
        self.seek(0, 2)                         # Go to end of file
        bytes_in_file = self.tell()
        lines_found, total_bytes_scanned = 0, 0
        while (lines_2find + 1 > lines_found and
               bytes_in_file > total_bytes_scanned):
            byte_block = min(1024, bytes_in_file - total_bytes_scanned)
            self.seek(-(byte_block + total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            # Read only this block, so no newline is counted twice
            lines_found += self.read(byte_block).count(b'\n')
        self.seek(-total_bytes_scanned, 2)
        return self.readlines()[-lines_2find:]

Usage:

f = File('path/to/file', 'r')
f.head(3)
f.tail(3)
Answer #15 • rabbit • 13 years ago

You can use f.seek(0, 2) to go to the end of the file and then read lines backwards one by one, replacing readline() with the following:

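def readline_backwards(f):
    # f must be opened in binary mode ('rb'): Python 3 text files don't
    # support the relative seeks used here. Assumes the file ends with a
    # newline (text after a final unterminated newline is skipped).
    # Returns b'' once the start of the file is reached.
    backline = b''
    last = b''
    while last != b'\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = b''
    while last != b'\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline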
Answer #16 • Brian • 15 years ago

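To be efficient with very large files (common in log-file situations where you'd want tail), you generally want to avoid reading the whole file (even if you do so without reading the whole file into memory at once). However, you do need to somehow work out the offset in lines rather than characters. One possibility is reading backwards with seek() char by char, but this is very slow. It's better to process the file in larger blocks.

I have a utility function I wrote a while ago to read files backwards that can be used here.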

import os

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise ValueError("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first (end-of-file) block will be short, since this leaves
    # the rest aligned on a blocksize boundary.  This may be more
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock, 2)
    yield f.read(lastblock)

    for i in range(fullblocks - 1, -1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = b''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Keep all lines except the first, since it may be partial
        if lines:
            # Blocks arrive from the end backwards, so prepend to keep file order
            result[:0] = lines[1:]
            if len(result) >= nlines:
                return result[-nlines:]

            buf = lines[0]

    return ([buf] + result)[-nlines:]


with open('file_to_tail.txt', 'rb') as f:
    for line in tail(f, 20):
        print(line.decode())

[Edit] Added a more specific version (avoids reversing twice).
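A related variation on the block-reading idea: yield complete lines lazily in reverse order, so the question's pagination (skip offset lines, take n) becomes an itertools.islice. This is a sketch assuming a binary-mode, seekable file with b'\n' line endings; reverse_lines and tail_page are hypothetical names, not part of the answer above:

```python
import io
import itertools
import os

def reverse_lines(f, blocksize=4096):
    """Yield the lines of binary file f from last to first, without b'\\n'."""
    f.seek(0, os.SEEK_END)
    pos = f.tell()
    if pos == 0:
        return  # empty file
    f.seek(pos - 1)
    if f.read(1) == b"\n":
        pos -= 1  # ignore the newline that terminates the last line
    buf = b""
    while pos > 0:
        step = min(blocksize, pos)
        pos -= step
        f.seek(pos)
        buf = f.read(step) + buf
        lines = buf.split(b"\n")
        buf = lines[0]  # may be partial: keep it for the next block
        for line in reversed(lines[1:]):
            yield line
    yield buf  # the first line of the file

def tail_page(f, n, offset=0):
    """Return the n lines preceding the last `offset` lines, oldest first."""
    page = list(itertools.islice(reverse_lines(f), offset, offset + n))
    page.reverse()
    return page

demo = io.BytesIO(b"".join(b"line%d\n" % i for i in range(1, 11)))
print(tail_page(demo, 3, offset=2))  # [b'line6', b'line7', b'line8']
```

Only the blocks needed to reach offset + n lines are read, so paging through recent entries of a large log stays cheap.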

Answer #17 • GL2014 • 7 years ago

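Here is a very simple implementation:

with open('/etc/passwd', 'rb') as f:
    f.seek(0, 2)
    s = b''
    # Step back 10 bytes at a time until 11 newlines (10 full lines) are
    # collected, or the start of the file is reached
    while s.count(b'\n') < 11 and f.tell() > 0:
        cur = f.tell()
        step = min(10, cur)
        f.seek(cur - step)
        s = f.read(step) + s
        f.seek(cur - step)
    # Keep the last 10 lines, dropping the possibly partial first one
    print(b''.join(s.splitlines(True)[-10:]).decode())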
Answer #18 • Travis Bear • 10 years ago

There are some existing implementations of tail that you can install with pip:

  • mtFileUtil
  • multitail
  • log4tailer

Depending on your situation, there may be advantages to using one of these existing tools.

Answer #19 • Eyecue • 14 years ago

Based on S.Lott's top-voted answer (Sep 25, 2008 at 21:43), but fixed for small files.

def tail(the_file, lines_2find=20):
    # the_file must be opened in binary mode ('rb') in Python 3
    the_file.seek(0, 2)                         # go to end of file
    bytes_in_file = the_file.tell()
    lines_found, total_bytes_scanned = 0, 0
    # Read at least lines_2find + 1 line breaks (21 by default) from the
    # bottom, block by block for speed; the extra one ensures we don't
    # return a half line
    while lines_2find + 1 > lines_found and bytes_in_file > total_bytes_scanned:
        byte_block = min(1024, bytes_in_file - total_bytes_scanned)
        the_file.seek(-(byte_block + total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(byte_block).count(b'\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

Hope this helps.

Answer #20 • ShadowRanger • 5 years ago

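Posting an answer at the request of commenters on my answer to a similar question, where the same technique was used to modify the last line of a file, not just read it.

For a large file, mmap is the best way to do this. To improve on the existing mmap answer: this version is portable between Windows and Linux and should run faster (though it won't work without some modifications on 32-bit Python with files in the GB range; see the other answer for hints on handling this, and for modifying it to work on Python 2).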

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap
import os

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # mmap cannot map an empty file, so handle that case up front
    if os.path.getsize(f.name) == 0:
        return []
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

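This assumes the number of tailed lines is small enough that you can safely read them all into memory at once; you could also make this a generator function and read one line at a time by replacing the final line with: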

        mm.seek(startofline)
        # Read line by line until reaching the skipped offset lines (reading
        # exactly n lines could run into them when fewer than n lines
        # precede the offset); this loop works on both Python 2 and 3
        while mm.tell() < endoflines:
            yield mm.readline()

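Lastly, this reads in binary mode (necessary to use mmap), so it gives str lines (Py2) and bytes lines (Py3); if you want unicode (Py2) or str (Py3), the iterative approach can be tweaked to decode for you and/or fix newlines: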

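        mm.seek(startofline)
        # Raw lines up to (not including) the skipped offset lines
        lines = iter(lambda: mm.readline() if mm.tell() < endoflines else b'', b'')
        # Binary-mode files have no .encoding attribute in Py3
        if getattr(f, 'encoding', None):  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+ (use instead of the loop above):
        # yield from lines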

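Note: I typed this all up on a machine where I don't have access to Python for testing. Please let me know if I typo'd anything; this was similar enough to my other answer that I think it should work, but the tweaks (e.g. handling an offset) could lead to subtle errors. Please let me know in the comments if there are any mistakes.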