当前位置: 首页> 科技> 互联网 > YOLOv8模型实时检测RTSP协议视频流并实时发送报警信息到Java服务端实现(超详细)

YOLOv8模型实时检测RTSP协议视频流并实时发送报警信息到Java服务端实现(超详细)

时间:2025/7/13 16:51:06来源:https://blog.csdn.net/qq_35207086/article/details/142332300 浏览次数:0次

前言

在训练模型完成后,想把模型应用起来,比如模型可以部署到项目中,实时接收RTSP视频流进行识别检测,一旦达到自己所设置的置信度阈值(例如大于0.5),系统就会实时把报警信息发送给服务端,由服务端进行实时响应和策略执行,很像是个企业级应用流程 。

比如:我们在yolov8平台训练完“火焰”模型,想要实时检测并把识别结果发送给服务端进行报警,一旦出现火焰,立即发送服务端进行报警。如下图所示:

 智能识别系统(python端)

首先,获取RTSP视屏流,可以是单个视频流也可以是视频流数组,所有常量都可以通过配置文件的形式进行读取。在yolov8网络模型代码基础上进行调用模型,然后对实时的RTSP协议视屏流进行实时识别,一旦发现有异常,比如出现火焰,并且超过置信度阈值(0.5),就立马发送报警信息到服务端,以下是python代码,主要实现:

2.1.模型调用

   # 加载YOLOv8模型model = YOLO('model.pt')

2.2.RTSP协议视频流实时读取

def process_video_stream(rtsp_url):"""从RTSP流中读取视频帧并处理。:param rtsp_url: RTSP流的URL:return: None"""cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():logging.error(f"无法打开RTSP视频流:{rtsp_url}")return

2.3.异常检测

def start_detection(rtsp_url):"""启动视频流检测线程。:param rtsp_url: RTSP流的URL"""detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url,))detection_thread.start()

2.4.实时发送报警信息

def send_alarm(alarm_data):"""发送报警信息到服务器。:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典:return: None"""for endpoint in SERVER_ENDPOINTS:try:response = requests.post(endpoint, json=alarm_data)if response.status_code == 200:logging.info(f"报警信息成功发送到 {endpoint}")else:logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")except Exception as e:logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")

2.5.发送报警信息格式

 # 构造报警信息class_names = [model.names[int(cls)] for cls in filtered_classes]alarm_data = {"labels": class_names,"counts": len(filtered_boxes),"confidences": filtered_confidences.tolist(),"image": image_to_base64(frame),"timestamp": timestamp  # 添加时间戳字段}# 记录日志logging.info(f"发送报警信息:标签={class_names}, 时间={timestamp}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")# 发送报警信息send_alarm(alarm_data)

完整代码参考如下: 

