#!/usr/bin/python3 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from std_msgs.msg import String import numpy as np import sys, cv2, os, pathlib from cv_bridge import CvBridge class CameraVision(Node): def __init__(self, name="cam_vision", timerFreq = 1/60.0): super().__init__(name) # Create the node # Initialiaze node variables self._low = np.array([60-25, 50, 50]) self._high = np.array([65+25, 255, 255]) self._kernel = np.ones((5, 5), np.uint8) self._color_info = (0, 0, 255) # Initialize subscribers self.create_subscription(Image, 'cam_rgb', self.rgb_callback, 10) # Initialize subcriber variables self._rgb_img = None # Initialize publishers self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10) self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10) self._detection_publisher = self.create_publisher(String, 'detection', 10) # Initialize a clock for the publisher self.create_timer(timerFreq, self.loop) # Initialize a ROS2 Image to CV2 image converter self._bridge = CvBridge() # Callbacks def rgb_callback(self, img): self._rgb_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough') def publish_msgs(self, image, mask): # Convert cv2 images to ROS2 RGB Image20 ros2_mask_image = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB) ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR) ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8") ros2_mask_image.header.stamp = self.get_clock().now().to_msg() ros2_mask_image.header.frame_id = "image" ros2_rgb_image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR) ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8") ros2_rgb_image.header.stamp = self.get_clock().now().to_msg() ros2_rgb_image.header.frame_id = "image" # Publish String message -> to inform that we have detected a bottle str = String() str.data = "A bottle has been detected" self._detection_publisher.publish(str) # Publish Images -> to be able to detect false positives and improve the algorithm self._rgb_detection_publisher.publish(ros2_rgb_image) self._mask_detection_publisher.publish(ros2_mask_image) # Node internal loop def loop(self): if self._rgb_img is None : return # Convert image to HSV for easier thresholding image = cv2.cvtColor(self._rgb_img, cv2.COLOR_BGR2HSV) # Seuillage par colorimétrie mask = cv2.inRange(image, self._low, self._high) # Réduction du bruit mask = cv2.erode(mask, self._kernel, iterations=4) mask = cv2.dilate(mask, self._kernel, iterations=4) # Segmentation using shape matching directory = pathlib.PurePath(__file__).parent pattern = cv2.imread(str(directory / 'img' / 'template.jpg'), cv2.IMREAD_GRAYSCALE) pattern2 = cv2.imread(str(directory / 'img' / 'template2.jpg'), cv2.IMREAD_GRAYSCALE) contours,_ = cv2.findContours(pattern, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) template_contour = sorted_contours[0] contours,_ = cv2.findContours(pattern2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) template2_contour = sorted_contours[0] contours,_ = cv2.findContours(mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE) sorted_contours = sorted(contours, key=cv2.contourArea, reverse=True) for c in contours: # Compare our mask with our templates match = cv2.matchShapes(template_contour, c, 3, 0.0) match2 = cv2.matchShapes(template2_contour, c, 3, 0.0) # Get size of object (x, y), radius = cv2.minEnclosingCircle(c) # If we find a correct match that is big enough, we publish data if min(match, match2) < 0.3 and radius > 30 and y > 2.0/5 * 480: # print(f'Match : {min(match, match2)}') # Circle the bottle on the image cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2) cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10) # Publish images and string message self.publish_msgs(image, mask) break # Stop checking other contours def main(): """ Main loop """ rclpy.init() rosNode = CameraVision(timerFreq=1.0/120) rclpy.spin(rosNode) rosNode.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()