This repository was archived by the owner on Nov 8, 2022. It is now read-only.
forked from GoogleCloudPlatform/professional-services
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdofns.py
More file actions
61 lines (51 loc) · 2.57 KB
/
dofns.py
File metadata and controls
61 lines (51 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from sideinput_refresh import util
# Elements leave this DoFn wrapped in TaggedOutput, but Beam type hints describe
# the wrapped element (runtime type checking unwraps TaggedOutput and checks
# `.value`), so the output hint is the str path, not the TaggedOutput wrapper.
@beam.typehints.with_input_types(bytes)
@beam.typehints.with_output_types(str)
class SplitToMultiple(beam.DoFn):
    """Generates a base path for each side input type.

    Combines the root path received via the file notification subscription
    with each side input type. The PCollection received will contain only a
    single element representing the base path, and will be fired once every
    x hours, matching the side input refresh frequency.

    Attributes:
        sideinput_types: List of side input types. Each one is used both as a
            path segment and as the output tag for its file pattern.
        file_prefix: File prefix matching the required files. Default is "*",
            indicating all files.
    """

    def __init__(self, sideinput_types: List[str], file_prefix: str = "*"):
        self.sideinput_types = sideinput_types
        self.file_prefix = file_prefix

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                pane_info=beam.DoFn.PaneInfoParam):
        """Emits one tagged file pattern per configured side input type.

        Args:
            element: Base path as bytes; decoded with the default (UTF-8) codec.
            timestamp: Element timestamp injected by Beam; only used for logging.
            window: Window injected by Beam; only used for logging.
            pane_info: Pane info injected by Beam; accepted but not used here.

        Yields:
            beam.pvalue.TaggedOutput whose tag is the side input type and whose
            value is `<base_path>/<sideinput_type>/<file_prefix>` as joined by
            FileSystems.join.
        """
        # Decode once: the same base path is needed for logging and for every
        # side input type below.
        base_path = element.decode()

        # Logging to audit triggering of the side input refresh process. The
        # statement is logged only when the pubsub notification triggers a
        # side input refresh (i.e. normally once every x hours).
        if isinstance(window, beam.transforms.window.GlobalWindow):
            logging.info(
                "(Re)loading side input data from basepath %s for global window: %s - %s",
                base_path, timestamp, window)
        else:
            logging.info(
                "(Re)loading side input data from basepath %s for window: %s - %s",
                base_path,
                util.get_formatted_time(window.start),
                util.get_formatted_time(window.end))

        for sideinput_type in self.sideinput_types:
            yield beam.pvalue.TaggedOutput(
                sideinput_type,
                FileSystems.join(base_path, sideinput_type, self.file_prefix))