I have a bunch of 4K videos etc… and feel it'd be great to find an app that just auto-compresses the library in the background to 1080p or something to save space. Any way to do this?
Stash is not meant to do that.
Most popular tools for that are:
And there are tools to automate it:
No. Stash is not good at that task. Use tdarr.
Whoops already answered above with a better answer.
This Python script compresses video files in a directory using ffmpeg with efficient settings (HEVC, reduced resolution, limited bitrate and frame rate). It is designed to help reduce video file sizes for use with Stash or other media organizers.
Steps to Use the Script
- Save the Script
- Copy the provided script to a file named reduce_video_size.py on your system.
- Install Requirements
- Ensure you have Python 3 installed.
- Install ffmpeg and ffprobe, which are required for processing video files.
- On Linux:
sudo apt install ffmpeg
- On macOS (with Homebrew):
brew install ffmpeg
- Prepare Your Videos
- Place all the video files you wish to process into a directory. The script will scan this directory and its subfolders for common video formats: .mp4, .mkv, .avi, .mov, .m4v, .wmv, .vob.
- Run the Script
- Use the command:
python3 reduce_video_size.py <directory>
- Replace <directory> with the path to your videos.
- The script will analyze each video and, if it exceeds threshold settings (resolution, bitrate, etc.), compress it to a smaller version using the settings defined at the top of the script.
- Post-Processing
- After conversion, the script automatically deletes the larger file (either the original or the new file) to avoid duplicates.
- You may want to manually check results to ensure your metadata is still accurate.
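The analyze-then-decide step above can be sketched in isolation. This is a minimal standalone mirror of the threshold logic defined at the top of the script (codec, shorter-side resolution, frame rate, and bitrate caps); the sample metadata dicts below are made up for illustration.

```python
# Standalone sketch of the script's "should this file be converted?" check.
MAX_RESOLUTION = 540         # shorter side, in pixels
MAX_FRAME_RATE = 15
MAX_VIDEO_BITRATE = 1400000  # 1400 kbps
MAX_AUDIO_BITRATE = 64000    # 64 kbps

def needs_conversion(video: dict, audio: dict) -> bool:
    # ffprobe reports frame rate as a fraction string like "30000/1001".
    num, den = map(float, video.get("r_frame_rate", "0/1").split("/"))
    fps = num / den if den else 0.0
    return any([
        video.get("codec_name", "") not in ("hevc", "h265"),
        fps > MAX_FRAME_RATE,
        min(video.get("width", 0), video.get("height", 0)) > MAX_RESOLUTION,
        int(video.get("bit_rate", 0)) > MAX_VIDEO_BITRATE,
        int(audio.get("bit_rate", 0)) > MAX_AUDIO_BITRATE,
    ])

# A hypothetical 4K H.264 source exceeds every threshold:
sample_video = {"codec_name": "h264", "width": 3840, "height": 2160,
                "r_frame_rate": "30000/1001", "bit_rate": "20000000"}
sample_audio = {"bit_rate": "192000"}
print(needs_conversion(sample_video, sample_audio))  # True
```

A file that already satisfies all of the caps (HEVC, ≤540p shorter side, ≤15 fps, within both bitrate limits) returns False and is left alone.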
Current Limitations
- No automatic Stash integration: The script operates independently of Stash. After compression, you must manually rescan your video directory in Stash and sometimes reassign or update metadata on the newly created files, deduplicating originals against converted files.
Ideal Improvements
- Direct Stash database integration:
It would be much more efficient if the script could:
- Query the Stash database (such as via the Stash GraphQL API or direct SQLite access) to identify the scenes with the highest bitrate first, optimizing compression priority.
- Automatically update scene entries in the Stash database after conversion to point to the new (compressed) file, preserving all metadata.
- This would eliminate the need for a rescan and manual metadata transfer, making the workflow seamless and maintaining a tidy, deduplicated library.
Currently, you must manually scan for new files and deduplicate or move metadata from the original to the compressed version within Stash. Adding direct database handling and auto-updating features would make the process much more robust and user-friendly.
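As a rough illustration of the integration idea, a script could build a GraphQL request against a local Stash instance. Everything Stash-specific here is an assumption: the default endpoint (http://localhost:9999/graphql), the `findScenes` query, the `"bitrate"` sort key, and the `files { path bit_rate }` fields should all be verified in your Stash version's GraphQL playground before relying on them. The sketch only serializes the request body; it does not contact a server.

```python
import json

# Hypothetical query: fetch scenes sorted by bitrate, highest first.
# Field names and the sort key are assumptions -- confirm them against
# your Stash instance's GraphQL schema.
QUERY = """
query HighBitrateScenes($page: Int!, $per_page: Int!) {
  findScenes(filter: {page: $page, per_page: $per_page,
                      sort: "bitrate", direction: DESC}) {
    scenes { id title files { path bit_rate } }
  }
}
"""

def build_request(page: int = 1, per_page: int = 25) -> bytes:
    """Serialize a GraphQL request body suitable for POSTing to /graphql."""
    return json.dumps({
        "query": QUERY,
        "variables": {"page": page, "per_page": per_page},
    }).encode("utf-8")

body = build_request()
print(json.loads(body)["variables"])  # {'page': 1, 'per_page': 25}
```

Actually sending it would be something like `urllib.request.Request("http://localhost:9999/graphql", data=body, headers={"Content-Type": "application/json"})`, plus an API-key header if your instance requires authentication.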
#!/usr/bin/env python3
import os
import sys
import subprocess
import re
import json
import pprint
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Iterator
VIDEO_CODEC = "hevc"
AUDIO_CODEC = "aac"
CRF_VALUE = 30
PRESET = "medium"
MAX_RESOLUTION = 540
MAX_FRAME_RATE = 15
MAX_VIDEO_BITRATE = 1400000  # 1400 kbps
MAX_AUDIO_BITRATE = 64000  # 64 kbps
ALLOWED_VIDEO_EXTENSIONS = (".mp4", ".mkv", ".avi", ".mov", ".m4v", ".wmv", ".vob")
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("reduce_video_size.log", mode="w"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class VideoMetadataError(Exception):
"""Raised when there's an issue with video metadata."""
pass
class ConversionError(Exception):
"""Raised when there's an issue during the conversion process."""
pass
pixel_format_10bit_regex = re.compile("10le$")
pixel_format_12bit_regex = re.compile("12le$")
class PixelFormat:
__slots__ = ("_pix_fmt", "_is_10bit", "_is_12bit")
def __init__(self, pix_fmt):
self._pix_fmt = pix_fmt
self._is_10bit = pixel_format_10bit_regex.search(pix_fmt) is not None
self._is_12bit = pixel_format_12bit_regex.search(pix_fmt) is not None
@property
def pixel_format(self):
return self._pix_fmt
@property
def is_10bit(self):
return self._is_10bit
@property
def is_12bit(self):
return self._is_12bit
@property
def is_8bit(self):
return not (self._is_10bit or self._is_12bit)
def __str__(self):
return self._pix_fmt
def main():
logger.info("Starting main function")
if len(sys.argv) < 2:
        print("Usage: python3 reduce_video_size.py <directory>")
sys.exit(1)
target_dir = sys.argv[1]
logger.info(f"Target directory: {target_dir}")
for file_path, video_metadata, audio_metadata in get_video_files_with_metadata(
target_dir
):
process_file(file_path, video_metadata, audio_metadata)
logger.info("Main function completed")
def get_video_files_with_metadata(directory: str) -> Iterator[Tuple[str, Dict, Dict]]:
"""Yields tuples containing the filepath and FFmpeg metadata for video files in the given directory."""
logger.info(f"get_video_files_with_metadata({directory})")
for filepath in get_video_filepaths(directory):
logger.info(f"Processing file: {filepath}")
video_metadata, audio_metadata = get_ffmpeg_metadata(filepath)
if is_valid_video(video_metadata):
logger.info(f"Video file found: {filepath}")
yield filepath, video_metadata, audio_metadata
else:
logger.info(f"Skipping file: {filepath}")
def get_video_filepaths(directory: str) -> Iterator[str]:
"""Yields filepaths of video files in the given directory."""
logger.info(f"get_video_filepaths({directory})")
for root, _, files in os.walk(directory):
for filename in files:
if filename.lower().endswith(ALLOWED_VIDEO_EXTENSIONS):
res = os.path.join(root, filename)
logger.info(f"Found video file: {res}")
yield res
def get_ffmpeg_metadata(filepath: str) -> Tuple[Dict, Dict]:
"""Returns FFmpeg metadata for the given filepath as a tuple of (video_metadata, audio_metadata)."""
logger.info(f"get_ffmpeg_metadata({filepath})")
try:
output = subprocess.check_output(
[
"ffprobe",
"-v",
"error",
"-show_streams",
"-print_format",
"json",
filepath,
]
)
metadata = json.loads(output.decode("utf-8"))
video_metadata = next(
(
stream
for stream in metadata["streams"]
if stream["codec_type"] == "video"
),
{},
)
audio_metadata = next(
(
stream
for stream in metadata["streams"]
if stream["codec_type"] == "audio"
),
{},
)
logger.info(
f"FFmpeg metadata:\nVideo:\n{pprint.pformat(video_metadata)}\nAudio:\n{pprint.pformat(audio_metadata)}"
)
return video_metadata, audio_metadata
except Exception as e:
logger.error(f"Error processing file {filepath}.\n{e}")
return {}, {}
def is_valid_video(metadata: Dict) -> bool:
"""Checks if the metadata indicates a valid video with frames."""
res = int(metadata.get("nb_frames", 0)) > 0 or "nb_frames" not in metadata
logger.info(f"is_valid_video(metadata: Dict) -> {res}")
return res
def process_file(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> None:
    logger.info(f"process_file({file_path})")
    filename = os.path.basename(file_path)
    logger.info(f"Processing file: {filename}")
try:
if should_convert(file_path, video_metadata, audio_metadata):
            logger.info(f"Conversion needed for {filename}")
output_path = get_unique_path(file_path, ".mkv")
ffmpeg_cmd = prepare_ffmpeg_command(
file_path, output_path, video_metadata, audio_metadata
)
convert_file(file_path, output_path, ffmpeg_cmd)
else:
            logger.info(f"No conversion needed for {filename}")
except VideoMetadataError as e:
logger.error(str(e))
except ConversionError as e:
logger.error(str(e))
except Exception as e:
logger.error(
            f"Unexpected error occurred while processing file {filename}: {str(e)}"
)
raise
def prepare_ffmpeg_command(
file_path: str, output_path: str, video_metadata: Dict, audio_metadata: Dict
) -> List[str]:
try:
ffmpeg_cmd = ["ffmpeg", "-i", file_path]
ffmpeg_cmd = push_encode_video_args_to_command(ffmpeg_cmd, video_metadata)
ffmpeg_cmd = push_encode_audio_args_to_command(ffmpeg_cmd, audio_metadata)
ffmpeg_cmd = push_change_frame_rate_args_to_command(ffmpeg_cmd, video_metadata)
ffmpeg_cmd.append(output_path)
logger.info(
f"prepare_ffmpeg_command(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
except Exception as e:
raise VideoMetadataError(f"Error preparing FFmpeg command\n{str(e)}")
def push_encode_video_args_to_command(
ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
ffmpeg_cmd.append("-c:v")
crf = str(CRF_VALUE)
if get_bitdepth(video_metadata).is_10bit:
ffmpeg_cmd.extend(["libx265", "-x265-params", f"crf={crf}:profile=main10"])
else:
ffmpeg_cmd.extend(["libx265", "-crf", crf])
ffmpeg_cmd.extend(
[
"-maxrate",
bitrate_to_string(MAX_VIDEO_BITRATE),
"-preset",
PRESET,
]
)
try:
height = video_metadata["height"]
width = video_metadata["width"]
shorter_side = min(width, height)
if shorter_side > MAX_RESOLUTION:
            ffmpeg_cmd.extend(
                [
                    "-vf",
                    # Scale the shorter side down to MAX_RESOLUTION, preserving aspect ratio.
                    f"scale=-2:{MAX_RESOLUTION}"
                    if width > height
                    else f"scale={MAX_RESOLUTION}:-2",
                ]
            )
except KeyError as e:
raise VideoMetadataError(
"Error: 'height' or 'width' key not found in video_metadata."
)
logger.info(
f"push_encode_video_args(ffmpeg_cmd: List[str], video_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
def push_encode_audio_args_to_command(
ffmpeg_cmd: List[str], audio_metadata: Dict
) -> List[str]:
ffmpeg_cmd.extend(["-c:a", AUDIO_CODEC])
if "bit_rate" in audio_metadata:
bit_rate = min(int(audio_metadata["bit_rate"]), MAX_AUDIO_BITRATE)
logger.info(f"Audio bit rate: {bit_rate}")
logger.info(f"String audio bit rate: {bitrate_to_string(bit_rate)}")
ffmpeg_cmd.extend(["-b:a", bitrate_to_string(bit_rate)])
logger.info(
f"push_encode_audio_args_to_command(ffmpeg_cmd: List[str], audio_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
def push_change_frame_rate_args_to_command(
ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
if (
"r_frame_rate" in video_metadata
and calculate_fps(video_metadata["r_frame_rate"]) > MAX_FRAME_RATE
):
ffmpeg_cmd.extend(["-r", str(MAX_FRAME_RATE)])
return ffmpeg_cmd
def get_bitdepth(video_metadata: Dict) -> PixelFormat:
res = PixelFormat(video_metadata["pix_fmt"])
logger.info(f"Video.get_bitdepth() -> {res}")
return res
def bitrate_to_string(bitrate: int) -> str:
return f"{bitrate // 1000}k"
def calculate_fps(frame_rate: str) -> float:
    logger.info(f"calculate_fps(frame_rate: {frame_rate})")
    numerator, denominator = map(float, frame_rate.split("/"))
    fps = numerator / denominator if denominator != 0 else 0.0
    logger.info(f"Calculated FPS: {fps}")
    return fps
def should_convert(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> bool:
try:
video_width = video_metadata.get("width", 0)
video_height = video_metadata.get("height", 0)
fps = calculate_fps(video_metadata.get("r_frame_rate", "0/1"))
video_codec = video_metadata.get("codec_name", "")
video_bitrate = int(video_metadata.get("bit_rate", 0))
audio_bitrate = int(audio_metadata.get("bit_rate", 0))
    except Exception as e:
        logger.error(f"Error getting metadata in should_convert\n{e}")
        # Metadata could not be read, so the thresholds cannot be checked; skip this file.
        return False
return any(
[
video_codec not in ["hevc", "h265"],
fps > MAX_FRAME_RATE,
min(video_width, video_height) > MAX_RESOLUTION,
video_bitrate > MAX_VIDEO_BITRATE,
audio_bitrate > MAX_AUDIO_BITRATE,
]
)
def convert_file(file_path: str, output_path: str, ffmpeg_cmd: List[str]) -> None:
try:
logger.info(f"Executing FFmpeg command: {' '.join(ffmpeg_cmd)}")
subprocess.run(ffmpeg_cmd, check=True)
logger.info("Conversion completed successfully.")
remove_larger_file(file_path, output_path)
except KeyboardInterrupt:
cleanup(output_path)
        sys.exit(0)
except Exception as e:
cleanup(output_path)
logger.warning(f"An error occurred during conversion.\n{str(e)}")
def cleanup(output_path: str) -> None:
logger.info(f"Cleaning up: Removing {output_path}")
try:
os.remove(output_path)
logger.info(f"Removed {output_path}")
except FileNotFoundError:
logger.info(f"{output_path} was not found")
def get_unique_path(file_path: str | Path, suffix: str) -> str:
file_path = Path(file_path)
base = remove_number_suffix(file_path.stem)
return generate_unique_path(file_path.parent, base, suffix)
def remove_number_suffix(base: str) -> str:
    # Strip a trailing "(n)" counter added by generate_unique_path, e.g. "clip(2)" -> "clip".
    re_marker = re.compile(r"(?P<prefix>.*)\(\d+\)$")
if mo := re_marker.match(base):
return mo.group("prefix")
return base
def generate_unique_path(parent: Path, base: str, suffix: str) -> str:
new_path = parent / f'{base}{suffix}'
if not new_path.exists():
return str(new_path)
count = 2
while True:
new_path = parent / f'{base}({count}){suffix}'
if not new_path.exists():
return str(new_path)
count += 1
def remove_larger_file(input_path: str, output_path: str) -> None:
try:
input_size = os.path.getsize(input_path)
output_size = os.path.getsize(output_path)
if input_size > output_size:
os.remove(input_path)
logger.info(f"Removed larger input file: {input_path}")
else:
os.remove(output_path)
logger.info(f"Removed larger output file: {output_path}")
except FileNotFoundError as e:
raise ConversionError(f"One or both files not found: {str(e)}")
if __name__ == "__main__":
main()
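For reference, the collision-avoiding naming used by get_unique_path / generate_unique_path can be exercised on its own. This standalone sketch reproduces the same strategy against a temporary directory:

```python
import tempfile
from pathlib import Path

def unique_path(parent: Path, base: str, suffix: str) -> str:
    # Same strategy as the script's generate_unique_path: try "base.mkv",
    # then "base(2).mkv", "base(3).mkv", ... until a free name is found.
    candidate = parent / f"{base}{suffix}"
    count = 2
    while candidate.exists():
        candidate = parent / f"{base}({count}){suffix}"
        count += 1
    return str(candidate)

with tempfile.TemporaryDirectory() as d:
    parent = Path(d)
    first = unique_path(parent, "clip", ".mkv")
    Path(first).touch()  # simulate an existing output file
    second = unique_path(parent, "clip", ".mkv")
    print(Path(first).name, Path(second).name)  # clip.mkv clip(2).mkv
```

Combined with remove_number_suffix, this means re-running the script on an already-converted "clip(2).mkv" still derives new names from the "clip" base rather than stacking counters.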
The complexity you talk about is simply not true. Database updates are not needed. A scan is sufficient. Tdarr handles everything in basic mode without flows independent of stash. I did not do any work like you are talking about. I re-encoded over 100,000 videos and saved over 100TB without any scripts and it works with stash with no additional effort.
You are really overthinking it and putting in too much effort.
I never ever needed manual metadata transfer and the scans were so fast it was not an issue.
As I was scrolling through the thread, I was thinking “will stash automatically recognize a compressed file as the same scene” - so thank you for saving me from having to ask!