Skip to content
Snippets Groups Projects
run-solleroot.py 9.21 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python3
    
    import gatt
    import threading
    import time
    
    
    # BLE UUID's
    root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
    uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
    tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
    rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
    
    
    Sara Falcone's avatar
    Sara Falcone committed
    
    DATA_TYPE_COLOR_SENSOR = 4
    DATA_TYPE_BUMPER = 12
    DATA_TYPE_LIGHT = 13
    DATA_TYPE_TOUCH = 17
    DATA_TYPE_CLIFF = 20
    
    
    class BluetoothDeviceManager(gatt.DeviceManager):
        robot = None # root robot device
    
        def device_discovered(self, device):
            print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
            self.stop_discovery() # Stop searching
            self.robot = RootDevice(mac_address=device.mac_address, manager=self)
            print('I HAVE BEEN MADE')
            self.robot.print_message()
            self.robot.connect()
    
    class RootDevice(gatt.Device):
        message = "no message"
    
    Sara Falcone's avatar
    Sara Falcone committed
        edge_following_enabled = False
        
    
        def print_message(self):
            print(self.message)
        
        def connect_succeeded(self):
            super().connect_succeeded()
            print("[%s] Connected" % (self.mac_address))
    
        def connect_failed(self, error):
            super().connect_failed(error)
            print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
    
        def disconnect_succeeded(self):
            super().disconnect_succeeded()
            print("[%s] Disconnected" % (self.mac_address))
    
        def services_resolved(self):
            super().services_resolved()
            print("[%s] Resolved services" % (self.mac_address))
    
            self.uart_service = next(
                s for s in self.services
                if s.uuid == uart_service_uuid)
    
            self.tx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == tx_characteristic_uuid)
    
            self.rx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == rx_characteristic_uuid)
    
            self.rx_characteristic.enable_notifications() # listen to RX messages
    
        def characteristic_value_updated(self, characteristic, value):
    
    Sara Falcone's avatar
    Sara Falcone committed
            new_data = []
    
            type = ""
            for byte in value:
    
    Sara Falcone's avatar
    Sara Falcone committed
                new_data.append(byte)
    
    #        print ("Messages from Root:")
    
    Sara Falcone's avatar
    Sara Falcone committed
            # Print sensor data
            if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
            if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
            if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
            if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
            if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
    #        print("{}: {}".format(type, new_data))
    
            # Do the thing!
            if new_data[0] == DATA_TYPE_COLOR_SENSOR:
                did_adjust = adjust_for_perimeter_if_needed(new_data)
                if did_adjust:
                    return
            if type == "Color Sensor" and self.edge_following_enabled:
                self.follow_edge(new_data)
                
                
        def adjust_for_perimeter_if_needed(self, data):
            adjusted = False
            
            # TODO: Check to see if you're at perimeter! If you are, update your state and set
            # adjusted = True
    
    Sara Falcone's avatar
    Sara Falcone committed
            return adjusted
            
    
        def follow_edge(self, message):
            if message[3] != 0 or message[4] != 0:
                #drive_root("l")
                manager.robot.turn_rate(-45)
                #manager.robot.drive_left_slow()
                #drive_root("z")
                turn_state = 1  
            elif message[5] != 0 or message[6] != 0:
                #drive_root("l")
                manager.robot.turn_rate(-30)
                #manager.robot.drive_left_slow_slow()
                #drive_root("x")
                turn_state = 1                
                #drive_root("f")
                #print("test1")
            elif message[7] != 0 or message[8] != 0:
                drive_root("f")
                
            elif message[9] != 0 or message[10] != 0:
                manager.robot.turn_rate(30)
                
            elif message[12] != 0 or message[11] != 0:
                manager.robot.turn_rate(45)
            elif message[14] != 0 or message[13] != 0:
                #drive_root("r")
                manager.robot.turn_rate(60)
                #manager.robot.drive_right_slow_slow()
                #drive_root("c")
                turn_state = 2                
                #drive_root("f")
                #print("test2")
            elif message[16] != 0 or message[15] != 0:
                #drive_root("r")
                manager.robot.turn_rate(60)
                #manager.robot.drive_right_slow()
                #drive_root("v")
                turn_state = 2
            else:
                 drive_root("f")
                 #manager.robot.drive_forward_slow()
                 #drive_root("a")
    
    
        def drive_forward(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
    
        def drive_left(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
    
        def drive_right(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
    
        def stop(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
    
        def drive_backwards(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
    
        def pen_up(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
    
        def pen_down(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
    
    #   def pattern_fill_straight(self):
           # pen_down(self)
           # Timer(30, drive_forward(self))
            
        
            
            
        def turn_rate(self, rate):
            left = 0
            right = 0
            if rate >= 0:
                left = rate
            if rate < 0:
                right = -1*rate
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
        def steer(self, left, right):
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
    
    def drive_root(command):
        angle = 0
        if command == "f":
            print ("Drive forward")
            manager.robot.drive_forward()
        if command == "b":
            print ("Drive backwards")
            manager.robot.drive_backwards()
        if command == "r":
            print ("Drive right")
            manager.robot.drive_right()
        if command == "l":
            print ("Drive left")
            manager.robot.drive_left()
        if command == "s":
            print ("Stop")
            manager.robot.stop()
        if command == "u":
            print ("Pen up")
            manager.robot.pen_up()
        if command == "d":
            print ("Pen down")
            manager.robot.pen_down()
        if command == "t":
            print ("Enter turn rate (up to +-90):")
            char = input()
            rate = int(char)
            print ("Turning ", rate)
            manager.robot.turn_rate(rate)
        if command == "p":
            print ("Steer")
            manager.robot.steer(30,40)
    
    Sara Falcone's avatar
    Sara Falcone committed
        if command == "clear":
            print ("clear all")
            manager.robot.edge_following_enabled = False
        if command == "edge":
            print ("edge")
            manager.robot.edge_following_enabled = True
            
    
    # start here if run as program / not imported as module
    if __name__ == "__main__":
        manager = BluetoothDeviceManager(adapter_name = 'hci0')
        manager.start_discovery(service_uuids=[root_identifier_uuid])
        thread = threading.Thread(target = manager.run)
        thread.start()
        char = ""
        try:
            while manager.robot is None:
                pass # wait for a root robot to be discovered
            print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
            while char != "q":
                char = input() # wait for keyboard input
                print('predrive')
                drive_root(char)
                manager.robot.print_message()
                print('yay!')
            print("Quitting")
            manager.stop()
            manager.robot.disconnect()
            thread.join()
        except KeyboardInterrupt:
            pass