Browse Source

上传文件至 'src'

master
nfa 4 years ago
parent
commit
9bb9ca2a74
  1. 19
      src/electricity.py
  2. 155
      src/erequest.py
  3. 35
      src/led.py
  4. 16
      src/light.py
  5. 16
      src/smoke.py

19
src/electricity.py

@ -0,0 +1,19 @@
import board
import digitalio
import time
elecio = digitalio.DigitalInOut(board.GP7)
elecio.direction = digitalio.Direction.INPUT
def mesure_electricity():
try:
# print(elecio.value)
return elecio.value
except:
return 1
if __name__ == "__main__":
while True:
# print(dropio.value)
time.sleep(1)
print(mesure_electricity())

155
src/erequest.py

@ -0,0 +1,155 @@
import board
import busio
import digitalio
import adafruit_requests as requests
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket
import time
import rtc
import adafruit_hashlib as hashlib
from light import flash_led
from config import IP_ADDR,MAC_ADDR,NET_MASK,GATEWAY,DNS,SECRET,SERVER_URL,SN,TIMESPAN,A_FIRE,A_TEMP,A_HUM
rt = rtc.RTC()
ipl = IP_ADDR.split('.')
nml = NET_MASK.split('.')
gwl = GATEWAY.split('.')
dnl = DNS.split('.')
macl = MAC_ADDR.split(':')
print(macl)
ipaddr = (int(ipl[0]),int(ipl[1]),int(ipl[2]),int(ipl[3]))
macaddr = (int(macl[0],16),int(macl[1],16),int(macl[2],16),int(macl[3],16),int(macl[4],16),int(macl[5],16))
netmask = (int(nml[0]),int(nml[1]),int(nml[2]),int(nml[3]))
gateway = (int(gwl[0]),int(gwl[1]),int(gwl[2]),int(gwl[3]))
dns = (int(dnl[0]),int(dnl[1]),int(dnl[2]),int(dnl[3]))
# Setup your network configuration below
print("Wiznet5k WebClient Test (no DHCP)")
print(time.localtime())
cs = digitalio.DigitalInOut(board.GP17)
spi_bus = busio.SPI(board.GP18, MOSI=board.GP19, MISO=board.GP16)
# Initialize ethernet interface without DHCP
eth = WIZNET5K(spi_bus, cs, is_dhcp=False, mac=macaddr)
# Set network configuration
def eth_init():
global eth
global ipaddr,netmask,gateway,dns # Set network configuration
nok = True
try:
eth.ifconfig = (ipaddr, netmask, gateway, dns)
print("Chip Version:", eth.chip)
print("MAC Address:", [hex(i) for i in eth.mac_address])
print("My IP address is:", eth.pretty_ip(eth.ip_address))
print("Link Status:", eth.link_status)
time.sleep(0.1)
requests.set_socket(socket, eth)
except:
nok = False
return nok
# eth.ifconfig = (ipaddr, netmask, gateway, dns)
def check_time():
global rt
# requests.set_socket(socket, eth)
print('Updating time ...')
# eth._debug = True
try:
gdata = {'spot':SN,'timespan':TIMESPAN,'afire':A_FIRE,'atemp':A_TEMP,'ahum':A_HUM}
print(gdata)
servtime = get_data(SERVER_URL+'/env/time/', gdata)
rt.datetime = time.localtime(int(servtime))
except:
pass
# print(r.datetime)
def check_netstatus():
global eth
stcode = 1
try:
if eth.link_status:
flash_led('normal')
stcode = 0
else:
flash_led('alarm')
stcode = 1
except:
pass
return stcode
def get_token():
global rt
try:
stamp = str(rt.datetime[0])+'##'+str(rt.datetime[3]%10)
print(stamp)
m = hashlib.sha1()
m.update((SECRET+stamp).encode('utf-8'))
return m.hexdigest()
except:
return ''
def parse_data(data):
res = ''
try:
if isinstance(data, dict):
t = []
for k in data:
t.append(k+'='+str(data[k]))
res = '&'.join(t)
except:
pass
return res
def get_data(server_addr, data):
if not server_addr:
print('Server address must be set.')
return False
# Initialize a requests object with a socket and ethernet interface
#requests.set_socket(socket, eth)
print('Geting data :'+server_addr)
# eth._debug = True
res = 'No data received.'
try:
r = requests.get(server_addr+'?'+parse_data(data), **{'timeout':0.1})
res = r.text
print(res)
r.close()
except:
pass
return res
def postdata(server_addr,data):
if not server_addr:
print('Server address must be set.')
return False
# Initialize a requests object with a socket and ethernet interface
requests.set_socket(socket, eth)
data['token']= get_token()
print('Token is :'+data['token'])
print('Sending data :'+server_addr)
# eth._debug = True
requests.request('POST',server_addr,data=data)
return True
def post_data(server_addr,data):
if not server_addr:
print('Server address must be set.')
return False
try:
data['token']= get_token()
print('Token is :'+data['token'])
print('Sending data :'+server_addr)
# eth._debug = True
r = requests.post(server_addr,**{'data':data,'timeout':0.1})
r.close()
except:
pass
return True
if __name__ == "__main__":
eth_init()
get_data('http://172.18.17.136:8000/env/cmd/',{'spot':'WJSPJ001'})
# post_data('http://2.54.41.27:8000/env/api/',{'errno':0,'electricity':True,'temperature':28.5,'humidity':50,'water':False,'fire':8800})
# post_data('http://192.168.2.76:8000/env/api/',{'sn':'WJSPJ002','errno':0,'electricity':True,'temperature':29.5,'humidity':56,'water':False,'fire':3800})
# post_test('http://www.changzhou.gov.cn/ns_class/06v86ba2')

