Skip to content
Snippets Groups Projects
run-solleroot.py 12.5 KiB
Newer Older
import gatt
import threading
Sara Falcone's avatar
Sara Falcone committed
import time 

# BLE UUID's
root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify

# message types
Sara Falcone's avatar
Sara Falcone committed
DATA_TYPE_COLOR_SENSOR = 4
DATA_TYPE_BUMPER = 12
DATA_TYPE_LIGHT = 13
DATA_TYPE_TOUCH = 17
DATA_TYPE_CLIFF = 20

# color classifications
color_black = [17, 68, 65, 20, 49, 67, 52]
color_red = [34, 2, 32]
color_green = [51, 48, 3]
color_blue = [4, 64, 68]
class BluetoothDeviceManager(gatt.DeviceManager):
    robot = None # root robot device

    def device_discovered(self, device):
        print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
        self.stop_discovery() # Stop searching
        self.robot = RootDevice(mac_address=device.mac_address, manager=self)
        self.robot.print_message()
        self.robot.connect()

class RootDevice(gatt.Device):
Sara Falcone's avatar
Sara Falcone committed
    def __init__(self, mac_address, manager, managed=True):
        gatt.Device.__init__(self, mac_address, manager, managed=True)
        
        # states for our robot primatives
        self.currently_adjusting_for_perimeter = False
        self.edge_following_enabled = False
        self.patterning_lines_enabled = False
    
        # variables in robot object for each robot
        self.message = "no message yet"
        self.last_packet = []
        self.dash_count = 0
    def print_message(self):
        print(self.message)
    
    def connect_succeeded(self):
        super().connect_succeeded()
        print("[%s] Connected" % (self.mac_address))

    def connect_failed(self, error):
        super().connect_failed(error)
        print("[%s] Connection failed: %s" % (self.mac_address, str(error)))

    def disconnect_succeeded(self):
        super().disconnect_succeeded()
        print("[%s] Disconnected" % (self.mac_address))

    def services_resolved(self):
        super().services_resolved()
        print("[%s] Resolved services" % (self.mac_address))

        self.uart_service = next(
            s for s in self.services
            if s.uuid == uart_service_uuid)

        self.tx_characteristic = next(
            c for c in self.uart_service.characteristics
            if c.uuid == tx_characteristic_uuid)

        self.rx_characteristic = next(
            c for c in self.uart_service.characteristics
            if c.uuid == rx_characteristic_uuid)

        self.rx_characteristic.enable_notifications() # listen to RX messages

    def characteristic_value_updated(self, characteristic, value):
Sara Falcone's avatar
Sara Falcone committed
        new_data = []
        type = ""
        for byte in value:
Sara Falcone's avatar
Sara Falcone committed
            new_data.append(byte)
#        print ("Messages from Root:")
Sara Falcone's avatar
Sara Falcone committed
        # Print sensor data
        if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
        if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
        if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
        if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
        if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
Sara Falcone's avatar
Sara Falcone committed
##        print("{}: {}".format(type, new_data))
Sara Falcone's avatar
Sara Falcone committed
        
##        #attempt to back out of cliff even
##        if new_data[0] == DATA_TYPE_CLIFF:
Sara Falcone's avatar
Sara Falcone committed
##            self.drive_backwards()
Sara Falcone's avatar
Sara Falcone committed
##            print("end cliff event")
            
        # Perimeter detect, then other color functions
        # check if you're at the perimeter every time there is new color data
Sara Falcone's avatar
Sara Falcone committed
        if new_data[0] == DATA_TYPE_COLOR_SENSOR:
Sara Falcone's avatar
Sara Falcone committed
            # save COLOR INFO to object for future use
            self.last_packet = new_data
Sara Falcone's avatar
Sara Falcone committed
            self.adjust_for_perimeter_if_needed(new_data)

##        if type == "Color Sensor" and self.edge_following_enabled:
##            print("new data - calling edge following")
##            self.follow_edge(new_data)
##        if type == "Color Sensor" and self.patterning_lines_enabled:
##            self.pattern_dashes(new_data, self.dash_count)
        
      
    def adjust_for_perimeter_if_needed(self, message):
        # Check to see if you're at perimeter!
        # If you are, turn and update your state
        print("IN ADJUST PERIMETER")
        print("{}: {}".format(type, message))
        
Sara Falcone's avatar
Sara Falcone committed
        n = 0
        for i in range(3,18):
            if message[i] in color_red:
                n += 1
                print("all - red", n)
        if n > 13: #number of sensors read the color
Sara Falcone's avatar
Sara Falcone committed
            print("in all colors are red")
Sara Falcone's avatar
Sara Falcone committed
            self.drive_backwards()
            self.currently_adjusting_for_perimeter = True
            return
Sara Falcone's avatar
Sara Falcone committed
        n = 0
        for i in range(3,8):
            if message[i] in color_red:
Sara Falcone's avatar
Sara Falcone committed
                print("all - left", n)
        if n > 3:
            self.steer(90,10)
            self.currently_adjusting_for_perimeter = True
            print("steering left")
            return    
Sara Falcone's avatar
Sara Falcone committed
        for i in range(13,18):
            if message[i] in color_red:
Sara Falcone's avatar
Sara Falcone committed
                print("all - right", n)
        if n > 3:
            print("in n > 3")
            self.steer(10,90)
            self.currently_adjusting_for_perimeter = True
            print("steering right")
            return
        print("OUT OF LOOP")
        if self.currently_adjusting_for_perimeter:
            self.currently_adjusting_for_perimeter = False
##            self.stop()
            self.drive_forward()
            return
Sara Falcone's avatar
Sara Falcone committed
    def pattern_dashes(self, message, count):
Sara Falcone's avatar
Sara Falcone committed
        # pen down, pen up, so many times
Sara Falcone's avatar
Sara Falcone committed
        print("count:", count)
        num_dashes_wanted = 4
Sara Falcone's avatar
Sara Falcone committed
        if count < num_dashes_wanted:
Sara Falcone's avatar
Sara Falcone committed
            self.pen_down()
Sara Falcone's avatar
Sara Falcone committed
            t_end = time.time() + 2
Sara Falcone's avatar
Sara Falcone committed
            while time.time() < t_end:
                self.drive_forward()
            self.stop()
            print("pen-down, driving")
            self.pen_up()
Sara Falcone's avatar
Sara Falcone committed
            t_end = time.time() + 2
Sara Falcone's avatar
Sara Falcone committed
            while time.time() < t_end:
                self.drive_forward()
            self.stop()
            print("pen up, drive forward")
Sara Falcone's avatar
Sara Falcone committed
            self.stop()
        else:
            self.patterning_lines_enabled = False
        
        self.dash_count += self.dash_count
Sara Falcone's avatar
Sara Falcone committed
        

    def follow_edge(self, message):
        print("IN FOLLOW EDGE")
Sara Falcone's avatar
Sara Falcone committed
        if message[3] != 0 or message[4] != 0:
            #drive_root("l")
            self.turn_rate(-45)
Sara Falcone's avatar
Sara Falcone committed
            #manager.robot.drive_left_slow()
            #drive_root("z")
            turn_state = 1  
        elif message[5] != 0 or message[6] != 0:
            #drive_root("l")
            self.turn_rate(-30)
Sara Falcone's avatar
Sara Falcone committed
            #manager.robot.drive_left_slow_slow()
            #drive_root("x")
            turn_state = 1                
            #drive_root("f")
            #print("test1")
        elif message[7] != 0 or message[8] != 0:
            #drive_root("f") # calling the member function versus out into global space
            self.drive_forward()
Sara Falcone's avatar
Sara Falcone committed
        elif message[9] != 0 or message[10] != 0:
            self.turn_rate(30)
