Browse Source

上传文件至 'src/lib/adafruit_wiznet5k'

master
nfa 4 years ago
parent
commit
4b669a926d
  1. BIN
      src/lib/adafruit_wiznet5k/__init__.py
  2. 889
      src/lib/adafruit_wiznet5k/adafruit_wiznet5k.py
  3. 415
      src/lib/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py
  4. 253
      src/lib/adafruit_wiznet5k/adafruit_wiznet5k_dns.py
  5. 428
      src/lib/adafruit_wiznet5k/adafruit_wiznet5k_socket.py

BIN
src/lib/adafruit_wiznet5k/__init__.py

Binary file not shown.

889
src/lib/adafruit_wiznet5k/adafruit_wiznet5k.py

@ -0,0 +1,889 @@
# SPDX-FileCopyrightText: 2010 WIZnet
# SPDX-FileCopyrightText: 2010 Arduino LLC
# SPDX-FileCopyrightText: 2008 Bjoern Hartmann
# SPDX-FileCopyrightText: 2018 Paul Stoffregen
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck
# SPDX-FileCopyrightText: 2021 Adam Cummick
#
# SPDX-License-Identifier: MIT
"""
`adafruit_wiznet5k`
================================================================================
Pure-Python interface for WIZNET 5k ethernet modules.
* Author(s): WIZnet, Arduino LLC, Bjoern Hartmann, Paul Stoffregen, Brent Rubell,
Patrick Van Oosterwijck
Implementation Notes
--------------------
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
* Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
"""
from random import randint
import time
from micropython import const
from adafruit_bus_device.spi_device import SPIDevice
import adafruit_wiznet5k.adafruit_wiznet5k_dhcp as dhcp
import adafruit_wiznet5k.adafruit_wiznet5k_dns as dns
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_Wiznet5k.git"
# Wiznet5k Registers
REG_MR = const(0x0000) # Mode
REG_GAR = const(0x0001) # Gateway IP Address
REG_SUBR = const(0x0005) # Subnet Mask Address
REG_VERSIONR_W5500 = const(0x0039) # W5500 Silicon Version
REG_SHAR = const(0x0009) # Source Hardware Address
REG_SIPR = const(0x000F) # Source IP Address
REG_PHYCFGR = const(0x002E) # W5500 PHY Configuration
# Wiznet5k Socket Registers
REG_SNMR = const(0x0000) # Socket n Mode
REG_SNCR = const(0x0001) # Socket n Command
REG_SNIR = const(0x0002) # Socket n Interrupt
REG_SNSR = const(0x0003) # Socket n Status
REG_SNPORT = const(0x0004) # Socket n Source Port
REG_SNDIPR = const(0x000C) # Destination IP Address
REG_SNDPORT = const(0x0010) # Destination Port
REG_SNRX_RSR = const(0x0026) # RX Free Size
REG_SNRX_RD = const(0x0028) # Read Size Pointer
REG_SNTX_FSR = const(0x0020) # Socket n TX Free Size
REG_SNTX_WR = const(0x0024) # TX Write Pointer
# SNSR Commands
SNSR_SOCK_CLOSED = const(0x00)
SNSR_SOCK_INIT = const(0x13)
SNSR_SOCK_LISTEN = const(0x14)
SNSR_SOCK_SYNSENT = const(0x15)
SNSR_SOCK_SYNRECV = const(0x16)
SNSR_SOCK_ESTABLISHED = const(0x17)
SNSR_SOCK_FIN_WAIT = const(0x18)
SNSR_SOCK_CLOSING = const(0x1A)
SNSR_SOCK_TIME_WAIT = const(0x1B)
SNSR_SOCK_CLOSE_WAIT = const(0x1C)
SNSR_SOCK_LAST_ACK = const(0x1D)
SNSR_SOCK_UDP = const(0x22)
SNSR_SOCK_IPRAW = const(0x32)
SNSR_SOCK_MACRAW = const(0x42)
SNSR_SOCK_PPPOE = const(0x5F)
# Sock Commands (CMD)
CMD_SOCK_OPEN = const(0x01)
CMD_SOCK_LISTEN = const(0x02)
CMD_SOCK_CONNECT = const(0x04)
CMD_SOCK_DISCON = const(0x08)
CMD_SOCK_CLOSE = const(0x10)
CMD_SOCK_SEND = const(0x20)
CMD_SOCK_SEND_MAC = const(0x21)
CMD_SOCK_SEND_KEEP = const(0x22)
CMD_SOCK_RECV = const(0x40)
# Socket n Interrupt Register
SNIR_SEND_OK = const(0x10)
SNIR_TIMEOUT = const(0x08)
SNIR_RECV = const(0x04)
SNIR_DISCON = const(0x02)
SNIR_CON = const(0x01)
CH_SIZE = const(0x100)
SOCK_SIZE = const(0x800) # MAX W5k socket size
# Register commands
MR_RST = const(0x80) # Mode Register RST
# Socket mode register
SNMR_CLOSE = const(0x00)
SNMR_TCP = const(0x21)
SNMR_UDP = const(0x02)
SNMR_IPRAW = const(0x03)
SNMR_MACRAW = const(0x04)
SNMR_PPPOE = const(0x05)
MAX_PACKET = const(4000)
LOCAL_PORT = const(0x400)
# Default hardware MAC address
DEFAULT_MAC = (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED)
# Maximum number of sockets to support, differs between chip versions.
W5200_W5500_MAX_SOCK_NUM = const(0x08)
SOCKET_INVALID = const(255)
# UDP socket struct.
UDP_SOCK = {"bytes_remaining": 0, "remote_ip": 0, "remote_port": 0}
class WIZNET5K: # pylint: disable=too-many-public-methods
"""Interface for WIZNET5K module.
:param ~busio.SPI spi_bus: The SPI bus the Wiznet module is connected to.
:param ~digitalio.DigitalInOut cs: Chip select pin.
:param ~digitalio.DigitalInOut rst: Optional reset pin.
:param bool is_dhcp: Whether to start DHCP automatically or not.
:param list mac: The Wiznet's MAC Address.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int dhcp_timeout: Timeout in seconds for DHCP response.
:param bool debug: Enable debugging output.
"""
TCP_MODE = const(0x21)
UDP_MODE = const(0x02)
TLS_MODE = const(0x03) # This is NOT currently implemented
# pylint: disable=too-many-arguments
def __init__(
self,
spi_bus,
cs,
reset=None,
is_dhcp=True,
mac=DEFAULT_MAC,
hostname=None,
dhcp_timeout=30,
debug=False,
):
self._debug = debug
self._chip_type = None
self._device = SPIDevice(spi_bus, cs, baudrate=8000000, polarity=0, phase=0)
# init c.s.
self._cs = cs
# reset wiznet module prior to initialization
if reset:
reset.value = True
time.sleep(0.1)
reset.value = False
time.sleep(0.1)
# Buffer for reading params from module
self._pbuff = bytearray(8)
self._rxbuf = bytearray(MAX_PACKET)
# attempt to initialize the module
self._ch_base_msb = 0
assert self._w5100_init() == 1, "Failed to initialize WIZnet module."
# Set MAC address
self.mac_address = mac
self._src_port = 0
self._dns = 0
# Set DHCP
if is_dhcp:
ret = self.set_dhcp(hostname, dhcp_timeout)
assert ret == 0, "Failed to configure DHCP Server!"
def set_dhcp(self, hostname=None, response_timeout=3):
"""Initializes the DHCP client and attempts to retrieve
and set network configuration from the DHCP server.
Returns True if DHCP configured, False otherwise.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int response_timeout: Time to wait for server to return packet, in seconds.
"""
if self._debug:
print("* Initializing DHCP")
# First, wait link status is on
# to avoid the code during DHCP - assert self.link_status, "Ethernet cable disconnected!"
start_time = time.monotonic()
while True:
if self.link_status or ((time.monotonic() - start_time) > 5) :
break
time.sleep(1)
if self._debug:
print("My Link is:", self.link_status)
self._src_port = 68
# Return IP assigned by DHCP
_dhcp_client = dhcp.DHCP(
self, self.mac_address, hostname, response_timeout, debug=self._debug
)
ret = _dhcp_client.request_dhcp_lease()
if ret == 1:
_ip = (
_dhcp_client.local_ip[0],
_dhcp_client.local_ip[1],
_dhcp_client.local_ip[2],
_dhcp_client.local_ip[3],
)
_subnet_mask = (
_dhcp_client.subnet_mask[0],
_dhcp_client.subnet_mask[1],
_dhcp_client.subnet_mask[2],
_dhcp_client.subnet_mask[3],
)
_gw_addr = (
_dhcp_client.gateway_ip[0],
_dhcp_client.gateway_ip[1],
_dhcp_client.gateway_ip[2],
_dhcp_client.gateway_ip[3],
)
self._dns = (
_dhcp_client.dns_server_ip[0],
_dhcp_client.dns_server_ip[1],
_dhcp_client.dns_server_ip[2],
_dhcp_client.dns_server_ip[3],
)
self.ifconfig = (_ip, _subnet_mask, _gw_addr, self._dns)
if self._debug:
print("* Found DHCP Server:")
print(
"IP: {}\nSubnet Mask: {}\nGW Addr: {}\nDNS Server: {}".format(
_ip, _subnet_mask, _gw_addr, self._dns
)
)
self._src_port = 0
return 0
return -1
def get_host_by_name(self, hostname):
"""Convert a hostname to a packed 4-byte IP Address.
Returns a 4 bytearray.
"""
if self._debug:
print(hostname)
print("* Get host by name")
if isinstance(hostname, str):
hostname = bytes(hostname, "utf-8")
self._src_port = int(time.monotonic()) & 0xFFFF
# Return IP assigned by DHCP
_dns_client = dns.DNS(self, self._dns, debug=self._debug)
ret = _dns_client.gethostbyname(hostname)
if self._debug:
print("* Resolved IP: ", ret)
assert ret != -1, "Failed to resolve hostname!"
self._src_port = 0
return ret
@property
def max_sockets(self):
"""Returns max number of sockets supported by chip."""
if self._chip_type == "w5500":
return W5200_W5500_MAX_SOCK_NUM
return -1
@property
def chip(self):
"""Returns the chip type."""
return self._chip_type
@property
def ip_address(self):
"""Returns the configured IP address."""
return self.read(REG_SIPR, 0x00, 4)
def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
"""Converts a bytearray IP address to a
dotted-quad string for printing
"""
return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])
def unpretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
"""Converts a dotted-quad string to a bytearray IP address"""
octets = [int(x) for x in ip.split(".")]
return bytes(octets)
@property
def mac_address(self):
"""Returns the hardware's MAC address."""
return self.read(REG_SHAR, 0x00, 6)
@mac_address.setter
def mac_address(self, address):
"""Sets the hardware MAC address.
:param tuple address: Hardware MAC address.
"""
self.write(REG_SHAR, 0x04, address)
def pretty_mac(self, mac): # pylint: disable=no-self-use, invalid-name
"""Converts a bytearray MAC address to a
dotted-quad string for printing
"""
return "%s:%s:%s:%s:%s:%s" % (
hex(mac[0]),
hex(mac[1]),
hex(mac[2]),
hex(mac[3]),
hex(mac[4]),
hex(mac[5]),
)
def remote_ip(self, socket_num):
"""Returns the IP address of the host who sent the current incoming packet.
:param int socket num: Desired socket.
"""
if socket_num >= self.max_sockets:
return self._pbuff
for octet in range(0, 4):
self._pbuff[octet] = self._read_socket(socket_num, REG_SNDIPR + octet)[0]
return self.pretty_ip(self._pbuff)
@property
def link_status(self):
""""Returns if the PHY is connected."""
if self._chip_type == "w5500":
data = self.read(REG_PHYCFGR, 0x00)
return data[0] & 0x01
return 0
def remote_port(self, socket_num):
"""Returns the port of the host who sent the current incoming packet."""
if socket_num >= self.max_sockets:
return self._pbuff
for octet in range(0, 2):
self._pbuff[octet] = self._read_socket(socket_num, REG_SNDPORT + octet)[0]
return int((self._pbuff[0] << 8) | self._pbuff[0])
@property
def ifconfig(self):
"""Returns the network configuration as a tuple."""
return (
self.ip_address,
self.read(REG_SUBR, 0x00, 4),
self.read(REG_GAR, 0x00, 4),
self._dns,
)
@ifconfig.setter
def ifconfig(self, params):
"""Sets network configuration to provided tuple in format:
(ip_address, subnet_mask, gateway_address, dns_server).
"""
ip_address, subnet_mask, gateway_address, dns_server = params
self.write(REG_SIPR, 0x04, ip_address)
self.write(REG_SUBR, 0x04, subnet_mask)
self.write(REG_GAR, 0x04, gateway_address)
self._dns = dns_server
def _w5100_init(self):
"""Initializes and detects a wiznet5k module."""
time.sleep(1)
self._cs.switch_to_output()
self._cs.value = 1
# Detect if chip is Wiznet W5500
if self.detect_w5500() == 1:
# perform w5500 initialization
for i in range(0, W5200_W5500_MAX_SOCK_NUM):
ctrl_byte = 0x0C + (i << 5)
self.write(0x1E, ctrl_byte, 2)
self.write(0x1F, ctrl_byte, 2)
else:
return 0
return 1
def detect_w5500(self):
"""Detects W5500 chip."""
assert self.sw_reset() == 0, "Chip not reset properly!"
self._write_mr(0x08)
assert self._read_mr()[0] == 0x08, "Expected 0x08."
self._write_mr(0x10)
assert self._read_mr()[0] == 0x10, "Expected 0x10."
self._write_mr(0x00)
assert self._read_mr()[0] == 0x00, "Expected 0x00."
if self.read(REG_VERSIONR_W5500, 0x00)[0] != 0x04:
return -1
self._chip_type = "w5500"
self._ch_base_msb = 0x10
return 1
def sw_reset(self):
"""Performs a soft-reset on a Wiznet chip
by writing to its MR register reset bit.
"""
mode_reg = self._read_mr()
self._write_mr(0x80)
mode_reg = self._read_mr()
if mode_reg[0] != 0x00:
return -1
return 0
def _read_mr(self):
"""Reads from the Mode Register (MR)."""
res = self.read(REG_MR, 0x00)
return res
def _write_mr(self, data):
"""Writes to the mode register (MR).
:param int data: Data to write to the mode register.
"""
self.write(REG_MR, 0x04, data)
def read(self, addr, callback, length=1, buffer=None):
"""Reads data from a register address.
:param int addr: Register address.
"""
with self._device as bus_device:
bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member
bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member
bus_device.write(bytes([callback])) # pylint: disable=no-member
if buffer is None:
self._rxbuf = bytearray(length)
bus_device.readinto(self._rxbuf) # pylint: disable=no-member
return self._rxbuf
bus_device.readinto(buffer, end=length) # pylint: disable=no-member
return buffer
def write(self, addr, callback, data):
"""Write data to a register address.
:param int addr: Destination address.
:param int callback: Callback reference.
:param int data: Data to write, as an integer.
:param bytearray data: Data to write, as a bytearray.
"""
with self._device as bus_device:
bus_device.write(bytes([addr >> 8])) # pylint: disable=no-member
bus_device.write(bytes([addr & 0xFF])) # pylint: disable=no-member
bus_device.write(bytes([callback])) # pylint: disable=no-member
if hasattr(data, "from_bytes"):
bus_device.write(bytes([data])) # pylint: disable=no-member
else:
for i, _ in enumerate(data):
bus_device.write(bytes([data[i]])) # pylint: disable=no-member
# Socket-Register API
def udp_remaining(self):
"""Returns amount of bytes remaining in a udp socket."""
if self._debug:
print("* UDP Bytes Remaining: ", UDP_SOCK["bytes_remaining"])
return UDP_SOCK["bytes_remaining"]
def socket_available(self, socket_num, sock_type=SNMR_TCP):
"""Returns the amount of bytes to be read from the socket.
:param int socket_num: Desired socket to return bytes from.
:param int sock_type: Socket type, defaults to TCP.
"""
if self._debug:
print("* socket_available called with protocol", sock_type)
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
res = self._get_rx_rcv_size(socket_num)
if sock_type == SNMR_TCP:
return res
if res > 0:
if UDP_SOCK["bytes_remaining"]:
return UDP_SOCK["bytes_remaining"]
# parse the udp rx packet
# read the first 8 header bytes
ret, self._pbuff = self.socket_read(socket_num, 8)
if ret > 0:
UDP_SOCK["remote_ip"] = self._pbuff[:4]
UDP_SOCK["remote_port"] = (self._pbuff[4] << 8) + self._pbuff[5]
UDP_SOCK["bytes_remaining"] = (self._pbuff[6] << 8) + self._pbuff[7]
ret = UDP_SOCK["bytes_remaining"]
return ret
return 0
def socket_status(self, socket_num):
"""Returns the socket connection status. Can be: SNSR_SOCK_CLOSED,
SNSR_SOCK_INIT, SNSR_SOCK_LISTEN, SNSR_SOCK_SYNSENT, SNSR_SOCK_SYNRECV,
SNSR_SYN_SOCK_ESTABLISHED, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSING,
SNSR_SOCK_TIME_WAIT, SNSR_SOCK_CLOSE_WAIT, SNSR_LAST_ACK,
SNSR_SOCK_UDP, SNSR_SOCK_IPRAW, SNSR_SOCK_MACRAW, SNSR_SOCK_PPOE.
"""
return self._read_snsr(socket_num)
def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP):
"""Open and verify we've connected a socket to a dest IP address
or hostname. By default, we use 'conn_mode'= SNMR_TCP but we
may also use SNMR_UDP.
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print(
"* w5k socket connect, protocol={}, port={}, ip={}".format(
conn_mode, port, self.pretty_ip(dest)
)
)
# initialize a socket and set the mode
res = self.socket_open(socket_num, conn_mode=conn_mode)
if res == 1:
raise RuntimeError("Failed to initalize a connection with the socket.")
# set socket destination IP and port
self._write_sndipr(socket_num, dest)
self._write_sndport(socket_num, port)
self._send_socket_cmd(socket_num, CMD_SOCK_CONNECT)
if conn_mode == SNMR_TCP:
# wait for tcp connection establishment
while self.socket_status(socket_num)[0] != SNSR_SOCK_ESTABLISHED:
time.sleep(0.001)
if self._debug:
print("SN_SR:", self.socket_status(socket_num)[0])
if self.socket_status(socket_num)[0] == SNSR_SOCK_CLOSED:
raise RuntimeError("Failed to establish connection.")
elif conn_mode == SNMR_UDP:
UDP_SOCK["bytes_remaining"] = 0
return 1
def _send_socket_cmd(self, socket, cmd):
self._write_sncr(socket, cmd)
while self._read_sncr(socket) != b"\x00":
if self._debug:
print("waiting for sncr to clear...")
def get_socket(self):
"""Requests, allocates and returns a socket from the W5k
chip. Returned socket number may not exceed max_sockets.
"""
if self._debug:
print("*** Get socket")
sock = SOCKET_INVALID
for _sock in range(self.max_sockets):
status = self.socket_status(_sock)[0]
if status in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
SNSR_SOCK_CLOSE_WAIT,
SNSR_SOCK_CLOSING,
):
sock = _sock
break
if self._debug:
print("Allocated socket #{}".format(sock))
return sock
def socket_listen(self, socket_num, port):
"""Start listening on a socket (TCP mode only).
:parm int socket_num: socket number
:parm int port: port to listen on
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print(
"* Listening on port={}, ip={}".format(
port, self.pretty_ip(self.ip_address)
)
)
# Initialize a socket and set the mode
self._src_port = port
res = self.socket_open(socket_num, conn_mode=SNMR_TCP)
if res == 1:
raise RuntimeError("Failed to initalize the socket.")
# Send listen command
self._send_socket_cmd(socket_num, CMD_SOCK_LISTEN)
# Wait until ready
status = [SNSR_SOCK_CLOSED]
while status[0] != SNSR_SOCK_LISTEN:
status = self._read_snsr(socket_num)
if status[0] == SNSR_SOCK_CLOSED:
raise RuntimeError("Listening socket closed.")
def socket_accept(self, socket_num):
"""Gets the dest IP and port from an incoming connection.
Returns the next socket number so listening can continue
:parm int socket_num: socket number
"""
dest_ip = self.remote_ip(socket_num)
dest_port = self.remote_port(socket_num)
next_socknum = self.get_socket()
if self._debug:
print(
"* Dest is ({}, {}), Next listen socknum is #{}".format(
dest_ip, dest_port, next_socknum
)
)
return next_socknum, (dest_ip, dest_port)
def socket_open(self, socket_num, conn_mode=SNMR_TCP):
"""Opens a TCP or UDP socket. By default, we use
'conn_mode'=SNMR_TCP but we may also use SNMR_UDP.
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print("*** Opening socket %d" % socket_num)
status = self._read_snsr(socket_num)[0]
if status in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
SNSR_SOCK_CLOSE_WAIT,
SNSR_SOCK_CLOSING,
):
if self._debug:
print("* Opening W5k Socket, protocol={}".format(conn_mode))
time.sleep(0.00025)
self._write_snmr(socket_num, conn_mode)
self._write_snir(socket_num, 0xFF)
if self._src_port > 0:
# write to socket source port
self._write_sock_port(socket_num, self._src_port)
else:
self._write_sock_port(socket_num, randint(49152, 65535))
# open socket
self._write_sncr(socket_num, CMD_SOCK_OPEN)
self._read_sncr(socket_num)
assert (
self._read_snsr((socket_num))[0] == 0x13
or self._read_snsr((socket_num))[0] == 0x22
), "Could not open socket in TCP or UDP mode."
return 0
return 1
def socket_close(self, socket_num):
"""Closes a socket."""
if self._debug:
print("*** Closing socket #%d" % socket_num)
self._write_sncr(socket_num, CMD_SOCK_CLOSE)
self._read_sncr(socket_num)
def socket_disconnect(self, socket_num):
"""Disconnect a TCP connection."""
if self._debug:
print("*** Disconnecting socket #%d" % socket_num)
self._write_sncr(socket_num, CMD_SOCK_DISCON)
self._read_sncr(socket_num)
def socket_read(self, socket_num, length):
"""Reads data from a socket into a buffer.
Returns buffer.
"""
assert self.link_status, "Ethernet cable disconnected!"
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
# Check if there is data available on the socket
ret = self._get_rx_rcv_size(socket_num)
if self._debug:
print("Bytes avail. on sock: ", ret)
if ret == 0:
# no data on socket?
status = self._read_snsr(socket_num)
if status in (SNSR_SOCK_LISTEN, SNSR_SOCK_CLOSED, SNSR_SOCK_CLOSE_WAIT):
# remote end closed its side of the connection, EOF state
ret = 0
resp = 0
else:
# connection is alive, no data waiting to be read
ret = -1
resp = -1
elif ret > length:
# set ret to the length of buffer
ret = length
if ret > 0:
if self._debug:
print("\t * Processing {} bytes of data".format(ret))
# Read the starting save address of the received data
ptr = self._read_snrx_rd(socket_num)
# Read data from the starting address of snrx_rd
ctrl_byte = 0x18 + (socket_num << 5)
resp = self.read(ptr, ctrl_byte, ret)
# After reading the received data, update Sn_RX_RD to the increased
# value as many as the reading size.
ptr = (ptr + ret) & 0xFFFF
self._write_snrx_rd(socket_num, ptr)
# Notify the W5k of the updated Sn_Rx_RD
self._write_sncr(socket_num, CMD_SOCK_RECV)
self._read_sncr(socket_num)
return ret, resp
def read_udp(self, socket_num, length):
"""Read UDP socket's remaining bytes."""
if UDP_SOCK["bytes_remaining"] > 0:
if UDP_SOCK["bytes_remaining"] <= length:
ret, resp = self.socket_read(socket_num, UDP_SOCK["bytes_remaining"])
else:
ret, resp = self.socket_read(socket_num, length)
if ret > 0:
UDP_SOCK["bytes_remaining"] -= ret
return ret, resp
return -1
def socket_write(self, socket_num, buffer, timeout=0):
"""Writes a bytearray to a provided socket."""
assert self.link_status, "Ethernet cable disconnected!"
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
status = 0
ret = 0
free_size = 0
if len(buffer) > SOCK_SIZE:
ret = SOCK_SIZE
else:
ret = len(buffer)
stamp = time.monotonic()
# if buffer is available, start the transfer
free_size = self._get_tx_free_size(socket_num)
while free_size < ret:
free_size = self._get_tx_free_size(socket_num)
status = self.socket_status(socket_num)[0]
if status not in (SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT) or (
timeout and time.monotonic() - stamp > timeout
):
ret = 0
break
# Read the starting address for saving the transmitting data.
ptr = self._read_sntx_wr(socket_num)
offset = ptr & 0x07FF
dst_addr = offset + (socket_num * 2048 + 0x8000)
# update sn_tx_wr to the value + data size
ptr = (ptr + len(buffer)) & 0xFFFF
self._write_sntx_wr(socket_num, ptr)
cntl_byte = 0x14 + (socket_num << 5)
self.write(dst_addr, cntl_byte, buffer)
self._write_sncr(socket_num, CMD_SOCK_SEND)
self._read_sncr(socket_num)
# check data was transferred correctly
while (
self._read_socket(socket_num, REG_SNIR)[0] & SNIR_SEND_OK
) != SNIR_SEND_OK:
if (
self.socket_status(socket_num)[0]
in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
SNSR_SOCK_CLOSE_WAIT,
SNSR_SOCK_CLOSING,
)
or (timeout and time.monotonic() - stamp > timeout)
):
# self.socket_close(socket_num)
return 0
time.sleep(0.01)
self._write_snir(socket_num, SNIR_SEND_OK)
return ret
# Socket-Register Methods
def _get_rx_rcv_size(self, sock):
"""Get size of recieved and saved in socket buffer."""
val = 0
val_1 = self._read_snrx_rsr(sock)
while val != val_1:
val_1 = self._read_snrx_rsr(sock)
if val_1 != 0:
val = self._read_snrx_rsr(sock)
return int.from_bytes(val, "b")
def _get_tx_free_size(self, sock):
"""Get free size of sock's tx buffer block."""
val = 0
val_1 = self._read_sntx_fsr(sock)
while val != val_1:
val_1 = self._read_sntx_fsr(sock)
if val_1 != 0:
val = self._read_sntx_fsr(sock)
return int.from_bytes(val, "b")
def _read_snrx_rd(self, sock):
self._pbuff[0] = self._read_socket(sock, REG_SNRX_RD)[0]
self._pbuff[1] = self._read_socket(sock, REG_SNRX_RD + 1)[0]
return self._pbuff[0] << 8 | self._pbuff[1]
def _write_snrx_rd(self, sock, data):
self._write_socket(sock, REG_SNRX_RD, data >> 8)
self._write_socket(sock, REG_SNRX_RD + 1, data & 0xFF)
def _write_sntx_wr(self, sock, data):
self._write_socket(sock, REG_SNTX_WR, data >> 8)
self._write_socket(sock, REG_SNTX_WR + 1, data & 0xFF)
def _read_sntx_wr(self, sock):
self._pbuff[0] = self._read_socket(sock, 0x0024)[0]
self._pbuff[1] = self._read_socket(sock, 0x0024 + 1)[0]
return self._pbuff[0] << 8 | self._pbuff[1]
def _read_sntx_fsr(self, sock):
data = self._read_socket(sock, REG_SNTX_FSR)
data += self._read_socket(sock, REG_SNTX_FSR + 1)
return data
def _read_snrx_rsr(self, sock):
data = self._read_socket(sock, REG_SNRX_RSR)
data += self._read_socket(sock, REG_SNRX_RSR + 1)
return data
def _write_sndipr(self, sock, ip_addr):
"""Writes to socket destination IP Address."""
for octet in range(0, 4):
self._write_socket(sock, REG_SNDIPR + octet, ip_addr[octet])
def _write_sndport(self, sock, port):
"""Writes to socket destination port."""
self._write_socket(sock, REG_SNDPORT, port >> 8)
self._write_socket(sock, REG_SNDPORT + 1, port & 0xFF)
def _read_snsr(self, sock):
"""Reads Socket n Status Register."""
return self._read_socket(sock, REG_SNSR)
def _write_snmr(self, sock, protocol):
"""Write to Socket n Mode Register."""
self._write_socket(sock, REG_SNMR, protocol)
def _write_snir(self, sock, data):
"""Write to Socket n Interrupt Register."""
self._write_socket(sock, REG_SNIR, data)
def _write_sock_port(self, sock, port):
"""Write to the socket port number."""
self._write_socket(sock, REG_SNPORT, port >> 8)
self._write_socket(sock, REG_SNPORT + 1, port & 0xFF)
def _write_sncr(self, sock, data):
self._write_socket(sock, REG_SNCR, data)
def _read_sncr(self, sock):
return self._read_socket(sock, REG_SNCR)
def _read_snmr(self, sock):
return self._read_socket(sock, REG_SNMR)
def _write_socket(self, sock, address, data):
"""Write to a W5k socket register."""
base = self._ch_base_msb << 8
cntl_byte = (sock << 5) + 0x0C
return self.write(base + sock * CH_SIZE + address, cntl_byte, data)
def _read_socket(self, sock, address):
"""Read a W5k socket register."""
cntl_byte = (sock << 5) + 0x08
return self.read(address, cntl_byte)

