1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/python3
"""Basic control of the robot, move forward and avoid obstacles. Doesn't get stuck"""
# imports
import rclpy
import math as m
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
import sensor_msgs_py.point_cloud2
import time
# main class
class ReactiveMove(Node):
MAX_STUCK_TIME = 5 # Time before the robot is considered to be stuck and has to make a U-turn
# Rectangle of vision of the robot
RECT_WIDTH = 0.6
RECT_LENGTH = 0.8
# Rectangle of speeds of the robot
MIN_DIST_MOVE_FORWARD = 0.3
MAX_DIST_MOVE_FORWARD = RECT_LENGTH
# Robot velocities
ANGULAR_VELOCITY = 1.0
LINEAR_VELOCITY = 0.3
def __init__(self, name="reactive_move", timerFreq = 1/60.0):
""" constructor """
super().__init__(name) # Create the node
# Initialize a subscriber
self.create_subscription(PointCloud2, 'scan_points', self.scan_callback, 10)
# Initialize a publisher
self._velocity_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
# Initialize a clock for the publisher
self.create_timer(timerFreq, self.publish_velo)
# Initialize variables
self._timerFreq = timerFreq
self._previousScanTime = None
self._points = []
self._firstPointSeen = None
def scan_callback(self, scanMsg):
"""
Save received points for future calculations
"""
self._points = sensor_msgs_py.point_cloud2.read_points(scanMsg)
self._previousScanTime = time.time()
def calcul_velo(self):
"""
Calculate the velocity based on the lidar detected points and other data
"""
now = time.time()
# If scan data received more than 1s ago, stop the robot
if now - self._previousScanTime > 1:
print("No data received for more than 1s")
return Twist() # Return a zero velocity
# Default case
x, y = firstPointInFront(self._points, self.RECT_LENGTH, self.RECT_WIDTH / 2.0)
if(not self._firstPointSeen): #If you were previously moving straight without obstacles
self._firstPointSeen = (x,y)
velocity = Twist()
if x and y: # If there is a point in front at the moment
# Linear movement
if x < self.MIN_DIST_MOVE_FORWARD:
velocity.linear.x = 0.0 #If the wall is too close, don't go forward, only rotate
else:
coefficient = (x - self.MIN_DIST_MOVE_FORWARD) / (self.MAX_DIST_MOVE_FORWARD - self.MIN_DIST_MOVE_FORWARD) #linear between 0 (MIN_DIST_MOVE_FORWARD) and 1 (MAX_DIST_MOVE_FORWARD)
velocity.linear.x = coefficient * self.LINEAR_VELOCITY
# Rotation, based on the first point seen
firstY = self._firstPointSeen[1]
if firstY >= 0: # Point was in the left part of the rectangle -> Continue turning right until there is no point in front
velocity.angular.z = -self.ANGULAR_VELOCITY
else: # Point was in the right part of the rectangle -> Continue turning left until there is no point in front
velocity.angular.z = self.ANGULAR_VELOCITY
else: # If there's not point in front, go straight
velocity.linear.x = self.LINEAR_VELOCITY
self._firstPointSeen = None
return velocity
def publish_velo(self):
"""
Publish velocity independently of the callback
"""
# Make sure at least one callback is made
if not self._previousScanTime:
return -1
velocity = self.calcul_velo()
self._velocity_publisher.publish(velocity) # Move according to previously calculated velocity
return 0
# Static functions
def firstPointInFront(points:list, maxX, maxY):
"""
Check if points in front of the robot (in the rectangle with forward distance maxX and lateral distance of 2*maxY)
"""
# Closest point
cx, cy = None, None
closestDistance = 10000
for (x,y,_) in points:
pointDistance = m.sqrt(x**2 + y**2) #Get point distance to Lidar
isInRectangle = x > 0 and x < maxX and abs(y) < maxY
isCloser = (cx is None) or (pointDistance < closestDistance)
if(isInRectangle and isCloser): #If the point is closer, store it as the new closest point in the rectangle
cx,cy = x,y
closestDistance = pointDistance
return (cx, cy)
# Main loop
def main():
"""
Main loop
"""
rclpy.init()
rosNode = ReactiveMove()
rclpy.spin(rosNode)
rosNode.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()