发现身边很多朋友都在学习Python,而Python作为一个计算语言,很少有练习的机会,今天开放一个之前给广州客户做的消息中间件源码,读懂这套代码基本上就会应用Python,包括了:
1、websocket
2、类与对象
3、多线程
4、日志处理
5、json处理
6、时间处理
7、MySQL数据库处理
8、HTTP request 处理
9、头文件引用
一、定义配置文件 uncall_config.py
__author__ = 'ideacall' LISTEN_IP = "xx.xx.xx.xx" LISTEN_PORT = 6899 HOST= "" PORT=3306 USER='root' PASSWD='' DB='' UNCALL_HTTP_HOST = ""
二、主文件
#coding=utf8 #!/usr/bin/python from __future__ import print_function import struct,socket import hashlib import threading,random import time import json import pymysql from base64 import b64encode, b64decode import urllib import uncall_config from time import ctime,sleep SEND_BUF_SIZE = 4096 # send buf RECV_BUF_SIZE = 4096 # recv buf LISTEN_IP = uncall_config.LISTEN_IP #listen ip address LISTEN_PORT = uncall_config.LISTEN_PORT #listen ip ports HOST = uncall_config.HOST #db host PORT = uncall_config.PORT #db port USER = uncall_config.USER #db user PASSWD = uncall_config.PASSWD #db pwd DB = uncall_config.DB # db datanames UNCALL_HTTP_HOST = uncall_config.UNCALL_HTTP_HOST #uncallcc webservices address connectionlist = {} webclasslist = {} pymysql.install_as_MySQLdb() def write_log(msg): f=open("/tmp/cxst_socket.log","a+") f.write(msg+"\n") f.close() #check number strip is 0 def str_number_strip_check(s): if (s.startswith('01')) : return s.strip('0') else: return s #check exten login def check_extension(extension): rows = 0 conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("select extension, status, dnd from users where extension='"+str(extension)+"'") rows = cur.rowcount cur.close() conn.close() return rows def pushData(): global webclasslist for webclient in webclasslist.values(): write_log(' TASK THREAD -----> %s is_login status %s %s ' %(webclient.index, webclient.is_login, webclient.extension)) if webclient.is_login > 0 : status = get_extension_status(webclient.extension) if status != webclient.status and len(status)>0 : webclient.status = status ret_data = {'Event':'STATUS','errorCode':'E0','errorMsg':'SUCCESS','data':status} sendMessageIndex(json.dumps(ret_data),webclient.index) pop_list_callin = get_extension_pop_callin(webclient.extension) if len(pop_list_callin) >0 : sendMessageIndex(json.dumps(pop_list_callin),webclient.index) pop_list_callout = get_extension_pop_callout(webclient.extension) if len(pop_list_callout) >0 : sendMessageIndex(json.dumps(pop_list_callout),webclient.index) cdr_list = get_extension_cdr(webclient.extension) if len(cdr_list): ret_data = {'Event':'CDR','errorCode':'E0','errorMsg':'SUCCESS','data':cdr_list} sendMessageIndex(json.dumps(ret_data),webclient.index) sleep(3) def data_check(): while True: pushData() sleep(2) def get_extension_status(extension): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("SELECT d.device as TerminalId, d.extension as StaffId , FROM_UNIXTIME(d.logintime) as `Time`, u.dnd, u.`status` from asterisk.device_login as d LEFT JOIN asterisk.users as u on u.extension = d.device where d.extension ='"+extension+"'") retlist=[] for r in cur: row ={'TerminalId': 'MIDEA'+str(r[0]),'StaffId' : str(r[1]), 'Time': str(r[2]), 'dnd': str(r[3]), 'status': str(r[4])} retlist.append(row) cur.close() conn.close() return retlist def get_extension_status_all(): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("SELECT d.device as TerminalId, d.extension as StaffId , FROM_UNIXTIME(d.logintime) as `Time`, u.dnd, u.`status` from asterisk.device_login as d LEFT JOIN asterisk.users as u on u.extension = d.device") retlist=[] for r in cur: row ={'TerminalId': 'MIDEA'+str(r[0]),'StaffId' : str(r[1]), 'Time': str(r[2]), 'dnd': str(r[3]), 'status': str(r[4])} retlist.append(row) cur.close() conn.close() return retlist def get_extension_cdr(extension): retlist =[] conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur_update = conn.cursor() cdr_tables = "cdro_"+str(time.strftime("%Y_%-m_%-d",time.localtime())) cur.execute("select 1 from information_schema.TABLES where table_name='"+cdr_tables+"'") if cur.rowcount > 0 : cur.execute("select src, dst, amaflags, billsec,disposition,uniqueid,userfield,calldate,hangup_src from asteriskcdrdb."+cdr_tables+" where (src ='"+extension+"' or dst = '"+extension+"') and analysis = '0' LIMIT 1") for r in cur: row ={'src': str_number_strip_check(str(r[0])),'dst' : str_number_strip_check(str(r[1])),'amaflags': str(r[2]),'billsec': str(r[3]),'disposition': str(r[4]),'uniqueid': str(r[5]),'userfield': str(r[6]),'calldate': str(r[7]),'hangup_src': str(r[8])} retlist.append(row) cur_update.execute("update asteriskcdrdb."+cdr_tables+" set analysis = '1' where uniqueid = '"+str(r[5])+"'") cur_update.close() cur.close() conn.close() return retlist def check_extension_cdr(uniqueid): retlist =[] conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cdr_tables = "cdro_"+str(time.strftime("%Y_%-m_%-d",time.localtime())) cur.execute("select 1 from information_schema.TABLES where table_name='"+cdr_tables+"'") if cur.rowcount > 0 : cur.execute("select src, dst, amaflags, billsec,disposition,uniqueid,userfield,calldate,hangup_src from asteriskcdrdb."+cdr_tables+" where uniqueid = '"+uniqueid+"'") for r in cur: row ={'src': str_number_strip_check(str(r[0])),'dst' : str_number_strip_check(str(r[1])),'amaflags': str(r[2]),'billsec': str(r[3]),'disposition': str(r[4]),'uniqueid': str(r[5]),'userfield': str(r[6]),'calldate': str(r[7]),'hangup_src': str(r[8])} retlist.append(row) cur.close() conn.close() return retlist def get_device_by_extension(extension): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("select device from `asterisk`.`device_login` where extension = '"+extension+"' limit 1 ") device = "" for r in cur: device = 'MIDEA'+str(r[0]) cur.close() conn.close() return device #update set heart data def update_set_heart(extension): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("UPDATE asterisk.device_login set heart_time = UNIX_TIMESTAMP(NOW()) where extension = '" + extension +"'") cur.close() conn.close() return "" def get_extension_pop_callin(extension): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("select activation, calla as src , callb as dst, uid, Stats, action, pjid from asteriskcdrdb.pop where callb='"+extension+"' limit 1 ") row = "" for r in cur: device = get_device_by_extension(str(r[2])); row ={'Event':'CALLIN','Time':str(r[0]),'CallId' : str_number_strip_check(str(r[1])), 'StaffId': str(r[2]), 'CallSession': str(r[3]), 'Stats': str(r[4]),'action': str(r[5]),'action': str(r[6]),'TerminalId':device} cur.execute("DELETE from asteriskcdrdb.pop where uid = '"+str(r[3])+"'") cur.execute("update asterisk.users set heart_times =NOW() where extension = '"+extension+"'") cur.close() conn.close() return row def get_extension_pop_callout(extension): conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB) cur = conn.cursor() cur.execute("select activation, calla as src , callb as dst, uid, Stats, action, pjid from asteriskcdrdb.pop where calla='"+extension+"' limit 1") row = "" for r in cur: device = get_device_by_extension(str(r[1])) row ={'Event':'CALLOUT','Time':str(r[0]),'StaffId' : str(r[1]), 'CallId': str_number_strip_check(str(r[2])), 'CallSession': str(r[3]), 'Stats': str(r[4]),'action': str(r[5]),'action': str(r[6]),'TerminalId':device} cur.execute("DELETE from asteriskcdrdb.pop where uid = '"+str(r[3])+"'") cur.close() conn.close() return row #get http request def http_reques_uncallcc(url): req_header = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11', 'Accept':'text/html;q=0.9,*/*;q=0.8', 'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.3', 'Accept-Encoding':'gzip', 'Connection':'close', 'Referer':None } req_timeout = 3 req = urllib2.Request(url,None,req_header) resp = urllib2.urlopen(req,None,req_timeout) html = resp.read() return html def parse_data(msg): code_length = ord(msg[1]) & 127 if code_length == 126: masks = msg[4:8] data = msg[8:] elif code_length == 127: masks = msg[10:14] data = msg[14:] else: masks = msg[2:6] data = msg[6:] i = 0 raw_str = '' for d in data: raw_str += chr(ord(d) ^ ord(masks[i%4])) i += 1 return raw_str def sendMessage(message): global connectionlist for connection in connectionlist.values(): back_str = [] back_str.append('\x81') data_length = len(message) if data_length <= 125: back_str.append(chr(data_length)) else: back_str.append(chr(126)) back_str.append(chr(data_length >> 8)) back_str.append(chr(data_length & 0xFF)) back_str = "".join(back_str) + message write_log(u'send message:' + message) connection.send(back_str) def sendMessageIndex(message, index): try: global connectionlist back_str = [] back_str.append('\x81') data_length = len(message) if data_length <= 125: back_str.append(chr(data_length)) else: back_str.append(chr(126)) back_str.append(chr(data_length >> 8)) back_str.append(chr(data_length & 0xFF)) back_str = "".join(back_str) + message write_log(u'send message:' + message) connectionlist['connection'+str(index)].send(back_str) except (ZeroDivisionError,Exception) as e: del webclasslist[index] connectionlist['connection'+str(index)].is_login = 0 deleteconnection(str(index)) connectionlist['connection'+str(index)].close() write_log("EX1------->"+e) def deleteconnection(item): global connectionlist global webclasslist del webclasslist[item] del connectionlist['connection'+item] class WebSocket(threading.Thread): GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" def __init__(self,conn,index,name,remote, path="/"): threading.Thread.__init__(self) self.conn = conn self.index = index self.name = name self.remote = remote self.path = path self.buffer = "" self.extension = "" self.is_login = 0 self.device ="" self.status = "" def run(self): write_log('Uncallcc Web Socket%s Start!' % self.index) headers = {} self.handshaken = False while True: if self.handshaken == False: write_log('Socket%s Start Handshaken with %s!' % (self.index,self.remote)) self.buffer += bytes.decode(self.conn.recv(RECV_BUF_SIZE)) if self.buffer.find('\r\n\r\n') != -1: header, data = self.buffer.split('\r\n\r\n', 1) for line in header.split("\r\n")[1:]: key, value = line.split(": ", 1) headers[key] = value headers["Location"] = ("ws://%s%s" %(headers["Host"], self.path)) key = headers['Sec-WebSocket-Key'] token = b64encode(hashlib.sha1(str.encode(str(key + self.GUID))).digest()) handshake="HTTP/1.1 101 Switching Protocols\r\n"\ "Upgrade: websocket\r\n"\ "Connection: Upgrade\r\n"\ "Sec-WebSocket-Accept: "+bytes.decode(token)+"\r\n"\ "WebSocket-Origin: "+str(headers["Origin"])+"\r\n"\ "WebSocket-Location: "+str(headers["Location"])+"\r\n\r\n" self.conn.send(str.encode(str(handshake))) self.handshaken = True write_log('Socket%s Handshaken with %s success!' %(self.index, self.remote)) else: try: mm=self.conn.recv(RECV_BUF_SIZE) msg_utf8 = parse_data(mm) #utf8 msg_unicode = msg_utf8.decode('utf-8', 'ignore') #unicode msg_json_data = json.loads(msg_unicode) action = msg_json_data['Event'] if self.is_login == 0: if action!="LOGIN": if action == 'LOGOUT': #quit extension = msg_json_data['extension'] paaswd = msg_json_data['paaswd'] request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+ extension +"&device="+devices write_log('-----> %s action LOGOUT : %s ' %(self.index, request_url)) request = http_reques_uncallcc(request_url) if request == "OK": ret_data = {'Event':'LOGOUT','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'LOGOUT','errorCode':'E6','errorMsg':request} sendMessageIndex(json.dumps(ret_data),self.index) else : ret_data = {'Event':'LOGIN','errorCode':'E2','errorMsg':'NO LOGIN'} sendMessageIndex(json.dumps(ret_data),self.index) else : extension = msg_json_data['extension'] paaswd = msg_json_data['paaswd'] write_log('----->action start login : %s : %s ' %(self.index, paas)) if check_extension(extension) > 0 : self.is_login=1 self.extension = extension ret_data = {'Event':'LOGIN','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else : ret_data = {'Event':'LOGIN','errorCode':'E4','errorMsg':'NO EXTEN'} sendMessageIndex(json.dumps(ret_data),self.index) else: # dial if action != "DIAL" and action != "QUIT" and action != "LOGOUT" and action !="HANGUP" and action != "GETSEATALL" and action !="GETSEAT" and action !="HB" and action !="SETBUSY" and action !="SETIDLE" and action !="TRANSFERCALL" and action!='CHECK_CDR' and action !="CALLIN" and action != "CALLOUT" : ret_data = {'errorCode':'E3','errorMsg':'NO EVENT'} sendMessageIndex(json.dumps(ret_data),self.index) else: if action == 'QUIT': #quit extension = msg_json_data['StaffId'] devices_midea = msg_json_data['TerminalId'] devices = devices_midea[5:] request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+self.extension+"&device="+devices write_log('-----> %s action QUIT : %s ' %(self.index, request_url)) request = http_reques_uncallcc(request_url) ret_data = {'Event':'QUIT','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) self.is_login=0 deleteconnection(str(self.index)) self.conn.close() break if action == 'DIAL': #dial event request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/call.php?extension="+self.device+"&phone="+msg_json_data['DTMF'] request = http_reques_uncallcc(request_url) write_log('-----> %s action DIAL : %s ' %(self.index, request_url)) if request == "OK": ret_data = {'Event':'DIAL','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'DIAL','errorCode':'E5','errorMsg':'FAILED'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "HB": #pop event ret_data = {'Event':'HB','errorCode':'E0','errorMsg':'SUCCESS'} write_log('-----> %s action HB : %s ' %(self.index, extension)) update_set_heart(extension) sendMessageIndex(json.dumps(ret_data),self.index) if action == "GETSEATALL" : #ALL STATUS ret_data = {'Event':'GETSEATALL','errorCode':'E0','errorMsg':'SUCCESS','data':get_extension_status_all()} sendMessageIndex(json.dumps(ret_data),self.index) if action == "HANGUP": #HANGUP request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/hangup.php?extension="+self.device request = http_reques_uncallcc(request_url) write_log('-----> %s action HANGUP : %s ' %(self.index, request_url)) if request == "OK": ret_data = {'Event':'HANGUP','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'HANGUP','errorCode':'E5','errorMsg':'FAILED'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "SETBUSY" : #set busy request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/setbusy.php?extension="+self.device request = http_reques_uncallcc(request_url) write_log('-----> %s action SETBUSY : %s ' %(self.index, request_url)) if request == "OK": ret_data = {'Event':'SETBUSY','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'SETBUSY','errorCode':'E5','errorMsg':'FAILED'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "SETIDLE" : #set idle request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/setIdle.php?extension="+self.device write_log('-----> %s action SETIDLE : %s ' %(self.index, request_url)) request = http_reques_uncallcc(request_url) if request == "OK": ret_data = {'Event':'SETIDLE','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'SETIDLE','errorCode':'E5','errorMsg':'FAILED'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "TRANSFERCALL" : #transferCall request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/transferCall.php?extension="+self.device+"&phone="+msg_json_data['Phone'] write_log('-----> %s action TRANSFERCALL : %s ' %(self.index, request_url)) request = http_reques_uncallcc(request_url) if request == "OK": ret_data = {'Event':'TRANSFERCALL','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) else: ret_data = {'Event':'TRANSFERCALL','errorCode':'E5','errorMsg':'FAILED'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "CHECK_CDR": uniquid = msg_json_data['Uniquid'] cdr_list = check_extension_cdr(uniquid) ret_data = {'Event':'CHECK_CDR','errorCode':'E0','errorMsg':'SUCCESS','data':cdr_list} sendMessageIndex(json.dumps(ret_data),self.index) if action == "CALLIN": ret_data = {'Event':'CALLIN','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) if action == "LOGOUT": request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+self.extension+"&device="+self.device request = http_reques_uncallcc(request_url) write_log('-----> %s action LOGOUT : %s ' %(self.index, request_url)) ret_data = {'Event':'CALLOUT','errorCode':'E0','errorMsg':'SUCCESS'} sendMessageIndex(json.dumps(ret_data),self.index) self.is_login=0 deleteconnection(str(self.index)) self.conn.close() break except (ZeroDivisionError,Exception) as e: ret_data = {'errorCode':'E1','errorMsg':'REQUEST ERROR'} self.is_login=0 deleteconnection(str(self.index)) self.conn.close() write_log("EX02---> "+str(self.index)+" :"+str(e)) break self.buffer = "" class WebSocketServer(object): def __init__(self): self.socket = None def begin(self): write_log('Uncallcc WebSocketServer Start!') self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, SEND_BUF_SIZE) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE) self.socket.bind((LISTEN_IP,LISTEN_PORT)) self.socket.listen(100) global connectionlist global webclasslist i=0 while True: connection, address = self.socket.accept() username=address[0] newSocket = WebSocket(connection,i,username,address) newSocket.start() webclasslist[i]= newSocket connectionlist['connection'+str(i)]=connection write_log('add connection'+str(i)) i = i + 1 if __name__ == "__main__": t = threading.Thread(target=data_check) t.setDaemon(True) t.start() server = WebSocketServer() server.begin()
三、HTML websocket 测试
<!DOCTYPE html> <html> <head> <title>WebSocket</title> <style> html, body { font: normal 0.9em arial, helvetica; } #log { width: 800px; height: 400px; border: 1px solid #7F9DB9; overflow: auto; } #msg { width: 800px; } </style> <script> var socket; function init() { var host = "ws://120.177.122.25:6899/"; try { socket = new WebSocket(host); socket.onopen = function (msg) { log('Connected'); }; socket.onmessage = function (msg) { log(msg.data); if(msg.data=="SUCCESS"){//连接成功 } if(msg.data.indexOf("errorCode")){//有事件交互 var json_data = JSON.parse(msg.data); console.log(json_data['errorCode']); if(json_data['errorCode']=="E0"){ if(json_data['Event']=="LOGIN"){// console.log('SUCCESS LOGIN'); send_hearbeat(); } } } }; socket.onclose = function (msg) { log("Lose Connection!"); }; } catch (ex) { log(ex); } $("msg").focus(); } function send_hearbeat() { socket.send("{\"Event\":\"HB\",\"Exten\":\"801\"}"); setTimeout("send_hearbeat();", 30000); } function send() { var txt, msg; txt = $("msg"); msg = txt.value; if (!msg) { alert("Message can not be empty"); return; } txt.value = ""; txt.focus(); try { socket.send(msg); } catch (ex) { log(ex); } } window.onbeforeunload = function () { try { socket.send('quit'); socket.close(); socket = null; } catch (ex) { log(ex); } }; function $(id) { return document.getElementById(id); } function log(msg) { $("log").innerHTML += "<br>" + msg; } function onkey(event) { if (event.keyCode == 13) { send(); } } </script> </head> <body onload="init()"> <h3>Uncallcc WebSocket Demo</h3> <br><br> <div id="log"></div><br><br> <input id="msg" type="textbox" onkeypress="onkey(event)"/> <br><br> <button onclick="send()">Send</button> </body> </html>
发表评论