jf package
Submodules
jf.jfio module
class jf.jfio.MinimalAdapter
Bases: object

Minimal adapter that wraps an iterator of byte chunks as a readable, file-like object.

>>> a = MinimalAdapter()
>>> a(iter([b"abcde", b"fghij"])).read(2)
'ab'
>>> a.read(2)
'cd'
>>> a.read(2)
'ef'
read(size)
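The doctest above shows reads continuing seamlessly across chunk boundaries. A minimal sketch of that idea (not jf's actual implementation; jf's adapter is attached to the iterator via `__call__`, and here the bytes are decoded so `read()` returns `str`, matching the doctest output):

```python
class ChunkReader:
    """File-like wrapper over an iterator of byte chunks (a sketch in the
    spirit of MinimalAdapter, not jf's actual class)."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._buf = b""

    def read(self, size=-1):
        # Pull chunks until enough bytes are buffered (or the iterator ends).
        while size < 0 or len(self._buf) < size:
            try:
                self._buf += next(self._chunks)
            except StopIteration:
                break
        if size < 0:
            out, self._buf = self._buf, b""
        else:
            out, self._buf = self._buf[:size], self._buf[size:]
        return out.decode()
```

`ChunkReader(iter([b"abcde", b"fghij"])).read(2)` returns `'ab'`, and subsequent reads continue across the chunk boundary.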
class jf.jfio.StructEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: json.encoder.JSONEncoder

Try to convert everything to JSON.

>>> from datetime import datetime
>>> import json
>>> len(json.dumps(datetime.now(), cls=StructEncoder)) > 10
True
default(obj)
Implement this method in a subclass such that it returns a serializable object for obj, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
jf.jfio.data_input(files=None, additionals={}, inputfmt=None, listen=None)
Data input function.

>>> import tempfile
>>> with tempfile.NamedTemporaryFile() as tmpfile:
...     tmpfile.write(b'[{"myconfig": "myvalue"}]') and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
1
>>> with tempfile.NamedTemporaryFile(suffix=".yaml") as tmpfile:
...     tmpfile.write(b'[{"myconfig": "myvalue"}]') and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
1
>>> with tempfile.NamedTemporaryFile(suffix=".csv") as tmpfile:
...     tmpfile.write(b"hello,world\nno,yes\nno,no\n") and True
...     tmpfile.flush()
...     len(list(data_input([tmpfile.name])))
True
2
>>> list(data_input(["nots3://bucket/key.json"], {}))
Traceback (most recent call last):
...
NotImplementedError: ...
jf.jfio.fetch_file(fn, f, additionals)
Fetch a file with a custom handler.

>>> from io import StringIO
>>> s = StringIO()
>>> class fetch_mod:
...     def jf_fetch_s3(m):
...         return '{"hello": "world"}'
>>> fetch_file("s3://bucket/key.json", s, {"mod": fetch_mod})
>>> s.getvalue()
'{"hello": "world"}'
jf.jfio.fetch_http(url)
jf.jfio.fetch_https(url)
jf.jfio.get_handler(method, fntype, additionals)
jf.jfio.get_supported_formats()

>>> len(get_supported_formats()) > 2
True
jf.jfio.not_dotaccessible(it)
jf.jfio.print_results(ret, output, compact=False, raw=False, additionals={})
Print an array in various formats.

>>> data = [{"a": 1}]
>>> print_results(data, 'help')
- clipboard
- csv
...
>>> print_results(data, 'py', True)
{'a': 1}
>>> print_results(data, 'json', True)
{"a": 1}
>>> print_results(["hello"], 'json', True, raw=True)
hello
>>> print_results(data, 'json', False)
{
  "a": 1
}
>>> print_results(data, 'yaml')
a: 1
<BLANKLINE>
>>> print_results(data, 'csv')
,a
0,1
<BLANKLINE>
>>> print_results(data, 'pickle')
<bytes>
>>> class serialize_mod:
...     def jf_serialize_msg(m):
...         return repr(m)
>>> print_results(data, 'msg', additionals={"mod": serialize_mod})
<bytes>
>>> print_results(data, 'not supported')
Traceback (most recent call last):
...
NotImplementedError: Cannot output not supported yet. Please consider making a PR!
jf.jfio.save_pandas(alldata, output, _highligh=None)
jf.jfio.write_bytes(barr)
jf.jfio.yield_json_and_json_lines(inp)
Yield JSON and JSON lines.

Split potentially huge JSON strings into lines or components for low-memory data processing.

Notice: the results are still JSON strings, so you most likely want to json.loads them.
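The splitting idea can be sketched as follows. The hypothetical `iter_json_items` below is not jf's actual function; unlike jf, it loads the whole input instead of streaming it, but it shows why the yielded values are still JSON strings:

```python
import json

def iter_json_items(text):
    """Yield each document as a JSON string from input that is either a
    JSON array or JSON lines (a sketch of the idea, not jf's code)."""
    text = text.strip()
    if text.startswith("["):
        # A JSON array: decode once and re-serialize each element.
        for item in json.loads(text):
            yield json.dumps(item)
    else:
        # JSON lines: one document per non-empty line.
        for line in text.splitlines():
            if line.strip():
                yield line
```

Both `'[{"a": 1}, {"a": 2}]'` and `'{"a": 1}\n{"a": 2}\n'` then yield the same two JSON strings, ready for `json.loads`.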
jf.extra_functions module
class jf.extra_functions.Chain(*args, **kwargs)
Bases: jf.meta.JFTransformation

Chain nested result lists into a single flat sequence.

>>> list(Chain()(Firstnlast(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}])))
[{'a': 1}, {'a': 2}]
class jf.extra_functions.First(*args, **kwargs)
Bases: jf.meta.JFTransformation

Show only the first (N) value(s).

>>> list(First(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}]
>>> list(First("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}]
class jf.extra_functions.Firstnlast(*args, **kwargs)
Bases: jf.meta.JFTransformation

Show the first and last (N) items.

>>> list(Firstnlast()([{"a": 1}, {"a": 1}, {"a": 2}]))
[[{'a': 1}], [{'a': 2}]]
>>> list(Firstnlast("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[[{'a': 1}], [{'a': 2}]]
class jf.extra_functions.Flatten(*args, **kwargs)
Bases: jf.meta.JFTransformation

Flatten nested structures into flat dicts with dotted keys.

>>> list(Flatten(lambda x: x["a"])([{"a": [1,2,3], "b": [{"c": 1}], "c": {"d": 1}}]))
[{'a.0': 1, 'a.1': 2, 'a.2': 3, 'b.0.c': 1, 'c.d': 1}]
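The dotted-key flattening shown in the doctest can be sketched recursively (an illustration of the output format, not jf's implementation):

```python
def flatten(obj, prefix=""):
    """Recursively flatten nested dicts/lists into one dict whose keys
    are dotted paths, mirroring the Flatten doctest above."""
    out = {}
    if isinstance(obj, dict):
        items = obj.items()
    elif isinstance(obj, list):
        items = enumerate(obj)  # list positions become numeric path parts
    else:
        return {prefix: obj}
    for k, v in items:
        key = f"{prefix}.{k}" if prefix else str(k)
        if isinstance(v, (dict, list)):
            out.update(flatten(v, key))
        else:
            out[key] = v
    return out
```

`flatten({"a": [1, 2, 3], "b": [{"c": 1}]})` produces `{'a.0': 1, 'a.1': 2, 'a.2': 3, 'b.0.c': 1}`.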
class jf.extra_functions.GroupBy(*args, **kwargs)
Bases: jf.meta.JFTransformation

Group items by value.

>>> list(GroupBy(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{1: [{'a': 1}, {'a': 1}], 2: [{'a': 2}]}]
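The grouping behavior in the doctest amounts to a key-bucketed accumulation, which can be sketched like this (not jf's code; `group_by` is a hypothetical helper):

```python
from collections import defaultdict

def group_by(key, items):
    """Bucket items by key(item) and emit a single dict of groups,
    matching the shape shown in GroupBy's doctest."""
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return [dict(groups)]
```

`group_by(lambda x: x["a"], [{"a": 1}, {"a": 1}, {"a": 2}])` returns `[{1: [{'a': 1}, {'a': 1}], 2: [{'a': 2}]}]`.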
class jf.extra_functions.JfDel(*args, **kwargs)
Bases: jf.meta.JFTransformation

Yield all subitems of each item.

>>> list(YieldFrom(lambda x: x["a"])([{"a": [1,2,3]}]))
[1, 2, 3]
class jf.extra_functions.Last(*args, **kwargs)
Bases: jf.meta.JFTransformation

Show only the last (N) value(s).

>>> list(Last(lambda x: 1)([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 2}]
>>> list(Last("1")([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 2}]
class jf.extra_functions.Print(*args, **kwargs)
Bases: jf.meta.JFTransformation

Print (n) values.

This prints n values to stderr, but passes the data through without changes.

>>> list(Print(2)([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 3}, {'a': 1}, {'a': 2}]
>>> list(Print(lambda x: 2)([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 3}, {'a': 1}, {'a': 2}]
class jf.extra_functions.Sorted(*args, **kwargs)
Bases: jf.meta.JFTransformation

Sort items based on the column value.

>>> list(Sorted(lambda x: x["a"])([{"a": 3}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}, {'a': 3}]
class jf.extra_functions.Transpose(*args, **kwargs)
Bases: jf.meta.JFTransformation

Transpose input.

>>> list(Transpose(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': [1, 1, 2]}]
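The transposition in the doctest turns a list of dicts into one dict of lists. A minimal sketch of that operation (a hypothetical helper, not jf's implementation):

```python
def transpose(items):
    """Collect each key's values across all items into a list,
    yielding the single-dict shape shown in Transpose's doctest."""
    out = {}
    for item in items:
        for k, v in item.items():
            out.setdefault(k, []).append(v)
    return [out]
```

`transpose([{"a": 1}, {"a": 1}, {"a": 2}])` returns `[{'a': [1, 1, 2]}]`.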
class jf.extra_functions.Unique(*args, **kwargs)
Bases: jf.meta.JFTransformation

Calculate unique according to function.

>>> list(Unique(lambda x: x["a"])([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}]
>>> list(Unique()([{"a": 1}, {"a": 1}, {"a": 2}]))
[{'a': 1}, {'a': 2}]
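Deduplicating by a key function, as the doctest shows, can be sketched with a seen-set generator (jf may derive the key differently; `unique_by` is a hypothetical helper):

```python
def unique_by(items, key=repr):
    """Yield items whose key(item) has not been seen before; the repr
    default handles unhashable items such as dicts."""
    seen = set()
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item
```

`list(unique_by([{"a": 1}, {"a": 1}, {"a": 2}], key=lambda x: x["a"]))` returns `[{'a': 1}, {'a': 2}]`.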
class jf.extra_functions.YieldFrom(*args, **kwargs)
Bases: jf.meta.JFTransformation

Yield all subitems of each item.

>>> list(YieldFrom(lambda x: x["a"])([{"a": [1,2,3]}]))
[1, 2, 3]
jf.extra_functions.age(datestr)
Age of a datetime string.

>>> age("1 weeks ago").days
6
jf.main module
jf.main.filepath(x)
jf.main.is_datafile(x)
jf.main.jf(processes, query_and_files, imports, import_path, from_file, compact, listen, inputfmt, output, debug, raw, init)
Main of the machine.

>>> import tempfile
>>> import json
>>> def run_with_data(data, jffn):
...     with tempfile.NamedTemporaryFile(suffix='.json') as tmpfile:
...         tmpfile.write(json.dumps(data).encode()) and True
...         tmpfile.flush()
...         jffn(tmpfile.name)
>>> def jffn(query, *args):
...     def _fn(fname):
...         ret1 = jf(1, [query, fname], *args)
...         ret2 = jf(2, [query, fname], *args)
...         assert ret1 == ret2
...     return _fn
>>> run_with_data([{"a": "myvalue"}], jffn(".a", [], [], False, True, None, None, 'json', False, False, []))
"myvalue"
"myvalue"
>>> run_with_data([{"a": "myvalue", "b": 1}], jffn("{a}", [], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue"}
{"a": "myvalue"}
>>> run_with_data([{"a": "myvalue", "b": 1}], jffn("{a, good: .b > 0}", [], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue", "good": true}
{"a": "myvalue", "good": true}
>>> run_with_data([{"a": "myvalue"}], jffn("{b: .a}", [], [], False, True, None, None, 'json', False, False, []))
{"b": "myvalue"}
{"b": "myvalue"}
>>> run_with_data([{"a": 1}, {"a": 2}], jffn("(.a > 1)", [], [], False, True, None, None, 'json', False, False, []))
{"a": 2}
{"a": 2}
>>> run_with_data([{"a": "myvalue"}], jffn("{hash: hashlib.md5(.a.encode()).hexdigest(), ...}", ["hashlib"], [], False, True, None, None, 'json', False, False, []))
{"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a"}
{"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a"}

# Init is broken currently
# >>> run_with_data([{"a": "myvalue"}], jffn("{hash: hashlib.md5(.a.encode()).hexdigest(), c: C, ...}", ["hashlib"], [], False, True, None, None, 'json', False, False, ["C=5"]))
# {"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a", "c": 5}
# {"a": "myvalue", "hash": "d724a7135ce7d2593c25fc5212d4125a", "c": 5}
jf.main.name(x)
jf.process module
class jf.process.DotAccessible(*args, **kwargs)
Bases: dict

Dot-accessible version of a dict, for syntactic sugar.

>>> it = DotAccessible({"a": 5})
>>> it.a
5
>>> it.b = 6
>>> it
{'a': 5, 'b': 6}
>>> del it.b
>>> isinstance(it.b, DotAccessibleNone)
True
>>> it
{'a': 5}
>>> DotAccessible({"a": 5}, b=1)
{'a': 5, 'b': 1}
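A dict with attribute-style access can be sketched by routing the attribute hooks to the mapping protocol (a simplified sketch: where jf returns a DotAccessibleNone sentinel for missing keys, this version returns None):

```python
class DotDict(dict):
    """Dot-accessible dict in the spirit of DotAccessible."""

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError:
            return None  # jf yields a DotAccessibleNone sentinel here

    def __setattr__(self, name, value):
        self[name] = value  # attribute writes become dict entries

    def __delattr__(self, name):
        del self[name]
```

`DotDict({"a": 5}).a` returns `5`, and setting `it.b = 6` adds a `"b"` key to the underlying dict.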
class jf.process.DotAccessibleNone
Bases: object
jf.process.HttpServe(fs, listen, processes)
class jf.process.JFREMOVED
Bases: object
jf.process.camel_to_snake(name)
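A common camelCase-to-snake_case recipe uses two regex passes; jf's camel_to_snake is likely along these lines, but that is an assumption:

```python
import re

def camel_to_snake_sketch(name):
    """Convert CamelCase (including acronym runs like HTTPServer)
    to snake_case with two substitution passes."""
    # Pass 1: split an uppercase word that follows any character.
    s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    # Pass 2: split a lowercase/digit followed by an uppercase letter.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
```

`camel_to_snake_sketch("HTTPServer")` returns `'http_server'`.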
jf.process.dict_updater(_f)
jf.process.dotaccessible(it)
jf.process.mymap(fs, arr, processes=1)
My mapping function.

Apply the functions in fs to the items in arr. Also supports multiprocessing.
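A map that optionally fans out to worker processes can be sketched as below (a hypothetical helper illustrating the idea, not jf's implementation, which applies a pipeline of functions rather than a single one):

```python
from multiprocessing import Pool

def square(x):
    # Top-level function so it can be pickled for worker processes.
    return x * x

def mymap_sketch(fn, arr, processes=1):
    """Apply fn to every item, optionally via a process pool."""
    if processes > 1:
        with Pool(processes) as pool:
            return pool.map(fn, arr)
    return [fn(x) for x in arr]
```

With `processes > 1`, fn must be picklable (defined at module level), which is why jf uses a separate worker/worker_init pair for multiprocessing.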
jf.process.run_query(query, data, additionals={}, from_file=False, processes=1, listen=False)
Run a query. This function will utilize global imports if used as a library:

>>> import hashlib
>>> list(run_query('.a', [{"a": "521"}, {"a": "643"}]))
['521', '643']
jf.process.undotaccessible(it)
jf.process.worker(x)
Worker for multiprocessing.

>>> worker_init([["map", lambda x: x],
...              ["update", lambda x: x],
...              ["function", lambda x: lambda y: y],
...              ["filter", lambda x: x]])
>>> worker({"a": 1})
{'a': 1}
jf.process.worker_init(funcs)
Initializer for the worker in multiprocessing.
jf.query_parser module
jf.query_parser.parse_query(query, from_file=None, imports=[], import_path=None, debug=False, dosplit=True, inputfmt=None, init=[])
Parse a user query.

>>> parse_query("{A: .b}, {c: .A, ...}, (.c>1),unique(), yield from .a")
('[["map", lambda x: {"A": x.b}], ["update", lambda x: {"c": x.A}], ["filter", lambda x: (x.c>1)], ["function", lambda x: unique(lambda x: ())], ["function", lambda x: yield_from(lambda x: x.a)]]', [], None, None, [])
>>> parse_query('{timestamps: t.get(f"train/{.audio}"), ...}')
('[["update", lambda x: {"timestamps": t.get(f"train/{x.audio}")}]]', [], None, None, [])
>>> parse_query('{timestamps: t.get(f"train/{.audio}"), ...}', debug=True)
query_parse: ...
('[["update", lambda x: {"timestamps": t.get(f"train/{x.audio}")}]]', [], None, None, [])
>>> import tempfile
>>> with tempfile.NamedTemporaryFile(suffix='.jf') as tmpfile:
...     tmpfile.write(b'#!/usr/bin/env jf\n#import hashlib\n{hash: hashlib.md5(.a).hexdigest(), ...}') and True
...     tmpfile.flush()
...     print(parse_query(tmpfile.name))
True
('[["update", lambda x: {"hash": hashlib.md5(x.a).hexdigest()}]]', ['hashlib'], None, None, [])
jf.query_parser.query_convert(query, debug=False)
jf.query_parser.split_query(q)
Split an input query into components.
jf.query_parser.withquerytype(query, is_function=False)
Parse the query type from a query component.

>>> withquerytype('{a: 5, ...}')
('update', '{a: 5}')
>>> withquerytype('{a: 5}')
('map', '{a: 5}')
>>> withquerytype('(.a > 5)')
('filter', '(.a > 5)')
>>> withquerytype('count()', True)
('function', 'count()')
>>> withquerytype('sorted(.a)', True)
('function', 'sorted(lambda x: (.a))')
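The classification in the doctests above can be sketched from the component's shape: a `{...}` ending in `, ...}` is an update, a plain `{...}` is a map, `(...)` is a filter, and everything else is a function call. This is a simplified sketch of that dispatch only; the real parser also rewrites function arguments into lambdas:

```python
def querytype_sketch(component):
    """Classify one query component the way withquerytype's
    doctests do (map/update/filter/function)."""
    c = component.strip()
    if c.startswith("{"):
        if c.endswith(", ...}"):
            # Drop the ', ...' marker but keep the closing brace.
            return "update", c[: -len(", ...}")] + "}"
        return "map", c
    if c.startswith("("):
        return "filter", c
    return "function", c
```

`querytype_sketch('{a: 5, ...}')` returns `('update', '{a: 5}')`.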