Last active
November 28, 2023 14:08
-
-
Save Filarius/684a053ca1395e30f0db18842fede081 to your computer and use it in GitHub Desktop.
flibusta repacker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import pool | |
import zipfile as zf | |
import json | |
import os.path | |
import base64 | |
def remove_binary(text, zipname=None, fb2name=None, image_zip=None):
    """Strip base64 ``<binary>`` payloads from FB2 book text.

    Every ``<binary id="...">BASE64</binary>`` element in *text* has its
    payload removed (the now-empty element is kept so the XML stays well
    formed), and each payload is written into *image_zip* under a name
    built from *zipname*, *fb2name*, a running counter and the binary id.

    Parameters
    ----------
    text : bytes
        Raw FB2 file content.
    zipname, fb2name : str
        Used only to build unique image entry names; required whenever
        *text* actually contains ``<binary>`` elements.
    image_zip : zipfile.ZipFile
        Archive open for writing that receives the extracted payloads;
        required whenever *text* contains ``<binary>`` elements.

    Returns
    -------
    bytes
        *text* with all ``<binary>`` payloads removed.

    Raises
    ------
    Exception
        If a '<' appears inside a payload (malformed file).
    """
    parts = text.split(b'<binary')
    image_cnt = 10000  # running counter keeps entry names unique and sortable
    images_to_zip = []
    # parts[0] is everything before the first <binary>; leave it untouched.
    for i, chunk in enumerate(parts[1:], start=1):
        close_pos = chunk.find(b'</binary')
        # The first '<' after the opening tag must be the closing tag,
        # otherwise the payload is not plain base64 text.
        if chunk.find(b'<') != close_pos:
            raise Exception("bad file")
        idx1 = chunk.find(b'id="') + len(b'id="')
        if idx1 < len(b'id="'):  # id attribute was written with single quotes
            chunk = chunk.replace(b"'", b'"')
            idx1 = chunk.find(b'id="') + len(b'id="')
        idx2 = chunk.find(b'"', idx1)
        img_id = chunk[idx1:idx2]  # renamed from `id` (shadowed a builtin)
        open_end = chunk.find(b'>') + 1  # payload starts right after '>'
        data = chunk[open_end:close_pos]
        image_cnt += 1
        # Keep the empty element: attributes + '>' then '</binary>...'.
        # (The original sliced chunk[:open_end + 1], leaving one stray
        # payload byte behind — off-by-one, fixed here.)
        parts[i] = b'<binary' + chunk[:open_end] + chunk[close_pos:]
        name = (zipname + '_' + fb2name + '_' + str(image_cnt) + '_'
                + img_id.decode("latin"))
        images_to_zip.append((name, data))
    for entry_name, payload in images_to_zip:
        try:
            image_zip.writestr(entry_name, payload)
        except Exception as e:
            print(entry_name)  # identify the offending entry before failing
            raise e
    return b''.join(parts)
def covert(filename, path, savepath, filelist, gen_filter, lang_filter, strict, noimage):
    """Repack one book archive, filtering by genre/language and optionally
    stripping embedded images.

    Reads ``path + filename`` (a zip of .fb2 files), writes the books that
    pass the filters into ``savepath + filename`` and, when *noimage* is
    true, collects the stripped images into
    ``savepath + filename + '_images.zip'``.

    Parameters
    ----------
    filename : str
        Archive name (same name used for input and output).
    path, savepath : str
        Source and destination directories (with trailing separator).
    filelist : list[dict]
        Book records with 'filename', 'lang' and 'genres' keys, as
        produced by load_database().
    gen_filter, lang_filter : set
        Allowed genres / languages; an empty set disables that filter.
    strict : bool
        With a genre filter: require ALL of the book's genres to be
        allowed, not just one.
    noimage : bool
        Strip <binary> image payloads into the side-car images zip.
    """
    # Context managers guarantee all three archives are closed even on an
    # early failure (the original closed them manually, leaking on error
    # between the opens).
    with zf.ZipFile(path + filename, mode='r') as fzip, \
         zf.ZipFile(savepath + filename, mode='w',
                    compression=zf.ZIP_DEFLATED, compresslevel=9) as wzip, \
         zf.ZipFile(savepath + filename + '_images.zip', mode='w',
                    compression=zf.ZIP_DEFLATED, compresslevel=9) as image_zip:
        try:
            for item in filelist:
                if lang_filter and item['lang'] not in lang_filter:
                    continue
                if gen_filter:
                    genres = item['genres']
                    comp = gen_filter.intersection(genres)
                    if strict:
                        if len(comp) != len(genres):
                            continue
                    elif not comp:
                        continue
                fname = item['filename']
                data = fzip.read(fname)
                if noimage:
                    try:
                        data = remove_binary(data, filename, fname, image_zip)
                    except Exception:
                        # Best effort: keep the book with images intact.
                        import traceback
                        traceback.print_exc()
                wzip.writestr(fname, data=data,
                              compress_type=zf.ZIP_DEFLATED, compresslevel=9)
        except Exception:
            # Log and fall through so the archives are still finalized.
            import traceback
            traceback.print_exc()
    print("repacked: " + filename)
