#!/usr/bin/python3 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from std_msgs.msg import String, Float32MultiArray from geometry_msgs.msg import Point import numpy as np import cv2, pathlib, statistics from cv_bridge import CvBridge from cam_driver import CameraDriver import pyrealsense2 as rs class CameraVision(CameraDriver): def __init__(self, name="cam_vision", timerFreq = 1/60.0): super().__init__(name, timerFreq) # Create the node # Load parameters directory = pathlib.PurePath(__file__).parent try: # Load HSV threshold params from file file = open(str(directory / "config" / "hsv_params" ), "r") # Parse string text = file.read() low = text.split(",")[0].strip('][ ') high = text.split(",")[1].strip('][ ') self._low = np.array([int(i) for i in low.split(" ") if i]) self._high = np.array([int(i) for i in high.split(" ") if i]) print(f"Successfully loaded HSV params :\n - low : {self._low}\n - high {self._high}") except: # Initialize default variables if no file exists self._low = np.array([60-25, 50, 50]) self._high = np.array([65+25, 255, 255]) print("Set HSV params to default values :\n - low : {self._low}\n - high {self._high}") # Node parameters self.declare_parameters( namespace='', parameters=[ ('DEPTH_CHECK_RADIUS', 10) ]) # Node variables self._kernel = np.ones((5, 5), np.uint8) self._color_info = (0, 0, 255) # Initialize a ROS2 Image to CV2 image converter self._bridge = CvBridge() def init_publishers(self, timerFreq: float): # Initialize publishers self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10) self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10) self._detection_publisher = self.create_publisher(String, 'detection', 10) self._bottle_relative_pos_publisher = self.create_publisher(Point, 'bottle_relative_pos', 10) # Initialize a clock for the publisher self.create_timer(timerFreq, self.loop) self.create_timer(1, self.publish_imgs) # Limit the publishing of images to 1 each second (to prevent network issues) def publish_imgs(self): if self._rgb_detection is None or self._mask_detection is None: return # Convert cv2 images to ROS2 RGB Image ros2_mask_image = cv2.cvtColor(self._mask_detection, cv2.COLOR_GRAY2RGB) ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR) ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8") ros2_mask_image.header.stamp = self.get_clock().now().to_msg() ros2_mask_image.header.frame_id = "image" ros2_rgb_image = cv2.cvtColor(self._rgb_detection, cv2.COLOR_HSV2BGR) ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8") ros2_rgb_image.header.stamp = self.get_clock().now().to_msg() ros2_rgb_image.header.frame_id = "image" # Publish Images -> to be able to detect false positives and improve the algorithm self._rgb_detection_publisher.publish(ros2_rgb_image) self._mask_detection_publisher.publish(ros2_mask_image) # Reset images self._rgb_detection = None self._mask_detection = None def publish_data(self, xDist, yDist): # Publish String message -> to inform that we have detected a bottle str = String() str.data = "A bottle has been detected" self._detection_publisher.publish(str) # Depth publish relative_position = Point() relative_position.x = xDist relative_position.y = yDist self._bottle_relative_pos_publisher.publish(relative_position) # Node internal loop def loop(self): """ Main loop to call """ # update images if self._update_imgs() is None : return color_image = np.asanyarray(self._color_frame.get_data()) # Convert image to HSV for easier thresholding image = cv2.cvtColor(color_image, cv2.COLOR_BGR2HSV) # Seuillage par colorimétrie mask = cv2.inRange(image, self._low, self._high) # Réduction du bruit mask = cv2.erode(mask, self._kernel, iterations=4) mask = cv2.dilate(mask, self._kernel, iterations=4) # Segmentation using shape matching directory = pathlib.PurePath(__file__).parent pattern = cv2.imread(str(directory / 'img' / 'template.jpg'), cv2.IMREAD_GRAYSCALE) pattern2 = cv2.imread(str(directory / 'img' / 'template2.jpg'), cv2.IMREAD_GRAYSCALE) contours,_ = cv2.findContours(pattern, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) template_contour = sorted_contours[0] contours,_ = cv2.findContours(pattern2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) template2_contour = sorted_contours[0] contours,_ = cv2.findContours(mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) findPt = False (x, y) = 0, 0 for c in sorted_contours: # Compare our mask with our templates match = cv2.matchShapes(template_contour, c, 3, 0.0) match2 = cv2.matchShapes(template2_contour, c, 3, 0.0) # Get size of object (x, y), radius = cv2.minEnclosingCircle(c) # If we find a correct match that is big enough, we publish data if min(match, match2) < 0.3 and radius > 30: findPt = True break # Stop checking other contours # point found if findPt: # Circle the bottle on the image cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2) cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10) # Store images for publishing self._rgb_detection = image self._mask_detection = mask x,y = int(x), int(y) # Depth calculus xDist, yDist = self.pixelToRelativePos(x,y) # Publish string and position message self.publish_data(xDist, yDist) def pixelToRelativePos(self, x, y): # Calculation of the depth value color_intrin = self._color_frame.profile.as_video_stream_profile().intrinsics D_RADIUS = self.paramInt('DEPTH_CHECK_RADIUS') front_dist = [] lateral_dist = [] for X in range(x-D_RADIUS, x+D_RADIUS): for Y in range(y-D_RADIUS, y+D_RADIUS): if (X-x)**2 + (Y-y)**2 <= D_RADIUS: depth = self._depth_dist_frame.get_distance(x, y) dx ,dy, dz = rs.rs2_deproject_pixel_to_point(color_intrin, [x, y], depth) # Store values for average front_dist.append(dz) lateral_dist.append(-dx) xDist = statistics.mean(front_dist) yDist = statistics.mean(lateral_dist) return xDist, yDist def paramInt(self, name): return self.get_parameter(name).get_parameter_value().integer_value def main(): """ Main loop """ rclpy.init() rosNode = CameraVision(timerFreq=1.0/60) rclpy.spin(rosNode) rosNode.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()