jf package

Submodules

jf.jfio module

class jf.jfio.MinimalAdapter

Bases: object

>>> a = MinimalAdapter()
>>> a(iter([b"abcde", b"fghij"])).read(2)
'ab'
>>> a.read(2)
'cd'
>>> a.read(2)
'ef'
read(size)
class jf.jfio.StructEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: json.encoder.JSONEncoder

Try to convert everything to json

>>> from datetime import datetime
>>> import json
>>> len(json.dumps(datetime.now(), cls=StructEncoder)) > 10
True
default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
jf.jfio.data_input(files=None, additionals={}, inputfmt=None, listen=None)

Data input function

>>> import tempfile
>>> with tempfile.NamedTemporaryFile() as tmpfile:
...     tmpfile.write(b'[{"myconfig": "myvalue"}]') and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
1
>>> import tempfile
>>> with tempfile.NamedTemporaryFile(suffix=".yaml") as tmpfile:
...     tmpfile.write(b'[{"myconfig": "myvalue"}]') and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
1
>>> with tempfile.NamedTemporaryFile(suffix=".csv") as tmpfile:
...     tmpfile.write(b"hello,world\nno,yes\nno,no\n") and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
2
>>> list(data_input(["nots3://bucket/key.json"], {}))
Traceback (most recent call last):
...
NotImplementedError: ...
jf.jfio.fetch_file(fn, f, additionals)

Fetch file with custom handler

>>> from io import StringIO
>>> s = StringIO()
>>> class fetch_mod:
...     def jf_fetch_s3(m):
...          return '{"hello": "world"}'
>>> fetch_file("s3://bucket/key.json", s, {"mod": fetch_mod})
>>> s.getvalue()
'{"hello": "world"}'
jf.jfio.fetch_http(url)
jf.jfio.fetch_https(url)
jf.jfio.get_handler(method, fntype, additionals)
jf.jfio.get_supported_formats()
>>> len(get_supported_formats()) > 2
True
jf.jfio.not_dotaccessible(it)
jf.jfio.print_results(ret, output, compact=False, raw=False, additionals={})

Print array with various formats

>>> data = [{"a": 1}]
>>> print_results(data, 'help')
- clipboard
- csv
...
>>> print_results(data, 'py', True)
{'a': 1}
>>> print_results(data, 'json', True)
{"a": 1}
>>> print_results(["hello"], 'json', True, raw=True)
hello
>>> print_results(data, 'json', False)
{
  "a": 1
}
>>> print_results(data, 'yaml')
a: 1
<BLANKLINE>
>>> print_results(data, 'csv')
,a
0,1
<BLANKLINE>
>>> print_results(data, 'pickle')
<bytes>
>>> class serialize_mod:
...     def jf_serialize_msg(m):
...          return repr(m)
>>> print_results(data, 'msg', additionals={"mod": serialize_mod})
<bytes>
>>> print_results(data, 'not supported')
Traceback (most recent call last):
...
NotImplementedError: Cannot output not supported yet. Please consider making a PR!
jf.jfio.save_pandas(alldata, output, _highligh=None)
jf.jfio.write_bytes(barr)
jf.jfio.yield_json_and_json_lines(inp)

Yield json and json lines

Split potentially huge json strings into lines or components for low memory data processing.

Notice: Results are still json strings, so you most likely want to json.loads them.

jf.extra_functions module

class jf.extra_functions.Chain(*args, **kwargs)

Bases: jf.meta.JFTransformation

Chain grouped sub-iterables from the input into a single flat sequence

>>> list(Chain()(Firstnlast(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}])))
[{'a': 1}, {'a': 2}]

class jf.extra_functions.First(*args, **kwargs)

Bases: jf.meta.JFTransformation

Show only the first (N) value(s)

