openSUSE:Leap:15.2
File tests_asyncio.py of Package python-ijson
# -*- coding:utf-8 -*-

import asyncio
import io

from ijson import compat


class AsyncReader(object):
    def __init__(self, data):
        if type(data) == compat.bytetype:
            self.data = io.BytesIO(data)
        else:
            self.data = io.StringIO(data)

    async def read(self, n=-1):
        return self.data.read(n)


class Async(object):
    '''Test adaptation for async generators'''

    suffix = '_async'

    def _run(self, f):
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(f)
        finally:
            loop.close()

    def all(self, routine, json_content, *args, **kwargs):
        events = []
        async def run():
            async for event in routine(AsyncReader(json_content), *args, **kwargs):
                events.append(event)
        self._run(run())
        return events

    def first(self, routine, json_content, *args, **kwargs):
        events = []
        async def run():
            async for event in routine(AsyncReader(json_content), *args, **kwargs):
                events.append(event)
                if events:
                    return
        self._run(run())
        return events[0]