La boîte à Tutoriels de Christopher PECAUD

Créer un convertisseur vidéo avec watchdog avec Python

Laisser un commentaire

SOMMAIRE

I Introduction
II Prérequis
III Ecriture du script
IV Conclusion

I Introduction

Dans ce document nous allons voir comment utiliser Python afin de créer un convertisseur vidéo MP4 vers WMV avec un watchdog sous Windows. Nous verrons comment utiliser les exécutables FFMPEG afin de réaliser cette opération en utilisant le module ffmpy associé. Nous verrons comment de même scruter les événements liés à la modification d’un dossier afin de lancer le processus de conversion. Cela nécessite aussi l’utilisation d’une fonction pour permettre d’attendre la copie complète du fichier avant de lancer celui-ci.

II Prérequis

Pour l’écriture de ce script nous avons besoin de deux modules que nous allons télécharger et installer avec pip.

Le premier est ffmpy qui est téléchargeable par l’intermédiaire de la commande :

pip install ffmpy

Le second, watchdog, est téléchargeable par l’intermédiaire de celle-ci :

Pip install watchdog

Une fois ces deux modules installés, on peut commencer l’écriture de notre script Python.

III Ecriture du script

Importation des différents modules nécessaires

Dans un premier temps nous allons définir les imports nécessaires qui vont nous permettre d’utiliser les objets et fonctions associés aux modules téléchargés ainsi qu’à ceux inclus en standard avec Python.

On commence par importer l’objet FFMPEG du module ffmpy qui est un wrapper qui va nous permettre de travailler avec l’application FFMPEG.

from ffmpy import FFmpeg

Ensuite nous importons les classes Observer et FileSystemEventHandler. Le premier permet de scruter les changements qui se produisent à l’intérieur d’un dossier et envoie des notifications au gestionnaire d’événement FileSystemEventHandler

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

Les imports suivants vont permettre de lister les fichiers d’un dossier :

import os
from os import listdir
from os.path import isfile, join

L’import suivant permet d’utiliser l’objet sleep qui permet de créer une temporisation :

from time import sleep
from ctypes import windll

Définition de notre observateur d’événements

Nous pouvons dans un premier temps définir notre observateur d’événements. Ceci se fera dans la boucle principale de notre script :

if __name__ == "__main__":
    path_input = 'C:\\Temp\\A_transcoder\\CONVERT\\Input\\'
    path_output = "C:\\Temp\\A_transcoder\\CONVERT\\Output\\"
    
    event_handler = FileSystemEventHandler()

On commence par définir les variables qui vont stocker les chemins d’accès aux fichiers vidéo d’origine et convertis.

Ensuite on crée le gestionnaire d’événement à partir de la classe FileSystemEventHandler. On aurait très bien pu créer notre propre classe de gestion des événements à partir de celle-ci pour interagir avec certains événements comme la création, la suppression, la modification d’un fichier mais dans le cas présent nous devons juste nous assurer que le fichier est complètement copié avant de lancer la conversion. Nous reviendrons sur la fonction nécessaire un peu plus tard dans ce document.

Maintenant que le gestionnaire est créé il faut créer un observateur avec la classe Observer :

observer = Observer()

Puis on lui demande de vérifier périodiquement le chemin d’accès passé en paramètre pour voir si un nouveau fichier a été déposé. L’objet gestionnaire d’événements créé précédemment est aussi passé en paramètre de la fonction.

observer.schedule(event_handler, path, recursive=True)

Puis nous lançons l’observateur avec la ligne suivante :

observer.start()

Une fois le système de watchdog initialisé et démarré il faut lancer une boucle :

While True :
    ….

Dans laquelle nous allons traiter le fichier qui va être déposé dans le chemin d’accès que nous avons défini.

Pour commencer nous créons une temporisation :

sleep(10)

Ensuite on va récupérer le nom du fichier par l’intermédiaire d’une fonction personnalisée qui va extraire le nom du fichier du chemin d’accès complet.

Filename = getFile(path)

Cette fonction sera définie à l’extérieur de la boucle principale du script. Celle-ci s’appuie sur la fonction sustème isfile() qui vérifie si la ressource est un fichier. La fonction retourne le nom du fichier :

def getFile(path):
    
    filename = [f for f in listdir(path) if isfile(join(path, f))]
    if len(filename) > 0 :
       
        return filename[0]

Il faut ensuite vérifier que la copie du fichier est complète avant de lancer le processus de conversion ceci est réalisé par l’intermédiaire d’une fonction que nous allons définir à l’extérieur de la partie principale du script. Celle-ci va vérifier l’état du fichier. Elle s’appuie sur la fonction CreateFileW de l’objet Kernel32. Cette fonction crée ou ouvre le fichier avec les paramètres suivants :

  • Le chemin d’accès fourni par notre variable path ;
  • GENERIC_WRITE : pour obtenir les accès en lecture ;
  • FILE_SHARE_READ :

Le booléen finished nous permet de savoir lorsque cette opération est terminée.

def is_file_copying_finished(path):
        finished = False
        GENERIC_WRITE = 1 << 30
        FILE_SHARE_READ = 0x00000001
        OPEN_EXISTING = 3
        FILE_ATTRIBUTE_NORMAL = 0x80
        
        h_file = windll.Kernel32.CreateFileW(path, GENERIC_WRITE, FILE_SHARE_READ, None, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, None)
        if h_file != -1:
            windll.Kernel32.CloseHandle(h_file)
            finished = True
        return finished    

Revenons maintenant à notre programme principal. Nous allons poursuivre notre boucle initiée précédemment en appelant notre fonction is_file_copying_finished. Si la valeur renvoyée par la fonction est True, nous extrayons le nom du fichier sans son extension (variable filename_without_ext) et nous extrayons de même l’extension par l’intermédiaire de la fonction partition qui permet de scinder la chaîne de caractères du nom du fichier en trois parties bien diestinctes. L’indice 0 correspond à la sous-chaîne correspondant au nom du fichier. L’indice 1 correspond au délimiteur c’est-à-dire la « . ». L’indice 2 correspond à la chaîne associée à l’extension du fichier. Ensuite nous appelons la fonction de conversion de la vidéo conversion_MP4_to_WMV.

if is_file_copying_finished(path_input + str(filename or "")):
    filename_without_ext = filename.partition('.')[0]
    filename_ext = filename.partition('.')[2]
    #print(filename_ext)
    conversion_MP4_to_WMV(filename_without_ext)

Voici le code de la fonction : def conversion_MP4_to_WMV(filename):

    try : 
        print('Initialisation du process de conversion...')
        ffmpeg = FFmpeg(
            executable='C:\\ffmpeg\\bin\\ffmpeg.exe',
            inputs={path_input + filename + '.' + filename_ext:None},
            outputs={path_output +filename + '.wmv':"-filter:v fps=25"}
        )
        ffmpeg.cmd
        print('The conversion process is about to start...')
        ffmpeg.run()
    except:
        print('The conversion failed...')

Nous définissons dans un premier temps une variable de type FFmpeg pour créer une instance d’objet de type FFMpeg qui prend en argument le chemin de l’exécutable de l’application FFmpeg. Dans mon cas le package a été déposé dans le dossier C:\ffmpeg\ donc l’exécutable se trouve dans le chemin suivant : C:\ffmpeg\bin

Ensuite il faut renseigner le chemin d’accès au fichier source avec un argument inputs contenant celui-ci et les filtres ou options à inclure :

inputs={path_input + filename + '.' + filename_ext:None},

Pour finir on donne le chemin de notre fichier de destination avec son nom de fichier et l’extension. On peut de même appliquer des filtres. Dans mon cas j’ai appliqué un filtre afin que la vidéo de sortie soit convertie en 25 images/secondes.

outputs={path_output +filename + '.wmv':"-filter:v fps=25"}

La fonction cmd permet de visualiser la commande associée à notre objet FFMPEG.

Nous lançons ensuite le processus de conversion avec la fonction run().

Nous englobons ce process dans une structure try … catch pour gérer les exceptions.

Voici le code complet du script :

from ffmpy import FFmpeg
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from watchdog.events import FileSystemEventHandler
import subprocess
import os
from os import listdir
from os.path import isfile, join
import smtplib, ssl

from time import sleep
from ctypes import windll

def is_file_copying_finished(path):
        finished = False
        GENERIC_WRITE = 1 << 30
        FILE_SHARE_READ = 0x00000001
        OPEN_EXISTING = 3
        FILE_ATTRIBUTE_NORMAL = 0x80
        #print('the file is not fully copied... Waiting for the completion...')
        h_file = windll.Kernel32.CreateFileW(path, GENERIC_WRITE, FILE_SHARE_READ, None, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, None)
        if h_file != -1:
            windll.Kernel32.CloseHandle(h_file)
            print('The copying process is fully complete...')
            finished = True
        return finished    



def getFile(path):
    
    filename = [f for f in listdir(path) if isfile(join(path, f))]
    if len(filename) > 0 :
       
        return filename[0]

def conversion_MP4_to_WMV(filename):

    try : 
        print('Initialising the conversion process...')
        ffmpeg = FFmpeg(
            executable='C:\\ffmpeg\\bin\\ffmpeg.exe',
            inputs={path_input + filename + '.' + filename_ext:None},
            outputs={path_output +filename + '.wmv':"-filter:v fps=25"}
        )
        ffmpeg.cmd
        print('The conversion process is about to start...')
        ffmpeg.run()
    except:
        print('The conversion failed...')

if __name__ == "__main__":
    ath_input = 'C:\\Temp\\A_transcoder\\CONVERT\\Input\\'
    path_output = "C:\\Temp\\A_transcoder\\CONVERT\\Output\\"
    
    event_handler = ConversionEventHandler()
    

    observer = Observer()
    observer.schedule(event_handler, path_input, recursive=True)
    observer.start()
    try:
        while True:
            sleep(1)         
            filename = getFile(path_input)
            
            if is_file_copying_finished(path_input + str(filename or "")):
                filename_without_ext = filename.partition('.')[0]
                filename_ext = filename.partition('.')[2]
                #print(filename_ext)
                conversion_MP4_to_WMV(filename_without_ext)
                
                
            
                    
    except :
        print('The process failed. Retry...')         
        observer.stop()   
        observer.join()

IV Conclusion

Nous avons vu comment utiliser l’exécutable ffmpeg avec Python en association avec le module ffmpy. Vous pouvez maintenant utiliser ce code pour effectuer tout type de conversion de fichier vidéo.

blog comments powered by Disqus