def extract_genres(cfg):
    """Scan the .inpx index and collect every genre tag and language code.

    Reads each ``*.inp`` member of the index zip named by
    ``cfg['path_to_flibusta_fb2_local.inpx']``. Records are 0x04-separated
    fields, 14 per record: field 1 holds the colon-separated genre list,
    field 11 the language.

    Returns
    -------
    (list[bytes], list[str])
        Sorted genre tags (raw bytes) and sorted lower-cased two-letter
        language codes.
    """
    print('LOADING GENRES')
    genres = set()
    langs = set()
    with zf.ZipFile(cfg['path_to_flibusta_fb2_local.inpx'], mode='r') as fzip:
        for info in fzip.infolist():
            if not info.filename.endswith('.inp'):
                continue
            fields = fzip.read(info).split(b'\x04')  # 0x04 field separator
            # Field 1 of every 14-field record: colon-separated genres with
            # a trailing colon (hence the [:-1] after the split).
            for item in fields[1::14]:
                genres.update(item.split(b':')[:-1])
            # Field 11 of every record: language; keep the 2-letter code.
            for item in fields[11::14]:
                langs.add(item[:2].decode('latin').lower())
            print("inspecting: ", info.filename)
    print('GENRES LOADED, NOW YOU CAN SET GENRES IGNORE FILTER IN CONFIG FILE')
    return sorted(genres), sorted(langs)
def load_database(cfg):
    """Build the per-archive book database from the .inpx index.

    Each ``*.inp`` member holds one CRLF-terminated record per book;
    fields are 0x04-separated. Field 5 is the file name (must equal
    field 7), field 1 the colon-separated genres, field 11 the language.

    Returns
    -------
    dict[str, list[dict]]
        Maps book-archive name ('<inp name>.zip') to a list of records
        with 'filename' (str), 'genres' (set[str]) and 'lang' (str) keys.

    Raises
    ------
    Exception
        If fields 5 and 7 of a record disagree.
    """
    print('LOADING BOOKS INFO')
    db = dict()
    with zf.ZipFile(cfg['path_to_flibusta_fb2_local.inpx'], mode='r') as fzip:
        for info in fzip.infolist():
            if not info.filename.endswith('.inp'):
                continue
            books = []
            for record in fzip.read(info).split(b"\r\n")[:-1]:
                fields = record.split(b'\x04')[:-1]  # drop trailing empty field
                book = {'filename': fields[5].decode('latin') + '.fb2'}
                # Sanity check: the index stores the file name twice.
                if fields[5] != fields[7]:
                    print(fields[5], fields[7])
                    raise Exception('l[5],l[7]')
                book['genres'] = {g.decode("latin")
                                  for g in fields[1].split(b':')[:-1]}
                book['lang'] = fields[11][:2].decode('latin').lower()
                books.append(book)
            print("read: ", info.filename)
            if books:
                db[info.filename[:-4] + '.zip'] = books
    print('BOOKS INFO LOADED')
    return db
def create_empty_cfg():
    """Write a template config.txt for the user to fill in and re-run with."""
    defaults = {
        'path_to_flibusta_fb2_local.inpx': 'path/flibusta_fb2_local.inpx',
        'path_to_zip_repack_folder': 'path/',
        'genres': [],
        'languages': [],
        'strict_filter': False,
        'remove_images': True,
        'multicore': 4,
    }
    with open('config.txt', 'w') as f:
        json.dump(defaults, f, indent=4)
if __name__ == '__main__':
    # First run: no config yet — create a template and ask the user to
    # fill in the .inpx path.
    if not os.path.isfile('config.txt'):
        print('config.txt does not exists, dummy config was created, please specify path to .INPX and re-run')
        create_empty_cfg()
        exit(1)
    with open('config.txt') as f:
        cfg = json.load(f)
    if not os.path.isfile(cfg['path_to_flibusta_fb2_local.inpx']):
        print('flibusta_fb2_local.inpx does not exists')
        exit(1)
    # Second run: no genre filter set — dump every available genre and
    # language into the config so the user can choose, then stop.
    if len(cfg['genres']) == 0:
        genres, langs = extract_genres(cfg)
        # extract_genres returns genres as raw bytes; decode for JSON.
        cfg['genres'] = list(map(lambda x: x.decode("latin"), list(genres)))
        cfg['languages'] = langs
        with open('config.txt', 'w') as f:
            json.dump(cfg, f, indent=4)
        exit(0)
    # Normal run: repack every archive listed in the index, fanning the
    # work out across a process pool (repacking is CPU-bound).
    db = load_database(cfg)
    gen_filter = set(cfg['genres'])
    lang_filter = set(cfg['languages'])
    p = pool.Pool(int(cfg['multicore']))
    path = os.path.dirname(cfg['path_to_flibusta_fb2_local.inpx']) + '/'
    for key, val in db.items():
        p.apply_async(covert, (key, path,
                               cfg['path_to_zip_repack_folder'],
                               val,
                               gen_filter, lang_filter,
                               cfg['strict_filter'],
                               cfg['remove_images']))
    p.close()
    p.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import subprocess

# Source directory holding the downloaded flibusta archives.
path = "D:\\-TORRENT\\fb2.Flibusta.Net\\"
# 7-Zip executable, shared by the extract and compress steps (the original
# spelled the first copy 'C:\Program Files\\...' — same string by accident,
# since \P is not an escape sequence, but fragile; normalized here).
SEVEN_ZIP = "C:\\Program Files\\7-Zip\\7z.exe"

for f in os.listdir(path):
    # Crude filter: names ending in 'p' (matches .zip and .inp).
    # NOTE(review): presumably .zip archives are intended — confirm whether
    # f.endswith('.zip') is the real intent before tightening.
    if f[-1] == 'p':
        # Extract onto the scratch drive ('-aoa': overwrite all existing).
        os.chdir("E:\\")
        subprocess.run([SEVEN_ZIP,
                        "e",
                        path + f,
                        '-aoa',
                        "-mmt=8"])
        # Re-compress everything extracted into a .7z9 under repack\
        # ('-sdel' deletes the input files after they are packed).
        subprocess.run([SEVEN_ZIP,
                        "a",
                        "-t7z",
                        path + "repack\\" + f + ".7z9",
                        "*.*",
                        "-m0=LZMA2",
                        "-mx=9",
                        "-mmt=2",
                        "-sdel"])
        print("=======================")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment