forked from luksan/pybrew
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbrewcontroller.py
More file actions
126 lines (102 loc) · 3.37 KB
/
brewcontroller.py
File metadata and controls
126 lines (102 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# -*- coding: utf-8 -*-
import serial
import time
# States of the serial link, stored in BrewController.serial_state:
# INIT is the value assigned at construction, ACTIVE is entered once the
# device reports "accepting commands"; ERROR is defined for completeness.
SERIAL_STATE_INIT, SERIAL_STATE_ACTIVE, SERIAL_STATE_ERROR = 1, 2, 3
# Commands:
# SV <valve number> <1|0>   change valve state
# GT <sensor number>        read temperature
# GV <valve number>         read valve state
# GR                        read the regulator's setpoint
# SR <temperature>          set the regulator's setpoint
class BrewController():
    """Talk to a brewing controller over a serial line.

    Commands are queued with send() (or the set_* helpers) and written to
    the port by run(), which is meant to be called periodically.  Each call
    to run() reads pending input, parses it, and -- once the device has
    announced "accepting commands" -- alternates between requesting
    temperature 0 ("GT 0") and flushing the queued commands.
    """

    def __init__(self, port='/dev/ttyUSB0'):
        """Reset all state and open the serial port.

        port -- device name passed to serial.Serial.

        Raises Exception if the port cannot be opened; there is no dummy
        fallback port.
        """
        self.sport = None
        self.serial_state = SERIAL_STATE_INIT
        self.ticker = 0
        self.read_temp1 = -255  # -255 marks "no valid reading"
        self.has_read_temp1 = False
        self.set_temp1 = 0
        self.do_set_temp1 = False
        self.send_queue = []
        self.last_command_ok = False
        # Valve number (as a string key) -> display name.
        self.VALVES = {
            '0': 'Ventil 1',
            '1': 'Ventil 2',
            '2': 'Ventil 3',
            '3': 'Ventil 4',
        }
        try:
            self.sport = serial.Serial(port)
            self.sport.timeout = 0.1
        except Exception as e:
            print(e)
            raise Exception('Failed to init %s' % port)

    @staticmethod
    def _as_text(line):
        """Return a line read from the port as text.

        Byte strings that are not already the native str type (Python 3)
        are decoded as ASCII; everything else is returned unchanged.
        """
        if isinstance(line, bytes) and not isinstance(line, str):
            return line.decode('ascii', 'replace')
        return line

    def _parse_serial(self, lines):
        """Update link state and readings from the lines just read.

        Resets last_command_ok, switches to SERIAL_STATE_ACTIVE when the
        first line contains "accepting commands", and otherwise (only while
        active) hands the lines to _scan_responses().
        """
        self.last_command_ok = False
        if not lines:
            return
        lines = [self._as_text(line) for line in lines]
        if 'accepting commands' in lines[0]:
            self.serial_state = SERIAL_STATE_ACTIVE
            print('Serial line active')
            return
        if self.serial_state != SERIAL_STATE_ACTIVE:
            return
        self._scan_responses(lines)

    def _scan_responses(self, lines):
        """Interpret every command/reply pair found in lines.

        A line containing "GT" is followed by the temperature as an integer;
        a line containing "SR" is followed by a line containing "OK" on
        success.  A missing or malformed temperature reply invalidates the
        stored reading.
        """
        for index, line in enumerate(lines):
            # The reply to a command is the line right after it, if any.
            reply = lines[index + 1] if index + 1 < len(lines) else None
            if 'GT' in line:
                try:
                    self.read_temp1 = int(reply)
                except (TypeError, ValueError):
                    self.read_temp1 = -255
                    self.has_read_temp1 = False
                else:
                    self.has_read_temp1 = True
                    self.last_command_ok = True
            if 'SR' in line:
                if reply is not None and 'OK' in reply:
                    self.last_command_ok = True

    def send(self, line):
        """Queue a raw command line; it is written by a later run() call."""
        self.send_queue.append(line)

    def set_valve_open(self, valve, open):
        """Queue an "SV" command opening (truthy) or closing a valve.

        valve -- valve number, either as int or as string key of VALVES.

        Raises KeyError for an unknown valve.
        """
        key = str(valve)
        print("Setting %s %s" % (self.VALVES[key], open))
        self.send("SV " + key + " " + ('1' if open else '0'))

    def set_temp(self, temp):
        """Remember temp as the setpoint and queue the matching "SR" command."""
        self.set_temp1 = temp
        self.send("SR " + str(temp))

    def get_temp(self):
        """Return the last temperature read, or -255 if there is none."""
        return self.read_temp1

    def isready(self):
        """Return True once the link is active and a temperature was read."""
        return bool(self.serial_state == SERIAL_STATE_ACTIVE
                    and self.has_read_temp1)

    def _write(self, command):
        """Write one command to the port, encoding text to ASCII bytes."""
        # NOTE(review): no line terminator is appended to commands --
        # confirm the firmware does not require one.
        if not isinstance(command, bytes):
            command = command.encode('ascii')
        self.sport.write(command)

    def run(self):
        """Perform one polling step: read, parse, then write.

        While the link is active, even ticks request temperature 0 and odd
        ticks flush the send queue.
        """
        lines = self.sport.readlines()
        self._parse_serial(lines)
        self.ticker += 1
        if self.serial_state != SERIAL_STATE_ACTIVE:
            return
        if self.ticker % 2 == 0:
            self._write("GT 0")
        else:
            for command in self.send_queue:
                self._write(command)
            self.send_queue = []
def main():
    """Create a BrewController and poll it every 0.1 seconds, forever."""
    poll_interval = 0.1
    controller = BrewController()
    while True:
        controller.run()
        time.sleep(poll_interval)
#main()