#!/usr/bin/python3 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from std_msgs.msg import String import numpy as np import sys, cv2 from cv_bridge import CvBridge class CameraVision(Node): def __init__(self, name="cam_vision", timerFreq = 1/60.0): super().__init__(name) # Create the node # Initialiaze node variables self._low = np.array([58-20, 60, 40]) self._high = np.array([58+20, 255,255]) self._kernel = np.ones((3, 3), np.uint8) self._color_info = (0, 0, 255) # Initialize subscribers self.create_subscription(Image, 'cam_rgb', self.rgb_callback, 10) # self.create_subscription(Image, 'cam_depth', self.depth_callback, 10) # self.create_subscription(Image, 'cam_infra_1', self.infra1_callback, 10) # self.create_subscription(Image, 'cam_infra_2', self.infra2_callback, 10) # Initialize subcriber variables self._rgb_img = None self._depth_img = None self._infra1_img = None self._infra2_img = None # Initialize publishers self._rgb_detection_publisher = self.create_publisher(Image, 'rgb_detection', 10) self._mask_detection_publisher = self.create_publisher(Image, 'mask_detection', 10) self._detection_publisher = self.create_publisher(String, 'detection', 10) # Initialize a clock for the publisher self.create_timer(timerFreq, self.loop) # Initialize a ROS2 Image to CV2 image converter self._bridge = CvBridge() # Callbacks def rgb_callback(self, img): self._rgb_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough') # def depth_callback(self, img): # self._depth_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough') # def infra1_callback(self, img): # self._infra1_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough') # def infra2_callback(self, img): # self._infra2_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough') # Node internal loop def loop(self): if self._rgb_img is None : return # Convert image to HSV for easier thresholding image = cv2.cvtColor(self._rgb_img, cv2.COLOR_BGR2HSV) # Seuillage par colorimétrie mask = cv2.inRange(image, self._low, self._high) beforeErrosion = mask # Réduction du bruit mask = cv2.erode(mask, self._kernel, iterations=4) mask = cv2.dilate(mask, self._kernel, iterations=4) # print(mask) # cv2.imshow('Camera', self._rgb_img) # cv2.imshow('Before errosion', beforeErrosion) # cv2.imshow('Mask', mask) # Segmentation using the "Min enclosing circle" method elements = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2] if len(elements) > 0: # If we detect elements c = max(elements, key=cv2.contourArea) (x, y), radius = cv2.minEnclosingCircle(c) if radius > 20: #If the radius of the biggest element detected is greater than 30, we consider that we have detected a bottle # Circle the bottle on the image cv2.circle(image, (int(x), int(y)), int(radius), self._color_info, 2) cv2.circle(image, (int(x), int(y)), 5, self._color_info, 10) # Convert cv2 images to ROS2 RGB Image ros2_mask_image = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB) ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR) ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8") ros2_mask_image.header.stamp = self.get_clock().now().to_msg() ros2_mask_image.header.frame_id = "image" ros2_rgb_image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR) ros2_rgb_image = self._bridge.cv2_to_imgmsg(ros2_rgb_image,"bgr8") ros2_rgb_image.header.stamp = self.get_clock().now().to_msg() ros2_rgb_image.header.frame_id = "image" # Publish String message -> to inform that we have detected a bottle str = String() str.data = "A bottle has been detected" self._detection_publisher.publish(str) # Publish Image -> to be able to detect false positives and improve the algorithm self._rgb_detection_publisher.publish(ros2_rgb_image) self._mask_detection_publisher.publish(ros2_mask_image) def main(): """ Main loop """ rclpy.init() rosNode = CameraVision(timerFreq=1.0/120) rclpy.spin(rosNode) rosNode.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()