############################################################################ # # Name: analyis_video_3.py # Python: 3.3.2 on win32 # Autor: Adrian Haas # ############################################################################ # Visual output of audio with high pass filtering ############################################################################ # # Visualize a bird's voice (time and freq. domain) and demonstrate a high # pass filter. FFmpeg is used to read/write the videos. Video parameters are # fixed to length: 19s, rate: 25 ps.(for longer videos use a buffer solution) # Output design, created with matplotlib, is adapted to HD format 1920x1080. # ############################################################################ import numpy from subprocess import Popen, PIPE import struct import matplotlib.pyplot as plt import matplotlib.animation as animation from mpl_toolkits.axes_grid1.inset_locator import inset_axes from mpl_toolkits.axes_grid1.inset_locator import zoomed_inset_axes from scipy import signal as sgn ############################################################################ #read audio and video ############################################################################ ############################################################################ #a) header information ############################################################################ movie="amsel_solo_3.mov" p= Popen(['ffprobe', '-show_streams', '-show_format', movie], stdout=PIPE) meta=p.communicate() meta=meta[0] meta=meta.decode("utf-8") print(meta) arr=meta.split("\r\n") #use of metadata for row in arr: if row[:7]=="height=": yline=int(row[7:]) if row[:6]=="width=": xline=int(row[6:]) if row[:12]=="sample_rate=": samplefreq=int(row[12:]) ############################################################################ #b) read audio ############################################################################ print("read audio...") command = [ 'ffmpeg', '-i', movie, '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(samplefreq), '-ac', '2', '-'] pipe = Popen(command, stdout=PIPE, bufsize=10**8) raw_audio = pipe.stdout.read(19*samplefreq*4) pipe.terminate() ############################################################################ # signal processing (audio) ############################################################################ print("signal processing (audio)...") #transter to numpy array, framelength 2 means 16bit, convert b to int signal=numpy.frombuffer(raw_audio, dtype='int16') #stereo evens are channel 1, odds channel 2 channel1=signal[0::2].copy() channel2=signal[1::2].copy() #hp filter designed with window method, no real time analysis->length ok. hp=[-0.00037944848299544, -0.00033783890593945315, -0.00029253504163290434, -0.00023970311055145115, -0.00017477461652577663, -9.272830137669598e-05, 1.1574793784137145e-05, 0.0001430157491940401, 0.0003058165774437, 0.0005031301255852511, 0.0007366389481250697, 0.0010061815325713168, 0.001309424044835294, 0.0016415948188006856, 0.001995297158164091, 0.0023604136922910296, 0.0027241126009814823, 0.0030709625898008874, 0.003383159673456869, 0.0036408647424367303, 0.0038226466933397978, 0.003906021749120683, 0.0038680756368065458, 0.0036861516783489997, 0.003338584726807266, 0.002805458371814, 0.0020693610521889515, 0.0011161157324663504, -6.454232072044776e-05, -0.0014783643617184057, -0.003126086643543448, -0.005003049119473246, -0.007098951678833729, -0.00939776129234234, -0.011877778954360235, -0.014511870412676195, -0.01726785953199777, -0.02010907791722435, -0.022995059303958208, -0.02588236238252093, -0.028725501326484312, -0.031477959502404534, -0.03409325877997916, -0.03652605465307106, -0.038733226105613125, -0.04067492886462582, -0.04231558139435124, -0.043624754685477746, -0.0445779395323348, -0.04515716848716617, 0.9546485260770975, -0.04515716848716618, -0.044577939532334805, -0.043624754685477767, -0.04231558139435129, -0.04067492886462587, -0.03873322610561316, -0.03652605465307111, -0.03409325877997921, -0.03147795950240454, -0.02872550132648437, -0.025882362382520993, -0.022995059303958242, -0.020109077917224424, -0.017267859531997838, -0.014511870412676228, -0.011877778954360292, -0.009397761292342396, -0.007098951678833729, -0.005003049119473299, -0.0031260866435435, -0.0014783643617184358, -6.45423207204986e-05, 0.0011161157324663033, 0.0020693610521889264, 0.0028054583718139653, 0.0033385847268072367, 0.0036861516783489937, 0.0038680756368065193, 0.003906021749120658, 0.0038226466933397826, 0.003640864742436714, 0.003383159673456855, 0.0030709625898008744, 0.00272411260098147, 0.00236041369229102, 0.0019952971581640925, 0.0016415948188006802, 0.0013094240448352891, 0.001006181532571312, 0.000736638948125066, 0.0005031301255852484, 0.00030581657744369766, 0.00014301574919403797, 1.1574793784135181e-05, -9.272830137669764e-05, -0.00017477461652577812, -0.00023970311055145242, -0.00029253504163290585, -0.0003378389059394545, -0.0003794484829954412] #filter channel 1 sigfilter=sgn.lfilter(hp,1,channel1) #stack to channel 1, output half time not filtered/filtered onpos=int(len(channel1)/2) channel1[onpos:]=sigfilter[onpos:] #same for channel 2 sigfilter=sgn.lfilter(hp,1,channel2) channel2[onpos:]=sigfilter[onpos:] ############################################################################ #c) readvideo ############################################################################ print ("read video...") #read video and return numpy matrix(t,x,y,rgb) #open pipe for reading p = Popen(['ffmpeg', '-i', movie, '-f', 'image2pipe', '-pix_fmt', 'rgb24', '-vcodec', 'rawvideo', '-'], stdout=PIPE) i=0 while i<19*25: #take next frame newimage=p.stdout.read(xline*yline*3) # if end, newimage is empty try: image=numpy.fromstring(newimage, dtype='uint8').reshape((yline,xline,3)) except: break # add to txyrgb-matrix if i==0: matrix=numpy.zeros((1,yline,xline,3),dtype='uint8') matrix[0,:,:,:]=image else: imageadd=numpy.zeros((1,yline,xline,3),dtype='uint8') imageadd[0,:,:,:]=image matrix=numpy.concatenate((matrix,imageadd),axis=0) i+=1 p.stdout.close() ############################################################################ #Animation ############################################################################ print("start writing...") #parameters step=int(samplefreq/25) #time frame range per image kf=int(samplefreq/2/1000) #fft freq normed to kHz pic0=matrix[0,:,:,:] #first pic #function def ani(i): # animation function pic0=matrix[i,:,:,:] pic.set_data(pic0) sig0=channel1[i*step:(i+1)*step] sig.set_ydata(sig0) sig2.set_ydata(abs(numpy.fft.fft(sig0))) if i==10*25: text2.set_text(" ON") text2.set_color("green") return sig,pic,sig2,text2, #create plot #define figure without frame and define size fig = plt.figure(frameon=False) fig.set_size_inches(19.2,10.8) #1 inch 100 pixel #image has to fill the whole figure ax = plt.Axes(fig, [0., 0., 1., 1.]) ax.set_axis_off() fig.add_axes(ax) #add image pic=ax.imshow(pic0) #time window overlay axins=plt.axes([.25, .65, .65, .25]) axins.set_title("Time frame",color="white",fontsize=20) axins.patch.set_alpha(0.5) axins.set_xlabel("<- "+str(numpy.round(1000*1/25,1))+"ms ->", color="white",fontsize=20) sig0=channel1[0:step] sig,=plt.plot(sig0,'k') plt.xlim(0,len(sig0)) plt.ylim(-4000,4000) plt.tick_params( which='both', bottom='off', top='off', labelbottom='off', left='off', labelleft='off', right='off') #filter response window overlay axins=plt.axes([.05, .15, .3, .25]) axins.patch.set_alpha(0.5) axins.set_title("FIR HP",color="white",fontsize=20) axins.set_xticks([0,50,100]) axins.set_xticklabels([-50,0,50],color="white",fontsize=20) tabs=plt.plot(hp,'k') text2=axins.text(0.2, 0.6, " OFF", color="red",fontsize=20) plt.tick_params( which='both', #bottom='off', top='off', #labelbottom='off', left='off', labelleft='off', right='off') #fft window overlay axins2=plt.axes([.6, .15, .3, .25]) axins2.patch.set_alpha(0.5) axins2.set_title("Spectrum",color="white",fontsize=20) axins2.set_xticks([0,step/kf*1,step/kf*2,step/kf*3,step/kf*4,step/kf*5]) axins2.set_xticklabels([0,1,2,3,4,5],color="white",fontsize=20) axins2.set_xlabel("kHz",color="white",fontsize=20) sig0=channel1[0:step] sig2,=plt.plot(abs(numpy.fft.fft(sig0)),'k') plt.xlim(0,step/kf*5) #22050/5000 to get 5kHz range plt.ylim(0,640000) plt.tick_params( which='both', #bottom='off', top='off', #labelbottom='off', left='off', labelleft='off', right='off') #animate and save to video: movie_ani = animation.FuncAnimation(fig, ani, frames=19*25,interval=1, blit=False) Writer = animation.writers['ffmpeg'] writer = Writer(fps=25, metadata=dict(artist='Haas Adrian'), bitrate=45675,extra_args=['-vcodec', 'libx264', '-pix_fmt', 'yuv420p']) movie_ani.save('video.mp4', writer=writer) ############################################################################ # add audio to video ############################################################################ print("add audio to video...") #multiplex channels signalout=numpy.zeros(len(channel1)*2,dtype='int16') signalout[0::2]=channel1 signalout[1::2]=channel2 #change int16 to bytes, use the struct module <:little endian, h: integer(2) order='<'+str(len(signalout))+'h' audio_bytes=struct.pack(order, *signalout ) #adding sound to video command = [ 'ffmpeg', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(samplefreq), '-ac', '2', '-i', '-', '-y', '-i', 'video.mp4', '-vcodec', 'copy', 'video_sound.mp4'] pipe = Popen(command, stdout=PIPE,stdin=PIPE,stderr=PIPE) pipe.communicate(audio_bytes) #stdin.write instead communicate not work ?! pipe.stdin.close() pipe.terminate() #plt.show()