Sara Falcone's avatar
Sara Falcone committed
            
        elif message[12] != 0 or message[11] != 0:
            self.turn_rate(45)
Sara Falcone's avatar
Sara Falcone committed
        elif message[14] != 0 or message[13] != 0:
            #drive_root("r")
            self.turn_rate(60)
Sara Falcone's avatar
Sara Falcone committed
            #manager.robot.drive_right_slow_slow()
            #drive_root("c")
            turn_state = 2                
            #drive_root("f")
            #print("test2")
        elif message[16] != 0 or message[15] != 0:
            #drive_root("r")
            self.turn_rate(60)
Sara Falcone's avatar
Sara Falcone committed
            #manager.robot.drive_right_slow()
            #drive_root("v")
            turn_state = 2
        else:
             self.drive_forward()

    def drive_forward(self):
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])

    def drive_left(self):
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])

    def drive_right(self):
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])

    def stop(self):
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])

    def drive_backwards(self):
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])

    def pen_up(self):
        self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

    def pen_down(self):
        self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
        
        
    def turn_rate(self, rate):
        left = 0
        right = 0
        if rate >= 0:
            left = rate
        if rate < 0:
            right = -1*rate
        leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
        rightbytes = right.to_bytes(4,byteorder='big',signed=True)
        # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])

    def steer(self, left, right):
        leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
        rightbytes = right.to_bytes(4,byteorder='big',signed=True)
        # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
        self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])


def drive_root(command):
    angle = 0
Sara Falcone's avatar
Sara Falcone committed
    last_packet = manager.robot.last_packet
    
    if command == "f":
        print ("Drive forward")
        manager.robot.drive_forward()
    if command == "b":
        print ("Drive backwards")
        manager.robot.drive_backwards()
    if command == "r":
        print ("Drive right")
        manager.robot.drive_right()
    if command == "l":
        print ("Drive left")
        manager.robot.drive_left()
    if command == "s":
        print ("Stop")
        manager.robot.stop()
    if command == "u":
        print ("Pen up")
        manager.robot.pen_up()
    if command == "d":
        print ("Pen down")
        manager.robot.pen_down()
    if command == "t":
        print ("Enter turn rate (up to +-90):")
        char = input()
        rate = int(char)
        print ("Turning ", rate)
        manager.robot.turn_rate(rate)
    if command == "p":
        print ("Steer")
        manager.robot.steer(30,40)
Sara Falcone's avatar
Sara Falcone committed
    if command == "clear":
        print ("clear all")
        manager.robot.stop()
Sara Falcone's avatar
Sara Falcone committed
        manager.robot.edge_following_enabled = False
        manager.robot.patterning_lines_enabled = False
Sara Falcone's avatar
Sara Falcone committed
    if command == "edge":
        print ("edge")
        manager.robot.edge_following_enabled = True
Sara Falcone's avatar
Sara Falcone committed
        manager.robot.follow_edge(last_packet)
    if command == "pattern lines":
        print ("pattern lines")
Sara Falcone's avatar
Sara Falcone committed
        manager.robot.last_packet = 0
        dash_count = manager.robot.last_packet
        manager.robot.patterning_lines_enabled = True
Sara Falcone's avatar
Sara Falcone committed
        manager.robot.pattern_dashes(last_packet, dash_count)
# start here if run as program / not imported as module
if __name__ == "__main__":
    manager = BluetoothDeviceManager(adapter_name = 'hci0')
    manager.start_discovery(service_uuids=[root_identifier_uuid])
    thread = threading.Thread(target = manager.run)
    thread.start()
    char = ""
    try:
        while manager.robot is None:
            pass # wait for a root robot to be discovered
        print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
        while char != "q":
            char = input() # wait for keyboard input
            print('waiting for keyboard input')
            drive_root(char)
            print('yay!')
        print("Quitting")
        manager.stop()
        manager.robot.disconnect()
        thread.join()
    except KeyboardInterrupt:
        pass