"""Minimal MP4 (M4A) muxer that wraps already-encoded ALAC frames."""

from datetime import datetime

from tags import Tags

# Seconds between 1904-01-01 and 1970-01-01 (66 years, 17 of them leap years).
_MP4_EPOCH_OFFSET = 2082844800

# template int(32)[9] matrix
# { 0x00010000,0,0,0,0x00010000,0,0,0,0x40000000 }  -- unity matrix
_UNITY_MATRIX = (0x10000, 0, 0, 0, 0x10000, 0, 0, 0, 0x40000000)


def dt_to_unix_ts(ts: datetime) -> int:
    """Return *ts* as whole seconds since the Unix epoch."""
    return int(ts.timestamp())


def dt_to_mp4_ts(ts: datetime) -> int:
    """Return *ts* as whole seconds since 1904-01-01 (Unix time + offset)."""
    return int(ts.timestamp()) + _MP4_EPOCH_OFFSET


def s2b(s: str):
    """Encode an ASCII string into a ``bytearray``.

    Raises:
        UnicodeEncodeError: if *s* contains non-ASCII characters.
    """
    return bytearray(s.encode("ascii"))


def i2b(i: int, size: int = 4):
    """Encode a non-negative integer as *size* big-endian bytes.

    Raises:
        OverflowError: if *i* is negative or does not fit into *size* bytes.
    """
    return bytearray(i.to_bytes(size, "big"))


