File size: 10,913 Bytes
6755a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
from copy import deepcopy
from typing import Iterable
import logging

import numpy as np

from ..utils.util import convert_class_attr_to_dict

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


class Clip(object):
    """Media clip: the part of a media file between two transition points.

    NOTE(review): an ``Item`` base class used to be listed after ``object``,
    but ``Item`` is not imported in this module (NameError at import) and
    ``object`` placed before another class breaks the MRO. ``to_dct`` is
    therefore implemented here; restore the base if ``Item`` is meant to be
    imported from elsewhere.
    """

    def __init__(
        self,
        time_start,
        duration,
        clipid=None,
        media_type=None,
        mediaid=None,
        timepoint_type=None,
        text=None,
        stage=None,
        path=None,
        duration_num=None,
        group_time_start=0,
        group_clipid=None,
        original_clipid=None,
        emb=None,
        multi_factor=None,
        similar_clipseq=None,
        rythm: float = None,
        **kwargs
    ):
        """
        Args:
            time_start (float): Start time in seconds, relative to the media
                file the clip belongs to.
            duration (float): Duration of the clip.
            clipid (int or [int]): Clip index provided by media_map; matches
                the index in media_map.json one-to-one.
            media_type (str, optional): music, video, text. Defaults to None.
            mediaid (int): Media id. When ``clipid`` is a list, the clip is a
                fused clip.
            timepoint_type (int, optional): Transition type of the start
                point. Defaults to None.
            text (str, optional): Text description of the clip; lyrics for
                music, dialogue for video, or even bullet comments.
                Defaults to None.
            stage (str, optional): Structural position of the clip inside the
                whole media file, e.g. intro/chorus/verse for music, or
                opening/ending/start/climax/transition for video.
                Defaults to None.
            path (str, optional): Path of the media file, used later for
                reading and processing. Defaults to None.
            duration_num (int, optional): Duration of the clip in frames.
                Defaults to None.
            group_time_start (int, optional): When several songs or videos are
                edited together, the summed duration of all sub-media that
                precede the sub-media this clip belongs to. 0 means there is
                only one media file. Defaults to 0.
            group_clipid (int, optional): Actual index in
                MediaInfo.sub_meta_info.
            original_clipid (None or [int], optional): Some clips are merged
                from other clips; this records the source clips as actual
                indices in media_map.json. ``None`` is stored as a new empty
                list. Defaults to None.
            emb (np.array, optional): Combined embedding of the clip.
                Defaults to None.
            multi_factor (MultiFactorFeature, optional): Multi-dimensional
                features. Defaults to None.
            similar_clipseq ([Clip], optional): Clips similar to this one;
                the exact structure is still to be defined. Defaults to None.
            rythm (float, optional): Stored unchanged on the instance.
                Defaults to None.
            **kwargs: Any other keyword arguments are stored as instance
                attributes under their own names.
        """
        self.media_type = media_type
        self.mediaid = mediaid
        self.time_start = time_start
        self.duration = duration
        self.clipid = clipid
        self.path = path
        self.timepoint_type = timepoint_type
        self.text = text
        self.stage = stage
        self.group_time_start = group_time_start
        self.group_clipid = group_clipid
        self.duration_num = duration_num
        # A fresh list per instance, so clips never share one default list.
        self.original_clipid = original_clipid if original_clipid is not None else []
        self.emb = emb
        self.multi_factor = multi_factor
        self.similar_clipseq = similar_clipseq
        self.rythm = rythm
        # TODO: charts may carry unneeded intermediate results that cost
        # memory; nothing is filtered out here yet, pending a settled data
        # protocol.
        self.__dict__.update(kwargs)
        self.preprocess()

    def preprocess(self):
        """Hook called at the end of ``__init__``; does nothing here."""
        pass

    def spread_parameters(self):
        """Placeholder hook; does nothing here."""
        pass

    def to_dct(self, target_keys=None, ignored_keys=None):
        """Export the instance attributes as a shallow dict.

        Only attributes stored on the instance are exported, so properties
        such as ``time_end`` and ``mvp_clip`` are never evaluated.

        Args:
            target_keys (list of str, optional): Attribute names to export.
                Names that are not instance attributes are skipped. ``None``
                exports every instance attribute. Defaults to None.
            ignored_keys (list of str, optional): Attribute names to leave
                out of the result. Defaults to None.

        Returns:
            dict: Attribute name -> attribute value (values are not copied).
        """
        attrs = vars(self)
        ignored = set(ignored_keys) if ignored_keys is not None else set()
        keys = attrs if target_keys is None else target_keys
        return {k: attrs[k] for k in keys if k in attrs and k not in ignored}

    @property
    def time_end(
        self,
    ):
        """End time of the clip: ``time_start + duration``."""
        return self.time_start + self.duration

    @property
    def mvp_clip(self):
        """Load the actual clip data in moviepy format.

        Raises:
            NotImplementedError: Always; not implemented in this class.
        """
        raise NotImplementedError


