Commit 8c5c6b9e authored by Inès EL HADRI

Add the 'grp_astro' package for challenge 1

parent 69b69c42

CMakeLists.txt:

cmake_minimum_required(VERSION 3.8)
project(grp_astro)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# Install launch files.
install(DIRECTORY
launch
DESTINATION share/${PROJECT_NAME}/
)
# Copy rviz2 config
install(DIRECTORY
config
DESTINATION share/${PROJECT_NAME}/
)
# Python scripts
install( PROGRAMS
src/cam_driver
src/cam_vision
src/reactive_move
src/scan2point_cloud
DESTINATION lib/${PROJECT_NAME}
)

# ament_package() must be called once, after all the install rules
ament_package()
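
The launch and config directories installed above are not part of this commit. As a rough idea only, a minimal launch file for this package might look like the sketch below; the file name launch/challenge1_launch.py and the node ordering are assumptions, not taken from the repository:

# launch/challenge1_launch.py (hypothetical name) - starts the four installed nodes
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(package='grp_astro', executable='scan2point_cloud'),
        Node(package='grp_astro', executable='reactive_move'),
        Node(package='grp_astro', executable='cam_driver'),
        Node(package='grp_astro', executable='cam_vision'),
    ])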

package.xml:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>grp_astro</name>
<version>1.0.0</version>
  <description>Package for Challenge 1 - UV LARM</description>
<maintainer email="lucas.naury@etu.imt-nord-europe.fr">lucas.naury</maintainer>
<maintainer email="ines.el.hadri@etu.imt-nord-europe.fr">ines.el.hadri</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
  <!-- Dependencies -->
  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>geometry_msgs</depend>
  <depend>sensor_msgs</depend>
  <depend>cv_bridge</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

src/cam_driver:

#!/usr/bin/python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import pyrealsense2 as rs
import signal, time, numpy as np
import sys, cv2
from cv_bridge import CvBridge
class CameraDriver(Node):
def __init__(self, name="cam_driver", timerFreq = 1/60.0):
super().__init__(name) # Create the node
# Initialize publishers
self._rgb_publisher = self.create_publisher(Image, 'cam_rgb', 10)
self._depth_publisher = self.create_publisher(Image, 'cam_depth', 10)
self._infra_publisher_1 = self.create_publisher(Image, 'cam_infra_1',10)
self._infra_publisher_2 = self.create_publisher(Image, 'cam_infra_2',10)
# Initialize a clock for the publisher
self.create_timer(timerFreq, self.publish_imgs)
# ------------------------------ Initialize camera ------------------------------ #
## Configure depth and color streams
self._pipeline = rs.pipeline()
self._config = rs.config()
## Get device product line for setting a supporting resolution
pipeline_wrapper = rs.pipeline_wrapper(self._pipeline)
pipeline_profile = self._config.resolve(pipeline_wrapper)
device = pipeline_profile.get_device()
device_product_line = str(device.get_info(rs.camera_info.product_line))
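        # (resolving the config lets us query the connected device before the stream is started)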
print( f"Connect: {device_product_line}" )
        found_rgb = False
for s in device.sensors:
print( "Name:" + s.get_info(rs.camera_info.name) )
if s.get_info(rs.camera_info.name) == 'RGB Camera':
found_rgb = True
        if not found_rgb:
            print("RGB camera not found, exiting")
            exit(1)
## Configure stream width, height, format and frequency
self._config.enable_stream(rs.stream.color, width=848, height=480, format=rs.format.bgr8, framerate=60)
self._config.enable_stream(rs.stream.depth, width=848, height=480, format=rs.format.z16, framerate=60)
self._config.enable_stream(rs.stream.infrared, 1, width=848, height=480, format=rs.format.y8, framerate=60)
self._config.enable_stream(rs.stream.infrared, 2, width=848, height=480, format=rs.format.y8, framerate=60)
## Start the acquisition
self._pipeline.start(self._config)
def read_imgs(self):
"""lire et traduire images camera"""
# Get frames
frames = self._pipeline.wait_for_frames()
# Split frames
color_frame = frames.first(rs.stream.color)
depth_frame = frames.first(rs.stream.depth)
infra_frame_1 = frames.get_infrared_frame(1)
infra_frame_2 = frames.get_infrared_frame(2)
if not (depth_frame and color_frame and infra_frame_1 and infra_frame_2):
print("eh oh le truc y marche pas")
return
# Convert images to numpy arrays
depth_image = np.asanyarray(depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
infra_image_1 = np.asanyarray(infra_frame_1.get_data())
infra_image_2 = np.asanyarray(infra_frame_2.get_data())
# Apply colormap on depth and IR images (image must be converted to 8-bit per pixel first)
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
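        # (with z16 depth in millimetres, alpha=0.03 maps roughly 0-8.5 m onto the 0-255 range of the colormap)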
infra_colormap_1 = cv2.applyColorMap(cv2.convertScaleAbs(infra_image_1, alpha=1), cv2.COLORMAP_JET)
infra_colormap_2 = cv2.applyColorMap(cv2.convertScaleAbs(infra_image_2, alpha=1), cv2.COLORMAP_JET)
bridge=CvBridge()
# Convert to ROS2 Image type
# - RGB Image
self._rgb_image = bridge.cv2_to_imgmsg(color_image,"bgr8")
self._rgb_image.header.stamp = self.get_clock().now().to_msg()
self._rgb_image.header.frame_id = "image"
# - Depth Image
self._depth_image = bridge.cv2_to_imgmsg(depth_colormap, "bgr8")
self._depth_image.header.stamp = self._rgb_image.header.stamp
self._depth_image.header.frame_id = "depth"
# - IR1 Image
self._infra1_image = bridge.cv2_to_imgmsg(infra_colormap_1,"bgr8")
self._infra1_image.header.stamp = self._rgb_image.header.stamp
self._infra1_image.header.frame_id = "infrared_1"
# - IR2 Image
self._infra2_image = bridge.cv2_to_imgmsg(infra_colormap_2,"bgr8")
self._infra2_image.header.stamp = self._rgb_image.header.stamp
self._infra2_image.header.frame_id = "infrared_2"
return self._rgb_image, self._depth_image, self._infra1_image, self._infra2_image
    def publish_imgs(self):
        # Publish only once images have been read at least once
        if not hasattr(self, '_rgb_image'):
            return
        self.publish_rgb()
        self.publish_depth()
        self.publish_IR()
def publish_rgb(self):
"""
Publish RGB image
"""
self._rgb_publisher.publish(self._rgb_image)
return 0
def publish_depth(self):
"""
Publish depth colormap image
"""
self._depth_publisher.publish(self._depth_image)
return 0
def publish_IR(self):
"""
Publish infrared colormap image
"""
self._infra_publisher_1.publish(self._infra1_image)
self._infra_publisher_2.publish(self._infra2_image)
return 0
# Capture ctrl-c event
isOk=True
def signalInterruption(signum, frame):
global isOk
print( "\nCtrl-c pressed" )
isOk= False
def main():
"""
Main loop
"""
signal.signal(signal.SIGINT, signalInterruption)
rclpy.init()
rosNode = CameraDriver(timerFreq=1.0/120)
    while isOk:  # isOk is the global flag cleared by the SIGINT handler
rosNode.read_imgs()
rclpy.spin_once(rosNode, timeout_sec=0.001)
rosNode.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

src/cam_vision:

#!/usr/bin/python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String
import numpy as np
import sys, cv2
from cv_bridge import CvBridge
class CameraVision(Node):
def __init__(self, name="cam_vision", timerFreq = 1/60.0):
super().__init__(name) # Create the node
        # Initialize node variables
self._low = np.array([58-20, 60, 40])
self._high = np.array([58+20, 255,255])
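        # (OpenCV hue ranges from 0 to 179, so the 38-78 hue band above keeps green tones, assumed to be the bottle colour)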
self._kernel = np.ones((3, 3), np.uint8)
self._color_info = (0, 0, 255)
# Initialize subscribers
self.create_subscription(Image, 'cam_rgb', self.rgb_callback, 10)
        # Initialize subscriber variables
self._rgb_img = None
# Initialize publishers
self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10)
self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10)
self._detection_publisher = self.create_publisher(String, 'detection', 10)
# Initialize a clock for the publisher
self.create_timer(timerFreq, self.loop)
# Initialize a ROS2 Image to CV2 image converter
self._bridge = CvBridge()
# Callbacks
def rgb_callback(self, img):
self._rgb_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
# Node internal loop
def loop(self):
if self._rgb_img is None :
return
# Convert image to HSV for easier thresholding
image = cv2.cvtColor(self._rgb_img, cv2.COLOR_BGR2HSV)
        # Color thresholding
mask = cv2.inRange(image, self._low, self._high)
        # Noise reduction (erosion followed by dilation)
mask = cv2.erode(mask, self._kernel, iterations=4)
mask = cv2.dilate(mask, self._kernel, iterations=4)
# Segmentation using the "Min enclosing circle" method
elements = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
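        # ([-2] selects the contour list for both the OpenCV 3 and OpenCV 4 return signatures of findContours)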
        bottle_detected = False
        if len(elements) > 0: # If we detect contours
            c = max(elements, key=cv2.contourArea)
            (x, y), radius = cv2.minEnclosingCircle(c)
            if radius > 20: # If the radius of the biggest detected contour is greater than 20 px, we consider that we have detected a bottle
                bottle_detected = True
                # Circle the bottle on the image
                cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2)
                cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10)
# Convert cv2 images to ROS2 RGB Image
ros2_mask_image = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR)
ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8")
ros2_mask_image.header.stamp = self.get_clock().now().to_msg()
ros2_mask_image.header.frame_id = "image"
ros2_rgb_image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8")
ros2_rgb_image.header.stamp = self.get_clock().now().to_msg()
ros2_rgb_image.header.frame_id = "image"
        # Publish a String message only when a bottle has been detected
        if bottle_detected:
            msg = String()
            msg.data = "A bottle has been detected"
            self._detection_publisher.publish(msg)
# Publish Images -> to be able to detect false positives and improve the algorithm
self._rgb_detection_publisher.publish(ros2_rgb_image)
self._mask_detection_publisher.publish(ros2_mask_image)
def main():
"""
Main loop
"""
rclpy.init()
rosNode = CameraVision(timerFreq=1.0/120)
rclpy.spin(rosNode)
rosNode.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

src/reactive_move:

#!/usr/bin/python3
"""Basic control of the robot, move forward and avoid obstacles. Doesn't get stuck"""
# imports
import rclpy
import math as m
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
import sensor_msgs_py.point_cloud2
import time
# main class
class ReactiveMove(Node):
    MAX_STUCK_TIME = 5 # Time (in seconds) without straight movement before the robot is considered stuck and makes a U-turn
# Rectangle of vision of the robot
RECT_WIDTH = 0.5
RECT_LENGTH = 0.5
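    # (dimensions in metres: RECT_LENGTH ahead of the robot, RECT_WIDTH across)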
    # Robot velocities (rad/s for angular, m/s for linear)
ANGULAR_VELOCITY = 1.0
LINEAR_VELOCITY = 0.2
def __init__(self, name="reactive_move", timerFreq = 1/60.0):
""" constructor """
super().__init__(name) # Create the node
# Initialize a subscriber
self.create_subscription(PointCloud2, 'scan_points', self.scan_callback, 10)
# Initialize a publisher
self._velocity_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
# Initialize a clock for the publisher
self.create_timer(timerFreq, self.publish_velo)
# Initialize variables
self._timerFreq = timerFreq
self._previousScanTime = None
self._previousStraightMvtTime = None
self._points = []
def scan_callback(self, scanMsg):
"""
Save received points for future calculations
"""
self._points = sensor_msgs_py.point_cloud2.read_points(scanMsg)
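        # (read_points yields one (x, y, z) entry per point of the cloud)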
self._previousScanTime = time.time()
def calcul_velo(self):
"""
Calculate the velocity based on the lidar detected points and other data
"""
now = time.time()
# If scan data received more than 1s ago, stop the robot
if now - self._previousScanTime > 1:
print("STOP ROBOT")
return Twist() # Return a zero velocity
        # If the robot hasn't moved straight for more than MAX_STUCK_TIME seconds, it is probably stuck in a corner.
        # In that case it keeps turning for an extra pi seconds, which at 1 rad/s amounts to a rotation of about 180°
if self._previousStraightMvtTime and self.MAX_STUCK_TIME < now - self._previousStraightMvtTime <= self.MAX_STUCK_TIME + m.pi:
velo = Twist()
velo.angular.z = self.ANGULAR_VELOCITY
print("U-TURN")
return velo
# Default case
x, y = firstPointInFront(self._points, self.RECT_LENGTH, self.RECT_WIDTH / 2.0)
velocity = Twist()
        if x is not None and y is not None:
if y >= 0: # Point is in the left part of the rectangle -> Turn right
velocity.angular.z = -self.ANGULAR_VELOCITY
print("TURN RIGHT")
else: # Point is in the right part of the rectangle -> Turn left
velocity.angular.z = self.ANGULAR_VELOCITY
print("TURN LEFT")
        else: # If there's no point in front, go straight
velocity.linear.x = self.LINEAR_VELOCITY
self._previousStraightMvtTime = now
print("STRAIGHT")
return velocity
def publish_velo(self):
"""
Publish velocity independently of the callback
"""
        # Make sure at least one scan callback has been received
if not self._previousScanTime:
return -1
velocity = self.calcul_velo()
self._velocity_publisher.publish(velocity) #  Move according to previously calculated velocity
return 0
# Static functions
def firstPointInFront(points:list, maxX, maxY):
"""
    Return the first point inside the rectangle in front of the robot (forward distance maxX, lateral half-width maxY), or (None, None) if there is none
"""
for (x,y,_) in points:
if(x>0 and x < maxX and abs(y) < maxY):
return (x,y)
return (None, None)
# Main loop
def main():
"""
Main loop
"""
rclpy.init()
rosNode = ReactiveMove()
rclpy.spin(rosNode)
rosNode.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

src/scan2point_cloud:

#!/usr/bin/python3
"""Change Lidar message into point cloud object"""
import rclpy
import math
from rclpy.node import Node
from sensor_msgs.msg import LaserScan, PointCloud2
from sensor_msgs_py import point_cloud2
from std_msgs.msg import Header
# Global node handle (the node itself is created below, after the callback definitions)
rosNode= None
def scan_callback(scanMsg):
"""receive a lidar message and send a pointcloud message in topic"""
global rosNode
obstacles = ranges_to_positions(scanMsg)
    # Sample used by the tutorial logger below (commented out):
sample = [ [ round(p[0], 2), round(p[1], 2), round(p[2], 2) ] for p in obstacles[10:20] ]
# rosNode.get_logger().info( f"\nheader:\n{scanMsg.header}\nnumber of ranges: {len(scanMsg.ranges)}\nSample: {sample}\n" )
# Publish a msg
pointCloud = point_cloud2.create_cloud_xyz32(Header(frame_id=scanMsg.header.frame_id), obstacles)
point_publisher.publish(pointCloud)
def ranges_to_positions(scan_message: LaserScan) -> list:
"""change a lidar message into a pointcloud message"""
obstacles= []
angle= scan_message.angle_min
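    # Convert each polar sample (angle, range) to a Cartesian (x, y, 0) point in the lidar frame,
    # keeping only ranges between 0.1 m and 5.0 m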
for aDistance in scan_message.ranges :
if 0.1 < aDistance and aDistance < 5.0 :
aPoint= [
math.cos(angle) * aDistance,
math.sin(angle) * aDistance,
0
]
obstacles.append(aPoint)
angle+= scan_message.angle_increment
return obstacles
rclpy.init()
rosNode= Node('scan_interpreter')
# Initialize a subscriber:
rosNode.create_subscription( LaserScan, 'scan', scan_callback, 10)
# Initialize a publisher:
point_publisher = rosNode.create_publisher(PointCloud2, '/scan_points', 10)
try:
    while True :
        rclpy.spin_once( rosNode )
except KeyboardInterrupt:
    pass
rosNode.destroy_node()
rclpy.shutdown()