Newer
Older
# BLE UUID's
root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
DATA_TYPE_COLOR_SENSOR = 4
DATA_TYPE_BUMPER = 12
DATA_TYPE_LIGHT = 13
DATA_TYPE_TOUCH = 17
DATA_TYPE_CLIFF = 20
# color classifications
color_black = [17, 68]
color_red = [34]
color_green = [51]
class BluetoothDeviceManager(gatt.DeviceManager):
robot = None # root robot device
def device_discovered(self, device):
print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
self.stop_discovery() # Stop searching
self.robot = RootDevice(mac_address=device.mac_address, manager=self)
print('I HAVE BEEN MADE')
self.robot.print_message()
self.robot.connect()
class RootDevice(gatt.Device):
# "message" allows last sensor readings to be stored in the robot object for each robot
message = "no message yet"
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def print_message(self):
print(self.message)
def connect_succeeded(self):
super().connect_succeeded()
print("[%s] Connected" % (self.mac_address))
def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
def disconnect_succeeded(self):
super().disconnect_succeeded()
print("[%s] Disconnected" % (self.mac_address))
def services_resolved(self):
super().services_resolved()
print("[%s] Resolved services" % (self.mac_address))
self.uart_service = next(
s for s in self.services
if s.uuid == uart_service_uuid)
self.tx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == tx_characteristic_uuid)
self.rx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == rx_characteristic_uuid)
self.rx_characteristic.enable_notifications() # listen to RX messages
def characteristic_value_updated(self, characteristic, value):
# Print sensor data
if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
# print("{}: {}".format(type, new_data))
# Perimeter detect, then other color functions
print("new data - calling perimeter")
did_adjust = self.adjust_for_perimeter_if_needed(new_data)
if did_adjust:
return
if type == "Color Sensor" and self.edge_following_enabled:
print("new data - calling edge following")
if type == "Color Sensor" and self.patterning_lines_enabled:
self.pattern_dashes(new_data)
def adjust_for_perimeter_if_needed(self, message):
# Check to see if you're at perimeter! If you are, turn and update your state
print("IN ADJUST PERIMETER")
n = 0
for i in range(3,5):
if message[i] in color_black:
n += 1
if n > 1:
self.turn_rate(-45)
adjusted = True
else:
adjusted = False
n = 0
for i in range(16,18):
if message[i] in color_black:
n += 1
if n > 1:
self.turn_rate(45)
adjusted = True
else:
adjusted = False
#pen down, pen up, so many times...
count = 0
while count < 3: # number of lines robot will draw
print(count)
t0 = time.time() #FIX TIME
t1 = time.time()
delta_t = t1 - t0
print(delta_t)
self.pen_down()
self.drive_forward()
while delta_t < 5000: # ms pen down
self.drive_forward()
t1 = time.time()
delta_t = t1 - t0
t0 = time.time()
print(count)
self.pen_up()
while delta_t < 3000: # ms pen up
t1 = time.time()
delta_t = t1 - t0
count += 1
self.pen_up()
self.patterning_lines_enabled = False
if message[3] != 0 or message[4] != 0:
#drive_root("l")
#manager.robot.drive_left_slow()
#drive_root("z")
turn_state = 1
elif message[5] != 0 or message[6] != 0:
#drive_root("l")
#manager.robot.drive_left_slow_slow()
#drive_root("x")
turn_state = 1
#drive_root("f")
#print("test1")
elif message[7] != 0 or message[8] != 0:
#drive_root("f") # calling the member function versus out into global space
self.drive_forward()
elif message[14] != 0 or message[13] != 0:
#drive_root("r")
#manager.robot.drive_right_slow_slow()
#drive_root("c")
turn_state = 2
#drive_root("f")
#print("test2")
elif message[16] != 0 or message[15] != 0:
#drive_root("r")
#manager.robot.drive_right_slow()
#drive_root("v")
turn_state = 2
else:
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def drive_forward(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
def drive_left(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
def drive_right(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
def stop(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
def drive_backwards(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
def pen_up(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def pen_down(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def turn_rate(self, rate):
left = 0
right = 0
if rate >= 0:
left = rate
if rate < 0:
right = -1*rate
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def steer(self, left, right):
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def drive_root(command):
angle = 0
if command == "f":
print ("Drive forward")
manager.robot.drive_forward()
if command == "b":
print ("Drive backwards")
manager.robot.drive_backwards()
if command == "r":
print ("Drive right")
manager.robot.drive_right()
if command == "l":
print ("Drive left")
manager.robot.drive_left()
if command == "s":
print ("Stop")
manager.robot.stop()
if command == "u":
print ("Pen up")
manager.robot.pen_up()
if command == "d":
print ("Pen down")
manager.robot.pen_down()
if command == "t":
print ("Enter turn rate (up to +-90):")
char = input()
rate = int(char)
print ("Turning ", rate)
manager.robot.turn_rate(rate)
if command == "p":
print ("Steer")
manager.robot.steer(30,40)
manager.robot.patterning_lines_enabled = False
if command == "edge":
print ("edge")
manager.robot.edge_following_enabled = True
if command == "pattern lines":
print ("pattern lines")
manager.robot.patterning_lines_enabled = True
# start here if run as program / not imported as module
if __name__ == "__main__":
manager = BluetoothDeviceManager(adapter_name = 'hci0')
manager.start_discovery(service_uuids=[root_identifier_uuid])
thread = threading.Thread(target = manager.run)
thread.start()
char = ""
try:
while manager.robot is None:
pass # wait for a root robot to be discovered
print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
while char != "q":
char = input() # wait for keyboard input
print('predrive')
drive_root(char)
manager.robot.print_message()
print('yay!')
print("Quitting")
manager.stop()
manager.robot.disconnect()
thread.join()
except KeyboardInterrupt:
pass