Skip to content

Commit

Permalink
Merge pull request boostcampaitech6#5 from boostcampaitech6/feat/data
Browse files Browse the repository at this point in the history
[FEAT] Data Code Merge
  • Loading branch information
taeyang916 authored Mar 27, 2024
2 parents cf2433c + a571787 commit e167a68
Show file tree
Hide file tree
Showing 14 changed files with 1,113 additions and 0 deletions.
52 changes: 52 additions & 0 deletions data/audio/audio_check_dB.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import librosa
import librosa.display  # BUG FIX: specshow lives in the display submodule; plain "import librosa" does not guarantee it is importable on all librosa versions
import numpy as np
import matplotlib.pyplot as plt

'''
You can determine the minimum, maximum, and average dB values
to set a threshold for identifying voice regions based on dB levels.
After visually inspecting the waveform and setting a threshold,
adding 80 to it, you can conveniently apply this threshold value to `audio_crop.py`.
'''

# Load audio file (sr=None keeps the file's native sampling rate
# instead of resampling to librosa's 22050 Hz default)
audio_path = "voice2face-data/audio/input.wav"
y, sr = librosa.load(audio_path, sr=None)

# Convert the STFT magnitude spectrogram to dB, referenced to the loudest
# bin, so all values are <= 0 dB
D = librosa.amplitude_to_db(librosa.stft(y), ref=np.max)
max_db = np.max(D)
min_db = np.min(D)

# Threshold separating "voice" cells from background; tune per recording
threshold_db = -60

# Indices (freq_bin, frame) of spectrogram cells louder than the threshold
voice_indices = np.where(D > threshold_db)

print("Threshold:", threshold_db)
print("Maximum dB value in regions with voice:", np.max(D[voice_indices]))
print("Minimum dB value in regions with voice:", np.min(D[voice_indices]))

# Calculate average dB value in regions with voice
average_db = np.mean(D[voice_indices])
print("Average dB value in regions with voice:", average_db)

# Plot waveform and spectrum
plt.figure(figsize=(12, 6))

# Plot waveform
plt.subplot(2, 1, 1)
plt.plot(y)
plt.title("Waveform")
plt.xlabel("Sample")
plt.ylabel("Amplitude")

# Plot spectrum
plt.subplot(2, 1, 2)
librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
plt.colorbar(format='%+2.0f dB')
plt.title('Log-frequency power spectrogram')

plt.tight_layout()
plt.show()
107 changes: 107 additions & 0 deletions data/audio/audio_crop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
from pydub import AudioSegment

'''
Extracts human voice segments from an audio file and creates a new audio file with the detected voice segments
within a 10-second duration.
Args:
audio_file (str): Path to the input audio file. If the file format is .m4a, it will be converted to .wav.
Returns:
save_file (str): Path to the saved audio file with detected voice segments.
'''

def detect_human_voice(audio_file):
    '''
    Detects human voice segments in an audio file.
    Args:
        audio_file (str): Path to the input audio file.
    Returns:
        voice_indices (list): List containing indices of the detected voice segments.
    '''
    # Load at the native sampling rate; the rate itself is not needed here
    samples, _rate = librosa.load(audio_file, sr=None)

    # Split on silence: anything within 18 dB of the peak counts as voice
    # ----- Need to Modify threshold-----#
    non_silent = librosa.effects.split(samples, top_db=18)

    # Flatten the (start, end) sample ranges into a single index list
    return [idx for seg_start, seg_end in non_silent
            for idx in range(seg_start, seg_end)]

