Source code for gwa.code.measurement

#!/usr/bin/python
# -*- coding: utf-8 -*-

import commands
import re
from eperf.server_udp import *
from eperf.client_udp import *
from eperf.server_tcp import *
from eperf.client_tcp import *

[docs]class Measurement: """ The modulce 'measurement.py' permit to manage the movings and note them during a campaign :arguments: - x_max (integer) Choice of the room's lenght - y_max (integer) Choice of the width of the room - x_step (integer) Choice of the step for the moving on the length of the room - y_step (integer) Choice of the step for the movings on the width of the room **example** >>> x_max = 20 >>> y_max = 12 >>> x_step = 1 >>> y_step = 1 "measurement.Measurement(x_max,y_max,x_step,y_step)" """ def __init__(self, x_max_arg, y_max_arg,z_max_arg, x_step_arg, y_step_arg,z_step_arg): """ Initialization of the room's coordinates and the step chosen for the measurements """ self.x_coor = 0 self.y_coor = 0 self.z_coor = 0 self.increase = True self.x_step = int(x_step_arg) self.y_step = int(y_step_arg) self.z_step = int(z_step_arg) self.y_max = int(y_max_arg) self.x_max = int(x_max_arg) self.z_max = int(z_max_arg) """ In the initialization, we manage to find the name of the wifi interface present on the machine """ result=commands.getoutput('gksu iwconfig') regex=re.compile('([\w]{2,8}) *(.*)') result_regex=regex.findall(result) for elem in result_regex: if elem[1]!='no wireless extensions.': regex=re.compile('([a-z]{2,4}\d{1})$') result_regex=regex.findall(elem[0]) if result_regex!=[]: inter=result_regex self.interface = inter[0]
[docs] def forceWireless(self, essid, mac_address=''): """ The "forceWireless" function forces the wifi card to connect to a unique SSID or to an AP via its MAC address, or to a SSID on an AP. :argument: mac_address ( str type ) Mac address of the AP we want to force essid ( str type ) Name of the ESSID we want to force """ #If we force the ESSID and the MAC address of the AP if mac_address: if essid is 'hidden ESSID': result=commands.getoutput('gksu iwconfig %s ap %s' % (self.interface,mac_address)) else: result=commands.getoutput('gksu iwconfig %s essid \'%s\' ap %s' % (self.interface,essid,mac_address)) else: result=commands.getoutput('gksu iwconfig %s essid \'%s\'' % (self.interface,essid))
[docs] def moveForward(self): """ The function 'moveForward' permits to create the journey to follow, to realize the measurements list It will manage the incrementation and decrementation of x and y, while avoiding the walls. :return: The function returns 'True' when everything went OK, and 'False' when it's not possible to move forward anymore """ if self.increase: if self.x_coor == self.x_max or (self.x_coor+self.x_step) > self.x_max: if self.y_coor == self.y_max or (self.y_coor+self.y_step) > self.y_max: return False else: self.incY() else: self.incX() else: if self.x_coor == 0 or (self.x_coor-self.x_step) < 0: if self.y_coor == self.y_max or (self.y_coor+self.y_step) > self.y_max: return False else: self.incY() else: self.decX() return True
[docs] def moveBack(self): """ The function 'moveBack' permit to take a few steps back during the measurements notes. It will manages the incrementation and decrementation of x and y the same way than the "moveForward' function :return: The function returns 'True' when everything went OK, and 'False' when it's not possible to move forward anymore """ if self.increase: if self.x_coor == 0 or (self.x_coor-self.x_step) < 0: if self.y_coor == 0: return False else: self.decY() else: self.decX() else: if self.x_coor == self.x_max or (self.x_coor+self.x_step) > self.x_max: if self.y_coor == 0: return False else: self.decY() else: self.incX() return True
[docs] def incX(self): """ The function 'incX' permit to increase 'x' with the step's value """ if (self.x_coor + self.x_step) > self.x_max: raise OverflowException("X") else: self.x_coor += self.x_step
[docs] def incY(self): """ The function 'incY' permit to increase 'y' with the step's value """ if (self.y_coor + self.y_step) > self.y_max: raise OverflowException("Y") else: self.y_coor += self.y_step self.increase = not self.increase
[docs] def incZ(self): """ The function 'incZ' permit to increase 'z' with the step's value """ if (self.z_coor + self.z_step) > self.z_max: raise OverflowException("Z") else: self.z_coor += self.z_step
[docs] def decX(self): """ The function 'decX' permit to decrease 'x' with the step's value """ if (self.x_coor - self.x_step) < 0: raise OverflowException("X") else: self.x_coor -= self.x_step
[docs] def decY(self): """ The function 'decY' permit to decrease 'Y' with the step's value """ if (self.y_coor - self.y_step) < 0: raise OverflowException("Y") else: self.y_coor -= self.y_step self.increase = not self.increase
[docs] def decZ(self): """ The function 'decZ' permit to decrease 'z' with the step's value """ if (self.z_coor - self.z_step) < 0: raise OverflowException("Z") else: self.z_coor -= self.z_step
[docs] def get_coor(self): """ The function 'get_coor' permit to collect the current position. :return: The function returns a tuple with the x and y values\n exemple: (2,5,1) """ return (self.x_coor, self.y_coor, self.z_coor)
[docs] def measurementr(self): """ The function 'measurementr' permit to display the power list at a chosen moment with the help of the iwconfig command :return: A dictionnary representing the different fields we listed with the iwconfig command Here are the dictionnary fields : ESSID, Access Point, Signal level, Bit Rate and Noise level. """ # dictionnary creation dico={} # command running output = commands.getoutput('gksu iwconfig '+self.interface) # Research of the ESSID the user is connected on (by default off/any) if re.search('ESSID',output): regex=re.compile('(ESSID)[:|=]"(.*).*"') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['ESSID'] = "off/any" ;# value by default else: dico['ESSID'] = "off/any" ;# value by default # AP's MAC address research if re.search('Access Point',output): regex=re.compile('(Access Point)[:|=] ([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}).*') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Access Point'] = "not associated" # Signal level research (by default -100) if re.search('Signal level',output): regex=re.compile('(Signal level)[:|=](-{0,1}[0-9]{1,3}) *') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Signal level'] = -256 ;# value by default else: dico['Signal level'] = -256 ;# value by default # Bit rate research on the media if re.search('Bit Rate',output): regex=re.compile('(Bit Rate)[:|=]([0-9\.]{1,4}.*[mMkKgG]{1}b/s)') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Bit Rate'] = "None" # Noise level research (by default -256) if re.search('Noise level',output): regex=re.compile('(Noise level)[:|=](-{0,1}[0-9]{1,3}) *') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Noise level'] = -256 ;# value by default else: dico['Noise level'] = -256 ;# value by default # returns the dictionnary return dico
def eperf(self, ip, interval, duration,connection, port, download, upload, tcp, udp, rate_udp='1M'): temp="" # print "ip:"+ip+" interval:"+str(interval)+" duration:"+str(duration)+" port:"+str(port)+" udp:"+str(udp)+" rate_udp:"+str(rate_udp)+" tcp:"+str(tcp)+" upload:"+str(upload)+" download"+str(download) #create the UDP client if udp == True: if "M" in rate_udp: rate_udp=int(rate_udp[:-1])*1024*1024 else: rate_udp=int(rate_udp[:-1])*1024 address=(ip,int(port)+1) CLI_UDP = UDPClient(address,int(interval),int(duration),rate_udp,upload,download) if upload == True: CLI_UDP.uploadUDP() # print "UDP Upload list" # print CLI_UDP.listFinalUp temp=temp+";".join(CLI_UDP.listFinalUp) temp=temp+"#" time.sleep(1) if download == True: CLI_UDP.downloadUDP() # print "UDP Download list" # print CLI_UDP.listFinalDown temp=temp+";".join(CLI_UDP.listFinalDown) else: temp=temp+"#" temp=temp+"*" #create the TCP client if tcp : address=(ip,int(port)) ClI_TCP = ClientTCP(address, int(connection), int(interval), int(duration), upload, download) ClI_TCP.runClient() if upload: # print "TCP Upload list" # print ClI_TCP.bwUpload temp=temp+";".join(ClI_TCP.bwUpload) temp=temp+"#" if download: # print "TCP Download list" # print ClI_TCP.bwDownload temp=temp+";".join(ClI_TCP.bwDownload) else: temp=temp+"#" # print temp return temp #~ def eperf(self, ip, interval, duration, port, udp=False, rate_udp='1M'): #~ """ #~ The function 'eperf' permit to make the bit rate list at a chosen moment with the help of eperf command #~ If the UDP mode es selected, the default rate is 1M. #~ #~ :return: #~ A string with all the bit rates measured, concatenated with the char '/' #~ """ #~ #~ if udp: #~ command = 'eperf -c %s -i %i -t %i -u -b %s -p %i' % (ip, int(interval), int(duration), rate_udp, int(port)) #~ else: #~ command = 'eperf -c %s -i %i -t %i -p %i' % (ip, int(interval), int(duration), int(port)) #~ #~ output=commands.getoutput(command) #~ if re.search('Connection refused', output): #~ regex=re.compile('(Connection refused).*') #~ result_regex=regex.findall(output) #~ if result_regex: #~ return "Connection refused, check the server" #~ output=output.split("/sec") #~ output.pop(0) #~ debit=[] #~ for statement in output: #~ regex=re.compile('.* ([\.0-9]{1,}.*[KkMm]{1}bits)') #~ result_regex=regex.findall(statement) #~ if result_regex: #~ debit.append(result_regex[0]) #~ return ";".join(debit)
[docs] def iwlist(self): """ The function 'iwlist' permit to make the statement of base stations detected at the moment of the measurement with the iwlist command :return: A list of dictionnary of the different fields """ # Run the command output=commands.getoutput('gksu iwlist '+self.interface+' scanning') output=output.split("Cell") output.pop(0) # We return a dictionnary list listdico=[] # We loop on the elements for element in output: if element: # We store all in the dictionnary dico={} # Research of the AP's MAC address if re.search('Address',element): regex=re.compile('(Address)[:|=|\s] ([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}).*') result_regex=regex.findall(element) if result_regex: dico[result_regex[0][0]]=result_regex[0][1] else : dico['Address']=" " # ESSID research if re.search('ESSID',element): regex=re.compile('(ESSID)[:|=|\s]"(.*).*"') result_regex=regex.findall(element) if result_regex: if result_regex[0][1] == "\\x00" or result_regex[0][1] == "": dico[result_regex[0][0]]="hidden ESSID" else : dico[result_regex[0][0]]=result_regex[0][1] else : dico['ESSID']=" " # Protocol research if re.search('Protocol',element): regex=re.compile('(Protocol)[:|=|\s]IEEE (802.11[abg]{1,2}).*') result_regex=regex.findall(element) if result_regex: dico[result_regex[0][0]]=result_regex[0][1] else : dico['Protocol']=" " # Channel research if re.search('Channel',element): regex=re.compile('(Channel)[:|=|\s]([0-9]{1,2}).*') result_regex=regex.findall(element) if result_regex: dico[result_regex[0][0]]=result_regex[0][1] else : dico['Channel']=-1 # Encryption research if re.search('Encryption key',element): regex=re.compile('(Encryption key)[:|=|\s]([onfONF]{2,3}).*') result_regex=regex.findall(element) if result_regex: # Part with the cryptography if result_regex[0][1]=='off': dico[result_regex[0][0]] = "Off" if result_regex[0][1]=='on': if re.search('IE:',element): result_regex_2=re.search('(WEP|WPA)',element) try: dico['Encryption key'] = result_regex_2.group() except AttributeError: dico['Encryption key'] = "WEP" else: dico['Encryption key'] = "WEP" else : dico['Encryption key'] = "Unknown" # Signal level research if re.search('Signal level',element): regex=re.compile('(Signal level)[:|=|\s](-{0,1}[0-9]{1,3}) *') result_regex=regex.findall(element) if result_regex: dico[result_regex[0][0]]=result_regex[0][1] else : dico['Signal level']=" " # if the dictionnary is not empty, I add it to my list if dico: # Add the dictionnary in the dictionnary list listdico.append(dico) # Returns the dictionnary list return listdico
[docs] def configurationNetwork(self): """ The function 'configurationNetwork' permet to collect the fileds of the network card: fragment, tx_power and rts_cts with the help of the iwconfig function :return: A dictionnary of the different fields ('Fragment thr','Tx-Power','RTS thr') """ # Run the command output = commands.getoutput('gksu iwconfig '+self.interface) #Dictionnary initialization dico={} # Research if we are in fragment mode if re.search('Fragment thr',output): regex=re.compile('(Fragment thr)[:|=]([onfONF]{2,3}).*') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Fragment thr'] = "Off" # Research if we are in fragment mode if re.search('Tx-Power',output): regex=re.compile('(Tx-Power)[:|=](-{0,1}[0-9]{1,3}) *') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['Tx-Power'] = -100 # Research if we are in fragment mode if re.search('RTS thr',output): regex=re.compile('(RTS thr)[:|=]([onfONF]{2,3}).*') result_regex=regex.findall(output) if result_regex: dico[result_regex[0][0]] = result_regex[0][1] else: dico['RTS thr'] = "Off" #Returns the dictionnary return dico
[docs]class OverflowException(Exception): """ This class will permit to manage the exception of class size overflow """ def __init__(self, axe): self.axe = axe def __str__(self): return self.axe
if __name__ == '__main__': mes = Measurement(5, 3, 1, 1) #mes.eperf('192.168.128.91',1,5) # Test measurementr function result_measurementr = mes.measurementr() print("---------measurementr-----------") for key,value in result_measurementr.items(): print(" {0} = {1}.".format(key, value)) print("----------iwlist-----------") result_iwlist = mes.iwlist() for elt in result_iwlist: print("######") for key,value in elt.items(): print(" {0} = {1}.".format(key, value)) print("----------configurationNetwork------------") result_configurationNetwork = mes.configurationNetwork() print "%s" % result_configurationNetwork