35
src/led.py

@ -0,0 +1,35 @@
import board
import busio
import digitalio
ledpins = []
ledpins.append(digitalio.DigitalInOut(board.GP12))
ledpins.append(digitalio.DigitalInOut(board.GP11))
ledpins.append(digitalio.DigitalInOut(board.GP10))
ledpins.append(digitalio.DigitalInOut(board.GP6))
ledpins.append(digitalio.DigitalInOut(board.GP13))
ledpins.append(digitalio.DigitalInOut(board.GP14))
ledpins.append(digitalio.DigitalInOut(board.GP15))
for ledpin in ledpins:
ledpin.direction = digitalio.Direction.OUTPUT
displist = (
(1, 0, 0, 0, 0, 0, 0),
(1, 1, 1, 0, 0, 1, 1),
(0, 1, 0, 0, 1, 0, 0),
(0, 1, 0, 0, 0, 0, 1),
(0, 0, 1, 0, 0, 1, 1),
(0, 0, 0, 1, 0, 0, 1),
(0, 0, 0, 1, 0, 0, 0),
(1, 1, 0, 0, 0, 1, 1),
(0, 0, 0, 0, 0, 0, 0),
(0, 0, 0, 0, 0, 0, 1),
(0, 0, 0, 0, 0, 1, 0)
)
def show_num(number):
for i in range(7):
ledpins[i].value = displist[number][i]

16
src/light.py

@ -0,0 +1,16 @@
import board
import busio
import digitalio
normal_led = digitalio.DigitalInOut(board.GP22)
alarm_led = digitalio.DigitalInOut(board.GP21)
normal_led.direction = digitalio.Direction.OUTPUT
alarm_led.direction = digitalio.Direction.OUTPUT
def flash_led(mode='normal'):
if mode == 'normal':
normal_led.value = True
alarm_led.value = False
else:
normal_led.value = False
alarm_led.value = True

16
src/smoke.py

@ -0,0 +1,16 @@
import board
import analogio
import time
smk = analogio.AnalogIn(board.GP27)
def mesure_smoke():
try:
# print(smk.value)
return smk.value
except:
return 0
if __name__ == "__main__":
while True:
print(mesure_smoke())
time.sleep(1)
Loading…
Cancel
Save