Python 從 ZooKeeper (zk) 獲取連接並測試 Dubbo 接口


#coding=utf8
import sys
from kazoo.client import KazooClient
import urllib
import json
import telnetlib
import socket
import dubbo_telnet
import unittest
from HTMLTestRunner import HTMLTestRunner


# Connection settings shared by the helpers and tests below.
Host = ''  # Dubbo server IP; also used as the ZooKeeper host. Fill in before running.
# Dubbo service port. The original placeholder had no value (a syntax error);
# 20880 is Dubbo's default port -- adjust it to match the provider.
Port = 20880
tenant_id = ""  # Tenant ID the tests pass as the invoke argument. Fill in before running.

def coondoubble_data(interface, method, param):
    """Invoke a Dubbo service method over telnet and return the response.

    Args:
        interface: Name of the Dubbo service interface to call.
        method: Name of the method to invoke on that interface.
        param: Argument for the call. It is sent as a single double-quoted
            value, i.e. ``invoke <interface>.<method>("<param>")``.

    Returns:
        Whatever ``conn.do`` returns for the invoke command. If anything
        raises along the way, the exception instance is *returned* rather
        than raised, so callers must check the type of the result.
    """
    try:
        # Host and Port are only read here, so no ``global`` statement is needed.
        conn = dubbo_telnet.connect(Host, Port)
        conn.set_connect_timeout(10)  # telnet connect timeout
        conn.set_encoding('utf8')  # encoding of the Dubbo response
        # Send the invoke command once. An earlier extra
        # ``conn.invoke(interface, method, param)`` call was removed: its
        # result was discarded, and it presumably triggered the same remote
        # method a second time (relevant for calls with side effects).
        command = 'invoke %s.%s("%s")' % (interface, method, param)
        return conn.do(command)
    except Exception as e:
        # Best-effort contract kept from the original: hand the error back
        # to the caller instead of raising it.
        return e


def get_dubbo():
    """Print the 'pacific' Dubbo endpoints registered in ZooKeeper.

    Connects to ZooKeeper at ``Host`` on port 2181 and looks at every child
    of ``/dubbo`` whose name contains ``pacific``. For each one it collects
    the ``dubbo://`` URLs registered under its ``consumers`` and
    ``providers`` nodes, with the query string stripped. The URLs are then
    grouped by the part before the first ``/`` (presumably ``host:port``)
    and the grouping is printed as indented JSON.

    Returns:
        dict: The printed mapping, from the part of each URL before the
        first ``/`` to the list of remainders (service paths) seen for it.
    """
    # ``unquote`` lives in ``urllib`` on Python 2 and ``urllib.parse`` on
    # Python 3; import locally so the function works on both.
    try:
        from urllib import unquote
    except ImportError:
        from urllib.parse import unquote

    prefix = 'dubbo://'
    zk = KazooClient(hosts="{}:2181".format(Host))
    zk.start()
    try:
        urls = []
        for name in zk.get_children("/dubbo"):
            if 'pacific' not in name:
                continue
            # Consumers and providers are scanned the same way.
            for role in ('consumers', 'providers'):
                node = "/dubbo/{}/{}".format(name, role)
                for child in zk.get_children(node):
                    url = unquote(child)
                    if url.startswith(prefix):
                        # Drop the query string, then the scheme prefix.
                        urls.append(url.split('?')[0][len(prefix):])
    finally:
        # Always release the ZooKeeper session, even if a lookup fails.
        zk.stop()
        zk.close()

    services = {}
    for entry in urls:
        # partition() tolerates entries with no '/' or with several.
        address, _, service = entry.partition('/')
        services.setdefault(address, []).append(service)
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print(json.dumps(services, indent=4))
    return services


class ModuleServiceTest(unittest.TestCase):
    """Tests for the ModelService Dubbo interface, called via coondoubble_data."""

    @classmethod
    def setUpClass(cls):
        # Shared by every test: the interface under test and the tenant ID
        # (read from the module-level ``tenant_id``) used as the argument.
        cls.interface = 'uyun.pacific.model.api.service.ModelService'
        cls.tenant_id = tenant_id

    def _invoke(self, method):
        # Call ``method`` on the interface and fail clearly if the call errored.
        data = coondoubble_data(self.interface, method, self.tenant_id)
        # coondoubble_data returns the exception object instead of raising;
        # surface that as an assertion failure rather than a confusing
        # TypeError further down.
        self.assertNotIsInstance(
            data, Exception, 'Dubbo call %s failed: %r' % (method, data))
        return data

    # Comments are used instead of docstrings on the tests so unittest keeps
    # reporting them by method name.
    def test_getModelVersion(self):
        # The response must carry a non-empty 'versionNum'.
        data = self._invoke('getModelVersion')
        self.assertNotEqual(data['versionNum'], '')

    def test_checkAndUpgrade(self):
        # The response must be a boolean.
        data = self._invoke('checkAndUpgrade')
        self.assertIsInstance(data, bool)

    # Empty placeholder test carried over unchanged; it always passes.
    def test_(self):
        u''''''

if __name__ == '__main__':
    # List the services registered in ZooKeeper before running the tests.
    get_dubbo()

    # Collect every test in ModuleServiceTest into one suite.
    suite = unittest.TestSuite()
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(ModuleServiceTest))

    # Write the HTML report; the runner needs the file to stay open while
    # the suite runs.
    with open('test_store_dubbo.html', 'wb') as report:
        runner = HTMLTestRunner(
            stream=report,
            verbosity=2,
            title='store-res-dubbo接口測試報告'.decode('utf8'),
            description='store-res-dubbo接口測試報告'.decode('utf8'))
        runner.run(suite)  # run all collected test cases


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM