From 6c9b0aea4a8fef0c922182837fab7e3070d90230 Mon Sep 17 00:00:00 2001 From: z8 <87996468+0x666690@users.noreply.github.com> Date: Wed, 8 Dec 2021 19:49:06 +0100 Subject: [PATCH] Add code --- README.md | 100 ++++++ mp4muxer_alac.py | 809 ++++++++++++++++++++++++++++++++++++++++++ mp4muxer_eac3.py | 890 +++++++++++++++++++++++++++++++++++++++++++++++ tags.py | 22 ++ 4 files changed, 1821 insertions(+) create mode 100644 README.md create mode 100644 mp4muxer_alac.py create mode 100644 mp4muxer_eac3.py create mode 100644 tags.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..9fa565f --- /dev/null +++ b/README.md @@ -0,0 +1,100 @@ +# MP4 muxer + +## Info + +This code is only meant to work with ALAC and EAC3 (JOC) streams, for any other codecs you're on your own. +Use at your own risk. If you use this to create files and don't validate them afterwards, don't blame me if something goes wrong. + +None of this code will parse MP4 files for you. If your source file is a different MP4 file (or a different format entirely), +you will have to figure out how to parse the required information (see `Usage`) yourself. + +For more information, see [here](https://z8.re/blog/mp4). + + +## Usage + +You will need to provide the following info: +- Sample rate +- Bit depth +- mdat box as a bytearray +- Sizes for all samples contained within the mdat box +- Total duration +- (optional: Tags) + +Writing tags is optional, even without them the file will play back just fine. They're written to `moov/udta/meta/ilst`. + +If you do not create a `Tags` object and/or do not use `set_tags()`, then the `udta` box will not be created. 
+ +Note for EAC3 JOC: This code assumes that there is only a single independent substream present and that the +values in the EAC3 specific box within `stsd` of the source file match the following (I left out the reserved bits here): +```python +fscod = 0 +bsid = 16 +asvc = 0 +bsmod = 0 +acmod = 7 +lfeon = 1 +num_dep_sub = 0 +ec3_job_flag = 1 +joc_complexity_index = 16 +``` + +## Example + +```python +from mp4muxer_alac import MP4MuxerALAC + +m = MP4MuxerALAC() +m.set_sample_rate(44100) +m.set_bit_depth(16) +sample_sizes = [7424, 6915, 6830, 6737, ..., 2700, 1443] +m.set_sample_sizes(sample_sizes) +m.set_total_duration(10442880) +with open("m.dat", "rb") as f: + mdat = bytearray(f.read()) +m.set_mdat_data(mdat) + +t = Tags() +t.track_name = "Insert Interesting Title Here" +t.artist = "Artist X & Y" +t.album_artist = "Artist X" +t.composer = "X, Y, Z" +t.album_name = "First Track - Single" +t.genre = "Dubstep;Country" +t.track_number = 1 +t.total_number_of_tracks = 1 +t.disc_number = 1 +t.total_number_of_discs = 1 +t.date = "2020-05-22" +t.upc = "5556667778889" +t.label = "We Sell Records Rec." +with open("cover.jpg", "rb") as f: + t.cover_data = bytearray(f.read()) +t.cover_format = "jpeg" +t.isrc = "CYZXL20044078" +t.copyright = "℗ 2021 We Sell Records Rec." +t.apple_store_catalog_id = 9999998 +t.album_title_id = 9999999 +t.playlist_id = 32423444 + +m.set_tags(t) +m.create() +m.out("A Piece of Music.m4a") +``` + +For EAC3 JOC the process is similar: + +```python +from mp4muxer_eac3 import MP4MuxerEAC3 + +m = MP4MuxerEAC3() +m.set_sample_rate(sr) +m.set_bit_depth(bd) +m.set_bit_rate(768) +m.set_sample_sizes(sample_sizes) +m.set_timestamp(timestamp) +m.set_mdat_data(mdat) + +... 
+ +``` diff --git a/mp4muxer_alac.py b/mp4muxer_alac.py new file mode 100644 index 0000000..81271d8 --- /dev/null +++ b/mp4muxer_alac.py @@ -0,0 +1,809 @@ +from datetime import datetime +from tags import Tags + + +def dt_to_unix_ts(ts: datetime) -> int: + return int(ts.timestamp()) + + +def dt_to_mp4_ts(ts: datetime) -> int: + return int(ts.timestamp()) + 2082844800 + + +def s2b(s: str): + return bytearray(s.encode("ascii")) + + +def i2b(i: int, size: int = 4): + return bytearray(i.to_bytes(size, "big")) + + +class MP4MuxerALAC: + def __init__(self) -> None: + self.data: bytearray = bytearray() + self.timestamp: datetime = datetime.now() + self.sample_rate: int = 0 + self.number_of_samples: int = 0 + self.bit_depth: int = 0 + self.samples_per_frame: int = 4096 + self.channel_count: int = 2 + self.sample_sizes: list[int] = [] + # these are the only important ones + self.offsets: dict = { + "stco": 0, + "mdat": 0, + } + + self.total_duration: int = 0 + self.mdat_data: bytearray = bytearray() + self.tags: Tags = None + + def create(self) -> None: + self.ftyp() + self.moov() + self.free() + self.mdat() + self.rewrite_stco_chunk() + + def out(self, filename: str) -> None: + with open(filename, "wb") as f: + f.write(self.data) + + def w(self, b: bytearray): + if isinstance(b, bytes): + b = bytearray(b) + self.data.extend(b) + + def set_sample_rate(self, sr: int) -> None: + self.sample_rate = sr + + def set_number_of_samples(self, nr: int) -> None: + self.number_of_samples = nr + + def set_bit_depth(self, bd: int) -> None: + self.bit_depth = bd + + def set_sample_sizes(self, ss: int) -> None: + self.sample_sizes = ss + + def set_total_duration(self, td: int) -> None: + self.total_duration = td + + def set_mdat_data(self, m: bytearray) -> None: + self.mdat_data = m + + def set_tags(self, t: Tags) -> None: + self.tags = t + + def set_timestamp(self, t: datetime) -> None: + self.timestamp = t + + def ftyp(self) -> None: + major_brand: str = "M4A " + minor_version: int = 0 + 
compatible_brands: list[str] = ["M4A ", "mp42", "isom"] + size: int = 16 + (len(compatible_brands) * 4) + + self.w(i2b(size)) + self.w(s2b("ftyp")) + self.w(s2b(major_brand)) + self.w(i2b(minor_version)) + for c in compatible_brands: + self.w(s2b(c)) + + def moov_size(self) -> int: + total_size: int = 8 + total_size += self.mvhd_size() + total_size += self.trak_size() + if self.tags: + total_size += self.udta_size() + return total_size + + def moov(self) -> None: + self.w(i2b(self.moov_size())) + self.w(s2b("moov")) + self.mvhd() + self.trak() + if self.tags: + self.udta() + + def mvhd_size(self) -> int: + return 108 + + def mvhd(self) -> None: + version: int = 0 + creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + time_scale: int = self.sample_rate + duration: int = self.total_duration + rate: int = 0x10000 # 1.0 + volume: int = 0x100 # 1.0 + next_track_id: int = 2 + + self.w(i2b(self.mvhd_size())) + self.w(s2b("mvhd")) + self.w(i2b(version)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(time_scale)) + self.w(i2b(duration)) + self.w(i2b(rate)) + self.w(i2b(volume, 2)) + + # const bit(16) reserved = 0 + self.w(i2b(0, 2)) + # const unsigned int(32)[2] reserved = 0 + self.w(i2b(0, 8)) + # template int(32)[9] matrix + # { 0x00010000,0,0,0,0x00010000,0,0,0,0x40000000 } + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x40000000)) + # Unity matrix + # bit(32)[6] pre_defined = 0 + self.w(i2b(0, 24)) + self.w(i2b(next_track_id)) + + def trak_size(self) -> int: + total_size: int = 8 + total_size += self.tkhd_size() + total_size += self.mdia_size() + return total_size + + def trak(self) -> None: + self.w(i2b(self.trak_size())) + self.w(s2b("trak")) + self.tkhd() + self.mdia() + + def tkhd_size(self) -> int: + return 92 + + def tkhd(self) -> None: + flags: int = 1 + 
creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + track_id: int = 1 + duration: int = self.total_duration + layer: int = 0 + alternate_group: int = 0 + volume: int = 0x100 # 1.0 + width: int = 0 + height: int = 0 + + self.w(i2b(self.tkhd_size())) + self.w(s2b("tkhd")) + + self.w(i2b(flags)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(track_id)) + + # const unsigned int (32) reserved = 0 + self.w(i2b(0)) + self.w(i2b(duration)) + # reserved + self.w(i2b(0)) + # const unsigned int (32) [2] reserved = 0 + self.w(i2b(0)) + self.w(i2b(layer, 2)) + self.w(i2b(alternate_group, 2)) + # template int (16) volume = {if track_is_audio 0x0100 else 0} + self.w(i2b(volume, 2)) + # const unsigned int (16) reserved = 0 + self.w(i2b(0, 2)) + # template int (32) [9] matrix + # { 0x00010000,0,0,0,0x00010000,0,0,0,0x40000000 } + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x40000000)) + # Unity matrix + # unsigned int (32) width + self.w(i2b(width)) + # unsigned int (32) height + self.w(i2b(height)) + + def mdia_size(self) -> int: + total_size: int = 8 + total_size += self.mdhd_size() + total_size += self.hdlr_size() + total_size += self.minf_size() + return total_size + + def mdia(self) -> None: + self.w(i2b(self.mdia_size())) + self.w(s2b("mdia")) + self.mdhd() + self.hdlr() + self.minf() + + def mdhd_size(self) -> int: + return 32 + + def mdhd(self) -> None: + version: int = 0 + flags: int = 0 + creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + time_scale: int = self.sample_rate + duration: int = self.total_duration + language: int = 0x55C4 # undefined + quality: int = 0 + + self.w(i2b(self.mdhd_size())) + self.w(s2b("mdhd")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + 
self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(time_scale)) + self.w(i2b(duration)) + self.w(i2b(language, 2)) + self.w(i2b(quality, 2)) + + def hdlr_size(self) -> int: + return 32 + + def hdlr(self) -> None: + version: int = 0 + flags: int = 0 + component_type = "mhlr" # media handler + component_subtype = "soun" + component_name = 0 + component_flags = 0 + component_flags_mask = 0 + + self.w(i2b(self.hdlr_size())) + self.w(s2b("hdlr")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(s2b(component_type)) + self.w(s2b(component_subtype)) + self.w(i2b(component_name)) + self.w(i2b(component_flags)) + self.w(i2b(component_flags_mask)) + + def minf_size(self) -> int: + total_size: int = 8 + total_size += self.smhd_size() + total_size += self.dinf_size() + total_size += self.stbl_size() + return total_size + + def minf(self) -> None: + self.w(i2b(self.minf_size())) + self.w(s2b("minf")) + self.smhd() + self.dinf() + self.stbl() + + def smhd_size(self) -> int: + return 16 + + def smhd(self) -> None: + version: int = 0 + flags: int = 0 + audio_balance: int = 0 + + self.w(i2b(self.smhd_size())) + self.w(s2b("smhd")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(audio_balance, 2)) + self.w(i2b(0, 2)) # reserved + + def dinf_size(self) -> int: + total_size: int = 8 + total_size += self.dref_size() + return total_size + + def dinf(self) -> None: + self.w(i2b(self.dinf_size())) + self.w(s2b("dinf")) + self.dref() + pass + + def dref_size(self) -> int: + return 28 + + def dref(self) -> None: + version: int = 0 + flags: int = 0 + entry_count: int = 1 + self.w(i2b(self.dref_size())) + self.w(s2b("dref")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(entry_count)) + + data_location_size: int = 12 + data_location_name: str = "url " + data_location_version: int = 0 + data_location_flags: int = 1 # same file + + self.w(i2b(data_location_size)) + self.w(s2b(data_location_name)) + self.w(i2b(data_location_version, 1)) + 
self.w(i2b(data_location_flags, 3)) + + def stbl_size(self) -> int: + total_size: int = 8 + total_size += self.stsd_size() + total_size += self.stts_size() + total_size += self.stsz_size() + total_size += self.stsc_size() + total_size += self.stco_size() + return total_size + + def stbl(self) -> None: + self.w(i2b(self.stbl_size())) + self.w(s2b("stbl")) + self.stsd() + self.stts() + self.stsz() + self.stsc() + self.stco() + + def stsd_size(self) -> int: + return 88 + + def stsd(self) -> None: + version: int = 0 + flags: int = 0 + count: int = 1 + audio_size: int = 72 + audio_name: str = "alac" + channel_count: int = self.channel_count + sample_size: int = self.bit_depth + sample_rate: int = self.sample_rate + samples_per_frame: int = self.samples_per_frame + max_coded_frame_size: int = max(self.sample_sizes) + + self.w(i2b(self.stsd_size())) + self.w(s2b("stsd")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(count)) + self.w(i2b(audio_size)) + self.w(s2b(audio_name)) + self.w(i2b(0, 6)) # reserved + self.w(i2b(1, 2)) # data reference index + self.w(i2b(0)) # reserved + self.w(i2b(0)) # reserved + self.w(i2b(channel_count, 2)) + self.w(i2b(sample_size, 2)) + self.w(i2b(0, 2)) # pre-defined + self.w(i2b(0, 2)) # reserved + if sample_rate <= 65535: + self.w(i2b(sample_rate, 2)) + else: + self.w(i2b(0, 2)) + self.w(i2b(0, 2)) # sample rate (again? 
set to zero for some reason) + # magic cookie starts here + self.w(i2b(36)) # size + self.w(s2b("alac")) + self.w(i2b(0)) # reserved + self.w(i2b(samples_per_frame)) + self.w(i2b(0, 1)) # reserved + self.w(i2b(sample_size, 1)) + self.w(i2b(40, 1)) # rice history mult, pb, tuning parameter + self.w(i2b(10, 1)) # rice initial history, mb, tuning parameter + self.w(i2b(14, 1)) # rice kmodifier, kb, tuning parameter + self.w(i2b(channel_count, 1)) + self.w(i2b(255, 2)) # maxRun, currently unused + self.w(i2b(max_coded_frame_size)) + + self.w(i2b(self.sample_rate * self.bit_depth * self.channel_count)) # bitrate + self.w(i2b(sample_rate)) + + def stts_size(self) -> int: + total_size: int = 16 + # size of the stts box depends on the number of entries + number_of_entries: int = 2 + if self.total_duration % self.samples_per_frame == 0: + number_of_entries = 1 + total_size += 8 * number_of_entries + return total_size + + def stts(self) -> None: + version: int = 0 + flags: int = 0 + number_of_entries: int = 2 + if self.total_duration % self.samples_per_frame == 0: + number_of_entries = 1 + self.w(i2b(self.stts_size())) + self.w(s2b("stts")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(number_of_entries)) + + if number_of_entries == 1: + self.w(i2b(len(self.sample_sizes))) + self.w(i2b(self.samples_per_frame)) + elif number_of_entries == 2: + self.w(i2b(len(self.sample_sizes) - 1)) + self.w(i2b(self.samples_per_frame)) + self.w(i2b(1)) + self.w(i2b(self.total_duration % self.samples_per_frame)) + + def stsz_size(self) -> int: + total_size: int = 20 + total_size += 4 * len(self.sample_sizes) + return total_size + + def stsz(self) -> None: + version: int = 0 + flags: int = 0 + sample_size: int = 0 + sample_count: int = len(self.sample_sizes) + + self.w(i2b(self.stsz_size())) + self.w(s2b("stsz")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(sample_size)) + self.w(i2b(sample_count)) + + for s in self.sample_sizes: + self.w(i2b(s)) + + def 
stsc_size(self) -> int: + number_of_entries: int = 1 + entries_per_second: int = int(round(self.sample_rate / 4096)) + last_entry: int = len(self.sample_sizes) % entries_per_second + if last_entry != 0: + number_of_entries = 2 + + total_size: int = 16 + (number_of_entries * 12) + return total_size + + # we only write a single chunk + # not sure what the side effects of this move are + def stsc(self) -> None: + version: int = 0 + flags: int = 0 + number_of_entries: int = 1 + entries_per_second: int = int(round(self.sample_rate / 4096)) + last_entry: int = len(self.sample_sizes) % entries_per_second + first_chunk_count = int( + (len(self.sample_sizes) - last_entry) / entries_per_second + ) + entries = [] + entries.append( + { + "first_chunk": 1, + "samples_per_chunk": entries_per_second, + "sample_description_index": 1, + } + ) + if last_entry != 0: + number_of_entries = 2 + entries.append( + { + "first_chunk": first_chunk_count + 1, + "samples_per_chunk": last_entry, + "sample_description_index": 1, + } + ) + + self.w(i2b(self.stsc_size())) + self.w(s2b("stsc")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(number_of_entries)) + + for s in entries: + first_chunk: int = s["first_chunk"] + samples_per_chunk: int = s["samples_per_chunk"] + sample_description_index: int = s["sample_description_index"] + self.w(i2b(first_chunk)) + self.w(i2b(samples_per_chunk)) + self.w(i2b(sample_description_index)) + + def stco_size(self) -> int: + total_size: int = 16 + entries_per_second: int = int(round(self.sample_rate / 4096)) + last_entry: int = len(self.sample_sizes) % entries_per_second + first_chunk_count = int( + (len(self.sample_sizes) - last_entry) / entries_per_second + ) + number_of_stco_entries = first_chunk_count + if last_entry != 0: + number_of_stco_entries += 1 + total_size += 4 * number_of_stco_entries + return total_size + + def stco(self) -> None: + version: int = 0 + flags: int = 0 + self.offsets["stco"] = len(self.data) + entries_per_second: int 
= int(round(self.sample_rate / 4096)) + last_entry: int = len(self.sample_sizes) % entries_per_second + first_chunk_count = int( + (len(self.sample_sizes) - last_entry) / entries_per_second + ) + number_of_stco_entries = first_chunk_count + if last_entry != 0: + number_of_stco_entries += 1 + + self.w(i2b(self.stco_size())) + self.w(s2b("stco")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + + self.w(i2b(number_of_stco_entries)) + + for s in range(number_of_stco_entries): + self.w(i2b(0xFFFFFFFF)) # placeholder value + + def free(self) -> None: + self.w(i2b(8)) + self.w(s2b("free")) + + def mdat(self) -> None: + self.offsets["mdat"] = len(self.data) + self.w(i2b(8 + len(self.mdat_data))) + self.w(s2b("mdat")) + self.w(self.mdat_data) + + def rewrite_stco_chunk(self) -> None: + stco_pos: int = self.offsets["stco"] + 16 + first_chunk_offset: int = self.offsets["mdat"] + 8 + entries_per_second: int = int(round(self.sample_rate / 4096)) + last_entry: int = len(self.sample_sizes) % entries_per_second + first_chunk_count = int( + (len(self.sample_sizes) - last_entry) / entries_per_second + ) + number_of_stco_entries = first_chunk_count + if last_entry != 0: + number_of_stco_entries += 1 + + for s in range(number_of_stco_entries): + bytes_to_write = i2b( + first_chunk_offset + (sum(self.sample_sizes[: entries_per_second * s])) + ) + for index, b in enumerate(bytes_to_write): + self.data[stco_pos + (s * 4) + index] = b + + def udta_size(self) -> int: + return 8 + self.meta_size() + + def udta(self) -> None: + self.w(i2b(self.udta_size())) + self.w(s2b("udta")) + self.meta() + + def meta_size(self) -> int: + total_size: int = 12 + total_size += self.meta_hdlr_size() + total_size += self.ilst_size() + return total_size + + def meta(self) -> None: + version: int = 0 + flags: int = 0 + + self.w(i2b(self.meta_size())) + self.w(s2b("meta")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + + self.meta_hdlr() + self.ilst() + + def meta_hdlr_size(self) -> int: + return 33 + 
def meta_hdlr(self) -> None:
    """Append the metadata ``hdlr`` box to the output buffer.

    The box is emitted through ``self.w`` as, in order: the size reported
    by ``meta_hdlr_size()``, the ``hdlr`` type, a one-byte version and
    three-byte flags (both zero), a zero 32-bit word, the ``mdir`` and
    ``appl`` four-character codes, two zero 32-bit words and a single
    trailing zero byte.
    """
    pieces = (
        i2b(self.meta_hdlr_size()),
        s2b("hdlr"),
        i2b(0, 1),  # version
        i2b(0, 3),  # flags
        i2b(0),  # type_quicktime
        s2b("mdir"),  # metadata_type
        s2b("appl"),  # manufacturer
        i2b(0),  # component_reserved_flags
        i2b(0),  # component_reserved_flags_mask
        i2b(0, 1),  # component_type_name
    )
    for piece in pieces:
        self.w(piece)


def ilst_size(self) -> int:
    """Return the byte size of the ``ilst`` box for the current tags.

    Starts from the 8-byte box header and adds the size of every tag on
    ``self.tags`` whose value is truthy; unset or empty tags contribute
    nothing.
    """
    tags = self.tags
    size = 8

    # Plain UTF-8 text tags: 24 bytes of headers plus the encoded text.
    text_attrs = (
        "track_name",
        "artist",
        "album_artist",
        "composer",
        "album_name",
        "genre",
        "date",
        "isrc",
        "copyright",
    )
    for attr in text_attrs:
        text = getattr(tags, attr)
        if text:
            size += 24 + len(text.encode("utf-8"))

    # Track and disc number pairs are fixed-size 32-byte entries, present
    # as soon as either half of the pair is set.
    if tags.track_number or tags.total_number_of_tracks:
        size += 32
    if tags.disc_number or tags.total_number_of_discs:
        size += 32

    # Freeform tags: 64 bytes of headers plus the tag name and the text.
    for label, attr in (("UPC", "upc"), ("LABEL", "label")):
        text = getattr(tags, attr)
        if text:
            size += len(text.encode("utf-8")) + 64 + len(label)

    # Integer identifier tags are fixed-size 28-byte entries.
    for attr in ("apple_store_catalog_id", "playlist_id", "album_title_id"):
        if getattr(tags, attr):
            size += 28

    # Cover art: 24 bytes of headers plus the raw image data.
    cover = tags.cover_data
    if cover:
        size += 24 + len(cover)
    return size
+ + def write_mp4_tag_utf8(self, box: bytes, content: str): + b: bytearray = bytearray(content.encode("utf-8")) + data_size: int = len(b) + 16 + self.w(i2b(data_size + 8)) + self.w(box) + self.w(i2b(data_size)) + self.w(s2b("data")) + # 0 = binary; 1 = utf-8 + kind: int = 1 + language: int = 0 + self.w(i2b(kind)) + self.w(i2b(language)) + self.w(b) + + def write_mp4_tag_int(self, box: bytes, content: int): + kind: int = 21 # signed integer + language: int = 0 + size: int = 24 + len(box) + self.w(i2b(size)) + self.w(box) + size -= 8 + self.w(i2b(size)) + self.w(s2b("data")) + self.w(i2b(kind)) + self.w(i2b(language)) + self.w(i2b(content)) + + def write_itunes_tag_utf8(self, box: str, content: str): + b: bytearray = bytearray(content.encode("utf-8")) + full_size: int = len(b) + 64 + len(box) + self.w(i2b(full_size)) + self.w(s2b("----")) + mean_size: int = 28 + self.w(i2b(mean_size)) + self.w(s2b("mean")) + self.w(i2b(0)) # unknown + self.w(s2b("com.apple.iTunes")) + name_size: int = 12 + len(box) + self.w(i2b(name_size)) + self.w(s2b("name")) + self.w(i2b(0)) # unknown + self.w(s2b(box)) + data_size = len(b) + 16 + self.w(i2b(data_size)) + self.w(s2b("data")) + # 0 = binary; 1 = utf-8 + kind: int = 1 + language: int = 0 + self.w(i2b(kind)) + self.w(i2b(language)) + self.w(b) + + def write_mp4_tag_tuple_int(self, box: bytes, curr: int, total: int): + self.w(i2b(32)) # size + self.w(box) + self.w(i2b(24)) # data size + self.w(s2b("data")) + kind: int = 0 # binary + language: int = 0 + self.w(i2b(kind)) + self.w(i2b(language)) + self.w(i2b(0, 2)) # reserved + self.w(i2b(curr, 2)) + self.w(i2b(total, 2)) + self.w(i2b(0, 2)) # reserved + + def ilst(self) -> None: + self.w(i2b(self.ilst_size())) + self.w(s2b("ilst")) + + if self.tags.track_name: + self.write_mp4_tag_utf8(b"\xA9\x6E\x61\x6D", self.tags.track_name) # ©nam + if self.tags.artist: + self.write_mp4_tag_utf8(b"\xA9\x41\x52\x54", self.tags.artist) # ©ART + if self.tags.album_artist: + 
self.write_mp4_tag_utf8(b"\x61\x41\x52\x54", self.tags.album_artist) # aART + if self.tags.composer: + self.write_mp4_tag_utf8(b"\xA9\x77\x72\x74", self.tags.composer) # ©wrt + if self.tags.album_name: + self.write_mp4_tag_utf8(b"\xA9\x61\x6C\x62", self.tags.album_name) # ©alb + if self.tags.genre: + self.write_mp4_tag_utf8(b"\xA9\x67\x65\x6E", self.tags.genre) # ©gen + if self.tags.date: + self.write_mp4_tag_utf8(b"\xA9\x64\x61\x79", self.tags.date) # ©day + if self.tags.isrc: + self.write_mp4_tag_utf8(b"\x49\x53\x52\x43", self.tags.isrc) # ISRC + if self.tags.copyright: + self.write_mp4_tag_utf8(b"\x63\x70\x72\x74", self.tags.copyright) # cprt + + if self.tags.apple_store_catalog_id: + self.write_mp4_tag_int( + b"\x63\x6E\x49\x44", self.tags.apple_store_catalog_id + ) # cnID + if self.tags.playlist_id: + self.write_mp4_tag_int(b"\x70\x6C\x49\x44", self.tags.playlist_id) # plID + if self.tags.album_title_id: + self.write_mp4_tag_int( + b"\x61\x74\x49\x44", self.tags.album_title_id + ) # atID + + if self.tags.upc: + self.write_itunes_tag_utf8("UPC", self.tags.upc) + if self.tags.label: + self.write_itunes_tag_utf8("LABEL", self.tags.label) + + if self.tags.track_number or self.tags.total_number_of_tracks: + curr: int = 0 + if self.tags.track_number: + curr = self.tags.track_number + total: int = 0 + if self.tags.total_number_of_tracks: + total = self.tags.total_number_of_tracks + + self.write_mp4_tag_tuple_int(b"\x74\x72\x6B\x6E", curr, total) + + if self.tags.disc_number or self.tags.total_number_of_discs: + curr: int = 0 + if self.tags.disc_number: + curr = self.tags.disc_number + total: int = 0 + if self.tags.total_number_of_discs: + total = self.tags.total_number_of_discs + + self.write_mp4_tag_tuple_int(b"\x64\x69\x73\x6B", curr, total) + + if self.tags.cover_data: + if self.tags.cover_format == "jpeg": + kind: int = 13 + elif self.tags.cover_format == "png": + kind: int = 14 + language: int = 0 + b: bytearray = self.tags.cover_data + + full_size: int = len(b) 
+ 24 + self.w(i2b(full_size)) + self.w(s2b("covr")) + data_size: int = full_size - 8 + self.w(i2b(data_size)) + self.w(s2b("data")) + self.w(i2b(kind)) + self.w(i2b(language)) + self.w(b) diff --git a/mp4muxer_eac3.py b/mp4muxer_eac3.py new file mode 100644 index 0000000..bec9795 --- /dev/null +++ b/mp4muxer_eac3.py @@ -0,0 +1,890 @@ +from datetime import datetime +from tags import Tags + + +def dt_to_unix_ts(ts: datetime) -> int: + return int(ts.timestamp()) + + +def dt_to_mp4_ts(ts: datetime) -> int: + return int(ts.timestamp()) + 2082844800 + + +def s2b(s: str): + return bytearray(s.encode("ascii")) + + +def i2b(i: int, size: int = 4): + return bytearray(i.to_bytes(size, "big")) + + +class MP4MuxerEAC3: + def __init__(self) -> None: + self.data: bytearray = bytearray() + self.timestamp: datetime = datetime.now() + self.sample_rate: int = 0 + self.number_of_samples: int = 0 + self.bit_depth: int = 0 + self.bit_rate: int = 0 + self.sample_delta: int = 1536 + self.default_sample_size: int = 3072 + self.channel_count: int = 2 + self.sample_sizes: list[int] = [] + # these are the only important ones + self.offsets: dict = { + "stco": 0, + "mdat": 0, + } + self.mdat_data: bytearray = bytearray() + self.tags: Tags = None + + def create(self) -> None: + self.ftyp() + self.moov() + self.free() + self.mdat() + self.rewrite_stco_chunk() + + def out(self, filename: str) -> None: + with open(filename, "wb") as f: + f.write(self.data) + + def w(self, b: bytearray): + if isinstance(b, bytes): + b = bytearray(b) + self.data.extend(b) + + def set_sample_rate(self, sr: int) -> None: + self.sample_rate = sr + + def set_number_of_samples(self, nr: int) -> None: + self.number_of_samples = nr + + def set_bit_depth(self, bd: int) -> None: + self.bit_depth = bd + + def set_bit_rate(self, br) -> None: + self.bit_rate = br + + def set_sample_sizes(self, ss: int) -> None: + self.sample_sizes = ss + + def set_mdat_data(self, m: bytearray) -> None: + self.mdat_data = m + + def 
set_tags(self, t: Tags) -> None: + self.tags = t + + def set_timestamp(self, t: datetime) -> None: + self.timestamp = t + + def ftyp(self) -> None: + major_brand: str = "mp42" + minor_version: int = 0 + compatible_brands: list[str] = ["mp42", "dby1", "isom"] + + # size + box string + major brand + minor version + size: int = 16 + for c in compatible_brands: + size += 4 + + self.w(i2b(size)) + self.w(s2b("ftyp")) + self.w(s2b(major_brand)) + self.w(i2b(minor_version)) + for c in compatible_brands: + self.w(s2b(c)) + + def moov_size(self) -> int: + total_size: int = 8 + total_size += self.mvhd_size() + total_size += self.trak_size() + total_size += self.iods_size() + if self.tags: + total_size += self.udta_size() + return total_size + + def moov(self) -> None: + self.w(i2b(self.moov_size())) + self.w(s2b("moov")) + self.mvhd() + self.trak() + self.iods() + if self.tags: + self.udta() + + def mvhd_size(self) -> int: + return 108 + + def mvhd(self) -> None: + version: int = 0 + creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + time_scale: int = self.sample_rate + duration: int = int(self.sample_delta * len(self.sample_sizes)) + # 1.0 + rate: int = 0x10000 + # 1.0 + volume: int = 0x100 + next_track_id: int = 2 + + self.w(i2b(self.mvhd_size())) + self.w(s2b("mvhd")) + self.w(i2b(version)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(time_scale)) + self.w(i2b(duration)) + self.w(i2b(rate)) + self.w(i2b(volume, 2)) + + # const bit(16) reserved = 0 + self.w(i2b(0, 2)) + # const unsigned int(32)[2] reserved = 0 + self.w(i2b(0, 8)) + # template int(32)[9] matrix + # { 0x00010000,0,0,0,0x00010000,0,0,0,0x40000000 } + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x40000000)) + # Unity matrix + # bit(32)[6] pre_defined = 0 + self.w(i2b(0, 24)) + 
self.w(i2b(next_track_id)) + + def trak_size(self) -> int: + total_size: int = 8 + total_size += self.tkhd_size() + total_size += self.mdia_size() + return total_size + + def trak(self) -> None: + self.w(i2b(self.trak_size())) + self.w(s2b("trak")) + self.tkhd() + self.mdia() + + def tkhd_size(self) -> int: + return 92 + + def tkhd(self) -> None: + flags: int = 15 + creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + track_id: int = 1 + duration: int = int(self.sample_delta * len(self.sample_sizes)) + layer: int = 0 + alternate_group: int = 2 + # 1.0 + volume: int = 0x100 + width: int = 0 + height: int = 0 + + self.w(i2b(self.tkhd_size())) + self.w(s2b("tkhd")) + + self.w(i2b(flags)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(track_id)) + + # const unsigned int (32) reserved = 0 + self.w(i2b(0)) + self.w(i2b(duration)) + # reserved + self.w(i2b(0)) + # const unsigned int (32) [2] reserved = 0 + self.w(i2b(0)) + self.w(i2b(layer, 2)) + self.w(i2b(alternate_group, 2)) + # template int (16) volume = {if track_is_audio 0x0100 else 0} + self.w(i2b(volume, 2)) + # const unsigned int (16) reserved = 0 + self.w(i2b(0, 2)) + # template int (32) [9] matrix + # { 0x00010000,0,0,0,0x00010000,0,0,0,0x40000000 } + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x10000)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0)) + self.w(i2b(0x40000000)) + # Unity matrix + # unsigned int (32) width + self.w(i2b(width)) + # unsigned int (32) height + self.w(i2b(height)) + + def mdia_size(self) -> int: + total_size: int = 8 + total_size += self.mdhd_size() + total_size += self.hdlr_size() + total_size += self.minf_size() + return total_size + + def mdia(self) -> None: + self.w(i2b(self.mdia_size())) + self.w(s2b("mdia")) + self.mdhd() + self.hdlr() + self.minf() + + def mdhd_size(self) -> int: + return 32 + + def mdhd(self) -> None: + version: int = 0 + 
flags: int = 0 + creation_time: datetime = self.timestamp + modification_time: datetime = self.timestamp + time_scale: int = self.sample_rate + duration: int = int(self.sample_delta * len(self.sample_sizes)) + language: int = 0x55C4 # undefined + quality: int = 0 + + self.w(i2b(self.mdhd_size())) + self.w(s2b("mdhd")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(dt_to_mp4_ts(creation_time))) + self.w(i2b(dt_to_mp4_ts(modification_time))) + self.w(i2b(time_scale)) + self.w(i2b(duration)) + self.w(i2b(language, 2)) + self.w(i2b(quality, 2)) + + def hdlr_size(self) -> int: + return 46 + + def hdlr(self) -> None: + version = 0 + flags = 0 + component_type = "mhlr" + component_subtype = "soun" + component_manufacturer = 0 + component_flags = 0 + component_flags_mask = 0 + component_name = "sound handler" + + self.w(i2b(self.hdlr_size())) + self.w(s2b("hdlr")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(s2b(component_type)) + self.w(s2b(component_subtype)) + self.w(i2b(component_manufacturer)) + self.w(i2b(component_flags)) + self.w(i2b(component_flags_mask)) + self.w(s2b(component_name)) + self.w(bytearray(b"\x00")) # terminating null byte + + def minf_size(self) -> int: + total_size: int = 8 + total_size += self.smhd_size() + total_size += self.dinf_size() + total_size += self.stbl_size() + return total_size + + def minf(self) -> None: + self.w(i2b(self.minf_size())) + self.w(s2b("minf")) + self.smhd() + self.dinf() + self.stbl() + + def smhd_size(self) -> int: + return 16 + + def smhd(self) -> None: + version: int = 0 + flags: int = 0 + audio_balance: int = 0 + + self.w(i2b(self.smhd_size())) + self.w(s2b("smhd")) + self.w(i2b(version, 1)) + self.w(i2b(flags, 3)) + self.w(i2b(audio_balance, 2)) + self.w(i2b(0, 2)) # reserved + + def dinf_size(self) -> int: + total_size: int = 8 + total_size += self.dref_size() + return total_size + + def dinf(self) -> None: + self.w(i2b(self.dinf_size())) + self.w(s2b("dinf")) + self.dref() + pass + + 
def dref_size(self) -> int:
    """Return the fixed size of the ``dref`` box: 16-byte header + 12-byte ``url `` entry."""
    return 28


def dref(self) -> None:
    """Write the ``dref`` box with a single ``url `` entry flagged as "same file"."""
    version: int = 0
    flags: int = 0
    entry_count: int = 1

    self.w(i2b(self.dref_size()))
    self.w(s2b("dref"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(entry_count))

    url_size: int = 12
    url_name: str = "url "
    url_version: int = 0
    url_flags: int = 1  # same file

    self.w(i2b(url_size))
    self.w(s2b(url_name))
    self.w(i2b(url_version, 1))
    self.w(i2b(url_flags, 3))


def stbl_size(self) -> int:
    """Return the size of ``stbl``: its 8-byte header plus all five child boxes."""
    return (
        8
        + self.stsd_size()
        + self.stts_size()
        + self.stsz_size()
        + self.stsc_size()
        + self.stco_size()
    )


def stbl(self) -> None:
    """Write the ``stbl`` header followed by stsd, stts, stsz, stsc and stco."""
    self.data.extend(self.stbl_size().to_bytes(4, "big"))
    self.data.extend("stbl".encode("ascii"))
    self.stsd()
    self.stts()
    self.stsz()
    self.stsc()
    self.stco()


def stsd_size(self) -> int:
    """Return the fixed size of ``stsd``: 16-byte header + 51-byte ``ec-3`` entry."""
    return 67


def stsd(self) -> None:
    """Write ``stsd`` with one ``ec-3`` sample entry and its ``dec3`` (+JOC) box.

    Reads ``self.bit_depth``, ``self.sample_rate`` and ``self.bit_rate``.
    The substream parameters are hard-coded constants.
    """
    version: int = 0
    flags: int = 0
    count: int = 1

    audio_size = 51
    audio_name = "ec-3"
    data_reference_index = 1
    channel_count = 2
    sample_size = self.bit_depth
    sample_rate = self.sample_rate

    self.w(i2b(self.stsd_size()))
    self.w(s2b("stsd"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(count))
    self.w(i2b(audio_size))
    self.w(s2b(audio_name))
    self.w(i2b(0, 6))  # reserved
    self.w(i2b(data_reference_index, 2))
    self.w(i2b(0))  # reserved
    self.w(i2b(0))  # reserved
    self.w(i2b(channel_count, 2))
    self.w(i2b(sample_size, 2))
    self.w(i2b(0, 2))  # pre-defined
    self.w(i2b(0, 2))  # reserved
    # The sample rate is a 32-bit 16.16 fixed-point field: the integer part
    # goes in the high 16 bits, the fractional part (always 0 here) follows.
    self.w(i2b(sample_rate, 2))
    self.w(i2b(0, 2))

    # EAC3 specific box
    eac3_size = 15
    eac3_name = "dec3"

    self.data.extend(eac3_size.to_bytes(4, "big"))
    self.data.extend(eac3_name.encode("ascii"))

    data_rate = self.bit_rate  # 13 bits
    num_ind_sub = 0  # 3 bits

    header_bits = (data_rate << 3) + num_ind_sub
    self.data.extend(header_bits.to_bytes(2, "big"))

    # Independent substream: 24 bits, most significant field first.
    fscod = 0  # 2 bits  (bits 23-22)
    bsid = 16  # 5 bits  (bits 21-17)
    reserved_bit_1 = 0  # 1 bit   (bit 16)
    asvc = 0  # 1 bit   (bit 15)
    bsmod = 0  # 3 bits  (bits 14-12)
    acmod = 7  # 3 bits  (bits 11-9)
    lfeon = 1  # 1 bit   (bit 8)
    reserved_bit_2 = 0  # 3 bits  (bits 7-5)
    num_dep_sub = 0  # 4 bits  (bits 4-1)
    reserved_bit_3 = 0  # 1 bit   (bit 0)

    # reserved_bit_1 and asvc were previously shifted one bit too low (15/14);
    # both are 0 so the bytes written are unchanged, but the layout is now
    # correct should either value ever become non-zero.
    substream_bits = (
        (fscod << 22)
        | (bsid << 17)
        | (reserved_bit_1 << 16)
        | (asvc << 15)
        | (bsmod << 12)
        | (acmod << 9)
        | (lfeon << 8)
        | (reserved_bit_2 << 5)
        | (num_dep_sub << 1)
        | reserved_bit_3
    )
    self.data.extend(substream_bits.to_bytes(3, "big"))

    # JOC extension, both values 1 byte each
    ec3_joc_flag = 1
    joc_complexity_index = 16

    self.data.extend(ec3_joc_flag.to_bytes(1, "big"))
    self.data.extend(joc_complexity_index.to_bytes(1, "big"))


def stts_size(self) -> int:
    """Return the fixed size of ``stts`` (16-byte header + one 8-byte entry)."""
    return 24


def stts(self) -> None:
    """Write ``stts`` with one entry: every sample lasts ``self.sample_delta``."""
    version: int = 0
    flags: int = 0
    number_of_entries: int = 1
    sample_count: int = len(self.sample_sizes)

    self.w(i2b(self.stts_size()))
    self.w(s2b("stts"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(number_of_entries))
    self.w(i2b(sample_count))
    self.w(i2b(self.sample_delta))


def stsz_size(self) -> int:
    """Return the size of ``stsz``: 20-byte header plus 4 bytes per sample."""
    return 20 + 4 * len(self.sample_sizes)


def stsz(self) -> None:
    """Write ``stsz``: default sample size, sample count, then every sample size."""
    version: int = 0
    flags: int = 0
    sample_size: int = self.default_sample_size
    sample_count: int = len(self.sample_sizes)

    self.w(i2b(self.stsz_size()))
    self.w(s2b("stsz"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(sample_size))
    self.w(i2b(sample_count))

    for size in self.sample_sizes:
        self.w(i2b(size))


def stsc_size(self) -> int:
    """Return the size of ``stsc``: 16-byte header plus 12 bytes per entry.

    Samples are grouped into chunks of ``round(sample_rate / sample_delta)``
    samples. One entry describes the run of full chunks (if any) and one
    describes the shorter trailing chunk (if any).
    """
    entries_per_second: int = int(round(self.sample_rate / self.sample_delta))
    full_chunks, last_entry = divmod(len(self.sample_sizes), entries_per_second)
    number_of_entries: int = (1 if full_chunks else 0) + (1 if last_entry else 0)
    return 16 + (number_of_entries * 12)


def stsc(self) -> None:
    """Write ``stsc`` (sample-to-chunk table).

    An entry is emitted only for runs that actually exist. Previously a stream
    shorter than one full chunk produced two entries that both claimed
    ``first_chunk == 1``; now it produces just the trailing-chunk entry, and
    ``stsc_size`` is computed with the same rule.
    """
    version: int = 0
    flags: int = 0
    entries_per_second: int = int(round(self.sample_rate / self.sample_delta))
    full_chunks, last_entry = divmod(len(self.sample_sizes), entries_per_second)

    # (first_chunk, samples_per_chunk, sample_description_index)
    entries = []
    if full_chunks:
        entries.append((1, entries_per_second, 1))
    if last_entry:
        entries.append((full_chunks + 1, last_entry, 1))

    self.w(i2b(self.stsc_size()))
    self.w(s2b("stsc"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(len(entries)))

    for first_chunk, samples_per_chunk, sample_description_index in entries:
        self.w(i2b(first_chunk))
        self.w(i2b(samples_per_chunk))
        self.w(i2b(sample_description_index))


def stco_size(self) -> int:
    """Return the size of ``stco``: 16-byte header plus 4 bytes per chunk."""
    entries_per_second: int = int(round(self.sample_rate / self.sample_delta))
    full_chunks, last_entry = divmod(len(self.sample_sizes), entries_per_second)
    number_of_stco_entries = full_chunks + (1 if last_entry else 0)
    return 16 + 4 * number_of_stco_entries


def stco(self) -> None:
    """Write ``stco`` with placeholder offsets and remember where it starts.

    The start position is stored in ``self.offsets["stco"]`` so that
    ``rewrite_stco_chunk`` can patch in the real offsets later.
    """
    version: int = 0
    flags: int = 0
    self.offsets["stco"] = len(self.data)
    entries_per_second: int = int(round(self.sample_rate / self.sample_delta))
    full_chunks, last_entry = divmod(len(self.sample_sizes), entries_per_second)
    number_of_stco_entries = full_chunks + (1 if last_entry else 0)

    self.w(i2b(self.stco_size()))
    self.w(s2b("stco"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))

    self.w(i2b(number_of_stco_entries))

    for _ in range(number_of_stco_entries):
        self.w(i2b(0xFFFFFFFF))  # placeholder value


def free(self) -> None:
    """Write an empty 8-byte ``free`` box."""
    self.w(i2b(8))
    self.w(s2b("free"))


def mdat(self) -> None:
    """Write the ``mdat`` box and record its start in ``self.offsets["mdat"]``."""
    self.offsets["mdat"] = len(self.data)
    self.w(i2b(8 + len(self.mdat_data)))
    self.w(s2b("mdat"))
    self.w(self.mdat_data)


def rewrite_stco_chunk(self) -> None:
    """Replace the ``stco`` placeholders with the real chunk offsets.

    Uses the positions recorded in ``self.offsets`` by ``stco`` and ``mdat``.
    The offset is carried forward chunk by chunk instead of re-summing every
    preceding sample size for each chunk (which was quadratic).
    """
    entries_per_second: int = int(round(self.sample_rate / self.sample_delta))
    full_chunks, last_entry = divmod(len(self.sample_sizes), entries_per_second)
    number_of_stco_entries = full_chunks + (1 if last_entry else 0)

    write_pos: int = self.offsets["stco"] + 16  # first entry follows the header
    chunk_offset: int = self.offsets["mdat"] + 8  # payload follows mdat header

    for chunk in range(number_of_stco_entries):
        # 4-byte big-endian, same width as the placeholders written by stco()
        self.data[write_pos : write_pos + 4] = chunk_offset.to_bytes(4, "big")
        first_sample = chunk * entries_per_second
        chunk_offset += sum(
            self.sample_sizes[first_sample : first_sample + entries_per_second]
        )
        write_pos += 4


def udta_size(self) -> int:
    """Return the size of ``udta``: 8-byte header plus the ``meta`` box."""
    return 8 + self.meta_size()


def udta(self) -> None:
    """Write the ``udta`` box containing ``meta``."""
    self.w(i2b(self.udta_size()))
    self.w(s2b("udta"))
    self.meta()


def meta_size(self) -> int:
    """Return the size of ``meta``: 12-byte full-box header + ``hdlr`` + ``ilst``."""
    return 12 + self.meta_hdlr_size() + self.ilst_size()


def meta(self) -> None:
    """Write the ``meta`` box: header, metadata ``hdlr``, then ``ilst``."""
    version: int = 0
    flags: int = 0

    self.w(i2b(self.meta_size()))
    self.w(s2b("meta"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))

    self.meta_hdlr()
    self.ilst()


def meta_hdlr_size(self) -> int:
    """Return the fixed size of the metadata ``hdlr`` box."""
    return 33


def meta_hdlr(self) -> None:
    """Write the ``hdlr`` box used inside ``meta`` (type ``mdir``, manufacturer ``appl``)."""
    version: int = 0
    flags: int = 0
    type_quicktime: int = 0
    metadata_type: str = "mdir"
    manufacturer: str = "appl"
    component_reserved_flags: int = 0
    component_reserved_flags_mask: int = 0
    component_type_name: int = 0

    self.w(i2b(self.meta_hdlr_size()))
    self.w(s2b("hdlr"))
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))
    self.w(i2b(type_quicktime))
    self.w(s2b(metadata_type))
    self.w(s2b(manufacturer))
    self.w(i2b(component_reserved_flags))
    self.w(i2b(component_reserved_flags_mask))
    self.w(i2b(component_type_name, 1))


def ilst_size(self) -> int:
    """Return the size of ``ilst`` for the tags that are currently set.

    Must stay in step with ``ilst``: every atom written there is counted here.
    """
    tags = self.tags
    total_size: int = 8

    # Plain UTF-8 atoms: 8-byte atom header + 16-byte data header + payload.
    for text in (
        tags.track_name,
        tags.artist,
        tags.album_artist,
        tags.composer,
        tags.album_name,
        tags.genre,
        tags.date,
        tags.isrc,
        tags.copyright,
    ):
        if text:
            total_size += 24 + len(text.encode("utf-8"))

    if tags.track_number or tags.total_number_of_tracks:
        total_size += 32
    if tags.disc_number or tags.total_number_of_discs:
        total_size += 32

    # Free-form "----" atoms: 64 bytes of framing + name + payload.
    if tags.upc:
        total_size += len(tags.upc.encode("utf-8")) + 64 + len("UPC")
    if tags.label:
        total_size += len(tags.label.encode("utf-8")) + 64 + len("LABEL")

    # Integer atoms are 28 bytes each.
    for number in (tags.apple_store_catalog_id, tags.playlist_id, tags.album_title_id):
        if number:
            total_size += 28

    if tags.cover_data:
        total_size += 24 + len(tags.cover_data)
    return total_size


def write_mp4_tag_utf8(self, box: bytes, content: str):
    """Write a metadata atom named ``box`` whose ``data`` payload is UTF-8 text."""
    payload: bytearray = bytearray(content.encode("utf-8"))
    data_size: int = len(payload) + 16
    self.w(i2b(data_size + 8))
    self.w(box)
    self.w(i2b(data_size))
    self.w(s2b("data"))
    # 0 = binary; 1 = utf-8
    kind: int = 1
    language: int = 0
    self.w(i2b(kind))
    self.w(i2b(language))
    self.w(payload)


def write_mp4_tag_int(self, box: bytes, content: int):
    """Write a metadata atom named ``box`` whose ``data`` payload is a 4-byte integer."""
    kind: int = 21  # signed integer
    language: int = 0
    size: int = 24 + len(box)  # 4-byte size + name + 20-byte data atom
    self.w(i2b(size))
    self.w(box)
    self.w(i2b(size - 8))
    self.w(s2b("data"))
    self.w(i2b(kind))
    self.w(i2b(language))
    self.w(i2b(content))


def write_itunes_tag_utf8(self, box: str, content: str):
    """Write a free-form ``----`` atom (``mean``/``name``/``data``) with UTF-8 text.

    ``box`` is the free-form name (e.g. ``"UPC"``); the ``mean`` value is
    always ``com.apple.iTunes``.
    """
    payload: bytearray = bytearray(content.encode("utf-8"))
    full_size: int = len(payload) + 64 + len(box)
    self.w(i2b(full_size))
    self.w(s2b("----"))

    mean_size: int = 28
    self.w(i2b(mean_size))
    self.w(s2b("mean"))
    self.w(i2b(0))  # unknown
    self.w(s2b("com.apple.iTunes"))

    name_size: int = 12 + len(box)
    self.w(i2b(name_size))
    self.w(s2b("name"))
    self.w(i2b(0))  # unknown
    self.w(s2b(box))

    data_size = len(payload) + 16
    self.w(i2b(data_size))
    self.w(s2b("data"))
    # 0 = binary; 1 = utf-8
    kind: int = 1
    language: int = 0
    self.w(i2b(kind))
    self.w(i2b(language))
    self.w(payload)


def write_mp4_tag_tuple_int(self, box: bytes, curr: int, total: int):
    """Write a fixed 32-byte "current / total" atom (used for track and disc numbers)."""
    self.w(i2b(32))  # size
    self.w(box)
    self.w(i2b(24))  # data size
    self.w(s2b("data"))
    kind: int = 0  # binary
    language: int = 0
    self.w(i2b(kind))
    self.w(i2b(language))
    self.w(i2b(0, 2))  # reserved
    self.w(i2b(curr, 2))
    self.w(i2b(total, 2))
    self.w(i2b(0, 2))  # reserved


def ilst(self) -> None:
    """Write the ``ilst`` box with one atom per tag that is set.

    Raises:
        ValueError: if cover data is present but ``cover_format`` is neither
            ``"jpeg"`` nor ``"png"``. This is checked before anything is
            written, so a bad format never leaves a half-written box behind
            (it used to fail late with an ``UnboundLocalError``).
    """
    tags = self.tags

    cover_kind = None
    if tags.cover_data:
        cover_kinds = {"jpeg": 13, "png": 14}
        if tags.cover_format not in cover_kinds:
            raise ValueError(
                f"unsupported cover_format {tags.cover_format!r}; "
                "expected 'jpeg' or 'png'"
            )
        cover_kind = cover_kinds[tags.cover_format]

    self.w(i2b(self.ilst_size()))
    self.w(s2b("ilst"))

    text_atoms = (
        (b"\xA9\x6E\x61\x6D", tags.track_name),  # ©nam
        (b"\xA9\x41\x52\x54", tags.artist),  # ©ART
        (b"\x61\x41\x52\x54", tags.album_artist),  # aART
        (b"\xA9\x77\x72\x74", tags.composer),  # ©wrt
        (b"\xA9\x61\x6C\x62", tags.album_name),  # ©alb
        (b"\xA9\x67\x65\x6E", tags.genre),  # ©gen
        (b"\xA9\x64\x61\x79", tags.date),  # ©day
        (b"\x49\x53\x52\x43", tags.isrc),  # ISRC
        (b"\x63\x70\x72\x74", tags.copyright),  # cprt
    )
    for box, text in text_atoms:
        if text:
            self.write_mp4_tag_utf8(box, text)

    int_atoms = (
        (b"\x63\x6E\x49\x44", tags.apple_store_catalog_id),  # cnID
        (b"\x70\x6C\x49\x44", tags.playlist_id),  # plID
        (b"\x61\x74\x49\x44", tags.album_title_id),  # atID
    )
    for box, number in int_atoms:
        if number:
            self.write_mp4_tag_int(box, number)

    if tags.upc:
        self.write_itunes_tag_utf8("UPC", tags.upc)
    if tags.label:
        self.write_itunes_tag_utf8("LABEL", tags.label)

    # A missing half of a "current / total" pair is written as 0.
    if tags.track_number or tags.total_number_of_tracks:
        self.write_mp4_tag_tuple_int(
            b"\x74\x72\x6B\x6E",  # trkn
            tags.track_number or 0,
            tags.total_number_of_tracks or 0,
        )

    if tags.disc_number or tags.total_number_of_discs:
        self.write_mp4_tag_tuple_int(
            b"\x64\x69\x73\x6B",  # disk
            tags.disc_number or 0,
            tags.total_number_of_discs or 0,
        )

    if tags.cover_data:
        language: int = 0
        cover: bytearray = tags.cover_data

        full_size: int = len(cover) + 24
        self.w(i2b(full_size))
        self.w(s2b("covr"))
        data_size: int = full_size - 8
        self.w(i2b(data_size))
        self.w(s2b("data"))
        self.w(i2b(cover_kind))
        self.w(i2b(language))
        self.w(cover)


def iods_size(self) -> int:
    """Return the fixed size of the ``iods`` box."""
    return 27


def iods(self) -> None:
    """Write the ``iods`` box: an MP4_IOD_Tag descriptor plus one ES_ID_IncTag."""
    self.w(i2b(self.iods_size()))
    self.w(s2b("iods"))

    version: int = 0
    flags: int = 0
    self.w(i2b(version, 1))
    self.w(i2b(flags, 3))

    # MP4_IOD_Tag
    # header
    iod_tag_type: int = 16
    iod_tag_size: int = 13

    object_descriptor_id: int = 1  # 10 bits
    url_flag: int = 0  # 1 bit
    include_inline_profile_level_flag: int = 0  # 1 bit
    reserved_bits: int = 15  # 4 bits

    od_profile_level_indication: int = 255
    scene_profile_level_indication: int = 255
    audio_profile_level_indication: int = 255
    visual_profile_level_indication: int = 255
    graphics_profile_level_indication: int = 255

    self.w(i2b(iod_tag_type, 1))
    self.w(i2b(iod_tag_size, 1))

    packed: int = (
        (object_descriptor_id << 6)
        + (url_flag << 5)
        + (include_inline_profile_level_flag << 4)
        + reserved_bits
    )
    self.w(i2b(packed, 2))

    self.w(i2b(od_profile_level_indication, 1))
    self.w(i2b(scene_profile_level_indication, 1))
    self.w(i2b(audio_profile_level_indication, 1))
    self.w(i2b(visual_profile_level_indication, 1))
    self.w(i2b(graphics_profile_level_indication, 1))

    # ES_ID_IncTag
    es_id_type: int = 14
    es_id_size: int = 4
    es_id_track_id: int = 1

    self.w(i2b(es_id_type, 1))
    self.w(i2b(es_id_size, 1))
    self.w(i2b(es_id_track_id, 4))


# diff --git a/tags.py b/tags.py
# new file mode 100644
# index 0000000..a8cb210
# --- /dev/null
# +++ b/tags.py
from typing import Optional


class Tags:
    """Plain container for the optional metadata written to ``moov/udta/meta/ilst``.

    Every field starts as ``None``; the muxer writes an atom only for fields
    that hold a truthy value.
    """

    def __init__(self):
        self.track_name: Optional[str] = None
        self.artist: Optional[str] = None
        self.album_name: Optional[str] = None
        self.album_artist: Optional[str] = None
        self.track_number: Optional[int] = None
        self.total_number_of_tracks: Optional[int] = None
        self.disc_number: Optional[int] = None
        self.total_number_of_discs: Optional[int] = None
        self.genre: Optional[str] = None
        self.composer: Optional[str] = None
        self.isrc: Optional[str] = None
        self.copyright: Optional[str] = None
        self.label: Optional[str] = None
        self.upc: Optional[str] = None
        self.apple_store_catalog_id: Optional[int] = None
        self.album_title_id: Optional[int] = None
        self.playlist_id: Optional[int] = None
        self.date: Optional[str] = None
        self.cover_data: Optional[bytearray] = None
        # the muxer accepts "jpeg" or "png"
        self.cover_format: Optional[str] = None