|
import pickle |
|
import struct |
|
import traceback |
|
|
|
import cv2 |
|
import numpy as np |
|
|
|
from core import imagelib |
|
from core.cv2ex import * |
|
from core.imagelib import SegIEPolys |
|
from core.interact import interact as io |
|
from core.structex import * |
|
from facelib import FaceType |
|
|
|
|
|
class DFLJPG(object): |
|
def __init__(self, filename): |
|
self.filename = filename |
|
self.data = b"" |
|
self.length = 0 |
|
self.chunks = [] |
|
self.dfl_dict = None |
|
self.shape = None |
|
self.img = None |
|
|
|
@staticmethod |
|
def load_raw(filename, loader_func=None): |
|
try: |
|
if loader_func is not None: |
|
data = loader_func(filename) |
|
else: |
|
with open(filename, "rb") as f: |
|
data = f.read() |
|
except: |
|
raise FileNotFoundError(filename) |
|
|
|
try: |
|
inst = DFLJPG(filename) |
|
inst.data = data |
|
inst.length = len(data) |
|
inst_length = inst.length |
|
chunks = [] |
|
data_counter = 0 |
|
while data_counter < inst_length: |
|
chunk_m_l, chunk_m_h = struct.unpack ("BB", data[data_counter:data_counter+2]) |
|
data_counter += 2 |
|
|
|
if chunk_m_l != 0xFF: |
|
raise ValueError(f"No Valid JPG info in {filename}") |
|
|
|
chunk_name = None |
|
chunk_size = None |
|
chunk_data = None |
|
chunk_ex_data = None |
|
is_unk_chunk = False |
|
|
|
if chunk_m_h & 0xF0 == 0xD0: |
|
n = chunk_m_h & 0x0F |
|
|
|
if n >= 0 and n <= 7: |
|
chunk_name = "RST%d" % (n) |
|
chunk_size = 0 |
|
elif n == 0x8: |
|
chunk_name = "SOI" |
|
chunk_size = 0 |
|
if len(chunks) != 0: |
|
raise Exception("") |
|
elif n == 0x9: |
|
chunk_name = "EOI" |
|
chunk_size = 0 |
|
elif n == 0xA: |
|
chunk_name = "SOS" |
|
elif n == 0xB: |
|
chunk_name = "DQT" |
|
elif n == 0xD: |
|
chunk_name = "DRI" |
|
chunk_size = 2 |
|
else: |
|
is_unk_chunk = True |
|
elif chunk_m_h & 0xF0 == 0xC0: |
|
n = chunk_m_h & 0x0F |
|
if n == 0: |
|
chunk_name = "SOF0" |
|
elif n == 2: |
|
chunk_name = "SOF2" |
|
elif n == 4: |
|
chunk_name = "DHT" |
|
else: |
|
is_unk_chunk = True |
|
elif chunk_m_h & 0xF0 == 0xE0: |
|
n = chunk_m_h & 0x0F |
|
chunk_name = "APP%d" % (n) |
|
else: |
|
is_unk_chunk = True |
|
|
|
|
|
|
|
|
|
|
|
if chunk_size == None: |
|
chunk_size, = struct.unpack (">H", data[data_counter:data_counter+2]) |
|
chunk_size -= 2 |
|
data_counter += 2 |
|
|
|
if chunk_size > 0: |
|
chunk_data = data[data_counter:data_counter+chunk_size] |
|
data_counter += chunk_size |
|
|
|
if chunk_name == "SOS": |
|
c = data_counter |
|
while c < inst_length and (data[c] != 0xFF or data[c+1] != 0xD9): |
|
c += 1 |
|
|
|
chunk_ex_data = data[data_counter:c] |
|
data_counter = c |
|
|
|
chunks.append ({'name' : chunk_name, |
|
'm_h' : chunk_m_h, |
|
'data' : chunk_data, |
|
'ex_data' : chunk_ex_data, |
|
}) |
|
inst.chunks = chunks |
|
|
|
return inst |
|
except Exception as e: |
|
raise Exception (f"Corrupted JPG file {filename} {e}") |
|
|
|
@staticmethod |
|
def load(filename, loader_func=None): |
|
try: |
|
inst = DFLJPG.load_raw (filename, loader_func=loader_func) |
|
inst.dfl_dict = {} |
|
|
|
for chunk in inst.chunks: |
|
if chunk['name'] == 'APP0': |
|
d, c = chunk['data'], 0 |
|
c, id, _ = struct_unpack (d, c, "=4sB") |
|
|
|
if id == b"JFIF": |
|
c, ver_major, ver_minor, units, Xdensity, Ydensity, Xthumbnail, Ythumbnail = struct_unpack (d, c, "=BBBHHBB") |
|
else: |
|
raise Exception("Unknown jpeg ID: %s" % (id) ) |
|
elif chunk['name'] == 'SOF0' or chunk['name'] == 'SOF2': |
|
d, c = chunk['data'], 0 |
|
c, precision, height, width = struct_unpack (d, c, ">BHH") |
|
inst.shape = (height, width, 3) |
|
|
|
elif chunk['name'] == 'APP15': |
|
if type(chunk['data']) == bytes: |
|
inst.dfl_dict = pickle.loads(chunk['data']) |
|
|
|
return inst |
|
except Exception as e: |
|
io.log_err (f'Exception occured while DFLJPG.load : {traceback.format_exc()}') |
|
return None |
|
|
|
def has_data(self): |
|
return len(self.dfl_dict.keys()) != 0 |
|
|
|
def save(self): |
|
try: |
|
with open(self.filename, "wb") as f: |
|
f.write ( self.dump() ) |
|
except: |
|
raise Exception( f'cannot save {self.filename}' ) |
|
|
|
def dump(self): |
|
data = b"" |
|
|
|
dict_data = self.dfl_dict |
|
|
|
|
|
for key in list(dict_data.keys()): |
|
if dict_data[key] is None: |
|
dict_data.pop(key) |
|
|
|
for chunk in self.chunks: |
|
if chunk['name'] == 'APP15': |
|
self.chunks.remove(chunk) |
|
break |
|
|
|
last_app_chunk = 0 |
|
for i, chunk in enumerate (self.chunks): |
|
if chunk['m_h'] & 0xF0 == 0xE0: |
|
last_app_chunk = i |
|
|
|
dflchunk = {'name' : 'APP15', |
|
'm_h' : 0xEF, |
|
'data' : pickle.dumps(dict_data), |
|
'ex_data' : None, |
|
} |
|
self.chunks.insert (last_app_chunk+1, dflchunk) |
|
|
|
|
|
for chunk in self.chunks: |
|
data += struct.pack ("BB", 0xFF, chunk['m_h'] ) |
|
chunk_data = chunk['data'] |
|
if chunk_data is not None: |
|
data += struct.pack (">H", len(chunk_data)+2 ) |
|
data += chunk_data |
|
|
|
chunk_ex_data = chunk['ex_data'] |
|
if chunk_ex_data is not None: |
|
data += chunk_ex_data |
|
|
|
return data |
|
|
|
def get_img(self): |
|
if self.img is None: |
|
self.img = cv2_imread(self.filename) |
|
return self.img |
|
|
|
def get_shape(self): |
|
if self.shape is None: |
|
img = self.get_img() |
|
if img is not None: |
|
self.shape = img.shape |
|
return self.shape |
|
|
|
def get_height(self): |
|
for chunk in self.chunks: |
|
if type(chunk) == IHDR: |
|
return chunk.height |
|
return 0 |
|
|
|
def get_dict(self): |
|
return self.dfl_dict |
|
|
|
def set_dict (self, dict_data=None): |
|
self.dfl_dict = dict_data |
|
|
|
def get_face_type(self): return self.dfl_dict.get('face_type', FaceType.toString (FaceType.FULL) ) |
|
def set_face_type(self, face_type): self.dfl_dict['face_type'] = face_type |
|
|
|
def get_landmarks(self): return np.array ( self.dfl_dict['landmarks'] ) |
|
def set_landmarks(self, landmarks): self.dfl_dict['landmarks'] = landmarks |
|
|
|
def get_eyebrows_expand_mod(self): return self.dfl_dict.get ('eyebrows_expand_mod', 1.0) |
|
def set_eyebrows_expand_mod(self, eyebrows_expand_mod): self.dfl_dict['eyebrows_expand_mod'] = eyebrows_expand_mod |
|
|
|
def get_source_filename(self): return self.dfl_dict.get ('source_filename', None) |
|
def set_source_filename(self, source_filename): self.dfl_dict['source_filename'] = source_filename |
|
|
|
def get_source_rect(self): return self.dfl_dict.get ('source_rect', None) |
|
def set_source_rect(self, source_rect): self.dfl_dict['source_rect'] = source_rect |
|
|
|
def get_source_landmarks(self): return np.array ( self.dfl_dict.get('source_landmarks', None) ) |
|
def set_source_landmarks(self, source_landmarks): self.dfl_dict['source_landmarks'] = source_landmarks |
|
|
|
def get_image_to_face_mat(self): |
|
mat = self.dfl_dict.get ('image_to_face_mat', None) |
|
if mat is not None: |
|
return np.array (mat) |
|
return None |
|
def set_image_to_face_mat(self, image_to_face_mat): self.dfl_dict['image_to_face_mat'] = image_to_face_mat |
|
|
|
def has_seg_ie_polys(self): |
|
return self.dfl_dict.get('seg_ie_polys',None) is not None |
|
|
|
def get_seg_ie_polys(self): |
|
d = self.dfl_dict.get('seg_ie_polys',None) |
|
if d is not None: |
|
d = SegIEPolys.load(d) |
|
else: |
|
d = SegIEPolys() |
|
|
|
return d |
|
|
|
def set_seg_ie_polys(self, seg_ie_polys): |
|
if seg_ie_polys is not None: |
|
if not isinstance(seg_ie_polys, SegIEPolys): |
|
raise ValueError('seg_ie_polys should be instance of SegIEPolys') |
|
|
|
if seg_ie_polys.has_polys(): |
|
seg_ie_polys = seg_ie_polys.dump() |
|
else: |
|
seg_ie_polys = None |
|
|
|
self.dfl_dict['seg_ie_polys'] = seg_ie_polys |
|
|
|
def has_xseg_mask(self): |
|
return self.dfl_dict.get('xseg_mask',None) is not None |
|
|
|
def get_xseg_mask_compressed(self): |
|
mask_buf = self.dfl_dict.get('xseg_mask',None) |
|
if mask_buf is None: |
|
return None |
|
|
|
return mask_buf |
|
|
|
def get_xseg_mask(self): |
|
mask_buf = self.dfl_dict.get('xseg_mask',None) |
|
if mask_buf is None: |
|
return None |
|
|
|
img = cv2.imdecode(mask_buf, cv2.IMREAD_UNCHANGED) |
|
if len(img.shape) == 2: |
|
img = img[...,None] |
|
|
|
return img.astype(np.float32) / 255.0 |
|
|
|
|
|
def set_xseg_mask(self, mask_a): |
|
if mask_a is None: |
|
self.dfl_dict['xseg_mask'] = None |
|
return |
|
|
|
mask_a = imagelib.normalize_channels(mask_a, 1) |
|
img_data = np.clip( mask_a*255, 0, 255 ).astype(np.uint8) |
|
|
|
data_max_len = 50000 |
|
|
|
ret, buf = cv2.imencode('.png', img_data) |
|
|
|
if not ret or len(buf) > data_max_len: |
|
for jpeg_quality in range(100,-1,-1): |
|
ret, buf = cv2.imencode( '.jpg', img_data, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] ) |
|
if ret and len(buf) <= data_max_len: |
|
break |
|
|
|
if not ret: |
|
raise Exception("set_xseg_mask: unable to generate image data for set_xseg_mask") |
|
|
|
self.dfl_dict['xseg_mask'] = buf |
|
|