視頻識別 動作識別 實時異常行為識別 等所有行為識別


文章目錄

 

大家好,我是cv君,很多大創,比賽,項目,工程,科研,學術的煉丹術士問我上述這些識別,該怎么做,怎么選擇框架,今天可以和大家分析一下一些方案:

用單幀目標檢測做的話,前后語義相關性很差(也有優化版),效果不能達到實際項目需求,尤其是在誤檢上較難,並且目標檢測是需要大量數據來擬合的。標注需求極大。

用姿態加目標檢測結合的方式,效果是很不錯的,不過一些這樣類似Two stage的方案,速度較慢(也有很多實時的),同樣有着一些不能通過解決時間上下文的問題。

即:摔倒檢測 我們正常是應該有一個摔倒過程,才能被判斷為摔倒的,而不是人倒下的就一定是摔倒(純目標檢測弊病)

運動檢測 比如引體向上,和高抬腿計數,球類運動,若是使用目標檢測做,那么會出現什么問題呢? 引體向上無法實現動作是否規范(當然可以通過后處理判斷下巴是否過框,效果是不夠人工智能的),高抬腿計數,目標檢測是無法計數的,判斷人物的球類運動,目標檢測是有很大的誤檢的:第一種使用球檢測,誤檢很大,第二種使用打球手勢檢測,遇到人物遮擋球類,就無法識別目標,在標注上也需要大量數據

今天cv君鋪墊了這么多,只是為了給大家推薦一個全新出爐視頻序列檢測方法,目前代碼已開源至Githubhttps://github.com/xiaobingchan/CV-Action 歡迎star~

歡迎移步。只需要很少的訓練數據,就可以擬合哦!不信你來試試吧~幾個訓練集即可。

神經網絡使用的是這兩個月開源的實時動作序列強分類神經網絡:realtimenet 。

我的github將收集 所有的上述說到的動作序列視頻數據,訓練出能實用的檢測任務:目前實現了手勢控制的檢測,等等,大家歡迎關注公眾號,后續會接着更新。

開始

目前以手勢和運動識別為例子,因為cv君沒什么數據哈哈

項目演示:

本人做的沒轉gif,所以大家可以看看其他的演示效果圖,跟我的是幾乎一樣的~ 只是訓練數據不同
在這里插入圖片描述在這里插入圖片描述

在這里插入圖片描述
在這里插入圖片描述

一、 基本過程和思想

基本思想是將數據集中視頻及分類標簽轉換為圖像(視頻幀)和其對應的分類標簽,也可以不標注,單獨給一個小視頻標注上分類類別,再采用CNN網絡對圖像進行訓練學習和測試,將視頻分類問題轉化為圖形分類問題。具體步驟包括:

(1) 對每個視頻(訓練和測試視頻)以一定的FPS截出視頻幀(jpegs)保存為訓練集和測試集,將對圖像的分類性能作為所對應視頻的分類性能

(2)訓練一個人物等特征提取模型,並采用模型融合策略,一個特征提取,一個分類模型。特征工程部分通用人物行為,分類模型,訓練自己的類別的分類模型即可。

(4) 訓練完成后載入模型對test set內所有的視頻幀進行檢查驗證,得出全測試集上的top1准確率和top5准確率輸出。

(5)實時檢測。

二 、視頻理解還有哪些優秀框架

第一個 就是我github這個了,比較方便,但不敢排前幾,因為沒有什么集成,

然后MMaction ,就是視頻理解框架了,眾所周知,他們家的東西很棒

第二個就是facebook家的一些了,

再下來基本上就不多了,全面好用的實時框架。

好,所以我們先來說說我的使用過程。

三、效果體驗~使用

體驗官方的一些模型 (模型我已經放在里面了)

pip install -r requirements.txt

將模型放置此處:

resources
├── backbone
│   ├── strided_inflated_efficientnet.ckpt
│   └── strided_inflated_mobilenet.ckpt
├── fitness_activity_recognition
│   └── ...
├── gesture_recognition
│   └── ...
└── ...

