#!/usr/bin/python
# -*- coding: utf-8 -*-
import commands
import re
from eperf.server_udp import *
from eperf.client_udp import *
from eperf.server_tcp import *
from eperf.client_tcp import *
class Measurement:
    """
    Manage the moves made during a measurement campaign and collect the
    wireless statistics observed at the current position.

    The position starts at (0, 0, 0).  'moveForward' walks the room in a
    serpentine way: x grows up to x_max, y is stepped once, x shrinks back
    to 0, y is stepped again, and so on.

    :arguments:
        - x_max_arg (integer)
            Length of the room
        - y_max_arg (integer)
            Width of the room
        - z_max_arg (integer)
            Upper bound of the z coordinate
        - x_step_arg (integer)
            Step used for the moves along the length of the room
        - y_step_arg (integer)
            Step used for the moves along the width of the room
        - z_step_arg (integer)
            Step used for the moves along the z axis

    **example**
        >>> mes = Measurement(20, 12, 3, 1, 1, 1)   # doctest: +SKIP
    """

    # Regular expression fragment matching a MAC address (00:11:22:33:44:55).
    _MAC_RE = ':'.join(['[0-9a-fA-F]{2}'] * 6)

    def __init__(self, x_max_arg, y_max_arg, z_max_arg, x_step_arg, y_step_arg, z_step_arg):
        """
        Initialization of the room's coordinates and of the steps chosen for
        the measurements, then detection of the wifi interface of the machine.

        'self.interface' is None when no wireless interface could be found;
        the methods that need it raise RuntimeError in that case.
        """
        self.x_coor = 0
        self.y_coor = 0
        self.z_coor = 0
        # True while x is growing, False while x is shrinking.
        self.increase = True
        self.x_step = int(x_step_arg)
        self.y_step = int(y_step_arg)
        self.z_step = int(z_step_arg)
        self.y_max = int(y_max_arg)
        self.x_max = int(x_max_arg)
        self.z_max = int(z_max_arg)
        self.interface = self._detect_interface(commands.getoutput('gksu iwconfig'))

    @staticmethod
    def _detect_interface(listing):
        """
        Return the name of the wireless interface found in the output of
        'iwconfig', or None when there is none.

        When several interfaces qualify, the last one listed is returned.
        """
        found = None
        for name, description in re.findall(r'([\w]{2,8}) *(.*)', listing):
            if description == 'no wireless extensions.':
                continue
            # Only names such as "wlan0" or "ath1" are accepted; this also
            # rejects the continuation lines of iwconfig ("Mode", "Bit", ...).
            match = re.search(r'([a-z]{2,4}\d{1})$', name)
            if match is not None:
                found = match.group(1)
        return found

    def _require_interface(self):
        """
        Return the wireless interface name, or raise RuntimeError when none
        was detected at initialization.
        """
        interface = getattr(self, 'interface', None)
        if not interface:
            raise RuntimeError("no wireless interface was detected on this machine")
        return interface

    @staticmethod
    def _quote(value):
        """Return 'value' quoted so that a shell reads it as one single word."""
        try:
            from shlex import quote   # Python 3
        except ImportError:
            from pipes import quote   # Python 2
        return quote(value)

    @staticmethod
    def _read_field(fields, text, key, pattern, default, default_if_unparsed=False):
        """
        Store in 'fields[key]' the value captured by 'pattern' in 'text'.

        'pattern' must expose the value as its second group.  When the keyword
        'key' does not appear in 'text', 'default' is stored.  When the keyword
        appears but its value cannot be parsed, the key is left out, unless
        'default_if_unparsed' is true.
        """
        match = re.search(pattern, text)
        if match is not None:
            fields[key] = match.group(2)
        elif default_if_unparsed or key not in text:
            fields[key] = default

    @classmethod
    def _association_command(cls, interface, essid, mac_address):
        """Build the 'iwconfig' command line used by 'forceWireless'."""
        if not mac_address:
            return 'gksu iwconfig %s essid %s' % (interface, cls._quote(essid))
        # '==' and not 'is': the ESSID is a value read from a scan, it is
        # never the very same object as this literal.
        if essid == 'hidden ESSID':
            return 'gksu iwconfig %s ap %s' % (interface, cls._quote(mac_address))
        return 'gksu iwconfig %s essid %s ap %s' % (
            interface, cls._quote(essid), cls._quote(mac_address))

    def forceWireless(self, essid, mac_address=''):
        """
        The "forceWireless" function forces the wifi card to connect to a
        unique SSID, or to an AP via its MAC address, or to a SSID on an AP.

        :argument:
            essid ( str type )
                Name of the ESSID we want to force.  With the special value
                'hidden ESSID' and a MAC address, only the AP is forced.
            mac_address ( str type )
                MAC address of the AP we want to force (optional)

        :raise:
            RuntimeError when no wireless interface was detected
        """
        # ESSIDs come from network scans, so they are quoted before being
        # handed to the shell.
        # NOTE(review): gksu may parse its arguments a second time; confirm.
        commands.getoutput(
            self._association_command(self._require_interface(), essid, mac_address))

    def moveForward(self):
        """
        The function 'moveForward' moves to the next position of the journey
        followed to realize the measurements.
        It manages the incrementation and decrementation of x and the
        incrementation of y, while avoiding the walls.

        :return:
            'True' when the move was done, and 'False' when it is not
            possible to move forward anymore
        """
        if self.increase:
            x_blocked = self.x_coor == self.x_max or (self.x_coor + self.x_step) > self.x_max
        else:
            x_blocked = self.x_coor == 0 or (self.x_coor - self.x_step) < 0

        if not x_blocked:
            if self.increase:
                self.incX()
            else:
                self.decX()
            return True

        # End of the row: go to the next one, unless it was the last one.
        if self.y_coor == self.y_max or (self.y_coor + self.y_step) > self.y_max:
            return False
        self.incY()
        return True

    def moveBack(self):
        """
        The function 'moveBack' takes one step back on the journey, undoing
        what 'moveForward' does.

        :return:
            'True' when the move was done, and 'False' when it is not
            possible to move back anymore
        """
        if self.increase:
            x_blocked = self.x_coor == 0 or (self.x_coor - self.x_step) < 0
        else:
            x_blocked = self.x_coor == self.x_max or (self.x_coor + self.x_step) > self.x_max

        if not x_blocked:
            if self.increase:
                self.decX()
            else:
                self.incX()
            return True

        # Start of the row: go back to the previous one, unless it is the first.
        if self.y_coor == 0:
            return False
        self.decY()
        return True

    def incX(self):
        """
        The function 'incX' increases 'x' with the step's value.

        :raise:
            OverflowException("X") when x would exceed x_max
        """
        if (self.x_coor + self.x_step) > self.x_max:
            raise OverflowException("X")
        self.x_coor += self.x_step

    def incY(self):
        """
        The function 'incY' increases 'y' with the step's value and reverses
        the direction followed on x.

        :raise:
            OverflowException("Y") when y would exceed y_max
        """
        if (self.y_coor + self.y_step) > self.y_max:
            raise OverflowException("Y")
        self.y_coor += self.y_step
        self.increase = not self.increase

    def incZ(self):
        """
        The function 'incZ' increases 'z' with the step's value.

        :raise:
            OverflowException("Z") when z would exceed z_max
        """
        if (self.z_coor + self.z_step) > self.z_max:
            raise OverflowException("Z")
        self.z_coor += self.z_step

    def decX(self):
        """
        The function 'decX' decreases 'x' with the step's value.

        :raise:
            OverflowException("X") when x would become negative
        """
        if (self.x_coor - self.x_step) < 0:
            raise OverflowException("X")
        self.x_coor -= self.x_step

    def decY(self):
        """
        The function 'decY' decreases 'y' with the step's value and reverses
        the direction followed on x.

        :raise:
            OverflowException("Y") when y would become negative
        """
        if (self.y_coor - self.y_step) < 0:
            raise OverflowException("Y")
        self.y_coor -= self.y_step
        self.increase = not self.increase

    def decZ(self):
        """
        The function 'decZ' decreases 'z' with the step's value.

        :raise:
            OverflowException("Z") when z would become negative
        """
        if (self.z_coor - self.z_step) < 0:
            raise OverflowException("Z")
        self.z_coor -= self.z_step

    def get_coor(self):
        """
        The function 'get_coor' collects the current position.

        :return:
            A tuple with the x, y and z values\n
            example: (2,5,1)
        """
        return (self.x_coor, self.y_coor, self.z_coor)

    @classmethod
    def _parse_link_status(cls, output):
        """Extract the fields returned by 'measurementr' from an 'iwconfig' output."""
        fields = {}
        # (key, pattern, value used when the field cannot be read)
        layout = (
            ('ESSID', r'(ESSID)[:|=]"(.*).*"', "off/any"),
            ('Access Point', r'(Access Point)[:|=] (%s).*' % cls._MAC_RE, "not associated"),
            ('Signal level', r'(Signal level)[:|=](-{0,1}[0-9]{1,3}) *', -256),
            ('Bit Rate', r'(Bit Rate)[:|=]([0-9\.]{1,4}.*[mMkKgG]{1}b/s)', "None"),
            ('Noise level', r'(Noise level)[:|=](-{0,1}[0-9]{1,3}) *', -256),
        )
        for key, pattern, default in layout:
            # The default is also used when the keyword is present without a
            # readable value: iwconfig prints "Access Point: Not-Associated"
            # when the card is not associated.
            cls._read_field(fields, output, key, pattern, default, default_if_unparsed=True)
        return fields

    def measurementr(self):
        """
        The function 'measurementr' reads the state of the wireless link at a
        chosen moment with the help of the iwconfig command.

        :return:
            A dictionary with the fields ESSID, Access Point, Signal level,
            Bit Rate and Noise level.  Values read from iwconfig are strings;
            the defaults are "off/any", "not associated", -256, "None" and
            -256 respectively.

        :raise:
            RuntimeError when no wireless interface was detected
        """
        output = commands.getoutput('gksu iwconfig ' + self._require_interface())
        return self._parse_link_status(output)

    def eperf(self, ip, interval, duration, connection, port, download, upload, tcp, udp, rate_udp='1M'):
        """
        The function 'eperf' measures the bit rates with the UDP and/or TCP
        eperf clients.

        The UDP client talks to port 'port' + 1 and the TCP client to 'port'.

        :argument:
            ip: address of the server
            interval, duration, connection, port: converted with int() and
                handed to the clients
            download, upload: directions to measure
            tcp, udp: protocols to measure
            rate_udp ( str type ): UDP rate with a one-letter suffix; the
                number is multiplied by 1024*1024 when the string contains
                "M" and by 1024 otherwise

        :return:
            A string "<udp upload>#<udp download>*<tcp upload>#<tcp download>"
            where each part is the list of measures joined with ';' (empty
            when the measure was not requested)
        """
        # 'time' is not imported by this module itself, so import it here
        # rather than counting on a wildcard import to provide it.
        import time

        report = ""

        # UDP client.  The strict comparisons with True are kept on purpose:
        # only the value True (or 1) enables these measures.
        if udp == True:
            multiplier = 1024 * 1024 if "M" in rate_udp else 1024
            rate = int(rate_udp[:-1]) * multiplier
            udp_client = UDPClient((ip, int(port) + 1), int(interval), int(duration),
                                   rate, upload, download)
            if upload == True:
                udp_client.uploadUDP()
                report += ";".join(udp_client.listFinalUp)
            report += "#"
            time.sleep(1)
            if download == True:
                udp_client.downloadUDP()
                report += ";".join(udp_client.listFinalDown)
        else:
            report += "#"

        report += "*"

        # TCP client
        if tcp:
            tcp_client = ClientTCP((ip, int(port)), int(connection), int(interval),
                                   int(duration), upload, download)
            tcp_client.runClient()
            if upload:
                report += ";".join(tcp_client.bwUpload)
            report += "#"
            if download:
                report += ";".join(tcp_client.bwDownload)
        else:
            report += "#"

        return report

    @classmethod
    def _parse_scan(cls, output):
        """Extract the list returned by 'iwlist' from an 'iwlist scanning' output."""
        stations = []
        # Everything before the first "Cell" is the header of the scan.
        for cell in output.split("Cell")[1:]:
            if not cell:
                continue
            entry = {}

            # AP's MAC address
            cls._read_field(entry, cell, 'Address',
                            r'(Address)[:|=|\s] (%s).*' % cls._MAC_RE, " ")

            # ESSID; an empty or NUL name is reported as hidden
            cls._read_field(entry, cell, 'ESSID', r'(ESSID)[:|=|\s]"(.*).*"', " ")
            if entry.get('ESSID') in ("\\x00", ""):
                entry['ESSID'] = "hidden ESSID"

            # Protocol
            cls._read_field(entry, cell, 'Protocol',
                            r'(Protocol)[:|=|\s]IEEE (802.11[abg]{1,2}).*', " ")

            # Channel
            cls._read_field(entry, cell, 'Channel', r'(Channel)[:|=|\s]([0-9]{1,2}).*', -1)

            # Encryption
            if 'Encryption key' in cell:
                state = re.search(r'(Encryption key)[:|=|\s]([onfONF]{2,3}).*', cell)
                if state is not None and state.group(2) == 'off':
                    entry['Encryption key'] = "Off"
                elif state is not None and state.group(2) == 'on':
                    # The kind of encryption is only given by the "IE:"
                    # lines; WEP is assumed when they do not tell.
                    scheme = re.search('(WEP|WPA)', cell) if 'IE:' in cell else None
                    entry['Encryption key'] = scheme.group() if scheme else "WEP"
            else:
                entry['Encryption key'] = "Unknown"

            # Signal level
            cls._read_field(entry, cell, 'Signal level',
                            r'(Signal level)[:|=|\s](-{0,1}[0-9]{1,3}) *', " ")

            if entry:
                stations.append(entry)
        return stations

    def iwlist(self):
        """
        The function 'iwlist' lists the base stations detected at the moment
        of the measurement, with the help of the iwlist command.

        :return:
            A list with one dictionary per detected cell.  The fields are
            Address, ESSID, Protocol, Channel, Encryption key and Signal
            level; a field that is present in the scan but cannot be parsed
            is left out of the dictionary.

        :raise:
            RuntimeError when no wireless interface was detected
        """
        output = commands.getoutput('gksu iwlist ' + self._require_interface() + ' scanning')
        return self._parse_scan(output)

    @classmethod
    def _parse_configuration(cls, output):
        """Extract the fields returned by 'configurationNetwork' from an 'iwconfig' output."""
        fields = {}
        # Fragmentation threshold
        cls._read_field(fields, output, 'Fragment thr',
                        r'(Fragment thr)[:|=]([onfONF]{2,3}).*', "Off")
        # Transmission power
        cls._read_field(fields, output, 'Tx-Power',
                        r'(Tx-Power)[:|=](-{0,1}[0-9]{1,3}) *', -100)
        # RTS/CTS threshold
        cls._read_field(fields, output, 'RTS thr',
                        r'(RTS thr)[:|=]([onfONF]{2,3}).*', "Off")
        return fields

    def configurationNetwork(self):
        """
        The function 'configurationNetwork' collects the following fields of
        the network card with the help of the iwconfig command: fragment,
        tx_power and rts_cts.

        :return:
            A dictionary of the different fields ('Fragment thr', 'Tx-Power',
            'RTS thr').  A field missing from the iwconfig output gets its
            default ("Off", -100, "Off"); a field that is present but cannot
            be parsed is left out of the dictionary.

        :raise:
            RuntimeError when no wireless interface was detected
        """
        output = commands.getoutput('gksu iwconfig ' + self._require_interface())
        return self._parse_configuration(output)