class ClipSeq(object):
    """Sequence of media clips."""

    # Class used to build clips when the constructor receives dicts.
    ClipClass = Clip

    def __init__(self, clips) -> None:
        """Build the sequence from clips or from clip dicts.

        Args:
            clips (list, or a single item): Clips of the sequence. A non-list
                value is wrapped in a one-element list. When the first element
                is a dict, every element is expanded with
                ``ClipClass(**d)``; otherwise the list is stored as given
                (not copied).
        """
        if not isinstance(clips, list):
            clips = [clips]
        if not clips:
            self.clips = []
        elif isinstance(clips[0], dict):
            self.clips = [self.ClipClass(**d) for d in clips]
        else:
            self.clips = clips

    def set_clip_value(self, k, v):
        """Set attribute ``k`` to ``v`` on every clip of the sequence."""
        for clip in self.clips:
            setattr(clip, k, v)

    def __len__(
        self,
    ):
        """Number of clips in the sequence."""
        return len(self.clips)

    def merge(self, other, group_time_start_delta=None, groupid_delta=None):
        """Merge another ClipSeq into this one, in place.

        When media infos are merged, each clip has to record the group it
        belongs to and the start time of that group; the deltas express the
        shift to apply to the clips of ``other``. After the merge every clip
        of this sequence gets ``group_clipid`` set to its position.

        Args:
            other (ClipSeq): Sequence to merge; its clips are modified in
                place and appended to this sequence.
            group_time_start_delta (float, optional): Added to
                ``group_time_start`` of every clip of ``other``.
                Defaults to None (no shift).
            groupid_delta (int, optional): Added to ``groupid`` of every clip
                of ``other``. Defaults to None (no shift).
        """
        if group_time_start_delta is not None or groupid_delta is not None:
            for clip in other.clips:
                if group_time_start_delta is not None:
                    clip.group_time_start += group_time_start_delta
                if groupid_delta is not None:
                    # assumes the clips carry a ``groupid`` attribute (e.g.
                    # passed as an extra keyword argument) -- TODO confirm
                    clip.groupid += groupid_delta
        self.clips.extend(other.clips)
        for idx, clip in enumerate(self.clips):
            clip.group_clipid = idx

    @property
    def duration(
        self,
    ):
        """Sum of ``Clip.duration`` over the sequence.

        Returns:
            float: Total duration of the sequence, 0 when it is empty.
        """
        return sum(c.duration for c in self.clips)

    def __getitem__(self, i) -> Clip:
        """Index the sequence with an integer, a slice or an iterable.

        Args:
            i (int, slice or iterable of int): An integer (Python or numpy)
                selects one clip; a slice follows list slicing rules (open
                ends, negative bounds, clamping); an iterable of indices
                selects the listed clips in order.

        Raises:
            ValueError: If ``i`` is none of the supported index types.

        Returns:
            Clip or ClipSeq: A Clip for an integer index, otherwise a new
            ClipSeq holding the selected clips.
        """
        if isinstance(i, slice):
            # Delegate to list slicing so None bounds and clamping work.
            return ClipSeq(self.clips[i])
        if isinstance(i, (int, np.integer)):
            return self.clips[int(i)]
        # A str is iterable, but its items are str again: reject it instead
        # of recursing forever.
        if isinstance(i, Iterable) and not isinstance(i, str):
            return ClipSeq([self[x] for x in i])
        raise ValueError(
            "unsupported input, should be int or slice, but given {}, type={}".format(
                i, type(i)
            )
        )

    def insert(self, idx, obj):
        """Insert ``obj`` before position ``idx``."""
        self.clips.insert(idx, obj)

    def append(self, obj):
        """Append ``obj`` at the end of the sequence."""
        self.clips.append(obj)

    def extend(self, objs):
        """Append every item of ``objs`` at the end of the sequence."""
        self.clips.extend(objs)

    @property
    def duration_seq_emb(
        self,
    ):
        """Array holding the ``duration`` of every clip, in order."""
        return np.array([c.duration for c in self.clips])

    @property
    def timestamp_seq_emb(self):
        """Array holding the ``time_start`` of every clip, in order."""
        return np.array([c.time_start for c in self.clips])

    @property
    def rela_timestamp_seq_emb(self):
        """Clip start times divided by the total duration of the sequence."""
        return self.timestamp_seq_emb / self.duration

    def get_factor_seq_emb(self, factor, dim):
        """Stack the feature ``factor`` of every clip into one array.

        Args:
            factor (str): Key looked up in each clip's ``multi_factor``.
            dim: Shape passed to ``np.full`` for the placeholder of clips
                that lack the factor.

        Returns:
            np.ndarray: Per-clip values stacked along axis 0. A clip whose
            ``multi_factor`` is None, does not contain ``factor``, or maps it
            to None contributes an array filled with ``np.inf``.
        """
        rows = []
        for clip in self.clips:
            factors = clip.multi_factor
            value = None
            # ``multi_factor`` defaults to None on Clip: treat it as missing.
            if factors is not None and factor in factors:
                value = factors[factor]
            if value is None:
                value = np.full(dim, np.inf)
            rows.append(value)
        return np.stack(rows, axis=0)

    def semantic_seq_emb(self, dim):
        """``get_factor_seq_emb`` for the "semantics" factor."""
        return self.get_factor_seq_emb(factor="semantics", dim=dim)

    def emotion_seq_emb(self, dim):
        """``get_factor_seq_emb`` for the "emotion" factor."""
        return self.get_factor_seq_emb(factor="emotion", dim=dim)

    def theme_seq_emb(self, dim):
        """``get_factor_seq_emb`` for the "theme" factor."""
        return self.get_factor_seq_emb(factor="theme", dim=dim)

    def to_dct(
        self,
        target_keys=None,
        ignored_keys=None,
    ):
        """Convert every clip with its own ``to_dct``.

        Args:
            target_keys (list of str, optional): Forwarded to each clip.
                Defaults to None.
            ignored_keys (list of str, optional): Forwarded to each clip.
                When None, a built-in list of keys is used.

        Returns:
            list: One ``clip.to_dct(...)`` result per clip, in order.
        """
        if ignored_keys is None:
            ignored_keys = ["kwargs", "audio_path", "lyric_path", "start", "end"]
        return [
            clip.to_dct(target_keys=target_keys, ignored_keys=ignored_keys)
            for clip in self.clips
        ]

    @property
    def mvp_clip(self):
        """Load the actual clip data in moviepy format.

        Raises:
            NotImplementedError: Always; not implemented in this class.
        """
        raise NotImplementedError