首先,請試用我們提供的演示。在sense/examples目錄中,您將找到3個Python腳本, run_gesture_recognition.py ,健身_跟蹤器 run_fitness_tracker.py .py,並運行卡路里_估算 run_calorie_estimation .py. 啟動每個演示就像在終端中運行腳本一樣簡單,如下所述。

手勢:

cd examples/

python run_gesture_recognition.py

健身_跟蹤器:

python examples/run_fitness_tracker.py --weight=65 --age=30 --height=170 --gender=female
  --camera_id=CAMERA_ID           ID of the camera to stream from
  --path_in=FILENAME              Video file to stream from. This assumes that the video was encoded at 16 fps.

卡路里計算

python examples/run_calorie_estimation.py --weight=65 --age=30 --height=170 --gender=female

三、訓練自己數據集步驟

首先 clone一下我的github,或者原作者github,

然后自己錄制幾個視頻,比如我這里capture 一個類別,錄制了幾個視頻,可以以MP4 或者avi后綴,再來個類別,再錄制一些視頻,以名字為類別。

然后

cd tools\sense_studio\sense_studio.py
  • 1

這一步,會顯示:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZHcMZyRC-1614669575240)(D:\CSDN\pic_new\sense\1614657924104.png)]

然后,打開這個網址:

來到前端界面

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-wv6folha-1614669575241)(./sense\1614658174416.png)]

點擊一下start new project

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EEhm0qcY-1614669575243)(./sense\1614658199676.png)]

這樣編寫

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Dp8HRoE2-1614669575244)(./sense\1614658272219.png)]

然后點擊create project 即可制作數據。

但是官方的制作方法是有着嚴重bug的~我們該怎么做呢!

下面,我修改后,可以這樣!

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-0tBCyoPd-1614669575245)(./sense\1614660437833.png)]

這里請仔細看:

我們在sense_studio 文件夾下,新建一個文件夾:我叫他cvdemo1

然后新建兩個文件夾:videos_train 和videos_valid 里面存放的capture是你的類別名字的數據集,capture存放相關的訓練集,click存放click的訓練集,同樣的videos_valid 存放驗證集,

在cvdemo1文件夾下新建project_config.json ,里面寫什么呢? 可以復制我的下面的代碼:

{
  "name": "cvdemo1", 
  "date_created": "2021-02-03",
  "classes": {
    "capture": [
      "capture",
      "capture"
    ],
    "click": [
      "click",
      "click"
    ]
  }
}

里面的name 改成你的文件夾名字即可。

就這么簡單!

然后就可以訓練:
python train_classifier.py 你可以將main中修改一下。

將path in修改成我們的訓練數據地址,即可,其他的修改不多,就按照我的走即可,

    # Parse arguments
    # args = docopt(__doc__)
    path_in = './sense_studio/cvdemo1/'
    path_out = path_in
    os.makedirs(path_out, exist_ok=True)
    use_gpu = True
    path_annotations_train = None
    path_annotations_valid =None
    num_layers_to_finetune = 9
    temporal_training = False

    # Load feature extractor
    feature_extractor = feature_extractors.StridedInflatedEfficientNet()
    checkpoint = torch.load('../resources/backbone/strided_inflated_efficientnet.ckpt')
    feature_extractor.load_state_dict(checkpoint)
    feature_extractor.eval()

    # Get the require temporal dimension of feature tensors in order to
    # finetune the provided number of layers.
    if num_layers_to_finetune > 0:
        num_timesteps = feature_extractor.num_required_frames_per_layer.get(-num_layers_to_finetune)
        if not num_timesteps:
            # Remove 1 because we added 0 to temporal_dependencies
            num_layers = len(feature_extractor.num_required_frames_per_layer) - 1
            raise IndexError(f'Num of layers to finetune not compatible. '
                             f'Must be an integer between 0 and {num_layers}')
    else:
        num_timesteps = 1

