Source code for gwa.eperf.client_tcp

#!/usr/bin/python
#-*-coding:iso-8859-15-*-


import socket,time,os,re,sys
from multiprocessing import Process, Array
from function import *
# Maximum number of bytes requested from the socket per recv() call
# (used by ClientTCP.bandwidthTCPDownload).
# NOTE(review): 1460 presumably matches the TCP MSS of a 1500-byte MTU link -- confirm.
buf=1460


class ClientTCP:
    """TCP bandwidth-measurement client.

    The client opens ``connection`` parallel TCP connections to the server,
    one per worker process.  Every worker measures the number of bytes
    transferred during each ``interval`` and over the whole ``duration`` and
    reports these values to the parent process through a shared array.

    ``Timer``, ``strbandwidth`` and ``strNbData`` are expected to be provided
    by the ``function`` module imported at the top of this file.

    Public methods:
        initialization()
        runClient()
        formatValue()
        Client()
        bandwidthTCPUpload()
        bandwidthTCPDownload()
        stopClient()

    :Arguments of the constructor:
        address (type Tuple)
            Address of the server, e.g. ("127.0.0.1", 5000).
        connection (type Integer)
            Number of parallel connections (one worker process each).
        interval (type Integer)
            Time between two measures.
        duration (type Integer)
            Total time of the measure.
        upload (type Boolean)
            Whether the measure runs ``bandwidthTCPUpload``.
        download (type Boolean)
            Whether the measure runs ``bandwidthTCPDownload``.

    **example**

    >>> CLI_TCP = ClientTCP(address, connection, interval, duration, upload, download)
    >>> CLI_TCP.runClient()
    """

    # Value stored in a shared-array slot that no worker has filled yet.
    # Byte counts are never negative, so it cannot collide with a measure.
    _UNSET = -1

    # Delay, in seconds, between two polls of the shared array by the parent.
    # Sleeping (instead of spinning) keeps the parent from stealing CPU time
    # from the processes that are being measured.
    _POLL_DELAY = 0.01

    def __init__(self, address, connection, interval, duration, upload, download):
        self.address = address
        self.connection = connection
        self.interval = interval
        self.duration = duration
        self.upload = upload
        self.download = download
        # Formatted results, filled by runClient().
        self.bwDownload = []
        self.bwUpload = []
        # curTime measures the whole run, intTime the current interval.
        self.curTime = Timer()
        self.intTime = Timer()

    def initialization(self, direction):
        """Connect to the server and exchange the common parameters.

        The client sends "interval;duration;direction;" and stores the first
        ";"-separated field of the answer in ``self.ID``.

        :Argument:
            direction (type String)
                "download" or "upload"; lets the server synchronize itself.
        """
        self.mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.mySocket.connect(self.address)
            self.mySocket.sendall(
                str(self.interval) + ";" + str(self.duration) + ";" + str(direction) + ";")
            self.ID = self.mySocket.recv(1024).split(";")[0]
        except Exception:
            # Do not leak the socket when the handshake fails.
            self.mySocket.close()
            raise

    def runClient(self):
        """Run the measure: start the workers and collect their results.

        One process per connection is started on ``Client``.  The parent
        waits until every slot of the shared array has been filled, then
        stores the formatted values in ``self.bwUpload`` and/or
        ``self.bwDownload``.

        :Raises:
            RuntimeError if a worker process stops before it has reported
            all of its values (for example when the connection failed).
        """
        # Function-scope import: ctypes is only needed to size the array.
        from ctypes import c_longlong

        # Round the duration up to the next multiple of the interval.
        remainder = self.duration % self.interval
        if remainder != 0:
            self.duration = self.duration + self.interval - remainder

        total = self.connection * self._slotsPerConnection()
        # 64-bit slots: a plain C int would silently wrap for byte counts of
        # 2 GiB or more.
        sharedProcessArray = Array(c_longlong, [self._UNSET] * total)

        processusClient = []
        for number in range(self.connection):
            worker = Process(target=self.Client, args=(sharedProcessArray, number))
            worker.start()
            processusClient.append(worker)

        try:
            if self.upload:
                self.bwUpload = self._collect(sharedProcessArray, processusClient)
                # Mark every slot as empty again before the download phase;
                # the workers sleep after their upload to leave time for it.
                for position in range(total):
                    sharedProcessArray[position] = self._UNSET
            if self.download:
                self.bwDownload = self._collect(sharedProcessArray, processusClient)
        finally:
            for worker in processusClient:
                worker.terminate()
            for worker in processusClient:
                worker.join()

    def _slotsPerConnection(self):
        """Return the number of array slots per connection (intervals + total)."""
        return int(self.duration / self.interval) + 1

    def _waitSlot(self, sharedProcessArray, position, number, worker):
        """Block until slot ``position`` is filled; fail if its worker died."""
        while sharedProcessArray[position] == self._UNSET:
            # Re-read the slot after the liveness test: a worker may have
            # written its last value just before exiting.
            if not worker.is_alive() and sharedProcessArray[position] == self._UNSET:
                raise RuntimeError(
                    "connection %d stopped before reporting all its measures" % number)
            time.sleep(self._POLL_DELAY)

    def _collect(self, sharedProcessArray, workers):
        """Wait for all the workers' values and return them formatted."""
        slots = self._slotsPerConnection()
        perConnection = []
        for number, worker in enumerate(workers):
            first = number * slots
            values = []
            for position in range(first, first + slots):
                self._waitSlot(sharedProcessArray, position, number, worker)
                values.append(sharedProcessArray[position])
            perConnection.append(values)
        return self.formatValue(perConnection)

    def formatValue(self, listValue):
        """Sum the per-connection values and format them as bandwidths.

        :Argument:
            listValue (type List)
                One list of byte counts per connection, e.g.
                [[inta, intb, total_a], [int1, int2, total_1]].
                The last value of each list is the total over the duration.

        :Return:
            A list of strings built by ``strbandwidth``: one per interval
            (sum over the connections, relative to ``self.interval``)
            followed by one for the total (relative to ``self.duration``).
            An empty input gives an empty list.
        """
        if not listValue or not listValue[0]:
            return []
        totals = [sum(column) for column in zip(*listValue)]
        formatted = [strbandwidth(value, self.interval) for value in totals[:-1]]
        formatted.append(strbandwidth(totals[-1], self.duration))
        return formatted

    def Client(self, sharedProcessArray, number):
        """Entry point of a worker process.

        Computes the first slot of the shared array owned by this worker,
        then runs the upload and/or download measure according to the
        ``upload`` and ``download`` booleans.

        :Argument:
            sharedProcessArray (type Array)
                Array shared between the processes.
            number (type Int)
                Rank of the worker, used to compute its first slot.
        """
        index = number * self._slotsPerConnection()
        if self.upload:
            self.bandwidthTCPUpload(sharedProcessArray, index)
            # Leave the parent time to read and reset the shared array
            # before the download values are written into the same slots.
            time.sleep(2)
        if self.download:
            self.bandwidthTCPDownload(sharedProcessArray, index)

    def _printMeasure(self, label, nbBytes, period):
        """Print one result line: connection ID, label, volume and bandwidth."""
        print("[" + self.ID + "] " + label + "\t\t\t"
              + strNbData(nbBytes) + "\t\t" + strbandwidth(nbBytes, period))

    def bandwidthTCPDownload(self, sharedProcessArray, index):
        """Receive data from the server and record the received byte counts.

        The server marks the end of each interval with "int" and the end of
        the measure with "quit".  The byte count of each interval, then the
        total, are written into the shared array starting at ``index``.
        The socket is always closed on exit.

        :Argument:
            sharedProcessArray (type Array)
                Array shared between the processes.
            index (type Int)
                First slot of the array owned by this process.
        """
        self.initialization("download")
        try:
            self.curTime.restart()
            self.intTime.restart()
            sumNbBytes = 0
            intNbBytes = 0
            while True:
                data = self.mySocket.recv(buf)
                if not data:
                    # The server closed the connection without sending
                    # "quit"; stop instead of looping forever on empty reads.
                    break
                sumNbBytes += len(data)
                intNbBytes += len(data)
                if "int" in data:
                    self._printMeasure("%.2f s" % self.curTime.get_value(),
                                       intNbBytes, self.interval)
                    sharedProcessArray[index] = intNbBytes
                    index += 1
                    intNbBytes = 0
                if "quit" in data:
                    break
            self._printMeasure("[0-%.1f] s" % self.duration, sumNbBytes, self.duration)
            sharedProcessArray[index] = sumNbBytes
        finally:
            self.mySocket.close()

    def bandwidthTCPUpload(self, sharedProcessArray, index):
        """Send data to the server and record the sent byte counts.

        The client drives the timing: it sends "int" at the end of each
        interval and "quit" at the end of the measure.  The byte count of
        each interval, then the total, are written into the shared array
        starting at ``index``.  The socket is always closed on exit.

        :Argument:
            sharedProcessArray (type Array)
                Array shared between the processes.
            index (type Int)
                First slot of the array owned by this process.
        """
        self.initialization("upload")
        try:
            self.curTime.restart()
            self.intTime.restart()
            sumNbBytes = 0
            intNbBytes = 0
            # Built once instead of on every iteration of the send loop.
            payload = "0123456789" * 148
            while self.curTime.get_value() < self.duration:
                if self.intTime.get_value() > self.interval:
                    # sendall: a control marker must never be sent partially.
                    self.mySocket.sendall("int")
                    self._printMeasure("%.2f s" % self.curTime.get_value(),
                                       intNbBytes, self.interval)
                    sharedProcessArray[index] = intNbBytes
                    index += 1
                    intNbBytes = 0
                    self.intTime.restart()
                # send() returns the number of bytes actually accepted.
                nbBytes = self.mySocket.send(payload)
                sumNbBytes += nbBytes
                intNbBytes += nbBytes
            # Last interval, then end of the measure.
            self.mySocket.sendall("int")
            sharedProcessArray[index] = intNbBytes
            index += 1
            self.mySocket.sendall("quit")
            self._printMeasure("%.2f s" % self.curTime.get_value(),
                               intNbBytes, self.interval)
            self._printMeasure("[0-%.1f] s" % self.duration, sumNbBytes, self.duration)
            sharedProcessArray[index] = sumNbBytes
        finally:
            self.mySocket.close()

    def stopClient(self):
        """Quit the program with exit status 0."""
        sys.exit(0)