import cv2
import torch
import threading
import requests
import time
from datetime import datetime
import json
import logging
import os
from queue import Queue
import base64
from concurrent.futures import ThreadPoolExecutor
from ultralytics import YOLO  # 导入YOLO类# 读取配置文件
def load_config(config_file='config.json'):"""从配置文件中加载配置信息。:param config_file: 配置文件路径:return: 配置信息字典"""try:with open(config_file, 'r', encoding='utf-8') as f:config = json.load(f)return configexcept Exception as e:logging.critical(f"无法加载配置文件:{e}")raise# 将图像转换为Base64编码的字符串
def image_to_base64(image):"""将图像转换为Base64编码的字符串。:param image: 图像数组:return: Base64编码的字符串"""try:_, buffer = cv2.imencode('.jpg', image)return base64.b64encode(buffer).decode('utf-8')except Exception as e:logging.error(f"图像转换失败:{e}")return None# 发送报警信息到服务器
def send_alarm(alarm_data, server_endpoints):"""发送报警信息到服务器。:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典:param server_endpoints: 服务器端点列表:return: None"""headers = {'Content-Type': 'application/json'}try:for endpoint in server_endpoints:response = requests.post(endpoint, json=alarm_data, headers=headers,timeout=10)if response.status_code == 200:logging.info(f"报警信息成功发送到 {endpoint}")else:logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")except requests.exceptions.RequestException as e:logging.error(f"发送报警信息到 {endpoint} 时发生网络错误:{e}")except Exception as e:logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
# 从RTSP流中读取视频帧并处理
def process_video_stream(rtsp_url, model, config, alarm_queue):"""从RTSP流中读取视频帧并处理。:param rtsp_url: RTSP流的URL:param model: YOLO模型:param config: 配置信息字典:param alarm_queue: 报警队列:return: None"""try:cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():logging.error(f"无法打开RTSP视频流:{rtsp_url}")return# 初始化重试计数器和重试时间retry_count = 0retry_start_time = time.time()  # 记录开始重试的时间# 初始化 FPS 计算器fps_start_time = time.time()fps_num_frames = 0fps_window = []while True:# 检查是否超过了重试时间if time.time() - retry_start_time > config['total_retry_duration'] * 60:logging.error(f"重试时间超过 {config['total_retry_duration']} 分钟,停止处理。")breaktry:# 读取视频帧ret, frame = cap.read()if not ret:logging.warning(f"无法获取帧数据:{rtsp_url}")# 释放资源并重新打开cap.release()cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():# 重试连接for _ in range(config['max_retries_per_minute']):if cap.isOpened():breaklogging.error(f"尝试重新打开RTSP视频流:{rtsp_url}")cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)retry_count += 1time.sleep(config['retry_interval'])  # 每次重试间隔时间else:logging.error(f"无法重新打开RTSP视频流:{rtsp_url}")time.sleep(60 - (time.time() - fps_start_time) % 60)  # 等待到下一分钟continuecontinue# 使用YOLOv8进行检测results = model(frame, verbose=False)# 获取检测结果result = results[0]boxes = result.boxes.to(model.device)  # 将结果移动到同一设备confidences = boxes.conf.cpu().numpy()classes = boxes.cls.cpu().numpy().astype(int)# 筛选置信度大于阈值的结果high_conf_indices = confidences > config['confidence_threshold']filtered_boxes = boxes.xyxy[high_conf_indices].cpu().numpy()filtered_classes = classes[high_conf_indices]filtered_confidences = confidences[high_conf_indices]if len(filtered_boxes) > 0:# 当前时间戳current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # 格式化日期时间# 构造报警信息class_names = [model.names[int(cls)] for cls in filtered_classes]  # 类名列表alarm_data = {"labels": class_names,"counts": len(filtered_boxes),"confidences": filtered_confidences.tolist(),"image": image_to_base64(frame),"timestamp": current_datetime  # 添加格式化的日期时间字段}# 记录日志logging.info(f"发送报警信息:标签={class_names}, 时间={current_datetime}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")# 将报警信息放入队列alarm_queue.put(alarm_data)# 计算 FPSfps_num_frames += 1if (time.time() - fps_start_time) > config['fps_interval']:fps = fps_num_frames / (time.time() - fps_start_time)fps_window.append(fps)if len(fps_window) > config['fps_window_size']:fps_window.pop(0)avg_fps = sum(fps_window) / len(fps_window)logging.info(f"当前 FPS:{avg_fps:.2f}")fps_start_time = time.time()fps_num_frames = 0except cv2.error as e:logging.error(f"OpenCV错误:{e}")retry_count += 1continueexcept requests.exceptions.RequestException as e:logging.error(f"网络请求错误:{e}")retry_count += 1continueexcept torch.cuda.OutOfMemoryError as e:logging.error(f"CUDA内存不足:{e}")retry_count += 1continueexcept Exception as e:logging.error(f"处理视频流时发生错误:{e}")retry_count += 1continuecap.release()except Exception as e:logging.error(f"处理视频流时发生全局异常:{e}")# 从队列中读取报警信息并发送到服务器
def send_alarms(alarm_queue, server_endpoints):"""从队列中读取报警信息并发送到服务器。:param alarm_queue: 报警队列:param server_endpoints: 服务器端点列表:return: None"""try:while True:alarm_data = alarm_queue.get()send_alarm(alarm_data, server_endpoints)alarm_queue.task_done()except Exception as e:logging.error(f"发送报警信息时发生异常:{e}")# 启动多个视频流的检测
def start_detection(rtsp_urls, config):"""启动多个视频流的检测。:param rtsp_urls: RTSP流的URL列表:param config: 配置信息字典:return: None"""# 创建报警队列alarm_queue = Queue(maxsize=config['max_workers'])# 启动报警发送线程executor = ThreadPoolExecutor(max_workers=config['max_workers'])for _ in range(config['max_workers']):executor.submit(send_alarms, alarm_queue, config['server_endpoints'])# 加载YOLOv8模型,并优先使用GPU,如果没有GPU则使用CPUdevice = 'cuda' if torch.cuda.is_available() else 'cpu'logging.info(f"使用设备:{device}")model = YOLO(config['model_path']).to(device)# 启动多个检测线程for rtsp_url in rtsp_urls:detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url, model, config, alarm_queue))detection_thread.start()if __name__ == '__main__':# 加载配置文件config = load_config()# 设置日志级别logging.basicConfig(level=config['log_level'], format='%(asctime)s - %(levelname)s - %(message)s')# 直接在主程序中启动多个视频流的检测start_detection(config['rtsp_urls'], config)

