def do_GET(self):
    """Serve a GET request.

    ``send_head`` sends the status line and headers and hands back a
    file-like object holding the response body, or a falsy value when
    there is no body to send.  The body is copied to ``self.wfile``.
    """
    f = self.send_head()
    if f:
        try:
            self.copyfile(f, self.wfile)
        finally:
            # Close the source even if the copy fails part-way (for
            # example because the client dropped the connection).
            f.close()
def do_HEAD(self):
    """Serve a HEAD request.

    ``send_head`` sends the status line and headers; any file-like object
    it returns is closed straight away because no body is transmitted.
    """
    f = self.send_head()
    if f:
        f.close()
# Serve a POST request: run deal_post_data(), print its (success, message)
# result together with the client address, then start writing an HTML
# result page into an in-memory BytesIO buffer.
# NOTE(review): this line ends inside an unterminated bytes literal, so the
# remainder of the method is missing from this copy (presumably the rest of
# the page markup and the code that sends the response -- confirm against
# the original file before editing).
defdo_POST(self): r, info = self.deal_post_data() print((r, info, "by: ", self.client_address)) f = BytesIO() f.write(b'html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') f.write(b"\nUpload Result Page\n") f.write(b"\n
def deal_post_data(self):
    """Store the file carried by a multipart/form-data POST body.

    The body is read from ``self.rfile`` line by line: the opening
    boundary, the ``Content-Disposition`` line naming the upload, two
    further lines that are skipped unread (presumably the part's
    Content-Type header and the blank separator), then the payload up to
    the next line containing the boundary.  The payload is written to a
    file inside the directory that ``self.path`` translates to.

    Only the first part of the body is handled, and its form field must
    be named ``file``.

    Returns:
        A ``(success, message)`` tuple where ``message`` describes the
        outcome in human-readable form.
    """
    content_type = self.headers['content-type'] or ""
    marker = "boundary="
    start = content_type.lower().find(marker)
    if start < 0:
        return (False, "Content-Type header doesn't contain boundary")
    # Keep only the boundary value: drop any parameters that follow it
    # and the optional surrounding quotes.
    boundary = content_type[start + len(marker):].split(";", 1)[0]
    boundary = boundary.strip().strip('"').encode()
    if not boundary:
        # An empty boundary would match every line of the body.
        return (False, "Content-Type header doesn't contain boundary")

    try:
        remainbytes = int(self.headers['content-length'])
    except (TypeError, ValueError):
        return (False, "Content-Length header is missing or invalid")

    line = self.rfile.readline()
    remainbytes -= len(line)
    if boundary not in line:
        return (False, "Content NOT begin with boundary")

    line = self.rfile.readline()
    remainbytes -= len(line)
    fn = re.findall(
        r'Content-Disposition.*name="file"; filename="(.*)"', line.decode()
    )
    if not fn:
        return (False, "Can't find out file name...")
    # The file name is chosen by the client: keep only its last component
    # so "../x", "/abs/x" or "C:\\dir\\x" cannot escape the target directory.
    name = os.path.basename(fn[0].replace("\\", "/"))
    if not name or name in (os.curdir, os.pardir):
        return (False, "Can't find out file name...")
    path = self.translate_path(self.path)
    fn = os.path.join(path, name)

    # Skip the two lines that precede the payload.
    line = self.rfile.readline()
    remainbytes -= len(line)
    line = self.rfile.readline()
    remainbytes -= len(line)

    try:
        out = open(fn, 'wb')
    except IOError:
        return (False, "Can't create file to write, do you have permission to write?")

    # ``with`` closes the file on every exit path, including errors.
    with out:
        # Writing runs one line behind reading so that the line break in
        # front of the closing boundary can be trimmed from the payload.
        preline = self.rfile.readline()
        remainbytes -= len(preline)
        while remainbytes > 0:
            line = self.rfile.readline()
            if not line:
                # The stream ended before Content-Length bytes arrived;
                # without this check the loop would never terminate.
                break
            remainbytes -= len(line)
            if boundary in line:
                preline = preline[0:-1]
                if preline.endswith(b'\r'):
                    preline = preline[0:-1]
                out.write(preline)
                return (True, "File '%s' upload success!" % fn)
            out.write(preline)
            preline = line
    return (False, "Unexpect Ends of data.")
def send_head(self):
    """Send the status line and headers shared by GET and HEAD.

    A directory requested without a trailing slash is answered with a 301
    redirect to the slash-terminated URL.  A directory containing
    ``index.html`` or ``index.htm`` is served through that file; any other
    directory is delegated to ``list_directory``.

    Returns:
        An open binary file object positioned at the start of the body
        (or whatever ``list_directory`` returns for a directory listing),
        or ``None`` once a redirect or a 404 error has been sent.  The
        caller is responsible for closing a returned file.
    """
    path = self.translate_path(self.path)
    if os.path.isdir(path):
        if not self.path.endswith('/'):
            # redirect browser - doing basically what apache does
            self.send_response(301)
            self.send_header("Location", self.path + "/")
            self.end_headers()
            return None
        for index in ("index.html", "index.htm"):
            candidate = os.path.join(path, index)
            if os.path.exists(candidate):
                path = candidate
                break
        else:
            return self.list_directory(path)
    ctype = self.guess_type(path)
    try:
        # Always read in binary mode. Opening files in text mode may cause
        # newline translations, making the actual size of the content
        # transmitted *less* than the content-length!
        f = open(path, 'rb')
    except IOError:
        self.send_error(404, "File not found")
        return None
    try:
        self.send_response(200)
        self.send_header("Content-type", ctype)
        fs = os.fstat(f.fileno())
        self.send_header("Content-Length", str(fs.st_size))
        self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
        self.end_headers()
    except BaseException:
        # The caller never receives the file if the headers cannot be
        # sent, so it has to be closed here before re-raising.
        f.close()
        raise
    return f
# Build an HTML listing of the directory `path` in an in-memory BytesIO
# buffer.  If os.listdir fails, a 404 "No permission to list directory"
# error is sent and None is returned.  Entries are sorted case-insensitively;
# directories get a trailing "/" in both the displayed and the linked name,
# and symbolic links are displayed with a trailing "@".
# NOTE(review): the string literals that carried the HTML markup are
# truncated in this copy (one write call breaks off mid-literal and the
# block ends inside another), so the end of the method -- presumably the
# closing markup, the response headers and the return value -- is not
# visible here.  Restore it from the original file before editing.
deflist_directory(self, path): try: list = os.listdir(path) except os.error:
self.send_error(404, "No permission to list directory") returnNone list.sort(key=lambda a: a.lower()) f = BytesIO() displaypath = html.escape(urllib.parse.unquote(self.path)) f.write(b'html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') f.write(("\nDirectory listing for %s\n" % displaypath).encode()) f.write(("\n
) for name in list: fullname = os.path.join(path, name) displayname = linkname = name # Append / for directories or @ for symbolic links if os.path.isdir(fullname): displayname = name + "/" linkname = name + "/" if os.path.islink(fullname): displayname = name + "@" # Note: a link to a directory displays with @ and links with / f.write(('
def translate_path(self, path):
    """Translate a URL path into a local path below the working directory.

    The query string and fragment are dropped, percent-escapes are decoded
    and the result is normalised with ``posixpath.normpath``.  Every
    remaining component is reduced to a bare name (any drive prefix and
    directory part removed); empty components and the "." / ".."
    entries are skipped.  What is left is joined onto ``os.getcwd()``.

    Args:
        path: The request path, e.g. ``"/docs/a%20b.txt?x=1"``.

    Returns:
        The corresponding file-system path as a string.
    """
    path = path.split('?', 1)[0]
    path = path.split('#', 1)[0]
    path = posixpath.normpath(urllib.parse.unquote(path))
    result = os.getcwd()
    for word in path.split('/'):
        # Strip anything that could re-root the join: a drive prefix or a
        # directory part written with the platform's own separator.
        word = os.path.splitdrive(word)[1]
        word = os.path.split(word)[1]
        if not word or word in (os.curdir, os.pardir):
            continue
        result = os.path.join(result, word)
    return result