%% @author Maas-Maarten Zeeman %% @copyright 2011 - 2017 Maas-Maarten Zeeman %% @doc Erlang API for sqlite3 databases %% Copyright 2011 - 2017 Maas-Maarten Zeeman %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. -module(esqlite3). -author("Maas-Maarten Zeeman "). %% higher-level export -compile(export_all). -define(DEFAULT_TIMEOUT, 5000). -define(DEFAULT_CHUNK_SIZE, 5000). %% -type connection() :: {connection, reference(), term()}. -type statement() :: {statement, term(), connection()}. -type sql() :: iodata(). %% @doc Opens a sqlite3 database mentioned in Filename. %% -spec open(FileName) -> {ok, connection()} | {error, _} when FileName :: string(). open(Filename) -> open(Filename, ?DEFAULT_TIMEOUT). %% @doc Open a database connection %% -spec open(Filename, timeout()) -> {ok, connection()} | {error, _} when Filename :: string(). open(Filename, Timeout) -> {ok, Connection} = esqlite3_nif:start(), Ref = make_ref(), ok = esqlite3_nif:open(Connection, Ref, self(), Filename), case receive_answer(Ref, Timeout) of ok -> {ok, {connection, make_ref(), Connection}}; {error, _Msg}=Error -> Error end. %% @doc Subscribe to database notifications %% Messages will come in the shape {action, table, id} %% Where action will be insert | update | delete %% and table will be a string %% and id will be an integer %% -spec set_update_hook(pid(), connection()) -> ok | {error, term()}. set_update_hook(Pid, Connection) -> set_update_hook(Pid, Connection, ?DEFAULT_TIMEOUT). -spec set_update_hook(pid(), connection(), timeout()) -> ok | {error, term()}. set_update_hook(Pid, {connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:set_update_hook(Connection, Ref, self(), Pid), receive_answer(Ref, Timeout). %% @doc Execute a sql statement, returns a list with tuples. -spec q(sql(), connection()) -> list(tuple()) | {error, term()}. q(Sql, Connection) -> q(Sql, [], Connection). %% @doc Execute statement, bind args and return a list with tuples as result. -spec q(sql(), list(), connection()) -> list(tuple()) | {error, term()}. q(Sql, [], Connection) -> case prepare(Sql, Connection) of {ok, Statement} -> fetchall(Statement); {error, _Msg}=Error -> throw(Error) end; q(Sql, Args, Connection) -> case prepare(Sql, Connection) of {ok, Statement} -> ok = bind(Statement, Args), fetchall(Statement); {error, _Msg}=Error -> throw(Error) end. %% @doc -spec map(F, sql(), connection()) -> list(Type) when F :: fun((Row) -> Type) | fun((ColumnNames, Row) -> Type), Row :: tuple(), ColumnNames :: tuple(), Type :: any(). map(F, Sql, Connection) -> case prepare(Sql, Connection) of {ok, Statement} -> map_s(F, Statement); {error, _Msg}=Error -> throw(Error) end. %% @doc -spec foreach(F, sql(), connection()) -> ok when F :: fun((Row) -> any()) | fun((ColumnNames, Row) -> any()), Row :: tuple(), ColumnNames :: tuple(). foreach(F, Sql, Connection) -> case prepare(Sql, Connection) of {ok, Statement} -> foreach_s(F, Statement); {error, _Msg}=Error -> throw(Error) end. %% -spec foreach_s(F, statement()) -> ok when F :: fun((Row) -> any()) | fun((ColumnNames, Row) -> any()), Row :: tuple(), ColumnNames :: tuple(). foreach_s(F, Statement) when is_function(F, 1) -> case try_multi_step(Statement, 1, [], 0) of {'$done', []} -> ok; {error, _} = E -> F(E); {rows, [Row | []]} -> F(Row), foreach_s(F, Statement) end; foreach_s(F, Statement) when is_function(F, 2) -> ColumnNames = column_names(Statement), case try_multi_step(Statement, 1, [], 0) of {'$done', []} -> ok; {error, _} = E -> F([], E); {rows, [Row | []]} -> F(ColumnNames, Row), foreach_s(F, Statement) end. %% -spec map_s(F, statement()) -> list(Type) when F :: fun((Row) -> Type) | fun((ColumnNames, Row) -> Type), Row :: tuple(), ColumnNames :: tuple(), Type :: term(). map_s(F, Statement) when is_function(F, 1) -> case try_multi_step(Statement, 1, [], 0) of {'$done', []} -> []; {error, _} = E -> F(E); {rows, [Row | []]} -> [F(Row) | map_s(F, Statement)] end; map_s(F, Statement) when is_function(F, 2) -> ColumnNames = column_names(Statement), case try_multi_step(Statement, 1, [], 0) of {'$done', []} -> []; {error, _} = E -> F([], E); {rows, [Row | []]} -> [F(ColumnNames, Row) | map_s(F, Statement)] end. %% %%-spec fetchone(statement()) -> tuple(). fetchone(Statement) -> case try_multi_step(Statement, 1, [], 0) of {'$done', []} -> ok; {error, _} = E -> E; {rows, [Row | []]} -> Row end. %% @doc Fetch all records %% @param Statement is prepared sql statement %% @spec fetchall(statement()) -> list(tuple()) | {error, term()}. -spec fetchall(statement()) -> list(tuple()) | {error, term()}. fetchall(Statement) -> fetchall(Statement, ?DEFAULT_CHUNK_SIZE). %% @doc Fetch all records %% @param Statement is prepared sql statement %% @param ChunkSize is a count of rows to read from sqlite and send to erlang process in one bulk. %% Decrease this value if rows are heavy. Default value is 5000 (DEFAULT_CHUNK_SIZE). %% @spec fetchall(statement()) -> list(tuple()) | {error, term()}. -spec fetchall(statement(), pos_integer()) -> list(tuple()) | {error, term()}. fetchall(Statement, ChunkSize) -> case fetchall_internal(Statement, ChunkSize, []) of {'$done', Rows} -> lists:reverse(Rows); {error, _} = E -> E end. %% return rows in revers order -spec fetchall_internal(statement(), pos_integer(), list(tuple())) -> {'$done', list(tuple())} | {error, term()}. fetchall_internal(Statement, ChunkSize, Rest) -> case try_multi_step(Statement, ChunkSize, Rest, 0) of {rows, Rows} -> fetchall_internal(Statement, ChunkSize, Rows); Else -> Else end. %% Try a number of steps, when the database is busy, %% return rows in revers order -spec try_multi_step(statement(), pos_integer(), list(tuple()), non_neg_integer()) -> {rows, list(tuple())} | {'$done', list(tuple())} | {error, term()}. try_multi_step(_Statement, _ChunkSize, _Rest, Tries) when Tries > 5 -> throw(too_many_tries); try_multi_step(Statement, ChunkSize, Rest, Tries) -> case multi_step(Statement, ChunkSize) of {'$busy', Rows} -> %% core can fetch a number of rows (rows < ChunkSize) per 'multi_step' call and then get busy... erlang:display({"busy", Tries}), timer:sleep(100 * Tries), try_multi_step(Statement, ChunkSize, Rows ++ Rest, Tries + 1); {rows, Rows} -> {rows, Rows ++ Rest}; {'$done', Rows} -> {'$done', Rows ++ Rest}; Else -> Else end. %% @doc Execute Sql statement, returns the number of affected rows. %% %% @spec exec(iolist(), connection()) -> integer() | {error, error_message()} exec(Sql, Connection) -> exec("begin", Connection, ?DEFAULT_TIMEOUT), exec(Sql, Connection, ?DEFAULT_TIMEOUT), exec("commit", Connection, ?DEFAULT_TIMEOUT). %% @doc Execute %% %% @spec exec(iolist(), connection(), timeout()) -> integer() | {error, error_message()} exec(Sql, {connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:exec(Connection, Ref, self(), Sql), receive_answer(Ref, Timeout); %% @spec exec(iolist(), list(term()), connection()) -> integer() | {error, error_message()} exec(Sql, Params, {connection, _, _}=Connection) when is_list(Params) -> exec(Sql, Params, Connection, ?DEFAULT_TIMEOUT). %% @spec exec(iolist(), list(term()), connection(), timeout()) -> integer() | {error, error_message()} exec(Sql, Params, {connection, _, _}=Connection, Timeout) when is_list(Params) -> {ok, Statement} = prepare(Sql, Connection, Timeout), bind(Statement, Params), step(Statement, Timeout). %% @doc Return the number of affected rows of last statement. changes(Connection) -> changes(Connection, ?DEFAULT_TIMEOUT). %% @doc Return the number of affected rows of last statement. changes({connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:changes(Connection, Ref, self()), receive_answer(Ref, Timeout). %% @doc Insert records, returns the last rowid. %% %% @spec insert(iolist(), connection()) -> {ok, integer()} | {error, error_message()} insert(Sql, Connection) -> insert(Sql, Connection, ?DEFAULT_TIMEOUT). %% @doc Insert %% %% @spec insert(iolist(), connection(), timeout()) -> {ok, integer()} | {error, error_message()} insert(Sql, {connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:insert(Connection, Ref, self(), Sql), receive_answer(Ref, Timeout). %% @doc Get autocommit %% %% @spec get_autocommit(connection) -> true | false get_autocommit(Connection) -> get_autocommit(Connection, ?DEFAULT_TIMEOUT). get_autocommit({connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:get_autocommit(Connection, Ref, self()), receive_answer(Ref, Timeout). %% @doc Prepare a statement %% %% @spec prepare(iolist(), connection()) -> {ok, prepared_statement()} | {error, error_message()} prepare(Sql, Connection) -> prepare(Sql, Connection, ?DEFAULT_TIMEOUT). %% @doc %% %% @spec(iolist(), connection(), timeout()) -> {ok, prepared_statement()} | {error, error_message()} prepare(Sql, {connection, _Ref, Connection}=C, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:prepare(Connection, Ref, self(), Sql), case receive_answer(Ref, Timeout) of {ok, Stmt} -> {ok, {statement, Stmt, C}}; Else -> Else end. %% @doc Step %% %% @spec step(prepared_statement()) -> tuple() step(Stmt) -> step(Stmt, ?DEFAULT_TIMEOUT). %% @doc %% %% @spec step(prepared_statement(), timeout()) -> tuple() -spec step(term(), timeout()) -> tuple() | '$busy' | '$done'. step({statement, Stmt, {connection, _, Conn}}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:multi_step(Conn, Stmt, 1, Ref, self()), case receive_answer(Ref, Timeout) of {rows, [Row | []]} -> {row, Row}; {'$done', []} -> '$done'; {'$busy', []} -> '$busy'; Else -> Else end. %% make multiple sqlite steps per call %% return rows in reverse order multi_step(Stmt, ChunkSize) -> multi_step(Stmt, ChunkSize, ?DEFAULT_TIMEOUT). %% make multiple sqlite steps per call %% return rows in reverse order -spec multi_step(term(), pos_integer(), timeout()) -> {rows, list(tuple())} | {'$busy', list(tuple())} | {'$done', list(tuple())} | {error, term()}. multi_step({statement, Stmt, {connection, _, Conn}}, ChunkSize, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:multi_step(Conn, Stmt, ChunkSize, Ref, self()), receive_answer(Ref, Timeout). %% @doc Reset the prepared statement back to its initial state. %% %% @spec reset(prepared_statement()) -> ok | {error, error_message()} reset({statement, Stmt, {connection, _, Conn}}) -> Ref = make_ref(), ok = esqlite3_nif:reset(Conn, Stmt, Ref, self()), receive_answer(Ref, ?DEFAULT_TIMEOUT). %% @doc Bind values to prepared statements %% %% @spec bind(prepared_statement(), value_list()) -> ok | {error, error_message()} bind(Stmt, Args) -> bind(Stmt, Args, ?DEFAULT_TIMEOUT). %% @doc Bind values to prepared statements %% %% @spec bind(prepared_statement(), [], timeout()) -> ok | {error, error_message()} bind({statement, Stmt, {connection, _, Conn}}, Args, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:bind(Conn, Stmt, Ref, self(), Args), receive_answer(Ref, Timeout). %% @doc Return the column names of the prepared statement. %% -spec column_names(statement()) -> {atom()}. column_names(Stmt) -> column_names(Stmt, ?DEFAULT_TIMEOUT). -spec column_names(statement(), timeout()) -> {atom()}. column_names({statement, Stmt, {connection, _, Conn}}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:column_names(Conn, Stmt, Ref, self()), receive_answer(Ref, Timeout). %% @doc Return the column types of the prepared statement. %% -spec column_types(statement()) -> {atom()}. column_types(Stmt) -> column_types(Stmt, ?DEFAULT_TIMEOUT). -spec column_types(statement(), timeout()) -> {atom()}. column_types({statement, Stmt, {connection, _, Conn}}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:column_types(Conn, Stmt, Ref, self()), receive_answer(Ref, Timeout). %% @doc Close the database %% %% @spec close(connection()) -> ok | {error, error_message()} -spec close(connection()) -> ok | {error, _}. close(Connection) -> close(Connection, ?DEFAULT_TIMEOUT). %% @doc Close the database %% %% @spec close(connection(), integer()) -> ok | {error, error_message()} -spec close(connection(), timeout()) -> ok | {error, _}. close({connection, _Ref, Connection}, Timeout) -> Ref = make_ref(), ok = esqlite3_nif:close(Connection, Ref, self()), receive_answer(Ref, Timeout). %% Internal functions receive_answer(Ref, Timeout) -> Start = os:timestamp(), receive {esqlite3, Ref, Resp} -> Resp; {esqlite3, _, _}=StaleAnswer -> error_logger:warning_msg("Esqlite3: Ignoring stale answer ~p~n", [StaleAnswer]), PassedMics = timer:now_diff(os:timestamp(), Start) div 1000, NewTimeout = case Timeout - PassedMics of Passed when Passed < 0 -> 0; TO -> TO end, receive_answer(Ref, NewTimeout) after Timeout -> throw({error, timeout, Ref}) end.