class ClipIds(object):
    def __init__(
        self,
        clipids: list or int,
    ) -> None:
        """Indices of clips inside a ClipSeq, mainly for a clip fused from
        several clips. Example use:
        1. One MusicClip can match several VideoClips; the indices of those
           VideoClips can be described with ClipIds.

        Args:
            clipids (list or int): Index or indices in the ClipSeq. Anything
                that is not a list is wrapped in a one-element list.
        """
        if not isinstance(clipids, list):
            clipids = [clipids]
        self.clipids = clipids


class ClipIdsSeq(object):
    def __init__(self, clipids_seq: list) -> None:
        """Several ClipIds grouped together. Example use:
        1. Regroup a MediaClipSeq by splitting and recombining it into a
           coarser-grained ClipSeq.

        Args:
            clipids_seq (list): List of the combined ClipIds. A list is
                stored as given; any other value (e.g. a single ClipIds) is
                wrapped in a one-element list, so ``self.clipids_seq`` is
                always a list.
        """
        # Test for ``list`` so that a list is kept flat and a single item is
        # wrapped, the same normalisation ClipIds applies to its argument.
        self.clipids_seq = (
            clipids_seq if isinstance(clipids_seq, list) else [clipids_seq]
        )


# TODO: metric may become a dict later
class MatchedClipIds(object):
    def __init__(
        self, id1: ClipIds, id2: ClipIds, metric: float = None, **kwargs
    ) -> None:
        """Matched pair of clips from two modalities. Example use:
        1. The matching relation between music clips and video clips.

        Args:
            id1 (ClipIds): Clips of the first modality; a value that is not a
                ClipIds is wrapped with ``ClipIds(id1)``.
            id2 (ClipIds): Clips of the second modality; a value that is not
                a ClipIds is wrapped with ``ClipIds(id2)``.
            metric (float): Matching distance. Defaults to None.
            **kwargs: Stored as instance attributes under their own names.
        """
        if not isinstance(id1, ClipIds):
            id1 = ClipIds(id1)
        if not isinstance(id2, ClipIds):
            id2 = ClipIds(id2)
        self.id1 = id1
        self.id2 = id2
        self.metric = metric
        # Extra keyword arguments become attributes of the instance.
        vars(self).update(kwargs)


class MatchedClipIdsSeq(object):
    def __init__(self, seq: list, metric: float = None, **kwargs) -> None:
        """Matched sequence pair of two modalities. Example use:
        1. Matching between a music clip sequence and a video clip sequence,
           where every element is a MatchedClipIds.

        Args:
            seq (list): List of matched pairs of the two modalities; stored
                as given.
            metric (float): Matching distance. Defaults to None.
            **kwargs: Stored as instance attributes under their own names.
        """
        self.seq, self.metric = seq, metric
        # Extra keyword arguments become attributes of the instance.
        vars(self).update(kwargs)