uping.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # µPing (MicroPing) for MicroPython
  2. # copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
  3. # License: MIT
  4. # Internet Checksum Algorithm
  5. # Author: Olav Morken
  6. # https://github.com/olavmrk/python-ping/blob/master/ping.py
  7. # @data: bytes
  8. def checksum(data):
  9. if len(data) & 0x1: # Odd number of bytes
  10. data += b'\0'
  11. cs = 0
  12. for pos in range(0, len(data), 2):
  13. b1 = data[pos]
  14. b2 = data[pos + 1]
  15. cs += (b1 << 8) + b2
  16. while cs >= 0x10000:
  17. cs = (cs & 0xffff) + (cs >> 16)
  18. cs = ~cs & 0xffff
  19. return cs
  20. def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64):
  21. import utime
  22. import uselect
  23. import uctypes
  24. import usocket
  25. import ustruct
  26. import urandom
  27. # prepare packet
  28. assert size >= 16, "pkt size too small"
  29. pkt = b'Q'*size
  30. pkt_desc = {
  31. "type": uctypes.UINT8 | 0,
  32. "code": uctypes.UINT8 | 1,
  33. "checksum": uctypes.UINT16 | 2,
  34. "id": uctypes.UINT16 | 4,
  35. "seq": uctypes.INT16 | 6,
  36. "timestamp": uctypes.UINT64 | 8,
  37. } # packet header descriptor
  38. h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
  39. h.type = 8 # ICMP_ECHO_REQUEST
  40. h.code = 0
  41. h.checksum = 0
  42. h.id = (urandom.getrandbits(8) << 8) + urandom.getrandbits(8) # urandom.randint(0, 65535)
  43. h.seq = 1
  44. # init socket
  45. sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1) #usocket.IPPROTO_ICMP == 1
  46. sock.setblocking(0)
  47. sock.settimeout(timeout/1000)
  48. addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
  49. sock.connect((addr, 1))
  50. not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))
  51. seqs = list(range(1, count+1)) # [1,2,...,count]
  52. c = 1
  53. t = 0
  54. n_trans = 0
  55. n_recv = 0
  56. finish = False
  57. while t < timeout:
  58. if t==interval and c<=count:
  59. # send packet
  60. h.checksum = 0
  61. h.seq = c
  62. h.timestamp = utime.ticks_us()
  63. h.checksum = checksum(pkt)
  64. if sock.send(pkt) == size:
  65. n_trans += 1
  66. t = 0 # reset timeout
  67. else:
  68. seqs.remove(c)
  69. c += 1
  70. # recv packet
  71. while 1:
  72. socks, _, _ = uselect.select([sock], [], [], 0)
  73. if socks:
  74. resp = socks[0].recv(4096)
  75. resp_mv = memoryview(resp)
  76. h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
  77. # TODO: validate checksum (optional)
  78. seq = h2.seq
  79. if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
  80. t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
  81. ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
  82. n_recv += 1
  83. not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
  84. seqs.remove(seq)
  85. if len(seqs) == 0:
  86. finish = True
  87. break
  88. else:
  89. break
  90. if finish:
  91. break
  92. utime.sleep_ms(1)
  93. t += 1
  94. # close
  95. sock.close()
  96. ret = (n_trans, n_recv)
  97. not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
  98. return (n_trans, n_recv)