# Source code for gwa.code_extract

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
The ExtractController class allows the user to extract information from the database.
The module exports the extracted data in XML format.
This file is specifically dedicated to IHM_extraction.py, the graphical interface file of the extraction module.
"""


import sys, re, string, pg, os
import ConfigParser
import logging
#import self.main_app

from PyQt4 import QtGui, QtCore
from ihm_extraction import *
from code.DBA import *
from code.extract import *
from code.popups import *

class ExtractController(QtGui.QWizard):
    """
    Controller of the extraction wizard.

    The wizard guides the user through the selection of an environment
    (page 1), of places (page 2), of measurement campaigns (page 3) and of
    APs (page 4), then extracts the corresponding measurements of the
    database into an XML file.
    """

    def __init__(self, main=None, parent=None):
        """
        Build the graphical interface of the Extract part.

        The initialization is done in several steps:
        - keep the link with the main interface;
        - create the graphical interface;
        - listen to the events generated by the graphical interface
          (function "connect_signals()");
        - initialize the variables (function "initVariables()").

        :param main: high-level application. Its "gwa_path" attribute and
            its "switchToMenu" method are used by this class, and it is
            handed over to DBConnect and Extract.
        :param parent: optional parent widget.
        """
        # The class derives from QWizard: initialize that base class
        # (the original code called QWidget.__init__ instead).
        QtGui.QWizard.__init__(self, parent)

        # Keeps link towards high_level application
        self.main_app = main

        # Draws UI
        self.ui = Ui_Extraction()
        self.ui.setupUi(self)

        # Connects signals of UI to self methods
        self.connect_signals()

        # Initializes variables
        self.initVariables()

    def initVariables(self):
        """
        Initialize the attributes used by the wizard, read the "gwa.conf"
        configuration file and connect to the database.

        The initialized attributes are:
        - environments_ids / environment_id
        - places_ids / selected_places_ids
        - aps_ids / selected_aps_ids
        - campaigns_ids / selected_campaigns_ids
        - distinct_ap_id
        - resolution, db, popup
        """
        # Environments of the DB: {displayed string: id_area}
        self.environments_ids = {}
        # Id of the environment selected for the extraction
        self.environment_id = None
        # Places: {displayed string: id_place}
        self.places_ids = {}
        # Ids of the places selected in the graphical interface
        self.selected_places_ids = []
        # APs: {displayed string: id_ap}
        self.aps_ids = {}
        # Ids of the APs selected for the extraction
        self.selected_aps_ids = []
        # Campaigns: {displayed string: id_campaign}
        self.campaigns_ids = {}
        # Ids of the campaigns selected for the extraction
        self.selected_campaigns_ids = []
        # Ids of the APs used in the selected measurement campaigns
        # (a dictionary is used so that each AP appears only once)
        self.distinct_ap_id = {}

        # Configuration file reading
        config = ConfigParser.RawConfigParser()
        if config.read(self.main_app.gwa_path + '/gwa.conf'):
            # The file could be read
            self.resolution = config.getint('ImagePNG', 'resolution')
            verb = config.get('Verbosity', 'level')
        else:
            # Without configuration file, fall back on default values
            self.resolution = 100
            verb = 'INFO'

        # Only the 'INFO' and 'DEBUG' levels configure the logging;
        # any other value leaves the logging configuration untouched.
        levels = {'INFO': logging.INFO, 'DEBUG': logging.DEBUG}
        if verb in levels:
            logging.basicConfig(level=levels[verb], format='%(message)s')

        # Database connection
        self.db = DBConnect(main=self.main_app)

        # Popup class instantiation
        self.popup = Popups()

    def connect_signals(self):
        """
        Listen to the signals generated by the graphical interface and
        associate the adapted treatment to each of them.

        **example**

        >>> self.connect(self.ui.toolButton_measurement_campaign_add, QtCore.SIGNAL("clicked()"), self.addCampaign)

        In this example, when the button "toolButton_measurement_campaign_add"
        is clicked ("clicked()"), the function addCampaign is called.
        """
        ui = self.ui
        clicked = QtCore.SIGNAL("clicked()")
        state_changed = QtCore.SIGNAL('stateChanged(int)')

        # (sender, signal, slot) triplets, connected in this order
        bindings = (
            # Wizard navigation
            (self, QtCore.SIGNAL("currentIdChanged(int)"), self.pageChange),
            (self, QtCore.SIGNAL("rejected()"), self.main_app.switchToMenu),
            (self, QtCore.SIGNAL("finished(int)"), self.reinitialize),
            # Places page
            (ui.toolButton_extract_add_place, clicked, self.addPlace),
            (ui.toolButton_extract_remove_place, clicked, self.removePlace),
            (ui.checkBox_extract_choice_place, state_changed, self.chooseAllThePlaces),
            # APs page
            (ui.toolButton_extract_AP_add, clicked, self.addAP),
            (ui.toolButton_extract_AP_remove, clicked, self.removeAP),
            (ui.checkBox_extract_ap_choice, state_changed, self.chooseAllTheAPs),
            # Campaigns page
            (ui.toolButton_extract_add_campaigns, clicked, self.addCampaign),
            (ui.toolButton_extract_campaigns_remove, clicked, self.removeCampaign),
            (ui.checkBox_extract_campaign_choice, state_changed, self.chooseAllTheCampaigns),
            # Extraction page
            (ui.toolButton_extract_xml, clicked, self.extractToXML),
        )
        for sender, signal, slot in bindings:
            self.connect(sender, signal, slot)

    def pageChange(self):
        """
        Called at every page change to initialize the lists and some
        variables of the page that has just been reached.
        """
        page_id = self.currentId()
        if page_id == 1:
            logging.info('Page : Environments')
            self.displayEnvironmentsList()
        elif page_id == 2:
            logging.info('Page : Places')
            self.setCurrentEnvironment()
            self.displayUnselectedPlacesList()
            self.displaySelectedPlacesList()
        elif page_id == 3:
            logging.info('Page : Campaigns')
            # The places may have changed since the last visit of this page
            self._dropStaleCampaigns()
            self.displayUnselectedCampaignsList()
            self.displaySelectedCampaignsList()
        elif page_id == 4:
            logging.info('Page : APs')
            self.getDistinctAP()
            self.displayUnselectedAPsList()
            self.displaySelectedAPsList()

    def reinitialize(self, resultCode):
        """
        Switch to the application menu and re-initialize the module
        (QWizard) to its first page.

        :param resultCode: value carried by the "finished(int)" signal (unused).
        """
        self.main_app.switchToMenu()
        self.restart()

    def validateCurrentPage(self):
        """
        Re-implementation of "validateCurrentPage", which by default calls
        "QWizardPage::validatePage()" on "currentPage()".
        It checks that the required elements are selected on the current page.

        :return: True if the page is validated (the wizard changes page),
            False otherwise (the wizard stays on the same page).
        """
        page_id = self.currentId()

        # Check of a selected environment
        if page_id == 1:
            if not self.ui.listWidget_extract_environments_list.currentItem():
                return False

        # Check that at least one place is selected
        if page_id == 2 and not self.selected_places_ids:
            return False

        # Check that at least one campaign is selected
        if page_id == 3 and not self.selected_campaigns_ids:
            return False

        # Check that at least one AP is selected
        if page_id == 4 and not self.selected_aps_ids:
            return False

        # Validation of the tests above
        return True

    #---------- Helpers shared by the selection pages --------------

    @staticmethod
    def _currentLabel(list_widget):
        """Return the text of the current item of list_widget, or None if there is none."""
        item = list_widget.currentItem()
        if not item:
            return None
        return str(item.text())

    #---------- Page of environment selection --------------

    def displayEnvironmentsList(self):
        """
        Display, on the interface, the list of the environments present in
        the database.
        """
        env_list = self.ui.listWidget_extract_environments_list

        # We empty the interface list
        env_list.clear()

        # We select the name and the description of the environments present in the DB
        rows = self.db.select('area', ['id_area', 'name', 'description'])

        # We update the interface list with the data collected above
        for id_area, name, description in rows:
            label = "%s : %s" % (name, description)
            item = QtGui.QListWidgetItem(label)
            env_list.insertItem(0, item)
            self.environments_ids[label] = id_area
            # We automatically re-select the environment selected before,
            # in case of a page return ('back' in the graphical interface)
            if id_area == self.environment_id:
                env_list.setCurrentItem(item)

    #---------- Page of places selection --------------

    def setCurrentEnvironment(self):
        """
        Collect the line selected in the environments list and store the
        corresponding id in 'self.environment_id'.

        When the environment differs from the one stored before, the
        selections of places, campaigns and APs are emptied: they were made
        for the previous environment.
        """
        # Collecting the selected line in the interface
        line_select = str(self.ui.listWidget_extract_environments_list.currentItem().text())

        # Collecting the id
        new_environment_id = self.environments_ids[line_select]
        if new_environment_id != self.environment_id:
            # The old selections refer to another environment and would
            # otherwise stay selected without being displayed anywhere
            self.selected_places_ids = []
            self.selected_campaigns_ids = []
            self.selected_aps_ids = []
        self.environment_id = new_environment_id

    def _showPlaces(self, list_widget, want_selected):
        """
        Fill list_widget with the places of the current environment which
        are selected (want_selected is True) or not selected (False).
        """
        # We empty the interface list
        list_widget.clear()

        # We select the name and description of the places of the current environment
        rows = self.db.select('place', ['id_place', 'name', 'description'],
                              {'id_area': self.environment_id})

        # We update the interface list with the data collected above
        for id_place, name, description in rows:
            if (id_place in self.selected_places_ids) == want_selected:
                label = "%s : %s" % (name, description)
                self.places_ids[label] = id_place
                list_widget.addItem(label)

    def displayUnselectedPlacesList(self):
        """
        Display, on the interface, the places of the selected environment
        which are not selected by the user.
        """
        self._showPlaces(self.ui.listWidget_extract_place_list, False)

    def displaySelectedPlacesList(self):
        """
        Display, on the interface, the places of the selected environment
        which are selected by the user.
        """
        # The original code assigned an empty list to a misspelled attribute
        # ("selected_places_id") here; that attribute was never read and
        # has been removed.
        self._showPlaces(self.ui.listWidget_extract_place_select_list, True)

    def addPlace(self):
        """
        Add the place selected in the left list to the selected places.
        """
        # Test if an element is selected by the user on the left list
        label = self._currentLabel(self.ui.listWidget_extract_place_list)
        if label is not None:
            self.selected_places_ids.append(self.places_ids[label])
        self.displayUnselectedPlacesList()
        self.displaySelectedPlacesList()

    def removePlace(self):
        """
        Remove the place selected in the right list from the selected places.
        """
        # Test if an element is selected by the user on the right list
        label = self._currentLabel(self.ui.listWidget_extract_place_select_list)
        if label is not None:
            id_place = self.places_ids[label]
            # Guard against an id which is not selected any more
            if id_place in self.selected_places_ids:
                self.selected_places_ids.remove(id_place)
        self.displayUnselectedPlacesList()
        self.displaySelectedPlacesList()

    def chooseAllThePlaces(self):
        """
        Function executed when the user clicks on the checkbox
        'Choose all the places'.
        By activating the checkbox, all the places are moved to the right list.
        By de-activating the checkbox, all the places are put back in the left list.
        """
        selected_widget = self.ui.listWidget_extract_place_select_list

        if self.ui.checkBox_extract_choice_place.isChecked():
            # We empty the left list
            self.ui.listWidget_extract_place_list.clear()
            # We re-initialize the right list and fill it with all the
            # places present in the selected environment
            selected_widget.clear()
            self.selected_places_ids = []
            rows = self.db.select('place', ['id_place', 'name', 'description'],
                                  {'id_area': self.environment_id})
            for id_place, name, description in rows:
                label = "%s : %s" % (name, description)
                self.places_ids[label] = id_place
                self.selected_places_ids.append(id_place)
                selected_widget.addItem(label)
        else:
            # We forget every place which was selected before
            self.selected_places_ids = []
            # We display all the places in the left list
            self.displayUnselectedPlacesList()
            # We empty the right list
            selected_widget.clear()

    #---------- Page of campaigns selection --------------

    def _dropStaleCampaigns(self):
        """
        Remove from the selected campaigns those which do not belong to any
        of the currently selected places (the user may have come back to the
        places page and changed the selection).
        """
        valid_ids = set()
        for place_id in self.selected_places_ids:
            rows = self.db.select('campaign', ['id_campaign', 'name', 'date'],
                                  {'id_place': place_id})
            for id_campaign, _name, _date in rows:
                valid_ids.add(id_campaign)
        self.selected_campaigns_ids = [id_campaign
                                       for id_campaign in self.selected_campaigns_ids
                                       if id_campaign in valid_ids]

    def _showCampaigns(self, list_widget, want_selected):
        """
        Fill list_widget with the campaigns of the selected places which are
        selected (want_selected is True) or not selected (False), then sort it.
        """
        # We empty the interface list
        list_widget.clear()

        for place_id in self.selected_places_ids:
            # For each selected place, we collect the campaigns recorded in the place
            rows = self.db.select('campaign', ['id_campaign', 'name', 'date'],
                                  {'id_place': place_id})
            for id_campaign, name, date in rows:
                if (id_campaign in self.selected_campaigns_ids) == want_selected:
                    label = "%s - %s" % (date, name)
                    self.campaigns_ids[label] = id_campaign
                    list_widget.addItem(label)
        list_widget.sortItems()

    def displayUnselectedCampaignsList(self):
        """
        Display, on the interface, the campaigns of the selected places
        which are not selected by the user.
        """
        self._showCampaigns(self.ui.listWidget_extract_campaigns_list, False)

    def displaySelectedCampaignsList(self):
        """
        Display, on the interface, the campaigns of the selected places
        which are selected by the user.
        """
        self._showCampaigns(self.ui.listWidget_extract_campaign_select_list, True)

    def addCampaign(self):
        """
        Add the campaign selected in the left list to the selected campaigns.
        """
        # Test if an element is selected by the user on the left list
        label = self._currentLabel(self.ui.listWidget_extract_campaigns_list)
        if label is not None:
            self.selected_campaigns_ids.append(self.campaigns_ids[label])
        # The lists are always refreshed (a display bug was seen in this window)
        self.displayUnselectedCampaignsList()
        self.displaySelectedCampaignsList()

    def removeCampaign(self):
        """
        Remove the campaign selected in the right list and put it back in
        the left list.
        """
        # Test if an element is selected by the user on the right list
        label = self._currentLabel(self.ui.listWidget_extract_campaign_select_list)
        if label is not None:
            id_campaign = self.campaigns_ids[label]
            # Guard against an id which is not selected any more
            if id_campaign in self.selected_campaigns_ids:
                self.selected_campaigns_ids.remove(id_campaign)
        # The lists are always refreshed (a display bug was seen in this window)
        self.displayUnselectedCampaignsList()
        self.displaySelectedCampaignsList()

    def chooseAllTheCampaigns(self):
        """
        Function executed when the user clicks on the checkbox
        'Choose all the campaigns'.
        By activating the checkbox, all the campaigns are moved to the right list.
        By de-activating the checkbox, all the campaigns are put back in the left list.
        """
        selected_widget = self.ui.listWidget_extract_campaign_select_list

        if self.ui.checkBox_extract_campaign_choice.isChecked():
            # We empty the left list
            self.ui.listWidget_extract_campaigns_list.clear()
            # We re-initialize the right list and the campaigns selected before
            selected_widget.clear()
            self.selected_campaigns_ids = []
            # For each selected place, we select all the campaigns of this place
            for place_id in self.selected_places_ids:
                rows = self.db.select('campaign', ['id_campaign', 'name', 'date'],
                                      {'id_place': place_id})
                for id_campaign, name, date in rows:
                    label = "%s - %s" % (date, name)
                    self.campaigns_ids[label] = id_campaign
                    self.selected_campaigns_ids.append(id_campaign)
                    selected_widget.addItem(label)
            selected_widget.sortItems()
        else:
            # We forget all the campaigns selected before
            self.selected_campaigns_ids = []
            # We display all the campaigns of the selected places in the left list
            self.displayUnselectedCampaignsList()
            # We empty the right list
            selected_widget.clear()

    #---------- Page of APs selection --------------

    def getDistinctAP(self):
        """
        Collect the distinct APs used in the selected campaigns.

        The APs are stored as keys of the 'distinct_ap_id' dictionary so that
        an AP used in several campaigns appears only once.
        The function is called at the arrival on the page of APs selection.
        The dictionary is rebuilt at every call, and the selected APs which
        are not used by the selected campaigns any more are de-selected.
        """
        distinct = {}
        for id_campaign in self.selected_campaigns_ids:
            # For each selected campaign, we collect the APs used during the
            # measurements of the campaign (id_ap of the 'measure' table)
            rows = self.db.select('measure', ['distinct id_ap'],
                                  {'id_campaign': id_campaign})
            for row in rows:
                distinct[row[0]] = ""

        # Start from a fresh dictionary: the APs of campaigns de-selected
        # after a 'back' must not be kept
        self.distinct_ap_id = distinct
        self.selected_aps_ids = [id_ap for id_ap in self.selected_aps_ids
                                 if id_ap in distinct]

    def _apLabel(self, id_ap):
        """
        Return the string displayed for an AP ("essid - mac_address"), or
        None when the 'ap' table returns no row for id_ap.
        """
        rows = self.db.select('ap', ['mac_address', 'essid'], {'id_ap': id_ap})
        # NOTE(review): assumes db.select returns a sequence of rows - confirm
        if not rows:
            logging.warning('No AP found in the database for id_ap %s', id_ap)
            return None
        mac_address, essid = rows[0][0], rows[0][1]
        return "%s - %s" % (essid, mac_address)

    def _showAPs(self, list_widget, want_selected):
        """
        Fill list_widget with the APs of the selected campaigns which are
        selected (want_selected is True) or not selected (False), then sort it.
        """
        # We empty the interface list
        list_widget.clear()

        for id_ap in self.distinct_ap_id:
            if (id_ap in self.selected_aps_ids) != want_selected:
                continue
            # We prepare the displaying of the SSID and MAC address of the AP
            label = self._apLabel(id_ap)
            if label is None:
                continue
            self.aps_ids[label] = id_ap
            list_widget.addItem(label)
        list_widget.sortItems()

    def displayUnselectedAPsList(self):
        """
        Display, on the interface, the APs used in the selected campaigns
        which are not selected by the user.
        """
        self._showAPs(self.ui.listWidget_extract_AP_list, False)

    def displaySelectedAPsList(self):
        """
        Display, on the interface, the APs used in the selected campaigns
        which are selected by the user.
        """
        # The filled list is the one which gets sorted (the original code
        # sorted the left list here)
        self._showAPs(self.ui.listWidget_extract_AP_select_list, True)

    def addAP(self):
        """
        Add the AP selected in the left list to the selected APs.
        """
        # Test if an element is selected in the left list by the user
        label = self._currentLabel(self.ui.listWidget_extract_AP_list)
        if label is not None:
            self.selected_aps_ids.append(self.aps_ids[label])
        self.displayUnselectedAPsList()
        self.displaySelectedAPsList()

    def removeAP(self):
        """
        Remove the AP selected in the right list from the selected APs.
        """
        # Test if an element is selected by the user on the right list
        label = self._currentLabel(self.ui.listWidget_extract_AP_select_list)
        if label is not None:
            id_ap = self.aps_ids[label]
            # Guard against an id which is not selected any more
            if id_ap in self.selected_aps_ids:
                self.selected_aps_ids.remove(id_ap)
        self.displayUnselectedAPsList()
        self.displaySelectedAPsList()

    def chooseAllTheAPs(self):
        """
        Function executed when the user clicks on the checkbox
        'Choose all the APs'.
        By activating the checkbox, all the APs are moved to the right list.
        By de-activating the checkbox, all the APs are put back in the left list.
        """
        selected_widget = self.ui.listWidget_extract_AP_select_list

        if self.ui.checkBox_extract_ap_choice.isChecked():
            # We empty the left list
            self.ui.listWidget_extract_AP_list.clear()
            # We re-initialize the right list and the APs selected before
            selected_widget.clear()
            self.selected_aps_ids = []
            # Selection of all the APs used during the selected campaigns
            for id_ap in self.distinct_ap_id:
                label = self._apLabel(id_ap)
                if label is None:
                    continue
                self.aps_ids[label] = id_ap
                self.selected_aps_ids.append(id_ap)
                selected_widget.addItem(label)
            # Sort the list which has just been filled (the original code
            # sorted the left list here)
            selected_widget.sortItems()
        else:
            # We forget all the APs selected before
            self.selected_aps_ids = []
            # We display all the APs of the selected campaigns in the left list
            self.displayUnselectedAPsList()
            # We empty the right list
            selected_widget.clear()

    #---------- Page of extraction --------------

    def extractToXML(self):
        """
        Extract the measurements of the DB in XML format, according to the
        selected data.
        The function asks the user for the destination file, then
        instantiates the Extract class and calls its "toXML()" method.
        Nothing is done when the user cancels the file dialog.
        """
        # The wizard itself is the parent of the dialog (the original code
        # created a new, unused QWizard for that purpose)
        filename = QtGui.QFileDialog.getSaveFileName(
            self, 'Create an *.xml file',
            os.path.expanduser("~") + "/extract.xml")
        if not filename:
            return

        print("Extraction in process")
        # Creation of an instance of the Extract class
        self.extraction = Extract(self.environment_id,
                                  self.selected_places_ids,
                                  self.selected_aps_ids,
                                  self.selected_campaigns_ids,
                                  filename, main=self.main_app)
        self.extraction.toXML()
        self.popup.popupOK("Extraction finished with success!",
                           "You can now treat your measurments\n"+filename)
if __name__ == "__main__":
    # Standalone launch of the extraction wizard.
    # The class defined in this module is ExtractController; the original
    # code instantiated the undefined name "ControleurExtract".
    # NOTE(review): the controller reads main_app.switchToMenu and
    # main_app.gwa_path while it is being built, so a launch without a
    # "main" application presumably still fails - confirm.
    import sys
    app = QtGui.QApplication(sys.argv)
    scr = ExtractController()
    scr.show()
    sys.exit(app.exec_())