Spaces:
Running
Running
File size: 1,692 Bytes
0cb931d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
from typing import Any, Dict, List
import copy
def merge_timestamps(timestamps: List[Dict[str, Any]], merge_window: float = 5, max_merge_size: float = 30, padding_left: float = 1, padding_right: float = 1):
result = []
if len(timestamps) == 0:
return result
processed_time = 0
current_segment = None
for i in range(len(timestamps)):
next_segment = timestamps[i]
delta = next_segment['start'] - processed_time
# Note that segments can still be longer than the max merge size, they just won't be merged in that case
if current_segment is None or delta > merge_window or next_segment['end'] - current_segment['start'] > max_merge_size:
# Finish the current segment
if current_segment is not None:
# Add right padding
finish_padding = min(padding_right, delta / 2) if delta < padding_left + padding_right else padding_right
current_segment['end'] += finish_padding
delta -= finish_padding
result.append(current_segment)
# Start a new segment
current_segment = copy.deepcopy(next_segment)
# Pad the segment
current_segment['start'] = current_segment['start'] - min(padding_left, delta)
processed_time = current_segment['end']
else:
# Merge the segment
current_segment['end'] = next_segment['end']
processed_time = current_segment['end']
# Add the last segment
if current_segment is not None:
current_segment['end'] += padding_right
result.append(current_segment)
return result |