The previous article served as our opening act: we created the project and finished setting up the database. In this one we build the back-end piece that writes data into the database. Following our earlier plan, we can already collect performance metrics for each site in real time, so inserting each sample into the database is all that remains to complete the ingestion part. A word on database access first: to work with the database we open a connection, run the usual CRUD statements, and then close the connection. Since that is the basic flow for every operation, we wrap the common steps into a class so the rest of the code stays short. Here is our database helper class:
```python
class database:
    def __init__(self, dbname=None, dbhost=None):
        self._logger = logger
        if dbname is None:
            self._dbname = DBNAME
        else:
            self._dbname = dbname
        if dbhost is None:
            self._dbhost = DBHOST
        else:
            self._dbhost = dbhost
        self._dbuser = DBUSER
        self._dbpassword = DBPWD
        self._dbport = int(DBPORT)
        self._conn = self.connectMySQL()
        if self._conn:
            self._cursor = self._conn.cursor()

    def connectMySQL(self):
        conn = False
        try:
            conn = MySQLdb.connect(host=self._dbhost, user=self._dbuser,
                                   passwd=self._dbpassword, db=self._dbname,
                                   port=self._dbport)
        except Exception, data:
            self._logger.error("connect database failed, %s" % data)
            conn = False
        return conn

    def fetch_all(self, sql):
        res = ''
        if self._conn:
            try:
                self._cursor.execute(sql)
                res = self._cursor.fetchall()
            except Exception, data:
                res = False
                self._logger.warn("query database exception, %s" % data)
        return res

    def update(self, sql):
        flag = False
        if self._conn:
            try:
                self._cursor.execute(sql)
                self._conn.commit()
                flag = True
            except Exception, data:
                flag = False
                self._logger.warn("update database exception, %s" % data)
        return flag

    def close(self):
        # note: comparing type(...) against the string 'object' is always
        # False, so the original guard never closed anything; close directly
        # and let the except clause absorb any failure
        if self._conn:
            try:
                self._cursor.close()
                self._conn.close()
            except Exception, data:
                self._logger.warn("close database exception, %s,%s,%s"
                                  % (data, type(self._cursor), type(self._conn)))
```
In this class we first define the __init__ method, which records the connection details: database name, host, user, password, and port, then opens the connection. After that come four methods: one establishes the MySQL connection, one fetches query results, one updates the database and commits, and the last closes the cursor and connection. That is the whole logic; nothing out of the ordinary in the code.
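One caveat worth flagging: fetch_all and update above take fully interpolated SQL strings, which is fragile if a URL ever contains a quote. MySQLdb follows the Python DB-API, whose execute call also accepts a parameter tuple so the driver does the quoting. As a minimal sketch of that style (using the stdlib sqlite3 module, which implements the same DB-API; the in-memory table and values here are illustrative only, not our real schema):

```python
import sqlite3

# throwaway in-memory database standing in for MySQL
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("create table site_info (url text, status text)")

# placeholder-style execute: the driver quotes values for us
cur.execute("insert into site_info (url, status) values (?, ?)",
            ("example.com", "ok"))
conn.commit()

cur.execute("select status from site_info where url = ?", ("example.com",))
print(cur.fetchone()[0])   # -> ok
```

Note that sqlite3 uses `?` placeholders while MySQLdb uses `%s`, but the two-argument execute call is the same idea in both drivers.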
With this database class in place, we can use the pycurl module from before to collect each site's metrics and insert every sample into the database. Because there may be a large number of sites, run time matters, so we check the sites concurrently, using a ThreadPool thread pool. The full code follows:
```python
#!/usr/bin/env python
#coding=utf-8
import MySQLdb
import MySQLdb.cursors
import logging
import requests
import pycurl
import StringIO
import sys
import socket
from multiprocessing.dummy import Pool as ThreadPool

LOGPATH = '/tmp/conndb.log'
DBNAME = 'dbname'
DBHOST = 'localhost'
DBUSER = 'root'
DBPWD = 'password'
DBPORT = '3306'

logging.basicConfig(level=logging.WARNING, filename=LOGPATH, filemode='w')
logger = logging.getLogger()

class database:
    def __init__(self, dbname=None, dbhost=None):
        self._logger = logger
        if dbname is None:
            self._dbname = DBNAME
        else:
            self._dbname = dbname
        if dbhost is None:
            self._dbhost = DBHOST
        else:
            self._dbhost = dbhost
        self._dbuser = DBUSER
        self._dbpassword = DBPWD
        self._dbport = int(DBPORT)
        self._conn = self.connectMySQL()
        if self._conn:
            self._cursor = self._conn.cursor()

    def connectMySQL(self):
        conn = False
        try:
            conn = MySQLdb.connect(host=self._dbhost, user=self._dbuser,
                                   passwd=self._dbpassword, db=self._dbname,
                                   port=self._dbport)
        except Exception, data:
            self._logger.error("connect database failed, %s" % data)
            conn = False
        return conn

    def fetch_all(self, sql):
        res = ''
        if self._conn:
            try:
                self._cursor.execute(sql)
                res = self._cursor.fetchall()
            except Exception, data:
                res = False
                self._logger.warn("query database exception, %s" % data)
        return res

    def update(self, sql):
        flag = False
        if self._conn:
            try:
                self._cursor.execute(sql)
                self._conn.commit()
                flag = True
            except Exception, data:
                flag = False
                self._logger.warn("update database exception, %s" % data)
        return flag

    def close(self):
        # close directly; the earlier type(...) == 'object' guard was always
        # False, so the connection was never actually released
        if self._conn:
            try:
                self._cursor.close()
                self._conn.close()
            except Exception, data:
                self._logger.warn("close database exception, %s,%s,%s"
                                  % (data, type(self._cursor), type(self._conn)))

def check(urls):
    # the timing metrics are kept local to each worker thread; module-level
    # globals (as first written) race when several pool threads update them
    url = 'http://' + str(urls[0])
    c = pycurl.Curl()
    c.setopt(pycurl.CONNECTTIMEOUT, 5)
    c.setopt(pycurl.TIMEOUT, 5)
    c.setopt(pycurl.NOPROGRESS, 1)
    c.setopt(pycurl.FORBID_REUSE, 1)
    c.setopt(pycurl.MAXREDIRS, 3)
    c.setopt(pycurl.DNS_CACHE_TIMEOUT, 30)
    c.setopt(c.URL, url)
    # default to zeroed metrics so a failed request still updates the row
    # instead of raising a NameError on the variables below
    HTTP_CODE = DNS_TIME = CONNECT_TIME = PRETRANSFER_TIME = 0
    STARTTRANSFER_TIME = TOTAL_TIME = SPEED_DOWNLOAD = 0
    status = 'error'
    try:
        b = StringIO.StringIO()
        c.setopt(c.WRITEFUNCTION, b.write)
        c.perform()
        HTTP_CODE = c.getinfo(pycurl.HTTP_CODE)
        DNS_TIME = c.getinfo(c.NAMELOOKUP_TIME) * 1000
        CONNECT_TIME = c.getinfo(c.CONNECT_TIME) * 1000
        PRETRANSFER_TIME = c.getinfo(c.PRETRANSFER_TIME) * 1000
        STARTTRANSFER_TIME = c.getinfo(c.STARTTRANSFER_TIME) * 1000
        TOTAL_TIME = c.getinfo(c.TOTAL_TIME) * 1000
        SPEED_DOWNLOAD = c.getinfo(c.SPEED_DOWNLOAD)
        b.close()
        c.close()
        status = 'ok'
    except pycurl.error, error:
        pass
    # write the sampled metrics back to this site's row
    sqldb = database()
    sql = "update site_info set http_code=%d, dns_time=%f, connect_time=%f," \
          " pretransfer_time=%f, starttransfter_time=%f, total_time=%f," \
          " speed_download=%f, status='%s' where url='%s'" \
          % (HTTP_CODE, DNS_TIME, CONNECT_TIME, PRETRANSFER_TIME,
             STARTTRANSFER_TIME, TOTAL_TIME, SPEED_DOWNLOAD, status, urls[0])
    sqldb.update(sql)
    sqldb.close()

if __name__ == '__main__':
    sqldb = database()
    sql2 = """select url from site_info"""
    res = sqldb.fetch_all(sql2)
    sqldb.close()
    pool = ThreadPool(3)
    pool.map(check, res)
```
That is all of the code. We won't explain the pycurl part again; readers who are unsure can refer to the earlier article. dummy is a clone of the multiprocessing module, and the only difference is that multiprocessing uses processes while dummy uses threads. pool.map works like Python's built-in map: it takes two arguments, the first a function and the second an iterable, and here the iterable is fetched straight from the database. Where does that data come from? From user input, of course. In the next article we'll build the front-end input page so users can add the sites they want monitored. That's it for this installment; if you liked it, please help share it.
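To make the ThreadPool point concrete, here is a tiny self-contained sketch; the url list and the fake_check function are made up for illustration, standing in for our real pycurl-based check. Note that pool.map fans the calls out across worker threads but still returns results in input order, exactly like the built-in map:

```python
from multiprocessing.dummy import Pool as ThreadPool

def fake_check(url):
    # stand-in for the real check(): just tag the url instead of probing it
    return url + ":ok"

urls = ["a.com", "b.com", "c.com", "d.com"]

pool = ThreadPool(3)                    # 3 worker threads, as in the script
results = pool.map(fake_check, urls)    # blocks until every call finishes
pool.close()
pool.join()
print(results)                          # results keep the input order
```

Swapping the import for `from multiprocessing import Pool` would run the same calls in separate processes instead; for I/O-bound work like waiting on HTTP responses, threads are the cheaper fit.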