3 changed files with 260 additions and 0 deletions
Binary file not shown.
@ -0,0 +1,166 @@ |
|||
# SPDX-FileCopyrightText: 2016 Scott Shawcroft for Adafruit Industries |
|||
# |
|||
# SPDX-License-Identifier: MIT |
|||
|
|||
""" |
|||
`adafruit_bus_device.i2c_device` - I2C Bus Device |
|||
==================================================== |
|||
""" |
|||
|
|||
__version__ = "0.0.0-auto.0" |
|||
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BusDevice.git" |
|||
|
|||
|
|||
class I2CDevice: |
|||
""" |
|||
Represents a single I2C device and manages locking the bus and the device |
|||
address. |
|||
|
|||
:param ~busio.I2C i2c: The I2C bus the device is on |
|||
:param int device_address: The 7 bit device address |
|||
:param bool probe: Probe for the device upon object creation, default is true |
|||
|
|||
.. note:: This class is **NOT** built into CircuitPython. See |
|||
:ref:`here for install instructions <bus_device_installation>`. |
|||
|
|||
Example: |
|||
|
|||
.. code-block:: python |
|||
|
|||
import busio |
|||
from board import * |
|||
from adafruit_bus_device.i2c_device import I2CDevice |
|||
|
|||
with busio.I2C(SCL, SDA) as i2c: |
|||
device = I2CDevice(i2c, 0x70) |
|||
bytes_read = bytearray(4) |
|||
with device: |
|||
device.readinto(bytes_read) |
|||
# A second transaction |
|||
with device: |
|||
device.write(bytes_read) |
|||
""" |
|||
|
|||
def __init__(self, i2c, device_address, probe=True): |
|||
|
|||
self.i2c = i2c |
|||
self.device_address = device_address |
|||
|
|||
if probe: |
|||
self.__probe_for_device() |
|||
|
|||
def readinto(self, buf, *, start=0, end=None): |
|||
""" |
|||
Read into ``buf`` from the device. The number of bytes read will be the |
|||
length of ``buf``. |
|||
|
|||
If ``start`` or ``end`` is provided, then the buffer will be sliced |
|||
as if ``buf[start:end]``. This will not cause an allocation like |
|||
``buf[start:end]`` will so it saves memory. |
|||
|
|||
:param bytearray buffer: buffer to write into |
|||
:param int start: Index to start writing at |
|||
:param int end: Index to write up to but not include; if None, use ``len(buf)`` |
|||
""" |
|||
if end is None: |
|||
end = len(buf) |
|||
self.i2c.readfrom_into(self.device_address, buf, start=start, end=end) |
|||
|
|||
def write(self, buf, *, start=0, end=None): |
|||
""" |
|||
Write the bytes from ``buffer`` to the device, then transmit a stop |
|||
bit. |
|||
|
|||
If ``start`` or ``end`` is provided, then the buffer will be sliced |
|||
as if ``buffer[start:end]``. This will not cause an allocation like |
|||
``buffer[start:end]`` will so it saves memory. |
|||
|
|||
:param bytearray buffer: buffer containing the bytes to write |
|||
:param int start: Index to start writing from |
|||
:param int end: Index to read up to but not include; if None, use ``len(buf)`` |
|||
""" |
|||
if end is None: |
|||
end = len(buf) |
|||
self.i2c.writeto(self.device_address, buf, start=start, end=end) |
|||
|
|||
# pylint: disable-msg=too-many-arguments |
|||
def write_then_readinto( |
|||
self, |
|||
out_buffer, |
|||
in_buffer, |
|||
*, |
|||
out_start=0, |
|||
out_end=None, |
|||
in_start=0, |
|||
in_end=None |
|||
): |
|||
""" |
|||
Write the bytes from ``out_buffer`` to the device, then immediately |
|||
reads into ``in_buffer`` from the device. The number of bytes read |
|||
will be the length of ``in_buffer``. |
|||
|
|||
If ``out_start`` or ``out_end`` is provided, then the output buffer |
|||
will be sliced as if ``out_buffer[out_start:out_end]``. This will |
|||
not cause an allocation like ``buffer[out_start:out_end]`` will so |
|||
it saves memory. |
|||
|
|||
If ``in_start`` or ``in_end`` is provided, then the input buffer |
|||
will be sliced as if ``in_buffer[in_start:in_end]``. This will not |
|||
cause an allocation like ``in_buffer[in_start:in_end]`` will so |
|||
it saves memory. |
|||
|
|||
:param bytearray out_buffer: buffer containing the bytes to write |
|||
:param bytearray in_buffer: buffer containing the bytes to read into |
|||
:param int out_start: Index to start writing from |
|||
:param int out_end: Index to read up to but not include; if None, use ``len(out_buffer)`` |
|||
:param int in_start: Index to start writing at |
|||
:param int in_end: Index to write up to but not include; if None, use ``len(in_buffer)`` |
|||
""" |
|||
if out_end is None: |
|||
out_end = len(out_buffer) |
|||
if in_end is None: |
|||
in_end = len(in_buffer) |
|||
|
|||
self.i2c.writeto_then_readfrom( |
|||
self.device_address, |
|||
out_buffer, |
|||
in_buffer, |
|||
out_start=out_start, |
|||
out_end=out_end, |
|||
in_start=in_start, |
|||
in_end=in_end, |
|||
) |
|||
|
|||
# pylint: enable-msg=too-many-arguments |
|||
|
|||
def __enter__(self): |
|||
while not self.i2c.try_lock(): |
|||
pass |
|||
return self |
|||
|
|||
def __exit__(self, exc_type, exc_val, exc_tb): |
|||
self.i2c.unlock() |
|||
return False |
|||
|
|||
def __probe_for_device(self): |
|||
""" |
|||
Try to read a byte from an address, |
|||
if you get an OSError it means the device is not there |
|||
or that the device does not support these means of probing |
|||
""" |
|||
while not self.i2c.try_lock(): |
|||
pass |
|||
try: |
|||
self.i2c.writeto(self.device_address, b"") |
|||
except OSError: |
|||
# some OS's dont like writing an empty bytesting... |
|||
# Retry by reading a byte |
|||
try: |
|||
result = bytearray(1) |
|||
self.i2c.readfrom_into(self.device_address, result) |
|||
except OSError: |
|||
# pylint: disable=raise-missing-from |
|||
raise ValueError("No I2C device at address: 0x%x" % self.device_address) |
|||
# pylint: enable=raise-missing-from |
|||
finally: |
|||
self.i2c.unlock() |
@ -0,0 +1,94 @@ |
|||
# SPDX-FileCopyrightText: 2016 Scott Shawcroft for Adafruit Industries |
|||
# |
|||
# SPDX-License-Identifier: MIT |
|||
|
|||
# pylint: disable=too-few-public-methods |
|||
|
|||
""" |
|||
`adafruit_bus_device.spi_device` - SPI Bus Device |
|||
==================================================== |
|||
""" |
|||
|
|||
__version__ = "0.0.0-auto.0" |
|||
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BusDevice.git" |
|||
|
|||
|
|||
class SPIDevice: |
|||
""" |
|||
Represents a single SPI device and manages locking the bus and the device |
|||
address. |
|||
|
|||
:param ~busio.SPI spi: The SPI bus the device is on |
|||
:param ~digitalio.DigitalInOut chip_select: The chip select pin object that implements the |
|||
DigitalInOut API. |
|||
:param int extra_clocks: The minimum number of clock cycles to cycle the bus after CS is high. |
|||
(Used for SD cards.) |
|||
|
|||
.. note:: This class is **NOT** built into CircuitPython. See |
|||
:ref:`here for install instructions <bus_device_installation>`. |
|||
|
|||
Example: |
|||
|
|||
.. code-block:: python |
|||
|
|||
import busio |
|||
import digitalio |
|||
from board import * |
|||
from adafruit_bus_device.spi_device import SPIDevice |
|||
|
|||
with busio.SPI(SCK, MOSI, MISO) as spi_bus: |
|||
cs = digitalio.DigitalInOut(D10) |
|||
device = SPIDevice(spi_bus, cs) |
|||
bytes_read = bytearray(4) |
|||
# The object assigned to spi in the with statements below |
|||
# is the original spi_bus object. We are using the busio.SPI |
|||
# operations busio.SPI.readinto() and busio.SPI.write(). |
|||
with device as spi: |
|||
spi.readinto(bytes_read) |
|||
# A second transaction |
|||
with device as spi: |
|||
spi.write(bytes_read) |
|||
""" |
|||
|
|||
def __init__( |
|||
self, |
|||
spi, |
|||
chip_select=None, |
|||
*, |
|||
baudrate=100000, |
|||
polarity=0, |
|||
phase=0, |
|||
extra_clocks=0 |
|||
): |
|||
self.spi = spi |
|||
self.baudrate = baudrate |
|||
self.polarity = polarity |
|||
self.phase = phase |
|||
self.extra_clocks = extra_clocks |
|||
self.chip_select = chip_select |
|||
if self.chip_select: |
|||
self.chip_select.switch_to_output(value=True) |
|||
|
|||
def __enter__(self): |
|||
while not self.spi.try_lock(): |
|||
pass |
|||
self.spi.configure( |
|||
baudrate=self.baudrate, polarity=self.polarity, phase=self.phase |
|||
) |
|||
if self.chip_select: |
|||
self.chip_select.value = False |
|||
return self.spi |
|||
|
|||
def __exit__(self, exc_type, exc_val, exc_tb): |
|||
if self.chip_select: |
|||
self.chip_select.value = True |
|||
if self.extra_clocks > 0: |
|||
buf = bytearray(1) |
|||
buf[0] = 0xFF |
|||
clocks = self.extra_clocks // 8 |
|||
if self.extra_clocks % 8 != 0: |
|||
clocks += 1 |
|||
for _ in range(clocks): |
|||
self.spi.write(buf) |
|||
self.spi.unlock() |
|||
return False |
Loading…
Reference in new issue