几周前,我对交易感兴趣,发现大多数公司都提供他们的付费服务来分析外汇数据。 我的目标是实施一些机器学习算法来预测市场。 因此,我决定创建一个实时API以便在React中使用它并测试自己的自动化策略。
在本教程结束时,您无需使用任何在线服务即可将任何网站转换为API。 我们将主要使用Beautiful Soup和Django REST Framework通过抓取外汇数据来构建实时API。
您需要对Django和Ubuntu有基本的了解,才能运行一些重要的命令。 如果您使用的是其他操作系统,则可以下载Anaconda来简化工作。
安装与配置
首先,通过以下命令创建并激活虚拟环境:
1 2 |
virtualenv env . env/bin/activate |
激活环境后,安装Django和Django REST Framework:
1 |
pip install django djangorestframework |
现在,创建一个名为trading的新项目,并在您的项目内部创建一个名为forexAPI的应用程序。
1 2 3 |
django-admin startproject trading cd trading django-admin startapp forexAPI |
然后打开您的settings.py并更新INSTALLED_APPS配置:
settings.py
1 2 3 4 5 6 |
INSTALLED_APPS = [ ... 'rest_framework', 'forexAPI', ] |
为了创建实时API,我们需要不断抓取和更新数据。 一旦我们的应用程序流量超负荷,Web服务器就只能处理一定数量的请求,而使用户等待太久。 此时,Celery是执行后台任务处理的最佳选择。 将搜寻器传递到要在后台执行的队列将使服务器随时准备响应新请求。
1 |
pip install Celery |
另外,Celery需要消息代理来发送和接收消息,因此我们必须利用RabbitMQ作为解决方案。 您可以通过以下命令在Ubuntu的存储库中安装RabbitMQ:
1 |
sudo apt-get install rabbitmq-server |
然后启用并启动RabbitMQ服务:
1 2 |
sudo systemctl enable rabbitmq-server sudo systemctl start rabbitmq-server |
如果您使用其他操作系统,则可以按照RabbitMQ官方文档中的下载说明进行操作。
安装完成后,在settings.py文件末尾添加CELERY_BROKER_URL配置:
settings.py
1 |
CELERY_BROKER_URL = 'amqp://localhost' |
现在,我们必须为“ celery”程序设置默认的Django设置模块。 在根目录内创建一个名为celery.py的新文件,如下图所示:
1 2 3 4 5 6 7 |
. ├── asgi.py ├── celery.py ├── __init__.py ├── settings.py ├── urls.py └── wsgi.py |
celery.py
1 2 3 4 5 6 7 8 |
import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'trading.settings') app = Celery('trading') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() |
我们正在为“ celery”程序设置默认的Django设置模块,并从所有已注册的Django应用程序配置加载任务模块。
在同一目录(根目录)中打开__init__.py并导入celery,以确保Django启动后即可加载我们的Celery应用程序。
1 2 3 |
from .celery import app as celery_app __all__ = ['celery_app'] |
用Beautiful Soup 获取数据
我们将使用Beautiful Soup抓取一个流行的实时市场筛选器,其中一个名为investing.com,它很容易使用解析器工具,不需要任何实际的解析理论和技术知识。 得益于出色的文档,可以轻松地通过许多代码示例进行学习。 使用以下命令安装Beautiful Soup:
1 |
pip install beautifulsoup4 |
下一步是创建一个模型,以将爬网的数据保存在数据库中。 如果您打开网站,则可以看到带有列名的外汇表格,这将是我们的模型字段。
models.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from django.db import models class Currency(models.Model): pair = models.CharField(max_length=20) bid = models.FloatField() ask = models.FloatField() high = models.FloatField() low = models.FloatField() change = models.CharField(max_length=20) change_p = models.CharField(max_length=20) time = models.TimeField() class Meta: verbose_name = 'Currency' verbose_name_plural = 'Currencies' def __str__(self): return self.pair |
通过以下命令迁移数据库:
1 2 |
python manage.py makemigrations forexAPI python manage.py migrate |
迁移后,在应用程序目录(forexAPI)内创建一个名为task.py的新文件,其中将包含我们所有的Celery任务。 我们在项目根目录中构建的Celery应用程序将收集INSTALLED_APPS中提到的所有Django应用程序中的所有任务。 在实施之前,请打开浏览器的开发人员工具以检查将要爬网的表元素。
最初,我们使用urllib的抽象类Request来打开网站,因为Beautiful Soup无法向特定的Web服务器发出请求。 然后,我们必须获取所有表行(<tr>)并对其进行迭代以进入单元格的详细信息(<td>)。 考虑行内的表格单元格,您会注意到类名包括用于定义特定行号的增量值,因此我们还需要保留迭代次数以获取有关行的正确信息。 Python提供了一个内置函数enumerate()来处理这种迭代器,枚举行以在类名内部传递索引。
task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
from time import sleep from celery import shared_task from bs4 import BeautifulSoup from urllib.request import urlopen, Request from .models import Currency @shared_task # some heavy stuff here def create_currency(): print('Creating forex data ..') req = Request('https://www.investing.com/currencies/single-currency-crosses', headers={'User-Agent': 'Mozilla/5.0'}) html = urlopen(req).read() bs = BeautifulSoup(html, 'html.parser') # get first 5 rows currencies = bs.find("tbody").find_all("tr")[0:5] # enumerate rows to pass index inside class name # starting index from 1 for idx, currency in enumerate(currencies, 1): pair = currency.find("td", class_="plusIconTd").a.text bid = currency.find("td", class_=f"pid-{idx}-bid").text ask = currency.find("td", class_=f"pid-{idx}-ask").text high = currency.find("td", class_=f"pid-{idx}-high").text low = currency.find("td", class_=f"pid-{idx}-low").text change = currency.find("td", class_=f"pid-{idx}-pc").text change_p = currency.find("td", class_=f"pid-{idx}-pc").text time = currency.find("td", class_=f"pid-{idx}-time").text print({'pair':pair, 'bid':bid, 'ask':ask, 'high':high, 'low':low, 'change':change, 'change_p':change_p, 'time':time}) # create objects in database Currency.objects.create( pair=pair, bid=bid, ask=ask, high=high, low=low, change=change, change_p=change_p, time=time ) # sleep few seconds to avoid database block sleep(5) create_currency() |
@shared_task将为每个应用创建任务的独立实例,从而使任务可重用,因此,对于耗时的任务,请指定此装饰器,这一点很重要。 该函数将为每个爬网的行创建一个新对象,并休眠几秒钟,以避免阻塞数据库。
保存文件并在控制台中运行Celery worker,以查看结果。
1 |
celery -A trading worker -l info |
注册模型:
1 2 3 |
from django.contrib import admin from .models import Currency admin.site.register(Currency) |
要创建实时数据,我们需要不断更新这些对象。 我们可以通过在以前的功能中进行一些小的更改来实现。
task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@shared_task # some heavy stuff here def update_currency(): print('Updating forex data ..') req = Request('https://www.investing.com/currencies/single-currency-crosses', headers={'User-Agent': 'Mozilla/5.0'}) html = urlopen(req).read() bs = BeautifulSoup(html, 'html.parser') currencies = bs.find("tbody").find_all("tr")[0:5] for idx, currency in enumerate(currencies, 1): pair = currency.find("td", class_="plusIconTd").a.text bid = currency.find("td", class_=f"pid-{idx}-bid").text ask = currency.find("td", class_=f"pid-{idx}-ask").text high = currency.find("td", class_=f"pid-{idx}-high").text low = currency.find("td", class_=f"pid-{idx}-low").text change = currency.find("td", class_=f"pid-{idx}-pc").text change_p = currency.find("td", class_=f"pid-{idx}-pc").text time = currency.find("td", class_=f"pid-{idx}-time").text # create dictionary data = {'pair':pair, 'bid':bid, 'ask':ask, 'high':high, 'low':low, 'change':change, 'change_p':change_p, 'time':time} # find the object by filtering and update all fields Currency.objects.filter(pair=pair).update(**data) sleep(5) |
要更新现有对象,我们应该使用filter方法来查找特定对象,并将字典传递给update()方法。 这是一次处理多个字段的最佳方法之一。 这是实时更新的完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from time import sleep from celery import shared_task from bs4 import BeautifulSoup from urllib.request import urlopen, Request from .models import Currency @shared_task # some heavy stuff here def create_currency(): print('Creating forex data ..') req = Request('https://www.investing.com/currencies/single-currency-crosses', headers={'User-Agent': 'Mozilla/5.0'}) html = urlopen(req).read() bs = BeautifulSoup(html, 'html.parser') # get first 5 rows currencies = bs.find("tbody").find_all("tr")[0:5] # enumerate rows to include index inside class name # starting index from 1 for idx, currency in enumerate(currencies, 1): pair = currency.find("td", class_="plusIconTd").a.text bid = currency.find("td", class_=f"pid-{idx}-bid").text ask = currency.find("td", class_=f"pid-{idx}-ask").text high = currency.find("td", class_=f"pid-{idx}-high").text low = currency.find("td", class_=f"pid-{idx}-low").text change = currency.find("td", class_=f"pid-{idx}-pc").text change_p = currency.find("td", class_=f"pid-{idx}-pc").text time = currency.find("td", class_=f"pid-{idx}-time").text print({'pair':pair, 'bid':bid, 'ask':ask, 'high':high, 'low':low, 'change':change, 'change_p':change_p, 'time':time}) # create objects in database Currency.objects.create( pair=pair, bid=bid, ask=ask, high=high, low=low, change=change, change_p=change_p, time=time ) # sleep few seconds to avoid database block sleep(5) @shared_task # some heavy stuff here def update_currency(): print('Updating forex data ..') req = Request('https://www.investing.com/currencies/single-currency-crosses', headers={'User-Agent': 'Mozilla/5.0'}) html = urlopen(req).read() bs = BeautifulSoup(html, 'html.parser') currencies = bs.find("tbody").find_all("tr")[0:5] for idx, currency in enumerate(currencies, 1): pair = currency.find("td", class_="plusIconTd").a.text bid = currency.find("td", class_=f"pid-{idx}-bid").text ask = currency.find("td", class_=f"pid-{idx}-ask").text high = currency.find("td", class_=f"pid-{idx}-high").text low = currency.find("td", class_=f"pid-{idx}-low").text change = currency.find("td", class_=f"pid-{idx}-pc").text change_p = currency.find("td", class_=f"pid-{idx}-pc").text time = currency.find("td", class_=f"pid-{idx}-time").text # create dictionary data = {'pair':pair, 'bid':bid, 'ask':ask, 'high':high, 'low':low, 'change':change, 'change_p':change_p, 'time':time} # find the object by filtering and update all fields Currency.objects.filter(pair=pair).update(**data) sleep(5) create_currency() while True: # updating data every 15 seconds sleep(15) update_currency() |
实时爬网程序可能会中断服务器,最终导致您无法访问某个网页,因此,在连续抓取并绕过任何限制时被检测不到很重要。 您可以通过在类Request的实例上设置代理来防止检测。
1 2 3 4 5 6 7 8 |
proxy_host = 'localhost:1234' # host and port of your proxy url = 'http://www.httpbin.org/ip' req = urlrequest.Request(url) req.set_proxy(proxy_host, 'http') response = urlrequest.urlopen(req) print(response.read().decode('utf8')) |
使用Dango REST框架创建API
最后一步是创建序列化程序,以从爬网数据构建REST API。 通过使用序列化程序,我们可以将模型实例转换为可以轻松呈现为JSON的本地Python数据类型。 ModelSerializer类提供了一个快捷方式,使您可以自动创建带有与Model字段相对应的字段的Serializer类。 有关更多信息,请查看Django REST框架的官方文档。
在您的应用程序内创建serializers.py:
serializers.py
1 2 3 4 5 6 7 8 |
from rest_framework import serializers from .models import Currency class CurrencySerializer(serializers.ModelSerializer): class Meta: model = Currency fields = '__all__' # importing all fields |
现在,打开views.py以创建表示模型实例集合的ListAPIView。 它用于只读端点,并提供get方法处理程序。
1 2 3 4 5 6 7 8 |
from django.shortcuts import render from rest_framework import generics from .models import Currency from .serializers import CurrencySerializer class ListCurrencyView(generics.ListAPIView): queryset = Currency.objects.all() # used for returning objects from this view serializer_class = CurrencySerializer |
有关通用视图的更多信息,请访问通用视图。 最后,配置urls.py以呈现视图:
1 2 3 4 5 6 7 8 |
from django.contrib import admin from django.urls import path from forexAPI.views import ListCurrencyView urlpatterns = [ path('admin/', admin.site.urls), path('', ListCurrencyView.as_view()) ] |
在基于类的视图中,必须调用函数as_view()以返回可调用视图,该视图接受请求并返回响应。 这是请求-响应周期中通用视图的主要入口点。
你几乎已经完成! 为了正确运行项目,您必须分别运行celery和Django服务器。 最终结果应如下所示:
15秒后值会发生变化。
结论
Web抓取在数据行业中起着主要作用,并被公司用来保持竞争力。 当您想按需获取信息时,实时模式会很有用。 不过请记住,您要在要抓取的网站上增加大量服务器负载,因此也许要检查一下它们是否具有API或其他获取数据的方式。 公司为提供服务付出了很大的努力,因此在生产中使用服务之前,最好尊重他们的业务并征得其许可。
源码:https://github.com/raszidzie/real-time-forex-api/
原文:https://blog.soshace.com/creating-real-time-api-with-beautiful-soup-and-django-rest-framework/