Basic bottle detection node

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String
import numpy as np
import sys, cv2
from cv_bridge import CvBridge
def __init__(self, name="cam_vision", timerFreq = 1/60.0):
super().__init__(name) # Create the node
# Initialiaze node variables
self._low = np.array([58-20, 60, 40])
self._high = np.array([58+20, 255,255])
self._kernel = np.ones((3, 3), np.uint8)
self._color_info = (0, 0, 255)
# Initialize subscribers
self.create_subscription(Image, 'cam_rgb', self.rgb_callback, 10)
self.create_subscription(Image, 'cam_depth', self.depth_callback, 10)
self.create_subscription(Image, 'cam_infra_1', self.infra1_callback, 10)
self.create_subscription(Image, 'cam_infra_2', self.infra2_callback, 10)
# Initialize subcriber variables
self._rgb_img = None
self._depth_img = None
self._infra1_img = None
self._infra2_img = None
# Initialize publishers
# self._rgb_publisher = self.create_publisher(Image, 'cam_rgb', 10)
self._detection_publisher = self.create_publisher(Image, 'cam_detection', 10)
self._mask_publisher = self.create_publisher(Image, 'mask_detection', 10)
self._bottle_detected_publisher = self.create_publisher(String, 'bottle_detected', 10)
# Initialize a clock for the publisher
self.create_timer(timerFreq, self.loop)
def infra2_callback(self, img):
self._infra2_img = self._bridge.imgmsg_to_cv2(img, desired_encoding='passthrough')
# Publish functions
# def publish_rgb(self):
# """
# Publish RGB image
# """
# self._rgb_publisher.publish(self._rgb_image)
# return 0
# Node internal loop
def loop(self):
if self._rgb_img is None :
# Convert image to HSV for easier thresholding
image = cv2.cvtColor(self._rgb_img, cv2.COLOR_BGR2HSV)
# Seuillage par colorimétrie
mask = cv2.inRange(image, self._low, self._high)
beforeErrosion = mask
# Réduction du bruit
mask = cv2.erode(mask, self._kernel, iterations=4)
mask = cv2.dilate(mask, self._kernel, iterations=4)
# print(mask)
# cv2.imshow('Camera', self._rgb_img)
# cv2.imshow('Before errosion', beforeErrosion)
# cv2.imshow('Mask', mask)
# Segmentation using the "Min enclosing circle" method
elements = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
if len(elements) > 0: # If we detect elements
c = max(elements, key=cv2.contourArea)
(x, y), radius = cv2.minEnclosingCircle(c)
if radius > 20: #If the radius of the biggest element detected is greater than 30, we consider that we have detected a bottle
# Circle the bottle on the image, (int(x), int(y)), int(radius), self._color_info, 2), (int(x), int(y)), 5, self._color_info, 10)
# Convert cv2 images to ROS2 RGB Image
ros2_mask_image = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
ros2_mask_image = cv2.cvtColor(ros2_mask_image, cv2.COLOR_RGB2BGR)
ros2_mask_image = self._bridge.cv2_to_imgmsg(ros2_mask_image,"bgr8")
ros2_mask_image.header.stamp = self.get_clock().now().to_msg()
ros2_mask_image.header.frame_id = "image"
ros2_image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
ros2_image = self._bridge.cv2_to_imgmsg(ros2_image,"bgr8")
ros2_image.header.stamp = self.get_clock().now().to_msg()
ros2_image.header.frame_id = "image"
# Publish String message -> to inform that we have detected a bottle
str = String() = "A bottle has been detected"
# Publish Image -> to be able to detect false positives and improve the algorithm
Main loop
rosNode = CameraDriver(timerFreq=1.0/120)
rosNode = CameraVision(timerFreq=1.0/120)
