"LOG.md" did not exist on "d4ddb799a3667581eda42f9e84393b4a7b10d490"
Newer
Older
#!/usr/bin/env python3
import gatt
import threading
import time
# BLE UUID's
root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
DATA_TYPE_COLOR_SENSOR = 4
DATA_TYPE_BUMPER = 12
DATA_TYPE_LIGHT = 13
DATA_TYPE_TOUCH = 17
DATA_TYPE_CLIFF = 20
class BluetoothDeviceManager(gatt.DeviceManager):
robot = None # root robot device
def device_discovered(self, device):
print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
self.stop_discovery() # Stop searching
self.robot = RootDevice(mac_address=device.mac_address, manager=self)
print('I HAVE BEEN MADE')
self.robot.print_message()
self.robot.connect()
class RootDevice(gatt.Device):
message = "no message"
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def print_message(self):
print(self.message)
def connect_succeeded(self):
super().connect_succeeded()
print("[%s] Connected" % (self.mac_address))
def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
def disconnect_succeeded(self):
super().disconnect_succeeded()
print("[%s] Disconnected" % (self.mac_address))
def services_resolved(self):
super().services_resolved()
print("[%s] Resolved services" % (self.mac_address))
self.uart_service = next(
s for s in self.services
if s.uuid == uart_service_uuid)
self.tx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == tx_characteristic_uuid)
self.rx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == rx_characteristic_uuid)
self.rx_characteristic.enable_notifications() # listen to RX messages
def characteristic_value_updated(self, characteristic, value):
# Print sensor data
if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
# print("{}: {}".format(type, new_data))
# Do the thing!
if new_data[0] == DATA_TYPE_COLOR_SENSOR:
did_adjust = adjust_for_perimeter_if_needed(new_data)
if did_adjust:
return
if type == "Color Sensor" and self.edge_following_enabled:
self.follow_edge(new_data)
def adjust_for_perimeter_if_needed(self, data):
adjusted = False
# TODO: Check to see if you're at perimeter! If you are, update your state and set
# adjusted = True
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
return adjusted
def follow_edge(self, message):
if message[3] != 0 or message[4] != 0:
#drive_root("l")
manager.robot.turn_rate(-45)
#manager.robot.drive_left_slow()
#drive_root("z")
turn_state = 1
elif message[5] != 0 or message[6] != 0:
#drive_root("l")
manager.robot.turn_rate(-30)
#manager.robot.drive_left_slow_slow()
#drive_root("x")
turn_state = 1
#drive_root("f")
#print("test1")
elif message[7] != 0 or message[8] != 0:
drive_root("f")
elif message[9] != 0 or message[10] != 0:
manager.robot.turn_rate(30)
elif message[12] != 0 or message[11] != 0:
manager.robot.turn_rate(45)
elif message[14] != 0 or message[13] != 0:
#drive_root("r")
manager.robot.turn_rate(60)
#manager.robot.drive_right_slow_slow()
#drive_root("c")
turn_state = 2
#drive_root("f")
#print("test2")
elif message[16] != 0 or message[15] != 0:
#drive_root("r")
manager.robot.turn_rate(60)
#manager.robot.drive_right_slow()
#drive_root("v")
turn_state = 2
else:
drive_root("f")
#manager.robot.drive_forward_slow()
#drive_root("a")
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def drive_forward(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
def drive_left(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
def drive_right(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
def stop(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
def drive_backwards(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
def pen_up(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def pen_down(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
# def pattern_fill_straight(self):
# pen_down(self)
# Timer(30, drive_forward(self))
def turn_rate(self, rate):
left = 0
right = 0
if rate >= 0:
left = rate
if rate < 0:
right = -1*rate
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def steer(self, left, right):
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def drive_root(command):
angle = 0
if command == "f":
print ("Drive forward")
manager.robot.drive_forward()
if command == "b":
print ("Drive backwards")
manager.robot.drive_backwards()
if command == "r":
print ("Drive right")
manager.robot.drive_right()
if command == "l":
print ("Drive left")
manager.robot.drive_left()
if command == "s":
print ("Stop")
manager.robot.stop()
if command == "u":
print ("Pen up")
manager.robot.pen_up()
if command == "d":
print ("Pen down")
manager.robot.pen_down()
if command == "t":
print ("Enter turn rate (up to +-90):")
char = input()
rate = int(char)
print ("Turning ", rate)
manager.robot.turn_rate(rate)
if command == "p":
print ("Steer")
manager.robot.steer(30,40)
if command == "clear":
print ("clear all")
manager.robot.edge_following_enabled = False
if command == "edge":
print ("edge")
manager.robot.edge_following_enabled = True
# start here if run as program / not imported as module
if __name__ == "__main__":
manager = BluetoothDeviceManager(adapter_name = 'hci0')
manager.start_discovery(service_uuids=[root_identifier_uuid])
thread = threading.Thread(target = manager.run)
thread.start()
char = ""
try:
while manager.robot is None:
pass # wait for a root robot to be discovered
print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
while char != "q":
char = input() # wait for keyboard input
print('predrive')
drive_root(char)
manager.robot.print_message()
print('yay!')
print("Quitting")
manager.stop()
manager.robot.disconnect()
thread.join()
except KeyboardInterrupt:
pass