defdo_checksum(self, source_string): """ Verify the packet integritity """ sum = 0 max_count = (len(source_string)/2)*2 count = 0 while count val = source_string[count + 1]*256 + source_string[count] sum = sum + val sum = sum & 0xffffffff count = count + 2
if max_count sum = sum + ord(source_string[len(source_string) - 1]) sum = sum & 0xffffffff
time_remaining = time_remaining - time_spent if time_remaining <= 0: return
def send_ping(self, sock, ID):
    """Build one ICMP echo request and send it to ``self.target_host``.

    The packet is a header packed with the ``"bbHHh"`` layout followed by
    a 192-byte payload: the current ``time.time()`` packed as a native
    double, padded out with ``Q`` characters.

    Args:
        sock: Socket the packet is transmitted on via ``sendto``.
        ID: Identifier value written into the header.
    """
    destination = socket.gethostbyname(self.target_host)

    def build_header(checksum):
        # Fields in order: request type, 0, checksum, identifier, and a
        # final field fixed at 1.
        return struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, checksum, ID, 1)

    # Payload = send timestamp + filler, so timestamp and filler together
    # always total 192 bytes.
    timestamp = struct.pack("d", time.time())
    filler = "Q" * (192 - struct.calcsize("d"))
    payload = timestamp + filler.encode('utf-8')

    # The checksum is computed over the packet with its checksum field
    # zeroed, then the header is rebuilt with the real value.
    checksum = self.do_checksum(build_header(0) + payload)
    packet = build_header(socket.htons(checksum)) + payload
    sock.sendto(packet, (destination, 1))
defping_once(self): """ Returns the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try:
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error as e: if e.errno == 1: # Not superuser, so operation not permitted e.msg += "ICMP messages can only be sent from root user processes" raise socket.error(e.msg) except Exception as e: print("Exception: %s" %(e))
def ping(self):
    """Run the ping process.

    Calls ``self.ping_once()`` up to ``self.count`` times against
    ``self.target_host`` and prints, for each attempt, either the delay
    converted to milliseconds or a timeout notice (when ``ping_once``
    returns ``None``). The loop stops at the first ``socket.gaierror``
    raised by ``ping_once``.
    """
    for _ in range(self.count):
        print("Ping to %s..." % self.target_host)
        try:
            delay = self.ping_once()
        except socket.gaierror as e:
            # Exceptions are not subscriptable in Python 3 (``e[1]`` raises
            # TypeError), so read the message from ``strerror`` and fall
            # back to the exception itself when it is not set.
            reason = e.strerror if e.strerror is not None else e
            print("Ping failed. (socket error: '%s')" % reason)
            break

        if delay is None:
            print("Ping failed. (timeout within %ssec.)" % self.timeout)
        else:
            # ping_once reports seconds; display milliseconds.
            print("Get pong in %0.4fms" % (delay * 1000))
if __name__ == '__main__': alive = [] host_prefix = '192.168.242.' for i in range(1, 255): host = host_prefix + str(i) pinger = Pinger(target_host=host)