media_analyzer
Python package for analyzing video/image with machine learning methods, exif data, and other file based information.
1""" 2Python package for analyzing video/image with machine learning methods, 3exif data, and other file based information.""" 4 5from media_analyzer.data.anaylzer_config import AnalyzerSettings, FullAnalyzerConfig 6from media_analyzer.data.enums.analyzer_module import VisualModule, FileModule, AnalyzerModule 7from media_analyzer.data.enums.config_types import CaptionerProvider, LLMProvider 8from media_analyzer.data.enums.face_sex import FaceSex 9from media_analyzer.data.interfaces.api_io import InputMedia, MediaAnalyzerOutput 10from media_analyzer.data.interfaces.frame_data import MeasuredQualityData, FrameData, OCRData, \ 11 FrameDataOutput, ColorData, CaptionData, ColorHistogram 12from media_analyzer.data.interfaces.image_data import ImageData, ExifData, GPSData, TimeData, \ 13 WeatherData, IntermediateTimeData, ImageDataOutput, TagData 14from media_analyzer.data.interfaces.location_types import GeoLocation 15from media_analyzer.data.interfaces.ml_types import FaceBox, ObjectBox, OCRBox, BaseBoundingBox 16from media_analyzer.machine_learning.caption.blip_captioner import BlipCaptioner 17from media_analyzer.machine_learning.caption.captioner_protocol import CaptionerProtocol 18from media_analyzer.machine_learning.caption.get_captioner import get_captioner_by_provider 19from media_analyzer.machine_learning.caption.llm_captioner import LLMCaptioner 20from media_analyzer.machine_learning.embedding.embedder_protocol import EmbedderProtocol 21from media_analyzer.machine_learning.embedding.open_clip_embedder import OpenCLIPEmbedder 22from media_analyzer.machine_learning.embedding.zero_clip_embedder import ZeroCLIPEmbedder 23from media_analyzer.machine_learning.facial_recognition.facial_recognition_protocol import ( 24 FacialRecognitionProtocol, 25) 26from media_analyzer.machine_learning.facial_recognition.insight_facial_recognition import ( 27 InsightFacialRecognition, 28) 29from media_analyzer.machine_learning.object_detection.object_detection_protocol import ( 30 
ObjectDetectionProtocol, 31) 32from media_analyzer.machine_learning.object_detection.resnet_object_detection import ( 33 ResnetObjectDetection, 34) 35from media_analyzer.machine_learning.ocr.ocr_protocol import OCRProtocol 36from media_analyzer.machine_learning.ocr.resnet_tesseract_ocr import ResnetTesseractOCR 37from media_analyzer.machine_learning.visual_llm.base_visual_llm import BaseVisualLLM, ChatMessage, \ 38 ChatRole 39from media_analyzer.machine_learning.visual_llm.get_llm import get_llm_by_provider 40from media_analyzer.machine_learning.visual_llm.mini_cpm_llm import MiniCPMLLM 41from media_analyzer.machine_learning.visual_llm.openai_llm import OpenAILLM 42from media_analyzer.media_analyzer import MediaAnalyzer 43from media_analyzer.processing.pipeline.file_based.data_url_module import DataUrlModule 44from media_analyzer.processing.pipeline.file_based.exif_module import ExifModule 45from media_analyzer.processing.pipeline.file_based.gps_module import GPSModule 46from media_analyzer.processing.pipeline.file_based.tags_module import TagsModule 47from media_analyzer.processing.pipeline.file_based.time_module import TimeModule 48from media_analyzer.processing.pipeline.file_based.weather_module import WeatherModule 49from media_analyzer.processing.pipeline.pipeline_module import PipelineModule 50from media_analyzer.processing.pipeline.visual_based.caption_module import CaptionModule 51from media_analyzer.processing.pipeline.visual_based.color_module import ColorModule 52from media_analyzer.processing.pipeline.visual_based.embedding_module import EmbeddingModule 53from media_analyzer.processing.pipeline.visual_based.faces_module import FacesModule 54from media_analyzer.processing.pipeline.visual_based.objects_module import ObjectsModule 55from media_analyzer.processing.pipeline.visual_based.ocr_module import OCRModule 56from media_analyzer.processing.pipeline.visual_based.quality_detection_module import ( 57 QualityDetectionModule, 58) 59from 
media_analyzer.processing.pipeline.visual_based.summary_module import SummaryModule 60 61__all__ = [ 62 # Main classes 63 "MediaAnalyzer", 64 "MediaAnalyzerOutput", 65 "InputMedia", 66 "AnalyzerSettings", 67 "AnalyzerModule", 68 "FileModule", 69 "VisualModule", 70 71 # Output data classes 72 "ImageDataOutput", 73 "FrameDataOutput", 74 ## Image Data 75 "ExifData", 76 "GPSData", 77 "TimeData", 78 "WeatherData", 79 "IntermediateTimeData", 80 "TagData", 81 ## Frame Data 82 "OCRData", 83 "CaptionData", 84 "MeasuredQualityData", 85 "ColorData", 86 "ColorHistogram", 87 88 # Extra output dataclasses 89 "GeoLocation", 90 "ChatMessage", 91 "ChatRole", 92 93 # Providers 94 "get_llm_by_provider", 95 "LLMProvider", 96 "get_captioner_by_provider", 97 "CaptionerProvider", 98 99 # Modules 100 "PipelineModule", 101 ## File-based Modules 102 "DataUrlModule", 103 "ExifModule", 104 "GPSModule", 105 "TimeModule", 106 "WeatherModule", 107 "TagsModule", 108 ## Visual-based Modules 109 "CaptionModule", 110 "EmbeddingModule", 111 "SummaryModule", 112 "FacesModule", 113 "OCRModule", 114 "ObjectsModule", 115 "QualityDetectionModule", 116 "ColorModule", 117 118 # Machine learning classes 119 "CaptionerProtocol", 120 "BlipCaptioner", 121 "EmbedderProtocol", 122 "ZeroCLIPEmbedder", 123 "OpenCLIPEmbedder", 124 "FacialRecognitionProtocol", 125 "InsightFacialRecognition", 126 "ObjectDetectionProtocol", 127 "ResnetObjectDetection", 128 "OCRProtocol", 129 "ResnetTesseractOCR", 130 "LLMCaptioner", 131 "BaseVisualLLM", 132 "MiniCPMLLM", 133 "OpenAILLM", 134 135 # Machine learning types 136 "BaseBoundingBox", 137 "OCRBox", 138 "ObjectBox", 139 "FaceBox", 140 "FaceSex", 141 142 # Somewhat useless 143 "FullAnalyzerConfig", 144 "ImageData", 145 "FrameData", 146]
class MediaAnalyzer:
    """Run file-based and visual analysis on a photo or video.

    Attributes:
        config: The full analyzer configuration, bundling the user settings
            with the instantiated machine learning components.
    """

    config: FullAnalyzerConfig

    def __init__(self, config: AnalyzerSettings | None = None) -> None:
        """Build the full analyzer configuration from the given settings.

        Args:
            config: The analyzer settings. When omitted, a default
                ``AnalyzerSettings()`` is used.
        """
        settings = AnalyzerSettings() if config is None else config
        # The embedder is created first, then the other components in the
        # order they are passed to FullAnalyzerConfig.
        embedder = get_embedder_by_provider(settings.embedder_provider)
        llm = get_llm_by_provider(settings.llm_provider)
        captioner = get_captioner_by_provider(settings.captions_provider)
        ocr = ResnetTesseractOCR()
        object_detector = ResnetObjectDetection()
        facial_recognition = InsightFacialRecognition()
        self.config = FullAnalyzerConfig(
            llm=llm,
            captioner=captioner,
            ocr=ocr,
            object_detector=object_detector,
            facial_recognition=facial_recognition,
            embedder=embedder,
            settings=settings,
        )

    def analyze(self, input_media: InputMedia) -> MediaAnalyzerOutput:
        """Run the metadata pipeline and convert its results to output types.

        Args:
            input_media: The media file and the frames to analyze.

        Returns:
            The file-based output plus one visual output entry per frame
            returned by the pipeline.
        """
        image_data, frame_data = run_metadata_pipeline(input_media, self.config)

        file_output = ImageDataOutput(
            path=image_data.path,
            exif=image_data.exif,
            data_url=image_data.data_url,
            gps=image_data.gps,
            time=image_data.time,
            weather=image_data.weather,
            tags=image_data.tags,
        )

        frames_output: list[FrameDataOutput] = []
        for frame in frame_data:
            frames_output.append(
                FrameDataOutput(
                    ocr=frame.ocr,
                    embedding=frame.embedding,
                    faces=frame.faces,
                    summary=frame.summary,
                    caption_data=frame.caption_data,
                    objects=frame.objects,
                    measured_quality=frame.measured_quality,
                    color=frame.color,
                )
            )

        return MediaAnalyzerOutput(image_data=file_output, frame_data=frames_output)

    def photo(self, image_path: Path) -> MediaAnalyzerOutput:
        """Analyze a single photo, using the photo itself as its only frame.

        Args:
            image_path: The path to the photo.

        Returns:
            The analysis output for the photo.
        """
        media = InputMedia(image_path, frames=[image_path])
        return self.analyze(media)
Analyze media using machine learning models, file-based analysis, and exif data.
def __init__(self, config: AnalyzerSettings | None = None) -> None:
    """Build the full analyzer configuration from the given settings.

    Args:
        config: The analyzer settings. When omitted, a default
            ``AnalyzerSettings()`` is used.
    """
    settings = AnalyzerSettings() if config is None else config
    # The embedder is created first, then the other components in the
    # order they are passed to FullAnalyzerConfig.
    embedder = get_embedder_by_provider(settings.embedder_provider)
    llm = get_llm_by_provider(settings.llm_provider)
    captioner = get_captioner_by_provider(settings.captions_provider)
    ocr = ResnetTesseractOCR()
    object_detector = ResnetObjectDetection()
    facial_recognition = InsightFacialRecognition()
    self.config = FullAnalyzerConfig(
        llm=llm,
        captioner=captioner,
        ocr=ocr,
        object_detector=object_detector,
        facial_recognition=facial_recognition,
        embedder=embedder,
        settings=settings,
    )
Initialize the media analyzer with the given configuration.
def analyze(self, input_media: InputMedia) -> MediaAnalyzerOutput:
    """Run the metadata pipeline and convert its results to output types.

    Args:
        input_media: The media file and the frames to analyze.

    Returns:
        The file-based output plus one visual output entry per frame
        returned by the pipeline.
    """
    image_data, frame_data = run_metadata_pipeline(input_media, self.config)

    file_output = ImageDataOutput(
        path=image_data.path,
        exif=image_data.exif,
        data_url=image_data.data_url,
        gps=image_data.gps,
        time=image_data.time,
        weather=image_data.weather,
        tags=image_data.tags,
    )

    frames_output: list[FrameDataOutput] = []
    for frame in frame_data:
        frames_output.append(
            FrameDataOutput(
                ocr=frame.ocr,
                embedding=frame.embedding,
                faces=frame.faces,
                summary=frame.summary,
                caption_data=frame.caption_data,
                objects=frame.objects,
                measured_quality=frame.measured_quality,
                color=frame.color,
            )
        )

    return MediaAnalyzerOutput(image_data=file_output, frame_data=frames_output)
Analyze the given photo or video.
@dataclass
class MediaAnalyzerOutput:
    """Output of the media-analyzer package.

    Attributes:
        image_data: File-based analysis of the media file.
        frame_data: Visual analysis for the frames given in the input,
            one entry per frame.
    """

    image_data: ImageDataOutput
    frame_data: list[FrameDataOutput]
Output of the media-analyzer package.
Attributes:
- image_data: File based analysis.
- frame_data: Visual analysis for the frames given in the input.
@dataclass
class InputMedia:
    """Input for the media-analyzer package.

    Attributes:
        path: The path to the photo or video file.
        frames: A list of frame paths. For a photo, supply a single frame;
            for a video, you can generate multiple frames and submit them
            for analysis.
    """

    path: Path
    frames: list[Path]
Input for the media-analyzer package.
Attributes:
- path: The path to the photo or video file.
- frames: A list of frame paths. For a photo, supply a single frame; for a video, you can generate multiple frames and submit them for analysis.
@dataclass
class AnalyzerSettings:
    """Configuration settings for the media analysis pipeline.

    This class contains various options for configuring how photo and video files
    are analyzed, including language settings for OCR, the selection of providers
    for captions, LLMs and embeddings, and thresholds for different detection
    modules. Every field has a default, so ``AnalyzerSettings()`` is valid.

    Attributes:
        media_languages: The languages used for OCR.
        theme_contrast_level: The contrast level used for the generated theme.
        theme_color_variant: The color variant used for the generated theme.
        captions_provider: The provider to be used for generating captions.
        llm_provider: The provider for the large language model (LLM),
            which can be used for summaries and captions.
        embedder_provider: The provider used to select the embedder.
        enable_text_summary: Flag to enable or disable image summarization, uses LLM so is slow.
        enable_document_summary: Flag to enable or disable document summaries, uses LLM so is slow.
        document_detection_threshold: Threshold for detecting documents in images [0-100].
        face_detection_threshold: Threshold for face detection [0-1].
        enabled_file_modules: The set of modules used for file-based analysis.
            Defaults to all file modules.
        enabled_visual_modules: The set of modules for visual analysis.
            Defaults to all visual modules.
    """

    media_languages: tuple[str, ...] = ("nld", "eng")
    theme_contrast_level: float = 0.2
    theme_color_variant: Variant = Variant.VIBRANT
    captions_provider: CaptionerProvider = CaptionerProvider.BLIP_INSTRUCT
    llm_provider: LLMProvider = LLMProvider.MINICPM
    embedder_provider: EmbedderProvider = EmbedderProvider.OPEN_CLIP
    enable_text_summary: bool = False
    enable_document_summary: bool = False
    document_detection_threshold: int = 65
    face_detection_threshold: float = 0.7
    # default_factory gives every instance its own mutable set.
    enabled_file_modules: set[FileModule] = field(
        default_factory=lambda: {
            FileModule.DATA_URL,
            FileModule.EXIF,
            FileModule.GPS,
            FileModule.TAGS,
            FileModule.TIME,
            FileModule.WEATHER,
        }
    )
    enabled_visual_modules: set[VisualModule] = field(
        default_factory=lambda: {
            VisualModule.CAPTION,
            VisualModule.EMBEDDING,
            VisualModule.FACES,
            VisualModule.OBJECTS,
            VisualModule.OCR,
            VisualModule.QUALITY_DETECTION,
            VisualModule.SUMMARY,
            VisualModule.COLOR,
        }
    )
Configuration settings for the media analysis pipeline.
This class contains various options for configuring how photo and video files are analyzed, including language settings for OCR, the selection of providers for captions and LLMs, and thresholds for different detection modules.
Attributes:
- media_languages: The languages used for OCR.
- theme_color_variant: The color variant used for the generated theme.
- theme_contrast_level: The contrast level used for the generated theme.
- captions_provider: The provider to be used for generating captions.
- llm_provider: The provider for the large language model (LLM), which can be used for summaries and captions.
- enable_text_summary: Flag to enable or disable image summarization, uses LLM so is slow.
- enable_document_summary: Flag to enable or disable document summaries, uses LLM so is slow.
- document_detection_threshold: Threshold for detecting documents in images [0-100].
- face_detection_threshold: Threshold for face detection [0-1].
- enabled_file_modules: The set of modules used for file-based analysis.
- enabled_visual_modules: The set of modules for visual analysis.
class FileModule(StrEnum):
    """Enum for selecting file-based analyzer modules.

    Each value is the class name of a file-based pipeline module.
    """

    # NOTE(review): presumably matched against PipelineModule.id (the module's
    # class name) when selecting enabled modules -- confirm against the pipeline.
    DATA_URL = "DataUrlModule"
    EXIF = "ExifModule"
    GPS = "GPSModule"
    TAGS = "TagsModule"
    TIME = "TimeModule"
    WEATHER = "WeatherModule"
Enum for selecting file-based analyzer modules.
class VisualModule(StrEnum):
    """Enum for selecting visual-based analyzer modules.

    Each value is the class name of a visual-based pipeline module.
    """

    # NOTE(review): presumably matched against PipelineModule.id (the module's
    # class name) when selecting enabled modules -- confirm against the pipeline.
    CAPTION = "CaptionModule"
    EMBEDDING = "EmbeddingModule"
    FACES = "FacesModule"
    OBJECTS = "ObjectsModule"
    OCR = "OCRModule"
    QUALITY_DETECTION = "QualityDetectionModule"
    SUMMARY = "SummaryModule"
    COLOR = "ColorModule"
Enum for selecting visual based analyzer modules.
@dataclass
class ImageDataOutput:
    """Comprehensive data for an image.

    Only ``path`` is required; every other field defaults to ``None``.

    Attributes:
        path: The file system path to the image.
        exif: Exif data of the image.
        data_url: The data URL representation of the image.
        gps: GPS data associated with the image.
        time: Time-related data for the image.
        weather: Weather data at the time the image was taken.
        tags: Tag data for the image.
    """

    path: Path
    exif: ExifData | None = None
    data_url: str | None = None
    gps: GPSData | None = None
    time: TimeData | IntermediateTimeData | None = None
    weather: WeatherData | None = None
    tags: TagData | None = None
Comprehensive data for an image.
Attributes:
- path: The file system path to the image.
- exif: Exif data of the image.
- data_url: The data URL representation of the image.
- gps: GPS data associated with the image.
- time: Time-related data for the image.
- weather: Weather data at the time the image was taken.
@dataclass
class FrameDataOutput:
    """Data for a frame.

    Every field defaults to ``None``.

    Attributes:
        ocr: The OCR data.
        embedding: The embedding data.
        faces: The face boxes.
        summary: The frame summary.
        caption_data: Info extracted using caption instructions.
        objects: The object boxes.
        measured_quality: The measured quality data.
        color: The color data.
    """

    ocr: OCRData | None = None
    embedding: list[float] | None = None
    faces: list[FaceBox] | None = None
    summary: str | None = None
    caption_data: CaptionData | None = None
    objects: list[ObjectBox] | None = None
    measured_quality: MeasuredQualityData | None = None
    color: ColorData | None = None
Data for a frame.
Attributes:
- ocr: The OCR data.
- embedding: The embedding data.
- faces: The face boxes.
- summary: The frame summary.
- caption_data: Info extracted using caption instructions.
- objects: The object boxes.
- measured_quality: The measured quality data.
@dataclass
class ExifData:
    """Exif Data of the image.

    Attributes:
        width: The width of the image.
        height: The height of the image.
        duration: The duration of the media, if applicable.
        size_bytes: The size of the file in bytes.
        format: The format of the image.
        exif_tool: The output from ExifTool.
        file: File-related information.
        composite: Composite data.
        exif: Exif metadata, if available.
        xmp: XMP metadata, if available.
        mpf: Motion photo metadata, if available.
        jfif: JFIF metadata, if available.
        icc_profile: ICC profile data, if available.
        gif: GIF-specific data, if available.
        png: PNG-specific data, if available.
        quicktime: QuickTime-specific data, if available.
        matroska: Matroska-specific data, if available.
    """

    width: int
    height: int
    duration: float | None
    size_bytes: int
    format: str
    exif_tool: dict[str, Any]
    file: dict[str, Any]
    composite: dict[str, Any]
    exif: dict[str, Any] | None
    xmp: dict[str, Any] | None
    mpf: dict[str, Any] | None
    jfif: dict[str, Any] | None
    icc_profile: dict[str, Any] | None
    gif: dict[str, Any] | None
    png: dict[str, Any] | None
    quicktime: dict[str, Any] | None
    matroska: dict[str, Any] | None
Exif Data of the image.
Attributes:
- width: The width of the image.
- height: The height of the image.
- duration: The duration of the media, if applicable.
- size_bytes: The size of the file in bytes.
- format: The format of the image.
- exif_tool: The output from ExifTool.
- file: File-related information.
- composite: Composite data.
- exif: Exif metadata, if available.
- xmp: XMP metadata, if available.
- mpf: Motion photo metadata, if available.
- jfif: JFIF metadata, if available.
- icc_profile: ICC profile data, if available.
- gif: GIF-specific data, if available.
- quicktime: QuickTime-specific data, if available.
- matroska: Matroska-specific data, if available.
@dataclass
class GPSData:
    """GPS Data related to the image.

    Every field defaults to ``None``.

    Attributes:
        latitude: The latitude coordinate.
        longitude: The longitude coordinate.
        altitude: The altitude information.
        location: The geolocation information.
    """

    latitude: float | None = None
    longitude: float | None = None
    altitude: float | None = None
    location: GeoLocation | None = None
GPS Data related to the image.
Attributes:
- latitude: The latitude coordinate.
- longitude: The longitude coordinate.
- altitude: The altitude information.
- location: The geolocation information.
@dataclass
class TimeData:
    """Time-related data for the image.

    Attributes:
        datetime_local: The local datetime.
        datetime_source: The source of the datetime information.
        timezone_name: The name of the timezone.
        timezone_offset: The offset of the timezone.
        datetime_utc: The UTC datetime based on the GPS data. Defaults to ``None``.
    """

    datetime_local: datetime
    datetime_source: str
    timezone_name: str | None
    timezone_offset: timedelta | None
    datetime_utc: datetime | None = None
Time-related data for the image.
Attributes:
- datetime_local: The local datetime.
- datetime_source: The source of the datetime information.
- timezone_name: The name of the timezone.
- timezone_offset: The offset of the timezone.
- datetime_utc: The UTC datetime based on the GPS data.
@dataclass
class WeatherData:
    """Weather data from the time and place the image was taken.

    Every field defaults to ``None``.

    Attributes:
        weather_recorded_at: The datetime when the weather was recorded.
        weather_temperature: The temperature at the time of recording.
        weather_dewpoint: The dew point at the time of recording.
        weather_relative_humidity: The relative humidity at the time of recording.
        weather_precipitation: The precipitation level at the time of recording.
        weather_wind_gust: The wind gust speed at the time of recording.
        weather_pressure: The atmospheric pressure at the time of recording.
        weather_sun_hours: The sun hours at the time of recording.
        weather_condition: The weather condition at the time of recording.
    """

    weather_recorded_at: datetime | None = None
    weather_temperature: float | None = None
    weather_dewpoint: float | None = None
    weather_relative_humidity: float | None = None
    weather_precipitation: float | None = None
    weather_wind_gust: float | None = None
    weather_pressure: float | None = None
    weather_sun_hours: float | None = None
    weather_condition: WeatherCondition | None = None
Weather data from the time and place the image was taken.
Attributes:
- weather_recorded_at: The datetime when the weather was recorded.
- weather_temperature: The temperature at the time of recording.
- weather_dewpoint: The dew point at the time of recording.
- weather_relative_humidity: The relative humidity at the time of recording.
- weather_precipitation: The precipitation level at the time of recording.
- weather_wind_gust: The wind gust speed at the time of recording.
- weather_pressure: The atmospheric pressure at the time of recording.
- weather_sun_hours: The sun hours at the time of recording.
- weather_condition: The weather condition at the time of recording.
@dataclass
class IntermediateTimeData:
    """Intermediate Time Data related to the image, storing just datetime_utc.

    Attributes:
        datetime_utc: The UTC datetime. Defaults to ``None``.
    """

    datetime_utc: datetime | None = None
Intermediate Time Data related to the image, storing just datetime_utc.
@dataclass
class TagData:
    """Tags, such as use_panorama_viewer, is_motion_photo, is_night_sight.

    All fields are required. The ``bool`` fields are flags; the fields typed
    ``... | None`` carry an optional detail value.
    """

    use_panorama_viewer: bool
    is_photosphere: bool
    projection_type: str | None
    is_motion_photo: bool
    motion_photo_presentation_timestamp: int | None
    is_night_sight: bool
    is_hdr: bool
    is_burst: bool
    burst_id: str | None
    is_timelapse: bool
    is_slowmotion: bool
    is_video: bool
    capture_fps: float | None
    video_fps: float | None
Tags, such as use_panorama_viewer, is_motion_photo, is_night_sight.
@dataclass
class OCRData:
    """OCR data for a frame.

    Attributes:
        has_legible_text: Whether the text is legible.
        ocr_text: The OCR text, or ``None``.
        document_summary: The document summary, or ``None``.
        ocr_boxes: The OCR boxes.
    """

    has_legible_text: bool
    ocr_text: str | None
    document_summary: str | None
    ocr_boxes: list[OCRBox]
OCR data for a frame.
Attributes:
- has_legible_text: Whether the text is legible.
- ocr_text: The OCR text.
- document_summary: The document summary.
- ocr_boxes: The OCR boxes.
@dataclass
class CaptionData:
    """A model to store structured information about an image.

    The fields from ``default_caption`` through ``contains_people`` are
    required; all remaining fields are optional and default to ``None``.
    """

    # Required fields.
    default_caption: str
    main_subject: str
    is_indoor: bool
    contains_pets: bool
    is_food_or_drink: bool
    contains_vehicle: bool
    setting: str
    is_event: bool
    contains_landmarks: bool
    is_document: bool
    contains_people: bool
    # Optional fields, None when not provided.
    is_landscape: bool | None = None
    is_cityscape: bool | None = None
    pet_type: str | None = None
    contains_animals: bool | None = None
    animal_type: str | None = None
    food_or_drink_type: str | None = None
    vehicle_type: str | None = None
    event_type: str | None = None
    landmark_name: str | None = None
    document_type: str | None = None
    people_count: int | None = None
    people_mood: str | None = None
    photo_type: str | None = None
    is_activity: bool | None = None
    activity_description: str | None = None
A model to store structured information about an image.
@dataclass
class MeasuredQualityData:
    """Measured quality data for a frame.

    All fields are required.

    Attributes:
        measured_sharpness: The measured sharpness.
        measured_noise: The measured noise (an integer, unlike the other measures).
        measured_brightness: The measured brightness.
        measured_contrast: The measured contrast.
        measured_clipping: The measured clipping.
        measured_dynamic_range: The measured dynamic range.
        quality_score: The quality score.
    """

    measured_sharpness: float
    measured_noise: int
    measured_brightness: float
    measured_contrast: float
    measured_clipping: float
    measured_dynamic_range: float
    quality_score: float
Measured quality data for a frame.
Attributes:
- measured_sharpness: The measured sharpness.
- measured_noise: The measured noise.
- measured_brightness: The measured brightness.
- measured_contrast: The measured contrast.
- measured_clipping: The measured clipping.
- measured_dynamic_range: The measured dynamic range.
- quality_score: The quality score.
@dataclass
class ColorData:
    """Color info, and theme generated based on image.

    Attributes:
        themes: Generated themes based on prominent colors in the image.
        prominent_colors: Prominent colors extracted from the image.
        average_hue: Average hue value in degrees.
        average_saturation: Average saturation value [0 to 100].
        average_lightness: Average lightness value [0 to 100].
        histogram: The color histogram, a ``ColorHistogram`` dict.
    """

    themes: list[dict[str, Any]]
    prominent_colors: list[str]
    average_hue: float
    average_saturation: float
    average_lightness: float
    histogram: ColorHistogram
Color info, and theme generated based on image.
Attributes:
- themes: Generated themes based on prominent colors in the image.
- prominent_colors: Prominent colors extracted from the image.
- average_hue: Average hue value in degrees.
- average_saturation: Average saturation value [0 to 100].
- average_lightness: Average lightness value [0 to 100].
class ColorHistogram(TypedDict):
    """Types for histogram dict in ColorData.

    Keys:
        bins: An integer; presumably the number of histogram bins -- confirm
            against the code that fills it.
        channels: The per-channel histogram data, typed as ``RGBChannels``.
    """

    bins: int
    channels: RGBChannels
Types for histogram dict in ColorData.
@dataclass
class GeoLocation:
    """Represents a reverse geocoded location where a photo/video was taken.

    All fields are required; only ``province`` may be ``None``.

    Attributes:
        country: The country name.
        city: The city name.
        province: The province or state name, if applicable.
        place_latitude: The latitude coordinate of the location.
        place_longitude: The longitude coordinate of the location.
    """

    country: str
    city: str
    province: str | None
    place_latitude: float
    place_longitude: float
Represents a reverse geocoded location where a photo/video was taken.
Attributes:
- country: The country name.
- city: The city name.
- province: The province or state name, if applicable.
- place_latitude: The latitude coordinate of the location.
- place_longitude: The longitude coordinate of the location.
@dataclass
class ChatMessage:
    """Chat message dataclass.

    Attributes:
        message: The message text.
        images: The images attached to the message. Defaults to a new empty
            list for each instance.
        role: The role of the message author. Defaults to ``ChatRole.USER``.
    """

    message: str
    images: list[Image] = field(default_factory=list)
    role: ChatRole = ChatRole.USER
Chat message dataclass.
10class ChatRole(StrEnum): 11 """Chat roles enum.""" 12 13 ASSISTANT = auto() 14 USER = auto() 15 SYSTEM = auto()
Chat roles enum.
def get_llm_by_provider(provider: LLMProvider) -> BaseVisualLLM:
    """Create the visual LLM registered for the given provider.

    Args:
        provider: The LLM provider to look up in ``llm_providers``.

    Returns:
        The result of calling the registered entry without arguments.

    Raises:
        KeyError: If ``llm_providers`` is a dict and has no entry for
            ``provider`` (its definition is not visible here).
    """
    llm_factory = llm_providers[provider]
    return llm_factory()
Get the LLM by the provider.
12class LLMProvider(StrEnum): 13 """LLM providers enum.""" 14 15 MINICPM = auto() 16 OPENAI = auto()
LLM providers enum.
def get_captioner_by_provider(provider: CaptionerProvider) -> CaptionerProtocol:
    """Create the captioner that belongs to the given provider.

    Args:
        provider: The captioner provider.

    Returns:
        The captioner.

    Raises:
        KeyError: If ``provider`` has no registered captioner.
    """
    # Every entry is a zero-argument factory, so only the selected captioner
    # is instantiated.
    factories = {
        CaptionerProvider.MINICPM: lambda: LLMCaptioner(LLMProvider.MINICPM),
        CaptionerProvider.OPENAI: lambda: LLMCaptioner(LLMProvider.OPENAI),
        CaptionerProvider.BLIP: BlipCaptioner,
        CaptionerProvider.BLIP_INSTRUCT: InstructBlipCaptioner,
    }
    create_captioner = factories[provider]
    return create_captioner()
Get the captioner by the provider.
Arguments:
- provider: The captioner provider.
Returns:
The captioner.
19class CaptionerProvider(StrEnum): 20 """Captioner providers enum.""" 21 22 BLIP_INSTRUCT = auto() 23 MINICPM = auto() 24 OPENAI = auto() 25 BLIP = auto()
Captioner providers enum.
class PipelineModule(ABC, Generic[TData]):
    """A generic pipeline module that can process either File-based or Visual data.

    Attributes:
        run_times: The duration in seconds of every completed ``run`` call.
        id: The module identifier, set to the name of the concrete class.
        depends: The analyzer modules this module depends on.
    """

    run_times: list[float]
    id: str
    # Class-level default shared by every subclass that does not override it;
    # assign a new set in a subclass instead of mutating this one.
    depends: ClassVar[set[AnalyzerModule]] = set()

    def __init__(self) -> None:
        """Initialize the module id and an empty list of run times."""
        self.id = self.__class__.__name__
        self.run_times = []

    def run(self, data: TData, config: FullAnalyzerConfig) -> None:
        """Run the pipeline module and record how long it took.

        Delegates the actual processing to the ``process`` method. The elapsed
        time is appended to ``run_times`` only when ``process`` returns without
        raising.

        Args:
            data: The data to be processed (ImageData or FrameData).
            config: The configuration object (e.g., FullAnalyzerConfig).
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted (or turn negative) when the system clock is adjusted.
        start_time = time.perf_counter()

        self.process(data, config)
        self.run_times.append(time.perf_counter() - start_time)

    @abstractmethod
    def process(self, data: TData, config: FullAnalyzerConfig) -> None:
        """Process the data. Must be implemented by subclasses.

        Args:
            data: The data to be processed (ImageData or FrameData).
            config: The configuration object.
        """
A generic pipeline module that can process either File-based or Visual data.
def __init__(self) -> None:
    """Set up an empty run-time log and use the class name as module id."""
    self.run_times = []
    self.id = type(self).__name__
Initializes the PipelineModule.
def run(self, data: TData, config: FullAnalyzerConfig) -> None:
    """Run the pipeline module and record how long it took.

    Delegates the actual processing to the ``process`` method. The elapsed
    time is appended to ``run_times`` only when ``process`` returns without
    raising.

    Args:
        data: The data to be processed (ImageData or FrameData).
        config: The configuration object (e.g., FullAnalyzerConfig).
    """
    # perf_counter is monotonic, so the measured duration cannot be
    # distorted (or turn negative) when the system clock is adjusted.
    start_time = time.perf_counter()

    self.process(data, config)
    self.run_times.append(time.perf_counter() - start_time)
Runs the pipeline module.
Measures the execution time and delegates the
actual processing to the process method.
Arguments:
- data: The data to be processed (ImageData or FrameData).
- config: The configuration object (e.g., FullAnalyzerConfig).
@abstractmethod
def process(self, data: TData, config: FullAnalyzerConfig) -> None:
    """Process the data. Must be implemented by subclasses.

    Called by ``run``, which measures how long this method takes.

    Args:
        data: The data to be processed (ImageData or FrameData).
        config: The configuration object.
    """
Abstract method for processing data. This should be implemented by subclasses.
Arguments:
- data: The data to be processed (ImageData or FrameData).
- config: The configuration object.
class DataUrlModule(PipelineModule[ImageData]):
    """Create a tiny base64-encoded PNG preview of an image."""

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Store a tiny base64-encoded PNG of the first frame in ``data.data_url``.

        The first frame is scaled to a height of 6 pixels with a proportional
        width of at least 1 pixel. Only the base64 payload is stored; no
        ``data:image/png;base64,`` prefix is added.

        Args:
            data: The image data; ``data.frames[0]`` is read and
                ``data.data_url`` is set.
            _: The analyzer configuration (unused).
        """
        tiny_height = 6
        with PIL.Image.open(data.frames[0]) as pil_image:
            # For very tall images (height more than 6x the width) the scaled
            # width truncates to 0, which Pillow rejects; clamp it to 1 pixel.
            tiny_width = max(
                1,
                int(pil_image.width / pil_image.height * tiny_height),
            )
            img = pil_image.resize((tiny_width, tiny_height))
            buffered = BytesIO()
            img.save(buffered, format="PNG")
            data.data_url = base64.b64encode(buffered.getvalue()).decode()
Convert an image to a data URL.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Store a tiny base64-encoded PNG of the first frame in ``data.data_url``.

    The first frame is scaled to a height of 6 pixels with a proportional
    width of at least 1 pixel. Only the base64 payload is stored; no
    ``data:image/png;base64,`` prefix is added.

    Args:
        data: The image data; ``data.frames[0]`` is read and
            ``data.data_url`` is set.
        _: The analyzer configuration (unused).
    """
    tiny_height = 6
    with PIL.Image.open(data.frames[0]) as pil_image:
        # For very tall images (height more than 6x the width) the scaled
        # width truncates to 0, which Pillow rejects; clamp it to 1 pixel.
        tiny_width = max(
            1,
            int(pil_image.width / pil_image.height * tiny_height),
        )
        img = pil_image.resize((tiny_width, tiny_height))
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        data.data_url = base64.b64encode(buffered.getvalue()).decode()
Convert an image to a data URL.
class ExifModule(PipelineModule[ImageData]):
    """Extract EXIF data from an image using exiftool."""

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Read the metadata of ``data.path`` with exiftool and set ``data.exif``.

        Width and height come from the ``File`` group and are overridden by the
        ``GIF``, ``PNG``, ``QuickTime`` and ``Matroska`` groups when present, in
        that order. The duration is only set for QuickTime and Matroska files.

        Args:
            data: The image data; ``data.path`` is read and ``data.exif`` is set.
            _: The analyzer configuration (unused).

        Raises:
            ValueError: If the exiftool output lacks the ``Composite``, ``File``
                or ``ExifTool`` group, or if no width and height were found.
        """
        with ExifToolHelper() as et:
            result = et.execute_json(str(data.path))
            exif_dict = structure_exiftool_dict(result[0])
        if (
            "Composite" not in exif_dict
            or "File" not in exif_dict
            or "ExifTool" not in exif_dict
        ):
            raise ValueError(f"Media-analyzer does not support this file {data.path}")

        if "EXIF" in exif_dict:
            alt_ref = exif_dict["EXIF"].get("GPSAltitudeRef")
            # altitude ref = 0 means above sea level
            # ref = 1 means below sea level
            # LG G4 produces ref = 1.8 for some reason when above sea level
            # (maybe also below?)
            if alt_ref not in {0, 1, None}:
                # Unknown reference value: treat the altitude as above sea level.
                if "GPSAltitude" in exif_dict["Composite"]:
                    exif_dict["Composite"]["GPSAltitude"] = abs(
                        exif_dict["Composite"]["GPSAltitude"],
                    )
                exif_dict["EXIF"]["GPSAltitudeRef"] = 0

        # Always true after the check above; presumably kept so static type
        # checkers narrow the optional keys.
        assert "ExifTool" in exif_dict
        assert "File" in exif_dict
        assert "Composite" in exif_dict
        width = exif_dict["File"].get("ImageWidth")
        height = exif_dict["File"].get("ImageHeight")
        duration: float | None = None
        if "GIF" in exif_dict:
            width = exif_dict["GIF"]["ImageWidth"]
            height = exif_dict["GIF"]["ImageHeight"]
        if "PNG" in exif_dict:
            width = exif_dict["PNG"]["ImageWidth"]
            height = exif_dict["PNG"]["ImageHeight"]
        if "QuickTime" in exif_dict:
            duration = exif_dict["QuickTime"]["Duration"]
            width = exif_dict["QuickTime"]["ImageWidth"]
            height = exif_dict["QuickTime"]["ImageHeight"]
        if "Matroska" in exif_dict:
            width = exif_dict["Matroska"]["ImageWidth"]
            height = exif_dict["Matroska"]["ImageHeight"]
            duration = parse_duration(exif_dict["Matroska"]["Duration"])

        # Explicit check instead of ``assert``: asserts are stripped under -O,
        # which would let ExifData be built with missing dimensions.
        if not width or not height:
            raise ValueError(
                f"Media-analyzer could not determine the dimensions of {data.path}",
            )
        data.exif = ExifData(
            size_bytes=exif_dict["File"]["FileSize"],
            width=width,
            height=height,
            duration=duration,
            format=exif_dict["File"]["MIMEType"],
            exif_tool=exif_dict["ExifTool"],
            file=exif_dict["File"],
            exif=exif_dict.get("EXIF"),
            xmp=exif_dict.get("XMP"),
            mpf=exif_dict.get("MPF"),
            jfif=exif_dict.get("JFIF"),
            icc_profile=exif_dict.get("ICC_Profile"),
            composite=exif_dict["Composite"],
            gif=exif_dict.get("GIF"),
            png=exif_dict.get("PNG"),
            quicktime=exif_dict.get("QuickTime"),
            matroska=exif_dict.get("Matroska"),
        )
Extract EXIF data from an image using exiftool.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Read metadata for ``data.path`` with exiftool and store it on ``data.exif``.

    Args:
        data: Media item being analyzed; ``data.exif`` is assigned on success.
        _: Analyzer configuration (unused).

    Raises:
        ValueError: If exiftool's output lacks the "Composite", "File" or
            "ExifTool" group, or if no image width/height could be determined.
    """
    with ExifToolHelper() as et:
        result = et.execute_json(str(data.path))
        exif_dict = structure_exiftool_dict(result[0])

    if (
        "Composite" not in exif_dict
        or "File" not in exif_dict
        or "ExifTool" not in exif_dict
    ):
        raise ValueError(f"Media-analyzer does not support this file {data.path}")

    if "EXIF" in exif_dict:
        alt_ref = exif_dict["EXIF"].get("GPSAltitudeRef")
        # altitude ref = 0 means above sea level
        # ref = 1 means below sea level
        # LG G4 produces ref = 1.8 for some reason when above sea level
        # (maybe also below?)
        if alt_ref not in {0, 1, None}:
            if "GPSAltitude" in exif_dict["Composite"]:
                exif_dict["Composite"]["GPSAltitude"] = abs(
                    exif_dict["Composite"]["GPSAltitude"],
                )
            exif_dict["EXIF"]["GPSAltitudeRef"] = 0

    # Already guaranteed by the check above; presumably kept for the type checker.
    assert "ExifTool" in exif_dict
    assert "File" in exif_dict
    assert "Composite" in exif_dict

    # Start from the generic "File" group; format-specific groups override it.
    width = exif_dict["File"].get("ImageWidth")
    height = exif_dict["File"].get("ImageHeight")
    duration: float | None = None
    if "GIF" in exif_dict:
        width = exif_dict["GIF"]["ImageWidth"]
        height = exif_dict["GIF"]["ImageHeight"]
    if "PNG" in exif_dict:
        width = exif_dict["PNG"]["ImageWidth"]
        height = exif_dict["PNG"]["ImageHeight"]
    if "QuickTime" in exif_dict:
        duration = exif_dict["QuickTime"]["Duration"]
        width = exif_dict["QuickTime"]["ImageWidth"]
        height = exif_dict["QuickTime"]["ImageHeight"]
    if "Matroska" in exif_dict:
        width = exif_dict["Matroska"]["ImageWidth"]
        height = exif_dict["Matroska"]["ImageHeight"]
        duration = parse_duration(exif_dict["Matroska"]["Duration"])

    # Explicit raise instead of ``assert``: asserts are stripped under
    # ``python -O``, which would let missing dimensions flow into ExifData.
    if not width or not height:
        raise ValueError(f"Could not determine the dimensions of {data.path}")

    data.exif = ExifData(
        size_bytes=exif_dict["File"]["FileSize"],
        width=width,
        height=height,
        duration=duration,
        format=exif_dict["File"]["MIMEType"],
        exif_tool=exif_dict["ExifTool"],
        file=exif_dict["File"],
        exif=exif_dict.get("EXIF"),
        xmp=exif_dict.get("XMP"),
        mpf=exif_dict.get("MPF"),
        jfif=exif_dict.get("JFIF"),
        icc_profile=exif_dict.get("ICC_Profile"),
        composite=exif_dict["Composite"],
        gif=exif_dict.get("GIF"),
        png=exif_dict.get("PNG"),
        quicktime=exif_dict.get("QuickTime"),
        matroska=exif_dict.get("Matroska"),
    )
Extract EXIF data from an image.
class GPSModule(PipelineModule[ImageData]):
    """Derive GPS position, GPS timestamp and a reverse-geocoded place for an image."""

    depends: ClassVar[set[AnalyzerModule]] = {FileModule.EXIF}

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Populate ``data.gps`` and ``data.time`` from the EXIF "Composite" group.

        Returns without touching ``data`` when there is no EXIF data, no
        composite group, or the latitude/longitude is missing or falsy.

        Args:
            data: Media item whose ``exif.composite`` values are read.
            _: Analyzer configuration (unused).
        """
        exif = data.exif
        if exif is None or not exif.composite:
            return
        composite = exif.composite

        # ``.get`` folds the "key absent" and "value falsy" cases into one test.
        lat = composite.get("GPSLatitude")
        lon = composite.get("GPSLongitude")
        if not lat or not lon:
            return

        gps_datetime: datetime | None = None
        if "GPSDateTime" in composite:
            # Try the fractional-seconds layout first, then whole seconds.
            for date_fmt in ("%Y:%m:%d %H:%M:%S.%fZ", "%Y:%m:%d %H:%M:%SZ"):
                try:
                    gps_datetime = datetime.strptime(  # noqa: DTZ007
                        composite["GPSDateTime"],
                        date_fmt,
                    )
                except ValueError:
                    continue
                break

        coded = reverse_geocode.get((lat, lon))
        place = GeoLocation(
            country=coded["country"],
            province=coded.get("state"),
            city=coded["city"],
            place_latitude=coded["latitude"],
            place_longitude=coded["longitude"],
        )
        data.time = IntermediateTimeData(datetime_utc=gps_datetime)
        data.gps = GPSData(
            latitude=lat,
            longitude=lon,
            altitude=composite.get("GPSAltitude"),
            location=place,
        )
Extract GPS data from an image.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Populate ``data.gps`` and ``data.time`` from the EXIF "Composite" group.

    Returns without touching ``data`` when there is no EXIF data, no
    composite group, or the latitude/longitude is missing or falsy.

    Args:
        data: Media item whose ``exif.composite`` values are read.
        _: Analyzer configuration (unused).
    """
    exif = data.exif
    if exif is None or not exif.composite:
        return
    composite = exif.composite

    # ``.get`` folds the "key absent" and "value falsy" cases into one test.
    lat = composite.get("GPSLatitude")
    lon = composite.get("GPSLongitude")
    if not lat or not lon:
        return

    gps_datetime: datetime | None = None
    if "GPSDateTime" in composite:
        # Try the fractional-seconds layout first, then whole seconds.
        for date_fmt in ("%Y:%m:%d %H:%M:%S.%fZ", "%Y:%m:%d %H:%M:%SZ"):
            try:
                gps_datetime = datetime.strptime(  # noqa: DTZ007
                    composite["GPSDateTime"],
                    date_fmt,
                )
            except ValueError:
                continue
            break

    coded = reverse_geocode.get((lat, lon))
    place = GeoLocation(
        country=coded["country"],
        province=coded.get("state"),
        city=coded["city"],
        place_latitude=coded["latitude"],
        place_longitude=coded["longitude"],
    )
    data.time = IntermediateTimeData(datetime_utc=gps_datetime)
    data.gps = GPSData(
        latitude=lat,
        longitude=lon,
        altitude=composite.get("GPSAltitude"),
        location=place,
    )
Extract GPS time and location data from an image, and reverse geocode.
class TimeModule(PipelineModule[ImageData]):
    """Extracts datetime from an image."""

    # Modules whose output this module reads.
    depends: ClassVar[set[AnalyzerModule]] = {FileModule.EXIF, FileModule.GPS}

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Extract the capture time and timezone details and store them on ``data.time``.

        Args:
            data: Media item passed to ``get_local_datetime`` and
                ``get_timezone_info``; its ``time`` attribute is overwritten.
            _: Analyzer configuration (unused).
        """
        datetime_taken, datetime_source = get_local_datetime(data)
        datetime_utc, timezone_name, timezone_offset = get_timezone_info(data, datetime_taken)
        # Drop tzinfo so the stored datetimes are naive.
        if datetime_utc is not None:
            datetime_utc = datetime_utc.replace(tzinfo=None)
        # NOTE(review): nesting reconstructed from a flattened listing; confirm the
        # local datetime is stripped unconditionally rather than only when UTC is known.
        datetime_taken = datetime_taken.replace(tzinfo=None)

        data.time = TimeData(
            datetime_utc=datetime_utc,
            datetime_local=datetime_taken,
            datetime_source=datetime_source,
            timezone_name=timezone_name,
            timezone_offset=timezone_offset,
        )
Extracts datetime from an image.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Extract the capture time and timezone details and store them on ``data.time``.

    Args:
        data: Media item passed to ``get_local_datetime`` and
            ``get_timezone_info``; its ``time`` attribute is overwritten.
        _: Analyzer configuration (unused).
    """
    datetime_taken, datetime_source = get_local_datetime(data)
    datetime_utc, timezone_name, timezone_offset = get_timezone_info(data, datetime_taken)
    # Drop tzinfo so the stored datetimes are naive.
    if datetime_utc is not None:
        datetime_utc = datetime_utc.replace(tzinfo=None)
    # NOTE(review): nesting reconstructed from a flattened listing; confirm the
    # local datetime is stripped unconditionally rather than only when UTC is known.
    datetime_taken = datetime_taken.replace(tzinfo=None)

    data.time = TimeData(
        datetime_utc=datetime_utc,
        datetime_local=datetime_taken,
        datetime_source=datetime_source,
        timezone_name=timezone_name,
        timezone_offset=timezone_offset,
    )
Extracts datetime from an image.
class WeatherModule(PipelineModule[ImageData]):
    """Look up the weather recorded around the time and place of capture."""

    depends: ClassVar[set[AnalyzerModule]] = {FileModule.GPS}

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Fetch hourly weather for the capture position and store it on ``data.weather``.

        Does nothing when GPS data, the UTC timestamp, or either coordinate is
        missing or falsy, or when the fetch returns no rows.

        Args:
            data: Media item providing ``gps`` and ``time``; ``weather`` is assigned.
            _: Analyzer configuration (unused).
        """
        gps, when = data.gps, data.time
        if not (gps and when and when.datetime_utc and gps.latitude and gps.longitude):
            return

        # Query a one-hour window centred on the capture time.
        half_window = timedelta(minutes=30)
        observations = Hourly(
            Point(lat=gps.latitude, lon=gps.longitude),
            when.datetime_utc - half_window,
            when.datetime_utc + half_window,
        ).fetch()
        if len(observations) == 0:
            return  # pragma: no cover
        max_possible_rows = 2
        assert len(observations) <= max_possible_rows
        row = observations.iloc[0]

        def panda_number(field: Any) -> int | None:  # noqa: ANN401
            # Values that cannot be converted by ``int`` become None.
            try:
                return int(field)
            except (ValueError, TypeError):
                return None

        coco_number = panda_number(row.coco)
        weather_condition = None if coco_number is None else WeatherCondition(coco_number)
        data.weather = WeatherData(
            weather_recorded_at=row.name.to_pydatetime(),
            weather_temperature=panda_number(row.temp),
            weather_dewpoint=panda_number(row.dwpt),
            weather_relative_humidity=panda_number(row.rhum),
            weather_precipitation=panda_number(row.prcp),
            weather_wind_gust=panda_number(row.wpgt),
            weather_pressure=panda_number(row.pres),
            weather_sun_hours=panda_number(row.tsun),
            weather_condition=weather_condition,
        )
Extract weather data from the time and place an image was taken.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Fetch hourly weather for the capture position and store it on ``data.weather``.

    Does nothing when GPS data, the UTC timestamp, or either coordinate is
    missing or falsy, or when the fetch returns no rows.

    Args:
        data: Media item providing ``gps`` and ``time``; ``weather`` is assigned.
        _: Analyzer configuration (unused).
    """
    gps, when = data.gps, data.time
    if not (gps and when and when.datetime_utc and gps.latitude and gps.longitude):
        return

    # Query a one-hour window centred on the capture time.
    half_window = timedelta(minutes=30)
    observations = Hourly(
        Point(lat=gps.latitude, lon=gps.longitude),
        when.datetime_utc - half_window,
        when.datetime_utc + half_window,
    ).fetch()
    if len(observations) == 0:
        return  # pragma: no cover
    max_possible_rows = 2
    assert len(observations) <= max_possible_rows
    row = observations.iloc[0]

    def panda_number(field: Any) -> int | None:  # noqa: ANN401
        # Values that cannot be converted by ``int`` become None.
        try:
            return int(field)
        except (ValueError, TypeError):
            return None

    coco_number = panda_number(row.coco)
    weather_condition = None if coco_number is None else WeatherCondition(coco_number)
    data.weather = WeatherData(
        weather_recorded_at=row.name.to_pydatetime(),
        weather_temperature=panda_number(row.temp),
        weather_dewpoint=panda_number(row.dwpt),
        weather_relative_humidity=panda_number(row.rhum),
        weather_precipitation=panda_number(row.prcp),
        weather_wind_gust=panda_number(row.wpgt),
        weather_pressure=panda_number(row.pres),
        weather_sun_hours=panda_number(row.tsun),
        weather_condition=weather_condition,
    )
Extract weather data from the time and place an image was taken.
class TagsModule(PipelineModule[ImageData]):
    """Derive descriptive tags for a media file from its name and metadata."""

    depends: ClassVar[set[AnalyzerModule]] = {FileModule.EXIF}

    def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
        """Compute tags (HDR, burst, slow motion, motion photo, ...) into ``data.tags``.

        File-name checks are combined with values from the XMP and QuickTime
        metadata groups when those groups are present.

        Args:
            data: Media item; ``data.exif`` must already be set.
            _: Analyzer configuration (unused).
        """
        assert data.exif is not None
        exif = data.exif
        file_name = data.path.name
        lowered_name = file_name.lower()

        is_burst, burst_id = detect_burst(file_name)
        is_video = "video" in exif.file["MIMEType"]

        # Photosphere / Motion Photo.
        # An empty mapping makes every lookup below fall back to its default.
        xmp = exif.xmp or {}
        use_panorama_viewer = xmp.get("UsePanoramaViewer", False)
        is_photosphere = xmp.get("IsPhotosphere", False)
        projection_type: str | None = xmp.get("ProjectionType", None)
        is_motion_photo = xmp.get("MotionPhoto", 0) == 1
        motion_photo_presentation_timestamp: int | None = (
            xmp.get("MotionPhotoPresentationTimestampUs") if is_motion_photo else None
        )
        if "BurstID" in xmp:
            # A burst id in the metadata overrides whatever the file name suggested.
            is_burst, burst_id = True, xmp["BurstID"]

        # Slowmotion / timelapse
        quicktime = exif.quicktime or {}
        capture_fps: float | None = quicktime.get("AndroidCaptureFPS")
        video_fps: float | None = quicktime.get("VideoFrameRate")
        is_slowmotion = bool(capture_fps and video_fps and capture_fps > video_fps + 1)
        special_type = quicktime.get("SpecialTypeID")
        is_timelapse = "timelapse" in special_type.lower() if special_type else False

        data.tags = TagData(
            is_video=is_video,
            capture_fps=capture_fps,
            video_fps=video_fps,
            is_hdr="hdr" in lowered_name,
            is_burst=is_burst,
            burst_id=burst_id,
            is_timelapse=is_timelapse,
            is_slowmotion=is_slowmotion,
            is_photosphere=is_photosphere,
            is_night_sight="night" in lowered_name,
            is_motion_photo=is_motion_photo,
            projection_type=projection_type,
            use_panorama_viewer=use_panorama_viewer,
            motion_photo_presentation_timestamp=motion_photo_presentation_timestamp,
        )
Extract descriptive tags (HDR, burst, timelapse, slow motion, photosphere, night sight, motion photo) from an image's file name and metadata.
def process(self, data: ImageData, _: FullAnalyzerConfig) -> None:
    """Compute tags (HDR, burst, slow motion, motion photo, ...) into ``data.tags``.

    File-name checks are combined with values from the XMP and QuickTime
    metadata groups when those groups are present.

    Args:
        data: Media item; ``data.exif`` must already be set.
        _: Analyzer configuration (unused).
    """
    assert data.exif is not None
    exif = data.exif
    file_name = data.path.name
    lowered_name = file_name.lower()

    is_burst, burst_id = detect_burst(file_name)
    is_video = "video" in exif.file["MIMEType"]

    # Photosphere / Motion Photo.
    # An empty mapping makes every lookup below fall back to its default.
    xmp = exif.xmp or {}
    use_panorama_viewer = xmp.get("UsePanoramaViewer", False)
    is_photosphere = xmp.get("IsPhotosphere", False)
    projection_type: str | None = xmp.get("ProjectionType", None)
    is_motion_photo = xmp.get("MotionPhoto", 0) == 1
    motion_photo_presentation_timestamp: int | None = (
        xmp.get("MotionPhotoPresentationTimestampUs") if is_motion_photo else None
    )
    if "BurstID" in xmp:
        # A burst id in the metadata overrides whatever the file name suggested.
        is_burst, burst_id = True, xmp["BurstID"]

    # Slowmotion / timelapse
    quicktime = exif.quicktime or {}
    capture_fps: float | None = quicktime.get("AndroidCaptureFPS")
    video_fps: float | None = quicktime.get("VideoFrameRate")
    is_slowmotion = bool(capture_fps and video_fps and capture_fps > video_fps + 1)
    special_type = quicktime.get("SpecialTypeID")
    is_timelapse = "timelapse" in special_type.lower() if special_type else False

    data.tags = TagData(
        is_video=is_video,
        capture_fps=capture_fps,
        video_fps=video_fps,
        is_hdr="hdr" in lowered_name,
        is_burst=is_burst,
        burst_id=burst_id,
        is_timelapse=is_timelapse,
        is_slowmotion=is_slowmotion,
        is_photosphere=is_photosphere,
        is_night_sight="night" in lowered_name,
        is_motion_photo=is_motion_photo,
        projection_type=projection_type,
        use_panorama_viewer=use_panorama_viewer,
        motion_photo_presentation_timestamp=motion_photo_presentation_timestamp,
    )
Get tags such as is_panorama, is_night_sight, is_motion_photo, etc.
class CaptionModule(PipelineModule[FrameData]):
    """Pipeline step that attaches caption data to a frame."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Run ``analyze_image`` with the configured captioner and store the result.

        Args:
            data: Frame whose ``image`` is captioned; ``caption_data`` is assigned.
            config: Analyzer configuration providing ``captioner``.
        """
        captioner = config.captioner
        data.caption_data = analyze_image(captioner, data.image)
Generate a caption from an image.
class EmbeddingModule(PipelineModule[FrameData]):
    """Pipeline step that stores an image embedding on a frame."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Embed ``data.image`` with the configured embedder and keep it as a list.

        Args:
            data: Frame whose ``image`` is embedded; ``embedding`` is assigned.
            config: Analyzer configuration providing ``embedder``.
        """
        vector = config.embedder.embed_image(data.image).tolist()
        assert isinstance(vector, list)
        data.embedding = vector
Embed an image using CLIP.
class SummaryModule(PipelineModule[FrameData]):
    """Pipeline step that asks the language model for a textual image summary."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:  # pragma: no cover
        """Store an LLM-written description of ``data.image`` on ``data.summary``.

        Nothing happens unless ``config.settings.enable_text_summary`` is truthy.

        Args:
            data: Frame whose ``image`` is described; ``summary`` is assigned.
            config: Analyzer configuration providing ``settings`` and ``llm``.
        """
        if config.settings.enable_text_summary:
            instructions = (
                "Describe this image in a way that captures all essential details "
                "for a search database. Include the setting, key objects, actions, "
                "number and type of people or animals, and any noticeable visual "
                "features. Make the description clear, concise, and useful for "
                "someone searching this image in a library. Avoid subjective "
                "interpretations or ambiguous terms."
            )
            data.summary = config.llm.image_question(data.image, instructions)
Generate a summary from an image using a language model.
def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:  # pragma: no cover
    """Store an LLM-written description of ``data.image`` on ``data.summary``.

    Nothing happens unless ``config.settings.enable_text_summary`` is truthy.

    Args:
        data: Frame whose ``image`` is described; ``summary`` is assigned.
        config: Analyzer configuration providing ``settings`` and ``llm``.
    """
    if config.settings.enable_text_summary:
        instructions = (
            "Describe this image in a way that captures all essential details "
            "for a search database. Include the setting, key objects, actions, "
            "number and type of people or animals, and any noticeable visual "
            "features. Make the description clear, concise, and useful for "
            "someone searching this image in a library. Avoid subjective "
            "interpretations or ambiguous terms."
        )
        data.summary = config.llm.image_question(data.image, instructions)
Generate a summary from an image using a language model.
class FacesModule(PipelineModule[FrameData]):
    """Pipeline step that stores detected faces on a frame."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Run the configured facial recognition on ``data.image``.

        Args:
            data: Frame whose ``image`` is analyzed; ``faces`` is assigned.
            config: Analyzer configuration providing ``facial_recognition``.
        """
        recognizer = config.facial_recognition
        data.faces = recognizer.get_faces(data.image)
Get faces from an image.
class OCRModule(PipelineModule[FrameData]):
    """Extract text from an image using OCR."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Run OCR on ``data.image`` and store text, boxes and an optional summary.

        When legible text is reported, the text and its boxes are extracted; text
        that is empty after stripping is treated as "no text". If document
        summaries are enabled and the text is longer than
        ``config.settings.document_detection_threshold``, the language model is
        asked to describe the document.

        Args:
            data: Frame whose ``image`` is read; ``ocr`` is assigned.
            config: Analyzer configuration providing ``ocr``, ``llm`` and ``settings``.
        """
        has_text = config.ocr.has_legible_text(data.image)
        extracted_text: str | None = None
        summary: str | None = None
        boxes: list[OCRBox] = []
        if has_text:
            extracted_text = config.ocr.get_text(data.image, config.settings.media_languages)
            if extracted_text.strip() == "":
                has_text = False
                extracted_text = None
            boxes = config.ocr.get_boxes(data.image, config.settings.media_languages)

        # Check if this could be a photo of a document
        if (
            config.settings.enable_document_summary
            and has_text
            and extracted_text
            and len(extracted_text) > config.settings.document_detection_threshold
        ):  # pragma: no cover
            # Adjacent literals are concatenated verbatim, so every wrapped line
            # needs its own trailing space and every section its own newline.
            prompt = (
                "Analyze the image and provide the following details:\n\n"
                "Summary: A concise summary of the content in the photo, including any "
                "key points or important sections visible.\n"
                "Text Detection: Detect and list any legible text visible in the image. "
                "If possible, extract it and provide a short excerpt or the full text.\n"
                "Language Detection: Identify the language(s) in the text and specify the "
                "primary language used.\n"
                "Document Type: Determine the type of document or text. Is it a formal "
                "document (e.g., letter, contract, form), informal (e.g., note, memo), "
                "or something else? Provide details about the document's likely purpose "
                "(e.g., invoice, receipt, report, etc.).\n"
                "Text Formatting: If relevant, describe any specific formatting styles "
                "such as headings, bullet points, numbered lists, tables, or signatures.\n"
                "Additional Features: Detect if there are any images, logos, or other "
                "non-text elements present that provide additional context or information "
                "about the document (e.g., company logos, photos, charts).\n"
                "Contextual Details: If applicable, mention any visible date, address, "
                "or other contextual information that could help understand the document's "
                "origin or purpose."
            )

            summary = config.llm.image_question(data.image, prompt)

        data.ocr = OCRData(
            has_legible_text=has_text,
            ocr_text=extracted_text,
            document_summary=summary,
            ocr_boxes=boxes,
        )
Extract text from an image using OCR.
def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
    """Run OCR on ``data.image`` and store text, boxes and an optional summary.

    When legible text is reported, the text and its boxes are extracted; text
    that is empty after stripping is treated as "no text". If document
    summaries are enabled and the text is longer than
    ``config.settings.document_detection_threshold``, the language model is
    asked to describe the document.

    Args:
        data: Frame whose ``image`` is read; ``ocr`` is assigned.
        config: Analyzer configuration providing ``ocr``, ``llm`` and ``settings``.
    """
    has_text = config.ocr.has_legible_text(data.image)
    extracted_text: str | None = None
    summary: str | None = None
    boxes: list[OCRBox] = []
    if has_text:
        extracted_text = config.ocr.get_text(data.image, config.settings.media_languages)
        if extracted_text.strip() == "":
            has_text = False
            extracted_text = None
        boxes = config.ocr.get_boxes(data.image, config.settings.media_languages)

    # Check if this could be a photo of a document
    if (
        config.settings.enable_document_summary
        and has_text
        and extracted_text
        and len(extracted_text) > config.settings.document_detection_threshold
    ):  # pragma: no cover
        # Adjacent literals are concatenated verbatim, so every wrapped line
        # needs its own trailing space and every section its own newline.
        prompt = (
            "Analyze the image and provide the following details:\n\n"
            "Summary: A concise summary of the content in the photo, including any "
            "key points or important sections visible.\n"
            "Text Detection: Detect and list any legible text visible in the image. "
            "If possible, extract it and provide a short excerpt or the full text.\n"
            "Language Detection: Identify the language(s) in the text and specify the "
            "primary language used.\n"
            "Document Type: Determine the type of document or text. Is it a formal "
            "document (e.g., letter, contract, form), informal (e.g., note, memo), "
            "or something else? Provide details about the document's likely purpose "
            "(e.g., invoice, receipt, report, etc.).\n"
            "Text Formatting: If relevant, describe any specific formatting styles "
            "such as headings, bullet points, numbered lists, tables, or signatures.\n"
            "Additional Features: Detect if there are any images, logos, or other "
            "non-text elements present that provide additional context or information "
            "about the document (e.g., company logos, photos, charts).\n"
            "Contextual Details: If applicable, mention any visible date, address, "
            "or other contextual information that could help understand the document's "
            "origin or purpose."
        )

        summary = config.llm.image_question(data.image, prompt)

    data.ocr = OCRData(
        has_legible_text=has_text,
        ocr_text=extracted_text,
        document_summary=summary,
        ocr_boxes=boxes,
    )
Extract text from an image using OCR.
class ObjectsModule(PipelineModule[FrameData]):
    """Pipeline step that stores detected objects on a frame."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Run the configured object detector on ``data.image``.

        Args:
            data: Frame whose ``image`` is analyzed; ``objects`` is assigned.
            config: Analyzer configuration providing ``object_detector``.
        """
        detector = config.object_detector
        data.objects = detector.detect_objects(data.image)
Detect objects in an image.
class QualityDetectionModule(PipelineModule[FrameData]):
    """Pipeline step that measures image quality metrics for a frame."""

    def process(self, data: FrameData, _: FullAnalyzerConfig) -> None:
        """Measure sharpness, noise, exposure and related metrics of ``data.image``.

        Args:
            data: Frame whose ``image`` is measured; ``measured_quality`` is assigned.
            _: Analyzer configuration (unused).
        """
        rgb_pixels: npt.NDArray[np.uint8] = np.array(data.image)
        # Every measurement helper below is handed the BGR-ordered array.
        bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)

        brightness, contrast = exposure_measurement(bgr_pixels)
        data.measured_quality = MeasuredQualityData(
            measured_sharpness=sharpness_measurement(bgr_pixels),
            measured_noise=noise_measurement(bgr_pixels),
            measured_brightness=brightness,
            measured_contrast=contrast,
            measured_clipping=measure_clipping(bgr_pixels),
            measured_dynamic_range=calculate_dynamic_range(bgr_pixels),
            quality_score=composite_quality_score(bgr_pixels),
        )
Detect image quality metrics.
def process(self, data: FrameData, _: FullAnalyzerConfig) -> None:
    """Measure sharpness, noise, exposure and related metrics of ``data.image``.

    Args:
        data: Frame whose ``image`` is measured; ``measured_quality`` is assigned.
        _: Analyzer configuration (unused).
    """
    rgb_pixels: npt.NDArray[np.uint8] = np.array(data.image)
    # Every measurement helper below is handed the BGR-ordered array.
    bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)

    brightness, contrast = exposure_measurement(bgr_pixels)
    data.measured_quality = MeasuredQualityData(
        measured_sharpness=sharpness_measurement(bgr_pixels),
        measured_noise=noise_measurement(bgr_pixels),
        measured_brightness=brightness,
        measured_contrast=contrast,
        measured_clipping=measure_clipping(bgr_pixels),
        measured_dynamic_range=calculate_dynamic_range(bgr_pixels),
        quality_score=composite_quality_score(bgr_pixels),
    )
Detect image quality metrics.
class ColorModule(PipelineModule[FrameData]):
    """Get Color info from an image."""

    def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
        """Compute average HSV values, prominent colors, themes and RGB histograms.

        Args:
            data: Frame whose ``image`` is analyzed; ``color`` is assigned.
            config: Analyzer configuration; ``settings.theme_color_variant`` and
                ``settings.theme_contrast_level`` are passed to ``theme_from_color``.
        """
        cv_image = np.array(data.image)
        image_hsv = cv2.cvtColor(cv_image, cv2.COLOR_RGB2HSV)

        # Split the HSV image into flat per-channel arrays.
        hue_channel = image_hsv[:, :, 0].flatten()
        saturation_channel = image_hsv[:, :, 1].flatten()
        lightness_channel = image_hsv[:, :, 2].flatten()  # third HSV channel

        # Convert hue values from OpenCV's [0, 179] range to [0, 360] range, and calculate
        # avg hue. Integer hues are widened first: doubling in uint8 would wrap every hue
        # above 127 modulo 256 and corrupt the average.
        if np.issubdtype(hue_channel.dtype, np.integer):
            hue_channel = hue_channel.astype(np.int32)
        average_hue_value = average_hue(hue_channel * 2)
        average_saturation_value = float(saturation_channel.mean())
        average_lightness_value = float(lightness_channel.mean())

        prominent_colors = prominent_colors_from_image(data.image)[0:3]
        themes = [
            theme_from_color(
                color,
                variant=config.settings.theme_color_variant,
                contrast_level=config.settings.theme_contrast_level,
            )
            for color in prominent_colors
        ]

        # Calculate color histograms for each channel, converting the float
        # counts returned by OpenCV to ints.
        histogram_bins = 256
        channel_counts = {
            name: [
                int(count)
                for count in cv2.calcHist(
                    [cv_image], [index], None, [histogram_bins], [0, 256]
                ).flatten()
            ]
            for index, name in enumerate(("red", "green", "blue"))
        }

        histogram: ColorHistogram = {
            "bins": histogram_bins,
            "channels": channel_counts,
        }

        data.color = ColorData(
            themes=[theme.dict() for theme in themes],
            prominent_colors=prominent_colors,
            average_hue=average_hue_value,
            average_saturation=average_saturation_value,
            average_lightness=average_lightness_value,
            histogram=histogram,
        )
Get Color info from an image.
def process(self, data: FrameData, config: FullAnalyzerConfig) -> None:
    """Compute average HSV values, prominent colors, themes and RGB histograms.

    Args:
        data: Frame whose ``image`` is analyzed; ``color`` is assigned.
        config: Analyzer configuration; ``settings.theme_color_variant`` and
            ``settings.theme_contrast_level`` are passed to ``theme_from_color``.
    """
    cv_image = np.array(data.image)
    image_hsv = cv2.cvtColor(cv_image, cv2.COLOR_RGB2HSV)

    # Split the HSV image into flat per-channel arrays.
    hue_channel = image_hsv[:, :, 0].flatten()
    saturation_channel = image_hsv[:, :, 1].flatten()
    lightness_channel = image_hsv[:, :, 2].flatten()  # third HSV channel

    # Convert hue values from OpenCV's [0, 179] range to [0, 360] range, and calculate
    # avg hue. Integer hues are widened first: doubling in uint8 would wrap every hue
    # above 127 modulo 256 and corrupt the average.
    if np.issubdtype(hue_channel.dtype, np.integer):
        hue_channel = hue_channel.astype(np.int32)
    average_hue_value = average_hue(hue_channel * 2)
    average_saturation_value = float(saturation_channel.mean())
    average_lightness_value = float(lightness_channel.mean())

    prominent_colors = prominent_colors_from_image(data.image)[0:3]
    themes = [
        theme_from_color(
            color,
            variant=config.settings.theme_color_variant,
            contrast_level=config.settings.theme_contrast_level,
        )
        for color in prominent_colors
    ]

    # Calculate color histograms for each channel, converting the float
    # counts returned by OpenCV to ints.
    histogram_bins = 256
    channel_counts = {
        name: [
            int(count)
            for count in cv2.calcHist(
                [cv_image], [index], None, [histogram_bins], [0, 256]
            ).flatten()
        ]
        for index, name in enumerate(("red", "green", "blue"))
    }

    histogram: ColorHistogram = {
        "bins": histogram_bins,
        "channels": channel_counts,
    }

    data.color = ColorData(
        themes=[theme.dict() for theme in themes],
        prominent_colors=prominent_colors,
        average_hue=average_hue_value,
        average_saturation=average_saturation_value,
        average_lightness=average_lightness_value,
        histogram=histogram,
    )
Get Color info from an image.
class CaptionerProtocol(Protocol):
    """Structural interface implemented by every image captioner."""

    def caption(self, image: Image, instruction: str | None = None) -> str:
        """Produce a caption describing ``image``.

        Args:
            image: The image to caption.
            instruction: Optional instruction to prompt the caption model.
        """
        ...
Protocol for captioning images.
def _no_init_or_replace_init(self, *args, **kwargs):
    # Placeholder ``__init__`` for protocol classes and their subclasses.
    klass = type(self)

    if klass._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # A custom ``__init__`` is already in place, so there is nothing to
    # resolve. Going further can lead to RecursionError. See bpo-45121.
    if klass.__init__ is not _no_init_or_replace_init:
        return

    # On the first instantiation of a protocol subclass, find the first
    # ``__init__`` in the MRO that is not this placeholder and install it on
    # the class, so later instantiations bypass the placeholder entirely.
    placeholder = _no_init_or_replace_init
    klass.__init__ = next(
        (
            base.__dict__['__init__']
            for base in klass.__mro__
            if base.__dict__.get('__init__', placeholder) is not placeholder
        ),
        object.__init__,  # should not happen
    )

    klass.__init__(self, *args, **kwargs)
def caption(self, image: Image, instruction: str | None = None) -> str:
    """Produce a caption describing ``image``.

    Args:
        image: The image to caption.
        instruction: Optional instruction to prompt the caption model.
    """
    ...
Generate a caption for the given image.
Arguments:
- image: The image to caption.
- instruction: Optional instruction to prompt the caption model.
class BlipCaptioner(CaptionerProtocol):
    """Captioner implementation using the BLIP model.

    This class provides methods to generate captions for images, handling specific
    issues like hallucinated words and formatting errors.
    """

    # The BLIP captioner sometimes comes up with the word arafed or araffe.
    hallucinated_words: ClassVar[list[str]] = ["arafed", "araffe"]

    def caption(self, image: Image, instruction: str | None = None) -> str:
        """Generate a caption for the given image.

        Args:
            image: The image to caption.
            instruction: An optional conditional text to guide the caption generation.

        Returns:
            A formatted caption string.
        """
        raw = self.raw_caption(image, instruction)
        # Captions with apostrophe come out weird: "Person ' s"
        raw = raw.replace(" ' ", "'")

        cleaned = raw
        for fake_word in self.hallucinated_words:
            cleaned = cleaned.replace(fake_word, "")
        if cleaned != raw:
            # Dropping a word leaves a gap ("a  man"); collapse whitespace runs
            # and trim the ends so the caption reads naturally.
            cleaned = " ".join(cleaned.split())
        return cleaned.capitalize()

    @staticmethod
    def raw_caption(image: Image, instruction: str | None = None) -> str:
        """Generate a raw caption for the image using the BLIP model.

        Args:
            image: The image to caption.
            instruction: An optional conditional text to guide the caption generation.

        Returns:
            The raw caption string generated by the model.
        """
        processor, model = get_processor_and_model()
        rgb_image = image.convert("RGB")
        if instruction is None:
            inputs = processor(rgb_image, return_tensors="pt").to("cuda")
        else:
            inputs = processor(rgb_image, instruction, return_tensors="pt").to("cuda")
        out = model.generate(**inputs)
        caption = processor.decode(  # type: ignore[no-untyped-call]
            out[0], skip_special_tokens=True
        )
        assert isinstance(caption, str)
        return caption
Captioner implementation using the BLIP model.
This class provides methods to generate captions for images, handling specific issues like hallucinated words and formatting errors.
def caption(self, image: Image, instruction: str | None = None) -> str:
    """Generate a caption for the given image.

    Args:
        image: The image to caption.
        instruction: An optional conditional text to guide the caption generation.

    Returns:
        A formatted caption string.
    """
    raw = self.raw_caption(image, instruction)
    # Captions with apostrophe come out weird: "Person ' s"
    raw = raw.replace(" ' ", "'")

    cleaned = raw
    for fake_word in self.hallucinated_words:
        cleaned = cleaned.replace(fake_word, "")
    if cleaned != raw:
        # Dropping a word leaves a gap ("a  man"); collapse whitespace runs
        # and trim the ends so the caption reads naturally.
        cleaned = " ".join(cleaned.split())
    return cleaned.capitalize()
Generate a caption for the given image.
Arguments:
- image: The image to caption.
- instruction: An optional conditional text to guide the caption generation.
Returns:
A formatted caption string.
@staticmethod
def raw_caption(image: Image, instruction: str | None = None) -> str:
    """Generate a raw caption for the image using the BLIP model.

    Args:
        image: The image to caption.
        instruction: An optional conditional text to guide the caption generation.

    Returns:
        The raw caption string generated by the model.
    """
    processor, model = get_processor_and_model()
    rgb_image = image.convert("RGB")
    # Follow the device the model lives on instead of assuming CUDA, so
    # CPU-only hosts work as well.
    # NOTE(review): assumes the model exposes `.device` (Hugging Face
    # PreTrainedModel) - confirm against get_processor_and_model.
    device = model.device
    if instruction is None:
        inputs = processor(rgb_image, return_tensors="pt").to(device)
    else:
        inputs = processor(rgb_image, instruction, return_tensors="pt").to(device)
    out = model.generate(**inputs)
    caption = processor.decode(  # type: ignore[no-untyped-call]
        out[0], skip_special_tokens=True
    )
    assert isinstance(caption, str)
    return caption
Generate a raw caption for the image using the BLIP model.
Arguments:
- image: The image to caption.
- instruction: An optional conditional text to guide the caption generation.
Returns:
The raw caption string generated by the model.
class EmbedderProtocol(Protocol):
    """Structural interface for embedders of text and images.

    Every method returns its result as an ``NDArray``; the single-item
    methods take one input and the plural methods take a list of inputs.
    """

    def embed_text(self, text: str) -> NDArray[Any]:
        """Embed a single text input and return its embedding as an array."""

    def embed_texts(self, texts: list[str]) -> NDArray[Any]:
        """Embed multiple text inputs and return their embeddings as an array."""

    def embed_image(self, image: Image) -> NDArray[Any]:
        """Embed a single image and return its embedding as an array."""

    def embed_images(self, images: list[Image]) -> NDArray[Any]:
        """Embed multiple images and return their embeddings as an array."""
Embedder protocol.
1771def _no_init_or_replace_init(self, *args, **kwargs): 1772 cls = type(self) 1773 1774 if cls._is_protocol: 1775 raise TypeError('Protocols cannot be instantiated') 1776 1777 # Already using a custom `__init__`. No need to calculate correct 1778 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1779 if cls.__init__ is not _no_init_or_replace_init: 1780 return 1781 1782 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1783 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1784 # searches for a proper new `__init__` in the MRO. The new `__init__` 1785 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1786 # instantiation of the protocol subclass will thus use the new 1787 # `__init__` and no longer call `_no_init_or_replace_init`. 1788 for base in cls.__mro__: 1789 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1790 if init is not _no_init_or_replace_init: 1791 cls.__init__ = init 1792 break 1793 else: 1794 # should not happen 1795 cls.__init__ = object.__init__ 1796 1797 cls.__init__(self, *args, **kwargs)
def embed_text(self, text: str) -> NDArray[Any]:
    """Embed a single text input and return its embedding as an array."""
Embed a single text input and return its embedding as an array.
Embed multiple text inputs.
def embed_image(self, image: Image) -> NDArray[Any]:
    """Embed a single image and return its embedding as an array."""
Embed a single image and return its embedding as an array.
class ZeroCLIPEmbedder(EmbedderProtocol):
    """Embedder implementation using the CLIP model."""

    def embed_text(self, text: str) -> NDArray[np.float32]:
        """Embed one text by delegating to :meth:`embed_texts`.

        Args:
            text: The text to embed.

        Returns:
            The first (only) row of the batch embedding.
        """
        batch = self.embed_texts([text])
        single: NDArray[np.float32] = batch[0]
        return single

    def embed_texts(self, texts: list[str]) -> NDArray[np.float32]:
        """Embed a batch of texts.

        Args:
            texts: The texts to embed.

        Returns:
            The L2-normalized text features as a NumPy array.
        """
        clip_model, clip_processor = get_model_and_processor()
        encoded = clip_processor(text=texts, return_tensors="pt", padding=True)
        with torch.no_grad():
            text_features = clip_model.get_text_features(**encoded)  # type: ignore[operator]
        unit_features = F.normalize(text_features, p=2, dim=-1)
        return unit_features.numpy()

    def embed_image(self, image: Image) -> NDArray[np.float32]:
        """Embed one image by delegating to :meth:`embed_images`.

        Args:
            image: The image to embed.

        Returns:
            The first (only) row of the batch embedding.
        """
        batch = self.embed_images([image])
        single: NDArray[np.float32] = batch[0]
        return single

    def embed_images(self, images: list[Image]) -> NDArray[np.float32]:
        """Embed a batch of images.

        Args:
            images: The images to embed.

        Returns:
            The L2-normalized image features as a NumPy array.
        """
        clip_model, clip_processor = get_model_and_processor()
        encoded = clip_processor(images=images, return_tensors="pt", padding=True)
        with torch.no_grad():
            image_features = clip_model.get_image_features(**encoded)  # type: ignore[operator]
        unit_features = F.normalize(image_features, p=2, dim=-1)
        return unit_features.numpy()
Embedder implementation using the CLIP model.
def embed_text(self, text: str) -> NDArray[np.float32]:
    """Embed one text by delegating to ``embed_texts``.

    Args:
        text: The text to embed.

    Returns:
        The first (only) row of the batch embedding.
    """
    batch = self.embed_texts([text])
    single: NDArray[np.float32] = batch[0]
    return single
Embed the given text.
Arguments:
- text: The text to embed.
Returns:
The text embedding.
def embed_texts(self, texts: list[str]) -> NDArray[np.float32]:
    """Embed a batch of texts.

    Args:
        texts: The texts to embed.

    Returns:
        The L2-normalized text features as a NumPy array.
    """
    clip_model, clip_processor = get_model_and_processor()
    encoded = clip_processor(text=texts, return_tensors="pt", padding=True)
    with torch.no_grad():
        text_features = clip_model.get_text_features(**encoded)  # type: ignore[operator]
    unit_features = F.normalize(text_features, p=2, dim=-1)
    return unit_features.numpy()
Embed the given texts.
Arguments:
- texts: The texts to embed.
Returns:
The text embeddings.
def embed_image(self, image: Image) -> NDArray[np.float32]:
    """Embed one image by delegating to ``embed_images``.

    Args:
        image: The image to embed.

    Returns:
        The first (only) row of the batch embedding.
    """
    batch = self.embed_images([image])
    single: NDArray[np.float32] = batch[0]
    return single
Embed the given image.
Arguments:
- image: The image to embed.
Returns:
The image embedding.
def embed_images(self, images: list[Image]) -> NDArray[np.float32]:
    """Embed a batch of images.

    Args:
        images: The images to embed.

    Returns:
        The L2-normalized image features as a NumPy array.
    """
    clip_model, clip_processor = get_model_and_processor()
    encoded = clip_processor(images=images, return_tensors="pt", padding=True)
    with torch.no_grad():
        image_features = clip_model.get_image_features(**encoded)  # type: ignore[operator]
    unit_features = F.normalize(image_features, p=2, dim=-1)
    return unit_features.numpy()
Embed the given images.
Arguments:
- images: The images to embed.
Returns:
The image embeddings.
class OpenCLIPEmbedder(EmbedderProtocol):
    """Embedder implementation using the OpenCLIP ViT-H-14 model."""

    def embed_text(self, text: str) -> NDArray[np.float32]:
        """Embed one text by delegating to :meth:`embed_texts`.

        Args:
            text: The text to embed.

        Returns:
            The first (only) row of the batch embedding.
        """
        batch = self.embed_texts([text])
        single: NDArray[np.float32] = batch[0]
        return single

    def embed_texts(self, texts: list[str]) -> NDArray[np.float32]:
        """Embed a list of texts.

        Args:
            texts: The list of texts to embed.

        Returns:
            The L2-normalized text features, moved to CPU, as a NumPy array.
        """
        clip_model, _, tokenize, target_device = get_open_clip_assets()
        tokens = tokenize(texts).to(target_device)

        with torch.no_grad():
            # Encode, then scale each feature vector to unit length.
            unit_features = F.normalize(clip_model.encode_text(tokens), p=2, dim=-1)

        return unit_features.cpu().numpy()

    def embed_image(self, image: PIL.Image.Image) -> NDArray[np.float32]:
        """Embed one PIL image by delegating to :meth:`embed_images`.

        Args:
            image: The PIL Image to embed.

        Returns:
            The first (only) row of the batch embedding.
        """
        batch = self.embed_images([image])
        single: NDArray[np.float32] = batch[0]
        return single

    def embed_images(self, images: list[PIL.Image.Image]) -> NDArray[np.float32]:
        """Embed a list of PIL Images.

        Args:
            images: The list of PIL Images to embed.

        Returns:
            The L2-normalized image features, moved to CPU, as a NumPy array.
        """
        clip_model, preprocess, _, target_device = get_open_clip_assets()
        # One preprocessed tensor per image, stacked into a single batch.
        pixel_batch = torch.stack(list(map(preprocess, images))).to(target_device)

        with torch.no_grad():
            # Encode, then scale each feature vector to unit length.
            unit_features = F.normalize(clip_model.encode_image(pixel_batch), p=2, dim=-1)

        return unit_features.cpu().numpy()
Embedder implementation using the OpenCLIP ViT-H-14 model.
def embed_text(self, text: str) -> NDArray[np.float32]:
    """Embed one text by delegating to ``embed_texts``.

    Args:
        text: The text to embed.

    Returns:
        The first (only) row of the batch embedding.
    """
    batch = self.embed_texts([text])
    single: NDArray[np.float32] = batch[0]
    return single
Embed a single string of text.
Arguments:
- text: The text to embed.
Returns:
A 1D NumPy array representing the text embedding.
def embed_texts(self, texts: list[str]) -> NDArray[np.float32]:
    """Embed a list of texts.

    Args:
        texts: The list of texts to embed.

    Returns:
        The L2-normalized text features, moved to CPU, as a NumPy array.
    """
    clip_model, _, tokenize, target_device = get_open_clip_assets()
    tokens = tokenize(texts).to(target_device)

    with torch.no_grad():
        # Encode, then scale each feature vector to unit length.
        unit_features = F.normalize(clip_model.encode_text(tokens), p=2, dim=-1)

    return unit_features.cpu().numpy()
Embed a list of texts.
Arguments:
- texts: The list of texts to embed.
Returns:
A 2D NumPy array of shape (n_texts, embedding_dim).
def embed_image(self, image: PIL.Image.Image) -> NDArray[np.float32]:
    """Embed one PIL image by delegating to ``embed_images``.

    Args:
        image: The PIL Image to embed.

    Returns:
        The first (only) row of the batch embedding.
    """
    batch = self.embed_images([image])
    single: NDArray[np.float32] = batch[0]
    return single
Embed a single PIL Image.
Arguments:
- image: The PIL Image to embed.
Returns:
A 1D NumPy array representing the image embedding.
def embed_images(self, images: list[PIL.Image.Image]) -> NDArray[np.float32]:
    """Embed a list of PIL Images.

    Args:
        images: The list of PIL Images to embed.

    Returns:
        The L2-normalized image features, moved to CPU, as a NumPy array.
    """
    clip_model, preprocess, _, target_device = get_open_clip_assets()
    # One preprocessed tensor per image, stacked into a single batch.
    pixel_batch = torch.stack(list(map(preprocess, images))).to(target_device)

    with torch.no_grad():
        # Encode, then scale each feature vector to unit length.
        unit_features = F.normalize(clip_model.encode_image(pixel_batch), p=2, dim=-1)

    return unit_features.cpu().numpy()
Embed a list of PIL Images.
Arguments:
- images: The list of PIL Images to embed.
Returns:
A 2D NumPy array of shape (n_images, embedding_dim).
class FacialRecognitionProtocol(Protocol):
    """Structural interface for facial recognition backends.

    An implementation provides a single method, ``get_faces``, that maps an
    image to a list of ``FaceBox`` results.
    """

    def get_faces(self, image: Image) -> list[FaceBox]:
        """Detect and embed faces from an image.

        Args:
            image: The image to get the faces from.

        Returns:
            One face box per detected face.
        """
Protocol for facial recognition.
1771def _no_init_or_replace_init(self, *args, **kwargs): 1772 cls = type(self) 1773 1774 if cls._is_protocol: 1775 raise TypeError('Protocols cannot be instantiated') 1776 1777 # Already using a custom `__init__`. No need to calculate correct 1778 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1779 if cls.__init__ is not _no_init_or_replace_init: 1780 return 1781 1782 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1783 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1784 # searches for a proper new `__init__` in the MRO. The new `__init__` 1785 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1786 # instantiation of the protocol subclass will thus use the new 1787 # `__init__` and no longer call `_no_init_or_replace_init`. 1788 for base in cls.__mro__: 1789 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1790 if init is not _no_init_or_replace_init: 1791 cls.__init__ = init 1792 break 1793 else: 1794 # should not happen 1795 cls.__init__ = object.__init__ 1796 1797 cls.__init__(self, *args, **kwargs)
def get_faces(self, image: Image) -> list[FaceBox]:
    """Detect and embed faces from an image.

    Args:
        image: The image to get the faces from.

    Returns:
        One face box per detected face.
    """
Detect and embed faces from an image.
Arguments:
- image: The image to get the faces from.
Returns:
The face boxes.
class InsightFacialRecognition(FacialRecognitionProtocol):
    """Facial recognition implementation using the InsightFace model."""

    def get_faces(self, image: Image) -> list[FaceBox]:
        """Detect and embed faces from an image.

        Args:
            image: The image to analyze; any PIL mode is accepted.

        Returns:
            One ``FaceBox`` per detected face, with size and landmark
            coordinates expressed relative to the image dimensions.
        """
        # Normalize to 3-channel RGB first: grayscale images have no channel
        # axis (indexing shape[2] would fail) and RGBA has four channels,
        # while the BGR conversion below needs exactly three.
        rgb_array = np.array(image.convert("RGB"))
        cv_image = cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
        faces = get_app().get(cv_image)
        # NOTE(review): InsightFace's 5-point landmarks are ordered
        # left eye, right eye, nose, left mouth corner, right mouth corner -
        # confirm against the detector model in use.
        return [
            FaceBox(
                position=coordinate_to_proportional(face.bbox.tolist(), image),
                width=(face.bbox[2] - face.bbox[0]).item() / image.width,
                height=(face.bbox[3] - face.bbox[1]).item() / image.height,
                age=face.age,
                sex=FaceSex(face.sex),
                confidence=face.det_score.item(),
                eye_left=coordinate_to_proportional(face.kps[0].tolist(), image),
                eye_right=coordinate_to_proportional(face.kps[1].tolist(), image),
                nose_tip=coordinate_to_proportional(face.kps[2].tolist(), image),
                mouth_left=coordinate_to_proportional(face.kps[3].tolist(), image),
                mouth_right=coordinate_to_proportional(face.kps[4].tolist(), image),
                embedding=face.normed_embedding.tolist(),
            )
            for face in faces
        ]
Facial recognition implementation using the InsightFace model.
def get_faces(self, image: Image) -> list[FaceBox]:
    """Detect and embed faces from an image.

    Args:
        image: The image to analyze; any PIL mode is accepted.

    Returns:
        One ``FaceBox`` per detected face, with size and landmark
        coordinates expressed relative to the image dimensions.
    """
    # Normalize to 3-channel RGB first: grayscale images have no channel
    # axis (indexing shape[2] would fail) and RGBA has four channels,
    # while the BGR conversion below needs exactly three.
    rgb_array = np.array(image.convert("RGB"))
    cv_image = cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
    faces = get_app().get(cv_image)
    # NOTE(review): InsightFace's 5-point landmarks are ordered
    # left eye, right eye, nose, left mouth corner, right mouth corner -
    # confirm against the detector model in use.
    return [
        FaceBox(
            position=coordinate_to_proportional(face.bbox.tolist(), image),
            width=(face.bbox[2] - face.bbox[0]).item() / image.width,
            height=(face.bbox[3] - face.bbox[1]).item() / image.height,
            age=face.age,
            sex=FaceSex(face.sex),
            confidence=face.det_score.item(),
            eye_left=coordinate_to_proportional(face.kps[0].tolist(), image),
            eye_right=coordinate_to_proportional(face.kps[1].tolist(), image),
            nose_tip=coordinate_to_proportional(face.kps[2].tolist(), image),
            mouth_left=coordinate_to_proportional(face.kps[3].tolist(), image),
            mouth_right=coordinate_to_proportional(face.kps[4].tolist(), image),
            embedding=face.normed_embedding.tolist(),
        )
        for face in faces
    ]
Detect and embed faces from an image.
class ObjectDetectionProtocol(Protocol):
    """Structural interface for object detection backends."""

    def detect_objects(self, image: Image) -> list[ObjectBox]:
        """Detect objects in an image and return one box per detection."""
Protocol for object detection.
1771def _no_init_or_replace_init(self, *args, **kwargs): 1772 cls = type(self) 1773 1774 if cls._is_protocol: 1775 raise TypeError('Protocols cannot be instantiated') 1776 1777 # Already using a custom `__init__`. No need to calculate correct 1778 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1779 if cls.__init__ is not _no_init_or_replace_init: 1780 return 1781 1782 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1783 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1784 # searches for a proper new `__init__` in the MRO. The new `__init__` 1785 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1786 # instantiation of the protocol subclass will thus use the new 1787 # `__init__` and no longer call `_no_init_or_replace_init`. 1788 for base in cls.__mro__: 1789 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1790 if init is not _no_init_or_replace_init: 1791 cls.__init__ = init 1792 break 1793 else: 1794 # should not happen 1795 cls.__init__ = object.__init__ 1796 1797 cls.__init__(self, *args, **kwargs)
class ResnetObjectDetection(ObjectDetectionProtocol):
    """Object detection implementation using the ResNet model."""

    def detect_objects(self, image: Image) -> list[ObjectBox]:
        """Detect objects in an image.

        Args:
            image: The image to run detection on.

        Returns:
            One ``ObjectBox`` per detection scoring above the 0.8 threshold,
            with width and height expressed relative to the image size.
        """
        # you can specify the revision tag if you don't want the timm dependency
        processor, model = get_model_and_processor()

        inputs = processor(images=image, return_tensors="pt")
        # Inference only: skip autograd bookkeeping so no gradient graph is
        # built and kept alive for the outputs.
        with torch.no_grad():
            outputs = model(**inputs)

        # PIL reports (width, height); post-processing expects (height, width).
        target_sizes = torch.tensor([image.size[::-1]])
        results = processor.post_process_object_detection(
            outputs,
            target_sizes=target_sizes,  # type: ignore[arg-type]
            threshold=0.8,
        )[0]

        return [
            ObjectBox(
                confidence=score.item(),
                position=coordinate_to_proportional(
                    (float(box[0].item()), float(box[1].item())),
                    image,
                ),
                width=(box[2].item() - box[0].item()) / image.width,
                height=(box[3].item() - box[1].item()) / image.height,
                label=model.config.id2label[label.item()],  # type: ignore[index]
            )
            for score, label, box in zip(
                results["scores"], results["labels"], results["boxes"], strict=False
            )
        ]
Object detection implementation using the ResNet model.
def detect_objects(self, image: Image) -> list[ObjectBox]:
    """Detect objects in an image.

    Args:
        image: The image to run detection on.

    Returns:
        One ``ObjectBox`` per detection scoring above the 0.8 threshold,
        with width and height expressed relative to the image size.
    """
    # you can specify the revision tag if you don't want the timm dependency
    processor, model = get_model_and_processor()

    inputs = processor(images=image, return_tensors="pt")
    # Inference only: skip autograd bookkeeping so no gradient graph is
    # built and kept alive for the outputs.
    with torch.no_grad():
        outputs = model(**inputs)

    # PIL reports (width, height); post-processing expects (height, width).
    target_sizes = torch.tensor([image.size[::-1]])
    results = processor.post_process_object_detection(
        outputs,
        target_sizes=target_sizes,  # type: ignore[arg-type]
        threshold=0.8,
    )[0]

    return [
        ObjectBox(
            confidence=score.item(),
            position=coordinate_to_proportional(
                (float(box[0].item()), float(box[1].item())),
                image,
            ),
            width=(box[2].item() - box[0].item()) / image.width,
            height=(box[3].item() - box[1].item()) / image.height,
            label=model.config.id2label[label.item()],  # type: ignore[index]
        )
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"], strict=False
        )
    ]
Detect objects in an image.
class OCRProtocol(Protocol):
    """Structural interface for OCR backends.

    Covers a legibility check plus text extraction, either as one string or
    as per-box results.
    """

    def has_legible_text(self, image: Image) -> bool:
        """Check if an image has legible text."""

    def get_text(self, image: Image, languages: tuple[str, ...]) -> str:
        """Extract text from an image using OCR, for the given languages."""

    def get_boxes(self, image: Image, languages: tuple[str, ...]) -> list[OCRBox]:
        """Get bounding boxes of text found in the image, for the given languages."""
Protocol for OCR.
1771def _no_init_or_replace_init(self, *args, **kwargs): 1772 cls = type(self) 1773 1774 if cls._is_protocol: 1775 raise TypeError('Protocols cannot be instantiated') 1776 1777 # Already using a custom `__init__`. No need to calculate correct 1778 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1779 if cls.__init__ is not _no_init_or_replace_init: 1780 return 1781 1782 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1783 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1784 # searches for a proper new `__init__` in the MRO. The new `__init__` 1785 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1786 # instantiation of the protocol subclass will thus use the new 1787 # `__init__` and no longer call `_no_init_or_replace_init`. 1788 for base in cls.__mro__: 1789 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1790 if init is not _no_init_or_replace_init: 1791 cls.__init__ = init 1792 break 1793 else: 1794 # should not happen 1795 cls.__init__ = object.__init__ 1796 1797 cls.__init__(self, *args, **kwargs)
class ResnetTesseractOCR(OCRProtocol):
    """OCR implementation using the ResNet model and Tesseract."""

    def has_legible_text(self, image: Image) -> bool:
        """Classify whether the image contains legible text.

        The image is converted to RGB and resized to 300x300 before being
        passed to the detector; the result is True when the class-1
        probability exceeds the class-0 probability.
        """
        thumbnail = image.convert("RGB").resize((300, 300))
        detector, detector_processor = get_detector_model_and_processor()
        pixel_values = detector_processor(thumbnail, return_tensors="pt").pixel_values

        with torch.no_grad():
            logits = detector(pixel_values).logits
        probabilities = logits.softmax(dim=1)
        verdict = (probabilities[0][1] > probabilities[0][0]).item()
        assert isinstance(verdict, bool)
        return verdict

    def get_text(self, image: Image, languages: tuple[str, ...]) -> str:
        """Run Tesseract on the image and return the recognized text.

        The language codes are joined with ``+`` to form Tesseract's ``lang``
        argument.
        """
        lang_spec = "+".join(languages)
        text = pytesseract.image_to_string(image, lang=lang_spec)
        assert isinstance(text, str)
        return text

    def get_boxes(self, image: Image, languages: tuple[str, ...]) -> list[OCRBox]:
        """Return one ``OCRBox`` per recognized text row.

        Rows whose text is blank or whose confidence is negative are dropped.
        Width and height are divided by the image size and confidence by 100.
        """
        data = pytesseract.image_to_data(
            image,
            lang="+".join(languages),
            output_type=Output.DICT,
        )

        candidates = (
            OCRBox(
                position=coordinate_to_proportional(
                    [data["left"][idx], data["top"][idx]],
                    image,
                ),
                width=data["width"][idx] / image.width,
                height=data["height"][idx] / image.height,
                text=data["text"][idx],
                confidence=data["conf"][idx] / 100,
            )
            for idx in range(len(data["level"]))
        )
        return [box for box in candidates if box.text.strip() != "" and box.confidence >= 0]
OCR implementation using the ResNet model and Tesseract.
def has_legible_text(self, image: Image) -> bool:
    """Classify whether the image contains legible text.

    The image is converted to RGB and resized to 300x300 before being
    passed to the detector; the result is True when the class-1
    probability exceeds the class-0 probability.
    """
    thumbnail = image.convert("RGB").resize((300, 300))
    detector, detector_processor = get_detector_model_and_processor()
    pixel_values = detector_processor(thumbnail, return_tensors="pt").pixel_values

    with torch.no_grad():
        logits = detector(pixel_values).logits
    probabilities = logits.softmax(dim=1)
    verdict = (probabilities[0][1] > probabilities[0][0]).item()
    assert isinstance(verdict, bool)
    return verdict
Check if an image has legible text.
def get_text(self, image: Image, languages: tuple[str, ...]) -> str:
    """Run Tesseract on the image and return the recognized text.

    The language codes are joined with ``+`` to form Tesseract's ``lang``
    argument.
    """
    lang_spec = "+".join(languages)
    text = pytesseract.image_to_string(image, lang=lang_spec)
    assert isinstance(text, str)
    return text
Extract text from an image using OCR.
def get_boxes(self, image: Image, languages: tuple[str, ...]) -> list[OCRBox]:
    """Return one ``OCRBox`` per recognized text row.

    Rows whose text is blank or whose confidence is negative are dropped.
    Width and height are divided by the image size and confidence by 100.
    """
    data = pytesseract.image_to_data(
        image,
        lang="+".join(languages),
        output_type=Output.DICT,
    )

    candidates = (
        OCRBox(
            position=coordinate_to_proportional(
                [data["left"][idx], data["top"][idx]],
                image,
            ),
            width=data["width"][idx] / image.width,
            height=data["height"][idx] / image.height,
            text=data["text"][idx],
            confidence=data["conf"][idx] / 100,
        )
        for idx in range(len(data["level"]))
    )
    return [box for box in candidates if box.text.strip() != "" and box.confidence >= 0]
Get bounding boxes of text.
class LLMCaptioner(CaptionerProtocol):
    """Captioner implementation using a large language model (LLM)."""

    llm_provider: BaseVisualLLM
    # Default question sent to the LLM when the caller gives no instruction.
    prompt: str = (
        "You are a BLIP image captioning model. "
        "Generate a short caption for this image. "
        "Examples: 'A plate of hotdogs', "
        "'A bedroom with a bed and chair', "
        "'A group of people by a lake', "
        "'A tabby cat on a bed'. "
        "Only output the caption!"
    )

    def __init__(self, provider: LLMProvider) -> None:
        """Initialize the LLM captioner.

        Args:
            provider: Selects which LLM backend answers the caption prompt.
        """
        self.llm_provider = get_llm_by_provider(provider)

    def caption(self, image: Image, instruction: str | None = None) -> str:
        """Generate a caption for the given image.

        Args:
            image: The image to caption.
            instruction: Optional instruction to prompt the caption model;
                the default prompt is used when omitted.

        Returns:
            The model's answer without double quotes, without wrapping single
            quotes, and without surrounding whitespace.
        """
        caption = self.llm_provider.image_question(
            image=image,
            question=self.prompt if instruction is None else instruction,
        )
        # Only strip single quotes that wrap the caption: removing every "'"
        # would also destroy apostrophes ("dog's" -> "dogs").
        return caption.replace('"', "").strip().strip("'").strip()
Captioner implementation using a large language model (LLM).
def __init__(self, provider: LLMProvider) -> None:
    """Set up the captioner with the LLM backend selected by ``provider``.

    Args:
        provider: Selects which LLM backend answers the caption prompt.
    """
    backend = get_llm_by_provider(provider)
    self.llm_provider = backend
Initialize the LLM captioner.
def caption(self, image: Image, instruction: str | None = None) -> str:
    """Generate a caption for the given image.

    Args:
        image: The image to caption.
        instruction: Optional instruction to prompt the caption model;
            the default prompt is used when omitted.

    Returns:
        The model's answer without double quotes, without wrapping single
        quotes, and without surrounding whitespace.
    """
    caption = self.llm_provider.image_question(
        image=image,
        question=self.prompt if instruction is None else instruction,
    )
    # Only strip single quotes that wrap the caption: removing every "'"
    # would also destroy apostrophes ("dog's" -> "dogs").
    return caption.replace('"', "").strip().strip("'").strip()
Generate a caption for the given image.
Arguments:
- image: The image to caption.
- instruction: Optional instruction to prompt the caption model.
class BaseVisualLLM(ABC):
    """Abstract base for visual language models.

    Subclasses implement ``stream_chat``; the question helpers are built on
    top of it.
    """

    def image_question(self, image: Image, question: str) -> str:
        """Ask a question about one image; wraps it in a list and delegates."""
        single_image_batch = [image]
        return self.images_question(single_image_batch, question)

    def images_question(self, images: list[Image], question: str) -> str:
        """Ask a question about several images and return the full answer.

        The question and images are sent as one chat message and the streamed
        chunks are concatenated into a single string.
        """
        request = ChatMessage(message=question, images=images)
        chunks = self.stream_chat([request])
        return "".join(chunks)

    @abstractmethod
    def stream_chat(
        self,
        messages: list[ChatMessage],
        convert_images: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 500,
    ) -> Generator[str, None, None]:
        """Chat with the LLM and yield the answer as streamed text chunks."""
Base class for visual language models.
def image_question(self, image: Image, question: str) -> str:
    """Ask a question about one image; wraps it in a list and delegates."""
    single_image_batch = [image]
    return self.images_question(single_image_batch, question)
Ask a question about an image.
def images_question(self, images: list[Image], question: str) -> str:
    """Ask a question about several images and return the full answer.

    The question and images are sent as one chat message and the streamed
    chunks are concatenated into a single string.
    """
    request = ChatMessage(message=question, images=images)
    chunks = self.stream_chat([request])
    return "".join(chunks)
Ask a question about multiple images.
@abstractmethod
def stream_chat(
    self,
    messages: list[ChatMessage],
    convert_images: bool = True,
    temperature: float = 0.7,
    max_tokens: int = 500,
) -> Generator[str, None, None]:
    """LLM chat that gives streaming output.

    Args:
        messages: The chat messages to send.
        convert_images: Whether images should be converted before sending
            (presumably to a mode the backend accepts - implementation-defined).
        temperature: Sampling temperature forwarded to the backend.
        max_tokens: Upper bound on generated tokens, where the backend
            supports one.

    Returns:
        A generator yielding the answer as text chunks.
    """
LLM chat that gives streaming output.
class MiniCPMLLM(BaseVisualLLM):
    """Mini CPM LLM implementation."""

    def stream_chat(
        self,
        messages: list[ChatMessage],
        convert_images: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 500,  # noqa: ARG002
    ) -> Generator[str, None, None]:
        """Mini CPM LLM chat that gives streaming output.

        Args:
            messages: The chat messages to send; they are not modified.
            convert_images: When true, images are converted to RGB first.
            temperature: Sampling temperature passed to the model.
            max_tokens: Accepted for interface compatibility; not used here.

        Returns:
            The generator produced by the model's streaming chat call.
        """
        model, tokenizer = get_model_and_tokenizer()
        formatted_msgs = []
        for msg in messages:
            # Convert into a local list instead of assigning back to
            # `msg.images`, so the caller's message objects stay untouched.
            images = (
                [image.convert(mode="RGB") for image in msg.images]
                if convert_images
                else list(msg.images)
            )
            formatted_msgs.append(
                {"role": msg.role.value.lower(), "content": [*images, msg.message]}
            )
        result = model.chat(  # type: ignore[operator]
            image=None,
            msgs=formatted_msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=temperature,
            stream=True,
        )
        assert isinstance(result, Generator)
        return result
Mini CPM LLM implementation.
def stream_chat(
    self,
    messages: list[ChatMessage],
    convert_images: bool = True,
    temperature: float = 0.7,
    max_tokens: int = 500,  # noqa: ARG002
) -> Generator[str, None, None]:
    """Mini CPM LLM chat that gives streaming output.

    Args:
        messages: The chat messages to send; they are not modified.
        convert_images: When true, images are converted to RGB first.
        temperature: Sampling temperature passed to the model.
        max_tokens: Accepted for interface compatibility; not used here.

    Returns:
        The generator produced by the model's streaming chat call.
    """
    model, tokenizer = get_model_and_tokenizer()
    formatted_msgs = []
    for msg in messages:
        # Convert into a local list instead of assigning back to
        # `msg.images`, so the caller's message objects stay untouched.
        images = (
            [image.convert(mode="RGB") for image in msg.images]
            if convert_images
            else list(msg.images)
        )
        formatted_msgs.append(
            {"role": msg.role.value.lower(), "content": [*images, msg.message]}
        )
    result = model.chat(  # type: ignore[operator]
        image=None,
        msgs=formatted_msgs,
        tokenizer=tokenizer,
        sampling=True,
        temperature=temperature,
        stream=True,
    )
    assert isinstance(result, Generator)
    return result
Mini CPM LLM chat that gives streaming output.
class OpenAILLM(MiniCPMLLM):
    """OpenAI LLM implementation."""

    model_name: str
    client: OpenAI

    def __init__(self, model_name: str = "gpt-4o-mini") -> None:
        """Initialize the OpenAI LLM.

        Args:
            model_name: Name of the OpenAI model used for chat completions.
        """
        super().__init__()
        self.model_name = model_name
        self.client = OpenAI()

    def stream_chat(
        self,
        messages: list[ChatMessage],
        convert_images: bool = True,  # noqa: ARG002
        temperature: float = 0.7,
        max_tokens: int = 500,
    ) -> Generator[str, None, None]:  # pragma: no cover
        """OpenAI LLM chat that gives streaming output.

        Args:
            messages: The chat messages to send.
            convert_images: Accepted for interface compatibility; not used here.
            temperature: Sampling temperature forwarded to the API.
            max_tokens: Maximum number of tokens forwarded to the API.

        Yields:
            Each non-empty content delta of the streamed response.
        """
        dict_messages = [chat_to_dict(message) for message in messages]

        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=dict_messages,  # type: ignore[arg-type]
            max_tokens=max_tokens,
            temperature=temperature,
            stream=True,
        )

        for chunk in response:
            # Some stream chunks carry no choices at all (e.g. usage-only or
            # filter-result chunks); indexing them would raise IndexError.
            if not chunk.choices:  # type: ignore[union-attr]
                continue
            chunk_content: str | None = chunk.choices[0].delta.content  # type: ignore[union-attr]
            if chunk_content is not None:
                yield chunk_content
OpenAI LLM implementation.
def __init__(self, model_name: str = "gpt-4o-mini") -> None:
    """Create the OpenAI client and remember which model to query.

    Args:
        model_name: Name of the OpenAI model used for chat completions.
    """
    super().__init__()
    # Store the model name first, then build the API client.
    self.model_name = model_name
    self.client = OpenAI()
Initialize the OpenAI LLM.
def stream_chat(
    self,
    messages: list[ChatMessage],
    convert_images: bool = True,  # noqa: ARG002
    temperature: float = 0.7,
    max_tokens: int = 500,
) -> Generator[str, None, None]:  # pragma: no cover
    """OpenAI LLM chat that gives streaming output.

    Args:
        messages: The chat messages to send.
        convert_images: Accepted for interface compatibility; not used here.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Maximum number of tokens forwarded to the API.

    Yields:
        Each non-empty content delta of the streamed response.
    """
    dict_messages = [chat_to_dict(message) for message in messages]

    response = self.client.chat.completions.create(
        model=self.model_name,
        messages=dict_messages,  # type: ignore[arg-type]
        max_tokens=max_tokens,
        temperature=temperature,
        stream=True,
    )

    for chunk in response:
        # Some stream chunks carry no choices at all (e.g. usage-only or
        # filter-result chunks); indexing them would raise IndexError.
        if not chunk.choices:  # type: ignore[union-attr]
            continue
        chunk_content: str | None = chunk.choices[0].delta.content  # type: ignore[union-attr]
        if chunk_content is not None:
            yield chunk_content
OpenAI LLM chat that gives streaming output.
@dataclass
class BaseBoundingBox:
    """Base class for a bounding box with position and size.

    Position, width and height are all proportional to the full image
    width/height rather than absolute pixel values.

    Attributes:
        position: The position of the bounding box, proportional to the full image width and height.
        width: The width of the bounding box, proportional to the image width.
        height: The height of the bounding box, proportional to the image height.
        confidence: The confidence of the detected item (OCR/Object/Face).
    """

    # position, width, height are proportional to full image width/height
    position: tuple[float, float]
    width: float
    height: float
    confidence: float
Base class for a bounding box with position and size.
Attributes:
- position: The position of the bounding box, proportional to the full image width and height.
- width: The width of the bounding box.
- height: The height of the bounding box.
- confidence: The confidence of the detected item (OCR/Object/Face).
@dataclass
class OCRBox(BaseBoundingBox):
    """Bounding box of a text region found by OCR.

    Extends the base bounding box with the text that was read.

    Attributes:
        text: The text recognized inside the bounding box.
    """

    text: str
Represents a bounding box for OCR with text content.
Attributes:
- text: The recognized text within the bounding box.
@dataclass
class ObjectBox(BaseBoundingBox):
    """Bounding box of a detected object.

    Extends the base bounding box with the object's label.

    Attributes:
        label: The label assigned to the detected object.
    """

    label: str
Represents an object bounding box with a label.
Attributes:
- label: The label of the detected object.
@dataclass
class FaceBox(BaseBoundingBox):
    """Bounding box of a detected face, with facial attributes and landmarks.

    Attributes:
        age: The estimated age of the person.
        sex: The sex of the person, as a ``FaceSex`` value.
        mouth_left: Position of the left corner of the mouth.
        mouth_right: Position of the right corner of the mouth.
        nose_tip: Position of the tip of the nose.
        eye_left: Position of the left eye.
        eye_right: Position of the right eye.
        embedding: The embedding vector of the face.
    """

    age: int
    sex: FaceSex
    mouth_left: tuple[float, float]
    mouth_right: tuple[float, float]
    nose_tip: tuple[float, float]
    eye_left: tuple[float, float]
    eye_right: tuple[float, float]
    embedding: list[float]
Represents a face bounding box with facial attributes.
Attributes:
- age: The estimated age of the person.
- sex: The sex of the person.
- mouth_left: The position of the left mouth corner.
- mouth_right: The position of the right mouth corner.
- nose_tip: The position of the nose tip.
- eye_left: The position of the left eye.
- eye_right: The position of the right eye.
- embedding: The facial embedding vector.
Enum for sex of the detected person.
@dataclass
class FullAnalyzerConfig:
    """A configuration class for the full analyzer.

    Bundles every component implementation together with the analyzer settings.

    Attributes:
        llm: The language model.
        captioner: The captioning model.
        ocr: The OCR implementation.
        embedder: The embedder implementation.
        object_detector: The object detection implementation.
        facial_recognition: The facial recognition implementation.
        settings: The analyzer settings.
    """

    llm: BaseVisualLLM
    captioner: CaptionerProtocol
    ocr: OCRProtocol
    embedder: EmbedderProtocol
    object_detector: ObjectDetectionProtocol
    facial_recognition: FacialRecognitionProtocol
    settings: AnalyzerSettings
A configuration class for the full analyzer.
Attributes:
- llm: The language model.
- captioner: The captioning model.
- ocr: The OCR implementation.
- embedder: The embedder implementation.
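- object_detector: The object detection implementation.
- facial_recognition: The facial recognition implementation.
- settings: The analyzer settings.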
@dataclass
class ImageData:
    """Comprehensive data for an image.

    Only ``path`` and ``frames`` are required; every other field defaults to
    ``None``.

    Attributes:
        path: The file system path to the image.
        frames: A list of frame paths associated with the image.
        exif: Exif data of the image.
        data_url: The data URL representation of the image.
        gps: GPS data associated with the image.
        time: Time-related data for the image.
        weather: Weather data at the time the image was taken.
        tags: Tag data associated with the image.
    """

    path: Path
    frames: list[Path]
    exif: ExifData | None = None
    data_url: str | None = None
    gps: GPSData | None = None
    time: TimeData | IntermediateTimeData | None = None
    weather: WeatherData | None = None
    tags: TagData | None = None
Comprehensive data for an image.
Attributes:
- path: The file system path to the image.
- frames: A list of frame paths associated with the image.
- exif: Exif data of the image.
- data_url: The data URL representation of the image.
- gps: GPS data associated with the image.
- time: Time-related data for the image.
- weather: Weather data at the time the image was taken.
- tags: Tag data associated with the image.
@dataclass
class FrameData:
    """Data for a frame, including an image for use during analysis.

    Only ``image`` and ``path`` are required; every other field defaults to
    ``None``.

    Attributes:
        image: The image of the frame, used during analysis.
        path: The path associated with the frame.
        ocr: OCR data for the frame.
        embedding: The embedding vector of the frame.
        faces: The face bounding boxes for the frame.
        summary: A textual summary for the frame.
        caption_data: Caption data for the frame.
        objects: The object bounding boxes for the frame.
        measured_quality: Measured quality data for the frame.
        color: Color data for the frame.
    """

    image: Image
    path: Path
    ocr: OCRData | None = None
    embedding: list[float] | None = None
    faces: list[FaceBox] | None = None
    summary: str | None = None
    caption_data: CaptionData | None = None
    objects: list[ObjectBox] | None = None
    measured_quality: MeasuredQualityData | None = None
    color: ColorData | None = None
Data for a frame, including an image for use during analysis.