python模块restful使用方法实例

2016-01-25 15:22:14 其它

RESTful架构,目前是比较流行的一种互联网软件架构。REST,即Representational State Transfer的缩写。说白点就是网站即软件,再白点就是一个服务软件支持http的四种方法:GET用来获取资源,POST用来新建资源、更新资源,PUT用来更新资源,DELETE用来删除资源。并对外提供一个或多个URI,每个URI对应一个资源;客户端通过URI配合上面的方法就可以和服务段的软件交互。客户端主要是浏览器,使用restful框架的软件对http的支持也为了web应用带来方便。REST这个词,是Roy Thomas Fielding在他2000年的博士论文中提出的。他的贡献很多,可以了解一下。本人工作的方向是SDN,也算是比较潮的东东,其中floodlight就用到了restful框架。开发者为软件开发出一些功能,并提供URI api,用户就可以利用浏览器、curl等工具通过提供的URI从软件中获得想要的信息或者设置软件的功能。对于发开者来说,就是提供URI和URI对应的资源,并将他们对应上,类似dicts={'/path?':resource}。比如重写http GET方法:首先获得客户端请求的url,解析url然后判断其对应的URI,由于URI与应一个资源,那么url就可以访问这个资源了。具体实现上资源也就是方法或者一个类,要看具体实现了。下面来个很简单的例子,因为对于真正功能强大的restful来说,这个例子有几点不足,但是作为简单的演示,应该够了。复制代码 代码如下:#-*-coding:UTF-8-*-import socket,sys,urllibfrom BaseHTTPServer import *class Restful(BaseHTTPRequestHandler):  #所有rest的父类    def __init__(self,request, client_address, server):        BaseHTTPRequestHandler.__init__(self,request, client_address, server)        self.dp=None        self.router=None    def basepath(self):        pass    def getresetlet(self):        pass    def send(self,src):        self.send_response(200)        self.send_header("Content-type", "text/html")        self.end_headers()        self.wfile.write(src)        self.wfile.close()    def done(self):        self.dp=self.basepath()        self.router=self.getrestlet()class Test(Restful):            #测试1    def test(self):  #这就是一个资源        return "{\"date\":\"2013-11-19\"}"    def do_GET(self):  #重写get方法给了通过客户端请求的url找到对应的资源        self.done()        for key in self.router.keys():            tmp=self.dp+key            if tmp in self.path:                  self.send(self.router[key]()) #执行资源    def basepath(self): #这个简单的说就是和下面函数中的路径配合,即/wm/time        return "/wm"    def getrestlet(self):  #这儿就是URI与资源对应,这里只有test资源,可以注册多个        rr={}        rr['/time']=self.test           return rrclass testjson(Restful):      #测试2    def testjson(self,vpc,vr):  #这里比测试1复杂些,因为参数的值需要从url中获得        src1="{\"vpc\":1,\"vrouter\":3,\"day\":[1,2,3]}"        src2="{\"vpc\":1,\"vrouter\":4,\"day\":[23,21,3]}"        src3="{\"vpc\":5,\"vrouter\":3,\"day\":[13,2,23]}"        tlist=[src1,src2,src3]        cmpvpc="\"vpc\":"+vpc        cmpvr="\"vrouter\":"+vr        for k in tlist:            if cmpvpc in k and cmpvr in k:                return k    def firewall(self):        return "{\"filter\":[\"baid.com/\",\"c.cn/\"],\"acl\":{\"accept\":123,\"reject\":321}}"    def do_GET(self):  #重写GET,解析url,这里的self.path类似:/ins/json?vpc=1&vrouter=3        self.done()        print self.path        if 'vpc' in self.path and 'vrouter' in self.path:            query=None            if '?' in self.path:                query =    urllib.splitquery(self.path)            key=query[0]+'?'            param=query[1].split('&') #解析获得属性信息,传递给资源函数            pdict={}            for p in param:                tmp=p.split('=')                pdict[tmp[0]]=tmp[1]              for k in self.router.keys():                if k in key:                    self.send(self.router[k](pdict['vpc'],pdict['vrouter'])) #执行资源        elif 'firewall' in self.path:            self.send(self.router['/firewall']())        else:            self.send("{}")    def basepath(self):        return "/ins"    def getrestlet(self):        rr={}        rr['/json?']=self.testjson #注册资源        rr['/firewall']=self.firewall        return rr            try:    server=HTTPServer(('',8084),testjson) #测试2    server.serve_forever()except KeyboardInterrupt:    sys.exit(0)如果运行上面的程序,运行的是测试2,运行后此程序监听8084。此时在浏览器地址栏输入http://127.0.0.1:8084/ins/json?vpc=1&vrouter=3得到如下图,本人使用的chrome,安装插件后的效果。这里仅仅作为演示,程序有有许多不足。对于好的框架,注册资源不应该和资源在同一个类中,basepath()与getrestlet()不应该在实现的资源类里(也就是上面的测试里),也使router没起作用。应该在另一个类里注册,这样可以通过不同的URI,两个测试都应该能够运行;还有就是使用的python自带的BaseHTTPServer模块,并不适合做restful。但是工作过程大体就是这样,后续有时间会写个python版的restful简易框架。

分析数字中经常是3个数字一组,之后跟一个逗号,因此规律为:***,***,***正则式复制代码 代码如下:[a-z]+,[a-z]?复制代码 代码如下:import resen = "abc,123,456,789,mnp"p = re.compile("\d+,\d+?")for com in p.finditer(sen):    mm = com.group()    print "hi:", mm    print "sen_before:", sen    sen = sen.replace(mm, mm.replace(",", ""))    print "sen_back:", sen, '\n'技巧使用函数finditer(string[, pos[, endpos]]) | re.finditer(pattern, string[, flags]):搜索string,返回一个顺序访问每一个匹配结果(Match对象)的迭代器。复制代码 代码如下:sen = "abc,123,456,789,mnp"while 1:    mm = re.search("\d,\d", sen)    if mm:        mm = mm.group()        sen = sen.replace(mm, mm.replace(",", ""))        print sen    else:        break这样的程序针对具体问题,即数字3位一组,如果数字混杂与字母间,干掉数字间的逗号,即把“abc,123,4,789,mnp”转化为“abc,1234789,mnp”更具体的是找正则式“数字,数字”找到后用去掉逗号的替换复制代码 代码如下:sen = "abc,123,4,789,mnp"while 1:    mm = re.search("\d,\d", sen)    if mm:        mm = mm.group()        sen = sen.replace(mm, mm.replace(",", ""))        print sen    else:

复制代码 代码如下:#!/usr/bin/env python# -*- coding:utf-8 -*-#导入random和string模块import random, stringdef GenPassword(length):    #随机出数字的个数    numOfNum = random.randint(1,length-1)    numOfLetter = length - numOfNum    #选中numOfNum个数字    slcNum = [random.choice(string.digits) for i in range(numOfNum)]    #选中numOfLetter个字母    slcLetter = [random.choice(string.ascii_letters) for i in range(numOfLetter)]    #打乱这个组合    slcChar = slcNum + slcLetter    random.shuffle(slcChar)    #生成密码    genPwd = ''.join([i for i in slcChar])    return genPwdif __name__ == '__main__':

本文实例讲述了Python数组定义方法。分享给大家供大家参考,具体如下:Python中没有数组的数据结构,但列表很像数组,如:a=[0,1,2]这时:a[0]=0, a[1]=1, a[[2]=2,但引出一个问题,即如果数组a想定义为0到999怎么办?这时可能通过a = range(0, 1000)实现。或省略为a = range(1000).如果想定义1000长度的a,初始值全为0,则 a = [0 for x in range(0, 1000)]下面是二维数组的定义:直接定义:a=[[1,1],[1,1]]这里定义了一个2*2的,且初始为0的二维数组。间接定义:a=[[0 for x in range(10)] for y in range(10)]这里定义了10*10初始为0的二维数组。还有更简单的字义二维数组的方法:b = [[0]*10]*10定义10*10初始为0的二维数组。与a=[[0 for x in range(10)] for y in range(10)]比较:print a==b的结果为True。但用b的定义方法代替a后,以前的可以正常运行的程序也出错了,经过仔细分析得出区别:a[0][0]=1时,只有a[0][0]为1,其他全为0。b[0][0]=1时,a[0][0],a[1][0],只到a[9,0]全部为1。由此得到大数组中的10个小的一维数据全是一个相同的引用,即指向同一地址。故 b = [[0]*10]*10并不符合我们常规意义上的二维数组。同时经过试验:c=[0]*10的定义与c=[0 for x in range(10)]有同样的效果,而没有上面相同引用的问题,估计数组c的定义时是值类型相乘,而前面b的用类型的相乘,因为一维数组是一个引用(借用C#中的值类型和引用类型,不知是否合适)。更多关于Python相关内容感兴趣的读者可查看本站专题:《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》希望本文所述对大家Python程序设计有所帮助。

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):def gradient_descent(): # the gradient descent code plotly.write(X, Y)一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。from threading import Threadimport Queue import timeclass TaskQueue(Queue.Queue):首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。def __init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers = num_workers self.start_workers()初始化的时候,我们可以不用考虑工作线程的数量。def add_task(self, task, *args, **kwargs): args = args or () kwargs = kwargs or {} self.put((task, args, kwargs))我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。def start_workers(self): for i in range(self.num_workers): t = Thread(target=self.worker) t.daemon = True t.start()我们为每个 worker 创建一个线程,然后在后台删除。下面是 worker 函数的代码:def worker(self): while True: tupl = self.get() item, args, kwargs = self.get() item(*args, **kwargs) self.task_done()worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:我们可以通过下面的代码测试:def blokkah(*args, **kwargs): time.sleep(5) print “Blokkah mofo!”q = TaskQueue(num_workers=5)for item in range(1): q.add_task(blokkah)q.join() # wait for all the tasks to finish.print “All done!”Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。def gradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。from threading import Threadimport Queueimport timeclass TaskQueue(Queue.Queue):def __init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers = num_workersself.start_workers()def add_task(self, task, *args, **kwargs):args = args or ()kwargs = kwargs or {}self.put((task, args, kwargs))def start_workers(self):for i in range(self.num_workers):t = Thread(target=self.worker)t.daemon = Truet.start()def worker(self):while True:tupl = self.get()item, args, kwargs = self.get()item(*args, **kwargs)self.task_done()def tests():def blokkah(*args, **kwargs):time.sleep(5)print "Blokkah mofo!"q = TaskQueue(num_workers=5)for item in range(10):q.add_task(blokkah)q.join() # block until all tasks are doneprint "All done!"if __name__ == "__main__":