訓練特別快,10分鍾即可,

然后,你可以運行run_custom_classifier.py

   # Parse arguments
    # args = docopt(__doc__)
    camera_id = 0
    path_in = None
    path_out = None
    custom_classifier = './sense_studio/cvdemo1/'
    title = None
    use_gpu = True

    # Load original feature extractor
    feature_extractor = feature_extractors.StridedInflatedEfficientNet()
    feature_extractor.load_weights_from_resources('../resources/backbone/strided_inflated_efficientnet.ckpt')
    # feature_extractor = feature_extractors.StridedInflatedMobileNetV2()
    # feature_extractor.load_weights_from_resources(r'../resources\backbone\strided_inflated_mobilenet.ckpt')
    checkpoint = feature_extractor.state_dict()

    # Load custom classifier
    checkpoint_classifier = torch.load(os.path.join(custom_classifier, 'classifier.checkpoint'))
    # Update original weights in case some intermediate layers have been finetuned
    name_finetuned_layers = set(checkpoint.keys()).intersection(checkpoint_classifier.keys())
    for key in name_finetuned_layers:
        checkpoint[key] = checkpoint_classifier.pop(key)
    feature_extractor.load_state_dict(checkpoint)
    feature_extractor.eval()
    print('[debug] net:', feature_extractor)
    with open(os.path.join(custom_classifier, 'label2int.json')) as file:
        class2int = json.load(file)
    INT2LAB = {value: key for key, value in class2int.items()}

    gesture_classifier = LogisticRegression(num_in=feature_extractor.feature_dim,
                                            num_out=len(INT2LAB))
    gesture_classifier.load_state_dict(checkpoint_classifier)
    gesture_classifier.eval()
    print(gesture_classifier)

同樣修改路徑即可。

結果就可以實時檢測了

原代碼解讀

同樣的,我們使用的是使用efficienct 來做的特征,你也可以改成mobilenet 來做,有示例代碼,就是訓練的時候,用mobilenet ,檢測的時候也是,只需要修改幾行代碼即可。

efficienct 提取特征部分代碼:

class StridedInflatedEfficientNet(StridedInflatedMobileNetV2):

    def __init__(self):

        super().__init__()

        self.cnn = nn.Sequential(
            ConvReLU(3, 32, 3, stride=2),
            InvertedResidual(32, 24, 3, spatial_stride=1),
            InvertedResidual(24, 32, 3, spatial_stride=2, expand_ratio=6),
            InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6),
            InvertedResidual(32, 32, 3, spatial_stride=1, expand_ratio=6),
            InvertedResidual(32, 56, 5, spatial_stride=2, expand_ratio=6),
            InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True, temporal_stride=True),
            InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(56, 56, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(56, 112, 3, spatial_stride=2, expand_ratio=6),
            InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
            InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
            InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6, temporal_shift=True, temporal_stride=True),
            InvertedResidual(112, 112, 3, spatial_stride=1, expand_ratio=6),
            InvertedResidual(112, 160, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(160, 160, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(160, 272, 5, spatial_stride=2, expand_ratio=6),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6, temporal_shift=True),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(272, 272, 5, spatial_stride=1, expand_ratio=6),
            InvertedResidual(272, 448, 3, spatial_stride=1, expand_ratio=6),
            ConvReLU(448, 1280, 1)
        )

這個InvertedResidual 在這,

class InvertedResidual(nn.Module):  # noqa: D101

    def __init__(self, in_planes, out_planes, spatial_kernel_size=3, spatial_stride=1, expand_ratio=1,
                 temporal_shift=False, temporal_stride=False, sparse_temporal_conv=False):
        super().__init__()
        assert spatial_stride in [1, 2]
        hidden_dim = round(in_planes * expand_ratio)
        self.use_residual = spatial_stride == 1 and in_planes == out_planes
        self.temporal_shift = temporal_shift
        self.temporal_stride = temporal_stride

        layers = []
        if expand_ratio != 1:
            # Point-wise expansion
            stride = 1 if not temporal_stride else (2, 1, 1)
            if temporal_shift and sparse_temporal_conv:
                convlayer = SteppableSparseConv3dAs2d
                kernel_size = 1
            elif temporal_shift:
                convlayer = SteppableConv3dAs2d
                kernel_size = (3, 1, 1)
            else:
                convlayer = nn.Conv2d
                kernel_size = 1
            layers.append(ConvReLU(in_planes, hidden_dim, kernel_size=kernel_size, stride=stride,
                                   padding=0, convlayer=convlayer))

        layers.extend([
            # Depth-wise convolution
            ConvReLU(hidden_dim, hidden_dim, kernel_size=spatial_kernel_size, stride=spatial_stride,
                     groups=hidden_dim),
            # Point-wise mapping
            nn.Conv2d(hidden_dim, out_planes, 1, 1, 0),
            # nn.BatchNorm2d(out_planes)
        ])
        self.conv = nn.Sequential(*layers)

    def forward(self, input_):  # noqa: D102
        output_ = self.conv(input_)
        residual = self.realign(input_, output_)
        if self.use_residual:
            output_ += residual
        return output_

    def realign(self, input_, output_):  # noqa: D102
        n_out = output_.shape[0]
        if self.temporal_stride:
            indices = [-1 - 2 * idx for idx in range(n_out)]
            return input_[indices[::-1]]
        else:
            return input_[-n_out:]



我們finetune自己的數據集

def extract_features(path_in, net, num_layers_finetune, use_gpu, num_timesteps=1):
    # Create inference engine
    inference_engine = engine.InferenceEngine(net, use_gpu=use_gpu)

    # extract features
    for dataset in ["train", "valid"]:
        videos_dir = os.path.join(path_in, f"videos_{dataset}")
        features_dir = os.path.join(path_in, f"features_{dataset}_num_layers_to_finetune={num_layers_finetune}")
        video_files = glob.glob(os.path.join(videos_dir, "*", "*.avi"))

        print(f"\nFound {len(video_files)} videos to process in the {dataset}set")

        for video_index, video_path in enumerate(video_files):
            print(f"\rExtract features from video {video_index + 1} / {len(video_files)}",
                  end="")
            path_out = video_path.replace(videos_dir, features_dir).replace(".mp4", ".npy")

            if os.path.isfile(path_out):
                print("\n\tSkipped - feature was already precomputed.")
            else:
                # Read all frames
                compute_features(video_path, path_out, inference_engine,
                                 num_timesteps=num_timesteps, path_frames=None, batch_size=16)

        print('\n')

構建數據的dataloader

def generate_data_loader(dataset_dir, features_dir, tags_dir, label_names, label2int,
                         label2int_temporal_annotation, num_timesteps=5, batch_size=16, shuffle=True,
                         stride=4, path_annotations=None, temporal_annotation_only=False,
                         full_network_minimum_frames=MODEL_TEMPORAL_DEPENDENCY):
    # Find pre-computed features and derive corresponding labels
    tags_dir = os.path.join(dataset_dir, tags_dir)
    features_dir = os.path.join(dataset_dir, features_dir)
    labels_string = []
    temporal_annotation = []
    if not path_annotations:
        # Use all pre-computed features
        features = []
        labels = []
        for label in label_names:
            feature_temp = glob.glob(f'{features_dir}/{label}/*.npy')
            features += feature_temp
            labels += [label2int[label]] * len(feature_temp)
            labels_string += [label] * len(feature_temp)
    else:
        with open(path_annotations, 'r') as f:
            annotations = json.load(f)
        features = ['{}/{}/{}.npy'.format(features_dir, entry['label'],
                                          os.path.splitext(os.path.basename(entry['file']))[0])
                    for entry in annotations]
        labels = [label2int[entry['label']] for entry in annotations]
        labels_string = [entry['label'] for entry in annotations]

    # check if annotation exist for each video
    for label, feature in zip(labels_string, features):
        classe_mapping = {0: "counting_background",
                          1: f'{label}_position_1', 2:
                              f'{label}_position_2'}
        temporal_annotation_file = feature.replace(features_dir, tags_dir).replace(".npy", ".json")
        if os.path.isfile(temporal_annotation_file):
            annotation = json.load(open(temporal_annotation_file))["time_annotation"]
            annotation = np.array([label2int_temporal_annotation[classe_mapping[y]] for y in annotation])
            temporal_annotation.append(annotation)
        else:
            temporal_annotation.append(None)

    if temporal_annotation_only:
        features = [x for x, y in zip(features, temporal_annotation) if y is not None]
        labels = [x for x, y in zip(labels, temporal_annotation) if y is not None]
        temporal_annotation = [x for x in temporal_annotation if x is not None]

    # Build dataloader
    dataset = FeaturesDataset(features, labels, temporal_annotation,
                              num_timesteps=num_timesteps, stride=stride,
                              full_network_minimum_frames=full_network_minimum_frames)
    data_loader = torch.utils.data.DataLoader(dataset, shuffle=shuffle, batch_size=batch_size)

    return data_loader

如何實時檢測視頻序列的?

這個問題,主要是通過 系列時間內幀間圖像組合成一個序列,送到網絡中進行分類的,可以在許多地方找到相關參數,比如 display.py :

class DisplayClassnameOverlay(BaseDisplay):
    """
    Display recognized class name as a large video overlay. Once the probability for a class passes the threshold,
    the name is shown and stays visible for a certain duration.
    """

    def __init__(
            self,
            thresholds: Dict[str, float],
            duration: float = 2.,
            font_scale: float = 3.,
            thickness: int = 2,
            border_size: int = 50,
            **kwargs
    ):
        """
        :param thresholds:
            Dictionary of thresholds for all classes.
        :param duration:
            Duration in seconds how long the class name should be displayed after it has been recognized.
        :param font_scale:
            Font scale factor for modifying the font size.
        :param thickness:
            Thickness of the lines used to draw the text.
        :param border_size:
            Height of the border on top of the video display. Used for correctly centering the displayed class name
            on the video.
        """
        super().__init__(**kwargs)
        self.thresholds = thresholds
        self.duration = duration
        self.font_scale = font_scale
        self.thickness = thickness
        self.border_size = border_size

        self._current_class_name = None
        self._start_time = None

    def _get_center_coordinates(self, img: np.ndarray, text: str):
        textsize = cv2.getTextSize(text, FONT, self.font_scale, self.thickness)[0]

        height, width, _ = img.shape
        height -= self.border_size

        x = int((width - textsize[0]) / 2)
        y = int((height + textsize[1]) / 2) + self.border_size

        return x, y

    def _display_class_name(self, img: np.ndarray, class_name: str):
        pos = self._get_center_coordinates(img, class_name)
        put_text(img, class_name, position=pos, font_scale=self.font_scale, thickness=self.thickness)

    def display(self, img: np.ndarray, display_data: dict):
        now = time.perf_counter()

        if self._current_class_name and now - self._start_time < self.duration:
            # Keep displaying the same class name
            self._display_class_name(img, self._current_class_name)
        else:
            self._current_class_name = None
            for class_name, proba in display_data['sorted_predictions']:
                if class_name in self.thresholds and proba > self.thresholds[class_name]:
                    # Display new class name
                    self._display_class_name(img, class_name)
                    self._current_class_name = class_name
                    self._start_time = now
                    break
        return img



每個類別只需要5個左右的視頻,即可得到不錯的效果嗷~
歡迎Star github~

.

 

任何程序錯誤,以及技術疑問或需要解答的,請掃碼添加作者VX::1755337994


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM