############################################################################ # # Name: optical_flow_diff_method.py # Python: 3.3.2 on win32 # Autor: Adrian Haas # ############################################################################ # Calculate optical flow ############################################################################ # # Calculate optical flow, see "Digitale Bildvearbeitung, Jähne,7.Auflage" # S.455ff, and use different filter masks. To gain a signal the average # is taken from the output vectors. # ############################################################################ import numpy from subprocess import Popen, PIPE import struct import warnings import matplotlib import matplotlib.pyplot as plt import matplotlib.animation as animation from mpl_toolkits.axes_grid1.inset_locator import inset_axes from mpl_toolkits.axes_grid1.inset_locator import zoomed_inset_axes from scipy import ndimage as fil from scipy import signal as sgn import matplotlib.image as mpimg ############################################################################ #read video ############################################################################ ############################################################################ #a) header information ############################################################################ movie="MVI_4113.mov" warnings.filterwarnings("ignore") #-> if run from python idle p= Popen(['ffprobe', '-show_streams', '-show_format', movie], stdout=PIPE) meta=p.communicate() meta=meta[0] meta=meta.decode("utf-8") print(meta) arr=meta.split("\r\n") #use of metadata for row in arr: if row[:7]=="height=": yline=int(row[7:]) if row[:6]=="width=": xline=int(row[6:]) if row[:9]=="duration=": duration=int(float(row[9:])) print("dim",xline,yline,duration) ############################################################################ #b) read video ############################################################################ #open pipe for reading p = Popen(['ffmpeg', '-i',movie,'-ss','0','-f', 'image2pipe', '-pix_fmt', 'rgb24', '-vcodec', 'rawvideo', '-'], stdout=PIPE) ############################################################################ #Animation function ############################################################################ def ani(i): global oldpic #READ NEXT FRAME newimage=p.stdout.read(xline*yline*3) image=numpy.fromstring(newimage,dtype='uint8').reshape((yline,xline,3)) image2=image.copy() #restore orginal img for plot #rgb->bw newpic=numpy.mean(image,axis=2) newpic=numpy.floor(newpic) if i==0: oldpic=newpic #OPTICAL FLOW CALCULATION #differentials dx,dy=numpy.gradient(newpic) dt=(newpic-oldpic) #create matrix elements dxdt=dx*dt dydt=dy*dt dxdx=dx*dx dydy=dy*dy dxdy=dx*dy #gaussian filter rdxdtf=fil.gaussian_filter(dxdt,4) rdydtf=fil.gaussian_filter(dydt,4) rdxdxf=fil.gaussian_filter(dxdx,4) rdydyf=fil.gaussian_filter(dydy,4) rdxdyf=fil.gaussian_filter(dxdy,4) #calculate optical flow vectors f1,f2 detM=rdxdxf*rdydyf-rdxdyf**2 f1=-(rdxdtf*rdydyf-rdydtf*rdxdyf)/(detM+(detM==0)) f2=-(rdydtf*rdxdxf-rdxdtf*rdxdyf)/(detM+(detM==0)) #-> /(detM+(detM==0)workaround. bug in idle for numpy.divide #calculate eigenvalues l1,l2 l1=(rdxdxf+rdydyf)/2+(4*(rdxdyf)**2+(rdxdxf-rdydyf)**2)**0.5/2 l2=(rdxdxf+rdydyf)/2-(4*(rdxdyf)**2+(rdxdxf-rdydyf)**2)**0.5/2 # calculate magnitude and angle f_abs=(f1**2+f2**2)**0.5 if i>0: f_abs=f_abs/numpy.max(f_abs) theta=numpy.arctan2(-f2,f1) #SET FILTER MASKS: #rule0: drop detM almost 0 mask0=(abs(detM)<1) #rule1: l1 and l2 should not be to small factor=150 #350 #50 th=numpy.max(l1)*0.001*factor mask1=l1<th th=numpy.max(l2)*0.001*factor mask2=l2<th #rule2: l1/l2 should not be to large mask3=l1<0.02*l2 #rule3: magnitude not too small, i.e. camera movement mask4=f_abs<0.1 #CREATE IMAGE h=(theta+numpy.pi)/(2*numpy.pi) s=numpy.zeros((yline,xline)) v=numpy.zeros((yline,xline)) s[:,:]=1 v[:,:]=0.8 s[mask0]=0 v[mask0]=0 s[mask1]=0 v[mask1]=0 s[mask2]=0 v[mask2]=0 s[mask4]=0 v[mask4]=0 hsv=numpy.dstack((h,s,v)) rgbout=255*matplotlib.colors.hsv_to_rgb(hsv) image=numpy.uint8(rgbout) #CREATE SIGNAL VECTORS f1[mask0]=0 f1[mask1]=0 f1[mask2]=0 f1[mask4]=0 f2[mask0]=0 f2[mask1]=0 f2[mask2]=0 f2[mask4]=0 f1_mean=-numpy.mean(f1) f2_mean=-numpy.mean(f2) direction=numpy.arctan2(-f2_mean,f1_mean) dir_x=numpy.cos(direction-numpy.pi/2) dir_y=numpy.sin(direction-numpy.pi/2) #UPDATE OBJECTS oldpic=newpic.copy() pic0=image pic20=image2 sig0[i]=(-direction) pic.set_data(pic0) pic2.set_data(pic20) zeiger.set_xdata(240+100*dir_x) zeiger.set_ydata(320+100*dir_y) zeiger2.set_xdata([240,240+100*dir_x]) zeiger2.set_ydata([320,320+100*dir_y]) signal.set_ydata(-sig0[:i]) signal.set_xdata(time[:i]) print(i/25/2) return pic,pic2,signal,zeiger,zeiger2, ############################################################################ #Create Plot ############################################################################ #plot init data pic0=numpy.zeros(yline*xline*3).reshape((yline,xline,3)) pic20=numpy.zeros(yline*xline*3).reshape((yline,xline,3)) dir_x=0 dir_y=0 time=numpy.arange(0,2*duration*25,1) sig0=numpy.zeros(2*duration*25) #create legend centerx=xline/2 centery=yline/2 degree_table=numpy.fliplr( numpy.fromfunction(lambda i,j: numpy.angle((i-centerx)+1j*(j-centery)), (xline,yline),dtype=int)) h=(degree_table+numpy.pi)/(2*numpy.pi) s=numpy.zeros((xline,yline)) v=numpy.zeros((xline,yline)) s[:,:]=1 v[:,:]=0.8 hsv=numpy.dstack((h,s,v)) rgbout=255*matplotlib.colors.hsv_to_rgb(hsv) legend=numpy.uint8(rgbout) #define plot without frames fig = plt.figure(frameon=False) fig.set_size_inches(19.2,10.8) #1 inch 100 pixel ax = plt.Axes(fig, [0., 0., 1., 1.]) ax.set_axis_off() fig.add_axes(ax) #Optical flow image (right) axins=plt.axes([640/1920,0,1280/1920,960/1080]) axins.text(10.0, 15,"Optischer Fluss mit Eigenwertfilter", color="white",fontsize=20) pic=axins.imshow(pic0,aspect="auto") plt.tick_params( which='both', bottom='off', top='off', labelbottom='off', left='off', labelleft='off', right='off') #Input image (left bottom) axins=plt.axes([0,0,640/1920,480/1080]) axins.text(10.0, 30,"Orignalbild",color="white",fontsize=20) pic2=axins.imshow(pic20,aspect="auto") plt.tick_params( which='both', bottom='off', top='off', labelbottom='off', left='off', labelleft='off', right='off') #signal output (top of top) axins=plt.axes([0,960/1080,1920/1920,120/1080]) axins.text(10.0, 0.0,"Winkel/Zeit ($-\pi,+\pi$)",color="white", fontsize=20) axins.set_axis_bgcolor("black") signal,=plt.plot(time,sig0,color="white") plt.ylim(-numpy.pi,numpy.pi) plt.xlim(0,2*duration*25) plt.tick_params( which='both', bottom='off', top='off', labelbottom='off', left='off', labelleft='off', right='off') #Legend(left top) axins=plt.axes([0,480/1080,640/1920,480/1080]) axins.text(10.0, 40,"Richtung",color="white",fontsize=20) axins.text(100.0, 320,"$270\degree$",color="white",fontsize=14) axins.text(380.0, 320,"$90\degree$",color="white",fontsize=14) axins.text(240.0, 100,"$0\degree$",color="white",fontsize=14) axins.text(230.0, 540,"$180\degree$",color="white",fontsize=14) axins.imshow(legend,aspect="auto") zeiger,=plt.plot(240+dir_x,320+dir_y,"ko") zeiger2,=plt.plot([240,240+dir_x],[320,320+dir_y],color="black") plt.ylim(640,0) plt.xlim(0,480) plt.tick_params( which='both', bottom='off', top='off', labelbottom='off', left='off', labelleft='off', right='off') ############################################################################ #Animate and write video ############################################################################ movie_ani = animation.FuncAnimation(fig, ani, frames=2*duration*25, interval=1, blit=False) Writer = animation.writers['ffmpeg'] writer = Writer(fps=2*25, metadata=dict(artist='Haas Adrian'), bitrate=45675, extra_args=['-vcodec', 'libx264', '-pix_fmt', 'yuv420p']) movie_ani.save('video.mp4', writer=writer) p.stdout.close() p.wait() #plt.show()