forked from boostcampaitech6/level2-3-cv-finalproject-cv-08
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat : Add data V1 OLKAVS refine function
- Loading branch information
1 parent
a571787
commit c4d8d96
Showing
5 changed files
with
245 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import os | ||
import json | ||
from pydub import AudioSegment | ||
|
||
def process_wav_and_json(wav_file: str, json_file: str, wav_file_name: str, save_folder: str): | ||
""" wav 파일을 받아 json 에서 필요한 부분을 잘라내 형태에 맞게 저장할 코드 | ||
Args: | ||
wav_file (str): json 파일과 매칭되는 wav_file 경로 | ||
json_file (str): wav_file croping에 사용될 json_file 경로 | ||
wav_file_name (str): folder명 지정에 사용할 wav_file 자체 이름 | ||
save_folder (str): 정제된 파일을 저장할 경로 | ||
""" | ||
count = 1 | ||
audio = AudioSegment.from_wav(wav_file) | ||
|
||
# 저장할 폴더 생성 | ||
wav_file_folder = wav_file_name.split("_")[:-2] | ||
wav_file_folder = "_".join(wav_file_folder) | ||
wav_file_folder = os.path.join(save_folder,'data', wav_file_folder,'audio') | ||
os.makedirs(wav_file_folder, exist_ok=True) | ||
|
||
# json 파일 열기 | ||
with open(json_file, 'r') as f: | ||
data = json.load(f) | ||
data= data[0] | ||
|
||
# 각 시간대별로 돌아가며 동영상 croping | ||
for sentence_info in data['Sentence_info']: | ||
start_time = round(int(sentence_info['start_time']), 3) | ||
end_time = round(int(sentence_info['end_time']), 3) | ||
time = end_time - start_time | ||
if time <3 or time > 10 : | ||
continue | ||
|
||
start_time_ms = start_time * 1000 | ||
end_time_ms = end_time * 1000 | ||
|
||
|
||
segment = audio[start_time_ms:end_time_ms] | ||
output_wav_file = f"{wav_file_name.split('_')[-1]}_{count:03d}.wav" | ||
|
||
output_wav_path = os.path.join(wav_file_folder, output_wav_file) | ||
|
||
segment.export(output_wav_path, format='wav') | ||
count += 1 | ||
|
||
print(f"Saved {output_wav_path}") | ||
|
||
|
||
def find_matching_json(file_name: str, json_folder: str) ->str : | ||
"""wav 파일과 파일명이 똑같은 json 파일을 찾아 경로 반환, | ||
만약 없다면 None 반환 | ||
Args: | ||
file_name (str): wav 파일 이름 가져오기 | ||
json_folder (str): json이 저장된 위치 확인 | ||
Returns: | ||
str: wav 파일에 해당하는 json 파일 반환 | ||
""" | ||
|
||
for root, dirs, files in os.walk(json_folder): | ||
for file in files: | ||
if file.split('.')[0] == file_name: | ||
json_file = os.path.join(root, file) | ||
return json_file | ||
return None | ||
|
||
def find_wav(wav_folder: str, json_folder: str, save_folder: str): | ||
""" 처음 정제할 wav 파일 찾기 | ||
이후 json 파일을 찾아 wav를 정제한다. | ||
Args: | ||
wav_folder (str): wav 파일들이 저장된 폴더 위치 | ||
json_folder (str): json 파일들이 저장된 폴더 위치 | ||
save_folder (str): croping 된 파일들을 저장할 위치 | ||
""" | ||
|
||
for root,_,files in os.walk(wav_folder): | ||
for file in files: | ||
if file.endswith('.wav'): | ||
wav_file = os.path.join(root, file) | ||
wav_file_name = wav_file.split('/')[-1].split('.')[0] | ||
json_file = find_matching_json(wav_file_name, json_folder) | ||
|
||
if json_file: | ||
process_wav_and_json(wav_file, json_file, wav_file_name, save_folder) | ||
os.remove(wav_file) | ||
|
||
|
||
def main(wav_folder: str, json_folder: str, save_folder: str): | ||
find_wav(wav_folder, json_folder, save_folder) | ||
print('End of processing') | ||
|
||
|
||
|
||
|
||
if __name__ == '__main__': | ||
# WAV 파일이 들어있는 폴더 경로 | ||
wav_folder = '/home/carbox/Desktop/data/009.립리딩(입모양) 음성인식 데이터/01.데이터/2.Validation/원천데이터' | ||
# JSON 파일이 들어있는 폴더 경로 | ||
json_folder = '/home/carbox/Desktop/data/009.립리딩(입모양) 음성인식 데이터/01.데이터/2.Validation/라벨링데이터' | ||
# 원천데이터를 저장할 폴더 경로 | ||
save_folder = '/home/carbox' | ||
|
||
main(wav_folder, json_folder, save_folder) |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import cv2 | ||
import os | ||
from rembg import remove | ||
|
||
def process_video(video_path: str, save_folder: str): | ||
"""Video 에서 사람의 얼굴이 정면이고, 눈을 뜨고 잇을 때 캡쳐 및 배경 제거 | ||
Args: | ||
video_path (str): 타겟 비디오 파일경로 | ||
save_folder (str): 이미지를 저장할 위치 | ||
""" | ||
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_alt2.xml') | ||
eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye_tree_eyeglasses.xml') | ||
|
||
cap = cv2.VideoCapture(video_path) | ||
|
||
while True: | ||
ret, frame = cap.read() | ||
if not ret: | ||
break | ||
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | ||
faces = face_cascade.detectMultiScale(gray, 1.1, 4) | ||
|
||
for (x, y, w, h) in faces: | ||
# 얼굴 영역을 30% 확장 | ||
expansion_rate = 0.3 | ||
new_width = int(w * (1 + expansion_rate)) | ||
new_height = int(h * (1 + expansion_rate)) | ||
new_x = x - int(w * expansion_rate / 2) | ||
new_y = y - int(h * expansion_rate / 2) | ||
|
||
# 확장된 영역이 프레임을 벗어나지 않도록 조정 | ||
new_x = max(0, new_x) | ||
new_y = max(0, new_y) | ||
new_width = min(new_width, frame.shape[1] - new_x) | ||
new_height = min(new_height, frame.shape[0] - new_y) | ||
|
||
roi_color = frame[new_y:new_y+new_height, new_x:new_x+new_width] | ||
eyes = eye_cascade.detectMultiScale(cv2.cvtColor(roi_color, cv2.COLOR_BGR2GRAY)) | ||
if len(eyes) >= 2: # 두 눈이 검출되면 | ||
face_img = cv2.resize(roi_color, (256, 256)) | ||
face_img = remove(face_img, bgcolor=(255, 255, 255, 255)) | ||
|
||
video_name = "_".join(os.path.splitext(os.path.basename(video_path))[0].split("_")[:-2]) | ||
origin_file_folder = os.path.join(save_folder,'data', video_name) | ||
os.makedirs(origin_file_folder, exist_ok=True) | ||
|
||
cv2.imwrite(f'{os.path.join(origin_file_folder, video_name)}.png', face_img) | ||
cap.release() | ||
return | ||
|
||
print(os.path.basename(video_path)) | ||
cap.release() | ||
|
||
|
||
def main(video_folder: str, save_folder: str): | ||
""" 비디오 폴더 경로에서 .mp4 파일 선택, | ||
정제 이후 파일 제거 | ||
Args: | ||
video_folder (str): 탐색할 비디오가 있는 경로 | ||
save_folder (str): 이미지 저장을 위한 파일 | ||
""" | ||
for root,_,files in os.walk(video_folder): | ||
for file in files: | ||
if file.endswith('.mp4'): | ||
video_file = os.path.join(root, file) | ||
process_video(video_file, save_folder) | ||
os.remove(os.path.join(root,file)) | ||
|
||
|
||
if __name__ == '__main__': | ||
video_folder = '/home/carbox/Desktop/data/009.립리딩(입모양) 음성인식 데이터/01.데이터/2.Validation/원천데이터' | ||
save_folder = "/home/carbox" | ||
main(video_folder, save_folder) | ||
|
||
|
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import os | ||
import re | ||
import tarfile | ||
|
||
def extract_tar_if_needed(root_folder: str): | ||
""" tar 파일을 추출하는 함수 | ||
Args: | ||
root_folder (str): tar 파일이 들어있는 폴더 경로 | ||
""" | ||
for root, dirs, files in os.walk(root_folder): | ||
for file in files: | ||
if file.endswith('.tar'): | ||
# .tar 파일과 같은 이름을 가진 폴더가 있는지 확인 | ||
tar_path = os.path.join(root, file) | ||
folder_name = os.path.splitext(tar_path)[0] | ||
if not os.path.exists(folder_name): | ||
# 동일한 이름의 폴더가 없으면 압축 해제 | ||
print(f"Extracting {tar_path}") | ||
with tarfile.open(tar_path) as tar: | ||
tar.extractall(path=root) | ||
print(f"Extracted to {folder_name}") | ||
# 압축 해제 후 dirs 리스트 업데이트 | ||
dirs.append(os.path.basename(folder_name)) | ||
|
||
# 추출 후 삭제 | ||
os.remove(os.path.join(root, file)) | ||
print(f"Deleted {file} ") | ||
|
||
|
||
|
||
def delete_files_except_extensions(root_folder: str, keep_extensions: list, pattern: str): | ||
""" 정제에 필요한 파일을 제외한 다른 파일들 삭제 | ||
Args: | ||
root_folder (str): 타겟 폴더 | ||
keep_extensions (list): 유지할 확장자 wav, 혹은 끝나는 부분 (A_001.mp4) 확인 | ||
pattern (str): 유지할 파일명 패턴 | ||
""" | ||
for root, dirs, files in os.walk(root_folder): | ||
for file in files: | ||
if not any(file.endswith(ext) for ext in keep_extensions): | ||
if re.search(pattern, file): | ||
continue | ||
file_path = os.path.join(root, file) | ||
os.remove(file_path) | ||
print(f"Deleted {file}") | ||
|
||
def main(root_folder: str, keep_extensions: list, pattern: str): | ||
# 먼저 .tar 파일이 있으면 압축을 해제 | ||
extract_tar_if_needed(root_folder) | ||
# 압축 해제 후 (해당되는 경우), 지정된 확장자 외의 파일 삭제 | ||
delete_files_except_extensions(root_folder, keep_extensions, pattern) | ||
print("End of processing") | ||
|
||
if __name__ == '__main__': | ||
root_folder = '~/Desktop/GPU/009.립리딩(입모양) 음성인식 데이터/01.데이터/2.Validation/원천데이터' # 탐색을 시작할 폴더 경로 | ||
keep_extensions = ['A_001.mp4', '.wav'] # 유지하고 싶은 파일 확장자 목록 | ||
pattern = r'.*_A_.*\.json' | ||
main(root_folder, keep_extensions, pattern) |