def save_full_audio_with_detected_voice(audio_file, save_file):
    '''
    Saves the full audio file with detected voice segments.

    Writes the concatenated voice-only audio to `save_file`, then writes a
    copy of that audio plus a waveform-comparison PNG into a `result/`
    folder created next to `save_file`.

    Args:
        audio_file (str): Path to the input audio file.
        save_file (str): Path to save the audio file with detected voice segments.
    '''
    # Read the entire audio file at its native sampling rate
    y, sr = librosa.load(audio_file, sr=None)

    # Detect human voice segments and get their indices.
    # NOTE(review): detect_human_voice re-loads the same file internally;
    # fine for short clips, could be refactored to load once.
    voice_indices = detect_human_voice(audio_file)

    # Extract human voice segments using the indices
    combined_audio = y[voice_indices]

    # Save the extracted audio segments to a file
    sf.write(save_file, combined_audio, sr)

    # Visualize and save the waveform of the original and detected voice segments
    plt.figure(figsize=(12, 6))

    # Original audio waveform
    plt.subplot(2, 1, 1)
    plt.plot(y)
    plt.title("Original Audio Waveform")
    plt.xlabel("Sample")
    plt.ylabel("Amplitude")

    # Waveform of detected voice segments
    plt.subplot(2, 1, 2)
    plt.plot(combined_audio)
    plt.title("Detected Voice Waveform")
    plt.xlabel("Sample")
    plt.ylabel("Amplitude")

    plt.tight_layout()

    # Auxiliary outputs go into a "result" folder beside save_file
    save_path = os.path.join(os.path.dirname(save_file), 'result')
    os.makedirs(save_path, exist_ok=True)

    # os.path.splitext is safer than slicing off the last 4 characters,
    # which silently corrupts names whose extension is a different length
    base_no_ext = os.path.splitext(os.path.basename(save_file))[0]
    save_file_path = os.path.join(save_path, base_no_ext + "_waveform_comparison.png")
    plt.savefig(save_file_path)

    # Save a copy of the extracted audio next to the comparison plot
    audio_save_file_path = os.path.join(save_path, os.path.basename(save_file))
    sf.write(audio_save_file_path, combined_audio, sr)

    plt.show()

# Define paths for the original file and the file to save with detected voice segments
# ------Need to modify path------ #
audio_file_path = "voice2face-data/audio/input.m4a"
save_file_path = "voice2face-data/audio/detected_voice.wav"

# m4a cannot be decoded by libsndfile, so convert it to wav first.
# os.path.splitext handles any extension length and lets the check be
# case-insensitive (".M4A" converts too), unlike endswith('.m4a').
root, ext = os.path.splitext(audio_file_path)
if ext.lower() == '.m4a':
    # Convert m4a file to wav format (pydub requires ffmpeg on PATH)
    wav_path = root + ".wav"
    audio = AudioSegment.from_file(audio_file_path)
    audio.export(wav_path, format="wav")
    # Process the converted wav file
    save_full_audio_with_detected_voice(wav_path, save_file_path)
else:
    # Process the original file without conversion
    save_full_audio_with_detected_voice(audio_file_path, save_file_path)
139 changes: 139 additions & 0 deletions data/crawling/crawling_detect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
import shutil

import numpy as np
import pandas as pd
import face_recognition
from moviepy.editor import VideoFileClip
from moviepy.editor import concatenate_videoclips

'''
Detects faces and audio in video clips and refines them.
Extracts faces from the video clips and selects segments with audio to rebuild new videos.
New videos are organized in the "processed_videos" folder.
'''

# Function to extract audio from video clips with detected faces
def extract_audio_with_face(video_clip, start_time, end_time):
    '''
    Extracts audio from a video clip with detected faces within a specified time range.
    Args:
        video_clip (VideoFileClip): Input video clip.
        start_time (float): Start time of the segment containing the detected faces.
        end_time (float): End time of the segment containing the detected faces.
    Returns:
        audio (AudioClip): Extracted audio clip.
    '''
    # Thin wrapper around moviepy's subclip on the audio track
    return video_clip.audio.subclip(start_time, end_time)

# Function to extract audio from video clips with detected faces in multiple segments
def extract_audio_with_faces(video_clip, face_detections):
    '''
    Extracts audio from a video clip with detected faces in multiple segments.
    Args:
        video_clip (VideoFileClip): Input video clip.
        face_detections (list): List of tuples containing start and end times of segments with detected faces.
    Returns:
        final_audio (ndarray): Concatenated audio array from all detected face segments.
    '''
    # Pull each detected segment's audio out as a sample array, then join
    segment_arrays = [
        extract_audio_with_face(video_clip, seg_start, seg_end).to_soundarray()
        for seg_start, seg_end in face_detections
    ]
    return np.concatenate(segment_arrays)