>>> list(First(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}]
>>> list(First("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}]

class jf.extra_functions.Firstnlast(*args, **kwargs)

Bases: jf.meta.JFTransformation

Show first and last (N) items

>>> list(Firstnlast()([{"a": 1}, {"a": 1}, {"a": 2}]))
[[{'a': 1}], [{'a': 2}]]
>>> list(Firstnlast("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[[{'a': 1}], [{'a': 2}]]

class jf.extra_functions.Flatten(*args, **kwargs)

Bases: jf.meta.JFTransformation

Yield all subitems of all items

>>> list(Flatten(lambda x: x["a"])([{"a": [1,2,3], "b": [{"c": 1}], "c": {"d": 1}}]))
[{'a.0': 1, 'a.1': 2, 'a.2': 3, 'b.0.c': 1, 'c.d': 1}]
class jf.extra_functions.GroupBy(*args, **kwargs)

Bases: jf.meta.JFTransformation

Group items by value

>>> list(GroupBy(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{1: [{'a': 1}, {'a': 1}], 2: [{'a': 2}]}]
class jf.extra_functions.JfDel(*args, **kwargs)

Bases: jf.meta.JFTransformation

Yield all subitems of all items

>>> list(YieldFrom(lambda x: x["a"])([{"a": [1,2,3]}]))
[1, 2, 3]
class jf.extra_functions.Last(*args, **kwargs)

Bases: jf.meta.JFTransformation

Show only the last (N) value(s)

>>> list(Last(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 2}]
>>> list(Last("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 2}]

class jf.extra_functions.Print(*args, **kwargs)

Bases: jf.meta.JFTransformation

Print (n) values

This prints n values to the stderr, but passes the data through without changes.

>>> list(Print(2)([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 3}, {'a': 1}, {'a': 2}]
>>> list(Print(lambda x: 2)([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 3}, {'a': 1}, {'a': 2}]
class jf.extra_functions.Sorted(*args, **kwargs)

Bases: jf.meta.JFTransformation

Sort items based on the column value

>>> list(Sorted(lambda x: x["a"])([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}, {'a': 3}]

class jf.extra_functions.Transpose(*args, **kwargs)

Bases: jf.meta.JFTransformation

Transpose input

>>> list(Transpose(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': [1, 1, 2]}]

class jf.extra_functions.Unique(*args, **kwargs)

Bases: jf.meta.JFTransformation

Calculate unique according to function

>>> list(Unique(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}]
>>> list(Unique()([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}]
class jf.extra_functions.YieldFrom(*args, **kwargs)

Bases: jf.meta.JFTransformation

Yield all subitems of all items

>>> list(YieldFrom(lambda x: x["a"])([{"a": [1,2,3]}]))
[1, 2, 3]
jf.extra_functions.age(datestr)

Age of a datetime string

>>> age("1 weeks ago").days
6

jf.main module

jf.main.filepath(x)
jf.main.is_datafile(x)
jf.main.jf(processes, query_and_files, imports, import_path, from_file, compact, listen, inputfmt, output, debug, raw, init)

Main of the machine

>>> import tempfile
>>> import json
>>> def run_with_data(data, jffn):
...     with tempfile.NamedTemporaryFile(suffix='.json') as tmpfile:
...         tmpfile.write(json.dumps(data).encode()) and True
...         tmpfile.flush()
...         jffn(tmpfile.name)
>>> def jffn(query, *args):
...     def _fn(fname):
...         ret1 = jf(1, [query, fname], *args)
...         ret2 = jf(2, [query, fname], *args)
...         assert ret1 == ret2
...     return _fn
>>> run_with_data([{"a": "myvalue"}], jffn(".a", [], [], False, True, None, None, 'json', False, False, []))
"myvalue"
"myvalue"
>>> run_with_data([{"a": "myvalue", "b": 1}], jffn("{a}", [], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue"}
{"a": "myvalue"}
>>> run_with_data([{"a": "myvalue", "b": 1}], jffn("{a, good: .b > 0}", [], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue", "good": true}
{"a": "myvalue", "good": true}
>>> run_with_data([{"a": "myvalue"}], jffn("{b: .a}", [], [], False, True, None, None, 'json', False, False, []))
{"b": "myvalue"}
{"b": "myvalue"}
>>> run_with_data([{"a": 1}, {"a": 2}], jffn("(.a > 1)", [], [], False, True, None, None, 'json', False, False, []))
{"a": 2}
{"a": 2}
>>> run_with_data([{"a": "myvalue"}], jffn("{hash: hashlib.md5(.a.encode()).hexdigest(), ...}", ["hashlib"], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a"}
{"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a"}

# Init is broken currently
# >>> run_with_data([{"a": "myvalue"}], jffn("{hash: hashlib.md5(.a.encode()).hexdigest(), c: C, ...}", ["hashlib"], [], False, True, None, None, 'json', False, False, ["C=5"]))
# {"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a", "c": 5}
# {"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a", "c": 5}

jf.main.name(x)

jf.process module

class jf.process.DotAccessible(*args, **kwargs)

Bases: dict

Dot accessible version of a dict. For syntactic sugar.

>>> it = DotAccessible({"a": 5})
>>> it.a
5
>>> it.b = 6
>>> it
{'a': 5, 'b': 6}
>>> del it.b
>>> isinstance(it.b, DotAccessibleNone)
True
>>> it
{'a': 5}
>>> DotAccessible({"a": 5}, b=1)
{'a': 5, 'b': 1}
class jf.process.DotAccessibleNone

Bases: object

jf.process.HttpServe(fs, listen, processes)
class jf.process.JFREMOVED

Bases: object

jf.process.camel_to_snake(name)
jf.process.dict_updater(_f)
jf.process.dotaccessible(it)
jf.process.mymap(fs, arr, processes=1)

My mapping function

Apply functions in fs to items in arr. Also supports multiprocessing.

jf.process.run_query(query, data, additionals={}, from_file=False, processes=1, listen=False)

Run query. This function will utilize global imports if used as a library:

>>> import hashlib
>>> list(run_query('.a', [{"a": "521"}, {"a": "643"}]))
['521', '643']
jf.process.undotaccessible(it)
jf.process.worker(x)

worker for multiprocessing

>>> worker_init([["map", lambda x: x],
...     ["update", lambda x: x],
...     ["function", lambda x: lambda y: y],
...     ["filter", lambda x: x]])
>>> worker({"a": 1})
{'a': 1}

jf.process.worker_init(funcs)

initializer for the worker in multiprocessing

jf.query_parser module

jf.query_parser.parse_query(query, from_file=None, imports=[], import_path=None, debug=False, dosplit=True, inputfmt=None, init=[])

Parse user query

>>> parse_query("{A: .b}, {c: .A, ...}, (.c>1),unique(), yield from .a")
('[["map", lambda x: {"A": x.b}], ["update", lambda x: {"c": x.A}], ["filter", lambda x: (x.c>1)], ["function", lambda x: unique(lambda x: ())], ["function", lambda x: yield_from(lambda x: x.a)]]', [], None, None, [])
>>> parse_query('{timestamps: t.get(f"train/{.audio}"), ...}')
('[["update", lambda x: {"timestamps": t.get(f"train/{x.audio}")}]]', [], None, None, [])
>>> parse_query('{timestamps: t.get(f"train/{.audio}"), ...}', debug=True)
query_parse:
...
('[["update", lambda x: {"timestamps": t.get(f"train/{x.audio}")}]]', [], None, None, [])
>>> import tempfile
>>> with tempfile.NamedTemporaryFile(suffix='.jf') as tmpfile:
...     tmpfile.write(b'#!/usr/bin/env jf\n#import hashlib\n{hash: hashlib.md5(.a).hexdigest(), ...}') and True
...     tmpfile.flush()
...     print(parse_query(tmpfile.name))
True
('[["update", lambda x: {"hash": hashlib.md5(x.a).hexdigest()}]]', ['hashlib'], None, None, [])
jf.query_parser.query_convert(query, debug=False)
jf.query_parser.split_query(q)

Split input query into components

jf.query_parser.withquerytype(query, is_function=False)

Parse query type from query component

>>> withquerytype('{a: 5, ...}')
('update', '{a: 5}')
>>> withquerytype('{a: 5}')
('map', '{a: 5}')
>>> withquerytype('(.a > 5)')
('filter', '(.a > 5)')
>>> withquerytype('count()', True)
('function', 'count()')
>>> withquerytype('sorted(.a)', True)
('function', 'sorted(lambda x: (.a))')

Module contents