415
src/lib/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py

@ -0,0 +1,415 @@
# SPDX-FileCopyrightText: 2009 Jordan Terell (blog.jordanterrell.com)
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_wiznet5k_dhcp`
================================================================================
Pure-Python implementation of Jordan Terrell's DHCP library v0.3
* Author(s): Jordan Terrell, Brent Rubell
"""
import gc
import time
from random import randrange
from micropython import const
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket
from adafruit_wiznet5k.adafruit_wiznet5k_socket import htonl, htons
# DHCP State Machine
STATE_DHCP_START = const(0x00)
STATE_DHCP_DISCOVER = const(0x01)
STATE_DHCP_REQUEST = const(0x02)
STATE_DHCP_LEASED = const(0x03)
STATE_DHCP_REREQUEST = const(0x04)
STATE_DHCP_RELEASE = const(0x05)
# DHCP Message Types
DHCP_DISCOVER = const(1)
DHCP_OFFER = const(2)
DHCP_REQUEST = const(3)
DHCP_DECLINE = const(4)
DHCP_ACK = const(5)
DHCP_NAK = const(6)
DHCP_RELEASE = const(7)
DHCP_INFORM = const(8)
# DHCP Message OP Codes
DHCP_BOOT_REQUEST = const(0x01)
DHCP_BOOT_REPLY = const(0x02)
DHCP_HTYPE10MB = const(0x01)
DHCP_HTYPE100MB = const(0x02)
DHCP_HLENETHERNET = const(0x06)
DHCP_HOPS = const(0x00)
MAGIC_COOKIE = const(0x63825363)
MAX_DHCP_OPT = const(0x10)
# Default DHCP Server port
DHCP_SERVER_PORT = const(67)
# DHCP Lease Time, in seconds
DEFAULT_LEASE_TIME = const(900)
BROADCAST_SERVER_ADDR = "255.255.255.255"
# DHCP Response Options
MSG_TYPE = 53
SUBNET_MASK = 1
ROUTERS_ON_SUBNET = 3
DNS_SERVERS = 6
DHCP_SERVER_ID = 54
T1_VAL = 58
T2_VAL = 59
LEASE_TIME = 51
OPT_END = 255
_BUFF = bytearray(317)
class DHCP:
"""W5k DHCP Client implementation.
:param eth: Wiznet 5k object
:param list mac_address: Hardware MAC.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int response_timeout: DHCP Response timeout.
:param bool debug: Enable debugging output.
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name
def __init__(
self, eth, mac_address, hostname=None, response_timeout=30, debug=False
):
self._debug = debug
self._response_timeout = response_timeout
self._mac_address = mac_address
# Initalize a new UDP socket for DHCP
socket.set_interface(eth)
self._sock = socket.socket(type=socket.SOCK_DGRAM)
self._sock.settimeout(response_timeout)
# DHCP state machine
self._dhcp_state = STATE_DHCP_START
self._initial_xid = 0
self._transaction_id = 0
# DHCP server configuration
self.dhcp_server_ip = 0
self.local_ip = 0
self.gateway_ip = 0
self.subnet_mask = 0
self.dns_server_ip = 0
# Lease configuration
self._lease_time = 0
self._last_check_lease_ms = 0
self._renew_in_sec = 0
self._rebind_in_sec = 0
self._t1 = 0
self._t2 = 0
# Host name
mac_string = "".join("{:02X}".format(o) for o in mac_address)
self._hostname = bytes(
(hostname or "WIZnet{}").split(".")[0].format(mac_string)[:42], "utf-8"
)
def send_dhcp_message(self, state, time_elapsed):
"""Assemble and send a DHCP message packet to a socket.
:param int state: DHCP Message state.
:param float time_elapsed: Number of seconds elapsed since renewal.
"""
# before making send packet, shoule init _BUFF.
# if not, DHCP sometimes fails, wrong padding, garbage bytes, ...
_BUFF[:] = b'\x00' * len(_BUFF)
# OP
_BUFF[0] = DHCP_BOOT_REQUEST
# HTYPE
_BUFF[1] = DHCP_HTYPE10MB
# HLEN
_BUFF[2] = DHCP_HLENETHERNET
# HOPS
_BUFF[3] = DHCP_HOPS
# Transaction ID (xid)
self._initial_xid = htonl(self._transaction_id)
self._initial_xid = self._initial_xid.to_bytes(4, "l")
_BUFF[4:7] = self._initial_xid
# seconds elapsed
_BUFF[8] = (int(time_elapsed) & 0xFF00) >> 8
_BUFF[9] = int(time_elapsed) & 0x00FF
# flags
flags = htons(0x8000)
flags = flags.to_bytes(2, "b")
_BUFF[10] = flags[1]
_BUFF[11] = flags[0]
# NOTE: Skipping cidaddr/yiaddr/siaddr/giaddr
# as they're already set to 0.0.0.0
# chaddr
_BUFF[28:34] = self._mac_address
# NOTE: 192 octets of 0's, BOOTP legacy
# Magic Cookie
_BUFF[236] = (MAGIC_COOKIE >> 24) & 0xFF
_BUFF[237] = (MAGIC_COOKIE >> 16) & 0xFF
_BUFF[238] = (MAGIC_COOKIE >> 8) & 0xFF
_BUFF[239] = MAGIC_COOKIE & 0xFF
# Option - DHCP Message Type
_BUFF[240] = 53
_BUFF[241] = 0x01
_BUFF[242] = state
# Option - Client Identifier
_BUFF[243] = 61
# Length
_BUFF[244] = 0x07
# HW Type - ETH
_BUFF[245] = 0x01
# Client MAC Address
for mac in range(0, len(self._mac_address)):
_BUFF[246 + mac] = self._mac_address[mac]
# Option - Host Name
_BUFF[252] = 12
hostname_len = len(self._hostname)
after_hostname = 254 + hostname_len
_BUFF[253] = hostname_len
_BUFF[254:after_hostname] = self._hostname
if state == DHCP_REQUEST:
# Set the parsed local IP addr
_BUFF[after_hostname] = 50
_BUFF[after_hostname + 1] = 0x04
_BUFF[after_hostname + 2 : after_hostname + 6] = self.local_ip
# Set the parsed dhcp server ip addr
_BUFF[after_hostname + 6] = 54
_BUFF[after_hostname + 7] = 0x04
_BUFF[after_hostname + 8 : after_hostname + 12] = self.dhcp_server_ip
_BUFF[after_hostname + 12] = 55
_BUFF[after_hostname + 13] = 0x06
# subnet mask
_BUFF[after_hostname + 14] = 1
# routers on subnet
_BUFF[after_hostname + 15] = 3
# DNS
_BUFF[after_hostname + 16] = 6
# domain name
_BUFF[after_hostname + 17] = 15
# renewal (T1) value
_BUFF[after_hostname + 18] = 58
# rebinding (T2) value
_BUFF[after_hostname + 19] = 59
_BUFF[after_hostname + 20] = 255
# Send DHCP packet
self._sock.send(_BUFF)
def parse_dhcp_response(
self, response_timeout
): # pylint: disable=too-many-branches, too-many-statements
"""Parse DHCP response from DHCP server.
Returns DHCP packet type.
:param int response_timeout: Time to wait for server to return packet, in seconds.
"""
start_time = time.monotonic()
packet_sz = self._sock.available()
while packet_sz <= 0:
packet_sz = self._sock.available()
if (time.monotonic() - start_time) > response_timeout:
return (255, 0)
time.sleep(0.05)
# store packet in buffer
_BUFF = self._sock.recv()
if self._debug:
print("DHCP Response: ", _BUFF)
# -- Parse Packet, FIXED -- #
# Validate OP
assert (
_BUFF[0] == DHCP_BOOT_REPLY
), "Malformed Packet - \
DHCP message OP is not expected BOOT Reply."
xid = _BUFF[4:8]
if bytes(xid) < self._initial_xid:
print("f")
return 0, 0
self.local_ip = _BUFF[16:20]
if _BUFF[28:34] == 0:
return 0, 0
if int.from_bytes(_BUFF[235:240], "l") != MAGIC_COOKIE:
return 0, 0
# -- Parse Packet, VARIABLE -- #
ptr = 240
while _BUFF[ptr] != OPT_END:
if _BUFF[ptr] == MSG_TYPE:
ptr += 1
opt_len = _BUFF[ptr]
ptr += opt_len
msg_type = _BUFF[ptr]
ptr += 1
elif _BUFF[ptr] == SUBNET_MASK:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.subnet_mask = _BUFF[ptr : ptr + opt_len]
ptr += opt_len
elif _BUFF[ptr] == DHCP_SERVER_ID:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dhcp_server_ip = _BUFF[ptr : ptr + opt_len]
ptr += opt_len
elif _BUFF[ptr] == LEASE_TIME:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._lease_time = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == ROUTERS_ON_SUBNET:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.gateway_ip = _BUFF[ptr : ptr + opt_len]
ptr += opt_len
elif _BUFF[ptr] == DNS_SERVERS:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dns_server_ip = _BUFF[ptr : ptr + 4]
ptr += opt_len # still increment even though we only read 1 addr.
elif _BUFF[ptr] == T1_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t1 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == T2_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t2 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == 0:
break
else:
# We're not interested in this option
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
# no-op
ptr += opt_len
if self._debug:
print(
"Msg Type: {}\nSubnet Mask: {}\nDHCP Server ID:{}\nDNS Server IP:{}\
\nGateway IP:{}\nT1:{}\nT2:{}\nLease Time:{}".format(
msg_type,
self.subnet_mask,
self.dhcp_server_ip,
self.dns_server_ip,
self.gateway_ip,
self._t1,
self._t2,
self._lease_time,
)
)
gc.collect()
return msg_type, xid
def request_dhcp_lease(
self,
): # pylint: disable=too-many-branches, too-many-statements
"""Request to renew or acquire a DHCP lease."""
# select an initial transaction id
self._transaction_id = randrange(1, 2000)
result = 0
msg_type = 0
start_time = time.monotonic()
while self._dhcp_state != STATE_DHCP_LEASED:
if self._dhcp_state == STATE_DHCP_START:
self._transaction_id += 1
self._sock.connect(((BROADCAST_SERVER_ADDR), DHCP_SERVER_PORT))
if self._debug:
print("* DHCP: Discover")
self.send_dhcp_message(
STATE_DHCP_DISCOVER, ((time.monotonic() - start_time) / 1000)
)
self._dhcp_state = STATE_DHCP_DISCOVER
elif self._dhcp_state == STATE_DHCP_DISCOVER:
if self._debug:
print("* DHCP: Parsing OFFER")
msg_type, xid = self.parse_dhcp_response(self._response_timeout)
if msg_type == DHCP_OFFER:
# # use the _transaction_id the offer returned,
# # rather than the current one
# self._transaction_id = self._transaction_id.from_bytes(xid, "l")
if self._debug:
print("* DHCP: Request", xid)
self.send_dhcp_message(
DHCP_REQUEST, ((time.monotonic() - start_time) / 1000)
)
self._dhcp_state = STATE_DHCP_REQUEST
else:
print("* Received DHCP Message is not OFFER")
elif self._dhcp_state == STATE_DHCP_REQUEST:
if self._debug:
print("* DHCP: Parsing ACK")
msg_type, xid = self.parse_dhcp_response(self._response_timeout)
if msg_type == DHCP_ACK:
self._dhcp_state = STATE_DHCP_LEASED
result = 1
if self._lease_time == 0:
self._lease_time = DEFAULT_LEASE_TIME
if self._t1 == 0:
# T1 is 50% of _lease_time
self._t1 = self._lease_time >> 1
if self._t2 == 0:
# T2 is 87.5% of _lease_time
self._t2 = self._lease_time - (self._lease_time >> 3)
self._renew_in_sec = self._t1
self._rebind_in_sec = self._t2
elif msg_type == DHCP_NAK:
self._dhcp_state = STATE_DHCP_START
else:
print("* Received DHCP Message is not OFFER")
if msg_type == 255:
msg_type = 0
self._dhcp_state = STATE_DHCP_START
if result != 1 and (
(time.monotonic() - start_time > self._response_timeout)
):
break
self._transaction_id += 1
self._last_check_lease_ms = time.monotonic()
# close the socket, we're done with it
self._sock.close()
gc.collect()
return result

253
src/lib/adafruit_wiznet5k/adafruit_wiznet5k_dns.py

@ -0,0 +1,253 @@
# SPDX-FileCopyrightText: 2009-2010 MCQN Ltd
# SPDX-FileCopyrightText: Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_wiznet5k_dns`
================================================================================
Pure-Python implementation of the Arduino DNS client for WIZnet 5k-based
ethernet modules.
* Author(s): MCQN Ltd, Brent Rubell
"""
import time
from random import getrandbits
from micropython import const
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket
from adafruit_wiznet5k.adafruit_wiznet5k_socket import htons
QUERY_FLAG = const(0x00)
OPCODE_STANDARD_QUERY = const(0x00)
RECURSION_DESIRED_FLAG = 1 << 8
TYPE_A = const(0x0001)
CLASS_IN = const(0x0001)
DATA_LEN = const(0x0004)
# Return codes for gethostbyname
SUCCESS = const(1)
TIMED_OUT = const(-1)
INVALID_SERVER = const(-2)
TRUNCATED = const(-3)
INVALID_RESPONSE = const(-4)
DNS_PORT = const(0x35) # port used for DNS request
class DNS:
"""W5K DNS implementation.
:param iface: Network interface
"""
def __init__(self, iface, dns_address, debug=False):
self._debug = debug
self._iface = iface
socket.set_interface(iface)
self._sock = socket.socket(type=socket.SOCK_DGRAM)
self._sock.settimeout(1)
self._dns_server = dns_address
self._host = 0
self._request_id = 0 # request identifier
self._pkt_buf = bytearray()
def gethostbyname(self, hostname):
"""Translate a host name to IPv4 address format.
:param str hostname: Desired host name to connect to.
Returns the IPv4 address as a bytearray if successful, -1 otherwise.
"""
if self._dns_server is None:
return INVALID_SERVER
self._host = hostname
# build DNS request packet
self._build_dns_header()
self._build_dns_question()
# Send DNS request packet
self._sock.connect((self._dns_server, DNS_PORT))
if self._debug:
print("* DNS: Sending request packet...")
self._sock.send(self._pkt_buf)
# wait and retry 3 times for a response
retries = 0
addr = -1
while (retries < 5) and (addr == -1):
addr = self._parse_dns_response()
if addr == -1 and self._debug:
print("* DNS ERROR: Failed to resolve DNS response, retrying...")
retries += 1
self._sock.close()
return addr
def _parse_dns_response(
self,
): # pylint: disable=too-many-return-statements, too-many-branches, too-many-statements, too-many-locals
"""Receives and parses DNS query response.
Returns desired hostname address if obtained, -1 otherwise.
"""
# wait for a response
start_time = time.monotonic()
packet_sz = self._sock.available()
while packet_sz <= 0:
packet_sz = self._sock.available()
if (time.monotonic() - start_time) > 1.0:
if self._debug:
print("* DNS ERROR: Did not receive DNS response!")
return -1
time.sleep(0.05)
# recv packet into buf
self._pkt_buf = self._sock.recv()
if self._debug:
print("DNS Packet Received: ", self._pkt_buf)
# Validate request identifier
xid = int.from_bytes(self._pkt_buf[0:2], "l")
if not xid == self._request_id:
if self._debug:
print(
"* DNS ERROR: Received request identifer {} \
does not match expected {}".format(
xid, self._request_id
)
)
return -1
# Validate flags
flags = int.from_bytes(self._pkt_buf[2:4], "l")
if not flags in (0x8180, 0x8580):
if self._debug:
print("* DNS ERROR: Invalid flags, ", flags)
return -1
# Number of questions
qr_count = int.from_bytes(self._pkt_buf[4:6], "l")
if not qr_count >= 1:
if self._debug:
print("* DNS ERROR: Question count >=1, ", qr_count)
return -1
# Number of answers
an_count = int.from_bytes(self._pkt_buf[6:8], "l")
if self._debug:
print("* DNS Answer Count: ", an_count)
if not an_count >= 1:
return -1
# Parse query
ptr = 12
name_len = 1
while name_len > 0:
# read the length of the name
name_len = self._pkt_buf[ptr]
if name_len == 0x00:
# we reached the end of this name
ptr += 1 # inc. pointer by 0x00
break
# advance pointer
ptr += name_len + 1
# Validate Query is Type A
q_type = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not q_type == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Query Type: ", q_type)
return -1
ptr += 2
# Validate Query is Type A
q_class = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not q_class == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Query Class: ", q_class)
return -1
ptr += 2
# Let's take the first type-a answer
if self._pkt_buf[ptr] != 0xC0:
return -1
ptr += 1
if self._pkt_buf[ptr] != 0xC:
return -1
ptr += 1
# Validate Answer Type A
ans_type = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not ans_type == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Answer Type: ", ans_type)
return -1
ptr += 2
# Validate Answer Class IN
ans_class = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not ans_class == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Answer Class: ", ans_class)
return -1
ptr += 2
# skip over TTL
ptr += 4
# Validate addr is IPv4
data_len = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not data_len == DATA_LEN:
if self._debug:
print("* DNS ERROR: Unexpected Data Length: ", data_len)
return -1
ptr += 2
# Return address
return self._pkt_buf[ptr : ptr + 4]
def _build_dns_header(self):
"""Builds DNS header."""
# generate a random, 16-bit, request identifier
self._request_id = getrandbits(16)
# ID, 16-bit identifier
self._pkt_buf.append(self._request_id >> 8)
self._pkt_buf.append(self._request_id & 0xFF)
# Flags (0x0100)
self._pkt_buf.append(0x01)
self._pkt_buf.append(0x00)
# QDCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x01)
# ANCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
# NSCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
# ARCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
def _build_dns_question(self):
"""Build DNS question"""
host = self._host.decode("utf-8")
host = host.split(".")
# write out each section of host
for i, _ in enumerate(host):
# append the sz of the section
self._pkt_buf.append(len(host[i]))
# append the section data
self._pkt_buf += host[i]
# end of the name
self._pkt_buf.append(0x00)
# Type A record
self._pkt_buf.append(htons(TYPE_A) & 0xFF)
self._pkt_buf.append(htons(TYPE_A) >> 8)
# Class IN
self._pkt_buf.append(htons(CLASS_IN) & 0xFF)
self._pkt_buf.append(htons(CLASS_IN) >> 8)

428
src/lib/adafruit_wiznet5k/adafruit_wiznet5k_socket.py

@ -0,0 +1,428 @@
# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_wiznet5k_socket`
================================================================================
A socket compatible interface with the Wiznet5k module.
* Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick
"""
import gc
import time
from micropython import const
import adafruit_wiznet5k as wiznet5k
_the_interface = None # pylint: disable=invalid-name
def set_interface(iface):
"""Helper to set the global internet interface."""
global _the_interface # pylint: disable=global-statement, invalid-name
_the_interface = iface
def htonl(x):
"""Convert 32-bit positive integers from host to network byte order."""
return (
((x) << 24 & 0xFF000000)
| ((x) << 8 & 0x00FF0000)
| ((x) >> 8 & 0x0000FF00)
| ((x) >> 24 & 0x000000FF)
)
def htons(x):
"""Convert 16-bit positive integers from host to network byte order."""
return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF)
SOCK_STREAM = const(0x21) # TCP
TCP_MODE = 80
SOCK_DGRAM = const(0x02) # UDP
AF_INET = const(3)
SOCKET_INVALID = const(255)
# pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
"""Translate the host/port argument into a sequence of 5-tuples that
contain all the necessary arguments for creating a socket connected to that service.
"""
if not isinstance(port, int):
raise RuntimeError("Port must be an integer")
if is_ipv4(host):
return [(AF_INET, socktype, proto, "", (host, port))]
return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))]
def gethostbyname(hostname):
"""Translate a host name to IPv4 address format. The IPv4 address
is returned as a string.
:param str hostname: Desired hostname.
"""
addr = _the_interface.get_host_by_name(hostname)
addr = "{}.{}.{}.{}".format(addr[0], addr[1], addr[2], addr[3])
return addr
def is_ipv4(host):
"""Checks if a host string is an IPv4 address.
:param str host: host's name or ip
"""
octets = host.split(".", 3)
if len(octets) != 4 or not "".join(octets).isdigit():
return False
for octet in octets:
if int(octet) > 255:
return False
return True
# pylint: disable=invalid-name, too-many-public-methods
class socket:
"""A simplified implementation of the Python 'socket' class
for connecting to a Wiznet5k module.
:param int family: Socket address (and protocol) family.
:param int type: Socket type.
"""
# pylint: disable=redefined-builtin,unused-argument
def __init__(
self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None
):
if family != AF_INET:
raise RuntimeError("Only AF_INET family supported by W5K modules.")
self._sock_type = type
self._buffer = b""
self._timeout = 0
self._listen_port = None
self._socknum = _the_interface.get_socket()
if self._socknum == SOCKET_INVALID:
raise RuntimeError("Failed to allocate socket.")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._sock_type == SOCK_STREAM:
self.disconnect()
stamp = time.monotonic()
while self.status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_FIN_WAIT:
if time.monotonic() - stamp > 1000:
raise RuntimeError("Failed to disconnect socket")
self.close()
stamp = time.monotonic()
while self.status != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED:
if time.monotonic() - stamp > 1000:
raise RuntimeError("Failed to close socket")
@property
def socknum(self):
"""Returns the socket object's socket number."""
return self._socknum
@property
def status(self):
"""Returns the status of the socket"""
return _the_interface.socket_status(self.socknum)[0]
@property
def connected(self):
"""Returns whether or not we are connected to the socket."""
if self.socknum >= _the_interface.max_sockets:
return False
status = _the_interface.socket_status(self.socknum)[0]
if (
status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT
and self.available() == 0
):
result = False
else:
result = status not in (
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED,
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN,
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_TIME_WAIT,
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_FIN_WAIT,
)
if not result and status != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN:
self.close()
return result
def getpeername(self):
"""Return the remote address to which the socket is connected."""
return _the_interface.remote_ip(self.socknum)
def inet_aton(self, ip_string):
"""Convert an IPv4 address from dotted-quad string format.
:param str ip_string: IP Address, as a dotted-quad string.
"""
self._buffer = b""
self._buffer = [int(item) for item in ip_string.split(".")]
self._buffer = bytearray(self._buffer)
return self._buffer
def bind(self, address):
"""Bind the socket to the listen port, if host is specified the interface
will be reconfigured to that IP.
:param tuple address: local socket as a (host, port) tuple.
"""
if address[0] is not None:
ip_address = _the_interface.unpretty_ip(address[0])
current_ip, subnet_mask, gw_addr, dns = _the_interface.ifconfig
if ip_address != current_ip:
_the_interface.ifconfig = (ip_address, subnet_mask, gw_addr, dns)
self._listen_port = address[1]
def listen(self, backlog=None):
"""Listen on the port specified by bind.
:param backlog: For compatibility but ignored.
"""
assert self._listen_port is not None, "Use bind to set the port before listen!"
_the_interface.socket_listen(self.socknum, self._listen_port)
self._buffer = b""
def accept(self):
"""Accept a connection. The socket must be bound to an address and listening for
connections. The return value is a pair (conn, address) where conn is a new
socket object usable to send and receive data on the connection, and address is
the address bound to the socket on the other end of the connection.
"""
stamp = time.monotonic()
while self.status not in (
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_SYNRECV,
wiznet5k.adafruit_wiznet5k.SNSR_SOCK_ESTABLISHED,
):
if self._timeout > 0 and time.monotonic() - stamp > self._timeout:
return None
if self.status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED:
self.close()
self.listen()
new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
current_socknum = self.socknum
# Create a new socket object and swap socket nums so we can continue listening
client_sock = socket()
client_sock._socknum = current_socknum # pylint: disable=protected-access
self._socknum = new_listen_socknum # pylint: disable=protected-access
self.bind((None, self._listen_port))
self.listen()
while self.status != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN:
raise RuntimeError("Failed to open new listening socket")
return client_sock, addr
def connect(self, address, conntype=None):
"""Connect to a remote socket at address.
:param tuple address: Remote socket as a (host, port) tuple.
"""
assert (
conntype != 0x03
), "Error: SSL/TLS is not currently supported by CircuitPython."
host, port = address
if hasattr(host, "split"):
try:
host = tuple(map(int, host.split(".")))
except ValueError:
host = _the_interface.get_host_by_name(host)
if not _the_interface.socket_connect(
self.socknum, host, port, conn_mode=self._sock_type
):
raise RuntimeError("Failed to connect to host", host)
self._buffer = b""
def send(self, data):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
"""
_the_interface.socket_write(self.socknum, data, self._timeout)
gc.collect()
def sendto(self, data, address):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
:param tuple address: Remote socket as a (host, port) tuple.
"""
self.connect(address)
return self.send(data)
def recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches
"""Reads some bytes from the connected remote address.
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
"""
# print("Socket read", bufsize)
if bufsize == 0:
# read everything on the socket
while True:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
elif self._sock_type == SOCK_DGRAM:
self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
else:
break
gc.collect()
ret = self._buffer
self._buffer = b""
gc.collect()
return ret
stamp = time.monotonic()
to_read = bufsize - len(self._buffer)
received = []
while to_read > 0:
# print("Bytes to read:", to_read)
avail = self.available()
if avail:
stamp = time.monotonic()
if self._sock_type == SOCK_STREAM:
recv = _the_interface.socket_read(
self.socknum, min(to_read, avail)
)[1]
elif self._sock_type == SOCK_DGRAM:
recv = _the_interface.read_udp(self.socknum, min(to_read, avail))[1]
recv = bytes(recv)
received.append(recv)
to_read -= len(recv)
gc.collect()
if self._timeout > 0 and time.monotonic() - stamp > self._timeout:
break
self._buffer += b"".join(received)
ret = None
if len(self._buffer) == bufsize:
ret = self._buffer
self._buffer = b""
else:
ret = self._buffer[:bufsize]
self._buffer = self._buffer[bufsize:]
gc.collect()
return ret
def embed_recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches
"""Reads some bytes from the connected remote address and then return recv().
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
"""
# print("Socket read", bufsize)
ret = None
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
elif self._sock_type == SOCK_DGRAM:
self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
gc.collect()
ret = self._buffer
# print("RET ptr:", id(ret), id(self._buffer))
self._buffer = b""
gc.collect()
return ret
def recvfrom(self, bufsize=0, flags=0):
"""Reads some bytes from the connected remote address.
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
:returns: a tuple (bytes, address) where address is a tuple (ip, port)
"""
return (
self.recv(bufsize),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def recv_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns: the number of bytes received
"""
if nbytes == 0:
nbytes = len(buf)
ret = self.recv(nbytes)
nbytes = len(ret)
buf[:nbytes] = ret
return nbytes
def recvfrom_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns a tuple (nbytes, address) where address is a tuple (ip, port)
"""
return (
self.recv_into(buf, nbytes),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def readline(self):
"""Attempt to return as many bytes as we can up to \
but not including '\r\n'.
"""
stamp = time.monotonic()
while b"\r\n" not in self._buffer:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
elif self._sock_type == SOCK_DGRAM:
self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
if (
not avail
and self._timeout > 0
and time.monotonic() - stamp > self._timeout
):
self.close()
raise RuntimeError("Didn't receive response, failing out...")
firstline, self._buffer = self._buffer.split(b"\r\n", 1)
gc.collect()
return firstline
def disconnect(self):
"""Disconnects a TCP socket."""
assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket."
_the_interface.socket_disconnect(self.socknum)
def close(self):
"""Closes the socket."""
_the_interface.socket_close(self.socknum)
def available(self):
"""Returns how many bytes of data are available to be read from the socket."""
return _the_interface.socket_available(self.socknum, self._sock_type)
def settimeout(self, value):
"""Sets socket read timeout.
:param int value: Socket read timeout, in seconds.
"""
if value < 0:
raise Exception("Timeout period should be non-negative.")
self._timeout = value
def gettimeout(self):
"""Return the timeout in seconds (float) associated
with socket operations, or None if no timeout is set.
"""
return self._timeout
Loading…
Cancel
Save