Add Cython and Password Support

This combines the password and Cython PRs into one.
This commit is contained in:
Theelgirl 2020-10-13 12:14:39 +00:00 committed by GitHub
parent 0e1eac766b
commit a51da7dd51
1 changed files with 148 additions and 79 deletions

View File

@ -1,35 +1,101 @@
from bitstring import Bits, BitArray
from magic import Magic
import mimetypes
from PIL import Image
import glob
from operator import sub
import numpy as np
from tqdm import tqdm
import ffmpeg
import binascii
import argparse
import sys
import os
import getpass
import io
import gzip
import pickle
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from Crypto.Cipher import AES
try:
from fvid_cython import cy_get_bits_from_image as cy_gbfi
except (ImportError, ModuleNotFoundError):
use_cython = False
else:
use_cython = True
DELIMITER = bin(int.from_bytes("HELLO MY NAME IS ALFREDO".encode(), "big"))
FRAMES_DIR = "./fvid_frames/"
SALT = '63929291bca3c602de64352a4d4bfe69'.encode() # It need be the same in one instance of coding/decoding
DEFAULT_KEY = ' '*32
DEFAULT_KEY = DEFAULT_KEY.encode()
NOTDEBUG = True
class WrongPassword(Exception):
pass
class MissingArgument(Exception):
pass
def get_password(password_provided):
if password_provided=='default':
return DEFAULT_KEY
else:
if password_provided == None:
password_provided = getpass.getpass("Enter password:")
password = str(password_provided).encode()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=SALT,
iterations=100000,
backend=default_backend()
)
key = kdf.derive(password)
return key
def get_bits_from_file(filepath):
def get_bits_from_file(filepath, key):
print('Reading file...')
bitarray = BitArray(filename=filepath)
# adding a delimiter to know when the file ends to avoid corrupted files
# when retrieving
bitarray.append(DELIMITER)
cipher = AES.new(key, AES.MODE_EAX, nonce=SALT)
ciphertext, tag = cipher.encrypt_and_digest(bitarray.tobytes())
filename = os.path.basename(filepath)
pickled = pickle.dumps({'tag':tag,
'data':ciphertext,
'filename':filepath})
print('Ziping...')
#zip
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode='w') as fo:
fo.write(pickled)
zip = out.getvalue()
#zip
del bitarray
del pickled
bitarray = BitArray(zip)
return bitarray.bin
def less(val1, val2):
return val1 < val2
def get_bits_from_image(image):
if use_cython:
bits = cy_gbfi(image)
return bits, False
width, height = image.size
done = False
@ -37,28 +103,13 @@ def get_bits_from_image(image):
px = image.load()
bits = ""
delimiter_str = DELIMITER.replace("0b", "")
delimiter_length = len(delimiter_str)
pbar = tqdm(range(height), desc="Getting bits from frame")
pbar = range(height)
white = (255, 255, 255)
black = (0, 0, 0)
for y in pbar:
for x in range(width):
# check if we have hit the delimiter
if bits[-delimiter_length:] == delimiter_str:
# remove delimiter from bit data to have an exact one to one
# copy when decoding
bits = bits[: len(bits) - delimiter_length]
pbar.close()
return (bits, True)
pixel = px[x, y]
pixel_bin_rep = "0"
@ -89,19 +140,19 @@ def get_bits_from_image(image):
def get_bits_from_video(video_filepath):
# get image sequence from video
print('Reading video...')
image_sequence = []
ffmpeg.input(video_filepath).output(
f"{FRAMES_DIR}decoded_frames%03d.png"
).run(quiet=True)
os.system('ffmpeg -i ' + video_filepath + ' ./fvid_frames/decoded_frames_%d.png');
for filename in glob.glob(f"{FRAMES_DIR}decoded_frames*.png"):
# for filename in glob.glob(f"{FRAMES_DIR}decoded_frames*.png"):
for filename in sorted(glob.glob(f"{FRAMES_DIR}decoded_frames*.png"), key=os.path.getmtime) :
image_sequence.append(Image.open(filename))
bits = ""
sequence_length = len(image_sequence)
for index in range(sequence_length):
print('Bits are in place')
for index in tqdm(range(sequence_length)):
b, done = get_bits_from_image(image_sequence[index])
bits += b
@ -112,20 +163,43 @@ def get_bits_from_video(video_filepath):
return bits
def save_bits_to_file(file_path, bits):
def save_bits_to_file(file_path, bits, key):
# get file extension
bitstring = Bits(bin=bits)
mime = Magic(mime=True)
mime_type = mime.from_buffer(bitstring.tobytes())
#zip
print('Unziping...')
in_ = io.BytesIO()
in_.write(bitstring.bytes)
in_.seek(0)
with gzip.GzipFile(fileobj=in_, mode='rb') as fo:
bitstring = fo.read()
#zip
unpickled = pickle.loads(bitstring)
tag = unpickled['tag']
ciphertext = unpickled['data']
filename = unpickled['filename']
cipher = AES.new(key, AES.MODE_EAX, nonce=SALT)
bitstring = cipher.decrypt(ciphertext)
print('Checking integrity...')
try:
cipher.verify(tag)
except ValueError:
raise WrongPassword("Key incorrect or message corrupted")
bitstring = BitArray(bitstring)
# If filepath not passed in use defualt
# otherwise used passed in filepath
if file_path == None:
filepath = f"file{mimetypes.guess_extension(type=mime_type)}"
filepath = filename
else:
filepath = file_path
filepath = file_path # No need for mime Magic
with open(
filepath, "wb"
@ -133,15 +207,6 @@ def save_bits_to_file(file_path, bits):
bitstring.tofile(f)
def make_image(bit_set, resolution=(1920, 1080)):
width, height = resolution
image = Image.new("1", (width, height))
image.putdata(bit_set)
return image
def split_list_by_n(lst, n):
for i in range(0, len(lst), n):
@ -155,41 +220,41 @@ def make_image_sequence(bitstring, resolution=(1920, 1080)):
set_size = width * height
# bit_sequence = []
bit_sequence = split_list_by_n(list(map(int, bitstring)), width * height)
image_bits = []
print('Making image sequence')
print('Cutting...')
#bitlist = list(tqdm(split_list_by_n(bitstring, set_size)))
bitlist = list(split_list_by_n(bitstring, set_size))
del bitstring
bitlist[-1] = bitlist[-1] + '0'*(set_size - len(bitlist[-1]))
# using bit_sequence to make image sequence
index = 1
bitlist = bitlist[::-1]
print('Saving frames...')
for _ in tqdm(range(len(bitlist))):
bitl = bitlist.pop()
# for bitl in tqdm(bitlist):
# image_bits = list(map(int, tqdm(bitl)))
image_bits = list(map(int, bitl))
# print(image_bits)
image_sequence = []
for bit_set in bit_sequence:
image_sequence.append(make_image(bit_set))
return image_sequence
image = Image.new("1", (width, height))
image.putdata(image_bits)
image.save(
f"{FRAMES_DIR}encoded_frames_{index}.png"
)
index += 1
def make_video(output_filepath, image_sequence, framerate="1/5"):
def make_video(output_filepath, framerate="1/5"):
if output_filepath == None:
outputfile = "file.mp4"
else:
outputfile = output_filepath
frames = glob.glob(f"{FRAMES_DIR}encoded_frames*.png")
# for one frame
if len(frames) == 1:
ffmpeg.input(frames[0], loop=1, t=1).output(
outputfile, vcodec="libx264rgb"
).run(quiet=True)
else:
ffmpeg.input(
f"{FRAMES_DIR}encoded_frames*.png",
pattern_type="glob",
framerate=framerate,
).output(outputfile, vcodec="libx264rgb").run(quiet=True)
os.system('ffmpeg -r ' + framerate + ' -i ./fvid_frames/encoded_frames_%d.png -c:v libx264rgb ' + outputfile)
@ -219,11 +284,20 @@ def main():
parser.add_argument("-i", "--input", help="input file", required=True)
parser.add_argument("-o", "--output", help="output path")
parser.add_argument("-f", "--framerate", help="set framerate for encoding (as a fraction)", default="1/5", type=str)
parser.add_argument("-p", "--password", help="set password", nargs="?", type=str, default='default')
args = parser.parse_args()
setup()
if not NOTDEBUG:
print('args', args)
print('PASSWORD', args.password, [len(args.password) if len(args.password) is not None else None for _ in range(0)])
if not args.decode and not args.encode:
raise MissingArgument('You should use either --encode or --decode!') #check for arguments
key = get_password(args.password)
if args.decode:
bits = get_bits_from_video(args.input)
@ -232,7 +306,7 @@ def main():
if args.output:
file_path = args.output
save_bits_to_file(file_path, bits)
save_bits_to_file(file_path, bits, key)
elif args.encode:
# isdigit has the benefit of being True and raising an error if the user passes a negative string
@ -240,22 +314,17 @@ def main():
if (not args.framerate.isdigit() and "/" not in args.framerate) or all(x in args.framerate for x in ("-", "/")):
raise NotImplementedError("The framerate must be a positive fraction or an integer for now, like 3, '1/3', or '1/5'!")
# get bits from file
bits = get_bits_from_file(args.input)
bits = get_bits_from_file(args.input, key)
# create image sequence
image_sequence = make_image_sequence(bits)
make_image_sequence(bits)
# save images
for index in range(len(image_sequence)):
image_sequence[index].save(
f"{FRAMES_DIR}encoded_frames_{index}.png"
)
video_file_path = None
if args.output:
video_file_path = args.output
make_video(video_file_path, image_sequence, args.framerate)
make_video(video_file_path, args.framerate)
cleanup()