# Function to detect faces in video clips
def detect_faces(video_clip):
    '''
    Detects faces in a video clip.
    Args:
        video_clip (VideoFileClip): Input video clip.
    Returns:
        face_detections (list): List of (start_time, end_time) tuples, one per
            frame containing at least one face, padded by one frame on each
            side and clamped to the clip bounds.
    '''
    frame_rate = video_clip.fps
    hit_frames = []   # indices of frames where at least one face was found
    frame_count = 0

    # Stream frames one at a time. The original code materialized every
    # decoded frame in a list, which holds the whole video in RAM at once.
    for i, frame in enumerate(video_clip.iter_frames()):
        frame_count = i + 1
        if face_recognition.face_locations(frame):
            hit_frames.append(i)

    # frame i occurs at time i / fps; pad hits by one frame on both sides
    face_detections = [
        (max(0, i - 1) / frame_rate, min(frame_count - 1, i + 1) / frame_rate)
        for i in hit_frames
    ]
    return face_detections

# Function to create a new video from detected face segments
def create_new_video(video_clip, face_detections, output_path):
    '''
    Creates a new video from detected face segments.
    Args:
        video_clip (VideoFileClip): Input video clip.
        face_detections (list): Non-empty list of tuples containing start and end times of segments with detected faces.
        output_path (str): Path to save the new video.
    '''
    # BUG FIX: moviepy clips have no .append() method, so the original code
    # raised AttributeError whenever more than one segment was detected.
    # concatenate_videoclips is the supported way to join subclips.
    subclips = [video_clip.subclip(seg_start, seg_end)
                for seg_start, seg_end in face_detections]
    new_video_clip = concatenate_videoclips(subclips)
    new_video_clip.write_videofile(output_path)

# Read data from a CSV file
csv_file_path = "/Users/imseohyeon/Documents/crawling/data/Youtube_search_df.csv"
df = pd.read_csv(csv_file_path)

# Paths for input and output folders
DOWNLOAD_FOLDER = "/Users/imseohyeon/Documents/crawling/download/"
NEW_FOLDER = "/Users/imseohyeon/Documents/crawling/processed_videos/"

# Create a new folder if it doesn't exist
os.makedirs(NEW_FOLDER, exist_ok=True)

# Process each video: detect face segments and rebuild a new video from them
for idx, row in df.iterrows():
    video_filename = f"{idx}_video.mp4"
    video_path = os.path.join(DOWNLOAD_FOLDER, video_filename)

    # Guard clause keeps the happy path un-nested
    if not os.path.exists(video_path):
        print(f"File {video_filename} does not exist.")
        continue

    try:
        video_clip = VideoFileClip(video_path)
        try:
            face_detections = detect_faces(video_clip)

            if face_detections:
                # NOTE(review): final_audio is computed but never saved or
                # used — kept for now; remove or persist it deliberately.
                final_audio = extract_audio_with_faces(video_clip, face_detections)
                output_path = os.path.join(NEW_FOLDER, f"{idx}_new_video.mp4")
                create_new_video(video_clip, face_detections, output_path)

                print(f"Processing complete for {video_filename}")
            else:
                print(f"No faces detected in {video_filename}")
        finally:
            # BUG FIX: release the clip's ffmpeg reader/file handles;
            # the original leaked one per processed video
            video_clip.close()
    except Exception as e:
        print(f"Error processing {video_filename}: {e}")

# Move processed videos to another folder
processed_files = os.listdir(NEW_FOLDER)
for file in processed_files:
    shutil.move(os.path.join(NEW_FOLDER, file), DOWNLOAD_FOLDER)

print("All videos processed")
30 changes: 30 additions & 0 deletions data/crawling/crawling_rename_video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
import pandas as pd

'''
Match the video names in the 'download' folder with the index in the CSV.
This facilitates the subsequent video relabeling task.
'''