class MP4MuxerALAC:
    """Build an MP4 container (ftyp / moov / free / mdat) in memory.

    Typical use: configure the instance through the ``set_*`` methods, call
    :meth:`create` once to serialize every box into ``self.data`` and then
    :meth:`out` to store the result on disk.
    """

    # Text tags stored as UTF-8 ``data`` atoms: (box type, Tags attribute).
    _TEXT_TAGS = (
        (b"\xA9\x6E\x61\x6D", "track_name"),  # ©nam
        (b"\xA9\x41\x52\x54", "artist"),  # ©ART
        (b"\x61\x41\x52\x54", "album_artist"),  # aART
        (b"\xA9\x77\x72\x74", "composer"),  # ©wrt
        (b"\xA9\x61\x6C\x62", "album_name"),  # ©alb
        (b"\xA9\x67\x65\x6E", "genre"),  # ©gen
        (b"\xA9\x64\x61\x79", "date"),  # ©day
        (b"\x49\x53\x52\x43", "isrc"),  # ISRC
        (b"\x63\x70\x72\x74", "copyright"),  # cprt
    )
    # Integer tags: (box type, Tags attribute).
    _ID_TAGS = (
        (b"\x63\x6E\x49\x44", "apple_store_catalog_id"),  # cnID
        (b"\x70\x6C\x49\x44", "playlist_id"),  # plID
        (b"\x61\x74\x49\x44", "album_title_id"),  # atID
    )
    # Free-form "----" tags: (name written into the atom, Tags attribute).
    _FREEFORM_TAGS = (
        ("UPC", "upc"),
        ("LABEL", "label"),
    )
    # ``data`` atom type indicator for each supported cover format.
    _COVER_KINDS = {"jpeg": 13, "png": 14}

    def __init__(self) -> None:
        # Serialized file content; every box writer appends to this buffer.
        self.data: bytearray = bytearray()
        self.timestamp: datetime = datetime.now()
        self.sample_rate: int = 0
        self.number_of_samples: int = 0
        self.bit_depth: int = 0
        self.samples_per_frame: int = 4096
        self.channel_count: int = 2
        # Byte size of every encoded frame, in playback order.
        self.sample_sizes: list[int] = []
        # these are the only important ones
        self.offsets: dict = {
            "stco": 0,
            "mdat": 0,
        }
        self.total_duration: int = 0
        self.mdat_data: bytearray = bytearray()
        self.tags: Tags = None

    # ------------------------------------------------------------------
    # Top-level driver and raw output
    # ------------------------------------------------------------------

    def create(self) -> None:
        """Serialize all boxes into ``self.data``.

        The ``stco`` table is first written with placeholders and patched at
        the end, because chunk offsets depend on where ``mdat`` lands.
        """
        self.ftyp()
        self.moov()
        self.free()
        self.mdat()
        self.rewrite_stco_chunk()

    def out(self, filename: str) -> None:
        """Write the serialized buffer to *filename* in binary mode."""
        with open(filename, "wb") as f:
            f.write(self.data)

    def w(self, b: bytearray):
        """Append *b* (``bytes`` or ``bytearray``) to the output buffer."""
        self.data.extend(b)

    # ------------------------------------------------------------------
    # Configuration setters
    # ------------------------------------------------------------------

    def set_sample_rate(self, sr: int) -> None:
        """Set the sample rate (also used as the movie/media time scale)."""
        self.sample_rate = sr

    def set_number_of_samples(self, nr: int) -> None:
        """Store the number of samples."""
        self.number_of_samples = nr

    def set_bit_depth(self, bd: int) -> None:
        """Set the bit depth written into the sample description."""
        self.bit_depth = bd

    def set_sample_sizes(self, ss: list[int]) -> None:
        """Set the list holding the byte size of every encoded frame."""
        self.sample_sizes = ss

    def set_total_duration(self, td: int) -> None:
        """Set the total duration, expressed in time-scale units."""
        self.total_duration = td

    def set_mdat_data(self, m: bytearray) -> None:
        """Set the payload that is written into the ``mdat`` box."""
        self.mdat_data = m

    def set_tags(self, t: Tags) -> None:
        """Set the metadata; a falsy value disables the ``udta`` box."""
        self.tags = t

    def set_timestamp(self, t: datetime) -> None:
        """Set the creation/modification time written into the headers."""
        self.timestamp = t

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _write_matrix(self) -> None:
        """Write the nine 32-bit values of the unity matrix."""
        for value in _UNITY_MATRIX:
            self.w(i2b(value))

    def _write_full_box_header(self, size: int, name: str) -> None:
        """Write size, type, an 8-bit version 0 and 24-bit flags 0."""
        self.w(i2b(size))
        self.w(s2b(name))
        self.w(i2b(0, 1))  # version
        self.w(i2b(0, 3))  # flags

    def _chunk_layout(self) -> tuple[int, int, int]:
        """Return ``(samples_per_chunk, full_chunks, remainder)``.

        A chunk groups roughly one second of frames. The per-chunk count is
        clamped to at least 1 so very low sample rates cannot cause a
        division by zero.
        """
        per_chunk = max(1, round(self.sample_rate / self.samples_per_frame))
        full_chunks, remainder = divmod(len(self.sample_sizes), per_chunk)
        return per_chunk, full_chunks, remainder

    def _chunk_count(self) -> int:
        """Return the number of chunks, i.e. the number of ``stco`` entries."""
        _, full_chunks, remainder = self._chunk_layout()
        return full_chunks + (1 if remainder else 0)

    def _stsc_entries(self) -> list[tuple[int, int, int]]:
        """Return ``(first_chunk, samples_per_chunk, description_index)`` rows.

        The row for full chunks is only produced when at least one full chunk
        exists; otherwise two rows would claim the same ``first_chunk``.
        """
        per_chunk, full_chunks, remainder = self._chunk_layout()
        entries = []
        if full_chunks:
            entries.append((1, per_chunk, 1))
        if remainder:
            entries.append((full_chunks + 1, remainder, 1))
        return entries

    def _stts_entries(self) -> list[tuple[int, int]]:
        """Return ``(sample_count, sample_delta)`` rows for the ``stts`` box.

        A second row describes the shorter final frame whenever the total
        duration is not a multiple of ``samples_per_frame``.
        """
        frame_count = len(self.sample_sizes)
        tail = self.total_duration % self.samples_per_frame
        if tail == 0:
            return [(frame_count, self.samples_per_frame)]
        return [(frame_count - 1, self.samples_per_frame), (1, tail)]

    # ------------------------------------------------------------------
    # ftyp
    # ------------------------------------------------------------------

    def ftyp(self) -> None:
        """Write the ``ftyp`` box with major brand ``M4A ``."""
        major_brand = "M4A "
        minor_version = 0
        compatible_brands = ["M4A ", "mp42", "isom"]
        self.w(i2b(16 + 4 * len(compatible_brands)))
        self.w(s2b("ftyp"))
        self.w(s2b(major_brand))
        self.w(i2b(minor_version))
        for brand in compatible_brands:
            self.w(s2b(brand))

    # ------------------------------------------------------------------
    # moov and children
    # ------------------------------------------------------------------

    def moov_size(self) -> int:
        """Return the byte size of ``moov`` including all of its children."""
        size = 8 + self.mvhd_size() + self.trak_size()
        if self.tags:
            size += self.udta_size()
        return size

    def moov(self) -> None:
        """Write ``moov``: ``mvhd``, ``trak`` and, with tags set, ``udta``."""
        self.w(i2b(self.moov_size()))
        self.w(s2b("moov"))
        self.mvhd()
        self.trak()
        if self.tags:
            self.udta()

    def mvhd_size(self) -> int:
        """Return the fixed byte size of the ``mvhd`` box."""
        return 108

    def mvhd(self) -> None:
        """Write the ``mvhd`` box (times, time scale, duration, defaults)."""
        stamp = dt_to_mp4_ts(self.timestamp)
        self.w(i2b(self.mvhd_size()))
        self.w(s2b("mvhd"))
        self.w(i2b(0))  # version
        self.w(i2b(stamp))  # creation time
        self.w(i2b(stamp))  # modification time
        self.w(i2b(self.sample_rate))  # time scale
        self.w(i2b(self.total_duration))
        self.w(i2b(0x10000))  # rate, 1.0
        self.w(i2b(0x100, 2))  # volume, 1.0
        self.w(i2b(0, 2))  # const bit(16) reserved = 0
        self.w(i2b(0, 8))  # const unsigned int(32)[2] reserved = 0
        self._write_matrix()
        self.w(i2b(0, 24))  # bit(32)[6] pre_defined = 0
        self.w(i2b(2))  # next track id

    def trak_size(self) -> int:
        """Return the byte size of ``trak`` including its children."""
        return 8 + self.tkhd_size() + self.mdia_size()

    def trak(self) -> None:
        """Write ``trak``: ``tkhd`` followed by ``mdia``."""
        self.w(i2b(self.trak_size()))
        self.w(s2b("trak"))
        self.tkhd()
        self.mdia()

    def tkhd_size(self) -> int:
        """Return the fixed byte size of the ``tkhd`` box."""
        return 92

    def tkhd(self) -> None:
        """Write the ``tkhd`` box for track id 1."""
        stamp = dt_to_mp4_ts(self.timestamp)
        self.w(i2b(self.tkhd_size()))
        self.w(s2b("tkhd"))
        self.w(i2b(1))  # flags
        self.w(i2b(stamp))  # creation time
        self.w(i2b(stamp))  # modification time
        self.w(i2b(1))  # track id
        self.w(i2b(0))  # const unsigned int (32) reserved = 0
        self.w(i2b(self.total_duration))
        self.w(i2b(0, 8))  # const unsigned int (32) [2] reserved = 0
        self.w(i2b(0, 2))  # layer
        self.w(i2b(0, 2))  # alternate group
        # template int (16) volume = {if track_is_audio 0x0100 else 0}
        self.w(i2b(0x100, 2))
        self.w(i2b(0, 2))  # const unsigned int (16) reserved = 0
        self._write_matrix()
        self.w(i2b(0))  # unsigned int (32) width
        self.w(i2b(0))  # unsigned int (32) height

    def mdia_size(self) -> int:
        """Return the byte size of ``mdia`` including its children."""
        return 8 + self.mdhd_size() + self.hdlr_size() + self.minf_size()

    def mdia(self) -> None:
        """Write ``mdia``: ``mdhd``, ``hdlr`` and ``minf``."""
        self.w(i2b(self.mdia_size()))
        self.w(s2b("mdia"))
        self.mdhd()
        self.hdlr()
        self.minf()

    def mdhd_size(self) -> int:
        """Return the fixed byte size of the ``mdhd`` box."""
        return 32

    def mdhd(self) -> None:
        """Write the ``mdhd`` box (times, time scale, duration, language)."""
        stamp = dt_to_mp4_ts(self.timestamp)
        self._write_full_box_header(self.mdhd_size(), "mdhd")
        self.w(i2b(stamp))  # creation time
        self.w(i2b(stamp))  # modification time
        self.w(i2b(self.sample_rate))  # time scale
        self.w(i2b(self.total_duration))
        self.w(i2b(0x55C4, 2))  # language: undefined
        self.w(i2b(0, 2))  # quality

    def hdlr_size(self) -> int:
        """Return the fixed byte size of the media ``hdlr`` box."""
        return 32

    def hdlr(self) -> None:
        """Write the media ``hdlr`` box (type ``mhlr``, subtype ``soun``)."""
        self._write_full_box_header(self.hdlr_size(), "hdlr")
        self.w(s2b("mhlr"))  # component type: media handler
        self.w(s2b("soun"))  # component subtype
        self.w(i2b(0))  # component name
        self.w(i2b(0))  # component flags
        self.w(i2b(0))  # component flags mask

    def minf_size(self) -> int:
        """Return the byte size of ``minf`` including its children."""
        return 8 + self.smhd_size() + self.dinf_size() + self.stbl_size()

    def minf(self) -> None:
        """Write ``minf``: ``smhd``, ``dinf`` and ``stbl``."""
        self.w(i2b(self.minf_size()))
        self.w(s2b("minf"))
        self.smhd()
        self.dinf()
        self.stbl()

    def smhd_size(self) -> int:
        """Return the fixed byte size of the ``smhd`` box."""
        return 16

    def smhd(self) -> None:
        """Write the ``smhd`` box with a zero audio balance."""
        self._write_full_box_header(self.smhd_size(), "smhd")
        self.w(i2b(0, 2))  # audio balance
        self.w(i2b(0, 2))  # reserved

    def dinf_size(self) -> int:
        """Return the byte size of ``dinf`` including ``dref``."""
        return 8 + self.dref_size()

    def dinf(self) -> None:
        """Write ``dinf`` containing a single ``dref`` box."""
        self.w(i2b(self.dinf_size()))
        self.w(s2b("dinf"))
        self.dref()

    def dref_size(self) -> int:
        """Return the fixed byte size of the ``dref`` box."""
        return 28

    def dref(self) -> None:
        """Write ``dref`` with one ``url `` entry flagged as same-file."""
        self._write_full_box_header(self.dref_size(), "dref")
        self.w(i2b(1))  # entry count
        self.w(i2b(12))  # size of the data location entry
        self.w(s2b("url "))
        self.w(i2b(0, 1))  # version
        self.w(i2b(1, 3))  # flags: same file

    # ------------------------------------------------------------------
    # stbl and children
    # ------------------------------------------------------------------

    def stbl_size(self) -> int:
        """Return the byte size of ``stbl`` including its children."""
        return (
            8
            + self.stsd_size()
            + self.stts_size()
            + self.stsz_size()
            + self.stsc_size()
            + self.stco_size()
        )

    def stbl(self) -> None:
        """Write ``stbl``: ``stsd``, ``stts``, ``stsz``, ``stsc``, ``stco``."""
        self.w(i2b(self.stbl_size()))
        self.w(s2b("stbl"))
        self.stsd()
        self.stts()
        self.stsz()
        self.stsc()
        self.stco()

    def stsd_size(self) -> int:
        """Return the fixed byte size of the ``stsd`` box."""
        return 88

    def stsd(self) -> None:
        """Write ``stsd`` with one ``alac`` sample entry and its magic cookie.

        The maximum coded frame size is the largest value in
        ``sample_sizes`` (0 when the list is empty).
        """
        sample_rate = self.sample_rate
        max_coded_frame_size = max(self.sample_sizes, default=0)

        self._write_full_box_header(self.stsd_size(), "stsd")
        self.w(i2b(1))  # entry count

        self.w(i2b(72))  # size of the audio sample entry
        self.w(s2b("alac"))
        self.w(i2b(0, 6))  # reserved
        self.w(i2b(1, 2))  # data reference index
        self.w(i2b(0))  # reserved
        self.w(i2b(0))  # reserved
        self.w(i2b(self.channel_count, 2))
        self.w(i2b(self.bit_depth, 2))
        self.w(i2b(0, 2))  # pre-defined
        self.w(i2b(0, 2))  # reserved
        # The field is 16 bits wide; rates that do not fit are written as 0.
        self.w(i2b(sample_rate if sample_rate <= 65535 else 0, 2))
        self.w(i2b(0, 2))  # sample rate (again? set to zero for some reason)

        # magic cookie starts here
        self.w(i2b(36))  # size
        self.w(s2b("alac"))
        self.w(i2b(0))  # reserved
        self.w(i2b(self.samples_per_frame))
        self.w(i2b(0, 1))  # reserved
        self.w(i2b(self.bit_depth, 1))
        self.w(i2b(40, 1))  # rice history mult, pb, tuning parameter
        self.w(i2b(10, 1))  # rice initial history, mb, tuning parameter
        self.w(i2b(14, 1))  # rice kmodifier, kb, tuning parameter
        self.w(i2b(self.channel_count, 1))
        self.w(i2b(255, 2))  # maxRun, currently unused
        self.w(i2b(max_coded_frame_size))
        self.w(i2b(sample_rate * self.bit_depth * self.channel_count))  # bitrate
        self.w(i2b(sample_rate))

    def stts_size(self) -> int:
        """Return the byte size of ``stts`` (16 bytes plus 8 per entry)."""
        return 16 + 8 * len(self._stts_entries())

    def stts(self) -> None:
        """Write the ``stts`` box with one or two entries."""
        entries = self._stts_entries()
        self._write_full_box_header(self.stts_size(), "stts")
        self.w(i2b(len(entries)))
        for sample_count, sample_delta in entries:
            self.w(i2b(sample_count))
            self.w(i2b(sample_delta))

    def stsz_size(self) -> int:
        """Return the byte size of ``stsz`` (20 bytes plus 4 per sample)."""
        return 20 + 4 * len(self.sample_sizes)

    def stsz(self) -> None:
        """Write ``stsz`` listing the byte size of every sample."""
        self._write_full_box_header(self.stsz_size(), "stsz")
        self.w(i2b(0))  # sample size: 0, sizes are listed individually
        self.w(i2b(len(self.sample_sizes)))
        for frame_size in self.sample_sizes:
            self.w(i2b(frame_size))

    def stsc_size(self) -> int:
        """Return the byte size of ``stsc`` (16 bytes plus 12 per entry)."""
        return 16 + 12 * len(self._stsc_entries())

    def stsc(self) -> None:
        """Write ``stsc`` mapping samples to chunks.

        One entry covers all full chunks; a second one is added when a
        shorter trailing chunk exists.
        """
        entries = self._stsc_entries()
        self._write_full_box_header(self.stsc_size(), "stsc")
        self.w(i2b(len(entries)))
        for first_chunk, samples_per_chunk, description_index in entries:
            self.w(i2b(first_chunk))
            self.w(i2b(samples_per_chunk))
            self.w(i2b(description_index))

    def stco_size(self) -> int:
        """Return the byte size of ``stco`` (16 bytes plus 4 per chunk)."""
        return 16 + 4 * self._chunk_count()

    def stco(self) -> None:
        """Write ``stco`` with placeholder offsets and remember its position.

        The real offsets are filled in later by :meth:`rewrite_stco_chunk`.
        """
        self.offsets["stco"] = len(self.data)
        chunk_count = self._chunk_count()
        self._write_full_box_header(self.stco_size(), "stco")
        self.w(i2b(chunk_count))
        self.w(b"\xFF" * (4 * chunk_count))  # placeholder values

    # ------------------------------------------------------------------
    # free / mdat
    # ------------------------------------------------------------------

    def free(self) -> None:
        """Write an empty ``free`` box."""
        self.w(i2b(8))
        self.w(s2b("free"))

    def mdat(self) -> None:
        """Write ``mdat`` with the configured payload and remember its position."""
        self.offsets["mdat"] = len(self.data)
        self.w(i2b(8 + len(self.mdat_data)))
        self.w(s2b("mdat"))
        self.w(self.mdat_data)

    def rewrite_stco_chunk(self) -> None:
        """Replace the ``stco`` placeholders with the real chunk offsets.

        Offsets are absolute positions in ``self.data``; the first chunk
        starts right after the 8-byte ``mdat`` header. A running offset keeps
        the work linear in the number of samples.
        """
        per_chunk, _, _ = self._chunk_layout()
        # Skip size, type, version/flags and entry count of the stco box.
        entry_pos = self.offsets["stco"] + 16
        chunk_offset = self.offsets["mdat"] + 8
        for chunk_index in range(self._chunk_count()):
            self.data[entry_pos:entry_pos + 4] = i2b(chunk_offset)
            first_sample = chunk_index * per_chunk
            chunk_offset += sum(
                self.sample_sizes[first_sample:first_sample + per_chunk]
            )
            entry_pos += 4

    # ------------------------------------------------------------------
    # udta / meta / ilst
    # ------------------------------------------------------------------

    def udta_size(self) -> int:
        """Return the byte size of ``udta`` including ``meta``."""
        return 8 + self.meta_size()

    def udta(self) -> None:
        """Write ``udta`` containing the ``meta`` box."""
        self.w(i2b(self.udta_size()))
        self.w(s2b("udta"))
        self.meta()

    def meta_size(self) -> int:
        """Return the byte size of ``meta`` including its children."""
        return 12 + self.meta_hdlr_size() + self.ilst_size()

    def meta(self) -> None:
        """Write ``meta``: its ``hdlr`` followed by ``ilst``."""
        self._write_full_box_header(self.meta_size(), "meta")
        self.meta_hdlr()
        self.ilst()

    def meta_hdlr_size(self) -> int:
        """Return the fixed byte size of the metadata ``hdlr`` box."""
        return 33

    def meta_hdlr(self) -> None:
        """Write the metadata ``hdlr`` box (type ``mdir``, manufacturer ``appl``)."""
        self._write_full_box_header(self.meta_hdlr_size(), "hdlr")
        self.w(i2b(0))  # type quicktime
        self.w(s2b("mdir"))  # metadata type
        self.w(s2b("appl"))  # manufacturer
        self.w(i2b(0))  # component reserved flags
        self.w(i2b(0))  # component reserved flags mask
        self.w(i2b(0, 1))  # component type name

    def ilst_size(self) -> int:
        """Return the byte size of ``ilst`` for the tags that are set.

        Must stay in sync with :meth:`ilst`; both walk the same tag tables.
        """
        tags = self.tags
        size = 8
        for _, attr in self._TEXT_TAGS:
            value = getattr(tags, attr)
            if value:
                size += 24 + len(value.encode("utf-8"))
        if tags.track_number or tags.total_number_of_tracks:
            size += 32
        if tags.disc_number or tags.total_number_of_discs:
            size += 32
        for name, attr in self._FREEFORM_TAGS:
            value = getattr(tags, attr)
            if value:
                size += len(value.encode("utf-8")) + 64 + len(name)
        for _, attr in self._ID_TAGS:
            if getattr(tags, attr):
                size += 28
        if tags.cover_data:
            size += 24 + len(tags.cover_data)
        return size

    def write_mp4_tag_utf8(self, box: bytes, content: str):
        """Write a tag atom *box* whose ``data`` atom holds UTF-8 text."""
        payload = content.encode("utf-8")
        data_size = len(payload) + 16
        self.w(i2b(data_size + 8))
        self.w(box)
        self.w(i2b(data_size))
        self.w(s2b("data"))
        self.w(i2b(1))  # kind: 0 = binary; 1 = utf-8
        self.w(i2b(0))  # language
        self.w(payload)

    def write_mp4_tag_int(self, box: bytes, content: int):
        """Write a tag atom *box* whose ``data`` atom holds a 32-bit integer."""
        size = 24 + len(box)
        self.w(i2b(size))
        self.w(box)
        self.w(i2b(size - 8))
        self.w(s2b("data"))
        self.w(i2b(21))  # kind: signed integer
        self.w(i2b(0))  # language
        self.w(i2b(content))

    def write_itunes_tag_utf8(self, box: str, content: str):
        """Write a free-form ``----`` atom named *box* with UTF-8 content.

        The atom holds a ``mean`` atom (``com.apple.iTunes``), a ``name``
        atom carrying *box* and a ``data`` atom carrying *content*.
        """
        payload = content.encode("utf-8")
        self.w(i2b(len(payload) + 64 + len(box)))
        self.w(s2b("----"))

        self.w(i2b(28))  # size of the mean atom
        self.w(s2b("mean"))
        self.w(i2b(0))  # unknown
        self.w(s2b("com.apple.iTunes"))

        self.w(i2b(12 + len(box)))  # size of the name atom
        self.w(s2b("name"))
        self.w(i2b(0))  # unknown
        self.w(s2b(box))

        self.w(i2b(len(payload) + 16))  # size of the data atom
        self.w(s2b("data"))
        self.w(i2b(1))  # kind: 0 = binary; 1 = utf-8
        self.w(i2b(0))  # language
        self.w(payload)

    def write_mp4_tag_tuple_int(self, box: bytes, curr: int, total: int):
        """Write a tag atom *box* holding a 16-bit ``curr``/``total`` pair."""
        self.w(i2b(32))  # size
        self.w(box)
        self.w(i2b(24))  # data size
        self.w(s2b("data"))
        self.w(i2b(0))  # kind: binary
        self.w(i2b(0))  # language
        self.w(i2b(0, 2))  # reserved
        self.w(i2b(curr, 2))
        self.w(i2b(total, 2))
        self.w(i2b(0, 2))  # reserved

    def ilst(self) -> None:
        """Write ``ilst`` with one atom per tag that is set.

        Raises:
            ValueError: if cover data is present but ``cover_format`` is
                neither ``"jpeg"`` nor ``"png"``.
        """
        tags = self.tags
        self.w(i2b(self.ilst_size()))
        self.w(s2b("ilst"))

        for box, attr in self._TEXT_TAGS:
            value = getattr(tags, attr)
            if value:
                self.write_mp4_tag_utf8(box, value)

        for box, attr in self._ID_TAGS:
            value = getattr(tags, attr)
            if value:
                self.write_mp4_tag_int(box, value)

        for name, attr in self._FREEFORM_TAGS:
            value = getattr(tags, attr)
            if value:
                self.write_itunes_tag_utf8(name, value)

        if tags.track_number or tags.total_number_of_tracks:
            self.write_mp4_tag_tuple_int(
                b"\x74\x72\x6B\x6E",  # trkn
                tags.track_number or 0,
                tags.total_number_of_tracks or 0,
            )

        if tags.disc_number or tags.total_number_of_discs:
            self.write_mp4_tag_tuple_int(
                b"\x64\x69\x73\x6B",  # disk
                tags.disc_number or 0,
                tags.total_number_of_discs or 0,
            )

        if tags.cover_data:
            kind = self._COVER_KINDS.get(tags.cover_format)
            if kind is None:
                # Previously this fell through with ``kind`` unbound.
                raise ValueError(
                    f"unsupported cover format: {tags.cover_format!r}"
                )
            cover = tags.cover_data
            full_size = len(cover) + 24
            self.w(i2b(full_size))
            self.w(s2b("covr"))
            self.w(i2b(full_size - 8))
            self.w(s2b("data"))
            self.w(i2b(kind))
            self.w(i2b(0))  # language
            self.w(cover)