Skip to content
Snippets Groups Projects
run-solleroot.py 11.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    import gatt
    import threading
    
    import time #not working...
    
    
    
    # BLE UUID's
    root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
    uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
    tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
    rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
    
    
    # message types
    
    Sara Falcone's avatar
    Sara Falcone committed
    DATA_TYPE_COLOR_SENSOR = 4
    DATA_TYPE_BUMPER = 12
    DATA_TYPE_LIGHT = 13
    DATA_TYPE_TOUCH = 17
    DATA_TYPE_CLIFF = 20
    
    
    # color classifications
    color_black = [17, 68]
    color_red = [34]
    color_green = [51]
    
    
    class BluetoothDeviceManager(gatt.DeviceManager):
        robot = None # root robot device
    
        def device_discovered(self, device):
            print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
            self.stop_discovery() # Stop searching
            self.robot = RootDevice(mac_address=device.mac_address, manager=self)
            print('I HAVE BEEN MADE')
            self.robot.print_message()
            self.robot.connect()
    
    class RootDevice(gatt.Device):
    
        # states for our robot primatives
    
    Sara Falcone's avatar
    Sara Falcone committed
        edge_following_enabled = False
    
        patterning_lines_enabled = False
    
        # "message" allows last sensor readings to be stored in the robot object for each robot
        message = "no message yet"
    
        def print_message(self):
            print(self.message)
        
        def connect_succeeded(self):
            super().connect_succeeded()
            print("[%s] Connected" % (self.mac_address))
    
        def connect_failed(self, error):
            super().connect_failed(error)
            print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
    
        def disconnect_succeeded(self):
            super().disconnect_succeeded()
            print("[%s] Disconnected" % (self.mac_address))
    
        def services_resolved(self):
            super().services_resolved()
            print("[%s] Resolved services" % (self.mac_address))
    
            self.uart_service = next(
                s for s in self.services
                if s.uuid == uart_service_uuid)
    
            self.tx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == tx_characteristic_uuid)
    
            self.rx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == rx_characteristic_uuid)
    
            self.rx_characteristic.enable_notifications() # listen to RX messages
    
        def characteristic_value_updated(self, characteristic, value):
    
    Sara Falcone's avatar
    Sara Falcone committed
            new_data = []
    
            type = ""
            for byte in value:
    
    Sara Falcone's avatar
    Sara Falcone committed
                new_data.append(byte)
    
    #        print ("Messages from Root:")
    
    Sara Falcone's avatar
    Sara Falcone committed
            # Print sensor data
            if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
            if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
            if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
            if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
            if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
    #        print("{}: {}".format(type, new_data))
    
    
            # Perimeter detect, then other color functions
    
    Sara Falcone's avatar
    Sara Falcone committed
            if new_data[0] == DATA_TYPE_COLOR_SENSOR:
    
                print("new data - calling perimeter")
    
                did_adjust = self.adjust_for_perimeter_if_needed(new_data)
    
    Sara Falcone's avatar
    Sara Falcone committed
                if did_adjust:
                    return
            if type == "Color Sensor" and self.edge_following_enabled:
    
                print("new data - calling edge following")
    
    Sara Falcone's avatar
    Sara Falcone committed
                self.follow_edge(new_data)
    
            if type == "Color Sensor" and self.patterning_lines_enabled:
                self.pattern_dashes(new_data)
    
        def adjust_for_perimeter_if_needed(self, message):
            # Check to see if you're at perimeter! If you are, turn and update your state
            print("IN ADJUST PERIMETER")
            n = 0
            for i in range(3,5):
                if message[i] in color_black:
                    n += 1
                if n > 1:
                    self.turn_rate(-45)
                    adjusted = True
                else:
                    adjusted = False
            n = 0
            for i in range(16,18):
                if message[i] in color_black:
                    n += 1
                if n > 1:
                    self.turn_rate(45)
                    adjusted = True
                else:
                    adjusted = False   
    
    Sara Falcone's avatar
    Sara Falcone committed
            return adjusted
    
        def pattern_dashes(self, message):
    
            print("IN PATTERN DASHES")
    
            #pen down, pen up, so many times...
            count = 0
            while count < 3: # number of lines robot will draw
                print(count)
                t0 = time.time() #FIX TIME
                t1 = time.time()
                delta_t = t1 - t0
                print(delta_t)
                
                self.pen_down()
                self.drive_forward()
                while delta_t < 5000: # ms pen down
                    self.drive_forward()
                    t1 = time.time()
                    delta_t = t1 - t0
                t0 = time.time()
                print(count)
                self.pen_up()
                while delta_t < 3000: # ms pen up
                    t1 = time.time()
                    delta_t = t1 - t0
                count += 1
                self.pen_up()
            self.patterning_lines_enabled = False
    
    Sara Falcone's avatar
    Sara Falcone committed
            
    
        def follow_edge(self, message):
    
            print("IN FOLLOW EDGE")
    
    Sara Falcone's avatar
    Sara Falcone committed
            if message[3] != 0 or message[4] != 0:
                #drive_root("l")
    
                self.turn_rate(-45)
    
    Sara Falcone's avatar
    Sara Falcone committed
                #manager.robot.drive_left_slow()
                #drive_root("z")
                turn_state = 1  
            elif message[5] != 0 or message[6] != 0:
                #drive_root("l")
    
                self.turn_rate(-30)
    
    Sara Falcone's avatar
    Sara Falcone committed
                #manager.robot.drive_left_slow_slow()
                #drive_root("x")
                turn_state = 1                
                #drive_root("f")
                #print("test1")
            elif message[7] != 0 or message[8] != 0:
    
                #drive_root("f") # calling the member function versus out into global space
                self.drive_forward()
    
    Sara Falcone's avatar
    Sara Falcone committed
            elif message[9] != 0 or message[10] != 0:
    
                self.turn_rate(30)
    
    Sara Falcone's avatar
    Sara Falcone committed
                
            elif message[12] != 0 or message[11] != 0:
    
                self.turn_rate(45)
    
    Sara Falcone's avatar
    Sara Falcone committed
            elif message[14] != 0 or message[13] != 0:
                #drive_root("r")
    
                self.turn_rate(60)
    
    Sara Falcone's avatar
    Sara Falcone committed
                #manager.robot.drive_right_slow_slow()
                #drive_root("c")
                turn_state = 2                
                #drive_root("f")
                #print("test2")
            elif message[16] != 0 or message[15] != 0:
                #drive_root("r")
    
                self.turn_rate(60)
    
    Sara Falcone's avatar
    Sara Falcone committed
                #manager.robot.drive_right_slow()
                #drive_root("v")
                turn_state = 2
            else:
    
                 self.drive_forward()
    
    
        def drive_forward(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
    
        def drive_left(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
    
        def drive_right(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
    
        def stop(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
    
        def drive_backwards(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
    
        def pen_up(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
    
        def pen_down(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
            
            
        def turn_rate(self, rate):
            left = 0
            right = 0
            if rate >= 0:
                left = rate
            if rate < 0:
                right = -1*rate
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
        def steer(self, left, right):
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
    
    def drive_root(command):
        angle = 0
        if command == "f":
            print ("Drive forward")
            manager.robot.drive_forward()
        if command == "b":
            print ("Drive backwards")
            manager.robot.drive_backwards()
        if command == "r":
            print ("Drive right")
            manager.robot.drive_right()
        if command == "l":
            print ("Drive left")
            manager.robot.drive_left()
        if command == "s":
            print ("Stop")
            manager.robot.stop()
        if command == "u":
            print ("Pen up")
            manager.robot.pen_up()
        if command == "d":
            print ("Pen down")
            manager.robot.pen_down()
        if command == "t":
            print ("Enter turn rate (up to +-90):")
            char = input()
            rate = int(char)
            print ("Turning ", rate)
            manager.robot.turn_rate(rate)
        if command == "p":
            print ("Steer")
            manager.robot.steer(30,40)
    
    Sara Falcone's avatar
    Sara Falcone committed
        if command == "clear":
            print ("clear all")
    
            manager.robot.stop()
    
    Sara Falcone's avatar
    Sara Falcone committed
            manager.robot.edge_following_enabled = False
    
            manager.robot.patterning_lines_enabled = False
    
    Sara Falcone's avatar
    Sara Falcone committed
        if command == "edge":
            print ("edge")
            manager.robot.edge_following_enabled = True
    
        if command == "pattern lines":
            print ("pattern lines")
            manager.robot.patterning_lines_enabled = True
    
    
    # start here if run as program / not imported as module
    if __name__ == "__main__":
        manager = BluetoothDeviceManager(adapter_name = 'hci0')
        manager.start_discovery(service_uuids=[root_identifier_uuid])
        thread = threading.Thread(target = manager.run)
        thread.start()
        char = ""
        try:
            while manager.robot is None:
                pass # wait for a root robot to be discovered
            print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
            while char != "q":
                char = input() # wait for keyboard input
                print('predrive')
                drive_root(char)
                manager.robot.print_message()
                print('yay!')
            print("Quitting")
            manager.stop()
            manager.robot.disconnect()
            thread.join()
        except KeyboardInterrupt:
            pass