Description
Describe the bug
I have Flask app which manages cameras and HLS streams. Flask uses threads to process requests. So when I trying to execute that method I'm getting BrokenPipeError
. (self.cameras
contains Picamera2 objects). And class containing method provided below is singleton.
def start_stream(self, in_cam: int):
try:
config = self.cameras[in_cam]['obj'].create_video_configuration(main={"size": (800, 600)})
encoder = H264Encoder(bitrate=16000, qp=30)
output = FfmpegOutput(f"-f hls -hls_time 4 -hls_list_size 5 -hls_flags delete_segments -hls_allow_cache 0 /tmp/hls/stream{in_cam}.m3u8")
output.error_callback = lambda x: print(f"FFMpeg Error: {x}")
self.cameras[in_cam]['obj'].configure(config)
self.cameras[in_cam]['obj'].start_recording(encoder, output)
self.cameras[in_cam]['busy'] = True
print(f'Started streaming for camera {in_cam}')
except Exception as e:
print(e)
return False
return True
When I'm trying to put that method in different thread/process result is the same.
That code works if executed in main thread.
To Reproduce
Flask server, singleton with initialized cameras, HLS streaming using H264Encoder and FfmpegOutput. Or it can be reproduced with built-in python threading or multiprocessing.
# camera_manager.py
import os
import sys
import signal
import time
import cv2
import multiprocessing as mp
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder
from picamera2.outputs import FfmpegOutput
from src.device_manager.camera.camera_stats import CameraStats
class CameraManager:
def __init__(self):
print('Creating cameramanager')
self.stats = CameraStats()
self.cameras = {}
self.processes = {}
brief = self.stats.get_brief_cam_info()
for i in range(len(brief)):
self.cameras[brief[i]['Num']] = {
"obj": Picamera2(brief[i]['Num']),
"busy": False,
}
def stop_stream(self, in_cam: int):
try:
self.cameras[in_cam]['obj'].stop_recording()
self.cameras[in_cam]['busy'] = False
self.processes[in_cam].terminate()
except Exception as e:
print(e)
return False
return True
def start_stream(self, in_cam: int):
process = mp.Process(target=self.__start_stream, args=(in_cam,))
self.processes[in_cam] = process
self.processes[in_cam].start()
return True
def __start_stream(self, in_cam: int):
try:
config = self.cameras[in_cam]['obj'].create_video_configuration(main={"size": (800, 600)})
encoder = H264Encoder(bitrate=16000, qp=30)
output = FfmpegOutput(f"-f hls -hls_time 4 -hls_list_size 5 -hls_flags delete_segments -hls_allow_cache 0 /tmp/hls/stream{in_cam}.m3u8")
output.error_callback = lambda x: print(f"FFMpeg Error: {x}")
self.cameras[in_cam]['obj'].configure(config)
self.cameras[in_cam]['obj'].start_recording(encoder, output)
self.cameras[in_cam]['busy'] = True
print(f'Started streaming for camera {in_cam}')
except Exception as e:
print(e)
return False
return True
def get_camera(self, in_cam):
return self.cameras[in_cam]
if __name__ == "__main__":
CM = CameraManager()
CM.start_stream(0)
time.sleep(10)
CM.stop_stream(0)
import json
from pprint import pprint
from picamera2 import Picamera2
class CameraStats:
def is_camera_busy(self, camera):
return camera['busy']
def get_brief_cam_info(self):
return Picamera2.global_camera_info()
def get_cam_info(self, cam):
cameras = self.get_brief_cam_info()
props = []
for i in range(len(cameras)):
if cameras[i]['Num'] == cam['obj'].camera_idx:
props = cam['obj'].camera_properties | cameras[i]
break
return props
def get_cam_controls(self, cam):
return cam['obj'].camera_controls
Expected behaviour
FFMpeg writes HLS stream encoded by H264Encoder.
Console Output, Screenshots
[Errno 32] Broken pipe
Hardware :
Raspberry Pi 5 with M.2 HAT+, NoIR Camera V2, behavior is not changed with other raspberries or cameras
Activity