社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

1分钟升级python3自带http服务器!支持文件上传!

黑客技术和网络安全 • 4 年前 • 567 次点击  
👇👇关注后回复 “进群” ,拉你进程序员交流群👇👇

作者丨小小明

来源丨快学Python


人生苦短,快学Python!

我们知道在命令行敲入以下命令即可打开一个http服务器:

python -m http.server 

然后就可以通过自己的IP地址来访问:

内网中的其他电脑也可以通过该IP下载你共享的文件。

现在我们希望增强该服务器的功能,增加简单的上传功能。

首先我们需要找到server.py文件所在的位置,一般都在python安装目录下的Lib目录下,例如我的电脑在D:\Miniconda3\Lib\http目录下,此时我们根据server.py的源码新增一个文件server2.py,代码如下:

__version__ = "0.1"
__all__ = ["SimpleHTTPRequestHandler"]

import html
import http.server
import mimetypes
import os
import posixpath
import re
import shutil
import urllib.error
import urllib.parse
import urllib.request
from io import BytesIO

class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """简单的http文件服务器,支持上传下载
    """


    server_version = "SimpleHTTPWithUpload/" + __version__

    def do_GET(self):
        f = self.send_head()
        if f:
            self.copyfile(f, self.wfile)
            f.close()

    def do_HEAD(self):
        f = self.send_head()
        if f:
            f.close()

    def do_POST(self):
        r, info = self.deal_post_data()
        print((r, info, "by: ", self.client_address))
        f = BytesIO()
        f.write(b'html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
        f.write(b"\nUpload Result Page\n")
        f.write(b"\n

Upload Result Page

\n"
)
        f.write(b"
\n"
)
        if r:
            f.write(b"Success:")
        else:
            f.write(b"Failed:")
        f.write(info.encode())
        f.write(("
back"
 % self.headers['referer' ]).encode())
        f.write(b"\n\n")
        length = f.tell()
        f.seek(0)
        self.send_response(200)
        self.send_header("Content-type""text/html")
        self.send_header("Content-Length", str(length))
        self.end_headers()
        if f:
            self.copyfile(f, self.wfile)
            f.close()

    def deal_post_data(self):
        content_type = self.headers['content-type']
        if not content_type:
            return (False"Content-Type header doesn't contain boundary")
        boundary = content_type.split("=")[1].encode()
        remainbytes = int(self.headers['content-length'])
        line = self.rfile.readline()
        remainbytes -= len(line)
        if not boundary in line:
            return (False"Content NOT begin with boundary")
        line = self.rfile.readline()
        remainbytes -= len(line)
        fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())
        if not fn:
            return (False"Can't find out file name...")
        path = self.translate_path(self.path)
        fn = os.path.join(path, fn[0])
        line = self.rfile.readline()
        remainbytes -= len(line)
        line = self.rfile.readline()
        remainbytes -= len(line)
        try:
            out = open(fn, 'wb')
        except IOError:
            return (False"Can't create file to write, do you have permission to write?")

        preline = self.rfile.readline()
        remainbytes -= len(preline)
        while remainbytes > 0:
            line = self.rfile.readline()
            remainbytes -= len(line)
            if boundary in line:
                preline = preline[0:-1]
                if preline.endswith(b'\r'):
                    preline = preline[0:-1]
                out.write(preline)
                out.close()
                return (True"File '%s' upload success!" % fn)
            else:
                out.write(preline)
                preline = line
        return (False"Unexpect Ends of data.")

    def send_head(self):
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(301)
                self.send_header("Location", self.path + "/")
                self.end_headers()
                return None
            for index in "index.html""index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
        except IOError:
            self.send_error(404"File not found")
            return None
        self.send_response(200)
        self.send_header("Content-type", ctype)
        fs = os.fstat(f.fileno())
        self.send_header("Content-Length", str(fs[6]))
        self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
        self.end_headers()
        return f

    def list_directory(self, path):
        try:
            list = os.listdir(path)
        except os.error:
            self.send_error(404"No permission to list directory")
            return None
        list.sort(key=lambda a: a.lower())
        f = BytesIO()
        displaypath = html.escape(urllib.parse.unquote(self.path))
        f.write(b'html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
        f.write(("\nDirectory listing for %s\n" % displaypath).encode())
        f.write(("\n

Directory listing for %s

\n"
 % displaypath).encode())
        f.write(b"
\n"
)
        f.write(b"
"
)
        f.write(b"")
        f.write(b"\n")
        f.write(b"
\n
    \n"
)
        for name in list:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            f.write(('
  • %s\n'

  •                      % (urllib.parse.quote(linkname), html.escape(displayname))).encode())
            f.write(b"\n
    \n\n\n"
    )
            length = f.tell()
            f.seek(0)
            self.send_response(200)
            self.send_header("Content-type""text/html")
            self.send_header("Content-Length", str(length))
            self.end_headers()
            return f

        def translate_path(self, path):
            path = path.split('?'1)[0]
            path = path.split('#'1)[0]
            path = posixpath.normpath(urllib.parse.unquote(path))
            words = path.split('/')
            words = [_f for _f in words if _f]
            path = os.getcwd()
            for word in words:
                drive, word = os.path.splitdrive(word)
                head, word = os.path.split(word)
                if word in (os.curdir, os.pardir): continue
                path = os.path.join(path, word)
            return path

        def copyfile(self, source, outputfile):
            shutil.copyfileobj(source, outputfile)

        def guess_type(self, path):
            base, ext = posixpath.splitext(path)
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            ext = ext.lower()
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            else:
                return self.extensions_map['']

        if not mimetypes.inited:
            mimetypes.init()  # try to read system mime.types
        extensions_map = mimetypes.types_map.copy()
        extensions_map.update({
            '''application/octet-stream',  # Default
            '.py''text/plain',
            '.c''text/plain',
            '.h''text/plain',
        })

    if __name__ == '__main__':
        import argparse

        parser = argparse.ArgumentParser()
        parser.add_argument('--bind''-b', default='', metavar='ADDRESS',
                            help='Specify alternate bind address '
                                 '[default: all interfaces]')
        parser.add_argument('--port''-p', default=8000, type=int,
                            help='Specify alternate port [default: 8000]')
        args = parser.parse_args()
        http.server.test(HandlerClass=SimpleHTTPRequestHandler, port=args.port, bind=args.bind) 

    就像这样:

    image-20210622205948012

    然后再运行:

    python -m http.server2 

    可以看到已经多了文件游览器和上传按钮。

    任意上传一个文件后会出现如下提示:

    点击返回或刷新页面可以重新游览文件列表。

    当然也支持指定端口,指定为默认的80端口时,游览器访问则无需指定端口号:

    python -m http.server2 -p 80 

    做的非常简陋,欢迎各位大佬改进!

    -End-

    最近有一些小伙伴,让我帮忙找一些 面试题 资料,于是我翻遍了收藏的 5T 资料后,汇总整理出来,可以说是程序员面试必备!所有资料都整理到网盘了,欢迎下载!

    点击👆卡片,关注后回复【面试题】即可获取

    在看点这里 好文分享给更多人↓↓

    Python社区是高质量的Python/Django开发社区
    本文地址:http://www.python88.com/topic/116575
     
    567 次点击