class OverflowException(Exception):
    """
    Exception raised when a coordinate would leave its allowed range.

    :arguments:
        - axe (str type)
            Name of the axis that overflowed; it is also the text returned
            by str() on the exception
    """

    def __init__(self, axe):
        # Hand the axis to Exception as well so that 'args' is filled in:
        # this keeps repr() informative and the exception picklable.
        super(OverflowException, self).__init__(axe)
        self.axe = axe

    def __str__(self):
        return self.axe
if __name__ == '__main__':
    # Manual smoke test: prints what the three iwconfig/iwlist based methods
    # return on this machine.
    # The constructor takes (x_max, y_max, z_max, x_step, y_step, z_step).
    mes = Measurement(5, 3, 0, 1, 1, 1)
    #mes.eperf('192.168.128.91',1,5)

    # Test measurementr function
    result_measurementr = mes.measurementr()
    print("---------measurementr-----------")
    for key, value in result_measurementr.items():
        print(" {0} = {1}.".format(key, value))

    # Test iwlist function
    print("----------iwlist-----------")
    result_iwlist = mes.iwlist()
    for elt in result_iwlist:
        print("######")
        for key, value in elt.items():
            print(" {0} = {1}.".format(key, value))

    # Test configurationNetwork function
    print("----------configurationNetwork------------")
    result_configurationNetwork = mes.configurationNetwork()
    # Parenthesised form, like the other prints of this block.
    print("%s" % result_configurationNetwork)