[雪峰磁针石博客]python网络作业:使用python的socket库实现ICMP协议的ping

简介:

ICMP ping是您遇到过的最常见的网络扫描类型。 打开命令行提示符或终端并输入ping www.google.com非常容易。

为什么要在python中实现?

  • 很多名牌大学喜欢考试用python的socket库实现ICMP协议的ping
  • 个别环境没有ping

直接上代码:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 技术支持:https://www.jianshu.com/u/69f40328d4f0 
# 技术支持 https://china-testing.github.io/
# https://github.com/china-testing/python-api-tesing/blob/master/practices/ping.py
# 讨论钉钉免费群21745728 qq群144081101 567351477
# CreateDate: 2018-11-22

import os 
import argparse 
import socket
import struct
import select
import time


ICMP_ECHO_REQUEST = 8 # Platform specific
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 4 


class Pinger(object):
    """ Pings to a host -- the Pythonic way"""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        self.target_host = target_host
        self.count = count
        self.timeout = timeout


    def do_checksum(self, source_string):
        """  Verify the packet integritity """
        sum = 0
        max_count = (len(source_string)/2)*2
        count = 0
        while count < max_count:

            val = source_string[count + 1]*256 + source_string[count]                   
            sum = sum + val
            sum = sum & 0xffffffff 
            count = count + 2

        if max_count<len(source_string):
            sum = sum + ord(source_string[len(source_string) - 1])
            sum = sum & 0xffffffff 

        sum = (sum >> 16)  +  (sum & 0xffff)
        sum = sum + (sum >> 16)
        answer = ~sum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    def receive_pong(self, sock, ID, timeout):
        """
        Receive ping from the socket.
        """
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = (time.time() - start_time)
            if readable[0] == []: # Timeout
                return

            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            icmp_header = recv_packet[20:28]
            type, code, checksum, packet_ID, sequence = struct.unpack(
       "bbHHh", icmp_header
   )
            if packet_ID == ID:
                bytes_In_double = struct.calcsize("d")
                time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
                return time_received - time_sent

            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return


    def send_ping(self, sock,  ID):
        """
        Send ping to the target host
        """
        target_addr  =  socket.gethostbyname(self.target_host)

        my_checksum = 0

        # Create a dummy heder with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
        bytes_In_double = struct.calcsize("d")
        data = (192 - bytes_In_double) * "Q"
        data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))

        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
      "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
  )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))


    def ping_once(self):
        """
        Returns the delay (in seconds) or none on timeout.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Not superuser, so operation not permitted
                e.msg +=  "ICMP messages can only be sent from root user processes"
                raise socket.error(e.msg)
        except Exception as e:
            print ("Exception: %s" %(e))

        my_ID = os.getpid() & 0xFFFF

        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
        sock.close()
        return delay


    def ping(self):
        """
        Run the ping process
        """
        for i in range(self.count):
            print ("Ping to %s..." % self.target_host,)
            try:
                delay  =  self.ping_once()
            except socket.gaierror as e:
                print ("Ping failed. (socket error: '%s')" % e[1])
                break

            if delay  ==  None:
                print ("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay  =  delay * 1000
                print ("Get pong in %0.4fms" % delay)



if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Python ping')
    parser.add_argument('host', action="store", help=u'主机名')
    given_args = parser.parse_args()  
    target_host = given_args.host
    pinger = Pinger(target_host=target_host)
    pinger.ping()

参考资料

执行

注意要有root或管理员权限:

# python3 ping.py china-testing.github.io
Ping to china-testing.github.io...
Get pong in 160.7175ms
Ping to china-testing.github.io...
Get pong in 160.8465ms
Ping to china-testing.github.io...
Get pong in 12.0983ms
Ping to china-testing.github.io...
Get pong in 161.3324ms
# python3 ping.py www.so.com
Ping to www.so.com...
Get pong in 29.0303ms
Ping to www.so.com...
Get pong in 28.8599ms
Ping to www.so.com...
Get pong in 28.9860ms
Ping to www.so.com...
Get pong in 29.0167ms
相关文章
|
4月前
|
负载均衡 网络协议 Linux
网络ping不通到底有多少原因?一文搞明白!
网络ping不通是网络中出现频率最高的故障之一,同时也是最让人抓狂的故障,谁没遇到过?今天就和你细说下ping不通的原因,看看能不能和你遇到的情况对上号。
2236 0
|
5月前
|
JSON 监控 API
在线网络PING接口检测服务器连通状态免费API教程
接口盒子提供免费PING检测API,可测试域名或IP的连通性与响应速度,支持指定地域节点,适用于服务器运维和网络监控。
577 0
|
9月前
|
网络协议 Unix Linux
# 2个类轻松构建高效Socket通信库
本文介绍了一种通过两个类`EpollEventHandler`和`IEpollEvent`构建高效Socket通信库的方法。该库支持TCP、UDP和Unix域套接字,采用I/O多路复用技术(如epoll),提升并发处理能力。通过抽象基类和具体事件类的设计,简化了API使用,便于开发者快速上手。文章还提供了服务端与客户端的实例代码,展示其在实际项目中的应用效果。此Socket库适应嵌入式环境,功能定制性强,有助于减少外部依赖并提升维护效率。
281 99
# 2个类轻松构建高效Socket通信库
|
11月前
|
API Python
【02】优雅草央央逆向技术篇之逆向接口协议篇-以小红书为例-python逆向小红书将用户名转换获得为uid-优雅草央千澈
【02】优雅草央央逆向技术篇之逆向接口协议篇-以小红书为例-python逆向小红书将用户名转换获得为uid-优雅草央千澈
689 1
|
5月前
|
API 数据安全/隐私保护 Python
小红书批量发布协议, 抖音自动批量发布软件脚本,笔记作品视频自动发布工具【python】
这个工具框架包含了小红书和抖音的批量发布功能,支持图片和视频处理、定时发布等功能
|
负载均衡 网络协议 算法
不为人知的网络编程(十九):能Ping通,TCP就一定能连接和通信吗?
这网络层就像搭积木一样,上层协议都是基于下层协议搭出来的。不管是ping(用了ICMP协议)还是tcp本质上都是基于网络层IP协议的数据包,而到了物理层,都是二进制01串,都走网卡发出去了。 如果网络环境没发生变化,目的地又一样,那按道理说他们走的网络路径应该是一样的,什么情况下会不同呢? 我们就从路由这个话题聊起吧。
327 4
不为人知的网络编程(十九):能Ping通,TCP就一定能连接和通信吗?
|
Python
Socket学习笔记(二):python通过socket实现客户端到服务器端的图片传输
使用Python的socket库实现客户端到服务器端的图片传输,包括客户端和服务器端的代码实现,以及传输结果的展示。
683 3
Socket学习笔记(二):python通过socket实现客户端到服务器端的图片传输
|
JSON 数据格式 Python
Socket学习笔记(一):python通过socket实现客户端到服务器端的文件传输
本文介绍了如何使用Python的socket模块实现客户端到服务器端的文件传输,包括客户端发送文件信息和内容,服务器端接收并保存文件的完整过程。
672 1
Socket学习笔记(一):python通过socket实现客户端到服务器端的文件传输
|
网络协议 安全 Linux
网络工具ping的使用方式
【10月更文挑战第19天】网络工具ping的使用方式
1245 6

推荐镜像

更多