# Read links from the CSV file.
# NOTE(review): df is currently unused below — presumably intended for
# cross-checking indices against the CSV; confirm before removing.
csv_file_path = "/Users/imseohyeon/Documents/crawling/data/Youtube_search_df.csv"
df = pd.read_csv(csv_file_path)

# Path to the folder where downloaded videos are stored
DOWNLOAD_FOLDER = "/Users/imseohyeon/Documents/crawling/download/"

# Iterate over all files in the folder and rename them
for filename in os.listdir(DOWNLOAD_FOLDER):
    # Only process .mp4 files
    if not filename.endswith(".mp4"):
        continue
    # Extract the index value from the file name (assuming the video title is stored as the index)
    idx = filename.split("_")[0]  # Example: "0_video.mp4" -> "0"
    # Create a new file name
    new_filename = f"{idx}_video.mp4"
    # Skip files that already follow the naming scheme: renaming a file to
    # itself is a pointless syscall and produced a misleading log line
    if new_filename == filename:
        continue
    # Rename the file
    os.rename(os.path.join(DOWNLOAD_FOLDER, filename),
              os.path.join(DOWNLOAD_FOLDER, new_filename))
    # BUG FIX: the original print statement dropped the source file name
    print(f"File renamed: {filename} -> {new_filename}")
42 changes: 42 additions & 0 deletions data/crawling/crawling_select_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import pandas as pd
from argparse import ArgumentParser

def parse_args():
    """Build and parse the command-line options for this script."""
    parser = ArgumentParser()

    # Conventional args
    parser.add_argument('--csv_file', type=str, default='output_test.csv')
    parser.add_argument('--data_path', type=str, default='origin/video')
    parser.add_argument('--save_csv', type=str, default='new_output.csv')

    return parser.parse_args()


def list_files_and_folders(data_path):
    """Return the entries inside *data_path*, or None if it is not a directory.

    Args:
        data_path (str): Directory to list.
    Returns:
        list | None: os.listdir() result, or None when data_path is missing
            or not a directory.
    """
    return os.listdir(data_path) if os.path.isdir(data_path) else None


def main(csv_file, data_path, save_csv):
    """Append one filtered CSV row per youtube-id folder to *save_csv*.

    For every entry under *data_path*, the first row of *csv_file* whose
    first column contains the entry name is taken; columns 4 and 5 are
    overwritten with the two "_"-separated parts of the first file name in
    that folder (e.g. "m_25.mp4" -> "m", "25.mp4"), and the row is appended
    to *save_csv* without a header.

    Args:
        csv_file (str): Input CSV (read without a header).
        data_path (str): Folder whose sub-folders are youtube ids.
        save_csv (str): Output CSV path, opened in append mode.
    """
    csv_data = pd.read_csv(csv_file, header=None)
    youtube_ids = list_files_and_folders(data_path)
    if youtube_ids is None:
        # data_path missing or not a directory: the original crashed with
        # a TypeError when iterating None
        return

    for youtube_id in youtube_ids:
        # regex=False: ids are plain substrings, not regular expressions
        matches = csv_data[csv_data[0].astype(str).str.contains(youtube_id, regex=False)]
        if matches.empty:
            continue  # no CSV row for this folder; nothing to append

        folder_entries = list_files_and_folders(os.path.join(data_path, youtube_id))
        if not folder_entries:
            continue  # stray file or empty folder: original raised here

        name_parts = folder_entries[0].split("_")
        if len(name_parts) < 2:
            continue  # file name not in the expected "<a>_<b>" form

        # .copy() avoids pandas' SettingWithCopy chained-assignment warning
        # when mutating a slice of csv_data
        first_row = matches.iloc[0:1].copy()
        first_row[4] = name_parts[0]
        first_row[5] = name_parts[1]
        first_row.to_csv(save_csv, mode="a", index=False, header=False)


if __name__ == '__main__':
    cli_args = parse_args()
    main(**vars(cli_args))
Loading

0 comments on commit e167a68

Please sign in to comment.