Newer
Older
# BLE UUID's
root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
DATA_TYPE_COLOR_SENSOR = 4
DATA_TYPE_BUMPER = 12
DATA_TYPE_LIGHT = 13
DATA_TYPE_TOUCH = 17
DATA_TYPE_CLIFF = 20
color_black = [17, 68, 65, 20, 49, 67, 52]
color_red = [34, 2, 32]
color_green = [51, 48, 3]
color_blue = [4, 64, 68]
class BluetoothDeviceManager(gatt.DeviceManager):
robot = None # root robot device
def device_discovered(self, device):
print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
self.stop_discovery() # Stop searching
self.robot = RootDevice(mac_address=device.mac_address, manager=self)
self.robot.print_message()
self.robot.connect()
class RootDevice(gatt.Device):
def __init__(self, mac_address, manager, managed=True):
gatt.Device.__init__(self, mac_address, manager, managed=True)
# states for our robot primatives
self.currently_adjusting_for_perimeter = False
self.edge_following_enabled = False
self.patterning_lines_enabled = False
# variables in robot object for each robot
self.message = "no message yet"
self.last_packet = []
self.dash_count = 0
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def print_message(self):
print(self.message)
def connect_succeeded(self):
super().connect_succeeded()
print("[%s] Connected" % (self.mac_address))
def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
def disconnect_succeeded(self):
super().disconnect_succeeded()
print("[%s] Disconnected" % (self.mac_address))
def services_resolved(self):
super().services_resolved()
print("[%s] Resolved services" % (self.mac_address))
self.uart_service = next(
s for s in self.services
if s.uuid == uart_service_uuid)
self.tx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == tx_characteristic_uuid)
self.rx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == rx_characteristic_uuid)
self.rx_characteristic.enable_notifications() # listen to RX messages
def characteristic_value_updated(self, characteristic, value):
# Print sensor data
if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
## #attempt to back out of cliff even
## if new_data[0] == DATA_TYPE_CLIFF:
# Perimeter detect, then other color functions
# check if you're at the perimeter every time there is new color data
# save COLOR INFO to object for future use
self.last_packet = new_data
self.adjust_for_perimeter_if_needed(new_data)
## if type == "Color Sensor" and self.edge_following_enabled:
## print("new data - calling edge following")
## self.follow_edge(new_data)
## if type == "Color Sensor" and self.patterning_lines_enabled:
## self.pattern_dashes(new_data, self.dash_count)
def adjust_for_perimeter_if_needed(self, message):
# Check to see if you're at perimeter!
# If you are, turn and update your state
print("{}: {}".format(type, message))
n = 0
for i in range(3,18):
if message[i] in color_red:
n += 1
print("all - red", n)
if n > 13: #number of sensors read the color
self.drive_backwards()
self.currently_adjusting_for_perimeter = True
return
n = 0
for i in range(3,8):
if message[i] in color_red:
print("all - left", n)
if n > 3:
self.steer(90,10)
self.currently_adjusting_for_perimeter = True
print("steering left")
return
for i in range(13,18):
if message[i] in color_red:
print("all - right", n)
if n > 3:
print("in n > 3")
self.steer(10,90)
self.currently_adjusting_for_perimeter = True
print("steering right")
return
print("OUT OF LOOP")
if self.currently_adjusting_for_perimeter:
self.currently_adjusting_for_perimeter = False
## self.stop()
self.drive_forward()
return
while time.time() < t_end:
self.drive_forward()
self.stop()
print("pen-down, driving")
self.pen_up()
while time.time() < t_end:
self.drive_forward()
self.stop()
print("pen up, drive forward")
self.stop()
else:
self.patterning_lines_enabled = False
self.dash_count += self.dash_count
if message[3] != 0 or message[4] != 0:
#drive_root("l")
#manager.robot.drive_left_slow()
#drive_root("z")
turn_state = 1
elif message[5] != 0 or message[6] != 0:
#drive_root("l")
#manager.robot.drive_left_slow_slow()
#drive_root("x")
turn_state = 1
#drive_root("f")
#print("test1")
elif message[7] != 0 or message[8] != 0:
#drive_root("f") # calling the member function versus out into global space
self.drive_forward()
elif message[14] != 0 or message[13] != 0:
#drive_root("r")
#manager.robot.drive_right_slow_slow()
#drive_root("c")
turn_state = 2
#drive_root("f")
#print("test2")
elif message[16] != 0 or message[15] != 0:
#drive_root("r")
#manager.robot.drive_right_slow()
#drive_root("v")
turn_state = 2
else:
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def drive_forward(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
def drive_left(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
def drive_right(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
def stop(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
def drive_backwards(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
def pen_up(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def pen_down(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def turn_rate(self, rate):
left = 0
right = 0
if rate >= 0:
left = rate
if rate < 0:
right = -1*rate
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def steer(self, left, right):
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def drive_root(command):
angle = 0
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
if command == "f":
print ("Drive forward")
manager.robot.drive_forward()
if command == "b":
print ("Drive backwards")
manager.robot.drive_backwards()
if command == "r":
print ("Drive right")
manager.robot.drive_right()
if command == "l":
print ("Drive left")
manager.robot.drive_left()
if command == "s":
print ("Stop")
manager.robot.stop()
if command == "u":
print ("Pen up")
manager.robot.pen_up()
if command == "d":
print ("Pen down")
manager.robot.pen_down()
if command == "t":
print ("Enter turn rate (up to +-90):")
char = input()
rate = int(char)
print ("Turning ", rate)
manager.robot.turn_rate(rate)
if command == "p":
print ("Steer")
manager.robot.steer(30,40)
manager.robot.patterning_lines_enabled = False
if command == "edge":
print ("edge")
manager.robot.edge_following_enabled = True
if command == "pattern lines":
print ("pattern lines")
manager.robot.last_packet = 0
dash_count = manager.robot.last_packet
manager.robot.patterning_lines_enabled = True
# start here if run as program / not imported as module
if __name__ == "__main__":
manager = BluetoothDeviceManager(adapter_name = 'hci0')
manager.start_discovery(service_uuids=[root_identifier_uuid])
thread = threading.Thread(target = manager.run)
thread.start()
char = ""
try:
while manager.robot is None:
pass # wait for a root robot to be discovered
print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
while char != "q":
char = input() # wait for keyboard input
drive_root(char)
print('yay!')
print("Quitting")
manager.stop()
manager.robot.disconnect()
thread.join()
except KeyboardInterrupt:
pass