summaryrefslogtreecommitdiffstats
path: root/ffi/lang/python/sequoia/core.py
blob: b985247bf442ca9afd825b9b55076fe5b5026ad3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import io
from enum import Enum

from _sequoia import ffi, lib
from .error import Error
from .glue import SQObject, invoke

class NetworkPolicy(Enum):
    Offline = lib.SQ_NETWORK_POLICY_OFFLINE
    Anonymized = lib.SQ_NETWORK_POLICY_ANONYMIZED
    Encrypted = lib.SQ_NETWORK_POLICY_ENCRYPTED
    Insecure = lib.SQ_NETWORK_POLICY_INSECURE

class IPCPolicy(Enum):
    External = lib.SQ_IPC_POLICY_EXTERNAL
    Internal = lib.SQ_IPC_POLICY_INTERNAL
    Robust = lib.SQ_IPC_POLICY_ROBUST

class Context(SQObject):
    _del = lib.sq_context_free
    def __init__(self,
                 home=None,
                 network_policy=NetworkPolicy.Encrypted,
                 ipc_policy=IPCPolicy.Robust,
                 ephemeral=False):
        cfg = lib.sq_context_configure()
        if home:
            lib.sq_config_home(cfg, home.encode())
        lib.sq_config_network_policy(cfg, network_policy.value)
        lib.sq_config_ipc_policy(cfg, ipc_policy.value)
        if ephemeral:
            lib.sq_config_ephemeral(cfg)
        err = ffi.new("pgp_error_t[1]")
        o = lib.sq_config_build(cfg, err)
        if o == ffi.NULL:
            raise Error._from(err[0])
        super(Context, self).__init__(o)

class AbstractReader(SQObject, io.RawIOBase):
    _del = lib.pgp_reader_free

    def readable(self):
        return True
    def writable(self):
        return False

    def readinto(self, buf):
        return invoke(
            lib.pgp_reader_read,
            self.ref(),
            ffi.cast("uint8_t *", ffi.from_buffer(buf)),
            len(buf))

    def close(self):
        self._delete()

    # Implement the context manager protocol.
    def __enter__(self):
        return self
    def __exit__(self, *args):
        self.close()
        return False

class Reader(AbstractReader):
    @classmethod
    def open(cls, ctx, filename):
        return Reader(
            invoke(lib.pgp_reader_from_file,
                   filename.encode()),
            context=ctx)

    @classmethod
    def from_fd(cls, ctx, fd):
        return Reader(lib.pgp_reader_from_fd(fd),
                      context=ctx)

    @classmethod
    def from_bytes(cls, ctx, buf):
        return Reader(
            lib.pgp_reader_from_bytes(
                ffi.cast("uint8_t *", ffi.from_buffer(buf)), len(buf)),
            context=ctx)

class AbstractWriter(SQObject, io.RawIOBase):
    _del = lib.pgp_writer_free

    def readable(self):
        return False
    def writable(self):
        return True

    def write(self, buf):
        return invoke(
            lib.pgp_writer_write,
            self.ref(),
            ffi.cast("const uint8_t *", ffi.from_buffer(buf)),
            len(buf))

    def close(self):
        self._delete()

    # Implement the context manager protocol.
    def __enter__(self):
        return self
    def __exit__(self, *args):
        self.close()
        return False

class Writer(AbstractWriter):
    @classmethod
    def open(cls, ctx, filename):
        return Writer(
            invoke(lib.pgp_writer_from_file,
                   filename.encode()),
            context=ctx)

    @classmethod
    def from_fd(cls, ctx, fd):
        return Writer(lib.pgp_writer_from_fd(fd),
                      context=ctx)

    @classmethod
    def from_bytes(cls, ctx, buf):
        return Writer(
            lib.pgp_writer_from_bytes(
                ffi.cast("uint8_t *", ffi.from_buffer(buf)), len(buf)),
            context=ctx)