配置文件代码:

{"rtsp_urls": ["rtsp://admin:XXXXXX@xx.xx.xx.xx:554/Streaming/Channels/101"],"server_endpoints": ["http://localhost:8088/alarm"],"model_path": "yolov8n.pt","device": "cuda","retry_interval": 6,"max_retries_per_minute": 10,"total_retry_duration": 30,"log_level": "INFO","confidence_threshold": 0.5,"fps_interval": 1,"fps_window_size": 10,"max_workers": 5
}

服务端接收报警信息(Java端)

服务端是Java编写,主要是接收python端发送的报警信息,报警字段主要有:标签名称、置信度大小、图片、数量和时间等字段,python发送时会根据设置的server_endpoints地址进行发送。例如:http://localhost:8088/alarm。

Java服务端代码:

bean类:

package com.wei.demo1.demo;import java.util.Arrays;
import java.util.Date;
import java.util.List;/*** @BelongsProject: demo1* @BelongsPackage: com.wei.demo1.demo* @ClassName AlarmInfo* @Author: weiq* @CreateTime: 2024-09-13  14:01* @Description: TODO* @Version: 1.0*/
public class AlarmInfo {private String[] labels;private Integer  counts;private String[] confidences;private String image;private String timestamp;public String[] getLabels() {return labels;}public void setLabels(String[] labels) {this.labels = labels;}public Integer getCounts() {return counts;}public void setCounts(Integer counts) {this.counts = counts;}public String[] getConfidences() {return confidences;}public void setConfidences(String[] confidences) {this.confidences = confidences;}..........................

处理类:

package com.wei.demo1.demo;
import com.alibaba.fastjson.JSON;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;public class AlarmReceiver {public static void main(String[] args) throws Exception {HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);server.createContext("/alarm", new AlarmHandler());server.setExecutor(null); // creates a default executorserver.start();System.out.println("Server started and listening on port 8088");}static class AlarmHandler implements HttpHandler {@Overridepublic void handle(HttpExchange exchange) throws IOException {if (!"POST".equals(exchange.getRequestMethod())) {exchange.sendResponseHeaders(405, -1); // Method Not Allowedreturn;}StringBuilder sb = new StringBuilder();try (BufferedReader br = new BufferedReader(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {String line;while ((line = br.readLine()) != null) {sb.append(line);}} catch (IOException e) {System.err.println("Error reading request body: " + e.getMessage());exchange.sendResponseHeaders(500, -1); // Internal Server Errorreturn;}String body = sb.toString();
//            System.out.println("Received POST data: " + body);// 使用FastJSON解析JSON字符串AlarmInfo alarmInfo = null;try {alarmInfo = JSON.parseObject(body, AlarmInfo.class);} catch (Exception e) {System.err.println("Failed to parse JSON object: " + e.getMessage());exchange.sendResponseHeaders(400, 0); // Bad Requestreturn;}System.out.println("Received POST data: " + alarmInfo);// 设置响应头和状态码exchange.getResponseHeaders().add("Content-Type", "application/json");exchange.sendResponseHeaders(200, body.length());// 写入响应体try (OutputStream responseBody = exchange.getResponseBody()) {responseBody.write(body.getBytes(StandardCharsets.UTF_8));}}}
}

测试效果

python端发送报警信息

Java服务端接收报警信息

完成!

后续更新策略执行的代码,敬请期待!!! 

关键字:YOLOv8模型实时检测RTSP协议视频流并实时发送报警信息到Java服务端实现(超详细)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: