09 октября, 2013

Небольшая обертка для SQLite Python



Ну в общем на коленке накиданная обертка, которая упрощает некоторые, часто используемые действия с SQLite. А именно запросы на создание таблицы, добавление и изъятие данных.




#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Artur Fis'

import sqlite3
import logging

class NewSqlEngine(object):

    def __init__(self,db_name,lock = None):
        self.db_name = db_name
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._sql_config = {}
        self.RLock = lock

    def save(self):
        #self.RLock.acquire()
        #self.RLock.release()
        self.conn.commit()


    def close(self):
        self.conn.close()

    def create_table(self,table_name = "",columns = {}):
        #Create table
        columns_name = ''
        if isinstance(columns,dict):
            for column in columns:
                columns_name = '%s,%s' % (columns_name,'"%s" %s' % (column,columns[column]))
            columns_name = columns_name[1:]
        elif isinstance(columns,list):
            columns_name = ','.join(columns)
        result = None
        sql_query = 'CREATE TABLE %s (%s);' % (table_name,columns_name)
        try:
            self.cursor.execute(sql_query)
        except Exception,e:
            logging.error('Create Table Error: %s' % str(e))
            return False
        logging.debug('Table %s created!' % table_name)
        self.save()
        return True

    def execute(self,*args,**kwargs):
        #Exceute SQL Command
        self.cursor.execute(*args,**kwargs)
        return self.cursor.fetchall()

    def clear(self,table):
        # Clear table
        try:
            self.cursor.execute('DELETE from %s' % table)
        except Exception,e:
            logging.error('Error clear: %s' % str(e))
        logging.debug('Clear table: %s' % table)

    def put(self,table_name="",data_bd={}):
        table_column =[]
        table_data = []
        for data in data_bd:
            table_column.append('"%s"' % str(data))
            insert_data = data_bd[data]
            if isinstance(insert_data,int) or isinstance(insert_data,float):
                insert_data ='%s' % str(insert_data)
            else:
                if isinstance(insert_data,unicode):
                    insert_data =u'"%s"' % insert_data
                else:
                    insert_data ='"%s"' % str(insert_data).decode('windows-1251')
            table_data.append(insert_data)
        sql_query = u'INSERT INTO %s (%s) values (%s)' % (table_name, ','.join(table_column), ','.join(table_data) )
        try:
            self.cursor.execute(sql_query)
            logging.debug('Put to table: %s' % sql_query)
        except Exception,e:
            logging.error('Error put table: %s' % str(e))
        self.save()

    def get_all(self,table_name=""):
        sql_query = 'SELECT * FROM %s' % table_name
        try:
            self.cursor.execute(sql_query)
        except Exception,e:
            logging.error('Error get data from table: %s' % str(e))
        return self.cursor.fetchall()

    def get(self,table_name=""):
        if table_name in self._sql_config:
            if isinstance(self._sql_config[table_name],dict):
                if 'number' in self._sql_config[table_name]:
                    pass
                else:
                    self._sql_config[table_name]['number'] = 0
            else:
                self._sql_config[table_name] = {'number':0}
        else:
            self._sql_config[table_name] = {'number':0}
        number = self._sql_config[table_name]['number']
        sql_query = 'SELECT * FROM %s' % table_name
        try:
            self.cursor.execute(sql_query)
        except Exception,e:
            logging.error('Error get data from table: %s' % str(e))
            return (False,str(e))
        result = self.cursor.fetchall()
        if number < len(result):
            self._sql_config[table_name]['number'] +=1
        else:
            return None
        return result[number]


    def get_tables(self):
        sql_query = 'SELECT * FROM sqlite_master WHERE type = "table"'
        result = []
        try:
            self.cursor.execute(sql_query)
        except Exception,e:
            logging.error('Error get table names: %s' % str(e))
        tables = self.cursor.fetchall()
        for table in tables:
            if table[0] == u'table':
                result.append(table[1])
        return result
Писал на Python 2.7, стандартная библиотека sqlite3, logging можете перевести в файл или изменить. Может кому понадобиться.

Комментариев нет:

Отправить комментарий